diff --git a/cmd/versitygw/main.go b/cmd/versitygw/main.go index f5b84ef..8741391 100644 --- a/cmd/versitygw/main.go +++ b/cmd/versitygw/main.go @@ -45,6 +45,8 @@ var ( certFile, keyFile string kafkaURL, kafkaTopic, kafkaKey string natsURL, natsTopic string + rabbitmqURL, rabbitmqExchange string + rabbitmqRoutingKey string eventWebhookURL string eventConfigFilePath string logWebhookURL, accessLog string @@ -288,6 +290,27 @@ func initFlags() []cli.Flag { Destination: &natsTopic, Aliases: []string{"ent"}, }, + &cli.StringFlag{ + Name: "event-rabbitmq-url", + Usage: "rabbitmq server url to send the bucket notifications (amqp or amqps scheme)", + EnvVars: []string{"VGW_EVENT_RABBITMQ_URL"}, + Destination: &rabbitmqURL, + Aliases: []string{"eru"}, + }, + &cli.StringFlag{ + Name: "event-rabbitmq-exchange", + Usage: "rabbitmq exchange to publish bucket notifications to (blank for default)", + EnvVars: []string{"VGW_EVENT_RABBITMQ_EXCHANGE"}, + Destination: &rabbitmqExchange, + Aliases: []string{"ere"}, + }, + &cli.StringFlag{ + Name: "event-rabbitmq-routing-key", + Usage: "rabbitmq routing key when publishing bucket notifications (defaults to bucket name when blank)", + EnvVars: []string{"VGW_EVENT_RABBITMQ_ROUTING_KEY"}, + Destination: &rabbitmqRoutingKey, + Aliases: []string{"errk"}, + }, &cli.StringFlag{ Name: "event-webhook-url", Usage: "webhook url to send bucket notifications", @@ -693,6 +716,9 @@ func runGateway(ctx context.Context, be backend.Backend) error { KafkaTopicKey: kafkaKey, NatsURL: natsURL, NatsTopic: natsTopic, + RabbitmqURL: rabbitmqURL, + RabbitmqExchange: rabbitmqExchange, + RabbitmqRoutingKey: rabbitmqRoutingKey, WebhookURL: eventWebhookURL, FilterConfigFilePath: eventConfigFilePath, }) diff --git a/extra/example.conf b/extra/example.conf index 2344bad..74489f5 100644 --- a/extra/example.conf +++ b/extra/example.conf @@ -169,6 +169,19 @@ ROOT_SECRET_ACCESS_KEY= #VGW_EVENT_NATS_URL= #VGW_EVENT_NATS_TOPIC= +# Bucket events can be sent to a RabbitMQ messaging service. When +# VGW_EVENT_RABBITMQ_URL is specified, events will be published to the specified +# exchange (VGW_EVENT_RABBITMQ_EXCHANGE) using the routing key +# (VGW_EVENT_RABBITMQ_ROUTING_KEY). If exchange is blank the default exchange is +# used. If routing key is blank, it will be left empty (the server can bind a +# queue with an empty binding key or you can set an explicit key). +# Example URL formats: +# amqp://user:pass@rabbitmq:5672/ +# amqps://user:pass@rabbitmq:5671/vhost +#VGW_EVENT_RABBITMQ_URL= +#VGW_EVENT_RABBITMQ_EXCHANGE= +#VGW_EVENT_RABBITMQ_ROUTING_KEY= + # Bucket events can be sent to a webhook. When VGW_EVENT_WEBHOOK_URL is # specified, all configured bucket events will be sent to the webhook. #VGW_EVENT_WEBHOOK_URL= diff --git a/go.mod b/go.mod index 230eac5..cab7be5 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/nats-io/nats.go v1.44.0 github.com/oklog/ulid/v2 v2.1.1 github.com/pkg/xattr v0.4.12 + github.com/rabbitmq/amqp091-go v1.10.0 github.com/segmentio/kafka-go v0.4.48 github.com/smira/go-statsd v1.3.4 github.com/stretchr/testify v1.10.0 diff --git a/go.sum b/go.sum index 8e2cd3a..22da766 100644 --- a/go.sum +++ b/go.sum @@ -147,6 +147,8 @@ github.com/pkg/xattr v0.4.12 h1:rRTkSyFNTRElv6pkA3zpjHpQ90p/OdHQC1GmGh1aTjM= github.com/pkg/xattr v0.4.12/go.mod h1:di8WF84zAKk8jzR1UBTEWh9AUlIZZ7M/JNt8e9B6ktU= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= @@ -192,6 +194,8 @@ github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZ github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= diff --git a/s3event/event.go b/s3event/event.go index 37bac42..2e6f53a 100644 --- a/s3event/event.go +++ b/s3event/event.go @@ -72,9 +72,10 @@ type ConfigurationId string // This field will be changed after implementing per bucket notifications const ( - ConfigurationIdKafka ConfigurationId = "kafka-global" - ConfigurationIdNats ConfigurationId = "nats-global" - ConfigurationIdWebhook ConfigurationId = "webhook-global" + ConfigurationIdKafka ConfigurationId = "kafka-global" + ConfigurationIdNats ConfigurationId = "nats-global" + ConfigurationIdWebhook ConfigurationId = "webhook-global" + ConfigurationIdRabbitMQ ConfigurationId = "rabbitmq-global" ) type EventS3Data struct { @@ -113,6 +114,9 @@ type EventConfig struct { KafkaTopicKey string NatsURL string NatsTopic string + RabbitmqURL string + RabbitmqExchange string + RabbitmqRoutingKey string WebhookURL string FilterConfigFilePath string } @@ -133,6 +137,9 @@ func InitEventSender(cfg *EventConfig) (S3EventSender, error) { case cfg.NatsURL != "": evSender, err = InitNatsEventService(cfg.NatsURL, cfg.NatsTopic, filter) fmt.Printf("initializing S3 Event Notifications with Nats. URL: %v, topic: %v\n", cfg.NatsURL, cfg.NatsTopic) + case cfg.RabbitmqURL != "": + evSender, err = InitRabbitmqEventService(cfg.RabbitmqURL, cfg.RabbitmqExchange, cfg.RabbitmqRoutingKey, filter) + fmt.Printf("initializing S3 Event Notifications with RabbitMQ. URL: %v, exchange: %v, routing key: %v\n", cfg.RabbitmqURL, cfg.RabbitmqExchange, cfg.RabbitmqRoutingKey) default: return nil, nil } diff --git a/s3event/rabbitmq.go b/s3event/rabbitmq.go new file mode 100644 index 0000000..fb35f11 --- /dev/null +++ b/s3event/rabbitmq.go @@ -0,0 +1,150 @@ +// Copyright 2023 Versity Software +// This file is licensed under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package s3event + +import ( + "encoding/json" + "encoding/xml" + "fmt" + "os" + "sync" + "time" + + "github.com/gofiber/fiber/v2" + "github.com/google/uuid" + amqp "github.com/rabbitmq/amqp091-go" + "github.com/versity/versitygw/s3response" +) + +// RabbitmqEventSender sends S3 events to a RabbitMQ exchange/queue. +// It mirrors the behavior of the Kafka and NATS implementations: send a +// test event on initialization to validate configuration, filter events, +// and handle multi-delete payloads. +type RabbitmqEventSender struct { + url string + exchange string + routingKey string + conn *amqp.Connection + channel *amqp.Channel + mu sync.Mutex + filter EventFilter +} + +// InitRabbitmqEventService creates a RabbitMQ sender. If exchange is blank the +// default (empty string) exchange is used. If routingKey is blank we publish +// with an empty routing key; for delete object multi-events we override the +// routing key with the bucket name (object key not suitable as key routinely has '/') +func InitRabbitmqEventService(url, exchange, routingKey string, filter EventFilter) (S3EventSender, error) { + if url == "" { + return nil, fmt.Errorf("rabbitmq url should be specified") + } + + conn, err := amqp.Dial(url) + if err != nil { + return nil, fmt.Errorf("rabbitmq connect: %w", err) + } + + ch, err := conn.Channel() + if err != nil { + conn.Close() + return nil, fmt.Errorf("rabbitmq channel: %w", err) + } + + // Send a test event to validate publishing works. Use a transient message. + testMsg, err := generateTestEvent() + if err != nil { + ch.Close() + conn.Close() + return nil, fmt.Errorf("rabbitmq generate test event: %w", err) + } + + pub := amqp.Publishing{Timestamp: time.Now(), ContentType: fiber.MIMEApplicationJSON, Body: testMsg, MessageId: uuid.NewString()} + if err := ch.Publish(exchange, routingKey, false, false, pub); err != nil { + ch.Close() + conn.Close() + return nil, fmt.Errorf("rabbitmq publish test event: %w", err) + } + + return &RabbitmqEventSender{ + url: url, + exchange: exchange, + routingKey: routingKey, + conn: conn, + channel: ch, + filter: filter, + }, nil +} + +func (rs *RabbitmqEventSender) SendEvent(ctx *fiber.Ctx, meta EventMeta) { + rs.mu.Lock() + defer rs.mu.Unlock() + + if rs.filter != nil && !rs.filter.Filter(meta.EventName) { + return + } + + if meta.EventName == EventObjectRemovedDeleteObjects { + var dObj s3response.DeleteObjects + if err := xml.Unmarshal(ctx.Body(), &dObj); err != nil { + fmt.Fprintf(os.Stderr, "failed to parse delete objects input payload: %v\n", err.Error()) + return + } + for _, obj := range dObj.Objects { + key := *obj.Key + schema := createEventSchema(ctx, meta, ConfigurationIdRabbitMQ) + schema.Records[0].S3.Object.Key = key + schema.Records[0].S3.Object.VersionId = obj.VersionId + go rs.send(schema) + } + return + } + + schema := createEventSchema(ctx, meta, ConfigurationIdRabbitMQ) + go rs.send(schema) +} + +func (rs *RabbitmqEventSender) Close() error { + var firstErr error + if rs.channel != nil { + if err := rs.channel.Close(); err != nil { + firstErr = err + } + } + if rs.conn != nil { + if err := rs.conn.Close(); err != nil && firstErr == nil { + firstErr = err + } + } + return firstErr +} + +func (rs *RabbitmqEventSender) send(event EventSchema) { + body, err := json.Marshal(event) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to marshal event data: %v\n", err.Error()) + return + } + + msg := amqp.Publishing{ + Timestamp: time.Now(), + ContentType: fiber.MIMEApplicationJSON, + Body: body, + MessageId: uuid.NewString(), + } + + if err := rs.channel.Publish(rs.exchange, rs.routingKey, false, false, msg); err != nil { + fmt.Fprintf(os.Stderr, "failed to send rabbitmq event: %v\n", err.Error()) + } +}