Files
at-container-registry/pkg/hold/db/sqlite_store.go

552 lines
15 KiB
Go

// Package db contains a vendored from github.com/bluesky-social/indigo/carstore/sqlite_store.go
// Source: github.com/bluesky-social/indigo@v0.0.0-20260203235305-a86f3ae1f8ec/carstore/
// Reason: indigo's carstore hardcodes mattn/go-sqlite3, which conflicts with go-libsql
// (both bundle SQLite C libraries and cannot coexist in the same binary).
//
// This package replaces the mattn driver with go-libsql and removes Prometheus metrics.
// Once upstream accepts a driver-agnostic constructor, this vendored copy can be removed.
// Modifications:
// - Replaced mattn/go-sqlite3 driver with go-libsql
// - Removed all Prometheus metric counters and .Inc() calls
// - Changed package from 'carstore' to 'db'
// - Added NewSQLiteStoreWithDB constructor for injecting an existing *sql.DB
// - Changed sql.Open("sqlite3", path) to sql.Open("libsql", ...) with proper DSN
package db
import (
"bytes"
"context"
"database/sql"
"errors"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"strings"
"go.opentelemetry.io/otel/attribute"
"github.com/bluesky-social/indigo/models"
blockformat "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-libipfs/blocks"
"github.com/ipld/go-car"
_ "github.com/tursodatabase/go-libsql"
"go.opentelemetry.io/otel"
)
// CarShard represents metadata about a stored shard.
// Stripped of gorm tags since we don't use gorm in the SQLite store.
type CarShard struct {
Root models.DbCID
DataStart int64
Seq int
Path string
Usr models.Uid
Rev string
}
type SQLiteStore struct {
dbPath string
db *sql.DB
ownsDB bool // true when this store opened the connection itself
log *slog.Logger
lastShardCache lastShardCache
}
func ensureDir(path string) error {
fi, err := os.Stat(path)
if err != nil {
if os.IsNotExist(err) {
return os.MkdirAll(path, 0755)
}
return err
}
if fi.IsDir() {
return nil
}
return fmt.Errorf("%s exists but is not a directory", path)
}
func NewSqliteStore(csdir string) (*SQLiteStore, error) {
if err := ensureDir(csdir); err != nil {
return nil, err
}
dbpath := filepath.Join(csdir, "db.sqlite3")
out := new(SQLiteStore)
err := out.Open(dbpath)
if err != nil {
return nil, err
}
return out, nil
}
// NewSQLiteStoreWithDB creates a SQLiteStore using an existing *sql.DB connection.
// This allows callers to configure the driver independently (e.g., using go-libsql
// embedded replicas). The caller is responsible for the DB lifecycle.
func NewSQLiteStoreWithDB(dbPath string, db *sql.DB) (*SQLiteStore, error) {
sqs := &SQLiteStore{
dbPath: dbPath,
db: db,
log: slog.Default(),
}
if err := sqs.createTables(); err != nil {
return nil, fmt.Errorf("%s: sqlite could not create tables, %w", dbPath, err)
}
sqs.lastShardCache.source = sqs
sqs.lastShardCache.Init()
return sqs, nil
}
func (sqs *SQLiteStore) Open(path string) error {
if sqs.log == nil {
sqs.log = slog.Default()
}
sqs.log.Debug("open db", "path", path)
// Build DSN for go-libsql
dsn := path
if path == ":memory:" {
dsn = ":memory:"
} else if !strings.HasPrefix(path, "file:") {
dsn = "file:" + path
}
db, err := sql.Open("libsql", dsn)
if err != nil {
return fmt.Errorf("%s: sqlite could not open, %w", path, err)
}
sqs.db = db
sqs.dbPath = path
sqs.ownsDB = true
err = sqs.createTables()
if err != nil {
return fmt.Errorf("%s: sqlite could not create tables, %w", path, err)
}
sqs.lastShardCache.source = sqs
sqs.lastShardCache.Init()
return nil
}
func (sqs *SQLiteStore) createTables() error {
tx, err := sqs.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
_, err = tx.Exec("CREATE TABLE IF NOT EXISTS blocks (uid int, cid blob, rev varchar, root blob, block blob, PRIMARY KEY(uid,cid));")
if err != nil {
return fmt.Errorf("%s: create table blocks..., %w", sqs.dbPath, err)
}
_, err = tx.Exec("CREATE INDEX IF NOT EXISTS blocx_by_rev ON blocks (uid, rev DESC)")
if err != nil {
return fmt.Errorf("%s: create blocks by rev index, %w", sqs.dbPath, err)
}
return tx.Commit()
}
// writeNewShard needed for DeltaSession.CloseWithRoot
func (sqs *SQLiteStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) {
sqs.log.Debug("write shard", "uid", user, "root", root, "rev", rev, "nblocks", len(blks))
ctx, span := otel.Tracer("carstore").Start(ctx, "writeNewShard")
defer span.End()
buf := new(bytes.Buffer)
hnw, err := WriteCarHeader(buf, root)
if err != nil {
return nil, fmt.Errorf("failed to write car header: %w", err)
}
offset := hnw
tx, err := sqs.db.BeginTx(ctx, nil)
if err != nil {
return nil, fmt.Errorf("bad block insert tx, %w", err)
}
defer tx.Rollback()
insertStatement, err := tx.PrepareContext(ctx, "INSERT INTO blocks (uid, cid, rev, root, block) VALUES (?, ?, ?, ?, ?) ON CONFLICT (uid,cid) DO UPDATE SET rev=excluded.rev, root=excluded.root, block=excluded.block")
if err != nil {
return nil, fmt.Errorf("bad block insert sql, %w", err)
}
defer insertStatement.Close()
dbroot := models.DbCID{CID: root}
span.SetAttributes(attribute.Int("blocks", len(blks)))
for bcid, block := range blks {
nw, err := LdWrite(buf, bcid.Bytes(), block.RawData())
if err != nil {
return nil, fmt.Errorf("failed to write block: %w", err)
}
offset += nw
dbcid := models.DbCID{CID: bcid}
blockbytes := block.RawData()
_, err = insertStatement.ExecContext(ctx, user, dbcid, rev, dbroot, blockbytes)
if err != nil {
return nil, fmt.Errorf("(uid,cid) block store failed, %w", err)
}
sqs.log.Debug("put block", "uid", user, "cid", bcid, "size", len(blockbytes))
}
err = tx.Commit()
if err != nil {
return nil, fmt.Errorf("bad block insert commit, %w", err)
}
shard := CarShard{
Root: models.DbCID{CID: root},
DataStart: hnw,
Seq: seq,
Usr: user,
Rev: rev,
}
sqs.lastShardCache.put(&shard)
return buf.Bytes(), nil
}
var ErrNothingThere = errors.New("nothing to read)")
// GetLastShard needed for NewDeltaSession indirectly through lastShardCache
func (sqs *SQLiteStore) GetLastShard(ctx context.Context, uid models.Uid) (*CarShard, error) {
tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
if err != nil {
return nil, fmt.Errorf("bad last shard tx, %w", err)
}
defer tx.Rollback()
qstmt, err := tx.PrepareContext(ctx, "SELECT rev, root FROM blocks WHERE uid = ? ORDER BY rev DESC LIMIT 1")
if err != nil {
return nil, fmt.Errorf("bad last shard sql, %w", err)
}
rows, err := qstmt.QueryContext(ctx, uid)
if err != nil {
return nil, fmt.Errorf("last shard err, %w", err)
}
if rows.Next() {
var rev string
var rootb models.DbCID
err = rows.Scan(&rev, &rootb)
if err != nil {
return nil, fmt.Errorf("last shard bad scan, %w", err)
}
return &CarShard{
Root: rootb,
Rev: rev,
}, nil
}
return nil, nil
}
func (sqs *SQLiteStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) {
sqs.log.Warn("TODO: don't call compaction")
return nil, nil
}
func (sqs *SQLiteStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) {
sqs.log.Warn("TODO: don't call compaction targets")
return nil, nil
}
func (sqs *SQLiteStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) {
lastShard, err := sqs.lastShardCache.get(ctx, user)
if err != nil {
return cid.Undef, err
}
if lastShard == nil {
return cid.Undef, nil
}
return lastShard.Root.CID, nil
}
func (sqs *SQLiteStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) {
lastShard, err := sqs.lastShardCache.get(ctx, user)
if err != nil {
return "", err
}
if lastShard == nil {
return "", nil
}
return lastShard.Rev, nil
}
func (sqs *SQLiteStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) {
ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice")
defer span.End()
carr, err := car.NewCarReader(bytes.NewReader(carslice))
if err != nil {
return cid.Undef, nil, err
}
if len(carr.Header.Roots) != 1 {
return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots))
}
ds, err := sqs.NewDeltaSession(ctx, uid, since)
if err != nil {
return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err)
}
for {
blk, err := carr.Next()
if err != nil {
if err == io.EOF {
break
}
return cid.Undef, nil, err
}
if err := ds.Put(ctx, blk); err != nil {
return cid.Undef, nil, err
}
}
return carr.Header.Roots[0], ds, nil
}
var zeroShard CarShard
func (sqs *SQLiteStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) {
ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession")
defer span.End()
lastShard, err := sqs.lastShardCache.get(ctx, user)
if err != nil {
return nil, fmt.Errorf("NewDeltaSession, lsc, %w", err)
}
if lastShard == nil {
lastShard = &zeroShard
}
if since != nil && *since != lastShard.Rev {
return nil, fmt.Errorf("revision mismatch: %s != %s: %w", *since, lastShard.Rev, ErrRepoBaseMismatch)
}
return &DeltaSession{
blks: make(map[cid.Cid]blockformat.Block),
base: &sqliteUserView{
uid: user,
sqs: sqs,
},
user: user,
baseCid: lastShard.Root.CID,
cs: sqs,
seq: lastShard.Seq + 1,
lastRev: lastShard.Rev,
}, nil
}
func (sqs *SQLiteStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) {
return &DeltaSession{
base: &sqliteUserView{
uid: user,
sqs: sqs,
},
readonly: true,
user: user,
cs: sqs,
}, nil
}
// ReadUserCar writes a CAR file for the user's blocks since sinceRev.
func (sqs *SQLiteStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error {
ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar")
defer span.End()
tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
if err != nil {
return fmt.Errorf("rcar tx, %w", err)
}
defer tx.Rollback()
qstmt, err := tx.PrepareContext(ctx, "SELECT cid,rev,root,block FROM blocks WHERE uid = ? AND rev > ? ORDER BY rev DESC")
if err != nil {
return fmt.Errorf("rcar sql, %w", err)
}
defer qstmt.Close()
rows, err := qstmt.QueryContext(ctx, user, sinceRev)
if err != nil {
return fmt.Errorf("rcar err, %w", err)
}
nblocks := 0
first := true
for rows.Next() {
var xcid models.DbCID
var xrev string
var xroot models.DbCID
var xblock []byte
err = rows.Scan(&xcid, &xrev, &xroot, &xblock)
if err != nil {
return fmt.Errorf("rcar bad scan, %w", err)
}
if first {
if err := car.WriteHeader(&car.CarHeader{
Roots: []cid.Cid{xroot.CID},
Version: 1,
}, shardOut); err != nil {
return fmt.Errorf("rcar bad header, %w", err)
}
first = false
}
nblocks++
_, err := LdWrite(shardOut, xcid.CID.Bytes(), xblock)
if err != nil {
return fmt.Errorf("rcar bad write, %w", err)
}
}
sqs.log.Debug("read car", "nblocks", nblocks, "since", sinceRev)
return nil
}
// Stat is only used in a debugging admin handler
func (sqs *SQLiteStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) {
sqs.log.Warn("Stat debugging method not implemented for sqlite store")
return nil, nil
}
func (sqs *SQLiteStore) WipeUserData(ctx context.Context, user models.Uid) error {
ctx, span := otel.Tracer("carstore").Start(ctx, "WipeUserData")
defer span.End()
tx, err := sqs.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("wipe tx, %w", err)
}
defer tx.Rollback()
_, err = tx.ExecContext(ctx, "DELETE FROM blocks WHERE uid = ?", user)
if err == nil {
err = tx.Commit()
}
return err
}
// go-libsql does not support ReadOnly transactions, so we use default options.
var txReadOnly = sql.TxOptions{}
// HasUIDCid needed for NewDeltaSession userView
func (sqs *SQLiteStore) HasUIDCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error) {
tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
if err != nil {
return false, fmt.Errorf("hasUC tx, %w", err)
}
defer tx.Rollback()
qstmt, err := tx.PrepareContext(ctx, "SELECT rev, root FROM blocks WHERE uid = ? AND cid = ? LIMIT 1")
if err != nil {
return false, fmt.Errorf("hasUC sql, %w", err)
}
defer qstmt.Close()
rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid})
if err != nil {
return false, fmt.Errorf("hasUC err, %w", err)
}
if rows.Next() {
var rev string
var rootb models.DbCID
err = rows.Scan(&rev, &rootb)
if err != nil {
return false, fmt.Errorf("hasUC bad scan, %w", err)
}
return true, nil
}
return false, nil
}
func (sqs *SQLiteStore) CarStore() CarStore {
return sqs
}
func (sqs *SQLiteStore) Close() error {
if sqs.ownsDB {
return sqs.db.Close()
}
return nil
}
func (sqs *SQLiteStore) getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error) {
tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
if err != nil {
return nil, fmt.Errorf("getb tx, %w", err)
}
defer tx.Rollback()
qstmt, err := tx.PrepareContext(ctx, "SELECT block FROM blocks WHERE uid = ? AND cid = ? LIMIT 1")
if err != nil {
return nil, fmt.Errorf("getb sql, %w", err)
}
defer qstmt.Close()
rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid})
if err != nil {
return nil, fmt.Errorf("getb err, %w", err)
}
if rows.Next() {
var blockb []byte
err = rows.Scan(&blockb)
if err != nil {
return nil, fmt.Errorf("getb bad scan, %w", err)
}
blk, err := blocks.NewBlockWithCid(blockb, bcid)
if err != nil {
return nil, fmt.Errorf("getb bad block, %w", err)
}
return blk, nil
}
return nil, ErrNothingThere
}
func (sqs *SQLiteStore) getBlockSize(ctx context.Context, user models.Uid, bcid cid.Cid) (int64, error) {
tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
if err != nil {
return 0, fmt.Errorf("getbs tx, %w", err)
}
defer tx.Rollback()
qstmt, err := tx.PrepareContext(ctx, "SELECT length(block) FROM blocks WHERE uid = ? AND cid = ? LIMIT 1")
if err != nil {
return 0, fmt.Errorf("getbs sql, %w", err)
}
defer qstmt.Close()
rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid})
if err != nil {
return 0, fmt.Errorf("getbs err, %w", err)
}
if rows.Next() {
var out int64
err = rows.Scan(&out)
if err != nil {
return 0, fmt.Errorf("getbs bad scan, %w", err)
}
return out, nil
}
return 0, nil
}
type sqliteUserViewInner interface {
HasUIDCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error)
getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error)
getBlockSize(ctx context.Context, user models.Uid, bcid cid.Cid) (int64, error)
}
type sqliteUserView struct {
sqs sqliteUserViewInner
uid models.Uid
}
func (s sqliteUserView) Has(ctx context.Context, c cid.Cid) (bool, error) {
return s.sqs.HasUIDCid(ctx, s.uid, c)
}
func (s sqliteUserView) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error) {
return s.sqs.getBlock(ctx, s.uid, c)
}
func (s sqliteUserView) GetSize(ctx context.Context, c cid.Cid) (int, error) {
bigsize, err := s.sqs.getBlockSize(ctx, s.uid, c)
return int(bigsize), err
}
// ensure we implement the interface
var _ minBlockstore = (*sqliteUserView)(nil)