From f88220aeeebf764c8cb14b700456b2c8688cfdd1 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 6 Jul 2023 01:22:10 +0200 Subject: [PATCH] stream_transfer_task, multishard_writer: Work with table sharder So that we can use it on tablet-based tables. --- mutation_writer/multishard_writer.cc | 15 ++++++++++----- mutation_writer/multishard_writer.hh | 1 + repair/row_level.cc | 2 +- streaming/stream_session.cc | 8 +++++--- streaming/stream_transfer_task.cc | 3 ++- test/boost/mutation_writer_test.cc | 4 ++-- 6 files changed, 21 insertions(+), 12 deletions(-) diff --git a/mutation_writer/multishard_writer.cc b/mutation_writer/multishard_writer.cc index e8be1d9a7c..6571cfb401 100644 --- a/mutation_writer/multishard_writer.cc +++ b/mutation_writer/multishard_writer.cc @@ -43,6 +43,7 @@ public: class multishard_writer { private: schema_ptr _s; + const dht::sharder& _sharder; std::vector>> _shard_writers; std::vector> _pending_consumers; std::vector> _queue_reader_handles; @@ -52,7 +53,7 @@ private: std::function (flat_mutation_reader_v2)> _consumer; private: unsigned shard_for_mf(const mutation_fragment_v2& mf) { - return _s->get_sharder().shard_of(mf.as_partition_start().key().token()); + return _sharder.shard_of(mf.as_partition_start().key().token()); } future<> make_shard_writer(unsigned shard); future handle_mutation_fragment(mutation_fragment_v2 mf); @@ -63,6 +64,7 @@ private: public: multishard_writer( schema_ptr s, + const dht::sharder& sharder, flat_mutation_reader_v2 producer, std::function (flat_mutation_reader_v2)> consumer); future operator()(); @@ -96,13 +98,15 @@ future<> shard_writer::close() noexcept { multishard_writer::multishard_writer( schema_ptr s, + const dht::sharder& sharder, flat_mutation_reader_v2 producer, std::function (flat_mutation_reader_v2)> consumer) : _s(std::move(s)) - , _queue_reader_handles(_s->get_sharder().shard_count()) + , _sharder(sharder) + , _queue_reader_handles(_sharder.shard_count()) , _producer(std::move(producer)) , _consumer(std::move(consumer)) { - _shard_writers.resize(_s->get_sharder().shard_count()); + _shard_writers.resize(_sharder.shard_count()); } future<> multishard_writer::make_shard_writer(unsigned shard) { @@ -142,7 +146,7 @@ future multishard_writer::handle_mutation_fragment(mutation_frag } future multishard_writer::handle_end_of_stream() { - return parallel_for_each(boost::irange(0u, _s->get_sharder().shard_count()), [this] (unsigned shard) { + return parallel_for_each(boost::irange(0u, _sharder.shard_count()), [this] (unsigned shard) { if (_queue_reader_handles[shard]) { _queue_reader_handles[shard]->push_end_of_stream(); } @@ -197,10 +201,11 @@ future multishard_writer::operator()() { } future distribute_reader_and_consume_on_shards(schema_ptr s, + const dht::sharder& sharder, flat_mutation_reader_v2 producer, std::function (flat_mutation_reader_v2)> consumer, utils::phased_barrier::operation&& op) { - return do_with(multishard_writer(std::move(s), std::move(producer), std::move(consumer)), std::move(op), [] (multishard_writer& writer, utils::phased_barrier::operation&) { + return do_with(multishard_writer(std::move(s), sharder, std::move(producer), std::move(consumer)), std::move(op), [] (multishard_writer& writer, utils::phased_barrier::operation&) { return writer().finally([&writer] { return writer.close(); }); diff --git a/mutation_writer/multishard_writer.hh b/mutation_writer/multishard_writer.hh index 4843e3a0ff..6d86ddf984 100644 --- a/mutation_writer/multishard_writer.hh +++ b/mutation_writer/multishard_writer.hh @@ -16,6 +16,7 @@ namespace mutation_writer { future distribute_reader_and_consume_on_shards(schema_ptr s, + const dht::sharder& sharder, flat_mutation_reader_v2 producer, std::function (flat_mutation_reader_v2)> consumer, utils::phased_barrier::operation&& op = {}); diff --git a/repair/row_level.cc b/repair/row_level.cc index 20fe2f7f8b..22af848fd5 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -492,7 +492,7 @@ void repair_writer_impl::create_writer(lw_shared_ptr w) { } replica::table& t = _db.local().find_column_family(_schema->id()); rlogger.debug("repair_writer: keyspace={}, table={}, estimated_partitions={}", w->schema()->ks_name(), w->schema()->cf_name(), w->get_estimated_partitions()); - _writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, std::move(_queue_reader), + _writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, _schema->get_sharder(), std::move(_queue_reader), streaming::make_streaming_consumer(sstables::repair_origin, _db, _sys_dist_ks, _view_update_generator, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason)), t.stream_in_progress()).then([w] (uint64_t partitions) { rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable", diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 6fc704ebd0..6e172e0fd0 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -166,13 +166,15 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { try { // Make sure the table with cf_id is still present at this point. // Close the sink in case the table is dropped. - auto op = _db.local().find_column_family(cf_id).stream_in_progress(); + auto& table = _db.local().find_column_family(cf_id); + auto erm = table.get_effective_replication_map(); + auto op = table.stream_in_progress(); //FIXME: discarded future. - (void)mutation_writer::distribute_reader_and_consume_on_shards(s, + (void)mutation_writer::distribute_reader_and_consume_on_shards(s, erm->get_sharder(*s), make_generating_reader_v1(s, permit, std::move(get_next_mutation_fragment)), make_streaming_consumer("streaming", _db, _sys_dist_ks, _view_update_generator, estimated_partitions, reason, is_offstrategy_supported(reason)), std::move(op) - ).then_wrapped([s, plan_id, from, sink, estimated_partitions] (future f) mutable { + ).then_wrapped([s, plan_id, from, sink, estimated_partitions, erm] (future f) mutable { int32_t status = 0; uint64_t received_partitions = 0; if (f.failed()) { diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index a278c75107..a62493205a 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -80,7 +80,8 @@ struct send_info { return do_until(std::move(stop_cond), [this, &found_relevant_range, &ranges_it] { dht::token_range range = *ranges_it++; if (!found_relevant_range) { - auto sharder = dht::selective_token_range_sharder(cf.schema()->get_sharder(), std::move(range), this_shard_id()); + auto& table_sharder = cf.get_effective_replication_map()->get_sharder(*cf.schema()); + auto sharder = dht::selective_token_range_sharder(table_sharder, std::move(range), this_shard_id()); auto range_shard = sharder.next(); if (range_shard) { found_relevant_range = true; diff --git a/test/boost/mutation_writer_test.cc b/test/boost/mutation_writer_test.cc index 6b4514d07c..f262d162ab 100644 --- a/test/boost/mutation_writer_test.cc +++ b/test/boost/mutation_writer_test.cc @@ -66,7 +66,7 @@ SEASTAR_TEST_CASE(test_multishard_writer) { auto source_reader = partition_nr > 0 ? make_flat_mutation_reader_from_mutations_v2(gen.schema(), make_reader_permit(e), muts) : make_empty_flat_reader_v2(s, make_reader_permit(e)); auto close_source_reader = deferred_close(source_reader); auto& sharder = s->get_sharder(); - size_t partitions_received = distribute_reader_and_consume_on_shards(s, + size_t partitions_received = distribute_reader_and_consume_on_shards(s, sharder, std::move(source_reader), [&sharder, &shards_after, error] (flat_mutation_reader_v2 reader) mutable { if (error) { @@ -137,7 +137,7 @@ SEASTAR_TEST_CASE(test_multishard_writer_producer_aborts) { }; auto& sharder = s->get_sharder(); try { - distribute_reader_and_consume_on_shards(s, + distribute_reader_and_consume_on_shards(s, sharder, make_generating_reader_v2(s, make_reader_permit(e), std::move(get_next_mutation_fragment)), [&sharder, error] (flat_mutation_reader_v2 reader) mutable { if (error) {