diff --git a/.deadcode-out b/.deadcode-out index 6d2c35e374..45ad117ccd 100644 --- a/.deadcode-out +++ b/.deadcode-out @@ -19,7 +19,6 @@ forgejo.org/models/auth forgejo.org/models/db TruncateBeans TruncateBeansCascade - InTransaction DumpTables GetTableNames extendBeansForCascade diff --git a/go.mod b/go.mod index 68381a87a7..c7a29c014c 100644 --- a/go.mod +++ b/go.mod @@ -274,4 +274,4 @@ replace github.com/gliderlabs/ssh => code.forgejo.org/forgejo/ssh v0.0.0-2024121 replace git.sr.ht/~mariusor/go-xsd-duration => code.forgejo.org/forgejo/go-xsd-duration v0.0.0-20220703122237-02e73435a078 -replace xorm.io/xorm v1.3.9 => code.forgejo.org/xorm/xorm v1.3.9-forgejo.8 +replace xorm.io/xorm v1.3.9 => code.forgejo.org/xorm/xorm v1.3.9-forgejo.9 diff --git a/go.sum b/go.sum index 410025ae97..594dbb4315 100644 --- a/go.sum +++ b/go.sum @@ -42,8 +42,8 @@ code.forgejo.org/go-chi/captcha v1.0.2 h1:vyHDPXkpjDv8bLO9NqtWzZayzstD/WpJ5xwEkA code.forgejo.org/go-chi/captcha v1.0.2/go.mod h1:lxiPLcJ76UCZHoH31/Wbum4GUi2NgjfFZLrJkKv1lLE= code.forgejo.org/go-chi/session v1.0.3 h1:ByJ9c/UC0AU57hxiGl53TXh+NdBOBwK/bhZ9jyadEwE= code.forgejo.org/go-chi/session v1.0.3/go.mod h1:xzGtFrV/agCJoZCUhFDlqAr1he6BrAdqlaprKOB1W90= -code.forgejo.org/xorm/xorm v1.3.9-forgejo.8 h1:dsSKm2nus0NhHsqYxeuB3Gldk6TtlusD1CBGV6V1SS0= -code.forgejo.org/xorm/xorm v1.3.9-forgejo.8/go.mod h1:A7sFd3BFmRp20h6drSsCXgQRQdF8Vz8HuCSrzFS3m90= +code.forgejo.org/xorm/xorm v1.3.9-forgejo.9 h1:hzEXDa53opdp5nrGG4F6y8HzFzrGXd5GIvFyUHcvGmI= +code.forgejo.org/xorm/xorm v1.3.9-forgejo.9/go.mod h1:A7sFd3BFmRp20h6drSsCXgQRQdF8Vz8HuCSrzFS3m90= code.gitea.io/sdk/gitea v0.21.0 h1:69n6oz6kEVHRo1+APQQyizkhrZrLsTLXey9142pfkD4= code.gitea.io/sdk/gitea v0.21.0/go.mod h1:tnBjVhuKJCn8ibdyyhvUyxrR1Ca2KHEoTWoukNhXQPA= code.superseriousbusiness.org/exif-terminator v0.11.1 h1:qnujLH4/Yk/CFtFMmtjozbdV6Ry5G3Q/E/mLlWm/gQI= diff --git a/models/db/context.go b/models/db/context.go index 9be158ccca..f098b40a32 100644 --- a/models/db/context.go +++ 
b/models/db/context.go @@ -6,6 +6,8 @@ package db import ( "context" "database/sql" + "errors" + "fmt" "xorm.io/builder" "xorm.io/xorm" @@ -416,3 +418,42 @@ func inTransaction(ctx context.Context) (*xorm.Session, bool) { return nil, false } } + +type RetryConfig struct { + ErrorIs []error + AttemptCount int +} + +// Execute the given function in a transaction. RetryConfig will retry the function on an error, if it matches the +// ErrorIs parameter, up to the total of AttemptCount number of tries. RetryTx cannot be invoked when already within a +// transaction and will return an error immediately. +func RetryTx(ctx context.Context, config RetryConfig, f func(ctx context.Context) error) error { + if InTransaction(ctx) { + return errors.New("unsupported operation: attempted to use RetryTx while already within a transaction") + } else if config.AttemptCount == 0 { + return errors.New("unsupported operation: attempted to use RetryTx with 0 attempts") + } + + var lastError error + for range config.AttemptCount { + err := WithTx(ctx, f) + if err == nil { + return nil + } + + foundMatch := false + for _, possibleError := range config.ErrorIs { + if errors.Is(err, possibleError) { + foundMatch = true + break + } + } + if !foundMatch { + return err + } + + lastError = err + } + + return fmt.Errorf("retry tx failed after %d attempts; last error: %w", config.AttemptCount, lastError) +} diff --git a/models/db/context_test.go b/models/db/context_test.go index 525ab54d99..60ef8462cc 100644 --- a/models/db/context_test.go +++ b/models/db/context_test.go @@ -220,3 +220,59 @@ func TestAfterTx(t *testing.T) { }) } } + +func TestRetryTx(t *testing.T) { + t.Run("success", func(t *testing.T) { + err := db.RetryTx(t.Context(), db.RetryConfig{AttemptCount: 1}, func(ctx context.Context) error { + assert.True(t, db.InTransaction(ctx)) + return nil + }) + require.NoError(t, err) + }) + + t.Run("fail constantly", func(t *testing.T) { + attemptCount := 0 + testError := errors.New("hello") + 
err := db.RetryTx(t.Context(), db.RetryConfig{ + AttemptCount: 2, + ErrorIs: []error{testError}, + }, func(ctx context.Context) error { + attemptCount++ + return testError + }) + require.ErrorIs(t, err, testError) + require.ErrorContains(t, err, "2 attempts") + assert.Equal(t, 2, attemptCount) + }) + + t.Run("fail w/ non retriable error", func(t *testing.T) { + attemptCount := 0 + testError := errors.New("hello") + err := db.RetryTx(t.Context(), db.RetryConfig{ + AttemptCount: 2, + ErrorIs: []error{}, + }, func(ctx context.Context) error { + attemptCount++ + return testError + }) + require.ErrorIs(t, err, testError) + assert.Equal(t, 1, attemptCount) + }) + + t.Run("succeed on retry", func(t *testing.T) { + attemptCount := 0 + testError := errors.New("hello") + err := db.RetryTx(t.Context(), db.RetryConfig{ + AttemptCount: 2, + ErrorIs: []error{testError}, + }, func(ctx context.Context) error { + attemptCount++ + if attemptCount == 1 { + return testError + } + return nil + }) + require.NoError(t, err) + assert.Equal(t, 2, attemptCount) + }) +} diff --git a/models/packages/package_version.go b/models/packages/package_version.go index 873f7bf9b6..545ad63eb4 100644 --- a/models/packages/package_version.go +++ b/models/packages/package_version.go @@ -5,11 +5,13 @@ package packages import ( "context" + "errors" "strconv" "strings" "forgejo.org/models/db" "forgejo.org/modules/optional" + "forgejo.org/modules/setting" "forgejo.org/modules/timeutil" "forgejo.org/modules/util" @@ -155,6 +157,25 @@ func HasVersionFileReferences(ctx context.Context, versionID int64) (bool, error }) } +func (pv *PackageVersion) LockForUpdate(ctx context.Context) error { + if !db.InTransaction(ctx) { + return errors.New("invalid state for PackageVersion.LockForUpdate: database is not in a transaction") + } else if setting.Database.Type.IsSQLite3() { + // SQLite both doesn't support "SELECT ... 
FOR UPDATE", and it's irrelevant for SQLite as the entire database is + // locked for write when a write transaction is open. + return nil + } + + pvfu := PackageVersion{} + has, err := db.GetEngine(ctx).ID(pv.ID).ForUpdate().Get(&pvfu) + if err != nil { + return err + } else if !has { + return ErrPackageNotExist + } + return nil +} + // SearchValue describes a value to search // If ExactMatch is true, the field must match the value otherwise a LIKE search is performed. type SearchValue struct { diff --git a/services/packages/packages.go b/services/packages/packages.go index 418ceab798..a1772cc1d9 100644 --- a/services/packages/packages.go +++ b/services/packages/packages.go @@ -23,6 +23,8 @@ import ( "forgejo.org/modules/setting" "forgejo.org/modules/storage" notify_service "forgejo.org/services/notify" + + "xorm.io/xorm" ) var ( @@ -76,38 +78,54 @@ func CreatePackageOrAddFileToExisting(ctx context.Context, pvci *PackageCreation } func createPackageAndAddFile(ctx context.Context, pvci *PackageCreationInfo, pfci *PackageFileCreationInfo, allowDuplicate bool) (*packages_model.PackageVersion, *packages_model.PackageFile, error) { - dbCtx, committer, err := db.TxContext(ctx) - if err != nil { - return nil, nil, err - } - defer committer.Close() + var pv *packages_model.PackageVersion + var pf *packages_model.PackageFile + var blobHash256Created optional.Option[string] + var createdPackage bool - pv, created, err := createPackageAndVersion(dbCtx, pvci, allowDuplicate) - if err != nil { - return nil, nil, err - } + // ErrUniqueConstraintViolation can occur when two concurrent updates occur to a package registry. 
Typically this + // occurs when a registry with an index of organization-level packages is modified (Debian, Alpine, Alt, Arch, RPM) + // and that index needs to be rebuilt -- even if two different packages are being updated, they can write the + // registry concurrently and that can cause ErrUniqueConstraintViolation errors from the database operations that + // "check if record exists, if not, create it". + // + // The simple approach of detecting the ErrUniqueConstraintViolation error inside the transaction and picking up the + // other write isn't possible for two reasons: (a) PostgreSQL can't continue a transaction with an error in it, a + // SAVEPOINT and ROLLBACK TO SAVEPOINT are required, and (b) xorm keeps internal state during a transaction that + // causes such a recovery from error to panic. So, we retry the entire modification transaction if + // ErrUniqueConstraintViolation is encountered. + err := db.RetryTx(ctx, db.RetryConfig{ + // A single retry is sufficient as any package index that was concurrently modified should now be present: + AttemptCount: 2, + ErrorIs: []error{xorm.ErrUniqueConstraintViolation}, + }, func(ctx context.Context) error { + var err error + var pb *packages_model.PackageBlob + var blobCreated bool - pf, pb, blobCreated, err := addFileToPackageVersion(dbCtx, pv, &pvci.PackageInfo, pfci) - removeBlob := false - defer func() { - if blobCreated && removeBlob { - contentStore := packages_module.NewContentStore() - if err := contentStore.Delete(packages_module.BlobHash256Key(pb.HashSHA256)); err != nil { + pv, createdPackage, err = createPackageAndVersion(ctx, pvci, allowDuplicate) + if err != nil { + return err + } + + pf, pb, blobCreated, err = addFileToPackageVersion(ctx, pv, &pvci.PackageInfo, pfci) + if blobCreated { + blobHash256Created = optional.Some(pb.HashSHA256) + } + return err + }) + if err != nil { + // If we have an error later in the process after writing a blob to the content store, make our best effort to + // 
remove the content -- it won't be referenced in the DB because the transaction would be rolled back. + if has, hash := blobHash256Created.Get(); has { + if err := packages_module.NewContentStore().Delete(packages_module.BlobHash256Key(hash)); err != nil { log.Error("Error deleting package blob from content store: %v", err) } } - }() - if err != nil { - removeBlob = true return nil, nil, err } - if err := committer.Commit(); err != nil { - removeBlob = true - return nil, nil, err - } - - if created { + if createdPackage { pd, err := packages_model.GetPackageDescriptor(ctx, pv) if err != nil { return nil, nil, err @@ -213,29 +231,33 @@ func AddFileToPackageVersionInternal(ctx context.Context, pv *packages_model.Pac } func addFileToPackageWrapper(ctx context.Context, fn func(ctx context.Context) (*packages_model.PackageFile, *packages_model.PackageBlob, bool, error)) (*packages_model.PackageFile, error) { - ctx, committer, err := db.TxContext(ctx) - if err != nil { - return nil, err - } - defer committer.Close() + var pf *packages_model.PackageFile + var pb *packages_model.PackageBlob + var blobHash256Created optional.Option[string] - pf, pb, blobCreated, err := fn(ctx) - removeBlob := false - defer func() { - if removeBlob { - contentStore := packages_module.NewContentStore() - if err := contentStore.Delete(packages_module.BlobHash256Key(pb.HashSHA256)); err != nil { + // See comment in createPackageAndAddFile which explains why RetryTx is used with ErrUniqueConstraintViolation. 
+ err := db.RetryTx(ctx, db.RetryConfig{ + // A single retry is sufficient as any package index that was concurrently modified should now be present: + AttemptCount: 2, + ErrorIs: []error{xorm.ErrUniqueConstraintViolation}, + }, func(ctx context.Context) error { + var err error + var blobCreated bool + + pf, pb, blobCreated, err = fn(ctx) + if blobCreated { + blobHash256Created = optional.Some(pb.HashSHA256) + } + return err + }) + if err != nil { + // If we have an error later in the process after writing a blob to the content store, make our best effort to + // remove the content -- it won't be referenced in the DB because the transaction would be rolled back. + if has, hash := blobHash256Created.Get(); has { + if err := packages_module.NewContentStore().Delete(packages_module.BlobHash256Key(hash)); err != nil { log.Error("Error deleting package blob from content store: %v", err) } } - }() - if err != nil { - removeBlob = blobCreated - return nil, err - } - - if err := committer.Commit(); err != nil { - removeBlob = blobCreated return nil, err } @@ -267,6 +289,20 @@ func addFileToPackageVersion(ctx context.Context, pv *packages_model.PackageVers func addFileToPackageVersionUnchecked(ctx context.Context, pv *packages_model.PackageVersion, pfci *PackageFileCreationInfo, packageType packages_model.Type) (*packages_model.PackageFile, *packages_model.PackageBlob, bool, error) { log.Trace("Adding package file: %v, %s", pv.ID, pfci.Filename) + // The `OverwriteExisting` capability in this method has a race condition in it -- it will check if the file already + // exists in the package, and delete the file's properties and the file, and then it will attempt to insert the new + // file. 
This can cause the `ErrDuplicatePackageFile` error to be returned even when `OverwriteExisting` is set in + concurrent modifications, as both modifications will attempt to delete the existing file, one will succeed, one + will delete zero records and think it succeeded, and then both will attempt to add the file and one will hit + `ErrDuplicatePackageFile`. + // + // To address this, lock the package version being modified by performing a `SELECT ... FOR UPDATE` on it, + // guaranteeing only one `addFileToPackageVersionUnchecked` is running on a specific package version. + err := pv.LockForUpdate(ctx) + if err != nil { + return nil, nil, false, err + } + pb, exists, err := packages_model.GetOrInsertBlob(ctx, NewPackageBlob(pfci.Data)) if err != nil { log.Error("Error inserting package blob: %v", err) @@ -430,7 +466,12 @@ func CheckSizeQuotaExceeded(ctx context.Context, doer, owner *user_model.User, p func GetOrCreateInternalPackageVersion(ctx context.Context, ownerID int64, packageType packages_model.Type, name, version string) (*packages_model.PackageVersion, error) { var pv *packages_model.PackageVersion - return pv, db.WithTx(ctx, func(ctx context.Context) error { + // See comment in createPackageAndAddFile which explains why RetryTx is used with ErrUniqueConstraintViolation. 
+ return pv, db.RetryTx(ctx, db.RetryConfig{ + // A single retry is sufficient as any package index that was concurrently modified should now be present: + AttemptCount: 2, + ErrorIs: []error{xorm.ErrUniqueConstraintViolation}, + }, func(ctx context.Context) error { p := &packages_model.Package{ OwnerID: ownerID, Type: packageType, diff --git a/tests/integration/api_packages_debian_test.go b/tests/integration/api_packages_debian_test.go index e22165c44b..4dc5288bc5 100644 --- a/tests/integration/api_packages_debian_test.go +++ b/tests/integration/api_packages_debian_test.go @@ -11,6 +11,7 @@ import ( "io" "net/http" "strings" + "sync" "testing" "forgejo.org/models/db" @@ -19,6 +20,7 @@ import ( user_model "forgejo.org/models/user" "forgejo.org/modules/base" debian_module "forgejo.org/modules/packages/debian" + "forgejo.org/modules/setting" "forgejo.org/tests" "github.com/blakesmith/ar" @@ -26,6 +28,32 @@ import ( "github.com/stretchr/testify/require" ) +func createDebianArchive(name, version, architecture, packageDescription string) io.Reader { + var cbuf bytes.Buffer + zw := gzip.NewWriter(&cbuf) + tw := tar.NewWriter(zw) + tw.WriteHeader(&tar.Header{ + Name: "control", + Mode: 0o600, + Size: 50, + }) + fmt.Fprintf(tw, "Package: %s\nVersion: %s\nArchitecture: %s\nDescription: %s\n", name, version, architecture, packageDescription) + tw.Close() + zw.Close() + + var buf bytes.Buffer + aw := ar.NewWriter(&buf) + aw.WriteGlobalHeader() + hdr := &ar.Header{ + Name: "control.tar.gz", + Mode: 0o600, + Size: int64(cbuf.Len()), + } + aw.WriteHeader(hdr) + aw.Write(cbuf.Bytes()) + return &buf +} + func TestPackageDebian(t *testing.T) { defer tests.PrepareTestEnv(t)() user := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) @@ -35,32 +63,6 @@ func TestPackageDebian(t *testing.T) { packageVersion2 := "1.0.4" packageDescription := "Package Description" - createArchive := func(name, version, architecture string) io.Reader { - var cbuf bytes.Buffer - zw := 
gzip.NewWriter(&cbuf) - tw := tar.NewWriter(zw) - tw.WriteHeader(&tar.Header{ - Name: "control", - Mode: 0o600, - Size: 50, - }) - fmt.Fprintf(tw, "Package: %s\nVersion: %s\nArchitecture: %s\nDescription: %s\n", name, version, architecture, packageDescription) - tw.Close() - zw.Close() - - var buf bytes.Buffer - aw := ar.NewWriter(&buf) - aw.WriteGlobalHeader() - hdr := &ar.Header{ - Name: "control.tar.gz", - Mode: 0o600, - Size: int64(cbuf.Len()), - } - aw.WriteHeader(hdr) - aw.Write(cbuf.Bytes()) - return &buf - } - distributions := []string{"test", "gitea"} components := []string{"main", "stable"} architectures := []string{"all", "amd64"} @@ -97,16 +99,16 @@ func TestPackageDebian(t *testing.T) { AddBasicAuth(user.Name) MakeRequest(t, req, http.StatusBadRequest) - req = NewRequestWithBody(t, "PUT", uploadURL, createArchive("", "", "")). + req = NewRequestWithBody(t, "PUT", uploadURL, createDebianArchive("", "", "", packageDescription)). AddBasicAuth(user.Name) MakeRequest(t, req, http.StatusBadRequest) - req = NewRequestWithBody(t, "PUT", uploadURL, createArchive(packageName, packageVersion, architecture)). + req = NewRequestWithBody(t, "PUT", uploadURL, createDebianArchive(packageName, packageVersion, architecture, packageDescription)). AddBasicAuth(user.Name). SetHeader("content-type", "multipart/form-data") MakeRequest(t, req, http.StatusBadRequest) - req = NewRequestWithBody(t, "PUT", uploadURL, createArchive(packageName, packageVersion, architecture)). + req = NewRequestWithBody(t, "PUT", uploadURL, createDebianArchive(packageName, packageVersion, architecture, packageDescription)). AddBasicAuth(user.Name) MakeRequest(t, req, http.StatusCreated) @@ -154,7 +156,7 @@ func TestPackageDebian(t *testing.T) { return seen }) - req = NewRequestWithBody(t, "PUT", uploadURL, createArchive(packageName, packageVersion, architecture)). + req = NewRequestWithBody(t, "PUT", uploadURL, createDebianArchive(packageName, packageVersion, architecture, packageDescription)). 
AddBasicAuth(user.Name) MakeRequest(t, req, http.StatusConflict) }) @@ -171,7 +173,7 @@ func TestPackageDebian(t *testing.T) { t.Run("Packages", func(t *testing.T) { defer tests.PrintCurrentTest(t)() - req := NewRequestWithBody(t, "PUT", uploadURL, createArchive(packageName, packageVersion2, architecture)). + req := NewRequestWithBody(t, "PUT", uploadURL, createDebianArchive(packageName, packageVersion2, architecture, packageDescription)). AddBasicAuth(user.Name) MakeRequest(t, req, http.StatusCreated) @@ -308,3 +310,50 @@ func TestPackageDebian(t *testing.T) { require.Contains(t, body, fmt.Sprintf("Version: %s", packageVersion2)) }) } + +func TestPackageDebianConcurrent(t *testing.T) { + if setting.Database.Type.IsSQLite3() { + // Concurrency test fails on SQLite w/ "database is locked" + t.Skip() + } + + defer tests.PrepareTestEnv(t)() + + user := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + + distribution := "test" + component := "main" + architecture := "amd64" + packageName := "gitea" + packageDescription := "Package Description" + + rootURL := fmt.Sprintf("/api/packages/%s/debian", user.Name) + uploadURL := fmt.Sprintf("%s/pool/%s/%s/upload", rootURL, distribution, component) + + t.Run("Concurrent Upload", func(t *testing.T) { + defer tests.PrintCurrentTest(t)() + + var wg sync.WaitGroup + packageCount := 10 + for i := range packageCount { + wg.Go(func() { + req := NewRequestWithBody(t, "PUT", uploadURL, + createDebianArchive(packageName, fmt.Sprintf("1.0.%d", i), architecture, packageDescription)). 
+ AddBasicAuth(user.Name) + MakeRequest(t, req, http.StatusCreated) + }) + } + wg.Wait() + + url := fmt.Sprintf("%s/dists/%s/%s/binary-%s/Packages", rootURL, distribution, component, architecture) + + req := NewRequest(t, "GET", url) + resp := MakeRequest(t, req, http.StatusOK) + body := resp.Body.String() + + assert.Contains(t, body, fmt.Sprintf("Package: %s\n", packageName)) + for i := range packageCount { + assert.Contains(t, body, fmt.Sprintf("Version: 1.0.%d\n", i)) + } + }) +}