diff --git a/models/actions/run_job.go b/models/actions/run_job.go index 1d70cde877..926eb9f0de 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -137,6 +137,22 @@ func GetRunJobsByRunID(ctx context.Context, runID int64) ([]*ActionRunJob, error return jobs, nil } +// Check if the ActionRun has any jobs other than those included in the jobs parameter. +func RunHasOtherJobs(ctx context.Context, runID int64, jobs []*ActionRunJob) (bool, error) { + jobIDs := make([]int64, len(jobs)) + for i, job := range jobs { + jobIDs[i] = job.ID + } + otherJobs, err := db.GetEngine(ctx). + Where("run_id = ?", runID). + Where(builder.NotIn("id", jobIDs)). + Count(&ActionRunJob{}) + if err != nil { + return false, err + } + return otherJobs > 0, nil +} + // All calls to UpdateRunJobWithoutNotification that change run.Status for any run from a not done status to a done status must call the ActionRunNowDone notification channel. // Use the wrapper function UpdateRunJob instead. func UpdateRunJobWithoutNotification(ctx context.Context, job *ActionRunJob, cond builder.Cond, cols ...string) (int64, error) { diff --git a/models/actions/run_job_test.go b/models/actions/run_job_test.go index 7c204d4eb9..90080473c7 100644 --- a/models/actions/run_job_test.go +++ b/models/actions/run_job_test.go @@ -281,3 +281,23 @@ func TestActionRunJob_HasIncompleteWith(t *testing.T) { }) } } + +func TestRunHasOtherJobs(t *testing.T) { + require.NoError(t, unittest.PrepareTestDatabase()) + + jobs, err := GetRunJobsByRunID(t.Context(), 791) + require.NoError(t, err) + assert.Len(t, jobs, 1) + + has, err := RunHasOtherJobs(t.Context(), 791, nil) + require.NoError(t, err) + assert.True(t, has) + + has, err = RunHasOtherJobs(t.Context(), 791, []*ActionRunJob{}) + require.NoError(t, err) + assert.True(t, has) + + has, err = RunHasOtherJobs(t.Context(), 791, jobs) + require.NoError(t, err) + assert.False(t, has) +} diff --git a/services/actions/Test_checkJobsOfRun/action_run.yml b/services/actions/Test_checkJobsOfRun/action_run.yml new file mode 100644 index 0000000000..1033fb43f8 --- /dev/null +++ b/services/actions/Test_checkJobsOfRun/action_run.yml @@ -0,0 +1,18 @@ +- + id: 900 + title: "running workflow_dispatch run" + repo_id: 63 + owner_id: 2 + workflow_id: "running.yaml" + index: 4 + trigger_user_id: 2 + ref: "refs/heads/main" + commit_sha: "97f29ee599c373c729132a5c46a046978311e0ee" + trigger_event: "workflow_dispatch" + is_fork_pull_request: 0 + status: 6 # running + started: 1683636528 + created: 1683636108 + updated: 1683636626 + need_approval: 0 + approved_by: 0 diff --git a/services/actions/Test_checkJobsOfRun/action_run_job.yml b/services/actions/Test_checkJobsOfRun/action_run_job.yml new file mode 100644 index 0000000000..3ececeb458 --- /dev/null +++ b/services/actions/Test_checkJobsOfRun/action_run_job.yml @@ -0,0 +1,40 @@ +- + id: 601 + run_id: 900 + repo_id: 63 + owner_id: 2 + commit_sha: 97f29ee599c373c729132a5c46a046978311e0ee + is_fork_pull_request: 0 + name: job_1 + attempt: 0 + job_id: produce-artifacts + task_id: 0 + status: 7 # blocked + runs_on: '["fedora"]' + needs: '["define-matrix"]' + workflow_payload: | + "on": + push: + jobs: + produce-artifacts: + name: produce-artifacts (incomplete matrix) + runs-on: docker + steps: + - run: echo "OK!" + strategy: + matrix: + color: ${{ fromJSON(needs.define-matrix.outputs.colors) }} + incomplete_matrix: true +- + id: 602 + run_id: 900 + repo_id: 63 + owner_id: 2 + commit_sha: 97f29ee599c373c729132a5c46a046978311e0ee + is_fork_pull_request: 0 + name: define-matrix + attempt: 0 + job_id: define-matrix + task_id: 100 + status: 1 # success + runs_on: '["fedora"]' diff --git a/services/actions/Test_checkJobsOfRun/action_task_output.yml b/services/actions/Test_checkJobsOfRun/action_task_output.yml new file mode 100644 index 0000000000..b678ac22a9 --- /dev/null +++ b/services/actions/Test_checkJobsOfRun/action_task_output.yml @@ -0,0 +1,5 @@ +- + id: 100 + task_id: 100 + output_key: colors + output_value: '["red", "blue", "green"]' diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go index bf4881ea04..a7f4273338 100644 --- a/services/actions/job_emitter.go +++ b/services/actions/job_emitter.go @@ -44,7 +44,7 @@ func jobEmitterQueueHandler(items ...*jobUpdate) []*jobUpdate { ctx := graceful.GetManager().ShutdownContext() var ret []*jobUpdate for _, update := range items { - if err := checkJobsOfRun(ctx, update.RunID); err != nil { + if err := checkJobsOfRun(ctx, update.RunID, 0); err != nil { logger.Error("checkJobsOfRun failed for RunID = %d: %v", update.RunID, err) ret = append(ret, update) } @@ -52,7 +52,15 @@ func jobEmitterQueueHandler(items ...*jobUpdate) []*jobUpdate { return ret } -func checkJobsOfRun(ctx context.Context, runID int64) error { +func checkJobsOfRun(ctx context.Context, runID int64, recursionCount int) error { + // Recursion happens if one job finishing causes another job to be evaluated so that it creates new jobs (eg. + // dynamic matrix), those new jobs need to have their 'needs' re-evaluated. Safety check here against infinite + // recursion -- no clear reason this should happen more than once in a check since after one recurse there aren't + // any actual new jobs completed, but better safe than sorry. + if recursionCount > 5 { + return fmt.Errorf("checkJobsOfRun for runID %d hit recursion limit %d", runID, recursionCount) + } + jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: runID}) if err != nil { return err @@ -107,6 +115,17 @@ func checkJobsOfRun(ctx context.Context, runID int64) error { return err } CreateCommitStatus(ctx, jobs...) + + // tryHandleIncompleteMatrix can create new jobs in this run which may initially be persisted in the DB as blocked + // because they have non-empty `needs`. In that case, we need to recursively run the job emitter so that new jobs + // are recognized as having their `needs` completed and be set as unblocked. Check if any new jobs were created and + // rerun the job emitter if so. + if hasNewJobs, err := actions_model.RunHasOtherJobs(ctx, runID, jobs); err != nil { + return fmt.Errorf("RunHasOtherJobs error: %w", err) + } else if hasNewJobs { + return checkJobsOfRun(ctx, runID, recursionCount+1) + } + return nil } diff --git a/services/actions/job_emitter_test.go b/services/actions/job_emitter_test.go index 6899508f37..7d5853ce1a 100644 --- a/services/actions/job_emitter_test.go +++ b/services/actions/job_emitter_test.go @@ -788,3 +788,32 @@ func Test_tryHandleWorkflowCallOuterJob(t *testing.T) { }) } } + +func Test_checkJobsOfRun_ExpandsMatrixWithCorrectOutputJobStatuses(t *testing.T) { + defer unittest.OverrideFixtures("services/actions/Test_checkJobsOfRun")() + require.NoError(t, unittest.PrepareTestDatabase()) + + jobs, err := actions_model.GetRunJobsByRunID(t.Context(), 900) + require.NoError(t, err) + require.Len(t, jobs, 2) + + require.NoError(t, checkJobsOfRun(t.Context(), 900, 0)) + + jobs, err = actions_model.GetRunJobsByRunID(t.Context(), 900) + require.NoError(t, err) + assert.Len(t, jobs, 4) + for _, job := range jobs { + switch job.Name { + case "define-matrix": + assert.Equal(t, actions_model.StatusSuccess, job.Status) + case "produce-artifacts (blue)": + fallthrough + case "produce-artifacts (green)": + fallthrough + case "produce-artifacts (red)": + assert.Equal(t, actions_model.StatusWaiting, job.Status) + default: + assert.Fail(t, "unexpected job name") + } + } +}