mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2026-05-12 22:10:25 +00:00
fix: allow Actions runner to recover tasks lost during fetching from intermittent errors (#11401)
Probably fixes (or improves, at least) https://code.forgejo.org/forgejo/runner/issues/1391, paired with the runner implementation https://code.forgejo.org/forgejo/runner/pulls/1393. When the FetchTask() API is invoked to create a task, unpreventable environmental errors may occur; for example, network disconnects and timeouts. It's possible that these errors occur after the server-side has assigned a task to the runner during the API call, in which case the error would cause that task to be lost between the two systems -- the server will think it's assigned to the runner, and the runner never received it. This can cause jobs to appear stuck at "Set up job". The solution implemented here is idempotency in the FetchTask() API call, which means that the "same" FetchTask() API call is expected to return the same values. Specifically, the runner creates a unique identifier which is transmitted to the server as a header `x-runner-request-key` with each FetchTask() invocation which defines the sameness of the call, and the runner retains the value until the API call receives a successful response. The server implementation returns the same tasks back if a second (or Nth) call is received with the same `x-runner-request-key` header. In order to accomplish this is records the `x-runner-request-key` value that is used with each request that assigns tasks. As a complication, the Forgejo server is unable to return the same `${{ secrets.forgejo_token }}` for the task because the server stores that value in a one-way hash in the database. To resolve this, the server regenerates the token when retrieving tasks for a second time. ## Checklist The [contributor guide](https://forgejo.org/docs/next/contributor/) contains information that will be helpful to first time contributors. There also are a few [conditions for merging Pull Requests in Forgejo repositories](https://codeberg.org/forgejo/governance/src/branch/main/PullRequestsAgreement.md). You are also welcome to join the [Forgejo development chatroom](https://matrix.to/#/#forgejo-development:matrix.org). ### Tests for Go changes (can be removed for JavaScript changes) - I added test coverage for Go changes... - [x] in their respective `*_test.go` for unit tests. - [x] in the `tests/integration` directory if it involves interactions with a live Forgejo server. - I ran... - [x] `make pr-go` before pushing ### Documentation - [ ] I created a pull request [to the documentation](https://codeberg.org/forgejo/docs) to explain to Forgejo users how to use this change. - [x] I did not document these changes and I do not expect someone else to do it. ### Release notes - [x] This change will be noticed by a Forgejo user or admin (feature, bug fix, performance, etc.). I suggest to include a release note for this change. - [ ] This change is not visible to a Forgejo user or admin (refactor, dependency upgrade, etc.). I think there is no need to add a release note for this change. *The decision if the pull request will be shown in the release notes is up to the mergers / release team.* The content of the `release-notes/<pull request number>.md` file will serve as the basis for the release notes. If the file does not exist, the title of the pull request will be used instead. Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/11401 Reviewed-by: Andreas Ahlenstorf <aahlenst@noreply.codeberg.org> Co-authored-by: Mathieu Fenniak <mathieu@fenniak.net> Co-committed-by: Mathieu Fenniak <mathieu@fenniak.net>
This commit is contained in:
parent
5486bfa535
commit
0ae6235386
9 changed files with 337 additions and 11 deletions
|
|
@ -0,0 +1,36 @@
|
||||||
|
-
|
||||||
|
id: 100
|
||||||
|
attempt: 3
|
||||||
|
runner_id: 12345678
|
||||||
|
status: 5 # StatusWaiting
|
||||||
|
repo_id: 4
|
||||||
|
owner_id: 1
|
||||||
|
commit_sha: c2d72f548424103f01ee1dc02889c1e2bff816b0
|
||||||
|
is_fork_pull_request: false
|
||||||
|
token_hash: a1
|
||||||
|
token_salt: eeeeeeee
|
||||||
|
token_last_eight: eeeeeeee
|
||||||
|
log_filename: artifact-test2/2f/47.log
|
||||||
|
log_in_storage: true
|
||||||
|
log_length: 707
|
||||||
|
log_size: 90179
|
||||||
|
log_expired: false
|
||||||
|
runner_request_key: 0a7e017d-4201-4b34-8cf4-de0f431893a4
|
||||||
|
-
|
||||||
|
id: 101
|
||||||
|
attempt: 3
|
||||||
|
runner_id: 12345678
|
||||||
|
status: 5 # StatusWaiting
|
||||||
|
repo_id: 4
|
||||||
|
owner_id: 1
|
||||||
|
commit_sha: c2d72f548424103f01ee1dc02889c1e2bff816b0
|
||||||
|
is_fork_pull_request: false
|
||||||
|
token_hash: a2
|
||||||
|
token_salt: eeeeeeee
|
||||||
|
token_last_eight: eeeeeeee
|
||||||
|
log_filename: artifact-test2/2f/47.log
|
||||||
|
log_in_storage: true
|
||||||
|
log_length: 707
|
||||||
|
log_size: 90179
|
||||||
|
log_expired: false
|
||||||
|
runner_request_key: 0a7e017d-4201-4b34-8cf4-de0f431893a4
|
||||||
|
|
@ -29,7 +29,7 @@ type ActionTask struct {
|
||||||
Job *ActionRunJob `xorm:"-"`
|
Job *ActionRunJob `xorm:"-"`
|
||||||
Steps []*ActionTaskStep `xorm:"-"`
|
Steps []*ActionTaskStep `xorm:"-"`
|
||||||
Attempt int64
|
Attempt int64
|
||||||
RunnerID int64 `xorm:"index"`
|
RunnerID int64 `xorm:"index index(request_key)"`
|
||||||
Status Status `xorm:"index"`
|
Status Status `xorm:"index"`
|
||||||
Started timeutil.TimeStamp `xorm:"index"`
|
Started timeutil.TimeStamp `xorm:"index"`
|
||||||
Stopped timeutil.TimeStamp `xorm:"index(stopped_log_expired)"`
|
Stopped timeutil.TimeStamp `xorm:"index(stopped_log_expired)"`
|
||||||
|
|
@ -51,6 +51,15 @@ type ActionTask struct {
|
||||||
LogIndexes LogIndexes `xorm:"LONGBLOB"` // line number to offset
|
LogIndexes LogIndexes `xorm:"LONGBLOB"` // line number to offset
|
||||||
LogExpired bool `xorm:"index(stopped_log_expired)"` // files that are too old will be deleted
|
LogExpired bool `xorm:"index(stopped_log_expired)"` // files that are too old will be deleted
|
||||||
|
|
||||||
|
// When the FetchTask() API is invoked to create a task, unpreventable environmental errors may occur; for example,
|
||||||
|
// network disconnects and timeouts. If that API call has a unique identifier associated with it, it is stored in
|
||||||
|
// RunnerRequestKey. This allows the API call to be implemented idempotently using this state: if one API call
|
||||||
|
// assigns a task to a runner and a second API call is received from the same runner with the same request key, the
|
||||||
|
// existing assigned tasks can be returned.
|
||||||
|
//
|
||||||
|
// Indexed for an efficient search on runner_id=? AND runner_request_key=?.
|
||||||
|
RunnerRequestKey string `xorm:"index(request_key)"`
|
||||||
|
|
||||||
Created timeutil.TimeStamp `xorm:"created"`
|
Created timeutil.TimeStamp `xorm:"created"`
|
||||||
Updated timeutil.TimeStamp `xorm:"updated index"`
|
Updated timeutil.TimeStamp `xorm:"updated index"`
|
||||||
}
|
}
|
||||||
|
|
@ -147,6 +156,11 @@ func (task *ActionTask) GenerateToken() {
|
||||||
task.Token, task.TokenSalt, task.TokenHash, task.TokenLastEight = generateSaltedToken()
|
task.Token, task.TokenSalt, task.TokenHash, task.TokenLastEight = generateSaltedToken()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// After using GenerateToken, UpdateToken can be used to update the database record affecting the same columns.
|
||||||
|
func (task *ActionTask) UpdateToken(ctx context.Context) error {
|
||||||
|
return UpdateTask(ctx, task, "token_hash", "token_salt", "token_last_eight")
|
||||||
|
}
|
||||||
|
|
||||||
// Retrieve all the attempts from the same job as the target `ActionTask`. Limited fields are queried to avoid loading
|
// Retrieve all the attempts from the same job as the target `ActionTask`. Limited fields are queried to avoid loading
|
||||||
// the LogIndexes blob when not needed.
|
// the LogIndexes blob when not needed.
|
||||||
func (task *ActionTask) GetAllAttempts(ctx context.Context) ([]*ActionTask, error) {
|
func (task *ActionTask) GetAllAttempts(ctx context.Context) ([]*ActionTask, error) {
|
||||||
|
|
@ -242,6 +256,15 @@ func GetRunningTaskByToken(ctx context.Context, token string) (*ActionTask, erro
|
||||||
return nil, errNotExist
|
return nil, errNotExist
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func GetTasksByRunnerRequestKey(ctx context.Context, runner *ActionRunner, requestKey string) ([]*ActionTask, error) {
|
||||||
|
var tasks []*ActionTask
|
||||||
|
err := db.GetEngine(ctx).Where("runner_id = ? AND runner_request_key = ?", runner.ID, requestKey).Find(&tasks)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return tasks, nil
|
||||||
|
}
|
||||||
|
|
||||||
func getConcurrencyCondition() builder.Cond {
|
func getConcurrencyCondition() builder.Cond {
|
||||||
concurrencyCond := builder.NewCond()
|
concurrencyCond := builder.NewCond()
|
||||||
|
|
||||||
|
|
@ -324,7 +347,7 @@ func GetAvailableJobsForRunner(e db.Engine, runner *ActionRunner) ([]*ActionRunJ
|
||||||
return jobs, nil
|
return jobs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask, bool, error) {
|
func CreateTaskForRunner(ctx context.Context, runner *ActionRunner, requestKey *string) (*ActionTask, bool, error) {
|
||||||
ctx, committer, err := db.TxContext(ctx)
|
ctx, committer, err := db.TxContext(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, false, err
|
return nil, false, err
|
||||||
|
|
@ -370,6 +393,9 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask
|
||||||
CommitSHA: job.CommitSHA,
|
CommitSHA: job.CommitSHA,
|
||||||
IsForkPullRequest: job.IsForkPullRequest,
|
IsForkPullRequest: job.IsForkPullRequest,
|
||||||
}
|
}
|
||||||
|
if requestKey != nil {
|
||||||
|
task.RunnerRequestKey = *requestKey
|
||||||
|
}
|
||||||
task.GenerateToken()
|
task.GenerateToken()
|
||||||
|
|
||||||
var workflowJob *jobparser.Job
|
var workflowJob *jobparser.Job
|
||||||
|
|
|
||||||
|
|
@ -76,3 +76,26 @@ func TestActionTask_CreatePlaceholderTask(t *testing.T) {
|
||||||
}
|
}
|
||||||
assert.Equal(t, map[string]string{"output1": "value1", "output2": "value2"}, finalOutputs)
|
assert.Equal(t, map[string]string{"output1": "value1", "output2": "value2"}, finalOutputs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestActionTask_GetTasksByRunnerRequestKey(t *testing.T) {
|
||||||
|
defer unittest.OverrideFixtures("models/actions/TestActionTask_GetTasksByRunnerRequestKey")()
|
||||||
|
require.NoError(t, unittest.PrepareTestDatabase())
|
||||||
|
|
||||||
|
runner := unittest.AssertExistsAndLoadBean(t, &ActionRunner{ID: 12345678})
|
||||||
|
|
||||||
|
// not matching runner_request_key
|
||||||
|
tasks, err := GetTasksByRunnerRequestKey(t.Context(), runner, "22288392-2c70-4125-bb01-c7da79fa280c")
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Empty(t, tasks)
|
||||||
|
|
||||||
|
// matching both runner_id and runner_request_key
|
||||||
|
tasks, err = GetTasksByRunnerRequestKey(t.Context(), runner, "0a7e017d-4201-4b34-8cf4-de0f431893a4")
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Len(t, tasks, 2)
|
||||||
|
|
||||||
|
// not matching runner_id
|
||||||
|
runner = unittest.AssertExistsAndLoadBean(t, &ActionRunner{ID: 10000001})
|
||||||
|
tasks, err = GetTasksByRunnerRequestKey(t.Context(), runner, "0a7e017d-4201-4b34-8cf4-de0f431893a4")
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Empty(t, tasks)
|
||||||
|
}
|
||||||
|
|
|
||||||
24
models/forgejo_migrations/v15b_add-runner_request_key.go
Normal file
24
models/forgejo_migrations/v15b_add-runner_request_key.go
Normal file
|
|
@ -0,0 +1,24 @@
|
||||||
|
// Copyright 2025 The Forgejo Authors. All rights reserved.
|
||||||
|
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||||
|
|
||||||
|
package forgejo_migrations
|
||||||
|
|
||||||
|
import (
|
||||||
|
"xorm.io/xorm"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
registerMigration(&Migration{
|
||||||
|
Description: "add runner_request_key to action_task",
|
||||||
|
Upgrade: addActionTaskRunnerRequestKey,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func addActionTaskRunnerRequestKey(x *xorm.Engine) error {
|
||||||
|
type ActionTask struct {
|
||||||
|
RunnerID int64 `xorm:"index index(request_key)"`
|
||||||
|
RunnerRequestKey string `xorm:"index(request_key)"`
|
||||||
|
}
|
||||||
|
_, err := x.SyncWithOptions(xorm.SyncOptions{IgnoreDropIndices: true}, new(ActionTask))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
@ -19,8 +19,9 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
uuidHeaderKey = "x-runner-uuid"
|
uuidHeaderKey = "x-runner-uuid"
|
||||||
tokenHeaderKey = "x-runner-token"
|
tokenHeaderKey = "x-runner-token"
|
||||||
|
requestKeyHeaderKey = "x-runner-request-key"
|
||||||
)
|
)
|
||||||
|
|
||||||
var withRunner = connect.WithInterceptors(connect.UnaryInterceptorFunc(func(unaryFunc connect.UnaryFunc) connect.UnaryFunc {
|
var withRunner = connect.WithInterceptors(connect.UnaryInterceptorFunc(func(unaryFunc connect.UnaryFunc) connect.UnaryFunc {
|
||||||
|
|
@ -54,6 +55,12 @@ var withRunner = connect.WithInterceptors(connect.UnaryInterceptorFunc(func(unar
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx = context.WithValue(ctx, runnerCtxKey{}, runner)
|
ctx = context.WithValue(ctx, runnerCtxKey{}, runner)
|
||||||
|
|
||||||
|
requestKey := request.Header().Get(requestKeyHeaderKey)
|
||||||
|
if requestKey != "" {
|
||||||
|
ctx = context.WithValue(ctx, runnerRequestKeyCtxKey{}, requestKey)
|
||||||
|
}
|
||||||
|
|
||||||
return unaryFunc(ctx, request)
|
return unaryFunc(ctx, request)
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
|
|
@ -76,3 +83,14 @@ func GetRunner(ctx context.Context) *actions_model.ActionRunner {
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type runnerRequestKeyCtxKey struct{}
|
||||||
|
|
||||||
|
func getRequestKey(ctx context.Context) *string {
|
||||||
|
if v := ctx.Value(runnerRequestKeyCtxKey{}); v != nil {
|
||||||
|
if r, ok := v.(string); ok {
|
||||||
|
return &r
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -140,6 +140,30 @@ func (s *Service) FetchTask(
|
||||||
) (*connect.Response[runnerv1.FetchTaskResponse], error) {
|
) (*connect.Response[runnerv1.FetchTaskResponse], error) {
|
||||||
runner := GetRunner(ctx)
|
runner := GetRunner(ctx)
|
||||||
|
|
||||||
|
requestKey := getRequestKey(ctx)
|
||||||
|
if requestKey != nil {
|
||||||
|
// Search for previous tasks is based upon both the runner and the request key in order to reduce the security
|
||||||
|
// risk. If a request key is leaked (eg. it appears in a log file, log file gets published in a bug report) it
|
||||||
|
// could be used indefinitely to retrieve the associated task(s), so requiring the correctly authenticated
|
||||||
|
// runner reduces that risk.
|
||||||
|
recoveredTasks, err := actions_model.GetTasksByRunnerRequestKey(ctx, runner, *requestKey)
|
||||||
|
if err != nil {
|
||||||
|
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("query by request key failed: %w", err))
|
||||||
|
} else if len(recoveredTasks) > 0 {
|
||||||
|
// Recovered tasks from a repeat request key
|
||||||
|
tasks, err := actions_service.RecoverTasks(ctx, recoveredTasks)
|
||||||
|
if err != nil {
|
||||||
|
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("recover tasks failed: %w", err))
|
||||||
|
}
|
||||||
|
resp := &runnerv1.FetchTaskResponse{
|
||||||
|
Task: tasks[0],
|
||||||
|
TasksVersion: 0,
|
||||||
|
AdditionalTasks: tasks[1:],
|
||||||
|
}
|
||||||
|
return connect.NewResponse(resp), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var task *runnerv1.Task
|
var task *runnerv1.Task
|
||||||
tasksVersion := req.Msg.TasksVersion // task version from runner
|
tasksVersion := req.Msg.TasksVersion // task version from runner
|
||||||
latestVersion, err := actions_model.GetTasksVersionByScope(ctx, runner.OwnerID, runner.RepoID)
|
latestVersion, err := actions_model.GetTasksVersionByScope(ctx, runner.OwnerID, runner.RepoID)
|
||||||
|
|
@ -160,7 +184,7 @@ func (s *Service) FetchTask(
|
||||||
// if the task version in request is not equal to the version in db,
|
// if the task version in request is not equal to the version in db,
|
||||||
// it means there may still be some tasks not be assigned.
|
// it means there may still be some tasks not be assigned.
|
||||||
// try to pick a task for the runner that send the request.
|
// try to pick a task for the runner that send the request.
|
||||||
if t, ok, err := actions_service.PickTask(ctx, runner); err != nil {
|
if t, ok, err := actions_service.PickTask(ctx, runner, requestKey); err != nil {
|
||||||
log.Error("pick task failed: %v", err)
|
log.Error("pick task failed: %v", err)
|
||||||
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("pick task: %w", err))
|
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("pick task: %w", err))
|
||||||
} else if ok {
|
} else if ok {
|
||||||
|
|
@ -169,7 +193,7 @@ func (s *Service) FetchTask(
|
||||||
taskCapacity := req.Msg.GetTaskCapacity()
|
taskCapacity := req.Msg.GetTaskCapacity()
|
||||||
taskCapacity-- // remove 1 for the task already fetched as `task`
|
taskCapacity-- // remove 1 for the task already fetched as `task`
|
||||||
for taskCapacity > 0 {
|
for taskCapacity > 0 {
|
||||||
if t, ok, err := actions_service.PickTask(ctx, runner); err != nil {
|
if t, ok, err := actions_service.PickTask(ctx, runner, requestKey); err != nil {
|
||||||
// Don't return an error to the client/runner -- we've already assigned one-or-more tasks to the runner
|
// Don't return an error to the client/runner -- we've already assigned one-or-more tasks to the runner
|
||||||
// and if we don't return them, they can't be picked up by another runner and will become zombie tasks.
|
// and if we don't return them, they can't be picked up by another runner and will become zombie tasks.
|
||||||
// Log the error and return the tasks we've assigned so far.
|
// Log the error and return the tasks we've assigned so far.
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,7 @@ import (
|
||||||
"google.golang.org/protobuf/types/known/timestamppb"
|
"google.golang.org/protobuf/types/known/timestamppb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func PickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv1.Task, bool, error) {
|
func PickTask(ctx context.Context, runner *actions_model.ActionRunner, requestKey *string) (*runnerv1.Task, bool, error) {
|
||||||
var (
|
var (
|
||||||
task *runnerv1.Task
|
task *runnerv1.Task
|
||||||
job *actions_model.ActionRunJob
|
job *actions_model.ActionRunJob
|
||||||
|
|
@ -40,7 +40,7 @@ func PickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := db.WithTx(ctx, func(ctx context.Context) error {
|
if err := db.WithTx(ctx, func(ctx context.Context) error {
|
||||||
t, ok, err := actions_model.CreateTaskForRunner(ctx, runner)
|
t, ok, err := actions_model.CreateTaskForRunner(ctx, runner, requestKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("CreateTaskForRunner: %w", err)
|
return fmt.Errorf("CreateTaskForRunner: %w", err)
|
||||||
}
|
}
|
||||||
|
|
@ -96,6 +96,61 @@ func PickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv
|
||||||
return task, true, nil
|
return task, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func RecoverTasks(ctx context.Context, tasks []*actions_model.ActionTask) ([]*runnerv1.Task, error) {
|
||||||
|
retval := make([]*runnerv1.Task, len(tasks))
|
||||||
|
|
||||||
|
err := db.WithTx(ctx, func(ctx context.Context) error {
|
||||||
|
for i, t := range tasks {
|
||||||
|
// `Token` is stored in the database w/ a one-way hash, so we can't recover it from the original. Instead
|
||||||
|
// we generate a new token to create usable runnerv1.Task objects.
|
||||||
|
t.GenerateToken()
|
||||||
|
if err := t.UpdateToken(ctx); err != nil {
|
||||||
|
return fmt.Errorf("UpdateTask failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := t.LoadAttributes(ctx); err != nil {
|
||||||
|
return fmt.Errorf("task LoadAttributes: %w", err)
|
||||||
|
}
|
||||||
|
job := t.Job
|
||||||
|
|
||||||
|
secrets, err := getSecretsOfTask(ctx, t)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("GetSecretsOfTask: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
vars, err := actions_model.GetVariablesOfRun(ctx, t.Job.Run)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("GetVariablesOfRun: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
needs, err := findTaskNeeds(ctx, job)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("findTaskNeeds: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
taskContext, err := generateTaskContext(t)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("generateTaskContext: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
retval[i] = &runnerv1.Task{
|
||||||
|
Id: t.ID,
|
||||||
|
WorkflowPayload: t.Job.WorkflowPayload,
|
||||||
|
Context: taskContext,
|
||||||
|
Secrets: secrets,
|
||||||
|
Vars: vars,
|
||||||
|
Needs: needs,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return retval, nil
|
||||||
|
}
|
||||||
|
|
||||||
func generateTaskContext(t *actions_model.ActionTask) (*structpb.Struct, error) {
|
func generateTaskContext(t *actions_model.ActionTask) (*structpb.Struct, error) {
|
||||||
run := t.Job.Run
|
run := t.Job.Run
|
||||||
gitCtx, err := GenerateGiteaContext(run, t.Job)
|
gitCtx, err := GenerateGiteaContext(run, t.Job)
|
||||||
|
|
|
||||||
|
|
@ -8,10 +8,12 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
actions_model "forgejo.org/models/actions"
|
||||||
unit_model "forgejo.org/models/unit"
|
unit_model "forgejo.org/models/unit"
|
||||||
"forgejo.org/models/unittest"
|
"forgejo.org/models/unittest"
|
||||||
user_model "forgejo.org/models/user"
|
user_model "forgejo.org/models/user"
|
||||||
"forgejo.org/modules/setting"
|
"forgejo.org/modules/setting"
|
||||||
|
"forgejo.org/modules/util"
|
||||||
files_service "forgejo.org/services/repository/files"
|
files_service "forgejo.org/services/repository/files"
|
||||||
"forgejo.org/tests"
|
"forgejo.org/tests"
|
||||||
|
|
||||||
|
|
@ -87,3 +89,105 @@ jobs:
|
||||||
assert.Contains(t, string(addt[0].GetWorkflowPayload()), "name: job1 (a, a, d)")
|
assert.Contains(t, string(addt[0].GetWorkflowPayload()), "name: job1 (a, a, d)")
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestActionFetchTask_Idempotent(t *testing.T) {
|
||||||
|
if !setting.Database.Type.IsSQLite3() {
|
||||||
|
// mock repo runner only supported on SQLite testing
|
||||||
|
t.Skip()
|
||||||
|
}
|
||||||
|
|
||||||
|
onApplicationRun(t, func(t *testing.T, u *url.URL) {
|
||||||
|
user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
|
||||||
|
|
||||||
|
// create the repo
|
||||||
|
repo, _, f := tests.CreateDeclarativeRepo(t, user2, "repo-many-tasks",
|
||||||
|
[]unit_model.Type{unit_model.TypeActions}, nil,
|
||||||
|
[]*files_service.ChangeRepoFile{
|
||||||
|
{
|
||||||
|
Operation: "create",
|
||||||
|
TreePath: ".forgejo/workflows/matrix.yml",
|
||||||
|
ContentReader: strings.NewReader(`
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
jobs:
|
||||||
|
job1:
|
||||||
|
strategy:
|
||||||
|
matrix:
|
||||||
|
d1: [a, b]
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- run: sleep 2
|
||||||
|
`),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
defer f()
|
||||||
|
|
||||||
|
runner := newMockRunner()
|
||||||
|
runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"})
|
||||||
|
|
||||||
|
runner.setRequestKey("4b518ff2-00c6-4c22-ba05-77d5b597c2b4")
|
||||||
|
|
||||||
|
// First request that fetches a task:
|
||||||
|
task1 := runner.fetchTask(t)
|
||||||
|
require.NotNil(t, task1)
|
||||||
|
assert.Contains(t, string(task1.GetWorkflowPayload()), "name: job1")
|
||||||
|
{
|
||||||
|
// Base assumption, the FORGEJO_TOKEN secret can be identified... this is typical but we'll verify that it
|
||||||
|
// doesn't work after the idempotent fetch.
|
||||||
|
taskTokenTest, err := actions_model.GetRunningTaskByToken(t.Context(), task1.Secrets["FORGEJO_TOKEN"])
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, task1.Id, taskTokenTest.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Having retrieved a task... if we sent a fetchTask call with the same requestKey then we expect to get the
|
||||||
|
// same task again:
|
||||||
|
task1fetchedAgain := runner.fetchTask(t)
|
||||||
|
require.NotNil(t, task1fetchedAgain)
|
||||||
|
assert.Contains(t, string(task1fetchedAgain.GetWorkflowPayload()), "name: job1")
|
||||||
|
|
||||||
|
assert.Equal(t, task1.Id, task1fetchedAgain.Id)
|
||||||
|
assert.Equal(t, task1.WorkflowPayload, task1fetchedAgain.WorkflowPayload)
|
||||||
|
m1 := task1.Context.AsMap()
|
||||||
|
m1fetchedAgain := task1fetchedAgain.Context.AsMap()
|
||||||
|
for k, v1 := range m1 {
|
||||||
|
v2 := m1fetchedAgain[k]
|
||||||
|
// "token" isn't expected to be the same as it is regenerated on recovery from idempotent fetch. But it is
|
||||||
|
// expected to be present, so we test for equal length. "gitea_runtime_token" is a signed JWT which can
|
||||||
|
// change between invocations based upon precise timestamps used, and so similarly should be validated to be
|
||||||
|
// present not necessarily identical.
|
||||||
|
if k == "token" || k == "gitea_runtime_token" {
|
||||||
|
assert.Len(t, v1.(string), len(v2.(string)))
|
||||||
|
} else {
|
||||||
|
assert.EqualValues(t, v1, v2, "context[%q]", k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for k, v1 := range task1.Secrets {
|
||||||
|
v2 := task1fetchedAgain.Secrets[k]
|
||||||
|
if k == "FORGEJO_TOKEN" || k == "GITEA_TOKEN" || k == "GITHUB_TOKEN" {
|
||||||
|
// token isn't expected to be the same... but should be present.
|
||||||
|
assert.Len(t, v1, len(v2))
|
||||||
|
} else {
|
||||||
|
assert.Equal(t, v1, v2, "secret[%q]", k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert.Equal(t, task1.Needs, task1fetchedAgain.Needs)
|
||||||
|
assert.Equal(t, task1.Vars, task1fetchedAgain.Vars)
|
||||||
|
|
||||||
|
{
|
||||||
|
// Original FORGEJO_TOKEN should not be usable anymore.
|
||||||
|
_, err := actions_model.GetRunningTaskByToken(t.Context(), task1.Secrets["FORGEJO_TOKEN"])
|
||||||
|
require.ErrorIs(t, err, util.ErrNotExist)
|
||||||
|
// New FORGEJO_TOKEN should be usable.
|
||||||
|
taskTokenTest, err := actions_model.GetRunningTaskByToken(t.Context(), task1fetchedAgain.Secrets["FORGEJO_TOKEN"])
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, task1fetchedAgain.Id, taskTokenTest.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// But now if we change the request key, we don't expect to get the same task anymore:
|
||||||
|
runner.setRequestKey("6d47d5f3-eaa2-449f-9040-8b20287401b3")
|
||||||
|
task2 := runner.fetchTask(t)
|
||||||
|
require.NotNil(t, task2)
|
||||||
|
assert.NotEqual(t, task1.Id, task2.Id)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,7 @@ import (
|
||||||
|
|
||||||
type mockRunner struct {
|
type mockRunner struct {
|
||||||
client *mockRunnerClient
|
client *mockRunnerClient
|
||||||
|
uuid, token string
|
||||||
lastTasksVersion int64
|
lastTasksVersion int64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -38,7 +39,7 @@ func newMockRunner() *mockRunner {
|
||||||
return &mockRunner{client: client}
|
return &mockRunner{client: client}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMockRunnerClient(uuid, token string) *mockRunnerClient {
|
func newMockRunnerClientWithRequestKey(uuid, token, requestKey string) *mockRunnerClient {
|
||||||
baseURL := fmt.Sprintf("%sapi/actions", setting.AppURL)
|
baseURL := fmt.Sprintf("%sapi/actions", setting.AppURL)
|
||||||
|
|
||||||
opt := connect.WithInterceptors(connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc {
|
opt := connect.WithInterceptors(connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc {
|
||||||
|
|
@ -49,6 +50,9 @@ func newMockRunnerClient(uuid, token string) *mockRunnerClient {
|
||||||
if token != "" {
|
if token != "" {
|
||||||
req.Header().Set("x-runner-token", token)
|
req.Header().Set("x-runner-token", token)
|
||||||
}
|
}
|
||||||
|
if requestKey != "" {
|
||||||
|
req.Header().Set("x-runner-request-key", requestKey)
|
||||||
|
}
|
||||||
return next(ctx, req)
|
return next(ctx, req)
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
|
|
@ -61,6 +65,10 @@ func newMockRunnerClient(uuid, token string) *mockRunnerClient {
|
||||||
return client
|
return client
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newMockRunnerClient(uuid, token string) *mockRunnerClient {
|
||||||
|
return newMockRunnerClientWithRequestKey(uuid, token, "")
|
||||||
|
}
|
||||||
|
|
||||||
func (r *mockRunner) doPing(t *testing.T) {
|
func (r *mockRunner) doPing(t *testing.T) {
|
||||||
resp, err := r.client.pingServiceClient.Ping(t.Context(), connect.NewRequest(&pingv1.PingRequest{
|
resp, err := r.client.pingServiceClient.Ping(t.Context(), connect.NewRequest(&pingv1.PingRequest{
|
||||||
Data: "mock-runner",
|
Data: "mock-runner",
|
||||||
|
|
@ -79,7 +87,9 @@ func (r *mockRunner) doRegister(t *testing.T, name, token string, labels []strin
|
||||||
Ephemeral: false,
|
Ephemeral: false,
|
||||||
}))
|
}))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
r.client = newMockRunnerClient(resp.Msg.Runner.Uuid, resp.Msg.Runner.Token)
|
r.uuid = resp.Msg.Runner.Uuid
|
||||||
|
r.token = resp.Msg.Runner.Token
|
||||||
|
r.client = newMockRunnerClient(r.uuid, r.token)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *mockRunner) doRegisterEphemeral(t *testing.T, name, token string, labels []string) {
|
func (r *mockRunner) doRegisterEphemeral(t *testing.T, name, token string, labels []string) {
|
||||||
|
|
@ -92,7 +102,13 @@ func (r *mockRunner) doRegisterEphemeral(t *testing.T, name, token string, label
|
||||||
Ephemeral: true,
|
Ephemeral: true,
|
||||||
}))
|
}))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
r.client = newMockRunnerClient(resp.Msg.Runner.Uuid, resp.Msg.Runner.Token)
|
r.uuid = resp.Msg.Runner.Uuid
|
||||||
|
r.token = resp.Msg.Runner.Token
|
||||||
|
r.client = newMockRunnerClient(r.uuid, r.token)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *mockRunner) setRequestKey(requestKey string) {
|
||||||
|
r.client = newMockRunnerClientWithRequestKey(r.uuid, r.token, requestKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *mockRunner) registerAsRepoRunner(t *testing.T, ownerName, repoName, runnerName string, labels []string) {
|
func (r *mockRunner) registerAsRepoRunner(t *testing.T, ownerName, repoName, runnerName string, labels []string) {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue