table: Wire compound sstable set

From now own, _sstables  becomes the compound set, and _main_sstables refer
only to the main sstables of the table. In the near future, maintenance
set will be introduced and will also be managed by the compound set.

So add_sstable() and on_compaction_completion() are changed to
explicitly insert and remove sstables from the main set.

By storing compound set in _sstables, functions which used _sstables for
creating reader, computing statistics, etc, will not have to be changed
when we introduce the maintenance set, so code change is a lot minimized
by this approach.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2021-03-11 15:58:34 -03:00
parent 42b309b43e
commit 1e7a444a8b
3 changed files with 36 additions and 13 deletions

View File

@@ -426,7 +426,9 @@ private:
lw_shared_ptr<memtable_list> make_memtable_list();
sstables::compaction_strategy _compaction_strategy;
// generation -> sstable. Ordered by key so we can easily get the most recent.
// SSTable set which contains all non-maintenance sstables
lw_shared_ptr<sstables::sstable_set> _main_sstables;
// Compound set which manages all the SSTable sets (e.g. main, etc) and allow their operations to be combined
lw_shared_ptr<sstables::sstable_set> _sstables;
// sstables that have been compacted (so don't look up in query) but
// have not been deleted yet, so must not GC any tombstones in other sstables
@@ -596,6 +598,10 @@ private:
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) const;
lw_shared_ptr<sstables::sstable_set> make_compound_sstable_set();
// Compound sstable set must be refreshed whenever any of its managed sets are changed
void refresh_compound_sstable_set();
snapshot_source sstables_as_snapshot_source();
partition_presence_checker make_partition_presence_checker(lw_shared_ptr<sstables::sstable_set>);
std::chrono::steady_clock::time_point _sstable_writes_disabled_at;

View File

