Files
scylladb/streaming/stream_blob.cc
Botond Dénes 24d8843803 Merge 'streaming: stream_blob: hold table for streaming' from Michael Litvak
When initializing streaming sources in tablet_stream_files_handler we
use a reference to the table. We should hold the table while doing so,
because otherwise the table may be dropped and destroyed when we yield.
Use the table.stream_in_progress() phaser to hold the table while we
access it.

For sstable file streaming we can release the table after the snapshot
is initialized, and the table may be dropped safely because the files
are held by the snapshot and we don't access the table anymore. There
was a single access to the table for logging but it is replaced by a
pre-calculated variable.

For logstor segment streaming, currently it doesn't support discarding
the segments while they are streamed - when the table is dropped it
discards the segments by overwriting and freeing them, so they shouldn't
be accessed after that. Therefore, in that case continue to hold the
table until streaming is completed.

Fixes [SCYLLADB-1533](https://scylladb.atlassian.net/browse/SCYLLADB-1533)

It's a pre-existing use-after-free issue in sstable file streaming so should be backported to all releases.
It's also made worse with the recent changes of logstor, and affects also non-logstor tables, so the logstor fixes should be in the same release (2026.2).

[SCYLLADB-1533]: https://scylladb.atlassian.net/browse/SCYLLADB-1533?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ

Closes scylladb/scylladb#29488

* github.com:scylladb/scylladb:
  test: test drop table during streaming
  streaming: stream_blob: hold table for streaming

(cherry picked from commit 9bfcc25cf7)

Closes scylladb/scylladb#29524
2026-04-20 14:18:13 +03:00

747 lines
32 KiB
C++

/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "db/view/view_building_worker.hh"
#include "message/messaging_service.hh"
#include "streaming/stream_blob.hh"
#include "streaming/stream_plan.hh"
#include "utils/pretty_printers.hh"
#include "utils/error_injection.hh"
#include "locator/host_id.hh"
#include "replica/database.hh"
#include "sstables/sstables.hh"
#include "sstables/sstables_manager.hh"
#include "sstables/sstable_version.hh"
#include "sstables/generation_type.hh"
#include "sstables/types.hh"
#include "idl/streaming.dist.hh"
#include "service/topology_guard.hh"
#include <seastar/core/coroutine.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/future.hh>
#include <seastar/core/fstream.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/coroutine/all.hh>
#include <vector>
#include <cfloat>
#include <filesystem>
#include <fmt/ranges.h>
#include "replica/exceptions.hh"
namespace streaming {
// Logger for the file-streaming (stream_blob) subsystem.
static logging::logger blogger("stream_blob");
// Tuning knobs for streaming sstable files over RPC:
// chunk/buffer size used for both reading and writing file data.
constexpr size_t file_stream_buffer_size = 128 * 1024;
// number of buffers kept in flight behind the write position (follower side).
constexpr size_t file_stream_write_behind = 10;
// number of buffers read ahead of the send position (master side).
constexpr size_t file_stream_read_ahead = 4;
// Resolve the sstable state requested by the stream metadata.
// When the metadata carries no explicit state, default to the normal state.
static sstables::sstable_state sstable_state(const streaming::stream_blob_meta& meta) {
    if (meta.sstable_state) {
        return *meta.sstable_state;
    }
    return sstables::sstable_state::normal;
}
// Load a streamed sstable into the destination table on the given shard:
// open the (still unsealed) sstable described by `desc`, add it to the
// table's sstable set (sealing it as part of the add), and - for staging
// sstables - register it with the view building worker so view updates can
// be generated for it after the tablet migration finishes.
static future<> load_sstable_for_tablet(const file_stream_id& ops_id, replica::database& db, db::view::view_building_worker& vbw, table_id id, sstables::sstable_state state, sstables::entry_descriptor desc, seastar::shard_id shard) {
auto& sharded_vbw = vbw.container();
co_await db.container().invoke_on(shard, [&sharded_vbw, id, desc, state, ops_id] (replica::database& db) -> future<> {
replica::table& t = db.find_column_family(id);
auto erm = t.get_effective_replication_map();
auto& sstm = t.get_sstables_manager();
auto sst = sstm.make_sstable(t.schema(), t.get_storage_options(), desc.generation, state, desc.version, desc.format);
// The stream sink wrote the sstable without sealing it; load accordingly.
sstables::sstable_open_config cfg { .unsealed_sstable = true };
co_await sst->load(erm->get_sharder(*t.schema()), cfg);
// Seal the sstable only once it is actually being added to the sstable set.
auto on_add = [sst, &sstm] (sstables::shared_sstable loading_sst) -> future<> {
if (loading_sst == sst) {
auto cfg = sstm.configure_writer(sst->get_origin());
co_await loading_sst->seal_sstable(cfg.backup);
}
co_return;
};
auto new_sstables = co_await t.add_new_sstable_and_update_cache(sst, on_add);
blogger.info("stream_sstables[{}] Loaded sstable {} successfully", ops_id, sst->toc_filename());
if (state == sstables::sstable_state::staging) {
// If the sstable is in staging state, register it to view building worker
// to generate view updates from it.
// But because the tablet is still in migration process, register the sstable
// to the view building worker, which will create a view building task for it,
// so then, the view building coordinator can decide to process it once the migration
// is finished.
// (Instead of registering the sstable to view update generator which may process it immediately.)
co_await sharded_vbw.local().register_staging_sstable_tasks(new_sstables, t.schema()->id());
}
});
}
// Build a pretty-printable throughput value for `total_size` bytes
// transferred since `start_time`.
static utils::pretty_printed_throughput get_bw(size_t total_size, std::chrono::steady_clock::time_point start_time) {
    return utils::pretty_printed_throughput(total_size, std::chrono::steady_clock::now() - start_time);
}
// For tablet stream checks
// Tracks whether a tablet stream has already completed, so that data
// arriving for a finished stream can be rejected.
class tablet_stream_status {
public:
    bool finished = false;
    // Throws std::runtime_error if the stream has already finished.
    void check_valid_stream();
};

void tablet_stream_status::check_valid_stream() {
    if (!finished) {
        return;
    }
    throw std::runtime_error("The stream has finished already");
}
// Per-shard registry of in-flight tablet streams, keyed by the operation id.
// Populated by mark_tablet_stream_start() and cleared by mark_tablet_stream_done().
static thread_local std::unordered_map<file_stream_id, lw_shared_ptr<tablet_stream_status>> tablet_streams;
// Register a fresh status object for this stream id on every shard, so that
// followers on any shard can validate incoming data against it.
future<> mark_tablet_stream_start(file_stream_id ops_id) {
    return seastar::smp::invoke_on_all([ops_id] {
        tablet_streams.emplace(ops_id, make_lw_shared<tablet_stream_status>());
    });
}
// Mark the stream finished on every shard and drop it from the registry.
// Unknown ids are ignored, so this is safe to call unconditionally.
future<> mark_tablet_stream_done(file_stream_id ops_id) {
    return seastar::smp::invoke_on_all([ops_id] {
        auto found = tablet_streams.find(ops_id);
        if (found == tablet_streams.end()) {
            return;
        }
        if (auto& status = found->second; status) {
            status->finished = true;
        }
        tablet_streams.erase(found);
    });
}
// Look up the status object registered by mark_tablet_stream_start() for
// this stream id. Throws std::runtime_error if the stream is unknown on
// this shard.
lw_shared_ptr<tablet_stream_status> get_tablet_stream(file_stream_id ops_id) {
    // Use find() rather than operator[]: operator[] default-inserts a null
    // entry into the thread-local map for an unknown ops_id, leaking a stale
    // entry that nothing would ever erase.
    auto it = tablet_streams.find(ops_id);
    if (it == tablet_streams.end() || !it->second) {
        auto msg = format("stream_sstables[{}] Could not find ops_id={}", ops_id, ops_id);
        blogger.warn("{}", msg);
        throw std::runtime_error(msg);
    }
    return it->second;
}
// Test-only error injection: when `may_inject` is set, throw a
// std::runtime_error for roughly 1 in 500 invocations, tagged with the
// injection point name `error` and the file being streamed.
static void may_inject_error(const streaming::stream_blob_meta& meta, bool may_inject, const sstring& error) {
    // Short-circuit keeps rand() from being consumed when injection is off,
    // matching the original nested-if behavior.
    if (!may_inject || rand() % 500 != 0) {
        return;
    }
    auto msg = format("fstream[{}] Injected file stream error={} file={}",
            meta.ops_id, error, meta.filename);
    blogger.warn("{}", msg);
    throw std::runtime_error(msg);
}
// Follower side of the blob streaming protocol. Receives stream_blob_cmd
// messages from `source`, writes data chunks into an output stream obtained
// from `create_output`, and reports the final status back on `sink`.
// On success: flush+close the output, invoke the finish callback with
// store_result::ok, send stream_blob_cmd::ok and close the sink.
// On any error: best-effort close the output and sink, send
// stream_blob_cmd::error, drain `source`, and invoke the finish callback
// with store_result::failure to remove the partial file. Errors are logged,
// never rethrown to the caller.
future<> stream_blob_handler(replica::database& db,
netw::messaging_service& ms,
locator::host_id from,
streaming::stream_blob_meta meta,
rpc::sink<streaming::stream_blob_cmd_data> sink,
rpc::source<streaming::stream_blob_cmd_data> source,
stream_blob_create_output_fn create_output,
bool inject_errors) {
// Flags tracking which cleanup steps are still needed on the error path.
bool fstream_closed = false;
bool sink_closed = false;
bool status_sent = false;
size_t total_size = 0;
auto start_time = std::chrono::steady_clock::now();
std::optional<output_stream<char>> fstream;
std::exception_ptr error;
stream_blob_finish_fn finish;
// Will log a message when streaming is done. Used to synchronize tests.
lw_shared_ptr<std::any> log_done;
try {
if (utils::get_local_injector().is_enabled("stream_mutation_fragments")) {
log_done = make_lw_shared<std::any>(seastar::make_shared(seastar::defer([] {
blogger.info("stream_mutation_fragments: done (tablets)");
})));
}
// Throws if the stream id was never registered on this node.
auto status = get_tablet_stream(meta.ops_id);
auto guard = service::topology_guard(meta.topo_guard);
// Reject any file_ops that is not supported by this node
if (meta.fops != streaming::file_ops::stream_sstables &&
meta.fops != streaming::file_ops::load_sstables) {
auto msg = format("fstream[{}] Unsupported file_ops={} peer={} file={}",
meta.ops_id, int(meta.fops), from, meta.filename);
blogger.warn("{}", msg);
throw std::runtime_error(msg);
}
blogger.debug("fstream[{}] Follower started peer={} file={}",
meta.ops_id, from, meta.filename);
auto [f, out] = co_await create_output(db, meta);
finish = std::move(f);
fstream = std::move(out);
bool got_end_of_stream = false;
// Main receive loop: one stream_blob_cmd_data message per iteration.
for (;;) {
try {
auto opt = co_await source();
if (!opt) {
break;
}
co_await utils::get_local_injector().inject("stream_mutation_fragments", [&guard] (auto& handler) -> future<> {
blogger.info("stream_mutation_fragments: waiting (tablets)");
while (!handler.poll_for_message()) {
guard.check();
co_await sleep(std::chrono::milliseconds(5));
}
blogger.info("stream_mutation_fragments: released (tablets)");
});
if (db.is_in_critical_disk_utilization_mode()) {
throw replica::critical_disk_utilization_exception("rejected streamed file");
}
stream_blob_cmd_data& cmd_data = std::get<0>(*opt);
auto cmd = cmd_data.cmd;
if (cmd == streaming::stream_blob_cmd::error) {
blogger.warn("fstream[{}] Follower got stream_blob_cmd::error from peer={} file={}",
meta.ops_id, from, meta.filename);
throw std::runtime_error(format("Got stream_blob_cmd::error from peer={} file={}", from, meta.filename));
} else if (cmd == streaming::stream_blob_cmd::end_of_stream) {
blogger.debug("fstream[{}] Follower got stream_blob_cmd::end_of_stream from peer={} file={}",
meta.ops_id, from, meta.filename);
got_end_of_stream = true;
} else if (cmd == streaming::stream_blob_cmd::data) {
std::optional<streaming::stream_blob_data> data = std::move(cmd_data.data);
if (data) {
total_size += data->size();
blogger.trace("fstream[{}] Follower received data from peer={} data={}", meta.ops_id, from, data->size());
// Reject data arriving after the stream was marked finished.
status->check_valid_stream();
if (!data->empty()) {
co_await fstream->write((char*)data->data(), data->size());
}
}
}
} catch (seastar::rpc::stream_closed&) {
// After we get streaming::stream_blob_cmd::end_of_stream which
// is the last message from peer, it does not matter if the
// source() is closed or not.
if (got_end_of_stream) {
break;
} else {
throw;
}
} catch (...) {
throw;
}
may_inject_error(meta, inject_errors, "rx_data");
}
// If we reach here, streaming::stream_blob_cmd::end_of_stream should be received. Otherwise there
// must be an error, e.g., the sender closed the stream without sending streaming::stream_blob_cmd::error.
if (!got_end_of_stream) {
throw std::runtime_error(format("fstream[{}] Follower failed to get end_of_stream", meta.ops_id));
}
status->check_valid_stream();
co_await fstream->flush();
co_await fstream->close();
fstream_closed = true;
may_inject_error(meta, inject_errors, "flush_and_close");
// Commit the received file (e.g. close the sstable stream sink).
co_await finish(store_result::ok);
// Send status code and close the sink
co_await sink(streaming::stream_blob_cmd_data(streaming::stream_blob_cmd::ok));
status_sent = true;
co_await sink.close();
sink_closed = true;
} catch (...) {
error = std::current_exception();
}
if (error) {
// Best-effort cleanup sequence; each step logs and continues on failure.
if (!fstream_closed) {
try {
if (fstream) {
// Make sure fstream is always closed
co_await fstream->close();
}
} catch (...) {
blogger.warn("fstream[{}] Follower failed to close the file stream: {}",
meta.ops_id, std::current_exception());
// We could do nothing but continue to cleanup more
}
}
if (!status_sent) {
try {
may_inject_error(meta, inject_errors, "no_error_code_back");
co_await sink(streaming::stream_blob_cmd_data(streaming::stream_blob_cmd::error));
} catch (...) {
// Try our best to send the status code.
// If we could not send it, we could do nothing but close the sink.
blogger.warn("fstream[{}] Follower failed to send error code: {}",
meta.ops_id, std::current_exception());
}
}
try {
if (!sink_closed) {
// Make sure sink is always closed
co_await sink.close();
}
} catch (...) {
blogger.warn("fstream[{}] Follower failed to close the stream sink: {}",
meta.ops_id, std::current_exception());
}
try {
// Drain everything in source
for (;;) {
auto opt = co_await source();
if (!opt) {
break;
}
}
} catch (...) {
blogger.warn("fstream[{}] Follower failed to drain rpc stream source: {}",
meta.ops_id, std::current_exception());
}
try {
// Remove the file in case of error
if (finish) {
co_await finish(store_result::failure);
blogger.info("fstream[{}] Follower removed partial file={}", meta.ops_id, meta.filename);
}
} catch (...) {
blogger.warn("fstream[{}] Follower failed to remove partial file={}: {}",
meta.ops_id, meta.filename, std::current_exception());
}
blogger.warn("fstream[{}] Follower failed peer={} file={} received_size={} bw={} error={}",
meta.ops_id, from, meta.filename, total_size, get_bw(total_size, start_time), error);
// Do not call rethrow_exception(error) because the caller could do nothing but log
// the error. We have already logged the error here.
} else {
// Get some statistics
blogger.debug("fstream[{}] Follower finished peer={} file={} received_size={} bw={}",
meta.ops_id, from, meta.filename, total_size, get_bw(total_size, start_time));
}
co_return;
}
// Follower-side sstable streaming: wires the generic stream_blob_handler()
// with an output that writes the incoming blob into a new sstable component
// via a stream sink, and a finish callback that, on success, closes the sink
// and loads the resulting sstable into the destination table.
future<> stream_blob_handler(replica::database& db, db::view::view_building_worker& vbw, netw::messaging_service& ms,
locator::host_id from,
streaming::stream_blob_meta meta,
rpc::sink<streaming::stream_blob_cmd_data> sink,
rpc::source<streaming::stream_blob_cmd_data> source) {
co_await stream_blob_handler(db, ms, std::move(from), meta, std::move(sink), std::move(source), [&vbw] (replica::database& db, const streaming::stream_blob_meta& meta) -> future<output_result> {
auto foptions = file_open_options();
foptions.sloppy_size = true;
foptions.extent_allocation_size_hint = 32 << 20;
auto stream_options = file_output_stream_options();
stream_options.buffer_size = file_stream_buffer_size;
stream_options.write_behind = file_stream_write_behind;
auto& table = db.find_column_family(meta.table);
auto& sstm = table.get_sstables_manager();
// SSTable will be only sealed when added to the sstable set, so we make sure unsplit sstables aren't
// left sealed on the table directory.
sstables::sstable_stream_sink_cfg cfg { .last_component = meta.fops == file_ops::load_sstables,
.leave_unsealed = true };
auto sstable_sink = sstables::create_stream_sink(table.schema(), sstm, table.get_storage_options(), sstable_state(meta), meta.filename, cfg);
auto out = co_await sstable_sink->output(foptions, stream_options);
co_return output_result{
// Finish callback: abort on failure, otherwise close the sink and,
// if a complete sstable was produced, load it on the target shard.
[sstable_sink = std::move(sstable_sink), &meta, &db, &vbw](store_result res) -> future<> {
if (res != store_result::ok) {
co_await sstable_sink->abort();
co_return;
}
// NOTE(review): close() presumably yields a non-null sstable only
// after the last component (file_ops::load_sstables) — confirm
// against create_stream_sink's contract.
auto sst = co_await sstable_sink->close();
if (sst) {
blogger.debug("stream_sstables[{}] Loading sstable {} on shard {}", meta.ops_id, sst->toc_filename(), meta.dst_shard_id);
auto desc = sst->get_descriptor(sstables::component_type::TOC);
// Drop our reference before the cross-shard load.
sst = {};
co_await load_sstable_for_tablet(meta.ops_id, db, vbw, meta.table, sstable_state(meta), std::move(desc), meta.dst_shard_id);
}
},
std::move(out)
};
});
}
// Get a new sstable name using the new generation.
// For example:
// oldname: me-3ga1_0iiv_2e5uo2flv7lgdl2j0d-big-Index.db
// newgen: 3ga1_0iiv_3vj5c2flv7lgdl2j0d
// newname: me-3ga1_0iiv_3vj5c2flv7lgdl2j0d-big-Index.db
// Throws std::runtime_error if `oldname` does not contain a '-'.
static std::string get_sstable_name_with_generation(const file_stream_id& ops_id, const std::string& oldname, const std::string& newgen) {
    std::string newname = oldname;
    // The generation is the component between the first and second '-'.
    auto start = newname.find('-');
    if (start == std::string::npos) {
        auto msg = fmt::format("fstream[{}] Failed to get sstable name for {} with generation {}", ops_id, oldname, newgen);
        blogger.warn("{}", msg);
        throw std::runtime_error(msg);
    }
    ++start;
    // Replace exactly the old generation substring. The previous code
    // replaced newgen.size() characters, which silently corrupts the name
    // whenever the old and new generation strings differ in length.
    auto end = newname.find('-', start);
    auto count = (end == std::string::npos) ? newname.size() - start : end - start;
    newname.replace(start, count, newgen);
    return newname;
}
}
// Let fmt format stream_blob_info through its ostream insertion operator.
template<> struct fmt::formatter<streaming::stream_blob_info> : fmt::ostream_formatter {};
namespace streaming {
// Send files in the files list to the nodes in targets list over network
// Returns number of bytes sent over network
// Master (sender) side: streams each file in `sources`, one at a time, to
// every node/shard in `targets`, using two cooperating fibers per file -
// one pushing data chunks, one collecting status codes from the peers.
// Stops at the first failed file; the error (preferring a sender-side one)
// is rethrown after cleanup. `inject_errors` enables test-only fault injection.
future<size_t>
tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sources, std::vector<node_and_shard> targets,
table_id table, file_stream_id ops_id, service::frozen_topology_guard topo_guard, bool inject_errors) {
size_t ops_total_size = 0;
if (targets.empty()) {
co_return ops_total_size;
}
if (sources.empty()) {
co_return ops_total_size;
}
blogger.debug("fstream[{}] Master started sending n={}, sources={}, targets={}",
ops_id, sources.size(), sources, targets);
// Per-target RPC channel plus cleanup bookkeeping flags.
struct sink_and_source {
locator::host_id node;
rpc::sink<streaming::stream_blob_cmd_data> sink;
rpc::source<streaming::stream_blob_cmd_data> source;
bool sink_closed = false;
bool status_sent = false;
};
auto ops_start_time = std::chrono::steady_clock::now();
streaming::stream_blob_meta meta;
meta.ops_id = ops_id;
meta.table = table;
meta.topo_guard = topo_guard;
std::exception_ptr error;
std::exception_ptr sender_error;
auto stream_options = file_input_stream_options();
stream_options.buffer_size = file_stream_buffer_size;
stream_options.read_ahead = file_stream_read_ahead;
for (auto&& source_info : sources) {
// Keep stream_blob_info alive only at duration of streaming. Allowing the file descriptor
// of the sstable component to be released right after it has been streamed.
auto info = std::exchange(source_info, {});
auto& filename = info.filename;
std::optional<input_stream<char>> fstream;
bool fstream_closed = false;
try {
meta.fops = info.fops;
meta.filename = info.filename;
meta.sstable_state = info.sstable_state;
fstream = co_await info.source(stream_options);
} catch (...) {
blogger.warn("fstream[{}] Master failed sources={} targets={} error={}",
ops_id, sources, targets, std::current_exception());
throw;
}
std::vector<sink_and_source> ss;
size_t total_size = 0;
auto start_time = std::chrono::steady_clock::now();
// Set by the receive fiber when any peer reports an error; makes the
// send fiber stop pushing data.
bool got_error_from_peer = false;
try {
for (auto& x : targets) {
const auto& node = x.node;
meta.dst_shard_id = x.shard;
blogger.debug("fstream[{}] Master creating sink and source for node={}/{}, file={}, targets={}", ops_id, node, node, filename, targets);
auto [sink, source] = co_await ms.make_sink_and_source_for_stream_blob(meta, node);
ss.push_back(sink_and_source{node, std::move(sink), std::move(source)});
}
// This fiber sends data to peer node
auto send_data_to_peer = [&] () mutable -> future<> {
try {
while (!got_error_from_peer) {
may_inject_error(meta, inject_errors, "read_data");
auto buf = co_await fstream->read_up_to(file_stream_buffer_size);
if (buf.size() == 0) {
break;
}
streaming::stream_blob_data data(std::move(buf));
auto data_size = data.size();
stream_blob_cmd_data cmd_data(streaming::stream_blob_cmd::data, std::move(data));
// Fan the chunk out to all targets; sizes are counted per target.
co_await coroutine::parallel_for_each(ss, [&] (sink_and_source& s) mutable -> future<> {
total_size += data_size;
ops_total_size += data_size;
blogger.trace("fstream[{}] Master sending file={} to node={} chunk_size={}",
ops_id, filename, s.node, data_size);
may_inject_error(meta, inject_errors, "tx_data");
co_await s.sink(cmd_data);
});
}
} catch (...) {
sender_error = std::current_exception();
}
if (sender_error) {
// We have to close the stream otherwise if the stream is
// ok, the get_status_code_from_peer fiber below might
// wait for the source() forever.
for (auto& s : ss) {
try {
co_await s.sink.close();
s.sink_closed = true;
} catch (...) {
}
}
std::rethrow_exception(sender_error);
}
if (fstream) {
co_await fstream->close();
fstream_closed = true;
}
// Signal end-of-file to every target and close the sinks.
for (auto& s : ss) {
blogger.debug("fstream[{}] Master done sending file={} to node={}", ops_id, filename, s.node);
if (!got_error_from_peer) {
co_await s.sink(streaming::stream_blob_cmd_data(streaming::stream_blob_cmd::end_of_stream));
s.status_sent = true;
}
co_await s.sink.close();
s.sink_closed = true;
}
};
// This fiber gets status code from peer node
auto get_status_code_from_peer = [&] () mutable -> future<> {
co_await coroutine::parallel_for_each(ss, [&] (sink_and_source& s) mutable -> future<> {
bool got_cmd_ok = false;
while (!got_error_from_peer) {
try {
auto opt = co_await s.source();
if (opt) {
stream_blob_cmd_data& cmd_data = std::get<0>(*opt);
if (cmd_data.cmd == streaming::stream_blob_cmd::error) {
got_error_from_peer = true;
blogger.warn("fstream[{}] Master got stream_blob_cmd::error file={} peer={}",
ops_id, filename, s.node);
throw std::runtime_error(format("Got stream_blob_cmd::error from peer {}", s.node));
} if (cmd_data.cmd == streaming::stream_blob_cmd::ok) {
got_cmd_ok = true;
}
blogger.debug("fstream[{}] Master got stream_blob_cmd={} file={} peer={}",
ops_id, int(cmd_data.cmd), filename, s.node);
} else {
break;
}
} catch (seastar::rpc::stream_closed&) {
// After we get streaming::stream_blob_cmd::ok
// which is the last message from peer, it does not
// matter if the source() is closed or not.
if (got_cmd_ok) {
break;
} else {
throw;
}
} catch (...) {
throw;
}
}
});
};
co_await coroutine::all(send_data_to_peer, get_status_code_from_peer);
} catch (...) {
error = std::current_exception();
}
if (error) {
blogger.warn("fstream[{}] Master failed sending file={} to targets={} send_size={} bw={} error={}",
ops_id, filename, targets, total_size, get_bw(total_size, start_time), error);
// Error handling for fstream and sink
if (!fstream_closed) {
try {
if (fstream) {
co_await fstream->close();
}
} catch (...) {
// We could do nothing but continue to cleanup more
blogger.warn("fstream[{}] Master failed to close file stream: {}",
ops_id, std::current_exception());
}
}
// Best-effort per-target cleanup: send error status, close sink, drain source.
for (auto& s : ss) {
try {
if (!s.status_sent && !s.sink_closed) {
co_await s.sink(streaming::stream_blob_cmd_data(streaming::stream_blob_cmd::error));
s.status_sent = true;
}
} catch (...) {
// We could do nothing but continue to close
blogger.warn("fstream[{}] Master failed to send error code: {}",
ops_id, std::current_exception());
}
try {
if (!s.sink_closed) {
co_await s.sink.close();
s.sink_closed = true;
}
} catch (...) {
// We could do nothing but continue
blogger.warn("fstream[{}] Master failed to close rpc stream sink: {}",
ops_id, std::current_exception());
}
try {
// Drain everything in source
for (;;) {
auto opt = co_await s.source();
if (!opt) {
break;
}
}
} catch (...) {
blogger.warn("fstream[{}] Master failed to drain rpc stream source: {}",
ops_id, std::current_exception());
}
}
// Stop handling remaining files
break;
} else {
blogger.debug("fstream[{}] Master done sending file={} to targets={} send_size={} bw={}",
ops_id, filename, targets, total_size, get_bw(total_size, start_time));
}
}
co_await utils::get_local_injector().inject("tablet_stream_files_end_wait", utils::wait_for_message(std::chrono::seconds(60)));
if (error) {
blogger.warn("fstream[{}] Master failed sending files_nr={} files={} targets={} send_size={} bw={} error={}",
ops_id, sources.size(), sources, targets, ops_total_size, get_bw(ops_total_size, ops_start_time), error);
// Prefer the sender-side error, which is usually the root cause.
std::rethrow_exception(sender_error ? sender_error : error);
} else {
blogger.debug("fstream[{}] Master finished sending files_nr={} files={} targets={} send_size={} bw={}",
ops_id, sources.size(), sources, targets, ops_total_size, get_bw(ops_total_size, ops_start_time));
}
co_return ops_total_size;
}
// RPC handler on the streaming source node: takes a storage snapshot of the
// requested range, turns every sstable component into a stream_blob_info
// source, and sends them to the targets via tablet_stream_files().
future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req) {
stream_files_response resp;
auto& table = db.find_column_family(req.table);
// Hold the table via its stream_in_progress() phaser while we access it
// below: otherwise the table could be dropped and destroyed when we yield.
auto table_stream_op = table.stream_in_progress();
auto sstables = co_await table.take_storage_snapshot(req.range);
co_await utils::get_local_injector().inject("wait_before_tablet_stream_files_after_snapshot", utils::wait_for_message(std::chrono::seconds(60)));
co_await utils::get_local_injector().inject("order_sstables_for_streaming", [&sstables] (auto& handler) -> future<> {
if (sstables.size() == 3) {
// make sure the sstables are ordered so that the sstable containing shadowed data is streamed last
const std::string_view shadowed_file = handler.template get<std::string_view>("shadowed_file").value();
for (int index: {0, 1}) {
if (sstables[index].sst->component_basename(component_type::Data) == shadowed_file) {
std::swap(sstables[index], sstables[2]);
}
}
}
return make_ready_future<>();
});
auto files = std::list<stream_blob_info>();
auto& sst_gen = table.get_sstable_generation_generator();
auto reader = co_await db.obtain_reader_permit(table, "tablet_file_streaming", db::no_timeout, {});
for (auto& sst_snapshot : sstables) {
auto& sst = sst_snapshot.sst;
// stable state (across files) is a must for load to work on destination
auto sst_state = sst->state();
auto sources = co_await create_stream_sources(sst_snapshot, reader);
// Each streamed sstable gets a fresh generation on the destination.
auto newgen = fmt::to_string(sst_gen());
for (auto&& s : sources) {
auto oldname = s->component_basename();
auto newname = get_sstable_name_with_generation(req.ops_id, oldname, newgen);
blogger.debug("fstream[{}] Get name oldname={}, newname={}", req.ops_id, oldname, newname);
auto& info = files.emplace_back();
info.fops = file_ops::stream_sstables;
info.sstable_state = sst_state;
info.filename = std::move(newname);
info.source = [s = std::move(s)](const file_input_stream_options& options) {
return s->input(options);
};
}
// ensure we mark the end of each component sequence.
if (!files.empty()) {
files.back().fops = file_ops::load_sstables;
}
}
if (files.empty()) {
co_return resp;
}
// Captured before sstables.clear() so it can still be logged below.
auto sstable_nr = sstables.size();
// Release reference to sstables to be streamed here. Since one sstable is streamed at a time,
// a sstable - that has been compacted - can have its space released from disk right after
// that sstable's content has been fully streamed.
sstables.clear();
// Release the table - we don't need to access it anymore and the files are held by the snapshot.
table_stream_op = {};
blogger.debug("stream_sstables[{}] Started sending sstable_nr={} files_nr={} files={} range={}",
req.ops_id, sstable_nr, files.size(), files, req.range);
auto ops_start_time = std::chrono::steady_clock::now();
auto files_nr = files.size();
size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), req.targets, req.table, req.ops_id, req.topo_guard);
resp.stream_bytes = stream_bytes;
auto duration = std::chrono::steady_clock::now() - ops_start_time;
blogger.info("stream_sstables[{}] Finished sending sstable_nr={} files_nr={} range={} stream_bytes={} stream_time={} stream_bw={}",
req.ops_id, sstable_nr, files_nr, req.range, stream_bytes, duration, get_bw(stream_bytes, ops_start_time));
co_return resp;
}
// Initiator-side entry point: registers the stream id on all shards, asks
// the source host (via RPC) to stream the tablet's files for `range` to
// dst_host/dst_shard_id, and finally unregisters the stream id. Any error
// is rethrown only after the stream id has been cleaned up.
future<stream_files_response> tablet_stream_files(const file_stream_id& ops_id,
        replica::table& table,
        const dht::token_range& range,
        const locator::host_id& src_host,
        const locator::host_id& dst_host,
        seastar::shard_id dst_shard_id,
        netw::messaging_service& ms,
        abort_source& as,
        service::frozen_topology_guard topo_guard) {
    stream_files_response resp;
    std::exception_ptr eptr;
    try {
        co_await mark_tablet_stream_start(ops_id);
    } catch (...) {
        eptr = std::current_exception();
    }
    if (!eptr) {
        try {
            streaming::stream_files_request req;
            req.ops_id = ops_id;
            req.keyspace_name = table.schema()->ks_name();
            req.table_name = table.schema()->cf_name();
            req.table = table.schema()->id();
            req.range = range;
            req.targets = std::vector<node_and_shard>{node_and_shard{dst_host, dst_shard_id}};
            req.topo_guard = topo_guard;
            resp = co_await ser::streaming_rpc_verbs::send_tablet_stream_files(&ms, src_host, as, req);
        } catch (...) {
            eptr = std::current_exception();
        }
    }
    // Always unregister the stream id, even when the RPC failed.
    co_await mark_tablet_stream_done(ops_id);
    if (eptr) {
        std::rethrow_exception(eptr);
    }
    co_return resp;
}
}