commitlog_replayer: Make replay parallel per shard
Fixes #2098
Replay previously processed all segments in parallel on shard 0, which
caused heavy memory load. To reduce this and spread the memory footprint
across shards, instead process X segments per shard, sequentially within each shard.
v2:
* Fixed whitespace errors
Message-Id: <1489503382-830-1-git-send-email-calle@scylladb.com>
(cherry picked from commit 078589c508)
This commit is contained in:
@@ -61,13 +61,19 @@
|
||||
|
||||
static logging::logger logger("commitlog_replayer");
|
||||
|
||||
struct column_mappings {
|
||||
std::unordered_map<table_schema_version, column_mapping> map;
|
||||
future<> stop() { return make_ready_future<>(); }
|
||||
};
|
||||
|
||||
class db::commitlog_replayer::impl {
|
||||
seastar::sharded<column_mappings> _column_mappings;
|
||||
struct column_mappings {
|
||||
std::unordered_map<table_schema_version, column_mapping> map;
|
||||
future<> stop() { return make_ready_future<>(); }
|
||||
};
|
||||
|
||||
// we want the processing methods to be const, since they use
|
||||
// shard-sharing of data -> read only
|
||||
// this one is special since it is thread local.
|
||||
// Should actually make sharded::local a const function (it does
|
||||
// not modify content), but...
|
||||
mutable seastar::sharded<column_mappings> _column_mappings;
|
||||
|
||||
friend class db::commitlog_replayer;
|
||||
public:
|
||||
impl(seastar::sharded<cql3::query_processor>& db);
|
||||
@@ -94,8 +100,17 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
future<> process(stats*, temporary_buffer<char> buf, replay_position rp);
|
||||
future<stats> recover(sstring file);
|
||||
// move start/stop of the thread-local bookkeeping to "top level"
|
||||
// and also make sure to assert on it actually being started.
|
||||
future<> start() {
|
||||
return _column_mappings.start();
|
||||
}
|
||||
future<> stop() {
|
||||
return _column_mappings.stop();
|
||||
}
|
||||
|
||||
future<> process(stats*, temporary_buffer<char> buf, replay_position rp) const;
|
||||
future<stats> recover(sstring file) const;
|
||||
|
||||
typedef std::unordered_map<utils::UUID, replay_position> rp_map;
|
||||
typedef std::unordered_map<unsigned, rp_map> shard_rpm_map;
|
||||
@@ -188,9 +203,11 @@ future<> db::commitlog_replayer::impl::init() {
|
||||
}
|
||||
|
||||
future<db::commitlog_replayer::impl::stats>
|
||||
db::commitlog_replayer::impl::recover(sstring file) {
|
||||
db::commitlog_replayer::impl::recover(sstring file) const {
|
||||
assert(_column_mappings.local_is_initialized());
|
||||
|
||||
replay_position rp{commitlog::descriptor(file)};
|
||||
auto gp = _min_pos[rp.shard_id()];
|
||||
auto gp = _min_pos.at(rp.shard_id());
|
||||
|
||||
if (rp.id < gp.id) {
|
||||
logger.debug("skipping replay of fully-flushed {}", file);
|
||||
@@ -220,7 +237,7 @@ db::commitlog_replayer::impl::recover(sstring file) {
|
||||
});
|
||||
}
|
||||
|
||||
future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer<char> buf, replay_position rp) {
|
||||
future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer<char> buf, replay_position rp) const {
|
||||
try {
|
||||
|
||||
commitlog_entry_reader cer(buf);
|
||||
@@ -238,14 +255,14 @@ future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer<char>
|
||||
const column_mapping& src_cm = cm_it->second;
|
||||
|
||||
auto shard_id = rp.shard_id();
|
||||
if (rp < _min_pos[shard_id]) {
|
||||
if (rp < _min_pos.at(shard_id)) {
|
||||
logger.trace("entry {} is less than global min position. skipping", rp);
|
||||
s->skipped_mutations++;
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
auto uuid = fm.column_family_id();
|
||||
auto& map = _rpm[shard_id];
|
||||
auto& map = _rpm.at(shard_id);
|
||||
auto i = map.find(uuid);
|
||||
if (i != map.end() && rp <= i->second) {
|
||||
logger.trace("entry {} at {} is younger than recorded replay position {}. skipping", fm.column_family_id(), rp, i->second);
|
||||
@@ -323,42 +340,54 @@ future<db::commitlog_replayer> db::commitlog_replayer::create_replayer(seastar::
|
||||
}
|
||||
|
||||
future<> db::commitlog_replayer::recover(std::vector<sstring> files) {
|
||||
return _impl->_column_mappings.start().then([this, files = std::move(files)] {
|
||||
typedef std::unordered_multimap<unsigned, sstring> shard_file_map;
|
||||
|
||||
logger.info("Replaying {}", join(", ", files));
|
||||
return map_reduce(files, [this](auto f) {
|
||||
logger.debug("Replaying {}", f);
|
||||
return _impl->recover(f).then([f](impl::stats stats) {
|
||||
if (stats.corrupt_bytes != 0) {
|
||||
logger.warn("Corrupted file: {}. {} bytes skipped.", f, stats.corrupt_bytes);
|
||||
}
|
||||
logger.debug("Log replay of {} complete, {} replayed mutations ({} invalid, {} skipped)"
|
||||
, f
|
||||
, stats.applied_mutations
|
||||
, stats.invalid_mutations
|
||||
, stats.skipped_mutations
|
||||
|
||||
// pre-compute work per shard already.
|
||||
auto map = ::make_lw_shared<shard_file_map>();
|
||||
for (auto& f : files) {
|
||||
commitlog::descriptor d(f);
|
||||
replay_position p = d;
|
||||
map->emplace(p.shard_id() % smp::count, std::move(f));
|
||||
}
|
||||
|
||||
return _impl->start().then([this, map] {
|
||||
return map_reduce(smp::all_cpus(), [this, map](unsigned id) {
|
||||
return smp::submit_to(id, [this, id, map]() {
|
||||
auto total = ::make_lw_shared<impl::stats>();
|
||||
// TODO: or something. For now, we do this serialized per shard,
|
||||
// to reduce mutation congestion. We could probably (says avi)
|
||||
// do 2 segments in parallel or something, but let's use this first.
|
||||
return do_for_each(map->begin(id), map->end(id), [this, total](const std::pair<unsigned, sstring>& p) {
|
||||
auto&f = p.second;
|
||||
logger.debug("Replaying {}", f);
|
||||
return _impl->recover(f).then([f, total](impl::stats stats) {
|
||||
if (stats.corrupt_bytes != 0) {
|
||||
logger.warn("Corrupted file: {}. {} bytes skipped.", f, stats.corrupt_bytes);
|
||||
}
|
||||
logger.debug("Log replay of {} complete, {} replayed mutations ({} invalid, {} skipped)"
|
||||
, f
|
||||
, stats.applied_mutations
|
||||
, stats.invalid_mutations
|
||||
, stats.skipped_mutations
|
||||
);
|
||||
*total += stats;
|
||||
});
|
||||
}).then([total] {
|
||||
return make_ready_future<impl::stats>(*total);
|
||||
});
|
||||
});
|
||||
}, impl::stats(), std::plus<impl::stats>()).then([](impl::stats totals) {
|
||||
logger.info("Log replay complete, {} replayed mutations ({} invalid, {} skipped)"
|
||||
, totals.applied_mutations
|
||||
, totals.invalid_mutations
|
||||
, totals.skipped_mutations
|
||||
);
|
||||
return make_ready_future<impl::stats>(stats);
|
||||
}).handle_exception([f](auto ep) -> future<impl::stats> {
|
||||
logger.error("Error recovering {}: {}", f, ep);
|
||||
try {
|
||||
std::rethrow_exception(ep);
|
||||
} catch (std::invalid_argument&) {
|
||||
logger.error("Scylla cannot process {}. Make sure to fully flush all Cassandra commit log files to sstable before migrating.", f);
|
||||
throw;
|
||||
} catch (...) {
|
||||
throw;
|
||||
}
|
||||
});
|
||||
}, impl::stats(), std::plus<impl::stats>()).then([](impl::stats totals) {
|
||||
logger.info("Log replay complete, {} replayed mutations ({} invalid, {} skipped)"
|
||||
, totals.applied_mutations
|
||||
, totals.invalid_mutations
|
||||
, totals.skipped_mutations
|
||||
);
|
||||
}).finally([this] {
|
||||
return _impl->_column_mappings.stop();
|
||||
return _impl->stop();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> db::commitlog_replayer::recover(sstring f) {
|
||||
|
||||
Reference in New Issue
Block a user