mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-31 20:16:43 +00:00
sstables: extend compaction for new resharding
Extends compaction for new resharding algorithm. Not wired yet. New resharding will compact shared sstable(s) and create one sstable for each owner. It's up to the caller to open these new unshared sstables at their respective column families. This new approach will save a lot of bandwidth because we'll no longer read the entire shared sstable #smp::count times. Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
@@ -198,6 +198,22 @@ protected:
|
||||
uint64_t estimated_sstables = std::max(1UL, uint64_t(ceil(double(_info->start_size) / _max_sstable_size)));
|
||||
return ceil(double(_estimated_partitions) / estimated_sstables);
|
||||
}
|
||||
|
||||
void setup_new_sstable(shared_sstable& sst) {
|
||||
_info->new_sstables.push_back(sst);
|
||||
sst->get_metadata_collector().set_replay_position(_rp);
|
||||
sst->get_metadata_collector().sstable_level(_sstable_level);
|
||||
for (auto ancestor : _ancestors) {
|
||||
sst->add_ancestor(ancestor);
|
||||
}
|
||||
}
|
||||
|
||||
void finish_new_sstable(stdx::optional<sstable_writer>& writer, shared_sstable& sst) {
|
||||
writer->consume_end_of_stream();
|
||||
writer = stdx::nullopt;
|
||||
sst->open_data().get0();
|
||||
_info->end_size += sst->data_size();
|
||||
}
|
||||
public:
|
||||
compaction& operator=(const compaction&) = delete;
|
||||
compaction(const compaction&) = delete;
|
||||
@@ -283,12 +299,7 @@ private:
|
||||
virtual sstable_writer* select_sstable_writer(const dht::decorated_key& dk) {
|
||||
if (!_writer) {
|
||||
_sst = _creator();
|
||||
_info->new_sstables.push_back(_sst);
|
||||
_sst->get_metadata_collector().set_replay_position(_rp);
|
||||
_sst->get_metadata_collector().sstable_level(_sstable_level);
|
||||
for (auto ancestor : _ancestors) {
|
||||
_sst->add_ancestor(ancestor);
|
||||
}
|
||||
setup_new_sstable(_sst);
|
||||
|
||||
auto&& priority = service::get_local_compaction_priority();
|
||||
sstable_writer_config cfg;
|
||||
@@ -299,10 +310,7 @@ private:
|
||||
}
|
||||
|
||||
virtual void stop_sstable_writer() {
|
||||
_writer->consume_end_of_stream();
|
||||
_writer = stdx::nullopt;
|
||||
_sst->open_data().get0();
|
||||
_info->end_size += _sst->data_size();
|
||||
finish_new_sstable(_writer, _sst);
|
||||
}
|
||||
|
||||
virtual void finish_sstable_writer() {
|
||||
@@ -417,6 +425,66 @@ public:
|
||||
};
|
||||
|
||||
|
||||
class resharding_compaction final : public compaction {
|
||||
std::vector<std::pair<shared_sstable, stdx::optional<sstable_writer>>> _output_sstables;
|
||||
shard_id _shard; // shard of current sstable writer
|
||||
std::function<shared_sstable(shard_id)> _sstable_creator;
|
||||
public:
|
||||
resharding_compaction(std::vector<shared_sstable> sstables, column_family& cf, std::function<shared_sstable(shard_id)> creator,
|
||||
uint64_t max_sstable_size, uint32_t sstable_level)
|
||||
: compaction(cf, std::move(sstables), {}, max_sstable_size, sstable_level)
|
||||
, _output_sstables(smp::count)
|
||||
, _sstable_creator(std::move(creator))
|
||||
{
|
||||
}
|
||||
|
||||
void report_start(const sstring& formatted_msg) const override {
|
||||
logger.info("Resharding {}", formatted_msg);
|
||||
}
|
||||
|
||||
void report_finish(const sstring& formatted_msg, std::chrono::time_point<db_clock> ended_at) const override {
|
||||
logger.info("Resharded {}", formatted_msg);
|
||||
}
|
||||
|
||||
std::function<api::timestamp_type(const dht::decorated_key&)> max_purgeable_func() override {
|
||||
return [] (const dht::decorated_key& dk) {
|
||||
return api::min_timestamp;
|
||||
};
|
||||
}
|
||||
|
||||
sstable_writer* select_sstable_writer(const dht::decorated_key& dk) override {
|
||||
_shard = dht::shard_of(dk.token());
|
||||
auto& sst = _output_sstables[_shard].first;
|
||||
auto& writer = _output_sstables[_shard].second;
|
||||
|
||||
if (!writer) {
|
||||
sst = _sstable_creator(_shard);
|
||||
setup_new_sstable(sst);
|
||||
|
||||
sstable_writer_config cfg;
|
||||
cfg.max_sstable_size = _max_sstable_size;
|
||||
auto&& priority = service::get_local_compaction_priority();
|
||||
writer.emplace(sst->get_writer(*_cf.schema(), partitions_per_sstable(), cfg, priority, _shard));
|
||||
}
|
||||
return &*writer;
|
||||
}
|
||||
|
||||
void stop_sstable_writer() override {
|
||||
auto& sst = _output_sstables[_shard].first;
|
||||
auto& writer = _output_sstables[_shard].second;
|
||||
|
||||
finish_new_sstable(writer, sst);
|
||||
}
|
||||
|
||||
void finish_sstable_writer() override {
|
||||
for (auto& p : _output_sstables) {
|
||||
if (p.second) {
|
||||
finish_new_sstable(p.second, p.first);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
future<std::vector<shared_sstable>> compaction::run(std::unique_ptr<compaction> c) {
|
||||
return seastar::async([c = std::move(c)] () mutable {
|
||||
auto reader = c->setup();
|
||||
@@ -459,6 +527,16 @@ compact_sstables(std::vector<shared_sstable> sstables, column_family& cf, std::f
|
||||
return compaction::run(std::move(c));
|
||||
}
|
||||
|
||||
future<std::vector<shared_sstable>>
|
||||
reshard_sstables(std::vector<shared_sstable> sstables, column_family& cf, std::function<shared_sstable(shard_id)> creator,
|
||||
uint64_t max_sstable_size, uint32_t sstable_level) {
|
||||
if (sstables.empty()) {
|
||||
throw std::runtime_error(sprint("Called resharding with empty set on behalf of {}.{}", cf.schema()->ks_name(), cf.schema()->cf_name()));
|
||||
}
|
||||
auto c = std::make_unique<resharding_compaction>(std::move(sstables), cf, std::move(creator), max_sstable_size, sstable_level);
|
||||
return compaction::run(std::move(c));
|
||||
}
|
||||
|
||||
std::vector<sstables::shared_sstable>
|
||||
get_fully_expired_sstables(column_family& cf, std::vector<sstables::shared_sstable>& compacting, int32_t gc_before) {
|
||||
logger.debug("Checking droppable sstables in {}.{}", cf.schema()->ks_name(), cf.schema()->cf_name());
|
||||
|
||||
@@ -104,6 +104,12 @@ namespace sstables {
|
||||
column_family& cf, std::function<shared_sstable()> creator,
|
||||
uint64_t max_sstable_size, uint32_t sstable_level, bool cleanup = false);
|
||||
|
||||
// Compacts a set of N shared sstables into M sstables. For every shard involved,
|
||||
// i.e. which owns any of the sstables, a new unshared sstable is created.
|
||||
future<std::vector<shared_sstable>> reshard_sstables(std::vector<shared_sstable> sstables,
|
||||
column_family& cf, std::function<shared_sstable(shard_id)> creator,
|
||||
uint64_t max_sstable_size, uint32_t sstable_level);
|
||||
|
||||
// Return the most interesting bucket applying the size-tiered strategy.
|
||||
std::vector<sstables::shared_sstable>
|
||||
size_tiered_most_interesting_bucket(lw_shared_ptr<sstable_list> candidates);
|
||||
|
||||
Reference in New Issue
Block a user