Implement -audit-server.

To use this function, configure git-pages with e.g.:

    [audit]
    collect = true
    notify-url = "http://localhost:3004/"

and run an audit server with e.g.:

    git-pages -audit-server tcp/:3004 python $(pwd)/process.py

The provided command line is executed after appending two arguments
(audit record ID and event type), and runs in a temporary directory
with the audit record extracted into it. The following files will
be present in this directory (`$1` stands for the audit record ID):
  * `$1-event.json` (always)
  * `$1-manifest.json` (if type is `CommitManifest`)
  * `$1-archive.tar` (if type is `CommitManifest`)

The script must complete successfully for the event processing to
finish. The notification will keep being re-sent (by the worker) with
exponential backoff until it does.
This commit is contained in:
Catherine
2025-12-05 03:03:38 +00:00
parent 464c40db9c
commit 8c29ba3fe7
4 changed files with 149 additions and 19 deletions

1
.gitignore vendored
View File

@@ -4,3 +4,4 @@
/data
/config*.toml*
/git-pages
/site

View File

@@ -4,7 +4,11 @@ import (
"cmp"
"context"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"time"
@@ -147,6 +151,110 @@ func AuditRecordJSON(record *AuditRecord, scope AuditRecordScope) []byte {
return json
}
// ExtractAuditRecord writes the contents of `record` into the directory `dest`. It always
// writes `<id>-event.json`; when the record has a manifest, it also writes
// `<id>-manifest.json` and `<id>-archive.tar`. Every file is created with mode 0o400.
// The first error encountered is returned; files written before it are left in place.
//
// This function receives `id` and `record` separately because the record itself may have its
// ID missing or mismatched. While this is very unlikely, using the actual primary key as
// the filename is more robust.
func ExtractAuditRecord(ctx context.Context, id AuditID, record *AuditRecord, dest string) error {
	const mode = 0o400 // readable by current user, not writable

	err := os.WriteFile(filepath.Join(dest, fmt.Sprintf("%s-event.json", id)),
		AuditRecordJSON(record, AuditRecordNoManifest), mode)
	if err != nil {
		return err
	}

	// Records without a manifest only have the event file.
	if record.Manifest == nil {
		return nil
	}

	err = os.WriteFile(filepath.Join(dest, fmt.Sprintf("%s-manifest.json", id)),
		ManifestJSON(record.Manifest), mode)
	if err != nil {
		return err
	}

	archive, err := os.OpenFile(filepath.Join(dest, fmt.Sprintf("%s-archive.tar", id)),
		os.O_CREATE|os.O_TRUNC|os.O_WRONLY, mode)
	if err != nil {
		return err
	}
	if err = CollectTar(ctx, archive, record.Manifest, ManifestMetadata{}); err != nil {
		// Best-effort close; the collection error is the one worth reporting.
		archive.Close()
		return err
	}
	// Close explicitly (rather than via `defer`) so that a write error surfacing only at
	// close time is reported instead of leaving a silently truncated archive behind.
	return archive.Close()
}
// AuditEventProcessor returns an HTTP handler that runs `command` once per received audit
// notification. The audit record ID is taken from the raw query string of the request.
// For each request, the record is extracted into a fresh temporary directory (removed once
// the request finishes), and `command` is executed in that directory with `args` followed by
// two extra arguments: the audit record ID and the event type.
//
// The handler responds with 200 and the combined output of the command if it succeeds, and
// with a non-200 status otherwise. An error is returned if `command` cannot be resolved.
func AuditEventProcessor(command string, args []string) (http.Handler, error) {
	var err error

	// Resolve the command to an absolute path, as it will be run from a different current
	// directory, which would break e.g. `git-pages -audit-server tcp/:3004 ./handler.sh`.
	if command, err = exec.LookPath(command); err != nil {
		return nil, err
	}
	if command, err = filepath.Abs(command); err != nil {
		return nil, err
	}

	router := http.NewServeMux()
	router.Handle("GET /", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Go will cancel the request context if the client drops the connection. We don't want
		// that to interrupt processing. However, we also want the client (not the server) to
		// handle retries, so instead of spawning a goroutine to process the event, we do this
		// within the HTTP handler. If an error is returned, the notify goroutine in the worker
		// will retry the HTTP request (with backoff) until it succeeds.
		//
		// This is a somewhat idiosyncratic design and it's not clear that this is the best
		// possible approach (e.g. if the worker gets restarted and the event processing fails,
		// it will not be retried), but it should do the job for now. It is expected that
		// some form of observability is used to highlight event processor errors.
		ctx := context.WithoutCancel(r.Context())

		id, err := ParseAuditID(r.URL.RawQuery)
		if err != nil {
			logc.Printf(ctx, "audit process err: malformed query\n")
			http.Error(w, "malformed query", http.StatusBadRequest)
			return
		}

		record, err := backend.QueryAuditLog(ctx, id)
		if err != nil {
			// Log the underlying error too; the client only sees the generic message.
			logc.Printf(ctx, "audit process %s err: missing record: %s\n", id, err)
			http.Error(w, "missing record", http.StatusNotFound)
			return
		}

		// Build a fresh argument slice for every request. Appending directly to the captured
		// `args` could write into its shared backing array (if it has spare capacity), which
		// would be a data race between concurrently handled requests.
		cmdArgs := make([]string, 0, len(args)+2)
		cmdArgs = append(cmdArgs, args...)
		cmdArgs = append(cmdArgs, id.String(), record.GetEvent().String())

		// Report a failure to create the working directory like any other processing error
		// instead of panicking inside the handler; the client retries on any non-200 reply.
		workDir, err := os.MkdirTemp("", "auditRecord")
		if err != nil {
			err = fmt.Errorf("mkdtemp: %w", err)
			logc.Printf(ctx, "audit process %s err: %s\n", id, err)
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		defer os.RemoveAll(workDir)

		if err = ExtractAuditRecord(ctx, id, record, workDir); err != nil {
			logc.Printf(ctx, "audit process %s err: %s\n", id, err)
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

		cmd := exec.CommandContext(ctx, command, cmdArgs...)
		cmd.Dir = workDir
		output, err := cmd.CombinedOutput()
		if err != nil {
			logc.Printf(ctx, "audit process %s err: %s; %s\n", id, err, string(output))
			w.WriteHeader(http.StatusServiceUnavailable)
			// If the command printed nothing, at least tell the client why it failed.
			if len(output) == 0 {
				fmt.Fprintln(w, err.Error())
			}
		} else {
			logc.Printf(ctx, "audit process %s ok: %s\n", id, string(output))
			w.WriteHeader(http.StatusOK)
		}
		w.Write(output)
	}))
	return router, nil
}
type auditedBackend struct {
Backend
}
@@ -199,6 +307,7 @@ func notifyAudit(ctx context.Context, id AuditID) {
notifyURL := config.Audit.NotifyURL.URL
notifyURL.RawQuery = id.String()
// See also the explanation in `AuditEventProcessor` above.
go func() {
backoff := exponential.Backoff{
Jitter: true,
@@ -206,16 +315,27 @@ func notifyAudit(ctx context.Context, id AuditID) {
Max: time.Second * 60,
}
for {
_, err := http.Get(notifyURL.String())
if err != nil {
sleepFor := backoff.Duration()
logc.Printf(ctx, "audit notify %s err: %s (retry in %s)", id, err, sleepFor)
auditNotifyErrorCount.Inc()
time.Sleep(sleepFor)
} else {
logc.Printf(ctx, "audit notify %s ok", id)
resp, err := http.Get(notifyURL.String())
var body []byte
if err == nil {
defer resp.Body.Close()
body, _ = io.ReadAll(resp.Body)
}
if err == nil && resp.StatusCode == http.StatusOK {
logc.Printf(ctx, "audit notify %s ok: %s\n", id, string(body))
auditNotifyOkCount.Inc()
break
} else {
sleepFor := backoff.Duration()
if err != nil {
logc.Printf(ctx, "audit notify %s err: %s (retry in %s)",
id, err, sleepFor)
} else {
logc.Printf(ctx, "audit notify %s fail: %s (retry in %s); %s",
id, resp.Status, sleepFor, string(body))
}
auditNotifyErrorCount.Inc()
time.Sleep(sleepFor)
}
}
}()

View File

@@ -41,8 +41,7 @@ func FetchRepository(
var storer *filesystem.Storage
for _, filter := range []packp.Filter{packp.FilterBlobNone(), packp.Filter("")} {
var tempDir string
tempDir, err = os.MkdirTemp("", "fetchRepo")
if err != nil {
if tempDir, err = os.MkdirTemp("", "fetchRepo"); err != nil {
return nil, fmt.Errorf("mkdtemp: %w", err)
}
defer os.RemoveAll(tempDir)

View File

@@ -177,7 +177,7 @@ func usage() {
fmt.Fprintf(os.Stderr, "(admin) "+
"git-pages {-run-migration <name>|-freeze-domain <domain>|-unfreeze-domain <domain>}\n")
fmt.Fprintf(os.Stderr, "(audit) "+
"git-pages {-audit-log|-audit-read <id>}\n")
"git-pages {-audit-log|-audit-read <id>|-audit-server <endpoint> <program> [args...]}\n")
fmt.Fprintf(os.Stderr, "(info) "+
"git-pages {-print-config-env-vars|-print-config}\n")
fmt.Fprintf(os.Stderr, "(cli) "+
@@ -215,6 +215,8 @@ func Main() {
"display audit log")
auditRead := flag.String("audit-read", "",
"extract contents of audit record `id` to files '<id>-*'")
auditServer := flag.String("audit-server", "",
"listen for notifications on `endpoint` and spawn a process for each audit event")
flag.Parse()
var cliOperations int
@@ -228,6 +230,7 @@ func Main() {
*unfreezeDomain != "",
*auditLog,
*auditRead != "",
*auditServer != "",
} {
if selected {
cliOperations++
@@ -469,19 +472,26 @@ func Main() {
logc.Fatalln(ctx, err)
}
errEvent := os.WriteFile(fmt.Sprintf("%s-event.json", id),
AuditRecordJSON(record, AuditRecordNoManifest), 0o400)
errManifest := os.WriteFile(fmt.Sprintf("%s-manifest.json", id),
ManifestJSON(record.Manifest), 0o400)
fileArchive, errArchive := os.OpenFile(fmt.Sprintf("%s-archive.tar", id),
os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o400)
if err = errors.Join(errEvent, errManifest, errArchive); err != nil {
if err = ExtractAuditRecord(ctx, id, record, "."); err != nil {
logc.Fatalln(ctx, err)
}
if err = CollectTar(ctx, fileArchive, record.Manifest, ManifestMetadata{}); err != nil {
case *auditServer != "":
if backend, err = CreateBackend(ctx, &config.Storage); err != nil {
logc.Fatalln(ctx, err)
}
if flag.NArg() < 1 {
logc.Fatalln(ctx, "handler path not provided")
}
processor, err := AuditEventProcessor(flag.Arg(0), flag.Args()[1:])
if err != nil {
logc.Fatalln(ctx, err)
}
serve(ctx, listen(ctx, "audit", *auditServer), ObserveHTTPHandler(processor))
default:
// Hook a signal (SIGHUP on *nix, nothing on Windows) for reloading the configuration
// at runtime. This is useful because it preserves S3 backend cache contents. Failed