Merge "Truncation records per shard"

Fixes  #423

"Changes the "truncated_at" blob contents of system.local table. It now stores
N replay_positions, where N == # shards.

The system.local table schema remains unchanged, and older truncation data
is accepted, though it will for obvious reasons still be insufficient.

Since the data is opaque to the running instance, blob compatibilty with
origin should be irrelevant (and we're not really that now anyway).

Note that technically, changing shard cound inbetween runs could make us hold
on to RP data "longer than required", but this is
a.) Insignificant data sizes
b.) Data that is valid exactly once: When restarting a failed node and
    replaying. The "shards" only refer to "last run", and after that we don't
    care. At worst, we can get less than fresh data (not all shards manage
    to save truncation records before crash).

It is worth noting (and I've done do in the code) that the system.local table
+ sharding cause some rather silly inefficiencens, since for this (and others)
we store a value for each shard, each save which causes a global flush of the
systable, in turn delegated on all cores. So the op is N^2 in "db complexity".
At some point we should maybe consider if operations like "drop table" and
"truncate" should not be done on shard level, but on machine level, so it can
coordinate itself. But otoh, it is rare and not _very_ expensive either."
This commit is contained in:
Avi Kivity
2015-10-07 14:33:22 +03:00
6 changed files with 128 additions and 37 deletions

View File

@@ -70,17 +70,25 @@ public:
}
// this could maybe be done as an overload of get_as (or something), but that just
// muddles things for no real gain. Let user (us) attempt to know what he is doing instead.
template<typename K, typename V>
std::unordered_map<K, V> get_map(const sstring& name) const {
auto vec = boost::any_cast<const map_type_impl::native_type&>(
map_type_impl::get_instance(data_type_for<K>(),
data_type_for<V>(), false)->deserialize(
get_blob(name)));
std::unordered_map<K, V> res;
std::transform(vec.begin(), vec.end(),
std::inserter(res, res.end()), [](auto& p) {
template<typename K, typename V, typename Iter>
void get_map_data(const sstring& name, Iter out, data_type keytype =
data_type_for<K>(), data_type valtype =
data_type_for<V>()) const {
auto vec =
boost::any_cast<const map_type_impl::native_type&>(
map_type_impl::get_instance(keytype, valtype, false)->deserialize(
get_blob(name)));
std::transform(vec.begin(), vec.end(), out,
[](auto& p) {
return std::pair<K, V>(boost::any_cast<const K&>(p.first), boost::any_cast<const V&>(p.second));
});
}
template<typename K, typename V, typename ... Rest>
std::unordered_map<K, V, Rest...> get_map(const sstring& name,
data_type keytype = data_type_for<K>(), data_type valtype =
data_type_for<V>()) const {
std::unordered_map<K, V, Rest...> res;
get_map_data<K, V>(name, std::inserter(res, res.end()), keytype, valtype);
return res;
}
const std::vector<::shared_ptr<column_specification>>& get_columns() const {

View File

@@ -895,7 +895,11 @@ void db::commitlog::segment_manager::discard_unused_segments() {
logger.debug("Segment {} is unused", *s);
return true;
}
logger.debug("Not safe to delete segment {}; dirty is {}", s, segment::cf_mark {*s});
if (s->is_still_allocating()) {
logger.debug("Not safe to delete segment {}; still allocating.", s);
} else {
logger.debug("Not safe to delete segment {}; dirty is {}", s, segment::cf_mark {*s});
}
return false;
});
if (i != _segments.end()) {
@@ -1188,7 +1192,10 @@ db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type
}
future<> read_header() {
return fin.read_exactly(segment::descriptor_header_size).then([this](temporary_buffer<char> buf) {
advance(buf);
if (!advance(buf)) {
// zero length file. accept it just to be nice.
return make_ready_future<>();
}
// Will throw if we got eof
data_input in(buf);
auto ver = in.read<uint32_t>();
@@ -1208,9 +1215,6 @@ db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type
this->id = id;
this->next = 0;
if (start_off > pos) {
return skip(start_off - pos);
}
return make_ready_future<>();
});
}
@@ -1238,6 +1242,10 @@ db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type
this->next = next;
if (start_off >= next) {
return skip(next - pos);
}
return do_until(std::bind(&work::end_of_chunk, this), std::bind(&work::read_entry, this));
});
}
@@ -1265,6 +1273,10 @@ db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type
throw std::runtime_error("Invalid entry size");
}
if (start_off > pos) {
return skip(size - entry_header_size);
}
return fin.read_exactly(size - entry_header_size).then([this, size, checksum, rp](temporary_buffer<char> buf) {
advance(buf);
@@ -1297,8 +1309,10 @@ db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type
auto w = make_lw_shared<work>(std::move(f), off);
auto ret = w->s.listen(std::move(next));
w->s.started().then(std::bind(&work::read_file, w.get())).finally([w] {
w->s.started().then(std::bind(&work::read_file, w.get())).then([w] {
w->s.close();
}).handle_exception([w](auto ep) {
w->s.set_exception(ep);
});
return ret;

View File

@@ -117,11 +117,16 @@ future<> db::commitlog_replayer::impl::init() {
logger.warn("Could not read sstable metadata {}", std::current_exception());
}
}
// TODO: this is not correct. Truncation does not fully take sharding into consideration
return db::system_keyspace::get_truncated_position(uuid).then([&map, uuid](auto truncated_rp) {
if (truncated_rp != replay_position()) {
auto& pp = map[engine().cpu_id()][uuid];
pp = std::max(pp, truncated_rp);
// We do this on each cpu, for each CF, which technically is a little wasteful, but the values are
// cached, this is only startup, and it makes the code easier.
// Get all truncation records for the CF and initialize max rps if
// present. Cannot do this on demand, as there may be no sstables to
// mark the CF as "needed".
return db::system_keyspace::get_truncated_position(uuid).then([&map, &uuid](std::vector<db::replay_position> tpps) {
for (auto& p : tpps) {
logger.trace("CF {} truncated at {}", uuid, p);
auto& pp = map[p.shard_id()][uuid];
pp = std::max(pp, p);
}
});
}).then([&map] {
@@ -183,8 +188,8 @@ future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer<char>
auto uuid = fm.column_family_id();
auto& map = _rpm[shard];
auto i = map.find(uuid);
if (i != map.end() && rp < i->second) {
logger.trace("entry {} at {} is less than recorded replay position {}. skipping", fm.column_family_id(), rp, i->second);
if (i != map.end() && rp <= i->second) {
logger.trace("entry {} at {} is younger than recorded replay position {}. skipping", fm.column_family_id(), rp, i->second);
s->skipped_mutations++;
return make_ready_future<>();
}
@@ -248,7 +253,10 @@ future<> db::commitlog_replayer::recover(std::vector<sstring> files) {
logger.info("Replaying {}", files);
return parallel_for_each(files, [this](auto f) {
return this->recover(std::move(f));
return this->recover(f).handle_exception([f](auto ep) {
logger.error("Error recovering {}: {}", f, ep);
std::rethrow_exception(ep);
});
});
}

View File

@@ -71,6 +71,9 @@ struct replay_position {
bool operator<(const replay_position & r) const {
return id < r.id ? true : (r.id < id ? false : pos < r.pos);
}
bool operator<=(const replay_position & r) const {
return id < r.id ? true : (r.id < id ? false : pos <= r.pos);
}
bool operator==(const replay_position & r) const {
return id == r.id && pos == r.pos;
}

View File

@@ -40,6 +40,8 @@
#include <boost/range/algorithm_ext/push_back.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/adaptor/filtered.hpp>
#include <boost/range/adaptor/map.hpp>
#include "system_keyspace.hh"
#include "types.hh"
@@ -50,6 +52,7 @@
#include "cql3/query_options.hh"
#include "cql3/query_processor.hh"
#include "utils/fb_utilities.hh"
#include "utils/hash.hh"
#include "dht/i_partitioner.hh"
#include "version.hh"
#include "thrift/server.hh"
@@ -506,15 +509,30 @@ future<> setup(distributed<database>& db, distributed<cql3::query_processor>& qp
return make_ready_future<>();
}
typedef std::pair<db::replay_position, db_clock::time_point> truncation_entry;
typedef std::unordered_map<utils::UUID, truncation_entry> truncation_map;
typedef std::pair<replay_positions, db_clock::time_point> truncation_entry;
typedef utils::UUID truncation_key;
typedef std::unordered_map<truncation_key, truncation_entry> truncation_map;
static thread_local std::experimental::optional<truncation_map> truncation_records;
future<> save_truncation_record(const column_family& cf, db_clock::time_point truncated_at, const db::replay_position& rp) {
db::serializer<replay_position> rps(rp);
bytes buf(bytes::initialized_later(), sizeof(db_clock::rep) + rps.size());
future<> save_truncation_records(const column_family& cf, db_clock::time_point truncated_at, replay_positions positions) {
auto size =
sizeof(db_clock::rep)
+ positions.size()
* db::serializer<replay_position>(
db::replay_position()).size();
bytes buf(bytes::initialized_later(), size);
data_output out(buf);
rps(out);
// Old version would write a single RP. We write N. Resulting blob size
// will determine how many.
// An external entity reading this blob would get a "correct" RP
// and a garbled time stamp. But an external entity has no business
// reading this data anyway, since it is meaningless outside this
// machine instance.
for (auto& rp : positions) {
db::serializer<replay_position>::write(out, rp);
}
out.write<db_clock::rep>(truncated_at.time_since_epoch().count());
map_type_impl::native_type tmp;
@@ -543,14 +561,21 @@ static future<truncation_entry> get_truncation_record(utils::UUID cf_id) {
sstring req = sprint("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL, LOCAL);
return qctx->qp().execute_internal(req).then([cf_id](::shared_ptr<cql3::untyped_result_set> rs) {
truncation_map tmp;
if (!rs->empty() && rs->one().has("truncated_set")) {
if (!rs->empty() && rs->one().has("truncated_at")) {
auto map = rs->one().get_map<utils::UUID, bytes>("truncated_at");
for (auto& p : map) {
auto uuid = p.first;
auto buf = p.second;
truncation_entry e;
data_input in(p.second);
e.first = db::serializer<replay_position>::read(in);
data_input in(buf);
while (in.avail() > sizeof(db_clock::rep)) {
e.first.emplace_back(db::serializer<replay_position>::read(in));
}
e.second = db_clock::time_point(db_clock::duration(in.read<db_clock::rep>()));
tmp[p.first] = e;
tmp[uuid] = e;
}
}
truncation_records = std::move(tmp);
@@ -560,9 +585,38 @@ static future<truncation_entry> get_truncation_record(utils::UUID cf_id) {
return make_ready_future<truncation_entry>((*truncation_records)[cf_id]);
}
future<db::replay_position> get_truncated_position(utils::UUID cf_id) {
future<> save_truncation_record(const column_family& cf, db_clock::time_point truncated_at, db::replay_position rp) {
// TODO: this is horribly ineffective, we're doing a full flush of all system tables for all cores
// once, for each core (calling us). But right now, redesigning so that calling here (or, rather,
// save_truncation_records), is done from "somewhere higher, once per machine, not shard" is tricky.
// Mainly because drop_tables also uses truncate. And is run per-core as well. Gah.
return get_truncated_position(cf.schema()->id()).then([&cf, truncated_at, rp](replay_positions positions) {
auto i = std::find_if(positions.begin(), positions.end(), [rp](auto& p) {
return p.shard_id() == rp.shard_id();
});
if (i == positions.end()) {
positions.emplace_back(rp);
} else {
*i = rp;
}
return save_truncation_records(cf, truncated_at, positions);
});
}
future<db::replay_position> get_truncated_position(utils::UUID cf_id, uint32_t shard) {
return get_truncated_position(std::move(cf_id)).then([shard](replay_positions positions) {
for (auto& rp : positions) {
if (shard == rp.shard_id()) {
return make_ready_future<db::replay_position>(rp);
}
}
return make_ready_future<db::replay_position>();
});
}
future<replay_positions> get_truncated_position(utils::UUID cf_id) {
return get_truncation_record(cf_id).then([](truncation_entry e) {
return make_ready_future<db::replay_position>(e.first);
return make_ready_future<replay_positions>(e.first);
});
}

View File

@@ -277,9 +277,13 @@ enum class bootstrap_state {
return CompactionHistoryTabularData.from(queryResultSet);
}
#endif
future<> save_truncation_record(const column_family&, db_clock::time_point truncated_at, const db::replay_position&);
typedef std::vector<db::replay_position> replay_positions;
future<> save_truncation_record(const column_family&, db_clock::time_point truncated_at, db::replay_position);
future<> save_truncation_records(const column_family&, db_clock::time_point truncated_at, replay_positions);
future<> remove_truncation_record(utils::UUID);
future<db::replay_position> get_truncated_position(utils::UUID);
future<replay_positions> get_truncated_position(utils::UUID);
future<db::replay_position> get_truncated_position(utils::UUID, uint32_t shard);
future<db_clock::time_point> get_truncated_at(utils::UUID);
#if 0