309 lines
7.5 KiB
Go
309 lines
7.5 KiB
Go
// Package logging provides centralized structured logging with optional remote log shipping.
|
|
package logging
|
|
|
|
import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"runtime"
	"sync"
	"time"
)
|
|
|
|
// Default configuration values used when ShipperConfig leaves the
// corresponding field unset (zero or negative).
const (
	// DefaultBatchSize is the number of entries buffered before a flush is triggered.
	DefaultBatchSize = 100
	// DefaultFlushInterval is the maximum time a partial batch may wait before flushing.
	DefaultFlushInterval = 5 * time.Second
)
|
|
|
|
// Shipper defines the interface for log shipping backends.
// Implementations should be safe for concurrent use.
type Shipper interface {
	// Ship sends a batch of log entries to the remote service.
	// The context bounds the attempt; callers drop the batch on error
	// (see flush), so implementations should not retry indefinitely.
	// Returns an error if the batch could not be shipped.
	Ship(ctx context.Context, entries []LogEntry) error

	// Close cleanly shuts down the shipper, releasing any resources.
	Close() error
}
|
|
|
|
// LogEntry represents a single log entry to be shipped.
type LogEntry struct {
	// Time is the timestamp of the originating slog record.
	Time time.Time
	// Level is the record's severity level.
	Level slog.Level
	// Message is the log message text.
	Message string
	// Source is the shortened source location (via shortenSource);
	// empty when no source information was available.
	Source string
	// Attrs holds the record's attributes, resolved to plain Go values
	// by resolveAttrValue; the source attr is excluded.
	Attrs map[string]any
}
|
|
|
|
// ShipperConfig configures the log shipper.
type ShipperConfig struct {
	// Backend selects the shipping backend: "victoria", "opensearch", "loki", etc.
	// Empty string disables remote shipping (stdout only).
	Backend string

	// URL is the remote service endpoint URL.
	URL string

	// BatchSize is the number of logs to batch before flushing.
	// Zero or negative falls back to DefaultBatchSize (100).
	BatchSize int

	// FlushInterval is the maximum time between flushes.
	// Zero or negative falls back to DefaultFlushInterval (5s).
	FlushInterval time.Duration

	// Service identifies the source service ("appview" or "hold").
	// Added to all log entries.
	Service string

	// Username for basic auth (optional).
	Username string

	// Password for basic auth (optional).
	Password string
}
|
|
|
|
// NewShipper creates a shipper for the configured backend.
|
|
// Returns nil if no backend is configured (remote shipping disabled).
|
|
func NewShipper(cfg ShipperConfig) (Shipper, error) {
|
|
switch cfg.Backend {
|
|
case "victoria":
|
|
return NewVictoriaShipper(cfg)
|
|
case "opensearch":
|
|
return nil, fmt.Errorf("opensearch backend not yet implemented")
|
|
case "loki":
|
|
return nil, fmt.Errorf("loki backend not yet implemented")
|
|
case "":
|
|
return nil, nil // No remote shipping
|
|
default:
|
|
return nil, fmt.Errorf("unknown log shipper backend: %s", cfg.Backend)
|
|
}
|
|
}
|
|
|
|
// asyncState holds the shared state for async log shipping.
// This is separate from AsyncHandler to allow WithAttrs/WithGroup
// to create new handlers that share the same batch and flush state.
type asyncState struct {
	// shipper is nil when remote shipping is disabled.
	shipper Shipper

	// Batching: batch is guarded by batchMu and swapped out wholesale
	// by flush once it reaches batchSize entries.
	batch     []LogEntry
	batchMu   sync.Mutex
	batchSize int

	// Async flush: runFlusher flushes every flushInterval, on a flushCh
	// signal (capacity 1, so signals coalesce), and one final time when
	// doneCh is closed; wg tracks the flusher goroutine for Shutdown.
	flushInterval time.Duration
	flushCh       chan struct{}
	doneCh        chan struct{}
	wg            sync.WaitGroup
}
|
|
|
|
// AsyncHandler is an slog.Handler that writes to stdout and optionally
// ships logs to a remote service asynchronously.
type AsyncHandler struct {
	// stdout handles the local (always-on) log stream.
	stdout slog.Handler
	// opts are the handler options supplied at construction; may be nil.
	opts *slog.HandlerOptions
	state *asyncState // Shared state for batching and flushing
}
|
|
|
|
// NewAsyncHandler creates a new AsyncHandler that wraps stdout logging
|
|
// and optionally ships logs to a remote service.
|
|
func NewAsyncHandler(stdout slog.Handler, shipper Shipper, cfg ShipperConfig, opts *slog.HandlerOptions) *AsyncHandler {
|
|
batchSize := cfg.BatchSize
|
|
if batchSize <= 0 {
|
|
batchSize = DefaultBatchSize
|
|
}
|
|
|
|
flushInterval := cfg.FlushInterval
|
|
if flushInterval <= 0 {
|
|
flushInterval = DefaultFlushInterval
|
|
}
|
|
|
|
state := &asyncState{
|
|
shipper: shipper,
|
|
batch: make([]LogEntry, 0, batchSize),
|
|
batchSize: batchSize,
|
|
flushInterval: flushInterval,
|
|
flushCh: make(chan struct{}, 1),
|
|
doneCh: make(chan struct{}),
|
|
}
|
|
|
|
h := &AsyncHandler{
|
|
stdout: stdout,
|
|
opts: opts,
|
|
state: state,
|
|
}
|
|
|
|
// Start background flusher if shipping is enabled
|
|
if shipper != nil {
|
|
state.wg.Add(1)
|
|
go h.runFlusher()
|
|
}
|
|
|
|
return h
|
|
}
|
|
|
|
// Enabled reports whether the handler handles records at the given level.
// Delegates to the stdout handler, which owns the level configuration.
func (h *AsyncHandler) Enabled(ctx context.Context, level slog.Level) bool {
	return h.stdout.Enabled(ctx, level)
}
|
|
|
|
// Handle handles the Record by writing to stdout and queuing for remote shipping.
|
|
func (h *AsyncHandler) Handle(ctx context.Context, r slog.Record) error {
|
|
// Always write to stdout
|
|
if err := h.stdout.Handle(ctx, r); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Skip remote shipping if no shipper configured
|
|
if h.state.shipper == nil {
|
|
return nil
|
|
}
|
|
|
|
// Build log entry
|
|
entry := LogEntry{
|
|
Time: r.Time,
|
|
Level: r.Level,
|
|
Message: r.Message,
|
|
Attrs: make(map[string]any),
|
|
}
|
|
|
|
r.Attrs(func(a slog.Attr) bool {
|
|
if a.Key == slog.SourceKey {
|
|
if src, ok := a.Value.Any().(*slog.Source); ok {
|
|
entry.Source = shortenSource(src.File, src.Line)
|
|
}
|
|
} else {
|
|
entry.Attrs[a.Key] = resolveAttrValue(a.Value)
|
|
}
|
|
return true
|
|
})
|
|
|
|
// Add to batch
|
|
h.state.batchMu.Lock()
|
|
h.state.batch = append(h.state.batch, entry)
|
|
shouldFlush := len(h.state.batch) >= h.state.batchSize
|
|
h.state.batchMu.Unlock()
|
|
|
|
if shouldFlush {
|
|
h.triggerFlush()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// WithAttrs returns a new Handler with the given attributes added.
|
|
func (h *AsyncHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
|
|
return &AsyncHandler{
|
|
stdout: h.stdout.WithAttrs(attrs),
|
|
opts: h.opts,
|
|
state: h.state, // Share the same state
|
|
}
|
|
}
|
|
|
|
// WithGroup returns a new Handler with the given group name.
|
|
func (h *AsyncHandler) WithGroup(name string) slog.Handler {
|
|
return &AsyncHandler{
|
|
stdout: h.stdout.WithGroup(name),
|
|
opts: h.opts,
|
|
state: h.state, // Share the same state
|
|
}
|
|
}
|
|
|
|
// triggerFlush signals the flusher goroutine to flush immediately.
// The send is non-blocking: flushCh has capacity 1, so when a flush is
// already pending the extra signal is coalesced rather than queued,
// and Handle never blocks on a slow shipper.
func (h *AsyncHandler) triggerFlush() {
	select {
	case h.state.flushCh <- struct{}{}:
	default: // Flush already pending
	}
}
|
|
|
|
// runFlusher runs in a goroutine and periodically flushes the batch.
// It exits, after one final flush, when doneCh is closed by Shutdown.
func (h *AsyncHandler) runFlusher() {
	defer h.state.wg.Done()

	ticker := time.NewTicker(h.state.flushInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Periodic flush so a partially-filled batch never lingers.
			h.flush()
		case <-h.state.flushCh:
			// Explicit flush requested (batch reached batchSize).
			h.flush()
		case <-h.state.doneCh:
			h.flush() // Final flush
			return
		}
	}
}
|
|
|
|
// flush sends the current batch to the remote service.
|
|
func (h *AsyncHandler) flush() {
|
|
h.state.batchMu.Lock()
|
|
if len(h.state.batch) == 0 {
|
|
h.state.batchMu.Unlock()
|
|
return
|
|
}
|
|
|
|
// Take ownership of the batch
|
|
batch := h.state.batch
|
|
h.state.batch = make([]LogEntry, 0, h.state.batchSize)
|
|
h.state.batchMu.Unlock()
|
|
|
|
// Ship with a timeout context
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
if err := h.state.shipper.Ship(ctx, batch); err != nil {
|
|
// Log to stderr (not through slog to avoid recursion)
|
|
fmt.Printf("log shipper error: %v (dropped %d entries)\n", err, len(batch))
|
|
}
|
|
}
|
|
|
|
// Shutdown flushes any remaining logs and closes the shipper.
|
|
// Call this during graceful shutdown.
|
|
func (h *AsyncHandler) Shutdown() {
|
|
if h.state.shipper == nil {
|
|
return
|
|
}
|
|
|
|
close(h.state.doneCh)
|
|
h.state.wg.Wait()
|
|
|
|
if err := h.state.shipper.Close(); err != nil {
|
|
fmt.Printf("log shipper close error: %v\n", err)
|
|
}
|
|
}
|
|
|
|
// resolveAttrValue converts slog.Value to a plain Go value for JSON encoding.
|
|
func resolveAttrValue(v slog.Value) any {
|
|
switch v.Kind() {
|
|
case slog.KindString:
|
|
return v.String()
|
|
case slog.KindInt64:
|
|
return v.Int64()
|
|
case slog.KindUint64:
|
|
return v.Uint64()
|
|
case slog.KindFloat64:
|
|
return v.Float64()
|
|
case slog.KindBool:
|
|
return v.Bool()
|
|
case slog.KindDuration:
|
|
return v.Duration().String()
|
|
case slog.KindTime:
|
|
return v.Time().Format(time.RFC3339Nano)
|
|
case slog.KindGroup:
|
|
attrs := v.Group()
|
|
m := make(map[string]any, len(attrs))
|
|
for _, a := range attrs {
|
|
m[a.Key] = resolveAttrValue(a.Value)
|
|
}
|
|
return m
|
|
case slog.KindAny:
|
|
return v.Any()
|
|
default:
|
|
return v.String()
|
|
}
|
|
}
|