Merge "Support reading collections" from Piotr

"
Implement and test support for reading collections in SSTables 3.

Tests: unit {release}
"

* 'haaawk/sstables3/read-collections-v1' of ssh://github.com/scylladb/seastar-dev:
  sstables 3: Add tests for reading collections
  flat_mutation_reader_assertions: add more flexible asserts
  data_consume_rows_context_m: add support for collections
  mp_row_consumer_m: Add support for collections
  data_consume_rows_context_m: introduce cell_path
  Use column_translation::*_is_collection in reading
  column_translation: add *_column_is_collection()
  column_flags_m: add HAS_COMPLEX_DELETION
  Use read_unsigned_vint_length_bytes for COLUMN_VALUE
  Use read_unsigned_vint_length_bytes for CK_BLOCKS
  Implement read_unsigned_vint_length_bytes
This commit is contained in:
Avi Kivity
2018-06-10 17:10:52 +03:00
15 changed files with 411 additions and 69 deletions

View File

@@ -39,21 +39,25 @@ class column_translation {
struct state {
static std::pair<std::vector<stdx::optional<column_id>>, std::vector<std::optional<uint32_t>>> build(
const schema& s,
const utils::chunked_vector<serialization_header::column_desc>& src,
bool is_static) {
static std::tuple<std::vector<stdx::optional<column_id>>,
std::vector<std::optional<uint32_t>>,
std::vector<bool>> build(
const schema& s,
const utils::chunked_vector<serialization_header::column_desc>& src,
bool is_static) {
std::vector<stdx::optional<column_id>> ids;
std::vector<std::optional<column_id>> lens;
std::vector<bool> is_collection;
if (s.is_dense()) {
if (is_static) {
ids.push_back(s.static_begin()->id);
lens.push_back(s.static_begin()->type->value_length_if_fixed());
is_collection.push_back(s.static_begin()->is_multi_cell());
} else {
ids.push_back(s.regular_begin()->id);
lens.push_back(s.regular_begin()->type->value_length_if_fixed());
is_collection.push_back(s.regular_begin()->is_multi_cell());
}
} else {
ids.reserve(src.size());
lens.reserve(src.size());
@@ -62,13 +66,15 @@ class column_translation {
if (def) {
ids.push_back(def->id);
lens.push_back(def->type->value_length_if_fixed());
is_collection.push_back(def->is_multi_cell());
} else {
ids.push_back(stdx::nullopt);
lens.push_back(std::nullopt);
is_collection.push_back(false);
}
}
}
return std::make_pair(std::move(ids), std::move(lens));
return std::make_tuple(std::move(ids), std::move(lens), std::move(is_collection));
}
utils::UUID schema_uuid;
@@ -77,6 +83,8 @@ class column_translation {
std::vector<std::optional<uint32_t>> regular_column_value_fix_lengths;
std::vector<std::optional<uint32_t>> static_column_value_fix_lengths;
std::vector<std::optional<uint32_t>> clustering_column_value_fix_lengths;
std::vector<bool> static_column_is_collection;
std::vector<bool> regular_column_is_collection;
state() = default;
state(const state&) = delete;
@@ -87,9 +95,13 @@ class column_translation {
state(const schema& s, const serialization_header& header)
: schema_uuid(s.version())
{
std::tie(regular_schema_column_id_from_sstable, regular_column_value_fix_lengths) =
std::tie(regular_schema_column_id_from_sstable,
regular_column_value_fix_lengths,
regular_column_is_collection) =
build(s, header.regular_columns.elements, false);
std::tie(static_schema_column_id_from_sstable, static_column_value_fix_lengths) =
std::tie(static_schema_column_id_from_sstable,
static_column_value_fix_lengths,
static_column_is_collection) =
build(s, header.static_columns.elements, true);
clustering_column_value_fix_lengths.reserve(header.clustering_key_types_names.elements.size());
for (auto&& t : header.clustering_key_types_names.elements) {
@@ -124,6 +136,12 @@ public:
const std::vector<std::optional<uint32_t>>& clustering_column_value_fix_legths() const {
return _state->clustering_column_value_fix_lengths;
}
const std::vector<bool>& static_column_is_collection() const {
return _state->static_column_is_collection;
}
const std::vector<bool>& regular_column_is_collection() const {
return _state->regular_column_is_collection;
}
};
}; // namespace sstables

View File

@@ -73,7 +73,9 @@ protected:
READING_BYTES,
READING_U16_BYTES,
READING_UNSIGNED_VINT,
READING_UNSIGNED_VINT_LENGTH_BYTES,
READING_UNSIGNED_VINT_WITH_LEN,
READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN,
} _prestate = prestate::NONE;
// state for non-NONE prestates
@@ -188,6 +190,29 @@ protected:
}
}
}
inline read_status read_unsigned_vint_length_bytes(temporary_buffer<char>& data, temporary_buffer<char>& where) {
if (data.empty()) {
_prestate = prestate::READING_UNSIGNED_VINT_LENGTH_BYTES;
_read_bytes_where = &where;
return read_status::waiting;
} else {
const vint_size_type len = unsigned_vint::serialized_size_from_first_byte(*data.begin());
if (data.size() >= len) {
_u64 = unsigned_vint::deserialize(
bytes_view(reinterpret_cast<bytes::value_type*>(data.get_write()), len)).value;
data.trim_front(len);
return read_bytes(data, static_cast<uint32_t>(_u64), where);
} else {
_read_bytes = temporary_buffer<char>(len);
std::copy(data.begin(), data.end(), _read_bytes.get_write());
_pos = data.size();
data.trim(0);
_read_bytes_where = &where;
_prestate = prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN;
return read_status::waiting;
}
}
}
inline void process_buffer(temporary_buffer<char>& data) {
if (__builtin_expect((_prestate != prestate::NONE), 0)) {
@@ -205,6 +230,10 @@ private:
if (read_unsigned_vint(data) == read_status::ready) {
_prestate = prestate::NONE;
}
} else if (_prestate == prestate::READING_UNSIGNED_VINT_LENGTH_BYTES) {
if (read_unsigned_vint_length_bytes(data, *_read_bytes_where) == read_status::ready) {
_prestate = prestate::NONE;
}
} else if (_prestate == prestate::READING_UNSIGNED_VINT_WITH_LEN) {
const auto n = std::min(_read_bytes.size() - _pos, data.size());
std::copy_n(data.begin(), n, _read_bytes.get_write() + _pos);
@@ -215,6 +244,18 @@ private:
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.get_write()), _read_bytes.size())).value;
_prestate = prestate::NONE;
}
} else if (_prestate == prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN) {
const auto n = std::min(_read_bytes.size() - _pos, data.size());
std::copy_n(data.begin(), n, _read_bytes.get_write() + _pos);
data.trim_front(n);
_pos += n;
if (_pos == _read_bytes.size()) {
_u64 = unsigned_vint::deserialize(
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.get_write()), _read_bytes.size())).value;
if (read_bytes(data, _u64, *_read_bytes_where) == read_status::ready) {
_prestate = prestate::NONE;
}
}
} else if (_prestate == prestate::READING_BYTES) {
auto n = std::min(_read_bytes.size() - _pos, data.size());
std::copy(data.begin(), data.begin() + n,

View File

@@ -838,6 +838,7 @@ class mp_row_consumer_m : public consumer_m {
atomic_cell_or_collection val;
};
std::vector<cell> _cells;
collection_type_impl::mutation _cm;
void set_up_ck_ranges(const partition_key& pk) {
_ck_ranges = query::clustering_key_filter_ranges::get_ranges(*_schema, _slice, pk);
@@ -846,6 +847,10 @@ class mp_row_consumer_m : public consumer_m {
_out_of_range = false;
}
const column_definition& get_column_definition(stdx::optional<column_id> column_id) {
auto column_type = _inside_static_row ? column_kind::static_column : column_kind::regular_column;
return _schema->column_at(column_type, *column_id);
}
public:
mp_row_consumer_m(mp_row_consumer_reader* reader,
const schema_ptr schema,
@@ -959,6 +964,7 @@ public:
}
virtual proceed consume_column(stdx::optional<column_id> column_id,
bytes_view cell_path,
bytes_view value,
api::timestamp_type timestamp,
gc_clock::duration ttl,
@@ -966,13 +972,43 @@ public:
if (!column_id) {
return proceed::yes;
}
auto column_type = _inside_static_row ? column_kind::static_column : column_kind::regular_column;
const column_definition& column_def = _schema->column_at(column_type, *column_id);
const column_definition& column_def = get_column_definition(column_id);
if (timestamp <= column_def.dropped_at()) {
return proceed::yes;
}
auto ac = make_atomic_cell(*column_def.type, timestamp, value, ttl, local_deletion_time, atomic_cell::collection_member::no);
_cells.push_back({*column_id, atomic_cell_or_collection(std::move(ac))});
if (column_def.is_multi_cell()) {
auto ctype = static_pointer_cast<const collection_type_impl>(column_def.type);
auto ac = make_atomic_cell(*ctype->value_comparator(),
timestamp,
value,
ttl,
local_deletion_time,
atomic_cell::collection_member::yes);
_cm.cells.emplace_back(to_bytes(cell_path), std::move(ac));
} else {
auto ac = make_atomic_cell(*column_def.type, timestamp, value, ttl, local_deletion_time,
atomic_cell::collection_member::no);
_cells.push_back({*column_id, atomic_cell_or_collection(std::move(ac))});
}
return proceed::yes;
}
virtual proceed consume_complex_column_start(stdx::optional<column_id> column_id,
tombstone tomb) override {
_cm.tomb = tomb;
_cm.cells.clear();
return proceed::yes;
}
virtual proceed consume_complex_column_end(stdx::optional<column_id> column_id) override {
if (column_id) {
const column_definition& column_def = get_column_definition(column_id);
auto ctype = static_pointer_cast<const collection_type_impl>(column_def.type);
auto ac = atomic_cell_or_collection::from_collection_mutation(ctype->serialize_mutation_form(_cm));
_cells.push_back({column_def.id, atomic_cell_or_collection(std::move(ac))});
_cm.tomb = {};
_cm.cells.clear();
}
return proceed::yes;
}

View File

@@ -155,11 +155,17 @@ public:
virtual proceed consume_static_row_start() = 0;
virtual proceed consume_column(stdx::optional<column_id> column_id,
bytes_view cell_path,
bytes_view value,
api::timestamp_type timestamp,
gc_clock::duration ttl,
gc_clock::time_point local_deletion_time) = 0;
virtual proceed consume_complex_column_start(stdx::optional<column_id> column_id,
tombstone tomb) = 0;
virtual proceed consume_complex_column_end(stdx::optional<column_id> column_id) = 0;
virtual proceed consume_row_end(const sstables::liveness_info&) = 0;
// Called when the reader is fast forwarded to given element.
@@ -529,8 +535,6 @@ private:
CK_BLOCK,
CK_BLOCK_HEADER,
CK_BLOCK2,
CK_BLOCK_VALUE_LENGTH,
CK_BLOCK_VALUE_BYTES,
CK_BLOCK_END,
CLUSTERING_ROW_CONSUME,
ROW_BODY,
@@ -549,6 +553,11 @@ private:
COLUMN,
SIMPLE_COLUMN,
COMPLEX_COLUMN,
COMPLEX_COLUMN_MARKED_FOR_DELETE,
COMPLEX_COLUMN_LOCAL_DELETION_TIME,
COMPLEX_COLUMN_2,
COMPLEX_COLUMN_SIZE,
COMPLEX_COLUMN_SIZE_2,
NEXT_COLUMN,
COLUMN_FLAGS,
COLUMN_TIMESTAMP,
@@ -556,9 +565,9 @@ private:
COLUMN_DELETION_TIME_2,
COLUMN_TTL,
COLUMN_TTL_2,
COLUMN_CELL_PATH,
CELL_PATH_SIZE,
COLUMN_VALUE,
COLUMN_VALUE_LENGTH,
COLUMN_VALUE_BYTES,
COLUMN_END,
RANGE_TOMBSTONE_MARKER,
} _state = state::PARTITION_START;
@@ -578,6 +587,7 @@ private:
boost::iterator_range<std::vector<stdx::optional<column_id>>::const_iterator> _column_ids;
boost::iterator_range<std::vector<std::optional<uint32_t>>::const_iterator> _column_value_fix_lengths;
boost::iterator_range<std::vector<bool>::const_iterator> _column_is_collection;
boost::dynamic_bitset<uint64_t> _columns_selector;
uint64_t _missing_columns_to_read;
@@ -589,13 +599,19 @@ private:
gc_clock::duration _column_ttl;
uint32_t _column_value_length;
temporary_buffer<char> _column_value;
temporary_buffer<char> _cell_path;
uint64_t _ck_blocks_header;
uint32_t _ck_blocks_header_offset;
uint64_t _subcolumns_to_read = 0;
api::timestamp_type _complex_column_marked_for_delete;
tombstone _complex_column_tombstone;
void setup_columns(const std::vector<stdx::optional<column_id>>& column_ids,
const std::vector<std::optional<uint32_t>>& column_value_fix_lengths) {
const std::vector<std::optional<uint32_t>>& column_value_fix_lengths,
const std::vector<bool>& column_is_collection) {
_column_ids = boost::make_iterator_range(column_ids);
_column_value_fix_lengths = boost::make_iterator_range(column_value_fix_lengths);
_column_is_collection = boost::make_iterator_range(column_is_collection);
}
bool is_current_column_present() {
return _columns_selector.test(_columns_selector.size() - _column_ids.size());
@@ -607,6 +623,7 @@ private:
}
_column_ids.advance_begin(pos);
_column_value_fix_lengths.advance_begin(pos);
_column_is_collection.advance_begin(pos);
}
bool no_more_columns() { return _column_ids.empty(); }
void move_to_next_column() {
@@ -616,8 +633,9 @@ private:
: next_pos - current_pos;
_column_ids.advance_begin(jump_to_next);
_column_value_fix_lengths.advance_begin(jump_to_next);
_column_is_collection.advance_begin(jump_to_next);
}
bool is_column_simple() { return true; }
bool is_column_simple() { return !_column_is_collection.front(); }
stdx::optional<column_id> get_column_id() {
return _column_ids.front();
}
@@ -655,7 +673,6 @@ public:
|| _state == state::EXTENDED_FLAGS
|| _state == state::CLUSTERING_ROW
|| _state == state::CK_BLOCK_HEADER
|| _state == state::CK_BLOCK_VALUE_LENGTH
|| _state == state::CK_BLOCK_END
|| _state == state::CLUSTERING_ROW_CONSUME
|| _state == state::ROW_BODY_TIMESTAMP_DELTIME
@@ -667,7 +684,6 @@ public:
|| _state == state::COLUMN_TIMESTAMP
|| _state == state::COLUMN_DELETION_TIME_2
|| _state == state::COLUMN_TTL_2
|| _state == state::COLUMN_VALUE_LENGTH
|| _state == state::COLUMN_END) && (_prestate == prestate::NONE);
}
@@ -725,7 +741,8 @@ public:
_extended_flags = unfiltered_extended_flags_m(uint8_t{0u});
_state = state::CLUSTERING_ROW;
setup_columns(_column_translation.regular_columns(),
_column_translation.regular_column_value_fix_legths());
_column_translation.regular_column_value_fix_legths(),
_column_translation.regular_column_is_collection());
goto clustering_row_label;
}
if (read_8(data) != read_status::ready) {
@@ -737,7 +754,8 @@ public:
if (_extended_flags.is_static()) {
if (_is_first_unfiltered) {
setup_columns(_column_translation.static_columns(),
_column_translation.static_column_value_fix_legths());
_column_translation.static_column_value_fix_legths(),
_column_translation.static_column_is_collection());
_is_first_unfiltered = false;
_consumer.consume_static_row_start();
goto row_body_label;
@@ -746,7 +764,8 @@ public:
}
}
setup_columns(_column_translation.regular_columns(),
_column_translation.regular_column_value_fix_legths());
_column_translation.regular_column_value_fix_legths(),
_column_translation.regular_column_is_collection());
case state::CLUSTERING_ROW:
clustering_row_label:
_is_first_unfiltered = false;
@@ -767,31 +786,23 @@ public:
case state::CK_BLOCK_HEADER:
_ck_blocks_header = _u64;
case state::CK_BLOCK2:
ck_block2_label:
ck_block2_label: {
if (is_block_empty()) {
_row_key.push_back({});
move_to_next_ck_block();
goto ck_block_label;
}
read_status status = read_status::waiting;
if (auto len = get_ck_block_value_length()) {
_column_value_length = *len;
_column_value = temporary_buffer<char>(_column_value_length);
_state = state::CK_BLOCK_VALUE_BYTES;
goto ck_block_value_bytes_label;
status = read_bytes(data, *len, _column_value);
} else {
status = read_unsigned_vint_length_bytes(data, _column_value);
}
if (read_unsigned_vint(data) != read_status::ready) {
_state = state::CK_BLOCK_VALUE_LENGTH;
break;
}
case state::CK_BLOCK_VALUE_LENGTH:
_column_value_length = static_cast<uint32_t>(_u64);
_column_value = temporary_buffer<char>(_column_value_length);
case state::CK_BLOCK_VALUE_BYTES:
ck_block_value_bytes_label:
if (read_bytes(data, _column_value_length, _column_value) != read_status::ready) {
if (status != read_status::ready) {
_state = state::CK_BLOCK_END;
break;
}
}
case state::CK_BLOCK_END:
_row_key.push_back(std::move(_column_value));
move_to_next_ck_block();
@@ -879,16 +890,19 @@ public:
}
case state::COLUMN:
column_label:
if (no_more_columns()) {
_state = state::FLAGS;
if (_consumer.consume_row_end(_liveness) == consumer_m::proceed::no) {
return consumer_m::proceed::no;
if (_subcolumns_to_read == 0) {
if (no_more_columns()) {
_state = state::FLAGS;
if (_consumer.consume_row_end(_liveness) == consumer_m::proceed::no) {
return consumer_m::proceed::no;
}
goto flags_label;
}
goto flags_label;
}
if (!is_column_simple()) {
_state = state::COMPLEX_COLUMN;
goto complex_column_label;
if (!is_column_simple()) {
_state = state::COMPLEX_COLUMN;
goto complex_column_label;
}
_subcolumns_to_read = 0;
}
case state::SIMPLE_COLUMN:
if (read_8(data) != read_status::ready) {
@@ -931,11 +945,11 @@ public:
if (_column_flags.use_row_timestamp()) {
_column_ttl = _liveness.ttl();
_state = state::COLUMN_VALUE;
goto column_value_label;
goto column_cell_path_label;
} else if (!_column_flags.is_expiring()) {
_column_ttl = gc_clock::duration::zero();
_state = state::COLUMN_VALUE;
goto column_value_label;
goto column_cell_path_label;
}
if (read_unsigned_vint(data) != read_status::ready) {
_state = state::COLUMN_TTL_2;
@@ -943,36 +957,39 @@ public:
}
case state::COLUMN_TTL_2:
_column_ttl = parse_ttl(_header, _u64);
case state::COLUMN_CELL_PATH:
column_cell_path_label:
if (!is_column_simple()) {
if (read_unsigned_vint_length_bytes(data, _cell_path) != read_status::ready) {
_state = state::COLUMN_VALUE;
break;
}
} else {
_cell_path = temporary_buffer<char>(0);
}
case state::COLUMN_VALUE:
column_value_label:
{
if (!_column_flags.has_value()) {
_column_value = temporary_buffer<char>(0);
_state = state::COLUMN_END;
goto column_end_label;
}
read_status status = read_status::waiting;
if (auto len = get_column_value_length()) {
_column_value_length = *len;
_column_value = temporary_buffer<char>(_column_value_length);
_state = state::COLUMN_VALUE_BYTES;
goto column_value_bytes_label;
status = read_bytes(data, *len, _column_value);
} else {
status = read_unsigned_vint_length_bytes(data, _column_value);
}
if (read_unsigned_vint(data) != read_status::ready) {
_state = state::COLUMN_VALUE_LENGTH;
break;
}
case state::COLUMN_VALUE_LENGTH:
_column_value_length = static_cast<uint32_t>(_u64);
_column_value = temporary_buffer<char>(_column_value_length);
case state::COLUMN_VALUE_BYTES:
column_value_bytes_label:
if (read_bytes(data, _column_value_length, _column_value) != read_status::ready) {
if (status != read_status::ready) {
_state = state::COLUMN_END;
break;
}
}
case state::COLUMN_END:
column_end_label:
_state = state::NEXT_COLUMN;
if (_consumer.consume_column(get_column_id(),
to_bytes_view(_cell_path),
to_bytes_view(_column_value),
_column_timestamp,
_column_ttl,
@@ -980,8 +997,19 @@ public:
return consumer_m::proceed::no;
}
case state::NEXT_COLUMN:
move_to_next_column();
_state = state::COLUMN;
if (!is_column_simple()) {
--_subcolumns_to_read;
if (_subcolumns_to_read == 0) {
auto id = get_column_id();
move_to_next_column();
if (_consumer.consume_complex_column_end(std::move(id)) != consumer_m::proceed::yes) {
_state = state::COLUMN;
return consumer_m::proceed::no;
}
}
} else {
move_to_next_column();
}
goto column_label;
case state::ROW_BODY_MISSING_COLUMNS_2:
row_body_missing_columns_2_label: {
@@ -1020,7 +1048,36 @@ public:
goto row_body_missing_columns_read_columns_label;
case state::COMPLEX_COLUMN:
complex_column_label:
throw malformed_sstable_exception("unimplemented state: complex columns not supported");
if (!_flags.has_complex_deletion()) {
_complex_column_tombstone = {};
goto complex_column_2_label;
}
if (read_unsigned_vint(data) != read_status::ready) {
_state = state::COMPLEX_COLUMN_MARKED_FOR_DELETE;
break;
}
case state::COMPLEX_COLUMN_MARKED_FOR_DELETE:
_complex_column_marked_for_delete = parse_timestamp(_header, _u64);
if (read_unsigned_vint(data) != read_status::ready) {
_state = state::COMPLEX_COLUMN_LOCAL_DELETION_TIME;
break;
}
case state::COMPLEX_COLUMN_LOCAL_DELETION_TIME:
_complex_column_tombstone = {_complex_column_marked_for_delete, parse_expiry(_header, _u64)};
case state::COMPLEX_COLUMN_2:
complex_column_2_label:
if (_consumer.consume_complex_column_start(get_column_id(), _complex_column_tombstone) == consumer_m::proceed::no) {
_state = state::COMPLEX_COLUMN_SIZE;
return consumer_m::proceed::no;
}
case state::COMPLEX_COLUMN_SIZE:
if (read_unsigned_vint(data) != read_status::ready) {
_state = state::COMPLEX_COLUMN_SIZE_2;
break;
}
case state::COMPLEX_COLUMN_SIZE_2:
_subcolumns_to_read = _u64;
goto column_label;
case state::RANGE_TOMBSTONE_MARKER:
range_tombstone_marker_label:
throw malformed_sstable_exception("unimplemented state");

View File

@@ -563,7 +563,8 @@ class unfiltered_flags_m final {
static const uint8_t HAS_TIMESTAMP = 0x04u;
static const uint8_t HAS_TTL = 0x08u;
static const uint8_t HAS_DELETION = 0x10u;
static const uint8_t HAS_ALL_COLUMNS = 0x20;
static const uint8_t HAS_ALL_COLUMNS = 0x20u;
static const uint8_t HAS_COMPLEX_DELETION = 0x40u;
static const uint8_t HAS_EXTENDED_FLAGS = 0x80u;
uint8_t _flags;
bool check_flag(const uint8_t flag) const {
@@ -592,6 +593,9 @@ public:
bool has_all_columns() const {
return check_flag(HAS_ALL_COLUMNS);
}
bool has_complex_deletion() const {
return check_flag(HAS_COMPLEX_DELETION);
}
};
class unfiltered_extended_flags_m final {

View File

@@ -25,6 +25,7 @@
#include <seastar/util/backtrace.hh>
#include "flat_mutation_reader.hh"
#include "mutation_assertions.hh"
#include "schema.hh"
// Intended to be called in a seastar thread
class flat_reader_assertions {
@@ -150,17 +151,50 @@ public:
BOOST_FAIL(sprint("Expected row with column %s, but it is not present", columns[i].name));
}
auto& cdef = _reader.schema()->regular_column_at(columns[i].id);
assert (!cdef.is_multi_cell());
auto cmp = compare_unsigned(columns[i].value, cell->as_atomic_cell(cdef).value().linearize());
if (cmp != 0) {
BOOST_FAIL(sprint("Expected row with column %s having value %s, but it has value %s",
columns[i].name,
columns[i].value,
cell->as_atomic_cell(cdef).value()));
cell->as_atomic_cell(cdef).value().linearize()));
}
}
return *this;
}
using assert_function = noncopyable_function<void(const column_definition&, const atomic_cell_or_collection*)>;
flat_reader_assertions& produces_row(const clustering_key& ck,
const std::vector<column_id>& column_ids,
const std::vector<assert_function>& column_assert) {
BOOST_TEST_MESSAGE(sprint("Expect %s", ck));
auto mfopt = read_next();
if (!mfopt) {
BOOST_FAIL(sprint("Expected row with key %s, but got end of stream", ck));
}
if (!mfopt->is_clustering_row()) {
BOOST_FAIL(sprint("Expected row with key %s, but got %s", ck, *mfopt));
}
auto& actual = mfopt->as_clustering_row().key();
if (!actual.equal(*_reader.schema(), ck)) {
BOOST_FAIL(sprint("Expected row with key %s, but key is %s", ck, actual));
}
auto& cells = mfopt->as_clustering_row().cells();
if (cells.size() != column_ids.size()) {
BOOST_FAIL(sprint("Expected row with %s columns, but has %s", column_ids.size(), cells.size()));
}
for (size_t i = 0; i < column_ids.size(); ++i) {
const atomic_cell_or_collection* cell = cells.find_cell(column_ids[i]);
if (!cell) {
BOOST_FAIL(sprint("Expected row with column %d, but it is not present", column_ids[i]));
}
auto& cdef = _reader.schema()->regular_column_at(column_ids[i]);
column_assert[i](cdef, cell);
}
return *this;
}
// If ck_ranges is passed, verifies only that information relevant for ck_ranges matches.
flat_reader_assertions& produces_range_tombstone(const range_tombstone& rt, const query::clustering_row_ranges& ck_ranges = {}) {
BOOST_TEST_MESSAGE(sprint("Expect %s", rt));

View File

@@ -1306,6 +1306,149 @@ SEASTAR_THREAD_TEST_CASE(test_uncompressed_compound_ck_read) {
.produces_end_of_stream();
}
// Following tests run on files in tests/sstables/3.x/uncompressed/collections
// They were created using following CQL statements:
//
// CREATE KEYSPACE test_ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
//
// CREATE TABLE test_ks.test_table ( pk INT, set_val set<int>, list_val list<text>, map_val map<int, text>, PRIMARY KEY(pk))
// WITH compression = { 'enabled' : false };
//
// INSERT INTO test_ks.test_table(pk, set_val, list_val, map_val)
// VALUES(1, {1, 2, 3}, ['Text 1', 'Text 2', 'Text 3'], {1 : 'Text 1', 2 : 'Text 2', 3 : 'Text 3'});
// INSERT INTO test_ks.test_table(pk, set_val, list_val, map_val)
// VALUES(2, {4, 5, 6}, ['Text 4', 'Text 5', 'Text 6'], {4 : 'Text 4', 5 : 'Text 5', 6 : 'Text 6'});
// INSERT INTO test_ks.test_table(pk, set_val, list_val, map_val)
// VALUES(3, {7, 8, 9}, ['Text 7', 'Text 8', 'Text 9'], {7 : 'Text 7', 8 : 'Text 8', 9 : 'Text 9'});
// INSERT INTO test_ks.test_table(pk, set_val, list_val, map_val)
// VALUES(4, {10, 11, 12}, ['Text 10', 'Text 11', 'Text 12'], {10 : 'Text 10', 11 : 'Text 11', 12 : 'Text 12'});
// INSERT INTO test_ks.test_table(pk, set_val, list_val, map_val)
// VALUES(5, {13, 14, 15}, ['Text 13', 'Text 14', 'Text 15'], {13 : 'Text 13', 14 : 'Text 14', 15 : 'Text 15'});
static thread_local const sstring UNCOMPRESSED_COLLECTIONS_PATH =
"tests/sstables/3.x/uncompressed/collections";
static thread_local const schema_ptr UNCOMPRESSED_COLLECTIONS_SCHEMA =
schema_builder("test_ks", "test_table")
.with_column("pk", int32_type, column_kind::partition_key)
.with_column("set_val", set_type_impl::get_instance(int32_type, true))
.with_column("list_val", list_type_impl::get_instance(utf8_type, true))
.with_column("map_val", map_type_impl::get_instance(int32_type, utf8_type, true))
.build();
SEASTAR_THREAD_TEST_CASE(test_uncompressed_collections_read) {
sstable_assertions sst(UNCOMPRESSED_COLLECTIONS_SCHEMA, UNCOMPRESSED_COLLECTIONS_PATH);
sst.load();
auto to_key = [] (int key) {
auto bytes = int32_type->decompose(int32_t(key));
auto pk = partition_key::from_single_value(*UNCOMPRESSED_COLLECTIONS_SCHEMA, bytes);
return dht::global_partitioner().decorate_key(*UNCOMPRESSED_COLLECTIONS_SCHEMA, pk);
};
auto set_cdef = UNCOMPRESSED_COLLECTIONS_SCHEMA->get_column_definition(to_bytes("set_val"));
BOOST_REQUIRE(set_cdef);
auto list_cdef = UNCOMPRESSED_COLLECTIONS_SCHEMA->get_column_definition(to_bytes("list_val"));
BOOST_REQUIRE(list_cdef);
auto map_cdef = UNCOMPRESSED_COLLECTIONS_SCHEMA->get_column_definition(to_bytes("map_val"));
BOOST_REQUIRE(map_cdef);
auto generate = [&] (std::vector<int> set_val, std::vector<sstring> list_val, std::vector<std::pair<int, sstring>> map_val) {
std::vector<flat_reader_assertions::assert_function> assertions;
assertions.push_back([val = std::move(set_val)] (const column_definition& def,
const atomic_cell_or_collection* cell) {
BOOST_REQUIRE(def.is_multi_cell());
auto ctype = static_pointer_cast<const collection_type_impl>(def.type);
int idx = 0;
cell->as_collection_mutation().data.with_linearized([&] (bytes_view c_bv) {
auto m_view = ctype->deserialize_mutation_form(c_bv);
for (auto&& entry : m_view.cells) {
auto cmp = compare_unsigned(int32_type->decompose(int32_t(val[idx])), entry.first);
if (cmp != 0) {
BOOST_FAIL(sprint("Expected row with column %s having value %s, but it has value %s",
def.id,
int32_type->decompose(int32_t(val[idx])),
entry.first));
}
++idx;
}
});
});
assertions.push_back([val = std::move(list_val)] (const column_definition& def,
const atomic_cell_or_collection* cell) {
BOOST_REQUIRE(def.is_multi_cell());
auto ctype = static_pointer_cast<const collection_type_impl>(def.type);
int idx = 0;
cell->as_collection_mutation().data.with_linearized([&] (bytes_view c_bv) {
auto m_view = ctype->deserialize_mutation_form(c_bv);
for (auto&& entry : m_view.cells) {
auto cmp = compare_unsigned(utf8_type->decompose(val[idx]), entry.second.value().linearize());
if (cmp != 0) {
BOOST_FAIL(sprint("Expected row with column %s having value %s, but it has value %s",
def.id,
utf8_type->decompose(val[idx]),
entry.second.value().linearize()));
}
++idx;
}
});
});
assertions.push_back([val = std::move(map_val)] (const column_definition& def,
const atomic_cell_or_collection* cell) {
BOOST_REQUIRE(def.is_multi_cell());
auto ctype = static_pointer_cast<const collection_type_impl>(def.type);
int idx = 0;
cell->as_collection_mutation().data.with_linearized([&] (bytes_view c_bv) {
auto m_view = ctype->deserialize_mutation_form(c_bv);
for (auto&& entry : m_view.cells) {
auto cmp1 = compare_unsigned(int32_type->decompose(int32_t(val[idx].first)), entry.first);
auto cmp2 = compare_unsigned(utf8_type->decompose(val[idx].second), entry.second.value().linearize());
if (cmp1 != 0 || cmp2 != 0) {
BOOST_FAIL(
sprint("Expected row with column %s having value (%s, %s), but it has value (%s, %s)",
def.id,
int32_type->decompose(int32_t(val[idx].first)),
utf8_type->decompose(val[idx].second),
entry.first,
entry.second.value().linearize()));
}
++idx;
}
});
});
return std::move(assertions);
};
std::vector<column_id> ids{set_cdef->id, list_cdef->id, map_cdef->id};
assert_that(sst.read_rows_flat())
.produces_partition_start(to_key(5))
.produces_row(clustering_key_prefix::make_empty(), ids,
generate({13, 14, 15}, {"Text 13", "Text 14", "Text 15"}, {{13,"Text 13"}, {14,"Text 14"}, {15,"Text 15"}}))
.produces_partition_end()
.produces_partition_start(to_key(1))
.produces_row(clustering_key_prefix::make_empty(), ids,
generate({1, 2, 3}, {"Text 1", "Text 2", "Text 3"}, {{1,"Text 1"}, {2,"Text 2"}, {3,"Text 3"}}))
.produces_partition_end()
.produces_partition_start(to_key(2))
.produces_row(clustering_key_prefix::make_empty(), ids,
generate({4, 5, 6}, {"Text 4", "Text 5", "Text 6"}, {{4,"Text 4"}, {5,"Text 5"}, {6, "Text 6"}}))
.produces_partition_end()
.produces_partition_start(to_key(4))
.produces_row(clustering_key_prefix::make_empty(), ids,
generate({10, 11, 12}, {"Text 10", "Text 11", "Text 12"}, {{10,"Text 10"}, {11,"Text 11"}, {12,"Text 12"}}))
.produces_partition_end()
.produces_partition_start(to_key(3))
.produces_row(clustering_key_prefix::make_empty(), ids,
generate({7, 8, 9}, {"Text 7", "Text 8", "Text 9"}, {{7,"Text 7"}, {8,"Text 8"}, {9,"Text 9"}}))
.produces_partition_end()
.produces_end_of_stream();
}
static void compare_files(sstring filename1, sstring filename2) {
std::ifstream ifs1(filename1);
std::ifstream ifs2(filename2);

View File

@@ -0,0 +1 @@
938712690

View File

@@ -0,0 +1,8 @@
Statistics.db
Digest.crc32
Filter.db
CRC.db
Summary.db
Data.db
TOC.txt
Index.db