scylladb/sstables_loader.cc
Raphael S. Carvalho fbeee8b65d Optimize load-and-stream
load-and-stream implements no policy for deciding which SSTables go into
each streaming round (a batch of 16 SSTables), so the choice is effectively arbitrary.

It can take advantage of the fact that the LSM-tree layout, with ICS (Incremental
Compaction Strategy) and LCS (Leveled Compaction Strategy), is a set of SSTable runs,
where each run is composed of SSTables that are disjoint in their key ranges.

Sorting the SSTables to be streamed by their first key means that SSTable runs
are streamed incrementally, in token order.

SSTable runs in the same replica group (or on the same node) will have their
content deduplicated, significantly reducing the amount of data we need to
put on the wire. The improvement is proportional to the space amplification
of the table, which in turn depends on the compaction strategy used.

Another important benefit is that the destination nodes will receive SSTables
in token order, allowing off-strategy compaction to be more efficient.
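
The change itself boils down to sorting the input before batching; the relevant
snippet from load_and_stream (full file below) is:

    // Sort in descending first-key order; batches are popped off the back of
    // the vector, so streaming proceeds from the smallest tokens upwards.
    std::ranges::sort(sstables, [] (const sstables::shared_sstable& x, const sstables::shared_sstable& y) {
        return x->compare_by_first_key(*y) > 0;
    });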

This is how I tested it:

1) Generated a 5 GB dataset into an ICS table.
2) Started a fresh 2-node cluster. RF=2.
3) Ran load-and-stream against one of the replicas.

BEFORE:

$ time curl -X POST "http://127.0.0.1:10000/storage_service/sstables/keyspace1?cf=standard1&load_and_stream=true"

real	4m40.613s
user	0m0.005s
sys	0m0.007s

AFTER:

$ time curl -X POST "http://127.0.0.1:10000/storage_service/sstables/keyspace1?cf=standard1&load_and_stream=true"

real	2m39.271s
user	0m0.005s
sys	0m0.004s

That's ~1.76x faster.
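(280.613 s before vs. 159.271 s after; 280.613 / 159.271 ≈ 1.76.)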

The speedup is explained by deduplication, visible in the received_partitions counts below:

BEFORE:

INFO  2023-02-17 22:59:01,100 [shard 0] stream_session - [Stream #79d3ce7a-ea47-4b6e-9214-930610a18ccd] Write to sstable for ks=keyspace1, cf=standard1, estimated_partitions=3445376, received_partitions=2755835
INFO  2023-02-17 22:59:41,491 [shard 0] stream_session - [Stream #bc6bad99-4438-4e1e-92db-b2cb394039c8] Write to sstable for ks=keyspace1, cf=standard1, estimated_partitions=3308288, received_partitions=2836491
INFO  2023-02-17 23:00:20,585 [shard 0] stream_session - [Stream #e95c4f49-0a2f-47ea-b41f-d900dd87ead5] Write to sstable for ks=keyspace1, cf=standard1, estimated_partitions=3129088, received_partitions=2734029
INFO  2023-02-17 23:00:49,297 [shard 0] stream_session - [Stream #255cba95-a099-4fec-a72c-f87d5cac2b1d] Write to sstable for ks=keyspace1, cf=standard1, estimated_partitions=2544128, received_partitions=1959370
INFO  2023-02-17 23:01:33,110 [shard 0] stream_session - [Stream #96b5737e-30c7-4af8-a8b8-96fecbcbcbd0] Write to sstable for ks=keyspace1, cf=standard1, estimated_partitions=3624576, received_partitions=3085681
INFO  2023-02-17 23:02:20,909 [shard 0] stream_session - [Stream #3185a48b-fb9e-4190-88f4-5c7a386bc9bd] Write to sstable for ks=keyspace1, cf=standard1, estimated_partitions=3505024, received_partitions=3079345
INFO  2023-02-17 23:03:02,039 [shard 0] stream_session - [Stream #0d2964dc-d5e3-4775-825c-97f736d14713] Write to sstable for ks=keyspace1, cf=standard1, estimated_partitions=2808192, received_partitions=2655811

AFTER:

INFO  2023-02-17 23:12:49,155 [shard 0] stream_session - [Stream #bf00963c-3334-4035-b1a9-4b3ceb7a188a] Write to sstable for ks=keyspace1, cf=standard1, estimated_partitions=2965376, received_partitions=1006535
INFO  2023-02-17 23:13:13,365 [shard 0] stream_session - [Stream #1cd2e3ac-a68b-4cb5-8a06-707e91cf59db] Write to sstable for ks=keyspace1, cf=standard1, estimated_partitions=3543936, received_partitions=1406157
INFO  2023-02-17 23:13:37,474 [shard 0] stream_session - [Stream #5a278230-6b4b-461f-8396-c15df7092d03] Write to sstable for ks=keyspace1, cf=standard1, estimated_partitions=3639936, received_partitions=1371298
INFO  2023-02-17 23:14:02,132 [shard 0] stream_session - [Stream #19f40dc3-e02a-4321-a917-a6590d99dd03] Write to sstable for ks=keyspace1, cf=standard1, estimated_partitions=3638912, received_partitions=1435386
INFO  2023-02-17 23:14:26,673 [shard 0] stream_session - [Stream #d47507eb-2067-4e8f-a4f7-c82d5fbd4228] Write to sstable for ks=keyspace1, cf=standard1, estimated_partitions=3561600, received_partitions=1423024
INFO  2023-02-17 23:14:49,307 [shard 0] stream_session - [Stream #d42ee911-253a-4de6-ac89-6a3c05b88d66] Write to sstable for ks=keyspace1, cf=standard1, estimated_partitions=2382592, received_partitions=1452656
INFO  2023-02-17 23:15:10,067 [shard 0] stream_session - [Stream #1f78c1bf-8e20-41bd-95de-16de3fc5f86c] Write to sstable for ks=keyspace1, cf=standard1, estimated_partitions=2632320, received_partitions=1252298
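
Summing the seven streams shown in each run, received_partitions drops from roughly
19.1M before to roughly 9.3M after, i.e. only about half as many partitions had to be
put on the wire for the same dataset.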

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20230219191924.37070-1-raphaelsc@scylladb.com>
2023-02-20 12:46:14 +01:00

scylladb/sstables_loader.cc (283 lines, 13 KiB, C++):

/*
 * Copyright (C) 2021-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: AGPL-3.0-or-later
 */

#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/rpc/rpc.hh>
#include "sstables_loader.hh"
#include "replica/distributed_loader.hh"
#include "replica/database.hh"
#include "sstables/sstables.hh"
#include "gms/inet_address.hh"
#include "streaming/stream_mutation_fragments_cmd.hh"
#include "streaming/stream_reason.hh"
#include "readers/mutation_fragment_v1_stream.hh"
#include "locator/abstract_replication_strategy.hh"
#include "message/messaging_service.hh"
#include <cfloat>
#include <algorithm>

static logging::logger llog("sstables_loader");

namespace {
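
// send_meta_data wraps the per-node rpc sink/source pair used below: it
// forwards frozen mutation fragments to a single destination node, counts the
// partitions and bytes sent, and reports any error status the peer sends back
// on the source stream.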
class send_meta_data {
    gms::inet_address _node;
    seastar::rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd> _sink;
    seastar::rpc::source<int32_t> _source;
    bool _error_from_peer = false;
    size_t _num_partitions_sent = 0;
    size_t _num_bytes_sent = 0;
    future<> _receive_done;
private:
    future<> do_receive() {
        int32_t status = 0;
        while (auto status_opt = co_await _source()) {
            status = std::get<0>(*status_opt);
            llog.debug("send_meta_data: got error code={}, from node={}", status, _node);
            if (status == -1) {
                _error_from_peer = true;
            }
        }
        llog.debug("send_meta_data: finished reading source from node={}", _node);
        if (_error_from_peer) {
            throw std::runtime_error(format("send_meta_data: got error code={} from node={}", status, _node));
        }
        co_return;
    }
public:
    send_meta_data(gms::inet_address node,
            seastar::rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd> sink,
            seastar::rpc::source<int32_t> source)
        : _node(std::move(node))
        , _sink(std::move(sink))
        , _source(std::move(source))
        , _receive_done(make_ready_future<>()) {
    }
    void receive() {
        _receive_done = do_receive();
    }
    future<> send(const frozen_mutation_fragment& fmf, bool is_partition_start) {
        if (_error_from_peer) {
            throw std::runtime_error(format("send_meta_data: got error from peer node={}", _node));
        }
        auto size = fmf.representation().size();
        if (is_partition_start) {
            ++_num_partitions_sent;
        }
        _num_bytes_sent += size;
        llog.trace("send_meta_data: send mf to node={}, size={}", _node, size);
        co_return co_await _sink(fmf, streaming::stream_mutation_fragments_cmd::mutation_fragment_data);
    }
    future<> finish(bool failed) {
        std::exception_ptr eptr;
        try {
            if (failed) {
                co_await _sink(frozen_mutation_fragment(bytes_ostream()), streaming::stream_mutation_fragments_cmd::error);
            } else {
                co_await _sink(frozen_mutation_fragment(bytes_ostream()), streaming::stream_mutation_fragments_cmd::end_of_stream);
            }
        } catch (...) {
            eptr = std::current_exception();
            llog.warn("send_meta_data: failed to send {} to node={}, err={}",
                    failed ? "stream_mutation_fragments_cmd::error" : "stream_mutation_fragments_cmd::end_of_stream", _node, eptr);
        }
        try {
            co_await _sink.close();
        } catch (...) {
            eptr = std::current_exception();
            llog.warn("send_meta_data: failed to close sink to node={}, err={}", _node, eptr);
        }
        try {
            co_await std::move(_receive_done);
        } catch (...) {
            eptr = std::current_exception();
            llog.warn("send_meta_data: failed to process source from node={}, err={}", _node, eptr);
        }
        if (eptr) {
            std::rethrow_exception(eptr);
        }
        co_return;
    }
    size_t num_partitions_sent() {
        return _num_partitions_sent;
    }
    size_t num_bytes_sent() {
        return _num_bytes_sent;
    }
};
} // anonymous namespace
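
// load_and_stream: stream the given sstables to the natural replicas of each
// partition (only the first replica when primary_replica_only is set), working
// through the input in batches of up to 16 sstables and unlinking each batch
// locally once it has been streamed successfully.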
future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name,
        ::table_id table_id, std::vector<sstables::shared_sstable> sstables, bool primary_replica_only) {
    const auto full_partition_range = dht::partition_range::make_open_ended_both_sides();
    const auto full_token_range = dht::token_range::make_open_ended_both_sides();
    auto& table = _db.local().find_column_family(table_id);
    auto s = table.schema();
    const auto cf_id = s->id();
    const auto reason = streaming::stream_reason::repair;
    auto erm = _db.local().find_keyspace(ks_name).get_effective_replication_map();

    // By sorting SSTables by their primary key, we allow SSTable runs to be
    // incrementally streamed.
    // Overlapping run fragments can have their content deduplicated, reducing
    // the amount of data we need to put on the wire.
    // Elements are popped off from the back of the vector, therefore we're sorting
    // it in descending order, to start from the smaller tokens.
    std::ranges::sort(sstables, [] (const sstables::shared_sstable& x, const sstables::shared_sstable& y) {
        return x->compare_by_first_key(*y) > 0;
    });

    size_t nr_sst_total = sstables.size();
    size_t nr_sst_current = 0;
    while (!sstables.empty()) {
        auto ops_uuid = streaming::plan_id{utils::make_random_uuid()};
        auto sst_set = make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(s, false));
        size_t batch_sst_nr = 16;
        std::vector<sstring> sst_names;
        std::vector<sstables::shared_sstable> sst_processed;
        size_t estimated_partitions = 0;
        while (batch_sst_nr-- && !sstables.empty()) {
            auto sst = sstables.back();
            estimated_partitions += sst->estimated_keys_for_range(full_token_range);
            sst_names.push_back(sst->get_filename());
            sst_set->insert(sst);
            sst_processed.push_back(sst);
            sstables.pop_back();
        }
        llog.info("load_and_stream: started ops_uuid={}, process [{}-{}] out of {} sstables={}",
                ops_uuid, nr_sst_current, nr_sst_current + sst_processed.size(), nr_sst_total, sst_names);
        auto start_time = std::chrono::steady_clock::now();
        inet_address_vector_replica_set current_targets;
        std::unordered_map<gms::inet_address, send_meta_data> metas;
        size_t num_partitions_processed = 0;
        size_t num_bytes_read = 0;
        nr_sst_current += sst_processed.size();
        auto permit = co_await _db.local().obtain_reader_permit(table, "sstables_loader::load_and_stream()", db::no_timeout);
        auto reader = mutation_fragment_v1_stream(table.make_streaming_reader(s, std::move(permit), full_partition_range, sst_set));
        std::exception_ptr eptr;
        bool failed = false;
        try {
            netw::messaging_service& ms = _messaging;
            while (auto mf = co_await reader()) {
                bool is_partition_start = mf->is_partition_start();
                if (is_partition_start) {
                    ++num_partitions_processed;
                    auto& start = mf->as_partition_start();
                    const auto& current_dk = start.key();
                    current_targets = erm->get_natural_endpoints(current_dk.token());
                    if (primary_replica_only && current_targets.size() > 1) {
                        current_targets.resize(1);
                    }
                    llog.trace("load_and_stream: ops_uuid={}, current_dk={}, current_targets={}", ops_uuid,
                            current_dk.token(), current_targets);
                    for (auto& node : current_targets) {
                        if (!metas.contains(node)) {
                            auto [sink, source] = co_await ms.make_sink_and_source_for_stream_mutation_fragments(reader.schema()->version(),
                                    ops_uuid, cf_id, estimated_partitions, reason, netw::messaging_service::msg_addr(node));
                            llog.debug("load_and_stream: ops_uuid={}, make sink and source for node={}", ops_uuid, node);
                            metas.emplace(node, send_meta_data(node, std::move(sink), std::move(source)));
                            metas.at(node).receive();
                        }
                    }
                }
                frozen_mutation_fragment fmf = freeze(*s, *mf);
                num_bytes_read += fmf.representation().size();
                co_await coroutine::parallel_for_each(current_targets, [&metas, &fmf, is_partition_start] (const gms::inet_address& node) {
                    return metas.at(node).send(fmf, is_partition_start);
                });
            }
        } catch (...) {
            failed = true;
            eptr = std::current_exception();
            llog.warn("load_and_stream: ops_uuid={}, ks={}, table={}, send_phase, err={}",
                    ops_uuid, ks_name, cf_name, eptr);
        }
        co_await reader.close();
        try {
            co_await coroutine::parallel_for_each(metas.begin(), metas.end(), [failed] (std::pair<const gms::inet_address, send_meta_data>& pair) {
                auto& meta = pair.second;
                return meta.finish(failed);
            });
        } catch (...) {
            failed = true;
            eptr = std::current_exception();
            llog.warn("load_and_stream: ops_uuid={}, ks={}, table={}, finish_phase, err={}",
                    ops_uuid, ks_name, cf_name, eptr);
        }
        if (!failed) {
            try {
                co_await coroutine::parallel_for_each(sst_processed, [&] (sstables::shared_sstable& sst) {
                    llog.debug("load_and_stream: ops_uuid={}, ks={}, table={}, remove sst={}",
                            ops_uuid, ks_name, cf_name, sst->component_filenames());
                    return sst->unlink();
                });
            } catch (...) {
                failed = true;
                eptr = std::current_exception();
                llog.warn("load_and_stream: ops_uuid={}, ks={}, table={}, del_sst_phase, err={}",
                        ops_uuid, ks_name, cf_name, eptr);
            }
        }
        auto duration = std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::steady_clock::now() - start_time).count();
        for (auto& [node, meta] : metas) {
            llog.info("load_and_stream: ops_uuid={}, ks={}, table={}, target_node={}, num_partitions_sent={}, num_bytes_sent={}",
                    ops_uuid, ks_name, cf_name, node, meta.num_partitions_sent(), meta.num_bytes_sent());
        }
        auto partition_rate = std::fabs(duration) > FLT_EPSILON ? num_partitions_processed / duration : 0;
        auto bytes_rate = std::fabs(duration) > FLT_EPSILON ? num_bytes_read / duration / 1024 / 1024 : 0;
        auto status = failed ? "failed" : "succeeded";
        llog.info("load_and_stream: finished ops_uuid={}, ks={}, table={}, partitions_processed={} partitions, bytes_processed={} bytes, partitions_per_second={} partitions/s, bytes_per_second={} MiB/s, duration={} s, status={}",
                ops_uuid, ks_name, cf_name, num_partitions_processed, num_bytes_read, partition_rate, bytes_rate, duration, status);
        if (failed) {
            std::rethrow_exception(eptr);
        }
    }
    co_return;
}

// For more details, see the comments on column_family::load_new_sstables.
// All the global operations happen here; only the reloading happens in there.
future<> sstables_loader::load_new_sstables(sstring ks_name, sstring cf_name,
        bool load_and_stream, bool primary_replica_only) {
    if (_loading_new_sstables) {
        throw std::runtime_error("Already loading SSTables. Try again later");
    } else {
        _loading_new_sstables = true;
    }
    llog.info("Loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}",
            ks_name, cf_name, load_and_stream, primary_replica_only);
    try {
        if (load_and_stream) {
            ::table_id table_id;
            std::vector<std::vector<sstables::shared_sstable>> sstables_on_shards;
            std::tie(table_id, sstables_on_shards) = co_await replica::distributed_loader::get_sstables_from_upload_dir(_db, ks_name, cf_name);
            co_await container().invoke_on_all([&sstables_on_shards, ks_name, cf_name, table_id, primary_replica_only] (sstables_loader& loader) mutable -> future<> {
                co_await loader.load_and_stream(ks_name, cf_name, table_id, std::move(sstables_on_shards[this_shard_id()]), primary_replica_only);
            });
        } else {
            co_await replica::distributed_loader::process_upload_dir(_db, _sys_dist_ks, _view_update_generator, ks_name, cf_name);
        }
    } catch (...) {
        llog.warn("Done loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}, status=failed: {}",
                ks_name, cf_name, load_and_stream, primary_replica_only, std::current_exception());
        _loading_new_sstables = false;
        throw;
    }
    llog.info("Done loading new SSTables for keyspace={}, table={}, load_and_stream={}, primary_replica_only={}, status=succeeded",
            ks_name, cf_name, load_and_stream, primary_replica_only);
    _loading_new_sstables = false;
    co_return;
}