commitlog_replayer: Do proper const-lookup of min positions for shards

Fixes #2173

Per-shard min positions can be unset if we never collected any
sstable/truncation info for it, yet replay segments of that id.

Wrap the lookups to handle "missing data -> default", which should have been
there in the first place.

Message-Id: <1490185101-12482-1-git-send-email-calle@scylladb.com>
(cherry picked from commit c3a510a08d)
This commit is contained in:
Calle Wilund
2017-03-22 12:18:21 +00:00
committed by Avi Kivity
parent 4179d8f7c4
commit 3cc03f88fd

View File

@@ -116,6 +116,19 @@ public:
// Maps an unsigned key to an rp_map. NOTE(review): the key is presumably a
// shard id, going by the alias name and the shard-keyed lookups below — confirm.
typedef std::unordered_map<unsigned, rp_map> shard_rpm_map;
// Maps an unsigned key (presumably a shard id — confirm) to a single
// replay_position.
typedef std::unordered_map<unsigned, replay_position> shard_rp_map;
// Const-safe lookup of the minimum replay position stored for `shard`.
// A shard with no entry in _min_pos yields a default-constructed
// replay_position instead of throwing or inserting.
replay_position min_pos(unsigned shard) const {
    const auto entry = _min_pos.find(shard);
    if (entry == _min_pos.end()) {
        return replay_position();
    }
    return entry->second;
}
// Const-safe lookup of the replay position stored in _rpm for the pair
// (`shard`, `uuid`). If either the shard or the uuid within that shard has
// no entry, a default-constructed replay_position is returned.
replay_position cf_min_pos(const utils::UUID& uuid, unsigned shard) const {
    const auto shard_entry = _rpm.find(shard);
    if (shard_entry != _rpm.end()) {
        const auto& per_uuid = shard_entry->second;
        const auto uuid_entry = per_uuid.find(uuid);
        if (uuid_entry != per_uuid.end()) {
            return uuid_entry->second;
        }
    }
    // Missing data at either level -> default position.
    return replay_position();
}
seastar::sharded<cql3::query_processor>&
_qp;
shard_rpm_map
@@ -172,25 +185,6 @@ future<> db::commitlog_replayer::impl::init() {
});
});
}).finally([this] {
// bugfix: the above map-reduce will _not_ detect if sstables
// are _missing_ from a CF. And because of re-sharding, we can't
// just insert initial zeros into the maps, because we don't know
// how many shards there were last time.
// However, this only affects global min pos, since
// for each CF, the worst that happens is that we have a missing
// entry -> empty replay_pos == min value. But calculating
// global min pos will be off, since we will only base it on
// existing sstables-per-shard.
// So, go through all CF:s and check, if a shard mapping does not
// have data for it, assume we must set global pos to zero.
for (auto&p : _qp.local().db().local().get_column_families()) {
for (auto&p1 : _rpm) { // for each shard
if (!p1.second.count(p.first)) {
_min_pos[p1.first] = replay_position();
}
}
}
for (auto&p : _min_pos) {
logger.debug("minimum position for shard {}: {}", p.first, p.second);
}
@@ -207,7 +201,7 @@ db::commitlog_replayer::impl::recover(sstring file) const {
assert(_column_mappings.local_is_initialized());
replay_position rp{commitlog::descriptor(file)};
auto gp = _min_pos.at(rp.shard_id());
auto gp = min_pos(rp.shard_id());
if (rp.id < gp.id) {
logger.debug("skipping replay of fully-flushed {}", file);
@@ -255,17 +249,16 @@ future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer<char>
const column_mapping& src_cm = cm_it->second;
auto shard_id = rp.shard_id();
if (rp < _min_pos.at(shard_id)) {
if (rp < min_pos(shard_id)) {
logger.trace("entry {} is less than global min position. skipping", rp);
s->skipped_mutations++;
return make_ready_future<>();
}
auto uuid = fm.column_family_id();
auto& map = _rpm.at(shard_id);
auto i = map.find(uuid);
if (i != map.end() && rp <= i->second) {
logger.trace("entry {} at {} is younger than recorded replay position {}. skipping", fm.column_family_id(), rp, i->second);
auto cf_rp = cf_min_pos(uuid, shard_id);
if (rp <= cf_rp) {
logger.trace("entry {} at {} is younger than recorded replay position {}. skipping", fm.column_family_id(), rp, cf_rp);
s->skipped_mutations++;
return make_ready_future<>();
}