diff --git a/pkg/appview/db/schema.go b/pkg/appview/db/schema.go index d622253..95a5481 100644 --- a/pkg/appview/db/schema.go +++ b/pkg/appview/db/schema.go @@ -92,6 +92,16 @@ func InitDB(path string, cfg LibsqlConfig) (*sql.DB, error) { return nil, err } + // Bound the connection pool. With a remote target (Bunny Database), each + // idle conn is a stable libsql stream — keeping a handful warm avoids + // reconnect cost, capping the total prevents runaway contention. Short + // lifetimes ensure we recycle past any idle-side disconnects and drop any + // poisoned conn that survived IsPoisonedTxErr eviction. + db.SetMaxOpenConns(8) + db.SetMaxIdleConns(4) + db.SetConnMaxLifetime(5 * time.Minute) + db.SetConnMaxIdleTime(2 * time.Minute) + // Check if this is an existing database with migrations applied isExisting, err := hasAppliedMigrations(db) if err != nil { @@ -202,20 +212,21 @@ func runMigrations(db *sql.DB, freshDB bool) error { if err != nil { return fmt.Errorf("failed to begin transaction for migration %d: %w", m.Version, err) } + // Deferred rollback is a no-op once Commit succeeds; it guards against + // panics and any early return that forgets an explicit rollback. 
+ defer func() { _ = tx.Rollback() }() // Split query into individual statements and execute each // go-sqlite3's Exec() doesn't reliably execute all statements in multi-statement queries statements := splitSQLStatements(m.Query) for i, stmt := range statements { if _, err := tx.Exec(stmt); err != nil { - tx.Rollback() return fmt.Errorf("failed to apply migration %d (%s) statement %d: %w", m.Version, m.Name, i+1, err) } } // Record migration if _, err := tx.Exec("INSERT INTO schema_migrations (version) VALUES (?)", m.Version); err != nil { - tx.Rollback() return fmt.Errorf("failed to record migration %d: %w", m.Version, err) } diff --git a/pkg/appview/db/schema.sql b/pkg/appview/db/schema.sql index 9b33db5..03c0252 100644 --- a/pkg/appview/db/schema.sql +++ b/pkg/appview/db/schema.sql @@ -180,6 +180,12 @@ CREATE TABLE IF NOT EXISTS repository_stats_daily ( ); CREATE INDEX IF NOT EXISTS idx_repo_stats_daily_date ON repository_stats_daily(date DESC); +CREATE TABLE IF NOT EXISTS jetstream_cursor ( + id INTEGER PRIMARY KEY CHECK (id = 1), + cursor INTEGER NOT NULL, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +); + CREATE TABLE IF NOT EXISTS stars ( starrer_did TEXT NOT NULL, owner_did TEXT NOT NULL, diff --git a/pkg/appview/jetstream/backfill.go b/pkg/appview/jetstream/backfill.go index 88dd22a..63791d1 100644 --- a/pkg/appview/jetstream/backfill.go +++ b/pkg/appview/jetstream/backfill.go @@ -267,36 +267,51 @@ func (b *BackfillWorker) backfillRepo(ctx context.Context, did, collection strin // ProcessManifest calls ResolveHoldDID for legacy manifests. b.prewarmHoldCaches(ctx, collection, allRecords) - // Phase 3: Process records in chunked transactions. - // All network I/O should be cached by now, so transactions stay fast. 
- const chunkSize = 20 - recordCount := 0 - - for i := 0; i < len(allRecords); i += chunkSize { - end := i + chunkSize - if end > len(allRecords) { - end = len(allRecords) - } - - tx, err := b.db.Begin() - if err != nil { - return recordCount, fmt.Errorf("failed to begin transaction: %w", err) - } - - txProcessor := NewProcessor(tx, false, b.processor.statsCache) - - for j := i; j < end; j++ { - if err := b.processRecordWith(ctx, txProcessor, did, collection, &allRecords[j]); err != nil { - slog.Warn("Backfill failed to process record", "uri", allRecords[j].URI, "error", err) + // Phase 3: Write records to the DB. + // + // For collections whose writes are straightforward idempotent upserts, we + // batch every record in the repo into one multi-row INSERT per table. This + // replaces the previous 20-record chunked transaction loop, which exceeded + // Bunny Database's remote transaction timeout (~5s) once chunks grew large + // and poisoned the connection pool on timeout. + // + // Collections that do network I/O per record (SailorProfile) or have + // conditional read-then-write logic (Scan) stay on the single-record path + // where each write is its own statement and cannot hold a long transaction. 
+ var recordCount int + var procErr error + switch collection { + case atproto.ManifestCollection: + recordCount, procErr = b.batchManifests(ctx, did, allRecords) + case atproto.TagCollection: + recordCount, procErr = b.batchTags(did, allRecords) + case atproto.StarCollection: + recordCount, procErr = b.batchStars(ctx, did, allRecords) + case atproto.RepoPageCollection: + recordCount, procErr = b.batchRepoPages(did, allRecords) + case atproto.DailyStatsCollection: + recordCount, procErr = b.batchDailyStats(ctx, did, allRecords) + case atproto.StatsCollection: + recordCount, procErr = b.batchStats(ctx, did, allRecords) + case atproto.CaptainCollection: + recordCount, procErr = b.batchCaptains(did, allRecords) + case atproto.CrewCollection: + recordCount, procErr = b.batchCrew(did, allRecords) + default: + // SailorProfileCollection and ScanCollection keep per-record processing + // because they do network I/O or conditional reads that would be awkward + // to batch. Each call writes a single row, so there is no long-lived + // transaction at risk. 
+ for i := range allRecords { + if err := b.processRecordWith(ctx, b.processor, did, collection, &allRecords[i]); err != nil { + slog.Warn("Backfill failed to process record", "uri", allRecords[i].URI, "error", err) continue } recordCount++ } - - if err := tx.Commit(); err != nil { - tx.Rollback() - return recordCount, fmt.Errorf("failed to commit transaction: %w", err) - } + } + if procErr != nil { + return recordCount, procErr } // Reconciliation runs outside the transaction (involves network I/O and fewer writes) diff --git a/pkg/appview/jetstream/worker.go b/pkg/appview/jetstream/worker.go index f7e852a..b8d1c54 100644 --- a/pkg/appview/jetstream/worker.go +++ b/pkg/appview/jetstream/worker.go @@ -49,6 +49,12 @@ type Worker struct { // In-memory cursor tracking for reconnects lastCursor int64 cursorMutex sync.RWMutex + + // Cursor persistence: a single-slot channel carries the most recent + // cursor to a background saver goroutine. The saver writes to + // jetstream_cursor every tick, dropping any older value that has not + // yet been flushed so the WS read loop is never blocked on DB I/O. + cursorSave chan int64 } // NewWorker creates a new Jetstream worker @@ -74,6 +80,7 @@ func NewWorker(database *sql.DB, urls []string, startCursor int64) *Worker { }, statsCache: statsCache, processor: NewProcessor(database, true, statsCache), // Use cache for live streaming + cursorSave: make(chan int64, 1), } } @@ -258,7 +265,7 @@ func (w *Worker) Start(ctx context.Context) error { // TODO: Re-enable compression once debugging is complete _ = decoder // Keep decoder to avoid unused variable error - if err := w.processMessage(message); err != nil { + if err := w.processMessageResilient(ctx, message); err != nil { slog.Error("ERROR processing message", "error", err) // Continue processing other messages } else { @@ -274,6 +281,37 @@ func (w *Worker) Start(ctx context.Context) error { // 30 seconds to avoid missing events (events are idempotent DB upserts). 
// Cycles through all endpoints indefinitely and never gives up. func (w *Worker) StartWithFailover(ctx context.Context) { + // Bootstrap from the persisted cursor the first time we run. If the DB + // has a saved cursor we resume from it (minus a small safety rewind so + // any gap from the previous shutdown is covered). Events are idempotent + // UPSERTs, so re-processing a handful is harmless. + if w.startCursor == 0 { + if cursor, err := db.GetJetstreamCursor(w.db); err != nil { + slog.Warn("Jetstream failed to load persisted cursor", "error", err) + } else if cursor > 0 { + const rewind = int64(30 * 1_000_000) // 30s safety rewind, same units as cursor + resume := cursor - rewind + if resume < 0 { + resume = 0 + } + w.cursorMutex.Lock() + w.startCursor = resume + w.lastCursor = resume + w.cursorMutex.Unlock() + slog.Info("Jetstream resuming from persisted cursor", + "persisted_cursor", cursor, + "resume_cursor", resume) + } + } + + // Launch the background cursor saver. It runs for the lifetime of this + // StartWithFailover call and exits on ctx.Done with a final flush. + saverDone := make(chan struct{}) + go w.runCursorSaver(ctx, saverDone) + defer func() { + <-saverDone + }() + retryDelays := []time.Duration{1 * time.Second, 5 * time.Second, 10 * time.Second} for { @@ -358,6 +396,49 @@ func (w *Worker) Processor() *Processor { return w.processor } +// runCursorSaver is a long-running goroutine that persists the most recent +// Jetstream cursor to the database. It writes at most once every cursorSaveInterval +// so we never hit the DB faster than it can keep up, and attempts a final +// flush on shutdown so the next StartWithFailover resumes from the right place. +// +// The goroutine intentionally uses w.db directly (not ExecResilient) because +// the INSERT ... ON CONFLICT statement is a single round-trip that cannot +// trigger the poisoned-tx cascade. 
+func (w *Worker) runCursorSaver(ctx context.Context, done chan<- struct{}) { + defer close(done) + + const cursorSaveInterval = 5 * time.Second + ticker := time.NewTicker(cursorSaveInterval) + defer ticker.Stop() + + var pending int64 + flush := func() { + if pending == 0 { + return + } + if err := db.SaveJetstreamCursor(w.db, pending); err != nil { + slog.Warn("Jetstream failed to persist cursor", "cursor", pending, "error", err) + return + } + pending = 0 + } + + for { + select { + case <-ctx.Done(): + flush() + return + case c := <-w.cursorSave: + // Keep only the newest value; the ticker decides when to flush. + if c > pending { + pending = c + } + case <-ticker.C: + flush() + } + } +} + // GetLastCursor returns the last processed cursor (time_us) for reconnects func (w *Worker) GetLastCursor() int64 { w.cursorMutex.RLock() @@ -365,6 +446,49 @@ func (w *Worker) GetLastCursor() int64 { return w.lastCursor } +// processMessageResilient runs processMessage and, if the underlying DB +// connection was poisoned by a remote tx timeout (common with Bunny Database +// after a backfill chunk exceeded the server-side transaction limit), drains +// the poisoned connections from the pool and retries once. A second failure +// returns the error so the caller's Error log line fires — replacing silent +// data loss with a loud, attributable one. +func (w *Worker) processMessageResilient(ctx context.Context, message []byte) error { + err := w.processMessage(message) + if err == nil || !db.IsPoisonedTxErr(err) { + return err + } + + slog.Warn("Jetstream poisoned connection detected, draining pool and retrying", + "error", err) + drainPool(ctx, w.db) + time.Sleep(100 * time.Millisecond) + return w.processMessage(message) +} + +// drainPool borrows each idle connection from the pool in turn and runs a +// trivial probe. A poisoned connection fails the probe, and db.ExecResilient +// evicts it via driver.ErrBadConn. 
Probes at most maxProbes times and +// stops at the first healthy connection or once ctx is cancelled. +func drainPool(ctx context.Context, database *sql.DB) { + // MaxOpenConns is 8 (see pkg/appview/db/schema.go). We probe a couple more times + // than that to ensure we cycle through every conn if any were mid-use. + const maxProbes = 10 + for i := 0; i < maxProbes; i++ { + err := db.ExecResilient(ctx, database, func(conn *sql.Conn) error { + _, err := conn.ExecContext(ctx, "SELECT 1") + return err + }) + if err == nil { + // Got a healthy conn; no need to probe further — any remaining + // poisoned conns will be evicted on their next use. + return + } + if ctx.Err() != nil { + return + } + } +} + // processMessage processes a single Jetstream event func (w *Worker) processMessage(message []byte) error { var event JetstreamEvent @@ -377,6 +501,26 @@ func (w *Worker) processMessage(message []byte) error { w.lastCursor = event.TimeUS w.cursorMutex.Unlock() + // Offer the cursor to the async saver. Non-blocking: if the saver is + // still writing the previous value, we drop-and-replace so the DB always + // converges on the freshest cursor without ever stalling the read loop. + if w.cursorSave != nil { + select { + case w.cursorSave <- event.TimeUS: + default: + // Drain any stale value and try once more — if that still fails + // we just skip this tick; the saver's timer will catch up. + select { + case <-w.cursorSave: + default: + } + select { + case w.cursorSave <- event.TimeUS: + default: + } + } + } + // Call callback if set if w.eventCallback != nil { w.eventCallback(event.TimeUS)