package git_pages

import (
	"bytes"
	"context"
	"crypto/sha256"
	"fmt"
	"io"
	"iter"
	"net/http"
	"path"
	"strings"
	"sync"
	"time"

	"github.com/c2h5oh/datasize"
	"github.com/maypok86/otter/v2"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Prometheus instruments for the S3 backend; populated by initS3BackendMetrics.
var (
	blobsDedupedCount prometheus.Counter
	blobsDedupedBytes prometheus.Counter

	blobCacheHitsCount      prometheus.Counter
	blobCacheHitsBytes      prometheus.Counter
	blobCacheMissesCount    prometheus.Counter
	blobCacheMissesBytes    prometheus.Counter
	blobCacheEvictionsCount prometheus.Counter
	blobCacheEvictionsBytes prometheus.Counter

	manifestCacheHitsCount      prometheus.Counter
	manifestCacheMissesCount    prometheus.Counter
	manifestCacheEvictionsCount prometheus.Counter

	s3GetObjectDurationSeconds *prometheus.HistogramVec
	s3GetObjectResponseCount   *prometheus.CounterVec
)

// initS3BackendMetrics registers all S3 backend metrics with the default
// Prometheus registry.
//
// NOTE(review): promauto panics on duplicate registration, so this — and hence
// NewS3Backend, which calls it — must run at most once per process; confirm
// with callers.
func initS3BackendMetrics() {
	blobsDedupedCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "git_pages_blobs_deduped",
		Help: "Count of blobs deduplicated",
	})
	blobsDedupedBytes = promauto.NewCounter(prometheus.CounterOpts{
		Name: "git_pages_blobs_deduped_bytes",
		Help: "Total size in bytes of blobs deduplicated",
	})
	blobCacheHitsCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "git_pages_blob_cache_hits_count",
		Help: "Count of blobs that were retrieved from the cache",
	})
	blobCacheHitsBytes = promauto.NewCounter(prometheus.CounterOpts{
		Name: "git_pages_blob_cache_hits_bytes",
		Help: "Total size in bytes of blobs that were retrieved from the cache",
	})
	blobCacheMissesCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "git_pages_blob_cache_misses_count",
		Help: "Count of blobs that were not found in the cache (and were then successfully cached)",
	})
	blobCacheMissesBytes = promauto.NewCounter(prometheus.CounterOpts{
		Name: "git_pages_blob_cache_misses_bytes",
		Help: "Total size in bytes of blobs that were not found in the cache (and were then successfully cached)",
	})
	blobCacheEvictionsCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "git_pages_blob_cache_evictions_count",
		Help: "Count of blobs evicted from the cache",
	})
	blobCacheEvictionsBytes = promauto.NewCounter(prometheus.CounterOpts{
		Name: "git_pages_blob_cache_evictions_bytes",
		Help: "Total size in bytes of blobs evicted from the cache",
	})
	manifestCacheHitsCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "git_pages_manifest_cache_hits_count",
		Help: "Count of manifests that were retrieved from the cache",
	})
	manifestCacheMissesCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "git_pages_manifest_cache_misses_count",
		Help: "Count of manifests that were not found in the cache (and were then successfully cached)",
	})
	manifestCacheEvictionsCount = promauto.NewCounter(prometheus.CounterOpts{
		Name: "git_pages_manifest_cache_evictions_count",
		Help: "Count of manifests evicted from the cache",
	})
	s3GetObjectDurationSeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "git_pages_s3_get_object_duration_seconds",
		Help:    "Time to read a whole object from S3",
		Buckets: []float64{.01, .025, .05, .1, .25, .5, .75, 1, 1.25, 1.5, 1.75, 2, 2.5, 5, 10},
		NativeHistogramBucketFactor:     1.1,
		NativeHistogramMaxBucketNumber:  100,
		NativeHistogramMinResetDuration: 10 * time.Minute,
	}, []string{"kind"})
	s3GetObjectResponseCount = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "git_pages_s3_get_object_responses_count",
		Help: "Count of s3:GetObject responses",
	}, []string{"kind", "code"})
}

// CachedBlob is a blob held in the in-process cache together with its S3
// modification time. Blobs can be safely cached indefinitely; they only need
// to be evicted to preserve memory.
type CachedBlob struct {
	blob  []byte
	mtime time.Time
}

// Weight reports the cache weight of the blob: its size in bytes.
func (c *CachedBlob) Weight() uint32 {
	return uint32(len(c.blob))
}

// CachedManifest is a manifest (or a cached negative lookup) held in the
// in-process cache. Manifests can only be cached for a short time to avoid
// serving stale content. Browser page loads cause a large burst of manifest
// accesses that are essential for serving `304 Not Modified` responses and
// these need to be handled very quickly, so both hits and misses are cached.
type CachedManifest struct {
	manifest *Manifest
	weight   uint32 // encoded size in bytes (1 for negative entries)
	metadata ManifestMetadata
	err      error // non-nil for cached negative lookups (wraps ErrObjectNotFound)
}

