552 lines
15 KiB
Go
552 lines
15 KiB
Go
// Package db contains a vendored copy of github.com/bluesky-social/indigo/carstore/sqlite_store.go.
|
|
// Source: github.com/bluesky-social/indigo@v0.0.0-20260203235305-a86f3ae1f8ec/carstore/
|
|
// Reason: indigo's carstore hardcodes mattn/go-sqlite3, which conflicts with go-libsql
|
|
// (both bundle SQLite C libraries and cannot coexist in the same binary).
|
|
//
|
|
// This package replaces the mattn driver with go-libsql and removes Prometheus metrics.
|
|
// Once upstream accepts a driver-agnostic constructor, this vendored copy can be removed.
|
|
// Modifications:
|
|
// - Replaced mattn/go-sqlite3 driver with go-libsql
|
|
// - Removed all Prometheus metric counters and .Inc() calls
|
|
// - Changed package from 'carstore' to 'db'
|
|
// - Added NewSQLiteStoreWithDB constructor for injecting an existing *sql.DB
|
|
// - Changed sql.Open("sqlite3", path) to sql.Open("libsql", ...) with proper DSN
|
|
package db
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
|
|
"go.opentelemetry.io/otel/attribute"
|
|
|
|
"github.com/bluesky-social/indigo/models"
|
|
blockformat "github.com/ipfs/go-block-format"
|
|
"github.com/ipfs/go-cid"
|
|
"github.com/ipfs/go-libipfs/blocks"
|
|
"github.com/ipld/go-car"
|
|
_ "github.com/tursodatabase/go-libsql"
|
|
"go.opentelemetry.io/otel"
|
|
)
|
|
|
|
// CarShard represents metadata about a stored shard.
|
|
// Stripped of gorm tags since we don't use gorm in the SQLite store.
|
|
type CarShard struct {
|
|
Root models.DbCID
|
|
DataStart int64
|
|
Seq int
|
|
Path string
|
|
Usr models.Uid
|
|
Rev string
|
|
}
|
|
|
|
type SQLiteStore struct {
|
|
dbPath string
|
|
db *sql.DB
|
|
ownsDB bool // true when this store opened the connection itself
|
|
|
|
log *slog.Logger
|
|
|
|
lastShardCache lastShardCache
|
|
}
|
|
|
|
// ensureDir makes sure path refers to a directory.
//
// An existing directory is accepted as-is, a missing path is created
// (including parents) with mode 0755, and an existing non-directory yields
// an error. Any other Stat failure is returned unchanged.
func ensureDir(path string) error {
	info, statErr := os.Stat(path)
	switch {
	case statErr == nil && info.IsDir():
		return nil
	case statErr == nil:
		return fmt.Errorf("%s exists but is not a directory", path)
	case os.IsNotExist(statErr):
		return os.MkdirAll(path, 0755)
	default:
		return statErr
	}
}
|
|
|
|
func NewSqliteStore(csdir string) (*SQLiteStore, error) {
|
|
if err := ensureDir(csdir); err != nil {
|
|
return nil, err
|
|
}
|
|
dbpath := filepath.Join(csdir, "db.sqlite3")
|
|
out := new(SQLiteStore)
|
|
err := out.Open(dbpath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
// NewSQLiteStoreWithDB creates a SQLiteStore using an existing *sql.DB connection.
|
|
// This allows callers to configure the driver independently (e.g., using go-libsql
|
|
// embedded replicas). The caller is responsible for the DB lifecycle.
|
|
func NewSQLiteStoreWithDB(dbPath string, db *sql.DB) (*SQLiteStore, error) {
|
|
sqs := &SQLiteStore{
|
|
dbPath: dbPath,
|
|
db: db,
|
|
log: slog.Default(),
|
|
}
|
|
if err := sqs.createTables(); err != nil {
|
|
return nil, fmt.Errorf("%s: sqlite could not create tables, %w", dbPath, err)
|
|
}
|
|
sqs.lastShardCache.source = sqs
|
|
sqs.lastShardCache.Init()
|
|
return sqs, nil
|
|
}
|
|
|
|
func (sqs *SQLiteStore) Open(path string) error {
|
|
if sqs.log == nil {
|
|
sqs.log = slog.Default()
|
|
}
|
|
sqs.log.Debug("open db", "path", path)
|
|
|
|
// Build DSN for go-libsql
|
|
dsn := path
|
|
if path == ":memory:" {
|
|
dsn = ":memory:"
|
|
} else if !strings.HasPrefix(path, "file:") {
|
|
dsn = "file:" + path
|
|
}
|
|
|
|
db, err := sql.Open("libsql", dsn)
|
|
if err != nil {
|
|
return fmt.Errorf("%s: sqlite could not open, %w", path, err)
|
|
}
|
|
sqs.db = db
|
|
sqs.dbPath = path
|
|
sqs.ownsDB = true
|
|
err = sqs.createTables()
|
|
if err != nil {
|
|
return fmt.Errorf("%s: sqlite could not create tables, %w", path, err)
|
|
}
|
|
sqs.lastShardCache.source = sqs
|
|
sqs.lastShardCache.Init()
|
|
return nil
|
|
}
|
|
|
|
func (sqs *SQLiteStore) createTables() error {
|
|
tx, err := sqs.db.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Rollback()
|
|
_, err = tx.Exec("CREATE TABLE IF NOT EXISTS blocks (uid int, cid blob, rev varchar, root blob, block blob, PRIMARY KEY(uid,cid));")
|
|
if err != nil {
|
|
return fmt.Errorf("%s: create table blocks..., %w", sqs.dbPath, err)
|
|
}
|
|
_, err = tx.Exec("CREATE INDEX IF NOT EXISTS blocx_by_rev ON blocks (uid, rev DESC)")
|
|
if err != nil {
|
|
return fmt.Errorf("%s: create blocks by rev index, %w", sqs.dbPath, err)
|
|
}
|
|
return tx.Commit()
|
|
}
|
|
|
|
// writeNewShard needed for DeltaSession.CloseWithRoot
|
|
func (sqs *SQLiteStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) {
|
|
sqs.log.Debug("write shard", "uid", user, "root", root, "rev", rev, "nblocks", len(blks))
|
|
ctx, span := otel.Tracer("carstore").Start(ctx, "writeNewShard")
|
|
defer span.End()
|
|
|
|
buf := new(bytes.Buffer)
|
|
hnw, err := WriteCarHeader(buf, root)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to write car header: %w", err)
|
|
}
|
|
offset := hnw
|
|
|
|
tx, err := sqs.db.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("bad block insert tx, %w", err)
|
|
}
|
|
defer tx.Rollback()
|
|
insertStatement, err := tx.PrepareContext(ctx, "INSERT INTO blocks (uid, cid, rev, root, block) VALUES (?, ?, ?, ?, ?) ON CONFLICT (uid,cid) DO UPDATE SET rev=excluded.rev, root=excluded.root, block=excluded.block")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("bad block insert sql, %w", err)
|
|
}
|
|
defer insertStatement.Close()
|
|
|
|
dbroot := models.DbCID{CID: root}
|
|
|
|
span.SetAttributes(attribute.Int("blocks", len(blks)))
|
|
|
|
for bcid, block := range blks {
|
|
nw, err := LdWrite(buf, bcid.Bytes(), block.RawData())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to write block: %w", err)
|
|
}
|
|
offset += nw
|
|
|
|
dbcid := models.DbCID{CID: bcid}
|
|
blockbytes := block.RawData()
|
|
_, err = insertStatement.ExecContext(ctx, user, dbcid, rev, dbroot, blockbytes)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("(uid,cid) block store failed, %w", err)
|
|
}
|
|
sqs.log.Debug("put block", "uid", user, "cid", bcid, "size", len(blockbytes))
|
|
}
|
|
err = tx.Commit()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("bad block insert commit, %w", err)
|
|
}
|
|
|
|
shard := CarShard{
|
|
Root: models.DbCID{CID: root},
|
|
DataStart: hnw,
|
|
Seq: seq,
|
|
Usr: user,
|
|
Rev: rev,
|
|
}
|
|
|
|
sqs.lastShardCache.put(&shard)
|
|
|
|
return buf.Bytes(), nil
|
|
}
|
|
|
|
// ErrNothingThere is returned by getBlock when no row matches the requested
// (uid, cid) pair. Compare with errors.Is rather than by message.
var ErrNothingThere = errors.New("nothing to read")
|
|
|
|
// GetLastShard needed for NewDeltaSession indirectly through lastShardCache
|
|
func (sqs *SQLiteStore) GetLastShard(ctx context.Context, uid models.Uid) (*CarShard, error) {
|
|
tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("bad last shard tx, %w", err)
|
|
}
|
|
defer tx.Rollback()
|
|
qstmt, err := tx.PrepareContext(ctx, "SELECT rev, root FROM blocks WHERE uid = ? ORDER BY rev DESC LIMIT 1")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("bad last shard sql, %w", err)
|
|
}
|
|
rows, err := qstmt.QueryContext(ctx, uid)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("last shard err, %w", err)
|
|
}
|
|
if rows.Next() {
|
|
var rev string
|
|
var rootb models.DbCID
|
|
err = rows.Scan(&rev, &rootb)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("last shard bad scan, %w", err)
|
|
}
|
|
return &CarShard{
|
|
Root: rootb,
|
|
Rev: rev,
|
|
}, nil
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
func (sqs *SQLiteStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) {
|
|
sqs.log.Warn("TODO: don't call compaction")
|
|
return nil, nil
|
|
}
|
|
|
|
func (sqs *SQLiteStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) {
|
|
sqs.log.Warn("TODO: don't call compaction targets")
|
|
return nil, nil
|
|
}
|
|
|
|
func (sqs *SQLiteStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) {
|
|
lastShard, err := sqs.lastShardCache.get(ctx, user)
|
|
if err != nil {
|
|
return cid.Undef, err
|
|
}
|
|
if lastShard == nil {
|
|
return cid.Undef, nil
|
|
}
|
|
|
|
return lastShard.Root.CID, nil
|
|
}
|
|
|
|
func (sqs *SQLiteStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) {
|
|
lastShard, err := sqs.lastShardCache.get(ctx, user)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if lastShard == nil {
|
|
return "", nil
|
|
}
|
|
|
|
return lastShard.Rev, nil
|
|
}
|
|
|
|
func (sqs *SQLiteStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) {
|
|
ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice")
|
|
defer span.End()
|
|
|
|
carr, err := car.NewCarReader(bytes.NewReader(carslice))
|
|
if err != nil {
|
|
return cid.Undef, nil, err
|
|
}
|
|
|
|
if len(carr.Header.Roots) != 1 {
|
|
return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots))
|
|
}
|
|
|
|
ds, err := sqs.NewDeltaSession(ctx, uid, since)
|
|
if err != nil {
|
|
return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err)
|
|
}
|
|
|
|
for {
|
|
blk, err := carr.Next()
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
return cid.Undef, nil, err
|
|
}
|
|
|
|
if err := ds.Put(ctx, blk); err != nil {
|
|
return cid.Undef, nil, err
|
|
}
|
|
}
|
|
|
|
return carr.Header.Roots[0], ds, nil
|
|
}
|
|
|
|
// zeroShard is the all-zero CarShard that NewDeltaSession substitutes when a
// user has no previous shard.
var zeroShard = CarShard{}
|
|
|
|
func (sqs *SQLiteStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) {
|
|
ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession")
|
|
defer span.End()
|
|
|
|
lastShard, err := sqs.lastShardCache.get(ctx, user)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("NewDeltaSession, lsc, %w", err)
|
|
}
|
|
|
|
if lastShard == nil {
|
|
lastShard = &zeroShard
|
|
}
|
|
|
|
if since != nil && *since != lastShard.Rev {
|
|
return nil, fmt.Errorf("revision mismatch: %s != %s: %w", *since, lastShard.Rev, ErrRepoBaseMismatch)
|
|
}
|
|
|
|
return &DeltaSession{
|
|
blks: make(map[cid.Cid]blockformat.Block),
|
|
base: &sqliteUserView{
|
|
uid: user,
|
|
sqs: sqs,
|
|
},
|
|
user: user,
|
|
baseCid: lastShard.Root.CID,
|
|
cs: sqs,
|
|
seq: lastShard.Seq + 1,
|
|
lastRev: lastShard.Rev,
|
|
}, nil
|
|
}
|
|
|
|
func (sqs *SQLiteStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) {
|
|
return &DeltaSession{
|
|
base: &sqliteUserView{
|
|
uid: user,
|
|
sqs: sqs,
|
|
},
|
|
readonly: true,
|
|
user: user,
|
|
cs: sqs,
|
|
}, nil
|
|
}
|
|
|
|
// ReadUserCar writes a CAR file for the user's blocks since sinceRev.
|
|
func (sqs *SQLiteStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error {
|
|
ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar")
|
|
defer span.End()
|
|
|
|
tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
|
|
if err != nil {
|
|
return fmt.Errorf("rcar tx, %w", err)
|
|
}
|
|
defer tx.Rollback()
|
|
qstmt, err := tx.PrepareContext(ctx, "SELECT cid,rev,root,block FROM blocks WHERE uid = ? AND rev > ? ORDER BY rev DESC")
|
|
if err != nil {
|
|
return fmt.Errorf("rcar sql, %w", err)
|
|
}
|
|
defer qstmt.Close()
|
|
rows, err := qstmt.QueryContext(ctx, user, sinceRev)
|
|
if err != nil {
|
|
return fmt.Errorf("rcar err, %w", err)
|
|
}
|
|
nblocks := 0
|
|
first := true
|
|
for rows.Next() {
|
|
var xcid models.DbCID
|
|
var xrev string
|
|
var xroot models.DbCID
|
|
var xblock []byte
|
|
err = rows.Scan(&xcid, &xrev, &xroot, &xblock)
|
|
if err != nil {
|
|
return fmt.Errorf("rcar bad scan, %w", err)
|
|
}
|
|
if first {
|
|
if err := car.WriteHeader(&car.CarHeader{
|
|
Roots: []cid.Cid{xroot.CID},
|
|
Version: 1,
|
|
}, shardOut); err != nil {
|
|
return fmt.Errorf("rcar bad header, %w", err)
|
|
}
|
|
first = false
|
|
}
|
|
nblocks++
|
|
_, err := LdWrite(shardOut, xcid.CID.Bytes(), xblock)
|
|
if err != nil {
|
|
return fmt.Errorf("rcar bad write, %w", err)
|
|
}
|
|
}
|
|
sqs.log.Debug("read car", "nblocks", nblocks, "since", sinceRev)
|
|
return nil
|
|
}
|
|
|
|
// Stat is only used in a debugging admin handler
|
|
func (sqs *SQLiteStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) {
|
|
sqs.log.Warn("Stat debugging method not implemented for sqlite store")
|
|
return nil, nil
|
|
}
|
|
|
|
func (sqs *SQLiteStore) WipeUserData(ctx context.Context, user models.Uid) error {
|
|
ctx, span := otel.Tracer("carstore").Start(ctx, "WipeUserData")
|
|
defer span.End()
|
|
tx, err := sqs.db.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("wipe tx, %w", err)
|
|
}
|
|
defer tx.Rollback()
|
|
_, err = tx.ExecContext(ctx, "DELETE FROM blocks WHERE uid = ?", user)
|
|
if err == nil {
|
|
err = tx.Commit()
|
|
}
|
|
return err
|
|
}
|
|
|
|
// txReadOnly holds the options passed to BeginTx by the read paths. It is
// deliberately the zero value: go-libsql does not support ReadOnly
// transactions, so the default options are used instead.
var txReadOnly sql.TxOptions
|
|
|
|
// HasUIDCid needed for NewDeltaSession userView
|
|
func (sqs *SQLiteStore) HasUIDCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error) {
|
|
tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
|
|
if err != nil {
|
|
return false, fmt.Errorf("hasUC tx, %w", err)
|
|
}
|
|
defer tx.Rollback()
|
|
qstmt, err := tx.PrepareContext(ctx, "SELECT rev, root FROM blocks WHERE uid = ? AND cid = ? LIMIT 1")
|
|
if err != nil {
|
|
return false, fmt.Errorf("hasUC sql, %w", err)
|
|
}
|
|
defer qstmt.Close()
|
|
rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid})
|
|
if err != nil {
|
|
return false, fmt.Errorf("hasUC err, %w", err)
|
|
}
|
|
if rows.Next() {
|
|
var rev string
|
|
var rootb models.DbCID
|
|
err = rows.Scan(&rev, &rootb)
|
|
if err != nil {
|
|
return false, fmt.Errorf("hasUC bad scan, %w", err)
|
|
}
|
|
return true, nil
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
func (sqs *SQLiteStore) CarStore() CarStore {
|
|
return sqs
|
|
}
|
|
|
|
func (sqs *SQLiteStore) Close() error {
|
|
if sqs.ownsDB {
|
|
return sqs.db.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (sqs *SQLiteStore) getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error) {
|
|
tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("getb tx, %w", err)
|
|
}
|
|
defer tx.Rollback()
|
|
qstmt, err := tx.PrepareContext(ctx, "SELECT block FROM blocks WHERE uid = ? AND cid = ? LIMIT 1")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("getb sql, %w", err)
|
|
}
|
|
defer qstmt.Close()
|
|
rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("getb err, %w", err)
|
|
}
|
|
if rows.Next() {
|
|
var blockb []byte
|
|
err = rows.Scan(&blockb)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("getb bad scan, %w", err)
|
|
}
|
|
blk, err := blocks.NewBlockWithCid(blockb, bcid)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("getb bad block, %w", err)
|
|
}
|
|
return blk, nil
|
|
}
|
|
return nil, ErrNothingThere
|
|
}
|
|
|
|
func (sqs *SQLiteStore) getBlockSize(ctx context.Context, user models.Uid, bcid cid.Cid) (int64, error) {
|
|
tx, err := sqs.db.BeginTx(ctx, &txReadOnly)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("getbs tx, %w", err)
|
|
}
|
|
defer tx.Rollback()
|
|
qstmt, err := tx.PrepareContext(ctx, "SELECT length(block) FROM blocks WHERE uid = ? AND cid = ? LIMIT 1")
|
|
if err != nil {
|
|
return 0, fmt.Errorf("getbs sql, %w", err)
|
|
}
|
|
defer qstmt.Close()
|
|
rows, err := qstmt.QueryContext(ctx, user, models.DbCID{CID: bcid})
|
|
if err != nil {
|
|
return 0, fmt.Errorf("getbs err, %w", err)
|
|
}
|
|
if rows.Next() {
|
|
var out int64
|
|
err = rows.Scan(&out)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("getbs bad scan, %w", err)
|
|
}
|
|
return out, nil
|
|
}
|
|
return 0, nil
|
|
}
|
|
|
|
type sqliteUserViewInner interface {
|
|
HasUIDCid(ctx context.Context, user models.Uid, bcid cid.Cid) (bool, error)
|
|
getBlock(ctx context.Context, user models.Uid, bcid cid.Cid) (blockformat.Block, error)
|
|
getBlockSize(ctx context.Context, user models.Uid, bcid cid.Cid) (int64, error)
|
|
}
|
|
|
|
type sqliteUserView struct {
|
|
sqs sqliteUserViewInner
|
|
uid models.Uid
|
|
}
|
|
|
|
func (s sqliteUserView) Has(ctx context.Context, c cid.Cid) (bool, error) {
|
|
return s.sqs.HasUIDCid(ctx, s.uid, c)
|
|
}
|
|
|
|
func (s sqliteUserView) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error) {
|
|
return s.sqs.getBlock(ctx, s.uid, c)
|
|
}
|
|
|
|
func (s sqliteUserView) GetSize(ctx context.Context, c cid.Cid) (int, error) {
|
|
bigsize, err := s.sqs.getBlockSize(ctx, s.uid, c)
|
|
return int(bigsize), err
|
|
}
|
|
|
|
// ensure we implement the interface
|
|
var _ minBlockstore = (*sqliteUserView)(nil)
|