staging: potentially read many SSTables at the same time

There is no reason to read a single SSTable at a time from the staging
directory. Moving SSTables from staging directory essentially involves
scanning input SSTables and creating new SSTables (albeit in a different
directory).

We have a mechanism that does that: compactions. In a follow-up patch, I
will introduce a new specialization of compaction that moves SSTables
from staging (potentially compacting them if there are plenty).

In preparation for that, some signatures have to be changed and the
view_updating_consumer has to be more compaction-friendly. Meaning:
- operating with an SSTable vector
- taking a table reference, not a database

Because this code is a bit fragile and the reviewer set is fundamentally
different from anything compaction-related, I am sending this separately.
Signed-off-by: Glauber Costa <glauber@scylladb.com>
This commit is contained in:
Glauber Costa
2020-04-10 15:27:53 -04:00
parent 94d6b75a27
commit 4e6400293e
5 changed files with 63 additions and 28 deletions

View File

@@ -707,8 +707,8 @@ public:
tracing::trace_state_ptr trace_state = nullptr,
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) const;
flat_mutation_reader make_reader_excluding_sstable(schema_ptr schema,
sstables::shared_sstable sst,
flat_mutation_reader make_reader_excluding_sstables(schema_ptr schema,
std::vector<sstables::shared_sstable>& sst,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc = default_priority_class(),
@@ -744,7 +744,7 @@ public:
}
mutation_source as_mutation_source() const;
mutation_source as_mutation_source_excluding(sstables::shared_sstable sst) const;
mutation_source as_mutation_source_excluding(std::vector<sstables::shared_sstable>& sst) const;
void set_virtual_reader(mutation_source virtual_reader) {
_virtual_reader = std::move(virtual_reader);
@@ -992,7 +992,10 @@ public:
const std::vector<view_ptr>& views() const;
future<row_locker::lock_holder> push_view_replica_updates(const schema_ptr& s, const frozen_mutation& fm, db::timeout_clock::time_point timeout) const;
future<row_locker::lock_holder> push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout) const;
future<row_locker::lock_holder> stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, sstables::shared_sstable excluded_sstable) const;
future<row_locker::lock_holder>
stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout,
std::vector<sstables::shared_sstable>& excluded_sstables) const;
void add_coordinator_read_latency(utils::estimated_histogram::duration latency);
std::chrono::milliseconds get_coordinator_read_latency_percentile(double percentile);

View File

@@ -19,7 +19,9 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <boost/range/adaptor/map.hpp>
#include "view_update_generator.hh"
#include "service/priority_manager.hh"
static logging::logger vug_logger("view_update_generator");
@@ -33,28 +35,53 @@ future<> view_update_generator::start() {
if (_sstables_with_tables.empty()) {
_pending_sstables.wait().get();
}
while (!_sstables_with_tables.empty()) {
auto& [sst, t] = _sstables_with_tables.front();
// If we got here, we will process all tables we know about so far eventually so there
// is no starvation
for (auto& t : _sstables_with_tables | boost::adaptors::map_keys) {
schema_ptr s = t->schema();
// Copy what we have so far so we don't miss new updates
auto sstables = std::exchange(_sstables_with_tables[t], {});
try {
schema_ptr s = t->schema();
flat_mutation_reader staging_sstable_reader = sst->read_rows_flat(s, no_reader_permit());
auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(s, _db, sst, _as), db::no_timeout);
// temporary: need an sstable set for the flat mutation reader, but the
// compaction_descriptor takes a vector. Soon this will become a compaction
// so the transformation to the SSTable set will not be needed.
auto ssts = make_lw_shared(t->get_compaction_strategy().make_sstable_set(s));
for (auto& sst : sstables) {
ssts->insert(sst);
}
flat_mutation_reader staging_sstable_reader = ::make_range_sstable_reader(s,
no_reader_permit(),
std::move(ssts),
query::full_partition_range,
s->full_slice(),
service::get_local_streaming_read_priority(),
nullptr,
::streamed_mutation::forwarding::no,
::mutation_reader::forwarding::no);
auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(s, *t, sstables, _as), db::no_timeout);
if (result == stop_iteration::yes) {
break;
}
} catch (...) {
vug_logger.warn("Processing {} failed: {}. Will retry...", sst->get_filename(), std::current_exception());
vug_logger.warn("Processing {} failed for table {}:{}. Will retry...", s->ks_name(), s->cf_name(), std::current_exception());
// Need to add sstables back to the set so we can retry later. By now it may
// have had other updates.
std::move(sstables.begin(), sstables.end(), std::back_inserter(_sstables_with_tables[t]));
break;
}
try {
// collect all staging sstables to move in a map, grouped by table.
_sstables_to_move[t].push_back(sst);
std::move(sstables.begin(), sstables.end(), std::back_inserter(_sstables_to_move[t]));
} catch (...) {
// Move from staging will be retried upon restart.
vug_logger.warn("Moving {} from staging failed: {}. Ignoring...", sst->get_filename(), std::current_exception());
vug_logger.warn("Moving {} from staging failed: {}:{}. Ignoring...", s->ks_name(), s->cf_name(), std::current_exception());
}
_registration_sem.signal();
_sstables_with_tables.pop_front();
}
// For each table, move the processed staging sstables into the table's base dir.
for (auto it = _sstables_to_move.begin(); it != _sstables_to_move.end(); ) {
@@ -88,7 +115,8 @@ future<> view_update_generator::register_staging_sstable(sstables::shared_sstabl
if (_as.abort_requested()) {
return make_ready_future<>();
}
_sstables_with_tables.emplace_back(std::move(sst), std::move(table));
_sstables_with_tables[table].push_back(std::move(sst));
_pending_sstables.signal();
if (should_throttle()) {
return _registration_sem.wait(1);

View File

@@ -43,7 +43,7 @@ class view_update_generator {
lw_shared_ptr<table> t;
sstable_with_table(sstables::shared_sstable sst, lw_shared_ptr<table> t) : sst(std::move(sst)), t(std::move(t)) { }
};
std::deque<sstable_with_table> _sstables_with_tables;
std::unordered_map<lw_shared_ptr<table>, std::vector<sstables::shared_sstable>> _sstables_with_tables;
std::unordered_map<lw_shared_ptr<table>, std::vector<sstables::shared_sstable>> _sstables_to_move;
public:
view_update_generator(database& db) : _db(db) { }

View File

@@ -36,14 +36,14 @@ namespace db::view {
class view_updating_consumer {
schema_ptr _schema;
lw_shared_ptr<table> _table;
sstables::shared_sstable _excluded_sstable;
std::vector<sstables::shared_sstable> _excluded_sstables;
const seastar::abort_source& _as;
std::optional<mutation> _m;
public:
view_updating_consumer(schema_ptr schema, database& db, sstables::shared_sstable excluded_sstable, const seastar::abort_source& as)
view_updating_consumer(schema_ptr schema, table& table, std::vector<sstables::shared_sstable> excluded_sstables, const seastar::abort_source& as)
: _schema(std::move(schema))
, _table(db.find_column_family(_schema->id()).shared_from_this())
, _excluded_sstable(excluded_sstable)
, _table(table.shared_from_this())
, _excluded_sstables(std::move(excluded_sstables))
, _as(as)
, _m()
{ }

View File

@@ -2473,8 +2473,8 @@ table::disable_auto_compaction() {
}
flat_mutation_reader
table::make_reader_excluding_sstable(schema_ptr s,
sstables::shared_sstable sst,
table::make_reader_excluding_sstables(schema_ptr s,
std::vector<sstables::shared_sstable>& excluded,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
@@ -2489,7 +2489,9 @@ table::make_reader_excluding_sstable(schema_ptr s,
}
auto effective_sstables = ::make_lw_shared<sstables::sstable_set>(*_sstables);
effective_sstables->erase(sst);
for (auto& sst : excluded) {
effective_sstables->erase(sst);
}
readers.emplace_back(make_sstable_reader(s, std::move(effective_sstables), range, slice, pc, std::move(trace_state), fwd, fwd_mr));
return make_combined_reader(s, std::move(readers), fwd, fwd_mr);
@@ -2591,13 +2593,15 @@ future<row_locker::lock_holder> table::push_view_replica_updates(const schema_pt
return do_push_view_replica_updates(s, std::move(m), timeout, as_mutation_source(), service::get_local_sstable_query_read_priority());
}
future<row_locker::lock_holder> table::stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, sstables::shared_sstable excluded_sstable) const {
return do_push_view_replica_updates(s, std::move(m), timeout, as_mutation_source_excluding(std::move(excluded_sstable)), service::get_local_streaming_write_priority());
future<row_locker::lock_holder>
table::stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout,
std::vector<sstables::shared_sstable>& excluded_sstables) const {
return do_push_view_replica_updates(s, std::move(m), timeout, as_mutation_source_excluding(excluded_sstables), service::get_local_streaming_write_priority());
}
mutation_source
table::as_mutation_source_excluding(sstables::shared_sstable sst) const {
return mutation_source([this, sst = std::move(sst)] (schema_ptr s,
table::as_mutation_source_excluding(std::vector<sstables::shared_sstable>& ssts) const {
return mutation_source([this, &ssts] (schema_ptr s,
reader_permit,
const dht::partition_range& range,
const query::partition_slice& slice,
@@ -2605,7 +2609,7 @@ table::as_mutation_source_excluding(sstables::shared_sstable sst) const {
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) {
return this->make_reader_excluding_sstable(std::move(s), std::move(sst), range, slice, pc, std::move(trace_state), fwd, fwd_mr);
return this->make_reader_excluding_sstables(std::move(s), ssts, range, slice, pc, std::move(trace_state), fwd, fwd_mr);
});
}
@@ -2614,7 +2618,7 @@ stop_iteration db::view::view_updating_consumer::consume_end_of_partition() {
return stop_iteration::yes;
}
try {
auto lock_holder = _table->stream_view_replica_updates(_schema, std::move(*_m), db::no_timeout, _excluded_sstable).get();
auto lock_holder = _table->stream_view_replica_updates(_schema, std::move(*_m), db::no_timeout, _excluded_sstables).get();
} catch (...) {
tlogger.warn("Failed to push replica updates for table {}.{}: {}", _schema->ks_name(), _schema->cf_name(), std::current_exception());
}