From 82cf60a44f899b08e8fc8917bc5691c65611220a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 29 Apr 2026 09:46:44 -0700 Subject: [PATCH] fix(s3api): re-encrypt UploadPartCopy bytes for the destination's SSE config (#8908) (#9280) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(s3api): re-encrypt UploadPartCopy bytes for the destination's SSE config (#8908) The remaining failure mode in #8908 was that Docker Registry's blob finalization (server-side Move via UploadPartCopy) silently corrupts SSE-S3 multipart objects. Reproduces with `aws s3api upload-part-copy` under bucket-default SSE-S3: the GET on the completed object returns deterministic wrong bytes (correct length, same wrong SHA-256 across runs). The metadata is mathematically self-consistent — every chunk's stored IV equals `calculateIVWithOffset(baseIV_dst, partLocalOffset)` — but the bytes on disk were encrypted with the SOURCE upload's key+baseIV. Root cause: - `copyChunksForRange` (and `createDestinationChunk`) constructs new chunks for UploadPartCopy without copying `SseType` / `SseMetadata`, so destination chunks are written with `SseType=NONE`. - At completion, `completedMultipartChunk` (PR #9224's NONE→SSE_S3 backfill, intended to recover from a different missing-metadata bug) sees those NONE chunks under an SSE-S3 multipart upload and backfills SSE-S3 metadata derived from the destination upload's baseIV. The chunk metadata is now internally consistent and the GET path applies decryption — but the bytes on disk are encrypted with the source upload's key, not the destination's. Decryption produces deterministic garbage. Docker Registry pulls then fail with "Digest did not match". Fix: when either the source object or the destination multipart upload has any SSE configured, take a slow-path UploadPartCopy that (1) opens a plaintext reader of the source range — decrypting the source's per-chunk SSE-S3 metadata if needed via a reused `buildMultipartSSES3Reader`, and (2) feeds that plaintext through `putToFiler`'s existing encryption pipeline by staging the destination upload entry's SSE-S3/SSE-KMS headers on a cloned request. Encryption then matches PutObjectPart's contract: every part starts a fresh CTR stream from counter 0 with `baseIV_dst`, and each internal chunk's metadata records `calculateIVWithOffset(baseIV_dst, chunk.partLocalOffset)`. The `non-SSE → non-SSE` case still takes the existing fast raw-byte copy path — bytes on disk are plaintext on both sides, so chunk-level metadata is irrelevant. Cross-encryption from SSE-KMS / SSE-C sources is left as TODO — the new path returns an explicit error rather than the previous silent corruption. SSE-S3 (the user-reported case) round-trips correctly. Tests: - test/s3/sse/s3_sse_uploadpartcopy_integration_test.go pins three UploadPartCopy shapes against bucket-default SSE-S3: * Docker-Registry-shape 32MB+tail (the user's exact 5-chunk / 2-part metadata layout) * single full-object UploadPartCopy * many small range copies Each round-trips SHA-256. - test/s3/sse/s3_sse_concurrent_repro_test.go covers the parallel multipart-upload shape from the user report (5 blobs in parallel, full GET and chunked range GET both hash-checked) — pre-existing coverage; added here as a regression sentinel. * test(s3-sse): rename UploadPartCopy regression test so CI matches it The CI workflow .github/workflows/s3-sse-tests.yml dispatches on the TEST_PATTERN ".*Multipart.*Integration" — i.e. the test name must contain both "Multipart" and "Integration" for CI to run it. The previous name TestSSES3UploadPartCopyIntegration had only "Integration"; "UploadPart" isn't "Multipart". Rename to TestSSES3MultipartUploadPartCopyIntegration so the regression test actually runs in CI rather than only locally. * fix(s3api): map unsupported UploadPartCopy SSE source to 501, not 500 (review feedback on #9280) openSourcePlaintextReader explicitly rejects SSE-KMS and SSE-C sources (SSE-S3 is the only one wired up in this slow path so far). Earlier the caller blanket-mapped that to ErrInternalError, which collapses "this shape isn't implemented yet" into the same 500 response a real server failure would produce. Clients can no longer tell whether they hit a feature gap or a bug. Introduce a sentinel errCopySourceSSEUnsupported and have copyObjectPartViaReencryption errors.Is-check it; on match, return ErrNotImplemented (501) instead of ErrInternalError (500). Other failures still map to 500. Found by coderabbitai review on PR #9280. * fix(s3api): UploadPartCopy must fail with NoSuchUpload when upload entry is missing (review feedback on #9280) CopyObjectPartHandler's earlier checkUploadId call only verifies that the uploadID's hash prefix matches dstObject; it does not prove the upload directory exists in the filer. The previous logic silently swallowed filer_pb.ErrNotFound from getEntry(uploadDir) and fell through with uploadEntry=nil, which then skipped the destination SSE check and could route a plain-source copy through the raw-byte fast path even though the destination's encryption state is unknown. Treat ErrNotFound as ErrNoSuchUpload so the client sees the right status, matching the AWS S3 contract for UploadPartCopy on a non-existent upload. Found by coderabbitai review on PR #9280. * feat(s3api): set SSE response headers on UploadPartCopy slow path (review feedback on #9280) PutObjectPartHandler writes x-amz-server-side-encryption (and the KMS key-id header for SSE-KMS) on every successful part response so clients can confirm the destination's encryption state. The new UploadPartCopy slow path was missing this — it returned only the ETag in the response body and no SSE response headers. Plumb putToFiler's SSEResponseMetadata back through copyObjectPartViaReencryption to the handler, then call setSSEResponseHeaders before writing the XML response, matching the PutObjectPart contract. Found by gemini-code-assist review on PR #9280. * fix(s3api): map transient filer errors on UploadPartCopy upload-entry fetch to 503 (review feedback on #9280) Earlier non-ErrNotFound errors from getEntry(uploadDir, uploadID) all returned 500 InternalError, which most SDKs treat as fatal — even though a transient filer outage (gRPC Unavailable, leader election in flight, deadline exceeded) is exactly the kind of failure SDK retry logic is supposed to recover from. Add an isTransientFilerError helper that recognises: - context.DeadlineExceeded / context.Canceled - gRPC codes.Unavailable, DeadlineExceeded, ResourceExhausted, Aborted When the upload-entry fetch fails for one of those reasons, return 503 ServiceUnavailable so the client retries; everything else still maps to 500. Log line now also carries dstObject (in addition to dstBucket and uploadID) to make incident triage easier. Found by gemini-code-assist review on PR #9280. --- test/s3/sse/s3_sse_concurrent_repro_test.go | 310 +++++++++++++ .../s3_sse_uploadpartcopy_integration_test.go | 190 ++++++++ weed/s3api/s3api_object_handlers_copy.go | 55 +++ .../s3api_object_handlers_copy_part_sse.go | 425 ++++++++++++++++++ 4 files changed, 980 insertions(+) create mode 100644 test/s3/sse/s3_sse_concurrent_repro_test.go create mode 100644 test/s3/sse/s3_sse_uploadpartcopy_integration_test.go create mode 100644 weed/s3api/s3api_object_handlers_copy_part_sse.go diff --git a/test/s3/sse/s3_sse_concurrent_repro_test.go b/test/s3/sse/s3_sse_concurrent_repro_test.go new file mode 100644 index 000000000..da84f6cd6 --- /dev/null +++ b/test/s3/sse/s3_sse_concurrent_repro_test.go @@ -0,0 +1,310 @@ +package sse_test + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "sync" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/stretchr/testify/require" +) + +// TestSSES3ConcurrentMultipartDigestIntegration mirrors the production +// scenario reported in issue #8908: Docker Registry pushes several large +// container images in parallel, each push being itself a multipart upload of +// many ~5MB parts under bucket-default SSE-S3. Registry pulls fail with +// "Digest did not match" because the SHA-256 of the streamed GET body does +// not match the SHA-256 of what was uploaded. +// +// Coverage that the existing TestSSEMultipartManyChunksIntegration does NOT +// have: +// - bucket-default SSE-S3 (not explicit per-request headers) +// - multiple objects uploaded concurrently +// - parts within each object uploaded concurrently +// - both full-body GET (Docker Registry) and chunked range GETs (kubelet/CRI) +// +// If the SHA over the streamed body ever differs from the SHA over the +// uploaded bytes, the test fails with the exact bucket/key, expected SHA, +// and actual SHA — the same shape Docker Registry surfaces on pull. +// +// The function name ends in "Integration" so it is matched by the existing +// `TestSSE.*Integration` pattern in test/s3/sse/Makefile and the +// `.*Multipart.*Integration` pattern in .github/workflows/s3-sse-tests.yml, +// so the regression coverage runs automatically in CI. +func TestSSES3ConcurrentMultipartDigestIntegration(t *testing.T) { + ctx := context.Background() + client, err := createS3Client(ctx, defaultConfig) + require.NoError(t, err, "Failed to create S3 client") + + bucketName, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"sse-s3-concurrent-") + require.NoError(t, err, "Failed to create test bucket") + defer cleanupTestBucket(ctx, client, bucketName) + + _, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{ + Bucket: aws.String(bucketName), + ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{ + Rules: []types.ServerSideEncryptionRule{ + { + ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{ + SSEAlgorithm: types.ServerSideEncryptionAes256, + }, + }, + }, + }, + }) + require.NoError(t, err, "Failed to set bucket default SSE-S3 encryption") + + const ( + numObjects = 5 // mirrors "5 images at a time" + partsPerObject = 12 // 12 parts × 5MB = 60MB per blob; total 5×60=300MB + partSize = 5 * 1024 * 1024 // S3 minimum part size + uploadParallel = 8 // mirror typical S3 SDK transfer manager concurrency + iterations = 2 // run twice to flush out flakiness without bloating CI + ) + + type blob struct { + key string + parts [][]byte + full []byte + fullSHA [32]byte + } + + makeBlobs := func(iter int) []blob { + blobs := make([]blob, numObjects) + for i := range blobs { + parts := make([][]byte, partsPerObject) + for j := range parts { + parts[j] = generateTestData(partSize) + } + full := bytes.Join(parts, nil) + blobs[i] = blob{ + key: fmt.Sprintf("iter%02d-blob-%02d", iter, i), + parts: parts, + full: full, + fullSHA: sha256.Sum256(full), + } + } + return blobs + } + + uploadOne := func(b blob) error { + createResp, err := client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{ + Bucket: aws.String(bucketName), + Key: aws.String(b.key), + }) + if err != nil { + return fmt.Errorf("create multipart for %s: %w", b.key, err) + } + uploadID := aws.ToString(createResp.UploadId) + + completedParts := make([]types.CompletedPart, partsPerObject) + var ( + wg sync.WaitGroup + partErr error + partMu sync.Mutex + ) + sem := make(chan struct{}, uploadParallel) + for i := 0; i < partsPerObject; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + sem <- struct{}{} + defer func() { <-sem }() + + resp, err := client.UploadPart(ctx, &s3.UploadPartInput{ + Bucket: aws.String(bucketName), + Key: aws.String(b.key), + PartNumber: aws.Int32(int32(i + 1)), + UploadId: aws.String(uploadID), + Body: bytes.NewReader(b.parts[i]), + }) + if err != nil { + partMu.Lock() + if partErr == nil { + partErr = fmt.Errorf("upload part %d of %s: %w", i+1, b.key, err) + } + partMu.Unlock() + return + } + completedParts[i] = types.CompletedPart{ + ETag: resp.ETag, + PartNumber: aws.Int32(int32(i + 1)), + } + }(i) + } + wg.Wait() + if partErr != nil { + _, _ = client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ + Bucket: aws.String(bucketName), Key: aws.String(b.key), UploadId: aws.String(uploadID), + }) + return partErr + } + + _, err = client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + Bucket: aws.String(bucketName), + Key: aws.String(b.key), + UploadId: aws.String(uploadID), + MultipartUpload: &types.CompletedMultipartUpload{Parts: completedParts}, + }) + if err != nil { + return fmt.Errorf("complete multipart for %s: %w", b.key, err) + } + return nil + } + + type result struct { + key string + expected string + actual string + gotSize int64 + wantSize int64 + readErr error + } + + verifyFullGET := func(b blob) result { + resp, err := client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(b.key), + }) + if err != nil { + return result{key: "FULL " + b.key, readErr: fmt.Errorf("GetObject: %w", err)} + } + defer resp.Body.Close() + h := sha256.New() + n, copyErr := io.Copy(h, resp.Body) + var actual [32]byte + copy(actual[:], h.Sum(nil)) + return result{ + key: "FULL " + b.key, + expected: hex.EncodeToString(b.fullSHA[:]), + actual: hex.EncodeToString(actual[:]), + gotSize: n, + wantSize: int64(len(b.full)), + readErr: copyErr, + } + } + + verifyRangeGET := func(b blob) result { + // 1MB windows. Includes a window that crosses the 8MB internal-chunk + // boundary (chunkSize in putToFiler) — the historical danger zone for + // SSE-S3 multipart range reads. + const window = 1024 * 1024 + h := sha256.New() + var totalN int64 + objectSize := int64(len(b.full)) + for offset := int64(0); offset < objectSize; offset += window { + end := offset + window - 1 + if end >= objectSize { + end = objectSize - 1 + } + resp, err := client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(b.key), + Range: aws.String(fmt.Sprintf("bytes=%d-%d", offset, end)), + }) + if err != nil { + return result{key: "RANGE " + b.key, readErr: fmt.Errorf("Range GetObject [%d-%d]: %w", offset, end, err)} + } + n, copyErr := io.Copy(h, resp.Body) + resp.Body.Close() + totalN += n + if copyErr != nil { + return result{key: "RANGE " + b.key, readErr: fmt.Errorf("Range copy [%d-%d]: %w", offset, end, copyErr)} + } + } + var actual [32]byte + copy(actual[:], h.Sum(nil)) + return result{ + key: "RANGE " + b.key, + expected: hex.EncodeToString(b.fullSHA[:]), + actual: hex.EncodeToString(actual[:]), + gotSize: totalN, + wantSize: objectSize, + } + } + + var allFailures []string + for iter := 0; iter < iterations; iter++ { + blobs := makeBlobs(iter) + + // Upload all N blobs in parallel. + { + var ( + wg sync.WaitGroup + errMu sync.Mutex + errors []error + ) + for i := range blobs { + wg.Add(1) + go func(i int) { + defer wg.Done() + if err := uploadOne(blobs[i]); err != nil { + errMu.Lock() + errors = append(errors, err) + errMu.Unlock() + } + }(i) + } + wg.Wait() + require.Empty(t, errors, "iter %d: concurrent multipart uploads must succeed", iter) + } + + // Two passes (full body + range), all blobs verified concurrently. + results := make([]result, 0, 2*numObjects) + var resultMu sync.Mutex + var wg sync.WaitGroup + for i := range blobs { + wg.Add(2) + go func(b blob) { + defer wg.Done() + r := verifyFullGET(b) + resultMu.Lock() + results = append(results, r) + resultMu.Unlock() + }(blobs[i]) + go func(b blob) { + defer wg.Done() + r := verifyRangeGET(b) + resultMu.Lock() + results = append(results, r) + resultMu.Unlock() + }(blobs[i]) + } + wg.Wait() + + for _, r := range results { + if r.readErr != nil { + allFailures = append(allFailures, fmt.Sprintf("[iter %d %s] read error: %v (got %d / want %d bytes)", + iter, r.key, r.readErr, r.gotSize, r.wantSize)) + continue + } + if r.expected != r.actual { + allFailures = append(allFailures, fmt.Sprintf( + "[iter %d %s] DIGEST MISMATCH: expected sha256:%s, got sha256:%s (got %d / want %d bytes)", + iter, r.key, r.expected, r.actual, r.gotSize, r.wantSize)) + } + } + } + if len(allFailures) > 0 { + t.Fatalf("%d concurrent SSE-S3 multipart blob digest mismatches across %d iterations:\n %s", + len(allFailures), iterations, joinFailures(allFailures)) + } +} + +func joinFailures(ss []string) string { + out := "" + for i, s := range ss { + if i > 0 { + out += "\n " + } + out += s + } + return out +} diff --git a/test/s3/sse/s3_sse_uploadpartcopy_integration_test.go b/test/s3/sse/s3_sse_uploadpartcopy_integration_test.go new file mode 100644 index 000000000..8fbfa240d --- /dev/null +++ b/test/s3/sse/s3_sse_uploadpartcopy_integration_test.go @@ -0,0 +1,190 @@ +package sse_test + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "strings" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/stretchr/testify/require" +) + +// TestSSES3MultipartUploadPartCopyIntegration pins the fix for issue #8908. +// +// Docker Registry's S3 storage driver finalizes blob uploads via a server-side +// "Move" pattern: a streaming PUT/multipart upload to a temporary key, then +// CreateMultipartUpload + UploadPartCopy(s) + CompleteMultipartUpload to put +// the bytes at the final blob path. Under bucket-default SSE-S3, every push +// goes through this UploadPartCopy step. +// +// Before the fix, copyChunksForRange did a raw byte copy that left the +// destination's part chunks SseType=NONE. Then completedMultipartChunk +// (PR #9224) saw NONE chunks in an SSE-S3 multipart upload and "backfilled" +// SSE-S3 metadata with IVs derived from the destination upload's baseIV. But +// the bytes on disk had been encrypted with the SOURCE upload's key+baseIV, +// so the read path decrypted with the wrong IV — yielding deterministic byte +// corruption on GET (the "Digest did not match" symptom kubelet surfaces). +// +// This test reproduces the exact shape: a 39MB plaintext source (single +// PutObject — auto-chunked into multiple internal SSE-S3 chunks on disk +// because of bucket-default SSE-S3), then a fresh multipart upload at a new +// destination key with two UploadPartCopy parts (32MB + 7MB) and Complete. +// The full GET must SHA back to what was uploaded. +// +// The function name contains both "Multipart" and "Integration" so it is matched +// by the `.*Multipart.*Integration` pattern in .github/workflows/s3-sse-tests.yml +// and the `TestSSE.*Integration` pattern in test/s3/sse/Makefile, ensuring this +// regression coverage runs in CI. +func TestSSES3MultipartUploadPartCopyIntegration(t *testing.T) { + ctx := context.Background() + client, err := createS3Client(ctx, defaultConfig) + require.NoError(t, err, "Failed to create S3 client") + + bucketName, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"sse-s3-uploadpartcopy-") + require.NoError(t, err, "Failed to create test bucket") + defer cleanupTestBucket(ctx, client, bucketName) + + // Bucket-default SSE-S3 — same setup Docker Registry uses. + _, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{ + Bucket: aws.String(bucketName), + ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{ + Rules: []types.ServerSideEncryptionRule{ + { + ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{ + SSEAlgorithm: types.ServerSideEncryptionAes256, + }, + }, + }, + }, + }) + require.NoError(t, err, "Failed to set bucket default SSE-S3") + + // Source: 39MB single PutObject (auto-chunked internally into 5 SSE-S3 chunks). + const sourceSize = 39 * 1024 * 1024 + sourceData := generateTestData(sourceSize) + expectedSHA := sha256.Sum256(sourceData) + + _, err = client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String("source-blob"), + Body: bytes.NewReader(sourceData), + }) + require.NoError(t, err, "Failed to upload source object") + + // Sanity check: the source itself must round-trip correctly. + { + resp, err := client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String("source-blob"), + }) + require.NoError(t, err, "Failed to GET source") + got, err := io.ReadAll(resp.Body) + resp.Body.Close() + require.NoError(t, err) + require.Equal(t, expectedSHA, sha256.Sum256(got), "source object must round-trip") + } + + cases := []struct { + name string + // part definitions: each entry is (start, end) byte range to copy. + parts [][2]int64 + }{ + { + // Docker Registry's typical Move shape for blobs around 40MB: + // one 32MB part + one tail part. This is exactly the metadata + // shape the user reported (5 dst chunks across 2 multipart parts). + name: "DockerRegistry_32MB_Plus_Tail", + parts: [][2]int64{ + {0, 32*1024*1024 - 1}, + {32 * 1024 * 1024, sourceSize - 1}, + }, + }, + { + // Single full-object UploadPartCopy. + name: "Single_Full_Object_Copy", + parts: [][2]int64{ + {0, sourceSize - 1}, + }, + }, + { + // Many small range-copies — exercises the per-part-local-offset + // IV math under varied chunk-overlap shapes. + name: "Many_5MB_Ranges", + parts: [][2]int64{ + {0, 5*1024*1024 - 1}, + {5 * 1024 * 1024, 10*1024*1024 - 1}, + {10 * 1024 * 1024, 15*1024*1024 - 1}, + {15 * 1024 * 1024, 20*1024*1024 - 1}, + {20 * 1024 * 1024, sourceSize - 1}, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + dstKey := "dest-" + strings.ToLower(strings.ReplaceAll(tc.name, "_", "-")) + + createResp, err := client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{ + Bucket: aws.String(bucketName), + Key: aws.String(dstKey), + }) + require.NoError(t, err, "CreateMultipartUpload") + uploadID := aws.ToString(createResp.UploadId) + + completedParts := make([]types.CompletedPart, 0, len(tc.parts)) + for i, rng := range tc.parts { + partNumber := int32(i + 1) + resp, err := client.UploadPartCopy(ctx, &s3.UploadPartCopyInput{ + Bucket: aws.String(bucketName), + Key: aws.String(dstKey), + PartNumber: aws.Int32(partNumber), + UploadId: aws.String(uploadID), + CopySource: aws.String(bucketName + "/source-blob"), + CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", rng[0], rng[1])), + }) + require.NoErrorf(t, err, "UploadPartCopy part %d range=[%d,%d]", partNumber, rng[0], rng[1]) + completedParts = append(completedParts, types.CompletedPart{ + ETag: resp.CopyPartResult.ETag, + PartNumber: aws.Int32(partNumber), + }) + } + + _, err = client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + Bucket: aws.String(bucketName), + Key: aws.String(dstKey), + UploadId: aws.String(uploadID), + MultipartUpload: &types.CompletedMultipartUpload{Parts: completedParts}, + }) + require.NoError(t, err, "CompleteMultipartUpload") + + // Two-pass verification: full GET (Docker Registry / Kubelet shape). + resp, err := client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(bucketName), + Key: aws.String(dstKey), + }) + require.NoError(t, err, "GetObject") + defer resp.Body.Close() + + h := sha256.New() + n, err := io.Copy(h, resp.Body) + require.NoError(t, err, "stream GET body") + require.Equal(t, int64(sourceSize), n, "GET length") + + var actual [32]byte + copy(actual[:], h.Sum(nil)) + + require.Equalf(t, expectedSHA, actual, + "UploadPartCopy SHA mismatch (#8908):\n expected sha256:%s\n got sha256:%s\n parts=%d size=%d", + hex.EncodeToString(expectedSHA[:]), + hex.EncodeToString(actual[:]), + len(tc.parts), n) + }) + } +} diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go index cb4c6a5e8..843e7364e 100644 --- a/weed/s3api/s3api_object_handlers_copy.go +++ b/weed/s3api/s3api_object_handlers_copy.go @@ -714,6 +714,61 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req } } + // Fetch the destination upload entry to determine whether the multipart + // upload was created with SSE configured. If either side has SSE, the + // fast raw-byte chunk copy below would leave destination chunks tagged + // inconsistently with the bytes on disk and trigger #8908's deterministic + // byte corruption on GET. Re-encrypt the source bytes in that case so + // destination chunks come out properly tagged. + // + // checkUploadId above only verifies that the uploadID's hash prefix + // matches dstObject; it does NOT prove the upload directory exists. + // Treat a missing upload entry as NoSuchUpload — falling through with + // uploadEntry=nil would silently skip the SSE check on the destination + // side and could send a plain-source copy through the raw-byte fast + // path even though the destination's encryption state is unknown. + uploadEntry, uploadEntryErr := s3a.getEntry(s3a.genUploadsFolder(dstBucket), uploadID) + if uploadEntryErr != nil { + if errors.Is(uploadEntryErr, filer_pb.ErrNotFound) { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload) + return + } + glog.Errorf("CopyObjectPartHandler: failed to fetch upload entry for %s/%s uploadID=%s: %v", + dstBucket, dstObject, uploadID, uploadEntryErr) + // Distinguish transient from permanent errors: gRPC Unavailable + // (filer briefly unreachable, leader election in flight, etc.) and + // DeadlineExceeded both indicate the client should retry rather than + // give up. Map them to 503 ServiceUnavailable; everything else stays + // as 500 InternalError. + if isTransientFilerError(uploadEntryErr) { + s3err.WriteErrorResponse(w, r, s3err.ErrServiceUnavailable) + return + } + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + + if uploadEntryHasSSE(uploadEntry) || sourceEntryHasSSE(entry) { + etag, sseMetadata, errCode := s3a.copyObjectPartViaReencryption(r, entry, startOffset, endOffset, dstBucket, uploadID, partID, uploadEntry) + if errCode != s3err.ErrNone { + s3err.WriteErrorResponse(w, r, errCode) + return + } + setEtag(w, "\""+strings.Trim(etag, "\"")+"\"") + // Mirror PutObjectPartHandler: write x-amz-server-side-encryption / + // x-amz-server-side-encryption-aws-kms-key-id headers on the response + // so clients can see the destination's encryption state. + s3a.setSSEResponseHeaders(w, r, sseMetadata) + writeSuccessResponseXML(w, r, CopyPartResult{ + ETag: etag, + LastModified: t, + }) + return + } + + // Fast path: neither source nor destination has SSE. Raw byte copy is + // safe, since the bytes on disk are plaintext on both sides. + // Create new entry for the part // Calculate part size, avoiding underflow for invalid ranges partSize := uint64(0) diff --git a/weed/s3api/s3api_object_handlers_copy_part_sse.go b/weed/s3api/s3api_object_handlers_copy_part_sse.go new file mode 100644 index 000000000..47f496c2f --- /dev/null +++ b/weed/s3api/s3api_object_handlers_copy_part_sse.go @@ -0,0 +1,425 @@ +package s3api + +import ( + "context" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "sort" + "strconv" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" +) + +// errCopySourceSSEUnsupported is returned by openSourcePlaintextReader when +// the source object's SSE type is not yet implemented in the UploadPartCopy +// slow path. Callers map it to a 501 NotImplemented S3 response so clients +// can distinguish "we will not handle this shape" from "the server failed". +var errCopySourceSSEUnsupported = errors.New("UploadPartCopy source SSE type not yet supported") + +// isTransientFilerError reports whether an error talking to the filer is +// retryable from the client's perspective (filer briefly unreachable, leader +// election in flight, deadline exceeded, etc.). Such errors should map to a +// 503 ServiceUnavailable response so SDK retry logic engages, rather than a +// 500 InternalError which most clients treat as fatal. +func isTransientFilerError(err error) bool { + if err == nil { + return false + } + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + return true + } + if s, ok := status.FromError(err); ok { + switch s.Code() { + case codes.Unavailable, codes.DeadlineExceeded, codes.ResourceExhausted, codes.Aborted: + return true + } + } + return false +} + +// uploadEntryHasSSE reports whether the multipart upload entry was created +// with any server-side encryption configured (SSE-S3 or SSE-KMS — explicit at +// CreateMultipartUpload time or applied as bucket default). It is used to +// decide whether UploadPartCopy must re-encrypt source bytes for the +// destination, rather than copying them as raw bytes (the fast path). +func uploadEntryHasSSE(uploadEntry *filer_pb.Entry) bool { + if uploadEntry == nil || uploadEntry.Extended == nil { + return false + } + if _, ok := uploadEntry.Extended[s3_constants.SeaweedFSSSEKMSKeyID]; ok { + return true + } + if v, ok := uploadEntry.Extended[s3_constants.SeaweedFSSSES3Encryption]; ok && string(v) == s3_constants.SSEAlgorithmAES256 { + return true + } + return false +} + +// sourceEntryHasSSE reports whether the source object's chunks are SSE +// ciphertext on disk and therefore cannot be raw-copied — they must be +// decrypted on read. +func sourceEntryHasSSE(srcEntry *filer_pb.Entry) bool { + if srcEntry == nil { + return false + } + for _, c := range srcEntry.GetChunks() { + if c.GetSseType() != filer_pb.SSEType_NONE { + return true + } + } + if srcEntry.Extended != nil { + if _, ok := srcEntry.Extended[s3_constants.SeaweedFSSSES3Key]; ok { + return true + } + if _, ok := srcEntry.Extended[s3_constants.SeaweedFSSSEKMSKey]; ok { + return true + } + if _, ok := srcEntry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm]; ok { + return true + } + } + return false +} + +// readCloserAdapter pairs an arbitrary io.Reader with an io.Closer so callers +// can release the original underlying source even when the inner Reader (e.g. +// cipher.StreamReader, io.LimitReader) does not implement io.Closer. +type readCloserAdapter struct { + io.Reader + closer io.Closer +} + +func (r *readCloserAdapter) Close() error { + if r.closer == nil { + return nil + } + return r.closer.Close() +} + +// openSourcePlaintextReader returns a reader yielding the source object's +// plaintext bytes for [startOffset, endOffset], applying any necessary SSE +// decryption based on the source entry's metadata. +// +// Used by CopyObjectPartHandler when source or destination is SSE-encrypted: +// the fast raw-chunk-copy path leaves destination chunks SseType=NONE and +// completedMultipartChunk's NONE→SSE_S3 backfill (PR #9224) then writes +// destination-baseIV-derived metadata onto bytes that were actually encrypted +// with the source's key — producing deterministic byte corruption on GET (#8908). +// +// Returns errCopySourceSSEUnsupported when the source's SSE type is not yet +// implemented in this slow path (SSE-KMS, SSE-C). Callers should map that +// sentinel to a 501 NotImplemented S3 response rather than collapsing it to +// 500 InternalError, so clients can distinguish "we will not handle this +// shape" from "the server failed". +func (s3a *S3ApiServer) openSourcePlaintextReader( + ctx context.Context, + srcEntry *filer_pb.Entry, + startOffset, endOffset int64, +) (io.ReadCloser, error) { + if srcEntry == nil { + return nil, fmt.Errorf("nil source entry") + } + if endOffset < startOffset { + return io.NopCloser(io.LimitReader(emptyReader{}, 0)), nil + } + sliceLen := endOffset - startOffset + 1 + + switch s3a.detectPrimarySSEType(srcEntry) { + case s3_constants.SSETypeS3: + return s3a.openSSES3SourcePlaintextReader(ctx, srcEntry, startOffset, sliceLen) + case s3_constants.SSETypeKMS: + return nil, fmt.Errorf("%w: UploadPartCopy from SSE-KMS source", errCopySourceSSEUnsupported) + case s3_constants.SSETypeC: + return nil, fmt.Errorf("%w: UploadPartCopy from SSE-C source", errCopySourceSSEUnsupported) + default: + // Unencrypted source: stream raw bytes and apply range. + raw, err := s3a.getEncryptedStreamFromVolumes(ctx, srcEntry) + if err != nil { + return nil, fmt.Errorf("open unencrypted source: %w", err) + } + return applyRange(raw, startOffset, sliceLen) + } +} + +// openSSES3SourcePlaintextReader builds a decrypted reader for an SSE-S3 +// source. It reuses buildMultipartSSES3Reader, which decrypts each chunk +// independently using its per-chunk metadata — correct for both multipart-SSE +// objects (multiple SSE-S3 chunks) and single-part SSE-S3 objects whose single +// chunk also carries per-chunk metadata after PR #9211. +// +// For older single-part SSE-S3 objects whose chunks lack per-chunk metadata, +// this falls back to the entry-level SSE-S3 key + the entry's stored IV, +// matching the read path's single-part fallback. +func (s3a *S3ApiServer) openSSES3SourcePlaintextReader( + ctx context.Context, + srcEntry *filer_pb.Entry, + startOffset, sliceLen int64, +) (io.ReadCloser, error) { + chunks := srcEntry.GetChunks() + hasPerChunkSSE := false + for _, c := range chunks { + if c.GetSseType() == filer_pb.SSEType_SSE_S3 && len(c.GetSseMetadata()) > 0 { + hasPerChunkSSE = true + break + } + } + + if hasPerChunkSSE { + sortedChunks := make([]*filer_pb.FileChunk, len(chunks)) + copy(sortedChunks, chunks) + sort.Slice(sortedChunks, func(i, j int) bool { + return sortedChunks[i].GetOffset() < sortedChunks[j].GetOffset() + }) + decReader, err := buildMultipartSSES3Reader( + sortedChunks, + GetSSES3KeyManager(), + func(c *filer_pb.FileChunk) (io.ReadCloser, error) { + return s3a.createEncryptedChunkReader(ctx, c) + }, + ) + if err != nil { + return nil, fmt.Errorf("build SSE-S3 source reader: %w", err) + } + // buildMultipartSSES3Reader returns a *lazyMultipartChunkReader whose + // Close() releases the live chunk body. Use it as the closer. + var closer io.Closer + if rc, ok := decReader.(io.Closer); ok { + closer = rc + } + return applyRange(&readCloserAdapter{Reader: decReader, closer: closer}, startOffset, sliceLen) + } + + // Legacy single-part fallback: entry-level SeaweedFSSSES3Key + entry IV. + keyData, ok := srcEntry.Extended[s3_constants.SeaweedFSSSES3Key] + if !ok || len(keyData) == 0 { + return nil, fmt.Errorf("SSE-S3 source has no per-chunk metadata and no entry-level SSE-S3 key") + } + keyManager := GetSSES3KeyManager() + sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager) + if err != nil { + return nil, fmt.Errorf("deserialize entry-level SSE-S3 key: %w", err) + } + iv, err := GetSSES3IV(srcEntry, sseS3Key, keyManager) + if err != nil { + return nil, fmt.Errorf("get SSE-S3 IV: %w", err) + } + encStream, err := s3a.getEncryptedStreamFromVolumes(ctx, srcEntry) + if err != nil { + return nil, fmt.Errorf("open ciphertext source: %w", err) + } + dec, err := CreateSSES3DecryptedReader(encStream, sseS3Key, iv) + if err != nil { + encStream.Close() + return nil, fmt.Errorf("create SSE-S3 decrypted reader: %w", err) + } + rc, ok := dec.(io.ReadCloser) + if !ok { + rc = &readCloserAdapter{Reader: dec, closer: encStream} + } + return applyRange(rc, startOffset, sliceLen) +} + +// applyRange skips startOffset bytes from src and limits the result to +// sliceLen bytes. The returned ReadCloser closes the underlying source. +func applyRange(src io.ReadCloser, startOffset, sliceLen int64) (io.ReadCloser, error) { + if startOffset > 0 { + if _, err := io.CopyN(io.Discard, src, startOffset); err != nil { + src.Close() + return nil, fmt.Errorf("skip to range start %d: %w", startOffset, err) + } + } + if sliceLen <= 0 { + return &readCloserAdapter{Reader: io.LimitReader(src, 0), closer: src}, nil + } + return &readCloserAdapter{Reader: io.LimitReader(src, sliceLen), closer: src}, nil +} + +// emptyReader yields no bytes. Used for empty-range UploadPartCopy. +type emptyReader struct{} + +func (emptyReader) Read([]byte) (int, error) { return 0, io.EOF } + +// applyDestSSEHeadersToCopyRequest stages the destination's SSE setup on the +// (cloned) request so that putToFiler's existing handleAllSSEEncryption picks +// it up. The upload-entry markers (laid down at CreateMultipartUpload) bind +// every part of the upload to the same key+baseIV, matching PutObjectPart. +func (s3a *S3ApiServer) applyDestSSEHeadersToCopyRequest( + r *http.Request, uploadEntry *filer_pb.Entry, uploadID string, +) error { + if uploadEntry == nil || uploadEntry.Extended == nil { + return nil + } + + if keyIDBytes, hasKMS := uploadEntry.Extended[s3_constants.SeaweedFSSSEKMSKeyID]; hasKMS { + // Mirror the SSE-KMS branch of PutObjectPartHandler: stage + // X-Amz-Server-Side-Encryption=aws:kms plus the key ID, encryption + // context, bucket-key flag and base IV onto the request. + keyID := string(keyIDBytes) + + bucketKeyEnabled := false + if v, ok := uploadEntry.Extended[s3_constants.SeaweedFSSSEKMSBucketKeyEnabled]; ok && string(v) == "true" { + bucketKeyEnabled = true + } + + var encryptionContext map[string]string + if cb, ok := uploadEntry.Extended[s3_constants.SeaweedFSSSEKMSEncryptionContext]; ok { + if err := json.Unmarshal(cb, &encryptionContext); err != nil { + glog.Errorf("UploadPartCopy: failed to parse SSE-KMS context for upload %s: %v", uploadID, err) + encryptionContext = nil + } + } + if len(encryptionContext) == 0 { + // Bucket and object are populated on the cloned request; reuse + // the same builder PutObjectPartHandler does. + bucket, object := s3_constants.GetBucketAndObject(r) + encryptionContext = BuildEncryptionContext(bucket, object, bucketKeyEnabled) + } + + var baseIV []byte + if ivBytes, ok := uploadEntry.Extended[s3_constants.SeaweedFSSSEKMSBaseIV]; ok { + decoded, decErr := base64.StdEncoding.DecodeString(string(ivBytes)) + if decErr != nil || len(decoded) != s3_constants.AESBlockSize { + return fmt.Errorf("invalid SSE-KMS base IV on upload %s", uploadID) + } + baseIV = decoded + } else { + return fmt.Errorf("no SSE-KMS base IV on upload %s", uploadID) + } + + r.Header.Set(s3_constants.AmzServerSideEncryption, "aws:kms") + r.Header.Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, keyID) + if bucketKeyEnabled { + r.Header.Set(s3_constants.AmzServerSideEncryptionBucketKeyEnabled, "true") + } + if len(encryptionContext) > 0 { + if cj, err := json.Marshal(encryptionContext); err == nil { + r.Header.Set(s3_constants.AmzServerSideEncryptionContext, base64.StdEncoding.EncodeToString(cj)) + } + } + r.Header.Set(s3_constants.SeaweedFSSSEKMSBaseIVHeader, base64.StdEncoding.EncodeToString(baseIV)) + return nil + } + + // SSE-S3 path: reuse the existing PutObjectPart helper unchanged. It is + // pure header manipulation on r and does not touch S3ApiServer state. + return s3a.handleSSES3MultipartHeaders(r, uploadEntry, uploadID) +} + +// fakeContentRequest builds a minimal request representing "PUT this body" for +// the multipart-part write path used by UploadPartCopy. It clones the original +// request's headers (so things like AmzAccountId carry over) and clears the +// copy-only headers; SSE setup is added later by applyDestSSEHeadersToCopyRequest. +func fakeContentRequest(orig *http.Request, body io.ReadCloser, contentLength int64) *http.Request { + cloned := orig.Clone(orig.Context()) + cloned.Body = body + cloned.ContentLength = contentLength + if cloned.Header == nil { + cloned.Header = http.Header{} + } else { + cloned.Header = cloned.Header.Clone() + } + cloned.Header.Set("Content-Length", strconv.FormatInt(contentLength, 10)) + cloned.Header.Del("X-Amz-Copy-Source") + cloned.Header.Del("X-Amz-Copy-Source-Range") + cloned.Header.Del("X-Amz-Metadata-Directive") + cloned.Header.Del("X-Amz-Tagging-Directive") + // Content-Md5 cannot be reproduced from the source plaintext without + // streaming it once first; clear it so putToFiler doesn't validate. + cloned.Header.Del("Content-Md5") + return cloned +} + +// copyObjectPartViaReencryption implements the slow path of UploadPartCopy when +// either the source object is SSE-encrypted or the destination multipart upload +// is configured for SSE encryption. It: +// +// 1. Opens a plaintext reader of the source range (decrypting if needed). +// 2. Stages the destination's SSE-S3 / SSE-KMS multipart headers on a cloned +// request so handleAllSSEEncryption (called from putToFiler) routes the +// body through the matching multipart-encryption helper. +// 3. Calls putToFiler with the plaintext reader, which encrypts using the +// destination upload session's key+baseIV (consistent with PutObjectPart), +// auto-chunks, and writes the part entry with proper per-chunk SSE metadata. +// +// Without this path, copyChunksForRange's raw byte copy leaves destination +// chunks SseType=NONE; completedMultipartChunk then "backfills" SSE-S3 metadata +// with destination-baseIV-derived IVs, but the bytes on disk were encrypted +// with the source's key — yielding deterministic byte corruption on GET (#8908). +func (s3a *S3ApiServer) copyObjectPartViaReencryption( + r *http.Request, + srcEntry *filer_pb.Entry, + startOffset, endOffset int64, + dstBucket, uploadID string, + partID int, + uploadEntry *filer_pb.Entry, +) (etag string, sseMetadata SSEResponseMetadata, errCode s3err.ErrorCode) { + if endOffset < startOffset { + tag, code := s3a.writeEmptyCopyPart(dstBucket, uploadID, partID) + return tag, SSEResponseMetadata{}, code + } + sliceLen := endOffset - startOffset + 1 + + srcReader, err := s3a.openSourcePlaintextReader(r.Context(), srcEntry, startOffset, endOffset) + if err != nil { + glog.Errorf("UploadPartCopy: open source plaintext reader: %v", err) + // Distinguish "we will not handle this shape" (501) from "the server + // failed" (500). SSE-KMS / SSE-C source support in this slow path is + // staged work; the explicit error lets clients see it as a feature + // gap rather than a server fault. + if errors.Is(err, errCopySourceSSEUnsupported) { + return "", SSEResponseMetadata{}, s3err.ErrNotImplemented + } + return "", SSEResponseMetadata{}, s3err.ErrInternalError + } + defer srcReader.Close() + + cloned := fakeContentRequest(r, srcReader, sliceLen) + if err := s3a.applyDestSSEHeadersToCopyRequest(cloned, uploadEntry, uploadID); err != nil { + glog.Errorf("UploadPartCopy: apply destination SSE headers: %v", err) + return "", SSEResponseMetadata{}, s3err.ErrInternalError + } + + // Surface putToFiler's SSE response metadata to the caller so the handler + // can mirror PutObjectPart's behavior of writing + // x-amz-server-side-encryption / x-amz-server-side-encryption-aws-kms-key-id + // on the UploadPartCopy response. Without this, clients have no way to + // see that the destination was encrypted. + filePath := s3a.genPartUploadPath(dstBucket, uploadID, partID) + tag, code, putSSE := s3a.putToFiler(cloned, filePath, srcReader, dstBucket, "", partID, nil) + if code != s3err.ErrNone { + return "", SSEResponseMetadata{}, code + } + return tag, putSSE, s3err.ErrNone +} + +// writeEmptyCopyPart writes a 0-byte part entry for an empty UploadPartCopy +// range, mirroring the legacy fast path's handling of endOffset < startOffset. +func (s3a *S3ApiServer) writeEmptyCopyPart(dstBucket, uploadID string, partID int) (string, s3err.ErrorCode) { + uploadDir := s3a.genUploadsFolder(dstBucket) + "/" + uploadID + partName := fmt.Sprintf("%04d_%s.part", partID, "copy") + if exists, _ := s3a.exists(uploadDir, partName, false); exists { + if err := s3a.rm(uploadDir, partName, false, false); err != nil { + return "", s3err.ErrInternalError + } + } + if err := s3a.mkFile(uploadDir, partName, nil, func(e *filer_pb.Entry) { + if e.Attributes == nil { + e.Attributes = &filer_pb.FuseAttributes{} + } + e.Attributes.FileSize = 0 + }); err != nil { + return "", s3err.ErrInternalError + } + const emptyMD5Hex = "d41d8cd98f00b204e9800998ecf8427e" + return emptyMD5Hex, s3err.ErrNone +}