Files
scylladb/streaming/stream_blob.cc
Taras Veretilnyk 7214f5a0b6 sstables: propagate ignore_component_digest_mismatch config to all load sites
Add ignore_component_digest_mismatch option to db::config (default false).
When set, sstable loading logs a warning instead of throwing on component
digest mismatches, allowing a node to start up despite corrupted non-vital
components or bugs in digest calculation.

Propagate the config to all production sstable load paths:
- distributed_loader (node startup, upload dir processing)
- storage_service (tablet storage cloning)
- sstables_loader (load-and-stream, download tasks, attach)
- stream_blob (tablet streaming)
2026-03-10 19:24:05 +01:00

745 lines
32 KiB
C++

/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "db/view/view_building_worker.hh"
#include "db/config.hh"
#include "message/messaging_service.hh"
#include "streaming/stream_blob.hh"
#include "streaming/stream_plan.hh"
#include "utils/pretty_printers.hh"
#include "utils/error_injection.hh"
#include "locator/host_id.hh"
#include "replica/database.hh"
#include "sstables/sstables.hh"
#include "sstables/sstables_manager.hh"
#include "sstables/sstable_version.hh"
#include "sstables/generation_type.hh"
#include "sstables/types.hh"
#include "idl/streaming.dist.hh"
#include "service/topology_guard.hh"
#include <seastar/core/coroutine.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/future.hh>
#include <seastar/core/fstream.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/coroutine/all.hh>
#include <vector>
#include <cfloat>
#include <filesystem>
#include <fmt/ranges.h>
#include "replica/exceptions.hh"
namespace streaming {
// Logger shared by the sending ("Master") and receiving ("Follower") sides
// of file-based tablet streaming implemented in this file.
static logging::logger blogger("stream_blob");
// Chunk size (128 KiB): the sender reads the source file in chunks of at most
// this size and sends each as one stream_blob_cmd::data message; it is also
// the buffer size of both the sender's input stream and the receiver's output stream.
constexpr size_t file_stream_buffer_size = 128 * 1024;
// write_behind setting of the receiver's file_output_stream_options.
constexpr size_t file_stream_write_behind = 10;
// read_ahead setting of the sender's file_input_stream_options.
constexpr size_t file_stream_read_ahead = 4;
// Returns the sstable state requested by the sender in `meta`, or
// sstable_state::normal when the sender did not provide one.
static sstables::sstable_state sstable_state(const streaming::stream_blob_meta& meta) {
    if (meta.sstable_state) {
        return *meta.sstable_state;
    }
    return sstables::sstable_state::normal;
}
// Loads a fully received, still unsealed sstable (described by `desc`, in
// state `state`) into table `id` on shard `shard`.
//
// On the target shard the sstable is opened with unsealed_sstable = true and
// with db::config's ignore_component_digest_mismatch setting, then handed to
// add_new_sstable_and_update_cache(). Sealing is deferred to the on_add
// callback, i.e. until the sstable is actually being added to the table.
// Sstables received in staging state are additionally registered with the
// view building worker of the target shard.
static future<> load_sstable_for_tablet(const file_stream_id& ops_id, replica::database& db, db::view::view_building_worker& vbw, table_id id, sstables::sstable_state state, sstables::entry_descriptor desc, seastar::shard_id shard) {
    // Capture the sharded container so the lambda (running on `shard`) can
    // reach that shard's local view_building_worker.
    auto& sharded_vbw = vbw.container();
    co_await db.container().invoke_on(shard, [&sharded_vbw, id, desc, state, ops_id] (replica::database& db) -> future<> {
        replica::table& t = db.find_column_family(id);
        auto erm = t.get_effective_replication_map();
        auto& sstm = t.get_sstables_manager();
        auto sst = sstm.make_sstable(t.schema(), t.get_storage_options(), desc.generation, state, desc.version, desc.format);
        sstables::sstable_open_config cfg { .unsealed_sstable = true,
                .ignore_component_digest_mismatch = db.get_config().ignore_component_digest_mismatch() };
        co_await sst->load(erm->get_sharder(*t.schema()), cfg);
        // Seal only when the callback is invoked for the sstable loaded here;
        // the callback may be called with other sstables as well.
        auto on_add = [sst, &sstm] (sstables::shared_sstable loading_sst) -> future<> {
            if (loading_sst == sst) {
                auto cfg = sstm.configure_writer(sst->get_origin());
                co_await loading_sst->seal_sstable(cfg.backup);
            }
            co_return;
        };
        auto new_sstables = co_await t.add_new_sstable_and_update_cache(sst, on_add);
        blogger.info("stream_sstables[{}] Loaded sstable {} successfully", ops_id, sst->toc_filename());
        if (state == sstables::sstable_state::staging) {
            // If the sstable is in staging state, register it to view building worker
            // to generate view updates from it.
            // But because the tablet is still in migration process, register the sstable
            // to the view building worker, which will create a view building task for it,
            // so then, the view building coordinator can decide to process it once the migration
            // is finished.
            // (Instead of registering the sstable to view update generator which may process it immediately.)
            co_await sharded_vbw.local().register_staging_sstable_tasks(new_sstables, t.schema()->id());
        }
    });
}
// Printable throughput for `total_size` bytes transferred between
// `start_time` and now.
static utils::pretty_printed_throughput get_bw(size_t total_size, std::chrono::steady_clock::time_point start_time) {
    return utils::pretty_printed_throughput(total_size, std::chrono::steady_clock::now() - start_time);
}
// Per-stream state used by tablet stream checks. The receiving side calls
// check_valid_stream() to refuse data once the stream has been marked finished.
class tablet_stream_status {
public:
    // Set when the stream is marked done; any later check fails.
    bool finished = false;
    // Throws std::runtime_error if `finished` is set.
    void check_valid_stream();
};

void tablet_stream_status::check_valid_stream() {
    if (!finished) {
        return;
    }
    throw std::runtime_error("The stream has finished already");
}
// Registry of active tablet file streams, keyed by ops_id. It is thread_local,
// so every shard has its own copy.
static thread_local std::unordered_map<file_stream_id, lw_shared_ptr<tablet_stream_status>> tablet_streams;

// Registers `ops_id` as an active stream on every shard. An existing entry
// for the same ops_id on a shard is left untouched (emplace semantics).
future<> mark_tablet_stream_start(file_stream_id ops_id) {
    return seastar::smp::invoke_on_all([ops_id] {
        tablet_streams.emplace(ops_id, make_lw_shared<tablet_stream_status>());
    });
}
// Marks `ops_id` as finished on every shard and removes it from the registry.
// The status object is flagged before removal, so anyone still holding the
// shared pointer (such as a receiver in stream_blob_handler) observes
// finished == true. Unknown ops_ids are ignored.
future<> mark_tablet_stream_done(file_stream_id ops_id) {
    return seastar::smp::invoke_on_all([ops_id] {
        if (auto it = tablet_streams.find(ops_id); it != tablet_streams.end()) {
            if (it->second) {
                it->second->finished = true;
            }
            tablet_streams.erase(it);
        }
    });
}
// Returns this shard's status object for the tablet stream `ops_id`.
// Logs a warning and throws std::runtime_error when the stream is not
// registered on this shard (never started, or already marked done).
//
// Uses find() instead of operator[]: operator[] default-inserts a null entry
// for every unknown ops_id. Such an entry is never erased, and it would also
// turn a later mark_tablet_stream_start() for the same id into a no-op,
// because emplace() does not overwrite an existing key.
lw_shared_ptr<tablet_stream_status> get_tablet_stream(file_stream_id ops_id) {
    auto it = tablet_streams.find(ops_id);
    if (it == tablet_streams.end() || !it->second) {
        auto msg = format("stream_sstables[{}] Could not find ops_id={}", ops_id, ops_id);
        blogger.warn("{}", msg);
        throw std::runtime_error(msg);
    }
    return it->second;
}
// Error-injection helper: when `may_inject` is true, fails roughly once per
// 500 calls by logging and throwing a std::runtime_error naming `error` and
// the file in `meta`. Does nothing when `may_inject` is false.
static void may_inject_error(const streaming::stream_blob_meta& meta, bool may_inject, const sstring& error) {
    // Short-circuit keeps rand() from being consumed when injection is off.
    if (!may_inject || rand() % 500 != 0) {
        return;
    }
    auto msg = format("fstream[{}] Injected file stream error={} file={}",
            meta.ops_id, error, meta.filename);
    blogger.warn("{}", msg);
    throw std::runtime_error(msg);
}
// Receiving ("Follower") side of a single file transfer.
//
// Reads stream_blob_cmd messages from `source` and writes the payload of
// stream_blob_cmd::data messages to the output stream obtained from
// `create_output`. After stream_blob_cmd::end_of_stream it flushes and closes
// the file, calls the finish function with store_result::ok, replies
// stream_blob_cmd::ok on `sink` and closes it.
//
// On any failure it closes the file stream, tries to send
// stream_blob_cmd::error, closes the sink, drains `source`, and calls finish
// with store_result::failure to remove the partial file. The error is only
// logged; the returned future resolves successfully in both cases.
//
// `inject_errors` enables random error injection via may_inject_error().
// `ms` is not used by this function.
future<> stream_blob_handler(replica::database& db,
        netw::messaging_service& ms,
        locator::host_id from,
        streaming::stream_blob_meta meta,
        rpc::sink<streaming::stream_blob_cmd_data> sink,
        rpc::source<streaming::stream_blob_cmd_data> source,
        stream_blob_create_output_fn create_output,
        bool inject_errors) {
    // Progress flags consulted by the error path so each resource is
    // cleaned up exactly once.
    bool fstream_closed = false;
    bool sink_closed = false;
    bool status_sent = false;
    size_t total_size = 0;
    auto start_time = std::chrono::steady_clock::now();
    std::optional<output_stream<char>> fstream;
    std::exception_ptr error;
    stream_blob_finish_fn finish;
    // Will log a message when streaming is done. Used to synchronize tests.
    lw_shared_ptr<std::any> log_done;
    try {
        if (utils::get_local_injector().is_enabled("stream_mutation_fragments")) {
            log_done = make_lw_shared<std::any>(seastar::make_shared(seastar::defer([] {
                blogger.info("stream_mutation_fragments: done (tablets)");
            })));
        }
        // Throws if meta.ops_id is not registered as an active stream on this shard.
        auto status = get_tablet_stream(meta.ops_id);
        auto guard = service::topology_guard(meta.topo_guard);
        // Reject any file_ops that is not supported by this node
        if (meta.fops != streaming::file_ops::stream_sstables &&
                meta.fops != streaming::file_ops::load_sstables) {
            auto msg = format("fstream[{}] Unsupported file_ops={} peer={} file={}",
                    meta.ops_id, int(meta.fops), from, meta.filename);
            blogger.warn("{}", msg);
            throw std::runtime_error(msg);
        }
        blogger.debug("fstream[{}] Follower started peer={} file={}",
                meta.ops_id, from, meta.filename);
        // create_output yields the finish function (called with
        // store_result::ok on success, store_result::failure on error) and
        // the output stream the received bytes are written to.
        auto [f, out] = co_await create_output(db, meta);
        finish = std::move(f);
        fstream = std::move(out);
        bool got_end_of_stream = false;
        // Receive loop: ends when source() yields no more messages, or when
        // the rpc stream is closed after end_of_stream was already received.
        for (;;) {
            try {
                auto opt = co_await source();
                if (!opt) {
                    break;
                }
                co_await utils::get_local_injector().inject("stream_mutation_fragments", [&guard] (auto& handler) -> future<> {
                    blogger.info("stream_mutation_fragments: waiting (tablets)");
                    while (!handler.poll_for_message()) {
                        guard.check();
                        co_await sleep(std::chrono::milliseconds(5));
                    }
                    blogger.info("stream_mutation_fragments: released (tablets)");
                });
                if (db.is_in_critical_disk_utilization_mode()) {
                    throw replica::critical_disk_utilization_exception("rejected streamed file");
                }
                stream_blob_cmd_data& cmd_data = std::get<0>(*opt);
                auto cmd = cmd_data.cmd;
                if (cmd == streaming::stream_blob_cmd::error) {
                    blogger.warn("fstream[{}] Follower got stream_blob_cmd::error from peer={} file={}",
                            meta.ops_id, from, meta.filename);
                    throw std::runtime_error(format("Got stream_blob_cmd::error from peer={} file={}", from, meta.filename));
                } else if (cmd == streaming::stream_blob_cmd::end_of_stream) {
                    blogger.debug("fstream[{}] Follower got stream_blob_cmd::end_of_stream from peer={} file={}",
                            meta.ops_id, from, meta.filename);
                    got_end_of_stream = true;
                } else if (cmd == streaming::stream_blob_cmd::data) {
                    std::optional<streaming::stream_blob_data> data = std::move(cmd_data.data);
                    if (data) {
                        total_size += data->size();
                        blogger.trace("fstream[{}] Follower received data from peer={} data={}", meta.ops_id, from, data->size());
                        // Refuse data once mark_tablet_stream_done() flagged the stream.
                        status->check_valid_stream();
                        if (!data->empty()) {
                            co_await fstream->write((char*)data->data(), data->size());
                        }
                    }
                }
            } catch (seastar::rpc::stream_closed&) {
                // After we get streaming::stream_blob_cmd::end_of_stream which
                // is the last message from peer, it does not matter if the
                // source() is closed or not.
                if (got_end_of_stream) {
                    break;
                } else {
                    throw;
                }
            } catch (...) {
                throw;
            }
            may_inject_error(meta, inject_errors, "rx_data");
        }
        // If we reach here, streaming::stream_blob_cmd::end_of_stream should be received. Otherwise there
        // must be an error, e.g., the sender closed the stream without sending streaming::stream_blob_cmd::error.
        if (!got_end_of_stream) {
            throw std::runtime_error(format("fstream[{}] Follower failed to get end_of_stream", meta.ops_id));
        }
        status->check_valid_stream();
        co_await fstream->flush();
        co_await fstream->close();
        fstream_closed = true;
        may_inject_error(meta, inject_errors, "flush_and_close");
        co_await finish(store_result::ok);
        // Send status code and close the sink
        co_await sink(streaming::stream_blob_cmd_data(streaming::stream_blob_cmd::ok));
        status_sent = true;
        co_await sink.close();
        sink_closed = true;
    } catch (...) {
        error = std::current_exception();
    }
    if (error) {
        if (!fstream_closed) {
            try {
                if (fstream) {
                    // Make sure fstream is always closed
                    co_await fstream->close();
                }
            } catch (...) {
                blogger.warn("fstream[{}] Follower failed to close the file stream: {}",
                        meta.ops_id, std::current_exception());
                // We could do nothing but continue to cleanup more
            }
        }
        if (!status_sent) {
            try {
                may_inject_error(meta, inject_errors, "no_error_code_back");
                co_await sink(streaming::stream_blob_cmd_data(streaming::stream_blob_cmd::error));
            } catch (...) {
                // Try our best to send the status code.
                // If we could not send it, we could do nothing but close the sink.
                blogger.warn("fstream[{}] Follower failed to send error code: {}",
                        meta.ops_id, std::current_exception());
            }
        }
        try {
            if (!sink_closed) {
                // Make sure sink is always closed
                co_await sink.close();
            }
        } catch (...) {
            blogger.warn("fstream[{}] Follower failed to close the stream sink: {}",
                    meta.ops_id, std::current_exception());
        }
        try {
            // Drain everything in source
            for (;;) {
                auto opt = co_await source();
                if (!opt) {
                    break;
                }
            }
        } catch (...) {
            blogger.warn("fstream[{}] Follower failed to drain rpc stream source: {}",
                    meta.ops_id, std::current_exception());
        }
        try {
            // Remove the file in case of error
            // NOTE(review): if finish(store_result::ok) itself failed above,
            // finish is invoked a second time here with store_result::failure.
            if (finish) {
                co_await finish(store_result::failure);
                blogger.info("fstream[{}] Follower removed partial file={}", meta.ops_id, meta.filename);
            }
        } catch (...) {
            blogger.warn("fstream[{}] Follower failed to remove partial file={}: {}",
                    meta.ops_id, meta.filename, std::current_exception());
        }
        blogger.warn("fstream[{}] Follower failed peer={} file={} received_size={} bw={} error={}",
                meta.ops_id, from, meta.filename, total_size, get_bw(total_size, start_time), error);
        // Do not call rethrow_exception(error) because the caller could do nothing but log
        // the error. We have already logged the error here.
    } else {
        // Get some statistics
        blogger.debug("fstream[{}] Follower finished peer={} file={} received_size={} bw={}",
                meta.ops_id, from, meta.filename, total_size, get_bw(total_size, start_time));
    }
    co_return;
}
// Receiving side of a tablet file stream that stores each received component
// through an sstable stream sink.
//
// Delegates to the generic stream_blob_handler() with an output factory that:
//  - opens a stream sink for meta.filename in the table's storage, in the
//    state requested by the sender (sstable_state(meta)); the component is
//    flagged as the last one when meta.fops == file_ops::load_sstables;
//  - on success closes the sink; if that yields an sstable, loads it on
//    meta.dst_shard_id via load_sstable_for_tablet();
//  - on failure aborts the sink.
future<> stream_blob_handler(replica::database& db, db::view::view_building_worker& vbw, netw::messaging_service& ms,
        locator::host_id from,
        streaming::stream_blob_meta meta,
        rpc::sink<streaming::stream_blob_cmd_data> sink,
        rpc::source<streaming::stream_blob_cmd_data> source) {
    // `meta` is not used after this call, so move it like the other arguments
    // instead of copying it.
    co_await stream_blob_handler(db, ms, std::move(from), std::move(meta), std::move(sink), std::move(source), [&vbw] (replica::database& db, const streaming::stream_blob_meta& meta) -> future<output_result> {
        auto foptions = file_open_options();
        foptions.sloppy_size = true;
        // 32 MiB extent allocation hint.
        foptions.extent_allocation_size_hint = 32 << 20;
        auto stream_options = file_output_stream_options();
        stream_options.buffer_size = file_stream_buffer_size;
        stream_options.write_behind = file_stream_write_behind;
        auto& table = db.find_column_family(meta.table);
        auto& sstm = table.get_sstables_manager();
        // SSTable will be only sealed when added to the sstable set, so we make sure unsplit sstables aren't
        // left sealed on the table directory.
        sstables::sstable_stream_sink_cfg cfg { .last_component = meta.fops == file_ops::load_sstables,
                .leave_unsealed = true };
        auto sstable_sink = sstables::create_stream_sink(table.schema(), sstm, table.get_storage_options(), sstable_state(meta), meta.filename, cfg);
        auto out = co_await sstable_sink->output(foptions, stream_options);
        co_return output_result{
            // `meta` refers to the generic handler's by-value parameter, which
            // stays alive until that handler has invoked this finish function.
            [sstable_sink = std::move(sstable_sink), &meta, &db, &vbw](store_result res) -> future<> {
                if (res != store_result::ok) {
                    co_await sstable_sink->abort();
                    co_return;
                }
                auto sst = co_await sstable_sink->close();
                if (sst) {
                    blogger.debug("stream_sstables[{}] Loading sstable {} on shard {}", meta.ops_id, sst->toc_filename(), meta.dst_shard_id);
                    auto desc = sst->get_descriptor(sstables::component_type::TOC);
                    // Drop this shard's reference before the sstable is
                    // re-opened on the destination shard from its descriptor.
                    sst = {};
                    co_await load_sstable_for_tablet(meta.ops_id, db, vbw, meta.table, sstable_state(meta), std::move(desc), meta.dst_shard_id);
                }
            },
            std::move(out)
        };
    });
}
// Get a new sstable name using the new generation.
// The generation is the field between the first and the second '-' of the
// component basename. That whole field is replaced, so the old and the new
// generation may have different lengths (previously exactly newgen.size()
// characters were overwritten, which corrupted the name whenever the lengths
// differed, e.g. a numeric generation vs. a UUID-based one).
// For example:
// oldname: me-3ga1_0iiv_2e5uo2flv7lgdl2j0d-big-Index.db
// newgen: 3ga1_0iiv_3vj5c2flv7lgdl2j0d
// newname: me-3ga1_0iiv_3vj5c2flv7lgdl2j0d-big-Index.db
// Logs a warning and throws std::runtime_error if `oldname` lacks the two
// '-' separators delimiting the generation.
static std::string get_sstable_name_with_generation(const file_stream_id& ops_id, const std::string& oldname, const std::string& newgen) {
    const auto gen_start = oldname.find('-');
    const auto gen_end = gen_start == std::string::npos ? std::string::npos : oldname.find('-', gen_start + 1);
    if (gen_end == std::string::npos) {
        auto msg = fmt::format("fstream[{}] Failed to get sstable name for {} with generation {}", ops_id, oldname, newgen);
        blogger.warn("{}", msg);
        throw std::runtime_error(msg);
    }
    std::string newname = oldname;
    newname.replace(gen_start + 1, gen_end - gen_start - 1, newgen);
    return newname;
}
}
// Format stream_blob_info through its ostream operator<<, so lists of files can be logged.
template<> struct fmt::formatter<streaming::stream_blob_info> : fmt::ostream_formatter {};
namespace streaming {
// Send files in the files list to the nodes in targets list over network.
// Returns the number of bytes sent over network (each chunk is counted once
// per target it is sent to).
//
// Files are sent one at a time. For each file one rpc sink/source pair is
// opened per target. Two fibers then run concurrently:
//  - send_data_to_peer reads the file and pushes stream_blob_cmd::data chunks
//    to every target, followed by stream_blob_cmd::end_of_stream;
//  - get_status_code_from_peer waits for every target's status
//    (stream_blob_cmd::ok or stream_blob_cmd::error).
// On the first failure the sinks/sources of that file are cleaned up, the
// remaining files are skipped and the error is rethrown; an error raised by the
// sending fiber takes precedence over the one seen by the status fiber.
future<size_t>
tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sources, std::vector<node_and_shard> targets,
        table_id table, file_stream_id ops_id, service::frozen_topology_guard topo_guard, bool inject_errors) {
    size_t ops_total_size = 0;
    if (targets.empty()) {
        co_return ops_total_size;
    }
    if (sources.empty()) {
        co_return ops_total_size;
    }
    blogger.debug("fstream[{}] Master started sending n={}, sources={}, targets={}",
            ops_id, sources.size(), sources, targets);
    struct sink_and_source {
        locator::host_id node;
        rpc::sink<streaming::stream_blob_cmd_data> sink;
        rpc::source<streaming::stream_blob_cmd_data> source;
        bool sink_closed = false;
        bool status_sent = false;
    };
    auto ops_start_time = std::chrono::steady_clock::now();
    streaming::stream_blob_meta meta;
    meta.ops_id = ops_id;
    meta.table = table;
    meta.topo_guard = topo_guard;
    std::exception_ptr error;
    std::exception_ptr sender_error;
    auto stream_options = file_input_stream_options();
    stream_options.buffer_size = file_stream_buffer_size;
    stream_options.read_ahead = file_stream_read_ahead;
    for (auto&& source_info : sources) {
        // Keep stream_blob_info alive only at duration of streaming. Allowing the file descriptor
        // of the sstable component to be released right after it has been streamed.
        // NOTE: this leaves a default-constructed entry in `sources`, so logging
        // `sources` afterwards only shows the files that have not been taken yet.
        auto info = std::exchange(source_info, {});
        auto& filename = info.filename;
        std::optional<input_stream<char>> fstream;
        bool fstream_closed = false;
        try {
            meta.fops = info.fops;
            meta.filename = info.filename;
            meta.sstable_state = info.sstable_state;
            fstream = co_await info.source(stream_options);
        } catch (...) {
            blogger.warn("fstream[{}] Master failed sources={} targets={} error={}",
                    ops_id, sources, targets, std::current_exception());
            throw;
        }
        std::vector<sink_and_source> ss;
        size_t total_size = 0;
        auto start_time = std::chrono::steady_clock::now();
        bool got_error_from_peer = false;
        try {
            for (auto& x : targets) {
                const auto& node = x.node;
                meta.dst_shard_id = x.shard;
                blogger.debug("fstream[{}] Master creating sink and source for node={}, file={}, targets={}", ops_id, node, filename, targets);
                auto [sink, source] = co_await ms.make_sink_and_source_for_stream_blob(meta, node);
                ss.push_back(sink_and_source{node, std::move(sink), std::move(source)});
            }
            // This fiber sends data to peer node
            auto send_data_to_peer = [&] () mutable -> future<> {
                try {
                    while (!got_error_from_peer) {
                        may_inject_error(meta, inject_errors, "read_data");
                        auto buf = co_await fstream->read_up_to(file_stream_buffer_size);
                        if (buf.size() == 0) {
                            break;
                        }
                        streaming::stream_blob_data data(std::move(buf));
                        auto data_size = data.size();
                        stream_blob_cmd_data cmd_data(streaming::stream_blob_cmd::data, std::move(data));
                        co_await coroutine::parallel_for_each(ss, [&] (sink_and_source& s) mutable -> future<> {
                            total_size += data_size;
                            ops_total_size += data_size;
                            blogger.trace("fstream[{}] Master sending file={} to node={} chunk_size={}",
                                    ops_id, filename, s.node, data_size);
                            may_inject_error(meta, inject_errors, "tx_data");
                            co_await s.sink(cmd_data);
                        });
                    }
                } catch (...) {
                    sender_error = std::current_exception();
                }
                if (sender_error) {
                    // We have to close the stream otherwise if the stream is
                    // ok, the get_status_code_from_peer fiber below might
                    // wait for the source() forever.
                    for (auto& s : ss) {
                        try {
                            co_await s.sink.close();
                            s.sink_closed = true;
                        } catch (...) {
                            // Best effort: the sender error is rethrown below regardless.
                            blogger.debug("fstream[{}] Master failed to close rpc stream sink after sender error: {}",
                                    ops_id, std::current_exception());
                        }
                    }
                    std::rethrow_exception(sender_error);
                }
                if (fstream) {
                    co_await fstream->close();
                    fstream_closed = true;
                }
                for (auto& s : ss) {
                    blogger.debug("fstream[{}] Master done sending file={} to node={}", ops_id, filename, s.node);
                    if (!got_error_from_peer) {
                        co_await s.sink(streaming::stream_blob_cmd_data(streaming::stream_blob_cmd::end_of_stream));
                        s.status_sent = true;
                    }
                    co_await s.sink.close();
                    s.sink_closed = true;
                }
            };
            // This fiber gets status code from peer node
            auto get_status_code_from_peer = [&] () mutable -> future<> {
                co_await coroutine::parallel_for_each(ss, [&] (sink_and_source& s) mutable -> future<> {
                    bool got_cmd_ok = false;
                    while (!got_error_from_peer) {
                        try {
                            auto opt = co_await s.source();
                            if (opt) {
                                stream_blob_cmd_data& cmd_data = std::get<0>(*opt);
                                if (cmd_data.cmd == streaming::stream_blob_cmd::error) {
                                    got_error_from_peer = true;
                                    blogger.warn("fstream[{}] Master got stream_blob_cmd::error file={} peer={}",
                                            ops_id, filename, s.node);
                                    throw std::runtime_error(format("Got stream_blob_cmd::error from peer {}", s.node));
                                } else if (cmd_data.cmd == streaming::stream_blob_cmd::ok) {
                                    got_cmd_ok = true;
                                }
                                blogger.debug("fstream[{}] Master got stream_blob_cmd={} file={} peer={}",
                                        ops_id, int(cmd_data.cmd), filename, s.node);
                            } else {
                                break;
                            }
                        } catch (seastar::rpc::stream_closed&) {
                            // After we get streaming::stream_blob_cmd::ok
                            // which is the last message from peer, it does not
                            // matter if the source() is closed or not.
                            if (got_cmd_ok) {
                                break;
                            } else {
                                throw;
                            }
                        }
                    }
                });
            };
            co_await coroutine::all(send_data_to_peer, get_status_code_from_peer);
        } catch (...) {
            error = std::current_exception();
        }
        if (error) {
            blogger.warn("fstream[{}] Master failed sending file={} to targets={} send_size={} bw={} error={}",
                    ops_id, filename, targets, total_size, get_bw(total_size, start_time), error);
            // Error handling for fstream and sink
            if (!fstream_closed) {
                try {
                    if (fstream) {
                        co_await fstream->close();
                    }
                } catch (...) {
                    // We could do nothing but continue to cleanup more
                    blogger.warn("fstream[{}] Master failed to close file stream: {}",
                            ops_id, std::current_exception());
                }
            }
            for (auto& s : ss) {
                try {
                    if (!s.status_sent && !s.sink_closed) {
                        co_await s.sink(streaming::stream_blob_cmd_data(streaming::stream_blob_cmd::error));
                        s.status_sent = true;
                    }
                } catch (...) {
                    // We could do nothing but continue to close
                    blogger.warn("fstream[{}] Master failed to send error code: {}",
                            ops_id, std::current_exception());
                }
                try {
                    if (!s.sink_closed) {
                        co_await s.sink.close();
                        s.sink_closed = true;
                    }
                } catch (...) {
                    // We could do nothing but continue
                    blogger.warn("fstream[{}] Master failed to close rpc stream sink: {}",
                            ops_id, std::current_exception());
                }
                try {
                    // Drain everything in source
                    for (;;) {
                        auto opt = co_await s.source();
                        if (!opt) {
                            break;
                        }
                    }
                } catch (...) {
                    blogger.warn("fstream[{}] Master failed to drain rpc stream source: {}",
                            ops_id, std::current_exception());
                }
            }
            // Stop handling remaining files
            break;
        } else {
            blogger.debug("fstream[{}] Master done sending file={} to targets={} send_size={} bw={}",
                    ops_id, filename, targets, total_size, get_bw(total_size, start_time));
        }
    }
    co_await utils::get_local_injector().inject("tablet_stream_files_end_wait", utils::wait_for_message(std::chrono::seconds(60)));
    // `sources` entries already processed were emptied by std::exchange above,
    // so `files=` in the summaries below only lists files that were not sent.
    if (error) {
        blogger.warn("fstream[{}] Master failed sending files_nr={} files={} targets={} send_size={} bw={} error={}",
                ops_id, sources.size(), sources, targets, ops_total_size, get_bw(ops_total_size, ops_start_time), error);
        std::rethrow_exception(sender_error ? sender_error : error);
    } else {
        blogger.debug("fstream[{}] Master finished sending files_nr={} files={} targets={} send_size={} bw={}",
                ops_id, sources.size(), sources, targets, ops_total_size, get_bw(ops_total_size, ops_start_time));
    }
    co_return ops_total_size;
}
// Handler for the tablet_stream_files RPC (issued to the source host by the
// requesting tablet_stream_files() overload).
//
// Snapshots the sstables of req.table overlapping req.range, gives every
// sstable a freshly generated generation, renames its components
// accordingly, and streams all components to req.targets one at a time. The
// last component of each sstable is tagged file_ops::load_sstables so the
// receiver knows when to load it. Returns the number of bytes sent.
future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req) {
    stream_files_response response;
    auto& tbl = db.find_column_family(req.table);
    auto snapshot = co_await tbl.take_storage_snapshot(req.range);
    co_await utils::get_local_injector().inject("order_sstables_for_streaming", [&snapshot] (auto& handler) -> future<> {
        // Injection hook: with exactly three sstables, move the one whose Data
        // component is named by "shadowed_file" to the end, so the sstable
        // containing shadowed data is streamed last.
        if (snapshot.size() != 3) {
            return make_ready_future<>();
        }
        const std::string_view shadowed_file = handler.template get<std::string_view>("shadowed_file").value();
        for (size_t i = 0; i < 2; ++i) {
            if (snapshot[i].sst->component_basename(component_type::Data) == shadowed_file) {
                std::swap(snapshot[i], snapshot[2]);
            }
        }
        return make_ready_future<>();
    });
    auto blobs = std::list<stream_blob_info>();
    auto& generation_generator = tbl.get_sstable_generation_generator();
    auto permit = co_await db.obtain_reader_permit(tbl, "tablet_file_streaming", db::no_timeout, {});
    for (auto& entry : snapshot) {
        // stable state (across files) is a must for load to work on destination
        const auto state = entry.sst->state();
        auto component_sources = co_await create_stream_sources(entry, permit);
        const auto new_generation = fmt::to_string(generation_generator());
        for (auto&& src : component_sources) {
            auto old_basename = src->component_basename();
            auto new_basename = get_sstable_name_with_generation(req.ops_id, old_basename, new_generation);
            blogger.debug("fstream[{}] Get name oldname={}, newname={}", req.ops_id, old_basename, new_basename);
            stream_blob_info& blob = blobs.emplace_back();
            blob.fops = file_ops::stream_sstables;
            blob.sstable_state = state;
            blob.filename = std::move(new_basename);
            blob.source = [src = std::move(src)](const file_input_stream_options& options) {
                return src->input(options);
            };
        }
        // ensure we mark the end of each component sequence.
        if (!blobs.empty()) {
            blobs.back().fops = file_ops::load_sstables;
        }
    }
    if (blobs.empty()) {
        co_return response;
    }
    const auto sstable_nr = snapshot.size();
    // Release reference to sstables to be streamed here. Since one sstable is streamed at a time,
    // a sstable - that has been compacted - can have its space released from disk right after
    // that sstable's content has been fully streamed.
    snapshot.clear();
    blogger.debug("stream_sstables[{}] Started sending sstable_nr={} files_nr={} files={} range={}",
            req.ops_id, sstable_nr, blobs.size(), blobs, req.range);
    const auto ops_start_time = std::chrono::steady_clock::now();
    const auto files_nr = blobs.size();
    const size_t stream_bytes = co_await tablet_stream_files(ms, std::move(blobs), req.targets, req.table, req.ops_id, req.topo_guard);
    response.stream_bytes = stream_bytes;
    const auto duration = std::chrono::steady_clock::now() - ops_start_time;
    blogger.info("stream_sstables[{}] Finished sending sstable_nr={} files_nr={} range={} stream_bytes={} stream_time={} stream_bw={}",
            req.ops_id, sstable_nr, files_nr, req.range, stream_bytes, duration, get_bw(stream_bytes, ops_start_time));
    co_return response;
}
// Requests a tablet file stream: registers `ops_id` as an active stream on
// all local shards, asks `src_host` (tablet_stream_files RPC) to stream the
// sstables of `table` overlapping `range` to `dst_host`/`dst_shard_id`, and
// always unregisters the stream afterwards, even on failure. Any error from
// registration or from the RPC is rethrown after unregistering.
// NOTE(review): presumably called on dst_host, since the receiving handler
// looks the stream up on its own shards via get_tablet_stream().
future<stream_files_response> tablet_stream_files(const file_stream_id& ops_id,
        replica::table& table,
        const dht::token_range& range,
        const locator::host_id& src_host,
        const locator::host_id& dst_host,
        seastar::shard_id dst_shard_id,
        netw::messaging_service& ms,
        abort_source& as,
        service::frozen_topology_guard topo_guard) {
    stream_files_response resp;
    std::exception_ptr error;
    try {
        co_await mark_tablet_stream_start(ops_id);
    } catch (...) {
        error = std::current_exception();
    }
    if (!error) {
        try {
            streaming::stream_files_request req;
            req.ops_id = ops_id;
            // Was terminated with ',' (comma operator) instead of ';'.
            req.keyspace_name = table.schema()->ks_name();
            req.table_name = table.schema()->cf_name();
            req.table = table.schema()->id();
            req.range = range;
            req.targets = std::vector<node_and_shard>{node_and_shard{dst_host, dst_shard_id}};
            req.topo_guard = topo_guard;
            resp = co_await ser::streaming_rpc_verbs::send_tablet_stream_files(&ms, src_host, as, req);
        } catch (...) {
            error = std::current_exception();
        }
    }
    // Unregister unconditionally so the per-shard registry does not keep the entry.
    co_await mark_tablet_stream_done(ops_id);
    if (error) {
        std::rethrow_exception(error);
    }
    co_return resp;
}
}