@@ -113,6 +113,14 @@ table::make_sstable_reader(schema_ptr s,
return make_restricted_flat_reader(std::move(ms), std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr);
}
lw_shared_ptr<sstables::sstable_set> table::make_compound_sstable_set() {
return make_lw_shared(sstables::make_compound_sstable_set(_schema, { _main_sstables }));
}
void table::refresh_compound_sstable_set() {
_sstables = make_compound_sstable_set();
}
// Exposed for testing, not performance critical.
future<table::const_mutation_partition_ptr>
table::find_partition(schema_ptr s, reader_permit permit, const dht::decorated_key& key) const {
@@ -380,7 +388,8 @@ table::do_add_sstable(lw_shared_ptr<sstables::sstable_set> sstables, sstables::s
}
void table::add_sstable(sstables::shared_sstable sstable) {
_sstables = do_add_sstable(_sstables, std::move(sstable), enable_backlog_tracker::yes);
_main_sstables = do_add_sstable(_main_sstables, std::move(sstable), enable_backlog_tracker::yes);
refresh_compound_sstable_set();
}
future<>
@@ -641,7 +650,8 @@ table::stop() {
return _compaction_manager.remove(this).then([this] {
return _sstable_deletion_gate.close().then([this] {
return get_row_cache().invalidate(row_cache::external_updater([this] {
_sstables = _compaction_strategy.make_sstable_set(_schema);
_main_sstables = _compaction_strategy.make_sstable_set(_schema);
_sstables = make_compound_sstable_set();
_sstables_staging.clear();
})).then([this] {
_cache.refresh_snapshot();
@@ -723,7 +733,7 @@ void table::rebuild_statistics() {
future<lw_shared_ptr<sstables::sstable_set>>
table::build_new_sstable_list(const std::vector<sstables::shared_sstable>& new_sstables,
const std::vector<sstables::shared_sstable>& old_sstables) {
auto current_sstables = _sstables;
auto current_sstables = _main_sstables;
auto new_sstable_list = _compaction_strategy.make_sstable_set(_schema);
std::unordered_set<sstables::shared_sstable> s(old_sstables.begin(), old_sstables.end());
@@ -792,7 +802,8 @@ table::on_compaction_completion(sstables::compaction_completion_desc& desc) {
_new_sstables = co_await _t.build_new_sstable_list(_desc.new_sstables, _desc.old_sstables);
}
virtual void execute() override {
_t._sstables = std::move(_new_sstables);
_t._main_sstables = std::move(_new_sstables);
_t.refresh_compound_sstable_set();
}
static std::unique_ptr<row_cache::external_updater_impl> make(table& t, sstables::compaction_completion_desc& d) {
return std::make_unique<sstable_list_updater>(t, d);
@@ -903,7 +914,7 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
_compaction_strategy.get_backlog_tracker().transfer_ongoing_charges(new_cs.get_backlog_tracker(), move_read_charges);
auto new_sstables = new_cs.make_sstable_set(_schema);
_sstables->for_each_sstable([&] (const sstables::shared_sstable& s) {
_main_sstables->for_each_sstable([&] (const sstables::shared_sstable& s) {
add_sstable_to_backlog_tracker(new_cs.get_backlog_tracker(), s);
new_sstables.insert(s);
});
@@ -914,7 +925,8 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
// now exception safe:
_compaction_strategy = std::move(new_cs);
_sstables = std::move(new_sstables);
_main_sstables = std::move(new_sstables);
refresh_compound_sstable_set();
}
size_t table::sstables_count() const {
@@ -965,7 +977,8 @@ future<std::unordered_set<sstring>> table::get_sstables_by_partition_key(const s
}
const sstables::sstable_set& table::get_sstable_set() const {
return *_sstables;
// main sstables is enough for the outside world. sstables in other set like maintenance is not needed even for expiration purposes in compaction
return *_main_sstables;
}
lw_shared_ptr<const sstable_list> table::get_sstables() const {
@@ -1031,7 +1044,8 @@ table::table(schema_ptr schema, config config, db::commitlog* cl, compaction_man
)
, _memtables(_config.enable_disk_writes ? make_memtable_list() : make_memory_only_memtable_list())
, _compaction_strategy(make_compaction_strategy(_schema->compaction_strategy(), _schema->compaction_strategy_options()))
, _sstables(make_lw_shared<sstables::sstable_set>(_compaction_strategy.make_sstable_set(_schema)))
, _main_sstables(make_lw_shared<sstables::sstable_set>(_compaction_strategy.make_sstable_set(_schema)))
, _sstables(make_compound_sstable_set())
, _cache(_schema, sstables_as_snapshot_source(), row_cache_tracker, is_continuous::yes)
, _commitlog(cl)
, _durable_writes(true)
@@ -1379,9 +1393,10 @@ future<db::replay_position> table::discard_sstables(db_clock::time_point truncat
pruned->insert(p);
});
};
prune(pruned, cf._sstables, enable_backlog_tracker::yes);
prune(pruned, cf._main_sstables, enable_backlog_tracker::yes);
cf._sstables = std::move(pruned);
cf._main_sstables = std::move(pruned);
cf.refresh_compound_sstable_set();
}
};
auto p = make_lw_shared<pruner>(*this);

View File

@@ -46,13 +46,15 @@ public:
column_family_test(lw_shared_ptr<column_family> cf) : _cf(cf) {}
void add_sstable(sstables::shared_sstable sstable) {
_cf->_sstables->insert(std::move(sstable));
_cf->_main_sstables->insert(std::move(sstable));
_cf->refresh_compound_sstable_set();
}
// NOTE: must run in a thread
void rebuild_sstable_list(const std::vector<sstables::shared_sstable>& new_sstables,
const std::vector<sstables::shared_sstable>& sstables_to_remove) {
_cf->_sstables = _cf->build_new_sstable_list(new_sstables, sstables_to_remove).get0();
_cf->_main_sstables = _cf->build_new_sstable_list(new_sstables, sstables_to_remove).get0();
_cf->refresh_compound_sstable_set();
}
static void update_sstables_known_generation(column_family& cf, unsigned generation) {