storage_proxy: add mv stats to write handler

Previous patch for issue 3416 did not cover passing write stats
to write response handler, which results in some write stats
being incorrectly counted as user write stats, while they belong
to materialized views.
This one fixes that by passing correct write stats reference
to write response handler constructor.

Also at: https://github.com/psarna/scylla/commits/fix_3416_again

Closes #3416
Message-Id: <53ef3cc96ccadfdad8992d92ed6a41473419eb0a.1527510473.git.sarna@scylladb.com>
This commit is contained in:
Piotr Sarna
2018-05-28 14:28:44 +02:00
committed by Duarte Nunes
parent aefb5e0fbd
commit ffe52681ea
2 changed files with 23 additions and 19 deletions

View File

@@ -207,7 +207,7 @@ protected:
error _error = error::NONE;
size_t _failed = 0;
size_t _total_endpoints = 0;
storage_proxy::stats& _stats;
storage_proxy::write_stats& _stats;
protected:
virtual bool waited_for(gms::inet_address from) = 0;
@@ -220,9 +220,9 @@ protected:
public:
abstract_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
std::unique_ptr<mutation_holder> mh, std::unordered_set<gms::inet_address> targets, tracing::trace_state_ptr trace_state,
size_t pending_endpoints = 0, std::vector<gms::inet_address> dead_endpoints = {})
storage_proxy::write_stats& stats, size_t pending_endpoints = 0, std::vector<gms::inet_address> dead_endpoints = {})
: _id(p->_next_response_id++), _proxy(std::move(p)), _trace_state(trace_state), _cl(cl), _type(type), _mutation_holder(std::move(mh)), _targets(std::move(targets)),
_dead_endpoints(std::move(dead_endpoints)), _stats(_proxy->_stats) {
_dead_endpoints(std::move(dead_endpoints)), _stats(stats) {
// original comment from cassandra:
// during bootstrap, include pending endpoints in the count
// or we may fail the consistency level guarantees (see #833, #8058)
@@ -326,9 +326,10 @@ class datacenter_write_response_handler : public abstract_write_response_handler
public:
datacenter_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
std::unique_ptr<mutation_holder> mh, std::unordered_set<gms::inet_address> targets,
const std::vector<gms::inet_address>& pending_endpoints, std::vector<gms::inet_address> dead_endpoints, tracing::trace_state_ptr tr_state) :
const std::vector<gms::inet_address>& pending_endpoints, std::vector<gms::inet_address> dead_endpoints, tracing::trace_state_ptr tr_state,
storage_proxy::write_stats& stats) :
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh),
std::move(targets), std::move(tr_state), db::count_local_endpoints(pending_endpoints), std::move(dead_endpoints)) {
std::move(targets), std::move(tr_state), stats, db::count_local_endpoints(pending_endpoints), std::move(dead_endpoints)) {
_total_endpoints = db::count_local_endpoints(_targets);
}
};
@@ -340,9 +341,10 @@ class write_response_handler : public abstract_write_response_handler {
public:
write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
std::unique_ptr<mutation_holder> mh, std::unordered_set<gms::inet_address> targets,
const std::vector<gms::inet_address>& pending_endpoints, std::vector<gms::inet_address> dead_endpoints, tracing::trace_state_ptr tr_state) :
const std::vector<gms::inet_address>& pending_endpoints, std::vector<gms::inet_address> dead_endpoints, tracing::trace_state_ptr tr_state,
storage_proxy::write_stats& stats) :
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh),
std::move(targets), std::move(tr_state), pending_endpoints.size(), std::move(dead_endpoints)) {
std::move(targets), std::move(tr_state), stats, pending_endpoints.size(), std::move(dead_endpoints)) {
_total_endpoints = _targets.size();
}
};
@@ -369,8 +371,8 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha
public:
datacenter_sync_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
std::unique_ptr<mutation_holder> mh, std::unordered_set<gms::inet_address> targets, const std::vector<gms::inet_address>& pending_endpoints,
std::vector<gms::inet_address> dead_endpoints, tracing::trace_state_ptr tr_state) :
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), targets, std::move(tr_state), 0, dead_endpoints) {
std::vector<gms::inet_address> dead_endpoints, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats) :
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), targets, std::move(tr_state), stats, 0, dead_endpoints) {
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
for (auto& target : targets) {
@@ -509,17 +511,18 @@ future<> storage_proxy::response_wait(storage_proxy::response_id_type id, clock_
}
storage_proxy::response_id_type storage_proxy::create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m,
std::unordered_set<gms::inet_address> targets, const std::vector<gms::inet_address>& pending_endpoints, std::vector<gms::inet_address> dead_endpoints, tracing::trace_state_ptr tr_state)
std::unordered_set<gms::inet_address> targets, const std::vector<gms::inet_address>& pending_endpoints, std::vector<gms::inet_address> dead_endpoints, tracing::trace_state_ptr tr_state,
storage_proxy::write_stats& stats)
{
shared_ptr<abstract_write_response_handler> h;
auto& rs = ks.get_replication_strategy();
if (db::is_datacenter_local(cl)) {
h = ::make_shared<datacenter_write_response_handler>(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state));
h = ::make_shared<datacenter_write_response_handler>(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats);
} else if (cl == db::consistency_level::EACH_QUORUM && rs.get_type() == locator::replication_strategy_type::network_topology){
h = ::make_shared<datacenter_sync_write_response_handler>(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state));
h = ::make_shared<datacenter_sync_write_response_handler>(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats);
} else {
h = ::make_shared<write_response_handler>(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state));
h = ::make_shared<write_response_handler>(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), stats);
}
return register_response_handler(std::move(h));
}
@@ -1238,7 +1241,7 @@ storage_proxy::create_write_response_handler(const mutation& m, db::consistency_
db::assure_sufficient_live_nodes(cl, ks, live_endpoints, pending_endpoints);
return create_write_response_handler(ks, cl, type, std::make_unique<shared_mutation>(m), std::move(live_endpoints), pending_endpoints, std::move(dead_endpoints), std::move(tr_state));
return create_write_response_handler(ks, cl, type, std::make_unique<shared_mutation>(m), std::move(live_endpoints), pending_endpoints, std::move(dead_endpoints), std::move(tr_state), _stats);
}
storage_proxy::response_id_type
@@ -1253,7 +1256,7 @@ storage_proxy::create_write_response_handler(const std::unordered_map<gms::inet_
auto keyspace_name = mh->schema()->ks_name();
keyspace& ks = _db.local().find_keyspace(keyspace_name);
return create_write_response_handler(ks, cl, type, std::move(mh), std::move(endpoints), std::vector<gms::inet_address>(), std::vector<gms::inet_address>(), std::move(tr_state));
return create_write_response_handler(ks, cl, type, std::move(mh), std::move(endpoints), std::vector<gms::inet_address>(), std::vector<gms::inet_address>(), std::move(tr_state), _stats);
}
void
@@ -1569,7 +1572,7 @@ storage_proxy::mutate_atomically(std::vector<mutation> mutations, db::consistenc
future<> send_batchlog_mutation(mutation m, db::consistency_level cl = db::consistency_level::ONE) {
return _p.mutate_prepare<>(std::array<mutation, 1>{std::move(m)}, cl, db::write_type::BATCH_LOG, [this] (const mutation& m, db::consistency_level cl, db::write_type type) {
auto& ks = _p._db.local().find_keyspace(m.schema()->ks_name());
return _p.create_write_response_handler(ks, cl, type, std::make_unique<shared_mutation>(m), _batchlog_endpoints, {}, {}, _trace_state);
return _p.create_write_response_handler(ks, cl, type, std::make_unique<shared_mutation>(m), _batchlog_endpoints, {}, {}, _trace_state, _stats);
}).then([this, cl] (std::vector<unique_response_handler> ids) {
return _p.mutate_begin(std::move(ids), cl, _stats);
});
@@ -1638,7 +1641,7 @@ future<> storage_proxy::send_to_endpoint(
std::unordered_set<gms::inet_address> targets(pending_endpoints.begin(), pending_endpoints.end());
targets.insert(std::move(target));
return mutate_prepare(std::array<mutation, 1>{std::move(m)}, cl, type,
[this, targets = std::move(targets), pending_endpoints = std::move(pending_endpoints)] (
[this, targets = std::move(targets), pending_endpoints = std::move(pending_endpoints), &stats] (
const mutation& m,
db::consistency_level cl,
db::write_type type) mutable {
@@ -1651,7 +1654,8 @@ future<> storage_proxy::send_to_endpoint(
std::move(targets),
pending_endpoints,
{ },
nullptr);
nullptr,
stats);
}).then([this, &stats, cl] (std::vector<unique_response_handler> ids) {
return mutate_begin(std::move(ids), cl, stats);
}).then_wrapped([p = shared_from_this(), lc, &stats] (future<>&& f) {

View File

@@ -170,7 +170,7 @@ private:
future<> response_wait(response_id_type id, clock_type::time_point timeout);
::shared_ptr<abstract_write_response_handler>& get_write_response_handler(storage_proxy::response_id_type id);
response_id_type create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m, std::unordered_set<gms::inet_address> targets,
const std::vector<gms::inet_address>& pending_endpoints, std::vector<gms::inet_address>, tracing::trace_state_ptr tr_state);
const std::vector<gms::inet_address>& pending_endpoints, std::vector<gms::inet_address>, tracing::trace_state_ptr tr_state, storage_proxy::write_stats& stats);
response_id_type create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state);
response_id_type create_write_response_handler(const std::unordered_map<gms::inet_address, std::experimental::optional<mutation>>&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state);
void send_to_live_endpoints(response_id_type response_id, clock_type::time_point timeout, write_stats& stats);