mirror of
https://github.com/versity/versitygw.git
synced 2026-05-13 07:21:28 +00:00
fix: store object multipart upload metadata compressed
Store multipart upload metadata through shared backend helpers so POSIX and Azure use the same encode/decode path. POSIX stores raw gzipped JSON in metadata stores, while Azure stores base64-encoded gzip for string metadata. Retrieval falls back to the legacy raw JSON format for existing objects. Storing the mp metadata compressed in posix will guarantee that for any allowed number of parts, the metadata won't exceed the xattr threshold(64KB).
This commit is contained in:
@@ -505,8 +505,8 @@ func (az *Azure) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.G
|
||||
// For non-multipart objects (no mp-metadata), partNumber=1 returns the
|
||||
// full object with no Content-Range; any other partNumber is out of range.
|
||||
if mpMetaStr, ok := resp.Metadata[string(keyMpMetadata)]; ok && mpMetaStr != nil {
|
||||
var mpMeta backend.MpUploadMetadata
|
||||
if err := json.Unmarshal([]byte(*mpMetaStr), &mpMeta); err != nil {
|
||||
mpMeta, err := backend.UnmarshalMpUploadMetadata([]byte(*mpMetaStr), true)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse object multipart metadata: %w", err)
|
||||
}
|
||||
|
||||
@@ -627,8 +627,8 @@ func (az *Azure) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3
|
||||
// For non-multipart objects (no mp-metadata), partNumber=1 returns the
|
||||
// full object with no Content-Range; any other partNumber is out of range.
|
||||
if mpMetaStr, ok := resp.Metadata[string(keyMpMetadata)]; ok && mpMetaStr != nil {
|
||||
var mpMeta backend.MpUploadMetadata
|
||||
if err := json.Unmarshal([]byte(*mpMetaStr), &mpMeta); err != nil {
|
||||
mpMeta, err := backend.UnmarshalMpUploadMetadata([]byte(*mpMetaStr), true)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse object multipart metadata: %w", err)
|
||||
}
|
||||
|
||||
@@ -1779,8 +1779,8 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete
|
||||
finalProps, propErr := finalClient.GetProperties(ctx, nil)
|
||||
if propErr == nil {
|
||||
if mpMetaStr, ok := finalProps.Metadata[string(keyMpMetadata)]; ok && mpMetaStr != nil {
|
||||
var mpMeta backend.MpUploadMetadata
|
||||
if jsonErr := json.Unmarshal([]byte(*mpMetaStr), &mpMeta); jsonErr == nil && mpMeta.UploadID == *input.UploadId {
|
||||
mpMeta, metaErr := backend.UnmarshalMpUploadMetadata([]byte(*mpMetaStr), true)
|
||||
if metaErr == nil && mpMeta.UploadID == *input.UploadId {
|
||||
return s3response.CompleteMultipartUploadResult{
|
||||
Bucket: input.Bucket,
|
||||
Key: input.Key,
|
||||
@@ -1896,11 +1896,11 @@ func (az *Azure) CompleteMultipartUpload(ctx context.Context, input *s3.Complete
|
||||
|
||||
// Serialize multipart metadata so GetObject/HeadObject can serve by part-number.
|
||||
mpMeta := backend.MpUploadMetadata{UploadID: *input.UploadId, Parts: partSizes}
|
||||
mpMetaJSON, err := json.Marshal(mpMeta)
|
||||
mpMetaBytes, err := backend.MarshalMpUploadMetadata(mpMeta, true)
|
||||
if err != nil {
|
||||
return res, "", fmt.Errorf("marshal mp metadata: %w", err)
|
||||
}
|
||||
mpMetaStr := string(mpMetaJSON)
|
||||
mpMetaStr := string(mpMetaBytes)
|
||||
if props.Metadata == nil {
|
||||
props.Metadata = map[string]*string{}
|
||||
}
|
||||
|
||||
@@ -15,8 +15,12 @@
|
||||
package backend
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"crypto/md5"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash"
|
||||
@@ -416,6 +420,84 @@ type MpUploadMetadata struct {
|
||||
Parts []int64 `json:"parts"`
|
||||
}
|
||||
|
||||
// MarshalMpUploadMetadata returns a compressed representation of multipart
|
||||
// metadata. When base64Encode is true, the compressed bytes are base64-encoded
|
||||
// so they can be sent in azure string-only metadata headers
|
||||
func MarshalMpUploadMetadata(mpMeta MpUploadMetadata, base64Encode bool) ([]byte, error) {
|
||||
mpMetaJSON, err := json.Marshal(mpMeta)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal mp metadata: %w", err)
|
||||
}
|
||||
|
||||
compressed, err := compressMpUploadMetadata(mpMetaJSON)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("compress mp metadata: %w", err)
|
||||
}
|
||||
|
||||
if !base64Encode {
|
||||
return compressed, nil
|
||||
}
|
||||
|
||||
encoded := make([]byte, base64.StdEncoding.EncodedLen(len(compressed)))
|
||||
base64.StdEncoding.Encode(encoded, compressed)
|
||||
return encoded, nil
|
||||
}
|
||||
|
||||
// UnmarshalMpUploadMetadata decodes metadata produced by MarshalMpUploadMetadata.
|
||||
// It also accepts the legacy raw JSON form so existing multipart objects remain readable
|
||||
func UnmarshalMpUploadMetadata(data []byte, base64Decode bool) (MpUploadMetadata, error) {
|
||||
if base64Decode {
|
||||
compressed, err := base64.StdEncoding.DecodeString(string(data))
|
||||
if err == nil {
|
||||
if mpMeta, err := unmarshalCompressedMpUploadMetadata(compressed); err == nil {
|
||||
return mpMeta, nil
|
||||
}
|
||||
}
|
||||
} else if mpMeta, err := unmarshalCompressedMpUploadMetadata(data); err == nil {
|
||||
return mpMeta, nil
|
||||
}
|
||||
|
||||
var mpMeta MpUploadMetadata
|
||||
if err := json.Unmarshal(data, &mpMeta); err != nil {
|
||||
return mpMeta, fmt.Errorf("unmarshal mp metadata: %w", err)
|
||||
}
|
||||
return mpMeta, nil
|
||||
}
|
||||
|
||||
func compressMpUploadMetadata(data []byte) ([]byte, error) {
|
||||
var compressed bytes.Buffer
|
||||
gz := gzip.NewWriter(&compressed)
|
||||
if _, err := gz.Write(data); err != nil {
|
||||
_ = gz.Close()
|
||||
return nil, err
|
||||
}
|
||||
if err := gz.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return compressed.Bytes(), nil
|
||||
}
|
||||
|
||||
func unmarshalCompressedMpUploadMetadata(compressed []byte) (MpUploadMetadata, error) {
|
||||
var mpMeta MpUploadMetadata
|
||||
gz, err := gzip.NewReader(bytes.NewReader(compressed))
|
||||
if err != nil {
|
||||
return mpMeta, fmt.Errorf("decompress mp metadata: %w", err)
|
||||
}
|
||||
decompressed, err := io.ReadAll(gz)
|
||||
closeErr := gz.Close()
|
||||
if err != nil {
|
||||
return mpMeta, fmt.Errorf("decompress mp metadata: %w", err)
|
||||
}
|
||||
if closeErr != nil {
|
||||
return mpMeta, fmt.Errorf("decompress mp metadata: %w", closeErr)
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(decompressed, &mpMeta); err != nil {
|
||||
return mpMeta, fmt.Errorf("unmarshal mp metadata: %w", err)
|
||||
}
|
||||
return mpMeta, nil
|
||||
}
|
||||
|
||||
type FileSectionReadCloser struct {
|
||||
R io.Reader
|
||||
F *os.File
|
||||
|
||||
@@ -15,12 +15,98 @@
|
||||
package backend
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/versity/versitygw/s3err"
|
||||
)
|
||||
|
||||
func TestMpUploadMetadataRawGzipRoundTrip(t *testing.T) {
|
||||
want := MpUploadMetadata{
|
||||
UploadID: "upload-id",
|
||||
Parts: []int64{5, 12, 12},
|
||||
}
|
||||
|
||||
stored, err := MarshalMpUploadMetadata(want, false)
|
||||
if err != nil {
|
||||
t.Fatalf("MarshalMpUploadMetadata: %v", err)
|
||||
}
|
||||
if len(stored) < 2 || stored[0] != 0x1f || stored[1] != 0x8b {
|
||||
t.Fatalf("stored metadata should contain raw gzip payload: %q", stored)
|
||||
}
|
||||
if bytes.HasPrefix(stored, []byte("{")) {
|
||||
t.Fatalf("stored metadata should not be raw JSON: %q", stored)
|
||||
}
|
||||
|
||||
got, err := UnmarshalMpUploadMetadata(stored, false)
|
||||
if err != nil {
|
||||
t.Fatalf("UnmarshalMpUploadMetadata: %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(got, want) {
|
||||
t.Fatalf("metadata mismatch: got %+v want %+v", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMpUploadMetadataBase64RoundTrip(t *testing.T) {
|
||||
want := MpUploadMetadata{
|
||||
UploadID: "azure-upload-id",
|
||||
Parts: []int64{10, 20, 35},
|
||||
}
|
||||
|
||||
stored, err := MarshalMpUploadMetadata(want, true)
|
||||
if err != nil {
|
||||
t.Fatalf("MarshalMpUploadMetadata: %v", err)
|
||||
}
|
||||
if len(stored) >= 2 && stored[0] == 0x1f && stored[1] == 0x8b {
|
||||
t.Fatalf("stored metadata should not contain raw gzip bytes: %q", stored)
|
||||
}
|
||||
|
||||
got, err := UnmarshalMpUploadMetadata(stored, true)
|
||||
if err != nil {
|
||||
t.Fatalf("UnmarshalMpUploadMetadata: %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(got, want) {
|
||||
t.Fatalf("metadata mismatch: got %+v want %+v", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnmarshalMpUploadMetadataLegacyJSON(t *testing.T) {
|
||||
want := MpUploadMetadata{
|
||||
UploadID: "legacy-upload-id",
|
||||
Parts: []int64{1, 3, 6},
|
||||
}
|
||||
|
||||
stored, err := json.Marshal(want)
|
||||
if err != nil {
|
||||
t.Fatalf("json.Marshal: %v", err)
|
||||
}
|
||||
|
||||
got, err := UnmarshalMpUploadMetadata(stored, false)
|
||||
if err != nil {
|
||||
t.Fatalf("UnmarshalMpUploadMetadata: %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(got, want) {
|
||||
t.Fatalf("metadata mismatch: got %+v want %+v", got, want)
|
||||
}
|
||||
got, err = UnmarshalMpUploadMetadata(stored, true)
|
||||
if err != nil {
|
||||
t.Fatalf("UnmarshalMpUploadMetadata: %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(got, want) {
|
||||
t.Fatalf("metadata mismatch: got %+v want %+v", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnmarshalMpUploadMetadataInvalid(t *testing.T) {
|
||||
_, err := UnmarshalMpUploadMetadata([]byte("not-gzip-or-json"), false)
|
||||
if err == nil {
|
||||
t.Fatal("expected invalid metadata error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseCopySource(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
||||
@@ -1828,8 +1828,8 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C
|
||||
// multipart upload may have been finalized and the final object has been created
|
||||
// before or by the racing request
|
||||
if mpMetaBytes, statErr := p.meta.RetrieveAttribute(nil, bucket, object, mpMetaKey); statErr == nil {
|
||||
var mpMeta backend.MpUploadMetadata
|
||||
if err := json.Unmarshal(mpMetaBytes, &mpMeta); err != nil {
|
||||
mpMeta, err := backend.UnmarshalMpUploadMetadata(mpMetaBytes, false)
|
||||
if err != nil {
|
||||
return res, "", fmt.Errorf("parse object multipart metadata: %w", err)
|
||||
}
|
||||
|
||||
@@ -2245,12 +2245,12 @@ func (p *Posix) CompleteMultipartUploadWithCopy(ctx context.Context, input *s3.C
|
||||
// Store multipart upload metadata on the final object so that GetObject /
|
||||
// HeadObject can serve individual parts by part-number.
|
||||
mpMeta := backend.MpUploadMetadata{UploadID: uploadID, Parts: partSizes}
|
||||
mpMetaJSON, err := json.Marshal(mpMeta)
|
||||
mpMetaBytes, err := backend.MarshalMpUploadMetadata(mpMeta, false)
|
||||
if err != nil {
|
||||
return res, "", fmt.Errorf("marshal object multipart metadata: %w", err)
|
||||
}
|
||||
|
||||
err = p.meta.StoreAttribute(f.File(), bucket, object, mpMetaKey, mpMetaJSON)
|
||||
err = p.meta.StoreAttribute(f.File(), bucket, object, mpMetaKey, mpMetaBytes)
|
||||
if err != nil {
|
||||
return res, "", fmt.Errorf("set object multipart metadata: %w", err)
|
||||
}
|
||||
@@ -4641,8 +4641,8 @@ func (p *Posix) GetObject(ctx context.Context, input *s3.GetObjectInput) (*s3.Ge
|
||||
if input.PartNumber != nil {
|
||||
mpMetaBytes, metaErr := p.meta.RetrieveAttribute(nil, bucket, object, mpMetaKey)
|
||||
if metaErr == nil {
|
||||
var mpMeta backend.MpUploadMetadata
|
||||
if err := json.Unmarshal(mpMetaBytes, &mpMeta); err != nil {
|
||||
mpMeta, err := backend.UnmarshalMpUploadMetadata(mpMetaBytes, false)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse object multipart metadata: %w", err)
|
||||
}
|
||||
|
||||
@@ -4885,8 +4885,8 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3.
|
||||
if input.PartNumber != nil {
|
||||
mpMetaBytes, metaErr := p.meta.RetrieveAttribute(nil, bucket, object, mpMetaKey)
|
||||
if metaErr == nil {
|
||||
var mpMeta backend.MpUploadMetadata
|
||||
if err := json.Unmarshal(mpMetaBytes, &mpMeta); err != nil {
|
||||
mpMeta, err := backend.UnmarshalMpUploadMetadata(mpMetaBytes, false)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse object multipart metadata: %w", err)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user