diff --git a/models/actions/run.go b/models/actions/run.go index d8a37d0b60..ad4785e15a 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -228,8 +228,10 @@ func RepoNumOpenActions(ctx context.Context, repoID int64) int { return num } -func clearRepoRunCountCache(repo *repo_model.Repository) { - cache.Remove(actionsCountOpenCacheKey(repo.ID)) +func clearRepoRunCountCache(ctx context.Context, repo *repo_model.Repository) { + db.AfterTx(ctx, func() { + cache.Remove(actionsCountOpenCacheKey(repo.ID)) + }) } func condRunsThatNeedApproval(repoID, pullRequestID int64) builder.Cond { @@ -309,7 +311,7 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork run.Repo = repo } - clearRepoRunCountCache(run.Repo) + clearRepoRunCountCache(ctx, run.Repo) runJobs := make([]*ActionRunJob, 0, len(jobs)) var hasWaiting bool @@ -473,7 +475,7 @@ func UpdateRunWithoutNotification(ctx context.Context, run *ActionRun, cols ...s } run.Repo = repo } - clearRepoRunCountCache(run.Repo) + clearRepoRunCountCache(ctx, run.Repo) } return nil diff --git a/models/actions/run_test.go b/models/actions/run_test.go index fdb5ec320d..08772c494e 100644 --- a/models/actions/run_test.go +++ b/models/actions/run_test.go @@ -51,19 +51,19 @@ func TestRepoNumOpenActions(t *testing.T) { t.Run("Repo 1", func(t *testing.T) { repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: 1}) - clearRepoRunCountCache(repo) + clearRepoRunCountCache(t.Context(), repo) assert.Equal(t, 0, RepoNumOpenActions(t.Context(), repo.ID)) }) t.Run("Repo 4", func(t *testing.T) { repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: 4}) - clearRepoRunCountCache(repo) + clearRepoRunCountCache(t.Context(), repo) assert.Equal(t, 0, RepoNumOpenActions(t.Context(), repo.ID)) }) t.Run("Repo 63", func(t *testing.T) { repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: 63}) - clearRepoRunCountCache(repo) + clearRepoRunCountCache(t.Context(), repo) assert.Equal(t, 1, RepoNumOpenActions(t.Context(), repo.ID)) }) @@ -78,7 +78,7 @@ func TestRepoNumOpenActions(t *testing.T) { assert.Equal(t, 1, RepoNumOpenActions(t.Context(), repo.ID)) // Now that we clear the cache, computation should be performed - clearRepoRunCountCache(repo) + clearRepoRunCountCache(t.Context(), repo) assert.Equal(t, 0, RepoNumOpenActions(t.Context(), repo.ID)) }) } diff --git a/models/db/context.go b/models/db/context.go index 34736ddbef..58693e0078 100644 --- a/models/db/context.go +++ b/models/db/context.go @@ -29,8 +29,9 @@ var ( // Context represents a db context type Context struct { context.Context - e Engine - transaction bool + e Engine + transaction bool + afterCommitHooks []func() } func newContext(ctx context.Context, e Engine, transaction bool) *Context { @@ -99,11 +100,19 @@ type Committer interface { // It can be closed early, but can't be committed early, it is useful for reusing a transaction. type halfCommitter struct { committer Committer + parentCtx context.Context + txCtx *Context committed bool } func (c *halfCommitter) Commit() error { c.committed = true + + // Pass hooks installed into txCtx up to parentCtx + for _, hook := range c.txCtx.afterCommitHooks { + AfterTx(c.parentCtx, hook) + } + // should do nothing, and the parent committer will commit later return nil } @@ -118,6 +127,27 @@ func (c *halfCommitter) Close() error { return c.committer.Close() } +// Wraps an xorm.Session with execution of AfterTx hooks +type hookCommitter struct { + sess *xorm.Session + txCtx *Context +} + +func (c *hookCommitter) Commit() error { + err := c.sess.Commit() + if err != nil { + return err + } + for _, hook := range c.txCtx.afterCommitHooks { + hook() + } + return nil +} + +func (c *hookCommitter) Close() error { + return c.sess.Close() +} + // TxContext represents a transaction Context, // it will reuse the existing transaction in the parent context or create a new one. // Some tips to use: @@ -132,7 +162,8 @@ func (c *halfCommitter) Close() error { // d. It doesn't mean rollback is forbidden, but always do it only when there is an error, and you do want to rollback. func TxContext(parentCtx context.Context) (*Context, Committer, error) { if sess, ok := inTransaction(parentCtx); ok { - return newContext(parentCtx, sess, true), &halfCommitter{committer: sess}, nil + txCtx := newContext(parentCtx, sess, true) + return txCtx, &halfCommitter{committer: sess, parentCtx: parentCtx, txCtx: txCtx}, nil } sess := x.NewSession() @@ -141,18 +172,24 @@ func TxContext(parentCtx context.Context) (*Context, Committer, error) { return nil, nil, err } - return newContext(parentCtx, sess, true), sess, nil + txCtx := newContext(parentCtx, sess, true) + return txCtx, &hookCommitter{sess, txCtx}, nil } // WithTx represents executing database operations on a transaction, if the transaction exist, // this function will reuse it otherwise will create a new one and close it when finished. func WithTx(parentCtx context.Context, f func(ctx context.Context) error) error { if sess, ok := inTransaction(parentCtx); ok { - err := f(newContext(parentCtx, sess, true)) + txCtx := newContext(parentCtx, sess, true) + err := f(txCtx) if err != nil { // rollback immediately, in case the caller ignores returned error and tries to commit the transaction. _ = sess.Close() } + // Pass hooks installed into txCtx up to parentCtx + for _, hook := range txCtx.afterCommitHooks { + AfterTx(parentCtx, hook) + } return err } return txWithNoCheck(parentCtx, f) @@ -165,11 +202,33 @@ func txWithNoCheck(parentCtx context.Context, f func(ctx context.Context) error) return err } - if err := f(newContext(parentCtx, sess, true)); err != nil { + txCtx := newContext(parentCtx, sess, true) + if err := f(txCtx); err != nil { return err } - return sess.Commit() + if err := sess.Commit(); err != nil { + return err + } + + for _, hook := range txCtx.afterCommitHooks { + hook() + } + + return nil +} + +// AfterTx registers a function to be called after the current transaction commits. If not in a transaction, the +// function is called immediately. The hook will only be called if the transaction commits successfully; if the +// transaction rolls back, the hook is discarded. +func AfterTx(ctx context.Context, hook func()) { + dbCtx, ok := ctx.(*Context) + if !ok || !dbCtx.transaction { + // Not in a db transaction context, run immediately + hook() + return + } + dbCtx.afterCommitHooks = append(dbCtx.afterCommitHooks, hook) } // Insert inserts records into database diff --git a/models/db/context_committer_test.go b/models/db/context_committer_test.go index 849c5dea41..a516937673 100644 --- a/models/db/context_committer_test.go +++ b/models/db/context_committer_test.go @@ -57,7 +57,7 @@ func Test_halfCommitter(t *testing.T) { */ testWithCommitter := func(committer Committer, f func(committer Committer) error) { - if err := f(&halfCommitter{committer: committer}); err == nil { + if err := f(&halfCommitter{committer: committer, txCtx: &Context{}}); err == nil { committer.Commit() } committer.Close() diff --git a/models/db/context_test.go b/models/db/context_test.go index d12d79ebe1..525ab54d99 100644 --- a/models/db/context_test.go +++ b/models/db/context_test.go @@ -5,9 +5,12 @@ package db_test import ( "context" + "errors" + "fmt" "testing" "forgejo.org/models/db" + issues_model "forgejo.org/models/issues" "forgejo.org/models/unittest" "github.com/stretchr/testify/assert" @@ -97,3 +100,123 @@ func TestTxContext(t *testing.T) { })) }) } + +func TestAfterTx(t *testing.T) { + tests := []struct { + executionMode string + rollback bool + }{ + { + executionMode: "NoTx", + }, + { + executionMode: "WithTx", + }, + { + executionMode: "WithTxNested", + }, + { + executionMode: "WithTx", + rollback: true, + }, + { + executionMode: "WithTxNested", + rollback: true, + }, + { + executionMode: "TxContext", + }, + { + executionMode: "TxContextNested", + }, + { + executionMode: "TxContext", + rollback: true, + }, + { + executionMode: "TxContextNested", + rollback: true, + }, + } + + for _, tc := range tests { + t.Run(fmt.Sprintf("%s/%v", tc.executionMode, tc.rollback), func(t *testing.T) { + require.NoError(t, unittest.PrepareTestDatabase()) + ctx := t.Context() + + var err error + var countBefore, countAfter, hookCount int64 + + countBefore, err = db.GetEngine(ctx).Count(&issues_model.PullRequest{}) + require.NoError(t, err) + + sut := func(ctx context.Context) { + _, err = db.GetEngine(ctx).Insert( + &issues_model.PullRequest{IssueID: 2, BaseRepoID: 1, HeadRepoID: 1000}) + require.NoError(t, err) + db.AfterTx(ctx, func() { + countAfter, err = db.GetEngine(ctx).Count(&issues_model.PullRequest{}) + require.NoError(t, err) + assert.False(t, db.InTransaction(ctx)) + hookCount++ + }) + } + + switch tc.executionMode { + case "NoTx": + sut(ctx) + case "WithTx": + db.WithTx(ctx, func(ctx context.Context) error { + sut(ctx) + if tc.rollback { + return errors.New("rollback") + } + return nil + }) + case "WithTxNested": + db.WithTx(ctx, func(ctx context.Context) error { + return db.WithTx(ctx, func(ctx context.Context) error { + sut(ctx) + if tc.rollback { + return errors.New("rollback") + } + return nil + }) + }) + case "TxContext": + txCtx, committer, err := db.TxContext(ctx) + require.NoError(t, err) + sut(txCtx) + if !tc.rollback { + err = committer.Commit() + require.NoError(t, err) + } + committer.Close() + case "TxContextNested": + txCtx1, committer1, err := db.TxContext(ctx) + require.NoError(t, err) + txCtx2, committer2, err := db.TxContext(txCtx1) + require.NoError(t, err) + sut(txCtx2) + err = committer2.Commit() + require.NoError(t, err) + committer2.Close() + if !tc.rollback { + err = committer1.Commit() + require.NoError(t, err) + } + committer1.Close() + default: + t.Fatalf("unexpected execution mode: %q", tc.executionMode) + } + + if tc.rollback { + assert.EqualValues(t, 0, hookCount) + assert.EqualValues(t, 0, countAfter) + } else { + assert.EqualValues(t, 1, hookCount) + assert.Equal(t, countBefore+1, countAfter) + } + }) + } +} diff --git a/models/issues/issue_label.go b/models/issues/issue_label.go index aaa2e2e7c2..878cbf14c4 100644 --- a/models/issues/issue_label.go +++ b/models/issues/issue_label.go @@ -57,7 +57,9 @@ func newIssueLabel(ctx context.Context, issue *Issue, label *Label, doer *user_m issue.Labels = append(issue.Labels, label) - return stats.QueueRecalcLabelByID(label.ID) + stats.QueueRecalcLabelByID(ctx, label.ID) + + return nil } // Remove all issue labels in the given exclusive scope @@ -192,7 +194,9 @@ func deleteIssueLabel(ctx context.Context, issue *Issue, label *Label, doer *use return err } - return stats.QueueRecalcLabelByID(label.ID) + stats.QueueRecalcLabelByID(ctx, label.ID) + + return nil } // DeleteIssueLabel deletes issue-label relation. diff --git a/models/issues/issue_update.go b/models/issues/issue_update.go index 2a6e7f8f65..84a8820cd1 100644 --- a/models/issues/issue_update.go +++ b/models/issues/issue_update.go @@ -103,21 +103,15 @@ func doChangeIssueStatus(ctx context.Context, issue *Issue, doer *user_model.Use return nil, err } for _, label := range issue.Labels { - if err := stats.QueueRecalcLabelByID(label.ID); err != nil { - return nil, err - } + stats.QueueRecalcLabelByID(ctx, label.ID) } // Update issue count of milestone if issue.MilestoneID > 0 { if issue.NoAutoTime { - if err := stats.QueueRecalcMilestoneByIDWithDate(issue.MilestoneID, issue.UpdatedUnix); err != nil { - return nil, err - } + stats.QueueRecalcMilestoneByIDWithDate(ctx, issue.MilestoneID, issue.UpdatedUnix) } else { - if err := stats.QueueRecalcMilestoneByID(issue.MilestoneID); err != nil { - return nil, err - } + stats.QueueRecalcMilestoneByID(ctx, issue.MilestoneID) } } @@ -353,9 +347,7 @@ func NewIssueWithIndex(ctx context.Context, doer *user_model.User, opts NewIssue } if opts.Issue.MilestoneID > 0 { - if err := stats.QueueRecalcMilestoneByID(opts.Issue.MilestoneID); err != nil { - return err - } + stats.QueueRecalcMilestoneByID(ctx, opts.Issue.MilestoneID) opts := &CreateCommentOptions{ Type: CommentTypeMilestone, diff --git a/models/issues/label.go b/models/issues/label.go index 4eefae0d5d..4a5c3b4bfe 100644 --- a/models/issues/label.go +++ b/models/issues/label.go @@ -246,7 +246,9 @@ func UpdateLabel(ctx context.Context, l *Label) error { return err } - return stats.QueueRecalcLabelByID(l.ID) + stats.QueueRecalcLabelByID(ctx, l.ID) + + return nil } // DeleteLabel delete a label diff --git a/models/issues/milestone.go b/models/issues/milestone.go index cd3fcb3147..d718decb18 100644 --- a/models/issues/milestone.go +++ b/models/issues/milestone.go @@ -194,7 +194,8 @@ func updateMilestone(ctx context.Context, m *Milestone) error { if err != nil { return err } - return stats.QueueRecalcMilestoneByID(m.ID) + stats.QueueRecalcMilestoneByID(ctx, m.ID) + return nil } // ChangeMilestoneStatusByRepoIDAndID changes a milestone open/closed status if the milestone ID is in the repo. diff --git a/models/issues/milestone_test.go b/models/issues/milestone_test.go index c1f945658f..e486c55b24 100644 --- a/models/issues/milestone_test.go +++ b/models/issues/milestone_test.go @@ -341,16 +341,14 @@ func TestUpdateMilestoneCounters(t *testing.T) { issue.ClosedUnix = timeutil.TimeStampNow() _, err := db.GetEngine(db.DefaultContext).ID(issue.ID).Cols("is_closed", "closed_unix").Update(issue) require.NoError(t, err) - err = stats.QueueRecalcMilestoneByID(issue.MilestoneID) - require.NoError(t, err) + stats.QueueRecalcMilestoneByID(t.Context(), issue.MilestoneID) unittest.CheckConsistencyFor(t, &issues_model.Milestone{}) issue.IsClosed = false issue.ClosedUnix = 0 _, err = db.GetEngine(db.DefaultContext).ID(issue.ID).Cols("is_closed", "closed_unix").Update(issue) require.NoError(t, err) - err = stats.QueueRecalcMilestoneByID(issue.MilestoneID) - require.NoError(t, err) + stats.QueueRecalcMilestoneByID(t.Context(), issue.MilestoneID) unittest.CheckConsistencyFor(t, &issues_model.Milestone{}) } diff --git a/models/repo.go b/models/repo.go index 9a1b318a36..1b9cc8fa60 100644 --- a/models/repo.go +++ b/models/repo.go @@ -81,11 +81,13 @@ func labelStatsCorrectNumIssuesRepo(ctx context.Context, id int64) error { } func labelStatsCorrectNumClosedIssues(ctx context.Context, id int64) error { - return stats.QueueRecalcLabelByID(id) + stats.QueueRecalcLabelByID(ctx, id) + return nil } func labelStatsCorrectNumClosedIssuesRepo(ctx context.Context, id int64) error { - return stats.QueueRecalcLabelByRepoID(id) + stats.QueueRecalcLabelByRepoID(ctx, id) + return nil } var milestoneStatsQueryNumIssues = "SELECT `milestone`.id FROM `milestone` WHERE `milestone`.num_closed_issues!=(SELECT COUNT(*) FROM `issue` WHERE `issue`.milestone_id=`milestone`.id AND `issue`.is_closed=?) OR `milestone`.num_issues!=(SELECT COUNT(*) FROM `issue` WHERE `issue`.milestone_id=`milestone`.id)" @@ -98,9 +100,7 @@ func milestoneStatsCorrectNumIssuesRepo(ctx context.Context, id int64) error { } for _, result := range results { id, _ := strconv.ParseInt(string(result["id"]), 10, 64) - if err := stats.QueueRecalcMilestoneByID(id); err != nil { - return err - } + stats.QueueRecalcMilestoneByID(ctx, id) } return nil } @@ -171,7 +171,8 @@ func CheckRepoStats(ctx context.Context) error { { statsQuery(milestoneStatsQueryNumIssues, true), func(ctx context.Context, milestoneID int64) error { - return stats.QueueRecalcMilestoneByID(milestoneID) + stats.QueueRecalcMilestoneByID(ctx, milestoneID) + return nil }, "milestone count 'num_closed_issues' and 'num_issues'", }, diff --git a/models/repo/repo.go b/models/repo/repo.go index fd230f0f89..a8432c0f90 100644 --- a/models/repo/repo.go +++ b/models/repo/repo.go @@ -996,6 +996,8 @@ func UpdateRepoIssueNumbers(ctx context.Context, repoID int64, isPull, isClosed cacheKeyBase = countIssues } } - cache.Remove(repoCacheKey(cacheKeyBase, repoID)) + db.AfterTx(ctx, func() { + cache.Remove(repoCacheKey(cacheKeyBase, repoID)) + }) return nil } diff --git a/services/issue/issue.go b/services/issue/issue.go index 72463e8179..d64c53cd77 100644 --- a/services/issue/issue.go +++ b/services/issue/issue.go @@ -304,10 +304,7 @@ func deleteIssue(ctx context.Context, issue *issues_model.Issue) error { } } - if err := stats.QueueRecalcMilestoneByID(issue.MilestoneID); err != nil { - return fmt.Errorf("error updating counters for milestone id %d: %w", - issue.MilestoneID, err) - } + stats.QueueRecalcMilestoneByID(ctx, issue.MilestoneID) if err := activities_model.DeleteIssueActions(ctx, issue.RepoID, issue.ID, issue.Index); err != nil { return err diff --git a/services/issue/milestone.go b/services/issue/milestone.go index ca83fe9c76..928979d74e 100644 --- a/services/issue/milestone.go +++ b/services/issue/milestone.go @@ -30,13 +30,9 @@ func updateMilestoneCounters(ctx context.Context, issue *issues_model.Issue, id if issue.UpdatedUnix > updatedUnix { updatedUnix = issue.UpdatedUnix } - if err := stats.QueueRecalcMilestoneByIDWithDate(id, updatedUnix); err != nil { - return err - } + stats.QueueRecalcMilestoneByIDWithDate(ctx, id, updatedUnix) } else { - if err := stats.QueueRecalcMilestoneByID(id); err != nil { - return err - } + stats.QueueRecalcMilestoneByID(ctx, id) } return nil } diff --git a/services/stats/label.go b/services/stats/label.go index 33f9dc0cd2..3856133e1d 100644 --- a/services/stats/label.go +++ b/services/stats/label.go @@ -3,17 +3,19 @@ package stats +import "context" + // Queue a recalculation of the stats on a `Label` for a given label by its ID -func QueueRecalcLabelByID(labelID int64) error { - return safePush(recalcRequest{ +func QueueRecalcLabelByID(ctx context.Context, labelID int64) { + safePush(ctx, recalcRequest{ RecalcType: LabelByLabelID, ObjectID: labelID, }) } // Queue a recalculation of the stats on all `Label` in a given repository -func QueueRecalcLabelByRepoID(repoID int64) error { - return safePush(recalcRequest{ +func QueueRecalcLabelByRepoID(ctx context.Context, repoID int64) { + safePush(ctx, recalcRequest{ RecalcType: LabelByRepoID, ObjectID: repoID, }) diff --git a/services/stats/milestone.go b/services/stats/milestone.go index af7537cfd3..9b10ab4194 100644 --- a/services/stats/milestone.go +++ b/services/stats/milestone.go @@ -4,20 +4,22 @@ package stats import ( + "context" + "forgejo.org/modules/optional" "forgejo.org/modules/timeutil" ) // Queue a recalculation of the stats on a `Milestone` for a given milestone by its ID -func QueueRecalcMilestoneByID(labelID int64) error { - return safePush(recalcRequest{ +func QueueRecalcMilestoneByID(ctx context.Context, labelID int64) { + safePush(ctx, recalcRequest{ RecalcType: MilestoneByMilestoneID, ObjectID: labelID, }) } -func QueueRecalcMilestoneByIDWithDate(labelID int64, updateTimestamp timeutil.TimeStamp) error { - return safePush(recalcRequest{ +func QueueRecalcMilestoneByIDWithDate(ctx context.Context, labelID int64, updateTimestamp timeutil.TimeStamp) { + safePush(ctx, recalcRequest{ RecalcType: MilestoneByMilestoneID, ObjectID: labelID, UpdateTimestamp: optional.Some(updateTimestamp), diff --git a/services/stats/stats.go b/services/stats/stats.go index 7699013b37..20e2201694 100644 --- a/services/stats/stats.go +++ b/services/stats/stats.go @@ -41,6 +41,7 @@ import ( "errors" "time" + "forgejo.org/models/db" "forgejo.org/modules/graceful" "forgejo.org/modules/log" "forgejo.org/modules/optional" @@ -109,12 +110,13 @@ func handler(items ...string) []string { return nil } -func safePush(recalc recalcRequest) error { - err := statsQueue.Push(recalc.string()) - if err != nil && !errors.Is(err, queue.ErrAlreadyInQueue) { - return err - } - return nil +func safePush(ctx context.Context, recalc recalcRequest) { + db.AfterTx(ctx, func() { + err := statsQueue.Push(recalc.string()) + if err != nil && !errors.Is(err, queue.ErrAlreadyInQueue) { + log.Error("error during stat queue push: %v", err) + } + }) } // Only use for testing; do not use in production code diff --git a/services/stats/stats_test.go b/services/stats/stats_test.go index d070d2145f..46faf2e461 100644 --- a/services/stats/stats_test.go +++ b/services/stats/stats_test.go @@ -9,11 +9,13 @@ import ( "sync" "testing" + "forgejo.org/models/db" "forgejo.org/modules/optional" "forgejo.org/modules/timeutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "xorm.io/xorm" ) func TestQueueAndFlush(t *testing.T) { @@ -26,11 +28,10 @@ func TestQueueAndFlush(t *testing.T) { return nil }) - err := safePush(recalcRequest{ + safePush(t.Context(), recalcRequest{ RecalcType: -99, ObjectID: 1, }) - require.NoError(t, err) require.NoError(t, Flush(t.Context())) func() { @@ -56,11 +57,10 @@ func TestQueueUnique(t *testing.T) { // happen. So we'll test this by queuing a large number and ensuring that recalcs occured less -- usually much // less, like once or twice. for range 300 { - err := safePush(recalcRequest{ + safePush(t.Context(), recalcRequest{ RecalcType: -100, ObjectID: 1, }) - require.NoError(t, err) } require.NoError(t, Flush(t.Context())) @@ -82,11 +82,10 @@ func TestQueueAndError(t *testing.T) { return errors.New("don't like that value") }) - err := safePush(recalcRequest{ + safePush(t.Context(), recalcRequest{ RecalcType: -101, ObjectID: 1, }) - require.NoError(t, err) for range 3 { // ensure object isn't requeued by flushing multiple times require.NoError(t, Flush(t.Context())) @@ -98,3 +97,38 @@ func TestQueueAndError(t *testing.T) { assert.EqualValues(t, 1, callValues[0]) }() } + +func TestQueueAfterTx(t *testing.T) { + // This is a really micro version of unittest.PrepareTestDatabase -- as the unittest package references the stats + // package (for access to `Flush`), we can't use it without causing a circular dependency. But we need a DB in + // order to create a Tx. + x, err := xorm.NewEngine("sqlite3", "file::memory:?cache=shared&_txlock=immediate") + require.NoError(t, err) + db.SetDefaultEngine(context.Background(), x) + + var mu sync.Mutex + callValues := []int64{} + RegisterRecalc(-102, func(ctx context.Context, i int64, _ optional.Option[timeutil.TimeStamp]) error { + mu.Lock() + defer mu.Unlock() + callValues = append(callValues, i) + return nil + }) + + err = db.WithTx(t.Context(), func(ctx context.Context) error { + safePush(ctx, recalcRequest{ + RecalcType: -102, + ObjectID: 1, + }) + + require.NoError(t, Flush(t.Context())) + // Value from safePush() won't be sent yet because it was from within a DB transaction. + assert.Empty(t, callValues) + + return nil + }) + require.NoError(t, err) + + require.NoError(t, Flush(t.Context())) + assert.Len(t, callValues, 1) +}