Merge "Batchlog manager - run loop on only one shard" from Calle
"* Runs the batchlog loop on only main cpu, but round-robins the actual work to each available shard in round-robin fashion. * Use gate to guard work loop instead of semaphore (better shutdown, eventually) * Actually _start_ the batch loop (not done previously) * Rename logger + add cpu# hint" Fixes #424
This commit is contained in:
@@ -57,7 +57,7 @@
|
||||
#include "db/config.hh"
|
||||
#include "gms/failure_detector.hh"
|
||||
|
||||
static logging::logger logger("BatchLog Manager");
|
||||
static logging::logger logger("batchlog_manager");
|
||||
|
||||
const uint32_t db::batchlog_manager::replay_interval;
|
||||
const uint32_t db::batchlog_manager::page_size;
|
||||
@@ -68,21 +68,35 @@ db::batchlog_manager::batchlog_manager(cql3::query_processor& qp)
|
||||
{}
|
||||
|
||||
future<> db::batchlog_manager::start() {
|
||||
_timer.set_callback(
|
||||
std::bind(&batchlog_manager::replay_all_failed_batches, this));
|
||||
_timer.arm(
|
||||
lowres_clock::now()
|
||||
+ std::chrono::milliseconds(
|
||||
service::storage_service::RING_DELAY),
|
||||
std::experimental::optional<lowres_clock::duration> {
|
||||
std::chrono::milliseconds(replay_interval) });
|
||||
// Since replay is a "node global" operation, we should not attempt to
|
||||
// do it in parallel on each shard. It will just overlap/interfere.
|
||||
// Could just run this on cpu 0 or so, but since this _could_ be a
|
||||
// lengty operation, we'll round-robin it between shards just in case...
|
||||
if (smp::main_thread()) {
|
||||
auto cpu = engine().cpu_id();
|
||||
_timer.set_callback(
|
||||
[this, cpu]() mutable {
|
||||
auto dest = (cpu++ % smp::count);
|
||||
return smp::submit_to(dest, [] {
|
||||
return get_local_batchlog_manager().replay_all_failed_batches();
|
||||
}).then([this] {
|
||||
_timer.arm(lowres_clock::now()
|
||||
+ std::chrono::milliseconds(replay_interval)
|
||||
);
|
||||
});
|
||||
});
|
||||
_timer.arm(
|
||||
lowres_clock::now()
|
||||
+ std::chrono::milliseconds(
|
||||
service::storage_service::RING_DELAY));
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> db::batchlog_manager::stop() {
|
||||
_stop = true;
|
||||
_timer.cancel();
|
||||
return _sem.wait(std::chrono::milliseconds(60));
|
||||
return _gate.close();
|
||||
}
|
||||
|
||||
future<size_t> db::batchlog_manager::count_all_batches() const {
|
||||
@@ -214,8 +228,8 @@ future<> db::batchlog_manager::replay_all_failed_batches() {
|
||||
});
|
||||
};
|
||||
|
||||
return _sem.wait().then([this, batch = std::move(batch)] {
|
||||
logger.debug("Started replayAllFailedBatches");
|
||||
return seastar::with_gate(_gate, [this, batch = std::move(batch)] {
|
||||
logger.debug("Started replayAllFailedBatches (cpu {})", engine().cpu_id());
|
||||
|
||||
typedef ::shared_ptr<cql3::untyped_result_set> page_ptr;
|
||||
sstring query = sprint("SELECT id, data, written_at, version FROM %s.%s LIMIT %d", system_keyspace::NAME, system_keyspace::BATCHLOG, page_size);
|
||||
@@ -257,8 +271,6 @@ future<> db::batchlog_manager::replay_all_failed_batches() {
|
||||
}).then([this] {
|
||||
logger.debug("Finished replayAllFailedBatches");
|
||||
});
|
||||
}).finally([this] {
|
||||
_sem.signal();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -42,9 +42,11 @@
|
||||
#pragma once
|
||||
|
||||
#include <unordered_map>
|
||||
#include "core/future.hh"
|
||||
#include "core/distributed.hh"
|
||||
#include "core/timer.hh"
|
||||
#include <core/future.hh>
|
||||
#include <core/distributed.hh>
|
||||
#include <core/timer.hh>
|
||||
#include <core/gate.hh>
|
||||
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "gms/inet_address.hh"
|
||||
#include "db_clock.hh"
|
||||
@@ -61,7 +63,7 @@ private:
|
||||
size_t _total_batches_replayed = 0;
|
||||
cql3::query_processor& _qp;
|
||||
timer<clock_type> _timer;
|
||||
semaphore _sem;
|
||||
seastar::gate _gate;
|
||||
bool _stop = false;
|
||||
|
||||
std::random_device _rd;
|
||||
|
||||
4
main.cc
4
main.cc
@@ -300,6 +300,10 @@ int main(int ac, char** av) {
|
||||
}).then([] {
|
||||
auto& ss = service::get_local_storage_service();
|
||||
return ss.init_server();
|
||||
}).then([] {
|
||||
return db::get_batchlog_manager().invoke_on_all([] (db::batchlog_manager& b) {
|
||||
return b.start();
|
||||
});
|
||||
}).then([rpc_address] {
|
||||
return dns::gethostbyname(rpc_address);
|
||||
}).then([&db, &proxy, &qp, rpc_address, cql_port, thrift_port, start_thrift] (dns::hostent e) {
|
||||
|
||||
Reference in New Issue
Block a user