mirror of
https://github.com/versity/versitygw.git
synced 2025-12-23 05:05:16 +00:00
feat: add metrics module for forwarding gateway metrics
This creates a metrics service for the rest of the gateway that can be used to send metrics stats to any number of metrics plugins. To start, a statsd plugin is implemented for generic statsd capability.
This commit is contained in:
1
go.mod
1
go.mod
@@ -16,6 +16,7 @@ require (
|
||||
github.com/nats-io/nats.go v1.35.0
|
||||
github.com/pkg/xattr v0.4.9
|
||||
github.com/segmentio/kafka-go v0.4.47
|
||||
github.com/smira/go-statsd v1.3.3
|
||||
github.com/urfave/cli/v2 v2.27.2
|
||||
github.com/valyala/fasthttp v1.54.0
|
||||
github.com/versity/scoutfs-go v0.0.0-20240325223134-38eb2f5f7d44
|
||||
|
||||
2
go.sum
2
go.sum
@@ -127,6 +127,8 @@ github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf
|
||||
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
|
||||
github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0=
|
||||
github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
|
||||
github.com/smira/go-statsd v1.3.3 h1:WnMlmGTyMpzto+HvOJWRPoLaLlk5EGfzsnlQBcvj4yI=
|
||||
github.com/smira/go-statsd v1.3.3/go.mod h1:RjdsESPgDODtg1VpVVf9MJrEW2Hw0wtRNbmB1CAhu6A=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
|
||||
|
||||
172
metrics/metrics.go
Normal file
172
metrics/metrics.go
Normal file
@@ -0,0 +1,172 @@
|
||||
// Copyright 2024 Versity Software
|
||||
// This file is licensed under the Apache License, Version 2.0
|
||||
// (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var (
|
||||
// max size of data items to buffer before dropping
|
||||
// new incoming data items
|
||||
dataItemCount = 100000
|
||||
)
|
||||
|
||||
// Tag is added metadata for metrics
|
||||
type Tag struct {
|
||||
// Key is tag name
|
||||
Key string
|
||||
// Value is tag data
|
||||
Value string
|
||||
}
|
||||
|
||||
// Manager is a manager of metrics plugins
|
||||
type Manager struct {
|
||||
wg sync.WaitGroup
|
||||
ctx context.Context
|
||||
|
||||
publishers []publisher
|
||||
addDataChan chan datapoint
|
||||
gaugeDataChan chan datapoint
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
StatsdServers []string
|
||||
}
|
||||
|
||||
// NewManager initializes metrics plugins and returns a new metrics manager
|
||||
func NewManager(ctx context.Context, conf Config) (*Manager, error) {
|
||||
hostname, err := os.Hostname()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get hostname: %w", err)
|
||||
}
|
||||
|
||||
addDataChan := make(chan datapoint, dataItemCount)
|
||||
gaugeDataChan := make(chan datapoint, dataItemCount)
|
||||
|
||||
mgr := &Manager{
|
||||
addDataChan: addDataChan,
|
||||
gaugeDataChan: gaugeDataChan,
|
||||
ctx: ctx,
|
||||
}
|
||||
|
||||
for _, server := range conf.StatsdServers {
|
||||
statsd, err := NewStatsd(server, hostname)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mgr.publishers = append(mgr.publishers, statsd)
|
||||
}
|
||||
|
||||
mgr.wg.Add(1)
|
||||
go mgr.addForwarder(addDataChan)
|
||||
mgr.wg.Add(1)
|
||||
go mgr.gaugeForwarder(gaugeDataChan)
|
||||
|
||||
return mgr, nil
|
||||
}
|
||||
|
||||
// Increment increments the key by one
|
||||
func (m *Manager) Increment(module, key string, tags ...Tag) {
|
||||
m.Add(module, key, 1, tags...)
|
||||
}
|
||||
|
||||
// Add adds value to key
|
||||
func (m *Manager) Add(module, key string, value int, tags ...Tag) {
|
||||
if m.ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
d := datapoint{
|
||||
module: module,
|
||||
key: key,
|
||||
value: value,
|
||||
tags: tags,
|
||||
}
|
||||
|
||||
select {
|
||||
case m.addDataChan <- d:
|
||||
default:
|
||||
// channel full, drop the updates
|
||||
}
|
||||
}
|
||||
|
||||
// Gauge sets key to value
|
||||
func (m *Manager) Gauge(module, key string, value int, tags ...Tag) {
|
||||
if m.ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
d := datapoint{
|
||||
module: module,
|
||||
key: key,
|
||||
value: value,
|
||||
tags: tags,
|
||||
}
|
||||
|
||||
select {
|
||||
case m.gaugeDataChan <- d:
|
||||
default:
|
||||
// channel full, drop the updates
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes metrics channels, waits for data to cmplete, closes all plugins
|
||||
func (m *Manager) Close() {
|
||||
// drain the datapoint channels
|
||||
close(m.addDataChan)
|
||||
close(m.gaugeDataChan)
|
||||
m.wg.Wait()
|
||||
|
||||
// close all publishers
|
||||
for _, p := range m.publishers {
|
||||
p.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// publisher is the interface for interacting with the metrics plugins
|
||||
type publisher interface {
|
||||
Add(module, key string, value int, tags ...Tag)
|
||||
Gauge(module, key string, value int, tags ...Tag)
|
||||
Close()
|
||||
}
|
||||
|
||||
func (m *Manager) addForwarder(addChan <-chan datapoint) {
|
||||
for data := range addChan {
|
||||
for _, s := range m.publishers {
|
||||
s.Add(data.module, data.key, data.value, data.tags...)
|
||||
}
|
||||
}
|
||||
m.wg.Done()
|
||||
}
|
||||
|
||||
func (m *Manager) gaugeForwarder(gaugeChan <-chan datapoint) {
|
||||
for data := range gaugeChan {
|
||||
for _, s := range m.publishers {
|
||||
s.Gauge(data.module, data.key, data.value, data.tags...)
|
||||
}
|
||||
}
|
||||
m.wg.Done()
|
||||
}
|
||||
|
||||
type datapoint struct {
|
||||
module string
|
||||
key string
|
||||
value int
|
||||
tags []Tag
|
||||
}
|
||||
63
metrics/statsd.go
Normal file
63
metrics/statsd.go
Normal file
@@ -0,0 +1,63 @@
|
||||
// Copyright 2024 Versity Software
|
||||
// This file is licensed under the Apache License, Version 2.0
|
||||
// (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/smira/go-statsd"
|
||||
)
|
||||
|
||||
// Statsd metrics type
|
||||
type Statsd struct {
|
||||
c *statsd.Client
|
||||
}
|
||||
|
||||
// NewStatsd takes a server address and returns a statsd merics
|
||||
// Supply service name to be used as a tag to identify the spcific
|
||||
// gateway instance, this may typically be the gateway hostname
|
||||
func NewStatsd(server string, service string) (*Statsd, error) {
|
||||
c := statsd.NewClient(
|
||||
server,
|
||||
statsd.MaxPacketSize(1400),
|
||||
statsd.MetricPrefix("versitygw."),
|
||||
statsd.TagStyle(statsd.TagFormatInfluxDB),
|
||||
statsd.DefaultTags(statsd.StringTag("service", service)),
|
||||
)
|
||||
return &Statsd{c: c}, nil
|
||||
}
|
||||
|
||||
// Close closes statsd connections
|
||||
func (s *Statsd) Close() {
|
||||
s.c.Close()
|
||||
}
|
||||
|
||||
// Add adds value to key
|
||||
func (s *Statsd) Add(module, key string, value int, tags ...Tag) {
|
||||
stags := make([]statsd.Tag, len(tags))
|
||||
for i, t := range tags {
|
||||
stags[i] = statsd.StringTag(t.Key, t.Value)
|
||||
}
|
||||
s.c.Incr(fmt.Sprintf("%v.%v", module, key), int64(value), stags...)
|
||||
}
|
||||
|
||||
// Gauge sets key to value
|
||||
func (s *Statsd) Gauge(module, key string, value int, tags ...Tag) {
|
||||
stags := make([]statsd.Tag, len(tags))
|
||||
for i, t := range tags {
|
||||
stags[i] = statsd.StringTag(t.Key, t.Value)
|
||||
}
|
||||
s.c.Gauge(fmt.Sprintf("%v.%v", module, key), int64(value), stags...)
|
||||
}
|
||||
Reference in New Issue
Block a user