Files
at-container-registry/pkg/appview/db/batch.go
2026-04-19 18:04:57 -05:00

581 lines
20 KiB
Go

package db
import (
"encoding/json"
"fmt"
"strings"
"time"
)
// BatchSize is the maximum number of rows included in a single multi-row INSERT.
// Kept well under SQLite's default SQLITE_MAX_VARIABLE_NUMBER (32766) and any
// remote libsql parameter ceiling — at 11 columns (the widest insert in this
// file, manifests) this is 1100 placeholders per statement.
const BatchSize = 100
// buildPlaceholders returns the VALUES-clause placeholder list for a
// multi-row INSERT: `rows` comma-separated groups, each of the form
// "(?,?,...,?)" with `cols` question marks. Returns "" when either
// dimension is non-positive.
func buildPlaceholders(rows, cols int) string {
	if rows <= 0 || cols <= 0 {
		return ""
	}
	single := "(?" + strings.Repeat(",?", cols-1) + ")"
	groups := make([]string, rows)
	for idx := range groups {
		groups[idx] = single
	}
	return strings.Join(groups, ",")
}
// chunk returns the half-open range [start, end) covered by the i-th
// BatchSize-sized chunk of a slice of length n; the final chunk is clamped
// to n.
func chunk(n, i int) (start, end int) {
	start = i * BatchSize
	if end = start + BatchSize; end > n {
		end = n
	}
	return start, end
}
// BatchInsertManifests upserts a batch of manifests and returns a map of
// ManifestKey(did, repository, digest) → manifest id for the affected rows
// (both new and existing). Rows are keyed by (did, repository, digest);
// callers that need the id must group their input so that digest is unique
// per (did, repository) in one batch call.
//
// Implementation: one multi-row INSERT per sub-batch, followed by one SELECT
// to fetch ids back (libsql's RETURNING support across replica modes is
// uneven; a second SELECT is reliable and still a single round-trip per
// sub-batch).
func BatchInsertManifests(db DBTX, manifests []Manifest) (map[string]int64, error) {
	out := make(map[string]int64, len(manifests))
	if len(manifests) == 0 {
		return out, nil
	}
	for i := 0; i*BatchSize < len(manifests); i++ {
		start, end := chunk(len(manifests), i)
		batch := manifests[start:end]
		const cols = 11
		args := make([]any, 0, len(batch)*cols)
		for _, m := range batch {
			args = append(args,
				m.DID, m.Repository, m.Digest, m.HoldEndpoint,
				m.SchemaVersion, m.MediaType, m.ConfigDigest,
				m.ConfigSize, m.ArtifactType,
				nullString(m.SubjectDigest),
				m.CreatedAt,
			)
		}
		// The WHERE clause on the upsert suppresses no-op writes; IS NOT is
		// used for the nullable columns so NULL↔value transitions still fire.
		query := `
	INSERT INTO manifests
	(did, repository, digest, hold_endpoint, schema_version, media_type,
	 config_digest, config_size, artifact_type, subject_digest, created_at)
	VALUES ` + buildPlaceholders(len(batch), cols) + `
	ON CONFLICT(did, repository, digest) DO UPDATE SET
		hold_endpoint = excluded.hold_endpoint,
		schema_version = excluded.schema_version,
		media_type = excluded.media_type,
		config_digest = excluded.config_digest,
		config_size = excluded.config_size,
		artifact_type = excluded.artifact_type,
		subject_digest = excluded.subject_digest
	WHERE excluded.hold_endpoint != manifests.hold_endpoint
	   OR excluded.schema_version != manifests.schema_version
	   OR excluded.media_type != manifests.media_type
	   OR excluded.config_digest IS NOT manifests.config_digest
	   OR excluded.config_size IS NOT manifests.config_size
	   OR excluded.artifact_type != manifests.artifact_type
	   OR excluded.subject_digest IS NOT manifests.subject_digest
	`
		if _, err := db.Exec(query, args...); err != nil {
			return nil, fmt.Errorf("batch insert manifests: %w", err)
		}
		// Fetch ids for this sub-batch by (did, digest) — digests are unique
		// enough that matching on (did, digest) avoids needing a three-column
		// IN list. repository is included in the result row to disambiguate if
		// a user genuinely has the same digest across repos.
		//
		// Group by did (caller usually supplies one did per call, but be safe);
		// one IN (?) query per did, usually exactly one iteration.
		didSet := make(map[string]struct{})
		for _, m := range batch {
			didSet[m.DID] = struct{}{}
		}
		for did := range didSet {
			digests := make([]string, 0, len(batch))
			for _, m := range batch {
				if m.DID == did {
					digests = append(digests, m.Digest)
				}
			}
			// One arg for the did, one per digest. (Previously sized 1+2*len(batch)
			// and reused across iterations via append(selectArgs[:0], ...); a fresh
			// correctly-sized slice per did is clearer and just as cheap.)
			selectArgs := make([]any, 0, 1+len(digests))
			selectArgs = append(selectArgs, did)
			for _, d := range digests {
				selectArgs = append(selectArgs, d)
			}
			selectQuery := `
	SELECT repository, digest, id FROM manifests
	WHERE did = ? AND digest IN (` +
				strings.TrimSuffix(strings.Repeat("?,", len(digests)), ",") + `)
	`
			rows, err := db.Query(selectQuery, selectArgs...)
			if err != nil {
				return nil, fmt.Errorf("batch select manifest ids: %w", err)
			}
			for rows.Next() {
				var repo, digest string
				var id int64
				if err := rows.Scan(&repo, &digest, &id); err != nil {
					rows.Close()
					return nil, fmt.Errorf("scan manifest id: %w", err)
				}
				// Key format matches what callers use: "did|repo|digest".
				out[manifestKey(did, repo, digest)] = id
			}
			// BUG FIX: rows.Err() was previously unchecked, so an iteration
			// error (e.g. a connection dropped mid-stream) terminated the loop
			// silently and returned an incomplete id map as if it were success.
			err = rows.Err()
			rows.Close()
			if err != nil {
				return nil, fmt.Errorf("iterate manifest ids: %w", err)
			}
		}
	}
	return out, nil
}
// ManifestKey builds the lookup key used by BatchInsertManifests' result map.
// Callers construct the same key from their in-memory Manifest structs to
// find the assigned id.
func ManifestKey(did, repository, digest string) string {
	return manifestKey(did, repository, digest)
}

