staging: potentially read many SSTables at the same time
There is no reason to read a single SSTable at a time from the staging directory. Moving SSTables out of the staging directory essentially involves scanning input SSTables and creating new SSTables (albeit in a different directory). We already have a mechanism that does that: compactions. In a follow-up patch, I will introduce a new specialization of compaction that moves SSTables out of staging (potentially compacting them if there are plenty). In preparation for that, some signatures have to be changed and the view_updating_consumer has to be made more compaction-friendly, meaning: - operating on an SSTable vector; - taking a table reference, not a database. Because this code is a bit fragile and the reviewer set is fundamentally different from anything compaction-related, I am sending this separately. Signed-off-by: Glauber Costa <glauber@scylladb.com>
This commit is contained in:
11
database.hh
11
database.hh
@@ -707,8 +707,8 @@ public:
|
||||
tracing::trace_state_ptr trace_state = nullptr,
|
||||
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
|
||||
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) const;
|
||||
flat_mutation_reader make_reader_excluding_sstable(schema_ptr schema,
|
||||
sstables::shared_sstable sst,
|
||||
flat_mutation_reader make_reader_excluding_sstables(schema_ptr schema,
|
||||
std::vector<sstables::shared_sstable>& sst,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc = default_priority_class(),
|
||||
@@ -744,7 +744,7 @@ public:
|
||||
}
|
||||
|
||||
mutation_source as_mutation_source() const;
|
||||
mutation_source as_mutation_source_excluding(sstables::shared_sstable sst) const;
|
||||
mutation_source as_mutation_source_excluding(std::vector<sstables::shared_sstable>& sst) const;
|
||||
|
||||
void set_virtual_reader(mutation_source virtual_reader) {
|
||||
_virtual_reader = std::move(virtual_reader);
|
||||
@@ -992,7 +992,10 @@ public:
|
||||
const std::vector<view_ptr>& views() const;
|
||||
future<row_locker::lock_holder> push_view_replica_updates(const schema_ptr& s, const frozen_mutation& fm, db::timeout_clock::time_point timeout) const;
|
||||
future<row_locker::lock_holder> push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout) const;
|
||||
future<row_locker::lock_holder> stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, sstables::shared_sstable excluded_sstable) const;
|
||||
future<row_locker::lock_holder>
|
||||
stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout,
|
||||
std::vector<sstables::shared_sstable>& excluded_sstables) const;
|
||||
|
||||
void add_coordinator_read_latency(utils::estimated_histogram::duration latency);
|
||||
std::chrono::milliseconds get_coordinator_read_latency_percentile(double percentile);
|
||||
|
||||
|
||||
@@ -19,7 +19,9 @@
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
#include "view_update_generator.hh"
|
||||
#include "service/priority_manager.hh"
|
||||
|
||||
static logging::logger vug_logger("view_update_generator");
|
||||
|
||||
@@ -33,28 +35,53 @@ future<> view_update_generator::start() {
|
||||
if (_sstables_with_tables.empty()) {
|
||||
_pending_sstables.wait().get();
|
||||
}
|
||||
while (!_sstables_with_tables.empty()) {
|
||||
auto& [sst, t] = _sstables_with_tables.front();
|
||||
|
||||
// If we got here, we will process all tables we know about so far eventually so there
|
||||
// is no starvation
|
||||
for (auto& t : _sstables_with_tables | boost::adaptors::map_keys) {
|
||||
schema_ptr s = t->schema();
|
||||
|
||||
// Copy what we have so far so we don't miss new updates
|
||||
auto sstables = std::exchange(_sstables_with_tables[t], {});
|
||||
|
||||
try {
|
||||
schema_ptr s = t->schema();
|
||||
flat_mutation_reader staging_sstable_reader = sst->read_rows_flat(s, no_reader_permit());
|
||||
auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(s, _db, sst, _as), db::no_timeout);
|
||||
// temporary: need an sstable set for the flat mutation reader, but the
|
||||
// compaction_descriptor takes a vector. Soon this will become a compaction
|
||||
// so the transformation to the SSTable set will not be needed.
|
||||
auto ssts = make_lw_shared(t->get_compaction_strategy().make_sstable_set(s));
|
||||
for (auto& sst : sstables) {
|
||||
ssts->insert(sst);
|
||||
}
|
||||
|
||||
flat_mutation_reader staging_sstable_reader = ::make_range_sstable_reader(s,
|
||||
no_reader_permit(),
|
||||
std::move(ssts),
|
||||
query::full_partition_range,
|
||||
s->full_slice(),
|
||||
service::get_local_streaming_read_priority(),
|
||||
nullptr,
|
||||
::streamed_mutation::forwarding::no,
|
||||
::mutation_reader::forwarding::no);
|
||||
|
||||
auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(s, *t, sstables, _as), db::no_timeout);
|
||||
if (result == stop_iteration::yes) {
|
||||
break;
|
||||
}
|
||||
} catch (...) {
|
||||
vug_logger.warn("Processing {} failed: {}. Will retry...", sst->get_filename(), std::current_exception());
|
||||
vug_logger.warn("Processing {} failed for table {}:{}. Will retry...", s->ks_name(), s->cf_name(), std::current_exception());
|
||||
// Need to add sstables back to the set so we can retry later. By now it may
|
||||
// have had other updates.
|
||||
std::move(sstables.begin(), sstables.end(), std::back_inserter(_sstables_with_tables[t]));
|
||||
break;
|
||||
}
|
||||
try {
|
||||
// collect all staging sstables to move in a map, grouped by table.
|
||||
_sstables_to_move[t].push_back(sst);
|
||||
std::move(sstables.begin(), sstables.end(), std::back_inserter(_sstables_to_move[t]));
|
||||
} catch (...) {
|
||||
// Move from staging will be retried upon restart.
|
||||
vug_logger.warn("Moving {} from staging failed: {}. Ignoring...", sst->get_filename(), std::current_exception());
|
||||
vug_logger.warn("Moving {} from staging failed: {}:{}. Ignoring...", s->ks_name(), s->cf_name(), std::current_exception());
|
||||
}
|
||||
_registration_sem.signal();
|
||||
_sstables_with_tables.pop_front();
|
||||
}
|
||||
// For each table, move the processed staging sstables into the table's base dir.
|
||||
for (auto it = _sstables_to_move.begin(); it != _sstables_to_move.end(); ) {
|
||||
@@ -88,7 +115,8 @@ future<> view_update_generator::register_staging_sstable(sstables::shared_sstabl
|
||||
if (_as.abort_requested()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
_sstables_with_tables.emplace_back(std::move(sst), std::move(table));
|
||||
_sstables_with_tables[table].push_back(std::move(sst));
|
||||
|
||||
_pending_sstables.signal();
|
||||
if (should_throttle()) {
|
||||
return _registration_sem.wait(1);
|
||||
|
||||
@@ -43,7 +43,7 @@ class view_update_generator {
|
||||
lw_shared_ptr<table> t;
|
||||
sstable_with_table(sstables::shared_sstable sst, lw_shared_ptr<table> t) : sst(std::move(sst)), t(std::move(t)) { }
|
||||
};
|
||||
std::deque<sstable_with_table> _sstables_with_tables;
|
||||
std::unordered_map<lw_shared_ptr<table>, std::vector<sstables::shared_sstable>> _sstables_with_tables;
|
||||
std::unordered_map<lw_shared_ptr<table>, std::vector<sstables::shared_sstable>> _sstables_to_move;
|
||||
public:
|
||||
view_update_generator(database& db) : _db(db) { }
|
||||
|
||||
@@ -36,14 +36,14 @@ namespace db::view {
|
||||
class view_updating_consumer {
|
||||
schema_ptr _schema;
|
||||
lw_shared_ptr<table> _table;
|
||||
sstables::shared_sstable _excluded_sstable;
|
||||
std::vector<sstables::shared_sstable> _excluded_sstables;
|
||||
const seastar::abort_source& _as;
|
||||
std::optional<mutation> _m;
|
||||
public:
|
||||
view_updating_consumer(schema_ptr schema, database& db, sstables::shared_sstable excluded_sstable, const seastar::abort_source& as)
|
||||
view_updating_consumer(schema_ptr schema, table& table, std::vector<sstables::shared_sstable> excluded_sstables, const seastar::abort_source& as)
|
||||
: _schema(std::move(schema))
|
||||
, _table(db.find_column_family(_schema->id()).shared_from_this())
|
||||
, _excluded_sstable(excluded_sstable)
|
||||
, _table(table.shared_from_this())
|
||||
, _excluded_sstables(std::move(excluded_sstables))
|
||||
, _as(as)
|
||||
, _m()
|
||||
{ }
|
||||
|
||||
22
table.cc
22
table.cc
@@ -2473,8 +2473,8 @@ table::disable_auto_compaction() {
|
||||
}
|
||||
|
||||
flat_mutation_reader
|
||||
table::make_reader_excluding_sstable(schema_ptr s,
|
||||
sstables::shared_sstable sst,
|
||||
table::make_reader_excluding_sstables(schema_ptr s,
|
||||
std::vector<sstables::shared_sstable>& excluded,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
@@ -2489,7 +2489,9 @@ table::make_reader_excluding_sstable(schema_ptr s,
|
||||
}
|
||||
|
||||
auto effective_sstables = ::make_lw_shared<sstables::sstable_set>(*_sstables);
|
||||
effective_sstables->erase(sst);
|
||||
for (auto& sst : excluded) {
|
||||
effective_sstables->erase(sst);
|
||||
}
|
||||
|
||||
readers.emplace_back(make_sstable_reader(s, std::move(effective_sstables), range, slice, pc, std::move(trace_state), fwd, fwd_mr));
|
||||
return make_combined_reader(s, std::move(readers), fwd, fwd_mr);
|
||||
@@ -2591,13 +2593,15 @@ future<row_locker::lock_holder> table::push_view_replica_updates(const schema_pt
|
||||
return do_push_view_replica_updates(s, std::move(m), timeout, as_mutation_source(), service::get_local_sstable_query_read_priority());
|
||||
}
|
||||
|
||||
future<row_locker::lock_holder> table::stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, sstables::shared_sstable excluded_sstable) const {
|
||||
return do_push_view_replica_updates(s, std::move(m), timeout, as_mutation_source_excluding(std::move(excluded_sstable)), service::get_local_streaming_write_priority());
|
||||
future<row_locker::lock_holder>
|
||||
table::stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout,
|
||||
std::vector<sstables::shared_sstable>& excluded_sstables) const {
|
||||
return do_push_view_replica_updates(s, std::move(m), timeout, as_mutation_source_excluding(excluded_sstables), service::get_local_streaming_write_priority());
|
||||
}
|
||||
|
||||
mutation_source
|
||||
table::as_mutation_source_excluding(sstables::shared_sstable sst) const {
|
||||
return mutation_source([this, sst = std::move(sst)] (schema_ptr s,
|
||||
table::as_mutation_source_excluding(std::vector<sstables::shared_sstable>& ssts) const {
|
||||
return mutation_source([this, &ssts] (schema_ptr s,
|
||||
reader_permit,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
@@ -2605,7 +2609,7 @@ table::as_mutation_source_excluding(sstables::shared_sstable sst) const {
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return this->make_reader_excluding_sstable(std::move(s), std::move(sst), range, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
return this->make_reader_excluding_sstables(std::move(s), ssts, range, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2614,7 +2618,7 @@ stop_iteration db::view::view_updating_consumer::consume_end_of_partition() {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
try {
|
||||
auto lock_holder = _table->stream_view_replica_updates(_schema, std::move(*_m), db::no_timeout, _excluded_sstable).get();
|
||||
auto lock_holder = _table->stream_view_replica_updates(_schema, std::move(*_m), db::no_timeout, _excluded_sstables).get();
|
||||
} catch (...) {
|
||||
tlogger.warn("Failed to push replica updates for table {}.{}: {}", _schema->ks_name(), _schema->cf_name(), std::current_exception());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user