Merge pull request #133 from versity/event-notif-nats

Bucket event notifications(nats)
This commit is contained in:
Ben McClelland
2023-07-20 13:50:53 -07:00
committed by GitHub
5 changed files with 172 additions and 3 deletions

View File

@@ -38,6 +38,7 @@ var (
region string
certFile, keyFile string
kafkaURL, kafkaTopic, kafkaKey string
natsURL, natsTopic string
logWebhookURL string
accessLog bool
debug bool
@@ -174,6 +175,18 @@ func initFlags() []cli.Flag {
Destination: &kafkaKey,
Aliases: []string{"ekk"},
},
&cli.StringFlag{
Name: "event-nats-url",
Usage: "nats server url to send the bucket notifications",
Destination: &natsURL,
Aliases: []string{"enu"},
},
&cli.StringFlag{
Name: "event-nats-topic",
Usage: "nats server pub-sub topic to send the bucket notifications to",
Destination: &natsTopic,
Aliases: []string{"ent"},
},
}
}
@@ -227,6 +240,8 @@ func runGateway(ctx *cli.Context, be backend.Backend, s auth.Storer) error {
KafkaURL: kafkaURL,
KafkaTopic: kafkaTopic,
KafkaTopicKey: kafkaKey,
NatsURL: natsURL,
NatsTopic: natsTopic,
})
if err != nil {
return fmt.Errorf("unable to connect to the message broker: %w", err)

7
go.mod
View File

@@ -8,6 +8,7 @@ require (
github.com/aws/smithy-go v1.13.5
github.com/gofiber/fiber/v2 v2.47.0
github.com/google/uuid v1.3.0
github.com/nats-io/nats.go v1.28.0
github.com/pkg/xattr v0.4.9
github.com/segmentio/kafka-go v0.4.42
github.com/urfave/cli/v2 v2.25.7
@@ -22,9 +23,15 @@ require (
github.com/aws/aws-sdk-go-v2/service/sso v1.12.12 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.12 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.19.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/nats-io/nats-server/v2 v2.9.20 // indirect
github.com/nats-io/nkeys v0.4.4 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/stretchr/testify v1.8.1 // indirect
golang.org/x/crypto v0.11.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
)
require (

26
go.sum
View File

@@ -45,6 +45,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gofiber/fiber/v2 v2.47.0 h1:EN5lHVCc+Pyqh5OEsk8fzRiifgwpbrP0rulQ4iNf3fs=
github.com/gofiber/fiber/v2 v2.47.0/go.mod h1:mbFMVN1lQuzziTkkakgtKKdjfsXSw9BKR5lmcNksUoU=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
@@ -63,6 +67,16 @@ github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APP
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU=
github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4=
github.com/nats-io/nats-server/v2 v2.9.20 h1:bt1dW6xsL1hWWwv7Hovm+EJt5L6iplyqlgEFkoEUk0k=
github.com/nats-io/nats-server/v2 v2.9.20/go.mod h1:aTb/xtLCGKhfTFLxP591CMWfkdgBmcUUSkiSOe5A3gw=
github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c=
github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc=
github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/philhofer/fwd v1.1.2 h1:bnDivRJ1EWPjUIRXV5KfORO897HTbpFAQddBdE8t7Gw=
github.com/philhofer/fwd v1.1.2/go.mod h1:qkPdfjR2SIEbspLqpe1tO4n5yICnr2DY7mqEx2tUTP0=
@@ -119,6 +133,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA=
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
@@ -129,7 +145,7 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.3.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -158,7 +174,8 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
@@ -166,7 +183,12 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc
golang.org/x/tools v0.4.0/go.mod h1:UE5sM2OK9E/d67R0ANs2xJizIymRP5gJU295PvKXxjQ=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View File

@@ -14,7 +14,11 @@
package s3event
import "github.com/gofiber/fiber/v2"
import (
"fmt"
"github.com/gofiber/fiber/v2"
)
type S3EventSender interface {
SendEvent(ctx *fiber.Ctx, meta EventMeta)
@@ -108,9 +112,17 @@ type EventConfig struct {
KafkaURL string
KafkaTopic string
KafkaTopicKey string
NatsURL string
NatsTopic string
}
func InitEventSender(cfg *EventConfig) (S3EventSender, error) {
if cfg.KafkaURL != "" && cfg.NatsURL != "" {
return nil, fmt.Errorf("there should be specified one of the following: kafka, nats")
}
if cfg.NatsURL != "" {
return InitNatsNotifSender(cfg.NatsURL, cfg.NatsTopic)
}
if cfg.KafkaURL != "" {
return InitKafkaEventService(cfg.KafkaURL, cfg.KafkaTopic, cfg.KafkaTopicKey)
}

113
s3event/nats.go Normal file
View File

@@ -0,0 +1,113 @@
// Copyright 2023 Versity Software
// This file is licensed under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package s3event
import (
"encoding/json"
"fmt"
"strings"
"sync"
"time"
"github.com/gofiber/fiber/v2"
"github.com/nats-io/nats.go"
)
type NatsEventSender struct {
EventFields
topic string
client *nats.Conn
mu sync.Mutex
}
func InitNatsNotifSender(url, topic string) (S3EventSender, error) {
if topic == "" {
return nil, fmt.Errorf("nats message topic should be specified")
}
client, err := nats.Connect(url)
if err != nil {
return nil, err
}
return &NatsEventSender{
topic: topic,
client: client,
}, nil
}
func (ns *NatsEventSender) SendEvent(ctx *fiber.Ctx, meta EventMeta) {
ns.mu.Lock()
defer ns.mu.Unlock()
path := strings.Split(ctx.Path(), "/")
bucket, object := path[1], strings.Join(path[2:], "/")
schema := EventSchema{
EventVersion: "2.2",
EventSource: "aws:s3",
AwsRegion: ctx.Locals("region").(string),
EventTime: time.Now().Format(time.RFC3339),
EventName: meta.EventName,
UserIdentity: EventUserIdentity{
PrincipalId: ctx.Locals("access").(string),
},
RequestParameters: EventRequestParams{
SourceIPAddress: ctx.IP(),
},
ResponseElements: EventResponseElements{
RequestId: ctx.Get("X-Amz-Request-Id"),
HostId: ctx.Get("X-Amx-Id-2"),
},
S3: EventS3Data{
S3SchemaVersion: "1.0",
// This field will come up after implementing per bucket notifications
ConfigurationId: "nats-global",
Bucket: EventS3BucketData{
Name: bucket,
OwnerIdentity: EventUserIdentity{
PrincipalId: ctx.Locals("access").(string),
},
Arn: fmt.Sprintf("arn:aws:s3:::%v", strings.Join(path, "/")),
},
Object: EventObjectData{
Key: object,
Size: meta.ObjectSize,
ETag: meta.ObjectETag,
VersionId: meta.VersionId,
Sequencer: genSequencer(),
},
},
GlacierEventData: EventGlacierData{
// Not supported
RestoreEventData: EventRestoreData{},
},
}
ns.Records = []EventSchema{schema}
ns.sendEvent()
}
func (ns *NatsEventSender) sendEvent() {
jsonEvent, err := json.Marshal(ns)
if err != nil {
fmt.Printf("\n failed to parse the event data: %v", err.Error())
}
err = ns.client.Publish(ns.topic, jsonEvent)
if err != nil {
fmt.Println("failed to send nats event: ", err.Error())
}
}