From 1b7b1e4fe5be463d20d5164bcee2410b243e7c86 Mon Sep 17 00:00:00 2001 From: forgejo-backport-action Date: Thu, 7 May 2026 19:44:33 +0200 Subject: [PATCH] [v15.0/forgejo] fix: make package cleanup work again (#12452) **Backport:** https://codeberg.org/forgejo/forgejo/pulls/12446 - Regression of forgejo/forgejo!11776 (and forgejo/forgejo!11881) - Scope of the transaction is moved to a per-package cleanup rule basis. This is also a enhancement for scaling (already deployed on Codeberg for a while). - Package cleanup is now run with `RetryTx`, because rebuilding repository files runs `RetryTx` and it could indicate to retry the whole transaction. - Previously it would error and say running `RetryTx` in a transaction was not possible, this is now possible. Nested `RetryTx` is always allowed, matching of which errors to retry is still the responsible of the inner `RetryTx`. Co-authored-by: Gusted Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/12452 Reviewed-by: Gusted --- models/db/context.go | 50 ++++++++--- models/db/context_test.go | 44 ++++++++++ services/packages/cleanup/cleanup.go | 116 +++++++++++-------------- tests/integration/api_packages_test.go | 36 ++++++++ 4 files changed, 171 insertions(+), 75 deletions(-) diff --git a/models/db/context.go b/models/db/context.go index 18237bb2a2..b51861d896 100644 --- a/models/db/context.go +++ b/models/db/context.go @@ -510,31 +510,57 @@ type RetryConfig struct { AttemptCount int } +var ErrNestedRetryTxFailure = errors.New("(nested)") + +type nestedRetryTxState int + +var nestedRetryTx nestedRetryTxState + // Execute the given function in a transaction. RetryConfig will retry the function on an error, if it matches the // ErrorIs parameter, up to the total of AttemptCount number of tries. RetryTx cannot be invoked when already within a // transaction and will return an error immediately. +// +// ErrNestedRetryTxFailure is an error type that will occur when RetryTx is nested within each other, and indicates that +// an inner RetryTx encountered an error that matched its error list. func RetryTx(ctx context.Context, config RetryConfig, f func(ctx context.Context) error) error { - if InTransaction(ctx) { + matchError := func(err error) bool { + for _, possibleError := range config.ErrorIs { + if errors.Is(err, possibleError) { + return true + } + } + return false + } + + // Accept `ErrNestedRetryTxFailure` as error to retry on, means that a nested + // RetryTx indicated to retry the whole transaction. + config.ErrorIs = append(config.ErrorIs, ErrNestedRetryTxFailure) + + withinRetryTx, present := ctx.Value(nestedRetryTx).(bool) + if present && withinRetryTx { + // If a caller already started `RetryTx`, then we assume we don't have to actually perform retries here -- we + // can attempt the requested function once, and if an error is returned that matches the configured error list, + // we'll return that error + ErrNestedRetryTxFailure wrapping. + err := f(ctx) + if err == nil { + return nil + } else if matchError(err) { + return fmt.Errorf("nested RetryTx; internal Tx failed with error that won't be retried: %w %w", err, ErrNestedRetryTxFailure) + } + return err + } else if InTransaction(ctx) { return errors.New("unsupported operation: attempted to use RetryTx while already within a transaction") } else if config.AttemptCount == 0 { return errors.New("unsupported operation: attempted to use RetryTx with 0 attempts") } + innerCtx := context.WithValue(ctx, nestedRetryTx, true) var lastError error for range config.AttemptCount { - err := WithTx(ctx, f) + err := WithTx(innerCtx, f) if err == nil { return nil - } - - foundMatch := false - for _, possibleError := range config.ErrorIs { - if errors.Is(err, possibleError) { - foundMatch = true - break - } - } - if !foundMatch { + } else if !matchError(err) { return err } diff --git a/models/db/context_test.go b/models/db/context_test.go index 60ef8462cc..59ef1e7754 100644 --- a/models/db/context_test.go +++ b/models/db/context_test.go @@ -275,4 +275,48 @@ func TestRetryTx(t *testing.T) { require.NoError(t, err) assert.Equal(t, 2, attemptCount) }) + + t.Run("nested", func(t *testing.T) { + attemptCount := 0 + testError := errors.New("hello") + err := db.RetryTx(t.Context(), db.RetryConfig{ + AttemptCount: 2, + }, func(ctx context.Context) error { + attemptCount++ + return db.RetryTx(ctx, db.RetryConfig{ + AttemptCount: 2, + ErrorIs: []error{testError}, + }, func(ctx context.Context) error { + if attemptCount == 2 { + return nil + } + return testError + }) + }) + + require.NoError(t, err) + assert.Equal(t, 2, attemptCount) + }) + + t.Run("inner RetryTx decides on error", func(t *testing.T) { + attemptCount := 0 + testError := errors.New("hello") + err := db.RetryTx(t.Context(), db.RetryConfig{ + AttemptCount: 2, + ErrorIs: []error{}, + }, func(ctx context.Context) error { + attemptCount++ + return db.RetryTx(ctx, db.RetryConfig{ + AttemptCount: 2, + }, func(ctx context.Context) error { + if attemptCount == 2 { + return nil + } + return testError + }) + }) + + require.ErrorIs(t, err, testError) + assert.Equal(t, 1, attemptCount) + }) } diff --git a/services/packages/cleanup/cleanup.go b/services/packages/cleanup/cleanup.go index 3c177abca4..d244bf921d 100644 --- a/services/packages/cleanup/cleanup.go +++ b/services/packages/cleanup/cleanup.go @@ -33,78 +33,68 @@ func CleanupTask(ctx context.Context, olderThan time.Duration) error { return CleanupExpiredData(ctx, olderThan) } -func ExecuteCleanupRules(outerCtx context.Context) error { - ctx, committer, err := db.TxContext(outerCtx) - if err != nil { - return err - } - defer committer.Close() - - err = packages_model.IterateEnabledCleanupRules(ctx, func(ctx context.Context, pcr *packages_model.PackageCleanupRule) error { - select { - case <-outerCtx.Done(): - return db.ErrCancelledf("While processing package cleanup rules") - default: - } - - versionsToRemove, err := GetCleanupTargets(ctx, pcr, true) - if err != nil { - return fmt.Errorf("CleanupRule [%d]: GetCleanupTargets failed: %w", pcr.ID, err) - } - - anyVersionDeleted := false - packageWithVersionDeleted := make(map[int64]bool) // set of Package.ID's where at least one package version was removed - for _, ct := range versionsToRemove { - if err := packages_service.DeletePackageVersionAndReferences(ctx, ct.PackageVersion); err != nil { - return fmt.Errorf("CleanupRule [%d]: DeletePackageVersionAndReferences failed: %w", pcr.ID, err) +func ExecuteCleanupRules(ctx context.Context) error { + return packages_model.IterateEnabledCleanupRules(ctx, func(ctx context.Context, pcr *packages_model.PackageCleanupRule) error { + // We have no errors to retry on, because we have no evidence we need any in + // this area. What we do retry on is when a nested `db.RetryTx` indicates to + // retry the whole transaction. + return db.RetryTx(ctx, db.RetryConfig{ + AttemptCount: 3, + }, func(ctx context.Context) error { + versionsToRemove, err := GetCleanupTargets(ctx, pcr, true) + if err != nil { + return fmt.Errorf("CleanupRule [%d]: GetCleanupTargets failed: %w", pcr.ID, err) } - packageWithVersionDeleted[ct.Package.ID] = true - anyVersionDeleted = true - } - if pcr.Type == packages_model.TypeCargo { - for packageID := range packageWithVersionDeleted { - owner, err := user_model.GetUserByID(ctx, pcr.OwnerID) - if err != nil { - return fmt.Errorf("GetUserByID failed: %w", err) + anyVersionDeleted := false + packageWithVersionDeleted := make(map[int64]bool) // set of Package.ID's where at least one package version was removed + for _, ct := range versionsToRemove { + if err := packages_service.DeletePackageVersionAndReferences(ctx, ct.PackageVersion); err != nil { + return fmt.Errorf("CleanupRule [%d]: DeletePackageVersionAndReferences failed: %w", pcr.ID, err) } - if err := cargo_service.UpdatePackageIndexIfExists(ctx, owner, owner, packageID); err != nil { - return fmt.Errorf("CleanupRule [%d]: cargo.UpdatePackageIndexIfExists failed: %w", pcr.ID, err) + packageWithVersionDeleted[ct.Package.ID] = true + anyVersionDeleted = true + } + + if pcr.Type == packages_model.TypeCargo { + for packageID := range packageWithVersionDeleted { + owner, err := user_model.GetUserByID(ctx, pcr.OwnerID) + if err != nil { + return fmt.Errorf("GetUserByID failed: %w", err) + } + if err := cargo_service.UpdatePackageIndexIfExists(ctx, owner, owner, packageID); err != nil { + return fmt.Errorf("CleanupRule [%d]: cargo.UpdatePackageIndexIfExists failed: %w", pcr.ID, err) + } } } - } - if anyVersionDeleted { - switch pcr.Type { - case packages_model.TypeDebian: - if err := debian_service.BuildAllRepositoryFiles(ctx, pcr.OwnerID); err != nil { - return fmt.Errorf("CleanupRule [%d]: debian.BuildAllRepositoryFiles failed: %w", pcr.ID, err) - } - case packages_model.TypeAlpine: - if err := alpine_service.BuildAllRepositoryFiles(ctx, pcr.OwnerID); err != nil { - return fmt.Errorf("CleanupRule [%d]: alpine.BuildAllRepositoryFiles failed: %w", pcr.ID, err) - } - case packages_model.TypeRpm: - if err := rpm_service.BuildAllRepositoryFiles(ctx, pcr.OwnerID); err != nil { - return fmt.Errorf("CleanupRule [%d]: rpm.BuildAllRepositoryFiles failed: %w", pcr.ID, err) - } - case packages_model.TypeArch: - if err := arch_service.BuildAllRepositoryFiles(ctx, pcr.OwnerID); err != nil { - return fmt.Errorf("CleanupRule [%d]: arch.BuildAllRepositoryFiles failed: %w", pcr.ID, err) - } - case packages_model.TypeAlt: - if err := alt_service.BuildAllRepositoryFiles(ctx, pcr.OwnerID); err != nil { - return fmt.Errorf("CleanupRule [%d]: alt.BuildAllRepositoryFiles failed: %w", pcr.ID, err) + if anyVersionDeleted { + switch pcr.Type { + case packages_model.TypeDebian: + if err := debian_service.BuildAllRepositoryFiles(ctx, pcr.OwnerID); err != nil { + return fmt.Errorf("CleanupRule [%d]: debian.BuildAllRepositoryFiles failed: %w", pcr.ID, err) + } + case packages_model.TypeAlpine: + if err := alpine_service.BuildAllRepositoryFiles(ctx, pcr.OwnerID); err != nil { + return fmt.Errorf("CleanupRule [%d]: alpine.BuildAllRepositoryFiles failed: %w", pcr.ID, err) + } + case packages_model.TypeRpm: + if err := rpm_service.BuildAllRepositoryFiles(ctx, pcr.OwnerID); err != nil { + return fmt.Errorf("CleanupRule [%d]: rpm.BuildAllRepositoryFiles failed: %w", pcr.ID, err) + } + case packages_model.TypeArch: + if err := arch_service.BuildAllRepositoryFiles(ctx, pcr.OwnerID); err != nil { + return fmt.Errorf("CleanupRule [%d]: arch.BuildAllRepositoryFiles failed: %w", pcr.ID, err) + } + case packages_model.TypeAlt: + if err := alt_service.BuildAllRepositoryFiles(ctx, pcr.OwnerID); err != nil { + return fmt.Errorf("CleanupRule [%d]: alt.BuildAllRepositoryFiles failed: %w", pcr.ID, err) + } } } - } - return nil + return nil + }) }) - if err != nil { - return err - } - - return committer.Commit() } type CleanupTarget struct { diff --git a/tests/integration/api_packages_test.go b/tests/integration/api_packages_test.go index e79f977c86..f4cd42784a 100644 --- a/tests/integration/api_packages_test.go +++ b/tests/integration/api_packages_test.go @@ -492,6 +492,42 @@ func TestPackageCleanup(t *testing.T) { duration, _ := time.ParseDuration("-1h") + t.Run("Debian", func(t *testing.T) { + defer tests.PrintCurrentTest(t)() + // Debian does a repository rebuild. + + distribution := "forgejo" + component := "main" + architecture := "amd64" + packageName := "runner" + packageDescription := "Forgejo Runner" + + rootURL := fmt.Sprintf("/api/packages/%s/debian", user.Name) + uploadURL := fmt.Sprintf("%s/pool/%s/%s/upload", rootURL, distribution, component) + + req := NewRequestWithBody(t, "PUT", uploadURL, + createDebianArchive(packageName, "1.0.0", architecture, packageDescription)). + AddBasicAuth(user.Name) + MakeRequest(t, req, http.StatusCreated) + + resp := MakeRequest(t, NewRequestf(t, "GET", "%s/dists/%s/%s/binary-%s/Packages", rootURL, distribution, component, architecture), http.StatusOK) + assert.Contains(t, resp.Body.String(), "pool/forgejo/main/runner_1.0.0_amd64.deb") + + pcr, err := packages_model.InsertCleanupRule(t.Context(), &packages_model.PackageCleanupRule{ + Enabled: true, + RemovePattern: `.+`, + OwnerID: user.ID, + Type: packages_model.TypeDebian, + }) + require.NoError(t, err) + + require.NoError(t, packages_cleanup_service.CleanupTask(t.Context(), duration)) + + MakeRequest(t, NewRequestf(t, "GET", "%s/dists/%s/%s/binary-%s/Packages", rootURL, distribution, component, architecture), http.StatusNotFound) + + require.NoError(t, packages_model.DeleteCleanupRuleByID(t.Context(), pcr.ID)) + }) + t.Run("Common", func(t *testing.T) { defer tests.PrintCurrentTest(t)()