Merge "table,sstable_set: use v2 readers below the cache" from Bodtrond

"
Convert sstable_set and table::make_sstable_reader() to v2. With this
all readers below cache use the v2 format.

Tests: unit(dev)
"

* 'table-make-sstable-reader-v2/v1' of https://github.com/denesb/scylla:
  table: upgrade make_sstable_reader() to v2
  sstables/sstable_set: create_single_key_sstable_reader() upgrade to v2
  sstables/sstable_set: remove unused and undefined make_reader() member
This commit is contained in:
Avi Kivity
2021-12-20 17:53:44 +02:00
5 changed files with 25 additions and 36 deletions

View File

@@ -604,7 +604,7 @@ private:
// Caller needs to ensure that column_family remains live (FIXME: relax this).
// The 'range' parameter must be live as long as the reader is used.
// Mutations returned by the reader will all have given schema.
flat_mutation_reader make_sstable_reader(schema_ptr schema,
flat_mutation_reader_v2 make_sstable_reader(schema_ptr schema,
reader_permit permit,
lw_shared_ptr<sstables::sstable_set> sstables,
const dht::partition_range& range,

View File

@@ -769,7 +769,7 @@ sstable_set_impl::select_sstable_runs(const std::vector<shared_sstable>& sstable
throw_with_backtrace<std::bad_function_call>();
}
flat_mutation_reader
flat_mutation_reader_v2
sstable_set_impl::create_single_key_sstable_reader(
column_family* cf,
schema_ptr schema,
@@ -786,7 +786,7 @@ sstable_set_impl::create_single_key_sstable_reader(
auto selected_sstables = filter_sstable_for_reader_by_pk(select(pr), *schema, pos);
auto num_sstables = selected_sstables.size();
if (!num_sstables) {
return make_empty_flat_reader(schema, permit);
return make_empty_flat_reader_v2(schema, permit);
}
auto readers = boost::copy_range<std::vector<flat_mutation_reader_v2>>(
filter_sstable_for_reader_by_ck(std::move(selected_sstables), *cf, schema, slice)
@@ -809,10 +809,10 @@ sstable_set_impl::create_single_key_sstable_reader(
readers.push_back(upgrade_to_v2(make_flat_mutation_reader_from_mutations(schema, permit, {mutation(schema, *pos.key())}, slice, fwd)));
}
sstable_histogram.add(num_readers);
return downgrade_to_v1(make_combined_reader(schema, std::move(permit), std::move(readers), fwd, fwd_mr));
return make_combined_reader(schema, std::move(permit), std::move(readers), fwd, fwd_mr);
}
flat_mutation_reader
flat_mutation_reader_v2
time_series_sstable_set::create_single_key_sstable_reader(
column_family* cf,
schema_ptr schema,
@@ -850,7 +850,7 @@ time_series_sstable_set::create_single_key_sstable_reader(
auto it = std::find_if(_sstables->begin(), _sstables->end(), [&] (const sst_entry& e) { return pk_filter(*e.second); });
if (it == _sstables->end()) {
// No sstables contain data for the queried partition.
return make_empty_flat_reader(std::move(schema), std::move(permit));
return make_empty_flat_reader_v2(std::move(schema), std::move(permit));
}
auto& stats = *cf->cf_stats();
@@ -883,10 +883,10 @@ time_series_sstable_set::create_single_key_sstable_reader(
auto reversed = slice.is_reversed();
// Note that `sstable_position_reader_queue` always includes a reader which emits a `partition_start` fragment,
// guaranteeing that the reader we return emits it as well; this helps us avoid the problem from #3552.
return downgrade_to_v1(make_clustering_combined_reader(
return make_clustering_combined_reader(
schema, permit, fwd_sm,
make_position_reader_queue(
std::move(create_reader), std::move(filter), *pos.key(), schema, permit, fwd_sm, reversed)));
std::move(create_reader), std::move(filter), *pos.key(), schema, permit, fwd_sm, reversed));
}
compound_sstable_set::compound_sstable_set(schema_ptr schema, std::vector<lw_shared_ptr<sstable_set>> sets)
@@ -1033,7 +1033,7 @@ sstable_set make_compound_sstable_set(schema_ptr schema, std::vector<lw_shared_p
return sstable_set(std::make_unique<compound_sstable_set>(schema, std::move(sets)), schema);
}
flat_mutation_reader
flat_mutation_reader_v2
compound_sstable_set::create_single_key_sstable_reader(
column_family* cf,
schema_ptr schema,
@@ -1050,7 +1050,7 @@ compound_sstable_set::create_single_key_sstable_reader(
auto non_empty_set_count = std::distance(sets.begin(), it);
if (!non_empty_set_count) {
return make_empty_flat_reader(schema, permit);
return make_empty_flat_reader_v2(schema, permit);
}
// optimize for common case where only 1 set is populated, avoiding the expensive combined reader
if (non_empty_set_count == 1) {
@@ -1061,13 +1061,13 @@ compound_sstable_set::create_single_key_sstable_reader(
auto readers = boost::copy_range<std::vector<flat_mutation_reader_v2>>(
boost::make_iterator_range(sets.begin(), it)
| boost::adaptors::transformed([&] (const lw_shared_ptr<sstable_set>& non_empty_set) {
return upgrade_to_v2(non_empty_set->create_single_key_sstable_reader(cf, schema, permit, sstable_histogram, pr, slice, pc, trace_state, fwd, fwd_mr));
return non_empty_set->create_single_key_sstable_reader(cf, schema, permit, sstable_histogram, pr, slice, pc, trace_state, fwd, fwd_mr);
})
);
return downgrade_to_v1(make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr));
return make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr);
}
flat_mutation_reader
flat_mutation_reader_v2
sstable_set::create_single_key_sstable_reader(
column_family* cf,
schema_ptr schema,

View File

@@ -107,7 +107,7 @@ public:
};
incremental_selector make_incremental_selector() const;
flat_mutation_reader create_single_key_sstable_reader(
flat_mutation_reader_v2 create_single_key_sstable_reader(
column_family*,
schema_ptr,
reader_permit,
@@ -146,17 +146,6 @@ public:
mutation_reader::forwarding,
read_monitor_generator& rmg = default_read_monitor_generator()) const;
flat_mutation_reader make_reader(
schema_ptr,
reader_permit,
const dht::partition_range&,
const query::partition_slice&,
const io_priority_class&,
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding,
read_monitor_generator& rmg = default_read_monitor_generator()) const;
flat_mutation_reader_v2 make_crawling_reader(
schema_ptr,
reader_permit,

View File

@@ -47,7 +47,7 @@ public:
virtual void erase(shared_sstable sst) = 0;
virtual std::unique_ptr<incremental_selector_impl> make_incremental_selector() const = 0;
virtual flat_mutation_reader create_single_key_sstable_reader(
virtual flat_mutation_reader_v2 create_single_key_sstable_reader(
column_family*,
schema_ptr,
reader_permit,
@@ -143,7 +143,7 @@ public:
streamed_mutation::forwarding fwd_sm,
bool reversed) const;
virtual flat_mutation_reader create_single_key_sstable_reader(
virtual flat_mutation_reader_v2 create_single_key_sstable_reader(
column_family*,
schema_ptr,
reader_permit,
@@ -175,7 +175,7 @@ public:
virtual void erase(shared_sstable sst) override;
virtual std::unique_ptr<incremental_selector_impl> make_incremental_selector() const override;
virtual flat_mutation_reader create_single_key_sstable_reader(
virtual flat_mutation_reader_v2 create_single_key_sstable_reader(
column_family*,
schema_ptr,
reader_permit,

View File

@@ -62,7 +62,7 @@ static seastar::metrics::label keyspace_label("ks");
using namespace std::chrono_literals;
flat_mutation_reader
flat_mutation_reader_v2
table::make_sstable_reader(schema_ptr s,
reader_permit permit,
lw_shared_ptr<sstables::sstable_set> sstables,
@@ -79,14 +79,14 @@ table::make_sstable_reader(schema_ptr s,
if (pr.is_singular() && pr.start()->value().has_key()) {
const dht::ring_position& pos = pr.start()->value();
if (dht::shard_of(*s, pos.token()) != this_shard_id()) {
return make_empty_flat_reader(s, std::move(permit)); // range doesn't belong to this shard
return make_empty_flat_reader_v2(s, std::move(permit)); // range doesn't belong to this shard
}
return sstables->create_single_key_sstable_reader(const_cast<column_family*>(this), std::move(s), std::move(permit),
_stats.estimated_sstable_per_read, pr, slice, pc, std::move(trace_state), fwd, fwd_mr);
} else {
return downgrade_to_v1(sstables->make_local_shard_sstable_reader(std::move(s), std::move(permit), pr, slice, pc,
std::move(trace_state), fwd, fwd_mr));
return sstables->make_local_shard_sstable_reader(std::move(s), std::move(permit), pr, slice, pc,
std::move(trace_state), fwd, fwd_mr);
}
}
@@ -200,7 +200,7 @@ table::make_reader(schema_ptr s,
// - fast forwarding is implemented in reversed sstable readers.
readers.emplace_back(upgrade_to_v2(_cache.make_reader(s, permit, range, slice, pc, std::move(trace_state), fwd, fwd_mr)));
} else {
readers.emplace_back(upgrade_to_v2(make_sstable_reader(s, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr)));
readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
}
auto comb_reader = downgrade_to_v1(make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr));
@@ -238,7 +238,7 @@ table::make_streaming_reader(schema_ptr s, reader_permit permit,
for (auto&& mt : *_memtables) {
readers.emplace_back(upgrade_to_v2(mt->make_flat_reader(s, permit, range, slice, pc, trace_state, fwd, fwd_mr)));
}
readers.emplace_back(upgrade_to_v2(make_sstable_reader(s, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr)));
readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
return downgrade_to_v1(make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr));
});
@@ -256,7 +256,7 @@ flat_mutation_reader table::make_streaming_reader(schema_ptr schema, reader_perm
for (auto&& mt : *_memtables) {
readers.emplace_back(upgrade_to_v2(mt->make_flat_reader(schema, permit, range, slice, pc, trace_state, fwd, fwd_mr)));
}
readers.emplace_back(upgrade_to_v2(make_sstable_reader(schema, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr)));
readers.emplace_back(make_sstable_reader(schema, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
return downgrade_to_v1(make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr));
}
@@ -2244,7 +2244,7 @@ table::make_reader_excluding_sstables(schema_ptr s,
effective_sstables->insert(sst);
});
readers.emplace_back(upgrade_to_v2(make_sstable_reader(s, permit, std::move(effective_sstables), range, slice, pc, std::move(trace_state), fwd, fwd_mr)));
readers.emplace_back(make_sstable_reader(s, permit, std::move(effective_sstables), range, slice, pc, std::move(trace_state), fwd, fwd_mr));
return downgrade_to_v1(make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr));
}