Create `get_sstables_for_tablets_for_tests` friend free function for testing purposes. Adding this free function allows direct testing without requiring the full streamer context.
172 lines
6.4 KiB
C++
172 lines
6.4 KiB
C++
/*
|
|
* Copyright (C) 2021-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include <vector>
|
|
#include <seastar/core/sharded.hh>
|
|
#include "dht/i_partitioner_fwd.hh"
|
|
#include "dht/token.hh"
|
|
#include "schema/schema_fwd.hh"
|
|
#include "sstables/shared_sstable.hh"
|
|
#include "tasks/task_manager.hh"
|
|
|
|
using namespace seastar;
|
|
|
|
// Forward declarations of the types this header only refers to by reference,
// so that their full definitions do not have to be included here.
namespace replica {
class database;
}

namespace sstables { class storage_manager; }

namespace netw { class messaging_service; }
namespace db {
namespace view {
class view_builder;
class view_building_worker;
}
}
namespace service {
class storage_service;
}
namespace locator {
class effective_replication_map;
}
|
|
|
|
// Progress of an operation expressed as two amounts: how much work there is
// in total and how much of it has completed so far. The unit is whatever the
// caller passes to start() and advance().
struct stream_progress {
    // Total amount of work; set by start().
    float total = 0.;
    // Amount of work completed so far; accumulated by advance().
    float completed = 0.;

    // advance() is virtual, so the destructor is virtual as well.
    virtual ~stream_progress() = default;

    // Adds both the total and the completed amount of `p` to this object
    // and returns a reference to this object.
    stream_progress& operator+=(const stream_progress& p) {
        total += p.total;
        completed += p.completed;
        return *this;
    }

    // Sets the total amount of work. `amount` must not be negative.
    // Does not touch `completed`.
    void start(float amount) {
        assert(amount >= 0);
        total = amount;
    }

    // Records `amount` more completed work. `amount` must not be negative
    // and the accumulated `completed` must not exceed `total`.
    virtual void advance(float amount) {
        // we should not move backward
        assert(amount >= 0);
        completed += amount;
        assert(completed <= total);
    }
};
|
|
|
|
// The handler of the 'storage_service/load_new_ss_tables' endpoint which, in
|
|
// turn, is the target of the 'nodetool refresh' command.
|
|
// Gets sstables from the upload directory and makes them available in the
|
|
// system. Built on top of the distributed_loader functionality.
|
|
class sstables_loader : public seastar::peering_sharded_service<sstables_loader> {
|
|
public:
|
|
enum class stream_scope { all, dc, rack, node };
|
|
class task_manager_module : public tasks::task_manager::module {
|
|
public:
|
|
task_manager_module(tasks::task_manager& tm) noexcept : tasks::task_manager::module(tm, "sstables_loader") {}
|
|
};
|
|
|
|
private:
|
|
sharded<replica::database>& _db;
|
|
sharded<service::storage_service>& _ss;
|
|
netw::messaging_service& _messaging;
|
|
sharded<db::view::view_builder>& _view_builder;
|
|
sharded<db::view::view_building_worker>& _view_building_worker;
|
|
shared_ptr<task_manager_module> _task_manager_module;
|
|
sstables::storage_manager& _storage_manager;
|
|
seastar::scheduling_group _sched_group;
|
|
|
|
// Note that this is obviously only valid for the current shard. Users of
|
|
// this facility should elect a shard to be the coordinator based on any
|
|
// given objective criteria
|
|
//
|
|
// It shouldn't be impossible to actively serialize two callers if the need
|
|
// ever arise.
|
|
bool _loading_new_sstables = false;
|
|
|
|
future<> load_and_stream(sstring ks_name, sstring cf_name,
|
|
table_id, std::vector<sstables::shared_sstable> sstables,
|
|
bool_class<struct primary_replica_only_tag> primary_replica_only, bool unlink_sstables, stream_scope scope,
|
|
shared_ptr<stream_progress> progress);
|
|
|
|
future<seastar::shared_ptr<const locator::effective_replication_map>> await_topology_quiesced_and_get_erm(table_id table_id);
|
|
public:
|
|
sstables_loader(sharded<replica::database>& db,
|
|
sharded<service::storage_service>& ss,
|
|
netw::messaging_service& messaging,
|
|
sharded<db::view::view_builder>& vb,
|
|
sharded<db::view::view_building_worker>& vbw,
|
|
tasks::task_manager& tm,
|
|
sstables::storage_manager& sstm,
|
|
seastar::scheduling_group sg);
|
|
|
|
future<> stop();
|
|
|
|
/**
|
|
* Load new SSTables not currently tracked by the system
|
|
*
|
|
* This can be called, for instance, after copying a batch of SSTables to a CF directory.
|
|
*
|
|
* This should not be called in parallel for the same keyspace / column family, and doing
|
|
* so will throw an std::runtime_exception.
|
|
*
|
|
* @param ks_name the keyspace in which to search for new SSTables.
|
|
* @param cf_name the column family in which to search for new SSTables.
|
|
* @param load_and_stream load SSTables that do not belong to this node and stream them to the appropriate nodes.
|
|
* @param primary_replica_only whether to stream only to the primary replica that owns the data.
|
|
* @param skip_cleanup whether to skip the cleanup step when loading SSTables.
|
|
* @param skip_reshape whether to skip the reshape step when loading SSTables.
|
|
* @return a future<> when the operation finishes.
|
|
*/
|
|
future<> load_new_sstables(sstring ks_name, sstring cf_name,
|
|
bool load_and_stream, bool primary_replica_only, bool skip_cleanup, bool skip_reshape, stream_scope scope);
|
|
|
|
/**
|
|
* Download new SSTables not currently tracked by the system from object store
|
|
*/
|
|
future<tasks::task_id> download_new_sstables(sstring ks_name, sstring cf_name,
|
|
sstring prefix, std::vector<sstring> sstables,
|
|
sstring endpoint, sstring bucket, stream_scope scope, bool primary_replica);
|
|
|
|
class download_task_impl;
|
|
};
|
|
|
|
template <>
|
|
struct fmt::formatter<sstables_loader::stream_scope> : fmt::formatter<string_view> {
|
|
template <typename FormatContext>
|
|
auto format(const sstables_loader::stream_scope a, FormatContext& ctx) const {
|
|
using enum sstables_loader::stream_scope;
|
|
switch (a) {
|
|
case all:
|
|
return formatter<string_view>::format("all", ctx);
|
|
case dc:
|
|
return formatter<string_view>::format("dc", ctx);
|
|
case rack:
|
|
return formatter<string_view>::format("rack", ctx);
|
|
case node:
|
|
return formatter<string_view>::format("node", ctx);
|
|
}
|
|
}
|
|
};
|
|
|
|
struct tablet_sstable_collection {
|
|
dht::token_range tablet_range;
|
|
std::vector<sstables::shared_sstable> sstables_fully_contained;
|
|
std::vector<sstables::shared_sstable> sstables_partially_contained;
|
|
};
|
|
|
|
// This function is intended for test purposes only.
|
|
// It assigns the given sstables to the given tablet ranges based on token containment.
|
|
// It returns a vector of tablet_sstable_collection, each containing the tablet range
|
|
// and the sstables that are fully or partially contained within that range.
|
|
// The prerequisite is the tablet ranges are sorted by the range in ascending order and non-overlapping.
|
|
// Another prerequisite is that the sstables' token ranges are sorted by its `start` in descending order.
|
|
future<std::vector<tablet_sstable_collection>> get_sstables_for_tablets_for_tests(const std::vector<sstables::shared_sstable>& sstables,
|
|
std::vector<dht::token_range>&& tablets_ranges);
|