From e089166cfa8086ffaa794a2fb433f75397c78b92 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Mon, 13 Jun 2016 15:47:27 +0300 Subject: [PATCH] storage_proxy: wait only for expected CL when writing back data during read repair When read repair writes diffs back to replicas it is enough to wait for requested CL to guaranty read monotonicity. This patch makes read repair write reuse regular mutate functionality which already tracks CL status. This is done by changing write response handler to not hold mutation directly, but instead hold a container that, depending on whether this is read repair write or regular one, can provide different mutation per destination. Message-Id: <20160613124727.GL1096@scylladb.com> --- service/storage_proxy.cc | 257 ++++++++++++++++++++++++++++----------- service/storage_proxy.hh | 15 ++- 2 files changed, 193 insertions(+), 79 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index bc92f804d9..c47d382f65 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -56,10 +56,9 @@ #include "db/batchlog_manager.hh" #include "exceptions/exceptions.hh" #include -#include -#include #include -#include +#include +#include #include #include #include @@ -110,6 +109,66 @@ sstring get_local_dc() { return get_dc(local_addr); } +class mutation_holder { +protected: + size_t _size = 0; + schema_ptr _schema; +public: + virtual ~mutation_holder() {} + virtual lw_shared_ptr get_mutation_for(gms::inet_address ep) = 0; + virtual bool is_shared() = 0; + size_t size() const { + return _size; + } + const schema_ptr& schema() { + return _schema; + } +}; + +// different mutation for each destination (for read repairs) +class per_destination_mutation : public mutation_holder { + std::unordered_map> _mutations; + dht::token _token; +public: + per_destination_mutation(const std::unordered_map>& mutations) { + for (auto&& m : mutations) { + lw_shared_ptr fm; + if (m.second) { + _schema = m.second.value().schema(); + _token = m.second.value().token(); + fm = make_lw_shared(freeze(m.second.value())); + _size += fm->representation().size(); + } + _mutations.emplace(m.first, std::move(fm)); + } + } + lw_shared_ptr get_mutation_for(gms::inet_address ep) override { + return _mutations[ep]; + } + virtual bool is_shared() override { + return false; + } + dht::token& token() { + return _token; + } +}; + +// same mutation for each destination +class shared_mutation : public mutation_holder { + lw_shared_ptr _mutation; +public: + shared_mutation(const mutation& m) : _mutation(make_lw_shared(freeze(m))) { + _size = _mutation->representation().size(); + _schema = m.schema(); + }; + lw_shared_ptr get_mutation_for(gms::inet_address ep) override { + return _mutation; + } + virtual bool is_shared() override { + return true; + } +}; + class abstract_write_response_handler { protected: storage_proxy::response_id_type _id; @@ -118,8 +177,7 @@ protected: db::consistency_level _cl; keyspace& _ks; db::write_type _type; - schema_ptr _schema; - lw_shared_ptr _mutation; + std::unique_ptr _mutation_holder; std::unordered_set _targets; // who we sent this mutation to size_t _pending_endpoints; // how many endpoints in bootstrap state there is // added dead_endpoints as a memeber here as well. This to be able to carry the info across @@ -141,10 +199,9 @@ protected: } public: abstract_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, - schema_ptr s, - lw_shared_ptr mutation, std::unordered_set targets, + std::unique_ptr mh, std::unordered_set targets, size_t pending_endpoints = 0, std::vector dead_endpoints = {}) - : _id(p->_next_response_id++), _proxy(std::move(p)), _cl(cl), _ks(ks), _type(type), _schema(std::move(s)), _mutation(std::move(mutation)), _targets(std::move(targets)), + : _id(p->_next_response_id++), _proxy(std::move(p)), _cl(cl), _ks(ks), _type(type), _mutation_holder(std::move(mh)), _targets(std::move(targets)), _pending_endpoints(pending_endpoints), _dead_endpoints(std::move(dead_endpoints)) { } virtual ~abstract_write_response_handler() { @@ -153,14 +210,14 @@ public: _ready.set_value(); } else { _proxy->_stats.background_writes--; - _proxy->_stats.background_write_bytes -= _mutation->representation().size(); + _proxy->_stats.background_write_bytes -= _mutation_holder->size(); _proxy->unthrottle(); } } }; void unthrottle() { _proxy->_stats.background_writes++; - _proxy->_stats.background_write_bytes += _mutation->representation().size(); + _proxy->_stats.background_write_bytes += _mutation_holder->size(); _throttled = false; _ready.set_value(); } @@ -193,15 +250,18 @@ public: const std::vector& get_dead_endpoints() const { return _dead_endpoints; } - lw_shared_ptr get_mutation() { - return _mutation; + lw_shared_ptr get_mutation_for(gms::inet_address ep) { + return _mutation_holder->get_mutation_for(ep); } const schema_ptr& get_schema() const { - return _schema; + return _mutation_holder->schema(); } storage_proxy::response_id_type id() const { return _id; } + bool read_repair_write() { + return !_mutation_holder->is_shared(); + } friend storage_proxy; }; @@ -213,18 +273,18 @@ class datacenter_write_response_handler : public abstract_write_response_handler } public: datacenter_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, - schema_ptr s, lw_shared_ptr mutation, std::unordered_set targets, + std::unique_ptr mh, std::unordered_set targets, std::vector pending_endpoints, std::vector dead_endpoints) : - abstract_write_response_handler(std::move(p), ks, cl, type, std::move(s), std::move(mutation), + abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), std::move(targets), boost::range::count_if(pending_endpoints, db::is_local), std::move(dead_endpoints)) {} }; class write_response_handler : public abstract_write_response_handler { public: write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, - schema_ptr s, lw_shared_ptr mutation, std::unordered_set targets, + std::unique_ptr mh, std::unordered_set targets, std::vector pending_endpoints, std::vector dead_endpoints) : - abstract_write_response_handler(std::move(p), ks, cl, type, std::move(s), std::move(mutation), + abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), std::move(targets), pending_endpoints.size(), std::move(dead_endpoints)) {} }; @@ -242,10 +302,9 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha } public: datacenter_sync_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, - schema_ptr s, - lw_shared_ptr mutation, std::unordered_set targets, std::vector pending_endpoints, + std::unique_ptr mh, std::unordered_set targets, std::vector pending_endpoints, std::vector dead_endpoints) : - abstract_write_response_handler(std::move(p), ks, cl, type, std::move(s), std::move(mutation), targets, 0, dead_endpoints) { + abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), targets, 0, dead_endpoints) { auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); for (auto& target : targets) { @@ -285,7 +344,7 @@ storage_proxy::response_id_type storage_proxy::register_response_handler(std::un // we are here because either cl was achieved, but targets left in the handler are not // responding, so a hint should be written for them, or cl == any in which case // hints are counted towards consistency, so we need to write hints and count how much was written - auto hints = hint_to_dead_endpoints(e.handler->get_mutation(), e.handler->get_targets()); + auto hints = hint_to_dead_endpoints(e.handler->_mutation_holder, e.handler->get_targets()); e.handler->signal(hints); if (e.handler->_cl == db::consistency_level::ANY && hints) { logger.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write"); @@ -330,20 +389,18 @@ abstract_write_response_handler& storage_proxy::get_write_response_handler(stora return *_response_handlers.find(id)->second.handler; } -storage_proxy::response_id_type storage_proxy::create_write_response_handler(schema_ptr s, keyspace& ks, db::consistency_level cl, db::write_type type, frozen_mutation&& mutation, +storage_proxy::response_id_type storage_proxy::create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr m, std::unordered_set targets, const std::vector& pending_endpoints, std::vector dead_endpoints) { std::unique_ptr h; auto& rs = ks.get_replication_strategy(); - auto m = make_lw_shared(std::move(mutation)); - if (db::is_datacenter_local(cl)) { - h = std::make_unique(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints)); + h = std::make_unique(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints)); } else if (cl == db::consistency_level::EACH_QUORUM && rs.get_type() == locator::replication_strategy_type::network_topology){ - h = std::make_unique(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints)); + h = std::make_unique(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints)); } else { - h = std::make_unique(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints)); + h = std::make_unique(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints)); } return register_response_handler(std::move(h)); } @@ -961,14 +1018,28 @@ storage_proxy::create_write_response_handler(const mutation& m, db::consistency_ logger.trace("creating write handler with live: {} dead: {}", live_endpoints, dead_endpoints); db::assure_sufficient_live_nodes(cl, ks, live_endpoints, pending_endpoints); - return create_write_response_handler(m.schema(), ks, cl, type, freeze(m), std::move(live_endpoints), pending_endpoints, std::move(dead_endpoints)); + return create_write_response_handler(ks, cl, type, std::make_unique(m), std::move(live_endpoints), pending_endpoints, std::move(dead_endpoints)); +} + +storage_proxy::response_id_type +storage_proxy::create_write_response_handler(const std::unordered_map>& m, db::consistency_level cl, db::write_type type) { + std::unordered_set endpoints(m.size()); + boost::copy(m | boost::adaptors::map_keys, std::inserter(endpoints, endpoints.begin())); + auto mh = std::make_unique(m); + + logger.trace("creating write handler for read repair token: {} endpoint: {}", mh->token(), endpoints); + + auto keyspace_name = mh->schema()->ks_name(); + keyspace& ks = _db.local().find_keyspace(keyspace_name); + + return create_write_response_handler(ks, cl, type, std::move(mh), std::move(endpoints), std::vector(), std::vector()); } void storage_proxy::hint_to_dead_endpoints(response_id_type id, db::consistency_level cl) { auto& h = get_write_response_handler(id); - size_t hints = hint_to_dead_endpoints(h.get_mutation(), h.get_dead_endpoints()); + size_t hints = hint_to_dead_endpoints(h._mutation_holder, h.get_dead_endpoints()); if (cl == db::consistency_level::ANY) { // for cl==ANY hints are counted towards consistency @@ -981,7 +1052,7 @@ future> storage_proxy::mutat // apply is used to convert exceptions to exceptional future return futurize>::apply([this] (const Range& mutations, db::consistency_level cl, db::write_type type, CreateWriteHandler create_handler) { std::vector ids; - ids.reserve(mutations.size()); + ids.reserve(boost::size(mutations)); for (auto& m : mutations) { ids.emplace_back(*this, create_handler(m, cl, type)); } @@ -989,8 +1060,9 @@ future> storage_proxy::mutat }, mutations, cl, type, std::move(create_handler)); } -future> storage_proxy::mutate_prepare(std::vector& mutations, db::consistency_level cl, db::write_type type) { - return mutate_prepare<>(mutations, cl, type, [this] (const mutation& m, db::consistency_level cl, db::write_type type) { +template +future> storage_proxy::mutate_prepare(const Range& mutations, db::consistency_level cl, db::write_type type) { + return mutate_prepare<>(mutations, cl, type, [this] (const typename Range::value_type& m, db::consistency_level cl, db::write_type type) { return create_write_response_handler(m, cl, type); }); } @@ -1051,11 +1123,21 @@ future<> storage_proxy::mutate_end(future<> mutate_result, utils::latency_counte * @param mutations the mutations to be applied across the replicas * @param consistency_level the consistency level for the operation */ +future<> storage_proxy::mutate(std::vector mutations, db::consistency_level cl) { + return mutate_internal(std::move(mutations), cl); +} + +/* + * Range template parameter can either be range of 'mutation' or a range of 'std::unordered_map'. + * create_write_response_handler() has specialization for both types. The one for the former uses keyspace to figure out + * endpoints to send mutation to, the one for the late uses enpoints that are used as keys for the map. + */ +template future<> -storage_proxy::mutate(std::vector mutations, db::consistency_level cl) { - logger.trace("mutate cl={}", cl, mutations); +storage_proxy::mutate_internal(Range mutations, db::consistency_level cl) { + logger.trace("mutate cl={}", cl); mlogger.trace("mutations={}", mutations); - auto type = mutations.size() == 1 ? db::write_type::SIMPLE : db::write_type::UNLOGGED_BATCH; + auto type = boost::size(mutations) == 1 ? db::write_type::SIMPLE : db::write_type::UNLOGGED_BATCH; utils::latency_counter lc; lc.start(); @@ -1132,7 +1214,7 @@ storage_proxy::mutate_atomically(std::vector mutations, db::consistenc future<> send_batchlog_mutation(mutation m, db::consistency_level cl = db::consistency_level::ONE) { return _p.mutate_prepare<>(std::array{std::move(m)}, cl, db::write_type::BATCH_LOG, [this] (const mutation& m, db::consistency_level cl, db::write_type type) { auto& ks = _p._db.local().find_keyspace(m.schema()->ks_name()); - return _p.create_write_response_handler(m.schema(), ks, cl, type, freeze(m), _batchlog_endpoints, {}, {}); + return _p.create_write_response_handler(ks, cl, type, std::make_unique(m), _batchlog_endpoints, {}, {}); }).then([this, cl] (std::vector ids) { return _p.mutate_begin(std::move(ids), cl); }); @@ -1205,25 +1287,24 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo std::vector>> local; local.reserve(3); - for(auto dest: get_write_response_handler(response_id).get_targets()) { + auto&& handler = get_write_response_handler(response_id); + + for(auto dest: handler.get_targets()) { sstring dc = get_dc(dest); - if (dc == get_local_dc()) { + // read repair writes do not go through coordinator since mutations are per destination + if (handler.read_repair_write() || dc == get_local_dc()) { local.emplace_back("", std::vector({dest})); } else { dc_groups[dc].push_back(dest); } } - auto&& handler = get_write_response_handler(response_id); - auto mptr = handler.get_mutation(); - auto schema = handler.get_schema(); - auto& m = *mptr; auto all = boost::range::join(local, dc_groups); auto my_address = utils::fb_utilities::get_broadcast_address(); // lambda for applying mutation locally - auto lmutate = [&m, schema = std::move(schema), mptr = std::move(mptr), response_id, this, my_address] { - return mutate_locally(schema, m).then([response_id, this, my_address, mptr = std::move(mptr), p = shared_from_this()] { + auto lmutate = [&handler, response_id, this, my_address] (lw_shared_ptr m) { + return mutate_locally(handler.get_schema(), *m).then([response_id, this, my_address, m, p = shared_from_this()] { // make mutation alive until it is processed locally, otherwise it // may disappear if write timeouts before this future is ready got_response(response_id, my_address); @@ -1231,11 +1312,12 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo }; // lambda for applying mutation remotely - auto rmutate = [this, &m, timeout, response_id, my_address] (gms::inet_address coordinator, std::vector&& forward) { + auto rmutate = [this, &handler, timeout, response_id, my_address] (gms::inet_address coordinator, std::vector&& forward, const frozen_mutation& m) { auto& ms = net::get_local_messaging_service(); - _stats.queued_write_bytes += m.representation().size(); + auto msize = m.representation().size(); + _stats.queued_write_bytes += msize; return ms.send_mutation(net::messaging_service::msg_addr{coordinator, 0}, timeout, m, - std::move(forward), my_address, engine().cpu_id(), response_id).finally([this, p = shared_from_this(), msize = m.representation().size()] { + std::move(forward), my_address, engine().cpu_id(), response_id).finally([this, p = shared_from_this(), msize] { _stats.queued_write_bytes -= msize; unthrottle(); }); @@ -1250,12 +1332,23 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo future<> f = make_ready_future<>(); - ++_stats.writes_attempts.get_ep_stat(coordinator); - if (coordinator == my_address) { - f = futurize::apply(lmutate); + lw_shared_ptr m = handler.get_mutation_for(coordinator); + + if (!m) { + got_response(response_id, coordinator); } else { - f = futurize::apply(rmutate, coordinator, std::move(forward)); + if (!handler.read_repair_write()) { + ++_stats.writes_attempts.get_ep_stat(coordinator); + } else { + ++_stats.read_repair_write_attempts.get_ep_stat(coordinator); + } + + if (coordinator == my_address) { + f = futurize::apply(lmutate, std::move(m)); + } else { + f = futurize::apply(rmutate, coordinator, std::move(forward), *m); + } } f.handle_exception([coordinator, p = shared_from_this()] (std::exception_ptr eptr) { @@ -1275,10 +1368,10 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo // returns number of hints stored template -size_t storage_proxy::hint_to_dead_endpoints(lw_shared_ptr m, const Range& targets) noexcept +size_t storage_proxy::hint_to_dead_endpoints(std::unique_ptr& mh, const Range& targets) noexcept { return boost::count_if(targets | boost::adaptors::filtered(std::bind1st(std::mem_fn(&storage_proxy::should_hint), this)), - std::bind(std::mem_fn(&storage_proxy::submit_hint), this, m, std::placeholders::_1)); + std::bind(std::mem_fn(&storage_proxy::submit_hint), this, std::ref(mh), std::placeholders::_1)); } size_t storage_proxy::get_hints_in_progress_for(gms::inet_address target) { @@ -1291,7 +1384,7 @@ size_t storage_proxy::get_hints_in_progress_for(gms::inet_address target) { return it->second; } -bool storage_proxy::submit_hint(lw_shared_ptr m, gms::inet_address target) +bool storage_proxy::submit_hint(std::unique_ptr& mh, gms::inet_address target) { warn(unimplemented::cause::HINT); // local write that time out should be handled by LocalMutationRunnable @@ -1473,22 +1566,11 @@ bool storage_proxy::submit_hint(lw_shared_ptr m, gms::ine } #endif -future<> storage_proxy::schedule_repair(std::unordered_map> diffs) { - return parallel_for_each(diffs, [this] (std::pair>& i) { - auto type = i.second.size() == 1 ? db::write_type::SIMPLE : db::write_type::UNLOGGED_BATCH; - utils::latency_counter lc; - lc.start(); - - return mutate_prepare<>(std::move(i.second), db::consistency_level::ONE, type, [ep = i.first, this] (const mutation& m, db::consistency_level cl, db::write_type type) { - auto& ks = _db.local().find_keyspace(m.schema()->ks_name()); - return create_write_response_handler(m.schema(), ks, cl, type, freeze(m), std::unordered_set({ep}, 1), {}, {}); - }).then([this, ep = i.first] (std::vector ids) { - _stats.read_repair_write_attempts.get_ep_stat(ep) += ids.size(); - return mutate_begin(std::move(ids), db::consistency_level::ONE); - }).then_wrapped([this, lc] (future<> f) { - return mutate_end(std::move(f), lc); - }); - }).finally([p = shared_from_this()] {}); +future<> storage_proxy::schedule_repair(std::unordered_map>> diffs, db::consistency_level cl) { + if (diffs.empty()) { + return make_ready_future<>(); + } + return mutate_internal(diffs | boost::adaptors::map_values, cl); } class abstract_read_resolver { @@ -1640,7 +1722,7 @@ class data_read_resolver : public abstract_read_resolver { size_t _total_live_count = 0; uint32_t _max_live_count = 0; std::vector _data_results; - std::unordered_map> _diffs; + std::unordered_map>> _diffs; private: virtual void on_timeout() override { // we will not need them any more @@ -1834,22 +1916,49 @@ public: return mutation_and_live_row_count { std::move(m), live_row_count }; }); + bool has_diff = false; + // calculate differences for (auto z : boost::combine(versions, reconciled_partitions)) { const mutation& m = z.get<1>().mut; for (const version& v : z.get<0>()) { auto diff = m.partition().difference(schema, v.par.mut().unfreeze(schema).partition()); + auto it = _diffs[m.token()].find(v.from); + std::experimental::optional mdiff; if (!diff.empty()) { - _diffs[v.from].emplace_back(mutation(schema, m.decorated_key(), std::move(diff))); + has_diff = true; + mdiff = mutation(schema, m.decorated_key(), std::move(diff)); + } + if (it == _diffs[m.token()].end()) { + _diffs[m.token()].emplace(v.from, std::move(mdiff)); + } else { + // should not really happen, but lets try to deal with it + if (mdiff) { + if (it->second) { + it->second.value().apply(std::move(mdiff.value())); + } else { + it->second = std::move(mdiff); + } + } } } } - if (!_diffs.empty()) { + if (has_diff) { if (_total_live_count >= original_row_limit && got_incomplete_information(schema, cmd, original_row_limit, reconciled_partitions | boost::adaptors::reversed, last_rows)) { return {}; } + // filter out partitions with empty diffs + for (auto it = _diffs.begin(); it != _diffs.end();) { + if (boost::algorithm::none_of(it->second | boost::adaptors::map_values, std::mem_fn(&std::experimental::optional::operator bool))) { + it = _diffs.erase(it); + } else { + ++it; + } + } + } else { + _diffs.clear(); } // build reconcilable_result from reconciled data @@ -1999,7 +2108,7 @@ protected: // wait for write to complete before returning result to prevent multiple concurrent read requests to // trigger repair multiple times and to prevent quorum read to return an old value, even after a quorum // another read had returned a newer value (but the newer value had not yet been sent to the other replicas) - _proxy->schedule_repair(data_resolver->get_diffs_for_repair()).then([this, result = std::move(result)] () mutable { + _proxy->schedule_repair(data_resolver->get_diffs_for_repair(), _cl).then([this, result = std::move(result)] () mutable { _result_promise.set_value(std::move(result)); }).handle_exception([this, exec] (std::exception_ptr eptr) { try { diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 259dd205c1..971c4888db 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -55,6 +55,7 @@ namespace service { class abstract_write_response_handler; class abstract_read_executor; +class mutation_holder; class storage_proxy : public seastar::async_sharded_service /*implements StorageProxyMBean*/ { using clock_type = std::chrono::steady_clock; @@ -197,17 +198,18 @@ private: void got_response(response_id_type id, gms::inet_address from); future<> response_wait(response_id_type id, clock_type::time_point timeout); abstract_write_response_handler& get_write_response_handler(storage_proxy::response_id_type id); - response_id_type create_write_response_handler(schema_ptr s, keyspace& ks, db::consistency_level cl, db::write_type type, frozen_mutation&& mutation, std::unordered_set targets, + response_id_type create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr m, std::unordered_set targets, const std::vector& pending_endpoints, std::vector); response_id_type create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type); + response_id_type create_write_response_handler(const std::unordered_map>&, db::consistency_level cl, db::write_type type); void send_to_live_endpoints(response_id_type response_id, clock_type::time_point timeout); template - size_t hint_to_dead_endpoints(lw_shared_ptr m, const Range& targets) noexcept; + size_t hint_to_dead_endpoints(std::unique_ptr& mh, const Range& targets) noexcept; void hint_to_dead_endpoints(response_id_type, db::consistency_level); bool cannot_hint(gms::inet_address target); size_t get_hints_in_progress_for(gms::inet_address target); bool should_hint(gms::inet_address ep) noexcept; - bool submit_hint(lw_shared_ptr m, gms::inet_address target); + bool submit_hint(std::unique_ptr& mh, gms::inet_address target); std::vector get_live_sorted_endpoints(keyspace& ks, const dht::token& token); db::read_repair_decision new_read_repair_decision(const schema& s); ::shared_ptr get_read_executor(lw_shared_ptr cmd, query::partition_range pr, db::consistency_level cl); @@ -228,13 +230,16 @@ private: db::consistency_level cl); template future> mutate_prepare(const Range& mutations, db::consistency_level cl, db::write_type type, CreateWriteHandler handler); - future> mutate_prepare(std::vector& mutations, db::consistency_level cl, db::write_type type); + template + future> mutate_prepare(const Range& mutations, db::consistency_level cl, db::write_type type); future<> mutate_begin(std::vector ids, db::consistency_level cl); future<> mutate_end(future<> mutate_result, utils::latency_counter); - future<> schedule_repair(std::unordered_map> diffs); + future<> schedule_repair(std::unordered_map>> diffs, db::consistency_level cl); bool need_throttle_writes() const; void unthrottle(); void handle_read_error(std::exception_ptr eptr); + template + future<> mutate_internal(Range mutations, db::consistency_level cl); public: storage_proxy(distributed& db);