feat: bucket event notifications

Set up Bucket event notifications interface to send aws compatible format event messages to a configured event service.
First integrated service is kafka message broker as an option for bucket event notifications.
This commit is contained in:
jonaustin09
2023-07-18 20:38:11 +04:00
committed by Ben McClelland
parent df7f01f7e2
commit fe547a19e9
13 changed files with 595 additions and 161 deletions

View File

@@ -27,18 +27,20 @@ import (
"github.com/versity/versitygw/backend"
"github.com/versity/versitygw/s3api"
"github.com/versity/versitygw/s3api/middlewares"
"github.com/versity/versitygw/s3event"
"github.com/versity/versitygw/s3log"
)
var (
port string
rootUserAccess string
rootUserSecret string
region string
certFile, keyFile string
logWebhookURL string
accessLog bool
debug bool
port string
rootUserAccess string
rootUserSecret string
region string
certFile, keyFile string
kafkaURL, kafkaTopic, kafkaKey string
logWebhookURL string
accessLog bool
debug bool
)
var (
@@ -154,6 +156,24 @@ func initFlags() []cli.Flag {
Usage: "webhook url to send the audit logs",
Destination: &logWebhookURL,
},
&cli.StringFlag{
Name: "event-kafka-url",
Usage: "kafka server url to send the bucket notifications.",
Destination: &kafkaURL,
Aliases: []string{"eku"},
},
&cli.StringFlag{
Name: "event-kafka-topic",
Usage: "kafka server pub-sub topic to send the bucket notifications to",
Destination: &kafkaTopic,
Aliases: []string{"ekt"},
},
&cli.StringFlag{
Name: "event-kafka-key",
Usage: "kafka server put-sub topic key to send the bucket notifications to",
Destination: &kafkaKey,
Aliases: []string{"ekk"},
},
}
}
@@ -203,11 +223,19 @@ func runGateway(ctx *cli.Context, be backend.Backend, s auth.Storer) error {
return fmt.Errorf("setup logger: %w", err)
}
evSender, err := s3event.InitEventSender(&s3event.EventConfig{
KafkaURL: kafkaURL,
KafkaTopic: kafkaTopic,
KafkaTopicKey: kafkaKey,
})
if err != nil {
return fmt.Errorf("unable to connect to the message broker: %w", err)
}
srv, err := s3api.New(app, be, middlewares.RootUserConfig{
Access: rootUserAccess,
Secret: rootUserSecret,
Region: region,
}, port, region, iam, logger, opts...)
}, port, region, iam, logger, evSender, opts...)
if err != nil {
return fmt.Errorf("init gateway: %v", err)
}

3
go.mod
View File

