diff --git a/pkg/hold/pds/xrpc_test.go b/pkg/hold/pds/xrpc_test.go index 6da962b..d4e798c 100644 --- a/pkg/hold/pds/xrpc_test.go +++ b/pkg/hold/pds/xrpc_test.go @@ -220,6 +220,27 @@ func decodeFirehoseMessage(t *testing.T, message []byte) (*events.EventHeader, * return &header, &commit } +// skipAccountEvent reads and discards the initial #account event that the +// EventBroadcaster sends to every new subscriber. +func skipAccountEvent(t *testing.T, conn *websocket.Conn) { + t.Helper() + msgType, message, err := conn.ReadMessage() + if err != nil { + t.Fatalf("Failed to read account event: %v", err) + } + if msgType != websocket.BinaryMessage { + t.Fatalf("Expected binary message for account event, got type %d", msgType) + } + reader := bytes.NewReader(message) + var header events.EventHeader + if err := header.UnmarshalCBOR(reader); err != nil { + t.Fatalf("Failed to decode account event header: %v", err) + } + if header.MsgType != "#account" { + t.Fatalf("Expected #account event, got %s", header.MsgType) + } +} + // assertCARResponse validates CAR file response func assertCARResponse(t *testing.T, w *httptest.ResponseRecorder, expectedCode int) []byte { t.Helper() @@ -2851,6 +2872,9 @@ func TestHandleSubscribeRepos(t *testing.T) { } defer conn.Close() + // Skip the initial #account event + skipAccountEvent(t, conn) + // Should receive the 3 historical events for i := 0; i < 3; i++ { messageType, message, err := conn.ReadMessage() @@ -2890,6 +2914,9 @@ func TestHandleSubscribeRepos(t *testing.T) { } defer conn.Close() + // Skip the initial #account event + skipAccountEvent(t, conn) + // Should only receive event 3 (after cursor=2) messageType, message, err := conn.ReadMessage() if err != nil { @@ -2926,6 +2953,9 @@ func TestHandleSubscribeRepos(t *testing.T) { } defer conn.Close() + // Skip the initial #account event + skipAccountEvent(t, conn) + // Verify no historical events by broadcasting immediately and checking // that we only receive the new event (not historical ones) // Give subscriber time to register first @@ -2978,6 +3008,9 @@ func TestHandleSubscribeRepos(t *testing.T) { } defer conn.Close() + // Skip the initial #account event + skipAccountEvent(t, conn) + // Read and discard the 4 historical events (seq 1-4) for i := 0; i < 4; i++ { _, _, err := conn.ReadMessage() diff --git a/pkg/s3/types.go b/pkg/s3/types.go index 20f0c5b..19da9cd 100644 --- a/pkg/s3/types.go +++ b/pkg/s3/types.go @@ -192,6 +192,11 @@ func NewS3Service(params map[string]any) (*S3Service, error) { o.BaseEndpoint = aws.String(endpoint) o.UsePathStyle = true } + // Disable automatic CRC32 checksum calculation on uploads. + // SDK v2 v1.71.0+ adds checksums by default using chunked encoding + // with trailers, which causes XAmzContentSHA256Mismatch errors on + // S3-compatible services that don't support this. + o.RequestChecksumCalculation = aws.RequestChecksumCalculationWhenRequired }) var s3PathPrefix string