feat: adds integration tests for STREAMING-AWS4-HMAC-SHA256-PAYLOAD requests

This commit is contained in:
niksis02
2025-12-23 02:31:27 +04:00
parent 9eaaeedd28
commit 807399459d
4 changed files with 445 additions and 38 deletions

View File

@@ -43,7 +43,7 @@ const (
awsV4 = "AWS4"
awsS3Service = "s3"
awsV4Request = "aws4_request"
trailerSignatureHeader = "x-amz-trailer-signature"
trailerSignatureHeader = "x-amz-trailer-signature:"
streamPayloadAlgo = "AWS4-HMAC-SHA256-PAYLOAD"
streamPayloadTrailerAlgo = "AWS4-HMAC-SHA256-TRAILER"
@@ -52,6 +52,7 @@ const (
var (
errskipHeader = errors.New("skip to next header")
delimiter = []byte{'\r', '\n'}
)
// ChunkReader reads from chunked upload request body, and returns
@@ -134,12 +135,15 @@ func (cr *ChunkReader) Read(p []byte) (int, error) {
}
}
n, err := cr.parseAndRemoveChunkInfo(p[chunkSize:n])
if err != nil && err != io.EOF {
return 0, err
}
n += int(chunkSize)
cr.dataRead += int64(n)
if cr.isEOF {
if cr.cLength != cr.dataRead {
debuglogger.Logf("number of bytes expected: (%v), number of bytes read: (%v)", cr.cLength, cr.dataRead)
return n, s3err.GetAPIError(s3err.ErrContentLengthMismatch)
return 0, s3err.GetAPIError(s3err.ErrContentLengthMismatch)
}
}
return n, err
@@ -154,7 +158,7 @@ func (cr *ChunkReader) Read(p []byte) (int, error) {
if cr.isEOF {
if cr.cLength != cr.dataRead {
debuglogger.Logf("number of bytes expected: (%v), number of bytes read: (%v)", cr.cLength, cr.dataRead)
return n, s3err.GetAPIError(s3err.ErrContentLengthMismatch)
return 0, s3err.GetAPIError(s3err.ErrContentLengthMismatch)
}
}
return n, err
@@ -378,7 +382,7 @@ func (cr *ChunkReader) parseChunkHeaderBytes(header []byte) (int64, string, int,
// After the first chunk each chunk header should start
// with "\n\r\n"
if !cr.isFirstHeader {
err := readAndSkip(rdr, '\r', '\n')
err := readAndSkip(rdr, delimiter...)
if err != nil {
debuglogger.Logf("failed to read chunk header first 2 bytes: (should be): \\r\\n, (got): %q", header[:min(2, len(header))])
return cr.handleRdrErr(err, header)
@@ -391,25 +395,26 @@ func (cr *ChunkReader) parseChunkHeaderBytes(header []byte) (int64, string, int,
}
// read the chunk signature
err = readAndSkip(rdr, 'c', 'h', 'u', 'n', 'k', '-', 's', 'i', 'g', 'n', 'a', 't', 'u', 'r', 'e', '=')
err = readAndSkip(rdr, []byte("chunk-signature=")...)
if err != nil {
debuglogger.Logf("failed to read 'chunk-signature=': %v", err)
return cr.handleRdrErr(err, header)
}
sig, err := readAndTrim(rdr, '\r')
sig, err := readBytes(rdr, 64)
if err != nil {
debuglogger.Logf("failed to read '\\r', after chunk signature: %v", err)
return cr.handleRdrErr(err, header)
}
err = readAndSkip(rdr, delimiter...)
if err != nil {
debuglogger.Logf("failed to read '\\r\\n' after chunk signature")
return cr.handleRdrErr(err, header)
}
// read and parse the final chunk trailer and checksum
if chunkSize == 0 {
if cr.requireTrailer {
err = readAndSkip(rdr, '\n')
if err != nil {
debuglogger.Logf("failed to read \\n before the trailer: %v", err)
return cr.handleRdrErr(err, header)
}
// parse and validate the trailing header
trailer, err := readAndTrim(rdr, ':')
if err != nil {
@@ -430,19 +435,19 @@ func (cr *ChunkReader) parseChunkHeaderBytes(header []byte) (int64, string, int,
return cr.handleRdrErr(err, header)
}
if !IsValidChecksum(checksum, algo) {
debuglogger.Logf("invalid checksum value: %v", checksum)
return 0, "", 0, s3err.GetInvalidTrailingChecksumHeaderErr(trailer)
}
err = readAndSkip(rdr, '\n')
if err != nil {
debuglogger.Logf("failed to read \\n after checksum: %v", err)
return cr.handleRdrErr(err, header)
}
if !IsValidChecksum(checksum, algo) {
debuglogger.Logf("invalid checksum value: %v", checksum)
return 0, "", 0, s3err.GetInvalidTrailingChecksumHeaderErr(trailer)
}
// parse the trailing signature
trailerSigPrefix, err := readAndTrim(rdr, ':')
trailerSigPrefix, err := readBytes(rdr, 24)
if err != nil {
debuglogger.Logf("failed to read trailing signature prefix: %v", err)
return cr.handleRdrErr(err, header)
@@ -453,37 +458,37 @@ func (cr *ChunkReader) parseChunkHeaderBytes(header []byte) (int64, string, int,
return 0, "", 0, s3err.GetAPIError(s3err.ErrIncompleteBody)
}
trailerSig, err := readAndTrim(rdr, '\r')
trailerSig, err := readBytes(rdr, 64)
if err != nil {
debuglogger.Logf("failed to read trailing signature: %v", err)
return cr.handleRdrErr(err, header)
}
err = readAndSkip(rdr, delimiter...)
if err != nil {
debuglogger.Logf("failed to read '\\r\\n' after last chunk signature")
return cr.handleRdrErr(err, header)
}
cr.trailerSig = trailerSig
cr.parsedChecksum = checksum
}
// "\r\n\r\n" is followed after the last chunk
err = readAndSkip(rdr, '\n', '\r', '\n')
err = readAndSkip(rdr, delimiter...)
if err != nil {
debuglogger.Logf("failed to read \\n\\r\\n at the end of chunk header: %v", err)
debuglogger.Logf("failed to read \\r\\n at the end of chunk header: %v", err)
return cr.handleRdrErr(err, header)
}
return 0, sig, 0, nil
}
err = readAndSkip(rdr, '\n')
if err != nil {
debuglogger.Logf("failed to read \\n at the end of chunk header: %v", err)
return cr.handleRdrErr(err, header)
}
// find the index of chunk ending: '\r\n'
// skip the first 2 bytes as it is the starting '\r\n'
// the first chunk doesn't contain the starting '\r\n', but
// anyway, trimming the first 2 bytes doesn't pollute the logic.
ind := bytes.Index(header[2:], []byte{'\r', '\n'})
ind := bytes.Index(header[2:], delimiter)
cr.isFirstHeader = false
// the offset is the found index + 4 - the stash length
@@ -570,19 +575,18 @@ func (cr *ChunkReader) Checksum() string {
}
// reads data from the "rdr" and validates the passed data bytes
func readAndSkip(rdr *bufio.Reader, data ...byte) error {
for _, d := range data {
b, err := rdr.ReadByte()
if err != nil {
return err
}
if b != d {
return s3err.GetAPIError(s3err.ErrIncompleteBody)
}
func readAndSkip(rdr *bufio.Reader, expected ...byte) error {
buf := make([]byte, len(expected))
_, err := io.ReadFull(rdr, buf)
if err != nil {
return err
}
return nil
if bytes.Equal(buf, expected) {
return nil
}
return s3err.GetAPIError(s3err.ErrIncompleteBody)
}
// reads string by "delim" and trims the delimiter at the end
@@ -594,3 +598,10 @@ func readAndTrim(r *bufio.Reader, delim byte) (string, error) {
return strings.TrimSuffix(str, string(delim)), nil
}
func readBytes(r *bufio.Reader, count int) (string, error) {
buf := make([]byte, count)
_, err := io.ReadFull(r, buf)
return string(buf), err
}

View File

@@ -1114,6 +1114,14 @@ func TestUnsignedStreaminPayloadTrailer(ts *TestState) {
}
}
func TestSignedStreaminPayload(ts *TestState) {
if !ts.conf.azureTests {
ts.Run(SignedStreamingPayload_invalid_encoding)
ts.Run(SignedStreamingPayload_invalid_chunk_size)
ts.Run(SignedStreamingPayload_decoded_content_length_mismatch)
}
}
type IntTest func(s3 *S3Conf) error
type IntTests map[string]IntTest
@@ -1767,5 +1775,8 @@ func GetIntTests() IntTests {
"UnsignedStreamingPayloadTrailer_UploadPart_trailer_and_mp_algo_mismatch": UnsignedStreamingPayloadTrailer_UploadPart_trailer_and_mp_algo_mismatch,
"UnsignedStreamingPayloadTrailer_UploadPart_success_with_trailer": UnsignedStreamingPayloadTrailer_UploadPart_success_with_trailer,
"UnsignedStreamingPayloadTrailer_not_allowed": UnsignedStreamingPayloadTrailer_not_allowed,
"SignedStreamingPayload_invalid_encoding": SignedStreamingPayload_invalid_encoding,
"SignedStreamingPayload_invalid_chunk_size": SignedStreamingPayload_invalid_chunk_size,
"SignedStreamingPayload_decoded_content_length_mismatch": SignedStreamingPayload_decoded_content_length_mismatch,
}
}

View File

@@ -0,0 +1,124 @@
// Copyright 2023 Versity Software
// This file is licensed under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package integration
import (
"bytes"
"fmt"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/versity/versitygw/s3err"
)
// SignedStreamingPayload_invalid_encoding verifies that corrupting various
// byte ranges of a signed streaming (aws-chunked) request body makes the
// server reject the upload with an IncompleteBody error.
func SignedStreamingPayload_invalid_encoding(s *S3Conf) error {
	testName := "SignedStreamingPayload_invalid_encoding"
	return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
		object := "object"

		// each case overwrites body[from:to] with buffer (nil removes the range)
		type corruption struct {
			from   int
			to     int
			buffer []byte
		}
		cases := []corruption{
			{0, 2, []byte{'j'}}, // invalid chunk size
			// missing/invalid delimiters
			{83, 85, nil},
			{83, 85, []byte("dd")},
			{103, 105, nil},
			{103, 105, []byte("something invalid")},
			// invalid trailing delimiter
			{187, 191, []byte("bbbb")},
			// only last character changed
			{190, 191, []byte("s")},
			// invalid chunksize delimiter (;)
			{2, 3, []byte(":")},
			// missing chunk-signature
			{3, 19, nil},
			// short signature
			{19, 24, nil},
		}

		for i, tc := range cases {
			_, apiErr, err := testSignedStreamingObjectPut(s, bucket, object, []byte("dummy data paylaod"), withModifyPayload(tc.from, tc.to, tc.buffer))
			if err != nil {
				return fmt.Errorf("test %v failed: %w", i+1, err)
			}
			if err := compareS3ApiError(s3err.GetAPIError(s3err.ErrIncompleteBody), apiErr); err != nil {
				return fmt.Errorf("test %v failed: %w", i+1, err)
			}
		}

		return nil
	})
}
// SignedStreamingPayload_invalid_chunk_size verifies chunk-size validation for
// signed streaming uploads: undersized chunks are rejected with
// InvalidChunkSize, while valid sizes succeed.
func SignedStreamingPayload_invalid_chunk_size(s *S3Conf) error {
	testName := "SignedStreamingPayload_invalid_chunk_size"
	return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
		object := "my-object"

		type tcase struct {
			chunkSize int64
			payload   []byte
			expectErr bool
		}
		cases := []tcase{
			{10, bytes.Repeat([]byte{'b'}, 100), true},
			{1000, bytes.Repeat([]byte{'a'}, 200), false},
			{8192, bytes.Repeat([]byte{'c'}, 10000), false},
			{1000, bytes.Repeat([]byte{'c'}, 1024*64), true},
		}

		for i, tc := range cases {
			_, apiErr, err := testSignedStreamingObjectPut(s, bucket, object, tc.payload, withChunkSize(tc.chunkSize))
			if err != nil {
				return fmt.Errorf("test %v failed: %w", i+1, err)
			}
			if tc.expectErr {
				if err := compareS3ApiError(s3err.GetAPIError(s3err.ErrInvalidChunkSize), apiErr); err != nil {
					return fmt.Errorf("test %v failed: %w", i+1, err)
				}
				continue
			}
			if apiErr != nil {
				return fmt.Errorf("test %v failed: expected no error, instead got: (%s) %s", i+1, apiErr.Code, apiErr.Message)
			}
		}

		return nil
	})
}
// SignedStreamingPayload_decoded_content_length_mismatch verifies that a
// x-amz-decoded-content-length header that disagrees with the actual decoded
// payload size (both too small and too large) is rejected with
// ContentLengthMismatch.
func SignedStreamingPayload_decoded_content_length_mismatch(s *S3Conf) error {
	testName := "SignedStreamingPayload_decoded_content_length_mismatch"
	return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error {
		object := "my-object"

		type tcase struct {
			cLength int64
			payload []byte
		}
		cases := []tcase{
			{10, bytes.Repeat([]byte{'a'}, 8)},
			{10, bytes.Repeat([]byte{'a'}, 12)},
		}

		for i, tc := range cases {
			hdrs := map[string]string{
				"x-amz-decoded-content-length": fmt.Sprint(tc.cLength),
			}
			_, apiErr, err := testSignedStreamingObjectPut(s, bucket, object, tc.payload, withCustomHeaders(hdrs))
			if err != nil {
				return fmt.Errorf("test %v failed: %w", i+1, err)
			}
			if err := compareS3ApiError(s3err.GetAPIError(s3err.ErrContentLengthMismatch), apiErr); err != nil {
				return fmt.Errorf("test %v failed: %w", i+1, err)
			}
		}

		return nil
	})
}

View File

@@ -2118,3 +2118,264 @@ func constructUnsignedPaylod(chunkSizes ...int64) (int64, []byte, error) {
return cLength, buffer.Bytes(), nil
}
// signedReqCfg holds the tunable parts of a signed streaming PutObject request.
type signedReqCfg struct {
	headers      map[string]string // extra request headers set before signing
	chunkSize    int64             // chunk size used for the aws-chunked encoding
	modifFrom    *int              // start index of a planned body overwrite (nil: none)
	modifTo      *int              // end index (exclusive) of the planned overwrite
	modifPayload []byte            // bytes written over body[modifFrom:modifTo]
}

// signedReqOpt mutates a signedReqCfg; used as a functional option.
type signedReqOpt func(*signedReqCfg)

// withCustomHeaders sets extra headers to add to the request before signing.
func withCustomHeaders(hdrs map[string]string) signedReqOpt {
	return func(cfg *signedReqCfg) {
		cfg.headers = hdrs
	}
}

// withChunkSize overrides the default chunk size of the streamed body.
func withChunkSize(size int64) signedReqOpt {
	return func(cfg *signedReqCfg) {
		cfg.chunkSize = size
	}
}

// withModifyPayload schedules body[from:to] to be replaced with p after the
// body is built, to deliberately corrupt the encoded payload.
func withModifyPayload(from, to int, p []byte) signedReqOpt {
	return func(cfg *signedReqCfg) {
		cfg.modifPayload = p
		cfg.modifFrom = &from
		cfg.modifTo = &to
	}
}
// testSignedStreamingObjectPut manually builds, signs and sends a
// STREAMING-AWS4-HMAC-SHA256-PAYLOAD (aws-chunked) PutObject request.
//
// It returns the response headers (lower-cased keys) on a 2xx response,
// the parsed API error response on a non-2xx status, or a non-nil error
// when the request could not be built or sent.
func testSignedStreamingObjectPut(s *S3Conf, bucket, object string, payload []byte, opts ...signedReqOpt) (map[string]string, *s3err.APIErrorResponse, error) {
	cfg := &signedReqCfg{
		chunkSize: 8192, // minimal valid chunk size
	}
	for _, opt := range opts {
		opt(cfg)
	}

	ctx, cancel := context.WithTimeout(context.Background(), shortTimeout)
	// Keep the context alive until the response body has been fully read:
	// cancelling immediately after Do() can abort the body stream mid-read.
	defer cancel()

	// create a request with no body; the body is attached after signing
	req, err := http.NewRequestWithContext(ctx, http.MethodPut, fmt.Sprintf("%s/%s/%s", s.endpoint, bucket, object), nil)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to create a request: %w", err)
	}

	var payloadOffset int64
	// any planned modification which is going to affect the
	// Content-Length header value
	if cfg.modifFrom != nil && cfg.modifTo != nil {
		diff := len(cfg.modifPayload) - *cfg.modifTo + *cfg.modifFrom
		payloadOffset = int64(diff)
	}

	// precalculate the Content-Length header to correctly sign the request
	req.ContentLength = calculateSignedReqContentLength(int64(len(payload)), cfg.chunkSize, payloadOffset)
	req.Header.Set("x-amz-content-sha256", "STREAMING-AWS4-HMAC-SHA256-PAYLOAD")
	req.Header.Set("x-amz-decoded-content-length", fmt.Sprint(len(payload)))

	// set custom request headers
	for key, val := range cfg.headers {
		req.Header.Set(key, val)
	}

	signer := v4.NewSigner()
	signingTime := time.Now()

	// sign the request; the resulting seed signature chains the chunk signatures
	err = signer.SignHTTP(ctx, aws.Credentials{AccessKeyID: s.awsID, SecretAccessKey: s.awsSecret}, req, "STREAMING-AWS4-HMAC-SHA256-PAYLOAD", "s3", s.awsRegion, signingTime)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to sign the request: %w", err)
	}

	// extract the seed signature from the Authorization header
	seedSignature, err := extractSignature(req)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to extract seed signature: %w", err)
	}

	// initialize the v4 stream signer
	streamSigner := v4.NewStreamSigner(aws.Credentials{AccessKeyID: s.awsID, SecretAccessKey: s.awsSecret}, "s3", s.awsRegion, seedSignature)

	// create the signed, chunk-encoded payload
	body, err := constructSignedStreamingPayload(ctx, streamSigner, signingTime, payload, cfg.chunkSize)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to encode req body: %w", err)
	}

	// overwrite body bytes by configuration (deliberate corruption for tests)
	if cfg.modifFrom != nil && cfg.modifTo != nil {
		body, err = replaceRange(body, cfg.modifPayload, *cfg.modifFrom, *cfg.modifTo)
		if err != nil {
			return nil, nil, fmt.Errorf("failed replace body bytes: %w", err)
		}
	}

	// assign req.Body and req.GetBody for the http client
	// to handle the request (GetBody enables retries/redirects)
	req.Body = io.NopCloser(bytes.NewReader(body))
	req.GetBody = func() (io.ReadCloser, error) {
		return io.NopCloser(bytes.NewReader(body)), nil
	}

	// send the request
	resp, err := s.httpClient.Do(req)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to send the request: %w", err)
	}
	// always close the body so the underlying connection can be reused
	defer resp.Body.Close()

	if resp.StatusCode >= 300 {
		bodyBytes, err := io.ReadAll(resp.Body)
		if err != nil {
			return nil, nil, fmt.Errorf("failed to read the response body: %w", err)
		}

		var errResp s3err.APIErrorResponse
		err = xml.Unmarshal(bodyBytes, &errResp)
		if err != nil {
			return nil, nil, fmt.Errorf("failed to unmarshal response body: %w", err)
		}

		return nil, &errResp, nil
	}

	headers := map[string]string{}
	for key, val := range resp.Header {
		headers[strings.ToLower(key)] = val[0]
	}

	return headers, nil, nil
}
// cancelAndError invokes cancel to release the context's resources and
// returns err unchanged. Convenience helper for one-line early returns
// during request setup.
func cancelAndError(err error, cancel context.CancelFunc) error {
	cancel()
	return err
}
const (
	// chunkSigHdrLength is the fixed per-chunk signature overhead:
	// len(";chunk-signature=") == 17 plus the 64-char hex signature == 81.
	// It excludes the hex chunk-size prefix and the CRLF delimiters.
	chunkSigHdrLength int64 = 81
)

