Files
scylladb/db/commitlog/commitlog_replayer.cc
Raphael S. Carvalho c729ea36e1 commitlog: guard commit log replay against reordering
After killing scylla in the middle of a write, the next scylla
instance failed to finish commit log replay, showing the following
error message:

scylla: core/future.hh:448: void promise<T>::set_value(A&& ...)
[with A = {}; T = {}]: Assertion `_state' failed.

After a long debug session, I figured out that check_valid_rp() was
triggering the exception replay_position_reordered_exception, which
means replay position reordering.

Looking at 8b9a63a3c6, I noticed that database::apply is guarded
against reodering, but commitlog replay code is not.

Signed-off-by: Raphael S. Carvalho <raphaelsc@cloudius-systems.com>
2015-09-12 06:17:14 -03:00

248 lines
8.6 KiB
C++

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Modified by Cloudius Systems
* Copyright 2015 Cloudius Systems
*/
#include <memory>
#include <vector>
#include <algorithm>
#include <unordered_map>
#include <boost/range/adaptor/map.hpp>
#include <core/future.hh>
#include <core/sharded.hh>
#include "commitlog.hh"
#include "commitlog_replayer.hh"
#include "database.hh"
#include "sstables/sstables.hh"
#include "db/system_keyspace.hh"
#include "db/serializer.hh"
#include "cql3/query_processor.hh"
#include "log.hh"
static logging::logger logger("commitlog_replayer");
class db::commitlog_replayer::impl {
public:
impl(seastar::sharded<cql3::query_processor>& db);
future<> init();
struct stats {
uint64_t invalid_mutations = 0;
uint64_t skipped_mutations = 0;
uint64_t applied_mutations = 0;
};
future<> process(stats*, temporary_buffer<char> buf, replay_position rp);
future<stats> recover(sstring file);
typedef std::unordered_map<utils::UUID, replay_position> rp_map;
typedef std::unordered_map<unsigned, rp_map> shard_rpm_map;
typedef std::unordered_map<unsigned, replay_position> shard_rp_map;
seastar::sharded<cql3::query_processor>&
_qp;
shard_rpm_map
_rpm;
shard_rp_map
_min_pos;
};
db::commitlog_replayer::impl::impl(seastar::sharded<cql3::query_processor>& qp)
: _qp(qp)
{}
future<> db::commitlog_replayer::impl::init() {
return _qp.map_reduce([this](shard_rpm_map map) {
for (auto& p1 : map) {
for (auto& p2 : p1.second) {
auto& pp = _rpm[p1.first][p2.first];
pp = std::max(pp, p2.second);
auto& min = _min_pos[p1.first];
min = (min == replay_position()) ? p2.second : std::min(p2.second, min);
}
}
}, [this](cql3::query_processor& qp) {
return do_with(shard_rpm_map{}, [this, &qp](shard_rpm_map& map) {
return parallel_for_each(qp.db().local().get_column_families(), [&map, &qp](auto& cfp) {
auto uuid = cfp.first;
for (auto& sst : *cfp.second->get_sstables() | boost::adaptors::map_values) {
try {
auto p = sst->get_stats_metadata().position;
logger.trace("sstable {} -> rp {}", sst->get_filename(), p);
if (p != replay_position()) {
auto& pp = map[p.shard_id()][uuid];
pp = std::max(pp, p);
}
} catch (...) {
logger.warn("Could not read sstable metadata {}", std::current_exception());
}
}
// TODO: this is not correct. Truncation does not fully take sharding into consideration
return db::system_keyspace::get_truncated_position(qp, uuid).then([&map, uuid](auto truncated_rp) {
if (truncated_rp != replay_position()) {
auto& pp = map[engine().cpu_id()][uuid];
pp = std::max(pp, truncated_rp);
}
});
}).then([&map] {
return make_ready_future<shard_rpm_map>(map);
});
});
}).finally([this] {
for (auto&p : _min_pos) {
logger.debug("minimum position for shard {}: {}", p.first, p.second);
}
for (auto&p1 : _rpm) {
for (auto& p2 : p1.second) {
logger.debug("replay position for shard/uuid {}/{}: {}", p1.first, p2.first, p2.second);
}
}
});
}
future<db::commitlog_replayer::impl::stats>
db::commitlog_replayer::impl::recover(sstring file) {
logger.info("Replaying {}", file);
replay_position rp{commitlog::descriptor(file)};
auto gp = _min_pos[rp.shard_id()];
if (rp.id < gp.id) {
logger.debug("skipping replay of fully-flushed {}", file);
return make_ready_future<stats>();
}
position_type p = 0;
if (rp.id == gp.id) {
p = gp.pos;
}
auto s = make_lw_shared<stats>();
return db::commitlog::read_log_file(file,
std::bind(&impl::process, this, s.get(), std::placeholders::_1,
std::placeholders::_2), p).then([](auto s) {
auto f = s.done();
return f.finally([s = std::move(s)] {});
}).then([s] {
return make_ready_future<stats>(*s);
});
}
future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer<char> buf, replay_position rp) {
auto shard = rp.shard_id();
if (rp < _min_pos[shard]) {
logger.trace("entry {} is less than global min position. skipping", rp);
s->skipped_mutations++;
return make_ready_future<>();
}
try {
frozen_mutation fm(bytes(reinterpret_cast<const int8_t *>(buf.get()), buf.size()));
auto uuid = fm.column_family_id();
auto& map = _rpm[shard];
auto i = map.find(uuid);
if (i != map.end() && rp < i->second) {
logger.trace("entry {} at {} is less than recorded replay position {}. skipping", fm.column_family_id(), rp, i->second);
s->skipped_mutations++;
return make_ready_future<>();
}
auto shard = _qp.local().db().local().shard_of(fm);
return _qp.local().db().invoke_on(shard, [fm = std::move(fm), rp, shard, s] (database& db) -> future<> {
// TODO: might need better verification that the deserialized mutation
// is schema compatible. My guess is that just applying the mutation
// will not do this.
auto& cf = db.find_column_family(fm.column_family_id());
if (logger.is_enabled(logging::log_level::debug)) {
logger.debug("replaying at {} {}:{} at {}", fm.column_family_id(),
cf.schema()->ks_name(), cf.schema()->cf_name(), rp);
}
try {
cf.apply(fm, replay_position(shard, rp.base_id(), rp.pos));
} catch (replay_position_reordered_exception&) {
logger.debug("replay_position reordering detected");
auto m = make_lw_shared<frozen_mutation>(std::move(fm));
return db.apply(*m).then([m] {});
}
s->applied_mutations++;
return make_ready_future<>();
});
} catch (no_such_column_family&) {
// No such CF now? Origin just ignores this.
} catch (...) {
s->invalid_mutations++;
// TODO: write mutation to file like origin.
logger.warn("error replaying: {}", std::current_exception());
}
return make_ready_future<>();
}
db::commitlog_replayer::commitlog_replayer(seastar::sharded<cql3::query_processor>& qp)
: _impl(std::make_unique<impl>(qp))
{}
db::commitlog_replayer::commitlog_replayer(commitlog_replayer&& r)
: _impl(std::move(r._impl))
{}
db::commitlog_replayer::~commitlog_replayer()
{}
future<db::commitlog_replayer> db::commitlog_replayer::create_replayer(seastar::sharded<cql3::query_processor>& qp) {
return do_with(commitlog_replayer(qp), [](auto&& rp) {
auto f = rp._impl->init();
return f.then([rp = std::move(rp)]() mutable {
return make_ready_future<commitlog_replayer>(std::move(rp));
});
});
}
future<> db::commitlog_replayer::recover(std::vector<sstring> files) {
logger.info("Replaying {}", files);
return parallel_for_each(files, [this](auto f) {
return this->recover(std::move(f));
});
}
future<> db::commitlog_replayer::recover(sstring file) {
return _impl->recover(file).then([file](impl::stats stats) {
logger.info("Log replay of {} complete, {} replayed mutations ({} invalid, {} skipped)"
, file
, stats.applied_mutations
, stats.invalid_mutations
, stats.skipped_mutations
);
});
}