table: Introduce storage snapshot for upcoming tablet streaming

New file streaming for tablets will require integration with compaction
groups. So this patch introduces a way for streaming to take a storage
snapshot of a given tablet using its token range. Memtable is flushed
first, so all data of a tablet can be streamed through its sstables.
The interface is compaction group / tablet agnostic, but user can
easily pick data from a single tablet by using the range in tablet
metadata for a given tablet.

E.g.:

	auto erm = table.get_effective_replication_map();
	auto& tm = erm->get_token_metadata();
	auto tablet_map = tm.tablets().get_tablet_map(table.schema()->id());

	for (auto tid : tablet_map.tablet_ids()) {
		auto tr = tmap.get_token_range(tid);

		auto ssts = co_await table.take_storage_snapshot(tr);

		...
	}

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>

Closes #15128
This commit is contained in:
Raphael S. Carvalho
2023-08-22 18:37:37 -03:00
committed by Tomasz Grabiec
parent 9806bddf75
commit a22f74df00
2 changed files with 48 additions and 0 deletions

View File

@@ -548,6 +548,8 @@ private:
compaction_group* single_compaction_group_if_available() const noexcept;
// Select a compaction group from a given token.
compaction_group& compaction_group_for_token(dht::token token) const noexcept;
// Return ids of compaction groups, present in this shard, that own a particular token range.
std::vector<size_t> compaction_group_ids_for_token_range(dht::token_range tr) const;
// Select a compaction group from a given key.
compaction_group& compaction_group_for_key(partition_key_view key, const schema_ptr& s) const noexcept;
// Select a compaction group from a given sstable based on its token range.
@@ -1171,6 +1173,11 @@ public:
// Returns true if any of the sstables requries cleanup.
bool requires_cleanup(const sstables::sstable_set& set) const;
// Takes snapshot of current storage state (includes memtable and sstables) from
// all compaction groups that overlap with a given token range. The output is
// a list of SSTables that represent the snapshot.
future<sstables::sstable_list> take_storage_snapshot(dht::token_range tr);
friend class compaction_group;
};

View File

@@ -628,6 +628,23 @@ compaction_group& table::compaction_group_for_token(dht::token token) const noex
return ret;
}
std::vector<size_t> table::compaction_group_ids_for_token_range(dht::token_range tr) const {
std::vector<size_t> ret;
auto cmp = dht::token_comparator();
size_t candidate_start = tr.start() ? compaction_group_for_token(tr.start()->value()).group_id() : size_t(0);
size_t candidate_end = tr.end() ? compaction_group_for_token(tr.end()->value()).group_id() : (_compaction_groups.size() - 1);
while (candidate_start <= candidate_end) {
auto& cg = _compaction_groups[candidate_start++];
if (cg && tr.overlaps(cg->token_range(), cmp)) {
ret.push_back(cg->group_id());
}
}
return ret;
}
compaction_group& table::compaction_group_for_key(partition_key_view key, const schema_ptr& s) const noexcept {
// fast path when table owns a single compaction group, to avoid overhead of calculating token.
if (auto cg = single_compaction_group_if_available()) {
@@ -652,6 +669,30 @@ future<> table::parallel_foreach_compaction_group(std::function<future<>(compact
});
}
future<sstables::sstable_list> table::take_storage_snapshot(dht::token_range tr) {
sstables::sstable_list ret;
for (auto cg_id : compaction_group_ids_for_token_range(tr)) {
auto& cg = _compaction_groups[cg_id];
// We don't care about sstables in snapshot being unlinked, as the file
// descriptors remain opened until last reference to them are gone.
// Also, we should be careful with taking a deletion lock here as a
// deadlock might occur due to memtable flush backpressure waiting on
// compaction to reduce the backlog.
co_await cg->flush();
auto all_sstables = cg->make_compound_sstable_set();
all_sstables->for_each_sstable([&ret] (const sstables::shared_sstable& sst) mutable {
ret.insert(sst);
});
}
co_return ret;
}
void table::update_stats_for_new_sstable(const sstables::shared_sstable& sst) noexcept {
_stats.live_disk_space_used += sst->bytes_on_disk();
_stats.total_disk_space_used += sst->bytes_on_disk();