// Weight reports the cache weight of the manifest entry.
func (c *CachedManifest) Weight() uint32 {
	return c.weight
}

// S3Backend stores blobs, manifests, domain markers, and audit records in a
// single S3 bucket, with in-process caches in front of blob and manifest reads.
type S3Backend struct {
	client       *minio.Client
	bucket       string
	blobCache    *observedCache[string, *CachedBlob]
	siteCache    *observedCache[string, *CachedManifest]
	featureCache *otter.Cache[BackendFeature, bool]
}

var _ Backend = (*S3Backend)(nil)

// makeCacheOptions translates a CacheConfig into otter cache options:
// MaxSize bounds the total weight, MaxAge + MaxStale bound entry lifetime,
// and a nonzero MaxStale enables background refresh once MaxAge has passed.
func makeCacheOptions[K comparable, V any](
	config *CacheConfig,
	weigher func(K, V) uint32,
) *otter.Options[K, V] {
	options := &otter.Options[K, V]{}
	if config.MaxSize != 0 {
		options.MaximumWeight = config.MaxSize.Bytes()
		options.Weigher = weigher
	}
	if config.MaxStale != 0 {
		// Entries become refreshable after MaxAge, while remaining servable
		// until MaxAge+MaxStale below.
		options.RefreshCalculator = otter.RefreshWriting[K, V](
			time.Duration(config.MaxAge))
	}
	if config.MaxAge != 0 || config.MaxStale != 0 {
		options.ExpiryCalculator = otter.ExpiryWriting[K, V](
			time.Duration(config.MaxAge + config.MaxStale))
	}
	return options
}

// NewS3Backend connects to the configured S3 endpoint, creates the bucket on
// first use (enabling FeatureCheckDomainMarker on fresh buckets), registers
// metrics, and sets up the blob, manifest, and feature caches.
func NewS3Backend(ctx context.Context, config *S3Config) (*S3Backend, error) {
	client, err := minio.New(config.Endpoint, &minio.Options{
		Creds: credentials.NewStaticV4(
			config.AccessKeyID,
			config.SecretAccessKey,
			"",
		),
		Secure: !config.Insecure,
	})
	if err != nil {
		return nil, err
	}
	bucket := config.Bucket
	exists, err := client.BucketExists(ctx, bucket)
	if err != nil {
		return nil, err
	} else if !exists {
		logc.Printf(ctx, "s3: create bucket %s\n", bucket)
		err = client.MakeBucket(ctx, bucket, minio.MakeBucketOptions{Region: config.Region})
		if err != nil {
			return nil, err
		}
		// A freshly created bucket has no legacy data, so the domain marker
		// objects are authoritative from the start.
		err = (&S3Backend{client: client, bucket: bucket}).
			EnableFeature(ctx, FeatureCheckDomainMarker)
		if err != nil {
			return nil, err
		}
	}
	initS3BackendMetrics()
	blobCacheMetrics := observedCacheMetrics{
		HitNumberCounter:      blobCacheHitsCount,
		HitWeightCounter:      blobCacheHitsBytes,
		MissNumberCounter:     blobCacheMissesCount,
		MissWeightCounter:     blobCacheMissesBytes,
		EvictionNumberCounter: blobCacheEvictionsCount,
		EvictionWeightCounter: blobCacheEvictionsBytes,
	}
	blobCache, err := newObservedCache(makeCacheOptions(&config.BlobCache,
		func(key string, value *CachedBlob) uint32 {
			return uint32(len(value.blob))
		}), blobCacheMetrics)
	if err != nil {
		return nil, err
	}
	siteCacheMetrics := observedCacheMetrics{
		HitNumberCounter:      manifestCacheHitsCount,
		MissNumberCounter:     manifestCacheMissesCount,
		EvictionNumberCounter: manifestCacheEvictionsCount,
	}
	siteCache, err := newObservedCache(makeCacheOptions(&config.SiteCache,
		func(key string, value *CachedManifest) uint32 {
			return value.weight
		}), siteCacheMetrics)
	if err != nil {
		return nil, err
	}
	featureCache, err := otter.New(&otter.Options[BackendFeature, bool]{
		RefreshCalculator: otter.RefreshWriting[BackendFeature, bool](10 * time.Minute),
	})
	if err != nil {
		return nil, err
	}
	return &S3Backend{client, bucket, blobCache, siteCache, featureCache}, nil
}

// Backend returns the backend itself.
func (s3 *S3Backend) Backend() Backend {
	return s3
}

// blobObjectName maps a blob name to its object key under the "blob/" prefix.
func blobObjectName(name string) string {
	return fmt.Sprintf("blob/%s", path.Join(splitBlobName(name)...))
}

// storeFeatureObjectName maps a backend feature to its marker object key.
func storeFeatureObjectName(feature BackendFeature) string {
	return fmt.Sprintf("meta/feature/%s", feature)
}

