diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index bc92f804d9..c47d382f65 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -56,10 +56,9 @@ #include "db/batchlog_manager.hh" #include "exceptions/exceptions.hh" #include -#include -#include #include -#include +#include +#include #include #include #include @@ -110,6 +109,66 @@ sstring get_local_dc() { return get_dc(local_addr); } +class mutation_holder { +protected: + size_t _size = 0; + schema_ptr _schema; +public: + virtual ~mutation_holder() {} + virtual lw_shared_ptr get_mutation_for(gms::inet_address ep) = 0; + virtual bool is_shared() = 0; + size_t size() const { + return _size; + } + const schema_ptr& schema() { + return _schema; + } +}; + +// different mutation for each destination (for read repairs) +class per_destination_mutation : public mutation_holder { + std::unordered_map> _mutations; + dht::token _token; +public: + per_destination_mutation(const std::unordered_map>& mutations) { + for (auto&& m : mutations) { + lw_shared_ptr fm; + if (m.second) { + _schema = m.second.value().schema(); + _token = m.second.value().token(); + fm = make_lw_shared(freeze(m.second.value())); + _size += fm->representation().size(); + } + _mutations.emplace(m.first, std::move(fm)); + } + } + lw_shared_ptr get_mutation_for(gms::inet_address ep) override { + return _mutations[ep]; + } + virtual bool is_shared() override { + return false; + } + dht::token& token() { + return _token; + } +}; + +// same mutation for each destination +class shared_mutation : public mutation_holder { + lw_shared_ptr _mutation; +public: + shared_mutation(const mutation& m) : _mutation(make_lw_shared(freeze(m))) { + _size = _mutation->representation().size(); + _schema = m.schema(); + }; + lw_shared_ptr get_mutation_for(gms::inet_address ep) override { + return _mutation; + } + virtual bool is_shared() override { + return true; + } +}; + class abstract_write_response_handler { protected: storage_proxy::response_id_type _id; @@ -118,8 +177,7 @@ protected: db::consistency_level _cl; keyspace& _ks; db::write_type _type; - schema_ptr _schema; - lw_shared_ptr _mutation; + std::unique_ptr _mutation_holder; std::unordered_set _targets; // who we sent this mutation to size_t _pending_endpoints; // how many endpoints in bootstrap state there is // added dead_endpoints as a memeber here as well. This to be able to carry the info across @@ -141,10 +199,9 @@ protected: } public: abstract_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, - schema_ptr s, - lw_shared_ptr mutation, std::unordered_set targets, + std::unique_ptr mh, std::unordered_set targets, size_t pending_endpoints = 0, std::vector dead_endpoints = {}) - : _id(p->_next_response_id++), _proxy(std::move(p)), _cl(cl), _ks(ks), _type(type), _schema(std::move(s)), _mutation(std::move(mutation)), _targets(std::move(targets)), + : _id(p->_next_response_id++), _proxy(std::move(p)), _cl(cl), _ks(ks), _type(type), _mutation_holder(std::move(mh)), _targets(std::move(targets)), _pending_endpoints(pending_endpoints), _dead_endpoints(std::move(dead_endpoints)) { } virtual ~abstract_write_response_handler() { @@ -153,14 +210,14 @@ public: _ready.set_value(); } else { _proxy->_stats.background_writes--; - _proxy->_stats.background_write_bytes -= _mutation->representation().size(); + _proxy->_stats.background_write_bytes -= _mutation_holder->size(); _proxy->unthrottle(); } } }; void unthrottle() { _proxy->_stats.background_writes++; - _proxy->_stats.background_write_bytes += _mutation->representation().size(); + _proxy->_stats.background_write_bytes += _mutation_holder->size(); _throttled = false; _ready.set_value(); } @@ -193,15 +250,18 @@ public: const std::vector& get_dead_endpoints() const { return _dead_endpoints; } - lw_shared_ptr get_mutation() { - return _mutation; + lw_shared_ptr get_mutation_for(gms::inet_address ep) { + return _mutation_holder->get_mutation_for(ep); } const schema_ptr& get_schema() const { - return _schema; + return _mutation_holder->schema(); } storage_proxy::response_id_type id() const { return _id; } + bool read_repair_write() { + return !_mutation_holder->is_shared(); + } friend storage_proxy; }; @@ -213,18 +273,18 @@ class datacenter_write_response_handler : public abstract_write_response_handler } public: datacenter_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, - schema_ptr s, lw_shared_ptr mutation, std::unordered_set targets, + std::unique_ptr mh, std::unordered_set targets, std::vector pending_endpoints, std::vector dead_endpoints) : - abstract_write_response_handler(std::move(p), ks, cl, type, std::move(s), std::move(mutation), + abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), std::move(targets), boost::range::count_if(pending_endpoints, db::is_local), std::move(dead_endpoints)) {} }; class write_response_handler : public abstract_write_response_handler { public: write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, - schema_ptr s, lw_shared_ptr mutation, std::unordered_set targets, + std::unique_ptr mh, std::unordered_set targets, std::vector pending_endpoints, std::vector dead_endpoints) : - abstract_write_response_handler(std::move(p), ks, cl, type, std::move(s), std::move(mutation), + abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), std::move(targets), pending_endpoints.size(), std::move(dead_endpoints)) {} }; @@ -242,10 +302,9 @@ class datacenter_sync_write_response_handler : public abstract_write_response_ha } public: datacenter_sync_write_response_handler(shared_ptr p, keyspace& ks, db::consistency_level cl, db::write_type type, - schema_ptr s, - lw_shared_ptr mutation, std::unordered_set targets, std::vector pending_endpoints, + std::unique_ptr mh, std::unordered_set targets, std::vector pending_endpoints, std::vector dead_endpoints) : - abstract_write_response_handler(std::move(p), ks, cl, type, std::move(s), std::move(mutation), targets, 0, dead_endpoints) { + abstract_write_response_handler(std::move(p), ks, cl, type, std::move(mh), targets, 0, dead_endpoints) { auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); for (auto& target : targets) { @@ -285,7 +344,7 @@ storage_proxy::response_id_type storage_proxy::register_response_handler(std::un // we are here because either cl was achieved, but targets left in the handler are not // responding, so a hint should be written for them, or cl == any in which case // hints are counted towards consistency, so we need to write hints and count how much was written - auto hints = hint_to_dead_endpoints(e.handler->get_mutation(), e.handler->get_targets()); + auto hints = hint_to_dead_endpoints(e.handler->_mutation_holder, e.handler->get_targets()); e.handler->signal(hints); if (e.handler->_cl == db::consistency_level::ANY && hints) { logger.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write"); @@ -330,20 +389,18 @@ abstract_write_response_handler& storage_proxy::get_write_response_handler(stora return *_response_handlers.find(id)->second.handler; } -storage_proxy::response_id_type storage_proxy::create_write_response_handler(schema_ptr s, keyspace& ks, db::consistency_level cl, db::write_type type, frozen_mutation&& mutation, +storage_proxy::response_id_type storage_proxy::create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr m, std::unordered_set targets, const std::vector& pending_endpoints, std::vector dead_endpoints) { std::unique_ptr h; auto& rs = ks.get_replication_strategy(); - auto m = make_lw_shared(std::move(mutation)); - if (db::is_datacenter_local(cl)) { - h = std::make_unique(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints)); + h = std::make_unique(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints)); } else if (cl == db::consistency_level::EACH_QUORUM && rs.get_type() == locator::replication_strategy_type::network_topology){ - h = std::make_unique(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints)); + h = std::make_unique(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints)); } else { - h = std::make_unique(shared_from_this(), ks, cl, type, std::move(s), std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints)); + h = std::make_unique(shared_from_this(), ks, cl, type, std::move(m), std::move(targets), std::move(pending_endpoints), std::move(dead_endpoints)); } return register_response_handler(std::move(h)); } @@ -961,14 +1018,28 @@ storage_proxy::create_write_response_handler(const mutation& m, db::consistency_ logger.trace("creating write handler with live: {} dead: {}", live_endpoints, dead_endpoints); db::assure_sufficient_live_nodes(cl, ks, live_endpoints, pending_endpoints); - return create_write_response_handler(m.schema(), ks, cl, type, freeze(m), std::move(live_endpoints), pending_endpoints, std::move(dead_endpoints)); + return create_write_response_handler(ks, cl, type, std::make_unique(m), std::move(live_endpoints), pending_endpoints, std::move(dead_endpoints)); +} + +storage_proxy::response_id_type +storage_proxy::create_write_response_handler(const std::unordered_map>& m, db::consistency_level cl, db::write_type type) { + std::unordered_set endpoints(m.size()); + boost::copy(m | boost::adaptors::map_keys, std::inserter(endpoints, endpoints.begin())); + auto mh = std::make_unique(m); + + logger.trace("creating write handler for read repair token: {} endpoint: {}", mh->token(), endpoints); + + auto keyspace_name = mh->schema()->ks_name(); + keyspace& ks = _db.local().find_keyspace(keyspace_name); + + return create_write_response_handler(ks, cl, type, std::move(mh), std::move(endpoints), std::vector(), std::vector()); } void storage_proxy::hint_to_dead_endpoints(response_id_type id, db::consistency_level cl) { auto& h = get_write_response_handler(id); - size_t hints = hint_to_dead_endpoints(h.get_mutation(), h.get_dead_endpoints()); + size_t hints = hint_to_dead_endpoints(h._mutation_holder, h.get_dead_endpoints()); if (cl == db::consistency_level::ANY) { // for cl==ANY hints are counted towards consistency @@ -981,7 +1052,7 @@ future> storage_proxy::mutat // apply is used to convert exceptions to exceptional future return futurize>::apply([this] (const Range& mutations, db::consistency_level cl, db::write_type type, CreateWriteHandler create_handler) { std::vector ids; - ids.reserve(mutations.size()); + ids.reserve(boost::size(mutations)); for (auto& m : mutations) { ids.emplace_back(*this, create_handler(m, cl, type)); } @@ -989,8 +1060,9 @@ future> storage_proxy::mutat }, mutations, cl, type, std::move(create_handler)); } -future> storage_proxy::mutate_prepare(std::vector& mutations, db::consistency_level cl, db::write_type type) { - return mutate_prepare<>(mutations, cl, type, [this] (const mutation& m, db::consistency_level cl, db::write_type type) { +template +future> storage_proxy::mutate_prepare(const Range& mutations, db::consistency_level cl, db::write_type type) { + return mutate_prepare<>(mutations, cl, type, [this] (const typename Range::value_type& m, db::consistency_level cl, db::write_type type) { return create_write_response_handler(m, cl, type); }); } @@ -1051,11 +1123,21 @@ future<> storage_proxy::mutate_end(future<> mutate_result, utils::latency_counte * @param mutations the mutations to be applied across the replicas * @param consistency_level the consistency level for the operation */ +future<> storage_proxy::mutate(std::vector mutations, db::consistency_level cl) { + return mutate_internal(std::move(mutations), cl); +} + +/* + * Range template parameter can either be range of 'mutation' or a range of 'std::unordered_map'. + * create_write_response_handler() has specialization for both types. The one for the former uses keyspace to figure out + * endpoints to send mutation to, the one for the late uses enpoints that are used as keys for the map. + */ +template future<> -storage_proxy::mutate(std::vector mutations, db::consistency_level cl) { - logger.trace("mutate cl={}", cl, mutations); +storage_proxy::mutate_internal(Range mutations, db::consistency_level cl) { + logger.trace("mutate cl={}", cl); mlogger.trace("mutations={}", mutations); - auto type = mutations.size() == 1 ? db::write_type::SIMPLE : db::write_type::UNLOGGED_BATCH; + auto type = boost::size(mutations) == 1 ? db::write_type::SIMPLE : db::write_type::UNLOGGED_BATCH; utils::latency_counter lc; lc.start(); @@ -1132,7 +1214,7 @@ storage_proxy::mutate_atomically(std::vector mutations, db::consistenc future<> send_batchlog_mutation(mutation m, db::consistency_level cl = db::consistency_level::ONE) { return _p.mutate_prepare<>(std::array{std::move(m)}, cl, db::write_type::BATCH_LOG, [this] (const mutation& m, db::consistency_level cl, db::write_type type) { auto& ks = _p._db.local().find_keyspace(m.schema()->ks_name()); - return _p.create_write_response_handler(m.schema(), ks, cl, type, freeze(m), _batchlog_endpoints, {}, {}); + return _p.create_write_response_handler(ks, cl, type, std::make_unique(m), _batchlog_endpoints, {}, {}); }).then([this, cl] (std::vector ids) { return _p.mutate_begin(std::move(ids), cl); }); @@ -1205,25 +1287,24 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo std::vector>> local; local.reserve(3); - for(auto dest: get_write_response_handler(response_id).get_targets()) { + auto&& handler = get_write_response_handler(response_id); + + for(auto dest: handler.get_targets()) { sstring dc = get_dc(dest); - if (dc == get_local_dc()) { + // read repair writes do not go through coordinator since mutations are per destination + if (handler.read_repair_write() || dc == get_local_dc()) { local.emplace_back("", std::vector({dest})); } else { dc_groups[dc].push_back(dest); } } - auto&& handler = get_write_response_handler(response_id); - auto mptr = handler.get_mutation(); - auto schema = handler.get_schema(); - auto& m = *mptr; auto all = boost::range::join(local, dc_groups); auto my_address = utils::fb_utilities::get_broadcast_address(); // lambda for applying mutation locally - auto lmutate = [&m, schema = std::move(schema), mptr = std::move(mptr), response_id, this, my_address] { - return mutate_locally(schema, m).then([response_id, this, my_address, mptr = std::move(mptr), p = shared_from_this()] { + auto lmutate = [&handler, response_id, this, my_address] (lw_shared_ptr m) { + return mutate_locally(handler.get_schema(), *m).then([response_id, this, my_address, m, p = shared_from_this()] { // make mutation alive until it is processed locally, otherwise it // may disappear if write timeouts before this future is ready got_response(response_id, my_address); @@ -1231,11 +1312,12 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo }; // lambda for applying mutation remotely - auto rmutate = [this, &m, timeout, response_id, my_address] (gms::inet_address coordinator, std::vector&& forward) { + auto rmutate = [this, &handler, timeout, response_id, my_address] (gms::inet_address coordinator, std::vector&& forward, const frozen_mutation& m) { auto& ms = net::get_local_messaging_service(); - _stats.queued_write_bytes += m.representation().size(); + auto msize = m.representation().size(); + _stats.queued_write_bytes += msize; return ms.send_mutation(net::messaging_service::msg_addr{coordinator, 0}, timeout, m, - std::move(forward), my_address, engine().cpu_id(), response_id).finally([this, p = shared_from_this(), msize = m.representation().size()] { + std::move(forward), my_address, engine().cpu_id(), response_id).finally([this, p = shared_from_this(), msize] { _stats.queued_write_bytes -= msize; unthrottle(); }); @@ -1250,12 +1332,23 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo future<> f = make_ready_future<>(); - ++_stats.writes_attempts.get_ep_stat(coordinator); - if (coordinator == my_address) { - f = futurize::apply(lmutate); + lw_shared_ptr m = handler.get_mutation_for(coordinator); + + if (!m) { + got_response(response_id, coordinator); } else { - f = futurize::apply(rmutate, coordinator, std::move(forward)); + if (!handler.read_repair_write()) { + ++_stats.writes_attempts.get_ep_stat(coordinator); + } else { + ++_stats.read_repair_write_attempts.get_ep_stat(coordinator); + } + + if (coordinator == my_address) { + f = futurize::apply(lmutate, std::move(m)); + } else { + f = futurize::apply(rmutate, coordinator, std::move(forward), *m); + } } f.handle_exception([coordinator, p = shared_from_this()] (std::exception_ptr eptr) { @@ -1275,10 +1368,10 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo // returns number of hints stored template -size_t storage_proxy::hint_to_dead_endpoints(lw_shared_ptr m, const Range& targets) noexcept +size_t storage_proxy::hint_to_dead_endpoints(std::unique_ptr& mh, const Range& targets) noexcept { return boost::count_if(targets | boost::adaptors::filtered(std::bind1st(std::mem_fn(&storage_proxy::should_hint), this)), - std::bind(std::mem_fn(&storage_proxy::submit_hint), this, m, std::placeholders::_1)); + std::bind(std::mem_fn(&storage_proxy::submit_hint), this, std::ref(mh), std::placeholders::_1)); } size_t storage_proxy::get_hints_in_progress_for(gms::inet_address target) { @@ -1291,7 +1384,7 @@ size_t storage_proxy::get_hints_in_progress_for(gms::inet_address target) { return it->second; } -bool storage_proxy::submit_hint(lw_shared_ptr m, gms::inet_address target) +bool storage_proxy::submit_hint(std::unique_ptr& mh, gms::inet_address target) { warn(unimplemented::cause::HINT); // local write that time out should be handled by LocalMutationRunnable @@ -1473,22 +1566,11 @@ bool storage_proxy::submit_hint(lw_shared_ptr m, gms::ine } #endif -future<> storage_proxy::schedule_repair(std::unordered_map> diffs) { - return parallel_for_each(diffs, [this] (std::pair>& i) { - auto type = i.second.size() == 1 ? db::write_type::SIMPLE : db::write_type::UNLOGGED_BATCH; - utils::latency_counter lc; - lc.start(); - - return mutate_prepare<>(std::move(i.second), db::consistency_level::ONE, type, [ep = i.first, this] (const mutation& m, db::consistency_level cl, db::write_type type) { - auto& ks = _db.local().find_keyspace(m.schema()->ks_name()); - return create_write_response_handler(m.schema(), ks, cl, type, freeze(m), std::unordered_set({ep}, 1), {}, {}); - }).then([this, ep = i.first] (std::vector ids) { - _stats.read_repair_write_attempts.get_ep_stat(ep) += ids.size(); - return mutate_begin(std::move(ids), db::consistency_level::ONE); - }).then_wrapped([this, lc] (future<> f) { - return mutate_end(std::move(f), lc); - }); - }).finally([p = shared_from_this()] {}); +future<> storage_proxy::schedule_repair(std::unordered_map>> diffs, db::consistency_level cl) { + if (diffs.empty()) { + return make_ready_future<>(); + } + return mutate_internal(diffs | boost::adaptors::map_values, cl); } class abstract_read_resolver { @@ -1640,7 +1722,7 @@ class data_read_resolver : public abstract_read_resolver { size_t _total_live_count = 0; uint32_t _max_live_count = 0; std::vector _data_results; - std::unordered_map> _diffs; + std::unordered_map>> _diffs; private: virtual void on_timeout() override { // we will not need them any more @@ -1834,22 +1916,49 @@ public: return mutation_and_live_row_count { std::move(m), live_row_count }; }); + bool has_diff = false; + // calculate differences for (auto z : boost::combine(versions, reconciled_partitions)) { const mutation& m = z.get<1>().mut; for (const version& v : z.get<0>()) { auto diff = m.partition().difference(schema, v.par.mut().unfreeze(schema).partition()); + auto it = _diffs[m.token()].find(v.from); + std::experimental::optional mdiff; if (!diff.empty()) { - _diffs[v.from].emplace_back(mutation(schema, m.decorated_key(), std::move(diff))); + has_diff = true; + mdiff = mutation(schema, m.decorated_key(), std::move(diff)); + } + if (it == _diffs[m.token()].end()) { + _diffs[m.token()].emplace(v.from, std::move(mdiff)); + } else { + // should not really happen, but lets try to deal with it + if (mdiff) { + if (it->second) { + it->second.value().apply(std::move(mdiff.value())); + } else { + it->second = std::move(mdiff); + } + } } } } - if (!_diffs.empty()) { + if (has_diff) { if (_total_live_count >= original_row_limit && got_incomplete_information(schema, cmd, original_row_limit, reconciled_partitions | boost::adaptors::reversed, last_rows)) { return {}; } + // filter out partitions with empty diffs + for (auto it = _diffs.begin(); it != _diffs.end();) { + if (boost::algorithm::none_of(it->second | boost::adaptors::map_values, std::mem_fn(&std::experimental::optional::operator bool))) { + it = _diffs.erase(it); + } else { + ++it; + } + } + } else { + _diffs.clear(); } // build reconcilable_result from reconciled data @@ -1999,7 +2108,7 @@ protected: // wait for write to complete before returning result to prevent multiple concurrent read requests to // trigger repair multiple times and to prevent quorum read to return an old value, even after a quorum // another read had returned a newer value (but the newer value had not yet been sent to the other replicas) - _proxy->schedule_repair(data_resolver->get_diffs_for_repair()).then([this, result = std::move(result)] () mutable { + _proxy->schedule_repair(data_resolver->get_diffs_for_repair(), _cl).then([this, result = std::move(result)] () mutable { _result_promise.set_value(std::move(result)); }).handle_exception([this, exec] (std::exception_ptr eptr) { try { diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 259dd205c1..971c4888db 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -55,6 +55,7 @@ namespace service { class abstract_write_response_handler; class abstract_read_executor; +class mutation_holder; class storage_proxy : public seastar::async_sharded_service /*implements StorageProxyMBean*/ { using clock_type = std::chrono::steady_clock; @@ -197,17 +198,18 @@ private: void got_response(response_id_type id, gms::inet_address from); future<> response_wait(response_id_type id, clock_type::time_point timeout); abstract_write_response_handler& get_write_response_handler(storage_proxy::response_id_type id); - response_id_type create_write_response_handler(schema_ptr s, keyspace& ks, db::consistency_level cl, db::write_type type, frozen_mutation&& mutation, std::unordered_set targets, + response_id_type create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, std::unique_ptr m, std::unordered_set targets, const std::vector& pending_endpoints, std::vector); response_id_type create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type); + response_id_type create_write_response_handler(const std::unordered_map>&, db::consistency_level cl, db::write_type type); void send_to_live_endpoints(response_id_type response_id, clock_type::time_point timeout); template - size_t hint_to_dead_endpoints(lw_shared_ptr m, const Range& targets) noexcept; + size_t hint_to_dead_endpoints(std::unique_ptr& mh, const Range& targets) noexcept; void hint_to_dead_endpoints(response_id_type, db::consistency_level); bool cannot_hint(gms::inet_address target); size_t get_hints_in_progress_for(gms::inet_address target); bool should_hint(gms::inet_address ep) noexcept; - bool submit_hint(lw_shared_ptr m, gms::inet_address target); + bool submit_hint(std::unique_ptr& mh, gms::inet_address target); std::vector get_live_sorted_endpoints(keyspace& ks, const dht::token& token); db::read_repair_decision new_read_repair_decision(const schema& s); ::shared_ptr get_read_executor(lw_shared_ptr cmd, query::partition_range pr, db::consistency_level cl); @@ -228,13 +230,16 @@ private: db::consistency_level cl); template future> mutate_prepare(const Range& mutations, db::consistency_level cl, db::write_type type, CreateWriteHandler handler); - future> mutate_prepare(std::vector& mutations, db::consistency_level cl, db::write_type type); + template + future> mutate_prepare(const Range& mutations, db::consistency_level cl, db::write_type type); future<> mutate_begin(std::vector ids, db::consistency_level cl); future<> mutate_end(future<> mutate_result, utils::latency_counter); - future<> schedule_repair(std::unordered_map> diffs); + future<> schedule_repair(std::unordered_map>> diffs, db::consistency_level cl); bool need_throttle_writes() const; void unthrottle(); void handle_read_error(std::exception_ptr eptr); + template + future<> mutate_internal(Range mutations, db::consistency_level cl); public: storage_proxy(distributed& db);