diff --git a/cmd/versitygw/main.go b/cmd/versitygw/main.go index ab161fd..16fea8b 100644 --- a/cmd/versitygw/main.go +++ b/cmd/versitygw/main.go @@ -43,6 +43,7 @@ var ( kafkaURL, kafkaTopic, kafkaKey string natsURL, natsTopic string eventWebhookURL string + eventConfigFilePath string logWebhookURL string accessLog string healthPath string @@ -83,6 +84,7 @@ func main() { azureCommand(), adminCommand(), testCommand(), + utilsCommand(), } ctx, cancel := context.WithCancel(context.Background()) @@ -258,6 +260,13 @@ func initFlags() []cli.Flag { Destination: &eventWebhookURL, Aliases: []string{"ewu"}, }, + &cli.StringFlag{ + Name: "event-filter", + Usage: "bucket event notifications filters configuration file path", + EnvVars: []string{"VGW_EVENT_FILTER"}, + Destination: &eventConfigFilePath, + Aliases: []string{"ef"}, + }, &cli.StringFlag{ Name: "iam-dir", Usage: "if defined, run internal iam service within this directory", @@ -490,12 +499,13 @@ func runGateway(ctx context.Context, be backend.Backend) error { } evSender, err := s3event.InitEventSender(&s3event.EventConfig{ - KafkaURL: kafkaURL, - KafkaTopic: kafkaTopic, - KafkaTopicKey: kafkaKey, - NatsURL: natsURL, - NatsTopic: natsTopic, - WebhookURL: eventWebhookURL, + KafkaURL: kafkaURL, + KafkaTopic: kafkaTopic, + KafkaTopicKey: kafkaKey, + NatsURL: natsURL, + NatsTopic: natsTopic, + WebhookURL: eventWebhookURL, + FilterConfigFilePath: eventConfigFilePath, }) if err != nil { return fmt.Errorf("init bucket event notifications: %w", err) diff --git a/cmd/versitygw/test.go b/cmd/versitygw/test.go index 021ffcd..b6c9ec2 100644 --- a/cmd/versitygw/test.go +++ b/cmd/versitygw/test.go @@ -1,3 +1,17 @@ +// Copyright 2023 Versity Software +// This file is licensed under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package main import ( diff --git a/cmd/versitygw/utils.go b/cmd/versitygw/utils.go new file mode 100644 index 0000000..d334338 --- /dev/null +++ b/cmd/versitygw/utils.go @@ -0,0 +1,89 @@ +// Copyright 2023 Versity Software +// This file is licensed under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package main + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + + "github.com/urfave/cli/v2" + "github.com/versity/versitygw/s3event" +) + +func utilsCommand() *cli.Command { + return &cli.Command{ + Name: "utils", + Usage: "utility helper CLI tool", + Subcommands: []*cli.Command{ + { + Name: "gen-event-filter-config", + Aliases: []string{"gefc"}, + Usage: "Create a new configuration file for bucket event notifications filter.", + Action: generateEventFiltersConfig, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "path", + Usage: "the path where the config file has to be created", + Aliases: []string{"p"}, + }, + }, + }, + }, + } +} + +func generateEventFiltersConfig(ctx *cli.Context) error { + pathFlag := ctx.String("path") + path, err := filepath.Abs(filepath.Join(pathFlag, "event_config.json")) + if err != nil { + return err + } + + config := s3event.EventFilter{ + s3event.EventObjectCreated: true, + s3event.EventObjectCreatedPut: true, + s3event.EventObjectCreatedPost: true, + s3event.EventObjectCreatedCopy: true, + s3event.EventCompleteMultipartUpload: true, + s3event.EventObjectDeleted: true, + s3event.EventObjectTagging: true, + s3event.EventObjectTaggingPut: true, + s3event.EventObjectTaggingDelete: true, + s3event.EventObjectAclPut: true, + s3event.EventObjectRestore: true, + s3event.EventObjectRestorePost: true, + s3event.EventObjectRestoreCompleted: true, + } + + configBytes, err := json.Marshal(config) + if err != nil { + return fmt.Errorf("parse event config: %w", err) + } + + file, err := os.Create(path) + if err != nil { + return fmt.Errorf("create config file: %w", err) + } + defer file.Close() + + _, err = file.Write(configBytes) + if err != nil { + return fmt.Errorf("write config file: %w", err) + } + + return nil +} diff --git a/s3api/controllers/base.go b/s3api/controllers/base.go index 6d820ef..c86bec2 100644 --- a/s3api/controllers/base.go +++ b/s3api/controllers/base.go @@ -1387,7 +1387,7 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error { BucketOwner: parsedAcl.Owner, ObjectETag: res.CopyObjectResult.ETag, VersionId: res.VersionId, - EventName: s3event.EventObjectCopy, + EventName: s3event.EventObjectCreatedCopy, }) } else { return SendXMLResponse(ctx, res, err, &MetaOpts{ @@ -1453,7 +1453,7 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error { BucketOwner: parsedAcl.Owner, ObjectETag: &etag, ObjectSize: contentLength, - EventName: s3event.EventObjectPut, + EventName: s3event.EventObjectCreatedPut, }) } @@ -1710,7 +1710,7 @@ func (c S3ApiController) DeleteActions(ctx *fiber.Ctx) error { EvSender: c.evSender, Action: "DeleteObject", BucketOwner: parsedAcl.Owner, - EventName: s3event.EventObjectDelete, + EventName: s3event.EventObjectDeleted, Status: http.StatusNoContent, }) } diff --git a/s3event/event.go b/s3event/event.go index b46fa95..7253ef7 100644 --- a/s3event/event.go +++ b/s3event/event.go @@ -41,22 +41,6 @@ type EventFields struct { Records []EventSchema } -type EventType string - -const ( - EventObjectPut EventType = "s3:ObjectCreated:Put" - EventObjectCopy EventType = "s3:ObjectCreated:Copy" - EventCompleteMultipartUpload EventType = "s3:ObjectCreated:CompleteMultipartUpload" - EventObjectDelete EventType = "s3:ObjectRemoved:Delete" - EventObjectRestoreCompleted EventType = "s3:ObjectRestore:Completed" - EventObjectTaggingPut EventType = "s3:ObjectTagging:Put" - EventObjectTaggingDelete EventType = "s3:ObjectTagging:Delete" - EventObjectAclPut EventType = "s3:ObjectAcl:Put" - // Not supported - // EventObjectRestorePost EventType = "s3:ObjectRestore:Post" - // EventObjectRestoreDelete EventType = "s3:ObjectRestore:Delete" -) - type EventSchema struct { EventVersion string `json:"eventVersion"` EventSource string `json:"eventSource"` @@ -123,26 +107,30 @@ type EventObjectData struct { } type EventConfig struct { - KafkaURL string - KafkaTopic string - KafkaTopicKey string - NatsURL string - NatsTopic string - WebhookURL string + KafkaURL string + KafkaTopic string + KafkaTopicKey string + NatsURL string + NatsTopic string + WebhookURL string + FilterConfigFilePath string } func InitEventSender(cfg *EventConfig) (S3EventSender, error) { + filter, err := parseEventFilters(cfg.FilterConfigFilePath) + if err != nil { + return nil, fmt.Errorf("parse event filter config file %w", err) + } var evSender S3EventSender - var err error switch { case cfg.WebhookURL != "": - evSender, err = InitWebhookEventSender(cfg.WebhookURL) + evSender, err = InitWebhookEventSender(cfg.WebhookURL, filter) fmt.Printf("initializing S3 Event Notifications with webhook URL %v\n", cfg.WebhookURL) case cfg.KafkaURL != "": - evSender, err = InitKafkaEventService(cfg.KafkaURL, cfg.KafkaTopic, cfg.KafkaTopicKey) + evSender, err = InitKafkaEventService(cfg.KafkaURL, cfg.KafkaTopic, cfg.KafkaTopicKey, filter) fmt.Printf("initializing S3 Event Notifications with kafka. URL: %v, topic: %v\n", cfg.WebhookURL, cfg.KafkaTopic) case cfg.NatsURL != "": - evSender, err = InitNatsEventService(cfg.NatsURL, cfg.NatsTopic) + evSender, err = InitNatsEventService(cfg.NatsURL, cfg.NatsTopic, filter) fmt.Printf("initializing S3 Event Notifications with Nats. URL: %v, topic: %v\n", cfg.NatsURL, cfg.NatsTopic) default: return nil, nil diff --git a/s3event/filter.go b/s3event/filter.go new file mode 100644 index 0000000..81fb61c --- /dev/null +++ b/s3event/filter.go @@ -0,0 +1,122 @@ +// Copyright 2023 Versity Software +// This file is licensed under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package s3event + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" +) + +type EventType string + +const ( + EventObjectCreated EventType = "s3:ObjectCreated:*" // ObjectCreated + EventObjectCreatedPut EventType = "s3:ObjectCreated:Put" + EventObjectCreatedPost EventType = "s3:ObjectCreated:Post" + EventObjectCreatedCopy EventType = "s3:ObjectCreated:Copy" + EventCompleteMultipartUpload EventType = "s3:ObjectCreated:CompleteMultipartUpload" + EventObjectDeleted EventType = "s3:ObjectRemoved:Delete" // ObjectRemoved + EventObjectTagging EventType = "s3:ObjectTagging:*" // ObjectTagging + EventObjectTaggingPut EventType = "s3:ObjectTagging:Put" + EventObjectTaggingDelete EventType = "s3:ObjectTagging:Delete" + EventObjectAclPut EventType = "s3:ObjectAcl:Put" + EventObjectRestore EventType = "s3:ObjectRestore:*" // ObjectRestore + EventObjectRestorePost EventType = "s3:ObjectRestore:Post" + EventObjectRestoreCompleted EventType = "s3:ObjectRestore:Completed" + // EventObjectRestorePost EventType = "s3:ObjectRestore:Post" + // EventObjectRestoreDelete EventType = "s3:ObjectRestore:Delete" +) + +func (event EventType) IsValid() bool { + _, ok := supportedEventFilters[event] + return ok +} + +var supportedEventFilters = map[EventType]struct{}{ + EventObjectCreated: {}, + EventObjectCreatedPut: {}, + EventObjectCreatedPost: {}, + EventObjectCreatedCopy: {}, + EventCompleteMultipartUpload: {}, + EventObjectDeleted: {}, + EventObjectTagging: {}, + EventObjectTaggingPut: {}, + EventObjectTaggingDelete: {}, + EventObjectAclPut: {}, + EventObjectRestore: {}, + EventObjectRestorePost: {}, + EventObjectRestoreCompleted: {}, +} + +type EventFilter map[EventType]bool + +func parseEventFilters(path string) (EventFilter, error) { + // if no filter config file path is specified return nil map + if path == "" { + return nil, nil + } + + configFilePath, err := filepath.Abs(path) + if err != nil { + return nil, err + } + + // Open the JSON file + file, err := os.Open(configFilePath) + if err != nil { + return nil, err + } + defer file.Close() + + var filter EventFilter + if err := json.NewDecoder(file).Decode(&filter); err != nil { + return nil, err + } + + if err := filter.Validate(); err != nil { + return nil, err + } + + return filter, nil +} + +func (ef EventFilter) Validate() error { + for event := range ef { + if isValid := event.IsValid(); !isValid { + return fmt.Errorf("invalid configuration property: %v", event) + } + } + + return nil +} + +func (ef EventFilter) Filter(event EventType) bool { + ev, found := ef[event] + if found { + return ev + } + + // check wildcard match + wildCardEv := EventType(string(event[strings.LastIndex(string(event), ":")+1]) + "*") + wildcard, found := ef[wildCardEv] + if found { + return wildcard + } + + return false +} diff --git a/s3event/kafka.go b/s3event/kafka.go index 4bd615e..5123ec1 100644 --- a/s3event/kafka.go +++ b/s3event/kafka.go @@ -30,10 +30,11 @@ var sequencer = 0 type Kafka struct { key string writer *kafka.Writer + filter EventFilter mu sync.Mutex } -func InitKafkaEventService(url, topic, key string) (S3EventSender, error) { +func InitKafkaEventService(url, topic, key string, filter EventFilter) (S3EventSender, error) { if topic == "" { return nil, fmt.Errorf("kafka message topic should be specified") } @@ -65,6 +66,7 @@ func InitKafkaEventService(url, topic, key string) (S3EventSender, error) { return &Kafka{ key: key, writer: w, + filter: filter, }, nil } @@ -72,6 +74,10 @@ func (ks *Kafka) SendEvent(ctx *fiber.Ctx, meta EventMeta) { ks.mu.Lock() defer ks.mu.Unlock() + if ks.filter != nil && !ks.filter.Filter(meta.EventName) { + return + } + schema, err := createEventSchema(ctx, meta, ConfigurationIdKafka) if err != nil { fmt.Fprintf(os.Stderr, "failed to create kafka event: %v\n", err.Error()) diff --git a/s3event/nats.go b/s3event/nats.go index 5ad76ab..d18b7da 100644 --- a/s3event/nats.go +++ b/s3event/nats.go @@ -27,9 +27,10 @@ type NatsEventSender struct { topic string client *nats.Conn mu sync.Mutex + filter EventFilter } -func InitNatsEventService(url, topic string) (S3EventSender, error) { +func InitNatsEventService(url, topic string, filter EventFilter) (S3EventSender, error) { if topic == "" { return nil, fmt.Errorf("nats message topic should be specified") } @@ -52,6 +53,7 @@ func InitNatsEventService(url, topic string) (S3EventSender, error) { return &NatsEventSender{ topic: topic, client: client, + filter: filter, }, nil } @@ -59,6 +61,10 @@ func (ns *NatsEventSender) SendEvent(ctx *fiber.Ctx, meta EventMeta) { ns.mu.Lock() defer ns.mu.Unlock() + if ns.filter != nil && !ns.filter.Filter(meta.EventName) { + return + } + schema, err := createEventSchema(ctx, meta, ConfigurationIdNats) if err != nil { fmt.Fprintf(os.Stderr, "failed to create nats event: %v\n", err.Error()) diff --git a/s3event/webhook.go b/s3event/webhook.go index b17f7c8..f8d9d9e 100644 --- a/s3event/webhook.go +++ b/s3event/webhook.go @@ -29,10 +29,11 @@ import ( type Webhook struct { url string client *http.Client + filter EventFilter mu sync.Mutex } -func InitWebhookEventSender(url string) (S3EventSender, error) { +func InitWebhookEventSender(url string, filter EventFilter) (S3EventSender, error) { if url == "" { return nil, fmt.Errorf("webhook url should be specified") } @@ -63,7 +64,8 @@ func InitWebhookEventSender(url string) (S3EventSender, error) { client: &http.Client{ Timeout: 3 * time.Second, }, - url: url, + url: url, + filter: filter, }, nil } @@ -71,6 +73,10 @@ func (w *Webhook) SendEvent(ctx *fiber.Ctx, meta EventMeta) { w.mu.Lock() defer w.mu.Unlock() + if w.filter != nil && !w.filter.Filter(meta.EventName) { + return + } + schema, err := createEventSchema(ctx, meta, ConfigurationIdWebhook) if err != nil { fmt.Fprintf(os.Stderr, "failed to create webhook event: %v\n", err.Error())