diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 1ab63052bf..9ef73668d9 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -207,7 +207,7 @@ protected: error _error = error::NONE; size_t _failed = 0; size_t _total_endpoints = 0; - storage_proxy::stats& _stats; + storage_proxy::write_stats& _stats; protected: virtual bool waited_for(gms::inet_address from) = 0; @@ -220,9 +220,9 @@ protected: public: abstract_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr mh, std::unordered_set targets, tracing::trace_state_ptr trace_state, - size_t pending_endpoints = 0, std::vector dead_endpoints = {}) + storage_proxy::write_stats& stats, size_t pending_endpoints = 0, std::vector dead_endpoints = {}) : _id(p->_next_response_id++), _proxy(std::move(p)), _trace_state(trace_state), _cl(cl), _type(type), _mutation_holder(std::move(mh)), _targets(std::move(targets)), - _dead_endpoints(std::move(dead_endpoints)), _stats(_proxy->_stats) { + _dead_endpoints(std::move(dead_endpoints)), _stats(stats) { // original comment from cassandra: // during bootstrap, include pending endpoints in the count // or we may fail the consistency level guarantees (see #833, #8058) @@ -326,9 +326,10 @@ class datacenter_write_response_handler : public abstract_write_response_handler public: datacenter_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr mh, std::unordered_set targets, - const std::vector& pending_endpoints, std::vector dead_endpoints, tracing::trace_state_ptr tr_state) : + const std::vector& pending_endpoints, std::vector dead_endpoints, tracing::trace_state_ptr tr_state, + storage_proxy::write_stats& stats) : abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), - std::move(targets), std::move(tr_state), db::count_local_endpoints(pending_endpoints), std::move(dead_endpoints)) { + std::move(targets), std::move(tr_state), stats, db::count_local_endpoints(pending_endpoints), std::move(dead_endpoints)) { _total_endpoints = db::count_local_endpoints(_targets); } }; @@ -340,9 +341,10 @@ class write_response_handler : public abstract_write_response_handler { public: write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr mh, std::unordered_set targets, - const std::vector& pending_endpoints, std::vector dead_endpoints, tracing::trace_state_ptr tr_state) : + const std::vector& pending_endpoints, std::vector dead_endpoints, tracing::trace_state_ptr tr_state, + storage_proxy::write_stats& stats) : abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), - std::move(targets), std::move(tr_state), pending_endpoints.size(), std::move(dead_endpoints)) { + std::move(targets), std::move(tr_state), stats, pending_endpoints.size(), std::move(dead_endpoints)) { _total_endpoints = _targets.size(); } }; @@ -369,8 +371,8 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha public: datacenter_sync_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr mh, std::unordered_set targets, const std::vector& pending_endpoints, - std::vector dead_endpoints, tracing::trace_state_ptr tr_state) : - abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), targets, std::move(tr_state), 0, dead_endpoints) { + std::vector dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats) : + abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), targets, std::move(tr_state), stats, 0, dead_endpoints) { auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); for (auto& target : targets) { @@ -509,17 +511,18 @@ future<> storage_proxy::response_wait(storage_proxy::response_id_type id, clock_ } storage_proxy::response_id_type storage_proxy::create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr m, - std::unordered_set targets, const std::vector& pending_endpoints, std::vector dead_endpoints, tracing::trace_state_ptr tr_state) + std::unordered_set targets, const std::vector& pending_endpoints, std::vector dead_endpoints, tracing::trace_state_ptr tr_state, + storage_proxy::write_stats& stats) { shared_ptr h; auto& rs = ks.get_replication_strategy(); if (db::is_datacenter_local(cl)) { - h = ::make_shared(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state)); + h = ::make_shared(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats); } else if (cl == db::consistency_level::EACH_QUORUM && rs.get_type() == locator::replication_strategy_type::network_topology){ - h = ::make_shared(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state)); + h = ::make_shared(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats); } else { - h = ::make_shared(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state)); + h = ::make_shared(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats); } return register_response_handler(std::move(h)); } @@ -1238,7 +1241,7 @@ storage_proxy::create_write_response_handler(const mutation& m, db::consistency_ db::assure_sufficient_live_nodes(cl, ks, live_endpoints, pending_endpoints); - return create_write_response_handler(ks, cl, type, std::make_unique(m), std::move(live_endpoints), pending_endpoints, std::move(dead_endpoints), std::move(tr_state)); + return create_write_response_handler(ks, cl, type, std::make_unique(m), std::move(live_endpoints), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), _stats); } storage_proxy::response_id_type @@ -1253,7 +1256,7 @@ storage_proxy::create_write_response_handler(const std::unordered_mapschema()->ks_name(); keyspace& ks = _db.local().find_keyspace(keyspace_name); - return create_write_response_handler(ks, cl, type, std::move(mh), std::move(endpoints), std::vector(), std::vector(), std::move(tr_state)); + return create_write_response_handler(ks, cl, type, std::move(mh), std::move(endpoints), std::vector(), std::vector(), std::move(tr_state), _stats); } void @@ -1569,7 +1572,7 @@ storage_proxy::mutate_atomically(std::vector mutations, db::consistenc future<> send_batchlog_mutation(mutation m, db::consistency_level cl = db::consistency_level::ONE) { return _p.mutate_prepare<>(std::array{std::move(m)}, cl, db::write_type::BATCH_LOG, [this] (const mutation& m, db::consistency_level cl, db::write_type type) { auto& ks = _p._db.local().find_keyspace(m.schema()->ks_name()); - return _p.create_write_response_handler(ks, cl, type, std::make_unique(m), _batchlog_endpoints, {}, {}, _trace_state); + return _p.create_write_response_handler(ks, cl, type, std::make_unique(m), _batchlog_endpoints, {}, {}, _trace_state, _stats); }).then([this, cl] (std::vector ids) { return _p.mutate_begin(std::move(ids), cl, _stats); }); @@ -1638,7 +1641,7 @@ future<> storage_proxy::send_to_endpoint( std::unordered_set targets(pending_endpoints.begin(), pending_endpoints.end()); targets.insert(std::move(target)); return mutate_prepare(std::array{std::move(m)}, cl, type, - [this, targets = std::move(targets), pending_endpoints = std::move(pending_endpoints)] ( + [this, targets = std::move(targets), pending_endpoints = std::move(pending_endpoints), &stats] ( const mutation& m, db::consistency_level cl, db::write_type type) mutable { @@ -1651,7 +1654,8 @@ future<> storage_proxy::send_to_endpoint( std::move(targets), pending_endpoints, { }, - nullptr); + nullptr, + stats); }).then([this, &stats, cl] (std::vector ids) { return mutate_begin(std::move(ids), cl, stats); }).then_wrapped([p = shared_from_this(), lc, &stats] (future<>&& f) { diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 28b596ba49..a6011123d6 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -170,7 +170,7 @@ private: future<> response_wait(response_id_type id, clock_type::time_point timeout); ::shared_ptr& get_write_response_handler(storage_proxy::response_id_type id); response_id_type create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr m, std::unordered_set targets, - const std::vector& pending_endpoints, std::vector, tracing::trace_state_ptr tr_state); + const std::vector& pending_endpoints, std::vector, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats); response_id_type create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state); response_id_type create_write_response_handler(const std::unordered_map>&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state); void send_to_live_endpoints(response_id_type response_id, clock_type::time_point timeout, write_stats& stats);