diff --git a/internal/config/config.go b/internal/config/config.go index 760b2069a..6618d5081 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -506,7 +506,6 @@ func (c Config) Merge() Config { rnSubSys, ok := renamedSubsys[subSys] if !ok { // A config subsystem was removed or server was downgraded. - Logger.Info("config: ignoring unknown subsystem config %q\n", subSys) continue } // Copy over settings from previous sub-system diff --git a/internal/config/notify/help.go b/internal/config/notify/help.go index f92495934..3f2dd2c31 100644 --- a/internal/config/notify/help.go +++ b/internal/config/notify/help.go @@ -140,6 +140,12 @@ var ( Optional: true, Type: "number", }, + config.HelpKV{ + Key: target.AmqpPublisherConfirms, + Description: "enable consumer acknowlegement and publisher confirms, use this along with queue_dir for guaranteed delivery of all events", + Optional: true, + Type: "on|off", + }, config.HelpKV{ Key: target.AmqpQueueDir, Description: queueDirComment, diff --git a/internal/config/notify/parse.go b/internal/config/notify/parse.go index 1d5cb5d3d..e2c5e8c27 100644 --- a/internal/config/notify/parse.go +++ b/internal/config/notify/parse.go @@ -1701,6 +1701,10 @@ var ( Key: target.AmqpDeliveryMode, Value: "0", }, + config.KV{ + Key: target.AmqpPublisherConfirms, + Value: config.EnableOff, + }, config.KV{ Key: target.AmqpQueueLimit, Value: "0", @@ -1779,6 +1783,10 @@ func GetNotifyAMQP(amqpKVS map[string]config.KVS) (map[string]target.AMQPArgs, e if k != config.Default { autoDeletedEnv = autoDeletedEnv + config.Default + k } + publisherConfirmsEnv := target.EnvAMQPPublisherConfirms + if k != config.Default { + publisherConfirmsEnv = publisherConfirmsEnv + config.Default + k + } queueDirEnv := target.EnvAMQPQueueDir if k != config.Default { queueDirEnv = queueDirEnv + config.Default + k @@ -1792,20 +1800,21 @@ func GetNotifyAMQP(amqpKVS map[string]config.KVS) (map[string]target.AMQPArgs, e return nil, err } amqpArgs := target.AMQPArgs{ - Enable: enabled, - URL: *url, - Exchange: env.Get(exchangeEnv, kv.Get(target.AmqpExchange)), - RoutingKey: env.Get(routingKeyEnv, kv.Get(target.AmqpRoutingKey)), - ExchangeType: env.Get(exchangeTypeEnv, kv.Get(target.AmqpExchangeType)), - DeliveryMode: uint8(deliveryMode), - Mandatory: env.Get(mandatoryEnv, kv.Get(target.AmqpMandatory)) == config.EnableOn, - Immediate: env.Get(immediateEnv, kv.Get(target.AmqpImmediate)) == config.EnableOn, - Durable: env.Get(durableEnv, kv.Get(target.AmqpDurable)) == config.EnableOn, - Internal: env.Get(internalEnv, kv.Get(target.AmqpInternal)) == config.EnableOn, - NoWait: env.Get(noWaitEnv, kv.Get(target.AmqpNoWait)) == config.EnableOn, - AutoDeleted: env.Get(autoDeletedEnv, kv.Get(target.AmqpAutoDeleted)) == config.EnableOn, - QueueDir: env.Get(queueDirEnv, kv.Get(target.AmqpQueueDir)), - QueueLimit: queueLimit, + Enable: enabled, + URL: *url, + Exchange: env.Get(exchangeEnv, kv.Get(target.AmqpExchange)), + RoutingKey: env.Get(routingKeyEnv, kv.Get(target.AmqpRoutingKey)), + ExchangeType: env.Get(exchangeTypeEnv, kv.Get(target.AmqpExchangeType)), + DeliveryMode: uint8(deliveryMode), + Mandatory: env.Get(mandatoryEnv, kv.Get(target.AmqpMandatory)) == config.EnableOn, + Immediate: env.Get(immediateEnv, kv.Get(target.AmqpImmediate)) == config.EnableOn, + Durable: env.Get(durableEnv, kv.Get(target.AmqpDurable)) == config.EnableOn, + Internal: env.Get(internalEnv, kv.Get(target.AmqpInternal)) == config.EnableOn, + NoWait: env.Get(noWaitEnv, kv.Get(target.AmqpNoWait)) == config.EnableOn, + AutoDeleted: env.Get(autoDeletedEnv, kv.Get(target.AmqpAutoDeleted)) == config.EnableOn, + PublisherConfirms: env.Get(publisherConfirmsEnv, kv.Get(target.AmqpPublisherConfirms)) == config.EnableOn, + QueueDir: env.Get(queueDirEnv, kv.Get(target.AmqpQueueDir)), + QueueLimit: queueLimit, } if err = amqpArgs.Validate(); err != nil { return nil, err diff --git a/internal/event/target/amqp.go b/internal/event/target/amqp.go index f12d350e5..e0051c6c1 100644 --- a/internal/event/target/amqp.go +++ b/internal/event/target/amqp.go @@ -21,6 +21,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "net" "net/url" "os" @@ -34,20 +35,21 @@ import ( // AMQPArgs - AMQP target arguments. type AMQPArgs struct { - Enable bool `json:"enable"` - URL xnet.URL `json:"url"` - Exchange string `json:"exchange"` - RoutingKey string `json:"routingKey"` - ExchangeType string `json:"exchangeType"` - DeliveryMode uint8 `json:"deliveryMode"` - Mandatory bool `json:"mandatory"` - Immediate bool `json:"immediate"` - Durable bool `json:"durable"` - Internal bool `json:"internal"` - NoWait bool `json:"noWait"` - AutoDeleted bool `json:"autoDeleted"` - QueueDir string `json:"queueDir"` - QueueLimit uint64 `json:"queueLimit"` + Enable bool `json:"enable"` + URL xnet.URL `json:"url"` + Exchange string `json:"exchange"` + RoutingKey string `json:"routingKey"` + ExchangeType string `json:"exchangeType"` + DeliveryMode uint8 `json:"deliveryMode"` + Mandatory bool `json:"mandatory"` + Immediate bool `json:"immediate"` + Durable bool `json:"durable"` + Internal bool `json:"internal"` + NoWait bool `json:"noWait"` + AutoDeleted bool `json:"autoDeleted"` + PublisherConfirms bool `json:"publisherConfirms"` + QueueDir string `json:"queueDir"` + QueueLimit uint64 `json:"queueLimit"` } //lint:file-ignore ST1003 We cannot change these exported names. @@ -69,7 +71,7 @@ const ( AmqpNoWait = "no_wait" AmqpAutoDeleted = "auto_deleted" AmqpArguments = "arguments" - AmqpPublishingHeaders = "publishing_headers" + AmqpPublisherConfirms = "publisher_confirms" EnvAMQPEnable = "MINIO_NOTIFY_AMQP_ENABLE" EnvAMQPURL = "MINIO_NOTIFY_AMQP_URL" @@ -84,7 +86,7 @@ const ( EnvAMQPNoWait = "MINIO_NOTIFY_AMQP_NO_WAIT" EnvAMQPAutoDeleted = "MINIO_NOTIFY_AMQP_AUTO_DELETED" EnvAMQPArguments = "MINIO_NOTIFY_AMQP_ARGUMENTS" - EnvAMQPPublishingHeaders = "MINIO_NOTIFY_AMQP_PUBLISHING_HEADERS" + EnvAMQPPublisherConfirms = "MINIO_NOTIFY_AMQP_PUBLISHING_CONFIRMS" EnvAMQPQueueDir = "MINIO_NOTIFY_AMQP_QUEUE_DIR" EnvAMQPQueueLimit = "MINIO_NOTIFY_AMQP_QUEUE_LIMIT" ) @@ -123,7 +125,7 @@ func (target *AMQPTarget) ID() event.TargetID { // IsActive - Return true if target is up and active func (target *AMQPTarget) IsActive() (bool, error) { - ch, err := target.channel() + ch, _, err := target.channel() if err != nil { return false, err } @@ -138,7 +140,7 @@ func (target *AMQPTarget) HasQueueStore() bool { return target.store != nil } -func (target *AMQPTarget) channel() (*amqp.Channel, error) { +func (target *AMQPTarget) channel() (*amqp.Channel, chan amqp.Confirmation, error) { var err error var conn *amqp.Connection var ch *amqp.Channel @@ -161,34 +163,54 @@ func (target *AMQPTarget) channel() (*amqp.Channel, error) { if target.conn != nil { ch, err = target.conn.Channel() if err == nil { - return ch, nil + if target.args.PublisherConfirms { + confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 1)) + if err := ch.Confirm(false); err != nil { + ch.Close() + return nil, nil, err + } + return ch, confirms, nil + } + return ch, nil, nil } if !isAMQPClosedErr(err) { - return nil, err + return nil, nil, err } + + // close when we know this is a network error. + target.conn.Close() } conn, err = amqp.Dial(target.args.URL.String()) if err != nil { if IsConnRefusedErr(err) { - return nil, errNotConnected + return nil, nil, errNotConnected } - return nil, err + return nil, nil, err } ch, err = conn.Channel() if err != nil { - return nil, err + return nil, nil, err } target.conn = conn - return ch, nil + if target.args.PublisherConfirms { + confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 1)) + if err := ch.Confirm(false); err != nil { + ch.Close() + return nil, nil, err + } + return ch, confirms, nil + } + + return ch, nil, nil } // send - sends an event to the AMQP. -func (target *AMQPTarget) send(eventData event.Event, ch *amqp.Channel) error { +func (target *AMQPTarget) send(eventData event.Event, ch *amqp.Channel, confirms chan amqp.Confirmation) error { objectName, err := url.QueryUnescape(eventData.S3.Object.Key) if err != nil { return err @@ -205,12 +227,24 @@ func (target *AMQPTarget) send(eventData event.Event, ch *amqp.Channel) error { return err } - return ch.Publish(target.args.Exchange, target.args.RoutingKey, target.args.Mandatory, + if err = ch.Publish(target.args.Exchange, target.args.RoutingKey, target.args.Mandatory, target.args.Immediate, amqp.Publishing{ ContentType: "application/json", DeliveryMode: target.args.DeliveryMode, Body: data, - }) + }); err != nil { + return err + } + + // check for publisher confirms only if its enabled + if target.args.PublisherConfirms { + confirmed := <-confirms + if !confirmed.Ack { + return fmt.Errorf("failed delivery of delivery tag: %d", confirmed.DeliveryTag) + } + } + + return nil } // Save - saves the events to the store which will be replayed when the amqp connection is active. @@ -218,7 +252,7 @@ func (target *AMQPTarget) Save(eventData event.Event) error { if target.store != nil { return target.store.Put(eventData) } - ch, err := target.channel() + ch, confirms, err := target.channel() if err != nil { return err } @@ -227,12 +261,12 @@ func (target *AMQPTarget) Save(eventData event.Event) error { target.loggerOnce(context.Background(), cErr, target.ID()) }() - return target.send(eventData, ch) + return target.send(eventData, ch, confirms) } // Send - sends event to AMQP. func (target *AMQPTarget) Send(eventKey string) error { - ch, err := target.channel() + ch, confirms, err := target.channel() if err != nil { return err } @@ -251,7 +285,7 @@ func (target *AMQPTarget) Send(eventKey string) error { return eErr } - if err := target.send(eventData, ch); err != nil { + if err := target.send(eventData, ch, confirms); err != nil { return err } diff --git a/internal/event/target/queuestore.go b/internal/event/target/queuestore.go index 127b9286a..0a0622e04 100644 --- a/internal/event/target/queuestore.go +++ b/internal/event/target/queuestore.go @@ -27,7 +27,6 @@ import ( "sync" "github.com/minio/minio/internal/event" - "github.com/minio/pkg/sys" ) const ( @@ -47,14 +46,6 @@ type QueueStore struct { func NewQueueStore(directory string, limit uint64) Store { if limit == 0 { limit = defaultLimit - _, maxRLimit, err := sys.GetMaxOpenFileLimit() - if err == nil { - // Limit the maximum number of entries - // to maximum open file limit - if maxRLimit < limit { - limit = maxRLimit - } - } } return &QueueStore{