db: extract sstable reader creation from incremental_reader_selector

step closer to divorcing incremental_selector from sstables

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2017-12-05 15:08:46 -02:00
parent ab82bacddd
commit 3d725d6823
2 changed files with 18 additions and 8 deletions

View File

@@ -387,14 +387,11 @@ class incremental_reader_selector : public reader_selector {
mutation_reader::forwarding _fwd_mr;
sstables::sstable_set::incremental_selector _selector;
std::unordered_set<sstables::shared_sstable> _read_sstables;
sstable_reader_factory_type _fn;
mutation_reader create_reader(sstables::shared_sstable sst) {
tracing::trace(_trace_state, "Reading partition range {} from sstable {}", *_pr, seastar::value_of([&sst] { return sst->get_filename(); }));
mutation_reader reader = mutation_reader_from_flat_mutation_reader(sst->read_range_rows_flat(_s, *_pr, _slice, _pc, _resource_tracker, _fwd, _fwd_mr));
if (sst->is_shared()) {
reader = make_filtering_reader(std::move(reader), belongs_to_current_shard);
}
return std::move(reader);
return _fn(sst, *_pr);
}
public:
@@ -406,7 +403,8 @@ public:
reader_resource_tracker resource_tracker,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr)
mutation_reader::forwarding fwd_mr,
sstable_reader_factory_type fn)
: _s(s)
, _pr(&pr)
, _sstables(std::move(sstables))
@@ -416,7 +414,8 @@ public:
, _trace_state(std::move(trace_state))
, _fwd(fwd)
, _fwd_mr(fwd_mr)
, _selector(_sstables->make_incremental_selector()) {
, _selector(_sstables->make_incremental_selector())
, _fn(std::move(fn)) {
_selector_position = _pr->start() ? _pr->start()->value().token() : dht::minimum_token();
dblog.trace("incremental_reader_selector {}: created for range: {} with {} sstables",
@@ -4265,6 +4264,14 @@ mutation_reader make_range_sstable_reader(schema_ptr s,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr)
{
auto reader_factory_fn = [s, &slice, &pc, resource_tracker, fwd, fwd_mr] (sstables::shared_sstable& sst, const dht::partition_range& pr) {
mutation_reader reader = mutation_reader_from_flat_mutation_reader(sst->read_range_rows_flat(s, pr, slice, pc, resource_tracker, fwd, fwd_mr));
if (sst->is_shared()) {
using sig = bool (&)(const streamed_mutation&);
reader = make_filtering_reader(std::move(reader), sig(belongs_to_current_shard));
}
return std::move(reader);
};
return mutation_reader_from_flat_mutation_reader(
make_flat_mutation_reader<combined_mutation_reader>(s, std::make_unique<incremental_reader_selector>(s,
std::move(sstables),
@@ -4274,7 +4281,8 @@ mutation_reader make_range_sstable_reader(schema_ptr s,
std::move(resource_tracker),
std::move(trace_state),
fwd,
fwd_mr),
fwd_mr,
std::move(reader_factory_fn)),
fwd,
fwd_mr));
}

View File

@@ -841,6 +841,8 @@ public:
friend class distributed_loader;
};
using sstable_reader_factory_type = std::function<mutation_reader(sstables::shared_sstable&, const dht::partition_range& pr)>;
mutation_reader make_range_sstable_reader(schema_ptr s,
lw_shared_ptr<sstables::sstable_set> sstables,
const dht::partition_range& pr,