mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2026-05-12 22:10:25 +00:00
[v15.0/forgejo] fix: duplicate key violates unique constraint in concurrent debian package uploads (#11833)
**Backport:** https://codeberg.org/forgejo/forgejo/pulls/11776 Fixes #11438. Whenever a "unique constraint violation" error is encountered by package mutation, detect if a `xorm.ErrUniqueConstraintViolation` error occurs. If it does, retry the entire transaction. ## Checklist The [contributor guide](https://forgejo.org/docs/next/contributor/) contains information that will be helpful to first time contributors. There also are a few [conditions for merging Pull Requests in Forgejo repositories](https://codeberg.org/forgejo/governance/src/branch/main/PullRequestsAgreement.md). You are also welcome to join the [Forgejo development chatroom](https://matrix.to/#/#forgejo-development:matrix.org). ### Tests for Go changes (can be removed for JavaScript changes) - I added test coverage for Go changes... - [ ] in their respective `*_test.go` for unit tests. - [ ] in the `tests/integration` directory if it involves interactions with a live Forgejo server. - I ran... - [ ] `make pr-go` before pushing ### Documentation - [ ] I created a pull request [to the documentation](https://codeberg.org/forgejo/docs) to explain to Forgejo users how to use this change. - [x] I did not document these changes and I do not expect someone else to do it. ### Release notes - [x] This change will be noticed by a Forgejo user or admin (feature, bug fix, performance, etc.). I suggest to include a release note for this change. - [ ] This change is not visible to a Forgejo user or admin (refactor, dependency upgrade, etc.). I think there is no need to add a release note for this change. Co-authored-by: Mathieu Fenniak <mathieu@fenniak.net> Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/11833 Reviewed-by: Mathieu Fenniak <mfenniak@noreply.codeberg.org> Co-authored-by: forgejo-backport-action <forgejo-backport-action@noreply.codeberg.org> Co-committed-by: forgejo-backport-action <forgejo-backport-action@noreply.codeberg.org>
This commit is contained in:
parent
4230ba6ed0
commit
ebac8b38cb
8 changed files with 286 additions and 79 deletions
|
|
@ -19,7 +19,6 @@ forgejo.org/models/auth
|
||||||
forgejo.org/models/db
|
forgejo.org/models/db
|
||||||
TruncateBeans
|
TruncateBeans
|
||||||
TruncateBeansCascade
|
TruncateBeansCascade
|
||||||
InTransaction
|
|
||||||
DumpTables
|
DumpTables
|
||||||
GetTableNames
|
GetTableNames
|
||||||
extendBeansForCascade
|
extendBeansForCascade
|
||||||
|
|
|
||||||
2
go.mod
2
go.mod
|
|
@ -274,4 +274,4 @@ replace github.com/gliderlabs/ssh => code.forgejo.org/forgejo/ssh v0.0.0-2024121
|
||||||
|
|
||||||
replace git.sr.ht/~mariusor/go-xsd-duration => code.forgejo.org/forgejo/go-xsd-duration v0.0.0-20220703122237-02e73435a078
|
replace git.sr.ht/~mariusor/go-xsd-duration => code.forgejo.org/forgejo/go-xsd-duration v0.0.0-20220703122237-02e73435a078
|
||||||
|
|
||||||
replace xorm.io/xorm v1.3.9 => code.forgejo.org/xorm/xorm v1.3.9-forgejo.8
|
replace xorm.io/xorm v1.3.9 => code.forgejo.org/xorm/xorm v1.3.9-forgejo.9
|
||||||
|
|
|
||||||
4
go.sum
4
go.sum
|
|
@ -42,8 +42,8 @@ code.forgejo.org/go-chi/captcha v1.0.2 h1:vyHDPXkpjDv8bLO9NqtWzZayzstD/WpJ5xwEkA
|
||||||
code.forgejo.org/go-chi/captcha v1.0.2/go.mod h1:lxiPLcJ76UCZHoH31/Wbum4GUi2NgjfFZLrJkKv1lLE=
|
code.forgejo.org/go-chi/captcha v1.0.2/go.mod h1:lxiPLcJ76UCZHoH31/Wbum4GUi2NgjfFZLrJkKv1lLE=
|
||||||
code.forgejo.org/go-chi/session v1.0.3 h1:ByJ9c/UC0AU57hxiGl53TXh+NdBOBwK/bhZ9jyadEwE=
|
code.forgejo.org/go-chi/session v1.0.3 h1:ByJ9c/UC0AU57hxiGl53TXh+NdBOBwK/bhZ9jyadEwE=
|
||||||
code.forgejo.org/go-chi/session v1.0.3/go.mod h1:xzGtFrV/agCJoZCUhFDlqAr1he6BrAdqlaprKOB1W90=
|
code.forgejo.org/go-chi/session v1.0.3/go.mod h1:xzGtFrV/agCJoZCUhFDlqAr1he6BrAdqlaprKOB1W90=
|
||||||
code.forgejo.org/xorm/xorm v1.3.9-forgejo.8 h1:dsSKm2nus0NhHsqYxeuB3Gldk6TtlusD1CBGV6V1SS0=
|
code.forgejo.org/xorm/xorm v1.3.9-forgejo.9 h1:hzEXDa53opdp5nrGG4F6y8HzFzrGXd5GIvFyUHcvGmI=
|
||||||
code.forgejo.org/xorm/xorm v1.3.9-forgejo.8/go.mod h1:A7sFd3BFmRp20h6drSsCXgQRQdF8Vz8HuCSrzFS3m90=
|
code.forgejo.org/xorm/xorm v1.3.9-forgejo.9/go.mod h1:A7sFd3BFmRp20h6drSsCXgQRQdF8Vz8HuCSrzFS3m90=
|
||||||
code.gitea.io/sdk/gitea v0.21.0 h1:69n6oz6kEVHRo1+APQQyizkhrZrLsTLXey9142pfkD4=
|
code.gitea.io/sdk/gitea v0.21.0 h1:69n6oz6kEVHRo1+APQQyizkhrZrLsTLXey9142pfkD4=
|
||||||
code.gitea.io/sdk/gitea v0.21.0/go.mod h1:tnBjVhuKJCn8ibdyyhvUyxrR1Ca2KHEoTWoukNhXQPA=
|
code.gitea.io/sdk/gitea v0.21.0/go.mod h1:tnBjVhuKJCn8ibdyyhvUyxrR1Ca2KHEoTWoukNhXQPA=
|
||||||
code.superseriousbusiness.org/exif-terminator v0.11.1 h1:qnujLH4/Yk/CFtFMmtjozbdV6Ry5G3Q/E/mLlWm/gQI=
|
code.superseriousbusiness.org/exif-terminator v0.11.1 h1:qnujLH4/Yk/CFtFMmtjozbdV6Ry5G3Q/E/mLlWm/gQI=
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,8 @@ package db
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"xorm.io/builder"
|
"xorm.io/builder"
|
||||||
"xorm.io/xorm"
|
"xorm.io/xorm"
|
||||||
|
|
@ -416,3 +418,42 @@ func inTransaction(ctx context.Context) (*xorm.Session, bool) {
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type RetryConfig struct {
|
||||||
|
ErrorIs []error
|
||||||
|
AttemptCount int
|
||||||
|
}
|
||||||
|
|
||||||
|
// Execute the given function in a transaction. RetryConfig will retry the function on an error, if it matches the
|
||||||
|
// ErrorIs parameter, up to the total of AttemptCount number of tries. RetryTx cannot be invoked when already within a
|
||||||
|
// transaction and will return an error immediately.
|
||||||
|
func RetryTx(ctx context.Context, config RetryConfig, f func(ctx context.Context) error) error {
|
||||||
|
if InTransaction(ctx) {
|
||||||
|
return errors.New("unsupported operation: attempted to use RetryTx while already within a transaction")
|
||||||
|
} else if config.AttemptCount == 0 {
|
||||||
|
return errors.New("unsupported operation: attempted to use RetryTx with 0 attempts")
|
||||||
|
}
|
||||||
|
|
||||||
|
var lastError error
|
||||||
|
for range config.AttemptCount {
|
||||||
|
err := WithTx(ctx, f)
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
foundMatch := false
|
||||||
|
for _, possibleError := range config.ErrorIs {
|
||||||
|
if errors.Is(err, possibleError) {
|
||||||
|
foundMatch = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !foundMatch {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
lastError = err
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("retry tx failed after %d attempts; last error: %w", config.AttemptCount, lastError)
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -220,3 +220,59 @@ func TestAfterTx(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRetryTx(t *testing.T) {
|
||||||
|
t.Run("success", func(t *testing.T) {
|
||||||
|
err := db.RetryTx(t.Context(), db.RetryConfig{AttemptCount: 1}, func(ctx context.Context) error {
|
||||||
|
assert.True(t, db.InTransaction(ctx))
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("fail constantly", func(t *testing.T) {
|
||||||
|
attemptCount := 0
|
||||||
|
testError := errors.New("hello")
|
||||||
|
err := db.RetryTx(t.Context(), db.RetryConfig{
|
||||||
|
AttemptCount: 2,
|
||||||
|
ErrorIs: []error{testError},
|
||||||
|
}, func(ctx context.Context) error {
|
||||||
|
attemptCount++
|
||||||
|
return testError
|
||||||
|
})
|
||||||
|
require.ErrorIs(t, err, testError)
|
||||||
|
require.ErrorContains(t, err, "2 attempts")
|
||||||
|
assert.Equal(t, 2, attemptCount)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("fail w/ non retriable error", func(t *testing.T) {
|
||||||
|
attemptCount := 0
|
||||||
|
testError := errors.New("hello")
|
||||||
|
err := db.RetryTx(t.Context(), db.RetryConfig{
|
||||||
|
AttemptCount: 2,
|
||||||
|
ErrorIs: []error{},
|
||||||
|
}, func(ctx context.Context) error {
|
||||||
|
attemptCount++
|
||||||
|
return testError
|
||||||
|
})
|
||||||
|
require.ErrorIs(t, err, testError)
|
||||||
|
assert.Equal(t, 1, attemptCount)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("succeed on retry", func(t *testing.T) {
|
||||||
|
attemptCount := 0
|
||||||
|
testError := errors.New("hello")
|
||||||
|
err := db.RetryTx(t.Context(), db.RetryConfig{
|
||||||
|
AttemptCount: 2,
|
||||||
|
ErrorIs: []error{testError},
|
||||||
|
}, func(ctx context.Context) error {
|
||||||
|
attemptCount++
|
||||||
|
if attemptCount == 1 {
|
||||||
|
return testError
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, 2, attemptCount)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,11 +5,13 @@ package packages
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"forgejo.org/models/db"
|
"forgejo.org/models/db"
|
||||||
"forgejo.org/modules/optional"
|
"forgejo.org/modules/optional"
|
||||||
|
"forgejo.org/modules/setting"
|
||||||
"forgejo.org/modules/timeutil"
|
"forgejo.org/modules/timeutil"
|
||||||
"forgejo.org/modules/util"
|
"forgejo.org/modules/util"
|
||||||
|
|
||||||
|
|
@ -155,6 +157,25 @@ func HasVersionFileReferences(ctx context.Context, versionID int64) (bool, error
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (pv *PackageVersion) LockForUpdate(ctx context.Context) error {
|
||||||
|
if !db.InTransaction(ctx) {
|
||||||
|
return errors.New("invalid state for PackageVersion.LockForUpdate: database is not in a transaction")
|
||||||
|
} else if setting.Database.Type.IsSQLite3() {
|
||||||
|
// SQLite both doesn't support "SELECT ... FOR UPDATE", and it's irrelevant for SQLite as the entire database is
|
||||||
|
// locked for write when a write transaction is open.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
pvfu := PackageVersion{}
|
||||||
|
has, err := db.GetEngine(ctx).ID(pv.ID).ForUpdate().Get(&pvfu)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
} else if !has {
|
||||||
|
return ErrPackageNotExist
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// SearchValue describes a value to search
|
// SearchValue describes a value to search
|
||||||
// If ExactMatch is true, the field must match the value otherwise a LIKE search is performed.
|
// If ExactMatch is true, the field must match the value otherwise a LIKE search is performed.
|
||||||
type SearchValue struct {
|
type SearchValue struct {
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,8 @@ import (
|
||||||
"forgejo.org/modules/setting"
|
"forgejo.org/modules/setting"
|
||||||
"forgejo.org/modules/storage"
|
"forgejo.org/modules/storage"
|
||||||
notify_service "forgejo.org/services/notify"
|
notify_service "forgejo.org/services/notify"
|
||||||
|
|
||||||
|
"xorm.io/xorm"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
@ -76,38 +78,54 @@ func CreatePackageOrAddFileToExisting(ctx context.Context, pvci *PackageCreation
|
||||||
}
|
}
|
||||||
|
|
||||||
func createPackageAndAddFile(ctx context.Context, pvci *PackageCreationInfo, pfci *PackageFileCreationInfo, allowDuplicate bool) (*packages_model.PackageVersion, *packages_model.PackageFile, error) {
|
func createPackageAndAddFile(ctx context.Context, pvci *PackageCreationInfo, pfci *PackageFileCreationInfo, allowDuplicate bool) (*packages_model.PackageVersion, *packages_model.PackageFile, error) {
|
||||||
dbCtx, committer, err := db.TxContext(ctx)
|
var pv *packages_model.PackageVersion
|
||||||
if err != nil {
|
var pf *packages_model.PackageFile
|
||||||
return nil, nil, err
|
var blobHash256Created optional.Option[string]
|
||||||
}
|
var createdPackage bool
|
||||||
defer committer.Close()
|
|
||||||
|
|
||||||
pv, created, err := createPackageAndVersion(dbCtx, pvci, allowDuplicate)
|
// ErrUniqueConstraintViolation can occur when two concurrent updates occur to a package registry. Typically this
|
||||||
if err != nil {
|
// occurs when a registry with an index of organization-level packages is modified (Debian, Alpine, Alt, Arch, RPM)
|
||||||
return nil, nil, err
|
// and that index needs to be rebuilt -- even if two different packages are being updated, they can write the
|
||||||
}
|
// registry concurrently and that can cause ErrUniqueConstraintViolation errors from the database operations that
|
||||||
|
// "check if record exists, if not, create it".
|
||||||
|
//
|
||||||
|
// The simple approach of detecting the ErrUniqueConstraintViolation error inside the transaction and picking up the
|
||||||
|
// other write isn't possible for two reasons: (a) PostgreSQL can't continue a transaction with an error in it, a
|
||||||
|
// SAVEPOINT and ROLLBACK TO SAVEPOINT are required, and (b) xorm keeps internal state during a transaction that
|
||||||
|
// causes such a recovery from error to panic. So, we retry the entire modification transaction if
|
||||||
|
// ErrUniqueConstraintViolation is encountered.
|
||||||
|
err := db.RetryTx(ctx, db.RetryConfig{
|
||||||
|
// A single retry is sufficient as any package index that was concurrently modified should now be present:
|
||||||
|
AttemptCount: 2,
|
||||||
|
ErrorIs: []error{xorm.ErrUniqueConstraintViolation},
|
||||||
|
}, func(ctx context.Context) error {
|
||||||
|
var err error
|
||||||
|
var pb *packages_model.PackageBlob
|
||||||
|
var blobCreated bool
|
||||||
|
|
||||||
pf, pb, blobCreated, err := addFileToPackageVersion(dbCtx, pv, &pvci.PackageInfo, pfci)
|
pv, createdPackage, err = createPackageAndVersion(ctx, pvci, allowDuplicate)
|
||||||
removeBlob := false
|
if err != nil {
|
||||||
defer func() {
|
return err
|
||||||
if blobCreated && removeBlob {
|
}
|
||||||
contentStore := packages_module.NewContentStore()
|
|
||||||
if err := contentStore.Delete(packages_module.BlobHash256Key(pb.HashSHA256)); err != nil {
|
pf, pb, blobCreated, err = addFileToPackageVersion(ctx, pv, &pvci.PackageInfo, pfci)
|
||||||
|
if blobCreated {
|
||||||
|
blobHash256Created = optional.Some(pb.HashSHA256)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
// If we have an error later in the process after writing a blob to the content store, make our best effort to
|
||||||
|
// remove the content -- it won't be referenced in the DB because the transaction would be rolled back.
|
||||||
|
if has, hash := blobHash256Created.Get(); has {
|
||||||
|
if err := packages_module.NewContentStore().Delete(packages_module.BlobHash256Key(hash)); err != nil {
|
||||||
log.Error("Error deleting package blob from content store: %v", err)
|
log.Error("Error deleting package blob from content store: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
|
||||||
if err != nil {
|
|
||||||
removeBlob = true
|
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := committer.Commit(); err != nil {
|
if createdPackage {
|
||||||
removeBlob = true
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if created {
|
|
||||||
pd, err := packages_model.GetPackageDescriptor(ctx, pv)
|
pd, err := packages_model.GetPackageDescriptor(ctx, pv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
|
|
@ -213,29 +231,33 @@ func AddFileToPackageVersionInternal(ctx context.Context, pv *packages_model.Pac
|
||||||
}
|
}
|
||||||
|
|
||||||
func addFileToPackageWrapper(ctx context.Context, fn func(ctx context.Context) (*packages_model.PackageFile, *packages_model.PackageBlob, bool, error)) (*packages_model.PackageFile, error) {
|
func addFileToPackageWrapper(ctx context.Context, fn func(ctx context.Context) (*packages_model.PackageFile, *packages_model.PackageBlob, bool, error)) (*packages_model.PackageFile, error) {
|
||||||
ctx, committer, err := db.TxContext(ctx)
|
var pf *packages_model.PackageFile
|
||||||
if err != nil {
|
var pb *packages_model.PackageBlob
|
||||||
return nil, err
|
var blobHash256Created optional.Option[string]
|
||||||
}
|
|
||||||
defer committer.Close()
|
|
||||||
|
|
||||||
pf, pb, blobCreated, err := fn(ctx)
|
// See comment in createPackageAndAddFile which explains why RetryTx is used with ErrUniqueConstraintViolation.
|
||||||
removeBlob := false
|
err := db.RetryTx(ctx, db.RetryConfig{
|
||||||
defer func() {
|
// A single retry is sufficient as any package index that was concurrently modified should now be present:
|
||||||
if removeBlob {
|
AttemptCount: 2,
|
||||||
contentStore := packages_module.NewContentStore()
|
ErrorIs: []error{xorm.ErrUniqueConstraintViolation},
|
||||||
if err := contentStore.Delete(packages_module.BlobHash256Key(pb.HashSHA256)); err != nil {
|
}, func(ctx context.Context) error {
|
||||||
|
var err error
|
||||||
|
var blobCreated bool
|
||||||
|
|
||||||
|
pf, pb, blobCreated, err = fn(ctx)
|
||||||
|
if blobCreated {
|
||||||
|
blobHash256Created = optional.Some(pb.HashSHA256)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
// If we have an error later in the process after writing a blob to the content store, make our best effort to
|
||||||
|
// remove the content -- it won't be referenced in the DB because the transaction would be rolled back.
|
||||||
|
if has, hash := blobHash256Created.Get(); has {
|
||||||
|
if err := packages_module.NewContentStore().Delete(packages_module.BlobHash256Key(hash)); err != nil {
|
||||||
log.Error("Error deleting package blob from content store: %v", err)
|
log.Error("Error deleting package blob from content store: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
|
||||||
if err != nil {
|
|
||||||
removeBlob = blobCreated
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := committer.Commit(); err != nil {
|
|
||||||
removeBlob = blobCreated
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -267,6 +289,20 @@ func addFileToPackageVersion(ctx context.Context, pv *packages_model.PackageVers
|
||||||
func addFileToPackageVersionUnchecked(ctx context.Context, pv *packages_model.PackageVersion, pfci *PackageFileCreationInfo, packageType packages_model.Type) (*packages_model.PackageFile, *packages_model.PackageBlob, bool, error) {
|
func addFileToPackageVersionUnchecked(ctx context.Context, pv *packages_model.PackageVersion, pfci *PackageFileCreationInfo, packageType packages_model.Type) (*packages_model.PackageFile, *packages_model.PackageBlob, bool, error) {
|
||||||
log.Trace("Adding package file: %v, %s", pv.ID, pfci.Filename)
|
log.Trace("Adding package file: %v, %s", pv.ID, pfci.Filename)
|
||||||
|
|
||||||
|
// The `OverwriteExisting` capability in this method has a race condition in it -- it will check if the file already
|
||||||
|
// exists in the package, and delete the file's properties and the file, and then it will attempt to insert the new
|
||||||
|
// file. This can cause the `ErrDuplicatePackageFile` error to be returned even when `OverwriteExisting` in
|
||||||
|
// concurrent modifications, as both modifications will attempt to delete the existing file, one will succeed, one
|
||||||
|
// will delete zero records and think it succeeded, and then both will attempt to add the file and one will hit
|
||||||
|
// `ErrDuplicatePackageFile`.
|
||||||
|
//
|
||||||
|
// To address this, lock the package version being modified by performing a `SELECT ... FOR UPDATE` on it,
|
||||||
|
// guaranteeing only one `addFileToPackageVersionUnchecked` is running on a specific package version.
|
||||||
|
err := pv.LockForUpdate(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, false, err
|
||||||
|
}
|
||||||
|
|
||||||
pb, exists, err := packages_model.GetOrInsertBlob(ctx, NewPackageBlob(pfci.Data))
|
pb, exists, err := packages_model.GetOrInsertBlob(ctx, NewPackageBlob(pfci.Data))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error inserting package blob: %v", err)
|
log.Error("Error inserting package blob: %v", err)
|
||||||
|
|
@ -430,7 +466,12 @@ func CheckSizeQuotaExceeded(ctx context.Context, doer, owner *user_model.User, p
|
||||||
func GetOrCreateInternalPackageVersion(ctx context.Context, ownerID int64, packageType packages_model.Type, name, version string) (*packages_model.PackageVersion, error) {
|
func GetOrCreateInternalPackageVersion(ctx context.Context, ownerID int64, packageType packages_model.Type, name, version string) (*packages_model.PackageVersion, error) {
|
||||||
var pv *packages_model.PackageVersion
|
var pv *packages_model.PackageVersion
|
||||||
|
|
||||||
return pv, db.WithTx(ctx, func(ctx context.Context) error {
|
// See comment in createPackageAndAddFile which explains why RetryTx is used with ErrUniqueConstraintViolation.
|
||||||
|
return pv, db.RetryTx(ctx, db.RetryConfig{
|
||||||
|
// A single retry is sufficient as any package index that was concurrently modified should now be present:
|
||||||
|
AttemptCount: 2,
|
||||||
|
ErrorIs: []error{xorm.ErrUniqueConstraintViolation},
|
||||||
|
}, func(ctx context.Context) error {
|
||||||
p := &packages_model.Package{
|
p := &packages_model.Package{
|
||||||
OwnerID: ownerID,
|
OwnerID: ownerID,
|
||||||
Type: packageType,
|
Type: packageType,
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"forgejo.org/models/db"
|
"forgejo.org/models/db"
|
||||||
|
|
@ -19,6 +20,7 @@ import (
|
||||||
user_model "forgejo.org/models/user"
|
user_model "forgejo.org/models/user"
|
||||||
"forgejo.org/modules/base"
|
"forgejo.org/modules/base"
|
||||||
debian_module "forgejo.org/modules/packages/debian"
|
debian_module "forgejo.org/modules/packages/debian"
|
||||||
|
"forgejo.org/modules/setting"
|
||||||
"forgejo.org/tests"
|
"forgejo.org/tests"
|
||||||
|
|
||||||
"github.com/blakesmith/ar"
|
"github.com/blakesmith/ar"
|
||||||
|
|
@ -26,6 +28,32 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func createDebianArchive(name, version, architecture, packageDescription string) io.Reader {
|
||||||
|
var cbuf bytes.Buffer
|
||||||
|
zw := gzip.NewWriter(&cbuf)
|
||||||
|
tw := tar.NewWriter(zw)
|
||||||
|
tw.WriteHeader(&tar.Header{
|
||||||
|
Name: "control",
|
||||||
|
Mode: 0o600,
|
||||||
|
Size: 50,
|
||||||
|
})
|
||||||
|
fmt.Fprintf(tw, "Package: %s\nVersion: %s\nArchitecture: %s\nDescription: %s\n", name, version, architecture, packageDescription)
|
||||||
|
tw.Close()
|
||||||
|
zw.Close()
|
||||||
|
|
||||||
|
var buf bytes.Buffer
|
||||||
|
aw := ar.NewWriter(&buf)
|
||||||
|
aw.WriteGlobalHeader()
|
||||||
|
hdr := &ar.Header{
|
||||||
|
Name: "control.tar.gz",
|
||||||
|
Mode: 0o600,
|
||||||
|
Size: int64(cbuf.Len()),
|
||||||
|
}
|
||||||
|
aw.WriteHeader(hdr)
|
||||||
|
aw.Write(cbuf.Bytes())
|
||||||
|
return &buf
|
||||||
|
}
|
||||||
|
|
||||||
func TestPackageDebian(t *testing.T) {
|
func TestPackageDebian(t *testing.T) {
|
||||||
defer tests.PrepareTestEnv(t)()
|
defer tests.PrepareTestEnv(t)()
|
||||||
user := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
|
user := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
|
||||||
|
|
@ -35,32 +63,6 @@ func TestPackageDebian(t *testing.T) {
|
||||||
packageVersion2 := "1.0.4"
|
packageVersion2 := "1.0.4"
|
||||||
packageDescription := "Package Description"
|
packageDescription := "Package Description"
|
||||||
|
|
||||||
createArchive := func(name, version, architecture string) io.Reader {
|
|
||||||
var cbuf bytes.Buffer
|
|
||||||
zw := gzip.NewWriter(&cbuf)
|
|
||||||
tw := tar.NewWriter(zw)
|
|
||||||
tw.WriteHeader(&tar.Header{
|
|
||||||
Name: "control",
|
|
||||||
Mode: 0o600,
|
|
||||||
Size: 50,
|
|
||||||
})
|
|
||||||
fmt.Fprintf(tw, "Package: %s\nVersion: %s\nArchitecture: %s\nDescription: %s\n", name, version, architecture, packageDescription)
|
|
||||||
tw.Close()
|
|
||||||
zw.Close()
|
|
||||||
|
|
||||||
var buf bytes.Buffer
|
|
||||||
aw := ar.NewWriter(&buf)
|
|
||||||
aw.WriteGlobalHeader()
|
|
||||||
hdr := &ar.Header{
|
|
||||||
Name: "control.tar.gz",
|
|
||||||
Mode: 0o600,
|
|
||||||
Size: int64(cbuf.Len()),
|
|
||||||
}
|
|
||||||
aw.WriteHeader(hdr)
|
|
||||||
aw.Write(cbuf.Bytes())
|
|
||||||
return &buf
|
|
||||||
}
|
|
||||||
|
|
||||||
distributions := []string{"test", "gitea"}
|
distributions := []string{"test", "gitea"}
|
||||||
components := []string{"main", "stable"}
|
components := []string{"main", "stable"}
|
||||||
architectures := []string{"all", "amd64"}
|
architectures := []string{"all", "amd64"}
|
||||||
|
|
@ -97,16 +99,16 @@ func TestPackageDebian(t *testing.T) {
|
||||||
AddBasicAuth(user.Name)
|
AddBasicAuth(user.Name)
|
||||||
MakeRequest(t, req, http.StatusBadRequest)
|
MakeRequest(t, req, http.StatusBadRequest)
|
||||||
|
|
||||||
req = NewRequestWithBody(t, "PUT", uploadURL, createArchive("", "", "")).
|
req = NewRequestWithBody(t, "PUT", uploadURL, createDebianArchive("", "", "", packageDescription)).
|
||||||
AddBasicAuth(user.Name)
|
AddBasicAuth(user.Name)
|
||||||
MakeRequest(t, req, http.StatusBadRequest)
|
MakeRequest(t, req, http.StatusBadRequest)
|
||||||
|
|
||||||
req = NewRequestWithBody(t, "PUT", uploadURL, createArchive(packageName, packageVersion, architecture)).
|
req = NewRequestWithBody(t, "PUT", uploadURL, createDebianArchive(packageName, packageVersion, architecture, packageDescription)).
|
||||||
AddBasicAuth(user.Name).
|
AddBasicAuth(user.Name).
|
||||||
SetHeader("content-type", "multipart/form-data")
|
SetHeader("content-type", "multipart/form-data")
|
||||||
MakeRequest(t, req, http.StatusBadRequest)
|
MakeRequest(t, req, http.StatusBadRequest)
|
||||||
|
|
||||||
req = NewRequestWithBody(t, "PUT", uploadURL, createArchive(packageName, packageVersion, architecture)).
|
req = NewRequestWithBody(t, "PUT", uploadURL, createDebianArchive(packageName, packageVersion, architecture, packageDescription)).
|
||||||
AddBasicAuth(user.Name)
|
AddBasicAuth(user.Name)
|
||||||
MakeRequest(t, req, http.StatusCreated)
|
MakeRequest(t, req, http.StatusCreated)
|
||||||
|
|
||||||
|
|
@ -154,7 +156,7 @@ func TestPackageDebian(t *testing.T) {
|
||||||
return seen
|
return seen
|
||||||
})
|
})
|
||||||
|
|
||||||
req = NewRequestWithBody(t, "PUT", uploadURL, createArchive(packageName, packageVersion, architecture)).
|
req = NewRequestWithBody(t, "PUT", uploadURL, createDebianArchive(packageName, packageVersion, architecture, packageDescription)).
|
||||||
AddBasicAuth(user.Name)
|
AddBasicAuth(user.Name)
|
||||||
MakeRequest(t, req, http.StatusConflict)
|
MakeRequest(t, req, http.StatusConflict)
|
||||||
})
|
})
|
||||||
|
|
@ -171,7 +173,7 @@ func TestPackageDebian(t *testing.T) {
|
||||||
t.Run("Packages", func(t *testing.T) {
|
t.Run("Packages", func(t *testing.T) {
|
||||||
defer tests.PrintCurrentTest(t)()
|
defer tests.PrintCurrentTest(t)()
|
||||||
|
|
||||||
req := NewRequestWithBody(t, "PUT", uploadURL, createArchive(packageName, packageVersion2, architecture)).
|
req := NewRequestWithBody(t, "PUT", uploadURL, createDebianArchive(packageName, packageVersion2, architecture, packageDescription)).
|
||||||
AddBasicAuth(user.Name)
|
AddBasicAuth(user.Name)
|
||||||
MakeRequest(t, req, http.StatusCreated)
|
MakeRequest(t, req, http.StatusCreated)
|
||||||
|
|
||||||
|
|
@ -308,3 +310,50 @@ func TestPackageDebian(t *testing.T) {
|
||||||
require.Contains(t, body, fmt.Sprintf("Version: %s", packageVersion2))
|
require.Contains(t, body, fmt.Sprintf("Version: %s", packageVersion2))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPackageDebianConcurrent(t *testing.T) {
|
||||||
|
if setting.Database.Type.IsSQLite3() {
|
||||||
|
// Concurrency test fails on SQLite w/ "database is locked"
|
||||||
|
t.Skip()
|
||||||
|
}
|
||||||
|
|
||||||
|
defer tests.PrepareTestEnv(t)()
|
||||||
|
|
||||||
|
user := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
|
||||||
|
|
||||||
|
distribution := "test"
|
||||||
|
component := "main"
|
||||||
|
architecture := "amd64"
|
||||||
|
packageName := "gitea"
|
||||||
|
packageDescription := "Package Description"
|
||||||
|
|
||||||
|
rootURL := fmt.Sprintf("/api/packages/%s/debian", user.Name)
|
||||||
|
uploadURL := fmt.Sprintf("%s/pool/%s/%s/upload", rootURL, distribution, component)
|
||||||
|
|
||||||
|
t.Run("Concurrent Upload", func(t *testing.T) {
|
||||||
|
defer tests.PrintCurrentTest(t)()
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
packageCount := 10
|
||||||
|
for i := range packageCount {
|
||||||
|
wg.Go(func() {
|
||||||
|
req := NewRequestWithBody(t, "PUT", uploadURL,
|
||||||
|
createDebianArchive(packageName, fmt.Sprintf("1.0.%d", i), architecture, packageDescription)).
|
||||||
|
AddBasicAuth(user.Name)
|
||||||
|
MakeRequest(t, req, http.StatusCreated)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
url := fmt.Sprintf("%s/dists/%s/%s/binary-%s/Packages", rootURL, distribution, component, architecture)
|
||||||
|
|
||||||
|
req := NewRequest(t, "GET", url)
|
||||||
|
resp := MakeRequest(t, req, http.StatusOK)
|
||||||
|
body := resp.Body.String()
|
||||||
|
|
||||||
|
assert.Contains(t, body, fmt.Sprintf("Package: %s\n", packageName))
|
||||||
|
for i := range packageCount {
|
||||||
|
assert.Contains(t, body, fmt.Sprintf("Version: 1.0.%d\n", i))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue