From 3cc03f88fdfd9a90501ab2fb596ec986c7af7ce4 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 22 Mar 2017 12:18:21 +0000 Subject: [PATCH] commitlog_replayer: Do proper const-lookup of min positions for shards Fixes #2173 Per-shard min positions can be unset if we never collected any sstable/truncation info for it, yet replay segments of that id. Wrap the lookups to handle "missing data -> default", which should have been there in the first place. Message-Id: <1490185101-12482-1-git-send-email-calle@scylladb.com> (cherry picked from commit c3a510a08d703d16f3c455696efbf0eb419e5007) --- db/commitlog/commitlog_replayer.cc | 43 +++++++++++++----------------- 1 file changed, 18 insertions(+), 25 deletions(-) diff --git a/db/commitlog/commitlog_replayer.cc b/db/commitlog/commitlog_replayer.cc index cf47e55c9a..545f325880 100644 --- a/db/commitlog/commitlog_replayer.cc +++ b/db/commitlog/commitlog_replayer.cc @@ -116,6 +116,19 @@ public: typedef std::unordered_map shard_rpm_map; typedef std::unordered_map shard_rp_map; + replay_position min_pos(unsigned shard) const { + auto i = _min_pos.find(shard); + return i != _min_pos.end() ? i->second : replay_position(); + } + replay_position cf_min_pos(const utils::UUID& uuid, unsigned shard) const { + auto i = _rpm.find(shard); + if (i == _rpm.end()) { + return replay_position(); + } + auto j = i->second.find(uuid); + return j != i->second.end() ? j->second : replay_position(); + } + seastar::sharded& _qp; shard_rpm_map @@ -172,25 +185,6 @@ future<> db::commitlog_replayer::impl::init() { }); }); }).finally([this] { - // bugfix: the above map-reduce will not_ detect if sstables - // are _missing_ from a CF. And because of re-sharding, we can't - // just insert initial zeros into the maps, because we don't know - // how many shards there we're last time. - // However, this only affects global min pos, since - // for each CF, the worst that happens is that we have a missing - // entry -> empty replay_pos == min value. 
But calculating - // global min pos will be off, since we will only base it on - // existing sstables-per-shard. - // So, go through all CF:s and check, if a shard mapping does not - // have data for it, assume we must set global pos to zero. - for (auto&p : _qp.local().db().local().get_column_families()) { - for (auto&p1 : _rpm) { // for each shard - if (!p1.second.count(p.first)) { - _min_pos[p1.first] = replay_position(); - } - } - } - for (auto&p : _min_pos) { logger.debug("minimum position for shard {}: {}", p.first, p.second); } @@ -207,7 +201,7 @@ db::commitlog_replayer::impl::recover(sstring file) const { assert(_column_mappings.local_is_initialized()); replay_position rp{commitlog::descriptor(file)}; - auto gp = _min_pos.at(rp.shard_id()); + auto gp = min_pos(rp.shard_id()); if (rp.id < gp.id) { logger.debug("skipping replay of fully-flushed {}", file); @@ -255,17 +249,16 @@ future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer const column_mapping& src_cm = cm_it->second; auto shard_id = rp.shard_id(); - if (rp < _min_pos.at(shard_id)) { + if (rp < min_pos(shard_id)) { logger.trace("entry {} is less than global min position. skipping", rp); s->skipped_mutations++; return make_ready_future<>(); } auto uuid = fm.column_family_id(); - auto& map = _rpm.at(shard_id); - auto i = map.find(uuid); - if (i != map.end() && rp <= i->second) { - logger.trace("entry {} at {} is younger than recorded replay position {}. skipping", fm.column_family_id(), rp, i->second); + auto cf_rp = cf_min_pos(uuid, shard_id); + if (rp <= cf_rp) { + logger.trace("entry {} at {} is younger than recorded replay position {}. skipping", fm.column_family_id(), rp, cf_rp); s->skipped_mutations++; return make_ready_future<>(); }