mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-31 03:56:42 +00:00
storage_proxy: wait only for expected CL when writing back data during read repair
When read repair writes diffs back to replicas it is enough to wait for requested CL to guaranty read monotonicity. This patch makes read repair write reuse regular mutate functionality which already tracks CL status. This is done by changing write response handler to not hold mutation directly, but instead hold a container that, depending on whether this is read repair write or regular one, can provide different mutation per destination. Message-Id: <20160613124727.GL1096@scylladb.com>
This commit is contained in:
@@ -56,10 +56,9 @@
|
||||
#include "db/batchlog_manager.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include <boost/range/algorithm_ext/push_back.hpp>
|
||||
#include <boost/range/adaptor/transformed.hpp>
|
||||
#include <boost/range/adaptor/reversed.hpp>
|
||||
#include <boost/iterator/counting_iterator.hpp>
|
||||
#include <boost/range/adaptor/filtered.hpp>
|
||||
#include <boost/range/adaptors.hpp>
|
||||
#include <boost/algorithm/cxx11/none_of.hpp>
|
||||
#include <boost/range/algorithm/count_if.hpp>
|
||||
#include <boost/range/algorithm/find.hpp>
|
||||
#include <boost/range/algorithm/find_if.hpp>
|
||||
@@ -110,6 +109,66 @@ sstring get_local_dc() {
|
||||
return get_dc(local_addr);
|
||||
}
|
||||
|
||||
class mutation_holder {
|
||||
protected:
|
||||
size_t _size = 0;
|
||||
schema_ptr _schema;
|
||||
public:
|
||||
virtual ~mutation_holder() {}
|
||||
virtual lw_shared_ptr<const frozen_mutation> get_mutation_for(gms::inet_address ep) = 0;
|
||||
virtual bool is_shared() = 0;
|
||||
size_t size() const {
|
||||
return _size;
|
||||
}
|
||||
const schema_ptr& schema() {
|
||||
return _schema;
|
||||
}
|
||||
};
|
||||
|
||||
// different mutation for each destination (for read repairs)
|
||||
class per_destination_mutation : public mutation_holder {
|
||||
std::unordered_map<gms::inet_address, lw_shared_ptr<const frozen_mutation>> _mutations;
|
||||
dht::token _token;
|
||||
public:
|
||||
per_destination_mutation(const std::unordered_map<gms::inet_address, std::experimental::optional<mutation>>& mutations) {
|
||||
for (auto&& m : mutations) {
|
||||
lw_shared_ptr<const frozen_mutation> fm;
|
||||
if (m.second) {
|
||||
_schema = m.second.value().schema();
|
||||
_token = m.second.value().token();
|
||||
fm = make_lw_shared<const frozen_mutation>(freeze(m.second.value()));
|
||||
_size += fm->representation().size();
|
||||
}
|
||||
_mutations.emplace(m.first, std::move(fm));
|
||||
}
|
||||
}
|
||||
lw_shared_ptr<const frozen_mutation> get_mutation_for(gms::inet_address ep) override {
|
||||
return _mutations[ep];
|
||||
}
|
||||
virtual bool is_shared() override {
|
||||
return false;
|
||||
}
|
||||
dht::token& token() {
|
||||
return _token;
|
||||
}
|
||||
};
|
||||
|
||||
// same mutation for each destination
|
||||
class shared_mutation : public mutation_holder {
|
||||
lw_shared_ptr<const frozen_mutation> _mutation;
|
||||
public:
|
||||
shared_mutation(const mutation& m) : _mutation(make_lw_shared<const frozen_mutation>(freeze(m))) {
|
||||
_size = _mutation->representation().size();
|
||||
_schema = m.schema();
|
||||
};
|
||||
lw_shared_ptr<const frozen_mutation> get_mutation_for(gms::inet_address ep) override {
|
||||
return _mutation;
|
||||
}
|
||||
virtual bool is_shared() override {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
class abstract_write_response_handler {
|
||||
protected:
|
||||
storage_proxy::response_id_type _id;
|
||||
@@ -118,8 +177,7 @@ protected:
|
||||
db::consistency_level _cl;
|
||||
keyspace& _ks;
|
||||
db::write_type _type;
|
||||
schema_ptr _schema;
|
||||
lw_shared_ptr<const frozen_mutation> _mutation;
|
||||
std::unique_ptr<mutation_holder> _mutation_holder;
|
||||
std::unordered_set<gms::inet_address> _targets; // who we sent this mutation to
|
||||
size_t _pending_endpoints; // how many endpoints in bootstrap state there is
|
||||
// added dead_endpoints as a memeber here as well. This to be able to carry the info across
|
||||
@@ -141,10 +199,9 @@ protected:
|
||||
}
|
||||
public:
|
||||
abstract_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
|
||||
schema_ptr s,
|
||||
lw_shared_ptr<const frozen_mutation> mutation, std::unordered_set<gms::inet_address> targets,
|
||||
std::unique_ptr<mutation_holder> mh, std::unordered_set<gms::inet_address> targets,
|
||||
size_t pending_endpoints = 0, std::vector<gms::inet_address> dead_endpoints = {})
|
||||
: _id(p->_next_response_id++), _proxy(std::move(p)), _cl(cl), _ks(ks), _type(type), _schema(std::move(s)), _mutation(std::move(mutation)), _targets(std::move(targets)),
|
||||
: _id(p->_next_response_id++), _proxy(std::move(p)), _cl(cl), _ks(ks), _type(type), _mutation_holder(std::move(mh)), _targets(std::move(targets)),
|
||||
_pending_endpoints(pending_endpoints), _dead_endpoints(std::move(dead_endpoints)) {
|
||||
}
|
||||
virtual ~abstract_write_response_handler() {
|
||||
@@ -153,14 +210,14 @@ public:
|
||||
_ready.set_value();
|
||||
} else {
|
||||
_proxy->_stats.background_writes--;
|
||||
_proxy->_stats.background_write_bytes -= _mutation->representation().size();
|
||||
_proxy->_stats.background_write_bytes -= _mutation_holder->size();
|
||||
_proxy->unthrottle();
|
||||
}
|
||||
}
|
||||
};
|
||||
void unthrottle() {
|
||||
_proxy->_stats.background_writes++;
|
||||
_proxy->_stats.background_write_bytes += _mutation->representation().size();
|
||||
_proxy->_stats.background_write_bytes += _mutation_holder->size();
|
||||
_throttled = false;
|
||||
_ready.set_value();
|
||||
}
|
||||
@@ -193,15 +250,18 @@ public:
|
||||
const std::vector<gms::inet_address>& get_dead_endpoints() const {
|
||||
return _dead_endpoints;
|
||||
}
|
||||
lw_shared_ptr<const frozen_mutation> get_mutation() {
|
||||
return _mutation;
|
||||
lw_shared_ptr<const frozen_mutation> get_mutation_for(gms::inet_address ep) {
|
||||
return _mutation_holder->get_mutation_for(ep);
|
||||
}
|
||||
const schema_ptr& get_schema() const {
|
||||
return _schema;
|
||||
return _mutation_holder->schema();
|
||||
}
|
||||
storage_proxy::response_id_type id() const {
|
||||
return _id;
|
||||
}
|
||||
bool read_repair_write() {
|
||||
return !_mutation_holder->is_shared();
|
||||
}
|
||||
friend storage_proxy;
|
||||
};
|
||||
|
||||
@@ -213,18 +273,18 @@ class datacenter_write_response_handler : public abstract_write_response_handler
|
||||
}
|
||||
public:
|
||||
datacenter_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
|
||||
schema_ptr s, lw_shared_ptr<const frozen_mutation> mutation, std::unordered_set<gms::inet_address> targets,
|
||||
std::unique_ptr<mutation_holder> mh, std::unordered_set<gms::inet_address> targets,
|
||||
std::vector<gms::inet_address> pending_endpoints, std::vector<gms::inet_address> dead_endpoints) :
|
||||
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(s), std::move(mutation),
|
||||
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh),
|
||||
std::move(targets), boost::range::count_if(pending_endpoints, db::is_local), std::move(dead_endpoints)) {}
|
||||
};
|
||||
|
||||
class write_response_handler : public abstract_write_response_handler {
|
||||
public:
|
||||
write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
|
||||
schema_ptr s, lw_shared_ptr<const frozen_mutation> mutation, std::unordered_set<gms::inet_address> targets,
|
||||
std::unique_ptr<mutation_holder> mh, std::unordered_set<gms::inet_address> targets,
|
||||
std::vector<gms::inet_address> pending_endpoints, std::vector<gms::inet_address> dead_endpoints) :
|
||||
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(s), std::move(mutation),
|
||||
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh),
|
||||
std::move(targets), pending_endpoints.size(), std::move(dead_endpoints)) {}
|
||||
};
|
||||
|
||||
@@ -242,10 +302,9 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha
|
||||
}
|
||||
public:
|
||||
datacenter_sync_write_response_handler(shared_ptr<storage_proxy> p, keyspace& ks, db::consistency_level cl, db::write_type type,
|
||||
schema_ptr s,
|
||||
lw_shared_ptr<const frozen_mutation> mutation, std::unordered_set<gms::inet_address> targets, std::vector<gms::inet_address> pending_endpoints,
|
||||
std::unique_ptr<mutation_holder> mh, std::unordered_set<gms::inet_address> targets, std::vector<gms::inet_address> pending_endpoints,
|
||||
std::vector<gms::inet_address> dead_endpoints) :
|
||||
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(s), std::move(mutation), targets, 0, dead_endpoints) {
|
||||
abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), targets, 0, dead_endpoints) {
|
||||
auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
|
||||
|
||||
for (auto& target : targets) {
|
||||
@@ -285,7 +344,7 @@ storage_proxy::response_id_type storage_proxy::register_response_handler(std::un
|
||||
// we are here because either cl was achieved, but targets left in the handler are not
|
||||
// responding, so a hint should be written for them, or cl == any in which case
|
||||
// hints are counted towards consistency, so we need to write hints and count how much was written
|
||||
auto hints = hint_to_dead_endpoints(e.handler->get_mutation(), e.handler->get_targets());
|
||||
auto hints = hint_to_dead_endpoints(e.handler->_mutation_holder, e.handler->get_targets());
|
||||
e.handler->signal(hints);
|
||||
if (e.handler->_cl == db::consistency_level::ANY && hints) {
|
||||
logger.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write");
|
||||
@@ -330,20 +389,18 @@ abstract_write_response_handler& storage_proxy::get_write_response_handler(stora
|
||||
return *_response_handlers.find(id)->second.handler;
|
||||
}
|
||||
|
||||
storage_proxy::response_id_type storage_proxy::create_write_response_handler(schema_ptr s, keyspace& ks, db::consistency_level cl, db::write_type type, frozen_mutation&& mutation,
|
||||
storage_proxy::response_id_type storage_proxy::create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m,
|
||||
std::unordered_set<gms::inet_address> targets, const std::vector<gms::inet_address>& pending_endpoints, std::vector<gms::inet_address> dead_endpoints)
|
||||
{
|
||||
std::unique_ptr<abstract_write_response_handler> h;
|
||||
auto& rs = ks.get_replication_strategy();
|
||||
|
||||
auto m = make_lw_shared<const frozen_mutation>(std::move(mutation));
|
||||
|
||||
if (db::is_datacenter_local(cl)) {
|
||||
h = std::make_unique<datacenter_write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints));
|
||||
h = std::make_unique<datacenter_write_response_handler>(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints));
|
||||
} else if (cl == db::consistency_level::EACH_QUORUM && rs.get_type() == locator::replication_strategy_type::network_topology){
|
||||
h = std::make_unique<datacenter_sync_write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints));
|
||||
h = std::make_unique<datacenter_sync_write_response_handler>(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints));
|
||||
} else {
|
||||
h = std::make_unique<write_response_handler>(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints));
|
||||
h = std::make_unique<write_response_handler>(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints));
|
||||
}
|
||||
return register_response_handler(std::move(h));
|
||||
}
|
||||
@@ -961,14 +1018,28 @@ storage_proxy::create_write_response_handler(const mutation& m, db::consistency_
|
||||
logger.trace("creating write handler with live: {} dead: {}", live_endpoints, dead_endpoints);
|
||||
db::assure_sufficient_live_nodes(cl, ks, live_endpoints, pending_endpoints);
|
||||
|
||||
return create_write_response_handler(m.schema(), ks, cl, type, freeze(m), std::move(live_endpoints), pending_endpoints, std::move(dead_endpoints));
|
||||
return create_write_response_handler(ks, cl, type, std::make_unique<shared_mutation>(m), std::move(live_endpoints), pending_endpoints, std::move(dead_endpoints));
|
||||
}
|
||||
|
||||
storage_proxy::response_id_type
|
||||
storage_proxy::create_write_response_handler(const std::unordered_map<gms::inet_address, std::experimental::optional<mutation>>& m, db::consistency_level cl, db::write_type type) {
|
||||
std::unordered_set<gms::inet_address> endpoints(m.size());
|
||||
boost::copy(m | boost::adaptors::map_keys, std::inserter(endpoints, endpoints.begin()));
|
||||
auto mh = std::make_unique<per_destination_mutation>(m);
|
||||
|
||||
logger.trace("creating write handler for read repair token: {} endpoint: {}", mh->token(), endpoints);
|
||||
|
||||
auto keyspace_name = mh->schema()->ks_name();
|
||||
keyspace& ks = _db.local().find_keyspace(keyspace_name);
|
||||
|
||||
return create_write_response_handler(ks, cl, type, std::move(mh), std::move(endpoints), std::vector<gms::inet_address>(), std::vector<gms::inet_address>());
|
||||
}
|
||||
|
||||
void
|
||||
storage_proxy::hint_to_dead_endpoints(response_id_type id, db::consistency_level cl) {
|
||||
auto& h = get_write_response_handler(id);
|
||||
|
||||
size_t hints = hint_to_dead_endpoints(h.get_mutation(), h.get_dead_endpoints());
|
||||
size_t hints = hint_to_dead_endpoints(h._mutation_holder, h.get_dead_endpoints());
|
||||
|
||||
if (cl == db::consistency_level::ANY) {
|
||||
// for cl==ANY hints are counted towards consistency
|
||||
@@ -981,7 +1052,7 @@ future<std::vector<storage_proxy::unique_response_handler>> storage_proxy::mutat
|
||||
// apply is used to convert exceptions to exceptional future
|
||||
return futurize<std::vector<storage_proxy::unique_response_handler>>::apply([this] (const Range& mutations, db::consistency_level cl, db::write_type type, CreateWriteHandler create_handler) {
|
||||
std::vector<unique_response_handler> ids;
|
||||
ids.reserve(mutations.size());
|
||||
ids.reserve(boost::size(mutations));
|
||||
for (auto& m : mutations) {
|
||||
ids.emplace_back(*this, create_handler(m, cl, type));
|
||||
}
|
||||
@@ -989,8 +1060,9 @@ future<std::vector<storage_proxy::unique_response_handler>> storage_proxy::mutat
|
||||
}, mutations, cl, type, std::move(create_handler));
|
||||
}
|
||||
|
||||
future<std::vector<storage_proxy::unique_response_handler>> storage_proxy::mutate_prepare(std::vector<mutation>& mutations, db::consistency_level cl, db::write_type type) {
|
||||
return mutate_prepare<>(mutations, cl, type, [this] (const mutation& m, db::consistency_level cl, db::write_type type) {
|
||||
template<typename Range>
|
||||
future<std::vector<storage_proxy::unique_response_handler>> storage_proxy::mutate_prepare(const Range& mutations, db::consistency_level cl, db::write_type type) {
|
||||
return mutate_prepare<>(mutations, cl, type, [this] (const typename Range::value_type& m, db::consistency_level cl, db::write_type type) {
|
||||
return create_write_response_handler(m, cl, type);
|
||||
});
|
||||
}
|
||||
@@ -1051,11 +1123,21 @@ future<> storage_proxy::mutate_end(future<> mutate_result, utils::latency_counte
|
||||
* @param mutations the mutations to be applied across the replicas
|
||||
* @param consistency_level the consistency level for the operation
|
||||
*/
|
||||
future<> storage_proxy::mutate(std::vector<mutation> mutations, db::consistency_level cl) {
|
||||
return mutate_internal(std::move(mutations), cl);
|
||||
}
|
||||
|
||||
/*
|
||||
* Range template parameter can either be range of 'mutation' or a range of 'std::unordered_map<gms::inet_address, mutation>'.
|
||||
* create_write_response_handler() has specialization for both types. The one for the former uses keyspace to figure out
|
||||
* endpoints to send mutation to, the one for the late uses enpoints that are used as keys for the map.
|
||||
*/
|
||||
template<typename Range>
|
||||
future<>
|
||||
storage_proxy::mutate(std::vector<mutation> mutations, db::consistency_level cl) {
|
||||
logger.trace("mutate cl={}", cl, mutations);
|
||||
storage_proxy::mutate_internal(Range mutations, db::consistency_level cl) {
|
||||
logger.trace("mutate cl={}", cl);
|
||||
mlogger.trace("mutations={}", mutations);
|
||||
auto type = mutations.size() == 1 ? db::write_type::SIMPLE : db::write_type::UNLOGGED_BATCH;
|
||||
auto type = boost::size(mutations) == 1 ? db::write_type::SIMPLE : db::write_type::UNLOGGED_BATCH;
|
||||
utils::latency_counter lc;
|
||||
lc.start();
|
||||
|
||||
@@ -1132,7 +1214,7 @@ storage_proxy::mutate_atomically(std::vector<mutation> mutations, db::consistenc
|
||||
future<> send_batchlog_mutation(mutation m, db::consistency_level cl = db::consistency_level::ONE) {
|
||||
return _p.mutate_prepare<>(std::array<mutation, 1>{std::move(m)}, cl, db::write_type::BATCH_LOG, [this] (const mutation& m, db::consistency_level cl, db::write_type type) {
|
||||
auto& ks = _p._db.local().find_keyspace(m.schema()->ks_name());
|
||||
return _p.create_write_response_handler(m.schema(), ks, cl, type, freeze(m), _batchlog_endpoints, {}, {});
|
||||
return _p.create_write_response_handler(ks, cl, type, std::make_unique<shared_mutation>(m), _batchlog_endpoints, {}, {});
|
||||
}).then([this, cl] (std::vector<unique_response_handler> ids) {
|
||||
return _p.mutate_begin(std::move(ids), cl);
|
||||
});
|
||||
@@ -1205,25 +1287,24 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
|
||||
std::vector<std::pair<const sstring, std::vector<gms::inet_address>>> local;
|
||||
local.reserve(3);
|
||||
|
||||
for(auto dest: get_write_response_handler(response_id).get_targets()) {
|
||||
auto&& handler = get_write_response_handler(response_id);
|
||||
|
||||
for(auto dest: handler.get_targets()) {
|
||||
sstring dc = get_dc(dest);
|
||||
if (dc == get_local_dc()) {
|
||||
// read repair writes do not go through coordinator since mutations are per destination
|
||||
if (handler.read_repair_write() || dc == get_local_dc()) {
|
||||
local.emplace_back("", std::vector<gms::inet_address>({dest}));
|
||||
} else {
|
||||
dc_groups[dc].push_back(dest);
|
||||
}
|
||||
}
|
||||
|
||||
auto&& handler = get_write_response_handler(response_id);
|
||||
auto mptr = handler.get_mutation();
|
||||
auto schema = handler.get_schema();
|
||||
auto& m = *mptr;
|
||||
auto all = boost::range::join(local, dc_groups);
|
||||
auto my_address = utils::fb_utilities::get_broadcast_address();
|
||||
|
||||
// lambda for applying mutation locally
|
||||
auto lmutate = [&m, schema = std::move(schema), mptr = std::move(mptr), response_id, this, my_address] {
|
||||
return mutate_locally(schema, m).then([response_id, this, my_address, mptr = std::move(mptr), p = shared_from_this()] {
|
||||
auto lmutate = [&handler, response_id, this, my_address] (lw_shared_ptr<const frozen_mutation> m) {
|
||||
return mutate_locally(handler.get_schema(), *m).then([response_id, this, my_address, m, p = shared_from_this()] {
|
||||
// make mutation alive until it is processed locally, otherwise it
|
||||
// may disappear if write timeouts before this future is ready
|
||||
got_response(response_id, my_address);
|
||||
@@ -1231,11 +1312,12 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
|
||||
};
|
||||
|
||||
// lambda for applying mutation remotely
|
||||
auto rmutate = [this, &m, timeout, response_id, my_address] (gms::inet_address coordinator, std::vector<gms::inet_address>&& forward) {
|
||||
auto rmutate = [this, &handler, timeout, response_id, my_address] (gms::inet_address coordinator, std::vector<gms::inet_address>&& forward, const frozen_mutation& m) {
|
||||
auto& ms = net::get_local_messaging_service();
|
||||
_stats.queued_write_bytes += m.representation().size();
|
||||
auto msize = m.representation().size();
|
||||
_stats.queued_write_bytes += msize;
|
||||
return ms.send_mutation(net::messaging_service::msg_addr{coordinator, 0}, timeout, m,
|
||||
std::move(forward), my_address, engine().cpu_id(), response_id).finally([this, p = shared_from_this(), msize = m.representation().size()] {
|
||||
std::move(forward), my_address, engine().cpu_id(), response_id).finally([this, p = shared_from_this(), msize] {
|
||||
_stats.queued_write_bytes -= msize;
|
||||
unthrottle();
|
||||
});
|
||||
@@ -1250,12 +1332,23 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
|
||||
|
||||
future<> f = make_ready_future<>();
|
||||
|
||||
++_stats.writes_attempts.get_ep_stat(coordinator);
|
||||
|
||||
if (coordinator == my_address) {
|
||||
f = futurize<void>::apply(lmutate);
|
||||
lw_shared_ptr<const frozen_mutation> m = handler.get_mutation_for(coordinator);
|
||||
|
||||
if (!m) {
|
||||
got_response(response_id, coordinator);
|
||||
} else {
|
||||
f = futurize<void>::apply(rmutate, coordinator, std::move(forward));
|
||||
if (!handler.read_repair_write()) {
|
||||
++_stats.writes_attempts.get_ep_stat(coordinator);
|
||||
} else {
|
||||
++_stats.read_repair_write_attempts.get_ep_stat(coordinator);
|
||||
}
|
||||
|
||||
if (coordinator == my_address) {
|
||||
f = futurize<void>::apply(lmutate, std::move(m));
|
||||
} else {
|
||||
f = futurize<void>::apply(rmutate, coordinator, std::move(forward), *m);
|
||||
}
|
||||
}
|
||||
|
||||
f.handle_exception([coordinator, p = shared_from_this()] (std::exception_ptr eptr) {
|
||||
@@ -1275,10 +1368,10 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
|
||||
|
||||
// returns number of hints stored
|
||||
template<typename Range>
|
||||
size_t storage_proxy::hint_to_dead_endpoints(lw_shared_ptr<const frozen_mutation> m, const Range& targets) noexcept
|
||||
size_t storage_proxy::hint_to_dead_endpoints(std::unique_ptr<mutation_holder>& mh, const Range& targets) noexcept
|
||||
{
|
||||
return boost::count_if(targets | boost::adaptors::filtered(std::bind1st(std::mem_fn(&storage_proxy::should_hint), this)),
|
||||
std::bind(std::mem_fn(&storage_proxy::submit_hint), this, m, std::placeholders::_1));
|
||||
std::bind(std::mem_fn(&storage_proxy::submit_hint), this, std::ref(mh), std::placeholders::_1));
|
||||
}
|
||||
|
||||
size_t storage_proxy::get_hints_in_progress_for(gms::inet_address target) {
|
||||
@@ -1291,7 +1384,7 @@ size_t storage_proxy::get_hints_in_progress_for(gms::inet_address target) {
|
||||
return it->second;
|
||||
}
|
||||
|
||||
bool storage_proxy::submit_hint(lw_shared_ptr<const frozen_mutation> m, gms::inet_address target)
|
||||
bool storage_proxy::submit_hint(std::unique_ptr<mutation_holder>& mh, gms::inet_address target)
|
||||
{
|
||||
warn(unimplemented::cause::HINT);
|
||||
// local write that time out should be handled by LocalMutationRunnable
|
||||
@@ -1473,22 +1566,11 @@ bool storage_proxy::submit_hint(lw_shared_ptr<const frozen_mutation> m, gms::ine
|
||||
}
|
||||
#endif
|
||||
|
||||
future<> storage_proxy::schedule_repair(std::unordered_map<gms::inet_address, std::vector<mutation>> diffs) {
|
||||
return parallel_for_each(diffs, [this] (std::pair<const gms::inet_address, std::vector<mutation>>& i) {
|
||||
auto type = i.second.size() == 1 ? db::write_type::SIMPLE : db::write_type::UNLOGGED_BATCH;
|
||||
utils::latency_counter lc;
|
||||
lc.start();
|
||||
|
||||
return mutate_prepare<>(std::move(i.second), db::consistency_level::ONE, type, [ep = i.first, this] (const mutation& m, db::consistency_level cl, db::write_type type) {
|
||||
auto& ks = _db.local().find_keyspace(m.schema()->ks_name());
|
||||
return create_write_response_handler(m.schema(), ks, cl, type, freeze(m), std::unordered_set<gms::inet_address>({ep}, 1), {}, {});
|
||||
}).then([this, ep = i.first] (std::vector<unique_response_handler> ids) {
|
||||
_stats.read_repair_write_attempts.get_ep_stat(ep) += ids.size();
|
||||
return mutate_begin(std::move(ids), db::consistency_level::ONE);
|
||||
}).then_wrapped([this, lc] (future<> f) {
|
||||
return mutate_end(std::move(f), lc);
|
||||
});
|
||||
}).finally([p = shared_from_this()] {});
|
||||
future<> storage_proxy::schedule_repair(std::unordered_map<dht::token, std::unordered_map<gms::inet_address, std::experimental::optional<mutation>>> diffs, db::consistency_level cl) {
|
||||
if (diffs.empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return mutate_internal(diffs | boost::adaptors::map_values, cl);
|
||||
}
|
||||
|
||||
class abstract_read_resolver {
|
||||
@@ -1640,7 +1722,7 @@ class data_read_resolver : public abstract_read_resolver {
|
||||
size_t _total_live_count = 0;
|
||||
uint32_t _max_live_count = 0;
|
||||
std::vector<reply> _data_results;
|
||||
std::unordered_map<gms::inet_address, std::vector<mutation>> _diffs;
|
||||
std::unordered_map<dht::token, std::unordered_map<gms::inet_address, std::experimental::optional<mutation>>> _diffs;
|
||||
private:
|
||||
virtual void on_timeout() override {
|
||||
// we will not need them any more
|
||||
@@ -1834,22 +1916,49 @@ public:
|
||||
return mutation_and_live_row_count { std::move(m), live_row_count };
|
||||
});
|
||||
|
||||
bool has_diff = false;
|
||||
|
||||
// calculate differences
|
||||
for (auto z : boost::combine(versions, reconciled_partitions)) {
|
||||
const mutation& m = z.get<1>().mut;
|
||||
for (const version& v : z.get<0>()) {
|
||||
auto diff = m.partition().difference(schema, v.par.mut().unfreeze(schema).partition());
|
||||
auto it = _diffs[m.token()].find(v.from);
|
||||
std::experimental::optional<mutation> mdiff;
|
||||
if (!diff.empty()) {
|
||||
_diffs[v.from].emplace_back(mutation(schema, m.decorated_key(), std::move(diff)));
|
||||
has_diff = true;
|
||||
mdiff = mutation(schema, m.decorated_key(), std::move(diff));
|
||||
}
|
||||
if (it == _diffs[m.token()].end()) {
|
||||
_diffs[m.token()].emplace(v.from, std::move(mdiff));
|
||||
} else {
|
||||
// should not really happen, but lets try to deal with it
|
||||
if (mdiff) {
|
||||
if (it->second) {
|
||||
it->second.value().apply(std::move(mdiff.value()));
|
||||
} else {
|
||||
it->second = std::move(mdiff);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!_diffs.empty()) {
|
||||
if (has_diff) {
|
||||
if (_total_live_count >= original_row_limit && got_incomplete_information(schema, cmd, original_row_limit,
|
||||
reconciled_partitions | boost::adaptors::reversed, last_rows)) {
|
||||
return {};
|
||||
}
|
||||
// filter out partitions with empty diffs
|
||||
for (auto it = _diffs.begin(); it != _diffs.end();) {
|
||||
if (boost::algorithm::none_of(it->second | boost::adaptors::map_values, std::mem_fn(&std::experimental::optional<mutation>::operator bool))) {
|
||||
it = _diffs.erase(it);
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
_diffs.clear();
|
||||
}
|
||||
|
||||
// build reconcilable_result from reconciled data
|
||||
@@ -1999,7 +2108,7 @@ protected:
|
||||
// wait for write to complete before returning result to prevent multiple concurrent read requests to
|
||||
// trigger repair multiple times and to prevent quorum read to return an old value, even after a quorum
|
||||
// another read had returned a newer value (but the newer value had not yet been sent to the other replicas)
|
||||
_proxy->schedule_repair(data_resolver->get_diffs_for_repair()).then([this, result = std::move(result)] () mutable {
|
||||
_proxy->schedule_repair(data_resolver->get_diffs_for_repair(), _cl).then([this, result = std::move(result)] () mutable {
|
||||
_result_promise.set_value(std::move(result));
|
||||
}).handle_exception([this, exec] (std::exception_ptr eptr) {
|
||||
try {
|
||||
|
||||
@@ -55,6 +55,7 @@ namespace service {
|
||||
|
||||
class abstract_write_response_handler;
|
||||
class abstract_read_executor;
|
||||
class mutation_holder;
|
||||
|
||||
class storage_proxy : public seastar::async_sharded_service<storage_proxy> /*implements StorageProxyMBean*/ {
|
||||
using clock_type = std::chrono::steady_clock;
|
||||
@@ -197,17 +198,18 @@ private:
|
||||
void got_response(response_id_type id, gms::inet_address from);
|
||||
future<> response_wait(response_id_type id, clock_type::time_point timeout);
|
||||
abstract_write_response_handler& get_write_response_handler(storage_proxy::response_id_type id);
|
||||
response_id_type create_write_response_handler(schema_ptr s, keyspace& ks, db::consistency_level cl, db::write_type type, frozen_mutation&& mutation, std::unordered_set<gms::inet_address> targets,
|
||||
response_id_type create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr<mutation_holder> m, std::unordered_set<gms::inet_address> targets,
|
||||
const std::vector<gms::inet_address>& pending_endpoints, std::vector<gms::inet_address>);
|
||||
response_id_type create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type);
|
||||
response_id_type create_write_response_handler(const std::unordered_map<gms::inet_address, std::experimental::optional<mutation>>&, db::consistency_level cl, db::write_type type);
|
||||
void send_to_live_endpoints(response_id_type response_id, clock_type::time_point timeout);
|
||||
template<typename Range>
|
||||
size_t hint_to_dead_endpoints(lw_shared_ptr<const frozen_mutation> m, const Range& targets) noexcept;
|
||||
size_t hint_to_dead_endpoints(std::unique_ptr<mutation_holder>& mh, const Range& targets) noexcept;
|
||||
void hint_to_dead_endpoints(response_id_type, db::consistency_level);
|
||||
bool cannot_hint(gms::inet_address target);
|
||||
size_t get_hints_in_progress_for(gms::inet_address target);
|
||||
bool should_hint(gms::inet_address ep) noexcept;
|
||||
bool submit_hint(lw_shared_ptr<const frozen_mutation> m, gms::inet_address target);
|
||||
bool submit_hint(std::unique_ptr<mutation_holder>& mh, gms::inet_address target);
|
||||
std::vector<gms::inet_address> get_live_sorted_endpoints(keyspace& ks, const dht::token& token);
|
||||
db::read_repair_decision new_read_repair_decision(const schema& s);
|
||||
::shared_ptr<abstract_read_executor> get_read_executor(lw_shared_ptr<query::read_command> cmd, query::partition_range pr, db::consistency_level cl);
|
||||
@@ -228,13 +230,16 @@ private:
|
||||
db::consistency_level cl);
|
||||
template<typename Range, typename CreateWriteHandler>
|
||||
future<std::vector<unique_response_handler>> mutate_prepare(const Range& mutations, db::consistency_level cl, db::write_type type, CreateWriteHandler handler);
|
||||
future<std::vector<unique_response_handler>> mutate_prepare(std::vector<mutation>& mutations, db::consistency_level cl, db::write_type type);
|
||||
template<typename Range>
|
||||
future<std::vector<unique_response_handler>> mutate_prepare(const Range& mutations, db::consistency_level cl, db::write_type type);
|
||||
future<> mutate_begin(std::vector<unique_response_handler> ids, db::consistency_level cl);
|
||||
future<> mutate_end(future<> mutate_result, utils::latency_counter);
|
||||
future<> schedule_repair(std::unordered_map<gms::inet_address, std::vector<mutation>> diffs);
|
||||
future<> schedule_repair(std::unordered_map<dht::token, std::unordered_map<gms::inet_address, std::experimental::optional<mutation>>> diffs, db::consistency_level cl);
|
||||
bool need_throttle_writes() const;
|
||||
void unthrottle();
|
||||
void handle_read_error(std::exception_ptr eptr);
|
||||
template<typename Range>
|
||||
future<> mutate_internal(Range mutations, db::consistency_level cl);
|
||||
|
||||
public:
|
||||
storage_proxy(distributed<database>& db);
|
||||
|
||||
Reference in New Issue
Block a user