If the read is aborted via the permit (due to a timeout), don't throw the abort exception; instead, propagate it via the future chain. Also, use try_catch<> instead of try ... catch to decorate malformed_sstable_exception with the file name.
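For reference, the decoration pattern described above looks roughly like the sketch below. This is a minimal illustration only, assuming a try_catch<>-style helper that inspects an std::exception_ptr without rethrowing and a malformed_sstable_exception constructor that takes a file name; the names and signatures here are illustrative, not copied from the actual patch:

    // Hypothetical sketch of the pattern, not the actual change:
    return fut.then_wrapped([filename] (future<> f) {
        if (!f.failed()) {
            return f;
        }
        auto ep = f.get_exception();
        if (const auto* ex = try_catch<malformed_sstable_exception>(ep)) {
            // Decorate with the file name and keep propagating through the
            // future chain instead of throwing.
            return make_exception_future<>(malformed_sstable_exception(ex->what(), filename));
        }
        return make_exception_future<>(std::move(ep));
    });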
/*
 * Copyright (C) 2021-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#include "reader.hh"
#include "types/concrete_types.hh"
#include "mutation/mutation_fragment_stream_validator.hh"
#include "sstables/liveness_info.hh"
#include "sstables/mutation_fragment_filter.hh"
#include "sstables/m_format_read_helpers.hh"
#include "sstables/sstable_mutation_reader.hh"
#include "sstables/processing_result_generator.hh"
#include "utils/to_string.hh"
#include "utils/value_or_reference.hh"

namespace sstables {
namespace mx {

class mp_row_consumer_reader_mx : public mp_row_consumer_reader_base, public mutation_reader::impl {
    friend class sstables::mx::mp_row_consumer_m;
public:
    mp_row_consumer_reader_mx(schema_ptr s, reader_permit permit, shared_sstable sst)
        : mp_row_consumer_reader_base(std::move(sst))
        , impl(std::move(s), std::move(permit))
    {
        _permit.on_start_sstable_read();
    }
    virtual ~mp_row_consumer_reader_mx() {
        _permit.on_finish_sstable_read();
    }

    virtual data_consumer::proceed on_next_partition(dht::decorated_key, tombstone);
};

enum class row_processing_result {
    // Causes the parser to return control to the caller without advancing.
    // The next time the parser is called, the same consumer method will be called again.
    retry_later,

    // Causes the parser to proceed to the next element.
    do_proceed,

    // Causes the parser to skip the whole row. consume_row_end() will not be called for the current row.
    skip_row
};

class mp_row_consumer_m {
    reader_permit _permit;
    const shared_sstable& _sst;
    tracing::trace_state_ptr _trace_state;
public:

    mp_row_consumer_reader_mx* _reader;
    schema_ptr _schema;
    const query::partition_slice& _slice;
    std::optional<mutation_fragment_filter> _mf_filter;

    bool _is_mutation_end = true;
    streamed_mutation::forwarding _fwd;
    // For static-compact tables, C* stores the table's single row as the static row, but in our representation it is a regular row.
    const bool _treat_static_row_as_regular;

    std::optional<clustering_row> _in_progress_row;
    std::optional<range_tombstone_change> _stored_tombstone;
    static_row _in_progress_static_row;
    bool _inside_static_row = false;

    struct cell {
        column_id id;
        atomic_cell_or_collection val;
    };
    std::vector<cell> _cells;
    collection_mutation_description _cm;

    data_consumer::proceed consume_range_tombstone_start(clustering_key_prefix ck, bound_kind k, tombstone t) {
        sstlog.trace("mp_row_consumer_m {}: consume_range_tombstone_start(ck={}, k={}, t={})", fmt::ptr(this), ck, k, t);
        if (_mf_filter->current_tombstone()) {
            throw sstables::malformed_sstable_exception(
                format("Range tombstones have to be disjoint: current opened range tombstone {}, new tombstone {}",
                       _mf_filter->current_tombstone(), t));
        }
        auto pos = position_in_partition(position_in_partition::range_tag_t(), k, std::move(ck));
        return on_range_tombstone_change(std::move(pos), t);
    }

    data_consumer::proceed consume_range_tombstone_end(clustering_key_prefix ck, bound_kind k, tombstone t) {
        sstlog.trace("mp_row_consumer_m {}: consume_range_tombstone_end(ck={}, k={}, t={})", fmt::ptr(this), ck, k, t);
        if (!_mf_filter->current_tombstone()) {
            throw sstables::malformed_sstable_exception(
                format("Closing range tombstone that wasn't opened: clustering {}, kind {}, tombstone {}",
                       ck, k, t));
        }
        if (_mf_filter->current_tombstone() != t) {
            throw sstables::malformed_sstable_exception(
                format("Range tombstone with ck {} and two different tombstones at ends: {}, {}",
                       ck, _mf_filter->current_tombstone(), t));
        }
        auto pos = position_in_partition(position_in_partition::range_tag_t(), k, std::move(ck));
        return on_range_tombstone_change(std::move(pos), {});
    }

    data_consumer::proceed consume_range_tombstone_boundary(position_in_partition pos, tombstone left, tombstone right) {
        sstlog.trace("mp_row_consumer_m {}: consume_range_tombstone_boundary(pos={}, left={}, right={})", fmt::ptr(this), pos, left, right);
        if (!_mf_filter->current_tombstone()) {
            throw sstables::malformed_sstable_exception(
                format("Closing range tombstone that wasn't opened: pos {}, tombstone {}", pos, left));
        }
        if (_mf_filter->current_tombstone() != left) {
            throw sstables::malformed_sstable_exception(
                format("Range tombstone at {} and two different tombstones at ends: {}, {}",
                       pos, _mf_filter->current_tombstone(), left));
        }
        return on_range_tombstone_change(std::move(pos), right);
    }

    const column_definition& get_column_definition(std::optional<column_id> column_id) const {
        auto column_type = _inside_static_row ? column_kind::static_column : column_kind::regular_column;
        return _schema->column_at(column_type, *column_id);
    }

    inline data_consumer::proceed on_range_tombstone_change(position_in_partition pos, tombstone t) {
        sstlog.trace("mp_row_consumer_m {}: on_range_tombstone_change({}, {}->{})", fmt::ptr(this), pos,
                     _mf_filter->current_tombstone(), t);

        mutation_fragment_filter::clustering_result result = _mf_filter->apply(pos, t);

        for (auto&& rt : result.rts) {
            sstlog.trace("mp_row_consumer_m {}: push({})", fmt::ptr(this), rt);
            _reader->push_mutation_fragment(mutation_fragment_v2(*_schema, permit(), std::move(rt)));
        }

        switch (result.action) {
        case mutation_fragment_filter::result::emit:
            sstlog.trace("mp_row_consumer_m {}: emit", fmt::ptr(this));
            break;
        case mutation_fragment_filter::result::ignore:
            sstlog.trace("mp_row_consumer_m {}: ignore", fmt::ptr(this));
            if (_mf_filter->out_of_range()) {
                _reader->on_out_of_clustering_range();
                return data_consumer::proceed::no;
            }
            if (_mf_filter->is_current_range_changed()) {
                return data_consumer::proceed::no;
            }
            break;
        case mutation_fragment_filter::result::store_and_finish:
            sstlog.trace("mp_row_consumer_m {}: store", fmt::ptr(this));
            _stored_tombstone = range_tombstone_change(pos, t);
            _reader->on_out_of_clustering_range();
            return data_consumer::proceed::no;
        }

        return data_consumer::proceed(!_reader->is_buffer_full() && !need_preempt());
    }
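
    // Note: on_range_tombstone_change() is the single funnel for all range tombstone
    // events above; consume_range_tombstone_start/end/boundary only validate the
    // stream and translate their arguments into a position plus the new active tombstone.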

    inline void reset_for_new_partition() {
        _is_mutation_end = true;
        _in_progress_row.reset();
        _stored_tombstone.reset();
        _mf_filter.reset();
    }

    void check_schema_mismatch(const column_translation::column_info& column_info, const column_definition& column_def) const {
        if (column_info.schema_mismatch) {
            throw malformed_sstable_exception(
                format("{} definition in serialization header does not match schema. Expected {} but got {}",
                       column_def.name_as_text(),
                       column_def.type->name(),
                       column_info.type->name()));
        }
    }

    void check_column_missing_in_current_schema(const column_translation::column_info& column_info,
                                                api::timestamp_type timestamp) const {
        if (!column_info.id) {
            sstring name = sstring(to_string_view(*column_info.name));
            auto it = _schema->dropped_columns().find(name);
            if (it == _schema->dropped_columns().end() || timestamp > it->second.timestamp) {
                throw malformed_sstable_exception(format("Column {} missing in current schema", name));
            }
        }
    }

public:
    mp_row_consumer_m(mp_row_consumer_reader_mx* reader,
                      const schema_ptr schema,
                      reader_permit permit,
                      const query::partition_slice& slice,
                      tracing::trace_state_ptr trace_state,
                      streamed_mutation::forwarding fwd,
                      const shared_sstable& sst)
        : _permit(std::move(permit))
        , _sst(sst)
        , _trace_state(std::move(trace_state))
        , _reader(reader)
        , _schema(schema)
        , _slice(slice)
        , _fwd(fwd)
        , _treat_static_row_as_regular(_schema->is_static_compact_table()
            && (!sst->has_scylla_component() || sst->features().is_enabled(sstable_feature::CorrectStaticCompact))) // See #4139
    {
        _cells.reserve(std::max(_schema->static_columns_count(), _schema->regular_columns_count()));
    }

    mp_row_consumer_m(mp_row_consumer_reader_mx* reader,
                      const schema_ptr schema,
                      reader_permit permit,
                      tracing::trace_state_ptr trace_state,
                      streamed_mutation::forwarding fwd,
                      const shared_sstable& sst)
        : mp_row_consumer_m(reader, schema, std::move(permit), schema->full_slice(), std::move(trace_state), fwd, sst)
    { }

    ~mp_row_consumer_m() {}

    // See the RowConsumer concept
    void push_ready_fragments() {
        if (auto rto = std::move(_stored_tombstone)) {
            _stored_tombstone = std::nullopt;
            on_range_tombstone_change(rto->position(), rto->tombstone());
        }
    }

    std::optional<position_in_partition_view> maybe_skip() {
        if (!_mf_filter) {
            return {};
        }
        return _mf_filter->maybe_skip();
    }

    bool is_mutation_end() const {
        return _is_mutation_end;
    }

    void setup_for_partition(const partition_key& pk) {
        sstlog.trace("mp_row_consumer_m {}: setup_for_partition({})", fmt::ptr(this), pk);
        _is_mutation_end = false;
        _mf_filter.emplace(*_schema, query::clustering_key_filter_ranges(_slice.row_ranges(*_schema, pk)), _fwd);
    }

    std::optional<position_in_partition_view> fast_forward_to(position_range r) {
        if (!_mf_filter) {
            _reader->on_out_of_clustering_range();
            return {};
        }
        // r is used to trim range tombstones, and range_tombstone:s can be trimmed only to positions
        // which are !is_clustering_row(). Replace with equivalent ranges.
        // Long-term we should guarantee this on position_range.
        if (r.start().is_clustering_row()) {
            r.set_start(position_in_partition::before_key(r.start().key()));
        }
        if (r.end().is_clustering_row()) {
            r.set_end(position_in_partition::before_key(r.end().key()));
        }
        auto skip = _mf_filter->fast_forward_to(std::move(r));
        if (skip) {
            position_in_partition::less_compare less(*_schema);
            // No need to skip using the index if stored fragments are after the start of the range
            if (_in_progress_row && !less(_in_progress_row->position(), *skip)) {
                return {};
            }
            if (_stored_tombstone && !less(_stored_tombstone->position(), *skip)) {
                return {};
            }
        }
        if (_mf_filter->out_of_range()) {
            _reader->on_out_of_clustering_range();
        }
        return skip;
    }

    /*
     * Sets the range tombstone start. Overwrites the currently set RT start if any.
     * Used for skipping through wide partitions using the index, when the data block
     * we skip to starts in the middle of an opened range tombstone.
     */
    void set_range_tombstone(tombstone t) {
        sstlog.trace("mp_row_consumer_m {}: set_range_tombstone({})", fmt::ptr(this), t);
        _mf_filter->set_tombstone(t);
    }

    // Consume the row's key and deletion_time. The latter determines if the
    // row is a tombstone, and if so, when it has been deleted.
    // Note that the key is in serialized form, and should be deserialized
    // (according to the schema) before use.
    // As explained above, the key object is only valid during this call, and
    // if the implementation wishes to save it, it must copy the *contents*.
    data_consumer::proceed consume_partition_start(sstables::key_view key, sstables::deletion_time deltime) {
        sstlog.trace("mp_row_consumer_m {}: consume_partition_start(deltime=({}, {})), _is_mutation_end={}", fmt::ptr(this),
            deltime.local_deletion_time, deltime.marked_for_delete_at, _is_mutation_end);
        if (!_is_mutation_end) {
            return data_consumer::proceed::yes;
        }
        auto pk = key.to_partition_key(*_schema);
        setup_for_partition(pk);
        auto dk = dht::decorate_key(*_schema, pk);

        auto should_proceed = _reader->on_next_partition(std::move(dk), tombstone(deltime));
        if (should_proceed == data_consumer::proceed::no) {
            return data_consumer::proceed::no;
        }
        return data_consumer::proceed(!_reader->is_buffer_full() && !need_preempt());
    }

    row_processing_result consume_row_start(const std::vector<fragmented_temporary_buffer>& ecp) {
        auto key = clustering_key_prefix::from_range(ecp | std::views::transform(
            [] (const fragmented_temporary_buffer& b) { return fragmented_temporary_buffer::view(b); }));

        _sst->get_stats().on_row_read();
        sstlog.trace("mp_row_consumer_m {}: consume_row_start({})", fmt::ptr(this), key);

        _in_progress_row.emplace(std::move(key));

        mutation_fragment_filter::clustering_result res = _mf_filter->apply(_in_progress_row->position());

        for (auto&& rt : res.rts) {
            sstlog.trace("mp_row_consumer_m {}: push({})", fmt::ptr(this), rt);
            _reader->push_mutation_fragment(mutation_fragment_v2(*_schema, permit(), std::move(rt)));
        }

        switch (res.action) {
        case mutation_fragment_filter::result::emit:
            sstlog.trace("mp_row_consumer_m {}: emit", fmt::ptr(this));
            return row_processing_result::do_proceed;
        case mutation_fragment_filter::result::ignore:
            sstlog.trace("mp_row_consumer_m {}: ignore", fmt::ptr(this));
            if (_mf_filter->out_of_range()) {
                _reader->on_out_of_clustering_range();
                // We actually want skip_later, which doesn't exist, but retry_later
                // is ok because signalling out-of-range on the reader will cause it
                // to either stop reading or skip to the next partition using the index,
                // not by ignoring fragments.
                return row_processing_result::retry_later;
            }
            if (_mf_filter->is_current_range_changed()) {
                return row_processing_result::retry_later;
            } else {
                _in_progress_row.reset();
                return row_processing_result::skip_row;
            }
        case mutation_fragment_filter::result::store_and_finish:
            sstlog.trace("mp_row_consumer_m {}: store_and_finish", fmt::ptr(this));
            _reader->on_out_of_clustering_range();
            return row_processing_result::retry_later;
        }
        abort();
    }

    data_consumer::proceed consume_row_marker_and_tombstone(
            const liveness_info& info, tombstone tomb, tombstone shadowable_tomb) {
        sstlog.trace("mp_row_consumer_m {}: consume_row_marker_and_tombstone({}, {}, {}), key={}",
            fmt::ptr(this), info.to_row_marker(), tomb, shadowable_tomb, _in_progress_row->position());
        _in_progress_row->apply(info.to_row_marker());
        _in_progress_row->apply(tomb);
        if (shadowable_tomb) {
            _in_progress_row->apply(shadowable_tombstone{shadowable_tomb});
        }
        if (_in_progress_row->tomb()) {
            _sst->get_stats().on_row_tombstone_read();
        }
        return data_consumer::proceed::yes;
    }

    row_processing_result consume_static_row_start() {
        sstlog.trace("mp_row_consumer_m {}: consume_static_row_start()", fmt::ptr(this));
        if (_treat_static_row_as_regular) {
            return consume_row_start({});
        }
        _inside_static_row = true;
        _in_progress_static_row = static_row();
        return row_processing_result::do_proceed;
    }

    data_consumer::proceed consume_column(const column_translation::column_info& column_info,
                                          bytes_view cell_path,
                                          fragmented_temporary_buffer::view value,
                                          api::timestamp_type timestamp,
                                          gc_clock::duration ttl,
                                          gc_clock::time_point local_deletion_time,
                                          bool is_deleted) {
        const std::optional<column_id>& column_id = column_info.id;
        sstlog.trace("mp_row_consumer_m {}: consume_column(id={}, path={}, value={}, ts={}, ttl={}, del_time={}, deleted={})", fmt::ptr(this),
            column_id, fmt_hex(cell_path), value, timestamp, ttl.count(), local_deletion_time.time_since_epoch().count(), is_deleted);
        check_column_missing_in_current_schema(column_info, timestamp);
        if (!column_id) {
            return data_consumer::proceed::yes;
        }
        const column_definition& column_def = get_column_definition(column_id);
        if (timestamp <= column_def.dropped_at()) {
            return data_consumer::proceed::yes;
        }
        check_schema_mismatch(column_info, column_def);
        if (column_def.is_multi_cell()) {
            auto& value_type = visit(*column_def.type, make_visitor(
                [] (const collection_type_impl& ctype) -> const abstract_type& { return *ctype.value_comparator(); },
                [&] (const user_type_impl& utype) -> const abstract_type& {
                    if (cell_path.size() != sizeof(int16_t)) {
                        throw malformed_sstable_exception(format("wrong size of field index while reading UDT column: expected {}, got {}",
                            sizeof(int16_t), cell_path.size()));
                    }

                    auto field_idx = deserialize_field_index(cell_path);
                    if (field_idx >= utype.size()) {
                        throw malformed_sstable_exception(format("field index too big while reading UDT column: type has {} fields, got {}",
                            utype.size(), field_idx));
                    }

                    return *utype.type(field_idx);
                },
                [] (const abstract_type& o) -> const abstract_type& {
                    throw malformed_sstable_exception(format("attempted to read multi-cell column, but expected type was {}", o.name()));
                }
            ));
            auto ac = is_deleted ? atomic_cell::make_dead(timestamp, local_deletion_time)
                                 : make_atomic_cell(value_type,
                                                    timestamp,
                                                    value,
                                                    ttl,
                                                    local_deletion_time,
                                                    atomic_cell::collection_member::yes);
            _cm.cells.emplace_back(to_bytes(cell_path), std::move(ac));
        } else {
            auto ac = is_deleted ? atomic_cell::make_dead(timestamp, local_deletion_time)
                                 : make_atomic_cell(*column_def.type, timestamp, value, ttl, local_deletion_time,
                                                    atomic_cell::collection_member::no);
            _cells.push_back({*column_id, atomic_cell_or_collection(std::move(ac))});
        }
        return data_consumer::proceed::yes;
    }

    data_consumer::proceed consume_complex_column_start(const sstables::column_translation::column_info& column_info,
                                                        tombstone tomb) {
        sstlog.trace("mp_row_consumer_m {}: consume_complex_column_start({}, {})", fmt::ptr(this), column_info.id, tomb);
        _cm.tomb = tomb;
        _cm.cells.clear();
        return data_consumer::proceed::yes;
    }

    data_consumer::proceed consume_complex_column_end(const sstables::column_translation::column_info& column_info) {
        const std::optional<column_id>& column_id = column_info.id;
        sstlog.trace("mp_row_consumer_m {}: consume_complex_column_end({})", fmt::ptr(this), column_id);
        if (_cm.tomb) {
            check_column_missing_in_current_schema(column_info, _cm.tomb.timestamp);
        }
        if (column_id) {
            const column_definition& column_def = get_column_definition(column_id);
            if (!_cm.cells.empty() || (_cm.tomb && _cm.tomb.timestamp > column_def.dropped_at())) {
                check_schema_mismatch(column_info, column_def);
                _cells.push_back({column_def.id, _cm.serialize(*column_def.type)});
            }
        }
        _cm.tomb = {};
        _cm.cells.clear();
        return data_consumer::proceed::yes;
    }

    data_consumer::proceed consume_counter_column(const column_translation::column_info& column_info,
                                                  fragmented_temporary_buffer::view value,
                                                  api::timestamp_type timestamp) {
        const std::optional<column_id>& column_id = column_info.id;
        sstlog.trace("mp_row_consumer_m {}: consume_counter_column({}, {}, {})", fmt::ptr(this), column_id, value, timestamp);
        check_column_missing_in_current_schema(column_info, timestamp);
        if (!column_id) {
            return data_consumer::proceed::yes;
        }
        const column_definition& column_def = get_column_definition(column_id);
        if (timestamp <= column_def.dropped_at()) {
            return data_consumer::proceed::yes;
        }
        check_schema_mismatch(column_info, column_def);
        auto ac = make_counter_cell(timestamp, value);
        _cells.push_back({*column_id, atomic_cell_or_collection(std::move(ac))});
        return data_consumer::proceed::yes;
    }

    data_consumer::proceed consume_range_tombstone(const std::vector<fragmented_temporary_buffer>& ecp,
                                                   bound_kind kind,
                                                   tombstone tomb) {
        auto ck = clustering_key_prefix::from_range(ecp | std::views::transform(
            [] (const fragmented_temporary_buffer& b) { return fragmented_temporary_buffer::view(b); }));
        if (kind == bound_kind::incl_start || kind == bound_kind::excl_start) {
            return consume_range_tombstone_start(std::move(ck), kind, std::move(tomb));
        } else { // *_end kind
            return consume_range_tombstone_end(std::move(ck), kind, std::move(tomb));
        }
    }

    data_consumer::proceed consume_range_tombstone(const std::vector<fragmented_temporary_buffer>& ecp,
                                                   sstables::bound_kind_m kind,
                                                   tombstone end_tombstone,
                                                   tombstone start_tombstone) {
        auto ck = clustering_key_prefix::from_range(ecp | std::views::transform(
            [] (const fragmented_temporary_buffer& b) { return fragmented_temporary_buffer::view(b); }));
        switch (kind) {
        case bound_kind_m::incl_end_excl_start: {
            auto pos = position_in_partition(position_in_partition::range_tag_t(), bound_kind::incl_end, std::move(ck));
            return consume_range_tombstone_boundary(std::move(pos), end_tombstone, start_tombstone);
        }
        case bound_kind_m::excl_end_incl_start: {
            auto pos = position_in_partition(position_in_partition::range_tag_t(), bound_kind::excl_end, std::move(ck));
            return consume_range_tombstone_boundary(std::move(pos), end_tombstone, start_tombstone);
        }
        default:
            on_parse_error(format("Invalid boundary type {}", static_cast<std::underlying_type<sstables::bound_kind_m>::type>(kind)), _sst->get_filename());
        }
    }

    data_consumer::proceed consume_row_end() {
        auto fill_cells = [this] (column_kind kind, row& cells) {
            for (auto&& c : _cells) {
                cells.apply(_schema->column_at(kind, c.id), std::move(c.val));
            }
            _cells.clear();
        };

        if (_inside_static_row) {
            fill_cells(column_kind::static_column, _in_progress_static_row.cells());
            sstlog.trace("mp_row_consumer_m {}: consume_row_end(_in_progress_static_row={})", fmt::ptr(this), static_row::printer(*_schema, _in_progress_static_row));
            _inside_static_row = false;
            if (!_in_progress_static_row.empty()) {
                auto action = _mf_filter->apply(_in_progress_static_row);
                switch (action) {
                case mutation_fragment_filter::result::emit:
                    _reader->push_mutation_fragment(mutation_fragment_v2(*_schema, permit(), std::move(_in_progress_static_row)));
                    break;
                case mutation_fragment_filter::result::ignore:
                    break;
                case mutation_fragment_filter::result::store_and_finish:
                    // A static row is always either emitted or ignored.
                    throw runtime_exception("We should never need to store a static row");
                }
            }
        } else {
            if (!_cells.empty()) {
                fill_cells(column_kind::regular_column, _in_progress_row->cells());
            }
            if (_slice.is_reversed() &&
                // We always consume whole rows (i.e. `consume_row_end` is always called) when reading in reverse,
                // even when `consume_row_start` requested to ignore the row. This happens because for reversed reads
                // skipping is performed in the intermediary reversing data source (not in the reader) and the source
                // always returns whole rows.
                // Hence we must again check what the filtering result for this row was, even though we already
                // checked it in `consume_row_start`; otherwise we would incorrectly emit rows that were filtered out.
                _mf_filter->apply(_in_progress_row->position()).action != mutation_fragment_filter::result::emit) {
                return data_consumer::proceed(!_reader->is_buffer_full() && !need_preempt());
            }
            _reader->push_mutation_fragment(mutation_fragment_v2(
                *_schema, permit(), *std::exchange(_in_progress_row, {})));
        }

        return data_consumer::proceed(!_reader->is_buffer_full() && !need_preempt());
    }

    void on_end_of_stream() {
        sstlog.trace("mp_row_consumer_m {}: on_end_of_stream()", fmt::ptr(this));
        if (_mf_filter && _mf_filter->current_tombstone()) {
            if (_mf_filter->out_of_range()) {
                throw sstables::malformed_sstable_exception("Unclosed range tombstone.");
            }
            auto result = _mf_filter->apply(position_in_partition_view::after_all_clustered_rows(), {});
            for (auto&& rt : result.rts) {
                sstlog.trace("mp_row_consumer_m {}: on_end_of_stream(), emitting last tombstone: {}", fmt::ptr(this), rt);
                _reader->push_mutation_fragment(mutation_fragment_v2(*_schema, permit(), std::move(rt)));
            }
        }
        if (!_reader->_partition_finished) {
            consume_partition_end();
        }
        _reader->_end_of_stream = true;
    }

    // Called at the end of the partition, after all its rows and cells.
    // Returns a flag saying whether the sstable consumer should stop now, or
    // proceed consuming more data.
    data_consumer::proceed consume_partition_end() {
        sstlog.trace("mp_row_consumer_m {}: consume_partition_end()", fmt::ptr(this));
        reset_for_new_partition();

        if (_fwd == streamed_mutation::forwarding::yes) {
            _reader->_end_of_stream = true;
            return data_consumer::proceed::no;
        }

        _reader->_index_in_current_partition = false;
        _reader->_partition_finished = true;
        _reader->_before_partition = true;
        _reader->push_mutation_fragment(mutation_fragment_v2(*_schema, permit(), partition_end()));
        return data_consumer::proceed(!_reader->is_buffer_full() && !need_preempt());
    }

    // Called when the reader is fast forwarded to a given element.
    void reset(sstables::indexable_element el) {
        sstlog.trace("mp_row_consumer_m {}: reset({})", fmt::ptr(this), static_cast<int>(el));
        if (el == indexable_element::partition) {
            reset_for_new_partition();
        } else {
            _in_progress_row.reset();
            _stored_tombstone.reset();
            _is_mutation_end = false;
        }
    }

    // Call after a reverse index skip is performed during reversed reads.
    void reset_after_reversed_read_skip() {
        // We must not reset `_in_progress_row` since rows are always consumed fully
        // during reversed reads. We also don't need to reset any state that may change
        // when moving between partitions as reversed skips are only performed within
        // a partition.
        // We must only reset the stored tombstone. A range tombstone may be stored in forwarding
        // mode, when the parser gets ahead of the currently forwarded-to range and provides
        // us (the consumer) a tombstone positioned after the range; we store it so we can
        // process it again when (if) the read gets forwarded to a range containing this
        // tombstone. But a successful index skip means that the source jumped to a later
        // position, so to a position past the stored tombstone's (if there is one) position.
        // The stored tombstone may no longer be relevant for the position we're at. The correct
        // active tombstone, if any, is obtained from the index and will be set using
        // `set_range_tombstone`.
        _stored_tombstone.reset();
    }

    position_in_partition_view position() {
        if (_inside_static_row) {
            return position_in_partition_view(position_in_partition_view::static_row_tag_t{});
        }
        if (_stored_tombstone) {
            return _stored_tombstone->position();
        }
        if (_in_progress_row) {
            return _in_progress_row->position();
        }
        if (_is_mutation_end) {
            return position_in_partition_view::for_partition_end();
        }
        return position_in_partition_view::for_partition_start();
    }

    // The permit for this read
    reader_permit& permit() {
        return _permit;
    }

    tracing::trace_state_ptr trace_state() const {
        return _trace_state;
    }
};

// data_consume_rows_context_m remembers the context that an ongoing
// data_consume_rows() future is in, for SSTables in the 3_x ("m") format.
template <typename Consumer>
requires requires(
        Consumer& c,
        sstables::key_view pk_view,
        sstables::deletion_time deltime,
        const std::vector<fragmented_temporary_buffer>& ck_view,
        const liveness_info& l_info,
        tombstone tomb,
        const column_translation::column_info& column_info,
        bytes_view cell_path,
        fragmented_temporary_buffer::view value,
        api::timestamp_type timestamp,
        gc_clock::duration ttl,
        gc_clock::time_point local_deletion_time,
        bool is_deleted,
        bound_kind kind,
        sstables::bound_kind_m kind_m) {
    { c.permit() } -> std::convertible_to<reader_permit>;
    { c.trace_state() } -> std::same_as<tracing::trace_state_ptr>;
    { c.consume_partition_start(pk_view, deltime) } -> std::same_as<data_consumer::proceed>;
    { c.consume_static_row_start() } -> std::same_as<row_processing_result>;
    { c.consume_row_start(ck_view) } -> std::same_as<row_processing_result>;
    { c.consume_row_marker_and_tombstone(l_info, tomb, tomb) } -> std::same_as<data_consumer::proceed>;
    { c.consume_column(column_info, cell_path, value, timestamp, ttl, local_deletion_time, is_deleted) } -> std::same_as<data_consumer::proceed>;
    { c.consume_complex_column_start(column_info, tomb) } -> std::same_as<data_consumer::proceed>;
    { c.consume_complex_column_end(column_info) } -> std::same_as<data_consumer::proceed>;
    { c.consume_counter_column(column_info, value, timestamp) } -> std::same_as<data_consumer::proceed>;
    { c.consume_range_tombstone(ck_view, kind, tomb) } -> std::same_as<data_consumer::proceed>;
    { c.consume_range_tombstone(ck_view, kind_m, tomb, tomb) } -> std::same_as<data_consumer::proceed>;
    { c.consume_row_end() } -> std::same_as<data_consumer::proceed>;
    { c.consume_partition_end() } -> std::same_as<data_consumer::proceed>;
    c.on_end_of_stream();
}
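// In this file the context is instantiated with mp_row_consumer_m (see
// mx_sstable_mutation_reader below), which models each requirement listed above.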
class data_consume_rows_context_m : public data_consumer::continuous_data_consumer<data_consume_rows_context_m<Consumer>> {
    using parent = data_consumer::continuous_data_consumer<data_consume_rows_context_m<Consumer>>;
    using read_status = typename parent::read_status;
private:
    enum class state {
        PARTITION_START,
        DELETION_TIME,
        FLAGS,
        OTHER,
    } _state = state::PARTITION_START;

    // Becomes false when we yield in the main coroutine even though we don't need to
    // consume more data buffers to continue; switched back to true afterwards.
    bool _consuming = true;
    Consumer& _consumer;
    shared_sstable _sst;
    const serialization_header& _header;
    column_translation _column_translation;
    const bool _has_shadowable_tombstones;

    temporary_buffer<char> _pk;

    unfiltered_flags_m _flags{0};
    unfiltered_extended_flags_m _extended_flags{0};
    uint64_t _next_row_offset;
    liveness_info _liveness;
    bool _is_first_unfiltered = true;

    std::vector<fragmented_temporary_buffer> _row_key;

    struct row_schema {
        using column_range = std::ranges::subrange<std::vector<column_translation::column_info>::const_iterator>;

        // All columns for this kind of row inside column_translation of the current sstable
        column_range _all_columns;

        // Subrange of _all_columns which is yet to be processed for the current row
        column_range _columns;

        // Represents the subset of _all_columns present in the current row
        boost::dynamic_bitset<uint64_t> _columns_selector; // size() == _columns.size()
    };

    row_schema _regular_row;
    row_schema _static_row;
    row_schema* _row;

    uint64_t _missing_columns_to_read;

    std::ranges::subrange<std::vector<std::optional<uint32_t>>::const_iterator> _ck_column_value_fix_lengths;

    tombstone _row_tombstone;
    tombstone _row_shadowable_tombstone;

    column_flags_m _column_flags{0};
    api::timestamp_type _column_timestamp;
    gc_clock::time_point _column_local_deletion_time;
    gc_clock::duration _column_ttl;
    fragmented_temporary_buffer _column_value;
    temporary_buffer<char> _cell_path;
    uint64_t _ck_blocks_header;
    uint32_t _ck_blocks_header_offset;
    bool _null_component_occured;
    uint64_t _subcolumns_to_read = 0;
    api::timestamp_type _complex_column_marked_for_delete;
    tombstone _complex_column_tombstone;
    bool _reading_range_tombstone_ck = false;
    bound_kind_m _range_tombstone_kind;
    uint16_t _ck_size;
    /*
     * We need two range tombstones because a range tombstone marker can be either a single bound
     * or a double bound that represents the end of one range tombstone and the start of another at the same time.
     * If the range tombstone marker is a single bound then only _left_range_tombstone is used.
     * Otherwise, _left_range_tombstone represents the tombstone of the range tombstone that's being closed
     * and _right_range_tombstone represents the tombstone of the range tombstone that's being opened.
     */
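    /*
     * Illustrative example (restating the above, not new protocol): a boundary marker
     * of kind incl_end_excl_start at clustering key ck closes the currently open range
     * tombstone t1 (covering positions up to and including ck) and opens t2 (covering
     * positions after ck), so the parser stores _left_range_tombstone = t1 and
     * _right_range_tombstone = t2 before calling the consumer.
     */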
    tombstone _left_range_tombstone;
    tombstone _right_range_tombstone;

    processing_result_generator _gen;
    temporary_buffer<char>* _processing_data;
    void start_row(row_schema& rs) {
        _row = &rs;
        _row->_columns = _row->_all_columns;
    }
    void setup_columns(row_schema& rs, const std::vector<column_translation::column_info>& columns) {
        rs._all_columns = std::ranges::subrange(columns);
        rs._columns_selector = boost::dynamic_bitset<uint64_t>(columns.size());
    }
    void skip_absent_columns() {
        size_t pos = _row->_columns_selector.find_first();
        if (pos == boost::dynamic_bitset<uint64_t>::npos) {
            pos = _row->_columns.size();
        }
        _row->_columns.advance(pos);
    }
    bool no_more_columns() const { return _row->_columns.empty(); }
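    // A worked example of the selector bookkeeping (illustrative): with 5 columns of
    // which only columns 1 and 3 are present in the row, bit i of _columns_selector is
    // set iff column i is present. skip_absent_columns() advances _columns so that
    // column 1 is at the front; move_to_next_column() computes the current index as
    // _columns_selector.size() - _columns.size() (= 1), finds the next set bit (3) and
    // advances by the difference, leaving column 3 at the front.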
    void move_to_next_column() {
        size_t current_pos = _row->_columns_selector.size() - _row->_columns.size();
        size_t next_pos = _row->_columns_selector.find_next(current_pos);
        size_t jump_to_next = (next_pos == boost::dynamic_bitset<uint64_t>::npos) ? _row->_columns.size()
                                                                                  : next_pos - current_pos;
        _row->_columns.advance(jump_to_next);
    }
    bool is_column_simple() const { return !_row->_columns.front().is_collection; }
    bool is_column_counter() const { return _row->_columns.front().is_counter; }
    const column_translation::column_info& get_column_info() const {
        return _row->_columns.front();
    }
    std::optional<uint32_t> get_column_value_length() const {
        return _row->_columns.front().value_length;
    }
    void setup_ck(const std::vector<std::optional<uint32_t>>& column_value_fix_lengths) {
        _row_key.clear();
        _row_key.reserve(column_value_fix_lengths.size());
        if (column_value_fix_lengths.empty()) {
            _ck_column_value_fix_lengths = std::ranges::subrange(column_value_fix_lengths);
        } else {
            _ck_column_value_fix_lengths = std::ranges::subrange(std::begin(column_value_fix_lengths),
                                                                 std::begin(column_value_fix_lengths) + _ck_size);
        }
        _ck_blocks_header_offset = 0u;
    }
    bool no_more_ck_blocks() const { return _ck_column_value_fix_lengths.empty(); }
    void move_to_next_ck_block() {
        _ck_column_value_fix_lengths.advance(1);
        ++_ck_blocks_header_offset;
        if (_ck_blocks_header_offset == 32u) {
            _ck_blocks_header_offset = 0u;
        }
    }
    std::optional<uint32_t> get_ck_block_value_length() const {
        return _ck_column_value_fix_lengths.front();
    }
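    // _ck_blocks_header packs two flag bits per clustering key block: for the block at
    // offset k within the current group of 32 blocks, bit 2k marks an empty value and
    // bit 2k+1 marks a null value. move_to_next_ck_block() wraps the offset at 32,
    // which makes should_read_block_header() request a fresh header word.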
    bool is_block_empty() const {
        return (_ck_blocks_header & (uint64_t(1) << (2 * _ck_blocks_header_offset))) != 0;
    }
    bool is_block_null() const {
        return (_ck_blocks_header & (uint64_t(1) << (2 * _ck_blocks_header_offset + 1))) != 0;
    }
    bool should_read_block_header() const {
        return _ck_blocks_header_offset == 0u;
    }
public:
    using consumer = Consumer;
    // assumes !primitive_consumer::active()
    bool non_consuming() const {
        return !_consuming;
    }

    data_consumer::processing_result process_state(temporary_buffer<char>& data) {
        _processing_data = &data;
        return _gen.generate();
    }
private:
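    // The parser below is written as a single coroutine (see processing_result_generator)
    // structured as a goto-based state machine. Each `co_yield this->read_*(...)`
    // suspends until the primitive consumer has the requested bytes, and each
    // `co_yield data_consumer::proceed::no` hands control back to the reader (e.g. when
    // its buffer is full); the next process_state() call resumes at the same spot.
    // The `state` enum is maintained alongside so that verify_end_state() can tell where
    // the stream ended and reset() can rebuild the generator when skipping via the index.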
    processing_result_generator do_process_state() {
        if (_state != state::PARTITION_START) {
            goto flags_label;
        }
        partition_start_label: {
            _is_first_unfiltered = true;
            _state = state::DELETION_TIME;
            co_yield this->read_short_length_bytes(*_processing_data, _pk);
            _state = state::OTHER;
            co_yield this->read_32(*_processing_data);
            co_yield this->read_64(*_processing_data);
            deletion_time del;
            del.local_deletion_time = this->_u32;
            del.marked_for_delete_at = this->_u64;
            auto ret = _consumer.consume_partition_start(key_view(to_bytes_view(_pk)), del);
            // After calling the consume function, we can release the
            // buffers we held for it.
            _pk.release();
            _state = state::FLAGS;
            if (ret == data_consumer::proceed::no) {
                co_yield data_consumer::proceed::no;
            }
        }
        flags_label:
        _liveness = {};
        _row_tombstone = {};
        _row_shadowable_tombstone = {};
        co_yield this->read_8(*_processing_data);
        _flags = unfiltered_flags_m(this->_u8);
        _state = state::OTHER;
        if (_flags.is_end_of_partition()) {
            _state = state::PARTITION_START;
            if (_consumer.consume_partition_end() == data_consumer::proceed::no) {
                co_yield data_consumer::proceed::no;
            }
            goto partition_start_label;
        } else if (_flags.is_range_tombstone()) {
            _is_first_unfiltered = false;
            co_yield this->read_8(*_processing_data);
            _range_tombstone_kind = bound_kind_m(this->_u8);
            co_yield this->read_16(*_processing_data);
            _ck_size = this->_u16;
            if (_ck_size == 0) {
                _row_key.clear();
                _range_tombstone_kind = is_start(_range_tombstone_kind)
                    ? bound_kind_m::incl_start : bound_kind_m::incl_end;
                goto range_tombstone_body_label;
            } else {
                _reading_range_tombstone_ck = true;
            }
        } else if (!_flags.has_extended_flags()) {
            _extended_flags = unfiltered_extended_flags_m(uint8_t{0u});
            start_row(_regular_row);
            _ck_size = _column_translation.clustering_column_value_fix_legths().size();
        } else {
            co_yield this->read_8(*_processing_data);
            _extended_flags = unfiltered_extended_flags_m(this->_u8);
            if (_extended_flags.has_cassandra_shadowable_deletion()) {
                throw std::runtime_error("SSTables with Cassandra-style shadowable deletion cannot be read by Scylla");
            }
            if (_extended_flags.is_static()) {
                if (_is_first_unfiltered) {
                    start_row(_static_row);
                    _is_first_unfiltered = false;
                    goto row_body_label;
                } else {
                    throw malformed_sstable_exception("static row should be the first unfiltered in a partition");
                }
            }
            start_row(_regular_row);
            _ck_size = _column_translation.clustering_column_value_fix_legths().size();
        }
        _is_first_unfiltered = false;
        _null_component_occured = false;
        setup_ck(_column_translation.clustering_column_value_fix_legths());
        while (!no_more_ck_blocks()) {
            if (should_read_block_header()) {
                co_yield this->read_unsigned_vint(*_processing_data);
                _ck_blocks_header = this->_u64;
            }
            if (is_block_null()) {
                _null_component_occured = true;
                move_to_next_ck_block();
                continue;
            }
            if (_null_component_occured) {
                throw malformed_sstable_exception("non-null component after null component");
            }
            if (is_block_empty()) {
                _row_key.push_back({});
                move_to_next_ck_block();
                continue;
            }
            read_status status = read_status::waiting;
            if (auto len = get_ck_block_value_length()) {
                status = this->read_bytes(*_processing_data, *len, _column_value);
            } else {
                status = this->read_unsigned_vint_length_bytes(*_processing_data, _column_value);
            }
            co_yield status;
            _row_key.push_back(std::move(_column_value));
            move_to_next_ck_block();
        }
        if (_reading_range_tombstone_ck) {
            _reading_range_tombstone_ck = false;
            goto range_tombstone_body_label;
        }
        row_body_label: {
            co_yield this->read_unsigned_vint(*_processing_data);
            _next_row_offset = this->position() - _processing_data->size() + this->_u64;
            co_yield this->read_unsigned_vint(*_processing_data);
            // Ignore the result (prev_unfiltered_size)
            row_processing_result ret = _extended_flags.is_static()
                ? _consumer.consume_static_row_start()
                : _consumer.consume_row_start(_row_key);

            while (ret == row_processing_result::retry_later) {
                co_yield data_consumer::proceed::no;
                ret = _extended_flags.is_static()
                    ? _consumer.consume_static_row_start()
                    : _consumer.consume_row_start(_row_key);
            }
            if (ret == row_processing_result::skip_row) {
                _state = state::FLAGS;
                auto current_pos = this->position() - _processing_data->size();
                auto maybe_skip_bytes = this->skip(*_processing_data, _next_row_offset - current_pos);
                if (std::holds_alternative<skip_bytes>(maybe_skip_bytes)) {
                    co_yield maybe_skip_bytes;
                }
                goto flags_label;
            }
            if (_extended_flags.is_static()) {
                if (_flags.has_timestamp() || _flags.has_ttl() || _flags.has_deletion()) {
                    throw malformed_sstable_exception(format("Static row has unexpected flags: timestamp={}, ttl={}, deletion={}",
                        _flags.has_timestamp(), _flags.has_ttl(), _flags.has_deletion()));
                }
            } else {
                if (_flags.has_timestamp()) {
                    co_yield this->read_unsigned_vint(*_processing_data);

                    _liveness.set_timestamp(parse_timestamp(_header, this->_u64));
                    if (_flags.has_ttl()) {
                        co_yield this->read_unsigned_vint(*_processing_data);
                        _liveness.set_ttl(parse_ttl(_header, this->_u64));
                        co_yield this->read_unsigned_vint(*_processing_data);
                        _liveness.set_local_deletion_time(parse_expiry(_header, this->_u64));
                    }
                }
                if (_flags.has_deletion()) {
                    co_yield this->read_unsigned_vint(*_processing_data);
                    _row_tombstone.timestamp = parse_timestamp(_header, this->_u64);
                    co_yield this->read_unsigned_vint(*_processing_data);
                    _row_tombstone.deletion_time = parse_expiry(_header, this->_u64);
                }
                if (_extended_flags.has_scylla_shadowable_deletion()) {
                    if (!_has_shadowable_tombstones) {
                        throw malformed_sstable_exception("Scylla shadowable tombstone flag is set but not supported on this SSTable");
                    }
                    co_yield this->read_unsigned_vint(*_processing_data);
                    _row_shadowable_tombstone.timestamp = parse_timestamp(_header, this->_u64);
                    co_yield this->read_unsigned_vint(*_processing_data);
                    _row_shadowable_tombstone.deletion_time = parse_expiry(_header, this->_u64);
                }
                _consumer.consume_row_marker_and_tombstone(
                    _liveness, std::move(_row_tombstone), std::move(_row_shadowable_tombstone));
            }
            if (!_flags.has_all_columns()) {
                co_yield this->read_unsigned_vint(*_processing_data);
                uint64_t missing_column_bitmap_or_count = this->_u64;
                if (_row->_columns.size() < 64) {
                    _row->_columns_selector.clear();
                    _row->_columns_selector.append(missing_column_bitmap_or_count);
                    _row->_columns_selector.flip();
                    _row->_columns_selector.resize(_row->_columns.size());
                    skip_absent_columns();
                    goto column_label;
                }
                _row->_columns_selector.resize(_row->_columns.size());
                if (_row->_columns.size() - missing_column_bitmap_or_count < _row->_columns.size() / 2) {
                    _missing_columns_to_read = _row->_columns.size() - missing_column_bitmap_or_count;
                    _row->_columns_selector.reset();
                } else {
                    _missing_columns_to_read = missing_column_bitmap_or_count;
                    _row->_columns_selector.set();
                }
                while (_missing_columns_to_read > 0) {
                    --_missing_columns_to_read;
                    co_yield this->read_unsigned_vint(*_processing_data);
                    _row->_columns_selector.flip(this->_u64);
                }
                skip_absent_columns();
            } else {
                _row->_columns_selector.set();
            }
        }
        column_label:
        if (_subcolumns_to_read == 0) {
            if (no_more_columns()) {
                // Release the buffer used to read column values.
                _column_value = fragmented_temporary_buffer();
                _state = state::FLAGS;
                if (_consumer.consume_row_end() == data_consumer::proceed::no) {
                    co_yield data_consumer::proceed::no;
                }
                goto flags_label;
            }
            if (!is_column_simple()) {
                if (!_flags.has_complex_deletion()) {
                    _complex_column_tombstone = {};
                } else {
                    co_yield this->read_unsigned_vint(*_processing_data);
                    _complex_column_marked_for_delete = parse_timestamp(_header, this->_u64);
                    co_yield this->read_unsigned_vint(*_processing_data);
                    _complex_column_tombstone = {_complex_column_marked_for_delete, parse_expiry(_header, this->_u64)};
                }
                if (_consumer.consume_complex_column_start(get_column_info(), _complex_column_tombstone) == data_consumer::proceed::no) {
                    co_yield data_consumer::proceed::no;
                }
                co_yield this->read_unsigned_vint(*_processing_data);
                _subcolumns_to_read = this->_u64;
                if (_subcolumns_to_read == 0) {
                    const sstables::column_translation::column_info& column_info = get_column_info();
                    move_to_next_column();
                    if (_consumer.consume_complex_column_end(column_info) == data_consumer::proceed::no) {
                        _consuming = false;
                        co_yield data_consumer::proceed::no;
                        _consuming = true;
                    }
                }
                goto column_label;
            }
            _subcolumns_to_read = 0;
        }
        co_yield this->read_8(*_processing_data);
        _column_flags = column_flags_m(this->_u8);

        if (_column_flags.use_row_timestamp()) {
            _column_timestamp = _liveness.timestamp();
        } else {
            co_yield this->read_unsigned_vint(*_processing_data);
            _column_timestamp = parse_timestamp(_header, this->_u64);
        }
        if (_column_flags.use_row_ttl()) {
            _column_local_deletion_time = _liveness.local_deletion_time();
        } else if (!_column_flags.is_deleted() && !_column_flags.is_expiring()) {
            _column_local_deletion_time = gc_clock::time_point::max();
        } else {
            co_yield this->read_unsigned_vint(*_processing_data);
            _column_local_deletion_time = parse_expiry(_header, this->_u64);
        }
        if (_column_flags.use_row_ttl()) {
            _column_ttl = _liveness.ttl();
        } else if (!_column_flags.is_expiring()) {
            _column_ttl = gc_clock::duration::zero();
        } else {
            co_yield this->read_unsigned_vint(*_processing_data);
            _column_ttl = parse_ttl(_header, this->_u64);
        }
        if (!is_column_simple()) {
            co_yield this->read_unsigned_vint_length_bytes_contiguous(*_processing_data, _cell_path);
        } else {
            _cell_path = temporary_buffer<char>(0);
        }
        if (!_column_flags.has_value()) {
            _column_value = fragmented_temporary_buffer();
        } else {
            read_status status = read_status::waiting;
            if (auto len = get_column_value_length()) {
                status = this->read_bytes(*_processing_data, *len, _column_value);
            } else {
                status = this->read_unsigned_vint_length_bytes(*_processing_data, _column_value);
            }
            co_yield status;
        }
        _consuming = false;
        if (is_column_counter() && !_column_flags.is_deleted()) {
            if (_consumer.consume_counter_column(get_column_info(),
                                                 fragmented_temporary_buffer::view(_column_value),
                                                 _column_timestamp) == data_consumer::proceed::no) {
                co_yield data_consumer::proceed::no;
            }
        } else {
            if (_consumer.consume_column(get_column_info(),
                                         to_bytes_view(_cell_path),
                                         fragmented_temporary_buffer::view(_column_value),
                                         _column_timestamp,
                                         _column_ttl,
                                         _column_local_deletion_time,
                                         _column_flags.is_deleted()) == data_consumer::proceed::no) {
                co_yield data_consumer::proceed::no;
            }
        }
        if (!is_column_simple()) {
            --_subcolumns_to_read;
            if (_subcolumns_to_read == 0) {
                const sstables::column_translation::column_info& column_info = get_column_info();
                move_to_next_column();
                if (_consumer.consume_complex_column_end(column_info) == data_consumer::proceed::no) {
                    co_yield data_consumer::proceed::no;
                }
            }
        } else {
            move_to_next_column();
        }
        _consuming = true;
        goto column_label;
        range_tombstone_body_label:
        co_yield this->read_unsigned_vint(*_processing_data);
        // Ignore result (marker_body_size or row_body_size)
        co_yield this->read_unsigned_vint(*_processing_data);
        // Ignore result (prev_unfiltered_size)
        co_yield this->read_unsigned_vint(*_processing_data);
        _left_range_tombstone.timestamp = parse_timestamp(_header, this->_u64);
        co_yield this->read_unsigned_vint(*_processing_data);
        _left_range_tombstone.deletion_time = parse_expiry(_header, this->_u64);
        if (!is_boundary_between_adjacent_intervals(_range_tombstone_kind)) {
            if (!is_bound_kind(_range_tombstone_kind)) {
                throw sstables::malformed_sstable_exception(
                    format("Corrupted range tombstone: invalid boundary type {}", _range_tombstone_kind));
            }
            _sst->get_stats().on_range_tombstone_read();
            _state = state::FLAGS;
            if (_consumer.consume_range_tombstone(_row_key,
                                                  to_bound_kind(_range_tombstone_kind),
                                                  _left_range_tombstone) == data_consumer::proceed::no) {
                _row_key.clear();
                co_yield data_consumer::proceed::no;
            }
            _row_key.clear();
            goto flags_label;
        }
        co_yield this->read_unsigned_vint(*_processing_data);
        _right_range_tombstone.timestamp = parse_timestamp(_header, this->_u64);
        co_yield this->read_unsigned_vint(*_processing_data);
        _sst->get_stats().on_range_tombstone_read();
        _right_range_tombstone.deletion_time = parse_expiry(_header, this->_u64);
        _state = state::FLAGS;
        if (_consumer.consume_range_tombstone(_row_key,
                                              _range_tombstone_kind,
                                              _left_range_tombstone,
                                              _right_range_tombstone) == data_consumer::proceed::no) {
            _row_key.clear();
            co_yield data_consumer::proceed::no;
        }
        _row_key.clear();
        goto flags_label;
    }
public:

    data_consume_rows_context_m(const schema& s,
                                const shared_sstable& sst,
                                Consumer& consumer,
                                input_stream<char>&& input,
                                uint64_t start,
                                uint64_t maxlen)
        : data_consumer::continuous_data_consumer<data_consume_rows_context_m<Consumer>>(consumer.permit(), std::move(input), start, maxlen)
        , _consumer(consumer)
        , _sst(sst)
        , _header(sst->get_serialization_header())
        , _column_translation(sst->get_column_translation(s, _header, sst->features()))
        , _has_shadowable_tombstones(sst->has_shadowable_tombstones())
        , _gen(do_process_state())
    {
        setup_columns(_regular_row, _column_translation.regular_columns());
        setup_columns(_static_row, _column_translation.static_columns());
    }

    void verify_end_state() {
        // If reading a partial row (i.e., when we have a clustering row
        // filter and are using a promoted index), we may be in the FLAGS
        // state instead of PARTITION_START.
        if (_state == state::FLAGS) {
            _consumer.on_end_of_stream();
            return;
        }

        // We may end up in state::DELETION_TIME after consuming the last partition's end marker
        // and proceeding to attempt to parse the next partition, since state::DELETION_TIME
        // is the first state corresponding to the contents of a new partition.
        if (_state != state::DELETION_TIME
                && (_state != state::PARTITION_START || data_consumer::primitive_consumer::active())) {
            throw malformed_sstable_exception("end of input, but not end of partition");
        }
    }

    void reset(indexable_element el) {
        auto reset_to_state = [this, el] (state s) {
            _state = s;
            _consumer.reset(el);
            _gen = do_process_state();
        };
        switch (el) {
        case indexable_element::partition:
            return reset_to_state(state::PARTITION_START);
        case indexable_element::cell:
            return reset_to_state(state::FLAGS);
        }
        // We should not get here unless some enum member is not handled by the switch
        throw std::logic_error(format("Unable to reset - unknown indexable element: {}", el));
    }

    // Call after a reverse index skip is performed during reversed reads.
    void reset_after_reversed_read_skip() {
        // During reversed reads the source always returns whole rows,
        // even when we perform an index skip in the middle of a row.
        // Thus we must not reset the parser state as we do in a regular reset.
        // We only need to inform the consumer.
        _consumer.reset_after_reversed_read_skip();
    }

    reader_permit& permit() {
        return _consumer.permit();
    }
};
|
|
|
|
class mx_sstable_mutation_reader : public mp_row_consumer_reader_mx {
|
|
using DataConsumeRowsContext = data_consume_rows_context_m<mp_row_consumer_m>;
|
|
using Consumer = mp_row_consumer_m;
|
|
static_assert(RowConsumer<Consumer>);
|
|
value_or_reference<query::partition_slice> _slice_holder;
|
|
const query::partition_slice& _slice;
|
|
Consumer _consumer;
|
|
bool _will_likely_slice = false;
|
|
bool _read_enabled = true;
|
|
std::unique_ptr<DataConsumeRowsContext> _context;
|
|
std::unique_ptr<abstract_index_reader> _index_reader;
|
|
// We avoid unnecessary lookup for single partition reads thanks to this flag
|
|
bool _single_partition_read = false;
|
|
std::reference_wrapper<const dht::partition_range> _pr;
|
|
std::optional<utils::hashed_key> _single_partition_read_murmur_hash;
|
|
streamed_mutation::forwarding _fwd;
|
|
mutation_reader::forwarding _fwd_mr;
|
|
read_monitor& _monitor;
|
|
integrity_check _integrity;
|
|
lw_shared_ptr<checksum> _checksum;
|
|
|
|
// For reversed (single partition) reads, points to the current position in the sstable
|
|
// of the reversing data source used underneath (see `partition_reversing_data_source`).
|
|
// Engaged after `_context` is engaged, i.e. after `initialize()`.
|
|
const uint64_t* _reversed_read_sstable_position;
|
|
public:
|
|
mx_sstable_mutation_reader(shared_sstable sst,
|
|
schema_ptr schema,
|
|
reader_permit permit,
|
|
const dht::partition_range& pr,
|
|
value_or_reference<query::partition_slice> slice,
|
|
tracing::trace_state_ptr trace_state,
|
|
streamed_mutation::forwarding fwd,
|
|
mutation_reader::forwarding fwd_mr,
|
|
read_monitor& mon,
|
|
integrity_check integrity,
|
|
std::unique_ptr<abstract_index_reader> ir,
|
|
const utils::hashed_key* single_partition_read_murmur_hash)
|
|
: mp_row_consumer_reader_mx(std::move(schema), permit, std::move(sst))
|
|
, _slice_holder(std::move(slice))
|
|
, _slice(_slice_holder.get())
|
|
, _consumer(this, _schema, std::move(permit), _slice, std::move(trace_state), fwd, _sst)
|
|
, _index_reader(std::move(ir))
|
|
// FIXME: I want to add `&& fwd_mr == mutation_reader::forwarding::no` below
|
|
// but can't because many call sites use the default value for
|
|
// `mutation_reader::forwarding` which is `yes`.
|
|
, _single_partition_read(pr.is_singular())
|
|
, _pr(pr)
|
|
, _single_partition_read_murmur_hash(
|
|
single_partition_read_murmur_hash
|
|
? std::optional<utils::hashed_key>(*single_partition_read_murmur_hash)
|
|
: std::nullopt)
|
|
, _fwd(fwd)
|
|
, _fwd_mr(fwd_mr)
|
|
, _monitor(mon)
|
|
, _integrity(integrity) {
|
|
sstlog.trace("mx_sstable_mutation_reader {}: init with _pr={}", fmt::ptr(this), _pr.get());
|
|
if (reversed()) {
|
|
if (!_single_partition_read) {
|
|
on_internal_error(sstlog, format(
|
|
// Not only in the reader, they are disabled in CQL.
|
|
"mx reader: multi-partition reversed queries are not supported yet;"
|
|
" partition range: {}", pr));
|
|
}
|
|
// FIXME: if only the defaults were better...
|
|
//parse_assert(fwd_mr == mutation_reader::forwarding::no);
|
|
}
|
|
}
|
|
|
|
// Reference to _consumer is passed to data_consume_rows() in the constructor so we must not allow move/copy
|
|
mx_sstable_mutation_reader(mx_sstable_mutation_reader&&) = delete;
|
|
mx_sstable_mutation_reader(const mx_sstable_mutation_reader&) = delete;
|
|
~mx_sstable_mutation_reader() {
|
|
if (_context || _index_reader) {
|
|
sstlog.warn("sstable_mutation_reader was not closed. Closing in the background. Backtrace: {}", current_backtrace());
|
|
// FIXME: discarded future.
|
|
(void)close();
|
|
}
|
|
}
|
|
private:
|
|
static bool will_likely_slice(const query::partition_slice& slice) {
|
|
return (!slice.default_row_ranges().empty() && !slice.default_row_ranges()[0].is_full())
|
|
|| slice.get_specific_ranges();
|
|
}
|
|
abstract_index_reader& get_index_reader() {
|
|
return *_index_reader;
|
|
}
|
|
    future<> advance_to_next_partition() {
        sstlog.trace("reader {}: advance_to_next_partition()", fmt::ptr(this));
        _before_partition = true;
        auto& consumer = _consumer;
        if (consumer.is_mutation_end()) {
            sstlog.trace("reader {}: already at partition boundary", fmt::ptr(this));
            _index_in_current_partition = false;
            return make_ready_future<>();
        }
        return (_index_in_current_partition
                ? _index_reader->advance_to_next_partition()
                : get_index_reader().advance_past_definitely_present_partition(*_current_partition_key))
        .then([this] {
            _index_in_current_partition = true;
            auto [start, end] = _index_reader->data_file_positions();
            if (end && start > *end) {
                _read_enabled = false;
                return make_ready_future<>();
            }
            parse_assert(_index_reader->element_kind() == indexable_element::partition, _sst->get_filename());
            return skip_to(_index_reader->element_kind(), start).then([this] {
                _sst->get_stats().on_partition_seek();
            });
        });
    }
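    // Serves the partition start (key and tombstone) from the index when the
    // index has that data at hand, so the partition header does not have to
    // be parsed from the data file. Falls back to the data file when the
    // index cannot provide the tombstone or the key.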
    future<> read_from_index() {
        sstlog.trace("reader {}: read from index", fmt::ptr(this));
        auto tomb = _index_reader->partition_tombstone();
        if (!tomb) {
            sstlog.trace("reader {}: no tombstone", fmt::ptr(this));
            return read_from_datafile();
        }
        std::optional<partition_key> pk = _index_reader->get_partition_key();
        if (!pk) {
            sstlog.trace("reader {}: no partition key", fmt::ptr(this));
            return read_from_datafile();
        }
        auto key = dht::decorate_key(*_schema, std::move(*pk));
        _consumer.setup_for_partition(key.key());
        on_next_partition(std::move(key), tombstone(*tomb));
        return make_ready_future<>();
    }
    future<> read_from_datafile() {
        sstlog.trace("reader {}: read from data file", fmt::ptr(this));
        return _context->consume_input();
    }
    // Assumes that we're currently positioned at partition boundary.
    future<> read_partition() {
        sstlog.trace("reader {}: reading partition", fmt::ptr(this));

        _end_of_stream = true; // on_next_partition() will set it to false
        if (!_read_enabled) {
            sstlog.trace("reader {}: eof", fmt::ptr(this));
            return make_ready_future<>();
        }

        if (_saved_partition_tombstone) {
            // This is the special case where we ended reading last partition range
            // only after parsing the partition key after the range,
            // and then the reader was forwarded to that key.
            // Since the parser can't be moved back, we serve the partition key
            // and the tombstone that we saved after parsing.
            auto tomb = *_saved_partition_tombstone;
            _saved_partition_tombstone.reset();
            sstlog.trace("reader {}: serving partition key {} due to _saved_partition_tombstone {}", fmt::ptr(this), *_current_partition_key, tomb);
            on_next_partition(*_current_partition_key, tomb);
            return make_ready_future<>();
        }

        if (!_consumer.is_mutation_end()) {
            throw malformed_sstable_exception(format("consumer not at partition boundary, position: {}",
                    position_in_partition_view::printer(*_schema, _consumer.position())), _sst->get_filename());
        }

        // It's better to obtain partition information from the index if we already have it.
        // We can save on IO if the user will skip past the front of partition immediately.
        //
        // It is also better to pay the cost of reading the index if we know that we will
        // need to use the index anyway soon.
        //
        if (_index_in_current_partition) {
            if (_context->eof()) {
                sstlog.trace("reader {}: eof", fmt::ptr(this));
                return make_ready_future<>();
            }
            if (_index_reader->partition_data_ready()) {
                return read_from_index();
            }
            if (_will_likely_slice) {
                return _index_reader->read_partition_data().then([this] {
                    return read_from_index();
                });
            }
        }

        // FIXME: advance index to current partition if _will_likely_slice
        return read_from_datafile();
    }
    // Can be called from any position.
    future<> read_next_partition() {
        sstlog.trace("reader {}: read next partition", fmt::ptr(this));
        // If next partition exists then on_next_partition will be called
        // and _end_of_stream will be set to false again.
        _end_of_stream = true;
        if (!_read_enabled || _single_partition_read) {
            sstlog.trace("reader {}: eof", fmt::ptr(this));
            return make_ready_future<>();
        }
        return advance_to_next_partition().then([this] {
            return read_partition();
        });
    }
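    // Skips the underlying data stream forward so that the next fragment
    // parsed is at or after `pos` (a position in the current partition),
    // using the index to find the target data-file offset. A disengaged
    // `pos` means the consumer requested no skip.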
    future<> advance_context(std::optional<position_in_partition_view> pos) {
        if (!pos || pos->is_before_all_fragments(*_schema)) {
            return make_ready_future<>();
        }
        parse_assert(bool(_current_partition_key), _sst->get_filename());
        return [this] {
            if (!_index_in_current_partition) {
                _index_in_current_partition = true;
                // FIXME reversed multi partition reads
                return get_index_reader().advance_to_definitely_present_partition(*_current_partition_key);
            }
            return make_ready_future();
        }().then([this, pos = *pos] {
            if (reversed()) {
                // The position `pos` conforms to the query schema (it is the start of a reversed range),
                // which is reversed w.r.t. the table schema. We use the table schema in index_reader,
                // so we need to unreverse `pos` before passing it into index_reader.
                auto rev_pos = pos.reversed();
                return get_index_reader().advance_reverse(std::move(rev_pos)).then([this] {
                    // The reversing data source will notice the skip and update the data ranges
                    // from which it prepares the data given to us.

                    parse_assert(_reversed_read_sstable_position, _sst->get_filename());
                    auto ip = _index_reader->data_file_positions();
                    if (ip.end >= *_reversed_read_sstable_position) {
                        // The reversing data source was already ahead (in reverse - its position was smaller)
                        // than the index. We must not update the current range tombstone in this case
                        // or reset the context since all fragments up to the new position of the index
                        // will be (or already have been) provided to the context by the source.
                        return;
                    }

                    _context->reset_after_reversed_read_skip();

                    _sst->get_stats().on_partition_seek();
                    auto open_end_marker = _index_reader->reverse_end_open_marker();
                    if (open_end_marker) {
                        _consumer.set_range_tombstone(open_end_marker->tomb);
                    } else {
                        _consumer.set_range_tombstone({});
                    }
                });
            } else {
                return get_index_reader().advance_to(pos).then([this] {
                    abstract_index_reader& idx = *_index_reader;
                    auto index_position = idx.data_file_positions();
                    if (index_position.start <= _context->position()) {
                        return make_ready_future<>();
                    }
                    return skip_to(idx.element_kind(), index_position.start).then([this, &idx] {
                        _sst->get_stats().on_partition_seek();
                        auto open_end_marker = idx.end_open_marker();
                        if (open_end_marker) {
                            _consumer.set_range_tombstone(open_end_marker->tomb);
                        } else {
                            _consumer.set_range_tombstone({});
                        }
                    });
                });
            }
        });
    }
    bool is_initialized() const {
        return bool(_context);
    }
    // Returns true if reader is initialized, by either a previous or current request
    future<bool> maybe_initialize() {
        if (is_initialized()) {
            co_return true;
        }

        _will_likely_slice = will_likely_slice(_slice);

        if (_single_partition_read) {
            _sst->get_stats().on_single_partition_read();
            const auto& key = dht::ring_position_view(_pr.get().start()->value());

            const auto present = _single_partition_read_murmur_hash
                ? co_await get_index_reader().advance_lower_and_check_if_present(key, *_single_partition_read_murmur_hash)
                : co_await get_index_reader().advance_lower_and_check_if_present(key);

            if (!present) {
                _sst->get_filter_tracker().add_false_positive();
                co_return false;
            }

            if (_will_likely_slice && !reversed()) {
                // Warm up the clustered cursor using lower bound so that later upper bound lookup
                // works on a populated cached_promoted_index.
                // We use lower bound so that we read the same blocks as later lower-bound slicing would,
                // so that we don't incur extra IO for cases where looking up upper bound is not worth it, that
                // is when upper bound is far from the lower bound. If upper bound is near lower bound, then
                // warming up using lower bound will populate cached_promoted_index with blocks which will
                // allow us to locate the upper bound block accurately.
                // This is especially important for single-row reads, where the bounds are around the same key.
                // In this case we want to read the data file range which belongs to a single promoted index block.
                // It doesn't matter that the upper bound is not exactly the same. They both will likely lie in the
                // same block, and if not, binary search will bring adjacent blocks into cache.
                // Even if upper bound is not near, the binary search will populate the cache with blocks
                // which can be used to narrow down the data file range somewhat.
                position_in_partition_view lb = get_slice_lower_bound(*_schema, _slice, key);
                co_await _index_reader->prefetch_lower_bound(lb);
            }

            position_in_partition_view pos = get_slice_upper_bound(*_schema, _slice, key);
            co_await _index_reader->advance_upper_past(pos);

            _sst->get_filter_tracker().add_true_positive();
            if (reversed()) {
                co_await _index_reader->advance_reverse_to_next_partition();
            }
        } else {
            _sst->get_stats().on_range_partition_read();
            co_await get_index_reader().advance_to(_pr);
        }

        auto [begin, end] = _index_reader->data_file_positions();
        parse_assert(bool(end), _sst->get_filename());

        sstlog.trace("sstable_reader: {}: data file range [{}, {})", fmt::ptr(this), begin, *end);

        if (_integrity) {
            // Caller must retain a reference to checksum component while in use by the stream.
            _checksum = co_await _sst->read_checksum();
            // The stream checks the digest only if the read range covers all data.
            if (begin == 0 && *end == _sst->data_size()) {
                co_await _sst->read_digest();
            }
        }

        if (_single_partition_read) {
            _read_enabled = (begin != *end);
            if (reversed()) {
                if (_integrity) {
                    on_internal_error(sstlog, "mx reader: integrity checking not supported for single-partition reversed reads");
                }
                auto reversed_context = data_consume_reversed_partition<DataConsumeRowsContext>(
                        *_schema, _sst, *_index_reader, _consumer, { begin, *end });
                _context = std::move(reversed_context.the_context);
                _reversed_read_sstable_position = &reversed_context.current_position_in_sstable;
            } else {
                _context = co_await data_consume_single_partition<DataConsumeRowsContext>(*_schema, _sst, _consumer, { begin, *end }, _integrity);
            }
        } else {
            sstable::disk_read_range drr{begin, *end};
            auto last_end = _fwd_mr ? _sst->data_size() : drr.end;
            _read_enabled = bool(drr);
            _context = co_await data_consume_rows<DataConsumeRowsContext>(*_schema, _sst, _consumer, std::move(drr), last_end, _integrity);
        }

        _monitor.on_read_started(_context->reader_position());
        _index_in_current_partition = true;
        co_return true;
    }
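    // Forwards the data-file parser to the absolute data-file offset `begin`,
    // resetting the parser state to expect an element of kind `el` there.
    // Does nothing if the parser is already at or past `begin`.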
    future<> skip_to(indexable_element el, uint64_t begin) {
        sstlog.trace("sstable_reader: {}: skip_to({} -> {}, el={})", fmt::ptr(_context.get()), _context->position(), begin, static_cast<int>(el));
        if (begin <= _context->position()) {
            return make_ready_future<>();
        }
        _context->reset(el);
        return _context->skip_to(begin);
    }
    bool reversed() const {
        return _slice.is_reversed();
    }
public:
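    // Called by the consumer when the parser moves past the clustering
    // range the reader was set to. Under streamed_mutation::forwarding the
    // stream simply ends (until the next fast_forward_to()); otherwise the
    // partition is closed with a partition_end fragment.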
    void on_out_of_clustering_range() override {
        if (_fwd == streamed_mutation::forwarding::yes) {
            _end_of_stream = true;
        } else {
            this->push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, partition_end()));
            _partition_finished = true;
        }
    }
    // Advances the index to the first position
    // which hasn't been crossed by the Data file parser yet.
    future<> advance_index_until_unseen_partition() {
        while (true) {
            auto [start, end] = _index_reader->data_file_positions();
            if (start >= _context->position()) {
                sstlog.trace("mp_row_consumer_reader_mx {}: advance_index_until_unseen_partition(): advanced to {}", fmt::ptr(this), start);
                co_return;
            } else {
                co_await _index_reader->advance_to_next_partition();
            }
        }
    }
    virtual future<> fast_forward_to(const dht::partition_range& pr) override {
        if (reversed()) {
            // FIXME
            on_internal_error(sstlog, "mx reader: fast_forward_to(partition_range) not supported for reversed queries");
        }

        return maybe_initialize().then([this, &pr] (bool initialized) {
            _pr = pr;
            sstlog.trace("mp_row_consumer_reader_mx {}: fast_forward_to({})", fmt::ptr(this), _pr.get());
            if (!initialized) {
                _end_of_stream = true;
                return make_ready_future<>();
            } else {
                clear_buffer();
                _partition_finished = true;
                _before_partition = true;
                _end_of_stream = false;
                parse_assert(bool(_index_reader), _sst->get_filename());
                auto f1 = _index_reader->advance_to(pr);
                return f1.then([this] {
                    auto [start, end] = _index_reader->data_file_positions();
                    parse_assert(bool(end), _sst->get_filename());
                    sstlog.trace("mp_row_consumer_reader_mx {}: fast_forward_to({}), index returned range [{}, {}), parser currently at {}", fmt::ptr(this), _pr.get(), start, *end, _context->position());
                    if (start < _context->position()) {
                        sstlog.trace("mp_row_consumer_reader_mx {}: _saved_partition_tombstone={}", fmt::ptr(this), _saved_partition_tombstone);
                        // If we got here, the index returned a Data start which precedes
                        // the data parser's position.
                        // But by contract, fast_forward_to can only be used to move the reader forward.
                        // The new range must lie after the old range.
                        //
                        // So there are two ways for this to happen:
                        // 1. When reading the last range, the parser was advanced past the first
                        //    partition key lying after the range.
                        //    (This is signified by _saved_partition_tombstone).
                        //    And then, the reader was advanced to a new range that
                        //    to the best knowledge of the index, might contain that exact key.
                        //    In this case, we have to start reading from that key (which we have remembered).
                        // 2. The parser was not advanced past the last range, but the inexact
                        //    index returned a start position for the new range which in reality
                        //    lies inside the old range. In this case, we should start reading
                        //    from the first partition with position greater or equal to the parser's position.

                        // The index *could* be in the current position,
                        // but setting `false` here is fine.
                        // Worst case, the reader will forward the index to the current position again.
                        // Lack of this optimization doesn't matter to a range read.
                        _index_in_current_partition = false;
                        if (_saved_partition_tombstone) {
                            // Case 1 from the comment above.
                            if (*end >= _context->position()) {
                                _read_enabled = true;
                                return _context->fast_forward_to(_context->position(), *end);
                            } else {
                                _read_enabled = false;
                                return make_ready_future<>();
                            }
                        } else {
                            // Case 2 from the comment above.
                            return advance_index_until_unseen_partition().then([this] {
                                auto [start, end] = _index_reader->data_file_positions();
                                _read_enabled = true;
                                _context->reset(indexable_element::partition);
                                return _context->fast_forward_to(start, *end);
                            });
                        }
                    }
                    if (start != *end) {
                        _read_enabled = true;
                        _index_in_current_partition = true;
                        _saved_partition_tombstone.reset();
                        _context->reset(indexable_element::partition);
                        return _context->fast_forward_to(start, *end);
                    }
                    _index_in_current_partition = false;
                    _read_enabled = false;
                    return make_ready_future<>();
                });
            }
        });
    }
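    // Fills the reader's buffer: lazily initializes the reader on first use,
    // then alternates between positioning at the next partition and draining
    // ready fragments from the consumer until the buffer is full or the
    // stream ends. An abort raised via the permit (e.g. on read timeout) is
    // returned as a failed future rather than thrown.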
    virtual future<> fill_buffer() override {
        if (_end_of_stream) {
            return make_ready_future<>();
        }
        if (!is_initialized()) {
            return maybe_initialize().then([this] (bool initialized) {
                if (!initialized) {
                    _end_of_stream = true;
                    return make_ready_future<>();
                } else {
                    return fill_buffer();
                }
            });
        }
        return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
            if (utils::get_local_injector().enter("sstables_mx_reader_fill_buffer_timeout")) {
                const sstring table_name = utils::get_local_injector().get_injection_parameters("sstables_mx_reader_fill_buffer_timeout")["table"];
                const sstring this_table_name = format("{}.{}", _schema->ks_name(), _schema->cf_name());
                // Repeat the sleep until the permit is aborted due to timeout.
                if (table_name == this_table_name && !get_abort_exception()) {
                    return seastar::sleep(std::chrono::milliseconds(10));
                }
            }
            if (_partition_finished) {
                if (const auto& ex = get_abort_exception(); ex) {
                    return make_exception_future<>(ex);
                }
                if (_before_partition) {
                    return read_partition();
                } else {
                    return read_next_partition();
                }
            } else {
                return do_until([this] { return is_buffer_full() || _partition_finished || _end_of_stream; }, [this] {
                    _consumer.push_ready_fragments();
                    if (is_buffer_full() || _partition_finished || _end_of_stream) {
                        return make_ready_future<>();
                    }
                    if (const auto& ex = get_abort_exception(); ex) {
                        return make_exception_future<>(ex);
                    }
                    return advance_context(_consumer.maybe_skip()).then([this] {
                        return _context->consume_input();
                    });
                });
            }
        }).handle_exception([this] (std::exception_ptr ep) {
            if (auto e = try_catch<sstables::malformed_sstable_exception>(ep); e) {
                return make_exception_future<>(sstables::malformed_sstable_exception(format("Failed to read partition from SSTable {} due to {}", _sst->get_filename(), e->what())));
            }
            return make_exception_future<>(std::move(ep));
        });
    }
    virtual future<> next_partition() override {
        // If the reader is not initialized yet, next_partition() has no effect,
        // because no partition_start has been emitted yet.
        if (is_initialized()) {
            if (_fwd == streamed_mutation::forwarding::yes) {
                clear_buffer();
                _partition_finished = true;
                _end_of_stream = false;
            } else {
                clear_buffer_to_next_partition();
                if (!_partition_finished && is_buffer_empty()) {
                    _partition_finished = true;
                }
            }
        }
        return make_ready_future<>();
    }
    virtual future<> fast_forward_to(position_range cr) override {
        clear_buffer();
        if (!_partition_finished) {
            _end_of_stream = false;
            return advance_context(_consumer.fast_forward_to(std::move(cr)));
        } else {
            _end_of_stream = true;
            return make_ready_future<>();
        }
    }
    virtual future<> close() noexcept override {
        auto close_context = make_ready_future<>();
        if (_context) {
            _monitor.on_read_completed();
            // move _context to prevent double-close from destructor.
            close_context = _context->close().finally([_ = std::move(_context)] {});
        }

        auto close_index_reader = make_ready_future<>();
        if (_index_reader) {
            // move _index_reader to prevent double-close from destructor.
            close_index_reader = _index_reader->close().finally([_ = std::move(_index_reader)] {});
        }

        return when_all_succeed(std::move(close_context), std::move(close_index_reader)).discard_result().handle_exception([] (std::exception_ptr ep) {
            // close can not fail as it is called either from the destructor or from mutation_reader::close
            sstlog.warn("Failed closing of sstable_mutation_reader: {}. Ignored since the reader is already done.", ep);
        });
    }

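    // Overload of on_next_partition() which filters the parsed partition
    // key against the query range before handing it to the base
    // implementation.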
    data_consumer::proceed on_next_partition(dht::decorated_key key, tombstone tomb) override {
        if (_pr.get().before(key, dht::ring_position_comparator(*_schema))) {
            sstlog.trace("mp_row_consumer_reader_mx {}: on_next_partition({}), _pr={}, skipping key before range", fmt::ptr(this), key, _pr.get());
            // If we got here, then the index returned a Data file range which
            // includes some partitions before the queried range.
            //
            // A BTI index is inexact in general (it can return a Data file range
            // which includes some partitions before and after the queried range),
            // but it is guaranteed to return the exact position if it's queried
            // for a key which is present in the sstable.
            //
            // So, for a single partition read, if we parsed a partition which is before
            // the queried range, it means that the queried partition doesn't exist in the sstable.
            // (I.e. the index gave us a false positive).
            // In this case, the read is over. There's nothing to read.
            //
            // Otherwise, for range reads, we are going to skip this partition
            // (i.e. advance the index to the range after `key`) and resume reading.
            _end_of_stream = _single_partition_read;
            _before_partition = false;
            _partition_finished = true;
            _current_partition_key = std::move(key);
            return data_consumer::proceed::no;
        } else if (_pr.get().after(key, dht::ring_position_comparator(*_schema))) {
            // If we got here, then the index returned a Data file range which
            // includes some partitions after the queried range.
            // The read is over. The new key and everything after it should be ignored.
            sstlog.trace("mp_row_consumer_reader_mx {}: on_next_partition({}), _pr={}, skipping key after range", fmt::ptr(this), key, _pr.get());
            _end_of_stream = true;
            // The read is over for now, but the reader can be later forwarded to the key we just read.
            // The parser can't move backwards, so we have to remember the key and tombstone
            // to handle that case.
            _saved_partition_tombstone = tomb;
            _current_partition_key = std::move(key);
            return data_consumer::proceed::no;
        } else {
            // This is the normal path.
            sstlog.trace("mp_row_consumer_reader_mx {}: on_next_partition({}), _pr={}, consuming key in range", fmt::ptr(this), key, _pr.get());
            return mp_row_consumer_reader_mx::on_next_partition(std::move(key), tomb);
        }
    }
};

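// Internal helper: the public make_reader() overloads below differ only in
// how they accept the slice (by reference or by value) and both delegate
// here.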
static mutation_reader make_reader(
        shared_sstable sstable,
        schema_ptr schema,
        reader_permit permit,
        const dht::partition_range& range,
        value_or_reference<query::partition_slice> slice,
        tracing::trace_state_ptr trace_state,
        streamed_mutation::forwarding fwd,
        mutation_reader::forwarding fwd_mr,
        read_monitor& monitor,
        integrity_check integrity,
        std::unique_ptr<abstract_index_reader> ir,
        const utils::hashed_key* single_partition_read_murmur_hash) {
    return make_mutation_reader<mx_sstable_mutation_reader>(
        std::move(sstable), std::move(schema), std::move(permit), range,
        std::move(slice), std::move(trace_state), fwd, fwd_mr, monitor, integrity, std::move(ir), single_partition_read_murmur_hash);
}

mutation_reader make_reader(
        shared_sstable sstable,
        schema_ptr schema,
        reader_permit permit,
        const dht::partition_range& range,
        const query::partition_slice& slice,
        tracing::trace_state_ptr trace_state,
        streamed_mutation::forwarding fwd,
        mutation_reader::forwarding fwd_mr,
        read_monitor& monitor,
        integrity_check integrity,
        std::unique_ptr<abstract_index_reader> ir,
        const utils::hashed_key* single_partition_read_murmur_hash) {
    return make_reader(std::move(sstable), std::move(schema), std::move(permit), range,
            value_or_reference(slice), std::move(trace_state), fwd, fwd_mr, monitor, integrity, std::move(ir), single_partition_read_murmur_hash);
}

mutation_reader make_reader(
        shared_sstable sstable,
        schema_ptr schema,
        reader_permit permit,
        const dht::partition_range& range,
        query::partition_slice&& slice,
        tracing::trace_state_ptr trace_state,
        streamed_mutation::forwarding fwd,
        mutation_reader::forwarding fwd_mr,
        read_monitor& monitor,
        integrity_check integrity,
        std::unique_ptr<abstract_index_reader> ir,
        const utils::hashed_key* single_partition_read_murmur_hash) {
    return make_reader(std::move(sstable), std::move(schema), std::move(permit), range,
            value_or_reference(std::move(slice)), std::move(trace_state), fwd, fwd_mr, monitor, integrity, std::move(ir), single_partition_read_murmur_hash);
}

/// A reader which does not support seeking to a given position.
///
/// Unlike mx_sstable_mutation_reader, which supports fast-forwarding reads,
/// mx_sstable_full_scan_reader:
///
/// - always reads the full range; it is not able to read a subset of the
///   sstable
/// - does not support fast forwarding
///
/// It is designed to be used in conditions where:
/// - the index is not reliable, or
/// - the consumer reads the whole sstable
class mx_sstable_full_scan_reader : public mp_row_consumer_reader_mx {
    using DataConsumeRowsContext = data_consume_rows_context_m<mp_row_consumer_m>;
    using Consumer = mp_row_consumer_m;
    static_assert(RowConsumer<Consumer>);
    Consumer _consumer;
    std::unique_ptr<DataConsumeRowsContext> _context;
    read_monitor& _monitor;
    integrity_check _integrity;
    lw_shared_ptr<checksum> _checksum;
public:
    mx_sstable_full_scan_reader(shared_sstable sst, schema_ptr schema,
            reader_permit permit,
            tracing::trace_state_ptr trace_state,
            read_monitor& mon,
            integrity_check integrity)
        : mp_row_consumer_reader_mx(std::move(schema), permit, std::move(sst))
        , _consumer(this, _schema, std::move(permit), _schema->full_slice(), std::move(trace_state), streamed_mutation::forwarding::no, _sst)
        , _monitor(mon)
        , _integrity(integrity) {}
private:
    bool is_initialized() const {
        return bool(_context);
    }
    future<> maybe_initialize() {
        if (is_initialized()) {
            co_return;
        }
        if (_integrity) {
            // Caller must retain a reference to checksum component while in use by the stream.
            _checksum = co_await _sst->read_checksum();
            co_await _sst->read_digest();
        }
        _context = co_await data_consume_rows<DataConsumeRowsContext>(*_schema, _sst, _consumer, _integrity);
        _monitor.on_read_started(_context->reader_position());
    }
public:
    void on_out_of_clustering_range() override { }
    virtual future<> fast_forward_to(const dht::partition_range& pr) override {
        on_internal_error(sstlog, "mx_sstable_full_scan_reader: doesn't support fast_forward_to(const dht::partition_range&)");
    }
    virtual future<> fast_forward_to(position_range cr) override {
        on_internal_error(sstlog, "mx_sstable_full_scan_reader: doesn't support fast_forward_to(position_range)");
    }
    virtual future<> next_partition() override {
        on_internal_error(sstlog, "mx_sstable_full_scan_reader: doesn't support next_partition()");
    }
    virtual future<> fill_buffer() override {
        if (_end_of_stream) {
            return make_ready_future<>();
        }
        if (!is_initialized()) {
            return maybe_initialize().then([this] { return fill_buffer(); });
        }
        if (_context->eof()) {
            _end_of_stream = true;
            return make_ready_future<>();
        }
        return _context->consume_input();
    }
    virtual future<> close() noexcept override {
        if (!_context) {
            return make_ready_future<>();
        }
        _monitor.on_read_completed();
        return _context->close().handle_exception([_ = std::move(_context)] (std::exception_ptr ep) {
            sstlog.warn("Failed closing of mx_sstable_full_scan_reader: {}. Ignored since the reader is already done.", ep);
        });
    }
};

mutation_reader make_full_scan_reader(
        shared_sstable sstable,
        schema_ptr schema,
        reader_permit permit,
        tracing::trace_state_ptr trace_state,
        read_monitor& monitor,
        integrity_check integrity) {
    return make_mutation_reader<mx_sstable_full_scan_reader>(std::move(sstable), std::move(schema), std::move(permit),
            std::move(trace_state), monitor, integrity);
}

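// Base implementation: unconditionally starts the new partition and emits
// its partition_start fragment. The mx reader's override above filters the
// key against the query range before delegating here.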
data_consumer::proceed mp_row_consumer_reader_mx::on_next_partition(dht::decorated_key key, tombstone tomb) {
    _partition_finished = false;
    _before_partition = false;
    _end_of_stream = false;
    _current_partition_key = std::move(key);
    push_mutation_fragment(
            mutation_fragment_v2(*_schema, _permit, partition_start(*_current_partition_key, tomb)));
    _sst->get_stats().on_partition_read();
    return data_consumer::proceed::yes;
}

// A validating consumer implementing the Consumer concept of data_consume_rows_context_m.
//
// It consumes the atoms coming from the parser and validates that the mutation
// fragment stream they form is valid, namely it checks:
// * partition ordering
// * mutation fragment kind ordering
// * clustering element ordering
// * partitions being ended properly (before EOS)
// * range tombstones being ended properly (before end-of-partition)
//
// For this, it relies on the mutation_fragment_stream_validator.
//
// It also allows for checking that partitions and clustering keys are laid out
// as expected by the index and promoted index respectively.
class validating_consumer {
public:
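    // A single promoted-index block, as reported by the index: clustering
    // elements parsed from the data file are expected to fall inside
    // [start, end]; `done` is set once the block's end has been reached.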
    struct clustering_block {
        position_in_partition start;
        position_in_partition end;
        bool done = false;

        clustering_block(position_in_partition_view start, position_in_partition_view end) : start(start), end(end) {
        }
    };

private:
    schema_ptr _schema;
    reader_permit _permit;
    std::function<void(sstring)> _error_handler;
    // For static-compact tables C* stores the only row in the static row but in our representation they're regular rows.
    const bool _treat_static_row_as_regular;
    mutation_fragment_stream_validator _validator;
    uint64_t _error_count = 0;
    std::optional<partition_key> _expected_pkey;
    std::optional<partition_key> _last_pkey;
    std::optional<clustering_block> _expected_clustering_block;
    position_in_partition _current_pos;
    bool _stop_after_partition_header = false;

private:
    clustering_key from_fragmented_buffer(const std::vector<fragmented_temporary_buffer>& ecp) {
        return clustering_key_prefix::from_range(ecp | std::views::transform(
                [] (const fragmented_temporary_buffer& b) { return fragmented_temporary_buffer::view(b); }));
    }
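    // Feeds the fragment kind and current position to the stream validator
    // and, when inside a clustering region, additionally checks the position
    // against the promoted-index block the index reported (if any).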
    void validate_fragment_order(mutation_fragment_v2::kind kind, std::optional<tombstone> new_current_tombstone) {
        if (auto res = _validator(kind, _current_pos, new_current_tombstone); !res) {
            report_error(res.what());
            _validator.reset(kind, _current_pos, new_current_tombstone);
        }
        if (_current_pos.region() != partition_region::clustered) {
            return;
        }
        if (_expected_clustering_block) {
            auto cmp = position_in_partition::tri_compare(*_schema);
            const auto cmp_start = cmp(_expected_clustering_block->start, _current_pos);
            const auto cmp_end = cmp(_expected_clustering_block->end, _current_pos);
            if (cmp_start > 0 || cmp_end < 0) {
                if (_expected_clustering_block->done) {
                    report_error(format("mismatching index/data: promoted index has no more blocks, but partition {} ({}) has more rows",
                            _validator.previous_partition_key().key().with_schema(*_schema),
                            _validator.previous_partition_key().key()));
                } else {
                    report_error(format("mismatching index/data: clustering element {} is outside of current promoted-index block [{}, {}]",
                            _current_pos,
                            _expected_clustering_block->start,
                            _expected_clustering_block->end));
                }
            }
            if (cmp_end == 0) {
                sstlog.trace("validating_consumer {}: {}() current block is done", fmt::ptr(this), __FUNCTION__);
                _expected_clustering_block->done = true;
            }
        }
    }

public:
    validating_consumer(const schema_ptr schema, reader_permit permit, const shared_sstable& sst, std::function<void(sstring)> error_handler)
        : _schema(schema)
        , _permit(std::move(permit))
        , _error_handler(std::move(error_handler))
        , _treat_static_row_as_regular(_schema->is_static_compact_table()
                && (!sst->has_scylla_component() || sst->features().is_enabled(sstable_feature::CorrectStaticCompact))) // See #4139
        , _validator(*_schema)
        , _current_pos(position_in_partition::end_of_partition_tag_t{})
    {
    }

    const reader_permit& permit() const { return _permit; }
    tracing::trace_state_ptr trace_state() { return {}; }
    uint64_t error_count() const { return _error_count; }
    position_in_partition_view current_position() const { return _current_pos; }

    void set_stop_after_partition_header() {
        _stop_after_partition_header = true;
    }

    void set_index_expected_partition(partition_key pkey) {
        _expected_pkey.emplace(std::move(pkey));
    }
    void reset_index_expected_partition() {
        _expected_pkey.reset();
    }
    void set_index_expected_clustering_block(position_in_partition_view start, position_in_partition_view end) {
        _expected_clustering_block.emplace(start, end);
    }
    bool in_expected_clustering_block() const {
        return _expected_clustering_block && !_expected_clustering_block->done;
    }
    partition_key get_last_pkey() {
        return _last_pkey.value();
    }

    void report_error(sstring what) {
        ++_error_count;
        _error_handler(what);
    }

    data_consumer::proceed consume_partition_start(sstables::key_view key, sstables::deletion_time deltime) {
        _last_pkey = key.to_partition_key(*_schema);
        auto dk = dht::decorate_key(*_schema, *_last_pkey);
        _current_pos = position_in_partition(position_in_partition::partition_start_tag_t{});
        sstlog.trace("validating_consumer {}: {}({}) _expected_pkey={}", fmt::ptr(this), __FUNCTION__, _last_pkey, _expected_pkey);
        validate_fragment_order(mutation_fragment_v2::kind::partition_start, {});
        if (_expected_pkey && !_expected_pkey->equal(*_schema, dk.key())) {
            report_error(format("mismatching index/data: partition mismatch: index: {}, data: {}", *_expected_pkey, dk.key()));
        }
        if (auto res = _validator(dk); !res) {
            report_error(res.what());
            _validator.reset(dk);
        }
        _expected_clustering_block.reset();
        if (_stop_after_partition_header) {
            _stop_after_partition_header = false;
            return data_consumer::proceed::no;
        }
        return data_consumer::proceed::yes;
    }

    row_processing_result consume_row_start(const std::vector<fragmented_temporary_buffer>& ecp) {
        auto ck = from_fragmented_buffer(ecp);
        _current_pos = position_in_partition::for_key(std::move(ck));
        sstlog.trace("validating_consumer {}: {}({})", fmt::ptr(this), __FUNCTION__, _current_pos);
        validate_fragment_order(mutation_fragment_v2::kind::clustering_row, {});
        return row_processing_result::do_proceed;
    }

    data_consumer::proceed consume_row_marker_and_tombstone(const liveness_info& info, tombstone tomb, tombstone shadowable_tomb) {
        return data_consumer::proceed::yes;
    }

    row_processing_result consume_static_row_start() {
        sstlog.trace("validating_consumer {}: {}()", fmt::ptr(this), __FUNCTION__);
        if (_treat_static_row_as_regular) {
            return consume_row_start({});
        }
        _current_pos = position_in_partition(position_in_partition::static_row_tag_t{});
        validate_fragment_order(mutation_fragment_v2::kind::static_row, {});
        return row_processing_result::do_proceed;
    }

    data_consumer::proceed consume_column(const column_translation::column_info& column_info, bytes_view cell_path, fragmented_temporary_buffer::view value,
            api::timestamp_type timestamp, gc_clock::duration ttl, gc_clock::time_point local_deletion_time, bool is_deleted) {
        return data_consumer::proceed::yes;
    }

    data_consumer::proceed consume_complex_column_start(const sstables::column_translation::column_info& column_info, tombstone tomb) {
        return data_consumer::proceed::yes;
    }

    data_consumer::proceed consume_complex_column_end(const sstables::column_translation::column_info& column_info) {
        return data_consumer::proceed::yes;
    }

    data_consumer::proceed consume_counter_column(const column_translation::column_info& column_info, fragmented_temporary_buffer::view value, api::timestamp_type timestamp) {
        return data_consumer::proceed::yes;
    }

    data_consumer::proceed consume_range_tombstone(const std::vector<fragmented_temporary_buffer>& ecp, bound_kind kind, tombstone tomb) {
        auto ck = from_fragmented_buffer(ecp);
        _current_pos = position_in_partition(position_in_partition::range_tag_t(), kind, std::move(ck));
        tombstone new_current_tomb;
        if (kind == bound_kind::incl_start || kind == bound_kind::excl_start) {
            new_current_tomb = tomb;
        }
        sstlog.trace("validating_consumer {}: {}({}, {})", fmt::ptr(this), __FUNCTION__, _current_pos, new_current_tomb);
        validate_fragment_order(mutation_fragment_v2::kind::range_tombstone_change, new_current_tomb);
        if (_expected_clustering_block) {
            return data_consumer::proceed(!_expected_clustering_block->done);
        } else {
            return data_consumer::proceed(!need_preempt());
        }
    }

    data_consumer::proceed consume_range_tombstone(const std::vector<fragmented_temporary_buffer>& ecp, sstables::bound_kind_m kind, tombstone end_tombstone, tombstone start_tombstone) {
        sstlog.trace("validating_consumer {}: {}()", fmt::ptr(this), __FUNCTION__);
        switch (kind) {
        case bound_kind_m::incl_end_excl_start:
            return consume_range_tombstone(ecp, bound_kind::excl_start, start_tombstone);
        case bound_kind_m::excl_end_incl_start:
            return consume_range_tombstone(ecp, bound_kind::incl_start, start_tombstone);
        default:
            on_parse_error(format("Invalid boundary type {}", static_cast<std::underlying_type_t<bound_kind_m>>(kind)), {});
        }
    }

    data_consumer::proceed consume_row_end() {
        sstlog.trace("validating_consumer {}: {}()", fmt::ptr(this), __FUNCTION__);
        if (_expected_clustering_block) {
            return data_consumer::proceed(!_expected_clustering_block->done);
        } else {
            return data_consumer::proceed(!need_preempt());
        }
    }

    void on_end_of_stream() {
        if (auto res = _validator.on_end_of_stream(); !res) {
            report_error(res.what());
        }
        sstlog.trace("validating_consumer {}: {}()", fmt::ptr(this), __FUNCTION__);
    }

    data_consumer::proceed consume_partition_end() {
        sstlog.trace("validating_consumer {}: {}()", fmt::ptr(this), __FUNCTION__);
        _current_pos = position_in_partition(position_in_partition::end_of_partition_tag_t{});
        validate_fragment_order(mutation_fragment_v2::kind::partition_end, {});
        return data_consumer::proceed::no;
    }
};

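// Streams the whole sstable through a validating_consumer, cross-checking
// the data file against the index (partition positions, partition keys and,
// where a clustered cursor is available, promoted-index blocks). Errors are
// reported through `error_handler`; the returned value is the total number
// of errors found.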
future<uint64_t> validate(
        shared_sstable sstable,
        reader_permit permit,
        abort_source& abort,
        std::function<void(sstring)> error_handler,
        sstables::read_monitor& monitor) {
    auto schema = sstable->get_schema();
    validating_consumer consumer(schema, permit, sstable, std::move(error_handler));
    auto context = co_await data_consume_rows<data_consume_rows_context_m<validating_consumer>>(*schema, sstable, consumer, integrity_check::yes);

    auto idx_reader = sstable->make_index_reader(permit, tracing::trace_state_ptr{}, sstables::use_caching::no, false);
    auto big_index_reader = dynamic_cast<index_reader*>(idx_reader.get());

    try {
        monitor.on_read_started(context->reader_position());
        while (!context->eof() && !abort.abort_requested()) {
            uint64_t current_partition_pos = 0;
            clustered_index_cursor* idx_cursor = nullptr;

            if (idx_reader && idx_reader->eof()) {
                consumer.report_error("mismatching index/data: index is at EOF, but data file has more data");
                co_await idx_reader->close();
                idx_reader.reset();
                big_index_reader = nullptr;
            }

            consumer.reset_index_expected_partition();

            if (idx_reader) {
                co_await idx_reader->read_partition_data();

                if (big_index_reader) {
                    idx_cursor = big_index_reader->current_clustered_cursor();
                }

                const auto index_pos = idx_reader->data_file_positions().start;
                const auto data_pos = context->position();
                auto pk = idx_reader->get_partition_key();

                if (pk) {
                    sstlog.trace("validate(): index-data position check for partition {}: {} == {}", *pk, data_pos, index_pos);
                } else {
                    sstlog.trace("validate(): index-data position check for a partition: {} == {}", data_pos, index_pos);
                }
                if (index_pos != data_pos) {
                    consumer.report_error(format("mismatching index/data: position mismatch: index: {}, data: {}", index_pos, data_pos));
                }
                current_partition_pos = data_pos;

                if (pk) {
                    consumer.set_index_expected_partition(*pk);
                }
            }

            std::optional<clustered_index_cursor::entry_info> current_pi_block;

            if (idx_cursor) {
                consumer.set_stop_after_partition_header();
                co_await context->consume_input();
            }
            bool first_block = true;

            do {
                if (!consumer.in_expected_clustering_block() && idx_cursor && (current_pi_block = co_await idx_cursor->next_entry())) {
                    // The mx format always has position-in-partition in the variant.
                    const auto start = std::get<position_in_partition_view>(current_pi_block->start);
                    const auto end = std::get<position_in_partition_view>(current_pi_block->end);
                    const auto index_pos = current_partition_pos + current_pi_block->offset;
                    const auto data_pos = context->position();
                    sstlog.trace("validate(): index-data position check for clustering block (first={}) [{}, {}]: {} == {}, partition starts at {}", first_block, start, end, index_pos, data_pos, current_partition_pos);
                    // We cannot reliably position the parser at the start of
                    // the first block, because there is no way to check what
                    // the next element is (static row or clustering row)
                    // without reading its header. And once read, we cannot
                    // rewind the parser.
                    // So we do a best-effort here: we ask the consumer to stop
                    // after consuming the partition-start. For schemas without
                    // static row this should result in the exact position, but
                    // even then it is not guaranteed if the schema has a dropped
                    // static column.
                    if (first_block) {
                        first_block = false;
                        if (data_pos > index_pos) {
                            consumer.report_error(format("mismatching index/data: position mismatch: first promoted index block: {}, data: {}", index_pos, data_pos));
                        }
                    } else {
                        if (data_pos != index_pos) {
                            consumer.report_error(format("mismatching index/data: position mismatch: promoted index: {}, data: {}", index_pos, data_pos));
                        }
                    }

                    consumer.set_index_expected_clustering_block(start, end);
                }
                co_await context->consume_input();

                co_await coroutine::maybe_yield();
            } while (consumer.current_position().region() != partition_region::partition_end && !abort.abort_requested());

            if (abort.abort_requested()) {
                // Prevent fall-through to the post-checks below if the loop above was broken due to abort.
                break;
            }

            // Check if promoted index still has more entries.
            if (idx_cursor && (current_pi_block = co_await idx_cursor->next_entry())) {
                consumer.report_error(format("mismatching index/data: promoted index has more blocks, but it is end of partition {} ({})",
                        consumer.get_last_pkey().with_schema(*schema),
                        consumer.get_last_pkey()));
            }

            if (idx_reader) {
                co_await idx_reader->advance_to_next_partition();
            }
        }
    } catch (...) {
        consumer.report_error(format("unexpected exception: {}", std::current_exception()));
    }

    monitor.on_read_completed();
    if (idx_reader) {
        co_await idx_reader->close();
    }

    co_await context->close();
    co_return consumer.error_count();
}

} // namespace mx
} // namespace sstables