fix: empty dynamic matrix can leave action run hanging incomplete (#11063)

Fixes #11030.

When a `strategy.matrix` needs to be evaluated on the output of another job, it can become evaluated into an empty set of jobs.  In this case, and assuming no other jobs in the run are active, the run should reach a settled state.  The logic to check the other jobs in the run and determine if this state has been hit needs to be explicitly added to the job emitter.

To accomplish this change, this action run state logic was extracted out of `UpdateRunJobWithoutNotification` where it could be reused.

## Checklist

The [contributor guide](https://forgejo.org/docs/next/contributor/) contains information that will be helpful to first time contributors. There also are a few [conditions for merging Pull Requests in Forgejo repositories](https://codeberg.org/forgejo/governance/src/branch/main/PullRequestsAgreement.md). You are also welcome to join the [Forgejo development chatroom](https://matrix.to/#/#forgejo-development:matrix.org).

### Tests

- I added test coverage for Go changes...
  - [x] in their respective `*_test.go` for unit tests.
  - [ ] in the `tests/integration` directory if it involves interactions with a live Forgejo server.
- I added test coverage for JavaScript changes...
  - [ ] in `web_src/js/*.test.js` if it can be unit tested.
  - [ ] in `tests/e2e/*.test.e2e.js` if it requires interactions with a live Forgejo server (see also the [developer guide for JavaScript testing](https://codeberg.org/forgejo/forgejo/src/branch/forgejo/tests/e2e/README.md#end-to-end-tests)).

### Documentation

- [ ] I created a pull request [to the documentation](https://codeberg.org/forgejo/docs) to explain to Forgejo users how to use this change.
- [x] I did not document these changes and I do not expect someone else to do it.

### Release notes

- [x] This change will be noticed by a Forgejo user or admin (feature, bug fix, performance, etc.). I suggest to include a release note for this change.
- [ ] This change is not visible to a Forgejo user or admin (refactor, dependency upgrade, etc.). I think there is no need to add a release note for this change.

*The decision if the pull request will be shown in the release notes is up to the mergers / release team.*

The content of the `release-notes/<pull request number>.md` file will serve as the basis for the release notes. If the file does not exist, the title of the pull request will be used instead.

Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/11063
Reviewed-by: Gusted <gusted@noreply.codeberg.org>
Co-authored-by: Mathieu Fenniak <mathieu@fenniak.net>
Co-committed-by: Mathieu Fenniak <mathieu@fenniak.net>
This commit is contained in:
Mathieu Fenniak 2026-01-27 17:10:59 +01:00 committed by Mathieu Fenniak
parent 02bb3ef7a6
commit c198082975
8 changed files with 273 additions and 32 deletions

View file

@ -524,4 +524,35 @@ func UpdateRunWithoutNotification(ctx context.Context, run *ActionRun, cols ...s
return nil
}
// Compute the Status, Started, and Stopped fields of an ActionRun based upon the current job state within the run.
// Returned is the [ActionRun] with modifications if necessary, a slice of column names that have been updated, or an
// error if the calculation failed. The caller is responsible for then invoking [actions_service.UpdateRun] for an
// update with notifications, or [actions_model.UpdateRunWithoutNotification] if notifications are already handled.
func ComputeRunStatus(ctx context.Context, runID int64) (run *ActionRun, columns []string, err error) {
run, err = GetRunByID(ctx, runID)
if err != nil {
return nil, nil, err
}
jobs, err := GetRunJobsByRunID(ctx, runID)
if err != nil {
return nil, nil, err
}
newStatus := AggregateJobStatus(jobs)
if run.Status != newStatus {
run.Status = newStatus
columns = append(columns, "status")
}
if run.Started.IsZero() && run.Status.IsRunning() {
run.Started = timeutil.TimeStampNow()
columns = append(columns, "started")
}
if run.Stopped.IsZero() && run.Status.IsDone() {
run.Stopped = timeutil.TimeStampNow()
columns = append(columns, "stopped")
}
return run, columns, nil
}
type ActionRunIndex db.ResourceIndex

View file

@ -174,38 +174,12 @@ func UpdateRunJobWithoutNotification(ctx context.Context, job *ActionRunJob, con
}
}
{
// Other goroutines may aggregate the status of the run and update it too.
// So we need load the run and its jobs before updating the run.
run, err := GetRunByID(ctx, job.RunID)
if err != nil {
return 0, err
}
jobs, err := GetRunJobsByRunID(ctx, job.RunID)
if err != nil {
return 0, err
}
updateRequired := false
newStatus := AggregateJobStatus(jobs)
if run.Status != newStatus {
run.Status = newStatus
updateRequired = true
}
if run.Started.IsZero() && run.Status.IsRunning() {
run.Started = timeutil.TimeStampNow()
updateRequired = true
}
if run.Stopped.IsZero() && run.Status.IsDone() {
run.Stopped = timeutil.TimeStampNow()
updateRequired = true
}
if updateRequired {
// As the caller has to ensure the ActionRunNowDone notification is sent we can ignore doing so here.
if err := UpdateRunWithoutNotification(ctx, run, "status", "started", "stopped"); err != nil {
return 0, fmt.Errorf("update run %d: %w", run.ID, err)
}
}
run, columns, err := ComputeRunStatus(ctx, job.RunID)
if err != nil {
return 0, fmt.Errorf("compute run status: %w", err)
}
if err := UpdateRunWithoutNotification(ctx, run, columns...); err != nil {
return 0, fmt.Errorf("update run %d: %w", run.ID, err)
}
return affected, nil

View file

@ -375,3 +375,122 @@ jobs:
// Expect job with an incomplete with to be StatusBlocked:
assert.Equal(t, StatusBlocked, job.Status)
}
func TestComputeRunStatus(t *testing.T) {
require.NoError(t, unittest.PrepareTestDatabase())
t.Run("no changes", func(t *testing.T) {
run, columns, err := ComputeRunStatus(t.Context(), 791)
require.NoError(t, err)
assert.Equal(t, StatusSuccess, run.Status)
assert.NotContains(t, columns, "status")
assert.EqualValues(t, 1683636528, run.Started)
assert.NotContains(t, columns, "started")
assert.EqualValues(t, 1683636626, run.Stopped)
assert.NotContains(t, columns, "stopped")
})
t.Run("change status", func(t *testing.T) {
job := unittest.AssertExistsAndLoadBean(t, &ActionRunJob{ID: 192})
job.Status = StatusFailure
affected, err := db.GetEngine(t.Context()).Cols("status").ID(job.ID).Update(job)
require.NoError(t, err)
require.EqualValues(t, 1, affected)
run, columns, err := ComputeRunStatus(t.Context(), 791)
require.NoError(t, err)
assert.Equal(t, StatusFailure, run.Status)
assert.Contains(t, columns, "status")
assert.NotContains(t, columns, "started")
assert.NotContains(t, columns, "stopped")
})
t.Run("won't change started if not running", func(t *testing.T) {
job := unittest.AssertExistsAndLoadBean(t, &ActionRunJob{ID: 192})
job.Status = StatusBlocked
affected, err := db.GetEngine(t.Context()).Cols("status").ID(job.ID).Update(job)
require.NoError(t, err)
require.EqualValues(t, 1, affected)
preRun := unittest.AssertExistsAndLoadBean(t, &ActionRun{ID: 791})
preRun.Started = 0
affected, err = db.GetEngine(t.Context()).Cols("started").ID(preRun.ID).Update(preRun)
require.NoError(t, err)
require.EqualValues(t, 1, affected)
run, columns, err := ComputeRunStatus(t.Context(), 791)
require.NoError(t, err)
assert.Equal(t, StatusBlocked, run.Status)
assert.EqualValues(t, 0, run.Started)
assert.Contains(t, columns, "status")
assert.NotContains(t, columns, "started")
assert.NotContains(t, columns, "stopped")
})
t.Run("change started", func(t *testing.T) {
// Need the job to be "Running" for started to appear to change
job := unittest.AssertExistsAndLoadBean(t, &ActionRunJob{ID: 192})
job.Status = StatusRunning
affected, err := db.GetEngine(t.Context()).Cols("status").ID(job.ID).Update(job)
require.NoError(t, err)
require.EqualValues(t, 1, affected)
preRun := unittest.AssertExistsAndLoadBean(t, &ActionRun{ID: 791})
preRun.Started = 0
affected, err = db.GetEngine(t.Context()).Cols("started").ID(preRun.ID).Update(preRun)
require.NoError(t, err)
require.EqualValues(t, 1, affected)
run, columns, err := ComputeRunStatus(t.Context(), 791)
require.NoError(t, err)
assert.Equal(t, StatusRunning, run.Status)
assert.NotEqualValues(t, 0, run.Started)
assert.Contains(t, columns, "status")
assert.Contains(t, columns, "started")
assert.NotContains(t, columns, "stopped")
})
t.Run("won't change stopped if not done", func(t *testing.T) {
job := unittest.AssertExistsAndLoadBean(t, &ActionRunJob{ID: 192})
job.Status = StatusRunning
affected, err := db.GetEngine(t.Context()).Cols("status").ID(job.ID).Update(job)
require.NoError(t, err)
require.EqualValues(t, 1, affected)
preRun := unittest.AssertExistsAndLoadBean(t, &ActionRun{ID: 791})
preRun.Stopped = 0
affected, err = db.GetEngine(t.Context()).Cols("stopped").ID(preRun.ID).Update(preRun)
require.NoError(t, err)
require.EqualValues(t, 1, affected)
run, columns, err := ComputeRunStatus(t.Context(), 791)
require.NoError(t, err)
assert.Equal(t, StatusRunning, run.Status)
assert.EqualValues(t, 0, run.Stopped)
assert.Contains(t, columns, "status")
assert.NotContains(t, columns, "stopped")
})
t.Run("change stopped", func(t *testing.T) {
// Need the job to be some version of Done for stopped to appear to change
job := unittest.AssertExistsAndLoadBean(t, &ActionRunJob{ID: 192})
job.Status = StatusSuccess
affected, err := db.GetEngine(t.Context()).Cols("status").ID(job.ID).Update(job)
require.NoError(t, err)
require.EqualValues(t, 1, affected)
preRun := unittest.AssertExistsAndLoadBean(t, &ActionRun{ID: 791})
preRun.Stopped = 0
affected, err = db.GetEngine(t.Context()).Cols("stopped").ID(preRun.ID).Update(preRun)
require.NoError(t, err)
require.EqualValues(t, 1, affected)
run, columns, err := ComputeRunStatus(t.Context(), 791)
require.NoError(t, err)
assert.Equal(t, StatusSuccess, run.Status)
assert.NotEqualValues(t, 0, run.Stopped)
assert.NotContains(t, columns, "status")
assert.NotContains(t, columns, "started")
assert.Contains(t, columns, "stopped")
})
}

View file

@ -416,3 +416,22 @@
need_approval: 0
approved_by: 0
concurrency_group: abc123
-
id: 922
title: "running workflow_dispatch run"
repo_id: 63
owner_id: 2
workflow_id: "running.yaml"
index: 26
trigger_user_id: 2
ref: "refs/heads/main"
commit_sha: "97f29ee599c373c729132a5c46a046978311e0ee"
trigger_event: "workflow_dispatch"
is_fork_pull_request: 0
status: 6 # running
started: 1683636528
created: 1683636108
updated: 1683636626
need_approval: 0
approved_by: 0
concurrency_group: abc123

View file

@ -852,3 +852,44 @@
task_id: 108
status: 1 # success
runs_on: '["fedora"]'
-
id: 642
run_id: 922
repo_id: 63
owner_id: 2
commit_sha: 97f29ee599c373c729132a5c46a046978311e0ee
is_fork_pull_request: 0
name: job2
attempt: 0
job_id: job2
task_id: 0
status: 7 # blocked
runs_on: '[]'
needs: '["job1"]'
workflow_payload: |
on:
push:
branches: [main]
jobs:
job2:
runs-on: default
strategy:
matrix:
name: ${{ fromJSON(needs.job1.outputs.empty-list) }}
steps:
- run: echo "${{ matrix.name }}"
incomplete_matrix: true
-
id: 643
run_id: 922
repo_id: 63
owner_id: 2
commit_sha: 97f29ee599c373c729132a5c46a046978311e0ee
is_fork_pull_request: 0
name: job1
attempt: 0
job_id: job1
task_id: 110
status: 1 # success
runs_on: '["fedora"]'

View file

@ -58,3 +58,8 @@
task_id: 109
output_key: workflow_input
output_value: my-workflow-input
-
id: 112
task_id: 110
output_key: empty-list
output_value: "[]"

View file

@ -365,6 +365,19 @@ func tryHandleIncompleteMatrix(ctx context.Context, blockedJob *actions_model.Ac
return fmt.Errorf("unexpected record count in delete incomplete_matrix=true job with ID %d; count = %d", blockedJob.ID, count)
}
// If len(newJobWorkflows) is 0, and blockedJob was the last job in this run, then the job will be complete --
// ComputeRunStatus will check for that state.
run, columns, err := actions_model.ComputeRunStatus(ctx, blockedJob.RunID)
if err != nil {
return fmt.Errorf("compute run status: %w", err)
}
if len(columns) != 0 {
err := UpdateRun(ctx, run, columns...)
if err != nil {
return fmt.Errorf("update run: %w", err)
}
}
return nil
})
if err != nil {

View file

@ -13,6 +13,7 @@ import (
"forgejo.org/models/db"
"forgejo.org/models/unittest"
"forgejo.org/modules/test"
notify_service "forgejo.org/services/notify"
"code.forgejo.org/forgejo/runner/v12/act/jobparser"
"code.forgejo.org/forgejo/runner/v12/act/model"
@ -276,6 +277,20 @@ jobs:
steps: []
`
type callArgsActionRunNowDone struct {
run *actions_model.ActionRun
priorStatus actions_model.Status
lastRun *actions_model.ActionRun
}
type mockNotifier struct {
notify_service.NullNotifier
calls []*callArgsActionRunNowDone
}
func (m *mockNotifier) ActionRunNowDone(ctx context.Context, run *actions_model.ActionRun, priorStatus actions_model.Status, lastRun *actions_model.ActionRun) {
m.calls = append(m.calls, &callArgsActionRunNowDone{run, priorStatus, lastRun})
}
func Test_tryHandleIncompleteMatrix(t *testing.T) {
// Shouldn't get any decoding errors during this test -- pop them up from a log warning to a test fatal error.
defer test.MockVariableValue(&model.OnDecodeNodeError, func(node yaml.Node, out any, err error) {
@ -300,6 +315,7 @@ func Test_tryHandleIncompleteMatrix(t *testing.T) {
needs map[string][]string
expectIncompleteJob []string
localReusableWorkflowCallArgs *localReusableWorkflowCallArgs
actionRunStatusChange actions_model.Status
}{
{
name: "not incomplete",
@ -541,12 +557,26 @@ func Test_tryHandleIncompleteMatrix(t *testing.T) {
path: "./.forgejo/workflows/reusable.yml",
},
},
{
name: "action run completed after expansion",
runJobID: 642,
consumed: true,
runJobNames: []string{
"job1",
// job2 which expanded into an empty matrix is gone
},
actionRunStatusChange: actions_model.StatusSuccess,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
defer unittest.OverrideFixtures("services/actions/Test_tryHandleIncompleteMatrix")()
require.NoError(t, unittest.PrepareTestDatabase())
notifier := &mockNotifier{}
notify_service.RegisterNotifier(notifier)
defer notify_service.UnregisterNotifier(notifier)
// Mock access to reusable workflows, both local and remote
var localReusableCalled []*localReusableWorkflowCallArgs
var cleanupCallCount int
@ -596,6 +626,15 @@ func Test_tryHandleIncompleteMatrix(t *testing.T) {
// expectations are that the ActionRun has an empty PreExecutionError
actionRun := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: blockedJob.RunID})
assert.EqualValues(t, 0, actionRun.PreExecutionErrorCode, "PreExecutionError Details: %#v", actionRun.PreExecutionErrorDetails)
if tt.actionRunStatusChange != 0 {
assert.Equal(t, tt.actionRunStatusChange, actionRun.Status)
require.Len(t, notifier.calls, 1)
call := notifier.calls[0]
assert.Equal(t, actionRun.ID, call.run.ID)
assert.Nil(t, call.lastRun)
assert.Equal(t, actions_model.StatusRunning, call.priorStatus)
assert.Equal(t, tt.actionRunStatusChange, call.run.Status)
}
// compare jobs that exist with `runJobNames` to ensure new jobs are inserted:
allJobsInRun, err := db.Find[actions_model.ActionRunJob](t.Context(), actions_model.FindRunJobOptions{RunID: blockedJob.RunID})