fix: possible cause of invalid issue counts; cache invalidation occurs before a active transaction is committed (#10130)

Although #9922 was deployed to Codeberg, it was reported on Matrix that a user observed a `-1` pull request count.

@Gusted checked and verified that the stats stored in redis appeared incorrect, and that no errors occurred on Codeberg that included the repo ID (eg. deadlocks, SQL queries).
```
127.0.0.1:6379> GET Repo:CountPulls:924266
"1"
127.0.0.1:6379> GET Repo:CountPullsClosed:924266
"2"
```

One possible cause is that when `UpdateRepoIssueNumbers` is invoked and invalidates the cache key for the repository, it is currently in a transaction; the next request for that cached count could be computed before the transaction is committed and the update is visible.  It's been verified that `UpdateRepoIssueNumbers` is called within a transaction in most interactions (I put a panic in it if `db.InTransaction(ctx)`, and most related tests failed).

This PR fixes that hole by performing the cache invalidation in an `AfterTx()` hook which is invoked after the transaction is committed to the database.

(Another possible cause is documented in #10127)

Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/10130
Reviewed-by: Gusted <gusted@noreply.codeberg.org>
Co-authored-by: Mathieu Fenniak <mathieu@fenniak.net>
Co-committed-by: Mathieu Fenniak <mathieu@fenniak.net>
This commit is contained in:
Mathieu Fenniak 2025-11-17 01:07:29 +01:00 committed by Gusted
parent 20f8572b92
commit a9452d11d0
18 changed files with 290 additions and 73 deletions

View file

@ -228,8 +228,10 @@ func RepoNumOpenActions(ctx context.Context, repoID int64) int {
return num
}
func clearRepoRunCountCache(repo *repo_model.Repository) {
func clearRepoRunCountCache(ctx context.Context, repo *repo_model.Repository) {
db.AfterTx(ctx, func() {
cache.Remove(actionsCountOpenCacheKey(repo.ID))
})
}
func condRunsThatNeedApproval(repoID, pullRequestID int64) builder.Cond {
@ -309,7 +311,7 @@ func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWork
run.Repo = repo
}
clearRepoRunCountCache(run.Repo)
clearRepoRunCountCache(ctx, run.Repo)
runJobs := make([]*ActionRunJob, 0, len(jobs))
var hasWaiting bool
@ -473,7 +475,7 @@ func UpdateRunWithoutNotification(ctx context.Context, run *ActionRun, cols ...s
}
run.Repo = repo
}
clearRepoRunCountCache(run.Repo)
clearRepoRunCountCache(ctx, run.Repo)
}
return nil

View file

@ -51,19 +51,19 @@ func TestRepoNumOpenActions(t *testing.T) {
t.Run("Repo 1", func(t *testing.T) {
repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: 1})
clearRepoRunCountCache(repo)
clearRepoRunCountCache(t.Context(), repo)
assert.Equal(t, 0, RepoNumOpenActions(t.Context(), repo.ID))
})
t.Run("Repo 4", func(t *testing.T) {
repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: 4})
clearRepoRunCountCache(repo)
clearRepoRunCountCache(t.Context(), repo)
assert.Equal(t, 0, RepoNumOpenActions(t.Context(), repo.ID))
})
t.Run("Repo 63", func(t *testing.T) {
repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: 63})
clearRepoRunCountCache(repo)
clearRepoRunCountCache(t.Context(), repo)
assert.Equal(t, 1, RepoNumOpenActions(t.Context(), repo.ID))
})
@ -78,7 +78,7 @@ func TestRepoNumOpenActions(t *testing.T) {
assert.Equal(t, 1, RepoNumOpenActions(t.Context(), repo.ID))
// Now that we clear the cache, computation should be performed
clearRepoRunCountCache(repo)
clearRepoRunCountCache(t.Context(), repo)
assert.Equal(t, 0, RepoNumOpenActions(t.Context(), repo.ID))
})
}

View file

