diff --git a/.gitignore b/.gitignore
index 9658acf..67013fb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,3 +4,4 @@
 /data
 /config*.toml*
 /git-pages
+/site
diff --git a/src/audit.go b/src/audit.go
index 6dc77ac..f1e8cde 100644
--- a/src/audit.go
+++ b/src/audit.go
@@ -4,7 +4,11 @@ import (
 	"cmp"
 	"context"
 	"fmt"
+	"io"
 	"net/http"
+	"os"
+	"os/exec"
+	"path/filepath"
 	"strconv"
 	"strings"
 	"time"
@@ -147,6 +151,128 @@ func AuditRecordJSON(record *AuditRecord, scope AuditRecordScope) []byte {
 	return json
 }
 
+// ExtractAuditRecord writes the contents of `record` into the directory `dest` as the file
+// `<id>-event.json` and, if the record has a manifest, also `<id>-manifest.json` and
+// `<id>-archive.tar`. The files are created with mode 0o400.
+//
+// This function receives `id` and `record` separately because the record itself may have its
+// ID missing or mismatched. While this is very unlikely, using the actual primary key as
+// the filename is more robust.
+func ExtractAuditRecord(ctx context.Context, id AuditID, record *AuditRecord, dest string) error {
+	const mode = 0o400 // readable by current user, not writable
+
+	err := os.WriteFile(filepath.Join(dest, fmt.Sprintf("%s-event.json", id)),
+		AuditRecordJSON(record, AuditRecordNoManifest), mode)
+	if err != nil {
+		return err
+	}
+
+	if record.Manifest != nil {
+		err = os.WriteFile(filepath.Join(dest, fmt.Sprintf("%s-manifest.json", id)),
+			ManifestJSON(record.Manifest), mode)
+		if err != nil {
+			return err
+		}
+
+		archive, err := os.OpenFile(filepath.Join(dest, fmt.Sprintf("%s-archive.tar", id)),
+			os.O_CREATE|os.O_TRUNC|os.O_WRONLY, mode)
+		if err != nil {
+			return err
+		}
+
+		// Close the archive explicitly instead of deferring it, so that an error reported
+		// by `Close` is returned to the caller rather than silently dropped.
+		err = CollectTar(ctx, archive, record.Manifest, ManifestMetadata{})
+		if errClose := archive.Close(); err == nil {
+			err = errClose
+		}
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// AuditEventProcessor returns an HTTP handler that processes one audit event per request.
+// The request query is the audit record ID. For each request, the record is extracted into
+// a fresh temporary directory (see `ExtractAuditRecord`), and `command` is run in that
+// directory with `args` followed by the record ID and the event as arguments. The handler
+// responds with 200 if the command succeeds and with an error status otherwise; the combined
+// output of the command is sent as the response body.
+func AuditEventProcessor(command string, args []string) (http.Handler, error) {
+	var err error
+
+	// Resolve the command to an absolute path, as it will be run from a different current
+	// directory, which would break e.g. `git-pages -audit-server tcp/:3004 ./handler.sh`.
+	if command, err = exec.LookPath(command); err != nil {
+		return nil, err
+	}
+	if command, err = filepath.Abs(command); err != nil {
+		return nil, err
+	}
+
+	router := http.NewServeMux()
+	router.Handle("GET /", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		// Go will cancel the request context if the client drops the connection. We don't want
+		// that to interrupt processing. However, we also want the client (not the server) to
+		// handle retries, so instead of spawning a goroutine to process the event, we do this
+		// within the HTTP handler. If an error is returned, the notify goroutine in the worker
+		// will retry the HTTP request (with backoff) until it succeeds.
+		//
+		// This is a somewhat idiosyncratic design and it's not clear that this is the best
+		// possible approach (e.g. if the worker gets restarted and the event processing fails,
+		// it will not be retried), but it should do the job for now. It is expected that
+		// some form of observability is used to highlight event processor errors.
+		ctx := context.WithoutCancel(r.Context())
+
+		id, err := ParseAuditID(r.URL.RawQuery)
+		if err != nil {
+			logc.Printf(ctx, "audit process err: malformed query\n")
+			http.Error(w, "malformed query", http.StatusBadRequest)
+			return
+		}
+
+		record, err := backend.QueryAuditLog(ctx, id)
+		if err != nil {
+			logc.Printf(ctx, "audit process err: missing record\n")
+			http.Error(w, "missing record", http.StatusNotFound)
+			return
+		}
+
+		// Clip the capacity of `args` so that `append` always allocates; otherwise concurrent
+		// requests could write into the same backing array.
+		args := append(args[:len(args):len(args)], id.String(), record.GetEvent().String())
+		cmd := exec.CommandContext(ctx, command, args...)
+		if cmd.Dir, err = os.MkdirTemp("", "auditRecord"); err != nil {
+			logc.Printf(ctx, "audit process %s err: mkdtemp: %s\n", id, err)
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		defer os.RemoveAll(cmd.Dir)
+
+		if err = ExtractAuditRecord(ctx, id, record, cmd.Dir); err != nil {
+			logc.Printf(ctx, "audit process %s err: %s\n", id, err)
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+
+		output, err := cmd.CombinedOutput()
+		if err != nil {
+			logc.Printf(ctx, "audit process %s err: %s; %s\n", id, err, string(output))
+			w.WriteHeader(http.StatusServiceUnavailable)
+			if len(output) == 0 {
+				fmt.Fprintln(w, err.Error())
+			}
+		} else {
+			logc.Printf(ctx, "audit process %s ok: %s\n", id, string(output))
+			w.WriteHeader(http.StatusOK)
+		}
+		w.Write(output)
+	}))
+	return router, nil
+}
+
 type auditedBackend struct {
 	Backend
 }
@@ -199,6 +325,7 @@ func notifyAudit(ctx context.Context, id AuditID) {
 	notifyURL := config.Audit.NotifyURL.URL
 	notifyURL.RawQuery = id.String()
 
+	// See also the explanation in `AuditEventProcessor` above.
 	go func() {
 		backoff := exponential.Backoff{
 			Jitter: true,
@@ -206,16 +333,29 @@ func notifyAudit(ctx context.Context, id AuditID) {
 			Max:    time.Second * 60,
 		}
 		for {
-			_, err := http.Get(notifyURL.String())
-			if err != nil {
-				sleepFor := backoff.Duration()
-				logc.Printf(ctx, "audit notify %s err: %s (retry in %s)", id, err, sleepFor)
-				auditNotifyErrorCount.Inc()
-				time.Sleep(sleepFor)
-			} else {
-				logc.Printf(ctx, "audit notify %s ok", id)
+			resp, err := http.Get(notifyURL.String())
+			var body []byte
+			if err == nil {
+				// Close the body right away: a `defer` inside this loop would not run until
+				// the goroutine exits, so bodies of failed attempts would stay open until then.
+				body, _ = io.ReadAll(resp.Body)
+				resp.Body.Close()
+			}
+			if err == nil && resp.StatusCode == http.StatusOK {
+				logc.Printf(ctx, "audit notify %s ok: %s\n", id, string(body))
 				auditNotifyOkCount.Inc()
 				break
+			} else {
+				sleepFor := backoff.Duration()
+				if err != nil {
+					logc.Printf(ctx, "audit notify %s err: %s (retry in %s)",
+						id, err, sleepFor)
+				} else {
+					logc.Printf(ctx, "audit notify %s fail: %s (retry in %s); %s",
+						id, resp.Status, sleepFor, string(body))
+				}
+				auditNotifyErrorCount.Inc()
+				time.Sleep(sleepFor)
 			}
 		}
 	}()
diff --git a/src/fetch.go b/src/fetch.go
index e38faca..30a7af8 100644
--- a/src/fetch.go
+++ b/src/fetch.go
@@ -41,8 +41,7 @@ func FetchRepository(
 	var storer *filesystem.Storage
 	for _, filter := range []packp.Filter{packp.FilterBlobNone(), packp.Filter("")} {
 		var tempDir string
-		tempDir, err = os.MkdirTemp("", "fetchRepo")
-		if err != nil {
+		if tempDir, err = os.MkdirTemp("", "fetchRepo"); err != nil {
 			return nil, fmt.Errorf("mkdtemp: %w", err)
 		}
 		defer os.RemoveAll(tempDir)
diff --git a/src/main.go b/src/main.go
index e91a93e..3b82966 100644
--- a/src/main.go
+++ b/src/main.go
@@ -177,7 +177,7 @@ func usage() {
 	fmt.Fprintf(os.Stderr, "(admin) "+
 		"git-pages {-run-migration |-freeze-domain |-unfreeze-domain }\n")
 	fmt.Fprintf(os.Stderr, "(audit) "+
-		"git-pages {-audit-log|-audit-read }\n")
+		"git-pages {-audit-log|-audit-read |-audit-server [args...]}\n")
 	fmt.Fprintf(os.Stderr, "(info) "+
 		"git-pages {-print-config-env-vars|-print-config}\n")
 	fmt.Fprintf(os.Stderr, "(cli) "+
@@ -215,6 +215,8 @@ func Main() {
 		"display audit log")
 	auditRead := flag.String("audit-read", "",
 		"extract contents of audit record `id` to files '-*'")
+	auditServer := flag.String("audit-server", "",
+		"listen for notifications on `endpoint` and spawn a process for each audit event")
 	flag.Parse()
 
 	var cliOperations int
@@ -228,6 +230,7 @@ func Main() {
 		*unfreezeDomain != "",
 		*auditLog,
 		*auditRead != "",
+		*auditServer != "",
 	} {
 		if selected {
 			cliOperations++
@@ -469,19 +472,26 @@ func Main() {
 			logc.Fatalln(ctx, err)
 		}
 
-		errEvent := os.WriteFile(fmt.Sprintf("%s-event.json", id),
-			AuditRecordJSON(record, AuditRecordNoManifest), 0o400)
-		errManifest := os.WriteFile(fmt.Sprintf("%s-manifest.json", id),
-			ManifestJSON(record.Manifest), 0o400)
-		fileArchive, errArchive := os.OpenFile(fmt.Sprintf("%s-archive.tar", id),
-			os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o400)
-		if err = errors.Join(errEvent, errManifest, errArchive); err != nil {
+		if err = ExtractAuditRecord(ctx, id, record, "."); err != nil {
 			logc.Fatalln(ctx, err)
 		}
-		if err = CollectTar(ctx, fileArchive, record.Manifest, ManifestMetadata{}); err != nil {
+
+	case *auditServer != "":
+		if backend, err = CreateBackend(ctx, &config.Storage); err != nil {
 			logc.Fatalln(ctx, err)
 		}
 
+		if flag.NArg() < 1 {
+			logc.Fatalln(ctx, "handler path not provided")
+		}
+
+		processor, err := AuditEventProcessor(flag.Arg(0), flag.Args()[1:])
+		if err != nil {
+			logc.Fatalln(ctx, err)
+		}
+
+		serve(ctx, listen(ctx, "audit", *auditServer), ObserveHTTPHandler(processor))
+
 	default:
 		// Hook a signal (SIGHUP on *nix, nothing on Windows) for reloading the configuration
 		// at runtime. This is useful because it preserves S3 backend cache contents. Failed