Allow PATCH method to apply partial updates.

Gated behind the `patch` feature.
This commit is contained in:
Catherine
2025-12-03 18:10:54 +00:00
parent be75cc82a4
commit 460ff41cc9
14 changed files with 669 additions and 55 deletions

View File

@@ -144,7 +144,9 @@ func notifyAudit(ctx context.Context, id AuditID) {
}
}
func (audited *auditedBackend) CommitManifest(ctx context.Context, name string, manifest *Manifest) (err error) {
func (audited *auditedBackend) CommitManifest(
ctx context.Context, name string, manifest *Manifest, opts ModifyManifestOptions,
) (err error) {
domain, project, ok := strings.Cut(name, "/")
if !ok {
panic("malformed manifest name")
@@ -156,10 +158,12 @@ func (audited *auditedBackend) CommitManifest(ctx context.Context, name string,
Manifest: manifest,
})
return audited.Backend.CommitManifest(ctx, name, manifest)
return audited.Backend.CommitManifest(ctx, name, manifest, opts)
}
func (audited *auditedBackend) DeleteManifest(ctx context.Context, name string) (err error) {
func (audited *auditedBackend) DeleteManifest(
ctx context.Context, name string, opts ModifyManifestOptions,
) (err error) {
domain, project, ok := strings.Cut(name, "/")
if !ok {
panic("malformed manifest name")
@@ -170,7 +174,7 @@ func (audited *auditedBackend) DeleteManifest(ctx context.Context, name string)
Project: proto.String(project),
})
return audited.Backend.DeleteManifest(ctx, name)
return audited.Backend.DeleteManifest(ctx, name, opts)
}
func (audited *auditedBackend) FreezeDomain(ctx context.Context, domain string, freeze bool) (err error) {

View File

@@ -12,6 +12,8 @@ import (
)
var ErrObjectNotFound = errors.New("not found")
var ErrPreconditionFailed = errors.New("precondition failed")
var ErrWriteConflict = errors.New("write conflict")
var ErrDomainFrozen = errors.New("domain administratively frozen")
func splitBlobName(name string) []string {
@@ -35,6 +37,12 @@ type GetManifestOptions struct {
BypassCache bool
}
// ModifyManifestOptions carries optional preconditions for backend operations that modify
// a manifest (`CommitManifest` and `DeleteManifest`). The zero value imposes no precondition.
type ModifyManifestOptions struct {
// If non-zero, the request will only succeed if the manifest hasn't been changed since
// the given time. Whether this is racy or not can be determined via `HasAtomicCAS()`.
IfUnmodifiedSince time.Time
}
type QueryAuditLogOptions struct {
// Inclusive lower bound on returned audit records, per their Snowflake ID (which may differ
// slightly from the embedded timestamp). If zero, audit records are returned since beginning
@@ -81,12 +89,17 @@ type Backend interface {
// effects.
StageManifest(ctx context.Context, manifest *Manifest) error
// Whether a compare-and-swap operation on a manifest is truly race-free, or only best-effort
// atomic with a small but non-zero window where two requests may race where the one committing
// first will have its update lost. (Plain swap operations are always guaranteed to be atomic.)
HasAtomicCAS(ctx context.Context) bool
// Commit a manifest. This is an atomic operation; `GetManifest` calls will return either
// the old version or the new version of the manifest, never anything else.
CommitManifest(ctx context.Context, name string, manifest *Manifest) error
CommitManifest(ctx context.Context, name string, manifest *Manifest, opts ModifyManifestOptions) error
// Delete a manifest.
DeleteManifest(ctx context.Context, name string) error
DeleteManifest(ctx context.Context, name string, opts ModifyManifestOptions) error
// List all manifests.
ListManifests(ctx context.Context) (manifests []string, err error)
@@ -114,7 +127,7 @@ type Backend interface {
func CreateBackend(config *StorageConfig) (backend Backend, err error) {
switch config.Type {
case "fs":
if backend, err = NewFSBackend(&config.FS); err != nil {
if backend, err = NewFSBackend(context.Background(), &config.FS); err != nil {
err = fmt.Errorf("fs backend: %w", err)
}
case "s3":

View File

@@ -11,13 +11,15 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"time"
)
type FSBackend struct {
blobRoot *os.Root
siteRoot *os.Root
auditRoot *os.Root
blobRoot *os.Root
siteRoot *os.Root
auditRoot *os.Root
hasAtomicCAS bool
}
var _ Backend = (*FSBackend)(nil)
@@ -56,7 +58,21 @@ func createTempInRoot(root *os.Root, name string, data []byte) (string, error) {
return tempPath, nil
}
func NewFSBackend(config *FSConfig) (*FSBackend, error) {
// checkAtomicCAS probes whether advisory file locks work inside `root` by creating a scratch
// file, locking it, and unlocking it again. It returns true only if both operations succeed.
//
// If the scratch file cannot be created at all (for example because the directory is not
// writable), the probe cannot be performed and the function conservatively returns false
// instead of aborting the process: this function only reports a capability.
func checkAtomicCAS(root *os.Root) bool {
	const probeName = ".hasAtomicCAS"

	file, err := root.Create(probeName)
	if err != nil {
		return false
	}
	defer file.Close()

	// Unlink the probe right away so that it is not left behind; the lock is exercised
	// through the still-open handle. A failure to unlink does not affect the probe result.
	_ = root.Remove(probeName)

	if err := FileLock(file); err != nil {
		return false
	}
	return FileUnlock(file) == nil
}
func NewFSBackend(ctx context.Context, config *FSConfig) (*FSBackend, error) {
blobRoot, err := maybeCreateOpenRoot(config.Root, "blob")
if err != nil {
return nil, fmt.Errorf("blob: %w", err)
@@ -69,7 +85,13 @@ func NewFSBackend(config *FSConfig) (*FSBackend, error) {
if err != nil {
return nil, fmt.Errorf("audit: %w", err)
}
return &FSBackend{blobRoot, siteRoot, auditRoot}, nil
hasAtomicCAS := checkAtomicCAS(siteRoot)
if hasAtomicCAS {
logc.Println(ctx, "fs: has atomic CAS")
} else {
logc.Println(ctx, "fs: has best-effort CAS")
}
return &FSBackend{blobRoot, siteRoot, auditRoot, hasAtomicCAS}, nil
}
func (fs *FSBackend) Backend() Backend {
@@ -229,12 +251,81 @@ func (fs *FSBackend) checkDomainFrozen(ctx context.Context, domain string) error
}
}
func (fs *FSBackend) CommitManifest(ctx context.Context, name string, manifest *Manifest) error {
// HasAtomicCAS reports whether compare-and-swap on manifests is race-free for this backend.
// It returns the `hasAtomicCAS` field, which `NewFSBackend` fills in from `checkAtomicCAS`
// when the backend is created; `ctx` is not used.
func (fs *FSBackend) HasAtomicCAS(ctx context.Context) bool {
// On a suitable filesystem, POSIX advisory locks can be used to implement atomic CAS.
// An implementation consists of two parts:
// - Intra-process mutex set (one per manifest), to prevent races between goroutines;
// - Inter-process POSIX advisory locks (one per manifest), to prevent races between
// different git-pages instances.
return fs.hasAtomicCAS
}
// Right now updates aren't very common, so this lock is essentially entirely uncontended.
// If it ever becomes a bottleneck it should be replaced with a per-manifest lock.
var sharedManifestLock = sync.Mutex{}
// manifestLockGuard records which locks `lockManifest` acquired, so that `Unlock` releases
// exactly those.
type manifestLockGuard struct {
	// Open handle to the manifest that carries the advisory file lock, or nil if no advisory
	// lock is held.
	file *os.File
	// Whether `sharedManifestLock` is held by this guard.
	mutexHeld bool
}

// lockManifest acquires the locks protecting the manifest `name` inside `fs`.
//
// If the manifest does not exist, there is nothing to lock and an empty guard is returned.
// Otherwise an exclusive advisory lock is taken on the manifest file via `FileLock`, followed
// by the process-wide `sharedManifestLock`.
//
// If `FileLock` reports `errors.ErrUnsupported` (advisory locks are not available for this
// file), only `sharedManifestLock` is taken: updates then remain serialized within this
// process but are best-effort across processes, rather than failing outright. Any other
// locking error is returned to the caller.
//
// The caller must call `Unlock` on the returned guard.
func lockManifest(fs *os.Root, name string) (*manifestLockGuard, error) {
	file, err := fs.Open(name)
	if errors.Is(err, os.ErrNotExist) {
		return &manifestLockGuard{}, nil
	} else if err != nil {
		return nil, fmt.Errorf("open: %w", err)
	}

	if err := FileLock(file); err != nil {
		file.Close()
		if !errors.Is(err, errors.ErrUnsupported) {
			return nil, fmt.Errorf("flock(LOCK_EX): %w", err)
		}
		// No advisory lock is held, so there is no handle to keep around.
		file = nil
	}

	sharedManifestLock.Lock()
	return &manifestLockGuard{file: file, mutexHeld: true}, nil
}

// Unlock releases whatever `lockManifest` acquired. Calling it more than once is harmless.
func (guard *manifestLockGuard) Unlock() {
	if guard.file != nil {
		// Closing the handle drops the advisory lock even if the explicit unlock fails.
		FileUnlock(guard.file)
		guard.file.Close()
		guard.file = nil
	}
	if guard.mutexHeld {
		sharedManifestLock.Unlock()
		guard.mutexHeld = false
	}
}
// checkManifestPrecondition verifies the preconditions in `opts` against the manifest file
// `name` in the site root. With a zero `IfUnmodifiedSince` there is nothing to check. Otherwise
// the file's modification time must not be later than `IfUnmodifiedSince`, or an error wrapping
// `ErrPreconditionFailed` is returned. A failure to stat the file is returned as an error too.
func (fs *FSBackend) checkManifestPrecondition(
	ctx context.Context, name string, opts ModifyManifestOptions,
) error {
	deadline := opts.IfUnmodifiedSince
	if deadline.IsZero() {
		return nil
	}

	info, statErr := fs.siteRoot.Stat(name)
	if statErr != nil {
		return fmt.Errorf("stat: %w", statErr)
	}
	if info.ModTime().After(deadline) {
		return fmt.Errorf("%w: If-Unmodified-Since", ErrPreconditionFailed)
	}
	return nil
}
func (fs *FSBackend) CommitManifest(
ctx context.Context, name string, manifest *Manifest, opts ModifyManifestOptions,
) error {
if guard, err := lockManifest(fs.siteRoot, name); err != nil {
return err
} else {
defer guard.Unlock()
}
domain := filepath.Dir(name)
if err := fs.checkDomainFrozen(ctx, domain); err != nil {
return err
}
if err := fs.checkManifestPrecondition(ctx, name, opts); err != nil {
return err
}
manifestData := EncodeManifest(manifest)
manifestHashName := stagedManifestName(manifestData)
@@ -253,12 +344,24 @@ func (fs *FSBackend) CommitManifest(ctx context.Context, name string, manifest *
return nil
}
func (fs *FSBackend) DeleteManifest(ctx context.Context, name string) error {
func (fs *FSBackend) DeleteManifest(
ctx context.Context, name string, opts ModifyManifestOptions,
) error {
if guard, err := lockManifest(fs.siteRoot, name); err != nil {
return err
} else {
defer guard.Unlock()
}
domain := filepath.Dir(name)
if err := fs.checkDomainFrozen(ctx, domain); err != nil {
return err
}
if err := fs.checkManifestPrecondition(ctx, name, opts); err != nil {
return err
}
err := fs.siteRoot.Remove(name)
if errors.Is(err, os.ErrNotExist) {
return nil

View File

@@ -530,7 +530,44 @@ func (s3 *S3Backend) checkDomainFrozen(ctx context.Context, domain string) error
}
}
func (s3 *S3Backend) CommitManifest(ctx context.Context, name string, manifest *Manifest) error {
// HasAtomicCAS reports whether compare-and-swap on manifests is race-free for this backend.
// For S3 it always returns false; the rationale is given in the body. `ctx` is not used.
func (s3 *S3Backend) HasAtomicCAS(ctx context.Context) bool {
// Support for `If-Unmodified-Since:` or `If-Match:` for PutObject requests is very spotty:
// - AWS supports only `If-Match:`:
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
// - Minio supports `If-Match:`:
// https://blog.min.io/leading-the-way-minios-conditional-write-feature-for-modern-data-workloads/
// - Tigris supports `If-Unmodified-Since:` and `If-Match:`, but only with `X-Tigris-Consistent: true`;
// https://www.tigrisdata.com/docs/objects/conditionals/
// Note that the `X-Tigris-Consistent: true` header must be present on *every* transaction
// touching the object, not just on the CAS transactions.
// - Wasabi does not support either one and docs seem to suggest that the headers are ignored;
// - Garage does not support either one and source code suggests the headers are ignored.
// It seems that the only safe option is to not claim support for atomic CAS, and only do
// best-effort CAS implementation using HeadObject and PutObject/DeleteObject.
return false
}
// checkManifestPrecondition verifies the preconditions in `opts` against the stored manifest
// object for `name`. With a zero `IfUnmodifiedSince` there is nothing to check. Otherwise the
// object's last-modified time must not be later than `IfUnmodifiedSince`, or an error wrapping
// `ErrPreconditionFailed` is returned. A failure to stat the object is returned as an error too.
//
// The check is a separate request from the write that follows it, so it is best-effort only.
func (s3 *S3Backend) checkManifestPrecondition(
	ctx context.Context, name string, opts ModifyManifestOptions,
) error {
	deadline := opts.IfUnmodifiedSince
	if deadline.IsZero() {
		return nil
	}

	objectName := manifestObjectName(name)
	info, statErr := s3.client.StatObject(ctx, s3.bucket, objectName, minio.GetObjectOptions{})
	if statErr != nil {
		return fmt.Errorf("stat: %w", statErr)
	}
	if info.LastModified.After(deadline) {
		return fmt.Errorf("%w: If-Unmodified-Since", ErrPreconditionFailed)
	}
	return nil
}
func (s3 *S3Backend) CommitManifest(
ctx context.Context, name string, manifest *Manifest, opts ModifyManifestOptions,
) error {
data := EncodeManifest(manifest)
logc.Printf(ctx, "s3: commit manifest %x -> %s", sha256.Sum256(data), name)
@@ -539,6 +576,10 @@ func (s3 *S3Backend) CommitManifest(ctx context.Context, name string, manifest *
return err
}
if err := s3.checkManifestPrecondition(ctx, name, opts); err != nil {
return err
}
// Remove staged object unconditionally (whether commit succeeded or failed), since
// the upper layer has to retry the complete operation anyway.
_, putErr := s3.client.PutObject(ctx, s3.bucket, manifestObjectName(name),
@@ -547,7 +588,11 @@ func (s3 *S3Backend) CommitManifest(ctx context.Context, name string, manifest *
minio.RemoveObjectOptions{})
s3.siteCache.Cache.Invalidate(name)
if putErr != nil {
return putErr
if errResp := minio.ToErrorResponse(putErr); errResp.Code == "PreconditionFailed" {
return ErrPreconditionFailed
} else {
return putErr
}
} else if removeErr != nil {
return removeErr
} else {
@@ -555,7 +600,9 @@ func (s3 *S3Backend) CommitManifest(ctx context.Context, name string, manifest *
}
}
func (s3 *S3Backend) DeleteManifest(ctx context.Context, name string) error {
func (s3 *S3Backend) DeleteManifest(
ctx context.Context, name string, opts ModifyManifestOptions,
) error {
logc.Printf(ctx, "s3: delete manifest %s\n", name)
_, domain, _ := strings.Cut(name, "/")
@@ -563,6 +610,10 @@ func (s3 *S3Backend) DeleteManifest(ctx context.Context, name string) error {
return err
}
if err := s3.checkManifestPrecondition(ctx, name, opts); err != nil {
return err
}
err := s3.client.RemoveObject(ctx, s3.bucket, manifestObjectName(name),
minio.RemoveObjectOptions{})
s3.siteCache.Cache.Invalidate(name)

View File

@@ -77,7 +77,7 @@ func ExtractTar(reader io.Reader) (*Manifest, error) {
case tar.TypeDir:
AddDirectory(manifest, fileName)
default:
AddProblem(manifest, fileName, "unsupported type '%c'", header.Typeflag)
AddProblem(manifest, fileName, "tar: unsupported type '%c'", header.Typeflag)
continue
}
}

16
src/flock_other.go Normal file
View File

@@ -0,0 +1,16 @@
//go:build !unix
package git_pages
import (
	"errors"
	"fmt"
	"os"
)
// FileLock is the fallback used on platforms without an advisory file lock implementation.
// It never acquires a lock and always fails with an error wrapping `errors.ErrUnsupported`,
// so that callers can distinguish "locking is unavailable" from a genuine locking failure
// using `errors.Is`.
func FileLock(file *os.File) error {
	return fmt.Errorf("unimplemented: %w", errors.ErrUnsupported)
}
// FileUnlock is the fallback used on platforms without an advisory file lock implementation.
// It does nothing and always fails with an error wrapping `errors.ErrUnsupported`, so that
// callers can distinguish "locking is unavailable" from a genuine unlocking failure using
// `errors.Is`.
func FileUnlock(file *os.File) error {
	return fmt.Errorf("unimplemented: %w", errors.ErrUnsupported)
}

16
src/flock_posix.go Normal file
View File

@@ -0,0 +1,16 @@
//go:build unix
package git_pages
import (
"os"
"syscall"
)
// FileLock acquires an exclusive advisory lock (`flock(LOCK_EX)`) on `file`, blocking until
// the lock is available. The lock belongs to the open file description and is released by
// `FileUnlock` or by closing the file.
func FileLock(file *os.File) error {
	for {
		err := syscall.Flock(int(file.Fd()), syscall.LOCK_EX)
		// A blocking flock can be interrupted by a signal before the lock is granted;
		// that is not a failure, so simply try again.
		if err != syscall.EINTR {
			return err
		}
	}
}
// FileUnlock releases the advisory lock held on `file` by issuing `flock(LOCK_UN)` on its
// file descriptor, and returns the result of that call.
func FileUnlock(file *os.File) error {
return syscall.Flock(int(file.Fd()), syscall.LOCK_UN)
}

View File

@@ -270,7 +270,9 @@ var ErrManifestTooLarge = errors.New("manifest too large")
// Uploads inline file data over certain size to the storage backend. Returns a copy of
// the manifest updated to refer to an external content-addressable store.
func StoreManifest(ctx context.Context, name string, manifest *Manifest) (*Manifest, error) {
func StoreManifest(
ctx context.Context, name string, manifest *Manifest, opts ModifyManifestOptions,
) (*Manifest, error) {
span, ctx := ObserveFunction(ctx, "StoreManifest", "manifest.name", name)
defer span.Finish()
@@ -349,7 +351,7 @@ func StoreManifest(ctx context.Context, name string, manifest *Manifest) (*Manif
return nil, err // currently ignores all but 1st error
}
if err := backend.CommitManifest(ctx, name, &extManifest); err != nil {
if err := backend.CommitManifest(ctx, name, &extManifest, opts); err != nil {
if errors.Is(err, ErrDomainFrozen) {
return nil, err
} else {

View File

@@ -403,16 +403,20 @@ func (backend *observedBackend) StageManifest(ctx context.Context, manifest *Man
return
}
func (backend *observedBackend) CommitManifest(ctx context.Context, name string, manifest *Manifest) (err error) {
// HasAtomicCAS forwards to the wrapped backend unchanged; unlike the neighbouring wrappers it
// does not open an observation span around the call.
func (backend *observedBackend) HasAtomicCAS(ctx context.Context) bool {
return backend.inner.HasAtomicCAS(ctx)
}
func (backend *observedBackend) CommitManifest(ctx context.Context, name string, manifest *Manifest, opts ModifyManifestOptions) (err error) {
span, ctx := ObserveFunction(ctx, "CommitManifest", "manifest.name", name)
err = backend.inner.CommitManifest(ctx, name, manifest)
err = backend.inner.CommitManifest(ctx, name, manifest, opts)
span.Finish()
return
}
func (backend *observedBackend) DeleteManifest(ctx context.Context, name string) (err error) {
func (backend *observedBackend) DeleteManifest(ctx context.Context, name string, opts ModifyManifestOptions) (err error) {
span, ctx := ObserveFunction(ctx, "DeleteManifest", "manifest.name", name)
err = backend.inner.DeleteManifest(ctx, name)
err = backend.inner.DeleteManifest(ctx, name, opts)
span.Finish()
return
}

View File

@@ -46,9 +46,8 @@ var (
}, []string{"cause"})
)
func reportSiteUpdate(via string, result *UpdateResult) {
func observeSiteUpdate(via string, result *UpdateResult) {
siteUpdatesCount.With(prometheus.Labels{"via": via}).Inc()
switch result.outcome {
case UpdateError:
siteUpdateErrorCount.With(prometheus.Labels{"cause": "other"}).Inc()
@@ -358,7 +357,7 @@ func getPage(w http.ResponseWriter, r *http.Request) error {
}
if !negotiatedEncoding {
w.WriteHeader(http.StatusNotAcceptable)
return fmt.Errorf("no supported content encodings (accept-encoding: %q)",
return fmt.Errorf("no supported content encodings (Accept-Encoding: %q)",
r.Header.Get("Accept-Encoding"))
}
@@ -420,6 +419,15 @@ func checkDryRun(w http.ResponseWriter, r *http.Request) bool {
func putPage(w http.ResponseWriter, r *http.Request) error {
var result UpdateResult
for _, header := range []string{
"If-Modified-Since", "If-Unmodified-Since", "If-Match", "If-None-Match",
} {
if r.Header.Get(header) != "" {
http.Error(w, fmt.Sprintf("unsupported precondition %s", header), http.StatusBadRequest)
return nil
}
}
host, err := GetHost(r)
if err != nil {
return err
@@ -483,6 +491,74 @@ func putPage(w http.ResponseWriter, r *http.Request) error {
result = UpdateFromArchive(updateCtx, webRoot, contentType, reader)
}
return reportUpdateResult(w, result)
}
// patchPage handles a PATCH request, which applies a partial update (an archive interpreted as
// a patch) to an existing site. The handler is only available when the `patch` feature is
// enabled; otherwise it answers 405.
//
// Client-supplied conditional headers are rejected with 400. The client must state through the
// `Race-Free: yes|no` header whether it requires a race-free update: `yes` is only accepted
// when the backend reports atomic compare-and-swap (412 otherwise), a missing header yields
// 428, and any other value yields 400.
//
// Errors that have already been reported to the client are signalled by returning nil.
func patchPage(w http.ResponseWriter, r *http.Request) error {
	if !config.Feature("patch") {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return nil
	}

	unsupportedPreconditions := []string{
		"If-Modified-Since", "If-Unmodified-Since", "If-Match", "If-None-Match",
	}
	for _, name := range unsupportedPreconditions {
		if r.Header.Get(name) == "" {
			continue
		}
		http.Error(w, fmt.Sprintf("unsupported precondition %s", name), http.StatusBadRequest)
		return nil
	}

	host, err := GetHost(r)
	if err != nil {
		return err
	}
	projectName, err := GetProjectName(r)
	if err != nil {
		return err
	}
	webRoot := makeWebRoot(host, projectName)

	timeout := time.Duration(config.Limits.UpdateTimeout)
	updateCtx, cancel := context.WithTimeout(r.Context(), timeout)
	defer cancel()

	if _, err = AuthorizeUpdateFromArchive(r); err != nil {
		return err
	}

	// Providing atomic compare-and-swap operations might be difficult or impossible depending
	// on the backend in use and its configuration, but for applications where a mostly-atomic
	// compare-and-swap operation is good enough (e.g. generating page previews) we don't want
	// to prevent the use of partial updates.
	raceFree := r.Header.Get("Race-Free")
	atomicCAS := backend.HasAtomicCAS(r.Context())
	switch raceFree {
	case "no":
		// The client accepts a best-effort update.
	case "yes":
		if !atomicCAS {
			http.Error(w, "race free partial updates unsupported", http.StatusPreconditionFailed)
			return nil
		}
	case "":
		http.Error(w, "must provide \"Race-Free: yes|no\" header", http.StatusPreconditionRequired)
		return nil
	default:
		http.Error(w, "malformed Race-Free: header", http.StatusBadRequest)
		return nil
	}

	if checkDryRun(w, r) {
		return nil
	}

	mediaType := getMediaType(r.Header.Get("Content-Type"))
	body := http.MaxBytesReader(w, r.Body, int64(config.Limits.MaxSiteSize.Bytes()))
	return reportUpdateResult(w, PartialUpdateFromArchive(updateCtx, webRoot, mediaType, body))
}
func reportUpdateResult(w http.ResponseWriter, result UpdateResult) error {
switch result.outcome {
case UpdateError:
if errors.Is(result.err, ErrManifestTooLarge) {
@@ -491,6 +567,12 @@ func putPage(w http.ResponseWriter, r *http.Request) error {
w.WriteHeader(http.StatusUnsupportedMediaType)
} else if errors.Is(result.err, ErrArchiveTooLarge) {
w.WriteHeader(http.StatusRequestEntityTooLarge)
} else if errors.Is(result.err, ErrMalformedPatch) {
w.WriteHeader(http.StatusUnprocessableEntity)
} else if errors.Is(result.err, ErrPreconditionFailed) {
w.WriteHeader(http.StatusPreconditionFailed)
} else if errors.Is(result.err, ErrWriteConflict) {
w.WriteHeader(http.StatusConflict)
} else if errors.Is(result.err, ErrDomainFrozen) {
w.WriteHeader(http.StatusForbidden)
} else {
@@ -521,7 +603,7 @@ func putPage(w http.ResponseWriter, r *http.Request) error {
} else {
fmt.Fprintln(w, "internal error")
}
reportSiteUpdate("rest", &result)
observeSiteUpdate("rest", &result)
return nil
}
@@ -545,7 +627,8 @@ func deletePage(w http.ResponseWriter, r *http.Request) error {
return nil
}
err = backend.DeleteManifest(r.Context(), makeWebRoot(host, projectName))
err = backend.DeleteManifest(r.Context(), makeWebRoot(host, projectName),
ModifyManifestOptions{})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
} else {
@@ -656,7 +739,7 @@ func postPage(w http.ResponseWriter, r *http.Request) error {
result := UpdateFromRepository(ctx, webRoot, repoURL, auth.branch)
resultChan <- result
reportSiteUpdate("webhook", &result)
observeSiteUpdate("webhook", &result)
}(context.Background())
var result UpdateResult
@@ -716,7 +799,7 @@ func ServePages(w http.ResponseWriter, r *http.Request) {
}
}
}
allowedMethods := []string{"OPTIONS", "HEAD", "GET", "PUT", "DELETE", "POST"}
allowedMethods := []string{"OPTIONS", "HEAD", "GET", "PUT", "PATCH", "DELETE", "POST"}
if r.Method == "OPTIONS" || !slices.Contains(allowedMethods, r.Method) {
w.Header().Add("Allow", strings.Join(allowedMethods, ", "))
}
@@ -729,6 +812,8 @@ func ServePages(w http.ResponseWriter, r *http.Request) {
err = getPage(w, r)
case http.MethodPut:
err = putPage(w, r)
case http.MethodPatch:
err = patchPage(w, r)
case http.MethodDelete:
err = deletePage(w, r)
// webhook API

128
src/patch.go Normal file
View File

@@ -0,0 +1,128 @@
package git_pages
import (
"archive/tar"
"errors"
"fmt"
"io"
"maps"
"slices"
"strings"
)
var ErrMalformedPatch = errors.New("malformed patch")
// Mutates `manifest` according to a tar stream and the following rules:
//   - A character device with major 0 and minor 0 is a "whiteout marker". When placed
//     at a given path, this path and its entire subtree (if any) are removed from the manifest.
//   - When a directory is placed at a given path, this path and its entire subtree (if any) are
//     removed from the manifest and replaced with the contents of the directory.
//   - A regular file or a symlink placed at a given path replaces whatever was there.
//
// Every entry must be placed inside a directory that already exists (in the manifest, or
// created by an earlier entry of the same archive). Entries that violate this, or whose name
// has an empty final component, cause an error wrapping `ErrMalformedPatch`. Entries of
// unsupported types are recorded as problems on the manifest and otherwise ignored.
//
// A manifest without any contents is treated as consisting of an empty root directory.
//
// The manifest's contents are rebuilt from scratch; if an error is returned, `manifest` is
// left in an unspecified state and must be discarded. Panics if the existing manifest contents
// do not form a tree (an entry whose parent is missing or is not a directory).
func ApplyTarPatch(manifest *Manifest, reader io.Reader) error {
	type Node struct {
		entry    *Entry
		children map[string]*Node
	}

	// Extract the manifest contents (which is using a flat hash map) into a directory tree
	// so that recursive delete operations have O(1) complexity.
	var root *Node
	sortedNames := slices.Sorted(maps.Keys(manifest.GetContents()))
	for _, name := range sortedNames {
		entry := manifest.Contents[name]
		node := &Node{entry: entry}
		if entry.GetType() == Type_Directory {
			node.children = map[string]*Node{}
		}
		if name == "" {
			root = node
			continue
		}
		if root == nil {
			// Names are sorted, so the root (named "") would have been seen first.
			panic("malformed manifest")
		}
		segments := strings.Split(name, "/")
		fileName := segments[len(segments)-1]
		iter := root
		for _, segment := range segments[:len(segments)-1] {
			// Indexing a nil map is fine and reports the child as missing.
			child, exists := iter.children[segment]
			if !exists {
				panic("malformed manifest")
			}
			iter = child
		}
		if iter.children == nil {
			// The parent exists but is not a directory.
			panic("malformed manifest")
		}
		iter.children[fileName] = node
	}
	if root == nil {
		// Nothing in the manifest at all: start from an empty root directory.
		root = &Node{
			entry:    NewManifestEntry(Type_Directory, nil),
			children: map[string]*Node{},
		}
	}
	manifest.Contents = map[string]*Entry{}

	// Process the archive as a patch operation.
	archive := tar.NewReader(reader)
	for {
		header, err := archive.Next()
		if err == io.EOF {
			break
		} else if err != nil {
			return err
		}

		segments := strings.Split(strings.TrimRight(header.Name, "/"), "/")
		fileName := segments[len(segments)-1]
		if fileName == "" {
			// Such an entry would be stored under the root's own key and overwrite it.
			return fmt.Errorf("%w: %q: empty file name", ErrMalformedPatch, header.Name)
		}

		// Walk down to the directory that will contain the entry.
		node := root
		for index, segment := range segments[:len(segments)-1] {
			if node.children == nil {
				dirName := strings.Join(segments[:index], "/")
				return fmt.Errorf("%w: %s: not a directory", ErrMalformedPatch, dirName)
			}
			child, exists := node.children[segment]
			if !exists {
				nodeName := strings.Join(segments[:index+1], "/")
				return fmt.Errorf("%w: %s: path not found", ErrMalformedPatch, nodeName)
			}
			node = child
		}
		if node.children == nil {
			dirName := strings.Join(segments[:len(segments)-1], "/")
			return fmt.Errorf("%w: %s: not a directory", ErrMalformedPatch, dirName)
		}

		switch header.Typeflag {
		case tar.TypeReg:
			fileData, err := io.ReadAll(archive)
			if err != nil {
				return fmt.Errorf("tar: %s: %w", header.Name, err)
			}
			node.children[fileName] = &Node{
				entry: NewManifestEntry(Type_InlineFile, fileData),
			}
		case tar.TypeSymlink:
			node.children[fileName] = &Node{
				entry: NewManifestEntry(Type_Symlink, []byte(header.Linkname)),
			}
		case tar.TypeDir:
			// Replacing the node drops the previous subtree (if any) in one step.
			node.children[fileName] = &Node{
				entry:    NewManifestEntry(Type_Directory, nil),
				children: map[string]*Node{},
			}
		case tar.TypeChar:
			if header.Devmajor == 0 && header.Devminor == 0 {
				// Whiteout marker: remove the path together with its subtree.
				delete(node.children, fileName)
			} else {
				AddProblem(manifest, header.Name,
					"tar: unsupported chardev %d,%d", header.Devmajor, header.Devminor)
			}
		default:
			AddProblem(manifest, header.Name,
				"tar: unsupported type '%c'", header.Typeflag)
			continue
		}
	}

	// Repopulate manifest contents with the updated directory tree. Paths are built as
	// strings so that sibling subtrees never share a backing array.
	var traverse func(string, *Node)
	traverse = func(path string, node *Node) {
		manifest.Contents[path] = node.entry
		for fileName, childNode := range node.children {
			childPath := fileName
			if path != "" {
				childPath = path + "/" + fileName
			}
			traverse(childPath, childNode)
		}
	}
	traverse("", root)
	return nil
}

View File

@@ -6,6 +6,8 @@ import (
"fmt"
"io"
"strings"
"google.golang.org/protobuf/proto"
)
type UpdateOutcome int
@@ -25,14 +27,16 @@ type UpdateResult struct {
err error
}
func Update(ctx context.Context, webRoot string, manifest *Manifest) UpdateResult {
var oldManifest, newManifest *Manifest
func Update(
ctx context.Context, webRoot string, oldManifest, newManifest *Manifest,
opts ModifyManifestOptions,
) UpdateResult {
var err error
var storedManifest *Manifest
outcome := UpdateError
oldManifest, _, _ = backend.GetManifest(ctx, webRoot, GetManifestOptions{})
if IsManifestEmpty(manifest) {
newManifest, err = manifest, backend.DeleteManifest(ctx, webRoot)
if IsManifestEmpty(newManifest) {
storedManifest, err = newManifest, backend.DeleteManifest(ctx, webRoot, opts)
if err == nil {
if oldManifest == nil {
outcome = UpdateNoChange
@@ -40,8 +44,8 @@ func Update(ctx context.Context, webRoot string, manifest *Manifest) UpdateResul
outcome = UpdateDeleted
}
}
} else if err = PrepareManifest(ctx, manifest); err == nil {
newManifest, err = StoreManifest(ctx, webRoot, manifest)
} else if err = PrepareManifest(ctx, newManifest); err == nil {
storedManifest, err = StoreManifest(ctx, webRoot, newManifest, opts)
if err == nil {
domain, _, _ := strings.Cut(webRoot, "/")
err = backend.CreateDomain(ctx, domain)
@@ -49,7 +53,7 @@ func Update(ctx context.Context, webRoot string, manifest *Manifest) UpdateResul
if err == nil {
if oldManifest == nil {
outcome = UpdateCreated
} else if CompareManifest(oldManifest, newManifest) {
} else if CompareManifest(oldManifest, storedManifest) {
outcome = UpdateNoChange
} else {
outcome = UpdateReplaced
@@ -69,8 +73,8 @@ func Update(ctx context.Context, webRoot string, manifest *Manifest) UpdateResul
case UpdateNoChange:
status = "unchanged"
}
if newManifest.Commit != nil {
logc.Printf(ctx, "update %s ok: %s %s", webRoot, status, *newManifest.Commit)
if storedManifest.Commit != nil {
logc.Printf(ctx, "update %s ok: %s %s", webRoot, status, *storedManifest.Commit)
} else {
logc.Printf(ctx, "update %s ok: %s", webRoot, status)
}
@@ -78,7 +82,7 @@ func Update(ctx context.Context, webRoot string, manifest *Manifest) UpdateResul
logc.Printf(ctx, "update %s err: %s", webRoot, err)
}
return UpdateResult{outcome, newManifest, err}
return UpdateResult{outcome, storedManifest, err}
}
func UpdateFromRepository(
@@ -92,16 +96,16 @@ func UpdateFromRepository(
logc.Printf(ctx, "update %s: %s %s\n", webRoot, repoURL, branch)
oldManifest, _, _ := backend.GetManifest(ctx, webRoot, GetManifestOptions{})
// Ignore errors; worst case we have to re-fetch all of the blobs.
oldManifest, _, _ := backend.GetManifest(ctx, webRoot, GetManifestOptions{})
manifest, err := FetchRepository(ctx, repoURL, branch, oldManifest)
newManifest, err := FetchRepository(ctx, repoURL, branch, oldManifest)
if errors.Is(err, context.DeadlineExceeded) {
result = UpdateResult{UpdateTimeout, nil, fmt.Errorf("update timeout")}
} else if err != nil {
result = UpdateResult{UpdateError, nil, err}
} else {
result = Update(ctx, webRoot, manifest)
result = Update(ctx, webRoot, oldManifest, newManifest, ModifyManifestOptions{})
}
observeUpdateResult(result)
@@ -116,22 +120,25 @@ func UpdateFromArchive(
contentType string,
reader io.Reader,
) (result UpdateResult) {
var manifest *Manifest
var err error
// Ignore errors; here the old manifest is used only to determine the update outcome.
oldManifest, _, _ := backend.GetManifest(ctx, webRoot, GetManifestOptions{})
var newManifest *Manifest
switch contentType {
case "application/x-tar":
logc.Printf(ctx, "update %s: (tar)", webRoot)
manifest, err = ExtractTar(reader) // yellow?
newManifest, err = ExtractTar(reader) // yellow?
case "application/x-tar+gzip":
logc.Printf(ctx, "update %s: (tar.gz)", webRoot)
manifest, err = ExtractGzip(reader, ExtractTar) // definitely yellow.
newManifest, err = ExtractGzip(reader, ExtractTar) // definitely yellow.
case "application/x-tar+zstd":
logc.Printf(ctx, "update %s: (tar.zst)", webRoot)
manifest, err = ExtractZstd(reader, ExtractTar)
newManifest, err = ExtractZstd(reader, ExtractTar)
case "application/zip":
logc.Printf(ctx, "update %s: (zip)", webRoot)
manifest, err = ExtractZip(reader)
newManifest, err = ExtractZip(reader)
default:
err = errArchiveFormat
}
@@ -140,7 +147,70 @@ func UpdateFromArchive(
logc.Printf(ctx, "update %s err: %s", webRoot, err)
result = UpdateResult{UpdateError, nil, err}
} else {
result = Update(ctx, webRoot, manifest)
result = Update(ctx, webRoot, oldManifest, newManifest, ModifyManifestOptions{})
}
observeUpdateResult(result)
return
}
func PartialUpdateFromArchive(
ctx context.Context,
webRoot string,
contentType string,
reader io.Reader,
) (result UpdateResult) {
var err error
// Here the old manifest is used both as a substrate to which a patch is applied, as well
// as a "load linked" operation for a future "store conditional" update which, taken together,
// create an atomic compare-and-swap operation.
oldManifest, oldManifestMtime, err := backend.GetManifest(ctx, webRoot,
GetManifestOptions{BypassCache: true})
if err != nil {
logc.Printf(ctx, "patch %s err: %s", webRoot, err)
return UpdateResult{UpdateError, nil, err}
}
applyTarPatch := func(reader io.Reader) (*Manifest, error) {
// Clone the manifest before starting to mutate it. `GetManifest` may return cached
// `*Manifest` objects, which should never be mutated.
newManifest := &Manifest{}
proto.Merge(newManifest, oldManifest)
if err := ApplyTarPatch(newManifest, reader); err != nil {
return nil, err
} else {
return newManifest, nil
}
}
var newManifest *Manifest
switch contentType {
case "application/x-tar":
logc.Printf(ctx, "patch %s: (tar)", webRoot)
newManifest, err = applyTarPatch(reader)
case "application/x-tar+gzip":
logc.Printf(ctx, "patch %s: (tar.gz)", webRoot)
newManifest, err = ExtractGzip(reader, applyTarPatch)
case "application/x-tar+zstd":
logc.Printf(ctx, "patch %s: (tar.zst)", webRoot)
newManifest, err = ExtractZstd(reader, applyTarPatch)
default:
err = errArchiveFormat
}
if err != nil {
logc.Printf(ctx, "patch %s err: %s", webRoot, err)
result = UpdateResult{UpdateError, nil, err}
} else {
result = Update(ctx, webRoot, oldManifest, newManifest,
ModifyManifestOptions{IfUnmodifiedSince: oldManifestMtime})
// The `If-Unmodified-Since` precondition is internally generated here, which means its
// failure shouldn't be surfaced as-is in the HTTP response. If we also accepted options
// from the client, then that precondition failure should surface in the response.
if errors.Is(result.err, ErrPreconditionFailed) {
result.err = ErrWriteConflict
}
}
observeUpdateResult(result)