// HasFeature reports whether the feature marker object exists in the bucket,
// caching the answer (refreshed every 10 minutes). Lookup errors are observed
// and logged, and treated as "feature disabled".
func (s3 *S3Backend) HasFeature(ctx context.Context, feature BackendFeature) bool {
	loader := func(ctx context.Context, feature BackendFeature) (bool, error) {
		_, err := s3.client.StatObject(ctx, s3.bucket, storeFeatureObjectName(feature),
			minio.StatObjectOptions{})
		if err != nil {
			if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" {
				logc.Printf(ctx, "s3 feature %q: disabled", feature)
				return false, nil
			} else {
				return false, err
			}
		}
		logc.Printf(ctx, "s3 feature %q: enabled", feature)
		return true, nil
	}
	isOn, err := s3.featureCache.Get(ctx, feature,
		otter.LoaderFunc[BackendFeature, bool](loader))
	if err != nil {
		err = fmt.Errorf("getting s3 backend feature %q: %w", feature, err)
		ObserveError(err)
		logc.Println(ctx, err)
		return false
	}
	return isOn
}

// EnableFeature creates the (empty) marker object for the feature.
func (s3 *S3Backend) EnableFeature(ctx context.Context, feature BackendFeature) error {
	_, err := s3.client.PutObject(ctx, s3.bucket, storeFeatureObjectName(feature),
		&bytes.Reader{}, 0, minio.PutObjectOptions{})
	return err
}

// GetBlob returns a reader over the blob's contents together with its
// metadata, going through the in-process blob cache. A missing blob is
// reported as an error wrapping ErrObjectNotFound.
func (s3 *S3Backend) GetBlob(
	ctx context.Context,
	name string,
) (
	reader io.ReadSeeker,
	metadata BlobMetadata,
	err error,
) {
	loader := func(ctx context.Context, name string) (*CachedBlob, error) {
		logc.Printf(ctx, "s3: get blob %s\n", name)
		startTime := time.Now()
		object, err := s3.client.GetObject(ctx, s3.bucket, blobObjectName(name),
			minio.GetObjectOptions{})
		// Note that many errors (e.g. NoSuchKey) will be reported only after this point.
		if err != nil {
			return nil, err
		}
		defer object.Close()
		data, err := io.ReadAll(object)
		if err != nil {
			return nil, err
		}
		stat, err := object.Stat()
		if err != nil {
			return nil, err
		}
		// Only successful, fully-read downloads contribute to the histogram.
		s3GetObjectDurationSeconds.
			With(prometheus.Labels{"kind": "blob"}).
			Observe(time.Since(startTime).Seconds())
		return &CachedBlob{data, stat.LastModified}, nil
	}
	observer := func(ctx context.Context, name string) (*CachedBlob, error) {
		// Wrap the loader to count responses by S3 error code ("OK" on success).
		cached, err := loader(ctx, name)
		code := "OK"
		if resp, ok := err.(minio.ErrorResponse); ok {
			code = resp.Code
		}
		s3GetObjectResponseCount.With(prometheus.Labels{"kind": "blob", "code": code}).Inc()
		return cached, err
	}
	var cached *CachedBlob
	cached, err = s3.blobCache.Get(ctx, name, otter.LoaderFunc[string, *CachedBlob](observer))
	if err != nil {
		if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" {
			err = fmt.Errorf("%w: %s", ErrObjectNotFound, errResp.Key)
		}
	} else {
		reader = bytes.NewReader(cached.blob)
		metadata.Name = name
		metadata.Size = int64(len(cached.blob))
		metadata.LastModified = cached.mtime
	}
	return
}

// PutBlob uploads a blob unless an object with the same (content-derived) name
// already exists, in which case the upload is skipped and counted as a dedup.
func (s3 *S3Backend) PutBlob(ctx context.Context, name string, data []byte) error {
	logc.Printf(ctx, "s3: put blob %s (%s)\n", name,
		datasize.ByteSize(len(data)).HumanReadable())
	_, err := s3.client.StatObject(ctx, s3.bucket, blobObjectName(name),
		minio.StatObjectOptions{})
	if err != nil {
		if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" {
			_, err := s3.client.PutObject(ctx, s3.bucket, blobObjectName(name),
				bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{})
			if err != nil {
				return err
			} else {
				ObserveData(ctx, "blob.status", "created")
				logc.Printf(ctx, "s3: put blob %s (created)\n", name)
				return nil
			}
		} else {
			return err
		}
	} else {
		ObserveData(ctx, "blob.status", "exists")
		logc.Printf(ctx, "s3: put blob %s (exists)\n", name)
		blobsDedupedCount.Inc()
		blobsDedupedBytes.Add(float64(len(data)))
		return nil
	}
}

// DeleteBlob removes the blob object from the bucket.
func (s3 *S3Backend) DeleteBlob(ctx context.Context, name string) error {
	logc.Printf(ctx, "s3: delete blob %s\n", name)
	return s3.client.RemoveObject(ctx, s3.bucket, blobObjectName(name),
		minio.RemoveObjectOptions{})
}

