mirror of
https://github.com/versity/versitygw.git
synced 2026-01-04 11:03:57 +00:00
feat: cleanup nats for kafka similarity
This commit is contained in:
@@ -121,7 +121,7 @@ func InitEventSender(cfg *EventConfig) (S3EventSender, error) {
|
||||
return nil, fmt.Errorf("there should be specified one of the following: kafka, nats")
|
||||
}
|
||||
if cfg.NatsURL != "" {
|
||||
return InitNatsNotifSender(cfg.NatsURL, cfg.NatsTopic)
|
||||
return InitNatsEventService(cfg.NatsURL, cfg.NatsTopic)
|
||||
}
|
||||
if cfg.KafkaURL != "" {
|
||||
return InitKafkaEventService(cfg.KafkaURL, cfg.KafkaTopic, cfg.KafkaTopicKey)
|
||||
|
||||
@@ -131,7 +131,7 @@ func (ks *Kafka) SendEvent(ctx *fiber.Ctx, meta EventMeta) {
|
||||
func (ks *Kafka) send(evnt []EventSchema) {
|
||||
msg, err := json.Marshal(evnt)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "\nfailed to parse the event data: %v", err.Error())
|
||||
fmt.Fprintf(os.Stderr, "failed to parse the event data: %v\n", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
@@ -143,7 +143,7 @@ func (ks *Kafka) send(evnt []EventSchema) {
|
||||
ctx := context.Background()
|
||||
err = ks.writer.WriteMessages(ctx, message)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "\nfailed to send kafka event: %v", err.Error())
|
||||
fmt.Fprintf(os.Stderr, "failed to send kafka event: %v\n", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@ package s3event
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -26,13 +27,12 @@ import (
|
||||
)
|
||||
|
||||
type NatsEventSender struct {
|
||||
EventFields
|
||||
topic string
|
||||
client *nats.Conn
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func InitNatsNotifSender(url, topic string) (S3EventSender, error) {
|
||||
func InitNatsEventService(url, topic string) (S3EventSender, error) {
|
||||
if topic == "" {
|
||||
return nil, fmt.Errorf("nats message topic should be specified")
|
||||
}
|
||||
@@ -96,18 +96,17 @@ func (ns *NatsEventSender) SendEvent(ctx *fiber.Ctx, meta EventMeta) {
|
||||
},
|
||||
}
|
||||
|
||||
ns.Records = []EventSchema{schema}
|
||||
ns.sendEvent()
|
||||
ns.send([]EventSchema{schema})
|
||||
}
|
||||
|
||||
func (ns *NatsEventSender) sendEvent() {
|
||||
jsonEvent, err := json.Marshal(ns)
|
||||
func (ns *NatsEventSender) send(evnt []EventSchema) {
|
||||
msg, err := json.Marshal(evnt)
|
||||
if err != nil {
|
||||
fmt.Printf("\n failed to parse the event data: %v", err.Error())
|
||||
fmt.Fprintf(os.Stderr, "failed to parse the event data: %v\n", err.Error())
|
||||
}
|
||||
|
||||
err = ns.client.Publish(ns.topic, jsonEvent)
|
||||
err = ns.client.Publish(ns.topic, msg)
|
||||
if err != nil {
|
||||
fmt.Println("failed to send nats event: ", err.Error())
|
||||
fmt.Fprintf(os.Stderr, "failed to send nats event: %v\n", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user