diff --git a/sstables_loader.cc b/sstables_loader.cc index 1cc3a227ff..7dc61179e7 100644 --- a/sstables_loader.cc +++ b/sstables_loader.cc @@ -118,6 +118,8 @@ public: } // anonymous namespace +using primary_replica_only = bool_class; + class sstable_streamer { protected: netw::messaging_service& _ms; @@ -125,13 +127,16 @@ protected: replica::table& _table; locator::effective_replication_map_ptr _erm; std::vector _sstables; + const primary_replica_only _primary_replica_only; public: - sstable_streamer(netw::messaging_service& ms, replica::database& db, ::table_id table_id, std::vector sstables) + sstable_streamer(netw::messaging_service& ms, replica::database& db, ::table_id table_id, std::vector sstables, primary_replica_only primary) : _ms(ms) , _db(db) , _table(db.find_column_family(table_id)) , _erm(_table.get_effective_replication_map()) - , _sstables(std::move(sstables)) { + , _sstables(std::move(sstables)) + , _primary_replica_only(primary) + { // By sorting SSTables by their primary key, we allow SSTable runs to be // incrementally streamed. // Overlapping run fragments can have their content deduplicated, reducing @@ -145,19 +150,19 @@ public: virtual ~sstable_streamer() {} - virtual future<> stream(bool primary_replica_only); - inet_address_vector_replica_set get_endpoints(const dht::token& token, bool primary_replica_only) const; + virtual future<> stream(); + inet_address_vector_replica_set get_endpoints(const dht::token& token) const; protected: virtual inet_address_vector_replica_set get_primary_endpoints(const dht::token& token) const; - future<> stream_sstables(const dht::partition_range&, std::vector, bool primary_replica_only); - future<> stream_sstable_mutations(const dht::partition_range&, std::vector, bool primary_replica_only); + future<> stream_sstables(const dht::partition_range&, std::vector); + future<> stream_sstable_mutations(const dht::partition_range&, std::vector); }; class tablet_sstable_streamer : public sstable_streamer { const locator::tablet_map& _tablet_map; public: - tablet_sstable_streamer(netw::messaging_service& ms, replica::database& db, ::table_id table_id, std::vector sstables) - : sstable_streamer(ms, db, table_id, std::move(sstables)) + tablet_sstable_streamer(netw::messaging_service& ms, replica::database& db, ::table_id table_id, std::vector sstables, primary_replica_only primary) + : sstable_streamer(ms, db, table_id, std::move(sstables), primary) , _tablet_map(_erm->get_token_metadata().tablets().get_tablet_map(table_id)) { } @@ -171,18 +176,17 @@ public: return result; } - virtual future<> stream(bool primary_replica_only) override; + virtual future<> stream() override; virtual inet_address_vector_replica_set get_primary_endpoints(const dht::token& token) const override; private: - future<> stream_fully_contained_sstables(const dht::partition_range& pr, std::vector sstables, - bool primary_replica_only) { + future<> stream_fully_contained_sstables(const dht::partition_range& pr, std::vector sstables) { // FIXME: fully contained sstables can be optimized. - return stream_sstables(pr, std::move(sstables), primary_replica_only); + return stream_sstables(pr, std::move(sstables)); } }; -inet_address_vector_replica_set sstable_streamer::get_endpoints(const dht::token& token, bool primary_replica_only) const { - if (primary_replica_only) { +inet_address_vector_replica_set sstable_streamer::get_endpoints(const dht::token& token) const { + if (_primary_replica_only) { return get_primary_endpoints(token); } auto current_targets = _erm->get_natural_endpoints_without_node_being_replaced(token); @@ -203,13 +207,13 @@ inet_address_vector_replica_set tablet_sstable_streamer::get_primary_endpoints(c return to_replica_set(replicas); } -future<> sstable_streamer::stream(bool primary_replica_only) { +future<> sstable_streamer::stream() { const auto full_partition_range = dht::partition_range::make_open_ended_both_sides(); - co_await stream_sstables(full_partition_range, std::move(_sstables), primary_replica_only); + co_await stream_sstables(full_partition_range, std::move(_sstables)); } -future<> tablet_sstable_streamer::stream(bool primary_replica_only) { +future<> tablet_sstable_streamer::stream() { // sstables are sorted by first key in reverse order. auto sstable_it = _sstables.rbegin(); @@ -249,13 +253,12 @@ future<> tablet_sstable_streamer::stream(bool primary_replica_only) { } auto tablet_pr = dht::to_partition_range(tablet_range); - co_await stream_sstables(tablet_pr, std::move(sstables_partially_contained), primary_replica_only); - co_await stream_fully_contained_sstables(tablet_pr, std::move(sstables_fully_contained), primary_replica_only); + co_await stream_sstables(tablet_pr, std::move(sstables_partially_contained)); + co_await stream_fully_contained_sstables(tablet_pr, std::move(sstables_fully_contained)); } } -future<> sstable_streamer::stream_sstables(const dht::partition_range& pr, std::vector sstables, - bool primary_replica_only) { +future<> sstable_streamer::stream_sstables(const dht::partition_range& pr, std::vector sstables) { while (!sstables.empty()) { size_t batch_sst_nr = 16; std::vector sst_processed; @@ -266,12 +269,11 @@ future<> sstable_streamer::stream_sstables(const dht::partition_range& pr, std:: sstables.pop_back(); } - co_await stream_sstable_mutations(pr, std::move(sst_processed), primary_replica_only); + co_await stream_sstable_mutations(pr, std::move(sst_processed)); } } -future<> sstable_streamer::stream_sstable_mutations(const dht::partition_range& pr, std::vector sstables, - bool primary_replica_only) { +future<> sstable_streamer::stream_sstable_mutations(const dht::partition_range& pr, std::vector sstables) { const auto token_range = pr.transform(std::mem_fn(&dht::ring_position::token)); auto s = _table.schema(); const auto cf_id = s->id(); @@ -312,7 +314,7 @@ future<> sstable_streamer::stream_sstable_mutations(const dht::partition_range& auto& start = mf->as_partition_start(); const auto& current_dk = start.key(); - current_targets = get_endpoints(current_dk.token(), primary_replica_only); + current_targets = get_endpoints(current_dk.token()); llog.trace("load_and_stream: ops_uuid={}, current_dk={}, current_targets={}", ops_uuid, current_dk.token(), current_targets); for (auto& node : current_targets) { @@ -389,13 +391,13 @@ static std::unique_ptr make_sstable_streamer(bool uses_tablets } future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name, - ::table_id table_id, std::vector sstables, bool primary_replica_only) { + ::table_id table_id, std::vector sstables, bool primary) { // streamer guarantees topology stability, for correctness, by holding effective_replication_map // throughout its lifetime. auto streamer = make_sstable_streamer(_db.local().find_column_family(table_id).uses_tablets(), - _messaging, _db.local(), table_id, std::move(sstables)); + _messaging, _db.local(), table_id, std::move(sstables), primary_replica_only(primary)); - co_await streamer->stream(primary_replica_only); + co_await streamer->stream(); } // For more details, see distributed_loader::process_upload_dir().