jojo/models/actions/run_job.go
Andreas Ahlenstorf d1b69632aa fix: prevent jobs with unknown needs from running (#12046)
If Forgejo encounters an Actions workflow with unknown jobs in a needs definition, Forgejo will ignore those and run the job anyway. That is bad. For example, releases could be published without any testing because the name of the testing job was misspelt.

Workflow that demonstrates the problem:

```yaml
on:
  push:
  workflow_dispatch:
jobs:
  build:
    runs-on: debian
    steps:
      - run: |
          echo "OK"
  test:
    runs-on: debian
    needs: [does-not-exist]
    steps:
      - run: |
          echo "OK"
```

Now, before a workflow is run, Forgejo will check whether all jobs referenced in `needs` exist. If any of them does not, it raises a pre-execution error which fails the workflow immediately. It also displays an appropriate error to the user, for example:

```
Workflow was not executed due to an error that blocked the execution attempt.
Job with ID test references unknown jobs in `needs`: does-not-exist.
```

Futhermore, workflows with pre-execution errors can no longer be rerun, which was previously possible.

Original issue: https://code.forgejo.org/forgejo/runner/issues/977.

## Checklist

The [contributor guide](https://forgejo.org/docs/next/contributor/) contains information that will be helpful to first time contributors. All work and communication must conform to Forgejo's [AI Agreement](https://codeberg.org/forgejo/governance/src/branch/main/AIAgreement.md). There also are a few [conditions for merging Pull Requests in Forgejo repositories](https://codeberg.org/forgejo/governance/src/branch/main/PullRequestsAgreement.md). You are also welcome to join the [Forgejo development chatroom](https://matrix.to/#/#forgejo-development:matrix.org).

### Tests for Go changes

(can be removed for JavaScript changes)

- I added test coverage for Go changes...
  - [x] in their respective `*_test.go` for unit tests.
  - [ ] in the `tests/integration` directory if it involves interactions with a live Forgejo server.
- I ran...
  - [x] `make pr-go` before pushing

### Tests for JavaScript changes

(can be removed for Go changes)

- I added test coverage for JavaScript changes...
  - [ ] in `web_src/js/*.test.js` if it can be unit tested.
  - [ ] in `tests/e2e/*.test.e2e.js` if it requires interactions with a live Forgejo server (see also the [developer guide for JavaScript testing](https://codeberg.org/forgejo/forgejo/src/branch/forgejo/tests/e2e/README.md#end-to-end-tests)).

### Documentation

- [ ] I created a pull request [to the documentation](https://codeberg.org/forgejo/docs) to explain to Forgejo users how to use this change.
- [ ] I did not document these changes and I do not expect someone else to do it.

### Release notes

- [x] This change will be noticed by a Forgejo user or admin (feature, bug fix, performance, etc.). I suggest to include a release note for this change.
- [ ] This change is not visible to a Forgejo user or admin (refactor, dependency upgrade, etc.). I think there is no need to add a release note for this change.

*The decision if the pull request will be shown in the release notes is up to the mergers / release team.*

The content of the `release-notes/<pull request number>.md` file will serve as the basis for the release notes. If the file does not exist, the title of the pull request will be used instead.

Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/12046
Reviewed-by: Mathieu Fenniak <mfenniak@noreply.codeberg.org>
Co-authored-by: Andreas Ahlenstorf <andreas@ahlenstorf.ch>
Co-committed-by: Andreas Ahlenstorf <andreas@ahlenstorf.ch>
2026-04-10 15:40:08 +02:00

363 lines
12 KiB
Go

// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package actions
import (
"context"
"fmt"
"slices"
"time"
"forgejo.org/models/db"
"forgejo.org/modules/container"
"forgejo.org/modules/timeutil"
"forgejo.org/modules/util"
"code.forgejo.org/forgejo/runner/v12/act/jobparser"
gouuid "github.com/google/uuid"
"go.yaml.in/yaml/v3"
"xorm.io/builder"
)
// ActionRunJob represents a job of a run
type ActionRunJob struct {
ID int64
RunID int64 `xorm:"index"`
Run *ActionRun `xorm:"-"`
RepoID int64 `xorm:"index"`
OwnerID int64 `xorm:"index"`
CommitSHA string `xorm:"index"`
IsForkPullRequest bool
Name string `xorm:"VARCHAR(255)"`
Attempt int64
Handle string `xorm:"unique"`
WorkflowPayload []byte
JobID string `xorm:"VARCHAR(255)"` // job id in workflow, not job's id
Needs []string `xorm:"JSON TEXT"`
RunsOn []string `xorm:"JSON TEXT"`
TaskID int64 // the latest task of the job
Status Status `xorm:"index"`
Started timeutil.TimeStamp
Stopped timeutil.TimeStamp
Created timeutil.TimeStamp `xorm:"created"`
Updated timeutil.TimeStamp `xorm:"updated index"`
workflowPayloadDecoded *jobparser.SingleWorkflow `xorm:"-"`
}
func init() {
db.RegisterModel(new(ActionRunJob))
}
func (job *ActionRunJob) HTMLURL(ctx context.Context) (string, error) {
if job.Run == nil || job.Run.Repo == nil {
return "", fmt.Errorf("action_run_job: load run and repo before accessing HTMLURL")
}
// Find the "index" of the currently selected job... kinda ugly that the URL uses the index rather than some other
// unique identifier of the job which could actually be stored upon it. But hard to change that now.
allJobs, err := GetRunJobsByRunID(ctx, job.RunID)
if err != nil {
return "", err
}
jobIndex := -1
for i, otherJob := range allJobs {
if job.ID == otherJob.ID {
jobIndex = i
break
}
}
if jobIndex == -1 {
return "", fmt.Errorf("action_run_job: unable to find job on run: %d", job.ID)
}
attempt := job.Attempt
// If a job has never been fetched by a runner yet, it will have attempt 0 -- but this attempt will never have a
// valid UI since attempt is incremented to 1 if it is picked up by a runner.
if attempt == 0 {
attempt = 1
}
return fmt.Sprintf("%s/actions/runs/%d/jobs/%d/attempt/%d", job.Run.Repo.HTMLURL(), job.Run.Index, jobIndex, attempt), nil
}
func (job *ActionRunJob) Duration() time.Duration {
return calculateDuration(job.Started, job.Stopped, job.Status)
}
func (job *ActionRunJob) LoadRun(ctx context.Context) error {
if job.Run == nil {
run, err := GetRunByID(ctx, job.RunID)
if err != nil {
return err
}
job.Run = run
}
return nil
}
// LoadAttributes load Run if not loaded
func (job *ActionRunJob) LoadAttributes(ctx context.Context) error {
if job == nil {
return nil
}
if err := job.LoadRun(ctx); err != nil {
return err
}
return job.Run.LoadAttributes(ctx)
}
// IsRequestedByRunner returns true if this attempt of this ActionRunJob was explicitly requested by the runner or if
// the runner expressed no preference.
func (job *ActionRunJob) IsRequestedByRunner(handle *string) bool {
return handle == nil || job.Handle == *handle
}
func (job *ActionRunJob) ItRunsOn(labels []string) bool {
if len(labels) == 0 || len(job.RunsOn) == 0 {
return false
}
labelSet := make(container.Set[string])
labelSet.AddMultiple(labels...)
return labelSet.IsSubset(job.RunsOn)
}
func (job *ActionRunJob) PrepareNextAttempt(initialStatus Status) error {
if job.Status != StatusUnknown && !job.Status.IsDone() {
return fmt.Errorf("cannot prepare next attempt because job %d is active: %s", job.ID, job.Status.String())
}
job.Attempt++
job.Started = 0
job.Stopped = 0
job.TaskID = 0
job.Handle = gouuid.New().String()
job.Status = initialStatus
return nil
}
// CanBeRerun answers whether this ActionRunJob can be rerun. Returns true if it is done and the Run it belongs to
// is runnable. Returns false in all other cases, including when Run is nil.
func (job *ActionRunJob) CanBeRerun() bool {
if job.Run == nil || !job.Run.IsRunnable() {
return false
}
return job.Status.IsDone()
}
func GetRunJobByID(ctx context.Context, id int64) (*ActionRunJob, error) {
var job ActionRunJob
has, err := db.GetEngine(ctx).Where("id=?", id).Get(&job)
if err != nil {
return nil, err
} else if !has {
return nil, fmt.Errorf("run job with id %d: %w", id, util.ErrNotExist)
}
return &job, nil
}
func GetRunJobsByRunID(ctx context.Context, runID int64) ([]*ActionRunJob, error) {
var jobs []*ActionRunJob
if err := db.GetEngine(ctx).Where("run_id=?", runID).OrderBy("id").Find(&jobs); err != nil {
return nil, err
}
return jobs, nil
}
// Check if the ActionRun has any jobs other than those included in the jobs parameter.
func RunHasOtherJobs(ctx context.Context, runID int64, jobs []*ActionRunJob) (bool, error) {
jobIDs := make([]int64, len(jobs))
for i, job := range jobs {
jobIDs[i] = job.ID
}
otherJobs, err := db.GetEngine(ctx).
Where("run_id = ?", runID).
Where(builder.NotIn("id", jobIDs)).
Count(&ActionRunJob{})
if err != nil {
return false, err
}
return otherJobs > 0, nil
}
// All calls to UpdateRunJobWithoutNotification that change run.Status for any run from a not done status to a done status must call the ActionRunNowDone notification channel.
// Use the wrapper function UpdateRunJob instead.
func UpdateRunJobWithoutNotification(ctx context.Context, job *ActionRunJob, cond builder.Cond, cols ...string) (int64, error) {
e := db.GetEngine(ctx)
sess := e.ID(job.ID)
if len(cols) > 0 {
sess.Cols(cols...)
}
if cond != nil {
sess.Where(cond)
}
affected, err := sess.Update(job)
if err != nil {
return 0, err
}
if affected == 0 || (!slices.Contains(cols, "status") && job.Status == 0) {
return affected, nil
}
if affected != 0 && slices.Contains(cols, "status") && job.Status.IsWaiting() {
// if the status of job changes to waiting again, increase tasks version.
if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil {
return 0, err
}
}
if job.RunID == 0 {
var err error
if job, err = GetRunJobByID(ctx, job.ID); err != nil {
return 0, err
}
}
run, columns, err := ComputeRunStatus(ctx, job.RunID)
if err != nil {
return 0, fmt.Errorf("compute run status: %w", err)
}
if err := UpdateRunWithoutNotification(ctx, run, columns...); err != nil {
return 0, fmt.Errorf("update run %d: %w", run.ID, err)
}
return affected, nil
}
var AggregateJobStatus = func(jobs []*ActionRunJob) Status {
allSuccessOrSkipped := len(jobs) != 0
allSkipped := len(jobs) != 0
var hasFailure, hasCancelled, hasWaiting, hasRunning, hasBlocked bool
for _, job := range jobs {
allSuccessOrSkipped = allSuccessOrSkipped && (job.Status == StatusSuccess || job.Status == StatusSkipped)
allSkipped = allSkipped && job.Status == StatusSkipped
hasFailure = hasFailure || job.Status == StatusFailure
hasCancelled = hasCancelled || job.Status == StatusCancelled
hasWaiting = hasWaiting || job.Status == StatusWaiting
hasRunning = hasRunning || job.Status == StatusRunning
hasBlocked = hasBlocked || job.Status == StatusBlocked
}
switch {
case allSkipped:
return StatusSkipped
case allSuccessOrSkipped:
return StatusSuccess
case hasCancelled:
return StatusCancelled
case hasFailure:
return StatusFailure
case hasRunning:
return StatusRunning
case hasWaiting:
return StatusWaiting
case hasBlocked:
return StatusBlocked
default:
return StatusUnknown // it shouldn't happen
}
}
// Retrieves the parsed workflow for this specific job. This field is often accessed multiple times in succession, so
// the parsed content is cached in-memory on the `ActionRunJob` instance.
func (job *ActionRunJob) DecodeWorkflowPayload() (*jobparser.SingleWorkflow, error) {
if job.workflowPayloadDecoded != nil {
return job.workflowPayloadDecoded, nil
}
var jobWorkflow jobparser.SingleWorkflow
err := yaml.Unmarshal(job.WorkflowPayload, &jobWorkflow)
if err != nil {
return nil, fmt.Errorf("failure unmarshaling WorkflowPayload to SingleWorkflow: %w", err)
}
job.workflowPayloadDecoded = &jobWorkflow
return job.workflowPayloadDecoded, nil
}
// If `WorkflowPayload` is changed on an `ActionRunJob`, clear any cached decoded version of the payload. Typically
// only used for unit tests.
func (job *ActionRunJob) ClearCachedWorkflowPayload() {
job.workflowPayloadDecoded = nil
}
// Checks whether the target job is an `(incomplete matrix)` job that will be blocked until the matrix is complete, and
// then regenerated and deleted. If it is incomplete, and if the information is available, the specific job and/or
// output that causes it to be incomplete will be returned as well.
func (job *ActionRunJob) HasIncompleteMatrix() (bool, *jobparser.IncompleteNeeds, error) {
jobWorkflow, err := job.DecodeWorkflowPayload()
if err != nil {
return false, nil, fmt.Errorf("failure decoding workflow payload: %w", err)
}
return jobWorkflow.IncompleteMatrix, jobWorkflow.IncompleteMatrixNeeds, nil
}
// Checks whether the target job has a `runs-on` field with an expression that requires an input from another job. The
// job will be blocked until the other job is complete, and then regenerated and deleted.
func (job *ActionRunJob) HasIncompleteRunsOn() (bool, *jobparser.IncompleteNeeds, *jobparser.IncompleteMatrix, error) {
jobWorkflow, err := job.DecodeWorkflowPayload()
if err != nil {
return false, nil, nil, fmt.Errorf("failure decoding workflow payload: %w", err)
}
return jobWorkflow.IncompleteRunsOn, jobWorkflow.IncompleteRunsOnNeeds, jobWorkflow.IncompleteRunsOnMatrix, nil
}
// Check whether the target job was generated as a result of expanding a reusable workflow.
func (job *ActionRunJob) IsWorkflowCallInnerJob() (bool, error) {
jobWorkflow, err := job.DecodeWorkflowPayload()
if err != nil {
return false, fmt.Errorf("failure decoding workflow payload: %w", err)
}
return jobWorkflow.Metadata.WorkflowCallParent != "", nil
}
// Check whether this job is a caller of a reusable workflow -- in other words, the real work done in this job is in
// spawned child jobs, not this job.
func (job *ActionRunJob) IsWorkflowCallOuterJob() (bool, error) {
jobWorkflow, err := job.DecodeWorkflowPayload()
if err != nil {
return false, fmt.Errorf("failure decoding workflow payload: %w", err)
}
return jobWorkflow.Metadata.WorkflowCallID != "", nil
}
// Checks whether the target job has a `with` field with an expression that requires an input from another job. The job
// will be blocked until the other job is complete, and then regenerated and deleted.
func (job *ActionRunJob) HasIncompleteWith() (bool, *jobparser.IncompleteNeeds, *jobparser.IncompleteMatrix, error) {
jobWorkflow, err := job.DecodeWorkflowPayload()
if err != nil {
return false, nil, nil, fmt.Errorf("failure decoding workflow payload: %w", err)
}
return jobWorkflow.IncompleteWith, jobWorkflow.IncompleteWithNeeds, jobWorkflow.IncompleteWithMatrix, nil
}
// EnableOpenIDConnect checks whether the job allows for ID token generation.
func (job *ActionRunJob) EnableOpenIDConnect() (bool, error) {
jobWorkflow, err := job.DecodeWorkflowPayload()
if err != nil {
return false, fmt.Errorf("failure decoding workflow payload: %w", err)
}
return jobWorkflow.EnableOpenIDConnect, nil
}
// AllNeedsExist checks whether this ActionRunJob's Needs can theoretically be met by comparing them with the supplied
// list of all job IDs that part of a particular workflow run. Returns the list of unknown job IDs found in Needs
// alongside an indicator whether the check was successful.
func (job *ActionRunJob) AllNeedsExist(allExistingJobIDs container.Set[string]) ([]string, bool) {
unknownJobIDs := []string{}
for _, need := range job.Needs {
if !allExistingJobIDs.Contains(need) {
unknownJobIDs = append(unknownJobIDs, need)
}
}
return unknownJobIDs, len(unknownJobIDs) == 0
}