mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2026-05-17 00:06:35 +00:00
fix: simultaneously experiencing a PreExecutionError and unblocking a different job causes error blocking job emitter queue (#10665)
In a workflow such as:
```yaml
jobs:
define-matrix:
runs-on: docker
outputs:
array-value: ${{ steps.define.outputs.array }}
steps:
- id: define
run: |
echo 'array=["value 1", "value 2"]' >> "$FORGEJO_OUTPUT"
array-job:
runs-on: docker
needs: define-matrix
strategy:
matrix:
array: ${{ fromJSON(needs.define-matrix.outputs.array-value-oops-i-made-an-error-here) }}
steps: # ...
other-job:
runs-on: docker
needs: define-matrix
steps: # .... ${{ needs.define-matrix.outputs.array-value }}
```
After the job `define-matrix` is done, an error will be triggered because `array-value-oops-i-made-an-error-here` is not a valid output, and so `array-job` can't be figured out. When the job emitter triggers that error and stores it in the database, it will mark all the jobs in the workflow as failed (`FailRunPreExecutionError()`) in order to ensure that no blocked jobs remain and appear stuck forever.
However, `other-job` is also unblocked by `job_emitter.go` because it's dependency of `define-matrix` is now complete. After the error occurs, job emitter will attempt to unblock `other-job` and the conditional `UpdateRunJob` will fail because the condition `"status": StatusBlocked` is no longer true:
0af52cdca2/services/actions/job_emitter.go (L88-L92)
This causes an error, and that error rolls back the transaction in `checkJobsOfRun`, and that causes job emitter's queue to constantly retry the same work which has the same outcome each time.
This fix tells `checkJobsOfRun` that an error occurred that prevents all jobs in the run from progressing, and therefore no updates need to proceed.
Discovered while authoring https://code.forgejo.org/forgejo/end-to-end/pulls/1367 and causing an error unintentionally. 🤣
## Checklist
The [contributor guide](https://forgejo.org/docs/next/contributor/) contains information that will be helpful to first time contributors. There also are a few [conditions for merging Pull Requests in Forgejo repositories](https://codeberg.org/forgejo/governance/src/branch/main/PullRequestsAgreement.md). You are also welcome to join the [Forgejo development chatroom](https://matrix.to/#/#forgejo-development:matrix.org).
### Tests
- I added test coverage for Go changes...
- [x] in their respective `*_test.go` for unit tests.
- [ ] in the `tests/integration` directory if it involves interactions with a live Forgejo server.
- I added test coverage for JavaScript changes...
- [ ] in `web_src/js/*.test.js` if it can be unit tested.
- [ ] in `tests/e2e/*.test.e2e.js` if it requires interactions with a live Forgejo server (see also the [developer guide for JavaScript testing](https://codeberg.org/forgejo/forgejo/src/branch/forgejo/tests/e2e/README.md#end-to-end-tests)).
### Documentation
- [ ] I created a pull request [to the documentation](https://codeberg.org/forgejo/docs) to explain to Forgejo users how to use this change.
- [x] I did not document these changes and I do not expect someone else to do it.
### Release notes
- [ ] I do not want this change to show in the release notes.
- [x] I want the title to show in the release notes with a link to this pull request.
- [ ] I want the content of the `release-notes/<pull request number>.md` to be be used for the release notes instead of the title.
Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/10665
Reviewed-by: Andreas Ahlenstorf <aahlenst@noreply.codeberg.org>
Co-authored-by: Mathieu Fenniak <mathieu@fenniak.net>
Co-committed-by: Mathieu Fenniak <mathieu@fenniak.net>
This commit is contained in:
parent
9d6ae1471e
commit
fa5a52b983
2 changed files with 57 additions and 30 deletions
|
|
@ -70,11 +70,21 @@ func checkJobsOfRun(ctx context.Context, runID int64) error {
|
|||
updateColumns := []string{"status"}
|
||||
|
||||
if status == actions_model.StatusWaiting {
|
||||
ignore, err := tryHandleIncompleteMatrix(ctx, job, jobs)
|
||||
if err != nil {
|
||||
behaviour, err := tryHandleIncompleteMatrix(ctx, job, jobs)
|
||||
switch behaviour {
|
||||
case behaviourError:
|
||||
return fmt.Errorf("error in tryHandleIncompleteMatrix: %w", err)
|
||||
} else if ignore {
|
||||
|
||||
case behaviourExecuteJob:
|
||||
// Intentional blank case -- proceed with updating the status of the job to waiting.
|
||||
|
||||
case behaviourIgnoreJob:
|
||||
// Skip updating this job's status to waiting, continue with other jobs in the run.
|
||||
continue
|
||||
|
||||
case behaviourIgnoreAllJobsInRun:
|
||||
// Stop processing any other jobs in this run.
|
||||
return nil
|
||||
}
|
||||
} else if status == actions_model.StatusSuccess || status == actions_model.StatusFailure {
|
||||
// Transition to these states can be triggered by workflow call outer jobs
|
||||
|
|
@ -211,31 +221,47 @@ func (r *jobStatusResolver) resolve() map[int64]actions_model.Status {
|
|||
return ret
|
||||
}
|
||||
|
||||
type behaviour int
|
||||
|
||||
const (
|
||||
// behaviourError is used to indicate that there's no relevant behaviour due to an internal server error.
|
||||
behaviourError behaviour = iota
|
||||
|
||||
// behaviourExecuteJob indicates that the job is ready to be unblocked as normal.
|
||||
behaviourExecuteJob
|
||||
|
||||
// behaviourIgnoreJob indicates that the job should not be unblocked, and should instead be ignored.
|
||||
behaviourIgnoreJob
|
||||
|
||||
// behaviourIgnoreAllJobsInRun indicates that something went wrong and all jobs in the run should now be ignored.
|
||||
behaviourIgnoreAllJobsInRun
|
||||
)
|
||||
|
||||
// Invoked once a job has all its `needs` parameters met and is ready to transition to waiting, this may expand the
|
||||
// job's `strategy.matrix` into multiple new jobs.
|
||||
func tryHandleIncompleteMatrix(ctx context.Context, blockedJob *actions_model.ActionRunJob, jobsInRun []*actions_model.ActionRunJob) (bool, error) {
|
||||
func tryHandleIncompleteMatrix(ctx context.Context, blockedJob *actions_model.ActionRunJob, jobsInRun []*actions_model.ActionRunJob) (behaviour, error) {
|
||||
incompleteMatrix, _, err := blockedJob.HasIncompleteMatrix()
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("job HasIncompleteMatrix: %w", err)
|
||||
return behaviourError, fmt.Errorf("job HasIncompleteMatrix: %w", err)
|
||||
}
|
||||
|
||||
incompleteRunsOn, _, _, err := blockedJob.HasIncompleteRunsOn()
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("job HasIncompleteRunsOn: %w", err)
|
||||
return behaviourError, fmt.Errorf("job HasIncompleteRunsOn: %w", err)
|
||||
}
|
||||
|
||||
incompleteWith, _, _, err := blockedJob.HasIncompleteWith()
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("job HasIncompleteWith: %w", err)
|
||||
return behaviourError, fmt.Errorf("job HasIncompleteWith: %w", err)
|
||||
}
|
||||
|
||||
if !incompleteMatrix && !incompleteRunsOn && !incompleteWith {
|
||||
// Not relevant to attempt re-parsing the job if it wasn't marked as Incomplete[...] previously.
|
||||
return false, nil
|
||||
return behaviourExecuteJob, nil
|
||||
}
|
||||
|
||||
if err := blockedJob.LoadRun(ctx); err != nil {
|
||||
return false, fmt.Errorf("failure LoadRun in tryHandleIncompleteMatrix: %w", err)
|
||||
return behaviourError, fmt.Errorf("failure LoadRun in tryHandleIncompleteMatrix: %w", err)
|
||||
}
|
||||
|
||||
// Compute jobOutputs for all the other jobs required as needed by this job:
|
||||
|
|
@ -247,13 +273,13 @@ func tryHandleIncompleteMatrix(ctx context.Context, blockedJob *actions_model.Ac
|
|||
} else if !job.Status.IsDone() {
|
||||
// Unexpected: `job` is needed by `blockedJob` but it isn't done; `jobStatusResolver` shouldn't be calling
|
||||
// `tryHandleIncompleteMatrix` in this case.
|
||||
return false, fmt.Errorf(
|
||||
return behaviourError, fmt.Errorf(
|
||||
"jobStatusResolver attempted to tryHandleIncompleteMatrix for a job (id=%d) with an incomplete 'needs' job (id=%d)", blockedJob.ID, job.ID)
|
||||
}
|
||||
|
||||
outputs, err := actions_model.FindTaskOutputByTaskID(ctx, job.TaskID)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed loading task outputs: %w", err)
|
||||
return behaviourError, fmt.Errorf("failed loading task outputs: %w", err)
|
||||
}
|
||||
|
||||
outputsMap := make(map[string]string, len(outputs))
|
||||
|
|
@ -283,10 +309,10 @@ func tryHandleIncompleteMatrix(ctx context.Context, blockedJob *actions_model.Ac
|
|||
blockedJob.Run,
|
||||
actions_model.ErrorCodeJobParsingError,
|
||||
[]any{err.Error()}); err != nil {
|
||||
return false, fmt.Errorf("setting run into PreExecutionError state failed: %w", err)
|
||||
return behaviourError, fmt.Errorf("setting run into PreExecutionError state failed: %w", err)
|
||||
}
|
||||
// Return `true` to skip running this job in this invalid state
|
||||
return true, nil
|
||||
// `FailRunPreExecutionError` will mark all the pending runs in the job failed; ignore all of them.
|
||||
return behaviourIgnoreAllJobsInRun, nil
|
||||
}
|
||||
|
||||
// Even though every job in the `needs` list is done, perform a consistency check if the job was still unable to be
|
||||
|
|
@ -303,24 +329,24 @@ func tryHandleIncompleteMatrix(ctx context.Context, blockedJob *actions_model.Ac
|
|||
if swf.IncompleteMatrix {
|
||||
errorCode, errorDetails := persistentIncompleteMatrixError(blockedJob, swf.IncompleteMatrixNeeds)
|
||||
if err := FailRunPreExecutionError(ctx, blockedJob.Run, errorCode, errorDetails); err != nil {
|
||||
return false, fmt.Errorf("setting run into PreExecutionError state failed: %w", err)
|
||||
return behaviourError, fmt.Errorf("setting run into PreExecutionError state failed: %w", err)
|
||||
}
|
||||
// Return `true` to skip running this job in this invalid state
|
||||
return true, nil
|
||||
// `FailRunPreExecutionError` will mark all the pending runs in the job failed; ignore all of them.
|
||||
return behaviourIgnoreAllJobsInRun, nil
|
||||
} else if swf.IncompleteRunsOn {
|
||||
errorCode, errorDetails := persistentIncompleteRunsOnError(blockedJob, swf.IncompleteRunsOnNeeds, swf.IncompleteRunsOnMatrix)
|
||||
if err := FailRunPreExecutionError(ctx, blockedJob.Run, errorCode, errorDetails); err != nil {
|
||||
return false, fmt.Errorf("setting run into PreExecutionError state failed: %w", err)
|
||||
return behaviourError, fmt.Errorf("setting run into PreExecutionError state failed: %w", err)
|
||||
}
|
||||
// Return `true` to skip running this job in this invalid state
|
||||
return true, nil
|
||||
// `FailRunPreExecutionError` will mark all the pending runs in the job failed; ignore all of them.
|
||||
return behaviourIgnoreAllJobsInRun, nil
|
||||
} else if swf.IncompleteWith {
|
||||
errorCode, errorDetails := persistentIncompleteWithError(blockedJob, swf.IncompleteWithNeeds, swf.IncompleteWithMatrix)
|
||||
if err := FailRunPreExecutionError(ctx, blockedJob.Run, errorCode, errorDetails); err != nil {
|
||||
return false, fmt.Errorf("setting run into PreExecutionError state failed: %w", err)
|
||||
return behaviourError, fmt.Errorf("setting run into PreExecutionError state failed: %w", err)
|
||||
}
|
||||
// Return `true` to skip running this job in this invalid state
|
||||
return true, nil
|
||||
// `FailRunPreExecutionError` will mark all the pending runs in the job failed; ignore all of them.
|
||||
return behaviourIgnoreAllJobsInRun, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -342,9 +368,10 @@ func tryHandleIncompleteMatrix(ctx context.Context, blockedJob *actions_model.Ac
|
|||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return false, err
|
||||
return behaviourError, err
|
||||
}
|
||||
return true, nil
|
||||
// job was deleted after it was replaced with one-or-more new jobs, so ignore it.
|
||||
return behaviourIgnoreJob, nil
|
||||
}
|
||||
|
||||
func persistentIncompleteMatrixError(job *actions_model.ActionRunJob, incompleteNeeds *jobparser.IncompleteNeeds) (actions_model.PreExecutionError, []any) {
|
||||
|
|
|
|||
|
|
@ -581,14 +581,14 @@ func Test_tryHandleIncompleteMatrix(t *testing.T) {
|
|||
jobsInRun, err := db.Find[actions_model.ActionRunJob](t.Context(), actions_model.FindRunJobOptions{RunID: blockedJob.RunID})
|
||||
require.NoError(t, err)
|
||||
|
||||
skip, err := tryHandleIncompleteMatrix(t.Context(), blockedJob, jobsInRun)
|
||||
behaviour, err := tryHandleIncompleteMatrix(t.Context(), blockedJob, jobsInRun)
|
||||
|
||||
if tt.errContains != "" {
|
||||
require.ErrorContains(t, err, tt.errContains)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
if tt.consumed {
|
||||
assert.True(t, skip, "skip flag")
|
||||
assert.Equal(t, behaviourIgnoreJob, behaviour)
|
||||
|
||||
// blockedJob should no longer exist in the database
|
||||
unittest.AssertNotExistsBean(t, &actions_model.ActionRunJob{ID: tt.runJobID})
|
||||
|
|
@ -660,10 +660,10 @@ func Test_tryHandleIncompleteMatrix(t *testing.T) {
|
|||
blockedJobReloaded := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{ID: tt.runJobID})
|
||||
assert.Equal(t, actions_model.StatusFailure, blockedJobReloaded.Status)
|
||||
|
||||
// skip is set to true
|
||||
assert.True(t, skip, "skip flag")
|
||||
// ensure all other jobs in this run are ignored
|
||||
assert.Equal(t, behaviourIgnoreAllJobsInRun, behaviour)
|
||||
} else {
|
||||
assert.False(t, skip, "skip flag")
|
||||
assert.Equal(t, behaviourExecuteJob, behaviour)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue