fix: reduce deadlocks merging PRs w/ async milestone stat recalcs (#9916)

Continuing the pattern from #9868, fixes another deadlock discovered in synthetic testing of #9785.  This modifies the `milestone` table to have the `num_issues`, `num_closed_issues`, and `completeness` statistics be calculated asynchronously.

An optional `updateTimestamp` field was added to the stats queue to support the conditional updating of the milestone's modification date, retaining existing functionality.

## Checklist

The [contributor guide](https://forgejo.org/docs/next/contributor/) contains information that will be helpful to first time contributors. There also are a few [conditions for merging Pull Requests in Forgejo repositories](https://codeberg.org/forgejo/governance/src/branch/main/PullRequestsAgreement.md). You are also welcome to join the [Forgejo development chatroom](https://matrix.to/#/#forgejo-development:matrix.org).

### Tests

- I added test coverage for Go changes...
  - [x] in their respective `*_test.go` for unit tests.
  - [ ] in the `tests/integration` directory if it involves interactions with a live Forgejo server.
- I added test coverage for JavaScript changes...
  - [ ] in `web_src/js/*.test.js` if it can be unit tested.
  - [ ] in `tests/e2e/*.test.e2e.js` if it requires interactions with a live Forgejo server (see also the [developer guide for JavaScript testing](https://codeberg.org/forgejo/forgejo/src/branch/forgejo/tests/e2e/README.md#end-to-end-tests)).

### Documentation

- [ ] I created a pull request [to the documentation](https://codeberg.org/forgejo/docs) to explain to Forgejo users how to use this change.
- [x] I did not document these changes and I do not expect someone else to do it.

### Release notes

- [ ] I do not want this change to show in the release notes.
- [x] I want the title to show in the release notes with a link to this pull request.
- [ ] I want the content of the `release-notes/<pull request number>.md` to be be used for the release notes instead of the title.

Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/9916
Reviewed-by: Earl Warren <earl-warren@noreply.codeberg.org>
Co-authored-by: Mathieu Fenniak <mathieu@fenniak.net>
Co-committed-by: Mathieu Fenniak <mathieu@fenniak.net>
This commit is contained in:
Mathieu Fenniak 2025-10-31 15:53:45 +01:00 committed by Mathieu Fenniak
parent 0869e0e08a
commit 327cdc1787
14 changed files with 196 additions and 64 deletions

View file

@ -111,11 +111,11 @@ func doChangeIssueStatus(ctx context.Context, issue *Issue, doer *user_model.Use
// Update issue count of milestone
if issue.MilestoneID > 0 {
if issue.NoAutoTime {
if err := UpdateMilestoneCountersWithDate(ctx, issue.MilestoneID, issue.UpdatedUnix); err != nil {
if err := stats.QueueRecalcMilestoneByIDWithDate(issue.MilestoneID, issue.UpdatedUnix); err != nil {
return nil, err
}
} else {
if err := UpdateMilestoneCounters(ctx, issue.MilestoneID); err != nil {
if err := stats.QueueRecalcMilestoneByID(issue.MilestoneID); err != nil {
return nil, err
}
}
@ -353,7 +353,7 @@ func NewIssueWithIndex(ctx context.Context, doer *user_model.User, opts NewIssue
}
if opts.Issue.MilestoneID > 0 {
if err := UpdateMilestoneCounters(ctx, opts.Issue.MilestoneID); err != nil {
if err := stats.QueueRecalcMilestoneByID(opts.Issue.MilestoneID); err != nil {
return err
}

View file

@ -521,11 +521,11 @@ func init() {
stats.RegisterRecalc(stats.LabelByRepoID, doRecalcLabelByRepoID)
}
func doRecalcLabelByID(ctx context.Context, labelID int64) error {
func doRecalcLabelByID(ctx context.Context, labelID int64, _ optional.Option[timeutil.TimeStamp]) error {
return doRecalcLabel(ctx, builder.Eq{"id": labelID})
}
func doRecalcLabelByRepoID(ctx context.Context, repoID int64) error {
func doRecalcLabelByRepoID(ctx context.Context, repoID int64, _ optional.Option[timeutil.TimeStamp]) error {
return doRecalcLabel(ctx, builder.Eq{"repo_id": repoID})
}

View file

@ -8,6 +8,8 @@ import (
"forgejo.org/models/db"
"forgejo.org/models/unittest"
"forgejo.org/modules/optional"
"forgejo.org/modules/timeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -18,7 +20,7 @@ func TestRecalcLabelByLabelID(t *testing.T) {
// Verify no error on recalc of a deleted/non-existent object; important because async recalcs can be queued and
// then occur later after more state changes have happened.
err := doRecalcLabelByID(t.Context(), -1000)
err := doRecalcLabelByID(t.Context(), -1000, optional.None[timeutil.TimeStamp]())
require.NoError(t, err)
// Intentionally corrupt counts from fixture, then recalc them
@ -29,7 +31,7 @@ func TestRecalcLabelByLabelID(t *testing.T) {
Update(map[string]any{"num_issues": 1000, "num_closed_issues": 1001})
require.NoError(t, err)
require.EqualValues(t, 1, updated)
err = doRecalcLabelByID(t.Context(), label.ID)
err = doRecalcLabelByID(t.Context(), label.ID, optional.None[timeutil.TimeStamp]())
require.NoError(t, err)
label = unittest.AssertExistsAndLoadBean(t, &Label{ID: 1})
assert.Equal(t, 2, label.NumIssues)
@ -41,7 +43,7 @@ func TestRecalcLabelByRepoID(t *testing.T) {
// Verify no error on recalc of a deleted/non-existent object; important because async recalcs can be queued and
// then occur later after more state changes have happened.
err := doRecalcLabelByRepoID(t.Context(), -1000)
err := doRecalcLabelByRepoID(t.Context(), -1000, optional.None[timeutil.TimeStamp]())
require.NoError(t, err)
// Intentionally corrupt counts from fixture, then recalc them
@ -60,7 +62,7 @@ func TestRecalcLabelByRepoID(t *testing.T) {
Update(map[string]any{"num_issues": 1000, "num_closed_issues": 1001})
require.NoError(t, err)
require.EqualValues(t, 1, updated)
err = doRecalcLabelByRepoID(t.Context(), label1.RepoID)
err = doRecalcLabelByRepoID(t.Context(), label1.RepoID, optional.None[timeutil.TimeStamp]())
require.NoError(t, err)
label1 = unittest.AssertExistsAndLoadBean(t, &Label{ID: 1})
label2 = unittest.AssertExistsAndLoadBean(t, &Label{ID: 2})

View file

@ -15,6 +15,7 @@ import (
api "forgejo.org/modules/structs"
"forgejo.org/modules/timeutil"
"forgejo.org/modules/util"
"forgejo.org/services/stats"
"xorm.io/builder"
)
@ -193,42 +194,7 @@ func updateMilestone(ctx context.Context, m *Milestone) error {
if err != nil {
return err
}
return UpdateMilestoneCounters(ctx, m.ID)
}
func updateMilestoneCounters(ctx context.Context, id int64, noAutoTime bool, updatedUnix timeutil.TimeStamp) error {
e := db.GetEngine(ctx)
sess := e.ID(id).
SetExpr("num_issues", builder.Select("count(*)").From("issue").Where(
builder.Eq{"milestone_id": id},
)).
SetExpr("num_closed_issues", builder.Select("count(*)").From("issue").Where(
builder.Eq{
"milestone_id": id,
"is_closed": true,
},
))
if noAutoTime {
sess.SetExpr("updated_unix", updatedUnix).NoAutoTime()
}
_, err := sess.Update(&Milestone{})
if err != nil {
return err
}
_, err = e.Exec("UPDATE `milestone` SET completeness=100*num_closed_issues/(CASE WHEN num_issues > 0 THEN num_issues ELSE 1 END) WHERE id=?",
id,
)
return err
}
// UpdateMilestoneCounters calculates NumIssues, NumClosesIssues and Completeness
func UpdateMilestoneCounters(ctx context.Context, id int64) error {
return updateMilestoneCounters(ctx, id, false, 0)
}
// UpdateMilestoneCountersWithDate calculates NumIssues, NumClosesIssues and Completeness and set the UpdatedUnix date
func UpdateMilestoneCountersWithDate(ctx context.Context, id int64, updatedUnix timeutil.TimeStamp) error {
return updateMilestoneCounters(ctx, id, true, updatedUnix)
return stats.QueueRecalcMilestoneByID(m.ID)
}
// ChangeMilestoneStatusByRepoIDAndID changes a milestone open/closed status if the milestone ID is in the repo.
@ -384,3 +350,46 @@ func InsertMilestones(ctx context.Context, ms ...*Milestone) (err error) {
}
return committer.Commit()
}
func init() {
stats.RegisterRecalc(stats.MilestoneByMilestoneID, doRecalcMilestoneByID)
}
func doRecalcMilestoneByID(ctx context.Context, milestoneID int64, updateTimestamp optional.Option[timeutil.TimeStamp]) error {
return doRecalcMilestone(ctx, builder.Eq{"id": milestoneID}, updateTimestamp)
}
func doRecalcMilestone(ctx context.Context, cond builder.Cond, updateTimestamp optional.Option[timeutil.TimeStamp]) error {
return db.WithTx(ctx, func(ctx context.Context) error {
e := db.GetEngine(ctx)
sess := e.
SetExpr("num_issues",
builder.Select("count(*)").From("issue").
Where(builder.Eq{"milestone_id": builder.Expr("milestone.id")}),
).
SetExpr("num_closed_issues",
builder.Select("count(*)").
From("issue").
Where(builder.Eq{
"issue.milestone_id": builder.Expr("milestone.id"),
"issue.is_closed": true,
}),
).
Where(cond)
if updateTimestamp.Has() {
sess.SetExpr("updated_unix", updateTimestamp.Value()).NoAutoTime()
}
_, err := sess.Update(&Milestone{})
if err != nil {
return err
}
_, err = e.
SetExpr("completeness", "100*num_closed_issues/(CASE WHEN num_issues > 0 THEN num_issues ELSE 1 END)").
Where(cond).
NoAutoTime(). // don't change time from earlier UPDATE
Update(&Milestone{})
return err
})
}

View file

@ -0,0 +1,66 @@
// Copyright 2025 The Forgejo Authors. All rights reserved.
// SPDX-License-Identifier: GPL-3.0-or-later
package issues
import (
"testing"
"forgejo.org/models/db"
"forgejo.org/models/unittest"
"forgejo.org/modules/optional"
"forgejo.org/modules/timeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestRecalcMilestoneByMilestoneID(t *testing.T) {
require.NoError(t, unittest.PrepareTestDatabase())
// Verify no error on recalc of a deleted/non-existent object; important because async recalcs can be queued and
// then occur later after more state changes have happened.
err := doRecalcMilestoneByID(t.Context(), -1000, optional.None[timeutil.TimeStamp]())
require.NoError(t, err)
// Intentionally corrupt counts from fixture, then recalc them
milestone := unittest.AssertExistsAndLoadBean(t, &Milestone{ID: 1})
updated, err := db.GetEngine(t.Context()).
Table(&Milestone{}).
Where("id = ?", milestone.ID).
Update(map[string]any{
"num_issues": 1000,
"num_closed_issues": 1001,
"completeness": 99,
"updated_unix": 123,
})
require.NoError(t, err)
require.EqualValues(t, 1, updated)
err = doRecalcMilestoneByID(t.Context(), milestone.ID, optional.None[timeutil.TimeStamp]())
require.NoError(t, err)
milestone = unittest.AssertExistsAndLoadBean(t, &Milestone{ID: 1})
assert.Equal(t, 1, milestone.NumIssues)
assert.Equal(t, 0, milestone.NumClosedIssues)
assert.Equal(t, 0, milestone.Completeness)
assert.NotEqualValues(t, 123, milestone.UpdatedUnix)
// Exercise the updateTimestamp option to the recalc
updated, err = db.GetEngine(t.Context()).
Table(&Milestone{}).
Where("id = ?", milestone.ID).
Update(map[string]any{
"num_issues": 1000,
"num_closed_issues": 1001,
"completeness": 99,
"updated_unix": 123,
})
require.NoError(t, err)
require.EqualValues(t, 1, updated)
err = doRecalcMilestoneByID(t.Context(), milestone.ID, optional.Some(timeutil.TimeStamp(456)))
require.NoError(t, err)
milestone = unittest.AssertExistsAndLoadBean(t, &Milestone{ID: 1})
assert.Equal(t, 1, milestone.NumIssues)
assert.Equal(t, 0, milestone.NumClosedIssues)
assert.Equal(t, 0, milestone.Completeness)
assert.EqualValues(t, 456, milestone.UpdatedUnix)
}

View file

@ -15,6 +15,7 @@ import (
"forgejo.org/modules/setting"
api "forgejo.org/modules/structs"
"forgejo.org/modules/timeutil"
"forgejo.org/services/stats"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -340,14 +341,16 @@ func TestUpdateMilestoneCounters(t *testing.T) {
issue.ClosedUnix = timeutil.TimeStampNow()
_, err := db.GetEngine(db.DefaultContext).ID(issue.ID).Cols("is_closed", "closed_unix").Update(issue)
require.NoError(t, err)
require.NoError(t, issues_model.UpdateMilestoneCounters(db.DefaultContext, issue.MilestoneID))
err = stats.QueueRecalcMilestoneByID(issue.MilestoneID)
require.NoError(t, err)
unittest.CheckConsistencyFor(t, &issues_model.Milestone{})
issue.IsClosed = false
issue.ClosedUnix = 0
_, err = db.GetEngine(db.DefaultContext).ID(issue.ID).Cols("is_closed", "closed_unix").Update(issue)
require.NoError(t, err)
require.NoError(t, issues_model.UpdateMilestoneCounters(db.DefaultContext, issue.MilestoneID))
err = stats.QueueRecalcMilestoneByID(issue.MilestoneID)
require.NoError(t, err)
unittest.CheckConsistencyFor(t, &issues_model.Milestone{})
}

View file

@ -98,8 +98,7 @@ func milestoneStatsCorrectNumIssuesRepo(ctx context.Context, id int64) error {
}
for _, result := range results {
id, _ := strconv.ParseInt(string(result["id"]), 10, 64)
err = issues_model.UpdateMilestoneCounters(ctx, id)
if err != nil {
if err := stats.QueueRecalcMilestoneByID(id); err != nil {
return err
}
}
@ -195,7 +194,9 @@ func CheckRepoStats(ctx context.Context) error {
// Milestone.Num{,Closed}Issues
{
statsQuery(milestoneStatsQueryNumIssues, true),
issues_model.UpdateMilestoneCounters,
func(ctx context.Context, milestoneID int64) error {
return stats.QueueRecalcMilestoneByID(milestoneID)
},
"milestone count 'num_closed_issues' and 'num_issues'",
},
// User.NumRepos

View file

@ -23,6 +23,7 @@ import (
"forgejo.org/modules/storage"
"forgejo.org/modules/timeutil"
notify_service "forgejo.org/services/notify"
"forgejo.org/services/stats"
)
// NewIssue creates new issue with labels for repository.
@ -303,7 +304,7 @@ func deleteIssue(ctx context.Context, issue *issues_model.Issue) error {
}
}
if err := issues_model.UpdateMilestoneCounters(ctx, issue.MilestoneID); err != nil {
if err := stats.QueueRecalcMilestoneByID(issue.MilestoneID); err != nil {
return fmt.Errorf("error updating counters for milestone id %d: %w",
issue.MilestoneID, err)
}

View file

@ -12,6 +12,7 @@ import (
issues_model "forgejo.org/models/issues"
user_model "forgejo.org/models/user"
notify_service "forgejo.org/services/notify"
"forgejo.org/services/stats"
)
func updateMilestoneCounters(ctx context.Context, issue *issues_model.Issue, id int64) error {
@ -29,11 +30,11 @@ func updateMilestoneCounters(ctx context.Context, issue *issues_model.Issue, id
if issue.UpdatedUnix > updatedUnix {
updatedUnix = issue.UpdatedUnix
}
if err := issues_model.UpdateMilestoneCountersWithDate(ctx, id, updatedUnix); err != nil {
if err := stats.QueueRecalcMilestoneByIDWithDate(id, updatedUnix); err != nil {
return err
}
} else {
if err := issues_model.UpdateMilestoneCounters(ctx, id); err != nil {
if err := stats.QueueRecalcMilestoneByID(id); err != nil {
return err
}
}

View file

@ -0,0 +1,25 @@
// Copyright 2025 The Forgejo Authors. All rights reserved.
// SPDX-License-Identifier: GPL-3.0-or-later
package stats
import (
"forgejo.org/modules/optional"
"forgejo.org/modules/timeutil"
)
// Queue a recalculation of the stats on a `Milestone` for a given milestone by its ID
func QueueRecalcMilestoneByID(labelID int64) error {
return safePush(recalcRequest{
RecalcType: MilestoneByMilestoneID,
ObjectID: labelID,
})
}
func QueueRecalcMilestoneByIDWithDate(labelID int64, updateTimestamp timeutil.TimeStamp) error {
return safePush(recalcRequest{
RecalcType: MilestoneByMilestoneID,
ObjectID: labelID,
UpdateTimestamp: optional.Some(updateTimestamp),
})
}

View file

@ -8,20 +8,24 @@ import (
"fmt"
"strconv"
"strings"
"forgejo.org/modules/optional"
"forgejo.org/modules/timeutil"
)
type recalcRequest struct {
RecalcType RecalcType
ObjectID int64
RecalcType RecalcType
ObjectID int64
UpdateTimestamp optional.Option[timeutil.TimeStamp]
}
func (r *recalcRequest) string() string {
return fmt.Sprintf("recalcRequest:%d:%d", r.RecalcType, r.ObjectID)
return fmt.Sprintf("recalcRequest:%d:%d:%d", r.RecalcType, r.ObjectID, r.UpdateTimestamp.ValueOrDefault(0))
}
func recalcRequestFromString(s string) (*recalcRequest, error) {
tags := strings.Split(s, ":")
if len(tags) != 3 {
if len(tags) != 4 {
return nil, errors.New("expected three tags")
} else if tags[0] != "recalcRequest" {
return nil, fmt.Errorf("expected tag `recalcRequest`, but was %s", tags[0])
@ -34,8 +38,19 @@ func recalcRequestFromString(s string) (*recalcRequest, error) {
if err != nil {
return nil, fmt.Errorf("unable to parse object ID: %w", err)
}
timestamp, err := strconv.ParseInt(tags[3], 10, 64)
if err != nil {
return nil, fmt.Errorf("unable to parse timestamp ID: %w", err)
}
var updateTimestamp optional.Option[timeutil.TimeStamp]
if timestamp == 0 {
updateTimestamp = optional.None[timeutil.TimeStamp]()
} else {
updateTimestamp = optional.Some(timeutil.TimeStamp(timestamp))
}
return &recalcRequest{
RecalcType: RecalcType(recalcType),
ObjectID: objectID,
RecalcType: RecalcType(recalcType),
ObjectID: objectID,
UpdateTimestamp: updateTimestamp,
}, nil
}

View file

@ -43,7 +43,9 @@ import (
"forgejo.org/modules/graceful"
"forgejo.org/modules/log"
"forgejo.org/modules/optional"
"forgejo.org/modules/queue"
"forgejo.org/modules/timeutil"
)
type RecalcType int
@ -51,9 +53,10 @@ type RecalcType int
const (
LabelByLabelID RecalcType = iota
LabelByRepoID
MilestoneByMilestoneID
)
type RecalcHandler func(context.Context, int64) error
type RecalcHandler func(context.Context, int64, optional.Option[timeutil.TimeStamp]) error
var (
// string queue is used for consistent unique behaviour independent of json serialization
@ -99,7 +102,7 @@ func handler(items ...string) []string {
log.Error("Unrecognized RecalcType %d, ignoring", req.RecalcType)
continue
}
if err := handler(ctx, req.ObjectID); err != nil {
if err := handler(ctx, req.ObjectID, req.UpdateTimestamp); err != nil {
log.Error("Error in stats recalc %v on object %d: %v", req.RecalcType, req.ObjectID, err)
}
}

View file

@ -9,6 +9,9 @@ import (
"sync"
"testing"
"forgejo.org/modules/optional"
"forgejo.org/modules/timeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -16,7 +19,7 @@ import (
func TestQueueAndFlush(t *testing.T) {
var mu sync.Mutex
callValues := []int64{}
RegisterRecalc(-99, func(ctx context.Context, i int64) error {
RegisterRecalc(-99, func(ctx context.Context, i int64, _ optional.Option[timeutil.TimeStamp]) error {
mu.Lock()
defer mu.Unlock()
callValues = append(callValues, i)
@ -41,7 +44,7 @@ func TestQueueAndFlush(t *testing.T) {
func TestQueueUnique(t *testing.T) {
var mu sync.Mutex
callValues := []int64{}
RegisterRecalc(-100, func(ctx context.Context, i int64) error {
RegisterRecalc(-100, func(ctx context.Context, i int64, _ optional.Option[timeutil.TimeStamp]) error {
mu.Lock()
defer mu.Unlock()
callValues = append(callValues, i)
@ -72,7 +75,7 @@ func TestQueueUnique(t *testing.T) {
func TestQueueAndError(t *testing.T) {
var mu sync.Mutex
callValues := []int64{}
RegisterRecalc(-101, func(ctx context.Context, i int64) error {
RegisterRecalc(-101, func(ctx context.Context, i int64, _ optional.Option[timeutil.TimeStamp]) error {
mu.Lock()
defer mu.Unlock()
callValues = append(callValues, i)

View file

@ -412,6 +412,7 @@ func TestAPIEditIssueMilestoneAutoDate(t *testing.T) {
Milestone: &milestone,
}).AddTokenAuth(token)
MakeRequest(t, req, http.StatusCreated)
unittest.FlushAsyncCalcs(t)
// the execution of the API call supposedly lasted less than one minute
milestoneAfter := unittest.AssertExistsAndLoadBean(t, &issues_model.Milestone{ID: milestone})
@ -431,6 +432,7 @@ func TestAPIEditIssueMilestoneAutoDate(t *testing.T) {
Updated: &updatedAt,
}).AddTokenAuth(token)
MakeRequest(t, req, http.StatusCreated)
unittest.FlushAsyncCalcs(t)
// the milestone date should be set to 'updatedAt'
// dates are converted into the same tz, in order to compare them
@ -452,6 +454,7 @@ func TestAPIEditIssueMilestoneAutoDate(t *testing.T) {
Updated: &updatedAt,
}).AddTokenAuth(token)
MakeRequest(t, req, http.StatusCreated)
unittest.FlushAsyncCalcs(t)
// the milestone date should not change
// dates are converted into the same tz, in order to compare them