storage_service: Implement stream_tablet RPC

Performs streaming of data for a single tablet between two tablet
replicas. The node which gets the RPC is the receiving replica.
This commit is contained in:
Tomasz Grabiec
2023-07-06 13:43:32 +02:00
parent e3a8bb7ec9
commit 6d545b2f9e
9 changed files with 132 additions and 6 deletions

View File

@@ -273,9 +273,9 @@ future<> range_streamer::stream_async() {
nr_ranges_streamed, nr_ranges_streamed + ranges_to_stream.size(), nr_ranges_total);
auto ranges_streamed = ranges_to_stream.size();
if (_nr_rx_added) {
sp.request_ranges(source, keyspace, std::move(ranges_to_stream));
sp.request_ranges(source, keyspace, std::move(ranges_to_stream), _tables);
} else if (_nr_tx_added) {
sp.transfer_ranges(source, keyspace, std::move(ranges_to_stream));
sp.transfer_ranges(source, keyspace, std::move(ranges_to_stream), _tables);
}
sp.execute().discard_result().get();
// Update finished percentage

View File

@@ -77,7 +77,8 @@ public:
};
range_streamer(distributed<replica::database>& db, sharded<streaming::stream_manager>& sm, const token_metadata_ptr tmptr, abort_source& abort_source, std::unordered_set<token> tokens,
inet_address address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason)
inet_address address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason,
std::vector<sstring> tables = {})
: _db(db)
, _stream_manager(sm)
, _token_metadata_ptr(std::move(tmptr))
@@ -87,13 +88,14 @@ public:
, _dr(std::move(dr))
, _description(std::move(description))
, _reason(reason)
, _tables(std::move(tables))
{
_abort_source.check();
}
range_streamer(distributed<replica::database>& db, sharded<streaming::stream_manager>& sm, const token_metadata_ptr tmptr, abort_source& abort_source,
inet_address address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason)
: range_streamer(db, sm, std::move(tmptr), abort_source, std::unordered_set<token>(), address, std::move(dr), description, reason) {
inet_address address, locator::endpoint_dc_rack dr, sstring description, streaming::stream_reason reason, std::vector<sstring> tables = {})
: range_streamer(db, sm, std::move(tmptr), abort_source, std::unordered_set<token>(), address, std::move(dr), description, reason, std::move(tables)) {
}
void add_source_filter(std::unique_ptr<i_source_filter> filter) {
@@ -155,6 +157,7 @@ private:
locator::endpoint_dc_rack _dr;
sstring _description;
streaming::stream_reason _reason;
std::vector<sstring> _tables;
std::unordered_multimap<sstring, std::unordered_map<inet_address, dht::token_range_vector>> _to_stream;
std::unordered_set<std::unique_ptr<i_source_filter>> _source_filters;
// Number of tx and rx ranges added

View File

@@ -6,6 +6,19 @@
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
namespace locator {

// IDL declaration of a tablet identifier — an index into a table's tablet map.
// NOTE(review): presumably mirrors the C++ type in locator/tablets.hh; confirm
// the wire layout stays in sync with that definition.
struct tablet_id final {
    uint64_t value();
};

// IDL declaration of a globally unique tablet identifier: the owning table
// plus the tablet within that table's tablet map. Used as the argument of the
// tablet_stream_data RPC verb.
struct global_tablet_id final {
    ::table_id table;
    locator::tablet_id tablet;
};

}
namespace service {
struct fencing_token {
service::topology::version_t topology_version;
@@ -38,4 +51,5 @@ struct raft_topology_pull_params {};
verb raft_topology_cmd (raft::term_t term, uint64_t cmd_index, service::raft_topology_cmd) -> service::raft_topology_cmd_result;
verb raft_pull_topology_snapshot (service::raft_topology_pull_params) -> service::raft_topology_snapshot;
verb [[cancellable]] tablet_stream_data (locator::global_tablet_id);
}

View File

@@ -18,6 +18,7 @@
#include "gms/gossip_digest_syn.hh"
#include "gms/gossip_digest_ack.hh"
#include "gms/gossip_digest_ack2.hh"
#include "locator/tablets.hh"
#include "query-request.hh"
#include "query-result.hh"
#include <seastar/rpc/rpc.hh>
@@ -593,6 +594,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::REPAIR_FLUSH_HINTS_BATCHLOG:
case messaging_verb::NODE_OPS_CMD:
case messaging_verb::HINT_MUTATION:
case messaging_verb::TABLET_STREAM_DATA:
return 1;
case messaging_verb::CLIENT_ID:
case messaging_verb::MUTATION:

View File

@@ -182,7 +182,8 @@ enum class messaging_verb : int32_t {
DIRECT_FD_PING = 63,
RAFT_TOPOLOGY_CMD = 64,
RAFT_PULL_TOPOLOGY_SNAPSHOT = 65,
LAST = 66,
TABLET_STREAM_DATA = 66,
LAST = 67,
};
} // namespace netw

View File

@@ -22,6 +22,7 @@
#include "query_result_merger.hh"
#include <seastar/core/do_with.hh>
#include "message/messaging_service.hh"
#include "locator/tablets.hh"
#include "gms/gossiper.hh"
#include <seastar/core/future-util.hh>
#include "db/read_repair_decision.hh"

View File

@@ -19,6 +19,7 @@
#include "db/system_keyspace.hh"
#include "db/system_distributed_keyspace.hh"
#include "db/consistency_level.hh"
#include "locator/tablets.hh"
#include <seastar/core/smp.hh>
#include "mutation/canonical_mutation.hh"
#include "seastar/core/on_internal_error.hh"
@@ -5358,6 +5359,100 @@ future<> storage_service::update_fence_version(token_metadata::version_t new_ver
});
}
static
// Returns the replica which the transition removes from the tablet's replica
// set: the member of the current set that does not appear in the next set.
// Throws if the transition removes no replica or more than one.
locator::tablet_replica get_leaving_replica(const locator::tablet_info& tinfo, const locator::tablet_transition_info& trinfo) {
    std::unordered_set<locator::tablet_replica> candidates(tinfo.replicas.begin(), tinfo.replicas.end());
    // Drop every replica which survives the transition; what remains is leaving.
    for (const auto& next_replica : trinfo.next) {
        candidates.erase(next_replica);
    }
    if (candidates.empty()) {
        throw std::runtime_error(format("No leaving replicas"));
    }
    if (candidates.size() > 1) {
        throw std::runtime_error(format("More than one leaving replica"));
    }
    return *candidates.begin();
}
// Translates a host_id into its IP address using group0's Raft address map.
// Throws std::runtime_error when the host is unknown to the address map.
inet_address storage_service::host2ip(locator::host_id host) {
    auto addr = _group0->address_map().find(raft::server_id(host.uuid()));
    if (addr) {
        return *addr;
    }
    throw std::runtime_error(::format("Cannot map host {} to ip", host));
}
// Streams data to the pending tablet replica of a given tablet on this node.
// The source tablet replica is determined from the current transition info of the tablet.
//
// Handler of the tablet_stream_data RPC: the receiving (pending) replica pulls
// ranges from the leaving replica via range_streamer. Concurrent/retried calls
// for the same tablet join the in-flight session instead of starting a new one.
// Throws std::runtime_error when the request no longer matches the current
// transition state (canceled/stale request) or on a same-node migration.
future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {
    // The coordinator does not execute global token metadata barrier before jumping to "streaming" stage, so we need
    // a barrier here to see the token metadata which is at least as recent as that of the sender.
    auto& raft_server = _group0->group0_server();
    co_await raft_server.read_barrier(&_abort_source);
    auto tm = _shared_token_metadata.get();
    auto& tmap = tm->tablets().get_tablet_map(tablet.table);
    auto* trinfo = tmap.get_tablet_transition_info(tablet.tablet);
    // Check if the request is still valid.
    // If there is mismatch, it means this streaming was canceled and the coordinator moved on.
    if (!trinfo) {
        throw std::runtime_error(format("No transition info for tablet {}", tablet));
    }
    if (trinfo->stage != locator::tablet_transition_stage::streaming) {
        throw std::runtime_error(format("Tablet {} stage is not at streaming", tablet));
    }
    // This node must be the pending (destination) replica of the transition.
    if (trinfo->pending_replica.host != tm->get_my_id()) {
        throw std::runtime_error(format("Tablet {} has pending replica different than this one", tablet));
    }
    auto& tinfo = tmap.get_tablet_info(tablet.tablet);
    auto range = tmap.get_token_range(tablet.tablet);
    locator::tablet_replica leaving_replica = get_leaving_replica(tinfo, *trinfo);
    if (leaving_replica.host == tm->get_my_id()) {
        // The algorithm doesn't work with tablet migration within the same node because
        // it assumes there is only one tablet replica, picked by the sharder, on local node.
        throw std::runtime_error(format("Cannot stream within the same node, tablet: {}, shard {} -> {}",
            tablet, leaving_replica.shard, trinfo->pending_replica.shard));
    }
    auto leaving_replica_ip = host2ip(leaving_replica.host);
    // NOTE: operator[] default-inserts an empty optional on first lookup; the
    // truthiness check below distinguishes an in-flight session from a fresh one.
    if (_tablet_streaming[tablet]) {
        // A session for this tablet is already running — join it rather than
        // streaming the same ranges twice.
        slogger.debug("Streaming retry joining with existing session for tablet {}", tablet);
        co_await _tablet_streaming[tablet]->get_future();
        co_return;
    }
    // Keep the service alive for the duration of the streaming session.
    auto async_gate_holder = _async_gate.hold();
    promise<> p;
    // Publish a shared_future so concurrent retries can join this session.
    _tablet_streaming[tablet] = seastar::shared_future<>(p.get_future());
    // Remove the session entry on any exit path (success or failure); joined
    // waiters keep their own shared_future copies, so erase is safe here.
    auto erase_tablet_streaming = seastar::defer([&] {
        _tablet_streaming.erase(tablet);
    });
    try {
        auto& table = _db.local().find_column_family(tablet.table);
        // Restrict streaming to just this tablet's table.
        std::vector<sstring> tables = {table.schema()->cf_name()};
        auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tm, _abort_source,
            get_broadcast_address(), _sys_ks.local().local_dc_rack(),
            "Tablet migration", streaming::stream_reason::tablet_migration, std::move(tables));
        streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(
            _gossiper.get_unreachable_members()));
        // Pull (rx) the tablet's single token range from the leaving replica only.
        std::unordered_map<inet_address, dht::token_range_vector> ranges_per_endpoint;
        ranges_per_endpoint[leaving_replica_ip].emplace_back(range);
        streamer->add_rx_ranges(table.schema()->ks_name(), std::move(ranges_per_endpoint));
        co_await streamer->stream_async();
        // Wake up any retries which joined this session.
        p.set_value();
        slogger.info("Streaming for tablet migration of {} successful", tablet);
    } catch (...) {
        // Propagate the failure to joined waiters before rethrowing to the RPC caller.
        p.set_exception(std::current_exception());
        slogger.warn("Streaming for tablet migration of {} from {} failed: {}", tablet, leaving_replica, std::current_exception());
        throw;
    }
}
void storage_service::init_messaging_service(sharded<service::storage_proxy>& proxy, sharded<db::system_distributed_keyspace>& sys_dist_ks) {
_messaging.local().register_node_ops_cmd([this] (const rpc::client_info& cinfo, node_ops_cmd_request req) {
auto coordinator = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
@@ -5421,6 +5516,9 @@ void storage_service::init_messaging_service(sharded<service::storage_proxy>& pr
};
});
});
ser::storage_service_rpc_verbs::register_tablet_stream_data(&_messaging.local(), [this] (locator::global_tablet_id tablet) {
return stream_tablet(tablet);
});
}
future<> storage_service::uninit_messaging_service() {

View File

@@ -15,6 +15,7 @@
#include "gms/i_endpoint_state_change_subscriber.hh"
#include "service/endpoint_lifecycle_subscriber.hh"
#include "locator/abstract_replication_strategy.hh"
#include "locator/tablets.hh"
#include "inet_address_vectors.hh"
#include <seastar/core/distributed.hh>
#include <seastar/core/condition-variable.hh>
@@ -145,6 +146,8 @@ private:
future<> node_ops_abort(node_ops_id ops_uuid);
void node_ops_signal_abort(std::optional<node_ops_id> ops_uuid);
future<> node_ops_abort_thread();
future<> stream_tablet(locator::global_tablet_id);
inet_address host2ip(locator::host_id);
public:
storage_service(abort_source& as, distributed<replica::database>& db,
gms::gossiper& gossiper,
@@ -759,6 +762,7 @@ private:
std::optional<shared_future<>> _decomission_result;
std::optional<shared_future<>> _rebuild_result;
std::unordered_map<raft::server_id, std::optional<shared_future<>>> _remove_result;
std::unordered_map<locator::global_tablet_id, std::optional<shared_future<>>> _tablet_streaming;
// During decommission, the node waits for the coordinator to tell it to shut down.
std::optional<promise<>> _shutdown_request_promise;
struct {

View File

@@ -22,6 +22,7 @@ enum class stream_reason : uint8_t {
rebuild,
repair,
replace,
tablet_migration,
};
}
@@ -46,6 +47,8 @@ struct fmt::formatter<streaming::stream_reason> : fmt::formatter<std::string_vie
return formatter<std::string_view>::format("repair", ctx);
case replace:
return formatter<std::string_view>::format("replace", ctx);
case tablet_migration:
return formatter<std::string_view>::format("tablet migration", ctx);
}
std::abort();
}