diff --git a/database.hh b/database.hh index 9c359d909a..43d4538be3 100644 --- a/database.hh +++ b/database.hh @@ -707,8 +707,8 @@ public: tracing::trace_state_ptr trace_state = nullptr, streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no, mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) const; - flat_mutation_reader make_reader_excluding_sstable(schema_ptr schema, - sstables::shared_sstable sst, + flat_mutation_reader make_reader_excluding_sstables(schema_ptr schema, + std::vector& sst, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc = default_priority_class(), @@ -744,7 +744,7 @@ public: } mutation_source as_mutation_source() const; - mutation_source as_mutation_source_excluding(sstables::shared_sstable sst) const; + mutation_source as_mutation_source_excluding(std::vector& sst) const; void set_virtual_reader(mutation_source virtual_reader) { _virtual_reader = std::move(virtual_reader); @@ -992,7 +992,10 @@ public: const std::vector& views() const; future push_view_replica_updates(const schema_ptr& s, const frozen_mutation& fm, db::timeout_clock::time_point timeout) const; future push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout) const; - future stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, sstables::shared_sstable excluded_sstable) const; + future + stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, + std::vector& excluded_sstables) const; + void add_coordinator_read_latency(utils::estimated_histogram::duration latency); std::chrono::milliseconds get_coordinator_read_latency_percentile(double percentile); diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index 26120e3956..6faf95976a 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -19,7 +19,9 @@ * along with Scylla. If not, see . */ +#include #include "view_update_generator.hh" +#include "service/priority_manager.hh" static logging::logger vug_logger("view_update_generator"); @@ -33,28 +35,53 @@ future<> view_update_generator::start() { if (_sstables_with_tables.empty()) { _pending_sstables.wait().get(); } - while (!_sstables_with_tables.empty()) { - auto& [sst, t] = _sstables_with_tables.front(); + + // If we got here, we will process all tables we know about so far eventually so there + // is no starvation + for (auto& t : _sstables_with_tables | boost::adaptors::map_keys) { + schema_ptr s = t->schema(); + + // Copy what we have so far so we don't miss new updates + auto sstables = std::exchange(_sstables_with_tables[t], {}); + try { - schema_ptr s = t->schema(); - flat_mutation_reader staging_sstable_reader = sst->read_rows_flat(s, no_reader_permit()); - auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(s, _db, sst, _as), db::no_timeout); + // temporary: need an sstable set for the flat mutation reader, but the + // compaction_descriptor takes a vector. Soon this will become a compaction + // so the transformation to the SSTable set will not be needed. + auto ssts = make_lw_shared(t->get_compaction_strategy().make_sstable_set(s)); + for (auto& sst : sstables) { + ssts->insert(sst); + } + + flat_mutation_reader staging_sstable_reader = ::make_range_sstable_reader(s, + no_reader_permit(), + std::move(ssts), + query::full_partition_range, + s->full_slice(), + service::get_local_streaming_read_priority(), + nullptr, + ::streamed_mutation::forwarding::no, + ::mutation_reader::forwarding::no); + + auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(s, *t, sstables, _as), db::no_timeout); if (result == stop_iteration::yes) { break; } } catch (...) { - vug_logger.warn("Processing {} failed: {}. Will retry...", sst->get_filename(), std::current_exception()); + vug_logger.warn("Processing {} failed for table {}:{}. Will retry...", s->ks_name(), s->cf_name(), std::current_exception()); + // Need to add sstables back to the set so we can retry later. By now it may + // have had other updates. + std::move(sstables.begin(), sstables.end(), std::back_inserter(_sstables_with_tables[t])); break; } try { // collect all staging sstables to move in a map, grouped by table. - _sstables_to_move[t].push_back(sst); + std::move(sstables.begin(), sstables.end(), std::back_inserter(_sstables_to_move[t])); } catch (...) { // Move from staging will be retried upon restart. - vug_logger.warn("Moving {} from staging failed: {}. Ignoring...", sst->get_filename(), std::current_exception()); + vug_logger.warn("Moving {} from staging failed: {}:{}. Ignoring...", s->ks_name(), s->cf_name(), std::current_exception()); } _registration_sem.signal(); - _sstables_with_tables.pop_front(); } // For each table, move the processed staging sstables into the table's base dir. for (auto it = _sstables_to_move.begin(); it != _sstables_to_move.end(); ) { @@ -88,7 +115,8 @@ future<> view_update_generator::register_staging_sstable(sstables::shared_sstabl if (_as.abort_requested()) { return make_ready_future<>(); } - _sstables_with_tables.emplace_back(std::move(sst), std::move(table)); + _sstables_with_tables[table].push_back(std::move(sst)); + _pending_sstables.signal(); if (should_throttle()) { return _registration_sem.wait(1); diff --git a/db/view/view_update_generator.hh b/db/view/view_update_generator.hh index d05affcf88..5a85616465 100644 --- a/db/view/view_update_generator.hh +++ b/db/view/view_update_generator.hh @@ -43,7 +43,7 @@ class view_update_generator { lw_shared_ptr t; sstable_with_table(sstables::shared_sstable sst, lw_shared_ptr
t) : sst(std::move(sst)), t(std::move(t)) { } }; - std::deque _sstables_with_tables; + std::unordered_map, std::vector> _sstables_with_tables; std::unordered_map, std::vector> _sstables_to_move; public: view_update_generator(database& db) : _db(db) { } diff --git a/db/view/view_updating_consumer.hh b/db/view/view_updating_consumer.hh index e43e8cd1ab..f5a5876ec4 100644 --- a/db/view/view_updating_consumer.hh +++ b/db/view/view_updating_consumer.hh @@ -36,14 +36,14 @@ namespace db::view { class view_updating_consumer { schema_ptr _schema; lw_shared_ptr
_table; - sstables::shared_sstable _excluded_sstable; + std::vector _excluded_sstables; const seastar::abort_source& _as; std::optional _m; public: - view_updating_consumer(schema_ptr schema, database& db, sstables::shared_sstable excluded_sstable, const seastar::abort_source& as) + view_updating_consumer(schema_ptr schema, table& table, std::vector excluded_sstables, const seastar::abort_source& as) : _schema(std::move(schema)) - , _table(db.find_column_family(_schema->id()).shared_from_this()) - , _excluded_sstable(excluded_sstable) + , _table(table.shared_from_this()) + , _excluded_sstables(std::move(excluded_sstables)) , _as(as) , _m() { } diff --git a/table.cc b/table.cc index 53351f5fcf..1c0fcdd30e 100644 --- a/table.cc +++ b/table.cc @@ -2473,8 +2473,8 @@ table::disable_auto_compaction() { } flat_mutation_reader -table::make_reader_excluding_sstable(schema_ptr s, - sstables::shared_sstable sst, +table::make_reader_excluding_sstables(schema_ptr s, + std::vector& excluded, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, @@ -2489,7 +2489,9 @@ table::make_reader_excluding_sstable(schema_ptr s, } auto effective_sstables = ::make_lw_shared(*_sstables); - effective_sstables->erase(sst); + for (auto& sst : excluded) { + effective_sstables->erase(sst); + } readers.emplace_back(make_sstable_reader(s, std::move(effective_sstables), range, slice, pc, std::move(trace_state), fwd, fwd_mr)); return make_combined_reader(s, std::move(readers), fwd, fwd_mr); @@ -2591,13 +2593,15 @@ future table::push_view_replica_updates(const schema_pt return do_push_view_replica_updates(s, std::move(m), timeout, as_mutation_source(), service::get_local_sstable_query_read_priority()); } -future table::stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, sstables::shared_sstable excluded_sstable) const { - return do_push_view_replica_updates(s, std::move(m), timeout, as_mutation_source_excluding(std::move(excluded_sstable)), service::get_local_streaming_write_priority()); +future +table::stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, + std::vector& excluded_sstables) const { + return do_push_view_replica_updates(s, std::move(m), timeout, as_mutation_source_excluding(excluded_sstables), service::get_local_streaming_write_priority()); } mutation_source -table::as_mutation_source_excluding(sstables::shared_sstable sst) const { - return mutation_source([this, sst = std::move(sst)] (schema_ptr s, +table::as_mutation_source_excluding(std::vector& ssts) const { + return mutation_source([this, &ssts] (schema_ptr s, reader_permit, const dht::partition_range& range, const query::partition_slice& slice, @@ -2605,7 +2609,7 @@ table::as_mutation_source_excluding(sstables::shared_sstable sst) const { tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - return this->make_reader_excluding_sstable(std::move(s), std::move(sst), range, slice, pc, std::move(trace_state), fwd, fwd_mr); + return this->make_reader_excluding_sstables(std::move(s), ssts, range, slice, pc, std::move(trace_state), fwd, fwd_mr); }); } @@ -2614,7 +2618,7 @@ stop_iteration db::view::view_updating_consumer::consume_end_of_partition() { return stop_iteration::yes; } try { - auto lock_holder = _table->stream_view_replica_updates(_schema, std::move(*_m), db::no_timeout, _excluded_sstable).get(); + auto lock_holder = _table->stream_view_replica_updates(_schema, std::move(*_m), db::no_timeout, _excluded_sstables).get(); } catch (...) { tlogger.warn("Failed to push replica updates for table {}.{}: {}", _schema->ks_name(), _schema->cf_name(), std::current_exception()); }