more s3 fixes

This commit is contained in:
Evan Jarrett
2026-02-14 22:23:07 -06:00
parent f307d6ea85
commit 0d723cb708
2 changed files with 38 additions and 0 deletions

View File

@@ -220,6 +220,27 @@ func decodeFirehoseMessage(t *testing.T, message []byte) (*events.EventHeader, *
return &header, &commit
}
// skipAccountEvent reads and discards the initial #account event that the
// EventBroadcaster sends to every new subscriber.
func skipAccountEvent(t *testing.T, conn *websocket.Conn) {
t.Helper()
msgType, message, err := conn.ReadMessage()
if err != nil {
t.Fatalf("Failed to read account event: %v", err)
}
if msgType != websocket.BinaryMessage {
t.Fatalf("Expected binary message for account event, got type %d", msgType)
}
reader := bytes.NewReader(message)
var header events.EventHeader
if err := header.UnmarshalCBOR(reader); err != nil {
t.Fatalf("Failed to decode account event header: %v", err)
}
if header.MsgType != "#account" {
t.Fatalf("Expected #account event, got %s", header.MsgType)
}
}
// assertCARResponse validates CAR file response
func assertCARResponse(t *testing.T, w *httptest.ResponseRecorder, expectedCode int) []byte {
t.Helper()
@@ -2851,6 +2872,9 @@ func TestHandleSubscribeRepos(t *testing.T) {
}
defer conn.Close()
// Skip the initial #account event
skipAccountEvent(t, conn)
// Should receive the 3 historical events
for i := 0; i < 3; i++ {
messageType, message, err := conn.ReadMessage()
@@ -2890,6 +2914,9 @@ func TestHandleSubscribeRepos(t *testing.T) {
}
defer conn.Close()
// Skip the initial #account event
skipAccountEvent(t, conn)
// Should only receive event 3 (after cursor=2)
messageType, message, err := conn.ReadMessage()
if err != nil {
@@ -2926,6 +2953,9 @@ func TestHandleSubscribeRepos(t *testing.T) {
}
defer conn.Close()
// Skip the initial #account event
skipAccountEvent(t, conn)
// Verify no historical events by broadcasting immediately and checking
// that we only receive the new event (not historical ones)
// Give subscriber time to register first
@@ -2978,6 +3008,9 @@ func TestHandleSubscribeRepos(t *testing.T) {
}
defer conn.Close()
// Skip the initial #account event
skipAccountEvent(t, conn)
// Read and discard the 4 historical events (seq 1-4)
for i := 0; i < 4; i++ {
_, _, err := conn.ReadMessage()

View File

@@ -192,6 +192,11 @@ func NewS3Service(params map[string]any) (*S3Service, error) {
o.BaseEndpoint = aws.String(endpoint)
o.UsePathStyle = true
}
// Disable automatic CRC32 checksum calculation on uploads.
// SDK v2 v1.71.0+ adds checksums by default using chunked encoding
// with trailers, which causes XAmzContentSHA256Mismatch errors on
// S3-compatible services that don't support this.
o.RequestChecksumCalculation = aws.RequestChecksumCalculationWhenRequired
})
var s3PathPrefix string