Commitlog: change the ID generation scheme

* Make it more like origin, i.e. based on wall clock time of app start
* Encode shard ID in the, RP segement ID, to ensure RP:s and segement names
  are unique per shard
This commit is contained in:
Calle Wilund
2015-08-18 14:07:20 +02:00
parent 4ac07fa87d
commit ea38b223bd
2 changed files with 92 additions and 23 deletions

View File

@@ -40,6 +40,7 @@
#include "db/config.hh"
#include "utils/data_input.hh"
#include "utils/crc.hh"
#include "utils/runtime.hh"
#include "log.hh"
static logging::logger logger("commitlog");
@@ -82,9 +83,12 @@ public:
descriptor(descriptor&&) = default;
descriptor(const descriptor&) = default;
// TODO : version management
descriptor(uint64_t i, uint32_t v = 1)
descriptor(segment_id_type i, uint32_t v = 1)
: id(i), ver(v) {
}
descriptor(replay_position p)
: descriptor(p.id) {
}
descriptor(std::pair<uint64_t, uint32_t> p)
: descriptor(p.first, p.second) {
}
@@ -102,7 +106,7 @@ public:
throw std::domain_error("Commitlog segment is too old to open; upgrade to 1.2.5+ first");
}
uint64_t id = std::stoull(m[3].str().substr(1));
segment_id_type id = std::stoull(m[3].str().substr(1));
uint32_t ver = std::stoul(m[2].str());
return std::make_pair(id, ver);
@@ -114,7 +118,11 @@ public:
+ std::to_string(id) + FILENAME_EXTENSION;
}
const uint64_t id;
operator replay_position() const {
return replay_position(id);
}
const segment_id_type id;
const uint32_t ver;
};
@@ -158,8 +166,7 @@ public:
{
assert(max_size > 0);
if (cfg.commit_log_location.empty()) {
cfg.commit_log_location = "/tmp/urchin/commitlog/"
+ std::to_string(engine().cpu_id());
cfg.commit_log_location = "/var/lib/scylla/commitlog";
}
_regs = create_counters();
}
@@ -206,8 +213,10 @@ public:
buffer_type acquire_buffer(size_t s);
void release_buffer(buffer_type&&);
future<std::vector<descriptor>> list_descriptors(sstring dir);
private:
uint64_t _ids = 0;
segment_id_type _ids = 0;
std::vector<sseg_ptr> _segments;
std::vector<buffer_type> _temp_buffers;
timer<clock_type> _timer;
@@ -547,34 +556,68 @@ public:
const size_t db::commitlog::segment::default_size;
future<> db::commitlog::segment_manager::init() {
future<std::vector<db::commitlog::descriptor>>
db::commitlog::segment_manager::list_descriptors(sstring dirname) {
struct helper {
sstring _dirname;
file _file;
subscription<directory_entry> _list;
std::vector<db::commitlog::descriptor> _result;
helper(segment_manager * m, file && f)
: _file(std::move(f)), _list(
helper(helper&&) = default;
helper(sstring n, file && f)
: _dirname(std::move(n)), _file(std::move(f)), _list(
_file.list_directory(
std::bind(&segment_manager::process, m,
std::bind(&helper::process, this,
std::placeholders::_1))) {
}
future<> process(directory_entry de) {
auto entry_type = [this](const directory_entry & de) {
if (!de.type && !de.name.empty()) {
return engine().file_type(_dirname + "/" + de.name);
}
return make_ready_future<std::experimental::optional<directory_entry_type>>(de.type);
};
return entry_type(de).then([this, de](auto type) {
if (type == directory_entry_type::regular) {
try {
_result.emplace_back(de.name);
} catch (std::domain_error &) {
}
}
return make_ready_future<>();
});
}
future<> done() {
return _list.done();
}
};
return engine().open_directory(cfg.commit_log_location).then([this](auto dir) {
// keep sub alive...
auto h = make_lw_shared<helper>(this, std::move(dir));
return h->done().then([this, h]() {
// nothing really. just keeping sub alive
if (cfg.mode != sync_mode::BATCH) {
_timer.set_callback(std::bind(&segment_manager::sync, this));
this->arm();
}
});
});
return engine().open_directory(dirname).then([this, dirname](auto dir) {
auto h = make_lw_shared<helper>(std::move(dirname), std::move(dir));
return h->done().then([h]() {
return make_ready_future<std::vector<db::commitlog::descriptor>>(std::move(h->_result));
}).finally([h] {});
});
}
future<> db::commitlog::segment_manager::init() {
return list_descriptors(cfg.commit_log_location).then([this](auto descs) {
segment_id_type id = std::chrono::duration_cast<std::chrono::milliseconds>(runtime::get_boot_time().time_since_epoch()).count() + 1;
for (auto& d : descs) {
id = std::max(id, replay_position(d.id).base_id());
}
// base id counter is [ <shard> | <base> ]
_ids = replay_position(engine().cpu_id(), id).id;
if (cfg.mode != sync_mode::BATCH) {
_timer.set_callback(std::bind(&segment_manager::sync, this));
this->arm();
}
});
}
scollectd::registrations db::commitlog::segment_manager::create_counters() {
@@ -698,7 +741,7 @@ std::ostream& db::operator<<(std::ostream& out, const db::commitlog::segment::cf
}
std::ostream& db::operator<<(std::ostream& out, const db::replay_position& p) {
return out << "{" << p.id << ", " << p.pos << "}";
return out << "{" << p.shard_id() << ", " << p.base_id() << ", " << p.pos << "}";
}
void db::commitlog::segment_manager::discard_unused_segments() {