Files
git-pages/src/observe.go

509 lines
14 KiB
Go

package git_pages
import (
"context"
"errors"
"fmt"
"io"
"iter"
"log"
"log/slog"
"math/rand/v2"
"net/http"
"os"
"runtime/debug"
"strconv"
"sync"
"time"
slogmulti "github.com/samber/slog-multi"
syslog "codeberg.org/git-pages/go-slog-syslog"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/getsentry/sentry-go"
sentryhttp "github.com/getsentry/sentry-go/http"
sentryslog "github.com/getsentry/sentry-go/slog"
)
// HTTP server metrics. They are registered with the default Prometheus registry by promauto and
// updated by ObserveHTTPHandler once the wrapped handler returns.
var (
// httpRequestCount counts requests by method and by the status code captured by
// observedResponseWriter (formatted as a decimal string).
httpRequestCount = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "git_pages_http_request_count",
Help: "Count of HTTP requests by method and response status code",
}, []string{"method", "code"})
// httpRequestDurationSeconds records handler latency per method, both as a classic histogram
// (5ms..10s buckets) and as a native histogram with the limits below (see
// prometheus.HistogramOpts for how the native-histogram options interact).
httpRequestDurationSeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "git_pages_http_request_duration_seconds",
Help: "Time to respond to incoming HTTP requests",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 10 * time.Minute,
}, []string{"method"})
)
// syslogHandler is set by InitObservability when SYSLOG_ADDR is non-empty, and flushed by
// FiniObservability; it stays nil when syslog output is not configured.
var syslogHandler syslog.Handler
// hasSentry reports whether Sentry reporting is configured, which is the case exactly when the
// SENTRY_DSN environment variable holds a non-empty value.
func hasSentry() bool {
	dsn := os.Getenv("SENTRY_DSN")
	return len(dsn) > 0
}
// chainSentryMiddleware composes several Sentry event hooks into a single hook. The hooks run in
// the order given, and each one receives the event returned by its predecessor. As soon as any
// hook returns nil (i.e. drops the event), the remaining hooks are skipped and nil is returned.
func chainSentryMiddleware(
	middleware ...func(event *sentry.Event, hint *sentry.EventHint) *sentry.Event,
) func(event *sentry.Event, hint *sentry.EventHint) *sentry.Event {
	return func(event *sentry.Event, hint *sentry.EventHint) *sentry.Event {
		for _, step := range middleware {
			if event == nil {
				return nil
			}
			event = step(event, hint)
		}
		return event
	}
}
// sensitiveHTTPHeaders extends the list of sensitive headers defined in the Sentry Go SDK with our
// own application-specific header field names. scrubSentryEvent deletes these fields from the
// request headers attached to Sentry events.
//
// Names are spelled in canonical MIME header form (as returned by http.CanonicalHeaderKey), which
// is the form net/http uses for the keys of parsed request headers.
var sensitiveHTTPHeaders = map[string]struct{}{
"Forge-Authorization": {},
}
// scrubSentryEvent removes sensitive HTTP header fields (see sensitiveHTTPHeaders) from the
// request attached to a Sentry event, and returns the event. It never drops the event itself.
//
// Header names are canonicalized with http.CanonicalHeaderKey before the lookup, so a credential
// header is removed regardless of how its name is cased in the event (for example
// "forge-authorization"). This scrub is a security boundary, so it must not depend on the SDK
// preserving net/http's canonical casing.
func scrubSentryEvent(event *sentry.Event, hint *sentry.EventHint) *sentry.Event {
	if event.Request == nil {
		return event
	}
	// Deleting map entries while ranging over the same map is permitted in Go.
	for key := range event.Request.Headers {
		if _, sensitive := sensitiveHTTPHeaders[http.CanonicalHeaderKey(key)]; sensitive {
			delete(event.Request.Headers, key)
		}
	}
	return event
}
// sampleSentryEvent returns an event hook (used as part of BeforeSendTransaction) that keeps an
// event with probability sampleRate and discards it (returns nil) otherwise. An event is always
// kept when its HTTP request is a mutation (PUT, POST or DELETE), or when it took at least
// config.Observability.SlowResponseThreshold from start to end.
//
// NOTE(review): if SlowResponseThreshold is zero, every event counts as slow and sampling is
// effectively disabled. Confirm that this is the intended default.
func sampleSentryEvent(sampleRate float64) func(*sentry.Event, *sentry.EventHint) *sentry.Event {
	return func(event *sentry.Event, hint *sentry.EventHint) *sentry.Event {
		isMutation := false
		if req := event.Request; req != nil {
			switch req.Method {
			case "PUT", "POST", "DELETE":
				isMutation = true
			}
		}

		elapsed := event.Timestamp.Sub(event.StartTime)
		isSlow := elapsed >= time.Duration(config.Observability.SlowResponseThreshold)

		effectiveRate := sampleRate
		if isMutation || isSlow {
			effectiveRate = 1
		}
		// rand.Float64 draws from [0, 1), so a rate of 1 always keeps the event.
		if rand.Float64() < effectiveRate {
			return event
		}
		return nil
	}
}
// InitObservability configures process-wide logging and error reporting from the environment and
// the loaded config:
//
//   - config.LogFormat selects a stderr handler ("text" or "json") or none ("none"); any other
//     value is reported with log.Println and adds no stderr handler;
//   - SYSLOG_ADDR, when non-empty, adds a syslog handler, which is kept in syslogHandler;
//   - SENTRY_DSN, when non-empty, initializes the Sentry SDK. SENTRY_LOGS and SENTRY_TRACING are
//     parsed with strconv.ParseBool, and a missing or invalid value means false. They enable
//     forwarding slog records to Sentry and tracing, respectively.
//
// ENVIRONMENT (default "development") is reported to Sentry and sets the transaction sample rate.
// The rate is 1 in "development" and "staging" and 0.05 elsewhere; sampleSentryEvent applies it.
// All configured handlers are combined with slogmulti.Fanout and installed as the default slog
// logger. Syslog or Sentry initialization errors terminate the process via log.Fatalf.
func InitObservability() {
	// NOTE(review): debug.SetPanicOnFault only affects the calling goroutine, so it does not
	// extend to goroutines started elsewhere (e.g. HTTP handlers). Confirm this is intended.
	debug.SetPanicOnFault(true)

	environment, found := os.LookupEnv("ENVIRONMENT")
	if !found {
		environment = "development"
	}

	logHandlers := make([]slog.Handler, 0, 3)

	var consoleHandler slog.Handler
	switch config.LogFormat {
	case "none":
		// Console logging explicitly disabled.
	case "text":
		consoleHandler = slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{})
	case "json":
		consoleHandler = slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{})
	default:
		log.Println("unknown log format", config.LogFormat)
	}
	if consoleHandler != nil {
		logHandlers = append(logHandlers, consoleHandler)
	}

	if syslogAddr := os.Getenv("SYSLOG_ADDR"); syslogAddr != "" {
		var err error
		syslogHandler, err = syslog.NewHandler(&syslog.HandlerOptions{
			Address:          syslogAddr,
			AppName:          "git-pages",
			StructuredDataID: "git-pages",
		})
		if err != nil {
			log.Fatalf("syslog: %v", err)
		}
		logHandlers = append(logHandlers, syslogHandler)
	}

	if hasSentry() {
		envFlag := func(name string) bool {
			enabled, err := strconv.ParseBool(os.Getenv(name))
			return err == nil && enabled
		}
		enableLogs := envFlag("SENTRY_LOGS")
		enableTracing := envFlag("SENTRY_TRACING")

		tracesSampleRate := 0.05
		if environment == "development" || environment == "staging" {
			tracesSampleRate = 1.00
		}

		options := sentry.ClientOptions{
			DisableTelemetryBuffer: !config.Feature("sentry-telemetry-buffer"),
			Environment:            environment,
			EnableLogs:             enableLogs,
			EnableTracing:          enableTracing,
			// Let every transaction through here; sampleSentryEvent applies the real rate.
			TracesSampleRate: 1,
			BeforeSend:       scrubSentryEvent,
			BeforeSendTransaction: chainSentryMiddleware(
				sampleSentryEvent(tracesSampleRate),
				scrubSentryEvent,
			),
		}
		if err := sentry.Init(options); err != nil {
			log.Fatalf("sentry: %s\n", err)
		}

		if enableLogs {
			sentryHandler := sentryslog.Option{AddSource: true}.NewSentryHandler(context.Background())
			logHandlers = append(logHandlers, sentryHandler)
		}
	}

	slog.SetDefault(slog.New(slogmulti.Fanout(logHandlers...)))
}
// FiniObservability flushes buffered observability output. It flushes the syslog handler (when
// InitObservability created one) and the Sentry client (when configured) concurrently, giving
// each a 2-second timeout, and returns once both flushes have returned.
func FiniObservability() {
	const timeout = 2 * time.Second

	var flushers []func()
	if syslogHandler != nil {
		flushers = append(flushers, func() { syslogHandler.Flush(timeout) })
	}
	if hasSentry() {
		flushers = append(flushers, func() { sentry.Flush(timeout) })
	}

	var wg sync.WaitGroup
	for _, flush := range flushers {
		wg.Go(flush)
	}
	wg.Wait()
}
// ObserveError reports err to Sentry when Sentry is configured. Errors that wrap
// context.Canceled are ignored, because they mean that something explicitly requested
// cancellation. A timeout surfaces as a different error (context.DeadlineExceeded) and is still
// reported.
func ObserveError(err error) {
	if !errors.Is(err, context.Canceled) && hasSentry() {
		sentry.CaptureException(err)
	}
}
// observedResponseWriter wraps an http.ResponseWriter and remembers the status code sent to the
// client, so that ObserveHTTPHandler can label its metrics with it.
type observedResponseWriter struct {
	inner http.ResponseWriter
	// status is 0 until a final (non-informational) status code has been sent.
	status int
}

// newObservedResponseWriter wraps w. It returns a value, but the methods have pointer receivers,
// so callers pass its address wherever an http.ResponseWriter is needed.
func newObservedResponseWriter(w http.ResponseWriter) observedResponseWriter {
	return observedResponseWriter{
		inner:  w,
		status: 0,
	}
}

// Unwrap returns the wrapped writer. http.ResponseController uses it to reach optional
// capabilities (flushing, hijacking, deadlines) of the underlying writer.
func (w *observedResponseWriter) Unwrap() http.ResponseWriter {
	return w.inner
}

// Header returns the wrapped writer's header map.
func (w *observedResponseWriter) Header() http.Header {
	return w.inner.Header()
}

// Write forwards data to the wrapped writer. As in net/http, writing body bytes before any final
// status implies 200 OK, so that status is recorded here. Without this, such responses would be
// reported with status 0.
func (w *observedResponseWriter) Write(data []byte) (int, error) {
	if !w.hasFinalStatus() {
		w.status = http.StatusOK
	}
	return w.inner.Write(data)
}

// WriteHeader forwards statusCode to the wrapped writer. It records the code unless a final status
// has already been recorded, because net/http ignores superfluous WriteHeader calls and the first
// final status is what the client receives. Informational 1xx codes (other than 101 Switching
// Protocols) are not final, so a status written after them is still recorded.
func (w *observedResponseWriter) WriteHeader(statusCode int) {
	if !w.hasFinalStatus() {
		w.status = statusCode
	}
	w.inner.WriteHeader(statusCode)
}

// hasFinalStatus reports whether a status that ends the response header phase has been
// recorded. It follows net/http's rule that 1xx codes other than 101 are informational.
func (w *observedResponseWriter) hasFinalStatus() bool {
	informational := w.status >= 100 && w.status <= 199 &&
		w.status != http.StatusSwitchingProtocols
	return w.status != 0 && !informational
}
// ObserveHTTPHandler wraps handler with request observability.
//
// When Sentry is configured (see hasSentry), requests first have their Sentry trace-propagation
// headers (sentry.SentryTraceHeader, sentry.SentryBaggageHeader) removed, so that no upstream
// trace is continued. They then pass through the sentryhttp middleware, which re-panics after
// reporting a panic.
//
// Every request is also timed and counted in httpRequestDurationSeconds and httpRequestCount,
// labelled by method. The count is additionally labelled with the status code seen by
// observedResponseWriter.
func ObserveHTTPHandler(handler http.Handler) http.Handler {
	next := handler
	if hasSentry() {
		reporting := sentryhttp.New(sentryhttp.Options{
			Repanic: true,
		}).Handle(next)
		next = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			// Prevent the Sentry SDK from continuing traces as we don't use this feature.
			r.Header.Del(sentry.SentryTraceHeader)
			r.Header.Del(sentry.SentryBaggageHeader)
			reporting.ServeHTTP(w, r)
		})
	}

	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		recorder := newObservedResponseWriter(w)
		startedAt := time.Now()
		// If next panics, control never returns here and the request is not recorded.
		next.ServeHTTP(&recorder, r)
		elapsed := time.Since(startedAt)

		// recorder.status is 0 if observedResponseWriter recorded no status code; in the
		// common case net/http still sends 200 OK to the client then.
		statusLabel := fmt.Sprintf("%d", recorder.status)
		httpRequestCount.
			With(prometheus.Labels{"method": r.Method, "code": statusLabel}).
			Inc()
		httpRequestDurationSeconds.
			With(prometheus.Labels{"method": r.Method}).
			Observe(elapsed.Seconds())
	})
}
// noopSpan is the span handed out by ObserveFunction when Sentry is not configured.
type noopSpan struct{}

// Finish satisfies the span interface returned by ObserveFunction and intentionally does nothing.
func (noopSpan) Finish() {}
// ObserveFunction starts a span covering a call to funcName and returns the span together with
// the context that callees should use. When Sentry is configured, a child Sentry span with
// operation "function" is started, described by funcName and tagged with data. data holds
// alternating keys and values, as in ObserveData. Without Sentry, a no-op span and the unchanged
// ctx are returned. Callers must call Finish on the returned span.
func ObserveFunction(
	ctx context.Context, funcName string, data ...any,
) (
	interface{ Finish() }, context.Context,
) {
	if !hasSentry() {
		return noopSpan{}, ctx
	}
	span := sentry.StartSpan(ctx, "function")
	span.Description = funcName
	spanCtx := span.Context()
	ObserveData(spanCtx, data...)
	return span, spanCtx
}
// ObserveData attaches key/value pairs to the Sentry span stored in ctx. It does nothing when ctx
// carries no span.
//
// data is read as alternating keys and values, like the variadic arguments of log/slog. Keys
// should be strings; any other key is converted with fmt.Sprint instead of panicking. A trailing
// key without a value is ignored instead of indexing past the end of data. Instrumentation must
// never crash the operation it observes.
func ObserveData(ctx context.Context, data ...any) {
	span := sentry.SpanFromContext(ctx)
	if span == nil {
		return
	}
	for i := 0; i+1 < len(data); i += 2 {
		key, ok := data[i].(string)
		if !ok {
			key = fmt.Sprint(data[i])
		}
		span.SetData(key, data[i+1])
	}
}
// Storage metrics, registered with the default Prometheus registry by promauto. observedBackend
// updates them only after the wrapped backend call succeeds.
//
// NOTE(review): Prometheus naming conventions suggest a "_total" suffix for counters. Renaming
// would break existing dashboards and alerts, so any change needs coordination.
var (
// blobsRetrievedCount / blobsRetrievedBytes: successful GetBlob calls and the sum of the
// returned metadata.Size values.
blobsRetrievedCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "git_pages_blobs_retrieved",
Help: "Count of blobs retrieved",
})
blobsRetrievedBytes = promauto.NewCounter(prometheus.CounterOpts{
Name: "git_pages_blobs_retrieved_bytes",
Help: "Total size in bytes of blobs retrieved",
})
// blobsStoredCount / blobsStoredBytes: successful PutBlob calls and the sum of len(data).
blobsStoredCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "git_pages_blobs_stored",
Help: "Count of blobs stored",
})
blobsStoredBytes = promauto.NewCounter(prometheus.CounterOpts{
Name: "git_pages_blobs_stored_bytes",
Help: "Total size in bytes of blobs stored",
})
// manifestsRetrievedCount: successful GetManifest calls.
manifestsRetrievedCount = promauto.NewCounter(prometheus.CounterOpts{
Name: "git_pages_manifests_retrieved",
Help: "Count of manifests retrieved",
})
)
// observedBackend decorates a Backend. Most calls are traced with ObserveFunction spans, and
// successful blob and manifest operations update the storage metrics.
type observedBackend struct {
	inner Backend
}

// Compile-time check that *observedBackend implements Backend.
var _ Backend = (*observedBackend)(nil)

// NewObservedBackend returns a Backend that forwards every call to backend while observing it.
func NewObservedBackend(backend Backend) Backend {
	observed := &observedBackend{inner: backend}
	return observed
}
// HasFeature forwards to the wrapped backend inside a "HasFeature" span.
func (backend *observedBackend) HasFeature(ctx context.Context, feature BackendFeature) (isOn bool) {
	span, ctx := ObserveFunction(ctx, "HasFeature")
	// Deferred so that the span is finished even if the wrapped backend panics.
	defer span.Finish()
	return backend.inner.HasFeature(ctx, feature)
}
// EnableFeature forwards to the wrapped backend inside an "EnableFeature" span.
func (backend *observedBackend) EnableFeature(ctx context.Context, feature BackendFeature) (err error) {
	span, ctx := ObserveFunction(ctx, "EnableFeature")
	// Deferred so that the span is finished even if the wrapped backend panics.
	defer span.Finish()
	return backend.inner.EnableFeature(ctx, feature)
}
// GetBlob forwards to the wrapped backend inside a "GetBlob" span tagged with blob.name. On
// success it also tags the span with blob.size, increments blobsRetrievedCount, and adds
// metadata.Size to blobsRetrievedBytes. The wrapped backend's results are returned unchanged.
func (backend *observedBackend) GetBlob(
	ctx context.Context, name string,
) (
	reader io.ReadSeeker, metadata BlobMetadata, err error,
) {
	span, ctx := ObserveFunction(ctx, "GetBlob", "blob.name", name)
	// Deferred so that the span is finished even if the wrapped backend panics.
	defer span.Finish()
	reader, metadata, err = backend.inner.GetBlob(ctx, name)
	if err != nil {
		return reader, metadata, err
	}
	ObserveData(ctx, "blob.size", metadata.Size)
	blobsRetrievedCount.Inc()
	blobsRetrievedBytes.Add(float64(metadata.Size))
	return reader, metadata, nil
}
// PutBlob forwards to the wrapped backend inside a "PutBlob" span tagged with blob.name and
// blob.size. On success it increments blobsStoredCount and adds len(data) to blobsStoredBytes.
func (backend *observedBackend) PutBlob(ctx context.Context, name string, data []byte) (err error) {
	span, ctx := ObserveFunction(ctx, "PutBlob", "blob.name", name, "blob.size", len(data))
	// Deferred so that the span is finished even if the wrapped backend panics.
	defer span.Finish()
	if err = backend.inner.PutBlob(ctx, name, data); err != nil {
		return err
	}
	blobsStoredCount.Inc()
	blobsStoredBytes.Add(float64(len(data)))
	return nil
}
// DeleteBlob forwards to the wrapped backend inside a "DeleteBlob" span tagged with blob.name.
func (backend *observedBackend) DeleteBlob(ctx context.Context, name string) (err error) {
	span, ctx := ObserveFunction(ctx, "DeleteBlob", "blob.name", name)
	// Deferred so that the span is finished even if the wrapped backend panics.
	defer span.Finish()
	return backend.inner.DeleteBlob(ctx, name)
}
// EnumerateBlobs returns an iterator over the wrapped backend's blobs. Each time the iterator is
// run, a new "EnumerateBlobs" span is started. The span is finished when iteration ends, whether
// by exhaustion, by the consumer stopping early, or by a panic.
func (backend *observedBackend) EnumerateBlobs(ctx context.Context) iter.Seq2[BlobMetadata, error] {
	return func(yield func(BlobMetadata, error) bool) {
		span, ctx := ObserveFunction(ctx, "EnumerateBlobs")
		defer span.Finish()
		for metadata, err := range backend.inner.EnumerateBlobs(ctx) {
			if !yield(metadata, err) {
				return
			}
		}
	}
}
// GetManifest forwards to the wrapped backend inside a "GetManifest" span tagged with
// manifest.name and manifest.bypass_cache. On success it increments manifestsRetrievedCount.
// The wrapped backend's results are returned unchanged.
func (backend *observedBackend) GetManifest(
	ctx context.Context, name string, opts GetManifestOptions,
) (
	manifest *Manifest, metadata ManifestMetadata, err error,
) {
	span, ctx := ObserveFunction(ctx, "GetManifest",
		"manifest.name", name,
		"manifest.bypass_cache", opts.BypassCache,
	)
	// Deferred so that the span is finished even if the wrapped backend panics.
	defer span.Finish()
	manifest, metadata, err = backend.inner.GetManifest(ctx, name, opts)
	if err != nil {
		return manifest, metadata, err
	}
	manifestsRetrievedCount.Inc()
	return manifest, metadata, nil
}
// StageManifest forwards to the wrapped backend inside a "StageManifest" span.
func (backend *observedBackend) StageManifest(ctx context.Context, manifest *Manifest) (err error) {
	span, ctx := ObserveFunction(ctx, "StageManifest")
	// Deferred so that the span is finished even if the wrapped backend panics.
	defer span.Finish()
	return backend.inner.StageManifest(ctx, manifest)
}
// HasAtomicCAS forwards directly to the wrapped backend. Unlike most methods, it opens no span
// and updates no metrics.
func (backend *observedBackend) HasAtomicCAS(ctx context.Context) bool {
	inner := backend.inner
	return inner.HasAtomicCAS(ctx)
}
// CommitManifest forwards to the wrapped backend inside a "CommitManifest" span tagged with
// manifest.name.
func (backend *observedBackend) CommitManifest(ctx context.Context, name string, manifest *Manifest, opts ModifyManifestOptions) (err error) {
	span, ctx := ObserveFunction(ctx, "CommitManifest", "manifest.name", name)
	// Deferred so that the span is finished even if the wrapped backend panics.
	defer span.Finish()
	return backend.inner.CommitManifest(ctx, name, manifest, opts)
}
// DeleteManifest forwards to the wrapped backend inside a "DeleteManifest" span tagged with
// manifest.name.
func (backend *observedBackend) DeleteManifest(ctx context.Context, name string, opts ModifyManifestOptions) (err error) {
	span, ctx := ObserveFunction(ctx, "DeleteManifest", "manifest.name", name)
	// Deferred so that the span is finished even if the wrapped backend panics.
	defer span.Finish()
	return backend.inner.DeleteManifest(ctx, name, opts)
}
// EnumerateManifests returns an iterator over the wrapped backend's manifests. Each time the
// iterator is run, a new "EnumerateManifests" span is started. The span is finished when
// iteration ends, whether by exhaustion, by the consumer stopping early, or by a panic.
func (backend *observedBackend) EnumerateManifests(ctx context.Context) iter.Seq2[ManifestMetadata, error] {
	return func(yield func(ManifestMetadata, error) bool) {
		span, ctx := ObserveFunction(ctx, "EnumerateManifests")
		defer span.Finish()
		for metadata, err := range backend.inner.EnumerateManifests(ctx) {
			if !yield(metadata, err) {
				return
			}
		}
	}
}
// CheckDomain forwards to the wrapped backend inside a "CheckDomain" span tagged with
// domain.name.
func (backend *observedBackend) CheckDomain(ctx context.Context, domain string) (found bool, err error) {
	span, ctx := ObserveFunction(ctx, "CheckDomain", "domain.name", domain)
	// Deferred so that the span is finished even if the wrapped backend panics.
	defer span.Finish()
	return backend.inner.CheckDomain(ctx, domain)
}
// CreateDomain forwards to the wrapped backend inside a "CreateDomain" span tagged with
// domain.name.
func (backend *observedBackend) CreateDomain(ctx context.Context, domain string) (err error) {
	span, ctx := ObserveFunction(ctx, "CreateDomain", "domain.name", domain)
	// Deferred so that the span is finished even if the wrapped backend panics.
	defer span.Finish()
	return backend.inner.CreateDomain(ctx, domain)
}
// FreezeDomain forwards to the wrapped backend inside a "FreezeDomain" span tagged with
// domain.name.
func (backend *observedBackend) FreezeDomain(ctx context.Context, domain string) (err error) {
	span, ctx := ObserveFunction(ctx, "FreezeDomain", "domain.name", domain)
	// Deferred so that the span is finished even if the wrapped backend panics.
	defer span.Finish()
	return backend.inner.FreezeDomain(ctx, domain)
}
// UnfreezeDomain forwards to the wrapped backend inside an "UnfreezeDomain" span tagged with
// domain.name.
func (backend *observedBackend) UnfreezeDomain(ctx context.Context, domain string) (err error) {
	span, ctx := ObserveFunction(ctx, "UnfreezeDomain", "domain.name", domain)
	// Deferred so that the span is finished even if the wrapped backend panics.
	defer span.Finish()
	return backend.inner.UnfreezeDomain(ctx, domain)
}
// AppendAuditLog forwards to the wrapped backend inside an "AppendAuditLog" span tagged with
// audit.id.
func (backend *observedBackend) AppendAuditLog(ctx context.Context, id AuditID, record *AuditRecord) (err error) {
	span, ctx := ObserveFunction(ctx, "AppendAuditLog", "audit.id", id)
	// Deferred so that the span is finished even if the wrapped backend panics.
	defer span.Finish()
	return backend.inner.AppendAuditLog(ctx, id, record)
}
// QueryAuditLog forwards to the wrapped backend inside a "QueryAuditLog" span tagged with
// audit.id.
func (backend *observedBackend) QueryAuditLog(ctx context.Context, id AuditID) (record *AuditRecord, err error) {
	span, ctx := ObserveFunction(ctx, "QueryAuditLog", "audit.id", id)
	// Deferred so that the span is finished even if the wrapped backend panics.
	defer span.Finish()
	return backend.inner.QueryAuditLog(ctx, id)
}
// SearchAuditLog returns an iterator over the audit IDs produced by the wrapped backend's search.
// Each time the iterator is run, a new "SearchAuditLog" span is started, tagged with the search
// bounds opts.Since and opts.Until. The span is finished when iteration ends, whether by
// exhaustion, by the consumer stopping early, or by a panic.
func (backend *observedBackend) SearchAuditLog(
	ctx context.Context, opts SearchAuditLogOptions,
) iter.Seq2[AuditID, error] {
	return func(yield func(AuditID, error) bool) {
		span, ctx := ObserveFunction(ctx, "SearchAuditLog",
			"audit.search.since", opts.Since,
			"audit.search.until", opts.Until,
		)
		defer span.Finish()
		for id, err := range backend.inner.SearchAuditLog(ctx, opts) {
			if !yield(id, err) {
				return
			}
		}
	}
}