The following is observed in pytest:
1) node1, stream master, tried to pull data from node3
2) node3, stream follower, found node1 restarted
3) node3 killed the rpc stream
4) node1 did not get the stream session failure message from node3. This
failure message was supposed to kill the stream plan on node1. That's the
reason node1 failed the stream session much later at "2024-08-19 21:07:45,539".
Note, node3 failed the stream on its side, so it should have sent the stream
session failure message.
```
$ cat node1.log |grep f890bea0-5e68-11ef-99ae-e5bca04385fc
INFO 2024-08-19 20:24:01,162 [shard 0:strm] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] Executing streaming plan for Tablet migration-ks-index-0 with peers={127.0.34.3}, master
ERROR 2024-08-19 20:24:01,190 [shard 1:strm] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] Failed to handle STREAM_MUTATION_FRAGMENTS (receive and distribute phase) for ks=ks, cf=cf, peer=127.0.34.3: seastar::nested_exception: seastar::rpc::stream_closed (rpc stream was closed by peer) (while cleaning up after seastar::rpc::stream_closed (rpc stream was closed by peer))
WARN 2024-08-19 21:07:45,539 [shard 0:main] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] Streaming plan for Tablet migration-ks-index-0 failed, peers={127.0.34.3}, tx=0 KiB, 0.00 KiB/s, rx=484 KiB, 0.18 KiB/s
$ cat node3.log |grep f890bea0-5e68-11ef-99ae-e5bca04385fc
INFO 2024-08-19 20:24:01,163 [shard 0:strm] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] Executing streaming plan for Tablet migration-ks-index-0 with peers=127.0.34.1, slave
INFO 2024-08-19 20:24:01,164 [shard 1:strm] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] Start sending ks=ks, cf=cf, estimated_partitions=2560, with new rpc streaming
WARN 2024-08-19 20:24:01,187 [shard 0: gms] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] Streaming plan for Tablet migration-ks-index-0 failed, peers={127.0.34.1}, tx=633 KiB, 26506.81 KiB/s, rx=0 KiB, 0.00 KiB/s
WARN 2024-08-19 20:24:01,188 [shard 0:strm] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] stream_transfer_task: Fail to send to 127.0.34.1:0: seastar::rpc::stream_closed (rpc stream was closed by peer)
WARN 2024-08-19 20:24:01,189 [shard 0:strm] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] Failed to send: seastar::rpc::stream_closed (rpc stream was closed by peer)
WARN 2024-08-19 20:24:01,189 [shard 0:strm] stream_session - [Stream #f890bea0-5e68-11ef-99ae-e5bca04385fc] Streaming error occurred, peer=127.0.34.1
```
To be safe in case the stream session failure message is not received, node1
could fail the stream plan as soon as the rpc stream is aborted in the
stream_mutation_fragments handler, instead of waiting for that message.
Fixes #20227
Closes scylladb/scylladb#21960
83 lines
2.9 KiB
C++
83 lines
2.9 KiB
C++
/*
|
|
*
|
|
* Modified by ScyllaDB
|
|
* Copyright (C) 2015-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
|
|
*/
|
|
|
|
#include "streaming/stream_plan.hh"
|
|
#include "streaming/stream_result_future.hh"
|
|
#include "streaming/stream_state.hh"
|
|
#include <seastar/coroutine/all.hh>
|
|
|
|
namespace streaming {
|
|
|
|
extern logging::logger sslog;
|
|
|
|
// Convenience overload: request the given ranges with no explicit
// column-family list. Delegates to the full overload.
//
// Fix: move the sink parameters instead of copying `keyspace`; this
// mirrors the equivalent transfer_ranges() overload below, which
// already moves it.
stream_plan& stream_plan::request_ranges(locator::host_id from, sstring keyspace, dht::token_range_vector ranges) {
    return request_ranges(from, std::move(keyspace), std::move(ranges), {});
}
|
|
|
|
// Register a request to pull `ranges` of `keyspace` (restricted to
// `column_families`, if non-empty) from peer `from`. Returns *this so
// calls can be chained. The actual streaming starts in execute().
stream_plan& stream_plan::request_ranges(locator::host_id from, sstring keyspace, dht::token_range_vector ranges, std::vector<sstring> column_families) {
    // Record that the plan is non-empty so execute() actually starts
    // streaming instead of returning an empty state.
    _range_added = true;
    // One session per peer; get_or_create_session() reuses a session if
    // this peer was already added to the plan.
    auto session = _coordinator->get_or_create_session(_mgr, from);
    // Fix: move `keyspace` rather than copying it — it is a by-value sink
    // parameter not used afterwards. This matches transfer_ranges() below.
    session->add_stream_request(std::move(keyspace), std::move(ranges), std::move(column_families));
    session->set_reason(_reason);
    session->set_topo_guard(_topo_guard);
    return *this;
}
|
|
|
|
// Convenience overload: transfer the given ranges with no explicit
// column-family list. Forwards to the full overload.
stream_plan& stream_plan::transfer_ranges(locator::host_id to, sstring keyspace, dht::token_range_vector ranges) {
    std::vector<sstring> no_column_families;
    return transfer_ranges(to, std::move(keyspace), std::move(ranges), std::move(no_column_families));
}
|
|
|
|
// Register a transfer of `ranges` of `keyspace` (restricted to
// `column_families`, if non-empty) to peer `to`. Returns *this so calls
// can be chained. The actual streaming starts in execute().
stream_plan& stream_plan::transfer_ranges(locator::host_id to, sstring keyspace, dht::token_range_vector ranges, std::vector<sstring> column_families) {
    // Mark the plan non-empty so execute() kicks off the transfer.
    _range_added = true;
    // One session per peer; get_or_create_session() reuses a session if
    // this peer was already added to the plan.
    auto peer_session = _coordinator->get_or_create_session(_mgr, to);
    peer_session->add_transfer_ranges(std::move(keyspace), std::move(ranges), std::move(column_families));
    peer_session->set_reason(_reason);
    peer_session->set_topo_guard(_topo_guard);
    return *this;
}
|
|
|
|
// Start executing the plan. Resolves with the final stream_state once
// streaming completes. Throws std::runtime_error if the plan was aborted
// before execution.
future<stream_state> stream_plan::execute() {
    sslog.debug("[Stream #{}] Executing stream_plan description={} range_added={}", _plan_id, _description, _range_added);
    // A plan with no ranges is a no-op: complete immediately with an
    // empty state instead of spinning up the streaming machinery.
    if (!_range_added) {
        stream_state state(_plan_id, _description, std::vector<session_info>());
        return make_ready_future<stream_state>(std::move(state));
    }
    // Refuse to start a plan that abort() already tore down.
    // Fix: the error message said "steam_plan" — corrected to "stream_plan".
    if (_aborted) {
        throw std::runtime_error(format("stream_plan {} is aborted", _plan_id));
    }
    return stream_result_future::init_sending_side(_mgr, _plan_id, _description, _handlers, _coordinator);
}
|
|
|
|
// Append the given event handlers to this plan's handler list; they are
// passed on to the stream result future in execute(). Returns *this so
// calls can be chained.
stream_plan& stream_plan::listeners(std::vector<stream_event_handler*> handlers) {
    _handlers.insert(_handlers.end(), handlers.begin(), handlers.end());
    return *this;
}
|
|
|
|
// Abort the plan: mark it aborted first, so a subsequent execute() refuses
// to run, then tear down every stream session the coordinator created for
// this plan. May throw (see the FIXME in abort() below).
void stream_plan::do_abort() {
    _aborted = true;
    _coordinator->abort_all_stream_sessions();
}
|
|
|
|
// noexcept wrapper around do_abort(): abort must never throw into callers
// (it is typically invoked on error/cleanup paths), so any exception is
// swallowed after a best-effort log.
void stream_plan::abort() noexcept {
    try {
        // FIXME: do_abort() can throw, because its underlying implementation
        // allocates a vector and calls vector::push_back(). Let's make it noexcept too
        do_abort();
    } catch (...) {
        try {
            // Logging itself may throw (e.g. on allocation failure), and no
            // exception may escape a noexcept function, so guard it too.
            sslog.error("Failed to abort stream plan: {}", std::current_exception());
        } catch (...) {
            // Nothing else we can do.
        }
    }
}
|
|
|
|
}
|