Add manifest and blob metrics.

This commit is contained in:
miyuko
2025-09-22 14:33:25 +01:00
parent 98416af092
commit 1c7ef99359
7 changed files with 306 additions and 20 deletions

View File

@@ -21,8 +21,8 @@ func splitBlobName(name string) []string {
}
type Backend interface {
// Retrieve a blob. Returns `reader, mtime, err`.
GetBlob(name string) (reader io.ReadSeeker, mtime time.Time, err error)
// Retrieve a blob. Returns `reader, size, mtime, err`.
GetBlob(name string) (reader io.ReadSeeker, size uint64, mtime time.Time, err error)
// Store a blob. If a blob called `name` already exists, this function returns `nil` without
// regards to the old or new contents. It is expected that blobs are content-addressed, i.e.

View File

@@ -65,19 +65,22 @@ func (fs *FSBackend) Backend() Backend {
return fs
}
func (fs *FSBackend) GetBlob(name string) (io.ReadSeeker, time.Time, error) {
func (fs *FSBackend) GetBlob(name string) (reader io.ReadSeeker, size uint64, mtime time.Time, err error) {
blobPath := filepath.Join(splitBlobName(name)...)
stat, err := fs.blobRoot.Stat(blobPath)
if errors.Is(err, os.ErrNotExist) {
return nil, time.Time{}, fmt.Errorf("%w: %s", errNotFound, err.(*os.PathError).Path)
err = fmt.Errorf("%w: %s", errNotFound, err.(*os.PathError).Path)
return
} else if err != nil {
return nil, time.Time{}, fmt.Errorf("stat: %w", err)
err = fmt.Errorf("stat: %w", err)
return
}
file, err := fs.blobRoot.Open(blobPath)
if err != nil {
return nil, time.Time{}, fmt.Errorf("open: %w", err)
err = fmt.Errorf("open: %w", err)
return
}
return file, stat.ModTime(), nil
return file, uint64(stat.Size()), stat.ModTime(), nil
}
func (fs *FSBackend) PutBlob(name string, data []byte) error {

91
src/backend_observer.go Normal file
View File

@@ -0,0 +1,91 @@
package main
import (
"io"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
blobsRetrievedCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "git_pages_blobs_retrieved",
Help: "Count of blobs retrieved",
})
blobsRetrievedBytes = promauto.NewCounter(prometheus.CounterOpts{
Name: "git_pages_blobs_retrieved_bytes",
Help: "Total size in bytes of blobs retrieved",
})
blobsStoredCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "git_pages_blobs_stored",
Help: "Count of blobs stored",
})
blobsStoredBytes = promauto.NewCounter(prometheus.CounterOpts{
Name: "git_pages_blobs_stored_bytes",
Help: "Total size in bytes of blobs stored",
})
manifestsRetrievedCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "git_pages_manifests_retrieved",
Help: "Count of manifests retrieved",
})
)
type observedBackend struct {
backend Backend
}
func NewObservedBackend(backend Backend) Backend {
return &observedBackend{backend: backend}
}
func (b *observedBackend) GetBlob(name string) (reader io.ReadSeeker, size uint64, mtime time.Time, err error) {
reader, size, mtime, err = b.backend.GetBlob(name)
if err != nil {
return
}
blobsRetrievedCount.Inc()
blobsRetrievedBytes.Add(float64(size))
return
}
func (b *observedBackend) PutBlob(name string, data []byte) error {
err := b.backend.PutBlob(name, data)
if err != nil {
return err
}
blobsStoredCount.Inc()
blobsStoredBytes.Add(float64(len(data)))
return nil
}
func (b *observedBackend) DeleteBlob(name string) error {
return b.backend.DeleteBlob(name)
}
func (b *observedBackend) GetManifest(name string) (manifest *Manifest, err error) {
manifest, err = b.backend.GetManifest(name)
if err != nil {
return
}
manifestsRetrievedCount.Inc()
return
}
func (b *observedBackend) StageManifest(manifest *Manifest) error {
return b.backend.StageManifest(manifest)
}
func (b *observedBackend) CommitManifest(name string, manifest *Manifest) error {
return b.backend.CommitManifest(name, manifest)
}
func (b *observedBackend) DeleteManifest(name string) error {
return b.backend.DeleteManifest(name)
}
func (b *observedBackend) CheckDomain(domain string) (found bool, err error) {
return b.backend.CheckDomain(domain)
}

