Files
scylladb/sstables/mx/reader.cc
Botond Dénes 9548200e85 sstables: mx/reader: add crawling reader
A special-purpose reader which doesn't use the index at all and hence
doesn't support skipping at all. It is designed to be used in conditions
in which the index is not reliable (scrub compaction).
2021-09-01 08:44:13 +03:00

1665 lines
74 KiB
C++

/*
* Copyright (C) 2021-present ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "reader.hh"
#include "concrete_types.hh"
#include "sstables/liveness_info.hh"
#include "sstables/mutation_fragment_filter.hh"
#include "sstables/sstable_mutation_reader.hh"
#include "sstables/processing_result_generator.hh"
namespace sstables {
namespace mx {
class mp_row_consumer_reader_mx : public mp_row_consumer_reader_base, public flat_mutation_reader_v2::impl {
friend class sstables::mx::mp_row_consumer_m;
public:
mp_row_consumer_reader_mx(schema_ptr s, reader_permit permit, shared_sstable sst)
: mp_row_consumer_reader_base(std::move(sst))
, impl(std::move(s), std::move(permit))
{ }
void on_next_partition(dht::decorated_key, tombstone);
};
class mp_row_consumer_m {
reader_permit _permit;
const shared_sstable& _sst;
tracing::trace_state_ptr _trace_state;
const io_priority_class& _pc;
public:
using proceed = data_consumer::proceed;
enum class row_processing_result {
// Causes the parser to return the control to the caller without advancing.
// Next time when the parser is called, the same consumer method will be called.
retry_later,
// Causes the parser to proceed to the next element.
do_proceed,
// Causes the parser to skip the whole row. consume_row_end() will not be called for the current row.
skip_row
};
mp_row_consumer_reader_mx* _reader;
schema_ptr _schema;
const query::partition_slice& _slice;
std::optional<mutation_fragment_filter> _mf_filter;
bool _is_mutation_end = true;
streamed_mutation::forwarding _fwd;
// For static-compact tables C* stores the only row in the static row but in our representation they're regular rows.
const bool _treat_static_row_as_regular;
std::optional<clustering_row> _in_progress_row;
std::optional<range_tombstone_change> _stored_tombstone;
static_row _in_progress_static_row;
bool _inside_static_row = false;
struct cell {
column_id id;
atomic_cell_or_collection val;
};
std::vector<cell> _cells;
collection_mutation_description _cm;
struct range_tombstone_start {
clustering_key_prefix ck;
bound_kind kind;
tombstone tomb;
position_in_partition_view position() const {
return position_in_partition_view(position_in_partition_view::range_tag_t{}, bound_view(ck, kind));
}
};
inline friend std::ostream& operator<<(std::ostream& o, const mp_row_consumer_m::range_tombstone_start& rt_start) {
o << "{ clustering: " << rt_start.ck
<< ", kind: " << rt_start.kind
<< ", tombstone: " << rt_start.tomb << " }";
return o;
}
proceed consume_range_tombstone_start(clustering_key_prefix ck, bound_kind k, tombstone t) {
sstlog.trace("mp_row_consumer_m {}: consume_range_tombstone_start(ck={}, k={}, t={})", fmt::ptr(this), ck, k, t);
if (_mf_filter->current_tombstone()) {
throw sstables::malformed_sstable_exception(
format("Range tombstones have to be disjoint: current opened range tombstone {}, new tombstone {}",
_mf_filter->current_tombstone(), t));
}
auto pos = position_in_partition(position_in_partition::range_tag_t(), k, std::move(ck));
return on_range_tombstone_change(std::move(pos), t);
}
proceed consume_range_tombstone_end(clustering_key_prefix ck, bound_kind k, tombstone t) {
sstlog.trace("mp_row_consumer_m {}: consume_range_tombstone_end(ck={}, k={}, t={})", fmt::ptr(this), ck, k, t);
if (!_mf_filter->current_tombstone()) {
throw sstables::malformed_sstable_exception(
format("Closing range tombstone that wasn't opened: clustering {}, kind {}, tombstone {}",
ck, k, t));
}
if (_mf_filter->current_tombstone() != t) {
throw sstables::malformed_sstable_exception(
format("Range tombstone with ck {} and two different tombstones at ends: {}, {}",
ck, _mf_filter->current_tombstone(), t));
}
auto pos = position_in_partition(position_in_partition::range_tag_t(), k, std::move(ck));
return on_range_tombstone_change(std::move(pos), {});
}
proceed consume_range_tombstone_boundary(position_in_partition pos, tombstone left, tombstone right) {
sstlog.trace("mp_row_consumer_m {}: consume_range_tombstone_boundary(pos={}, left={}, right={})", fmt::ptr(this), pos, left, right);
if (!_mf_filter->current_tombstone()) {
throw sstables::malformed_sstable_exception(
format("Closing range tombstone that wasn't opened: pos {}, tombstone {}", pos, left));
}
if (_mf_filter->current_tombstone() != left) {
throw sstables::malformed_sstable_exception(
format("Range tombstone at {} and two different tombstones at ends: {}, {}",
pos, _mf_filter->current_tombstone(), left));
}
return on_range_tombstone_change(std::move(pos), right);
}
const column_definition& get_column_definition(std::optional<column_id> column_id) const {
auto column_type = _inside_static_row ? column_kind::static_column : column_kind::regular_column;
return _schema->column_at(column_type, *column_id);
}
inline proceed on_range_tombstone_change(position_in_partition pos, tombstone t) {
sstlog.trace("mp_row_consumer_m {}: on_range_tombstone_change({}, {}->{})", fmt::ptr(this), pos,
_mf_filter->current_tombstone(), t);
mutation_fragment_filter::clustering_result result = _mf_filter->apply(pos, t);
for (auto&& rt : result.rts) {
sstlog.trace("mp_row_consumer_m {}: push({})", fmt::ptr(this), rt);
_reader->push_mutation_fragment(mutation_fragment_v2(*_schema, permit(), std::move(rt)));
}
switch (result.action) {
case mutation_fragment_filter::result::emit:
sstlog.trace("mp_row_consumer_m {}: emit", fmt::ptr(this));
break;
case mutation_fragment_filter::result::ignore:
sstlog.trace("mp_row_consumer_m {}: ignore", fmt::ptr(this));
if (_mf_filter->out_of_range()) {
_reader->on_out_of_clustering_range();
return proceed::no;
}
if (_mf_filter->is_current_range_changed()) {
return proceed::no;
}
break;
case mutation_fragment_filter::result::store_and_finish:
sstlog.trace("mp_row_consumer_m {}: store", fmt::ptr(this));
_stored_tombstone = range_tombstone_change(pos, t);
_reader->on_out_of_clustering_range();
return proceed::no;
}
return proceed(!_reader->is_buffer_full() && !need_preempt());
}
inline void reset_for_new_partition() {
_is_mutation_end = true;
_in_progress_row.reset();
_stored_tombstone.reset();
_mf_filter.reset();
}
void check_schema_mismatch(const column_translation::column_info& column_info, const column_definition& column_def) const {
if (column_info.schema_mismatch) {
throw malformed_sstable_exception(
format("{} definition in serialization header does not match schema. Expected {} but got {}",
column_def.name_as_text(),
column_def.type->name(),
column_info.type->name()));
}
}
void check_column_missing_in_current_schema(const column_translation::column_info& column_info,
api::timestamp_type timestamp) const {
if (!column_info.id) {
sstring name = sstring(to_sstring_view(*column_info.name));
auto it = _schema->dropped_columns().find(name);
if (it == _schema->dropped_columns().end() || timestamp > it->second.timestamp) {
throw malformed_sstable_exception(format("Column {} missing in current schema", name));
}
}
}
public:
mp_row_consumer_m(mp_row_consumer_reader_mx* reader,
const schema_ptr schema,
reader_permit permit,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
const shared_sstable& sst)
: _permit(std::move(permit))
, _sst(sst)
, _trace_state(std::move(trace_state))
, _pc(pc)
, _reader(reader)
, _schema(schema)
, _slice(slice)
, _fwd(fwd)
, _treat_static_row_as_regular(_schema->is_static_compact_table()
&& (!sst->has_scylla_component() || sst->features().is_enabled(sstable_feature::CorrectStaticCompact))) // See #4139
{
_cells.reserve(std::max(_schema->static_columns_count(), _schema->regular_columns_count()));
}
mp_row_consumer_m(mp_row_consumer_reader_mx* reader,
const schema_ptr schema,
reader_permit permit,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
const shared_sstable& sst)
: mp_row_consumer_m(reader, schema, std::move(permit), schema->full_slice(), pc, std::move(trace_state), fwd, sst)
{ }
~mp_row_consumer_m() {}
// See the RowConsumer concept
void push_ready_fragments() {
if (auto rto = std::move(_stored_tombstone)) {
_stored_tombstone = std::nullopt;
on_range_tombstone_change(rto->position(), rto->tombstone());
}
}
std::optional<position_in_partition_view> maybe_skip() {
if (!_mf_filter) {
return {};
}
return _mf_filter->maybe_skip();
}
bool is_mutation_end() const {
return _is_mutation_end;
}
void setup_for_partition(const partition_key& pk) {
sstlog.trace("mp_row_consumer_m {}: setup_for_partition({})", fmt::ptr(this), pk);
_is_mutation_end = false;
_mf_filter.emplace(*_schema, _slice, pk, _fwd);
}
std::optional<position_in_partition_view> fast_forward_to(position_range r) {
if (!_mf_filter) {
_reader->on_out_of_clustering_range();
return {};
}
// r is used to trim range tombstones and range_tombstone:s can be trimmed only to positions
// which are !is_clustering_row(). Replace with equivalent ranges.
// Long-term we should guarantee this on position_range.
if (r.start().is_clustering_row()) {
r.set_start(position_in_partition::before_key(r.start().key()));
}
if (r.end().is_clustering_row()) {
r.set_end(position_in_partition::before_key(r.end().key()));
}
auto skip = _mf_filter->fast_forward_to(std::move(r));
if (skip) {
position_in_partition::less_compare less(*_schema);
// No need to skip using index if stored fragments are after the start of the range
if (_in_progress_row && !less(_in_progress_row->position(), *skip)) {
return {};
}
if (_stored_tombstone && !less(_stored_tombstone->position(), *skip)) {
return {};
}
}
if (_mf_filter->out_of_range()) {
_reader->on_out_of_clustering_range();
}
return skip;
}
/*
* Sets the range tombstone start. Overwrites the currently set RT start if any.
* Used for skipping through wide partitions using index when the data block
* skipped to starts in the middle of an opened range tombstone.
*/
void set_range_tombstone(tombstone t) {
sstlog.trace("mp_row_consumer_m {}: set_range_tombstone({})", fmt::ptr(this), t);
_mf_filter->set_tombstone(t);
}
// Consume the row's key and deletion_time. The latter determines if the
// row is a tombstone, and if so, when it has been deleted.
// Note that the key is in serialized form, and should be deserialized
// (according to the schema) before use.
// As explained above, the key object is only valid during this call, and
// if the implementation wishes to save it, it must copy the *contents*.
proceed consume_partition_start(sstables::key_view key, sstables::deletion_time deltime) {
sstlog.trace("mp_row_consumer_m {}: consume_partition_start(deltime=({}, {})), _is_mutation_end={}", fmt::ptr(this),
deltime.local_deletion_time, deltime.marked_for_delete_at, _is_mutation_end);
if (!_is_mutation_end) {
return proceed::yes;
}
auto pk = partition_key::from_exploded(key.explode(*_schema));
setup_for_partition(pk);
auto dk = dht::decorate_key(*_schema, pk);
_reader->on_next_partition(std::move(dk), tombstone(deltime));
return proceed(!_reader->is_buffer_full() && !need_preempt());
}
mp_row_consumer_m::row_processing_result consume_row_start(const std::vector<fragmented_temporary_buffer>& ecp) {
auto key = clustering_key_prefix::from_range(ecp | boost::adaptors::transformed(
[] (const fragmented_temporary_buffer& b) { return fragmented_temporary_buffer::view(b); }));
sstlog.trace("mp_row_consumer_m {}: consume_row_start({})", fmt::ptr(this), key);
_in_progress_row.emplace(std::move(key));
mutation_fragment_filter::clustering_result res = _mf_filter->apply(_in_progress_row->position());
for (auto&& rt : res.rts) {
sstlog.trace("mp_row_consumer_m {}: push({})", fmt::ptr(this), rt);
_reader->push_mutation_fragment(mutation_fragment_v2(*_schema, permit(), std::move(rt)));
}
switch (res.action) {
case mutation_fragment_filter::result::emit:
sstlog.trace("mp_row_consumer_m {}: emit", fmt::ptr(this));
return mp_row_consumer_m::row_processing_result::do_proceed;
case mutation_fragment_filter::result::ignore:
sstlog.trace("mp_row_consumer_m {}: ignore", fmt::ptr(this));
if (_mf_filter->out_of_range()) {
_reader->on_out_of_clustering_range();
// We actually want skip_later, which doesn't exist, but retry_later
// is ok because signalling out-of-range on the reader will cause it
// to either stop reading or skip to the next partition using index,
// not by ignoring fragments.
return mp_row_consumer_m::row_processing_result::retry_later;
}
if (_mf_filter->is_current_range_changed()) {
return mp_row_consumer_m::row_processing_result::retry_later;
} else {
_in_progress_row.reset();
return mp_row_consumer_m::row_processing_result::skip_row;
}
case mutation_fragment_filter::result::store_and_finish:
sstlog.trace("mp_row_consumer_m {}: store_and_finish", fmt::ptr(this));
_reader->on_out_of_clustering_range();
return mp_row_consumer_m::row_processing_result::retry_later;
}
abort();
}
proceed consume_row_marker_and_tombstone(
const liveness_info& info, tombstone tomb, tombstone shadowable_tomb) {
sstlog.trace("mp_row_consumer_m {}: consume_row_marker_and_tombstone({}, {}, {}), key={}",
fmt::ptr(this), info.to_row_marker(), tomb, shadowable_tomb, _in_progress_row->position());
_in_progress_row->apply(info.to_row_marker());
_in_progress_row->apply(tomb);
if (shadowable_tomb) {
_in_progress_row->apply(shadowable_tombstone{shadowable_tomb});
}
if (_in_progress_row->tomb()) {
_sst->get_stats().on_row_tombstone_read();
}
return proceed::yes;
}
mp_row_consumer_m::row_processing_result consume_static_row_start() {
sstlog.trace("mp_row_consumer_m {}: consume_static_row_start()", fmt::ptr(this));
if (_treat_static_row_as_regular) {
return consume_row_start({});
}
_inside_static_row = true;
_in_progress_static_row = static_row();
return mp_row_consumer_m::row_processing_result::do_proceed;
}
proceed consume_column(const column_translation::column_info& column_info,
bytes_view cell_path,
fragmented_temporary_buffer::view value,
api::timestamp_type timestamp,
gc_clock::duration ttl,
gc_clock::time_point local_deletion_time,
bool is_deleted) {
const std::optional<column_id>& column_id = column_info.id;
sstlog.trace("mp_row_consumer_m {}: consume_column(id={}, path={}, value={}, ts={}, ttl={}, del_time={}, deleted={})", fmt::ptr(this),
column_id, fmt_hex(cell_path), value, timestamp, ttl.count(), local_deletion_time.time_since_epoch().count(), is_deleted);
check_column_missing_in_current_schema(column_info, timestamp);
if (!column_id) {
return proceed::yes;
}
const column_definition& column_def = get_column_definition(column_id);
if (timestamp <= column_def.dropped_at()) {
return proceed::yes;
}
check_schema_mismatch(column_info, column_def);
if (column_def.is_multi_cell()) {
auto& value_type = visit(*column_def.type, make_visitor(
[] (const collection_type_impl& ctype) -> const abstract_type& { return *ctype.value_comparator(); },
[&] (const user_type_impl& utype) -> const abstract_type& {
if (cell_path.size() != sizeof(int16_t)) {
throw malformed_sstable_exception(format("wrong size of field index while reading UDT column: expected {}, got {}",
sizeof(int16_t), cell_path.size()));
}
auto field_idx = deserialize_field_index(cell_path);
if (field_idx >= utype.size()) {
throw malformed_sstable_exception(format("field index too big while reading UDT column: type has {} fields, got {}",
utype.size(), field_idx));
}
return *utype.type(field_idx);
},
[] (const abstract_type& o) -> const abstract_type& {
throw malformed_sstable_exception(format("attempted to read multi-cell column, but expected type was {}", o.name()));
}
));
auto ac = is_deleted ? atomic_cell::make_dead(timestamp, local_deletion_time)
: make_atomic_cell(value_type,
timestamp,
value,
ttl,
local_deletion_time,
atomic_cell::collection_member::yes);
_cm.cells.emplace_back(to_bytes(cell_path), std::move(ac));
} else {
auto ac = is_deleted ? atomic_cell::make_dead(timestamp, local_deletion_time)
: make_atomic_cell(*column_def.type, timestamp, value, ttl, local_deletion_time,
atomic_cell::collection_member::no);
_cells.push_back({*column_id, atomic_cell_or_collection(std::move(ac))});
}
return proceed::yes;
}
proceed consume_complex_column_start(const sstables::column_translation::column_info& column_info,
tombstone tomb) {
sstlog.trace("mp_row_consumer_m {}: consume_complex_column_start({}, {})", fmt::ptr(this), column_info.id, tomb);
_cm.tomb = tomb;
_cm.cells.clear();
return proceed::yes;
}
proceed consume_complex_column_end(const sstables::column_translation::column_info& column_info) {
const std::optional<column_id>& column_id = column_info.id;
sstlog.trace("mp_row_consumer_m {}: consume_complex_column_end({})", fmt::ptr(this), column_id);
if (_cm.tomb) {
check_column_missing_in_current_schema(column_info, _cm.tomb.timestamp);
}
if (column_id) {
const column_definition& column_def = get_column_definition(column_id);
if (!_cm.cells.empty() || (_cm.tomb && _cm.tomb.timestamp > column_def.dropped_at())) {
check_schema_mismatch(column_info, column_def);
_cells.push_back({column_def.id, _cm.serialize(*column_def.type)});
}
}
_cm.tomb = {};
_cm.cells.clear();
return proceed::yes;
}
proceed consume_counter_column(const column_translation::column_info& column_info,
fragmented_temporary_buffer::view value,
api::timestamp_type timestamp) {
const std::optional<column_id>& column_id = column_info.id;
sstlog.trace("mp_row_consumer_m {}: consume_counter_column({}, {}, {})", fmt::ptr(this), column_id, value, timestamp);
check_column_missing_in_current_schema(column_info, timestamp);
if (!column_id) {
return proceed::yes;
}
const column_definition& column_def = get_column_definition(column_id);
if (timestamp <= column_def.dropped_at()) {
return proceed::yes;
}
check_schema_mismatch(column_info, column_def);
auto ac = make_counter_cell(timestamp, value);
_cells.push_back({*column_id, atomic_cell_or_collection(std::move(ac))});
return proceed::yes;
}
proceed consume_range_tombstone(const std::vector<fragmented_temporary_buffer>& ecp,
bound_kind kind,
tombstone tomb) {
auto ck = clustering_key_prefix::from_range(ecp | boost::adaptors::transformed(
[] (const fragmented_temporary_buffer& b) { return fragmented_temporary_buffer::view(b); }));
if (kind == bound_kind::incl_start || kind == bound_kind::excl_start) {
return consume_range_tombstone_start(std::move(ck), kind, std::move(tomb));
} else { // *_end kind
return consume_range_tombstone_end(std::move(ck), kind, std::move(tomb));
}
}
proceed consume_range_tombstone(const std::vector<fragmented_temporary_buffer>& ecp,
sstables::bound_kind_m kind,
tombstone end_tombstone,
tombstone start_tombstone) {
auto ck = clustering_key_prefix::from_range(ecp | boost::adaptors::transformed(
[] (const fragmented_temporary_buffer& b) { return fragmented_temporary_buffer::view(b); }));
switch (kind) {
case bound_kind_m::incl_end_excl_start: {
auto pos = position_in_partition(position_in_partition::range_tag_t(), bound_kind::incl_end, std::move(ck));
return consume_range_tombstone_boundary(std::move(pos), end_tombstone, start_tombstone);
}
case bound_kind_m::excl_end_incl_start: {
auto pos = position_in_partition(position_in_partition::range_tag_t(), bound_kind::excl_end, std::move(ck));
return consume_range_tombstone_boundary(std::move(pos), end_tombstone, start_tombstone);
}
default:
assert(false && "Invalid boundary type");
}
}
proceed consume_row_end() {
auto fill_cells = [this] (column_kind kind, row& cells) {
for (auto &&c : _cells) {
cells.apply(_schema->column_at(kind, c.id), std::move(c.val));
}
_cells.clear();
};
if (_inside_static_row) {
fill_cells(column_kind::static_column, _in_progress_static_row.cells());
sstlog.trace("mp_row_consumer_m {}: consume_row_end(_in_progress_static_row={})", fmt::ptr(this), static_row::printer(*_schema, _in_progress_static_row));
_inside_static_row = false;
if (!_in_progress_static_row.empty()) {
auto action = _mf_filter->apply(_in_progress_static_row);
switch (action) {
case mutation_fragment_filter::result::emit:
_reader->push_mutation_fragment(mutation_fragment_v2(*_schema, permit(), std::move(_in_progress_static_row)));
break;
case mutation_fragment_filter::result::ignore:
break;
case mutation_fragment_filter::result::store_and_finish:
// static row is always either emited or ignored.
throw runtime_exception("We should never need to store static row");
}
}
} else {
if (!_cells.empty()) {
fill_cells(column_kind::regular_column, _in_progress_row->cells());
}
_reader->push_mutation_fragment(mutation_fragment_v2(
*_schema, permit(), *std::exchange(_in_progress_row, {})));
}
return proceed(!_reader->is_buffer_full() && !need_preempt());
}
void on_end_of_stream() {
sstlog.trace("mp_row_consumer_m {}: on_end_of_stream()", fmt::ptr(this));
if (_mf_filter && _mf_filter->current_tombstone()) {
if (_mf_filter->out_of_range()) {
throw sstables::malformed_sstable_exception("Unclosed range tombstone.");
}
auto result = _mf_filter->apply(position_in_partition_view::after_all_clustered_rows(), {});
for (auto&& rt : result.rts) {
sstlog.trace("mp_row_consumer_m {}: on_end_of_stream(), emitting last tombstone: {}", fmt::ptr(this), rt);
_reader->push_mutation_fragment(mutation_fragment_v2(*_schema, permit(), std::move(rt)));
}
}
if (!_reader->_partition_finished) {
consume_partition_end();
}
_reader->_end_of_stream = true;
}
// Called at the end of the row, after all cells.
// Returns a flag saying whether the sstable consumer should stop now, or
// proceed consuming more data.
proceed consume_partition_end() {
sstlog.trace("mp_row_consumer_m {}: consume_partition_end()", fmt::ptr(this));
reset_for_new_partition();
if (_fwd == streamed_mutation::forwarding::yes) {
_reader->_end_of_stream = true;
return proceed::no;
}
_reader->_index_in_current_partition = false;
_reader->_partition_finished = true;
_reader->_before_partition = true;
_reader->push_mutation_fragment(mutation_fragment_v2(*_schema, permit(), partition_end()));
return proceed(!_reader->is_buffer_full() && !need_preempt());
}
// Called when the reader is fast forwarded to given element.
void reset(sstables::indexable_element el) {
sstlog.trace("mp_row_consumer_m {}: reset({})", fmt::ptr(this), static_cast<int>(el));
if (el == indexable_element::partition) {
reset_for_new_partition();
} else {
_in_progress_row.reset();
_stored_tombstone.reset();
_is_mutation_end = false;
}
}
position_in_partition_view position() {
if (_inside_static_row) {
return position_in_partition_view(position_in_partition_view::static_row_tag_t{});
}
if (_stored_tombstone) {
return _stored_tombstone->position();
}
if (_in_progress_row) {
return _in_progress_row->position();
}
if (_is_mutation_end) {
return position_in_partition_view(position_in_partition_view::end_of_partition_tag_t{});
}
return position_in_partition_view(position_in_partition_view::partition_start_tag_t{});
}
// Under which priority class to place I/O coming from this consumer
const io_priority_class& io_priority() const {
return _pc;
}
// The permit for this read
reader_permit& permit() {
return _permit;
}
tracing::trace_state_ptr trace_state() const {
return _trace_state;
}
};
// data_consume_rows_context_m remembers the context that an ongoing
// data_consume_rows() future is in for SSTable in 3_x format.
class data_consume_rows_context_m : public data_consumer::continuous_data_consumer<data_consume_rows_context_m> {
private:
enum class state {
PARTITION_START,
DELETION_TIME,
FLAGS,
OTHER,
} _state = state::PARTITION_START;
// becomes false when we yield in the main coroutine, although we don't need to consume
// more data buffers to continue, switch back to true afterwards
bool _consuming = true;
mp_row_consumer_m& _consumer;
shared_sstable _sst;
const serialization_header& _header;
column_translation _column_translation;
const bool _has_shadowable_tombstones;
temporary_buffer<char> _pk;
unfiltered_flags_m _flags{0};
unfiltered_extended_flags_m _extended_flags{0};
uint64_t _next_row_offset;
liveness_info _liveness;
bool _is_first_unfiltered = true;
std::vector<fragmented_temporary_buffer> _row_key;
struct row_schema {
using column_range = boost::iterator_range<std::vector<column_translation::column_info>::const_iterator>;
// All columns for this kind of row inside column_translation of the current sstable
column_range _all_columns;
// Subrange of _all_columns which is yet to be processed for current row
column_range _columns;
// Represents the subset of _all_columns present in current row
boost::dynamic_bitset<uint64_t> _columns_selector; // size() == _columns.size()
};
row_schema _regular_row;
row_schema _static_row;
row_schema* _row;
uint64_t _missing_columns_to_read;
boost::iterator_range<std::vector<std::optional<uint32_t>>::const_iterator> _ck_column_value_fix_lengths;
tombstone _row_tombstone;
tombstone _row_shadowable_tombstone;
column_flags_m _column_flags{0};
api::timestamp_type _column_timestamp;
gc_clock::time_point _column_local_deletion_time;
gc_clock::duration _column_ttl;
fragmented_temporary_buffer _column_value;
temporary_buffer<char> _cell_path;
uint64_t _ck_blocks_header;
uint32_t _ck_blocks_header_offset;
bool _null_component_occured;
uint64_t _subcolumns_to_read = 0;
api::timestamp_type _complex_column_marked_for_delete;
tombstone _complex_column_tombstone;
bool _reading_range_tombstone_ck = false;
bound_kind_m _range_tombstone_kind;
uint16_t _ck_size;
/*
* We need two range tombstones because range tombstone marker can be either a single bound
* or a double bound that represents end of one range tombstone and start of another at the same time.
* If range tombstone marker is a single bound then only _left_range_tombstone is used.
* Otherwise, _left_range_tombstone represents tombstone for a range tombstone that's being closed
* and _right_range_tombstone represents a tombstone for a range tombstone that's being opened.
*/
tombstone _left_range_tombstone;
tombstone _right_range_tombstone;
processing_result_generator _gen;
temporary_buffer<char>* _processing_data;
void start_row(row_schema& rs) {
_row = &rs;
_row->_columns = _row->_all_columns;
}
void setup_columns(row_schema& rs, const std::vector<column_translation::column_info>& columns) {
rs._all_columns = boost::make_iterator_range(columns);
rs._columns_selector = boost::dynamic_bitset<uint64_t>(columns.size());
}
void skip_absent_columns() {
size_t pos = _row->_columns_selector.find_first();
if (pos == boost::dynamic_bitset<uint64_t>::npos) {
pos = _row->_columns.size();
}
_row->_columns.advance_begin(pos);
}
bool no_more_columns() const { return _row->_columns.empty(); }
void move_to_next_column() {
size_t current_pos = _row->_columns_selector.size() - _row->_columns.size();
size_t next_pos = _row->_columns_selector.find_next(current_pos);
size_t jump_to_next = (next_pos == boost::dynamic_bitset<uint64_t>::npos) ? _row->_columns.size()
: next_pos - current_pos;
_row->_columns.advance_begin(jump_to_next);
}
bool is_column_simple() const { return !_row->_columns.front().is_collection; }
bool is_column_counter() const { return _row->_columns.front().is_counter; }
const column_translation::column_info& get_column_info() const {
return _row->_columns.front();
}
std::optional<uint32_t> get_column_value_length() const {
return _row->_columns.front().value_length;
}
void setup_ck(const std::vector<std::optional<uint32_t>>& column_value_fix_lengths) {
_row_key.clear();
_row_key.reserve(column_value_fix_lengths.size());
if (column_value_fix_lengths.empty()) {
_ck_column_value_fix_lengths = boost::make_iterator_range(column_value_fix_lengths);
} else {
_ck_column_value_fix_lengths = boost::make_iterator_range(std::begin(column_value_fix_lengths),
std::begin(column_value_fix_lengths) + _ck_size);
}
_ck_blocks_header_offset = 0u;
}
bool no_more_ck_blocks() const { return _ck_column_value_fix_lengths.empty(); }
void move_to_next_ck_block() {
_ck_column_value_fix_lengths.advance_begin(1);
++_ck_blocks_header_offset;
if (_ck_blocks_header_offset == 32u) {
_ck_blocks_header_offset = 0u;
}
}
std::optional<uint32_t> get_ck_block_value_length() const {
return _ck_column_value_fix_lengths.front();
}
bool is_block_empty() const {
return (_ck_blocks_header & (uint64_t(1) << (2 * _ck_blocks_header_offset))) != 0;
}
bool is_block_null() const {
return (_ck_blocks_header & (uint64_t(1) << (2 * _ck_blocks_header_offset + 1))) != 0;
}
bool should_read_block_header() const {
return _ck_blocks_header_offset == 0u;
}
public:
using consumer = mp_row_consumer_m;
// assumes !primitive_consumer::active()
bool non_consuming() const {
return !_consuming;
}
data_consumer::processing_result process_state(temporary_buffer<char>& data) {
_processing_data = &data;
return _gen.generate();
}
private:
processing_result_generator do_process_state() {
if (_state != state::PARTITION_START) {
goto flags_label;
}
partition_start_label: {
_is_first_unfiltered = true;
_state = state::DELETION_TIME;
co_yield read_short_length_bytes(*_processing_data, _pk);
_state = state::OTHER;
co_yield read_32(*_processing_data);
co_yield read_64(*_processing_data);
deletion_time del;
del.local_deletion_time = _u32;
del.marked_for_delete_at = _u64;
auto ret = _consumer.consume_partition_start(key_view(to_bytes_view(_pk)), del);
// after calling the consume function, we can release the
// buffers we held for it.
_pk.release();
_state = state::FLAGS;
if (ret == mp_row_consumer_m::proceed::no) {
co_yield mp_row_consumer_m::proceed::no;
}
}
flags_label:
_liveness = {};
_row_tombstone = {};
_row_shadowable_tombstone = {};
co_yield read_8(*_processing_data);
_flags = unfiltered_flags_m(_u8);
_state = state::OTHER;
if (_flags.is_end_of_partition()) {
_state = state::PARTITION_START;
if (_consumer.consume_partition_end() == mp_row_consumer_m::proceed::no) {
co_yield mp_row_consumer_m::proceed::no;
}
goto partition_start_label;
} else if (_flags.is_range_tombstone()) {
_is_first_unfiltered = false;
co_yield read_8(*_processing_data);
_range_tombstone_kind = bound_kind_m(_u8);
co_yield read_16(*_processing_data);
_ck_size = _u16;
if (_ck_size == 0) {
_row_key.clear();
_range_tombstone_kind = is_start(_range_tombstone_kind)
? bound_kind_m::incl_start : bound_kind_m::incl_end;
goto range_tombstone_body_label;
} else {
_reading_range_tombstone_ck = true;
}
} else if (!_flags.has_extended_flags()) {
_extended_flags = unfiltered_extended_flags_m(uint8_t{0u});
start_row(_regular_row);
_ck_size = _column_translation.clustering_column_value_fix_legths().size();
} else {
co_yield read_8(*_processing_data);
_extended_flags = unfiltered_extended_flags_m(_u8);
if (_extended_flags.has_cassandra_shadowable_deletion()) {
throw std::runtime_error("SSTables with Cassandra-style shadowable deletion cannot be read by Scylla");
}
if (_extended_flags.is_static()) {
if (_is_first_unfiltered) {
start_row(_static_row);
_is_first_unfiltered = false;
goto row_body_label;
} else {
throw malformed_sstable_exception("static row should be a first unfiltered in a partition");
}
}
start_row(_regular_row);
_ck_size = _column_translation.clustering_column_value_fix_legths().size();
}
_is_first_unfiltered = false;
_null_component_occured = false;
setup_ck(_column_translation.clustering_column_value_fix_legths());
while (!no_more_ck_blocks()) {
if (should_read_block_header()) {
co_yield read_unsigned_vint(*_processing_data);
_ck_blocks_header = _u64;
}
if (is_block_null()) {
_null_component_occured = true;
move_to_next_ck_block();
continue;
}
if (_null_component_occured) {
throw malformed_sstable_exception("non-null component after null component");
}
if (is_block_empty()) {
_row_key.push_back({});
move_to_next_ck_block();
continue;
}
read_status status = read_status::waiting;
if (auto len = get_ck_block_value_length()) {
status = read_bytes(*_processing_data, *len, _column_value);
} else {
status = read_unsigned_vint_length_bytes(*_processing_data, _column_value);
}
co_yield status;
_row_key.push_back(std::move(_column_value));
move_to_next_ck_block();
}
if (_reading_range_tombstone_ck) {
_reading_range_tombstone_ck = false;
goto range_tombstone_body_label;
}
row_body_label: {
co_yield read_unsigned_vint(*_processing_data);
_next_row_offset = position() - _processing_data->size() + _u64;
co_yield read_unsigned_vint(*_processing_data);
// Ignore the result
mp_row_consumer_m::row_processing_result ret = _extended_flags.is_static()
? _consumer.consume_static_row_start()
: _consumer.consume_row_start(_row_key);
while (ret == mp_row_consumer_m::row_processing_result::retry_later) {
co_yield mp_row_consumer_m::proceed::no;
ret = _extended_flags.is_static()
? _consumer.consume_static_row_start()
: _consumer.consume_row_start(_row_key);
}
if (ret == mp_row_consumer_m::row_processing_result::skip_row) {
_state = state::FLAGS;
auto current_pos = position() - _processing_data->size();
auto maybe_skip_bytes = skip(*_processing_data, _next_row_offset - current_pos);
if (std::holds_alternative<skip_bytes>(maybe_skip_bytes)) {
co_yield maybe_skip_bytes;
}
goto flags_label;
}
if (_extended_flags.is_static()) {
if (_flags.has_timestamp() || _flags.has_ttl() || _flags.has_deletion()) {
throw malformed_sstable_exception(format("Static row has unexpected flags: timestamp={}, ttl={}, deletion={}",
_flags.has_timestamp(), _flags.has_ttl(), _flags.has_deletion()));
}
} else {
if (_flags.has_timestamp()) {
co_yield read_unsigned_vint(*_processing_data);
_liveness.set_timestamp(parse_timestamp(_header, _u64));
if (_flags.has_ttl()) {
co_yield read_unsigned_vint(*_processing_data);
_liveness.set_ttl(parse_ttl(_header, _u64));
co_yield read_unsigned_vint(*_processing_data);
_liveness.set_local_deletion_time(parse_expiry(_header, _u64));
}
}
if (_flags.has_deletion()) {
co_yield read_unsigned_vint(*_processing_data);
_row_tombstone.timestamp = parse_timestamp(_header, _u64);
co_yield read_unsigned_vint(*_processing_data);
_row_tombstone.deletion_time = parse_expiry(_header, _u64);
}
if (_extended_flags.has_scylla_shadowable_deletion()) {
if (!_has_shadowable_tombstones) {
throw malformed_sstable_exception("Scylla shadowable tombstone flag is set but not supported on this SSTables");
}
co_yield read_unsigned_vint(*_processing_data);
_row_shadowable_tombstone.timestamp = parse_timestamp(_header, _u64);
co_yield read_unsigned_vint(*_processing_data);
_row_shadowable_tombstone.deletion_time = parse_expiry(_header, _u64);
}
_consumer.consume_row_marker_and_tombstone(
_liveness, std::move(_row_tombstone), std::move(_row_shadowable_tombstone));
}
if (!_flags.has_all_columns()) {
co_yield read_unsigned_vint(*_processing_data);
uint64_t missing_column_bitmap_or_count = _u64;
if (_row->_columns.size() < 64) {
_row->_columns_selector.clear();
_row->_columns_selector.append(missing_column_bitmap_or_count);
_row->_columns_selector.flip();
_row->_columns_selector.resize(_row->_columns.size());
skip_absent_columns();
goto column_label;
}
_row->_columns_selector.resize(_row->_columns.size());
if (_row->_columns.size() - missing_column_bitmap_or_count < _row->_columns.size() / 2) {
_missing_columns_to_read = _row->_columns.size() - missing_column_bitmap_or_count;
_row->_columns_selector.reset();
} else {
_missing_columns_to_read = missing_column_bitmap_or_count;
_row->_columns_selector.set();
}
while (_missing_columns_to_read > 0) {
--_missing_columns_to_read;
co_yield read_unsigned_vint(*_processing_data);
_row->_columns_selector.flip(_u64);
}
skip_absent_columns();
} else {
_row->_columns_selector.set();
}
}
column_label:
if (_subcolumns_to_read == 0) {
if (no_more_columns()) {
_state = state::FLAGS;
if (_consumer.consume_row_end() == mp_row_consumer_m::proceed::no) {
co_yield mp_row_consumer_m::proceed::no;
}
goto flags_label;
}
if (!is_column_simple()) {
if (!_flags.has_complex_deletion()) {
_complex_column_tombstone = {};
} else {
co_yield read_unsigned_vint(*_processing_data);
_complex_column_marked_for_delete = parse_timestamp(_header, _u64);
co_yield read_unsigned_vint(*_processing_data);
_complex_column_tombstone = {_complex_column_marked_for_delete, parse_expiry(_header, _u64)};
}
if (_consumer.consume_complex_column_start(get_column_info(), _complex_column_tombstone) == mp_row_consumer_m::proceed::no) {
co_yield mp_row_consumer_m::proceed::no;
}
co_yield read_unsigned_vint(*_processing_data);
_subcolumns_to_read = _u64;
if (_subcolumns_to_read == 0) {
const sstables::column_translation::column_info& column_info = get_column_info();
move_to_next_column();
if (_consumer.consume_complex_column_end(column_info) == mp_row_consumer_m::proceed::no) {
_consuming = false;
co_yield mp_row_consumer_m::proceed::no;
_consuming = true;
}
}
goto column_label;
}
_subcolumns_to_read = 0;
}
co_yield read_8(*_processing_data);
_column_flags = column_flags_m(_u8);
if (_column_flags.use_row_timestamp()) {
_column_timestamp = _liveness.timestamp();
} else {
co_yield read_unsigned_vint(*_processing_data);
_column_timestamp = parse_timestamp(_header, _u64);
}
if (_column_flags.use_row_ttl()) {
_column_local_deletion_time = _liveness.local_deletion_time();
} else if (!_column_flags.is_deleted() && ! _column_flags.is_expiring()) {
_column_local_deletion_time = gc_clock::time_point::max();
} else {
co_yield read_unsigned_vint(*_processing_data);
_column_local_deletion_time = parse_expiry(_header, _u64);
}
if (_column_flags.use_row_ttl()) {
_column_ttl = _liveness.ttl();
} else if (!_column_flags.is_expiring()) {
_column_ttl = gc_clock::duration::zero();
} else {
co_yield read_unsigned_vint(*_processing_data);
_column_ttl = parse_ttl(_header, _u64);
}
if (!is_column_simple()) {
co_yield read_unsigned_vint_length_bytes_contiguous(*_processing_data, _cell_path);
} else {
_cell_path = temporary_buffer<char>(0);
}
if (!_column_flags.has_value()) {
_column_value = fragmented_temporary_buffer();
} else {
read_status status = read_status::waiting;
if (auto len = get_column_value_length()) {
status = read_bytes(*_processing_data, *len, _column_value);
} else {
status = read_unsigned_vint_length_bytes(*_processing_data, _column_value);
}
co_yield status;
}
_consuming = false;
if (is_column_counter() && !_column_flags.is_deleted()) {
if (_consumer.consume_counter_column(get_column_info(),
fragmented_temporary_buffer::view(_column_value),
_column_timestamp) == mp_row_consumer_m::proceed::no) {
co_yield mp_row_consumer_m::proceed::no;
}
} else {
if (_consumer.consume_column(get_column_info(),
to_bytes_view(_cell_path),
fragmented_temporary_buffer::view(_column_value),
_column_timestamp,
_column_ttl,
_column_local_deletion_time,
_column_flags.is_deleted()) == mp_row_consumer_m::proceed::no) {
co_yield mp_row_consumer_m::proceed::no;
}
}
if (!is_column_simple()) {
--_subcolumns_to_read;
if (_subcolumns_to_read == 0) {
const sstables::column_translation::column_info& column_info = get_column_info();
move_to_next_column();
if (_consumer.consume_complex_column_end(column_info) == mp_row_consumer_m::proceed::no) {
co_yield mp_row_consumer_m::proceed::no;
}
}
} else {
move_to_next_column();
}
_consuming = true;
goto column_label;
range_tombstone_body_label:
co_yield read_unsigned_vint(*_processing_data);
// Ignore result
co_yield read_unsigned_vint(*_processing_data);
// Ignore result
co_yield read_unsigned_vint(*_processing_data);
_left_range_tombstone.timestamp = parse_timestamp(_header, _u64);
co_yield read_unsigned_vint(*_processing_data);
_left_range_tombstone.deletion_time = parse_expiry(_header, _u64);
if (!is_boundary_between_adjacent_intervals(_range_tombstone_kind)) {
if (!is_bound_kind(_range_tombstone_kind)) {
throw sstables::malformed_sstable_exception(
format("Corrupted range tombstone: invalid boundary type {}", _range_tombstone_kind));
}
_sst->get_stats().on_range_tombstone_read();
_state = state::FLAGS;
if (_consumer.consume_range_tombstone(_row_key,
to_bound_kind(_range_tombstone_kind),
_left_range_tombstone) == mp_row_consumer_m::proceed::no) {
_row_key.clear();
co_yield mp_row_consumer_m::proceed::no;
}
_row_key.clear();
goto flags_label;
}
co_yield read_unsigned_vint(*_processing_data);
_right_range_tombstone.timestamp = parse_timestamp(_header, _u64);
co_yield read_unsigned_vint(*_processing_data);
_sst->get_stats().on_range_tombstone_read();
_right_range_tombstone.deletion_time = parse_expiry(_header, _u64);
_state = state::FLAGS;
if (_consumer.consume_range_tombstone(_row_key,
_range_tombstone_kind,
_left_range_tombstone,
_right_range_tombstone) == mp_row_consumer_m::proceed::no) {
_row_key.clear();
co_yield mp_row_consumer_m::proceed::no;
}
_row_key.clear();
goto flags_label;
}
public:
data_consume_rows_context_m(const schema& s,
const shared_sstable& sst,
mp_row_consumer_m& consumer,
input_stream<char> && input,
uint64_t start,
uint64_t maxlen)
: continuous_data_consumer(consumer.permit(), std::move(input), start, maxlen)
, _consumer(consumer)
, _sst(sst)
, _header(sst->get_serialization_header())
, _column_translation(sst->get_column_translation(s, _header, sst->features()))
, _has_shadowable_tombstones(sst->has_shadowable_tombstones())
, _gen(do_process_state())
{
setup_columns(_regular_row, _column_translation.regular_columns());
setup_columns(_static_row, _column_translation.static_columns());
}
void verify_end_state() {
// If reading a partial row (i.e., when we have a clustering row
// filter and using a promoted index), we may be in FLAGS
// state instead of PARTITION_START.
if (_state == state::FLAGS) {
_consumer.on_end_of_stream();
return;
}
// We may end up in state::DELETION_TIME after consuming last partition's end marker
// and proceeding to attempt to parse the next partition, since state::DELETION_TIME
// is the first state corresponding to the contents of a new partition.
if (_state != state::DELETION_TIME
&& (_state != state::PARTITION_START || primitive_consumer::active())) {
throw malformed_sstable_exception("end of input, but not end of partition");
}
}
void reset(indexable_element el) {
auto reset_to_state = [this, el] (state s) {
_state = s;
_consumer.reset(el);
_gen = do_process_state();
};
switch (el) {
case indexable_element::partition:
return reset_to_state(state::PARTITION_START);
case indexable_element::cell:
return reset_to_state(state::FLAGS);
}
// We should not get here unless some enum member is not handled by the switch
throw std::logic_error(format("Unable to reset - unknown indexable element: {}", el));
}
reader_permit& permit() {
return _consumer.permit();
}
};
class mx_sstable_mutation_reader : public mp_row_consumer_reader_mx {
using DataConsumeRowsContext = data_consume_rows_context_m;
using Consumer = mp_row_consumer_m;
static_assert(RowConsumer<Consumer>);
Consumer _consumer;
bool _will_likely_slice = false;
bool _read_enabled = true;
std::unique_ptr<DataConsumeRowsContext> _context;
std::unique_ptr<index_reader> _index_reader;
// We avoid unnecessary lookup for single partition reads thanks to this flag
bool _single_partition_read = false;
const dht::partition_range& _pr;
const query::partition_slice& _slice;
streamed_mutation::forwarding _fwd;
mutation_reader::forwarding _fwd_mr;
read_monitor& _monitor;
public:
mx_sstable_mutation_reader(shared_sstable sst,
schema_ptr schema,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
read_monitor& mon)
: mp_row_consumer_reader_mx(std::move(schema), permit, std::move(sst))
, _consumer(this, _schema, std::move(permit), slice, pc, std::move(trace_state), fwd, _sst)
// FIXME: I want to add `&& fwd_mr == mutation_reader::forwarding::no` below
// but can't because many call sites use the default value for
// `mutation_reader::forwarding` which is `yes`.
, _single_partition_read(pr.is_singular())
, _pr(pr)
, _slice(slice)
, _fwd(fwd)
, _fwd_mr(fwd_mr)
, _monitor(mon) { }
// Reference to _consumer is passed to data_consume_rows() in the constructor so we must not allow move/copy
mx_sstable_mutation_reader(mx_sstable_mutation_reader&&) = delete;
mx_sstable_mutation_reader(const mx_sstable_mutation_reader&) = delete;
~mx_sstable_mutation_reader() {
if (_context || _index_reader) {
sstlog.warn("sstable_mutation_reader was not closed. Closing in the background. Backtrace: {}", current_backtrace());
// FIXME: discarded future.
(void)close();
}
}
private:
static bool will_likely_slice(const query::partition_slice& slice) {
return (!slice.default_row_ranges().empty() && !slice.default_row_ranges()[0].is_full())
|| slice.get_specific_ranges();
}
index_reader& get_index_reader() {
if (!_index_reader) {
auto caching = use_caching(!_slice.options.contains(query::partition_slice::option::bypass_cache));
_index_reader = std::make_unique<index_reader>(_sst, _consumer.permit(), _consumer.io_priority(),
_consumer.trace_state(), caching);
}
return *_index_reader;
}
future<> advance_to_next_partition() {
sstlog.trace("reader {}: advance_to_next_partition()", fmt::ptr(this));
_before_partition = true;
auto& consumer = _consumer;
if (consumer.is_mutation_end()) {
sstlog.trace("reader {}: already at partition boundary", fmt::ptr(this));
_index_in_current_partition = false;
return make_ready_future<>();
}
return (_index_in_current_partition
? _index_reader->advance_to_next_partition()
: get_index_reader().advance_to(dht::ring_position_view::for_after_key(*_current_partition_key))).then([this] {
_index_in_current_partition = true;
auto [start, end] = _index_reader->data_file_positions();
if (end && start > *end) {
_read_enabled = false;
return make_ready_future<>();
}
assert(_index_reader->element_kind() == indexable_element::partition);
return skip_to(_index_reader->element_kind(), start).then([this] {
_sst->get_stats().on_partition_seek();
});
});
}
future<> read_from_index() {
sstlog.trace("reader {}: read from index", fmt::ptr(this));
auto tomb = _index_reader->partition_tombstone();
if (!tomb) {
sstlog.trace("reader {}: no tombstone", fmt::ptr(this));
return read_from_datafile();
}
auto pk = _index_reader->get_partition_key();
auto key = dht::decorate_key(*_schema, std::move(pk));
_consumer.setup_for_partition(key.key());
on_next_partition(std::move(key), tombstone(*tomb));
return make_ready_future<>();
}
future<> read_from_datafile() {
sstlog.trace("reader {}: read from data file", fmt::ptr(this));
return _context->consume_input();
}
// Assumes that we're currently positioned at partition boundary.
future<> read_partition() {
sstlog.trace("reader {}: reading partition", fmt::ptr(this));
_end_of_stream = true; // on_next_partition() will set it to true
if (!_read_enabled) {
sstlog.trace("reader {}: eof", fmt::ptr(this));
return make_ready_future<>();
}
if (!_consumer.is_mutation_end()) {
throw malformed_sstable_exception(format("consumer not at partition boundary, position: {}",
position_in_partition_view::printer(*_schema, _consumer.position())), _sst->get_filename());
}
// It's better to obtain partition information from the index if we already have it.
// We can save on IO if the user will skip past the front of partition immediately.
//
// It is also better to pay the cost of reading the index if we know that we will
// need to use the index anyway soon.
//
if (_index_in_current_partition) {
if (_context->eof()) {
sstlog.trace("reader {}: eof", fmt::ptr(this));
return make_ready_future<>();
}
if (_index_reader->partition_data_ready()) {
return read_from_index();
}
if (_will_likely_slice) {
return _index_reader->read_partition_data().then([this] {
return read_from_index();
});
}
}
// FIXME: advance index to current partition if _will_likely_slice
return read_from_datafile();
}
// Can be called from any position.
future<> read_next_partition() {
sstlog.trace("reader {}: read next partition", fmt::ptr(this));
// If next partition exists then on_next_partition will be called
// and _end_of_stream will be set to false again.
_end_of_stream = true;
if (!_read_enabled || _single_partition_read) {
sstlog.trace("reader {}: eof", fmt::ptr(this));
return make_ready_future<>();
}
return advance_to_next_partition().then([this] {
return read_partition();
});
}
future<> advance_context(std::optional<position_in_partition_view> pos) {
if (!pos || pos->is_before_all_fragments(*_schema)) {
return make_ready_future<>();
}
assert (_current_partition_key);
return [this] {
if (!_index_in_current_partition) {
_index_in_current_partition = true;
return get_index_reader().advance_to(*_current_partition_key);
}
return make_ready_future();
}().then([this, pos] {
return get_index_reader().advance_to(*pos).then([this] {
index_reader& idx = *_index_reader;
auto index_position = idx.data_file_positions();
if (index_position.start <= _context->position()) {
return make_ready_future<>();
}
return skip_to(idx.element_kind(), index_position.start).then([this, &idx] {
_sst->get_stats().on_partition_seek();
auto open_end_marker = idx.end_open_marker();
if (open_end_marker) {
_consumer.set_range_tombstone(open_end_marker->tomb);
} else {
_consumer.set_range_tombstone({});
}
});
});
});
}
bool is_initialized() const {
return bool(_context);
}
future<> initialize() {
if (_single_partition_read) {
_sst->get_stats().on_single_partition_read();
const auto& key = dht::ring_position_view(_pr.start()->value());
position_in_partition_view pos = get_slice_upper_bound(*_schema, _slice, key);
const auto present = co_await get_index_reader().advance_lower_and_check_if_present(key, pos);
if (!present) {
_sst->get_filter_tracker().add_false_positive();
co_return;
}
_sst->get_filter_tracker().add_true_positive();
} else {
_sst->get_stats().on_range_partition_read();
co_await get_index_reader().advance_to(_pr);
}
auto [begin, end] = _index_reader->data_file_positions();
assert(end);
if (_single_partition_read) {
_read_enabled = (begin != *end);
_context = data_consume_single_partition<DataConsumeRowsContext>(*_schema, _sst, _consumer, { begin, *end });
} else {
sstable::disk_read_range drr{begin, *end};
auto last_end = _fwd_mr ? _sst->data_size() : drr.end;
_read_enabled = bool(drr);
_context = data_consume_rows<DataConsumeRowsContext>(*_schema, _sst, _consumer, std::move(drr), last_end);
}
_monitor.on_read_started(_context->reader_position());
_index_in_current_partition = true;
_will_likely_slice = will_likely_slice(_slice);
}
future<> ensure_initialized() {
if (is_initialized()) {
return make_ready_future<>();
}
return initialize();
}
future<> skip_to(indexable_element el, uint64_t begin) {
sstlog.trace("sstable_reader: {}: skip_to({} -> {}, el={})", fmt::ptr(_context.get()), _context->position(), begin, static_cast<int>(el));
if (begin <= _context->position()) {
return make_ready_future<>();
}
_context->reset(el);
return _context->skip_to(begin);
}
public:
void on_out_of_clustering_range() override {
if (_fwd == streamed_mutation::forwarding::yes) {
_end_of_stream = true;
} else {
this->push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, partition_end()));
_partition_finished = true;
}
}
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
return ensure_initialized().then([this, &pr] {
if (!is_initialized()) {
_end_of_stream = true;
return make_ready_future<>();
} else {
clear_buffer();
_partition_finished = true;
_before_partition = true;
_end_of_stream = false;
assert(_index_reader);
auto f1 = _index_reader->advance_to(pr);
return f1.then([this] {
auto [start, end] = _index_reader->data_file_positions();
assert(end);
if (start != *end) {
_read_enabled = true;
_index_in_current_partition = true;
_context->reset(indexable_element::partition);
return _context->fast_forward_to(start, *end);
}
_index_in_current_partition = false;
_read_enabled = false;
return make_ready_future<>();
});
}
});
}
virtual future<> fill_buffer() override {
if (_end_of_stream) {
return make_ready_future<>();
}
if (!is_initialized()) {
return initialize().then([this] {
if (!is_initialized()) {
_end_of_stream = true;
return make_ready_future<>();
} else {
return fill_buffer();
}
});
}
return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
if (_partition_finished) {
maybe_timed_out();
if (_before_partition) {
return read_partition();
} else {
return read_next_partition();
}
} else {
return do_until([this] { return is_buffer_full() || _partition_finished || _end_of_stream; }, [this] {
_consumer.push_ready_fragments();
if (is_buffer_full() || _partition_finished || _end_of_stream) {
return make_ready_future<>();
}
maybe_timed_out();
return advance_context(_consumer.maybe_skip()).then([this] {
return _context->consume_input();
});
});
}
}).then_wrapped([this] (future<> f) {
try {
f.get();
} catch(sstables::malformed_sstable_exception& e) {
throw sstables::malformed_sstable_exception(format("Failed to read partition from SSTable {} due to {}", _sst->get_filename(), e.what()));
}
});
}
virtual future<> next_partition() override {
if (is_initialized()) {
if (_fwd == streamed_mutation::forwarding::yes) {
clear_buffer();
_partition_finished = true;
_end_of_stream = false;
} else {
clear_buffer_to_next_partition();
if (!_partition_finished && is_buffer_empty()) {
_partition_finished = true;
}
}
}
return make_ready_future<>();
// If _ds is not created then next_partition() has no effect because there was no partition_start emitted yet.
}
virtual future<> fast_forward_to(position_range cr) override {
forward_buffer_to(cr.start());
if (!_partition_finished) {
_end_of_stream = false;
return advance_context(_consumer.fast_forward_to(std::move(cr)));
} else {
_end_of_stream = true;
return make_ready_future<>();
}
}
virtual future<> close() noexcept override {
auto close_context = make_ready_future<>();
if (_context) {
_monitor.on_read_completed();
// move _context to prevent double-close from destructor.
close_context = _context->close().finally([_ = std::move(_context)] {});
}
auto close_index_reader = make_ready_future<>();
if (_index_reader) {
// move _index_reader to prevent double-close from destructor.
close_index_reader = _index_reader->close().finally([_ = std::move(_index_reader)] {});
}
return when_all_succeed(std::move(close_context), std::move(close_index_reader)).discard_result().handle_exception([] (std::exception_ptr ep) {
// close can not fail as it is called either from the destructor or from flat_mutation_reader::close
sstlog.warn("Failed closing of sstable_mutation_reader: {}. Ignored since the reader is already done.", ep);
});
}
};
flat_mutation_reader_v2 make_reader(
shared_sstable sstable,
schema_ptr schema,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
read_monitor& monitor) {
return make_flat_mutation_reader_v2<mx_sstable_mutation_reader>(
std::move(sstable), std::move(schema), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr, monitor);
}
class mx_crawling_sstable_mutation_reader : public mp_row_consumer_reader_mx {
using DataConsumeRowsContext = data_consume_rows_context_m;
using Consumer = mp_row_consumer_m;
static_assert(RowConsumer<Consumer>);
Consumer _consumer;
std::unique_ptr<DataConsumeRowsContext> _context;
read_monitor& _monitor;
public:
mx_crawling_sstable_mutation_reader(shared_sstable sst, schema_ptr schema,
reader_permit permit,
const io_priority_class &pc,
tracing::trace_state_ptr trace_state,
read_monitor& mon)
: mp_row_consumer_reader_mx(std::move(schema), permit, std::move(sst))
, _consumer(this, _schema, std::move(permit), _schema->full_slice(), pc, std::move(trace_state), streamed_mutation::forwarding::no, _sst)
, _context(data_consume_rows<DataConsumeRowsContext>(*_schema, _sst, _consumer))
, _monitor(mon) {
_monitor.on_read_started(_context->reader_position());
}
public:
void on_out_of_clustering_range() override {
push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, partition_end()));
}
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
on_internal_error(sstlog, "mx_crawling_sstable_mutation_reader: doesn't support fast_forward_to(const dht::partition_range&)");
}
virtual future<> fast_forward_to(position_range cr) override {
on_internal_error(sstlog, "mx_crawling_sstable_mutation_reader: doesn't support fast_forward_to(position_range)");
}
virtual future<> next_partition() override {
on_internal_error(sstlog, "mx_crawling_sstable_mutation_reader: doesn't support next_partition()");
}
virtual future<> fill_buffer() override {
if (_end_of_stream) {
return make_ready_future<>();
}
if (_context->eof()) {
_end_of_stream = true;
return make_ready_future<>();
}
return _context->consume_input();
}
virtual future<> close() noexcept override {
if (!_context) {
return make_ready_future<>();
}
_monitor.on_read_completed();
return _context->close().handle_exception([_ = std::move(_context)] (std::exception_ptr ep) {
sstlog.warn("Failed closing of mx_crawling_sstable_mutation_reader: {}. Ignored since the reader is already done.", ep);
});
}
};
flat_mutation_reader_v2 make_crawling_reader(
shared_sstable sstable,
schema_ptr schema,
reader_permit permit,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
read_monitor& monitor) {
return make_flat_mutation_reader_v2<mx_crawling_sstable_mutation_reader>(std::move(sstable), std::move(schema), std::move(permit), pc,
std::move(trace_state), monitor);
}
} // namespace mx
void mx::mp_row_consumer_reader_mx::on_next_partition(dht::decorated_key key, tombstone tomb) {
_partition_finished = false;
_before_partition = false;
_end_of_stream = false;
_current_partition_key = std::move(key);
push_mutation_fragment(
mutation_fragment_v2(*_schema, _permit, partition_start(*_current_partition_key, tomb)));
_sst->get_stats().on_partition_read();
}
} // namespace sstables