/*
 * Copyright (C) 2023-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#include "db/view/view_building_worker.hh"
#include "db/config.hh"
#include "message/messaging_service.hh"
#include "streaming/stream_blob.hh"
#include "streaming/stream_plan.hh"
#include "utils/pretty_printers.hh"
#include "utils/error_injection.hh"
#include "locator/host_id.hh"
#include "replica/database.hh"
#include "sstables/sstables.hh"
#include "sstables/sstables_manager.hh"
#include "sstables/sstable_version.hh"
#include "sstables/generation_type.hh"
#include "sstables/types.hh"
#include "idl/streaming.dist.hh"
#include "service/topology_guard.hh"

// NOTE(review): the original angle-bracket include targets were lost in
// extraction. The list below covers every facility this file visibly uses
// (coroutines, coroutine::all / parallel_for_each, file streams, defer,
// sleep, smp, lw_shared_ptr, fmt range formatting, std::any) — confirm
// against the pristine file.
#include <seastar/core/coroutine.hh>
#include <seastar/core/fstream.hh>
#include <seastar/core/seastar.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/smp.hh>
#include <seastar/coroutine/all.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/util/defer.hh>
#include <fmt/ranges.h>
#include <any>

#include "replica/exceptions.hh"

namespace streaming {

static logging::logger blogger("stream_blob");

// Chunking/buffering knobs for file streaming over RPC.
constexpr size_t file_stream_buffer_size = 128 * 1024;
constexpr size_t file_stream_write_behind = 10;
constexpr size_t file_stream_read_ahead = 4;

// The sstable state carried in the stream metadata; defaults to `normal`
// when the sender did not specify one.
static sstables::sstable_state sstable_state(const streaming::stream_blob_meta& meta) {
    return meta.sstable_state.value_or(sstables::sstable_state::normal);
}

// Loads a freshly streamed sstable into the owning table on the destination
// shard, sealing it as part of adding it to the sstable set. Staging sstables
// are additionally registered with the view building worker so the view
// building coordinator can generate view updates for them after the tablet
// migration completes.
static future<> load_sstable_for_tablet(const file_stream_id& ops_id, replica::database& db, db::view::view_building_worker& vbw,
        table_id id, sstables::sstable_state state, sstables::entry_descriptor desc, seastar::shard_id shard) {
    auto& sharded_vbw = vbw.container();
    co_await db.container().invoke_on(shard, [&sharded_vbw, id, desc, state, ops_id] (replica::database& db) -> future<> {
        replica::table& t = db.find_column_family(id);
        auto erm = t.get_effective_replication_map();
        auto& sstm = t.get_sstables_manager();
        auto sst = sstm.make_sstable(t.schema(), t.get_storage_options(), desc.generation, state, desc.version, desc.format);
        sstables::sstable_open_config cfg {
            .unsealed_sstable = true,
            .ignore_component_digest_mismatch = db.get_config().ignore_component_digest_mismatch()
        };
        co_await sst->load(erm->get_sharder(*t.schema()), cfg);
        // Seal the sstable only once it is actually added to the set, so a
        // failed add does not leave a sealed sstable behind.
        auto on_add = [sst, &sstm] (sstables::shared_sstable loading_sst) -> future<> {
            if (loading_sst == sst) {
                auto cfg = sstm.configure_writer(sst->get_origin());
                co_await loading_sst->seal_sstable(cfg.backup);
            }
            co_return;
        };
        auto new_sstables = co_await t.add_new_sstable_and_update_cache(sst, on_add);
        blogger.info("stream_sstables[{}] Loaded sstable {} successfully", ops_id, sst->toc_filename());
        if (state == sstables::sstable_state::staging) {
            // If the sstable is in staging state, register it to view building worker
            // to generate view updates from it.
            // But because the tablet is still in migration process, register the sstable
            // to the view building worker, which will create a view building task for it,
            // so then, the view building coordinator can decide to process it once the migration
            // is finished.
            // (Instead of registering the sstable to view update generator which may process it immediately.)
            co_await sharded_vbw.local().register_staging_sstable_tasks(new_sstables, t.schema()->id());
        }
    });
}

// Pretty-printable throughput of `total_size` bytes since `start_time`.
static utils::pretty_printed_throughput get_bw(size_t total_size, std::chrono::steady_clock::time_point start_time) {
    auto duration = std::chrono::steady_clock::now() - start_time;
    return utils::pretty_printed_throughput(total_size, duration);
}

// For tablet stream checks
class tablet_stream_status {
public:
    bool finished = false;
    void check_valid_stream();
};

void tablet_stream_status::check_valid_stream() {
    if (finished) {
        throw std::runtime_error("The stream has finished already");
    }
}

// Per-shard registry of in-flight tablet streams, keyed by the stream
// operation id. Entries are installed on all shards at stream start and
// removed at stream end.
static thread_local std::unordered_map<file_stream_id, lw_shared_ptr<tablet_stream_status>> tablet_streams;

future<> mark_tablet_stream_start(file_stream_id ops_id) {
    return seastar::smp::invoke_on_all([ops_id] {
        auto status = make_lw_shared<tablet_stream_status>();
        tablet_streams.emplace(ops_id, status);
    });
}

future<> mark_tablet_stream_done(file_stream_id ops_id) {
    return seastar::smp::invoke_on_all([ops_id] {
        auto it = tablet_streams.find(ops_id);
        if (it == tablet_streams.end()) {
            return;
        }
        auto status = it->second;
        if (status) {
            // Invalidate the stream so any follower fiber still running will
            // fail its next check_valid_stream().
            status->finished = true;
        }
        tablet_streams.erase(ops_id);
    });
}

// Looks up the status of a registered stream; throws if the stream is
// unknown on this shard.
lw_shared_ptr<tablet_stream_status> get_tablet_stream(file_stream_id ops_id) {
    // Use find() rather than operator[]: the latter would default-insert a
    // null entry for an unknown ops_id before throwing, leaking a stale key
    // into the registry.
    auto it = tablet_streams.find(ops_id);
    auto status = it != tablet_streams.end() ? it->second : nullptr;
    if (!status) {
        auto msg = format("stream_sstables[{}] Could not find ops_id={}", ops_id, ops_id);
        blogger.warn("{}", msg);
        throw std::runtime_error(msg);
    }
    return status;
}

// Throws a synthetic error with probability ~1/500 when error injection for
// this stream is enabled. Used by tests to exercise the failure paths.
static void may_inject_error(const streaming::stream_blob_meta& meta, bool may_inject, const sstring& error) {
    if (may_inject) {
        if (rand() % 500 == 0) {
            auto msg = format("fstream[{}] Injected file stream error={} file={}", meta.ops_id, error, meta.filename);
            blogger.warn("{}", msg);
            throw std::runtime_error(msg);
        }
    }
}

// Follower side of the file streaming protocol: receives one file worth of
// stream_blob_cmd messages from `source`, writes the data through the output
// stream obtained from `create_output`, and reports ok/error back through
// `sink`. On any failure it closes the output, sends an error status on a
// best-effort basis, drains the source, and asks `finish` to discard the
// partial file. Errors are logged, not rethrown — the caller can do nothing
// more with them.
future<> stream_blob_handler(replica::database& db,
        netw::messaging_service& ms,
        locator::host_id from,
        streaming::stream_blob_meta meta,
        rpc::sink<streaming::stream_blob_cmd_data> sink,
        rpc::source<streaming::stream_blob_cmd_data> source,
        stream_blob_create_output_fn create_output,
        bool inject_errors) {
    bool fstream_closed = false;
    bool sink_closed = false;
    bool status_sent = false;
    size_t total_size = 0;
    auto start_time = std::chrono::steady_clock::now();
    std::optional<output_stream<char>> fstream;
    std::exception_ptr error;
    stream_blob_finish_fn finish;
    // Will log a message when streaming is done. Used to synchronize tests.
    // NOTE(review): declared as lw_shared_ptr<std::any> holding a deferred
    // action — the original template argument was lost; confirm against the
    // pristine file.
    lw_shared_ptr<std::any> log_done;
    try {
        if (utils::get_local_injector().is_enabled("stream_mutation_fragments")) {
            log_done = make_lw_shared<std::any>(seastar::make_shared(seastar::defer([] {
                blogger.info("stream_mutation_fragments: done (tablets)");
            })));
        }
        auto status = get_tablet_stream(meta.ops_id);
        auto guard = service::topology_guard(meta.topo_guard);
        // Reject any file_ops that is not support by this node
        if (meta.fops != streaming::file_ops::stream_sstables &&
            meta.fops != streaming::file_ops::load_sstables) {
            auto msg = format("fstream[{}] Unsupported file_ops={} peer={} file={}", meta.ops_id, int(meta.fops), from, meta.filename);
            blogger.warn("{}", msg);
            throw std::runtime_error(msg);
        }
        blogger.debug("fstream[{}] Follower started peer={} file={}", meta.ops_id, from, meta.filename);

        auto [f, out] = co_await create_output(db, meta);
        finish = std::move(f);
        fstream = std::move(out);

        bool got_end_of_stream = false;
        for (;;) {
            try {
                auto opt = co_await source();
                if (!opt) {
                    break;
                }
                co_await utils::get_local_injector().inject("stream_mutation_fragments", [&guard] (auto& handler) -> future<> {
                    blogger.info("stream_mutation_fragments: waiting (tablets)");
                    while (!handler.poll_for_message()) {
                        guard.check();
                        co_await sleep(std::chrono::milliseconds(5));
                    }
                    blogger.info("stream_mutation_fragments: released (tablets)");
                });
                if (db.is_in_critical_disk_utilization_mode()) {
                    throw replica::critical_disk_utilization_exception("rejected streamed file");
                }
                stream_blob_cmd_data& cmd_data = std::get<0>(*opt);
                auto cmd = cmd_data.cmd;
                if (cmd == streaming::stream_blob_cmd::error) {
                    blogger.warn("fstream[{}] Follower got stream_blob_cmd::error from peer={} file={}",
                            meta.ops_id, from, meta.filename);
                    throw std::runtime_error(format("Got stream_blob_cmd::error from peer={} file={}", from, meta.filename));
                } else if (cmd == streaming::stream_blob_cmd::end_of_stream) {
                    blogger.debug("fstream[{}] Follower got stream_blob_cmd::end_of_stream from peer={} file={}", meta.ops_id, from, meta.filename);
                    got_end_of_stream = true;
                } else if (cmd == streaming::stream_blob_cmd::data) {
                    std::optional<stream_blob_data> data = std::move(cmd_data.data);
                    if (data) {
                        total_size += data->size();
                        blogger.trace("fstream[{}] Follower received data from peer={} data={}", meta.ops_id, from, data->size());
                        status->check_valid_stream();
                        if (!data->empty()) {
                            co_await fstream->write((char*)data->data(), data->size());
                        }
                    }
                }
            } catch (seastar::rpc::stream_closed&) {
                // After we get streaming::stream_blob_cmd::end_of_stream which
                // is the last message from peer, it does not matter if the
                // source() is closed or not.
                if (got_end_of_stream) {
                    break;
                } else {
                    throw;
                }
            } catch (...) {
                throw;
            }
            may_inject_error(meta, inject_errors, "rx_data");
        }

        // If we reach here, streaming::stream_blob_cmd::end_of_stream should be received. Otherwise there
        // must be an error, e.g., the sender closed the stream without sending streaming::stream_blob_cmd::error.
        if (!got_end_of_stream) {
            throw std::runtime_error(format("fstream[{}] Follower failed to get end_of_stream", meta.ops_id));
        }

        status->check_valid_stream();

        co_await fstream->flush();
        co_await fstream->close();
        fstream_closed = true;

        may_inject_error(meta, inject_errors, "flush_and_close");

        co_await finish(store_result::ok);

        // Send status code and close the sink
        co_await sink(streaming::stream_blob_cmd_data(streaming::stream_blob_cmd::ok));
        status_sent = true;
        co_await sink.close();
        sink_closed = true;
    } catch (...) {
        error = std::current_exception();
    }
    if (error) {
        if (!fstream_closed) {
            try {
                if (fstream) {
                    // Make sure fstream is always closed
                    co_await fstream->close();
                }
            } catch (...) {
                blogger.warn("fstream[{}] Follower failed to close the file stream: {}", meta.ops_id, std::current_exception());
                // We could do nothing but continue to cleanup more
            }
        }
        if (!status_sent) {
            try {
                may_inject_error(meta, inject_errors, "no_error_code_back");
                co_await sink(streaming::stream_blob_cmd_data(streaming::stream_blob_cmd::error));
            } catch (...) {
                // Try our best to send the status code.
                // If we could not send it, we could do nothing but close the sink.
                blogger.warn("fstream[{}] Follower failed to send error code: {}", meta.ops_id, std::current_exception());
            }
        }
        try {
            if (!sink_closed) {
                // Make sure sink is always closed
                co_await sink.close();
            }
        } catch (...) {
            blogger.warn("fstream[{}] Follower failed to close the stream sink: {}", meta.ops_id, std::current_exception());
        }
        try {
            // Drain everything in source
            for (;;) {
                auto opt = co_await source();
                if (!opt) {
                    break;
                }
            }
        } catch (...) {
            blogger.warn("fstream[{}] Follower failed to drain rpc stream source: {}", meta.ops_id, std::current_exception());
        }
        try {
            // Remove the file in case of error
            if (finish) {
                co_await finish(store_result::failure);
                blogger.info("fstream[{}] Follower removed partial file={}", meta.ops_id, meta.filename);
            }
        } catch (...) {
            blogger.warn("fstream[{}] Follower failed to remove partial file={}: {}",
                    meta.ops_id, meta.filename, std::current_exception());
        }
        blogger.warn("fstream[{}] Follower failed peer={} file={} received_size={} bw={} error={}",
                meta.ops_id, from, meta.filename, total_size, get_bw(total_size, start_time), error);
        // Do not call rethrow_exception(error) because the caller could do nothing but log
        // the error. We have already logged the error here.
    } else {
        // Get some statistics
        blogger.debug("fstream[{}] Follower finished peer={} file={} received_size={} bw={}",
                meta.ops_id, from, meta.filename, total_size, get_bw(total_size, start_time));
    }
    co_return;
}

// Production entry point of the follower: builds an sstable stream sink for
// the incoming component and, once the last component of an sstable has been
// stored, loads the sstable into the destination table/shard.
future<> stream_blob_handler(replica::database& db,
        db::view::view_building_worker& vbw,
        netw::messaging_service& ms,
        locator::host_id from,
        streaming::stream_blob_meta meta,
        rpc::sink<streaming::stream_blob_cmd_data> sink,
        rpc::source<streaming::stream_blob_cmd_data> source) {
    co_await stream_blob_handler(db, ms, std::move(from), meta, std::move(sink), std::move(source),
            [&vbw] (replica::database& db, const streaming::stream_blob_meta& meta) -> future<output_result> {
        auto foptions = file_open_options();
        foptions.sloppy_size = true;
        foptions.extent_allocation_size_hint = 32 << 20;

        auto stream_options = file_output_stream_options();
        stream_options.buffer_size = file_stream_buffer_size;
        stream_options.write_behind = file_stream_write_behind;

        auto& table = db.find_column_family(meta.table);
        auto& sstm = table.get_sstables_manager();

        // SSTable will be only sealed when added to the sstable set, so we make sure unsplit sstables aren't
        // left sealed on the table directory.
        sstables::sstable_stream_sink_cfg cfg {
            .last_component = meta.fops == file_ops::load_sstables,
            .leave_unsealed = true
        };
        auto sstable_sink = sstables::create_stream_sink(table.schema(), sstm, table.get_storage_options(),
                sstable_state(meta), meta.filename, cfg);
        auto out = co_await sstable_sink->output(foptions, stream_options);

        co_return output_result{
            [sstable_sink = std::move(sstable_sink), &meta, &db, &vbw] (store_result res) -> future<> {
                if (res != store_result::ok) {
                    co_await sstable_sink->abort();
                    co_return;
                }
                auto sst = co_await sstable_sink->close();
                if (sst) {
                    // Closing the sink on the last component yields a complete
                    // sstable; hand it off to the destination shard for loading.
                    blogger.debug("stream_sstables[{}] Loading sstable {} on shard {}", meta.ops_id, sst->toc_filename(), meta.dst_shard_id);
                    auto desc = sst->get_descriptor(sstables::component_type::TOC);
                    sst = {};
                    co_await load_sstable_for_tablet(meta.ops_id, db, vbw, meta.table, sstable_state(meta), std::move(desc), meta.dst_shard_id);
                }
            },
            std::move(out)
        };
    });
}

// Get a new sstable name using the new generation
// For example:
// oldname: me-3ga1_0iiv_2e5uo2flv7lgdl2j0d-big-Index.db
// newgen:  3ga1_0iiv_3vj5c2flv7lgdl2j0d
// newname: me-3ga1_0iiv_3vj5c2flv7lgdl2j0d-big-Index.db
static std::string get_sstable_name_with_generation(const file_stream_id& ops_id, const std::string& oldname, const std::string& newgen) {
    std::string newname = oldname;
    // The generation name starts after the first '-'.
    auto it = newname.find("-");
    if (it != std::string::npos) {
        newname.replace(++it, newgen.size(), newgen);
        return newname;
    } else {
        auto msg = fmt::format("fstream[{}] Failed to get sstable name for {} with generation {}",
                ops_id, oldname, newgen);
        blogger.warn("{}", msg);
        throw std::runtime_error(msg);
    }
}

} // namespace streaming

// Allow fmt to print stream_blob_info (e.g. the `sources` list in the master
// logs) through its ostream operator.
template <> struct fmt::formatter<streaming::stream_blob_info> : fmt::ostream_formatter {};

namespace streaming {

// Send files in the files list to the nodes in targets list over network
// Returns number of bytes sent over network
future<size_t> tablet_stream_files(netw::messaging_service& ms,
        std::list<stream_blob_info> sources,
        std::vector<node_and_shard> targets,
        table_id table,
        file_stream_id ops_id,
        service::frozen_topology_guard topo_guard,
        bool inject_errors) {
    size_t ops_total_size = 0;
    if (targets.empty()) {
        co_return ops_total_size;
    }
    if (sources.empty()) {
        co_return ops_total_size;
    }
    blogger.debug("fstream[{}] Master started sending n={}, sources={}, targets={}", ops_id, sources.size(), sources, targets);

    // One RPC stream pair per target node/shard for the file currently
    // being sent.
    struct sink_and_source {
        locator::host_id node;
        rpc::sink<streaming::stream_blob_cmd_data> sink;
        rpc::source<streaming::stream_blob_cmd_data> source;
        bool sink_closed = false;
        bool status_sent = false;
    };

    auto ops_start_time = std::chrono::steady_clock::now();
    streaming::stream_blob_meta meta;
    meta.ops_id = ops_id;
    meta.table = table;
    meta.topo_guard = topo_guard;
    std::exception_ptr error;
    std::exception_ptr sender_error;

    auto stream_options = file_input_stream_options();
    stream_options.buffer_size = file_stream_buffer_size;
    stream_options.read_ahead = file_stream_read_ahead;

    for (auto&& source_info : sources) {
        // Keep stream_blob_info alive only at duration of streaming. Allowing the file descriptor
        // of the sstable component to be released right after it has been streamed.
        auto info = std::exchange(source_info, {});
        auto& filename = info.filename;
        std::optional<input_stream<char>> fstream;
        bool fstream_closed = false;
        try {
            meta.fops = info.fops;
            meta.filename = info.filename;
            meta.sstable_state = info.sstable_state;
            fstream = co_await info.source(stream_options);
        } catch (...) {
            blogger.warn("fstream[{}] Master failed sources={} targets={} error={}",
                    ops_id, sources, targets, std::current_exception());
            throw;
        }

        std::vector<sink_and_source> ss;
        size_t total_size = 0;
        auto start_time = std::chrono::steady_clock::now();
        bool got_error_from_peer = false;
        try {
            for (auto& x : targets) {
                const auto& node = x.node;
                meta.dst_shard_id = x.shard;
                blogger.debug("fstream[{}] Master creating sink and source for node={}/{}, file={}, targets={}", ops_id, node, node, filename, targets);
                auto [sink, source] = co_await ms.make_sink_and_source_for_stream_blob(meta, node);
                ss.push_back(sink_and_source{node, std::move(sink), std::move(source)});
            }

            // This fiber sends data to peer node
            auto send_data_to_peer = [&] () mutable -> future<> {
                try {
                    while (!got_error_from_peer) {
                        may_inject_error(meta, inject_errors, "read_data");
                        auto buf = co_await fstream->read_up_to(file_stream_buffer_size);
                        if (buf.size() == 0) {
                            break;
                        }
                        streaming::stream_blob_data data(std::move(buf));
                        auto data_size = data.size();
                        stream_blob_cmd_data cmd_data(streaming::stream_blob_cmd::data, std::move(data));
                        co_await coroutine::parallel_for_each(ss, [&] (sink_and_source& s) mutable -> future<> {
                            total_size += data_size;
                            ops_total_size += data_size;
                            blogger.trace("fstream[{}] Master sending file={} to node={} chunk_size={}",
                                    ops_id, filename, s.node, data_size);
                            may_inject_error(meta, inject_errors, "tx_data");
                            co_await s.sink(cmd_data);
                        });
                    }
                } catch (...) {
                    sender_error = std::current_exception();
                }
                if (sender_error) {
                    // We have to close the stream otherwise if the stream is
                    // ok, the get_status_code_from_peer fiber below might
                    // wait for the source() forever.
                    for (auto& s : ss) {
                        try {
                            co_await s.sink.close();
                            s.sink_closed = true;
                        } catch (...) {
                        }
                    }
                    std::rethrow_exception(sender_error);
                }
                if (fstream) {
                    co_await fstream->close();
                    fstream_closed = true;
                }
                for (auto& s : ss) {
                    blogger.debug("fstream[{}] Master done sending file={} to node={}", ops_id, filename, s.node);
                    if (!got_error_from_peer) {
                        co_await s.sink(streaming::stream_blob_cmd_data(streaming::stream_blob_cmd::end_of_stream));
                        s.status_sent = true;
                    }
                    co_await s.sink.close();
                    s.sink_closed = true;
                }
            };

            // This fiber gets status code from peer node
            auto get_status_code_from_peer = [&] () mutable -> future<> {
                co_await coroutine::parallel_for_each(ss, [&] (sink_and_source& s) mutable -> future<> {
                    bool got_cmd_ok = false;
                    while (!got_error_from_peer) {
                        try {
                            auto opt = co_await s.source();
                            if (opt) {
                                stream_blob_cmd_data& cmd_data = std::get<0>(*opt);
                                if (cmd_data.cmd == streaming::stream_blob_cmd::error) {
                                    got_error_from_peer = true;
                                    blogger.warn("fstream[{}] Master got stream_blob_cmd::error file={} peer={}",
                                            ops_id, filename, s.node);
                                    throw std::runtime_error(format("Got stream_blob_cmd::error from peer {}", s.node));
                                }
                                if (cmd_data.cmd == streaming::stream_blob_cmd::ok) {
                                    got_cmd_ok = true;
                                }
                                blogger.debug("fstream[{}] Master got stream_blob_cmd={} file={} peer={}",
                                        ops_id, int(cmd_data.cmd), filename, s.node);
                            } else {
                                break;
                            }
                        } catch (seastar::rpc::stream_closed&) {
                            // After we get streaming::stream_blob_cmd::ok
                            // which is the last message from peer, it does not
                            // matter if the source() is closed or not.
                            if (got_cmd_ok) {
                                break;
                            } else {
                                throw;
                            }
                        } catch (...) {
                            throw;
                        }
                    }
                });
            };

            co_await coroutine::all(send_data_to_peer, get_status_code_from_peer);
        } catch (...) {
            error = std::current_exception();
        }

        if (error) {
            blogger.warn("fstream[{}] Master failed sending file={} to targets={} send_size={} bw={} error={}",
                    ops_id, filename, targets, total_size, get_bw(total_size, start_time), error);
            // Error handling for fstream and sink
            if (!fstream_closed) {
                try {
                    if (fstream) {
                        co_await fstream->close();
                    }
                } catch (...) {
                    // We could do nothing but continue to cleanup more
                    blogger.warn("fstream[{}] Master failed to close file stream: {}", ops_id, std::current_exception());
                }
            }
            for (auto& s : ss) {
                try {
                    if (!s.status_sent && !s.sink_closed) {
                        co_await s.sink(streaming::stream_blob_cmd_data(streaming::stream_blob_cmd::error));
                        s.status_sent = true;
                    }
                } catch (...) {
                    // We could do nothing but continue to close
                    blogger.warn("fstream[{}] Master failed to send error code: {}", ops_id, std::current_exception());
                }
                try {
                    if (!s.sink_closed) {
                        co_await s.sink.close();
                        s.sink_closed = true;
                    }
                } catch (...) {
                    // We could do nothing but continue
                    blogger.warn("fstream[{}] Master failed to close rpc stream sink: {}", ops_id, std::current_exception());
                }
                try {
                    // Drain everything in source
                    for (;;) {
                        auto opt = co_await s.source();
                        if (!opt) {
                            break;
                        }
                    }
                } catch (...) {
                    blogger.warn("fstream[{}] Master failed to drain rpc stream source: {}", ops_id, std::current_exception());
                }
            }
            // Stop handling remaining files
            break;
        } else {
            blogger.debug("fstream[{}] Master done sending file={} to targets={} send_size={} bw={}",
                    ops_id, filename, targets, total_size, get_bw(total_size, start_time));
        }
    }

    co_await utils::get_local_injector().inject("tablet_stream_files_end_wait", utils::wait_for_message(std::chrono::seconds(60)));

    if (error) {
        blogger.warn("fstream[{}] Master failed sending files_nr={} files={} targets={} send_size={} bw={} error={}",
                ops_id, sources.size(), sources, targets, ops_total_size, get_bw(ops_total_size, ops_start_time), error);
        // Prefer the sender fiber's error if both fibers failed.
        std::rethrow_exception(sender_error ? sender_error : error);
    } else {
        blogger.debug("fstream[{}] Master finished sending files_nr={} files={} targets={} send_size={} bw={}",
                ops_id, sources.size(), sources, targets, ops_total_size, get_bw(ops_total_size, ops_start_time));
    }
    co_return ops_total_size;
}

// RPC handler on the source node: snapshots the sstables covering the
// requested range, renames each component with a fresh generation, and
// streams them to the targets via tablet_stream_files().
future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req) {
    stream_files_response resp;
    auto& table = db.find_column_family(req.table);
    auto sstables = co_await table.take_storage_snapshot(req.range);

    co_await utils::get_local_injector().inject("order_sstables_for_streaming", [&sstables] (auto& handler) -> future<> {
        if (sstables.size() == 3) {
            // make sure the sstables are ordered so that the sstable containing shadowed data is streamed last
            // NOTE(review): the template argument of get() was lost in
            // extraction; std::string_view matches the declared local type.
            const std::string_view shadowed_file = handler.template get<std::string_view>("shadowed_file").value();
            for (int index : {0, 1}) {
                if (sstables[index].sst->component_basename(component_type::Data) == shadowed_file) {
                    std::swap(sstables[index], sstables[2]);
                }
            }
        }
        return make_ready_future<>();
    });

    auto files = std::list<stream_blob_info>();
    auto& sst_gen = table.get_sstable_generation_generator();
    auto reader = co_await db.obtain_reader_permit(table, "tablet_file_streaming", db::no_timeout, {});

    for (auto& sst_snapshot : sstables) {
        auto& sst = sst_snapshot.sst;
        // stable state (across files) is a must for load to work on destination
        auto sst_state = sst->state();
        auto sources = co_await create_stream_sources(sst_snapshot, reader);
        auto newgen = fmt::to_string(sst_gen());
        for (auto&& s : sources) {
            auto oldname = s->component_basename();
            auto newname = get_sstable_name_with_generation(req.ops_id, oldname, newgen);
            blogger.debug("fstream[{}] Get name oldname={}, newname={}", req.ops_id, oldname, newname);

            auto& info = files.emplace_back();
            info.fops = file_ops::stream_sstables;
            info.sstable_state = sst_state;
            info.filename = std::move(newname);
            info.source = [s = std::move(s)] (const file_input_stream_options& options) {
                return s->input(options);
            };
        }
        // ensure we mark the end of each component sequence.
        if (!files.empty()) {
            files.back().fops = file_ops::load_sstables;
        }
    }

    if (files.empty()) {
        co_return resp;
    }

    auto sstable_nr = sstables.size();
    // Release reference to sstables to be streamed here. Since one sstable is streamed at a time,
    // a sstable - that has been compacted - can have its space released from disk right after
    // that sstable's content has been fully streamed.
    sstables.clear();

    blogger.debug("stream_sstables[{}] Started sending sstable_nr={} files_nr={} files={} range={}",
            req.ops_id, sstable_nr, files.size(), files, req.range);
    auto ops_start_time = std::chrono::steady_clock::now();
    auto files_nr = files.size();
    size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), req.targets, req.table, req.ops_id, req.topo_guard);
    resp.stream_bytes = stream_bytes;
    auto duration = std::chrono::steady_clock::now() - ops_start_time;
    blogger.info("stream_sstables[{}] Finished sending sstable_nr={} files_nr={} range={} stream_bytes={} stream_time={} stream_bw={}",
            req.ops_id, sstable_nr, files_nr, req.range, stream_bytes, duration, get_bw(stream_bytes, ops_start_time));
    co_return resp;
}

// Client-side wrapper: registers the stream on all shards, asks the source
// node (over RPC) to stream the range to the destination, and always
// unregisters the stream, rethrowing any error afterwards.
future<stream_files_response> tablet_stream_files(const file_stream_id& ops_id,
        replica::table& table,
        const dht::token_range& range,
        const locator::host_id& src_host,
        const locator::host_id& dst_host,
        seastar::shard_id dst_shard_id,
        netw::messaging_service& ms,
        abort_source& as,
        service::frozen_topology_guard topo_guard) {
    stream_files_response resp;
    std::exception_ptr error;
    try {
        co_await mark_tablet_stream_start(ops_id);
    } catch (...) {
        error = std::current_exception();
    }
    if (!error) {
        try {
            streaming::stream_files_request req;
            req.ops_id = ops_id;
            req.keyspace_name = table.schema()->ks_name();
            req.table_name = table.schema()->cf_name();
            req.table = table.schema()->id();
            req.range = range;
            req.targets = std::vector{node_and_shard{dst_host, dst_shard_id}};
            req.topo_guard = topo_guard;
            resp = co_await ser::streaming_rpc_verbs::send_tablet_stream_files(&ms, src_host, as, req);
        } catch (...) {
            error = std::current_exception();
        }
    }
    // Always unregister, even on failure, so the per-shard registry does
    // not accumulate stale entries.
    co_await mark_tablet_stream_done(ops_id);
    if (error) {
        std::rethrow_exception(error);
    }
    co_return resp;
}

} // namespace streaming