// manifestKey joins the three identifying fields with "|" separators.
func manifestKey(did, repository, digest string) string {
	return strings.Join([]string{did, repository, digest}, "|")
}
// BatchInsertLayers inserts a batch of layers, skipping any that already
// exist. Layers are immutable, so ON CONFLICT DO NOTHING matches the
// single-row InsertLayer semantics.
func BatchInsertLayers(db DBTX, layers []Layer) error {
	if len(layers) == 0 {
		return nil
	}
	const cols = 6
	for i := 0; i*BatchSize < len(layers); i++ {
		lo, hi := chunk(len(layers), i)
		sub := layers[lo:hi]
		args := make([]any, 0, cols*len(sub))
		for _, layer := range sub {
			// Annotations are stored as a JSON object, or NULL when absent.
			var ann any
			if len(layer.Annotations) > 0 {
				encoded, err := json.Marshal(layer.Annotations)
				if err != nil {
					return fmt.Errorf("marshal layer annotations: %w", err)
				}
				text := string(encoded)
				ann = &text
			}
			args = append(args, layer.ManifestID, layer.Digest, layer.Size, layer.MediaType, layer.LayerIndex, ann)
		}
		query := `
	INSERT INTO layers (manifest_id, digest, size, media_type, layer_index, annotations)
	VALUES ` + buildPlaceholders(len(sub), cols) + `
	ON CONFLICT(manifest_id, layer_index) DO NOTHING
	`
		if _, err := db.Exec(query, args...); err != nil {
			return fmt.Errorf("batch insert layers: %w", err)
		}
	}
	return nil
}
// BatchInsertManifestReferences inserts a batch of manifest references.
// The table's PRIMARY KEY(manifest_id, reference_index) makes re-inserts
// a no-op via DO NOTHING.
func BatchInsertManifestReferences(db DBTX, refs []ManifestReference) error {
	if len(refs) == 0 {
		return nil
	}
	const cols = 10
	for i := 0; i*BatchSize < len(refs); i++ {
		lo, hi := chunk(len(refs), i)
		sub := refs[lo:hi]
		args := make([]any, 0, cols*len(sub))
		for _, ref := range sub {
			args = append(args,
				ref.ManifestID, ref.Digest, ref.Size, ref.MediaType,
				ref.PlatformArchitecture, ref.PlatformOS,
				ref.PlatformVariant, ref.PlatformOSVersion,
				ref.IsAttestation, ref.ReferenceIndex,
			)
		}
		query := `
	INSERT INTO manifest_references (manifest_id, digest, size, media_type,
		platform_architecture, platform_os,
		platform_variant, platform_os_version,
		is_attestation, reference_index)
	VALUES ` + buildPlaceholders(len(sub), cols) + `
	ON CONFLICT(manifest_id, reference_index) DO NOTHING
	`
		if _, err := db.Exec(query, args...); err != nil {
			return fmt.Errorf("batch insert manifest references: %w", err)
		}
	}
	return nil
}
// BatchUpsertTags upserts a batch of tag records, matching UpsertTag
// semantics: an existing (did, repository, tag) row is rewritten only when
// its digest or created_at actually changed, so no-op syncs stay write-free.
func BatchUpsertTags(db DBTX, tags []Tag) error {
	if len(tags) == 0 {
		return nil
	}
	const cols = 5
	for i := 0; i*BatchSize < len(tags); i++ {
		lo, hi := chunk(len(tags), i)
		sub := tags[lo:hi]
		args := make([]any, 0, cols*len(sub))
		for _, tag := range sub {
			args = append(args, tag.DID, tag.Repository, tag.Tag, tag.Digest, tag.CreatedAt)
		}
		query := `
	INSERT INTO tags (did, repository, tag, digest, created_at)
	VALUES ` + buildPlaceholders(len(sub), cols) + `
	ON CONFLICT(did, repository, tag) DO UPDATE SET
		digest = excluded.digest,
		created_at = excluded.created_at
	WHERE excluded.digest != tags.digest
	   OR excluded.created_at != tags.created_at
	`
		if _, err := db.Exec(query, args...); err != nil {
			return fmt.Errorf("batch upsert tags: %w", err)
		}
	}
	return nil
}
// StarInput is a struct projection of the UpsertStar argument list for use with BatchUpsertStars.
type StarInput struct {
	StarrerDID string    // DID of the account placing the star
	OwnerDID   string    // DID of the starred repository's owner
	Repository string    // repository name (unique per starrer+owner — see ON CONFLICT in BatchUpsertStars)
	CreatedAt  time.Time // when the star record was created
}
// BatchUpsertStars upserts a batch of stars. Stars are immutable, so a
// conflicting (starrer_did, owner_did, repository) row is left untouched.
func BatchUpsertStars(db DBTX, stars []StarInput) error {
	if len(stars) == 0 {
		return nil
	}
	const cols = 4
	for i := 0; i*BatchSize < len(stars); i++ {
		lo, hi := chunk(len(stars), i)
		sub := stars[lo:hi]
		args := make([]any, 0, cols*len(sub))
		for _, star := range sub {
			args = append(args, star.StarrerDID, star.OwnerDID, star.Repository, star.CreatedAt)
		}
		query := `
	INSERT INTO stars (starrer_did, owner_did, repository, created_at)
	VALUES ` + buildPlaceholders(len(sub), cols) + `
	ON CONFLICT(starrer_did, owner_did, repository) DO NOTHING
	`
		if _, err := db.Exec(query, args...); err != nil {
			return fmt.Errorf("batch upsert stars: %w", err)
		}
	}
	return nil
}
// BatchUpsertRepoPages upserts a batch of repo page records. An existing
// (did, repository) row is rewritten only when description, avatar_cid, or
// user_edited actually differ (IS NOT handles the nullable columns), keeping
// no-op syncs write-free.
func BatchUpsertRepoPages(db DBTX, pages []RepoPage) error {
	if len(pages) == 0 {
		return nil
	}
	const cols = 7
	for i := 0; i*BatchSize < len(pages); i++ {
		lo, hi := chunk(len(pages), i)
		sub := pages[lo:hi]
		args := make([]any, 0, cols*len(sub))
		for _, page := range sub {
			args = append(args,
				page.DID, page.Repository, page.Description, page.AvatarCID,
				page.UserEdited, page.CreatedAt, page.UpdatedAt,
			)
		}
		query := `
	INSERT INTO repo_pages (did, repository, description, avatar_cid, user_edited, created_at, updated_at)
	VALUES ` + buildPlaceholders(len(sub), cols) + `
	ON CONFLICT(did, repository) DO UPDATE SET
		description = excluded.description,
		avatar_cid = excluded.avatar_cid,
		user_edited = excluded.user_edited,
		updated_at = excluded.updated_at
	WHERE excluded.description IS NOT repo_pages.description
	   OR excluded.avatar_cid IS NOT repo_pages.avatar_cid
	   OR excluded.user_edited IS NOT repo_pages.user_edited
	`
		if _, err := db.Exec(query, args...); err != nil {
			return fmt.Errorf("batch upsert repo pages: %w", err)
		}
	}
	return nil
}
// BatchUpsertDailyStats upserts a batch of daily stats rows; an existing
// (did, repository, date) row is only rewritten when a counter changed.
func BatchUpsertDailyStats(db DBTX, stats []DailyStats) error {
	if len(stats) == 0 {
		return nil
	}
	const cols = 5
	for i := 0; i*BatchSize < len(stats); i++ {
		lo, hi := chunk(len(stats), i)
		sub := stats[lo:hi]
		args := make([]any, 0, cols*len(sub))
		for _, day := range sub {
			args = append(args, day.DID, day.Repository, day.Date, day.PullCount, day.PushCount)
		}
		query := `
	INSERT INTO repository_stats_daily (did, repository, date, pull_count, push_count)
	VALUES ` + buildPlaceholders(len(sub), cols) + `
	ON CONFLICT(did, repository, date) DO UPDATE SET
		pull_count = excluded.pull_count,
		push_count = excluded.push_count
	WHERE excluded.pull_count != repository_stats_daily.pull_count
	   OR excluded.push_count != repository_stats_daily.push_count
	`
		if _, err := db.Exec(query, args...); err != nil {
			return fmt.Errorf("batch upsert daily stats: %w", err)
		}
	}
	return nil
}
// BatchUpsertRepositoryStats upserts aggregated repository stats. IS NOT is
// used when comparing last_pull/last_push so NULL↔value transitions still
// trigger the update; plain != suffices for the counters.
func BatchUpsertRepositoryStats(db DBTX, stats []RepositoryStats) error {
	if len(stats) == 0 {
		return nil
	}
	const cols = 6
	for i := 0; i*BatchSize < len(stats); i++ {
		lo, hi := chunk(len(stats), i)
		sub := stats[lo:hi]
		args := make([]any, 0, cols*len(sub))
		for _, st := range sub {
			args = append(args,
				st.DID, st.Repository, st.PullCount, st.LastPull, st.PushCount, st.LastPush,
			)
		}
		query := `
	INSERT INTO repository_stats (did, repository, pull_count, last_pull, push_count, last_push)
	VALUES ` + buildPlaceholders(len(sub), cols) + `
	ON CONFLICT(did, repository) DO UPDATE SET
		pull_count = excluded.pull_count,
		last_pull = excluded.last_pull,
		push_count = excluded.push_count,
		last_push = excluded.last_push
	WHERE excluded.pull_count != repository_stats.pull_count
	   OR excluded.last_pull IS NOT repository_stats.last_pull
	   OR excluded.push_count != repository_stats.push_count
	   OR excluded.last_push IS NOT repository_stats.last_push
	`
		if _, err := db.Exec(query, args...); err != nil {
			return fmt.Errorf("batch upsert repository stats: %w", err)
		}
	}
	return nil
}
// BatchUpsertCaptainRecords upserts a batch of captain records keyed by
// hold_did. The row is rewritten only when some field other than updated_at
// changed (IS NOT covers the nullable columns).
func BatchUpsertCaptainRecords(db DBTX, records []HoldCaptainRecord) error {
	if len(records) == 0 {
		return nil
	}
	const cols = 8
	for i := 0; i*BatchSize < len(records); i++ {
		lo, hi := chunk(len(records), i)
		sub := records[lo:hi]
		args := make([]any, 0, cols*len(sub))
		for _, rec := range sub {
			args = append(args,
				rec.HoldDID, rec.OwnerDID, rec.Public, rec.AllowAllCrew,
				nullString(rec.DeployedAt),
				nullString(rec.Region),
				nullString(rec.Successor),
				rec.UpdatedAt,
			)
		}
		query := `
	INSERT INTO hold_captain_records (
		hold_did, owner_did, public, allow_all_crew,
		deployed_at, region, successor, updated_at
	) VALUES ` + buildPlaceholders(len(sub), cols) + `
	ON CONFLICT(hold_did) DO UPDATE SET
		owner_did = excluded.owner_did,
		public = excluded.public,
		allow_all_crew = excluded.allow_all_crew,
		deployed_at = excluded.deployed_at,
		region = excluded.region,
		successor = excluded.successor,
		updated_at = excluded.updated_at
	WHERE excluded.owner_did != hold_captain_records.owner_did
	   OR excluded.public != hold_captain_records.public
	   OR excluded.allow_all_crew != hold_captain_records.allow_all_crew
	   OR excluded.deployed_at IS NOT hold_captain_records.deployed_at
	   OR excluded.region IS NOT hold_captain_records.region
	   OR excluded.successor IS NOT hold_captain_records.successor
	`
		if _, err := db.Exec(query, args...); err != nil {
			return fmt.Errorf("batch upsert captain records: %w", err)
		}
	}
	return nil
}
// BatchUpsertCrewMembers upserts a batch of crew members. updated_at is set
// server-side via the CURRENT_TIMESTAMP literal, so each VALUES group carries
// seven placeholders plus the literal; buildPlaceholders can't express a
// mixed group, so the clause is assembled here.
func BatchUpsertCrewMembers(db DBTX, members []CrewMember) error {
	if len(members) == 0 {
		return nil
	}
	const cols = 7 // updated_at is the CURRENT_TIMESTAMP literal, not a placeholder
	for i := 0; i*BatchSize < len(members); i++ {
		lo, hi := chunk(len(members), i)
		sub := members[lo:hi]
		args := make([]any, 0, cols*len(sub))
		for _, member := range sub {
			args = append(args,
				member.HoldDID, member.MemberDID, member.Rkey,
				nullString(member.Role),
				nullString(member.Permissions),
				nullString(member.Tier),
				nullString(member.AddedAt),
			)
		}
		// One group per row: (?,?,?,?,?,?,?,CURRENT_TIMESTAMP). sub is never
		// empty here (the loop condition guarantees at least one row), so the
		// len(sub)-1 repeat count is safe.
		group := "(" + strings.Repeat("?,", cols) + "CURRENT_TIMESTAMP)"
		values := group + strings.Repeat(","+group, len(sub)-1)
		query := `
	INSERT INTO hold_crew_members (
		hold_did, member_did, rkey, role, permissions, tier, added_at, updated_at
	) VALUES ` + values + `
	ON CONFLICT(hold_did, member_did) DO UPDATE SET
		rkey = excluded.rkey,
		role = excluded.role,
		permissions = excluded.permissions,
		tier = excluded.tier,
		added_at = excluded.added_at,
		updated_at = CURRENT_TIMESTAMP
	WHERE excluded.rkey != hold_crew_members.rkey
	   OR excluded.role IS NOT hold_crew_members.role
	   OR excluded.permissions IS NOT hold_crew_members.permissions
	   OR excluded.tier IS NOT hold_crew_members.tier
	   OR excluded.added_at IS NOT hold_crew_members.added_at
	`
		if _, err := db.Exec(query, args...); err != nil {
			return fmt.Errorf("batch upsert crew members: %w", err)
		}
	}
	return nil
}
// AnnotationRow represents a single key/value annotation for a repository,
// used by BatchUpsertRepositoryAnnotations.
type AnnotationRow struct {
	DID        string // owner DID of the repository
	Repository string // repository name
	Key        string // annotation key (unique per did+repository — see ON CONFLICT below)
	Value      string // annotation value
}
// BatchUpsertRepositoryAnnotations upserts annotation rows and deletes any
// stale keys for each (did, repository) represented in the input. The caller
// is responsible for pre-filtering: rows should represent only repositories
// whose newest manifest has at least one non-empty annotation, matching the
// single-row UpsertRepositoryAnnotations semantics.
func BatchUpsertRepositoryAnnotations(db DBTX, rows []AnnotationRow) error {
	if len(rows) == 0 {
		return nil
	}
	// Bucket the incoming keys by (did, repository) so stale keys can be
	// pruned one repo at a time.
	type repoKey struct{ did, repo string }
	keysByRepo := make(map[repoKey][]string)
	for _, row := range rows {
		rk := repoKey{row.DID, row.Repository}
		keysByRepo[rk] = append(keysByRepo[rk], row.Key)
	}
	// One DELETE per repository. Batching these further (OR chains) isn't
	// worth it: the DELETE is cheap and each repo carries few keys.
	for rk, keys := range keysByRepo {
		// keys always has at least one entry per bucket, so len(keys)-1 >= 0.
		in := strings.Repeat("?,", len(keys)-1) + "?"
		delArgs := make([]any, 0, 2+len(keys))
		delArgs = append(delArgs, rk.did, rk.repo)
		for _, key := range keys {
			delArgs = append(delArgs, key)
		}
		if _, err := db.Exec(`
	DELETE FROM repository_annotations
	WHERE did = ? AND repository = ? AND key NOT IN (`+in+`)
	`, delArgs...); err != nil {
			return fmt.Errorf("batch delete stale annotations: %w", err)
		}
	}
	// Upsert every annotation row in sub-batches, all stamped with one
	// shared timestamp.
	now := time.Now()
	const cols = 5
	for i := 0; i*BatchSize < len(rows); i++ {
		lo, hi := chunk(len(rows), i)
		sub := rows[lo:hi]
		args := make([]any, 0, cols*len(sub))
		for _, row := range sub {
			args = append(args, row.DID, row.Repository, row.Key, row.Value, now)
		}
		query := `
	INSERT INTO repository_annotations (did, repository, key, value, updated_at)
	VALUES ` + buildPlaceholders(len(sub), cols) + `
	ON CONFLICT(did, repository, key) DO UPDATE SET
		value = excluded.value,
		updated_at = excluded.updated_at
	WHERE excluded.value != repository_annotations.value
	`
		if _, err := db.Exec(query, args...); err != nil {
			return fmt.Errorf("batch upsert annotations: %w", err)
		}
	}
	return nil
}