feat: remove unused gauge metrics and add service name option

This commit is contained in:
Ben McClelland
2024-05-28 15:06:13 -07:00
parent 983da28a7e
commit 50541e0921
3 changed files with 44 additions and 76 deletions

View File

@@ -63,6 +63,7 @@ var (
iamCacheDisable bool iamCacheDisable bool
iamCacheTTL int iamCacheTTL int
iamCachePrune int iamCachePrune int
metricsService string
statsdServers string statsdServers string
) )
@@ -400,9 +401,16 @@ func initFlags() []cli.Flag {
EnvVars: []string{"VGW_READ_ONLY"}, EnvVars: []string{"VGW_READ_ONLY"},
Destination: &readonly, Destination: &readonly,
}, },
&cli.StringFlag{
Name: "metrics-service-name",
Usage: "service name tag for metrics, hostname if blank",
EnvVars: []string{"VGW_METRICS_SERVICE_NAME"},
Aliases: []string{"msn"},
Destination: &metricsService,
},
&cli.StringFlag{ &cli.StringFlag{
Name: "metrics-statsd-servers", Name: "metrics-statsd-servers",
Usage: "StatsD server urls comma separated. e.g. 'statsd.example1.com:8125, statsd.example2.com:8125'", Usage: "StatsD server urls comma separated. e.g. 'statsd1.example.com:8125,statsd2.example.com:8125'",
EnvVars: []string{"VGW_METRICS_STATSD_SERVERS"}, EnvVars: []string{"VGW_METRICS_STATSD_SERVERS"},
Aliases: []string{"mss"}, Aliases: []string{"mss"},
Destination: &statsdServers, Destination: &statsdServers,
@@ -518,6 +526,7 @@ func runGateway(ctx context.Context, be backend.Backend) error {
} }
metricsManager, err := metrics.NewManager(ctx, metrics.Config{ metricsManager, err := metrics.NewManager(ctx, metrics.Config{
ServiceName: metricsService,
StatsdServers: statsdServers, StatsdServers: statsdServers,
}) })
if err != nil { if err != nil {

View File

@@ -41,12 +41,12 @@ type Manager struct {
wg sync.WaitGroup wg sync.WaitGroup
ctx context.Context ctx context.Context
publishers []publisher publishers []publisher
addDataChan chan datapoint addDataChan chan datapoint
gaugeDataChan chan datapoint
} }
type Config struct { type Config struct {
ServiceName string
StatsdServers string StatsdServers string
} }
@@ -55,24 +55,26 @@ func NewManager(ctx context.Context, conf Config) (*Manager, error) {
if len(conf.StatsdServers) == 0 { if len(conf.StatsdServers) == 0 {
return nil, nil return nil, nil
} }
hostname, err := os.Hostname()
if err != nil { if conf.ServiceName == "" {
return nil, fmt.Errorf("failed to get hostname: %w", err) hostname, err := os.Hostname()
if err != nil {
return nil, fmt.Errorf("failed to get hostname: %w", err)
}
conf.ServiceName = hostname
} }
addDataChan := make(chan datapoint, dataItemCount) addDataChan := make(chan datapoint, dataItemCount)
gaugeDataChan := make(chan datapoint, dataItemCount)
mgr := &Manager{ mgr := &Manager{
addDataChan: addDataChan, addDataChan: addDataChan,
gaugeDataChan: gaugeDataChan, ctx: ctx,
ctx: ctx,
} }
statsdServers := strings.Split(conf.StatsdServers, ",") statsdServers := strings.Split(conf.StatsdServers, ",")
for _, server := range statsdServers { for _, server := range statsdServers {
statsd, err := NewStatsd(server, hostname) statsd, err := newStatsd(server, conf.ServiceName)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -81,8 +83,6 @@ func NewManager(ctx context.Context, conf Config) (*Manager, error) {
mgr.wg.Add(1) mgr.wg.Add(1)
go mgr.addForwarder(addDataChan) go mgr.addForwarder(addDataChan)
mgr.wg.Add(1)
go mgr.gaugeForwarder(gaugeDataChan)
return mgr, nil return mgr, nil
} }
@@ -93,35 +93,35 @@ func (m *Manager) Send(err error, action string, count int64) {
action = ActionUndetected action = ActionUndetected
} }
if err != nil { if err != nil {
m.Increment(action, "failed_count") m.increment(action, "failed_count")
} else { } else {
m.Increment(action, "success_count") m.increment(action, "success_count")
} }
switch action { switch action {
case ActionPutObject: case ActionPutObject:
m.Add(action, "bytes_written", count) m.add(action, "bytes_written", count)
m.Increment(action, "object_created_count") m.increment(action, "object_created_count")
case ActionCompleteMultipartUpload: case ActionCompleteMultipartUpload:
m.Increment(action, "object_created_count") m.increment(action, "object_created_count")
case ActionUploadPart: case ActionUploadPart:
m.Add(action, "bytes_written", count) m.add(action, "bytes_written", count)
case ActionGetObject: case ActionGetObject:
m.Add(action, "bytes_read", count) m.add(action, "bytes_read", count)
case ActionDeleteObject: case ActionDeleteObject:
m.Increment(action, "object_removed_count") m.increment(action, "object_removed_count")
case ActionDeleteObjects: case ActionDeleteObjects:
m.Add(action, "object_removed_count", count) m.add(action, "object_removed_count", count)
} }
} }
// Increment increments the key by one // increment increments the key by one
func (m *Manager) Increment(module, key string, tags ...Tag) { func (m *Manager) increment(module, key string, tags ...Tag) {
m.Add(module, key, 1, tags...) m.add(module, key, 1, tags...)
} }
// Add adds value to key // add adds value to key
func (m *Manager) Add(module, key string, value int64, tags ...Tag) { func (m *Manager) add(module, key string, value int64, tags ...Tag) {
if m.ctx.Err() != nil { if m.ctx.Err() != nil {
return return
} }
@@ -140,31 +140,10 @@ func (m *Manager) Add(module, key string, value int64, tags ...Tag) {
} }
} }
// Gauge sets key to value
func (m *Manager) Gauge(module, key string, value int64, tags ...Tag) {
if m.ctx.Err() != nil {
return
}
d := datapoint{
module: module,
key: key,
value: value,
tags: tags,
}
select {
case m.gaugeDataChan <- d:
default:
// channel full, drop the updates
}
}
// Close closes metrics channels, waits for data to complete, closes all plugins // Close closes metrics channels, waits for data to complete, closes all plugins
func (m *Manager) Close() { func (m *Manager) Close() {
// drain the datapoint channels // drain the datapoint channels
close(m.addDataChan) close(m.addDataChan)
close(m.gaugeDataChan)
m.wg.Wait() m.wg.Wait()
// close all publishers // close all publishers
@@ -176,7 +155,6 @@ func (m *Manager) Close() {
// publisher is the interface for interacting with the metrics plugins // publisher is the interface for interacting with the metrics plugins
type publisher interface { type publisher interface {
Add(module, key string, value int64, tags ...Tag) Add(module, key string, value int64, tags ...Tag)
Gauge(module, key string, value int64, tags ...Tag)
Close() Close()
} }
@@ -189,15 +167,6 @@ func (m *Manager) addForwarder(addChan <-chan datapoint) {
m.wg.Done() m.wg.Done()
} }
func (m *Manager) gaugeForwarder(gaugeChan <-chan datapoint) {
for data := range gaugeChan {
for _, s := range m.publishers {
s.Gauge(data.module, data.key, data.value, data.tags...)
}
}
m.wg.Done()
}
type datapoint struct { type datapoint struct {
module string module string
key string key string

View File

@@ -20,44 +20,34 @@ import (
"github.com/smira/go-statsd" "github.com/smira/go-statsd"
) )
// Statsd metrics type // vgwStatsd metrics type
type Statsd struct { type vgwStatsd struct {
c *statsd.Client c *statsd.Client
} }
// NewStatsd takes a server address and returns a statsd merics // newStatsd takes a server address and returns a statsd merics
// Supply service name to be used as a tag to identify the spcific // Supply service name to be used as a tag to identify the spcific
// gateway instance, this may typically be the gateway hostname // gateway instance, this may typically be the gateway hostname
func NewStatsd(server string, service string) (*Statsd, error) { func newStatsd(server string, service string) (*vgwStatsd, error) {
c := statsd.NewClient( c := statsd.NewClient(
server, server,
statsd.MaxPacketSize(1400),
statsd.MetricPrefix("versitygw."), statsd.MetricPrefix("versitygw."),
statsd.TagStyle(statsd.TagFormatInfluxDB), statsd.TagStyle(statsd.TagFormatInfluxDB),
statsd.DefaultTags(statsd.StringTag("service", service)), statsd.DefaultTags(statsd.StringTag("service", service)),
) )
return &Statsd{c: c}, nil return &vgwStatsd{c: c}, nil
} }
// Close closes statsd connections // Close closes statsd connections
func (s *Statsd) Close() { func (s *vgwStatsd) Close() {
s.c.Close() s.c.Close()
} }
// Add adds value to key // Add adds value to key
func (s *Statsd) Add(module, key string, value int64, tags ...Tag) { func (s *vgwStatsd) Add(module, key string, value int64, tags ...Tag) {
stags := make([]statsd.Tag, len(tags)) stags := make([]statsd.Tag, len(tags))
for i, t := range tags { for i, t := range tags {
stags[i] = statsd.StringTag(t.Key, t.Value) stags[i] = statsd.StringTag(t.Key, t.Value)
} }
s.c.Incr(fmt.Sprintf("%v.%v", module, key), value, stags...) s.c.Incr(fmt.Sprintf("%v.%v", module, key), value, stags...)
} }
// Gauge sets key to value
func (s *Statsd) Gauge(module, key string, value int64, tags ...Tag) {
stags := make([]statsd.Tag, len(tags))
for i, t := range tags {
stags[i] = statsd.StringTag(t.Key, t.Value)
}
s.c.Gauge(fmt.Sprintf("%v.%v", module, key), int64(value), stags...)
}