feat: Closes #475, Implemented filters for s3 bucket event notifications, created a utility CLI command to create config file

This commit is contained in:
jonaustin09
2024-04-04 15:20:17 -04:00
committed by Ben McClelland
parent 6ddd3c340f
commit dbc0ad4325
9 changed files with 280 additions and 39 deletions

View File

@@ -41,22 +41,6 @@ type EventFields struct {
Records []EventSchema
}
type EventType string
const (
EventObjectPut EventType = "s3:ObjectCreated:Put"
EventObjectCopy EventType = "s3:ObjectCreated:Copy"
EventCompleteMultipartUpload EventType = "s3:ObjectCreated:CompleteMultipartUpload"
EventObjectDelete EventType = "s3:ObjectRemoved:Delete"
EventObjectRestoreCompleted EventType = "s3:ObjectRestore:Completed"
EventObjectTaggingPut EventType = "s3:ObjectTagging:Put"
EventObjectTaggingDelete EventType = "s3:ObjectTagging:Delete"
EventObjectAclPut EventType = "s3:ObjectAcl:Put"
// Not supported
// EventObjectRestorePost EventType = "s3:ObjectRestore:Post"
// EventObjectRestoreDelete EventType = "s3:ObjectRestore:Delete"
)
type EventSchema struct {
EventVersion string `json:"eventVersion"`
EventSource string `json:"eventSource"`
@@ -123,26 +107,30 @@ type EventObjectData struct {
}
type EventConfig struct {
KafkaURL string
KafkaTopic string
KafkaTopicKey string
NatsURL string
NatsTopic string
WebhookURL string
KafkaURL string
KafkaTopic string
KafkaTopicKey string
NatsURL string
NatsTopic string
WebhookURL string
FilterConfigFilePath string
}
func InitEventSender(cfg *EventConfig) (S3EventSender, error) {
filter, err := parseEventFilters(cfg.FilterConfigFilePath)
if err != nil {
return nil, fmt.Errorf("parse event filter config file %w", err)
}
var evSender S3EventSender
var err error
switch {
case cfg.WebhookURL != "":
evSender, err = InitWebhookEventSender(cfg.WebhookURL)
evSender, err = InitWebhookEventSender(cfg.WebhookURL, filter)
fmt.Printf("initializing S3 Event Notifications with webhook URL %v\n", cfg.WebhookURL)
case cfg.KafkaURL != "":
evSender, err = InitKafkaEventService(cfg.KafkaURL, cfg.KafkaTopic, cfg.KafkaTopicKey)
evSender, err = InitKafkaEventService(cfg.KafkaURL, cfg.KafkaTopic, cfg.KafkaTopicKey, filter)
fmt.Printf("initializing S3 Event Notifications with kafka. URL: %v, topic: %v\n", cfg.WebhookURL, cfg.KafkaTopic)
case cfg.NatsURL != "":
evSender, err = InitNatsEventService(cfg.NatsURL, cfg.NatsTopic)
evSender, err = InitNatsEventService(cfg.NatsURL, cfg.NatsTopic, filter)
fmt.Printf("initializing S3 Event Notifications with Nats. URL: %v, topic: %v\n", cfg.NatsURL, cfg.NatsTopic)
default:
return nil, nil

122
s3event/filter.go Normal file
View File

@@ -0,0 +1,122 @@
// Copyright 2023 Versity Software
// This file is licensed under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package s3event
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
)
type EventType string
const (
EventObjectCreated EventType = "s3:ObjectCreated:*" // ObjectCreated
EventObjectCreatedPut EventType = "s3:ObjectCreated:Put"
EventObjectCreatedPost EventType = "s3:ObjectCreated:Post"
EventObjectCreatedCopy EventType = "s3:ObjectCreated:Copy"
EventCompleteMultipartUpload EventType = "s3:ObjectCreated:CompleteMultipartUpload"
EventObjectDeleted EventType = "s3:ObjectRemoved:Delete" // ObjectRemoved
EventObjectTagging EventType = "s3:ObjectTagging:*" // ObjectTagging
EventObjectTaggingPut EventType = "s3:ObjectTagging:Put"
EventObjectTaggingDelete EventType = "s3:ObjectTagging:Delete"
EventObjectAclPut EventType = "s3:ObjectAcl:Put"
EventObjectRestore EventType = "s3:ObjectRestore:*" // ObjectRestore
EventObjectRestorePost EventType = "s3:ObjectRestore:Post"
EventObjectRestoreCompleted EventType = "s3:ObjectRestore:Completed"
// EventObjectRestorePost EventType = "s3:ObjectRestore:Post"
// EventObjectRestoreDelete EventType = "s3:ObjectRestore:Delete"
)
func (event EventType) IsValid() bool {
_, ok := supportedEventFilters[event]
return ok
}
var supportedEventFilters = map[EventType]struct{}{
EventObjectCreated: {},
EventObjectCreatedPut: {},
EventObjectCreatedPost: {},
EventObjectCreatedCopy: {},
EventCompleteMultipartUpload: {},
EventObjectDeleted: {},
EventObjectTagging: {},
EventObjectTaggingPut: {},
EventObjectTaggingDelete: {},
EventObjectAclPut: {},
EventObjectRestore: {},
EventObjectRestorePost: {},
EventObjectRestoreCompleted: {},
}
type EventFilter map[EventType]bool
func parseEventFilters(path string) (EventFilter, error) {
// if no filter config file path is specified return nil map
if path == "" {
return nil, nil
}
configFilePath, err := filepath.Abs(path)
if err != nil {
return nil, err
}
// Open the JSON file
file, err := os.Open(configFilePath)
if err != nil {
return nil, err
}
defer file.Close()
var filter EventFilter
if err := json.NewDecoder(file).Decode(&filter); err != nil {
return nil, err
}
if err := filter.Validate(); err != nil {
return nil, err
}
return filter, nil
}
func (ef EventFilter) Validate() error {
for event := range ef {
if isValid := event.IsValid(); !isValid {
return fmt.Errorf("invalid configuration property: %v", event)
}
}
return nil
}
func (ef EventFilter) Filter(event EventType) bool {
ev, found := ef[event]
if found {
return ev
}
// check wildcard match
wildCardEv := EventType(string(event[strings.LastIndex(string(event), ":")+1]) + "*")
wildcard, found := ef[wildCardEv]
if found {
return wildcard
}
return false
}

View File

@@ -30,10 +30,11 @@ var sequencer = 0
type Kafka struct {
key string
writer *kafka.Writer
filter EventFilter
mu sync.Mutex
}
func InitKafkaEventService(url, topic, key string) (S3EventSender, error) {
func InitKafkaEventService(url, topic, key string, filter EventFilter) (S3EventSender, error) {
if topic == "" {
return nil, fmt.Errorf("kafka message topic should be specified")
}
@@ -65,6 +66,7 @@ func InitKafkaEventService(url, topic, key string) (S3EventSender, error) {
return &Kafka{
key: key,
writer: w,
filter: filter,
}, nil
}
@@ -72,6 +74,10 @@ func (ks *Kafka) SendEvent(ctx *fiber.Ctx, meta EventMeta) {
ks.mu.Lock()
defer ks.mu.Unlock()
if ks.filter != nil && !ks.filter.Filter(meta.EventName) {
return
}
schema, err := createEventSchema(ctx, meta, ConfigurationIdKafka)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to create kafka event: %v\n", err.Error())

View File

@@ -27,9 +27,10 @@ type NatsEventSender struct {
topic string
client *nats.Conn
mu sync.Mutex
filter EventFilter
}
func InitNatsEventService(url, topic string) (S3EventSender, error) {
func InitNatsEventService(url, topic string, filter EventFilter) (S3EventSender, error) {
if topic == "" {
return nil, fmt.Errorf("nats message topic should be specified")
}
@@ -52,6 +53,7 @@ func InitNatsEventService(url, topic string) (S3EventSender, error) {
return &NatsEventSender{
topic: topic,
client: client,
filter: filter,
}, nil
}
@@ -59,6 +61,10 @@ func (ns *NatsEventSender) SendEvent(ctx *fiber.Ctx, meta EventMeta) {
ns.mu.Lock()
defer ns.mu.Unlock()
if ns.filter != nil && !ns.filter.Filter(meta.EventName) {
return
}
schema, err := createEventSchema(ctx, meta, ConfigurationIdNats)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to create nats event: %v\n", err.Error())

View File

@@ -29,10 +29,11 @@ import (
type Webhook struct {
url string
client *http.Client
filter EventFilter
mu sync.Mutex
}
func InitWebhookEventSender(url string) (S3EventSender, error) {
func InitWebhookEventSender(url string, filter EventFilter) (S3EventSender, error) {
if url == "" {
return nil, fmt.Errorf("webhook url should be specified")
}
@@ -63,7 +64,8 @@ func InitWebhookEventSender(url string) (S3EventSender, error) {
client: &http.Client{
Timeout: 3 * time.Second,
},
url: url,
url: url,
filter: filter,
}, nil
}
@@ -71,6 +73,10 @@ func (w *Webhook) SendEvent(ctx *fiber.Ctx, meta EventMeta) {
w.mu.Lock()
defer w.mu.Unlock()
if w.filter != nil && !w.filter.Filter(meta.EventName) {
return
}
schema, err := createEventSchema(ctx, meta, ConfigurationIdWebhook)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to create webhook event: %v\n", err.Error())