scylladb/streaming/stream_blob.cc
Asias He 4018dc7f0d Introduce file stream for tablet
File-based streaming is a new feature that significantly optimizes tablet
movement. It streams entire SSTable files without deserializing them into
mutation fragments and re-serializing them back into SSTables on the
receiving node. As a result, less data is streamed over the network and
less CPU is consumed, especially for data models that contain small cells.

The following patches are imported from Scylla Enterprise:

*) Merge 'Introduce file stream for tablet' from Asias He

    This patch uses the Seastar RPC stream interface to stream sstable files
    over the network for tablet migration.

    It streams sstables instead of mutation fragments. File-based streaming
    has multiple advantages over mutation-based streaming:

    - No serialization or deserialization of mutation fragments
    - No need to read and process each mutation fragment
    - On-wire data is more compact and smaller

    In the test below, a significant speedup is observed.

    Two nodes, 1 shard per node, 1 initial_tablets:

    - Start node 1
    - Insert 10M rows of data with c-s
    - Bootstrap node 2

    Node 1 will migrate data to node 2 with the file stream.

    Test results:

    1) File stream: bytes on wire = 1132006250 bytes, bw = 836MB/s

    [shard 0:stre] stream_blob - stream_sstables[eadaa8e0-a4f2-4cc6-bf10-39ad1ce106b0]
	Finished sending sstable_nr=2 files_nr=18 files={} range=(-1,9223372036854775807] bytes_sent=1132006250 stream_bw=836MB/s
    [shard 0:stre] storage_service - Streaming for tablet migration of a4f68900-568a-11ee-b7b9-c2b13945eed2:1 took 1.08004s seconds

    2) Mutation stream: bytes on wire = 3030004736 bytes, bw = 125410.87 KiB/s = 128MB/s

    [shard 0:stre] stream_session - [Stream #406dc8b0-56b5-11ee-bc2d-000bf4871058]
	Streaming plan for Tablet migration-ks1-index-0 succeeded, peers={127.0.0.1}, tx=0 KiB, 0.00 KiB/s, rx=2958989 KiB, 125410.87 KiB/s
    [shard 0:stre] storage_service - Streaming for tablet migration of a4f68900-568a-11ee-b7b9-c2b13945eed2:1 took 23.5992s seconds

    Test Summary:

    File stream vs. mutation stream improvements:

    - Stream bandwidth = 836 / 128 (MB/s) = 6.53X

    - Stream time = 23.60 / 1.08 (seconds) = 21.85X

    - Stream bytes on wire = 3030004736 / 1132006250 (bytes) = 2.67X
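    The ratios above are plain divisions of the numbers reported by the two
    runs; a quick sanity check of the arithmetic (values copied from the logs
    above, rounded to two decimals):

```cpp
#include <cmath>

// Speedup ratios computed directly from the quoted measurements.
inline double bw_ratio()    { return 836.0 / 128.0; }                // MB/s
inline double time_ratio()  { return 23.5992 / 1.08004; }            // seconds
inline double bytes_ratio() { return 3030004736.0 / 1132006250.0; }  // bytes on wire
```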

    Closes scylladb/scylla-enterprise#3438

    * github.com:scylladb/scylla-enterprise:
      tests: Add file_stream_test
      streaming: Implement file stream for tablet

*) streaming: Use new take_storage_snapshot interface

    The new take_storage_snapshot returns a file object instead of a file
    name. This allows the file stream sender to read from the file even if
    the file is deleted by compaction.

    Closes scylladb/scylla-enterprise#3728
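
    The fix relies on standard Unix file semantics: a file that is already
    open stays readable after its path is unlinked. A minimal sketch of that
    behaviour (the helper name and path are illustrative, not from the patch):

```cpp
#include <cstdio>
#include <string>
#include <unistd.h>

// Write content to a file, unlink it (as compaction would), and show that
// the still-open handle can read the data back.
std::string read_after_unlink(const char* path, const char* content) {
    std::FILE* f = std::fopen(path, "w+");   // create and keep a handle open
    if (!f) {
        return {};
    }
    std::fputs(content, f);
    std::fflush(f);
    ::unlink(path);                          // "compaction" deletes the file
    std::rewind(f);
    char buf[64] = {};
    std::fread(buf, 1, sizeof(buf) - 1, f);  // still readable via the handle
    std::fclose(f);
    return buf;
}
```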

*) streaming: Protect unsupported file types for file stream

    Currently, we assume the file streamed over the stream_blob rpc verb is
    an sstable file. This patch rejects unsupported file types on the
    receiver side. This allows us to stream more file types later using the
    current file stream infrastructure without worrying about old nodes
    processing the new file types in the wrong way.

    - file_ops::noop is renamed to file_ops::stream_sstables to be
      explicit about the file type

    - The missing test_file_stream_error_injection is added to the idl

    Fixes: #3846
    Tests: test_unsupported_file_ops

    Closes scylladb/scylla-enterprise#3847
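
    The receiver-side check can be sketched as follows; the helper and the
    enum's numeric values are illustrative, only the two file_ops names come
    from the patch:

```cpp
#include <stdexcept>
#include <string>

// Illustrative mirror of the file_ops enum; the numeric values are assumptions.
enum class file_ops : int {
    stream_sstables = 0,  // renamed from file_ops::noop
    load_sstables   = 1,
    // future file types get new values here
};

// Reject anything this node does not understand, so a newer sender cannot
// make an old node process an unknown file type in the wrong way.
void validate_file_ops(file_ops fops) {
    if (fops != file_ops::stream_sstables && fops != file_ops::load_sstables) {
        throw std::runtime_error("Unsupported file_ops=" + std::to_string(int(fops)));
    }
}
```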

*) idl: Add service::session_id to idl

    It will be used in the next patch.

    Refs #3907

*) streaming: Protect file stream with topology_guard

    Similar to "storage_service, tablets: Use session to guard tablet
    streaming", this patch protects file stream with topology_guard.

    Fixes #3907

*) streaming: Take service topology_guard under the try block

    Taking the service::topology_guard could throw. Currently, it throws
    outside the try block, so the rpc sink will not be closed, causing the
    following assertion failure:

    ```
    scylla: seastar/include/seastar/rpc/rpc_impl.hh:815: virtual
    seastar::rpc::sink_impl<netw::serializer,
    streaming::stream_blob_cmd_data>::~sink_impl() [Serializer =
    netw::serializer, Out = <streaming::stream_blob_cmd_data>]: Assertion
    `this->_con->get()->sink_closed()' failed.
    ```

    To fix this, move more code, including the code that takes the
    topology_guard, into the try block.

    Fixes https://github.com/scylladb/scylla-enterprise/issues/4106

    Closes scylladb/scylla-enterprise#4110
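
    The shape of the bug and the fix can be sketched with stand-in types
    (fake_sink and fake_guard are hypothetical, not the real classes): once
    the guard is constructed inside the try block, the catch path still
    closes the sink when construction throws.

```cpp
#include <stdexcept>

struct fake_sink {
    bool closed = false;
    void close() { closed = true; }
};

// Stand-in for service::topology_guard: construction may throw.
struct fake_guard {
    explicit fake_guard(bool valid) {
        if (!valid) { throw std::runtime_error("guard rejected"); }
    }
};

// Returns true on success; either way the sink ends up closed, because the
// throwing construction now happens inside the try block.
bool handle(fake_sink& sink, bool guard_valid) {
    try {
        fake_guard guard(guard_valid);  // moved into the try block by the fix
        // ... receive and store the stream here ...
        sink.close();
        return true;
    } catch (...) {
        sink.close();  // error path still closes the sink: no teardown assertion
        return false;
    }
}
```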

*) Merge 'Preserve original SSTable state with file based tablet migration' from Raphael "Raph" Carvalho

    We're not preserving the SSTable state across file-based migration, so
    staging SSTables, for example, are placed in the main directory, and
    consequently we mix staging and non-staging data, losing the ability to
    continue from where the old replica left off.
    It's expected that the view update backlog is transferred from the old
    replica to the new one, as migration doesn't wait for the leaving replica
    to complete view update work (which can take long). Elasticity is preferred.

    So this fix guarantees that the state of the SSTable is preserved
    by propagating it in the form of a subdirectory (each subdirectory is
    statically mapped to a particular state).

    The staging sstables aren't registered with the view update generator
    yet, as that's supposed to be fixed in OSS (more details can be found
    at https://github.com/scylladb/scylladb/issues/19149).

    Fixes #4265.

    Closes scylladb/scylla-enterprise#4267

    * github.com:scylladb/scylla-enterprise:
      tablet: Preserve original SSTable state with file based tablet migration
      sstables: Add get method for sstable state
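
    The "state as subdirectory" idea amounts to a static map from sstable
    state to a directory name; a minimal sketch (the enum subset and the
    directory names here are assumptions, not taken from the patch):

```cpp
#include <string>

// Illustrative subset of sstable states relevant to the fix.
enum class sstable_state { normal, staging };

// Each state is statically mapped to a subdirectory, so a streamed sstable
// lands in the directory matching its original state on the old replica.
std::string state_subdirectory(sstable_state s) {
    switch (s) {
    case sstable_state::staging: return "staging";  // kept apart from main data
    default:                     return "";         // main table directory
    }
}
```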

*) sstable: (Re-)add shareabled_components getter

*) Merge 'File streaming sstables: Use sstable source/sink to transfer snapshots' from Calle Wilund

    Fixes #4246

    An alternative approach with better separation of concerns between the transport and sstable layers. Builds on #4472, but fancier.

    Ensures we transfer and pre-process scylla metadata for streamed
    file blobs first, then properly apply the receiving node's local config
    by using a source and sink layer exported from sstables, which
    handles things like ordering and metadata filtering (on the source) as
    well as metadata handling and proper IO paths when writing data on the
    receiver node (sink).

    This implementation maintains the statelessness of the current
    design; the delegated sink side will re-read and re-write the
    metadata for each component processed. This is a little wasteful,
    but the metadata is small, and it is less error-prone than trying to
    do caching across shards etc. The transport is isolated from this
    knowledge.

    This is an alternative/complement to #4436 and #4472, fixing the
    underlying issue. Note that while the layers/APIs here allow easy
    fixing of other fundamental problems in the feature (such as
    destination location etc.), these are not included in the PR, to keep
    it as close to the current behaviour as possible.

    Closes scylladb/scylla-enterprise#4646

    * github.com:scylladb/scylla-enterprise:
      raft_tests: Copy/add a topology test with encryption
      file streaming: Use sstable source/sink to transfer snapshots
      sstables: Add source and sink objects + producers for transfering a snapshot
      sstable::types: Add remove accessor for extension info in metadata

*) The change for error injection in merge commit 966ea5955dd8760:

    File streaming now has "stream_mutation_fragments" error injection points
    so test_table_dropped_during_streaming works with file streaming.

*) doc: document file-based streaming

    This commit adds a description of the file-based streaming feature to the documentation.

    It will be displayed in the docs using the scylladb_include_flag directive after
    https://github.com/scylladb/scylladb/pull/20182 is merged, backported to branch-6.0,
    and, in turn, branch-2024.2.

    Refs https://github.com/scylladb/scylla-enterprise/issues/4585
    Refs https://github.com/scylladb/scylla-enterprise/issues/4254

    Closes scylladb/scylla-enterprise#4587

*) doc: move File-based streaming to the Tablets source file-based-streaming

    This commit moves the description of file-based streaming from a common include file
    to the regular doc source file where tablets are described.

    Closes scylladb/scylla-enterprise#4652

*) streaming: sstable_stream_sink_impl: abort: prevent null pointer dereference

Closes scylladb/scylladb#22467
2025-01-26 12:51:59 +02:00


/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "message/messaging_service.hh"
#include "streaming/stream_blob.hh"
#include "streaming/stream_plan.hh"
#include "gms/inet_address.hh"
#include "utils/pretty_printers.hh"
#include "utils/error_injection.hh"
#include "locator/host_id.hh"
#include "replica/database.hh"
#include "sstables/sstables.hh"
#include "sstables/sstables_manager.hh"
#include "sstables/sstable_version.hh"
#include "sstables/generation_type.hh"
#include "sstables/types.hh"
#include "idl/streaming.dist.hh"
#include "service/topology_guard.hh"
#include <seastar/core/coroutine.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/future.hh>
#include <seastar/core/fstream.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/coroutine/all.hh>
#include <vector>
#include <cfloat>
#include <filesystem>
#include <fmt/ranges.h>
namespace streaming {
static logging::logger blogger("stream_blob");
constexpr size_t file_stream_buffer_size = 128 * 1024;
constexpr size_t file_stream_write_behind = 10;
constexpr size_t file_stream_read_ahead = 4;
static sstables::sstable_state sstable_state(const streaming::stream_blob_meta& meta) {
return meta.sstable_state.value_or(sstables::sstable_state::normal);
}
static future<> load_sstable_for_tablet(const file_stream_id& ops_id, replica::database& db, table_id table, sstables::sstable_state state, sstring filename, seastar::shard_id shard) {
blogger.debug("stream_sstables[{}] Loading sstable {} on shard {}", ops_id, filename, shard);
auto s = db.find_column_family(table).schema();
auto data_path = std::filesystem::path(filename);
auto desc = sstables::parse_path(data_path, s->ks_name(), s->cf_name());
co_await db.container().invoke_on(shard, [id = s->id(), desc, state] (replica::database& db) -> future<> {
replica::table& t = db.find_column_family(id);
auto erm = t.get_effective_replication_map();
auto& sstm = t.get_sstables_manager();
auto sst = sstm.make_sstable(t.schema(), t.get_storage_options(), desc.generation, state, desc.version, desc.format);
co_await sst->load(erm->get_sharder(*t.schema()));
co_await t.add_sstable_and_update_cache(sst);
});
blogger.info("stream_sstables[{}] Loaded sstable {} on shard {} successfully", ops_id, filename, shard);
}
static utils::pretty_printed_throughput get_bw(size_t total_size, std::chrono::steady_clock::time_point start_time) {
auto duration = std::chrono::steady_clock::now() - start_time;
return utils::pretty_printed_throughput(total_size, duration);
}
// For tablet stream checks
class tablet_stream_status {
public:
bool finished = false;
void check_valid_stream();
};
void tablet_stream_status::check_valid_stream() {
if (finished) {
throw std::runtime_error("The stream has finished already");
}
}
static thread_local std::unordered_map<file_stream_id, lw_shared_ptr<tablet_stream_status>> tablet_streams;
future<> mark_tablet_stream_start(file_stream_id ops_id) {
return seastar::smp::invoke_on_all([ops_id] {
auto status = make_lw_shared<tablet_stream_status>();
tablet_streams.emplace(ops_id, status);
});
}
future<> mark_tablet_stream_done(file_stream_id ops_id) {
return seastar::smp::invoke_on_all([ops_id] {
auto it = tablet_streams.find(ops_id);
if (it == tablet_streams.end()) {
return;
}
auto status = it->second;
if (status) {
status->finished = true;
}
tablet_streams.erase(ops_id);
});
}
lw_shared_ptr<tablet_stream_status> get_tablet_stream(file_stream_id ops_id) {
auto status = tablet_streams[ops_id];
if (!status) {
auto msg = format("stream_sstables[{}] Could not find ops_id={}", ops_id, ops_id);
blogger.warn("{}", msg);
throw std::runtime_error(msg);
}
return status;
}
static void may_inject_error(const streaming::stream_blob_meta& meta, bool may_inject, const sstring& error) {
if (may_inject) {
if (rand() % 500 == 0) {
auto msg = format("fstream[{}] Injected file stream error={} file={}",
meta.ops_id, error, meta.filename);
blogger.warn("{}", msg);
throw std::runtime_error(msg);
}
}
}
future<> stream_blob_handler(replica::database& db,
netw::messaging_service& ms,
gms::inet_address from,
streaming::stream_blob_meta meta,
rpc::sink<streaming::stream_blob_cmd_data> sink,
rpc::source<streaming::stream_blob_cmd_data> source,
stream_blob_create_output_fn create_output,
bool inject_errors) {
bool fstream_closed = false;
bool sink_closed = false;
bool status_sent = false;
size_t total_size = 0;
auto start_time = std::chrono::steady_clock::now();
std::optional<output_stream<char>> fstream;
std::exception_ptr error;
stream_blob_finish_fn finish;
// Will log a message when streaming is done. Used to synchronize tests.
lw_shared_ptr<std::any> log_done;
if (utils::get_local_injector().is_enabled("stream_mutation_fragments")) {
log_done = make_lw_shared<std::any>(seastar::make_shared(seastar::defer([] {
blogger.info("stream_mutation_fragments: done (tablets)");
})));
}
try {
auto status = get_tablet_stream(meta.ops_id);
auto guard = service::topology_guard(meta.topo_guard);
// Reject any file_ops that is not supported by this node
if (meta.fops != streaming::file_ops::stream_sstables &&
meta.fops != streaming::file_ops::load_sstables) {
auto msg = format("fstream[{}] Unsupported file_ops={} peer={} file={}",
meta.ops_id, int(meta.fops), from, meta.filename);
blogger.warn("{}", msg);
throw std::runtime_error(msg);
}
blogger.debug("fstream[{}] Follower started peer={} file={}",
meta.ops_id, from, meta.filename);
auto [f, out] = co_await create_output(db, meta);
finish = std::move(f);
fstream = std::move(out);
bool got_end_of_stream = false;
for (;;) {
try {
auto opt = co_await source();
if (!opt) {
break;
}
co_await utils::get_local_injector().inject("stream_mutation_fragments", [&guard] (auto& handler) -> future<> {
blogger.info("stream_mutation_fragments: waiting (tablets)");
while (!handler.poll_for_message()) {
guard.check();
co_await sleep(std::chrono::milliseconds(5));
}
blogger.info("stream_mutation_fragments: released (tablets)");
});
stream_blob_cmd_data& cmd_data = std::get<0>(*opt);
auto cmd = cmd_data.cmd;
if (cmd == streaming::stream_blob_cmd::error) {
blogger.warn("fstream[{}] Follower got stream_blob_cmd::error from peer={} file={}",
meta.ops_id, from, meta.filename);
throw std::runtime_error(format("Got stream_blob_cmd::error from peer={} file={}", from, meta.filename));
} else if (cmd == streaming::stream_blob_cmd::end_of_stream) {
blogger.debug("fstream[{}] Follower got stream_blob_cmd::end_of_stream from peer={} file={}",
meta.ops_id, from, meta.filename);
got_end_of_stream = true;
} else if (cmd == streaming::stream_blob_cmd::data) {
std::optional<streaming::stream_blob_data> data = std::move(cmd_data.data);
if (data) {
total_size += data->size();
blogger.trace("fstream[{}] Follower received data from peer={} data={}", meta.ops_id, from, data->size());
status->check_valid_stream();
if (!data->empty()) {
co_await fstream->write((char*)data->data(), data->size());
}
}
}
} catch (seastar::rpc::stream_closed) {
// After we get streaming::stream_blob_cmd::end_of_stream which
// is the last message from peer, it does not matter if the
// source() is closed or not.
if (got_end_of_stream) {
break;
} else {
throw;
}
} catch (...) {
throw;
}
may_inject_error(meta, inject_errors, "rx_data");
}
// If we reach here, streaming::stream_blob_cmd::end_of_stream should be received. Otherwise there
// must be an error, e.g., the sender closed the stream without sending streaming::stream_blob_cmd::error.
if (!got_end_of_stream) {
throw std::runtime_error(format("fstream[{}] Follower failed to get end_of_stream", meta.ops_id));
}
status->check_valid_stream();
co_await fstream->flush();
co_await fstream->close();
fstream_closed = true;
may_inject_error(meta, inject_errors, "flush_and_close");
co_await finish(store_result::ok);
// Send status code and close the sink
co_await sink(streaming::stream_blob_cmd_data(streaming::stream_blob_cmd::ok));
status_sent = true;
co_await sink.close();
sink_closed = true;
} catch (...) {
error = std::current_exception();
}
if (error) {
blogger.warn("fstream[{}] Follower failed peer={} file={} received_size={} bw={} error={}",
meta.ops_id, from, meta.filename, total_size, get_bw(total_size, start_time), error);
if (!fstream_closed) {
try {
if (fstream) {
// Make sure fstream is always closed
co_await fstream->close();
}
} catch (...) {
blogger.warn("fstream[{}] Follower failed to close the file stream: {}",
meta.ops_id, std::current_exception());
// We could do nothing but continue to cleanup more
}
}
if (!status_sent) {
try {
may_inject_error(meta, inject_errors, "no_error_code_back");
co_await sink(streaming::stream_blob_cmd_data(streaming::stream_blob_cmd::error));
} catch (...) {
// Try our best to send the status code.
// If we could not send it, we could do nothing but close the sink.
blogger.warn("fstream[{}] Follower failed to send error code: {}",
meta.ops_id, std::current_exception());
}
}
try {
if (!sink_closed) {
// Make sure sink is always closed
co_await sink.close();
}
} catch (...) {
blogger.warn("fstream[{}] Follower failed to close the stream sink: {}",
meta.ops_id, std::current_exception());
}
try {
// Drain everything in source
for (;;) {
auto opt = co_await source();
if (!opt) {
break;
}
}
} catch (...) {
blogger.warn("fstream[{}] Follower failed to drain rpc stream source: {}",
meta.ops_id, std::current_exception());
}
try {
// Remove the file in case of error
if (finish) {
co_await finish(store_result::failure);
blogger.info("fstream[{}] Follower removed partial file={}", meta.ops_id, meta.filename);
}
} catch (...) {
blogger.warn("fstream[{}] Follower failed to remove partial file={}: {}",
meta.ops_id, meta.filename, std::current_exception());
}
// Do not call rethrow_exception(error) because the caller could do nothing but log
// the error. We have already logged the error here.
} else {
// Get some statistics
blogger.debug("fstream[{}] Follower finished peer={} file={} received_size={} bw={}",
meta.ops_id, from, meta.filename, total_size, get_bw(total_size, start_time));
}
co_return;
}
future<> stream_blob_handler(replica::database& db, netw::messaging_service& ms,
gms::inet_address from,
streaming::stream_blob_meta meta,
rpc::sink<streaming::stream_blob_cmd_data> sink,
rpc::source<streaming::stream_blob_cmd_data> source) {
co_await stream_blob_handler(db, ms, std::move(from), meta, std::move(sink), std::move(source), [](replica::database& db, const streaming::stream_blob_meta& meta) -> future<output_result> {
auto foptions = file_open_options();
foptions.sloppy_size = true;
foptions.extent_allocation_size_hint = 32 << 20;
auto stream_options = file_output_stream_options();
stream_options.buffer_size = file_stream_buffer_size;
stream_options.write_behind = file_stream_write_behind;
auto& table = db.find_column_family(meta.table);
auto& sstm = table.get_sstables_manager();
auto sstable_sink = sstables::create_stream_sink(table.schema(), sstm, table.get_storage_options(), sstable_state(meta), meta.filename, meta.fops == file_ops::load_sstables);
auto out = co_await sstable_sink->output(foptions, stream_options);
co_return output_result{
[sstable_sink = std::move(sstable_sink), &meta, &db](store_result res) -> future<> {
if (res != store_result::ok) {
co_await sstable_sink->abort();
co_return;
}
auto sst = co_await sstable_sink->close_and_seal();
if (sst) {
auto filename = sst->toc_filename();
sst = {};
co_await load_sstable_for_tablet(meta.ops_id, db, meta.table, sstable_state(meta), std::move(filename), meta.dst_shard_id);
}
},
std::move(out)
};
});
}
// Get a new sstable name using the new generation
// For example:
// oldname: me-3ga1_0iiv_2e5uo2flv7lgdl2j0d-big-Index.db
// newgen: 3ga1_0iiv_3vj5c2flv7lgdl2j0d
// newname: me-3ga1_0iiv_3vj5c2flv7lgdl2j0d-big-Index.db
static std::string get_sstable_name_with_generation(const file_stream_id& ops_id, const std::string& oldname, const std::string& newgen) {
std::string newname = oldname;
// The generation name starts after the first '-'.
auto it = newname.find("-");
if (it != std::string::npos) {
newname.replace(++it, newgen.size(), newgen);
return newname;
} else {
auto msg = fmt::format("fstream[{}] Failed to get sstable name for {} with generation {}", ops_id, oldname, newgen);
blogger.warn("{}", msg);
throw std::runtime_error(msg);
}
}
}
template<> struct fmt::formatter<streaming::stream_blob_info> : fmt::ostream_formatter {};
namespace streaming {
// Send files in the files list to the nodes in targets list over network
// Returns number of bytes sent over network
future<size_t>
tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sources, std::vector<node_and_shard> targets, table_id table, file_stream_id ops_id, host2ip_t host2ip, service::frozen_topology_guard topo_guard, bool inject_errors) {
size_t ops_total_size = 0;
if (targets.empty()) {
co_return ops_total_size;
}
if (sources.empty()) {
co_return ops_total_size;
}
blogger.debug("fstream[{}] Master started sending n={}, sources={}, targets={}",
ops_id, sources.size(), sources, targets);
struct sink_and_source {
gms::inet_address node;
rpc::sink<streaming::stream_blob_cmd_data> sink;
rpc::source<streaming::stream_blob_cmd_data> source;
bool sink_closed = false;
bool status_sent = false;
};
auto ops_start_time = std::chrono::steady_clock::now();
streaming::stream_blob_meta meta;
meta.ops_id = ops_id;
meta.table = table;
meta.topo_guard = topo_guard;
std::exception_ptr error;
auto stream_options = file_input_stream_options();
stream_options.buffer_size = file_stream_buffer_size;
stream_options.read_ahead = file_stream_read_ahead;
for (auto& info : sources) {
auto& filename = info.filename;
std::optional<input_stream<char>> fstream;
bool fstream_closed = false;
try {
meta.fops = info.fops;
meta.filename = info.filename;
meta.sstable_state = info.sstable_state;
fstream = co_await info.source(stream_options);
} catch (...) {
blogger.warn("fstream[{}] Master failed sources={} targets={} error={}",
ops_id, sources, targets, std::current_exception());
throw;
}
std::vector<sink_and_source> ss;
size_t total_size = 0;
auto start_time = std::chrono::steady_clock::now();
bool got_error_from_peer = false;
try {
for (auto& x : targets) {
const auto& node = x.node;
meta.dst_shard_id = x.shard;
auto ip = co_await host2ip(node);
blogger.debug("fstream[{}] Master creating sink and source for node={}/{}, file={}, targets={}", ops_id, node, ip, filename, targets);
auto [sink, source] = co_await ms.make_sink_and_source_for_stream_blob(meta, node);
ss.push_back(sink_and_source{ip, std::move(sink), std::move(source)});
}
// This fiber sends data to peer node
auto send_data_to_peer = [&] () mutable -> future<> {
std::exception_ptr error;
try {
while (!got_error_from_peer) {
may_inject_error(meta, inject_errors, "read_data");
auto buf = co_await fstream->read_up_to(file_stream_buffer_size);
if (buf.size() == 0) {
break;
}
streaming::stream_blob_data data(std::move(buf));
auto data_size = data.size();
stream_blob_cmd_data cmd_data(streaming::stream_blob_cmd::data, std::move(data));
co_await coroutine::parallel_for_each(ss, [&] (sink_and_source& s) mutable -> future<> {
total_size += data_size;
ops_total_size += data_size;
blogger.trace("fstream[{}] Master sending file={} to node={} chunk_size={}",
ops_id, filename, s.node, data_size);
may_inject_error(meta, inject_errors, "tx_data");
co_await s.sink(cmd_data);
});
}
} catch (...) {
error = std::current_exception();
}
if (error) {
// We have to close the stream otherwise if the stream is
// ok, the get_status_code_from_peer fiber below might
// wait for the source() forever.
for (auto& s : ss) {
try {
co_await s.sink.close();
s.sink_closed = true;
} catch (...) {
}
}
std::rethrow_exception(error);
}
if (fstream) {
co_await fstream->close();
fstream_closed = true;
}
for (auto& s : ss) {
blogger.debug("fstream[{}] Master done sending file={} to node={}", ops_id, filename, s.node);
if (!got_error_from_peer) {
co_await s.sink(streaming::stream_blob_cmd_data(streaming::stream_blob_cmd::end_of_stream));
s.status_sent = true;
}
co_await s.sink.close();
s.sink_closed = true;
}
};
// This fiber gets status code from peer node
auto get_status_code_from_peer = [&] () mutable -> future<> {
co_await coroutine::parallel_for_each(ss, [&] (sink_and_source& s) mutable -> future<> {
bool got_cmd_ok = false;
while (!got_error_from_peer) {
try {
auto opt = co_await s.source();
if (opt) {
stream_blob_cmd_data& cmd_data = std::get<0>(*opt);
if (cmd_data.cmd == streaming::stream_blob_cmd::error) {
got_error_from_peer = true;
blogger.warn("fstream[{}] Master got stream_blob_cmd::error file={} peer={}",
ops_id, filename, s.node);
throw std::runtime_error(format("Got stream_blob_cmd::error from peer {}", s.node));
} else if (cmd_data.cmd == streaming::stream_blob_cmd::ok) {
got_cmd_ok = true;
}
blogger.debug("fstream[{}] Master got stream_blob_cmd={} file={} peer={}",
ops_id, int(cmd_data.cmd), filename, s.node);
} else {
break;
}
} catch (seastar::rpc::stream_closed) {
// After we get streaming::stream_blob_cmd::ok
// which is the last message from peer, it does not
// matter if the source() is closed or not.
if (got_cmd_ok) {
break;
} else {
throw;
}
} catch (...) {
throw;
}
}
});
};
co_await coroutine::all(send_data_to_peer, get_status_code_from_peer);
} catch (...) {
error = std::current_exception();
}
if (error) {
blogger.warn("fstream[{}] Master failed sending file={} to targets={} send_size={} bw={} error={}",
ops_id, filename, targets, total_size, get_bw(total_size, start_time), error);
// Error handling for fstream and sink
if (!fstream_closed) {
try {
if (fstream) {
co_await fstream->close();
}
} catch (...) {
// We could do nothing but continue to cleanup more
blogger.warn("fstream[{}] Master failed to close file stream: {}",
ops_id, std::current_exception());
}
}
for (auto& s : ss) {
try {
if (!s.status_sent && !s.sink_closed) {
co_await s.sink(streaming::stream_blob_cmd_data(streaming::stream_blob_cmd::error));
s.status_sent = true;
}
} catch (...) {
// We could do nothing but continue to close
blogger.warn("fstream[{}] Master failed to send error code: {}",
ops_id, std::current_exception());
}
try {
if (!s.sink_closed) {
co_await s.sink.close();
s.sink_closed = true;
}
} catch (...) {
// We could do nothing but continue
blogger.warn("fstream[{}] Master failed to close rpc stream sink: {}",
ops_id, std::current_exception());
}
try {
// Drain everything in source
for (;;) {
auto opt = co_await s.source();
if (!opt) {
break;
}
}
} catch (...) {
blogger.warn("fstream[{}] Master failed to drain rpc stream source: {}",
ops_id, std::current_exception());
}
}
// Stop handling remaining files
break;
} else {
blogger.debug("fstream[{}] Master done sending file={} to targets={} send_size={} bw={}",
ops_id, filename, targets, total_size, get_bw(total_size, start_time));
}
}
if (error) {
blogger.warn("fstream[{}] Master failed sending files_nr={} files={} targets={} send_size={} bw={} error={}",
ops_id, sources.size(), sources, targets, ops_total_size, get_bw(ops_total_size, ops_start_time), error);
std::rethrow_exception(error);
} else {
blogger.debug("fstream[{}] Master finished sending files_nr={} files={} targets={} send_size={} bw={}",
ops_id, sources.size(), sources, targets, ops_total_size, get_bw(ops_total_size, ops_start_time));
}
co_return ops_total_size;
}
future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req, host2ip_t host2ip) {
stream_files_response resp;
auto& table = db.find_column_family(req.table);
auto sstables = co_await table.take_storage_snapshot(req.range);
co_await utils::get_local_injector().inject("order_sstables_for_streaming", [&sstables] (auto& handler) -> future<> {
if (sstables.size() == 3) {
// make sure the sstables are ordered so that the sstable containing shadowed data is streamed last
const std::string_view shadowed_file = handler.template get<std::string_view>("shadowed_file").value();
for (int index: {0, 1}) {
if (sstables[index].sst->component_basename(component_type::Data) == shadowed_file) {
std::swap(sstables[index], sstables[2]);
}
}
}
return make_ready_future<>();
});
auto files = std::list<stream_blob_info>();
sstables::sstable_generation_generator sst_gen(0);
for (auto& sst_snapshot : sstables) {
auto& sst = sst_snapshot.sst;
// stable state (across files) is a must for load to work on destination
auto sst_state = sst->state();
auto sources = create_stream_sources(sst_snapshot);
auto newgen = fmt::to_string(sst_gen(sstables::uuid_identifiers::yes));
for (auto&& s : sources) {
auto oldname = s->component_basename();
auto newname = get_sstable_name_with_generation(req.ops_id, oldname, newgen);
blogger.debug("fstream[{}] Get name oldname={}, newname={}", req.ops_id, oldname, newname);
auto& info = files.emplace_back();
info.fops = file_ops::stream_sstables;
info.sstable_state = sst_state;
info.filename = std::move(newname);
info.source = [s = std::move(s)](const file_input_stream_options& options) {
return s->input(options);
};
}
// ensure we mark the end of each component sequence.
if (!files.empty()) {
files.back().fops = file_ops::load_sstables;
}
}
if (files.empty()) {
co_return resp;
}
blogger.debug("stream_sstables[{}] Started sending sstable_nr={} files_nr={} files={} range={}",
req.ops_id, sstables.size(), files.size(), files, req.range);
auto ops_start_time = std::chrono::steady_clock::now();
size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), req.targets, req.table, req.ops_id, std::move(host2ip), req.topo_guard);
resp.stream_bytes = stream_bytes;
auto duration = std::chrono::steady_clock::now() - ops_start_time;
blogger.info("stream_sstables[{}] Finished sending sstable_nr={} files_nr={} files={} range={} stream_bytes={} stream_time={} stream_bw={}",
req.ops_id, sstables.size(), files.size(), files, req.range, stream_bytes, duration, get_bw(stream_bytes, ops_start_time));
co_return resp;
}
future<stream_files_response> tablet_stream_files(const file_stream_id& ops_id,
replica::table& table,
const dht::token_range& range,
const locator::host_id& src_host,
const locator::host_id& dst_host,
seastar::shard_id dst_shard_id,
netw::messaging_service& ms,
abort_source& as,
service::frozen_topology_guard topo_guard) {
stream_files_response resp;
std::exception_ptr error;
try {
co_await mark_tablet_stream_start(ops_id);
} catch (...) {
error = std::current_exception();
}
if (!error) {
try {
streaming::stream_files_request req;
req.ops_id = ops_id;
req.keyspace_name = table.schema()->ks_name();
req.table_name = table.schema()->cf_name();
req.table = table.schema()->id();
req.range = range;
req.targets = std::vector<node_and_shard>{node_and_shard{dst_host, dst_shard_id}};
req.topo_guard = topo_guard;
resp = co_await ser::streaming_rpc_verbs::send_tablet_stream_files(&ms, src_host, as, req);
} catch (...) {
error = std::current_exception();
}
}
co_await mark_tablet_stream_done(ops_id);
if (error) {
std::rethrow_exception(error);
}
co_return resp;
}
}