956 lines
29 KiB
Go
956 lines
29 KiB
Go
package jetstream
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"atcr.io/pkg/atproto"
|
|
_ "github.com/tursodatabase/go-libsql"
|
|
)
|
|
|
|
// execStatements splits a multi-statement SQL string and executes each statement individually.
|
|
// go-libsql does not support multi-statement Exec like mattn/go-sqlite3.
|
|
func execStatements(t *testing.T, db *sql.DB, schema string) {
|
|
t.Helper()
|
|
for _, stmt := range strings.Split(schema, ";") {
|
|
stmt = strings.TrimSpace(stmt)
|
|
if stmt == "" {
|
|
continue
|
|
}
|
|
if _, err := db.Exec(stmt); err != nil {
|
|
t.Fatalf("Failed to execute statement: %v\nSQL: %s", err, stmt)
|
|
}
|
|
}
|
|
}
|
|
|
|
// setupTestDB creates an in-memory SQLite database for testing
|
|
func setupTestDB(t *testing.T) *sql.DB {
|
|
database, err := sql.Open("libsql", ":memory:")
|
|
if err != nil {
|
|
t.Fatalf("Failed to open test database: %v", err)
|
|
}
|
|
|
|
// Create schema
|
|
schema := `
|
|
CREATE TABLE users (
|
|
did TEXT PRIMARY KEY,
|
|
handle TEXT NOT NULL,
|
|
pds_endpoint TEXT NOT NULL,
|
|
avatar TEXT,
|
|
default_hold_did TEXT,
|
|
oci_client TEXT DEFAULT '',
|
|
last_seen TIMESTAMP NOT NULL
|
|
);
|
|
|
|
CREATE TABLE manifests (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
did TEXT NOT NULL,
|
|
repository TEXT NOT NULL,
|
|
digest TEXT NOT NULL,
|
|
hold_endpoint TEXT NOT NULL,
|
|
schema_version INTEGER NOT NULL,
|
|
media_type TEXT NOT NULL,
|
|
config_digest TEXT,
|
|
config_size INTEGER,
|
|
artifact_type TEXT NOT NULL DEFAULT 'container-image',
|
|
created_at TIMESTAMP NOT NULL,
|
|
UNIQUE(did, repository, digest)
|
|
);
|
|
|
|
CREATE TABLE repository_annotations (
|
|
did TEXT NOT NULL,
|
|
repository TEXT NOT NULL,
|
|
key TEXT NOT NULL,
|
|
value TEXT NOT NULL,
|
|
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
PRIMARY KEY(did, repository, key),
|
|
FOREIGN KEY(did) REFERENCES users(did) ON DELETE CASCADE
|
|
);
|
|
|
|
CREATE TABLE layers (
|
|
manifest_id INTEGER NOT NULL,
|
|
digest TEXT NOT NULL,
|
|
size INTEGER NOT NULL,
|
|
media_type TEXT NOT NULL,
|
|
layer_index INTEGER NOT NULL,
|
|
annotations TEXT,
|
|
PRIMARY KEY(manifest_id, layer_index)
|
|
);
|
|
|
|
CREATE TABLE manifest_references (
|
|
manifest_id INTEGER NOT NULL,
|
|
digest TEXT NOT NULL,
|
|
media_type TEXT NOT NULL,
|
|
size INTEGER NOT NULL,
|
|
platform_architecture TEXT,
|
|
platform_os TEXT,
|
|
platform_variant TEXT,
|
|
platform_os_version TEXT,
|
|
is_attestation BOOLEAN DEFAULT FALSE,
|
|
reference_index INTEGER NOT NULL,
|
|
PRIMARY KEY(manifest_id, reference_index)
|
|
);
|
|
|
|
CREATE TABLE tags (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
did TEXT NOT NULL,
|
|
repository TEXT NOT NULL,
|
|
tag TEXT NOT NULL,
|
|
digest TEXT NOT NULL,
|
|
created_at TIMESTAMP NOT NULL,
|
|
UNIQUE(did, repository, tag)
|
|
);
|
|
|
|
CREATE TABLE stars (
|
|
starrer_did TEXT NOT NULL,
|
|
owner_did TEXT NOT NULL,
|
|
repository TEXT NOT NULL,
|
|
created_at TIMESTAMP NOT NULL,
|
|
PRIMARY KEY(starrer_did, owner_did, repository)
|
|
);
|
|
`
|
|
|
|
execStatements(t, database, schema)
|
|
|
|
return database
|
|
}
|
|
|
|
func TestNewProcessor(t *testing.T) {
|
|
database := setupTestDB(t)
|
|
defer database.Close()
|
|
|
|
tests := []struct {
|
|
name string
|
|
useCache bool
|
|
}{
|
|
{"with cache", true},
|
|
{"without cache", false},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
p := NewProcessor(database, tt.useCache, nil)
|
|
if p == nil {
|
|
t.Fatal("NewProcessor returned nil")
|
|
}
|
|
if p.db != database {
|
|
t.Error("Processor database not set correctly")
|
|
}
|
|
if p.useCache != tt.useCache {
|
|
t.Errorf("useCache = %v, want %v", p.useCache, tt.useCache)
|
|
}
|
|
if tt.useCache && p.userCache == nil {
|
|
t.Error("Cache enabled but userCache is nil")
|
|
}
|
|
if !tt.useCache && p.userCache != nil {
|
|
t.Error("Cache disabled but userCache is not nil")
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestProcessManifest_ImageManifest(t *testing.T) {
|
|
database := setupTestDB(t)
|
|
defer database.Close()
|
|
|
|
// Insert test user (required for foreign key on repository_annotations)
|
|
_, err := database.Exec(`INSERT INTO users (did, handle, pds_endpoint, last_seen) VALUES (?, ?, ?, ?)`,
|
|
"did:plc:test123", "test.bsky.social", "https://pds.example.com", time.Now())
|
|
if err != nil {
|
|
t.Fatalf("Failed to insert test user: %v", err)
|
|
}
|
|
|
|
p := NewProcessor(database, false, nil)
|
|
ctx := context.Background()
|
|
|
|
// Create test manifest record
|
|
manifestRecord := &atproto.ManifestRecord{
|
|
Repository: "test-app",
|
|
Digest: "sha256:abc123",
|
|
MediaType: "application/vnd.oci.image.manifest.v1+json",
|
|
SchemaVersion: 2,
|
|
HoldEndpoint: "did:web:hold01.atcr.io",
|
|
CreatedAt: time.Now(),
|
|
Config: &atproto.BlobReference{
|
|
Digest: "sha256:config123",
|
|
Size: 1234,
|
|
},
|
|
Layers: []atproto.BlobReference{
|
|
{Digest: "sha256:layer1", Size: 5000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"},
|
|
{Digest: "sha256:layer2", Size: 3000, MediaType: "application/vnd.oci.image.layer.v1.tar+gzip"},
|
|
},
|
|
Annotations: map[string]string{
|
|
"org.opencontainers.image.title": "Test App",
|
|
"org.opencontainers.image.description": "A test application",
|
|
"org.opencontainers.image.source": "https://github.com/test/app",
|
|
"org.opencontainers.image.licenses": "MIT",
|
|
"io.atcr.icon": "https://example.com/icon.png",
|
|
},
|
|
}
|
|
|
|
// Marshal to bytes for ProcessManifest
|
|
recordBytes, err := json.Marshal(manifestRecord)
|
|
if err != nil {
|
|
t.Fatalf("Failed to marshal manifest: %v", err)
|
|
}
|
|
|
|
// Process manifest
|
|
manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
|
|
if err != nil {
|
|
t.Fatalf("ProcessManifest failed: %v", err)
|
|
}
|
|
if manifestID == 0 {
|
|
t.Error("Expected non-zero manifest ID")
|
|
}
|
|
|
|
// Verify manifest was inserted
|
|
var count int
|
|
err = database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND repository = ? AND digest = ?",
|
|
"did:plc:test123", "test-app", "sha256:abc123").Scan(&count)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query manifests: %v", err)
|
|
}
|
|
if count != 1 {
|
|
t.Errorf("Expected 1 manifest, got %d", count)
|
|
}
|
|
|
|
// Verify annotations were stored in repository_annotations table
|
|
var title, source string
|
|
err = database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?",
|
|
"did:plc:test123", "test-app", "org.opencontainers.image.title").Scan(&title)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query title annotation: %v", err)
|
|
}
|
|
if title != "Test App" {
|
|
t.Errorf("title = %q, want %q", title, "Test App")
|
|
}
|
|
|
|
err = database.QueryRow("SELECT value FROM repository_annotations WHERE did = ? AND repository = ? AND key = ?",
|
|
"did:plc:test123", "test-app", "org.opencontainers.image.source").Scan(&source)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query source annotation: %v", err)
|
|
}
|
|
if source != "https://github.com/test/app" {
|
|
t.Errorf("source = %q, want %q", source, "https://github.com/test/app")
|
|
}
|
|
|
|
// Verify layers were inserted
|
|
var layerCount int
|
|
err = database.QueryRow("SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID).Scan(&layerCount)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query layers: %v", err)
|
|
}
|
|
if layerCount != 2 {
|
|
t.Errorf("Expected 2 layers, got %d", layerCount)
|
|
}
|
|
|
|
// Verify no manifest references (this is an image, not a list)
|
|
var refCount int
|
|
err = database.QueryRow("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID).Scan(&refCount)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query manifest_references: %v", err)
|
|
}
|
|
if refCount != 0 {
|
|
t.Errorf("Expected 0 manifest references, got %d", refCount)
|
|
}
|
|
}
|
|
|
|
func TestProcessManifest_ManifestList(t *testing.T) {
|
|
database := setupTestDB(t)
|
|
defer database.Close()
|
|
|
|
p := NewProcessor(database, false, nil)
|
|
ctx := context.Background()
|
|
|
|
// Create test manifest list record
|
|
manifestRecord := &atproto.ManifestRecord{
|
|
Repository: "test-app",
|
|
Digest: "sha256:list123",
|
|
MediaType: "application/vnd.oci.image.index.v1+json",
|
|
SchemaVersion: 2,
|
|
HoldEndpoint: "did:web:hold01.atcr.io",
|
|
CreatedAt: time.Now(),
|
|
Manifests: []atproto.ManifestReference{
|
|
{
|
|
Digest: "sha256:amd64manifest",
|
|
MediaType: "application/vnd.oci.image.manifest.v1+json",
|
|
Size: 1000,
|
|
Platform: &atproto.Platform{
|
|
Architecture: "amd64",
|
|
OS: "linux",
|
|
},
|
|
},
|
|
{
|
|
Digest: "sha256:arm64manifest",
|
|
MediaType: "application/vnd.oci.image.manifest.v1+json",
|
|
Size: 1100,
|
|
Platform: &atproto.Platform{
|
|
Architecture: "arm64",
|
|
OS: "linux",
|
|
Variant: "v8",
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
// Marshal to bytes for ProcessManifest
|
|
recordBytes, err := json.Marshal(manifestRecord)
|
|
if err != nil {
|
|
t.Fatalf("Failed to marshal manifest: %v", err)
|
|
}
|
|
|
|
// Process manifest list
|
|
manifestID, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
|
|
if err != nil {
|
|
t.Fatalf("ProcessManifest failed: %v", err)
|
|
}
|
|
|
|
// Verify manifest references were inserted
|
|
var refCount int
|
|
err = database.QueryRow("SELECT COUNT(*) FROM manifest_references WHERE manifest_id = ?", manifestID).Scan(&refCount)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query manifest_references: %v", err)
|
|
}
|
|
if refCount != 2 {
|
|
t.Errorf("Expected 2 manifest references, got %d", refCount)
|
|
}
|
|
|
|
// Verify platform info was stored
|
|
var arch, os string
|
|
err = database.QueryRow("SELECT platform_architecture, platform_os FROM manifest_references WHERE manifest_id = ? AND reference_index = 0", manifestID).Scan(&arch, &os)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query platform info: %v", err)
|
|
}
|
|
if arch != "amd64" {
|
|
t.Errorf("platform_architecture = %q, want %q", arch, "amd64")
|
|
}
|
|
if os != "linux" {
|
|
t.Errorf("platform_os = %q, want %q", os, "linux")
|
|
}
|
|
|
|
// Verify no layers (this is a list, not an image)
|
|
var layerCount int
|
|
err = database.QueryRow("SELECT COUNT(*) FROM layers WHERE manifest_id = ?", manifestID).Scan(&layerCount)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query layers: %v", err)
|
|
}
|
|
if layerCount != 0 {
|
|
t.Errorf("Expected 0 layers, got %d", layerCount)
|
|
}
|
|
}
|
|
|
|
func TestProcessTag(t *testing.T) {
|
|
database := setupTestDB(t)
|
|
defer database.Close()
|
|
|
|
p := NewProcessor(database, false, nil)
|
|
ctx := context.Background()
|
|
|
|
// Create test tag record (using ManifestDigest field for simplicity)
|
|
tagRecord := &atproto.TagRecord{
|
|
Repository: "test-app",
|
|
Tag: "latest",
|
|
ManifestDigest: "sha256:abc123",
|
|
UpdatedAt: time.Now(),
|
|
}
|
|
|
|
// Marshal to bytes for ProcessTag
|
|
recordBytes, err := json.Marshal(tagRecord)
|
|
if err != nil {
|
|
t.Fatalf("Failed to marshal tag: %v", err)
|
|
}
|
|
|
|
// Process tag
|
|
err = p.ProcessTag(ctx, "did:plc:test123", recordBytes)
|
|
if err != nil {
|
|
t.Fatalf("ProcessTag failed: %v", err)
|
|
}
|
|
|
|
// Verify tag was inserted
|
|
var count int
|
|
err = database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?",
|
|
"did:plc:test123", "test-app", "latest").Scan(&count)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query tags: %v", err)
|
|
}
|
|
if count != 1 {
|
|
t.Errorf("Expected 1 tag, got %d", count)
|
|
}
|
|
|
|
// Verify digest was stored
|
|
var digest string
|
|
err = database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?",
|
|
"did:plc:test123", "test-app", "latest").Scan(&digest)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query tag digest: %v", err)
|
|
}
|
|
if digest != "sha256:abc123" {
|
|
t.Errorf("digest = %q, want %q", digest, "sha256:abc123")
|
|
}
|
|
|
|
// Test upserting same tag with new digest
|
|
tagRecord.ManifestDigest = "sha256:newdigest"
|
|
recordBytes, err = json.Marshal(tagRecord)
|
|
if err != nil {
|
|
t.Fatalf("Failed to marshal tag: %v", err)
|
|
}
|
|
err = p.ProcessTag(ctx, "did:plc:test123", recordBytes)
|
|
if err != nil {
|
|
t.Fatalf("ProcessTag (upsert) failed: %v", err)
|
|
}
|
|
|
|
// Verify tag was updated
|
|
err = database.QueryRow("SELECT digest FROM tags WHERE did = ? AND repository = ? AND tag = ?",
|
|
"did:plc:test123", "test-app", "latest").Scan(&digest)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query updated tag: %v", err)
|
|
}
|
|
if digest != "sha256:newdigest" {
|
|
t.Errorf("digest = %q, want %q", digest, "sha256:newdigest")
|
|
}
|
|
|
|
// Verify still only one tag (upsert, not insert)
|
|
err = database.QueryRow("SELECT COUNT(*) FROM tags WHERE did = ? AND repository = ? AND tag = ?",
|
|
"did:plc:test123", "test-app", "latest").Scan(&count)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query tags after upsert: %v", err)
|
|
}
|
|
if count != 1 {
|
|
t.Errorf("Expected 1 tag after upsert, got %d", count)
|
|
}
|
|
}
|
|
|
|
func TestProcessStar(t *testing.T) {
|
|
database := setupTestDB(t)
|
|
defer database.Close()
|
|
|
|
// Insert test users (starrer + owner) so EnsureUser finds them without network calls
|
|
for _, did := range []string{"did:plc:starrer123", "did:plc:owner123"} {
|
|
_, err := database.Exec(
|
|
`INSERT INTO users (did, handle, pds_endpoint, last_seen) VALUES (?, ?, ?, ?)`,
|
|
did, did+".test", "https://pds.example.com", time.Now())
|
|
if err != nil {
|
|
t.Fatalf("Failed to insert test user %s: %v", did, err)
|
|
}
|
|
}
|
|
|
|
p := NewProcessor(database, false, nil)
|
|
ctx := context.Background()
|
|
|
|
// Create test star record (new AT URI format)
|
|
starRecord := atproto.NewStarRecord("did:plc:owner123", "test-app")
|
|
|
|
// Marshal to bytes for ProcessStar
|
|
recordBytes, err := json.Marshal(starRecord)
|
|
if err != nil {
|
|
t.Fatalf("Failed to marshal star: %v", err)
|
|
}
|
|
|
|
// Process star
|
|
err = p.ProcessStar(ctx, "did:plc:starrer123", recordBytes)
|
|
if err != nil {
|
|
t.Fatalf("ProcessStar failed: %v", err)
|
|
}
|
|
|
|
// Verify star was inserted
|
|
var count int
|
|
err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?",
|
|
"did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&count)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query stars: %v", err)
|
|
}
|
|
if count != 1 {
|
|
t.Errorf("Expected 1 star, got %d", count)
|
|
}
|
|
|
|
// Test upserting same star (should be idempotent)
|
|
recordBytes, err = json.Marshal(starRecord)
|
|
if err != nil {
|
|
t.Fatalf("Failed to marshal star: %v", err)
|
|
}
|
|
err = p.ProcessStar(ctx, "did:plc:starrer123", recordBytes)
|
|
if err != nil {
|
|
t.Fatalf("ProcessStar (upsert) failed: %v", err)
|
|
}
|
|
|
|
// Verify still only one star
|
|
err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?",
|
|
"did:plc:starrer123", "did:plc:owner123", "test-app").Scan(&count)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query stars after upsert: %v", err)
|
|
}
|
|
if count != 1 {
|
|
t.Errorf("Expected 1 star after upsert, got %d", count)
|
|
}
|
|
}
|
|
|
|
func TestProcessStar_OldFormat(t *testing.T) {
|
|
database := setupTestDB(t)
|
|
defer database.Close()
|
|
|
|
// Insert test users
|
|
for _, did := range []string{"did:plc:starrer456", "did:plc:owner456"} {
|
|
_, err := database.Exec(
|
|
`INSERT INTO users (did, handle, pds_endpoint, last_seen) VALUES (?, ?, ?, ?)`,
|
|
did, did+".test", "https://pds.example.com", time.Now())
|
|
if err != nil {
|
|
t.Fatalf("Failed to insert test user %s: %v", did, err)
|
|
}
|
|
}
|
|
|
|
p := NewProcessor(database, false, nil)
|
|
ctx := context.Background()
|
|
|
|
// Old format JSON (object subject with did + repository)
|
|
oldFormatJSON := `{"$type":"io.atcr.sailor.star","subject":{"did":"did:plc:owner456","repository":"legacy-app"},"createdAt":"2025-06-01T00:00:00Z"}`
|
|
|
|
err := p.ProcessStar(ctx, "did:plc:starrer456", []byte(oldFormatJSON))
|
|
if err != nil {
|
|
t.Fatalf("ProcessStar (old format) failed: %v", err)
|
|
}
|
|
|
|
// Verify star was inserted with correct owner/repo
|
|
var count int
|
|
err = database.QueryRow("SELECT COUNT(*) FROM stars WHERE starrer_did = ? AND owner_did = ? AND repository = ?",
|
|
"did:plc:starrer456", "did:plc:owner456", "legacy-app").Scan(&count)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query stars: %v", err)
|
|
}
|
|
if count != 1 {
|
|
t.Errorf("Expected 1 star from old format, got %d", count)
|
|
}
|
|
}
|
|
|
|
func TestProcessManifest_Duplicate(t *testing.T) {
|
|
database := setupTestDB(t)
|
|
defer database.Close()
|
|
|
|
p := NewProcessor(database, false, nil)
|
|
ctx := context.Background()
|
|
|
|
manifestRecord := &atproto.ManifestRecord{
|
|
Repository: "test-app",
|
|
Digest: "sha256:abc123",
|
|
MediaType: "application/vnd.oci.image.manifest.v1+json",
|
|
SchemaVersion: 2,
|
|
HoldEndpoint: "did:web:hold01.atcr.io",
|
|
CreatedAt: time.Now(),
|
|
}
|
|
|
|
// Marshal to bytes for ProcessManifest
|
|
recordBytes, err := json.Marshal(manifestRecord)
|
|
if err != nil {
|
|
t.Fatalf("Failed to marshal manifest: %v", err)
|
|
}
|
|
|
|
// Insert first time
|
|
id1, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
|
|
if err != nil {
|
|
t.Fatalf("First ProcessManifest failed: %v", err)
|
|
}
|
|
|
|
// Insert duplicate
|
|
id2, err := p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
|
|
if err != nil {
|
|
t.Fatalf("Duplicate ProcessManifest failed: %v", err)
|
|
}
|
|
|
|
// Should return existing ID
|
|
if id1 != id2 {
|
|
t.Errorf("Duplicate manifest got different ID: %d vs %d", id1, id2)
|
|
}
|
|
|
|
// Verify only one manifest exists
|
|
var count int
|
|
err = database.QueryRow("SELECT COUNT(*) FROM manifests WHERE did = ? AND digest = ?",
|
|
"did:plc:test123", "sha256:abc123").Scan(&count)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query manifests: %v", err)
|
|
}
|
|
if count != 1 {
|
|
t.Errorf("Expected 1 manifest, got %d", count)
|
|
}
|
|
}
|
|
|
|
func TestProcessManifest_EmptyAnnotations(t *testing.T) {
|
|
database := setupTestDB(t)
|
|
defer database.Close()
|
|
|
|
p := NewProcessor(database, false, nil)
|
|
ctx := context.Background()
|
|
|
|
// Manifest with nil annotations
|
|
manifestRecord := &atproto.ManifestRecord{
|
|
Repository: "test-app",
|
|
Digest: "sha256:abc123",
|
|
MediaType: "application/vnd.oci.image.manifest.v1+json",
|
|
SchemaVersion: 2,
|
|
HoldEndpoint: "did:web:hold01.atcr.io",
|
|
CreatedAt: time.Now(),
|
|
Annotations: nil,
|
|
}
|
|
|
|
// Marshal to bytes for ProcessManifest
|
|
recordBytes, err := json.Marshal(manifestRecord)
|
|
if err != nil {
|
|
t.Fatalf("Failed to marshal manifest: %v", err)
|
|
}
|
|
|
|
_, err = p.ProcessManifest(ctx, "did:plc:test123", recordBytes)
|
|
if err != nil {
|
|
t.Fatalf("ProcessManifest failed: %v", err)
|
|
}
|
|
|
|
// Verify no annotations were stored (nil annotations should not create entries)
|
|
var annotationCount int
|
|
err = database.QueryRow("SELECT COUNT(*) FROM repository_annotations WHERE did = ? AND repository = ?",
|
|
"did:plc:test123", "test-app").Scan(&annotationCount)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query annotations: %v", err)
|
|
}
|
|
if annotationCount != 0 {
|
|
t.Errorf("Expected 0 annotations for nil annotations, got %d", annotationCount)
|
|
}
|
|
}
|
|
|
|
func TestProcessIdentity(t *testing.T) {
|
|
db := setupTestDB(t)
|
|
defer db.Close()
|
|
|
|
processor := NewProcessor(db, false, nil)
|
|
|
|
// Setup: Create test user
|
|
testDID := "did:plc:alice123"
|
|
testHandle := "alice.bsky.social"
|
|
testPDS := "https://bsky.social"
|
|
_, err := db.Exec(`
|
|
INSERT INTO users (did, handle, pds_endpoint, last_seen)
|
|
VALUES (?, ?, ?, ?)
|
|
`, testDID, testHandle, testPDS, time.Now())
|
|
if err != nil {
|
|
t.Fatalf("Failed to insert test user: %v", err)
|
|
}
|
|
|
|
// Test 1: Process identity change event
|
|
newHandle := "alice-new.bsky.social"
|
|
err = processor.ProcessIdentity(context.Background(), testDID, newHandle)
|
|
// Note: This will fail to invalidate cache since we don't have a real identity directory,
|
|
// but we can still verify the database update happened
|
|
if err != nil {
|
|
t.Logf("Expected cache invalidation error (no real directory): %v", err)
|
|
}
|
|
|
|
// Verify handle was updated in database
|
|
var retrievedHandle string
|
|
err = db.QueryRow(`
|
|
SELECT handle FROM users WHERE did = ?
|
|
`, testDID).Scan(&retrievedHandle)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query updated user: %v", err)
|
|
}
|
|
if retrievedHandle != newHandle {
|
|
t.Errorf("Expected handle '%s', got '%s'", newHandle, retrievedHandle)
|
|
}
|
|
|
|
// Test 2: Process identity change for non-existent user
|
|
// Should not error (UPDATE just affects 0 rows)
|
|
err = processor.ProcessIdentity(context.Background(), "did:plc:nonexistent", "new.handle")
|
|
if err != nil {
|
|
t.Logf("Expected cache invalidation error: %v", err)
|
|
}
|
|
|
|
// Test 3: Process multiple identity changes
|
|
handles := []string{"alice1.bsky.social", "alice2.bsky.social", "alice3.bsky.social"}
|
|
for _, handle := range handles {
|
|
err = processor.ProcessIdentity(context.Background(), testDID, handle)
|
|
if err != nil {
|
|
t.Logf("Expected cache invalidation error: %v", err)
|
|
}
|
|
|
|
err = db.QueryRow(`
|
|
SELECT handle FROM users WHERE did = ?
|
|
`, testDID).Scan(&retrievedHandle)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query user after handle update: %v", err)
|
|
}
|
|
if retrievedHandle != handle {
|
|
t.Errorf("Expected handle '%s', got '%s'", handle, retrievedHandle)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestProcessRecord_RoutesCorrectly(t *testing.T) {
|
|
db := setupTestDB(t)
|
|
defer db.Close()
|
|
|
|
// Add missing tables for this test
|
|
execStatements(t, db, `
|
|
CREATE TABLE repo_pages (
|
|
did TEXT NOT NULL,
|
|
repository TEXT NOT NULL,
|
|
description TEXT,
|
|
avatar_cid TEXT,
|
|
created_at TIMESTAMP NOT NULL,
|
|
updated_at TIMESTAMP NOT NULL,
|
|
PRIMARY KEY(did, repository)
|
|
);
|
|
CREATE TABLE hold_captain_records (
|
|
hold_did TEXT PRIMARY KEY,
|
|
owner_did TEXT NOT NULL,
|
|
public BOOLEAN NOT NULL,
|
|
allow_all_crew BOOLEAN NOT NULL,
|
|
deployed_at TEXT,
|
|
region TEXT,
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
);
|
|
CREATE TABLE hold_crew_members (
|
|
hold_did TEXT NOT NULL,
|
|
member_did TEXT NOT NULL,
|
|
rkey TEXT NOT NULL,
|
|
role TEXT,
|
|
permissions TEXT,
|
|
tier TEXT,
|
|
added_at TEXT,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
PRIMARY KEY (hold_did, member_did)
|
|
);
|
|
`)
|
|
|
|
processor := NewProcessor(db, false, nil)
|
|
ctx := context.Background()
|
|
var err error
|
|
|
|
// Test 1: ProcessRecord routes manifest correctly
|
|
// Note: Schema validation may fail for io.atcr.manifest since we can't resolve the schema,
|
|
// but this tests the routing logic
|
|
manifestRecord := map[string]any{
|
|
"$type": "io.atcr.manifest",
|
|
"repository": "test-app",
|
|
"digest": "sha256:route123",
|
|
"mediaType": "application/vnd.oci.image.manifest.v1+json",
|
|
"schemaVersion": 2,
|
|
"holdDid": "did:web:hold01.atcr.io",
|
|
"createdAt": time.Now().Format(time.RFC3339),
|
|
}
|
|
recordBytes, _ := json.Marshal(manifestRecord)
|
|
|
|
// Note: ProcessRecord will skip validation if lexicon can't be resolved (expected in tests)
|
|
// and will skip EnsureUser since we don't have a real PDS to resolve
|
|
// Just verify the record is processed without panic
|
|
err = processor.ProcessRecord(ctx, "did:plc:test123", atproto.ManifestCollection, "route123", recordBytes, false, nil)
|
|
// Error expected since we can't resolve identity - that's fine for this test
|
|
if err != nil {
|
|
t.Logf("Expected error (can't resolve identity): %v", err)
|
|
}
|
|
|
|
// Test 2: ProcessRecord handles captain record without creating user
|
|
captainRecord := map[string]any{
|
|
"$type": "io.atcr.hold.captain",
|
|
"owner": "did:plc:owner123",
|
|
"public": true,
|
|
"allowAllCrew": false,
|
|
"enableBlueskyPosts": false,
|
|
"deployedAt": time.Now().Format(time.RFC3339),
|
|
}
|
|
captainBytes, _ := json.Marshal(captainRecord)
|
|
|
|
// This should NOT call EnsureUser (captain is a hold collection)
|
|
err = processor.ProcessRecord(ctx, "did:web:hold.example.com", atproto.CaptainCollection, "self", captainBytes, false, nil)
|
|
if err != nil {
|
|
t.Logf("Error processing captain (validation may fail in test): %v", err)
|
|
}
|
|
|
|
// Verify no user was created for the hold DID
|
|
var userCount int
|
|
err = db.QueryRow(`SELECT COUNT(*) FROM users WHERE did = ?`, "did:web:hold.example.com").Scan(&userCount)
|
|
if err != nil {
|
|
t.Fatalf("Failed to query users: %v", err)
|
|
}
|
|
if userCount != 0 {
|
|
t.Error("Captain record processing should NOT create a user entry for holds")
|
|
}
|
|
|
|
// Test 3: ProcessRecord handles delete operations
|
|
err = processor.ProcessRecord(ctx, "did:plc:test123", atproto.ManifestCollection, "sha256:todelete", nil, true, nil)
|
|
if err != nil {
|
|
t.Errorf("Delete should not error: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestProcessRecord_SkipsInvalidRecords(t *testing.T) {
|
|
db := setupTestDB(t)
|
|
defer db.Close()
|
|
|
|
processor := NewProcessor(db, false, nil)
|
|
ctx := context.Background()
|
|
|
|
// Test: Invalid JSON should be skipped silently (no error returned)
|
|
invalidJSON := []byte(`{invalid json}`)
|
|
err := processor.ProcessRecord(ctx, "did:plc:test123", atproto.ManifestCollection, "test", invalidJSON, false, nil)
|
|
// Should return nil (skipped silently) not an error
|
|
if err != nil {
|
|
t.Errorf("Invalid record should be skipped silently, got error: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestValidateRecord(t *testing.T) {
|
|
db := setupTestDB(t)
|
|
defer db.Close()
|
|
|
|
processor := NewProcessor(db, false, nil)
|
|
ctx := context.Background()
|
|
|
|
// Test 1: Manifest passes (no strict validation)
|
|
manifestJSON := []byte(`{"$type": "io.atcr.manifest", "repository": "test"}`)
|
|
err := processor.ValidateRecord(ctx, atproto.ManifestCollection, manifestJSON)
|
|
if err != nil {
|
|
t.Errorf("Manifest should pass validation: %v", err)
|
|
}
|
|
|
|
// Test 2: Invalid JSON returns error
|
|
invalidJSON := []byte(`{invalid}`)
|
|
err = processor.ValidateRecord(ctx, atproto.ManifestCollection, invalidJSON)
|
|
if err == nil {
|
|
t.Error("Invalid JSON should return error")
|
|
}
|
|
|
|
// Test 3: Captain with valid owner passes
|
|
captainValid := []byte(`{"owner": "did:plc:owner123", "public": true}`)
|
|
err = processor.ValidateRecord(ctx, atproto.CaptainCollection, captainValid)
|
|
if err != nil {
|
|
t.Errorf("Valid captain should pass: %v", err)
|
|
}
|
|
|
|
// Test 4: Captain with empty owner is rejected
|
|
captainEmpty := []byte(`{"owner": "", "public": true}`)
|
|
err = processor.ValidateRecord(ctx, atproto.CaptainCollection, captainEmpty)
|
|
if err == nil {
|
|
t.Error("Captain with empty owner should be rejected")
|
|
}
|
|
|
|
// Test 5: Captain with invalid owner (not a DID) is rejected
|
|
captainInvalid := []byte(`{"owner": "notadid", "public": true}`)
|
|
err = processor.ValidateRecord(ctx, atproto.CaptainCollection, captainInvalid)
|
|
if err == nil {
|
|
t.Error("Captain with invalid owner should be rejected")
|
|
}
|
|
|
|
// Test 6: Crew with valid member passes
|
|
crewValid := []byte(`{"member": "did:plc:member123", "role": "write"}`)
|
|
err = processor.ValidateRecord(ctx, atproto.CrewCollection, crewValid)
|
|
if err != nil {
|
|
t.Errorf("Valid crew should pass: %v", err)
|
|
}
|
|
|
|
// Test 7: Crew with empty member is rejected
|
|
crewEmpty := []byte(`{"member": "", "role": "write"}`)
|
|
err = processor.ValidateRecord(ctx, atproto.CrewCollection, crewEmpty)
|
|
if err == nil {
|
|
t.Error("Crew with empty member should be rejected")
|
|
}
|
|
}
|
|
|
|
func TestProcessAccount(t *testing.T) {
|
|
db := setupTestDB(t)
|
|
defer db.Close()
|
|
|
|
processor := NewProcessor(db, false, nil)
|
|
|
|
// Setup: Create test user
|
|
testDID := "did:plc:bob456"
|
|
testHandle := "bob.bsky.social"
|
|
testPDS := "https://bsky.social"
|
|
_, err := db.Exec(`
|
|
INSERT INTO users (did, handle, pds_endpoint, last_seen)
|
|
VALUES (?, ?, ?, ?)
|
|
`, testDID, testHandle, testPDS, time.Now())
|
|
if err != nil {
|
|
t.Fatalf("Failed to insert test user: %v", err)
|
|
}
|
|
|
|
// Test 1: Process account deactivation event
|
|
err = processor.ProcessAccount(context.Background(), testDID, false, "deactivated")
|
|
// Note: Cache invalidation will fail without real directory, but that's expected
|
|
if err != nil {
|
|
t.Logf("Expected cache invalidation error (no real directory): %v", err)
|
|
}
|
|
|
|
// Verify user still exists in database (we don't delete on deactivation)
|
|
var exists bool
|
|
err = db.QueryRow(`
|
|
SELECT EXISTS(SELECT 1 FROM users WHERE did = ?)
|
|
`, testDID).Scan(&exists)
|
|
if err != nil {
|
|
t.Fatalf("Failed to check if user exists: %v", err)
|
|
}
|
|
if !exists {
|
|
t.Error("User should still exist after deactivation event (no deletion)")
|
|
}
|
|
|
|
// Test 2: Process account with active=true (should be ignored)
|
|
err = processor.ProcessAccount(context.Background(), testDID, true, "active")
|
|
if err != nil {
|
|
t.Errorf("Expected no error for active account, got: %v", err)
|
|
}
|
|
|
|
// Test 3: Process account with status != "deactivated" (should be ignored)
|
|
err = processor.ProcessAccount(context.Background(), testDID, false, "suspended")
|
|
if err != nil {
|
|
t.Errorf("Expected no error for non-deactivated status, got: %v", err)
|
|
}
|
|
|
|
// Test 4: Process account deactivation for non-existent user
|
|
err = processor.ProcessAccount(context.Background(), "did:plc:nonexistent", false, "deactivated")
|
|
// Cache invalidation will fail, but that's expected
|
|
if err != nil {
|
|
t.Logf("Expected cache invalidation error: %v", err)
|
|
}
|
|
|
|
// Test 5: Process multiple deactivation events (idempotent)
|
|
for i := range 3 {
|
|
err = processor.ProcessAccount(context.Background(), testDID, false, "deactivated")
|
|
if err != nil {
|
|
t.Logf("Expected cache invalidation error on iteration %d: %v", i, err)
|
|
}
|
|
}
|
|
|
|
// User should still exist after multiple deactivations
|
|
err = db.QueryRow(`
|
|
SELECT EXISTS(SELECT 1 FROM users WHERE did = ?)
|
|
`, testDID).Scan(&exists)
|
|
if err != nil {
|
|
t.Fatalf("Failed to check if user exists after multiple deactivations: %v", err)
|
|
}
|
|
if !exists {
|
|
t.Error("User should still exist after multiple deactivation events")
|
|
}
|
|
|
|
// Test 6: Process account deletion - should delete user data
|
|
err = processor.ProcessAccount(context.Background(), testDID, false, "deleted")
|
|
if err != nil {
|
|
t.Logf("Cache invalidation error during deletion (expected): %v", err)
|
|
}
|
|
|
|
// User should be deleted after "deleted" status
|
|
err = db.QueryRow(`
|
|
SELECT EXISTS(SELECT 1 FROM users WHERE did = ?)
|
|
`, testDID).Scan(&exists)
|
|
if err != nil {
|
|
t.Fatalf("Failed to check if user exists after deletion: %v", err)
|
|
}
|
|
if exists {
|
|
t.Error("User should NOT exist after deletion event")
|
|
}
|
|
|
|
// Test 7: Process deletion for already-deleted user (idempotent)
|
|
err = processor.ProcessAccount(context.Background(), testDID, false, "deleted")
|
|
if err != nil {
|
|
t.Errorf("Deletion of non-existent user should not error, got: %v", err)
|
|
}
|
|
}
|