Files

956 lines
29 KiB
Go

package jetstream
import (
"context"
"database/sql"
"encoding/json"
"strings"
"testing"
"time"
"atcr.io/pkg/atproto"
_ "github.com/tursodatabase/go-libsql"
)
// execStatements splits a multi-statement SQL string and executes each statement individually.
// go-libsql does not support multi-statement Exec like mattn/go-sqlite3.
func execStatements(t *testing.T, db *sql.DB, schema string) {
t.Helper()
for _, stmt := range strings.Split(schema, ";") {
stmt = strings.TrimSpace(stmt)
if stmt == "" {
continue
}
if _, err := db.Exec(stmt); err != nil {
t.Fatalf("Failed to execute statement: %v\nSQL: %s", err, stmt)
}
}
}
// setupTestDB creates an in-memory SQLite database for testing
func setupTestDB(t *testing.T) *sql.DB {
database, err := sql.Open("libsql", ":memory:")
if err != nil {
t.Fatalf("Failed to open test database: %v", err)
}
// Create schema
schema := `
CREATE TABLE users (
did TEXT PRIMARY KEY,
handle TEXT NOT NULL,
pds_endpoint TEXT NOT NULL,
avatar TEXT,
default_hold_did TEXT,
oci_client TEXT DEFAULT '',
last_seen TIMESTAMP NOT NULL
);
CREATE TABLE manifests (
id INTEGER PRIMARY KEY AUTOINCREMENT,
did TEXT NOT NULL,
repository TEXT NOT NULL,
digest TEXT NOT NULL,
hold_endpoint TEXT NOT NULL,
schema_version INTEGER NOT NULL,
media_type TEXT NOT NULL,
config_digest TEXT,
config_size INTEGER,
artifact_type TEXT NOT NULL DEFAULT 'container-image',
created_at TIMESTAMP NOT NULL,
UNIQUE(did, repository, digest)
);
CREATE TABLE repository_annotations (
did TEXT NOT NULL,
repository TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(did, repository, key),
FOREIGN KEY(did) REFERENCES users(did) ON DELETE CASCADE
);
CREATE TABLE layers (
manifest_id INTEGER NOT NULL,
digest TEXT NOT NULL,
size INTEGER NOT NULL,
media_type TEXT NOT NULL,
layer_index INTEGER NOT NULL,
annotations TEXT,
PRIMARY KEY(manifest_id, layer_index)
);
CREATE TABLE manifest_references (
manifest_id INTEGER NOT NULL,
digest TEXT NOT NULL,
media_type TEXT NOT NULL,
size INTEGER NOT NULL,
platform_architecture TEXT,
platform_os TEXT,
platform_variant TEXT,
platform_os_version TEXT,
is_attestation BOOLEAN DEFAULT FALSE,
reference_index INTEGER NOT NULL,
PRIMARY KEY(manifest_id, reference_index)
);
CREATE TABLE tags (
id INTEGER PRIMARY KEY AUTOINCREMENT,
did TEXT NOT NULL,
repository TEXT NOT NULL,
tag TEXT NOT NULL,
digest TEXT NOT NULL,
created_at TIMESTAMP NOT NULL,
UNIQUE(did, repository, tag)
);
CREATE TABLE stars (
starrer_did TEXT NOT NULL,
owner_did TEXT NOT NULL,
repository TEXT NOT NULL,
created_at TIMESTAMP NOT NULL,
PRIMARY KEY(starrer_did, owner_did, repository)
);
`
execStatements(t, database, schema)
return database
}
func TestNewProcessor(t *testing.T) {
database := setupTestDB(t)
defer database.Close()
tests := []struct {
name string
useCache bool
}{
{"with cache", true},
{"without cache", false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := NewProcessor(database, tt.useCache, nil)
if p == nil {
t.Fatal("NewProcessor returned nil")
}
if p.db != database {
t.Error("Processor database not set correctly")
}
if p.useCache != tt.useCache {
t.Errorf("useCache = %v, want %v", p.useCache, tt.useCache)
}
if tt.useCache && p.userCache == nil {
t.Error("Cache enabled but userCache is nil")
}
if !tt.useCache && p.userCache != nil {
t.Error("Cache disabled but userCache is not nil")
}
})
}
}
func TestProcessManifest_ImageManifest(t *testing.T) {
database := setupTestDB(t)
defer database.Close()
// Insert test user (required for foreign key on repository_annotations)
_, err := database.Exec(`INSERT INTO users (did, handle, pds_endpoint, last_seen) VALUES (?, ?, ?, ?)`,
"did:plc:test123", "test.bsky.social", "https://pds.example.com", time.Now())
if err != nil {
t.Fatalf("Failed to insert test user: %v", err)
}
p := NewProcessor(database, false, nil)
ctx := context.Background()
// Create test manifest record
manifestRecord := &atproto.ManifestRecord{
Repository: "test-app",
Digest: "sha256:abc123",
MediaType: "application/vnd.oci.image.manifest.v1+json",
SchemaVersion: 2,
HoldEndpoint: "did:web:hold01.atcr.io",
CreatedAt: time.Now(),
Config: &atproto.BlobReference{
Digest: "sha256:config123",
Size: 1234,
},
Layers: []atproto.BlobReference{
{Digest: "sha256:layer1", Size: 5000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"},
{Digest: "sha256:layer2", Size: 3000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"},
},
Annotations: map[string]string{
"org.opencontainers.image.title": "Test App",
"org.opencontainers.image.description": "A test application",
"org.opencontainers.image.source": "https://github.com/test/app",
"org.opencontainers.image.licenses": "MIT",
"io.atcr.icon": "https://example.com/icon.png",
},
}
// Marshal to bytes for ProcessManifest
recordBytes, err := json.Marshal(manifestRecord)
if err != nil {
t.Fatalf("Failed to marshal manifest: %v", err)
}
// Process manifest
manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
if err != nil {
t.Fatalf("ProcessManifest failed: %v", err)
}
if manifestID == 0 {
t.Error("Expected non-zero manifest ID")
}
// Verify manifest was inserted
var count int
err = database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND repository = ? AND digest = ?",
"did:plc:test123", "test-app", "sha256:abc123").Scan(&count)
if err != nil {
t.Fatalf("Failed to query manifests: %v", err)
}
if count != 1 {
t.Errorf("Expected 1 manifest, got %d", count)
}
// Verify annotations were stored in repository_annotations table
var title, source string
err = database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?",
"did:plc:test123", "test-app", "org.opencontainers.image.title").Scan(&title)
if err != nil {
t.Fatalf("Failed to query title annotation: %v", err)
}
if title != "Test App" {
t.Errorf("title = %q, want %q", title, "Test App")
}
err = database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?",
"did:plc:test123", "test-app", "org.opencontainers.image.source").Scan(&source)
if err != nil {
t.Fatalf("Failed to query source annotation: %v", err)
}
if source != "https://github.com/test/app" {
t.Errorf("source = %q, want %q", source, "https://github.com/test/app")
}
// Verify layers were inserted
var layerCount int
err = database.QueryRow("SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID).Scan(&layerCount)
if err != nil {
t.Fatalf("Failed to query layers: %v", err)
}
if layerCount != 2 {
t.Errorf("Expected 2 layers, got %d", layerCount)
}
// Verify no manifest references (this is an image, not a list)
var refCount int
err = database.QueryRow("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID).Scan(&refCount)
if err != nil {
t.Fatalf("Failed to query manifest_references: %v", err)
}
if refCount != 0 {
t.Errorf("Expected 0 manifest references, got %d", refCount)
}
}
func TestProcessManifest_ManifestList(t *testing.T) {
database := setupTestDB(t)
defer database.Close()
p := NewProcessor(database, false, nil)
ctx := context.Background()
// Create test manifest list record
manifestRecord := &atproto.ManifestRecord{
Repository: "test-app",
Digest: "sha256:list123",
MediaType: "application/vnd.oci.image.index.v1+json",
SchemaVersion: 2,
HoldEndpoint: "did:web:hold01.atcr.io",
CreatedAt: time.Now(),
Manifests: []atproto.ManifestReference{
{
Digest: "sha256:amd64manifest",
MediaType: "application/vnd.oci.image.manifest.v1+json",
Size: 1000,
Platform: &atproto.Platform{
Architecture: "amd64",
OS: "linux",
},
},
{
Digest: "sha256:arm64manifest",
MediaType: "application/vnd.oci.image.manifest.v1+json",
Size: 1100,
Platform: &atproto.Platform{
Architecture: "arm64",
OS: "linux",
Variant: "v8",
},
},
},
}
// Marshal to bytes for ProcessManifest
recordBytes, err := json.Marshal(manifestRecord)
if err != nil {
t.Fatalf("Failed to marshal manifest: %v", err)
}
// Process manifest list
manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
if err != nil {
t.Fatalf("ProcessManifest failed: %v", err)
}
// Verify manifest references were inserted
var refCount int
err = database.QueryRow("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID).Scan(&refCount)
if err != nil {
t.Fatalf("Failed to query manifest_references: %v", err)
}
if refCount != 2 {
t.Errorf("Expected 2 manifest references, got %d", refCount)
}
// Verify platform info was stored
var arch, os string
err = database.QueryRow("SELECT platform_architecture, platform_os FROM manifest_references WHERE manifest_id = ? AND reference_index = 0", manifestID).Scan(&arch, &os)
if err != nil {
t.Fatalf("Failed to query platform info: %v", err)
}
if arch != "amd64" {
t.Errorf("platform_architecture = %q, want %q", arch, "amd64")
}
if os != "linux" {
t.Errorf("platform_os = %q, want %q", os, "linux")
}
// Verify no layers (this is a list, not an image)
var layerCount int
err = database.QueryRow("SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID).Scan(&layerCount)
if err != nil {
t.Fatalf("Failed to query layers: %v", err)
}
if layerCount != 0 {
t.Errorf("Expected 0 layers, got %d", layerCount)
}
}
func TestProcessTag(t *testing.T) {
database := setupTestDB(t)
defer database.Close()
p := NewProcessor(database, false, nil)
ctx := context.Background()
// Create test tag record (using ManifestDigest field for simplicity)
tagRecord := &atproto.TagRecord{
Repository: "test-app",
Tag: "latest",
ManifestDigest: "sha256:abc123",
UpdatedAt: time.Now(),
}
// Marshal to bytes for ProcessTag
recordBytes, err := json.Marshal(tagRecord)
if err != nil {
t.Fatalf("Failed to marshal tag: %v", err)
}
// Process tag
err = p.ProcessTag(ctx, "did:plc:test123", recordBytes)
if err != nil {
t.Fatalf("ProcessTag failed: %v", err)
}
// Verify tag was inserted
var count int
err = database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?",
"did:plc:test123", "test-app", "latest").Scan(&count)
if err != nil {
t.Fatalf("Failed to query tags: %v", err)
}
if count != 1 {
t.Errorf("Expected 1 tag, got %d", count)
}
// Verify digest was stored
var digest string
err = database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?",
"did:plc:test123", "test-app", "latest").Scan(&digest)
if err != nil {
t.Fatalf("Failed to query tag digest: %v", err)
}
if digest != "sha256:abc123" {
t.Errorf("digest = %q, want %q", digest, "sha256:abc123")
}
// Test upserting same tag with new digest
tagRecord.ManifestDigest = "sha256:newdigest"
recordBytes, err = json.Marshal(tagRecord)
if err != nil {
t.Fatalf("Failed to marshal tag: %v", err)
}
err = p.ProcessTag(ctx, "did:plc:test123", recordBytes)
if err != nil {
t.Fatalf("ProcessTag (upsert) failed: %v", err)
}
// Verify tag was updated
err = database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?",
"did:plc:test123", "test-app", "latest").Scan(&digest)
if err != nil {
t.Fatalf("Failed to query updated tag: %v", err)
}
if digest != "sha256:newdigest" {
t.Errorf("digest = %q, want %q", digest, "sha256:newdigest")
}
// Verify still only one tag (upsert, not insert)
err = database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?",
"did:plc:test123", "test-app", "latest").Scan(&count)
if err != nil {
t.Fatalf("Failed to query tags after upsert: %v", err)
}
if count != 1 {
t.Errorf("Expected 1 tag after upsert, got %d", count)
}
}
func TestProcessStar(t *testing.T) {
database := setupTestDB(t)
defer database.Close()
// Insert test users (starrer + owner) so EnsureUser finds them without network calls
for _, did := range []string{"did:plc:starrer123", "did:plc:owner123"} {
_, err := database.Exec(
`INSERT INTO users (did, handle, pds_endpoint, last_seen) VALUES (?, ?, ?, ?)`,
did, did+".test", "https://pds.example.com", time.Now())
if err != nil {
t.Fatalf("Failed to insert test user %s: %v", did, err)
}
}
p := NewProcessor(database, false, nil)
ctx := context.Background()
// Create test star record (new AT URI format)
starRecord := atproto.NewStarRecord("did:plc:owner123", "test-app")
// Marshal to bytes for ProcessStar
recordBytes, err := json.Marshal(starRecord)
if err != nil {
t.Fatalf("Failed to marshal star: %v", err)
}
// Process star
err = p.ProcessStar(ctx, "did:plc:starrer123", recordBytes)
if err != nil {
t.Fatalf("ProcessStar failed: %v", err)
}
// Verify star was inserted
var count int
err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?",
"did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&count)
if err != nil {
t.Fatalf("Failed to query stars: %v", err)
}
if count != 1 {
t.Errorf("Expected 1 star, got %d", count)
}
// Test upserting same star (should be idempotent)
recordBytes, err = json.Marshal(starRecord)
if err != nil {
t.Fatalf("Failed to marshal star: %v", err)
}
err = p.ProcessStar(ctx, "did:plc:starrer123", recordBytes)
if err != nil {
t.Fatalf("ProcessStar (upsert) failed: %v", err)
}
// Verify still only one star
err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?",
"did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&count)
if err != nil {
t.Fatalf("Failed to query stars after upsert: %v", err)
}
if count != 1 {
t.Errorf("Expected 1 star after upsert, got %d", count)
}
}
func TestProcessStar_OldFormat(t *testing.T) {
database := setupTestDB(t)
defer database.Close()
// Insert test users
for _, did := range []string{"did:plc:starrer456", "did:plc:owner456"} {
_, err := database.Exec(
`INSERT INTO users (did, handle, pds_endpoint, last_seen) VALUES (?, ?, ?, ?)`,
did, did+".test", "https://pds.example.com", time.Now())
if err != nil {
t.Fatalf("Failed to insert test user %s: %v", did, err)
}
}
p := NewProcessor(database, false, nil)
ctx := context.Background()
// Old format JSON (object subject with did + repository)
oldFormatJSON := `{"$type":"io.atcr.sailor.star","subject":{"did":"did:plc:owner456","repository":"legacy-app"},"createdAt":"2025-06-01T00:00:00Z"}`
err := p.ProcessStar(ctx, "did:plc:starrer456", []byte(oldFormatJSON))
if err != nil {
t.Fatalf("ProcessStar (old format) failed: %v", err)
}
// Verify star was inserted with correct owner/repo
var count int
err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?",
"did:plc:starrer456", "did:plc:owner456", "legacy-app").Scan(&count)
if err != nil {
t.Fatalf("Failed to query stars: %v", err)
}
if count != 1 {
t.Errorf("Expected 1 star from old format, got %d", count)
}
}
func TestProcessManifest_Duplicate(t *testing.T) {
database := setupTestDB(t)
defer database.Close()
p := NewProcessor(database, false, nil)
ctx := context.Background()
manifestRecord := &atproto.ManifestRecord{
Repository: "test-app",
Digest: "sha256:abc123",
MediaType: "application/vnd.oci.image.manifest.v1+json",
SchemaVersion: 2,
HoldEndpoint: "did:web:hold01.atcr.io",
CreatedAt: time.Now(),
}
// Marshal to bytes for ProcessManifest
recordBytes, err := json.Marshal(manifestRecord)
if err != nil {
t.Fatalf("Failed to marshal manifest: %v", err)
}
// Insert first time
id1, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
if err != nil {
t.Fatalf("First ProcessManifest failed: %v", err)
}
// Insert duplicate
id2, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
if err != nil {
t.Fatalf("Duplicate ProcessManifest failed: %v", err)
}
// Should return existing ID
if id1 != id2 {
t.Errorf("Duplicate manifest got different ID: %d vs %d", id1, id2)
}
// Verify only one manifest exists
var count int
err = database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND digest = ?",
"did:plc:test123", "sha256:abc123").Scan(&count)
if err != nil {
t.Fatalf("Failed to query manifests: %v", err)
}
if count != 1 {
t.Errorf("Expected 1 manifest, got %d", count)
}
}
func TestProcessManifest_EmptyAnnotations(t *testing.T) {
database := setupTestDB(t)
defer database.Close()
p := NewProcessor(database, false, nil)
ctx := context.Background()
// Manifest with nil annotations
manifestRecord := &atproto.ManifestRecord{
Repository: "test-app",
Digest: "sha256:abc123",
MediaType: "application/vnd.oci.image.manifest.v1+json",
SchemaVersion: 2,
HoldEndpoint: "did:web:hold01.atcr.io",
CreatedAt: time.Now(),
Annotations: nil,
}
// Marshal to bytes for ProcessManifest
recordBytes, err := json.Marshal(manifestRecord)
if err != nil {
t.Fatalf("Failed to marshal manifest: %v", err)
}
_, err = p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
if err != nil {
t.Fatalf("ProcessManifest failed: %v", err)
}
// Verify no annotations were stored (nil annotations should not create entries)
var annotationCount int
err = database.QueryRow("SELECT COUNT(*) FROM repository_annotations WHERE did = ? AND repository = ?",
"did:plc:test123", "test-app").Scan(&annotationCount)
if err != nil {
t.Fatalf("Failed to query annotations: %v", err)
}
if annotationCount != 0 {
t.Errorf("Expected 0 annotations for nil annotations, got %d", annotationCount)
}
}
func TestProcessIdentity(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
processor := NewProcessor(db, false, nil)
// Setup: Create test user
testDID := "did:plc:alice123"
testHandle := "alice.bsky.social"
testPDS := "https://bsky.social"
_, err := db.Exec(`
INSERT INTO users (did, handle, pds_endpoint, last_seen)
VALUES (?, ?, ?, ?)
`, testDID, testHandle, testPDS, time.Now())
if err != nil {
t.Fatalf("Failed to insert test user: %v", err)
}
// Test 1: Process identity change event
newHandle := "alice-new.bsky.social"
err = processor.ProcessIdentity(context.Background(), testDID, newHandle)
// Note: This will fail to invalidate cache since we don't have a real identity directory,
// but we can still verify the database update happened
if err != nil {
t.Logf("Expected cache invalidation error (no real directory): %v", err)
}
// Verify handle was updated in database
var retrievedHandle string
err = db.QueryRow(`
SELECT handle FROM users WHERE did = ?
`, testDID).Scan(&retrievedHandle)
if err != nil {
t.Fatalf("Failed to query updated user: %v", err)
}
if retrievedHandle != newHandle {
t.Errorf("Expected handle '%s', got '%s'", newHandle, retrievedHandle)
}
// Test 2: Process identity change for non-existent user
// Should not error (UPDATE just affects 0 rows)
err = processor.ProcessIdentity(context.Background(), "did:plc:nonexistent", "new.handle")
if err != nil {
t.Logf("Expected cache invalidation error: %v", err)
}
// Test 3: Process multiple identity changes
handles := []string{"alice1.bsky.social", "alice2.bsky.social", "alice3.bsky.social"}
for _, handle := range handles {
err = processor.ProcessIdentity(context.Background(), testDID, handle)
if err != nil {
t.Logf("Expected cache invalidation error: %v", err)
}
err = db.QueryRow(`
SELECT handle FROM users WHERE did = ?
`, testDID).Scan(&retrievedHandle)
if err != nil {
t.Fatalf("Failed to query user after handle update: %v", err)
}
if retrievedHandle != handle {
t.Errorf("Expected handle '%s', got '%s'", handle, retrievedHandle)
}
}
}
func TestProcessRecord_RoutesCorrectly(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
// Add missing tables for this test
execStatements(t, db, `
CREATE TABLE repo_pages (
did TEXT NOT NULL,
repository TEXT NOT NULL,
description TEXT,
avatar_cid TEXT,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL,
PRIMARY KEY(did, repository)
);
CREATE TABLE hold_captain_records (
hold_did TEXT PRIMARY KEY,
owner_did TEXT NOT NULL,
public BOOLEAN NOT NULL,
allow_all_crew BOOLEAN NOT NULL,
deployed_at TEXT,
region TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE hold_crew_members (
hold_did TEXT NOT NULL,
member_did TEXT NOT NULL,
rkey TEXT NOT NULL,
role TEXT,
permissions TEXT,
tier TEXT,
added_at TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (hold_did, member_did)
);
`)
processor := NewProcessor(db, false, nil)
ctx := context.Background()
var err error
// Test 1: ProcessRecord routes manifest correctly
// Note: Schema validation may fail for io.atcr.manifest since we can't resolve the schema,
// but this tests the routing logic
manifestRecord := map[string]any{
"$type": "io.atcr.manifest",
"repository": "test-app",
"digest": "sha256:route123",
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"schemaVersion": 2,
"holdDid": "did:web:hold01.atcr.io",
"createdAt": time.Now().Format(time.RFC3339),
}
recordBytes, _ := json.Marshal(manifestRecord)
// Note: ProcessRecord will skip validation if lexicon can't be resolved (expected in tests)
// and will skip EnsureUser since we don't have a real PDS to resolve
// Just verify the record is processed without panic
err = processor.ProcessRecord(ctx, "did:plc:test123", atproto.ManifestCollection, "route123", recordBytes, false, nil)
// Error expected since we can't resolve identity - that's fine for this test
if err != nil {
t.Logf("Expected error (can't resolve identity): %v", err)
}
// Test 2: ProcessRecord handles captain record without creating user
captainRecord := map[string]any{
"$type": "io.atcr.hold.captain",
"owner": "did:plc:owner123",
"public": true,
"allowAllCrew": false,
"enableBlueskyPosts": false,
"deployedAt": time.Now().Format(time.RFC3339),
}
captainBytes, _ := json.Marshal(captainRecord)
// This should NOT call EnsureUser (captain is a hold collection)
err = processor.ProcessRecord(ctx, "did:web:hold.example.com", atproto.CaptainCollection, "self", captainBytes, false, nil)
if err != nil {
t.Logf("Error processing captain (validation may fail in test): %v", err)
}
// Verify no user was created for the hold DID
var userCount int
err = db.QueryRow(`SELECT COUNT(*) FROM users WHERE did = ?`, "did:web:hold.example.com").Scan(&userCount)
if err != nil {
t.Fatalf("Failed to query users: %v", err)
}
if userCount != 0 {
t.Error("Captain record processing should NOT create a user entry for holds")
}
// Test 3: ProcessRecord handles delete operations
err = processor.ProcessRecord(ctx, "did:plc:test123", atproto.ManifestCollection, "sha256:todelete", nil, true, nil)
if err != nil {
t.Errorf("Delete should not error: %v", err)
}
}
func TestProcessRecord_SkipsInvalidRecords(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
processor := NewProcessor(db, false, nil)
ctx := context.Background()
// Test: Invalid JSON should be skipped silently (no error returned)
invalidJSON := []byte(`{invalid json}`)
err := processor.ProcessRecord(ctx, "did:plc:test123", atproto.ManifestCollection, "test", invalidJSON, false, nil)
// Should return nil (skipped silently) not an error
if err != nil {
t.Errorf("Invalid record should be skipped silently, got error: %v", err)
}
}
func TestValidateRecord(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
processor := NewProcessor(db, false, nil)
ctx := context.Background()
// Test 1: Manifest passes (no strict validation)
manifestJSON := []byte(`{"$type": "io.atcr.manifest", "repository": "test"}`)
err := processor.ValidateRecord(ctx, atproto.ManifestCollection, manifestJSON)
if err != nil {
t.Errorf("Manifest should pass validation: %v", err)
}
// Test 2: Invalid JSON returns error
invalidJSON := []byte(`{invalid}`)
err = processor.ValidateRecord(ctx, atproto.ManifestCollection, invalidJSON)
if err == nil {
t.Error("Invalid JSON should return error")
}
// Test 3: Captain with valid owner passes
captainValid := []byte(`{"owner": "did:plc:owner123", "public": true}`)
err = processor.ValidateRecord(ctx, atproto.CaptainCollection, captainValid)
if err != nil {
t.Errorf("Valid captain should pass: %v", err)
}
// Test 4: Captain with empty owner is rejected
captainEmpty := []byte(`{"owner": "", "public": true}`)
err = processor.ValidateRecord(ctx, atproto.CaptainCollection, captainEmpty)
if err == nil {
t.Error("Captain with empty owner should be rejected")
}
// Test 5: Captain with invalid owner (not a DID) is rejected
captainInvalid := []byte(`{"owner": "notadid", "public": true}`)
err = processor.ValidateRecord(ctx, atproto.CaptainCollection, captainInvalid)
if err == nil {
t.Error("Captain with invalid owner should be rejected")
}
// Test 6: Crew with valid member passes
crewValid := []byte(`{"member": "did:plc:member123", "role": "write"}`)
err = processor.ValidateRecord(ctx, atproto.CrewCollection, crewValid)
if err != nil {
t.Errorf("Valid crew should pass: %v", err)
}
// Test 7: Crew with empty member is rejected
crewEmpty := []byte(`{"member": "", "role": "write"}`)
err = processor.ValidateRecord(ctx, atproto.CrewCollection, crewEmpty)
if err == nil {
t.Error("Crew with empty member should be rejected")
}
}
func TestProcessAccount(t *testing.T) {
db := setupTestDB(t)
defer db.Close()
processor := NewProcessor(db, false, nil)
// Setup: Create test user
testDID := "did:plc:bob456"
testHandle := "bob.bsky.social"
testPDS := "https://bsky.social"
_, err := db.Exec(`
INSERT INTO users (did, handle, pds_endpoint, last_seen)
VALUES (?, ?, ?, ?)
`, testDID, testHandle, testPDS, time.Now())
if err != nil {
t.Fatalf("Failed to insert test user: %v", err)
}
// Test 1: Process account deactivation event
err = processor.ProcessAccount(context.Background(), testDID, false, "deactivated")
// Note: Cache invalidation will fail without real directory, but that's expected
if err != nil {
t.Logf("Expected cache invalidation error (no real directory): %v", err)
}
// Verify user still exists in database (we don't delete on deactivation)
var exists bool
err = db.QueryRow(`
SELECT EXISTS(SELECT 1 FROM users WHERE did = ?)
`, testDID).Scan(&exists)
if err != nil {
t.Fatalf("Failed to check if user exists: %v", err)
}
if !exists {
t.Error("User should still exist after deactivation event (no deletion)")
}
// Test 2: Process account with active=true (should be ignored)
err = processor.ProcessAccount(context.Background(), testDID, true, "active")
if err != nil {
t.Errorf("Expected no error for active account, got: %v", err)
}
// Test 3: Process account with status != "deactivated" (should be ignored)
err = processor.ProcessAccount(context.Background(), testDID, false, "suspended")
if err != nil {
t.Errorf("Expected no error for non-deactivated status, got: %v", err)
}
// Test 4: Process account deactivation for non-existent user
err = processor.ProcessAccount(context.Background(), "did:plc:nonexistent", false, "deactivated")
// Cache invalidation will fail, but that's expected
if err != nil {
t.Logf("Expected cache invalidation error: %v", err)
}
// Test 5: Process multiple deactivation events (idempotent)
for i := range 3 {
err = processor.ProcessAccount(context.Background(), testDID, false, "deactivated")
if err != nil {
t.Logf("Expected cache invalidation error on iteration %d: %v", i, err)
}
}
// User should still exist after multiple deactivations
err = db.QueryRow(`
SELECT EXISTS(SELECT 1 FROM users WHERE did = ?)
`, testDID).Scan(&exists)
if err != nil {
t.Fatalf("Failed to check if user exists after multiple deactivations: %v", err)
}
if !exists {
t.Error("User should still exist after multiple deactivation events")
}
// Test 6: Process account deletion - should delete user data
err = processor.ProcessAccount(context.Background(), testDID, false, "deleted")
if err != nil {
t.Logf("Cache invalidation error during deletion (expected): %v", err)
}
// User should be deleted after "deleted" status
err = db.QueryRow(`
SELECT EXISTS(SELECT 1 FROM users WHERE did = ?)
`, testDID).Scan(&exists)
if err != nil {
t.Fatalf("Failed to check if user exists after deletion: %v", err)
}
if exists {
t.Error("User should NOT exist after deletion event")
}
// Test 7: Process deletion for already-deleted user (idempotent)
err = processor.ProcessAccount(context.Background(), testDID, false, "deleted")
if err != nil {
t.Errorf("Deletion of non-existent user should not error, got: %v", err)
}
}