diff --git a/s3api/utils/signed-chunk-reader.go b/s3api/utils/signed-chunk-reader.go index dc60b2e0..ed43881d 100644 --- a/s3api/utils/signed-chunk-reader.go +++ b/s3api/utils/signed-chunk-reader.go @@ -43,7 +43,7 @@ const ( awsV4 = "AWS4" awsS3Service = "s3" awsV4Request = "aws4_request" - trailerSignatureHeader = "x-amz-trailer-signature" + trailerSignatureHeader = "x-amz-trailer-signature:" streamPayloadAlgo = "AWS4-HMAC-SHA256-PAYLOAD" streamPayloadTrailerAlgo = "AWS4-HMAC-SHA256-TRAILER" @@ -52,6 +52,7 @@ const ( var ( errskipHeader = errors.New("skip to next header") + delimiter = []byte{'\r', '\n'} ) // ChunkReader reads from chunked upload request body, and returns @@ -134,12 +135,15 @@ func (cr *ChunkReader) Read(p []byte) (int, error) { } } n, err := cr.parseAndRemoveChunkInfo(p[chunkSize:n]) + if err != nil && err != io.EOF { + return 0, err + } n += int(chunkSize) cr.dataRead += int64(n) if cr.isEOF { if cr.cLength != cr.dataRead { debuglogger.Logf("number of bytes expected: (%v), number of bytes read: (%v)", cr.cLength, cr.dataRead) - return n, s3err.GetAPIError(s3err.ErrContentLengthMismatch) + return 0, s3err.GetAPIError(s3err.ErrContentLengthMismatch) } } return n, err @@ -154,7 +158,7 @@ func (cr *ChunkReader) Read(p []byte) (int, error) { if cr.isEOF { if cr.cLength != cr.dataRead { debuglogger.Logf("number of bytes expected: (%v), number of bytes read: (%v)", cr.cLength, cr.dataRead) - return n, s3err.GetAPIError(s3err.ErrContentLengthMismatch) + return 0, s3err.GetAPIError(s3err.ErrContentLengthMismatch) } } return n, err @@ -378,7 +382,7 @@ func (cr *ChunkReader) parseChunkHeaderBytes(header []byte) (int64, string, int, // After the first chunk each chunk header should start // with "\n\r\n" if !cr.isFirstHeader { - err := readAndSkip(rdr, '\r', '\n') + err := readAndSkip(rdr, delimiter...) if err != nil { debuglogger.Logf("failed to read chunk header first 2 bytes: (should be): \\r\\n, (got): %q", header[:min(2, len(header))]) return cr.handleRdrErr(err, header) @@ -391,25 +395,26 @@ func (cr *ChunkReader) parseChunkHeaderBytes(header []byte) (int64, string, int, } // read the chunk signature - err = readAndSkip(rdr, 'c', 'h', 'u', 'n', 'k', '-', 's', 'i', 'g', 'n', 'a', 't', 'u', 'r', 'e', '=') + err = readAndSkip(rdr, []byte("chunk-signature=")...) if err != nil { debuglogger.Logf("failed to read 'chunk-signature=': %v", err) return cr.handleRdrErr(err, header) } - sig, err := readAndTrim(rdr, '\r') + sig, err := readBytes(rdr, 64) if err != nil { debuglogger.Logf("failed to read '\\r', after chunk signature: %v", err) return cr.handleRdrErr(err, header) } + err = readAndSkip(rdr, delimiter...) + if err != nil { + debuglogger.Logf("failed to read '\\r\\n' after chunk signature") + return cr.handleRdrErr(err, header) + } + // read and parse the final chunk trailer and checksum if chunkSize == 0 { if cr.requireTrailer { - err = readAndSkip(rdr, '\n') - if err != nil { - debuglogger.Logf("failed to read \\n before the trailer: %v", err) - return cr.handleRdrErr(err, header) - } // parse and validate the trailing header trailer, err := readAndTrim(rdr, ':') if err != nil { @@ -430,19 +435,19 @@ func (cr *ChunkReader) parseChunkHeaderBytes(header []byte) (int64, string, int, return cr.handleRdrErr(err, header) } - if !IsValidChecksum(checksum, algo) { - debuglogger.Logf("invalid checksum value: %v", checksum) - return 0, "", 0, s3err.GetInvalidTrailingChecksumHeaderErr(trailer) - } - err = readAndSkip(rdr, '\n') if err != nil { debuglogger.Logf("failed to read \\n after checksum: %v", err) return cr.handleRdrErr(err, header) } + if !IsValidChecksum(checksum, algo) { + debuglogger.Logf("invalid checksum value: %v", checksum) + return 0, "", 0, s3err.GetInvalidTrailingChecksumHeaderErr(trailer) + } + // parse the trailing signature - trailerSigPrefix, err := readAndTrim(rdr, ':') + trailerSigPrefix, err := readBytes(rdr, 24) if err != nil { debuglogger.Logf("failed to read trailing signature prefix: %v", err) return cr.handleRdrErr(err, header) @@ -453,37 +458,37 @@ func (cr *ChunkReader) parseChunkHeaderBytes(header []byte) (int64, string, int, return 0, "", 0, s3err.GetAPIError(s3err.ErrIncompleteBody) } - trailerSig, err := readAndTrim(rdr, '\r') + trailerSig, err := readBytes(rdr, 64) if err != nil { debuglogger.Logf("failed to read trailing signature: %v", err) return cr.handleRdrErr(err, header) } + err = readAndSkip(rdr, delimiter...) + if err != nil { + debuglogger.Logf("failed to read '\\r\\n' after last chunk signature") + return cr.handleRdrErr(err, header) + } + cr.trailerSig = trailerSig cr.parsedChecksum = checksum } // "\r\n\r\n" is followed after the last chunk - err = readAndSkip(rdr, '\n', '\r', '\n') + err = readAndSkip(rdr, delimiter...) if err != nil { - debuglogger.Logf("failed to read \\n\\r\\n at the end of chunk header: %v", err) + debuglogger.Logf("failed to read \\r\\n at the end of chunk header: %v", err) return cr.handleRdrErr(err, header) } return 0, sig, 0, nil } - err = readAndSkip(rdr, '\n') - if err != nil { - debuglogger.Logf("failed to read \\n at the end of chunk header: %v", err) - return cr.handleRdrErr(err, header) - } - // find the index of chunk ending: '\r\n' // skip the first 2 bytes as it is the starting '\r\n' // the first chunk doesn't contain the starting '\r\n', but // anyway, trimming the first 2 bytes doesn't pollute the logic. - ind := bytes.Index(header[2:], []byte{'\r', '\n'}) + ind := bytes.Index(header[2:], delimiter) cr.isFirstHeader = false // the offset is the found index + 4 - the stash length @@ -570,19 +575,18 @@ func (cr *ChunkReader) Checksum() string { } // reads data from the "rdr" and validates the passed data bytes -func readAndSkip(rdr *bufio.Reader, data ...byte) error { - for _, d := range data { - b, err := rdr.ReadByte() - if err != nil { - return err - } - - if b != d { - return s3err.GetAPIError(s3err.ErrIncompleteBody) - } +func readAndSkip(rdr *bufio.Reader, expected ...byte) error { + buf := make([]byte, len(expected)) + _, err := io.ReadFull(rdr, buf) + if err != nil { + return err } - return nil + if bytes.Equal(buf, expected) { + return nil + } + + return s3err.GetAPIError(s3err.ErrIncompleteBody) } // reads string by "delim" and trims the delimiter at the end @@ -594,3 +598,10 @@ func readAndTrim(r *bufio.Reader, delim byte) (string, error) { return strings.TrimSuffix(str, string(delim)), nil } + +func readBytes(r *bufio.Reader, count int) (string, error) { + buf := make([]byte, count) + _, err := io.ReadFull(r, buf) + + return string(buf), err +} diff --git a/tests/integration/group-tests.go b/tests/integration/group-tests.go index 91af97db..863a9bbd 100644 --- a/tests/integration/group-tests.go +++ b/tests/integration/group-tests.go @@ -1114,6 +1114,14 @@ func TestUnsignedStreaminPayloadTrailer(ts *TestState) { } } +func TestSignedStreaminPayload(ts *TestState) { + if !ts.conf.azureTests { + ts.Run(SignedStreamingPayload_invalid_encoding) + ts.Run(SignedStreamingPayload_invalid_chunk_size) + ts.Run(SignedStreamingPayload_decoded_content_length_mismatch) + } +} + type IntTest func(s3 *S3Conf) error type IntTests map[string]IntTest @@ -1767,5 +1775,8 @@ func GetIntTests() IntTests { "UnsignedStreamingPayloadTrailer_UploadPart_trailer_and_mp_algo_mismatch": UnsignedStreamingPayloadTrailer_UploadPart_trailer_and_mp_algo_mismatch, "UnsignedStreamingPayloadTrailer_UploadPart_success_with_trailer": UnsignedStreamingPayloadTrailer_UploadPart_success_with_trailer, "UnsignedStreamingPayloadTrailer_not_allowed": UnsignedStreamingPayloadTrailer_not_allowed, + "SignedStreamingPayload_invalid_encoding": SignedStreamingPayload_invalid_encoding, + "SignedStreamingPayload_invalid_chunk_size": SignedStreamingPayload_invalid_chunk_size, + "SignedStreamingPayload_decoded_content_length_mismatch": SignedStreamingPayload_decoded_content_length_mismatch, } } diff --git a/tests/integration/signed-streaming-payload.go b/tests/integration/signed-streaming-payload.go new file mode 100644 index 00000000..4e461efc --- /dev/null +++ b/tests/integration/signed-streaming-payload.go @@ -0,0 +1,124 @@ +// Copyright 2023 Versity Software +// This file is licensed under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integration + +import ( + "bytes" + "fmt" + + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/versity/versitygw/s3err" +) + +func SignedStreamingPayload_invalid_encoding(s *S3Conf) error { + testName := "SignedStreamingPayload_invalid_encoding" + return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { + object := "object" + for i, test := range []struct { + from int + to int + buffer []byte + }{ + {0, 2, []byte{'j'}}, // invalid chunk size + // missing/invalid delimiters + {83, 85, nil}, + {83, 85, []byte("dd")}, + {103, 105, nil}, + {103, 105, []byte("something invalid")}, + // invalid trailing delimiter + {187, 191, []byte("bbbb")}, + // only last character changed + {190, 191, []byte("s")}, + // invalid chunksize delimiter (;) + {2, 3, []byte(":")}, + // missing chunk-signature + {3, 19, nil}, + // short signature + {19, 24, nil}, + } { + _, apiErr, err := testSignedStreamingObjectPut(s, bucket, object, []byte("dummy data paylaod"), withModifyPayload(test.from, test.to, test.buffer)) + if err != nil { + return fmt.Errorf("test %v failed: %w", i+1, err) + } + + if err := compareS3ApiError(s3err.GetAPIError(s3err.ErrIncompleteBody), apiErr); err != nil { + return fmt.Errorf("test %v failed: %w", i+1, err) + } + } + + return nil + }) +} + +func SignedStreamingPayload_invalid_chunk_size(s *S3Conf) error { + testName := "SignedStreamingPayload_invalid_chunk_size" + return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { + object := "my-object" + for i, test := range []struct { + chunkSize int64 + payload []byte + expectErr bool + }{ + {10, bytes.Repeat([]byte{'b'}, 100), true}, + {1000, bytes.Repeat([]byte{'a'}, 200), false}, + {8192, bytes.Repeat([]byte{'c'}, 10000), false}, + {1000, bytes.Repeat([]byte{'c'}, 1024*64), true}, + } { + _, apiErr, err := testSignedStreamingObjectPut(s, bucket, object, test.payload, withChunkSize(test.chunkSize)) + if err != nil { + return fmt.Errorf("test %v failed: %w", i+1, err) + } + + if !test.expectErr && apiErr != nil { + return fmt.Errorf("test %v failed: expected no error, instead got: (%s) %s", i+1, apiErr.Code, apiErr.Message) + } + + if test.expectErr { + if err := compareS3ApiError(s3err.GetAPIError(s3err.ErrInvalidChunkSize), apiErr); err != nil { + return fmt.Errorf("test %v failed: %w", i+1, err) + } + } + } + + return nil + }) +} + +func SignedStreamingPayload_decoded_content_length_mismatch(s *S3Conf) error { + testName := "SignedStreamingPayload_decoded_content_length_mismatch" + return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { + object := "my-object" + for i, test := range []struct { + cLength int64 + payload []byte + }{ + {10, bytes.Repeat([]byte{'a'}, 8)}, + {10, bytes.Repeat([]byte{'a'}, 12)}, + } { + _, apiErr, err := testSignedStreamingObjectPut(s, bucket, object, test.payload, withCustomHeaders(map[string]string{ + "x-amz-decoded-content-length": fmt.Sprint(test.cLength), + })) + if err != nil { + return fmt.Errorf("test %v failed: %w", i+1, err) + } + + if err := compareS3ApiError(s3err.GetAPIError(s3err.ErrContentLengthMismatch), apiErr); err != nil { + return fmt.Errorf("test %v failed: %w", i+1, err) + } + } + + return nil + }) +} diff --git a/tests/integration/utils.go b/tests/integration/utils.go index c6a710e3..10342d42 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -2118,3 +2118,264 @@ func constructUnsignedPaylod(chunkSizes ...int64) (int64, []byte, error) { return cLength, buffer.Bytes(), nil } + +type signedReqCfg struct { + headers map[string]string + chunkSize int64 + modifFrom *int + modifTo *int + modifPayload []byte +} + +type signedReqOpt func(*signedReqCfg) + +func withCustomHeaders(h map[string]string) signedReqOpt { + return func(src *signedReqCfg) { src.headers = h } +} + +func withChunkSize(s int64) signedReqOpt { + return func(src *signedReqCfg) { src.chunkSize = s } +} + +func withModifyPayload(from int, to int, p []byte) signedReqOpt { + return func(src *signedReqCfg) { + src.modifPayload = p + src.modifFrom = &from + src.modifTo = &to + } +} + +func testSignedStreamingObjectPut(s *S3Conf, bucket, object string, payload []byte, opts ...signedReqOpt) (map[string]string, *s3err.APIErrorResponse, error) { + cfg := &signedReqCfg{ + chunkSize: 8192, // minimal valid chunk size + } + + for _, opt := range opts { + opt(cfg) + } + + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + // create a request with no body + req, err := http.NewRequestWithContext(ctx, http.MethodPut, fmt.Sprintf("%s/%s/%s", s.endpoint, bucket, object), nil) + if err != nil { + return nil, nil, cancelAndError(fmt.Errorf("failed to create a request: %w", err), cancel) + } + + var payloadOffset int64 + + // any planned modification which is going to affect the + // Content-Length header value + if cfg.modifFrom != nil && cfg.modifTo != nil { + diff := len(cfg.modifPayload) - *cfg.modifTo + *cfg.modifFrom + payloadOffset = int64(diff) + } + // precalculated the Content-Length header to correctly sign the request + req.ContentLength = calculateSignedReqContentLength(int64(len(payload)), cfg.chunkSize, payloadOffset) + req.Header.Set("x-amz-content-sha256", "STREAMING-AWS4-HMAC-SHA256-PAYLOAD") + req.Header.Set("x-amz-decoded-content-length", fmt.Sprint(len(payload))) + + // set custom request headers + for key, val := range cfg.headers { + req.Header.Set(key, val) + } + + signer := v4.NewSigner() + signingTime := time.Now() + + // sign the request + err = signer.SignHTTP(ctx, aws.Credentials{AccessKeyID: s.awsID, SecretAccessKey: s.awsSecret}, req, "STREAMING-AWS4-HMAC-SHA256-PAYLOAD", "s3", s.awsRegion, signingTime) + if err != nil { + return nil, nil, cancelAndError(fmt.Errorf("failed to sign the request: %w", err), cancel) + } + + // extract the seed signature + seedSignature, err := extractSignature(req) + if err != nil { + return nil, nil, cancelAndError(fmt.Errorf("failed to extract seed signature: %w", err), cancel) + } + + // initialize v4 stream signed + streamSigner := v4.NewStreamSigner(aws.Credentials{AccessKeyID: s.awsID, SecretAccessKey: s.awsSecret}, "s3", s.awsRegion, seedSignature) + // create the signed payload + body, err := constructSignedStreamingPayload(ctx, streamSigner, signingTime, payload, cfg.chunkSize) + if err != nil { + return nil, nil, cancelAndError(fmt.Errorf("failed to encode req body: %w", err), cancel) + } + + // overwrite body bytes by configuration + if cfg.modifFrom != nil && cfg.modifTo != nil { + body, err = replaceRange(body, cfg.modifPayload, *cfg.modifFrom, *cfg.modifTo) + if err != nil { + return nil, nil, cancelAndError(fmt.Errorf("failed replace body bytes: %w", err), cancel) + } + } + + // assign req.Body and req.GetBody for the http client + // to handle the request + req.Body = io.NopCloser(bytes.NewReader(body)) + req.GetBody = func() (io.ReadCloser, error) { + return io.NopCloser(bytes.NewReader(body)), nil + } + + // send the request + resp, err := s.httpClient.Do(req) + cancel() + if err != nil { + return nil, nil, fmt.Errorf("failed to send the request: %w", err) + } + + if resp.StatusCode >= 300 { + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + return nil, nil, fmt.Errorf("failed to read the response body: %w", err) + } + + var errResp s3err.APIErrorResponse + err = xml.Unmarshal(bodyBytes, &errResp) + if err != nil { + return nil, nil, fmt.Errorf("failed to unmarshal response body: %w", err) + } + return nil, &errResp, nil + } + + headers := map[string]string{} + for key, val := range resp.Header { + headers[strings.ToLower(key)] = val[0] + } + + return headers, nil, nil +} + +func cancelAndError(err error, cancel context.CancelFunc) error { + cancel() + return err +} + +const ( + chunkSigHdrLength int64 = 81 +) + +// calculateSignedReqContentLength calculates the value of `Content-Length` header +// sizeOffset marks any planned changes on the body, which will affect the size +func calculateSignedReqContentLength(decPayloadSize int64, chunkSize int64, sizeOffset int64) int64 { + payloadSize := decPayloadSize + var chunkHeadersLength int64 + + // special case when chunk size is greater or equal than decoded content length + if chunkSize >= decPayloadSize { + chSizeLgth := len(fmt.Sprintf("%x", decPayloadSize)) + return decPayloadSize + sizeOffset + int64(chSizeLgth) + 2*chunkSigHdrLength + 9 + } + + for { + if payloadSize == 0 { + chunkHeadersLength += chunkSigHdrLength + 5 + break + } + if payloadSize < chunkSize { + chunkHeadersLength += 2*chunkSigHdrLength + 9 + int64(len(fmt.Sprintf("%x", payloadSize))) + break + } + chSizeLgth := len(fmt.Sprintf("%x", chunkSize)) + chunkHeadersLength += int64(chSizeLgth) + chunkSigHdrLength + 4 + + payloadSize -= chunkSize + } + + return chunkHeadersLength + decPayloadSize + sizeOffset +} + +// constructSignedStreamingPayload creates chunk encoded payload with signatures. +func constructSignedStreamingPayload(ctx context.Context, signer *v4.StreamSigner, signingTime time.Time, payload []byte, chunkSize int64) ([]byte, error) { + buf := bytes.NewBuffer(nil) + payloadLen := int64(len(payload)) + + if chunkSize > payloadLen { + chunkSize = payloadLen + } + + for i := int64(0); i < payloadLen; i += chunkSize { + if i+chunkSize > payloadLen { + offset := payloadLen - i + sig, err := signer.GetSignature(ctx, nil, payload[i:i+offset], signingTime) + if err != nil { + return nil, err + } + + _, err = buf.WriteString(fmt.Sprintf("%x;chunk-signature=%x\r\n%s\r\n", offset, sig, payload[i:i+offset])) + if err != nil { + return nil, err + } + break + } + + sig, err := signer.GetSignature(ctx, nil, payload[i:i+chunkSize], signingTime) + if err != nil { + return nil, err + } + + _, err = buf.WriteString(fmt.Sprintf("%x;chunk-signature=%x\r\n%s\r\n", chunkSize, sig, payload[i:i+chunkSize])) + if err != nil { + return nil, err + } + } + + sig, err := signer.GetSignature(ctx, nil, nil, signingTime) + if err != nil { + return nil, err + } + + _, err = buf.WriteString(fmt.Sprintf("0;chunk-signature=%x\r\n\r\n", sig)) + if err != nil { + return nil, err + } + + return buf.Bytes(), nil +} + +// extractSignature extracts the signature from Authorization header +func extractSignature(req *http.Request) ([]byte, error) { + const key = "Signature=" + + authHdr := req.Header.Get("Authorization") + + i := strings.Index(authHdr, key) + if i == -1 { + return nil, errors.New("signature not found") + } + + sig := authHdr[i+len(key):] + + return hex.DecodeString(sig) +} + +// replaceRange replaces dst[start:end] with src and returns the modified slice. +// Used for custom overwrite of request payload bytes. +func replaceRange(dst, src []byte, start, end int) ([]byte, error) { + if start < 0 || end < start || end > len(dst) { + return nil, fmt.Errorf("invalid start/end indexes") + } + + newLen := len(dst) - (end - start) + len(src) + + // Fast path: reuse dst capacity if possible + if cap(dst) >= newLen { + // Extend or shrink dst + dst = dst[:newLen] + + // Move the tail if sizes differ + copy(dst[start+len(src):], dst[end:]) + + // Copy replacement + copy(dst[start:], src) + return dst, nil + } + + // Fallback: allocate new slice + out := make([]byte, newLen) + copy(out, dst[:start]) + copy(out[start:], src) + copy(out[start+len(src):], dst[end:]) + return out, nil +}