feat: Added DeleteObjects event support in bucket event notifications

This commit is contained in:
jonaustin09
2024-04-25 16:18:02 -04:00
parent 47ed2d65c1
commit 6f9c6fde37
7 changed files with 182 additions and 97 deletions

View File

@@ -54,19 +54,21 @@ func generateEventFiltersConfig(ctx *cli.Context) error {
} }
config := s3event.EventFilter{ config := s3event.EventFilter{
s3event.EventObjectCreated: true, s3event.EventObjectCreated: true,
s3event.EventObjectCreatedPut: true, s3event.EventObjectCreatedPut: true,
s3event.EventObjectCreatedPost: true, s3event.EventObjectCreatedPost: true,
s3event.EventObjectCreatedCopy: true, s3event.EventObjectCreatedCopy: true,
s3event.EventCompleteMultipartUpload: true, s3event.EventCompleteMultipartUpload: true,
s3event.EventObjectDeleted: true, s3event.EventObjectRemoved: true,
s3event.EventObjectTagging: true, s3event.EventObjectRemovedDelete: true,
s3event.EventObjectTaggingPut: true, s3event.EventObjectRemovedDeleteObjects: true,
s3event.EventObjectTaggingDelete: true, s3event.EventObjectTagging: true,
s3event.EventObjectAclPut: true, s3event.EventObjectTaggingPut: true,
s3event.EventObjectRestore: true, s3event.EventObjectTaggingDelete: true,
s3event.EventObjectRestorePost: true, s3event.EventObjectAclPut: true,
s3event.EventObjectRestoreCompleted: true, s3event.EventObjectRestore: true,
s3event.EventObjectRestorePost: true,
s3event.EventObjectRestoreCompleted: true,
} }
configBytes, err := json.Marshal(config) configBytes, err := json.Marshal(config)

View File

@@ -1947,6 +1947,8 @@ func (c S3ApiController) DeleteObjects(ctx *fiber.Ctx) error {
Logger: c.logger, Logger: c.logger,
Action: "DeleteObjects", Action: "DeleteObjects",
BucketOwner: parsedAcl.Owner, BucketOwner: parsedAcl.Owner,
EvSender: c.evSender,
EventName: s3event.EventObjectRemovedDeleteObjects,
}) })
} }
@@ -2077,7 +2079,7 @@ func (c S3ApiController) DeleteActions(ctx *fiber.Ctx) error {
EvSender: c.evSender, EvSender: c.evSender,
Action: "DeleteObject", Action: "DeleteObject",
BucketOwner: parsedAcl.Owner, BucketOwner: parsedAcl.Owner,
EventName: s3event.EventObjectDeleted, EventName: s3event.EventObjectRemovedDelete,
Status: http.StatusNoContent, Status: http.StatusNoContent,
}) })
} }

View File

@@ -37,11 +37,11 @@ type EventMeta struct {
VersionId *string VersionId *string
} }
type EventFields struct { type EventSchema struct {
Records []EventSchema Records []EventRecord
} }
type EventSchema struct { type EventRecord struct {
EventVersion string `json:"eventVersion"` EventVersion string `json:"eventVersion"`
EventSource string `json:"eventSource"` EventSource string `json:"eventSource"`
AwsRegion string `json:"awsRegion"` AwsRegion string `json:"awsRegion"`
@@ -139,54 +139,54 @@ func InitEventSender(cfg *EventConfig) (S3EventSender, error) {
return evSender, err return evSender, err
} }
func createEventSchema(ctx *fiber.Ctx, meta EventMeta, configId ConfigurationId) ([]byte, error) { func createEventSchema(ctx *fiber.Ctx, meta EventMeta, configId ConfigurationId) EventSchema {
path := strings.Split(ctx.Path(), "/") path := strings.Split(ctx.Path(), "/")
bucket, object := path[1], strings.Join(path[2:], "/") bucket, object := path[1], strings.Join(path[2:], "/")
acc := ctx.Locals("account").(auth.Account) acc := ctx.Locals("account").(auth.Account)
event := []EventSchema{ return EventSchema{
{ Records: []EventRecord{
EventVersion: "2.2", {
EventSource: "aws:s3", EventVersion: "2.2",
AwsRegion: ctx.Locals("region").(string), EventSource: "aws:s3",
EventTime: time.Now().Format(time.RFC3339), AwsRegion: ctx.Locals("region").(string),
EventName: meta.EventName, EventTime: time.Now().Format(time.RFC3339),
UserIdentity: EventUserIdentity{ EventName: meta.EventName,
PrincipalId: acc.Access, UserIdentity: EventUserIdentity{
}, PrincipalId: acc.Access,
RequestParameters: EventRequestParams{ },
SourceIPAddress: ctx.IP(), RequestParameters: EventRequestParams{
}, SourceIPAddress: ctx.IP(),
ResponseElements: EventResponseElements{ },
RequestId: ctx.Get("X-Amz-Request-Id"), ResponseElements: EventResponseElements{
HostId: ctx.Get("X-Amz-Id-2"), RequestId: ctx.Get("X-Amz-Request-Id"),
}, HostId: ctx.Get("X-Amz-Id-2"),
S3: EventS3Data{ },
S3SchemaVersion: "1.0", S3: EventS3Data{
ConfigurationId: configId, S3SchemaVersion: "1.0",
Bucket: EventS3BucketData{ ConfigurationId: configId,
Name: bucket, Bucket: EventS3BucketData{
OwnerIdentity: EventUserIdentity{ Name: bucket,
PrincipalId: meta.BucketOwner, OwnerIdentity: EventUserIdentity{
PrincipalId: meta.BucketOwner,
},
Arn: fmt.Sprintf("arn:aws:s3:::%v", strings.Join(path, "/")),
},
Object: EventObjectData{
Key: object,
Size: meta.ObjectSize,
ETag: meta.ObjectETag,
VersionId: meta.VersionId,
Sequencer: genSequencer(),
}, },
Arn: fmt.Sprintf("arn:aws:s3:::%v", strings.Join(path, "/")),
}, },
Object: EventObjectData{ GlacierEventData: EventGlacierData{
Key: object, // Not supported
Size: meta.ObjectSize, RestoreEventData: EventRestoreData{},
ETag: meta.ObjectETag,
VersionId: meta.VersionId,
Sequencer: genSequencer(),
}, },
}, },
GlacierEventData: EventGlacierData{
// Not supported
RestoreEventData: EventRestoreData{},
},
}, },
} }
return json.Marshal(event)
} }
func generateTestEvent() ([]byte, error) { func generateTestEvent() ([]byte, error) {

View File

@@ -25,19 +25,21 @@ import (
type EventType string type EventType string
const ( const (
EventObjectCreated EventType = "s3:ObjectCreated:*" // ObjectCreated EventObjectCreated EventType = "s3:ObjectCreated:*" // ObjectCreated
EventObjectCreatedPut EventType = "s3:ObjectCreated:Put" EventObjectCreatedPut EventType = "s3:ObjectCreated:Put"
EventObjectCreatedPost EventType = "s3:ObjectCreated:Post" EventObjectCreatedPost EventType = "s3:ObjectCreated:Post"
EventObjectCreatedCopy EventType = "s3:ObjectCreated:Copy" EventObjectCreatedCopy EventType = "s3:ObjectCreated:Copy"
EventCompleteMultipartUpload EventType = "s3:ObjectCreated:CompleteMultipartUpload" EventCompleteMultipartUpload EventType = "s3:ObjectCreated:CompleteMultipartUpload"
EventObjectDeleted EventType = "s3:ObjectRemoved:Delete" // ObjectRemoved EventObjectRemoved EventType = "s3:ObjectRemoved:*"
EventObjectTagging EventType = "s3:ObjectTagging:*" // ObjectTagging EventObjectRemovedDelete EventType = "s3:ObjectRemoved:Delete"
EventObjectTaggingPut EventType = "s3:ObjectTagging:Put" EventObjectRemovedDeleteObjects EventType = "s3:ObjectRemoved:DeleteObjects" // non AWS custom type for DeleteObjects
EventObjectTaggingDelete EventType = "s3:ObjectTagging:Delete" EventObjectTagging EventType = "s3:ObjectTagging:*" // ObjectTagging
EventObjectAclPut EventType = "s3:ObjectAcl:Put" EventObjectTaggingPut EventType = "s3:ObjectTagging:Put"
EventObjectRestore EventType = "s3:ObjectRestore:*" // ObjectRestore EventObjectTaggingDelete EventType = "s3:ObjectTagging:Delete"
EventObjectRestorePost EventType = "s3:ObjectRestore:Post" EventObjectAclPut EventType = "s3:ObjectAcl:Put"
EventObjectRestoreCompleted EventType = "s3:ObjectRestore:Completed" EventObjectRestore EventType = "s3:ObjectRestore:*" // ObjectRestore
EventObjectRestorePost EventType = "s3:ObjectRestore:Post"
EventObjectRestoreCompleted EventType = "s3:ObjectRestore:Completed"
// EventObjectRestorePost EventType = "s3:ObjectRestore:Post" // EventObjectRestorePost EventType = "s3:ObjectRestore:Post"
// EventObjectRestoreDelete EventType = "s3:ObjectRestore:Delete" // EventObjectRestoreDelete EventType = "s3:ObjectRestore:Delete"
) )
@@ -48,19 +50,21 @@ func (event EventType) IsValid() bool {
} }
var supportedEventFilters = map[EventType]struct{}{ var supportedEventFilters = map[EventType]struct{}{
EventObjectCreated: {}, EventObjectCreated: {},
EventObjectCreatedPut: {}, EventObjectCreatedPut: {},
EventObjectCreatedPost: {}, EventObjectCreatedPost: {},
EventObjectCreatedCopy: {}, EventObjectCreatedCopy: {},
EventCompleteMultipartUpload: {}, EventCompleteMultipartUpload: {},
EventObjectDeleted: {}, EventObjectRemoved: {},
EventObjectTagging: {}, EventObjectRemovedDelete: {},
EventObjectTaggingPut: {}, EventObjectRemovedDeleteObjects: {},
EventObjectTaggingDelete: {}, EventObjectTagging: {},
EventObjectAclPut: {}, EventObjectTaggingPut: {},
EventObjectRestore: {}, EventObjectTaggingDelete: {},
EventObjectRestorePost: {}, EventObjectAclPut: {},
EventObjectRestoreCompleted: {}, EventObjectRestore: {},
EventObjectRestorePost: {},
EventObjectRestoreCompleted: {},
} }
type EventFilter map[EventType]bool type EventFilter map[EventType]bool

View File

@@ -16,6 +16,8 @@ package s3event
import ( import (
"context" "context"
"encoding/json"
"encoding/xml"
"fmt" "fmt"
"os" "os"
"sync" "sync"
@@ -23,6 +25,7 @@ import (
"github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2"
"github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go"
"github.com/versity/versitygw/s3response"
) )
var sequencer = 0 var sequencer = 0
@@ -78,12 +81,29 @@ func (ks *Kafka) SendEvent(ctx *fiber.Ctx, meta EventMeta) {
return return
} }
schema, err := createEventSchema(ctx, meta, ConfigurationIdKafka) if meta.EventName == EventObjectRemovedDeleteObjects {
if err != nil { var dObj s3response.DeleteObjects
fmt.Fprintf(os.Stderr, "failed to create kafka event: %v\n", err.Error())
if err := xml.Unmarshal(ctx.Body(), &dObj); err != nil {
fmt.Fprintf(os.Stderr, "failed to parse delete objects input payload: %v\n", err.Error())
return
}
// Events aren't send in correct order
for _, obj := range dObj.Objects {
key := *obj.Key
schema := createEventSchema(ctx, meta, ConfigurationIdWebhook)
schema.Records[0].S3.Object.Key = key
schema.Records[0].S3.Object.VersionId = obj.VersionId
go ks.send(schema)
}
return return
} }
schema := createEventSchema(ctx, meta, ConfigurationIdWebhook)
go ks.send(schema) go ks.send(schema)
} }
@@ -91,14 +111,20 @@ func (ks *Kafka) Close() error {
return ks.writer.Close() return ks.writer.Close()
} }
func (ks *Kafka) send(event []byte) { func (ks *Kafka) send(event EventSchema) {
eventBytes, err := json.Marshal(event)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to parse event data: %v\n", err.Error())
return
}
message := kafka.Message{ message := kafka.Message{
Key: []byte(ks.key), Key: []byte(ks.key),
Value: event, Value: eventBytes,
} }
ctx := context.Background() ctx := context.Background()
err := ks.writer.WriteMessages(ctx, message) err = ks.writer.WriteMessages(ctx, message)
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "failed to send kafka event: %v\n", err.Error()) fmt.Fprintf(os.Stderr, "failed to send kafka event: %v\n", err.Error())
} }

View File

@@ -15,12 +15,15 @@
package s3event package s3event
import ( import (
"encoding/json"
"encoding/xml"
"fmt" "fmt"
"os" "os"
"sync" "sync"
"github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/versity/versitygw/s3response"
) )
type NatsEventSender struct { type NatsEventSender struct {
@@ -65,12 +68,29 @@ func (ns *NatsEventSender) SendEvent(ctx *fiber.Ctx, meta EventMeta) {
return return
} }
schema, err := createEventSchema(ctx, meta, ConfigurationIdNats) if meta.EventName == EventObjectRemovedDeleteObjects {
if err != nil { var dObj s3response.DeleteObjects
fmt.Fprintf(os.Stderr, "failed to create nats event: %v\n", err.Error())
if err := xml.Unmarshal(ctx.Body(), &dObj); err != nil {
fmt.Fprintf(os.Stderr, "failed to parse delete objects input payload: %v\n", err.Error())
return
}
// Events aren't send in correct order
for _, obj := range dObj.Objects {
key := *obj.Key
schema := createEventSchema(ctx, meta, ConfigurationIdWebhook)
schema.Records[0].S3.Object.Key = key
schema.Records[0].S3.Object.VersionId = obj.VersionId
go ns.send(schema)
}
return return
} }
schema := createEventSchema(ctx, meta, ConfigurationIdWebhook)
go ns.send(schema) go ns.send(schema)
} }
@@ -79,8 +99,13 @@ func (ns *NatsEventSender) Close() error {
return nil return nil
} }
func (ns *NatsEventSender) send(event []byte) { func (ns *NatsEventSender) send(event EventSchema) {
err := ns.client.Publish(ns.topic, event) eventBytes, err := json.Marshal(event)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to parse event data: %v\n", err.Error())
return
}
err = ns.client.Publish(ns.topic, eventBytes)
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "failed to send nats event: %v\n", err.Error()) fmt.Fprintf(os.Stderr, "failed to send nats event: %v\n", err.Error())
} }

