diff --git a/models/actions/run.go b/models/actions/run.go index 8194c07940..1a53fff216 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -455,6 +455,10 @@ func GetRunByIndex(ctx context.Context, repoID, index int64) (*ActionRun, error) return run, nil } +// ErrActionRunOutOfDate is returned when ActionRun's optimistic concurrency control has indicated that the record has been updated in the +// database by another session since it was loaded in-memory in this session. +var ErrActionRunOutOfDate = errors.New("run has changed") + // UpdateRun updates a run. // It requires the inputted run has Version set. // It will return error if the version is not matched (it means the run has been changed after loaded). @@ -471,8 +475,9 @@ func UpdateRunWithoutNotification(ctx context.Context, run *ActionRun, cols ...s return err } if affected == 0 { - return errors.New("run has changed") - // It's impossible that the run is not found, since Gitea never deletes runs. + // UPDATE has no conditions on it, and we never delete runs, so the only possible cause of this is that the + // `xorm:"version"` tagged field indicated that the version has changed since the record was loaded. + return ErrActionRunOutOfDate } if run.Status != 0 || slices.Contains(cols, "status") { diff --git a/models/actions/run_job.go b/models/actions/run_job.go index 8096bfb6b5..a2f2a82901 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -5,12 +5,14 @@ package actions import ( "context" + "errors" "fmt" "slices" "time" "forgejo.org/models/db" "forgejo.org/modules/container" + "forgejo.org/modules/log" "forgejo.org/modules/timeutil" "forgejo.org/modules/util" @@ -174,7 +176,7 @@ func UpdateRunJobWithoutNotification(ctx context.Context, job *ActionRunJob, con } } - { + for { // Other goroutines may aggregate the status of the run and update it too. // So we need load the run and its jobs before updating the run. 
run, err := GetRunByID(ctx, job.RunID) @@ -185,23 +187,39 @@ func UpdateRunJobWithoutNotification(ctx context.Context, job *ActionRunJob, con if err != nil { return 0, err } - run.Status = AggregateJobStatus(jobs) + + updateRequired := false + newStatus := AggregateJobStatus(jobs) + if run.Status != newStatus { + run.Status = newStatus + updateRequired = true + } if run.Started.IsZero() && run.Status.IsRunning() { run.Started = timeutil.TimeStampNow() + updateRequired = true } if run.Stopped.IsZero() && run.Status.IsDone() { run.Stopped = timeutil.TimeStampNow() + updateRequired = true } - // As the caller has to ensure the ActionRunNowDone notification is sent we can ignore doing so here. - if err := UpdateRunWithoutNotification(ctx, run, "status", "started", "stopped"); err != nil { - return 0, fmt.Errorf("update run %d: %w", run.ID, err) + if updateRequired { + // As the caller has to ensure the ActionRunNowDone notification is sent we can ignore doing so here. + if err := UpdateRunWithoutNotification(ctx, run, "status", "started", "stopped"); err != nil && errors.Is(err, ErrActionRunOutOfDate) { + // Retry update; another session affected `run` simultaneously. It wasn't necessarily another update + // from this same loop -- there are other codepaths that update `ActionRun`. 
+ log.Debug("UpdateRunWithoutNotification failed with %v; looping for retry", err) + continue + } else if err != nil { + return 0, fmt.Errorf("update run %d: %w", run.ID, err) + } } + break // exit retry loop } return affected, nil } -func AggregateJobStatus(jobs []*ActionRunJob) Status { +var AggregateJobStatus = func(jobs []*ActionRunJob) Status { allSuccessOrSkipped := len(jobs) != 0 allSkipped := len(jobs) != 0 var hasFailure, hasCancelled, hasWaiting, hasRunning, hasBlocked bool diff --git a/models/actions/run_job_test.go b/models/actions/run_job_test.go index 9cba48705b..b440e35d8c 100644 --- a/models/actions/run_job_test.go +++ b/models/actions/run_job_test.go @@ -8,6 +8,7 @@ import ( "forgejo.org/models/db" "forgejo.org/models/unittest" + "forgejo.org/modules/test" "code.forgejo.org/forgejo/runner/v12/act/jobparser" "github.com/stretchr/testify/assert" @@ -159,3 +160,60 @@ func TestActionRunJob_IsIncompleteRunsOn(t *testing.T) { }) } } + +func TestUpdateRunJobWithoutNotificationConcurrency(t *testing.T) { + require.NoError(t, unittest.PrepareTestDatabase()) + + testJob := unittest.AssertExistsAndLoadBean(t, &ActionRunJob{ID: 192}) + testRun := unittest.AssertExistsAndLoadBean(t, &ActionRun{ID: testJob.RunID}) + + // UpdateRunJobWithoutNotification is intended to update the related `ActionRun`, setting its `Started`, `Stopped`, + // and `Status` field to an appropriate state considering the job update. It has a retry loop to perform this work + // even if `ActionRun` is updated concurrently. To test that loop, we're going to intercept the invocation of + // AggregateJobStatus and freeze that update process, perform a different modification to the run, and then release + // the frozen test. The retry loop should trigger and a second pass updating the `ActionRun` should succeed. 
+ + syncBeginPoint := make(chan any) + syncMidPoint := make(chan any) + syncEndPoint := make(chan any) + firstPass := true + + defer test.MockVariableValue(&AggregateJobStatus, func(jobs []*ActionRunJob) Status { + // Synchronization here needs to handle the fact that `AggregateJobStatus` will be invoked twice -- pause + // correctly on the first run, but continue with no concerns on the second run. + if firstPass { + firstPass = false + // Signal that we're in AggregateJobStatus()... + close(syncBeginPoint) + // Wait until signalled to continue + <-syncMidPoint + } + return StatusCancelled + })() + + go func() { + testJob.Status = StatusCancelled + updated, err := UpdateRunJobWithoutNotification(t.Context(), testJob, nil, "status") + close(syncEndPoint) // close before asserts, so that the test doesn't hang if it fails + require.NoError(t, err) + assert.EqualValues(t, 1, updated) + }() + + // Wait until UpdateRunJobWithoutNotification reaches AggregateJobStatus()... + <-syncBeginPoint + + // Perform a concurrent modification to `ActionRun` + testRun.Status = StatusSkipped + err := UpdateRunWithoutNotification(t.Context(), testRun, "status") + require.NoError(t, err) + + // Signal for AggregateJobStatus to continue + close(syncMidPoint) + + // Wait for goroutine to complete + <-syncEndPoint + + // Reload the `ActionRun` + testRun = unittest.AssertExistsAndLoadBean(t, &ActionRun{ID: testJob.RunID}) + assert.Equal(t, StatusCancelled, testRun.Status) +}