From 5e1c13f50ee92e1eb061306bbbeae7e70207fa76 Mon Sep 17 00:00:00 2001 From: Andreas Ahlenstorf Date: Wed, 25 Mar 2026 17:27:05 +0100 Subject: [PATCH] feat: allow runners to request a particular job (#11676) Forgejo Runner can optionally ask for a particular job. Example: `forgejo-runner one-job --handle 9d52c7d8-aebe-426b-b015-dd453aacaada`. This change adds the necessary job filtering to Forgejo. See https://code.forgejo.org/forgejo/forgejo-actions-feature-requests/issues/76 for the motivation and design considerations. PR for the extension of the runner protocol: https://code.forgejo.org/forgejo/actions-proto/pulls/18 Related change in Forgejo Runner with usage example: https://code.forgejo.org/forgejo/runner/pulls/1443 ## Checklist The [contributor guide](https://forgejo.org/docs/next/contributor/) contains information that will be helpful to first time contributors. There also are a few [conditions for merging Pull Requests in Forgejo repositories](https://codeberg.org/forgejo/governance/src/branch/main/PullRequestsAgreement.md). You are also welcome to join the [Forgejo development chatroom](https://matrix.to/#/#forgejo-development:matrix.org). ### Tests for Go changes (can be removed for JavaScript changes) - I added test coverage for Go changes... - [x] in their respective `*_test.go` for unit tests. - [x] in the `tests/integration` directory if it involves interactions with a live Forgejo server. - I ran... - [x] `make pr-go` before pushing ### Tests for JavaScript changes (can be removed for Go changes) - I added test coverage for JavaScript changes... - [ ] in `web_src/js/*.test.js` if it can be unit tested. - [ ] in `tests/e2e/*.test.e2e.js` if it requires interactions with a live Forgejo server (see also the [developer guide for JavaScript testing](https://codeberg.org/forgejo/forgejo/src/branch/forgejo/tests/e2e/README.md#end-to-end-tests)). ### Documentation - [ ] I created a pull request [to the documentation](https://codeberg.org/forgejo/docs) to explain to Forgejo users how to use this change. - [x] I did not document these changes and I do not expect someone else to do it. ### Release notes - [x] This change will be noticed by a Forgejo user or admin (feature, bug fix, performance, etc.). I suggest to include a release note for this change. - [ ] This change is not visible to a Forgejo user or admin (refactor, dependency upgrade, etc.). I think there is no need to add a release note for this change. *The decision if the pull request will be shown in the release notes is up to the mergers / release team.* The content of the `release-notes/.md` file will serve as the basis for the release notes. If the file does not exist, the title of the pull request will be used instead. ## Release notes - Features - [PR](https://codeberg.org/forgejo/forgejo/pulls/11676): allow runners to request a particular job Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/11676 Reviewed-by: Mathieu Fenniak Co-authored-by: Andreas Ahlenstorf Co-committed-by: Andreas Ahlenstorf --- models/actions/run_job.go | 9 ++ models/actions/run_job_test.go | 33 ++++- models/actions/task.go | 8 +- models/fixtures/action_run_job.yml | 3 + .../forgejo_migrations/v15c_add_job_handle.go | 23 ++++ modules/structs/action.go | 2 + routers/api/actions/runner/runner.go | 124 ++++++++++++++---- routers/api/v1/shared/runners.go | 1 + routers/web/repo/actions/view.go | 2 +- services/actions/task.go | 4 +- templates/swagger/v1_json.tmpl | 5 + tests/integration/actions_fetch_task_test.go | 86 ++++++++++++ tests/integration/actions_runner_test.go | 10 ++ tests/integration/api_admin_actions_test.go | 1 + tests/integration/api_org_actions_test.go | 1 + tests/integration/api_repo_actions_test.go | 1 + tests/integration/api_user_actions_test.go | 1 + 17 files changed, 278 insertions(+), 36 deletions(-) create mode 100644 models/forgejo_migrations/v15c_add_job_handle.go diff --git a/models/actions/run_job.go b/models/actions/run_job.go index a9fbdfaed4..412e60aefd 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -15,6 +15,7 @@ import ( "forgejo.org/modules/util" "code.forgejo.org/forgejo/runner/v12/act/jobparser" + gouuid "github.com/google/uuid" "go.yaml.in/yaml/v3" "xorm.io/builder" ) @@ -30,6 +31,7 @@ type ActionRunJob struct { IsForkPullRequest bool Name string `xorm:"VARCHAR(255)"` Attempt int64 + Handle string `xorm:"unique"` WorkflowPayload []byte JobID string `xorm:"VARCHAR(255)"` // job id in workflow, not job's id Needs []string `xorm:"JSON TEXT"` @@ -108,6 +110,12 @@ func (job *ActionRunJob) LoadAttributes(ctx context.Context) error { return job.Run.LoadAttributes(ctx) } +// IsRequestedByRunner returns true if this attempt of this ActionRunJob was explicitly requested by the runner or if +// the runner expressed no preference. +func (job *ActionRunJob) IsRequestedByRunner(handle *string) bool { + return handle == nil || job.Handle == *handle +} + func (job *ActionRunJob) ItRunsOn(labels []string) bool { if len(labels) == 0 || len(job.RunsOn) == 0 { return false @@ -126,6 +134,7 @@ func (job *ActionRunJob) PrepareNextAttempt(initialStatus Status) error { job.Started = 0 job.Stopped = 0 job.TaskID = 0 + job.Handle = gouuid.New().String() job.Status = initialStatus return nil diff --git a/models/actions/run_job_test.go b/models/actions/run_job_test.go index 945b2d150f..05c504cf64 100644 --- a/models/actions/run_job_test.go +++ b/models/actions/run_job_test.go @@ -304,16 +304,21 @@ func TestRunHasOtherJobs(t *testing.T) { } func TestActionRunJobPrepareNextAttempt(t *testing.T) { - job := ActionRunJob{ID: 46} + lastHandle := "original-handle" + job := ActionRunJob{ID: 46, Handle: lastHandle} + err := job.PrepareNextAttempt(StatusWaiting) require.NoError(t, err) + assert.NotEqual(t, lastHandle, job.Handle) + assert.NotEmpty(t, job.Handle) assert.Equal(t, int64(1), job.Attempt) assert.Zero(t, job.Started) assert.Zero(t, job.Stopped) assert.Zero(t, job.TaskID) assert.Equal(t, StatusWaiting, job.Status) + lastHandle = job.Handle job.Started = timeutil.TimeStampNow() job.Stopped = timeutil.TimeStampNow() job.TaskID = int64(59) @@ -322,19 +327,45 @@ func TestActionRunJobPrepareNextAttempt(t *testing.T) { err = job.PrepareNextAttempt(StatusBlocked) require.NoError(t, err) + assert.NotEqual(t, lastHandle, job.Handle) + assert.NotEmpty(t, job.Handle) assert.Equal(t, int64(2), job.Attempt) assert.Zero(t, job.Started) assert.Zero(t, job.Stopped) assert.Zero(t, job.TaskID) assert.Equal(t, StatusBlocked, job.Status) + lastHandle = job.Handle + // The job hasn't finished yet. Preparing a next attempt should not be possible. It should be left untouched. err = job.PrepareNextAttempt(StatusWaiting) require.ErrorContains(t, err, "cannot prepare next attempt because job 46 is active: blocked") + assert.Equal(t, lastHandle, job.Handle) assert.Equal(t, int64(2), job.Attempt) assert.Zero(t, job.Started) assert.Zero(t, job.Stopped) assert.Zero(t, job.TaskID) assert.Equal(t, StatusBlocked, job.Status) } + +func TestIsRequestedByRunner(t *testing.T) { + sameHandle := "4a1ca0be-4470-486d-8504-89b4a5ac00cf" + differentHandle := "88423da3-67af-4f2d-9a92-a0db822697e9" + emptyHandle := "" + + job := &ActionRunJob{ID: 422, Attempt: 5, Handle: sameHandle} + + assert.True(t, job.IsRequestedByRunner(nil)) + assert.True(t, job.IsRequestedByRunner(&sameHandle)) + assert.False(t, job.IsRequestedByRunner(&differentHandle)) + assert.False(t, job.IsRequestedByRunner(&emptyHandle)) + + // Old jobs that were created before the introduction of Handle do not have one. + emptyHandleJob := &ActionRunJob{ID: 422, Attempt: 5, Handle: ""} + + assert.True(t, emptyHandleJob.IsRequestedByRunner(nil)) + assert.True(t, emptyHandleJob.IsRequestedByRunner(&emptyHandle)) + + assert.False(t, emptyHandleJob.IsRequestedByRunner(&differentHandle)) +} diff --git a/models/actions/task.go b/models/actions/task.go index 6b169de3c0..1a208dad9d 100644 --- a/models/actions/task.go +++ b/models/actions/task.go @@ -347,7 +347,7 @@ func GetAvailableJobsForRunner(e db.Engine, runner *ActionRunner) ([]*ActionRunJ return jobs, nil } -func CreateTaskForRunner(ctx context.Context, runner *ActionRunner, requestKey *string) (*ActionTask, bool, error) { +func CreateTaskForRunner(ctx context.Context, runner *ActionRunner, requestKey, handle *string) (*ActionTask, bool, error) { ctx, committer, err := db.TxContext(ctx) if err != nil { return nil, false, err @@ -364,9 +364,9 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner, requestKey * // TODO: a more efficient way to filter labels var job *ActionRunJob log.Trace("runner labels: %v", runner.AgentLabels) - for _, v := range jobs { - if v.ItRunsOn(runner.AgentLabels) { - job = v + for _, j := range jobs { + if j.IsRequestedByRunner(handle) && j.ItRunsOn(runner.AgentLabels) { + job = j break } } diff --git a/models/fixtures/action_run_job.yml b/models/fixtures/action_run_job.yml index 2d8ea9ff6f..a3afaa8461 100644 --- a/models/fixtures/action_run_job.yml +++ b/models/fixtures/action_run_job.yml @@ -152,6 +152,7 @@ is_fork_pull_request: false name: job_2 attempt: 1 + handle: 18e9cf40-c2f6-409f-b832-b945ea7dc79b job_id: job_2 task_id: 47 status: 5 @@ -167,6 +168,7 @@ is_fork_pull_request: false name: job_2 attempt: 2 + handle: a723d3e3-49a1-4e6b-947f-e987e60bfbd6 job_id: job_2 task_id: 47 status: 5 @@ -182,6 +184,7 @@ is_fork_pull_request: false name: job_2 attempt: 1 + handle: 40317a2f-2f00-4a82-8cc4-57347989a493 job_id: job_2 task_id: 47 status: 5 diff --git a/models/forgejo_migrations/v15c_add_job_handle.go b/models/forgejo_migrations/v15c_add_job_handle.go new file mode 100644 index 0000000000..59dca1b991 --- /dev/null +++ b/models/forgejo_migrations/v15c_add_job_handle.go @@ -0,0 +1,23 @@ +// Copyright 2026 The Forgejo Authors. All rights reserved. +// SPDX-License-Identifier: GPL-3.0-or-later + +package forgejo_migrations + +import ( + "xorm.io/xorm" +) + +func init() { + registerMigration(&Migration{ + Description: "add handle to action_run_job", + Upgrade: addActionRunJobHandle, + }) +} + +func addActionRunJobHandle(x *xorm.Engine) error { + type ActionRunJob struct { + Handle string `xorm:"unique"` + } + _, err := x.SyncWithOptions(xorm.SyncOptions{IgnoreDropIndices: true}, new(ActionRunJob)) + return err +} diff --git a/modules/structs/action.go b/modules/structs/action.go index 93aab4206c..a39ae11d65 100644 --- a/modules/structs/action.go +++ b/modules/structs/action.go @@ -14,6 +14,8 @@ type ActionRunJob struct { ID int64 `json:"id"` // How many times the job has been attempted including the current attempt. Attempt int64 `json:"attempt"` + // Opaque identifier that uniquely identifies a single attempt of a job. + Handle string `json:"handle"` // the repository id RepoID int64 `json:"repo_id"` // the owner id diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go index 41e33fc476..2020bae89c 100644 --- a/routers/api/actions/runner/runner.go +++ b/routers/api/actions/runner/runner.go @@ -142,49 +142,31 @@ func (s *Service) FetchTask( requestKey := getRequestKey(ctx) if requestKey != nil { - // Search for previous tasks is based upon both the runner and the request key in order to reduce the security - // risk. If a request key is leaked (eg. it appears in a log file, log file gets published in a bug report) it - // could be used indefinitely to retrieve the associated task(s), so requiring the correctly authenticated - // runner reduces that risk. - recoveredTasks, err := actions_model.GetTasksByRunnerRequestKey(ctx, runner, *requestKey) + recoveredTasks, err := recoverTasks(ctx, runner, *requestKey) if err != nil { - return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("query by request key failed: %w", err)) + return nil, connect.NewError(connect.CodeInternal, err) } else if len(recoveredTasks) > 0 { - // Recovered tasks from a repeat request key - tasks, err := actions_service.RecoverTasks(ctx, recoveredTasks) - if err != nil { - return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("recover tasks failed: %w", err)) - } resp := &runnerv1.FetchTaskResponse{ - Task: tasks[0], + Task: recoveredTasks[0], TasksVersion: 0, - AdditionalTasks: tasks[1:], + AdditionalTasks: recoveredTasks[1:], } return connect.NewResponse(resp), nil } } - var task *runnerv1.Task - tasksVersion := req.Msg.TasksVersion // task version from runner - latestVersion, err := actions_model.GetTasksVersionByScope(ctx, runner.OwnerID, runner.RepoID) + latestVersion, err := getLatestTasksVersion(ctx, runner.OwnerID, runner.RepoID) if err != nil { - return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("query tasks version failed: %w", err)) - } else if latestVersion == 0 { - if err := actions_model.IncreaseTaskVersion(ctx, runner.OwnerID, runner.RepoID); err != nil { - return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("fail to increase task version: %w", err)) - } - // if we don't increase the value of `latestVersion` here, - // the response of FetchTask will return tasksVersion as zero. - // and the runner will treat it as an old version of Gitea. - latestVersion++ + return nil, connect.NewError(connect.CodeInternal, err) } + var task *runnerv1.Task var additionalTasks []*runnerv1.Task - if tasksVersion != latestVersion { + if req.Msg.TasksVersion != latestVersion { // if the task version in request is not equal to the version in db, // it means there may still be some tasks not be assigned. // try to pick a task for the runner that send the request. - if t, ok, err := actions_service.PickTask(ctx, runner, requestKey); err != nil { + if t, ok, err := actions_service.PickTask(ctx, runner, requestKey, nil); err != nil { log.Error("pick task failed: %v", err) return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("pick task: %w", err)) } else if ok { @@ -193,7 +175,7 @@ func (s *Service) FetchTask( taskCapacity := req.Msg.GetTaskCapacity() taskCapacity-- // remove 1 for the task already fetched as `task` for taskCapacity > 0 { - if t, ok, err := actions_service.PickTask(ctx, runner, requestKey); err != nil { + if t, ok, err := actions_service.PickTask(ctx, runner, requestKey, nil); err != nil { // Don't return an error to the client/runner -- we've already assigned one-or-more tasks to the runner // and if we don't return them, they can't be picked up by another runner and will become zombie tasks. // Log the error and return the tasks we've assigned so far. @@ -216,6 +198,55 @@ func (s *Service) FetchTask( return res, nil } +func (s *Service) FetchSingleTask( + ctx context.Context, + req *connect.Request[runnerv1.FetchSingleTaskRequest], +) (*connect.Response[runnerv1.FetchSingleTaskResponse], error) { + runner := GetRunner(ctx) + + requestKey := getRequestKey(ctx) + if requestKey != nil { + recoveredTasks, err := recoverTasks(ctx, runner, *requestKey) + if err != nil { + return nil, connect.NewError(connect.CodeInternal, err) + } else if len(recoveredTasks) == 1 { + resp := &runnerv1.FetchSingleTaskResponse{ + TasksVersion: 0, + Task: recoveredTasks[0], + } + return connect.NewResponse(resp), nil + } else if len(recoveredTasks) > 1 { + return nil, connect.NewError(connect.CodeInternal, + fmt.Errorf("cannot recover %d tasks because runner requested only one", len(recoveredTasks))) + } + } + + latestVersion, err := getLatestTasksVersion(ctx, runner.OwnerID, runner.RepoID) + if err != nil { + return nil, connect.NewError(connect.CodeInternal, err) + } + + var task *runnerv1.Task + if req.Msg.TasksVersion != latestVersion { + var handle *string + if req.Msg.Handle != nil && *req.Msg.Handle != "" { + handle = req.Msg.Handle + } + + if t, ok, err := actions_service.PickTask(ctx, runner, requestKey, handle); err != nil { + log.Error("pick task failed: %v", err) + return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("pick task: %w", err)) + } else if ok { + task = t + } + } + res := connect.NewResponse(&runnerv1.FetchSingleTaskResponse{ + TasksVersion: latestVersion, + Task: task, + }) + return res, nil +} + // UpdateTask updates the task status. func (s *Service) UpdateTask( ctx context.Context, @@ -354,3 +385,40 @@ func (s *Service) UpdateLog( return res, nil } + +func recoverTasks(ctx context.Context, runner *actions_model.ActionRunner, requestKey string) ([]*runnerv1.Task, error) { + // Search for previous tasks is based upon both the runner and the request key in order to reduce the security + // risk. If a request key is leaked (eg. it appears in a log file, log file gets published in a bug report) it + // could be used indefinitely to retrieve the associated task(s), so requiring the correctly authenticated + // runner reduces that risk. + recoveredTasks, err := actions_model.GetTasksByRunnerRequestKey(ctx, runner, requestKey) + if err != nil { + return nil, fmt.Errorf("query by request key failed: %w", err) + } else if len(recoveredTasks) > 0 { + // Recovered tasks from a repeat request key + tasks, err := actions_service.RecoverTasks(ctx, recoveredTasks) + if err != nil { + return nil, fmt.Errorf("recover tasks failed: %w", err) + } + return tasks, nil + } + + return []*runnerv1.Task{}, nil +} + +func getLatestTasksVersion(ctx context.Context, ownerID, repoID int64) (int64, error) { + latestVersion, err := actions_model.GetTasksVersionByScope(ctx, ownerID, repoID) + if err != nil { + return 0, fmt.Errorf("query tasks version failed: %w", err) + } else if latestVersion == 0 { + if err := actions_model.IncreaseTaskVersion(ctx, ownerID, repoID); err != nil { + return 0, fmt.Errorf("fail to increase task version: %w", err) + } + // if we don't increase the value of `latestVersion` here, + // the response of FetchTask will return tasksVersion as zero. + // and the runner will treat it as an old version of Gitea. + latestVersion++ + } + + return latestVersion, nil +} diff --git a/routers/api/v1/shared/runners.go b/routers/api/v1/shared/runners.go index 7a5c363f13..21021e4076 100644 --- a/routers/api/v1/shared/runners.go +++ b/routers/api/v1/shared/runners.go @@ -77,6 +77,7 @@ func fromRunJobModelToResponse(job []*actions_model.ActionRunJob, labels []strin res = append(res, &structs.ActionRunJob{ ID: job[i].ID, Attempt: job[i].Attempt, + Handle: job[i].Handle, RepoID: job[i].RepoID, OwnerID: job[i].OwnerID, Name: job[i].Name, diff --git a/routers/web/repo/actions/view.go b/routers/web/repo/actions/view.go index c775de7ef5..a645f317f9 100644 --- a/routers/web/repo/actions/view.go +++ b/routers/web/repo/actions/view.go @@ -587,7 +587,7 @@ func rerunJob(ctx *app_context.Context, job *actions_model.ActionRunJob, shouldB } if err := db.WithTx(ctx, func(ctx context.Context) error { - _, err := actions_service.UpdateRunJob(ctx, job, builder.Eq{"status": status}, "attempt", "task_id", "status", "started", "stopped") + _, err := actions_service.UpdateRunJob(ctx, job, builder.Eq{"status": status}, "handle", "attempt", "task_id", "status", "started", "stopped") return err }); err != nil { return err diff --git a/services/actions/task.go b/services/actions/task.go index 4f78092bdb..ce43180b7b 100644 --- a/services/actions/task.go +++ b/services/actions/task.go @@ -20,7 +20,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) -func PickTask(ctx context.Context, runner *actions_model.ActionRunner, requestKey *string) (*runnerv1.Task, bool, error) { +func PickTask(ctx context.Context, runner *actions_model.ActionRunner, requestKey, handle *string) (*runnerv1.Task, bool, error) { var ( task *runnerv1.Task job *actions_model.ActionRunJob @@ -40,7 +40,7 @@ func PickTask(ctx context.Context, runner *actions_model.ActionRunner, requestKe } if err := db.WithTx(ctx, func(ctx context.Context) error { - t, ok, err := actions_model.CreateTaskForRunner(ctx, runner, requestKey) + t, ok, err := actions_model.CreateTaskForRunner(ctx, runner, requestKey, handle) if err != nil { return fmt.Errorf("CreateTaskForRunner: %w", err) } diff --git a/templates/swagger/v1_json.tmpl b/templates/swagger/v1_json.tmpl index 9737239467..d45f9b2893 100644 --- a/templates/swagger/v1_json.tmpl +++ b/templates/swagger/v1_json.tmpl @@ -22401,6 +22401,11 @@ "format": "int64", "x-go-name": "Attempt" }, + "handle": { + "description": "Opaque identifier that uniquely identifies a single attempt of a job.", + "type": "string", + "x-go-name": "Handle" + }, "id": { "description": "Identifier of this job.", "type": "integer", diff --git a/tests/integration/actions_fetch_task_test.go b/tests/integration/actions_fetch_task_test.go index 95502b7424..7796286565 100644 --- a/tests/integration/actions_fetch_task_test.go +++ b/tests/integration/actions_fetch_task_test.go @@ -191,3 +191,89 @@ jobs: assert.NotEqual(t, task1.Id, task2.Id) }) } + +func TestActionFetchTask_RequestedJob(t *testing.T) { + if !setting.Database.Type.IsSQLite3() { + // mock repo runner only supported on SQLite testing + t.Skip() + } + + onApplicationRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + + // create the repo + repo, _, f := tests.CreateDeclarativeRepo(t, user2, "repo-many-tasks", + []unit_model.Type{unit_model.TypeActions}, nil, + []*files_service.ChangeRepoFile{ + { + Operation: "create", + TreePath: ".forgejo/workflows/simple.yml", + ContentReader: strings.NewReader(` +on: + push: +jobs: + job1: + runs-on: ubuntu-latest + steps: + - run: echo OK + job2: + runs-on: debian + steps: + - run: echo OK + job3: + runs-on: debian + steps: + - run: echo OK +`), + }, + }, + ) + defer f() + + debianRunner := newMockRunner() + debianRunner.registerAsRepoRunner(t, user2.Name, repo.Name, "debian-runner", []string{"debian"}) + + ubuntuRunner := newMockRunner() + ubuntuRunner.registerAsRepoRunner(t, user2.Name, repo.Name, "ubuntu-runner", []string{"ubuntu-latest"}) + + job1 := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{RepoID: repo.ID, Name: "job1"}) + job2 := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{RepoID: repo.ID, Name: "job2"}) + job3 := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{RepoID: repo.ID, Name: "job3"}) + + assert.NotEmpty(t, job1.Handle) + assert.NotEmpty(t, job2.Handle) + assert.NotEmpty(t, job3.Handle) + + nonExistingHandle := "does-not-exist" + emptyHandle := "" + + // The runner's labels do not match. Therefore, it does not receive the job despite explicitly asking for it. + task := debianRunner.maybeFetchSingleTask(t, &job1.Handle) + require.Nil(t, task) + + // If the requested job does not exist or is not ready, the runner does not receive any job. + task = ubuntuRunner.maybeFetchSingleTask(t, &nonExistingHandle) + require.Nil(t, task) + + ubuntuRunner.lastTasksVersion = 0 + debianRunner.lastTasksVersion = 0 + + // The next job waiting in line for the debian-runner is job2. But because the runner explicitly asks for job3, + // it receives job3 instead. + task = debianRunner.maybeFetchSingleTask(t, &job3.Handle) + require.NotNil(t, task) + assert.Contains(t, string(task.GetWorkflowPayload()), "name: job3") + + ubuntuRunner.lastTasksVersion = 0 + debianRunner.lastTasksVersion = 0 + + // Without explicitly asking for a job, the runners receives the next job waiting in line. + task = debianRunner.maybeFetchSingleTask(t, nil) + require.NotNil(t, task) + assert.Contains(t, string(task.GetWorkflowPayload()), "name: job2") + + task = ubuntuRunner.maybeFetchSingleTask(t, &emptyHandle) + require.NotNil(t, task) + assert.Contains(t, string(task.GetWorkflowPayload()), "name: job1") + }) +} diff --git a/tests/integration/actions_runner_test.go b/tests/integration/actions_runner_test.go index 70784c1f64..28be946712 100644 --- a/tests/integration/actions_runner_test.go +++ b/tests/integration/actions_runner_test.go @@ -150,6 +150,16 @@ func (r *mockRunner) maybeFetchTask(t *testing.T) *runnerv1.Task { return resp.Msg.Task } +func (r *mockRunner) maybeFetchSingleTask(t *testing.T, handle *string) *runnerv1.Task { + resp, err := r.client.runnerServiceClient.FetchSingleTask(t.Context(), connect.NewRequest(&runnerv1.FetchSingleTaskRequest{ + TasksVersion: r.lastTasksVersion, + Handle: handle, + })) + require.NoError(t, err) + r.lastTasksVersion = resp.Msg.TasksVersion + return resp.Msg.Task +} + func (r *mockRunner) fetchTask(t *testing.T, timeout ...time.Duration) *runnerv1.Task { fetchTimeout := 10 * time.Second if len(timeout) > 0 { diff --git a/tests/integration/api_admin_actions_test.go b/tests/integration/api_admin_actions_test.go index 5f71a41c83..173e22ca91 100644 --- a/tests/integration/api_admin_actions_test.go +++ b/tests/integration/api_admin_actions_test.go @@ -47,6 +47,7 @@ func TestAPIAdminActionsGetJobs(t *testing.T) { expected := api.ActionRunJob{ ID: 393, Attempt: 1, + Handle: "18e9cf40-c2f6-409f-b832-b945ea7dc79b", RepoID: 1, OwnerID: 1, Name: "job_2", diff --git a/tests/integration/api_org_actions_test.go b/tests/integration/api_org_actions_test.go index 1d71f02d83..08de84cdf2 100644 --- a/tests/integration/api_org_actions_test.go +++ b/tests/integration/api_org_actions_test.go @@ -38,6 +38,7 @@ func TestActionsAPISearchActionJobs_OrgRunner(t *testing.T) { job395 := api.ActionRunJob{ ID: 395, Attempt: 1, + Handle: "40317a2f-2f00-4a82-8cc4-57347989a493", RepoID: 1, OwnerID: 3, Name: "job_2", diff --git a/tests/integration/api_repo_actions_test.go b/tests/integration/api_repo_actions_test.go index c3808c2058..fa56346a27 100644 --- a/tests/integration/api_repo_actions_test.go +++ b/tests/integration/api_repo_actions_test.go @@ -52,6 +52,7 @@ func TestActionsAPISearchActionJobs_RepoRunner(t *testing.T) { job393 := api.ActionRunJob{ ID: 393, Attempt: 1, + Handle: "18e9cf40-c2f6-409f-b832-b945ea7dc79b", RepoID: 1, OwnerID: 1, Name: "job_2", diff --git a/tests/integration/api_user_actions_test.go b/tests/integration/api_user_actions_test.go index 2e22686bea..b9a0b16ab4 100644 --- a/tests/integration/api_user_actions_test.go +++ b/tests/integration/api_user_actions_test.go @@ -39,6 +39,7 @@ func TestActionsAPISearchActionJobs_UserRunner(t *testing.T) { job394 := api.ActionRunJob{ ID: 394, Attempt: 2, + Handle: "a723d3e3-49a1-4e6b-947f-e987e60bfbd6", RepoID: 1, OwnerID: 2, Name: "job_2",