fix(s3api): re-encrypt UploadPartCopy bytes for the destination's SSE config (#8908) (#9280)

* fix(s3api): re-encrypt UploadPartCopy bytes for the destination's SSE config (#8908)

The remaining failure mode in #8908 was that Docker Registry's blob
finalization (server-side Move via UploadPartCopy) silently corrupted
SSE-S3 multipart objects. It reproduces with `aws s3api upload-part-copy`
under bucket-default SSE-S3: the GET on the completed object returns
deterministic wrong bytes (correct length, same wrong SHA-256 across
runs). The metadata is mathematically self-consistent — every chunk's
stored IV equals `calculateIVWithOffset(baseIV_dst, partLocalOffset)` —
but the bytes on disk were encrypted with the SOURCE upload's key+baseIV.

Root cause:

  - `copyChunksForRange` (and `createDestinationChunk`) constructs new
    chunks for UploadPartCopy without copying `SseType` / `SseMetadata`,
    so destination chunks are written with `SseType=NONE`.

  - At completion, `completedMultipartChunk` (PR #9224's NONE→SSE_S3
    backfill, intended to recover from a different missing-metadata
    bug) sees those NONE chunks under an SSE-S3 multipart upload and
    backfills SSE-S3 metadata derived from the destination upload's
    baseIV. The chunk metadata is now internally consistent and the
    GET path applies decryption — but the bytes on disk are encrypted
    with the source upload's key, not the destination's. Decryption
    produces deterministic garbage. Docker Registry pulls then fail
    with "Digest did not match".

Fix: when either the source object or the destination multipart upload
has any SSE configured, take a slow-path UploadPartCopy that
(1) opens a plaintext reader of the source range, decrypting with the
source's per-chunk SSE-S3 metadata if needed via a reused
`buildMultipartSSES3Reader`, and
(2) feeds that plaintext through `putToFiler`'s existing encryption
pipeline by staging the destination upload entry's SSE-S3/SSE-KMS
headers on a cloned request. Encryption then matches PutObjectPart's
contract: every part starts a fresh CTR stream from counter 0 with
`baseIV_dst`, and each internal chunk's metadata records
`calculateIVWithOffset(baseIV_dst, chunk.partLocalOffset)`.

The `non-SSE → non-SSE` case still takes the existing fast raw-byte
copy path — bytes on disk are plaintext on both sides, so chunk-level
metadata is irrelevant.

Cross-encryption from SSE-KMS / SSE-C sources is left as TODO — the
new path returns an explicit error rather than the previous silent
corruption. SSE-S3 (the user-reported case) round-trips correctly.

Tests:

  - test/s3/sse/s3_sse_uploadpartcopy_integration_test.go pins three
    UploadPartCopy shapes against bucket-default SSE-S3:
    * Docker-Registry-shape 32MB+tail (the user's exact 5-chunk /
      2-part metadata layout)
    * single full-object UploadPartCopy
    * many small range copies
    Each round-trips SHA-256.

  - test/s3/sse/s3_sse_concurrent_repro_test.go covers the parallel
    multipart-upload shape from the user report (5 blobs in parallel,
    full GET and chunked range GET both hash-checked) — pre-existing
    coverage; added here as a regression sentinel.

* test(s3-sse): rename UploadPartCopy regression test so CI matches it

The CI workflow .github/workflows/s3-sse-tests.yml dispatches on the
TEST_PATTERN ".*Multipart.*Integration" — i.e. the test name must contain
both "Multipart" and "Integration" for CI to run it. The previous name
TestSSES3UploadPartCopyIntegration had only "Integration"; "UploadPart"
isn't "Multipart". Rename to TestSSES3MultipartUploadPartCopyIntegration
so the regression test actually runs in CI rather than only locally.
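
The naming constraint can be checked directly with Go's `regexp` package. This sketch assumes the CI pattern is ultimately handed to `go test -run`, which treats it as an unanchored regular expression; `matchesCI` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"regexp"
)

// matchesCI reports whether a test name would be selected by the pattern.
func matchesCI(pattern, name string) bool {
	return regexp.MustCompile(pattern).MatchString(name)
}

func main() {
	const pattern = ".*Multipart.*Integration"
	for _, name := range []string{
		"TestSSES3UploadPartCopyIntegration",          // old name
		"TestSSES3MultipartUploadPartCopyIntegration", // renamed
	} {
		fmt.Printf("%-45s %v\n", name, matchesCI(pattern, name))
	}
}
```

The old name reports `false` and the renamed test reports `true`.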

* fix(s3api): map unsupported UploadPartCopy SSE source to 501, not 500 (review feedback on #9280)

openSourcePlaintextReader explicitly rejects SSE-KMS and SSE-C sources
(SSE-S3 is the only one wired up in this slow path so far). Previously
the caller blanket-mapped that to ErrInternalError, which collapses
"this shape isn't implemented yet" into the same 500 response a
real server failure would produce, so clients could not tell whether
they hit a feature gap or a bug.

Introduce a sentinel errCopySourceSSEUnsupported and have
copyObjectPartViaReencryption errors.Is-check it; on match, return
ErrNotImplemented (501) instead of ErrInternalError (500). Other
failures still map to 500.
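
The sentinel-plus-`errors.Is` pattern looks roughly like the following. It is a standalone sketch: `errSourceSSEUnsupported`, `openSource`, and `statusFor` are hypothetical names that use plain HTTP status codes in place of the s3err codes.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// errSourceSSEUnsupported plays the role of errCopySourceSSEUnsupported.
var errSourceSSEUnsupported = errors.New("source SSE type not yet supported")

// openSource (hypothetical) wraps the sentinel for unimplemented source types
// and returns an unrelated error for a simulated server failure.
func openSource(sseType string) error {
	switch sseType {
	case "SSE-KMS", "SSE-C":
		return fmt.Errorf("%w: UploadPartCopy from %s source", errSourceSSEUnsupported, sseType)
	case "broken":
		return errors.New("volume read failed")
	}
	return nil
}

// statusFor maps the sentinel to 501 and every other failure to 500.
func statusFor(err error) int {
	switch {
	case err == nil:
		return http.StatusOK
	case errors.Is(err, errSourceSSEUnsupported):
		return http.StatusNotImplemented
	default:
		return http.StatusInternalServerError
	}
}

func main() {
	for _, t := range []string{"SSE-S3", "SSE-KMS", "SSE-C", "broken"} {
		fmt.Printf("%-8s -> %d\n", t, statusFor(openSource(t)))
	}
}
```

Wrapping with `%w` keeps the human-readable context in the log line while still letting the caller branch on the sentinel's identity.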

Found by coderabbitai review on PR #9280.

* fix(s3api): UploadPartCopy must fail with NoSuchUpload when upload entry is missing (review feedback on #9280)

CopyObjectPartHandler's earlier checkUploadId call only verifies that
the uploadID's hash prefix matches dstObject; it does not prove the
upload directory exists in the filer. The previous logic silently
swallowed filer_pb.ErrNotFound from getEntry(uploadDir) and fell
through with uploadEntry=nil, which then skipped the destination
SSE check and could route a plain-source copy through the raw-byte
fast path even though the destination's encryption state is unknown.

Treat ErrNotFound as ErrNoSuchUpload so the client sees the right
status, matching the AWS S3 contract for UploadPartCopy on a
non-existent upload.

Found by coderabbitai review on PR #9280.

* feat(s3api): set SSE response headers on UploadPartCopy slow path (review feedback on #9280)

PutObjectPartHandler writes x-amz-server-side-encryption (and the KMS
key-id header for SSE-KMS) on every successful part response so clients
can confirm the destination's encryption state. The new UploadPartCopy
slow path was missing this — it returned only the ETag in the response
body and no SSE response headers.

Plumb putToFiler's SSEResponseMetadata back through
copyObjectPartViaReencryption to the handler, then call
setSSEResponseHeaders before writing the XML response, matching the
PutObjectPart contract.

Found by gemini-code-assist review on PR #9280.

* fix(s3api): map transient filer errors on UploadPartCopy upload-entry fetch to 503 (review feedback on #9280)

Previously, every non-ErrNotFound error from getEntry(uploadDir, uploadID)
returned 500 InternalError, which most SDKs treat as fatal, even
though a transient filer outage (gRPC Unavailable, leader election in
flight, deadline exceeded) is exactly the kind of failure SDK retry
logic is supposed to recover from.

Add an isTransientFilerError helper that recognises:
  - context.DeadlineExceeded / context.Canceled
  - gRPC codes.Unavailable, DeadlineExceeded, ResourceExhausted, Aborted

When the upload-entry fetch fails for one of those reasons, return
503 ServiceUnavailable so the client retries; everything else still
maps to 500. Log line now also carries dstObject (in addition to
dstBucket and uploadID) to make incident triage easier.

Found by gemini-code-assist review on PR #9280.
This commit is contained in:
Chris Lu
2026-04-29 09:46:44 -07:00
committed by GitHub
parent e82789ea4b
commit 82cf60a44f
4 changed files with 980 additions and 0 deletions


@@ -0,0 +1,310 @@
package sse_test
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"sync"
"testing"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/stretchr/testify/require"
)
// TestSSES3ConcurrentMultipartDigestIntegration mirrors the production
// scenario reported in issue #8908: Docker Registry pushes several large
// container images in parallel, each push being itself a multipart upload of
// many ~5MB parts under bucket-default SSE-S3. Registry pulls fail with
// "Digest did not match" because the SHA-256 of the streamed GET body does
// not match the SHA-256 of what was uploaded.
//
// Coverage that the existing TestSSEMultipartManyChunksIntegration does NOT
// have:
// - bucket-default SSE-S3 (not explicit per-request headers)
// - multiple objects uploaded concurrently
// - parts within each object uploaded concurrently
// - both full-body GET (Docker Registry) and chunked range GETs (kubelet/CRI)
//
// If the SHA over the streamed body ever differs from the SHA over the
// uploaded bytes, the test fails with the exact bucket/key, expected SHA,
// and actual SHA — the same shape Docker Registry surfaces on pull.
//
// The function name starts with "TestSSE", contains "Multipart", and ends in
// "Integration", so it is matched by both the existing `TestSSE.*Integration`
// pattern in test/s3/sse/Makefile and the `.*Multipart.*Integration` pattern
// in .github/workflows/s3-sse-tests.yml; the regression coverage therefore
// runs automatically in CI.
func TestSSES3ConcurrentMultipartDigestIntegration(t *testing.T) {
ctx := context.Background()
client, err := createS3Client(ctx, defaultConfig)
require.NoError(t, err, "Failed to create S3 client")
bucketName, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"sse-s3-concurrent-")
require.NoError(t, err, "Failed to create test bucket")
defer cleanupTestBucket(ctx, client, bucketName)
_, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{
Bucket: aws.String(bucketName),
ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{
Rules: []types.ServerSideEncryptionRule{
{
ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{
SSEAlgorithm: types.ServerSideEncryptionAes256,
},
},
},
},
})
require.NoError(t, err, "Failed to set bucket default SSE-S3 encryption")
const (
numObjects = 5 // mirrors "5 images at a time"
partsPerObject = 12 // 12 parts × 5MB = 60MB per blob; total 5×60=300MB
partSize = 5 * 1024 * 1024 // S3 minimum part size
uploadParallel = 8 // mirror typical S3 SDK transfer manager concurrency
iterations = 2 // run twice to flush out flakiness without bloating CI
)
type blob struct {
key string
parts [][]byte
full []byte
fullSHA [32]byte
}
makeBlobs := func(iter int) []blob {
blobs := make([]blob, numObjects)
for i := range blobs {
parts := make([][]byte, partsPerObject)
for j := range parts {
parts[j] = generateTestData(partSize)
}
full := bytes.Join(parts, nil)
blobs[i] = blob{
key: fmt.Sprintf("iter%02d-blob-%02d", iter, i),
parts: parts,
full: full,
fullSHA: sha256.Sum256(full),
}
}
return blobs
}
uploadOne := func(b blob) error {
createResp, err := client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
Bucket: aws.String(bucketName),
Key: aws.String(b.key),
})
if err != nil {
return fmt.Errorf("create multipart for %s: %w", b.key, err)
}
uploadID := aws.ToString(createResp.UploadId)
completedParts := make([]types.CompletedPart, partsPerObject)
var (
wg sync.WaitGroup
partErr error
partMu sync.Mutex
)
sem := make(chan struct{}, uploadParallel)
for i := 0; i < partsPerObject; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
sem <- struct{}{}
defer func() { <-sem }()
resp, err := client.UploadPart(ctx, &s3.UploadPartInput{
Bucket: aws.String(bucketName),
Key: aws.String(b.key),
PartNumber: aws.Int32(int32(i + 1)),
UploadId: aws.String(uploadID),
Body: bytes.NewReader(b.parts[i]),
})
if err != nil {
partMu.Lock()
if partErr == nil {
partErr = fmt.Errorf("upload part %d of %s: %w", i+1, b.key, err)
}
partMu.Unlock()
return
}
completedParts[i] = types.CompletedPart{
ETag: resp.ETag,
PartNumber: aws.Int32(int32(i + 1)),
}
}(i)
}
wg.Wait()
if partErr != nil {
_, _ = client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
Bucket: aws.String(bucketName), Key: aws.String(b.key), UploadId: aws.String(uploadID),
})
return partErr
}
_, err = client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
Bucket: aws.String(bucketName),
Key: aws.String(b.key),
UploadId: aws.String(uploadID),
MultipartUpload: &types.CompletedMultipartUpload{Parts: completedParts},
})
if err != nil {
return fmt.Errorf("complete multipart for %s: %w", b.key, err)
}
return nil
}
type result struct {
key string
expected string
actual string
gotSize int64
wantSize int64
readErr error
}
verifyFullGET := func(b blob) result {
resp, err := client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(b.key),
})
if err != nil {
return result{key: "FULL " + b.key, readErr: fmt.Errorf("GetObject: %w", err)}
}
defer resp.Body.Close()
h := sha256.New()
n, copyErr := io.Copy(h, resp.Body)
var actual [32]byte
copy(actual[:], h.Sum(nil))
return result{
key: "FULL " + b.key,
expected: hex.EncodeToString(b.fullSHA[:]),
actual: hex.EncodeToString(actual[:]),
gotSize: n,
wantSize: int64(len(b.full)),
readErr: copyErr,
}
}
verifyRangeGET := func(b blob) result {
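// 1MB windows, each fetched as a separate range GET. Parts are 5MB, below
// the 8MB internal chunk size (chunkSize in putToFiler), and the windows
// are 1MB-aligned, so they start and end on part boundaries rather than
// straddling them. Each request still makes the read path seek to a
// different offset inside an SSE-S3 part, the historical danger zone for
// SSE-S3 multipart range reads.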
const window = 1024 * 1024
h := sha256.New()
var totalN int64
objectSize := int64(len(b.full))
for offset := int64(0); offset < objectSize; offset += window {
end := offset + window - 1
if end >= objectSize {
end = objectSize - 1
}
resp, err := client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(b.key),
Range: aws.String(fmt.Sprintf("bytes=%d-%d", offset, end)),
})
if err != nil {
return result{key: "RANGE " + b.key, readErr: fmt.Errorf("Range GetObject [%d-%d]: %w", offset, end, err)}
}
n, copyErr := io.Copy(h, resp.Body)
resp.Body.Close()
totalN += n
if copyErr != nil {
return result{key: "RANGE " + b.key, readErr: fmt.Errorf("Range copy [%d-%d]: %w", offset, end, copyErr)}
}
}
var actual [32]byte
copy(actual[:], h.Sum(nil))
return result{
key: "RANGE " + b.key,
expected: hex.EncodeToString(b.fullSHA[:]),
actual: hex.EncodeToString(actual[:]),
gotSize: totalN,
wantSize: objectSize,
}
}
var allFailures []string
for iter := 0; iter < iterations; iter++ {
blobs := makeBlobs(iter)
// Upload all N blobs in parallel.
{
var (
wg sync.WaitGroup
errMu sync.Mutex
errors []error
)
for i := range blobs {
wg.Add(1)
go func(i int) {
defer wg.Done()
if err := uploadOne(blobs[i]); err != nil {
errMu.Lock()
errors = append(errors, err)
errMu.Unlock()
}
}(i)
}
wg.Wait()
require.Empty(t, errors, "iter %d: concurrent multipart uploads must succeed", iter)
}
// Two passes (full body + range), all blobs verified concurrently.
results := make([]result, 0, 2*numObjects)
var resultMu sync.Mutex
var wg sync.WaitGroup
for i := range blobs {
wg.Add(2)
go func(b blob) {
defer wg.Done()
r := verifyFullGET(b)
resultMu.Lock()
results = append(results, r)
resultMu.Unlock()
}(blobs[i])
go func(b blob) {
defer wg.Done()
r := verifyRangeGET(b)
resultMu.Lock()
results = append(results, r)
resultMu.Unlock()
}(blobs[i])
}
wg.Wait()
for _, r := range results {
if r.readErr != nil {
allFailures = append(allFailures, fmt.Sprintf("[iter %d %s] read error: %v (got %d / want %d bytes)",
iter, r.key, r.readErr, r.gotSize, r.wantSize))
continue
}
if r.expected != r.actual {
allFailures = append(allFailures, fmt.Sprintf(
"[iter %d %s] DIGEST MISMATCH: expected sha256:%s, got sha256:%s (got %d / want %d bytes)",
iter, r.key, r.expected, r.actual, r.gotSize, r.wantSize))
}
}
}
if len(allFailures) > 0 {
t.Fatalf("%d concurrent SSE-S3 multipart blob digest mismatches across %d iterations:\n %s",
len(allFailures), iterations, joinFailures(allFailures))
}
}
func joinFailures(ss []string) string {
out := ""
for i, s := range ss {
if i > 0 {
out += "\n "
}
out += s
}
return out
}


@@ -0,0 +1,190 @@
package sse_test
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"strings"
"testing"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/stretchr/testify/require"
)
// TestSSES3MultipartUploadPartCopyIntegration pins the fix for issue #8908.
//
// Docker Registry's S3 storage driver finalizes blob uploads via a server-side
// "Move" pattern: a streaming PUT/multipart upload to a temporary key, then
// CreateMultipartUpload + UploadPartCopy(s) + CompleteMultipartUpload to put
// the bytes at the final blob path. Under bucket-default SSE-S3, every push
// goes through this UploadPartCopy step.
//
// Before the fix, copyChunksForRange did a raw byte copy that left the
// destination's part chunks SseType=NONE. Then completedMultipartChunk
// (PR #9224) saw NONE chunks in an SSE-S3 multipart upload and "backfilled"
// SSE-S3 metadata with IVs derived from the destination upload's baseIV. But
// the bytes on disk had been encrypted with the SOURCE upload's key+baseIV,
// so the read path decrypted with the wrong IV — yielding deterministic byte
// corruption on GET (the "Digest did not match" symptom kubelet surfaces).
//
// This test reproduces the exact shape: a 39MB plaintext source (single
// PutObject — auto-chunked into multiple internal SSE-S3 chunks on disk
// because of bucket-default SSE-S3), then a fresh multipart upload at a new
// destination key with two UploadPartCopy parts (32MB + 7MB) and Complete.
// The full GET must SHA back to what was uploaded.
//
// The function name contains both "Multipart" and "Integration" so it is matched
// by the `.*Multipart.*Integration` pattern in .github/workflows/s3-sse-tests.yml
// and the `TestSSE.*Integration` pattern in test/s3/sse/Makefile, ensuring this
// regression coverage runs in CI.
func TestSSES3MultipartUploadPartCopyIntegration(t *testing.T) {
ctx := context.Background()
client, err := createS3Client(ctx, defaultConfig)
require.NoError(t, err, "Failed to create S3 client")
bucketName, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"sse-s3-uploadpartcopy-")
require.NoError(t, err, "Failed to create test bucket")
defer cleanupTestBucket(ctx, client, bucketName)
// Bucket-default SSE-S3 — same setup Docker Registry uses.
_, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{
Bucket: aws.String(bucketName),
ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{
Rules: []types.ServerSideEncryptionRule{
{
ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{
SSEAlgorithm: types.ServerSideEncryptionAes256,
},
},
},
},
})
require.NoError(t, err, "Failed to set bucket default SSE-S3")
// Source: 39MB single PutObject (auto-chunked internally into 5 SSE-S3 chunks).
const sourceSize = 39 * 1024 * 1024
sourceData := generateTestData(sourceSize)
expectedSHA := sha256.Sum256(sourceData)
_, err = client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String("source-blob"),
Body: bytes.NewReader(sourceData),
})
require.NoError(t, err, "Failed to upload source object")
// Sanity check: the source itself must round-trip correctly.
{
resp, err := client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String("source-blob"),
})
require.NoError(t, err, "Failed to GET source")
got, err := io.ReadAll(resp.Body)
resp.Body.Close()
require.NoError(t, err)
require.Equal(t, expectedSHA, sha256.Sum256(got), "source object must round-trip")
}
cases := []struct {
name string
// part definitions: each entry is (start, end) byte range to copy.
parts [][2]int64
}{
{
// Docker Registry's typical Move shape for blobs around 40MB:
// one 32MB part + one tail part. This is exactly the metadata
// shape the user reported (5 dst chunks across 2 multipart parts).
name: "DockerRegistry_32MB_Plus_Tail",
parts: [][2]int64{
{0, 32*1024*1024 - 1},
{32 * 1024 * 1024, sourceSize - 1},
},
},
{
// Single full-object UploadPartCopy.
name: "Single_Full_Object_Copy",
parts: [][2]int64{
{0, sourceSize - 1},
},
},
{
// Many small range-copies — exercises the per-part-local-offset
// IV math under varied chunk-overlap shapes.
name: "Many_5MB_Ranges",
parts: [][2]int64{
{0, 5*1024*1024 - 1},
{5 * 1024 * 1024, 10*1024*1024 - 1},
{10 * 1024 * 1024, 15*1024*1024 - 1},
{15 * 1024 * 1024, 20*1024*1024 - 1},
{20 * 1024 * 1024, sourceSize - 1},
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
dstKey := "dest-" + strings.ToLower(strings.ReplaceAll(tc.name, "_", "-"))
createResp, err := client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
Bucket: aws.String(bucketName),
Key: aws.String(dstKey),
})
require.NoError(t, err, "CreateMultipartUpload")
uploadID := aws.ToString(createResp.UploadId)
completedParts := make([]types.CompletedPart, 0, len(tc.parts))
for i, rng := range tc.parts {
partNumber := int32(i + 1)
resp, err := client.UploadPartCopy(ctx, &s3.UploadPartCopyInput{
Bucket: aws.String(bucketName),
Key: aws.String(dstKey),
PartNumber: aws.Int32(partNumber),
UploadId: aws.String(uploadID),
CopySource: aws.String(bucketName + "/source-blob"),
CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", rng[0], rng[1])),
})
require.NoErrorf(t, err, "UploadPartCopy part %d range=[%d,%d]", partNumber, rng[0], rng[1])
completedParts = append(completedParts, types.CompletedPart{
ETag: resp.CopyPartResult.ETag,
PartNumber: aws.Int32(partNumber),
})
}
_, err = client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
Bucket: aws.String(bucketName),
Key: aws.String(dstKey),
UploadId: aws.String(uploadID),
MultipartUpload: &types.CompletedMultipartUpload{Parts: completedParts},
})
require.NoError(t, err, "CompleteMultipartUpload")
// Verification: full GET, hashed as it streams (Docker Registry / kubelet shape).
resp, err := client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(dstKey),
})
require.NoError(t, err, "GetObject")
defer resp.Body.Close()
h := sha256.New()
n, err := io.Copy(h, resp.Body)
require.NoError(t, err, "stream GET body")
require.Equal(t, int64(sourceSize), n, "GET length")
var actual [32]byte
copy(actual[:], h.Sum(nil))
require.Equalf(t, expectedSHA, actual,
"UploadPartCopy SHA mismatch (#8908):\n expected sha256:%s\n got sha256:%s\n parts=%d size=%d",
hex.EncodeToString(expectedSHA[:]),
hex.EncodeToString(actual[:]),
len(tc.parts), n)
})
}
}


@@ -714,6 +714,61 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req
}
}
// Fetch the destination upload entry to determine whether the multipart
// upload was created with SSE configured. If either side has SSE, the
// fast raw-byte chunk copy below would leave destination chunks tagged
// inconsistently with the bytes on disk and trigger #8908's deterministic
// byte corruption on GET. Re-encrypt the source bytes in that case so
// destination chunks come out properly tagged.
//
// checkUploadId above only verifies that the uploadID's hash prefix
// matches dstObject; it does NOT prove the upload directory exists.
// Treat a missing upload entry as NoSuchUpload — falling through with
// uploadEntry=nil would silently skip the SSE check on the destination
// side and could send a plain-source copy through the raw-byte fast
// path even though the destination's encryption state is unknown.
uploadEntry, uploadEntryErr := s3a.getEntry(s3a.genUploadsFolder(dstBucket), uploadID)
if uploadEntryErr != nil {
if errors.Is(uploadEntryErr, filer_pb.ErrNotFound) {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
return
}
glog.Errorf("CopyObjectPartHandler: failed to fetch upload entry for %s/%s uploadID=%s: %v",
dstBucket, dstObject, uploadID, uploadEntryErr)
// Distinguish transient from permanent errors: gRPC Unavailable
// (filer briefly unreachable, leader election in flight, etc.) and
// DeadlineExceeded both indicate the client should retry rather than
// give up. Map them to 503 ServiceUnavailable; everything else stays
// as 500 InternalError.
if isTransientFilerError(uploadEntryErr) {
s3err.WriteErrorResponse(w, r, s3err.ErrServiceUnavailable)
return
}
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
if uploadEntryHasSSE(uploadEntry) || sourceEntryHasSSE(entry) {
etag, sseMetadata, errCode := s3a.copyObjectPartViaReencryption(r, entry, startOffset, endOffset, dstBucket, uploadID, partID, uploadEntry)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
setEtag(w, "\""+strings.Trim(etag, "\"")+"\"")
// Mirror PutObjectPartHandler: write x-amz-server-side-encryption /
// x-amz-server-side-encryption-aws-kms-key-id headers on the response
// so clients can see the destination's encryption state.
s3a.setSSEResponseHeaders(w, r, sseMetadata)
writeSuccessResponseXML(w, r, CopyPartResult{
ETag: etag,
LastModified: t,
})
return
}
// Fast path: neither source nor destination has SSE. Raw byte copy is
// safe, since the bytes on disk are plaintext on both sides.
// Create new entry for the part
// Calculate part size, avoiding underflow for invalid ranges
partSize := uint64(0)


@@ -0,0 +1,425 @@
package s3api
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"sort"
"strconv"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
)
// errCopySourceSSEUnsupported is returned by openSourcePlaintextReader when
// the source object's SSE type is not yet implemented in the UploadPartCopy
// slow path. Callers map it to a 501 NotImplemented S3 response so clients
// can distinguish "this shape is not implemented yet" from "the server failed".
var errCopySourceSSEUnsupported = errors.New("UploadPartCopy source SSE type not yet supported")
// isTransientFilerError reports whether an error talking to the filer is
// retryable from the client's perspective (filer briefly unreachable, leader
// election in flight, deadline exceeded, etc.). Such errors should map to a
// 503 ServiceUnavailable response so SDK retry logic engages, rather than a
// 500 InternalError which most clients treat as fatal.
func isTransientFilerError(err error) bool {
if err == nil {
return false
}
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
return true
}
if s, ok := status.FromError(err); ok {
switch s.Code() {
case codes.Unavailable, codes.DeadlineExceeded, codes.ResourceExhausted, codes.Aborted:
return true
}
}
return false
}
// uploadEntryHasSSE reports whether the multipart upload entry was created
// with any server-side encryption configured (SSE-S3 or SSE-KMS — explicit at
// CreateMultipartUpload time or applied as bucket default). It is used to
// decide whether UploadPartCopy must re-encrypt source bytes for the
// destination, rather than copying them as raw bytes (the fast path).
func uploadEntryHasSSE(uploadEntry *filer_pb.Entry) bool {
if uploadEntry == nil || uploadEntry.Extended == nil {
return false
}
if _, ok := uploadEntry.Extended[s3_constants.SeaweedFSSSEKMSKeyID]; ok {
return true
}
if v, ok := uploadEntry.Extended[s3_constants.SeaweedFSSSES3Encryption]; ok && string(v) == s3_constants.SSEAlgorithmAES256 {
return true
}
return false
}
// sourceEntryHasSSE reports whether the source object's chunks are SSE
// ciphertext on disk and therefore cannot be raw-copied — they must be
// decrypted on read.
func sourceEntryHasSSE(srcEntry *filer_pb.Entry) bool {
if srcEntry == nil {
return false
}
for _, c := range srcEntry.GetChunks() {
if c.GetSseType() != filer_pb.SSEType_NONE {
return true
}
}
if srcEntry.Extended != nil {
if _, ok := srcEntry.Extended[s3_constants.SeaweedFSSSES3Key]; ok {
return true
}
if _, ok := srcEntry.Extended[s3_constants.SeaweedFSSSEKMSKey]; ok {
return true
}
if _, ok := srcEntry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm]; ok {
return true
}
}
return false
}
// readCloserAdapter pairs an arbitrary io.Reader with an io.Closer so callers
// can release the original underlying source even when the inner Reader (e.g.
// cipher.StreamReader, io.LimitReader) does not implement io.Closer.
type readCloserAdapter struct {
io.Reader
closer io.Closer
}
func (r *readCloserAdapter) Close() error {
if r.closer == nil {
return nil
}
return r.closer.Close()
}
// openSourcePlaintextReader returns a reader yielding the source object's
// plaintext bytes for [startOffset, endOffset], applying any necessary SSE
// decryption based on the source entry's metadata.
//
// Used by CopyObjectPartHandler when source or destination is SSE-encrypted:
// the fast raw-chunk-copy path leaves destination chunks SseType=NONE and
// completedMultipartChunk's NONE→SSE_S3 backfill (PR #9224) then writes
// destination-baseIV-derived metadata onto bytes that were actually encrypted
// with the source's key — producing deterministic byte corruption on GET (#8908).
//
// Returns errCopySourceSSEUnsupported when the source's SSE type is not yet
// implemented in this slow path (SSE-KMS, SSE-C). Callers should map that
// sentinel to a 501 NotImplemented S3 response rather than collapsing it to
// 500 InternalError, so clients can distinguish "this shape is not
// implemented yet" from "the server failed".
func (s3a *S3ApiServer) openSourcePlaintextReader(
ctx context.Context,
srcEntry *filer_pb.Entry,
startOffset, endOffset int64,
) (io.ReadCloser, error) {
if srcEntry == nil {
return nil, fmt.Errorf("nil source entry")
}
if endOffset < startOffset {
return io.NopCloser(io.LimitReader(emptyReader{}, 0)), nil
}
sliceLen := endOffset - startOffset + 1
switch s3a.detectPrimarySSEType(srcEntry) {
case s3_constants.SSETypeS3:
return s3a.openSSES3SourcePlaintextReader(ctx, srcEntry, startOffset, sliceLen)
case s3_constants.SSETypeKMS:
return nil, fmt.Errorf("%w: UploadPartCopy from SSE-KMS source", errCopySourceSSEUnsupported)
case s3_constants.SSETypeC:
return nil, fmt.Errorf("%w: UploadPartCopy from SSE-C source", errCopySourceSSEUnsupported)
default:
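// Unencrypted source: stream raw bytes and apply range. Despite its
// name, getEncryptedStreamFromVolumes is used here only to stream the
// stored bytes, which are plaintext for an unencrypted source.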
raw, err := s3a.getEncryptedStreamFromVolumes(ctx, srcEntry)
if err != nil {
return nil, fmt.Errorf("open unencrypted source: %w", err)
}
return applyRange(raw, startOffset, sliceLen)
}
}
// openSSES3SourcePlaintextReader builds a decrypted reader for an SSE-S3
// source. It reuses buildMultipartSSES3Reader, which decrypts each chunk
// independently using its per-chunk metadata — correct for both multipart-SSE
// objects (multiple SSE-S3 chunks) and single-part SSE-S3 objects whose single
// chunk also carries per-chunk metadata after PR #9211.
//
// For older single-part SSE-S3 objects whose chunks lack per-chunk metadata,
// this falls back to the entry-level SSE-S3 key + the entry's stored IV,
// matching the read path's single-part fallback.
func (s3a *S3ApiServer) openSSES3SourcePlaintextReader(
ctx context.Context,
srcEntry *filer_pb.Entry,
startOffset, sliceLen int64,
) (io.ReadCloser, error) {
chunks := srcEntry.GetChunks()
hasPerChunkSSE := false
for _, c := range chunks {
if c.GetSseType() == filer_pb.SSEType_SSE_S3 && len(c.GetSseMetadata()) > 0 {
hasPerChunkSSE = true
break
}
}
if hasPerChunkSSE {
sortedChunks := make([]*filer_pb.FileChunk, len(chunks))
copy(sortedChunks, chunks)
sort.Slice(sortedChunks, func(i, j int) bool {
return sortedChunks[i].GetOffset() < sortedChunks[j].GetOffset()
})
decReader, err := buildMultipartSSES3Reader(
sortedChunks,
GetSSES3KeyManager(),
func(c *filer_pb.FileChunk) (io.ReadCloser, error) {
return s3a.createEncryptedChunkReader(ctx, c)
},
)
if err != nil {
return nil, fmt.Errorf("build SSE-S3 source reader: %w", err)
}
// buildMultipartSSES3Reader returns a *lazyMultipartChunkReader whose
// Close() releases the live chunk body. Use it as the closer.
var closer io.Closer
if rc, ok := decReader.(io.Closer); ok {
closer = rc
}
return applyRange(&readCloserAdapter{Reader: decReader, closer: closer}, startOffset, sliceLen)
}
// Legacy single-part fallback: entry-level SeaweedFSSSES3Key + entry IV.
keyData, ok := srcEntry.Extended[s3_constants.SeaweedFSSSES3Key]
if !ok || len(keyData) == 0 {
return nil, fmt.Errorf("SSE-S3 source has no per-chunk metadata and no entry-level SSE-S3 key")
}
keyManager := GetSSES3KeyManager()
sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager)
if err != nil {
return nil, fmt.Errorf("deserialize entry-level SSE-S3 key: %w", err)
}
iv, err := GetSSES3IV(srcEntry, sseS3Key, keyManager)
if err != nil {
return nil, fmt.Errorf("get SSE-S3 IV: %w", err)
}
encStream, err := s3a.getEncryptedStreamFromVolumes(ctx, srcEntry)
if err != nil {
return nil, fmt.Errorf("open ciphertext source: %w", err)
}
dec, err := CreateSSES3DecryptedReader(encStream, sseS3Key, iv)
if err != nil {
encStream.Close()
return nil, fmt.Errorf("create SSE-S3 decrypted reader: %w", err)
}
rc, ok := dec.(io.ReadCloser)
if !ok {
rc = &readCloserAdapter{Reader: dec, closer: encStream}
}
return applyRange(rc, startOffset, sliceLen)
}
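
// applyRange skips startOffset bytes from src and limits the result to
// sliceLen bytes. The skip reads and discards, so it costs O(startOffset).
// The returned ReadCloser closes the underlying source; on a skip error
// (including a source shorter than startOffset) src is closed before returning.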
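//
// A usage sketch, for illustration only (the in-memory source below is an
// assumption; callers in this file pass a decrypting reader instead):
//
//	src := io.NopCloser(strings.NewReader("0123456789"))
//	rc, err := applyRange(src, 2, 3) // skip "01", keep 3 bytes
//	if err != nil {
//		return err
//	}
//	defer rc.Close()
//	data, _ := io.ReadAll(rc) // data is "234"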
func applyRange(src io.ReadCloser, startOffset, sliceLen int64) (io.ReadCloser, error) {
if startOffset > 0 {
if _, err := io.CopyN(io.Discard, src, startOffset); err != nil {
src.Close()
return nil, fmt.Errorf("skip to range start %d: %w", startOffset, err)
}
}
// io.LimitReader yields no bytes for a non-positive limit, so clamping is
// enough to return an empty reader that still closes src.
if sliceLen < 0 {
sliceLen = 0
}
return &readCloserAdapter{Reader: io.LimitReader(src, sliceLen), closer: src}, nil
}

// emptyReader yields no bytes. Used for empty-range UploadPartCopy.
type emptyReader struct{}

func (emptyReader) Read([]byte) (int, error) { return 0, io.EOF }

// applyDestSSEHeadersToCopyRequest stages the destination's SSE setup on the
// (cloned) request so that putToFiler's existing handleAllSSEEncryption picks
// it up. The upload-entry markers (laid down at CreateMultipartUpload) bind
// every part of the upload to the same key+baseIV, matching PutObjectPart.
// A nil uploadEntry, or one with no extended attributes, is a no-op.
func (s3a *S3ApiServer) applyDestSSEHeadersToCopyRequest(
r *http.Request, uploadEntry *filer_pb.Entry, uploadID string,
) error {
if uploadEntry == nil || uploadEntry.Extended == nil {
return nil
}
if keyIDBytes, hasKMS := uploadEntry.Extended[s3_constants.SeaweedFSSSEKMSKeyID]; hasKMS {
// Mirror the SSE-KMS branch of PutObjectPartHandler: stage
// X-Amz-Server-Side-Encryption=aws:kms plus the key ID, encryption
// context, bucket-key flag and base IV onto the request.
keyID := string(keyIDBytes)
bucketKeyEnabled := false
if v, ok := uploadEntry.Extended[s3_constants.SeaweedFSSSEKMSBucketKeyEnabled]; ok && string(v) == "true" {
bucketKeyEnabled = true
}
var encryptionContext map[string]string
if cb, ok := uploadEntry.Extended[s3_constants.SeaweedFSSSEKMSEncryptionContext]; ok {
if err := json.Unmarshal(cb, &encryptionContext); err != nil {
glog.Errorf("UploadPartCopy: failed to parse SSE-KMS context for upload %s: %v", uploadID, err)
encryptionContext = nil
}
}
if len(encryptionContext) == 0 {
// Bucket and object are populated on the cloned request; reuse
// the same builder PutObjectPartHandler does.
bucket, object := s3_constants.GetBucketAndObject(r)
encryptionContext = BuildEncryptionContext(bucket, object, bucketKeyEnabled)
}
// Validate the base IV before touching any request headers.
ivBytes, ok := uploadEntry.Extended[s3_constants.SeaweedFSSSEKMSBaseIV]
if !ok {
return fmt.Errorf("no SSE-KMS base IV on upload %s", uploadID)
}
baseIV, decErr := base64.StdEncoding.DecodeString(string(ivBytes))
if decErr != nil || len(baseIV) != s3_constants.AESBlockSize {
return fmt.Errorf("invalid SSE-KMS base IV on upload %s", uploadID)
}
r.Header.Set(s3_constants.AmzServerSideEncryption, "aws:kms")
r.Header.Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, keyID)
if bucketKeyEnabled {
r.Header.Set(s3_constants.AmzServerSideEncryptionBucketKeyEnabled, "true")
}
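if len(encryptionContext) > 0 {
// Fail rather than silently drop the context: encrypting without it
// would not match what the upload entry recorded.
cj, err := json.Marshal(encryptionContext)
if err != nil {
return fmt.Errorf("marshal SSE-KMS encryption context for upload %s: %w", uploadID, err)
}
r.Header.Set(s3_constants.AmzServerSideEncryptionContext, base64.StdEncoding.EncodeToString(cj))
}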
r.Header.Set(s3_constants.SeaweedFSSSEKMSBaseIVHeader, base64.StdEncoding.EncodeToString(baseIV))
return nil
}
// SSE-S3 path: reuse the existing PutObjectPart helper unchanged. It is
// pure header manipulation on r and does not touch S3ApiServer state.
return s3a.handleSSES3MultipartHeaders(r, uploadEntry, uploadID)
}

// fakeContentRequest builds a minimal request representing "PUT this body" for
// the multipart-part write path used by UploadPartCopy. It clones the original
// request's headers (so values such as AmzAccountId carry over) and clears the
// copy-only headers; SSE setup is added later by applyDestSSEHeadersToCopyRequest.
func fakeContentRequest(orig *http.Request, body io.ReadCloser, contentLength int64) *http.Request {
cloned := orig.Clone(orig.Context())
cloned.Body = body
cloned.ContentLength = contentLength
// Request.Clone already deep-copies the header map, so only a nil map
// needs replacing before the Set/Del calls below.
if cloned.Header == nil {
cloned.Header = http.Header{}
}
cloned.Header.Set("Content-Length", strconv.FormatInt(contentLength, 10))
cloned.Header.Del("X-Amz-Copy-Source")
cloned.Header.Del("X-Amz-Copy-Source-Range")
cloned.Header.Del("X-Amz-Metadata-Directive")
cloned.Header.Del("X-Amz-Tagging-Directive")
// Content-Md5 cannot be reproduced from the source plaintext without
// streaming it once first; clear it so putToFiler doesn't validate.
cloned.Header.Del("Content-Md5")
return cloned
}

// copyObjectPartViaReencryption implements the slow path of UploadPartCopy when
// either the source object is SSE-encrypted or the destination multipart upload
// is configured for SSE encryption. It:
//
//  1. Opens a plaintext reader of the source range (decrypting if needed).
//  2. Stages the destination's SSE-S3 / SSE-KMS multipart headers on a cloned
//     request so handleAllSSEEncryption (called from putToFiler) routes the
//     body through the matching multipart-encryption helper.
//  3. Calls putToFiler with the plaintext reader, which encrypts using the
//     destination upload session's key+baseIV (consistent with PutObjectPart),
//     auto-chunks, and writes the part entry with proper per-chunk SSE metadata.
//
// Without this path, copyChunksForRange's raw byte copy leaves destination
// chunks SseType=NONE; completedMultipartChunk then "backfills" SSE-S3 metadata
// with destination-baseIV-derived IVs, but the bytes on disk were encrypted
// with the source's key, yielding deterministic byte corruption on GET (#8908).
func (s3a *S3ApiServer) copyObjectPartViaReencryption(
r *http.Request,
srcEntry *filer_pb.Entry,
startOffset, endOffset int64,
dstBucket, uploadID string,
partID int,
uploadEntry *filer_pb.Entry,
) (etag string, sseMetadata SSEResponseMetadata, errCode s3err.ErrorCode) {
if endOffset < startOffset {
tag, code := s3a.writeEmptyCopyPart(dstBucket, uploadID, partID)
return tag, SSEResponseMetadata{}, code
}
// Both offsets are inclusive, hence the +1.
sliceLen := endOffset - startOffset + 1
srcReader, err := s3a.openSourcePlaintextReader(r.Context(), srcEntry, startOffset, endOffset)
if err != nil {
glog.Errorf("UploadPartCopy: open source plaintext reader: %v", err)
// Distinguish "we will not handle this shape" (501) from "the server
// failed" (500). SSE-KMS / SSE-C source support in this slow path is
// staged work; the explicit error lets clients see it as a feature
// gap rather than a server fault.
if errors.Is(err, errCopySourceSSEUnsupported) {
return "", SSEResponseMetadata{}, s3err.ErrNotImplemented
}
return "", SSEResponseMetadata{}, s3err.ErrInternalError
}
defer srcReader.Close()
cloned := fakeContentRequest(r, srcReader, sliceLen)
if err := s3a.applyDestSSEHeadersToCopyRequest(cloned, uploadEntry, uploadID); err != nil {
glog.Errorf("UploadPartCopy: apply destination SSE headers: %v", err)
return "", SSEResponseMetadata{}, s3err.ErrInternalError
}
// Surface putToFiler's SSE response metadata to the caller so the handler
// can mirror PutObjectPart's behavior of writing
// x-amz-server-side-encryption / x-amz-server-side-encryption-aws-kms-key-id
// on the UploadPartCopy response. Without this, clients have no way to
// see that the destination was encrypted.
filePath := s3a.genPartUploadPath(dstBucket, uploadID, partID)
tag, code, putSSE := s3a.putToFiler(cloned, filePath, srcReader, dstBucket, "", partID, nil)
if code != s3err.ErrNone {
return "", SSEResponseMetadata{}, code
}
return tag, putSSE, s3err.ErrNone
}

// writeEmptyCopyPart writes a 0-byte part entry for an empty UploadPartCopy
// range, mirroring the legacy fast path's handling of endOffset < startOffset.
func (s3a *S3ApiServer) writeEmptyCopyPart(dstBucket, uploadID string, partID int) (string, s3err.ErrorCode) {
uploadDir := s3a.genUploadsFolder(dstBucket) + "/" + uploadID
partName := fmt.Sprintf("%04d_%s.part", partID, "copy")
if exists, _ := s3a.exists(uploadDir, partName, false); exists {
if err := s3a.rm(uploadDir, partName, false, false); err != nil {
return "", s3err.ErrInternalError
}
}
if err := s3a.mkFile(uploadDir, partName, nil, func(e *filer_pb.Entry) {
if e.Attributes == nil {
e.Attributes = &filer_pb.FuseAttributes{}
}
e.Attributes.FileSize = 0
}); err != nil {
return "", s3err.ErrInternalError
}
// MD5 of zero bytes of input, returned as the ETag of the empty part.
const emptyMD5Hex = "d41d8cd98f00b204e9800998ecf8427e"
return emptyMD5Hex, s3err.ErrNone
}