diff --git a/cmd/versitygw/utils.go b/cmd/versitygw/utils.go index d334338..25e0e3d 100644 --- a/cmd/versitygw/utils.go +++ b/cmd/versitygw/utils.go @@ -54,19 +54,21 @@ func generateEventFiltersConfig(ctx *cli.Context) error { } config := s3event.EventFilter{ - s3event.EventObjectCreated: true, - s3event.EventObjectCreatedPut: true, - s3event.EventObjectCreatedPost: true, - s3event.EventObjectCreatedCopy: true, - s3event.EventCompleteMultipartUpload: true, - s3event.EventObjectDeleted: true, - s3event.EventObjectTagging: true, - s3event.EventObjectTaggingPut: true, - s3event.EventObjectTaggingDelete: true, - s3event.EventObjectAclPut: true, - s3event.EventObjectRestore: true, - s3event.EventObjectRestorePost: true, - s3event.EventObjectRestoreCompleted: true, + s3event.EventObjectCreated: true, + s3event.EventObjectCreatedPut: true, + s3event.EventObjectCreatedPost: true, + s3event.EventObjectCreatedCopy: true, + s3event.EventCompleteMultipartUpload: true, + s3event.EventObjectRemoved: true, + s3event.EventObjectRemovedDelete: true, + s3event.EventObjectRemovedDeleteObjects: true, + s3event.EventObjectTagging: true, + s3event.EventObjectTaggingPut: true, + s3event.EventObjectTaggingDelete: true, + s3event.EventObjectAclPut: true, + s3event.EventObjectRestore: true, + s3event.EventObjectRestorePost: true, + s3event.EventObjectRestoreCompleted: true, } configBytes, err := json.Marshal(config) diff --git a/s3api/controllers/base.go b/s3api/controllers/base.go index 97509d5..96f7194 100644 --- a/s3api/controllers/base.go +++ b/s3api/controllers/base.go @@ -1947,6 +1947,8 @@ func (c S3ApiController) DeleteObjects(ctx *fiber.Ctx) error { Logger: c.logger, Action: "DeleteObjects", BucketOwner: parsedAcl.Owner, + EvSender: c.evSender, + EventName: s3event.EventObjectRemovedDeleteObjects, }) } @@ -2077,7 +2079,7 @@ func (c S3ApiController) DeleteActions(ctx *fiber.Ctx) error { EvSender: c.evSender, Action: "DeleteObject", BucketOwner: parsedAcl.Owner, - EventName: s3event.EventObjectDeleted, + EventName: s3event.EventObjectRemovedDelete, Status: http.StatusNoContent, }) } diff --git a/s3event/event.go b/s3event/event.go index 7253ef7..475f447 100644 --- a/s3event/event.go +++ b/s3event/event.go @@ -37,11 +37,11 @@ type EventMeta struct { VersionId *string } -type EventFields struct { - Records []EventSchema +type EventSchema struct { + Records []EventRecord } -type EventSchema struct { +type EventRecord struct { EventVersion string `json:"eventVersion"` EventSource string `json:"eventSource"` AwsRegion string `json:"awsRegion"` @@ -139,54 +139,54 @@ func InitEventSender(cfg *EventConfig) (S3EventSender, error) { return evSender, err } -func createEventSchema(ctx *fiber.Ctx, meta EventMeta, configId ConfigurationId) ([]byte, error) { +func createEventSchema(ctx *fiber.Ctx, meta EventMeta, configId ConfigurationId) EventSchema { path := strings.Split(ctx.Path(), "/") bucket, object := path[1], strings.Join(path[2:], "/") acc := ctx.Locals("account").(auth.Account) - event := []EventSchema{ - { - EventVersion: "2.2", - EventSource: "aws:s3", - AwsRegion: ctx.Locals("region").(string), - EventTime: time.Now().Format(time.RFC3339), - EventName: meta.EventName, - UserIdentity: EventUserIdentity{ - PrincipalId: acc.Access, - }, - RequestParameters: EventRequestParams{ - SourceIPAddress: ctx.IP(), - }, - ResponseElements: EventResponseElements{ - RequestId: ctx.Get("X-Amz-Request-Id"), - HostId: ctx.Get("X-Amz-Id-2"), - }, - S3: EventS3Data{ - S3SchemaVersion: "1.0", - ConfigurationId: configId, - Bucket: EventS3BucketData{ - Name: bucket, - OwnerIdentity: EventUserIdentity{ - PrincipalId: meta.BucketOwner, + return EventSchema{ + Records: []EventRecord{ + { + EventVersion: "2.2", + EventSource: "aws:s3", + AwsRegion: ctx.Locals("region").(string), + EventTime: time.Now().Format(time.RFC3339), + EventName: meta.EventName, + UserIdentity: EventUserIdentity{ + PrincipalId: acc.Access, + }, + RequestParameters: EventRequestParams{ + SourceIPAddress: ctx.IP(), + }, + ResponseElements: EventResponseElements{ + RequestId: ctx.Get("X-Amz-Request-Id"), + HostId: ctx.Get("X-Amz-Id-2"), + }, + S3: EventS3Data{ + S3SchemaVersion: "1.0", + ConfigurationId: configId, + Bucket: EventS3BucketData{ + Name: bucket, + OwnerIdentity: EventUserIdentity{ + PrincipalId: meta.BucketOwner, + }, + Arn: fmt.Sprintf("arn:aws:s3:::%v", strings.Join(path, "/")), + }, + Object: EventObjectData{ + Key: object, + Size: meta.ObjectSize, + ETag: meta.ObjectETag, + VersionId: meta.VersionId, + Sequencer: genSequencer(), }, - Arn: fmt.Sprintf("arn:aws:s3:::%v", strings.Join(path, "/")), }, - Object: EventObjectData{ - Key: object, - Size: meta.ObjectSize, - ETag: meta.ObjectETag, - VersionId: meta.VersionId, - Sequencer: genSequencer(), + GlacierEventData: EventGlacierData{ + // Not supported + RestoreEventData: EventRestoreData{}, }, }, - GlacierEventData: EventGlacierData{ - // Not supported - RestoreEventData: EventRestoreData{}, - }, }, } - - return json.Marshal(event) } func generateTestEvent() ([]byte, error) { diff --git a/s3event/filter.go b/s3event/filter.go index 81fb61c..356ccd1 100644 --- a/s3event/filter.go +++ b/s3event/filter.go @@ -25,19 +25,21 @@ import ( type EventType string const ( - EventObjectCreated EventType = "s3:ObjectCreated:*" // ObjectCreated - EventObjectCreatedPut EventType = "s3:ObjectCreated:Put" - EventObjectCreatedPost EventType = "s3:ObjectCreated:Post" - EventObjectCreatedCopy EventType = "s3:ObjectCreated:Copy" - EventCompleteMultipartUpload EventType = "s3:ObjectCreated:CompleteMultipartUpload" - EventObjectDeleted EventType = "s3:ObjectRemoved:Delete" // ObjectRemoved - EventObjectTagging EventType = "s3:ObjectTagging:*" // ObjectTagging - EventObjectTaggingPut EventType = "s3:ObjectTagging:Put" - EventObjectTaggingDelete EventType = "s3:ObjectTagging:Delete" - EventObjectAclPut EventType = "s3:ObjectAcl:Put" - EventObjectRestore EventType = "s3:ObjectRestore:*" // ObjectRestore - EventObjectRestorePost EventType = "s3:ObjectRestore:Post" - EventObjectRestoreCompleted EventType = "s3:ObjectRestore:Completed" + EventObjectCreated EventType = "s3:ObjectCreated:*" // ObjectCreated + EventObjectCreatedPut EventType = "s3:ObjectCreated:Put" + EventObjectCreatedPost EventType = "s3:ObjectCreated:Post" + EventObjectCreatedCopy EventType = "s3:ObjectCreated:Copy" + EventCompleteMultipartUpload EventType = "s3:ObjectCreated:CompleteMultipartUpload" + EventObjectRemoved EventType = "s3:ObjectRemoved:*" + EventObjectRemovedDelete EventType = "s3:ObjectRemoved:Delete" + EventObjectRemovedDeleteObjects EventType = "s3:ObjectRemoved:DeleteObjects" // non AWS custom type for DeleteObjects + EventObjectTagging EventType = "s3:ObjectTagging:*" // ObjectTagging + EventObjectTaggingPut EventType = "s3:ObjectTagging:Put" + EventObjectTaggingDelete EventType = "s3:ObjectTagging:Delete" + EventObjectAclPut EventType = "s3:ObjectAcl:Put" + EventObjectRestore EventType = "s3:ObjectRestore:*" // ObjectRestore + EventObjectRestorePost EventType = "s3:ObjectRestore:Post" + EventObjectRestoreCompleted EventType = "s3:ObjectRestore:Completed" // EventObjectRestorePost EventType = "s3:ObjectRestore:Post" // EventObjectRestoreDelete EventType = "s3:ObjectRestore:Delete" ) @@ -48,19 +50,21 @@ func (event EventType) IsValid() bool { } var supportedEventFilters = map[EventType]struct{}{ - EventObjectCreated: {}, - EventObjectCreatedPut: {}, - EventObjectCreatedPost: {}, - EventObjectCreatedCopy: {}, - EventCompleteMultipartUpload: {}, - EventObjectDeleted: {}, - EventObjectTagging: {}, - EventObjectTaggingPut: {}, - EventObjectTaggingDelete: {}, - EventObjectAclPut: {}, - EventObjectRestore: {}, - EventObjectRestorePost: {}, - EventObjectRestoreCompleted: {}, + EventObjectCreated: {}, + EventObjectCreatedPut: {}, + EventObjectCreatedPost: {}, + EventObjectCreatedCopy: {}, + EventCompleteMultipartUpload: {}, + EventObjectRemoved: {}, + EventObjectRemovedDelete: {}, + EventObjectRemovedDeleteObjects: {}, + EventObjectTagging: {}, + EventObjectTaggingPut: {}, + EventObjectTaggingDelete: {}, + EventObjectAclPut: {}, + EventObjectRestore: {}, + EventObjectRestorePost: {}, + EventObjectRestoreCompleted: {}, } type EventFilter map[EventType]bool diff --git a/s3event/kafka.go b/s3event/kafka.go index 5123ec1..257c1c4 100644 --- a/s3event/kafka.go +++ b/s3event/kafka.go @@ -16,6 +16,8 @@ package s3event import ( "context" + "encoding/json" + "encoding/xml" "fmt" "os" "sync" @@ -23,6 +25,7 @@ import ( "github.com/gofiber/fiber/v2" "github.com/segmentio/kafka-go" + "github.com/versity/versitygw/s3response" ) var sequencer = 0 @@ -78,12 +81,29 @@ func (ks *Kafka) SendEvent(ctx *fiber.Ctx, meta EventMeta) { return } - schema, err := createEventSchema(ctx, meta, ConfigurationIdKafka) - if err != nil { - fmt.Fprintf(os.Stderr, "failed to create kafka event: %v\n", err.Error()) + if meta.EventName == EventObjectRemovedDeleteObjects { + var dObj s3response.DeleteObjects + + if err := xml.Unmarshal(ctx.Body(), &dObj); err != nil { + fmt.Fprintf(os.Stderr, "failed to parse delete objects input payload: %v\n", err.Error()) + return + } + + // Events aren't send in correct order + for _, obj := range dObj.Objects { + key := *obj.Key + schema := createEventSchema(ctx, meta, ConfigurationIdWebhook) + schema.Records[0].S3.Object.Key = key + schema.Records[0].S3.Object.VersionId = obj.VersionId + + go ks.send(schema) + } + return } + schema := createEventSchema(ctx, meta, ConfigurationIdWebhook) + go ks.send(schema) } @@ -91,14 +111,20 @@ func (ks *Kafka) Close() error { return ks.writer.Close() } -func (ks *Kafka) send(event []byte) { +func (ks *Kafka) send(event EventSchema) { + eventBytes, err := json.Marshal(event) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to parse event data: %v\n", err.Error()) + return + } + message := kafka.Message{ Key: []byte(ks.key), - Value: event, + Value: eventBytes, } ctx := context.Background() - err := ks.writer.WriteMessages(ctx, message) + err = ks.writer.WriteMessages(ctx, message) if err != nil { fmt.Fprintf(os.Stderr, "failed to send kafka event: %v\n", err.Error()) } diff --git a/s3event/nats.go b/s3event/nats.go index d18b7da..189105a 100644 --- a/s3event/nats.go +++ b/s3event/nats.go @@ -15,12 +15,15 @@ package s3event import ( + "encoding/json" + "encoding/xml" "fmt" "os" "sync" "github.com/gofiber/fiber/v2" "github.com/nats-io/nats.go" + "github.com/versity/versitygw/s3response" ) type NatsEventSender struct { @@ -65,12 +68,29 @@ func (ns *NatsEventSender) SendEvent(ctx *fiber.Ctx, meta EventMeta) { return } - schema, err := createEventSchema(ctx, meta, ConfigurationIdNats) - if err != nil { - fmt.Fprintf(os.Stderr, "failed to create nats event: %v\n", err.Error()) + if meta.EventName == EventObjectRemovedDeleteObjects { + var dObj s3response.DeleteObjects + + if err := xml.Unmarshal(ctx.Body(), &dObj); err != nil { + fmt.Fprintf(os.Stderr, "failed to parse delete objects input payload: %v\n", err.Error()) + return + } + + // Events aren't send in correct order + for _, obj := range dObj.Objects { + key := *obj.Key + schema := createEventSchema(ctx, meta, ConfigurationIdWebhook) + schema.Records[0].S3.Object.Key = key + schema.Records[0].S3.Object.VersionId = obj.VersionId + + go ns.send(schema) + } + return } + schema := createEventSchema(ctx, meta, ConfigurationIdWebhook) + go ns.send(schema) } @@ -79,8 +99,13 @@ func (ns *NatsEventSender) Close() error { return nil } -func (ns *NatsEventSender) send(event []byte) { - err := ns.client.Publish(ns.topic, event) +func (ns *NatsEventSender) send(event EventSchema) { + eventBytes, err := json.Marshal(event) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to parse event data: %v\n", err.Error()) + return + } + err = ns.client.Publish(ns.topic, eventBytes) if err != nil { fmt.Fprintf(os.Stderr, "failed to send nats event: %v\n", err.Error()) } diff --git a/s3event/webhook.go b/s3event/webhook.go index f8d9d9e..bce571c 100644 --- a/s3event/webhook.go +++ b/s3event/webhook.go @@ -16,6 +16,8 @@ package s3event import ( "bytes" + "encoding/json" + "encoding/xml" "fmt" "net" "net/http" @@ -24,6 +26,7 @@ import ( "time" "github.com/gofiber/fiber/v2" + "github.com/versity/versitygw/s3response" ) type Webhook struct { @@ -77,12 +80,29 @@ func (w *Webhook) SendEvent(ctx *fiber.Ctx, meta EventMeta) { return } - schema, err := createEventSchema(ctx, meta, ConfigurationIdWebhook) - if err != nil { - fmt.Fprintf(os.Stderr, "failed to create webhook event: %v\n", err.Error()) + if meta.EventName == EventObjectRemovedDeleteObjects { + var dObj s3response.DeleteObjects + + if err := xml.Unmarshal(ctx.Body(), &dObj); err != nil { + fmt.Fprintf(os.Stderr, "failed to parse delete objects input payload: %v\n", err.Error()) + return + } + + // Events aren't send in correct order + for _, obj := range dObj.Objects { + key := *obj.Key + schema := createEventSchema(ctx, meta, ConfigurationIdWebhook) + schema.Records[0].S3.Object.Key = key + schema.Records[0].S3.Object.VersionId = obj.VersionId + + go w.send(schema) + } + return } + schema := createEventSchema(ctx, meta, ConfigurationIdWebhook) + go w.send(schema) } @@ -90,8 +110,14 @@ func (w *Webhook) Close() error { return nil } -func (w *Webhook) send(event []byte) { - req, err := http.NewRequest(http.MethodPost, w.url, bytes.NewReader(event)) +func (w *Webhook) send(event EventSchema) { + eventBytes, err := json.Marshal(event) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to parse event data: %v\n", err.Error()) + return + } + + req, err := http.NewRequest(http.MethodPost, w.url, bytes.NewReader(eventBytes)) if err != nil { fmt.Fprintf(os.Stderr, "failed to create webhook event request: %v\n", err.Error()) return