View File

@@ -16,6 +16,8 @@ package s3event
import ( import (
"bytes" "bytes"
"encoding/json"
"encoding/xml"
"fmt" "fmt"
"net" "net"
"net/http" "net/http"
@@ -24,6 +26,7 @@ import (
"time" "time"
"github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2"
"github.com/versity/versitygw/s3response"
) )
type Webhook struct { type Webhook struct {
@@ -77,12 +80,29 @@ func (w *Webhook) SendEvent(ctx *fiber.Ctx, meta EventMeta) {
return return
} }
schema, err := createEventSchema(ctx, meta, ConfigurationIdWebhook) if meta.EventName == EventObjectRemovedDeleteObjects {
if err != nil { var dObj s3response.DeleteObjects
fmt.Fprintf(os.Stderr, "failed to create webhook event: %v\n", err.Error())
if err := xml.Unmarshal(ctx.Body(), &dObj); err != nil {
fmt.Fprintf(os.Stderr, "failed to parse delete objects input payload: %v\n", err.Error())
return
}
// Events aren't send in correct order
for _, obj := range dObj.Objects {
key := *obj.Key
schema := createEventSchema(ctx, meta, ConfigurationIdWebhook)
schema.Records[0].S3.Object.Key = key
schema.Records[0].S3.Object.VersionId = obj.VersionId
go w.send(schema)
}
return return
} }
schema := createEventSchema(ctx, meta, ConfigurationIdWebhook)
go w.send(schema) go w.send(schema)
} }
@@ -90,8 +110,14 @@ func (w *Webhook) Close() error {
return nil return nil
} }
func (w *Webhook) send(event []byte) { func (w *Webhook) send(event EventSchema) {
req, err := http.NewRequest(http.MethodPost, w.url, bytes.NewReader(event)) eventBytes, err := json.Marshal(event)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to parse event data: %v\n", err.Error())
return
}
req, err := http.NewRequest(http.MethodPost, w.url, bytes.NewReader(eventBytes))
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "failed to create webhook event request: %v\n", err.Error()) fmt.Fprintf(os.Stderr, "failed to create webhook event request: %v\n", err.Error())
return return