From 874da0eb67130516fd649b1ebf2db292b740c721 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 7 Oct 2015 13:04:16 +0200 Subject: [PATCH 1/5] batchlog_manager: Run timer loop on only one shard Since replay is a "node global" operation, we should not attempt to do it in parallel on each shard. It will just overlap/interfere. Could just run this on cpu 0 or so, but since this _could_ be a lengthy operation, each timer callback is round-robined across shards just in case... --- db/batchlog_manager.cc | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/db/batchlog_manager.cc b/db/batchlog_manager.cc index 0504ce8787..cb577b4c2c 100644 --- a/db/batchlog_manager.cc +++ b/db/batchlog_manager.cc @@ -68,14 +68,28 @@ db::batchlog_manager::batchlog_manager(cql3::query_processor& qp) {} future<> db::batchlog_manager::start() { - _timer.set_callback( - std::bind(&batchlog_manager::replay_all_failed_batches, this)); - _timer.arm( - lowres_clock::now() - + std::chrono::milliseconds( - service::storage_service::RING_DELAY), - std::experimental::optional { - std::chrono::milliseconds(replay_interval) }); + // Since replay is a "node global" operation, we should not attempt to + // do it in parallel on each shard. It will just overlap/interfere. + // Could just run this on cpu 0 or so, but since this _could_ be a + // lengty operation, we'll round-robin it between shards just in case... 
+ if (smp::main_thread()) { + auto cpu = engine().cpu_id(); + _timer.set_callback( + [this, cpu]() mutable { + auto dest = (cpu++ % smp::count); + return smp::submit_to(dest, [] { + return get_local_batchlog_manager().replay_all_failed_batches(); + }).then([this] { + _timer.arm(lowres_clock::now() + + std::chrono::milliseconds(replay_interval) + ); + }); + }); + _timer.arm( + lowres_clock::now() + + std::chrono::milliseconds( + service::storage_service::RING_DELAY)); + } return make_ready_future<>(); } From 6f94a3bdad7773a9bdff048f58c5f681d53d5745 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 7 Oct 2015 13:10:34 +0200 Subject: [PATCH 2/5] batchlog_manager: Use gate instead of semaphore Since that exists now. --- db/batchlog_manager.cc | 6 ++---- db/batchlog_manager.hh | 10 ++++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/db/batchlog_manager.cc b/db/batchlog_manager.cc index cb577b4c2c..7ac678c20c 100644 --- a/db/batchlog_manager.cc +++ b/db/batchlog_manager.cc @@ -96,7 +96,7 @@ future<> db::batchlog_manager::start() { future<> db::batchlog_manager::stop() { _stop = true; _timer.cancel(); - return _sem.wait(std::chrono::milliseconds(60)); + return _gate.close(); } future db::batchlog_manager::count_all_batches() const { @@ -228,7 +228,7 @@ future<> db::batchlog_manager::replay_all_failed_batches() { }); }; - return _sem.wait().then([this, batch = std::move(batch)] { + return seastar::with_gate(_gate, [this, batch = std::move(batch)] { logger.debug("Started replayAllFailedBatches"); typedef ::shared_ptr page_ptr; @@ -271,8 +271,6 @@ future<> db::batchlog_manager::replay_all_failed_batches() { }).then([this] { logger.debug("Finished replayAllFailedBatches"); }); - }).finally([this] { - _sem.signal(); }); } diff --git a/db/batchlog_manager.hh b/db/batchlog_manager.hh index d0a70cc3e0..d58537ee37 100644 --- a/db/batchlog_manager.hh +++ b/db/batchlog_manager.hh @@ -42,9 +42,11 @@ #pragma once #include -#include "core/future.hh" -#include 
"core/distributed.hh" -#include "core/timer.hh" +#include +#include +#include +#include + #include "cql3/query_processor.hh" #include "gms/inet_address.hh" #include "db_clock.hh" @@ -61,7 +63,7 @@ private: size_t _total_batches_replayed = 0; cql3::query_processor& _qp; timer _timer; - semaphore _sem; + seastar::gate _gate; bool _stop = false; std::random_device _rd; From b46496da3441a7cf240a52ab85fad7b5df609144 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 7 Oct 2015 14:25:51 +0200 Subject: [PATCH 3/5] batchlog_manager: Rename logger * More useful/referrable on command line (--log*) * Matches class name (though not origin) --- db/batchlog_manager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/batchlog_manager.cc b/db/batchlog_manager.cc index 7ac678c20c..c63c9c9272 100644 --- a/db/batchlog_manager.cc +++ b/db/batchlog_manager.cc @@ -57,7 +57,7 @@ #include "db/config.hh" #include "gms/failure_detector.hh" -static logging::logger logger("BatchLog Manager"); +static logging::logger logger("batchlog_manager"); const uint32_t db::batchlog_manager::replay_interval; const uint32_t db::batchlog_manager::page_size; From 6416c62d3923e3d59b6a36210729c38e11c34443 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 7 Oct 2015 14:27:01 +0200 Subject: [PATCH 4/5] main: Actually start the batchlog_manager service loop Was not invoked previously. 
--- main.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/main.cc b/main.cc index 5c5445e746..1a25a9584e 100644 --- a/main.cc +++ b/main.cc @@ -300,6 +300,10 @@ int main(int ac, char** av) { }).then([] { auto& ss = service::get_local_storage_service(); return ss.init_server(); + }).then([] { + return db::get_batchlog_manager().invoke_on_all([] (db::batchlog_manager& b) { + return b.start(); + }); }).then([rpc_address] { return dns::gethostbyname(rpc_address); }).then([&db, &proxy, &qp, rpc_address, cql_port, thrift_port, start_thrift] (dns::hostent e) { From a4c14d3d1dea00f6f099462b64813be5eec91448 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 7 Oct 2015 14:57:55 +0200 Subject: [PATCH 5/5] batchlog_manager: Add hint of which cpu timer callback is running on --- db/batchlog_manager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/batchlog_manager.cc b/db/batchlog_manager.cc index c63c9c9272..2da3079b40 100644 --- a/db/batchlog_manager.cc +++ b/db/batchlog_manager.cc @@ -229,7 +229,7 @@ future<> db::batchlog_manager::replay_all_failed_batches() { }; return seastar::with_gate(_gate, [this, batch = std::move(batch)] { - logger.debug("Started replayAllFailedBatches"); + logger.debug("Started replayAllFailedBatches (cpu {})", engine().cpu_id()); typedef ::shared_ptr page_ptr; sstring query = sprint("SELECT id, data, written_at, version FROM %s.%s LIMIT %d", system_keyspace::NAME, system_keyspace::BATCHLOG, page_size);