dedupe hold health checks

This commit is contained in:
Evan Jarrett
2025-10-22 18:17:44 -05:00
parent 0e4dd9af20
commit 1b1400a6fb
5 changed files with 132 additions and 27 deletions

View File

@@ -133,7 +133,7 @@ networks:
driver: bridge
ipam:
config:
- subnet: 172.28.0.0/24
- subnet: 172.29.0.0/24
volumes:
caddy_data:

View File

@@ -8,14 +8,14 @@ import (
// HoldCaptainRecord represents a cached captain record from a hold's PDS
type HoldCaptainRecord struct {
HoldDID string `json:"-"` // Set manually, not from JSON
HoldDID string `json:"-"` // Set manually, not from JSON
OwnerDID string `json:"owner"`
Public bool `json:"public"`
AllowAllCrew bool `json:"allowAllCrew"`
DeployedAt string `json:"deployedAt"`
Region string `json:"region"`
Provider string `json:"provider"`
UpdatedAt time.Time `json:"-"` // Set manually, not from JSON
UpdatedAt time.Time `json:"-"` // Set manually, not from JSON
}
// GetCaptainRecord retrieves a captain record from the cache

View File

@@ -268,3 +268,56 @@ func TestNewWorkerWithStartupDelay(t *testing.T) {
t.Errorf("Expected startupDelay=%v, got %v", startupDelay, workerWithDelay.startupDelay)
}
}
// TestNormalizeHoldEndpoint checks that each supported spelling of a hold
// endpoint (HTTP/HTTPS URL, URL with port, trailing slash or path, and an
// existing DID) is mapped to the expected did:web identifier.
func TestNormalizeHoldEndpoint(t *testing.T) {
	type testCase struct {
		name     string
		input    string
		expected string
	}

	// Positional literals: {subtest name, endpoint given, DID wanted}.
	cases := []testCase{
		{"HTTP URL", "http://hold01.atcr.io", "did:web:hold01.atcr.io"},
		{"HTTPS URL", "https://hold01.atcr.io", "did:web:hold01.atcr.io"},
		{"HTTP URL with port", "http://172.28.0.3:8080", "did:web:172.28.0.3:8080"},
		{"HTTP URL with trailing slash", "http://hold01.atcr.io/", "did:web:hold01.atcr.io"},
		{"HTTP URL with path", "http://hold01.atcr.io/some/path", "did:web:hold01.atcr.io"},
		{"Already a DID", "did:web:hold01.atcr.io", "did:web:hold01.atcr.io"},
		{"DID with port", "did:web:172.28.0.3:8080", "did:web:172.28.0.3:8080"},
	}

	for i := range cases {
		// Copy the case so the subtest closure never shares a loop variable.
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			if got := normalizeHoldEndpoint(tc.input); got != tc.expected {
				t.Errorf("normalizeHoldEndpoint(%q) = %q, want %q", tc.input, got, tc.expected)
			}
		})
	}
}

View File

@@ -5,6 +5,7 @@ import (
"database/sql"
"fmt"
"log"
"strings"
"sync"
"time"
)
@@ -115,7 +116,32 @@ func (w *Worker) refreshAllHolds(ctx context.Context) {
return
}
log.Printf("Hold health worker: Checking %d unique hold endpoints", len(endpoints))
log.Printf("Hold health worker: Fetched %d hold endpoint entries from database", len(endpoints))
// Deduplicate endpoints by normalizing to canonical DID format
// This handles cases where the same hold is stored with different representations:
// - http://172.28.0.3:8080 (internal IP)
// - http://hold01.atcr.io (external hostname)
// - did:web:hold01.atcr.io (DID format)
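// Spellings that share a host normalize to the same DID (e.g. did:web:hold01.atcr.io).
// Normalization is purely syntactic: an IP-based URL yields its own DID
// (e.g. did:web:172.28.0.3:8080) and is not merged with the hostname form.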
seen := make(map[string]bool)
uniqueEndpoints := make([]string, 0, len(endpoints))
for _, endpoint := range endpoints {
// Normalize to canonical DID format
normalizedDID := normalizeHoldEndpoint(endpoint)
// Skip if we've already seen this normalized DID
if seen[normalizedDID] {
continue
}
seen[normalizedDID] = true
// Use the normalized DID for health checks
uniqueEndpoints = append(uniqueEndpoints, normalizedDID)
}
log.Printf("Hold health worker: Checking %d unique hold endpoints (deduplicated from %d)", len(uniqueEndpoints), len(endpoints))
// Check health concurrently with rate limiting
// Use a semaphore to limit concurrent requests (max 10 at a time)
@@ -126,7 +152,7 @@ func (w *Worker) refreshAllHolds(ctx context.Context) {
unreachable := 0
var statsMu sync.Mutex
for _, endpoint := range endpoints {
for _, endpoint := range uniqueEndpoints {
wg.Add(1)
go func(ep string) {
@@ -193,3 +219,29 @@ func (a *DBAdapter) GetUniqueHoldEndpoints() ([]string, error) {
return endpoints, nil
}
// normalizeHoldEndpoint converts a hold endpoint (URL or DID) to canonical DID format.
// This ensures that different spellings of the same host are deduplicated:
//   - http://172.28.0.3:8080       → did:web:172.28.0.3:8080
//   - http://hold01.atcr.io        → did:web:hold01.atcr.io
//   - https://hold01.atcr.io/path  → did:web:hold01.atcr.io
//   - http://hold01.atcr.io?x=1    → did:web:hold01.atcr.io
//   - did:web:hold01.atcr.io       → did:web:hold01.atcr.io (passthrough)
//
// Any endpoint that already starts with "did:" is returned unchanged.
// Otherwise the "http://" / "https://" scheme is removed and everything from
// the first path, query, or fragment delimiter onward is dropped; what remains
// (host plus optional ":port", kept verbatim) is prefixed with "did:web:".
//
// The conversion is purely syntactic: no DNS resolution happens, so an IP
// address and a hostname that point at the same hold produce different DIDs.
func normalizeHoldEndpoint(endpoint string) string {
	// A DID is already canonical; check first so no URL stripping is done on it.
	if strings.HasPrefix(endpoint, "did:") {
		return endpoint
	}

	// Strip the protocol.
	host := strings.TrimPrefix(endpoint, "http://")
	host = strings.TrimPrefix(host, "https://")

	// Keep only the authority. Cutting at '?' and '#' as well as '/' prevents
	// a query string or fragment on a path-less URL from leaking into the DID.
	if end := strings.IndexAny(host, "/?#"); end >= 0 {
		host = host[:end]
	}

	// Convert to did:web
	return "did:web:" + host
}

View File

@@ -128,32 +128,32 @@ func (p *HoldPDS) Bootstrap(ctx context.Context, storageDriver driver.StorageDri
if !captainExists {
// Initialize repo if it doesn't exist yet
// Check if repo exists by trying to get the head
head, err := p.carstore.GetUserRepoHead(ctx, p.uid)
if err != nil || !head.Defined() {
// Repo doesn't exist, initialize it
// InitNewActor creates an empty repo with initial commit
err = p.repomgr.InitNewActor(ctx, p.uid, "", p.did, "", "", "")
if err != nil {
return fmt.Errorf("failed to initialize repo: %w", err)
// Initialize repo if it doesn't exist yet
// Check if repo exists by trying to get the head
head, err := p.carstore.GetUserRepoHead(ctx, p.uid)
if err != nil || !head.Defined() {
// Repo doesn't exist, initialize it
// InitNewActor creates an empty repo with initial commit
err = p.repomgr.InitNewActor(ctx, p.uid, "", p.did, "", "", "")
if err != nil {
return fmt.Errorf("failed to initialize repo: %w", err)
}
fmt.Printf("✅ Initialized empty repo\n")
}
fmt.Printf("✅ Initialized empty repo\n")
}
// Create captain record (hold ownership and settings)
_, err = p.CreateCaptainRecord(ctx, ownerDID, public, allowAllCrew)
if err != nil {
return fmt.Errorf("failed to create captain record: %w", err)
}
// Create captain record (hold ownership and settings)
_, err = p.CreateCaptainRecord(ctx, ownerDID, public, allowAllCrew)
if err != nil {
return fmt.Errorf("failed to create captain record: %w", err)
}
fmt.Printf("✅ Created captain record (public=%v, allowAllCrew=%v)\n", public, allowAllCrew)
fmt.Printf("✅ Created captain record (public=%v, allowAllCrew=%v)\n", public, allowAllCrew)
// Add hold owner as first crew member with admin role
_, err = p.AddCrewMember(ctx, ownerDID, "admin", []string{"blob:read", "blob:write", "crew:admin"})
if err != nil {
return fmt.Errorf("failed to add owner as crew member: %w", err)
}
// Add hold owner as first crew member with admin role
_, err = p.AddCrewMember(ctx, ownerDID, "admin", []string{"blob:read", "blob:write", "crew:admin"})
if err != nil {
return fmt.Errorf("failed to add owner as crew member: %w", err)
}
fmt.Printf("✅ Added %s as hold admin\n", ownerDID)
}