package jetstream import ( "context" "database/sql" "encoding/json" "testing" "time" "atcr.io/pkg/atproto" _ "github.com/mattn/go-sqlite3" ) // setupTestDB creates an in-memory SQLite database for testing func setupTestDB(t *testing.T) *sql.DB { database, err := sql.Open("sqlite3", ":memory:") if err != nil { t.Fatalf("Failed to open test database: %v", err) } // Create schema schema := ` CREATE TABLE users ( did TEXT PRIMARY KEY, handle TEXT NOT NULL, pds_endpoint TEXT NOT NULL, avatar TEXT, last_seen TIMESTAMP NOT NULL ); CREATE TABLE manifests ( id INTEGER PRIMARY KEY AUTOINCREMENT, did TEXT NOT NULL, repository TEXT NOT NULL, digest TEXT NOT NULL, hold_endpoint TEXT NOT NULL, schema_version INTEGER NOT NULL, media_type TEXT NOT NULL, config_digest TEXT, config_size INTEGER, created_at TIMESTAMP NOT NULL, UNIQUE(did, repository, digest) ); CREATE TABLE repository_annotations ( did TEXT NOT NULL, repository TEXT NOT NULL, key TEXT NOT NULL, value TEXT NOT NULL, updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY(did, repository, key), FOREIGN KEY(did) REFERENCES users(did) ON DELETE CASCADE ); CREATE TABLE layers ( manifest_id INTEGER NOT NULL, digest TEXT NOT NULL, size INTEGER NOT NULL, media_type TEXT NOT NULL, layer_index INTEGER NOT NULL, PRIMARY KEY(manifest_id, layer_index) ); CREATE TABLE manifest_references ( manifest_id INTEGER NOT NULL, digest TEXT NOT NULL, media_type TEXT NOT NULL, size INTEGER NOT NULL, platform_architecture TEXT, platform_os TEXT, platform_variant TEXT, platform_os_version TEXT, is_attestation BOOLEAN DEFAULT FALSE, reference_index INTEGER NOT NULL, PRIMARY KEY(manifest_id, reference_index) ); CREATE TABLE tags ( id INTEGER PRIMARY KEY AUTOINCREMENT, did TEXT NOT NULL, repository TEXT NOT NULL, tag TEXT NOT NULL, digest TEXT NOT NULL, created_at TIMESTAMP NOT NULL, UNIQUE(did, repository, tag) ); CREATE TABLE stars ( starrer_did TEXT NOT NULL, owner_did TEXT NOT NULL, repository TEXT NOT NULL, created_at TIMESTAMP NOT NULL, PRIMARY KEY(starrer_did, owner_did, repository) ); ` if _, err := database.Exec(schema); err != nil { t.Fatalf("Failed to create schema: %v", err) } return database } func TestNewProcessor(t *testing.T) { database := setupTestDB(t) defer database.Close() tests := []struct { name string useCache bool }{ {"with cache", true}, {"without cache", false}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { p := NewProcessor(database, tt.useCache) if p == nil { t.Fatal("NewProcessor returned nil") } if p.db != database { t.Error("Processor database not set correctly") } if p.useCache != tt.useCache { t.Errorf("useCache = %v, want %v", p.useCache, tt.useCache) } if tt.useCache && p.userCache == nil { t.Error("Cache enabled but userCache is nil") } if !tt.useCache && p.userCache != nil { t.Error("Cache disabled but userCache is not nil") } }) } } func TestProcessManifest_ImageManifest(t *testing.T) { database := setupTestDB(t) defer database.Close() p := NewProcessor(database, false) ctx := context.Background() // Create test manifest record manifestRecord := &atproto.ManifestRecord{ Repository: "test-app", Digest: "sha256:abc123", MediaType: "application/vnd.oci.image.manifest.v1+json", SchemaVersion: 2, HoldEndpoint: "did:web:hold01.atcr.io", CreatedAt: time.Now(), Config: &atproto.BlobReference{ Digest: "sha256:config123", Size: 1234, }, Layers: []atproto.BlobReference{ {Digest: "sha256:layer1", Size: 5000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"}, {Digest: "sha256:layer2", Size: 3000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"}, }, Annotations: map[string]string{ "org.opencontainers.image.title": "Test App", "org.opencontainers.image.description": "A test application", "org.opencontainers.image.source": "https://github.com/test/app", "org.opencontainers.image.licenses": "MIT", "io.atcr.icon": "https://example.com/icon.png", }, } // Marshal to bytes for ProcessManifest recordBytes, err := json.Marshal(manifestRecord) if err != nil { t.Fatalf("Failed to marshal manifest: %v", err) } // Process manifest manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) if err != nil { t.Fatalf("ProcessManifest failed: %v", err) } if manifestID == 0 { t.Error("Expected non-zero manifest ID") } // Verify manifest was inserted var count int err = database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND repository = ? AND digest = ?", "did:plc:test123", "test-app", "sha256:abc123").Scan(&count) if err != nil { t.Fatalf("Failed to query manifests: %v", err) } if count != 1 { t.Errorf("Expected 1 manifest, got %d", count) } // Verify annotations were stored in repository_annotations table var title, source string err = database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?", "did:plc:test123", "test-app", "org.opencontainers.image.title").Scan(&title) if err != nil { t.Fatalf("Failed to query title annotation: %v", err) } if title != "Test App" { t.Errorf("title = %q, want %q", title, "Test App") } err = database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?", "did:plc:test123", "test-app", "org.opencontainers.image.source").Scan(&source) if err != nil { t.Fatalf("Failed to query source annotation: %v", err) } if source != "https://github.com/test/app" { t.Errorf("source = %q, want %q", source, "https://github.com/test/app") } // Verify layers were inserted var layerCount int err = database.QueryRow("SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID).Scan(&layerCount) if err != nil { t.Fatalf("Failed to query layers: %v", err) } if layerCount != 2 { t.Errorf("Expected 2 layers, got %d", layerCount) } // Verify no manifest references (this is an image, not a list) var refCount int err = database.QueryRow("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID).Scan(&refCount) if err != nil { t.Fatalf("Failed to query manifest_references: %v", err) } if refCount != 0 { t.Errorf("Expected 0 manifest references, got %d", refCount) } } func TestProcessManifest_ManifestList(t *testing.T) { database := setupTestDB(t) defer database.Close() p := NewProcessor(database, false) ctx := context.Background() // Create test manifest list record manifestRecord := &atproto.ManifestRecord{ Repository: "test-app", Digest: "sha256:list123", MediaType: "application/vnd.oci.image.index.v1+json", SchemaVersion: 2, HoldEndpoint: "did:web:hold01.atcr.io", CreatedAt: time.Now(), Manifests: []atproto.ManifestReference{ { Digest: "sha256:amd64manifest", MediaType: "application/vnd.oci.image.manifest.v1+json", Size: 1000, Platform: &atproto.Platform{ Architecture: "amd64", OS: "linux", }, }, { Digest: "sha256:arm64manifest", MediaType: "application/vnd.oci.image.manifest.v1+json", Size: 1100, Platform: &atproto.Platform{ Architecture: "arm64", OS: "linux", Variant: "v8", }, }, }, } // Marshal to bytes for ProcessManifest recordBytes, err := json.Marshal(manifestRecord) if err != nil { t.Fatalf("Failed to marshal manifest: %v", err) } // Process manifest list manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) if err != nil { t.Fatalf("ProcessManifest failed: %v", err) } // Verify manifest references were inserted var refCount int err = database.QueryRow("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID).Scan(&refCount) if err != nil { t.Fatalf("Failed to query manifest_references: %v", err) } if refCount != 2 { t.Errorf("Expected 2 manifest references, got %d", refCount) } // Verify platform info was stored var arch, os string err = database.QueryRow("SELECT platform_architecture, platform_os FROM manifest_references WHERE manifest_id = ? AND reference_index = 0", manifestID).Scan(&arch, &os) if err != nil { t.Fatalf("Failed to query platform info: %v", err) } if arch != "amd64" { t.Errorf("platform_architecture = %q, want %q", arch, "amd64") } if os != "linux" { t.Errorf("platform_os = %q, want %q", os, "linux") } // Verify no layers (this is a list, not an image) var layerCount int err = database.QueryRow("SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID).Scan(&layerCount) if err != nil { t.Fatalf("Failed to query layers: %v", err) } if layerCount != 0 { t.Errorf("Expected 0 layers, got %d", layerCount) } } func TestProcessTag(t *testing.T) { database := setupTestDB(t) defer database.Close() p := NewProcessor(database, false) ctx := context.Background() // Create test tag record (using ManifestDigest field for simplicity) tagRecord := &atproto.TagRecord{ Repository: "test-app", Tag: "latest", ManifestDigest: "sha256:abc123", UpdatedAt: time.Now(), } // Marshal to bytes for ProcessTag recordBytes, err := json.Marshal(tagRecord) if err != nil { t.Fatalf("Failed to marshal tag: %v", err) } // Process tag err = p.ProcessTag(ctx, "did:plc:test123", recordBytes) if err != nil { t.Fatalf("ProcessTag failed: %v", err) } // Verify tag was inserted var count int err = database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?", "did:plc:test123", "test-app", "latest").Scan(&count) if err != nil { t.Fatalf("Failed to query tags: %v", err) } if count != 1 { t.Errorf("Expected 1 tag, got %d", count) } // Verify digest was stored var digest string err = database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?", "did:plc:test123", "test-app", "latest").Scan(&digest) if err != nil { t.Fatalf("Failed to query tag digest: %v", err) } if digest != "sha256:abc123" { t.Errorf("digest = %q, want %q", digest, "sha256:abc123") } // Test upserting same tag with new digest tagRecord.ManifestDigest = "sha256:newdigest" recordBytes, err = json.Marshal(tagRecord) if err != nil { t.Fatalf("Failed to marshal tag: %v", err) } err = p.ProcessTag(ctx, "did:plc:test123", recordBytes) if err != nil { t.Fatalf("ProcessTag (upsert) failed: %v", err) } // Verify tag was updated err = database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?", "did:plc:test123", "test-app", "latest").Scan(&digest) if err != nil { t.Fatalf("Failed to query updated tag: %v", err) } if digest != "sha256:newdigest" { t.Errorf("digest = %q, want %q", digest, "sha256:newdigest") } // Verify still only one tag (upsert, not insert) err = database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?", "did:plc:test123", "test-app", "latest").Scan(&count) if err != nil { t.Fatalf("Failed to query tags after upsert: %v", err) } if count != 1 { t.Errorf("Expected 1 tag after upsert, got %d", count) } } func TestProcessStar(t *testing.T) { database := setupTestDB(t) defer database.Close() p := NewProcessor(database, false) ctx := context.Background() // Create test star record starRecord := &atproto.StarRecord{ Subject: atproto.StarSubject{ DID: "did:plc:owner123", Repository: "test-app", }, CreatedAt: time.Now(), } // Marshal to bytes for ProcessStar recordBytes, err := json.Marshal(starRecord) if err != nil { t.Fatalf("Failed to marshal star: %v", err) } // Process star err = p.ProcessStar(ctx, "did:plc:starrer123", recordBytes) if err != nil { t.Fatalf("ProcessStar failed: %v", err) } // Verify star was inserted var count int err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?", "did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&count) if err != nil { t.Fatalf("Failed to query stars: %v", err) } if count != 1 { t.Errorf("Expected 1 star, got %d", count) } // Test upserting same star (should be idempotent) recordBytes, err = json.Marshal(starRecord) if err != nil { t.Fatalf("Failed to marshal star: %v", err) } err = p.ProcessStar(ctx, "did:plc:starrer123", recordBytes) if err != nil { t.Fatalf("ProcessStar (upsert) failed: %v", err) } // Verify still only one star err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?", "did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&count) if err != nil { t.Fatalf("Failed to query stars after upsert: %v", err) } if count != 1 { t.Errorf("Expected 1 star after upsert, got %d", count) } } func TestProcessManifest_Duplicate(t *testing.T) { database := setupTestDB(t) defer database.Close() p := NewProcessor(database, false) ctx := context.Background() manifestRecord := &atproto.ManifestRecord{ Repository: "test-app", Digest: "sha256:abc123", MediaType: "application/vnd.oci.image.manifest.v1+json", SchemaVersion: 2, HoldEndpoint: "did:web:hold01.atcr.io", CreatedAt: time.Now(), } // Marshal to bytes for ProcessManifest recordBytes, err := json.Marshal(manifestRecord) if err != nil { t.Fatalf("Failed to marshal manifest: %v", err) } // Insert first time id1, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) if err != nil { t.Fatalf("First ProcessManifest failed: %v", err) } // Insert duplicate id2, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes) if err != nil { t.Fatalf("Duplicate ProcessManifest failed: %v", err) } // Should return existing ID if id1 != id2 { t.Errorf("Duplicate manifest got different ID: %d vs %d", id1, id2) } // Verify only one manifest exists var count int err = database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND digest = ?", "did:plc:test123", "sha256:abc123").Scan(&count) if err != nil { t.Fatalf("Failed to query manifests: %v", err) } if count != 1 { t.Errorf("Expected 1 manifest, got %d", count) } } func TestProcessManifest_EmptyAnnotations(t *testing.T) { database := setupTestDB(t) defer database.Close() p := NewProcessor(database, false) ctx := context.Background() // Manifest with nil annotations manifestRecord := &atproto.ManifestRecord{ Repository: "test-app", Digest: "sha256:abc123", MediaType: "application/vnd.oci.image.manifest.v1+json", SchemaVersion: 2, HoldEndpoint: "did:web:hold01.atcr.io", CreatedAt: time.Now(), Annotations: nil, } // Marshal to bytes for ProcessManifest recordBytes, err := json.Marshal(manifestRecord) if err != nil { t.Fatalf("Failed to marshal manifest: %v", err) } _, err = p.ProcessManifest(ctx, "did:plc:test123", recordBytes) if err != nil { t.Fatalf("ProcessManifest failed: %v", err) } // Verify no annotations were stored (nil annotations should not create entries) var annotationCount int err = database.QueryRow("SELECT COUNT(*) FROM repository_annotations WHERE did = ? AND repository = ?", "did:plc:test123", "test-app").Scan(&annotationCount) if err != nil { t.Fatalf("Failed to query annotations: %v", err) } if annotationCount != 0 { t.Errorf("Expected 0 annotations for nil annotations, got %d", annotationCount) } } func TestProcessIdentity(t *testing.T) { db := setupTestDB(t) defer db.Close() processor := NewProcessor(db, false) // Setup: Create test user testDID := "did:plc:alice123" testHandle := "alice.bsky.social" testPDS := "https://bsky.social" _, err := db.Exec(` INSERT INTO users (did, handle, pds_endpoint, last_seen) VALUES (?, ?, ?, ?) `, testDID, testHandle, testPDS, time.Now()) if err != nil { t.Fatalf("Failed to insert test user: %v", err) } // Test 1: Process identity change event newHandle := "alice-new.bsky.social" err = processor.ProcessIdentity(context.Background(), testDID, newHandle) // Note: This will fail to invalidate cache since we don't have a real identity directory, // but we can still verify the database update happened if err != nil { t.Logf("Expected cache invalidation error (no real directory): %v", err) } // Verify handle was updated in database var retrievedHandle string err = db.QueryRow(` SELECT handle FROM users WHERE did = ? `, testDID).Scan(&retrievedHandle) if err != nil { t.Fatalf("Failed to query updated user: %v", err) } if retrievedHandle != newHandle { t.Errorf("Expected handle '%s', got '%s'", newHandle, retrievedHandle) } // Test 2: Process identity change for non-existent user // Should not error (UPDATE just affects 0 rows) err = processor.ProcessIdentity(context.Background(), "did:plc:nonexistent", "new.handle") if err != nil { t.Logf("Expected cache invalidation error: %v", err) } // Test 3: Process multiple identity changes handles := []string{"alice1.bsky.social", "alice2.bsky.social", "alice3.bsky.social"} for _, handle := range handles { err = processor.ProcessIdentity(context.Background(), testDID, handle) if err != nil { t.Logf("Expected cache invalidation error: %v", err) } err = db.QueryRow(` SELECT handle FROM users WHERE did = ? `, testDID).Scan(&retrievedHandle) if err != nil { t.Fatalf("Failed to query user after handle update: %v", err) } if retrievedHandle != handle { t.Errorf("Expected handle '%s', got '%s'", handle, retrievedHandle) } } } func TestProcessAccount(t *testing.T) { db := setupTestDB(t) defer db.Close() processor := NewProcessor(db, false) // Setup: Create test user testDID := "did:plc:bob456" testHandle := "bob.bsky.social" testPDS := "https://bsky.social" _, err := db.Exec(` INSERT INTO users (did, handle, pds_endpoint, last_seen) VALUES (?, ?, ?, ?) `, testDID, testHandle, testPDS, time.Now()) if err != nil { t.Fatalf("Failed to insert test user: %v", err) } // Test 1: Process account deactivation event err = processor.ProcessAccount(context.Background(), testDID, false, "deactivated") // Note: Cache invalidation will fail without real directory, but that's expected if err != nil { t.Logf("Expected cache invalidation error (no real directory): %v", err) } // Verify user still exists in database (we don't delete on deactivation) var exists bool err = db.QueryRow(` SELECT EXISTS(SELECT 1 FROM users WHERE did = ?) `, testDID).Scan(&exists) if err != nil { t.Fatalf("Failed to check if user exists: %v", err) } if !exists { t.Error("User should still exist after deactivation event (no deletion)") } // Test 2: Process account with active=true (should be ignored) err = processor.ProcessAccount(context.Background(), testDID, true, "active") if err != nil { t.Errorf("Expected no error for active account, got: %v", err) } // Test 3: Process account with status != "deactivated" (should be ignored) err = processor.ProcessAccount(context.Background(), testDID, false, "suspended") if err != nil { t.Errorf("Expected no error for non-deactivated status, got: %v", err) } // Test 4: Process account deactivation for non-existent user err = processor.ProcessAccount(context.Background(), "did:plc:nonexistent", false, "deactivated") // Cache invalidation will fail, but that's expected if err != nil { t.Logf("Expected cache invalidation error: %v", err) } // Test 5: Process multiple deactivation events (idempotent) for i := 0; i < 3; i++ { err = processor.ProcessAccount(context.Background(), testDID, false, "deactivated") if err != nil { t.Logf("Expected cache invalidation error on iteration %d: %v", i, err) } } // User should still exist after multiple deactivations err = db.QueryRow(` SELECT EXISTS(SELECT 1 FROM users WHERE did = ?) `, testDID).Scan(&exists) if err != nil { t.Fatalf("Failed to check if user exists after multiple deactivations: %v", err) } if !exists { t.Error("User should still exist after multiple deactivation events") } }