@ -31,6 +31,7 @@ type Context struct {
context.Context
e Engine
transaction bool
afterCommitHooks []func()
}
func newContext(ctx context.Context, e Engine, transaction bool) *Context {
@ -99,11 +100,19 @@ type Committer interface {
// It can be closed early, but can't be committed early, it is useful for reusing a transaction.
type halfCommitter struct {
committer Committer
parentCtx context.Context
txCtx *Context
committed bool
}
func (c *halfCommitter) Commit() error {
c.committed = true
// Pass hooks installed into txCtx up to parentCtx
for _, hook := range c.txCtx.afterCommitHooks {
AfterTx(c.parentCtx, hook)
}
// should do nothing, and the parent committer will commit later
return nil
}
@ -118,6 +127,27 @@ func (c *halfCommitter) Close() error {
return c.committer.Close()
}
// Wraps an xorm.Session with execution of AfterTx hooks
type hookCommitter struct {
sess *xorm.Session
txCtx *Context
}
func (c *hookCommitter) Commit() error {
err := c.sess.Commit()
if err != nil {
return err
}
for _, hook := range c.txCtx.afterCommitHooks {
hook()
}
return nil
}
func (c *hookCommitter) Close() error {
return c.sess.Close()
}
// TxContext represents a transaction Context,
// it will reuse the existing transaction in the parent context or create a new one.
// Some tips to use:
@ -132,7 +162,8 @@ func (c *halfCommitter) Close() error {
// d. It doesn't mean rollback is forbidden, but always do it only when there is an error, and you do want to rollback.
func TxContext(parentCtx context.Context) (*Context, Committer, error) {
if sess, ok := inTransaction(parentCtx); ok {
return newContext(parentCtx, sess, true), &halfCommitter{committer: sess}, nil
txCtx := newContext(parentCtx, sess, true)
return txCtx, &halfCommitter{committer: sess, parentCtx: parentCtx, txCtx: txCtx}, nil
}
sess := x.NewSession()
@ -141,18 +172,24 @@ func TxContext(parentCtx context.Context) (*Context, Committer, error) {
return nil, nil, err
}
return newContext(parentCtx, sess, true), sess, nil
txCtx := newContext(parentCtx, sess, true)
return txCtx, &hookCommitter{sess, txCtx}, nil
}
// WithTx represents executing database operations on a transaction, if the transaction exist,
// this function will reuse it otherwise will create a new one and close it when finished.
func WithTx(parentCtx context.Context, f func(ctx context.Context) error) error {
if sess, ok := inTransaction(parentCtx); ok {
err := f(newContext(parentCtx, sess, true))
txCtx := newContext(parentCtx, sess, true)
err := f(txCtx)
if err != nil {
// rollback immediately, in case the caller ignores returned error and tries to commit the transaction.
_ = sess.Close()
}
// Pass hooks installed into txCtx up to parentCtx
for _, hook := range txCtx.afterCommitHooks {
AfterTx(parentCtx, hook)
}
return err
}
return txWithNoCheck(parentCtx, f)
@ -165,11 +202,33 @@ func txWithNoCheck(parentCtx context.Context, f func(ctx context.Context) error)
return err
}
if err := f(newContext(parentCtx, sess, true)); err != nil {
txCtx := newContext(parentCtx, sess, true)
if err := f(txCtx); err != nil {
return err
}
return sess.Commit()
if err := sess.Commit(); err != nil {
return err
}
for _, hook := range txCtx.afterCommitHooks {
hook()
}
return nil
}
// AfterTx registers a function to be called after the current transaction commits. If not in a transaction, the
// function is called immediately. The hook will only be called if the transaction commits successfully; if the
// transaction rolls back, the hook is discarded.
func AfterTx(ctx context.Context, hook func()) {
dbCtx, ok := ctx.(*Context)
if !ok || !dbCtx.transaction {
// Not in a db transaction context, run immediately
hook()
return
}
dbCtx.afterCommitHooks = append(dbCtx.afterCommitHooks, hook)
}
// Insert inserts records into database

View file

@ -57,7 +57,7 @@ func Test_halfCommitter(t *testing.T) {
*/
testWithCommitter := func(committer Committer, f func(committer Committer) error) {
if err := f(&halfCommitter{committer: committer}); err == nil {
if err := f(&halfCommitter{committer: committer, txCtx: &Context{}}); err == nil {
committer.Commit()
}
committer.Close()

View file

@ -5,9 +5,12 @@ package db_test
import (
"context"
"errors"
"fmt"
"testing"
"forgejo.org/models/db"
issues_model "forgejo.org/models/issues"
"forgejo.org/models/unittest"
"github.com/stretchr/testify/assert"
@ -97,3 +100,123 @@ func TestTxContext(t *testing.T) {
}))
})
}
func TestAfterTx(t *testing.T) {
tests := []struct {
executionMode string
rollback bool
}{
{
executionMode: "NoTx",
},
{
executionMode: "WithTx",
},
{
executionMode: "WithTxNested",
},
{
executionMode: "WithTx",
rollback: true,
},
{
executionMode: "WithTxNested",
rollback: true,
},
{
executionMode: "TxContext",
},
{
executionMode: "TxContextNested",
},
{
executionMode: "TxContext",
rollback: true,
},
{
executionMode: "TxContextNested",
rollback: true,
},
}
for _, tc := range tests {
t.Run(fmt.Sprintf("%s/%v", tc.executionMode, tc.rollback), func(t *testing.T) {
require.NoError(t, unittest.PrepareTestDatabase())
ctx := t.Context()
var err error
var countBefore, countAfter, hookCount int64
countBefore, err = db.GetEngine(ctx).Count(&issues_model.PullRequest{})
require.NoError(t, err)
sut := func(ctx context.Context) {
_, err = db.GetEngine(ctx).Insert(
&issues_model.PullRequest{IssueID: 2, BaseRepoID: 1, HeadRepoID: 1000})
require.NoError(t, err)
db.AfterTx(ctx, func() {
countAfter, err = db.GetEngine(ctx).Count(&issues_model.PullRequest{})
require.NoError(t, err)
assert.False(t, db.InTransaction(ctx))
hookCount++
})
}
switch tc.executionMode {
case "NoTx":
sut(ctx)
case "WithTx":
db.WithTx(ctx, func(ctx context.Context) error {
sut(ctx)
if tc.rollback {
return errors.New("rollback")
}
return nil
})
case "WithTxNested":
db.WithTx(ctx, func(ctx context.Context) error {
return db.WithTx(ctx, func(ctx context.Context) error {
sut(ctx)
if tc.rollback {
return errors.New("rollback")
}
return nil
})
})
case "TxContext":
txCtx, committer, err := db.TxContext(ctx)
require.NoError(t, err)
sut(txCtx)
if !tc.rollback {
err = committer.Commit()
require.NoError(t, err)
}
committer.Close()
case "TxContextNested":
txCtx1, committer1, err := db.TxContext(ctx)
require.NoError(t, err)
txCtx2, committer2, err := db.TxContext(txCtx1)
require.NoError(t, err)
sut(txCtx2)
err = committer2.Commit()
require.NoError(t, err)
committer2.Close()
if !tc.rollback {
err = committer1.Commit()
require.NoError(t, err)
}
committer1.Close()
default:
t.Fatalf("unexpected execution mode: %q", tc.executionMode)
}
if tc.rollback {
assert.EqualValues(t, 0, hookCount)
assert.EqualValues(t, 0, countAfter)
} else {
assert.EqualValues(t, 1, hookCount)
assert.Equal(t, countBefore+1, countAfter)
}
})
}
}

View file

@ -57,7 +57,9 @@ func newIssueLabel(ctx context.Context, issue *Issue, label *Label, doer *user_m
issue.Labels = append(issue.Labels, label)
return stats.QueueRecalcLabelByID(label.ID)
stats.QueueRecalcLabelByID(ctx, label.ID)
return nil
}
// Remove all issue labels in the given exclusive scope
@ -192,7 +194,9 @@ func deleteIssueLabel(ctx context.Context, issue *Issue, label *Label, doer *use
return err
}
return stats.QueueRecalcLabelByID(label.ID)
stats.QueueRecalcLabelByID(ctx, label.ID)
return nil
}
// DeleteIssueLabel deletes issue-label relation.

View file

@ -103,21 +103,15 @@ func doChangeIssueStatus(ctx context.Context, issue *Issue, doer *user_model.Use
return nil, err
}
for _, label := range issue.Labels {
if err := stats.QueueRecalcLabelByID(label.ID); err != nil {
return nil, err
}
stats.QueueRecalcLabelByID(ctx, label.ID)
}
// Update issue count of milestone
if issue.MilestoneID > 0 {
if issue.NoAutoTime {
if err := stats.QueueRecalcMilestoneByIDWithDate(issue.MilestoneID, issue.UpdatedUnix); err != nil {
return nil, err
}
stats.QueueRecalcMilestoneByIDWithDate(ctx, issue.MilestoneID, issue.UpdatedUnix)
} else {
if err := stats.QueueRecalcMilestoneByID(issue.MilestoneID); err != nil {
return nil, err
}
stats.QueueRecalcMilestoneByID(ctx, issue.MilestoneID)
}
}
@ -353,9 +347,7 @@ func NewIssueWithIndex(ctx context.Context, doer *user_model.User, opts NewIssue
}
if opts.Issue.MilestoneID > 0 {
if err := stats.QueueRecalcMilestoneByID(opts.Issue.MilestoneID); err != nil {
return err
}
stats.QueueRecalcMilestoneByID(ctx, opts.Issue.MilestoneID)
opts := &CreateCommentOptions{
Type: CommentTypeMilestone,

View file

@ -246,7 +246,9 @@ func UpdateLabel(ctx context.Context, l *Label) error {
return err
}
return stats.QueueRecalcLabelByID(l.ID)
stats.QueueRecalcLabelByID(ctx, l.ID)
return nil
}
// DeleteLabel delete a label

View file

@ -194,7 +194,8 @@ func updateMilestone(ctx context.Context, m *Milestone) error {
if err != nil {
return err
}
return stats.QueueRecalcMilestoneByID(m.ID)
stats.QueueRecalcMilestoneByID(ctx, m.ID)
return nil
}
// ChangeMilestoneStatusByRepoIDAndID changes a milestone open/closed status if the milestone ID is in the repo.

View file

@ -341,16 +341,14 @@ func TestUpdateMilestoneCounters(t *testing.T) {
issue.ClosedUnix = timeutil.TimeStampNow()
_, err := db.GetEngine(db.DefaultContext).ID(issue.ID).Cols("is_closed", "closed_unix").Update(issue)
require.NoError(t, err)
err = stats.QueueRecalcMilestoneByID(issue.MilestoneID)
require.NoError(t, err)
stats.QueueRecalcMilestoneByID(t.Context(), issue.MilestoneID)
unittest.CheckConsistencyFor(t, &issues_model.Milestone{})
issue.IsClosed = false
issue.ClosedUnix = 0
_, err = db.GetEngine(db.DefaultContext).ID(issue.ID).Cols("is_closed", "closed_unix").Update(issue)
require.NoError(t, err)
err = stats.QueueRecalcMilestoneByID(issue.MilestoneID)
require.NoError(t, err)
stats.QueueRecalcMilestoneByID(t.Context(), issue.MilestoneID)
unittest.CheckConsistencyFor(t, &issues_model.Milestone{})
}

View file

@ -81,11 +81,13 @@ func labelStatsCorrectNumIssuesRepo(ctx context.Context, id int64) error {
}
func labelStatsCorrectNumClosedIssues(ctx context.Context, id int64) error {
return stats.QueueRecalcLabelByID(id)
stats.QueueRecalcLabelByID(ctx, id)
return nil
}
func labelStatsCorrectNumClosedIssuesRepo(ctx context.Context, id int64) error {
return stats.QueueRecalcLabelByRepoID(id)
stats.QueueRecalcLabelByRepoID(ctx, id)
return nil
}
var milestoneStatsQueryNumIssues = "SELECT `milestone`.id FROM `milestone` WHERE `milestone`.num_closed_issues!=(SELECT COUNT(*) FROM `issue` WHERE `issue`.milestone_id=`milestone`.id AND `issue`.is_closed=?) OR `milestone`.num_issues!=(SELECT COUNT(*) FROM `issue` WHERE `issue`.milestone_id=`milestone`.id)"
@ -98,9 +100,7 @@ func milestoneStatsCorrectNumIssuesRepo(ctx context.Context, id int64) error {
}
for _, result := range results {
id, _ := strconv.ParseInt(string(result["id"]), 10, 64)
if err := stats.QueueRecalcMilestoneByID(id); err != nil {
return err
}
stats.QueueRecalcMilestoneByID(ctx, id)
}
return nil
}
@ -171,7 +171,8 @@ func CheckRepoStats(ctx context.Context) error {
{
statsQuery(milestoneStatsQueryNumIssues, true),
func(ctx context.Context, milestoneID int64) error {
return stats.QueueRecalcMilestoneByID(milestoneID)
stats.QueueRecalcMilestoneByID(ctx, milestoneID)
return nil
},
"milestone count 'num_closed_issues' and 'num_issues'",
},

View file

@ -996,6 +996,8 @@ func UpdateRepoIssueNumbers(ctx context.Context, repoID int64, isPull, isClosed
cacheKeyBase = countIssues
}
}
db.AfterTx(ctx, func() {
cache.Remove(repoCacheKey(cacheKeyBase, repoID))
})
return nil
}

