mirror of
https://codeberg.org/git-pages/git-pages.git
synced 2026-05-14 03:01:48 +00:00
965 lines
28 KiB
Go
965 lines
28 KiB
Go
package git_pages
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/sha256"
|
|
"fmt"
|
|
"io"
|
|
"iter"
|
|
"net/http"
|
|
"path"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/c2h5oh/datasize"
|
|
"github.com/maypok86/otter/v2"
|
|
"github.com/minio/minio-go/v7"
|
|
"github.com/minio/minio-go/v7/pkg/credentials"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
|
)
|
|
|
|
// Prometheus instruments for the S3 backend. They are package-level variables
// assigned in initS3BackendMetrics (rather than registered at init time) so
// registration only happens when an S3 backend is actually constructed.
var (
	// Deduplication: bumped by PutBlob when the blob already exists.
	blobsDedupedCount prometheus.Counter
	blobsDedupedBytes prometheus.Counter

	// Blob cache statistics, tracked both as counts and byte totals.
	blobCacheHitsCount      prometheus.Counter
	blobCacheHitsBytes      prometheus.Counter
	blobCacheMissesCount    prometheus.Counter
	blobCacheMissesBytes    prometheus.Counter
	blobCacheEvictionsCount prometheus.Counter
	blobCacheEvictionsBytes prometheus.Counter

	// Manifest cache statistics; counts only (no byte-total counters here).
	manifestCacheHitsCount      prometheus.Counter
	manifestCacheMissesCount    prometheus.Counter
	manifestCacheEvictionsCount prometheus.Counter

	// s3:GetObject latency (labeled by object kind) and response counts
	// (labeled by kind and S3 error code).
	s3GetObjectDurationSeconds *prometheus.HistogramVec
	s3GetObjectResponseCount   *prometheus.CounterVec
)
|
|
|
|
func initS3BackendMetrics() {
|
|
blobsDedupedCount = promauto.NewCounter(prometheus.CounterOpts{
|
|
Name: "git_pages_blobs_deduped",
|
|
Help: "Count of blobs deduplicated",
|
|
})
|
|
blobsDedupedBytes = promauto.NewCounter(prometheus.CounterOpts{
|
|
Name: "git_pages_blobs_deduped_bytes",
|
|
Help: "Total size in bytes of blobs deduplicated",
|
|
})
|
|
|
|
blobCacheHitsCount = promauto.NewCounter(prometheus.CounterOpts{
|
|
Name: "git_pages_blob_cache_hits_count",
|
|
Help: "Count of blobs that were retrieved from the cache",
|
|
})
|
|
blobCacheHitsBytes = promauto.NewCounter(prometheus.CounterOpts{
|
|
Name: "git_pages_blob_cache_hits_bytes",
|
|
Help: "Total size in bytes of blobs that were retrieved from the cache",
|
|
})
|
|
blobCacheMissesCount = promauto.NewCounter(prometheus.CounterOpts{
|
|
Name: "git_pages_blob_cache_misses_count",
|
|
Help: "Count of blobs that were not found in the cache (and were then successfully cached)",
|
|
})
|
|
blobCacheMissesBytes = promauto.NewCounter(prometheus.CounterOpts{
|
|
Name: "git_pages_blob_cache_misses_bytes",
|
|
Help: "Total size in bytes of blobs that were not found in the cache (and were then successfully cached)",
|
|
})
|
|
blobCacheEvictionsCount = promauto.NewCounter(prometheus.CounterOpts{
|
|
Name: "git_pages_blob_cache_evictions_count",
|
|
Help: "Count of blobs evicted from the cache",
|
|
})
|
|
blobCacheEvictionsBytes = promauto.NewCounter(prometheus.CounterOpts{
|
|
Name: "git_pages_blob_cache_evictions_bytes",
|
|
Help: "Total size in bytes of blobs evicted from the cache",
|
|
})
|
|
|
|
manifestCacheHitsCount = promauto.NewCounter(prometheus.CounterOpts{
|
|
Name: "git_pages_manifest_cache_hits_count",
|
|
Help: "Count of manifests that were retrieved from the cache",
|
|
})
|
|
manifestCacheMissesCount = promauto.NewCounter(prometheus.CounterOpts{
|
|
Name: "git_pages_manifest_cache_misses_count",
|
|
Help: "Count of manifests that were not found in the cache (and were then successfully cached)",
|
|
})
|
|
manifestCacheEvictionsCount = promauto.NewCounter(prometheus.CounterOpts{
|
|
Name: "git_pages_manifest_cache_evictions_count",
|
|
Help: "Count of manifests evicted from the cache",
|
|
})
|
|
|
|
s3GetObjectDurationSeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{
|
|
Name: "git_pages_s3_get_object_duration_seconds",
|
|
Help: "Time to read a whole object from S3",
|
|
Buckets: []float64{.01, .025, .05, .1, .25, .5, .75, 1, 1.25, 1.5, 1.75, 2, 2.5, 5, 10},
|
|
|
|
NativeHistogramBucketFactor: 1.1,
|
|
NativeHistogramMaxBucketNumber: 100,
|
|
NativeHistogramMinResetDuration: 10 * time.Minute,
|
|
}, []string{"kind"})
|
|
s3GetObjectResponseCount = promauto.NewCounterVec(prometheus.CounterOpts{
|
|
Name: "git_pages_s3_get_object_responses_count",
|
|
Help: "Count of s3:GetObject responses",
|
|
}, []string{"kind", "code"})
|
|
}
|
|
|
|
// Blobs can be safely cached indefinitely. They only need to be evicted to preserve memory.
type CachedBlob struct {
	blob  []byte    // raw blob contents as read from S3
	mtime time.Time // LastModified reported by S3 when the blob was fetched
}

