diff --git a/db/commitlog/commitlog_replayer.cc b/db/commitlog/commitlog_replayer.cc
index 614ccf63e3..cf47e55c9a 100644
--- a/db/commitlog/commitlog_replayer.cc
+++ b/db/commitlog/commitlog_replayer.cc
@@ -61,13 +61,19 @@ static logging::logger logger("commitlog_replayer");
-struct column_mappings {
-    std::unordered_map<table_schema_version, column_mapping> map;
-    future<> stop() { return make_ready_future<>(); }
-};
-
 class db::commitlog_replayer::impl {
-    seastar::sharded<column_mappings> _column_mappings;
+    struct column_mappings {
+        std::unordered_map<table_schema_version, column_mapping> map;
+        future<> stop() { return make_ready_future<>(); }
+    };
+
+    // we want the processing methods to be const, since they use
+    // shard-sharing of data -> read only
+    // this one is special since it is thread local.
+    // Should actually make sharded::local a const function (it does
+    // not modify content), but...
+    mutable seastar::sharded<column_mappings> _column_mappings;
+
     friend class db::commitlog_replayer;
 public:
     impl(seastar::sharded<database>& db);
 
     future<> init();
 
@@ -94,8 +100,17 @@ public:
         }
     };
 
-    future<> process(stats*, temporary_buffer<char> buf, replay_position rp);
-    future<stats> recover(sstring file);
+    // move start/stop of the thread local bookkeep to "top level"
+    // and also make sure to assert on it actually being started.
+    future<> start() {
+        return _column_mappings.start();
+    }
+    future<> stop() {
+        return _column_mappings.stop();
+    }
+
+    future<> process(stats*, temporary_buffer<char> buf, replay_position rp) const;
+    future<stats> recover(sstring file) const;
 
     typedef std::unordered_map<utils::UUID, replay_position> rp_map;
     typedef std::unordered_map<unsigned, rp_map> shard_rpm_map;
@@ -188,9 +203,11 @@ future<> db::commitlog_replayer::impl::init() {
 }
 
 future<db::commitlog_replayer::impl::stats>
-db::commitlog_replayer::impl::recover(sstring file) {
+db::commitlog_replayer::impl::recover(sstring file) const {
+    assert(_column_mappings.local_is_initialized());
+
     replay_position rp{commitlog::descriptor(file)};
-    auto gp = _min_pos[rp.shard_id()];
+    auto gp = _min_pos.at(rp.shard_id());
 
     if (rp.id < gp.id) {
         logger.debug("skipping replay of fully-flushed {}", file);
@@ -220,7 +237,7 @@ db::commitlog_replayer::impl::recover(sstring file) {
     });
 }
 
-future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer<char> buf, replay_position rp) {
+future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer<char> buf, replay_position rp) const {
     try {
         commitlog_entry_reader cer(buf);
         auto& fm = cer.mutation();
@@ -238,14 +255,14 @@ future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer
         const column_mapping& src_cm = cm_it->second;
 
         auto shard_id = rp.shard_id();
-        if (rp < _min_pos[shard_id]) {
+        if (rp < _min_pos.at(shard_id)) {
             logger.trace("entry {} is less than global min position. skipping", rp);
             s->skipped_mutations++;
             return make_ready_future<>();
         }
 
         auto uuid = fm.column_family_id();
-        auto& map = _rpm[shard_id];
+        auto& map = _rpm.at(shard_id);
         auto i = map.find(uuid);
         if (i != map.end() && rp <= i->second) {
             logger.trace("entry {} at {} is younger than recorded replay position {}. skipping", fm.column_family_id(), rp, i->second);
@@ -323,42 +340,54 @@ future<db::commitlog_replayer> db::commitlog_replayer::create_replayer(seastar:
 future<> db::commitlog_replayer::recover(std::vector<sstring> files) {
-    return _impl->_column_mappings.start().then([this, files = std::move(files)] {
+    typedef std::unordered_multimap<unsigned, sstring> shard_file_map;
+
     logger.info("Replaying {}", join(", ", files));
 
-        return map_reduce(files, [this](auto f) {
-            logger.debug("Replaying {}", f);
-            return _impl->recover(f).then([f](impl::stats stats) {
-                if (stats.corrupt_bytes != 0) {
-                    logger.warn("Corrupted file: {}. {} bytes skipped.", f, stats.corrupt_bytes);
-                }
-                logger.debug("Log replay of {} complete, {} replayed mutations ({} invalid, {} skipped)"
-                        , f
-                        , stats.applied_mutations
-                        , stats.invalid_mutations
-                        , stats.skipped_mutations
-                        );
-                return make_ready_future<impl::stats>(stats);
-            }).handle_exception([f](auto ep) -> future<impl::stats> {
-                logger.error("Error recovering {}: {}", f, ep);
-                try {
-                    std::rethrow_exception(ep);
-                } catch (std::invalid_argument&) {
-                    logger.error("Scylla cannot process {}. Make sure to fully flush all Cassandra commit log files to sstable before migrating.", f);
-                    throw;
-                } catch (...) {
-                    throw;
-                }
-            });
-        }, impl::stats(), std::plus<impl::stats>()).then([](impl::stats totals) {
-            logger.info("Log replay complete, {} replayed mutations ({} invalid, {} skipped)"
-                    , totals.applied_mutations
-                    , totals.invalid_mutations
-                    , totals.skipped_mutations
-                    );
+
+    // pre-compute work per shard already.
+    auto map = ::make_lw_shared<shard_file_map>();
+    for (auto& f : files) {
+        commitlog::descriptor d(f);
+        replay_position p = d;
+        map->emplace(p.shard_id() % smp::count, std::move(f));
+    }
+
+    return _impl->start().then([this, map] {
+        return map_reduce(smp::all_cpus(), [this, map](unsigned id) {
+            return smp::submit_to(id, [this, id, map]() {
+                auto total = ::make_lw_shared<impl::stats>();
+                // TODO: or something. For now, we do this serialized per shard,
+                // to reduce mutation congestion. We could probably (says avi)
+                // do 2 segments in parallel or something, but lets use this first.
+                return do_for_each(map->begin(id), map->end(id), [this, total](const std::pair<unsigned, sstring>& p) {
+                    auto& f = p.second;
+                    logger.debug("Replaying {}", f);
+                    return _impl->recover(f).then([f, total](impl::stats stats) {
+                        if (stats.corrupt_bytes != 0) {
+                            logger.warn("Corrupted file: {}. {} bytes skipped.", f, stats.corrupt_bytes);
+                        }
+                        logger.debug("Log replay of {} complete, {} replayed mutations ({} invalid, {} skipped)"
+                                , f
+                                , stats.applied_mutations
+                                , stats.invalid_mutations
+                                , stats.skipped_mutations
+                        );
+                        *total += stats;
+                    });
+                }).then([total] {
+                    return make_ready_future<impl::stats>(*total);
+                });
+            });
+        }, impl::stats(), std::plus<impl::stats>()).then([](impl::stats totals) {
+            logger.info("Log replay complete, {} replayed mutations ({} invalid, {} skipped)"
+                    , totals.applied_mutations
+                    , totals.invalid_mutations
+                    , totals.skipped_mutations
+            );
         }).finally([this] {
-            return _impl->_column_mappings.stop();
+            return _impl->stop();
         });
-    });
 }
 
 future<> db::commitlog_replayer::recover(sstring f) {