diff --git a/pkg/appview/public/icons.svg b/pkg/appview/public/icons.svg index 3e04df0..b87161e 100644 --- a/pkg/appview/public/icons.svg +++ b/pkg/appview/public/icons.svg @@ -49,6 +49,5 @@ - \ No newline at end of file diff --git a/pkg/hold/admin/admin.go b/pkg/hold/admin/admin.go index a1dd88a..170d0c1 100644 --- a/pkg/hold/admin/admin.go +++ b/pkg/hold/admin/admin.go @@ -414,6 +414,9 @@ func (ui *AdminUI) RegisterRoutes(r chi.Router) { // GC POSTs r.Post("/admin/api/gc/preview", ui.handleGCPreview) r.Post("/admin/api/gc/run", ui.handleGCRun) + r.Post("/admin/api/gc/reconcile", ui.handleGCReconcile) + r.Post("/admin/api/gc/delete-records", ui.handleGCDeleteRecords) + r.Post("/admin/api/gc/delete-blobs", ui.handleGCDeleteBlobs) // API endpoints (for HTMX) r.Get("/admin/api/stats", ui.handleStatsAPI) diff --git a/pkg/hold/admin/handlers_gc.go b/pkg/hold/admin/handlers_gc.go index 0b2725d..00dc231 100644 --- a/pkg/hold/admin/handlers_gc.go +++ b/pkg/hold/admin/handlers_gc.go @@ -99,6 +99,83 @@ func (ui *AdminUI) handleGCRun(w http.ResponseWriter, r *http.Request) { }{Result: result}) } +// handleGCReconcile creates missing layer records without deleting anything +func (ui *AdminUI) handleGCReconcile(w http.ResponseWriter, r *http.Request) { + if ui.gc == nil { + ui.renderTemplate(w, "partials/gc_error.html", struct{ Error string }{"GC not available"}) + return + } + + result, err := ui.gc.Reconcile(r.Context()) + if err != nil { + slog.Error("GC reconcile failed", "error", err) + ui.renderTemplate(w, "partials/gc_error.html", struct{ Error string }{err.Error()}) + return + } + + session := getSessionFromContext(r.Context()) + slog.Info("GC reconcile completed via admin panel", + "recordsReconciled", result.RecordsReconciled, + "duration", result.Duration, + "by", session.DID) + + ui.renderTemplate(w, "partials/gc_result.html", struct { + Result *gc.GCResult + }{Result: result}) +} + +// handleGCDeleteRecords deletes orphaned layer records (no blob deletion) +func (ui *AdminUI) handleGCDeleteRecords(w http.ResponseWriter, r *http.Request) { + if ui.gc == nil { + ui.renderTemplate(w, "partials/gc_error.html", struct{ Error string }{"GC not available"}) + return + } + + result, err := ui.gc.DeleteOrphanedRecords(r.Context()) + if err != nil { + slog.Error("GC delete records failed", "error", err) + ui.renderTemplate(w, "partials/gc_error.html", struct{ Error string }{err.Error()}) + return + } + + session := getSessionFromContext(r.Context()) + slog.Info("GC delete orphaned records completed via admin panel", + "recordsDeleted", result.RecordsDeleted, + "orphanedRecords", result.OrphanedRecords, + "duration", result.Duration, + "by", session.DID) + + ui.renderTemplate(w, "partials/gc_result.html", struct { + Result *gc.GCResult + }{Result: result}) +} + +// handleGCDeleteBlobs walks S3 and deletes unreferenced blobs +func (ui *AdminUI) handleGCDeleteBlobs(w http.ResponseWriter, r *http.Request) { + if ui.gc == nil { + ui.renderTemplate(w, "partials/gc_error.html", struct{ Error string }{"GC not available"}) + return + } + + result, err := ui.gc.DeleteOrphanedBlobs(r.Context()) + if err != nil { + slog.Error("GC delete blobs failed", "error", err) + ui.renderTemplate(w, "partials/gc_error.html", struct{ Error string }{err.Error()}) + return + } + + session := getSessionFromContext(r.Context()) + slog.Info("GC delete orphaned blobs completed via admin panel", + "blobsDeleted", result.BlobsDeleted, + "bytesReclaimed", result.BytesReclaimed, + "duration", result.Duration, + "by", session.DID) + + ui.renderTemplate(w, "partials/gc_result.html", struct { + Result *gc.GCResult + }{Result: result}) +} + // timeAgo returns a human-readable relative time string func timeAgo(t time.Time) string { if t.IsZero() { diff --git a/pkg/hold/admin/public/icons.svg b/pkg/hold/admin/public/icons.svg index 3e04df0..b87161e 100644 --- a/pkg/hold/admin/public/icons.svg +++ b/pkg/hold/admin/public/icons.svg @@ -49,6 +49,5 @@ - \ No newline at end of file diff --git a/pkg/hold/admin/templates/partials/gc_preview.html b/pkg/hold/admin/templates/partials/gc_preview.html index 3fa63f1..aa61303 100644 --- a/pkg/hold/admin/templates/partials/gc_preview.html +++ b/pkg/hold/admin/templates/partials/gc_preview.html @@ -135,19 +135,43 @@ {{end}} - + {{if or .Preview.OrphanedRecords .Preview.OrphanedBlobs .Preview.MissingRecords}} -
- + {{end}} + {{if .Preview.OrphanedRecords}} + + {{end}} + {{if .Preview.OrphanedBlobs}} + + {{end}}
+

Run Scan again after each operation to see updated counts.

{{end}} diff --git a/pkg/hold/config.go b/pkg/hold/config.go index 13f06c5..31aca26 100644 --- a/pkg/hold/config.go +++ b/pkg/hold/config.go @@ -141,7 +141,7 @@ type ScannerConfig struct { Secret string `yaml:"secret" comment:"Shared secret for scanner WebSocket auth. Empty disables scanning."` // Minimum interval between re-scans of the same manifest. 0 disables proactive scanning. - RescanInterval time.Duration `yaml:"rescan_interval" comment:"Minimum interval between re-scans of the same manifest. When set, the hold proactively scans manifests when the scanner is idle. Default: 24h. Set to 0 to disable."` + RescanInterval time.Duration `yaml:"rescan_interval" comment:"Minimum interval between re-scans of the same manifest. When set, the hold proactively scans manifests when the scanner is idle. Default: 168h (7 days). Set to 0 to disable."` } // DatabaseConfig defines embedded PDS database settings @@ -223,7 +223,7 @@ func setHoldDefaults(v *viper.Viper) { v.SetDefault("gc.enabled", false) // Scanner defaults v.SetDefault("scanner.secret", "") - v.SetDefault("scanner.rescan_interval", "24h") + v.SetDefault("scanner.rescan_interval", "168h") // 7 days // Log shipper defaults v.SetDefault("log_shipper.batch_size", 100) diff --git a/pkg/hold/gc/gc.go b/pkg/hold/gc/gc.go index ba7682b..4420c51 100644 --- a/pkg/hold/gc/gc.go +++ b/pkg/hold/gc/gc.go @@ -296,6 +296,118 @@ func (gc *GarbageCollector) Preview(ctx context.Context) (*GCPreview, error) { return preview, nil } +// Reconcile creates missing layer records without deleting anything. +// Requires a prior Preview() to identify missing records. +func (gc *GarbageCollector) Reconcile(ctx context.Context) (*GCResult, error) { + if !gc.tryStart() { + return nil, fmt.Errorf("GC operation already in progress") + } + defer gc.finish() + + gc.mu.Lock() + preview := gc.lastPreview + gc.mu.Unlock() + + if preview == nil { + return nil, fmt.Errorf("no preview available — run Scan first") + } + if len(preview.MissingRecords) == 0 { + return &GCResult{}, nil + } + + start := time.Now() + result := &GCResult{} + + gc.logger.Info("Starting reconciliation", "missingRecords", len(preview.MissingRecords)) + gc.reconcileMissingRecords(ctx, preview.MissingRecords, result) + result.Duration = time.Since(start) + + gc.mu.Lock() + gc.lastResult = result + gc.lastResultAt = time.Now() + gc.mu.Unlock() + + return result, nil +} + +// DeleteOrphanedRecords deletes layer records whose manifests no longer exist. +// Requires a prior Preview() to identify orphaned records. +func (gc *GarbageCollector) DeleteOrphanedRecords(ctx context.Context) (*GCResult, error) { + if !gc.tryStart() { + return nil, fmt.Errorf("GC operation already in progress") + } + defer gc.finish() + + gc.mu.Lock() + preview := gc.lastPreview + gc.mu.Unlock() + + if preview == nil { + return nil, fmt.Errorf("no preview available — run Scan first") + } + if len(preview.OrphanedRecords) == 0 { + return &GCResult{}, nil + } + + start := time.Now() + result := &GCResult{ + OrphanedRecords: int64(len(preview.OrphanedRecords)), + } + + rkeys := make([]string, len(preview.OrphanedRecords)) + for i, r := range preview.OrphanedRecords { + rkeys[i] = r.Rkey + } + + gc.logger.Info("Deleting orphaned records", "count", len(rkeys)) + if err := gc.deleteOrphanedRecords(ctx, rkeys, result); err != nil { + return nil, fmt.Errorf("delete orphaned records: %w", err) + } + result.Duration = time.Since(start) + + gc.mu.Lock() + gc.lastResult = result + gc.lastResultAt = time.Now() + gc.mu.Unlock() + + return result, nil +} + +// DeleteOrphanedBlobs walks S3 and deletes blobs not referenced by any manifest. +// Runs a fresh analysis to build the current referenced set (reflects any reconciliation +// done since the last preview). +func (gc *GarbageCollector) DeleteOrphanedBlobs(ctx context.Context) (*GCResult, error) { + if !gc.tryStart() { + return nil, fmt.Errorf("GC operation already in progress") + } + defer gc.finish() + + start := time.Now() + result := &GCResult{} + + gc.logger.Info("Starting orphaned blob deletion (fresh analysis)") + + // Fresh analysis so the referenced set includes any records reconciled since preview + analysis, err := gc.analyzeRecords(ctx) + if err != nil { + return nil, fmt.Errorf("analyze records: %w", err) + } + + result.ReferencedBlobs = int64(len(analysis.referenced)) + + if err := gc.deleteOrphanedBlobs(ctx, analysis.referenced, result); err != nil { + return nil, fmt.Errorf("delete orphaned blobs: %w", err) + } + result.Duration = time.Since(start) + + gc.mu.Lock() + gc.lastResult = result + gc.lastResultAt = time.Now() + gc.mu.Unlock() + + return result, nil +} + // analyzeRecords performs Phase 1 analysis: builds referenced set, finds orphaned records, // and identifies missing layer records. Pure analysis — no mutations. // Discovers users, fetches manifests, scans records, identifies missing records. diff --git a/pkg/hold/oci/xrpc.go b/pkg/hold/oci/xrpc.go index bb440cf..909a122 100644 --- a/pkg/hold/oci/xrpc.go +++ b/pkg/hold/oci/xrpc.go @@ -380,8 +380,8 @@ func (h *XRPCHandler) HandleNotifyManifest(w http.ResponseWriter, r *http.Reques } } - // Enqueue scan job if scanner is connected - if h.scanBroadcaster != nil { + // Enqueue scan job if scanner is connected (skip manifest lists — children get their own jobs) + if h.scanBroadcaster != nil && !isMultiArch { tier := "deckhand" if stats != nil && stats.Tier != "" { tier = stats.Tier diff --git a/pkg/hold/pds/scan_broadcaster.go b/pkg/hold/pds/scan_broadcaster.go index 12d7bdd..9d536ed 100644 --- a/pkg/hold/pds/scan_broadcaster.go +++ b/pkg/hold/pds/scan_broadcaster.go @@ -551,9 +551,37 @@ func (sb *ScanBroadcaster) handleResult(sub *ScanSubscriber, msg ScannerMessage) "total", msg.Summary.Total) } -// handleError marks a job as failed +// handleError marks a job as failed and creates a scan record so the proactive +// scanner treats it as "stale" rather than "never scanned" (avoids retry loops). func (sb *ScanBroadcaster) handleError(sub *ScanSubscriber, msg ScannerMessage) { - _, err := sb.db.Exec(` + ctx := context.Background() + + // Get job details to create failure scan record + var manifestDigest, repository, userDID string + err := sb.db.QueryRow(` + SELECT manifest_digest, repository, user_did + FROM scan_jobs WHERE seq = ? + `, msg.Seq).Scan(&manifestDigest, &repository, &userDID) + if err != nil { + slog.Error("Failed to get job details for failure record", + "seq", msg.Seq, "error", err) + } else { + // Create a scan record with zero counts and nil blobs — marks it as + // "scanned" so the proactive scheduler won't retry until rescan interval + scanRecord := atproto.NewScanRecord( + manifestDigest, repository, userDID, + nil, nil, // no SBOM or vuln report + 0, 0, 0, 0, 0, + "failed: "+truncateError(msg.Error, 200), + ) + if _, _, err := sb.pds.CreateScanRecord(ctx, scanRecord); err != nil { + slog.Error("Failed to store failure scan record", + "seq", msg.Seq, "error", err) + } + } + + // Mark job as failed + _, err = sb.db.Exec(` UPDATE scan_jobs SET status = 'failed', completed_at = ? WHERE seq = ? `, time.Now(), msg.Seq) @@ -569,6 +597,13 @@ func (sb *ScanBroadcaster) handleError(sub *ScanSubscriber, msg ScannerMessage) "error", msg.Error) } +func truncateError(s string, maxLen int) string { + if len(s) <= maxLen { + return s + } + return s[:maxLen] +} + // drainPendingJobs sends pending/timed-out jobs to a newly connected scanner. // Collects all pending rows first, closes cursor, then assigns and dispatches // to avoid holding a SELECT cursor open during UPDATEs (prevents SQLite BUSY). @@ -650,12 +685,25 @@ func (sb *ScanBroadcaster) reDispatchLoop() { } } -// reDispatchTimedOut finds jobs that were assigned but not acked/completed within timeout. +// reDispatchTimedOut finds jobs that were assigned but not acked/completed within timeout, +// and also marks stuck processing jobs as failed. // Collects timed-out rows first, closes cursor, then resets and re-dispatches // to avoid holding a SELECT cursor open during UPDATEs (prevents SQLite BUSY). func (sb *ScanBroadcaster) reDispatchTimedOut() { timeout := time.Now().Add(-sb.ackTimeout) + // Fail processing jobs stuck for >10 minutes (scanner likely crashed mid-scan) + processingTimeout := time.Now().Add(-10 * time.Minute) + res, err := sb.db.Exec(` + UPDATE scan_jobs SET status = 'failed', completed_at = ? + WHERE status = 'processing' AND assigned_at < ? + `, time.Now(), processingTimeout) + if err != nil { + slog.Error("Failed to clean up stuck processing jobs", "error", err) + } else if n, _ := res.RowsAffected(); n > 0 { + slog.Warn("Cleaned up stuck processing jobs", "count", n) + } + rows, err := sb.db.Query(` SELECT seq, manifest_digest, repository, tag, user_did, user_handle, hold_did, hold_endpoint, tier, config_json, layers_json FROM scan_jobs @@ -798,13 +846,17 @@ func (sb *ScanBroadcaster) refreshManifestDIDs() { func (sb *ScanBroadcaster) proactiveScanLoop() { defer sb.wg.Done() - // Wait a bit before starting to let the system settle + // Wait for the system to settle and DID list to populate select { case <-sb.stopCh: return - case <-time.After(30 * time.Second): + case <-time.After(45 * time.Second): } + // Run immediately on startup, then every 60s + slog.Info("Proactive scan loop started") + sb.tryEnqueueProactiveScan() + ticker := time.NewTicker(60 * time.Second) defer ticker.Stop() @@ -824,9 +876,11 @@ func (sb *ScanBroadcaster) proactiveScanLoop() { // Uses the cached DID list from the relay (refreshed by refreshManifestDIDsLoop). func (sb *ScanBroadcaster) tryEnqueueProactiveScan() { if !sb.hasConnectedScanners() { + slog.Debug("Proactive scan: no scanners connected, skipping") return } if sb.hasActiveJobs() { + slog.Debug("Proactive scan: active jobs in queue, skipping") return } @@ -839,6 +893,7 @@ func (sb *ScanBroadcaster) tryEnqueueProactiveScan() { sb.manifestDIDsMu.RUnlock() if len(userDIDs) == 0 { + slog.Debug("Proactive scan: no manifest DIDs cached from relay, skipping") return } @@ -854,8 +909,17 @@ func (sb *ScanBroadcaster) tryEnqueueProactiveScan() { } } +// scanCandidate is a manifest that needs scanning, with its scan freshness. +type scanCandidate struct { + manifest atproto.ManifestRecord + userDID string + userHandle string + scannedAt time.Time // zero value = never scanned +} + // tryEnqueueForUser fetches manifests from a user's PDS and enqueues a scan for the -// first one that needs scanning. Returns true if a job was enqueued. +// one that most needs it: never-scanned manifests first, then the stalest scan. +// Returns true if a job was enqueued. func (sb *ScanBroadcaster) tryEnqueueForUser(ctx context.Context, userDID string) bool { // Resolve user DID to PDS endpoint and handle did, userHandle, pdsEndpoint, err := atproto.ResolveIdentity(ctx, userDID) @@ -865,7 +929,10 @@ func (sb *ScanBroadcaster) tryEnqueueForUser(ctx context.Context, userDID string return false } - // Fetch manifest records from user's PDS + // Collect all scannable manifests with their scan age + var unscanned []scanCandidate + var oldest *scanCandidate + client := atproto.NewClient(pdsEndpoint, did, "") var cursor string for { @@ -879,8 +946,6 @@ func (sb *ScanBroadcaster) tryEnqueueForUser(ctx context.Context, userDID string for _, record := range records { var manifest atproto.ManifestRecord if err := json.Unmarshal(record.Value, &manifest); err != nil { - slog.Debug("Proactive scan: failed to unmarshal manifest record", - "uri", record.URI, "error", err) continue } @@ -898,39 +963,48 @@ func (sb *ScanBroadcaster) tryEnqueueForUser(ctx context.Context, userDID string continue } - // Skip if config is nil (shouldn't happen for image manifests, but be safe) + // Skip if config is nil if manifest.Config == nil { continue } - // Check if already scanned recently - if sb.isRecentlyScanned(ctx, manifest.Digest) { + // Check scan status + _, scanRecord, err := sb.pds.GetScanRecord(ctx, manifest.Digest) + if err != nil { + // No scan record — never scanned + unscanned = append(unscanned, scanCandidate{ + manifest: manifest, + userDID: did, + userHandle: userHandle, + }) continue } - // Construct and enqueue scan job - configJSON, _ := json.Marshal(manifest.Config) - layersJSON, _ := json.Marshal(manifest.Layers) - - slog.Info("Enqueuing proactive scan", - "manifestDigest", manifest.Digest, - "repository", manifest.Repository, - "userDID", did) - - if err := sb.Enqueue(&ScanJobEvent{ - ManifestDigest: manifest.Digest, - Repository: manifest.Repository, - UserDID: did, - UserHandle: userHandle, - Tier: "deckhand", - Config: configJSON, - Layers: layersJSON, - }); err != nil { - slog.Error("Proactive scan: failed to enqueue", - "manifest", manifest.Digest, "error", err) - return false + scannedAt, err := time.Parse(time.RFC3339, scanRecord.ScannedAt) + if err != nil { + // Can't parse timestamp — treat as never scanned + unscanned = append(unscanned, scanCandidate{ + manifest: manifest, + userDID: did, + userHandle: userHandle, + }) + continue + } + + // Skip if scanned recently + if time.Since(scannedAt) < sb.rescanInterval { + continue + } + + // Stale scan — track the oldest + if oldest == nil || scannedAt.Before(oldest.scannedAt) { + oldest = &scanCandidate{ + manifest: manifest, + userDID: did, + userHandle: userHandle, + scannedAt: scannedAt, + } } - return true } if nextCursor == "" || len(records) == 0 { @@ -939,7 +1013,46 @@ func (sb *ScanBroadcaster) tryEnqueueForUser(ctx context.Context, userDID string cursor = nextCursor } - return false + // Prefer never-scanned, then oldest stale scan + var pick *scanCandidate + if len(unscanned) > 0 { + pick = &unscanned[0] + } else if oldest != nil { + pick = oldest + } + + if pick == nil { + return false + } + + configJSON, _ := json.Marshal(pick.manifest.Config) + layersJSON, _ := json.Marshal(pick.manifest.Layers) + + reason := "never scanned" + if !pick.scannedAt.IsZero() { + reason = fmt.Sprintf("last scanned %s ago", time.Since(pick.scannedAt).Truncate(time.Minute)) + } + + slog.Info("Enqueuing proactive scan", + "manifestDigest", pick.manifest.Digest, + "repository", pick.manifest.Repository, + "userDID", pick.userDID, + "reason", reason) + + if err := sb.Enqueue(&ScanJobEvent{ + ManifestDigest: pick.manifest.Digest, + Repository: pick.manifest.Repository, + UserDID: pick.userDID, + UserHandle: pick.userHandle, + Tier: "deckhand", + Config: configJSON, + Layers: layersJSON, + }); err != nil { + slog.Error("Proactive scan: failed to enqueue", + "manifest", pick.manifest.Digest, "error", err) + return false + } + return true } // isOurManifest checks if a manifest's holdDID matches this hold directly, @@ -1028,21 +1141,6 @@ func (sb *ScanBroadcaster) checkPredecessor(ctx context.Context, holdDID string) return false } -// isRecentlyScanned checks if a manifest has been scanned within the rescan interval. -func (sb *ScanBroadcaster) isRecentlyScanned(ctx context.Context, manifestDigest string) bool { - _, scanRecord, err := sb.pds.GetScanRecord(ctx, manifestDigest) - if err != nil { - return false // Not scanned or error reading → needs scanning - } - - scannedAt, err := time.Parse(time.RFC3339, scanRecord.ScannedAt) - if err != nil { - return false // Can't parse timestamp → treat as needing scan - } - - return time.Since(scannedAt) < sb.rescanInterval -} - // hasConnectedScanners returns true if at least one scanner is connected. func (sb *ScanBroadcaster) hasConnectedScanners() bool { sb.mu.RLock() diff --git a/scanner/go.mod b/scanner/go.mod index 8dd34ed..c98daa7 100644 --- a/scanner/go.mod +++ b/scanner/go.mod @@ -56,7 +56,7 @@ require ( github.com/anchore/go-sync v0.0.0-20260122203928-582959aeb913 // indirect github.com/anchore/go-version v1.2.2-0.20210903204242-51efa5b487c4 // indirect github.com/anchore/packageurl-go v0.1.1-0.20250220190351-d62adb6e1115 // indirect - github.com/anchore/stereoscope v0.1.20 // indirect + github.com/anchore/stereoscope v0.1.20 github.com/andybalholm/brotli v1.2.0 // indirect github.com/apparentlymart/go-textseg/v15 v15.0.0 // indirect github.com/aquasecurity/go-pep440-version v0.0.1 // indirect @@ -181,7 +181,7 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/kastenhq/goversion v0.0.0-20230811215019-93b2f8823953 // indirect github.com/kevinburke/ssh_config v1.4.0 // indirect - github.com/klauspost/compress v1.18.4 + github.com/klauspost/compress v1.18.4 // indirect github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/klauspost/pgzip v1.2.6 // indirect github.com/knqyf263/go-apk-version v0.0.0-20200609155635-041fdbb8563f // indirect diff --git a/scanner/internal/config/config.go b/scanner/internal/config/config.go index 661982c..7b21d91 100644 --- a/scanner/internal/config/config.go +++ b/scanner/internal/config/config.go @@ -54,6 +54,9 @@ type VulnConfig struct { // Directory for temporary layer extraction. TmpDir string `yaml:"tmp_dir" comment:"Directory for temporary layer extraction."` + + // Maximum total compressed image size in bytes. Images exceeding this are skipped. 0 = no limit. + MaxImageSize int64 `yaml:"max_image_size" comment:"Maximum total compressed image size in bytes. 0 = no limit. Default: 2 GiB."` } // setScannerDefaults registers all default values on the given Viper instance. @@ -76,6 +79,7 @@ func setScannerDefaults(v *viper.Viper) { v.SetDefault("vuln.enabled", true) v.SetDefault("vuln.db_path", "/var/lib/atcr-scanner/vulndb") v.SetDefault("vuln.tmp_dir", "/var/lib/atcr-scanner/tmp") + v.SetDefault("vuln.max_image_size", 2*1024*1024*1024) // 2 GiB // Log shipper defaults v.SetDefault("log_shipper.batch_size", 100) diff --git a/scanner/internal/scan/extractor.go b/scanner/internal/scan/extractor.go index 3a19db5..b36d281 100644 --- a/scanner/internal/scan/extractor.go +++ b/scanner/internal/scan/extractor.go @@ -1,11 +1,9 @@ package scan import ( - "archive/tar" - "compress/gzip" + "crypto/sha256" "encoding/json" "fmt" - "io" "log/slog" "os" "path/filepath" @@ -13,12 +11,42 @@ import ( scanner "atcr.io/scanner" "atcr.io/scanner/internal/client" - "github.com/klauspost/compress/zstd" ) -// extractLayers downloads and extracts all image layers via presigned URLs -// Returns the rootfs directory path and a cleanup function -func extractLayers(job *scanner.ScanJob, tmpDir, secret string) (string, func(), error) { +// OCI image layout types for constructing the layout on disk. +type ociDescriptor struct { + MediaType string `json:"mediaType"` + Digest string `json:"digest"` + Size int64 `json:"size"` +} + +type ociManifest struct { + SchemaVersion int `json:"schemaVersion"` + MediaType string `json:"mediaType,omitempty"` + Config ociDescriptor `json:"config"` + Layers []ociDescriptor `json:"layers"` +} + +type ociIndex struct { + SchemaVersion int `json:"schemaVersion"` + Manifests []ociDescriptor `json:"manifests"` +} + +// buildOCILayout downloads image blobs and constructs an OCI image layout directory. +// Instead of extracting layers to a rootfs (which requires decompression and causes +// permission/security issues), this writes compressed blobs directly and lets Syft's +// stereoscope handle layer processing internally. +// +// Layout structure: +// +// scan-*/ +// ├── oci-layout +// ├── index.json +// └── blobs/sha256/ +// ├── +// ├── +// └── ... +func buildOCILayout(job *scanner.ScanJob, tmpDir, secret string) (string, func(), error) { scanDir, err := os.MkdirTemp(tmpDir, "scan-*") if err != nil { return "", nil, fmt.Errorf("failed to create temp directory: %w", err) @@ -30,194 +58,139 @@ func extractLayers(job *scanner.ScanJob, tmpDir, secret string) (string, func(), } } - imageDir := filepath.Join(scanDir, "image") - rootfsDir := filepath.Join(imageDir, "rootfs") - layersDir := filepath.Join(imageDir, "layers") - - for _, dir := range []string{rootfsDir, layersDir} { - if err := os.MkdirAll(dir, 0755); err != nil { - cleanup() - return "", nil, fmt.Errorf("failed to create directory %s: %w", dir, err) - } + blobsDir := filepath.Join(scanDir, "blobs", "sha256") + if err := os.MkdirAll(blobsDir, 0755); err != nil { + cleanup() + return "", nil, fmt.Errorf("failed to create blobs directory: %w", err) } - // Download and validate config blob + // Download config blob if job.Config.Digest == "" { cleanup() return "", nil, fmt.Errorf("config blob has empty digest, cannot download") } slog.Info("Downloading config blob", "digest", job.Config.Digest) - configPath := filepath.Join(imageDir, "config.json") - if err := downloadBlobViaPresignedURL(job.HoldEndpoint, job.HoldDID, job.Config.Digest, configPath, secret); err != nil { + if err := downloadBlob(job, job.Config.Digest, blobsDir, secret); err != nil { cleanup() return "", nil, fmt.Errorf("failed to download config blob: %w", err) } - configData, err := os.ReadFile(configPath) - if err != nil { - cleanup() - return "", nil, fmt.Errorf("failed to read config: %w", err) - } - var configObj map[string]interface{} - if err := json.Unmarshal(configData, &configObj); err != nil { - cleanup() - return "", nil, fmt.Errorf("invalid config JSON: %w", err) - } - - // Download and extract each layer + // Download layer blobs (no extraction — kept compressed) for i, layer := range job.Layers { if layer.Digest == "" { slog.Warn("Skipping layer with empty digest", "index", i) continue } - // Skip non-tar layers (cosign signatures, attestations, etc.) + // Skip non-tar layers (cosign signatures, in-toto attestations, etc.) if layer.MediaType != "" && !strings.Contains(layer.MediaType, "tar") { slog.Info("Skipping non-tar layer", "index", i, "digest", layer.Digest, "mediaType", layer.MediaType) continue } - slog.Info("Extracting layer", "index", i, "digest", layer.Digest, "size", layer.Size, "mediaType", layer.MediaType) - - layerPath := filepath.Join(layersDir, fmt.Sprintf("layer-%d", i)) - if err := downloadBlobViaPresignedURL(job.HoldEndpoint, job.HoldDID, layer.Digest, layerPath, secret); err != nil { + slog.Info("Downloading layer", "index", i, "digest", layer.Digest, "size", layer.Size, "mediaType", layer.MediaType) + if err := downloadBlob(job, layer.Digest, blobsDir, secret); err != nil { cleanup() return "", nil, fmt.Errorf("failed to download layer %d: %w", i, err) } + } - if err := extractLayer(layerPath, rootfsDir, layer.MediaType); err != nil { - cleanup() - return "", nil, fmt.Errorf("failed to extract layer %d: %w", i, err) + // Build OCI manifest from job descriptors + manifest := ociManifest{ + SchemaVersion: 2, + MediaType: "application/vnd.oci.image.manifest.v1+json", + Config: ociDescriptor{ + MediaType: defaultMediaType(job.Config.MediaType, "application/vnd.oci.image.config.v1+json"), + Digest: job.Config.Digest, + Size: job.Config.Size, + }, + Layers: make([]ociDescriptor, 0, len(job.Layers)), + } + for _, layer := range job.Layers { + if layer.Digest == "" { + continue } - - // Remove layer file to save space - os.Remove(layerPath) + if layer.MediaType != "" && !strings.Contains(layer.MediaType, "tar") { + continue + } + manifest.Layers = append(manifest.Layers, ociDescriptor{ + MediaType: defaultMediaType(layer.MediaType, "application/vnd.oci.image.layer.v1.tar+gzip"), + Digest: layer.Digest, + Size: layer.Size, + }) } - entries, err := os.ReadDir(rootfsDir) + // Write manifest blob + manifestJSON, err := json.Marshal(manifest) if err != nil { - slog.Warn("Failed to read rootfs directory", "error", err) - } else { - slog.Info("Successfully extracted image", - "layers", len(job.Layers), - "topLevelEntries", len(entries)) + cleanup() + return "", nil, fmt.Errorf("failed to marshal manifest: %w", err) + } + manifestHash := sha256.Sum256(manifestJSON) + manifestDigest := fmt.Sprintf("sha256:%x", manifestHash) + manifestPath := filepath.Join(blobsDir, fmt.Sprintf("%x", manifestHash)) + if err := os.WriteFile(manifestPath, manifestJSON, 0644); err != nil { + cleanup() + return "", nil, fmt.Errorf("failed to write manifest blob: %w", err) } - return rootfsDir, cleanup, nil + // Write index.json + index := ociIndex{ + SchemaVersion: 2, + Manifests: []ociDescriptor{ + { + MediaType: "application/vnd.oci.image.manifest.v1+json", + Digest: manifestDigest, + Size: int64(len(manifestJSON)), + }, + }, + } + indexJSON, err := json.Marshal(index) + if err != nil { + cleanup() + return "", nil, fmt.Errorf("failed to marshal index: %w", err) + } + if err := os.WriteFile(filepath.Join(scanDir, "index.json"), indexJSON, 0644); err != nil { + cleanup() + return "", nil, fmt.Errorf("failed to write index.json: %w", err) + } + + // Write oci-layout file + ociLayout := []byte(`{"imageLayoutVersion":"1.0.0"}`) + if err := os.WriteFile(filepath.Join(scanDir, "oci-layout"), ociLayout, 0644); err != nil { + cleanup() + return "", nil, fmt.Errorf("failed to write oci-layout: %w", err) + } + + slog.Info("OCI layout built", + "dir", scanDir, + "layers", len(manifest.Layers), + "manifestDigest", manifestDigest) + + return scanDir, cleanup, nil } -// downloadBlobViaPresignedURL gets a presigned URL from the hold and downloads the blob -func downloadBlobViaPresignedURL(holdEndpoint, holdDID, digest, destPath, secret string) error { - presignedURL, err := client.GetBlobPresignedURL(holdEndpoint, holdDID, digest, secret) +// downloadBlob downloads a blob by digest to the blobs directory using presigned URLs. +func downloadBlob(job *scanner.ScanJob, digest, blobsDir, secret string) error { + hex := digestHex(digest) + destPath := filepath.Join(blobsDir, hex) + + presignedURL, err := client.GetBlobPresignedURL(job.HoldEndpoint, job.HoldDID, digest, secret) if err != nil { return fmt.Errorf("failed to get presigned URL for %s: %w", digest, err) } return client.DownloadBlob(presignedURL, destPath) } -// extractLayer extracts a layer tar archive to a destination directory (overlayfs style). -// Supports gzip, zstd, and uncompressed tar based on the OCI media type. -// Falls back to header sniffing if the media type is unrecognized. -func extractLayer(layerPath, destDir, mediaType string) error { - file, err := os.Open(layerPath) - if err != nil { - return fmt.Errorf("failed to open layer: %w", err) +// digestHex extracts the hex portion from a digest string (e.g., "sha256:abc123" → "abc123"). +func digestHex(digest string) string { + if _, hex, ok := strings.Cut(digest, ":"); ok { + return hex } - defer file.Close() - - var tarReader io.Reader - - switch { - case strings.Contains(mediaType, "zstd"): - decoder, err := zstd.NewReader(file) - if err != nil { - return fmt.Errorf("failed to create zstd reader: %w", err) - } - defer decoder.Close() - tarReader = decoder - - case strings.Contains(mediaType, "gzip") || mediaType == "": - // Default to gzip for unspecified media types (most common) - gzr, err := gzip.NewReader(file) - if err != nil { - // If gzip fails, try plain tar (header sniff fallback) - if _, seekErr := file.Seek(0, io.SeekStart); seekErr != nil { - return fmt.Errorf("failed to create gzip reader: %w", err) - } - slog.Debug("Gzip header invalid, falling back to plain tar", "mediaType", mediaType) - tarReader = file - } else { - defer gzr.Close() - tarReader = gzr - } - - default: - // Uncompressed tar or unknown — try plain tar - tarReader = file - } - - tr := tar.NewReader(tarReader) - - for { - header, err := tr.Next() - if err == io.EOF { - break - } - if err != nil { - return fmt.Errorf("failed to read tar header: %w", err) - } - - target := filepath.Join(destDir, filepath.Clean(header.Name)) - - // Security: ensure target is within destDir - if !strings.HasPrefix(target, filepath.Clean(destDir)+string(os.PathSeparator)) { - slog.Warn("Skipping path outside destination", "path", header.Name) - continue - } - - switch header.Typeflag { - case tar.TypeDir: - // Always set owner write bit so we can create files inside (e.g. Go module - // cache dirs are 0555 in images, which would block subsequent writes) - if err := os.MkdirAll(target, os.FileMode(header.Mode)|0200); err != nil { - return fmt.Errorf("failed to create directory %s: %w", target, err) - } - - case tar.TypeReg: - if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil { - return fmt.Errorf("failed to create parent directory: %w", err) - } - outFile, err := os.OpenFile(target, os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.FileMode(header.Mode)) - if err != nil { - return fmt.Errorf("failed to create file %s: %w", target, err) - } - if _, err := io.Copy(outFile, tr); err != nil { - outFile.Close() - return fmt.Errorf("failed to write file %s: %w", target, err) - } - outFile.Close() - - case tar.TypeSymlink: - if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil { - return fmt.Errorf("failed to create parent directory for symlink: %w", err) - } - os.Remove(target) - if err := os.Symlink(header.Linkname, target); err != nil { - slog.Warn("Failed to create symlink", "target", target, "link", header.Linkname, "error", err) - } - - case tar.TypeLink: - linkTarget := filepath.Join(destDir, filepath.Clean(header.Linkname)) - if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil { - return fmt.Errorf("failed to create parent directory for hardlink: %w", err) - } - os.Remove(target) - if err := os.Link(linkTarget, target); err != nil { - slog.Warn("Failed to create hardlink", "target", target, "link", linkTarget, "error", err) - } - - default: - slog.Debug("Skipping unsupported tar entry type", "type", header.Typeflag, "name", header.Name) - } - } - - return nil + return digest +} + +func defaultMediaType(mediaType, fallback string) string { + if mediaType == "" { + return fallback + } + return mediaType } diff --git a/scanner/internal/scan/syft.go b/scanner/internal/scan/syft.go index 725efeb..dfc46f3 100644 --- a/scanner/internal/scan/syft.go +++ b/scanner/internal/scan/syft.go @@ -5,30 +5,40 @@ import ( "crypto/sha256" "fmt" "log/slog" - "os" + "github.com/anchore/stereoscope/pkg/file" + "github.com/anchore/stereoscope/pkg/image/oci" "github.com/anchore/syft/syft" "github.com/anchore/syft/syft/format" "github.com/anchore/syft/syft/format/spdxjson" "github.com/anchore/syft/syft/sbom" - "github.com/anchore/syft/syft/source/directorysource" + "github.com/anchore/syft/syft/source/stereoscopesource" ) -// generateSBOM generates an SBOM using Syft from an extracted image directory -// Returns the SBOM object, SBOM JSON bytes, and its digest -func generateSBOM(ctx context.Context, imageDir string) (*sbom.SBOM, []byte, string, error) { - slog.Info("Generating SBOM with Syft", "imageDir", imageDir) +// generateSBOM generates an SBOM using Syft from an OCI image layout directory. +// Returns the SBOM object, SBOM JSON bytes, and its digest. +func generateSBOM(ctx context.Context, ociLayoutDir string) (*sbom.SBOM, []byte, string, error) { + slog.Info("Generating SBOM with Syft", "ociLayout", ociLayoutDir) - entries, err := os.ReadDir(imageDir) - if err != nil { - return nil, nil, "", fmt.Errorf("failed to read image directory: %w", err) - } - slog.Info("Image directory contents", "path", imageDir, "entries", len(entries)) + // Create stereoscope OCI directory provider + tmpGen := file.NewTempDirGenerator("syft-scan") + defer tmpGen.Cleanup() - src, err := directorysource.NewFromPath(imageDir) + provider := oci.NewDirectoryProvider(tmpGen, ociLayoutDir) + img, err := provider.Provide(ctx) if err != nil { - return nil, nil, "", fmt.Errorf("failed to create Syft source: %w", err) + return nil, nil, "", fmt.Errorf("failed to load OCI image: %w", err) } + defer img.Cleanup() + + if err := img.Read(); err != nil { + return nil, nil, "", fmt.Errorf("failed to read OCI image: %w", err) + } + + // Wrap in Syft source + src := stereoscopesource.New(img, stereoscopesource.ImageConfig{ + Reference: ociLayoutDir, + }) defer src.Close() slog.Info("Running Syft cataloging") diff --git a/scanner/internal/scan/worker.go b/scanner/internal/scan/worker.go index ea5b105..9c1db89 100644 --- a/scanner/internal/scan/worker.go +++ b/scanner/internal/scan/worker.go @@ -45,6 +45,10 @@ func (wp *WorkerPool) Start(ctx context.Context) { }() } + // Point TMPDIR at the configured tmp dir so stereoscope's internal + // layer extraction uses the same partition (not /tmp which may be small) + os.Setenv("TMPDIR", wp.cfg.Vuln.TmpDir) + for i := 0; i < wp.cfg.Scanner.Workers; i++ { wp.wg.Add(1) go wp.worker(ctx, i) @@ -104,17 +108,29 @@ func (wp *WorkerPool) processJob(ctx context.Context, job *scanner.ScanJob) (*sc return nil, fmt.Errorf("failed to create tmp dir: %w", err) } - // Step 1: Extract image layers from hold via presigned URLs - slog.Info("Extracting image layers", "repository", job.Repository) - imageDir, cleanup, err := extractLayers(job, wp.cfg.Vuln.TmpDir, wp.cfg.Hold.Secret) + // Check total compressed image size before downloading + if wp.cfg.Vuln.MaxImageSize > 0 { + var totalSize int64 + for _, layer := range job.Layers { + totalSize += layer.Size + } + totalSize += job.Config.Size + if totalSize > wp.cfg.Vuln.MaxImageSize { + return nil, fmt.Errorf("image too large: %d bytes compressed (limit %d bytes)", totalSize, wp.cfg.Vuln.MaxImageSize) + } + } + + // Step 1: Build OCI image layout from hold via presigned URLs + slog.Info("Building OCI layout", "repository", job.Repository) + ociLayoutDir, cleanup, err := buildOCILayout(job, wp.cfg.Vuln.TmpDir, wp.cfg.Hold.Secret) if err != nil { - return nil, fmt.Errorf("failed to extract layers: %w", err) + return nil, fmt.Errorf("failed to build OCI layout: %w", err) } defer cleanup() // Step 2: Generate SBOM with Syft slog.Info("Generating SBOM", "repository", job.Repository) - sbomResult, sbomJSON, sbomDigest, err := generateSBOM(ctx, imageDir) + sbomResult, sbomJSON, sbomDigest, err := generateSBOM(ctx, ociLayoutDir) if err != nil { return nil, fmt.Errorf("failed to generate SBOM: %w", err) } @@ -127,7 +143,7 @@ func (wp *WorkerPool) processJob(ctx context.Context, job *scanner.ScanJob) (*sc // Step 3: Scan SBOM with Grype (if enabled) if wp.cfg.Vuln.Enabled { - slog.Info("Scanning for vulnerabilities", "repository", job.Repository) + slog.Info("Scanning for vulnerabilities", "repository", job.Repository, "handle", job.UserHandle) vulnJSON, vulnDigest, summary, err := scanVulnerabilities(ctx, sbomResult, wp.cfg.Vuln.DBPath) if err != nil { return nil, fmt.Errorf("failed to scan vulnerabilities: %w", err)