db: extract sstable reader creation from incremental_reader_selector
step closer to divorcing incremental_selector from sstables Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
24
database.cc
24
database.cc
@@ -387,14 +387,11 @@ class incremental_reader_selector : public reader_selector {
|
||||
mutation_reader::forwarding _fwd_mr;
|
||||
sstables::sstable_set::incremental_selector _selector;
|
||||
std::unordered_set<sstables::shared_sstable> _read_sstables;
|
||||
sstable_reader_factory_type _fn;
|
||||
|
||||
mutation_reader create_reader(sstables::shared_sstable sst) {
|
||||
tracing::trace(_trace_state, "Reading partition range {} from sstable {}", *_pr, seastar::value_of([&sst] { return sst->get_filename(); }));
|
||||
mutation_reader reader = mutation_reader_from_flat_mutation_reader(sst->read_range_rows_flat(_s, *_pr, _slice, _pc, _resource_tracker, _fwd, _fwd_mr));
|
||||
if (sst->is_shared()) {
|
||||
reader = make_filtering_reader(std::move(reader), belongs_to_current_shard);
|
||||
}
|
||||
return std::move(reader);
|
||||
return _fn(sst, *_pr);
|
||||
}
|
||||
|
||||
public:
|
||||
@@ -406,7 +403,8 @@ public:
|
||||
reader_resource_tracker resource_tracker,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
mutation_reader::forwarding fwd_mr,
|
||||
sstable_reader_factory_type fn)
|
||||
: _s(s)
|
||||
, _pr(&pr)
|
||||
, _sstables(std::move(sstables))
|
||||
@@ -416,7 +414,8 @@ public:
|
||||
, _trace_state(std::move(trace_state))
|
||||
, _fwd(fwd)
|
||||
, _fwd_mr(fwd_mr)
|
||||
, _selector(_sstables->make_incremental_selector()) {
|
||||
, _selector(_sstables->make_incremental_selector())
|
||||
, _fn(std::move(fn)) {
|
||||
_selector_position = _pr->start() ? _pr->start()->value().token() : dht::minimum_token();
|
||||
|
||||
dblog.trace("incremental_reader_selector {}: created for range: {} with {} sstables",
|
||||
@@ -4265,6 +4264,14 @@ mutation_reader make_range_sstable_reader(schema_ptr s,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
{
|
||||
auto reader_factory_fn = [s, &slice, &pc, resource_tracker, fwd, fwd_mr] (sstables::shared_sstable& sst, const dht::partition_range& pr) {
|
||||
mutation_reader reader = mutation_reader_from_flat_mutation_reader(sst->read_range_rows_flat(s, pr, slice, pc, resource_tracker, fwd, fwd_mr));
|
||||
if (sst->is_shared()) {
|
||||
using sig = bool (&)(const streamed_mutation&);
|
||||
reader = make_filtering_reader(std::move(reader), sig(belongs_to_current_shard));
|
||||
}
|
||||
return std::move(reader);
|
||||
};
|
||||
return mutation_reader_from_flat_mutation_reader(
|
||||
make_flat_mutation_reader<combined_mutation_reader>(s, std::make_unique<incremental_reader_selector>(s,
|
||||
std::move(sstables),
|
||||
@@ -4274,7 +4281,8 @@ mutation_reader make_range_sstable_reader(schema_ptr s,
|
||||
std::move(resource_tracker),
|
||||
std::move(trace_state),
|
||||
fwd,
|
||||
fwd_mr),
|
||||
fwd_mr,
|
||||
std::move(reader_factory_fn)),
|
||||
fwd,
|
||||
fwd_mr));
|
||||
}
|
||||
|
||||
@@ -841,6 +841,8 @@ public:
|
||||
friend class distributed_loader;
|
||||
};
|
||||
|
||||
using sstable_reader_factory_type = std::function<mutation_reader(sstables::shared_sstable&, const dht::partition_range& pr)>;
|
||||
|
||||
mutation_reader make_range_sstable_reader(schema_ptr s,
|
||||
lw_shared_ptr<sstables::sstable_set> sstables,
|
||||
const dht::partition_range& pr,
|
||||
|
||||
Reference in New Issue
Block a user