diff --git a/db/batchlog_manager.cc b/db/batchlog_manager.cc index 0504ce8787..2da3079b40 100644 --- a/db/batchlog_manager.cc +++ b/db/batchlog_manager.cc @@ -57,7 +57,7 @@ #include "db/config.hh" #include "gms/failure_detector.hh" -static logging::logger logger("BatchLog Manager"); +static logging::logger logger("batchlog_manager"); const uint32_t db::batchlog_manager::replay_interval; const uint32_t db::batchlog_manager::page_size; @@ -68,21 +68,35 @@ db::batchlog_manager::batchlog_manager(cql3::query_processor& qp) {} future<> db::batchlog_manager::start() { - _timer.set_callback( - std::bind(&batchlog_manager::replay_all_failed_batches, this)); - _timer.arm( - lowres_clock::now() - + std::chrono::milliseconds( - service::storage_service::RING_DELAY), - std::experimental::optional { - std::chrono::milliseconds(replay_interval) }); + // Since replay is a "node global" operation, we should not attempt to + // do it in parallel on each shard. It will just overlap/interfere. + // Could just run this on cpu 0 or so, but since this _could_ be a + // lengty operation, we'll round-robin it between shards just in case... + if (smp::main_thread()) { + auto cpu = engine().cpu_id(); + _timer.set_callback( + [this, cpu]() mutable { + auto dest = (cpu++ % smp::count); + return smp::submit_to(dest, [] { + return get_local_batchlog_manager().replay_all_failed_batches(); + }).then([this] { + _timer.arm(lowres_clock::now() + + std::chrono::milliseconds(replay_interval) + ); + }); + }); + _timer.arm( + lowres_clock::now() + + std::chrono::milliseconds( + service::storage_service::RING_DELAY)); + } return make_ready_future<>(); } future<> db::batchlog_manager::stop() { _stop = true; _timer.cancel(); - return _sem.wait(std::chrono::milliseconds(60)); + return _gate.close(); } future db::batchlog_manager::count_all_batches() const { @@ -214,8 +228,8 @@ future<> db::batchlog_manager::replay_all_failed_batches() { }); }; - return _sem.wait().then([this, batch = std::move(batch)] { - logger.debug("Started replayAllFailedBatches"); + return seastar::with_gate(_gate, [this, batch = std::move(batch)] { + logger.debug("Started replayAllFailedBatches (cpu {})", engine().cpu_id()); typedef ::shared_ptr page_ptr; sstring query = sprint("SELECT id, data, written_at, version FROM %s.%s LIMIT %d", system_keyspace::NAME, system_keyspace::BATCHLOG, page_size); @@ -257,8 +271,6 @@ future<> db::batchlog_manager::replay_all_failed_batches() { }).then([this] { logger.debug("Finished replayAllFailedBatches"); }); - }).finally([this] { - _sem.signal(); }); } diff --git a/db/batchlog_manager.hh b/db/batchlog_manager.hh index d0a70cc3e0..d58537ee37 100644 --- a/db/batchlog_manager.hh +++ b/db/batchlog_manager.hh @@ -42,9 +42,11 @@ #pragma once #include -#include "core/future.hh" -#include "core/distributed.hh" -#include "core/timer.hh" +#include +#include +#include +#include + #include "cql3/query_processor.hh" #include "gms/inet_address.hh" #include "db_clock.hh" @@ -61,7 +63,7 @@ private: size_t _total_batches_replayed = 0; cql3::query_processor& _qp; timer _timer; - semaphore _sem; + seastar::gate _gate; bool _stop = false; std::random_device _rd; diff --git a/main.cc b/main.cc index 5c5445e746..1a25a9584e 100644 --- a/main.cc +++ b/main.cc @@ -300,6 +300,10 @@ int main(int ac, char** av) { }).then([] { auto& ss = service::get_local_storage_service(); return ss.init_server(); + }).then([] { + return db::get_batchlog_manager().invoke_on_all([] (db::batchlog_manager& b) { + return b.start(); + }); }).then([rpc_address] { return dns::gethostbyname(rpc_address); }).then([&db, &proxy, &qp, rpc_address, cql_port, thrift_port, start_thrift] (dns::hostent e) {