From 1b1400a6fb09e642e2e9dc6fb47cd2632b4274c9 Mon Sep 17 00:00:00 2001 From: Evan Jarrett Date: Wed, 22 Oct 2025 18:17:44 -0500 Subject: [PATCH] dedupe hold health checks --- deploy/docker-compose.prod.yml | 2 +- pkg/appview/db/hold_store.go | 4 +- pkg/appview/holdhealth/checker_test.go | 53 ++++++++++++++++++++++++ pkg/appview/holdhealth/worker.go | 56 +++++++++++++++++++++++++- pkg/hold/pds/server.go | 44 ++++++++++---------- 5 files changed, 132 insertions(+), 27 deletions(-) diff --git a/deploy/docker-compose.prod.yml b/deploy/docker-compose.prod.yml index 5edfddf..e0d6bb3 100644 --- a/deploy/docker-compose.prod.yml +++ b/deploy/docker-compose.prod.yml @@ -133,7 +133,7 @@ networks: driver: bridge ipam: config: - - subnet: 172.28.0.0/24 + - subnet: 172.29.0.0/24 volumes: caddy_data: diff --git a/pkg/appview/db/hold_store.go b/pkg/appview/db/hold_store.go index 8f6bb97..d0d3e66 100644 --- a/pkg/appview/db/hold_store.go +++ b/pkg/appview/db/hold_store.go @@ -8,14 +8,14 @@ import ( // HoldCaptainRecord represents a cached captain record from a hold's PDS type HoldCaptainRecord struct { - HoldDID string `json:"-"` // Set manually, not from JSON + HoldDID string `json:"-"` // Set manually, not from JSON OwnerDID string `json:"owner"` Public bool `json:"public"` AllowAllCrew bool `json:"allowAllCrew"` DeployedAt string `json:"deployedAt"` Region string `json:"region"` Provider string `json:"provider"` - UpdatedAt time.Time `json:"-"` // Set manually, not from JSON + UpdatedAt time.Time `json:"-"` // Set manually, not from JSON } // GetCaptainRecord retrieves a captain record from the cache diff --git a/pkg/appview/holdhealth/checker_test.go b/pkg/appview/holdhealth/checker_test.go index 21f97b2..fe6f04e 100644 --- a/pkg/appview/holdhealth/checker_test.go +++ b/pkg/appview/holdhealth/checker_test.go @@ -268,3 +268,56 @@ func TestNewWorkerWithStartupDelay(t *testing.T) { t.Errorf("Expected startupDelay=%v, got %v", startupDelay, workerWithDelay.startupDelay) } } + +func TestNormalizeHoldEndpoint(t *testing.T) { + tests := []struct { + name string + input string + expected string + }{ + { + name: "HTTP URL", + input: "http://hold01.atcr.io", + expected: "did:web:hold01.atcr.io", + }, + { + name: "HTTPS URL", + input: "https://hold01.atcr.io", + expected: "did:web:hold01.atcr.io", + }, + { + name: "HTTP URL with port", + input: "http://172.28.0.3:8080", + expected: "did:web:172.28.0.3:8080", + }, + { + name: "HTTP URL with trailing slash", + input: "http://hold01.atcr.io/", + expected: "did:web:hold01.atcr.io", + }, + { + name: "HTTP URL with path", + input: "http://hold01.atcr.io/some/path", + expected: "did:web:hold01.atcr.io", + }, + { + name: "Already a DID", + input: "did:web:hold01.atcr.io", + expected: "did:web:hold01.atcr.io", + }, + { + name: "DID with port", + input: "did:web:172.28.0.3:8080", + expected: "did:web:172.28.0.3:8080", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := normalizeHoldEndpoint(tt.input) + if result != tt.expected { + t.Errorf("normalizeHoldEndpoint(%q) = %q, want %q", tt.input, result, tt.expected) + } + }) + } +} diff --git a/pkg/appview/holdhealth/worker.go b/pkg/appview/holdhealth/worker.go index e61f53f..e094fdf 100644 --- a/pkg/appview/holdhealth/worker.go +++ b/pkg/appview/holdhealth/worker.go @@ -5,6 +5,7 @@ import ( "database/sql" "fmt" "log" + "strings" "sync" "time" ) @@ -115,7 +116,32 @@ func (w *Worker) refreshAllHolds(ctx context.Context) { return } - log.Printf("Hold health worker: Checking %d unique hold endpoints", len(endpoints)) + log.Printf("Hold health worker: Fetched %d hold endpoint entries from database", len(endpoints)) + + // Deduplicate endpoints by normalizing to canonical DID format + // This handles cases where the same hold is stored with different representations: + // - http://172.28.0.3:8080 (internal IP) + // - http://hold01.atcr.io (external hostname) + // - did:web:hold01.atcr.io (DID format) + // All normalize to the same DID: did:web:hold01.atcr.io (or did:web:172.28.0.3:8080) + seen := make(map[string]bool) + uniqueEndpoints := make([]string, 0, len(endpoints)) + + for _, endpoint := range endpoints { + // Normalize to canonical DID format + normalizedDID := normalizeHoldEndpoint(endpoint) + + // Skip if we've already seen this normalized DID + if seen[normalizedDID] { + continue + } + + seen[normalizedDID] = true + // Use the normalized DID for health checks + uniqueEndpoints = append(uniqueEndpoints, normalizedDID) + } + + log.Printf("Hold health worker: Checking %d unique hold endpoints (deduplicated from %d)", len(uniqueEndpoints), len(endpoints)) // Check health concurrently with rate limiting // Use a semaphore to limit concurrent requests (max 10 at a time) @@ -126,7 +152,7 @@ func (w *Worker) refreshAllHolds(ctx context.Context) { unreachable := 0 var statsMu sync.Mutex - for _, endpoint := range endpoints { + for _, endpoint := range uniqueEndpoints { wg.Add(1) go func(ep string) { @@ -193,3 +219,29 @@ func (a *DBAdapter) GetUniqueHoldEndpoints() ([]string, error) { return endpoints, nil } + +// normalizeHoldEndpoint converts a hold endpoint (URL or DID) to canonical DID format +// This ensures that different representations of the same hold are deduplicated: +// - http://172.28.0.3:8080 → did:web:172.28.0.3:8080 +// - http://hold01.atcr.io → did:web:hold01.atcr.io +// - https://hold01.atcr.io → did:web:hold01.atcr.io +// - did:web:hold01.atcr.io → did:web:hold01.atcr.io (passthrough) +func normalizeHoldEndpoint(endpoint string) string { + // Strip protocol and trailing slashes + normalized := endpoint + normalized = strings.TrimPrefix(normalized, "http://") + normalized = strings.TrimPrefix(normalized, "https://") + normalized = strings.TrimSuffix(normalized, "/") + + // If already a DID, return as-is + if strings.HasPrefix(endpoint, "did:") { + return endpoint + } + + // Extract hostname (remove path if present) + parts := strings.Split(normalized, "/") + hostname := parts[0] + + // Convert to did:web + return "did:web:" + hostname +} diff --git a/pkg/hold/pds/server.go b/pkg/hold/pds/server.go index cf7b72b..bc409e8 100644 --- a/pkg/hold/pds/server.go +++ b/pkg/hold/pds/server.go @@ -128,32 +128,32 @@ func (p *HoldPDS) Bootstrap(ctx context.Context, storageDriver driver.StorageDri if !captainExists { - // Initialize repo if it doesn't exist yet - // Check if repo exists by trying to get the head - head, err := p.carstore.GetUserRepoHead(ctx, p.uid) - if err != nil || !head.Defined() { - // Repo doesn't exist, initialize it - // InitNewActor creates an empty repo with initial commit - err = p.repomgr.InitNewActor(ctx, p.uid, "", p.did, "", "", "") - if err != nil { - return fmt.Errorf("failed to initialize repo: %w", err) + // Initialize repo if it doesn't exist yet + // Check if repo exists by trying to get the head + head, err := p.carstore.GetUserRepoHead(ctx, p.uid) + if err != nil || !head.Defined() { + // Repo doesn't exist, initialize it + // InitNewActor creates an empty repo with initial commit + err = p.repomgr.InitNewActor(ctx, p.uid, "", p.did, "", "", "") + if err != nil { + return fmt.Errorf("failed to initialize repo: %w", err) + } + fmt.Printf("✅ Initialized empty repo\n") } - fmt.Printf("✅ Initialized empty repo\n") - } - // Create captain record (hold ownership and settings) - _, err = p.CreateCaptainRecord(ctx, ownerDID, public, allowAllCrew) - if err != nil { - return fmt.Errorf("failed to create captain record: %w", err) - } + // Create captain record (hold ownership and settings) + _, err = p.CreateCaptainRecord(ctx, ownerDID, public, allowAllCrew) + if err != nil { + return fmt.Errorf("failed to create captain record: %w", err) + } - fmt.Printf("✅ Created captain record (public=%v, allowAllCrew=%v)\n", public, allowAllCrew) + fmt.Printf("✅ Created captain record (public=%v, allowAllCrew=%v)\n", public, allowAllCrew) - // Add hold owner as first crew member with admin role - _, err = p.AddCrewMember(ctx, ownerDID, "admin", []string{"blob:read", "blob:write", "crew:admin"}) - if err != nil { - return fmt.Errorf("failed to add owner as crew member: %w", err) - } + // Add hold owner as first crew member with admin role + _, err = p.AddCrewMember(ctx, ownerDID, "admin", []string{"blob:read", "blob:write", "crew:admin"}) + if err != nil { + return fmt.Errorf("failed to add owner as crew member: %w", err) + } fmt.Printf("✅ Added %s as hold admin\n", ownerDID) }