sstables_loader: Make primary_replica_only bool_class RAII field

This boolean is currently passed all the way around as pure bool
argument. And it's only needed in a single get_endpoints() method that
calculates the target endpoints.

This patch places this bool on class streamer, so that the call chain
arguments are not polluted, and converts it to bool_class.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2024-10-03 09:46:53 +03:00
parent 7a7a1e3558
commit 7eb48358e9

View File

@@ -118,6 +118,8 @@ public:
} // anonymous namespace
using primary_replica_only = bool_class<struct primary_replica_only_tag>;
class sstable_streamer {
protected:
netw::messaging_service& _ms;
@@ -125,13 +127,16 @@ protected:
replica::table& _table;
locator::effective_replication_map_ptr _erm;
std::vector<sstables::shared_sstable> _sstables;
const primary_replica_only _primary_replica_only;
public:
sstable_streamer(netw::messaging_service& ms, replica::database& db, ::table_id table_id, std::vector<sstables::shared_sstable> sstables)
sstable_streamer(netw::messaging_service& ms, replica::database& db, ::table_id table_id, std::vector<sstables::shared_sstable> sstables, primary_replica_only primary)
: _ms(ms)
, _db(db)
, _table(db.find_column_family(table_id))
, _erm(_table.get_effective_replication_map())
, _sstables(std::move(sstables)) {
, _sstables(std::move(sstables))
, _primary_replica_only(primary)
{
// By sorting SSTables by their primary key, we allow SSTable runs to be
// incrementally streamed.
// Overlapping run fragments can have their content deduplicated, reducing
@@ -145,19 +150,19 @@ public:
virtual ~sstable_streamer() {}
virtual future<> stream(bool primary_replica_only);
inet_address_vector_replica_set get_endpoints(const dht::token& token, bool primary_replica_only) const;
virtual future<> stream();
inet_address_vector_replica_set get_endpoints(const dht::token& token) const;
protected:
virtual inet_address_vector_replica_set get_primary_endpoints(const dht::token& token) const;
future<> stream_sstables(const dht::partition_range&, std::vector<sstables::shared_sstable>, bool primary_replica_only);
future<> stream_sstable_mutations(const dht::partition_range&, std::vector<sstables::shared_sstable>, bool primary_replica_only);
future<> stream_sstables(const dht::partition_range&, std::vector<sstables::shared_sstable>);
future<> stream_sstable_mutations(const dht::partition_range&, std::vector<sstables::shared_sstable>);
};
class tablet_sstable_streamer : public sstable_streamer {
const locator::tablet_map& _tablet_map;
public:
tablet_sstable_streamer(netw::messaging_service& ms, replica::database& db, ::table_id table_id, std::vector<sstables::shared_sstable> sstables)
: sstable_streamer(ms, db, table_id, std::move(sstables))
tablet_sstable_streamer(netw::messaging_service& ms, replica::database& db, ::table_id table_id, std::vector<sstables::shared_sstable> sstables, primary_replica_only primary)
: sstable_streamer(ms, db, table_id, std::move(sstables), primary)
, _tablet_map(_erm->get_token_metadata().tablets().get_tablet_map(table_id)) {
}
@@ -171,18 +176,17 @@ public:
return result;
}
virtual future<> stream(bool primary_replica_only) override;
virtual future<> stream() override;
virtual inet_address_vector_replica_set get_primary_endpoints(const dht::token& token) const override;
private:
future<> stream_fully_contained_sstables(const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables,
bool primary_replica_only) {
future<> stream_fully_contained_sstables(const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables) {
// FIXME: fully contained sstables can be optimized.
return stream_sstables(pr, std::move(sstables), primary_replica_only);
return stream_sstables(pr, std::move(sstables));
}
};
inet_address_vector_replica_set sstable_streamer::get_endpoints(const dht::token& token, bool primary_replica_only) const {
if (primary_replica_only) {
inet_address_vector_replica_set sstable_streamer::get_endpoints(const dht::token& token) const {
if (_primary_replica_only) {
return get_primary_endpoints(token);
}
auto current_targets = _erm->get_natural_endpoints_without_node_being_replaced(token);
@@ -203,13 +207,13 @@ inet_address_vector_replica_set tablet_sstable_streamer::get_primary_endpoints(c
return to_replica_set(replicas);
}
future<> sstable_streamer::stream(bool primary_replica_only) {
future<> sstable_streamer::stream() {
const auto full_partition_range = dht::partition_range::make_open_ended_both_sides();
co_await stream_sstables(full_partition_range, std::move(_sstables), primary_replica_only);
co_await stream_sstables(full_partition_range, std::move(_sstables));
}
future<> tablet_sstable_streamer::stream(bool primary_replica_only) {
future<> tablet_sstable_streamer::stream() {
// sstables are sorted by first key in reverse order.
auto sstable_it = _sstables.rbegin();
@@ -249,13 +253,12 @@ future<> tablet_sstable_streamer::stream(bool primary_replica_only) {
}
auto tablet_pr = dht::to_partition_range(tablet_range);
co_await stream_sstables(tablet_pr, std::move(sstables_partially_contained), primary_replica_only);
co_await stream_fully_contained_sstables(tablet_pr, std::move(sstables_fully_contained), primary_replica_only);
co_await stream_sstables(tablet_pr, std::move(sstables_partially_contained));
co_await stream_fully_contained_sstables(tablet_pr, std::move(sstables_fully_contained));
}
}
future<> sstable_streamer::stream_sstables(const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables,
bool primary_replica_only) {
future<> sstable_streamer::stream_sstables(const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables) {
while (!sstables.empty()) {
size_t batch_sst_nr = 16;
std::vector<sstables::shared_sstable> sst_processed;
@@ -266,12 +269,11 @@ future<> sstable_streamer::stream_sstables(const dht::partition_range& pr, std::
sstables.pop_back();
}
co_await stream_sstable_mutations(pr, std::move(sst_processed), primary_replica_only);
co_await stream_sstable_mutations(pr, std::move(sst_processed));
}
}
future<> sstable_streamer::stream_sstable_mutations(const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables,
bool primary_replica_only) {
future<> sstable_streamer::stream_sstable_mutations(const dht::partition_range& pr, std::vector<sstables::shared_sstable> sstables) {
const auto token_range = pr.transform(std::mem_fn(&dht::ring_position::token));
auto s = _table.schema();
const auto cf_id = s->id();
@@ -312,7 +314,7 @@ future<> sstable_streamer::stream_sstable_mutations(const dht::partition_range&
auto& start = mf->as_partition_start();
const auto& current_dk = start.key();
current_targets = get_endpoints(current_dk.token(), primary_replica_only);
current_targets = get_endpoints(current_dk.token());
llog.trace("load_and_stream: ops_uuid={}, current_dk={}, current_targets={}", ops_uuid,
current_dk.token(), current_targets);
for (auto& node : current_targets) {
@@ -389,13 +391,13 @@ static std::unique_ptr<sstable_streamer> make_sstable_streamer(bool uses_tablets
}
future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name,
::table_id table_id, std::vector<sstables::shared_sstable> sstables, bool primary_replica_only) {
::table_id table_id, std::vector<sstables::shared_sstable> sstables, bool primary) {
// streamer guarantees topology stability, for correctness, by holding effective_replication_map
// throughout its lifetime.
auto streamer = make_sstable_streamer(_db.local().find_column_family(table_id).uses_tablets(),
_messaging, _db.local(), table_id, std::move(sstables));
_messaging, _db.local(), table_id, std::move(sstables), primary_replica_only(primary));
co_await streamer->stream(primary_replica_only);
co_await streamer->stream();
}
// For more details, see distributed_loader::process_upload_dir().