// EnumerateBlobs yields metadata for every stored blob. Listing stops when the
// consumer breaks out of the iteration; listing errors are yielded in-band.
func (s3 *S3Backend) EnumerateBlobs(ctx context.Context) iter.Seq2[BlobMetadata, error] {
	return func(yield func(BlobMetadata, error) bool) {
		logc.Print(ctx, "s3: enumerate blobs")
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()
		prefix := "blob/"
		for object := range s3.client.ListObjectsIter(ctx, s3.bucket, minio.ListObjectsOptions{
			Prefix:    prefix,
			Recursive: true,
		}) {
			var metadata BlobMetadata
			var err error
			if err = object.Err; err == nil {
				key := strings.TrimPrefix(object.Key, prefix)
				if strings.HasSuffix(key, "/") {
					continue // directory; skip
				} else {
					metadata.Name = joinBlobName(strings.Split(key, "/"))
					metadata.Size = object.Size
					metadata.LastModified = object.LastModified
				}
			}
			if !yield(metadata, err) {
				break
			}
		}
	}
}

// manifestObjectName maps a manifest name to its object key under "site/".
func manifestObjectName(name string) string {
	return fmt.Sprintf("site/%s", name)
}

// stagedManifestObjectName maps manifest bytes to a content-addressed staging
// key under "dirty/".
func stagedManifestObjectName(manifestData []byte) string {
	return fmt.Sprintf("dirty/%x", sha256.Sum256(manifestData))
}

// s3ManifestLoader adapts S3 manifest fetching to the otter loader interface,
// using conditional GETs (If-None-Match) on reload to avoid re-downloading
// unchanged manifests.
type s3ManifestLoader struct {
	s3 *S3Backend
}

// Load fetches a manifest with no prior cached value.
func (l s3ManifestLoader) Load(
	ctx context.Context,
	key string,
) (
	*CachedManifest,
	error,
) {
	return l.load(ctx, key, nil)
}

// Reload refreshes a manifest, reusing oldValue if S3 reports it unchanged.
func (l s3ManifestLoader) Reload(
	ctx context.Context,
	key string,
	oldValue *CachedManifest,
) (
	*CachedManifest,
	error,
) {
	return l.load(ctx, key, oldValue)
}

// load fetches and decodes a manifest. A missing manifest is converted into a
// cacheable negative entry (so lookup bursts for absent sites are absorbed by
// the cache); a `304 Not Modified` response reuses oldManifest.
func (l s3ManifestLoader) load(
	ctx context.Context,
	name string,
	oldManifest *CachedManifest,
) (
	*CachedManifest,
	error,
) {
	logc.Printf(ctx, "s3: get manifest %s\n", name)
	loader := func() (*CachedManifest, error) {
		opts := minio.GetObjectOptions{}
		if oldManifest != nil && oldManifest.metadata.ETag != "" {
			// Ask S3 to answer `304 Not Modified` if our copy is current.
			opts.SetMatchETagExcept(oldManifest.metadata.ETag)
		}
		object, err := l.s3.client.GetObject(ctx, l.s3.bucket, manifestObjectName(name), opts)
		// Note that many errors (e.g. NoSuchKey) will be reported only after this point.
		if err != nil {
			return nil, err
		}
		defer object.Close()
		data, err := io.ReadAll(object)
		if err != nil {
			return nil, err
		}
		stat, err := object.Stat()
		if err != nil {
			return nil, err
		}
		manifest, err := DecodeManifest(data)
		if err != nil {
			return nil, err
		}
		metadata := ManifestMetadata{
			LastModified: stat.LastModified,
			ETag:         stat.ETag,
		}
		return &CachedManifest{manifest, uint32(len(data)), metadata, nil}, nil
	}
	observer := func() (*CachedManifest, error) {
		// Count responses by S3 error code ("OK" on success).
		cached, err := loader()
		code := "OK"
		if resp, ok := err.(minio.ErrorResponse); ok {
			code = resp.Code
		}
		s3GetObjectResponseCount.With(prometheus.Labels{"kind": "manifest", "code": code}).Inc()
		return cached, err
	}
	startTime := time.Now()
	cached, err := observer()
	s3GetObjectDurationSeconds.
		With(prometheus.Labels{"kind": "manifest"}).
		Observe(time.Since(startTime).Seconds())
	if err != nil {
		errResp := minio.ToErrorResponse(err)
		if errResp.Code == "NoSuchKey" {
			// Cache the negative result (nominal weight 1) rather than
			// returning an error, so misses are served from the cache too.
			err = fmt.Errorf("%w: %s", ErrObjectNotFound, errResp.Key)
			return &CachedManifest{nil, 1, ManifestMetadata{}, err}, nil
		} else if errResp.StatusCode == http.StatusNotModified && oldManifest != nil {
			return oldManifest, nil
		} else {
			return nil, err
		}
	}
	return cached, nil
}

