Files
at-container-registry/pkg/atproto/client.go

690 lines
21 KiB
Go

package atproto
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"github.com/bluesky-social/indigo/atproto/atclient"
indigo_oauth "github.com/bluesky-social/indigo/atproto/auth/oauth"
)
// Sentinel errors
var (
ErrRecordNotFound = errors.New("record not found")
)
// SessionProvider provides locked OAuth sessions for PDS operations.
// This interface allows the ATProto client to use DoWithSession() for each PDS call,
// preventing DPoP nonce race conditions during concurrent operations.
type SessionProvider interface {
// DoWithSession executes fn with a locked OAuth session.
// The lock is held for the entire duration, serializing DPoP nonce updates.
DoWithSession(ctx context.Context, did string, fn func(session *indigo_oauth.ClientSession) error) error
}
// Client wraps ATProto operations for the registry
type Client struct {
pdsEndpoint string
did string
accessToken string // For Basic Auth only
httpClient *http.Client
sessionProvider SessionProvider // For locked OAuth sessions (prevents DPoP nonce races)
}
// NewClient creates a new ATProto client for Basic Auth tokens (app passwords)
func NewClient(pdsEndpoint, did, accessToken string) *Client {
return &Client{
pdsEndpoint: pdsEndpoint,
did: did,
accessToken: accessToken,
httpClient: &http.Client{},
}
}
// NewClientWithSessionProvider creates an ATProto client that uses locked OAuth sessions.
// This is the preferred constructor for concurrent operations (e.g., Docker layer uploads)
// as it prevents DPoP nonce race conditions by serializing PDS calls per-DID.
//
// Each PDS call acquires a per-DID lock, ensuring that:
// - Only one goroutine at a time can negotiate DPoP nonces with the PDS
// - The session's nonce is saved to DB before other goroutines load it
// - Concurrent manifest operations don't cause nonce thrashing
func NewClientWithSessionProvider(pdsEndpoint, did string, sessionProvider SessionProvider) *Client {
return &Client{
pdsEndpoint: pdsEndpoint,
did: did,
sessionProvider: sessionProvider,
httpClient: &http.Client{},
}
}
// Record represents a generic ATProto record
type Record struct {
URI string `json:"uri"`
CID string `json:"cid"`
Value json.RawMessage `json:"value"`
}
// PutRecord stores a record in the ATProto repository
func (c *Client) PutRecord(ctx context.Context, collection, rkey string, record any) (*Record, error) {
payload := map[string]any{
"repo": c.did,
"collection": collection,
"rkey": rkey,
"record": record,
}
// Use session provider (locked OAuth with DPoP) - prevents nonce races
if c.sessionProvider != nil {
var result Record
err := c.sessionProvider.DoWithSession(ctx, c.did, func(session *indigo_oauth.ClientSession) error {
apiClient := session.APIClient()
return apiClient.Post(ctx, "com.atproto.repo.putRecord", payload, &result)
})
if err != nil {
return nil, fmt.Errorf("putRecord failed: %w", err)
}
return &result, nil
}
// Basic Auth (app passwords)
body, err := json.Marshal(payload)
if err != nil {
return nil, fmt.Errorf("failed to marshal record: %w", err)
}
url := fmt.Sprintf("%s%s", c.pdsEndpoint, RepoPutRecord)
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+c.accessToken)
req.Header.Set("Content-Type", "application/json")
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to put record: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("put record failed with status %d: %s", resp.StatusCode, string(bodyBytes))
}
var result Record
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode response: %w", err)
}
return &result, nil
}
// GetRecord retrieves a record from the ATProto repository
func (c *Client) GetRecord(ctx context.Context, collection, rkey string) (*Record, error) {
params := map[string]any{
"repo": c.did,
"collection": collection,
"rkey": rkey,
}
// Use session provider (locked OAuth with DPoP) - prevents nonce races
if c.sessionProvider != nil {
var result Record
err := c.sessionProvider.DoWithSession(ctx, c.did, func(session *indigo_oauth.ClientSession) error {
apiClient := session.APIClient()
return apiClient.Get(ctx, "com.atproto.repo.getRecord", params, &result)
})
if err != nil {
// Check for RecordNotFound error from indigo's APIError type
var apiErr *atclient.APIError
if errors.As(err, &apiErr) {
if apiErr.StatusCode == 404 || apiErr.Name == "RecordNotFound" {
return nil, ErrRecordNotFound
}
}
return nil, fmt.Errorf("getRecord failed: %w", err)
}
return &result, nil
}
// Basic Auth (app passwords)
url := fmt.Sprintf("%s%s?repo=%s&collection=%s&rkey=%s",
c.pdsEndpoint, RepoGetRecord, c.did, collection, rkey)
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}
// Only set Authorization header if we have a token
// Empty Bearer tokens will be rejected by PDS
if c.accessToken != "" {
req.Header.Set("Authorization", "Bearer "+c.accessToken)
}
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to get record: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotFound {
return nil, ErrRecordNotFound
}
if resp.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(resp.Body)
bodyStr := string(bodyBytes)
// Check for RecordNotFound error (PDS returns 400 with this error)
if strings.Contains(bodyStr, "RecordNotFound") {
return nil, ErrRecordNotFound
}
return nil, fmt.Errorf("get record failed with status %d: %s", resp.StatusCode, bodyStr)
}
var result Record
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode response: %w", err)
}
return &result, nil
}
// DeleteRecord deletes a record from the ATProto repository
func (c *Client) DeleteRecord(ctx context.Context, collection, rkey string) error {
payload := map[string]any{
"repo": c.did,
"collection": collection,
"rkey": rkey,
}
// Use session provider (locked OAuth with DPoP) - prevents nonce races
if c.sessionProvider != nil {
err := c.sessionProvider.DoWithSession(ctx, c.did, func(session *indigo_oauth.ClientSession) error {
apiClient := session.APIClient()
var result map[string]any // deleteRecord returns empty object on success
return apiClient.Post(ctx, "com.atproto.repo.deleteRecord", payload, &result)
})
if err != nil {
return fmt.Errorf("deleteRecord failed: %w", err)
}
return nil
}
// Basic Auth (app passwords)
body, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("failed to marshal delete request: %w", err)
}
url := fmt.Sprintf("%s%s", c.pdsEndpoint, RepoDeleteRecord)
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
if err != nil {
return err
}
req.Header.Set("Authorization", "Bearer "+c.accessToken)
req.Header.Set("Content-Type", "application/json")
resp, err := c.httpClient.Do(req)
if err != nil {
return fmt.Errorf("failed to delete record: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(resp.Body)
return fmt.Errorf("delete record failed with status %d: %s", resp.StatusCode, string(bodyBytes))
}
return nil
}
// ListRecords lists records in a collection
func (c *Client) ListRecords(ctx context.Context, collection string, limit int) ([]Record, error) {
url := fmt.Sprintf("%s%s?repo=%s&collection=%s&limit=%d",
c.pdsEndpoint, RepoListRecords, c.did, collection, limit)
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}
// Only set Authorization header if we have a token
// Empty Bearer tokens will be rejected by PDS
if c.accessToken != "" {
req.Header.Set("Authorization", "Bearer "+c.accessToken)
}
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to list records: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("list records failed with status %d: %s", resp.StatusCode, string(bodyBytes))
}
var result struct {
Records []Record `json:"records"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode response: %w", err)
}
return result.Records, nil
}
// ATProtoBlobRef represents a reference to a blob in ATProto's native blob storage
// This is different from OCIBlobDescriptor which describes OCI image layers
type ATProtoBlobRef struct {
Type string `json:"$type"`
Ref Link `json:"ref"`
MimeType string `json:"mimeType"`
Size int64 `json:"size"`
}
// Link represents an IPFS link to blob content
type Link struct {
Link string `json:"$link"`
}
// UploadBlob uploads binary data to the PDS and returns a blob reference
func (c *Client) UploadBlob(ctx context.Context, data []byte, mimeType string) (*ATProtoBlobRef, error) {
// Use session provider (locked OAuth with DPoP) - prevents nonce races
if c.sessionProvider != nil {
var result struct {
Blob ATProtoBlobRef `json:"blob"`
}
err := c.sessionProvider.DoWithSession(ctx, c.did, func(session *indigo_oauth.ClientSession) error {
apiClient := session.APIClient()
// IMPORTANT: Use io.Reader for blob uploads
// LexDo JSON-encodes []byte (base64), but streams io.Reader as raw bytes
// Use the actual MIME type so PDS can validate against blob:image/* scope
return apiClient.LexDo(ctx,
"POST",
mimeType,
"com.atproto.repo.uploadBlob",
nil,
bytes.NewReader(data),
&result,
)
})
if err != nil {
return nil, fmt.Errorf("uploadBlob failed: %w", err)
}
return &result.Blob, nil
}
// Basic Auth (app passwords)
url := fmt.Sprintf("%s%s", c.pdsEndpoint, RepoUploadBlob)
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(data))
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+c.accessToken)
req.Header.Set("Content-Type", mimeType)
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to upload blob: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("upload blob failed with status %d: %s", resp.StatusCode, string(bodyBytes))
}
var result struct {
Blob ATProtoBlobRef `json:"blob"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode response: %w", err)
}
return &result.Blob, nil
}
// GetBlob downloads a blob by its CID from the PDS
func (c *Client) GetBlob(ctx context.Context, cid string) ([]byte, error) {
url := fmt.Sprintf("%s%s?did=%s&cid=%s",
c.pdsEndpoint, SyncGetBlob, c.did, cid)
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}
// Note: getBlob may not require auth for public repos, but we include it anyway
if c.accessToken != "" {
req.Header.Set("Authorization", "Bearer "+c.accessToken)
}
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to get blob: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotFound {
return nil, fmt.Errorf("blob not found")
}
if resp.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("get blob failed with status %d: %s", resp.StatusCode, string(bodyBytes))
}
// Read the blob data
data, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read blob data: %w", err)
}
// Check if PDS returned JSON-wrapped blob (Bluesky implementation)
// PDS may wrap blobs as JSON-encoded base64 strings
// Detection: Check if content starts with a quote (indicating JSON string)
if len(data) > 0 && data[0] == '"' {
// Blob is JSON-encoded - decode it
var base64Str string
if err := json.Unmarshal(data, &base64Str); err != nil {
return nil, fmt.Errorf("failed to unmarshal JSON-wrapped blob: %w", err)
}
// Base64-decode the blob content
decoded, err := base64.StdEncoding.DecodeString(base64Str)
if err != nil {
return nil, fmt.Errorf("failed to base64-decode blob: %w", err)
}
return decoded, nil
}
// Raw blob response (expected ATProto behavior)
return data, nil
}
// ListReposByCollectionResult represents the response from com.atproto.sync.listReposByCollection
type ListReposByCollectionResult struct {
Repos []RepoRef `json:"repos"` // Array of repo references
Cursor string `json:"cursor,omitempty"`
}
// RepoRef represents a repository reference in listReposByCollection response
type RepoRef struct {
DID string `json:"did"`
}
// ListReposByCollection lists all repos (DIDs) that have records in a collection
// This is a network-wide query, not limited to a single PDS
func (c *Client) ListReposByCollection(ctx context.Context, collection string, limit int, cursor string) (*ListReposByCollectionResult, error) {
// Build URL with query parameters
url := fmt.Sprintf("%s%s?collection=%s", c.pdsEndpoint, SyncListReposByCollection, collection)
if limit > 0 {
url += fmt.Sprintf("&limit=%d", limit)
}
if cursor != "" {
url += fmt.Sprintf("&cursor=%s", cursor)
}
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}
// This endpoint typically doesn't require auth for public data
// but we include it if available
if c.accessToken != "" {
req.Header.Set("Authorization", "Bearer "+c.accessToken)
}
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to list repos by collection: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("list repos by collection failed with status %d: %s", resp.StatusCode, string(bodyBytes))
}
var result ListReposByCollectionResult
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode response: %w", err)
}
return &result, nil
}
// ListRecordsForRepo lists records in a collection for a specific repo (DID)
// This differs from ListRecords which uses the client's DID
func (c *Client) ListRecordsForRepo(ctx context.Context, repoDID, collection string, limit int, cursor string) ([]Record, string, error) {
url := fmt.Sprintf("%s%s?repo=%s&collection=%s",
c.pdsEndpoint, RepoListRecords, repoDID, collection)
if limit > 0 {
url += fmt.Sprintf("&limit=%d", limit)
}
if cursor != "" {
url += fmt.Sprintf("&cursor=%s", cursor)
}
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, "", err
}
// This endpoint typically doesn't require auth for public records
if c.accessToken != "" {
req.Header.Set("Authorization", "Bearer "+c.accessToken)
}
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, "", fmt.Errorf("failed to list records: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(resp.Body)
return nil, "", fmt.Errorf("list records failed with status %d: %s", resp.StatusCode, string(bodyBytes))
}
var result struct {
Records []Record `json:"records"`
Cursor string `json:"cursor,omitempty"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, "", fmt.Errorf("failed to decode response: %w", err)
}
return result.Records, result.Cursor, nil
}
// ActorProfile represents a Bluesky actor profile (from AppView)
type ActorProfile struct {
DID string `json:"did"`
Handle string `json:"handle"`
DisplayName string `json:"displayName,omitempty"`
Description string `json:"description,omitempty"`
Avatar string `json:"avatar,omitempty"` // CDN URL from AppView
}
// ProfileRecord represents the app.bsky.actor.profile record (from PDS)
type ProfileRecord struct {
DisplayName string `json:"displayName,omitempty"`
Description string `json:"description,omitempty"`
Avatar *ATProtoBlobRef `json:"avatar,omitempty"` // Blob reference
Banner *ATProtoBlobRef `json:"banner,omitempty"`
CreatedAt string `json:"createdAt,omitempty"`
}
// GetActorProfile fetches an actor's profile from their PDS
// The actor parameter can be a DID or handle
func (c *Client) GetActorProfile(ctx context.Context, actor string) (*ActorProfile, error) {
// Basic Auth (app passwords) or unauthenticated
url := fmt.Sprintf("%s/xrpc/app.bsky.actor.getProfile?actor=%s", c.pdsEndpoint, actor)
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}
// This endpoint typically doesn't require auth for public profiles
if c.accessToken != "" {
req.Header.Set("Authorization", "Bearer "+c.accessToken)
}
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to get profile: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotFound {
return nil, fmt.Errorf("profile not found")
}
if resp.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("get profile failed with status %d: %s", resp.StatusCode, string(bodyBytes))
}
var profile ActorProfile
if err := json.NewDecoder(resp.Body).Decode(&profile); err != nil {
return nil, fmt.Errorf("failed to decode profile: %w", err)
}
return &profile, nil
}
// GetProfileRecord fetches the app.bsky.actor.profile record from PDS
// This returns the raw profile record with blob references (not CDN URLs)
func (c *Client) GetProfileRecord(ctx context.Context, did string) (*ProfileRecord, error) {
params := map[string]any{
"repo": did,
"collection": "app.bsky.actor.profile",
"rkey": "self",
}
// Use session provider (locked OAuth with DPoP) - prevents nonce races
if c.sessionProvider != nil {
var result struct {
Value ProfileRecord `json:"value"`
}
err := c.sessionProvider.DoWithSession(ctx, c.did, func(session *indigo_oauth.ClientSession) error {
apiClient := session.APIClient()
return apiClient.Get(ctx, "com.atproto.repo.getRecord", params, &result)
})
if err != nil {
return nil, fmt.Errorf("getRecord failed: %w", err)
}
return &result.Value, nil
}
// Basic Auth (app passwords)
url := fmt.Sprintf("%s%s?repo=%s&collection=app.bsky.actor.profile&rkey=self",
c.pdsEndpoint, RepoGetRecord, did)
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}
if c.accessToken != "" {
req.Header.Set("Authorization", "Bearer "+c.accessToken)
}
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to get profile record: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotFound {
return nil, fmt.Errorf("profile record not found")
}
if resp.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("get profile record failed with status %d: %s", resp.StatusCode, string(bodyBytes))
}
var result struct {
Value ProfileRecord `json:"value"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode profile record: %w", err)
}
return &result.Value, nil
}
// BlobCDNURL constructs an imgs.blue CDN URL for a blob
// The imgs.blue service can serve blobs using DID or handle
func BlobCDNURL(didOrHandle, cid string) string {
return fmt.Sprintf("https://imgs.blue/%s/%s", didOrHandle, cid)
}
// DIDDocument represents a did:web document
type DIDDocument struct {
Context []string `json:"@context"`
ID string `json:"id"`
Service []struct {
ID string `json:"id"`
Type string `json:"type"`
ServiceEndpoint string `json:"serviceEndpoint"`
} `json:"service"`
}
// FetchDIDDocument fetches and parses a DID document from a URL
func (c *Client) FetchDIDDocument(ctx context.Context, didDocURL string) (*DIDDocument, error) {
req, err := http.NewRequestWithContext(ctx, "GET", didDocURL, nil)
if err != nil {
return nil, err
}
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to fetch DID document: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("fetch DID document failed with status %d", resp.StatusCode)
}
var didDoc DIDDocument
if err := json.NewDecoder(resp.Body).Decode(&didDoc); err != nil {
return nil, fmt.Errorf("failed to decode DID document: %w", err)
}
return &didDoc, nil
}
// DID returns the DID associated with this client
func (c *Client) DID() string {
return c.did
}
func (c *Client) PDSEndpoint() string {
return c.pdsEndpoint
}