table: Wire compound sstable set
From now own, _sstables becomes the compound set, and _main_sstables refer only to the main sstables of the table. In the near future, maintenance set will be introduced and will also be managed by the compound set. So add_sstable() and on_compaction_completion() are changed to explicitly insert and remove sstables from the main set. By storing compound set in _sstables, functions which used _sstables for creating reader, computing statistics, etc, will not have to be changed when we introduce the maintenance set, so code change is a lot minimized by this approach. Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
@@ -426,7 +426,9 @@ private:
|
||||
lw_shared_ptr<memtable_list> make_memtable_list();
|
||||
|
||||
sstables::compaction_strategy _compaction_strategy;
|
||||
// generation -> sstable. Ordered by key so we can easily get the most recent.
|
||||
// SSTable set which contains all non-maintenance sstables
|
||||
lw_shared_ptr<sstables::sstable_set> _main_sstables;
|
||||
// Compound set which manages all the SSTable sets (e.g. main, etc) and allow their operations to be combined
|
||||
lw_shared_ptr<sstables::sstable_set> _sstables;
|
||||
// sstables that have been compacted (so don't look up in query) but
|
||||
// have not been deleted yet, so must not GC any tombstones in other sstables
|
||||
@@ -596,6 +598,10 @@ private:
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) const;
|
||||
|
||||
lw_shared_ptr<sstables::sstable_set> make_compound_sstable_set();
|
||||
// Compound sstable set must be refreshed whenever any of its managed sets are changed
|
||||
void refresh_compound_sstable_set();
|
||||
|
||||
snapshot_source sstables_as_snapshot_source();
|
||||
partition_presence_checker make_partition_presence_checker(lw_shared_ptr<sstables::sstable_set>);
|
||||
std::chrono::steady_clock::time_point _sstable_writes_disabled_at;
|
||||
|
||||
35
table.cc
35
table.cc
@@ -113,6 +113,14 @@ table::make_sstable_reader(schema_ptr s,
|
||||
return make_restricted_flat_reader(std::move(ms), std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
}
|
||||
|
||||
lw_shared_ptr<sstables::sstable_set> table::make_compound_sstable_set() {
|
||||
return make_lw_shared(sstables::make_compound_sstable_set(_schema, { _main_sstables }));
|
||||
}
|
||||
|
||||
void table::refresh_compound_sstable_set() {
|
||||
_sstables = make_compound_sstable_set();
|
||||
}
|
||||
|
||||
// Exposed for testing, not performance critical.
|
||||
future<table::const_mutation_partition_ptr>
|
||||
table::find_partition(schema_ptr s, reader_permit permit, const dht::decorated_key& key) const {
|
||||
@@ -380,7 +388,8 @@ table::do_add_sstable(lw_shared_ptr<sstables::sstable_set> sstables, sstables::s
|
||||
}
|
||||
|
||||
void table::add_sstable(sstables::shared_sstable sstable) {
|
||||
_sstables = do_add_sstable(_sstables, std::move(sstable), enable_backlog_tracker::yes);
|
||||
_main_sstables = do_add_sstable(_main_sstables, std::move(sstable), enable_backlog_tracker::yes);
|
||||
refresh_compound_sstable_set();
|
||||
}
|
||||
|
||||
future<>
|
||||
@@ -641,7 +650,8 @@ table::stop() {
|
||||
return _compaction_manager.remove(this).then([this] {
|
||||
return _sstable_deletion_gate.close().then([this] {
|
||||
return get_row_cache().invalidate(row_cache::external_updater([this] {
|
||||
_sstables = _compaction_strategy.make_sstable_set(_schema);
|
||||
_main_sstables = _compaction_strategy.make_sstable_set(_schema);
|
||||
_sstables = make_compound_sstable_set();
|
||||
_sstables_staging.clear();
|
||||
})).then([this] {
|
||||
_cache.refresh_snapshot();
|
||||
@@ -723,7 +733,7 @@ void table::rebuild_statistics() {
|
||||
future<lw_shared_ptr<sstables::sstable_set>>
|
||||
table::build_new_sstable_list(const std::vector<sstables::shared_sstable>& new_sstables,
|
||||
const std::vector<sstables::shared_sstable>& old_sstables) {
|
||||
auto current_sstables = _sstables;
|
||||
auto current_sstables = _main_sstables;
|
||||
auto new_sstable_list = _compaction_strategy.make_sstable_set(_schema);
|
||||
|
||||
std::unordered_set<sstables::shared_sstable> s(old_sstables.begin(), old_sstables.end());
|
||||
@@ -792,7 +802,8 @@ table::on_compaction_completion(sstables::compaction_completion_desc& desc) {
|
||||
_new_sstables = co_await _t.build_new_sstable_list(_desc.new_sstables, _desc.old_sstables);
|
||||
}
|
||||
virtual void execute() override {
|
||||
_t._sstables = std::move(_new_sstables);
|
||||
_t._main_sstables = std::move(_new_sstables);
|
||||
_t.refresh_compound_sstable_set();
|
||||
}
|
||||
static std::unique_ptr<row_cache::external_updater_impl> make(table& t, sstables::compaction_completion_desc& d) {
|
||||
return std::make_unique<sstable_list_updater>(t, d);
|
||||
@@ -903,7 +914,7 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
|
||||
_compaction_strategy.get_backlog_tracker().transfer_ongoing_charges(new_cs.get_backlog_tracker(), move_read_charges);
|
||||
|
||||
auto new_sstables = new_cs.make_sstable_set(_schema);
|
||||
_sstables->for_each_sstable([&] (const sstables::shared_sstable& s) {
|
||||
_main_sstables->for_each_sstable([&] (const sstables::shared_sstable& s) {
|
||||
add_sstable_to_backlog_tracker(new_cs.get_backlog_tracker(), s);
|
||||
new_sstables.insert(s);
|
||||
});
|
||||
@@ -914,7 +925,8 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy)
|
||||
|
||||
// now exception safe:
|
||||
_compaction_strategy = std::move(new_cs);
|
||||
_sstables = std::move(new_sstables);
|
||||
_main_sstables = std::move(new_sstables);
|
||||
refresh_compound_sstable_set();
|
||||
}
|
||||
|
||||
size_t table::sstables_count() const {
|
||||
@@ -965,7 +977,8 @@ future<std::unordered_set<sstring>> table::get_sstables_by_partition_key(const s
|
||||
}
|
||||
|
||||
const sstables::sstable_set& table::get_sstable_set() const {
|
||||
return *_sstables;
|
||||
// main sstables is enough for the outside world. sstables in other set like maintenance is not needed even for expiration purposes in compaction
|
||||
return *_main_sstables;
|
||||
}
|
||||
|
||||
lw_shared_ptr<const sstable_list> table::get_sstables() const {
|
||||
@@ -1031,7 +1044,8 @@ table::table(schema_ptr schema, config config, db::commitlog* cl, compaction_man
|
||||
)
|
||||
, _memtables(_config.enable_disk_writes ? make_memtable_list() : make_memory_only_memtable_list())
|
||||
, _compaction_strategy(make_compaction_strategy(_schema->compaction_strategy(), _schema->compaction_strategy_options()))
|
||||
, _sstables(make_lw_shared<sstables::sstable_set>(_compaction_strategy.make_sstable_set(_schema)))
|
||||
, _main_sstables(make_lw_shared<sstables::sstable_set>(_compaction_strategy.make_sstable_set(_schema)))
|
||||
, _sstables(make_compound_sstable_set())
|
||||
, _cache(_schema, sstables_as_snapshot_source(), row_cache_tracker, is_continuous::yes)
|
||||
, _commitlog(cl)
|
||||
, _durable_writes(true)
|
||||
@@ -1379,9 +1393,10 @@ future<db::replay_position> table::discard_sstables(db_clock::time_point truncat
|
||||
pruned->insert(p);
|
||||
});
|
||||
};
|
||||
prune(pruned, cf._sstables, enable_backlog_tracker::yes);
|
||||
prune(pruned, cf._main_sstables, enable_backlog_tracker::yes);
|
||||
|
||||
cf._sstables = std::move(pruned);
|
||||
cf._main_sstables = std::move(pruned);
|
||||
cf.refresh_compound_sstable_set();
|
||||
}
|
||||
};
|
||||
auto p = make_lw_shared<pruner>(*this);
|
||||
|
||||
@@ -46,13 +46,15 @@ public:
|
||||
column_family_test(lw_shared_ptr<column_family> cf) : _cf(cf) {}
|
||||
|
||||
void add_sstable(sstables::shared_sstable sstable) {
|
||||
_cf->_sstables->insert(std::move(sstable));
|
||||
_cf->_main_sstables->insert(std::move(sstable));
|
||||
_cf->refresh_compound_sstable_set();
|
||||
}
|
||||
|
||||
// NOTE: must run in a thread
|
||||
void rebuild_sstable_list(const std::vector<sstables::shared_sstable>& new_sstables,
|
||||
const std::vector<sstables::shared_sstable>& sstables_to_remove) {
|
||||
_cf->_sstables = _cf->build_new_sstable_list(new_sstables, sstables_to_remove).get0();
|
||||
_cf->_main_sstables = _cf->build_new_sstable_list(new_sstables, sstables_to_remove).get0();
|
||||
_cf->refresh_compound_sstable_set();
|
||||
}
|
||||
|
||||
static void update_sstables_known_generation(column_family& cf, unsigned generation) {
|
||||
|
||||
Reference in New Issue
Block a user