// GetManifest returns the manifest and its metadata, via the site cache.
// With opts.BypassCache, a cache entry that is already refreshable is
// invalidated first so the load hits S3.
func (s3 *S3Backend) GetManifest(
	ctx context.Context,
	name string,
	opts GetManifestOptions,
) (
	manifest *Manifest,
	metadata ManifestMetadata,
	err error,
) {
	if opts.BypassCache {
		entry, found := s3.siteCache.Cache.GetEntry(name)
		if found && entry.RefreshableAt().Before(time.Now()) {
			s3.siteCache.Cache.Invalidate(name)
		}
	}
	var cached *CachedManifest
	cached, err = s3.siteCache.Get(ctx, name, s3ManifestLoader{s3})
	if err != nil {
		return
	} else {
		// Either a real manifest, or a cached negative entry whose `err`
		// wraps ErrObjectNotFound.
		manifest, metadata, err = cached.manifest, cached.metadata, cached.err
		return
	}
}

// StageManifest uploads the encoded manifest to a content-addressed staging
// key, to be committed (and cleaned up) later by CommitManifest.
func (s3 *S3Backend) StageManifest(ctx context.Context, manifest *Manifest) error {
	data := EncodeManifest(manifest)
	logc.Printf(ctx, "s3: stage manifest %x\n", sha256.Sum256(data))
	_, err := s3.client.PutObject(ctx, s3.bucket, stagedManifestObjectName(data),
		bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{})
	return err
}

// domainFrozenObjectName maps a domain to its freeze marker object key.
func domainFrozenObjectName(domain string) string {
	return manifestObjectName(fmt.Sprintf("%s/.frozen", domain))
}

// checkDomainFrozen returns ErrDomainFrozen if the domain's freeze marker
// exists, nil if it does not, and any other lookup error as-is.
func (s3 *S3Backend) checkDomainFrozen(ctx context.Context, domain string) error {
	_, err := s3.client.StatObject(ctx, s3.bucket, domainFrozenObjectName(domain),
		minio.StatObjectOptions{})
	if err == nil {
		return ErrDomainFrozen
	} else if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" {
		return nil
	} else {
		return err
	}
}

// HasAtomicCAS reports whether the backend supports atomic compare-and-swap
// of manifests. For S3 this is always false; see below.
func (s3 *S3Backend) HasAtomicCAS(ctx context.Context) bool {
	// Support for `If-Unmodified-Since:` or `If-Match:` for PutObject requests is very spotty:
	// - AWS supports only `If-Match:`:
	//   https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
	// - Minio supports `If-Match:`:
	//   https://blog.min.io/leading-the-way-minios-conditional-write-feature-for-modern-data-workloads/
	// - Tigris supports `If-Unmodified-Since:` and `If-Match:`, but only with `X-Tigris-Consistent: true`;
	//   https://www.tigrisdata.com/docs/objects/conditionals/
	//   Note that the `X-Tigris-Consistent: true` header must be present on *every* transaction
	//   touching the object, not just on the CAS transactions.
	// - Wasabi does not support either one and docs seem to suggest that the headers are ignored;
	// - Garage does not support either one and source code suggests the headers are ignored.
	// It seems that the only safe option is to not claim support for atomic CAS, and only do
	// best-effort CAS implementation using HeadObject and PutObject/DeleteObject.
	return false
}

// checkManifestPrecondition performs a best-effort (non-atomic) check of the
// If-Unmodified-Since / If-Match preconditions against the current manifest.
func (s3 *S3Backend) checkManifestPrecondition(
	ctx context.Context,
	name string,
	opts ModifyManifestOptions,
) error {
	if opts.IfUnmodifiedSince.IsZero() && opts.IfMatch == "" {
		return nil
	}
	stat, err := s3.client.StatObject(ctx, s3.bucket, manifestObjectName(name),
		minio.StatObjectOptions{})
	if err != nil {
		return err
	}
	if !opts.IfUnmodifiedSince.IsZero() &&
		stat.LastModified.Compare(opts.IfUnmodifiedSince) > 0 {
		return fmt.Errorf("%w: If-Unmodified-Since", ErrPreconditionFailed)
	}
	if opts.IfMatch != "" && stat.ETag != opts.IfMatch {
		return fmt.Errorf("%w: If-Match", ErrPreconditionFailed)
	}
	return nil
}

// CommitManifest publishes a previously staged manifest under its final name,
// honoring freeze markers and (best-effort) modification preconditions, then
// removes the staged object and invalidates the site cache entry.
func (s3 *S3Backend) CommitManifest(
	ctx context.Context,
	name string,
	manifest *Manifest,
	opts ModifyManifestOptions,
) error {
	data := EncodeManifest(manifest)
	logc.Printf(ctx, "s3: commit manifest %x -> %s", sha256.Sum256(data), name)
	domain, _, _ := strings.Cut(name, "/")
	if err := s3.checkDomainFrozen(ctx, domain); err != nil {
		return err
	}
	if err := s3.checkManifestPrecondition(ctx, name, opts); err != nil {
		return err
	}
	// Remove staged object unconditionally (whether commit succeeded or failed), since
	// the upper layer has to retry the complete operation anyway.
	putOptions := minio.PutObjectOptions{}
	putOptions.Header().Add("X-Tigris-Consistent", "true")
	if opts.IfMatch != "" {
		// Not guaranteed to do anything (see `HasAtomicCAS`), but let's try anyway;
		// this is a "belt and suspenders" approach, together with `checkManifestPrecondition`.
		// It does reliably work on MinIO at least.
		putOptions.SetMatchETag(opts.IfMatch)
	}
	_, putErr := s3.client.PutObject(ctx, s3.bucket, manifestObjectName(name),
		bytes.NewReader(data), int64(len(data)), putOptions)
	removeErr := s3.client.RemoveObject(ctx, s3.bucket, stagedManifestObjectName(data),
		minio.RemoveObjectOptions{})
	s3.siteCache.Cache.Invalidate(name)
	if putErr != nil {
		if errResp := minio.ToErrorResponse(putErr); errResp.Code == "PreconditionFailed" {
			return ErrPreconditionFailed
		} else {
			return putErr
		}
	} else if removeErr != nil {
		return removeErr
	} else {
		return nil
	}
}

