mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2026-05-12 22:10:25 +00:00
[v14.0/forgejo] feat: provide multiple tasks to Runner in one FetchTask when requested (#10751)
**Backport:** https://codeberg.org/forgejo/forgejo/pulls/10602 Permits the Forgejo to return multiple tasks to the Runner in one API call, if requested. Fixes #8917. Related runner PR: https://code.forgejo.org/forgejo/runner/pulls/1245 ## Checklist The [contributor guide](https://forgejo.org/docs/next/contributor/) contains information that will be helpful to first time contributors. There also are a few [conditions for merging Pull Requests in Forgejo repositories](https://codeberg.org/forgejo/governance/src/branch/main/PullRequestsAgreement.md). You are also welcome to join the [Forgejo development chatroom](https://matrix.to/#/#forgejo-development:matrix.org). ### Tests - I added test coverage for Go changes... - [ ] in their respective `*_test.go` for unit tests. - [x] in the `tests/integration` directory if it involves interactions with a live Forgejo server. - I added test coverage for JavaScript changes... - [ ] in `web_src/js/*.test.js` if it can be unit tested. - [ ] in `tests/e2e/*.test.e2e.js` if it requires interactions with a live Forgejo server (see also the [developer guide for JavaScript testing](https://codeberg.org/forgejo/forgejo/src/branch/forgejo/tests/e2e/README.md#end-to-end-tests)). ### Documentation - [ ] I created a pull request [to the documentation](https://codeberg.org/forgejo/docs) to explain to Forgejo users how to use this change. - [x] I did not document these changes and I do not expect someone else to do it. ### Release notes - [ ] I do not want this change to show in the release notes. - [x] I want the title to show in the release notes with a link to this pull request. - [ ] I want the content of the `release-notes/<pull request number>.md` to be be used for the release notes instead of the title. <!--start release-notes-assistant--> ## Release notes <!--URL:https://codeberg.org/forgejo/forgejo--> - Features - [PR](https://codeberg.org/forgejo/forgejo/pulls/10602): <!--number 10602 --><!--line 0 --><!--description cHJvdmlkZSBtdWx0aXBsZSB0YXNrcyB0byBSdW5uZXIgaW4gb25lIEZldGNoVGFzayB3aGVuIHJlcXVlc3RlZA==-->provide multiple tasks to Runner in one FetchTask when requested<!--description--> <!--end release-notes-assistant--> Co-authored-by: Mathieu Fenniak <mathieu@fenniak.net> Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/10751 Reviewed-by: Michael Kriese <michael.kriese@gmx.de> Reviewed-by: Mathieu Fenniak <mfenniak@noreply.codeberg.org> Co-authored-by: forgejo-backport-action <forgejo-backport-action@noreply.codeberg.org> Co-committed-by: forgejo-backport-action <forgejo-backport-action@noreply.codeberg.org>
This commit is contained in:
parent
47b9fdc590
commit
f87ec19130
3 changed files with 139 additions and 2 deletions
|
|
@ -152,6 +152,7 @@ func (s *Service) FetchTask(
|
|||
latestVersion++
|
||||
}
|
||||
|
||||
var additionalTasks []*runnerv1.Task
|
||||
if tasksVersion != latestVersion {
|
||||
// if the task version in request is not equal to the version in db,
|
||||
// it means there may still be some tasks not be assigned.
|
||||
|
|
@ -162,10 +163,28 @@ func (s *Service) FetchTask(
|
|||
} else if ok {
|
||||
task = t
|
||||
}
|
||||
|
||||
taskCapacity := req.Msg.GetTaskCapacity()
|
||||
taskCapacity-- // remove 1 for the task already fetched as `task`
|
||||
for taskCapacity > 0 {
|
||||
if t, ok, err := actions_service.PickTask(ctx, runner); err != nil {
|
||||
// Don't return an error to the client/runner -- we've already assigned one-or-more tasks to the runner
|
||||
// and if we don't return them, they can't be picked up by another runner and will become zombie tasks.
|
||||
// Log the error and return the tasks we've assigned so far.
|
||||
log.Error("pick task failed: %v", err)
|
||||
break
|
||||
} else if ok {
|
||||
additionalTasks = append(additionalTasks, t)
|
||||
taskCapacity--
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
res := connect.NewResponse(&runnerv1.FetchTaskResponse{
|
||||
Task: task,
|
||||
TasksVersion: latestVersion,
|
||||
Task: task,
|
||||
TasksVersion: latestVersion,
|
||||
AdditionalTasks: additionalTasks,
|
||||
})
|
||||
return res, nil
|
||||
}
|
||||
|
|
|
|||
89
tests/integration/actions_fetch_task_test.go
Normal file
89
tests/integration/actions_fetch_task_test.go
Normal file
|
|
@ -0,0 +1,89 @@
|
|||
// Copyright 2025 The Forgejo Authors. All rights reserved.
|
||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
package integration
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
unit_model "forgejo.org/models/unit"
|
||||
"forgejo.org/models/unittest"
|
||||
user_model "forgejo.org/models/user"
|
||||
"forgejo.org/modules/setting"
|
||||
files_service "forgejo.org/services/repository/files"
|
||||
"forgejo.org/tests"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestActionFetchTask_TaskCapacity(t *testing.T) {
|
||||
if !setting.Database.Type.IsSQLite3() {
|
||||
// mock repo runner only supported on SQLite testing
|
||||
t.Skip()
|
||||
}
|
||||
|
||||
onApplicationRun(t, func(t *testing.T, u *url.URL) {
|
||||
user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
|
||||
|
||||
// create the repo
|
||||
repo, _, f := tests.CreateDeclarativeRepo(t, user2, "repo-many-tasks",
|
||||
[]unit_model.Type{unit_model.TypeActions}, nil,
|
||||
[]*files_service.ChangeRepoFile{
|
||||
{
|
||||
Operation: "create",
|
||||
TreePath: ".forgejo/workflows/matrix.yml",
|
||||
ContentReader: strings.NewReader(`
|
||||
on:
|
||||
push:
|
||||
jobs:
|
||||
job1:
|
||||
strategy:
|
||||
# matrix creates 125 different jobs from one push...
|
||||
matrix:
|
||||
d1: [a, b, c, d, e]
|
||||
d2: [a, b, c, d, e]
|
||||
d3: [a, b, c, d, e]
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- run: echo ${{ matrix.d1 }} ${{ matrix.d2 }} ${{ matrix.d3 }}
|
||||
- run: sleep 2
|
||||
`),
|
||||
},
|
||||
},
|
||||
)
|
||||
defer f()
|
||||
|
||||
runner := newMockRunner()
|
||||
runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"})
|
||||
|
||||
// Fetch with TaskCapacity undefined, set to nil, should return a single pending task
|
||||
task := runner.fetchTask(t)
|
||||
require.NotNil(t, task)
|
||||
assert.Contains(t, string(task.GetWorkflowPayload()), "name: job1 (a, a, a)")
|
||||
|
||||
// After successfully fetching a task, the runner sets their next requested version to 0. This allows it to
|
||||
// fetch back-to-back tasks without requiring that a server-side state change occurs. That behaviour is
|
||||
// replicated here:
|
||||
runner.lastTasksVersion = 0
|
||||
|
||||
// Fetch with TaskCapacity set to 1; additional should be nil
|
||||
capacity := int64(1)
|
||||
task, addt := runner.fetchMultipleTasks(t, &capacity)
|
||||
require.NotNil(t, task, "task")
|
||||
assert.Nil(t, addt, "addt")
|
||||
assert.Contains(t, string(task.GetWorkflowPayload()), "name: job1 (a, a, b)")
|
||||
|
||||
runner.lastTasksVersion = 0
|
||||
|
||||
capacity = 10
|
||||
task, addt = runner.fetchMultipleTasks(t, &capacity)
|
||||
require.NotNil(t, task, "task")
|
||||
require.NotNil(t, addt, "addt")
|
||||
assert.Contains(t, string(task.GetWorkflowPayload()), "name: job1 (a, a, c)")
|
||||
require.Len(t, addt, 9)
|
||||
assert.Contains(t, string(addt[0].GetWorkflowPayload()), "name: job1 (a, a, d)")
|
||||
})
|
||||
}
|
||||
|
|
@ -123,6 +123,35 @@ func (r *mockRunner) fetchTask(t *testing.T, timeout ...time.Duration) *runnerv1
|
|||
return task
|
||||
}
|
||||
|
||||
func (r *mockRunner) maybeFetchMultipleTasks(t *testing.T, taskCapacity *int64) (*runnerv1.Task, []*runnerv1.Task) {
|
||||
resp, err := r.client.runnerServiceClient.FetchTask(t.Context(), connect.NewRequest(&runnerv1.FetchTaskRequest{
|
||||
TasksVersion: r.lastTasksVersion,
|
||||
TaskCapacity: taskCapacity,
|
||||
}))
|
||||
require.NoError(t, err)
|
||||
r.lastTasksVersion = resp.Msg.TasksVersion
|
||||
return resp.Msg.Task, resp.Msg.AdditionalTasks
|
||||
}
|
||||
|
||||
func (r *mockRunner) fetchMultipleTasks(t *testing.T, taskCapacity *int64, timeout ...time.Duration) (*runnerv1.Task, []*runnerv1.Task) {
|
||||
fetchTimeout := 10 * time.Second
|
||||
if len(timeout) > 0 {
|
||||
fetchTimeout = timeout[0]
|
||||
}
|
||||
var task *runnerv1.Task
|
||||
var additional []*runnerv1.Task
|
||||
require.Eventually(t, func() bool {
|
||||
maybeTask, maybeAdditional := r.maybeFetchMultipleTasks(t, taskCapacity)
|
||||
if maybeTask != nil {
|
||||
task = maybeTask
|
||||
additional = maybeAdditional
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}, fetchTimeout, time.Millisecond*100, "failed to fetch a task")
|
||||
return task, additional
|
||||
}
|
||||
|
||||
type mockTaskOutcome struct {
|
||||
result runnerv1.Result
|
||||
outputs map[string]string
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue