Files
2026-01-08 22:52:32 -06:00

309 lines
7.5 KiB
Go

// Package logging provides centralized structured logging with optional remote log shipping.
package logging
import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"sync"
	"time"
)
// Default configuration values, applied by NewAsyncHandler when the
// corresponding ShipperConfig field is zero or negative.
const (
	// DefaultBatchSize is the number of buffered entries that triggers a flush.
	DefaultBatchSize = 100
	// DefaultFlushInterval bounds how long an entry may wait in the batch.
	DefaultFlushInterval = 5 * time.Second
)
// Shipper defines the interface for log shipping backends.
// Implementations should be safe for concurrent use.
type Shipper interface {
	// Ship sends a batch of log entries to the remote service.
	// Returns an error if the batch could not be shipped; the caller
	// (AsyncHandler.flush) drops the batch on error rather than retrying.
	Ship(ctx context.Context, entries []LogEntry) error
	// Close cleanly shuts down the shipper, releasing any resources.
	// Called once from AsyncHandler.Shutdown after the flusher has stopped.
	Close() error
}
// LogEntry represents a single log entry to be shipped.
type LogEntry struct {
	Time    time.Time      // timestamp copied from the slog.Record
	Level   slog.Level     // severity copied from the slog.Record
	Message string         // log message text
	Source  string         // shortened file:line location, when a source attr is present
	Attrs   map[string]any // record attributes, resolved to plain JSON-encodable values
}
// ShipperConfig configures the log shipper.
type ShipperConfig struct {
	// Backend selects the shipping backend: "victoria", "opensearch", "loki", etc.
	// Empty string disables remote shipping (stdout only).
	Backend string
	// URL is the remote service endpoint URL.
	URL string
	// BatchSize is the number of logs to batch before flushing.
	// Default: 100 (DefaultBatchSize).
	BatchSize int
	// FlushInterval is the maximum time between flushes.
	// Default: 5s (DefaultFlushInterval).
	FlushInterval time.Duration
	// Service identifies the source service ("appview" or "hold").
	// Added to all log entries (by the backend shipper — not set in
	// AsyncHandler.Handle; verify against the backend implementation).
	Service string
	// Username for basic auth (optional).
	Username string
	// Password for basic auth (optional).
	Password string
}
// NewShipper creates a shipper for the configured backend.
// Returns nil if no backend is configured (remote shipping disabled).
func NewShipper(cfg ShipperConfig) (Shipper, error) {
	if cfg.Backend == "" {
		// No backend selected: remote shipping is disabled, not an error.
		return nil, nil
	}
	switch cfg.Backend {
	case "victoria":
		return NewVictoriaShipper(cfg)
	case "opensearch":
		return nil, fmt.Errorf("opensearch backend not yet implemented")
	case "loki":
		return nil, fmt.Errorf("loki backend not yet implemented")
	default:
		return nil, fmt.Errorf("unknown log shipper backend: %s", cfg.Backend)
	}
}
// asyncState holds the shared state for async log shipping.
// This is separate from AsyncHandler to allow WithAttrs/WithGroup
// to create new handlers that share the same batch and flush state.
type asyncState struct {
	shipper Shipper // nil when remote shipping is disabled

	// Batching: batch accumulates entries until batchSize is reached.
	// batchMu guards batch — Handle appends on logging goroutines while
	// flush swaps the slice out on the flusher goroutine.
	batch     []LogEntry
	batchMu   sync.Mutex
	batchSize int

	// Async flush machinery.
	flushInterval time.Duration  // maximum time between periodic flushes
	flushCh       chan struct{}  // capacity 1; coalesced "flush now" signal
	doneCh        chan struct{}  // closed by Shutdown to stop the flusher
	wg            sync.WaitGroup // tracks the flusher goroutine for Shutdown
}
// AsyncHandler is an slog.Handler that writes to stdout and optionally
// ships logs to a remote service asynchronously. Handlers derived via
// WithAttrs/WithGroup share this handler's batching and flush state.
type AsyncHandler struct {
	stdout slog.Handler         // wrapped handler for local output
	opts   *slog.HandlerOptions // carried to derived handlers; not read by methods in this file
	state  *asyncState          // Shared state for batching and flushing
}
// NewAsyncHandler creates a new AsyncHandler that wraps stdout logging
// and optionally ships logs to a remote service. When shipper is non-nil
// a background flusher goroutine is started; stop it with Shutdown.
// Zero or negative BatchSize/FlushInterval fall back to the defaults.
func NewAsyncHandler(stdout slog.Handler, shipper Shipper, cfg ShipperConfig, opts *slog.HandlerOptions) *AsyncHandler {
	size := cfg.BatchSize
	if size <= 0 {
		size = DefaultBatchSize
	}
	interval := cfg.FlushInterval
	if interval <= 0 {
		interval = DefaultFlushInterval
	}

	h := &AsyncHandler{
		stdout: stdout,
		opts:   opts,
		state: &asyncState{
			shipper:       shipper,
			batch:         make([]LogEntry, 0, size),
			batchSize:     size,
			flushInterval: interval,
			flushCh:       make(chan struct{}, 1),
			doneCh:        make(chan struct{}),
		},
	}

	// Only spin up the background flusher when shipping is enabled.
	if shipper != nil {
		h.state.wg.Add(1)
		go h.runFlusher()
	}
	return h
}
// Enabled reports whether the handler handles records at the given level.
// Delegates to the wrapped stdout handler, so a single level check governs
// both local output and remote shipping.
func (h *AsyncHandler) Enabled(ctx context.Context, level slog.Level) bool {
	return h.stdout.Enabled(ctx, level)
}
// Handle handles the Record by writing to stdout and queuing for remote shipping.
// Stdout output happens unconditionally; a stdout failure aborts shipping.
// When the batch reaches batchSize, the flusher is signaled asynchronously.
// NOTE(review): attrs attached via WithAttrs affect stdout output only;
// shipped entries carry just the record-level attrs collected here.
func (h *AsyncHandler) Handle(ctx context.Context, r slog.Record) error {
	if err := h.stdout.Handle(ctx, r); err != nil {
		return err
	}

	st := h.state
	if st.shipper == nil {
		// Remote shipping disabled; stdout was enough.
		return nil
	}

	entry := LogEntry{
		Time:    r.Time,
		Level:   r.Level,
		Message: r.Message,
		Attrs:   map[string]any{},
	}
	r.Attrs(func(a slog.Attr) bool {
		// The source attr becomes LogEntry.Source; everything else is
		// resolved into the generic attribute map.
		if a.Key != slog.SourceKey {
			entry.Attrs[a.Key] = resolveAttrValue(a.Value)
			return true
		}
		if src, ok := a.Value.Any().(*slog.Source); ok {
			entry.Source = shortenSource(src.File, src.Line)
		}
		return true
	})

	st.batchMu.Lock()
	st.batch = append(st.batch, entry)
	full := len(st.batch) >= st.batchSize
	st.batchMu.Unlock()

	if full {
		h.triggerFlush()
	}
	return nil
}
// WithAttrs returns a new Handler with the given attributes added.
// The derived handler shares this handler's batching/flush state, so
// records from both feed the same shipper batch.
func (h *AsyncHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
	clone := *h // shallow copy: opts and state pointers are shared
	clone.stdout = h.stdout.WithAttrs(attrs)
	return &clone
}
// WithGroup returns a new Handler with the given group name.
// The derived handler shares this handler's batching/flush state.
func (h *AsyncHandler) WithGroup(name string) slog.Handler {
	clone := *h // shallow copy: opts and state pointers are shared
	clone.stdout = h.stdout.WithGroup(name)
	return &clone
}
// triggerFlush signals the flusher goroutine to flush immediately.
// The send is non-blocking: flushCh has capacity 1, so if a flush signal
// is already pending this call coalesces into it and is a no-op.
func (h *AsyncHandler) triggerFlush() {
	select {
	case h.state.flushCh <- struct{}{}:
	default: // Flush already pending
	}
}
// runFlusher runs in a goroutine and periodically flushes the batch.
// It flushes on three triggers: the periodic ticker (flushInterval), an
// explicit signal from triggerFlush (batch reached batchSize), and
// shutdown (doneCh closed), which performs a final flush before
// returning. wg.Done is deferred so Shutdown can wait until the final
// flush has completed.
func (h *AsyncHandler) runFlusher() {
	defer h.state.wg.Done()
	ticker := time.NewTicker(h.state.flushInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			h.flush()
		case <-h.state.flushCh:
			h.flush()
		case <-h.state.doneCh:
			h.flush() // Final flush
			return
		}
	}
}
// flush sends the current batch to the remote service.
// It swaps the batch out under the mutex so Handle can keep appending
// while shipping is in progress. On shipping failure the batch is
// dropped after reporting the error on stderr.
func (h *AsyncHandler) flush() {
	h.state.batchMu.Lock()
	if len(h.state.batch) == 0 {
		h.state.batchMu.Unlock()
		return
	}
	// Take ownership of the batch, leaving a fresh one behind.
	batch := h.state.batch
	h.state.batch = make([]LogEntry, 0, h.state.batchSize)
	h.state.batchMu.Unlock()

	// Ship with a timeout so a stuck backend cannot wedge the flusher.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := h.state.shipper.Ship(ctx, batch); err != nil {
		// Report to stderr directly (not through slog, to avoid recursion).
		// BUG FIX: the original used fmt.Printf, which writes to stdout
		// despite the comment promising stderr.
		fmt.Fprintf(os.Stderr, "log shipper error: %v (dropped %d entries)\n", err, len(batch))
	}
}
// Shutdown flushes any remaining logs and closes the shipper.
// Call this exactly once during graceful shutdown: closing doneCh a
// second time would panic. No-op when remote shipping is disabled.
func (h *AsyncHandler) Shutdown() {
	if h.state.shipper == nil {
		return
	}
	// Closing doneCh makes runFlusher perform a final flush and exit;
	// Wait blocks until that has happened.
	close(h.state.doneCh)
	h.state.wg.Wait()
	if err := h.state.shipper.Close(); err != nil {
		// Report to stderr directly (not through slog, to avoid recursion).
		// BUG FIX: the original fmt.Printf wrote this to stdout.
		fmt.Fprintf(os.Stderr, "log shipper close error: %v\n", err)
	}
}
// resolveAttrValue converts slog.Value to a plain Go value for JSON encoding.
func resolveAttrValue(v slog.Value) any {
switch v.Kind() {
case slog.KindString:
return v.String()
case slog.KindInt64:
return v.Int64()
case slog.KindUint64:
return v.Uint64()
case slog.KindFloat64:
return v.Float64()
case slog.KindBool:
return v.Bool()
case slog.KindDuration:
return v.Duration().String()
case slog.KindTime:
return v.Time().Format(time.RFC3339Nano)
case slog.KindGroup:
attrs := v.Group()
m := make(map[string]any, len(attrs))
for _, a := range attrs {
m[a.Key] = resolveAttrValue(a.Value)
}
return m
case slog.KindAny:
return v.Any()
default:
return v.String()
}
}