diff --git a/src/extract.go b/src/extract.go index 75eac40..652a8dc 100644 --- a/src/extract.go +++ b/src/extract.go @@ -59,13 +59,17 @@ func ExtractTar(reader io.Reader) (*Manifest, error) { } manifestEntry.Type = Type_InlineFile.Enum() - manifestEntry.Size = proto.Int64(header.Size) manifestEntry.Data = fileData + manifestEntry.Transform = Transform_Identity.Enum() + manifestEntry.OriginalSize = proto.Int64(header.Size) + manifestEntry.CompressedSize = proto.Int64(header.Size) case tar.TypeSymlink: manifestEntry.Type = Type_Symlink.Enum() - manifestEntry.Size = proto.Int64(header.Size) manifestEntry.Data = []byte(header.Linkname) + manifestEntry.Transform = Transform_Identity.Enum() + manifestEntry.OriginalSize = proto.Int64(header.Size) + manifestEntry.CompressedSize = proto.Int64(header.Size) case tar.TypeDir: manifestEntry.Type = Type_Directory.Enum() @@ -150,8 +154,10 @@ func ExtractZip(reader io.Reader) (*Manifest, error) { } else { manifestEntry.Type = Type_InlineFile.Enum() } - manifestEntry.Size = proto.Int64(int64(file.UncompressedSize64)) manifestEntry.Data = fileData + manifestEntry.Transform = Transform_Identity.Enum() + manifestEntry.OriginalSize = proto.Int64(int64(file.UncompressedSize64)) + manifestEntry.CompressedSize = proto.Int64(int64(file.UncompressedSize64)) } else { manifestEntry.Type = Type_Directory.Enum() } diff --git a/src/fetch.go b/src/fetch.go index 37f9a1d..4bc71aa 100644 --- a/src/fetch.go +++ b/src/fetch.go @@ -2,45 +2,76 @@ package git_pages import ( "context" + "errors" "fmt" "io" + "maps" + "net/url" "os" + "slices" + "github.com/c2h5oh/datasize" "github.com/go-git/go-billy/v6/osfs" "github.com/go-git/go-git/v6" "github.com/go-git/go-git/v6/plumbing" "github.com/go-git/go-git/v6/plumbing/cache" "github.com/go-git/go-git/v6/plumbing/filemode" "github.com/go-git/go-git/v6/plumbing/object" + "github.com/go-git/go-git/v6/plumbing/protocol/packp" + "github.com/go-git/go-git/v6/plumbing/transport" 
"github.com/go-git/go-git/v6/storage/filesystem" "google.golang.org/protobuf/proto" ) -func FetchRepository(ctx context.Context, repoURL string, branch string) (*Manifest, error) { +func FetchRepository( + ctx context.Context, repoURL string, branch string, oldManifest *Manifest, +) ( + *Manifest, error, +) { span, ctx := ObserveFunction(ctx, "FetchRepository", "git.repository", repoURL, "git.branch", branch) defer span.Finish() - baseDir, err := os.MkdirTemp("", "fetchRepo") + parsedRepoURL, err := url.Parse(repoURL) if err != nil { - return nil, fmt.Errorf("mkdtemp: %w", err) + return nil, fmt.Errorf("URL parse: %w", err) } - defer os.RemoveAll(baseDir) - fs := osfs.New(baseDir, osfs.WithBoundOS()) - cache := cache.NewObjectLRUDefault() - storer := filesystem.NewStorageWithOptions(fs, cache, filesystem.Options{ - ExclusiveAccess: true, - LargeObjectThreshold: int64(config.Limits.GitLargeObjectThreshold.Bytes()), - }) - repo, err := git.CloneContext(ctx, storer, nil, &git.CloneOptions{ - Bare: true, - URL: repoURL, - ReferenceName: plumbing.ReferenceName(branch), - SingleBranch: true, - Depth: 1, - Tags: git.NoTags, - }) + var repo *git.Repository + var storer *filesystem.Storage + for _, filter := range []packp.Filter{packp.FilterBlobNone(), packp.Filter("")} { + var tempDir string + tempDir, err = os.MkdirTemp("", "fetchRepo") + if err != nil { + return nil, fmt.Errorf("mkdtemp: %w", err) + } + defer os.RemoveAll(tempDir) + + storer = filesystem.NewStorageWithOptions( + osfs.New(tempDir, osfs.WithBoundOS()), + cache.NewObjectLRUDefault(), + filesystem.Options{ + ExclusiveAccess: true, + LargeObjectThreshold: int64(config.Limits.GitLargeObjectThreshold.Bytes()), + }, + ) + repo, err = git.CloneContext(ctx, storer, nil, &git.CloneOptions{ + Bare: true, + URL: repoURL, + ReferenceName: plumbing.ReferenceName(branch), + SingleBranch: true, + Depth: 1, + Tags: git.NoTags, + Filter: filter, + }) + if err != nil { + logc.Printf(ctx, "clone err: %s %s filter=%q\n", 
repoURL, branch, filter) + continue + } else { + logc.Printf(ctx, "clone ok: %s %s filter=%q\n", repoURL, branch, filter) + break + } + } if err != nil { return nil, fmt.Errorf("git clone: %w", err) } @@ -63,7 +94,9 @@ func FetchRepository(ctx context.Context, repoURL string, branch string) (*Manif walker := object.NewTreeWalker(tree, true, make(map[plumbing.Hash]bool)) defer walker.Close() - manifest := Manifest{ + // Create a manifest for the tree object corresponding to `branch`, but do not populate it + // with data yet; instead, record all the blobs we'll need. + manifest := &Manifest{ RepoUrl: proto.String(repoURL), Branch: proto.String(branch), Commit: proto.String(ref.Hash().String()), @@ -71,6 +104,7 @@ func FetchRepository(ctx context.Context, repoURL string, branch string) (*Manif "": {Type: Type_Directory.Enum()}, }, } + blobsNeeded := map[plumbing.Hash]*Entry{} for { name, entry, err := walker.Next() if err == io.EOF { @@ -78,39 +112,138 @@ func FetchRepository(ctx context.Context, repoURL string, branch string) (*Manif } else if err != nil { return nil, fmt.Errorf("git walker: %w", err) } else { - manifestEntry := Entry{} - if entry.Mode.IsFile() { - blob, err := repo.BlobObject(entry.Hash) - if err != nil { - return nil, fmt.Errorf("git blob %s: %w", name, err) - } - - reader, err := blob.Reader() - if err != nil { - return nil, fmt.Errorf("git blob open: %w", err) - } - defer reader.Close() - - data, err := io.ReadAll(reader) - if err != nil { - return nil, fmt.Errorf("git blob read: %w", err) - } - + manifestEntry := &Entry{} + if existingManifestEntry, found := blobsNeeded[entry.Hash]; found { + // If the same blob is present twice, we only need to fetch it once (and both + // instances will alias the same `Entry` structure in the manifest). 
+ manifestEntry = existingManifestEntry + } else if entry.Mode.IsFile() { + blobsNeeded[entry.Hash] = manifestEntry if entry.Mode == filemode.Symlink { manifestEntry.Type = Type_Symlink.Enum() } else { manifestEntry.Type = Type_InlineFile.Enum() } - manifestEntry.Size = proto.Int64(blob.Size) - manifestEntry.Data = data + manifestEntry.GitHash = proto.String(entry.Hash.String()) } else if entry.Mode == filemode.Dir { manifestEntry.Type = Type_Directory.Enum() } else { - AddProblem(&manifest, name, "unsupported mode %#o", entry.Mode) + AddProblem(manifest, name, "unsupported mode %#o", entry.Mode) continue } - manifest.Contents[name] = &manifestEntry + manifest.Contents[name] = manifestEntry } } - return &manifest, nil + + // Collect checkout statistics. + var dataBytesFromOldManifest int64 + var dataBytesFromGitCheckout int64 + var dataBytesFromGitTransport int64 + + // First, see if we can extract the blobs from the old manifest. This is the preferred option + // because it avoids both network transfers and recompression. Note that we do not request + // blobs from the backend under any circumstances to avoid creating a blob existence oracle. + for _, oldManifestEntry := range oldManifest.GetContents() { + if hash, ok := plumbing.FromHex(oldManifestEntry.GetGitHash()); ok { + if manifestEntry, found := blobsNeeded[hash]; found { + CopyProtoMessage(manifestEntry, oldManifestEntry) + dataBytesFromOldManifest += oldManifestEntry.GetOriginalSize() + delete(blobsNeeded, hash) + } + } + } + + // Second, fill the manifest entries with data from the git checkout we just made. + // This will only succeed if a `blob:none` filter isn't supported and we got a full + // clone despite asking for a partial clone. 
+ for hash, manifestEntry := range blobsNeeded { + if err := readGitBlob(repo, hash, manifestEntry); err == nil { + dataBytesFromGitCheckout += manifestEntry.GetOriginalSize() + delete(blobsNeeded, hash) + } + } + + // Third, if we still don't have data for some manifest entries, re-establish a git transport + // and request the missing blobs (only) from the server. + if len(blobsNeeded) > 0 { + client, err := transport.Get(parsedRepoURL.Scheme) + if err != nil { + return nil, fmt.Errorf("git transport: %w", err) + } + + endpoint, err := transport.NewEndpoint(repoURL) + if err != nil { + return nil, fmt.Errorf("git endpoint: %w", err) + } + + session, err := client.NewSession(storer, endpoint, nil) + if err != nil { + return nil, fmt.Errorf("git session: %w", err) + } + + connection, err := session.Handshake(ctx, transport.UploadPackService) + if err != nil { + return nil, fmt.Errorf("git connection: %w", err) + } + defer connection.Close() + + if err := connection.Fetch(ctx, &transport.FetchRequest{ + Wants: slices.Collect(maps.Keys(blobsNeeded)), + Depth: 1, + // Git CLI behaves like this, even if the wants above are references to blobs. + Filter: "blob:none", + }); err != nil && !errors.Is(err, transport.ErrNoChange) { + return nil, fmt.Errorf("git blob fetch request: %w", err) + } + + // All remaining blobs should now be available. 
+ for hash, manifestEntry := range blobsNeeded { + if err := readGitBlob(repo, hash, manifestEntry); err != nil { + return nil, err + } + dataBytesFromGitTransport += manifestEntry.GetOriginalSize() + delete(blobsNeeded, hash) + } + } + + logc.Printf(ctx, + "fetch: %s from old manifest, %s from git checkout, %s from git transport\n", + datasize.ByteSize(dataBytesFromOldManifest).HR(), + datasize.ByteSize(dataBytesFromGitCheckout).HR(), + datasize.ByteSize(dataBytesFromGitTransport).HR(), + ) + + return manifest, nil +} + +func readGitBlob(repo *git.Repository, hash plumbing.Hash, entry *Entry) error { + blob, err := repo.BlobObject(hash) + if err != nil { + return fmt.Errorf("git blob %s: %w", hash, err) + } + + reader, err := blob.Reader() + if err != nil { + return fmt.Errorf("git blob open: %w", err) + } + defer reader.Close() + + data, err := io.ReadAll(reader) + if err != nil { + return fmt.Errorf("git blob read: %w", err) + } + + switch entry.GetType() { + case Type_InlineFile, Type_Symlink: + // okay + default: + panic(fmt.Errorf("readGitBlob encountered invalid entry: %v, %v", + entry.GetType(), entry.GetTransform())) + } + + entry.Data = data + entry.Transform = Transform_Identity.Enum() + entry.OriginalSize = proto.Int64(blob.Size) + entry.CompressedSize = proto.Int64(blob.Size) + return nil } diff --git a/src/manifest.go b/src/manifest.go index e3a3087..f603bca 100644 --- a/src/manifest.go +++ b/src/manifest.go @@ -150,14 +150,14 @@ func DetectContentType(manifest *Manifest) { contentType = http.DetectContentType(entry.Data[:min(512, len(entry.Data))]) } entry.ContentType = proto.String(contentType) - } else { + } else if entry.GetContentType() == "" { panic(fmt.Errorf("DetectContentType encountered invalid entry: %v, %v", entry.GetType(), entry.GetTransform())) } } } -// The `clauspost/compress/zstd` package recommends reusing a compressor to avoid repeated +// The `klauspost/compress/zstd` package recommends reusing a compressor to avoid repeated // 
allocations of internal buffers. var zstdEncoder, _ = zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBetterCompression)) @@ -166,22 +166,27 @@ func CompressFiles(ctx context.Context, manifest *Manifest) { span, _ := ObserveFunction(ctx, "CompressFiles") defer span.Finish() - var originalSize, compressedSize int64 + var originalSize int64 + var compressedSize int64 for _, entry := range manifest.Contents { if entry.GetType() == Type_InlineFile && entry.GetTransform() == Transform_Identity { - mtype := getMediaType(entry.GetContentType()) - if strings.HasPrefix(mtype, "video/") || strings.HasPrefix(mtype, "audio/") { + mediaType := getMediaType(entry.GetContentType()) + if strings.HasPrefix(mediaType, "video/") || strings.HasPrefix(mediaType, "audio/") { + // Audio and video are stored uncompressed but still count towards the manifest totals. + originalSize += entry.GetOriginalSize() + compressedSize += entry.GetCompressedSize() continue } - originalSize += entry.GetSize() - compressedData := zstdEncoder.EncodeAll(entry.GetData(), make([]byte, 0, entry.GetSize())) - if len(compressedData) < int(*entry.Size) { + compressedData := zstdEncoder.EncodeAll(entry.GetData(), + make([]byte, 0, entry.GetOriginalSize())) + if int64(len(compressedData)) < entry.GetOriginalSize() { entry.Data = compressedData - entry.Size = proto.Int64(int64(len(entry.Data))) entry.Transform = Transform_Zstd.Enum() + entry.CompressedSize = proto.Int64(int64(len(entry.Data))) } - compressedSize += entry.GetSize() } + originalSize += entry.GetOriginalSize() + compressedSize += entry.GetCompressedSize() } manifest.OriginalSize = proto.Int64(originalSize) manifest.CompressedSize = proto.Int64(compressedSize) @@ -246,27 +251,34 @@ func StoreManifest(ctx context.Context, name string, manifest *Manifest) (*Manif CompressedSize: manifest.CompressedSize, StoredSize: proto.Int64(0), } - extObjectSizes := make(map[string]int64) for name, entry := range manifest.Contents { cannotBeInlined := entry.GetType() == Type_InlineFile && - entry.GetSize() > int64(config.Limits.MaxInlineFileSize.Bytes()) + entry.GetCompressedSize() > int64(config.Limits.MaxInlineFileSize.Bytes()) if 
cannotBeInlined { dataHash := sha256.Sum256(entry.Data) extManifest.Contents[name] = &Entry{ - Type: Type_ExternalFile.Enum(), - Size: entry.Size, - Data: fmt.Appendf(nil, "sha256-%x", dataHash), - Transform: entry.Transform, - ContentType: entry.ContentType, + Type: Type_ExternalFile.Enum(), + OriginalSize: entry.OriginalSize, + CompressedSize: entry.CompressedSize, + Data: fmt.Appendf(nil, "sha256-%x", dataHash), + Transform: entry.Transform, + ContentType: entry.ContentType, + GitHash: entry.GitHash, } - extObjectSizes[string(dataHash[:])] = entry.GetSize() } else { extManifest.Contents[name] = entry } } - // `extObjectMap` stores size once per object, deduplicating it - for _, storedSize := range extObjectSizes { - *extManifest.StoredSize += storedSize + + // Compute the deduplicated storage size from `extManifest`: files externalized above are `Type_ExternalFile` only there, while `manifest` still holds them as inline files. + var blobSizes = make(map[string]int64) + for _, entry := range extManifest.Contents { + if entry.GetType() == Type_ExternalFile { + blobSizes[string(entry.Data)] = entry.GetCompressedSize() + } + } + for _, blobSize := range blobSizes { + *extManifest.StoredSize += blobSize } // Upload the resulting manifest and the blob it references. diff --git a/src/pages.go b/src/pages.go index f1e32b1..b7c067c 100644 --- a/src/pages.go +++ b/src/pages.go @@ -328,7 +328,7 @@ func getPage(w http.ResponseWriter, r *http.Request) error { case "zstd": // Set Content-Length ourselves since `http.ServeContent` only sets // it if Content-Encoding is unset or if it's a range request. - w.Header().Set("Content-Length", strconv.FormatInt(*entry.Size, 10)) + w.Header().Set("Content-Length", strconv.FormatInt(entry.GetCompressedSize(), 10)) w.Header().Set("Content-Encoding", "zstd") serveEncodingCount. With(prometheus.Labels{"transform": "zstd", "negotiated": "zstd"}). 
diff --git a/src/schema.pb.go b/src/schema.pb.go index e50719d..b9b98d3 100644 --- a/src/schema.pb.go +++ b/src/schema.pb.go @@ -134,8 +134,13 @@ type Entry struct { state protoimpl.MessageState `protogen:"open.v1"` Type *Type `protobuf:"varint,1,opt,name=type,enum=Type" json:"type,omitempty"` // Only present for `type == InlineFile` and `type == ExternalFile`. - // For transformed entries, refers to the post-transformation (compressed) size. - Size *int64 `protobuf:"varint,2,opt,name=size" json:"size,omitempty"` + // For transformed entries, refers to the pre-transformation (decompressed) size; otherwise + // equal to `compressed_size`. + OriginalSize *int64 `protobuf:"varint,7,opt,name=original_size,json=originalSize" json:"original_size,omitempty"` + // Only present for `type == InlineFile` and `type == ExternalFile`. + // For transformed entries, refers to the post-transformation (compressed) size; otherwise + // equal to `original_size`. + CompressedSize *int64 `protobuf:"varint,2,opt,name=compressed_size,json=compressedSize" json:"compressed_size,omitempty"` // Meaning depends on `type`: // - If `type == InlineFile`, contains file data. // - If `type == ExternalFile`, contains blob name (an otherwise unspecified @@ -148,7 +153,13 @@ type Entry struct { Transform *Transform `protobuf:"varint,4,opt,name=transform,enum=Transform" json:"transform,omitempty"` // Only present for `type == InlineFile` and `type == ExternalFile`. // Currently, optional (not present on certain legacy manifests). - ContentType *string `protobuf:"bytes,5,opt,name=content_type,json=contentType" json:"content_type,omitempty"` + ContentType *string `protobuf:"bytes,5,opt,name=content_type,json=contentType" json:"content_type,omitempty"` + // May be present for `type == InlineFile` and `type == ExternalFile`. + // Used to reduce the amount of work being done during git checkouts. 
+ // The type of hash used is determined by the length: + // - 40 bytes: SHA1DC (as hex) + // - 64 bytes: SHA256 (as hex) + GitHash *string `protobuf:"bytes,6,opt,name=git_hash,json=gitHash" json:"git_hash,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -190,9 +201,16 @@ func (x *Entry) GetType() Type { return Type_Invalid } -func (x *Entry) GetSize() int64 { - if x != nil && x.Size != nil { - return *x.Size +func (x *Entry) GetOriginalSize() int64 { + if x != nil && x.OriginalSize != nil { + return *x.OriginalSize + } + return 0 +} + +func (x *Entry) GetCompressedSize() int64 { + if x != nil && x.CompressedSize != nil { + return *x.CompressedSize } return 0 } @@ -218,6 +236,13 @@ func (x *Entry) GetContentType() string { return "" } +func (x *Entry) GetGitHash() string { + if x != nil && x.GitHash != nil { + return *x.GitHash + } + return "" +} + // See https://docs.netlify.com/manage/routing/redirects/overview/ for details. // Only a subset of the Netlify specification is representable here. 
type RedirectRule struct { @@ -569,14 +594,16 @@ var File_schema_proto protoreflect.FileDescriptor const file_schema_proto_rawDesc = "" + "\n" + - "\fschema.proto\"\x97\x01\n" + + "\fschema.proto\"\xec\x01\n" + "\x05Entry\x12\x19\n" + - "\x04type\x18\x01 \x01(\x0e2\x05.TypeR\x04type\x12\x12\n" + - "\x04size\x18\x02 \x01(\x03R\x04size\x12\x12\n" + + "\x04type\x18\x01 \x01(\x0e2\x05.TypeR\x04type\x12#\n" + + "\roriginal_size\x18\a \x01(\x03R\foriginalSize\x12'\n" + + "\x0fcompressed_size\x18\x02 \x01(\x03R\x0ecompressedSize\x12\x12\n" + "\x04data\x18\x03 \x01(\fR\x04data\x12(\n" + "\ttransform\x18\x04 \x01(\x0e2\n" + ".TransformR\ttransform\x12!\n" + - "\fcontent_type\x18\x05 \x01(\tR\vcontentType\"`\n" + + "\fcontent_type\x18\x05 \x01(\tR\vcontentType\x12\x19\n" + + "\bgit_hash\x18\x06 \x01(\tR\agitHash\"`\n" + "\fRedirectRule\x12\x12\n" + "\x04from\x18\x01 \x01(\tR\x04from\x12\x0e\n" + "\x02to\x18\x02 \x01(\tR\x02to\x12\x16\n" + diff --git a/src/schema.proto b/src/schema.proto index cf191d1..6a8d006 100644 --- a/src/schema.proto +++ b/src/schema.proto @@ -26,8 +26,13 @@ enum Transform { message Entry { Type type = 1; // Only present for `type == InlineFile` and `type == ExternalFile`. - // For transformed entries, refers to the post-transformation (compressed) size. - int64 size = 2; + // For transformed entries, refers to the pre-transformation (decompressed) size; otherwise + // equal to `compressed_size`. + int64 original_size = 7; + // Only present for `type == InlineFile` and `type == ExternalFile`. + // For transformed entries, refers to the post-transformation (compressed) size; otherwise + // equal to `original_size`. + int64 compressed_size = 2; // Meaning depends on `type`: // * If `type == InlineFile`, contains file data. // * If `type == ExternalFile`, contains blob name (an otherwise unspecified @@ -41,6 +46,12 @@ message Entry { // Only present for `type == InlineFile` and `type == ExternalFile`. 
// Currently, optional (not present on certain legacy manifests). string content_type = 5; + // May be present for `type == InlineFile` and `type == ExternalFile`. + // Used to reduce the amount of work being done during git checkouts. + // The type of hash used is determined by the length: + // * 40 bytes: SHA1DC (as hex) + // * 64 bytes: SHA256 (as hex) + string git_hash = 6; } // See https://docs.netlify.com/manage/routing/redirects/overview/ for details. @@ -76,9 +87,9 @@ message Manifest { // Contents map contents = 4; - int64 original_size = 10; // total size of entries before compression - int64 compressed_size = 5; // simple sum of each `entry.size` - int64 stored_size = 8; // total size of (deduplicated) external objects + int64 original_size = 10; // sum of each `entry.original_size` + int64 compressed_size = 5; // sum of each `entry.compressed_size` + int64 stored_size = 8; // sum of deduplicated `entry.compressed_size` for external files only // Netlify-style `_redirects` and `_headers` repeated RedirectRule redirects = 6; diff --git a/src/update.go b/src/update.go index bb26883..f33d7ac 100644 --- a/src/update.go +++ b/src/update.go @@ -92,7 +92,10 @@ func UpdateFromRepository( logc.Printf(ctx, "update %s: %s %s\n", webRoot, repoURL, branch) - manifest, err := FetchRepository(ctx, repoURL, branch) + oldManifest, _, _ := backend.GetManifest(ctx, webRoot, GetManifestOptions{}) + // Ignore errors; worst case we have to re-fetch all of the blobs. 
+ + manifest, err := FetchRepository(ctx, repoURL, branch, oldManifest) if errors.Is(err, context.DeadlineExceeded) { result = UpdateResult{UpdateTimeout, nil, fmt.Errorf("update timeout")} } else if err != nil { diff --git a/src/util.go b/src/util.go index 9079a25..fea7c59 100644 --- a/src/util.go +++ b/src/util.go @@ -4,6 +4,8 @@ import ( "errors" "io" "strings" + + "google.golang.org/protobuf/proto" ) type BoundedReader struct { @@ -85,3 +87,19 @@ func getMediaType(mimeType string) (mediaType string) { mediaType = strings.TrimSpace(strings.ToLower(mediaType)) return } + +// Copying Protobuf messages like `*dest = *src` causes a lock to be copied, which is unsound. +// Copying Protobuf messages field-wise is fragile: adding a new field to the schema does not +// cause a diagnostic to be emitted pointing to the copy site, making it easy to miss updates. +// Serializing and deserializing is reliable and breaks referential links. +func CopyProtoMessage(dest, src proto.Message) { + data, err := proto.Marshal(src) + if err != nil { + panic(err) + } + + err = proto.Unmarshal(data, dest) + if err != nil { + panic(err) + } +}