From a526ad2e8010005c1bbb4f334065c42c0445fdef Mon Sep 17 00:00:00 2001 From: Ricardo Katz Date: Thu, 12 Aug 2021 02:24:19 -0300 Subject: [PATCH] Add headers into AMQP notifications (#12911) Signed-off-by: Ricardo Katz --- docs/bucket/notifications/README.md | 2 ++ internal/event/target/amqp.go | 6 ++++++ 2 files changed, 8 insertions(+) diff --git a/docs/bucket/notifications/README.md b/docs/bucket/notifications/README.md index 7a16aa9ab..e3a7d9e88 100644 --- a/docs/bucket/notifications/README.md +++ b/docs/bucket/notifications/README.md @@ -140,6 +140,8 @@ $ mc admin config set myminio/ notify_amqp:1 exchange="bucketevents" exchange_ty MinIO supports all the exchanges available in [RabbitMQ](https://www.rabbitmq.com/). For this setup, we are using `fanout` exchange. +MinIO also sends with the notifications two headers: `minio-bucket` and `minio-event`. An exchange using the type "headers" can use this information to route the notifications to proper queues. + Note that, you can add as many AMQP server endpoint configurations as needed by providing an identifier (like "1" in the example above) for the AMQP instance and an object of per-server configuration parameters. ### Step 2: Enable bucket notification using MinIO client diff --git a/internal/event/target/amqp.go b/internal/event/target/amqp.go index 77c9300c7..ffcfc2ce2 100644 --- a/internal/event/target/amqp.go +++ b/internal/event/target/amqp.go @@ -222,6 +222,11 @@ func (target *AMQPTarget) send(eventData event.Event, ch *amqp.Channel, confirms return err } + headers := make(amqp.Table) + // Add more information here as required, but be aware to not overload headers + headers["minio-bucket"] = eventData.S3.Bucket.Name + headers["minio-event"] = eventData.EventName.String() + if err = ch.ExchangeDeclare(target.args.Exchange, target.args.ExchangeType, target.args.Durable, target.args.AutoDeleted, target.args.Internal, target.args.NoWait, nil); err != nil { return err @@ -229,6 +234,7 @@ func (target *AMQPTarget) send(eventData event.Event, ch *amqp.Channel, confirms if err = ch.Publish(target.args.Exchange, target.args.RoutingKey, target.args.Mandatory, target.args.Immediate, amqp.Publishing{ + Headers: headers, ContentType: "application/json", DeliveryMode: target.args.DeliveryMode, Body: data,