sstables: Expose integrity option via crawling mutation readers

Signed-off-by: Nikos Dragazis <nikolaos.dragazis@scylladb.com>
This commit is contained in:
Nikos Dragazis
2024-08-23 01:02:11 +03:00
parent 1d2dc9f2e1
commit 716fc487fd
6 changed files with 24 additions and 14 deletions

View File

@@ -1548,10 +1548,11 @@ public:
crawling_sstable_mutation_reader(shared_sstable sst, schema_ptr schema,
reader_permit permit,
tracing::trace_state_ptr trace_state,
read_monitor& mon)
read_monitor& mon,
sstable::integrity_check integrity)
: mp_row_consumer_reader_k_l(std::move(schema), permit, std::move(sst))
, _consumer(this, _schema, std::move(permit), _schema->full_slice(), std::move(trace_state), streamed_mutation::forwarding::no, _sst)
, _context(data_consume_rows<DataConsumeRowsContext>(*_schema, _sst, _consumer, sstable::integrity_check::no))
, _context(data_consume_rows<DataConsumeRowsContext>(*_schema, _sst, _consumer, integrity))
, _monitor(mon) {
_monitor.on_read_started(_context->reader_position());
}
@@ -1594,9 +1595,10 @@ mutation_reader make_crawling_reader(
schema_ptr schema,
reader_permit permit,
tracing::trace_state_ptr trace_state,
read_monitor& monitor) {
read_monitor& monitor,
sstable::integrity_check integrity) {
return make_mutation_reader<crawling_sstable_mutation_reader>(std::move(sstable), std::move(schema), std::move(permit),
std::move(trace_state), monitor);
std::move(trace_state), monitor, integrity);
}
} // namespace kl

View File

@@ -11,6 +11,7 @@
#include "readers/mutation_reader_fwd.hh"
#include "readers/mutation_reader.hh"
#include "sstables/progress_monitor.hh"
#include "sstables/sstables.hh"
namespace sstables {
namespace kl {
@@ -45,7 +46,8 @@ mutation_reader make_crawling_reader(
schema_ptr schema,
reader_permit permit,
tracing::trace_state_ptr trace_state,
read_monitor& monitor);
read_monitor& monitor,
sstable::integrity_check integrity);
} // namespace kl
} // namespace sstables

View File

@@ -1748,10 +1748,11 @@ public:
mx_crawling_sstable_mutation_reader(shared_sstable sst, schema_ptr schema,
reader_permit permit,
tracing::trace_state_ptr trace_state,
read_monitor& mon)
read_monitor& mon,
sstable::integrity_check integrity)
: mp_row_consumer_reader_mx(std::move(schema), permit, std::move(sst))
, _consumer(this, _schema, std::move(permit), _schema->full_slice(), std::move(trace_state), streamed_mutation::forwarding::no, _sst)
, _context(data_consume_rows<DataConsumeRowsContext>(*_schema, _sst, _consumer, sstable::integrity_check::no))
, _context(data_consume_rows<DataConsumeRowsContext>(*_schema, _sst, _consumer, integrity))
, _monitor(mon) {
_monitor.on_read_started(_context->reader_position());
}
@@ -1792,9 +1793,10 @@ mutation_reader make_crawling_reader(
schema_ptr schema,
reader_permit permit,
tracing::trace_state_ptr trace_state,
read_monitor& monitor) {
read_monitor& monitor,
sstable::integrity_check integrity) {
return make_mutation_reader<mx_crawling_sstable_mutation_reader>(std::move(sstable), std::move(schema), std::move(permit),
std::move(trace_state), monitor);
std::move(trace_state), monitor, integrity);
}
void mp_row_consumer_reader_mx::on_next_partition(dht::decorated_key key, tombstone tomb) {

View File

@@ -11,6 +11,7 @@
#include "readers/mutation_reader_fwd.hh"
#include "readers/mutation_reader.hh"
#include "sstables/progress_monitor.hh"
#include "sstables/sstables.hh"
namespace sstables {
namespace mx {
@@ -48,7 +49,8 @@ mutation_reader make_crawling_reader(
schema_ptr schema,
reader_permit permit,
tracing::trace_state_ptr trace_state,
read_monitor& monitor);
read_monitor& monitor,
sstable::integrity_check integrity);
// Validate the content of the sstable with the mutation_fragment_stream_valdiator,
// additionally cross checking that the content is laid out as expected by the

View File

@@ -2309,11 +2309,12 @@ sstable::make_crawling_reader(
schema_ptr schema,
reader_permit permit,
tracing::trace_state_ptr trace_state,
read_monitor& monitor) {
read_monitor& monitor,
integrity_check integrity) {
if (_version >= version_types::mc) {
return mx::make_crawling_reader(shared_from_this(), std::move(schema), std::move(permit), std::move(trace_state), monitor);
return mx::make_crawling_reader(shared_from_this(), std::move(schema), std::move(permit), std::move(trace_state), monitor, integrity);
}
return kl::make_crawling_reader(shared_from_this(), std::move(schema), std::move(permit), std::move(trace_state), monitor);
return kl::make_crawling_reader(shared_from_this(), std::move(schema), std::move(permit), std::move(trace_state), monitor, integrity);
}
static std::tuple<entry_descriptor, sstring, sstring> make_entry_descriptor(const std::filesystem::path& sst_path, sstring* const provided_ks, sstring* const provided_cf) {

View File

@@ -288,7 +288,8 @@ public:
schema_ptr schema,
reader_permit permit,
tracing::trace_state_ptr trace_state = {},
read_monitor& monitor = default_read_monitor());
read_monitor& monitor = default_read_monitor(),
integrity_check integrity = integrity_check::no);
// Returns mutation_source containing all writes contained in this sstable.
// The mutation_source shares ownership of this sstable.