From 3d725d68238e190ddf65dd986f09d52d48a85650 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 5 Dec 2017 15:08:46 -0200 Subject: [PATCH] db: extract sstable reader creation from incremental_reader_selector step closer to divorcing incremental_selector from sstables Signed-off-by: Raphael S. Carvalho --- database.cc | 24 ++++++++++++++++-------- database.hh | 2 ++ 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/database.cc b/database.cc index b2856a3d60..2e9540ba94 100644 --- a/database.cc +++ b/database.cc @@ -387,14 +387,11 @@ class incremental_reader_selector : public reader_selector { mutation_reader::forwarding _fwd_mr; sstables::sstable_set::incremental_selector _selector; std::unordered_set _read_sstables; + sstable_reader_factory_type _fn; mutation_reader create_reader(sstables::shared_sstable sst) { tracing::trace(_trace_state, "Reading partition range {} from sstable {}", *_pr, seastar::value_of([&sst] { return sst->get_filename(); })); - mutation_reader reader = mutation_reader_from_flat_mutation_reader(sst->read_range_rows_flat(_s, *_pr, _slice, _pc, _resource_tracker, _fwd, _fwd_mr)); - if (sst->is_shared()) { - reader = make_filtering_reader(std::move(reader), belongs_to_current_shard); - } - return std::move(reader); + return _fn(sst, *_pr); } public: @@ -406,7 +403,8 @@ public: reader_resource_tracker resource_tracker, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) + mutation_reader::forwarding fwd_mr, + sstable_reader_factory_type fn) : _s(s) , _pr(&pr) , _sstables(std::move(sstables)) @@ -416,7 +414,8 @@ public: , _trace_state(std::move(trace_state)) , _fwd(fwd) , _fwd_mr(fwd_mr) - , _selector(_sstables->make_incremental_selector()) { + , _selector(_sstables->make_incremental_selector()) + , _fn(std::move(fn)) { _selector_position = _pr->start() ? _pr->start()->value().token() : dht::minimum_token(); dblog.trace("incremental_reader_selector {}: created for range: {} with {} sstables", @@ -4265,6 +4264,14 @@ mutation_reader make_range_sstable_reader(schema_ptr s, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { + auto reader_factory_fn = [s, &slice, &pc, resource_tracker, fwd, fwd_mr] (sstables::shared_sstable& sst, const dht::partition_range& pr) { + mutation_reader reader = mutation_reader_from_flat_mutation_reader(sst->read_range_rows_flat(s, pr, slice, pc, resource_tracker, fwd, fwd_mr)); + if (sst->is_shared()) { + using sig = bool (&)(const streamed_mutation&); + reader = make_filtering_reader(std::move(reader), sig(belongs_to_current_shard)); + } + return std::move(reader); + }; return mutation_reader_from_flat_mutation_reader( make_flat_mutation_reader(s, std::make_unique(s, std::move(sstables), @@ -4274,7 +4281,8 @@ mutation_reader make_range_sstable_reader(schema_ptr s, std::move(resource_tracker), std::move(trace_state), fwd, - fwd_mr), + fwd_mr, + std::move(reader_factory_fn)), fwd, fwd_mr)); } diff --git a/database.hh b/database.hh index 61d4ee70cb..4ded53e45b 100644 --- a/database.hh +++ b/database.hh @@ -841,6 +841,8 @@ public: friend class distributed_loader; }; +using sstable_reader_factory_type = std::function; + mutation_reader make_range_sstable_reader(schema_ptr s, lw_shared_ptr sstables, const dht::partition_range& pr,