From f87ec19130b01e1826214cf5dc5f12853568937c Mon Sep 17 00:00:00 2001 From: forgejo-backport-action Date: Fri, 9 Jan 2026 21:26:27 +0100 Subject: [PATCH] [v14.0/forgejo] feat: provide multiple tasks to Runner in one FetchTask when requested (#10751) **Backport:** https://codeberg.org/forgejo/forgejo/pulls/10602 Permits the Forgejo to return multiple tasks to the Runner in one API call, if requested. Fixes #8917. Related runner PR: https://code.forgejo.org/forgejo/runner/pulls/1245 ## Checklist The [contributor guide](https://forgejo.org/docs/next/contributor/) contains information that will be helpful to first time contributors. There also are a few [conditions for merging Pull Requests in Forgejo repositories](https://codeberg.org/forgejo/governance/src/branch/main/PullRequestsAgreement.md). You are also welcome to join the [Forgejo development chatroom](https://matrix.to/#/#forgejo-development:matrix.org). ### Tests - I added test coverage for Go changes... - [ ] in their respective `*_test.go` for unit tests. - [x] in the `tests/integration` directory if it involves interactions with a live Forgejo server. - I added test coverage for JavaScript changes... - [ ] in `web_src/js/*.test.js` if it can be unit tested. - [ ] in `tests/e2e/*.test.e2e.js` if it requires interactions with a live Forgejo server (see also the [developer guide for JavaScript testing](https://codeberg.org/forgejo/forgejo/src/branch/forgejo/tests/e2e/README.md#end-to-end-tests)). ### Documentation - [ ] I created a pull request [to the documentation](https://codeberg.org/forgejo/docs) to explain to Forgejo users how to use this change. - [x] I did not document these changes and I do not expect someone else to do it. ### Release notes - [ ] I do not want this change to show in the release notes. - [x] I want the title to show in the release notes with a link to this pull request. - [ ] I want the content of the `release-notes/.md` to be be used for the release notes instead of the title. ## Release notes - Features - [PR](https://codeberg.org/forgejo/forgejo/pulls/10602): provide multiple tasks to Runner in one FetchTask when requested Co-authored-by: Mathieu Fenniak Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/10751 Reviewed-by: Michael Kriese Reviewed-by: Mathieu Fenniak Co-authored-by: forgejo-backport-action Co-committed-by: forgejo-backport-action --- routers/api/actions/runner/runner.go | 23 ++++- tests/integration/actions_fetch_task_test.go | 89 ++++++++++++++++++++ tests/integration/actions_runner_test.go | 29 +++++++ 3 files changed, 139 insertions(+), 2 deletions(-) create mode 100644 tests/integration/actions_fetch_task_test.go diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go index 83a93cb1c9..3ee402733e 100644 --- a/routers/api/actions/runner/runner.go +++ b/routers/api/actions/runner/runner.go @@ -152,6 +152,7 @@ func (s *Service) FetchTask( latestVersion++ } + var additionalTasks []*runnerv1.Task if tasksVersion != latestVersion { // if the task version in request is not equal to the version in db, // it means there may still be some tasks not be assigned. @@ -162,10 +163,28 @@ func (s *Service) FetchTask( } else if ok { task = t } + + taskCapacity := req.Msg.GetTaskCapacity() + taskCapacity-- // remove 1 for the task already fetched as `task` + for taskCapacity > 0 { + if t, ok, err := actions_service.PickTask(ctx, runner); err != nil { + // Don't return an error to the client/runner -- we've already assigned one-or-more tasks to the runner + // and if we don't return them, they can't be picked up by another runner and will become zombie tasks. + // Log the error and return the tasks we've assigned so far. + log.Error("pick task failed: %v", err) + break + } else if ok { + additionalTasks = append(additionalTasks, t) + taskCapacity-- + } else { + break + } + } } res := connect.NewResponse(&runnerv1.FetchTaskResponse{ - Task: task, - TasksVersion: latestVersion, + Task: task, + TasksVersion: latestVersion, + AdditionalTasks: additionalTasks, }) return res, nil } diff --git a/tests/integration/actions_fetch_task_test.go b/tests/integration/actions_fetch_task_test.go new file mode 100644 index 0000000000..7416896be4 --- /dev/null +++ b/tests/integration/actions_fetch_task_test.go @@ -0,0 +1,89 @@ +// Copyright 2025 The Forgejo Authors. All rights reserved. +// SPDX-License-Identifier: GPL-3.0-or-later + +package integration + +import ( + "net/url" + "strings" + "testing" + + unit_model "forgejo.org/models/unit" + "forgejo.org/models/unittest" + user_model "forgejo.org/models/user" + "forgejo.org/modules/setting" + files_service "forgejo.org/services/repository/files" + "forgejo.org/tests" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestActionFetchTask_TaskCapacity(t *testing.T) { + if !setting.Database.Type.IsSQLite3() { + // mock repo runner only supported on SQLite testing + t.Skip() + } + + onApplicationRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + + // create the repo + repo, _, f := tests.CreateDeclarativeRepo(t, user2, "repo-many-tasks", + []unit_model.Type{unit_model.TypeActions}, nil, + []*files_service.ChangeRepoFile{ + { + Operation: "create", + TreePath: ".forgejo/workflows/matrix.yml", + ContentReader: strings.NewReader(` +on: + push: +jobs: + job1: + strategy: + # matrix creates 125 different jobs from one push... + matrix: + d1: [a, b, c, d, e] + d2: [a, b, c, d, e] + d3: [a, b, c, d, e] + runs-on: ubuntu-latest + steps: + - run: echo ${{ matrix.d1 }} ${{ matrix.d2 }} ${{ matrix.d3 }} + - run: sleep 2 +`), + }, + }, + ) + defer f() + + runner := newMockRunner() + runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"}) + + // Fetch with TaskCapacity undefined, set to nil, should return a single pending task + task := runner.fetchTask(t) + require.NotNil(t, task) + assert.Contains(t, string(task.GetWorkflowPayload()), "name: job1 (a, a, a)") + + // After successfully fetching a task, the runner sets their next requested version to 0. This allows it to + // fetch back-to-back tasks without requiring that a server-side state change occurs. That behaviour is + // replicated here: + runner.lastTasksVersion = 0 + + // Fetch with TaskCapacity set to 1; additional should be nil + capacity := int64(1) + task, addt := runner.fetchMultipleTasks(t, &capacity) + require.NotNil(t, task, "task") + assert.Nil(t, addt, "addt") + assert.Contains(t, string(task.GetWorkflowPayload()), "name: job1 (a, a, b)") + + runner.lastTasksVersion = 0 + + capacity = 10 + task, addt = runner.fetchMultipleTasks(t, &capacity) + require.NotNil(t, task, "task") + require.NotNil(t, addt, "addt") + assert.Contains(t, string(task.GetWorkflowPayload()), "name: job1 (a, a, c)") + require.Len(t, addt, 9) + assert.Contains(t, string(addt[0].GetWorkflowPayload()), "name: job1 (a, a, d)") + }) +} diff --git a/tests/integration/actions_runner_test.go b/tests/integration/actions_runner_test.go index 6cedaf6def..83e544464e 100644 --- a/tests/integration/actions_runner_test.go +++ b/tests/integration/actions_runner_test.go @@ -123,6 +123,35 @@ func (r *mockRunner) fetchTask(t *testing.T, timeout ...time.Duration) *runnerv1 return task } +func (r *mockRunner) maybeFetchMultipleTasks(t *testing.T, taskCapacity *int64) (*runnerv1.Task, []*runnerv1.Task) { + resp, err := r.client.runnerServiceClient.FetchTask(t.Context(), connect.NewRequest(&runnerv1.FetchTaskRequest{ + TasksVersion: r.lastTasksVersion, + TaskCapacity: taskCapacity, + })) + require.NoError(t, err) + r.lastTasksVersion = resp.Msg.TasksVersion + return resp.Msg.Task, resp.Msg.AdditionalTasks +} + +func (r *mockRunner) fetchMultipleTasks(t *testing.T, taskCapacity *int64, timeout ...time.Duration) (*runnerv1.Task, []*runnerv1.Task) { + fetchTimeout := 10 * time.Second + if len(timeout) > 0 { + fetchTimeout = timeout[0] + } + var task *runnerv1.Task + var additional []*runnerv1.Task + require.Eventually(t, func() bool { + maybeTask, maybeAdditional := r.maybeFetchMultipleTasks(t, taskCapacity) + if maybeTask != nil { + task = maybeTask + additional = maybeAdditional + return true + } + return false + }, fetchTimeout, time.Millisecond*100, "failed to fetch a task") + return task, additional +} + type mockTaskOutcome struct { result runnerv1.Result outputs map[string]string