mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2026-05-12 22:10:25 +00:00
[v14.0/forgejo] fix: make concurrency group job cancellation effect runs that are failed (#10871)
**Backport:** https://codeberg.org/forgejo/forgejo/pulls/10863 When an action's job fails, it marks the entire run as failed. Concurrency group cancellation was only looking for runs that are in a pending state, and therefore after a single job failed in the run, none of the other jobs in the run could be cancelled by a matching cancel-in-progress job. Raised in https://codeberg.org/Codeberg/Community/issues/2315. Co-authored-by: Mathieu Fenniak <mathieu@fenniak.net> Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/10871 Reviewed-by: Michael Kriese <michael.kriese@gmx.de> Co-authored-by: forgejo-backport-action <forgejo-backport-action@noreply.codeberg.org> Co-committed-by: forgejo-backport-action <forgejo-backport-action@noreply.codeberg.org>
This commit is contained in:
parent
e147d8d805
commit
16ee36b023
3 changed files with 48 additions and 36 deletions
|
|
@ -5,7 +5,6 @@ package actions
|
|||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"forgejo.org/models/db"
|
||||
repo_model "forgejo.org/models/repo"
|
||||
|
|
@ -65,15 +64,14 @@ func (runs RunList) LoadRepos(ctx context.Context) error {
|
|||
|
||||
type FindRunOptions struct {
|
||||
db.ListOptions
|
||||
RepoID int64
|
||||
OwnerID int64
|
||||
WorkflowID string
|
||||
Ref string // the commit/tag/… that caused this workflow
|
||||
TriggerUserID int64
|
||||
TriggerEvent webhook_module.HookEventType
|
||||
Approved bool // not util.OptionalBool, it works only when it's true
|
||||
Status []Status
|
||||
ConcurrencyGroup string
|
||||
RepoID int64
|
||||
OwnerID int64
|
||||
WorkflowID string
|
||||
Ref string // the commit/tag/… that caused this workflow
|
||||
TriggerUserID int64
|
||||
TriggerEvent webhook_module.HookEventType
|
||||
Approved bool // not util.OptionalBool, it works only when it's true
|
||||
Status []Status
|
||||
}
|
||||
|
||||
func (opts FindRunOptions) ToConds() builder.Cond {
|
||||
|
|
@ -102,9 +100,6 @@ func (opts FindRunOptions) ToConds() builder.Cond {
|
|||
if opts.TriggerEvent != "" {
|
||||
cond = cond.And(builder.Eq{"trigger_event": opts.TriggerEvent})
|
||||
}
|
||||
if opts.ConcurrencyGroup != "" {
|
||||
cond = cond.And(builder.Eq{"concurrency_group": strings.ToLower(opts.ConcurrencyGroup)})
|
||||
}
|
||||
return cond
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
actions_model "forgejo.org/models/actions"
|
||||
|
|
@ -212,7 +213,7 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
|
|||
// Iterate over each found run and cancel its associated jobs.
|
||||
errorSlice := []error{}
|
||||
for _, run := range runs {
|
||||
err := cancelJobsForRun(ctx, run)
|
||||
err := cancelJobsForRun(ctx, run.ID)
|
||||
errorSlice = append(errorSlice, err)
|
||||
}
|
||||
err = errors.Join(errorSlice...)
|
||||
|
|
@ -225,22 +226,27 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
|
|||
|
||||
// Cancels all pending jobs in the same repository with the same concurrency group.
|
||||
func CancelPreviousWithConcurrencyGroup(ctx context.Context, repoID int64, concurrencyGroup string) error {
|
||||
runs, _, err := db.FindAndCount[actions_model.ActionRun](ctx, actions_model.FindRunOptions{
|
||||
RepoID: repoID,
|
||||
ConcurrencyGroup: concurrencyGroup,
|
||||
Status: []actions_model.Status{actions_model.StatusRunning, actions_model.StatusWaiting, actions_model.StatusBlocked},
|
||||
})
|
||||
if err != nil {
|
||||
// Find all runs in the concurrency group which have at least one job that is still pending; we can't use the run's
|
||||
// status for this because runs are set to failed if a single job is marked as failed, even if other jobs are still
|
||||
// running.
|
||||
runIDs := make([]int64, 0, 10)
|
||||
if err := db.GetEngine(ctx).Table("action_run").
|
||||
Join("INNER", "action_run_job", "action_run_job.run_id = action_run.id").
|
||||
Where("action_run.repo_id = ? AND action_run.concurrency_group = ?", repoID, strings.ToLower(concurrencyGroup)).
|
||||
In("action_run_job.status", actions_model.PendingStatuses()).
|
||||
Distinct("action_run.id").
|
||||
Select("action_run.id").
|
||||
Find(&runIDs); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Iterate over each found run and cancel its associated jobs.
|
||||
errorSlice := []error{}
|
||||
for _, run := range runs {
|
||||
err := cancelJobsForRun(ctx, run)
|
||||
for _, runID := range runIDs {
|
||||
err := cancelJobsForRun(ctx, runID)
|
||||
errorSlice = append(errorSlice, err)
|
||||
}
|
||||
err = errors.Join(errorSlice...)
|
||||
err := errors.Join(errorSlice...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -248,10 +254,10 @@ func CancelPreviousWithConcurrencyGroup(ctx context.Context, repoID int64, concu
|
|||
return nil
|
||||
}
|
||||
|
||||
func cancelJobsForRun(ctx context.Context, run *actions_model.ActionRun) error {
|
||||
func cancelJobsForRun(ctx context.Context, runID int64) error {
|
||||
// Find all jobs associated with the current run.
|
||||
jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{
|
||||
RunID: run.ID,
|
||||
RunID: runID,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
@ -290,7 +296,7 @@ func cancelJobsForRun(ctx context.Context, run *actions_model.ActionRun) error {
|
|||
|
||||
// If the job has an associated task, try to stop the task, effectively cancelling the job.
|
||||
if err := StopTask(ctx, job.TaskID, actions_model.StatusCancelled); err != nil {
|
||||
errorSlice = append(errorSlice, errors.New("job has changed, try again"))
|
||||
errorSlice = append(errorSlice, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -205,8 +205,10 @@ func TestCancelPreviousJobs(t *testing.T) {
|
|||
|
||||
func TestCancelPreviousWithConcurrencyGroup(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
updateRun901 map[string]any
|
||||
name string
|
||||
updateRun901 map[string]any
|
||||
updateRun901Jobs map[string]any
|
||||
expected901Status actions_model.Status
|
||||
}{
|
||||
// run 900 & 901 in the fixture data have almost the same data and so should both be cancelled by
|
||||
// TestCancelPreviousWithConcurrencyGroup -- but each test case will vary something different about 601 to
|
||||
|
|
@ -220,8 +222,15 @@ func TestCancelPreviousWithConcurrencyGroup(t *testing.T) {
|
|||
updateRun901: map[string]any{"concurrency_group": "321cba"},
|
||||
},
|
||||
{
|
||||
name: "only cancels running",
|
||||
updateRun901: map[string]any{"status": actions_model.StatusSuccess},
|
||||
name: "only cancels running",
|
||||
updateRun901: map[string]any{"status": actions_model.StatusSuccess},
|
||||
updateRun901Jobs: map[string]any{"status": actions_model.StatusSuccess},
|
||||
expected901Status: actions_model.StatusSuccess,
|
||||
},
|
||||
{
|
||||
name: "cancels running job in failed run",
|
||||
updateRun901: map[string]any{"status": actions_model.StatusFailure}, // still has a running job in it (601), but run is marked as failed as-if another job had failed in it
|
||||
expected901Status: actions_model.StatusCancelled,
|
||||
},
|
||||
} {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
|
|
@ -235,18 +244,20 @@ func TestCancelPreviousWithConcurrencyGroup(t *testing.T) {
|
|||
affected, err := e.Table(&actions_model.ActionRun{}).Where("id = ?", 901).Update(tc.updateRun901)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 1, affected)
|
||||
newStatus, ok := tc.updateRun901["status"]
|
||||
if ok {
|
||||
expected901Status = newStatus.(actions_model.Status)
|
||||
}
|
||||
}
|
||||
if tc.updateRun901Jobs != nil {
|
||||
affected, err := e.Table(&actions_model.ActionRunJob{}).Where("run_id = ?", 901).Update(tc.updateRun901Jobs)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 1, affected)
|
||||
}
|
||||
if tc.expected901Status != actions_model.StatusUnknown {
|
||||
expected901Status = tc.expected901Status
|
||||
}
|
||||
|
||||
run := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: 900})
|
||||
assert.Equal(t, actions_model.StatusRunning, run.Status)
|
||||
assert.EqualValues(t, 1683636626, run.Updated)
|
||||
assert.Equal(t, "abc123", run.ConcurrencyGroup)
|
||||
run = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{ID: 901})
|
||||
assert.Equal(t, expected901Status, run.Status)
|
||||
runJob := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{RunID: 900})
|
||||
assert.Equal(t, actions_model.StatusRunning, runJob.Status)
|
||||
assert.EqualValues(t, 1683636528, runJob.Started)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue