diff --git a/cmd/versitygw/main.go b/cmd/versitygw/main.go index 4db32c6..03b353b 100644 --- a/cmd/versitygw/main.go +++ b/cmd/versitygw/main.go @@ -27,18 +27,20 @@ import ( "github.com/versity/versitygw/backend" "github.com/versity/versitygw/s3api" "github.com/versity/versitygw/s3api/middlewares" + "github.com/versity/versitygw/s3event" "github.com/versity/versitygw/s3log" ) var ( - port string - rootUserAccess string - rootUserSecret string - region string - certFile, keyFile string - logWebhookURL string - accessLog bool - debug bool + port string + rootUserAccess string + rootUserSecret string + region string + certFile, keyFile string + kafkaURL, kafkaTopic, kafkaKey string + logWebhookURL string + accessLog bool + debug bool ) var ( @@ -154,6 +156,24 @@ func initFlags() []cli.Flag { Usage: "webhook url to send the audit logs", Destination: &logWebhookURL, }, + &cli.StringFlag{ + Name: "event-kafka-url", + Usage: "kafka server url to send the bucket notifications.", + Destination: &kafkaURL, + Aliases: []string{"eku"}, + }, + &cli.StringFlag{ + Name: "event-kafka-topic", + Usage: "kafka server pub-sub topic to send the bucket notifications to", + Destination: &kafkaTopic, + Aliases: []string{"ekt"}, + }, + &cli.StringFlag{ + Name: "event-kafka-key", + Usage: "kafka server put-sub topic key to send the bucket notifications to", + Destination: &kafkaKey, + Aliases: []string{"ekk"}, + }, } } @@ -203,11 +223,19 @@ func runGateway(ctx *cli.Context, be backend.Backend, s auth.Storer) error { return fmt.Errorf("setup logger: %w", err) } + evSender, err := s3event.InitEventSender(&s3event.EventConfig{ + KafkaURL: kafkaURL, + KafkaTopic: kafkaTopic, + KafkaTopicKey: kafkaKey, + }) + if err != nil { + return fmt.Errorf("unable to connect to the message broker: %w", err) + } + srv, err := s3api.New(app, be, middlewares.RootUserConfig{ Access: rootUserAccess, Secret: rootUserSecret, - Region: region, - }, port, region, iam, logger, opts...) + }, port, region, iam, logger, evSender, opts...) if err != nil { return fmt.Errorf("init gateway: %v", err) } diff --git a/go.mod b/go.mod index acb3c55..492b7ef 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/gofiber/fiber/v2 v2.47.0 github.com/google/uuid v1.3.0 github.com/pkg/xattr v0.4.9 + github.com/segmentio/kafka-go v0.4.42 github.com/urfave/cli/v2 v2.25.7 github.com/valyala/fasthttp v1.48.0 github.com/versity/scoutfs-go v0.0.0-20230606232754-0474b14343b9 @@ -22,6 +23,8 @@ require ( github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.12 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.19.2 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect + github.com/pierrec/lz4/v4 v4.1.17 // indirect + github.com/stretchr/testify v1.8.1 // indirect ) require ( diff --git a/go.sum b/go.sum index de20398..dad6f44 100644 --- a/go.sum +++ b/go.sum @@ -40,8 +40,9 @@ github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8= github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gofiber/fiber/v2 v2.47.0 h1:EN5lHVCc+Pyqh5OEsk8fzRiifgwpbrP0rulQ4iNf3fs= github.com/gofiber/fiber/v2 v2.47.0/go.mod h1:mbFMVN1lQuzziTkkakgtKKdjfsXSw9BKR5lmcNksUoU= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= @@ -52,6 +53,7 @@ github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9Y github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= @@ -64,6 +66,9 @@ github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/philhofer/fwd v1.1.2 h1:bnDivRJ1EWPjUIRXV5KfORO897HTbpFAQddBdE8t7Gw= github.com/philhofer/fwd v1.1.2/go.mod h1:qkPdfjR2SIEbspLqpe1tO4n5yICnr2DY7mqEx2tUTP0= +github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= +github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/xattr v0.4.9 h1:5883YPCtkSd8LFbs13nXplj9g9tlrwoJRjgpgMu1/fE= github.com/pkg/xattr v0.4.9/go.mod h1:di8WF84zAKk8jzR1UBTEWh9AUlIZZ7M/JNt8e9B6ktU= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -78,7 +83,15 @@ github.com/savsgio/dictpool v0.0.0-20221023140959-7bf2e61cea94/go.mod h1:90zrgN3 github.com/savsgio/gotils v0.0.0-20220530130905-52f3993e8d6d/go.mod h1:Gy+0tqhJvgGlqnTF8CVGP0AaGRjwBtXs/a5PA0Y3+A4= github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee h1:8Iv5m6xEo1NR1AvpV+7XmhI4r39LGNzwUL4YpMuL5vk= github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee/go.mod h1:qwtSXrKuJh/zsFQ12yEE89xfCrGKK63Rr7ctU/uCo4g= +github.com/segmentio/kafka-go v0.4.42 h1:qffhBZCz4WcWyNuHEclHjIMLs2slp6mZO8px+5W5tfU= +github.com/segmentio/kafka-go v0.4.42/go.mod h1:d0g15xPMqoUookug0OU75DhGZxXwCFxSLeJ4uphwJzg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/tinylib/msgp v1.1.6/go.mod h1:75BAfg2hauQhs3qedfdDZmWAPcFMAvJE5b9rGOMufyw= github.com/tinylib/msgp v1.1.8 h1:FCXC1xanKO4I8plpHGH2P7koL/RzZs12l/+r7vakfm0= github.com/tinylib/msgp v1.1.8/go.mod h1:qkpG+2ldGg4xRFmx+jfTvZPxfGFhi64BcnL9vkCm/Tw= @@ -92,6 +105,12 @@ github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVS github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= github.com/versity/scoutfs-go v0.0.0-20230606232754-0474b14343b9 h1:ZfmQR01Kk6/kQh6+zlqfBYszVY02fzf9xYrchOY4NFM= github.com/versity/scoutfs-go v0.0.0-20230606232754-0474b14343b9/go.mod h1:gJsq73k+4685y+rbDIpPY8i/5GbsiwP6JFoFyUDB1fQ= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -109,6 +128,8 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.3.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -123,16 +144,21 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= @@ -144,3 +170,6 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/s3api/controllers/base.go b/s3api/controllers/base.go index c84deac..6975127 100644 --- a/s3api/controllers/base.go +++ b/s3api/controllers/base.go @@ -32,27 +32,29 @@ import ( "github.com/versity/versitygw/backend" "github.com/versity/versitygw/s3api/utils" "github.com/versity/versitygw/s3err" + "github.com/versity/versitygw/s3event" "github.com/versity/versitygw/s3log" "github.com/versity/versitygw/s3response" ) type S3ApiController struct { - be backend.Backend - iam auth.IAMService - logger s3log.AuditLogger + be backend.Backend + iam auth.IAMService + logger s3log.AuditLogger + evSender s3event.S3EventSender } -func New(be backend.Backend, iam auth.IAMService, logger s3log.AuditLogger) S3ApiController { - return S3ApiController{be: be, iam: iam, logger: logger} +func New(be backend.Backend, iam auth.IAMService, logger s3log.AuditLogger, evs s3event.S3EventSender) S3ApiController { + return S3ApiController{be: be, iam: iam, logger: logger, evSender: evs} } func (c S3ApiController) ListBuckets(ctx *fiber.Ctx) error { access, isRoot := ctx.Locals("access").(string), ctx.Locals("isRoot").(bool) if err := auth.IsAdmin(access, isRoot); err != nil { - return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "ListBucket"}}) + return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "ListBucket"}) } res, err := c.be.ListBuckets() - return SendXMLResponse(ctx, res, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "ListBucket"}}) + return SendXMLResponse(ctx, res, err, &MetaOpts{Logger: c.logger, Action: "ListBucket"}) } func (c S3ApiController) GetActions(ctx *fiber.Ctx) error { @@ -71,22 +73,22 @@ func (c S3ApiController) GetActions(ctx *fiber.Ctx) error { data, err := c.be.GetBucketAcl(bucket) if err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger}) } parsedAcl, err := auth.ParseACL(data) if err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger}) } if ctx.Request().URI().QueryArgs().Has("tagging") { if err := auth.VerifyACL(parsedAcl, bucket, access, "READ", isRoot); err != nil { - return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "GetObjectTagging", BucketOwner: parsedAcl.Owner}}) + return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "GetObjectTagging", BucketOwner: parsedAcl.Owner}) } tags, err := c.be.GetTags(bucket, key) if err != nil { - return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "GetObjectTagging", BucketOwner: parsedAcl.Owner}}) + return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "GetObjectTagging", BucketOwner: parsedAcl.Owner}) } resp := s3response.Tagging{TagSet: s3response.TagSet{Tags: []s3response.Tag{}}} @@ -94,58 +96,52 @@ func (c S3ApiController) GetActions(ctx *fiber.Ctx) error { resp.TagSet.Tags = append(resp.TagSet.Tags, s3response.Tag{Key: key, Value: val}) } - return SendXMLResponse(ctx, resp, nil, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "GetObjectTagging", BucketOwner: parsedAcl.Owner}}) + return SendXMLResponse(ctx, resp, nil, &MetaOpts{Logger: c.logger, Action: "GetObjectTagging", BucketOwner: parsedAcl.Owner}) } if uploadId != "" { if maxParts < 0 || (maxParts == 0 && ctx.Query("max-parts") != "") { - return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidMaxParts), &LogOptions{ - Logger: c.logger, - Meta: s3log.LogMeta{Action: "ListObjectParts", BucketOwner: parsedAcl.Owner}, - }) + return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidMaxParts), &MetaOpts{Logger: c.logger, Action: "ListObjectParts", BucketOwner: parsedAcl.Owner}) } if partNumberMarker < 0 || (partNumberMarker == 0 && ctx.Query("part-number-marker") != "") { - return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidPartNumberMarker), &LogOptions{ - Logger: c.logger, - Meta: s3log.LogMeta{Action: "ListObjectParts", BucketOwner: parsedAcl.Owner}, - }) + return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidPartNumberMarker), &MetaOpts{Logger: c.logger, Action: "ListObjectParts", BucketOwner: parsedAcl.Owner}) } if err := auth.VerifyACL(parsedAcl, bucket, access, "READ", isRoot); err != nil { - return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "ListObjectParts", BucketOwner: parsedAcl.Owner}}) + return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "ListObjectParts", BucketOwner: parsedAcl.Owner}) } res, err := c.be.ListObjectParts(bucket, key, uploadId, partNumberMarker, maxParts) - return SendXMLResponse(ctx, res, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "ListObjectParts", BucketOwner: parsedAcl.Owner}}) + return SendXMLResponse(ctx, res, err, &MetaOpts{Logger: c.logger, Action: "ListObjectParts", BucketOwner: parsedAcl.Owner}) } if ctx.Request().URI().QueryArgs().Has("acl") { if err := auth.VerifyACL(parsedAcl, bucket, access, "READ_ACP", isRoot); err != nil { - return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "GetObjectAcl", BucketOwner: parsedAcl.Owner}}) + return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "GetObjectAcl", BucketOwner: parsedAcl.Owner}) } res, err := c.be.GetObjectAcl(bucket, key) - return SendXMLResponse(ctx, res, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "GetObjectAcl", BucketOwner: parsedAcl.Owner}}) + return SendXMLResponse(ctx, res, err, &MetaOpts{Logger: c.logger, Action: "GetObjectAcl", BucketOwner: parsedAcl.Owner}) } if attrs := ctx.Get("X-Amz-Object-Attributes"); attrs != "" { if err := auth.VerifyACL(parsedAcl, bucket, access, "READ", isRoot); err != nil { - return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "GetObjectAttributes", BucketOwner: parsedAcl.Owner}}) + return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "GetObjectAttributes", BucketOwner: parsedAcl.Owner}) } res, err := c.be.GetObjectAttributes(bucket, key, strings.Split(attrs, ",")) - return SendXMLResponse(ctx, res, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "GetObjectAttributes", BucketOwner: parsedAcl.Owner}}) + return SendXMLResponse(ctx, res, err, &MetaOpts{Logger: c.logger, Action: "GetObjectAttributes", BucketOwner: parsedAcl.Owner}) } if err := auth.VerifyACL(parsedAcl, bucket, access, "READ_ACP", isRoot); err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "GetObject", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "GetObject", BucketOwner: parsedAcl.Owner}) } ctx.Locals("logResBody", false) res, err := c.be.GetObject(bucket, key, acceptRange, ctx.Response().BodyWriter()) if err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "GetObject", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "GetObject", BucketOwner: parsedAcl.Owner}) } if res == nil { - return SendResponse(ctx, fmt.Errorf("get object nil response"), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "GetObject", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, fmt.Errorf("get object nil response"), &MetaOpts{Logger: c.logger, Action: "GetObject", BucketOwner: parsedAcl.Owner}) } utils.SetMetaHeaders(ctx, res.Metadata) @@ -179,7 +175,7 @@ func (c S3ApiController) GetActions(ctx *fiber.Ctx) error { Value: string(res.StorageClass), }, }) - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "GetObject", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "GetObject", BucketOwner: parsedAcl.Owner}) } func getstring(s *string) string { @@ -200,45 +196,45 @@ func (c S3ApiController) ListActions(ctx *fiber.Ctx) error { data, err := c.be.GetBucketAcl(bucket) if err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger}) } parsedAcl, err := auth.ParseACL(data) if err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger}) } if ctx.Request().URI().QueryArgs().Has("acl") { if err := auth.VerifyACL(parsedAcl, bucket, access, "READ_ACP", isRoot); err != nil { - return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "GetBucketAcl", BucketOwner: parsedAcl.Owner}}) + return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "GetBucketAcl", BucketOwner: parsedAcl.Owner}) } res, err := auth.ParseACLOutput(data) - return SendXMLResponse(ctx, res, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "GetBucketAcl", BucketOwner: parsedAcl.Owner}}) + return SendXMLResponse(ctx, res, err, &MetaOpts{Logger: c.logger, Action: "GetBucketAcl", BucketOwner: parsedAcl.Owner}) } if ctx.Request().URI().QueryArgs().Has("uploads") { if err := auth.VerifyACL(parsedAcl, bucket, access, "READ", isRoot); err != nil { - return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "ListMultipartUploads", BucketOwner: parsedAcl.Owner}}) + return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "ListMultipartUploads", BucketOwner: parsedAcl.Owner}) } res, err := c.be.ListMultipartUploads(&s3.ListMultipartUploadsInput{Bucket: aws.String(ctx.Params("bucket"))}) - return SendXMLResponse(ctx, res, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "ListMultipartUploads", BucketOwner: parsedAcl.Owner}}) + return SendXMLResponse(ctx, res, err, &MetaOpts{Logger: c.logger, Action: "ListMultipartUploads", BucketOwner: parsedAcl.Owner}) } if ctx.QueryInt("list-type") == 2 { if err := auth.VerifyACL(parsedAcl, bucket, access, "READ", isRoot); err != nil { - return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "ListObjectsV2", BucketOwner: parsedAcl.Owner}}) + return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "ListObjectsV2", BucketOwner: parsedAcl.Owner}) } res, err := c.be.ListObjectsV2(bucket, prefix, marker, delimiter, maxkeys) - return SendXMLResponse(ctx, res, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "ListObjectsV2", BucketOwner: parsedAcl.Owner}}) + return SendXMLResponse(ctx, res, err, &MetaOpts{Logger: c.logger, Action: "ListObjectsV2", BucketOwner: parsedAcl.Owner}) } if err := auth.VerifyACL(parsedAcl, bucket, access, "READ", isRoot); err != nil { - return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "ListObjects", BucketOwner: parsedAcl.Owner}}) + return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "ListObjects", BucketOwner: parsedAcl.Owner}) } res, err := c.be.ListObjects(bucket, prefix, marker, delimiter, maxkeys) - return SendXMLResponse(ctx, res, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "ListObjects", BucketOwner: parsedAcl.Owner}}) + return SendXMLResponse(ctx, res, err, &MetaOpts{Logger: c.logger, Action: "ListObjects", BucketOwner: parsedAcl.Owner}) } func (c S3ApiController) PutBucketActions(ctx *fiber.Ctx) error { @@ -261,27 +257,27 @@ func (c S3ApiController) PutBucketActions(ctx *fiber.Ctx) error { data, err := c.be.GetBucketAcl(bucket) if err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutBucketAcl"}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "PutBucketAcl"}) } parsedAcl, err := auth.ParseACL(data) if err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutBucketAcl"}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "PutBucketAcl"}) } if err := auth.VerifyACL(parsedAcl, bucket, access, "WRITE_ACP", isRoot); err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner}) } if len(ctx.Body()) > 0 { if grants+acl != "" { - return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &MetaOpts{Logger: c.logger, Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner}) } var accessControlPolicy auth.AccessControlPolicy err := xml.Unmarshal(ctx.Body(), &accessControlPolicy) if err != nil { - return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &MetaOpts{Logger: c.logger, Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner}) } input = &s3.PutBucketAclInput{ @@ -292,10 +288,10 @@ func (c S3ApiController) PutBucketActions(ctx *fiber.Ctx) error { } if acl != "" { if acl != "private" && acl != "public-read" && acl != "public-read-write" { - return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &MetaOpts{Logger: c.logger, Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner}) } if len(ctx.Body()) > 0 || grants != "" { - return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &MetaOpts{Logger: c.logger, Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner}) } input = &s3.PutBucketAclInput{ @@ -319,15 +315,15 @@ func (c S3ApiController) PutBucketActions(ctx *fiber.Ctx) error { updAcl, err := auth.UpdateACL(input, parsedAcl, c.iam) if err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner}) } err = c.be.PutBucketAcl(bucket, updAcl) - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner}) } err := c.be.PutBucket(bucket, access) - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutBucket", BucketOwner: ctx.Locals("access").(string)}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "PutBucket", BucketOwner: ctx.Locals("access").(string)}) } func (c S3ApiController) PutActions(ctx *fiber.Ctx) error { @@ -370,19 +366,19 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error { data, err := c.be.GetBucketAcl(bucket) if err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger}) } parsedAcl, err := auth.ParseACL(data) if err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger}) } if ctx.Request().URI().QueryArgs().Has("tagging") { var objTagging s3response.Tagging err := xml.Unmarshal(ctx.Body(), &objTagging) if err != nil { - return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObjectTagging", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &MetaOpts{Logger: c.logger, Action: "PutObjectTagging", BucketOwner: parsedAcl.Owner}) } tags := make(map[string]string, len(objTagging.TagSet.Tags)) @@ -392,18 +388,23 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error { } if err := auth.VerifyACL(parsedAcl, bucket, access, "WRITE", isRoot); err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObjectTagging", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "PutObjectTagging", BucketOwner: parsedAcl.Owner}) } err = c.be.SetTags(bucket, keyStart, tags) - - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObjectTagging", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{ + Logger: c.logger, + EvSender: c.evSender, + Action: "PutObjectTagging", + BucketOwner: parsedAcl.Owner, + EventName: s3event.EventObjectTaggingPut, + }) } if ctx.Request().URI().QueryArgs().Has("uploadId") && ctx.Request().URI().QueryArgs().Has("partNumber") && copySource != "" { partNumber := ctx.QueryInt("partNumber", -1) if partNumber < 1 || partNumber > 10000 { - return SendXMLResponse(ctx, nil, s3err.GetAPIError(s3err.ErrInvalidPart), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "UploadPartCopy", BucketOwner: parsedAcl.Owner}}) + return SendXMLResponse(ctx, nil, s3err.GetAPIError(s3err.ErrInvalidPart), &MetaOpts{Logger: c.logger, Action: "UploadPartCopy", BucketOwner: parsedAcl.Owner}) } resp, err := c.be.UploadPartCopy(&s3.UploadPartCopyInput{ @@ -415,22 +416,22 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error { ExpectedBucketOwner: &bucketOwner, CopySourceRange: ©SrcRange, }) - return SendXMLResponse(ctx, resp, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "UploadPartCopy", BucketOwner: parsedAcl.Owner}}) + return SendXMLResponse(ctx, resp, err, &MetaOpts{Logger: c.logger, Action: "UploadPartCopy", BucketOwner: parsedAcl.Owner}) } if ctx.Request().URI().QueryArgs().Has("uploadId") && ctx.Request().URI().QueryArgs().Has("partNumber") { partNumber := ctx.QueryInt("partNumber", -1) if partNumber < 1 || partNumber > 10000 { - return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidPart), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObjectPart", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidPart), &MetaOpts{Logger: c.logger, Action: "PutObjectPart", BucketOwner: parsedAcl.Owner}) } if err := auth.VerifyACL(parsedAcl, bucket, access, "WRITE", isRoot); err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObjectPart", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "PutObjectPart", BucketOwner: parsedAcl.Owner}) } contentLength, err := strconv.ParseInt(contentLengthStr, 10, 64) if err != nil { - return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObjectPart", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &MetaOpts{Logger: c.logger, Action: "PutObjectPart", BucketOwner: parsedAcl.Owner}) } body := io.ReadSeeker(bytes.NewReader([]byte(ctx.Body()))) @@ -438,7 +439,7 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error { etag, err := c.be.PutObjectPart(bucket, keyStart, uploadId, partNumber, contentLength, body) ctx.Response().Header.Set("Etag", etag) - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObjectPart", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "PutObjectPart", BucketOwner: parsedAcl.Owner}) } if ctx.Request().URI().QueryArgs().Has("acl") { @@ -446,13 +447,13 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error { if len(ctx.Body()) > 0 { if grants+acl != "" { - return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObjectAcl", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &MetaOpts{Logger: c.logger, Action: "PutObjectAcl", BucketOwner: parsedAcl.Owner}) } var accessControlPolicy auth.AccessControlPolicy err := xml.Unmarshal(ctx.Body(), &accessControlPolicy) if err != nil { - return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObjectAcl", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &MetaOpts{Logger: c.logger, Action: "PutObjectAcl", BucketOwner: parsedAcl.Owner}) } input = &s3.PutObjectAclInput{ @@ -464,10 +465,10 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error { } if acl != "" { if acl != "private" && acl != "public-read" && acl != "public-read-write" { - return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObjectAcl", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &MetaOpts{Logger: c.logger, Action: "PutObjectAcl", BucketOwner: parsedAcl.Owner}) } if len(ctx.Body()) > 0 || grants != "" { - return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObjectAcl", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &MetaOpts{Logger: c.logger, Action: "PutObjectAcl", BucketOwner: parsedAcl.Owner}) } input = &s3.PutObjectAclInput{ @@ -492,7 +493,13 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error { } err = c.be.PutObjectAcl(input) - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObjectAcl", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{ + Logger: c.logger, + EvSender: c.evSender, + Action: "PutObjectAcl", + BucketOwner: parsedAcl.Owner, + EventName: s3event.EventObjectAclPut, + }) } if copySource != "" { @@ -502,22 +509,38 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error { srcBucket, srcObject := copySourceSplit[0], copySourceSplit[1:] if err := auth.VerifyACL(parsedAcl, bucket, access, "WRITE", isRoot); err != nil { - return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "CopyObject", BucketOwner: parsedAcl.Owner}}) + return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "CopyObject", BucketOwner: parsedAcl.Owner}) } res, err := c.be.CopyObject(srcBucket, strings.Join(srcObject, "/"), bucket, keyStart) - return SendXMLResponse(ctx, res, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "CopyObject", BucketOwner: parsedAcl.Owner}}) + if err == nil { + return SendXMLResponse(ctx, res, err, &MetaOpts{ + Logger: c.logger, + EvSender: c.evSender, + Action: "CopyObject", + BucketOwner: parsedAcl.Owner, + ObjectETag: res.CopyObjectResult.ETag, + VersionId: res.VersionId, + EventName: s3event.EventObjectCopy, + }) + } else { + return SendXMLResponse(ctx, res, err, &MetaOpts{ + Logger: c.logger, + Action: "CopyObject", + BucketOwner: parsedAcl.Owner, + }) + } } metadata := utils.GetUserMetaData(&ctx.Request().Header) if err := auth.VerifyACL(parsedAcl, bucket, access, "WRITE", isRoot); err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObject", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "PutObject", BucketOwner: parsedAcl.Owner}) } contentLength, err := strconv.ParseInt(contentLengthStr, 10, 64) if err != nil { - return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObject", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &MetaOpts{Logger: c.logger, Action: "PutObject", BucketOwner: parsedAcl.Owner}) } ctx.Locals("logReqBody", false) @@ -529,7 +552,15 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error { Body: bytes.NewReader(ctx.Request().Body()), }) ctx.Response().Header.Set("ETag", etag) - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObject", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{ + Logger: c.logger, + EvSender: c.evSender, + Action: "PutObject", + BucketOwner: parsedAcl.Owner, + ObjectETag: &etag, + ObjectSize: contentLength, + EventName: s3event.EventObjectPut, + }) } func (c S3ApiController) DeleteBucket(ctx *fiber.Ctx) error { @@ -537,20 +568,20 @@ func (c S3ApiController) DeleteBucket(ctx *fiber.Ctx) error { data, err := c.be.GetBucketAcl(bucket) if err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "DeleteBuckets"}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "DeleteBuckets"}) } parsedAcl, err := auth.ParseACL(data) if err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "DeleteBuckets"}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "DeleteBuckets"}) } if err := auth.VerifyACL(parsedAcl, bucket, access, "WRITE", isRoot); err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "DeleteBucket", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "DeleteBucket", BucketOwner: parsedAcl.Owner}) } err = c.be.DeleteBucket(bucket) - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "DeleteBucket", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "DeleteBucket", BucketOwner: parsedAcl.Owner}) } func (c S3ApiController) DeleteObjects(ctx *fiber.Ctx) error { @@ -559,24 +590,24 @@ func (c S3ApiController) DeleteObjects(ctx *fiber.Ctx) error { data, err := c.be.GetBucketAcl(bucket) if err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "DeleteObjects"}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "DeleteObjects"}) } parsedAcl, err := auth.ParseACL(data) if err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "DeleteObjects"}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "DeleteObjects"}) } if err := xml.Unmarshal(ctx.Body(), &dObj); err != nil { - return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "DeleteObjects", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &MetaOpts{Logger: c.logger, Action: "DeleteObjects", BucketOwner: parsedAcl.Owner}) } if err := auth.VerifyACL(parsedAcl, bucket, access, "WRITE", isRoot); err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "DeleteObjects", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "DeleteObjects", BucketOwner: parsedAcl.Owner}) } err = c.be.DeleteObjects(bucket, &s3.DeleteObjectsInput{Delete: &dObj}) - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "DeleteObjects", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "DeleteObjects", BucketOwner: parsedAcl.Owner}) } func (c S3ApiController) DeleteActions(ctx *fiber.Ctx) error { @@ -593,28 +624,34 @@ func (c S3ApiController) DeleteActions(ctx *fiber.Ctx) error { data, err := c.be.GetBucketAcl(bucket) if err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger}) } parsedAcl, err := auth.ParseACL(data) if err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger}) } if ctx.Request().URI().QueryArgs().Has("tagging") { if err := auth.VerifyACL(parsedAcl, bucket, access, "WRITE", isRoot); err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "RemoveObjectTagging", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "RemoveObjectTagging", BucketOwner: parsedAcl.Owner}) } err = c.be.RemoveTags(bucket, key) - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "RemoveObjectTagging", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{ + Logger: c.logger, + EvSender: c.evSender, + Action: "RemoveObjectTagging", + BucketOwner: parsedAcl.Owner, + EventName: s3event.EventObjectTaggingDelete, + }) } if uploadId != "" { expectedBucketOwner, requestPayer := ctx.Get("X-Amz-Expected-Bucket-Owner"), ctx.Get("X-Amz-Request-Payer") if err := auth.VerifyACL(parsedAcl, bucket, access, "WRITE", isRoot); err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "AbortMultipartUpload", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "AbortMultipartUpload", BucketOwner: parsedAcl.Owner}) } err := c.be.AbortMultipartUpload(&s3.AbortMultipartUploadInput{ @@ -624,15 +661,21 @@ func (c S3ApiController) DeleteActions(ctx *fiber.Ctx) error { ExpectedBucketOwner: &expectedBucketOwner, RequestPayer: types.RequestPayer(requestPayer), }) - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "AbortMultipartUpload", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "AbortMultipartUpload", BucketOwner: parsedAcl.Owner}) } if err := auth.VerifyACL(parsedAcl, bucket, access, "WRITE", isRoot); err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "DeleteObject", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "DeleteObject", BucketOwner: parsedAcl.Owner}) } err = c.be.DeleteObject(bucket, key) - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "DeleteObject", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{ + Logger: c.logger, + EvSender: c.evSender, + Action: "DeleteObject", + BucketOwner: parsedAcl.Owner, + EventName: s3event.EventObjectDelete, + }) } func (c S3ApiController) HeadBucket(ctx *fiber.Ctx) error { @@ -640,21 +683,21 @@ func (c S3ApiController) HeadBucket(ctx *fiber.Ctx) error { data, err := c.be.GetBucketAcl(bucket) if err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "HeadBucket"}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "HeadBucket"}) } parsedAcl, err := auth.ParseACL(data) if err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "HeadBucket"}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "HeadBucket"}) } if err := auth.VerifyACL(parsedAcl, bucket, access, "READ", isRoot); err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "HeadBucket", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "HeadBucket", BucketOwner: parsedAcl.Owner}) } _, err = c.be.HeadBucket(bucket) // TODO: set bucket response headers - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "HeadBucket", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "HeadBucket", BucketOwner: parsedAcl.Owner}) } const ( @@ -671,24 +714,24 @@ func (c S3ApiController) HeadObject(ctx *fiber.Ctx) error { data, err := c.be.GetBucketAcl(bucket) if err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "HeadObject"}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "HeadObject"}) } parsedAcl, err := auth.ParseACL(data) if err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "HeadObject"}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "HeadObject"}) } if err := auth.VerifyACL(parsedAcl, bucket, access, "READ", isRoot); err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "HeadObject", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "HeadObject", BucketOwner: parsedAcl.Owner}) } res, err := c.be.HeadObject(bucket, key) if err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "HeadObject", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "HeadObject", BucketOwner: parsedAcl.Owner}) } if res == nil { - return SendResponse(ctx, fmt.Errorf("head object nil response"), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "HeadObject", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, fmt.Errorf("head object nil response"), &MetaOpts{Logger: c.logger, Action: "HeadObject", BucketOwner: parsedAcl.Owner}) } utils.SetMetaHeaders(ctx, res.Metadata) @@ -727,7 +770,7 @@ func (c S3ApiController) HeadObject(ctx *fiber.Ctx) error { }, }) - return SendResponse(ctx, nil, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "HeadObject", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, nil, &MetaOpts{Logger: c.logger, Action: "HeadObject", BucketOwner: parsedAcl.Owner}) } func (c S3ApiController) CreateActions(ctx *fiber.Ctx) error { @@ -744,27 +787,33 @@ func (c S3ApiController) CreateActions(ctx *fiber.Ctx) error { data, err := c.be.GetBucketAcl(bucket) if err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger}) } parsedAcl, err := auth.ParseACL(data) if err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger}) } var restoreRequest s3.RestoreObjectInput if ctx.Request().URI().QueryArgs().Has("restore") { err := xml.Unmarshal(ctx.Body(), &restoreRequest) if err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "RestoreObject", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "RestoreObject", BucketOwner: parsedAcl.Owner}) } if err := auth.VerifyACL(parsedAcl, bucket, access, "WRITE", isRoot); err != nil { - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "RestoreObject", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "RestoreObject", BucketOwner: parsedAcl.Owner}) } err = c.be.RestoreObject(bucket, key, &restoreRequest) - return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "RestoreObject", BucketOwner: parsedAcl.Owner}}) + return SendResponse(ctx, err, &MetaOpts{ + Logger: c.logger, + EvSender: c.evSender, + Action: "RestoreObject", + BucketOwner: parsedAcl.Owner, + EventName: s3event.EventObjectRestoreCompleted, + }) } if uploadId != "" { @@ -773,33 +822,59 @@ func (c S3ApiController) CreateActions(ctx *fiber.Ctx) error { }{} if err := xml.Unmarshal(ctx.Body(), &data); err != nil { - return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "CompleteMultipartUpload", BucketOwner: parsedAcl.Owner}}) + return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "CompleteMultipartUpload", BucketOwner: parsedAcl.Owner}) } if err := auth.VerifyACL(parsedAcl, bucket, access, "WRITE", isRoot); err != nil { - return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "CompleteMultipartUpload", BucketOwner: parsedAcl.Owner}}) + return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "CompleteMultipartUpload", BucketOwner: parsedAcl.Owner}) } res, err := c.be.CompleteMultipartUpload(bucket, key, uploadId, data.Parts) - return SendXMLResponse(ctx, res, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "CompleteMultipartUpload", BucketOwner: parsedAcl.Owner}}) + if err == nil { + return SendXMLResponse(ctx, res, err, &MetaOpts{ + Logger: c.logger, + EvSender: c.evSender, + Action: "CompleteMultipartUpload", + BucketOwner: parsedAcl.Owner, + ObjectETag: res.ETag, + EventName: s3event.EventCompleteMultipartUpload, + VersionId: res.VersionId, + }) + } else { + return SendXMLResponse(ctx, res, err, &MetaOpts{ + Logger: c.logger, + Action: "CompleteMultipartUpload", + BucketOwner: parsedAcl.Owner, + }) + } } if err := auth.VerifyACL(parsedAcl, bucket, access, "WRITE", isRoot); err != nil { - return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "CreateMultipartUpload", BucketOwner: parsedAcl.Owner}}) + return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "CreateMultipartUpload", BucketOwner: parsedAcl.Owner}) } res, err := c.be.CreateMultipartUpload(&s3.CreateMultipartUploadInput{Bucket: &bucket, Key: &key}) - return SendXMLResponse(ctx, res, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "CreateMultipartUpload", BucketOwner: parsedAcl.Owner}}) + return SendXMLResponse(ctx, res, err, &MetaOpts{Logger: c.logger, Action: "CreateMultipartUpload", BucketOwner: parsedAcl.Owner}) } -type LogOptions struct { - Logger s3log.AuditLogger - Meta s3log.LogMeta +type MetaOpts struct { + Logger s3log.AuditLogger + EvSender s3event.S3EventSender + Action string + BucketOwner string + ObjectSize int64 + EventName s3event.EventType + ObjectETag *string + VersionId *string } -func SendResponse(ctx *fiber.Ctx, err error, l *LogOptions) error { +func SendResponse(ctx *fiber.Ctx, err error, l *MetaOpts) error { if l.Logger != nil { - l.Logger.Log(ctx, err, nil, l.Meta) + l.Logger.Log(ctx, err, nil, s3log.LogMeta{ + Action: l.Action, + BucketOwner: l.BucketOwner, + ObjectSize: l.ObjectSize, + }) } if err != nil { serr, ok := err.(s3err.APIError) @@ -813,6 +888,15 @@ func SendResponse(ctx *fiber.Ctx, err error, l *LogOptions) error { return ctx.Send(s3err.GetAPIErrorResponse( s3err.GetAPIError(s3err.ErrInternalError), "", "", "")) } + if l.EvSender != nil { + l.EvSender.SendEvent(ctx, s3event.EventMeta{ + ObjectSize: l.ObjectSize, + ObjectETag: l.ObjectETag, + EventName: l.EventName, + BucketOwner: l.BucketOwner, + VersionId: l.VersionId, + }) + } utils.LogCtxDetails(ctx, []byte{}) @@ -822,10 +906,14 @@ func SendResponse(ctx *fiber.Ctx, err error, l *LogOptions) error { return nil } -func SendXMLResponse(ctx *fiber.Ctx, resp any, err error, l *LogOptions) error { +func SendXMLResponse(ctx *fiber.Ctx, resp any, err error, l *MetaOpts) error { if err != nil { if l.Logger != nil { - l.Logger.Log(ctx, err, nil, l.Meta) + l.Logger.Log(ctx, err, nil, s3log.LogMeta{ + Action: l.Action, + BucketOwner: l.BucketOwner, + ObjectSize: l.ObjectSize, + }) } serr, ok := err.(s3err.APIError) if ok { @@ -854,7 +942,21 @@ func SendXMLResponse(ctx *fiber.Ctx, resp any, err error, l *LogOptions) error { utils.LogCtxDetails(ctx, b) if l.Logger != nil { - l.Logger.Log(ctx, nil, b, l.Meta) + l.Logger.Log(ctx, nil, b, s3log.LogMeta{ + Action: l.Action, + BucketOwner: l.BucketOwner, + ObjectSize: l.ObjectSize, + }) + } + + if l.EvSender != nil { + l.EvSender.SendEvent(ctx, s3event.EventMeta{ + BucketOwner: l.BucketOwner, + ObjectSize: l.ObjectSize, + ObjectETag: l.ObjectETag, + VersionId: l.VersionId, + EventName: l.EventName, + }) } return ctx.Send(b) diff --git a/s3api/controllers/base_test.go b/s3api/controllers/base_test.go index cd080f5..d1db2da 100644 --- a/s3api/controllers/base_test.go +++ b/s3api/controllers/base_test.go @@ -31,7 +31,6 @@ import ( "github.com/versity/versitygw/auth" "github.com/versity/versitygw/backend" "github.com/versity/versitygw/s3err" - "github.com/versity/versitygw/s3log" "github.com/versity/versitygw/s3response" ) @@ -50,9 +49,8 @@ func init() { func TestNew(t *testing.T) { type args struct { - be backend.Backend - iam auth.IAMService - logger s3log.AuditLogger + be backend.Backend + iam auth.IAMService } be := backend.BackendUnsupported{} @@ -76,7 +74,7 @@ func TestNew(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := New(tt.args.be, tt.args.iam, tt.args.logger); !reflect.DeepEqual(got, tt.want) { + if got := New(tt.args.be, tt.args.iam, nil, nil); !reflect.DeepEqual(got, tt.want) { t.Errorf("New() = %v, want %v", got, tt.want) } }) @@ -659,10 +657,12 @@ func TestS3ApiController_PutActions(t *testing.T) { return nil }, CopyObjectFunc: func(srcBucket, srcObject, DstBucket, dstObject string) (*s3.CopyObjectOutput, error) { - return &s3.CopyObjectOutput{}, nil + return &s3.CopyObjectOutput{ + CopyObjectResult: &types.CopyObjectResult{}, + }, nil }, PutObjectFunc: func(*s3.PutObjectInput) (string, error) { - return "Hey", nil + return "ETag", nil }, PutObjectPartFunc: func(bucket, object, uploadID string, part int, length int64, r io.Reader) (string, error) { return "hello", nil @@ -1438,7 +1438,7 @@ func Test_XMLresponse(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if err := SendXMLResponse(tt.args.ctx, tt.args.resp, tt.args.err, &LogOptions{}); (err != nil) != tt.wantErr { + if err := SendXMLResponse(tt.args.ctx, tt.args.resp, tt.args.err, &MetaOpts{}); (err != nil) != tt.wantErr { t.Errorf("response() %v error = %v, wantErr %v", tt.name, err, tt.wantErr) } @@ -1518,7 +1518,7 @@ func Test_response(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if err := SendResponse(tt.args.ctx, tt.args.err, &LogOptions{}); (err != nil) != tt.wantErr { + if err := SendResponse(tt.args.ctx, tt.args.err, &MetaOpts{}); (err != nil) != tt.wantErr { t.Errorf("response() %v error = %v, wantErr %v", tt.name, err, tt.wantErr) } diff --git a/s3api/middlewares/authentication.go b/s3api/middlewares/authentication.go index 25cdfdd..8fed3c4 100644 --- a/s3api/middlewares/authentication.go +++ b/s3api/middlewares/authentication.go @@ -39,18 +39,17 @@ const ( type RootUserConfig struct { Access string Secret string - Region string } func VerifyV4Signature(root RootUserConfig, iam auth.IAMService, logger s3log.AuditLogger, region string, debug bool) fiber.Handler { acct := accounts{root: root, iam: iam} return func(ctx *fiber.Ctx) error { - ctx.Locals("region", root.Region) + ctx.Locals("region", region) ctx.Locals("startTime", time.Now()) authorization := ctx.Get("Authorization") if authorization == "" { - return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrAuthHeaderEmpty), &controllers.LogOptions{Logger: logger}) + return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrAuthHeaderEmpty), &controllers.MetaOpts{Logger: logger}) } // Check the signature version @@ -60,22 +59,22 @@ func VerifyV4Signature(root RootUserConfig, iam auth.IAMService, logger s3log.Au } if len(authParts) != 3 { - return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrMissingFields), &controllers.LogOptions{Logger: logger}) + return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrMissingFields), &controllers.MetaOpts{Logger: logger}) } startParts := strings.Split(authParts[0], " ") if startParts[0] != "AWS4-HMAC-SHA256" { - return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrSignatureVersionNotSupported), &controllers.LogOptions{Logger: logger}) + return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrSignatureVersionNotSupported), &controllers.MetaOpts{Logger: logger}) } credKv := strings.Split(startParts[1], "=") if len(credKv) != 2 { - return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrCredMalformed), &controllers.LogOptions{Logger: logger}) + return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrCredMalformed), &controllers.MetaOpts{Logger: logger}) } creds := strings.Split(credKv[1], "/") if len(creds) < 4 { - return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrCredMalformed), &controllers.LogOptions{Logger: logger}) + return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrCredMalformed), &controllers.MetaOpts{Logger: logger}) } ctx.Locals("access", creds[0]) @@ -83,29 +82,29 @@ func VerifyV4Signature(root RootUserConfig, iam auth.IAMService, logger s3log.Au signHdrKv := strings.Split(authParts[1], "=") if len(signHdrKv) != 2 { - return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrCredMalformed), &controllers.LogOptions{Logger: logger}) + return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrCredMalformed), &controllers.MetaOpts{Logger: logger}) } signedHdrs := strings.Split(signHdrKv[1], ";") account, err := acct.getAccount(creds[0]) if err == auth.ErrNoSuchUser { - return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidAccessKeyID), &controllers.LogOptions{Logger: logger}) + return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidAccessKeyID), &controllers.MetaOpts{Logger: logger}) } if err != nil { - return controllers.SendResponse(ctx, err, &controllers.LogOptions{Logger: logger}) + return controllers.SendResponse(ctx, err, &controllers.MetaOpts{Logger: logger}) } ctx.Locals("role", account.Role) // Check X-Amz-Date header date := ctx.Get("X-Amz-Date") if date == "" { - return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrMissingDateHeader), &controllers.LogOptions{Logger: logger}) + return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrMissingDateHeader), &controllers.MetaOpts{Logger: logger}) } // Parse the date and check the date validity tdate, err := time.Parse(iso8601Format, date) if err != nil { - return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrMalformedDate), &controllers.LogOptions{Logger: logger}) + return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrMalformedDate), &controllers.MetaOpts{Logger: logger}) } hashPayloadHeader := ctx.Get("X-Amz-Content-Sha256") @@ -118,14 +117,14 @@ func VerifyV4Signature(root RootUserConfig, iam auth.IAMService, logger s3log.Au // Compare the calculated hash with the hash provided if hashPayloadHeader != hexPayload { - return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrContentSHA256Mismatch), &controllers.LogOptions{Logger: logger}) + return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrContentSHA256Mismatch), &controllers.MetaOpts{Logger: logger}) } } // Create a new http request instance from fasthttp request req, err := utils.CreateHttpRequestFromCtx(ctx, signedHdrs) if err != nil { - return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrInternalError), &controllers.LogOptions{Logger: logger}) + return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrInternalError), &controllers.MetaOpts{Logger: logger}) } signer := v4.NewSigner() @@ -140,18 +139,18 @@ func VerifyV4Signature(root RootUserConfig, iam auth.IAMService, logger s3log.Au } }) if signErr != nil { - return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrInternalError), &controllers.LogOptions{Logger: logger}) + return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrInternalError), &controllers.MetaOpts{Logger: logger}) } parts := strings.Split(req.Header.Get("Authorization"), " ") if len(parts) < 4 { - return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrMissingFields), &controllers.LogOptions{Logger: logger}) + return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrMissingFields), &controllers.MetaOpts{Logger: logger}) } calculatedSign := strings.Split(parts[3], "=")[1] expectedSign := strings.Split(authParts[2], "=")[1] if expectedSign != calculatedSign { - return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrSignatureDoesNotMatch), &controllers.LogOptions{Logger: logger}) + return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrSignatureDoesNotMatch), &controllers.MetaOpts{Logger: logger}) } return ctx.Next() diff --git a/s3api/middlewares/md5.go b/s3api/middlewares/md5.go index 7fcc1e9..2da5f1a 100644 --- a/s3api/middlewares/md5.go +++ b/s3api/middlewares/md5.go @@ -35,7 +35,7 @@ func VerifyMD5Body(logger s3log.AuditLogger) fiber.Handler { calculatedSum := base64.StdEncoding.EncodeToString(sum[:]) if incomingSum != calculatedSum { - return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidDigest), &controllers.LogOptions{Logger: logger}) + return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidDigest), &controllers.MetaOpts{Logger: logger}) } return ctx.Next() diff --git a/s3api/router.go b/s3api/router.go index bc50666..69740ce 100644 --- a/s3api/router.go +++ b/s3api/router.go @@ -19,13 +19,14 @@ import ( "github.com/versity/versitygw/auth" "github.com/versity/versitygw/backend" "github.com/versity/versitygw/s3api/controllers" + "github.com/versity/versitygw/s3event" "github.com/versity/versitygw/s3log" ) type S3ApiRouter struct{} -func (sa *S3ApiRouter) Init(app *fiber.App, be backend.Backend, iam auth.IAMService, logger s3log.AuditLogger) { - s3ApiController := controllers.New(be, iam, logger) +func (sa *S3ApiRouter) Init(app *fiber.App, be backend.Backend, iam auth.IAMService, logger s3log.AuditLogger, evs s3event.S3EventSender) { + s3ApiController := controllers.New(be, iam, logger, evs) adminController := controllers.AdminController{IAMService: iam} app.Patch("/create-user", adminController.CreateUser) diff --git a/s3api/router_test.go b/s3api/router_test.go index b2c9e0e..a72d983 100644 --- a/s3api/router_test.go +++ b/s3api/router_test.go @@ -45,7 +45,7 @@ func TestS3ApiRouter_Init(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - tt.sa.Init(tt.args.app, tt.args.be, tt.args.iam, nil) + tt.sa.Init(tt.args.app, tt.args.be, tt.args.iam, nil, nil) }) } } diff --git a/s3api/server.go b/s3api/server.go index 0b59fc5..e92e7cf 100644 --- a/s3api/server.go +++ b/s3api/server.go @@ -22,6 +22,7 @@ import ( "github.com/versity/versitygw/auth" "github.com/versity/versitygw/backend" "github.com/versity/versitygw/s3api/middlewares" + "github.com/versity/versitygw/s3event" "github.com/versity/versitygw/s3log" ) @@ -34,7 +35,7 @@ type S3ApiServer struct { debug bool } -func New(app *fiber.App, be backend.Backend, root middlewares.RootUserConfig, port, region string, iam auth.IAMService, l s3log.AuditLogger, opts ...Option) (*S3ApiServer, error) { +func New(app *fiber.App, be backend.Backend, root middlewares.RootUserConfig, port, region string, iam auth.IAMService, l s3log.AuditLogger, evs s3event.S3EventSender, opts ...Option) (*S3ApiServer, error) { server := &S3ApiServer{ app: app, backend: be, @@ -54,7 +55,7 @@ func New(app *fiber.App, be backend.Backend, root middlewares.RootUserConfig, po app.Use(middlewares.VerifyV4Signature(root, iam, l, region, server.debug)) app.Use(middlewares.VerifyMD5Body(l)) - server.router.Init(app, be, iam, l) + server.router.Init(app, be, iam, l, evs) return server, nil } diff --git a/s3api/server_test.go b/s3api/server_test.go index 9917cb7..9d4890b 100644 --- a/s3api/server_test.go +++ b/s3api/server_test.go @@ -64,7 +64,7 @@ func TestNew(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { gotS3ApiServer, err := New(tt.args.app, tt.args.be, tt.args.root, - tt.args.port, "us-east-1", &auth.IAMServiceInternal{}, nil) + tt.args.port, "us-east-1", &auth.IAMServiceInternal{}, nil, nil) if (err != nil) != tt.wantErr { t.Errorf("New() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/s3event/event.go b/s3event/event.go new file mode 100644 index 0000000..a8d6fa8 --- /dev/null +++ b/s3event/event.go @@ -0,0 +1,118 @@ +// Copyright 2023 Versity Software +// This file is licensed under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package s3event + +import "github.com/gofiber/fiber/v2" + +type S3EventSender interface { + SendEvent(ctx *fiber.Ctx, meta EventMeta) +} + +type EventMeta struct { + BucketOwner string + EventName EventType + ObjectSize int64 + ObjectETag *string + VersionId *string +} + +type EventFields struct { + Records []EventSchema +} + +type EventType string + +const ( + EventObjectPut EventType = "s3:ObjectCreated:Put" + EventObjectCopy EventType = "s3:ObjectCreated:Copy" + EventCompleteMultipartUpload EventType = "s3:ObjectCreated:CompleteMultipartUpload" + EventObjectDelete EventType = "s3:ObjectRemoved:Delete" + EventObjectRestoreCompleted EventType = "s3:ObjectRestore:Completed" + EventObjectTaggingPut EventType = "s3:ObjectTagging:Put" + EventObjectTaggingDelete EventType = "s3:ObjectTagging:Delete" + EventObjectAclPut EventType = "s3:ObjectAcl:Put" + // Not supported + // EventObjectRestorePost EventType = "s3:ObjectRestore:Post" + // EventObjectRestoreDelete EventType = "s3:ObjectRestore:Delete" +) + +type EventSchema struct { + EventVersion string `json:"eventVersion"` + EventSource string `json:"eventSource"` + AwsRegion string `json:"awsRegion"` + EventTime string `json:"eventTime"` + EventName EventType `json:"eventName"` + UserIdentity EventUserIdentity `json:"userIdentity"` + RequestParameters EventRequestParams `json:"requestParameters"` + ResponseElements EventResponseElements `json:"responseElements"` + S3 EventS3Data `json:"s3"` + GlacierEventData EventGlacierData `json:"glacierEventData"` +} + +type EventUserIdentity struct { + PrincipalId string `json:"PrincipalId"` +} + +type EventRequestParams struct { + SourceIPAddress string `json:"sourceIPAddress"` +} + +type EventResponseElements struct { + RequestId string `json:"x-amz-request-id"` + HostId string `json:"x-amz-id-2"` +} + +type EventS3Data struct { + S3SchemaVersion string `json:"s3SchemaVersion"` + ConfigurationId string `json:"configurationId"` + Bucket EventS3BucketData `json:"bucket"` + Object EventObjectData `json:"object"` +} + +type EventGlacierData struct { + RestoreEventData EventRestoreData `json:"restoreEventData"` +} + +type EventRestoreData struct { + LifecycleRestorationExpiryTime string `json:"lifecycleRestorationExpiryTime"` + LifecycleRestoreStorageClass string `json:"lifecycleRestoreStorageClass"` +} + +type EventS3BucketData struct { + Name string `json:"name"` + OwnerIdentity EventUserIdentity `json:"ownerIdentity"` + Arn string `json:"arn"` +} + +type EventObjectData struct { + Key string `json:"key"` + Size int64 `json:"size"` + ETag *string `json:"eTag"` + VersionId *string `json:"versionId"` + Sequencer string `json:"sequencer"` +} + +type EventConfig struct { + KafkaURL string + KafkaTopic string + KafkaTopicKey string +} + +func InitEventSender(cfg *EventConfig) (S3EventSender, error) { + if cfg.KafkaURL != "" { + return InitKafkaEventService(cfg.KafkaURL, cfg.KafkaTopic, cfg.KafkaTopicKey) + } + return nil, nil +} diff --git a/s3event/kafka.go b/s3event/kafka.go new file mode 100644 index 0000000..ddad1bd --- /dev/null +++ b/s3event/kafka.go @@ -0,0 +1,153 @@ +// Copyright 2023 Versity Software +// This file is licensed under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package s3event + +import ( + "context" + "encoding/json" + "fmt" + "os" + "strings" + "sync" + "time" + + "github.com/gofiber/fiber/v2" + "github.com/segmentio/kafka-go" +) + +var sequencer = 0 + +type Kafka struct { + key string + writer *kafka.Writer + mu sync.Mutex +} + +func InitKafkaEventService(url, topic, key string) (S3EventSender, error) { + if topic == "" { + return nil, fmt.Errorf("kafka message topic should be specified") + } + + w := kafka.NewWriter(kafka.WriterConfig{ + Brokers: []string{url}, + Topic: topic, + Balancer: &kafka.LeastBytes{}, + BatchTimeout: 5 * time.Millisecond, + }) + + msg := map[string]string{ + "Service": "S3", + "Event": "s3:TestEvent", + "Time": time.Now().Format(time.RFC3339), + "Bucket": "Test-Bucket", + } + + msgJSON, err := json.Marshal(msg) + if err != nil { + return nil, err + } + + message := kafka.Message{ + Key: []byte(key), + Value: msgJSON, + } + + ctx := context.Background() + + err = w.WriteMessages(ctx, message) + if err != nil { + return nil, err + } + + return &Kafka{ + key: key, + writer: w, + }, nil +} + +func (ks *Kafka) SendEvent(ctx *fiber.Ctx, meta EventMeta) { + ks.mu.Lock() + defer ks.mu.Unlock() + + path := strings.Split(ctx.Path(), "/") + bucket, object := path[1], strings.Join(path[2:], "/") + + schema := EventSchema{ + EventVersion: "2.2", + EventSource: "aws:s3", + AwsRegion: ctx.Locals("region").(string), + EventTime: time.Now().Format(time.RFC3339), + EventName: meta.EventName, + UserIdentity: EventUserIdentity{ + PrincipalId: ctx.Locals("access").(string), + }, + RequestParameters: EventRequestParams{ + SourceIPAddress: ctx.IP(), + }, + ResponseElements: EventResponseElements{ + RequestId: ctx.Get("X-Amz-Request-Id"), + HostId: ctx.Get("X-Amx-Id-2"), + }, + S3: EventS3Data{ + S3SchemaVersion: "1.0", + // This field will come up after implementing per bucket notifications + ConfigurationId: "kafka-global", + Bucket: EventS3BucketData{ + Name: bucket, + OwnerIdentity: EventUserIdentity{ + PrincipalId: ctx.Locals("access").(string), + }, + Arn: fmt.Sprintf("arn:aws:s3:::%v", strings.Join(path, "/")), + }, + Object: EventObjectData{ + Key: object, + Size: meta.ObjectSize, + ETag: meta.ObjectETag, + VersionId: meta.VersionId, + Sequencer: genSequencer(), + }, + }, + GlacierEventData: EventGlacierData{ + // Not supported + RestoreEventData: EventRestoreData{}, + }, + } + + ks.send([]EventSchema{schema}) +} + +func (ks *Kafka) send(evnt []EventSchema) { + msg, err := json.Marshal(evnt) + if err != nil { + fmt.Fprintf(os.Stderr, "\nfailed to parse the event data: %v", err.Error()) + return + } + + message := kafka.Message{ + Key: []byte(ks.key), + Value: msg, + } + + ctx := context.Background() + err = ks.writer.WriteMessages(ctx, message) + if err != nil { + fmt.Fprintf(os.Stderr, "\nfailed to send kafka event: %v", err.Error()) + } +} + +func genSequencer() string { + sequencer = sequencer + 1 + return fmt.Sprintf("%X", sequencer) +}