Merge 'sstable: s/crawling_sstable_mutation_reader/sstable_full_scan_reader' from Kefu Chai

"crawling" is a little bit obscure in this context. so let's rename this class to reflect the fact that this reader only reads the entire content of the sstable.

both crawling reader for kl and mx formats are renamed. also, in order to be consistent, all "crawling reader" in variable names are updated as well.

---

it's a cleanup, hence no need to backport.

Closes scylladb/scylladb#20599

* github.com:scylladb/scylladb:
  sstable: s/crawling_sstable_mutation_reader/sstable_full_scan_reader
  sstable/mx/reader: add comment for mx_crawling_sstable_mutation_reader
This commit is contained in:
Botond Dénes
2024-09-17 11:55:08 +03:00
13 changed files with 69 additions and 57 deletions

View File

@@ -1637,8 +1637,8 @@ public:
if (!range.is_full()) {
on_internal_error(clogger, fmt::format("Scrub compaction in mode {} expected full partition range, but got {} instead", _options.operation_mode, range));
}
auto crawling_reader = _compacting->make_crawling_reader(std::move(s), std::move(permit), nullptr, unwrap_monitor_generator());
return make_mutation_reader<reader>(std::move(crawling_reader), _options.operation_mode, _validation_errors);
auto full_scan_reader = _compacting->make_full_scan_reader(std::move(s), std::move(permit), nullptr, unwrap_monitor_generator());
return make_mutation_reader<reader>(std::move(full_scan_reader), _options.operation_mode, _validation_errors);
}
uint64_t partitions_per_sstable() const override {

View File

@@ -1537,7 +1537,7 @@ mutation_reader make_reader(
}
class crawling_sstable_mutation_reader : public mp_row_consumer_reader_k_l {
class sstable_full_scan_reader : public mp_row_consumer_reader_k_l {
using DataConsumeRowsContext = kl::data_consume_rows_context;
using Consumer = mp_row_consumer_k_l;
static_assert(RowConsumer<Consumer>);
@@ -1545,7 +1545,7 @@ class crawling_sstable_mutation_reader : public mp_row_consumer_reader_k_l {
std::unique_ptr<DataConsumeRowsContext> _context;
read_monitor& _monitor;
public:
crawling_sstable_mutation_reader(shared_sstable sst, schema_ptr schema,
sstable_full_scan_reader(shared_sstable sst, schema_ptr schema,
reader_permit permit,
tracing::trace_state_ptr trace_state,
read_monitor& mon,
@@ -1561,13 +1561,13 @@ public:
push_mutation_fragment(mutation_fragment(*_schema, _permit, partition_end()));
}
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
on_internal_error(sstlog, "crawling_sstable_mutation_reader: doesn't support fast_forward_to(const dht::partition_range&)");
on_internal_error(sstlog, "sstable_full_scan_reader: doesn't support fast_forward_to(const dht::partition_range&)");
}
virtual future<> fast_forward_to(position_range cr) override {
on_internal_error(sstlog, "crawling_sstable_mutation_reader: doesn't support fast_forward_to(position_range)");
on_internal_error(sstlog, "sstable_full_scan_reader: doesn't support fast_forward_to(position_range)");
}
virtual future<> next_partition() override {
on_internal_error(sstlog, "crawling_sstable_mutation_reader: doesn't support next_partition()");
on_internal_error(sstlog, "sstable_full_scan_reader: doesn't support next_partition()");
}
virtual future<> fill_buffer() override {
if (_end_of_stream) {
@@ -1585,19 +1585,19 @@ public:
}
_monitor.on_read_completed();
return _context->close().handle_exception([_ = std::move(_context)] (std::exception_ptr ep) {
sstlog.warn("Failed closing of crawling_sstable_mutation_reader: {}. Ignored since the reader is already done.", ep);
sstlog.warn("Failed closing of sstable_full_scan_reader: {}. Ignored since the reader is already done.", ep);
});
}
};
mutation_reader make_crawling_reader(
mutation_reader make_full_scan_reader(
shared_sstable sstable,
schema_ptr schema,
reader_permit permit,
tracing::trace_state_ptr trace_state,
read_monitor& monitor,
sstable::integrity_check integrity) {
return make_mutation_reader<crawling_sstable_mutation_reader>(std::move(sstable), std::move(schema), std::move(permit),
return make_mutation_reader<sstable_full_scan_reader>(std::move(sstable), std::move(schema), std::move(permit),
std::move(trace_state), monitor, integrity);
}

View File

@@ -41,7 +41,7 @@ mutation_reader make_reader(
// A reader which doesn't use the index at all. It reads everything from the
// sstable and it doesn't support skipping.
mutation_reader make_crawling_reader(
mutation_reader make_full_scan_reader(
shared_sstable sstable,
schema_ptr schema,
reader_permit permit,

View File

@@ -1737,7 +1737,19 @@ mutation_reader make_reader(
value_or_reference(std::move(slice)), std::move(trace_state), fwd, fwd_mr, monitor);
}
class mx_crawling_sstable_mutation_reader : public mp_row_consumer_reader_mx {
/// a reader which does not support seeking to given position.
///
/// unlike mx_sstable_mutation_reader which allows fast forwarding read,
/// mx_sstable_full_scan_reader
///
/// - always reads the full range, and it is not able to read a subset of the
/// sstable
/// - does not support fast forwarding
///
/// It is designed to be used in conditions where:
/// - the index is not reliable, or
/// - the consumer reads the whole sstable
class mx_sstable_full_scan_reader : public mp_row_consumer_reader_mx {
using DataConsumeRowsContext = data_consume_rows_context_m<mp_row_consumer_m>;
using Consumer = mp_row_consumer_m;
static_assert(RowConsumer<Consumer>);
@@ -1745,7 +1757,7 @@ class mx_crawling_sstable_mutation_reader : public mp_row_consumer_reader_mx {
std::unique_ptr<DataConsumeRowsContext> _context;
read_monitor& _monitor;
public:
mx_crawling_sstable_mutation_reader(shared_sstable sst, schema_ptr schema,
mx_sstable_full_scan_reader(shared_sstable sst, schema_ptr schema,
reader_permit permit,
tracing::trace_state_ptr trace_state,
read_monitor& mon,
@@ -1759,13 +1771,13 @@ public:
public:
void on_out_of_clustering_range() override { }
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
on_internal_error(sstlog, "mx_crawling_sstable_mutation_reader: doesn't support fast_forward_to(const dht::partition_range&)");
on_internal_error(sstlog, "mx_sstable_full_scan_reader: doesn't support fast_forward_to(const dht::partition_range&)");
}
virtual future<> fast_forward_to(position_range cr) override {
on_internal_error(sstlog, "mx_crawling_sstable_mutation_reader: doesn't support fast_forward_to(position_range)");
on_internal_error(sstlog, "mx_sstable_full_scan_reader: doesn't support fast_forward_to(position_range)");
}
virtual future<> next_partition() override {
on_internal_error(sstlog, "mx_crawling_sstable_mutation_reader: doesn't support next_partition()");
on_internal_error(sstlog, "mx_sstable_full_scan_reader: doesn't support next_partition()");
}
virtual future<> fill_buffer() override {
if (_end_of_stream) {
@@ -1783,19 +1795,19 @@ public:
}
_monitor.on_read_completed();
return _context->close().handle_exception([_ = std::move(_context)] (std::exception_ptr ep) {
sstlog.warn("Failed closing of mx_crawling_sstable_mutation_reader: {}. Ignored since the reader is already done.", ep);
sstlog.warn("Failed closing of mx_sstable_full_scan_reader: {}. Ignored since the reader is already done.", ep);
});
}
};
mutation_reader make_crawling_reader(
mutation_reader make_full_scan_reader(
shared_sstable sstable,
schema_ptr schema,
reader_permit permit,
tracing::trace_state_ptr trace_state,
read_monitor& monitor,
sstable::integrity_check integrity) {
return make_mutation_reader<mx_crawling_sstable_mutation_reader>(std::move(sstable), std::move(schema), std::move(permit),
return make_mutation_reader<mx_sstable_full_scan_reader>(std::move(sstable), std::move(schema), std::move(permit),
std::move(trace_state), monitor, integrity);
}

View File

@@ -44,7 +44,7 @@ mutation_reader make_reader(
// A reader which doesn't use the index at all. It reads everything from the
// sstable and it doesn't support skipping.
mutation_reader make_crawling_reader(
mutation_reader make_full_scan_reader(
shared_sstable sstable,
schema_ptr schema,
reader_permit permit,

View File

@@ -1394,14 +1394,14 @@ sstable_set::make_local_shard_sstable_reader(
fwd_mr);
}
mutation_reader sstable_set::make_crawling_reader(
mutation_reader sstable_set::make_full_scan_reader(
schema_ptr schema,
reader_permit permit,
tracing::trace_state_ptr trace_ptr,
read_monitor_generator& monitor_generator) const {
std::vector<mutation_reader> readers;
for_each_sstable([&] (const shared_sstable& sst) mutable {
readers.emplace_back(sst->make_crawling_reader(schema, permit, trace_ptr, monitor_generator(sst)));
readers.emplace_back(sst->make_full_scan_reader(schema, permit, trace_ptr, monitor_generator(sst)));
});
return make_combined_reader(schema, std::move(permit), std::move(readers), streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
}

View File

@@ -241,7 +241,7 @@ public:
read_monitor_generator& rmg = default_read_monitor_generator(),
const sstable_predicate& p = default_sstable_predicate()) const;
mutation_reader make_crawling_reader(
mutation_reader make_full_scan_reader(
schema_ptr,
reader_permit,
tracing::trace_state_ptr,

View File

@@ -1958,7 +1958,7 @@ future<uint64_t> sstable::validate(reader_permit permit, abort_source& abort,
co_return co_await mx::validate(shared_from_this(), std::move(permit), abort, std::move(error_handler), monitor);
}
auto reader = make_crawling_reader(_schema, permit, nullptr, monitor, integrity_check::yes);
auto reader = make_full_scan_reader(_schema, permit, nullptr, monitor, integrity_check::yes);
try {
auto validator = mutation_fragment_stream_validator(*_schema);
@@ -2352,16 +2352,16 @@ sstable::make_reader(
}
mutation_reader
sstable::make_crawling_reader(
sstable::make_full_scan_reader(
schema_ptr schema,
reader_permit permit,
tracing::trace_state_ptr trace_state,
read_monitor& monitor,
integrity_check integrity) {
if (_version >= version_types::mc) {
return mx::make_crawling_reader(shared_from_this(), std::move(schema), std::move(permit), std::move(trace_state), monitor, integrity);
return mx::make_full_scan_reader(shared_from_this(), std::move(schema), std::move(permit), std::move(trace_state), monitor, integrity);
}
return kl::make_crawling_reader(shared_from_this(), std::move(schema), std::move(permit), std::move(trace_state), monitor, integrity);
return kl::make_full_scan_reader(shared_from_this(), std::move(schema), std::move(permit), std::move(trace_state), monitor, integrity);
}
static std::tuple<entry_descriptor, sstring, sstring> make_entry_descriptor(const std::filesystem::path& sst_path, sstring* const provided_ks, sstring* const provided_cf) {

View File

@@ -284,7 +284,7 @@ public:
// A reader which doesn't use the index at all. It reads everything from the
// sstable and it doesn't support skipping.
mutation_reader make_crawling_reader(
mutation_reader make_full_scan_reader(
schema_ptr schema,
reader_permit permit,
tracing::trace_state_ptr trace_state = {},

View File

@@ -2933,7 +2933,7 @@ SEASTAR_TEST_CASE(test_index_fast_forwarding_after_eof) {
});
}
SEASTAR_TEST_CASE(test_crawling_reader_out_of_range_last_range_tombstone_change) {
SEASTAR_TEST_CASE(test_full_scan_reader_out_of_range_last_range_tombstone_change) {
return test_env::do_with_async([] (test_env& env) {
simple_schema table;
@@ -2947,11 +2947,11 @@ SEASTAR_TEST_CASE(test_crawling_reader_out_of_range_last_range_tombstone_change)
auto sst = make_sstable_containing(env.make_sstable(table.schema()), {mut});
assert_that(sst->make_crawling_reader(table.schema(), env.make_reader_permit())).has_monotonic_positions();
assert_that(sst->make_full_scan_reader(table.schema(), env.make_reader_permit())).has_monotonic_positions();
});
}
SEASTAR_TEST_CASE(test_crawling_reader_random_schema_random_mutations) {
SEASTAR_TEST_CASE(test_full_scan_reader_random_schema_random_mutations) {
return test_env::do_with_async([] (test_env& env) {
auto random_spec = tests::make_random_schema_specification(
get_name(),
@@ -2969,14 +2969,14 @@ SEASTAR_TEST_CASE(test_crawling_reader_random_schema_random_mutations) {
auto sst = make_sstable_containing(env.make_sstable(schema), muts);
{
auto rd = assert_that(sst->make_crawling_reader(schema, env.make_reader_permit()));
auto rd = assert_that(sst->make_full_scan_reader(schema, env.make_reader_permit()));
for (const auto& mut : muts) {
rd.produces(mut);
}
}
assert_that(sst->make_crawling_reader(schema, env.make_reader_permit())).has_monotonic_positions();
assert_that(sst->make_full_scan_reader(schema, env.make_reader_permit())).has_monotonic_positions();
});
}

View File

@@ -53,8 +53,8 @@ future<> test_sequential_read(distributed<perf_sstable_test_env>& dt) {
return time_runs(iterations, parallelism, dt, &perf_sstable_test_env::read_sequential_partitions);
}
future<> test_crawling_streaming(distributed<perf_sstable_test_env>& dt) {
return time_runs(iterations, parallelism, dt, &perf_sstable_test_env::crawling_streaming);
future<> test_full_scan_streaming(distributed<perf_sstable_test_env>& dt) {
return time_runs(iterations, parallelism, dt, &perf_sstable_test_env::full_scan_streaming);
}
future<> test_partitioned_streaming(distributed<perf_sstable_test_env>& dt) {
@@ -67,7 +67,7 @@ enum class test_modes {
write,
index_write,
compaction,
crawling_streaming,
full_scan_streaming,
partitioned_streaming,
};
@@ -77,7 +77,7 @@ static const std::unordered_map<sstring, test_modes> test_mode = {
{"write", test_modes::write },
{"index_write", test_modes::index_write },
{"compaction", test_modes::compaction },
{"crawling_streaming", test_modes::crawling_streaming },
{"full_scan_streaming", test_modes::full_scan_streaming },
{"parititioned_streaming", test_modes::partitioned_streaming },
};
@@ -115,7 +115,7 @@ int scylla_sstable_main(int argc, char** argv) {
("num_columns", bpo::value<unsigned>()->default_value(5), "number of columns per row")
("column_size", bpo::value<unsigned>()->default_value(64), "size in bytes for each column")
("sstables", bpo::value<unsigned>()->default_value(1), "number of sstables (valid only for compaction mode)")
("mode", bpo::value<test_modes>()->default_value(test_modes::index_write), "one of: sequential_read, index_read, write, compaction, index_write, crawling_streaming, partitioned_streaming")
("mode", bpo::value<test_modes>()->default_value(test_modes::index_write), "one of: sequential_read, index_read, write, compaction, index_write, full_scan_streaming, partitioned_streaming")
("testdir", bpo::value<sstring>()->default_value("/var/lib/scylla/perf-tests"), "directory in which to store the sstables")
("compaction-strategy", bpo::value<sstring>()->default_value("SizeTieredCompactionStrategy"), "compaction strategy to use, one of "
"(SizeTieredCompactionStrategy, LeveledCompactionStrategy, DateTieredCompactionStrategy, TimeWindowCompactionStrategy)")
@@ -151,7 +151,7 @@ int scylla_sstable_main(int argc, char** argv) {
[[fallthrough]];
case sequential_read:
[[fallthrough]];
case crawling_streaming:
case full_scan_streaming:
[[fallthrough]];
case partitioned_streaming:
return test->invoke_on_all([] (perf_sstable_test_env &t) {
@@ -178,8 +178,8 @@ int scylla_sstable_main(int argc, char** argv) {
return test_index_read(*test).then([test] {});
case sequential_read:
return test_sequential_read(*test).then([test] {});
case crawling_streaming:
return test_crawling_streaming(*test).then([test] {});
case full_scan_streaming:
return test_full_scan_streaming(*test).then([test] {});
case partitioned_streaming:
return test_partitioned_streaming(*test).then([test] {});
case index_write:

View File

@@ -147,7 +147,7 @@ private:
}
enum class sst_reader {
crawling,
full_scan,
partitioned,
};
@@ -163,11 +163,11 @@ private:
// do not compact when performing streaming, as we focus on the read
// performance
auto reader = mutation_reader{nullptr};
if (reader_type == sst_reader::crawling) {
reader = sst_set->make_crawling_reader(s,
_env.make_reader_permit(),
tracing::trace_state_ptr{},
default_read_monitor_generator());
if (reader_type == sst_reader::full_scan) {
reader = sst_set->make_full_scan_reader(s,
_env.make_reader_permit(),
tracing::trace_state_ptr{},
default_read_monitor_generator());
} else {
const auto full_partition_range = dht::partition_range::make_open_ended_both_sides();
auto& slice = s->full_slice();
@@ -341,8 +341,8 @@ public:
});
}
future<double> crawling_streaming(int idx) {
return do_streaming(sst_reader::crawling);
future<double> full_scan_streaming(int idx) {
return do_streaming(sst_reader::full_scan);
}
future<double> partitioned_streaming(int idx) {

View File

@@ -830,15 +830,15 @@ stop_iteration consume_reader(mutation_reader rd, sstable_consumer& consumer, ss
return consumer.consume_sstable_end().get();
}
void consume_sstables(schema_ptr schema, reader_permit permit, std::vector<sstables::shared_sstable> sstables, bool merge, bool use_crawling_reader,
void consume_sstables(schema_ptr schema, reader_permit permit, std::vector<sstables::shared_sstable> sstables, bool merge, bool use_full_scan_reader,
std::function<stop_iteration(mutation_reader&, sstables::sstable*)> reader_consumer) {
sst_log.trace("consume_sstables(): {} sstables, merge={}, use_crawling_reader={}", sstables.size(), merge, use_crawling_reader);
sst_log.trace("consume_sstables(): {} sstables, merge={}, use_full_scan_reader={}", sstables.size(), merge, use_full_scan_reader);
if (merge) {
std::vector<mutation_reader> readers;
readers.reserve(sstables.size());
for (const auto& sst : sstables) {
if (use_crawling_reader) {
readers.emplace_back(sst->make_crawling_reader(schema, permit));
if (use_full_scan_reader) {
readers.emplace_back(sst->make_full_scan_reader(schema, permit));
} else {
readers.emplace_back(sst->make_reader(schema, permit, query::full_partition_range, schema->full_slice()));
}
@@ -848,8 +848,8 @@ void consume_sstables(schema_ptr schema, reader_permit permit, std::vector<sstab
reader_consumer(rd, nullptr);
} else {
for (const auto& sst : sstables) {
auto rd = use_crawling_reader
? sst->make_crawling_reader(schema, permit)
auto rd = use_full_scan_reader
? sst->make_full_scan_reader(schema, permit)
: sst->make_reader(schema, permit, query::full_partition_range, schema->full_slice());
if (reader_consumer(rd, sst.get()) == stop_iteration::yes) {
@@ -2607,10 +2607,10 @@ void sstable_consumer_operation(schema_ptr schema, reader_permit permit, const s
const auto merge = vm.count("merge");
const auto no_skips = vm.count("no-skips");
const auto partitions = get_partitions(schema, vm);
const auto use_crawling_reader = no_skips || partitions.empty();
const auto use_full_scan_reader = no_skips || partitions.empty();
auto consumer = std::make_unique<SstableConsumer>(schema, permit, vm);
consumer->consume_stream_start().get();
consume_sstables(schema, permit, sstables, merge, use_crawling_reader, [&, &consumer = *consumer] (mutation_reader& rd, sstables::sstable* sst) {
consume_sstables(schema, permit, sstables, merge, use_full_scan_reader, [&, &consumer = *consumer] (mutation_reader& rd, sstables::sstable* sst) {
return consume_reader(std::move(rd), consumer, sst, partitions, no_skips);
});
consumer->consume_stream_end().get();