// Weight reports the cache weight of the entry: its payload size in bytes.
func (c *CachedBlob) Weight() uint32 { return uint32(len(c.blob)) }
|
|
|
|
// Manifests can only be cached for a short time to avoid serving stale content. Browser
// page loads cause a large burst of manifest accesses that are essential for serving
// `304 No Content` responses and these need to be handled very quickly, so both hits and
// misses are cached.
type CachedManifest struct {
	manifest *Manifest        // decoded manifest; nil for a cached negative lookup
	weight   uint32           // cache weight: encoded size in bytes (1 for negative entries)
	metadata ManifestMetadata // LastModified/ETag captured at fetch time
	err      error            // non-nil for cached negative lookups (ErrObjectNotFound)
}

// Weight reports the cache weight recorded when the entry was created.
func (c *CachedManifest) Weight() uint32 { return c.weight }
|
|
|
|
// S3Backend stores blobs, manifests, domain markers, and audit records in a
// single S3 bucket, with in-memory caches in front of blob, manifest, and
// feature-marker reads.
type S3Backend struct {
	client *minio.Client
	bucket string
	// blobCache caches immutable blob contents, keyed by blob name.
	blobCache *observedCache[string, *CachedBlob]
	// siteCache caches manifests (including negative lookups), keyed by site name.
	siteCache *observedCache[string, *CachedManifest]
	// featureCache caches feature-marker probes, refreshed periodically.
	featureCache *otter.Cache[BackendFeature, bool]
}

