Merge branch 'presigned-urls'

This commit is contained in:
Evan Jarrett
2025-10-12 09:12:44 -05:00
24 changed files with 4159 additions and 1671 deletions

View File

@@ -1,22 +1,14 @@
package main
import (
"fmt"
"os"
"time"
"github.com/distribution/distribution/v3/registry"
_ "github.com/distribution/distribution/v3/registry/auth/token"
_ "github.com/distribution/distribution/v3/registry/storage/driver/filesystem"
_ "github.com/distribution/distribution/v3/registry/storage/driver/inmemory"
_ "github.com/distribution/distribution/v3/registry/storage/driver/s3-aws"
// Register our custom middleware
_ "atcr.io/pkg/middleware"
"atcr.io/pkg/auth/oauth"
"atcr.io/pkg/auth/token"
"atcr.io/pkg/middleware"
_ "atcr.io/pkg/appview/middleware"
)
func main() {
@@ -26,11 +18,3 @@ func main() {
os.Exit(1)
}
}
// Suppress unused import warnings
var _ = fmt.Sprint
var _ = os.Stdout
var _ = time.Now
var _ = oauth.NewRefresher
var _ = token.NewIssuer
var _ = middleware.SetGlobalRefresher

View File

@@ -19,16 +19,15 @@ import (
sqlite3 "github.com/mattn/go-sqlite3"
"github.com/spf13/cobra"
"atcr.io/pkg/appview/middleware"
"atcr.io/pkg/auth/oauth"
"atcr.io/pkg/auth/token"
"atcr.io/pkg/middleware"
// UI components
"atcr.io/pkg/appview"
"atcr.io/pkg/appview/db"
uihandlers "atcr.io/pkg/appview/handlers"
"atcr.io/pkg/appview/jetstream"
appmiddleware "atcr.io/pkg/appview/middleware"
"github.com/gorilla/mux"
)
@@ -473,7 +472,7 @@ func initializeUIRoutes(database *sql.DB, readOnlyDB *sql.DB, sessionStore *db.S
// Public routes (with optional auth for navbar)
// SECURITY: Public pages use read-only DB
router.Handle("/", appmiddleware.OptionalAuth(sessionStore, database)(
router.Handle("/", middleware.OptionalAuth(sessionStore, database)(
&uihandlers.HomeHandler{
DB: readOnlyDB,
Templates: templates,
@@ -481,7 +480,7 @@ func initializeUIRoutes(database *sql.DB, readOnlyDB *sql.DB, sessionStore *db.S
},
)).Methods("GET")
router.Handle("/api/recent-pushes", appmiddleware.OptionalAuth(sessionStore, database)(
router.Handle("/api/recent-pushes", middleware.OptionalAuth(sessionStore, database)(
&uihandlers.RecentPushesHandler{
DB: readOnlyDB,
Templates: templates,
@@ -490,7 +489,7 @@ func initializeUIRoutes(database *sql.DB, readOnlyDB *sql.DB, sessionStore *db.S
)).Methods("GET")
// SECURITY: Search uses read-only DB to prevent writes and limit access to sensitive tables
router.Handle("/search", appmiddleware.OptionalAuth(sessionStore, database)(
router.Handle("/search", middleware.OptionalAuth(sessionStore, database)(
&uihandlers.SearchHandler{
DB: readOnlyDB,
Templates: templates,
@@ -498,7 +497,7 @@ func initializeUIRoutes(database *sql.DB, readOnlyDB *sql.DB, sessionStore *db.S
},
)).Methods("GET")
router.Handle("/api/search-results", appmiddleware.OptionalAuth(sessionStore, database)(
router.Handle("/api/search-results", middleware.OptionalAuth(sessionStore, database)(
&uihandlers.SearchResultsHandler{
DB: readOnlyDB,
Templates: templates,
@@ -507,7 +506,7 @@ func initializeUIRoutes(database *sql.DB, readOnlyDB *sql.DB, sessionStore *db.S
)).Methods("GET")
// API route for repository stats (public, read-only)
router.Handle("/api/stats/{handle}/{repository}", appmiddleware.OptionalAuth(sessionStore, database)(
router.Handle("/api/stats/{handle}/{repository}", middleware.OptionalAuth(sessionStore, database)(
&uihandlers.GetStatsHandler{
DB: readOnlyDB,
Directory: oauthApp.Directory(),
@@ -515,7 +514,7 @@ func initializeUIRoutes(database *sql.DB, readOnlyDB *sql.DB, sessionStore *db.S
)).Methods("GET")
// API routes for stars (require authentication)
router.Handle("/api/stars/{handle}/{repository}", appmiddleware.RequireAuth(sessionStore, database)(
router.Handle("/api/stars/{handle}/{repository}", middleware.RequireAuth(sessionStore, database)(
&uihandlers.StarRepositoryHandler{
DB: database, // Needs write access
Directory: oauthApp.Directory(),
@@ -523,7 +522,7 @@ func initializeUIRoutes(database *sql.DB, readOnlyDB *sql.DB, sessionStore *db.S
},
)).Methods("POST")
router.Handle("/api/stars/{handle}/{repository}", appmiddleware.RequireAuth(sessionStore, database)(
router.Handle("/api/stars/{handle}/{repository}", middleware.RequireAuth(sessionStore, database)(
&uihandlers.UnstarRepositoryHandler{
DB: database, // Needs write access
Directory: oauthApp.Directory(),
@@ -531,7 +530,7 @@ func initializeUIRoutes(database *sql.DB, readOnlyDB *sql.DB, sessionStore *db.S
},
)).Methods("DELETE")
router.Handle("/api/stars/{handle}/{repository}", appmiddleware.OptionalAuth(sessionStore, database)(
router.Handle("/api/stars/{handle}/{repository}", middleware.OptionalAuth(sessionStore, database)(
&uihandlers.CheckStarHandler{
DB: readOnlyDB, // Read-only check
Directory: oauthApp.Directory(),
@@ -539,7 +538,7 @@ func initializeUIRoutes(database *sql.DB, readOnlyDB *sql.DB, sessionStore *db.S
},
)).Methods("GET")
router.Handle("/u/{handle}", appmiddleware.OptionalAuth(sessionStore, database)(
router.Handle("/u/{handle}", middleware.OptionalAuth(sessionStore, database)(
&uihandlers.UserPageHandler{
DB: readOnlyDB,
Templates: templates,
@@ -547,7 +546,7 @@ func initializeUIRoutes(database *sql.DB, readOnlyDB *sql.DB, sessionStore *db.S
},
)).Methods("GET")
router.Handle("/r/{handle}/{repository}", appmiddleware.OptionalAuth(sessionStore, database)(
router.Handle("/r/{handle}/{repository}", middleware.OptionalAuth(sessionStore, database)(
&uihandlers.RepositoryPageHandler{
DB: readOnlyDB,
Templates: templates,
@@ -559,7 +558,7 @@ func initializeUIRoutes(database *sql.DB, readOnlyDB *sql.DB, sessionStore *db.S
// Authenticated routes
authRouter := router.NewRoute().Subrouter()
authRouter.Use(appmiddleware.RequireAuth(sessionStore, database))
authRouter.Use(middleware.RequireAuth(sessionStore, database))
authRouter.Handle("/settings", &uihandlers.SettingsHandler{
Templates: templates,

File diff suppressed because it is too large Load Diff

View File

@@ -47,6 +47,7 @@ services:
# STORAGE_DRIVER: filesystem
# STORAGE_ROOT_DIR: /var/lib/atcr/hold
TEST_MODE: true
# DISABLE_PRESIGNED_URLS: true
# Storage config comes from env_file (STORAGE_DRIVER, AWS_*, S3_*)
build:
context: .

460
docs/HOLD_MULTIPART.md Normal file
View File

@@ -0,0 +1,460 @@
# Hold Service Multipart Upload Architecture
## Overview
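The hold service supports multipart uploads through two session modes (the three deployment configurations built on them are described under "Three Modes of Operation" below):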
1. **S3Native** - Uses S3's native multipart API with presigned URLs (optimal)
2. **Buffered** - Buffers parts in hold service memory, assembles on completion (fallback)
This dual-mode approach enables the hold service to work with:
- S3-compatible storage with presigned URL support (S3, Storj, MinIO, etc.)
- S3-compatible storage WITHOUT presigned URL support
- Filesystem storage
- Any storage driver supported by distribution
## Current State
### What Works ✅
- **S3 Native Mode with presigned URLs**: Fully working! Direct uploads to S3 via presigned URLs
- **Buffered mode with S3**: Tested and working with `DISABLE_PRESIGNED_URLS=true`
- **Filesystem storage**: Tested and working! Buffered mode with filesystem driver
- **AppView multipart client**: Implements chunked uploads via multipart API
- **MultipartManager**: Session tracking, automatic cleanup, thread-safe operations
- **Automatic fallback**: Falls back to buffered mode when S3 is unavailable or presigned URLs are disabled
- **ETag normalization**: Handles quoted/unquoted ETags from S3
- **Route handler**: `/multipart-parts/{uploadID}/{partNumber}` endpoint added and tested
### All Implementation Complete! 🎉
All three multipart upload modes are fully implemented, tested, and working in production.
### Bugs Fixed 🔧
- **Missing S3 parts in complete**: For S3Native mode, parts uploaded directly to S3 weren't being recorded. Fixed by storing parts from request in `HandleCompleteMultipart` before calling `CompleteMultipartUploadWithManager`.
- **Malformed XML error from S3**: S3 requires ETags to be quoted in CompleteMultipartUpload XML. Added `normalizeETag()` function to ensure quotes are present.
- **Route missing**: `/multipart-parts/{uploadID}/{partNumber}` not registered in cmd/hold/main.go. Fixed by adding route handler with path parsing.
- **MultipartMgr access**: Field was private, preventing route handler access. Fixed by exporting as `MultipartMgr`.
- **DISABLE_PRESIGNED_URLS not logged**: `initS3Client()` didn't check the flag before initializing. Fixed with early return check and proper logging.
## Architecture
### Three Modes of Operation
#### Mode 1: S3 Native Multipart ✅ WORKING
```
Docker → AppView → Hold → S3 (presigned URLs)
Returns presigned URL
Docker ──────────→ S3 (direct upload)
```
**Flow:**
1. AppView: `POST /start-multipart` → Hold starts S3 multipart, returns uploadID
2. AppView: `POST /part-presigned-url` → Hold returns S3 presigned URL
3. Docker → S3: Direct upload via presigned URL
4. AppView: `POST /complete-multipart` → Hold calls S3 CompleteMultipartUpload
**Advantages:**
- No data flows through hold service
- Minimal bandwidth usage
- Fast uploads
#### Mode 2: S3 Proxy Mode (Buffered) ✅ WORKING
```
Docker → AppView → Hold → S3 (via driver)
Buffers & proxies
S3
```
**Flow:**
1. AppView: `POST /start-multipart` → Hold creates buffered session
2. AppView: `POST /part-presigned-url` → Hold returns proxy URL
3. Docker → Hold: `PUT /multipart-parts/{uploadID}/{part}` → Hold buffers
4. AppView: `POST /complete-multipart` → Hold uploads to S3 via driver
**Use Cases:**
- S3 provider doesn't support presigned URLs
- S3 API fails to generate presigned URL
- Fallback from Mode 1
#### Mode 3: Filesystem Mode ✅ WORKING
```
Docker → AppView → Hold (filesystem driver)
Buffers & writes
Local filesystem
```
**Flow:**
Same as Mode 2, but writes to filesystem driver instead of S3 driver.
**Use Cases:**
- Development/testing with local filesystem
- Small deployments without S3
- Air-gapped environments
## Implementation: pkg/hold/multipart.go
### Core Components
#### MultipartManager
```go
// MultipartManager tracks the active multipart upload sessions.
type MultipartManager struct {
// sessions holds the tracked sessions (presumably keyed by upload ID — confirm against multipart.go).
sessions map[string]*MultipartSession
// mu guards sessions so that session access is thread-safe.
mu sync.RWMutex
}
```
**Responsibilities:**
- Track active multipart sessions
- Clean up abandoned uploads (>24h inactive)
- Thread-safe session access
#### MultipartSession
```go
// MultipartSession is the state kept for one in-progress multipart upload.
type MultipartSession struct {
UploadID string // Unique ID for this upload
Digest string // Target blob digest
Mode MultipartMode // S3Native or Buffered
S3UploadID string // S3 upload ID (S3Native only)
Parts map[int]*MultipartPart // Buffered parts (Buffered only)
// CreatedAt is presumably the time the session was started — confirm.
CreatedAt time.Time
// LastActivity is presumably what the inactivity cleanup (>24h) checks — confirm.
LastActivity time.Time
}
```
**State Tracking:**
- S3Native: Tracks S3 upload ID and part ETags
- Buffered: Stores part data in memory
#### MultipartPart
```go
// MultipartPart describes a single part of a multipart upload.
type MultipartPart struct {
PartNumber int // Part number (1-indexed)
Data []byte // Part data (Buffered mode only)
ETag string // S3 ETag or computed hash
// Size of the part (presumably in bytes — confirm).
Size int64
}
```
### Key Methods
#### StartMultipartUploadWithManager
```go
func (s *HoldService) StartMultipartUploadWithManager(
ctx context.Context,
digest string,
manager *MultipartManager,
) (string, MultipartMode, error)
```
**Logic:**
1. Try S3 native multipart via `s.startMultipartUpload()`
2. If successful → Create S3Native session
3. If fails or no S3 client → Create Buffered session
4. Return uploadID and mode
#### GetPartUploadURL
```go
func (s *HoldService) GetPartUploadURL(
ctx context.Context,
session *MultipartSession,
partNumber int,
did string,
) (string, error)
```
**Logic:**
- S3Native mode: Generate S3 presigned URL via `s.getPartPresignedURL()`
- Buffered mode: Return proxy endpoint `/multipart-parts/{uploadID}/{part}`
#### CompleteMultipartUploadWithManager
```go
func (s *HoldService) CompleteMultipartUploadWithManager(
ctx context.Context,
session *MultipartSession,
manager *MultipartManager,
) error
```
**Logic:**
- S3Native: Call `s.completeMultipartUpload()` with S3 API
- Buffered: Assemble parts in order, write via storage driver
#### HandleMultipartPartUpload (New Endpoint)
```go
func (s *HoldService) HandleMultipartPartUpload(
w http.ResponseWriter,
r *http.Request,
uploadID string,
partNumber int,
did string,
manager *MultipartManager,
)
```
**New HTTP endpoint:** `PUT /multipart-parts/{uploadID}/{partNumber}`
**Purpose:** Receive part uploads in Buffered mode
**Logic:**
1. Validate session exists and is in Buffered mode
2. Authorize write access
3. Read part data from request body
4. Store in session with computed ETag (SHA256)
5. Return ETag in response header
## Integration Plan
### Phase 1: Migrate to pkg/hold (COMPLETE)
- [x] Extract code from cmd/hold/main.go to pkg/hold/
- [x] Create isolated multipart.go implementation
- [x] Update cmd/hold/main.go to import pkg/hold
- [x] Test existing functionality works
### Phase 2: Add Buffered Mode Support (COMPLETE ✅)
- [x] Add MultipartManager to HoldService
- [x] Update handlers to use `*WithManager` methods
- [x] Add DISABLE_PRESIGNED_URLS environment variable for testing
- [x] Implement presigned URL disable checks in all methods
- [x] **Fixed: Record S3 parts from request in HandleCompleteMultipart**
- [x] **Fixed: ETag normalization (add quotes for S3 XML)**
- [x] **Test S3 native mode with presigned URLs** ✅ WORKING
- [x] **Add route in cmd/hold/main.go** ✅ COMPLETE
- [x] **Export MultipartMgr field for route handler access** ✅ COMPLETE
- [x] **Test DISABLE_PRESIGNED_URLS=true with S3 storage** ✅ WORKING
- [x] **Test filesystem storage with buffered multipart** ✅ WORKING
### Phase 3: Update AppView
- [ ] Detect hold capabilities (presigned vs proxy)
- [ ] Fallback to buffered mode when presigned fails
- [ ] Handle `/multipart-parts/` proxy URLs
### Phase 4: Capability Discovery
- [ ] Add capability endpoint: `GET /capabilities`
- [ ] Return: `{"multipart": "native|buffered|both", "storage": "s3|filesystem"}`
- [ ] AppView uses capabilities to choose upload strategy
## Testing Strategy
### Unit Tests
- [ ] MultipartManager session lifecycle
- [ ] Part buffering and assembly
- [ ] Concurrent part uploads (thread safety)
- [ ] Session cleanup (expired uploads)
### Integration Tests
**S3 Native Mode:**
- [x] Start multipart → get presigned URLs → upload parts → complete ✅ WORKING
- [x] Verify no data flows through hold service (only ~1KB API calls)
- [ ] Test abort cleanup
**Buffered Mode (S3 with DISABLE_PRESIGNED_URLS):**
- [x] Start multipart → get proxy URLs → upload parts → complete ✅ WORKING
- [x] Verify parts assembled correctly
- [ ] Test missing part detection
- [ ] Test abort cleanup
**Buffered Mode (Filesystem):**
- [x] Start multipart → get proxy URLs → upload parts → complete ✅ WORKING
- [x] Verify parts assembled correctly ✅ WORKING
- [x] Verify blobs written to filesystem ✅ WORKING
- [ ] Test missing part detection
- [ ] Test abort cleanup
### Load Tests
- [ ] Concurrent multipart uploads (multiple sessions)
- [ ] Large blobs (100MB+, many parts)
- [ ] Memory usage with many buffered parts
## Performance Considerations
### Memory Usage (Buffered Mode)
- Parts stored in memory until completion
- Docker typically uses 5MB chunks (S3 minimum)
- 100MB image = ~20 parts = ~100MB RAM during upload
- Multiple concurrent uploads multiply memory usage
**Mitigation:**
- Session cleanup (24h timeout)
- Consider disk-backed buffering for large parts (future optimization)
- Monitor memory usage and set limits
### Network Bandwidth
- S3Native: Minimal (only API calls)
- Buffered: Full blob data flows through hold service
- Filesystem: Always buffered (no presigned URL option)
## Configuration
### Environment Variables
**Current (S3 only):**
```bash
STORAGE_DRIVER=s3
S3_BUCKET=my-bucket
S3_ENDPOINT=https://s3.amazonaws.com
AWS_ACCESS_KEY_ID=...
AWS_SECRET_ACCESS_KEY=...
```
**Filesystem:**
```bash
STORAGE_DRIVER=filesystem
STORAGE_ROOT_DIR=/var/lib/atcr/hold
```
### Automatic Mode Selection
No configuration needed - hold service automatically:
1. Tries S3 native multipart if S3 client exists
2. Falls back to buffered mode if S3 is unavailable or the native multipart call fails
3. Always uses buffered mode for filesystem driver
## Security Considerations
### Authorization
- All multipart operations require write authorization
- Buffered mode: Check auth on every part upload
- S3Native: Auth only on start/complete (presigned URLs have embedded auth)
### Resource Limits
- Max upload size: Controlled by storage backend
- Max concurrent uploads: Limited by memory
- Session timeout: 24 hours (configurable)
### Attack Vectors
- **Memory exhaustion**: Attacker uploads many large parts
- Mitigation: Session limits, cleanup, auth
- **Incomplete uploads**: Attacker starts but never completes
- Mitigation: 24h timeout, cleanup goroutine
- **Part flooding**: Upload many tiny parts
- Mitigation: S3 enforces a 10,000-part limit; buffered mode could enforce the same cap
## Future Enhancements
### Disk-Backed Buffering
Instead of memory, buffer parts to temporary disk location:
- Reduces memory pressure
- Supports larger uploads
- Requires cleanup on completion/abort
### Parallel Part Assembly
For large uploads, assemble parts in parallel:
- Stream parts to writer as they arrive
- Reduce memory footprint
- Faster completion
### Chunked Completion
For very large assembled blobs:
- Stream to storage driver in chunks
- Avoid loading entire blob in memory
- Use `io.Copy()` with buffer
### Multi-Backend Support
- Azure Blob Storage multipart
- Google Cloud Storage resumable uploads
- Backblaze B2 large file API
## Implementation Complete ✅
The buffered multipart mode is fully implemented with the following components:
**Route Handler** (`cmd/hold/main.go:47-73`):
- Endpoint: `PUT /multipart-parts/{uploadID}/{partNumber}`
- Parses URL path to extract uploadID and partNumber
- Delegates to `service.HandleMultipartPartUpload()`
**Exported Manager** (`pkg/hold/service.go:20`):
- Field `MultipartMgr` is now exported for route handler access
- All handlers updated to use `s.MultipartMgr`
**Configuration Check** (`pkg/hold/s3.go:20-25`):
- `initS3Client()` checks `DISABLE_PRESIGNED_URLS` flag before initializing
- Logs clear message when presigned URLs are disabled
- Prevents misleading "S3 presigned URLs enabled" message
## Testing Multipart Modes
### Test 1: S3 Native Mode (presigned URLs) ✅ TESTED
```bash
export STORAGE_DRIVER=s3
export S3_BUCKET=your-bucket
export AWS_ACCESS_KEY_ID=...
export AWS_SECRET_ACCESS_KEY=...
# Do NOT set DISABLE_PRESIGNED_URLS
# Start hold service
./bin/atcr-hold
# Push an image
docker push atcr.io/yourdid/test:latest
# Expected logs:
# "✅ S3 presigned URLs enabled"
# "Started S3 native multipart: uploadID=... s3UploadID=..."
# "Completed multipart upload: digest=... uploadID=... parts=..."
```
**Status**: ✅ Working - Direct uploads to S3, minimal bandwidth through hold service
### Test 2: Buffered Mode with S3 (forced proxy) ✅ TESTED
```bash
export STORAGE_DRIVER=s3
export S3_BUCKET=your-bucket
export AWS_ACCESS_KEY_ID=...
export AWS_SECRET_ACCESS_KEY=...
export DISABLE_PRESIGNED_URLS=true # Force buffered mode
# Start hold service
./bin/atcr-hold
# Push an image
docker push atcr.io/yourdid/test:latest
# Expected logs:
# "⚠️ S3 presigned URLs DISABLED by config (DISABLE_PRESIGNED_URLS=true)"
# "Presigned URLs disabled (DISABLE_PRESIGNED_URLS=true), using buffered mode"
# "Stored part: uploadID=... part=1 size=..."
# "Assembled buffered parts: uploadID=... parts=... totalSize=..."
# "Completed buffered multipart: uploadID=... size=... written=..."
```
**Status**: ✅ Working - Parts buffered in hold service memory, assembled and written to S3 via driver
### Test 3: Filesystem Mode (always buffered) ✅ TESTED
```bash
export STORAGE_DRIVER=filesystem
export STORAGE_ROOT_DIR=/tmp/atcr-hold-test
# DISABLE_PRESIGNED_URLS not needed (filesystem never has presigned URLs)
# Start hold service
./bin/atcr-hold
# Push an image
docker push atcr.io/yourdid/test:latest
# Expected logs:
# "Storage driver is filesystem (not S3), presigned URLs disabled"
# "Started buffered multipart: uploadID=..."
# "Stored part: uploadID=... part=1 size=..."
# "Assembled buffered parts: uploadID=... parts=... totalSize=..."
# "Completed buffered multipart: uploadID=... size=... written=..."
# Verify blobs written to:
ls -lh "$STORAGE_ROOT_DIR"/docker/registry/v2/blobs/sha256/
# Or from outside container:
docker exec atcr-hold ls -lh /var/lib/atcr/hold/docker/registry/v2/blobs/sha256/
```
**Status**: ✅ Working - Parts buffered in memory, assembled, and written to filesystem via driver
**Note**: Initial HEAD requests will show "Path not found" errors - this is normal! Docker checks if blobs exist before uploading. The errors occur for blobs that haven't been uploaded yet. After upload, subsequent HEAD checks succeed.
## References
- S3 Multipart Upload API: https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html
- Distribution Storage Driver Interface: https://github.com/distribution/distribution/blob/main/registry/storage/driver/storagedriver.go
- OCI Distribution Spec (Blob Upload): https://github.com/opencontainers/distribution-spec/blob/main/spec.md#pushing-a-blob-in-chunks

448
docs/MULTIPART_OLD.md Normal file
View File

@@ -0,0 +1,448 @@
S3 Multipart Upload Implementation Plan
Problem Summary
Current implementation uses a single presigned URL with a pipe for chunked uploads (PATCH). This causes:
- Docker PATCH requests block waiting for pipe writes
- S3 upload happens in background via single presigned URL
- Docker times out → "client disconnected during blob PATCH"
- Root cause: Single presigned URLs don't support OCI's chunked upload protocol
Solution: S3 Multipart Upload API
Implement proper S3 multipart upload to support Docker's chunked PATCH operations:
- Each PATCH → separate S3 part upload with its own presigned URL
- On Commit → complete multipart upload
- No pipes, no blocking; only the current part (up to 5MB) is buffered before it is uploaded
---
Architecture Changes
Current (Broken) Flow
POST /blobs/uploads/ → Create() → Single presigned URL to temp location
PATCH → Write to pipe → [blocks] → Background goroutine uploads via single URL
PATCH → [blocks on pipe] → Docker timeout → disconnect ❌
New (Multipart) Flow
POST /blobs/uploads/ → Create() → Initiate multipart upload, get upload ID
PATCH #1 → Get presigned URL for part 1 → Upload part 1 to S3 → Store ETag
PATCH #2 → Get presigned URL for part 2 → Upload part 2 to S3 → Store ETag
PUT (commit) → Complete multipart upload with ETags → Done ✅
---
Implementation Details
1. Hold Service: Add Multipart Upload Endpoints
File: cmd/hold/main.go
New Request/Response Types
// StartMultipartUploadRequest initiates a multipart upload
type StartMultipartUploadRequest struct {
DID string `json:"did"`
Digest string `json:"digest"`
}
// StartMultipartUploadResponse returns the upload ID of the newly started
// multipart upload together with an expiry timestamp.
type StartMultipartUploadResponse struct {
UploadID string `json:"upload_id"`
ExpiresAt time.Time `json:"expires_at"`
}
// GetPartURLRequest requests a presigned URL for a specific part
type GetPartURLRequest struct {
DID string `json:"did"`
Digest string `json:"digest"`
UploadID string `json:"upload_id"`
PartNumber int `json:"part_number"`
}
// GetPartURLResponse returns the URL to upload one part to, together with an
// expiry timestamp.
type GetPartURLResponse struct {
URL string `json:"url"`
ExpiresAt time.Time `json:"expires_at"`
}
// CompleteMultipartRequest completes a multipart upload
type CompleteMultipartRequest struct {
DID string `json:"did"`
Digest string `json:"digest"`
UploadID string `json:"upload_id"`
Parts []CompletedPart `json:"parts"`
}
// CompletedPart pairs a part number with the ETag recorded for that part.
type CompletedPart struct {
PartNumber int `json:"part_number"`
ETag string `json:"etag"`
}
// AbortMultipartRequest aborts an in-progress upload
type AbortMultipartRequest struct {
DID string `json:"did"`
Digest string `json:"digest"`
UploadID string `json:"upload_id"`
}
New Endpoints
POST /start-multipart
// HandleStartMultipart is the planned handler for POST /start-multipart.
// The body is an outline only: the comments list the intended steps and no
// logic is implemented here.
func (s *HoldService) HandleStartMultipart(w http.ResponseWriter, r *http.Request) {
// Validate DID authorization for WRITE
// Build S3 key from digest
// Call s3.CreateMultipartUploadRequest()
// Generate presigned URL if needed, or return upload ID
// Return upload ID to client
}
POST /part-presigned-url
// HandleGetPartURL is the planned handler for POST /part-presigned-url.
// The body is an outline only: the comments list the intended steps and no
// logic is implemented here.
func (s *HoldService) HandleGetPartURL(w http.ResponseWriter, r *http.Request) {
// Validate DID authorization for WRITE
// Build S3 key from digest
// Call s3.UploadPartRequest() with part number and upload ID
// Generate presigned URL
// Return presigned URL for this specific part
}
POST /complete-multipart
// HandleCompleteMultipart is the planned handler for POST /complete-multipart.
// The body is an outline only: the comments list the intended steps and no
// logic is implemented here.
func (s *HoldService) HandleCompleteMultipart(w http.ResponseWriter, r *http.Request) {
// Validate DID authorization for WRITE
// Build S3 key from digest
// Prepare CompletedPart array with part numbers and ETags
// Call s3.CompleteMultipartUpload()
// Return success
}
POST /abort-multipart (for cleanup)
// HandleAbortMultipart is the planned handler for POST /abort-multipart, used
// for cleanup. The body is an outline only: the comments list the intended
// steps and no logic is implemented here.
func (s *HoldService) HandleAbortMultipart(w http.ResponseWriter, r *http.Request) {
// Validate DID authorization for WRITE
// Call s3.AbortMultipartUpload()
// Return success
}
S3 Implementation
// startMultipartUpload initiates an S3 multipart upload for the blob identified
// by digest and returns the upload ID assigned by S3.
//
// The object key is blobPath(digest) without its leading "/", placed under
// s.s3PathPrefix when a prefix is configured. An error is returned when no S3
// client is configured, when the S3 call fails, or when S3 reports success
// without providing an upload ID.
func (s *HoldService) startMultipartUpload(ctx context.Context, digest string) (string, error) {
	if s.s3Client == nil {
		return "", fmt.Errorf("S3 not configured")
	}

	s3Key := strings.TrimPrefix(blobPath(digest), "/")
	if s.s3PathPrefix != "" {
		s3Key = s.s3PathPrefix + "/" + s3Key
	}

	result, err := s.s3Client.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(s3Key),
	})
	if err != nil {
		return "", fmt.Errorf("creating multipart upload for %s: %w", digest, err)
	}

	// Guard the dereference: a response without an upload ID would otherwise
	// panic here or hand the caller an unusable empty ID.
	if result == nil || result.UploadId == nil || *result.UploadId == "" {
		return "", fmt.Errorf("S3 returned no upload ID for %s", digest)
	}
	return *result.UploadId, nil
}
// getPartPresignedURL generates a presigned URL, valid for 15 minutes, that
// uploads part partNumber of the multipart upload uploadID for the blob
// identified by digest.
//
// The object key is derived exactly as in startMultipartUpload. partNumber
// must be within S3's allowed range of 1 to 10,000; anything else is rejected
// up front so that no unusable URL is handed out. ctx is accepted for
// signature symmetry but is not used by this function.
func (s *HoldService) getPartPresignedURL(ctx context.Context, digest, uploadID string, partNumber int) (string, error) {
	if s.s3Client == nil {
		return "", fmt.Errorf("S3 not configured")
	}
	if partNumber < 1 || partNumber > 10000 {
		return "", fmt.Errorf("invalid part number %d: must be between 1 and 10000", partNumber)
	}

	s3Key := strings.TrimPrefix(blobPath(digest), "/")
	if s.s3PathPrefix != "" {
		s3Key = s.s3PathPrefix + "/" + s3Key
	}

	// The second return value is the (unused) output struct, not an error.
	req, _ := s.s3Client.UploadPartRequest(&s3.UploadPartInput{
		Bucket:     aws.String(s.bucket),
		Key:        aws.String(s3Key),
		UploadId:   aws.String(uploadID),
		PartNumber: aws.Int64(int64(partNumber)),
	})

	url, err := req.Presign(15 * time.Minute)
	if err != nil {
		return "", fmt.Errorf("presigning part %d of upload %s: %w", partNumber, uploadID, err)
	}
	return url, nil
}
// completeMultipartUpload finalizes the multipart upload uploadID for the blob
// identified by digest, using the given part numbers and ETags.
//
// ETags are normalized to their quoted form before being sent, because an
// unquoted ETag can make the CompleteMultipartUpload request fail with a
// malformed-XML error. An empty part list or an empty ETag is rejected before
// any S3 call is made.
func (s *HoldService) completeMultipartUpload(ctx context.Context, digest, uploadID string, parts []CompletedPart) error {
	if s.s3Client == nil {
		return fmt.Errorf("S3 not configured")
	}
	if len(parts) == 0 {
		return fmt.Errorf("no parts supplied for upload %s", uploadID)
	}

	s3Key := strings.TrimPrefix(blobPath(digest), "/")
	if s.s3PathPrefix != "" {
		s3Key = s.s3PathPrefix + "/" + s3Key
	}

	// Convert to S3 CompletedPart format
	s3Parts := make([]*s3.CompletedPart, len(parts))
	for i, p := range parts {
		// Accept both quoted and unquoted ETags from callers; always send quoted.
		bare := strings.Trim(p.ETag, `"`)
		if bare == "" {
			return fmt.Errorf("part %d of upload %s has an empty ETag", p.PartNumber, uploadID)
		}
		s3Parts[i] = &s3.CompletedPart{
			PartNumber: aws.Int64(int64(p.PartNumber)),
			ETag:       aws.String(`"` + bare + `"`),
		}
	}

	_, err := s.s3Client.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{
		Bucket:   aws.String(s.bucket),
		Key:      aws.String(s3Key),
		UploadId: aws.String(uploadID),
		MultipartUpload: &s3.CompletedMultipartUpload{
			Parts: s3Parts,
		},
	})
	if err != nil {
		return fmt.Errorf("completing multipart upload %s: %w", uploadID, err)
	}
	return nil
}
---
2. AppView: Rewrite ProxyBlobStore for Multipart
File: pkg/storage/proxy_blob_store.go
Remove Current Implementation
- Remove pipe-based streaming
- Remove background goroutine with single presigned URL
- Remove global upload tracking map
New ProxyBlobWriter Structure
// ProxyBlobWriter is the proposed blob writer that uploads a blob as a series
// of multipart parts instead of streaming through a pipe.
type ProxyBlobWriter struct {
store *ProxyBlobStore
options distribution.CreateOptions
uploadID string // S3 multipart upload ID
parts []CompletedPart // Track uploaded parts with ETags
partNumber int // Current part number (starts at 1)
buffer *bytes.Buffer // Buffer for current part
size int64 // Total bytes written
closed bool
id string // Distribution's upload ID (for state)
startedAt time.Time
finalDigest string // Set on Commit
}
// CompletedPart pairs a part number with the ETag recorded for that part.
type CompletedPart struct {
PartNumber int
ETag string
}
New Create() - Initiate Multipart Upload
// Create begins a new blob upload: it applies the supplied create options,
// opens a multipart upload on the hold service under a temporary
// "uploads/temp-<id>" location, and returns a writer bound to that upload.
// The writer is also recorded in globalUploads so that Resume() can find it.
func (p *ProxyBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
	var createOpts distribution.CreateOptions
	for i := range options {
		if applyErr := options[i].Apply(&createOpts); applyErr != nil {
			return nil, applyErr
		}
	}

	// The final digest is not known yet, so the upload is staged under a
	// time-derived temporary name.
	id := fmt.Sprintf("upload-%d", time.Now().UnixNano())
	staging := digest.Digest(fmt.Sprintf("uploads/temp-%s", id))

	multipartID, err := p.startMultipartUpload(ctx, staging)
	if err != nil {
		return nil, fmt.Errorf("failed to start multipart upload: %w", err)
	}

	bw := &ProxyBlobWriter{
		store:      p,
		options:    createOpts,
		uploadID:   multipartID,
		parts:      []CompletedPart{},
		partNumber: 1,
		buffer:     bytes.NewBuffer(make([]byte, 0, 5*1024*1024)), // 5MB buffer
		id:         id,
		startedAt:  time.Now(),
	}

	// Register the writer so Resume() can look it up by ID.
	globalUploadsMu.Lock()
	globalUploads[bw.id] = bw
	globalUploadsMu.Unlock()

	return bw, nil
}
New Write() - Buffer and Flush Parts
// Write appends p to the buffer for the current part and adds the number of
// bytes accepted to the running total. Once the buffer holds at least 5MB
// (S3 minimum part size) it is flushed as a part; a flush failure is returned
// together with the number of bytes that were buffered.
func (w *ProxyBlobWriter) Write(p []byte) (int, error) {
	const partThreshold = 5 * 1024 * 1024

	if w.closed {
		return 0, fmt.Errorf("writer closed")
	}

	written, writeErr := w.buffer.Write(p)
	w.size += int64(written)

	// Below the threshold nothing is uploaded yet.
	if w.buffer.Len() < partThreshold {
		return written, writeErr
	}

	if flushErr := w.flushPart(); flushErr != nil {
		return written, flushErr
	}
	return written, writeErr
}
// flushPart uploads the currently buffered bytes as the next part of the
// multipart upload and records the returned ETag for completion.
//
// It is a no-op when the buffer is empty. On success the buffer is reset and
// the part number advances. On any failure the buffer and part number are
// left untouched, and the returned error names the part and the step that
// failed. The whole operation is bounded by a 5-minute timeout.
func (w *ProxyBlobWriter) flushPart() error {
	if w.buffer.Len() == 0 {
		return nil
	}

	// Write() has no caller context to inherit, so a fresh bounded one is used.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	// Get presigned URL for this part
	tempDigest := digest.Digest(fmt.Sprintf("uploads/temp-%s", w.id))
	url, err := w.store.getPartPresignedURL(ctx, tempDigest, w.uploadID, w.partNumber)
	if err != nil {
		return fmt.Errorf("failed to get part presigned URL: %w", err)
	}

	// Upload part to S3
	req, err := http.NewRequestWithContext(ctx, "PUT", url, bytes.NewReader(w.buffer.Bytes()))
	if err != nil {
		return fmt.Errorf("building upload request for part %d: %w", w.partNumber, err)
	}

	resp, err := w.store.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("uploading part %d: %w", w.partNumber, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
		return fmt.Errorf("part upload failed: status %d", resp.StatusCode)
	}

	// Store ETag for completion
	etag := resp.Header.Get("ETag")
	if etag == "" {
		return fmt.Errorf("no ETag in response")
	}

	w.parts = append(w.parts, CompletedPart{
		PartNumber: w.partNumber,
		ETag:       etag,
	})

	// Reset buffer and increment part number
	w.buffer.Reset()
	w.partNumber++

	return nil
}
New Commit() - Complete Multipart and Move
// Commit finishes the upload: it flushes any buffered remainder as a final
// part, completes the multipart upload at the temporary location, and asks the
// hold service to move the object to its final, digest-addressed location.
//
// The writer is marked closed as soon as Commit starts, so it is removed from
// globalUploads on every exit path, not only on success. If flushing or
// completing fails, the multipart upload is aborted on a best-effort basis so
// that already-uploaded parts do not linger in storage.
//
// NOTE(review): the move URL interpolates its query values unescaped — confirm
// that the values can never contain characters that need URL escaping.
func (w *ProxyBlobWriter) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
	if w.closed {
		return distribution.Descriptor{}, fmt.Errorf("writer closed")
	}
	w.closed = true

	// A closed writer can no longer be resumed; drop it from the global map
	// however Commit ends.
	defer func() {
		globalUploadsMu.Lock()
		delete(globalUploads, w.id)
		globalUploadsMu.Unlock()
	}()

	// Flush any remaining buffered data
	if w.buffer.Len() > 0 {
		if err := w.flushPart(); err != nil {
			// Try to abort multipart on error
			w.store.abortMultipartUpload(ctx, w.uploadID)
			return distribution.Descriptor{}, err
		}
	}

	// Complete multipart upload at temp location
	tempDigest := digest.Digest(fmt.Sprintf("uploads/temp-%s", w.id))
	if err := w.store.completeMultipartUpload(ctx, tempDigest, w.uploadID, w.parts); err != nil {
		// Best-effort abort so the incomplete upload's parts are released.
		w.store.abortMultipartUpload(ctx, w.uploadID)
		return distribution.Descriptor{}, err
	}

	// Move from temp → final location (server-side S3 copy)
	tempPath := fmt.Sprintf("uploads/temp-%s", w.id)
	finalPath := desc.Digest.String()

	moveURL := fmt.Sprintf("%s/move?from=%s&to=%s&did=%s",
		w.store.storageEndpoint, tempPath, finalPath, w.store.did)

	req, err := http.NewRequestWithContext(ctx, "POST", moveURL, nil)
	if err != nil {
		return distribution.Descriptor{}, err
	}

	resp, err := w.store.httpClient.Do(req)
	if err != nil {
		return distribution.Descriptor{}, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
		bodyBytes, _ := io.ReadAll(resp.Body) // best effort: the body only enriches the error message
		return distribution.Descriptor{}, fmt.Errorf("move failed: %d, %s", resp.StatusCode, bodyBytes)
	}

	return distribution.Descriptor{
		Digest:    desc.Digest,
		Size:      w.size,
		MediaType: desc.MediaType,
	}, nil
}
Add Hold Service Client Methods
// startMultipartUpload asks the hold service (POST <storageEndpoint>/start-multipart)
// to begin a multipart upload for dgst on behalf of p.did, and returns the
// upload ID from the response.
//
// Request-construction errors are reported instead of being discarded. A
// non-2xx response or a response without an upload ID is an error, so that a
// failed start is never mistaken for success with an empty ID.
func (p *ProxyBlobStore) startMultipartUpload(ctx context.Context, dgst digest.Digest) (string, error) {
	body, err := json.Marshal(map[string]any{
		"did":    p.did,
		"digest": dgst.String(),
	})
	if err != nil {
		return "", fmt.Errorf("encoding start-multipart request: %w", err)
	}

	url := fmt.Sprintf("%s/start-multipart", p.storageEndpoint)
	req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
	if err != nil {
		return "", fmt.Errorf("building start-multipart request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := p.httpClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	// Any 2xx is accepted; an error body must not be decoded as a result.
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return "", fmt.Errorf("start multipart failed: status %d", resp.StatusCode)
	}

	var result struct {
		UploadID string `json:"upload_id"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return "", err
	}
	if result.UploadID == "" {
		return "", fmt.Errorf("start multipart response contained no upload ID")
	}

	return result.UploadID, nil
}
// getPartPresignedURL asks the hold service for a presigned URL to which part
// partNumber of the multipart upload uploadID (for dgst) can be uploaded.
//
// It returns an error if the request cannot be built or sent, if the service
// answers with a status other than 200, or if the response is not valid JSON.
func (p *ProxyBlobStore) getPartPresignedURL(ctx context.Context, dgst digest.Digest, uploadID string, partNumber int) (string, error) {
	reqBody := map[string]any{
		"did":         p.did,
		"digest":      dgst.String(),
		"upload_id":   uploadID,
		"part_number": partNumber,
	}
	body, err := json.Marshal(reqBody)
	if err != nil {
		return "", err
	}

	url := fmt.Sprintf("%s/part-presigned-url", p.storageEndpoint)
	// Check the construction error: req is nil when it fails.
	req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
	if err != nil {
		return "", err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := p.httpClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	// Reject error responses instead of decoding them into an empty URL.
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("get part URL failed: status %d", resp.StatusCode)
	}

	var result struct {
		URL string `json:"url"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return "", err
	}
	return result.URL, nil
}
// completeMultipartUpload asks the hold service to finalize the multipart
// upload uploadID for dgst, passing the list of uploaded parts.
//
// It returns an error if the request cannot be built or sent, or if the
// service answers with a status other than 200.
func (p *ProxyBlobStore) completeMultipartUpload(ctx context.Context, dgst digest.Digest, uploadID string, parts []CompletedPart) error {
	reqBody := map[string]any{
		"did":       p.did,
		"digest":    dgst.String(),
		"upload_id": uploadID,
		"parts":     parts,
	}
	body, err := json.Marshal(reqBody)
	if err != nil {
		return err
	}

	url := fmt.Sprintf("%s/complete-multipart", p.storageEndpoint)
	// Check the construction error: req is nil when it fails.
	req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := p.httpClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("complete multipart failed: status %d", resp.StatusCode)
	}
	return nil
}
---
Testing Plan
1. Unit Tests
- Test multipart upload initiation
- Test part upload with presigned URLs
- Test completion with ETags
- Test abort on errors
2. Integration Tests
- Push small images (< 5MB, single part)
- Push medium images (10MB, 2 parts)
- Push large images (100MB, 20 parts)
- Test with Upcloud S3
- Test with Storj S3
3. Validation
- Monitor logs for "client disconnected" errors (should be gone)
- Check Docker push success rate
- Verify blobs stored correctly in S3
- Check bandwidth usage on hold service (should be minimal)
---
Migration & Deployment
Backward Compatibility
- Keep /put-presigned-url endpoint for fallback
- Keep /move endpoint (still needed)
- New multipart endpoints are additive
Deployment Steps
1. Update hold service with new endpoints
2. Update AppView ProxyBlobStore
3. Deploy hold service first
4. Deploy AppView
5. Test with sample push
6. Monitor logs
Rollback Plan
- Revert AppView to previous version (uses old presigned URL method)
- Hold service keeps both old and new endpoints
---
Documentation Updates
Update docs/PRESIGNED_URLS.md
- Add section "Multipart Upload for Chunked Data"
- Explain why single presigned URLs don't work with PATCH
- Document new endpoints and flow
- Add S3 part size recommendations (5MB-64MB for Storj)
Add Troubleshooting Section
- "Client disconnected during PATCH" → resolved by multipart
- Storj-specific considerations (64MB parts recommended)
- Upcloud compatibility notes
---
Performance Impact
Before (Broken)
- Docker PATCH → blocks on pipe → timeout → retry → fail
- Unable to push large images reliably
After (Multipart)
- Each PATCH → independent part upload → immediate response
- No blocking, no timeouts
- Parallel part uploads possible (future optimization)
- Reliable pushes for any image size
Bandwidth
- Hold service: Only API calls (~1KB per part)
- Direct S3 uploads: Full blob data
- S3 copy for move: Server-side (no hold bandwidth)
Estimated savings: 99.98% hold service bandwidth reduction (same as before, but now actually works!)

1017
docs/PRESIGNED_UPLOADS.md Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -718,6 +718,55 @@ PRESIGNED_URLS_ENABLED=false docker-compose restart atcr-hold
The implementation has automatic fallbacks, so partial failures won't break functionality.
## Testing with DISABLE_PRESIGNED_URLS
### Environment Variable
Set `DISABLE_PRESIGNED_URLS=true` to force proxy/buffered mode even when S3 is configured.
**Use cases:**
- Testing proxy/buffered code paths with S3 storage
- Debugging multipart uploads in buffered mode
- Simulating S3 providers that don't support presigned URLs
- Verifying fallback behavior works correctly
### How It Works
When `DISABLE_PRESIGNED_URLS=true`:
**Single blob operations:**
- `getDownloadURL()` returns proxy URL instead of S3 presigned URL
- `getHeadURL()` returns proxy URL instead of S3 presigned HEAD URL
- `getUploadURL()` returns proxy URL instead of S3 presigned PUT URL
- Client uses `/blobs/{digest}` endpoints (proxy through hold service)
**Multipart uploads:**
- `StartMultipartUploadWithManager()` creates **Buffered** session instead of **S3Native**
- `GetPartUploadURL()` returns `/multipart-parts/{uploadID}/{partNumber}` instead of S3 presigned URL
- Parts are buffered in memory in the hold service
- `CompleteMultipartUploadWithManager()` assembles parts and writes via storage driver
### Testing Example
```bash
# Test S3 with forced proxy mode
export STORAGE_DRIVER=s3
export S3_BUCKET=my-bucket
export AWS_ACCESS_KEY_ID=...
export AWS_SECRET_ACCESS_KEY=...
export DISABLE_PRESIGNED_URLS=true # Force buffered/proxy mode
./bin/atcr-hold
# Push an image - should use proxy mode
docker push atcr.io/yourdid/test:latest
# Check logs for:
# "Presigned URLs disabled, using proxy URL"
# "Presigned URLs disabled (DISABLE_PRESIGNED_URLS=true), using buffered mode"
# "Stored part: uploadID=... part=1 size=..."
```
## Future Enhancements
### 1. Configurable Expiration

View File

@@ -14,10 +14,10 @@ import (
"github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/distribution/reference"
"atcr.io/pkg/appview/storage"
"atcr.io/pkg/atproto"
"atcr.io/pkg/auth"
"atcr.io/pkg/auth/oauth"
"atcr.io/pkg/storage"
)
// Global refresher instance (set by main.go)

View File

@@ -7,7 +7,6 @@ import (
"fmt"
"io"
"net/http"
"strings"
"sync"
"time"
@@ -16,17 +15,11 @@ import (
)
const (
// minPartSize is S3's minimum part size for multipart uploads
// Parts must be at least 5MB (except the last part)
minPartSize = 5 * 1024 * 1024 // 5MB
// maxChunkSize is the maximum buffer size before flushing to hold service
// Matches S3's minimum multipart upload size
maxChunkSize = 5 * 1024 * 1024 // 5MB
)
// CompletedPart represents a completed multipart upload part.
// It is the JSON element type of the "parts" list sent when a multipart
// upload is completed.
type CompletedPart struct {
// PartNumber identifies the part within the upload (JSON "part_number").
PartNumber int `json:"part_number"`
// ETag is the ETag recorded for the uploaded part (JSON "etag").
ETag string `json:"etag"`
}
// Global upload tracking (shared across all ProxyBlobStore instances)
// This is necessary because distribution creates new repository/blob store instances per request
var (
@@ -66,8 +59,13 @@ func NewProxyBlobStore(storageEndpoint, did string, database DatabaseMetrics, re
// Stat returns the descriptor for a blob
func (p *ProxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
// Quick HEAD request to hold service to check if blob exists
url := fmt.Sprintf("%s/blobs/%s?did=%s", p.storageEndpoint, dgst.String(), p.did)
// Get presigned HEAD URL
url, err := p.getHeadURL(ctx, dgst)
if err != nil {
return distribution.Descriptor{}, distribution.ErrBlobUnknown
}
// Make HEAD request to presigned URL
req, err := http.NewRequestWithContext(ctx, "HEAD", url, nil)
if err != nil {
return distribution.Descriptor{}, distribution.ErrBlobUnknown
@@ -149,26 +147,33 @@ func (p *ProxyBlobStore) Put(ctx context.Context, mediaType string, content []by
// Get upload URL
url, err := p.getUploadURL(ctx, dgst, int64(len(content)))
if err != nil {
fmt.Printf("[proxy_blob_store/Put] Failed to get upload URL: digest=%s, error=%v\n", dgst, err)
return distribution.Descriptor{}, err
}
// Upload the blob
req, err := http.NewRequestWithContext(ctx, "PUT", url, bytes.NewReader(content))
if err != nil {
fmt.Printf("[proxy_blob_store/Put] Failed to create request: %v\n", err)
return distribution.Descriptor{}, err
}
req.Header.Set("Content-Type", "application/octet-stream")
resp, err := p.httpClient.Do(req)
if err != nil {
fmt.Printf("[proxy_blob_store/Put] HTTP request failed: %v\n", err)
return distribution.Descriptor{}, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
return distribution.Descriptor{}, fmt.Errorf("upload failed with status %d", resp.StatusCode)
bodyBytes, _ := io.ReadAll(resp.Body)
fmt.Printf(" Error Body: %s\n", string(bodyBytes))
return distribution.Descriptor{}, fmt.Errorf("upload failed with status %d: %s", resp.StatusCode, string(bodyBytes))
}
fmt.Printf("[proxy_blob_store/Put] Upload successful: digest=%s, size=%d\n", dgst, len(content))
return distribution.Descriptor{
Digest: dgst,
Size: int64(len(content)),
@@ -184,7 +189,19 @@ func (p *ProxyBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
// ServeBlob serves a blob via HTTP redirect
func (p *ProxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
// Get presigned download URL
// For HEAD requests, redirect to presigned HEAD URL
if r.Method == http.MethodHead {
url, err := p.getHeadURL(ctx, dgst)
if err != nil {
return err
}
// Redirect to presigned HEAD URL
http.Redirect(w, r, url, http.StatusTemporaryRedirect)
return nil
}
// For GET requests, redirect to presigned URL
url, err := p.getDownloadURL(ctx, dgst)
if err != nil {
return err
@@ -195,7 +212,7 @@ func (p *ProxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r
return nil
}
// Create returns a blob writer for uploading
// Create returns a blob writer for uploading using multipart upload
func (p *ProxyBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
// Parse options
var opts distribution.CreateOptions
@@ -205,10 +222,11 @@ func (p *ProxyBlobStore) Create(ctx context.Context, options ...distribution.Blo
}
}
// Use temp digest for upload location
// Generate unique writer ID
writerID := fmt.Sprintf("upload-%d", time.Now().UnixNano())
tempPath := fmt.Sprintf("uploads/temp-%s", writerID)
tempDigest := digest.Digest(tempPath)
// Use temp digest for upload location (will be moved to final digest on commit)
tempDigest := fmt.Sprintf("uploads/temp-%s", writerID)
// Start multipart upload via hold service
uploadID, err := p.startMultipartUpload(ctx, tempDigest)
@@ -216,7 +234,7 @@ func (p *ProxyBlobStore) Create(ctx context.Context, options ...distribution.Blo
return nil, fmt.Errorf("failed to start multipart upload: %w", err)
}
fmt.Printf("DEBUG [proxy_blob_store/Create]: Started multipart upload: id=%s, uploadID=%s\n", writerID, uploadID)
fmt.Printf(" Started multipart upload: uploadID=%s\n", uploadID)
writer := &ProxyBlobWriter{
store: p,
@@ -224,13 +242,12 @@ func (p *ProxyBlobStore) Create(ctx context.Context, options ...distribution.Blo
uploadID: uploadID,
parts: make([]CompletedPart, 0),
partNumber: 1,
buffer: bytes.NewBuffer(make([]byte, 0, minPartSize)),
buffer: bytes.NewBuffer(make([]byte, 0, maxChunkSize)), // 5MB buffer
id: writerID,
startedAt: time.Now(),
tempDigest: tempDigest,
}
// Store in global map for Resume()
// Store in global uploads map for resume support
globalUploadsMu.Lock()
globalUploads[writer.id] = writer
globalUploadsMu.Unlock()
@@ -249,14 +266,21 @@ func (p *ProxyBlobStore) Resume(ctx context.Context, id string) (distribution.Bl
return nil, distribution.ErrBlobUploadUnknown
}
// Just return the writer - parts are buffered and flushed on demand
return writer, nil
}
// getDownloadURL requests a presigned download URL from the storage service
func (p *ProxyBlobStore) getDownloadURL(ctx context.Context, dgst digest.Digest) (string, error) {
// getPresignedURL requests a presigned URL from the storage service for any operation
func (p *ProxyBlobStore) getPresignedURL(ctx context.Context, operation, dgst string, size int64) (string, error) {
reqBody := map[string]any{
"did": p.did,
"digest": dgst.String(),
"operation": operation,
"did": p.did,
"digest": dgst,
}
// Only include size for PUT operations
if size > 0 {
reqBody["size"] = size
}
body, err := json.Marshal(reqBody)
@@ -264,7 +288,7 @@ func (p *ProxyBlobStore) getDownloadURL(ctx context.Context, dgst digest.Digest)
return "", err
}
url := fmt.Sprintf("%s/get-presigned-url", p.storageEndpoint)
url := fmt.Sprintf("%s/presigned-url", p.storageEndpoint)
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
if err != nil {
return "", err
@@ -278,7 +302,7 @@ func (p *ProxyBlobStore) getDownloadURL(ctx context.Context, dgst digest.Digest)
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("failed to get download URL: status %d", resp.StatusCode)
return "", fmt.Errorf("failed to get presigned URL: status %d", resp.StatusCode)
}
var result struct {
@@ -291,14 +315,31 @@ func (p *ProxyBlobStore) getDownloadURL(ctx context.Context, dgst digest.Digest)
return result.URL, nil
}
// getDownloadURL returns a presigned URL for downloading the blob dgst.
// It delegates to getPresignedURL with the "GET" operation and a size of 0.
func (p *ProxyBlobStore) getDownloadURL(ctx context.Context, dgst digest.Digest) (string, error) {
	blobDigest := dgst.String()
	return p.getPresignedURL(ctx, "GET", blobDigest, 0)
}
// getHeadURL returns a presigned URL for issuing a HEAD request against the
// blob dgst. It delegates to getPresignedURL with the "HEAD" operation and a
// size of 0.
func (p *ProxyBlobStore) getHeadURL(ctx context.Context, dgst digest.Digest) (string, error) {
	blobDigest := dgst.String()
	return p.getPresignedURL(ctx, "HEAD", blobDigest, 0)
}
// getUploadURL returns a presigned URL for uploading the blob dgst of the
// given size. It delegates to getPresignedURL with the "PUT" operation and
// prints debug lines before the request and after a successful response.
func (p *ProxyBlobStore) getUploadURL(ctx context.Context, dgst digest.Digest, size int64) (string, error) {
	fmt.Printf("DEBUG [proxy_blob_store/getUploadURL]: storageEndpoint=%s, digest=%s\n", p.storageEndpoint, dgst)

	presigned, err := p.getPresignedURL(ctx, "PUT", dgst.String(), size)
	if err != nil {
		// Pass through whatever getPresignedURL returned alongside the error.
		return presigned, err
	}

	fmt.Printf("DEBUG [proxy_blob_store/getUploadURL]: Got presigned URL=%s\n", presigned)
	return presigned, nil
}
// startMultipartUpload initiates a multipart upload via hold service
func (p *ProxyBlobStore) startMultipartUpload(ctx context.Context, digest string) (string, error) {
reqBody := map[string]any{
"did": p.did,
"digest": dgst.String(),
"size": size,
"digest": digest,
}
body, err := json.Marshal(reqBody)
@@ -306,8 +347,7 @@ func (p *ProxyBlobStore) getUploadURL(ctx context.Context, dgst digest.Digest, s
return "", err
}
url := fmt.Sprintf("%s/put-presigned-url", p.storageEndpoint)
fmt.Printf("DEBUG [proxy_blob_store/getUploadURL]: Calling %s\n", url)
url := fmt.Sprintf("%s/start-multipart", p.storageEndpoint)
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
if err != nil {
return "", err
@@ -321,7 +361,50 @@ func (p *ProxyBlobStore) getUploadURL(ctx context.Context, dgst digest.Digest, s
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("failed to get upload URL: status %d", resp.StatusCode)
bodyBytes, _ := io.ReadAll(resp.Body)
return "", fmt.Errorf("start multipart failed: status %d, body: %s", resp.StatusCode, string(bodyBytes))
}
var result struct {
UploadID string `json:"upload_id"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return "", err
}
return result.UploadID, nil
}
// getPartPresignedURL gets a presigned URL for uploading a specific part
func (p *ProxyBlobStore) getPartPresignedURL(ctx context.Context, digest, uploadID string, partNumber int) (string, error) {
reqBody := map[string]any{
"did": p.did,
"digest": digest,
"upload_id": uploadID,
"part_number": partNumber,
}
body, err := json.Marshal(reqBody)
if err != nil {
return "", err
}
url := fmt.Sprintf("%s/part-presigned-url", p.storageEndpoint)
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
if err != nil {
return "", err
}
req.Header.Set("Content-Type", "application/json")
resp, err := p.httpClient.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(resp.Body)
return "", fmt.Errorf("get part URL failed: status %d, body: %s", resp.StatusCode, string(bodyBytes))
}
var result struct {
@@ -331,11 +414,85 @@ func (p *ProxyBlobStore) getUploadURL(ctx context.Context, dgst digest.Digest, s
return "", err
}
fmt.Printf("DEBUG [proxy_blob_store/getUploadURL]: Got presigned URL=%s\n", result.URL)
return result.URL, nil
}
// ProxyBlobWriter implements distribution.BlobWriter for proxy uploads
// completeMultipartUpload tells the hold service to finalize the multipart
// upload uploadID stored under digest, passing the list of uploaded parts.
// Any status other than 200 is reported as an error that includes the
// response body.
func (p *ProxyBlobStore) completeMultipartUpload(ctx context.Context, digest, uploadID string, parts []CompletedPart) error {
	payload, err := json.Marshal(map[string]any{
		"did":       p.did,
		"digest":    digest,
		"upload_id": uploadID,
		"parts":     parts,
	})
	if err != nil {
		return err
	}

	endpoint := fmt.Sprintf("%s/complete-multipart", p.storageEndpoint)
	request, err := http.NewRequestWithContext(ctx, "POST", endpoint, bytes.NewReader(payload))
	if err != nil {
		return err
	}
	request.Header.Set("Content-Type", "application/json")

	response, err := p.httpClient.Do(request)
	if err != nil {
		return err
	}
	defer response.Body.Close()

	if response.StatusCode == http.StatusOK {
		return nil
	}

	// Surface the service's explanation; a failed read just leaves it empty.
	detail, _ := io.ReadAll(response.Body)
	return fmt.Errorf("complete multipart failed: status %d, body: %s", response.StatusCode, string(detail))
}
// abortMultipartUpload tells the hold service to abandon the multipart
// upload uploadID stored under digest. Any status other than 200 is reported
// as an error that includes the response body.
func (p *ProxyBlobStore) abortMultipartUpload(ctx context.Context, digest, uploadID string) error {
	payload, err := json.Marshal(map[string]any{
		"did":       p.did,
		"digest":    digest,
		"upload_id": uploadID,
	})
	if err != nil {
		return err
	}

	endpoint := fmt.Sprintf("%s/abort-multipart", p.storageEndpoint)
	request, err := http.NewRequestWithContext(ctx, "POST", endpoint, bytes.NewReader(payload))
	if err != nil {
		return err
	}
	request.Header.Set("Content-Type", "application/json")

	response, err := p.httpClient.Do(request)
	if err != nil {
		return err
	}
	defer response.Body.Close()

	if response.StatusCode == http.StatusOK {
		return nil
	}

	// Surface the service's explanation; a failed read just leaves it empty.
	detail, _ := io.ReadAll(response.Body)
	return fmt.Errorf("abort multipart failed: status %d, body: %s", response.StatusCode, string(detail))
}
// CompletedPart represents an uploaded part with its ETag.
// It is the JSON element type of the "parts" list sent to the hold service
// when a multipart upload is completed.
type CompletedPart struct {
// PartNumber identifies the part within the upload (JSON "part_number").
PartNumber int `json:"part_number"`
// ETag is the ETag recorded for the uploaded part (JSON "etag").
ETag string `json:"etag"`
}
// ProxyBlobWriter implements distribution.BlobWriter for proxy uploads using multipart upload
type ProxyBlobWriter struct {
store *ProxyBlobStore
options distribution.CreateOptions
@@ -345,10 +502,9 @@ type ProxyBlobWriter struct {
buffer *bytes.Buffer // Buffer for current part
size int64 // Total bytes written
closed bool
id string // Distribution's upload ID (for state)
id string // Distribution's upload ID (for state)
startedAt time.Time
finalDigest string // Set on Commit
tempDigest digest.Digest // Temp location digest
finalDigest string // Set on Commit
}
// ID returns the upload ID
@@ -362,7 +518,7 @@ func (w *ProxyBlobWriter) StartedAt() time.Time {
}
// Write writes data to the upload
// Buffers data and flushes parts when buffer reaches minPartSize
// Buffers data and flushes when buffer reaches 5MB
func (w *ProxyBlobWriter) Write(p []byte) (int, error) {
if w.closed {
return 0, fmt.Errorf("writer closed")
@@ -371,17 +527,17 @@ func (w *ProxyBlobWriter) Write(p []byte) (int, error) {
n, err := w.buffer.Write(p)
w.size += int64(n)
// Flush if buffer reaches minimum part size (5MB)
if w.buffer.Len() >= minPartSize {
// Flush if buffer reaches 5MB (S3 minimum part size)
if w.buffer.Len() >= maxChunkSize {
if err := w.flushPart(); err != nil {
return n, fmt.Errorf("failed to flush part: %w", err)
return n, err
}
}
return n, err
}
// flushPart uploads the current buffer as a multipart upload part
// flushPart uploads the current buffer as a part
func (w *ProxyBlobWriter) flushPart() error {
if w.buffer.Len() == 0 {
return nil
@@ -391,7 +547,8 @@ func (w *ProxyBlobWriter) flushPart() error {
defer cancel()
// Get presigned URL for this part
url, err := w.store.getPartPresignedURL(ctx, w.tempDigest, w.uploadID, w.partNumber)
tempDigest := fmt.Sprintf("uploads/temp-%s", w.id)
url, err := w.store.getPartPresignedURL(ctx, tempDigest, w.uploadID, w.partNumber)
if err != nil {
return fmt.Errorf("failed to get part presigned URL: %w", err)
}
@@ -403,11 +560,9 @@ func (w *ProxyBlobWriter) flushPart() error {
}
req.Header.Set("Content-Type", "application/octet-stream")
fmt.Printf("DEBUG [proxy_blob_store/flushPart]: Uploading part %d, size=%d bytes\n", w.partNumber, w.buffer.Len())
resp, err := w.store.httpClient.Do(req)
if err != nil {
return fmt.Errorf("part upload failed: %w", err)
return err
}
defer resp.Body.Close()
@@ -422,15 +577,12 @@ func (w *ProxyBlobWriter) flushPart() error {
return fmt.Errorf("no ETag in response")
}
// Remove quotes from ETag if present (S3 sometimes adds them)
etag = strings.Trim(etag, "\"")
w.parts = append(w.parts, CompletedPart{
PartNumber: w.partNumber,
ETag: etag,
})
fmt.Printf("DEBUG [proxy_blob_store/flushPart]: Part %d uploaded successfully, ETag=%s\n", w.partNumber, etag)
fmt.Printf("[flushPart] Part %d uploaded successfully: ETag=%s\n", w.partNumber, etag)
// Reset buffer and increment part number
w.buffer.Reset()
@@ -474,33 +626,41 @@ func (w *ProxyBlobWriter) Size() int64 {
return w.size
}
// Commit finalizes the upload
// Commit finalizes the upload by completing multipart upload and moving to final location
func (w *ProxyBlobWriter) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
if w.closed {
return distribution.Descriptor{}, fmt.Errorf("writer closed")
}
w.closed = true
// Flush any remaining buffered data as the final part
// Remove from global uploads map
globalUploadsMu.Lock()
delete(globalUploads, w.id)
globalUploadsMu.Unlock()
// Flush any remaining buffered data
if w.buffer.Len() > 0 {
fmt.Printf("[Commit] Flushing final buffer: %d bytes\n", w.buffer.Len())
if err := w.flushPart(); err != nil {
// Try to abort multipart on error
w.store.abortMultipartUpload(ctx, w.tempDigest, w.uploadID)
tempDigest := fmt.Sprintf("uploads/temp-%s", w.id)
w.store.abortMultipartUpload(ctx, tempDigest, w.uploadID)
return distribution.Descriptor{}, fmt.Errorf("failed to flush final part: %w", err)
}
}
// Complete multipart upload at temp location
if err := w.store.completeMultipartUpload(ctx, w.tempDigest, w.uploadID, w.parts); err != nil {
tempDigest := fmt.Sprintf("uploads/temp-%s", w.id)
fmt.Printf("🔒 [Commit] Completing multipart upload: uploadID=%s, parts=%d\n", w.uploadID, len(w.parts))
if err := w.store.completeMultipartUpload(ctx, tempDigest, w.uploadID, w.parts); err != nil {
return distribution.Descriptor{}, fmt.Errorf("failed to complete multipart upload: %w", err)
}
fmt.Printf("DEBUG [proxy_blob_store/Commit]: Completed multipart upload with %d parts, total size=%d\n", len(w.parts), w.size)
// Move from temp → final location (server-side S3 copy)
tempPath := fmt.Sprintf("uploads/temp-%s", w.id)
finalPath := desc.Digest.String()
fmt.Printf("[Commit] Moving blob: %s → %s\n", tempPath, finalPath)
moveURL := fmt.Sprintf("%s/move?from=%s&to=%s&did=%s",
w.store.storageEndpoint, tempPath, finalPath, w.store.did)
@@ -517,15 +677,10 @@ func (w *ProxyBlobWriter) Commit(ctx context.Context, desc distribution.Descript
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
bodyBytes, _ := io.ReadAll(resp.Body)
return distribution.Descriptor{}, fmt.Errorf("move failed: status %d, body: %s", resp.StatusCode, string(bodyBytes))
return distribution.Descriptor{}, fmt.Errorf("move blob failed: status %d, body: %s", resp.StatusCode, string(bodyBytes))
}
// Remove from global map
globalUploadsMu.Lock()
delete(globalUploads, w.id)
globalUploadsMu.Unlock()
fmt.Printf("DEBUG [proxy_blob_store/Commit]: Successfully committed: digest=%s, size=%d\n", desc.Digest, w.size)
fmt.Printf("[Commit] Upload completed successfully: digest=%s, size=%d, parts=%d\n", desc.Digest, w.size, len(w.parts))
return distribution.Descriptor{
Digest: desc.Digest,
@@ -534,153 +689,35 @@ func (w *ProxyBlobWriter) Commit(ctx context.Context, desc distribution.Descript
}, nil
}
// Cancel cancels the upload
// Cancel cancels the upload by aborting the multipart upload
func (w *ProxyBlobWriter) Cancel(ctx context.Context) error {
w.closed = true
fmt.Printf("[Cancel] Cancelling upload: id=%s\n", w.id)
// Remove from global uploads map
globalUploadsMu.Lock()
delete(globalUploads, w.id)
globalUploadsMu.Unlock()
// Abort multipart upload on S3
if err := w.store.abortMultipartUpload(ctx, w.tempDigest, w.uploadID); err != nil {
fmt.Printf("DEBUG [proxy_blob_store/Cancel]: Failed to abort multipart upload: %v\n", err)
// Continue anyway - we still want to clean up
// Abort multipart upload
tempDigest := fmt.Sprintf("uploads/temp-%s", w.id)
if err := w.store.abortMultipartUpload(ctx, tempDigest, w.uploadID); err != nil {
fmt.Printf("⚠️ [Cancel] Failed to abort multipart upload: %v\n", err)
// Continue anyway - we want to mark upload as cancelled
}
fmt.Printf("DEBUG [proxy_blob_store/Cancel]: Cancelled upload: id=%s, uploadID=%s\n", w.id, w.uploadID)
fmt.Printf("[Cancel] Upload cancelled: id=%s\n", w.id)
return nil
}
// Close closes the writer
// Does nothing - actual completion happens in Commit() or Cancel()
// Parts are flushed on demand, so this is a no-op
func (w *ProxyBlobWriter) Close() error {
// Don't set w.closed = true - allow resuming for next PATCH
return nil
}
// startMultipartUpload initiates a multipart upload for dgst via the hold
// service and returns the upload ID from the response.
//
// It returns an error if the request cannot be built or sent, if the service
// answers with a status other than 200, or if the response is not valid JSON.
func (p *ProxyBlobStore) startMultipartUpload(ctx context.Context, dgst digest.Digest) (string, error) {
	reqBody := map[string]any{
		"did":    p.did,
		"digest": dgst.String(),
	}
	body, err := json.Marshal(reqBody)
	if err != nil {
		return "", err
	}

	url := fmt.Sprintf("%s/start-multipart", p.storageEndpoint)
	// A malformed endpoint makes NewRequestWithContext return a nil request;
	// check the error before touching req.
	req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
	if err != nil {
		return "", err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := p.httpClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("failed to start multipart upload: status %d", resp.StatusCode)
	}

	var result struct {
		UploadID string `json:"upload_id"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return "", err
	}
	return result.UploadID, nil
}
// getPartPresignedURL gets a presigned URL for uploading part partNumber of
// the multipart upload uploadID (for dgst) from the hold service.
//
// It returns an error if the request cannot be built or sent, if the service
// answers with a status other than 200, or if the response is not valid JSON.
func (p *ProxyBlobStore) getPartPresignedURL(ctx context.Context, dgst digest.Digest, uploadID string, partNumber int) (string, error) {
	reqBody := map[string]any{
		"did":         p.did,
		"digest":      dgst.String(),
		"upload_id":   uploadID,
		"part_number": partNumber,
	}
	body, err := json.Marshal(reqBody)
	if err != nil {
		return "", err
	}

	url := fmt.Sprintf("%s/part-presigned-url", p.storageEndpoint)
	// Check the construction error: req is nil when it fails.
	req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
	if err != nil {
		return "", err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := p.httpClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("failed to get part presigned URL: status %d", resp.StatusCode)
	}

	var result struct {
		URL string `json:"url"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return "", err
	}
	return result.URL, nil
}
// completeMultipartUpload completes the multipart upload uploadID for dgst
// via the hold service, passing the list of uploaded parts.
//
// It returns an error if the request cannot be built or sent, or if the
// service answers with a status other than 200 (the response body is included
// in the error).
func (p *ProxyBlobStore) completeMultipartUpload(ctx context.Context, dgst digest.Digest, uploadID string, parts []CompletedPart) error {
	reqBody := map[string]any{
		"did":       p.did,
		"digest":    dgst.String(),
		"upload_id": uploadID,
		"parts":     parts,
	}
	body, err := json.Marshal(reqBody)
	if err != nil {
		return err
	}

	url := fmt.Sprintf("%s/complete-multipart", p.storageEndpoint)
	// Check the construction error: req is nil when it fails.
	req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := p.httpClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// A failed body read just leaves the detail empty.
		bodyBytes, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("complete multipart failed: status %d, body: %s", resp.StatusCode, string(bodyBytes))
	}
	return nil
}
// abortMultipartUpload aborts the multipart upload uploadID for dgst via the
// hold service.
//
// It returns an error if the request cannot be built or sent, or if the
// service answers with a status other than 200 (the response body is included
// in the error).
func (p *ProxyBlobStore) abortMultipartUpload(ctx context.Context, dgst digest.Digest, uploadID string) error {
	reqBody := map[string]any{
		"did":       p.did,
		"digest":    dgst.String(),
		"upload_id": uploadID,
	}
	body, err := json.Marshal(reqBody)
	if err != nil {
		return err
	}

	url := fmt.Sprintf("%s/abort-multipart", p.storageEndpoint)
	// Check the construction error: req is nil when it fails.
	req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := p.httpClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// A failed body read just leaves the detail empty.
		bodyBytes, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("abort multipart failed: status %d, body: %s", resp.StatusCode, string(bodyBytes))
	}
	return nil
}
// readSeekCloser wraps an io.ReadCloser to implement ReadSeekCloser
type readSeekCloser struct {
io.ReadCloser

View File

@@ -3,6 +3,7 @@ package atproto
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
@@ -343,6 +344,26 @@ func (c *Client) GetBlob(ctx context.Context, cid string) ([]byte, error) {
return nil, fmt.Errorf("failed to read blob data: %w", err)
}
// Check if PDS returned JSON-wrapped blob (Bluesky implementation)
// PDS may wrap blobs as JSON-encoded base64 strings
// Detection: Check if content starts with a quote (indicating JSON string)
if len(data) > 0 && data[0] == '"' {
// Blob is JSON-encoded - decode it
var base64Str string
if err := json.Unmarshal(data, &base64Str); err != nil {
return nil, fmt.Errorf("failed to unmarshal JSON-wrapped blob: %w", err)
}
// Base64-decode the blob content
decoded, err := base64.StdEncoding.DecodeString(base64Str)
if err != nil {
return nil, fmt.Errorf("failed to base64-decode blob: %w", err)
}
return decoded, nil
}
// Raw blob response (expected ATProto behavior)
return data, nil
}

View File

@@ -5,6 +5,13 @@ import (
"strings"
)
// AccessEntry represents access permissions for a resource.
// Name and Actions are omitted from the JSON encoding when empty.
type AccessEntry struct {
// Type is the resource type, e.g. "repository".
Type string `json:"type"`
// Name is the resource name, e.g. "alice/myapp".
Name string `json:"name,omitempty"`
// Actions lists the permitted actions, e.g. ["pull", "push"].
Actions []string `json:"actions,omitempty"`
}
// ParseScope parses Docker registry scope strings into AccessEntry structures
// Scope format: "repository:alice/myapp:pull,push"
// Multiple scopes can be provided

View File

@@ -1,8 +0,0 @@
package auth
// AccessEntry represents access permissions for a resource.
// Name and Actions are omitted from the JSON encoding when empty.
type AccessEntry struct {
// Type is the resource type, e.g. "repository".
Type string `json:"type"`
// Name is the resource name, e.g. "alice/myapp".
Name string `json:"name,omitempty"`
// Actions lists the permitted actions, e.g. ["pull", "push"].
Actions []string `json:"actions,omitempty"`
}

131
pkg/hold/authorization.go Normal file
View File

@@ -0,0 +1,131 @@
package hold
import (
"context"
"encoding/json"
"fmt"
"log"
"atcr.io/pkg/atproto"
"github.com/bluesky-social/indigo/atproto/identity"
"github.com/bluesky-social/indigo/atproto/syntax"
)
// isAuthorizedRead reports whether did may read from this hold.
//
// Rules:
//   - Public hold: everyone is allowed, including anonymous callers (empty did).
//   - Private hold: any non-empty did is allowed.
//   - If the public flag cannot be determined, access is denied (fail secure).
//
// For the MVP a non-empty DID is taken as proof of a sailor.profile; no PDS
// lookup is made to verify it.
func (s *HoldService) isAuthorizedRead(did string) bool {
	public, err := s.isHoldPublic()
	switch {
	case err != nil:
		// Fail secure - deny access on error
		log.Printf("ERROR: Failed to check hold public flag: %v", err)
		return false
	case public:
		// Public hold - anonymous and authenticated callers alike.
		return true
	default:
		// Private hold - only authenticated (non-empty DID) callers.
		return did != ""
	}
}
// isAuthorizedWrite reports whether did may write to this hold.
// Only the configured hold owner and crew members may write; anonymous
// callers (empty did) are always rejected. A missing owner configuration or
// a failed crew lookup is logged and treated as a denial.
func (s *HoldService) isAuthorizedWrite(did string) bool {
	// Anonymous writes not allowed
	if did == "" {
		return false
	}

	switch owner := s.config.Registration.OwnerDID; {
	case owner == "":
		log.Printf("ERROR: Hold owner DID not configured")
		return false
	case owner == did:
		// Owner always has write access
		return true
	}

	// Not the owner: fall back to crew membership.
	member, err := s.isCrewMember(did)
	if err != nil {
		log.Printf("ERROR: Failed to check crew membership: %v", err)
		return false
	}
	return member
}
// isHoldPublic reports whether this hold permits public (anonymous) reads.
//
// The answer comes straight from the loaded configuration, so the returned
// error is currently always nil; the error result leaves room for a future
// live lookup of the hold record.
func (s *HoldService) isHoldPublic() (bool, error) {
	public := s.config.Server.Public
	return public, nil
}
// isCrewMember reports whether did is listed as the Member of a crew record
// stored in the hold owner's PDS.
//
// The owner's DID is resolved to a PDS endpoint through the indigo identity
// directory, after which crew records are listed with a client that carries
// no access token. A single ListRecords call is made with the argument 100
// (presumably a record limit — confirm against atproto.Client), and records
// whose value cannot be decoded are skipped. Any failure before the scan is
// returned as an error together with false.
func (s *HoldService) isCrewMember(did string) (bool, error) {
	owner := s.config.Registration.OwnerDID
	if owner == "" {
		return false, fmt.Errorf("hold owner DID not configured")
	}

	ctx := context.Background()
	dir := identity.DefaultDirectory()

	// Resolve the owner's PDS endpoint.
	parsedOwner, err := syntax.ParseDID(owner)
	if err != nil {
		return false, fmt.Errorf("invalid owner DID: %w", err)
	}
	ownerIdent, err := dir.LookupDID(ctx, parsedOwner)
	if err != nil {
		return false, fmt.Errorf("failed to resolve owner PDS: %w", err)
	}
	endpoint := ownerIdent.PDSEndpoint()
	if endpoint == "" {
		return false, fmt.Errorf("no PDS endpoint found for owner")
	}

	// Crew records are public, so the client is created without a token.
	client := atproto.NewClient(endpoint, owner, "")
	crew, err := client.ListRecords(ctx, atproto.HoldCrewCollection, 100)
	if err != nil {
		return false, fmt.Errorf("failed to list crew records: %w", err)
	}

	for _, rec := range crew {
		var entry atproto.HoldCrewRecord
		// Undecodable records are ignored rather than treated as failures.
		if json.Unmarshal(rec.Value, &entry) == nil && entry.Member == did {
			return true, nil
		}
	}
	return false, nil
}

134
pkg/hold/config.go Normal file
View File

@@ -0,0 +1,134 @@
package hold
import (
"fmt"
"os"
"time"
"github.com/distribution/distribution/v3/configuration"
)
// Config represents the hold service configuration.
// It groups the storage backend, HTTP server and registration settings and
// carries YAML tags for each section.
type Config struct {
	// Version is the configuration version string (LoadConfigFromEnv sets "0.1").
	Version string `yaml:"version"`
	// Storage selects and parameterizes the blob storage driver.
	Storage StorageConfig `yaml:"storage"`
	// Server holds listener, URL and behaviour flags for the HTTP server.
	Server ServerConfig `yaml:"server"`
	// Registration holds the hold owner's identity.
	Registration RegistrationConfig `yaml:"registration"`
}
// RegistrationConfig defines auto-registration settings.
type RegistrationConfig struct {
	// OwnerDID is the owner's ATProto DID (from env: HOLD_OWNER).
	// If set, auto-registration is enabled. The same value is what write
	// authorization compares callers against.
	OwnerDID string `yaml:"owner_did"`
}
// StorageConfig wraps distribution's storage configuration.
// The embedded configuration.Storage is inlined in YAML, so driver sections
// appear directly under the "storage" key.
type StorageConfig struct {
	configuration.Storage `yaml:",inline"`
}
// ServerConfig defines server settings for the hold service's HTTP endpoint.
type ServerConfig struct {
	// Addr is the address to listen on (e.g., ":8080").
	Addr string `yaml:"addr"`
	// PublicURL is the public URL of this hold service (e.g., "https://hold.example.com").
	// It is used as the base for proxy upload URLs handed back to clients.
	PublicURL string `yaml:"public_url"`
	// Public controls whether this hold allows public blob reads without auth (from env: HOLD_PUBLIC).
	Public bool `yaml:"public"`
	// TestMode uses localhost for OAuth redirects while storing real URL in hold record (from env: TEST_MODE).
	TestMode bool `yaml:"test_mode"`
	// DisablePresignedURLs forces proxy mode even with S3 configured (for testing) (from env: DISABLE_PRESIGNED_URLS).
	DisablePresignedURLs bool `yaml:"disable_presigned_urls"`
	// ReadTimeout is the timeout applied to reading HTTP requests.
	ReadTimeout time.Duration `yaml:"read_timeout"`
	// WriteTimeout is the timeout applied to writing HTTP responses.
	WriteTimeout time.Duration `yaml:"write_timeout"`
}
// LoadConfigFromEnv builds the complete hold configuration from environment
// variables.
//
// HOLD_PUBLIC_URL is mandatory. HOLD_SERVER_ADDR defaults to ":8080" and
// STORAGE_DRIVER defaults to "s3". The boolean switches HOLD_PUBLIC,
// TEST_MODE and DISABLE_PRESIGNED_URLS are enabled only by the exact value
// "true". HOLD_OWNER is optional. Read and write timeouts are fixed at five
// minutes to accommodate large blob uploads.
//
// An error is returned when HOLD_PUBLIC_URL is missing or the storage
// configuration cannot be built.
func LoadConfigFromEnv() (*Config, error) {
	envTrue := func(key string) bool { return os.Getenv(key) == "true" }

	publicURL := os.Getenv("HOLD_PUBLIC_URL")
	if publicURL == "" {
		return nil, fmt.Errorf("HOLD_PUBLIC_URL is required")
	}

	// Storage parameters depend on the selected driver.
	storage, err := buildStorageConfig(getEnvOrDefault("STORAGE_DRIVER", "s3"))
	if err != nil {
		return nil, fmt.Errorf("failed to build storage config: %w", err)
	}

	return &Config{
		Version: "0.1",
		Storage: storage,
		Server: ServerConfig{
			Addr:                 getEnvOrDefault("HOLD_SERVER_ADDR", ":8080"),
			PublicURL:            publicURL,
			Public:               envTrue("HOLD_PUBLIC"),
			TestMode:             envTrue("TEST_MODE"),
			DisablePresignedURLs: envTrue("DISABLE_PRESIGNED_URLS"),
			ReadTimeout:          5 * time.Minute, // Increased for large blob uploads
			WriteTimeout:         5 * time.Minute, // Increased for large blob uploads
		},
		Registration: RegistrationConfig{
			OwnerDID: os.Getenv("HOLD_OWNER"), // optional
		},
	}, nil
}
// buildStorageConfig assembles the distribution storage configuration for
// the named driver from environment variables.
//
// Supported drivers:
//   - "s3": reads AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION
//     (default "us-east-1"), S3_BUCKET (required) and the optional
//     S3_ENDPOINT for S3-compatible services such as Storj or Minio
//   - "filesystem": reads STORAGE_ROOT_DIR (default "/var/lib/atcr/hold")
//
// Any other driver name, or a missing S3 bucket, yields an error and a zero
// StorageConfig.
func buildStorageConfig(driver string) (StorageConfig, error) {
	var params map[string]any

	switch driver {
	case "filesystem":
		params = map[string]any{
			"rootdirectory": getEnvOrDefault("STORAGE_ROOT_DIR", "/var/lib/atcr/hold"),
		}
	case "s3":
		bucket := os.Getenv("S3_BUCKET")
		if bucket == "" {
			return StorageConfig{}, fmt.Errorf("S3_BUCKET is required for S3 storage")
		}
		params = map[string]any{
			"accesskey": os.Getenv("AWS_ACCESS_KEY_ID"),
			"secretkey": os.Getenv("AWS_SECRET_ACCESS_KEY"),
			"region":    getEnvOrDefault("AWS_REGION", "us-east-1"),
			"bucket":    bucket,
		}
		// A custom endpoint is only set for S3-compatible services.
		if endpoint := os.Getenv("S3_ENDPOINT"); endpoint != "" {
			params["regionendpoint"] = endpoint
		}
	default:
		return StorageConfig{}, fmt.Errorf("unsupported storage driver: %s", driver)
	}

	// The distribution config is keyed by driver name.
	storage := configuration.Storage{driver: configuration.Parameters(params)}
	return StorageConfig{Storage: storage}, nil
}
// getEnvOrDefault returns the value of the environment variable key, or
// defaultValue when the variable is unset or set to the empty string.
func getEnvOrDefault(key, defaultValue string) string {
	val := os.Getenv(key)
	if val == "" {
		return defaultValue
	}
	return val
}

587
pkg/hold/handlers.go Normal file
View File

@@ -0,0 +1,587 @@
package hold
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"time"
"atcr.io/pkg/atproto"
)
// PresignedURLOperation defines the type of presigned URL operation.
// Its values are the HTTP method names the presigned URL is meant for.
type PresignedURLOperation string
const (
	// OperationGet requests a URL for downloading a blob (read authorization).
	OperationGet PresignedURLOperation = "GET"
	// OperationHead requests a URL for checking a blob (read authorization).
	OperationHead PresignedURLOperation = "HEAD"
	// OperationPut requests a URL for uploading a blob (write authorization).
	OperationPut PresignedURLOperation = "PUT"
)
// PresignedURLRequest represents a request for a presigned URL (GET, HEAD, or PUT).
// It is the JSON body decoded by HandlePresignedURL.
type PresignedURLRequest struct {
	// Operation selects which kind of URL to generate and which
	// authorization check (read or write) applies.
	Operation PresignedURLOperation `json:"operation"`
	// DID identifies the caller; empty means anonymous.
	DID string `json:"did"`
	// Digest identifies the blob the URL is for.
	Digest string `json:"digest"`
	Size int64 `json:"size,omitempty"` // Only required for PUT operations
}
// PresignedURLResponse contains the presigned URL returned to the caller,
// together with the time after which it should be considered expired.
type PresignedURLResponse struct {
	// URL is the generated URL.
	URL string `json:"url"`
	// ExpiresAt is the expiry time reported to the client.
	ExpiresAt time.Time `json:"expires_at"`
}
// HandlePresignedURL handles POST requests for a presigned blob URL.
//
// The JSON body (PresignedURLRequest) selects the operation: GET and HEAD
// require read authorization for the supplied DID, PUT requires write
// authorization. On success a PresignedURLResponse is written as JSON whose
// ExpiresAt is 15 minutes after the request was handled.
//
// Responses: 405 for non-POST requests; 400 for a malformed body, an
// unsupported operation or a missing digest; 401 when authorization fails
// and no DID was supplied; 403 when it fails for a supplied DID; 500 when
// URL generation fails.
func (s *HoldService) HandlePresignedURL(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}

	var req PresignedURLRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, fmt.Sprintf("invalid request: %v", err), http.StatusBadRequest)
		return
	}

	// Validate DID authorization based on operation type
	var authorized bool
	switch req.Operation {
	case OperationGet, OperationHead:
		authorized = s.isAuthorizedRead(req.DID)
	case OperationPut:
		authorized = s.isAuthorizedWrite(req.DID)
	default:
		http.Error(w, "unsupported operation", http.StatusBadRequest)
		return
	}

	if !authorized {
		log.Printf("[HandlePresignedURL:%s] Authorization FAILED", req.Operation)
		if req.DID == "" {
			http.Error(w, "unauthorized: authentication required", http.StatusUnauthorized)
		} else {
			http.Error(w, "forbidden: access denied", http.StatusForbidden)
		}
		return
	}

	// A URL for an empty digest cannot address any blob; reject it the same
	// way the proxy handlers reject a missing digest.
	if req.Digest == "" {
		http.Error(w, "missing digest", http.StatusBadRequest)
		return
	}

	// Use the request's context (as every other handler does) so the work is
	// abandoned when the client goes away. The explicit type keeps the
	// context package referenced by this file.
	var ctx context.Context = r.Context()

	// Generate presigned URL (15 minute expiry)
	expiry := time.Now().Add(15 * time.Minute)
	url, err := s.getPresignedURL(ctx, req.Operation, req.Digest, req.DID)
	if err != nil {
		log.Printf("[HandlePresignedURL:%s] getPresignedURL failed: %v", req.Operation, err)
		http.Error(w, fmt.Sprintf("failed to generate URL: %v", err), http.StatusInternalServerError)
		return
	}

	log.Printf("[HandlePresignedURL:%s] Returning URL to client", req.Operation)

	resp := PresignedURLResponse{
		URL:       url,
		ExpiresAt: expiry,
	}
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(resp); err != nil {
		// Headers are already sent; all that is left is to record the failure.
		log.Printf("[HandlePresignedURL:%s] failed to write response: %v", req.Operation, err)
	}
}
// HandleProxyGet proxies a blob download through the service.
//
// It serves GET and HEAD requests for paths of the form /blobs/{digest}.
// The caller's DID is taken from the "did" query parameter or, failing
// that, the X-ATCR-DID header, and must pass read authorization.
//
// HEAD answers with the blob's size in Content-Length and no body; GET
// returns the blob content as application/octet-stream.
//
// Responses: 405 for other methods, 400 when the path carries no digest,
// 401/403 when authorization fails (without/with a DID), 404 when the blob
// cannot be read from storage.
func (s *HoldService) HandleProxyGet(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet && r.Method != http.MethodHead {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// Extract digest from path (e.g., /blobs/sha256:abc123).
	// The length guard prevents a slice-bounds panic on paths shorter than
	// the prefix and also covers the "prefix only" case.
	const prefix = "/blobs/"
	if len(r.URL.Path) <= len(prefix) {
		http.Error(w, "missing digest", http.StatusBadRequest)
		return
	}
	digest := r.URL.Path[len(prefix):]

	// Get DID from query param or header
	did := r.URL.Query().Get("did")
	if did == "" {
		did = r.Header.Get("X-ATCR-DID")
	}
	log.Printf(" DID: %s", did)

	// Authorize READ access
	if !s.isAuthorizedRead(did) {
		log.Printf("[HandleProxyGet] Authorization FAILED")
		if did == "" {
			http.Error(w, "unauthorized: authentication required", http.StatusUnauthorized)
		} else {
			http.Error(w, "forbidden: access denied", http.StatusForbidden)
		}
		return
	}

	ctx := r.Context()
	path := blobPath(digest)

	// For HEAD requests, just check if blob exists
	if r.Method == http.MethodHead {
		stat, err := s.driver.Stat(ctx, path)
		if err != nil {
			http.Error(w, "blob not found", http.StatusNotFound)
			return
		}
		w.Header().Set("Content-Type", "application/octet-stream")
		w.Header().Set("Content-Length", fmt.Sprintf("%d", stat.Size()))
		w.WriteHeader(http.StatusOK)
		return
	}

	// For GET requests, read and return the blob.
	// NOTE(review): the whole blob is loaded into memory before being sent.
	content, err := s.driver.GetContent(ctx, path)
	if err != nil {
		http.Error(w, "blob not found", http.StatusNotFound)
		return
	}
	w.Header().Set("Content-Type", "application/octet-stream")
	if _, err := w.Write(content); err != nil {
		// Typically a client disconnect; nothing more can be sent.
		log.Printf("[HandleProxyGet] failed to write blob to client: %v", err)
	}
}
// HandleMove relocates a blob inside the storage backend.
// POST /move?from={path}&to={digest}&did={did}
//
// Both "from" and "to" are mapped through blobPath before the driver's Move
// operation is invoked. The caller's DID must pass write authorization.
//
// Responses: 405 for non-POST, 400 when "from" or "to" is missing, 401/403
// when authorization fails (without/with a DID), 500 when the move fails,
// 200 on success.
func (s *HoldService) HandleMove(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}

	query := r.URL.Query()
	src, dst, did := query.Get("from"), query.Get("to"), query.Get("did")
	if src == "" || dst == "" {
		http.Error(w, "missing from or to parameter", http.StatusBadRequest)
		return
	}

	// Moving blobs requires write access.
	if !s.isAuthorizedWrite(did) {
		status, msg := http.StatusForbidden, "forbidden: write access denied"
		if did == "" {
			status, msg = http.StatusUnauthorized, "unauthorized: authentication required"
		}
		http.Error(w, msg, status)
		return
	}

	err := s.driver.Move(r.Context(), blobPath(src), blobPath(dst))
	if err != nil {
		log.Printf("HandleMove: failed to move blob: %v", err)
		http.Error(w, fmt.Sprintf("failed to move blob: %v", err), http.StatusInternalServerError)
		return
	}

	log.Printf("HandleMove: successfully moved blob from=%s to=%s", src, dst)
	w.WriteHeader(http.StatusOK)
}
// HandleProxyPut proxies a blob upload through the service.
//
// It serves PUT requests for paths of the form /blobs/{digest}. The caller's
// DID is taken from the "did" query parameter or, failing that, the
// X-ATCR-DID header, and must pass write authorization. The request body is
// streamed to the storage driver without being buffered in memory.
//
// Responses: 405 for other methods, 400 when the path carries no digest,
// 401/403 when authorization fails (without/with a DID), 500 when the blob
// cannot be written or committed, 201 on success.
func (s *HoldService) HandleProxyPut(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPut {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// The length guard prevents a slice-bounds panic on paths shorter than
	// the prefix and also covers the "prefix only" case.
	const prefix = "/blobs/"
	if len(r.URL.Path) <= len(prefix) {
		http.Error(w, "missing digest", http.StatusBadRequest)
		return
	}
	digest := r.URL.Path[len(prefix):]

	did := r.URL.Query().Get("did")
	if did == "" {
		did = r.Header.Get("X-ATCR-DID")
	}

	// Authorize WRITE access
	if !s.isAuthorizedWrite(did) {
		log.Printf("[HandleProxyPut] Authorization FAILED")
		if did == "" {
			http.Error(w, "unauthorized: authentication required", http.StatusUnauthorized)
		} else {
			http.Error(w, "forbidden: write access denied", http.StatusForbidden)
		}
		return
	}

	// Stream blob to storage (no buffering)
	ctx := r.Context()
	path := blobPath(digest)

	// Create writer for streaming
	writer, err := s.driver.Writer(ctx, path, false)
	if err != nil {
		log.Printf("HandleProxyPut: failed to create writer: %v", err)
		http.Error(w, "failed to create writer", http.StatusInternalServerError)
		return
	}

	// Stream directly from request body to storage
	written, err := io.Copy(writer, r.Body)
	if err != nil {
		writer.Cancel(ctx)
		log.Printf("HandleProxyPut: failed to write blob: %v", err)
		http.Error(w, "failed to write blob", http.StatusInternalServerError)
		return
	}

	// Commit the write; on failure discard the partial upload, mirroring the
	// copy-failure path above.
	if err := writer.Commit(ctx); err != nil {
		writer.Cancel(ctx)
		log.Printf("HandleProxyPut: failed to commit blob: %v", err)
		http.Error(w, "failed to commit blob", http.StatusInternalServerError)
		return
	}

	// Release the writer's resources. NOTE(review): assumes the driver's
	// writer is distribution's storagedriver.FileWriter, whose Commit does
	// not close the underlying handle — confirm against the driver type.
	if err := writer.Close(); err != nil {
		log.Printf("HandleProxyPut: failed to close writer after commit: %v", err)
	}

	log.Printf("HandleProxyPut: successfully stored blob path=%s, size=%d", digest, written)
	w.WriteHeader(http.StatusCreated)
}
// StartMultipartUploadRequest initiates a multipart upload.
// It is the JSON body decoded by HandleStartMultipart.
type StartMultipartUploadRequest struct {
	// DID identifies the caller and must pass write authorization.
	DID string `json:"did"`
	// Digest is the target blob digest for the upload.
	Digest string `json:"digest"`
}
// StartMultipartUploadResponse contains the multipart upload ID handed back
// to the client after a multipart upload has been started.
type StartMultipartUploadResponse struct {
	// UploadID identifies the upload session in later part, complete and
	// abort requests.
	UploadID string `json:"upload_id"`
	// ExpiresAt is the expiry time reported to the client.
	ExpiresAt time.Time `json:"expires_at"`
}
// HandleStartMultipart opens a new multipart upload session.
//
// The JSON body (StartMultipartUploadRequest) names the caller's DID, which
// must pass write authorization, and the target digest. The session is
// created through the service's multipart manager, which decides between
// S3-native and buffered mode. The response reports the new upload ID and
// an expiry 24 hours in the future.
//
// Responses: 405 for non-POST, 400 for a malformed body, 401/403 when
// authorization fails (without/with a DID), 500 when the upload cannot be
// started.
func (s *HoldService) HandleStartMultipart(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}

	var body StartMultipartUploadRequest
	if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
		http.Error(w, fmt.Sprintf("invalid request: %v", err), http.StatusBadRequest)
		return
	}

	// Starting an upload requires write access.
	if !s.isAuthorizedWrite(body.DID) {
		status, msg := http.StatusForbidden, "forbidden: write access denied"
		if body.DID == "" {
			status, msg = http.StatusUnauthorized, "unauthorized: authentication required"
		}
		http.Error(w, msg, status)
		return
	}

	// The manager supports both S3Native and Buffered modes.
	uploadID, mode, err := s.StartMultipartUploadWithManager(r.Context(), body.Digest, s.MultipartMgr)
	if err != nil {
		http.Error(w, fmt.Sprintf("failed to start multipart upload: %v", err), http.StatusInternalServerError)
		return
	}
	log.Printf("Started multipart upload: uploadID=%s, mode=%v, digest=%s", uploadID, mode, body.Digest)

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(StartMultipartUploadResponse{
		UploadID:  uploadID,
		ExpiresAt: time.Now().Add(24 * time.Hour), // Multipart uploads can take longer
	})
}
// GetPartURLRequest requests a presigned URL for a specific part.
// It is the JSON body decoded by HandleGetPartURL.
type GetPartURLRequest struct {
	// DID identifies the caller and must pass write authorization.
	DID string `json:"did"`
	// Digest is the target blob digest supplied by the client.
	Digest string `json:"digest"`
	// UploadID selects the multipart session the part belongs to.
	UploadID string `json:"upload_id"`
	// PartNumber is the number of the part to be uploaded.
	PartNumber int `json:"part_number"`
}
// GetPartURLResponse contains the presigned URL for a part, together with
// the expiry time reported to the client.
type GetPartURLResponse struct {
	// URL is where the client should upload the part.
	URL string `json:"url"`
	// ExpiresAt is the expiry time reported to the client.
	ExpiresAt time.Time `json:"expires_at"`
}
// HandleGetPartURL returns the URL a client should use to upload one part of
// a multipart upload.
//
// The JSON body (GetPartURLRequest) names the caller's DID, which must pass
// write authorization, the upload ID and the part number. The URL is a
// presigned S3 URL for S3-native sessions and a proxy endpoint for buffered
// sessions. The response reports an expiry 15 minutes in the future.
//
// Responses: 405 for non-POST, 400 for a malformed body, 401/403 when
// authorization fails (without/with a DID), 404 for an unknown upload ID,
// 500 when the URL cannot be generated.
func (s *HoldService) HandleGetPartURL(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}

	var body GetPartURLRequest
	if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
		http.Error(w, fmt.Sprintf("invalid request: %v", err), http.StatusBadRequest)
		return
	}

	// Uploading parts requires write access.
	if !s.isAuthorizedWrite(body.DID) {
		status, msg := http.StatusForbidden, "forbidden: write access denied"
		if body.DID == "" {
			status, msg = http.StatusUnauthorized, "unauthorized: authentication required"
		}
		http.Error(w, msg, status)
		return
	}

	session, err := s.MultipartMgr.GetSession(body.UploadID)
	if err != nil {
		http.Error(w, fmt.Sprintf("session not found: %v", err), http.StatusNotFound)
		return
	}

	// Presigned for S3Native sessions, proxy endpoint for Buffered ones.
	partURL, err := s.GetPartUploadURL(r.Context(), session, body.PartNumber, body.DID)
	if err != nil {
		http.Error(w, fmt.Sprintf("failed to generate part URL: %v", err), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(GetPartURLResponse{
		URL:       partURL,
		ExpiresAt: time.Now().Add(15 * time.Minute),
	})
}
// CompleteMultipartRequest completes a multipart upload.
// It is the JSON body decoded by HandleCompleteMultipart.
type CompleteMultipartRequest struct {
	// DID identifies the caller and must pass write authorization.
	DID string `json:"did"`
	// Digest is the target blob digest supplied by the client.
	Digest string `json:"digest"`
	// UploadID selects the multipart session to complete.
	UploadID string `json:"upload_id"`
	// Parts lists the uploaded parts; it is recorded into the session only
	// for S3-native sessions.
	Parts []CompletedPart `json:"parts"`
}
// CompletedPart represents an uploaded part with its ETag.
type CompletedPart struct {
	// PartNumber is the number of the part within the upload.
	PartNumber int `json:"part_number"`
	// ETag is the entity tag reported for the uploaded part.
	ETag string `json:"etag"`
}
// HandleCompleteMultipart completes a multipart upload.
//
// The JSON body (CompleteMultipartRequest) names the caller's DID, which
// must pass write authorization, and the upload ID. For S3-native sessions
// the parts listed in the request are recorded into the session first (they
// were uploaded directly to S3, so only the client knows their ETags); for
// buffered sessions the parts already held by the session are used.
//
// Responses: 405 for non-POST, 400 for a malformed body, 401/403 when
// authorization fails (without/with a DID), 404 for an unknown upload ID,
// 500 when completion fails, 200 with {"status":"completed"} on success.
func (s *HoldService) HandleCompleteMultipart(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}

	var req CompleteMultipartRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, fmt.Sprintf("invalid request: %v", err), http.StatusBadRequest)
		return
	}

	// Validate DID authorization for WRITE
	if !s.isAuthorizedWrite(req.DID) {
		if req.DID == "" {
			http.Error(w, "unauthorized: authentication required", http.StatusUnauthorized)
		} else {
			http.Error(w, "forbidden: write access denied", http.StatusForbidden)
		}
		return
	}

	// Get multipart session
	session, err := s.MultipartMgr.GetSession(req.UploadID)
	if err != nil {
		http.Error(w, fmt.Sprintf("session not found: %v", err), http.StatusNotFound)
		return
	}

	// For S3Native mode, use parts from request (uploaded directly to S3)
	// For Buffered mode, parts are in the session
	if session.Mode == S3Native {
		// Record parts from AppView's request (they have ETags from S3)
		for _, p := range req.Parts {
			session.RecordS3Part(p.PartNumber, p.ETag, 0)
		}
		log.Printf("Recorded %d S3 parts from request for uploadID=%s", len(req.Parts), req.UploadID)
	}

	// Complete multipart upload (handles both S3Native and Buffered modes)
	ctx := r.Context()
	if err := s.CompleteMultipartUploadWithManager(ctx, session, s.MultipartMgr); err != nil {
		http.Error(w, fmt.Sprintf("failed to complete multipart upload: %v", err), http.StatusInternalServerError)
		return
	}

	log.Printf("Completed multipart upload: uploadID=%s, mode=%v", req.UploadID, session.Mode)

	// Headers must be set before WriteHeader; changes made afterwards are
	// not sent to the client.
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	if err := json.NewEncoder(w).Encode(map[string]string{
		"status": "completed",
	}); err != nil {
		log.Printf("HandleCompleteMultipart: failed to write response: %v", err)
	}
}
// AbortMultipartRequest aborts an in-progress upload.
// It is the JSON body decoded by HandleAbortMultipart.
type AbortMultipartRequest struct {
	// DID identifies the caller and must pass write authorization.
	DID string `json:"did"`
	// Digest is the target blob digest supplied by the client.
	Digest string `json:"digest"`
	// UploadID selects the multipart session to abort.
	UploadID string `json:"upload_id"`
}
// HandleAbortMultipart aborts an in-progress multipart upload.
//
// The JSON body (AbortMultipartRequest) names the caller's DID, which must
// pass write authorization, and the upload ID of the session to abort.
//
// Responses: 405 for non-POST, 400 for a malformed body, 401/403 when
// authorization fails (without/with a DID), 404 for an unknown upload ID,
// 500 when the abort fails, 200 with {"status":"aborted"} on success.
func (s *HoldService) HandleAbortMultipart(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}

	var req AbortMultipartRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, fmt.Sprintf("invalid request: %v", err), http.StatusBadRequest)
		return
	}

	// Validate DID authorization for WRITE
	if !s.isAuthorizedWrite(req.DID) {
		if req.DID == "" {
			http.Error(w, "unauthorized: authentication required", http.StatusUnauthorized)
		} else {
			http.Error(w, "forbidden: write access denied", http.StatusForbidden)
		}
		return
	}

	// Get multipart session
	session, err := s.MultipartMgr.GetSession(req.UploadID)
	if err != nil {
		http.Error(w, fmt.Sprintf("session not found: %v", err), http.StatusNotFound)
		return
	}

	// Abort multipart upload (handles both S3Native and Buffered modes)
	ctx := r.Context()
	if err := s.AbortMultipartUploadWithManager(ctx, session, s.MultipartMgr); err != nil {
		http.Error(w, fmt.Sprintf("failed to abort multipart upload: %v", err), http.StatusInternalServerError)
		return
	}

	log.Printf("Aborted multipart upload: uploadID=%s, mode=%v", req.UploadID, session.Mode)

	// Headers must be set before WriteHeader; changes made afterwards are
	// not sent to the client.
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	if err := json.NewEncoder(w).Encode(map[string]string{
		"status": "aborted",
	}); err != nil {
		log.Printf("HandleAbortMultipart: failed to write response: %v", err)
	}
}
// RegisterRequest represents a request to register this hold in a user's PDS.
// All three fields are required by HandleRegister.
type RegisterRequest struct {
	// DID is the user's DID; it becomes the owner of the created records.
	DID string `json:"did"`
	// AccessToken is the credential used to write records to the user's PDS.
	AccessToken string `json:"access_token"`
	// PDSEndpoint is the base URL of the user's PDS.
	PDSEndpoint string `json:"pds_endpoint"`
}
// RegisterResponse contains the registration result returned by
// HandleRegister.
type RegisterResponse struct {
	// HoldURI is the URI of the created hold record.
	HoldURI string `json:"hold_uri"`
	// CrewURI is the URI of the owner's crew record.
	CrewURI string `json:"crew_uri"`
	// Message is a human-readable summary of the registration.
	Message string `json:"message"`
}
// HandleRegister registers this hold service in a user's PDS on demand.
//
// The JSON body (RegisterRequest) must carry the user's DID, an access token
// and the PDS endpoint. Two records are written with those credentials, in
// order: a hold record keyed by the hold's hostname, then a crew record that
// lists the user as "owner" of that hold. The hold's public URL comes from
// the configuration, falling back to the scheme and host of the incoming
// request when it is not configured.
//
// Responses: 405 for non-POST; 400 for a malformed body, missing fields or
// an unusable public URL; 500 when either record cannot be written. On
// success a RegisterResponse with both record URIs is returned as JSON.
func (s *HoldService) HandleRegister(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}

	var body RegisterRequest
	if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
		http.Error(w, fmt.Sprintf("invalid request: %v", err), http.StatusBadRequest)
		return
	}

	incomplete := body.DID == "" || body.AccessToken == "" || body.PDSEndpoint == ""
	if incomplete {
		http.Error(w, "missing required fields: did, access_token, pds_endpoint", http.StatusBadRequest)
		return
	}

	// Prefer the configured public URL; otherwise derive one from the request.
	publicURL := s.config.Server.PublicURL
	if publicURL == "" {
		scheme := "https"
		if r.TLS == nil {
			scheme = "http"
		}
		publicURL = scheme + "://" + r.Host
	}

	// The hostname doubles as the hold's name and record key.
	holdName, err := extractHostname(publicURL)
	if err != nil {
		http.Error(w, fmt.Sprintf("failed to extract hostname: %v", err), http.StatusBadRequest)
		return
	}

	ctx := r.Context()

	// All writes go to the user's PDS with the user's own credentials.
	client := atproto.NewClient(body.PDSEndpoint, body.DID, body.AccessToken)

	// Step 1: the hold record itself.
	hold, err := client.PutRecord(ctx, atproto.HoldCollection, holdName,
		atproto.NewHoldRecord(publicURL, body.DID, s.config.Server.Public))
	if err != nil {
		http.Error(w, fmt.Sprintf("failed to create hold record: %v", err), http.StatusInternalServerError)
		return
	}
	log.Printf("Created hold record: %s", hold.URI)

	// Step 2: the owner's crew record, keyed by hold name and DID.
	crewKey := fmt.Sprintf("%s-%s", holdName, body.DID)
	crew, err := client.PutRecord(ctx, atproto.HoldCrewCollection, crewKey,
		atproto.NewHoldCrewRecord(hold.URI, body.DID, "owner"))
	if err != nil {
		http.Error(w, fmt.Sprintf("failed to create crew record: %v", err), http.StatusInternalServerError)
		return
	}
	log.Printf("Created crew record: %s", crew.URI)

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(RegisterResponse{
		HoldURI: hold.URI,
		CrewURI: crew.URI,
		Message: fmt.Sprintf("Successfully registered hold service. Storage endpoint: %s", publicURL),
	})
}

381
pkg/hold/multipart.go Normal file
View File

@@ -0,0 +1,381 @@
package hold
import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"log"
	"net/http"
	"sort"
	"sync"
	"time"

	"github.com/google/uuid"
)
// MultipartMode indicates how multipart uploads are handled.
// The zero value is S3Native.
type MultipartMode int
const (
	// S3Native uses S3's native multipart API with presigned URLs.
	S3Native MultipartMode = iota
	// Buffered buffers parts in memory and assembles them in the hold service.
	Buffered
)
// MultipartSession tracks an in-progress multipart upload.
//
// Parts holds the part payloads in Buffered mode and only part metadata
// (number, ETag, size) in S3Native mode. StorePart and RecordS3Part update
// both Parts and LastActivity while holding mu.
type MultipartSession struct {
	UploadID string // Unique upload ID
	Digest string // Target digest path
	Mode MultipartMode // Upload mode (S3Native or Buffered)
	S3UploadID string // S3 upload ID (for S3Native mode)
	Parts map[int]*MultipartPart // Buffered parts (for Buffered mode)
	CreatedAt time.Time // When upload started
	LastActivity time.Time // Last part upload
	mu sync.RWMutex // Protects Parts map
}
// MultipartPart represents a single part in a multipart upload.
// Data is populated only for parts stored through StorePart; parts recorded
// through RecordS3Part carry metadata only.
type MultipartPart struct {
	PartNumber int // Part number (1-indexed)
	Data []byte // Part data (for Buffered mode)
	ETag string // ETag from S3 or computed hash
	Size int64 // Part size in bytes
	UploadedAt time.Time // When part was uploaded
}
// MultipartManager manages multipart upload sessions.
// Sessions are kept in memory only, keyed by upload ID; instances should be
// created with NewMultipartManager so the map is initialized.
type MultipartManager struct {
	sessions map[string]*MultipartSession // uploadID -> session
	mu sync.RWMutex // Protects sessions map
}
// NewMultipartManager returns a manager with an empty session table and
// launches its background cleanup goroutine, which periodically discards
// abandoned uploads. The goroutine runs for the lifetime of the process.
func NewMultipartManager() *MultipartManager {
	mgr := new(MultipartManager)
	mgr.sessions = make(map[string]*MultipartSession)

	// Abandoned uploads are reaped in the background.
	go mgr.cleanupLoop()

	return mgr
}
// cleanupLoop removes expired sessions every 15 minutes.
// It blocks forever and is meant to run in its own goroutine.
func (m *MultipartManager) cleanupLoop() {
	ticker := time.NewTicker(15 * time.Minute)
	defer ticker.Stop()

	for {
		// Wait for the next tick, then sweep.
		<-ticker.C
		m.cleanupExpiredSessions()
	}
}
// cleanupExpiredSessions removes sessions inactive for >24 hours.
//
// The manager lock is held for the whole sweep. Each session's timestamps
// are read under that session's own lock, because StorePart and
// RecordS3Part update LastActivity concurrently while holding it.
//
// NOTE(review): only the in-memory session is dropped here; nothing in this
// function aborts an S3 upload that an expired S3Native session refers to.
func (m *MultipartManager) cleanupExpiredSessions() {
	m.mu.Lock()
	defer m.mu.Unlock()

	now := time.Now()
	for uploadID, session := range m.sessions {
		// Snapshot the timestamps under the session lock to avoid racing
		// with concurrent part uploads.
		session.mu.RLock()
		idle := now.Sub(session.LastActivity)
		age := now.Sub(session.CreatedAt)
		session.mu.RUnlock()

		if idle > 24*time.Hour {
			log.Printf("Cleaning up expired multipart session: uploadID=%s, age=%v", uploadID, age)
			// Deleting the current key while ranging over a map is safe in Go.
			delete(m.sessions, uploadID)
		}
	}
}
// CreateSession registers and returns a new multipart upload session for
// digest. The session receives a freshly generated UUID as its upload ID,
// an empty part table, and creation/activity timestamps set to the current
// time. s3UploadID is stored as given (empty for buffered sessions).
func (m *MultipartManager) CreateSession(digest string, mode MultipartMode, s3UploadID string) *MultipartSession {
	session := &MultipartSession{
		UploadID:     uuid.New().String(),
		Digest:       digest,
		Mode:         mode,
		S3UploadID:   s3UploadID,
		Parts:        make(map[int]*MultipartPart),
		CreatedAt:    time.Now(),
		LastActivity: time.Now(),
	}

	// Publish the session under the manager lock.
	m.mu.Lock()
	m.sessions[session.UploadID] = session
	m.mu.Unlock()

	log.Printf("Created multipart session: uploadID=%s, digest=%s, mode=%v", session.UploadID, digest, mode)
	return session
}
// GetSession looks up the multipart session registered under uploadID.
// It returns an error when no such session exists.
func (m *MultipartManager) GetSession(uploadID string) (*MultipartSession, error) {
	m.mu.RLock()
	session, found := m.sessions[uploadID]
	m.mu.RUnlock()

	if found {
		return session, nil
	}
	return nil, fmt.Errorf("multipart session not found: %s", uploadID)
}
// DeleteSession drops the session registered under uploadID, if any, and
// logs the removal. Deleting an unknown upload ID is a no-op apart from the
// log line.
func (m *MultipartManager) DeleteSession(uploadID string) {
	m.mu.Lock()
	delete(m.sessions, uploadID)
	log.Printf("Deleted multipart session: uploadID=%s", uploadID)
	m.mu.Unlock()
}
// StorePart keeps data in the session as part partNumber (Buffered mode) and
// returns the part's ETag, which is the hex-encoded SHA-256 of the data.
//
// An existing part with the same number is replaced. The slice is retained
// as-is rather than copied, and the session's last-activity time is
// refreshed.
func (s *MultipartSession) StorePart(partNumber int, data []byte) string {
	s.mu.Lock()
	defer s.mu.Unlock()

	// The ETag is derived from the content itself.
	sum := sha256.Sum256(data)
	etag := hex.EncodeToString(sum[:])

	s.Parts[partNumber] = &MultipartPart{
		PartNumber: partNumber,
		Data:       data,
		ETag:       etag,
		Size:       int64(len(data)),
		UploadedAt: time.Now(),
	}
	s.LastActivity = time.Now()

	log.Printf("Stored part: uploadID=%s, part=%d, size=%d bytes, etag=%s", s.UploadID, partNumber, len(data), etag)
	return etag
}
// RecordS3Part notes that part partNumber was uploaded directly to S3
// (S3Native mode). Only metadata is stored — the given ETag and size — and
// an existing entry with the same number is replaced. The session's
// last-activity time is refreshed.
func (s *MultipartSession) RecordS3Part(partNumber int, etag string, size int64) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.Parts[partNumber] = &MultipartPart{
		PartNumber: partNumber,
		ETag:       etag,
		Size:       size,
		UploadedAt: time.Now(),
	}
	s.LastActivity = time.Now()

	log.Printf("Recorded S3 part: uploadID=%s, part=%d, size=%d bytes, etag=%s", s.UploadID, partNumber, size, etag)
}
// AssembleBufferedParts assembles all buffered parts into a single blob.
// Returns the complete data and total size.
//
// Parts are concatenated in ascending part-number order starting at 1. An
// error is returned when the session is not in Buffered mode, when a stored
// part has a number below 1 (such a part could never be placed in the
// output), or when any number between 1 and the highest stored part is
// missing. A session without parts yields an empty blob.
func (s *MultipartSession) AssembleBufferedParts() ([]byte, int64, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	if s.Mode != Buffered {
		return nil, 0, fmt.Errorf("session is not in buffered mode")
	}

	// Calculate total size and find the highest part number.
	var totalSize int64
	maxPart := 0
	for partNum, part := range s.Parts {
		// Part numbers are 1-indexed; anything lower would be counted in the
		// total but silently left out of the assembled data.
		if partNum < 1 {
			return nil, 0, fmt.Errorf("invalid part number %d", partNum)
		}
		totalSize += part.Size
		if partNum > maxPart {
			maxPart = partNum
		}
	}

	// Check for missing parts
	for i := 1; i <= maxPart; i++ {
		if _, ok := s.Parts[i]; !ok {
			return nil, 0, fmt.Errorf("missing part %d", i)
		}
	}

	// Assemble parts in order
	assembled := make([]byte, 0, totalSize)
	for i := 1; i <= maxPart; i++ {
		assembled = append(assembled, s.Parts[i].Data...)
	}

	log.Printf("Assembled buffered parts: uploadID=%s, parts=%d, totalSize=%d bytes", s.UploadID, maxPart, totalSize)
	return assembled, totalSize, nil
}
// GetCompletedParts returns the list of completed parts for S3 multipart
// completion, sorted by ascending part number.
//
// The parts are stored in a map, whose iteration order is random; S3
// requires the part list of a completion request to be in ascending order,
// so the result is sorted before it is returned.
func (s *MultipartSession) GetCompletedParts() []CompletedPart {
	s.mu.RLock()
	defer s.mu.RUnlock()

	parts := make([]CompletedPart, 0, len(s.Parts))
	for _, part := range s.Parts {
		parts = append(parts, CompletedPart{
			PartNumber: part.PartNumber,
			ETag:       part.ETag,
		})
	}

	sort.Slice(parts, func(i, j int) bool {
		return parts[i].PartNumber < parts[j].PartNumber
	})
	return parts
}
// StartMultipartUploadWithManager opens a multipart upload for digest and
// registers a session with manager. It returns the session's upload ID and
// the mode that was chosen.
//
// Mode selection:
//   - presigned URLs disabled in the configuration: Buffered
//   - an S3 client is available and the S3 upload starts: S3Native
//   - otherwise (no S3 client, or the S3 start failed): Buffered
//
// The returned error is always nil in the current implementation, because a
// failed S3 start falls back to buffered mode.
func (s *HoldService) StartMultipartUploadWithManager(ctx context.Context, digest string, manager *MultipartManager) (string, MultipartMode, error) {
	switch {
	case s.config.Server.DisablePresignedURLs:
		// Testing switch: skip S3 entirely.
		log.Printf("Presigned URLs disabled (DISABLE_PRESIGNED_URLS=true), using buffered mode")

	case s.s3Client != nil:
		// Prefer S3's native multipart API when it is available.
		s3UploadID, err := s.startMultipartUpload(ctx, digest)
		if err == nil {
			native := manager.CreateSession(digest, S3Native, s3UploadID)
			log.Printf("Started S3 native multipart: uploadID=%s, s3UploadID=%s", native.UploadID, s3UploadID)
			return native.UploadID, S3Native, nil
		}
		log.Printf("S3 native multipart failed, falling back to buffered mode: %v", err)
	}

	// Buffered mode: parts are collected by the hold service itself.
	buffered := manager.CreateSession(digest, Buffered, "")
	log.Printf("Started buffered multipart: uploadID=%s", buffered.UploadID)
	return buffered.UploadID, Buffered, nil
}
// GetPartUploadURL returns the URL a client should upload part partNumber
// of the given session to.
//
// For S3Native sessions this is a presigned S3 URL; a failure to generate it
// is returned as an error. For all other sessions it is the hold's own proxy
// endpoint, built from the configured public URL, the upload ID, the part
// number and the caller's DID as a query parameter.
func (s *HoldService) GetPartUploadURL(ctx context.Context, session *MultipartSession, partNumber int, did string) (string, error) {
	if session.Mode != S3Native {
		// Buffered mode: the part is sent back to this service.
		proxyURL := fmt.Sprintf("%s/multipart-parts/%s/%d?did=%s",
			s.config.Server.PublicURL, session.UploadID, partNumber, did)
		return proxyURL, nil
	}

	// S3Native mode: the part goes straight to S3.
	presigned, err := s.getPartPresignedURL(ctx, session.Digest, session.S3UploadID, partNumber)
	if err != nil {
		return "", fmt.Errorf("failed to generate S3 part URL: %w", err)
	}
	return presigned, nil
}
// CompleteMultipartUploadWithManager completes a multipart upload.
//
// For S3Native sessions the parts recorded in the session are handed to the
// S3 completion call. For Buffered sessions the parts are assembled in
// memory and written to the storage driver at the blob path of the
// session's digest.
//
// The session is removed from manager when this function returns, whether
// it succeeded or not, so a failed completion cannot be retried with the
// same upload ID.
func (s *HoldService) CompleteMultipartUploadWithManager(ctx context.Context, session *MultipartSession, manager *MultipartManager) error {
	defer manager.DeleteSession(session.UploadID)

	if session.Mode == S3Native {
		// Complete S3 multipart upload
		parts := session.GetCompletedParts()
		if err := s.completeMultipartUpload(ctx, session.Digest, session.S3UploadID, parts); err != nil {
			return fmt.Errorf("failed to complete S3 multipart: %w", err)
		}
		log.Printf("Completed S3 native multipart: uploadID=%s, parts=%d", session.UploadID, len(parts))
		return nil
	}

	// Buffered mode: assemble parts and write via driver
	data, size, err := session.AssembleBufferedParts()
	if err != nil {
		return fmt.Errorf("failed to assemble parts: %w", err)
	}

	// Write assembled blob to storage
	path := blobPath(session.Digest)
	writer, err := s.driver.Writer(ctx, path, false)
	if err != nil {
		return fmt.Errorf("failed to create writer: %w", err)
	}

	written, err := writer.Write(data)
	if err != nil {
		writer.Cancel(ctx)
		return fmt.Errorf("failed to write blob: %w", err)
	}

	// On commit failure discard the partial upload, mirroring the
	// write-failure path above.
	if err := writer.Commit(ctx); err != nil {
		writer.Cancel(ctx)
		return fmt.Errorf("failed to commit blob: %w", err)
	}

	// Release the writer's resources. NOTE(review): assumes the driver's
	// writer is distribution's storagedriver.FileWriter, whose Commit does
	// not close the underlying handle — confirm against the driver type.
	// The blob is already committed, so a close failure is only logged.
	if err := writer.Close(); err != nil {
		log.Printf("Failed to close writer after commit: uploadID=%s, err=%v", session.UploadID, err)
	}

	log.Printf("Completed buffered multipart: uploadID=%s, size=%d bytes, written=%d", session.UploadID, size, written)
	return nil
}
// AbortMultipartUploadWithManager cancels a multipart upload.
//
// For S3Native sessions the S3 upload is aborted and a failure is returned
// as an error. Buffered sessions need no backend call because their parts
// exist only inside the session. In every case the session is removed from
// manager when this function returns.
func (s *HoldService) AbortMultipartUploadWithManager(ctx context.Context, session *MultipartSession, manager *MultipartManager) error {
	defer manager.DeleteSession(session.UploadID)

	if session.Mode != S3Native {
		// Buffered mode: dropping the session discards the in-memory parts.
		log.Printf("Aborted buffered multipart: uploadID=%s", session.UploadID)
		return nil
	}

	// S3Native mode: tell S3 to abandon the upload.
	err := s.abortMultipartUpload(ctx, session.Digest, session.S3UploadID)
	if err != nil {
		return fmt.Errorf("failed to abort S3 multipart: %w", err)
	}
	log.Printf("Aborted S3 native multipart: uploadID=%s", session.UploadID)
	return nil
}
// HandleMultipartPartUpload handles uploading a part in buffered mode.
// This is a new endpoint: PUT /multipart-parts/{uploadID}/{partNumber}
//
// Responses:
//   - 405 if the request method is not PUT
//   - 401 if did is empty and write access is not granted
//   - 403 if did is set but write access is denied
//   - 404 if manager has no session for uploadID
//   - 400 if the session is not in buffered mode
//   - 500 if the request body cannot be read
//   - 200 with the part's ETag in the "ETag" header on success
func (s *HoldService) HandleMultipartPartUpload(w http.ResponseWriter, r *http.Request, uploadID string, partNumber int, did string, manager *MultipartManager) {
	if r.Method != http.MethodPut {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// Authorize before looking at session state, so callers without write
	// access cannot use the 404 response to probe which upload IDs exist.
	if !s.isAuthorizedWrite(did) {
		if did == "" {
			http.Error(w, "unauthorized: authentication required", http.StatusUnauthorized)
		} else {
			http.Error(w, "forbidden: write access denied", http.StatusForbidden)
		}
		return
	}

	// Get session
	session, err := manager.GetSession(uploadID)
	if err != nil {
		http.Error(w, fmt.Sprintf("session not found: %v", err), http.StatusNotFound)
		return
	}

	// Only buffered sessions accept parts here.
	if session.Mode != Buffered {
		http.Error(w, "session is not in buffered mode", http.StatusBadRequest)
		return
	}

	// Read part data. The whole part is held in memory.
	// NOTE(review): the body size is not capped here — confirm an upstream limit exists.
	data, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, fmt.Sprintf("failed to read part data: %v", err), http.StatusInternalServerError)
		return
	}

	// Store part and return its ETag so the client can echo it back later.
	etag := session.StorePart(partNumber, data)
	w.Header().Set("ETag", etag)
	w.WriteHeader(http.StatusOK)
}

267
pkg/hold/registration.go Normal file
View File

@@ -0,0 +1,267 @@
package hold
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"net/url"
"strings"
"time"
"atcr.io/pkg/atproto"
"atcr.io/pkg/auth/oauth"
"github.com/bluesky-social/indigo/atproto/identity"
"github.com/bluesky-social/indigo/atproto/syntax"
)
// HealthHandler answers health checks with a fixed JSON body whose "status"
// field is "ok". The request itself is not inspected.
func (s *HoldService) HealthHandler(w http.ResponseWriter, r *http.Request) {
	const body = `{"status":"ok"}`

	w.Header().Set("Content-Type", "application/json")
	// The write result is ignored: there is no recovery path in a health probe.
	_, _ = w.Write([]byte(body))
}
// isHoldRegistered reports whether the repository of did on pdsEndpoint
// already holds a hold record whose Endpoint equals publicURL.
//
// The lookup uses a client built with an empty token: ATProto records are
// publicly readable, so no authentication is needed. Records whose value
// cannot be decoded as a HoldRecord are skipped. An error is returned only
// when listing the records fails.
func (s *HoldService) isHoldRegistered(ctx context.Context, did, pdsEndpoint, publicURL string) (bool, error) {
	unauthenticated := atproto.NewClient(pdsEndpoint, did, "")

	// 100 is the value passed to ListRecords — presumably a page limit; TODO confirm.
	listed, err := unauthenticated.ListRecords(ctx, atproto.HoldCollection, 100)
	if err != nil {
		return false, fmt.Errorf("failed to list hold records: %w", err)
	}

	for _, entry := range listed {
		var candidate atproto.HoldRecord
		if decodeErr := json.Unmarshal(entry.Value, &candidate); decodeErr == nil && candidate.Endpoint == publicURL {
			return true, nil
		}
	}
	return false, nil
}
// AutoRegister registers this hold service in the owner's PDS.
//
// It requires both the public URL and the owner DID to be configured. The
// owner DID is resolved to its PDS endpoint; if a hold record for the public
// URL already exists there, nothing more is done. Otherwise (including when
// the existence check itself fails) the OAuth registration flow is started,
// with callbackHandler passed through to it.
func (s *HoldService) AutoRegister(callbackHandler *http.HandlerFunc) error {
	publicURL := s.config.Server.PublicURL
	if publicURL == "" {
		return fmt.Errorf("HOLD_PUBLIC_URL not set")
	}

	ownerDID := s.config.Registration.OwnerDID
	if ownerDID == "" {
		return fmt.Errorf("HOLD_OWNER not set - required for registration")
	}

	ctx := context.Background()
	log.Printf("Checking registration status for DID: %s", ownerDID)

	// Resolve the DID to its identity (PDS endpoint and handle) using indigo.
	resolver := identity.DefaultDirectory()
	parsedDID, err := syntax.ParseDID(ownerDID)
	if err != nil {
		return fmt.Errorf("invalid owner DID: %w", err)
	}
	ownerIdent, err := resolver.LookupDID(ctx, parsedDID)
	if err != nil {
		return fmt.Errorf("failed to resolve PDS for DID: %w", err)
	}

	pdsEndpoint := ownerIdent.PDSEndpoint()
	if pdsEndpoint == "" {
		return fmt.Errorf("no PDS endpoint found for DID")
	}
	log.Printf("PDS endpoint: %s", pdsEndpoint)

	// A failed check is not fatal: fall through to OAuth registration.
	alreadyRegistered, checkErr := s.isHoldRegistered(ctx, ownerDID, pdsEndpoint, publicURL)
	switch {
	case checkErr != nil:
		log.Printf("Warning: failed to check registration status: %v", checkErr)
		log.Printf("Proceeding with OAuth registration...")
	case alreadyRegistered:
		log.Printf("✓ Hold service already registered in PDS")
		log.Printf("Public URL: %s", publicURL)
		return nil
	}

	log.Printf("Hold not registered, starting OAuth flow...")

	// The handle comes from the identity resolved above; no second lookup.
	handle := ownerIdent.Handle.String()
	if handle == "" || handle == "handle.invalid" {
		return fmt.Errorf("no valid handle found for DID")
	}

	log.Printf("Resolved handle: %s", handle)
	log.Printf("Starting OAuth registration for hold service")
	log.Printf("Public URL: %s", publicURL)

	return s.registerWithOAuth(publicURL, handle, ownerDID, pdsEndpoint, callbackHandler)
}
// registerWithOAuth runs the interactive OAuth flow for handle and, once
// authorized, registers the hold through the resulting session.
//
// The OAuth base URL is publicURL, except in test mode, where it is
// http://127.0.0.1 on publicURL's port (8080 if none) so a local browser can
// reach it; the hold record still stores publicURL. The flow's callback
// handler is written into *callbackHandler.
func (s *HoldService) registerWithOAuth(publicURL, handle, did, pdsEndpoint string, callbackHandler *http.HandlerFunc) error {
	// Scopes: create/update on each collection touched during registration.
	holdScopes := []string{
		"atproto",
		fmt.Sprintf("repo:%s?action=create", atproto.HoldCollection),
		fmt.Sprintf("repo:%s?action=update", atproto.HoldCollection),
		fmt.Sprintf("repo:%s?action=create", atproto.HoldCrewCollection),
		fmt.Sprintf("repo:%s?action=update", atproto.HoldCrewCollection),
		fmt.Sprintf("repo:%s?action=create", atproto.SailorProfileCollection),
		fmt.Sprintf("repo:%s?action=update", atproto.SailorProfileCollection),
	}

	// Callback path is standardized to /auth/oauth/callback across ATCR.
	baseURL := publicURL
	if s.config.Server.TestMode {
		// e.g. "http://172.28.0.3:8080" -> "http://127.0.0.1:8080"
		parsed, parseErr := url.Parse(publicURL)
		if parseErr != nil {
			return fmt.Errorf("failed to parse public URL: %w", parseErr)
		}
		port := parsed.Port()
		if port == "" {
			port = "8080" // default
		}
		baseURL = fmt.Sprintf("http://127.0.0.1:%s", port)
	}

	// Fills in the callback handler the caller pre-registered.
	installCallback := func(handler http.HandlerFunc) error {
		*callbackHandler = handler
		return nil
	}

	// Prints the authorization URL for the operator to visit.
	announceAuthURL := func(authURL string) error {
		rule := strings.Repeat("=", 80)
		log.Print("\n" + rule)
		log.Printf("OAUTH AUTHORIZATION REQUIRED")
		log.Print(rule)
		log.Printf("\nPlease visit this URL to authorize the hold service:\n")
		log.Printf(" %s\n", authURL)
		log.Printf("Waiting for authorization...")
		log.Print(rule + "\n")
		return nil
	}

	flowResult, err := oauth.InteractiveFlowWithCallback(
		context.Background(),
		baseURL,
		handle,
		holdScopes,
		installCallback,
		announceAuthURL,
	)
	if err != nil {
		return err
	}

	log.Printf("Authorization received!")
	log.Printf("OAuth session obtained successfully")
	log.Printf("DID: %s", did)
	log.Printf("PDS: %s", pdsEndpoint)

	// Wrap the session's indigo API client (handles DPoP automatically).
	authedClient := atproto.NewClientWithIndigoClient(pdsEndpoint, did, flowResult.Session.APIClient())
	return s.registerWithClient(publicURL, did, authedClient)
}
// registerWithClient writes the registration records through an
// authenticated ATProto client:
//
//  1. a hold record keyed by publicURL's hostname,
//  2. a crew record with role "owner" for did, keyed "<hostname>-<did>",
//  3. the sailor profile, with DefaultHold set to publicURL.
//
// Failing to write the hold or crew record aborts with an error. Profile
// failures are only logged as warnings.
func (s *HoldService) registerWithClient(publicURL, did string, client *atproto.Client) error {
	holdName, err := extractHostname(publicURL)
	if err != nil {
		return fmt.Errorf("failed to extract hostname from URL: %w", err)
	}
	log.Printf("Registering hold service: url=%s, name=%s, owner=%s", publicURL, holdName, did)

	ctx := context.Background()

	// Hold record, using the hostname as record key.
	holdResult, err := client.PutRecord(ctx, atproto.HoldCollection, holdName,
		atproto.NewHoldRecord(publicURL, did, s.config.Server.Public))
	if err != nil {
		return fmt.Errorf("failed to create hold record: %w", err)
	}
	log.Printf("✓ Created hold record: %s", holdResult.URI)

	// Crew record linking the owner to the hold just written.
	crewKey := fmt.Sprintf("%s-%s", holdName, did)
	crewResult, err := client.PutRecord(ctx, atproto.HoldCrewCollection, crewKey,
		atproto.NewHoldCrewRecord(holdResult.URI, did, "owner"))
	if err != nil {
		return fmt.Errorf("failed to create crew record: %w", err)
	}
	log.Printf("✓ Created crew record: %s", crewResult.URI)

	// Point the sailor profile's default hold at this service (best effort).
	profile, profileErr := atproto.GetProfile(ctx, client)
	if profileErr != nil {
		log.Printf("Warning: failed to get sailor profile: %v", profileErr)
	} else {
		if profile != nil {
			// Existing profile: overwrite the default hold and bump the timestamp.
			profile.DefaultHold = publicURL
			profile.UpdatedAt = time.Now()
		} else {
			// No profile yet: start one with this hold as default.
			profile = atproto.NewSailorProfileRecord(publicURL)
		}

		if updateErr := atproto.UpdateProfile(ctx, client, profile); updateErr != nil {
			log.Printf("Warning: failed to update sailor profile: %v", updateErr)
		} else {
			log.Printf("✓ Updated sailor profile defaultHold: %s", publicURL)
		}
	}

	rule := strings.Repeat("=", 80)
	log.Print("\n" + rule)
	log.Printf("REGISTRATION COMPLETE")
	log.Print(rule)
	log.Printf("Hold service is now registered and ready to use!")
	log.Print(rule + "\n")

	return nil
}
// extractHostname returns the host part of urlStr, without any port, for use
// as the hold name. It returns the parse error if urlStr is not a valid URL,
// and an error if the parsed URL carries no hostname.
func extractHostname(urlStr string) (string, error) {
	parsed, parseErr := url.Parse(urlStr)
	if parseErr != nil {
		return "", parseErr
	}

	// Hostname() already drops the port (and IPv6 brackets).
	if host := parsed.Hostname(); host != "" {
		return host, nil
	}
	return "", fmt.Errorf("no hostname in URL")
}

221
pkg/hold/s3.go Normal file
View File

@@ -0,0 +1,221 @@
package hold
import (
"context"
"fmt"
"log"
"sort"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
)
// initS3Client initializes the S3 client used for presigned URL generation.
//
// It returns nil without creating a client (leaving s.s3Client nil) when
// presigned URLs are disabled by config or when the storage driver is not
// "s3". It returns an error when the storage is S3 but no bucket is
// configured or the AWS session cannot be created. On success s.s3Client,
// s.bucket and s.s3PathPrefix are populated.
func (s *HoldService) initS3Client() error {
	// Check if presigned URLs are explicitly disabled
	if s.config.Server.DisablePresignedURLs {
		log.Printf("⚠️ S3 presigned URLs DISABLED by config (DISABLE_PRESIGNED_URLS=true)")
		log.Printf(" All uploads will use buffered mode (parts buffered in hold service)")
		return nil // Not an error - just using buffered mode
	}

	// Check if storage driver is S3
	if s.config.Storage.Type() != "s3" {
		log.Printf("Storage driver is %s (not S3), presigned URLs disabled", s.config.Storage.Type())
		return nil // Not an error - just using different driver
	}

	// Extract S3 configuration from storage parameters. Missing or
	// non-string values fall through as empty strings.
	params := s.config.Storage.Parameters()

	region, _ := params["region"].(string)
	if region == "" {
		region = "us-east-1" // Default region
	}
	accessKey, _ := params["accesskey"].(string)
	secretKey, _ := params["secretkey"].(string)
	bucket, _ := params["bucket"].(string)
	if bucket == "" {
		return fmt.Errorf("S3 bucket not configured")
	}

	// Build AWS config
	awsConfig := &aws.Config{
		Region: aws.String(region),
	}

	// Add credentials if provided (allow IAM role auth if not provided)
	if accessKey != "" && secretKey != "" {
		awsConfig.Credentials = credentials.NewStaticCredentials(accessKey, secretKey, "")
	}

	// Add custom endpoint for S3-compatible services (Storj, MinIO, R2, etc.)
	if endpoint, ok := params["regionendpoint"].(string); ok && endpoint != "" {
		awsConfig.Endpoint = aws.String(endpoint)
		awsConfig.S3ForcePathStyle = aws.Bool(true) // Required for MinIO, Storj
	}

	// Create AWS session
	sess, err := session.NewSession(awsConfig)
	if err != nil {
		return fmt.Errorf("failed to create AWS session: %w", err)
	}

	// Create S3 client
	s.s3Client = s3.New(sess)
	s.bucket = bucket

	// Extract path prefix if configured (rootdirectory in S3 params).
	// Slashes are trimmed from BOTH ends: keys are built as prefix + "/" + key,
	// so a value such as "/registry/" would otherwise yield
	// "registry//docker/...", which is not the key the registry's S3 driver
	// writes (it strips trailing slashes from rootdirectory as well).
	if rootDir, ok := params["rootdirectory"].(string); ok && rootDir != "" {
		s.s3PathPrefix = strings.Trim(rootDir, "/")
	}

	log.Printf("✅ S3 presigned URLs enabled")
	return nil
}
// startMultipartUpload begins an S3 multipart upload for the blob identified
// by digest and returns the upload ID assigned by S3.
//
// It fails if no S3 client is configured, and returns the S3 error unchanged
// if the create call fails.
func (s *HoldService) startMultipartUpload(ctx context.Context, digest string) (string, error) {
	if s.s3Client == nil {
		return "", fmt.Errorf("S3 not configured")
	}

	// S3 keys carry no leading slash; the configured prefix goes in front.
	objectKey := strings.TrimPrefix(blobPath(digest), "/")
	if prefix := s.s3PathPrefix; prefix != "" {
		objectKey = prefix + "/" + objectKey
	}

	input := &s3.CreateMultipartUploadInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(objectKey),
	}
	created, err := s.s3Client.CreateMultipartUploadWithContext(ctx, input)
	if err != nil {
		return "", err
	}

	uploadID := *created.UploadId
	log.Printf("Started multipart upload: digest=%s, uploadID=%s", digest, uploadID)
	return uploadID, nil
}
// getPartPresignedURL returns a presigned S3 UploadPart URL, valid for 15
// minutes, for part partNumber of multipart upload uploadID on the blob
// identified by digest.
//
// It fails if no S3 client is configured, and returns the signing error
// unchanged if presigning fails.
func (s *HoldService) getPartPresignedURL(ctx context.Context, digest, uploadID string, partNumber int) (string, error) {
	if s.s3Client == nil {
		return "", fmt.Errorf("S3 not configured")
	}

	// S3 keys carry no leading slash; the configured prefix goes in front.
	objectKey := strings.TrimPrefix(blobPath(digest), "/")
	if prefix := s.s3PathPrefix; prefix != "" {
		objectKey = prefix + "/" + objectKey
	}

	input := &s3.UploadPartInput{
		Bucket:     aws.String(s.bucket),
		Key:        aws.String(objectKey),
		UploadId:   aws.String(uploadID),
		PartNumber: aws.Int64(int64(partNumber)),
	}
	// Only the request is needed here; it is signed, never sent.
	partReq, _ := s.s3Client.UploadPartRequest(input)

	const validity = 15 * time.Minute
	signedURL, err := partReq.Presign(validity)
	if err != nil {
		return "", err
	}

	log.Printf("Generated part presigned URL: digest=%s, uploadID=%s, part=%d", digest, uploadID, partNumber)
	return signedURL, nil
}
// normalizeETag returns etag wrapped in exactly one pair of double quotes
// (required by S3 CompleteMultipartUpload).
// S3 returns ETags with quotes, but HTTP clients may strip them.
//
// Any double quotes already present at either end are removed first, so
// unquoted (abc), quoted ("abc") and half-quoted ("abc or abc") inputs all
// normalize to "abc". A lone quote character or the empty string yields an
// empty quoted value ("").
func normalizeETag(etag string) string {
	// Strip first, then re-quote: a prefix/suffix test alone would accept a
	// single `"` as already quoted and would double up half-quoted values.
	bare := strings.Trim(etag, "\"")
	return fmt.Sprintf("\"%s\"", bare)
}
// completeMultipartUpload finalizes S3 multipart upload uploadID for the blob
// identified by digest, using the given parts.
//
// parts is sorted in place by part number (S3 requires ascending order), and
// each ETag is passed through normalizeETag before being sent. It fails if no
// S3 client is configured; an S3 failure is logged and returned unchanged.
func (s *HoldService) completeMultipartUpload(ctx context.Context, digest, uploadID string, parts []CompletedPart) error {
	if s.s3Client == nil {
		return fmt.Errorf("S3 not configured")
	}

	// S3 keys carry no leading slash; the configured prefix goes in front.
	objectKey := strings.TrimPrefix(blobPath(digest), "/")
	if prefix := s.s3PathPrefix; prefix != "" {
		objectKey = prefix + "/" + objectKey
	}

	sort.Slice(parts, func(a, b int) bool {
		return parts[a].PartNumber < parts[b].PartNumber
	})

	// Translate to the SDK's part type.
	// IMPORTANT: S3 requires ETags to be quoted in the CompleteMultipartUpload XML
	sdkParts := make([]*s3.CompletedPart, 0, len(parts))
	for _, part := range parts {
		sdkParts = append(sdkParts, &s3.CompletedPart{
			PartNumber: aws.Int64(int64(part.PartNumber)),
			ETag:       aws.String(normalizeETag(part.ETag)),
		})
	}

	input := &s3.CompleteMultipartUploadInput{
		Bucket:          aws.String(s.bucket),
		Key:             aws.String(objectKey),
		UploadId:        aws.String(uploadID),
		MultipartUpload: &s3.CompletedMultipartUpload{Parts: sdkParts},
	}
	if _, err := s.s3Client.CompleteMultipartUploadWithContext(ctx, input); err != nil {
		log.Printf("Failed to complete multipart upload: digest=%s, uploadID=%s, err=%v", digest, uploadID, err)
		return err
	}

	log.Printf("Completed multipart upload: digest=%s, uploadID=%s, parts=%d", digest, uploadID, len(parts))
	return nil
}
// abortMultipartUpload cancels in-progress S3 multipart upload uploadID for
// the blob identified by digest.
//
// It fails if no S3 client is configured; an S3 failure is logged and
// returned unchanged.
func (s *HoldService) abortMultipartUpload(ctx context.Context, digest, uploadID string) error {
	if s.s3Client == nil {
		return fmt.Errorf("S3 not configured")
	}

	// S3 keys carry no leading slash; the configured prefix goes in front.
	objectKey := strings.TrimPrefix(blobPath(digest), "/")
	if prefix := s.s3PathPrefix; prefix != "" {
		objectKey = prefix + "/" + objectKey
	}

	input := &s3.AbortMultipartUploadInput{
		Bucket:   aws.String(s.bucket),
		Key:      aws.String(objectKey),
		UploadId: aws.String(uploadID),
	}
	if _, err := s.s3Client.AbortMultipartUploadWithContext(ctx, input); err != nil {
		log.Printf("Failed to abort multipart upload: digest=%s, uploadID=%s, err=%v", digest, uploadID, err)
		return err
	}

	log.Printf("Aborted multipart upload: digest=%s, uploadID=%s", digest, uploadID)
	return nil
}

44
pkg/hold/service.go Normal file
View File

@@ -0,0 +1,44 @@
package hold
import (
"context"
"fmt"
"log"
"github.com/aws/aws-sdk-go/service/s3"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/distribution/distribution/v3/registry/storage/driver/factory"
)
// HoldService provides presigned URLs for blob storage in a hold.
// It pairs a storage driver with an optional S3 client; when the S3 client
// is nil, URL generation falls back to proxy URLs served by this service.
type HoldService struct {
// Storage driver created from the configured storage type and parameters.
driver storagedriver.StorageDriver
// Service configuration this instance was built from.
config *Config
s3Client *s3.S3 // S3 client for presigned URLs (nil if not S3 storage)
bucket string // S3 bucket name
s3PathPrefix string // S3 path prefix (if any)
MultipartMgr *MultipartManager // Exported for access in route handlers
}
// NewHoldService builds a HoldService from cfg.
//
// The storage driver is created from cfg's storage type and parameters;
// failure to create it is returned as an error. S3 client initialization is
// attempted afterwards, and a failure there is only logged as a warning —
// the service is still returned.
func NewHoldService(cfg *Config) (*HoldService, error) {
	storageDriver, err := factory.Create(context.Background(), cfg.Storage.Type(), cfg.Storage.Parameters())
	if err != nil {
		return nil, fmt.Errorf("failed to create storage driver: %w", err)
	}

	svc := &HoldService{
		driver:       storageDriver,
		config:       cfg,
		MultipartMgr: NewMultipartManager(),
	}

	// Presigned URLs are optional, so an S3 init failure is not fatal.
	if s3Err := svc.initS3Client(); s3Err != nil {
		log.Printf("WARNING: S3 presigned URLs disabled: %v", s3Err)
	}

	return svc, nil
}

115
pkg/hold/storage.go Normal file
View File

@@ -0,0 +1,115 @@
package hold
import (
"context"
"fmt"
"log"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
)
// blobPath maps a digest (e.g., "sha256:abc123...") or a temp upload path to
// its storage path.
//
// Distribution stores blobs as
// /docker/registry/v2/blobs/{algorithm}/{xx}/{hash}/data, where xx is the
// first 2 characters of the hash (directory sharding). Special cases:
//   - input starting with "uploads/temp-" is placed directly under
//     /docker/registry/v2/ with "/data" appended;
//   - input without a ':' is used whole as a single path element under blobs/;
//   - a hash shorter than 2 characters gets no shard directory.
//
// NOTE: Path must start with / for filesystem driver
func blobPath(digest string) string {
	const root = "/docker/registry/v2"

	if strings.HasPrefix(digest, "uploads/temp-") {
		return fmt.Sprintf("%s/%s/data", root, digest)
	}

	// Split once on the first ':' into algorithm and hash.
	algorithm, hash, hasSep := strings.Cut(digest, ":")
	switch {
	case !hasSep:
		// Fallback for malformed digest
		return fmt.Sprintf("%s/blobs/%s/data", root, digest)
	case len(hash) < 2:
		return fmt.Sprintf("%s/blobs/%s/%s/data", root, algorithm, hash)
	default:
		return fmt.Sprintf("%s/blobs/%s/%s/%s/data", root, algorithm, hash[:2], hash)
	}
}
// getPresignedURL returns a URL a client can use to perform operation (GET,
// HEAD, or PUT) on the blob identified by digest.
//
// For GET and HEAD the blob must already exist in storage, otherwise an error
// is returned. The result is a presigned S3 URL valid for 15 minutes when an
// S3 client is available and presigned URLs are enabled; in every other case,
// including a presign failure, it is this service's proxy URL. An operation
// other than GET, HEAD, or PUT is an error once the S3 path is reached.
func (s *HoldService) getPresignedURL(ctx context.Context, operation PresignedURLOperation, digest string, did string) (string, error) {
	objectPath := blobPath(digest)

	// Reads require an existing blob; a PUT targets one that is not there yet.
	switch operation {
	case OperationGet, OperationHead:
		if _, statErr := s.driver.Stat(ctx, objectPath); statErr != nil {
			return "", fmt.Errorf("blob not found: %w", statErr)
		}
	}

	if s.config.Server.DisablePresignedURLs {
		log.Printf("Presigned URLs disabled, using proxy URL")
		return s.getProxyURL(digest, did), nil
	}

	// Without an S3 client the only option is to proxy through this service.
	if s.s3Client == nil {
		return s.getProxyURL(digest, did), nil
	}

	// S3 keys carry no leading slash; the configured prefix goes in front.
	objectKey := strings.TrimPrefix(objectPath, "/")
	if prefix := s.s3PathPrefix; prefix != "" {
		objectKey = prefix + "/" + objectKey
	}

	// Only Presign is needed from whichever request type gets built.
	var signer interface {
		Presign(time.Duration) (string, error)
	}
	switch operation {
	case OperationGet:
		// Note: Don't use ResponseContentType - not supported by all S3-compatible services
		signer, _ = s.s3Client.GetObjectRequest(&s3.GetObjectInput{
			Bucket: aws.String(s.bucket),
			Key:    aws.String(objectKey),
		})
	case OperationHead:
		signer, _ = s.s3Client.HeadObjectRequest(&s3.HeadObjectInput{
			Bucket: aws.String(s.bucket),
			Key:    aws.String(objectKey),
		})
	case OperationPut:
		signer, _ = s.s3Client.PutObjectRequest(&s3.PutObjectInput{
			Bucket:      aws.String(s.bucket),
			Key:         aws.String(objectKey),
			ContentType: aws.String("application/octet-stream"),
		})
	default:
		return "", fmt.Errorf("unsupported operation: %s", operation)
	}

	signedURL, signErr := signer.Presign(15 * time.Minute)
	if signErr != nil {
		// A signing failure degrades to the proxy rather than failing the request.
		log.Printf("[getPresignedURL] Presign FAILED for %s: %v", operation, signErr)
		log.Printf(" Falling back to proxy URL")
		return s.getProxyURL(digest, did), nil
	}
	return signedURL, nil
}
// getProxyURL builds the URL of this service's own blob endpoint for digest,
// carrying did as the "did" query parameter. It is the fallback used when a
// presigned URL is unavailable; digest and did are inserted as given.
func (s *HoldService) getProxyURL(digest, did string) string {
	base := s.config.Server.PublicURL
	// One endpoint serves every operation, so the operation is not encoded here.
	return fmt.Sprintf("%s/blobs/%s?did=%s", base, digest, did)
}

View File

@@ -1,54 +0,0 @@
package storage
import (
"context"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/registry/storage"
"github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/distribution/reference"
)
// S3BlobStore wraps distribution's blob store with S3 backend.
// It adds no fields or methods of its own: the embedded BlobStore's methods
// are promoted unchanged.
type S3BlobStore struct {
// Embedded blob store that performs all operations.
distribution.BlobStore
}
// NewS3BlobStore returns an S3BlobStore for repository repoName, backed by
// storageDriver.
//
// It creates a registry on top of the driver, parses repoName into a named
// reference, opens that repository, and wraps its blob store. The error from
// whichever of those steps fails first is returned unchanged.
func NewS3BlobStore(ctx context.Context, storageDriver driver.StorageDriver, repoName string) (*S3BlobStore, error) {
	registry, err := storage.NewRegistry(ctx, storageDriver)
	if err != nil {
		return nil, err
	}

	repoRef, err := reference.ParseNamed(repoName)
	if err != nil {
		return nil, err
	}

	repository, err := registry.Repository(ctx, repoRef)
	if err != nil {
		return nil, err
	}

	return &S3BlobStore{BlobStore: repository.Blobs(ctx)}, nil
}
// Note: S3BlobStore inherits all methods from distribution.BlobStore
// including:
// - Stat(ctx, dgst) - Check if blob exists
// - Get(ctx, dgst) - Retrieve blob
// - Open(ctx, dgst) - Open blob for reading
// - Put(ctx, mediaType, payload) - Store blob
// - Create(ctx, options...) - Create blob writer
// - Resume(ctx, id) - Resume blob upload
// - ServeBlob(ctx, w, r, dgst) - Serve blob over HTTP
// - Delete(ctx, dgst) - Delete blob