From 77ceeee2a8546913c8a5b0846469988fc5f3fb1d Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Tue, 13 Oct 2015 12:11:27 +0300 Subject: [PATCH 1/4] storage_proxy: move array instead of copy it. --- service/storage_proxy.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index f3bcd64e2e..daf181ceb3 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -720,7 +720,7 @@ storage_proxy::create_write_response_handler(const mutation& m, db::consistency_ db::assure_sufficient_live_nodes(cl, ks, live_endpoints); - return create_write_response_handler(ks, cl, type, freeze(m), std::move(live_endpoints), pending_endpoints, dead_endpoints); + return create_write_response_handler(ks, cl, type, freeze(m), std::move(live_endpoints), pending_endpoints, std::move(dead_endpoints)); } void From c6644d57210adb7e0f5158c43ff99bf4021ab0a3 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Wed, 14 Oct 2015 10:44:34 +0300 Subject: [PATCH 2/4] storage_proxy: remove outdated comment --- service/storage_proxy.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index daf181ceb3..6ab691ae0a 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -258,7 +258,6 @@ storage_proxy::response_id_type storage_proxy::create_write_response_handler(key auto m = make_lw_shared(std::move(mutation)); - // for now make is simple if (db::is_datacenter_local(cl)) { pending_count = std::count_if(pending_endpoints.begin(), pending_endpoints.end(), db::is_local); h = std::make_unique(ks, cl, type, std::move(m), std::move(targets), pending_count, std::move(dead_endpoints)); From db49a196daf37d8b252883ae5adaf46accfd8633 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Wed, 9 Sep 2015 11:06:28 +0300 Subject: [PATCH 3/4] storage_proxy: remove code duplication between logged and unlogged batches Currently logged batch has most of the logic on unlogged batch duplicated. This patch rework unlogged batch code in such a way that it can be reused. --- service/storage_proxy.cc | 180 ++++++++++++++++++++------------------- service/storage_proxy.hh | 4 + 2 files changed, 95 insertions(+), 89 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 6ab691ae0a..69e305df92 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -734,6 +734,66 @@ storage_proxy::hint_to_dead_endpoints(response_id_type id, db::consistency_level } } +future> storage_proxy::mutate_prepare(std::vector& mutations, db::consistency_level cl, db::write_type type) { + std::vector ids; + + try { + ids.reserve(mutations.size()); + for (auto& m : mutations) { + ids.emplace_back(create_write_response_handler(m, cl, type)); + } + return make_ready_future>(std::move(ids)); + } catch(...) { + boost::for_each(ids, std::bind(&storage_proxy::remove_response_handler, this, std::placeholders::_1)); + return make_exception_future>(std::current_exception()); + } +} + +future<> storage_proxy::mutate_begin(std::vector ids, db::consistency_level cl, const sstring& local_dc) { + return parallel_for_each(ids, [this, cl, &local_dc] (storage_proxy::response_id_type response_id) { + // it is better to send first and hint afterwards to reduce latency + // but request may complete before hint_to_dead_endpoints() is called and + // response_id handler will be removed, so we will have to do hint with separate + // frozen_mutation copy, or manage handler live time differently. + hint_to_dead_endpoints(response_id, cl); + + // call before send_to_live_endpoints() for the same reason as above + auto f = response_wait(response_id); + send_to_live_endpoints(response_id, local_dc); + return f.handle_exception([this, response_id] (std::exception_ptr exp) { + remove_response_handler(response_id); // cancel expire_timer, so no hint will happen + return make_exception_future<>(exp); + }); + }); +} + +// this function should be called with a future that holds result of mutation attempt (usually +// future returned by mutate_begin()). The future should be ready when function is called. +future<> storage_proxy::mutate_end(future<> mutate_result, utils::latency_counter lc) { + assert(mutate_result.available()); + _stats.write.mark(lc.stop().latency_in_nano()); + try { + mutate_result.get(); + return make_ready_future<>(); + } catch (no_such_keyspace& ex) { + logger.trace("Write to non existing keyspace: {}", ex.what()); + return make_exception_future<>(std::current_exception()); + } catch(mutation_write_timeout_exception& ex) { + // timeout + logger.trace("Write timeout; received {} of {} required replies", ex.received, ex.block_for); + _stats.write_timeouts++; + return make_exception_future<>(std::current_exception()); + } catch (exceptions::unavailable_exception& ex) { + _stats.write_unavailables++; + logger.trace("Unavailable"); + return make_exception_future<>(std::current_exception()); + } catch(overloaded_exception& ex) { + _stats.write_unavailables++; + logger.trace("Overloaded"); + return make_exception_future<>(std::current_exception()); + } +} + /** * Use this method to have these Mutations applied * across all replicas. This method will take care @@ -745,51 +805,17 @@ storage_proxy::hint_to_dead_endpoints(response_id_type id, db::consistency_level */ future<> storage_proxy::mutate(std::vector mutations, db::consistency_level cl) { - auto local_addr = utils::fb_utilities::get_broadcast_address(); - auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); - sstring local_dc = snitch_ptr->get_datacenter(local_addr); auto type = mutations.size() == 1 ? db::write_type::SIMPLE : db::write_type::UNLOGGED_BATCH; utils::latency_counter lc; lc.start(); - return parallel_for_each(mutations, [this, cl, type, &local_dc] (mutation& m) { - storage_proxy::response_id_type response_id = create_write_response_handler(m, cl, type); - // it is better to send first and hint afterwards to reduce latency - // but request may complete before hint_to_dead_endpoints() is called and - // response_id handler will be removed, so we will have to do hint with separate - // frozen_mutation copy, or manage handler live time differently. - hint_to_dead_endpoints(response_id, cl); - - // call before send_to_live_endpoints() for the same reason as above - auto f = response_wait(response_id); - send_to_live_endpoints(response_id, local_dc); - return f.handle_exception([this, response_id, cl] (std::exception_ptr exp) { - remove_response_handler(response_id); // cancel expire_timer, so no hint will happen - //std::rethrow_exception(ex); - return make_exception_future<>(exp); - }); - }).then_wrapped([this, p = shared_from_this(), lc] (future<>&& f) mutable { - _stats.write.mark(lc.stop().latency_in_nano()); - try { - f.get(); - return make_ready_future<>(); - } catch (no_such_keyspace& ex) { - logger.trace("Write to non existing keyspace: {}", ex.what()); - return make_exception_future<>(std::current_exception()); - } catch(mutation_write_timeout_exception& ex) { - // timeout - logger.trace("Write timeout; received {} of {} required replies", ex.received, ex.block_for); - _stats.write_timeouts++; - return make_exception_future<>(std::current_exception()); - } catch (exceptions::unavailable_exception& ex) { - _stats.write_unavailables++; - logger.trace("Unavailable"); - return make_exception_future<>(std::current_exception()); - } catch(overloaded_exception& ex) { - _stats.write_unavailables++; - logger.trace("Overloaded"); - return make_exception_future<>(std::current_exception()); - } + return mutate_prepare(mutations, cl, type).then([this, cl] (std::vector ids) { + auto local_addr = utils::fb_utilities::get_broadcast_address(); + auto& snitch_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); + sstring local_dc = snitch_ptr->get_datacenter(local_addr); + return mutate_begin(std::move(ids), cl, local_dc); + }).then_wrapped([p = shared_from_this(), lc] (future<> f) { + return p->mutate_end(std::move(f), lc); }); } @@ -828,7 +854,7 @@ storage_proxy::mutate_atomically(std::vector mutations, db::consistenc lc.start(); class context { - storage_proxy & _p; + storage_proxy& _p; std::vector _mutations; db::consistency_level _cl; @@ -894,58 +920,34 @@ storage_proxy::mutate_atomically(std::vector mutations, db::consistenc }; future<> run() { - std::vector ids; - - ids.reserve(_mutations.size()); - try { - for (auto& m : _mutations) { - ids.emplace_back(_p.create_write_response_handler(m, _cl, db::write_type::BATCH)); - } - } catch(...) { - boost::for_each(ids, std::bind(&storage_proxy::remove_response_handler, &_p, std::placeholders::_1)); - throw; - } - - return sync_write_to_batchlog().then([this, ids = std::move(ids)] { - return parallel_for_each(ids.begin(), ids.end(), [this](auto response_id) { - // it is better to send first and hint afterwards to reduce latency - // but request may complete before hint_to_dead_endpoints() is called and - // response_id handler will be removed, so we will have to do hint with separate - // frozen_mutation copy, or manage handler live time differently. - _p.hint_to_dead_endpoints(response_id, _cl); - // call before send_to_live_endpoints() for the same reason as above - auto f = _p.response_wait(response_id); - _p.send_to_live_endpoints(response_id, _local_dc); - return f.handle_exception([this, response_id](std::exception_ptr p) { - _p.remove_response_handler(response_id); // cancel expire_timer, so no hint will happen - try { - std::rethrow_exception(p); - } catch (mutation_write_timeout_exception& ex) { - logger.trace("Write timeout; received {} of {} required replies", ex.received, ex.block_for); - throw; - } - }); + return _p.mutate_prepare(_mutations, _cl, db::write_type::BATCH).then([this] (std::vector ids) { + return sync_write_to_batchlog().then_wrapped([this, ids = std::move(ids)] (future<> f) { + try { + f.get(); + return _p.mutate_begin(std::move(ids), _cl, _local_dc); + } catch(...) { + // writing batchlog failed, remove responce handlers that will not be used now + boost::for_each(ids, std::bind(&storage_proxy::remove_response_handler, &_p, std::placeholders::_1)); + throw; + } }); }).finally(std::bind(&context::async_remove_from_batchlog, this)); } }; - try { - auto ctxt = make_lw_shared(*this, std::move(mutations), cl); - return ctxt->run().finally([p = shared_from_this(), lc, ctxt]() mutable { - p->_stats.write.mark(lc.stop().latency_in_nano()); - }); - } catch (no_such_keyspace& ex) { - return make_exception_future<>(std::current_exception()); - } catch (exceptions::unavailable_exception& ex) { - _stats.write_unavailables++; - logger.trace("Unavailable"); - return make_exception_future<>(std::current_exception()); - } catch (overloaded_exception& ex) { - _stats.write_unavailables++; - logger.trace("Overloaded"); - return make_exception_future<>(std::current_exception()); - } + auto mk_ctxt = [this] (std::vector mutations, db::consistency_level cl) { + try { + return make_ready_future>(make_lw_shared(*this, std::move(mutations), cl)); + } catch(...) { + return make_exception_future>(std::current_exception()); + } + }; + + return mk_ctxt(std::move(mutations), cl).then([this] (lw_shared_ptr ctxt) { + return ctxt->run().finally([ctxt]{}); + }).then_wrapped([p = shared_from_this(), lc] (future<> f) mutable { + return p->mutate_end(std::move(f), lc); + }); } bool storage_proxy::cannot_hint(gms::inet_address target) { diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 7e4239dacb..b8396e8365 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -124,6 +124,10 @@ private: lw_shared_ptr cmd, std::vector&& partition_ranges, db::consistency_level cl); + future> mutate_prepare(std::vector& mutations, db::consistency_level cl, db::write_type type); + future<> mutate_begin(const std::vector ids, db::consistency_level cl, const sstring& local_dc); + future<> mutate_end(future<> mutate_result, utils::latency_counter); + public: storage_proxy(distributed& db); ~storage_proxy(); From 19770268bec88fe54a04501da6481b12afe9969d Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Tue, 13 Oct 2015 16:59:14 +0300 Subject: [PATCH 4/4] storage_proxy: use common write code to write batch log mutations. Reworks write code further so it can be used to write batch log mutations. --- service/storage_proxy.cc | 44 ++++++++++++++++++++-------------------- service/storage_proxy.hh | 5 ++++- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 69e305df92..25b1a29612 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -250,7 +250,8 @@ abstract_write_response_handler& storage_proxy::get_write_response_handler(stora return *_response_handlers.find(id)->second.handler; } -storage_proxy::response_id_type storage_proxy::create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, frozen_mutation&& mutation, std::unordered_set targets, std::vector& pending_endpoints, std::vector dead_endpoints) +storage_proxy::response_id_type storage_proxy::create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, frozen_mutation&& mutation, + std::unordered_set targets, const std::vector& pending_endpoints, std::vector dead_endpoints) { std::unique_ptr h; auto& rs = ks.get_replication_strategy(); @@ -734,13 +735,14 @@ storage_proxy::hint_to_dead_endpoints(response_id_type id, db::consistency_level } } -future> storage_proxy::mutate_prepare(std::vector& mutations, db::consistency_level cl, db::write_type type) { +template +future> storage_proxy::mutate_prepare(const Range& mutations, db::consistency_level cl, db::write_type type, CreateWriteHandler create_handler) { std::vector ids; try { ids.reserve(mutations.size()); for (auto& m : mutations) { - ids.emplace_back(create_write_response_handler(m, cl, type)); + ids.emplace_back(create_handler(m, cl, type)); } return make_ready_future>(std::move(ids)); } catch(...) { @@ -749,6 +751,12 @@ future> storage_proxy::mutate_prepa } } +future> storage_proxy::mutate_prepare(std::vector& mutations, db::consistency_level cl, db::write_type type) { + return mutate_prepare<>(mutations, cl, type, [this] (const mutation& m, db::consistency_level cl, db::write_type type) { + return create_write_response_handler(m, cl, type); + }); +} + future<> storage_proxy::mutate_begin(std::vector ids, db::consistency_level cl, const sstring& local_dc) { return parallel_for_each(ids, [this, cl, &local_dc] (storage_proxy::response_id_type response_id) { // it is better to send first and hint afterwards to reduce latency @@ -887,18 +895,18 @@ storage_proxy::mutate_atomically(std::vector mutations, db::consistenc return chosen_endpoints; }()) { } + + future<> send_batchlog_mutation(mutation m) { + return _p.mutate_prepare<>(std::array{std::move(m)}, _cl, db::write_type::BATCH_LOG, [this] (const mutation& m, db::consistency_level cl, db::write_type type) { + auto& ks = _p._db.local().find_keyspace(m.schema()->ks_name()); + return _p.create_write_response_handler(ks, cl, type, freeze(m), _batchlog_endpoints, {}, {}); + }).then([this] (std::vector ids) { + return _p.mutate_begin(std::move(ids), _cl, _local_dc); + }); + } future<> sync_write_to_batchlog() { auto m = db::get_batchlog_manager().local().get_batch_log_mutation_for(_mutations, _batch_uuid, net::messaging_service::current_version); - auto h = std::make_unique(_p._db.local().find_keyspace(db::system_keyspace::NAME), db::consistency_level::ONE, db::write_type::BATCH_LOG, make_lw_shared(freeze(m)), _batchlog_endpoints); - response_id_type response_id = _p.register_response_handler(std::move(h)); - - auto f = _p.response_wait(response_id); - _p.send_to_live_endpoints(response_id, _local_dc); - return f.handle_exception([this, response_id](auto ex) { - _p.remove_response_handler(response_id); // cancel expire_timer, so no hint will happen - //return make_exception_future(ex); - std::rethrow_exception(ex); - }); + return send_batchlog_mutation(std::move(m)); }; void async_remove_from_batchlog() { // delete batch @@ -908,15 +916,7 @@ storage_proxy::mutate_atomically(std::vector mutations, db::consistenc mutation m(key, schema); m.partition().apply_delete(*schema, {}, tombstone(now, gc_clock::now())); - auto h = std::make_unique(_p._db.local().find_keyspace(db::system_keyspace::NAME), db::consistency_level::ONE, db::write_type::BATCH_LOG, make_lw_shared(freeze(m)), _batchlog_endpoints); - auto response_id = _p.register_response_handler(std::move(h)); - - auto f = _p.response_wait(response_id); - _p.send_to_live_endpoints(response_id, _local_dc); - f.handle_exception([&p = _p, response_id](std::exception_ptr ex) { - p.remove_response_handler(response_id); // cancel expire_timer, so no hint will happen - std::rethrow_exception(ex); - }); + send_batchlog_mutation(std::move(m)); }; future<> run() { diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index b8396e8365..09d3d9de10 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -97,7 +97,8 @@ private: void got_response(response_id_type id, gms::inet_address from); future<> response_wait(response_id_type id); abstract_write_response_handler& get_write_response_handler(storage_proxy::response_id_type id); - response_id_type create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, frozen_mutation&& mutation, std::unordered_set targets, std::vector& pending_endpoints, std::vector); + response_id_type create_write_response_handler(keyspace& ks, db::consistency_level cl, db::write_type type, frozen_mutation&& mutation, std::unordered_set targets, + const std::vector& pending_endpoints, std::vector); response_id_type create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type); future<> send_to_live_endpoints(response_id_type response_id, sstring local_data_center); template @@ -124,6 +125,8 @@ private: lw_shared_ptr cmd, std::vector&& partition_ranges, db::consistency_level cl); + template + future> mutate_prepare(const Range& mutations, db::consistency_level cl, db::write_type type, CreateWriteHandler handler); future> mutate_prepare(std::vector& mutations, db::consistency_level cl, db::write_type type); future<> mutate_begin(const std::vector ids, db::consistency_level cl, const sstring& local_dc); future<> mutate_end(future<> mutate_result, utils::latency_counter);