View File

@@ -13,14 +13,83 @@ import (
"github.com/maypok86/otter/v2"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
blobsDedupedCount prometheus.Counter
blobsDedupedBytes prometheus.Counter
blobCacheHitsCount prometheus.Counter
blobCacheHitsBytes prometheus.Counter
blobCacheMissesCount prometheus.Counter
blobCacheMissesBytes prometheus.Counter
blobCacheEvictionsCount prometheus.Counter
blobCacheEvictionsBytes prometheus.Counter
manifestCacheHitsCount prometheus.Counter
manifestCacheMissesCount prometheus.Counter
manifestCacheEvictionsCount prometheus.Counter
)
func initS3BackendMetrics() {
blobsDedupedCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "git_pages_blobs_deduped",
Help: "Count of blobs deduplicated",
})
blobsDedupedBytes = promauto.NewCounter(prometheus.CounterOpts{
Name: "git_pages_blobs_deduped_bytes",
Help: "Total size in bytes of blobs deduplicated",
})
blobCacheHitsCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "git_pages_blob_cache_hits_count",
Help: "Count of blobs that were retrieved from the cache",
})
blobCacheHitsBytes = promauto.NewCounter(prometheus.CounterOpts{
Name: "git_pages_blob_cache_hits_bytes",
Help: "Total size in bytes of blobs that were retrieved from the cache",
})
blobCacheMissesCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "git_pages_blob_cache_misses_count",
Help: "Count of blobs that were not found in the cache (and were then successfully cached)",
})
blobCacheMissesBytes = promauto.NewCounter(prometheus.CounterOpts{
Name: "git_pages_blob_cache_misses_bytes",
Help: "Total size in bytes of blobs that were not found in the cache (and were then successfully cached)",
})
blobCacheEvictionsCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "git_pages_blob_cache_evictions_count",
Help: "Count of blobs evicted from the cache",
})
blobCacheEvictionsBytes = promauto.NewCounter(prometheus.CounterOpts{
Name: "git_pages_blob_cache_evictions_bytes",
Help: "Total size in bytes of blobs evicted from the cache",
})
manifestCacheHitsCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "git_pages_manifest_cache_hits_count",
Help: "Count of manifests that were retrieved from the cache",
})
manifestCacheMissesCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "git_pages_manifest_cache_misses_count",
Help: "Count of manifests that were not found in the cache (and were then successfully cached)",
})
manifestCacheEvictionsCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "git_pages_manifest_cache_evictions_count",
Help: "Count of manifests evicted from the cache",
})
}
// Blobs can be safely cached indefinitely. They only need to be evicted to preserve memory.
type CachedBlob struct {
blob []byte
mtime time.Time
}
func (c *CachedBlob) Weight() uint32 { return uint32(len(c.blob)) }
// Manifests can only be cached for a short time to avoid serving stale content. Browser
// page loads cause a large burst of manifest accesses that are essential for serving
// `304 No Content` responses and these need to be handled very quickly, so both hits and
@@ -31,12 +100,14 @@ type CachedManifest struct {
err error
}
func (c *CachedManifest) Weight() uint32 { return c.weight }
type S3Backend struct {
ctx context.Context
client *minio.Client
bucket string
blobCache *otter.Cache[string, *CachedBlob]
siteCache *otter.Cache[string, *CachedManifest]
blobCache *observedCache[string, *CachedBlob]
siteCache *observedCache[string, *CachedManifest]
}
func makeCacheOptions[K comparable, V any](
@@ -85,14 +156,31 @@ func NewS3Backend(
}
}
blobCache, err := otter.New(makeCacheOptions(&config.BlobCache,
func(key string, value *CachedBlob) uint32 { return uint32(len(value.blob)) }))
initS3BackendMetrics()
blobCacheMetrics := observedCacheMetrics{
HitNumberCounter: blobCacheHitsCount,
HitWeightCounter: blobCacheHitsBytes,
MissNumberCounter: blobCacheMissesCount,
MissWeightCounter: blobCacheMissesBytes,
EvictionNumberCounter: blobCacheEvictionsCount,
EvictionWeightCounter: blobCacheEvictionsBytes,
}
blobCache, err := newObservedCache(makeCacheOptions(&config.BlobCache,
func(key string, value *CachedBlob) uint32 { return uint32(len(value.blob)) }),
blobCacheMetrics)
if err != nil {
return nil, err
}
siteCache, err := otter.New(makeCacheOptions(&config.SiteCache,
func(key string, value *CachedManifest) uint32 { return value.weight }))
siteCacheMetrics := observedCacheMetrics{
HitNumberCounter: manifestCacheHitsCount,
MissNumberCounter: manifestCacheMissesCount,
EvictionNumberCounter: manifestCacheEvictionsCount,
}
siteCache, err := newObservedCache(makeCacheOptions(&config.SiteCache,
func(key string, value *CachedManifest) uint32 { return value.weight }),
siteCacheMetrics)
if err != nil {
return nil, err
}
@@ -108,7 +196,7 @@ func blobObjectName(name string) string {
return fmt.Sprintf("blob/%s", path.Join(splitBlobName(name)...))
}
func (s3 *S3Backend) GetBlob(name string) (io.ReadSeeker, time.Time, error) {
func (s3 *S3Backend) GetBlob(name string) (io.ReadSeeker, uint64, time.Time, error) {
loader := func(ctx context.Context, name string) (*CachedBlob, error) {
log.Printf("s3: get blob %s\n", name)
@@ -138,9 +226,9 @@ func (s3 *S3Backend) GetBlob(name string) (io.ReadSeeker, time.Time, error) {
if errResp := minio.ToErrorResponse(err); errResp.Code == "NoSuchKey" {
err = fmt.Errorf("%w: %s", errNotFound, errResp.Key)
}
return nil, time.Time{}, err
return nil, 0, time.Time{}, err
} else {
return bytes.NewReader(cached.blob), cached.mtime, err
return bytes.NewReader(cached.blob), uint64(len(cached.blob)), cached.mtime, err
}
}
@@ -164,6 +252,8 @@ func (s3 *S3Backend) PutBlob(name string, data []byte) error {
}
} else {
log.Printf("s3: put blob %s (exists)\n", name)
blobsDedupedCount.Inc()
blobsDedupedBytes.Add(float64(len(data)))
return nil
}
}
@@ -246,7 +336,7 @@ func (s3 *S3Backend) CommitManifest(name string, manifest *Manifest) error {
bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{})
removeErr := s3.client.RemoveObject(s3.ctx, s3.bucket, stagedManifestObjectName(data),
minio.RemoveObjectOptions{})
s3.siteCache.Invalidate(name)
s3.siteCache.Cache.Invalidate(name)
if putErr != nil {
return putErr
} else if removeErr != nil {
@@ -261,7 +351,7 @@ func (s3 *S3Backend) DeleteManifest(name string) error {
err := s3.client.RemoveObject(s3.ctx, s3.bucket, manifestObjectName(name),
minio.RemoveObjectOptions{})
s3.siteCache.Invalidate(name)
s3.siteCache.Cache.Invalidate(name)
return err
}

100
src/cache.go Normal file
View File

@@ -0,0 +1,100 @@
package main
import (
"context"
"time"
"github.com/maypok86/otter/v2"
"github.com/prometheus/client_golang/prometheus"
)
type weightedCacheEntry interface {
Weight() uint32
}
type trackedLoader[K comparable, V any] struct {
loader otter.Loader[K, V]
invoked bool
}
func (l *trackedLoader[K, V]) Load(ctx context.Context, key K) (V, error) {
val, err := l.loader.Load(ctx, key)
l.invoked = true
return val, err
}
func (l *trackedLoader[K, V]) Reload(ctx context.Context, key K, oldValue V) (V, error) {
val, err := l.loader.Reload(ctx, key, oldValue)
l.invoked = true
return val, err
}
type observedCacheMetrics struct {
HitNumberCounter prometheus.Counter
HitWeightCounter prometheus.Counter
MissNumberCounter prometheus.Counter
MissWeightCounter prometheus.Counter
EvictionNumberCounter prometheus.Counter
EvictionWeightCounter prometheus.Counter
}
type observedCache[K comparable, V weightedCacheEntry] struct {
Cache *otter.Cache[K, V]
metrics observedCacheMetrics
}
func newObservedCache[K comparable, V weightedCacheEntry](
options *otter.Options[K, V],
metrics observedCacheMetrics,
) (*observedCache[K, V], error) {
c := &observedCache[K, V]{}
c.metrics = metrics
optionsCopy := *options
options = &optionsCopy
options.StatsRecorder = c
var err error
c.Cache, err = otter.New(options)
if err != nil {
return nil, err
}
return c, nil
}
func (c *observedCache[K, V]) Get(ctx context.Context, key K, loader otter.Loader[K, V]) (V, error) {
observedLoader := trackedLoader[K, V]{loader: loader}
val, err := c.Cache.Get(ctx, key, &observedLoader)
if err == nil {
if observedLoader.invoked {
if c.metrics.MissNumberCounter != nil {
c.metrics.MissNumberCounter.Inc()
}
if c.metrics.MissWeightCounter != nil {
c.metrics.MissWeightCounter.Add(float64(val.Weight()))
}
} else {
if c.metrics.HitNumberCounter != nil {
c.metrics.HitNumberCounter.Inc()
}
if c.metrics.HitWeightCounter != nil {
c.metrics.HitWeightCounter.Add(float64(val.Weight()))
}
}
}
return val, err
}
func (c *observedCache[K, V]) RecordHits(count int) {}
func (c *observedCache[K, V]) RecordMisses(count int) {}
func (c *observedCache[K, V]) RecordEviction(weight uint32) {
if c.metrics.EvictionNumberCounter != nil {
c.metrics.EvictionNumberCounter.Inc()
}
if c.metrics.EvictionWeightCounter != nil {
c.metrics.EvictionWeightCounter.Add(float64(weight))
}
}
func (c *observedCache[K, V]) RecordLoadSuccess(loadTime time.Duration) {}
func (c *observedCache[K, V]) RecordLoadFailure(loadTime time.Duration) {}

View File

@@ -156,7 +156,7 @@ func main() {
log.Fatalln(err)
}
reader, _, err := backend.GetBlob(*getBlob)
reader, _, _, err := backend.GetBlob(*getBlob)
if err != nil {
log.Fatalln(err)
}
@@ -228,6 +228,8 @@ func main() {
log.Fatalln(err)
}
backend = NewObservedBackend(backend)
if err := ConfigureWildcards(config.Wildcard); err != nil {
log.Fatalln(err)
}

View File

@@ -167,7 +167,7 @@ func getPage(w http.ResponseWriter, r *http.Request) error {
w.WriteHeader(http.StatusNotModified)
return nil
} else {
reader, mtime, err = backend.GetBlob(string(entry.Data))
reader, _, mtime, err = backend.GetBlob(string(entry.Data))
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "internal server error: %s\n", err)