Files
at-container-registry/docs/MULTIPART.md
2025-10-10 20:50:21 -05:00

18 KiB

S3 Multipart Upload Implementation Plan

 Problem Summary

 Current implementation uses a single presigned URL with a pipe for chunked uploads (PATCH). This causes:
 - Docker PATCH requests block waiting for pipe writes
 - S3 upload happens in background via single presigned URL
 - Docker times out → "client disconnected during blob PATCH"
 - Root cause: Single presigned URLs don't support OCI's chunked upload protocol

 Solution: S3 Multipart Upload API

 Implement proper S3 multipart upload to support Docker's chunked PATCH operations:
 - Each PATCH → separate S3 part upload with its own presigned URL
 - On Commit → complete multipart upload
 - No buffering, no pipes, no blocking

 ---
 Architecture Changes

 Current (Broken) Flow

 POST /blobs/uploads/ → Create() → Single presigned URL to temp location
 PATCH → Write to pipe → [blocks] → Background goroutine uploads via single URL
 PATCH → [blocks on pipe] → Docker timeout → disconnect ❌

 New (Multipart) Flow

 POST /blobs/uploads/ → Create() → Initiate multipart upload, get upload ID
 PATCH #1 → Get presigned URL for part 1 → Upload part 1 to S3 → Store ETag
 PATCH #2 → Get presigned URL for part 2 → Upload part 2 to S3 → Store ETag
 PUT (commit) → Complete multipart upload with ETags → Done ✅

 ---
 Implementation Details

 1. Hold Service: Add Multipart Upload Endpoints

 File: cmd/hold/main.go

 New Request/Response Types

 // StartMultipartUploadRequest initiates a multipart upload
 type StartMultipartUploadRequest struct {
     DID    string `json:"did"`    // Requesting user's DID (validated for WRITE access)
     Digest string `json:"digest"` // Blob digest; determines the S3 object key
 }

 // StartMultipartUploadResponse returns the S3 upload ID the client must
 // pass to subsequent part-URL, complete, and abort requests.
 type StartMultipartUploadResponse struct {
     UploadID  string    `json:"upload_id"`  // S3 multipart upload ID
     ExpiresAt time.Time `json:"expires_at"` // presumably when the upload session expires — confirm semantics
 }

 // GetPartURLRequest requests a presigned URL for a specific part
 type GetPartURLRequest struct {
     DID        string `json:"did"`
     Digest     string `json:"digest"`
     UploadID   string `json:"upload_id"`   // From StartMultipartUploadResponse
     PartNumber int    `json:"part_number"` // 1-based S3 part number
 }

 // GetPartURLResponse carries the presigned PUT URL for one part and its
 // expiry time.
 type GetPartURLResponse struct {
     URL       string    `json:"url"`
     ExpiresAt time.Time `json:"expires_at"`
 }

 // CompleteMultipartRequest completes a multipart upload
 type CompleteMultipartRequest struct {
     DID      string `json:"did"`
     Digest   string `json:"digest"`
     UploadID string `json:"upload_id"`
     Parts    []CompletedPart `json:"parts"` // Every uploaded part with its ETag
 }

 // CompletedPart identifies one uploaded part by its number and the ETag
 // S3 returned when the part was uploaded.
 type CompletedPart struct {
     PartNumber int    `json:"part_number"` // 1-based S3 part number
     ETag       string `json:"etag"`        // ETag header from the part upload response
 }

 // AbortMultipartRequest aborts an in-progress upload
 type AbortMultipartRequest struct {
     DID      string `json:"did"`
     Digest   string `json:"digest"`
     UploadID string `json:"upload_id"`
 }

 New Endpoints

 POST /start-multipart
 func (s *HoldService) HandleStartMultipart(w http.ResponseWriter, r *http.Request) {
     // Validate DID authorization for WRITE
     // Build S3 key from digest
     // Call s3.CreateMultipartUploadRequest()
     // Generate presigned URL if needed, or return upload ID
     // Return upload ID to client as StartMultipartUploadResponse JSON
 }

 POST /part-presigned-url
 func (s *HoldService) HandleGetPartURL(w http.ResponseWriter, r *http.Request) {
     // Validate DID authorization for WRITE
     // Build S3 key from digest
     // Call s3.UploadPartRequest() with part number and upload ID
     // Generate presigned URL
     // Return presigned URL for this specific part as GetPartURLResponse JSON
 }

 POST /complete-multipart
 func (s *HoldService) HandleCompleteMultipart(w http.ResponseWriter, r *http.Request) {
     // Validate DID authorization for WRITE
     // Build S3 key from digest
     // Prepare CompletedPart array with part numbers and ETags
     // Call s3.CompleteMultipartUpload()
     // Return success
 }

 POST /abort-multipart (for cleanup)
 func (s *HoldService) HandleAbortMultipart(w http.ResponseWriter, r *http.Request) {
     // Validate DID authorization for WRITE
     // Call s3.AbortMultipartUpload()
     // Return success
 }

 S3 Implementation

 // startMultipartUpload initiates an S3 multipart upload for the blob
 // identified by digest and returns the S3 upload ID. Returns an error if
 // S3 is not configured or the CreateMultipartUpload call fails.
 func (s *HoldService) startMultipartUpload(ctx context.Context, digest string) (string, error) {
     if s.s3Client == nil {
         return "", fmt.Errorf("s3 not configured")
     }

     // Derive the S3 object key from the blob path, honoring the optional
     // bucket path prefix.
     path := blobPath(digest)
     s3Key := strings.TrimPrefix(path, "/")
     if s.s3PathPrefix != "" {
         s3Key = s.s3PathPrefix + "/" + s3Key
     }

     result, err := s.s3Client.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{
         Bucket: aws.String(s.bucket),
         Key:    aws.String(s3Key),
     })
     if err != nil {
         return "", fmt.Errorf("creating multipart upload for %q: %w", s3Key, err)
     }

     return *result.UploadId, nil
 }

 // getPartPresignedURL generates a presigned PUT URL for uploading part
 // partNumber of the multipart upload identified by uploadID. The URL is
 // valid for 15 minutes.
 func (s *HoldService) getPartPresignedURL(ctx context.Context, digest, uploadID string, partNumber int) (string, error) {
     if s.s3Client == nil {
         return "", fmt.Errorf("s3 not configured")
     }

     path := blobPath(digest)
     s3Key := strings.TrimPrefix(path, "/")
     if s.s3PathPrefix != "" {
         s3Key = s.s3PathPrefix + "/" + s3Key
     }

     // UploadPartRequest returns (request, output placeholder) — the second
     // value is not an error, so discarding it is safe.
     req, _ := s.s3Client.UploadPartRequest(&s3.UploadPartInput{
         Bucket:     aws.String(s.bucket),
         Key:        aws.String(s3Key),
         UploadId:   aws.String(uploadID),
         PartNumber: aws.Int64(int64(partNumber)),
     })
     // Bind the caller's context so cancellation/deadline applies; the
     // original ignored ctx entirely.
     req.SetContext(ctx)

     return req.Presign(15 * time.Minute)
 }

 // completeMultipartUpload finalizes the multipart upload identified by
 // uploadID, assembling the object from the given parts (each part's number
 // and ETag as returned by its upload).
 func (s *HoldService) completeMultipartUpload(ctx context.Context, digest, uploadID string, parts []CompletedPart) error {
     if s.s3Client == nil {
         return fmt.Errorf("s3 not configured")
     }

     path := blobPath(digest)
     s3Key := strings.TrimPrefix(path, "/")
     if s.s3PathPrefix != "" {
         s3Key = s.s3PathPrefix + "/" + s3Key
     }

     // Convert to the SDK's CompletedPart format.
     s3Parts := make([]*s3.CompletedPart, len(parts))
     for i, p := range parts {
         s3Parts[i] = &s3.CompletedPart{
             PartNumber: aws.Int64(int64(p.PartNumber)),
             ETag:       aws.String(p.ETag),
         }
     }

     _, err := s.s3Client.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{
         Bucket:   aws.String(s.bucket),
         Key:      aws.String(s3Key),
         UploadId: aws.String(uploadID),
         MultipartUpload: &s3.CompletedMultipartUpload{
             Parts: s3Parts,
         },
     })
     if err != nil {
         return fmt.Errorf("completing multipart upload for %q: %w", s3Key, err)
     }

     return nil
 }

 ---
 2. AppView: Rewrite ProxyBlobStore for Multipart

 File: pkg/storage/proxy_blob_store.go

 Remove Current Implementation

 - Remove pipe-based streaming
 - Remove background goroutine with single presigned URL
 - Rework the global upload tracking map (Create() below still registers writers in it so Resume() can find in-progress uploads)

 New ProxyBlobWriter Structure

 // ProxyBlobWriter implements a blob writer on top of an S3 multipart upload
 // brokered by the hold service: Write buffers data locally and flushes it
 // as S3 parts; Commit completes the multipart upload and moves the object
 // to its digest-addressed location.
 //
 // NOTE(review): fields are mutated by Write/Commit without locking; this
 // assumes each writer is driven by one goroutine at a time — confirm
 // against distribution's BlobWriter contract.
 type ProxyBlobWriter struct {
     store         *ProxyBlobStore
     options       distribution.CreateOptions
     uploadID      string          // S3 multipart upload ID
     parts         []CompletedPart // Track uploaded parts with ETags
     partNumber    int             // Current part number (starts at 1)
     buffer        *bytes.Buffer   // Buffer for current part
     size          int64           // Total bytes written
     closed        bool            // Set by Commit; further Writes are rejected
     id            string          // Distribution's upload ID (for state)
     startedAt     time.Time
     finalDigest   string          // Set on Commit
 }

 // CompletedPart records one uploaded part: its 1-based S3 part number and
 // the ETag from the part upload, both required to complete the upload.
 type CompletedPart struct {
     PartNumber int
     ETag       string
 }

 New Create() - Initiate Multipart Upload

 // Create starts a new blob upload: it initiates an S3 multipart upload (via
 // the hold service) at a temporary, writer-scoped location and returns a
 // ProxyBlobWriter that streams parts to it. The blob is moved to its final
 // digest-addressed path on Commit.
 func (p *ProxyBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
     var opts distribution.CreateOptions
     for _, option := range options {
         if err := option.Apply(&opts); err != nil {
             return nil, err
         }
     }

     // Use temp digest for upload location
     writerID := fmt.Sprintf("upload-%d", time.Now().UnixNano())
     tempDigest := digest.Digest(fmt.Sprintf("uploads/temp-%s", writerID))

     // Start multipart upload via hold service
     uploadID, err := p.startMultipartUpload(ctx, tempDigest)
     if err != nil {
         return nil, fmt.Errorf("failed to start multipart upload: %w", err)
     }

     writer := &ProxyBlobWriter{
         store:      p,
         options:    opts,
         uploadID:   uploadID,
         parts:      make([]CompletedPart, 0),
         partNumber: 1,
         buffer:     bytes.NewBuffer(make([]byte, 0, 5*1024*1024)), // 5MB buffer
         id:         writerID,
         startedAt:  time.Now(),
     }

     // Store in global map for Resume()
     // NOTE(review): this still uses package-level upload tracking even
     // though the plan says the global map is being removed — confirm the
     // map is intentionally kept for Resume() support.
     globalUploadsMu.Lock()
     globalUploads[writer.id] = writer
     globalUploadsMu.Unlock()

     return writer, nil
 }

 New Write() - Buffer and Flush Parts

 // Write buffers p and, once at least 5MB (the S3 minimum part size) has
 // accumulated, flushes the buffered bytes to S3 as the next part.
 func (w *ProxyBlobWriter) Write(p []byte) (int, error) {
     if w.closed {
         return 0, fmt.Errorf("writer closed")
     }

     written, werr := w.buffer.Write(p)
     w.size += int64(written)

     // S3 rejects non-final parts smaller than 5MB, so only flush once the
     // buffer has reached that threshold.
     const minPartSize = 5 * 1024 * 1024
     if w.buffer.Len() >= minPartSize {
         if ferr := w.flushPart(); ferr != nil {
             return written, ferr
         }
     }

     return written, werr
 }

 // flushPart uploads the currently buffered bytes as the next S3 part via a
 // presigned URL from the hold service, records the returned ETag for the
 // final CompleteMultipartUpload call, then resets the buffer and advances
 // the part number. A no-op when the buffer is empty.
 func (w *ProxyBlobWriter) flushPart() error {
     if w.buffer.Len() == 0 {
         return nil
     }

     ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
     defer cancel()

     // Parts go to a temporary location keyed by the writer ID; the blob is
     // moved to its digest-addressed path on Commit.
     tempDigest := digest.Digest(fmt.Sprintf("uploads/temp-%s", w.id))
     url, err := w.store.getPartPresignedURL(ctx, tempDigest, w.uploadID, w.partNumber)
     if err != nil {
         return fmt.Errorf("getting part presigned URL: %w", err)
     }

     // Upload the part directly to S3.
     req, err := http.NewRequestWithContext(ctx, "PUT", url, bytes.NewReader(w.buffer.Bytes()))
     if err != nil {
         return err
     }

     resp, err := w.store.httpClient.Do(req)
     if err != nil {
         return err
     }
     defer resp.Body.Close()

     if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
         // Surface the (truncated) S3 error body — without it, failed part
         // uploads are undiagnosable from logs.
         errBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
         return fmt.Errorf("part %d upload failed: status %d: %s", w.partNumber, resp.StatusCode, errBody)
     }

     // The part's ETag comes back in a response header and is required to
     // complete the multipart upload.
     etag := resp.Header.Get("ETag")
     if etag == "" {
         return fmt.Errorf("no ETag in response")
     }

     w.parts = append(w.parts, CompletedPart{
         PartNumber: w.partNumber,
         ETag:       etag,
     })

     // Reset buffer and increment part number
     w.buffer.Reset()
     w.partNumber++

     return nil
 }

 New Commit() - Complete Multipart and Move

 // Commit flushes any remaining buffered data as a final part, completes the
 // S3 multipart upload at the temporary location, then asks the hold service
 // to move the object to its final digest-addressed path. On failure the
 // multipart upload is aborted (best effort) so S3 does not accumulate
 // orphaned parts, and the writer is always removed from the resume map.
 func (w *ProxyBlobWriter) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
     if w.closed {
         return distribution.Descriptor{}, fmt.Errorf("writer closed")
     }
     w.closed = true

     // The writer is finished whether or not Commit succeeds; the original
     // only removed it on success, leaking entries on every error path.
     defer func() {
         globalUploadsMu.Lock()
         delete(globalUploads, w.id)
         globalUploadsMu.Unlock()
     }()

     // Flush remaining data; the final part may be smaller than 5MB (S3
     // allows an undersized last part).
     if w.buffer.Len() > 0 {
         if err := w.flushPart(); err != nil {
             w.store.abortMultipartUpload(ctx, w.uploadID) // best effort
             return distribution.Descriptor{}, err
         }
     }

     // Complete multipart upload at temp location.
     tempDigest := digest.Digest(fmt.Sprintf("uploads/temp-%s", w.id))
     if err := w.store.completeMultipartUpload(ctx, tempDigest, w.uploadID, w.parts); err != nil {
         w.store.abortMultipartUpload(ctx, w.uploadID) // best effort
         return distribution.Descriptor{}, err
     }

     // Move from temp → final location (server-side S3 copy). Query values
     // are escaped: DIDs and digests contain ':' and other reserved chars
     // that would corrupt a raw Sprintf-built URL.
     q := url.Values{}
     q.Set("from", fmt.Sprintf("uploads/temp-%s", w.id))
     q.Set("to", desc.Digest.String())
     q.Set("did", w.store.did)
     moveURL := fmt.Sprintf("%s/move?%s", w.store.storageEndpoint, q.Encode())

     req, err := http.NewRequestWithContext(ctx, "POST", moveURL, nil)
     if err != nil {
         return distribution.Descriptor{}, err
     }

     resp, err := w.store.httpClient.Do(req)
     if err != nil {
         return distribution.Descriptor{}, err
     }
     defer resp.Body.Close()

     if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
         bodyBytes, _ := io.ReadAll(resp.Body)
         return distribution.Descriptor{}, fmt.Errorf("move failed: %d, %s", resp.StatusCode, bodyBytes)
     }

     return distribution.Descriptor{
         Digest:    desc.Digest,
         Size:      w.size,
         MediaType: desc.MediaType,
     }, nil
 }

 Add Hold Service Client Methods

 // startMultipartUpload asks the hold service to initiate an S3 multipart
 // upload for dgst and returns the resulting upload ID.
 func (p *ProxyBlobStore) startMultipartUpload(ctx context.Context, dgst digest.Digest) (string, error) {
     reqBody := map[string]any{
         "did":    p.did,
         "digest": dgst.String(),
     }
     body, err := json.Marshal(reqBody)
     if err != nil {
         return "", err
     }

     url := fmt.Sprintf("%s/start-multipart", p.storageEndpoint)
     req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
     if err != nil {
         return "", err
     }
     req.Header.Set("Content-Type", "application/json")

     resp, err := p.httpClient.Do(req)
     if err != nil {
         return "", err
     }
     defer resp.Body.Close()

     // Check the status before decoding: the original decoded error
     // responses as success and returned an empty upload ID.
     if resp.StatusCode != http.StatusOK {
         return "", fmt.Errorf("start multipart failed: status %d", resp.StatusCode)
     }

     var result struct {
         UploadID string `json:"upload_id"`
     }
     if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
         return "", err
     }

     return result.UploadID, nil
 }

 // getPartPresignedURL asks the hold service for a presigned PUT URL for the
 // given part of the multipart upload identified by uploadID.
 func (p *ProxyBlobStore) getPartPresignedURL(ctx context.Context, dgst digest.Digest, uploadID string, partNumber int) (string, error) {
     reqBody := map[string]any{
         "did":         p.did,
         "digest":      dgst.String(),
         "upload_id":   uploadID,
         "part_number": partNumber,
     }
     body, err := json.Marshal(reqBody)
     if err != nil {
         return "", err
     }

     url := fmt.Sprintf("%s/part-presigned-url", p.storageEndpoint)
     req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
     if err != nil {
         return "", err
     }
     req.Header.Set("Content-Type", "application/json")

     resp, err := p.httpClient.Do(req)
     if err != nil {
         return "", err
     }
     defer resp.Body.Close()

     // Check the status before decoding: the original decoded error
     // responses as success and returned an empty URL.
     if resp.StatusCode != http.StatusOK {
         return "", fmt.Errorf("get part URL failed: status %d", resp.StatusCode)
     }

     var result struct {
         URL string `json:"url"`
     }
     if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
         return "", err
     }

     return result.URL, nil
 }

 // completeMultipartUpload asks the hold service to finalize the multipart
 // upload identified by uploadID using the recorded part numbers and ETags.
 func (p *ProxyBlobStore) completeMultipartUpload(ctx context.Context, dgst digest.Digest, uploadID string, parts []CompletedPart) error {
     reqBody := map[string]any{
         "did":       p.did,
         "digest":    dgst.String(),
         "upload_id": uploadID,
         "parts":     parts,
     }
     body, err := json.Marshal(reqBody)
     if err != nil {
         return err
     }

     url := fmt.Sprintf("%s/complete-multipart", p.storageEndpoint)
     req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
     if err != nil {
         return err
     }
     req.Header.Set("Content-Type", "application/json")

     resp, err := p.httpClient.Do(req)
     if err != nil {
         return err
     }
     defer resp.Body.Close()

     if resp.StatusCode != http.StatusOK {
         // Include the (truncated) response body for diagnostics.
         errBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
         return fmt.Errorf("complete multipart failed: status %d: %s", resp.StatusCode, errBody)
     }

     return nil
 }

 ---
 Testing Plan

 1. Unit Tests

 - Test multipart upload initiation
 - Test part upload with presigned URLs
 - Test completion with ETags
 - Test abort on errors

 2. Integration Tests

 - Push small images (< 5MB, single part)
 - Push medium images (10MB, 2 parts)
 - Push large images (100MB, 20 parts)
 - Test with Upcloud S3
 - Test with Storj S3

 3. Validation

 - Monitor logs for "client disconnected" errors (should be gone)
 - Check Docker push success rate
 - Verify blobs stored correctly in S3
 - Check bandwidth usage on hold service (should be minimal)

 ---
 Migration & Deployment

 Backward Compatibility

 - Keep /put-presigned-url endpoint for fallback
 - Keep /move endpoint (still needed)
 - New multipart endpoints are additive

 Deployment Steps

 1. Update hold service with new endpoints
 2. Update AppView ProxyBlobStore
 3. Deploy hold service first
 4. Deploy AppView
 5. Test with sample push
 6. Monitor logs

 Rollback Plan

 - Revert AppView to previous version (uses old presigned URL method)
 - Hold service keeps both old and new endpoints

 ---
 Documentation Updates

 Update docs/PRESIGNED_URLS.md

 - Add section "Multipart Upload for Chunked Data"
 - Explain why single presigned URLs don't work with PATCH
 - Document new endpoints and flow
 - Add S3 part size recommendations (5MB-64MB for Storj)

 Add Troubleshooting Section

 - "Client disconnected during PATCH" → resolved by multipart
 - Storj-specific considerations (64MB parts recommended)
 - Upcloud compatibility notes

 ---
 Performance Impact

 Before (Broken)

 - Docker PATCH → blocks on pipe → timeout → retry → fail
 - Unable to push large images reliably

 After (Multipart)

 - Each PATCH → independent part upload → immediate response
 - No blocking, no timeouts
 - Parallel part uploads possible (future optimization)
 - Reliable pushes for any image size

 Bandwidth

 - Hold service: Only API calls (~1KB per part)
 - Direct S3 uploads: Full blob data
 - S3 copy for move: Server-side (no hold bandwidth)

 Estimated savings: roughly a 99.98% reduction in hold-service bandwidth — the same figure as the previous design, but with multipart the uploads now complete reliably.