From fa5a52b9834adbcbc45b781e69e042bb9e2884d8 Mon Sep 17 00:00:00 2001 From: Mathieu Fenniak Date: Fri, 2 Jan 2026 17:11:12 +0100 Subject: [PATCH] fix: simultaneously experiencing a PreExecutionError and unblocking a different job causes error blocking job emitter queue (#10665) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In a workflow such as: ```yaml jobs: define-matrix: runs-on: docker outputs: array-value: ${{ steps.define.outputs.array }} steps: - id: define run: | echo 'array=["value 1", "value 2"]' >> "$FORGEJO_OUTPUT" array-job: runs-on: docker needs: define-matrix strategy: matrix: array: ${{ fromJSON(needs.define-matrix.outputs.array-value-oops-i-made-an-error-here) }} steps: # ... other-job: runs-on: docker needs: define-matrix steps: # .... ${{ needs.define-matrix.outputs.array-value }} ``` After the job `define-matrix` is done, an error will be triggered because `array-value-oops-i-made-an-error-here` is not a valid output, and so `array-job` can't be figured out. When the job emitter triggers that error and stores it in the database, it will mark all the jobs in the workflow as failed (`FailRunPreExecutionError()`) in order to ensure that no blocked jobs remain and appear stuck forever. However, `other-job` is also unblocked by `job_emitter.go` because it's dependency of `define-matrix` is now complete. After the error occurs, job emitter will attempt to unblock `other-job` and the conditional `UpdateRunJob` will fail because the condition `"status": StatusBlocked` is no longer true: https://codeberg.org/forgejo/forgejo/src/commit/0af52cdca24dbf0d9e62a0551f6fc2d7919e68fd/services/actions/job_emitter.go#L88-L92 This causes an error, and that error rolls back the transaction in `checkJobsOfRun`, and that causes job emitter's queue to constantly retry the same work which has the same outcome each time. This fix tells `checkJobsOfRun` that an error occurred that prevents all jobs in the run from progressing, and therefore no updates need to proceed. Discovered while authoring https://code.forgejo.org/forgejo/end-to-end/pulls/1367 and causing an error unintentionally. 🤣 ## Checklist The [contributor guide](https://forgejo.org/docs/next/contributor/) contains information that will be helpful to first time contributors. There also are a few [conditions for merging Pull Requests in Forgejo repositories](https://codeberg.org/forgejo/governance/src/branch/main/PullRequestsAgreement.md). You are also welcome to join the [Forgejo development chatroom](https://matrix.to/#/#forgejo-development:matrix.org). ### Tests - I added test coverage for Go changes... - [x] in their respective `*_test.go` for unit tests. - [ ] in the `tests/integration` directory if it involves interactions with a live Forgejo server. - I added test coverage for JavaScript changes... - [ ] in `web_src/js/*.test.js` if it can be unit tested. - [ ] in `tests/e2e/*.test.e2e.js` if it requires interactions with a live Forgejo server (see also the [developer guide for JavaScript testing](https://codeberg.org/forgejo/forgejo/src/branch/forgejo/tests/e2e/README.md#end-to-end-tests)). ### Documentation - [ ] I created a pull request [to the documentation](https://codeberg.org/forgejo/docs) to explain to Forgejo users how to use this change. - [x] I did not document these changes and I do not expect someone else to do it. ### Release notes - [ ] I do not want this change to show in the release notes. - [x] I want the title to show in the release notes with a link to this pull request. - [ ] I want the content of the `release-notes/.md` to be be used for the release notes instead of the title. Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/10665 Reviewed-by: Andreas Ahlenstorf Co-authored-by: Mathieu Fenniak Co-committed-by: Mathieu Fenniak --- services/actions/job_emitter.go | 77 +++++++++++++++++++--------- services/actions/job_emitter_test.go | 10 ++-- 2 files changed, 57 insertions(+), 30 deletions(-) diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go index 389b19ac92..61e77744c9 100644 --- a/services/actions/job_emitter.go +++ b/services/actions/job_emitter.go @@ -70,11 +70,21 @@ func checkJobsOfRun(ctx context.Context, runID int64) error { updateColumns := []string{"status"} if status == actions_model.StatusWaiting { - ignore, err := tryHandleIncompleteMatrix(ctx, job, jobs) - if err != nil { + behaviour, err := tryHandleIncompleteMatrix(ctx, job, jobs) + switch behaviour { + case behaviourError: return fmt.Errorf("error in tryHandleIncompleteMatrix: %w", err) - } else if ignore { + + case behaviourExecuteJob: + // Intentional blank case -- proceed with updating the status of the job to waiting. + + case behaviourIgnoreJob: + // Skip updating this job's status to waiting, continue with other jobs in the run. continue + + case behaviourIgnoreAllJobsInRun: + // Stop processing any other jobs in this run. + return nil } } else if status == actions_model.StatusSuccess || status == actions_model.StatusFailure { // Transition to these states can be triggered by workflow call outer jobs @@ -211,31 +221,47 @@ func (r *jobStatusResolver) resolve() map[int64]actions_model.Status { return ret } +type behaviour int + +const ( + // behaviourError is used to indicate that there's no relevant behaviour due to an internal server error. + behaviourError behaviour = iota + + // behaviourExecuteJob indicates that the job is ready to be unblocked as normal. + behaviourExecuteJob + + // behaviourIgnoreJob indicates that the job should not be unblocked, and should instead be ignored. + behaviourIgnoreJob + + // behaviourIgnoreAllJobsInRun indicates that something went wrong and all jobs in the run should now be ignored. + behaviourIgnoreAllJobsInRun +) + // Invoked once a job has all its `needs` parameters met and is ready to transition to waiting, this may expand the // job's `strategy.matrix` into multiple new jobs. -func tryHandleIncompleteMatrix(ctx context.Context, blockedJob *actions_model.ActionRunJob, jobsInRun []*actions_model.ActionRunJob) (bool, error) { +func tryHandleIncompleteMatrix(ctx context.Context, blockedJob *actions_model.ActionRunJob, jobsInRun []*actions_model.ActionRunJob) (behaviour, error) { incompleteMatrix, _, err := blockedJob.HasIncompleteMatrix() if err != nil { - return false, fmt.Errorf("job HasIncompleteMatrix: %w", err) + return behaviourError, fmt.Errorf("job HasIncompleteMatrix: %w", err) } incompleteRunsOn, _, _, err := blockedJob.HasIncompleteRunsOn() if err != nil { - return false, fmt.Errorf("job HasIncompleteRunsOn: %w", err) + return behaviourError, fmt.Errorf("job HasIncompleteRunsOn: %w", err) } incompleteWith, _, _, err := blockedJob.HasIncompleteWith() if err != nil { - return false, fmt.Errorf("job HasIncompleteWith: %w", err) + return behaviourError, fmt.Errorf("job HasIncompleteWith: %w", err) } if !incompleteMatrix && !incompleteRunsOn && !incompleteWith { // Not relevant to attempt re-parsing the job if it wasn't marked as Incomplete[...] previously. - return false, nil + return behaviourExecuteJob, nil } if err := blockedJob.LoadRun(ctx); err != nil { - return false, fmt.Errorf("failure LoadRun in tryHandleIncompleteMatrix: %w", err) + return behaviourError, fmt.Errorf("failure LoadRun in tryHandleIncompleteMatrix: %w", err) } // Compute jobOutputs for all the other jobs required as needed by this job: @@ -247,13 +273,13 @@ func tryHandleIncompleteMatrix(ctx context.Context, blockedJob *actions_model.Ac } else if !job.Status.IsDone() { // Unexpected: `job` is needed by `blockedJob` but it isn't done; `jobStatusResolver` shouldn't be calling // `tryHandleIncompleteMatrix` in this case. - return false, fmt.Errorf( + return behaviourError, fmt.Errorf( "jobStatusResolver attempted to tryHandleIncompleteMatrix for a job (id=%d) with an incomplete 'needs' job (id=%d)", blockedJob.ID, job.ID) } outputs, err := actions_model.FindTaskOutputByTaskID(ctx, job.TaskID) if err != nil { - return false, fmt.Errorf("failed loading task outputs: %w", err) + return behaviourError, fmt.Errorf("failed loading task outputs: %w", err) } outputsMap := make(map[string]string, len(outputs)) @@ -283,10 +309,10 @@ func tryHandleIncompleteMatrix(ctx context.Context, blockedJob *actions_model.Ac blockedJob.Run, actions_model.ErrorCodeJobParsingError, []any{err.Error()}); err != nil { - return false, fmt.Errorf("setting run into PreExecutionError state failed: %w", err) + return behaviourError, fmt.Errorf("setting run into PreExecutionError state failed: %w", err) } - // Return `true` to skip running this job in this invalid state - return true, nil + // `FailRunPreExecutionError` will mark all the pending runs in the job failed; ignore all of them. + return behaviourIgnoreAllJobsInRun, nil } // Even though every job in the `needs` list is done, perform a consistency check if the job was still unable to be @@ -303,24 +329,24 @@ func tryHandleIncompleteMatrix(ctx context.Context, blockedJob *actions_model.Ac if swf.IncompleteMatrix { errorCode, errorDetails := persistentIncompleteMatrixError(blockedJob, swf.IncompleteMatrixNeeds) if err := FailRunPreExecutionError(ctx, blockedJob.Run, errorCode, errorDetails); err != nil { - return false, fmt.Errorf("setting run into PreExecutionError state failed: %w", err) + return behaviourError, fmt.Errorf("setting run into PreExecutionError state failed: %w", err) } - // Return `true` to skip running this job in this invalid state - return true, nil + // `FailRunPreExecutionError` will mark all the pending runs in the job failed; ignore all of them. + return behaviourIgnoreAllJobsInRun, nil } else if swf.IncompleteRunsOn { errorCode, errorDetails := persistentIncompleteRunsOnError(blockedJob, swf.IncompleteRunsOnNeeds, swf.IncompleteRunsOnMatrix) if err := FailRunPreExecutionError(ctx, blockedJob.Run, errorCode, errorDetails); err != nil { - return false, fmt.Errorf("setting run into PreExecutionError state failed: %w", err) + return behaviourError, fmt.Errorf("setting run into PreExecutionError state failed: %w", err) } - // Return `true` to skip running this job in this invalid state - return true, nil + // `FailRunPreExecutionError` will mark all the pending runs in the job failed; ignore all of them. + return behaviourIgnoreAllJobsInRun, nil } else if swf.IncompleteWith { errorCode, errorDetails := persistentIncompleteWithError(blockedJob, swf.IncompleteWithNeeds, swf.IncompleteWithMatrix) if err := FailRunPreExecutionError(ctx, blockedJob.Run, errorCode, errorDetails); err != nil { - return false, fmt.Errorf("setting run into PreExecutionError state failed: %w", err) + return behaviourError, fmt.Errorf("setting run into PreExecutionError state failed: %w", err) } - // Return `true` to skip running this job in this invalid state - return true, nil + // `FailRunPreExecutionError` will mark all the pending runs in the job failed; ignore all of them. + return behaviourIgnoreAllJobsInRun, nil } } } @@ -342,9 +368,10 @@ func tryHandleIncompleteMatrix(ctx context.Context, blockedJob *actions_model.Ac return nil }) if err != nil { - return false, err + return behaviourError, err } - return true, nil + // job was deleted after it was replaced with one-or-more new jobs, so ignore it. + return behaviourIgnoreJob, nil } func persistentIncompleteMatrixError(job *actions_model.ActionRunJob, incompleteNeeds *jobparser.IncompleteNeeds) (actions_model.PreExecutionError, []any) { diff --git a/services/actions/job_emitter_test.go b/services/actions/job_emitter_test.go index 6b8ccca6f9..754e29cc7f 100644 --- a/services/actions/job_emitter_test.go +++ b/services/actions/job_emitter_test.go @@ -581,14 +581,14 @@ func Test_tryHandleIncompleteMatrix(t *testing.T) { jobsInRun, err := db.Find[actions_model.ActionRunJob](t.Context(), actions_model.FindRunJobOptions{RunID: blockedJob.RunID}) require.NoError(t, err) - skip, err := tryHandleIncompleteMatrix(t.Context(), blockedJob, jobsInRun) + behaviour, err := tryHandleIncompleteMatrix(t.Context(), blockedJob, jobsInRun) if tt.errContains != "" { require.ErrorContains(t, err, tt.errContains) } else { require.NoError(t, err) if tt.consumed { - assert.True(t, skip, "skip flag") + assert.Equal(t, behaviourIgnoreJob, behaviour) // blockedJob should no longer exist in the database unittest.AssertNotExistsBean(t, &actions_model.ActionRunJob{ID: tt.runJobID}) @@ -660,10 +660,10 @@ func Test_tryHandleIncompleteMatrix(t *testing.T) { blockedJobReloaded := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{ID: tt.runJobID}) assert.Equal(t, actions_model.StatusFailure, blockedJobReloaded.Status) - // skip is set to true - assert.True(t, skip, "skip flag") + // ensure all other jobs in this run are ignored + assert.Equal(t, behaviourIgnoreAllJobsInRun, behaviour) } else { - assert.False(t, skip, "skip flag") + assert.Equal(t, behaviourExecuteJob, behaviour) } } })