695 lines
20 KiB
Go
695 lines
20 KiB
Go
package jetstream
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"testing"
|
|
"time"
|
|
|
|
"atcr.io/pkg/atproto"
|
|
_ "github.com/mattn/go-sqlite3"
|
|
)
|
|
|
|
// setupTestDB creates an in-memory SQLite database for testing
|
|
func setupTestDB(t *testing.T) *sql.DB {
|
|
database, err := sql.Open("sqlite3", ":memory:")
|
|
if err != nil {
|
|
t.Fatalf("Failed to open test database: %v", err)
|
|
}
|
|
|
|
// Create schema
|
|
schema := `
|
|
CREATE TABLE users (
|
|
did TEXT PRIMARY KEY,
|
|
handle TEXT NOT NULL,
|
|
pds_endpoint TEXT NOT NULL,
|
|
avatar TEXT,
|
|
last_seen TIMESTAMP NOT NULL
|
|
);
|
|
|
|
CREATE TABLE manifests (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
did TEXT NOT NULL,
|
|
repository TEXT NOT NULL,
|
|
digest TEXT NOT NULL,
|
|
hold_endpoint TEXT NOT NULL,
|
|
schema_version INTEGER NOT NULL,
|
|
media_type TEXT NOT NULL,
|
|
config_digest TEXT,
|
|
config_size INTEGER,
|
|
created_at TIMESTAMP NOT NULL,
|
|
UNIQUE(did, repository, digest)
|
|
);
|
|
|
|
CREATE TABLE repository_annotations (
|
|
did TEXT NOT NULL,
|
|
repository TEXT NOT NULL,
|
|
key TEXT NOT NULL,
|
|
value TEXT NOT NULL,
|
|
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
PRIMARY KEY(did, repository, key),
|
|
FOREIGN KEY(did) REFERENCES users(did) ON DELETE CASCADE
|
|
);
|
|
|
|
CREATE TABLE layers (
|
|
manifest_id INTEGER NOT NULL,
|
|
digest TEXT NOT NULL,
|
|
size INTEGER NOT NULL,
|
|
media_type TEXT NOT NULL,
|
|
layer_index INTEGER NOT NULL,
|
|
PRIMARY KEY(manifest_id, layer_index)
|
|
);
|
|
|
|
CREATE TABLE manifest_references (
|
|
manifest_id INTEGER NOT NULL,
|
|
digest TEXT NOT NULL,
|
|
media_type TEXT NOT NULL,
|
|
size INTEGER NOT NULL,
|
|
platform_architecture TEXT,
|
|
platform_os TEXT,
|
|
platform_variant TEXT,
|
|
platform_os_version TEXT,
|
|
is_attestation BOOLEAN DEFAULT FALSE,
|
|
reference_index INTEGER NOT NULL,
|
|
PRIMARY KEY(manifest_id, reference_index)
|
|
);
|
|
|
|
CREATE TABLE tags (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
did TEXT NOT NULL,
|
|
repository TEXT NOT NULL,
|
|
tag TEXT NOT NULL,
|
|
digest TEXT NOT NULL,
|
|
created_at TIMESTAMP NOT NULL,
|
|
UNIQUE(did, repository, tag)
|
|
);
|
|
|
|
CREATE TABLE stars (
|
|
starrer_did TEXT NOT NULL,
|
|
owner_did TEXT NOT NULL,
|
|
repository TEXT NOT NULL,
|
|
created_at TIMESTAMP NOT NULL,
|
|
PRIMARY KEY(starrer_did, owner_did, repository)
|
|
);
|
|
`
|
|
|
|
if _, err := database.Exec(schema); err != nil {
|
|
t.Fatalf("Failed to create schema: %v", err)
|
|
}
|
|
|
|
return database
|
|
}
|
|
|
|
func TestNewProcessor(t *testing.T) {
|
|
database := setupTestDB(t)
|
|
defer database.Close()
|
|
|
|
tests := []struct {
|
|
name string
|
|
useCache bool
|
|
}{
|
|
{"with cache", true},
|
|
{"without cache", false},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
p := NewProcessor(database, tt.useCache)
|
|
if p == nil {
|
|
t.Fatal("NewProcessor returned nil")
|
|
}
|
|
if p.db != database {
|
|
t.Error("Processor database not set correctly")
|
|
}
|
|
if p.useCache != tt.useCache {
|
|
t.Errorf("useCache = %v, want %v", p.useCache, tt.useCache)
|
|
}
|
|
if tt.useCache && p.userCache == nil {
|
|
t.Error("Cache enabled but userCache is nil")
|
|
}
|
|
if !tt.useCache && p.userCache != nil {
|
|
t.Error("Cache disabled but userCache is not nil")
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestProcessManifest_ImageManifest(t *testing.T) {
|
|
database := setupTestDB(t)
|
|
defer database.Close()
|
|
|
|
p := NewProcessor(database, false)
|
|
ctx := context.Background()
|
|
|
|
// Create test manifest record
|
|
manifestRecord := &atproto.ManifestRecord{
|
|
Repository: "test-app",
|
|
Digest: "sha256:abc123",
|
|
MediaType: "application/vnd.oci.image.manifest.v1+json",
|
|
SchemaVersion: 2,
|
|
HoldEndpoint: "did:web:hold01.atcr.io",
|
|
CreatedAt: time.Now(),
|
|
Config: &atproto.BlobReference{
|
|
Digest: "sha256:config123",
|
|
Size: 1234,
|
|
},
|
|
Layers: []atproto.BlobReference{
|
|
{Digest: "sha256:layer1", Size: 5000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"},
|
|
{Digest: "sha256:layer2", Size: 3000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"},
|
|
},
|
|
Annotations: map[string]string{
|
|
"org.opencontainers.image.title": "Test App",
|
|
"org.opencontainers.image.description": "A test application",
|
|
"org.opencontainers.image.source": "https://github.com/test/app",
|
|
"org.opencontainers.image.licenses": "MIT",
|
|
"io.atcr.icon": "https://example.com/icon.png",
|
|
},
|
|
}
|
|
|
|
// Marshal to bytes for ProcessManifest
|
|
recordBytes, err := json.Marshal(manifestRecord)
|
|
if err != nil {
|
|
t.Fatalf("Failed to marshal manifest: %v", err)
|
|
}
|
|
|
|
// Process manifest
|
|
manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
|
|
if err != nil {
|
|
t.Fatalf("ProcessManifest failed: %v", err)
|
|
}
|
|
if manifestID == 0 {
|
|
t.Error("Expected non-zero manifest ID")
|
|
}
|
|
|
|
// Verify manifest was inserted
|
|
var count int
|
|
err = database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND repository = ? AND digest = ?",
|
|
"did:plc:test123", "test-app", "sha256:abc123").Scan(&count)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query manifests: %v", err)
|
|
}
|
|
if count != 1 {
|
|
t.Errorf("Expected 1 manifest, got %d", count)
|
|
}
|
|
|
|
// Verify annotations were stored in repository_annotations table
|
|
var title, source string
|
|
err = database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?",
|
|
"did:plc:test123", "test-app", "org.opencontainers.image.title").Scan(&title)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query title annotation: %v", err)
|
|
}
|
|
if title != "Test App" {
|
|
t.Errorf("title = %q, want %q", title, "Test App")
|
|
}
|
|
|
|
err = database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?",
|
|
"did:plc:test123", "test-app", "org.opencontainers.image.source").Scan(&source)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query source annotation: %v", err)
|
|
}
|
|
if source != "https://github.com/test/app" {
|
|
t.Errorf("source = %q, want %q", source, "https://github.com/test/app")
|
|
}
|
|
|
|
// Verify layers were inserted
|
|
var layerCount int
|
|
err = database.QueryRow("SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID).Scan(&layerCount)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query layers: %v", err)
|
|
}
|
|
if layerCount != 2 {
|
|
t.Errorf("Expected 2 layers, got %d", layerCount)
|
|
}
|
|
|
|
// Verify no manifest references (this is an image, not a list)
|
|
var refCount int
|
|
err = database.QueryRow("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID).Scan(&refCount)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query manifest_references: %v", err)
|
|
}
|
|
if refCount != 0 {
|
|
t.Errorf("Expected 0 manifest references, got %d", refCount)
|
|
}
|
|
}
|
|
|
|
func TestProcessManifest_ManifestList(t *testing.T) {
|
|
database := setupTestDB(t)
|
|
defer database.Close()
|
|
|
|
p := NewProcessor(database, false)
|
|
ctx := context.Background()
|
|
|
|
// Create test manifest list record
|
|
manifestRecord := &atproto.ManifestRecord{
|
|
Repository: "test-app",
|
|
Digest: "sha256:list123",
|
|
MediaType: "application/vnd.oci.image.index.v1+json",
|
|
SchemaVersion: 2,
|
|
HoldEndpoint: "did:web:hold01.atcr.io",
|
|
CreatedAt: time.Now(),
|
|
Manifests: []atproto.ManifestReference{
|
|
{
|
|
Digest: "sha256:amd64manifest",
|
|
MediaType: "application/vnd.oci.image.manifest.v1+json",
|
|
Size: 1000,
|
|
Platform: &atproto.Platform{
|
|
Architecture: "amd64",
|
|
OS: "linux",
|
|
},
|
|
},
|
|
{
|
|
Digest: "sha256:arm64manifest",
|
|
MediaType: "application/vnd.oci.image.manifest.v1+json",
|
|
Size: 1100,
|
|
Platform: &atproto.Platform{
|
|
Architecture: "arm64",
|
|
OS: "linux",
|
|
Variant: "v8",
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
// Marshal to bytes for ProcessManifest
|
|
recordBytes, err := json.Marshal(manifestRecord)
|
|
if err != nil {
|
|
t.Fatalf("Failed to marshal manifest: %v", err)
|
|
}
|
|
|
|
// Process manifest list
|
|
manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
|
|
if err != nil {
|
|
t.Fatalf("ProcessManifest failed: %v", err)
|
|
}
|
|
|
|
// Verify manifest references were inserted
|
|
var refCount int
|
|
err = database.QueryRow("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID).Scan(&refCount)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query manifest_references: %v", err)
|
|
}
|
|
if refCount != 2 {
|
|
t.Errorf("Expected 2 manifest references, got %d", refCount)
|
|
}
|
|
|
|
// Verify platform info was stored
|
|
var arch, os string
|
|
err = database.QueryRow("SELECT platform_architecture, platform_os FROM manifest_references WHERE manifest_id = ? AND reference_index = 0", manifestID).Scan(&arch, &os)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query platform info: %v", err)
|
|
}
|
|
if arch != "amd64" {
|
|
t.Errorf("platform_architecture = %q, want %q", arch, "amd64")
|
|
}
|
|
if os != "linux" {
|
|
t.Errorf("platform_os = %q, want %q", os, "linux")
|
|
}
|
|
|
|
// Verify no layers (this is a list, not an image)
|
|
var layerCount int
|
|
err = database.QueryRow("SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID).Scan(&layerCount)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query layers: %v", err)
|
|
}
|
|
if layerCount != 0 {
|
|
t.Errorf("Expected 0 layers, got %d", layerCount)
|
|
}
|
|
}
|
|
|
|
func TestProcessTag(t *testing.T) {
|
|
database := setupTestDB(t)
|
|
defer database.Close()
|
|
|
|
p := NewProcessor(database, false)
|
|
ctx := context.Background()
|
|
|
|
// Create test tag record (using ManifestDigest field for simplicity)
|
|
tagRecord := &atproto.TagRecord{
|
|
Repository: "test-app",
|
|
Tag: "latest",
|
|
ManifestDigest: "sha256:abc123",
|
|
UpdatedAt: time.Now(),
|
|
}
|
|
|
|
// Marshal to bytes for ProcessTag
|
|
recordBytes, err := json.Marshal(tagRecord)
|
|
if err != nil {
|
|
t.Fatalf("Failed to marshal tag: %v", err)
|
|
}
|
|
|
|
// Process tag
|
|
err = p.ProcessTag(ctx, "did:plc:test123", recordBytes)
|
|
if err != nil {
|
|
t.Fatalf("ProcessTag failed: %v", err)
|
|
}
|
|
|
|
// Verify tag was inserted
|
|
var count int
|
|
err = database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?",
|
|
"did:plc:test123", "test-app", "latest").Scan(&count)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query tags: %v", err)
|
|
}
|
|
if count != 1 {
|
|
t.Errorf("Expected 1 tag, got %d", count)
|
|
}
|
|
|
|
// Verify digest was stored
|
|
var digest string
|
|
err = database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?",
|
|
"did:plc:test123", "test-app", "latest").Scan(&digest)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query tag digest: %v", err)
|
|
}
|
|
if digest != "sha256:abc123" {
|
|
t.Errorf("digest = %q, want %q", digest, "sha256:abc123")
|
|
}
|
|
|
|
// Test upserting same tag with new digest
|
|
tagRecord.ManifestDigest = "sha256:newdigest"
|
|
recordBytes, err = json.Marshal(tagRecord)
|
|
if err != nil {
|
|
t.Fatalf("Failed to marshal tag: %v", err)
|
|
}
|
|
err = p.ProcessTag(ctx, "did:plc:test123", recordBytes)
|
|
if err != nil {
|
|
t.Fatalf("ProcessTag (upsert) failed: %v", err)
|
|
}
|
|
|
|
// Verify tag was updated
|
|
err = database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?",
|
|
"did:plc:test123", "test-app", "latest").Scan(&digest)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query updated tag: %v", err)
|
|
}
|
|
if digest != "sha256:newdigest" {
|
|
t.Errorf("digest = %q, want %q", digest, "sha256:newdigest")
|
|
}
|
|
|
|
// Verify still only one tag (upsert, not insert)
|
|
err = database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?",
|
|
"did:plc:test123", "test-app", "latest").Scan(&count)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query tags after upsert: %v", err)
|
|
}
|
|
if count != 1 {
|
|
t.Errorf("Expected 1 tag after upsert, got %d", count)
|
|
}
|
|
}
|
|
|
|
func TestProcessStar(t *testing.T) {
|
|
database := setupTestDB(t)
|
|
defer database.Close()
|
|
|
|
p := NewProcessor(database, false)
|
|
ctx := context.Background()
|
|
|
|
// Create test star record
|
|
starRecord := &atproto.StarRecord{
|
|
Subject: atproto.StarSubject{
|
|
DID: "did:plc:owner123",
|
|
Repository: "test-app",
|
|
},
|
|
CreatedAt: time.Now(),
|
|
}
|
|
|
|
// Marshal to bytes for ProcessStar
|
|
recordBytes, err := json.Marshal(starRecord)
|
|
if err != nil {
|
|
t.Fatalf("Failed to marshal star: %v", err)
|
|
}
|
|
|
|
// Process star
|
|
err = p.ProcessStar(ctx, "did:plc:starrer123", recordBytes)
|
|
if err != nil {
|
|
t.Fatalf("ProcessStar failed: %v", err)
|
|
}
|
|
|
|
// Verify star was inserted
|
|
var count int
|
|
err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?",
|
|
"did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&count)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query stars: %v", err)
|
|
}
|
|
if count != 1 {
|
|
t.Errorf("Expected 1 star, got %d", count)
|
|
}
|
|
|
|
// Test upserting same star (should be idempotent)
|
|
recordBytes, err = json.Marshal(starRecord)
|
|
if err != nil {
|
|
t.Fatalf("Failed to marshal star: %v", err)
|
|
}
|
|
err = p.ProcessStar(ctx, "did:plc:starrer123", recordBytes)
|
|
if err != nil {
|
|
t.Fatalf("ProcessStar (upsert) failed: %v", err)
|
|
}
|
|
|
|
// Verify still only one star
|
|
err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?",
|
|
"did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&count)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query stars after upsert: %v", err)
|
|
}
|
|
if count != 1 {
|
|
t.Errorf("Expected 1 star after upsert, got %d", count)
|
|
}
|
|
}
|
|
|
|
func TestProcessManifest_Duplicate(t *testing.T) {
|
|
database := setupTestDB(t)
|
|
defer database.Close()
|
|
|
|
p := NewProcessor(database, false)
|
|
ctx := context.Background()
|
|
|
|
manifestRecord := &atproto.ManifestRecord{
|
|
Repository: "test-app",
|
|
Digest: "sha256:abc123",
|
|
MediaType: "application/vnd.oci.image.manifest.v1+json",
|
|
SchemaVersion: 2,
|
|
HoldEndpoint: "did:web:hold01.atcr.io",
|
|
CreatedAt: time.Now(),
|
|
}
|
|
|
|
// Marshal to bytes for ProcessManifest
|
|
recordBytes, err := json.Marshal(manifestRecord)
|
|
if err != nil {
|
|
t.Fatalf("Failed to marshal manifest: %v", err)
|
|
}
|
|
|
|
// Insert first time
|
|
id1, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
|
|
if err != nil {
|
|
t.Fatalf("First ProcessManifest failed: %v", err)
|
|
}
|
|
|
|
// Insert duplicate
|
|
id2, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
|
|
if err != nil {
|
|
t.Fatalf("Duplicate ProcessManifest failed: %v", err)
|
|
}
|
|
|
|
// Should return existing ID
|
|
if id1 != id2 {
|
|
t.Errorf("Duplicate manifest got different ID: %d vs %d", id1, id2)
|
|
}
|
|
|
|
// Verify only one manifest exists
|
|
var count int
|
|
err = database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND digest = ?",
|
|
"did:plc:test123", "sha256:abc123").Scan(&count)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query manifests: %v", err)
|
|
}
|
|
if count != 1 {
|
|
t.Errorf("Expected 1 manifest, got %d", count)
|
|
}
|
|
}
|
|
|
|
func TestProcessManifest_EmptyAnnotations(t *testing.T) {
|
|
database := setupTestDB(t)
|
|
defer database.Close()
|
|
|
|
p := NewProcessor(database, false)
|
|
ctx := context.Background()
|
|
|
|
// Manifest with nil annotations
|
|
manifestRecord := &atproto.ManifestRecord{
|
|
Repository: "test-app",
|
|
Digest: "sha256:abc123",
|
|
MediaType: "application/vnd.oci.image.manifest.v1+json",
|
|
SchemaVersion: 2,
|
|
HoldEndpoint: "did:web:hold01.atcr.io",
|
|
CreatedAt: time.Now(),
|
|
Annotations: nil,
|
|
}
|
|
|
|
// Marshal to bytes for ProcessManifest
|
|
recordBytes, err := json.Marshal(manifestRecord)
|
|
if err != nil {
|
|
t.Fatalf("Failed to marshal manifest: %v", err)
|
|
}
|
|
|
|
_, err = p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
|
|
if err != nil {
|
|
t.Fatalf("ProcessManifest failed: %v", err)
|
|
}
|
|
|
|
// Verify no annotations were stored (nil annotations should not create entries)
|
|
var annotationCount int
|
|
err = database.QueryRow("SELECT COUNT(*) FROM repository_annotations WHERE did = ? AND repository = ?",
|
|
"did:plc:test123", "test-app").Scan(&annotationCount)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query annotations: %v", err)
|
|
}
|
|
if annotationCount != 0 {
|
|
t.Errorf("Expected 0 annotations for nil annotations, got %d", annotationCount)
|
|
}
|
|
}
|
|
|
|
func TestProcessIdentity(t *testing.T) {
|
|
db := setupTestDB(t)
|
|
defer db.Close()
|
|
|
|
processor := NewProcessor(db, false)
|
|
|
|
// Setup: Create test user
|
|
testDID := "did:plc:alice123"
|
|
testHandle := "alice.bsky.social"
|
|
testPDS := "https://bsky.social"
|
|
_, err := db.Exec(`
|
|
INSERT INTO users (did, handle, pds_endpoint, last_seen)
|
|
VALUES (?, ?, ?, ?)
|
|
`, testDID, testHandle, testPDS, time.Now())
|
|
if err != nil {
|
|
t.Fatalf("Failed to insert test user: %v", err)
|
|
}
|
|
|
|
// Test 1: Process identity change event
|
|
newHandle := "alice-new.bsky.social"
|
|
err = processor.ProcessIdentity(context.Background(), testDID, newHandle)
|
|
// Note: This will fail to invalidate cache since we don't have a real identity directory,
|
|
// but we can still verify the database update happened
|
|
if err != nil {
|
|
t.Logf("Expected cache invalidation error (no real directory): %v", err)
|
|
}
|
|
|
|
// Verify handle was updated in database
|
|
var retrievedHandle string
|
|
err = db.QueryRow(`
|
|
SELECT handle FROM users WHERE did = ?
|
|
`, testDID).Scan(&retrievedHandle)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query updated user: %v", err)
|
|
}
|
|
if retrievedHandle != newHandle {
|
|
t.Errorf("Expected handle '%s', got '%s'", newHandle, retrievedHandle)
|
|
}
|
|
|
|
// Test 2: Process identity change for non-existent user
|
|
// Should not error (UPDATE just affects 0 rows)
|
|
err = processor.ProcessIdentity(context.Background(), "did:plc:nonexistent", "new.handle")
|
|
if err != nil {
|
|
t.Logf("Expected cache invalidation error: %v", err)
|
|
}
|
|
|
|
// Test 3: Process multiple identity changes
|
|
handles := []string{"alice1.bsky.social", "alice2.bsky.social", "alice3.bsky.social"}
|
|
for _, handle := range handles {
|
|
err = processor.ProcessIdentity(context.Background(), testDID, handle)
|
|
if err != nil {
|
|
t.Logf("Expected cache invalidation error: %v", err)
|
|
}
|
|
|
|
err = db.QueryRow(`
|
|
SELECT handle FROM users WHERE did = ?
|
|
`, testDID).Scan(&retrievedHandle)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query user after handle update: %v", err)
|
|
}
|
|
if retrievedHandle != handle {
|
|
t.Errorf("Expected handle '%s', got '%s'", handle, retrievedHandle)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestProcessAccount(t *testing.T) {
|
|
db := setupTestDB(t)
|
|
defer db.Close()
|
|
|
|
processor := NewProcessor(db, false)
|
|
|
|
// Setup: Create test user
|
|
testDID := "did:plc:bob456"
|
|
testHandle := "bob.bsky.social"
|
|
testPDS := "https://bsky.social"
|
|
_, err := db.Exec(`
|
|
INSERT INTO users (did, handle, pds_endpoint, last_seen)
|
|
VALUES (?, ?, ?, ?)
|
|
`, testDID, testHandle, testPDS, time.Now())
|
|
if err != nil {
|
|
t.Fatalf("Failed to insert test user: %v", err)
|
|
}
|
|
|
|
// Test 1: Process account deactivation event
|
|
err = processor.ProcessAccount(context.Background(), testDID, false, "deactivated")
|
|
// Note: Cache invalidation will fail without real directory, but that's expected
|
|
if err != nil {
|
|
t.Logf("Expected cache invalidation error (no real directory): %v", err)
|
|
}
|
|
|
|
// Verify user still exists in database (we don't delete on deactivation)
|
|
var exists bool
|
|
err = db.QueryRow(`
|
|
SELECT EXISTS(SELECT 1 FROM users WHERE did = ?)
|
|
`, testDID).Scan(&exists)
|
|
if err != nil {
|
|
t.Fatalf("Failed to check if user exists: %v", err)
|
|
}
|
|
if !exists {
|
|
t.Error("User should still exist after deactivation event (no deletion)")
|
|
}
|
|
|
|
// Test 2: Process account with active=true (should be ignored)
|
|
err = processor.ProcessAccount(context.Background(), testDID, true, "active")
|
|
if err != nil {
|
|
t.Errorf("Expected no error for active account, got: %v", err)
|
|
}
|
|
|
|
// Test 3: Process account with status != "deactivated" (should be ignored)
|
|
err = processor.ProcessAccount(context.Background(), testDID, false, "suspended")
|
|
if err != nil {
|
|
t.Errorf("Expected no error for non-deactivated status, got: %v", err)
|
|
}
|
|
|
|
// Test 4: Process account deactivation for non-existent user
|
|
err = processor.ProcessAccount(context.Background(), "did:plc:nonexistent", false, "deactivated")
|
|
// Cache invalidation will fail, but that's expected
|
|
if err != nil {
|
|
t.Logf("Expected cache invalidation error: %v", err)
|
|
}
|
|
|
|
// Test 5: Process multiple deactivation events (idempotent)
|
|
for i := 0; i < 3; i++ {
|
|
err = processor.ProcessAccount(context.Background(), testDID, false, "deactivated")
|
|
if err != nil {
|
|
t.Logf("Expected cache invalidation error on iteration %d: %v", i, err)
|
|
}
|
|
}
|
|
|
|
// User should still exist after multiple deactivations
|
|
err = db.QueryRow(`
|
|
SELECT EXISTS(SELECT 1 FROM users WHERE did = ?)
|
|
`, testDID).Scan(&exists)
|
|
if err != nil {
|
|
t.Fatalf("Failed to check if user exists after multiple deactivations: %v", err)
|
|
}
|
|
if !exists {
|
|
t.Error("User should still exist after multiple deactivation events")
|
|
}
|
|
}
|