mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
stream_transfer_task, multishard_writer: Work with table sharder
So that we can use it on tablet-based tables.
This commit is contained in:
@@ -43,6 +43,7 @@ public:
|
||||
class multishard_writer {
|
||||
private:
|
||||
schema_ptr _s;
|
||||
const dht::sharder& _sharder;
|
||||
std::vector<foreign_ptr<std::unique_ptr<shard_writer>>> _shard_writers;
|
||||
std::vector<future<>> _pending_consumers;
|
||||
std::vector<std::optional<queue_reader_handle_v2>> _queue_reader_handles;
|
||||
@@ -52,7 +53,7 @@ private:
|
||||
std::function<future<> (flat_mutation_reader_v2)> _consumer;
|
||||
private:
|
||||
unsigned shard_for_mf(const mutation_fragment_v2& mf) {
|
||||
return _s->get_sharder().shard_of(mf.as_partition_start().key().token());
|
||||
return _sharder.shard_of(mf.as_partition_start().key().token());
|
||||
}
|
||||
future<> make_shard_writer(unsigned shard);
|
||||
future<stop_iteration> handle_mutation_fragment(mutation_fragment_v2 mf);
|
||||
@@ -63,6 +64,7 @@ private:
|
||||
public:
|
||||
multishard_writer(
|
||||
schema_ptr s,
|
||||
const dht::sharder& sharder,
|
||||
flat_mutation_reader_v2 producer,
|
||||
std::function<future<> (flat_mutation_reader_v2)> consumer);
|
||||
future<uint64_t> operator()();
|
||||
@@ -96,13 +98,15 @@ future<> shard_writer::close() noexcept {
|
||||
|
||||
multishard_writer::multishard_writer(
|
||||
schema_ptr s,
|
||||
const dht::sharder& sharder,
|
||||
flat_mutation_reader_v2 producer,
|
||||
std::function<future<> (flat_mutation_reader_v2)> consumer)
|
||||
: _s(std::move(s))
|
||||
, _queue_reader_handles(_s->get_sharder().shard_count())
|
||||
, _sharder(sharder)
|
||||
, _queue_reader_handles(_sharder.shard_count())
|
||||
, _producer(std::move(producer))
|
||||
, _consumer(std::move(consumer)) {
|
||||
_shard_writers.resize(_s->get_sharder().shard_count());
|
||||
_shard_writers.resize(_sharder.shard_count());
|
||||
}
|
||||
|
||||
future<> multishard_writer::make_shard_writer(unsigned shard) {
|
||||
@@ -142,7 +146,7 @@ future<stop_iteration> multishard_writer::handle_mutation_fragment(mutation_frag
|
||||
}
|
||||
|
||||
future<stop_iteration> multishard_writer::handle_end_of_stream() {
|
||||
return parallel_for_each(boost::irange(0u, _s->get_sharder().shard_count()), [this] (unsigned shard) {
|
||||
return parallel_for_each(boost::irange(0u, _sharder.shard_count()), [this] (unsigned shard) {
|
||||
if (_queue_reader_handles[shard]) {
|
||||
_queue_reader_handles[shard]->push_end_of_stream();
|
||||
}
|
||||
@@ -197,10 +201,11 @@ future<uint64_t> multishard_writer::operator()() {
|
||||
}
|
||||
|
||||
future<uint64_t> distribute_reader_and_consume_on_shards(schema_ptr s,
|
||||
const dht::sharder& sharder,
|
||||
flat_mutation_reader_v2 producer,
|
||||
std::function<future<> (flat_mutation_reader_v2)> consumer,
|
||||
utils::phased_barrier::operation&& op) {
|
||||
return do_with(multishard_writer(std::move(s), std::move(producer), std::move(consumer)), std::move(op), [] (multishard_writer& writer, utils::phased_barrier::operation&) {
|
||||
return do_with(multishard_writer(std::move(s), sharder, std::move(producer), std::move(consumer)), std::move(op), [] (multishard_writer& writer, utils::phased_barrier::operation&) {
|
||||
return writer().finally([&writer] {
|
||||
return writer.close();
|
||||
});
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
namespace mutation_writer {
|
||||
|
||||
future<uint64_t> distribute_reader_and_consume_on_shards(schema_ptr s,
|
||||
const dht::sharder& sharder,
|
||||
flat_mutation_reader_v2 producer,
|
||||
std::function<future<> (flat_mutation_reader_v2)> consumer,
|
||||
utils::phased_barrier::operation&& op = {});
|
||||
|
||||
@@ -492,7 +492,7 @@ void repair_writer_impl::create_writer(lw_shared_ptr<repair_writer> w) {
|
||||
}
|
||||
replica::table& t = _db.local().find_column_family(_schema->id());
|
||||
rlogger.debug("repair_writer: keyspace={}, table={}, estimated_partitions={}", w->schema()->ks_name(), w->schema()->cf_name(), w->get_estimated_partitions());
|
||||
_writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, std::move(_queue_reader),
|
||||
_writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, _schema->get_sharder(), std::move(_queue_reader),
|
||||
streaming::make_streaming_consumer(sstables::repair_origin, _db, _sys_dist_ks, _view_update_generator, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason)),
|
||||
t.stream_in_progress()).then([w] (uint64_t partitions) {
|
||||
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
|
||||
|
||||
@@ -166,13 +166,15 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
|
||||
try {
|
||||
// Make sure the table with cf_id is still present at this point.
|
||||
// Close the sink in case the table is dropped.
|
||||
auto op = _db.local().find_column_family(cf_id).stream_in_progress();
|
||||
auto& table = _db.local().find_column_family(cf_id);
|
||||
auto erm = table.get_effective_replication_map();
|
||||
auto op = table.stream_in_progress();
|
||||
//FIXME: discarded future.
|
||||
(void)mutation_writer::distribute_reader_and_consume_on_shards(s,
|
||||
(void)mutation_writer::distribute_reader_and_consume_on_shards(s, erm->get_sharder(*s),
|
||||
make_generating_reader_v1(s, permit, std::move(get_next_mutation_fragment)),
|
||||
make_streaming_consumer("streaming", _db, _sys_dist_ks, _view_update_generator, estimated_partitions, reason, is_offstrategy_supported(reason)),
|
||||
std::move(op)
|
||||
).then_wrapped([s, plan_id, from, sink, estimated_partitions] (future<uint64_t> f) mutable {
|
||||
).then_wrapped([s, plan_id, from, sink, estimated_partitions, erm] (future<uint64_t> f) mutable {
|
||||
int32_t status = 0;
|
||||
uint64_t received_partitions = 0;
|
||||
if (f.failed()) {
|
||||
|
||||
@@ -80,7 +80,8 @@ struct send_info {
|
||||
return do_until(std::move(stop_cond), [this, &found_relevant_range, &ranges_it] {
|
||||
dht::token_range range = *ranges_it++;
|
||||
if (!found_relevant_range) {
|
||||
auto sharder = dht::selective_token_range_sharder(cf.schema()->get_sharder(), std::move(range), this_shard_id());
|
||||
auto& table_sharder = cf.get_effective_replication_map()->get_sharder(*cf.schema());
|
||||
auto sharder = dht::selective_token_range_sharder(table_sharder, std::move(range), this_shard_id());
|
||||
auto range_shard = sharder.next();
|
||||
if (range_shard) {
|
||||
found_relevant_range = true;
|
||||
|
||||
@@ -66,7 +66,7 @@ SEASTAR_TEST_CASE(test_multishard_writer) {
|
||||
auto source_reader = partition_nr > 0 ? make_flat_mutation_reader_from_mutations_v2(gen.schema(), make_reader_permit(e), muts) : make_empty_flat_reader_v2(s, make_reader_permit(e));
|
||||
auto close_source_reader = deferred_close(source_reader);
|
||||
auto& sharder = s->get_sharder();
|
||||
size_t partitions_received = distribute_reader_and_consume_on_shards(s,
|
||||
size_t partitions_received = distribute_reader_and_consume_on_shards(s, sharder,
|
||||
std::move(source_reader),
|
||||
[&sharder, &shards_after, error] (flat_mutation_reader_v2 reader) mutable {
|
||||
if (error) {
|
||||
@@ -137,7 +137,7 @@ SEASTAR_TEST_CASE(test_multishard_writer_producer_aborts) {
|
||||
};
|
||||
auto& sharder = s->get_sharder();
|
||||
try {
|
||||
distribute_reader_and_consume_on_shards(s,
|
||||
distribute_reader_and_consume_on_shards(s, sharder,
|
||||
make_generating_reader_v2(s, make_reader_permit(e), std::move(get_next_mutation_fragment)),
|
||||
[&sharder, error] (flat_mutation_reader_v2 reader) mutable {
|
||||
if (error) {
|
||||
|
||||
Reference in New Issue
Block a user