From 50541e092148ed6934ffec838dcbfd34e440fd0a Mon Sep 17 00:00:00 2001 From: Ben McClelland Date: Tue, 28 May 2024 15:06:13 -0700 Subject: [PATCH] feat: remove unused gauge metrics and add service name option --- cmd/versitygw/main.go | 11 +++++- metrics/metrics.go | 85 ++++++++++++++----------------------------- metrics/statsd.go | 24 ++++-------- 3 files changed, 44 insertions(+), 76 deletions(-) diff --git a/cmd/versitygw/main.go b/cmd/versitygw/main.go index 2d9adeb..43ba24b 100644 --- a/cmd/versitygw/main.go +++ b/cmd/versitygw/main.go @@ -63,6 +63,7 @@ var ( iamCacheDisable bool iamCacheTTL int iamCachePrune int + metricsService string statsdServers string ) @@ -400,9 +401,16 @@ func initFlags() []cli.Flag { EnvVars: []string{"VGW_READ_ONLY"}, Destination: &readonly, }, + &cli.StringFlag{ + Name: "metrics-service-name", + Usage: "service name tag for metrics, hostname if blank", + EnvVars: []string{"VGW_METRICS_SERVICE_NAME"}, + Aliases: []string{"msn"}, + Destination: &metricsService, + }, &cli.StringFlag{ Name: "metrics-statsd-servers", - Usage: "StatsD server urls comma separated. e.g. 'statsd.example1.com:8125, statsd.example2.com:8125'", + Usage: "StatsD server urls comma separated. e.g. 'statsd1.example.com:8125,statsd2.example.com:8125'", EnvVars: []string{"VGW_METRICS_STATSD_SERVERS"}, Aliases: []string{"mss"}, Destination: &statsdServers, @@ -518,6 +526,7 @@ func runGateway(ctx context.Context, be backend.Backend) error { } metricsManager, err := metrics.NewManager(ctx, metrics.Config{ + ServiceName: metricsService, StatsdServers: statsdServers, }) if err != nil { diff --git a/metrics/metrics.go b/metrics/metrics.go index a0aad32..7ae66ea 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -41,12 +41,12 @@ type Manager struct { wg sync.WaitGroup ctx context.Context - publishers []publisher - addDataChan chan datapoint - gaugeDataChan chan datapoint + publishers []publisher + addDataChan chan datapoint } type Config struct { + ServiceName string StatsdServers string } @@ -55,24 +55,26 @@ func NewManager(ctx context.Context, conf Config) (*Manager, error) { if len(conf.StatsdServers) == 0 { return nil, nil } - hostname, err := os.Hostname() - if err != nil { - return nil, fmt.Errorf("failed to get hostname: %w", err) + + if conf.ServiceName == "" { + hostname, err := os.Hostname() + if err != nil { + return nil, fmt.Errorf("failed to get hostname: %w", err) + } + conf.ServiceName = hostname } addDataChan := make(chan datapoint, dataItemCount) - gaugeDataChan := make(chan datapoint, dataItemCount) mgr := &Manager{ - addDataChan: addDataChan, - gaugeDataChan: gaugeDataChan, - ctx: ctx, + addDataChan: addDataChan, + ctx: ctx, } statsdServers := strings.Split(conf.StatsdServers, ",") for _, server := range statsdServers { - statsd, err := NewStatsd(server, hostname) + statsd, err := newStatsd(server, conf.ServiceName) if err != nil { return nil, err } @@ -81,8 +83,6 @@ func NewManager(ctx context.Context, conf Config) (*Manager, error) { mgr.wg.Add(1) go mgr.addForwarder(addDataChan) - mgr.wg.Add(1) - go mgr.gaugeForwarder(gaugeDataChan) return mgr, nil } @@ -93,35 +93,35 @@ func (m *Manager) Send(err error, action string, count int64) { action = ActionUndetected } if err != nil { - m.Increment(action, "failed_count") + m.increment(action, "failed_count") } else { - m.Increment(action, "success_count") + m.increment(action, "success_count") } switch action { case ActionPutObject: - m.Add(action, "bytes_written", count) - m.Increment(action, "object_created_count") + m.add(action, "bytes_written", count) + m.increment(action, "object_created_count") case ActionCompleteMultipartUpload: - m.Increment(action, "object_created_count") + m.increment(action, "object_created_count") case ActionUploadPart: - m.Add(action, "bytes_written", count) + m.add(action, "bytes_written", count) case ActionGetObject: - m.Add(action, "bytes_read", count) + m.add(action, "bytes_read", count) case ActionDeleteObject: - m.Increment(action, "object_removed_count") + m.increment(action, "object_removed_count") case ActionDeleteObjects: - m.Add(action, "object_removed_count", count) + m.add(action, "object_removed_count", count) } } -// Increment increments the key by one -func (m *Manager) Increment(module, key string, tags ...Tag) { - m.Add(module, key, 1, tags...) +// increment increments the key by one +func (m *Manager) increment(module, key string, tags ...Tag) { + m.add(module, key, 1, tags...) } -// Add adds value to key -func (m *Manager) Add(module, key string, value int64, tags ...Tag) { +// add adds value to key +func (m *Manager) add(module, key string, value int64, tags ...Tag) { if m.ctx.Err() != nil { return } @@ -140,31 +140,10 @@ func (m *Manager) Add(module, key string, value int64, tags ...Tag) { } } -// Gauge sets key to value -func (m *Manager) Gauge(module, key string, value int64, tags ...Tag) { - if m.ctx.Err() != nil { - return - } - - d := datapoint{ - module: module, - key: key, - value: value, - tags: tags, - } - - select { - case m.gaugeDataChan <- d: - default: - // channel full, drop the updates - } -} - // Close closes metrics channels, waits for data to complete, closes all plugins func (m *Manager) Close() { // drain the datapoint channels close(m.addDataChan) - close(m.gaugeDataChan) m.wg.Wait() // close all publishers @@ -176,7 +155,6 @@ func (m *Manager) Close() { // publisher is the interface for interacting with the metrics plugins type publisher interface { Add(module, key string, value int64, tags ...Tag) - Gauge(module, key string, value int64, tags ...Tag) Close() } @@ -189,15 +167,6 @@ func (m *Manager) addForwarder(addChan <-chan datapoint) { m.wg.Done() } -func (m *Manager) gaugeForwarder(gaugeChan <-chan datapoint) { - for data := range gaugeChan { - for _, s := range m.publishers { - s.Gauge(data.module, data.key, data.value, data.tags...) - } - } - m.wg.Done() -} - type datapoint struct { module string key string diff --git a/metrics/statsd.go b/metrics/statsd.go index ee701c2..adcc333 100644 --- a/metrics/statsd.go +++ b/metrics/statsd.go @@ -20,44 +20,34 @@ import ( "github.com/smira/go-statsd" ) -// Statsd metrics type -type Statsd struct { +// vgwStatsd metrics type +type vgwStatsd struct { c *statsd.Client } -// NewStatsd takes a server address and returns a statsd merics +// newStatsd takes a server address and returns a statsd merics // Supply service name to be used as a tag to identify the spcific // gateway instance, this may typically be the gateway hostname -func NewStatsd(server string, service string) (*Statsd, error) { +func newStatsd(server string, service string) (*vgwStatsd, error) { c := statsd.NewClient( server, - statsd.MaxPacketSize(1400), statsd.MetricPrefix("versitygw."), statsd.TagStyle(statsd.TagFormatInfluxDB), statsd.DefaultTags(statsd.StringTag("service", service)), ) - return &Statsd{c: c}, nil + return &vgwStatsd{c: c}, nil } // Close closes statsd connections -func (s *Statsd) Close() { +func (s *vgwStatsd) Close() { s.c.Close() } // Add adds value to key -func (s *Statsd) Add(module, key string, value int64, tags ...Tag) { +func (s *vgwStatsd) Add(module, key string, value int64, tags ...Tag) { stags := make([]statsd.Tag, len(tags)) for i, t := range tags { stags[i] = statsd.StringTag(t.Key, t.Value) } s.c.Incr(fmt.Sprintf("%v.%v", module, key), value, stags...) } - -// Gauge sets key to value -func (s *Statsd) Gauge(module, key string, value int64, tags ...Tag) { - stags := make([]statsd.Tag, len(tags)) - for i, t := range tags { - stags[i] = statsd.StringTag(t.Key, t.Value) - } - s.c.Gauge(fmt.Sprintf("%v.%v", module, key), int64(value), stags...) -}