mirror of
https://tangled.org/evan.jarrett.net/at-container-registry
synced 2026-04-19 16:15:01 +00:00
1770 lines
53 KiB
Go
1770 lines
53 KiB
Go
package db
|
|
|
|
import (
|
|
"database/sql"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// BlobCDNURL builds the imgs.blue CDN URL for an ATProto blob from the DID of
// the owning account and the CID of the blob.
//
// The helper is duplicated here on purpose: importing it from atproto would
// introduce a circular dependency.
func BlobCDNURL(did, cid string) string {
	const cdnFormat = "https://imgs.blue/%s/%s"
	return fmt.Sprintf(cdnFormat, did, cid)
}
|
|
|
|
// escapeLikePattern prepares free-form input for use inside a SQL LIKE pattern
// that is evaluated with ESCAPE '\'.
//
// Processing happens in three steps:
//  1. ASCII control characters below 0x20 (NUL included) are dropped, except
//     tab, newline and carriage return.
//  2. Backslash, % and _ are prefixed with a backslash so they match literally.
//  3. Leading and trailing whitespace is trimmed.
func escapeLikePattern(s string) string {
	cleaned := strings.Map(func(r rune) rune {
		switch {
		case r == '\t', r == '\n', r == '\r':
			return r
		case r < 32:
			return -1 // drop remaining control characters, NUL included
		default:
			return r
		}
	}, s)

	// A Replacer rewrites the input in one pass and never re-scans its own
	// output, so the backslashes it inserts are not escaped a second time.
	escaped := strings.NewReplacer(
		`\`, `\\`,
		`%`, `\%`,
		`_`, `\_`,
	).Replace(cleaned)

	return strings.TrimSpace(escaped)
}
|
|
|
|
// GetRecentPushes fetches recent pushes with pagination
|
|
func GetRecentPushes(db *sql.DB, limit, offset int, userFilter string, currentUserDID string) ([]Push, int, error) {
|
|
query := `
|
|
SELECT
|
|
u.did,
|
|
u.handle,
|
|
t.repository,
|
|
t.tag,
|
|
t.digest,
|
|
COALESCE((SELECT value FROM repository_annotations WHERE did = u.did AND repository = t.repository AND key = 'org.opencontainers.image.title'), ''),
|
|
COALESCE((SELECT value FROM repository_annotations WHERE did = u.did AND repository = t.repository AND key = 'org.opencontainers.image.description'), ''),
|
|
COALESCE((SELECT value FROM repository_annotations WHERE did = u.did AND repository = t.repository AND key = 'io.atcr.icon'), ''),
|
|
COALESCE(rs.pull_count, 0),
|
|
COALESCE((SELECT COUNT(*) FROM stars WHERE owner_did = u.did AND repository = t.repository), 0),
|
|
COALESCE((SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = u.did AND repository = t.repository), 0),
|
|
t.created_at,
|
|
m.hold_endpoint,
|
|
COALESCE(rp.avatar_cid, '')
|
|
FROM tags t
|
|
JOIN users u ON t.did = u.did
|
|
JOIN manifests m ON t.did = m.did AND t.repository = m.repository AND t.digest = m.digest
|
|
LEFT JOIN repository_stats rs ON t.did = rs.did AND t.repository = rs.repository
|
|
LEFT JOIN repo_pages rp ON t.did = rp.did AND t.repository = rp.repository
|
|
`
|
|
|
|
args := []any{currentUserDID}
|
|
|
|
if userFilter != "" {
|
|
query += " WHERE u.handle = ? OR u.did = ?"
|
|
args = append(args, userFilter, userFilter)
|
|
}
|
|
|
|
query += " ORDER BY t.created_at DESC LIMIT ? OFFSET ?"
|
|
args = append(args, limit, offset)
|
|
|
|
rows, err := db.Query(query, args...)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var pushes []Push
|
|
for rows.Next() {
|
|
var p Push
|
|
var isStarredInt int
|
|
var avatarCID string
|
|
if err := rows.Scan(&p.DID, &p.Handle, &p.Repository, &p.Tag, &p.Digest, &p.Title, &p.Description, &p.IconURL, &p.PullCount, &p.StarCount, &isStarredInt, &p.CreatedAt, &p.HoldEndpoint, &avatarCID); err != nil {
|
|
return nil, 0, err
|
|
}
|
|
p.IsStarred = isStarredInt > 0
|
|
// Prefer repo page avatar over annotation icon
|
|
if avatarCID != "" {
|
|
p.IconURL = BlobCDNURL(p.DID, avatarCID)
|
|
}
|
|
pushes = append(pushes, p)
|
|
}
|
|
|
|
// Get total count
|
|
countQuery := "SELECT COUNT(*) FROM tags t JOIN users u ON t.did = u.did"
|
|
countArgs := []any{}
|
|
|
|
if userFilter != "" {
|
|
countQuery += " WHERE u.handle = ? OR u.did = ?"
|
|
countArgs = append(countArgs, userFilter, userFilter)
|
|
}
|
|
|
|
var total int
|
|
if err := db.QueryRow(countQuery, countArgs...).Scan(&total); err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
return pushes, total, nil
|
|
}
|
|
|
|
// SearchPushes searches for pushes matching the query across handles, DIDs, repositories, and annotations
|
|
func SearchPushes(db *sql.DB, query string, limit, offset int, currentUserDID string) ([]Push, int, error) {
|
|
// Escape LIKE wildcards so they're treated literally
|
|
query = escapeLikePattern(query)
|
|
|
|
// Prepare search pattern for LIKE queries (case-insensitive)
|
|
searchPattern := "%" + query + "%"
|
|
|
|
sqlQuery := `
|
|
SELECT DISTINCT
|
|
u.did,
|
|
u.handle,
|
|
t.repository,
|
|
t.tag,
|
|
t.digest,
|
|
COALESCE((SELECT value FROM repository_annotations WHERE did = u.did AND repository = t.repository AND key = 'org.opencontainers.image.title'), ''),
|
|
COALESCE((SELECT value FROM repository_annotations WHERE did = u.did AND repository = t.repository AND key = 'org.opencontainers.image.description'), ''),
|
|
COALESCE((SELECT value FROM repository_annotations WHERE did = u.did AND repository = t.repository AND key = 'io.atcr.icon'), ''),
|
|
COALESCE(rs.pull_count, 0),
|
|
COALESCE((SELECT COUNT(*) FROM stars WHERE owner_did = u.did AND repository = t.repository), 0),
|
|
COALESCE((SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = u.did AND repository = t.repository), 0),
|
|
t.created_at,
|
|
m.hold_endpoint,
|
|
COALESCE(rp.avatar_cid, '')
|
|
FROM tags t
|
|
JOIN users u ON t.did = u.did
|
|
JOIN manifests m ON t.did = m.did AND t.repository = m.repository AND t.digest = m.digest
|
|
LEFT JOIN repository_stats rs ON t.did = rs.did AND t.repository = rs.repository
|
|
LEFT JOIN repo_pages rp ON t.did = rp.did AND t.repository = rp.repository
|
|
WHERE u.handle LIKE ? ESCAPE '\'
|
|
OR u.did = ?
|
|
OR t.repository LIKE ? ESCAPE '\'
|
|
OR EXISTS (
|
|
SELECT 1 FROM repository_annotations ra
|
|
WHERE ra.did = u.did AND ra.repository = t.repository
|
|
AND ra.value LIKE ? ESCAPE '\'
|
|
)
|
|
ORDER BY t.created_at DESC
|
|
LIMIT ? OFFSET ?
|
|
`
|
|
|
|
rows, err := db.Query(sqlQuery, currentUserDID, searchPattern, query, searchPattern, searchPattern, limit, offset)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var pushes []Push
|
|
for rows.Next() {
|
|
var p Push
|
|
var isStarredInt int
|
|
var avatarCID string
|
|
if err := rows.Scan(&p.DID, &p.Handle, &p.Repository, &p.Tag, &p.Digest, &p.Title, &p.Description, &p.IconURL, &p.PullCount, &p.StarCount, &isStarredInt, &p.CreatedAt, &p.HoldEndpoint, &avatarCID); err != nil {
|
|
return nil, 0, err
|
|
}
|
|
p.IsStarred = isStarredInt > 0
|
|
// Prefer repo page avatar over annotation icon
|
|
if avatarCID != "" {
|
|
p.IconURL = BlobCDNURL(p.DID, avatarCID)
|
|
}
|
|
pushes = append(pushes, p)
|
|
}
|
|
|
|
// Get total count
|
|
countQuery := `
|
|
SELECT COUNT(DISTINCT t.id)
|
|
FROM tags t
|
|
JOIN users u ON t.did = u.did
|
|
JOIN manifests m ON t.did = m.did AND t.repository = m.repository AND t.digest = m.digest
|
|
WHERE u.handle LIKE ? ESCAPE '\'
|
|
OR u.did = ?
|
|
OR t.repository LIKE ? ESCAPE '\'
|
|
OR EXISTS (
|
|
SELECT 1 FROM repository_annotations ra
|
|
WHERE ra.did = u.did AND ra.repository = t.repository
|
|
AND ra.value LIKE ? ESCAPE '\'
|
|
)
|
|
`
|
|
|
|
var total int
|
|
if err := db.QueryRow(countQuery, searchPattern, query, searchPattern, searchPattern).Scan(&total); err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
return pushes, total, nil
|
|
}
|
|
|
|
// GetUserRepositories fetches all repositories for a user
|
|
func GetUserRepositories(db *sql.DB, did string) ([]Repository, error) {
|
|
// Get repository summary
|
|
rows, err := db.Query(`
|
|
SELECT
|
|
repository,
|
|
COUNT(DISTINCT tag) as tag_count,
|
|
COUNT(DISTINCT digest) as manifest_count,
|
|
MAX(created_at) as last_push
|
|
FROM (
|
|
SELECT repository, tag, digest, created_at FROM tags WHERE did = ?
|
|
UNION
|
|
SELECT repository, NULL, digest, created_at FROM manifests WHERE did = ?
|
|
)
|
|
GROUP BY repository
|
|
ORDER BY last_push DESC
|
|
`, did, did)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var repos []Repository
|
|
for rows.Next() {
|
|
var r Repository
|
|
var lastPushStr string
|
|
if err := rows.Scan(&r.Name, &r.TagCount, &r.ManifestCount, &lastPushStr); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Parse the timestamp string into time.Time
|
|
if lastPushStr != "" {
|
|
// Try multiple timestamp formats
|
|
formats := []string{
|
|
time.RFC3339Nano, // 2006-01-02T15:04:05.999999999Z07:00
|
|
"2006-01-02 15:04:05.999999999-07:00", // SQLite with microseconds and timezone
|
|
"2006-01-02 15:04:05.999999999", // SQLite with microseconds
|
|
time.RFC3339, // 2006-01-02T15:04:05Z07:00
|
|
"2006-01-02 15:04:05", // SQLite default
|
|
}
|
|
|
|
for _, format := range formats {
|
|
if t, err := time.Parse(format, lastPushStr); err == nil {
|
|
r.LastPush = t
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Get tags for this repo
|
|
tagRows, err := db.Query(`
|
|
SELECT id, tag, digest, created_at
|
|
FROM tags
|
|
WHERE did = ? AND repository = ?
|
|
ORDER BY created_at DESC
|
|
`, did, r.Name)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for tagRows.Next() {
|
|
var t Tag
|
|
t.DID = did
|
|
t.Repository = r.Name
|
|
if err := tagRows.Scan(&t.ID, &t.Tag, &t.Digest, &t.CreatedAt); err != nil {
|
|
tagRows.Close()
|
|
return nil, err
|
|
}
|
|
r.Tags = append(r.Tags, t)
|
|
}
|
|
tagRows.Close()
|
|
|
|
// Get manifests for this repo
|
|
manifestRows, err := db.Query(`
|
|
SELECT id, digest, hold_endpoint, schema_version, media_type,
|
|
config_digest, config_size, created_at
|
|
FROM manifests
|
|
WHERE did = ? AND repository = ?
|
|
ORDER BY created_at DESC
|
|
`, did, r.Name)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for manifestRows.Next() {
|
|
var m Manifest
|
|
m.DID = did
|
|
m.Repository = r.Name
|
|
|
|
if err := manifestRows.Scan(&m.ID, &m.Digest, &m.HoldEndpoint, &m.SchemaVersion,
|
|
&m.MediaType, &m.ConfigDigest, &m.ConfigSize, &m.CreatedAt); err != nil {
|
|
manifestRows.Close()
|
|
return nil, err
|
|
}
|
|
|
|
r.Manifests = append(r.Manifests, m)
|
|
}
|
|
manifestRows.Close()
|
|
|
|
// Fetch repository-level annotations from annotations table
|
|
annotations, err := GetRepositoryAnnotations(db, did, r.Name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
r.Title = annotations["org.opencontainers.image.title"]
|
|
r.Description = annotations["org.opencontainers.image.description"]
|
|
r.SourceURL = annotations["org.opencontainers.image.source"]
|
|
r.DocumentationURL = annotations["org.opencontainers.image.documentation"]
|
|
r.Licenses = annotations["org.opencontainers.image.licenses"]
|
|
r.IconURL = annotations["io.atcr.icon"]
|
|
r.ReadmeURL = annotations["io.atcr.readme"]
|
|
|
|
// Check for repo page avatar (overrides annotation icon)
|
|
repoPage, err := GetRepoPage(db, did, r.Name)
|
|
if err == nil && repoPage != nil && repoPage.AvatarCID != "" {
|
|
r.IconURL = BlobCDNURL(did, repoPage.AvatarCID)
|
|
}
|
|
|
|
repos = append(repos, r)
|
|
}
|
|
|
|
return repos, nil
|
|
}
|
|
|
|
// GetRepositoryMetadata retrieves metadata for a repository from annotations table
|
|
// Returns a map of annotation key -> value for easy access in templates and handlers
|
|
func GetRepositoryMetadata(db *sql.DB, did string, repository string) (map[string]string, error) {
|
|
return GetRepositoryAnnotations(db, did, repository)
|
|
}
|
|
|
|
// GetUserByDID retrieves a user by DID
|
|
func GetUserByDID(db *sql.DB, did string) (*User, error) {
|
|
var user User
|
|
var avatar sql.NullString
|
|
err := db.QueryRow(`
|
|
SELECT did, handle, pds_endpoint, avatar, last_seen
|
|
FROM users
|
|
WHERE did = ?
|
|
`, did).Scan(&user.DID, &user.Handle, &user.PDSEndpoint, &avatar, &user.LastSeen)
|
|
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Handle NULL avatar
|
|
if avatar.Valid {
|
|
user.Avatar = avatar.String
|
|
}
|
|
|
|
return &user, nil
|
|
}
|
|
|
|
// GetUserByHandle retrieves a user by handle
|
|
func GetUserByHandle(db *sql.DB, handle string) (*User, error) {
|
|
var user User
|
|
var avatar sql.NullString
|
|
err := db.QueryRow(`
|
|
SELECT did, handle, pds_endpoint, avatar, last_seen
|
|
FROM users
|
|
WHERE handle = ?
|
|
`, handle).Scan(&user.DID, &user.Handle, &user.PDSEndpoint, &avatar, &user.LastSeen)
|
|
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Handle NULL avatar
|
|
if avatar.Valid {
|
|
user.Avatar = avatar.String
|
|
}
|
|
|
|
return &user, nil
|
|
}
|
|
|
|
// UpsertUser inserts or updates a user record
|
|
func UpsertUser(db *sql.DB, user *User) error {
|
|
_, err := db.Exec(`
|
|
INSERT INTO users (did, handle, pds_endpoint, avatar, last_seen)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
ON CONFLICT(did) DO UPDATE SET
|
|
handle = excluded.handle,
|
|
pds_endpoint = excluded.pds_endpoint,
|
|
avatar = excluded.avatar,
|
|
last_seen = excluded.last_seen
|
|
`, user.DID, user.Handle, user.PDSEndpoint, user.Avatar, user.LastSeen)
|
|
return err
|
|
}
|
|
|
|
// UpsertUserIgnoreAvatar inserts or updates a user record, but preserves existing avatar on update
|
|
// This is useful when avatar fetch fails, and we don't want to overwrite an existing avatar with empty string
|
|
func UpsertUserIgnoreAvatar(db *sql.DB, user *User) error {
|
|
_, err := db.Exec(`
|
|
INSERT INTO users (did, handle, pds_endpoint, avatar, last_seen)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
ON CONFLICT(did) DO UPDATE SET
|
|
handle = excluded.handle,
|
|
pds_endpoint = excluded.pds_endpoint,
|
|
last_seen = excluded.last_seen
|
|
`, user.DID, user.Handle, user.PDSEndpoint, user.Avatar, user.LastSeen)
|
|
return err
|
|
}
|
|
|
|
// UpdateUserLastSeen sets last_seen of the user identified by did to the
// current time and touches no other column, which makes it cheaper than
// UpsertUser for plain activity tracking.
func UpdateUserLastSeen(db *sql.DB, did string) error {
	const stmt = `
		UPDATE users SET last_seen = ? WHERE did = ?
	`
	now := time.Now()
	if _, err := db.Exec(stmt, now, did); err != nil {
		return err
	}
	return nil
}
|
|
|
|
// UpdateUserHandle records a new handle for the user identified by did and
// refreshes last_seen to the current time. It is intended for identity events
// from Jetstream that announce a handle change.
func UpdateUserHandle(db *sql.DB, did string, newHandle string) error {
	const stmt = `
		UPDATE users SET handle = ?, last_seen = ? WHERE did = ?
	`
	now := time.Now()
	if _, err := db.Exec(stmt, newHandle, now, did); err != nil {
		return err
	}
	return nil
}
|
|
|
|
// GetManifestDigestsForDID lists the digest of every manifest row stored for
// did, in whatever order the database returns them. The result is nil when
// the DID has no manifests.
func GetManifestDigestsForDID(db *sql.DB, did string) ([]string, error) {
	const stmt = `
		SELECT digest FROM manifests WHERE did = ?
	`
	cursor, err := db.Query(stmt, did)
	if err != nil {
		return nil, err
	}
	defer cursor.Close()

	var found []string
	for cursor.Next() {
		var d string
		if scanErr := cursor.Scan(&d); scanErr != nil {
			return nil, scanErr
		}
		found = append(found, d)
	}

	// Surface any error that ended the iteration early.
	return found, cursor.Err()
}
|
|
|
|
// DeleteManifestsNotInList removes every manifest of did whose digest is not
// listed in keepDigests. An empty keepDigests removes all manifests of the
// DID.
func DeleteManifestsNotInList(db *sql.DB, did string, keepDigests []string) error {
	keepCount := len(keepDigests)
	if keepCount == 0 {
		// Nothing to keep: wipe all manifests for this DID.
		_, err := db.Exec(`DELETE FROM manifests WHERE did = ?`, did)
		return err
	}

	// One bound parameter for the DID followed by one per digest to keep.
	params := make([]any, 0, keepCount+1)
	params = append(params, did)
	for _, d := range keepDigests {
		params = append(params, d)
	}

	// "?,?,...,?" with exactly keepCount placeholders.
	inList := "?" + strings.Repeat(",?", keepCount-1)

	stmt := fmt.Sprintf(`
		DELETE FROM manifests
		WHERE did = ? AND digest NOT IN (%s)
	`, inList)

	_, err := db.Exec(stmt, params...)
	return err
}
|
|
|
|
// GetTagsForDID lists every (repository, tag) pair stored for did, in whatever
// order the database returns them. The result is nil when the DID has no tags.
func GetTagsForDID(db *sql.DB, did string) ([]struct{ Repository, Tag string }, error) {
	// Local alias for the anonymous pair type used in the signature.
	type repoTag = struct{ Repository, Tag string }

	const stmt = `
		SELECT repository, tag FROM tags WHERE did = ?
	`
	cursor, err := db.Query(stmt, did)
	if err != nil {
		return nil, err
	}
	defer cursor.Close()

	var pairs []repoTag
	for cursor.Next() {
		var p repoTag
		if scanErr := cursor.Scan(&p.Repository, &p.Tag); scanErr != nil {
			return nil, scanErr
		}
		pairs = append(pairs, p)
	}

	// Surface any error that ended the iteration early.
	return pairs, cursor.Err()
}
|
|
|
|
// DeleteTagsNotInList removes every tag of did whose (repository, tag) pair is
// not listed in keepTags. An empty keepTags removes all tags of the DID.
//
// When keepTags is non-empty the work runs in a single transaction: the
// current tags are read, the ones to drop are collected, and they are deleted
// by id. Any failure, including an error while reading the current tags,
// rolls the transaction back and leaves the table untouched.
func DeleteTagsNotInList(db *sql.DB, did string, keepTags []struct{ Repository, Tag string }) error {
	if len(keepTags) == 0 {
		// No tags to keep - delete all for this DID
		_, err := db.Exec(`DELETE FROM tags WHERE did = ?`, did)
		return err
	}

	// Index the keep list so each stored tag is checked in constant time.
	type tagKey = struct{ Repository, Tag string }
	keep := make(map[tagKey]struct{}, len(keepTags))
	for _, k := range keepTags {
		keep[k] = struct{}{}
	}

	tx, err := db.Begin()
	if err != nil {
		return err
	}
	// Rollback is a no-op once Commit has succeeded.
	defer tx.Rollback()

	// First, get all current tags
	rows, err := tx.Query(`SELECT id, repository, tag FROM tags WHERE did = ?`, did)
	if err != nil {
		return err
	}

	var toDelete []int64
	for rows.Next() {
		var id int64
		var current tagKey
		if err := rows.Scan(&id, &current.Repository, &current.Tag); err != nil {
			rows.Close()
			return err
		}
		if _, ok := keep[current]; !ok {
			toDelete = append(toDelete, id)
		}
	}
	// The cursor must be closed before further statements run on the
	// transaction; check for an iteration error first so a partial read never
	// leads to a partial reconciliation.
	iterErr := rows.Err()
	rows.Close()
	if iterErr != nil {
		return iterErr
	}

	// Delete tags not in keep list
	for _, id := range toDelete {
		if _, err := tx.Exec(`DELETE FROM tags WHERE id = ?`, id); err != nil {
			return err
		}
	}

	return tx.Commit()
}
|
|
|
|
// InsertManifest inserts or updates a manifest record
|
|
// Uses UPSERT to update core metadata if manifest already exists
|
|
// Returns the manifest ID (works correctly for both insert and update)
|
|
// Note: Annotations are stored separately in repository_annotations table
|
|
func InsertManifest(db *sql.DB, manifest *Manifest) (int64, error) {
|
|
_, err := db.Exec(`
|
|
INSERT INTO manifests
|
|
(did, repository, digest, hold_endpoint, schema_version, media_type,
|
|
config_digest, config_size, created_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(did, repository, digest) DO UPDATE SET
|
|
hold_endpoint = excluded.hold_endpoint,
|
|
schema_version = excluded.schema_version,
|
|
media_type = excluded.media_type,
|
|
config_digest = excluded.config_digest,
|
|
config_size = excluded.config_size
|
|
`, manifest.DID, manifest.Repository, manifest.Digest, manifest.HoldEndpoint,
|
|
manifest.SchemaVersion, manifest.MediaType, manifest.ConfigDigest,
|
|
manifest.ConfigSize, manifest.CreatedAt)
|
|
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
// Query for the ID (works for both insert and update)
|
|
var id int64
|
|
err = db.QueryRow(`
|
|
SELECT id FROM manifests
|
|
WHERE did = ? AND repository = ? AND digest = ?
|
|
`, manifest.DID, manifest.Repository, manifest.Digest).Scan(&id)
|
|
|
|
if err != nil {
|
|
return 0, fmt.Errorf("failed to get manifest ID after upsert: %w", err)
|
|
}
|
|
|
|
return id, nil
|
|
}
|
|
|
|
// InsertLayer inserts a new layer record
|
|
func InsertLayer(db *sql.DB, layer *Layer) error {
|
|
_, err := db.Exec(`
|
|
INSERT INTO layers (manifest_id, digest, size, media_type, layer_index)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
`, layer.ManifestID, layer.Digest, layer.Size, layer.MediaType, layer.LayerIndex)
|
|
return err
|
|
}
|
|
|
|
// UpsertTag inserts or updates a tag record
|
|
func UpsertTag(db *sql.DB, tag *Tag) error {
|
|
_, err := db.Exec(`
|
|
INSERT INTO tags (did, repository, tag, digest, created_at)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
ON CONFLICT(did, repository, tag) DO UPDATE SET
|
|
digest = excluded.digest,
|
|
created_at = excluded.created_at
|
|
`, tag.DID, tag.Repository, tag.Tag, tag.Digest, tag.CreatedAt)
|
|
return err
|
|
}
|
|
|
|
// DeleteTag removes the tag identified by (did, repository, tag). Deleting a
// tag that does not exist is not reported as an error by this function.
func DeleteTag(db *sql.DB, did, repository, tag string) error {
	const stmt = `
		DELETE FROM tags WHERE did = ? AND repository = ? AND tag = ?
	`
	if _, err := db.Exec(stmt, did, repository, tag); err != nil {
		return err
	}
	return nil
}
|
|
|
|
// GetTagsWithPlatforms returns all tags for a repository with platform information
|
|
// Only multi-arch tags (manifest lists) have platform info in manifest_references
|
|
// Single-arch tags will have empty Platforms slice (platform is obvious for single-arch)
|
|
// Attestation references (unknown/unknown platforms) are filtered out but tracked via HasAttestations
|
|
func GetTagsWithPlatforms(db *sql.DB, did, repository string) ([]TagWithPlatforms, error) {
|
|
rows, err := db.Query(`
|
|
SELECT
|
|
t.id,
|
|
t.did,
|
|
t.repository,
|
|
t.tag,
|
|
t.digest,
|
|
t.created_at,
|
|
m.media_type,
|
|
COALESCE(mr.platform_os, '') as platform_os,
|
|
COALESCE(mr.platform_architecture, '') as platform_architecture,
|
|
COALESCE(mr.platform_variant, '') as platform_variant,
|
|
COALESCE(mr.platform_os_version, '') as platform_os_version,
|
|
COALESCE(mr.is_attestation, 0) as is_attestation
|
|
FROM tags t
|
|
JOIN manifests m ON t.digest = m.digest AND t.did = m.did AND t.repository = m.repository
|
|
LEFT JOIN manifest_references mr ON m.id = mr.manifest_id
|
|
WHERE t.did = ? AND t.repository = ?
|
|
ORDER BY t.created_at DESC, mr.reference_index
|
|
`, did, repository)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
// Group platforms by tag
|
|
tagMap := make(map[string]*TagWithPlatforms)
|
|
var tagOrder []string // Preserve order
|
|
|
|
for rows.Next() {
|
|
var t Tag
|
|
var mediaType, platformOS, platformArch, platformVariant, platformOSVersion string
|
|
var isAttestation bool
|
|
|
|
if err := rows.Scan(&t.ID, &t.DID, &t.Repository, &t.Tag, &t.Digest, &t.CreatedAt,
|
|
&mediaType, &platformOS, &platformArch, &platformVariant, &platformOSVersion, &isAttestation); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Get or create TagWithPlatforms
|
|
tagKey := t.Tag
|
|
if _, exists := tagMap[tagKey]; !exists {
|
|
tagMap[tagKey] = &TagWithPlatforms{
|
|
Tag: t,
|
|
Platforms: []PlatformInfo{},
|
|
}
|
|
tagOrder = append(tagOrder, tagKey)
|
|
}
|
|
|
|
// Track if manifest list has attestations
|
|
if isAttestation {
|
|
tagMap[tagKey].HasAttestations = true
|
|
// Skip attestation references in platform display
|
|
continue
|
|
}
|
|
|
|
// Add platform info if present (only for multi-arch manifest lists)
|
|
if platformOS != "" || platformArch != "" {
|
|
tagMap[tagKey].Platforms = append(tagMap[tagKey].Platforms, PlatformInfo{
|
|
OS: platformOS,
|
|
Architecture: platformArch,
|
|
Variant: platformVariant,
|
|
OSVersion: platformOSVersion,
|
|
})
|
|
}
|
|
}
|
|
|
|
// Convert map to slice, preserving order and setting IsMultiArch
|
|
result := make([]TagWithPlatforms, 0, len(tagMap))
|
|
for _, tagKey := range tagOrder {
|
|
tag := tagMap[tagKey]
|
|
tag.IsMultiArch = len(tag.Platforms) > 1
|
|
result = append(result, *tag)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// DeleteManifest removes manifest rows for did and digest.
//
// With a non-empty repository only that repository's manifest is removed.
// With an empty repository every manifest of the DID with that digest is
// removed; this covers callers that do not know the repository, such as
// Jetstream DELETE events.
//
// NOTE(review): associated layers are presumably removed by a cascading
// foreign key; this function itself only deletes from manifests.
func DeleteManifest(db *sql.DB, did, repository, digest string) error {
	stmt := `DELETE FROM manifests WHERE did = ? AND repository = ? AND digest = ?`
	params := []any{did, repository, digest}

	if repository == "" {
		// Repository unknown: match on DID + digest only.
		stmt = `DELETE FROM manifests WHERE did = ? AND digest = ?`
		params = []any{did, digest}
	}

	_, err := db.Exec(stmt, params...)
	return err
}
|
|
|
|
// GetManifest fetches a single manifest by digest
|
|
// Note: Annotations are stored separately in repository_annotations table
|
|
func GetManifest(db *sql.DB, digest string) (*Manifest, error) {
|
|
var m Manifest
|
|
|
|
err := db.QueryRow(`
|
|
SELECT id, did, repository, digest, hold_endpoint, schema_version,
|
|
media_type, config_digest, config_size, created_at
|
|
FROM manifests
|
|
WHERE digest = ?
|
|
`, digest).Scan(&m.ID, &m.DID, &m.Repository, &m.Digest, &m.HoldEndpoint,
|
|
&m.SchemaVersion, &m.MediaType, &m.ConfigDigest, &m.ConfigSize,
|
|
&m.CreatedAt)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &m, nil
|
|
}
|
|
|
|
// GetNewestManifestForRepo returns the newest manifest for a specific repository
|
|
// Used by backfill to ensure annotations come from the most recent manifest
|
|
func GetNewestManifestForRepo(db *sql.DB, did, repository string) (*Manifest, error) {
|
|
var m Manifest
|
|
err := db.QueryRow(`
|
|
SELECT id, did, repository, digest, hold_endpoint, schema_version, media_type,
|
|
config_digest, config_size, created_at
|
|
FROM manifests
|
|
WHERE did = ? AND repository = ?
|
|
ORDER BY created_at DESC
|
|
LIMIT 1
|
|
`, did, repository).Scan(
|
|
&m.ID, &m.DID, &m.Repository, &m.Digest,
|
|
&m.HoldEndpoint, &m.SchemaVersion, &m.MediaType,
|
|
&m.ConfigDigest, &m.ConfigSize, &m.CreatedAt,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &m, nil
|
|
}
|
|
|
|
// GetLatestHoldDIDForRepo returns the hold_endpoint value of the most recently
// created manifest of a repository.
//
// It returns "" with a nil error when the repository has no manifests yet
// (for example on first push). It replaces the in-memory cache as the way to
// decide which hold to use for blob operations.
func GetLatestHoldDIDForRepo(db *sql.DB, did, repository string) (string, error) {
	const lookup = `
		SELECT hold_endpoint
		FROM manifests
		WHERE did = ? AND repository = ?
		ORDER BY created_at DESC
		LIMIT 1
	`

	var hold string
	switch err := db.QueryRow(lookup, did, repository).Scan(&hold); {
	case err == sql.ErrNoRows:
		// First push: nothing stored yet, which is not an error.
		return "", nil
	case err != nil:
		return "", err
	}

	return hold, nil
}
|
|
|
|
// GetRepositoriesForDID lists the distinct repository names that have at least
// one manifest stored for did. Backfill uses it to reconcile annotations for
// every repository. The result is nil when the DID has no manifests.
func GetRepositoriesForDID(db *sql.DB, did string) ([]string, error) {
	const stmt = `
		SELECT DISTINCT repository
		FROM manifests
		WHERE did = ?
	`
	cursor, err := db.Query(stmt, did)
	if err != nil {
		return nil, err
	}
	defer cursor.Close()

	var names []string
	for cursor.Next() {
		var name string
		if scanErr := cursor.Scan(&name); scanErr != nil {
			return nil, scanErr
		}
		names = append(names, name)
	}

	// Surface any error that ended the iteration early.
	return names, cursor.Err()
}
|
|
|
|
// GetLayersForManifest fetches all layers for a manifest
|
|
func GetLayersForManifest(db *sql.DB, manifestID int64) ([]Layer, error) {
|
|
rows, err := db.Query(`
|
|
SELECT manifest_id, digest, size, media_type, layer_index
|
|
FROM layers
|
|
WHERE manifest_id = ?
|
|
ORDER BY layer_index
|
|
`, manifestID)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var layers []Layer
|
|
for rows.Next() {
|
|
var l Layer
|
|
if err := rows.Scan(&l.ManifestID, &l.Digest, &l.Size, &l.MediaType, &l.LayerIndex); err != nil {
|
|
return nil, err
|
|
}
|
|
layers = append(layers, l)
|
|
}
|
|
|
|
return layers, nil
|
|
}
|
|
|
|
// InsertManifestReference inserts a new manifest reference record (for manifest lists/indexes)
|
|
func InsertManifestReference(db *sql.DB, ref *ManifestReference) error {
|
|
_, err := db.Exec(`
|
|
INSERT INTO manifest_references (manifest_id, digest, size, media_type,
|
|
platform_architecture, platform_os,
|
|
platform_variant, platform_os_version,
|
|
is_attestation, reference_index)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
`, ref.ManifestID, ref.Digest, ref.Size, ref.MediaType,
|
|
ref.PlatformArchitecture, ref.PlatformOS,
|
|
ref.PlatformVariant, ref.PlatformOSVersion,
|
|
ref.IsAttestation, ref.ReferenceIndex)
|
|
return err
|
|
}
|
|
|
|
// GetManifestReferencesForManifest fetches all manifest references for a manifest list/index
|
|
func GetManifestReferencesForManifest(db *sql.DB, manifestID int64) ([]ManifestReference, error) {
|
|
rows, err := db.Query(`
|
|
SELECT manifest_id, digest, size, media_type,
|
|
platform_architecture, platform_os, platform_variant, platform_os_version,
|
|
reference_index
|
|
FROM manifest_references
|
|
WHERE manifest_id = ?
|
|
ORDER BY reference_index
|
|
`, manifestID)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var refs []ManifestReference
|
|
for rows.Next() {
|
|
var r ManifestReference
|
|
var arch, os, variant, osVersion sql.NullString
|
|
if err := rows.Scan(&r.ManifestID, &r.Digest, &r.Size, &r.MediaType,
|
|
&arch, &os, &variant, &osVersion,
|
|
&r.ReferenceIndex); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Convert nullable strings
|
|
if arch.Valid {
|
|
r.PlatformArchitecture = arch.String
|
|
}
|
|
if os.Valid {
|
|
r.PlatformOS = os.String
|
|
}
|
|
if variant.Valid {
|
|
r.PlatformVariant = variant.String
|
|
}
|
|
if osVersion.Valid {
|
|
r.PlatformOSVersion = osVersion.String
|
|
}
|
|
|
|
refs = append(refs, r)
|
|
}
|
|
|
|
return refs, nil
|
|
}
|
|
|
|
// GetTopLevelManifests returns only manifest lists and orphaned single-arch manifests
|
|
// Filters out platform-specific manifests that are referenced by manifest lists
|
|
// Note: Annotations are stored separately in repository_annotations table - use GetRepositoryMetadata to fetch them
|
|
func GetTopLevelManifests(db *sql.DB, did, repository string, limit, offset int) ([]ManifestWithMetadata, error) {
|
|
rows, err := db.Query(`
|
|
WITH manifest_list_children AS (
|
|
-- Get all digests that are children of manifest lists
|
|
SELECT DISTINCT mr.digest
|
|
FROM manifest_references mr
|
|
JOIN manifests m ON mr.manifest_id = m.id
|
|
WHERE m.did = ? AND m.repository = ?
|
|
)
|
|
SELECT
|
|
m.id, m.did, m.repository, m.digest, m.media_type,
|
|
m.schema_version, m.created_at,
|
|
m.config_digest, m.config_size, m.hold_endpoint,
|
|
GROUP_CONCAT(DISTINCT t.tag) as tags,
|
|
COUNT(DISTINCT mr.digest) as platform_count
|
|
FROM manifests m
|
|
LEFT JOIN tags t ON m.digest = t.digest AND m.did = t.did AND m.repository = t.repository
|
|
LEFT JOIN manifest_references mr ON m.id = mr.manifest_id
|
|
WHERE m.did = ? AND m.repository = ?
|
|
AND (
|
|
-- Include manifest lists
|
|
m.media_type LIKE '%index%' OR m.media_type LIKE '%manifest.list%'
|
|
OR
|
|
-- Include single-arch NOT referenced by any list
|
|
m.digest NOT IN (SELECT digest FROM manifest_list_children WHERE digest IS NOT NULL)
|
|
)
|
|
GROUP BY m.id
|
|
ORDER BY m.created_at DESC
|
|
LIMIT ? OFFSET ?
|
|
`, did, repository, did, repository, limit, offset)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var manifests []ManifestWithMetadata
|
|
for rows.Next() {
|
|
var m ManifestWithMetadata
|
|
var tags, configDigest sql.NullString
|
|
var configSize sql.NullInt64
|
|
|
|
if err := rows.Scan(
|
|
&m.ID, &m.DID, &m.Repository, &m.Digest, &m.MediaType,
|
|
&m.SchemaVersion, &m.CreatedAt,
|
|
&configDigest, &configSize, &m.HoldEndpoint,
|
|
&tags, &m.PlatformCount,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Set nullable fields
|
|
if configDigest.Valid {
|
|
m.ConfigDigest = configDigest.String
|
|
}
|
|
if configSize.Valid {
|
|
m.ConfigSize = configSize.Int64
|
|
}
|
|
|
|
// Parse tags
|
|
if tags.Valid && tags.String != "" {
|
|
m.Tags = strings.Split(tags.String, ",")
|
|
}
|
|
|
|
// Determine if manifest list
|
|
m.IsManifestList = strings.Contains(m.MediaType, "index") || strings.Contains(m.MediaType, "manifest.list")
|
|
|
|
manifests = append(manifests, m)
|
|
}
|
|
|
|
// Fetch platform details for multi-arch manifests AFTER closing the main query
|
|
for i := range manifests {
|
|
if manifests[i].IsManifestList {
|
|
platformRows, err := db.Query(`
|
|
SELECT
|
|
mr.platform_os,
|
|
mr.platform_architecture,
|
|
mr.platform_variant,
|
|
mr.platform_os_version,
|
|
COALESCE(mr.is_attestation, 0) as is_attestation
|
|
FROM manifest_references mr
|
|
WHERE mr.manifest_id = ?
|
|
ORDER BY mr.reference_index
|
|
`, manifests[i].ID)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
manifests[i].Platforms = []PlatformInfo{}
|
|
for platformRows.Next() {
|
|
var p PlatformInfo
|
|
var os, arch, variant, osVersion sql.NullString
|
|
var isAttestation bool
|
|
|
|
if err := platformRows.Scan(&os, &arch, &variant, &osVersion, &isAttestation); err != nil {
|
|
platformRows.Close()
|
|
return nil, err
|
|
}
|
|
|
|
// Track if manifest list has attestations
|
|
if isAttestation {
|
|
manifests[i].HasAttestations = true
|
|
// Skip attestation references in platform display
|
|
continue
|
|
}
|
|
|
|
if os.Valid {
|
|
p.OS = os.String
|
|
}
|
|
if arch.Valid {
|
|
p.Architecture = arch.String
|
|
}
|
|
if variant.Valid {
|
|
p.Variant = variant.String
|
|
}
|
|
if osVersion.Valid {
|
|
p.OSVersion = osVersion.String
|
|
}
|
|
|
|
manifests[i].Platforms = append(manifests[i].Platforms, p)
|
|
}
|
|
platformRows.Close()
|
|
|
|
manifests[i].PlatformCount = len(manifests[i].Platforms)
|
|
}
|
|
}
|
|
|
|
return manifests, nil
|
|
}
|
|
|
|
// GetManifestDetail returns a manifest with full platform details and tags
|
|
// Note: Annotations are stored separately in repository_annotations table - use GetRepositoryMetadata to fetch them
|
|
func GetManifestDetail(db *sql.DB, did, repository, digest string) (*ManifestWithMetadata, error) {
|
|
// First, get the manifest and its tags
|
|
var m ManifestWithMetadata
|
|
var tags, configDigest sql.NullString
|
|
var configSize sql.NullInt64
|
|
|
|
err := db.QueryRow(`
|
|
SELECT
|
|
m.id, m.did, m.repository, m.digest, m.media_type,
|
|
m.schema_version, m.created_at,
|
|
m.config_digest, m.config_size, m.hold_endpoint,
|
|
GROUP_CONCAT(DISTINCT t.tag) as tags
|
|
FROM manifests m
|
|
LEFT JOIN tags t ON m.digest = t.digest AND m.did = t.did AND m.repository = t.repository
|
|
WHERE m.did = ? AND m.repository = ? AND m.digest = ?
|
|
GROUP BY m.id
|
|
`, did, repository, digest).Scan(
|
|
&m.ID, &m.DID, &m.Repository, &m.Digest, &m.MediaType,
|
|
&m.SchemaVersion, &m.CreatedAt,
|
|
&configDigest, &configSize, &m.HoldEndpoint,
|
|
&tags,
|
|
)
|
|
|
|
if err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return nil, fmt.Errorf("manifest not found")
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
// Set nullable fields
|
|
if configDigest.Valid {
|
|
m.ConfigDigest = configDigest.String
|
|
}
|
|
if configSize.Valid {
|
|
m.ConfigSize = configSize.Int64
|
|
}
|
|
|
|
// Parse tags
|
|
if tags.Valid && tags.String != "" {
|
|
m.Tags = strings.Split(tags.String, ",")
|
|
}
|
|
|
|
// Determine if manifest list
|
|
m.IsManifestList = strings.Contains(m.MediaType, "index") || strings.Contains(m.MediaType, "manifest.list")
|
|
|
|
// If this is a manifest list, get platform details
|
|
if m.IsManifestList {
|
|
platforms, err := db.Query(`
|
|
SELECT
|
|
mr.platform_os,
|
|
mr.platform_architecture,
|
|
mr.platform_variant,
|
|
mr.platform_os_version,
|
|
COALESCE(mr.is_attestation, 0) as is_attestation
|
|
FROM manifest_references mr
|
|
WHERE mr.manifest_id = ?
|
|
ORDER BY mr.reference_index
|
|
`, m.ID)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer platforms.Close()
|
|
|
|
m.Platforms = []PlatformInfo{}
|
|
for platforms.Next() {
|
|
var p PlatformInfo
|
|
var os, arch, variant, osVersion sql.NullString
|
|
var isAttestation bool
|
|
|
|
if err := platforms.Scan(&os, &arch, &variant, &osVersion, &isAttestation); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Track if manifest list has attestations
|
|
if isAttestation {
|
|
m.HasAttestations = true
|
|
// Skip attestation references in platform display
|
|
continue
|
|
}
|
|
|
|
if os.Valid {
|
|
p.OS = os.String
|
|
}
|
|
if arch.Valid {
|
|
p.Architecture = arch.String
|
|
}
|
|
if variant.Valid {
|
|
p.Variant = variant.String
|
|
}
|
|
if osVersion.Valid {
|
|
p.OSVersion = osVersion.String
|
|
}
|
|
|
|
m.Platforms = append(m.Platforms, p)
|
|
}
|
|
|
|
m.PlatformCount = len(m.Platforms)
|
|
}
|
|
|
|
return &m, nil
|
|
}
|
|
|
|
// GetFirehoseCursor reads the cursor column of the firehose_cursor row with
// id 1. When that row does not exist it reports a cursor of 0 and no error;
// any other failure is returned to the caller.
func GetFirehoseCursor(db *sql.DB) (int64, error) {
	var position int64

	row := db.QueryRow("SELECT cursor FROM firehose_cursor WHERE id = 1")
	switch err := row.Scan(&position); err {
	case nil:
		return position, nil
	case sql.ErrNoRows:
		// Nothing stored yet: start from the beginning.
		return 0, nil
	default:
		return position, err
	}
}
|
|
|
|
// UpdateFirehoseCursor stores cursor in the firehose_cursor row with id 1,
// inserting the row if it is missing and overwriting it otherwise. updated_at
// is set to the database's datetime('now') in both cases.
func UpdateFirehoseCursor(db *sql.DB, cursor int64) error {
	const upsert = `
		INSERT INTO firehose_cursor (id, cursor, updated_at)
		VALUES (1, ?, datetime('now'))
		ON CONFLICT(id) DO UPDATE SET
			cursor = excluded.cursor,
			updated_at = excluded.updated_at
	`

	if _, err := db.Exec(upsert, cursor); err != nil {
		return err
	}
	return nil
}
|
|
|
|
// IsManifestTagged reports whether at least one row in tags points at the
// given digest within the given DID and repository.
func IsManifestTagged(db *sql.DB, did, repository, digest string) (bool, error) {
	const countTags = `
		SELECT COUNT(*) FROM tags
		WHERE did = ? AND repository = ? AND digest = ?
	`

	var matches int
	if err := db.QueryRow(countTags, did, repository, digest).Scan(&matches); err != nil {
		return false, err
	}
	return matches > 0, nil
}
|
|
|
|
// GetManifestTags lists, in ascending tag order, every tag in the given DID
// and repository that points at digest. The result is nil when there are none.
func GetManifestTags(db *sql.DB, did, repository, digest string) ([]string, error) {
	const selectTags = `
		SELECT tag FROM tags
		WHERE did = ? AND repository = ? AND digest = ?
		ORDER BY tag
	`

	rows, err := db.Query(selectTags, did, repository, digest)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var names []string
	for rows.Next() {
		var name string
		if scanErr := rows.Scan(&name); scanErr != nil {
			return nil, scanErr
		}
		names = append(names, name)
	}

	// Surface any error that ended the iteration early.
	if iterErr := rows.Err(); iterErr != nil {
		return nil, iterErr
	}
	return names, nil
}
|
|
|
|
// BackfillState describes how far the backfill has progressed.
type BackfillState struct {
	// StartCursor and CurrentCursor hold the cursor the backfill started from
	// and the cursor it has reached so far.
	StartCursor, CurrentCursor int64

	// Completed is true once the backfill has been marked as finished.
	Completed bool

	// UpdatedAt is the time the state was last written.
	UpdatedAt time.Time
}
|
|
|
|
// GetBackfillState retrieves the backfill state
|
|
func GetBackfillState(db *sql.DB) (*BackfillState, error) {
|
|
var state BackfillState
|
|
var updatedAtStr string
|
|
|
|
err := db.QueryRow(`
|
|
SELECT start_cursor, current_cursor, completed, updated_at
|
|
FROM backfill_state
|
|
WHERE id = 1
|
|
`).Scan(&state.StartCursor, &state.CurrentCursor, &state.Completed, &updatedAtStr)
|
|
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil // No backfill state exists
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Parse timestamp
|
|
if updatedAtStr != "" {
|
|
formats := []string{
|
|
time.RFC3339Nano,
|
|
"2006-01-02 15:04:05.999999999-07:00",
|
|
"2006-01-02 15:04:05.999999999",
|
|
time.RFC3339,
|
|
"2006-01-02 15:04:05",
|
|
}
|
|
for _, format := range formats {
|
|
if t, err := time.Parse(format, updatedAtStr); err == nil {
|
|
state.UpdatedAt = t
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
return &state, nil
|
|
}
|
|
|
|
// UpsertBackfillState updates or creates backfill state
|
|
func UpsertBackfillState(db *sql.DB, state *BackfillState) error {
|
|
_, err := db.Exec(`
|
|
INSERT INTO backfill_state (id, start_cursor, current_cursor, completed, updated_at)
|
|
VALUES (1, ?, ?, ?, datetime('now'))
|
|
ON CONFLICT(id) DO UPDATE SET
|
|
start_cursor = excluded.start_cursor,
|
|
current_cursor = excluded.current_cursor,
|
|
completed = excluded.completed,
|
|
updated_at = excluded.updated_at
|
|
`, state.StartCursor, state.CurrentCursor, state.Completed)
|
|
return err
|
|
}
|
|
|
|
// UpdateBackfillCursor sets current_cursor (and refreshes updated_at) on the
// backfill_state row with id 1. It is an UPDATE only: if the row does not
// exist nothing is written and no error is returned.
func UpdateBackfillCursor(db *sql.DB, cursor int64) error {
	const update = `
		UPDATE backfill_state
		SET current_cursor = ?, updated_at = datetime('now')
		WHERE id = 1
	`

	if _, err := db.Exec(update, cursor); err != nil {
		return err
	}
	return nil
}
|
|
|
|
// MarkBackfillCompleted sets completed = 1 (and refreshes updated_at) on the
// backfill_state row with id 1. It is an UPDATE only: if the row does not
// exist nothing is written and no error is returned.
func MarkBackfillCompleted(db *sql.DB) error {
	const update = `
		UPDATE backfill_state
		SET completed = 1, updated_at = datetime('now')
		WHERE id = 1
	`

	if _, err := db.Exec(update); err != nil {
		return err
	}
	return nil
}
|
|
|
|
// GetRepository fetches a specific repository for a user
|
|
func GetRepository(db *sql.DB, did, repository string) (*Repository, error) {
|
|
// Get repository summary
|
|
var r Repository
|
|
r.Name = repository
|
|
|
|
var tagCount, manifestCount int
|
|
var lastPushStr string
|
|
|
|
err := db.QueryRow(`
|
|
SELECT
|
|
COUNT(DISTINCT tag) as tag_count,
|
|
COUNT(DISTINCT digest) as manifest_count,
|
|
MAX(created_at) as last_push
|
|
FROM (
|
|
SELECT tag, digest, created_at FROM tags WHERE did = ? AND repository = ?
|
|
UNION
|
|
SELECT NULL, digest, created_at FROM manifests WHERE did = ? AND repository = ?
|
|
)
|
|
`, did, repository, did, repository).Scan(&tagCount, &manifestCount, &lastPushStr)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
r.TagCount = tagCount
|
|
r.ManifestCount = manifestCount
|
|
|
|
// Parse the timestamp string into time.Time
|
|
if lastPushStr != "" {
|
|
formats := []string{
|
|
time.RFC3339Nano,
|
|
"2006-01-02 15:04:05.999999999-07:00",
|
|
"2006-01-02 15:04:05.999999999",
|
|
time.RFC3339,
|
|
"2006-01-02 15:04:05",
|
|
}
|
|
|
|
for _, format := range formats {
|
|
if t, err := time.Parse(format, lastPushStr); err == nil {
|
|
r.LastPush = t
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Get tags for this repo
|
|
tagRows, err := db.Query(`
|
|
SELECT id, tag, digest, created_at
|
|
FROM tags
|
|
WHERE did = ? AND repository = ?
|
|
ORDER BY created_at DESC
|
|
`, did, repository)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for tagRows.Next() {
|
|
var t Tag
|
|
t.DID = did
|
|
t.Repository = repository
|
|
if err := tagRows.Scan(&t.ID, &t.Tag, &t.Digest, &t.CreatedAt); err != nil {
|
|
tagRows.Close()
|
|
return nil, err
|
|
}
|
|
r.Tags = append(r.Tags, t)
|
|
}
|
|
tagRows.Close()
|
|
|
|
// Get manifests for this repo
|
|
manifestRows, err := db.Query(`
|
|
SELECT id, digest, hold_endpoint, schema_version, media_type,
|
|
config_digest, config_size, created_at
|
|
FROM manifests
|
|
WHERE did = ? AND repository = ?
|
|
ORDER BY created_at DESC
|
|
`, did, repository)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for manifestRows.Next() {
|
|
var m Manifest
|
|
m.DID = did
|
|
m.Repository = repository
|
|
|
|
if err := manifestRows.Scan(&m.ID, &m.Digest, &m.HoldEndpoint, &m.SchemaVersion,
|
|
&m.MediaType, &m.ConfigDigest, &m.ConfigSize, &m.CreatedAt); err != nil {
|
|
manifestRows.Close()
|
|
return nil, err
|
|
}
|
|
|
|
r.Manifests = append(r.Manifests, m)
|
|
}
|
|
manifestRows.Close()
|
|
|
|
// Fetch repository-level annotations from annotations table
|
|
annotations, err := GetRepositoryAnnotations(db, did, repository)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
r.Title = annotations["org.opencontainers.image.title"]
|
|
r.Description = annotations["org.opencontainers.image.description"]
|
|
r.SourceURL = annotations["org.opencontainers.image.source"]
|
|
r.DocumentationURL = annotations["org.opencontainers.image.documentation"]
|
|
r.Licenses = annotations["org.opencontainers.image.licenses"]
|
|
r.IconURL = annotations["io.atcr.icon"]
|
|
r.ReadmeURL = annotations["io.atcr.readme"]
|
|
|
|
return &r, nil
|
|
}
|
|
|
|
// GetRepositoryStats fetches stats for a repository
|
|
func GetRepositoryStats(db *sql.DB, did, repository string) (*RepositoryStats, error) {
|
|
var stats RepositoryStats
|
|
var lastPullStr, lastPushStr sql.NullString
|
|
|
|
// Get pull/push stats from repository_stats, and star count from stars table
|
|
err := db.QueryRow(`
|
|
SELECT
|
|
COALESCE(rs.did, ?) as did,
|
|
COALESCE(rs.repository, ?) as repository,
|
|
(SELECT COUNT(*) FROM stars WHERE owner_did = ? AND repository = ?) as star_count,
|
|
COALESCE(rs.pull_count, 0) as pull_count,
|
|
rs.last_pull,
|
|
COALESCE(rs.push_count, 0) as push_count,
|
|
rs.last_push
|
|
FROM (SELECT ? as did, ? as repository) AS placeholder
|
|
LEFT JOIN repository_stats rs ON rs.did = ? AND rs.repository = ?
|
|
`, did, repository, did, repository, did, repository, did, repository).Scan(&stats.DID, &stats.Repository, &stats.StarCount, &stats.PullCount, &lastPullStr, &stats.PushCount, &lastPushStr)
|
|
|
|
if err == sql.ErrNoRows {
|
|
// Return zero stats if no record exists yet
|
|
return &RepositoryStats{
|
|
DID: did,
|
|
Repository: repository,
|
|
StarCount: 0,
|
|
PullCount: 0,
|
|
PushCount: 0,
|
|
}, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Parse timestamps
|
|
if lastPullStr.Valid {
|
|
t, err := parseTimestamp(lastPullStr.String)
|
|
if err == nil {
|
|
stats.LastPull = &t
|
|
}
|
|
}
|
|
if lastPushStr.Valid {
|
|
t, err := parseTimestamp(lastPushStr.String)
|
|
if err == nil {
|
|
stats.LastPush = &t
|
|
}
|
|
}
|
|
|
|
return &stats, nil
|
|
}
|
|
|
|
// UpsertRepositoryStats inserts or updates repository stats
|
|
func UpsertRepositoryStats(db *sql.DB, stats *RepositoryStats) error {
|
|
_, err := db.Exec(`
|
|
INSERT INTO repository_stats (did, repository, star_count, pull_count, last_pull, push_count, last_push)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(did, repository) DO UPDATE SET
|
|
star_count = excluded.star_count,
|
|
pull_count = excluded.pull_count,
|
|
last_pull = excluded.last_pull,
|
|
push_count = excluded.push_count,
|
|
last_push = excluded.last_push
|
|
`, stats.DID, stats.Repository, stats.StarCount, stats.PullCount, stats.LastPull, stats.PushCount, stats.LastPush)
|
|
return err
|
|
}
|
|
|
|
// IncrementStarCount adds one to the star_count of (did, repository) in
// repository_stats. If no row exists yet, one is inserted with star_count 1.
func IncrementStarCount(db *sql.DB, did, repository string) error {
	const bump = `
		INSERT INTO repository_stats (did, repository, star_count)
		VALUES (?, ?, 1)
		ON CONFLICT(did, repository) DO UPDATE SET
			star_count = star_count + 1
	`

	if _, err := db.Exec(bump, did, repository); err != nil {
		return err
	}
	return nil
}
|
|
|
|
// DecrementStarCount subtracts one from the star_count of (did, repository) in
// repository_stats, never going below zero. It is an UPDATE only: when no row
// exists nothing is written and no error is returned.
func DecrementStarCount(db *sql.DB, did, repository string) error {
	const drop = `
		UPDATE repository_stats
		SET star_count = MAX(0, star_count - 1)
		WHERE did = ? AND repository = ?
	`

	if _, err := db.Exec(drop, did, repository); err != nil {
		return err
	}
	return nil
}
|
|
|
|
// UpsertStar records that starrerDID starred ownerDID's repository. The call
// is idempotent: if the (starrer_did, owner_did, repository) row already
// exists, only its created_at is replaced with createdAt.
func UpsertStar(db *sql.DB, starrerDID, ownerDID, repository string, createdAt time.Time) error {
	const upsert = `
		INSERT INTO stars (starrer_did, owner_did, repository, created_at)
		VALUES (?, ?, ?, ?)
		ON CONFLICT(starrer_did, owner_did, repository) DO UPDATE SET
			created_at = excluded.created_at
	`

	if _, err := db.Exec(upsert, starrerDID, ownerDID, repository, createdAt); err != nil {
		return err
	}
	return nil
}
|
|
|
|
// DeleteStar removes the stars row for (starrerDID, ownerDID, repository).
// Deleting a star that does not exist is not an error.
func DeleteStar(db *sql.DB, starrerDID, ownerDID, repository string) error {
	const remove = `
		DELETE FROM stars
		WHERE starrer_did = ? AND owner_did = ? AND repository = ?
	`

	if _, err := db.Exec(remove, starrerDID, ownerDID, repository); err != nil {
		return err
	}
	return nil
}
|
|
|
|
// RebuildStarCount recomputes star_count for (ownerDID, repository) by
// counting the matching rows in the stars table, inserting the
// repository_stats row if needed and overwriting star_count otherwise.
func RebuildStarCount(db *sql.DB, ownerDID, repository string) error {
	const rebuild = `
		INSERT INTO repository_stats (did, repository, star_count)
		VALUES (?, ?, (
			SELECT COUNT(*) FROM stars
			WHERE owner_did = ? AND repository = ?
		))
		ON CONFLICT(did, repository) DO UPDATE SET
			star_count = (
				SELECT COUNT(*) FROM stars
				WHERE owner_did = ? AND repository = ?
			)
	`

	// The pair is bound three times: row key, insert subquery, update subquery.
	_, err := db.Exec(rebuild,
		ownerDID, repository,
		ownerDID, repository,
		ownerDID, repository,
	)
	if err != nil {
		return err
	}
	return nil
}
|
|
|
|
// GetStarsForDID returns every star created by starrerDID (used for backfill
// reconciliation). The result maps the key "ownerDID/repository" to the star's
// created_at; it is empty, not nil, when the DID has no stars.
func GetStarsForDID(db *sql.DB, starrerDID string) (map[string]time.Time, error) {
	const selectStars = `
		SELECT owner_did, repository, created_at
		FROM stars
		WHERE starrer_did = ?
	`

	rows, err := db.Query(selectStars, starrerDID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	byRepo := make(map[string]time.Time)
	for rows.Next() {
		var (
			owner, repo string
			starredAt   time.Time
		)
		if scanErr := rows.Scan(&owner, &repo, &starredAt); scanErr != nil {
			return nil, scanErr
		}
		// Key format: owner DID, a slash, then the repository name.
		byRepo[owner+"/"+repo] = starredAt
	}

	return byRepo, rows.Err()
}
|
|
|
|
// CleanupOrphanedTags deletes the tags owned by did whose digest no longer has
// a manifest row, e.g. after a manifest was deleted but a tag still points at
// it.
//
// NOTE(review): the existence check matches manifests on did and digest only,
// not on repository, so a tag survives if the same digest exists in another
// repository of the same DID. Confirm whether that is intended.
func CleanupOrphanedTags(db *sql.DB, did string) error {
	const purge = `
		DELETE FROM tags
		WHERE did = ?
		AND NOT EXISTS (
			SELECT 1 FROM manifests
			WHERE manifests.did = tags.did
			AND manifests.digest = tags.digest
		)
	`

	if _, err := db.Exec(purge, did); err != nil {
		return err
	}
	return nil
}
|
|
|
|
// DeleteStarsNotInList deletes stars from the database that are not in the provided list
|
|
// This is used during backfill reconciliation to remove stars that no longer exist on PDS
|
|
func DeleteStarsNotInList(db *sql.DB, starrerDID string, foundStars map[string]time.Time) error {
|
|
// Get current stars in DB
|
|
currentStars, err := GetStarsForDID(db, starrerDID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get current stars: %w", err)
|
|
}
|
|
|
|
// Find stars to delete (in DB but not on PDS)
|
|
var toDelete []struct{ ownerDID, repository string }
|
|
for key := range currentStars {
|
|
if _, exists := foundStars[key]; !exists {
|
|
parts := strings.SplitN(key, "/", 2)
|
|
if len(parts) == 2 {
|
|
toDelete = append(toDelete, struct{ ownerDID, repository string }{
|
|
ownerDID: parts[0],
|
|
repository: parts[1],
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
// Delete orphaned stars
|
|
for _, star := range toDelete {
|
|
if err := DeleteStar(db, starrerDID, star.ownerDID, star.repository); err != nil {
|
|
return fmt.Errorf("failed to delete star: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// IncrementPullCount adds one to pull_count for (did, repository) in
// repository_stats and stamps last_pull with the database's datetime('now').
// A missing row is created with pull_count 1.
func IncrementPullCount(db *sql.DB, did, repository string) error {
	const bump = `
		INSERT INTO repository_stats (did, repository, pull_count, last_pull)
		VALUES (?, ?, 1, datetime('now'))
		ON CONFLICT(did, repository) DO UPDATE SET
			pull_count = pull_count + 1,
			last_pull = datetime('now')
	`

	if _, err := db.Exec(bump, did, repository); err != nil {
		return err
	}
	return nil
}
|
|
|
|
// IncrementPushCount adds one to push_count for (did, repository) in
// repository_stats and stamps last_push with the database's datetime('now').
// A missing row is created with push_count 1.
func IncrementPushCount(db *sql.DB, did, repository string) error {
	const bump = `
		INSERT INTO repository_stats (did, repository, push_count, last_push)
		VALUES (?, ?, 1, datetime('now'))
		ON CONFLICT(did, repository) DO UPDATE SET
			push_count = push_count + 1,
			last_push = datetime('now')
	`

	if _, err := db.Exec(bump, did, repository); err != nil {
		return err
	}
	return nil
}
|
|
|
|
// parseTimestamp converts s to a time.Time by trying a fixed list of layouts
// in order (RFC 3339 variants and space-separated "date time" forms, with and
// without fractional seconds or zone offset). The first layout that parses
// wins; if none does, an error naming the input is returned.
func parseTimestamp(s string) (time.Time, error) {
	for _, layout := range [...]string{
		time.RFC3339Nano,
		"2006-01-02 15:04:05.999999999-07:00",
		"2006-01-02 15:04:05.999999999",
		time.RFC3339,
		"2006-01-02 15:04:05",
	} {
		parsed, err := time.Parse(layout, s)
		if err != nil {
			continue
		}
		return parsed, nil
	}

	return time.Time{}, fmt.Errorf("unable to parse timestamp: %s", s)
}
|
|
|
|
// GetFeaturedRepositories fetches top repositories sorted by stars and pulls
|
|
func GetFeaturedRepositories(db *sql.DB, limit int, currentUserDID string) ([]FeaturedRepository, error) {
|
|
query := `
|
|
WITH latest_manifests AS (
|
|
SELECT did, repository, MAX(id) as latest_id
|
|
FROM manifests
|
|
GROUP BY did, repository
|
|
),
|
|
repo_stats AS (
|
|
SELECT
|
|
lm.did,
|
|
lm.repository,
|
|
COALESCE(rs.pull_count, 0) as pull_count,
|
|
COALESCE((SELECT COUNT(*) FROM stars WHERE owner_did = lm.did AND repository = lm.repository), 0) as star_count,
|
|
(COALESCE(rs.pull_count, 0) + COALESCE((SELECT COUNT(*) FROM stars WHERE owner_did = lm.did AND repository = lm.repository), 0) * 10) as score
|
|
FROM latest_manifests lm
|
|
LEFT JOIN repository_stats rs ON lm.did = rs.did AND lm.repository = rs.repository
|
|
)
|
|
SELECT
|
|
m.did,
|
|
u.handle,
|
|
m.repository,
|
|
COALESCE((SELECT value FROM repository_annotations WHERE did = m.did AND repository = m.repository AND key = 'org.opencontainers.image.title'), ''),
|
|
COALESCE((SELECT value FROM repository_annotations WHERE did = m.did AND repository = m.repository AND key = 'org.opencontainers.image.description'), ''),
|
|
COALESCE((SELECT value FROM repository_annotations WHERE did = m.did AND repository = m.repository AND key = 'io.atcr.icon'), ''),
|
|
rs.pull_count,
|
|
rs.star_count,
|
|
COALESCE((SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = m.did AND repository = m.repository), 0),
|
|
COALESCE(rp.avatar_cid, '')
|
|
FROM latest_manifests lm
|
|
JOIN manifests m ON lm.latest_id = m.id
|
|
JOIN users u ON m.did = u.did
|
|
JOIN repo_stats rs ON m.did = rs.did AND m.repository = rs.repository
|
|
LEFT JOIN repo_pages rp ON m.did = rp.did AND m.repository = rp.repository
|
|
ORDER BY rs.score DESC, rs.star_count DESC, rs.pull_count DESC, m.created_at DESC
|
|
LIMIT ?
|
|
`
|
|
|
|
rows, err := db.Query(query, currentUserDID, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var featured []FeaturedRepository
|
|
for rows.Next() {
|
|
var f FeaturedRepository
|
|
var isStarredInt int
|
|
var avatarCID string
|
|
|
|
if err := rows.Scan(&f.OwnerDID, &f.OwnerHandle, &f.Repository,
|
|
&f.Title, &f.Description, &f.IconURL, &f.PullCount, &f.StarCount, &isStarredInt, &avatarCID); err != nil {
|
|
return nil, err
|
|
}
|
|
f.IsStarred = isStarredInt > 0
|
|
// Prefer repo page avatar over annotation icon
|
|
if avatarCID != "" {
|
|
f.IconURL = BlobCDNURL(f.OwnerDID, avatarCID)
|
|
}
|
|
|
|
featured = append(featured, f)
|
|
}
|
|
|
|
return featured, nil
|
|
}
|
|
|
|
// RepoPage is a repository page record cached from the PDS.
type RepoPage struct {
	// DID and Repository together identify the page.
	DID, Repository string

	// Description is the page's description text; AvatarCID is the CID of its
	// avatar blob (empty when there is none).
	Description, AvatarCID string

	// CreatedAt and UpdatedAt are the record's timestamps.
	CreatedAt, UpdatedAt time.Time
}
|
|
|
|
// UpsertRepoPage stores a repo page for (did, repository). A new row gets all
// six values; on conflict only description, avatar_cid and updated_at are
// overwritten, so the existing created_at is kept.
func UpsertRepoPage(db *sql.DB, did, repository, description, avatarCID string, createdAt, updatedAt time.Time) error {
	const upsert = `
		INSERT INTO repo_pages (did, repository, description, avatar_cid, created_at, updated_at)
		VALUES (?, ?, ?, ?, ?, ?)
		ON CONFLICT(did, repository) DO UPDATE SET
			description = excluded.description,
			avatar_cid = excluded.avatar_cid,
			updated_at = excluded.updated_at
	`

	_, err := db.Exec(upsert,
		did, repository,
		description, avatarCID,
		createdAt, updatedAt,
	)
	if err != nil {
		return err
	}
	return nil
}
|
|
|
|
// GetRepoPage retrieves a repo page record
|
|
func GetRepoPage(db *sql.DB, did, repository string) (*RepoPage, error) {
|
|
var rp RepoPage
|
|
err := db.QueryRow(`
|
|
SELECT did, repository, description, avatar_cid, created_at, updated_at
|
|
FROM repo_pages
|
|
WHERE did = ? AND repository = ?
|
|
`, did, repository).Scan(&rp.DID, &rp.Repository, &rp.Description, &rp.AvatarCID, &rp.CreatedAt, &rp.UpdatedAt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &rp, nil
|
|
}
|
|
|
|
// DeleteRepoPage removes the repo_pages row for (did, repository). Deleting a
// page that does not exist is not an error.
func DeleteRepoPage(db *sql.DB, did, repository string) error {
	const remove = `
		DELETE FROM repo_pages WHERE did = ? AND repository = ?
	`

	if _, err := db.Exec(remove, did, repository); err != nil {
		return err
	}
	return nil
}
|
|
|
|
// GetRepoPagesByDID returns all repo pages for a DID
|
|
func GetRepoPagesByDID(db *sql.DB, did string) ([]RepoPage, error) {
|
|
rows, err := db.Query(`
|
|
SELECT did, repository, description, avatar_cid, created_at, updated_at
|
|
FROM repo_pages
|
|
WHERE did = ?
|
|
`, did)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var pages []RepoPage
|
|
for rows.Next() {
|
|
var rp RepoPage
|
|
if err := rows.Scan(&rp.DID, &rp.Repository, &rp.Description, &rp.AvatarCID, &rp.CreatedAt, &rp.UpdatedAt); err != nil {
|
|
return nil, err
|
|
}
|
|
pages = append(pages, rp)
|
|
}
|
|
return pages, rows.Err()
|
|
}
|