diff --git a/test/s3/sse/s3_sse_concurrent_repro_test.go b/test/s3/sse/s3_sse_concurrent_repro_test.go new file mode 100644 index 000000000..da84f6cd6 --- /dev/null +++ b/test/s3/sse/s3_sse_concurrent_repro_test.go @@ -0,0 +1,310 @@ +package sse_test + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "sync" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/stretchr/testify/require" +) + +// TestSSES3ConcurrentMultipartDigestIntegration mirrors the production +// scenario reported in issue #8908: Docker Registry pushes several large +// container images in parallel, each push being itself a multipart upload of +// many ~5MB parts under bucket-default SSE-S3. Registry pulls fail with +// "Digest did not match" because the SHA-256 of the streamed GET body does +// not match the SHA-256 of what was uploaded. +// +// Coverage that the existing TestSSEMultipartManyChunksIntegration does NOT +// have: +// - bucket-default SSE-S3 (not explicit per-request headers) +// - multiple objects uploaded concurrently +// - parts within each object uploaded concurrently +// - both full-body GET (Docker Registry) and chunked range GETs (kubelet/CRI) +// +// If the SHA over the streamed body ever differs from the SHA over the +// uploaded bytes, the test fails with the exact bucket/key, expected SHA, +// and actual SHA — the same shape Docker Registry surfaces on pull. +// +// The function name ends in "Integration" so it is matched by the existing +// `TestSSE.*Integration` pattern in test/s3/sse/Makefile and the +// `.*Multipart.*Integration` pattern in .github/workflows/s3-sse-tests.yml, +// so the regression coverage runs automatically in CI. 
+func TestSSES3ConcurrentMultipartDigestIntegration(t *testing.T) { + ctx := context.Background() + client, err := createS3Client(ctx, defaultConfig) + require.NoError(t, err, "Failed to create S3 client") + + bucketName, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"sse-s3-concurrent-") + require.NoError(t, err, "Failed to create test bucket") + defer cleanupTestBucket(ctx, client, bucketName) + + _, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{ + Bucket: aws.String(bucketName), + ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{ + Rules: []types.ServerSideEncryptionRule{ + { + ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{ + SSEAlgorithm: types.ServerSideEncryptionAes256, + }, + }, + }, + }, + }) + require.NoError(t, err, "Failed to set bucket default SSE-S3 encryption") + + const ( + numObjects = 5 // mirrors "5 images at a time" + partsPerObject = 12 // 12 parts × 5MB = 60MB per blob; total 5×60=300MB + partSize = 5 * 1024 * 1024 // S3 minimum part size + uploadParallel = 8 // mirror typical S3 SDK transfer manager concurrency + iterations = 2 // run twice to flush out flakiness without bloating CI + ) + + type blob struct { + key string + parts [][]byte + full []byte + fullSHA [32]byte + } + + makeBlobs := func(iter int) []blob { + blobs := make([]blob, numObjects) + for i := range blobs { + parts := make([][]byte, partsPerObject) + for j := range parts { + parts[j] = generateTestData(partSize) + } + full := bytes.Join(parts, nil) + blobs[i] = blob{ + key: fmt.Sprintf("iter%02d-blob-%02d", iter, i), + parts: parts, + full: full, + fullSHA: sha256.Sum256(full), + } + } + return blobs + } + + uploadOne := func(b blob) error { + createResp, err := client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{ + Bucket: aws.String(bucketName), + Key: aws.String(b.key), + }) + if err != nil { + return fmt.Errorf("create multipart for %s: %w", b.key, err) + } + uploadID 
:= aws.ToString(createResp.UploadId) + + completedParts := make([]types.CompletedPart, partsPerObject) + var ( + wg sync.WaitGroup + partErr error + partMu sync.Mutex + ) + sem := make(chan struct{}, uploadParallel) + for i := 0; i < partsPerObject; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + sem <- struct{}{} + defer func() { <-sem }() + + resp, err := client.UploadPart(ctx, &s3.UploadPartInput{ + Bucket: aws.String(bucketName), + Key: aws.String(b.key), + PartNumber: aws.Int32(int32(i + 1)), + UploadId: aws.String(uploadID), + Body: bytes.NewReader(b.parts[i]), + }) + if err != nil { + partMu.Lock() + if partErr == nil { + partErr = fmt.Errorf("upload part %d of %s: %w", i+1, b.key, err) + } + partMu.Unlock() + return + } + completedParts[i] = types.CompletedPart{ + ETag: resp.ETag, + PartNumber: aws.Int32(int32(i + 1)), + } + }(i) + } + wg.Wait() + if partErr != nil { + _, _ = client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ + Bucket: aws.String(bucketName), Key: aws.String(b.key), UploadId: aws.String(uploadID), + }) + return partErr + } + + _, err = client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + Bucket: aws.String(bucketName), + Key: aws.String(b.key), + UploadId: aws.String(uploadID), + MultipartUpload: &types.CompletedMultipartUpload{Parts: completedParts}, + }) + if err != nil { + return fmt.Errorf("complete multipart for %s: %w", b.key, err) + } + return nil + } + + type result struct { + key string + expected string + actual string + gotSize int64 + wantSize int64 + readErr error + } + + verifyFullGET := func(b blob) result { + resp, err := client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(b.key), + }) + if err != nil { + return result{key: "FULL " + b.key, readErr: fmt.Errorf("GetObject: %w", err)} + } + defer resp.Body.Close() + h := sha256.New() + n, copyErr := io.Copy(h, resp.Body) + var actual [32]byte + copy(actual[:], h.Sum(nil)) + return result{ + key: 
"FULL " + b.key, + expected: hex.EncodeToString(b.fullSHA[:]), + actual: hex.EncodeToString(actual[:]), + gotSize: n, + wantSize: int64(len(b.full)), + readErr: copyErr, + } + } + + verifyRangeGET := func(b blob) result { + // 1MB windows. Includes a window that crosses the 8MB internal-chunk + // boundary (chunkSize in putToFiler) — the historical danger zone for + // SSE-S3 multipart range reads. + const window = 1024 * 1024 + h := sha256.New() + var totalN int64 + objectSize := int64(len(b.full)) + for offset := int64(0); offset < objectSize; offset += window { + end := offset + window - 1 + if end >= objectSize { + end = objectSize - 1 + } + resp, err := client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(b.key), + Range: aws.String(fmt.Sprintf("bytes=%d-%d", offset, end)), + }) + if err != nil { + return result{key: "RANGE " + b.key, readErr: fmt.Errorf("Range GetObject [%d-%d]: %w", offset, end, err)} + } + n, copyErr := io.Copy(h, resp.Body) + resp.Body.Close() + totalN += n + if copyErr != nil { + return result{key: "RANGE " + b.key, readErr: fmt.Errorf("Range copy [%d-%d]: %w", offset, end, copyErr)} + } + } + var actual [32]byte + copy(actual[:], h.Sum(nil)) + return result{ + key: "RANGE " + b.key, + expected: hex.EncodeToString(b.fullSHA[:]), + actual: hex.EncodeToString(actual[:]), + gotSize: totalN, + wantSize: objectSize, + } + } + + var allFailures []string + for iter := 0; iter < iterations; iter++ { + blobs := makeBlobs(iter) + + // Upload all N blobs in parallel. + { + var ( + wg sync.WaitGroup + errMu sync.Mutex + errors []error + ) + for i := range blobs { + wg.Add(1) + go func(i int) { + defer wg.Done() + if err := uploadOne(blobs[i]); err != nil { + errMu.Lock() + errors = append(errors, err) + errMu.Unlock() + } + }(i) + } + wg.Wait() + require.Empty(t, errors, "iter %d: concurrent multipart uploads must succeed", iter) + } + + // Two passes (full body + range), all blobs verified concurrently. 
+ results := make([]result, 0, 2*numObjects) + var resultMu sync.Mutex + var wg sync.WaitGroup + for i := range blobs { + wg.Add(2) + go func(b blob) { + defer wg.Done() + r := verifyFullGET(b) + resultMu.Lock() + results = append(results, r) + resultMu.Unlock() + }(blobs[i]) + go func(b blob) { + defer wg.Done() + r := verifyRangeGET(b) + resultMu.Lock() + results = append(results, r) + resultMu.Unlock() + }(blobs[i]) + } + wg.Wait() + + for _, r := range results { + if r.readErr != nil { + allFailures = append(allFailures, fmt.Sprintf("[iter %d %s] read error: %v (got %d / want %d bytes)", + iter, r.key, r.readErr, r.gotSize, r.wantSize)) + continue + } + if r.expected != r.actual { + allFailures = append(allFailures, fmt.Sprintf( + "[iter %d %s] DIGEST MISMATCH: expected sha256:%s, got sha256:%s (got %d / want %d bytes)", + iter, r.key, r.expected, r.actual, r.gotSize, r.wantSize)) + } + } + } + if len(allFailures) > 0 { + t.Fatalf("%d concurrent SSE-S3 multipart blob digest mismatches across %d iterations:\n %s", + len(allFailures), iterations, joinFailures(allFailures)) + } +} + +func joinFailures(ss []string) string { + out := "" + for i, s := range ss { + if i > 0 { + out += "\n " + } + out += s + } + return out +} diff --git a/test/s3/sse/s3_sse_uploadpartcopy_integration_test.go b/test/s3/sse/s3_sse_uploadpartcopy_integration_test.go new file mode 100644 index 000000000..8fbfa240d --- /dev/null +++ b/test/s3/sse/s3_sse_uploadpartcopy_integration_test.go @@ -0,0 +1,190 @@ +package sse_test + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "strings" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/stretchr/testify/require" +) + +// TestSSES3MultipartUploadPartCopyIntegration pins the fix for issue #8908. 
+// +// Docker Registry's S3 storage driver finalizes blob uploads via a server-side +// "Move" pattern: a streaming PUT/multipart upload to a temporary key, then +// CreateMultipartUpload + UploadPartCopy(s) + CompleteMultipartUpload to put +// the bytes at the final blob path. Under bucket-default SSE-S3, every push +// goes through this UploadPartCopy step. +// +// Before the fix, copyChunksForRange did a raw byte copy that left the +// destination's part chunks SseType=NONE. Then completedMultipartChunk +// (PR #9224) saw NONE chunks in an SSE-S3 multipart upload and "backfilled" +// SSE-S3 metadata with IVs derived from the destination upload's baseIV. But +// the bytes on disk had been encrypted with the SOURCE upload's key+baseIV, +// so the read path decrypted with the wrong IV — yielding deterministic byte +// corruption on GET (the "Digest did not match" symptom kubelet surfaces). +// +// This test reproduces the exact shape: a 39MB plaintext source (single +// PutObject — auto-chunked into multiple internal SSE-S3 chunks on disk +// because of bucket-default SSE-S3), then a fresh multipart upload at a new +// destination key with two UploadPartCopy parts (32MB + 7MB) and Complete. +// The full GET must SHA back to what was uploaded. +// +// The function name contains both "Multipart" and "Integration" so it is matched +// by the `.*Multipart.*Integration` pattern in .github/workflows/s3-sse-tests.yml +// and the `TestSSE.*Integration` pattern in test/s3/sse/Makefile, ensuring this +// regression coverage runs in CI. 
+func TestSSES3MultipartUploadPartCopyIntegration(t *testing.T) { + ctx := context.Background() + client, err := createS3Client(ctx, defaultConfig) + require.NoError(t, err, "Failed to create S3 client") + + bucketName, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"sse-s3-uploadpartcopy-") + require.NoError(t, err, "Failed to create test bucket") + defer cleanupTestBucket(ctx, client, bucketName) + + // Bucket-default SSE-S3 — same setup Docker Registry uses. + _, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{ + Bucket: aws.String(bucketName), + ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{ + Rules: []types.ServerSideEncryptionRule{ + { + ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{ + SSEAlgorithm: types.ServerSideEncryptionAes256, + }, + }, + }, + }, + }) + require.NoError(t, err, "Failed to set bucket default SSE-S3") + + // Source: 39MB single PutObject (auto-chunked internally into 5 SSE-S3 chunks). + const sourceSize = 39 * 1024 * 1024 + sourceData := generateTestData(sourceSize) + expectedSHA := sha256.Sum256(sourceData) + + _, err = client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String("source-blob"), + Body: bytes.NewReader(sourceData), + }) + require.NoError(t, err, "Failed to upload source object") + + // Sanity check: the source itself must round-trip correctly. + { + resp, err := client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String("source-blob"), + }) + require.NoError(t, err, "Failed to GET source") + got, err := io.ReadAll(resp.Body) + resp.Body.Close() + require.NoError(t, err) + require.Equal(t, expectedSHA, sha256.Sum256(got), "source object must round-trip") + } + + cases := []struct { + name string + // part definitions: each entry is (start, end) byte range to copy. 
+ parts [][2]int64 + }{ + { + // Docker Registry's typical Move shape for blobs around 40MB: + // one 32MB part + one tail part. This is exactly the metadata + // shape the user reported (5 dst chunks across 2 multipart parts). + name: "DockerRegistry_32MB_Plus_Tail", + parts: [][2]int64{ + {0, 32*1024*1024 - 1}, + {32 * 1024 * 1024, sourceSize - 1}, + }, + }, + { + // Single full-object UploadPartCopy. + name: "Single_Full_Object_Copy", + parts: [][2]int64{ + {0, sourceSize - 1}, + }, + }, + { + // Many small range-copies — exercises the per-part-local-offset + // IV math under varied chunk-overlap shapes. + name: "Many_5MB_Ranges", + parts: [][2]int64{ + {0, 5*1024*1024 - 1}, + {5 * 1024 * 1024, 10*1024*1024 - 1}, + {10 * 1024 * 1024, 15*1024*1024 - 1}, + {15 * 1024 * 1024, 20*1024*1024 - 1}, + {20 * 1024 * 1024, sourceSize - 1}, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + dstKey := "dest-" + strings.ToLower(strings.ReplaceAll(tc.name, "_", "-")) + + createResp, err := client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{ + Bucket: aws.String(bucketName), + Key: aws.String(dstKey), + }) + require.NoError(t, err, "CreateMultipartUpload") + uploadID := aws.ToString(createResp.UploadId) + + completedParts := make([]types.CompletedPart, 0, len(tc.parts)) + for i, rng := range tc.parts { + partNumber := int32(i + 1) + resp, err := client.UploadPartCopy(ctx, &s3.UploadPartCopyInput{ + Bucket: aws.String(bucketName), + Key: aws.String(dstKey), + PartNumber: aws.Int32(partNumber), + UploadId: aws.String(uploadID), + CopySource: aws.String(bucketName + "/source-blob"), + CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", rng[0], rng[1])), + }) + require.NoErrorf(t, err, "UploadPartCopy part %d range=[%d,%d]", partNumber, rng[0], rng[1]) + completedParts = append(completedParts, types.CompletedPart{ + ETag: resp.CopyPartResult.ETag, + PartNumber: aws.Int32(partNumber), + }) + } + + _, err = 
client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + Bucket: aws.String(bucketName), + Key: aws.String(dstKey), + UploadId: aws.String(uploadID), + MultipartUpload: &types.CompletedMultipartUpload{Parts: completedParts}, + }) + require.NoError(t, err, "CompleteMultipartUpload") + + // Two-pass verification: full GET (Docker Registry / Kubelet shape). + resp, err := client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(dstKey), + }) + require.NoError(t, err, "GetObject") + defer resp.Body.Close() + + h := sha256.New() + n, err := io.Copy(h, resp.Body) + require.NoError(t, err, "stream GET body") + require.Equal(t, int64(sourceSize), n, "GET length") + + var actual [32]byte + copy(actual[:], h.Sum(nil)) + + require.Equalf(t, expectedSHA, actual, + "UploadPartCopy SHA mismatch (#8908):\n expected sha256:%s\n got sha256:%s\n parts=%d size=%d", + hex.EncodeToString(expectedSHA[:]), + hex.EncodeToString(actual[:]), + len(tc.parts), n) + }) + } +} diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go index cb4c6a5e8..843e7364e 100644 --- a/weed/s3api/s3api_object_handlers_copy.go +++ b/weed/s3api/s3api_object_handlers_copy.go @@ -714,6 +714,61 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req } } + // Fetch the destination upload entry to determine whether the multipart + // upload was created with SSE configured. If either side has SSE, the + // fast raw-byte chunk copy below would leave destination chunks tagged + // inconsistently with the bytes on disk and trigger #8908's deterministic + // byte corruption on GET. Re-encrypt the source bytes in that case so + // destination chunks come out properly tagged. + // + // checkUploadId above only verifies that the uploadID's hash prefix + // matches dstObject; it does NOT prove the upload directory exists. 
+ // Treat a missing upload entry as NoSuchUpload — falling through with + // uploadEntry=nil would silently skip the SSE check on the destination + // side and could send a plain-source copy through the raw-byte fast + // path even though the destination's encryption state is unknown. + uploadEntry, uploadEntryErr := s3a.getEntry(s3a.genUploadsFolder(dstBucket), uploadID) + if uploadEntryErr != nil { + if errors.Is(uploadEntryErr, filer_pb.ErrNotFound) { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload) + return + } + glog.Errorf("CopyObjectPartHandler: failed to fetch upload entry for %s/%s uploadID=%s: %v", + dstBucket, dstObject, uploadID, uploadEntryErr) + // Distinguish transient from permanent errors: gRPC Unavailable + // (filer briefly unreachable, leader election in flight, etc.) and + // DeadlineExceeded both indicate the client should retry rather than + // give up. Map them to 503 ServiceUnavailable; everything else stays + // as 500 InternalError. + if isTransientFilerError(uploadEntryErr) { + s3err.WriteErrorResponse(w, r, s3err.ErrServiceUnavailable) + return + } + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + + if uploadEntryHasSSE(uploadEntry) || sourceEntryHasSSE(entry) { + etag, sseMetadata, errCode := s3a.copyObjectPartViaReencryption(r, entry, startOffset, endOffset, dstBucket, uploadID, partID, uploadEntry) + if errCode != s3err.ErrNone { + s3err.WriteErrorResponse(w, r, errCode) + return + } + setEtag(w, "\""+strings.Trim(etag, "\"")+"\"") + // Mirror PutObjectPartHandler: write x-amz-server-side-encryption / + // x-amz-server-side-encryption-aws-kms-key-id headers on the response + // so clients can see the destination's encryption state. + s3a.setSSEResponseHeaders(w, r, sseMetadata) + writeSuccessResponseXML(w, r, CopyPartResult{ + ETag: etag, + LastModified: t, + }) + return + } + + // Fast path: neither source nor destination has SSE. 
Raw byte copy is + // safe, since the bytes on disk are plaintext on both sides. + // Create new entry for the part // Calculate part size, avoiding underflow for invalid ranges partSize := uint64(0) diff --git a/weed/s3api/s3api_object_handlers_copy_part_sse.go b/weed/s3api/s3api_object_handlers_copy_part_sse.go new file mode 100644 index 000000000..47f496c2f --- /dev/null +++ b/weed/s3api/s3api_object_handlers_copy_part_sse.go @@ -0,0 +1,425 @@ +package s3api + +import ( + "context" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "sort" + "strconv" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" +) + +// errCopySourceSSEUnsupported is returned by openSourcePlaintextReader when +// the source object's SSE type is not yet implemented in the UploadPartCopy +// slow path. Callers map it to a 501 NotImplemented S3 response so clients +// can distinguish "we will not handle this shape" from "the server failed". +var errCopySourceSSEUnsupported = errors.New("UploadPartCopy source SSE type not yet supported") + +// isTransientFilerError reports whether an error talking to the filer is +// retryable from the client's perspective (filer briefly unreachable, leader +// election in flight, deadline exceeded, etc.). Such errors should map to a +// 503 ServiceUnavailable response so SDK retry logic engages, rather than a +// 500 InternalError which most clients treat as fatal. 
+func isTransientFilerError(err error) bool { + if err == nil { + return false + } + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + return true + } + if s, ok := status.FromError(err); ok { + switch s.Code() { + case codes.Unavailable, codes.DeadlineExceeded, codes.ResourceExhausted, codes.Aborted: + return true + } + } + return false +} + +// uploadEntryHasSSE reports whether the multipart upload entry was created +// with any server-side encryption configured (SSE-S3 or SSE-KMS — explicit at +// CreateMultipartUpload time or applied as bucket default). It is used to +// decide whether UploadPartCopy must re-encrypt source bytes for the +// destination, rather than copying them as raw bytes (the fast path). +func uploadEntryHasSSE(uploadEntry *filer_pb.Entry) bool { + if uploadEntry == nil || uploadEntry.Extended == nil { + return false + } + if _, ok := uploadEntry.Extended[s3_constants.SeaweedFSSSEKMSKeyID]; ok { + return true + } + if v, ok := uploadEntry.Extended[s3_constants.SeaweedFSSSES3Encryption]; ok && string(v) == s3_constants.SSEAlgorithmAES256 { + return true + } + return false +} + +// sourceEntryHasSSE reports whether the source object's chunks are SSE +// ciphertext on disk and therefore cannot be raw-copied — they must be +// decrypted on read. 
+func sourceEntryHasSSE(srcEntry *filer_pb.Entry) bool { + if srcEntry == nil { + return false + } + for _, c := range srcEntry.GetChunks() { + if c.GetSseType() != filer_pb.SSEType_NONE { + return true + } + } + if srcEntry.Extended != nil { + if _, ok := srcEntry.Extended[s3_constants.SeaweedFSSSES3Key]; ok { + return true + } + if _, ok := srcEntry.Extended[s3_constants.SeaweedFSSSEKMSKey]; ok { + return true + } + if _, ok := srcEntry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm]; ok { + return true + } + } + return false +} + +// readCloserAdapter pairs an arbitrary io.Reader with an io.Closer so callers +// can release the original underlying source even when the inner Reader (e.g. +// cipher.StreamReader, io.LimitReader) does not implement io.Closer. +type readCloserAdapter struct { + io.Reader + closer io.Closer +} + +func (r *readCloserAdapter) Close() error { + if r.closer == nil { + return nil + } + return r.closer.Close() +} + +// openSourcePlaintextReader returns a reader yielding the source object's +// plaintext bytes for [startOffset, endOffset], applying any necessary SSE +// decryption based on the source entry's metadata. +// +// Used by CopyObjectPartHandler when source or destination is SSE-encrypted: +// the fast raw-chunk-copy path leaves destination chunks SseType=NONE and +// completedMultipartChunk's NONE→SSE_S3 backfill (PR #9224) then writes +// destination-baseIV-derived metadata onto bytes that were actually encrypted +// with the source's key — producing deterministic byte corruption on GET (#8908). +// +// Returns errCopySourceSSEUnsupported when the source's SSE type is not yet +// implemented in this slow path (SSE-KMS, SSE-C). Callers should map that +// sentinel to a 501 NotImplemented S3 response rather than collapsing it to +// 500 InternalError, so clients can distinguish "we will not handle this +// shape" from "the server failed". 
+func (s3a *S3ApiServer) openSourcePlaintextReader( + ctx context.Context, + srcEntry *filer_pb.Entry, + startOffset, endOffset int64, +) (io.ReadCloser, error) { + if srcEntry == nil { + return nil, fmt.Errorf("nil source entry") + } + if endOffset < startOffset { + return io.NopCloser(io.LimitReader(emptyReader{}, 0)), nil + } + sliceLen := endOffset - startOffset + 1 + + switch s3a.detectPrimarySSEType(srcEntry) { + case s3_constants.SSETypeS3: + return s3a.openSSES3SourcePlaintextReader(ctx, srcEntry, startOffset, sliceLen) + case s3_constants.SSETypeKMS: + return nil, fmt.Errorf("%w: UploadPartCopy from SSE-KMS source", errCopySourceSSEUnsupported) + case s3_constants.SSETypeC: + return nil, fmt.Errorf("%w: UploadPartCopy from SSE-C source", errCopySourceSSEUnsupported) + default: + // Unencrypted source: stream raw bytes and apply range. + raw, err := s3a.getEncryptedStreamFromVolumes(ctx, srcEntry) + if err != nil { + return nil, fmt.Errorf("open unencrypted source: %w", err) + } + return applyRange(raw, startOffset, sliceLen) + } +} + +// openSSES3SourcePlaintextReader builds a decrypted reader for an SSE-S3 +// source. It reuses buildMultipartSSES3Reader, which decrypts each chunk +// independently using its per-chunk metadata — correct for both multipart-SSE +// objects (multiple SSE-S3 chunks) and single-part SSE-S3 objects whose single +// chunk also carries per-chunk metadata after PR #9211. +// +// For older single-part SSE-S3 objects whose chunks lack per-chunk metadata, +// this falls back to the entry-level SSE-S3 key + the entry's stored IV, +// matching the read path's single-part fallback. 
+func (s3a *S3ApiServer) openSSES3SourcePlaintextReader( + ctx context.Context, + srcEntry *filer_pb.Entry, + startOffset, sliceLen int64, +) (io.ReadCloser, error) { + chunks := srcEntry.GetChunks() + hasPerChunkSSE := false + for _, c := range chunks { + if c.GetSseType() == filer_pb.SSEType_SSE_S3 && len(c.GetSseMetadata()) > 0 { + hasPerChunkSSE = true + break + } + } + + if hasPerChunkSSE { + sortedChunks := make([]*filer_pb.FileChunk, len(chunks)) + copy(sortedChunks, chunks) + sort.Slice(sortedChunks, func(i, j int) bool { + return sortedChunks[i].GetOffset() < sortedChunks[j].GetOffset() + }) + decReader, err := buildMultipartSSES3Reader( + sortedChunks, + GetSSES3KeyManager(), + func(c *filer_pb.FileChunk) (io.ReadCloser, error) { + return s3a.createEncryptedChunkReader(ctx, c) + }, + ) + if err != nil { + return nil, fmt.Errorf("build SSE-S3 source reader: %w", err) + } + // buildMultipartSSES3Reader returns a *lazyMultipartChunkReader whose + // Close() releases the live chunk body. Use it as the closer. + var closer io.Closer + if rc, ok := decReader.(io.Closer); ok { + closer = rc + } + return applyRange(&readCloserAdapter{Reader: decReader, closer: closer}, startOffset, sliceLen) + } + + // Legacy single-part fallback: entry-level SeaweedFSSSES3Key + entry IV. 
+ keyData, ok := srcEntry.Extended[s3_constants.SeaweedFSSSES3Key] + if !ok || len(keyData) == 0 { + return nil, fmt.Errorf("SSE-S3 source has no per-chunk metadata and no entry-level SSE-S3 key") + } + keyManager := GetSSES3KeyManager() + sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager) + if err != nil { + return nil, fmt.Errorf("deserialize entry-level SSE-S3 key: %w", err) + } + iv, err := GetSSES3IV(srcEntry, sseS3Key, keyManager) + if err != nil { + return nil, fmt.Errorf("get SSE-S3 IV: %w", err) + } + encStream, err := s3a.getEncryptedStreamFromVolumes(ctx, srcEntry) + if err != nil { + return nil, fmt.Errorf("open ciphertext source: %w", err) + } + dec, err := CreateSSES3DecryptedReader(encStream, sseS3Key, iv) + if err != nil { + encStream.Close() + return nil, fmt.Errorf("create SSE-S3 decrypted reader: %w", err) + } + rc, ok := dec.(io.ReadCloser) + if !ok { + rc = &readCloserAdapter{Reader: dec, closer: encStream} + } + return applyRange(rc, startOffset, sliceLen) +} + +// applyRange skips startOffset bytes from src and limits the result to +// sliceLen bytes. The returned ReadCloser closes the underlying source. +func applyRange(src io.ReadCloser, startOffset, sliceLen int64) (io.ReadCloser, error) { + if startOffset > 0 { + if _, err := io.CopyN(io.Discard, src, startOffset); err != nil { + src.Close() + return nil, fmt.Errorf("skip to range start %d: %w", startOffset, err) + } + } + if sliceLen <= 0 { + return &readCloserAdapter{Reader: io.LimitReader(src, 0), closer: src}, nil + } + return &readCloserAdapter{Reader: io.LimitReader(src, sliceLen), closer: src}, nil +} + +// emptyReader yields no bytes. Used for empty-range UploadPartCopy. +type emptyReader struct{} + +func (emptyReader) Read([]byte) (int, error) { return 0, io.EOF } + +// applyDestSSEHeadersToCopyRequest stages the destination's SSE setup on the +// (cloned) request so that putToFiler's existing handleAllSSEEncryption picks +// it up. 
The upload-entry markers (laid down at CreateMultipartUpload) bind +// every part of the upload to the same key+baseIV, matching PutObjectPart. +func (s3a *S3ApiServer) applyDestSSEHeadersToCopyRequest( + r *http.Request, uploadEntry *filer_pb.Entry, uploadID string, +) error { + if uploadEntry == nil || uploadEntry.Extended == nil { + return nil + } + + if keyIDBytes, hasKMS := uploadEntry.Extended[s3_constants.SeaweedFSSSEKMSKeyID]; hasKMS { + // Mirror the SSE-KMS branch of PutObjectPartHandler: stage + // X-Amz-Server-Side-Encryption=aws:kms plus the key ID, encryption + // context, bucket-key flag and base IV onto the request. + keyID := string(keyIDBytes) + + bucketKeyEnabled := false + if v, ok := uploadEntry.Extended[s3_constants.SeaweedFSSSEKMSBucketKeyEnabled]; ok && string(v) == "true" { + bucketKeyEnabled = true + } + + var encryptionContext map[string]string + if cb, ok := uploadEntry.Extended[s3_constants.SeaweedFSSSEKMSEncryptionContext]; ok { + if err := json.Unmarshal(cb, &encryptionContext); err != nil { + glog.Errorf("UploadPartCopy: failed to parse SSE-KMS context for upload %s: %v", uploadID, err) + encryptionContext = nil + } + } + if len(encryptionContext) == 0 { + // Bucket and object are populated on the cloned request; reuse + // the same builder PutObjectPartHandler does. 
+ bucket, object := s3_constants.GetBucketAndObject(r) + encryptionContext = BuildEncryptionContext(bucket, object, bucketKeyEnabled) + } + + var baseIV []byte + if ivBytes, ok := uploadEntry.Extended[s3_constants.SeaweedFSSSEKMSBaseIV]; ok { + decoded, decErr := base64.StdEncoding.DecodeString(string(ivBytes)) + if decErr != nil || len(decoded) != s3_constants.AESBlockSize { + return fmt.Errorf("invalid SSE-KMS base IV on upload %s", uploadID) + } + baseIV = decoded + } else { + return fmt.Errorf("no SSE-KMS base IV on upload %s", uploadID) + } + + r.Header.Set(s3_constants.AmzServerSideEncryption, "aws:kms") + r.Header.Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, keyID) + if bucketKeyEnabled { + r.Header.Set(s3_constants.AmzServerSideEncryptionBucketKeyEnabled, "true") + } + if len(encryptionContext) > 0 { + if cj, err := json.Marshal(encryptionContext); err == nil { + r.Header.Set(s3_constants.AmzServerSideEncryptionContext, base64.StdEncoding.EncodeToString(cj)) + } + } + r.Header.Set(s3_constants.SeaweedFSSSEKMSBaseIVHeader, base64.StdEncoding.EncodeToString(baseIV)) + return nil + } + + // SSE-S3 path: reuse the existing PutObjectPart helper unchanged. It is + // pure header manipulation on r and does not touch S3ApiServer state. + return s3a.handleSSES3MultipartHeaders(r, uploadEntry, uploadID) +} + +// fakeContentRequest builds a minimal request representing "PUT this body" for +// the multipart-part write path used by UploadPartCopy. It clones the original +// request's headers (so things like AmzAccountId carry over) and clears the +// copy-only headers; SSE setup is added later by applyDestSSEHeadersToCopyRequest. 
func fakeContentRequest(orig *http.Request, body io.ReadCloser, contentLength int64) *http.Request {
	// Request.Clone already deep-copies the header map, so the edits below
	// never leak back into orig; a second Header.Clone would only repeat that
	// copy. Cloning a nil header yields nil, hence the fallback allocation.
	cloned := orig.Clone(orig.Context())
	cloned.Body = body
	cloned.ContentLength = contentLength
	if cloned.Header == nil {
		cloned.Header = http.Header{}
	}
	cloned.Header.Set("Content-Length", strconv.FormatInt(contentLength, 10))

	// Strip the headers that only make sense on the copy request itself.
	// Content-Md5 cannot be reproduced from the source plaintext without
	// streaming it once first; clear it so putToFiler doesn't validate.
	for _, name := range []string{
		"X-Amz-Copy-Source",
		"X-Amz-Copy-Source-Range",
		"X-Amz-Metadata-Directive",
		"X-Amz-Tagging-Directive",
		"Content-Md5",
	} {
		cloned.Header.Del(name)
	}
	return cloned
}

