Files
scylladb/streaming/stream_blob.hh
Michał Jadwiszczak 0f3827d509 streaming/stream_blob: register staging sstables to process them
After scylladb/scylladb#22034, staging status of sstables streamed
via file streaming was ignored and view updates were never generated.

This patch fixes it and now staging sstables are registered to
`view_building_worker`. Then, the worker create view building tasks
for those sstables, so the view building coordinator can schedule them
once the tablet migration is finished.

Fixes scylladb/scylla-enterprise#4572
2025-09-23 15:34:42 +02:00

196 lines
5.8 KiB
C++

/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "message/messaging_service_fwd.hh"
#include <cstdint>
#include <vector>
#include <list>
#include <seastar/core/temporary_buffer.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/future.hh>
#include <seastar/rpc/rpc_types.hh>
#include "utils/UUID.hh"
#include "dht/i_partitioner.hh"
#include "bytes.hh"
#include "replica/database_fwd.hh"
#include "locator/host_id.hh"
#include "service/topology_guard.hh"
#include "sstables/open_info.hh"
#include <fmt/core.h>
#include <fmt/ostream.h>
namespace db {
namespace view {
class view_building_worker;
}
}
namespace streaming {
using file_stream_id = utils::tagged_uuid<struct file_stream_id_tag>;
// - The file_ops::stream_sstables is used to stream a sstable file.
//
// - The file_ops::load_sstables is used to stream a sstable file and
// ask the receiver to load the sstable into the system.
enum class file_ops : uint16_t {
stream_sstables,
load_sstables,
};
// For STREAM_BLOB verb
enum class stream_blob_cmd : uint8_t {
ok,
error,
data,
end_of_stream,
};
class stream_blob_data {
public:
temporary_buffer<char> buf;
stream_blob_data() = default;
stream_blob_data(temporary_buffer<char> b) : buf(std::move(b)) {}
const char* data() const {
return buf.get();
}
size_t size() const {
return buf.size();
}
bool empty() const {
return buf.size() == 0;
}
};
class stream_blob_cmd_data {
public:
stream_blob_cmd cmd;
// The optional data contains value when the cmd is stream_blob_cmd::data.
// When the cmd is set to other values, e.g., stream_blob_cmd::error, the
// data contains no value.
std::optional<stream_blob_data> data;
stream_blob_cmd_data(stream_blob_cmd c) : cmd(c) {}
stream_blob_cmd_data(stream_blob_cmd c, std::optional<stream_blob_data> d)
: cmd(c)
, data(std::move(d))
{}
stream_blob_cmd_data(stream_blob_cmd c, stream_blob_data d)
: cmd(c)
, data(std::move(d))
{}
};
class stream_blob_meta {
public:
file_stream_id ops_id;
table_id table;
sstring filename;
seastar::shard_id dst_shard_id;
streaming::file_ops fops;
service::frozen_topology_guard topo_guard;
std::optional<sstables::sstable_state> sstable_state;
// We can extend this verb to send arbitrary blob of data
};
enum class store_result {
ok, failure,
};
using stream_blob_source_fn = noncopyable_function<future<input_stream<char>>(const file_input_stream_options&)>;
using stream_blob_finish_fn = noncopyable_function<future<>(store_result)>;
using output_result = std::tuple<stream_blob_finish_fn, output_stream<char>>;
using stream_blob_create_output_fn = noncopyable_function<future<output_result>(replica::database&, const streaming::stream_blob_meta&)>;
struct stream_blob_info {
sstring filename;
streaming::file_ops fops;
std::optional<sstables::sstable_state> sstable_state;
stream_blob_source_fn source;
friend inline std::ostream& operator<<(std::ostream& os, const stream_blob_info& x) {
return os << x.filename;
}
};
// The handler for the STREAM_BLOB verb.
seastar::future<> stream_blob_handler(replica::database& db, db::view::view_building_worker& vbw, netw::messaging_service& ms, locator::host_id from, streaming::stream_blob_meta meta, rpc::sink<streaming::stream_blob_cmd_data> sink, rpc::source<streaming::stream_blob_cmd_data> source);
// Exposed mainly for testing
future<> stream_blob_handler(replica::database& db,
netw::messaging_service& ms,
locator::host_id from,
streaming::stream_blob_meta meta,
rpc::sink<streaming::stream_blob_cmd_data> sink,
rpc::source<streaming::stream_blob_cmd_data> source,
stream_blob_create_output_fn,
bool may_inject_errors = false
);
// For TABLET_STREAM_FILES
class node_and_shard {
public:
locator::host_id node;
seastar::shard_id shard;
friend inline std::ostream& operator<<(std::ostream& os, const node_and_shard& x) {
return os << x.node << ":" << x.shard;
}
};
}
template <> struct fmt::formatter<streaming::node_and_shard> : fmt::ostream_formatter {};
namespace streaming {
class stream_files_request {
public:
file_stream_id ops_id;
sstring keyspace_name;
sstring table_name;
table_id table;
dht::token_range range;
std::vector<streaming::node_and_shard> targets;
service::frozen_topology_guard topo_guard;
};
class stream_files_response {
public:
size_t stream_bytes = 0;
};
// The handler for the TABLET_STREAM_FILES verb. The receiver of this verb will
// stream sstables files specified by the stream_files_request req.
future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req);
// Ask the src node to stream sstables to dst node for table in the given token range using TABLET_STREAM_FILES verb.
future<stream_files_response> tablet_stream_files(const file_stream_id& ops_id, replica::table& table, const dht::token_range& range, const locator::host_id& src, const locator::host_id& dst, seastar::shard_id dst_shard_id, netw::messaging_service& ms, abort_source& as, service::frozen_topology_guard topo_guard);
// Exposed for testability
future<size_t> tablet_stream_files(netw::messaging_service& ms,
std::list<stream_blob_info> sources,
std::vector<node_and_shard> targets,
table_id table,
file_stream_id ops_id,
service::frozen_topology_guard topo_guard,
bool may_inject_errors = false
);
future<> mark_tablet_stream_start(file_stream_id);
future<> mark_tablet_stream_done(file_stream_id);
}
template<> struct fmt::formatter<streaming::stream_blob_info>;