diff --git a/cmd/versitygw/main.go b/cmd/versitygw/main.go index 03b353b..907a531 100644 --- a/cmd/versitygw/main.go +++ b/cmd/versitygw/main.go @@ -38,6 +38,7 @@ var ( region string certFile, keyFile string kafkaURL, kafkaTopic, kafkaKey string + natsURL, natsTopic string logWebhookURL string accessLog bool debug bool @@ -174,6 +175,18 @@ func initFlags() []cli.Flag { Destination: &kafkaKey, Aliases: []string{"ekk"}, }, + &cli.StringFlag{ + Name: "event-nats-url", + Usage: "nats server url to send the bucket notifications", + Destination: &natsURL, + Aliases: []string{"enu"}, + }, + &cli.StringFlag{ + Name: "event-nats-topic", + Usage: "nats server pub-sub topic to send the bucket notifications to", + Destination: &natsTopic, + Aliases: []string{"ent"}, + }, } } @@ -227,6 +240,8 @@ func runGateway(ctx *cli.Context, be backend.Backend, s auth.Storer) error { KafkaURL: kafkaURL, KafkaTopic: kafkaTopic, KafkaTopicKey: kafkaKey, + NatsURL: natsURL, + NatsTopic: natsTopic, }) if err != nil { return fmt.Errorf("unable to connect to the message broker: %w", err) diff --git a/go.mod b/go.mod index 492b7ef..5e110ee 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/aws/smithy-go v1.13.5 github.com/gofiber/fiber/v2 v2.47.0 github.com/google/uuid v1.3.0 + github.com/nats-io/nats.go v1.28.0 github.com/pkg/xattr v0.4.9 github.com/segmentio/kafka-go v0.4.42 github.com/urfave/cli/v2 v2.25.7 @@ -22,9 +23,15 @@ require ( github.com/aws/aws-sdk-go-v2/service/sso v1.12.12 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.12 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.19.2 // indirect + github.com/golang/protobuf v1.5.3 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect + github.com/nats-io/nats-server/v2 v2.9.20 // indirect + github.com/nats-io/nkeys v0.4.4 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/stretchr/testify v1.8.1 // indirect + golang.org/x/crypto v0.11.0 // indirect + google.golang.org/protobuf v1.31.0 // indirect ) require ( diff --git a/go.sum b/go.sum index dad6f44..406add0 100644 --- a/go.sum +++ b/go.sum @@ -45,6 +45,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gofiber/fiber/v2 v2.47.0 h1:EN5lHVCc+Pyqh5OEsk8fzRiifgwpbrP0rulQ4iNf3fs= github.com/gofiber/fiber/v2 v2.47.0/go.mod h1:mbFMVN1lQuzziTkkakgtKKdjfsXSw9BKR5lmcNksUoU= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= @@ -63,6 +67,16 @@ github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APP github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU= github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4= +github.com/nats-io/nats-server/v2 v2.9.20 h1:bt1dW6xsL1hWWwv7Hovm+EJt5L6iplyqlgEFkoEUk0k= +github.com/nats-io/nats-server/v2 v2.9.20/go.mod h1:aTb/xtLCGKhfTFLxP591CMWfkdgBmcUUSkiSOe5A3gw= +github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c= +github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= +github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= +github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/philhofer/fwd v1.1.2 h1:bnDivRJ1EWPjUIRXV5KfORO897HTbpFAQddBdE8t7Gw= github.com/philhofer/fwd v1.1.2/go.mod h1:qkPdfjR2SIEbspLqpe1tO4n5yICnr2DY7mqEx2tUTP0= @@ -119,6 +133,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA= +golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= @@ -129,7 +145,7 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.3.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= +golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -158,7 +174,8 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= +golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4= +golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= @@ -166,7 +183,12 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc golang.org/x/tools v0.4.0/go.mod h1:UE5sM2OK9E/d67R0ANs2xJizIymRP5gJU295PvKXxjQ= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/s3event/event.go b/s3event/event.go index a8d6fa8..cf53247 100644 --- a/s3event/event.go +++ b/s3event/event.go @@ -14,7 +14,11 @@ package s3event -import "github.com/gofiber/fiber/v2" +import ( + "fmt" + + "github.com/gofiber/fiber/v2" +) type S3EventSender interface { SendEvent(ctx *fiber.Ctx, meta EventMeta) @@ -108,9 +112,17 @@ type EventConfig struct { KafkaURL string KafkaTopic string KafkaTopicKey string + NatsURL string + NatsTopic string } func InitEventSender(cfg *EventConfig) (S3EventSender, error) { + if cfg.KafkaURL != "" && cfg.NatsURL != "" { + return nil, fmt.Errorf("there should be specified one of the following: kafka, nats") + } + if cfg.NatsURL != "" { + return InitNatsNotifSender(cfg.NatsURL, cfg.NatsTopic) + } if cfg.KafkaURL != "" { return InitKafkaEventService(cfg.KafkaURL, cfg.KafkaTopic, cfg.KafkaTopicKey) } diff --git a/s3event/nats.go b/s3event/nats.go new file mode 100644 index 0000000..1f36e9d --- /dev/null +++ b/s3event/nats.go @@ -0,0 +1,113 @@ +// Copyright 2023 Versity Software +// This file is licensed under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package s3event + +import ( + "encoding/json" + "fmt" + "strings" + "sync" + "time" + + "github.com/gofiber/fiber/v2" + "github.com/nats-io/nats.go" +) + +type NatsEventSender struct { + EventFields + topic string + client *nats.Conn + mu sync.Mutex +} + +func InitNatsNotifSender(url, topic string) (S3EventSender, error) { + if topic == "" { + return nil, fmt.Errorf("nats message topic should be specified") + } + + client, err := nats.Connect(url) + if err != nil { + return nil, err + } + + return &NatsEventSender{ + topic: topic, + client: client, + }, nil +} + +func (ns *NatsEventSender) SendEvent(ctx *fiber.Ctx, meta EventMeta) { + ns.mu.Lock() + defer ns.mu.Unlock() + + path := strings.Split(ctx.Path(), "/") + bucket, object := path[1], strings.Join(path[2:], "/") + + schema := EventSchema{ + EventVersion: "2.2", + EventSource: "aws:s3", + AwsRegion: ctx.Locals("region").(string), + EventTime: time.Now().Format(time.RFC3339), + EventName: meta.EventName, + UserIdentity: EventUserIdentity{ + PrincipalId: ctx.Locals("access").(string), + }, + RequestParameters: EventRequestParams{ + SourceIPAddress: ctx.IP(), + }, + ResponseElements: EventResponseElements{ + RequestId: ctx.Get("X-Amz-Request-Id"), + HostId: ctx.Get("X-Amx-Id-2"), + }, + S3: EventS3Data{ + S3SchemaVersion: "1.0", + // This field will come up after implementing per bucket notifications + ConfigurationId: "nats-global", + Bucket: EventS3BucketData{ + Name: bucket, + OwnerIdentity: EventUserIdentity{ + PrincipalId: ctx.Locals("access").(string), + }, + Arn: fmt.Sprintf("arn:aws:s3:::%v", strings.Join(path, "/")), + }, + Object: EventObjectData{ + Key: object, + Size: meta.ObjectSize, + ETag: meta.ObjectETag, + VersionId: meta.VersionId, + Sequencer: genSequencer(), + }, + }, + GlacierEventData: EventGlacierData{ + // Not supported + RestoreEventData: EventRestoreData{}, + }, + } + + ns.Records = []EventSchema{schema} + ns.sendEvent() +} + +func (ns *NatsEventSender) sendEvent() { + jsonEvent, err := json.Marshal(ns) + if err != nil { + fmt.Printf("\n failed to parse the event data: %v", err.Error()) + } + + err = ns.client.Publish(ns.topic, jsonEvent) + if err != nil { + fmt.Println("failed to send nats event: ", err.Error()) + } +}