@@ -9,6 +9,7 @@ require (
github.com/gofiber/fiber/v2 v2.47.0
github.com/google/uuid v1.3.0
github.com/pkg/xattr v0.4.9
github.com/segmentio/kafka-go v0.4.42
github.com/urfave/cli/v2 v2.25.7
github.com/valyala/fasthttp v1.48.0
github.com/versity/scoutfs-go v0.0.0-20230606232754-0474b14343b9
@@ -22,6 +23,8 @@ require (
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.12 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.19.2 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/stretchr/testify v1.8.1 // indirect
)
require (

31
go.sum
View File

@@ -40,8 +40,9 @@ github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8=
github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gofiber/fiber/v2 v2.47.0 h1:EN5lHVCc+Pyqh5OEsk8fzRiifgwpbrP0rulQ4iNf3fs=
github.com/gofiber/fiber/v2 v2.47.0/go.mod h1:mbFMVN1lQuzziTkkakgtKKdjfsXSw9BKR5lmcNksUoU=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
@@ -52,6 +53,7 @@ github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9Y
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
@@ -64,6 +66,9 @@ github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh
github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/philhofer/fwd v1.1.2 h1:bnDivRJ1EWPjUIRXV5KfORO897HTbpFAQddBdE8t7Gw=
github.com/philhofer/fwd v1.1.2/go.mod h1:qkPdfjR2SIEbspLqpe1tO4n5yICnr2DY7mqEx2tUTP0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/xattr v0.4.9 h1:5883YPCtkSd8LFbs13nXplj9g9tlrwoJRjgpgMu1/fE=
github.com/pkg/xattr v0.4.9/go.mod h1:di8WF84zAKk8jzR1UBTEWh9AUlIZZ7M/JNt8e9B6ktU=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -78,7 +83,15 @@ github.com/savsgio/dictpool v0.0.0-20221023140959-7bf2e61cea94/go.mod h1:90zrgN3
github.com/savsgio/gotils v0.0.0-20220530130905-52f3993e8d6d/go.mod h1:Gy+0tqhJvgGlqnTF8CVGP0AaGRjwBtXs/a5PA0Y3+A4=
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee h1:8Iv5m6xEo1NR1AvpV+7XmhI4r39LGNzwUL4YpMuL5vk=
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee/go.mod h1:qwtSXrKuJh/zsFQ12yEE89xfCrGKK63Rr7ctU/uCo4g=
github.com/segmentio/kafka-go v0.4.42 h1:qffhBZCz4WcWyNuHEclHjIMLs2slp6mZO8px+5W5tfU=
github.com/segmentio/kafka-go v0.4.42/go.mod h1:d0g15xPMqoUookug0OU75DhGZxXwCFxSLeJ4uphwJzg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/tinylib/msgp v1.1.6/go.mod h1:75BAfg2hauQhs3qedfdDZmWAPcFMAvJE5b9rGOMufyw=
github.com/tinylib/msgp v1.1.8 h1:FCXC1xanKO4I8plpHGH2P7koL/RzZs12l/+r7vakfm0=
github.com/tinylib/msgp v1.1.8/go.mod h1:qkpG+2ldGg4xRFmx+jfTvZPxfGFhi64BcnL9vkCm/Tw=
@@ -92,6 +105,12 @@ github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVS
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/versity/scoutfs-go v0.0.0-20230606232754-0474b14343b9 h1:ZfmQR01Kk6/kQh6+zlqfBYszVY02fzf9xYrchOY4NFM=
github.com/versity/scoutfs-go v0.0.0-20230606232754-0474b14343b9/go.mod h1:gJsq73k+4685y+rbDIpPY8i/5GbsiwP6JFoFyUDB1fQ=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU=
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@@ -109,6 +128,8 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.3.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -123,16 +144,21 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
@@ -144,3 +170,6 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -32,27 +32,29 @@ import (
"github.com/versity/versitygw/backend"
"github.com/versity/versitygw/s3api/utils"
"github.com/versity/versitygw/s3err"
"github.com/versity/versitygw/s3event"
"github.com/versity/versitygw/s3log"
"github.com/versity/versitygw/s3response"
)
type S3ApiController struct {
be backend.Backend
iam auth.IAMService
logger s3log.AuditLogger
be backend.Backend
iam auth.IAMService
logger s3log.AuditLogger
evSender s3event.S3EventSender
}
func New(be backend.Backend, iam auth.IAMService, logger s3log.AuditLogger) S3ApiController {
return S3ApiController{be: be, iam: iam, logger: logger}
func New(be backend.Backend, iam auth.IAMService, logger s3log.AuditLogger, evs s3event.S3EventSender) S3ApiController {
return S3ApiController{be: be, iam: iam, logger: logger, evSender: evs}
}
func (c S3ApiController) ListBuckets(ctx *fiber.Ctx) error {
access, isRoot := ctx.Locals("access").(string), ctx.Locals("isRoot").(bool)
if err := auth.IsAdmin(access, isRoot); err != nil {
return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "ListBucket"}})
return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "ListBucket"})
}
res, err := c.be.ListBuckets()
return SendXMLResponse(ctx, res, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "ListBucket"}})
return SendXMLResponse(ctx, res, err, &MetaOpts{Logger: c.logger, Action: "ListBucket"})
}
func (c S3ApiController) GetActions(ctx *fiber.Ctx) error {
@@ -71,22 +73,22 @@ func (c S3ApiController) GetActions(ctx *fiber.Ctx) error {
data, err := c.be.GetBucketAcl(bucket)
if err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger})
}
parsedAcl, err := auth.ParseACL(data)
if err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger})
}
if ctx.Request().URI().QueryArgs().Has("tagging") {
if err := auth.VerifyACL(parsedAcl, bucket, access, "READ", isRoot); err != nil {
return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "GetObjectTagging", BucketOwner: parsedAcl.Owner}})
return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "GetObjectTagging", BucketOwner: parsedAcl.Owner})
}
tags, err := c.be.GetTags(bucket, key)
if err != nil {
return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "GetObjectTagging", BucketOwner: parsedAcl.Owner}})
return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "GetObjectTagging", BucketOwner: parsedAcl.Owner})
}
resp := s3response.Tagging{TagSet: s3response.TagSet{Tags: []s3response.Tag{}}}
@@ -94,58 +96,52 @@ func (c S3ApiController) GetActions(ctx *fiber.Ctx) error {
resp.TagSet.Tags = append(resp.TagSet.Tags, s3response.Tag{Key: key, Value: val})
}
return SendXMLResponse(ctx, resp, nil, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "GetObjectTagging", BucketOwner: parsedAcl.Owner}})
return SendXMLResponse(ctx, resp, nil, &MetaOpts{Logger: c.logger, Action: "GetObjectTagging", BucketOwner: parsedAcl.Owner})
}
if uploadId != "" {
if maxParts < 0 || (maxParts == 0 && ctx.Query("max-parts") != "") {
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidMaxParts), &LogOptions{
Logger: c.logger,
Meta: s3log.LogMeta{Action: "ListObjectParts", BucketOwner: parsedAcl.Owner},
})
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidMaxParts), &MetaOpts{Logger: c.logger, Action: "ListObjectParts", BucketOwner: parsedAcl.Owner})
}
if partNumberMarker < 0 || (partNumberMarker == 0 && ctx.Query("part-number-marker") != "") {
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidPartNumberMarker), &LogOptions{
Logger: c.logger,
Meta: s3log.LogMeta{Action: "ListObjectParts", BucketOwner: parsedAcl.Owner},
})
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidPartNumberMarker), &MetaOpts{Logger: c.logger, Action: "ListObjectParts", BucketOwner: parsedAcl.Owner})
}
if err := auth.VerifyACL(parsedAcl, bucket, access, "READ", isRoot); err != nil {
return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "ListObjectParts", BucketOwner: parsedAcl.Owner}})
return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "ListObjectParts", BucketOwner: parsedAcl.Owner})
}
res, err := c.be.ListObjectParts(bucket, key, uploadId, partNumberMarker, maxParts)
return SendXMLResponse(ctx, res, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "ListObjectParts", BucketOwner: parsedAcl.Owner}})
return SendXMLResponse(ctx, res, err, &MetaOpts{Logger: c.logger, Action: "ListObjectParts", BucketOwner: parsedAcl.Owner})
}
if ctx.Request().URI().QueryArgs().Has("acl") {
if err := auth.VerifyACL(parsedAcl, bucket, access, "READ_ACP", isRoot); err != nil {
return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "GetObjectAcl", BucketOwner: parsedAcl.Owner}})
return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "GetObjectAcl", BucketOwner: parsedAcl.Owner})
}
res, err := c.be.GetObjectAcl(bucket, key)
return SendXMLResponse(ctx, res, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "GetObjectAcl", BucketOwner: parsedAcl.Owner}})
return SendXMLResponse(ctx, res, err, &MetaOpts{Logger: c.logger, Action: "GetObjectAcl", BucketOwner: parsedAcl.Owner})
}
if attrs := ctx.Get("X-Amz-Object-Attributes"); attrs != "" {
if err := auth.VerifyACL(parsedAcl, bucket, access, "READ", isRoot); err != nil {
return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "GetObjectAttributes", BucketOwner: parsedAcl.Owner}})
return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "GetObjectAttributes", BucketOwner: parsedAcl.Owner})
}
res, err := c.be.GetObjectAttributes(bucket, key, strings.Split(attrs, ","))
return SendXMLResponse(ctx, res, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "GetObjectAttributes", BucketOwner: parsedAcl.Owner}})
return SendXMLResponse(ctx, res, err, &MetaOpts{Logger: c.logger, Action: "GetObjectAttributes", BucketOwner: parsedAcl.Owner})
}
if err := auth.VerifyACL(parsedAcl, bucket, access, "READ_ACP", isRoot); err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "GetObject", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "GetObject", BucketOwner: parsedAcl.Owner})
}
ctx.Locals("logResBody", false)
res, err := c.be.GetObject(bucket, key, acceptRange, ctx.Response().BodyWriter())
if err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "GetObject", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "GetObject", BucketOwner: parsedAcl.Owner})
}
if res == nil {
return SendResponse(ctx, fmt.Errorf("get object nil response"), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "GetObject", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, fmt.Errorf("get object nil response"), &MetaOpts{Logger: c.logger, Action: "GetObject", BucketOwner: parsedAcl.Owner})
}
utils.SetMetaHeaders(ctx, res.Metadata)
@@ -179,7 +175,7 @@ func (c S3ApiController) GetActions(ctx *fiber.Ctx) error {
Value: string(res.StorageClass),
},
})
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "GetObject", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "GetObject", BucketOwner: parsedAcl.Owner})
}
func getstring(s *string) string {
@@ -200,45 +196,45 @@ func (c S3ApiController) ListActions(ctx *fiber.Ctx) error {
data, err := c.be.GetBucketAcl(bucket)
if err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger})
}
parsedAcl, err := auth.ParseACL(data)
if err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger})
}
if ctx.Request().URI().QueryArgs().Has("acl") {
if err := auth.VerifyACL(parsedAcl, bucket, access, "READ_ACP", isRoot); err != nil {
return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "GetBucketAcl", BucketOwner: parsedAcl.Owner}})
return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "GetBucketAcl", BucketOwner: parsedAcl.Owner})
}
res, err := auth.ParseACLOutput(data)
return SendXMLResponse(ctx, res, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "GetBucketAcl", BucketOwner: parsedAcl.Owner}})
return SendXMLResponse(ctx, res, err, &MetaOpts{Logger: c.logger, Action: "GetBucketAcl", BucketOwner: parsedAcl.Owner})
}
if ctx.Request().URI().QueryArgs().Has("uploads") {
if err := auth.VerifyACL(parsedAcl, bucket, access, "READ", isRoot); err != nil {
return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "ListMultipartUploads", BucketOwner: parsedAcl.Owner}})
return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "ListMultipartUploads", BucketOwner: parsedAcl.Owner})
}
res, err := c.be.ListMultipartUploads(&s3.ListMultipartUploadsInput{Bucket: aws.String(ctx.Params("bucket"))})
return SendXMLResponse(ctx, res, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "ListMultipartUploads", BucketOwner: parsedAcl.Owner}})
return SendXMLResponse(ctx, res, err, &MetaOpts{Logger: c.logger, Action: "ListMultipartUploads", BucketOwner: parsedAcl.Owner})
}
if ctx.QueryInt("list-type") == 2 {
if err := auth.VerifyACL(parsedAcl, bucket, access, "READ", isRoot); err != nil {
return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "ListObjectsV2", BucketOwner: parsedAcl.Owner}})
return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "ListObjectsV2", BucketOwner: parsedAcl.Owner})
}
res, err := c.be.ListObjectsV2(bucket, prefix, marker, delimiter, maxkeys)
return SendXMLResponse(ctx, res, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "ListObjectsV2", BucketOwner: parsedAcl.Owner}})
return SendXMLResponse(ctx, res, err, &MetaOpts{Logger: c.logger, Action: "ListObjectsV2", BucketOwner: parsedAcl.Owner})
}
if err := auth.VerifyACL(parsedAcl, bucket, access, "READ", isRoot); err != nil {
return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "ListObjects", BucketOwner: parsedAcl.Owner}})
return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "ListObjects", BucketOwner: parsedAcl.Owner})
}
res, err := c.be.ListObjects(bucket, prefix, marker, delimiter, maxkeys)
return SendXMLResponse(ctx, res, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "ListObjects", BucketOwner: parsedAcl.Owner}})
return SendXMLResponse(ctx, res, err, &MetaOpts{Logger: c.logger, Action: "ListObjects", BucketOwner: parsedAcl.Owner})
}
func (c S3ApiController) PutBucketActions(ctx *fiber.Ctx) error {
@@ -261,27 +257,27 @@ func (c S3ApiController) PutBucketActions(ctx *fiber.Ctx) error {
data, err := c.be.GetBucketAcl(bucket)
if err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutBucketAcl"}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "PutBucketAcl"})
}
parsedAcl, err := auth.ParseACL(data)
if err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutBucketAcl"}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "PutBucketAcl"})
}
if err := auth.VerifyACL(parsedAcl, bucket, access, "WRITE_ACP", isRoot); err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner})
}
if len(ctx.Body()) > 0 {
if grants+acl != "" {
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &MetaOpts{Logger: c.logger, Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner})
}
var accessControlPolicy auth.AccessControlPolicy
err := xml.Unmarshal(ctx.Body(), &accessControlPolicy)
if err != nil {
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &MetaOpts{Logger: c.logger, Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner})
}
input = &s3.PutBucketAclInput{
@@ -292,10 +288,10 @@ func (c S3ApiController) PutBucketActions(ctx *fiber.Ctx) error {
}
if acl != "" {
if acl != "private" && acl != "public-read" && acl != "public-read-write" {
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &MetaOpts{Logger: c.logger, Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner})
}
if len(ctx.Body()) > 0 || grants != "" {
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &MetaOpts{Logger: c.logger, Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner})
}
input = &s3.PutBucketAclInput{
@@ -319,15 +315,15 @@ func (c S3ApiController) PutBucketActions(ctx *fiber.Ctx) error {
updAcl, err := auth.UpdateACL(input, parsedAcl, c.iam)
if err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner})
}
err = c.be.PutBucketAcl(bucket, updAcl)
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "PutBucketAcl", BucketOwner: parsedAcl.Owner})
}
err := c.be.PutBucket(bucket, access)
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutBucket", BucketOwner: ctx.Locals("access").(string)}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "PutBucket", BucketOwner: ctx.Locals("access").(string)})
}
func (c S3ApiController) PutActions(ctx *fiber.Ctx) error {
@@ -370,19 +366,19 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error {
data, err := c.be.GetBucketAcl(bucket)
if err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger})
}
parsedAcl, err := auth.ParseACL(data)
if err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger})
}
if ctx.Request().URI().QueryArgs().Has("tagging") {
var objTagging s3response.Tagging
err := xml.Unmarshal(ctx.Body(), &objTagging)
if err != nil {
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObjectTagging", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &MetaOpts{Logger: c.logger, Action: "PutObjectTagging", BucketOwner: parsedAcl.Owner})
}
tags := make(map[string]string, len(objTagging.TagSet.Tags))
@@ -392,18 +388,23 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error {
}
if err := auth.VerifyACL(parsedAcl, bucket, access, "WRITE", isRoot); err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObjectTagging", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "PutObjectTagging", BucketOwner: parsedAcl.Owner})
}
err = c.be.SetTags(bucket, keyStart, tags)
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObjectTagging", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{
Logger: c.logger,
EvSender: c.evSender,
Action: "PutObjectTagging",
BucketOwner: parsedAcl.Owner,
EventName: s3event.EventObjectTaggingPut,
})
}
if ctx.Request().URI().QueryArgs().Has("uploadId") && ctx.Request().URI().QueryArgs().Has("partNumber") && copySource != "" {
partNumber := ctx.QueryInt("partNumber", -1)
if partNumber < 1 || partNumber > 10000 {
return SendXMLResponse(ctx, nil, s3err.GetAPIError(s3err.ErrInvalidPart), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "UploadPartCopy", BucketOwner: parsedAcl.Owner}})
return SendXMLResponse(ctx, nil, s3err.GetAPIError(s3err.ErrInvalidPart), &MetaOpts{Logger: c.logger, Action: "UploadPartCopy", BucketOwner: parsedAcl.Owner})
}
resp, err := c.be.UploadPartCopy(&s3.UploadPartCopyInput{
@@ -415,22 +416,22 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error {
ExpectedBucketOwner: &bucketOwner,
CopySourceRange: &copySrcRange,
})
return SendXMLResponse(ctx, resp, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "UploadPartCopy", BucketOwner: parsedAcl.Owner}})
return SendXMLResponse(ctx, resp, err, &MetaOpts{Logger: c.logger, Action: "UploadPartCopy", BucketOwner: parsedAcl.Owner})
}
if ctx.Request().URI().QueryArgs().Has("uploadId") && ctx.Request().URI().QueryArgs().Has("partNumber") {
partNumber := ctx.QueryInt("partNumber", -1)
if partNumber < 1 || partNumber > 10000 {
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidPart), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObjectPart", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidPart), &MetaOpts{Logger: c.logger, Action: "PutObjectPart", BucketOwner: parsedAcl.Owner})
}
if err := auth.VerifyACL(parsedAcl, bucket, access, "WRITE", isRoot); err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObjectPart", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "PutObjectPart", BucketOwner: parsedAcl.Owner})
}
contentLength, err := strconv.ParseInt(contentLengthStr, 10, 64)
if err != nil {
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObjectPart", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &MetaOpts{Logger: c.logger, Action: "PutObjectPart", BucketOwner: parsedAcl.Owner})
}
body := io.ReadSeeker(bytes.NewReader([]byte(ctx.Body())))
@@ -438,7 +439,7 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error {
etag, err := c.be.PutObjectPart(bucket, keyStart, uploadId,
partNumber, contentLength, body)
ctx.Response().Header.Set("Etag", etag)
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObjectPart", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "PutObjectPart", BucketOwner: parsedAcl.Owner})
}
if ctx.Request().URI().QueryArgs().Has("acl") {
@@ -446,13 +447,13 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error {
if len(ctx.Body()) > 0 {
if grants+acl != "" {
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObjectAcl", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &MetaOpts{Logger: c.logger, Action: "PutObjectAcl", BucketOwner: parsedAcl.Owner})
}
var accessControlPolicy auth.AccessControlPolicy
err := xml.Unmarshal(ctx.Body(), &accessControlPolicy)
if err != nil {
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObjectAcl", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &MetaOpts{Logger: c.logger, Action: "PutObjectAcl", BucketOwner: parsedAcl.Owner})
}
input = &s3.PutObjectAclInput{
@@ -464,10 +465,10 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error {
}
if acl != "" {
if acl != "private" && acl != "public-read" && acl != "public-read-write" {
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObjectAcl", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &MetaOpts{Logger: c.logger, Action: "PutObjectAcl", BucketOwner: parsedAcl.Owner})
}
if len(ctx.Body()) > 0 || grants != "" {
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObjectAcl", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &MetaOpts{Logger: c.logger, Action: "PutObjectAcl", BucketOwner: parsedAcl.Owner})
}
input = &s3.PutObjectAclInput{
@@ -492,7 +493,13 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error {
}
err = c.be.PutObjectAcl(input)
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObjectAcl", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{
Logger: c.logger,
EvSender: c.evSender,
Action: "PutObjectAcl",
BucketOwner: parsedAcl.Owner,
EventName: s3event.EventObjectAclPut,
})
}
if copySource != "" {
@@ -502,22 +509,38 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error {
srcBucket, srcObject := copySourceSplit[0], copySourceSplit[1:]
if err := auth.VerifyACL(parsedAcl, bucket, access, "WRITE", isRoot); err != nil {
return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "CopyObject", BucketOwner: parsedAcl.Owner}})
return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "CopyObject", BucketOwner: parsedAcl.Owner})
}
res, err := c.be.CopyObject(srcBucket, strings.Join(srcObject, "/"), bucket, keyStart)
return SendXMLResponse(ctx, res, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "CopyObject", BucketOwner: parsedAcl.Owner}})
if err == nil {
return SendXMLResponse(ctx, res, err, &MetaOpts{
Logger: c.logger,
EvSender: c.evSender,
Action: "CopyObject",
BucketOwner: parsedAcl.Owner,
ObjectETag: res.CopyObjectResult.ETag,
VersionId: res.VersionId,
EventName: s3event.EventObjectCopy,
})
} else {
return SendXMLResponse(ctx, res, err, &MetaOpts{
Logger: c.logger,
Action: "CopyObject",
BucketOwner: parsedAcl.Owner,
})
}
}
metadata := utils.GetUserMetaData(&ctx.Request().Header)
if err := auth.VerifyACL(parsedAcl, bucket, access, "WRITE", isRoot); err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObject", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "PutObject", BucketOwner: parsedAcl.Owner})
}
contentLength, err := strconv.ParseInt(contentLengthStr, 10, 64)
if err != nil {
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObject", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &MetaOpts{Logger: c.logger, Action: "PutObject", BucketOwner: parsedAcl.Owner})
}
ctx.Locals("logReqBody", false)
@@ -529,7 +552,15 @@ func (c S3ApiController) PutActions(ctx *fiber.Ctx) error {
Body: bytes.NewReader(ctx.Request().Body()),
})
ctx.Response().Header.Set("ETag", etag)
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "PutObject", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{
Logger: c.logger,
EvSender: c.evSender,
Action: "PutObject",
BucketOwner: parsedAcl.Owner,
ObjectETag: &etag,
ObjectSize: contentLength,
EventName: s3event.EventObjectPut,
})
}
func (c S3ApiController) DeleteBucket(ctx *fiber.Ctx) error {
@@ -537,20 +568,20 @@ func (c S3ApiController) DeleteBucket(ctx *fiber.Ctx) error {
data, err := c.be.GetBucketAcl(bucket)
if err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "DeleteBuckets"}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "DeleteBuckets"})
}
parsedAcl, err := auth.ParseACL(data)
if err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "DeleteBuckets"}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "DeleteBuckets"})
}
if err := auth.VerifyACL(parsedAcl, bucket, access, "WRITE", isRoot); err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "DeleteBucket", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "DeleteBucket", BucketOwner: parsedAcl.Owner})
}
err = c.be.DeleteBucket(bucket)
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "DeleteBucket", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "DeleteBucket", BucketOwner: parsedAcl.Owner})
}
func (c S3ApiController) DeleteObjects(ctx *fiber.Ctx) error {
@@ -559,24 +590,24 @@ func (c S3ApiController) DeleteObjects(ctx *fiber.Ctx) error {
data, err := c.be.GetBucketAcl(bucket)
if err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "DeleteObjects"}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "DeleteObjects"})
}
parsedAcl, err := auth.ParseACL(data)
if err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "DeleteObjects"}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "DeleteObjects"})
}
if err := xml.Unmarshal(ctx.Body(), &dObj); err != nil {
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "DeleteObjects", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidRequest), &MetaOpts{Logger: c.logger, Action: "DeleteObjects", BucketOwner: parsedAcl.Owner})
}
if err := auth.VerifyACL(parsedAcl, bucket, access, "WRITE", isRoot); err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "DeleteObjects", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "DeleteObjects", BucketOwner: parsedAcl.Owner})
}
err = c.be.DeleteObjects(bucket, &s3.DeleteObjectsInput{Delete: &dObj})
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "DeleteObjects", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "DeleteObjects", BucketOwner: parsedAcl.Owner})
}
func (c S3ApiController) DeleteActions(ctx *fiber.Ctx) error {
@@ -593,28 +624,34 @@ func (c S3ApiController) DeleteActions(ctx *fiber.Ctx) error {
data, err := c.be.GetBucketAcl(bucket)
if err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger})
}
parsedAcl, err := auth.ParseACL(data)
if err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger})
}
if ctx.Request().URI().QueryArgs().Has("tagging") {
if err := auth.VerifyACL(parsedAcl, bucket, access, "WRITE", isRoot); err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "RemoveObjectTagging", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "RemoveObjectTagging", BucketOwner: parsedAcl.Owner})
}
err = c.be.RemoveTags(bucket, key)
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "RemoveObjectTagging", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{
Logger: c.logger,
EvSender: c.evSender,
Action: "RemoveObjectTagging",
BucketOwner: parsedAcl.Owner,
EventName: s3event.EventObjectTaggingDelete,
})
}
if uploadId != "" {
expectedBucketOwner, requestPayer := ctx.Get("X-Amz-Expected-Bucket-Owner"), ctx.Get("X-Amz-Request-Payer")
if err := auth.VerifyACL(parsedAcl, bucket, access, "WRITE", isRoot); err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "AbortMultipartUpload", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "AbortMultipartUpload", BucketOwner: parsedAcl.Owner})
}
err := c.be.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
@@ -624,15 +661,21 @@ func (c S3ApiController) DeleteActions(ctx *fiber.Ctx) error {
ExpectedBucketOwner: &expectedBucketOwner,
RequestPayer: types.RequestPayer(requestPayer),
})
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "AbortMultipartUpload", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "AbortMultipartUpload", BucketOwner: parsedAcl.Owner})
}
if err := auth.VerifyACL(parsedAcl, bucket, access, "WRITE", isRoot); err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "DeleteObject", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "DeleteObject", BucketOwner: parsedAcl.Owner})
}
err = c.be.DeleteObject(bucket, key)
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "DeleteObject", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{
Logger: c.logger,
EvSender: c.evSender,
Action: "DeleteObject",
BucketOwner: parsedAcl.Owner,
EventName: s3event.EventObjectDelete,
})
}
func (c S3ApiController) HeadBucket(ctx *fiber.Ctx) error {
@@ -640,21 +683,21 @@ func (c S3ApiController) HeadBucket(ctx *fiber.Ctx) error {
data, err := c.be.GetBucketAcl(bucket)
if err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "HeadBucket"}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "HeadBucket"})
}
parsedAcl, err := auth.ParseACL(data)
if err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "HeadBucket"}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "HeadBucket"})
}
if err := auth.VerifyACL(parsedAcl, bucket, access, "READ", isRoot); err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "HeadBucket", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "HeadBucket", BucketOwner: parsedAcl.Owner})
}
_, err = c.be.HeadBucket(bucket)
// TODO: set bucket response headers
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "HeadBucket", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "HeadBucket", BucketOwner: parsedAcl.Owner})
}
const (
@@ -671,24 +714,24 @@ func (c S3ApiController) HeadObject(ctx *fiber.Ctx) error {
data, err := c.be.GetBucketAcl(bucket)
if err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "HeadObject"}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "HeadObject"})
}
parsedAcl, err := auth.ParseACL(data)
if err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "HeadObject"}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "HeadObject"})
}
if err := auth.VerifyACL(parsedAcl, bucket, access, "READ", isRoot); err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "HeadObject", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "HeadObject", BucketOwner: parsedAcl.Owner})
}
res, err := c.be.HeadObject(bucket, key)
if err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "HeadObject", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "HeadObject", BucketOwner: parsedAcl.Owner})
}
if res == nil {
return SendResponse(ctx, fmt.Errorf("head object nil response"), &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "HeadObject", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, fmt.Errorf("head object nil response"), &MetaOpts{Logger: c.logger, Action: "HeadObject", BucketOwner: parsedAcl.Owner})
}
utils.SetMetaHeaders(ctx, res.Metadata)
@@ -727,7 +770,7 @@ func (c S3ApiController) HeadObject(ctx *fiber.Ctx) error {
},
})
return SendResponse(ctx, nil, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "HeadObject", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, nil, &MetaOpts{Logger: c.logger, Action: "HeadObject", BucketOwner: parsedAcl.Owner})
}
func (c S3ApiController) CreateActions(ctx *fiber.Ctx) error {
@@ -744,27 +787,33 @@ func (c S3ApiController) CreateActions(ctx *fiber.Ctx) error {
data, err := c.be.GetBucketAcl(bucket)
if err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger})
}
parsedAcl, err := auth.ParseACL(data)
if err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger})
}
var restoreRequest s3.RestoreObjectInput
if ctx.Request().URI().QueryArgs().Has("restore") {
err := xml.Unmarshal(ctx.Body(), &restoreRequest)
if err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "RestoreObject", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "RestoreObject", BucketOwner: parsedAcl.Owner})
}
if err := auth.VerifyACL(parsedAcl, bucket, access, "WRITE", isRoot); err != nil {
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "RestoreObject", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{Logger: c.logger, Action: "RestoreObject", BucketOwner: parsedAcl.Owner})
}
err = c.be.RestoreObject(bucket, key, &restoreRequest)
return SendResponse(ctx, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "RestoreObject", BucketOwner: parsedAcl.Owner}})
return SendResponse(ctx, err, &MetaOpts{
Logger: c.logger,
EvSender: c.evSender,
Action: "RestoreObject",
BucketOwner: parsedAcl.Owner,
EventName: s3event.EventObjectRestoreCompleted,
})
}
if uploadId != "" {
@@ -773,33 +822,59 @@ func (c S3ApiController) CreateActions(ctx *fiber.Ctx) error {
}{}
if err := xml.Unmarshal(ctx.Body(), &data); err != nil {
return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "CompleteMultipartUpload", BucketOwner: parsedAcl.Owner}})
return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "CompleteMultipartUpload", BucketOwner: parsedAcl.Owner})
}
if err := auth.VerifyACL(parsedAcl, bucket, access, "WRITE", isRoot); err != nil {
return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "CompleteMultipartUpload", BucketOwner: parsedAcl.Owner}})
return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "CompleteMultipartUpload", BucketOwner: parsedAcl.Owner})
}
res, err := c.be.CompleteMultipartUpload(bucket, key, uploadId, data.Parts)
return SendXMLResponse(ctx, res, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "CompleteMultipartUpload", BucketOwner: parsedAcl.Owner}})
if err == nil {
return SendXMLResponse(ctx, res, err, &MetaOpts{
Logger: c.logger,
EvSender: c.evSender,
Action: "CompleteMultipartUpload",
BucketOwner: parsedAcl.Owner,
ObjectETag: res.ETag,
EventName: s3event.EventCompleteMultipartUpload,
VersionId: res.VersionId,
})
} else {
return SendXMLResponse(ctx, res, err, &MetaOpts{
Logger: c.logger,
Action: "CompleteMultipartUpload",
BucketOwner: parsedAcl.Owner,
})
}
}
if err := auth.VerifyACL(parsedAcl, bucket, access, "WRITE", isRoot); err != nil {
return SendXMLResponse(ctx, nil, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "CreateMultipartUpload", BucketOwner: parsedAcl.Owner}})
return SendXMLResponse(ctx, nil, err, &MetaOpts{Logger: c.logger, Action: "CreateMultipartUpload", BucketOwner: parsedAcl.Owner})
}
res, err := c.be.CreateMultipartUpload(&s3.CreateMultipartUploadInput{Bucket: &bucket, Key: &key})
return SendXMLResponse(ctx, res, err, &LogOptions{Logger: c.logger, Meta: s3log.LogMeta{Action: "CreateMultipartUpload", BucketOwner: parsedAcl.Owner}})
return SendXMLResponse(ctx, res, err, &MetaOpts{Logger: c.logger, Action: "CreateMultipartUpload", BucketOwner: parsedAcl.Owner})
}
type LogOptions struct {
Logger s3log.AuditLogger
Meta s3log.LogMeta
type MetaOpts struct {
Logger s3log.AuditLogger
EvSender s3event.S3EventSender
Action string
BucketOwner string
ObjectSize int64
EventName s3event.EventType
ObjectETag *string
VersionId *string
}
func SendResponse(ctx *fiber.Ctx, err error, l *LogOptions) error {
func SendResponse(ctx *fiber.Ctx, err error, l *MetaOpts) error {
if l.Logger != nil {
l.Logger.Log(ctx, err, nil, l.Meta)
l.Logger.Log(ctx, err, nil, s3log.LogMeta{
Action: l.Action,
BucketOwner: l.BucketOwner,
ObjectSize: l.ObjectSize,
})
}
if err != nil {
serr, ok := err.(s3err.APIError)
@@ -813,6 +888,15 @@ func SendResponse(ctx *fiber.Ctx, err error, l *LogOptions) error {
return ctx.Send(s3err.GetAPIErrorResponse(
s3err.GetAPIError(s3err.ErrInternalError), "", "", ""))
}
if l.EvSender != nil {
l.EvSender.SendEvent(ctx, s3event.EventMeta{
ObjectSize: l.ObjectSize,
ObjectETag: l.ObjectETag,
EventName: l.EventName,
BucketOwner: l.BucketOwner,
VersionId: l.VersionId,
})
}
utils.LogCtxDetails(ctx, []byte{})
@@ -822,10 +906,14 @@ func SendResponse(ctx *fiber.Ctx, err error, l *LogOptions) error {
return nil
}
func SendXMLResponse(ctx *fiber.Ctx, resp any, err error, l *LogOptions) error {
func SendXMLResponse(ctx *fiber.Ctx, resp any, err error, l *MetaOpts) error {
if err != nil {
if l.Logger != nil {
l.Logger.Log(ctx, err, nil, l.Meta)
l.Logger.Log(ctx, err, nil, s3log.LogMeta{
Action: l.Action,
BucketOwner: l.BucketOwner,
ObjectSize: l.ObjectSize,
})
}
serr, ok := err.(s3err.APIError)
if ok {
@@ -854,7 +942,21 @@ func SendXMLResponse(ctx *fiber.Ctx, resp any, err error, l *LogOptions) error {
utils.LogCtxDetails(ctx, b)
if l.Logger != nil {
l.Logger.Log(ctx, nil, b, l.Meta)
l.Logger.Log(ctx, nil, b, s3log.LogMeta{
Action: l.Action,
BucketOwner: l.BucketOwner,
ObjectSize: l.ObjectSize,
})
}
if l.EvSender != nil {
l.EvSender.SendEvent(ctx, s3event.EventMeta{
BucketOwner: l.BucketOwner,
ObjectSize: l.ObjectSize,
ObjectETag: l.ObjectETag,
VersionId: l.VersionId,
EventName: l.EventName,
})
}
return ctx.Send(b)

View File

@@ -31,7 +31,6 @@ import (
"github.com/versity/versitygw/auth"
"github.com/versity/versitygw/backend"
"github.com/versity/versitygw/s3err"
"github.com/versity/versitygw/s3log"
"github.com/versity/versitygw/s3response"
)
@@ -50,9 +49,8 @@ func init() {
func TestNew(t *testing.T) {
type args struct {
be backend.Backend
iam auth.IAMService
logger s3log.AuditLogger
be backend.Backend
iam auth.IAMService
}
be := backend.BackendUnsupported{}
@@ -76,7 +74,7 @@ func TestNew(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := New(tt.args.be, tt.args.iam, tt.args.logger); !reflect.DeepEqual(got, tt.want) {
if got := New(tt.args.be, tt.args.iam, nil, nil); !reflect.DeepEqual(got, tt.want) {
t.Errorf("New() = %v, want %v", got, tt.want)
}
})
@@ -659,10 +657,12 @@ func TestS3ApiController_PutActions(t *testing.T) {
return nil
},
CopyObjectFunc: func(srcBucket, srcObject, DstBucket, dstObject string) (*s3.CopyObjectOutput, error) {
return &s3.CopyObjectOutput{}, nil
return &s3.CopyObjectOutput{
CopyObjectResult: &types.CopyObjectResult{},
}, nil
},
PutObjectFunc: func(*s3.PutObjectInput) (string, error) {
return "Hey", nil
return "ETag", nil
},
PutObjectPartFunc: func(bucket, object, uploadID string, part int, length int64, r io.Reader) (string, error) {
return "hello", nil
@@ -1438,7 +1438,7 @@ func Test_XMLresponse(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := SendXMLResponse(tt.args.ctx, tt.args.resp, tt.args.err, &LogOptions{}); (err != nil) != tt.wantErr {
if err := SendXMLResponse(tt.args.ctx, tt.args.resp, tt.args.err, &MetaOpts{}); (err != nil) != tt.wantErr {
t.Errorf("response() %v error = %v, wantErr %v", tt.name, err, tt.wantErr)
}
@@ -1518,7 +1518,7 @@ func Test_response(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := SendResponse(tt.args.ctx, tt.args.err, &LogOptions{}); (err != nil) != tt.wantErr {
if err := SendResponse(tt.args.ctx, tt.args.err, &MetaOpts{}); (err != nil) != tt.wantErr {
t.Errorf("response() %v error = %v, wantErr %v", tt.name, err, tt.wantErr)
}

View File

@@ -39,18 +39,17 @@ const (
type RootUserConfig struct {
Access string
Secret string
Region string
}
func VerifyV4Signature(root RootUserConfig, iam auth.IAMService, logger s3log.AuditLogger, region string, debug bool) fiber.Handler {
acct := accounts{root: root, iam: iam}
return func(ctx *fiber.Ctx) error {
ctx.Locals("region", root.Region)
ctx.Locals("region", region)
ctx.Locals("startTime", time.Now())
authorization := ctx.Get("Authorization")
if authorization == "" {
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrAuthHeaderEmpty), &controllers.LogOptions{Logger: logger})
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrAuthHeaderEmpty), &controllers.MetaOpts{Logger: logger})
}
// Check the signature version
@@ -60,22 +59,22 @@ func VerifyV4Signature(root RootUserConfig, iam auth.IAMService, logger s3log.Au
}
if len(authParts) != 3 {
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrMissingFields), &controllers.LogOptions{Logger: logger})
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrMissingFields), &controllers.MetaOpts{Logger: logger})
}
startParts := strings.Split(authParts[0], " ")
if startParts[0] != "AWS4-HMAC-SHA256" {
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrSignatureVersionNotSupported), &controllers.LogOptions{Logger: logger})
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrSignatureVersionNotSupported), &controllers.MetaOpts{Logger: logger})
}
credKv := strings.Split(startParts[1], "=")
if len(credKv) != 2 {
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrCredMalformed), &controllers.LogOptions{Logger: logger})
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrCredMalformed), &controllers.MetaOpts{Logger: logger})
}
creds := strings.Split(credKv[1], "/")
if len(creds) < 4 {
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrCredMalformed), &controllers.LogOptions{Logger: logger})
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrCredMalformed), &controllers.MetaOpts{Logger: logger})
}
ctx.Locals("access", creds[0])
@@ -83,29 +82,29 @@ func VerifyV4Signature(root RootUserConfig, iam auth.IAMService, logger s3log.Au
signHdrKv := strings.Split(authParts[1], "=")
if len(signHdrKv) != 2 {
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrCredMalformed), &controllers.LogOptions{Logger: logger})
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrCredMalformed), &controllers.MetaOpts{Logger: logger})
}
signedHdrs := strings.Split(signHdrKv[1], ";")
account, err := acct.getAccount(creds[0])
if err == auth.ErrNoSuchUser {
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidAccessKeyID), &controllers.LogOptions{Logger: logger})
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidAccessKeyID), &controllers.MetaOpts{Logger: logger})
}
if err != nil {
return controllers.SendResponse(ctx, err, &controllers.LogOptions{Logger: logger})
return controllers.SendResponse(ctx, err, &controllers.MetaOpts{Logger: logger})
}
ctx.Locals("role", account.Role)
// Check X-Amz-Date header
date := ctx.Get("X-Amz-Date")
if date == "" {
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrMissingDateHeader), &controllers.LogOptions{Logger: logger})
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrMissingDateHeader), &controllers.MetaOpts{Logger: logger})
}
// Parse the date and check the date validity
tdate, err := time.Parse(iso8601Format, date)
if err != nil {
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrMalformedDate), &controllers.LogOptions{Logger: logger})
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrMalformedDate), &controllers.MetaOpts{Logger: logger})
}
hashPayloadHeader := ctx.Get("X-Amz-Content-Sha256")
@@ -118,14 +117,14 @@ func VerifyV4Signature(root RootUserConfig, iam auth.IAMService, logger s3log.Au
// Compare the calculated hash with the hash provided
if hashPayloadHeader != hexPayload {
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrContentSHA256Mismatch), &controllers.LogOptions{Logger: logger})
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrContentSHA256Mismatch), &controllers.MetaOpts{Logger: logger})
}
}
// Create a new http request instance from fasthttp request
req, err := utils.CreateHttpRequestFromCtx(ctx, signedHdrs)
if err != nil {
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrInternalError), &controllers.LogOptions{Logger: logger})
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrInternalError), &controllers.MetaOpts{Logger: logger})
}
signer := v4.NewSigner()
@@ -140,18 +139,18 @@ func VerifyV4Signature(root RootUserConfig, iam auth.IAMService, logger s3log.Au
}
})
if signErr != nil {
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrInternalError), &controllers.LogOptions{Logger: logger})
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrInternalError), &controllers.MetaOpts{Logger: logger})
}
parts := strings.Split(req.Header.Get("Authorization"), " ")
if len(parts) < 4 {
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrMissingFields), &controllers.LogOptions{Logger: logger})
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrMissingFields), &controllers.MetaOpts{Logger: logger})
}
calculatedSign := strings.Split(parts[3], "=")[1]
expectedSign := strings.Split(authParts[2], "=")[1]
if expectedSign != calculatedSign {
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrSignatureDoesNotMatch), &controllers.LogOptions{Logger: logger})
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrSignatureDoesNotMatch), &controllers.MetaOpts{Logger: logger})
}
return ctx.Next()

