diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go index 389b19ac92..61e77744c9 100644 --- a/services/actions/job_emitter.go +++ b/services/actions/job_emitter.go @@ -70,11 +70,21 @@ func checkJobsOfRun(ctx context.Context, runID int64) error { updateColumns := []string{"status"} if status == actions_model.StatusWaiting { - ignore, err := tryHandleIncompleteMatrix(ctx, job, jobs) - if err != nil { + behaviour, err := tryHandleIncompleteMatrix(ctx, job, jobs) + switch behaviour { + case behaviourError: return fmt.Errorf("error in tryHandleIncompleteMatrix: %w", err) - } else if ignore { + + case behaviourExecuteJob: + // Intentional blank case -- proceed with updating the status of the job to waiting. + + case behaviourIgnoreJob: + // Skip updating this job's status to waiting, continue with other jobs in the run. continue + + case behaviourIgnoreAllJobsInRun: + // Stop processing any other jobs in this run. + return nil } } else if status == actions_model.StatusSuccess || status == actions_model.StatusFailure { // Transition to these states can be triggered by workflow call outer jobs @@ -211,31 +221,47 @@ func (r *jobStatusResolver) resolve() map[int64]actions_model.Status { return ret } +type behaviour int + +const ( + // behaviourError is used to indicate that there's no relevant behaviour due to an internal server error. + behaviourError behaviour = iota + + // behaviourExecuteJob indicates that the job is ready to be unblocked as normal. + behaviourExecuteJob + + // behaviourIgnoreJob indicates that the job should not be unblocked, and should instead be ignored. + behaviourIgnoreJob + + // behaviourIgnoreAllJobsInRun indicates that something went wrong and all jobs in the run should now be ignored. + behaviourIgnoreAllJobsInRun +) + // Invoked once a job has all its `needs` parameters met and is ready to transition to waiting, this may expand the // job's `strategy.matrix` into multiple new jobs. -func tryHandleIncompleteMatrix(ctx context.Context, blockedJob *actions_model.ActionRunJob, jobsInRun []*actions_model.ActionRunJob) (bool, error) { +func tryHandleIncompleteMatrix(ctx context.Context, blockedJob *actions_model.ActionRunJob, jobsInRun []*actions_model.ActionRunJob) (behaviour, error) { incompleteMatrix, _, err := blockedJob.HasIncompleteMatrix() if err != nil { - return false, fmt.Errorf("job HasIncompleteMatrix: %w", err) + return behaviourError, fmt.Errorf("job HasIncompleteMatrix: %w", err) } incompleteRunsOn, _, _, err := blockedJob.HasIncompleteRunsOn() if err != nil { - return false, fmt.Errorf("job HasIncompleteRunsOn: %w", err) + return behaviourError, fmt.Errorf("job HasIncompleteRunsOn: %w", err) } incompleteWith, _, _, err := blockedJob.HasIncompleteWith() if err != nil { - return false, fmt.Errorf("job HasIncompleteWith: %w", err) + return behaviourError, fmt.Errorf("job HasIncompleteWith: %w", err) } if !incompleteMatrix && !incompleteRunsOn && !incompleteWith { // Not relevant to attempt re-parsing the job if it wasn't marked as Incomplete[...] previously. - return false, nil + return behaviourExecuteJob, nil } if err := blockedJob.LoadRun(ctx); err != nil { - return false, fmt.Errorf("failure LoadRun in tryHandleIncompleteMatrix: %w", err) + return behaviourError, fmt.Errorf("failure LoadRun in tryHandleIncompleteMatrix: %w", err) } // Compute jobOutputs for all the other jobs required as needed by this job: @@ -247,13 +273,13 @@ func tryHandleIncompleteMatrix(ctx context.Context, blockedJob *actions_model.Ac } else if !job.Status.IsDone() { // Unexpected: `job` is needed by `blockedJob` but it isn't done; `jobStatusResolver` shouldn't be calling // `tryHandleIncompleteMatrix` in this case. - return false, fmt.Errorf( + return behaviourError, fmt.Errorf( "jobStatusResolver attempted to tryHandleIncompleteMatrix for a job (id=%d) with an incomplete 'needs' job (id=%d)", blockedJob.ID, job.ID) } outputs, err := actions_model.FindTaskOutputByTaskID(ctx, job.TaskID) if err != nil { - return false, fmt.Errorf("failed loading task outputs: %w", err) + return behaviourError, fmt.Errorf("failed loading task outputs: %w", err) } outputsMap := make(map[string]string, len(outputs)) @@ -283,10 +309,10 @@ func tryHandleIncompleteMatrix(ctx context.Context, blockedJob *actions_model.Ac blockedJob.Run, actions_model.ErrorCodeJobParsingError, []any{err.Error()}); err != nil { - return false, fmt.Errorf("setting run into PreExecutionError state failed: %w", err) + return behaviourError, fmt.Errorf("setting run into PreExecutionError state failed: %w", err) } - // Return `true` to skip running this job in this invalid state - return true, nil + // `FailRunPreExecutionError` will mark all the pending runs in the job failed; ignore all of them. + return behaviourIgnoreAllJobsInRun, nil } // Even though every job in the `needs` list is done, perform a consistency check if the job was still unable to be @@ -303,24 +329,24 @@ func tryHandleIncompleteMatrix(ctx context.Context, blockedJob *actions_model.Ac if swf.IncompleteMatrix { errorCode, errorDetails := persistentIncompleteMatrixError(blockedJob, swf.IncompleteMatrixNeeds) if err := FailRunPreExecutionError(ctx, blockedJob.Run, errorCode, errorDetails); err != nil { - return false, fmt.Errorf("setting run into PreExecutionError state failed: %w", err) + return behaviourError, fmt.Errorf("setting run into PreExecutionError state failed: %w", err) } - // Return `true` to skip running this job in this invalid state - return true, nil + // `FailRunPreExecutionError` will mark all the pending runs in the job failed; ignore all of them. + return behaviourIgnoreAllJobsInRun, nil } else if swf.IncompleteRunsOn { errorCode, errorDetails := persistentIncompleteRunsOnError(blockedJob, swf.IncompleteRunsOnNeeds, swf.IncompleteRunsOnMatrix) if err := FailRunPreExecutionError(ctx, blockedJob.Run, errorCode, errorDetails); err != nil { - return false, fmt.Errorf("setting run into PreExecutionError state failed: %w", err) + return behaviourError, fmt.Errorf("setting run into PreExecutionError state failed: %w", err) } - // Return `true` to skip running this job in this invalid state - return true, nil + // `FailRunPreExecutionError` will mark all the pending runs in the job failed; ignore all of them. + return behaviourIgnoreAllJobsInRun, nil } else if swf.IncompleteWith { errorCode, errorDetails := persistentIncompleteWithError(blockedJob, swf.IncompleteWithNeeds, swf.IncompleteWithMatrix) if err := FailRunPreExecutionError(ctx, blockedJob.Run, errorCode, errorDetails); err != nil { - return false, fmt.Errorf("setting run into PreExecutionError state failed: %w", err) + return behaviourError, fmt.Errorf("setting run into PreExecutionError state failed: %w", err) } - // Return `true` to skip running this job in this invalid state - return true, nil + // `FailRunPreExecutionError` will mark all the pending runs in the job failed; ignore all of them. + return behaviourIgnoreAllJobsInRun, nil } } } @@ -342,9 +368,10 @@ func tryHandleIncompleteMatrix(ctx context.Context, blockedJob *actions_model.Ac return nil }) if err != nil { - return false, err + return behaviourError, err } - return true, nil + // job was deleted after it was replaced with one-or-more new jobs, so ignore it. + return behaviourIgnoreJob, nil } func persistentIncompleteMatrixError(job *actions_model.ActionRunJob, incompleteNeeds *jobparser.IncompleteNeeds) (actions_model.PreExecutionError, []any) { diff --git a/services/actions/job_emitter_test.go b/services/actions/job_emitter_test.go index 6b8ccca6f9..754e29cc7f 100644 --- a/services/actions/job_emitter_test.go +++ b/services/actions/job_emitter_test.go @@ -581,14 +581,14 @@ func Test_tryHandleIncompleteMatrix(t *testing.T) { jobsInRun, err := db.Find[actions_model.ActionRunJob](t.Context(), actions_model.FindRunJobOptions{RunID: blockedJob.RunID}) require.NoError(t, err) - skip, err := tryHandleIncompleteMatrix(t.Context(), blockedJob, jobsInRun) + behaviour, err := tryHandleIncompleteMatrix(t.Context(), blockedJob, jobsInRun) if tt.errContains != "" { require.ErrorContains(t, err, tt.errContains) } else { require.NoError(t, err) if tt.consumed { - assert.True(t, skip, "skip flag") + assert.Equal(t, behaviourIgnoreJob, behaviour) // blockedJob should no longer exist in the database unittest.AssertNotExistsBean(t, &actions_model.ActionRunJob{ID: tt.runJobID}) @@ -660,10 +660,10 @@ func Test_tryHandleIncompleteMatrix(t *testing.T) { blockedJobReloaded := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{ID: tt.runJobID}) assert.Equal(t, actions_model.StatusFailure, blockedJobReloaded.Status) - // skip is set to true - assert.True(t, skip, "skip flag") + // ensure all other jobs in this run are ignored + assert.Equal(t, behaviourIgnoreAllJobsInRun, behaviour) } else { - assert.False(t, skip, "skip flag") + assert.Equal(t, behaviourExecuteJob, behaviour) } } })