mirror of
https://github.com/versity/versitygw.git
synced 2026-04-11 00:26:58 +00:00
fix: enforce 5 GiB copy source object size threshold.
Fixes #1896 Enforces the S3 `5 GiB` copy source size limit across the posix and azure backends for `CopyObject` and `UploadPartCopy`, returning `InvalidRequest` when the source object exceeds the threshold. The limit is now configurable via `--copy-object-threshold` (`VGW_COPY_OBJECT_THRESHOLD`, default 5 GiB). A new `--mp-max-parts` flag (`VGW_MP_MAX_PARTS`, default `10000`) has been added to make the multipart upload part count limit configurable. No integration test has been added, as GitHub Actions cannot reliably handle large objects.
This commit is contained in:
@@ -95,16 +95,17 @@ func (key) Table() map[string]struct{} {
|
||||
type Azure struct {
|
||||
backend.BackendUnsupported
|
||||
|
||||
client *azblob.Client
|
||||
sharedkeyCreds *azblob.SharedKeyCredential
|
||||
defaultCreds *azidentity.DefaultAzureCredential
|
||||
serviceURL string
|
||||
sasToken string
|
||||
client *azblob.Client
|
||||
sharedkeyCreds *azblob.SharedKeyCredential
|
||||
defaultCreds *azidentity.DefaultAzureCredential
|
||||
serviceURL string
|
||||
sasToken string
|
||||
copyObjectThreshold int64
|
||||
}
|
||||
|
||||
var _ backend.Backend = &Azure{}
|
||||
|
||||
func New(accountName, accountKey, serviceURL, sasToken string) (*Azure, error) {
|
||||
func New(accountName, accountKey, serviceURL, sasToken string, copyObjectThreshold int64) (*Azure, error) {
|
||||
url := serviceURL
|
||||
if serviceURL == "" && accountName != "" {
|
||||
// if not otherwise specified, use the typical form:
|
||||
@@ -117,7 +118,12 @@ func New(accountName, accountKey, serviceURL, sasToken string) (*Azure, error) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("init client: %w", err)
|
||||
}
|
||||
return &Azure{client: client, serviceURL: serviceURL, sasToken: sasToken}, nil
|
||||
return &Azure{
|
||||
client: client,
|
||||
serviceURL: serviceURL,
|
||||
sasToken: sasToken,
|
||||
copyObjectThreshold: copyObjectThreshold,
|
||||
}, nil
|
||||
}
|
||||
|
||||
if accountName == "" {
|
||||
@@ -134,7 +140,12 @@ func New(accountName, accountKey, serviceURL, sasToken string) (*Azure, error) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("init client: %w", err)
|
||||
}
|
||||
return &Azure{client: client, serviceURL: url, defaultCreds: cred}, nil
|
||||
return &Azure{
|
||||
client: client,
|
||||
serviceURL: url,
|
||||
defaultCreds: cred,
|
||||
copyObjectThreshold: copyObjectThreshold,
|
||||
}, nil
|
||||
}
|
||||
|
||||
cred, err := azblob.NewSharedKeyCredential(accountName, accountKey)
|
||||
@@ -147,7 +158,12 @@ func New(accountName, accountKey, serviceURL, sasToken string) (*Azure, error) {
|
||||
return nil, fmt.Errorf("init client: %w", err)
|
||||
}
|
||||
|
||||
return &Azure{client: client, serviceURL: url, sharedkeyCreds: cred}, nil
|
||||
return &Azure{
|
||||
client: client,
|
||||
serviceURL: url,
|
||||
sharedkeyCreds: cred,
|
||||
copyObjectThreshold: copyObjectThreshold,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (az *Azure) Shutdown() {}
|
||||
@@ -959,6 +975,15 @@ func (az *Azure) CopyObject(ctx context.Context, input s3response.CopyObjectInpu
|
||||
return s3response.CopyObjectOutput{}, s3err.GetAPIError(s3err.ErrInvalidCopyDest)
|
||||
}
|
||||
|
||||
resp, err := dstClient.GetProperties(ctx, nil)
|
||||
if err != nil {
|
||||
return s3response.CopyObjectOutput{}, azureErrToS3Err(err)
|
||||
}
|
||||
|
||||
if resp.ContentLength != nil && *resp.ContentLength > az.copyObjectThreshold {
|
||||
return s3response.CopyObjectOutput{}, s3err.GetCopySourceObjectTooLargeErr(az.copyObjectThreshold)
|
||||
}
|
||||
|
||||
// Set object meta http headers
|
||||
res, err := dstClient.SetHTTPHeaders(ctx, blob.HTTPHeaders{
|
||||
BlobCacheControl: input.CacheControl,
|
||||
@@ -1045,6 +1070,10 @@ func (az *Azure) CopyObject(ctx context.Context, input s3response.CopyObjectInpu
|
||||
}
|
||||
defer downloadResp.Body.Close()
|
||||
|
||||
if downloadResp.ContentLength != nil && *downloadResp.ContentLength > az.copyObjectThreshold {
|
||||
return s3response.CopyObjectOutput{}, s3err.GetCopySourceObjectTooLargeErr(az.copyObjectThreshold)
|
||||
}
|
||||
|
||||
pInput := s3response.PutObjectInput{
|
||||
Body: downloadResp.Body,
|
||||
Bucket: input.Bucket,
|
||||
|
||||
@@ -98,6 +98,12 @@ type Posix struct {
|
||||
// execute blocking syscalls (stat, readdir, xattr, open, etc.), this limiter
|
||||
// constrains parallelism to prevent excessive thread creation under load.
|
||||
actionLimiter *semaphore.Weighted
|
||||
|
||||
// copyObjectThreshold is the maximum allowed size (in bytes) for a copy
|
||||
// source object. Requests to copy objects larger than this value are
|
||||
// rejected with an 'InvalidRequest' to comply with the S3 limit
|
||||
// of 5 GiB.
|
||||
copyObjectThreshold int64
|
||||
}
|
||||
|
||||
var _ backend.Backend = &Posix{}
|
||||
@@ -172,6 +178,11 @@ type PosixOpts struct {
|
||||
// queue depth grows under sustained load, request latency increases and
|
||||
// upstream timeouts may occur.
|
||||
Concurrency int
|
||||
// CopyObjectThreshold sets the maximum allowed source object size (in bytes)
|
||||
// for CopyObject and UploadPartCopy operations. Requests exceeding this
|
||||
// threshold are rejected with an 'InvalidRequest' error. Defaults to the
|
||||
// S3 specification limit of 5 GiB.
|
||||
CopyObjectThreshold int64
|
||||
}
|
||||
|
||||
func New(rootdir string, meta meta.MetadataStorer, opts PosixOpts) (*Posix, error) {
|
||||
@@ -235,6 +246,7 @@ func New(rootdir string, meta meta.MetadataStorer, opts PosixOpts) (*Posix, erro
|
||||
forceNoCopyFileRange: opts.ForceNoCopyFileRange,
|
||||
validateBucketName: opts.ValidateBucketNames,
|
||||
actionLimiter: semaphore.NewWeighted(int64(concurrencyOrDefault(opts.Concurrency))),
|
||||
copyObjectThreshold: opts.CopyObjectThreshold,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -3084,6 +3096,9 @@ func (p *Posix) UploadPartCopy(ctx context.Context, upi *s3.UploadPartCopyInput)
|
||||
if err != nil {
|
||||
return s3response.CopyPartResult{}, err
|
||||
}
|
||||
if length > p.copyObjectThreshold {
|
||||
return s3response.CopyPartResult{}, s3err.GetCopySourceObjectTooLargeErr(p.copyObjectThreshold)
|
||||
}
|
||||
|
||||
srcf, err := os.Open(objPath)
|
||||
if errors.Is(err, fs.ErrNotExist) {
|
||||
@@ -4706,6 +4721,9 @@ func (p *Posix) CopyObject(ctx context.Context, input s3response.CopyObjectInput
|
||||
if !strings.HasSuffix(srcObject, "/") && fi.IsDir() {
|
||||
return s3response.CopyObjectOutput{}, s3err.GetAPIError(s3err.ErrNoSuchKey)
|
||||
}
|
||||
if fi.Size() > p.copyObjectThreshold {
|
||||
return s3response.CopyObjectOutput{}, s3err.GetCopySourceObjectTooLargeErr(p.copyObjectThreshold)
|
||||
}
|
||||
|
||||
b, err := p.meta.RetrieveAttribute(f, srcBucket, srcObject, etagkey)
|
||||
srcEtag := string(b)
|
||||
|
||||
@@ -45,6 +45,11 @@ type ScoutfsOpts struct {
|
||||
// Concurrency sets the maximum number of concurrently running POSIX actions.
|
||||
// Defaults to 5000 when unset or non-positive.
|
||||
Concurrency int
|
||||
// CopyObjectThreshold sets the maximum allowed source object size (in bytes)
|
||||
// for CopyObject and UploadPartCopy operations. Requests exceeding this
|
||||
// threshold are rejected with an 'InvalidRequest' error. Defaults to the
|
||||
// S3 specification limit of 5 GiB.
|
||||
CopyObjectThreshold int64
|
||||
}
|
||||
|
||||
var _ backend.Backend = &ScoutFS{}
|
||||
|
||||
@@ -82,6 +82,7 @@ func New(rootdir string, opts ScoutfsOpts) (*ScoutFS, error) {
|
||||
VersioningDir: opts.VersioningDir,
|
||||
ValidateBucketNames: opts.ValidateBucketNames,
|
||||
Concurrency: opts.Concurrency,
|
||||
CopyObjectThreshold: opts.CopyObjectThreshold,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -65,7 +65,7 @@ func azureCommand() *cli.Command {
|
||||
}
|
||||
|
||||
func runAzure(ctx *cli.Context) error {
|
||||
be, err := azure.New(azAccount, azKey, azServiceURL, azSASToken)
|
||||
be, err := azure.New(azAccount, azKey, azServiceURL, azSASToken, copyObjectThreshold)
|
||||
if err != nil {
|
||||
return fmt.Errorf("init azure: %w", err)
|
||||
}
|
||||
|
||||
@@ -34,6 +34,8 @@ func initEnv(dir string) {
|
||||
maxConnections = 250000
|
||||
maxRequests = 100000
|
||||
ports = []string{"127.0.0.1:7070"}
|
||||
mpMaxParts = 10000
|
||||
copyObjectThreshold = 5 * 1024 * 1024 * 1024
|
||||
|
||||
// client
|
||||
awsID = "user"
|
||||
|
||||
@@ -103,6 +103,8 @@ var (
|
||||
webuiPathPrefix string
|
||||
webuiS3Prefix string
|
||||
disableACLs bool
|
||||
mpMaxParts int
|
||||
copyObjectThreshold int64
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -755,6 +757,20 @@ func initFlags() []cli.Flag {
|
||||
EnvVars: []string{"VGW_IPA_INSECURE"},
|
||||
Destination: &ipaInsecure,
|
||||
},
|
||||
&cli.IntFlag{
|
||||
Name: "mp-max-parts",
|
||||
Usage: "maximum number of parts allowed in a multipart upload",
|
||||
EnvVars: []string{"VGW_MP_MAX_PARTS"},
|
||||
Value: 10000,
|
||||
Destination: &mpMaxParts,
|
||||
},
|
||||
&cli.Int64Flag{
|
||||
Name: "copy-object-threshold",
|
||||
Usage: "maximum allowed source object size in bytes for CopyObject; objects larger than this are rejected",
|
||||
EnvVars: []string{"VGW_COPY_OBJECT_THRESHOLD"},
|
||||
Value: 5 * 1024 * 1024 * 1024,
|
||||
Destination: ©ObjectThreshold,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -778,6 +794,12 @@ func runGateway(ctx context.Context, be backend.Backend) error {
|
||||
log.Printf("WARNING: max-requests (%d) exceeds max-connections (%d) which could allow for gateway to panic before throttling requests",
|
||||
maxRequests, maxConnections)
|
||||
}
|
||||
if mpMaxParts < 1 {
|
||||
return fmt.Errorf("mp-max-parts must be positive")
|
||||
}
|
||||
if copyObjectThreshold < 1 {
|
||||
return fmt.Errorf("copy-object-threshold must be positive")
|
||||
}
|
||||
|
||||
// Ensure we have at least one port specified
|
||||
if len(ports) == 0 {
|
||||
@@ -842,6 +864,7 @@ func runGateway(ctx context.Context, be backend.Backend) error {
|
||||
|
||||
opts := []s3api.Option{
|
||||
s3api.WithConcurrencyLimiter(maxConnections, maxRequests),
|
||||
s3api.WithMpMaxParts(mpMaxParts),
|
||||
}
|
||||
if corsAllowOrigin != "" {
|
||||
opts = append(opts, s3api.WithCORSAllowOrigin(corsAllowOrigin))
|
||||
|
||||
@@ -148,6 +148,7 @@ func runPosix(ctx *cli.Context) error {
|
||||
ForceNoCopyFileRange: forceNoCopyFileRange,
|
||||
ValidateBucketNames: disableStrictBucketNames,
|
||||
Concurrency: actionsConcurrency,
|
||||
CopyObjectThreshold: copyObjectThreshold,
|
||||
}
|
||||
|
||||
var ms meta.MetadataStorer
|
||||
|
||||
@@ -134,6 +134,7 @@ func runScoutfs(ctx *cli.Context) error {
|
||||
opts.ValidateBucketNames = disableStrictBucketNames
|
||||
opts.SetProjectID = setProjectID
|
||||
opts.Concurrency = actionsConcurrency
|
||||
opts.CopyObjectThreshold = copyObjectThreshold
|
||||
|
||||
be, err := scoutfs.New(ctx.Args().Get(0), opts)
|
||||
if err != nil {
|
||||
|
||||
@@ -147,6 +147,18 @@ ROOT_SECRET_ACCESS_KEY=
|
||||
# strict validation checks.
|
||||
#VGW_DISABLE_STRICT_BUCKET_NAMES=false
|
||||
|
||||
# The VGW_MP_MAX_PARTS option sets the maximum number of parts allowed in a
|
||||
# single multipart upload. The S3 specification allows up to 10,000 parts per
|
||||
# multipart upload. The default value of 10000 matches the AWS S3 maximum.
|
||||
# Clients that attempt to upload more parts than this limit will receive an error.
|
||||
#VGW_MP_MAX_PARTS=10000
|
||||
|
||||
# The VGW_COPY_OBJECT_THRESHOLD option sets the maximum allowed source object
|
||||
# size in bytes for CopyObject and UploadPartCopy. Requests whose source
|
||||
# object exceeds this threshold will be rejected with an error. The default
|
||||
# value of 5368709120 (5 GiB) matches the AWS S3 'Copy' size limit.
|
||||
#VGW_COPY_OBJECT_THRESHOLD=5368709120
|
||||
|
||||
###############
|
||||
# Access Logs #
|
||||
###############
|
||||
|
||||
@@ -681,7 +681,7 @@ func TestAdminController_CreateBucket(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
s3api := New(be, iam, nil, nil, nil, false, false, "")
|
||||
s3api := New(be, iam, nil, nil, nil, false, false, "", 10000)
|
||||
|
||||
ctrl := AdminController{
|
||||
iam: iam,
|
||||
|
||||
@@ -38,6 +38,7 @@ type S3ApiController struct {
|
||||
logger s3log.AuditLogger
|
||||
evSender s3event.S3EventSender
|
||||
mm metrics.Manager
|
||||
mpMaxParts int
|
||||
readonly bool
|
||||
disableACL bool
|
||||
virtualDomain string
|
||||
@@ -60,7 +61,7 @@ var (
|
||||
xmlhdr = []byte(`<?xml version="1.0" encoding="UTF-8"?>` + "\n")
|
||||
)
|
||||
|
||||
func New(be backend.Backend, iam auth.IAMService, logger s3log.AuditLogger, evs s3event.S3EventSender, mm metrics.Manager, readonly, disableACL bool, virtualDomain string) S3ApiController {
|
||||
func New(be backend.Backend, iam auth.IAMService, logger s3log.AuditLogger, evs s3event.S3EventSender, mm metrics.Manager, readonly, disableACL bool, virtualDomain string, mpMaxParts int) S3ApiController {
|
||||
return S3ApiController{
|
||||
be: be,
|
||||
iam: iam,
|
||||
@@ -70,6 +71,7 @@ func New(be backend.Backend, iam auth.IAMService, logger s3log.AuditLogger, evs
|
||||
mm: mm,
|
||||
disableACL: disableACL,
|
||||
virtualDomain: virtualDomain,
|
||||
mpMaxParts: mpMaxParts,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -282,7 +282,7 @@ func (c S3ApiController) UploadPart(ctx *fiber.Ctx) (*Response, error) {
|
||||
}, err
|
||||
}
|
||||
|
||||
if partNumber < minPartNumber || partNumber > maxPartNumber {
|
||||
if partNumber < minPartNumber || partNumber > int32(c.mpMaxParts) {
|
||||
debuglogger.Logf("invalid part number: %d", partNumber)
|
||||
return &Response{
|
||||
MetaOpts: &MetaOptions{
|
||||
|
||||
@@ -607,7 +607,8 @@ func TestS3ApiController_UploadPart(t *testing.T) {
|
||||
}
|
||||
|
||||
ctrl := S3ApiController{
|
||||
be: be,
|
||||
be: be,
|
||||
mpMaxParts: 10000,
|
||||
}
|
||||
|
||||
testController(
|
||||
|
||||
@@ -42,10 +42,11 @@ type S3ApiRouter struct {
|
||||
region string
|
||||
virtualDomain string
|
||||
corsAllowOrigin string
|
||||
mpMaxParts int
|
||||
}
|
||||
|
||||
func (sa *S3ApiRouter) Init() {
|
||||
ctrl := controllers.New(sa.be, sa.iam, sa.logger, sa.evs, sa.mm, sa.readonly, sa.disableACL, sa.virtualDomain)
|
||||
ctrl := controllers.New(sa.be, sa.iam, sa.logger, sa.evs, sa.mm, sa.readonly, sa.disableACL, sa.virtualDomain, sa.mpMaxParts)
|
||||
sa.Ctrl = ctrl
|
||||
adminServices := &controllers.Services{
|
||||
Logger: sa.aLogger,
|
||||
|
||||
@@ -175,6 +175,11 @@ func WithReadOnly() Option {
|
||||
return func(s *S3ApiServer) { s.Router.readonly = true }
|
||||
}
|
||||
|
||||
// WithMpMaxParts sets the maximum number of parts allowed in a multipart upload.
|
||||
func WithMpMaxParts(n int) Option {
|
||||
return func(s *S3ApiServer) { s.Router.mpMaxParts = n }
|
||||
}
|
||||
|
||||
// WithHostStyle enabled host-style bucket addressing on the server
|
||||
func WithHostStyle(virtualDomain string) Option {
|
||||
return func(s *S3ApiServer) {
|
||||
|
||||
@@ -1088,3 +1088,11 @@ func GetNegativeMaxLimiterErr(limiter string) APIError {
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
}
|
||||
}
|
||||
|
||||
func GetCopySourceObjectTooLargeErr(limit int64) APIError {
|
||||
return APIError{
|
||||
Code: "InvalidRequest",
|
||||
Description: fmt.Sprintf("The specified copy source is larger than the maximum allowable size for a copy source: %d", limit),
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user