From 81d6635fe967cd605f559a42e03969803199f31a Mon Sep 17 00:00:00 2001 From: jonaustin09 Date: Tue, 2 Apr 2024 15:17:36 -0400 Subject: [PATCH] feat: Adeed webhook URL support for bucket event notifications. Made some bug fixing and refactoring in event sender and audit logger interfaces --- cmd/versitygw/main.go | 21 ++++++++- s3event/event.go | 103 ++++++++++++++++++++++++++++++++++++++---- s3event/kafka.go | 83 +++++++--------------------------- s3event/nats.go | 73 +++++++++--------------------- s3event/webhook.go | 102 +++++++++++++++++++++++++++++++++++++++++ s3log/file.go | 7 +-- s3log/webhook.go | 7 +-- 7 files changed, 261 insertions(+), 135 deletions(-) create mode 100644 s3event/webhook.go diff --git a/cmd/versitygw/main.go b/cmd/versitygw/main.go index b5bc888..ab161fd 100644 --- a/cmd/versitygw/main.go +++ b/cmd/versitygw/main.go @@ -42,6 +42,7 @@ var ( certFile, keyFile string kafkaURL, kafkaTopic, kafkaKey string natsURL, natsTopic string + eventWebhookURL string logWebhookURL string accessLog string healthPath string @@ -250,6 +251,13 @@ func initFlags() []cli.Flag { Destination: &natsTopic, Aliases: []string{"ent"}, }, + &cli.StringFlag{ + Name: "event-webhook-url", + Usage: "webhook url to send bucket notifications", + EnvVars: []string{"VGW_EVENT_WEBHOOK_URL"}, + Destination: &eventWebhookURL, + Aliases: []string{"ewu"}, + }, &cli.StringFlag{ Name: "iam-dir", Usage: "if defined, run internal iam service within this directory", @@ -487,9 +495,10 @@ func runGateway(ctx context.Context, be backend.Backend) error { KafkaTopicKey: kafkaKey, NatsURL: natsURL, NatsTopic: natsTopic, + WebhookURL: eventWebhookURL, }) if err != nil { - return fmt.Errorf("unable to connect to the message broker: %w", err) + return fmt.Errorf("init bucket event notifications: %w", err) } srv, err := s3api.New(app, be, middlewares.RootUserConfig{ @@ -548,5 +557,15 @@ Loop: } } + if evSender != nil { + err := evSender.Close() + if err != nil { + if saveErr == nil { + saveErr = err + } + fmt.Fprintf(os.Stderr, "close event sender: %v\n", err) + } + } + return saveErr } diff --git a/s3event/event.go b/s3event/event.go index 74e97f4..b46fa95 100644 --- a/s3event/event.go +++ b/s3event/event.go @@ -15,13 +15,18 @@ package s3event import ( + "encoding/json" "fmt" + "strings" + "time" "github.com/gofiber/fiber/v2" + "github.com/versity/versitygw/auth" ) type S3EventSender interface { SendEvent(ctx *fiber.Ctx, meta EventMeta) + Close() error } type EventMeta struct { @@ -78,9 +83,18 @@ type EventResponseElements struct { HostId string `json:"x-amz-id-2"` } +type ConfigurationId string + +// This field will be changed after implementing per bucket notifications +const ( + ConfigurationIdKafka ConfigurationId = "kafka-global" + ConfigurationIdNats ConfigurationId = "nats-global" + ConfigurationIdWebhook ConfigurationId = "webhook-global" +) + type EventS3Data struct { S3SchemaVersion string `json:"s3SchemaVersion"` - ConfigurationId string `json:"configurationId"` + ConfigurationId ConfigurationId `json:"configurationId"` Bucket EventS3BucketData `json:"bucket"` Object EventObjectData `json:"object"` } @@ -114,17 +128,86 @@ type EventConfig struct { KafkaTopicKey string NatsURL string NatsTopic string + WebhookURL string } func InitEventSender(cfg *EventConfig) (S3EventSender, error) { - if cfg.KafkaURL != "" && cfg.NatsURL != "" { - return nil, fmt.Errorf("there should be specified one of the following: kafka, nats") + var evSender S3EventSender + var err error + switch { + case cfg.WebhookURL != "": + evSender, err = InitWebhookEventSender(cfg.WebhookURL) + fmt.Printf("initializing S3 Event Notifications with webhook URL %v\n", cfg.WebhookURL) + case cfg.KafkaURL != "": + evSender, err = InitKafkaEventService(cfg.KafkaURL, cfg.KafkaTopic, cfg.KafkaTopicKey) + fmt.Printf("initializing S3 Event Notifications with kafka. URL: %v, topic: %v\n", cfg.WebhookURL, cfg.KafkaTopic) + case cfg.NatsURL != "": + evSender, err = InitNatsEventService(cfg.NatsURL, cfg.NatsTopic) + fmt.Printf("initializing S3 Event Notifications with Nats. URL: %v, topic: %v\n", cfg.NatsURL, cfg.NatsTopic) + default: + return nil, nil } - if cfg.NatsURL != "" { - return InitNatsEventService(cfg.NatsURL, cfg.NatsTopic) - } - if cfg.KafkaURL != "" { - return InitKafkaEventService(cfg.KafkaURL, cfg.KafkaTopic, cfg.KafkaTopicKey) - } - return nil, nil + + return evSender, err +} + +func createEventSchema(ctx *fiber.Ctx, meta EventMeta, configId ConfigurationId) ([]byte, error) { + path := strings.Split(ctx.Path(), "/") + bucket, object := path[1], strings.Join(path[2:], "/") + acc := ctx.Locals("account").(auth.Account) + + event := []EventSchema{ + { + EventVersion: "2.2", + EventSource: "aws:s3", + AwsRegion: ctx.Locals("region").(string), + EventTime: time.Now().Format(time.RFC3339), + EventName: meta.EventName, + UserIdentity: EventUserIdentity{ + PrincipalId: acc.Access, + }, + RequestParameters: EventRequestParams{ + SourceIPAddress: ctx.IP(), + }, + ResponseElements: EventResponseElements{ + RequestId: ctx.Get("X-Amz-Request-Id"), + HostId: ctx.Get("X-Amz-Id-2"), + }, + S3: EventS3Data{ + S3SchemaVersion: "1.0", + ConfigurationId: configId, + Bucket: EventS3BucketData{ + Name: bucket, + OwnerIdentity: EventUserIdentity{ + PrincipalId: meta.BucketOwner, + }, + Arn: fmt.Sprintf("arn:aws:s3:::%v", strings.Join(path, "/")), + }, + Object: EventObjectData{ + Key: object, + Size: meta.ObjectSize, + ETag: meta.ObjectETag, + VersionId: meta.VersionId, + Sequencer: genSequencer(), + }, + }, + GlacierEventData: EventGlacierData{ + // Not supported + RestoreEventData: EventRestoreData{}, + }, + }, + } + + return json.Marshal(event) +} + +func generateTestEvent() ([]byte, error) { + msg := map[string]string{ + "Service": "S3", + "Event": "s3:TestEvent", + "Time": time.Now().Format(time.RFC3339), + "Bucket": "Test-Bucket", + } + + return json.Marshal(msg) } diff --git a/s3event/kafka.go b/s3event/kafka.go index 94c444e..4bd615e 100644 --- a/s3event/kafka.go +++ b/s3event/kafka.go @@ -16,10 +16,8 @@ package s3event import ( "context" - "encoding/json" "fmt" "os" - "strings" "sync" "time" @@ -47,26 +45,19 @@ func InitKafkaEventService(url, topic, key string) (S3EventSender, error) { BatchTimeout: 5 * time.Millisecond, }) - msg := map[string]string{ - "Service": "S3", - "Event": "s3:TestEvent", - "Time": time.Now().Format(time.RFC3339), - "Bucket": "Test-Bucket", - } - - msgJSON, err := json.Marshal(msg) + msg, err := generateTestEvent() if err != nil { - return nil, err + return nil, fmt.Errorf("kafka generate test event: %w", err) } message := kafka.Message{ Key: []byte(key), - Value: msgJSON, + Value: msg, } - ctx := context.Background() - + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) err = w.WriteMessages(ctx, message) + cancel() if err != nil { return nil, err } @@ -81,67 +72,27 @@ func (ks *Kafka) SendEvent(ctx *fiber.Ctx, meta EventMeta) { ks.mu.Lock() defer ks.mu.Unlock() - path := strings.Split(ctx.Path(), "/") - bucket, object := path[1], strings.Join(path[2:], "/") - - schema := EventSchema{ - EventVersion: "2.2", - EventSource: "aws:s3", - AwsRegion: ctx.Locals("region").(string), - EventTime: time.Now().Format(time.RFC3339), - EventName: meta.EventName, - UserIdentity: EventUserIdentity{ - PrincipalId: ctx.Locals("access").(string), - }, - RequestParameters: EventRequestParams{ - SourceIPAddress: ctx.IP(), - }, - ResponseElements: EventResponseElements{ - RequestId: ctx.Get("X-Amz-Request-Id"), - HostId: ctx.Get("X-Amx-Id-2"), - }, - S3: EventS3Data{ - S3SchemaVersion: "1.0", - // This field will come up after implementing per bucket notifications - ConfigurationId: "kafka-global", - Bucket: EventS3BucketData{ - Name: bucket, - OwnerIdentity: EventUserIdentity{ - PrincipalId: ctx.Locals("access").(string), - }, - Arn: fmt.Sprintf("arn:aws:s3:::%v", strings.Join(path, "/")), - }, - Object: EventObjectData{ - Key: object, - Size: meta.ObjectSize, - ETag: meta.ObjectETag, - VersionId: meta.VersionId, - Sequencer: genSequencer(), - }, - }, - GlacierEventData: EventGlacierData{ - // Not supported - RestoreEventData: EventRestoreData{}, - }, - } - - ks.send([]EventSchema{schema}) -} - -func (ks *Kafka) send(evnt []EventSchema) { - msg, err := json.Marshal(evnt) + schema, err := createEventSchema(ctx, meta, ConfigurationIdKafka) if err != nil { - fmt.Fprintf(os.Stderr, "failed to parse the event data: %v\n", err.Error()) + fmt.Fprintf(os.Stderr, "failed to create kafka event: %v\n", err.Error()) return } + go ks.send(schema) +} + +func (ks *Kafka) Close() error { + return ks.writer.Close() +} + +func (ks *Kafka) send(event []byte) { message := kafka.Message{ Key: []byte(ks.key), - Value: msg, + Value: event, } ctx := context.Background() - err = ks.writer.WriteMessages(ctx, message) + err := ks.writer.WriteMessages(ctx, message) if err != nil { fmt.Fprintf(os.Stderr, "failed to send kafka event: %v\n", err.Error()) } diff --git a/s3event/nats.go b/s3event/nats.go index 822c383..5ad76ab 100644 --- a/s3event/nats.go +++ b/s3event/nats.go @@ -15,12 +15,9 @@ package s3event import ( - "encoding/json" "fmt" "os" - "strings" "sync" - "time" "github.com/gofiber/fiber/v2" "github.com/nats-io/nats.go" @@ -42,6 +39,16 @@ func InitNatsEventService(url, topic string) (S3EventSender, error) { return nil, err } + msg, err := generateTestEvent() + if err != nil { + return nil, fmt.Errorf("nats generate test event: %w", err) + } + + err = client.Publish(topic, msg) + if err != nil { + return nil, fmt.Errorf("nats publish test event: %v", err) + } + return &NatsEventSender{ topic: topic, client: client, @@ -52,60 +59,22 @@ func (ns *NatsEventSender) SendEvent(ctx *fiber.Ctx, meta EventMeta) { ns.mu.Lock() defer ns.mu.Unlock() - path := strings.Split(ctx.Path(), "/") - bucket, object := path[1], strings.Join(path[2:], "/") - - schema := EventSchema{ - EventVersion: "2.2", - EventSource: "aws:s3", - AwsRegion: ctx.Locals("region").(string), - EventTime: time.Now().Format(time.RFC3339), - EventName: meta.EventName, - UserIdentity: EventUserIdentity{ - PrincipalId: ctx.Locals("access").(string), - }, - RequestParameters: EventRequestParams{ - SourceIPAddress: ctx.IP(), - }, - ResponseElements: EventResponseElements{ - RequestId: ctx.Get("X-Amz-Request-Id"), - HostId: ctx.Get("X-Amx-Id-2"), - }, - S3: EventS3Data{ - S3SchemaVersion: "1.0", - // This field will come up after implementing per bucket notifications - ConfigurationId: "nats-global", - Bucket: EventS3BucketData{ - Name: bucket, - OwnerIdentity: EventUserIdentity{ - PrincipalId: ctx.Locals("access").(string), - }, - Arn: fmt.Sprintf("arn:aws:s3:::%v", strings.Join(path, "/")), - }, - Object: EventObjectData{ - Key: object, - Size: meta.ObjectSize, - ETag: meta.ObjectETag, - VersionId: meta.VersionId, - Sequencer: genSequencer(), - }, - }, - GlacierEventData: EventGlacierData{ - // Not supported - RestoreEventData: EventRestoreData{}, - }, + schema, err := createEventSchema(ctx, meta, ConfigurationIdNats) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to create nats event: %v\n", err.Error()) + return } - ns.send([]EventSchema{schema}) + go ns.send(schema) } -func (ns *NatsEventSender) send(evnt []EventSchema) { - msg, err := json.Marshal(evnt) - if err != nil { - fmt.Fprintf(os.Stderr, "failed to parse the event data: %v\n", err.Error()) - } +func (ns *NatsEventSender) Close() error { + ns.client.Close() + return nil +} - err = ns.client.Publish(ns.topic, msg) +func (ns *NatsEventSender) send(event []byte) { + err := ns.client.Publish(ns.topic, event) if err != nil { fmt.Fprintf(os.Stderr, "failed to send nats event: %v\n", err.Error()) } diff --git a/s3event/webhook.go b/s3event/webhook.go new file mode 100644 index 0000000..b17f7c8 --- /dev/null +++ b/s3event/webhook.go @@ -0,0 +1,102 @@ +// Copyright 2023 Versity Software +// This file is licensed under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package s3event + +import ( + "bytes" + "fmt" + "net" + "net/http" + "os" + "sync" + "time" + + "github.com/gofiber/fiber/v2" +) + +type Webhook struct { + url string + client *http.Client + mu sync.Mutex +} + +func InitWebhookEventSender(url string) (S3EventSender, error) { + if url == "" { + return nil, fmt.Errorf("webhook url should be specified") + } + + client := &http.Client{ + Timeout: time.Second * 1, + } + + testEv, err := generateTestEvent() + if err != nil { + return nil, fmt.Errorf("webhook generate test event: %w", err) + } + + req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(testEv)) + if err != nil { + return nil, fmt.Errorf("create webhook http request: %w", err) + } + req.Header.Set("Content-Type", "application/json; charset=utf-8") + + _, err = client.Do(req) + if err != nil { + if err, ok := err.(net.Error); ok && !err.Timeout() { + return nil, fmt.Errorf("send webhook test event: %w", err) + } + } + + return &Webhook{ + client: &http.Client{ + Timeout: 3 * time.Second, + }, + url: url, + }, nil +} + +func (w *Webhook) SendEvent(ctx *fiber.Ctx, meta EventMeta) { + w.mu.Lock() + defer w.mu.Unlock() + + schema, err := createEventSchema(ctx, meta, ConfigurationIdWebhook) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to create webhook event: %v\n", err.Error()) + return + } + + go w.send(schema) +} + +func (w *Webhook) Close() error { + return nil +} + +func (w *Webhook) send(event []byte) { + req, err := http.NewRequest(http.MethodPost, w.url, bytes.NewReader(event)) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to create webhook event request: %v\n", err.Error()) + return + } + + req.Header.Set("Content-Type", "application/json; charset=utf-8") + + _, err = w.client.Do(req) + if err != nil { + if err, ok := err.(net.Error); ok && !err.Timeout() { + fmt.Fprintf(os.Stderr, "failed to send webhook event: %v\n", err.Error()) + } + } +} diff --git a/s3log/file.go b/s3log/file.go index 33e70b4..9d36fff 100644 --- a/s3log/file.go +++ b/s3log/file.go @@ -23,6 +23,7 @@ import ( "time" "github.com/gofiber/fiber/v2" + "github.com/versity/versitygw/auth" "github.com/versity/versitygw/s3err" ) @@ -88,9 +89,9 @@ func (f *FileLogger) Log(ctx *fiber.Ctx, err error, body []byte, meta LogMeta) { } } - switch ctx.Locals("access").(type) { - case string: - access = ctx.Locals("access").(string) + switch ctx.Locals("account").(type) { + case auth.Account: + access = ctx.Locals("account").(auth.Account).Access } lf.BucketOwner = meta.BucketOwner diff --git a/s3log/webhook.go b/s3log/webhook.go index bc82726..1fe7cb6 100644 --- a/s3log/webhook.go +++ b/s3log/webhook.go @@ -27,6 +27,7 @@ import ( "time" "github.com/gofiber/fiber/v2" + "github.com/versity/versitygw/auth" "github.com/versity/versitygw/s3err" ) @@ -85,9 +86,9 @@ func (wl *WebhookLogger) Log(ctx *fiber.Ctx, err error, body []byte, meta LogMet } } - switch ctx.Locals("access").(type) { - case string: - access = ctx.Locals("access").(string) + switch ctx.Locals("account").(type) { + case auth.Account: + access = ctx.Locals("account").(auth.Account).Access } lf.BucketOwner = meta.BucketOwner