From a22f74df00752fa7a693296fee5c7ea11adf613f Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 22 Aug 2023 18:37:37 -0300 Subject: [PATCH] table: Introduce storage snapshot for upcoming tablet streaming New file streaming for tablets will require integration with compaction groups. So this patch introduces a way for streaming to take a storage snapshot of a given tablet using its token range. Memtable is flushed first, so all data of a tablet can be streamed through its sstables. The interface is compaction group / tablet agnostic, but user can easily pick data from a single tablet by using the range in tablet metadata for a given tablet. E.g.: auto erm = table.get_effective_replication_map(); auto& tm = erm->get_token_metadata(); auto tablet_map = tm.tablets().get_tablet_map(table.schema()->id()); for (auto tid : tablet_map.tablet_ids()) { auto tr = tmap.get_token_range(tid); auto ssts = co_await table.take_storage_snapshot(tr); ... } Signed-off-by: Raphael S. Carvalho Closes #15128 --- replica/database.hh | 7 +++++++ replica/table.cc | 41 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/replica/database.hh b/replica/database.hh index c87bf71db5..f8d1d0451f 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -548,6 +548,8 @@ private: compaction_group* single_compaction_group_if_available() const noexcept; // Select a compaction group from a given token. compaction_group& compaction_group_for_token(dht::token token) const noexcept; + // Return ids of compaction groups, present in this shard, that own a particular token range. + std::vector compaction_group_ids_for_token_range(dht::token_range tr) const; // Select a compaction group from a given key. compaction_group& compaction_group_for_key(partition_key_view key, const schema_ptr& s) const noexcept; // Select a compaction group from a given sstable based on its token range. @@ -1171,6 +1173,11 @@ public: // Returns true if any of the sstables requries cleanup. bool requires_cleanup(const sstables::sstable_set& set) const; + // Takes snapshot of current storage state (includes memtable and sstables) from + // all compaction groups that overlap with a given token range. The output is + // a list of SSTables that represent the snapshot. + future take_storage_snapshot(dht::token_range tr); + friend class compaction_group; }; diff --git a/replica/table.cc b/replica/table.cc index ddc74e07c3..4a351ac00f 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -628,6 +628,23 @@ compaction_group& table::compaction_group_for_token(dht::token token) const noex return ret; } +std::vector table::compaction_group_ids_for_token_range(dht::token_range tr) const { + std::vector ret; + auto cmp = dht::token_comparator(); + + size_t candidate_start = tr.start() ? compaction_group_for_token(tr.start()->value()).group_id() : size_t(0); + size_t candidate_end = tr.end() ? compaction_group_for_token(tr.end()->value()).group_id() : (_compaction_groups.size() - 1); + + while (candidate_start <= candidate_end) { + auto& cg = _compaction_groups[candidate_start++]; + if (cg && tr.overlaps(cg->token_range(), cmp)) { + ret.push_back(cg->group_id()); + } + } + + return ret; +} + compaction_group& table::compaction_group_for_key(partition_key_view key, const schema_ptr& s) const noexcept { // fast path when table owns a single compaction group, to avoid overhead of calculating token. if (auto cg = single_compaction_group_if_available()) { @@ -652,6 +669,30 @@ future<> table::parallel_foreach_compaction_group(std::function(compact }); } +future table::take_storage_snapshot(dht::token_range tr) { + sstables::sstable_list ret; + + for (auto cg_id : compaction_group_ids_for_token_range(tr)) { + auto& cg = _compaction_groups[cg_id]; + + // We don't care about sstables in snapshot being unlinked, as the file + // descriptors remain opened until last reference to them are gone. + // Also, we should be careful with taking a deletion lock here as a + // deadlock might occur due to memtable flush backpressure waiting on + // compaction to reduce the backlog. + + co_await cg->flush(); + + auto all_sstables = cg->make_compound_sstable_set(); + + all_sstables->for_each_sstable([&ret] (const sstables::shared_sstable& sst) mutable { + ret.insert(sst); + }); + } + + co_return ret; +} + void table::update_stats_for_new_sstable(const sstables::shared_sstable& sst) noexcept { _stats.live_disk_space_used += sst->bytes_on_disk(); _stats.total_disk_space_used += sst->bytes_on_disk();