// DeleteManifest removes a manifest, honoring freeze markers and preconditions,
// invalidates its cache entry, and bumps the last-domain-update timestamp.
func (s3 *S3Backend) DeleteManifest(
	ctx context.Context,
	name string,
	opts ModifyManifestOptions,
) error {
	logc.Printf(ctx, "s3: delete manifest %s\n", name)
	domain, _, _ := strings.Cut(name, "/")
	if err := s3.checkDomainFrozen(ctx, domain); err != nil {
		return err
	}
	if err := s3.checkManifestPrecondition(ctx, name, opts); err != nil {
		return err
	}
	err := s3.client.RemoveObject(ctx, s3.bucket, manifestObjectName(name),
		minio.RemoveObjectOptions{})
	if err != nil {
		return err
	}
	s3.siteCache.Cache.Invalidate(name)
	return s3.bumpLastDomainUpdateTimestamp(ctx)
}

// EnumerateManifests yields metadata for every published manifest, skipping
// directory placeholders and internal dot-entries (except ".index").
func (s3 *S3Backend) EnumerateManifests(ctx context.Context) iter.Seq2[*ManifestMetadata, error] {
	return func(yield func(*ManifestMetadata, error) bool) {
		logc.Print(ctx, "s3: enumerate manifests")
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()
		prefix := "site/"
		for object := range s3.client.ListObjectsIter(ctx, s3.bucket, minio.ListObjectsOptions{
			Prefix:    prefix,
			Recursive: true,
		}) {
			var metadata *ManifestMetadata
			var err error
			if err = object.Err; err == nil {
				key := strings.TrimPrefix(object.Key, prefix)
				_, project, _ := strings.Cut(key, "/")
				if strings.HasSuffix(key, "/") {
					continue // directory; skip
				} else if project == "" ||
					strings.HasPrefix(project, ".") && project != ".index" {
					continue // internal; skip
				} else {
					metadata = &ManifestMetadata{
						Name:         key,
						Size:         object.Size,
						LastModified: object.LastModified,
						ETag:         object.ETag,
					}
				}
			}
			if !yield(metadata, err) {
				break
			}
		}
	}
}

// getAllManifestsSemaphore limits the number of concurrent manifest downloads,
// globally across the entire git-pages process.
// Not currently configurable as there seems to be little need.
var getAllManifestsSemaphore = make(chan struct{}, 64)

// GetAllManifests enumerates every manifest and fetches its contents with
// bounded concurrency, yielding (metadata, manifest) pairs. Breaking out of
// the iteration cancels the producer goroutines.
func (s3 *S3Backend) GetAllManifests(ctx context.Context) iter.Seq2[tuple[*ManifestMetadata, *Manifest], error] {
	type result struct {
		metadata *ManifestMetadata
		manifest *Manifest
		err      error
	}
	resultsChan := make(chan result)
	enumeratorCtx, cancel := context.WithCancel(ctx)
	go func(ctx context.Context) {
		wg := sync.WaitGroup{}
		for metadata, err := range s3.EnumerateManifests(ctx) {
			if err != nil {
				// Guard the send with ctx.Done() so an early consumer break
				// does not leave this goroutine blocked forever.
				select {
				case resultsChan <- result{nil, nil, err}:
				case <-ctx.Done():
				}
			} else {
				getAllManifestsSemaphore <- struct{}{} // acquire
				wg.Go(func() {
					defer func() { <-getAllManifestsSemaphore }() // release
					manifest, _, err := s3.GetManifest(ctx, metadata.Name,
						GetManifestOptions{})
					select {
					case resultsChan <- result{metadata, manifest, err}:
					case <-ctx.Done():
					}
				})
			}
		}
		wg.Wait()
		close(resultsChan)
	}(enumeratorCtx)
	return func(yield func(tuple[*ManifestMetadata, *Manifest], error) bool) {
		// Cancel the producer whether the consumer breaks early or drains the
		// iterator fully; without this, a full drain leaked the context.
		defer cancel()
		for result := range resultsChan {
			item := tuple[*ManifestMetadata, *Manifest]{result.metadata, result.manifest}
			if !yield(item, result.err) {
				break
			}
		}
	}
}

