diff --git a/models/actions/TestActionTask_GetTasksByRunnerRequestKey/action_task.yml b/models/actions/TestActionTask_GetTasksByRunnerRequestKey/action_task.yml new file mode 100644 index 0000000000..2d6ce501ab --- /dev/null +++ b/models/actions/TestActionTask_GetTasksByRunnerRequestKey/action_task.yml @@ -0,0 +1,36 @@ +- + id: 100 + attempt: 3 + runner_id: 12345678 + status: 5 # StatusWaiting + repo_id: 4 + owner_id: 1 + commit_sha: c2d72f548424103f01ee1dc02889c1e2bff816b0 + is_fork_pull_request: false + token_hash: a1 + token_salt: eeeeeeee + token_last_eight: eeeeeeee + log_filename: artifact-test2/2f/47.log + log_in_storage: true + log_length: 707 + log_size: 90179 + log_expired: false + runner_request_key: 0a7e017d-4201-4b34-8cf4-de0f431893a4 +- + id: 101 + attempt: 3 + runner_id: 12345678 + status: 5 # StatusWaiting + repo_id: 4 + owner_id: 1 + commit_sha: c2d72f548424103f01ee1dc02889c1e2bff816b0 + is_fork_pull_request: false + token_hash: a2 + token_salt: eeeeeeee + token_last_eight: eeeeeeee + log_filename: artifact-test2/2f/47.log + log_in_storage: true + log_length: 707 + log_size: 90179 + log_expired: false + runner_request_key: 0a7e017d-4201-4b34-8cf4-de0f431893a4 diff --git a/models/actions/task.go b/models/actions/task.go index b0e6bb0f15..f67b5e58e1 100644 --- a/models/actions/task.go +++ b/models/actions/task.go @@ -29,7 +29,7 @@ type ActionTask struct { Job *ActionRunJob `xorm:"-"` Steps []*ActionTaskStep `xorm:"-"` Attempt int64 - RunnerID int64 `xorm:"index"` + RunnerID int64 `xorm:"index index(request_key)"` Status Status `xorm:"index"` Started timeutil.TimeStamp `xorm:"index"` Stopped timeutil.TimeStamp `xorm:"index(stopped_log_expired)"` @@ -51,6 +51,15 @@ type ActionTask struct { LogIndexes LogIndexes `xorm:"LONGBLOB"` // line number to offset LogExpired bool `xorm:"index(stopped_log_expired)"` // files that are too old will be deleted + // When the FetchTask() API is invoked to create a task, unpreventable environmental errors may occur; for example, + // network disconnects and timeouts. If that API call has a unique identifier associated with it, it is stored in + // RunnerRequestKey. This allows the API call to be implemented idempotently using this state: if one API call + // assigns a task to a runner and a second API call is received from the same runner with the same request key, the + // existing assigned tasks can be returned. + // + // Indexed for an efficient search on runner_id=? AND runner_request_key=?. + RunnerRequestKey string `xorm:"index(request_key)"` + Created timeutil.TimeStamp `xorm:"created"` Updated timeutil.TimeStamp `xorm:"updated index"` } @@ -147,6 +156,11 @@ func (task *ActionTask) GenerateToken() { task.Token, task.TokenSalt, task.TokenHash, task.TokenLastEight = generateSaltedToken() } +// After using GenerateToken, UpdateToken can be used to update the database record affecting the same columns. +func (task *ActionTask) UpdateToken(ctx context.Context) error { + return UpdateTask(ctx, task, "token_hash", "token_salt", "token_last_eight") +} + // Retrieve all the attempts from the same job as the target `ActionTask`. Limited fields are queried to avoid loading // the LogIndexes blob when not needed. func (task *ActionTask) GetAllAttempts(ctx context.Context) ([]*ActionTask, error) { @@ -242,6 +256,15 @@ func GetRunningTaskByToken(ctx context.Context, token string) (*ActionTask, erro return nil, errNotExist } +func GetTasksByRunnerRequestKey(ctx context.Context, runner *ActionRunner, requestKey string) ([]*ActionTask, error) { + var tasks []*ActionTask + err := db.GetEngine(ctx).Where("runner_id = ? AND runner_request_key = ?", runner.ID, requestKey).Find(&tasks) + if err != nil { + return nil, err + } + return tasks, nil +} + func getConcurrencyCondition() builder.Cond { concurrencyCond := builder.NewCond() @@ -324,7 +347,7 @@ func GetAvailableJobsForRunner(e db.Engine, runner *ActionRunner) ([]*ActionRunJ return jobs, nil } -func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask, bool, error) { +func CreateTaskForRunner(ctx context.Context, runner *ActionRunner, requestKey *string) (*ActionTask, bool, error) { ctx, committer, err := db.TxContext(ctx) if err != nil { return nil, false, err @@ -370,6 +393,9 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask CommitSHA: job.CommitSHA, IsForkPullRequest: job.IsForkPullRequest, } + if requestKey != nil { + task.RunnerRequestKey = *requestKey + } task.GenerateToken() var workflowJob *jobparser.Job diff --git a/models/actions/task_test.go b/models/actions/task_test.go index 08006e6b7b..7969f52e73 100644 --- a/models/actions/task_test.go +++ b/models/actions/task_test.go @@ -76,3 +76,26 @@ func TestActionTask_CreatePlaceholderTask(t *testing.T) { } assert.Equal(t, map[string]string{"output1": "value1", "output2": "value2"}, finalOutputs) } + +func TestActionTask_GetTasksByRunnerRequestKey(t *testing.T) { + defer unittest.OverrideFixtures("models/actions/TestActionTask_GetTasksByRunnerRequestKey")() + require.NoError(t, unittest.PrepareTestDatabase()) + + runner := unittest.AssertExistsAndLoadBean(t, &ActionRunner{ID: 12345678}) + + // not matching runner_request_key + tasks, err := GetTasksByRunnerRequestKey(t.Context(), runner, "22288392-2c70-4125-bb01-c7da79fa280c") + require.NoError(t, err) + assert.Empty(t, tasks) + + // matching both runner_id and runner_request_key + tasks, err = GetTasksByRunnerRequestKey(t.Context(), runner, "0a7e017d-4201-4b34-8cf4-de0f431893a4") + require.NoError(t, err) + assert.Len(t, tasks, 2) + + // not matching runner_id + runner = unittest.AssertExistsAndLoadBean(t, &ActionRunner{ID: 10000001}) + tasks, err = GetTasksByRunnerRequestKey(t.Context(), runner, "0a7e017d-4201-4b34-8cf4-de0f431893a4") + require.NoError(t, err) + assert.Empty(t, tasks) +} diff --git a/models/forgejo_migrations/v15b_add-runner_request_key.go b/models/forgejo_migrations/v15b_add-runner_request_key.go new file mode 100644 index 0000000000..fb8150469b --- /dev/null +++ b/models/forgejo_migrations/v15b_add-runner_request_key.go @@ -0,0 +1,24 @@ +// Copyright 2025 The Forgejo Authors. All rights reserved. +// SPDX-License-Identifier: GPL-3.0-or-later + +package forgejo_migrations + +import ( + "xorm.io/xorm" +) + +func init() { + registerMigration(&Migration{ + Description: "add runner_request_key to action_task", + Upgrade: addActionTaskRunnerRequestKey, + }) +} + +func addActionTaskRunnerRequestKey(x *xorm.Engine) error { + type ActionTask struct { + RunnerID int64 `xorm:"index index(request_key)"` + RunnerRequestKey string `xorm:"index(request_key)"` + } + _, err := x.SyncWithOptions(xorm.SyncOptions{IgnoreDropIndices: true}, new(ActionTask)) + return err +} diff --git a/routers/api/actions/runner/interceptor.go b/routers/api/actions/runner/interceptor.go index be83af6997..b293ed5d66 100644 --- a/routers/api/actions/runner/interceptor.go +++ b/routers/api/actions/runner/interceptor.go @@ -19,8 +19,9 @@ import ( ) const ( - uuidHeaderKey = "x-runner-uuid" - tokenHeaderKey = "x-runner-token" + uuidHeaderKey = "x-runner-uuid" + tokenHeaderKey = "x-runner-token" + requestKeyHeaderKey = "x-runner-request-key" ) var withRunner = connect.WithInterceptors(connect.UnaryInterceptorFunc(func(unaryFunc connect.UnaryFunc) connect.UnaryFunc { @@ -54,6 +55,12 @@ var withRunner = connect.WithInterceptors(connect.UnaryInterceptorFunc(func(unar } ctx = context.WithValue(ctx, runnerCtxKey{}, runner) + + requestKey := request.Header().Get(requestKeyHeaderKey) + if requestKey != "" { + ctx = context.WithValue(ctx, runnerRequestKeyCtxKey{}, requestKey) + } + return unaryFunc(ctx, request) } })) @@ -76,3 +83,14 @@ func GetRunner(ctx context.Context) *actions_model.ActionRunner { } return nil } + +type runnerRequestKeyCtxKey struct{} + +func getRequestKey(ctx context.Context) *string { + if v := ctx.Value(runnerRequestKeyCtxKey{}); v != nil { + if r, ok := v.(string); ok { + return &r + } + } + return nil +} diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go index b36284e604..e079aa8de8 100644 --- a/routers/api/actions/runner/runner.go +++ b/routers/api/actions/runner/runner.go @@ -140,6 +140,30 @@ func (s *Service) FetchTask( ) (*connect.Response[runnerv1.FetchTaskResponse], error) { runner := GetRunner(ctx) + requestKey := getRequestKey(ctx) + if requestKey != nil { + // Search for previous tasks is based upon both the runner and the request key in order to reduce the security + // risk. If a request key is leaked (eg. it appears in a log file, log file gets published in a bug report) it + // could be used indefinitely to retrieve the associated task(s), so requiring the correctly authenticated + // runner reduces that risk. + recoveredTasks, err := actions_model.GetTasksByRunnerRequestKey(ctx, runner, *requestKey) + if err != nil { + return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("query by request key failed: %w", err)) + } else if len(recoveredTasks) > 0 { + // Recovered tasks from a repeat request key + tasks, err := actions_service.RecoverTasks(ctx, recoveredTasks) + if err != nil { + return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("recover tasks failed: %w", err)) + } + resp := &runnerv1.FetchTaskResponse{ + Task: tasks[0], + TasksVersion: 0, + AdditionalTasks: tasks[1:], + } + return connect.NewResponse(resp), nil + } + } + var task *runnerv1.Task tasksVersion := req.Msg.TasksVersion // task version from runner latestVersion, err := actions_model.GetTasksVersionByScope(ctx, runner.OwnerID, runner.RepoID) @@ -160,7 +184,7 @@ func (s *Service) FetchTask( // if the task version in request is not equal to the version in db, // it means there may still be some tasks not be assigned. // try to pick a task for the runner that send the request. - if t, ok, err := actions_service.PickTask(ctx, runner); err != nil { + if t, ok, err := actions_service.PickTask(ctx, runner, requestKey); err != nil { log.Error("pick task failed: %v", err) return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("pick task: %w", err)) } else if ok { @@ -169,7 +193,7 @@ func (s *Service) FetchTask( taskCapacity := req.Msg.GetTaskCapacity() taskCapacity-- // remove 1 for the task already fetched as `task` for taskCapacity > 0 { - if t, ok, err := actions_service.PickTask(ctx, runner); err != nil { + if t, ok, err := actions_service.PickTask(ctx, runner, requestKey); err != nil { // Don't return an error to the client/runner -- we've already assigned one-or-more tasks to the runner // and if we don't return them, they can't be picked up by another runner and will become zombie tasks. // Log the error and return the tasks we've assigned so far. diff --git a/services/actions/task.go b/services/actions/task.go index ddc35d6921..141678549c 100644 --- a/services/actions/task.go +++ b/services/actions/task.go @@ -20,7 +20,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) -func PickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv1.Task, bool, error) { +func PickTask(ctx context.Context, runner *actions_model.ActionRunner, requestKey *string) (*runnerv1.Task, bool, error) { var ( task *runnerv1.Task job *actions_model.ActionRunJob @@ -40,7 +40,7 @@ func PickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv } if err := db.WithTx(ctx, func(ctx context.Context) error { - t, ok, err := actions_model.CreateTaskForRunner(ctx, runner) + t, ok, err := actions_model.CreateTaskForRunner(ctx, runner, requestKey) if err != nil { return fmt.Errorf("CreateTaskForRunner: %w", err) } @@ -96,6 +96,61 @@ func PickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv return task, true, nil } +func RecoverTasks(ctx context.Context, tasks []*actions_model.ActionTask) ([]*runnerv1.Task, error) { + retval := make([]*runnerv1.Task, len(tasks)) + + err := db.WithTx(ctx, func(ctx context.Context) error { + for i, t := range tasks { + // `Token` is stored in the database w/ a one-way hash, so we can't recover it from the original. Instead + // we generate a new token to create usable runnerv1.Task objects. + t.GenerateToken() + if err := t.UpdateToken(ctx); err != nil { + return fmt.Errorf("UpdateTask failed: %w", err) + } + + if err := t.LoadAttributes(ctx); err != nil { + return fmt.Errorf("task LoadAttributes: %w", err) + } + job := t.Job + + secrets, err := getSecretsOfTask(ctx, t) + if err != nil { + return fmt.Errorf("GetSecretsOfTask: %w", err) + } + + vars, err := actions_model.GetVariablesOfRun(ctx, t.Job.Run) + if err != nil { + return fmt.Errorf("GetVariablesOfRun: %w", err) + } + + needs, err := findTaskNeeds(ctx, job) + if err != nil { + return fmt.Errorf("findTaskNeeds: %w", err) + } + + taskContext, err := generateTaskContext(t) + if err != nil { + return fmt.Errorf("generateTaskContext: %w", err) + } + + retval[i] = &runnerv1.Task{ + Id: t.ID, + WorkflowPayload: t.Job.WorkflowPayload, + Context: taskContext, + Secrets: secrets, + Vars: vars, + Needs: needs, + } + } + return nil + }) + if err != nil { + return nil, err + } + + return retval, nil +} + func generateTaskContext(t *actions_model.ActionTask) (*structpb.Struct, error) { run := t.Job.Run gitCtx, err := GenerateGiteaContext(run, t.Job) diff --git a/tests/integration/actions_fetch_task_test.go b/tests/integration/actions_fetch_task_test.go index 7416896be4..95502b7424 100644 --- a/tests/integration/actions_fetch_task_test.go +++ b/tests/integration/actions_fetch_task_test.go @@ -8,10 +8,12 @@ import ( "strings" "testing" + actions_model "forgejo.org/models/actions" unit_model "forgejo.org/models/unit" "forgejo.org/models/unittest" user_model "forgejo.org/models/user" "forgejo.org/modules/setting" + "forgejo.org/modules/util" files_service "forgejo.org/services/repository/files" "forgejo.org/tests" @@ -87,3 +89,105 @@ jobs: assert.Contains(t, string(addt[0].GetWorkflowPayload()), "name: job1 (a, a, d)") }) } + +func TestActionFetchTask_Idempotent(t *testing.T) { + if !setting.Database.Type.IsSQLite3() { + // mock repo runner only supported on SQLite testing + t.Skip() + } + + onApplicationRun(t, func(t *testing.T, u *url.URL) { + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + + // create the repo + repo, _, f := tests.CreateDeclarativeRepo(t, user2, "repo-many-tasks", + []unit_model.Type{unit_model.TypeActions}, nil, + []*files_service.ChangeRepoFile{ + { + Operation: "create", + TreePath: ".forgejo/workflows/matrix.yml", + ContentReader: strings.NewReader(` +on: + push: +jobs: + job1: + strategy: + matrix: + d1: [a, b] + runs-on: ubuntu-latest + steps: + - run: sleep 2 +`), + }, + }, + ) + defer f() + + runner := newMockRunner() + runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"}) + + runner.setRequestKey("4b518ff2-00c6-4c22-ba05-77d5b597c2b4") + + // First request that fetches a task: + task1 := runner.fetchTask(t) + require.NotNil(t, task1) + assert.Contains(t, string(task1.GetWorkflowPayload()), "name: job1") + { + // Base assumption, the FORGEJO_TOKEN secret can be identified... this is typical but we'll verify that it + // doesn't work after the idempotent fetch. + taskTokenTest, err := actions_model.GetRunningTaskByToken(t.Context(), task1.Secrets["FORGEJO_TOKEN"]) + require.NoError(t, err) + assert.Equal(t, task1.Id, taskTokenTest.ID) + } + + // Having retrieved a task... if we sent a fetchTask call with the same requestKey then we expect to get the + // same task again: + task1fetchedAgain := runner.fetchTask(t) + require.NotNil(t, task1fetchedAgain) + assert.Contains(t, string(task1fetchedAgain.GetWorkflowPayload()), "name: job1") + + assert.Equal(t, task1.Id, task1fetchedAgain.Id) + assert.Equal(t, task1.WorkflowPayload, task1fetchedAgain.WorkflowPayload) + m1 := task1.Context.AsMap() + m1fetchedAgain := task1fetchedAgain.Context.AsMap() + for k, v1 := range m1 { + v2 := m1fetchedAgain[k] + // "token" isn't expected to be the same as it is regenerated on recovery from idempotent fetch. But it is + // expected to be present, so we test for equal length. "gitea_runtime_token" is a signed JWT which can + // change between invocations based upon precise timestamps used, and so similarly should be validated to be + // present not necessarily identical. + if k == "token" || k == "gitea_runtime_token" { + assert.Len(t, v1.(string), len(v2.(string))) + } else { + assert.EqualValues(t, v1, v2, "context[%q]", k) + } + } + for k, v1 := range task1.Secrets { + v2 := task1fetchedAgain.Secrets[k] + if k == "FORGEJO_TOKEN" || k == "GITEA_TOKEN" || k == "GITHUB_TOKEN" { + // token isn't expected to be the same... but should be present. + assert.Len(t, v1, len(v2)) + } else { + assert.Equal(t, v1, v2, "secret[%q]", k) + } + } + assert.Equal(t, task1.Needs, task1fetchedAgain.Needs) + assert.Equal(t, task1.Vars, task1fetchedAgain.Vars) + + { + // Original FORGEJO_TOKEN should not be usable anymore. + _, err := actions_model.GetRunningTaskByToken(t.Context(), task1.Secrets["FORGEJO_TOKEN"]) + require.ErrorIs(t, err, util.ErrNotExist) + // New FORGEJO_TOKEN should be usable. + taskTokenTest, err := actions_model.GetRunningTaskByToken(t.Context(), task1fetchedAgain.Secrets["FORGEJO_TOKEN"]) + require.NoError(t, err) + assert.Equal(t, task1fetchedAgain.Id, taskTokenTest.ID) + } + + // But now if we change the request key, we don't expect to get the same task anymore: + runner.setRequestKey("6d47d5f3-eaa2-449f-9040-8b20287401b3") + task2 := runner.fetchTask(t) + require.NotNil(t, task2) + assert.NotEqual(t, task1.Id, task2.Id) + }) +} diff --git a/tests/integration/actions_runner_test.go b/tests/integration/actions_runner_test.go index 22191020dc..70784c1f64 100644 --- a/tests/integration/actions_runner_test.go +++ b/tests/integration/actions_runner_test.go @@ -25,6 +25,7 @@ import ( type mockRunner struct { client *mockRunnerClient + uuid, token string lastTasksVersion int64 } @@ -38,7 +39,7 @@ func newMockRunner() *mockRunner { return &mockRunner{client: client} } -func newMockRunnerClient(uuid, token string) *mockRunnerClient { +func newMockRunnerClientWithRequestKey(uuid, token, requestKey string) *mockRunnerClient { baseURL := fmt.Sprintf("%sapi/actions", setting.AppURL) opt := connect.WithInterceptors(connect.UnaryInterceptorFunc(func(next connect.UnaryFunc) connect.UnaryFunc { @@ -49,6 +50,9 @@ func newMockRunnerClient(uuid, token string) *mockRunnerClient { if token != "" { req.Header().Set("x-runner-token", token) } + if requestKey != "" { + req.Header().Set("x-runner-request-key", requestKey) + } return next(ctx, req) } })) @@ -61,6 +65,10 @@ func newMockRunnerClient(uuid, token string) *mockRunnerClient { return client } +func newMockRunnerClient(uuid, token string) *mockRunnerClient { + return newMockRunnerClientWithRequestKey(uuid, token, "") +} + func (r *mockRunner) doPing(t *testing.T) { resp, err := r.client.pingServiceClient.Ping(t.Context(), connect.NewRequest(&pingv1.PingRequest{ Data: "mock-runner", @@ -79,7 +87,9 @@ func (r *mockRunner) doRegister(t *testing.T, name, token string, labels []strin Ephemeral: false, })) require.NoError(t, err) - r.client = newMockRunnerClient(resp.Msg.Runner.Uuid, resp.Msg.Runner.Token) + r.uuid = resp.Msg.Runner.Uuid + r.token = resp.Msg.Runner.Token + r.client = newMockRunnerClient(r.uuid, r.token) } func (r *mockRunner) doRegisterEphemeral(t *testing.T, name, token string, labels []string) { @@ -92,7 +102,13 @@ func (r *mockRunner) doRegisterEphemeral(t *testing.T, name, token string, label Ephemeral: true, })) require.NoError(t, err) - r.client = newMockRunnerClient(resp.Msg.Runner.Uuid, resp.Msg.Runner.Token) + r.uuid = resp.Msg.Runner.Uuid + r.token = resp.Msg.Runner.Token + r.client = newMockRunnerClient(r.uuid, r.token) +} + +func (r *mockRunner) setRequestKey(requestKey string) { + r.client = newMockRunnerClientWithRequestKey(r.uuid, r.token, requestKey) } func (r *mockRunner) registerAsRepoRunner(t *testing.T, ownerName, repoName, runnerName string, labels []string) {