jojo/routers/api/actions/runner/runner.go
Andreas Ahlenstorf 5e1c13f50e feat: allow runners to request a particular job (#11676)
Forgejo Runner can optionally ask for a particular job. Example: `forgejo-runner one-job --handle 9d52c7d8-aebe-426b-b015-dd453aacaada`. This change adds the necessary job filtering to Forgejo.

See https://code.forgejo.org/forgejo/forgejo-actions-feature-requests/issues/76 for the motivation and design considerations.

PR for the extension of the runner protocol: https://code.forgejo.org/forgejo/actions-proto/pulls/18

Related change in Forgejo Runner with usage example: https://code.forgejo.org/forgejo/runner/pulls/1443

## Checklist

The [contributor guide](https://forgejo.org/docs/next/contributor/) contains information that will be helpful to first time contributors. There also are a few [conditions for merging Pull Requests in Forgejo repositories](https://codeberg.org/forgejo/governance/src/branch/main/PullRequestsAgreement.md). You are also welcome to join the [Forgejo development chatroom](https://matrix.to/#/#forgejo-development:matrix.org).

### Tests for Go changes

(can be removed for JavaScript changes)

- I added test coverage for Go changes...
  - [x] in their respective `*_test.go` for unit tests.
  - [x] in the `tests/integration` directory if it involves interactions with a live Forgejo server.
- I ran...
  - [x] `make pr-go` before pushing

### Tests for JavaScript changes

(can be removed for Go changes)

- I added test coverage for JavaScript changes...
  - [ ] in `web_src/js/*.test.js` if it can be unit tested.
  - [ ] in `tests/e2e/*.test.e2e.js` if it requires interactions with a live Forgejo server (see also the [developer guide for JavaScript testing](https://codeberg.org/forgejo/forgejo/src/branch/forgejo/tests/e2e/README.md#end-to-end-tests)).

### Documentation

- [ ] I created a pull request [to the documentation](https://codeberg.org/forgejo/docs) to explain to Forgejo users how to use this change.
- [x] I did not document these changes and I do not expect someone else to do it.

### Release notes

- [x] This change will be noticed by a Forgejo user or admin (feature, bug fix, performance, etc.). I suggest to include a release note for this change.
- [ ] This change is not visible to a Forgejo user or admin (refactor, dependency upgrade, etc.). I think there is no need to add a release note for this change.

*The decision if the pull request will be shown in the release notes is up to the mergers / release team.*

The content of the `release-notes/<pull request number>.md` file will serve as the basis for the release notes. If the file does not exist, the title of the pull request will be used instead.

<!--start release-notes-assistant-->

## Release notes
<!--URL:https://codeberg.org/forgejo/forgejo-->
- Features
  - [PR](https://codeberg.org/forgejo/forgejo/pulls/11676): <!--number 11676 --><!--line 0 --><!--description YWxsb3cgcnVubmVycyB0byByZXF1ZXN0IGEgcGFydGljdWxhciBqb2I=-->allow runners to request a particular job<!--description-->
<!--end release-notes-assistant-->

Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/11676
Reviewed-by: Mathieu Fenniak <mfenniak@noreply.codeberg.org>
Co-authored-by: Andreas Ahlenstorf <andreas@ahlenstorf.ch>
Co-committed-by: Andreas Ahlenstorf <andreas@ahlenstorf.ch>
2026-03-25 17:27:05 +01:00

424 lines
14 KiB
Go

// Copyright 2022 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package runner
import (
"context"
"errors"
"fmt"
"net/http"
actions_model "forgejo.org/models/actions"
repo_model "forgejo.org/models/repo"
user_model "forgejo.org/models/user"
"forgejo.org/modules/actions"
"forgejo.org/modules/log"
"forgejo.org/modules/setting"
"forgejo.org/modules/util"
actions_service "forgejo.org/services/actions"
runnerv1 "code.forgejo.org/forgejo/actions-proto/runner/v1"
"code.forgejo.org/forgejo/actions-proto/runner/v1/runnerv1connect"
"connectrpc.com/connect"
gouuid "github.com/google/uuid"
)
func NewRunnerServiceHandler() (string, http.Handler) {
return runnerv1connect.NewRunnerServiceHandler(
&Service{},
connect.WithCompressMinBytes(1024),
withRunner,
)
}
var _ runnerv1connect.RunnerServiceClient = (*Service)(nil)
type Service struct {
runnerv1connect.UnimplementedRunnerServiceHandler
}
// Register for new runner.
func (s *Service) Register(
ctx context.Context,
req *connect.Request[runnerv1.RegisterRequest],
) (*connect.Response[runnerv1.RegisterResponse], error) {
if req.Msg.Token == "" || req.Msg.Name == "" {
return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("missing runner token, name"))
}
runnerToken, err := actions_model.GetRunnerToken(ctx, req.Msg.Token)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("runner registration token not found"))
}
if !runnerToken.IsActive {
return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("runner registration token has been invalidated, please use the latest one"))
}
if has, ownerID := runnerToken.OwnerID.Get(); has {
if _, err := user_model.GetUserByID(ctx, ownerID); err != nil {
return nil, connect.NewError(connect.CodeInternal, errors.New("owner of the token not found"))
}
}
if has, repoID := runnerToken.RepoID.Get(); has {
if _, err := repo_model.GetRepositoryByID(ctx, repoID); err != nil {
return nil, connect.NewError(connect.CodeInternal, errors.New("repository of the token not found"))
}
}
labels := req.Msg.Labels
// create new runner
name, _ := util.SplitStringAtByteN(req.Msg.Name, 255)
runner := &actions_model.ActionRunner{
UUID: gouuid.New().String(),
Name: name,
OwnerID: runnerToken.OwnerID.ValueOrDefault(0),
RepoID: runnerToken.RepoID.ValueOrDefault(0),
Version: req.Msg.Version,
AgentLabels: labels,
Ephemeral: req.Msg.Ephemeral,
}
runner.GenerateToken()
// create new runner
if err := actions_model.CreateRunner(ctx, runner); err != nil {
return nil, connect.NewError(connect.CodeInternal, errors.New("can't create new runner"))
}
// update token status
runnerToken.IsActive = true
if err := actions_model.UpdateRunnerToken(ctx, runnerToken, "is_active"); err != nil {
return nil, connect.NewError(connect.CodeInternal, errors.New("can't update runner token status"))
}
res := connect.NewResponse(&runnerv1.RegisterResponse{
Runner: &runnerv1.Runner{
Id: runner.ID,
Uuid: runner.UUID,
Token: runner.Token,
Name: runner.Name,
Version: runner.Version,
Labels: runner.AgentLabels,
Ephemeral: runner.Ephemeral,
},
})
return res, nil
}
func (s *Service) Declare(
ctx context.Context,
req *connect.Request[runnerv1.DeclareRequest],
) (*connect.Response[runnerv1.DeclareResponse], error) {
runner := GetRunner(ctx)
runner.AgentLabels = req.Msg.Labels
runner.Version = req.Msg.Version
if err := actions_model.UpdateRunner(ctx, runner, "agent_labels", "version"); err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("update runner: %w", err))
}
return connect.NewResponse(&runnerv1.DeclareResponse{
Runner: &runnerv1.Runner{
Id: runner.ID,
Uuid: runner.UUID,
Token: runner.Token,
Name: runner.Name,
Version: runner.Version,
Labels: runner.AgentLabels,
Ephemeral: runner.Ephemeral,
},
}), nil
}
// FetchTask assigns a task to the runner
func (s *Service) FetchTask(
ctx context.Context,
req *connect.Request[runnerv1.FetchTaskRequest],
) (*connect.Response[runnerv1.FetchTaskResponse], error) {
runner := GetRunner(ctx)
requestKey := getRequestKey(ctx)
if requestKey != nil {
recoveredTasks, err := recoverTasks(ctx, runner, *requestKey)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
} else if len(recoveredTasks) > 0 {
resp := &runnerv1.FetchTaskResponse{
Task: recoveredTasks[0],
TasksVersion: 0,
AdditionalTasks: recoveredTasks[1:],
}
return connect.NewResponse(resp), nil
}
}
latestVersion, err := getLatestTasksVersion(ctx, runner.OwnerID, runner.RepoID)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
}
var task *runnerv1.Task
var additionalTasks []*runnerv1.Task
if req.Msg.TasksVersion != latestVersion {
// if the task version in request is not equal to the version in db,
// it means there may still be some tasks not be assigned.
// try to pick a task for the runner that send the request.
if t, ok, err := actions_service.PickTask(ctx, runner, requestKey, nil); err != nil {
log.Error("pick task failed: %v", err)
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("pick task: %w", err))
} else if ok {
task = t
taskCapacity := req.Msg.GetTaskCapacity()
taskCapacity-- // remove 1 for the task already fetched as `task`
for taskCapacity > 0 {
if t, ok, err := actions_service.PickTask(ctx, runner, requestKey, nil); err != nil {
// Don't return an error to the client/runner -- we've already assigned one-or-more tasks to the runner
// and if we don't return them, they can't be picked up by another runner and will become zombie tasks.
// Log the error and return the tasks we've assigned so far.
log.Error("pick task failed: %v", err)
break
} else if ok {
additionalTasks = append(additionalTasks, t)
taskCapacity--
} else {
break
}
}
}
}
res := connect.NewResponse(&runnerv1.FetchTaskResponse{
Task: task,
TasksVersion: latestVersion,
AdditionalTasks: additionalTasks,
})
return res, nil
}
func (s *Service) FetchSingleTask(
ctx context.Context,
req *connect.Request[runnerv1.FetchSingleTaskRequest],
) (*connect.Response[runnerv1.FetchSingleTaskResponse], error) {
runner := GetRunner(ctx)
requestKey := getRequestKey(ctx)
if requestKey != nil {
recoveredTasks, err := recoverTasks(ctx, runner, *requestKey)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
} else if len(recoveredTasks) == 1 {
resp := &runnerv1.FetchSingleTaskResponse{
TasksVersion: 0,
Task: recoveredTasks[0],
}
return connect.NewResponse(resp), nil
} else if len(recoveredTasks) > 1 {
return nil, connect.NewError(connect.CodeInternal,
fmt.Errorf("cannot recover %d tasks because runner requested only one", len(recoveredTasks)))
}
}
latestVersion, err := getLatestTasksVersion(ctx, runner.OwnerID, runner.RepoID)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
}
var task *runnerv1.Task
if req.Msg.TasksVersion != latestVersion {
var handle *string
if req.Msg.Handle != nil && *req.Msg.Handle != "" {
handle = req.Msg.Handle
}
if t, ok, err := actions_service.PickTask(ctx, runner, requestKey, handle); err != nil {
log.Error("pick task failed: %v", err)
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("pick task: %w", err))
} else if ok {
task = t
}
}
res := connect.NewResponse(&runnerv1.FetchSingleTaskResponse{
TasksVersion: latestVersion,
Task: task,
})
return res, nil
}
// UpdateTask updates the task status.
func (s *Service) UpdateTask(
ctx context.Context,
req *connect.Request[runnerv1.UpdateTaskRequest],
) (*connect.Response[runnerv1.UpdateTaskResponse], error) {
runner := GetRunner(ctx)
task, err := actions_service.UpdateTaskByState(ctx, runner.ID, req.Msg.State)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("update task: %w", err))
}
for k, v := range req.Msg.Outputs {
if len(k) > 255 {
log.Warn("Ignore the output of task %d because the key is too long: %q", task.ID, k)
continue
}
// The value can be a maximum of 1 MB
if l := len(v); l > 1024*1024 {
log.Warn("Ignore the output %q of task %d because the value is too long: %v", k, task.ID, l)
continue
}
// There's another limitation on GitHub that the total of all outputs in a workflow run can be a maximum of 50 MB.
// We don't check the total size here because it's not easy to do, and it doesn't really worth it.
// See https://docs.github.com/en/actions/using-jobs/defining-outputs-for-jobs
if err := actions_model.InsertTaskOutputIfNotExist(ctx, task.ID, k, v); err != nil {
log.Warn("Failed to insert the output %q of task %d: %v", k, task.ID, err)
// It's ok not to return errors, the runner will resend the outputs.
}
}
sentOutputs, err := actions_model.FindTaskOutputKeyByTaskID(ctx, task.ID)
if err != nil {
log.Warn("Failed to find the sent outputs of task %d: %v", task.ID, err)
// It's not to return errors, it can be handled when the runner resends sent outputs.
}
if err := task.LoadJob(ctx); err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("load job: %w", err))
}
if err := task.Job.LoadRun(ctx); err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("load run: %w", err))
}
// don't create commit status for cron job
if task.Job.Run.ScheduleID == 0 {
actions_service.CreateCommitStatus(ctx, task.Job)
}
if req.Msg.State.Result != runnerv1.Result_RESULT_UNSPECIFIED {
if err := actions_service.EmitJobsIfReady(task.Job.RunID); err != nil {
log.Error("Emit ready jobs of run %d: %v", task.Job.RunID, err)
}
// Reaching a finalized result for a task can cause other tasks in the same concurrency group to become
// unblocked. Increasing task version here allows all applicable runners to requery to the DB for that state.
// Because it is only useful for that condition, and it has system performance risks, only enable it when
// concurrency group queuing is enabled.
if setting.Actions.ConcurrencyGroupQueueEnabled {
if err := actions_model.IncreaseTaskVersion(ctx, runner.OwnerID, runner.RepoID); err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("fail to increase task version: %w", err))
}
}
if runner.Ephemeral {
err := actions_model.DeleteRunner(ctx, runner)
if err != nil {
log.Error("failed to delete ephemeral runner %v, %w", task.RunnerID, err)
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to delete ephemeral runner %v, %w", task.RunnerID, err))
}
}
}
return connect.NewResponse(&runnerv1.UpdateTaskResponse{
State: &runnerv1.TaskState{
Id: req.Msg.State.Id,
Result: task.Status.AsResult(),
},
SentOutputs: sentOutputs,
}), nil
}
// UpdateLog uploads log of the task.
func (s *Service) UpdateLog(
ctx context.Context,
req *connect.Request[runnerv1.UpdateLogRequest],
) (*connect.Response[runnerv1.UpdateLogResponse], error) {
runner := GetRunner(ctx)
res := connect.NewResponse(&runnerv1.UpdateLogResponse{})
task, err := actions_model.GetTaskByID(ctx, req.Msg.TaskId)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("get task: %w", err))
} else if runner.ID != task.RunnerID {
return nil, connect.NewError(connect.CodeInternal, errors.New("invalid runner for task"))
}
ack := task.LogLength
if len(req.Msg.Rows) == 0 || req.Msg.Index > ack || int64(len(req.Msg.Rows))+req.Msg.Index <= ack {
res.Msg.AckIndex = ack
return res, nil
}
if task.LogInStorage {
return nil, connect.NewError(connect.CodeAlreadyExists, errors.New("log file has been archived"))
}
rows := req.Msg.Rows[ack-req.Msg.Index:]
ns, err := actions.WriteLogs(ctx, task.LogFilename, task.LogSize, rows)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("write logs: %w", err))
}
task.LogLength += int64(len(rows))
for _, n := range ns {
task.LogIndexes = append(task.LogIndexes, task.LogSize)
task.LogSize += int64(n)
}
res.Msg.AckIndex = task.LogLength
var remove func()
if req.Msg.NoMore {
task.LogInStorage = true
remove, err = actions.TransferLogs(ctx, task.LogFilename)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("transfer logs: %w", err))
}
}
if err := actions_model.UpdateTask(ctx, task, "log_indexes", "log_length", "log_size", "log_in_storage"); err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("update task: %w", err))
}
if remove != nil {
remove()
}
return res, nil
}
func recoverTasks(ctx context.Context, runner *actions_model.ActionRunner, requestKey string) ([]*runnerv1.Task, error) {
// Search for previous tasks is based upon both the runner and the request key in order to reduce the security
// risk. If a request key is leaked (eg. it appears in a log file, log file gets published in a bug report) it
// could be used indefinitely to retrieve the associated task(s), so requiring the correctly authenticated
// runner reduces that risk.
recoveredTasks, err := actions_model.GetTasksByRunnerRequestKey(ctx, runner, requestKey)
if err != nil {
return nil, fmt.Errorf("query by request key failed: %w", err)
} else if len(recoveredTasks) > 0 {
// Recovered tasks from a repeat request key
tasks, err := actions_service.RecoverTasks(ctx, recoveredTasks)
if err != nil {
return nil, fmt.Errorf("recover tasks failed: %w", err)
}
return tasks, nil
}
return []*runnerv1.Task{}, nil
}
func getLatestTasksVersion(ctx context.Context, ownerID, repoID int64) (int64, error) {
latestVersion, err := actions_model.GetTasksVersionByScope(ctx, ownerID, repoID)
if err != nil {
return 0, fmt.Errorf("query tasks version failed: %w", err)
} else if latestVersion == 0 {
if err := actions_model.IncreaseTaskVersion(ctx, ownerID, repoID); err != nil {
return 0, fmt.Errorf("fail to increase task version: %w", err)
}
// if we don't increase the value of `latestVersion` here,
// the response of FetchTask will return tasksVersion as zero.
// and the runner will treat it as an old version of Gitea.
latestVersion++
}
return latestVersion, nil
}