Fetch manifests from S3 in parallel for histogram and tracing.

This is mainly done to speed up histogram collection, as waiting some
minutes defeats the purpose of having a quick overview function.

This commit does speed up GC tracing as well, but not as much because
audit records are still retrieved one at a time. A similar mechanism
could be added in the future there.

Filesystem logic is functionally identical since it was fine already.
This commit is contained in:
Catherine
2026-04-04 21:10:05 +00:00
parent bcd628fa6b
commit d7651941c0
9 changed files with 114 additions and 32 deletions

View File

@@ -124,9 +124,13 @@ type Backend interface {
// Delete a manifest.
DeleteManifest(ctx context.Context, name string, opts ModifyManifestOptions) error
// Iterate through all manifests. Whether manifests that are newly added during iteration
// will appear in the results is unspecified.
EnumerateManifests(ctx context.Context) iter.Seq2[ManifestMetadata, error]
// Iterate through metadata of all manifests. Whether manifests that are newly added during
// iteration will appear in the results is unspecified.
EnumerateManifests(ctx context.Context) iter.Seq2[*ManifestMetadata, error]
// Iterate through contents of all manifests. Same considerations apply as for
// `EnumerateManifests`.
GetAllManifests(ctx context.Context) iter.Seq2[tuple[*ManifestMetadata, *Manifest], error]
// Check whether a domain has any deployments.
CheckDomain(ctx context.Context, domain string) (found bool, err error)

View File

@@ -402,12 +402,12 @@ func (fs *FSBackend) DeleteManifest(
}
}
func (fs *FSBackend) EnumerateManifests(ctx context.Context) iter.Seq2[ManifestMetadata, error] {
return func(yield func(ManifestMetadata, error) bool) {
func (fs *FSBackend) EnumerateManifests(ctx context.Context) iter.Seq2[*ManifestMetadata, error] {
return func(yield func(*ManifestMetadata, error) bool) {
iofs.WalkDir(fs.siteRoot.FS(), ".",
func(path string, entry iofs.DirEntry, err error) error {
_, project, _ := strings.Cut(path, "/")
var metadata ManifestMetadata
var metadata *ManifestMetadata
if err != nil {
// report error
} else if entry.IsDir() {
@@ -420,9 +420,11 @@ func (fs *FSBackend) EnumerateManifests(ctx context.Context) iter.Seq2[ManifestM
// report error
} else {
// report blob
metadata.Name = path
metadata.Size = info.Size()
metadata.LastModified = info.ModTime()
metadata = &ManifestMetadata{
Name: path,
Size: info.Size(),
LastModified: info.ModTime(),
}
// not setting metadata.ETag since it is too costly
}
if !yield(metadata, err) {
@@ -433,6 +435,22 @@ func (fs *FSBackend) EnumerateManifests(ctx context.Context) iter.Seq2[ManifestM
}
}
func (fs *FSBackend) GetAllManifests(ctx context.Context) iter.Seq2[tuple[*ManifestMetadata, *Manifest], error] {
return func(yield func(tuple[*ManifestMetadata, *Manifest], error) bool) {
for metadata, err := range fs.EnumerateManifests(ctx) {
var item tuple[*ManifestMetadata, *Manifest]
if err == nil {
var manifest *Manifest
manifest, _, err = backend.GetManifest(ctx, metadata.Name, GetManifestOptions{})
item = tuple[*ManifestMetadata, *Manifest]{metadata, manifest}
}
if !yield(item, err) {
break
}
}
}
}
func (fs *FSBackend) CheckDomain(ctx context.Context, domain string) (bool, error) {
_, err := fs.siteRoot.Stat(domain)
if errors.Is(err, os.ErrNotExist) {

View File

@@ -10,6 +10,7 @@ import (
"net/http"
"path"
"strings"
"sync"
"time"
"github.com/c2h5oh/datasize"
@@ -640,8 +641,8 @@ func (s3 *S3Backend) DeleteManifest(
return err
}
func (s3 *S3Backend) EnumerateManifests(ctx context.Context) iter.Seq2[ManifestMetadata, error] {
return func(yield func(ManifestMetadata, error) bool) {
func (s3 *S3Backend) EnumerateManifests(ctx context.Context) iter.Seq2[*ManifestMetadata, error] {
return func(yield func(*ManifestMetadata, error) bool) {
logc.Print(ctx, "s3: enumerate manifests")
ctx, cancel := context.WithCancel(ctx)
@@ -652,7 +653,7 @@ func (s3 *S3Backend) EnumerateManifests(ctx context.Context) iter.Seq2[ManifestM
Prefix: prefix,
Recursive: true,
}) {
var metadata ManifestMetadata
var metadata *ManifestMetadata
var err error
if err = object.Err; err == nil {
key := strings.TrimPrefix(object.Key, prefix)
@@ -662,10 +663,12 @@ func (s3 *S3Backend) EnumerateManifests(ctx context.Context) iter.Seq2[ManifestM
} else if project == "" || strings.HasPrefix(project, ".") && project != ".index" {
continue // internal; skip
} else {
metadata.Name = key
metadata.Size = object.Size
metadata.LastModified = object.LastModified
metadata.ETag = object.ETag
metadata = &ManifestMetadata{
Name: key,
Size: object.Size,
LastModified: object.LastModified,
ETag: object.ETag,
}
}
}
if !yield(metadata, err) {
@@ -675,6 +678,48 @@ func (s3 *S3Backend) EnumerateManifests(ctx context.Context) iter.Seq2[ManifestM
}
}
// Limits the number of concurrent uploads, globally across the entire git-pages process.
// Not currently configurable as there seems to be little need.
var getAllManifestsSemaphore = make(chan struct{}, 64)
func (s3 *S3Backend) GetAllManifests(ctx context.Context) iter.Seq2[tuple[*ManifestMetadata, *Manifest], error] {
type result struct {
metadata *ManifestMetadata
manifest *Manifest
err error
}
resultsChan := make(chan result)
enumeratorCtx, cancel := context.WithCancel(ctx)
go func(ctx context.Context) {
wg := sync.WaitGroup{}
for metadata, err := range s3.EnumerateManifests(ctx) {
if err != nil {
resultsChan <- result{nil, nil, err}
} else {
getAllManifestsSemaphore <- struct{}{} // acquire
wg.Go(func() {
defer func() { <-getAllManifestsSemaphore }() // release
manifest, _, err := backend.GetManifest(ctx, metadata.Name, GetManifestOptions{})
resultsChan <- result{metadata, manifest, err}
})
}
}
wg.Wait()
close(resultsChan)
}(enumeratorCtx)
return func(yield func(tuple[*ManifestMetadata, *Manifest], error) bool) {
for result := range resultsChan {
item := tuple[*ManifestMetadata, *Manifest]{result.metadata, result.manifest}
if !yield(item, result.err) {
cancel()
break
}
}
}
}
func domainCheckObjectName(domain string) string {
return manifestObjectName(fmt.Sprintf("%s/.exists", domain))
}

View File

@@ -44,11 +44,8 @@ func TraceGarbage(ctx context.Context) error {
}
// Enumerate blobs live via site manifests.
for metadata, err := range backend.EnumerateManifests(ctx) {
if err != nil {
return fmt.Errorf("trace sites err: %w", err)
}
manifest, _, err := backend.GetManifest(ctx, metadata.Name, GetManifestOptions{})
for item, err := range backend.GetAllManifests(ctx) {
metadata, manifest := item.Splat()
if err != nil {
return fmt.Errorf("trace sites err: %w", err)
}

View File

@@ -17,11 +17,8 @@ type DomainStatistics struct {
func SizeHistogram(ctx context.Context) ([]*DomainStatistics, error) {
statisticsMap := map[string]*DomainStatistics{}
for metadata, err := range backend.EnumerateManifests(ctx) {
if err != nil {
return nil, fmt.Errorf("size histogram err: %w", err)
}
manifest, _, err := backend.GetManifest(ctx, metadata.Name, GetManifestOptions{})
for item, err := range backend.GetAllManifests(ctx) {
metadata, manifest := item.Splat()
if err != nil {
return nil, fmt.Errorf("size histogram err: %w", err)
}

View File

@@ -65,7 +65,7 @@ func configureMemLimit(ctx context.Context) (err error) {
// Can only be safely called during initial configuration.
func configureConcurrency(_ context.Context) (err error) {
blobUploadSemaphore = make(chan struct{}, config.Limits.ConcurrentUploads)
putBlobSemaphore = make(chan struct{}, config.Limits.ConcurrentUploads)
return
}

View File

@@ -365,7 +365,7 @@ var ErrManifestTooLarge = errors.New("manifest too large")
// As created, there is no limit, but reinitializing the semaphore with a bounded channel
// limits the concurrency to the channel size. Note that the default *configuration* does
// limit the number of uploads.
var blobUploadSemaphore = make(chan struct{})
var putBlobSemaphore = make(chan struct{})
// Uploads inline file data over certain size to the storage backend. Returns a copy of
// the manifest updated to refer to an external content-addressable store.
@@ -441,9 +441,9 @@ func StoreManifest(
// If the entry in the original manifest is already an external reference, there's no need
// to externalize it (and no way for us to do so, since the entry only contains the blob name).
if entry.GetType() == Type_ExternalFile && manifest.Contents[name].GetType() == Type_InlineFile {
blobUploadSemaphore <- struct{}{} // acquire (and maybe block)
putBlobSemaphore <- struct{}{} // acquire (and maybe block)
wg.Go(func() {
defer func() { <-blobUploadSemaphore }() // release
defer func() { <-putBlobSemaphore }() // release
err := backend.PutBlob(ctx, string(entry.Data), manifest.Contents[name].Data)
if err != nil {
ch <- fmt.Errorf("put blob %s: %w", name, err)

View File

@@ -294,8 +294,8 @@ func (backend *observedBackend) DeleteManifest(ctx context.Context, name string,
return
}
func (backend *observedBackend) EnumerateManifests(ctx context.Context) iter.Seq2[ManifestMetadata, error] {
return func(yield func(ManifestMetadata, error) bool) {
func (backend *observedBackend) EnumerateManifests(ctx context.Context) iter.Seq2[*ManifestMetadata, error] {
return func(yield func(*ManifestMetadata, error) bool) {
span, ctx := ObserveFunction(ctx, "EnumerateManifests")
for metadata, err := range backend.inner.EnumerateManifests(ctx) {
if !yield(metadata, err) {
@@ -306,6 +306,18 @@ func (backend *observedBackend) EnumerateManifests(ctx context.Context) iter.Seq
}
}
func (backend *observedBackend) GetAllManifests(ctx context.Context) iter.Seq2[tuple[*ManifestMetadata, *Manifest], error] {
return func(yield func(tuple[*ManifestMetadata, *Manifest], error) bool) {
span, ctx := ObserveFunction(ctx, "GetAllManifests")
for item, err := range backend.inner.GetAllManifests(ctx) {
if !yield(item, err) {
break
}
}
span.Finish()
}
}
func (backend *observedBackend) CheckDomain(ctx context.Context, domain string) (found bool, err error) {
span, ctx := ObserveFunction(ctx, "CheckDomain", "domain.name", domain)
found, err = backend.inner.CheckDomain(ctx, domain)

View File

@@ -6,6 +6,15 @@ import (
"strings"
)
type tuple[A, B any] struct {
A A
B B
}
func (t tuple[A, B]) Splat() (A, B) {
return t.A, t.B
}
type BoundedReader struct {
inner io.Reader
fuel int64