// domainCheckObjectName maps a domain to its existence marker object key.
func domainCheckObjectName(domain string) string {
	return manifestObjectName(fmt.Sprintf("%s/.exists", domain))
}

// CheckDomain reports whether a domain exists: first via its marker object,
// then — if the marker feature is not enabled for this bucket — by listing the
// domain's manifests.
func (s3 *S3Backend) CheckDomain(ctx context.Context, domain string) (exists bool, err error) {
	logc.Printf(ctx, "s3: check domain %s\n", domain)
	_, err = s3.client.StatObject(ctx, s3.bucket, domainCheckObjectName(domain),
		minio.StatObjectOptions{})
	if err != nil {
		if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" {
			exists, err = false, nil
		}
	} else {
		exists = true
	}
	if !exists && !s3.HasFeature(ctx, FeatureCheckDomainMarker) {
		// Marker objects may predate the feature; fall back to checking for
		// any object under the domain prefix.
		// NOTE(review): this fallback also runs when StatObject failed with a
		// non-NoSuchKey error, replacing that error with the listing result —
		// confirm this masking is intended.
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()
		for object := range s3.client.ListObjectsIter(ctx, s3.bucket, minio.ListObjectsOptions{
			Prefix: manifestObjectName(fmt.Sprintf("%s/", domain)),
		}) {
			if object.Err != nil {
				return false, object.Err
			}
			return true, nil
		}
		return false, nil
	}
	return
}

// CreateDomain creates the domain's existence marker, bumping the
// last-domain-update timestamp if the domain did not exist before.
func (s3 *S3Backend) CreateDomain(ctx context.Context, domain string) error {
	logc.Printf(ctx, "s3: create domain %s\n", domain)
	exists, err := s3.CheckDomain(ctx, domain)
	if err != nil {
		return err
	}
	_, err = s3.client.PutObject(ctx, s3.bucket, domainCheckObjectName(domain),
		&bytes.Reader{}, 0, minio.PutObjectOptions{})
	if err != nil {
		return err
	}
	if !exists {
		err = s3.bumpLastDomainUpdateTimestamp(ctx)
	}
	return err
}

// FreezeDomain creates the domain's freeze marker, blocking further
// manifest commits and deletions for that domain.
func (s3 *S3Backend) FreezeDomain(ctx context.Context, domain string) error {
	logc.Printf(ctx, "s3: freeze domain %s\n", domain)
	_, err := s3.client.PutObject(ctx, s3.bucket, domainFrozenObjectName(domain),
		&bytes.Reader{}, 0, minio.PutObjectOptions{})
	return err
}

// UnfreezeDomain removes the domain's freeze marker; a missing marker is not
// an error.
func (s3 *S3Backend) UnfreezeDomain(ctx context.Context, domain string) error {
	logc.Printf(ctx, "s3: unfreeze domain %s\n", domain)
	err := s3.client.RemoveObject(ctx, s3.bucket, domainFrozenObjectName(domain),
		minio.RemoveObjectOptions{})
	if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" {
		return nil
	} else {
		return err
	}
}

// lastDomainUpdateObjectName is the object whose modification time records
// when the set of domains last changed.
const lastDomainUpdateObjectName = "meta/last-domain-update"

// HaveDomainsChanged reports whether the set of domains has changed since the
// given instant, based on the timestamp object's modification time.
func (s3 *S3Backend) HaveDomainsChanged(ctx context.Context, since time.Time) (bool, error) {
	info, err := s3.client.StatObject(ctx, s3.bucket, lastDomainUpdateObjectName,
		minio.StatObjectOptions{})
	if err != nil {
		return false, err
	}
	return info.LastModified.After(since), nil
}

// bumpLastDomainUpdateTimestamp rewrites the (empty) timestamp object so its
// modification time becomes "now".
func (s3 *S3Backend) bumpLastDomainUpdateTimestamp(ctx context.Context) error {
	logc.Print(ctx, "s3: bumping last domain update timestamp")
	_, err := s3.client.PutObject(ctx, s3.bucket, lastDomainUpdateObjectName,
		&bytes.Reader{}, 0, minio.PutObjectOptions{})
	return err
}

// auditObjectName maps an audit ID to its record object key.
func auditObjectName(id AuditID) string {
	return fmt.Sprintf("audit/%s", id)
}

// auditDetachedObjectName maps an audit ID to its detachment marker key.
func auditDetachedObjectName(id AuditID) string {
	return fmt.Sprintf("audit/%s.detached", id)
}

// AppendAuditLog writes a new audit record, requesting create-only semantics
// where supported; an If-None-Match failure means an audit ID collision, which
// is a programming error and panics.
func (s3 *S3Backend) AppendAuditLog(ctx context.Context, id AuditID, record *AuditRecord) error {
	logc.Printf(ctx, "s3: append audit %s\n", id)
	name := auditObjectName(id)
	data := EncodeAuditRecord(record)
	options := minio.PutObjectOptions{}
	options.SetMatchETagExcept("*") // may or may not be supported
	_, err := s3.client.PutObject(ctx, s3.bucket, name,
		bytes.NewReader(data), int64(len(data)), options)
	if errResp := minio.ToErrorResponse(err); errResp.StatusCode == 412 {
		panic(fmt.Errorf("audit ID collision: %s", name))
	}
	return err
}