View file

@ -304,10 +304,7 @@ func deleteIssue(ctx context.Context, issue *issues_model.Issue) error {
}
}
if err := stats.QueueRecalcMilestoneByID(issue.MilestoneID); err != nil {
return fmt.Errorf("error updating counters for milestone id %d: %w",
issue.MilestoneID, err)
}
stats.QueueRecalcMilestoneByID(ctx, issue.MilestoneID)
if err := activities_model.DeleteIssueActions(ctx, issue.RepoID, issue.ID, issue.Index); err != nil {
return err

View file

@ -30,13 +30,9 @@ func updateMilestoneCounters(ctx context.Context, issue *issues_model.Issue, id
if issue.UpdatedUnix > updatedUnix {
updatedUnix = issue.UpdatedUnix
}
if err := stats.QueueRecalcMilestoneByIDWithDate(id, updatedUnix); err != nil {
return err
}
stats.QueueRecalcMilestoneByIDWithDate(ctx, id, updatedUnix)
} else {
if err := stats.QueueRecalcMilestoneByID(id); err != nil {
return err
}
stats.QueueRecalcMilestoneByID(ctx, id)
}
return nil
}

View file

@ -3,17 +3,19 @@
package stats
import "context"
// Queue a recalculation of the stats on a `Label` for a given label by its ID
func QueueRecalcLabelByID(labelID int64) error {
return safePush(recalcRequest{
func QueueRecalcLabelByID(ctx context.Context, labelID int64) {
safePush(ctx, recalcRequest{
RecalcType: LabelByLabelID,
ObjectID: labelID,
})
}
// Queue a recalculation of the stats on all `Label` in a given repository
func QueueRecalcLabelByRepoID(repoID int64) error {
return safePush(recalcRequest{
func QueueRecalcLabelByRepoID(ctx context.Context, repoID int64) {
safePush(ctx, recalcRequest{
RecalcType: LabelByRepoID,
ObjectID: repoID,
})

View file

@ -4,20 +4,22 @@
package stats
import (
"context"
"forgejo.org/modules/optional"
"forgejo.org/modules/timeutil"
)
// Queue a recalculation of the stats on a `Milestone` for a given milestone by its ID
func QueueRecalcMilestoneByID(labelID int64) error {
return safePush(recalcRequest{
func QueueRecalcMilestoneByID(ctx context.Context, labelID int64) {
safePush(ctx, recalcRequest{
RecalcType: MilestoneByMilestoneID,
ObjectID: labelID,
})
}
func QueueRecalcMilestoneByIDWithDate(labelID int64, updateTimestamp timeutil.TimeStamp) error {
return safePush(recalcRequest{
func QueueRecalcMilestoneByIDWithDate(ctx context.Context, labelID int64, updateTimestamp timeutil.TimeStamp) {
safePush(ctx, recalcRequest{
RecalcType: MilestoneByMilestoneID,
ObjectID: labelID,
UpdateTimestamp: optional.Some(updateTimestamp),

View file

@ -41,6 +41,7 @@ import (
"errors"
"time"
"forgejo.org/models/db"
"forgejo.org/modules/graceful"
"forgejo.org/modules/log"
"forgejo.org/modules/optional"
@ -109,12 +110,13 @@ func handler(items ...string) []string {
return nil
}
func safePush(recalc recalcRequest) error {
func safePush(ctx context.Context, recalc recalcRequest) {
db.AfterTx(ctx, func() {
err := statsQueue.Push(recalc.string())
if err != nil && !errors.Is(err, queue.ErrAlreadyInQueue) {
return err
log.Error("error during stat queue push: %v", err)
}
return nil
})
}
// Only use for testing; do not use in production code

View file

@ -9,11 +9,13 @@ import (
"sync"
"testing"
"forgejo.org/models/db"
"forgejo.org/modules/optional"
"forgejo.org/modules/timeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"xorm.io/xorm"
)
func TestQueueAndFlush(t *testing.T) {
@ -26,11 +28,10 @@ func TestQueueAndFlush(t *testing.T) {
return nil
})
err := safePush(recalcRequest{
safePush(t.Context(), recalcRequest{
RecalcType: -99,
ObjectID: 1,
})
require.NoError(t, err)
require.NoError(t, Flush(t.Context()))
func() {
@ -56,11 +57,10 @@ func TestQueueUnique(t *testing.T) {
// happen. So we'll test this by queuing a large number and ensuring that recalcs occured less -- usually much
// less, like once or twice.
for range 300 {
err := safePush(recalcRequest{
safePush(t.Context(), recalcRequest{
RecalcType: -100,
ObjectID: 1,
})
require.NoError(t, err)
}
require.NoError(t, Flush(t.Context()))
@ -82,11 +82,10 @@ func TestQueueAndError(t *testing.T) {
return errors.New("don't like that value")
})
err := safePush(recalcRequest{
safePush(t.Context(), recalcRequest{
RecalcType: -101,
ObjectID: 1,
})
require.NoError(t, err)
for range 3 { // ensure object isn't requeued by flushing multiple times
require.NoError(t, Flush(t.Context()))
@ -98,3 +97,38 @@ func TestQueueAndError(t *testing.T) {
assert.EqualValues(t, 1, callValues[0])
}()
}
func TestQueueAfterTx(t *testing.T) {
// This is a really micro version of unittest.PrepareTestDatabase -- as the unittest package references the stats
// package (for access to `Flush`), we can't use it without causing a circular dependency. But we need a DB in
// order to create a Tx.
x, err := xorm.NewEngine("sqlite3", "file::memory:?cache=shared&_txlock=immediate")
require.NoError(t, err)
db.SetDefaultEngine(context.Background(), x)
var mu sync.Mutex
callValues := []int64{}
RegisterRecalc(-102, func(ctx context.Context, i int64, _ optional.Option[timeutil.TimeStamp]) error {
mu.Lock()
defer mu.Unlock()
callValues = append(callValues, i)
return nil
})
err = db.WithTx(t.Context(), func(ctx context.Context) error {
safePush(ctx, recalcRequest{
RecalcType: -102,
ObjectID: 1,
})
require.NoError(t, Flush(t.Context()))
// Value from safePush() won't be sent yet because it was from within a DB transaction.
assert.Empty(t, callValues)
return nil
})
require.NoError(t, err)
require.NoError(t, Flush(t.Context()))
assert.Len(t, callValues, 1)
}