From d7651941c04dc37f2013c6dc2611c9a75fab222c Mon Sep 17 00:00:00 2001 From: Catherine Date: Sat, 4 Apr 2026 21:10:05 +0000 Subject: [PATCH] Fetch manifests from S3 in parallel for histogram and tracing. This is mainly done to speed up histogram collection, as waiting some minutes defeats the purpose of having a quick overview function. This commit does speed up GC tracing as well, but not as much because audit records are still retrieved one at a time. A similar mechanism could be added in the future there. Filesystem logic is functionally identical since it was fine already. --- src/backend.go | 10 +++++--- src/backend_fs.go | 30 +++++++++++++++++++----- src/backend_s3.go | 59 +++++++++++++++++++++++++++++++++++++++++------ src/garbage.go | 7 ++---- src/histogram.go | 7 ++---- src/main.go | 2 +- src/manifest.go | 6 ++--- src/observe.go | 16 +++++++++++-- src/util.go | 9 ++++++++ 9 files changed, 114 insertions(+), 32 deletions(-) diff --git a/src/backend.go b/src/backend.go index be8672d..f4ee04b 100644 --- a/src/backend.go +++ b/src/backend.go @@ -124,9 +124,13 @@ type Backend interface { // Delete a manifest. DeleteManifest(ctx context.Context, name string, opts ModifyManifestOptions) error - // Iterate through all manifests. Whether manifests that are newly added during iteration - // will appear in the results is unspecified. - EnumerateManifests(ctx context.Context) iter.Seq2[ManifestMetadata, error] + // Iterate through metadata of all manifests. Whether manifests that are newly added during + // iteration will appear in the results is unspecified. + EnumerateManifests(ctx context.Context) iter.Seq2[*ManifestMetadata, error] + + // Iterate through contents of all manifests. Same considerations apply as for + // `EnumerateManifests`. + GetAllManifests(ctx context.Context) iter.Seq2[tuple[*ManifestMetadata, *Manifest], error] // Check whether a domain has any deployments. CheckDomain(ctx context.Context, domain string) (found bool, err error) diff --git a/src/backend_fs.go b/src/backend_fs.go index 8d1c2b9..be93049 100644 --- a/src/backend_fs.go +++ b/src/backend_fs.go @@ -402,12 +402,12 @@ func (fs *FSBackend) DeleteManifest( } } -func (fs *FSBackend) EnumerateManifests(ctx context.Context) iter.Seq2[ManifestMetadata, error] { - return func(yield func(ManifestMetadata, error) bool) { +func (fs *FSBackend) EnumerateManifests(ctx context.Context) iter.Seq2[*ManifestMetadata, error] { + return func(yield func(*ManifestMetadata, error) bool) { iofs.WalkDir(fs.siteRoot.FS(), ".", func(path string, entry iofs.DirEntry, err error) error { _, project, _ := strings.Cut(path, "/") - var metadata ManifestMetadata + var metadata *ManifestMetadata if err != nil { // report error } else if entry.IsDir() { @@ -420,9 +420,11 @@ func (fs *FSBackend) EnumerateManifests(ctx context.Context) iter.Seq2[ManifestM // report error } else { // report blob - metadata.Name = path - metadata.Size = info.Size() - metadata.LastModified = info.ModTime() + metadata = &ManifestMetadata{ + Name: path, + Size: info.Size(), + LastModified: info.ModTime(), + } // not setting metadata.ETag since it is too costly } if !yield(metadata, err) { @@ -433,6 +435,22 @@ func (fs *FSBackend) EnumerateManifests(ctx context.Context) iter.Seq2[ManifestM } } +func (fs *FSBackend) GetAllManifests(ctx context.Context) iter.Seq2[tuple[*ManifestMetadata, *Manifest], error] { + return func(yield func(tuple[*ManifestMetadata, *Manifest], error) bool) { + for metadata, err := range fs.EnumerateManifests(ctx) { + var item tuple[*ManifestMetadata, *Manifest] + if err == nil { + var manifest *Manifest + manifest, _, err = backend.GetManifest(ctx, metadata.Name, GetManifestOptions{}) + item = tuple[*ManifestMetadata, *Manifest]{metadata, manifest} + } + if !yield(item, err) { + break + } + } + } +} + func (fs *FSBackend) CheckDomain(ctx context.Context, domain string) (bool, error) { _, err := fs.siteRoot.Stat(domain) if errors.Is(err, os.ErrNotExist) { diff --git a/src/backend_s3.go b/src/backend_s3.go index 6825669..5f73700 100644 --- a/src/backend_s3.go +++ b/src/backend_s3.go @@ -10,6 +10,7 @@ import ( "net/http" "path" "strings" + "sync" "time" "github.com/c2h5oh/datasize" @@ -640,8 +641,8 @@ func (s3 *S3Backend) DeleteManifest( return err } -func (s3 *S3Backend) EnumerateManifests(ctx context.Context) iter.Seq2[ManifestMetadata, error] { - return func(yield func(ManifestMetadata, error) bool) { +func (s3 *S3Backend) EnumerateManifests(ctx context.Context) iter.Seq2[*ManifestMetadata, error] { + return func(yield func(*ManifestMetadata, error) bool) { logc.Print(ctx, "s3: enumerate manifests") ctx, cancel := context.WithCancel(ctx) @@ -652,7 +653,7 @@ func (s3 *S3Backend) EnumerateManifests(ctx context.Context) iter.Seq2[ManifestM Prefix: prefix, Recursive: true, }) { - var metadata ManifestMetadata + var metadata *ManifestMetadata var err error if err = object.Err; err == nil { key := strings.TrimPrefix(object.Key, prefix) @@ -662,10 +663,12 @@ func (s3 *S3Backend) EnumerateManifests(ctx context.Context) iter.Seq2[ManifestM } else if project == "" || strings.HasPrefix(project, ".") && project != ".index" { continue // internal; skip } else { - metadata.Name = key - metadata.Size = object.Size - metadata.LastModified = object.LastModified - metadata.ETag = object.ETag + metadata = &ManifestMetadata{ + Name: key, + Size: object.Size, + LastModified: object.LastModified, + ETag: object.ETag, + } } } if !yield(metadata, err) { @@ -675,6 +678,48 @@ func (s3 *S3Backend) EnumerateManifests(ctx context.Context) iter.Seq2[ManifestM } } +// Limits the number of concurrent uploads, globally across the entire git-pages process. +// Not currently configurable as there seems to be little need. +var getAllManifestsSemaphore = make(chan struct{}, 64) + +func (s3 *S3Backend) GetAllManifests(ctx context.Context) iter.Seq2[tuple[*ManifestMetadata, *Manifest], error] { + type result struct { + metadata *ManifestMetadata + manifest *Manifest + err error + } + + resultsChan := make(chan result) + enumeratorCtx, cancel := context.WithCancel(ctx) + go func(ctx context.Context) { + wg := sync.WaitGroup{} + for metadata, err := range s3.EnumerateManifests(ctx) { + if err != nil { + resultsChan <- result{nil, nil, err} + } else { + getAllManifestsSemaphore <- struct{}{} // acquire + wg.Go(func() { + defer func() { <-getAllManifestsSemaphore }() // release + manifest, _, err := backend.GetManifest(ctx, metadata.Name, GetManifestOptions{}) + resultsChan <- result{metadata, manifest, err} + }) + } + } + wg.Wait() + close(resultsChan) + }(enumeratorCtx) + + return func(yield func(tuple[*ManifestMetadata, *Manifest], error) bool) { + for result := range resultsChan { + item := tuple[*ManifestMetadata, *Manifest]{result.metadata, result.manifest} + if !yield(item, result.err) { + cancel() + break + } + } + } +} + func domainCheckObjectName(domain string) string { return manifestObjectName(fmt.Sprintf("%s/.exists", domain)) } diff --git a/src/garbage.go b/src/garbage.go index e3d509d..484bb72 100644 --- a/src/garbage.go +++ b/src/garbage.go @@ -44,11 +44,8 @@ func TraceGarbage(ctx context.Context) error { } // Enumerate blobs live via site manifests. - for metadata, err := range backend.EnumerateManifests(ctx) { - if err != nil { - return fmt.Errorf("trace sites err: %w", err) - } - manifest, _, err := backend.GetManifest(ctx, metadata.Name, GetManifestOptions{}) + for item, err := range backend.GetAllManifests(ctx) { + metadata, manifest := item.Splat() if err != nil { return fmt.Errorf("trace sites err: %w", err) } diff --git a/src/histogram.go b/src/histogram.go index ef08e12..827f895 100644 --- a/src/histogram.go +++ b/src/histogram.go @@ -17,11 +17,8 @@ type DomainStatistics struct { func SizeHistogram(ctx context.Context) ([]*DomainStatistics, error) { statisticsMap := map[string]*DomainStatistics{} - for metadata, err := range backend.EnumerateManifests(ctx) { - if err != nil { - return nil, fmt.Errorf("size histogram err: %w", err) - } - manifest, _, err := backend.GetManifest(ctx, metadata.Name, GetManifestOptions{}) + for item, err := range backend.GetAllManifests(ctx) { + metadata, manifest := item.Splat() if err != nil { return nil, fmt.Errorf("size histogram err: %w", err) } diff --git a/src/main.go b/src/main.go index 7fc69aa..ca94635 100644 --- a/src/main.go +++ b/src/main.go @@ -65,7 +65,7 @@ func configureMemLimit(ctx context.Context) (err error) { // Can only be safely called during initial configuration. func configureConcurrency(_ context.Context) (err error) { - blobUploadSemaphore = make(chan struct{}, config.Limits.ConcurrentUploads) + putBlobSemaphore = make(chan struct{}, config.Limits.ConcurrentUploads) return } diff --git a/src/manifest.go b/src/manifest.go index 88e6067..9aa4e48 100644 --- a/src/manifest.go +++ b/src/manifest.go @@ -365,7 +365,7 @@ var ErrManifestTooLarge = errors.New("manifest too large") // As created, there is no limit, but reinitializing the semaphore with a bounded channel // limits the concurrency to the channel size. Note that the default *configuration* does // limit the number of uploads. -var blobUploadSemaphore = make(chan struct{}) +var putBlobSemaphore = make(chan struct{}) // Uploads inline file data over certain size to the storage backend. Returns a copy of // the manifest updated to refer to an external content-addressable store. @@ -441,9 +441,9 @@ func StoreManifest( // If the entry in the original manifest is already an external reference, there's no need // to externalize it (and no way for us to do so, since the entry only contains the blob name). if entry.GetType() == Type_ExternalFile && manifest.Contents[name].GetType() == Type_InlineFile { - blobUploadSemaphore <- struct{}{} // acquire (and maybe block) + putBlobSemaphore <- struct{}{} // acquire (and maybe block) wg.Go(func() { - defer func() { <-blobUploadSemaphore }() // release + defer func() { <-putBlobSemaphore }() // release err := backend.PutBlob(ctx, string(entry.Data), manifest.Contents[name].Data) if err != nil { ch <- fmt.Errorf("put blob %s: %w", name, err) diff --git a/src/observe.go b/src/observe.go index b834dc9..da744c9 100644 --- a/src/observe.go +++ b/src/observe.go @@ -294,8 +294,8 @@ func (backend *observedBackend) DeleteManifest(ctx context.Context, name string, return } -func (backend *observedBackend) EnumerateManifests(ctx context.Context) iter.Seq2[ManifestMetadata, error] { - return func(yield func(ManifestMetadata, error) bool) { +func (backend *observedBackend) EnumerateManifests(ctx context.Context) iter.Seq2[*ManifestMetadata, error] { + return func(yield func(*ManifestMetadata, error) bool) { span, ctx := ObserveFunction(ctx, "EnumerateManifests") for metadata, err := range backend.inner.EnumerateManifests(ctx) { if !yield(metadata, err) { @@ -306,6 +306,18 @@ func (backend *observedBackend) EnumerateManifests(ctx context.Context) iter.Seq } } +func (backend *observedBackend) GetAllManifests(ctx context.Context) iter.Seq2[tuple[*ManifestMetadata, *Manifest], error] { + return func(yield func(tuple[*ManifestMetadata, *Manifest], error) bool) { + span, ctx := ObserveFunction(ctx, "GetAllManifests") + for item, err := range backend.inner.GetAllManifests(ctx) { + if !yield(item, err) { + break + } + } + span.Finish() + } +} + func (backend *observedBackend) CheckDomain(ctx context.Context, domain string) (found bool, err error) { span, ctx := ObserveFunction(ctx, "CheckDomain", "domain.name", domain) found, err = backend.inner.CheckDomain(ctx, domain) diff --git a/src/util.go b/src/util.go index 9079a25..4d40276 100644 --- a/src/util.go +++ b/src/util.go @@ -6,6 +6,15 @@ import ( "strings" ) +type tuple[A, B any] struct { + A A + B B +} + +func (t tuple[A, B]) Splat() (A, B) { + return t.A, t.B +} + type BoundedReader struct { inner io.Reader fuel int64