View File

@@ -35,7 +35,7 @@ func VerifyMD5Body(logger s3log.AuditLogger) fiber.Handler {
calculatedSum := base64.StdEncoding.EncodeToString(sum[:])
if incomingSum != calculatedSum {
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidDigest), &controllers.LogOptions{Logger: logger})
return controllers.SendResponse(ctx, s3err.GetAPIError(s3err.ErrInvalidDigest), &controllers.MetaOpts{Logger: logger})
}
return ctx.Next()

View File

@@ -19,13 +19,14 @@ import (
"github.com/versity/versitygw/auth"
"github.com/versity/versitygw/backend"
"github.com/versity/versitygw/s3api/controllers"
"github.com/versity/versitygw/s3event"
"github.com/versity/versitygw/s3log"
)
type S3ApiRouter struct{}
func (sa *S3ApiRouter) Init(app *fiber.App, be backend.Backend, iam auth.IAMService, logger s3log.AuditLogger) {
s3ApiController := controllers.New(be, iam, logger)
func (sa *S3ApiRouter) Init(app *fiber.App, be backend.Backend, iam auth.IAMService, logger s3log.AuditLogger, evs s3event.S3EventSender) {
s3ApiController := controllers.New(be, iam, logger, evs)
adminController := controllers.AdminController{IAMService: iam}
app.Patch("/create-user", adminController.CreateUser)

View File

@@ -45,7 +45,7 @@ func TestS3ApiRouter_Init(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.sa.Init(tt.args.app, tt.args.be, tt.args.iam, nil)
tt.sa.Init(tt.args.app, tt.args.be, tt.args.iam, nil, nil)
})
}
}