// Compile-time check that *S3Backend satisfies the Backend interface.
var _ Backend = (*S3Backend)(nil)
|
|
|
|
// makeCacheOptions translates a CacheConfig into otter cache options.
//
// MaxSize bounds the cache by total weight (bytes) using the supplied weigher.
// With MaxStale set, entries are refreshed in the background once they are
// MaxAge old; entries hard-expire after MaxAge+MaxStale in either case.
func makeCacheOptions[K comparable, V any](
	config *CacheConfig,
	weigher func(K, V) uint32,
) *otter.Options[K, V] {
	options := &otter.Options[K, V]{}
	if config.MaxSize != 0 {
		options.MaximumWeight = config.MaxSize.Bytes()
		options.Weigher = weigher
	}
	if config.MaxStale != 0 {
		// Refresh (asynchronously) once an entry is MaxAge old, so it can be
		// served stale for up to MaxStale beyond that while revalidating.
		options.RefreshCalculator = otter.RefreshWriting[K, V](
			time.Duration(config.MaxAge))
	}
	if config.MaxAge != 0 || config.MaxStale != 0 {
		options.ExpiryCalculator = otter.ExpiryWriting[K, V](
			time.Duration(config.MaxAge + config.MaxStale))
	}
	return options
}
|
|
|
|
// NewS3Backend connects to the S3 endpoint described by config, creates the
// bucket if it does not exist yet, registers backend metrics, and sets up the
// blob, manifest (site), and feature caches.
func NewS3Backend(ctx context.Context, config *S3Config) (*S3Backend, error) {
	client, err := minio.New(config.Endpoint, &minio.Options{
		Creds: credentials.NewStaticV4(
			config.AccessKeyID,
			config.SecretAccessKey,
			"",
		),
		Secure: !config.Insecure,
	})
	if err != nil {
		return nil, err
	}

	bucket := config.Bucket
	exists, err := client.BucketExists(ctx, bucket)
	if err != nil {
		return nil, err
	} else if !exists {
		logc.Printf(ctx, "s3: create bucket %s\n", bucket)

		err = client.MakeBucket(ctx, bucket,
			minio.MakeBucketOptions{Region: config.Region})
		if err != nil {
			return nil, err
		}

		// A freshly created bucket gets the domain-marker feature enabled
		// right away (see CheckDomain's fallback path for buckets without it).
		// This temporary backend has no caches; EnableFeature does not need them.
		err = (&S3Backend{client: client, bucket: bucket}).
			EnableFeature(ctx, FeatureCheckDomainMarker)
		if err != nil {
			return nil, err
		}
	}

	initS3BackendMetrics()

	blobCacheMetrics := observedCacheMetrics{
		HitNumberCounter:      blobCacheHitsCount,
		HitWeightCounter:      blobCacheHitsBytes,
		MissNumberCounter:     blobCacheMissesCount,
		MissWeightCounter:     blobCacheMissesBytes,
		EvictionNumberCounter: blobCacheEvictionsCount,
		EvictionWeightCounter: blobCacheEvictionsBytes,
	}
	blobCache, err := newObservedCache(makeCacheOptions(&config.BlobCache,
		func(key string, value *CachedBlob) uint32 { return uint32(len(value.blob)) }),
		blobCacheMetrics)
	if err != nil {
		return nil, err
	}

	// The manifest cache only tracks counts (no weight counters configured).
	siteCacheMetrics := observedCacheMetrics{
		HitNumberCounter:      manifestCacheHitsCount,
		MissNumberCounter:     manifestCacheMissesCount,
		EvictionNumberCounter: manifestCacheEvictionsCount,
	}
	siteCache, err := newObservedCache(makeCacheOptions(&config.SiteCache,
		func(key string, value *CachedManifest) uint32 { return value.weight }),
		siteCacheMetrics)
	if err != nil {
		return nil, err
	}

	// Feature markers change rarely; re-probe them every 10 minutes.
	featureCache, err := otter.New(&otter.Options[BackendFeature, bool]{
		RefreshCalculator: otter.RefreshWriting[BackendFeature, bool](10 * time.Minute),
	})
	if err != nil {
		return nil, err
	}

	return &S3Backend{client, bucket, blobCache, siteCache, featureCache}, nil
}
|
|
|
|
// Backend returns the backend itself as the generic Backend interface.
func (s3 *S3Backend) Backend() Backend {
	return s3
}
|
|
|
|
func blobObjectName(name string) string {
|
|
return fmt.Sprintf("blob/%s", path.Join(splitBlobName(name)...))
|
|
}
|
|
|
|
// storeFeatureObjectName returns the key of the marker object recording that
// the given backend feature is enabled for this bucket.
func storeFeatureObjectName(feature BackendFeature) string {
	return fmt.Sprintf("meta/feature/%s", feature)
}
|
|
|
|
// HasFeature reports whether the marker object for the given feature exists
// in the bucket. Probe results are cached in featureCache (and refreshed
// periodically); on a lookup error the feature is conservatively reported as
// disabled after observing and logging the error.
func (s3 *S3Backend) HasFeature(ctx context.Context, feature BackendFeature) bool {
	loader := func(ctx context.Context, feature BackendFeature) (bool, error) {
		_, err := s3.client.StatObject(ctx, s3.bucket, storeFeatureObjectName(feature),
			minio.StatObjectOptions{})
		if err != nil {
			// A missing marker simply means "disabled"; other errors propagate.
			if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" {
				logc.Printf(ctx, "s3 feature %q: disabled", feature)
				return false, nil
			} else {
				return false, err
			}
		}
		logc.Printf(ctx, "s3 feature %q: enabled", feature)
		return true, nil
	}

	isOn, err := s3.featureCache.Get(ctx, feature, otter.LoaderFunc[BackendFeature, bool](loader))
	if err != nil {
		err = fmt.Errorf("getting s3 backend feature %q: %w", feature, err)
		ObserveError(err)
		logc.Println(ctx, err)
		return false
	}
	return isOn
}
|
|
|
|
func (s3 *S3Backend) EnableFeature(ctx context.Context, feature BackendFeature) error {
|
|
_, err := s3.client.PutObject(ctx, s3.bucket, storeFeatureObjectName(feature),
|
|
&bytes.Reader{}, 0, minio.PutObjectOptions{})
|
|
return err
|
|
}
|
|
|
|
// GetBlob returns a seekable reader over the named blob's contents together
// with its metadata, serving from the in-memory blob cache when possible.
// Cache misses fetch the whole object from S3, recording fetch latency and
// response codes in the s3GetObject* metrics. A missing object is reported as
// ErrObjectNotFound wrapped with the object key.
func (s3 *S3Backend) GetBlob(
	ctx context.Context, name string,
) (
	reader io.ReadSeeker, metadata BlobMetadata, err error,
) {
	// loader fetches the blob from S3 and packages it for the cache.
	loader := func(ctx context.Context, name string) (*CachedBlob, error) {
		logc.Printf(ctx, "s3: get blob %s\n", name)

		startTime := time.Now()

		object, err := s3.client.GetObject(ctx, s3.bucket, blobObjectName(name),
			minio.GetObjectOptions{})
		// Note that many errors (e.g. NoSuchKey) will be reported only after this point.
		if err != nil {
			return nil, err
		}
		defer object.Close()

		data, err := io.ReadAll(object)
		if err != nil {
			return nil, err
		}

		stat, err := object.Stat()
		if err != nil {
			return nil, err
		}

		// Only successful full reads are timed (failed fetches return above).
		s3GetObjectDurationSeconds.
			With(prometheus.Labels{"kind": "blob"}).
			Observe(time.Since(startTime).Seconds())

		return &CachedBlob{data, stat.LastModified}, nil
	}

	// observer wraps loader to count responses by S3 error code.
	// NOTE(review): a non-minio error (e.g. a read failure) is counted under
	// code "OK" here, since only minio.ErrorResponse carries a code — confirm
	// this labeling is intended.
	observer := func(ctx context.Context, name string) (*CachedBlob, error) {
		cached, err := loader(ctx, name)
		var code = "OK"
		if resp, ok := err.(minio.ErrorResponse); ok {
			code = resp.Code
		}
		s3GetObjectResponseCount.With(prometheus.Labels{"kind": "blob", "code": code}).Inc()
		return cached, err
	}

	var cached *CachedBlob
	cached, err = s3.blobCache.Get(ctx, name, otter.LoaderFunc[string, *CachedBlob](observer))
	if err != nil {
		if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" {
			err = fmt.Errorf("%w: %s", ErrObjectNotFound, errResp.Key)
		}
	} else {
		reader = bytes.NewReader(cached.blob)
		metadata.Name = name
		metadata.Size = int64(len(cached.blob))
		metadata.LastModified = cached.mtime
	}
	return
}
|
|
|
|
func (s3 *S3Backend) PutBlob(ctx context.Context, name string, data []byte) error {
|
|
logc.Printf(ctx, "s3: put blob %s (%s)\n", name, datasize.ByteSize(len(data)).HumanReadable())
|
|
|
|
_, err := s3.client.StatObject(ctx, s3.bucket, blobObjectName(name),
|
|
minio.GetObjectOptions{})
|
|
if err != nil {
|
|
if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" {
|
|
_, err := s3.client.PutObject(ctx, s3.bucket, blobObjectName(name),
|
|
bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{})
|
|
if err != nil {
|
|
return err
|
|
} else {
|
|
ObserveData(ctx, "blob.status", "created")
|
|
logc.Printf(ctx, "s3: put blob %s (created)\n", name)
|
|
return nil
|
|
}
|
|
} else {
|
|
return err
|
|
}
|
|
} else {
|
|
ObserveData(ctx, "blob.status", "exists")
|
|
logc.Printf(ctx, "s3: put blob %s (exists)\n", name)
|
|
blobsDedupedCount.Inc()
|
|
blobsDedupedBytes.Add(float64(len(data)))
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (s3 *S3Backend) DeleteBlob(ctx context.Context, name string) error {
|
|
logc.Printf(ctx, "s3: delete blob %s\n", name)
|
|
|
|
return s3.client.RemoveObject(ctx, s3.bucket, blobObjectName(name),
|
|
minio.RemoveObjectOptions{})
|
|
}
|
|
|
|
// EnumerateBlobs yields metadata for every blob object in the bucket.
// Listing errors are yielded in place of metadata; stopping iteration early
// cancels the underlying listing via the context.
func (s3 *S3Backend) EnumerateBlobs(ctx context.Context) iter.Seq2[BlobMetadata, error] {
	return func(yield func(BlobMetadata, error) bool) {
		logc.Print(ctx, "s3: enumerate blobs")

		// Tie the listing's lifetime to this iteration.
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		prefix := "blob/"
		for object := range s3.client.ListObjectsIter(ctx, s3.bucket, minio.ListObjectsOptions{
			Prefix:    prefix,
			Recursive: true,
		}) {
			var metadata BlobMetadata
			var err error
			if err = object.Err; err == nil {
				key := strings.TrimPrefix(object.Key, prefix)
				if strings.HasSuffix(key, "/") {
					continue // directory; skip
				} else {
					// Undo the sharding applied by blobObjectName.
					metadata.Name = joinBlobName(strings.Split(key, "/"))
					metadata.Size = object.Size
					metadata.LastModified = object.LastModified
				}
			}
			if !yield(metadata, err) {
				break
			}
		}
	}
}
|
|
|
|
// manifestObjectName maps a manifest (site) name onto its object key under
// the "site/" prefix.
func manifestObjectName(name string) string {
	return "site/" + name
}
|
|
|
|
// stagedManifestObjectName returns the object key of a staged (not yet
// committed) manifest, content-addressed by the SHA-256 of its encoding.
func stagedManifestObjectName(manifestData []byte) string {
	digest := sha256.Sum256(manifestData)
	return fmt.Sprintf("dirty/%x", digest)
}
|
|
|
|
// s3ManifestLoader adapts S3Backend manifest fetching to otter's cache
// loading interface: Load for cold entries, Reload for background refreshes.
type s3ManifestLoader struct {
	s3 *S3Backend
}
|
|
|
|
// Load fetches a manifest for a cold cache entry; there is no previous value
// to revalidate against.
func (l s3ManifestLoader) Load(
	ctx context.Context, key string,
) (
	*CachedManifest, error,
) {
	return l.load(ctx, key, nil)
}
|
|
|
|
// Reload refreshes a cached manifest; load revalidates with oldValue's ETag
// and can reuse oldValue when S3 answers 304 Not Modified.
func (l s3ManifestLoader) Reload(
	ctx context.Context, key string, oldValue *CachedManifest,
) (
	*CachedManifest, error,
) {
	return l.load(ctx, key, oldValue)
}
|
|
|
|
// load fetches and decodes the named manifest from S3, producing a cache
// entry. When oldManifest carries an ETag the request is conditional and a
// 304 response reuses oldManifest unchanged. A missing manifest is cached as
// a negative entry (weight 1) holding ErrObjectNotFound, so repeated lookups
// of absent sites stay fast.
func (l s3ManifestLoader) load(
	ctx context.Context, name string, oldManifest *CachedManifest,
) (
	*CachedManifest, error,
) {
	logc.Printf(ctx, "s3: get manifest %s\n", name)

	loader := func() (*CachedManifest, error) {
		opts := minio.GetObjectOptions{}
		if oldManifest != nil && oldManifest.metadata.ETag != "" {
			// Revalidate: only transfer the body if the object changed.
			opts.SetMatchETagExcept(oldManifest.metadata.ETag)
		}
		object, err := l.s3.client.GetObject(ctx, l.s3.bucket, manifestObjectName(name), opts)
		// Note that many errors (e.g. NoSuchKey) will be reported only after this point.
		if err != nil {
			return nil, err
		}
		defer object.Close()

		data, err := io.ReadAll(object)
		if err != nil {
			return nil, err
		}

		stat, err := object.Stat()
		if err != nil {
			return nil, err
		}

		manifest, err := DecodeManifest(data)
		if err != nil {
			return nil, err
		}

		metadata := ManifestMetadata{
			LastModified: stat.LastModified,
			ETag:         stat.ETag,
		}
		return &CachedManifest{manifest, uint32(len(data)), metadata, nil}, nil
	}

	// observer wraps loader to count responses by S3 error code.
	observer := func() (*CachedManifest, error) {
		cached, err := loader()
		var code = "OK"
		if resp, ok := err.(minio.ErrorResponse); ok {
			code = resp.Code
		}
		s3GetObjectResponseCount.With(prometheus.Labels{"kind": "manifest", "code": code}).Inc()
		return cached, err
	}

	startTime := time.Now()
	cached, err := observer()
	s3GetObjectDurationSeconds.
		With(prometheus.Labels{"kind": "manifest"}).
		Observe(time.Since(startTime).Seconds())

	if err != nil {
		errResp := minio.ToErrorResponse(err)
		if errResp.Code == "NoSuchKey" {
			// Cache the absence itself; callers get the error from the entry.
			err = fmt.Errorf("%w: %s", ErrObjectNotFound, errResp.Key)
			return &CachedManifest{nil, 1, ManifestMetadata{}, err}, nil
		} else if errResp.StatusCode == http.StatusNotModified && oldManifest != nil {
			// 304: the cached entry is still current; keep it as-is.
			return oldManifest, nil
		} else {
			return nil, err
		}
	} else {
		return cached, nil
	}
}
|
|
|
|
// GetManifest returns the decoded manifest for name plus its metadata,
// serving from the site cache. With opts.BypassCache, a cached entry that is
// already due for refresh is invalidated first so the fetch hits S3.
func (s3 *S3Backend) GetManifest(
	ctx context.Context, name string, opts GetManifestOptions,
) (
	manifest *Manifest, metadata ManifestMetadata, err error,
) {
	if opts.BypassCache {
		entry, found := s3.siteCache.Cache.GetEntry(name)
		if found && entry.RefreshableAt().Before(time.Now()) {
			s3.siteCache.Cache.Invalidate(name)
		}
	}

	var cached *CachedManifest
	cached, err = s3.siteCache.Get(ctx, name, s3ManifestLoader{s3})
	if err != nil {
		return
	} else {
		// Either a positive entry (manifest, metadata, nil) or a cached
		// negative entry (nil, zero ManifestMetadata, ErrObjectNotFound).
		manifest, metadata, err = cached.manifest, cached.metadata, cached.err
		return
	}
}
|
|
|
|
func (s3 *S3Backend) StageManifest(ctx context.Context, manifest *Manifest) error {
|
|
data := EncodeManifest(manifest)
|
|
logc.Printf(ctx, "s3: stage manifest %x\n", sha256.Sum256(data))
|
|
|
|
_, err := s3.client.PutObject(ctx, s3.bucket, stagedManifestObjectName(data),
|
|
bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{})
|
|
return err
|
|
}
|
|
|
|
func domainFrozenObjectName(domain string) string {
|
|
return manifestObjectName(fmt.Sprintf("%s/.frozen", domain))
|
|
}
|
|
|
|
func (s3 *S3Backend) checkDomainFrozen(ctx context.Context, domain string) error {
|
|
_, err := s3.client.StatObject(ctx, s3.bucket, domainFrozenObjectName(domain),
|
|
minio.GetObjectOptions{})
|
|
if err == nil {
|
|
return ErrDomainFrozen
|
|
} else if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" {
|
|
return nil
|
|
} else {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// HasAtomicCAS reports whether this backend can perform truly atomic
// compare-and-swap on manifest objects. For S3 it always returns false; see
// the provider survey below for why.
func (s3 *S3Backend) HasAtomicCAS(ctx context.Context) bool {
	// Support for `If-Unmodified-Since:` or `If-Match:` for PutObject requests is very spotty:
	// - AWS supports only `If-Match:`:
	//   https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
	// - Minio supports `If-Match:`:
	//   https://blog.min.io/leading-the-way-minios-conditional-write-feature-for-modern-data-workloads/
	// - Tigris supports `If-Unmodified-Since:` and `If-Match:`, but only with `X-Tigris-Consistent: true`;
	//   https://www.tigrisdata.com/docs/objects/conditionals/
	//   Note that the `X-Tigris-Consistent: true` header must be present on *every* transaction
	//   touching the object, not just on the CAS transactions.
	// - Wasabi does not support either one and docs seem to suggest that the headers are ignored;
	// - Garage does not support either one and source code suggests the headers are ignored.
	// It seems that the only safe option is to not claim support for atomic CAS, and only do
	// best-effort CAS implementation using HeadObject and PutObject/DeleteObject.
	return false
}
|
|
|
|
func (s3 *S3Backend) checkManifestPrecondition(
|
|
ctx context.Context, name string, opts ModifyManifestOptions,
|
|
) error {
|
|
if opts.IfUnmodifiedSince.IsZero() && opts.IfMatch == "" {
|
|
return nil
|
|
}
|
|
|
|
stat, err := s3.client.StatObject(ctx, s3.bucket, manifestObjectName(name),
|
|
minio.GetObjectOptions{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !opts.IfUnmodifiedSince.IsZero() && stat.LastModified.Compare(opts.IfUnmodifiedSince) > 0 {
|
|
return fmt.Errorf("%w: If-Unmodified-Since", ErrPreconditionFailed)
|
|
}
|
|
if opts.IfMatch != "" && stat.ETag != opts.IfMatch {
|
|
return fmt.Errorf("%w: If-Match", ErrPreconditionFailed)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// CommitManifest writes the encoded manifest to its final object key after
// verifying that the domain is not frozen and that the caller's preconditions
// hold (best-effort; see HasAtomicCAS). The staged copy is removed and the
// site cache entry invalidated regardless of whether the put succeeded.
func (s3 *S3Backend) CommitManifest(
	ctx context.Context, name string, manifest *Manifest, opts ModifyManifestOptions,
) error {
	data := EncodeManifest(manifest)
	logc.Printf(ctx, "s3: commit manifest %x -> %s", sha256.Sum256(data), name)

	domain, _, _ := strings.Cut(name, "/")
	if err := s3.checkDomainFrozen(ctx, domain); err != nil {
		return err
	}

	if err := s3.checkManifestPrecondition(ctx, name, opts); err != nil {
		return err
	}

	// Remove staged object unconditionally (whether commit succeeded or failed), since
	// the upper layer has to retry the complete operation anyway.
	putOptions := minio.PutObjectOptions{}
	// Tigris needs this header on every transaction touching the object for
	// its conditional writes to be consistent (see HasAtomicCAS).
	putOptions.Header().Add("X-Tigris-Consistent", "true")
	if opts.IfMatch != "" {
		// Not guaranteed to do anything (see `HasAtomicCAS`), but let's try anyway;
		// this is a "belt and suspenders" approach, together with `checkManifestPrecondition`.
		// It does reliably work on MinIO at least.
		putOptions.SetMatchETag(opts.IfMatch)
	}
	_, putErr := s3.client.PutObject(ctx, s3.bucket, manifestObjectName(name),
		bytes.NewReader(data), int64(len(data)), putOptions)
	removeErr := s3.client.RemoveObject(ctx, s3.bucket, stagedManifestObjectName(data),
		minio.RemoveObjectOptions{})
	s3.siteCache.Cache.Invalidate(name)
	if putErr != nil {
		if errResp := minio.ToErrorResponse(putErr); errResp.Code == "PreconditionFailed" {
			return ErrPreconditionFailed
		} else {
			return putErr
		}
	} else if removeErr != nil {
		return removeErr
	} else {
		return nil
	}
}
|
|
|
|
// DeleteManifest removes the named manifest after checking the domain freeze
// marker and caller preconditions, invalidates its cache entry, and bumps the
// last-domain-update timestamp so other instances notice the change.
func (s3 *S3Backend) DeleteManifest(
	ctx context.Context, name string, opts ModifyManifestOptions,
) error {
	logc.Printf(ctx, "s3: delete manifest %s\n", name)

	domain, _, _ := strings.Cut(name, "/")
	if err := s3.checkDomainFrozen(ctx, domain); err != nil {
		return err
	}

	if err := s3.checkManifestPrecondition(ctx, name, opts); err != nil {
		return err
	}

	err := s3.client.RemoveObject(ctx, s3.bucket, manifestObjectName(name),
		minio.RemoveObjectOptions{})
	if err != nil {
		return err
	}
	s3.siteCache.Cache.Invalidate(name)
	return s3.bumpLastDomainUpdateTimestamp(ctx)
}
|
|
|
|
// EnumerateManifests yields metadata for every user-visible manifest object.
// Directory placeholders and internal dot-named objects (e.g. ".exists",
// ".frozen") are skipped, except ".index" which is a real site. Listing
// errors are yielded in place of metadata.
func (s3 *S3Backend) EnumerateManifests(ctx context.Context) iter.Seq2[*ManifestMetadata, error] {
	return func(yield func(*ManifestMetadata, error) bool) {
		logc.Print(ctx, "s3: enumerate manifests")

		// Tie the listing's lifetime to this iteration.
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		prefix := "site/"
		for object := range s3.client.ListObjectsIter(ctx, s3.bucket, minio.ListObjectsOptions{
			Prefix:    prefix,
			Recursive: true,
		}) {
			var metadata *ManifestMetadata
			var err error
			if err = object.Err; err == nil {
				key := strings.TrimPrefix(object.Key, prefix)
				// key has the form "<domain>/<project>".
				_, project, _ := strings.Cut(key, "/")
				if strings.HasSuffix(key, "/") {
					continue // directory; skip
				} else if project == "" || strings.HasPrefix(project, ".") && project != ".index" {
					continue // internal; skip
				} else {
					metadata = &ManifestMetadata{
						Name:         key,
						Size:         object.Size,
						LastModified: object.LastModified,
						ETag:         object.ETag,
					}
				}
			}
			if !yield(metadata, err) {
				break
			}
		}
	}
}
|
|
|
|
// Limits the number of concurrent manifest fetches made by GetAllManifests,
// globally across the entire git-pages process.
// Not currently configurable as there seems to be little need.
var getAllManifestsSemaphore = make(chan struct{}, 64)
|
|
|
|
// GetAllManifests enumerates every manifest and fetches each one concurrently
// (bounded by getAllManifestsSemaphore), yielding (metadata, manifest) pairs
// in completion order. Stopping iteration early cancels the enumerator.
func (s3 *S3Backend) GetAllManifests(ctx context.Context) iter.Seq2[tuple[*ManifestMetadata, *Manifest], error] {
	type result struct {
		metadata *ManifestMetadata
		manifest *Manifest
		err      error
	}

	resultsChan := make(chan result)
	enumeratorCtx, cancel := context.WithCancel(ctx)
	go func(ctx context.Context) {
		wg := sync.WaitGroup{}
		for metadata, err := range s3.EnumerateManifests(ctx) {
			if err != nil {
				resultsChan <- result{nil, nil, err}
			} else {
				getAllManifestsSemaphore <- struct{}{} // acquire
				wg.Go(func() {
					defer func() { <-getAllManifestsSemaphore }() // release
					// NOTE(review): this calls `backend.GetManifest`, not
					// `s3.GetManifest` — `backend` must be a package-level
					// variable defined elsewhere; confirm going through it
					// (rather than this receiver) is intended.
					manifest, _, err := backend.GetManifest(ctx, metadata.Name, GetManifestOptions{})
					resultsChan <- result{metadata, manifest, err}
				})
			}
		}
		wg.Wait()
		close(resultsChan)
	}(enumeratorCtx)

	return func(yield func(tuple[*ManifestMetadata, *Manifest], error) bool) {
		for result := range resultsChan {
			item := tuple[*ManifestMetadata, *Manifest]{result.metadata, result.manifest}
			if !yield(item, result.err) {
				// Stop the enumerator; in-flight workers drain via resultsChan
				// until close.
				cancel()
				break
			}
		}
	}
}
|
|
|
|
func domainCheckObjectName(domain string) string {
|
|
return manifestObjectName(fmt.Sprintf("%s/.exists", domain))
|
|
}
|
|
|
|
// CheckDomain reports whether a domain exists. It first consults the domain
// marker object; when the marker is absent and FeatureCheckDomainMarker is
// off (the bucket may predate markers), it falls back to listing the domain's
// manifest prefix for any object.
func (s3 *S3Backend) CheckDomain(ctx context.Context, domain string) (exists bool, err error) {
	logc.Printf(ctx, "s3: check domain %s\n", domain)

	_, err = s3.client.StatObject(ctx, s3.bucket, domainCheckObjectName(domain),
		minio.StatObjectOptions{})
	if err != nil {
		// NoSuchKey just means "no marker"; other errors stay in err.
		if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" {
			exists, err = false, nil
		}
	} else {
		exists = true
	}

	// NOTE(review): when StatObject failed with a non-NoSuchKey error and the
	// marker feature is off, this fallback runs anyway and its result replaces
	// that error — confirm that masking is intended.
	if !exists && !s3.HasFeature(ctx, FeatureCheckDomainMarker) {
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		// Legacy fallback: the first listed object proves the domain exists.
		for object := range s3.client.ListObjectsIter(ctx, s3.bucket, minio.ListObjectsOptions{
			Prefix: manifestObjectName(fmt.Sprintf("%s/", domain)),
		}) {
			if object.Err != nil {
				return false, object.Err
			}
			return true, nil
		}
		return false, nil
	}

	return
}
|
|
|
|
// CreateDomain records a domain's existence by writing its marker object, and
// bumps the last-domain-update timestamp only when the domain is genuinely new.
func (s3 *S3Backend) CreateDomain(ctx context.Context, domain string) error {
	logc.Printf(ctx, "s3: create domain %s\n", domain)

	exists, err := s3.CheckDomain(ctx, domain)
	if err != nil {
		return err
	}

	// Writing the marker is idempotent, so it is (re)written unconditionally.
	_, err = s3.client.PutObject(ctx, s3.bucket, domainCheckObjectName(domain),
		&bytes.Reader{}, 0, minio.PutObjectOptions{})
	if err != nil {
		return err
	}
	if !exists {
		err = s3.bumpLastDomainUpdateTimestamp(ctx)
	}
	return err
}
|
|
|
|
func (s3 *S3Backend) FreezeDomain(ctx context.Context, domain string) error {
|
|
logc.Printf(ctx, "s3: freeze domain %s\n", domain)
|
|
|
|
_, err := s3.client.PutObject(ctx, s3.bucket, domainFrozenObjectName(domain),
|
|
&bytes.Reader{}, 0, minio.PutObjectOptions{})
|
|
return err
|
|
|
|
}
|
|
|
|
func (s3 *S3Backend) UnfreezeDomain(ctx context.Context, domain string) error {
|
|
logc.Printf(ctx, "s3: unfreeze domain %s\n", domain)
|
|
|
|
err := s3.client.RemoveObject(ctx, s3.bucket, domainFrozenObjectName(domain),
|
|
minio.RemoveObjectOptions{})
|
|
if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" {
|
|
return nil
|
|
} else {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// lastDomainUpdateObjectName is an empty object whose modification time
// records when the set of domains last changed (see HaveDomainsChanged).
const lastDomainUpdateObjectName = "meta/last-domain-update"
|
|
|
func (s3 *S3Backend) HaveDomainsChanged(ctx context.Context, since time.Time) (bool, error) {
|
|
info, err := s3.client.StatObject(ctx, s3.bucket, lastDomainUpdateObjectName,
|
|
minio.GetObjectOptions{})
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
return info.LastModified.After(since), nil
|
|
}
|
|
|
|
func (s3 *S3Backend) bumpLastDomainUpdateTimestamp(ctx context.Context) error {
|
|
logc.Print(ctx, "s3: bumping last domain update timestamp")
|
|
_, err := s3.client.PutObject(ctx, s3.bucket, lastDomainUpdateObjectName,
|
|
&bytes.Reader{}, 0, minio.PutObjectOptions{})
|
|
return err
|
|
}
|
|
|
|
// auditObjectName returns the object key storing the audit record with the
// given ID.
func auditObjectName(id AuditID) string {
	return fmt.Sprintf("audit/%s", id)
}
|
|
|
|
// auditDetachedObjectName returns the key of the marker object recording that
// the audit record's manifest has been detached (see QueryAuditLog).
func auditDetachedObjectName(id AuditID) string {
	return fmt.Sprintf("audit/%s.detached", id)
}
|
|
|
|
// AppendAuditLog stores an encoded audit record under its ID. A conditional
// write guards against overwriting an existing record where the provider
// supports it; an observed ID collision is treated as a programming error and
// panics.
func (s3 *S3Backend) AppendAuditLog(ctx context.Context, id AuditID, record *AuditRecord) error {
	logc.Printf(ctx, "s3: append audit %s\n", id)

	name := auditObjectName(id)
	data := EncodeAuditRecord(record)

	options := minio.PutObjectOptions{}
	options.SetMatchETagExcept("*") // may or may not be supported
	_, err := s3.client.PutObject(ctx, s3.bucket, name,
		bytes.NewReader(data), int64(len(data)), options)
	// 412 Precondition Failed means an object with this ID already exists.
	if errResp := minio.ToErrorResponse(err); errResp.StatusCode == 412 {
		panic(fmt.Errorf("audit ID collision: %s", name))
	}
	return err
}
|
|
|
|
func (s3 *S3Backend) QueryAuditLog(ctx context.Context, id AuditID) (*AuditRecord, error) {
|
|
logc.Printf(ctx, "s3: read audit %s\n", id)
|
|
|
|
object, err := s3.client.GetObject(ctx, s3.bucket, auditObjectName(id),
|
|
minio.GetObjectOptions{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer object.Close()
|
|
|
|
data, err := io.ReadAll(object)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
record, err := DecodeAuditRecord(data)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
_, err = s3.client.StatObject(ctx, s3.bucket, auditDetachedObjectName(id),
|
|
minio.StatObjectOptions{})
|
|
if err == nil {
|
|
record.Manifest = nil
|
|
} else if errResp := minio.ToErrorResponse(err); err != nil && errResp.Code != "NoSuchKey" {
|
|
return nil, err
|
|
}
|
|
|
|
return record, nil
|
|
}
|
|
|
|
// SearchAuditLog yields the IDs of stored audit records whose embedded
// timestamps fall within the optional [opts.Since, opts.Until] bounds.
// Detach markers (keys containing ".") are skipped; keys that fail to parse
// as an AuditID yield their parse error.
func (s3 *S3Backend) SearchAuditLog(
	ctx context.Context, opts SearchAuditLogOptions,
) iter.Seq2[AuditID, error] {
	return func(yield func(AuditID, error) bool) {
		logc.Printf(ctx, "s3: search audit\n")

		// Tie the listing's lifetime to this iteration.
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		prefix := "audit/"
		for object := range s3.client.ListObjectsIter(ctx, s3.bucket, minio.ListObjectsOptions{
			Prefix: prefix,
		}) {
			var id AuditID
			var err error
			if object.Err != nil {
				err = object.Err
			} else if strings.Contains(object.Key, ".") {
				continue
			} else if id, err = ParseAuditID(strings.TrimPrefix(object.Key, prefix)); err != nil {
				// report error
			} else if !opts.Since.IsZero() && id.CompareTime(opts.Since) < 0 {
				continue
			} else if !opts.Until.IsZero() && id.CompareTime(opts.Until) > 0 {
				continue
			}
			if !yield(id, err) {
				break
			}
		}
	}
}
|
|
|
|
// Limits the number of concurrent QueryAuditLog fetches made by
// GetAuditLogRecords, globally across the entire git-pages process.
var getAuditLogRecordsSemaphore = make(chan struct{}, 64)
|
|
|
// GetAuditLogRecords fetches the audit records for a stream of IDs
// concurrently (bounded by getAuditLogRecordsSemaphore), yielding records in
// completion order. Errors from the ID stream are passed through; stopping
// iteration early cancels the fetcher goroutines.
func (s3 *S3Backend) GetAuditLogRecords(
	ctx context.Context, ids iter.Seq2[AuditID, error],
) iter.Seq2[*AuditRecord, error] {
	return func(yield func(*AuditRecord, error) bool) {
		resultsChan := make(chan tuple[*AuditRecord, error])
		enumeratorCtx, cancel := context.WithCancel(ctx)
		defer cancel()

		go func(ctx context.Context) {
			wg := sync.WaitGroup{}
			for id, err := range ids {
				if err != nil {
					resultsChan <- tuple[*AuditRecord, error]{nil, err}
				} else {
					getAuditLogRecordsSemaphore <- struct{}{} // acquire
					wg.Go(func() {
						defer func() { <-getAuditLogRecordsSemaphore }() // release
						record, err := s3.QueryAuditLog(ctx, id)
						resultsChan <- tuple[*AuditRecord, error]{record, err}
					})
				}
			}
			wg.Wait()
			close(resultsChan)
		}(enumeratorCtx)

		for result := range resultsChan {
			record, err := result.Splat()
			if !yield(record, err) {
				// The deferred cancel stops the fetchers once we stop reading.
				break
			}
		}
	}
}
|
|
|
|
func (s3 *S3Backend) DetachAuditRecord(ctx context.Context, id AuditID) error {
|
|
logc.Printf(ctx, "s3: detach audit record %s\n", id)
|
|
|
|
_, err := s3.client.PutObject(ctx, s3.bucket, auditDetachedObjectName(id),
|
|
&bytes.Reader{}, 0, minio.PutObjectOptions{})
|
|
return err
|
|
}
|
|
|
|
func (s3 *S3Backend) ExpireAuditRecord(ctx context.Context, id AuditID) error {
|
|
logc.Printf(ctx, "s3: expire audit record %s\n", id)
|
|
|
|
return s3.client.RemoveObject(ctx, s3.bucket, auditObjectName(id),
|
|
minio.RemoveObjectOptions{})
|
|
}
|