Files
at-container-registry/pkg/appview/db/queries.go
2025-12-29 17:02:07 -06:00

1770 lines
53 KiB
Go

package db
import (
"database/sql"
"fmt"
"strings"
"time"
)
// BlobCDNURL builds the CDN address of an ATProto blob from the DID that owns
// it and the blob's CID.
// The helper is duplicated in this package on purpose: importing atproto from
// here would introduce a circular dependency.
func BlobCDNURL(did, cid string) string {
	const cdnURLFormat = "https://imgs.blue/%s/%s"
	return fmt.Sprintf(cdnURLFormat, did, cid)
}
// escapeLikePattern turns free-form user input into text that can be embedded
// in a SQL LIKE pattern used with ESCAPE '\'.
//
// It strips NUL bytes and the remaining ASCII control characters (tab, line
// feed and carriage return are kept), prefixes every backslash, '%' and '_'
// with a backslash so they match literally, and finally trims surrounding
// whitespace.
func escapeLikePattern(s string) string {
	// NUL bytes go first; they could truncate the statement in C-based engines
	// such as SQLite.
	withoutNUL := strings.ReplaceAll(s, "\x00", "")

	printable := strings.Map(func(r rune) rune {
		switch {
		case r == '\t', r == '\n', r == '\r':
			return r // whitespace controls are harmless and kept
		case r < 32:
			return -1 // every other control character is dropped
		default:
			return r
		}
	}, withoutNUL)

	// A Replacer rewrites the input in a single left-to-right pass, so the
	// backslashes it inserts in front of '%' and '_' are never escaped again.
	likeEscaper := strings.NewReplacer(`\`, `\\`, `%`, `\%`, `_`, `\_`)
	return strings.TrimSpace(likeEscaper.Replace(printable))
}
// GetRecentPushes returns one page of tag pushes ordered newest first, plus the
// total number of pushes that match the filter.
//
// userFilter, when non-empty, restricts the result to the user whose handle or
// DID equals it exactly. currentUserDID is only used to compute Push.IsStarred.
// limit and offset select the page.
//
// Only tags whose manifest row exists are returned (inner join on manifests);
// the total is counted over the same join so it never exceeds the number of
// rows that can actually be paged through.
func GetRecentPushes(db *sql.DB, limit, offset int, userFilter string, currentUserDID string) ([]Push, int, error) {
	query := `
SELECT
u.did,
u.handle,
t.repository,
t.tag,
t.digest,
COALESCE((SELECT value FROM repository_annotations WHERE did = u.did AND repository = t.repository AND key = 'org.opencontainers.image.title'), ''),
COALESCE((SELECT value FROM repository_annotations WHERE did = u.did AND repository = t.repository AND key = 'org.opencontainers.image.description'), ''),
COALESCE((SELECT value FROM repository_annotations WHERE did = u.did AND repository = t.repository AND key = 'io.atcr.icon'), ''),
COALESCE(rs.pull_count, 0),
COALESCE((SELECT COUNT(*) FROM stars WHERE owner_did = u.did AND repository = t.repository), 0),
COALESCE((SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = u.did AND repository = t.repository), 0),
t.created_at,
m.hold_endpoint,
COALESCE(rp.avatar_cid, '')
FROM tags t
JOIN users u ON t.did = u.did
JOIN manifests m ON t.did = m.did AND t.repository = m.repository AND t.digest = m.digest
LEFT JOIN repository_stats rs ON t.did = rs.did AND t.repository = rs.repository
LEFT JOIN repo_pages rp ON t.did = rp.did AND t.repository = rp.repository
`
	// The first placeholder belongs to the "starred by current user" subquery.
	args := []any{currentUserDID}
	if userFilter != "" {
		query += " WHERE u.handle = ? OR u.did = ?"
		args = append(args, userFilter, userFilter)
	}
	query += " ORDER BY t.created_at DESC LIMIT ? OFFSET ?"
	args = append(args, limit, offset)

	rows, err := db.Query(query, args...)
	if err != nil {
		return nil, 0, err
	}
	defer rows.Close()

	var pushes []Push
	for rows.Next() {
		var p Push
		var isStarredInt int
		var avatarCID string
		if err := rows.Scan(&p.DID, &p.Handle, &p.Repository, &p.Tag, &p.Digest, &p.Title, &p.Description, &p.IconURL, &p.PullCount, &p.StarCount, &isStarredInt, &p.CreatedAt, &p.HoldEndpoint, &avatarCID); err != nil {
			return nil, 0, err
		}
		p.IsStarred = isStarredInt > 0
		// Prefer repo page avatar over annotation icon
		if avatarCID != "" {
			p.IconURL = BlobCDNURL(p.DID, avatarCID)
		}
		pushes = append(pushes, p)
	}
	// Next also returns false when iteration fails; without this check a
	// truncated page would be reported as success.
	if err := rows.Err(); err != nil {
		return nil, 0, err
	}

	// Count over the same inner joins as the page query so the total matches
	// what pagination can reach.
	countQuery := "SELECT COUNT(*) FROM tags t JOIN users u ON t.did = u.did" +
		" JOIN manifests m ON t.did = m.did AND t.repository = m.repository AND t.digest = m.digest"
	countArgs := []any{}
	if userFilter != "" {
		countQuery += " WHERE u.handle = ? OR u.did = ?"
		countArgs = append(countArgs, userFilter, userFilter)
	}
	var total int
	if err := db.QueryRow(countQuery, countArgs...).Scan(&total); err != nil {
		return nil, 0, err
	}
	return pushes, total, nil
}
// SearchPushes returns one page of pushes matching query, newest first, plus
// the total number of matching tags.
//
// A push matches when the query is a substring of the owner's handle, of the
// repository name, or of any repository annotation value, or when the query
// equals the owner's DID exactly. LIKE wildcards in the query are escaped so
// they match literally. currentUserDID is only used to compute Push.IsStarred.
func SearchPushes(db *sql.DB, query string, limit, offset int, currentUserDID string) ([]Push, int, error) {
	// The DID comparison is an equality test, not a LIKE, so it must see the
	// text without LIKE escaping; otherwise DIDs containing '%', '_' or '\'
	// (for example did:web with a percent-encoded port) can never match.
	exactDID := strings.TrimSpace(query)

	// Escape LIKE wildcards so they're treated literally
	query = escapeLikePattern(query)
	// Substring pattern shared by all LIKE comparisons
	searchPattern := "%" + query + "%"

	sqlQuery := `
SELECT DISTINCT
u.did,
u.handle,
t.repository,
t.tag,
t.digest,
COALESCE((SELECT value FROM repository_annotations WHERE did = u.did AND repository = t.repository AND key = 'org.opencontainers.image.title'), ''),
COALESCE((SELECT value FROM repository_annotations WHERE did = u.did AND repository = t.repository AND key = 'org.opencontainers.image.description'), ''),
COALESCE((SELECT value FROM repository_annotations WHERE did = u.did AND repository = t.repository AND key = 'io.atcr.icon'), ''),
COALESCE(rs.pull_count, 0),
COALESCE((SELECT COUNT(*) FROM stars WHERE owner_did = u.did AND repository = t.repository), 0),
COALESCE((SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = u.did AND repository = t.repository), 0),
t.created_at,
m.hold_endpoint,
COALESCE(rp.avatar_cid, '')
FROM tags t
JOIN users u ON t.did = u.did
JOIN manifests m ON t.did = m.did AND t.repository = m.repository AND t.digest = m.digest
LEFT JOIN repository_stats rs ON t.did = rs.did AND t.repository = rs.repository
LEFT JOIN repo_pages rp ON t.did = rp.did AND t.repository = rp.repository
WHERE u.handle LIKE ? ESCAPE '\'
OR u.did = ?
OR t.repository LIKE ? ESCAPE '\'
OR EXISTS (
SELECT 1 FROM repository_annotations ra
WHERE ra.did = u.did AND ra.repository = t.repository
AND ra.value LIKE ? ESCAPE '\'
)
ORDER BY t.created_at DESC
LIMIT ? OFFSET ?
`
	rows, err := db.Query(sqlQuery, currentUserDID, searchPattern, exactDID, searchPattern, searchPattern, limit, offset)
	if err != nil {
		return nil, 0, err
	}
	defer rows.Close()

	var pushes []Push
	for rows.Next() {
		var p Push
		var isStarredInt int
		var avatarCID string
		if err := rows.Scan(&p.DID, &p.Handle, &p.Repository, &p.Tag, &p.Digest, &p.Title, &p.Description, &p.IconURL, &p.PullCount, &p.StarCount, &isStarredInt, &p.CreatedAt, &p.HoldEndpoint, &avatarCID); err != nil {
			return nil, 0, err
		}
		p.IsStarred = isStarredInt > 0
		// Prefer repo page avatar over annotation icon
		if avatarCID != "" {
			p.IconURL = BlobCDNURL(p.DID, avatarCID)
		}
		pushes = append(pushes, p)
	}
	// Surface iteration failures instead of returning a silently truncated page.
	if err := rows.Err(); err != nil {
		return nil, 0, err
	}

	// Total over the same predicate; tags are counted once each.
	countQuery := `
SELECT COUNT(DISTINCT t.id)
FROM tags t
JOIN users u ON t.did = u.did
JOIN manifests m ON t.did = m.did AND t.repository = m.repository AND t.digest = m.digest
WHERE u.handle LIKE ? ESCAPE '\'
OR u.did = ?
OR t.repository LIKE ? ESCAPE '\'
OR EXISTS (
SELECT 1 FROM repository_annotations ra
WHERE ra.did = u.did AND ra.repository = t.repository
AND ra.value LIKE ? ESCAPE '\'
)
`
	var total int
	if err := db.QueryRow(countQuery, searchPattern, exactDID, searchPattern, searchPattern).Scan(&total); err != nil {
		return nil, 0, err
	}
	return pushes, total, nil
}
// GetUserRepositories returns every repository owned by did, most recently
// pushed first, each populated with its tags, manifests, annotation-derived
// metadata and icon.
//
// The repository summary is read completely before any per-repository query is
// issued, so the function never needs more than one database connection at a
// time. A failure while looking up the repo page is tolerated (the annotation
// icon is kept); every other query or iteration error is returned.
func GetUserRepositories(db *sql.DB, did string) ([]Repository, error) {
	// Get repository summary
	rows, err := db.Query(`
SELECT
repository,
COUNT(DISTINCT tag) as tag_count,
COUNT(DISTINCT digest) as manifest_count,
MAX(created_at) as last_push
FROM (
SELECT repository, tag, digest, created_at FROM tags WHERE did = ?
UNION
SELECT repository, NULL, digest, created_at FROM manifests WHERE did = ?
)
GROUP BY repository
ORDER BY last_push DESC
`, did, did)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	// MAX(created_at) is scanned as text; these are the layouts tried, in order.
	timestampFormats := []string{
		time.RFC3339Nano,                      // 2006-01-02T15:04:05.999999999Z07:00
		"2006-01-02 15:04:05.999999999-07:00", // SQLite with microseconds and timezone
		"2006-01-02 15:04:05.999999999",       // SQLite with microseconds
		time.RFC3339,                          // 2006-01-02T15:04:05Z07:00
		"2006-01-02 15:04:05",                 // SQLite default
	}

	var repos []Repository
	for rows.Next() {
		var r Repository
		var lastPushStr string
		if err := rows.Scan(&r.Name, &r.TagCount, &r.ManifestCount, &lastPushStr); err != nil {
			return nil, err
		}
		// LastPush stays at its zero value when no layout matches.
		if lastPushStr != "" {
			for _, format := range timestampFormats {
				if t, err := time.Parse(format, lastPushStr); err == nil {
					r.LastPush = t
					break
				}
			}
		}
		repos = append(repos, r)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	// Next returned false without error, so database/sql has already closed the
	// summary cursor; the detail queries below do not overlap with it.

	for i := range repos {
		r := &repos[i]

		// Get tags for this repo
		tagRows, err := db.Query(`
SELECT id, tag, digest, created_at
FROM tags
WHERE did = ? AND repository = ?
ORDER BY created_at DESC
`, did, r.Name)
		if err != nil {
			return nil, err
		}
		for tagRows.Next() {
			var t Tag
			t.DID = did
			t.Repository = r.Name
			if err := tagRows.Scan(&t.ID, &t.Tag, &t.Digest, &t.CreatedAt); err != nil {
				tagRows.Close()
				return nil, err
			}
			r.Tags = append(r.Tags, t)
		}
		err = tagRows.Err()
		tagRows.Close()
		if err != nil {
			return nil, err
		}

		// Get manifests for this repo
		manifestRows, err := db.Query(`
SELECT id, digest, hold_endpoint, schema_version, media_type,
config_digest, config_size, created_at
FROM manifests
WHERE did = ? AND repository = ?
ORDER BY created_at DESC
`, did, r.Name)
		if err != nil {
			return nil, err
		}
		for manifestRows.Next() {
			var m Manifest
			m.DID = did
			m.Repository = r.Name
			if err := manifestRows.Scan(&m.ID, &m.Digest, &m.HoldEndpoint, &m.SchemaVersion,
				&m.MediaType, &m.ConfigDigest, &m.ConfigSize, &m.CreatedAt); err != nil {
				manifestRows.Close()
				return nil, err
			}
			r.Manifests = append(r.Manifests, m)
		}
		err = manifestRows.Err()
		manifestRows.Close()
		if err != nil {
			return nil, err
		}

		// Fetch repository-level annotations from annotations table
		annotations, err := GetRepositoryAnnotations(db, did, r.Name)
		if err != nil {
			return nil, err
		}
		r.Title = annotations["org.opencontainers.image.title"]
		r.Description = annotations["org.opencontainers.image.description"]
		r.SourceURL = annotations["org.opencontainers.image.source"]
		r.DocumentationURL = annotations["org.opencontainers.image.documentation"]
		r.Licenses = annotations["org.opencontainers.image.licenses"]
		r.IconURL = annotations["io.atcr.icon"]
		r.ReadmeURL = annotations["io.atcr.readme"]

		// Repo page avatar overrides the annotation icon. The lookup is
		// best-effort: on error the annotation icon is simply kept.
		repoPage, err := GetRepoPage(db, did, r.Name)
		if err == nil && repoPage != nil && repoPage.AvatarCID != "" {
			r.IconURL = BlobCDNURL(did, repoPage.AvatarCID)
		}
	}
	return repos, nil
}
// GetRepositoryMetadata returns the annotations stored for a repository as a
// key -> value map, convenient for templates and handlers.
// It delegates to GetRepositoryAnnotations and passes its result through
// unchanged.
func GetRepositoryMetadata(db *sql.DB, did string, repository string) (map[string]string, error) {
	metadata, err := GetRepositoryAnnotations(db, did, repository)
	return metadata, err
}
// GetUserByDID looks up a user row by DID.
// It returns (nil, nil) when no such user exists; a NULL avatar column is
// reported as an empty Avatar string.
func GetUserByDID(db *sql.DB, did string) (*User, error) {
	const selectByDID = `
SELECT did, handle, pds_endpoint, avatar, last_seen
FROM users
WHERE did = ?
`
	var (
		found     User
		avatarCol sql.NullString
	)
	row := db.QueryRow(selectByDID, did)
	switch err := row.Scan(&found.DID, &found.Handle, &found.PDSEndpoint, &avatarCol, &found.LastSeen); err {
	case nil:
		// row found, continue below
	case sql.ErrNoRows:
		return nil, nil
	default:
		return nil, err
	}
	// Copy the avatar only when the column was not NULL.
	if avatarCol.Valid {
		found.Avatar = avatarCol.String
	}
	return &found, nil
}
// GetUserByHandle looks up a user row by handle.
// It returns (nil, nil) when no such user exists; a NULL avatar column is
// reported as an empty Avatar string.
func GetUserByHandle(db *sql.DB, handle string) (*User, error) {
	const selectByHandle = `
SELECT did, handle, pds_endpoint, avatar, last_seen
FROM users
WHERE handle = ?
`
	var (
		found     User
		avatarCol sql.NullString
	)
	row := db.QueryRow(selectByHandle, handle)
	switch err := row.Scan(&found.DID, &found.Handle, &found.PDSEndpoint, &avatarCol, &found.LastSeen); err {
	case nil:
		// row found, continue below
	case sql.ErrNoRows:
		return nil, nil
	default:
		return nil, err
	}
	// Copy the avatar only when the column was not NULL.
	if avatarCol.Valid {
		found.Avatar = avatarCol.String
	}
	return &found, nil
}
// UpsertUser stores a user row keyed by DID. When the DID already exists, the
// handle, PDS endpoint, avatar and last-seen timestamp are all overwritten
// with the supplied values.
func UpsertUser(db *sql.DB, user *User) error {
	const upsertUserSQL = `
INSERT INTO users (did, handle, pds_endpoint, avatar, last_seen)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(did) DO UPDATE SET
handle = excluded.handle,
pds_endpoint = excluded.pds_endpoint,
avatar = excluded.avatar,
last_seen = excluded.last_seen
`
	if _, err := db.Exec(upsertUserSQL, user.DID, user.Handle, user.PDSEndpoint, user.Avatar, user.LastSeen); err != nil {
		return err
	}
	return nil
}
// UpsertUserIgnoreAvatar stores a user row keyed by DID like UpsertUser, except
// that an existing row keeps its current avatar: on conflict only handle, PDS
// endpoint and last-seen are updated. The supplied avatar is still written
// when the row is newly inserted.
// Use it when the avatar fetch failed and an existing avatar must not be
// replaced by an empty string.
func UpsertUserIgnoreAvatar(db *sql.DB, user *User) error {
	const upsertKeepAvatarSQL = `
INSERT INTO users (did, handle, pds_endpoint, avatar, last_seen)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(did) DO UPDATE SET
handle = excluded.handle,
pds_endpoint = excluded.pds_endpoint,
last_seen = excluded.last_seen
`
	if _, err := db.Exec(upsertKeepAvatarSQL, user.DID, user.Handle, user.PDSEndpoint, user.Avatar, user.LastSeen); err != nil {
		return err
	}
	return nil
}
// UpdateUserLastSeen sets last_seen to the current time for the user with the
// given DID and touches no other column.
// It is a plain UPDATE, so nothing is created when the DID is unknown.
func UpdateUserLastSeen(db *sql.DB, did string) error {
	seenAt := time.Now()
	if _, err := db.Exec(`
UPDATE users SET last_seen = ? WHERE did = ?
`, seenAt, did); err != nil {
		return err
	}
	return nil
}
// UpdateUserHandle records a new handle for the user with the given DID and
// refreshes last_seen to the current time.
// Intended for identity events received from Jetstream that announce a handle
// change.
func UpdateUserHandle(db *sql.DB, did string, newHandle string) error {
	changedAt := time.Now()
	if _, err := db.Exec(`
UPDATE users SET handle = ?, last_seen = ? WHERE did = ?
`, newHandle, changedAt, did); err != nil {
		return err
	}
	return nil
}
// GetManifestDigestsForDID lists the digest of every manifest row owned by did.
// The order is whatever the database returns; the slice is nil when the DID
// has no manifests.
func GetManifestDigestsForDID(db *sql.DB, did string) ([]string, error) {
	cursor, err := db.Query(`
SELECT digest FROM manifests WHERE did = ?
`, did)
	if err != nil {
		return nil, err
	}
	defer cursor.Close()

	var found []string
	for cursor.Next() {
		var digest string
		if scanErr := cursor.Scan(&digest); scanErr != nil {
			return nil, scanErr
		}
		found = append(found, digest)
	}
	// Err reports a failure that ended the iteration early.
	return found, cursor.Err()
}
// DeleteManifestsNotInList removes every manifest row of did whose digest is
// not listed in keepDigests. An empty keepDigests removes all manifests of the
// DID.
func DeleteManifestsNotInList(db *sql.DB, did string, keepDigests []string) error {
	keepCount := len(keepDigests)
	if keepCount == 0 {
		// Nothing to keep: wipe all manifests of this DID.
		_, err := db.Exec(`DELETE FROM manifests WHERE did = ?`, did)
		return err
	}

	// One "?" marker per digest, comma separated, for the NOT IN list.
	markers := strings.Repeat("?,", keepCount-1) + "?"

	// Bind order: the DID first, then the digests in the order given.
	params := make([]any, 0, keepCount+1)
	params = append(params, did)
	for _, digest := range keepDigests {
		params = append(params, digest)
	}

	stmt := fmt.Sprintf(`
DELETE FROM manifests
WHERE did = ? AND digest NOT IN (%s)
`, markers)
	_, err := db.Exec(stmt, params...)
	return err
}
// GetTagsForDID lists the (repository, tag) pair of every tag row owned by did.
// The order is whatever the database returns; the slice is nil when the DID
// has no tags.
func GetTagsForDID(db *sql.DB, did string) ([]struct{ Repository, Tag string }, error) {
	cursor, err := db.Query(`
SELECT repository, tag FROM tags WHERE did = ?
`, did)
	if err != nil {
		return nil, err
	}
	defer cursor.Close()

	var pairs []struct{ Repository, Tag string }
	for cursor.Next() {
		var repo, name string
		if scanErr := cursor.Scan(&repo, &name); scanErr != nil {
			return nil, scanErr
		}
		pairs = append(pairs, struct{ Repository, Tag string }{Repository: repo, Tag: name})
	}
	// Err reports a failure that ended the iteration early.
	return pairs, cursor.Err()
}
// DeleteTagsNotInList removes every tag row of did whose (repository, tag) pair
// is not listed in keepTags. An empty keepTags removes all tags of the DID.
//
// With a non-empty keep list the work runs in one transaction: current tags
// are read, the ones to drop are collected, and they are deleted by id. If
// reading the current tags fails part-way, nothing is deleted and the error is
// returned.
func DeleteTagsNotInList(db *sql.DB, did string, keepTags []struct{ Repository, Tag string }) error {
	if len(keepTags) == 0 {
		// No tags to keep - delete all for this DID
		_, err := db.Exec(`DELETE FROM tags WHERE did = ?`, did)
		return err
	}

	// Set of pairs to keep, so each row is checked with one lookup instead of
	// a scan over the whole keep list.
	keep := make(map[struct{ Repository, Tag string }]struct{}, len(keepTags))
	for _, pair := range keepTags {
		keep[pair] = struct{}{}
	}

	tx, err := db.Begin()
	if err != nil {
		return err
	}
	// Rolls back on every early return; after a successful Commit it is a
	// no-op whose error is deliberately ignored.
	defer tx.Rollback()

	// First, get all current tags
	rows, err := tx.Query(`SELECT id, repository, tag FROM tags WHERE did = ?`, did)
	if err != nil {
		return err
	}
	var toDelete []int64
	for rows.Next() {
		var id int64
		var pair struct{ Repository, Tag string }
		if err := rows.Scan(&id, &pair.Repository, &pair.Tag); err != nil {
			rows.Close()
			return err
		}
		if _, kept := keep[pair]; !kept {
			toDelete = append(toDelete, id)
		}
	}
	// A cursor failure must abort the operation: committing here would apply
	// a delete computed from an incomplete view of the table.
	iterErr := rows.Err()
	rows.Close()
	if iterErr != nil {
		return iterErr
	}

	// Delete tags not in keep list
	for _, id := range toDelete {
		if _, err := tx.Exec(`DELETE FROM tags WHERE id = ?`, id); err != nil {
			return err
		}
	}
	return tx.Commit()
}
// InsertManifest stores a manifest row and returns its id.
//
// The row is keyed by (did, repository, digest). When it already exists, the
// hold endpoint, schema version, media type, config digest and config size are
// overwritten; created_at is left as stored. The id is read back with a
// separate SELECT so it is correct for both the insert and the update case.
// Annotations are not handled here; they live in repository_annotations.
func InsertManifest(db *sql.DB, manifest *Manifest) (int64, error) {
	const upsertManifestSQL = `
INSERT INTO manifests
(did, repository, digest, hold_endpoint, schema_version, media_type,
config_digest, config_size, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(did, repository, digest) DO UPDATE SET
hold_endpoint = excluded.hold_endpoint,
schema_version = excluded.schema_version,
media_type = excluded.media_type,
config_digest = excluded.config_digest,
config_size = excluded.config_size
`
	const selectManifestIDSQL = `
SELECT id FROM manifests
WHERE did = ? AND repository = ? AND digest = ?
`
	if _, err := db.Exec(upsertManifestSQL,
		manifest.DID, manifest.Repository, manifest.Digest, manifest.HoldEndpoint,
		manifest.SchemaVersion, manifest.MediaType, manifest.ConfigDigest,
		manifest.ConfigSize, manifest.CreatedAt,
	); err != nil {
		return 0, err
	}

	// Look the row up by its natural key to obtain the id.
	var manifestID int64
	row := db.QueryRow(selectManifestIDSQL, manifest.DID, manifest.Repository, manifest.Digest)
	if err := row.Scan(&manifestID); err != nil {
		return 0, fmt.Errorf("failed to get manifest ID after upsert: %w", err)
	}
	return manifestID, nil
}
// InsertLayer adds one layer row (manifest id, digest, size, media type and
// position) to the layers table. It is a plain INSERT without conflict
// handling.
func InsertLayer(db *sql.DB, layer *Layer) error {
	const insertLayerSQL = `
INSERT INTO layers (manifest_id, digest, size, media_type, layer_index)
VALUES (?, ?, ?, ?, ?)
`
	if _, err := db.Exec(insertLayerSQL, layer.ManifestID, layer.Digest, layer.Size, layer.MediaType, layer.LayerIndex); err != nil {
		return err
	}
	return nil
}
// UpsertTag stores a tag row keyed by (did, repository, tag). When the tag
// already exists, its digest and created_at are replaced with the supplied
// values.
func UpsertTag(db *sql.DB, tag *Tag) error {
	const upsertTagSQL = `
INSERT INTO tags (did, repository, tag, digest, created_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(did, repository, tag) DO UPDATE SET
digest = excluded.digest,
created_at = excluded.created_at
`
	if _, err := db.Exec(upsertTagSQL, tag.DID, tag.Repository, tag.Tag, tag.Digest, tag.CreatedAt); err != nil {
		return err
	}
	return nil
}
// DeleteTag removes the tag row identified by (did, repository, tag).
// Deleting a tag that does not exist is not an error.
func DeleteTag(db *sql.DB, did, repository, tag string) error {
	const deleteTagSQL = `
DELETE FROM tags WHERE did = ? AND repository = ? AND tag = ?
`
	if _, err := db.Exec(deleteTagSQL, did, repository, tag); err != nil {
		return err
	}
	return nil
}
// GetTagsWithPlatforms returns the tags of a repository, newest first, each
// with the platforms listed in manifest_references for the tagged manifest.
//
// Platform rows exist only for multi-arch manifests (manifest lists), so
// single-arch tags come back with an empty, non-nil Platforms slice.
// References flagged as attestations are not listed as platforms; they only
// set HasAttestations. IsMultiArch is true when more than one platform
// remains. Iteration errors are returned rather than yielding a partial list.
func GetTagsWithPlatforms(db *sql.DB, did, repository string) ([]TagWithPlatforms, error) {
	rows, err := db.Query(`
SELECT
t.id,
t.did,
t.repository,
t.tag,
t.digest,
t.created_at,
m.media_type,
COALESCE(mr.platform_os, '') as platform_os,
COALESCE(mr.platform_architecture, '') as platform_architecture,
COALESCE(mr.platform_variant, '') as platform_variant,
COALESCE(mr.platform_os_version, '') as platform_os_version,
COALESCE(mr.is_attestation, 0) as is_attestation
FROM tags t
JOIN manifests m ON t.digest = m.digest AND t.did = m.did AND t.repository = m.repository
LEFT JOIN manifest_references mr ON m.id = mr.manifest_id
WHERE t.did = ? AND t.repository = ?
ORDER BY t.created_at DESC, mr.reference_index
`, did, repository)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	// The join yields one row per (tag, reference); fold them back into one
	// entry per tag while remembering the order tags first appeared in.
	byTag := make(map[string]*TagWithPlatforms)
	var order []string
	for rows.Next() {
		var t Tag
		// mediaType is selected by the query but not used beyond scanning.
		var mediaType, platformOS, platformArch, platformVariant, platformOSVersion string
		var isAttestation bool
		if err := rows.Scan(&t.ID, &t.DID, &t.Repository, &t.Tag, &t.Digest, &t.CreatedAt,
			&mediaType, &platformOS, &platformArch, &platformVariant, &platformOSVersion, &isAttestation); err != nil {
			return nil, err
		}

		entry, seen := byTag[t.Tag]
		if !seen {
			entry = &TagWithPlatforms{
				Tag:       t,
				Platforms: []PlatformInfo{},
			}
			byTag[t.Tag] = entry
			order = append(order, t.Tag)
		}

		// Attestation references are tracked but not shown as platforms.
		if isAttestation {
			entry.HasAttestations = true
			continue
		}

		// Both fields are empty when the LEFT JOIN found no reference row.
		if platformOS != "" || platformArch != "" {
			entry.Platforms = append(entry.Platforms, PlatformInfo{
				OS:           platformOS,
				Architecture: platformArch,
				Variant:      platformVariant,
				OSVersion:    platformOSVersion,
			})
		}
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}

	result := make([]TagWithPlatforms, 0, len(order))
	for _, name := range order {
		entry := byTag[name]
		entry.IsMultiArch = len(entry.Platforms) > 1
		result = append(result, *entry)
	}
	return result, nil
}
// DeleteManifest removes manifest rows for a DID.
// With a non-empty repository it deletes the single (did, repository, digest)
// row; with an empty repository it deletes every row matching (did, digest),
// which is used when the repository is unknown (e.g. Jetstream DELETE events).
// NOTE(review): only the manifests table is touched here; associated layers
// are presumably removed by a cascading foreign key - confirm in the schema.
func DeleteManifest(db *sql.DB, did, repository, digest string) error {
	if repository == "" {
		_, err := db.Exec(`DELETE FROM manifests WHERE did = ? AND digest = ?`, did, digest)
		return err
	}
	_, err := db.Exec(`DELETE FROM manifests WHERE did = ? AND repository = ? AND digest = ?`, did, repository, digest)
	return err
}
// GetManifest loads one manifest row by digest.
// The lookup uses the digest alone and has no ORDER BY, so when several rows
// share a digest the query does not define which one is returned. The Scan
// error, including sql.ErrNoRows for a missing digest, is returned unchanged.
// Annotations are not loaded here; they live in repository_annotations.
func GetManifest(db *sql.DB, digest string) (*Manifest, error) {
	row := db.QueryRow(`
SELECT id, did, repository, digest, hold_endpoint, schema_version,
media_type, config_digest, config_size, created_at
FROM manifests
WHERE digest = ?
`, digest)

	found := new(Manifest)
	if err := row.Scan(
		&found.ID, &found.DID, &found.Repository, &found.Digest, &found.HoldEndpoint,
		&found.SchemaVersion, &found.MediaType, &found.ConfigDigest, &found.ConfigSize,
		&found.CreatedAt,
	); err != nil {
		return nil, err
	}
	return found, nil
}
// GetNewestManifestForRepo loads the manifest with the latest created_at in
// the given repository. Backfill uses it so that annotations are taken from
// the most recent manifest.
// The Scan error, including sql.ErrNoRows for a repository without manifests,
// is returned unchanged.
func GetNewestManifestForRepo(db *sql.DB, did, repository string) (*Manifest, error) {
	row := db.QueryRow(`
SELECT id, did, repository, digest, hold_endpoint, schema_version, media_type,
config_digest, config_size, created_at
FROM manifests
WHERE did = ? AND repository = ?
ORDER BY created_at DESC
LIMIT 1
`, did, repository)

	newest := new(Manifest)
	if err := row.Scan(
		&newest.ID, &newest.DID, &newest.Repository, &newest.Digest,
		&newest.HoldEndpoint, &newest.SchemaVersion, &newest.MediaType,
		&newest.ConfigDigest, &newest.ConfigSize, &newest.CreatedAt,
	); err != nil {
		return nil, err
	}
	return newest, nil
}
// GetLatestHoldDIDForRepo returns the hold_endpoint value of the most recently
// created manifest in the repository.
// An empty string with a nil error means the repository has no manifests yet
// (the first-push case). The database is consulted instead of an in-memory
// cache to decide which hold serves blob operations.
func GetLatestHoldDIDForRepo(db *sql.DB, did, repository string) (string, error) {
	const latestHoldSQL = `
SELECT hold_endpoint
FROM manifests
WHERE did = ? AND repository = ?
ORDER BY created_at DESC
LIMIT 1
`
	var hold string
	switch err := db.QueryRow(latestHoldSQL, did, repository).Scan(&hold); err {
	case nil:
		return hold, nil
	case sql.ErrNoRows:
		// No manifests yet - first push.
		return "", nil
	default:
		return "", err
	}
}
// GetRepositoriesForDID lists the distinct repository names that have at least
// one manifest row for did. Backfill uses it to reconcile annotations for all
// repositories.
// The slice is nil when the DID has no manifests.
func GetRepositoriesForDID(db *sql.DB, did string) ([]string, error) {
	cursor, err := db.Query(`
SELECT DISTINCT repository
FROM manifests
WHERE did = ?
`, did)
	if err != nil {
		return nil, err
	}
	defer cursor.Close()

	var names []string
	for cursor.Next() {
		var name string
		if scanErr := cursor.Scan(&name); scanErr != nil {
			return nil, scanErr
		}
		names = append(names, name)
	}
	// Err reports a failure that ended the iteration early.
	return names, cursor.Err()
}
// GetLayersForManifest returns the layers recorded for a manifest, ordered by
// layer_index. The slice is nil when the manifest has no layer rows.
// An error raised while iterating the result is returned instead of a
// truncated layer list.
func GetLayersForManifest(db *sql.DB, manifestID int64) ([]Layer, error) {
	rows, err := db.Query(`
SELECT manifest_id, digest, size, media_type, layer_index
FROM layers
WHERE manifest_id = ?
ORDER BY layer_index
`, manifestID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var layers []Layer
	for rows.Next() {
		var l Layer
		if err := rows.Scan(&l.ManifestID, &l.Digest, &l.Size, &l.MediaType, &l.LayerIndex); err != nil {
			return nil, err
		}
		layers = append(layers, l)
	}
	// Next returns false on failure as well as at the end of the result set.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return layers, nil
}
// InsertManifestReference adds one child reference of a manifest list/index to
// manifest_references, including its platform fields, attestation flag and
// position. It is a plain INSERT without conflict handling.
func InsertManifestReference(db *sql.DB, ref *ManifestReference) error {
	const insertReferenceSQL = `
INSERT INTO manifest_references (manifest_id, digest, size, media_type,
platform_architecture, platform_os,
platform_variant, platform_os_version,
is_attestation, reference_index)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`
	if _, err := db.Exec(insertReferenceSQL,
		ref.ManifestID, ref.Digest, ref.Size, ref.MediaType,
		ref.PlatformArchitecture, ref.PlatformOS,
		ref.PlatformVariant, ref.PlatformOSVersion,
		ref.IsAttestation, ref.ReferenceIndex,
	); err != nil {
		return err
	}
	return nil
}
// GetManifestReferencesForManifest returns the child references of a manifest
// list/index, ordered by reference_index.
//
// NULL platform columns are reported as empty strings. The attestation flag
// written by InsertManifestReference is read back into IsAttestation (NULL is
// treated as false). An error raised while iterating the result is returned
// instead of a truncated list.
func GetManifestReferencesForManifest(db *sql.DB, manifestID int64) ([]ManifestReference, error) {
	rows, err := db.Query(`
SELECT manifest_id, digest, size, media_type,
platform_architecture, platform_os, platform_variant, platform_os_version,
COALESCE(is_attestation, 0),
reference_index
FROM manifest_references
WHERE manifest_id = ?
ORDER BY reference_index
`, manifestID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var refs []ManifestReference
	for rows.Next() {
		var r ManifestReference
		var arch, osName, variant, osVersion sql.NullString
		if err := rows.Scan(&r.ManifestID, &r.Digest, &r.Size, &r.MediaType,
			&arch, &osName, &variant, &osVersion,
			&r.IsAttestation,
			&r.ReferenceIndex); err != nil {
			return nil, err
		}
		// Convert nullable strings; NULL leaves the field empty.
		if arch.Valid {
			r.PlatformArchitecture = arch.String
		}
		if osName.Valid {
			r.PlatformOS = osName.String
		}
		if variant.Valid {
			r.PlatformVariant = variant.String
		}
		if osVersion.Valid {
			r.PlatformOSVersion = osVersion.String
		}
		refs = append(refs, r)
	}
	// Next returns false on failure as well as at the end of the result set.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return refs, nil
}
// GetTopLevelManifests returns one page of the "top-level" manifests of a
// repository, newest first: manifest lists/indexes, plus single-arch manifests
// that no manifest list in the repository references. Platform-specific
// children of manifest lists are filtered out.
//
// Each entry carries its tags. For manifest lists the platforms are loaded in
// a follow-up query per manifest; attestation references are not listed as
// platforms and only set HasAttestations, and PlatformCount is the number of
// listed platforms. Annotations are not loaded here; use
// GetRepositoryMetadata. Iteration errors on any cursor are returned.
func GetTopLevelManifests(db *sql.DB, did, repository string, limit, offset int) ([]ManifestWithMetadata, error) {
	rows, err := db.Query(`
WITH manifest_list_children AS (
-- Get all digests that are children of manifest lists
SELECT DISTINCT mr.digest
FROM manifest_references mr
JOIN manifests m ON mr.manifest_id = m.id
WHERE m.did = ? AND m.repository = ?
)
SELECT
m.id, m.did, m.repository, m.digest, m.media_type,
m.schema_version, m.created_at,
m.config_digest, m.config_size, m.hold_endpoint,
GROUP_CONCAT(DISTINCT t.tag) as tags,
COUNT(DISTINCT mr.digest) as platform_count
FROM manifests m
LEFT JOIN tags t ON m.digest = t.digest AND m.did = t.did AND m.repository = t.repository
LEFT JOIN manifest_references mr ON m.id = mr.manifest_id
WHERE m.did = ? AND m.repository = ?
AND (
-- Include manifest lists
m.media_type LIKE '%index%' OR m.media_type LIKE '%manifest.list%'
OR
-- Include single-arch NOT referenced by any list
m.digest NOT IN (SELECT digest FROM manifest_list_children WHERE digest IS NOT NULL)
)
GROUP BY m.id
ORDER BY m.created_at DESC
LIMIT ? OFFSET ?
`, did, repository, did, repository, limit, offset)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var manifests []ManifestWithMetadata
	for rows.Next() {
		var m ManifestWithMetadata
		var tags, configDigest sql.NullString
		var configSize sql.NullInt64
		if err := rows.Scan(
			&m.ID, &m.DID, &m.Repository, &m.Digest, &m.MediaType,
			&m.SchemaVersion, &m.CreatedAt,
			&configDigest, &configSize, &m.HoldEndpoint,
			&tags, &m.PlatformCount,
		); err != nil {
			return nil, err
		}
		// Set nullable fields
		if configDigest.Valid {
			m.ConfigDigest = configDigest.String
		}
		if configSize.Valid {
			m.ConfigSize = configSize.Int64
		}
		// GROUP_CONCAT joins tag names with commas; NULL means no tags.
		if tags.Valid && tags.String != "" {
			m.Tags = strings.Split(tags.String, ",")
		}
		// Same media-type test as the SQL filter above.
		m.IsManifestList = strings.Contains(m.MediaType, "index") || strings.Contains(m.MediaType, "manifest.list")
		manifests = append(manifests, m)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}

	// The main cursor is exhausted (and therefore closed) at this point, so
	// the per-manifest platform queries do not overlap with it.
	for i := range manifests {
		if !manifests[i].IsManifestList {
			continue
		}
		platformRows, err := db.Query(`
SELECT
mr.platform_os,
mr.platform_architecture,
mr.platform_variant,
mr.platform_os_version,
COALESCE(mr.is_attestation, 0) as is_attestation
FROM manifest_references mr
WHERE mr.manifest_id = ?
ORDER BY mr.reference_index
`, manifests[i].ID)
		if err != nil {
			return nil, err
		}
		manifests[i].Platforms = []PlatformInfo{}
		for platformRows.Next() {
			var p PlatformInfo
			var osName, arch, variant, osVersion sql.NullString
			var isAttestation bool
			if err := platformRows.Scan(&osName, &arch, &variant, &osVersion, &isAttestation); err != nil {
				platformRows.Close()
				return nil, err
			}
			// Attestation references are tracked but not shown as platforms.
			if isAttestation {
				manifests[i].HasAttestations = true
				continue
			}
			if osName.Valid {
				p.OS = osName.String
			}
			if arch.Valid {
				p.Architecture = arch.String
			}
			if variant.Valid {
				p.Variant = variant.String
			}
			if osVersion.Valid {
				p.OSVersion = osVersion.String
			}
			manifests[i].Platforms = append(manifests[i].Platforms, p)
		}
		iterErr := platformRows.Err()
		platformRows.Close()
		if iterErr != nil {
			return nil, iterErr
		}
		// Replaces the SQL count, which also includes attestation references.
		manifests[i].PlatformCount = len(manifests[i].Platforms)
	}
	return manifests, nil
}
// GetManifestDetail loads the manifest identified by (did, repository, digest)
// together with its tags and, for manifest lists, its platforms.
//
// It returns a "manifest not found" error when no row matches. Attestation
// references are not listed as platforms and only set HasAttestations;
// PlatformCount is the number of listed platforms. Annotations are not loaded
// here; use GetRepositoryMetadata. An error raised while iterating the
// platform rows is returned instead of a truncated platform list.
func GetManifestDetail(db *sql.DB, did, repository, digest string) (*ManifestWithMetadata, error) {
	// First, get the manifest and its tags
	var m ManifestWithMetadata
	var tags, configDigest sql.NullString
	var configSize sql.NullInt64
	err := db.QueryRow(`
SELECT
m.id, m.did, m.repository, m.digest, m.media_type,
m.schema_version, m.created_at,
m.config_digest, m.config_size, m.hold_endpoint,
GROUP_CONCAT(DISTINCT t.tag) as tags
FROM manifests m
LEFT JOIN tags t ON m.digest = t.digest AND m.did = t.did AND m.repository = t.repository
WHERE m.did = ? AND m.repository = ? AND m.digest = ?
GROUP BY m.id
`, did, repository, digest).Scan(
		&m.ID, &m.DID, &m.Repository, &m.Digest, &m.MediaType,
		&m.SchemaVersion, &m.CreatedAt,
		&configDigest, &configSize, &m.HoldEndpoint,
		&tags,
	)
	if err != nil {
		if err == sql.ErrNoRows {
			return nil, fmt.Errorf("manifest not found")
		}
		return nil, err
	}

	// Set nullable fields
	if configDigest.Valid {
		m.ConfigDigest = configDigest.String
	}
	if configSize.Valid {
		m.ConfigSize = configSize.Int64
	}
	// GROUP_CONCAT joins tag names with commas; NULL means no tags.
	if tags.Valid && tags.String != "" {
		m.Tags = strings.Split(tags.String, ",")
	}
	// Determine if manifest list
	m.IsManifestList = strings.Contains(m.MediaType, "index") || strings.Contains(m.MediaType, "manifest.list")
	if !m.IsManifestList {
		return &m, nil
	}

	// Manifest list: load platform details from its references.
	platforms, err := db.Query(`
SELECT
mr.platform_os,
mr.platform_architecture,
mr.platform_variant,
mr.platform_os_version,
COALESCE(mr.is_attestation, 0) as is_attestation
FROM manifest_references mr
WHERE mr.manifest_id = ?
ORDER BY mr.reference_index
`, m.ID)
	if err != nil {
		return nil, err
	}
	defer platforms.Close()

	m.Platforms = []PlatformInfo{}
	for platforms.Next() {
		var p PlatformInfo
		var osName, arch, variant, osVersion sql.NullString
		var isAttestation bool
		if err := platforms.Scan(&osName, &arch, &variant, &osVersion, &isAttestation); err != nil {
			return nil, err
		}
		// Attestation references are tracked but not shown as platforms.
		if isAttestation {
			m.HasAttestations = true
			continue
		}
		if osName.Valid {
			p.OS = osName.String
		}
		if arch.Valid {
			p.Architecture = arch.String
		}
		if variant.Valid {
			p.Variant = variant.String
		}
		if osVersion.Valid {
			p.OSVersion = osVersion.String
		}
		m.Platforms = append(m.Platforms, p)
	}
	// Next returns false on failure as well as at the end of the result set.
	if err := platforms.Err(); err != nil {
		return nil, err
	}
	m.PlatformCount = len(m.Platforms)
	return &m, nil
}
// GetFirehoseCursor reads the stored firehose cursor (row id 1 of
// firehose_cursor). It returns 0 with a nil error when no cursor has been
// stored yet.
func GetFirehoseCursor(db *sql.DB) (int64, error) {
	var position int64
	row := db.QueryRow("SELECT cursor FROM firehose_cursor WHERE id = 1")
	switch err := row.Scan(&position); err {
	case nil:
		return position, nil
	case sql.ErrNoRows:
		// Nothing stored yet.
		return 0, nil
	default:
		return position, err
	}
}
// UpdateFirehoseCursor stores cursor in the firehose_cursor row with id = 1,
// inserting the row if it is missing and overwriting cursor and updated_at
// otherwise. updated_at is set by the database via datetime('now').
func UpdateFirehoseCursor(db *sql.DB, cursor int64) error {
	const upsertCursor = `
INSERT INTO firehose_cursor (id, cursor, updated_at)
VALUES (1, ?, datetime('now'))
ON CONFLICT(id) DO UPDATE SET
cursor = excluded.cursor,
updated_at = excluded.updated_at
`
	if _, err := db.Exec(upsertCursor, cursor); err != nil {
		return err
	}
	return nil
}
// IsManifestTagged reports whether at least one row in the tags table matches
// the given did, repository and digest.
func IsManifestTagged(db *sql.DB, did, repository, digest string) (bool, error) {
	const countTags = `
SELECT COUNT(*) FROM tags
WHERE did = ? AND repository = ? AND digest = ?
`
	var matches int
	if err := db.QueryRow(countTags, did, repository, digest).Scan(&matches); err != nil {
		return false, err
	}
	return matches > 0, nil
}
// GetManifestTags returns the tag names recorded for the given did,
// repository and digest, ordered by tag. The result is nil when no tag
// matches.
func GetManifestTags(db *sql.DB, did, repository, digest string) ([]string, error) {
	const selectTags = `
SELECT tag FROM tags
WHERE did = ? AND repository = ? AND digest = ?
ORDER BY tag
`
	rows, err := db.Query(selectTags, did, repository, digest)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var names []string
	for rows.Next() {
		var name string
		if scanErr := rows.Scan(&name); scanErr != nil {
			return nil, scanErr
		}
		names = append(names, name)
	}
	// Surface any error that ended the iteration early.
	if iterErr := rows.Err(); iterErr != nil {
		return nil, iterErr
	}
	return names, nil
}
// BackfillState represents the backfill progress.
// Its fields correspond to the columns of the backfill_state row with id = 1
// that the backfill query helpers in this file read and write.
type BackfillState struct {
// StartCursor holds the value of the start_cursor column.
StartCursor int64
// CurrentCursor holds the value of the current_cursor column.
CurrentCursor int64
// Completed holds the value of the completed column.
Completed bool
// UpdatedAt is parsed from the updated_at column; it stays at the zero
// time when the stored text matches none of the accepted layouts.
UpdatedAt time.Time
}
// GetBackfillState loads the backfill_state row with id = 1.
//
// It returns (nil, nil) when the row does not exist. The updated_at column is
// read as text and parsed against a fixed list of layouts; if none of them
// match, UpdatedAt is left at its zero value and no error is reported.
func GetBackfillState(db *sql.DB) (*BackfillState, error) {
	var (
		state        BackfillState
		rawUpdatedAt string
	)
	row := db.QueryRow(`
SELECT start_cursor, current_cursor, completed, updated_at
FROM backfill_state
WHERE id = 1
`)
	switch err := row.Scan(&state.StartCursor, &state.CurrentCursor, &state.Completed, &rawUpdatedAt); {
	case err == sql.ErrNoRows:
		return nil, nil // No backfill state exists
	case err != nil:
		return nil, err
	}

	if rawUpdatedAt == "" {
		return &state, nil
	}

	// Try each accepted layout in order; the first one that parses wins.
	layouts := []string{
		time.RFC3339Nano,
		"2006-01-02 15:04:05.999999999-07:00",
		"2006-01-02 15:04:05.999999999",
		time.RFC3339,
		"2006-01-02 15:04:05",
	}
	for _, layout := range layouts {
		parsed, err := time.Parse(layout, rawUpdatedAt)
		if err != nil {
			continue
		}
		state.UpdatedAt = parsed
		break
	}
	return &state, nil
}
// UpsertBackfillState writes state into the backfill_state row with id = 1,
// inserting it if absent and otherwise overwriting start_cursor,
// current_cursor and completed. updated_at is set by the database via
// datetime('now'); state.UpdatedAt is not used.
func UpsertBackfillState(db *sql.DB, state *BackfillState) error {
	const upsertState = `
INSERT INTO backfill_state (id, start_cursor, current_cursor, completed, updated_at)
VALUES (1, ?, ?, ?, datetime('now'))
ON CONFLICT(id) DO UPDATE SET
start_cursor = excluded.start_cursor,
current_cursor = excluded.current_cursor,
completed = excluded.completed,
updated_at = excluded.updated_at
`
	if _, err := db.Exec(upsertState, state.StartCursor, state.CurrentCursor, state.Completed); err != nil {
		return err
	}
	return nil
}
// UpdateBackfillCursor sets current_cursor (and refreshes updated_at) on the
// backfill_state row with id = 1. It is a plain UPDATE, so it changes nothing
// when that row does not exist.
func UpdateBackfillCursor(db *sql.DB, cursor int64) error {
	const updateCursor = `
UPDATE backfill_state
SET current_cursor = ?, updated_at = datetime('now')
WHERE id = 1
`
	if _, err := db.Exec(updateCursor, cursor); err != nil {
		return err
	}
	return nil
}
// MarkBackfillCompleted sets completed = 1 (and refreshes updated_at) on the
// backfill_state row with id = 1. It is a plain UPDATE, so it changes nothing
// when that row does not exist.
func MarkBackfillCompleted(db *sql.DB) error {
	const markCompleted = `
UPDATE backfill_state
SET completed = 1, updated_at = datetime('now')
WHERE id = 1
`
	if _, err := db.Exec(markCompleted); err != nil {
		return err
	}
	return nil
}
// GetRepository fetches a specific repository for a user.
//
// It runs three queries against tags and manifests for (did, repository):
// a summary (distinct tag count, distinct digest count, latest created_at),
// the tag list and the manifest list, both ordered newest first. It then
// fills the descriptive fields from GetRepositoryAnnotations.
//
// Each result set is fully consumed and closed before the next query is
// issued, and iteration errors reported by Rows.Err are returned instead of
// being silently dropped. A NULL config_digest or config_size leaves the
// corresponding Manifest field at its zero value.
func GetRepository(db *sql.DB, did, repository string) (*Repository, error) {
	// Get repository summary
	var r Repository
	r.Name = repository

	var tagCount, manifestCount int
	// NOTE(review): MAX(created_at) is NULL when neither table has a matching
	// row, and scanning NULL into a string makes Scan fail, so an unknown
	// repository surfaces as an error here. Confirm callers rely on that
	// before changing the destination type.
	var lastPushStr string
	err := db.QueryRow(`
SELECT
COUNT(DISTINCT tag) as tag_count,
COUNT(DISTINCT digest) as manifest_count,
MAX(created_at) as last_push
FROM (
SELECT tag, digest, created_at FROM tags WHERE did = ? AND repository = ?
UNION
SELECT NULL, digest, created_at FROM manifests WHERE did = ? AND repository = ?
)
`, did, repository, did, repository).Scan(&tagCount, &manifestCount, &lastPushStr)
	if err != nil {
		return nil, err
	}
	r.TagCount = tagCount
	r.ManifestCount = manifestCount

	// Parse the timestamp string into time.Time; LastPush stays zero when no
	// layout matches.
	if lastPushStr != "" {
		formats := []string{
			time.RFC3339Nano,
			"2006-01-02 15:04:05.999999999-07:00",
			"2006-01-02 15:04:05.999999999",
			time.RFC3339,
			"2006-01-02 15:04:05",
		}
		for _, format := range formats {
			if t, err := time.Parse(format, lastPushStr); err == nil {
				r.LastPush = t
				break
			}
		}
	}

	// Get tags for this repo. The closure scopes the deferred Close so the
	// result set is released before the next query starts.
	loadTags := func() error {
		rows, err := db.Query(`
SELECT id, tag, digest, created_at
FROM tags
WHERE did = ? AND repository = ?
ORDER BY created_at DESC
`, did, repository)
		if err != nil {
			return err
		}
		defer rows.Close()

		for rows.Next() {
			var t Tag
			t.DID = did
			t.Repository = repository
			if err := rows.Scan(&t.ID, &t.Tag, &t.Digest, &t.CreatedAt); err != nil {
				return err
			}
			r.Tags = append(r.Tags, t)
		}
		return rows.Err()
	}
	if err := loadTags(); err != nil {
		return nil, err
	}

	// Get manifests for this repo.
	loadManifests := func() error {
		rows, err := db.Query(`
SELECT id, digest, hold_endpoint, schema_version, media_type,
config_digest, config_size, created_at
FROM manifests
WHERE did = ? AND repository = ?
ORDER BY created_at DESC
`, did, repository)
		if err != nil {
			return err
		}
		defer rows.Close()

		for rows.Next() {
			var m Manifest
			m.DID = did
			m.Repository = repository
			// config_digest and config_size may be NULL, so scan them
			// through nullable wrappers rather than straight into the struct.
			var configDigest sql.NullString
			var configSize sql.NullInt64
			if err := rows.Scan(&m.ID, &m.Digest, &m.HoldEndpoint, &m.SchemaVersion,
				&m.MediaType, &configDigest, &configSize, &m.CreatedAt); err != nil {
				return err
			}
			if configDigest.Valid {
				m.ConfigDigest = configDigest.String
			}
			if configSize.Valid {
				m.ConfigSize = configSize.Int64
			}
			r.Manifests = append(r.Manifests, m)
		}
		return rows.Err()
	}
	if err := loadManifests(); err != nil {
		return nil, err
	}

	// Fetch repository-level annotations from annotations table
	annotations, err := GetRepositoryAnnotations(db, did, repository)
	if err != nil {
		return nil, err
	}
	r.Title = annotations["org.opencontainers.image.title"]
	r.Description = annotations["org.opencontainers.image.description"]
	r.SourceURL = annotations["org.opencontainers.image.source"]
	r.DocumentationURL = annotations["org.opencontainers.image.documentation"]
	r.Licenses = annotations["org.opencontainers.image.licenses"]
	r.IconURL = annotations["io.atcr.icon"]
	r.ReadmeURL = annotations["io.atcr.readme"]
	return &r, nil
}
// GetRepositoryStats returns the stats for (did, repository).
//
// Pull and push counters and their timestamps come from repository_stats
// (defaulting to 0 / unset when no row exists); the star count is computed
// live from the stars table. LastPull and LastPush are set only when the
// stored value is non-NULL and parseTimestamp accepts it.
func GetRepositoryStats(db *sql.DB, did, repository string) (*RepositoryStats, error) {
	// The one-row placeholder subquery is LEFT JOINed with repository_stats
	// so that a result row is produced even without a stats record.
	const statsQuery = `
SELECT
COALESCE(rs.did, ?) as did,
COALESCE(rs.repository, ?) as repository,
(SELECT COUNT(*) FROM stars WHERE owner_did = ? AND repository = ?) as star_count,
COALESCE(rs.pull_count, 0) as pull_count,
rs.last_pull,
COALESCE(rs.push_count, 0) as push_count,
rs.last_push
FROM (SELECT ? as did, ? as repository) AS placeholder
LEFT JOIN repository_stats rs ON rs.did = ? AND rs.repository = ?
`
	var (
		stats       RepositoryStats
		lastPullStr sql.NullString
		lastPushStr sql.NullString
	)
	row := db.QueryRow(statsQuery, did, repository, did, repository, did, repository, did, repository)
	err := row.Scan(&stats.DID, &stats.Repository, &stats.StarCount, &stats.PullCount, &lastPullStr, &stats.PushCount, &lastPushStr)
	switch {
	case err == sql.ErrNoRows:
		// Return zero stats if no record exists yet
		return &RepositoryStats{DID: did, Repository: repository}, nil
	case err != nil:
		return nil, err
	}

	// Parse timestamps; values that fail to parse are left unset.
	if lastPullStr.Valid {
		if pulled, perr := parseTimestamp(lastPullStr.String); perr == nil {
			stats.LastPull = &pulled
		}
	}
	if lastPushStr.Valid {
		if pushed, perr := parseTimestamp(lastPushStr.String); perr == nil {
			stats.LastPush = &pushed
		}
	}
	return &stats, nil
}
// UpsertRepositoryStats writes every field of stats into repository_stats,
// inserting a row for (did, repository) or overwriting the star, pull and
// push columns of the existing one.
func UpsertRepositoryStats(db *sql.DB, stats *RepositoryStats) error {
	const upsertStats = `
INSERT INTO repository_stats (did, repository, star_count, pull_count, last_pull, push_count, last_push)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(did, repository) DO UPDATE SET
star_count = excluded.star_count,
pull_count = excluded.pull_count,
last_pull = excluded.last_pull,
push_count = excluded.push_count,
last_push = excluded.last_push
`
	if _, err := db.Exec(upsertStats,
		stats.DID, stats.Repository,
		stats.StarCount,
		stats.PullCount, stats.LastPull,
		stats.PushCount, stats.LastPush,
	); err != nil {
		return err
	}
	return nil
}
// IncrementStarCount adds one to star_count in repository_stats for
// (did, repository), creating the row with star_count = 1 when it is missing.
func IncrementStarCount(db *sql.DB, did, repository string) error {
	const bumpStars = `
INSERT INTO repository_stats (did, repository, star_count)
VALUES (?, ?, 1)
ON CONFLICT(did, repository) DO UPDATE SET
star_count = star_count + 1
`
	if _, err := db.Exec(bumpStars, did, repository); err != nil {
		return err
	}
	return nil
}
// DecrementStarCount subtracts one from star_count in repository_stats for
// (did, repository), clamping the result at 0. It is a plain UPDATE, so a
// missing row is left missing.
func DecrementStarCount(db *sql.DB, did, repository string) error {
	const dropStar = `
UPDATE repository_stats
SET star_count = MAX(0, star_count - 1)
WHERE did = ? AND repository = ?
`
	if _, err := db.Exec(dropStar, did, repository); err != nil {
		return err
	}
	return nil
}
// UpsertStar records that starrerDID starred ownerDID's repository. If the
// (starrer_did, owner_did, repository) row already exists only its created_at
// is overwritten, so repeating the call leaves a single row.
func UpsertStar(db *sql.DB, starrerDID, ownerDID, repository string, createdAt time.Time) error {
	const upsertStar = `
INSERT INTO stars (starrer_did, owner_did, repository, created_at)
VALUES (?, ?, ?, ?)
ON CONFLICT(starrer_did, owner_did, repository) DO UPDATE SET
created_at = excluded.created_at
`
	if _, err := db.Exec(upsertStar, starrerDID, ownerDID, repository, createdAt); err != nil {
		return err
	}
	return nil
}
// DeleteStar removes the stars row identified by starrerDID, ownerDID and
// repository. The number of affected rows is not inspected, so deleting a
// star that does not exist is not reported as an error.
func DeleteStar(db *sql.DB, starrerDID, ownerDID, repository string) error {
	const deleteStar = `
DELETE FROM stars
WHERE starrer_did = ? AND owner_did = ? AND repository = ?
`
	if _, err := db.Exec(deleteStar, starrerDID, ownerDID, repository); err != nil {
		return err
	}
	return nil
}
// RebuildStarCount recomputes star_count in repository_stats for
// (ownerDID, repository) by counting the matching rows in the stars table,
// inserting the stats row if it does not exist yet.
func RebuildStarCount(db *sql.DB, ownerDID, repository string) error {
	const rebuild = `
INSERT INTO repository_stats (did, repository, star_count)
VALUES (?, ?, (
SELECT COUNT(*) FROM stars
WHERE owner_did = ? AND repository = ?
))
ON CONFLICT(did, repository) DO UPDATE SET
star_count = (
SELECT COUNT(*) FROM stars
WHERE owner_did = ? AND repository = ?
)
`
	// The (ownerDID, repository) pair is bound three times: once for the
	// inserted key and once for each counting subquery.
	if _, err := db.Exec(rebuild,
		ownerDID, repository,
		ownerDID, repository,
		ownerDID, repository,
	); err != nil {
		return err
	}
	return nil
}
// GetStarsForDID returns all stars created by starrerDID (for backfill
// reconciliation). The map is keyed by "ownerDID/repository" and maps to the
// star's created_at value; it is non-nil even when there are no stars.
func GetStarsForDID(db *sql.DB, starrerDID string) (map[string]time.Time, error) {
	const selectStars = `
SELECT owner_did, repository, created_at
FROM stars
WHERE starrer_did = ?
`
	rows, err := db.Query(selectStars, starrerDID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	byKey := make(map[string]time.Time)
	for rows.Next() {
		var (
			owner, repo string
			starredAt   time.Time
		)
		if scanErr := rows.Scan(&owner, &repo, &starredAt); scanErr != nil {
			return nil, scanErr
		}
		byKey[fmt.Sprintf("%s/%s", owner, repo)] = starredAt
	}
	return byKey, rows.Err()
}
// CleanupOrphanedTags deletes the tags belonging to did for which no
// manifests row with the same did and digest exists. This handles cases where
// manifests were deleted but tags pointing to them remain.
//
// NOTE(review): the existence check matches on did and digest only, not on
// repository. Confirm that a manifest in a different repository of the same
// did is meant to keep a tag alive.
func CleanupOrphanedTags(db *sql.DB, did string) error {
	const deleteOrphans = `
DELETE FROM tags
WHERE did = ?
AND NOT EXISTS (
SELECT 1 FROM manifests
WHERE manifests.did = tags.did
AND manifests.digest = tags.digest
)
`
	if _, err := db.Exec(deleteOrphans, did); err != nil {
		return err
	}
	return nil
}
// DeleteStarsNotInList deletes every star of starrerDID that is stored in the
// database but whose "ownerDID/repository" key is absent from foundStars.
// This is used during backfill reconciliation to remove stars that no longer
// exist on PDS.
//
// Deletions are issued one by one; the first failure aborts the loop and is
// returned wrapped, leaving earlier deletions in place.
func DeleteStarsNotInList(db *sql.DB, starrerDID string, foundStars map[string]time.Time) error {
	stored, err := GetStarsForDID(db, starrerDID)
	if err != nil {
		return fmt.Errorf("failed to get current stars: %w", err)
	}

	for key := range stored {
		if _, stillStarred := foundStars[key]; stillStarred {
			continue
		}
		// Keys have the form "ownerDID/repository"; split on the first
		// slash only so repository names containing slashes stay intact.
		parts := strings.SplitN(key, "/", 2)
		if len(parts) != 2 {
			continue
		}
		if err := DeleteStar(db, starrerDID, parts[0], parts[1]); err != nil {
			return fmt.Errorf("failed to delete star: %w", err)
		}
	}
	return nil
}
// IncrementPullCount adds one to pull_count and sets last_pull to the
// database's current time for (did, repository), creating the
// repository_stats row with pull_count = 1 when it is missing.
func IncrementPullCount(db *sql.DB, did, repository string) error {
	const bumpPulls = `
INSERT INTO repository_stats (did, repository, pull_count, last_pull)
VALUES (?, ?, 1, datetime('now'))
ON CONFLICT(did, repository) DO UPDATE SET
pull_count = pull_count + 1,
last_pull = datetime('now')
`
	if _, err := db.Exec(bumpPulls, did, repository); err != nil {
		return err
	}
	return nil
}
// IncrementPushCount adds one to push_count and sets last_push to the
// database's current time for (did, repository), creating the
// repository_stats row with push_count = 1 when it is missing.
func IncrementPushCount(db *sql.DB, did, repository string) error {
	const bumpPushes = `
INSERT INTO repository_stats (did, repository, push_count, last_push)
VALUES (?, ?, 1, datetime('now'))
ON CONFLICT(did, repository) DO UPDATE SET
push_count = push_count + 1,
last_push = datetime('now')
`
	if _, err := db.Exec(bumpPushes, did, repository); err != nil {
		return err
	}
	return nil
}
// parseTimestamp converts s into a time.Time by trying a fixed list of
// layouts in order and returning the first successful parse. It returns an
// error naming the input when no layout matches.
func parseTimestamp(s string) (time.Time, error) {
	layouts := [...]string{
		time.RFC3339Nano,
		"2006-01-02 15:04:05.999999999-07:00",
		"2006-01-02 15:04:05.999999999",
		time.RFC3339,
		"2006-01-02 15:04:05",
	}
	for i := range layouts {
		parsed, err := time.Parse(layouts[i], s)
		if err != nil {
			continue
		}
		return parsed, nil
	}
	return time.Time{}, fmt.Errorf("unable to parse timestamp: %s", s)
}
// GetFeaturedRepositories fetches top repositories sorted by stars and pulls.
//
// One row is produced per (did, repository), represented by the manifest with
// the highest id. Repositories are ranked by score = pull_count +
// 10 * star_count, with star count, pull count and manifest creation time as
// tie-breakers, and at most limit rows are returned. IsStarred reflects
// whether currentUserDID has a stars row for the repository. When the repo
// page has an avatar CID, the icon URL is built from it with BlobCDNURL and
// replaces the annotation icon.
//
// Errors from the query, from scanning a row, or from the row iteration
// itself are returned; no partial list is returned alongside an error.
func GetFeaturedRepositories(db *sql.DB, limit int, currentUserDID string) ([]FeaturedRepository, error) {
	query := `
WITH latest_manifests AS (
SELECT did, repository, MAX(id) as latest_id
FROM manifests
GROUP BY did, repository
),
repo_stats AS (
SELECT
lm.did,
lm.repository,
COALESCE(rs.pull_count, 0) as pull_count,
COALESCE((SELECT COUNT(*) FROM stars WHERE owner_did = lm.did AND repository = lm.repository), 0) as star_count,
(COALESCE(rs.pull_count, 0) + COALESCE((SELECT COUNT(*) FROM stars WHERE owner_did = lm.did AND repository = lm.repository), 0) * 10) as score
FROM latest_manifests lm
LEFT JOIN repository_stats rs ON lm.did = rs.did AND lm.repository = rs.repository
)
SELECT
m.did,
u.handle,
m.repository,
COALESCE((SELECT value FROM repository_annotations WHERE did = m.did AND repository = m.repository AND key = 'org.opencontainers.image.title'), ''),
COALESCE((SELECT value FROM repository_annotations WHERE did = m.did AND repository = m.repository AND key = 'org.opencontainers.image.description'), ''),
COALESCE((SELECT value FROM repository_annotations WHERE did = m.did AND repository = m.repository AND key = 'io.atcr.icon'), ''),
rs.pull_count,
rs.star_count,
COALESCE((SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = m.did AND repository = m.repository), 0),
COALESCE(rp.avatar_cid, '')
FROM latest_manifests lm
JOIN manifests m ON lm.latest_id = m.id
JOIN users u ON m.did = u.did
JOIN repo_stats rs ON m.did = rs.did AND m.repository = rs.repository
LEFT JOIN repo_pages rp ON m.did = rp.did AND m.repository = rp.repository
ORDER BY rs.score DESC, rs.star_count DESC, rs.pull_count DESC, m.created_at DESC
LIMIT ?
`
	rows, err := db.Query(query, currentUserDID, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var featured []FeaturedRepository
	for rows.Next() {
		var f FeaturedRepository
		var isStarredInt int
		var avatarCID string
		if err := rows.Scan(&f.OwnerDID, &f.OwnerHandle, &f.Repository,
			&f.Title, &f.Description, &f.IconURL, &f.PullCount, &f.StarCount, &isStarredInt, &avatarCID); err != nil {
			return nil, err
		}
		f.IsStarred = isStarredInt > 0
		// Prefer repo page avatar over annotation icon
		if avatarCID != "" {
			f.IconURL = BlobCDNURL(f.OwnerDID, avatarCID)
		}
		featured = append(featured, f)
	}
	// Next returns false both at the end of the result set and on failure;
	// Err distinguishes the two so a truncated list is never reported as
	// success.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return featured, nil
}
// RepoPage represents a repository page record cached from PDS.
// Its fields correspond to the columns of the repo_pages table, which is
// keyed by (did, repository).
type RepoPage struct {
// DID holds the did column.
DID string
// Repository holds the repository column.
Repository string
// Description holds the description column.
Description string
// AvatarCID holds the avatar_cid column; a non-empty value is turned into
// an icon URL with BlobCDNURL when listing featured repositories.
AvatarCID string
// CreatedAt holds the created_at column.
CreatedAt time.Time
// UpdatedAt holds the updated_at column.
UpdatedAt time.Time
}
// UpsertRepoPage stores a repo page for (did, repository). A new row gets all
// supplied values; an existing row has its description, avatar_cid and
// updated_at overwritten while its created_at is left untouched.
func UpsertRepoPage(db *sql.DB, did, repository, description, avatarCID string, createdAt, updatedAt time.Time) error {
	const upsertPage = `
INSERT INTO repo_pages (did, repository, description, avatar_cid, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(did, repository) DO UPDATE SET
description = excluded.description,
avatar_cid = excluded.avatar_cid,
updated_at = excluded.updated_at
`
	if _, err := db.Exec(upsertPage, did, repository, description, avatarCID, createdAt, updatedAt); err != nil {
		return err
	}
	return nil
}
// GetRepoPage loads the repo_pages row for (did, repository). Any Scan error,
// including sql.ErrNoRows when no row matches, is returned unchanged together
// with a nil page.
func GetRepoPage(db *sql.DB, did, repository string) (*RepoPage, error) {
	const selectPage = `
SELECT did, repository, description, avatar_cid, created_at, updated_at
FROM repo_pages
WHERE did = ? AND repository = ?
`
	page := new(RepoPage)
	row := db.QueryRow(selectPage, did, repository)
	if err := row.Scan(&page.DID, &page.Repository, &page.Description, &page.AvatarCID, &page.CreatedAt, &page.UpdatedAt); err != nil {
		return nil, err
	}
	return page, nil
}
// DeleteRepoPage removes the repo_pages row for (did, repository). The number
// of affected rows is not inspected, so deleting a missing page is not
// reported as an error.
func DeleteRepoPage(db *sql.DB, did, repository string) error {
	const deletePage = `
DELETE FROM repo_pages WHERE did = ? AND repository = ?
`
	if _, err := db.Exec(deletePage, did, repository); err != nil {
		return err
	}
	return nil
}
// GetRepoPagesByDID returns every repo_pages row whose did column equals did.
// The slice is nil when there are none. The query has no ORDER BY, so the
// order of the result is whatever the database yields.
func GetRepoPagesByDID(db *sql.DB, did string) ([]RepoPage, error) {
	const selectPages = `
SELECT did, repository, description, avatar_cid, created_at, updated_at
FROM repo_pages
WHERE did = ?
`
	rows, err := db.Query(selectPages, did)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var found []RepoPage
	for rows.Next() {
		var page RepoPage
		scanErr := rows.Scan(&page.DID, &page.Repository, &page.Description, &page.AvatarCID, &page.CreatedAt, &page.UpdatedAt)
		if scanErr != nil {
			return nil, scanErr
		}
		found = append(found, page)
	}
	return found, rows.Err()
}