mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2026-05-12 22:10:25 +00:00
[v14.0/forgejo] fix: retry ActionRun updates when optimistic-concurrency-control indicates record changed (#10906)
**Backport:** https://codeberg.org/forgejo/forgejo/pulls/10893 When concurrent updates occur to the `action_run` table, fetching a task via `FetchTask` can result in an error: ``` time="2026-01-16T16:02:30Z" level=error msg="failed to fetch task" error="internal: pick task: CreateTaskForRunner: update run 2358339: run has changed" ``` This is an error with no known harm. However, this error is correlated with the forgejo/forgejo repo encountering zombie tasks, where they appear as if they are recorded as dispatched to a runner but a runner doesn't pick them up. I think it would be worthwhile to prevent this error and see if it fixes the zombie tasks, or eliminate it as a potential cause. See https://code.forgejo.org/forgejo/runner/issues/1302#issuecomment-73859 for a more detailed technical analysis. ## Checklist The [contributor guide](https://forgejo.org/docs/next/contributor/) contains information that will be helpful to first time contributors. There also are a few [conditions for merging Pull Requests in Forgejo repositories](https://codeberg.org/forgejo/governance/src/branch/main/PullRequestsAgreement.md). You are also welcome to join the [Forgejo development chatroom](https://matrix.to/#/#forgejo-development:matrix.org). ### Tests - I added test coverage for Go changes... - [ ] in their respective `*_test.go` for unit tests. - [ ] in the `tests/integration` directory if it involves interactions with a live Forgejo server. - I added test coverage for JavaScript changes... - [ ] in `web_src/js/*.test.js` if it can be unit tested. - [ ] in `tests/e2e/*.test.e2e.js` if it requires interactions with a live Forgejo server (see also the [developer guide for JavaScript testing](https://codeberg.org/forgejo/forgejo/src/branch/forgejo/tests/e2e/README.md#end-to-end-tests)). ### Documentation - [ ] I created a pull request [to the documentation](https://codeberg.org/forgejo/docs) to explain to Forgejo users how to use this change. - [ ] I did not document these changes and I do not expect someone else to do it. ### Release notes - [ ] This change will be noticed by a Forgejo user or admin (feature, bug fix, performance, etc.). I suggest to include a release note for this change. - [ ] This change is not visible to a Forgejo user or admin (refactor, dependency upgrade, etc.). I think there is no need to add a release note for this change. *The decision if the pull request will be shown in the release notes is up to the mergers / release team.* The content of the `release-notes/<pull request number>.md` file will serve as the basis for the release notes. If the file does not exist, the title of the pull request will be used instead. Co-authored-by: Mathieu Fenniak <mathieu@fenniak.net> Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/10906 Reviewed-by: Michael Kriese <michael.kriese@gmx.de> Reviewed-by: Mathieu Fenniak <mfenniak@noreply.codeberg.org> Co-authored-by: forgejo-backport-action <forgejo-backport-action@noreply.codeberg.org> Co-committed-by: forgejo-backport-action <forgejo-backport-action@noreply.codeberg.org>
This commit is contained in:
parent
964288a4a8
commit
f33e2d1efd
3 changed files with 89 additions and 8 deletions
|
|
@ -455,6 +455,10 @@ func GetRunByIndex(ctx context.Context, repoID, index int64) (*ActionRun, error)
|
|||
return run, nil
|
||||
}
|
||||
|
||||
// Error returned when ActionRun's optimistic concurrency control has indicated that the record has been updated in the
|
||||
// database by another session since it was loaded in-memory in this session.
|
||||
var ErrActionRunOutOfDate = errors.New("run has changed")
|
||||
|
||||
// UpdateRun updates a run.
|
||||
// It requires the inputted run has Version set.
|
||||
// It will return error if the version is not matched (it means the run has been changed after loaded).
|
||||
|
|
@ -471,8 +475,9 @@ func UpdateRunWithoutNotification(ctx context.Context, run *ActionRun, cols ...s
|
|||
return err
|
||||
}
|
||||
if affected == 0 {
|
||||
return errors.New("run has changed")
|
||||
// It's impossible that the run is not found, since Gitea never deletes runs.
|
||||
// UPDATE has no conditions on it, and we never delete runs, so the only possible cause of this is
|
||||
// `xorm:"version"` tagged field indicated that the version has changed since the record was loaded.
|
||||
return ErrActionRunOutOfDate
|
||||
}
|
||||
|
||||
if run.Status != 0 || slices.Contains(cols, "status") {
|
||||
|
|
|
|||
|
|
@ -5,12 +5,14 @@ package actions
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"slices"
|
||||
"time"
|
||||
|
||||
"forgejo.org/models/db"
|
||||
"forgejo.org/modules/container"
|
||||
"forgejo.org/modules/log"
|
||||
"forgejo.org/modules/timeutil"
|
||||
"forgejo.org/modules/util"
|
||||
|
||||
|
|
@ -174,7 +176,7 @@ func UpdateRunJobWithoutNotification(ctx context.Context, job *ActionRunJob, con
|
|||
}
|
||||
}
|
||||
|
||||
{
|
||||
for {
|
||||
// Other goroutines may aggregate the status of the run and update it too.
|
||||
// So we need load the run and its jobs before updating the run.
|
||||
run, err := GetRunByID(ctx, job.RunID)
|
||||
|
|
@ -185,23 +187,39 @@ func UpdateRunJobWithoutNotification(ctx context.Context, job *ActionRunJob, con
|
|||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
run.Status = AggregateJobStatus(jobs)
|
||||
|
||||
updateRequired := false
|
||||
newStatus := AggregateJobStatus(jobs)
|
||||
if run.Status != newStatus {
|
||||
run.Status = newStatus
|
||||
updateRequired = true
|
||||
}
|
||||
if run.Started.IsZero() && run.Status.IsRunning() {
|
||||
run.Started = timeutil.TimeStampNow()
|
||||
updateRequired = true
|
||||
}
|
||||
if run.Stopped.IsZero() && run.Status.IsDone() {
|
||||
run.Stopped = timeutil.TimeStampNow()
|
||||
updateRequired = true
|
||||
}
|
||||
// As the caller has to ensure the ActionRunNowDone notification is sent we can ignore doing so here.
|
||||
if err := UpdateRunWithoutNotification(ctx, run, "status", "started", "stopped"); err != nil {
|
||||
return 0, fmt.Errorf("update run %d: %w", run.ID, err)
|
||||
if updateRequired {
|
||||
// As the caller has to ensure the ActionRunNowDone notification is sent we can ignore doing so here.
|
||||
if err := UpdateRunWithoutNotification(ctx, run, "status", "started", "stopped"); err != nil && errors.Is(err, ErrActionRunOutOfDate) {
|
||||
// Retry update; another session affected `run` simultaneously. It wasn't necessarily another update
|
||||
// from this same loop -- there are other codepaths that update `ActionRun`.
|
||||
log.Debug("UpdateRunWithoutNotification failed with %v; looping for retry", err)
|
||||
continue
|
||||
} else if err != nil {
|
||||
return 0, fmt.Errorf("update run %d: %w", run.ID, err)
|
||||
}
|
||||
}
|
||||
break // exit retry loop
|
||||
}
|
||||
|
||||
return affected, nil
|
||||
}
|
||||
|
||||
func AggregateJobStatus(jobs []*ActionRunJob) Status {
|
||||
var AggregateJobStatus = func(jobs []*ActionRunJob) Status {
|
||||
allSuccessOrSkipped := len(jobs) != 0
|
||||
allSkipped := len(jobs) != 0
|
||||
var hasFailure, hasCancelled, hasWaiting, hasRunning, hasBlocked bool
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import (
|
|||
|
||||
"forgejo.org/models/db"
|
||||
"forgejo.org/models/unittest"
|
||||
"forgejo.org/modules/test"
|
||||
|
||||
"code.forgejo.org/forgejo/runner/v12/act/jobparser"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
|
@ -159,3 +160,60 @@ func TestActionRunJob_IsIncompleteRunsOn(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateRunJobWithoutNotificationConcurrency(t *testing.T) {
|
||||
require.NoError(t, unittest.PrepareTestDatabase())
|
||||
|
||||
testJob := unittest.AssertExistsAndLoadBean(t, &ActionRunJob{ID: 192})
|
||||
testRun := unittest.AssertExistsAndLoadBean(t, &ActionRun{ID: testJob.RunID})
|
||||
|
||||
// UpdateRunJobWithoutNotification is intended to update the related `ActionRun`, setting its `Started`, `Stopped`,
|
||||
// and `Status` field to an appropriate state considering the job update. It has a retry loop to perform this work
|
||||
// even if `ActionRun` is updated concurrently. To test that loop, we're going to intercept the invocation of
|
||||
// AggregateJobStatus and freeze that update process, perform a different modification to the run, and then release
|
||||
// the frozen test. The retry loop should trigger and a second pass updating the `ActionRun` should succeed.
|
||||
|
||||
syncBeginPoint := make(chan any)
|
||||
syncMidPoint := make(chan any)
|
||||
syncEndPoint := make(chan any)
|
||||
firstPass := true
|
||||
|
||||
defer test.MockVariableValue(&AggregateJobStatus, func(jobs []*ActionRunJob) Status {
|
||||
// Synchronization here needs to handle the faact that `AggregateJobStatus` will be invoked twice -- pause
|
||||
// correctly on the first run, but continue with no concerns on the second run.
|
||||
if firstPass {
|
||||
firstPass = false
|
||||
// Signal that we're in AggregateJobStatus()...
|
||||
close(syncBeginPoint)
|
||||
// Wait until signalled to continue
|
||||
<-syncMidPoint
|
||||
}
|
||||
return StatusCancelled
|
||||
})()
|
||||
|
||||
go func() {
|
||||
testJob.Status = StatusCancelled
|
||||
updated, err := UpdateRunJobWithoutNotification(t.Context(), testJob, nil, "status")
|
||||
close(syncEndPoint) // close before asserts, so that the test doesn't hang if it fails
|
||||
require.NoError(t, err)
|
||||
assert.EqualValues(t, 1, updated)
|
||||
}()
|
||||
|
||||
// Wait until UpdateRunJobWithoutNotification reaches AggregateJobStatus()...
|
||||
<-syncBeginPoint
|
||||
|
||||
// Perform a concurrent modification to `ActionRun`
|
||||
testRun.Status = StatusSkipped
|
||||
err := UpdateRunWithoutNotification(t.Context(), testRun, "status")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Signal for AggregateJobStatus to continue
|
||||
close(syncMidPoint)
|
||||
|
||||
// Wait for goroutine to complete
|
||||
<-syncEndPoint
|
||||
|
||||
// Reload the `ActionRun`
|
||||
testRun = unittest.AssertExistsAndLoadBean(t, &ActionRun{ID: testJob.RunID})
|
||||
assert.Equal(t, StatusCancelled, testRun.Status)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue