From f7151cac6166df7df74b08ffc9118d1bb10280a6 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 30 Sep 2015 16:50:02 +0200 Subject: [PATCH 1/8] cql3::untyped_result_set: Allow "get_map" to be explicit about result type Allow providing both hash/equal etc for resulting map, as well as explicit data_types for the deserialization. Also allow direct extraction of kv-pairs to iterator, for more advanced unpacking. --- cql3/untyped_result_set.hh | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/cql3/untyped_result_set.hh b/cql3/untyped_result_set.hh index cd8ef9090b..3bbe5079ed 100644 --- a/cql3/untyped_result_set.hh +++ b/cql3/untyped_result_set.hh @@ -70,17 +70,25 @@ public: } // this could maybe be done as an overload of get_as (or something), but that just // muddles things for no real gain. Let user (us) attempt to know what he is doing instead. - template - std::unordered_map get_map(const sstring& name) const { - auto vec = boost::any_cast( - map_type_impl::get_instance(data_type_for(), - data_type_for(), false)->deserialize( - get_blob(name))); - std::unordered_map res; - std::transform(vec.begin(), vec.end(), - std::inserter(res, res.end()), [](auto& p) { + template + void get_map_data(const sstring& name, Iter out, data_type keytype = + data_type_for(), data_type valtype = + data_type_for()) const { + auto vec = + boost::any_cast( + map_type_impl::get_instance(keytype, valtype, false)->deserialize( + get_blob(name))); + std::transform(vec.begin(), vec.end(), out, + [](auto& p) { return std::pair(boost::any_cast(p.first), boost::any_cast(p.second)); }); + } + template + std::unordered_map get_map(const sstring& name, + data_type keytype = data_type_for(), data_type valtype = + data_type_for()) const { + std::unordered_map res; + get_map_data(name, std::inserter(res, res.end()), keytype, valtype); return res; } const std::vector<::shared_ptr>& get_columns() const { From 024041c752beb8c9df337088e7b59bc163ac89db Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Tue, 6 Oct 2015 15:59:17 +0200 Subject: [PATCH 2/8] commitlog: make log message slightly more informative/correct --- db/commitlog/commitlog.cc | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index 7f3bfa2767..b2bf6c890b 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -895,7 +895,11 @@ void db::commitlog::segment_manager::discard_unused_segments() { logger.debug("Segment {} is unused", *s); return true; } - logger.debug("Not safe to delete segment {}; dirty is {}", s, segment::cf_mark {*s}); + if (s->is_still_allocating()) { + logger.debug("Not safe to delete segment {}; still allocating.", s); + } else { + logger.debug("Not safe to delete segment {}; dirty is {}", s, segment::cf_mark {*s}); + } return false; }); if (i != _segments.end()) { From 199b72c6f3db31f76a8253b0900c2fcf9bbc1825 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Tue, 6 Oct 2015 15:59:46 +0200 Subject: [PATCH 3/8] commitlog: fix reader "offset" handling broken + ensure exceptions propagates Must ensure we find a chunk/entry boundary still even when run with a start offset, since file navigation in chunk based. Was not observed as broken previously because 1.) We did not run with offsets 2.) The exception never reached caller. Also make the reader silently ignore empty files. --- db/commitlog/commitlog.cc | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index b2bf6c890b..640a13ca0d 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -1192,7 +1192,10 @@ db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type } future<> read_header() { return fin.read_exactly(segment::descriptor_header_size).then([this](temporary_buffer buf) { - advance(buf); + if (!advance(buf)) { + // zero length file. accept it just to be nice. + return make_ready_future<>(); + } // Will throw if we got eof data_input in(buf); auto ver = in.read(); @@ -1212,9 +1215,6 @@ db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type this->id = id; this->next = 0; - if (start_off > pos) { - return skip(start_off - pos); - } return make_ready_future<>(); }); } @@ -1242,6 +1242,10 @@ db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type this->next = next; + if (start_off >= next) { + return skip(next - pos); + } + return do_until(std::bind(&work::end_of_chunk, this), std::bind(&work::read_entry, this)); }); } @@ -1269,6 +1273,10 @@ db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type throw std::runtime_error("Invalid entry size"); } + if (start_off > pos) { + return skip(size - entry_header_size); + } + return fin.read_exactly(size - entry_header_size).then([this, size, checksum, rp](temporary_buffer buf) { advance(buf); @@ -1301,8 +1309,10 @@ db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type auto w = make_lw_shared(std::move(f), off); auto ret = w->s.listen(std::move(next)); - w->s.started().then(std::bind(&work::read_file, w.get())).finally([w] { + w->s.started().then(std::bind(&work::read_file, w.get())).then([w] { w->s.close(); + }).handle_exception([w](auto ep) { + w->s.set_exception(ep); }); return ret; From 6b0ab79ecb39eb76f7e54874c8bf14af9a38654e Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 30 Sep 2015 16:51:28 +0200 Subject: [PATCH 4/8] system_keyspace: Keep per-shard truncation records Fixes #423 * CF ID now maps to a truncation record comprised of a set of per-shard RP:s and a high-mark timestamp * Retrieving RP:s are done in "bulk" * Truncation time is calculated as max of all shards. This version of the patch will accept "old" truncation data, though the result of applying it will most likely not be correct (just one shard) Record is still kept as a blob, "new" format is indicated by record size. --- db/system_keyspace.cc | 78 ++++++++++++++++++++++++++++++++++++------- db/system_keyspace.hh | 8 +++-- 2 files changed, 72 insertions(+), 14 deletions(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 85ca2b7f05..a0922f4c27 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -40,6 +40,8 @@ #include #include +#include +#include #include "system_keyspace.hh" #include "types.hh" @@ -50,6 +52,7 @@ #include "cql3/query_options.hh" #include "cql3/query_processor.hh" #include "utils/fb_utilities.hh" +#include "utils/hash.hh" #include "dht/i_partitioner.hh" #include "version.hh" #include "thrift/server.hh" @@ -506,15 +509,30 @@ future<> setup(distributed& db, distributed& qp return make_ready_future<>(); } -typedef std::pair truncation_entry; -typedef std::unordered_map truncation_map; +typedef std::pair truncation_entry; +typedef utils::UUID truncation_key; +typedef std::unordered_map truncation_map; + static thread_local std::experimental::optional truncation_records; -future<> save_truncation_record(const column_family& cf, db_clock::time_point truncated_at, const db::replay_position& rp) { - db::serializer rps(rp); - bytes buf(bytes::initialized_later(), sizeof(db_clock::rep) + rps.size()); +future<> save_truncation_records(const column_family& cf, db_clock::time_point truncated_at, replay_positions positions) { + auto size = + sizeof(db_clock::rep) + + positions.size() + * db::serializer( + db::replay_position()).size(); + bytes buf(bytes::initialized_later(), size); data_output out(buf); - rps(out); + + // Old version would write a single RP. We write N. Resulting blob size + // will determine how many. + // An external entity reading this blob would get a "correct" RP + // and a garbled time stamp. But an external entity has no business + // reading this data anyway, since it is meaningless outside this + // machine instance. + for (auto& rp : positions) { + db::serializer::write(out, rp); + } out.write(truncated_at.time_since_epoch().count()); map_type_impl::native_type tmp; @@ -543,14 +561,21 @@ static future get_truncation_record(utils::UUID cf_id) { sstring req = sprint("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL, LOCAL); return qctx->qp().execute_internal(req).then([cf_id](::shared_ptr rs) { truncation_map tmp; - if (!rs->empty() && rs->one().has("truncated_set")) { + if (!rs->empty() && rs->one().has("truncated_at")) { auto map = rs->one().get_map("truncated_at"); for (auto& p : map) { + auto uuid = p.first; + auto buf = p.second; + truncation_entry e; - data_input in(p.second); - e.first = db::serializer::read(in); + + data_input in(buf); + + while (in.avail() > sizeof(db_clock::rep)) { + e.first.emplace_back(db::serializer::read(in)); + } e.second = db_clock::time_point(db_clock::duration(in.read())); - tmp[p.first] = e; + tmp[uuid] = e; } } truncation_records = std::move(tmp); @@ -560,9 +585,38 @@ static future get_truncation_record(utils::UUID cf_id) { return make_ready_future((*truncation_records)[cf_id]); } -future get_truncated_position(utils::UUID cf_id) { +future<> save_truncation_record(const column_family& cf, db_clock::time_point truncated_at, db::replay_position rp) { + // TODO: this is horribly ineffective, we're doing a full flush of all system tables for all cores + // once, for each core (calling us). But right now, redesigning so that calling here (or, rather, + // save_truncation_records), is done from "somewhere higher, once per machine, not shard" is tricky. + // Mainly because drop_tables also uses truncate. And is run per-core as well. Gah. + return get_truncated_position(cf.schema()->id()).then([&cf, truncated_at, rp](replay_positions positions) { + auto i = std::find_if(positions.begin(), positions.end(), [rp](auto& p) { + return p.shard_id() == rp.shard_id(); + }); + if (i == positions.end()) { + positions.emplace_back(rp); + } else { + *i = rp; + } + return save_truncation_records(cf, truncated_at, positions); + }); +} + +future get_truncated_position(utils::UUID cf_id, uint32_t shard) { + return get_truncated_position(std::move(cf_id)).then([shard](replay_positions positions) { + for (auto& rp : positions) { + if (shard == rp.shard_id()) { + return make_ready_future(rp); + } + } + return make_ready_future(); + }); +} + + future get_truncated_position(utils::UUID cf_id) { return get_truncation_record(cf_id).then([](truncation_entry e) { - return make_ready_future(e.first); + return make_ready_future(e.first); }); } diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 8eafc04904..088255266c 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -277,9 +277,13 @@ enum class bootstrap_state { return CompactionHistoryTabularData.from(queryResultSet); } #endif - future<> save_truncation_record(const column_family&, db_clock::time_point truncated_at, const db::replay_position&); + typedef std::vector replay_positions; + + future<> save_truncation_record(const column_family&, db_clock::time_point truncated_at, db::replay_position); + future<> save_truncation_records(const column_family&, db_clock::time_point truncated_at, replay_positions); future<> remove_truncation_record(utils::UUID); - future get_truncated_position(utils::UUID); + future get_truncated_position(utils::UUID); + future get_truncated_position(utils::UUID, uint32_t shard); future get_truncated_at(utils::UUID); #if 0 From 271eb3ba0207c7aa333ec9046a32f7a15141ffae Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Tue, 6 Oct 2015 16:04:49 +0200 Subject: [PATCH 5/8] replay_position: Add <= comparator --- db/commitlog/replay_position.hh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/db/commitlog/replay_position.hh b/db/commitlog/replay_position.hh index f4f8fae1c8..f269777934 100644 --- a/db/commitlog/replay_position.hh +++ b/db/commitlog/replay_position.hh @@ -71,6 +71,9 @@ struct replay_position { bool operator<(const replay_position & r) const { return id < r.id ? true : (r.id < id ? false : pos < r.pos); } + bool operator<=(const replay_position & r) const { + return id < r.id ? true : (r.id < id ? false : pos <= r.pos); + } bool operator==(const replay_position & r) const { return id == r.id && pos == r.pos; } From 3f1fa77979f70635c1d6d873923a3d36af16afbd Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Tue, 6 Oct 2015 16:02:33 +0200 Subject: [PATCH 6/8] commitlog_replayer: Fix broken comparison A commitlog entry should be ignored if its position is <= highest recorded position, not <. --- db/commitlog/commitlog_replayer.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/db/commitlog/commitlog_replayer.cc b/db/commitlog/commitlog_replayer.cc index e6046484cd..cdd721b64a 100644 --- a/db/commitlog/commitlog_replayer.cc +++ b/db/commitlog/commitlog_replayer.cc @@ -183,8 +183,8 @@ future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer auto uuid = fm.column_family_id(); auto& map = _rpm[shard]; auto i = map.find(uuid); - if (i != map.end() && rp < i->second) { - logger.trace("entry {} at {} is less than recorded replay position {}. skipping", fm.column_family_id(), rp, i->second); + if (i != map.end() && rp <= i->second) { + logger.trace("entry {} at {} is younger than recorded replay position {}. skipping", fm.column_family_id(), rp, i->second); s->skipped_mutations++; return make_ready_future<>(); } From 17bd18b59c1a7d391ce0b95795017c10f391e559 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Tue, 6 Oct 2015 16:03:25 +0200 Subject: [PATCH 7/8] commitlog_replayer: Add logging message for exceptions in multi-file recover --- db/commitlog/commitlog_replayer.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/db/commitlog/commitlog_replayer.cc b/db/commitlog/commitlog_replayer.cc index cdd721b64a..5193db8df4 100644 --- a/db/commitlog/commitlog_replayer.cc +++ b/db/commitlog/commitlog_replayer.cc @@ -248,7 +248,10 @@ future<> db::commitlog_replayer::recover(std::vector files) { logger.info("Replaying {}", files); return parallel_for_each(files, [this](auto f) { - return this->recover(std::move(f)); + return this->recover(f).handle_exception([f](auto ep) { + logger.error("Error recovering {}: {}", f, ep); + std::rethrow_exception(ep); + }); }); } From a66c22f1ec65b7a971cd0f5c9ec426e2ea1a46c5 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Wed, 30 Sep 2015 16:53:21 +0200 Subject: [PATCH 8/8] commitlog_replayer: Acquire truncation RP:s per replayed shard I.e. get them in bulk and fill in for all shards --- db/commitlog/commitlog_replayer.cc | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/db/commitlog/commitlog_replayer.cc b/db/commitlog/commitlog_replayer.cc index 5193db8df4..de60c80a6b 100644 --- a/db/commitlog/commitlog_replayer.cc +++ b/db/commitlog/commitlog_replayer.cc @@ -117,11 +117,16 @@ future<> db::commitlog_replayer::impl::init() { logger.warn("Could not read sstable metadata {}", std::current_exception()); } } - // TODO: this is not correct. Truncation does not fully take sharding into consideration - return db::system_keyspace::get_truncated_position(uuid).then([&map, uuid](auto truncated_rp) { - if (truncated_rp != replay_position()) { - auto& pp = map[engine().cpu_id()][uuid]; - pp = std::max(pp, truncated_rp); + // We do this on each cpu, for each CF, which technically is a little wasteful, but the values are + // cached, this is only startup, and it makes the code easier. + // Get all truncation records for the CF and initialize max rps if + // present. Cannot do this on demand, as there may be no sstables to + // mark the CF as "needed". + return db::system_keyspace::get_truncated_position(uuid).then([&map, &uuid](std::vector tpps) { + for (auto& p : tpps) { + logger.trace("CF {} truncated at {}", uuid, p); + auto& pp = map[p.shard_id()][uuid]; + pp = std::max(pp, p); } }); }).then([&map] {