// QueryAuditLog reads and decodes an audit record, clearing its manifest if a
// detachment marker exists for the ID.
func (s3 *S3Backend) QueryAuditLog(ctx context.Context, id AuditID) (*AuditRecord, error) {
	logc.Printf(ctx, "s3: read audit %s\n", id)
	object, err := s3.client.GetObject(ctx, s3.bucket, auditObjectName(id),
		minio.GetObjectOptions{})
	if err != nil {
		return nil, err
	}
	defer object.Close()
	data, err := io.ReadAll(object)
	if err != nil {
		return nil, err
	}
	record, err := DecodeAuditRecord(data)
	if err != nil {
		return nil, err
	}
	_, err = s3.client.StatObject(ctx, s3.bucket, auditDetachedObjectName(id),
		minio.StatObjectOptions{})
	if err == nil {
		record.Manifest = nil // manifest was detached from this record
	} else if errResp := minio.ToErrorResponse(err); errResp.Code != "NoSuchKey" {
		return nil, err
	}
	return record, nil
}

// SearchAuditLog yields the IDs of audit records within the optional
// [Since, Until] time bounds; unparsable keys yield their parse error.
func (s3 *S3Backend) SearchAuditLog(
	ctx context.Context,
	opts SearchAuditLogOptions,
) iter.Seq2[AuditID, error] {
	return func(yield func(AuditID, error) bool) {
		logc.Printf(ctx, "s3: search audit\n")
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()
		prefix := "audit/"
		for object := range s3.client.ListObjectsIter(ctx, s3.bucket, minio.ListObjectsOptions{
			Prefix: prefix,
		}) {
			var id AuditID
			var err error
			if object.Err != nil {
				err = object.Err
			} else if strings.Contains(object.Key, ".") {
				continue // marker object (e.g. ".detached"); skip
			} else if id, err = ParseAuditID(strings.TrimPrefix(object.Key, prefix)); err != nil {
				// fall through and report the parse error via yield
			} else if !opts.Since.IsZero() && id.CompareTime(opts.Since) < 0 {
				continue
			} else if !opts.Until.IsZero() && id.CompareTime(opts.Until) > 0 {
				continue
			}
			if !yield(id, err) {
				break
			}
		}
	}
}

// getAuditLogRecordsSemaphore limits the number of concurrent audit record
// reads, globally across the entire git-pages process.
var getAuditLogRecordsSemaphore = make(chan struct{}, 64)

// GetAuditLogRecords fetches the audit records for the given IDs with bounded
// concurrency, yielding records (or per-ID errors) as they arrive.
func (s3 *S3Backend) GetAuditLogRecords(
	ctx context.Context,
	ids iter.Seq2[AuditID, error],
) iter.Seq2[*AuditRecord, error] {
	return func(yield func(*AuditRecord, error) bool) {
		resultsChan := make(chan tuple[*AuditRecord, error])
		enumeratorCtx, cancel := context.WithCancel(ctx)
		defer cancel()
		go func(ctx context.Context) {
			wg := sync.WaitGroup{}
			for id, err := range ids {
				if err != nil {
					// Guard sends with ctx.Done() so an early consumer break
					// does not strand this goroutine on the unbuffered channel.
					select {
					case resultsChan <- tuple[*AuditRecord, error]{nil, err}:
					case <-ctx.Done():
					}
				} else {
					getAuditLogRecordsSemaphore <- struct{}{} // acquire
					wg.Go(func() {
						defer func() { <-getAuditLogRecordsSemaphore }() // release
						record, err := s3.QueryAuditLog(ctx, id)
						select {
						case resultsChan <- tuple[*AuditRecord, error]{record, err}:
						case <-ctx.Done():
						}
					})
				}
			}
			wg.Wait()
			close(resultsChan)
		}(enumeratorCtx)
		for result := range resultsChan {
			record, err := result.Splat()
			if !yield(record, err) {
				break
			}
		}
	}
}

// DetachAuditRecord marks the audit record's manifest as detached by creating
// an empty ".detached" marker object.
func (s3 *S3Backend) DetachAuditRecord(ctx context.Context, id AuditID) error {
	logc.Printf(ctx, "s3: detach audit record %s\n", id)
	_, err := s3.client.PutObject(ctx, s3.bucket, auditDetachedObjectName(id),
		&bytes.Reader{}, 0, minio.PutObjectOptions{})
	return err
}

// ExpireAuditRecord deletes the audit record object.
func (s3 *S3Backend) ExpireAuditRecord(ctx context.Context, id AuditID) error {
	logc.Printf(ctx, "s3: expire audit record %s\n", id)
	return s3.client.RemoveObject(ctx, s3.bucket, auditObjectName(id),
		minio.RemoveObjectOptions{})
}