// calculateSignedReqContentLength calculates the value of the `Content-Length`
// header for a chunk-encoded (aws-chunked) signed payload.
// sizeOffset accounts for any planned body modification that changes the size.
//
// Per-chunk overhead is: hex(chunk size) + chunkSigHdrLength + "\r\n" twice
// (after the signature and after the data) == hexLen + 85. The terminating
// zero-length chunk is "0" + chunkSigHdrLength + "\r\n\r\n" == 86.
func calculateSignedReqContentLength(decPayloadSize, chunkSize, sizeOffset int64) int64 {
	hexLen := func(n int64) int64 {
		return int64(len(fmt.Sprintf("%x", n)))
	}

	// single-chunk case: the whole payload fits into one chunk,
	// so the body is one data chunk plus the terminating zero chunk
	if chunkSize >= decPayloadSize {
		return decPayloadSize + sizeOffset + hexLen(decPayloadSize) + 2*chunkSigHdrLength + 9
	}

	var headersLen int64
	remaining := decPayloadSize

	// full-size chunks
	for remaining >= chunkSize {
		headersLen += hexLen(chunkSize) + chunkSigHdrLength + 4
		remaining -= chunkSize
	}

	if remaining == 0 {
		// only the terminating zero chunk is left
		headersLen += chunkSigHdrLength + 5
	} else {
		// trailing partial chunk plus the terminating zero chunk
		headersLen += 2*chunkSigHdrLength + 9 + hexLen(remaining)
	}

	return headersLen + decPayloadSize + sizeOffset
}
// constructSignedStreamingPayload builds the aws-chunked request body:
// each chunk is "<hex size>;chunk-signature=<sig>\r\n<data>\r\n", where the
// signatures are chained by the stream signer, followed by a terminating
// zero-length chunk "0;chunk-signature=<sig>\r\n\r\n".
func constructSignedStreamingPayload(ctx context.Context, signer *v4.StreamSigner, signingTime time.Time, payload []byte, chunkSize int64) ([]byte, error) {
	var out bytes.Buffer
	total := int64(len(payload))

	// never exceed the payload length with a single chunk
	if chunkSize > total {
		chunkSize = total
	}

	for start := int64(0); start < total; start += chunkSize {
		end := start + chunkSize
		if end > total {
			// trailing partial chunk
			end = total
		}
		chunk := payload[start:end]

		// chunk signatures must be computed in order: each one chains
		// off the previous signature inside the stream signer
		sig, err := signer.GetSignature(ctx, nil, chunk, signingTime)
		if err != nil {
			return nil, err
		}
		if _, err := fmt.Fprintf(&out, "%x;chunk-signature=%x\r\n%s\r\n", int64(len(chunk)), sig, chunk); err != nil {
			return nil, err
		}
	}

	// terminating zero-length chunk
	finalSig, err := signer.GetSignature(ctx, nil, nil, signingTime)
	if err != nil {
		return nil, err
	}
	if _, err := fmt.Fprintf(&out, "0;chunk-signature=%x\r\n\r\n", finalSig); err != nil {
		return nil, err
	}

	return out.Bytes(), nil
}
// extractSignature extracts the signature from Authorization header
func extractSignature(req *http.Request) ([]byte, error) {
const key = "Signature="
authHdr := req.Header.Get("Authorization")
i := strings.Index(authHdr, key)
if i == -1 {
return nil, errors.New("signature not found")
}
sig := authHdr[i+len(key):]
return hex.DecodeString(sig)
}
// replaceRange replaces dst[start:end] with src and returns the modified slice.
// Used for custom overwrite of request payload bytes.
//
// Bug fix: the previous fast path re-sliced dst to newLen BEFORE moving the
// tail, which panicked when shrinking past `end` (end > newLen) and silently
// dropped tail bytes on any shrinking replacement. The tail must be moved
// relative to the original length first.
func replaceRange(dst, src []byte, start, end int) ([]byte, error) {
	if start < 0 || end < start || end > len(dst) {
		return nil, fmt.Errorf("invalid start/end indexes")
	}

	oldLen := len(dst)
	newLen := oldLen - (end - start) + len(src)

	// Fast path: reuse dst's backing array when it is large enough.
	if cap(dst) >= newLen {
		// Grow first so the tail's destination region is addressable.
		if newLen > oldLen {
			dst = dst[:newLen]
		}
		// Move the tail from its original position; the built-in copy
		// handles overlapping src/dst like memmove.
		copy(dst[start+len(src):], dst[end:oldLen])
		// Write the replacement bytes.
		copy(dst[start:], src)
		return dst[:newLen], nil
	}

	// Fallback: allocate a new slice and stitch the three pieces together.
	out := make([]byte, newLen)
	copy(out, dst[:start])
	copy(out[start:], src)
	copy(out[start+len(src):], dst[end:])
	return out, nil
}