at-container-registry/docs/VALKEY_MIGRATION.md

Analysis: AppView SQL Database Usage

Overview

The AppView uses SQLite with 19 tables. The key finding: most data is a cache of ATProto records that could theoretically be rebuilt from users' PDS instances.

Data Categories

1. MUST PERSIST (Local State Only)

These tables contain data that cannot be reconstructed from external sources:

| Table | Purpose | Why It Must Persist |
|---|---|---|
| oauth_sessions | OAuth tokens | Refresh tokens are stateful; losing them forces users to re-auth |
| ui_sessions | Web browser sessions | Session continuity for logged-in users |
| devices | Approved devices + bcrypt secrets | User authorization decisions; secrets are one-way hashed |
| pending_device_auth | In-flight auth flows | Short-lived (10 min) but critical during auth |
| oauth_auth_requests | OAuth flow state | Short-lived but required for auth completion |
| repository_stats | Pull/push counts | Locally tracked metrics, not stored in ATProto |

2. CACHED FROM PDS (Rebuildable)

These tables are essentially a read-through cache of ATProto data:

| Table | Source | ATProto Collection |
|---|---|---|
| users | User's PDS | app.bsky.actor.profile + DID document |
| manifests | User's PDS | io.atcr.manifest records |
| tags | User's PDS | io.atcr.tag records |
| layers | Derived from manifests | Parsed from manifest content |
| manifest_references | Derived from manifest lists | Parsed from multi-arch manifests |
| repository_annotations | Manifest config blob | OCI annotations from config |
| repo_pages | User's PDS | io.atcr.repo.page records |
| stars | User's PDS | io.atcr.sailor.star records (synced via Jetstream) |
| hold_captain_records | Hold's embedded PDS | io.atcr.hold.captain records |
| hold_crew_approvals | Hold's embedded PDS | io.atcr.hold.crew records |
| hold_crew_denials | Local authorization cache | Could be re-checked on demand |

3. OPERATIONAL

| Table | Purpose |
|---|---|
| schema_migrations | Migration tracking |
| firehose_cursor | Jetstream position (can restart from 0) |

Key Insights

What's Actually Unique to AppView?

  1. Authentication state - OAuth sessions, devices, UI sessions
  2. Engagement metrics - Pull/push counts (locally tracked, not in ATProto)

What Could Be Eliminated?

If ATCR fully embraced the ATProto model:

  1. users - Query PDS on demand (with caching)
  2. manifests, tags, layers - Query PDS on demand (with caching)
  3. repository_annotations - Fetch manifest config on demand
  4. repo_pages - Query PDS on demand
  5. hold_* tables - Query hold's PDS on demand

Trade-offs

Current approach (heavy caching):

  • Pro: Fast queries for the UI (search, browse, stats)
  • Pro: Offline resilience (a PDS outage doesn't break the UI)
  • Con: Complex sync logic (Jetstream consumer, backfill)
  • Con: State can diverge from the source of truth

Lighter approach (query on demand):

  • Pro: Always-fresh data
  • Pro: Simpler codebase (no sync logic)
  • Con: Slower queries (network round-trips)
  • Con: Depends on PDS availability

Current Limitation: No Cache-Miss Queries

Finding: There's no "query PDS on cache miss" logic. Users/manifests only enter the DB via:

  1. OAuth login (user authenticates)
  2. Jetstream events (firehose activity)

Problem: If someone visits atcr.io/alice/myapp before alice is indexed → 404

Where this happens:

  • pkg/appview/handlers/repository.go:50-53: If db.GetUserByDID() returns nil → 404
  • No fallback to atproto.Client.ListRecords() or similar

This matters for Valkey migration: If cache is ephemeral and restarts clear it, you need cache-miss logic to repopulate on demand. Otherwise:

  • Restart Valkey → all users/manifests gone
  • Wait for Jetstream to re-index OR implement cache-miss queries

Cache-miss implementation design:

Existing code to reuse: pkg/appview/jetstream/processor.go:43-97 (EnsureUser)

```go
// New: pkg/appview/cache/loader.go

type Loader struct {
    cache  Cache  // Valkey interface
    client *atproto.Client
}

// GetUser with cache-miss fallback
func (l *Loader) GetUser(ctx context.Context, did string) (*User, error) {
    // 1. Try cache
    if user := l.cache.GetUser(did); user != nil {
        return user, nil
    }

    // 2. Cache miss - resolve identity (already queries the network)
    _, handle, pdsEndpoint, err := atproto.ResolveIdentity(ctx, did)
    if err != nil {
        return nil, err  // User doesn't exist in the network
    }

    // 3. Fetch profile for avatar
    client := atproto.NewClient(pdsEndpoint, "", "")
    profile, _ := client.GetProfileRecord(ctx, did)
    avatarURL := ""
    if profile != nil && profile.Avatar != nil {
        avatarURL = atproto.BlobCDNURL(did, profile.Avatar.Ref.Link)
    }

    // 4. Cache and return
    user := &User{DID: did, Handle: handle, PDSEndpoint: pdsEndpoint, Avatar: avatarURL}
    l.cache.SetUser(user, 1*time.Hour)
    return user, nil
}

// GetManifestsForRepo with cache-miss fallback
func (l *Loader) GetManifestsForRepo(ctx context.Context, did, repo string) ([]Manifest, error) {
    cacheKey := fmt.Sprintf("manifests:%s:%s", did, repo)

    // 1. Try cache
    if cached := l.cache.Get(cacheKey); cached != nil {
        return cached.([]Manifest), nil
    }

    // 2. Cache miss - get the user's PDS endpoint
    user, err := l.GetUser(ctx, did)
    if err != nil {
        return nil, err
    }

    // 3. Query PDS for manifests
    client := atproto.NewClient(user.PDSEndpoint, "", "")
    records, _, err := client.ListRecordsForRepo(ctx, did, atproto.ManifestCollection, 100, "")
    if err != nil {
        return nil, err
    }

    // 4. Filter by repository and parse
    var manifests []Manifest
    for _, rec := range records {
        var m atproto.ManifestRecord
        if err := json.Unmarshal(rec.Value, &m); err != nil {
            continue
        }
        if m.Repository == repo {
            manifests = append(manifests, convertManifest(m))
        }
    }

    // 5. Cache and return
    l.cache.Set(cacheKey, manifests, 10*time.Minute)
    return manifests, nil
}
```

Handler changes:

```go
// Before (repository.go:45-53):
owner, err := db.GetUserByDID(h.DB, did)
if owner == nil {
    RenderNotFound(w, r, h.Templates, h.RegistryURL)
    return
}

// After:
owner, err := h.Loader.GetUser(r.Context(), did)
if err != nil {
    RenderNotFound(w, r, h.Templates, h.RegistryURL)
    return
}
```

Performance considerations:

  • Cache hit: ~1ms (Valkey lookup)
  • Cache miss: ~200-500ms (PDS round-trip)
  • First request after restart: slower but correct
  • Jetstream still useful for proactive warming

Proposed Architecture: Valkey + ATProto

Goal

Replace SQLite with Valkey (a Redis-compatible store) for ephemeral state, and push the remaining persistent data to ATProto records.

What goes to Valkey (ephemeral, TTL-based)

| Current Table | Valkey Key Pattern | TTL | Notes |
|---|---|---|---|
| oauth_sessions | oauth:{did}:{session_id} | 90 days | Lost on restart = re-auth |
| ui_sessions | ui:{session_id} | Session duration | Lost on restart = re-login |
| oauth_auth_requests | authreq:{state} | 10 min | In-flight flows |
| pending_device_auth | pending:{device_code} | 10 min | In-flight flows |
| firehose_cursor | cursor:jetstream | None | Can restart from 0 |
| All PDS cache tables | cache:{collection}:{did}:{rkey} | 10-60 min | Query PDS on miss |

Benefits:

  • Multi-instance ready (shared Valkey)
  • No schema migrations
  • Natural TTL expiry
  • Simpler code (no SQL)

What could become ATProto records

| Current Table | Proposed Collection | Where Stored | Open Questions |
|---|---|---|---|
| devices | io.atcr.sailor.device | User's PDS | Privacy: are IP and user-agent too sensitive? |
| repository_stats | io.atcr.repo.stats | Hold's PDS or user's PDS | Who owns the stats? |

Devices → Valkey:

  • Move current device table to Valkey
  • Key: device:{did}:{device_id} → hash of {name, secret_hash, ip, user_agent, created_at, last_used}
  • TTL: Long (1 year?) or no expiry
  • Device list: devices:{did} → Set of device IDs
  • Secret validation works the same, just different backend
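The layout above can be sketched with an in-memory map standing in for Valkey. Note this is an assumption-laden sketch: the real store would hash secrets with bcrypt (as the current device table does); sha256 is used here only to keep the example stdlib-only, and all names are hypothetical.

```go
package main

import (
	"crypto/sha256"
	"crypto/subtle"
	"encoding/hex"
	"fmt"
)

// deviceStore models the proposed Valkey layout: one hash per device
// plus a per-user set of device IDs. A real implementation would use
// a Valkey client; maps stand in here so the sketch is runnable.
type deviceStore struct {
	hashes  map[string]map[string]string // device:{did}:{device_id} -> fields
	devices map[string][]string          // devices:{did} -> device IDs
}

func deviceKey(did, deviceID string) string {
	return fmt.Sprintf("device:%s:%s", did, deviceID)
}

func (s *deviceStore) Add(did, deviceID, name, secret string) {
	sum := sha256.Sum256([]byte(secret)) // real code: bcrypt
	s.hashes[deviceKey(did, deviceID)] = map[string]string{
		"name":        name,
		"secret_hash": hex.EncodeToString(sum[:]),
	}
	s.devices[did] = append(s.devices[did], deviceID)
}

func (s *deviceStore) Validate(did, deviceID, secret string) bool {
	fields, ok := s.hashes[deviceKey(did, deviceID)]
	if !ok {
		return false
	}
	sum := sha256.Sum256([]byte(secret))
	return subtle.ConstantTimeCompare(
		[]byte(fields["secret_hash"]),
		[]byte(hex.EncodeToString(sum[:]))) == 1
}

func main() {
	s := &deviceStore{hashes: map[string]map[string]string{}, devices: map[string][]string{}}
	s.Add("did:plc:alice123", "dev1", "laptop", "s3cret")
	fmt.Println(s.Validate("did:plc:alice123", "dev1", "s3cret")) // true
	fmt.Println(s.Validate("did:plc:alice123", "dev1", "wrong"))  // false
}
```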

Service auth exploration (future): The challenge with pure ATProto service auth is that the AppView still needs the user's OAuth session to write manifests to their PDS. The current flow:

  1. User authenticates via OAuth → AppView gets OAuth tokens
  2. AppView issues registry JWT to credential helper
  3. Credential helper presents JWT on each push/pull
  4. AppView uses OAuth session to write to user's PDS

Service auth could work for the hold side (AppView → Hold), but not for the user's OAuth session.

Repository stats → Hold's PDS:

Challenge discovered: The hold's getBlob endpoint only receives did + cid, not the repository name.

Current flow (proxy_blob_store.go:358-362):

```go
xrpcURL := fmt.Sprintf("%s%s?did=%s&cid=%s&method=%s",
    p.holdURL, atproto.SyncGetBlob, p.ctx.DID, dgst.String(), operation)
```

Implementation options:

Option A: Add repository parameter to getBlob (recommended)

```go
// Modified AppView call:
xrpcURL := fmt.Sprintf("%s%s?did=%s&cid=%s&method=%s&repo=%s",
    p.holdURL, atproto.SyncGetBlob, p.ctx.DID, dgst.String(), operation, p.ctx.Repository)

// Modified hold handler (xrpc.go:969):
func (h *XRPCHandler) HandleGetBlob(w http.ResponseWriter, r *http.Request) {
    did := r.URL.Query().Get("did")
    cidOrDigest := r.URL.Query().Get("cid")
    repo := r.URL.Query().Get("repo")  // NEW

    // ... existing blob handling ...

    // Increment stats if repo provided. Use a detached context:
    // r.Context() is canceled when the handler returns, which would
    // abort the async write.
    if repo != "" {
        go h.pds.IncrementPullCount(context.Background(), did, repo)  // Async, non-blocking
    }
}
```

Stats record structure:

Collection: io.atcr.hold.stats
Rkey: base64(did:repository)  // Deterministic, unique

```json
{
  "$type": "io.atcr.hold.stats",
  "did": "did:plc:alice123",
  "repository": "myapp",
  "pullCount": 1542,
  "pushCount": 47,
  "lastPull": "2025-01-15T...",
  "lastPush": "2025-01-10T...",
  "createdAt": "2025-01-01T..."
}
```

Hold-side implementation:

```go
// New file: pkg/hold/pds/stats.go

func (p *HoldPDS) IncrementPullCount(ctx context.Context, did, repo string) error {
    rkey := statsRecordKey(did, repo)

    // Get or create stats record
    stats, err := p.GetStatsRecord(ctx, rkey)
    if err != nil || stats == nil {
        stats = &atproto.StatsRecord{
            Type:       atproto.StatsCollection,
            DID:        did,
            Repository: repo,
            PullCount:  0,
            PushCount:  0,
            CreatedAt:  time.Now(),
        }
    }

    // Increment and update. Note: this read-modify-write is not atomic;
    // concurrent blob fetches could lose increments unless serialized
    // (e.g. a per-rkey mutex).
    stats.PullCount++
    stats.LastPull = time.Now()

    _, err = p.repomgr.UpdateRecord(ctx, p.uid, atproto.StatsCollection, rkey, stats)
    return err
}
```

Query endpoint (new XRPC):

```
GET /xrpc/io.atcr.hold.getStats?did={userDID}&repo={repository}
→ Returns JSON: { pullCount, pushCount, lastPull, lastPush }

GET /xrpc/io.atcr.hold.listStats?did={userDID}
→ Returns all stats for a user across all repos on this hold
```

AppView aggregation:

```go
func (l *Loader) GetAggregatedStats(ctx context.Context, did, repo string) (*Stats, error) {
    // 1. Get all holds that have served this repo
    holdDIDs, _ := l.cache.GetHoldDIDsForRepo(did, repo)

    // 2. Query each hold for stats
    var total Stats
    for _, holdDID := range holdDIDs {
        holdURL := resolveHoldDID(holdDID)
        stats, _ := queryHoldStats(ctx, holdURL, did, repo)
        total.PullCount += stats.PullCount
        total.PushCount += stats.PushCount
    }

    return &total, nil
}
```

Files to modify:

  • pkg/atproto/lexicon.go - Add StatsCollection + StatsRecord
  • pkg/hold/pds/stats.go - New file for stats operations
  • pkg/hold/pds/xrpc.go - Add repo param to getBlob, add stats endpoints
  • pkg/appview/storage/proxy_blob_store.go - Pass repository to getBlob
  • pkg/appview/cache/loader.go - Aggregation logic

Migration Path

Phase 1: Add Valkey infrastructure

  • Add Valkey client to AppView
  • Create store interfaces that abstract SQLite vs Valkey
  • Dual-write OAuth sessions to both

Phase 2: Migrate sessions to Valkey

  • OAuth sessions, UI sessions, auth requests, pending device auth
  • Remove SQLite session tables
  • Test: restart AppView, users get logged out (acceptable)

Phase 3: Migrate devices to Valkey

  • Move device store to Valkey
  • Same data structure, different backend
  • Consider device expiry policy

Phase 4: Implement hold-side stats

  • Add io.atcr.hold.stats collection to hold's embedded PDS
  • Hold increments stats on blob access
  • Add XRPC endpoint: io.atcr.hold.getStats

Phase 5: AppView stats aggregation

  • Track holdDids per repo in Valkey cache
  • Query holds for stats, aggregate
  • Cache aggregated stats with TTL

Phase 6: Remove SQLite (optional)

  • Keep SQLite as optional cache layer for UI queries
  • Or: Query PDS on demand with Valkey caching
  • Jetstream still useful for real-time updates

Summary Table

| Category | Tables | % of Schema | Truly Persistent? |
|---|---|---|---|
| Auth & sessions + metrics | 6 | 32% | Yes |
| PDS cache | 11 | 58% | No (rebuildable) |
| Operational | 2 | 10% | No |

~58% of the database is cached ATProto data that could be rebuilt from PDSes.