Files
at-container-registry/docs/VALKEY_MIGRATION.md

400 lines
13 KiB
Markdown

# Analysis: AppView SQL Database Usage
## Overview
The AppView uses SQLite with 19 tables. The key finding: **most data is a cache of ATProto records** that could theoretically be rebuilt from users' PDS instances.
## Data Categories
### 1. MUST PERSIST (Local State Only)
These tables contain data that **cannot be reconstructed** from external sources:
| Table | Purpose | Why It Must Persist |
|-------|---------|---------------------|
| `oauth_sessions` | OAuth tokens | Refresh tokens are stateful; losing them = users must re-auth |
| `ui_sessions` | Web browser sessions | Session continuity for logged-in users |
| `devices` | Approved devices + bcrypt secrets | User authorization decisions; secrets are one-way hashed |
| `pending_device_auth` | In-flight auth flows | Short-lived (10min) but critical during auth |
| `oauth_auth_requests` | OAuth flow state | Short-lived but required for auth completion |
| `repository_stats` | Pull/push counts | **Locally tracked metrics** - not stored in ATProto |
### 2. CACHED FROM PDS (Rebuildable)
These tables are essentially a **read-through cache** of ATProto data:
| Table | Source | ATProto Collection |
|-------|--------|-------------------|
| `users` | User's PDS profile | `app.bsky.actor.profile` + DID document |
| `manifests` | User's PDS | `io.atcr.manifest` records |
| `tags` | User's PDS | `io.atcr.tag` records |
| `layers` | Derived from manifests | Parsed from manifest content |
| `manifest_references` | Derived from manifest lists | Parsed from multi-arch manifests |
| `repository_annotations` | Manifest config blob | OCI annotations from config |
| `repo_pages` | User's PDS | `io.atcr.repo.page` records |
| `stars` | User's PDS | `io.atcr.sailor.star` records (synced via Jetstream) |
| `hold_captain_records` | Hold's embedded PDS | `io.atcr.hold.captain` records |
| `hold_crew_approvals` | Hold's embedded PDS | `io.atcr.hold.crew` records |
| `hold_crew_denials` | Local authorization cache | Could re-check on demand |
### 3. OPERATIONAL
| Table | Purpose |
|-------|---------|
| `schema_migrations` | Migration tracking |
| `firehose_cursor` | Jetstream position (can restart from 0) |
## Key Insights
### What's Actually Unique to AppView?
1. **Authentication state** - OAuth sessions, devices, UI sessions
2. **Engagement metrics** - Pull/push counts (locally tracked, not in ATProto)
### What Could Be Eliminated?
If ATCR fully embraced the ATProto model:
1. **`users`** - Query PDS on demand (with caching)
2. **`manifests`, `tags`, `layers`** - Query PDS on demand (with caching)
3. **`repository_annotations`** - Fetch manifest config on demand
4. **`repo_pages`** - Query PDS on demand
5. **`hold_*` tables** - Query hold's PDS on demand
### Trade-offs
**Current approach (heavy caching):**
- Fast queries for UI (search, browse, stats)
- Offline resilience (PDS down doesn't break UI)
- Complex sync logic (Jetstream consumer, backfill)
- State can diverge from source of truth
**Lighter approach (query on demand):**
- Always fresh data
- Simpler codebase (no sync)
- Slower queries (network round-trips)
- Depends on PDS availability
## Current Limitation: No Cache-Miss Queries
**Finding:** There's no "query PDS on cache miss" logic. Users/manifests only enter the DB via:
1. OAuth login (user authenticates)
2. Jetstream events (firehose activity)
**Problem:** If someone visits `atcr.io/alice/myapp` before alice is indexed → 404
**Where this happens:**
- `pkg/appview/handlers/repository.go:50-53`: If `db.GetUserByDID()` returns nil → 404
- No fallback to `atproto.Client.ListRecords()` or similar
**This matters for Valkey migration:** If cache is ephemeral and restarts clear it, you need cache-miss logic to repopulate on demand. Otherwise:
- Restart Valkey → all users/manifests gone
- Wait for Jetstream to re-index OR implement cache-miss queries
**Cache-miss implementation design:**
Existing code to reuse: `pkg/appview/jetstream/processor.go:43-97` (`EnsureUser`)
```go
// New: pkg/appview/cache/loader.go
type Loader struct {
cache Cache // Valkey interface
client *atproto.Client
}
// GetUser with cache-miss fallback
func (l *Loader) GetUser(ctx context.Context, did string) (*User, error) {
// 1. Try cache
if user := l.cache.GetUser(did); user != nil {
return user, nil
}
// 2. Cache miss - resolve identity (already queries network)
_, handle, pdsEndpoint, err := atproto.ResolveIdentity(ctx, did)
if err != nil {
return nil, err // User doesn't exist in network
}
// 3. Fetch profile for avatar
client := atproto.NewClient(pdsEndpoint, "", "")
profile, _ := client.GetProfileRecord(ctx, did)
avatarURL := ""
if profile != nil && profile.Avatar != nil {
avatarURL = atproto.BlobCDNURL(did, profile.Avatar.Ref.Link)
}
// 4. Cache and return
user := &User{DID: did, Handle: handle, PDSEndpoint: pdsEndpoint, Avatar: avatarURL}
l.cache.SetUser(user, 1*time.Hour)
return user, nil
}
// GetManifestsForRepo with cache-miss fallback
func (l *Loader) GetManifestsForRepo(ctx context.Context, did, repo string) ([]Manifest, error) {
cacheKey := fmt.Sprintf("manifests:%s:%s", did, repo)
// 1. Try cache
if cached := l.cache.Get(cacheKey); cached != nil {
return cached.([]Manifest), nil
}
// 2. Cache miss - get user's PDS endpoint
user, err := l.GetUser(ctx, did)
if err != nil {
return nil, err
}
// 3. Query PDS for manifests
client := atproto.NewClient(user.PDSEndpoint, "", "")
records, _, err := client.ListRecordsForRepo(ctx, did, atproto.ManifestCollection, 100, "")
if err != nil {
return nil, err
}
// 4. Filter by repository and parse
var manifests []Manifest
for _, rec := range records {
var m atproto.ManifestRecord
if err := json.Unmarshal(rec.Value, &m); err != nil {
continue
}
if m.Repository == repo {
manifests = append(manifests, convertManifest(m))
}
}
// 5. Cache and return
l.cache.Set(cacheKey, manifests, 10*time.Minute)
return manifests, nil
}
```
**Handler changes:**
```go
// Before (repository.go:45-53):
owner, err := db.GetUserByDID(h.DB, did)
if owner == nil {
RenderNotFound(w, r, h.Templates, h.RegistryURL)
return
}
// After:
owner, err := h.Loader.GetUser(r.Context(), did)
if err != nil {
RenderNotFound(w, r, h.Templates, h.RegistryURL)
return
}
```
**Performance considerations:**
- Cache hit: ~1ms (Valkey lookup)
- Cache miss: ~200-500ms (PDS round-trip)
- First request after restart: slower but correct
- Jetstream still useful for proactive warming
---
## Proposed Architecture: Valkey + ATProto
### Goal
Replace SQLite with Valkey (Redis-compatible) for ephemeral state, push remaining persistent data to ATProto.
### What goes to Valkey (ephemeral, TTL-based)
| Current Table | Valkey Key Pattern | TTL | Notes |
|---------------|-------------------|-----|-------|
| `oauth_sessions` | `oauth:{did}:{session_id}` | 90 days | Lost on restart = re-auth |
| `ui_sessions` | `ui:{session_id}` | Session duration | Lost on restart = re-login |
| `oauth_auth_requests` | `authreq:{state}` | 10 min | In-flight flows |
| `pending_device_auth` | `pending:{device_code}` | 10 min | In-flight flows |
| `firehose_cursor` | `cursor:jetstream` | None | Can restart from 0 |
| All PDS cache tables | `cache:{collection}:{did}:{rkey}` | 10-60 min | Query PDS on miss |
**Benefits:**
- Multi-instance ready (shared Valkey)
- No schema migrations
- Natural TTL expiry
- Simpler code (no SQL)
### What could become ATProto records
| Current Table | Proposed Collection | Where Stored | Open Questions |
|---------------|---------------------|--------------|----------------|
| `devices` | `io.atcr.sailor.device` | User's PDS | Privacy: IP, user-agent sensitive? |
| `repository_stats` | `io.atcr.repo.stats` | Hold's PDS or User's PDS | Who owns the stats? |
**Devices → Valkey:**
- Move current device table to Valkey
- Key: `device:{did}:{device_id}``{name, secret_hash, ip, user_agent, created_at, last_used}`
- TTL: Long (1 year?) or no expiry
- Device list: `devices:{did}` → Set of device IDs
- Secret validation works the same, just different backend
**Service auth exploration (future):**
The challenge with pure ATProto service auth is the AppView still needs the user's OAuth session to write manifests to their PDS. The current flow:
1. User authenticates via OAuth → AppView gets OAuth tokens
2. AppView issues registry JWT to credential helper
3. Credential helper presents JWT on each push/pull
4. AppView uses OAuth session to write to user's PDS
Service auth could work for the hold side (AppView → Hold), but not for the user's OAuth session.
**Repository stats → Hold's PDS:**
**Challenge discovered:** The hold's `getBlob` endpoint only receives `did` + `cid`, not the repository name.
Current flow (`proxy_blob_store.go:358-362`):
```go
xrpcURL := fmt.Sprintf("%s%s?did=%s&cid=%s&method=%s",
p.holdURL, atproto.SyncGetBlob, p.ctx.DID, dgst.String(), operation)
```
**Implementation options:**
**Option A: Add repository parameter to getBlob (recommended)**
```go
// Modified AppView call:
xrpcURL := fmt.Sprintf("%s%s?did=%s&cid=%s&method=%s&repo=%s",
p.holdURL, atproto.SyncGetBlob, p.ctx.DID, dgst.String(), operation, p.ctx.Repository)
```
```go
// Modified hold handler (xrpc.go:969):
func (h *XRPCHandler) HandleGetBlob(w http.ResponseWriter, r *http.Request) {
did := r.URL.Query().Get("did")
cidOrDigest := r.URL.Query().Get("cid")
repo := r.URL.Query().Get("repo") // NEW
// ... existing blob handling ...
// Increment stats if repo provided
if repo != "" {
go h.pds.IncrementPullCount(did, repo) // Async, non-blocking
}
}
```
**Stats record structure:**
```
Collection: io.atcr.hold.stats
Rkey: base64(did:repository) // Deterministic, unique
{
"$type": "io.atcr.hold.stats",
"did": "did:plc:alice123",
"repository": "myapp",
"pullCount": 1542,
"pushCount": 47,
"lastPull": "2025-01-15T...",
"lastPush": "2025-01-10T...",
"createdAt": "2025-01-01T..."
}
```
**Hold-side implementation:**
```go
// New file: pkg/hold/pds/stats.go
func (p *HoldPDS) IncrementPullCount(ctx context.Context, did, repo string) error {
rkey := statsRecordKey(did, repo)
// Get or create stats record
stats, err := p.GetStatsRecord(ctx, rkey)
if err != nil || stats == nil {
stats = &atproto.StatsRecord{
Type: atproto.StatsCollection,
DID: did,
Repository: repo,
PullCount: 0,
PushCount: 0,
CreatedAt: time.Now(),
}
}
// Increment and update
stats.PullCount++
stats.LastPull = time.Now()
_, err = p.repomgr.UpdateRecord(ctx, p.uid, atproto.StatsCollection, rkey, stats)
return err
}
```
**Query endpoint (new XRPC):**
```
GET /xrpc/io.atcr.hold.getStats?did={userDID}&repo={repository}
→ Returns JSON: { pullCount, pushCount, lastPull, lastPush }
GET /xrpc/io.atcr.hold.listStats?did={userDID}
→ Returns all stats for a user across all repos on this hold
```
**AppView aggregation:**
```go
func (l *Loader) GetAggregatedStats(ctx context.Context, did, repo string) (*Stats, error) {
// 1. Get all holds that have served this repo
holdDIDs, _ := l.cache.GetHoldDIDsForRepo(did, repo)
// 2. Query each hold for stats
var total Stats
for _, holdDID := range holdDIDs {
holdURL := resolveHoldDID(holdDID)
stats, _ := queryHoldStats(ctx, holdURL, did, repo)
total.PullCount += stats.PullCount
total.PushCount += stats.PushCount
}
return &total, nil
}
```
**Files to modify:**
- `pkg/atproto/lexicon.go` - Add `StatsCollection` + `StatsRecord`
- `pkg/hold/pds/stats.go` - New file for stats operations
- `pkg/hold/pds/xrpc.go` - Add `repo` param to getBlob, add stats endpoints
- `pkg/appview/storage/proxy_blob_store.go` - Pass repository to getBlob
- `pkg/appview/cache/loader.go` - Aggregation logic
### Migration Path
**Phase 1: Add Valkey infrastructure**
- Add Valkey client to AppView
- Create store interfaces that abstract SQLite vs Valkey
- Dual-write OAuth sessions to both
**Phase 2: Migrate sessions to Valkey**
- OAuth sessions, UI sessions, auth requests, pending device auth
- Remove SQLite session tables
- Test: restart AppView, users get logged out (acceptable)
**Phase 3: Migrate devices to Valkey**
- Move device store to Valkey
- Same data structure, different backend
- Consider device expiry policy
**Phase 4: Implement hold-side stats**
- Add `io.atcr.hold.stats` collection to hold's embedded PDS
- Hold increments stats on blob access
- Add XRPC endpoint: `io.atcr.hold.getStats`
**Phase 5: AppView stats aggregation**
- Track holdDids per repo in Valkey cache
- Query holds for stats, aggregate
- Cache aggregated stats with TTL
**Phase 6: Remove SQLite (optional)**
- Keep SQLite as optional cache layer for UI queries
- Or: Query PDS on demand with Valkey caching
- Jetstream still useful for real-time updates
## Summary Table
| Category | Tables | % of Schema | Truly Persistent? |
|----------|--------|-------------|-------------------|
| Auth & Sessions + Metrics | 6 | 32% | Yes |
| PDS Cache | 11 | 58% | No (rebuildable) |
| Operational | 2 | 10% | No |
**~58% of the database is cached ATProto data that could be rebuilt from PDSes.**