From ffe52681ea1cdaa329f4ba378ab34ed7b1f59e6b Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Mon, 28 May 2018 14:28:44 +0200 Subject: [PATCH] storage_proxy: add mv stats to write handler Previous patch for issue 3416 did not cover passing write stats to write response handler, which results in some write stats being incorrectly counted as user write stats, while they belong to materialized views. This one fixes that by passing correct write stats reference to write response handler constructor. Also at: https://github.com/psarna/scylla/commits/fix_3416_again Closes #3416 Message-Id: <53ef3cc96ccadfdad8992d92ed6a41473419eb0a.1527510473.git.sarna@scylladb.com> --- service/storage_proxy.cc | 40 ++++++++++++++++++++++------------------ service/storage_proxy.hh | 2 +- 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 1ab63052bf..9ef73668d9 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -207,7 +207,7 @@ protected: error _error = error::NONE; size_t _failed = 0; size_t _total_endpoints = 0; - storage_proxy::stats& _stats; + storage_proxy::write_stats& _stats; protected: virtual bool waited_for(gms::inet_address from) = 0; @@ -220,9 +220,9 @@ protected: public: abstract_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr mh, std::unordered_set targets, tracing::trace_state_ptr trace_state, - size_t pending_endpoints = 0, std::vector dead_endpoints = {}) + storage_proxy::write_stats& stats, size_t pending_endpoints = 0, std::vector dead_endpoints = {}) : _id(p->_next_response_id++), _proxy(std::move(p)), _trace_state(trace_state), _cl(cl), _type(type), _mutation_holder(std::move(mh)), _targets(std::move(targets)), - _dead_endpoints(std::move(dead_endpoints)), _stats(_proxy->_stats) { + _dead_endpoints(std::move(dead_endpoints)), _stats(stats) { // original comment from cassandra: // during bootstrap, include pending endpoints in the count // or we may fail the consistency level guarantees (see #833, #8058) @@ -326,9 +326,10 @@ class datacenter_write_response_handler : public abstract_write_response_handler public: datacenter_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr mh, std::unordered_set targets, - const std::vector& pending_endpoints, std::vector dead_endpoints, tracing::trace_state_ptr tr_state) : + const std::vector& pending_endpoints, std::vector dead_endpoints, tracing::trace_state_ptr tr_state, + storage_proxy::write_stats& stats) : abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), - std::move(targets), std::move(tr_state), db::count_local_endpoints(pending_endpoints), std::move(dead_endpoints)) { + std::move(targets), std::move(tr_state), stats, db::count_local_endpoints(pending_endpoints), std::move(dead_endpoints)) { _total_endpoints = db::count_local_endpoints(_targets); } }; @@ -340,9 +341,10 @@ class write_response_handler : public abstract_write_response_handler { public: write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr mh, std::unordered_set targets, - const std::vector& pending_endpoints, std::vector dead_endpoints, tracing::trace_state_ptr tr_state) : + const std::vector& pending_endpoints, std::vector dead_endpoints, tracing::trace_state_ptr tr_state, + storage_proxy::write_stats& stats) : abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), - std::move(targets), std::move(tr_state), pending_endpoints.size(), std::move(dead_endpoints)) { + std::move(targets), std::move(tr_state), stats, pending_endpoints.size(), std::move(dead_endpoints)) { _total_endpoints = _targets.size(); } }; @@ -369,8 +371,8 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha public: datacenter_sync_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr mh, std::unordered_set targets, const std::vector& pending_endpoints, - std::vector dead_endpoints, tracing::trace_state_ptr tr_state) : - abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), targets, std::move(tr_state), 0, dead_endpoints) { + std::vector dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats) : + abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), targets, std::move(tr_state), stats, 0, dead_endpoints) { auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); for (auto& target : targets) { @@ -509,17 +511,18 @@ future<> storage_proxy::response_wait(storage_proxy::response_id_type id, clock_ } storage_proxy::response_id_type storage_proxy::create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr m, - std::unordered_set targets, const std::vector& pending_endpoints, std::vector dead_endpoints, tracing::trace_state_ptr tr_state) + std::unordered_set targets, const std::vector& pending_endpoints, std::vector dead_endpoints, tracing::trace_state_ptr tr_state, + storage_proxy::write_stats& stats) { shared_ptr h; auto& rs = ks.get_replication_strategy(); if (db::is_datacenter_local(cl)) { - h = ::make_shared(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state)); + h = ::make_shared(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats); } else if (cl == db::consistency_level::EACH_QUORUM && rs.get_type() == locator::replication_strategy_type::network_topology){ - h = ::make_shared(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state)); + h = ::make_shared(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats); } else { - h = ::make_shared(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state)); + h = ::make_shared(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats); } return register_response_handler(std::move(h)); } @@ -1238,7 +1241,7 @@ storage_proxy::create_write_response_handler(const mutation& m, db::consistency_ db::assure_sufficient_live_nodes(cl, ks, live_endpoints, pending_endpoints); - return create_write_response_handler(ks, cl, type, std::make_unique(m), std::move(live_endpoints), pending_endpoints, std::move(dead_endpoints), std::move(tr_state)); + return create_write_response_handler(ks, cl, type, std::make_unique(m), std::move(live_endpoints), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), _stats); } storage_proxy::response_id_type @@ -1253,7 +1256,7 @@ storage_proxy::create_write_response_handler(const std::unordered_mapschema()->ks_name(); keyspace& ks = _db.local().find_keyspace(keyspace_name); - return create_write_response_handler(ks, cl, type, std::move(mh), std::move(endpoints), std::vector(), std::vector(), std::move(tr_state)); + return create_write_response_handler(ks, cl, type, std::move(mh), std::move(endpoints), std::vector(), std::vector(), std::move(tr_state), _stats); } void @@ -1569,7 +1572,7 @@ storage_proxy::mutate_atomically(std::vector mutations, db::consistenc future<> send_batchlog_mutation(mutation m, db::consistency_level cl = db::consistency_level::ONE) { return _p.mutate_prepare<>(std::array{std::move(m)}, cl, db::write_type::BATCH_LOG, [this] (const mutation& m, db::consistency_level cl, db::write_type type) { auto& ks = _p._db.local().find_keyspace(m.schema()->ks_name()); - return _p.create_write_response_handler(ks, cl, type, std::make_unique(m), _batchlog_endpoints, {}, {}, _trace_state); + return _p.create_write_response_handler(ks, cl, type, std::make_unique(m), _batchlog_endpoints, {}, {}, _trace_state, _stats); }).then([this, cl] (std::vector ids) { return _p.mutate_begin(std::move(ids), cl, _stats); }); @@ -1638,7 +1641,7 @@ future<> storage_proxy::send_to_endpoint( std::unordered_set targets(pending_endpoints.begin(), pending_endpoints.end()); targets.insert(std::move(target)); return mutate_prepare(std::array{std::move(m)}, cl, type, - [this, targets = std::move(targets), pending_endpoints = std::move(pending_endpoints)] ( + [this, targets = std::move(targets), pending_endpoints = std::move(pending_endpoints), &stats] ( const mutation& m, db::consistency_level cl, db::write_type type) mutable { @@ -1651,7 +1654,8 @@ future<> storage_proxy::send_to_endpoint( std::move(targets), pending_endpoints, { }, - nullptr); + nullptr, + stats); }).then([this, &stats, cl] (std::vector ids) { return mutate_begin(std::move(ids), cl, stats); }).then_wrapped([p = shared_from_this(), lc, &stats] (future<>&& f) { diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 28b596ba49..a6011123d6 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -170,7 +170,7 @@ private: future<> response_wait(response_id_type id, clock_type::time_point timeout); ::shared_ptr& get_write_response_handler(storage_proxy::response_id_type id); response_id_type create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr m, std::unordered_set targets, - const std::vector& pending_endpoints, std::vector, tracing::trace_state_ptr tr_state); + const std::vector& pending_endpoints, std::vector, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats); response_id_type create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state); response_id_type create_write_response_handler(const std::unordered_map>&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state); void send_to_live_endpoints(response_id_type response_id, clock_type::time_point timeout, write_stats& stats);