From ea38b223bd69e1b243520e138c2bbbdd53f89bb8 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Tue, 18 Aug 2015 14:07:20 +0200 Subject: [PATCH] Commitlog: change the ID generation scheme * Make it more like origin, i.e. based on wall clock time of app start * Encode shard ID in the RP segment ID, to ensure RPs and segment names are unique per shard --- db/commitlog/commitlog.cc | 87 ++++++++++++++++++++++++--------- db/commitlog/replay_position.hh | 28 ++++++++++- 2 files changed, 92 insertions(+), 23 deletions(-) diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index 06208e876c..a997c73618 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -40,6 +40,7 @@ #include "db/config.hh" #include "utils/data_input.hh" #include "utils/crc.hh" +#include "utils/runtime.hh" #include "log.hh" static logging::logger logger("commitlog"); @@ -82,9 +83,12 @@ public: descriptor(descriptor&&) = default; descriptor(const descriptor&) = default; // TODO : version management - descriptor(uint64_t i, uint32_t v = 1) + descriptor(segment_id_type i, uint32_t v = 1) : id(i), ver(v) { } + descriptor(replay_position p) + : descriptor(p.id) { + } descriptor(std::pair p) : descriptor(p.first, p.second) { } @@ -102,7 +106,7 @@ public: throw std::domain_error("Commitlog segment is too old to open; upgrade to 1.2.5+ first"); } - uint64_t id = std::stoull(m[3].str().substr(1)); + segment_id_type id = std::stoull(m[3].str().substr(1)); uint32_t ver = std::stoul(m[2].str()); return std::make_pair(id, ver); @@ -114,7 +118,11 @@ public: + std::to_string(id) + FILENAME_EXTENSION; } - const uint64_t id; + operator replay_position() const { + return replay_position(id); + } + + const segment_id_type id; const uint32_t ver; }; @@ -158,8 +166,7 @@ public: { assert(max_size > 0); if (cfg.commit_log_location.empty()) { - cfg.commit_log_location = "/tmp/urchin/commitlog/" - + std::to_string(engine().cpu_id()); + cfg.commit_log_location = "/var/lib/scylla/commitlog"; 
} _regs = create_counters(); } @@ -206,8 +213,10 @@ public: buffer_type acquire_buffer(size_t s); void release_buffer(buffer_type&&); + future> list_descriptors(sstring dir); + private: - uint64_t _ids = 0; + segment_id_type _ids = 0; std::vector _segments; std::vector _temp_buffers; timer _timer; @@ -547,34 +556,68 @@ public: const size_t db::commitlog::segment::default_size; -future<> db::commitlog::segment_manager::init() { +future> +db::commitlog::segment_manager::list_descriptors(sstring dirname) { struct helper { + sstring _dirname; file _file; subscription _list; + std::vector _result; - helper(segment_manager * m, file && f) - : _file(std::move(f)), _list( + helper(helper&&) = default; + helper(sstring n, file && f) + : _dirname(std::move(n)), _file(std::move(f)), _list( _file.list_directory( - std::bind(&segment_manager::process, m, + std::bind(&helper::process, this, std::placeholders::_1))) { } + future<> process(directory_entry de) { + auto entry_type = [this](const directory_entry & de) { + if (!de.type && !de.name.empty()) { + return engine().file_type(_dirname + "/" + de.name); + } + return make_ready_future>(de.type); + }; + return entry_type(de).then([this, de](auto type) { + if (type == directory_entry_type::regular) { + try { + _result.emplace_back(de.name); + } catch (std::domain_error &) { + } + } + return make_ready_future<>(); + }); + } + future<> done() { return _list.done(); } }; - return engine().open_directory(cfg.commit_log_location).then([this](auto dir) { - // keep sub alive... - auto h = make_lw_shared(this, std::move(dir)); - return h->done().then([this, h]() { - // nothing really. 
just keeping sub alive - if (cfg.mode != sync_mode::BATCH) { - _timer.set_callback(std::bind(&segment_manager::sync, this)); - this->arm(); - } - }); - }); + return engine().open_directory(dirname).then([this, dirname](auto dir) { + auto h = make_lw_shared(std::move(dirname), std::move(dir)); + return h->done().then([h]() { + return make_ready_future>(std::move(h->_result)); + }).finally([h] {}); + }); +} + +future<> db::commitlog::segment_manager::init() { + return list_descriptors(cfg.commit_log_location).then([this](auto descs) { + segment_id_type id = std::chrono::duration_cast(runtime::get_boot_time().time_since_epoch()).count() + 1; + for (auto& d : descs) { + id = std::max(id, replay_position(d.id).base_id()); + } + + // base id counter is [ <shard> | <base> ] + _ids = replay_position(engine().cpu_id(), id).id; + + if (cfg.mode != sync_mode::BATCH) { + _timer.set_callback(std::bind(&segment_manager::sync, this)); + this->arm(); + } + }); } scollectd::registrations db::commitlog::segment_manager::create_counters() { @@ -698,7 +741,7 @@ std::ostream& db::operator<<(std::ostream& out, const db::commitlog::segment::cf } std::ostream& db::operator<<(std::ostream& out, const db::replay_position& p) { - return out << "{" << p.id << ", " << p.pos << "}"; + return out << "{" << p.shard_id() << ", " << p.base_id() << ", " << p.pos << "}"; } void db::commitlog::segment_manager::discard_unused_segments() { diff --git a/db/commitlog/replay_position.hh b/db/commitlog/replay_position.hh index 351ce2f971..48d69710b9 100644 --- a/db/commitlog/replay_position.hh +++ b/db/commitlog/replay_position.hh @@ -31,12 +31,25 @@ using segment_id_type = uint64_t; using position_type = uint32_t; struct replay_position { + static const constexpr size_t max_cpu_bits = 10; // 1024 cpus. 
should be enough for anyone + static const constexpr size_t max_ts_bits = 8 * sizeof(segment_id_type) - max_cpu_bits; + static const constexpr segment_id_type ts_mask = (segment_id_type(1) << max_ts_bits) - 1; + static const constexpr segment_id_type cpu_mask = ~ts_mask; + segment_id_type id; position_type pos; replay_position(segment_id_type i = 0, position_type p = 0) : id(i), pos(p) - { } + {} + + replay_position(unsigned shard, segment_id_type i, position_type p = 0) + : id((segment_id_type(shard) << max_ts_bits) | i), pos(p) + { + if (i & cpu_mask) { + throw std::invalid_argument("base id overflow: " + std::to_string(i)); + } + } bool operator<(const replay_position & r) const { return id < r.id ? true : (r.id < id ? false : pos < r.pos); @@ -44,6 +57,19 @@ struct replay_position { bool operator==(const replay_position & r) const { return id == r.id && pos == r.pos; } + bool operator!=(const replay_position & r) const { + return !(*this == r); + } + + unsigned shard_id() const { + return unsigned(id >> max_ts_bits); + } + segment_id_type base_id() const { + return id & ts_mask; + } + replay_position base() const { + return replay_position(base_id(), pos); + } template auto describe_type(Describer f) { return f(id, pos); }