From 2ae39c3ee8c2d01f69f519e4331980074548842a Mon Sep 17 00:00:00 2001 From: Ben McClelland Date: Thu, 20 Jul 2023 13:54:55 -0700 Subject: [PATCH] feat: cleanup nats for kafka similarity --- s3event/event.go | 2 +- s3event/kafka.go | 4 ++-- s3event/nats.go | 17 ++++++++--------- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/s3event/event.go b/s3event/event.go index cf53247..74e97f4 100644 --- a/s3event/event.go +++ b/s3event/event.go @@ -121,7 +121,7 @@ func InitEventSender(cfg *EventConfig) (S3EventSender, error) { return nil, fmt.Errorf("there should be specified one of the following: kafka, nats") } if cfg.NatsURL != "" { - return InitNatsNotifSender(cfg.NatsURL, cfg.NatsTopic) + return InitNatsEventService(cfg.NatsURL, cfg.NatsTopic) } if cfg.KafkaURL != "" { return InitKafkaEventService(cfg.KafkaURL, cfg.KafkaTopic, cfg.KafkaTopicKey) diff --git a/s3event/kafka.go b/s3event/kafka.go index ddad1bd..94c444e 100644 --- a/s3event/kafka.go +++ b/s3event/kafka.go @@ -131,7 +131,7 @@ func (ks *Kafka) SendEvent(ctx *fiber.Ctx, meta EventMeta) { func (ks *Kafka) send(evnt []EventSchema) { msg, err := json.Marshal(evnt) if err != nil { - fmt.Fprintf(os.Stderr, "\nfailed to parse the event data: %v", err.Error()) + fmt.Fprintf(os.Stderr, "failed to parse the event data: %v\n", err.Error()) return } @@ -143,7 +143,7 @@ func (ks *Kafka) send(evnt []EventSchema) { ctx := context.Background() err = ks.writer.WriteMessages(ctx, message) if err != nil { - fmt.Fprintf(os.Stderr, "\nfailed to send kafka event: %v", err.Error()) + fmt.Fprintf(os.Stderr, "failed to send kafka event: %v\n", err.Error()) } } diff --git a/s3event/nats.go b/s3event/nats.go index 1f36e9d..822c383 100644 --- a/s3event/nats.go +++ b/s3event/nats.go @@ -17,6 +17,7 @@ package s3event import ( "encoding/json" "fmt" + "os" "strings" "sync" "time" @@ -26,13 +27,12 @@ import ( ) type NatsEventSender struct { - EventFields topic string client *nats.Conn mu sync.Mutex } -func InitNatsNotifSender(url, topic string) (S3EventSender, error) { +func InitNatsEventService(url, topic string) (S3EventSender, error) { if topic == "" { return nil, fmt.Errorf("nats message topic should be specified") } @@ -96,18 +96,17 @@ func (ns *NatsEventSender) SendEvent(ctx *fiber.Ctx, meta EventMeta) { }, } - ns.Records = []EventSchema{schema} - ns.sendEvent() + ns.send([]EventSchema{schema}) } -func (ns *NatsEventSender) sendEvent() { - jsonEvent, err := json.Marshal(ns) +func (ns *NatsEventSender) send(evnt []EventSchema) { + msg, err := json.Marshal(evnt) if err != nil { - fmt.Printf("\n failed to parse the event data: %v", err.Error()) + fmt.Fprintf(os.Stderr, "failed to parse the event data: %v\n", err.Error()) } - err = ns.client.Publish(ns.topic, jsonEvent) + err = ns.client.Publish(ns.topic, msg) if err != nil { - fmt.Println("failed to send nats event: ", err.Error()) + fmt.Fprintf(os.Stderr, "failed to send nats event: %v\n", err.Error()) } }