Merge pull request #494 from versity/event-notif-filters

Bucket event notifications filters
This commit is contained in:
Ben McClelland
2024-04-05 17:31:52 -07:00
committed by GitHub
9 changed files with 280 additions and 39 deletions

View File

@@ -43,6 +43,7 @@ var (
kafkaURL, kafkaTopic, kafkaKey string
natsURL, natsTopic string
eventWebhookURL string
eventConfigFilePath string
logWebhookURL string
accessLog string
healthPath string
@@ -83,6 +84,7 @@ func main() {
azureCommand(),
adminCommand(),
testCommand(),
utilsCommand(),
}
ctx, cancel := context.WithCancel(context.Background())
@@ -258,6 +260,13 @@ func initFlags() []cli.Flag {
Destination: &eventWebhookURL,
Aliases: []string{"ewu"},
},
&cli.StringFlag{
Name: "event-filter",
Usage: "bucket event notifications filters configuration file path",
EnvVars: []string{"VGW_EVENT_FILTER"},
Destination: &eventConfigFilePath,
Aliases: []string{"ef"},
},
&cli.StringFlag{
Name: "iam-dir",
Usage: "if defined, run internal iam service within this directory",
@@ -490,12 +499,13 @@ func runGateway(ctx context.Context, be backend.Backend) error {
}
evSender, err := s3event.InitEventSender(&s3event.EventConfig{
KafkaURL: kafkaURL,
KafkaTopic: kafkaTopic,
KafkaTopicKey: kafkaKey,
NatsURL: natsURL,
NatsTopic: natsTopic,
WebhookURL: eventWebhookURL,
KafkaURL: kafkaURL,
KafkaTopic: kafkaTopic,
KafkaTopicKey: kafkaKey,
NatsURL: natsURL,
NatsTopic: natsTopic,
WebhookURL: eventWebhookURL,
FilterConfigFilePath: eventConfigFilePath,
})
if err != nil {
return fmt.Errorf("init bucket event notifications: %w", err)

View File

@@ -1,3 +1,17 @@
// Copyright 2023 Versity Software
// This file is licensed under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package main
import (

89
cmd/versitygw/utils.go Normal file
View File

@@ -0,0 +1,89 @@
// Copyright 2023 Versity Software
// This file is licensed under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package main
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"github.com/urfave/cli/v2"
"github.com/versity/versitygw/s3event"
)
func utilsCommand() *cli.Command {
return &cli.Command{
Name: "utils",
Usage: "utility helper CLI tool",
Subcommands: []*cli.Command{
{
Name: "gen-event-filter-config",
Aliases: []string{"gefc"},
Usage: "Create a new configuration file for bucket event notifications filter.",
Action: generateEventFiltersConfig,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "path",
Usage: "the path where the config file has to be created",
Aliases: []string{"p"},
},
},
},
},
}
}
func generateEventFiltersConfig(ctx *cli.Context) error {
pathFlag := ctx.String("path")
path, err := filepath.Abs(filepath.Join(pathFlag, "event_config.json"))
if err != nil {
return err
}
config := s3event.EventFilter{
s3event.EventObjectCreated: true,
s3event.EventObjectCreatedPut: true,
s3event.EventObjectCreatedPost: true,
s3event.EventObjectCreatedCopy: true,
s3event.EventCompleteMultipartUpload: true,
s3event.EventObjectDeleted: true,
s3event.EventObjectTagging: true,
s3event.EventObjectTaggingPut: true,
s3event.EventObjectTaggingDelete: true,
s3event.EventObjectAclPut: true,
s3event.EventObjectRestore: true,
s3event.EventObjectRestorePost: true,
s3event.EventObjectRestoreCompleted: true,
}
configBytes, err := json.Marshal(config)
if err != nil {
return fmt.Errorf("parse event config: %w", err)
}
file, err := os.Create(path)
if err != nil {
return fmt.Errorf("create config file: %w", err)
}
defer file.Close()
_, err = file.Write(configBytes)
if err != nil {
return fmt.Errorf("write config file: %w", err)
}
return nil
}

View File

@@ -1387,7 +1387,7 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error {
BucketOwner: parsedAcl.Owner,
ObjectETag: res.CopyObjectResult.ETag,
VersionId: res.VersionId,
EventName: s3event.EventObjectCopy,
EventName: s3event.EventObjectCreatedCopy,
})
} else {
return SendXMLResponse(ctx, res, err, &MetaOpts{
@@ -1453,7 +1453,7 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error {
BucketOwner: parsedAcl.Owner,
ObjectETag: &etag,
ObjectSize: contentLength,
EventName: s3event.EventObjectPut,
EventName: s3event.EventObjectCreatedPut,
})
}
@@ -1710,7 +1710,7 @@ func (c S3ApiController) DeleteActions(ctx *fiber.Ctx) error {
EvSender: c.evSender,
Action: "DeleteObject",
BucketOwner: parsedAcl.Owner,
EventName: s3event.EventObjectDelete,
EventName: s3event.EventObjectDeleted,
Status: http.StatusNoContent,
})
}

View File

@@ -41,22 +41,6 @@ type EventFields struct {
Records []EventSchema
}
type EventType string
const (
EventObjectPut EventType = "s3:ObjectCreated:Put"
EventObjectCopy EventType = "s3:ObjectCreated:Copy"
EventCompleteMultipartUpload EventType = "s3:ObjectCreated:CompleteMultipartUpload"
EventObjectDelete EventType = "s3:ObjectRemoved:Delete"
EventObjectRestoreCompleted EventType = "s3:ObjectRestore:Completed"
EventObjectTaggingPut EventType = "s3:ObjectTagging:Put"
EventObjectTaggingDelete EventType = "s3:ObjectTagging:Delete"
EventObjectAclPut EventType = "s3:ObjectAcl:Put"
// Not supported
// EventObjectRestorePost EventType = "s3:ObjectRestore:Post"
// EventObjectRestoreDelete EventType = "s3:ObjectRestore:Delete"
)
type EventSchema struct {
EventVersion string `json:"eventVersion"`
EventSource string `json:"eventSource"`
@@ -123,26 +107,30 @@ type EventObjectData struct {
}
type EventConfig struct {
KafkaURL string
KafkaTopic string
KafkaTopicKey string
NatsURL string
NatsTopic string
WebhookURL string
KafkaURL string
KafkaTopic string
KafkaTopicKey string
NatsURL string
NatsTopic string
WebhookURL string
FilterConfigFilePath string
}
func InitEventSender(cfg *EventConfig) (S3EventSender, error) {
filter, err := parseEventFilters(cfg.FilterConfigFilePath)
if err != nil {
return nil, fmt.Errorf("parse event filter config file %w", err)
}
var evSender S3EventSender
var err error
switch {
case cfg.WebhookURL != "":
evSender, err = InitWebhookEventSender(cfg.WebhookURL)
evSender, err = InitWebhookEventSender(cfg.WebhookURL, filter)
fmt.Printf("initializing S3 Event Notifications with webhook URL %v\n", cfg.WebhookURL)
case cfg.KafkaURL != "":
evSender, err = InitKafkaEventService(cfg.KafkaURL, cfg.KafkaTopic, cfg.KafkaTopicKey)
evSender, err = InitKafkaEventService(cfg.KafkaURL, cfg.KafkaTopic, cfg.KafkaTopicKey, filter)
fmt.Printf("initializing S3 Event Notifications with kafka. URL: %v, topic: %v\n", cfg.WebhookURL, cfg.KafkaTopic)
case cfg.NatsURL != "":
evSender, err = InitNatsEventService(cfg.NatsURL, cfg.NatsTopic)
evSender, err = InitNatsEventService(cfg.NatsURL, cfg.NatsTopic, filter)
fmt.Printf("initializing S3 Event Notifications with Nats. URL: %v, topic: %v\n", cfg.NatsURL, cfg.NatsTopic)
default:
return nil, nil

122
s3event/filter.go Normal file
View File

@@ -0,0 +1,122 @@
// Copyright 2023 Versity Software
// This file is licensed under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package s3event
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
)
type EventType string
const (
EventObjectCreated EventType = "s3:ObjectCreated:*" // ObjectCreated
EventObjectCreatedPut EventType = "s3:ObjectCreated:Put"
EventObjectCreatedPost EventType = "s3:ObjectCreated:Post"
EventObjectCreatedCopy EventType = "s3:ObjectCreated:Copy"
EventCompleteMultipartUpload EventType = "s3:ObjectCreated:CompleteMultipartUpload"
EventObjectDeleted EventType = "s3:ObjectRemoved:Delete" // ObjectRemoved
EventObjectTagging EventType = "s3:ObjectTagging:*" // ObjectTagging
EventObjectTaggingPut EventType = "s3:ObjectTagging:Put"
EventObjectTaggingDelete EventType = "s3:ObjectTagging:Delete"
EventObjectAclPut EventType = "s3:ObjectAcl:Put"
EventObjectRestore EventType = "s3:ObjectRestore:*" // ObjectRestore
EventObjectRestorePost EventType = "s3:ObjectRestore:Post"
EventObjectRestoreCompleted EventType = "s3:ObjectRestore:Completed"
// EventObjectRestorePost EventType = "s3:ObjectRestore:Post"
// EventObjectRestoreDelete EventType = "s3:ObjectRestore:Delete"
)
func (event EventType) IsValid() bool {
_, ok := supportedEventFilters[event]
return ok
}
var supportedEventFilters = map[EventType]struct{}{
EventObjectCreated: {},
EventObjectCreatedPut: {},
EventObjectCreatedPost: {},
EventObjectCreatedCopy: {},
EventCompleteMultipartUpload: {},
EventObjectDeleted: {},
EventObjectTagging: {},
EventObjectTaggingPut: {},
EventObjectTaggingDelete: {},
EventObjectAclPut: {},
EventObjectRestore: {},
EventObjectRestorePost: {},
EventObjectRestoreCompleted: {},
}
type EventFilter map[EventType]bool
func parseEventFilters(path string) (EventFilter, error) {
// if no filter config file path is specified return nil map
if path == "" {
return nil, nil
}
configFilePath, err := filepath.Abs(path)
if err != nil {
return nil, err
}
// Open the JSON file
file, err := os.Open(configFilePath)
if err != nil {
return nil, err
}
defer file.Close()
var filter EventFilter
if err := json.NewDecoder(file).Decode(&filter); err != nil {
return nil, err
}
if err := filter.Validate(); err != nil {
return nil, err
}
return filter, nil
}
func (ef EventFilter) Validate() error {
for event := range ef {
if isValid := event.IsValid(); !isValid {
return fmt.Errorf("invalid configuration property: %v", event)
}
}
return nil
}
func (ef EventFilter) Filter(event EventType) bool {
ev, found := ef[event]
if found {
return ev
}
// check wildcard match
wildCardEv := EventType(string(event[strings.LastIndex(string(event), ":")+1]) + "*")
wildcard, found := ef[wildCardEv]
if found {
return wildcard
}
return false
}

View File

@@ -30,10 +30,11 @@ var sequencer = 0
type Kafka struct {
key string
writer *kafka.Writer
filter EventFilter
mu sync.Mutex
}
func InitKafkaEventService(url, topic, key string) (S3EventSender, error) {
func InitKafkaEventService(url, topic, key string, filter EventFilter) (S3EventSender, error) {
if topic == "" {
return nil, fmt.Errorf("kafka message topic should be specified")
}
@@ -65,6 +66,7 @@ func InitKafkaEventService(url, topic, key string) (S3EventSender, error) {
return &Kafka{
key: key,
writer: w,
filter: filter,
}, nil
}
@@ -72,6 +74,10 @@ func (ks *Kafka) SendEvent(ctx *fiber.Ctx, meta EventMeta) {
ks.mu.Lock()
defer ks.mu.Unlock()
if ks.filter != nil && !ks.filter.Filter(meta.EventName) {
return
}
schema, err := createEventSchema(ctx, meta, ConfigurationIdKafka)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to create kafka event: %v\n", err.Error())

View File

@@ -27,9 +27,10 @@ type NatsEventSender struct {
topic string
client *nats.Conn
mu sync.Mutex
filter EventFilter
}
func InitNatsEventService(url, topic string) (S3EventSender, error) {
func InitNatsEventService(url, topic string, filter EventFilter) (S3EventSender, error) {
if topic == "" {
return nil, fmt.Errorf("nats message topic should be specified")
}
@@ -52,6 +53,7 @@ func InitNatsEventService(url, topic string) (S3EventSender, error) {
return &NatsEventSender{
topic: topic,
client: client,
filter: filter,
}, nil
}
@@ -59,6 +61,10 @@ func (ns *NatsEventSender) SendEvent(ctx *fiber.Ctx, meta EventMeta) {
ns.mu.Lock()
defer ns.mu.Unlock()
if ns.filter != nil && !ns.filter.Filter(meta.EventName) {
return
}
schema, err := createEventSchema(ctx, meta, ConfigurationIdNats)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to create nats event: %v\n", err.Error())

View File

@@ -29,10 +29,11 @@ import (
type Webhook struct {
url string
client *http.Client
filter EventFilter
mu sync.Mutex
}
func InitWebhookEventSender(url string) (S3EventSender, error) {
func InitWebhookEventSender(url string, filter EventFilter) (S3EventSender, error) {
if url == "" {
return nil, fmt.Errorf("webhook url should be specified")
}
@@ -63,7 +64,8 @@ func InitWebhookEventSender(url string) (S3EventSender, error) {
client: &http.Client{
Timeout: 3 * time.Second,
},
url: url,
url: url,
filter: filter,
}, nil
}
@@ -71,6 +73,10 @@ func (w *Webhook) SendEvent(ctx *fiber.Ctx, meta EventMeta) {
w.mu.Lock()
defer w.mu.Unlock()
if w.filter != nil && !w.filter.Filter(meta.EventName) {
return
}
schema, err := createEventSchema(ctx, meta, ConfigurationIdWebhook)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to create webhook event: %v\n", err.Error())