View File

@@ -22,6 +22,7 @@ import (
"github.com/versity/versitygw/auth"
"github.com/versity/versitygw/backend"
"github.com/versity/versitygw/s3api/middlewares"
"github.com/versity/versitygw/s3event"
"github.com/versity/versitygw/s3log"
)
@@ -34,7 +35,7 @@ type S3ApiServer struct {
debug bool
}
func New(app *fiber.App, be backend.Backend, root middlewares.RootUserConfig, port, region string, iam auth.IAMService, l s3log.AuditLogger, opts ...Option) (*S3ApiServer, error) {
func New(app *fiber.App, be backend.Backend, root middlewares.RootUserConfig, port, region string, iam auth.IAMService, l s3log.AuditLogger, evs s3event.S3EventSender, opts ...Option) (*S3ApiServer, error) {
server := &S3ApiServer{
app: app,
backend: be,
@@ -54,7 +55,7 @@ func New(app *fiber.App, be backend.Backend, root middlewares.RootUserConfig, po
app.Use(middlewares.VerifyV4Signature(root, iam, l, region, server.debug))
app.Use(middlewares.VerifyMD5Body(l))
server.router.Init(app, be, iam, l)
server.router.Init(app, be, iam, l, evs)
return server, nil
}

View File

@@ -64,7 +64,7 @@ func TestNew(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotS3ApiServer, err := New(tt.args.app, tt.args.be, tt.args.root,
tt.args.port, "us-east-1", &auth.IAMServiceInternal{}, nil)
tt.args.port, "us-east-1", &auth.IAMServiceInternal{}, nil, nil)
if (err != nil) != tt.wantErr {
t.Errorf("New() error = %v, wantErr %v", err, tt.wantErr)
return

118
s3event/event.go Normal file
View File

@@ -0,0 +1,118 @@
// Copyright 2023 Versity Software
// This file is licensed under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package s3event
import "github.com/gofiber/fiber/v2"
type S3EventSender interface {
SendEvent(ctx *fiber.Ctx, meta EventMeta)
}
type EventMeta struct {
BucketOwner string
EventName EventType
ObjectSize int64
ObjectETag *string
VersionId *string
}
type EventFields struct {
Records []EventSchema
}
type EventType string
const (
EventObjectPut EventType = "s3:ObjectCreated:Put"
EventObjectCopy EventType = "s3:ObjectCreated:Copy"
EventCompleteMultipartUpload EventType = "s3:ObjectCreated:CompleteMultipartUpload"
EventObjectDelete EventType = "s3:ObjectRemoved:Delete"
EventObjectRestoreCompleted EventType = "s3:ObjectRestore:Completed"
EventObjectTaggingPut EventType = "s3:ObjectTagging:Put"
EventObjectTaggingDelete EventType = "s3:ObjectTagging:Delete"
EventObjectAclPut EventType = "s3:ObjectAcl:Put"
// Not supported
// EventObjectRestorePost EventType = "s3:ObjectRestore:Post"
// EventObjectRestoreDelete EventType = "s3:ObjectRestore:Delete"
)
type EventSchema struct {
EventVersion string `json:"eventVersion"`
EventSource string `json:"eventSource"`
AwsRegion string `json:"awsRegion"`
EventTime string `json:"eventTime"`
EventName EventType `json:"eventName"`
UserIdentity EventUserIdentity `json:"userIdentity"`
RequestParameters EventRequestParams `json:"requestParameters"`
ResponseElements EventResponseElements `json:"responseElements"`
S3 EventS3Data `json:"s3"`
GlacierEventData EventGlacierData `json:"glacierEventData"`
}
type EventUserIdentity struct {
PrincipalId string `json:"PrincipalId"`
}
type EventRequestParams struct {
SourceIPAddress string `json:"sourceIPAddress"`
}
type EventResponseElements struct {
RequestId string `json:"x-amz-request-id"`
HostId string `json:"x-amz-id-2"`
}
type EventS3Data struct {
S3SchemaVersion string `json:"s3SchemaVersion"`
ConfigurationId string `json:"configurationId"`
Bucket EventS3BucketData `json:"bucket"`
Object EventObjectData `json:"object"`
}
type EventGlacierData struct {
RestoreEventData EventRestoreData `json:"restoreEventData"`
}
type EventRestoreData struct {
LifecycleRestorationExpiryTime string `json:"lifecycleRestorationExpiryTime"`
LifecycleRestoreStorageClass string `json:"lifecycleRestoreStorageClass"`
}
type EventS3BucketData struct {
Name string `json:"name"`
OwnerIdentity EventUserIdentity `json:"ownerIdentity"`
Arn string `json:"arn"`
}
type EventObjectData struct {
Key string `json:"key"`
Size int64 `json:"size"`
ETag *string `json:"eTag"`
VersionId *string `json:"versionId"`
Sequencer string `json:"sequencer"`
}
type EventConfig struct {
KafkaURL string
KafkaTopic string
KafkaTopicKey string
}
func InitEventSender(cfg *EventConfig) (S3EventSender, error) {
if cfg.KafkaURL != "" {
return InitKafkaEventService(cfg.KafkaURL, cfg.KafkaTopic, cfg.KafkaTopicKey)
}
return nil, nil
}

153
s3event/kafka.go Normal file
View File

@@ -0,0 +1,153 @@
// Copyright 2023 Versity Software
// This file is licensed under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package s3event
import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
"sync"
"time"
"github.com/gofiber/fiber/v2"
"github.com/segmentio/kafka-go"
)
var sequencer = 0
type Kafka struct {
key string
writer *kafka.Writer
mu sync.Mutex
}
func InitKafkaEventService(url, topic, key string) (S3EventSender, error) {
if topic == "" {
return nil, fmt.Errorf("kafka message topic should be specified")
}
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{url},
Topic: topic,
Balancer: &kafka.LeastBytes{},
BatchTimeout: 5 * time.Millisecond,
})
msg := map[string]string{
"Service": "S3",
"Event": "s3:TestEvent",
"Time": time.Now().Format(time.RFC3339),
"Bucket": "Test-Bucket",
}
msgJSON, err := json.Marshal(msg)
if err != nil {
return nil, err
}
message := kafka.Message{
Key: []byte(key),
Value: msgJSON,
}
ctx := context.Background()
err = w.WriteMessages(ctx, message)
if err != nil {
return nil, err
}
return &Kafka{
key: key,
writer: w,
}, nil
}
func (ks *Kafka) SendEvent(ctx *fiber.Ctx, meta EventMeta) {
ks.mu.Lock()
defer ks.mu.Unlock()
path := strings.Split(ctx.Path(), "/")
bucket, object := path[1], strings.Join(path[2:], "/")
schema := EventSchema{
EventVersion: "2.2",
EventSource: "aws:s3",
AwsRegion: ctx.Locals("region").(string),
EventTime: time.Now().Format(time.RFC3339),
EventName: meta.EventName,
UserIdentity: EventUserIdentity{
PrincipalId: ctx.Locals("access").(string),
},
RequestParameters: EventRequestParams{
SourceIPAddress: ctx.IP(),
},
ResponseElements: EventResponseElements{
RequestId: ctx.Get("X-Amz-Request-Id"),
HostId: ctx.Get("X-Amx-Id-2"),
},
S3: EventS3Data{
S3SchemaVersion: "1.0",
// This field will come up after implementing per bucket notifications
ConfigurationId: "kafka-global",
Bucket: EventS3BucketData{
Name: bucket,
OwnerIdentity: EventUserIdentity{
PrincipalId: ctx.Locals("access").(string),
},
Arn: fmt.Sprintf("arn:aws:s3:::%v", strings.Join(path, "/")),
},
Object: EventObjectData{
Key: object,
Size: meta.ObjectSize,
ETag: meta.ObjectETag,
VersionId: meta.VersionId,
Sequencer: genSequencer(),
},
},
GlacierEventData: EventGlacierData{
// Not supported
RestoreEventData: EventRestoreData{},
},
}
ks.send([]EventSchema{schema})
}
func (ks *Kafka) send(evnt []EventSchema) {
msg, err := json.Marshal(evnt)
if err != nil {
fmt.Fprintf(os.Stderr, "\nfailed to parse the event data: %v", err.Error())
return
}
message := kafka.Message{
Key: []byte(ks.key),
Value: msg,
}
ctx := context.Background()
err = ks.writer.WriteMessages(ctx, message)
if err != nil {
fmt.Fprintf(os.Stderr, "\nfailed to send kafka event: %v", err.Error())
}
}
func genSequencer() string {
sequencer = sequencer + 1
return fmt.Sprintf("%X", sequencer)
}