Files
at-container-registry/pkg/appview/jetstream/processor_test.go
2025-12-18 11:19:49 -06:00

695 lines
20 KiB
Go

package jetstream
import (
"context"
"database/sql"
"encoding/json"
"testing"
"time"
"atcr.io/pkg/atproto"
_ "github.com/mattn/go-sqlite3"
)
// setupTestDB opens a fresh in-memory SQLite database and creates the tables
// the tests in this file inspect (users, manifests, repository_annotations,
// layers, manifest_references, tags, stars), then returns the handle.
//
// The handle is closed automatically through t.Cleanup when the calling test
// or subtest finishes, including when schema creation fails. As a result it is
// never leaked. Callers that also `defer database.Close()` are fine, because
// closing an already-closed *sql.DB returns nil.
//
// NOTE(review): SQLite gives every connection opened on ":memory:" its own,
// separate database. The schema created here therefore exists only on the
// pooled connection that ran it. These tests presumably rely on the processor
// never needing two connections at once. If a test starts reporting
// "no such table", check for a second concurrent connection.
func setupTestDB(t *testing.T) *sql.DB {
	t.Helper()

	database, err := sql.Open("sqlite3", ":memory:")
	if err != nil {
		t.Fatalf("Failed to open test database: %v", err)
	}
	// Register the close before anything else can fail, so the handle is
	// released even if schema creation aborts the test. Close is best-effort
	// here; there is nothing useful to do with its error during cleanup.
	t.Cleanup(func() { _ = database.Close() })

	// Create schema
	schema := `
CREATE TABLE users (
did TEXT PRIMARY KEY,
handle TEXT NOT NULL,
pds_endpoint TEXT NOT NULL,
avatar TEXT,
last_seen TIMESTAMP NOT NULL
);
CREATE TABLE manifests (
id INTEGER PRIMARY KEY AUTOINCREMENT,
did TEXT NOT NULL,
repository TEXT NOT NULL,
digest TEXT NOT NULL,
hold_endpoint TEXT NOT NULL,
schema_version INTEGER NOT NULL,
media_type TEXT NOT NULL,
config_digest TEXT,
config_size INTEGER,
created_at TIMESTAMP NOT NULL,
UNIQUE(did, repository, digest)
);
CREATE TABLE repository_annotations (
did TEXT NOT NULL,
repository TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(did, repository, key),
FOREIGN KEY(did) REFERENCES users(did) ON DELETE CASCADE
);
CREATE TABLE layers (
manifest_id INTEGER NOT NULL,
digest TEXT NOT NULL,
size INTEGER NOT NULL,
media_type TEXT NOT NULL,
layer_index INTEGER NOT NULL,
PRIMARY KEY(manifest_id, layer_index)
);
CREATE TABLE manifest_references (
manifest_id INTEGER NOT NULL,
digest TEXT NOT NULL,
media_type TEXT NOT NULL,
size INTEGER NOT NULL,
platform_architecture TEXT,
platform_os TEXT,
platform_variant TEXT,
platform_os_version TEXT,
is_attestation BOOLEAN DEFAULT FALSE,
reference_index INTEGER NOT NULL,
PRIMARY KEY(manifest_id, reference_index)
);
CREATE TABLE tags (
id INTEGER PRIMARY KEY AUTOINCREMENT,
did TEXT NOT NULL,
repository TEXT NOT NULL,
tag TEXT NOT NULL,
digest TEXT NOT NULL,
created_at TIMESTAMP NOT NULL,
UNIQUE(did, repository, tag)
);
CREATE TABLE stars (
starrer_did TEXT NOT NULL,
owner_did TEXT NOT NULL,
repository TEXT NOT NULL,
created_at TIMESTAMP NOT NULL,
PRIMARY KEY(starrer_did, owner_did, repository)
);
`
	if _, err := database.Exec(schema); err != nil {
		t.Fatalf("Failed to create schema: %v", err)
	}
	return database
}
// TestNewProcessor checks that NewProcessor wires the database handle and the
// cache flag through, and allocates userCache only when caching is requested.
func TestNewProcessor(t *testing.T) {
	database := setupTestDB(t)
	defer database.Close()

	for _, tc := range []struct {
		name     string
		useCache bool
	}{
		{name: "with cache", useCache: true},
		{name: "without cache", useCache: false},
	} {
		t.Run(tc.name, func(t *testing.T) {
			proc := NewProcessor(database, tc.useCache)
			if proc == nil {
				t.Fatal("NewProcessor returned nil")
			}
			if proc.db != database {
				t.Error("Processor database not set correctly")
			}
			if proc.useCache != tc.useCache {
				t.Errorf("useCache = %v, want %v", proc.useCache, tc.useCache)
			}

			// The two failure modes are mutually exclusive, so at most one fires.
			switch hasCache := proc.userCache != nil; {
			case tc.useCache && !hasCache:
				t.Error("Cache enabled but userCache is nil")
			case !tc.useCache && hasCache:
				t.Error("Cache disabled but userCache is not nil")
			}
		})
	}
}
// TestProcessManifest_ImageManifest feeds a single-platform image manifest
// through ProcessManifest. It then checks the manifest row, two of its
// annotations, its layers, and the absence of child manifest references.
func TestProcessManifest_ImageManifest(t *testing.T) {
	database := setupTestDB(t)
	defer database.Close()

	const (
		ownerDID = "did:plc:test123"
		repoName = "test-app"
	)

	// countRows runs a COUNT(*) query and aborts the test with failFormat on error.
	countRows := func(failFormat, query string, args ...any) int {
		var n int
		if err := database.QueryRow(query, args...).Scan(&n); err != nil {
			t.Fatalf(failFormat, err)
		}
		return n
	}

	// annotation reads one repository_annotations value for the test repository.
	annotation := func(failFormat, key string) string {
		var v string
		row := database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?",
			ownerDID, repoName, key)
		if err := row.Scan(&v); err != nil {
			t.Fatalf(failFormat, err)
		}
		return v
	}

	proc := NewProcessor(database, false)

	// An image manifest with a config blob, two layers and several OCI annotations.
	record := &atproto.ManifestRecord{
		Repository:    repoName,
		Digest:        "sha256:abc123",
		MediaType:     "application/vnd.oci.image.manifest.v1+json",
		SchemaVersion: 2,
		HoldEndpoint:  "did:web:hold01.atcr.io",
		CreatedAt:     time.Now(),
		Config: &atproto.BlobReference{
			Digest: "sha256:config123",
			Size:   1234,
		},
		Layers: []atproto.BlobReference{
			{Digest: "sha256:layer1", Size: 5000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"},
			{Digest: "sha256:layer2", Size: 3000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"},
		},
		Annotations: map[string]string{
			"org.opencontainers.image.title":       "Test App",
			"org.opencontainers.image.description": "A test application",
			"org.opencontainers.image.source":      "https://github.com/test/app",
			"org.opencontainers.image.licenses":    "MIT",
			"io.atcr.icon":                         "https://example.com/icon.png",
		},
	}

	payload, err := json.Marshal(record)
	if err != nil {
		t.Fatalf("Failed to marshal manifest: %v", err)
	}

	manifestID, err := proc.ProcessManifest(context.Background(), ownerDID, payload)
	if err != nil {
		t.Fatalf("ProcessManifest failed: %v", err)
	}
	if manifestID == 0 {
		t.Error("Expected non-zero manifest ID")
	}

	// Exactly one manifest row for (did, repository, digest).
	if n := countRows("Failed to query manifests: %v",
		"SELECT COUNT(*) FROM manifests WHERE did = ? AND repository = ? AND digest = ?",
		ownerDID, repoName, "sha256:abc123"); n != 1 {
		t.Errorf("Expected 1 manifest, got %d", n)
	}

	// Annotations are stored per repository, keyed by annotation name.
	if title := annotation("Failed to query title annotation: %v", "org.opencontainers.image.title"); title != "Test App" {
		t.Errorf("title = %q, want %q", title, "Test App")
	}
	if source := annotation("Failed to query source annotation: %v", "org.opencontainers.image.source"); source != "https://github.com/test/app" {
		t.Errorf("source = %q, want %q", source, "https://github.com/test/app")
	}

	// An image manifest has its layers recorded...
	if n := countRows("Failed to query layers: %v",
		"SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID); n != 2 {
		t.Errorf("Expected 2 layers, got %d", n)
	}
	// ...and no child manifest references, which only an index would have.
	if n := countRows("Failed to query manifest_references: %v",
		"SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID); n != 0 {
		t.Errorf("Expected 0 manifest references, got %d", n)
	}
}
// TestProcessManifest_ManifestList feeds a two-platform image index through
// ProcessManifest. It checks that each child becomes a manifest_references
// row, that the first child's platform is stored, and that no layers are
// recorded for the index itself.
func TestProcessManifest_ManifestList(t *testing.T) {
	database := setupTestDB(t)
	defer database.Close()

	proc := NewProcessor(database, false)

	// A multi-arch index pointing at one amd64 and one arm64/v8 child manifest.
	index := &atproto.ManifestRecord{
		Repository:    "test-app",
		Digest:        "sha256:list123",
		MediaType:     "application/vnd.oci.image.index.v1+json",
		SchemaVersion: 2,
		HoldEndpoint:  "did:web:hold01.atcr.io",
		CreatedAt:     time.Now(),
		Manifests: []atproto.ManifestReference{
			{
				Digest:    "sha256:amd64manifest",
				MediaType: "application/vnd.oci.image.manifest.v1+json",
				Size:      1000,
				Platform:  &atproto.Platform{Architecture: "amd64", OS: "linux"},
			},
			{
				Digest:    "sha256:arm64manifest",
				MediaType: "application/vnd.oci.image.manifest.v1+json",
				Size:      1100,
				Platform:  &atproto.Platform{Architecture: "arm64", OS: "linux", Variant: "v8"},
			},
		},
	}

	payload, err := json.Marshal(index)
	if err != nil {
		t.Fatalf("Failed to marshal manifest: %v", err)
	}

	manifestID, err := proc.ProcessManifest(context.Background(), "did:plc:test123", payload)
	if err != nil {
		t.Fatalf("ProcessManifest failed: %v", err)
	}

	// countFor runs a COUNT(*) query scoped to manifestID, failing with failFormat on error.
	countFor := func(query, failFormat string) int {
		var total int
		if err := database.QueryRow(query, manifestID).Scan(&total); err != nil {
			t.Fatalf(failFormat, err)
		}
		return total
	}

	// Each child manifest becomes one manifest_references row.
	if refs := countFor("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?",
		"Failed to query manifest_references: %v"); refs != 2 {
		t.Errorf("Expected 2 manifest references, got %d", refs)
	}

	// The first reference keeps its platform description.
	var arch, platformOS string
	platformRow := database.QueryRow("SELECT platform_architecture, platform_os FROM manifest_references WHERE manifest_id = ? AND reference_index = 0", manifestID)
	if err := platformRow.Scan(&arch, &platformOS); err != nil {
		t.Fatalf("Failed to query platform info: %v", err)
	}
	if arch != "amd64" {
		t.Errorf("platform_architecture = %q, want %q", arch, "amd64")
	}
	if platformOS != "linux" {
		t.Errorf("platform_os = %q, want %q", platformOS, "linux")
	}

	// An index has no layers of its own.
	if layers := countFor("SELECT COUNT(*) FROM layers WHERE manifest_id = ?",
		"Failed to query layers: %v"); layers != 0 {
		t.Errorf("Expected 0 layers, got %d", layers)
	}
}
// TestProcessTag checks that ProcessTag inserts a tag row and that a second
// record for the same (did, repository, tag) overwrites the digest instead of
// adding another row.
func TestProcessTag(t *testing.T) {
	database := setupTestDB(t)
	defer database.Close()

	const ownerDID = "did:plc:test123"
	proc := NewProcessor(database, false)
	ctx := context.Background()

	// The record is mutated in place between the two deliveries (using the
	// ManifestDigest field for simplicity).
	tag := &atproto.TagRecord{
		Repository:     "test-app",
		Tag:            "latest",
		ManifestDigest: "sha256:abc123",
		UpdatedAt:      time.Now(),
	}

	// submit serialises the current tag record and hands it to ProcessTag.
	submit := func(failFormat string) {
		payload, err := json.Marshal(tag)
		if err != nil {
			t.Fatalf("Failed to marshal tag: %v", err)
		}
		if err := proc.ProcessTag(ctx, ownerDID, payload); err != nil {
			t.Fatalf(failFormat, err)
		}
	}

	// tagCount reports how many rows exist for test-app:latest.
	tagCount := func(failFormat string) int {
		var n int
		err := database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?",
			ownerDID, "test-app", "latest").Scan(&n)
		if err != nil {
			t.Fatalf(failFormat, err)
		}
		return n
	}

	// tagDigest returns the digest currently stored for test-app:latest.
	tagDigest := func(failFormat string) string {
		var d string
		err := database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?",
			ownerDID, "test-app", "latest").Scan(&d)
		if err != nil {
			t.Fatalf(failFormat, err)
		}
		return d
	}

	// First delivery inserts the tag.
	submit("ProcessTag failed: %v")
	if n := tagCount("Failed to query tags: %v"); n != 1 {
		t.Errorf("Expected 1 tag, got %d", n)
	}
	if d := tagDigest("Failed to query tag digest: %v"); d != "sha256:abc123" {
		t.Errorf("digest = %q, want %q", d, "sha256:abc123")
	}

	// Re-pointing the same tag at a new digest must update the existing row.
	tag.ManifestDigest = "sha256:newdigest"
	submit("ProcessTag (upsert) failed: %v")
	if d := tagDigest("Failed to query updated tag: %v"); d != "sha256:newdigest" {
		t.Errorf("digest = %q, want %q", d, "sha256:newdigest")
	}
	if n := tagCount("Failed to query tags after upsert: %v"); n != 1 {
		t.Errorf("Expected 1 tag after upsert, got %d", n)
	}
}
// TestProcessStar delivers the same star record twice. It checks that exactly
// one stars row exists after each delivery, so re-processing is idempotent.
func TestProcessStar(t *testing.T) {
	database := setupTestDB(t)
	defer database.Close()

	proc := NewProcessor(database, false)
	ctx := context.Background()

	star := &atproto.StarRecord{
		Subject: atproto.StarSubject{
			DID:        "did:plc:owner123",
			Repository: "test-app",
		},
		CreatedAt: time.Now(),
	}

	// Per-delivery failure messages. The second round is the re-delivery that
	// must leave the row count unchanged.
	rounds := []struct {
		processFail string
		queryFail   string
		countFail   string
	}{
		{"ProcessStar failed: %v", "Failed to query stars: %v", "Expected 1 star, got %d"},
		{"ProcessStar (upsert) failed: %v", "Failed to query stars after upsert: %v", "Expected 1 star after upsert, got %d"},
	}

	for _, round := range rounds {
		payload, err := json.Marshal(star)
		if err != nil {
			t.Fatalf("Failed to marshal star: %v", err)
		}
		if err := proc.ProcessStar(ctx, "did:plc:starrer123", payload); err != nil {
			t.Fatalf(round.processFail, err)
		}

		var stars int
		err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?",
			"did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&stars)
		if err != nil {
			t.Fatalf(round.queryFail, err)
		}
		if stars != 1 {
			t.Errorf(round.countFail, stars)
		}
	}
}
// TestProcessManifest_Duplicate processes an identical manifest record twice.
// It expects both calls to return the same ID and exactly one manifests row.
func TestProcessManifest_Duplicate(t *testing.T) {
	database := setupTestDB(t)
	defer database.Close()

	const ownerDID = "did:plc:test123"
	proc := NewProcessor(database, false)
	ctx := context.Background()

	payload, err := json.Marshal(&atproto.ManifestRecord{
		Repository:    "test-app",
		Digest:        "sha256:abc123",
		MediaType:     "application/vnd.oci.image.manifest.v1+json",
		SchemaVersion: 2,
		HoldEndpoint:  "did:web:hold01.atcr.io",
		CreatedAt:     time.Now(),
	})
	if err != nil {
		t.Fatalf("Failed to marshal manifest: %v", err)
	}

	// Deliver the identical payload twice.
	firstID, err := proc.ProcessManifest(ctx, ownerDID, payload)
	if err != nil {
		t.Fatalf("First ProcessManifest failed: %v", err)
	}
	secondID, err := proc.ProcessManifest(ctx, ownerDID, payload)
	if err != nil {
		t.Fatalf("Duplicate ProcessManifest failed: %v", err)
	}

	// The re-delivery should resolve to the row created by the first call.
	if firstID != secondID {
		t.Errorf("Duplicate manifest got different ID: %d vs %d", firstID, secondID)
	}

	var rows int
	row := database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND digest = ?",
		ownerDID, "sha256:abc123")
	if err := row.Scan(&rows); err != nil {
		t.Fatalf("Failed to query manifests: %v", err)
	}
	if rows != 1 {
		t.Errorf("Expected 1 manifest, got %d", rows)
	}
}
// TestProcessManifest_EmptyAnnotations checks that a manifest whose
// Annotations map is nil leaves no rows in repository_annotations.
func TestProcessManifest_EmptyAnnotations(t *testing.T) {
	database := setupTestDB(t)
	defer database.Close()

	proc := NewProcessor(database, false)

	// Annotations is deliberately left nil.
	record := &atproto.ManifestRecord{
		Repository:    "test-app",
		Digest:        "sha256:abc123",
		MediaType:     "application/vnd.oci.image.manifest.v1+json",
		SchemaVersion: 2,
		HoldEndpoint:  "did:web:hold01.atcr.io",
		CreatedAt:     time.Now(),
		Annotations:   nil,
	}

	payload, err := json.Marshal(record)
	if err != nil {
		t.Fatalf("Failed to marshal manifest: %v", err)
	}
	if _, err := proc.ProcessManifest(context.Background(), "did:plc:test123", payload); err != nil {
		t.Fatalf("ProcessManifest failed: %v", err)
	}

	var stored int
	row := database.QueryRow("SELECT COUNT(*) FROM repository_annotations WHERE did = ? AND repository = ?",
		"did:plc:test123", "test-app")
	if err := row.Scan(&stored); err != nil {
		t.Fatalf("Failed to query annotations: %v", err)
	}
	if stored != 0 {
		t.Errorf("Expected 0 annotations for nil annotations, got %d", stored)
	}
}
// TestProcessIdentity checks that ProcessIdentity rewrites users.handle for a
// known DID, tolerates an unknown DID, and always leaves the latest handle in
// place. Errors returned by ProcessIdentity are only logged. The test has no
// real identity directory, so cache invalidation is expected to fail; only
// the database update is verified.
func TestProcessIdentity(t *testing.T) {
	db := setupTestDB(t)
	defer db.Close()
	processor := NewProcessor(db, false)

	const (
		testDID    = "did:plc:alice123"
		testHandle = "alice.bsky.social"
		testPDS    = "https://bsky.social"
	)

	// Seed the user whose handle the identity events will change.
	if _, err := db.Exec(`
INSERT INTO users (did, handle, pds_endpoint, last_seen)
VALUES (?, ?, ?, ?)
`, testDID, testHandle, testPDS, time.Now()); err != nil {
		t.Fatalf("Failed to insert test user: %v", err)
	}

	// applyIdentity feeds one identity event to the processor and logs, rather
	// than fails on, any returned error.
	applyIdentity := func(did, handle, logFormat string) {
		if err := processor.ProcessIdentity(context.Background(), did, handle); err != nil {
			t.Logf(logFormat, err)
		}
	}

	// storedHandle reads the handle currently recorded for testDID.
	storedHandle := func(failFormat string) string {
		var handle string
		err := db.QueryRow(`
SELECT handle FROM users WHERE did = ?
`, testDID).Scan(&handle)
		if err != nil {
			t.Fatalf(failFormat, err)
		}
		return handle
	}

	// Test 1: a single handle change is persisted.
	const newHandle = "alice-new.bsky.social"
	applyIdentity(testDID, newHandle, "Expected cache invalidation error (no real directory): %v")
	if got := storedHandle("Failed to query updated user: %v"); got != newHandle {
		t.Errorf("Expected handle '%s', got '%s'", newHandle, got)
	}

	// Test 2: an event for an unknown DID should not error on the database
	// side (the UPDATE just affects 0 rows).
	applyIdentity("did:plc:nonexistent", "new.handle", "Expected cache invalidation error: %v")

	// Test 3: successive changes always leave the most recent handle stored.
	for _, handle := range []string{"alice1.bsky.social", "alice2.bsky.social", "alice3.bsky.social"} {
		applyIdentity(testDID, handle, "Expected cache invalidation error: %v")
		if got := storedHandle("Failed to query user after handle update: %v"); got != handle {
			t.Errorf("Expected handle '%s', got '%s'", handle, got)
		}
	}
}
// TestProcessAccount exercises ProcessAccount with deactivation, active and
// non-deactivated statuses. It checks that the user row is never deleted.
// Errors on deactivation events are only logged, because cache invalidation
// is expected to fail without a real identity directory. Errors on the
// active/suspended events fail the test.
func TestProcessAccount(t *testing.T) {
	db := setupTestDB(t)
	defer db.Close()
	processor := NewProcessor(db, false)
	ctx := context.Background()

	const (
		testDID    = "did:plc:bob456"
		testHandle = "bob.bsky.social"
		testPDS    = "https://bsky.social"
	)

	// Seed the user that the account events refer to.
	if _, err := db.Exec(`
INSERT INTO users (did, handle, pds_endpoint, last_seen)
VALUES (?, ?, ?, ?)
`, testDID, testHandle, testPDS, time.Now()); err != nil {
		t.Fatalf("Failed to insert test user: %v", err)
	}

	// userExists reports whether the seeded user row is still present.
	userExists := func(failFormat string) bool {
		var exists bool
		err := db.QueryRow(`
SELECT EXISTS(SELECT 1 FROM users WHERE did = ?)
`, testDID).Scan(&exists)
		if err != nil {
			t.Fatalf(failFormat, err)
		}
		return exists
	}

	// Test 1: deactivation does not delete the user row.
	if err := processor.ProcessAccount(ctx, testDID, false, "deactivated"); err != nil {
		t.Logf("Expected cache invalidation error (no real directory): %v", err)
	}
	if !userExists("Failed to check if user exists: %v") {
		t.Error("User should still exist after deactivation event (no deletion)")
	}

	// Test 2: active=true should be ignored without error.
	if err := processor.ProcessAccount(ctx, testDID, true, "active"); err != nil {
		t.Errorf("Expected no error for active account, got: %v", err)
	}

	// Test 3: a status other than "deactivated" should be ignored without error.
	if err := processor.ProcessAccount(ctx, testDID, false, "suspended"); err != nil {
		t.Errorf("Expected no error for non-deactivated status, got: %v", err)
	}

	// Test 4: deactivating an unknown DID.
	if err := processor.ProcessAccount(ctx, "did:plc:nonexistent", false, "deactivated"); err != nil {
		t.Logf("Expected cache invalidation error: %v", err)
	}

	// Test 5: repeated deactivations are idempotent.
	for attempt := 0; attempt < 3; attempt++ {
		if err := processor.ProcessAccount(ctx, testDID, false, "deactivated"); err != nil {
			t.Logf("Expected cache invalidation error on iteration %d: %v", attempt, err)
		}
	}
	if !userExists("Failed to check if user exists after multiple deactivations: %v") {
		t.Error("User should still exist after multiple deactivation events")
	}
}