diff --git a/cql3/untyped_result_set.hh b/cql3/untyped_result_set.hh index cd8ef9090b..3bbe5079ed 100644 --- a/cql3/untyped_result_set.hh +++ b/cql3/untyped_result_set.hh @@ -70,17 +70,25 @@ public: } // this could maybe be done as an overload of get_as (or something), but that just // muddles things for no real gain. Let user (us) attempt to know what he is doing instead. - template - std::unordered_map get_map(const sstring& name) const { - auto vec = boost::any_cast( - map_type_impl::get_instance(data_type_for(), - data_type_for(), false)->deserialize( - get_blob(name))); - std::unordered_map res; - std::transform(vec.begin(), vec.end(), - std::inserter(res, res.end()), [](auto& p) { + template + void get_map_data(const sstring& name, Iter out, data_type keytype = + data_type_for(), data_type valtype = + data_type_for()) const { + auto vec = + boost::any_cast( + map_type_impl::get_instance(keytype, valtype, false)->deserialize( + get_blob(name))); + std::transform(vec.begin(), vec.end(), out, + [](auto& p) { return std::pair(boost::any_cast(p.first), boost::any_cast(p.second)); }); + } + template + std::unordered_map get_map(const sstring& name, + data_type keytype = data_type_for(), data_type valtype = + data_type_for()) const { + std::unordered_map res; + get_map_data(name, std::inserter(res, res.end()), keytype, valtype); return res; } const std::vector<::shared_ptr>& get_columns() const { diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index 7f3bfa2767..640a13ca0d 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -895,7 +895,11 @@ void db::commitlog::segment_manager::discard_unused_segments() { logger.debug("Segment {} is unused", *s); return true; } - logger.debug("Not safe to delete segment {}; dirty is {}", s, segment::cf_mark {*s}); + if (s->is_still_allocating()) { + logger.debug("Not safe to delete segment {}; still allocating.", s); + } else { + logger.debug("Not safe to delete segment {}; dirty is {}", s, segment::cf_mark {*s}); + } return false; }); if (i != _segments.end()) { @@ -1188,7 +1192,10 @@ db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type } future<> read_header() { return fin.read_exactly(segment::descriptor_header_size).then([this](temporary_buffer buf) { - advance(buf); + if (!advance(buf)) { + // zero length file. accept it just to be nice. + return make_ready_future<>(); + } // Will throw if we got eof data_input in(buf); auto ver = in.read(); @@ -1208,9 +1215,6 @@ db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type this->id = id; this->next = 0; - if (start_off > pos) { - return skip(start_off - pos); - } return make_ready_future<>(); }); } @@ -1238,6 +1242,10 @@ db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type this->next = next; + if (start_off >= next) { + return skip(next - pos); + } + return do_until(std::bind(&work::end_of_chunk, this), std::bind(&work::read_entry, this)); }); } @@ -1265,6 +1273,10 @@ db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type throw std::runtime_error("Invalid entry size"); } + if (start_off > pos) { + return skip(size - entry_header_size); + } + return fin.read_exactly(size - entry_header_size).then([this, size, checksum, rp](temporary_buffer buf) { advance(buf); @@ -1297,8 +1309,10 @@ db::commitlog::read_log_file(file f, commit_load_reader_func next, position_type auto w = make_lw_shared(std::move(f), off); auto ret = w->s.listen(std::move(next)); - w->s.started().then(std::bind(&work::read_file, w.get())).finally([w] { + w->s.started().then(std::bind(&work::read_file, w.get())).then([w] { w->s.close(); + }).handle_exception([w](auto ep) { + w->s.set_exception(ep); }); return ret; diff --git a/db/commitlog/commitlog_replayer.cc b/db/commitlog/commitlog_replayer.cc index e6046484cd..de60c80a6b 100644 --- a/db/commitlog/commitlog_replayer.cc +++ b/db/commitlog/commitlog_replayer.cc @@ -117,11 +117,16 @@ future<> db::commitlog_replayer::impl::init() { logger.warn("Could not read sstable metadata {}", std::current_exception()); } } - // TODO: this is not correct. Truncation does not fully take sharding into consideration - return db::system_keyspace::get_truncated_position(uuid).then([&map, uuid](auto truncated_rp) { - if (truncated_rp != replay_position()) { - auto& pp = map[engine().cpu_id()][uuid]; - pp = std::max(pp, truncated_rp); + // We do this on each cpu, for each CF, which technically is a little wasteful, but the values are + // cached, this is only startup, and it makes the code easier. + // Get all truncation records for the CF and initialize max rps if + // present. Cannot do this on demand, as there may be no sstables to + // mark the CF as "needed". + return db::system_keyspace::get_truncated_position(uuid).then([&map, &uuid](std::vector tpps) { + for (auto& p : tpps) { + logger.trace("CF {} truncated at {}", uuid, p); + auto& pp = map[p.shard_id()][uuid]; + pp = std::max(pp, p); } }); }).then([&map] { @@ -183,8 +188,8 @@ future<> db::commitlog_replayer::impl::process(stats* s, temporary_buffer auto uuid = fm.column_family_id(); auto& map = _rpm[shard]; auto i = map.find(uuid); - if (i != map.end() && rp < i->second) { - logger.trace("entry {} at {} is less than recorded replay position {}. skipping", fm.column_family_id(), rp, i->second); + if (i != map.end() && rp <= i->second) { + logger.trace("entry {} at {} is younger than recorded replay position {}. skipping", fm.column_family_id(), rp, i->second); s->skipped_mutations++; return make_ready_future<>(); } @@ -248,7 +253,10 @@ future<> db::commitlog_replayer::recover(std::vector files) { logger.info("Replaying {}", files); return parallel_for_each(files, [this](auto f) { - return this->recover(std::move(f)); + return this->recover(f).handle_exception([f](auto ep) { + logger.error("Error recovering {}: {}", f, ep); + std::rethrow_exception(ep); + }); }); } diff --git a/db/commitlog/replay_position.hh b/db/commitlog/replay_position.hh index f4f8fae1c8..f269777934 100644 --- a/db/commitlog/replay_position.hh +++ b/db/commitlog/replay_position.hh @@ -71,6 +71,9 @@ struct replay_position { bool operator<(const replay_position & r) const { return id < r.id ? true : (r.id < id ? false : pos < r.pos); } + bool operator<=(const replay_position & r) const { + return id < r.id ? true : (r.id < id ? false : pos <= r.pos); + } bool operator==(const replay_position & r) const { return id == r.id && pos == r.pos; } diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 85ca2b7f05..a0922f4c27 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -40,6 +40,8 @@ #include #include +#include +#include #include "system_keyspace.hh" #include "types.hh" @@ -50,6 +52,7 @@ #include "cql3/query_options.hh" #include "cql3/query_processor.hh" #include "utils/fb_utilities.hh" +#include "utils/hash.hh" #include "dht/i_partitioner.hh" #include "version.hh" #include "thrift/server.hh" @@ -506,15 +509,30 @@ future<> setup(distributed& db, distributed& qp return make_ready_future<>(); } -typedef std::pair truncation_entry; -typedef std::unordered_map truncation_map; +typedef std::pair truncation_entry; +typedef utils::UUID truncation_key; +typedef std::unordered_map truncation_map; + static thread_local std::experimental::optional truncation_records; -future<> save_truncation_record(const column_family& cf, db_clock::time_point truncated_at, const db::replay_position& rp) { - db::serializer rps(rp); - bytes buf(bytes::initialized_later(), sizeof(db_clock::rep) + rps.size()); +future<> save_truncation_records(const column_family& cf, db_clock::time_point truncated_at, replay_positions positions) { + auto size = + sizeof(db_clock::rep) + + positions.size() + * db::serializer( + db::replay_position()).size(); + bytes buf(bytes::initialized_later(), size); data_output out(buf); - rps(out); + + // Old version would write a single RP. We write N. Resulting blob size + // will determine how many. + // An external entity reading this blob would get a "correct" RP + // and a garbled time stamp. But an external entity has no business + // reading this data anyway, since it is meaningless outside this + // machine instance. + for (auto& rp : positions) { + db::serializer::write(out, rp); + } out.write(truncated_at.time_since_epoch().count()); map_type_impl::native_type tmp; @@ -543,14 +561,21 @@ static future get_truncation_record(utils::UUID cf_id) { sstring req = sprint("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL, LOCAL); return qctx->qp().execute_internal(req).then([cf_id](::shared_ptr rs) { truncation_map tmp; - if (!rs->empty() && rs->one().has("truncated_set")) { + if (!rs->empty() && rs->one().has("truncated_at")) { auto map = rs->one().get_map("truncated_at"); for (auto& p : map) { + auto uuid = p.first; + auto buf = p.second; + truncation_entry e; - data_input in(p.second); - e.first = db::serializer::read(in); + + data_input in(buf); + + while (in.avail() > sizeof(db_clock::rep)) { + e.first.emplace_back(db::serializer::read(in)); + } e.second = db_clock::time_point(db_clock::duration(in.read())); - tmp[p.first] = e; + tmp[uuid] = e; } } truncation_records = std::move(tmp); @@ -560,9 +585,38 @@ static future get_truncation_record(utils::UUID cf_id) { return make_ready_future((*truncation_records)[cf_id]); } -future get_truncated_position(utils::UUID cf_id) { +future<> save_truncation_record(const column_family& cf, db_clock::time_point truncated_at, db::replay_position rp) { + // TODO: this is horribly ineffective, we're doing a full flush of all system tables for all cores + // once, for each core (calling us). But right now, redesigning so that calling here (or, rather, + // save_truncation_records), is done from "somewhere higher, once per machine, not shard" is tricky. + // Mainly because drop_tables also uses truncate. And is run per-core as well. Gah. + return get_truncated_position(cf.schema()->id()).then([&cf, truncated_at, rp](replay_positions positions) { + auto i = std::find_if(positions.begin(), positions.end(), [rp](auto& p) { + return p.shard_id() == rp.shard_id(); + }); + if (i == positions.end()) { + positions.emplace_back(rp); + } else { + *i = rp; + } + return save_truncation_records(cf, truncated_at, positions); + }); +} + +future get_truncated_position(utils::UUID cf_id, uint32_t shard) { + return get_truncated_position(std::move(cf_id)).then([shard](replay_positions positions) { + for (auto& rp : positions) { + if (shard == rp.shard_id()) { + return make_ready_future(rp); + } + } + return make_ready_future(); + }); +} + + future get_truncated_position(utils::UUID cf_id) { return get_truncation_record(cf_id).then([](truncation_entry e) { - return make_ready_future(e.first); + return make_ready_future(e.first); }); } diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 8eafc04904..088255266c 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -277,9 +277,13 @@ enum class bootstrap_state { return CompactionHistoryTabularData.from(queryResultSet); } #endif - future<> save_truncation_record(const column_family&, db_clock::time_point truncated_at, const db::replay_position&); + typedef std::vector replay_positions; + + future<> save_truncation_record(const column_family&, db_clock::time_point truncated_at, db::replay_position); + future<> save_truncation_records(const column_family&, db_clock::time_point truncated_at, replay_positions); future<> remove_truncation_record(utils::UUID); - future get_truncated_position(utils::UUID); + future get_truncated_position(utils::UUID); + future get_truncated_position(utils::UUID, uint32_t shard); future get_truncated_at(utils::UUID); #if 0