Compare commits
8 Commits
scylla-3.2
...
next-3.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d112a230c0 | ||
|
|
4371cb41d0 | ||
|
|
a8b9f94dcb | ||
|
|
77500f9171 | ||
|
|
13328e7253 | ||
|
|
79b58f89f1 | ||
|
|
ba2821ec70 | ||
|
|
d72555e786 |
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=3.2.4
|
||||
VERSION=3.2.5
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -254,6 +254,9 @@ void set_storage_service(http_context& ctx, routes& r) {
|
||||
if (column_family.empty()) {
|
||||
resp = service::get_local_storage_service().take_snapshot(tag, keynames);
|
||||
} else {
|
||||
if (keynames.empty()) {
|
||||
throw httpd::bad_param_exception("The keyspace of column families must be specified");
|
||||
}
|
||||
if (keynames.size() > 1) {
|
||||
throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family");
|
||||
}
|
||||
|
||||
@@ -691,6 +691,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, shutdown_announce_in_ms(this, "shutdown_announce_in_ms", value_status::Used, 2 * 1000, "Time a node waits after sending gossip shutdown message in milliseconds. Same as -Dcassandra.shutdown_announce_in_ms in cassandra.")
|
||||
, developer_mode(this, "developer_mode", value_status::Used, false, "Relax environment checks. Setting to true can reduce performance and reliability significantly.")
|
||||
, skip_wait_for_gossip_to_settle(this, "skip_wait_for_gossip_to_settle", value_status::Used, -1, "An integer to configure the wait for gossip to settle. -1: wait normally, 0: do not wait at all, n: wait for at most n polls. Same as -Dcassandra.skip_wait_for_gossip_to_settle in cassandra.")
|
||||
, force_gossip_generation(this, "force_gossip_generation", liveness::LiveUpdate, value_status::Used, -1 , "Force gossip to use the generation number provided by user")
|
||||
, experimental(this, "experimental", value_status::Used, false, "Set to true to unlock all experimental features.")
|
||||
, experimental_features(this, "experimental_features", value_status::Used, {}, "Unlock experimental features provided as the option arguments (possible values: 'lwt' and 'cdc'). Can be repeated.")
|
||||
, lsa_reclamation_step(this, "lsa_reclamation_step", value_status::Used, 1, "Minimum number of segments to reclaim in a single step")
|
||||
|
||||
@@ -269,6 +269,7 @@ public:
|
||||
named_value<uint32_t> shutdown_announce_in_ms;
|
||||
named_value<bool> developer_mode;
|
||||
named_value<int32_t> skip_wait_for_gossip_to_settle;
|
||||
named_value<int32_t> force_gossip_generation;
|
||||
named_value<bool> experimental;
|
||||
named_value<std::vector<enum_option<experimental_features_t>>> experimental_features;
|
||||
named_value<size_t> lsa_reclamation_step;
|
||||
|
||||
@@ -76,6 +76,9 @@ Scylla with issue #4139 fixed)
|
||||
bit 4: CorrectEmptyCounters (if set, indicates the sstable was generated by
|
||||
Scylla with issue #4363 fixed)
|
||||
|
||||
bit 5: CorrectUDTsInCollections (if set, indicates that the sstable was generated
|
||||
by Scylla with issue #6130 fixed)
|
||||
|
||||
## extension_attributes subcomponent
|
||||
|
||||
extension_attributes = extension_attribute_count extension_attribute*
|
||||
|
||||
@@ -1622,11 +1622,15 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map<application_stat
|
||||
// message on all cpus and forard them to cpu0 to process.
|
||||
return get_gossiper().invoke_on_all([do_bind] (gossiper& g) {
|
||||
g.init_messaging_service_handler(do_bind);
|
||||
}).then([this, generation_nbr, preload_local_states] {
|
||||
}).then([this, generation_nbr, preload_local_states] () mutable {
|
||||
build_seeds_list();
|
||||
/* initialize the heartbeat state for this localEndpoint */
|
||||
maybe_initialize_local_state(generation_nbr);
|
||||
if (_cfg.force_gossip_generation() > 0) {
|
||||
generation_nbr = _cfg.force_gossip_generation();
|
||||
logger.warn("Use the generation number provided by user: generation = {}", generation_nbr);
|
||||
}
|
||||
endpoint_state& local_state = endpoint_state_map[get_broadcast_address()];
|
||||
local_state.set_heart_beat_state_and_update_timestamp(heart_beat_state(generation_nbr));
|
||||
local_state.mark_alive();
|
||||
for (auto& entry : preload_local_states) {
|
||||
local_state.add_application_state(entry.first, entry.second);
|
||||
}
|
||||
|
||||
@@ -177,6 +177,13 @@ future<> multishard_writer::distribute_mutation_fragments() {
|
||||
return handle_end_of_stream();
|
||||
}
|
||||
});
|
||||
}).handle_exception([this] (std::exception_ptr ep) {
|
||||
for (auto& q : _queue_reader_handles) {
|
||||
if (q) {
|
||||
q->abort(ep);
|
||||
}
|
||||
}
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -444,7 +444,7 @@ class repair_writer {
|
||||
uint64_t _estimated_partitions;
|
||||
size_t _nr_peer_nodes;
|
||||
// Needs more than one for repair master
|
||||
std::vector<std::optional<future<uint64_t>>> _writer_done;
|
||||
std::vector<std::optional<future<>>> _writer_done;
|
||||
std::vector<std::optional<seastar::queue<mutation_fragment_opt>>> _mq;
|
||||
// Current partition written to disk
|
||||
std::vector<lw_shared_ptr<const decorated_key_with_hash>> _current_dk_written_to_sstable;
|
||||
@@ -524,7 +524,15 @@ public:
|
||||
return consumer(std::move(reader));
|
||||
});
|
||||
},
|
||||
t.stream_in_progress());
|
||||
t.stream_in_progress()).then([this, node_idx] (uint64_t partitions) {
|
||||
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
|
||||
_schema->ks_name(), _schema->cf_name(), partitions);
|
||||
}).handle_exception([this, node_idx] (std::exception_ptr ep) {
|
||||
rlogger.warn("repair_writer: keyspace={}, table={}, multishard_writer failed: {}",
|
||||
_schema->ks_name(), _schema->cf_name(), ep);
|
||||
_mq[node_idx]->abort(ep);
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
|
||||
future<> write_partition_end(unsigned node_idx) {
|
||||
@@ -551,21 +559,33 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
future<> write_end_of_stream(unsigned node_idx) {
|
||||
if (_mq[node_idx]) {
|
||||
// Partition_end is never sent on wire, so we have to write one ourselves.
|
||||
return write_partition_end(node_idx).then([this, node_idx] () mutable {
|
||||
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt());
|
||||
});
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
}
|
||||
|
||||
future<> do_wait_for_writer_done(unsigned node_idx) {
|
||||
if (_writer_done[node_idx]) {
|
||||
return std::move(*(_writer_done[node_idx]));
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
}
|
||||
|
||||
future<> wait_for_writer_done() {
|
||||
return parallel_for_each(boost::irange(unsigned(0), unsigned(_nr_peer_nodes)), [this] (unsigned node_idx) {
|
||||
if (_writer_done[node_idx] && _mq[node_idx]) {
|
||||
// Partition_end is never sent on wire, so we have to write one ourselves.
|
||||
return write_partition_end(node_idx).then([this, node_idx] () mutable {
|
||||
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
|
||||
return _mq[node_idx]->push_eventually(mutation_fragment_opt()).then([this, node_idx] () mutable {
|
||||
return (*_writer_done[node_idx]).then([] (uint64_t partitions) {
|
||||
rlogger.debug("Managed to write partitions={} to sstable", partitions);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
return when_all_succeed(write_end_of_stream(node_idx), do_wait_for_writer_done(node_idx));
|
||||
}).handle_exception([this] (std::exception_ptr ep) {
|
||||
rlogger.warn("repair_writer: keyspace={}, table={}, wait_for_writer_done failed: {}",
|
||||
_schema->ks_name(), _schema->cf_name(), ep);
|
||||
return make_exception_future<>(std::move(ep));
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
@@ -288,10 +288,10 @@ schema::schema(const raw_schema& raw, std::optional<raw_view_info> raw_view_info
|
||||
+ column_offset(column_kind::regular_column),
|
||||
_raw._columns.end(), column_definition::name_comparator(regular_column_name_type()));
|
||||
|
||||
std::sort(_raw._columns.begin(),
|
||||
std::stable_sort(_raw._columns.begin(),
|
||||
_raw._columns.begin() + column_offset(column_kind::clustering_key),
|
||||
[] (auto x, auto y) { return x.id < y.id; });
|
||||
std::sort(_raw._columns.begin() + column_offset(column_kind::clustering_key),
|
||||
std::stable_sort(_raw._columns.begin() + column_offset(column_kind::clustering_key),
|
||||
_raw._columns.begin() + column_offset(column_kind::static_column),
|
||||
[] (auto x, auto y) { return x.id < y.id; });
|
||||
|
||||
|
||||
@@ -72,47 +72,8 @@ private:
|
||||
static std::vector<column_info> build(
|
||||
const schema& s,
|
||||
const utils::chunked_vector<serialization_header::column_desc>& src,
|
||||
bool is_static) {
|
||||
std::vector<column_info> cols;
|
||||
if (s.is_dense()) {
|
||||
const column_definition& col = is_static ? *s.static_begin() : *s.regular_begin();
|
||||
cols.push_back(column_info{
|
||||
&col.name(),
|
||||
col.type,
|
||||
col.id,
|
||||
col.type->value_length_if_fixed(),
|
||||
col.is_multi_cell(),
|
||||
col.is_counter(),
|
||||
false
|
||||
});
|
||||
} else {
|
||||
cols.reserve(src.size());
|
||||
for (auto&& desc : src) {
|
||||
const bytes& type_name = desc.type_name.value;
|
||||
data_type type = db::marshal::type_parser::parse(to_sstring_view(type_name));
|
||||
const column_definition* def = s.get_column_definition(desc.name.value);
|
||||
std::optional<column_id> id;
|
||||
bool schema_mismatch = false;
|
||||
if (def) {
|
||||
id = def->id;
|
||||
schema_mismatch = def->is_multi_cell() != type->is_multi_cell() ||
|
||||
def->is_counter() != type->is_counter() ||
|
||||
!def->type->is_value_compatible_with(*type);
|
||||
}
|
||||
cols.push_back(column_info{
|
||||
&desc.name.value,
|
||||
type,
|
||||
id,
|
||||
type->value_length_if_fixed(),
|
||||
type->is_multi_cell(),
|
||||
type->is_counter(),
|
||||
schema_mismatch
|
||||
});
|
||||
}
|
||||
boost::range::stable_partition(cols, [](const column_info& column) { return !column.is_collection; });
|
||||
}
|
||||
return cols;
|
||||
}
|
||||
const sstable_enabled_features& features,
|
||||
bool is_static);
|
||||
|
||||
utils::UUID schema_uuid;
|
||||
std::vector<column_info> regular_schema_columns_from_sstable;
|
||||
@@ -125,10 +86,10 @@ private:
|
||||
state(state&&) = default;
|
||||
state& operator=(state&&) = default;
|
||||
|
||||
state(const schema& s, const serialization_header& header)
|
||||
state(const schema& s, const serialization_header& header, const sstable_enabled_features& features)
|
||||
: schema_uuid(s.version())
|
||||
, regular_schema_columns_from_sstable(build(s, header.regular_columns.elements, false))
|
||||
, static_schema_columns_from_sstable(build(s, header.static_columns.elements, true))
|
||||
, regular_schema_columns_from_sstable(build(s, header.regular_columns.elements, features, false))
|
||||
, static_schema_columns_from_sstable(build(s, header.static_columns.elements, features, true))
|
||||
, clustering_column_value_fix_lengths (get_clustering_values_fixed_lengths(header))
|
||||
{}
|
||||
};
|
||||
@@ -136,9 +97,10 @@ private:
|
||||
lw_shared_ptr<const state> _state = make_lw_shared<const state>();
|
||||
|
||||
public:
|
||||
column_translation get_for_schema(const schema& s, const serialization_header& header) {
|
||||
column_translation get_for_schema(
|
||||
const schema& s, const serialization_header& header, const sstable_enabled_features& features) {
|
||||
if (s.version() != _state->schema_uuid) {
|
||||
_state = make_lw_shared(state(s, header));
|
||||
_state = make_lw_shared(state(s, header, features));
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
@@ -38,6 +38,8 @@
|
||||
*/
|
||||
|
||||
#include "mp_row_consumer.hh"
|
||||
#include "column_translation.hh"
|
||||
#include "concrete_types.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
@@ -79,4 +81,86 @@ atomic_cell make_counter_cell(api::timestamp_type timestamp, bytes_view value) {
|
||||
return ccb.build(timestamp);
|
||||
}
|
||||
|
||||
// See #6130.
|
||||
static data_type freeze_types_in_collections(data_type t) {
|
||||
return ::visit(*t, make_visitor(
|
||||
[] (const map_type_impl& typ) -> data_type {
|
||||
return map_type_impl::get_instance(
|
||||
freeze_types_in_collections(typ.get_keys_type()->freeze()),
|
||||
freeze_types_in_collections(typ.get_values_type()->freeze()),
|
||||
typ.is_multi_cell());
|
||||
},
|
||||
[] (const set_type_impl& typ) -> data_type {
|
||||
return set_type_impl::get_instance(
|
||||
freeze_types_in_collections(typ.get_elements_type()->freeze()),
|
||||
typ.is_multi_cell());
|
||||
},
|
||||
[] (const list_type_impl& typ) -> data_type {
|
||||
return list_type_impl::get_instance(
|
||||
freeze_types_in_collections(typ.get_elements_type()->freeze()),
|
||||
typ.is_multi_cell());
|
||||
},
|
||||
[&] (const abstract_type& typ) -> data_type {
|
||||
return std::move(t);
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
/* If this function returns false, the caller cannot assume that the SSTable comes from Scylla.
|
||||
* It might, if for some reason a table was created using Scylla that didn't contain any feature bit,
|
||||
* but that should never happen. */
|
||||
static bool is_certainly_scylla_sstable(const sstable_enabled_features& features) {
|
||||
return features.enabled_features;
|
||||
}
|
||||
|
||||
std::vector<column_translation::column_info> column_translation::state::build(
|
||||
const schema& s,
|
||||
const utils::chunked_vector<serialization_header::column_desc>& src,
|
||||
const sstable_enabled_features& features,
|
||||
bool is_static) {
|
||||
std::vector<column_info> cols;
|
||||
if (s.is_dense()) {
|
||||
const column_definition& col = is_static ? *s.static_begin() : *s.regular_begin();
|
||||
cols.push_back(column_info{
|
||||
&col.name(),
|
||||
col.type,
|
||||
col.id,
|
||||
col.type->value_length_if_fixed(),
|
||||
col.is_multi_cell(),
|
||||
col.is_counter(),
|
||||
false
|
||||
});
|
||||
} else {
|
||||
cols.reserve(src.size());
|
||||
for (auto&& desc : src) {
|
||||
const bytes& type_name = desc.type_name.value;
|
||||
data_type type = db::marshal::type_parser::parse(to_sstring_view(type_name));
|
||||
if (!features.is_enabled(CorrectUDTsInCollections) && is_certainly_scylla_sstable(features)) {
|
||||
// See #6130.
|
||||
type = freeze_types_in_collections(std::move(type));
|
||||
}
|
||||
const column_definition* def = s.get_column_definition(desc.name.value);
|
||||
std::optional<column_id> id;
|
||||
bool schema_mismatch = false;
|
||||
if (def) {
|
||||
id = def->id;
|
||||
schema_mismatch = def->is_multi_cell() != type->is_multi_cell() ||
|
||||
def->is_counter() != type->is_counter() ||
|
||||
!def->type->is_value_compatible_with(*type);
|
||||
}
|
||||
cols.push_back(column_info{
|
||||
&desc.name.value,
|
||||
type,
|
||||
id,
|
||||
type->value_length_if_fixed(),
|
||||
type->is_multi_cell(),
|
||||
type->is_counter(),
|
||||
schema_mismatch
|
||||
});
|
||||
}
|
||||
boost::range::stable_partition(cols, [](const column_info& column) { return !column.is_collection; });
|
||||
}
|
||||
return cols;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1344,7 +1344,7 @@ public:
|
||||
, _consumer(consumer)
|
||||
, _sst(sst)
|
||||
, _header(sst->get_serialization_header())
|
||||
, _column_translation(sst->get_column_translation(s, _header))
|
||||
, _column_translation(sst->get_column_translation(s, _header, sst->features()))
|
||||
, _has_shadowable_tombstones(sst->has_shadowable_tombstones())
|
||||
{
|
||||
setup_columns(_regular_row, _column_translation.regular_columns());
|
||||
|
||||
@@ -780,8 +780,9 @@ public:
|
||||
const serialization_header& get_serialization_header() const {
|
||||
return get_mutable_serialization_header(*_components);
|
||||
}
|
||||
column_translation get_column_translation(const schema& s, const serialization_header& h) {
|
||||
return _column_translation.get_for_schema(s, h);
|
||||
column_translation get_column_translation(
|
||||
const schema& s, const serialization_header& h, const sstable_enabled_features& f) {
|
||||
return _column_translation.get_for_schema(s, h, f);
|
||||
}
|
||||
const std::vector<unsigned>& get_shards_for_this_sstable() const {
|
||||
return _shards;
|
||||
|
||||
@@ -459,7 +459,8 @@ enum sstable_feature : uint8_t {
|
||||
ShadowableTombstones = 2, // See #3885
|
||||
CorrectStaticCompact = 3, // See #4139
|
||||
CorrectEmptyCounters = 4, // See #4363
|
||||
End = 5,
|
||||
CorrectUDTsInCollections = 5, // See #6130
|
||||
End = 6,
|
||||
};
|
||||
|
||||
// Scylla-specific features enabled for a particular sstable.
|
||||
|
||||
@@ -118,6 +118,53 @@ SEASTAR_TEST_CASE(test_multishard_writer) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_multishard_writer_producer_aborts) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
auto test_random_streams = [] (random_mutation_generator&& gen, size_t partition_nr, generate_error error = generate_error::no) {
|
||||
auto muts = gen(partition_nr);
|
||||
schema_ptr s = gen.schema();
|
||||
auto source_reader = partition_nr > 0 ? flat_mutation_reader_from_mutations(muts) : make_empty_flat_reader(s);
|
||||
int mf_produced = 0;
|
||||
auto get_next_mutation_fragment = [&source_reader, &mf_produced] () mutable {
|
||||
if (mf_produced++ > 800) {
|
||||
return make_exception_future<mutation_fragment_opt>(std::runtime_error("the producer failed"));
|
||||
} else {
|
||||
return source_reader(db::no_timeout);
|
||||
}
|
||||
};
|
||||
auto& partitioner = dht::global_partitioner();
|
||||
try {
|
||||
distribute_reader_and_consume_on_shards(s, partitioner,
|
||||
make_generating_reader(s, std::move(get_next_mutation_fragment)),
|
||||
[&partitioner, error] (flat_mutation_reader reader) mutable {
|
||||
if (error) {
|
||||
return make_exception_future<>(std::runtime_error("Failed to write"));
|
||||
}
|
||||
return repeat([&partitioner, reader = std::move(reader), error] () mutable {
|
||||
return reader(db::no_timeout).then([&partitioner, error] (mutation_fragment_opt mf_opt) mutable {
|
||||
if (mf_opt) {
|
||||
if (mf_opt->is_partition_start()) {
|
||||
auto shard = partitioner.shard_of(mf_opt->as_partition_start().key().token());
|
||||
BOOST_REQUIRE_EQUAL(shard, this_shard_id());
|
||||
}
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
} else {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
).get0();
|
||||
} catch (...) {
|
||||
// The distribute_reader_and_consume_on_shards is expected to fail and not block forever
|
||||
}
|
||||
};
|
||||
|
||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no, local_shard_only::yes), 1000, generate_error::no);
|
||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no, local_shard_only::yes), 1000, generate_error::yes);
|
||||
});
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
class bucket_writer {
|
||||
|
||||
@@ -5262,3 +5262,131 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_rows) {
|
||||
test_sstable_log_too_many_rows_f(random, (random + 1), false);
|
||||
test_sstable_log_too_many_rows_f((random + 1), random, true);
|
||||
}
|
||||
|
||||
// The following test runs on tests/sstables/3.x/uncompressed/legacy_udt_in_collection
|
||||
// It was created using Scylla 3.0.x using the following CQL statements:
|
||||
//
|
||||
// CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
|
||||
// CREATE TYPE ks.ut (a int, b int);
|
||||
// CREATE TABLE ks.t ( pk int PRIMARY KEY,
|
||||
// m map<int, frozen<ut>>,
|
||||
// fm frozen<map<int, frozen<ut>>>,
|
||||
// mm map<int, frozen<map<int, frozen<ut>>>>,
|
||||
// fmm frozen<map<int, frozen<map<int, frozen<ut>>>>>,
|
||||
// s set<frozen<ut>>,
|
||||
// fs frozen<set<frozen<ut>>>,
|
||||
// l list<frozen<ut>>,
|
||||
// fl frozen<list<frozen<ut>>>
|
||||
// ) WITH compression = {};
|
||||
// UPDATE ks.t USING TIMESTAMP 1525385507816568 SET
|
||||
// m[0] = {a: 0, b: 0},
|
||||
// fm = {0: {a: 0, b: 0}},
|
||||
// mm[0] = {0: {a: 0, b: 0}},
|
||||
// fmm = {0: {0: {a: 0, b: 0}}},
|
||||
// s = s + {{a: 0, b: 0}},
|
||||
// fs = {{a: 0, b: 0}},
|
||||
// l[scylla_timeuuid_list_index(7fb27e80-7b12-11ea-9fad-f4d108a9e4a3)] = {a: 0, b: 0},
|
||||
// fl = [{a: 0, b: 0}]
|
||||
// WHERE pk = 0;
|
||||
//
|
||||
// It checks whether a SSTable containing UDTs nested in collections, which contains incorrect serialization headers
|
||||
// (doesn't wrap nested UDTs in the FrozenType<...> tag) can be loaded by new versions of Scylla.
|
||||
|
||||
static const sstring LEGACY_UDT_IN_COLLECTION_PATH =
|
||||
"tests/sstables/3.x/uncompressed/legacy_udt_in_collection";
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_legacy_udt_in_collection_table) {
|
||||
auto abj = defer([] { await_background_jobs().get(); });
|
||||
|
||||
auto ut = user_type_impl::get_instance("ks", to_bytes("ut"),
|
||||
{to_bytes("a"), to_bytes("b")},
|
||||
{int32_type, int32_type}, false);
|
||||
auto m_type = map_type_impl::get_instance(int32_type, ut, true);
|
||||
auto fm_type = map_type_impl::get_instance(int32_type, ut, false);
|
||||
auto mm_type = map_type_impl::get_instance(int32_type, fm_type, true);
|
||||
auto fmm_type = map_type_impl::get_instance(int32_type, fm_type, false);
|
||||
auto s_type = set_type_impl::get_instance(ut, true);
|
||||
auto fs_type = set_type_impl::get_instance(ut, false);
|
||||
auto l_type = list_type_impl::get_instance(ut, true);
|
||||
auto fl_type = list_type_impl::get_instance(ut, false);
|
||||
|
||||
auto s = schema_builder("ks", "t")
|
||||
.with_column("pk", int32_type, column_kind::partition_key)
|
||||
.with_column("m", m_type)
|
||||
.with_column("fm", fm_type)
|
||||
.with_column("mm", mm_type)
|
||||
.with_column("fmm", fmm_type)
|
||||
.with_column("s", s_type)
|
||||
.with_column("fs", fs_type)
|
||||
.with_column("l", l_type)
|
||||
.with_column("fl", fl_type)
|
||||
.set_compressor_params(compression_parameters::no_compression())
|
||||
.build();
|
||||
|
||||
auto m_cdef = s->get_column_definition(to_bytes("m"));
|
||||
auto fm_cdef = s->get_column_definition(to_bytes("fm"));
|
||||
auto mm_cdef = s->get_column_definition(to_bytes("mm"));
|
||||
auto fmm_cdef = s->get_column_definition(to_bytes("fmm"));
|
||||
auto s_cdef = s->get_column_definition(to_bytes("s"));
|
||||
auto fs_cdef = s->get_column_definition(to_bytes("fs"));
|
||||
auto l_cdef = s->get_column_definition(to_bytes("l"));
|
||||
auto fl_cdef = s->get_column_definition(to_bytes("fl"));
|
||||
BOOST_REQUIRE(m_cdef && fm_cdef && mm_cdef && fmm_cdef && s_cdef && fs_cdef && l_cdef && fl_cdef);
|
||||
|
||||
auto ut_val = make_user_value(ut, {int32_t(0), int32_t(0)});
|
||||
auto fm_val = make_map_value(fm_type, {{int32_t(0), ut_val}});
|
||||
auto fmm_val = make_map_value(fmm_type, {{int32_t(0), fm_val}});
|
||||
auto fs_val = make_set_value(fs_type, {ut_val});
|
||||
auto fl_val = make_list_value(fl_type, {ut_val});
|
||||
|
||||
mutation mut{s, partition_key::from_deeply_exploded(*s, {0})};
|
||||
auto ckey = clustering_key::make_empty();
|
||||
|
||||
// m[0] = {a: 0, b: 0}
|
||||
{
|
||||
collection_mutation_description desc;
|
||||
desc.cells.emplace_back(int32_type->decompose(0),
|
||||
atomic_cell::make_live(*ut, write_timestamp, ut->decompose(ut_val), atomic_cell::collection_member::yes));
|
||||
mut.set_clustered_cell(ckey, *m_cdef, desc.serialize(*m_type));
|
||||
}
|
||||
|
||||
// fm = {0: {a: 0, b: 0}}
|
||||
mut.set_clustered_cell(ckey, *fm_cdef, atomic_cell::make_live(*fm_type, write_timestamp, fm_type->decompose(fm_val)));
|
||||
|
||||
// mm[0] = {0: {a: 0, b: 0}},
|
||||
{
|
||||
collection_mutation_description desc;
|
||||
desc.cells.emplace_back(int32_type->decompose(0),
|
||||
atomic_cell::make_live(*fm_type, write_timestamp, fm_type->decompose(fm_val), atomic_cell::collection_member::yes));
|
||||
mut.set_clustered_cell(ckey, *mm_cdef, desc.serialize(*mm_type));
|
||||
}
|
||||
|
||||
// fmm = {0: {0: {a: 0, b: 0}}},
|
||||
mut.set_clustered_cell(ckey, *fmm_cdef, atomic_cell::make_live(*fmm_type, write_timestamp, fmm_type->decompose(fmm_val)));
|
||||
|
||||
// s = s + {{a: 0, b: 0}},
|
||||
{
|
||||
collection_mutation_description desc;
|
||||
desc.cells.emplace_back(ut->decompose(ut_val),
|
||||
atomic_cell::make_live(*bytes_type, write_timestamp, bytes{}, atomic_cell::collection_member::yes));
|
||||
mut.set_clustered_cell(ckey, *s_cdef, desc.serialize(*s_type));
|
||||
}
|
||||
|
||||
// fs = {{a: 0, b: 0}},
|
||||
mut.set_clustered_cell(ckey, *fs_cdef, atomic_cell::make_live(*fs_type, write_timestamp, fs_type->decompose(fs_val)));
|
||||
|
||||
// l[scylla_timeuuid_list_index(7fb27e80-7b12-11ea-9fad-f4d108a9e4a3)] = {a: 0, b: 0},
|
||||
{
|
||||
collection_mutation_description desc;
|
||||
desc.cells.emplace_back(timeuuid_type->decompose(utils::UUID("7fb27e80-7b12-11ea-9fad-f4d108a9e4a3")),
|
||||
atomic_cell::make_live(*ut, write_timestamp, ut->decompose(ut_val), atomic_cell::collection_member::yes));
|
||||
mut.set_clustered_cell(ckey, *l_cdef, desc.serialize(*l_type));
|
||||
}
|
||||
|
||||
// fl = [{a: 0, b: 0}]
|
||||
mut.set_clustered_cell(ckey, *fl_cdef, atomic_cell::make_live(*fl_type, write_timestamp, fl_type->decompose(fl_val)));
|
||||
|
||||
sstable_assertions sst(s, LEGACY_UDT_IN_COLLECTION_PATH);
|
||||
sst.load();
|
||||
assert_that(sst.read_rows_flat()).produces(mut).produces_end_of_stream();
|
||||
}
|
||||
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
3519784297
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,9 @@
|
||||
Scylla.db
|
||||
CRC.db
|
||||
Filter.db
|
||||
Statistics.db
|
||||
TOC.txt
|
||||
Digest.crc32
|
||||
Index.db
|
||||
Summary.db
|
||||
Data.db
|
||||
Reference in New Issue
Block a user