Files
scylladb/sstables_loader.hh
Ernest Zaslavsky 6ef7ad9b5a streaming: refactor get_sstables_for_tablets to make it accessible
Create `get_sstables_for_tablets_for_tests` friend free function
for testing purposes. Adding this free function allows
direct testing without requiring the full streamer context.
2025-12-08 12:30:23 +02:00

172 lines
6.4 KiB
C++

/*
* Copyright (C) 2021-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <vector>
#include <seastar/core/sharded.hh>
#include "dht/i_partitioner_fwd.hh"
#include "dht/token.hh"
#include "schema/schema_fwd.hh"
#include "sstables/shared_sstable.hh"
#include "tasks/task_manager.hh"
using namespace seastar;
// Forward declarations. In this header these types are only named through
// references, sharded<> wrappers or shared_ptr, so their complete
// definitions are not required here.
namespace replica {
class database;
}
namespace sstables { class storage_manager; }
namespace netw { class messaging_service; }
namespace db {
namespace view {
class view_builder;
class view_building_worker;
}
}
namespace service {
class storage_service;
}
namespace locator {
class effective_replication_map;
}
// Accumulates progress of a streaming operation.
//
// `total` is the amount of work announced through start(), and `completed`
// is the amount reported so far through advance(). Both are plain floats in
// whatever unit the caller chooses. Records can be summed with operator+=.
// advance() is virtual so that derived types can override how progress is
// recorded.
struct stream_progress {
    float total = 0.f;
    float completed = 0.f;

    virtual ~stream_progress() = default;

    // Adds both counters of `other` into this record and returns *this.
    // Each field reads only its own counterpart, so `x += x` simply doubles both.
    stream_progress& operator+=(const stream_progress& other) {
        completed += other.completed;
        total += other.total;
        return *this;
    }

    // Sets (overwrites, does not add to) the total amount of work.
    // `completed` is left as it is. `work` must be non-negative.
    void start(float work) {
        assert(work >= 0);
        total = work;
    }

    // Records `delta` more units of finished work.
    virtual void advance(float delta) {
        // Progress is monotonic; a negative step would move it backwards.
        assert(delta >= 0);
        completed += delta;
        // Reporting more work than was announced via start() is a caller bug.
        // NOTE(review): with float accumulation, many fractional steps that sum
        // exactly to `total` in real arithmetic can overshoot it slightly after
        // rounding and trip this check in debug builds.
        assert(completed <= total);
    }
};
// The handler of the 'storage_service/load_new_ss_tables' endpoint which, in
// turn, is the target of the 'nodetool refresh' command.
// Gets sstables from the upload directory and makes them available in the
// system. Built on top of the distributed_loader functionality.
//
// It also exposes download_new_sstables(), which fetches sstables from an
// object store and returns a task manager task id. Presumably the download
// runs as a download_task_impl under task_manager_module; confirm in the
// implementation.
//
// The service derives from peering_sharded_service, so an instance on one
// shard can reach its peers on the other shards.
class sstables_loader : public seastar::peering_sharded_service<sstables_loader> {
public:
// Restricts the set of replicas that sstables are streamed to.
// NOTE(review): the precise meaning of each value (all / dc / rack / node) is
// defined by the streaming implementation, not in this header; confirm there.
enum class stream_scope { all, dc, rack, node };
// Task manager module for this service, registered under the module name
// "sstables_loader".
class task_manager_module : public tasks::task_manager::module {
public:
task_manager_module(tasks::task_manager& tm) noexcept : tasks::task_manager::module(tm, "sstables_loader") {}
};
private:
// Services this loader depends on. They are held by reference (not owned),
// so they must outlive this object.
sharded<replica::database>& _db;
sharded<service::storage_service>& _ss;
netw::messaging_service& _messaging;
sharded<db::view::view_builder>& _view_builder;
sharded<db::view::view_building_worker>& _view_building_worker;
// Shared handle to this service's task manager module.
shared_ptr<task_manager_module> _task_manager_module;
sstables::storage_manager& _storage_manager;
// Scheduling group supplied at construction. Presumably the loader's work
// runs under it; confirm in the implementation.
seastar::scheduling_group _sched_group;
// Flag recording that a load of new sstables is in progress. Presumably it
// is what rejects concurrent load_new_sstables() calls (see that function's
// documentation below).
//
// Note that this is obviously only valid for the current shard. Users of
// this facility should elect a shard to be the coordinator based on any
// given objective criteria
//
// It shouldn't be impossible to actively serialize two callers if the need
// ever arises.
bool _loading_new_sstables = false;
// Streams `sstables` of the table `table_id` (ks_name.cf_name) to the nodes
// that should own their data, limited by `scope`.
//  - primary_replica_only: stream only to the primary replica.
//  - unlink_sstables: presumably whether the source sstables are removed
//    afterwards; confirm in the implementation.
//  - progress: progress accumulator, presumably advanced as data is streamed.
future<> load_and_stream(sstring ks_name, sstring cf_name,
table_id, std::vector<sstables::shared_sstable> sstables,
bool_class<struct primary_replica_only_tag> primary_replica_only, bool unlink_sstables, stream_scope scope,
shared_ptr<stream_progress> progress);
// Resolves to the effective replication map for `table_id`. The name
// indicates that it first waits for topology changes to quiesce; confirm in
// the implementation.
future<seastar::shared_ptr<const locator::effective_replication_map>> await_topology_quiesced_and_get_erm(table_id table_id);
public:
// All referenced services are stored by reference and must outlive the
// loader. `tm` is used to set up this service's task manager module, and
// `sg` is the scheduling group stored for the loader's use.
sstables_loader(sharded<replica::database>& db,
sharded<service::storage_service>& ss,
netw::messaging_service& messaging,
sharded<db::view::view_builder>& vb,
sharded<db::view::view_building_worker>& vbw,
tasks::task_manager& tm,
sstables::storage_manager& sstm,
seastar::scheduling_group sg);
// Shuts the service down. seastar::sharded<> requires this method and calls
// it on every shard when the sharded service is stopped.
future<> stop();
/**
* Load new SSTables not currently tracked by the system
*
* This can be called, for instance, after copying a batch of SSTables to a CF directory.
*
* This should not be called in parallel for the same keyspace / column family. Doing
* so fails with an exception (the original text named std::runtime_exception,
* which is not a standard type; presumably std::runtime_error, to be confirmed in the
* implementation).
*
* @param ks_name the keyspace in which to search for new SSTables.
* @param cf_name the column family in which to search for new SSTables.
* @param load_and_stream load SSTables that do not belong to this node and stream them to the appropriate nodes.
* @param primary_replica_only whether to stream only to the primary replica that owns the data.
* @param skip_cleanup whether to skip the cleanup step when loading SSTables.
* @param skip_reshape whether to skip the reshape step when loading SSTables.
* @param scope the set of replicas to stream to (see stream_scope); presumably only
*        relevant when load_and_stream is set.
* @return a future<> when the operation finishes.
*/
future<> load_new_sstables(sstring ks_name, sstring cf_name,
bool load_and_stream, bool primary_replica_only, bool skip_cleanup, bool skip_reshape, stream_scope scope);
/**
* Download new SSTables not currently tracked by the system from object store
*
* @param ks_name the keyspace of the target table.
* @param cf_name the column family (table) to load into.
* @param prefix presumably the object key prefix under which the SSTables are stored.
* @param sstables the SSTables to download (presumably their names/keys under `prefix`).
* @param endpoint the object-store endpoint to download from.
* @param bucket the bucket holding the SSTables.
* @param scope the set of replicas to stream to (see stream_scope).
* @param primary_replica whether to stream only to the primary replica.
* @return the id of the task manager task created for the operation. The
*         download itself presumably runs in that task (download_task_impl).
*/
future<tasks::task_id> download_new_sstables(sstring ks_name, sstring cf_name,
sstring prefix, std::vector<sstring> sstables,
sstring endpoint, sstring bucket, stream_scope scope, bool primary_replica);
// Task implementation; only declared here, defined outside this header.
class download_task_impl;
};
template <>
struct fmt::formatter<sstables_loader::stream_scope> : fmt::formatter<string_view> {
    // Formats a stream_scope as its lower-case enumerator name ("all", "dc",
    // "rack" or "node"). It delegates to the string_view formatter it inherits
    // from, so the same format specs as for strings apply.
    template <typename FormatContext>
    auto format(const sstables_loader::stream_scope a, FormatContext& ctx) const {
        using enum sstables_loader::stream_scope;
        // Fallback for values outside the enumerators (e.g. the result of a
        // bad cast). Without it, control could flow off the end of this
        // non-void function, which is undefined behavior and triggers GCC's
        // -Wreturn-type warning.
        string_view name = "unknown";
        switch (a) {
        case all:
            name = "all";
            break;
        case dc:
            name = "dc";
            break;
        case rack:
            name = "rack";
            break;
        case node:
            name = "node";
            break;
        }
        return formatter<string_view>::format(name, ctx);
    }
};
// One tablet's token range together with the sstables assigned to it.
// get_sstables_for_tablets_for_tests() returns a vector of these.
struct tablet_sstable_collection {
// The tablet's token range.
dht::token_range tablet_range;
// SSTables whose token range lies entirely within `tablet_range`.
std::vector<sstables::shared_sstable> sstables_fully_contained;
// SSTables whose token range only partially overlaps `tablet_range`.
std::vector<sstables::shared_sstable> sstables_partially_contained;
};
// This function is intended for test purposes only.
// It assigns the given sstables to the given tablet ranges based on token containment.
// It returns a vector of tablet_sstable_collection, each containing the tablet range
// and the sstables that are fully or partially contained within that range
// (presumably one entry per tablet range, in input order; confirm in the implementation).
// Prerequisites:
//  - the tablet ranges are sorted in ascending order and do not overlap;
//  - the sstables are sorted by the `start` of their token ranges in descending order.
// `tablets_ranges` is taken by rvalue reference, so callers must pass an rvalue
// (e.g. std::move(ranges)). Treat the argument as consumed afterwards.
future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets_for_tests(const std::vector<sstables::shared_sstable>& sstables,
std::vector<dht::token_range>&& tablets_ranges);