diff --git a/test/s3/sse/s3_sse_integration_test.go b/test/s3/sse/s3_sse_integration_test.go index 1a4576691..b458abcde 100644 --- a/test/s3/sse/s3_sse_integration_test.go +++ b/test/s3/sse/s3_sse_integration_test.go @@ -5,6 +5,7 @@ import ( "context" "crypto/md5" "crypto/rand" + "crypto/sha256" "encoding/base64" "fmt" "io" @@ -969,6 +970,87 @@ func uploadAndVerifyMultipartSSEObject(t *testing.T, ctx context.Context, client } } +// TestSSEMultipartManyChunksIntegration pins the end-to-end fix for issue +// #8908. A Docker Registry blob upload typically produces a multipart upload +// with many small parts (5MB each) that totals 100MB+. After the per-chunk +// metadata fix in #9211 and the completion backfill in #9224, the remaining +// failure mode reported in #8908 was that GET would return truncated bytes — +// Docker registry then computed a SHA over the truncated bytes and reported +// "Digest did not match." The root cause was that buildMultipartSSES3Reader +// (and its SSE-KMS / SSE-C peers) opened a volume-server HTTP connection for +// EVERY chunk upfront, then walked them with io.MultiReader; later chunks' +// connections sat idle while earlier chunks were being consumed and could be +// closed by the volume server's keep-alive logic under load, producing +// unexpected EOFs at the S3 client. +// +// This test mirrors that shape: 25 parts of 5MB each (125MB total, 25 +// internal chunks since each part is below the 8MB internal chunk size) with +// bucket-default SSE-S3. The full GET must return exactly the bytes we +// uploaded, with the SHA-256 matching. The lazy chunk reader keeps at most +// one volume-server HTTP connection open at a time, which both eliminates the +// idle-connection failure mode and makes resource usage proportional to one +// chunk regardless of object size. +// +// The function name ends in "Integration" so it is matched by the existing +// `.*Multipart.*Integration` pattern in .github/workflows/s3-sse-tests.yml +// (and the `TestSSE.*Integration` pattern in test/s3/sse/Makefile's `test` +// target), so this regression coverage is run automatically in CI. 
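+//
+// For orientation, the eager shape that caused the idle-connection failure
+// looked roughly like this (a schematic, not the deleted SeaweedFS code
+// verbatim; the lazy replacement appears later in this diff):
+//
+//	func eagerReader(chunks []*filer_pb.FileChunk, fetch func(*filer_pb.FileChunk) (io.ReadCloser, error)) (io.Reader, error) {
+//		readers := make([]io.Reader, 0, len(chunks))
+//		for _, c := range chunks {
+//			r, err := fetch(c) // chunk N's HTTP body opens here, long before anything reads it
+//			if err != nil {
+//				return nil, err
+//			}
+//			readers = append(readers, r)
+//		}
+//		return io.MultiReader(readers...), nil // tail readers sit open and idle until reached
+//	}
+//
+// Every body past the first stays open but unread until io.MultiReader
+// reaches it -- exactly the idle window in which the volume server's
+// keep-alive logic could close the connection under load.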
+func TestSSEMultipartManyChunksIntegration(t *testing.T) {
+	ctx := context.Background()
+	client, err := createS3Client(ctx, defaultConfig)
+	require.NoError(t, err, "Failed to create S3 client")
+
+	bucketName, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"sse-s3-many-chunks-")
+	require.NoError(t, err, "Failed to create test bucket")
+	defer cleanupTestBucket(ctx, client, bucketName)
+
+	_, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{
+		Bucket: aws.String(bucketName),
+		ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{
+			Rules: []types.ServerSideEncryptionRule{
+				{
+					ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{
+						SSEAlgorithm: types.ServerSideEncryptionAes256,
+					},
+				},
+			},
+		},
+	})
+	require.NoError(t, err, "Failed to set bucket default SSE-S3 encryption")
+
+	const numParts = 25
+	const partSize = 5 * 1024 * 1024 // S3 minimum part size
+	parts := make([][]byte, numParts)
+	for i := range parts {
+		parts[i] = generateTestData(partSize)
+	}
+	expected := bytes.Join(parts, nil)
+	expectedHash := sha256.Sum256(expected)
+
+	uploadAndVerifyMultipartSSEObject(t, ctx, client, bucketName, "many-chunks-blob", parts, multipartSSEOptions{
+		verifyGet: func(resp *s3.GetObjectOutput) {
+			assert.Equal(t, types.ServerSideEncryptionAes256, resp.ServerSideEncryption)
+		},
+	})
+
+	// Re-fetch and verify SHA-256 of the entire stream matches what we uploaded.
+	// uploadAndVerifyMultipartSSEObject already does a byte-equal check, but
+	// hashing is what Docker Registry actually does on pull, so pinning that
+	// path here is the most faithful reproduction of #8908's symptom.
+	getResp, err := client.GetObject(ctx, &s3.GetObjectInput{
+		Bucket: aws.String(bucketName),
+		Key:    aws.String("many-chunks-blob"),
+	})
+	require.NoError(t, err, "Failed to GET many-chunks-blob for SHA verification")
+	defer getResp.Body.Close()
+	h := sha256.New()
+	n, err := io.Copy(h, getResp.Body)
+	require.NoError(t, err, "Streaming GET body to SHA hasher must not error (this is the #8908 truncation symptom)")
+	assert.Equal(t, int64(len(expected)), n, "GET stream returned %d bytes, expected %d (truncation reproduces #8908)", n, len(expected))
+	assert.Equal(t, expectedHash[:], h.Sum(nil), "SHA-256 of GET stream must match SHA-256 of uploaded bytes (this is exactly the digest check Docker Registry does)")
+}
+
 // TestDebugSSEMultipart helps debug the multipart SSE-KMS data mismatch
 func TestDebugSSEMultipart(t *testing.T) {
 	ctx := context.Background()
diff --git a/test/s3/sse/s3_sse_range_coverage_test.go b/test/s3/sse/s3_sse_range_coverage_test.go
new file mode 100644
index 000000000..1e839d4c9
--- /dev/null
+++ b/test/s3/sse/s3_sse_range_coverage_test.go
@@ -0,0 +1,577 @@
+package sse_test
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"strings"
+	"testing"
+
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/service/s3"
+	"github.com/aws/aws-sdk-go-v2/service/s3/types"
+	smithyhttp "github.com/aws/smithy-go/transport/http"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+// internalChunkSize mirrors the constant in weed/s3api/s3api_object_handlers_put.go
+// (the size at which auto-chunking splits a single PUT or part body inside the
+// volume-server layer).
Range-read coverage that's interesting for SSE has to +// straddle this boundary, since each internal chunk is encrypted with its own +// adjusted IV (SSE-S3 / SSE-KMS) or its own random IV + PartOffset (SSE-C), +// and the read path has to stitch keystreams across chunks correctly. +const internalChunkSize = 8 * 1024 * 1024 + +// TestSSERangeReadIntegration is the canonical end-to-end coverage matrix +// for HTTP range GETs across SSE modes, object size classes, and range +// patterns. It supplements the per-SSE-mode TestSSExxxRangeRequests tests +// (which are scoped to small single-chunk objects, ≤1MB) by also exercising +// MEDIUM single-PUT objects that cross one internal 8MB chunk boundary AND +// LARGE multipart objects whose content spans many internal chunks. The +// many-chunk case is the path that broke in #8908 for full-object GETs; +// pinning range correctness here protects against any future regression in +// per-chunk IV / PartOffset plumbing for partial reads. +// +// The function name ends in "Integration" so it is matched by the existing +// `TestSSE.*Integration` pattern that the test/s3/sse Makefile and the +// .github/workflows/s3-sse-tests.yml CI flow use to discover SSE integration +// tests; both flows already start the server using s3-config-template.json, +// which configures the embedded `local` KMS provider with on-demand DEK +// creation, so the sse_kms subtests run end-to-end in CI. +// +// For ad-hoc local runs against a server without any KMS provider, the test +// probes once with a 1-byte SSE-KMS PUT and t.Skip's the sse_kms subtree +// with a clear message rather than producing a 5xx-storm in the logs. +func TestSSERangeReadIntegration(t *testing.T) { + ctx := context.Background() + client, err := createS3Client(ctx, defaultConfig) + require.NoError(t, err, "create S3 client") + + bucketName, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"range-matrix-") + require.NoError(t, err, "create test bucket") + // MUST be t.Cleanup, not defer: the mode/size subtests below call + // t.Parallel(), which pauses them and yields back to this function. If + // we used defer, the for loop would finish scheduling, the function + // would return, and defer would fire BEFORE any parallel subtest body + // has run -- deleting the bucket out from under them. t.Cleanup waits + // until the test AND all its subtests complete. + t.Cleanup(func() { cleanupTestBucket(ctx, client, bucketName) }) + + modes := []sseRangeMode{ + newRangeModeNone(), + newRangeModeSSEC(), + newRangeModeSSEKMS("test-range-coverage-key"), + newRangeModeSSES3(), + } + + // Size classes. Sizes are chosen to stress specific boundaries: + // small : single internal chunk, no boundary + // medium : one internal chunk boundary (8MB+arbitrary tail) + // large : multipart with parts > 8MB, so each part itself spans + // multiple internal chunks AND the object spans multiple + // parts -- this is the shape the #8908 fix targets. + sizes := []sseRangeSize{ + { + name: "small_256KB_single_chunk", + singlePutBytes: 256 * 1024, + }, + { + name: "medium_12MB_one_internal_boundary", + singlePutBytes: internalChunkSize + 4*1024*1024, + }, + { + name: "large_multipart_5x9MB_many_internal_boundaries", + // 5 parts of 9MB each: 45MB total. Every part exceeds the 8MB + // internal chunk size, so every part is split into 2 internal + // chunks (8MB + 1MB), giving ~10 internal chunks across the + // object. 
With AWS's 5MB minimum part size, this is the + // smallest realistic shape that exercises both inter-part + // stitching and intra-part chunk-boundary crossing. + multipartParts: []int{ + 9 * 1024 * 1024, + 9 * 1024 * 1024, + 9 * 1024 * 1024, + 9 * 1024 * 1024, + 9 * 1024 * 1024, + }, + }, + } + + // Each (mode, size) pair uploads an independent object key under the + // shared bucket and exercises range reads against it. The four modes + // have no shared state (each one carries its own SSE-C key, KMS keyID, + // or none); within a mode each size class also writes a unique key. + // That makes both levels safe to t.Parallel(), which substantially cuts + // CI wall time on the matrix (~45MB of data per mode). + for _, mode := range modes { + mode := mode + t.Run(mode.name(), func(t *testing.T) { + t.Parallel() + if reason := mode.probe(t, ctx, client, bucketName); reason != "" { + t.Skipf("%s unsupported in this test environment: %s", mode.name(), reason) + } + + for _, sz := range sizes { + sz := sz + t.Run(sz.name, func(t *testing.T) { + t.Parallel() + objectKey := fmt.Sprintf("%s/%s", mode.name(), sz.name) + var data []byte + if len(sz.multipartParts) > 0 { + parts := make([][]byte, len(sz.multipartParts)) + for i, n := range sz.multipartParts { + parts[i] = generateTestData(n) + } + mode.uploadMultipart(t, ctx, client, bucketName, objectKey, parts) + data = bytes.Join(parts, nil) + } else { + data = generateTestData(sz.singlePutBytes) + mode.uploadSingle(t, ctx, client, bucketName, objectKey, data) + } + + for _, rc := range rangeCasesFor(int64(len(data))) { + rc := rc + t.Run(rc.name, func(t *testing.T) { + verifyRangeRead(t, ctx, client, mode, bucketName, objectKey, data, rc) + }) + } + }) + } + }) + } +} + +// rangeCasesFor returns the set of range patterns to exercise on an object of +// the given total length. Some patterns are skipped automatically when the +// object is too small for them to be meaningful (e.g. the many-chunk-spanning +// case requires the object to actually span many internal chunks). +func rangeCasesFor(totalLen int64) []sseRangeCase { + cases := []sseRangeCase{ + { + name: "single_byte_at_zero", + rangeHeader: "bytes=0-0", + start: 0, + end: 0, + }, + { + name: "prefix_512_bytes", + rangeHeader: "bytes=0-511", + start: 0, + end: 511, + }, + { + name: "single_byte_at_last", + rangeHeader: fmt.Sprintf("bytes=%d-%d", totalLen-1, totalLen-1), + start: totalLen - 1, + end: totalLen - 1, + }, + { + name: "suffix_last_100_bytes", + rangeHeader: "bytes=-100", + start: totalLen - 100, + end: totalLen - 1, + }, + { + name: "open_ended_from_middle", + rangeHeader: fmt.Sprintf("bytes=%d-", totalLen/2), + start: totalLen / 2, + end: totalLen - 1, + }, + { + name: "whole_object_as_range", + rangeHeader: fmt.Sprintf("bytes=0-%d", totalLen-1), + start: 0, + end: totalLen - 1, + }, + } + + if totalLen >= 64 { + cases = append(cases, sseRangeCase{ + // AES block-boundary stress: a 17-byte range starting at byte 15 + // crosses the AES block boundary at byte 16. SSE-C historically + // has the most fragile offset arithmetic here, so this is worth + // pinning across all modes. + name: "mid_chunk_crosses_aes_block_boundary", + rangeHeader: "bytes=15-31", + start: 15, + end: 31, + }) + } + + if totalLen > internalChunkSize+128 { + // Range straddles one internal 8MB chunk boundary by 64 bytes on + // each side. 
Decryption has to fetch two distinct chunks and stitch
+		// the keystreams together correctly -- the path that exposed #8908's
+		// SSE-KMS double-IV bug (fixed in #9224 commit 4) and is the most
+		// sensitive single test for chunk-boundary stitching.
+		cases = append(cases, sseRangeCase{
+			name:        "mid_straddles_one_internal_boundary",
+			rangeHeader: fmt.Sprintf("bytes=%d-%d", internalChunkSize-64, internalChunkSize+63),
+			start:       internalChunkSize - 64,
+			end:         internalChunkSize + 63,
+		})
+	}
+
+	if totalLen > 3*internalChunkSize+128 {
+		// Range that spans more than three internal 8MB chunks. This is
+		// the regression path for #8908's read-side issue: the eager
+		// multipart reader opened all chunks at once, and later chunks'
+		// connections could be closed by keep-alive while earlier ones
+		// were still being drained. The range path uses the per-chunk
+		// view helpers (always lazy), but a generously sized range that
+		// crosses many chunks is still the best end-to-end pin that the
+		// per-chunk IV plumbing is correct across part and chunk
+		// boundaries.
+		start := int64(internalChunkSize/2) + 5
+		end := start + 3*internalChunkSize + 17
+		if end >= totalLen {
+			end = totalLen - 1
+		}
+		cases = append(cases, sseRangeCase{
+			name:        "mid_spans_many_internal_boundaries",
+			rangeHeader: fmt.Sprintf("bytes=%d-%d", start, end),
+			start:       start,
+			end:         end,
+		})
+	}
+
+	return cases
+}
+
+// sseRangeCase is one (start,end) range to GET, with the literal Range header
+// to send so we cover all three forms: `bytes=N-M`, `bytes=N-`, and
+// `bytes=-N`.
+type sseRangeCase struct {
+	name        string
+	rangeHeader string
+	start, end  int64 // inclusive byte offsets in the source data
+}
+
+type sseRangeSize struct {
+	name           string
+	singlePutBytes int   // if >0, upload via PutObject of this many random bytes
+	multipartParts []int // if non-empty, multipart upload with these part sizes (in bytes)
+}
+
+// sseRangeMode is the per-SSE-type behavior plug for the matrix: how to
+// configure CreateBucket / PutObject / CreateMultipartUpload / UploadPart /
+// GetObject, and what to assert on GET responses.
+type sseRangeMode interface {
+	name() string
+	// probe attempts a 1-byte upload and returns "" on success or a short
+	// reason string if the test environment doesn't support this mode (used
+	// to t.Skip the SSE-KMS subtests when no KMS provider is configured).
+ probe(t *testing.T, ctx context.Context, client *s3.Client, bucket string) string + uploadSingle(t *testing.T, ctx context.Context, client *s3.Client, bucket, key string, data []byte) + uploadMultipart(t *testing.T, ctx context.Context, client *s3.Client, bucket, key string, parts [][]byte) + configureGet(in *s3.GetObjectInput) + verifyGet(t *testing.T, resp *s3.GetObjectOutput) +} + +func newRangeModeNone() sseRangeMode { return &rangeModeNone{} } + +type rangeModeNone struct{} + +func (m *rangeModeNone) name() string { return "no_sse" } +func (m *rangeModeNone) probe(t *testing.T, ctx context.Context, client *s3.Client, bucket string) string { + return "" +} +func (m *rangeModeNone) uploadSingle(t *testing.T, ctx context.Context, client *s3.Client, bucket, key string, data []byte) { + t.Helper() + _, err := client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + Body: bytes.NewReader(data), + }) + require.NoError(t, err, "PutObject") +} +func (m *rangeModeNone) uploadMultipart(t *testing.T, ctx context.Context, client *s3.Client, bucket, key string, parts [][]byte) { + t.Helper() + multipartUpload(t, ctx, client, bucket, key, parts, nil, nil) +} +func (m *rangeModeNone) configureGet(in *s3.GetObjectInput) {} +func (m *rangeModeNone) verifyGet(t *testing.T, resp *s3.GetObjectOutput) { + t.Helper() + assert.Empty(t, string(resp.ServerSideEncryption), "no SSE response header expected for plaintext object") +} + +func newRangeModeSSEC() sseRangeMode { + return &rangeModeSSEC{key: generateSSECKey()} +} + +type rangeModeSSEC struct { + key *SSECKey +} + +func (m *rangeModeSSEC) name() string { return "sse_c" } +func (m *rangeModeSSEC) probe(t *testing.T, ctx context.Context, client *s3.Client, bucket string) string { + return "" +} +func (m *rangeModeSSEC) uploadSingle(t *testing.T, ctx context.Context, client *s3.Client, bucket, key string, data []byte) { + t.Helper() + _, err := client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + Body: bytes.NewReader(data), + SSECustomerAlgorithm: aws.String("AES256"), + SSECustomerKey: aws.String(m.key.KeyB64), + SSECustomerKeyMD5: aws.String(m.key.KeyMD5), + }) + require.NoError(t, err, "PutObject SSE-C") +} +func (m *rangeModeSSEC) uploadMultipart(t *testing.T, ctx context.Context, client *s3.Client, bucket, key string, parts [][]byte) { + t.Helper() + multipartUpload(t, ctx, client, bucket, key, parts, + func(in *s3.CreateMultipartUploadInput) { + in.SSECustomerAlgorithm = aws.String("AES256") + in.SSECustomerKey = aws.String(m.key.KeyB64) + in.SSECustomerKeyMD5 = aws.String(m.key.KeyMD5) + }, + func(in *s3.UploadPartInput) { + in.SSECustomerAlgorithm = aws.String("AES256") + in.SSECustomerKey = aws.String(m.key.KeyB64) + in.SSECustomerKeyMD5 = aws.String(m.key.KeyMD5) + }, + ) +} +func (m *rangeModeSSEC) configureGet(in *s3.GetObjectInput) { + in.SSECustomerAlgorithm = aws.String("AES256") + in.SSECustomerKey = aws.String(m.key.KeyB64) + in.SSECustomerKeyMD5 = aws.String(m.key.KeyMD5) +} +func (m *rangeModeSSEC) verifyGet(t *testing.T, resp *s3.GetObjectOutput) { + t.Helper() + assert.Equal(t, "AES256", aws.ToString(resp.SSECustomerAlgorithm)) + assert.Equal(t, m.key.KeyMD5, aws.ToString(resp.SSECustomerKeyMD5)) +} + +func newRangeModeSSEKMS(keyID string) sseRangeMode { + return &rangeModeSSEKMS{keyID: keyID} +} + +type rangeModeSSEKMS struct { + keyID string +} + +func (m *rangeModeSSEKMS) name() string { return "sse_kms" } +func (m *rangeModeSSEKMS) probe(t 
*testing.T, ctx context.Context, client *s3.Client, bucket string) string { + t.Helper() + probeKey := "__probe__" + m.name() + _, err := client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(probeKey), + Body: bytes.NewReader([]byte{0}), + ServerSideEncryption: types.ServerSideEncryptionAwsKms, + SSEKMSKeyId: aws.String(m.keyID), + }) + if err != nil { + // "KMS provider not configured" is the friendly diagnostic that + // causes the caller to t.Skip the SSE-KMS subtree. We use it for + // two narrow categories: + // 1. 5xx responses -- the s3api InternalError surface when no KMS + // provider is wired up at server start. + // 2. Specific error strings ("KMS.NotConfigured" / + // "NotImplemented" / "not configured") regardless of status, + // to catch other servers that may use a 4xx/501 to signal the + // same condition. + // We deliberately do NOT auto-skip on a generic 4xx: a real + // SSE-KMS misconfiguration in the test request itself (bad keyID + // format, missing header, etc.) also surfaces as a 400, and the + // CI-meaningful path -- where the server IS configured for KMS -- + // must fail loudly in that case rather than silently skip the + // integration coverage. + var apiErr *smithyhttp.ResponseError + if errors.As(err, &apiErr) { + if code := apiErr.HTTPStatusCode(); code >= 500 { + return fmt.Sprintf("KMS provider not configured (PutObject returned %d)", code) + } + } + errMsg := err.Error() + if strings.Contains(errMsg, "KMS.NotConfigured") || + strings.Contains(errMsg, "NotImplemented") || + strings.Contains(errMsg, "not configured") { + return fmt.Sprintf("KMS provider not configured: %v", err) + } + return fmt.Sprintf("KMS PutObject probe failed: %v", err) + } + // Best-effort cleanup of the probe object. + _, _ = client.DeleteObject(ctx, &s3.DeleteObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(probeKey), + }) + return "" +} +func (m *rangeModeSSEKMS) uploadSingle(t *testing.T, ctx context.Context, client *s3.Client, bucket, key string, data []byte) { + t.Helper() + _, err := client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + Body: bytes.NewReader(data), + ServerSideEncryption: types.ServerSideEncryptionAwsKms, + SSEKMSKeyId: aws.String(m.keyID), + }) + require.NoError(t, err, "PutObject SSE-KMS") +} +func (m *rangeModeSSEKMS) uploadMultipart(t *testing.T, ctx context.Context, client *s3.Client, bucket, key string, parts [][]byte) { + t.Helper() + multipartUpload(t, ctx, client, bucket, key, parts, + func(in *s3.CreateMultipartUploadInput) { + in.ServerSideEncryption = types.ServerSideEncryptionAwsKms + in.SSEKMSKeyId = aws.String(m.keyID) + }, + // SSE-KMS does not require per-part headers (server reuses upload-init key). 
+ nil, + ) +} +func (m *rangeModeSSEKMS) configureGet(in *s3.GetObjectInput) {} +func (m *rangeModeSSEKMS) verifyGet(t *testing.T, resp *s3.GetObjectOutput) { + t.Helper() + assert.Equal(t, types.ServerSideEncryptionAwsKms, resp.ServerSideEncryption) + assert.Equal(t, m.keyID, aws.ToString(resp.SSEKMSKeyId)) +} + +func newRangeModeSSES3() sseRangeMode { return &rangeModeSSES3{} } + +type rangeModeSSES3 struct{} + +func (m *rangeModeSSES3) name() string { return "sse_s3" } +func (m *rangeModeSSES3) probe(t *testing.T, ctx context.Context, client *s3.Client, bucket string) string { + return "" +} +func (m *rangeModeSSES3) uploadSingle(t *testing.T, ctx context.Context, client *s3.Client, bucket, key string, data []byte) { + t.Helper() + _, err := client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + Body: bytes.NewReader(data), + ServerSideEncryption: types.ServerSideEncryptionAes256, + }) + require.NoError(t, err, "PutObject SSE-S3") +} +func (m *rangeModeSSES3) uploadMultipart(t *testing.T, ctx context.Context, client *s3.Client, bucket, key string, parts [][]byte) { + t.Helper() + multipartUpload(t, ctx, client, bucket, key, parts, + func(in *s3.CreateMultipartUploadInput) { + in.ServerSideEncryption = types.ServerSideEncryptionAes256 + }, + // SSE-S3 multipart parts inherit encryption from the upload init. + nil, + ) +} +func (m *rangeModeSSES3) configureGet(in *s3.GetObjectInput) {} +func (m *rangeModeSSES3) verifyGet(t *testing.T, resp *s3.GetObjectOutput) { + t.Helper() + assert.Equal(t, types.ServerSideEncryptionAes256, resp.ServerSideEncryption) +} + +// multipartUpload is a small helper shared across SSE modes that need to +// assemble the test object via Create / UploadPart / Complete with optional +// per-mode header injection. It registers a t.Cleanup that aborts the upload +// if Complete didn't run successfully, so a test failure mid-way doesn't +// leave an orphan upload behind. 
+func multipartUpload(t *testing.T, ctx context.Context, client *s3.Client, bucket, key string, parts [][]byte, + configCreate func(*s3.CreateMultipartUploadInput), + configPart func(*s3.UploadPartInput)) { + t.Helper() + createIn := &s3.CreateMultipartUploadInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + } + if configCreate != nil { + configCreate(createIn) + } + createResp, err := client.CreateMultipartUpload(ctx, createIn) + require.NoError(t, err, "CreateMultipartUpload") + uploadID := aws.ToString(createResp.UploadId) + + completed := false + t.Cleanup(func() { + if completed { + return + } + _, _ = client.AbortMultipartUpload(context.Background(), &s3.AbortMultipartUploadInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + UploadId: aws.String(uploadID), + }) + }) + + completedParts := make([]types.CompletedPart, 0, len(parts)) + for i, part := range parts { + partNumber := int32(i + 1) + in := &s3.UploadPartInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + PartNumber: aws.Int32(partNumber), + UploadId: aws.String(uploadID), + Body: bytes.NewReader(part), + } + if configPart != nil { + configPart(in) + } + resp, err := client.UploadPart(ctx, in) + require.NoError(t, err, "UploadPart %d", partNumber) + completedParts = append(completedParts, types.CompletedPart{ + ETag: resp.ETag, + PartNumber: aws.Int32(partNumber), + }) + } + + _, err = client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + UploadId: aws.String(uploadID), + MultipartUpload: &types.CompletedMultipartUpload{Parts: completedParts}, + }) + require.NoError(t, err, "CompleteMultipartUpload") + completed = true +} + +// verifyRangeRead does the actual GET + assertions for one (mode, object, +// range case). It checks: the body bytes match the source slice; the +// Content-Length header matches the range length; the Content-Range header +// matches the resolved byte range; the SSE response headers match the mode. +func verifyRangeRead(t *testing.T, ctx context.Context, client *s3.Client, mode sseRangeMode, + bucket, key string, source []byte, rc sseRangeCase) { + t.Helper() + + totalLen := int64(len(source)) + in := &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + Range: aws.String(rc.rangeHeader), + } + mode.configureGet(in) + + resp, err := client.GetObject(ctx, in) + require.NoError(t, err, "GetObject %s range=%s", key, rc.rangeHeader) + defer resp.Body.Close() + + got, err := io.ReadAll(resp.Body) + require.NoError(t, err, "read range body") + + expected := source[rc.start : rc.end+1] + expectedLen := rc.end - rc.start + 1 + // Body-length check is `require` rather than `assert` because the bug + // fixed in #8908 surfaces as a fully-readable body that is shorter than + // requested -- a "truncation regression". Comparing different-length + // slices with assertDataEqual below would just produce a noisy byte-diff + // on top of the underlying truncation; bailing here keeps the failure + // log focused on the symptom that actually matters. 
+ require.Equal(t, len(expected), len(got), + "body length mismatch for %s range=%s (source size=%d) — likely truncation regression", key, rc.rangeHeader, totalLen) + assert.Equal(t, expectedLen, aws.ToInt64(resp.ContentLength), + "Content-Length header mismatch for %s range=%s", key, rc.rangeHeader) + + // Content-Range: bytes start-end/total + wantContentRange := fmt.Sprintf("bytes %d-%d/%d", rc.start, rc.end, totalLen) + assert.Equal(t, wantContentRange, aws.ToString(resp.ContentRange), + "Content-Range header mismatch for %s range=%s", key, rc.rangeHeader) + + // Compare bytes with a hash-only assertion to keep failure output small + // (the actual byte content is random and unhelpful printed verbatim). + assertDataEqual(t, expected, got, "Range body mismatch for %s range=%s", key, rc.rangeHeader) + + mode.verifyGet(t, resp) +} diff --git a/weed/s3api/s3_sse_s3_integration_test.go b/weed/s3api/s3_sse_s3_integration_test.go index 454e0b5ce..2b7a62ed0 100644 --- a/weed/s3api/s3_sse_s3_integration_test.go +++ b/weed/s3api/s3_sse_s3_integration_test.go @@ -489,9 +489,10 @@ func TestBuildMultipartSSES3Reader_InvalidIVLength(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - closed := false + fetchCalled := false fetch := func(c *filer_pb.FileChunk) (io.ReadCloser, error) { - return &closeTrackingReadCloser{Reader: bytes.NewReader([]byte("whatever")), closed: &closed}, nil + fetchCalled = true + return io.NopCloser(bytes.NewReader([]byte("whatever"))), nil } chunks := []*filer_pb.FileChunk{ @@ -508,27 +509,35 @@ func TestBuildMultipartSSES3Reader_InvalidIVLength(t *testing.T) { if err == nil { t.Fatal("expected error for invalid IV length, got nil") } - if !strings.Contains(err.Error(), "invalid IV length") { - t.Errorf("expected 'invalid IV length' in error, got: %v", err) + // ValidateIV's error format is "invalid length: ..."; + // match on the part of the message that's stable across the + // shared helper's wording. + if !strings.Contains(err.Error(), "IV length") { + t.Errorf("expected 'IV length' in error, got: %v", err) } - if !closed { - t.Error("chunk reader for the bad chunk was not closed on error") + // Validation runs upfront before any chunk fetch, so no volume-server + // HTTP connection should have been opened on the failure path. + if fetchCalled { + t.Error("fetchChunk was called for an invalid-IV chunk; metadata validation should fail before any fetch") } }) } } -// TestBuildMultipartSSES3Reader_ClosesAppendedOnError verifies that when a -// later chunk fails (e.g., malformed metadata), readers already appended for -// earlier valid chunks are closed so volume-server HTTP connections do not leak. -func TestBuildMultipartSSES3Reader_ClosesAppendedOnError(t *testing.T) { +// TestBuildMultipartSSES3Reader_RejectsBadChunkBeforeAnyFetch verifies that +// when any chunk's metadata is malformed, the helper returns an error WITHOUT +// having opened a volume-server HTTP connection for any chunk. Per-chunk +// metadata is validated upfront precisely so a bad chunk in position N does +// not leak open HTTP responses for chunks 0..N-1 (the original eager +// implementation depended on a closeAppendedReaders cleanup path; this test +// pins the stronger contract: nothing is opened in the first place). +func TestBuildMultipartSSES3Reader_RejectsBadChunkBeforeAnyFetch(t *testing.T) { keyManager := initSSES3KeyManagerForTest(t) // First chunk: valid SSE-S3 chunk. 
cipher1, meta1 := encryptSSES3Part(t, []byte("first chunk plaintext")) - // Second chunk: missing per-chunk metadata, triggers error after first is - // already appended. + // Second chunk: missing per-chunk metadata, triggers error. chunks := []*filer_pb.FileChunk{ { FileId: "1,good", @@ -546,36 +555,20 @@ func TestBuildMultipartSSES3Reader_ClosesAppendedOnError(t *testing.T) { }, } - firstClosed := false - secondClosed := false + fetched := map[string]int{} fetch := func(c *filer_pb.FileChunk) (io.ReadCloser, error) { - switch c.GetFileIdString() { - case "1,good": - return &closeTrackingReadCloser{Reader: bytes.NewReader(cipher1), closed: &firstClosed}, nil - case "2,bad": - return &closeTrackingReadCloser{Reader: bytes.NewReader([]byte("x")), closed: &secondClosed}, nil - } - return nil, fmt.Errorf("unexpected chunk %s", c.GetFileIdString()) + fetched[c.GetFileIdString()]++ + return io.NopCloser(bytes.NewReader([]byte("x"))), nil } _, err := buildMultipartSSES3Reader(chunks, keyManager, fetch) if err == nil { t.Fatal("expected error from missing chunk metadata, got nil") } - if !firstClosed { - t.Error("previously appended chunk reader was not closed on error") + if !strings.Contains(err.Error(), "missing per-chunk metadata") { + t.Errorf("expected 'missing per-chunk metadata' in error, got: %v", err) } - if !secondClosed { - t.Error("chunk reader for the failing chunk was not closed on error") + if len(fetched) != 0 { + t.Errorf("expected no chunks fetched on validation failure, got %v", fetched) } } - -type closeTrackingReadCloser struct { - io.Reader - closed *bool -} - -func (r *closeTrackingReadCloser) Close() error { - *r.closed = true - return nil -} diff --git a/weed/s3api/s3api_multipart_ssekms_test.go b/weed/s3api/s3api_multipart_ssekms_test.go new file mode 100644 index 000000000..9194e3eb8 --- /dev/null +++ b/weed/s3api/s3api_multipart_ssekms_test.go @@ -0,0 +1,247 @@ +package s3api + +import ( + "bytes" + "crypto/rand" + "fmt" + "io" + "strings" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" +) + +// TestBuildMultipartSSEKMSReader_RejectsBadIVBeforeAnyFetch pins the contract +// that a per-chunk SSE-KMS metadata blob with a missing or wrong-length IV is +// rejected during preparation, before any volume-server fetch fires. +// +// DeserializeSSEKMSMetadata only proves the JSON parses; it leaves the +// kmsKey.IV field at whatever the metadata actually carried. CreateSSEKMSDecryptedReader +// does call ValidateIV, but only when the wrap closure runs -- after the +// chunk's HTTP body has already been opened. The lazy reader's whole point +// is to never start an HTTP fetch for a chunk we know we cannot decrypt, so +// IV validation must happen in the prep loop. This test is the regression +// guard for that, addressing CodeRabbit review feedback on PR #9228. 
+func TestBuildMultipartSSEKMSReader_RejectsBadIVBeforeAnyFetch(t *testing.T) { + makeMetadata := func(iv []byte) []byte { + t.Helper() + key := &SSEKMSKey{ + KeyID: "test-kms-key", + EncryptedDataKey: bytes.Repeat([]byte{0x42}, 32), + IV: iv, + } + md, err := SerializeSSEKMSMetadata(key) + if err != nil { + t.Fatalf("SerializeSSEKMSMetadata: %v", err) + } + return md + } + + cases := []struct { + name string + iv []byte + expectErr string + }{ + {"missing IV", nil, "invalid"}, + {"empty IV", []byte{}, "invalid"}, + {"short IV", []byte("too-short"), "invalid"}, // 9 bytes, not 16 + {"long IV", bytes.Repeat([]byte{1}, 32), "invalid"}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + fetchCalled := false + fetch := func(c *filer_pb.FileChunk) (io.ReadCloser, error) { + fetchCalled = true + return io.NopCloser(bytes.NewReader([]byte("ignored"))), nil + } + + chunks := []*filer_pb.FileChunk{ + { + FileId: "1,bad-iv", + Offset: 0, + Size: 8, + SseType: filer_pb.SSEType_SSE_KMS, + SseMetadata: makeMetadata(tc.iv), + }, + } + + _, err := buildMultipartSSEKMSReader(chunks, fetch) + if err == nil { + t.Fatal("expected error for invalid SSE-KMS IV, got nil") + } + if !strings.Contains(err.Error(), tc.expectErr) { + t.Errorf("expected %q in error, got: %v", tc.expectErr, err) + } + // The whole point of upfront validation: no HTTP fetch must fire + // for a chunk that fails the metadata gate. + if fetchCalled { + t.Error("fetchChunk was called for a chunk with invalid IV; metadata validation must run before any fetch") + } + }) + } +} + +// TestBuildMultipartSSEKMSReader_RejectsMissingMetadataBeforeAnyFetch verifies +// that a chunk tagged SSE-KMS but with no SseMetadata bytes is rejected during +// preparation, also without firing a fetch. Mirrors the SSE-S3 contract pinned +// by TestBuildMultipartSSES3Reader_RejectsBadChunkBeforeAnyFetch. +func TestBuildMultipartSSEKMSReader_RejectsMissingMetadataBeforeAnyFetch(t *testing.T) { + fetched := map[string]int{} + fetch := func(c *filer_pb.FileChunk) (io.ReadCloser, error) { + fetched[c.GetFileIdString()]++ + return io.NopCloser(bytes.NewReader([]byte("ignored"))), nil + } + + // First chunk has valid SSE-KMS metadata; second chunk is tagged SSE-KMS + // but has no metadata blob. The eager pre-#9228 implementation would have + // opened chunk 0's HTTP body before discovering chunk 1's problem; the + // lazy implementation must reject up front and leave both alone. 
+ validKey := &SSEKMSKey{ + KeyID: "test-kms-key", + EncryptedDataKey: bytes.Repeat([]byte{0x42}, 32), + IV: make([]byte, s3_constants.AESBlockSize), + } + if _, err := rand.Read(validKey.IV); err != nil { + t.Fatalf("rand.Read: %v", err) + } + validMeta, err := SerializeSSEKMSMetadata(validKey) + if err != nil { + t.Fatalf("SerializeSSEKMSMetadata: %v", err) + } + + chunks := []*filer_pb.FileChunk{ + { + FileId: "1,good", + Offset: 0, + Size: 16, + SseType: filer_pb.SSEType_SSE_KMS, + SseMetadata: validMeta, + }, + { + FileId: "2,no-metadata", + Offset: 16, + Size: 16, + SseType: filer_pb.SSEType_SSE_KMS, + SseMetadata: nil, // triggers "missing per-chunk metadata" + }, + } + + _, err = buildMultipartSSEKMSReader(chunks, fetch) + if err == nil { + t.Fatal("expected error from missing chunk metadata, got nil") + } + if !strings.Contains(err.Error(), "missing per-chunk metadata") { + t.Errorf("expected 'missing per-chunk metadata' in error, got: %v", err) + } + if len(fetched) != 0 { + t.Errorf("expected no chunks fetched on validation failure, got %v", fetched) + } +} + +// TestBuildMultipartSSEKMSReader_RejectsUnparseableMetadataBeforeAnyFetch +// covers the prep-loop branch where SseMetadata is non-empty but JSON-malformed +// so DeserializeSSEKMSMetadata itself returns an error. Same contract: no +// fetch fires. +func TestBuildMultipartSSEKMSReader_RejectsUnparseableMetadataBeforeAnyFetch(t *testing.T) { + fetchCalled := false + fetch := func(c *filer_pb.FileChunk) (io.ReadCloser, error) { + fetchCalled = true + return io.NopCloser(bytes.NewReader([]byte("ignored"))), nil + } + + chunks := []*filer_pb.FileChunk{ + { + FileId: "1,garbage", + Offset: 0, + Size: 8, + SseType: filer_pb.SSEType_SSE_KMS, + SseMetadata: []byte("{not-json"), + }, + } + + _, err := buildMultipartSSEKMSReader(chunks, fetch) + if err == nil { + t.Fatal("expected error from unparseable SSE-KMS metadata, got nil") + } + if !strings.Contains(err.Error(), "deserialize SSE-KMS metadata") { + t.Errorf("expected 'deserialize SSE-KMS metadata' in error, got: %v", err) + } + if fetchCalled { + t.Error("fetchChunk was called for a chunk with garbage metadata; deserialize must fail before any fetch") + } +} + +// TestBuildMultipartSSEKMSReader_SortsByOffset verifies that the prep loop +// reorders chunks by Offset before constructing the lazy reader, matching +// the documented contract and the SSE-S3 helper. +// +// Driving the reader's Read() to observe fetch order does not work as a full +// ordering check: CreateSSEKMSDecryptedReader requires a live KMS provider to +// unwrap the encrypted DEK, which is unavailable in this unit test, so the +// wrap closure fails on the first chunk and the lazy reader marks itself +// finished -- only one fetch is ever observed. Instead, since the lazy +// reader and its prepared chunks live in the same package, we type-assert +// the returned reader to *lazyMultipartChunkReader and inspect the prepared +// chunks slice directly. This is a stronger check (the entire ordering, not +// just the first element) and does not depend on KMS availability. 
+func TestBuildMultipartSSEKMSReader_SortsByOffset(t *testing.T) { + makeChunk := func(fid string, offset int64) *filer_pb.FileChunk { + key := &SSEKMSKey{ + KeyID: "test-kms-key", + EncryptedDataKey: bytes.Repeat([]byte{0x42}, 32), + IV: bytes.Repeat([]byte{0x10}, s3_constants.AESBlockSize), + } + meta, err := SerializeSSEKMSMetadata(key) + if err != nil { + t.Fatalf("SerializeSSEKMSMetadata: %v", err) + } + return &filer_pb.FileChunk{ + FileId: fid, + Offset: offset, + Size: 1, + SseType: filer_pb.SSEType_SSE_KMS, + SseMetadata: meta, + } + } + chunks := []*filer_pb.FileChunk{ + makeChunk("c2", 200), + makeChunk("c0", 0), + makeChunk("c1", 100), + } + + fetchCalled := false + fetch := func(c *filer_pb.FileChunk) (io.ReadCloser, error) { + fetchCalled = true + return nil, fmt.Errorf("fetch must not be called: ordering is checked via prepared chunks") + } + + reader, err := buildMultipartSSEKMSReader(chunks, fetch) + if err != nil { + t.Fatalf("buildMultipartSSEKMSReader: %v", err) + } + if fetchCalled { + t.Fatal("fetch must not be invoked during prep; ordering is verified statically") + } + + lazy, ok := reader.(*lazyMultipartChunkReader) + if !ok { + t.Fatalf("expected *lazyMultipartChunkReader, got %T", reader) + } + if len(lazy.chunks) != 3 { + t.Fatalf("expected 3 prepared chunks, got %d", len(lazy.chunks)) + } + gotOrder := []string{ + lazy.chunks[0].chunk.GetFileIdString(), + lazy.chunks[1].chunk.GetFileIdString(), + lazy.chunks[2].chunk.GetFileIdString(), + } + wantOrder := []string{"c0", "c1", "c2"} + for i, want := range wantOrder { + if gotOrder[i] != want { + t.Errorf("prepared chunks not in offset order: got %v, want %v", gotOrder, wantOrder) + break + } + } +} diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 40ea157fa..581bf761b 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -2563,10 +2563,22 @@ func (s3a *S3ApiServer) detectPrimarySSEType(entry *filer_pb.Entry) string { // createMultipartSSECDecryptedReaderDirect creates a reader that decrypts each chunk independently for multipart SSE-C objects (direct volume path) // Note: encryptedStream parameter is unused (always nil) as this function fetches chunks directly to avoid double I/O. // It's kept in the signature for API consistency with non-Direct versions. +// +// Per-chunk metadata is validated upfront (so a malformed chunk fails fast +// without opening any HTTP connections); chunk fetches happen LAZILY through +// lazyMultipartChunkReader, so at most one volume-server connection is open +// at a time. See buildMultipartSSES3Reader for the rationale (issue #8908). +// +// SSE-C multipart behavior (differs from SSE-KMS/SSE-S3): +// - Upload: CreateSSECEncryptedReader generates a RANDOM IV per part (no base IV + offset). +// - Metadata: PartOffset tracks position within the encrypted stream. +// - Decryption: use stored IV and advance the CTR stream by PartOffset. +// +// SSE-KMS/SSE-S3 instead use base IV + calculateIVWithOffset(partOffset) at +// encryption time. CopyObject currently applies calculateIVWithOffset to SSE-C +// as well, which may be incorrect (TODO: investigate consistency). func (s3a *S3ApiServer) createMultipartSSECDecryptedReaderDirect(ctx context.Context, encryptedStream io.ReadCloser, customerKey *SSECustomerKey, entry *filer_pb.Entry) (io.Reader, error) { // Close the original encrypted stream since chunks are fetched individually. 
- // Defer so the stream is closed on every return path (including error - // returns from inside the per-chunk loop), matching the SSE-S3 helper. if encryptedStream != nil { defer encryptedStream.Close() } @@ -2580,192 +2592,135 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReaderDirect(ctx context.Con return chunks[i].GetOffset() < chunks[j].GetOffset() }) - // Create readers for each chunk, decrypting them independently - readers := make([]io.Reader, 0, len(chunks)) - - // Close any readers already appended to `readers` on error paths, to avoid - // leaking volume-server HTTP connections. - closeAppendedReaders := func() { - for _, r := range readers { - if closer, ok := r.(io.Closer); ok { - closer.Close() - } - } - } - + preparedChunks := make([]preparedMultipartChunk, 0, len(chunks)) for _, chunk := range chunks { - // Get this chunk's encrypted data - chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk) + if chunk.GetSseType() != filer_pb.SSEType_SSE_C { + preparedChunks = append(preparedChunks, preparedMultipartChunk{chunk: chunk}) + continue + } + if len(chunk.GetSseMetadata()) == 0 { + return nil, fmt.Errorf("SSE-C chunk %s missing per-chunk metadata", chunk.GetFileIdString()) + } + ssecMetadata, err := DeserializeSSECMetadata(chunk.GetSseMetadata()) if err != nil { - closeAppendedReaders() - return nil, fmt.Errorf("failed to create chunk reader: %v", err) + return nil, fmt.Errorf("failed to deserialize SSE-C metadata for chunk %s: %v", chunk.GetFileIdString(), err) } - - // Handle based on chunk's encryption type - if chunk.GetSseType() == filer_pb.SSEType_SSE_C { - // Check if this chunk has per-chunk SSE-C metadata - if len(chunk.GetSseMetadata()) == 0 { - chunkReader.Close() - closeAppendedReaders() - return nil, fmt.Errorf("SSE-C chunk %s missing per-chunk metadata", chunk.GetFileIdString()) - } - - // Deserialize the SSE-C metadata - ssecMetadata, err := DeserializeSSECMetadata(chunk.GetSseMetadata()) - if err != nil { - chunkReader.Close() - closeAppendedReaders() - return nil, fmt.Errorf("failed to deserialize SSE-C metadata for chunk %s: %v", chunk.GetFileIdString(), err) - } - - // Decode the IV from the metadata - chunkIV, err := base64.StdEncoding.DecodeString(ssecMetadata.IV) - if err != nil { - chunkReader.Close() - closeAppendedReaders() - return nil, fmt.Errorf("failed to decode IV for SSE-C chunk %s: %v", chunk.GetFileIdString(), err) - } - // Guard cipher.NewCTR against a missing/short IV (base64 decode of - // an empty or malformed field would otherwise reach it and panic). 
- if len(chunkIV) != s3_constants.AESBlockSize { - chunkReader.Close() - closeAppendedReaders() - return nil, fmt.Errorf("SSE-C chunk %s has invalid IV length %d (expected %d)", - chunk.GetFileIdString(), len(chunkIV), s3_constants.AESBlockSize) - } - - glog.V(4).Infof("Decrypting SSE-C chunk %s with IV=%x, PartOffset=%d", - chunk.GetFileIdString(), chunkIV[:8], ssecMetadata.PartOffset) - - // Note: SSE-C multipart behavior (differs from SSE-KMS/SSE-S3): - // - Upload: CreateSSECEncryptedReader generates RANDOM IV per part (no base IV + offset) - // - Metadata: PartOffset tracks position within the encrypted stream - // - Decryption: Use stored IV and advance CTR stream by PartOffset - // - // This differs from: - // - SSE-KMS/SSE-S3: Use base IV + calculateIVWithOffset(partOffset) during encryption - // - CopyObject: Applies calculateIVWithOffset to SSE-C (which may be incorrect) - // - // TODO: Investigate CopyObject SSE-C PartOffset handling for consistency - partOffset := ssecMetadata.PartOffset - if partOffset < 0 { - chunkReader.Close() - closeAppendedReaders() - return nil, fmt.Errorf("invalid SSE-C part offset %d for chunk %s", partOffset, chunk.GetFileIdString()) - } - decryptedChunkReader, decErr := CreateSSECDecryptedReaderWithOffset(chunkReader, customerKey, chunkIV, uint64(partOffset)) - if decErr != nil { - chunkReader.Close() - closeAppendedReaders() - return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr) - } - - // Use the streaming decrypted reader directly - readers = append(readers, struct { - io.Reader - io.Closer - }{ - Reader: decryptedChunkReader, - Closer: chunkReader, - }) - glog.V(4).Infof("Added streaming decrypted reader for SSE-C chunk %s", chunk.GetFileIdString()) - } else { - // Non-SSE-C chunk, use as-is - readers = append(readers, chunkReader) - glog.V(4).Infof("Added non-encrypted reader for chunk %s", chunk.GetFileIdString()) + chunkIV, err := base64.StdEncoding.DecodeString(ssecMetadata.IV) + if err != nil { + return nil, fmt.Errorf("failed to decode IV for SSE-C chunk %s: %v", chunk.GetFileIdString(), err) } + // Guard cipher.NewCTR against a missing/short IV (base64 decode of + // an empty or malformed field would otherwise reach it and panic). + // Uses the shared ValidateIV helper so all three SSE prep paths + // (SSE-S3, SSE-KMS, SSE-C) enforce IV length identically. + if err := ValidateIV(chunkIV, fmt.Sprintf("SSE-C chunk %s IV", chunk.GetFileIdString())); err != nil { + return nil, err + } + if ssecMetadata.PartOffset < 0 { + return nil, fmt.Errorf("invalid SSE-C part offset %d for chunk %s", ssecMetadata.PartOffset, chunk.GetFileIdString()) + } + // Capture per-chunk values into the wrap closure. 
+ fileId := chunk.GetFileIdString() + ivCopy := chunkIV + partOffset := uint64(ssecMetadata.PartOffset) + preparedChunks = append(preparedChunks, preparedMultipartChunk{ + chunk: chunk, + wrap: func(raw io.ReadCloser) (io.Reader, error) { + glog.V(4).Infof("Decrypting SSE-C chunk %s with IV=%x, PartOffset=%d", + fileId, ivCopy[:8], partOffset) + dec, decErr := CreateSSECDecryptedReaderWithOffset(raw, customerKey, ivCopy, partOffset) + if decErr != nil { + return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr) + } + return dec, nil + }, + }) } - return NewMultipartSSEReader(readers), nil + return &lazyMultipartChunkReader{ + chunks: preparedChunks, + fetch: func(c *filer_pb.FileChunk) (io.ReadCloser, error) { + return s3a.createEncryptedChunkReader(ctx, c) + }, + }, nil } // createMultipartSSEKMSDecryptedReaderDirect creates a reader that decrypts each chunk independently for multipart SSE-KMS objects (direct volume path) // Note: encryptedStream parameter is unused (always nil) as this function fetches chunks directly to avoid double I/O. // It's kept in the signature for API consistency with non-Direct versions. +// +// Per-chunk metadata is validated upfront (so a malformed chunk fails fast +// without opening any HTTP connections); chunk fetches happen LAZILY through +// lazyMultipartChunkReader, so at most one volume-server connection is open +// at a time. See buildMultipartSSES3Reader for the rationale (issue #8908). func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReaderDirect(ctx context.Context, encryptedStream io.ReadCloser, entry *filer_pb.Entry) (io.Reader, error) { // Close the original encrypted stream since chunks are fetched individually. - // Defer so the stream is closed on every return path (including error - // returns from inside the per-chunk loop), matching the SSE-S3 helper. if encryptedStream != nil { defer encryptedStream.Close() } - // Sort a copy of the slice so entry.Chunks is not reordered (other code - // paths, e.g. ETag computation, can rely on the original chunk order). - // IV length is validated inside CreateSSEKMSDecryptedReader via ValidateIV. - originalChunks := entry.GetChunks() - chunks := make([]*filer_pb.FileChunk, len(originalChunks)) - copy(chunks, originalChunks) - sort.Slice(chunks, func(i, j int) bool { - return chunks[i].GetOffset() < chunks[j].GetOffset() + return buildMultipartSSEKMSReader(entry.GetChunks(), func(chunk *filer_pb.FileChunk) (io.ReadCloser, error) { + return s3a.createEncryptedChunkReader(ctx, chunk) + }) +} + +// buildMultipartSSEKMSReader composes a decrypted reader from a set of +// multipart SSE-KMS chunks. Mirrors buildMultipartSSES3Reader: chunks are +// validated upfront (per-chunk metadata parses, IV has the right length) and +// fetched + decrypted lazily through lazyMultipartChunkReader, so at most one +// volume-server HTTP body is live at a time. Exposed as a free function so +// tests can inject a mock chunk fetcher and pin the "no fetch on bad +// metadata" contract without spinning up an S3ApiServer. 
+func buildMultipartSSEKMSReader(chunks []*filer_pb.FileChunk, fetchChunk func(*filer_pb.FileChunk) (io.ReadCloser, error)) (io.Reader, error) { + sortedChunks := make([]*filer_pb.FileChunk, len(chunks)) + copy(sortedChunks, chunks) + sort.Slice(sortedChunks, func(i, j int) bool { + return sortedChunks[i].GetOffset() < sortedChunks[j].GetOffset() }) - // Create readers for each chunk, decrypting them independently - readers := make([]io.Reader, 0, len(chunks)) - - // Close any readers already appended to `readers` on error paths, to avoid - // leaking volume-server HTTP connections. - closeAppendedReaders := func() { - for _, r := range readers { - if closer, ok := r.(io.Closer); ok { - closer.Close() - } + preparedChunks := make([]preparedMultipartChunk, 0, len(sortedChunks)) + for _, chunk := range sortedChunks { + if chunk.GetSseType() != filer_pb.SSEType_SSE_KMS { + preparedChunks = append(preparedChunks, preparedMultipartChunk{chunk: chunk}) + continue } - } - - for _, chunk := range chunks { - // Get this chunk's encrypted data - chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk) + if len(chunk.GetSseMetadata()) == 0 { + return nil, fmt.Errorf("SSE-KMS chunk %s missing per-chunk metadata", chunk.GetFileIdString()) + } + kmsKey, err := DeserializeSSEKMSMetadata(chunk.GetSseMetadata()) if err != nil { - closeAppendedReaders() - return nil, fmt.Errorf("failed to create chunk reader: %v", err) + return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata for chunk %s: %v", chunk.GetFileIdString(), err) } - - // Handle based on chunk's encryption type - if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS { - // Check if this chunk has per-chunk SSE-KMS metadata - if len(chunk.GetSseMetadata()) == 0 { - chunkReader.Close() - closeAppendedReaders() - return nil, fmt.Errorf("SSE-KMS chunk %s missing per-chunk metadata", chunk.GetFileIdString()) - } - - // Use the per-chunk SSE-KMS metadata - kmsKey, err := DeserializeSSEKMSMetadata(chunk.GetSseMetadata()) - if err != nil { - chunkReader.Close() - closeAppendedReaders() - return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata for chunk %s: %v", chunk.GetFileIdString(), err) - } - - glog.V(4).Infof("Decrypting SSE-KMS chunk %s with KeyID=%s", - chunk.GetFileIdString(), kmsKey.KeyID) - - // Create decrypted reader for this chunk - decryptedChunkReader, decErr := CreateSSEKMSDecryptedReader(chunkReader, kmsKey) - if decErr != nil { - chunkReader.Close() - closeAppendedReaders() - return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr) - } - - // Use the streaming decrypted reader directly - readers = append(readers, struct { - io.Reader - io.Closer - }{ - Reader: decryptedChunkReader, - Closer: chunkReader, - }) - glog.V(4).Infof("Added streaming decrypted reader for SSE-KMS chunk %s", chunk.GetFileIdString()) - } else { - // Non-SSE-KMS chunk, use as-is - readers = append(readers, chunkReader) - glog.V(4).Infof("Added non-encrypted reader for chunk %s", chunk.GetFileIdString()) + // Validate IV length up front, mirroring the SSE-S3 / SSE-C + // preparation paths. CreateSSEKMSDecryptedReader does call + // ValidateIV internally, but only when the wrap closure runs -- + // after the chunk's volume-server fetch has already started. We + // want the "reject malformed chunks before any fetch" contract to + // hold for SSE-KMS too, so a missing or short IV must fail here + // in the prep loop rather than turn into a mid-stream error. 
+ if err := ValidateIV(kmsKey.IV, fmt.Sprintf("SSE-KMS chunk %s IV", chunk.GetFileIdString())); err != nil { + return nil, err } + // Capture kmsKey and chunk into the wrap closure so each prepared + // entry decrypts with its own per-chunk SSE-KMS key. + fileId := chunk.GetFileIdString() + preparedChunks = append(preparedChunks, preparedMultipartChunk{ + chunk: chunk, + wrap: func(raw io.ReadCloser) (io.Reader, error) { + glog.V(4).Infof("Decrypting SSE-KMS chunk %s with KeyID=%s", fileId, kmsKey.KeyID) + dec, decErr := CreateSSEKMSDecryptedReader(raw, kmsKey) + if decErr != nil { + return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr) + } + return dec, nil + }, + }) } - return NewMultipartSSEReader(readers), nil + return &lazyMultipartChunkReader{ + chunks: preparedChunks, + fetch: fetchChunk, + }, nil } // createMultipartSSES3DecryptedReaderDirect creates a reader that decrypts each chunk independently for multipart SSE-S3 objects (direct volume path) @@ -2786,6 +2741,15 @@ func (s3a *S3ApiServer) createMultipartSSES3DecryptedReaderDirect(ctx context.Co // SSE-S3 chunks. Chunks are fetched via fetchChunk and decrypted using their // per-chunk metadata (each multipart part has its own DEK and IV). Exposed as a // standalone helper so tests can inject a mock chunk fetcher. +// +// All per-chunk metadata is validated upfront so a malformed chunk fails fast +// without opening any HTTP connections to volume servers. The actual chunk +// fetch and decryption happens LAZILY as the returned reader is read: at most +// one chunk's HTTP connection is open at a time. Eagerly opening every chunk's +// HTTP response (the previous behavior) caused later chunks' connections to +// sit idle while earlier chunks were still being consumed, which under load +// could trip volume-server idle/keepalive limits and yield truncated reads +// (issue #8908). func buildMultipartSSES3Reader(chunks []*filer_pb.FileChunk, keyManager *SSES3KeyManager, fetchChunk func(*filer_pb.FileChunk) (io.ReadCloser, error)) (io.Reader, error) { // Sort a copy of the slice so callers do not observe their input chunks // reordered (the backing array is shared with entry.Chunks, which other @@ -2796,82 +2760,147 @@ func buildMultipartSSES3Reader(chunks []*filer_pb.FileChunk, keyManager *SSES3Ke return sortedChunks[i].GetOffset() < sortedChunks[j].GetOffset() }) - // Create readers for each chunk, decrypting them independently - readers := make([]io.Reader, 0, len(sortedChunks)) - - // Close any readers already appended to `readers` on error paths, to avoid - // leaking volume-server HTTP connections. - closeAppendedReaders := func() { - for _, r := range readers { - if closer, ok := r.(io.Closer); ok { - closer.Close() - } - } - } - + // Validate every chunk's SSE-S3 metadata before returning a reader. This + // keeps the eager-validation contract that callers and tests rely on + // (malformed metadata fails immediately), without holding open any + // volume-server HTTP connections. 
+ preparedChunks := make([]preparedMultipartChunk, 0, len(sortedChunks)) for _, chunk := range sortedChunks { - // Get this chunk's encrypted data - chunkReader, err := fetchChunk(chunk) + if chunk.GetSseType() != filer_pb.SSEType_SSE_S3 { + preparedChunks = append(preparedChunks, preparedMultipartChunk{chunk: chunk}) + continue + } + if len(chunk.GetSseMetadata()) == 0 { + return nil, fmt.Errorf("SSE-S3 chunk %s missing per-chunk metadata", chunk.GetFileIdString()) + } + meta, err := DeserializeSSES3Metadata(chunk.GetSseMetadata(), keyManager) if err != nil { - closeAppendedReaders() - return nil, fmt.Errorf("failed to create chunk reader: %v", err) + return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata for chunk %s: %v", chunk.GetFileIdString(), err) } - - // Handle based on chunk's encryption type - if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 { - // Check if this chunk has per-chunk SSE-S3 metadata - if len(chunk.GetSseMetadata()) == 0 { - chunkReader.Close() - closeAppendedReaders() - return nil, fmt.Errorf("SSE-S3 chunk %s missing per-chunk metadata", chunk.GetFileIdString()) - } - - // Deserialize the per-chunk SSE-S3 metadata to get the IV - chunkSSES3Metadata, err := DeserializeSSES3Metadata(chunk.GetSseMetadata(), keyManager) - if err != nil { - chunkReader.Close() - closeAppendedReaders() - return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata for chunk %s: %v", chunk.GetFileIdString(), err) - } - - // Use the IV from the chunk metadata. DeserializeSSES3Metadata does - // not require an IV, so validate the length here before it reaches - // cipher.NewCTR, which would otherwise panic on a nil or short IV. - iv := chunkSSES3Metadata.IV - if len(iv) != s3_constants.AESBlockSize { - chunkReader.Close() - closeAppendedReaders() - return nil, fmt.Errorf("SSE-S3 chunk %s has invalid IV length %d (expected %d)", - chunk.GetFileIdString(), len(iv), s3_constants.AESBlockSize) - } - glog.V(4).Infof("Decrypting SSE-S3 chunk %s with KeyID=%s, IV length=%d", - chunk.GetFileIdString(), chunkSSES3Metadata.KeyID, len(iv)) - - // Create decrypted reader for this chunk - decryptedChunkReader, decErr := CreateSSES3DecryptedReader(chunkReader, chunkSSES3Metadata, iv) - if decErr != nil { - chunkReader.Close() - closeAppendedReaders() - return nil, fmt.Errorf("failed to decrypt SSE-S3 chunk: %v", decErr) - } - - // Use the streaming decrypted reader directly - readers = append(readers, struct { - io.Reader - io.Closer - }{ - Reader: decryptedChunkReader, - Closer: chunkReader, - }) - glog.V(4).Infof("Added streaming decrypted reader for SSE-S3 chunk %s", chunk.GetFileIdString()) - } else { - // Non-SSE-S3 chunk, use as-is - readers = append(readers, chunkReader) - glog.V(4).Infof("Added non-encrypted reader for chunk %s", chunk.GetFileIdString()) + // DeserializeSSES3Metadata does not require an IV, so validate the + // length here before it reaches cipher.NewCTR, which would otherwise + // panic on a nil or short IV. Uses the shared ValidateIV helper so + // all three SSE prep paths (SSE-S3, SSE-KMS, SSE-C) enforce IV + // length identically. + if err := ValidateIV(meta.IV, fmt.Sprintf("SSE-S3 chunk %s IV", chunk.GetFileIdString())); err != nil { + return nil, err } + // Capture meta and chunk by-value into the wrap closure so each + // prepared entry decrypts with its own per-chunk key + IV. 
+		fileId := chunk.GetFileIdString()
+		preparedChunks = append(preparedChunks, preparedMultipartChunk{
+			chunk: chunk,
+			wrap: func(raw io.ReadCloser) (io.Reader, error) {
+				glog.V(4).Infof("Decrypting SSE-S3 chunk %s with KeyID=%s, IV length=%d",
+					fileId, meta.KeyID, len(meta.IV))
+				dec, err := CreateSSES3DecryptedReader(raw, meta, meta.IV)
+				if err != nil {
+					return nil, fmt.Errorf("failed to decrypt SSE-S3 chunk: %v", err)
+				}
+				return dec, nil
+			},
+		})
 	}
 
-	return NewMultipartSSEReader(readers), nil
+	return &lazyMultipartChunkReader{
+		chunks: preparedChunks,
+		fetch:  fetchChunk,
+	}, nil
+}
+
+// preparedMultipartChunk pairs a chunk with the per-SSE wrapping logic the
+// lazy reader applies to its raw HTTP body. wrap is nil for chunks that
+// stream as-is (no SSE on the chunk, even though the object is multipart-SSE);
+// otherwise wrap is the SSE-specific decryption setup, which receives the
+// already-opened raw chunk body and returns the plaintext reader.
+type preparedMultipartChunk struct {
+	chunk *filer_pb.FileChunk
+	wrap  func(raw io.ReadCloser) (io.Reader, error)
+}
+
+// lazyMultipartChunkReader streams a sequence of multipart chunks one at a
+// time. It opens each chunk's underlying HTTP fetch (and applies the
+// SSE-specific decryption wrapper) only when the previous chunk has been
+// fully consumed, so volume-server connections do not pile up for large
+// objects. This is the same shape used by all three SSE multipart read
+// paths (SSE-S3, SSE-KMS, SSE-C); only the per-chunk wrap closure differs.
+type lazyMultipartChunkReader struct {
+	chunks   []preparedMultipartChunk
+	fetch    func(*filer_pb.FileChunk) (io.ReadCloser, error)
+	idx      int
+	current  io.Reader // current chunk's plaintext reader (or raw reader for non-SSE chunks)
+	closer   io.Closer // current chunk's underlying HTTP body, to close on advance/Close
+	finished bool
+}
+
+func (l *lazyMultipartChunkReader) Read(p []byte) (int, error) {
+	for {
+		if l.finished {
+			return 0, io.EOF
+		}
+		if l.current == nil {
+			if l.idx >= len(l.chunks) {
+				l.finished = true
+				return 0, io.EOF
+			}
+			pc := l.chunks[l.idx]
+			l.idx++
+			chunkReader, err := l.fetch(pc.chunk)
+			if err != nil {
+				l.finished = true
+				return 0, fmt.Errorf("failed to create chunk reader: %v", err)
+			}
+			if pc.wrap == nil {
+				// Non-SSE chunk in an otherwise SSE-multipart object: stream
+				// raw bytes through.
+				l.current = chunkReader
+				l.closer = chunkReader
+				glog.V(4).Infof("Streaming non-encrypted chunk %s", pc.chunk.GetFileIdString())
+			} else {
+				wrapped, wrapErr := pc.wrap(chunkReader)
+				if wrapErr != nil {
+					chunkReader.Close()
+					l.finished = true
+					return 0, wrapErr
+				}
+				l.current = wrapped
+				l.closer = chunkReader
+			}
+		}
+		n, err := l.current.Read(p)
+		if err == io.EOF {
+			// Close the exhausted chunk before advancing. Log a close
+			// failure instead of swallowing it, whether or not this Read
+			// also returned bytes.
+			if closeErr := l.closer.Close(); closeErr != nil {
+				glog.V(2).Infof("Error closing chunk reader: %v", closeErr)
+			}
+			l.current = nil
+			l.closer = nil
+			if n > 0 {
+				return n, nil
+			}
+			continue
+		}
+		if err != nil {
+			// Non-EOF read error: the underlying chunk body is in an
+			// indeterminate state. Mark ourselves finished so a retried
+			// Read does not try to drain the same broken stream; let
+			// Close() release the chunk body. This matches the failure
+			// semantics of the fetch and wrap error paths above.
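+			// (For example, io.Copy, the usual way an HTTP handler drains a
+			// reader like this, stops at the first non-nil, non-EOF error; a
+			// caller that incorrectly Reads again afterwards gets a clean
+			// io.EOF from the finished check above rather than bytes from a
+			// half-consumed CTR stream.)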
+			l.finished = true
+		}
+		return n, err
+	}
+}
+
+func (l *lazyMultipartChunkReader) Close() error {
+	l.finished = true
+	if l.closer != nil {
+		err := l.closer.Close()
+		l.current = nil
+		l.closer = nil
+		return err
+	}
+	return nil
 }
 
 // createEncryptedChunkReader creates a reader for a single encrypted chunk
diff --git a/weed/s3api/sses3_multipart_repro_test.go b/weed/s3api/sses3_multipart_repro_test.go
new file mode 100644
index 000000000..035be7e21
--- /dev/null
+++ b/weed/s3api/sses3_multipart_repro_test.go
@@ -0,0 +1,280 @@
+package s3api
+
+import (
+	"bytes"
+	"crypto/rand"
+	"fmt"
+	"io"
+	"sync/atomic"
+	"testing"
+
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+)
+
+// TestMultipartSSES3RealisticEndToEnd reproduces the production multipart SSE-S3
+// flow where ALL parts share the same DEK and baseIV (the upload-init key/IV),
+// and each part is encrypted with partOffset=0. Each part is then chunked at 8MB
+// boundaries the way UploadReaderInChunks does. After completion, the chunks
+// have global offsets but per-chunk stored IVs derived from part-local offsets.
+//
+// buildMultipartSSES3Reader is then run on the assembled chunks; the
+// concatenated decrypted output must equal the concatenation of the part
+// plaintexts. This is the round trip that fails in #8908 if anything in the
+// encrypt or decrypt path is inconsistent.
+func TestMultipartSSES3RealisticEndToEnd(t *testing.T) {
+	keyManager := initSSES3KeyManagerForTest(t)
+
+	// One DEK and one baseIV, shared by all parts (the upload-init values).
+	key, err := GenerateSSES3Key()
+	if err != nil {
+		t.Fatalf("GenerateSSES3Key: %v", err)
+	}
+	baseIV := make([]byte, s3_constants.AESBlockSize)
+	if _, err := rand.Read(baseIV); err != nil {
+		t.Fatalf("rand.Read baseIV: %v", err)
+	}
+
+	const chunkSize = int64(8 * 1024 * 1024)
+
+	// Realistic mix of part sizes: small (one chunk), exactly 8MB (one full
+	// chunk), just over 8MB (two chunks), much larger (three chunks), and a
+	// tiny tail.
+	partSizes := []int{
+		5 * 1024 * 1024,   // 5MB (single chunk)
+		8 * 1024 * 1024,   // 8MB exactly (single chunk, full)
+		8*1024*1024 + 123, // crosses chunk boundary (two chunks)
+		17 * 1024 * 1024,  // three chunks
+		1234,              // tiny tail
+	}
+
+	parts := make([][]byte, len(partSizes))
+	for i, n := range partSizes {
+		parts[i] = makeRandomPlaintext(t, n)
+	}
+
+	// Build the chunks list the way completion would produce it: encrypt each
+	// part with partOffset=0, slice the ciphertext at chunkSize boundaries,
+	// store per-chunk metadata IV = calculateIVWithOffset(baseIV, partLocalOff),
+	// then assign GLOBAL offsets to the FileChunk.
+	var chunks []*filer_pb.FileChunk
+	chunkData := map[string][]byte{}
+	var globalOffset int64
+	for partIdx, partPlaintext := range parts {
+		encReader, _, err := CreateSSES3EncryptedReaderWithBaseIV(bytes.NewReader(partPlaintext), key, baseIV, 0)
+		if err != nil {
+			t.Fatalf("CreateSSES3EncryptedReaderWithBaseIV(part %d): %v", partIdx, err)
+		}
+		ciphertext, err := io.ReadAll(encReader)
+		if err != nil {
+			t.Fatalf("read encrypted part %d: %v", partIdx, err)
+		}
+
+		for partLocalOff := int64(0); partLocalOff < int64(len(ciphertext)); partLocalOff += chunkSize {
+			end := partLocalOff + chunkSize
+			if end > int64(len(ciphertext)) {
+				end = int64(len(ciphertext))
+			}
+			cipherSlice := ciphertext[partLocalOff:end]
+
+			chunkIV, _ := calculateIVWithOffset(baseIV, partLocalOff)
+			chunkKey := &SSES3Key{
+				Key:       key.Key,
+				KeyID:     key.KeyID,
+				Algorithm: key.Algorithm,
+				IV:        chunkIV,
+			}
+			meta, err := SerializeSSES3Metadata(chunkKey)
+			if err != nil {
+				t.Fatalf("SerializeSSES3Metadata(part %d off %d): %v", partIdx, partLocalOff, err)
+			}
+
+			fid := fmt.Sprintf("%d,%d", partIdx+1, partLocalOff)
+			chunks = append(chunks, &filer_pb.FileChunk{
+				FileId:      fid,
+				Offset:      globalOffset, // global offset assigned at completion
+				Size:        uint64(end - partLocalOff),
+				SseType:     filer_pb.SSEType_SSE_S3,
+				SseMetadata: meta,
+			})
+			chunkData[fid] = cipherSlice
+			globalOffset += end - partLocalOff
+		}
+	}
+
+	fetch := func(c *filer_pb.FileChunk) (io.ReadCloser, error) {
+		data, ok := chunkData[c.GetFileIdString()]
+		if !ok {
+			return nil, fmt.Errorf("unexpected chunk %s", c.GetFileIdString())
+		}
+		return io.NopCloser(bytes.NewReader(data)), nil
+	}
+
+	reader, err := buildMultipartSSES3Reader(chunks, keyManager, fetch)
+	if err != nil {
+		t.Fatalf("buildMultipartSSES3Reader: %v", err)
+	}
+	got, err := io.ReadAll(reader)
+	if err != nil {
+		t.Fatalf("ReadAll decrypted output: %v", err)
+	}
+
+	want := bytes.Join(parts, nil)
+	if !bytes.Equal(got, want) {
+		idx := firstMismatch(got, want)
+		end := idx + 32
+		if end > len(got) {
+			end = len(got)
+		}
+		if end > len(want) {
+			end = len(want)
+		}
+		t.Fatalf("decrypted output mismatch at byte %d (total len got=%d want=%d)\n got: %x\nwant: %x",
+			idx, len(got), len(want), got[idx:end], want[idx:end])
+	}
+}
+
+// TestBuildMultipartSSES3Reader_LazyChunkFetch pins the lazy behavior of
+// buildMultipartSSES3Reader: chunk N's HTTP fetch only happens after chunk
+// N-1 has been fully consumed. The original eager loop opened every chunk's
+// HTTP response upfront and held them open while io.MultiReader walked
+// through readers[0]; for objects with many chunks (e.g. a 200MB Docker image
+// blob), this could trip volume-server idle/keepalive limits and produce
+// truncated reads at the client (issue #8908).
+//
+// The test installs a fetch hook that tracks how many chunks have been
+// opened and when each one is closed, and verifies:
+// - At any point during streaming, at most one chunk's reader is open.
+// - The number of opened chunks grows as bytes are read out, not upfront.
+// - All chunks are closed when the outer reader is fully drained.
+func TestBuildMultipartSSES3Reader_LazyChunkFetch(t *testing.T) {
+	keyManager := initSSES3KeyManagerForTest(t)
+
+	key, err := GenerateSSES3Key()
+	if err != nil {
+		t.Fatalf("GenerateSSES3Key: %v", err)
+	}
+	baseIV := make([]byte, s3_constants.AESBlockSize)
+	if _, err := rand.Read(baseIV); err != nil {
+		t.Fatalf("rand.Read baseIV: %v", err)
+	}
+
+	// Many small chunks (mirrors many-part Docker Registry uploads).
+	const numChunks = 8
+	const chunkPayload = 1024
+	plaintexts := make([][]byte, numChunks)
+	chunkData := map[string][]byte{}
+	chunks := make([]*filer_pb.FileChunk, 0, numChunks)
+	for i := 0; i < numChunks; i++ {
+		plaintexts[i] = makeRandomPlaintext(t, chunkPayload)
+
+		// Encrypt as a fresh "part" with partOffset=0 (matching putToFiler).
+		encReader, _, err := CreateSSES3EncryptedReaderWithBaseIV(bytes.NewReader(plaintexts[i]), key, baseIV, 0)
+		if err != nil {
+			t.Fatalf("encrypt chunk %d: %v", i, err)
+		}
+		ciphertext, err := io.ReadAll(encReader)
+		if err != nil {
+			t.Fatalf("read ciphertext %d: %v", i, err)
+		}
+
+		chunkIV, _ := calculateIVWithOffset(baseIV, 0)
+		chunkKey := &SSES3Key{
+			Key:       key.Key,
+			KeyID:     key.KeyID,
+			Algorithm: key.Algorithm,
+			IV:        chunkIV,
+		}
+		meta, err := SerializeSSES3Metadata(chunkKey)
+		if err != nil {
+			t.Fatalf("serialize meta %d: %v", i, err)
+		}
+
+		fid := fmt.Sprintf("vol,c%d", i)
+		chunks = append(chunks, &filer_pb.FileChunk{
+			FileId:      fid,
+			Offset:      int64(i) * chunkPayload,
+			Size:        uint64(chunkPayload),
+			SseType:     filer_pb.SSEType_SSE_S3,
+			SseMetadata: meta,
+		})
+		chunkData[fid] = ciphertext
+	}
+
+	var openCount int64 // total opens
+	var liveCount int64 // currently open
+	var maxLive int64
+
+	fetch := func(c *filer_pb.FileChunk) (io.ReadCloser, error) {
+		atomic.AddInt64(&openCount, 1)
+		if live := atomic.AddInt64(&liveCount, 1); live > atomic.LoadInt64(&maxLive) {
+			atomic.StoreInt64(&maxLive, live)
+		}
+		data, ok := chunkData[c.GetFileIdString()]
+		if !ok {
+			return nil, fmt.Errorf("unexpected chunk %s", c.GetFileIdString())
+		}
+		return &liveTrackingReadCloser{Reader: bytes.NewReader(data), live: &liveCount}, nil
+	}
+
+	reader, err := buildMultipartSSES3Reader(chunks, keyManager, fetch)
+	if err != nil {
+		t.Fatalf("buildMultipartSSES3Reader: %v", err)
+	}
+
+	// Construction alone must not have opened any chunk reader.
+	if got := atomic.LoadInt64(&openCount); got != 0 {
+		t.Fatalf("expected no chunks opened before any Read, got %d", got)
+	}
+
+	// Read first byte: should open chunk 0 only.
+	one := make([]byte, 1)
+	if n, err := reader.Read(one); n != 1 || err != nil {
+		t.Fatalf("first Read: n=%d err=%v", n, err)
+	}
+	if got := atomic.LoadInt64(&openCount); got != 1 {
+		t.Errorf("after first byte, expected 1 chunk opened, got %d", got)
+	}
+	if got := atomic.LoadInt64(&liveCount); got != 1 {
+		t.Errorf("after first byte, expected 1 chunk live, got %d", got)
+	}
+
+	// Drain the rest.
+	rest, err := io.ReadAll(reader)
+	if err != nil {
+		t.Fatalf("drain: %v", err)
+	}
+	got := append(one, rest...)
+	want := bytes.Join(plaintexts, nil)
+	if !bytes.Equal(got, want) {
+		idx := firstMismatch(got, want)
+		t.Fatalf("decrypted output mismatch at byte %d (got len %d, want len %d)", idx, len(got), len(want))
+	}
+	if got := atomic.LoadInt64(&openCount); got != int64(numChunks) {
+		t.Errorf("expected exactly %d chunk opens after drain, got %d", numChunks, got)
+	}
+	if got := atomic.LoadInt64(&maxLive); got > 1 {
+		t.Errorf("expected at most 1 chunk reader live at a time (lazy), saw peak of %d", got)
+	}
+	if got := atomic.LoadInt64(&liveCount); got != 0 {
+		t.Errorf("expected all chunks closed after drain, %d still live", got)
+	}
+}
+
+type liveTrackingReadCloser struct {
+	io.Reader
+	live *int64
+	once bool
+}
+
+func (r *liveTrackingReadCloser) Close() error {
+	if r.once {
+		return nil
+	}
+	r.once = true
+	atomic.AddInt64(r.live, -1)
+	return nil
+}
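+
+// Close is idempotent (guarded by once): without the guard, a stray double
+// Close on one chunk could double-decrement liveCount and mask a different
+// chunk's leaked body in the final "all chunks closed" assertion.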