// copyObjectPartViaReencryption implements the slow path of UploadPartCopy when
// either the source object is SSE-encrypted or the destination multipart upload
// is configured for SSE encryption. It:
//
//  1. Opens a plaintext reader of the source range (decrypting if needed).
//  2. Stages the destination's SSE-S3 / SSE-KMS multipart headers on a cloned
//     request so handleAllSSEEncryption (called from putToFiler) routes the
//     body through the matching multipart-encryption helper.
//  3. Calls putToFiler with the plaintext reader, which encrypts using the
//     destination upload session's key+baseIV (consistent with PutObjectPart),
//     auto-chunks, and writes the part entry with proper per-chunk SSE metadata.
//
// Without this path, copyChunksForRange's raw byte copy leaves destination
// chunks SseType=NONE; completedMultipartChunk then "backfills" SSE-S3 metadata
// with destination-baseIV-derived IVs, but the bytes on disk were encrypted
// with the source's key — yielding deterministic byte corruption on GET (#8908).
+func (s3a *S3ApiServer) copyObjectPartViaReencryption( + r *http.Request, + srcEntry *filer_pb.Entry, + startOffset, endOffset int64, + dstBucket, uploadID string, + partID int, + uploadEntry *filer_pb.Entry, +) (etag string, sseMetadata SSEResponseMetadata, errCode s3err.ErrorCode) { + if endOffset < startOffset { + tag, code := s3a.writeEmptyCopyPart(dstBucket, uploadID, partID) + return tag, SSEResponseMetadata{}, code + } + sliceLen := endOffset - startOffset + 1 + + srcReader, err := s3a.openSourcePlaintextReader(r.Context(), srcEntry, startOffset, endOffset) + if err != nil { + glog.Errorf("UploadPartCopy: open source plaintext reader: %v", err) + // Distinguish "we will not handle this shape" (501) from "the server + // failed" (500). SSE-KMS / SSE-C source support in this slow path is + // staged work; the explicit error lets clients see it as a feature + // gap rather than a server fault. + if errors.Is(err, errCopySourceSSEUnsupported) { + return "", SSEResponseMetadata{}, s3err.ErrNotImplemented + } + return "", SSEResponseMetadata{}, s3err.ErrInternalError + } + defer srcReader.Close() + + cloned := fakeContentRequest(r, srcReader, sliceLen) + if err := s3a.applyDestSSEHeadersToCopyRequest(cloned, uploadEntry, uploadID); err != nil { + glog.Errorf("UploadPartCopy: apply destination SSE headers: %v", err) + return "", SSEResponseMetadata{}, s3err.ErrInternalError + } + + // Surface putToFiler's SSE response metadata to the caller so the handler + // can mirror PutObjectPart's behavior of writing + // x-amz-server-side-encryption / x-amz-server-side-encryption-aws-kms-key-id + // on the UploadPartCopy response. Without this, clients have no way to + // see that the destination was encrypted. 
+ filePath := s3a.genPartUploadPath(dstBucket, uploadID, partID) + tag, code, putSSE := s3a.putToFiler(cloned, filePath, srcReader, dstBucket, "", partID, nil) + if code != s3err.ErrNone { + return "", SSEResponseMetadata{}, code + } + return tag, putSSE, s3err.ErrNone +} + +// writeEmptyCopyPart writes a 0-byte part entry for an empty UploadPartCopy +// range, mirroring the legacy fast path's handling of endOffset < startOffset. +func (s3a *S3ApiServer) writeEmptyCopyPart(dstBucket, uploadID string, partID int) (string, s3err.ErrorCode) { + uploadDir := s3a.genUploadsFolder(dstBucket) + "/" + uploadID + partName := fmt.Sprintf("%04d_%s.part", partID, "copy") + if exists, _ := s3a.exists(uploadDir, partName, false); exists { + if err := s3a.rm(uploadDir, partName, false, false); err != nil { + return "", s3err.ErrInternalError + } + } + if err := s3a.mkFile(uploadDir, partName, nil, func(e *filer_pb.Entry) { + if e.Attributes == nil { + e.Attributes = &filer_pb.FuseAttributes{} + } + e.Attributes.FileSize = 0 + }); err != nil { + return "", s3err.ErrInternalError + } + const emptyMD5Hex = "d41d8cd98f00b204e9800998ecf8427e" + return emptyMD5Hex, s3err.ErrNone +}