Files
scylladb/test/boost/sstable_datafile_test.cc
Botond Dénes aa1d3f1170 test/lib/cql_assertions: columns_assertions: add T* with_typed_column() overload
To enable assertions on columns which are sometimes null.
One existing user of with_typed_column() needs adjustment, because the
previous version of with_typed_column() covered up silently for null
value, but after this patch this caused a failure.
2025-12-02 14:21:26 +02:00

3426 lines
161 KiB
C++

/*
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "utils/assert.hh"
#include <fmt/ranges.h>
#include <seastar/core/sstring.hh>
#include <seastar/core/future-util.hh>
#include <seastar/core/align.hh>
#include <seastar/core/aligned_buffer.hh>
#include <seastar/util/closeable.hh>
#include <seastar/testing/test_fixture.hh>
#include "sstables/sstables.hh"
#include "sstables/compress.hh"
#include "sstables/metadata_collector.hh"
#include <seastar/testing/thread_test_case.hh>
#include "schema/schema.hh"
#include "schema/schema_builder.hh"
#include "replica/database.hh"
#include "sstables/sstable_writer.hh"
#include <memory>
#include "test/boost/sstable_test.hh"
#include <seastar/core/seastar.hh>
#include <seastar/core/do_with.hh>
#include <seastar/testing/test_case.hh>
#include "dht/i_partitioner.hh"
#include "test/lib/mutation_reader_assertions.hh"
#include "test/lib/mutation_assertions.hh"
#include "mutation/counters.hh"
#include "test/lib/index_reader_assertions.hh"
#include "test/lib/make_random_string.hh"
#include "test/lib/simple_schema.hh"
#include "dht/ring_position.hh"
#include "partition_slice_builder.hh"
#include "replica/memtable-sstable.hh"
#include <stdio.h>
#include <ftw.h>
#include <unistd.h>
#include <boost/algorithm/cxx11/is_sorted.hpp>
#include <boost/range/algorithm.hpp>
#include <boost/icl/interval_map.hpp>
#include "test/lib/sstable_utils.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/test_utils.hh"
#include "test/lib/cql_test_env.hh"
#include "readers/from_mutations.hh"
#include "readers/from_fragments.hh"
#include "readers/combined.hh"
#include "test/lib/random_schema.hh"
#include "test/lib/exception_utils.hh"
#include "test/lib/cql_assertions.hh"
#include "test/lib/gcs_fixture.hh"
namespace fs = std::filesystem;
using namespace sstables;
using namespace tests;
static const sstring some_keyspace("ks");
static const sstring some_column_family("cf");
// Builds a live atomic cell of the given type holding `value`.
// A non-zero `ttl` (seconds) produces an expiring cell whose expiry point
// is `expiration` (seconds since epoch on the gc clock); otherwise the cell
// is non-expiring. The write timestamp is always 0.
atomic_cell make_atomic_cell(data_type dt, bytes_view value, uint32_t ttl = 0, uint32_t expiration = 0) {
    if (!ttl) {
        return atomic_cell::make_live(*dt, 0, value);
    }
    return atomic_cell::make_live(*dt, 0, value,
            gc_clock::time_point(gc_clock::duration(expiration)), gc_clock::duration(ttl));
}
// Builds a dead (tombstoned) atomic cell with write timestamp 0 and the
// given local deletion time (seconds on the gc clock).
atomic_cell make_dead_atomic_cell(uint32_t deletion_time) {
    const auto when = gc_clock::time_point(gc_clock::duration(deletion_time));
    return atomic_cell::make_dead(0, when);
}
// Requires that the sstable set reports `expected_size` both via size()
// and via the length of its all() listing (the two must agree).
static void assert_sstable_set_size(const sstable_set& s, size_t expected_size) {
    const auto reported = s.size();
    BOOST_REQUIRE(reported == expected_size && reported == s.all()->size());
}
SEASTAR_TEST_CASE(datafile_generation_09) {
    // Test that generated sstable components can be successfully loaded.
    //
    // Writes a single-row sstable, re-opens it from disk via
    // env.reusable_sst(), and checks that the reloaded summary (header,
    // positions, entries, first/last key) and the TOC component list match
    // the in-memory original exactly.
    return test_env::do_with_async([] (test_env& env) {
        auto s = schema_builder(some_keyspace, some_column_family)
            .with_column("p1", utf8_type, column_kind::partition_key)
            .with_column("c1", utf8_type, column_kind::clustering_key)
            .with_column("r1", int32_type)
            .build();
        const column_definition& r1_col = *s->get_column_definition("r1");
        auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
        auto c_key = clustering_key::from_exploded(*s, {to_bytes("abc")});
        mutation m(s, key);
        m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
        auto sst = make_sstable_containing(env.make_sstable(s), {std::move(m)});
        // Reload the same sstable from disk and force its summary to be read.
        auto sst2 = env.reusable_sst(sst).get();
        sstables::test(sst2).read_summary().get();
        const summary& sst1_s = sst->get_summary();
        const summary& sst2_s = sst2->get_summary();
        // Raw byte-wise comparison of the fixed-size summary header.
        BOOST_REQUIRE(::memcmp(&sst1_s.header, &sst2_s.header, sizeof(summary::header)) == 0);
        BOOST_REQUIRE(sst1_s.positions == sst2_s.positions);
        BOOST_REQUIRE(sst1_s.entries == sst2_s.entries);
        BOOST_REQUIRE(sst1_s.first_key.value == sst2_s.first_key.value);
        BOOST_REQUIRE(sst1_s.last_key.value == sst2_s.last_key.value);
        // The TOC of the reloaded sstable must list the same components.
        sstables::test(sst2).read_toc().get();
        auto& sst1_c = sstables::test(sst).get_components();
        auto& sst2_c = sstables::test(sst2).get_components();
        BOOST_REQUIRE(sst1_c == sst2_c);
    });
}
SEASTAR_TEST_CASE(datafile_generation_11) {
    // Round-trips collection (set) mutations through an sstable:
    // - partition "key1" carries a set with a collection tombstone plus
    //   three cells, written both as a clustered cell and as a static cell;
    // - partition "key2" carries a single-cell set with no tombstone.
    return test_env::do_with_async([] (test_env& env) {
        auto s = complex_schema();
        const column_definition& set_col = *s->get_column_definition("reg_set");
        const column_definition& static_set_col = *s->get_column_definition("static_collection");
        auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
        auto c_key = clustering_key::from_exploded(*s, {to_bytes("c1"), to_bytes("c2")});
        mutation m(s, key);
        tombstone tomb(api::new_timestamp(), gc_clock::now());
        // Set with a tombstone and three elements: "1", "2", "3".
        collection_mutation_description set_mut;
        set_mut.tomb = tomb;
        set_mut.cells.emplace_back(to_bytes("1"), make_atomic_cell(bytes_type, {}));
        set_mut.cells.emplace_back(to_bytes("2"), make_atomic_cell(bytes_type, {}));
        set_mut.cells.emplace_back(to_bytes("3"), make_atomic_cell(bytes_type, {}));
        m.set_clustered_cell(c_key, set_col, set_mut.serialize(*set_col.type));
        m.set_static_cell(static_set_col, set_mut.serialize(*static_set_col.type));
        // Second partition: single-element set, no tombstone.
        auto key2 = partition_key::from_exploded(*s, {to_bytes("key2")});
        mutation m2(s, key2);
        collection_mutation_description set_mut_single;
        set_mut_single.cells.emplace_back(to_bytes("4"), make_atomic_cell(bytes_type, {}));
        m2.set_clustered_cell(c_key, set_col, set_mut_single.serialize(*set_col.type));
        auto mt = make_memtable(s, {std::move(m), std::move(m2)});
        // Extracts and materializes the clustered set cell from a read-back
        // mutation; shared by the checks for both partitions below.
        auto verifier = [s, set_col, c_key] (auto& mutation) {
            auto& mp = mutation->partition();
            BOOST_REQUIRE(mp.clustered_rows().calculate_size() == 1);
            auto r = mp.find_row(*s, c_key);
            BOOST_REQUIRE(r);
            BOOST_REQUIRE(r->size() == 1);
            auto cell = r->find_cell(set_col.id);
            BOOST_REQUIRE(cell);
            return cell->as_collection_mutation().with_deserialized(*set_col.type, [&] (collection_mutation_view_description m) {
                return m.materialize(*set_col.type);
            });
        };
        auto sstp = verify_mutation(env, env.make_sstable(s), mt, "key1", [&] (mutation_opt& mutation) {
            // Checks the tombstone and the three-element contents written above.
            auto verify_set = [&tomb] (const collection_mutation_description& m) {
                BOOST_REQUIRE(bool(m.tomb) == true);
                BOOST_REQUIRE(m.tomb == tomb);
                BOOST_REQUIRE(m.cells.size() == 3);
                BOOST_REQUIRE(m.cells[0].first == to_bytes("1"));
                BOOST_REQUIRE(m.cells[1].first == to_bytes("2"));
                BOOST_REQUIRE(m.cells[2].first == to_bytes("3"));
            };
            auto& mp = mutation->partition();
            auto& ssr = mp.static_row();
            auto scol = ssr.find_cell(static_set_col.id);
            BOOST_REQUIRE(scol);
            // The static set
            scol->as_collection_mutation().with_deserialized(*static_set_col.type, [&] (collection_mutation_view_description mut) {
                verify_set(mut.materialize(*static_set_col.type));
            });
            // The clustered set
            auto m = verifier(mutation);
            verify_set(m);
        });
        // The second partition comes back without a tombstone and with only
        // the single "4" element.
        verify_mutation(env, sstp, "key2", [&] (mutation_opt& mutation) {
            auto m = verifier(mutation);
            BOOST_REQUIRE(!m.tomb);
            BOOST_REQUIRE(m.cells.size() == 1);
            BOOST_REQUIRE(m.cells[0].first == to_bytes("4"));
        });
    });
}
SEASTAR_TEST_CASE(datafile_generation_12) {
    // Applies a range tombstone for clustering prefix "c1" and verifies it
    // round-trips through an sstable with its timestamp intact.
    return test_env::do_with_async([] (test_env& env) {
        auto sch = complex_schema();
        auto pk = partition_key::from_exploded(*sch, {to_bytes("key1")});
        auto prefix = clustering_key_prefix::from_exploded(*sch, {to_bytes("c1")});
        tombstone tomb(api::new_timestamp(), gc_clock::now());
        mutation mut(sch, pk);
        mut.partition().apply_delete(*sch, prefix, tomb);
        auto mt = make_memtable(sch, {std::move(mut)});
        verify_mutation(env, env.make_sstable(sch), mt, "key1", [&] (mutation_opt& mo) {
            auto& part = mo->partition();
            BOOST_REQUIRE(part.row_tombstones().size() == 1);
            for (auto& rt : part.row_tombstones()) {
                BOOST_REQUIRE(rt.tombstone().tomb == tomb);
            }
        });
    });
}
// Round-trips a range tombstone through an sstable written with the given
// compression algorithm, verifying the data survives compress/decompress.
static future<> sstable_compression_test(compression_parameters::algorithm c) {
    return test_env::do_with_async([c] (test_env& env) {
        // NOTE: set a given compressor algorithm to schema.
        schema_builder builder(complex_schema());
        builder.set_compressor_params(c);
        auto sch = builder.build(schema_builder::compact_storage::no);
        auto pk = partition_key::from_exploded(*sch, {to_bytes("key1")});
        auto prefix = clustering_key_prefix::from_exploded(*sch, {to_bytes("c1")});
        tombstone tomb(api::new_timestamp(), gc_clock::now());
        mutation mut(sch, pk);
        mut.partition().apply_delete(*sch, prefix, tomb);
        auto mt = make_memtable(sch, {std::move(mut)});
        verify_mutation(env, env.make_sstable(sch), mt, "key1", [&] (mutation_opt& mo) {
            auto& part = mo->partition();
            BOOST_REQUIRE(part.row_tombstones().size() == 1);
            for (auto& rt : part.row_tombstones()) {
                BOOST_REQUIRE(rt.tombstone().tomb == tomb);
            }
        });
    });
}
// Compression round-trip with LZ4.
SEASTAR_TEST_CASE(datafile_generation_13) {
    return sstable_compression_test(compression_parameters::algorithm::lz4);
}
// Compression round-trip with Snappy.
SEASTAR_TEST_CASE(datafile_generation_14) {
    return sstable_compression_test(compression_parameters::algorithm::snappy);
}
// Compression round-trip with DEFLATE.
SEASTAR_TEST_CASE(datafile_generation_15) {
    return sstable_compression_test(compression_parameters::algorithm::deflate);
}
// Writes 0x80 single-cell partitions (a multiple of the summary sampling
// level) under the given storage configuration and checks the write path
// completes without crashing.
future<> test_datafile_generation_16(test_env_config cfg) {
    return test_env::do_with_async([] (test_env& env) {
        auto sch = uncompressed_schema();
        auto mt = make_lw_shared<replica::memtable>(sch);
        // Create a number of keys that is a multiple of the sampling level
        constexpr int n_keys = 0x80;
        for (int i = 0; i != n_keys; ++i) {
            sstring name = "key" + to_sstring(i);
            auto pk = partition_key::from_exploded(*sch, {to_bytes(name)});
            mutation mut(sch, pk);
            mut.set_clustered_cell(clustering_key::make_empty(), to_bytes("col2"), i, api::max_timestamp);
            mt->apply(std::move(mut));
        }
        auto sst = make_sstable_containing(env.make_sstable(sch), mt);
        // Not crashing is enough
        BOOST_REQUIRE(sst);
        sst->destroy().get();
    }, std::move(cfg));
}
// Default (local filesystem) storage variant.
SEASTAR_TEST_CASE(datafile_generation_16) {
    return test_datafile_generation_16({});
}
// S3 object-storage variant; only runs when the scylla test environment
// precondition is satisfied.
SEASTAR_TEST_CASE(datafile_generation_16_s3, *boost::unit_test::precondition(tests::has_scylla_test_env)) {
    return test_datafile_generation_16(test_env_config{ .storage = make_test_object_storage_options("S3") });
}
// Google Cloud Storage variant; gated behind the ENABLE_GCP_STORAGE_TEST
// environment switch via the fixture decorator.
SEASTAR_FIXTURE_TEST_CASE(datafile_generation_16_gs, gcs_fixture, *check_run_test_decorator("ENABLE_GCP_STORAGE_TEST", true)) {
    return test_datafile_generation_16(test_env_config{ .storage = make_test_object_storage_options("GS") });
}
// mutation_reader for sstable keeping all the required objects alive.
// Full-scan overload: reads the whole partition range with the full slice.
static mutation_reader sstable_mutation_reader(shared_sstable sst, schema_ptr s, reader_permit permit) {
    return sst->as_mutation_source().make_mutation_reader(s, std::move(permit), query::full_partition_range, s->full_slice());
}
// As above, but restricted to the given partition range.
static mutation_reader sstable_mutation_reader(shared_sstable sst, schema_ptr s, reader_permit permit, const dht::partition_range& pr) {
    return sst->as_mutation_source().make_mutation_reader(s, std::move(permit), pr, s->full_slice());
}
SEASTAR_TEST_CASE(datafile_generation_37) {
    // Compact "simple dense" schema: writes one clustered cell ("cl2") and
    // verifies it reads back live with the same value.
    return test_env::do_with_async([] (test_env& env) {
        auto sch = compact_simple_dense_schema();
        auto pk = partition_key::from_exploded(*sch, {to_bytes("key1")});
        auto ck = clustering_key_prefix::from_exploded(*sch, {to_bytes("cl1")});
        const column_definition& cl2 = *sch->get_column_definition("cl2");
        mutation mut(sch, pk);
        mut.set_clustered_cell(ck, cl2, make_atomic_cell(bytes_type, bytes_type->decompose(data_value(to_bytes("cl2")))));
        auto mt = make_memtable(sch, {std::move(mut)});
        verify_mutation(env, env.make_sstable(sch), mt, "key1", [&] (mutation_opt& mo) {
            auto& part = mo->partition();
            auto ck_read = clustering_key_prefix::from_exploded(*sch, {to_bytes("cl1")});
            auto& row = part.clustered_row(*sch, ck_read);
            match_live_cell(row.cells(), *sch, "cl2", data_value(to_bytes("cl2")));
        });
    });
}
SEASTAR_TEST_CASE(datafile_generation_38) {
    // Compact "dense" schema with a two-component clustering key: writes one
    // clustered cell ("cl3") and verifies it reads back live and unchanged.
    return test_env::do_with_async([] (test_env& env) {
        auto sch = compact_dense_schema();
        auto pk = partition_key::from_exploded(*sch, {to_bytes("key1")});
        auto ck = clustering_key_prefix::from_exploded(*sch, {to_bytes("cl1"), to_bytes("cl2")});
        const column_definition& cl3 = *sch->get_column_definition("cl3");
        mutation mut(sch, pk);
        mut.set_clustered_cell(ck, cl3, make_atomic_cell(bytes_type, bytes_type->decompose(data_value(to_bytes("cl3")))));
        auto mt = make_memtable(sch, {std::move(mut)});
        verify_mutation(env, env.make_sstable(sch), mt, "key1", [&] (mutation_opt& mo) {
            auto& part = mo->partition();
            auto ck_read = clustering_key_prefix::from_exploded(*sch, {to_bytes("cl1"), to_bytes("cl2")});
            auto& row = part.clustered_row(*sch, ck_read);
            match_live_cell(row.cells(), *sch, "cl3", data_value(to_bytes("cl3")));
        });
    });
}
SEASTAR_TEST_CASE(datafile_generation_39) {
    // Compact "sparse" schema: empty clustering key, two regular columns.
    // Writes both cells of a single row and verifies they read back live
    // with the expected values.
    return test_env::do_with_async([] (test_env& env) {
        auto s = compact_sparse_schema();
        auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
        mutation m(s, key);
        auto c_key = clustering_key::make_empty();
        const column_definition& cl1 = *s->get_column_definition("cl1");
        m.set_clustered_cell(c_key, cl1, make_atomic_cell(bytes_type, bytes_type->decompose(data_value(to_bytes("cl1")))));
        const column_definition& cl2 = *s->get_column_definition("cl2");
        m.set_clustered_cell(c_key, cl2, make_atomic_cell(bytes_type, bytes_type->decompose(data_value(to_bytes("cl2")))));
        auto mtp = make_memtable(s, {std::move(m)});
        verify_mutation(env, env.make_sstable(s), mtp, "key1", [&] (mutation_opt& mutation) {
            auto& mp = mutation->partition();
            auto& row = mp.clustered_row(*s, clustering_key::make_empty());
            // Previously the expected value was wrapped in data_value twice,
            // which only invoked the copy constructor; a single wrap is
            // equivalent and matches the sibling tests (37/38).
            match_live_cell(row.cells(), *s, "cl1", data_value(to_bytes("cl1")));
            match_live_cell(row.cells(), *s, "cl2", data_value(to_bytes("cl2")));
        });
    });
}
SEASTAR_TEST_CASE(datafile_generation_41) {
    // Deletes a whole clustering row and verifies the row-level tombstone
    // survives the sstable write/read cycle intact.
    return test_env::do_with_async([] (test_env& env) {
        auto sch = schema_builder(some_keyspace, some_column_family)
            .with_column("p1", utf8_type, column_kind::partition_key)
            .with_column("c1", utf8_type, column_kind::clustering_key)
            .with_column("r1", int32_type)
            .with_column("r2", int32_type)
            .build();
        auto pk = partition_key::from_exploded(*sch, {to_bytes("key1")});
        auto ck = clustering_key::from_exploded(*sch, {to_bytes("c1")});
        tombstone tomb(api::new_timestamp(), gc_clock::now());
        mutation mut(sch, pk);
        mut.partition().apply_delete(*sch, std::move(ck), tomb);
        auto mt = make_memtable(sch, {std::move(mut)});
        verify_mutation(env, env.make_sstable(sch), mt, "key1", [&] (mutation_opt& mo) {
            auto& part = mo->partition();
            BOOST_REQUIRE(part.clustered_rows().calculate_size() == 1);
            auto& deleted_row = *(part.clustered_rows().begin());
            BOOST_REQUIRE(deleted_row.row().deleted_at().tomb() == tomb);
        });
    });
}
SEASTAR_TEST_CASE(datafile_generation_47) {
    // Tests the problem in which the sstable row parser would hang.
    // Regression test: writes a row with a large (512 KiB) value and drains
    // the reader to the end; termination of the loop is the check.
    return test_env::do_with_async([] (test_env& env) {
        auto sch = schema_builder(some_keyspace, some_column_family)
            .with_column("p1", utf8_type, column_kind::partition_key)
            .with_column("c1", utf8_type, column_kind::clustering_key)
            .with_column("r1", utf8_type)
            .build();
        const column_definition& r1_col = *sch->get_column_definition("r1");
        auto pk = partition_key::from_exploded(*sch, {to_bytes("key1")});
        auto ck = clustering_key::from_exploded(*sch, {to_bytes("c1")});
        mutation mut(sch, pk);
        mut.set_clustered_cell(ck, r1_col, make_atomic_cell(utf8_type, bytes(512*1024, 'a')));
        auto sstp = make_sstable_containing(env.make_sstable(sch), {std::move(mut)});
        auto rd = sstable_mutation_reader(sstp, sch, env.make_reader_permit());
        auto close_rd = deferred_close(rd);
        // Consume the whole fragment stream; hanging here was the bug.
        while (rd().get()) {
        }
    });
}
SEASTAR_TEST_CASE(test_counter_write) {
    // Writes a mutation carrying multi-shard counter cells (and a dead
    // counter cell on a second clustering row) and asserts the sstable
    // reader reproduces the mutation exactly.
    return test_env::do_with_async([] (test_env& env) {
        auto s = schema_builder(some_keyspace, some_column_family)
            .with_column("p1", utf8_type, column_kind::partition_key)
            .with_column("c1", utf8_type, column_kind::clustering_key)
            .with_column("r1", counter_type)
            .with_column("r2", counter_type)
            .build();
        auto& r1_col = *s->get_column_definition("r1");
        auto& r2_col = *s->get_column_definition("r2");
        auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
        auto c_key = clustering_key::from_exploded(*s, {to_bytes("c1")});
        auto c_key2 = clustering_key::from_exploded(*s, {to_bytes("c2")});
        mutation m(s, key);
        // Shards are added in sorted counter-id order (hence the sort).
        std::vector<counter_id> ids;
        std::generate_n(std::back_inserter(ids), 3, counter_id::create_random_id);
        std::ranges::sort(ids);
        counter_cell_builder b1;
        b1.add_shard(counter_shard(ids[0], 5, 1));
        b1.add_shard(counter_shard(ids[1], -4, 1));
        b1.add_shard(counter_shard(ids[2], 9, 1));
        auto ts = api::new_timestamp();
        m.set_clustered_cell(c_key, r1_col, b1.build(ts));
        counter_cell_builder b2;
        b2.add_shard(counter_shard(ids[1], -1, 1));
        b2.add_shard(counter_shard(ids[2], 2, 1));
        m.set_clustered_cell(c_key, r2_col, b2.build(ts));
        // Deleted counter cell on the second clustering row.
        m.set_clustered_cell(c_key2, r1_col, make_dead_atomic_cell(1));
        auto sstp = make_sstable_containing(env.make_sstable(s), {m});
        // The read-back stream must equal the original mutation exactly.
        assert_that(sstable_mutation_reader(sstp, s, env.make_reader_permit()))
            .produces(m)
            .produces_end_of_stream();
    });
}
// Creates an sstable stub whose metadata (key span and LCS level) is set
// directly, for overlap-checking tests that never read actual data.
static shared_sstable sstable_for_overlapping_test(test_env& env, const schema_ptr& schema,
        const partition_key& first_key, const partition_key& last_key, uint32_t level = 0) {
    auto sst = env.make_sstable(schema);
    constexpr uint64_t data_size = 1;
    constexpr uint64_t max_timestamp = 0;
    sstables::test(sst).set_values_for_leveled_strategy(data_size, level, max_timestamp, first_key, last_key);
    return sst;
}
SEASTAR_TEST_CASE(check_read_indexes) {
    // Reads the index file of a pre-generated "summary_test" sstable for
    // each applicable sstable format version and checks the entry count.
    return test_env::do_with_async([] (test_env& env) {
        for_each_sstable_version([&env] (const sstables::sstable::version_types version) {
            if (!has_summary_and_index(version)) {
                // read_indexes isn't implemented for BTI indexes
                return make_ready_future<>();
            }
            return seastar::async([&env, version] {
                auto builder = schema_builder("test", "summary_test")
                    .with_column("a", int32_type, column_kind::partition_key);
                builder.set_min_index_interval(256);
                auto s = builder.build();
                auto sst = env.reusable_sst(s, get_test_dir("summary_test", s), 1, version).get();
                auto list = sstables::test(sst).read_indexes(env.make_reader_permit()).get();
                // Fixture-dependent: the on-disk test sstable is expected to
                // contain exactly 130 index entries.
                BOOST_REQUIRE(list.size() == 130);
            });
        }).get();
    });
}
SEASTAR_TEST_CASE(check_multi_schema) {
    // Verifies reading an sstable through a schema that differs from the one
    // it was written with (dropped column, changed types, int column re-read
    // as blob).
    //
    // Schema used to write sstable:
    // CREATE TABLE multi_schema_test (
    // a int PRIMARY KEY,
    // b int,
    // c int,
    // d set<int>,
    // e int
    //);
    // Schema used to read sstable:
    // CREATE TABLE multi_schema_test (
    // a int PRIMARY KEY,
    // c set<int>,
    // d int,
    // e blob
    //);
    return test_env::do_with_async([] (test_env& env) {
        for_each_sstable_version([&env] (const sstables::sstable::version_types version) {
            return seastar::async([&env, version] {
                auto set_of_ints_type = set_type_impl::get_instance(int32_type, true);
                auto builder = schema_builder("test", "test_multi_schema")
                    .with_column("a", int32_type, column_kind::partition_key)
                    .with_column("c", set_of_ints_type)
                    .with_column("d", int32_type)
                    .with_column("e", bytes_type);
                auto s = builder.build();
                auto sst = env.reusable_sst(s, get_test_dir("multi_schema_test", s), 1, version).get();
                auto reader = sstable_mutation_reader(sst, s, env.make_reader_permit());
                auto close_reader = deferred_close(reader);
                std::invoke([&] {
                    mutation_opt m = read_mutation_from_mutation_reader(reader).get();
                    BOOST_REQUIRE(m);
                    BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, 0)));
                    auto rows = m->partition().clustered_rows();
                    BOOST_REQUIRE_EQUAL(rows.calculate_size(), 1);
                    auto& row = rows.begin()->row();
                    BOOST_REQUIRE(!row.deleted_at());
                    auto& cells = row.cells();
                    // Only one cell is expected to come back under the new
                    // schema: column "e" (written as int 5, read as blob).
                    BOOST_REQUIRE_EQUAL(cells.size(), 1);
                    auto& cdef = *s->get_column_definition("e");
                    BOOST_REQUIRE_EQUAL(cells.cell_at(cdef.id).as_atomic_cell(cdef).value(), managed_bytes(int32_type->decompose(5)));
                });
                std::invoke([&] {
                    // No further partitions expected.
                    auto m = reader().get();
                    BOOST_REQUIRE(!m);
                });
            });
        }).get();
    });
}
// Reads `sst` with the given partition slice and requires that exactly the
// partitions and clustering rows listed in `expected` are produced.
// Partitions and rows are matched set-wise (any order) and each entry is
// consumed once; leftovers or extras fail the test.
void test_sliced_read_row_presence(shared_sstable sst, schema_ptr s, reader_permit permit, const query::partition_slice& ps,
        std::vector<std::pair<partition_key, std::vector<clustering_key>>> expected)
{
    auto reader = sst->as_mutation_source().make_mutation_reader(s, std::move(permit), query::full_partition_range, ps);
    auto close_reader = deferred_close(reader);
    partition_key::equality pk_eq(*s);
    clustering_key::equality ck_eq(*s);
    auto mfopt = reader().get();
    while (mfopt) {
        // Each partition must open with a partition_start fragment whose key
        // appears in `expected`; remove it so duplicates are caught.
        BOOST_REQUIRE(mfopt->is_partition_start());
        auto it = std::find_if(expected.begin(), expected.end(), [&] (auto&& x) {
            return pk_eq(x.first, mfopt->as_partition_start().key().key());
        });
        BOOST_REQUIRE(it != expected.end());
        auto expected_cr = std::move(it->second);
        expected.erase(it);
        mfopt = reader().get();
        BOOST_REQUIRE(mfopt);
        // Consume fragments until end_of_partition, ticking off each
        // clustering row against this partition's expected list.
        while (!mfopt->is_end_of_partition()) {
            if (mfopt->is_clustering_row()) {
                auto& cr = mfopt->as_clustering_row();
                auto it = std::find_if(expected_cr.begin(), expected_cr.end(), [&] (auto&& x) {
                    return ck_eq(x, cr.key());
                });
                if (it == expected_cr.end()) {
                    // Print the offender before the hard failure below.
                    fmt::print(std::cout, "unexpected clustering row: {}\n", cr.key());
                }
                BOOST_REQUIRE(it != expected_cr.end());
                expected_cr.erase(it);
            }
            mfopt = reader().get();
            BOOST_REQUIRE(mfopt);
        }
        // Every expected row of this partition must have been seen.
        BOOST_REQUIRE(expected_cr.empty());
        mfopt = reader().get();
    }
    // ...and every expected partition must have been consumed.
    BOOST_REQUIRE(expected.empty());
}
SEASTAR_TEST_CASE(test_sliced_mutation_reads) {
    // Reads a pre-generated two-partition sstable through three different
    // clustering slices and checks exactly which rows each slice yields.
    //
    // CREATE TABLE sliced_mutation_reads_test (
    // pk int,
    // ck int,
    // v1 int,
    // v2 set<int>,
    // PRIMARY KEY (pk, ck)
    //);
    //
    // insert into sliced_mutation_reads_test (pk, ck, v1) values (0, 0, 1);
    // insert into sliced_mutation_reads_test (pk, ck, v2) values (0, 1, { 0, 1 });
    // update sliced_mutation_reads_test set v1 = 3 where pk = 0 and ck = 2;
    // insert into sliced_mutation_reads_test (pk, ck, v1) values (0, 3, null);
    // insert into sliced_mutation_reads_test (pk, ck, v2) values (0, 4, null);
    // insert into sliced_mutation_reads_test (pk, ck, v1) values (1, 1, 1);
    // insert into sliced_mutation_reads_test (pk, ck, v1) values (1, 3, 1);
    // insert into sliced_mutation_reads_test (pk, ck, v1) values (1, 5, 1);
    return test_env::do_with_async([] (test_env& env) {
        for (auto version : all_sstable_versions) {
            auto set_of_ints_type = set_type_impl::get_instance(int32_type, true);
            auto builder = schema_builder("ks", "sliced_mutation_reads_test")
                .with_column("pk", int32_type, column_kind::partition_key)
                .with_column("ck", int32_type, column_kind::clustering_key)
                .with_column("v1", int32_type)
                .with_column("v2", set_of_ints_type);
            auto s = builder.build();
            auto sst = env.reusable_sst(s, get_test_dir("sliced_mutation_reads", s), 1, version).get();
            {
                // Two singular ranges: ck = 0 and ck = 5.
                auto ps = partition_slice_builder(*s)
                    .with_range(query::clustering_range::make_singular(
                        clustering_key_prefix::from_single_value(*s, int32_type->decompose(0))))
                    .with_range(query::clustering_range::make_singular(
                        clustering_key_prefix::from_single_value(*s, int32_type->decompose(5))))
                    .build();
                test_sliced_read_row_presence(sst, s, env.make_reader_permit(), ps, {
                    std::make_pair(partition_key::from_single_value(*s, int32_type->decompose(0)),
                        std::vector<clustering_key> { clustering_key_prefix::from_single_value(*s, int32_type->decompose(0)) }),
                    std::make_pair(partition_key::from_single_value(*s, int32_type->decompose(1)),
                        std::vector<clustering_key> { clustering_key_prefix::from_single_value(*s, int32_type->decompose(5)) }),
                });
            }
            {
                // Half-open range [0, 3): the upper bound is exclusive.
                auto ps = partition_slice_builder(*s)
                    .with_range(query::clustering_range {
                        query::clustering_range::bound { clustering_key_prefix::from_single_value(*s, int32_type->decompose(0)) },
                        query::clustering_range::bound { clustering_key_prefix::from_single_value(*s, int32_type->decompose(3)), false },
                    }).build();
                test_sliced_read_row_presence(sst, s, env.make_reader_permit(), ps, {
                    std::make_pair(partition_key::from_single_value(*s, int32_type->decompose(0)),
                        std::vector<clustering_key> {
                            clustering_key_prefix::from_single_value(*s, int32_type->decompose(0)),
                            clustering_key_prefix::from_single_value(*s, int32_type->decompose(1)),
                            clustering_key_prefix::from_single_value(*s, int32_type->decompose(2)),
                        }),
                    std::make_pair(partition_key::from_single_value(*s, int32_type->decompose(1)),
                        std::vector<clustering_key> { clustering_key_prefix::from_single_value(*s, int32_type->decompose(1)) }),
                });
            }
            {
                // Closed range [3, 9]: both bounds inclusive.
                auto ps = partition_slice_builder(*s)
                    .with_range(query::clustering_range {
                        query::clustering_range::bound { clustering_key_prefix::from_single_value(*s, int32_type->decompose(3)) },
                        query::clustering_range::bound { clustering_key_prefix::from_single_value(*s, int32_type->decompose(9)) },
                    }).build();
                test_sliced_read_row_presence(sst, s, env.make_reader_permit(), ps, {
                    std::make_pair(partition_key::from_single_value(*s, int32_type->decompose(0)),
                        std::vector<clustering_key> {
                            clustering_key_prefix::from_single_value(*s, int32_type->decompose(3)),
                            clustering_key_prefix::from_single_value(*s, int32_type->decompose(4)),
                        }),
                    std::make_pair(partition_key::from_single_value(*s, int32_type->decompose(1)),
                        std::vector<clustering_key> {
                            clustering_key_prefix::from_single_value(*s, int32_type->decompose(3)),
                            clustering_key_prefix::from_single_value(*s, int32_type->decompose(5)),
                        }),
                });
            }
        }
    });
}
SEASTAR_TEST_CASE(test_wrong_range_tombstone_order) {
    // Reads a pre-generated sstable whose on-disk range tombstones were
    // written in a problematic order and asserts the reader emits a
    // correctly ordered v2 fragment stream (range_tombstone_change pairs
    // interleaved with clustering rows).
    //
    // create table wrong_range_tombstone_order (
    // p int,
    // a int,
    // b int,
    // c int,
    // r int,
    // primary key (p,a,b,c)
    // ) with compact storage;
    //
    // delete from wrong_range_tombstone_order where p = 0 and a = 0;
    // insert into wrong_range_tombstone_order (p,a,r) values (0,1,1);
    // insert into wrong_range_tombstone_order (p,a,b,r) values (0,1,1,2);
    // insert into wrong_range_tombstone_order (p,a,b,r) values (0,1,2,3);
    // insert into wrong_range_tombstone_order (p,a,b,c,r) values (0,1,2,3,4);
    // delete from wrong_range_tombstone_order where p = 0 and a = 1 and b = 3;
    // insert into wrong_range_tombstone_order (p,a,b,r) values (0,1,3,5);
    // insert into wrong_range_tombstone_order (p,a,b,c,r) values (0,1,3,4,6);
    // insert into wrong_range_tombstone_order (p,a,b,r) values (0,1,4,7);
    // insert into wrong_range_tombstone_order (p,a,b,c,r) values (0,1,4,0,8);
    // delete from wrong_range_tombstone_order where p = 0 and a = 1 and b = 4 and c = 0;
    // delete from wrong_range_tombstone_order where p = 0 and a = 2;
    // delete from wrong_range_tombstone_order where p = 0 and a = 2 and b = 1;
    // delete from wrong_range_tombstone_order where p = 0 and a = 2 and b = 2;
    return test_env::do_with_async([] (test_env& env) {
        for (const auto version : all_sstable_versions) {
            auto s = schema_builder("ks", "wrong_range_tombstone_order")
                .with(schema_builder::compact_storage::yes)
                .with_column("p", int32_type, column_kind::partition_key)
                .with_column("a", int32_type, column_kind::clustering_key)
                .with_column("b", int32_type, column_kind::clustering_key)
                .with_column("c", int32_type, column_kind::clustering_key)
                .with_column("r", int32_type)
                .build();
            clustering_key::equality ck_eq(*s);
            auto pkey = partition_key::from_exploded(*s, { int32_type->decompose(0) });
            auto dkey = dht::decorate_key(*s, std::move(pkey));
            auto sst = env.reusable_sst(s, get_test_dir("wrong_range_tombstone_order", s), 1, version).get();
            auto reader = sstable_mutation_reader(sst, s, env.make_reader_permit());
            using kind = mutation_fragment_v2::kind;
            // Each deleted range shows up as a pair of range_tombstone_change
            // fragments (open/close) at the listed clustering positions.
            assert_that(std::move(reader))
                .produces_partition_start(dkey)
                .produces(kind::range_tombstone_change, { 0 })
                .produces(kind::range_tombstone_change, { 0 })
                .produces(kind::clustering_row, { 1 })
                .produces(kind::clustering_row, { 1, 1 })
                .produces(kind::clustering_row, { 1, 2 })
                .produces(kind::clustering_row, { 1, 2, 3 })
                .produces(kind::range_tombstone_change, { 1, 3 })
                .produces(kind::clustering_row, { 1, 3 })
                .produces(kind::clustering_row, { 1, 3, 4 })
                .produces(kind::range_tombstone_change, { 1, 3 })
                .produces(kind::clustering_row, { 1, 4 })
                .produces(kind::clustering_row, { 1, 4, 0 })
                .produces(kind::range_tombstone_change, { 2 })
                .produces(kind::range_tombstone_change, { 2, 1 })
                .produces(kind::range_tombstone_change, { 2, 1 })
                .produces(kind::range_tombstone_change, { 2, 2 })
                .produces(kind::range_tombstone_change, { 2, 2 })
                .produces(kind::range_tombstone_change, { 2 })
                .produces_partition_end()
                .produces_end_of_stream();
        }
    });
}
SEASTAR_TEST_CASE(test_counter_read) {
    // Reads a pre-generated sstable containing counter updates from two
    // nodes and verifies the merged per-shard state: c1 at ck=0 has two
    // shards (node1: 11@clock2, node2: 2@clock1) totalling 13, c2 totals
    // -92, and c1 at ck=1 is a dead counter cell.
    //
    // create table counter_test (
    // pk int,
    // ck int,
    // c1 counter,
    // c2 counter,
    // primary key (pk, ck)
    // );
    //
    // Node 1:
    // update counter_test set c1 = c1 + 8 where pk = 0 and ck = 0;
    // update counter_test set c2 = c2 - 99 where pk = 0 and ck = 0;
    // update counter_test set c1 = c1 + 3 where pk = 0 and ck = 0;
    // update counter_test set c1 = c1 + 42 where pk = 0 and ck = 1;
    //
    // Node 2:
    // update counter_test set c2 = c2 + 7 where pk = 0 and ck = 0;
    // update counter_test set c1 = c1 + 2 where pk = 0 and ck = 0;
    // delete c1 from counter_test where pk = 0 and ck = 1;
    //
    // select * from counter_test;
    // pk | ck | c1 | c2
    // ----+----+----+-----
    // 0 | 0 | 13 | -92
    return test_env::do_with_async([] (test_env& env) {
        for (const auto version : all_sstable_versions) {
            auto s = schema_builder("ks", "counter_test")
                .with_column("pk", int32_type, column_kind::partition_key)
                .with_column("ck", int32_type, column_kind::clustering_key)
                .with_column("c1", counter_type)
                .with_column("c2", counter_type)
                .build();
            // Fixed counter ids of the two nodes that wrote the fixture.
            auto node1 = counter_id(utils::UUID("8379ab99-4507-4ab1-805d-ac85a863092b"));
            auto node2 = counter_id(utils::UUID("b8a6c3f3-e222-433f-9ce9-de56a8466e07"));
            auto sst = env.reusable_sst(s, get_test_dir("counter_test", s), 5, version).get();
            auto reader = sstable_mutation_reader(sst, s, env.make_reader_permit());
            auto close_reader = deferred_close(reader);
            auto mfopt = reader().get();
            BOOST_REQUIRE(mfopt);
            BOOST_REQUIRE(mfopt->is_partition_start());
            // First clustering row (ck = 0): merged live counters.
            mfopt = reader().get();
            BOOST_REQUIRE(mfopt);
            BOOST_REQUIRE(mfopt->is_clustering_row());
            const clustering_row* cr = &mfopt->as_clustering_row();
            cr->cells().for_each_cell([&] (column_id id, const atomic_cell_or_collection& c) {
                counter_cell_view ccv(c.as_atomic_cell(s->regular_column_at(id)));
                auto& col = s->column_at(column_kind::regular_column, id);
                if (col.name_as_text() == "c1") {
                    BOOST_REQUIRE_EQUAL(ccv.total_value(), 13);
                    BOOST_REQUIRE_EQUAL(ccv.shard_count(), 2);
                    // Shards are iterated in counter-id order: node1, node2.
                    auto it = ccv.shards().begin();
                    auto shard = *it++;
                    BOOST_REQUIRE_EQUAL(shard.id(), node1);
                    BOOST_REQUIRE_EQUAL(shard.value(), 11);
                    BOOST_REQUIRE_EQUAL(shard.logical_clock(), 2);
                    shard = *it++;
                    BOOST_REQUIRE_EQUAL(shard.id(), node2);
                    BOOST_REQUIRE_EQUAL(shard.value(), 2);
                    BOOST_REQUIRE_EQUAL(shard.logical_clock(), 1);
                } else if (col.name_as_text() == "c2") {
                    BOOST_REQUIRE_EQUAL(ccv.total_value(), -92);
                } else {
                    BOOST_FAIL(format("Unexpected column \'{}\'", col.name_as_text()));
                }
            });
            // Second clustering row (ck = 1): c1 was deleted on node 2.
            mfopt = reader().get();
            BOOST_REQUIRE(mfopt);
            BOOST_REQUIRE(mfopt->is_clustering_row());
            cr = &mfopt->as_clustering_row();
            cr->cells().for_each_cell([&] (column_id id, const atomic_cell_or_collection& c) {
                auto& col = s->column_at(column_kind::regular_column, id);
                if (col.name_as_text() == "c1") {
                    BOOST_REQUIRE(!c.as_atomic_cell(col).is_live());
                } else {
                    BOOST_FAIL(format("Unexpected column \'{}\'", col.name_as_text()));
                }
            });
            // Stream must close the partition and then end.
            mfopt = reader().get();
            BOOST_REQUIRE(mfopt);
            BOOST_REQUIRE(mfopt->is_end_of_partition());
            mfopt = reader().get();
            BOOST_REQUIRE(!mfopt);
        }
    });
}
SEASTAR_TEST_CASE(test_sstable_max_local_deletion_time) {
    // Writes ten expiring cells with strictly increasing expiry times and
    // checks the sstable's stats metadata records the largest one as
    // max_local_deletion_time, for every writable sstable format version.
    return test_env::do_with_async([] (test_env& env) {
        for (const auto version : writable_sstable_versions) {
            schema_builder builder(some_keyspace, some_column_family);
            builder.with_column("p1", utf8_type, column_kind::partition_key);
            builder.with_column("c1", utf8_type, column_kind::clustering_key);
            builder.with_column("r1", utf8_type);
            schema_ptr s = builder.build(schema_builder::compact_storage::no);
            auto mt = make_lw_shared<replica::memtable>(s);
            int32_t last_expiry = 0;
            for (auto i = 0; i < 10; i++) {
                auto key = partition_key::from_exploded(*s, {to_bytes("key" + to_sstring(i))});
                mutation m(s, key);
                auto c_key = clustering_key::from_exploded(*s, {to_bytes("c1")});
                // Expiry grows with i, so after the loop last_expiry holds
                // the maximum of all written expiration points.
                last_expiry = (gc_clock::now() + gc_clock::duration(3600 + i)).time_since_epoch().count();
                m.set_clustered_cell(c_key, *s->get_column_definition("r1"),
                    make_atomic_cell(utf8_type, bytes("a"), 3600 + i, last_expiry));
                mt->apply(std::move(m));
            }
            auto sstp = make_sstable_containing(env.make_sstable(s, version), mt);
            BOOST_REQUIRE(last_expiry == sstp->get_stats_metadata().max_local_deletion_time);
        }
    });
}
// Reads a pre-generated sstable (created with the recipe below, using
// column_index_size_in_kb = 0 so every row gets a promoted-index entry) and
// verifies the full-scan fragment stream: the opening range tombstone change
// for ck1 = 0, the two rows, and the closing tombstone change.
//
// Fix: removed the unused locals ck1/ck2/ck3 — the assertions spell out the
// expected clustering values directly, so the keys were dead code.
SEASTAR_TEST_CASE(test_promoted_index_read) {
    // create table promoted_index_read (
    //        pk int,
    //        ck1 int,
    //        ck2 int,
    //        v int,
    //        primary key (pk, ck1, ck2)
    // );
    //
    // column_index_size_in_kb: 0
    //
    // delete from promoted_index_read where pk = 0 and ck1 = 0;
    // insert into promoted_index_read (pk, ck1, ck2, v) values (0, 0, 0, 0);
    // insert into promoted_index_read (pk, ck1, ck2, v) values (0, 0, 1, 1);
    //
    // SSTable:
    // [
    // {"key": "0",
    //  "cells": [["0:_","0:!",1468923292708929,"t",1468923292],
    //            ["0:_","0:!",1468923292708929,"t",1468923292],
    //            ["0:0:","",1468923308379491],
    //            ["0:_","0:!",1468923292708929,"t",1468923292],
    //            ["0:0:v","0",1468923308379491],
    //            ["0:_","0:!",1468923292708929,"t",1468923292],
    //            ["0:1:","",1468923311744298],
    //            ["0:_","0:!",1468923292708929,"t",1468923292],
    //            ["0:1:v","1",1468923311744298]]}
    // ]
    return test_env::do_with_async([] (test_env& env) {
        for (const auto version : all_sstable_versions) {
            auto s = schema_builder("ks", "promoted_index_read")
                    .with_column("pk", int32_type, column_kind::partition_key)
                    .with_column("ck1", int32_type, column_kind::clustering_key)
                    .with_column("ck2", int32_type, column_kind::clustering_key)
                    .with_column("v", int32_type)
                    .build();
            auto sst = env.reusable_sst(s, get_test_dir("promoted_index_read", s), 1, version).get();
            auto pkey = partition_key::from_exploded(*s, { int32_type->decompose(0) });
            auto dkey = dht::decorate_key(*s, std::move(pkey));
            auto rd = sstable_mutation_reader(sst, s, env.make_reader_permit());
            using kind = mutation_fragment_v2::kind;
            assert_that(std::move(rd))
                .produces_partition_start(dkey)
                .produces(kind::range_tombstone_change, { 0 })
                .produces(kind::clustering_row, { 0, 0 })
                .produces(kind::clustering_row, { 0, 1 })
                .produces(kind::range_tombstone_change, { 0 })
                .produces_partition_end()
                .produces_end_of_stream();
        }
    });
}
// Assert that the sstable's stats metadata records exactly the expected
// min/max clustering column components, comparing element by element.
static void check_min_max_column_names(const sstable_ptr& sst, std::vector<bytes> min_components, std::vector<bytes> max_components) {
    const auto& stats = sst->get_stats_metadata();
    const auto& min_elems = stats.min_column_names.elements;
    const auto& max_elems = stats.max_column_names.elements;
    BOOST_TEST_MESSAGE(fmt::format("min {}/{} max {}/{}", min_elems.size(), min_components.size(), max_elems.size(), max_components.size()));
    // Sizes must match before element-wise comparison is meaningful.
    BOOST_REQUIRE(min_elems.size() == min_components.size());
    for (auto idx = 0U; idx < min_elems.size(); idx++) {
        BOOST_REQUIRE(min_components[idx] == min_elems[idx].value);
    }
    BOOST_REQUIRE(max_elems.size() == max_components.size());
    for (auto idx = 0U; idx < max_elems.size(); idx++) {
        BOOST_REQUIRE(max_components[idx] == max_elems[idx].value);
    }
}
// Writes the given clustering keys under one partition key into a fresh
// sstable — as live cells, or as row tombstones when `remove` is set — and
// verifies that the resulting stats metadata carries the expected min/max
// clustering components. The sstable is unlinked before returning.
static void test_min_max_clustering_key(test_env& env, schema_ptr s, std::vector<bytes> exploded_pk, std::vector<std::vector<bytes>> exploded_cks,
        std::vector<bytes> min_components, std::vector<bytes> max_components, sstable_version_types version, bool remove = false) {
    auto mt = make_lw_shared<replica::memtable>(s);
    // Apply one live cell under the given clustering key; an empty exploded
    // key yields the empty clustering prefix (schemas without clustering).
    auto write_row = [&mt, &s] (std::vector<bytes>& exploded_pk, std::vector<bytes>&& exploded_ck) {
        const column_definition& r1_col = *s->get_column_definition("r1");
        auto pk = partition_key::from_exploded(*s, exploded_pk);
        auto ck = exploded_ck.empty() ? clustering_key::make_empty()
                                      : clustering_key::from_exploded(*s, exploded_ck);
        mutation m(s, pk);
        m.set_clustered_cell(ck, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
        mt->apply(std::move(m));
    };
    // Apply a row tombstone for the given clustering key.
    auto delete_row = [&mt, &s] (std::vector<bytes>& exploded_pk, std::vector<bytes>&& exploded_ck) {
        auto pk = partition_key::from_exploded(*s, exploded_pk);
        auto ck = clustering_key::from_exploded(*s, exploded_ck);
        mutation m(s, pk);
        m.partition().apply_delete(*s, ck, tombstone(api::new_timestamp(), gc_clock::now()));
        mt->apply(std::move(m));
    };
    if (exploded_cks.empty()) {
        write_row(exploded_pk, {});
    } else {
        for (auto& exploded_ck : exploded_cks) {
            if (remove) {
                delete_row(exploded_pk, std::move(exploded_ck));
            } else {
                write_row(exploded_pk, std::move(exploded_ck));
            }
        }
    }
    auto sst = make_sstable_containing(env.make_sstable(s, version), mt);
    check_min_max_column_names(sst, std::move(min_components), std::move(max_components));
    sst->unlink().get();
}
// Exercises test_min_max_clustering_key() across schema variants — regular
// and COMPACT STORAGE tables, forward and reversed clustering columns, one,
// two or zero clustering columns — verifying the min/max clustering values
// that end up in the sstable stats metadata for each case.
SEASTAR_TEST_CASE(min_max_clustering_key_test) {
    return test_env::do_with_async([] (test_env& env) {
        for (auto version : writable_sstable_versions) {
            {
                // Two clustering columns, rows inserted in sorted order.
                auto s = schema_builder("ks", "cf")
                        .with_column("pk", utf8_type, column_kind::partition_key)
                        .with_column("ck1", utf8_type, column_kind::clustering_key)
                        .with_column("ck2", utf8_type, column_kind::clustering_key)
                        .with_column("r1", int32_type)
                        .build();
                test_min_max_clustering_key(env, s, {"key1"}, {{"a", "b"},
                        {"a", "c"}}, {"a", "b"}, {"a", "c"}, version);
            }
            {
                // Same, with COMPACT STORAGE.
                auto s = schema_builder("ks", "cf")
                        .with(schema_builder::compact_storage::yes)
                        .with_column("pk", utf8_type, column_kind::partition_key)
                        .with_column("ck1", utf8_type, column_kind::clustering_key)
                        .with_column("ck2", utf8_type, column_kind::clustering_key)
                        .with_column("r1", int32_type)
                        .build();
                test_min_max_clustering_key(env, s, {"key1"}, {{"a", "b"},
                        {"a", "c"}}, {"a", "b"}, {"a", "c"}, version);
            }
            {
                // Rows inserted out of order: min/max must reflect clustering
                // order, not insertion order.
                auto s = schema_builder("ks", "cf")
                        .with_column("pk", utf8_type, column_kind::partition_key)
                        .with_column("ck1", utf8_type, column_kind::clustering_key)
                        .with_column("ck2", utf8_type, column_kind::clustering_key)
                        .with_column("r1", int32_type)
                        .build();
                BOOST_TEST_MESSAGE(fmt::format("min_max_clustering_key_test: min={{\"a\", \"c\"}} max={{\"b\", \"a\"}} version={}", version));
                test_min_max_clustering_key(env, s, {"key1"}, {{"b", "a"}, {"a", "c"}}, {"a", "c"}, {"b", "a"}, version);
            }
            {
                // Out-of-order insertion with COMPACT STORAGE.
                auto s = schema_builder("ks", "cf")
                        .with(schema_builder::compact_storage::yes)
                        .with_column("pk", utf8_type, column_kind::partition_key)
                        .with_column("ck1", utf8_type, column_kind::clustering_key)
                        .with_column("ck2", utf8_type, column_kind::clustering_key)
                        .with_column("r1", int32_type)
                        .build();
                BOOST_TEST_MESSAGE(fmt::format("min_max_clustering_key_test: min={{\"a\", \"c\"}} max={{\"b\", \"a\"}} with compact storage version={}", version));
                test_min_max_clustering_key(env, s, {"key1"}, {{"b", "a"}, {"a", "c"}}, {"a", "c"}, {"b", "a"}, version);
            }
            {
                // ck2 is reversed, so in the second component "z" sorts
                // before "a": {"a", "z"} is the minimum.
                auto s = schema_builder("ks", "cf")
                        .with_column("pk", utf8_type, column_kind::partition_key)
                        .with_column("ck1", utf8_type, column_kind::clustering_key)
                        .with_column("ck2", reversed_type_impl::get_instance(utf8_type), column_kind::clustering_key)
                        .with_column("r1", int32_type)
                        .build();
                BOOST_TEST_MESSAGE(fmt::format("min_max_clustering_key_test: reversed order: min={{\"a\", \"z\"}} max={{\"a\", \"a\"}} version={}", version));
                test_min_max_clustering_key(env, s, {"key1"}, {{"a", "a"}, {"a", "z"}}, {"a", "z"}, {"a", "a"}, version);
            }
            {
                // Reversed ck2 again, but the (forward) first component
                // dominates: {"a", "a"} < {"b", "z"}.
                auto s = schema_builder("ks", "cf")
                        .with_column("pk", utf8_type, column_kind::partition_key)
                        .with_column("ck1", utf8_type, column_kind::clustering_key)
                        .with_column("ck2", reversed_type_impl::get_instance(utf8_type), column_kind::clustering_key)
                        .with_column("r1", int32_type)
                        .build();
                BOOST_TEST_MESSAGE(fmt::format("min_max_clustering_key_test: reversed order: min={{\"a\", \"a\"}} max={{\"b\", \"z\"}} version={}", version));
                test_min_max_clustering_key(env, s, {"key1"}, {{"b", "z"}, {"a", "a"}}, {"a", "a"}, {"b", "z"}, version);
            }
            {
                // Single clustering column, live rows.
                auto s = schema_builder("ks", "cf")
                        .with_column("pk", utf8_type, column_kind::partition_key)
                        .with_column("ck1", utf8_type, column_kind::clustering_key)
                        .with_column("r1", int32_type)
                        .build();
                test_min_max_clustering_key(env, s, {"key1"}, {{"a"},
                        {"z"}}, {"a"}, {"z"}, version);
            }
            {
                // Single clustering column, written as row tombstones
                // (remove = true): tombstones contribute to min/max as well.
                auto s = schema_builder("ks", "cf")
                        .with_column("pk", utf8_type, column_kind::partition_key)
                        .with_column("ck1", utf8_type, column_kind::clustering_key)
                        .with_column("r1", int32_type)
                        .build();
                test_min_max_clustering_key(env, s, {"key1"}, {{"a"},
                        {"z"}}, {"a"}, {"z"}, version, true);
            }
            {
                // No clustering columns at all: min/max stay empty.
                auto s = schema_builder("ks", "cf")
                        .with_column("pk", utf8_type, column_kind::partition_key)
                        .with_column("r1", int32_type)
                        .build();
                test_min_max_clustering_key(env, s, {"key1"}, {}, {}, {}, version);
            }
            if (version >= sstable_version_types::mc) {
                {
                    // mc+ only: COMPACT STORAGE with a prefix clustering key
                    // {"a"} alongside the full key {"a", "z"} — the expected
                    // min and max both come out as the prefix {"a"}.
                    auto s = schema_builder("ks", "cf")
                            .with(schema_builder::compact_storage::yes)
                            .with_column("pk", utf8_type, column_kind::partition_key)
                            .with_column("ck1", utf8_type, column_kind::clustering_key)
                            .with_column("ck2", reversed_type_impl::get_instance(utf8_type), column_kind::clustering_key)
                            .with_column("r1", int32_type)
                            .build();
                    BOOST_TEST_MESSAGE(fmt::format("min_max_clustering_key_test: reversed order: min={{\"a\"}} max={{\"a\"}} with compact storage version={}", version));
                    test_min_max_clustering_key(env, s, {"key1"}, {{"a", "z"}, {"a"}}, {"a"}, {"a"}, version);
                }
            }
        }
    });
}
// Verifies that tombstones of every flavour (row tombstone, expired cell,
// partition tombstone, range tombstone) are reflected in the written
// sstable's stats metadata: estimated_tombstone_drop_time must be populated
// exactly when tombstones were written, and min/max_column_names must cover
// the clustering positions touched. Range-tombstone bound accounting is only
// checked for format mc and newer. Schema has one clustering column.
SEASTAR_TEST_CASE(sstable_tombstone_metadata_check) {
    return test_env::do_with_async([] (test_env& env) {
        for (const auto version : writable_sstable_versions) {
            auto s = schema_builder("ks", "cf")
                    .with_column("pk", utf8_type, column_kind::partition_key)
                    .with_column("ck1", utf8_type, column_kind::clustering_key)
                    .with_column("r1", int32_type)
                    .build();
            auto sst_gen = env.make_sst_factory(s, version);
            auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
            auto c_key = clustering_key_prefix::from_exploded(*s, {to_bytes("c1")});
            const column_definition& r1_col = *s->get_column_definition("r1");
            BOOST_TEST_MESSAGE(fmt::format("version {}", version));
            {
                // Row tombstone at "c1": drop-time histogram populated,
                // min/max span the deleted row's clustering key.
                mutation m(s, key);
                tombstone tomb(api::new_timestamp(), gc_clock::now());
                m.partition().apply_delete(*s, c_key, tomb);
                auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                check_min_max_column_names(sst, {"c1"}, {"c1"});
            }
            {
                // An expired (dead) cell also registers a drop time.
                mutation m(s, key);
                m.set_clustered_cell(c_key, r1_col, make_dead_atomic_cell(3600));
                auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                check_min_max_column_names(sst, {"c1"}, {"c1"});
            }
            {
                // A live cell only: no tombstones, histogram stays empty.
                mutation m(s, key);
                m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                BOOST_REQUIRE(!sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                check_min_max_column_names(sst, {"c1"}, {"c1"});
            }
            {
                // Row tombstone in one partition plus a live row in another:
                // the tombstone must still be accounted.
                mutation m(s, key);
                tombstone tomb(api::new_timestamp(), gc_clock::now());
                m.partition().apply_delete(*s, c_key, tomb);
                auto key2 = partition_key::from_exploded(*s, {to_bytes("key2")});
                mutation m2(s, key2);
                m2.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                auto sst = make_sstable_containing(sst_gen, {std::move(m), std::move(m2)});
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                check_min_max_column_names(sst, {"c1"}, {"c1"});
            }
            {
                // Partition tombstone: no clustering bounds get recorded.
                mutation m(s, key);
                tombstone tomb(api::new_timestamp(), gc_clock::now());
                m.partition().apply(tomb);
                auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                check_min_max_column_names(sst, {}, {});
            }
            {
                // Range tombstone ["a", "a"]: its bounds become min/max
                // (checked on mc+ only).
                mutation m(s, key);
                tombstone tomb(api::new_timestamp(), gc_clock::now());
                range_tombstone rt(clustering_key_prefix::from_single_value(*s, bytes(
                        "a")), clustering_key_prefix::from_single_value(*s, bytes("a")), tomb);
                m.partition().apply_delete(*s, std::move(rt));
                auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                if (version >= sstable_version_types::mc) {
                    check_min_max_column_names(sst, {"a"}, {"a"});
                }
            }
            {
                // Live row at "c1" plus range tombstone ["a", "a"]: min from
                // the tombstone, max from the row.
                mutation m(s, key);
                m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                tombstone tomb(api::new_timestamp(), gc_clock::now());
                range_tombstone rt(
                        clustering_key_prefix::from_single_value(*s, bytes("a")),
                        clustering_key_prefix::from_single_value(*s, bytes("a")),
                        tomb);
                m.partition().apply_delete(*s, std::move(rt));
                auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                if (version >= sstable_version_types::mc) {
                    check_min_max_column_names(sst, {"a"}, {"c1"});
                }
            }
            {
                // Live row at "c1" plus range tombstone ["c", "d"]: the
                // tombstone's bounds enclose the row, so both come from it.
                mutation m(s, key);
                m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                tombstone tomb(api::new_timestamp(), gc_clock::now());
                range_tombstone rt(
                        clustering_key_prefix::from_single_value(*s, bytes("c")),
                        clustering_key_prefix::from_single_value(*s, bytes("d")),
                        tomb);
                m.partition().apply_delete(*s, std::move(rt));
                auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                if (version >= sstable_version_types::mc) {
                    check_min_max_column_names(sst, {"c"}, {"d"});
                }
            }
            {
                // Live row at "c1" plus range tombstone ["d", "z"]: min from
                // the row, max from the tombstone.
                mutation m(s, key);
                m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                tombstone tomb(api::new_timestamp(), gc_clock::now());
                range_tombstone rt(
                        clustering_key_prefix::from_single_value(*s, bytes("d")),
                        clustering_key_prefix::from_single_value(*s, bytes("z")),
                        tomb);
                m.partition().apply_delete(*s, std::move(rt));
                auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                if (version >= sstable_version_types::mc) {
                    check_min_max_column_names(sst, {"c1"}, {"z"});
                }
            }
            if (version >= sstable_version_types::mc) {
                {
                    // Range tombstone open at the bottom: min is recorded as
                    // empty, max is the tombstone's end bound.
                    mutation m(s, key);
                    m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                    tombstone tomb(api::new_timestamp(), gc_clock::now());
                    range_tombstone rt(
                            bound_view::bottom(),
                            bound_view(clustering_key_prefix::from_single_value(*s, bytes("z")), bound_kind::incl_end),
                            tomb);
                    m.partition().apply_delete(*s, std::move(rt));
                    auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                    BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                    check_min_max_column_names(sst, {}, {"z"});
                }
                {
                    // Range tombstone open at the top: max is recorded as
                    // empty, min is the tombstone's start bound.
                    mutation m(s, key);
                    m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                    tombstone tomb(api::new_timestamp(), gc_clock::now());
                    range_tombstone rt(
                            bound_view(clustering_key_prefix::from_single_value(*s, bytes("a")), bound_kind::incl_start),
                            bound_view::top(),
                            tomb);
                    m.partition().apply_delete(*s, std::move(rt));
                    auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                    BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                    check_min_max_column_names(sst, {"a"}, {});
                }
                {
                    // Row tombstone with the empty clustering prefix:
                    // expected to leave min/max empty.
                    mutation m(s, key);
                    m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                    tombstone tomb(api::new_timestamp(), gc_clock::now());
                    m.partition().apply_delete(*s, clustering_key_prefix::make_empty(), tomb);
                    auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                    BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                    check_min_max_column_names(sst, {}, {});
                }
            }
        }
    });
}
// Same checks as sstable_tombstone_metadata_check, but with two clustering
// columns, exercising composite and prefix clustering bounds in the recorded
// min/max_column_names. Range-tombstone bounds are only checked for mc+.
SEASTAR_TEST_CASE(sstable_composite_tombstone_metadata_check) {
    return test_env::do_with_async([] (test_env& env) {
        for (const auto version : writable_sstable_versions) {
            auto s = schema_builder("ks", "cf")
                    .with_column("pk", utf8_type, column_kind::partition_key)
                    .with_column("ck1", utf8_type, column_kind::clustering_key)
                    .with_column("ck2", utf8_type, column_kind::clustering_key)
                    .with_column("r1", int32_type)
                    .build();
            auto sst_gen = env.make_sst_factory(s, version);
            auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
            auto c_key = clustering_key_prefix::from_exploded(*s, {to_bytes("c1"), to_bytes("c2")});
            const column_definition& r1_col = *s->get_column_definition("r1");
            BOOST_TEST_MESSAGE(fmt::format("version {}", version));
            {
                // Row tombstone at ("c1", "c2").
                mutation m(s, key);
                tombstone tomb(api::new_timestamp(), gc_clock::now());
                m.partition().apply_delete(*s, c_key, tomb);
                auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                check_min_max_column_names(sst, {"c1", "c2"}, {"c1", "c2"});
            }
            {
                // An expired (dead) cell also registers a drop time.
                mutation m(s, key);
                m.set_clustered_cell(c_key, r1_col, make_dead_atomic_cell(3600));
                auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                check_min_max_column_names(sst, {"c1", "c2"}, {"c1", "c2"});
            }
            {
                // A live cell only: no tombstones, histogram stays empty.
                mutation m(s, key);
                m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                BOOST_REQUIRE(!sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                check_min_max_column_names(sst, {"c1", "c2"}, {"c1", "c2"});
            }
            {
                // Row tombstone in one partition plus a live row in another:
                // the tombstone must still be accounted.
                mutation m(s, key);
                tombstone tomb(api::new_timestamp(), gc_clock::now());
                m.partition().apply_delete(*s, c_key, tomb);
                auto key2 = partition_key::from_exploded(*s, {to_bytes("key2")});
                mutation m2(s, key2);
                m2.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                auto sst = make_sstable_containing(sst_gen, {std::move(m), std::move(m2)});
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                check_min_max_column_names(sst, {"c1", "c2"}, {"c1", "c2"});
            }
            {
                // Partition tombstone: no clustering bounds get recorded.
                mutation m(s, key);
                tombstone tomb(api::new_timestamp(), gc_clock::now());
                m.partition().apply(tomb);
                auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                check_min_max_column_names(sst, {}, {});
            }
            {
                // Range tombstone [("a","aa"), ("z","zz")]: its full
                // composite bounds become min/max (mc+).
                mutation m(s, key);
                tombstone tomb(api::new_timestamp(), gc_clock::now());
                range_tombstone rt(
                        clustering_key_prefix::from_exploded(*s, {to_bytes("a"), to_bytes("aa")}),
                        clustering_key_prefix::from_exploded(*s, {to_bytes("z"), to_bytes("zz")}),
                        tomb);
                m.partition().apply_delete(*s, std::move(rt));
                auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                if (version >= sstable_version_types::mc) {
                    check_min_max_column_names(sst, {"a", "aa"}, {"z", "zz"});
                }
            }
            {
                // Live row at ("c1","c2") plus range tombstone from the
                // prefix ("a") to ("a","zz"): min is the tombstone's prefix
                // start, max is the row.
                mutation m(s, key);
                m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                tombstone tomb(api::new_timestamp(), gc_clock::now());
                range_tombstone rt(
                        clustering_key_prefix::from_exploded(*s, {to_bytes("a")}),
                        clustering_key_prefix::from_exploded(*s, {to_bytes("a"), to_bytes("zz")}),
                        tomb);
                m.partition().apply_delete(*s, std::move(rt));
                auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                if (version >= sstable_version_types::mc) {
                    check_min_max_column_names(sst, {"a"}, {"c1", "c2"});
                }
            }
            {
                // Live row plus range tombstone [("c1","aa"), ("c1","zz")]
                // which encloses the row: both bounds come from the
                // tombstone.
                mutation m(s, key);
                m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                tombstone tomb(api::new_timestamp(), gc_clock::now());
                range_tombstone rt(
                        clustering_key_prefix::from_exploded(*s, {to_bytes("c1"), to_bytes("aa")}),
                        clustering_key_prefix::from_exploded(*s, {to_bytes("c1"), to_bytes("zz")}),
                        tomb);
                m.partition().apply_delete(*s, std::move(rt));
                auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                if (version >= sstable_version_types::mc) {
                    check_min_max_column_names(sst, {"c1", "aa"}, {"c1", "zz"});
                }
            }
            {
                // Live row plus range tombstone [("c1","d"), ("z","zz")]:
                // min from the row, max from the tombstone.
                mutation m(s, key);
                m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                tombstone tomb(api::new_timestamp(), gc_clock::now());
                range_tombstone rt(
                        clustering_key_prefix::from_exploded(*s, {to_bytes("c1"), to_bytes("d")}),
                        clustering_key_prefix::from_exploded(*s, {to_bytes("z"), to_bytes("zz")}),
                        tomb);
                m.partition().apply_delete(*s, std::move(rt));
                auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                if (version >= sstable_version_types::mc) {
                    check_min_max_column_names(sst, {"c1", "c2"}, {"z", "zz"});
                }
            }
            if (version >= sstable_version_types::mc) {
                {
                    // Range tombstone open at the bottom: min recorded as
                    // empty, max is the tombstone's end bound.
                    mutation m(s, key);
                    m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                    tombstone tomb(api::new_timestamp(), gc_clock::now());
                    range_tombstone rt(
                            bound_view::bottom(),
                            bound_view(clustering_key_prefix::from_single_value(*s, bytes("z")), bound_kind::incl_end),
                            tomb);
                    m.partition().apply_delete(*s, std::move(rt));
                    auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                    BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                    check_min_max_column_names(sst, {}, {"z"});
                }
                {
                    // Range tombstone open at the top: max recorded as
                    // empty, min is the tombstone's start bound.
                    mutation m(s, key);
                    m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                    tombstone tomb(api::new_timestamp(), gc_clock::now());
                    range_tombstone rt(
                            bound_view(clustering_key_prefix::from_single_value(*s, bytes("a")), bound_kind::incl_start),
                            bound_view::top(),
                            tomb);
                    m.partition().apply_delete(*s, std::move(rt));
                    auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                    BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                    check_min_max_column_names(sst, {"a"}, {});
                }
            }
        }
    });
}
// Variant of sstable_composite_tombstone_metadata_check with the second
// clustering column reversed: in ck2, larger byte values sort first, so
// range-tombstone bounds like ("a","zz") precede ("a","aa") and the
// recorded min/max components reflect that reversed ordering. Range
// tombstone bounds are only checked for format mc+.
SEASTAR_TEST_CASE(sstable_composite_reverse_tombstone_metadata_check) {
    return test_env::do_with_async([] (test_env& env) {
        for (const auto version : writable_sstable_versions) {
            auto s = schema_builder("ks", "cf")
                    .with_column("pk", utf8_type, column_kind::partition_key)
                    .with_column("ck1", utf8_type, column_kind::clustering_key)
                    .with_column("ck2", reversed_type_impl::get_instance(utf8_type), column_kind::clustering_key)
                    .with_column("r1", int32_type)
                    .build();
            auto sst_gen = env.make_sst_factory(s, version);
            auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
            auto c_key = clustering_key_prefix::from_exploded(*s, {to_bytes("c1"), to_bytes("c2")});
            const column_definition& r1_col = *s->get_column_definition("r1");
            BOOST_TEST_MESSAGE(fmt::format("version {}", version));
            {
                // Row tombstone at ("c1", "c2").
                mutation m(s, key);
                tombstone tomb(api::new_timestamp(), gc_clock::now());
                m.partition().apply_delete(*s, c_key, tomb);
                auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                check_min_max_column_names(sst, {"c1", "c2"}, {"c1", "c2"});
            }
            {
                // An expired (dead) cell also registers a drop time.
                mutation m(s, key);
                m.set_clustered_cell(c_key, r1_col, make_dead_atomic_cell(3600));
                auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                check_min_max_column_names(sst, {"c1", "c2"}, {"c1", "c2"});
            }
            {
                // A live cell only: no tombstones, histogram stays empty.
                mutation m(s, key);
                m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                BOOST_REQUIRE(!sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                check_min_max_column_names(sst, {"c1", "c2"}, {"c1", "c2"});
            }
            {
                // Row tombstone in one partition plus a live row in another:
                // the tombstone must still be accounted.
                mutation m(s, key);
                tombstone tomb(api::new_timestamp(), gc_clock::now());
                m.partition().apply_delete(*s, c_key, tomb);
                auto key2 = partition_key::from_exploded(*s, {to_bytes("key2")});
                mutation m2(s, key2);
                m2.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                auto sst = make_sstable_containing(sst_gen, {std::move(m), std::move(m2)});
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                check_min_max_column_names(sst, {"c1", "c2"}, {"c1", "c2"});
            }
            {
                // Partition tombstone: no clustering bounds get recorded.
                mutation m(s, key);
                tombstone tomb(api::new_timestamp(), gc_clock::now());
                m.partition().apply(tomb);
                auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                check_min_max_column_names(sst, {}, {});
            }
            {
                // Range tombstone from ("a","zz") to ("a","aa") — ascending
                // in reversed ck2 order — so min is ("a","zz") and max is
                // ("a","aa").
                mutation m(s, key);
                tombstone tomb(api::new_timestamp(), gc_clock::now());
                range_tombstone rt(
                        clustering_key_prefix::from_exploded(*s, {to_bytes("a"), to_bytes("zz")}),
                        clustering_key_prefix::from_exploded(*s, {to_bytes("a"), to_bytes("aa")}),
                        tomb);
                m.partition().apply_delete(*s, std::move(rt));
                auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                if (version >= sstable_version_types::mc) {
                    check_min_max_column_names(sst, {"a", "zz"}, {"a", "aa"});
                }
            }
            {
                // Live row at ("c1","c2") plus range tombstone from
                // ("a","zz") to the prefix ("a"): min from the tombstone,
                // max from the row.
                mutation m(s, key);
                m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                tombstone tomb(api::new_timestamp(), gc_clock::now());
                range_tombstone rt(
                        clustering_key_prefix::from_exploded(*s, {to_bytes("a"), to_bytes("zz")}),
                        clustering_key_prefix::from_exploded(*s, {to_bytes("a")}),
                        tomb);
                m.partition().apply_delete(*s, std::move(rt));
                auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                if (version >= sstable_version_types::mc) {
                    check_min_max_column_names(sst, {"a", "zz"}, {"c1", "c2"});
                }
            }
            {
                // Live row plus range tombstone from ("c1","zz") to the
                // prefix ("c1"), which encloses the row: min is the full
                // start bound, max is the prefix end bound.
                mutation m(s, key);
                m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                tombstone tomb(api::new_timestamp(), gc_clock::now());
                range_tombstone rt(
                        clustering_key_prefix::from_exploded(*s, {to_bytes("c1"), to_bytes("zz")}),
                        clustering_key_prefix::from_exploded(*s, {to_bytes("c1")}),
                        tomb);
                m.partition().apply_delete(*s, std::move(rt));
                auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                if (version >= sstable_version_types::mc) {
                    check_min_max_column_names(sst, {"c1", "zz"}, {"c1"});
                }
            }
            {
                // Live row plus range tombstone from ("c1","zz") to
                // ("c1","d"): min from the tombstone, max from the row.
                mutation m(s, key);
                m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                tombstone tomb(api::new_timestamp(), gc_clock::now());
                range_tombstone rt(
                        clustering_key_prefix::from_exploded(*s, {to_bytes("c1"), to_bytes("zz")}),
                        clustering_key_prefix::from_exploded(*s, {to_bytes("c1"), to_bytes("d")}),
                        tomb);
                m.partition().apply_delete(*s, std::move(rt));
                auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                if (version >= sstable_version_types::mc) {
                    check_min_max_column_names(sst, {"c1", "zz"}, {"c1", "c2"});
                }
            }
            if (version >= sstable_version_types::mc) {
                {
                    // Range tombstone open at the bottom: min recorded as
                    // empty, max is the tombstone's end bound.
                    mutation m(s, key);
                    m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                    tombstone tomb(api::new_timestamp(), gc_clock::now());
                    range_tombstone rt(
                            bound_view::bottom(),
                            bound_view(clustering_key_prefix::from_single_value(*s, bytes("z")), bound_kind::incl_end),
                            tomb);
                    m.partition().apply_delete(*s, std::move(rt));
                    auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                    BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                    check_min_max_column_names(sst, {}, {"z"});
                }
                {
                    // Range tombstone open at the top: max recorded as
                    // empty, min is the tombstone's start bound.
                    mutation m(s, key);
                    m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                    tombstone tomb(api::new_timestamp(), gc_clock::now());
                    range_tombstone rt(
                            bound_view(clustering_key_prefix::from_single_value(*s, bytes("a")), bound_kind::incl_start),
                            bound_view::top(),
                            tomb);
                    m.partition().apply_delete(*s, std::move(rt));
                    auto sst = make_sstable_containing(sst_gen, {std::move(m)});
                    BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                    check_min_max_column_names(sst, {"a"}, {});
                }
            }
        }
    });
}
// Reads a pre-generated sstable holding partitions pk = 0..9 and verifies
// partition-range handling in the sstable reader: full scans, bounded and
// singular ranges, exclusive bounds, and fast_forward_to() — including
// ranges expressed only in terms of tokens — all yield exactly the expected
// decorated keys.
SEASTAR_TEST_CASE(test_partition_skipping) {
    return test_env::do_with_async([] (test_env& env) {
        for (const auto version : all_sstable_versions) {
            auto s = schema_builder("ks", "test_skipping_partitions")
                    .with_column("pk", int32_type, column_kind::partition_key)
                    .with_column("v", int32_type)
                    .build();
            auto sst = env.reusable_sst(s, get_test_dir("partition_skipping",s), 1, version).get();
            // Build the ten expected keys and sort them into the order the
            // reader will emit them (decorated-key order, not pk value order).
            std::vector<dht::decorated_key> keys;
            for (int i = 0; i < 10; i++) {
                auto pk = partition_key::from_single_value(*s, int32_type->decompose(i));
                keys.emplace_back(dht::decorate_key(*s, std::move(pk)));
            }
            dht::decorated_key::less_comparator cmp(s);
            std::sort(keys.begin(), keys.end(), cmp);
            // Unrestricted read returns every partition.
            assert_that(sstable_mutation_reader(sst, s, env.make_reader_permit())).produces(keys);
            // Inclusive range [keys[0], keys[1]], then a fast-forward to an
            // open-ended range starting at keys[8].
            auto pr = dht::partition_range::make(dht::ring_position(keys[0]), dht::ring_position(keys[1]));
            assert_that(sstable_mutation_reader(sst, s, env.make_reader_permit(), pr))
                .produces(keys[0])
                .produces(keys[1])
                .produces_end_of_stream()
                .fast_forward_to(dht::partition_range::make_starting_with(dht::ring_position(keys[8])))
                .produces(keys[8])
                .produces(keys[9])
                .produces_end_of_stream();
            // Singular range followed by a series of forwards; the range with
            // an exclusive start after keys[4] must skip keys[4], and a
            // forward may be issued before the current range is exhausted
            // (after keys[7], keys[8] is never consumed).
            pr = dht::partition_range::make(dht::ring_position(keys[1]), dht::ring_position(keys[1]));
            assert_that(sstable_mutation_reader(sst, s, env.make_reader_permit(), pr))
                .produces(keys[1])
                .produces_end_of_stream()
                .fast_forward_to(dht::partition_range::make(dht::ring_position(keys[3]), dht::ring_position(keys[4])))
                .produces(keys[3])
                .produces(keys[4])
                .produces_end_of_stream()
                .fast_forward_to(dht::partition_range::make({ dht::ring_position(keys[4]), false }, dht::ring_position(keys[5])))
                .produces(keys[5])
                .produces_end_of_stream()
                .fast_forward_to(dht::partition_range::make(dht::ring_position(keys[6]), dht::ring_position(keys[6])))
                .produces(keys[6])
                .produces_end_of_stream()
                .fast_forward_to(dht::partition_range::make(dht::ring_position(keys[7]), dht::ring_position(keys[8])))
                .produces(keys[7])
                .fast_forward_to(dht::partition_range::make(dht::ring_position(keys[9]), dht::ring_position(keys[9])))
                .produces(keys[9])
                .produces_end_of_stream();
            // Range with both bounds exclusive containing no partition:
            // produces nothing, but forwarding afterwards still works.
            pr = dht::partition_range::make({ dht::ring_position(keys[0]), false }, { dht::ring_position(keys[1]), false});
            assert_that(sstable_mutation_reader(sst, s, env.make_reader_permit(), pr))
                .produces_end_of_stream()
                .fast_forward_to(dht::partition_range::make(dht::ring_position(keys[6]), dht::ring_position(keys[6])))
                .produces(keys[6])
                .produces_end_of_stream()
                .fast_forward_to(dht::partition_range::make({ dht::ring_position(keys[8]), false }, { dht::ring_position(keys[9]), false }))
                .produces_end_of_stream();
            // Forwarding to token-only bounds (ring positions not tied to a
            // specific key), issued before anything is read from the initial
            // range.
            pr = dht::partition_range::make(dht::ring_position(keys[0]), dht::ring_position(keys[1]));
            assert_that(sstable_mutation_reader(sst, s, env.make_reader_permit(), pr))
                .fast_forward_to(dht::partition_range::make(dht::ring_position::starting_at(keys[0].token()), dht::ring_position::ending_at(keys[1].token())))
                .produces(keys[0])
                .produces(keys[1])
                .fast_forward_to(dht::partition_range::make(dht::ring_position::starting_at(keys[3].token()), dht::ring_position::ending_at(keys[4].token())))
                .produces(keys[3])
                .produces(keys[4])
                .fast_forward_to(dht::partition_range::make_starting_with(dht::ring_position::starting_at(keys[8].token())))
                .produces(keys[8])
                .produces(keys[9])
                .produces_end_of_stream();
        }
    });
}
// Builds a single large partition that interleaves ~500 small range
// tombstones with rows, all under one partition-wide range tombstone, and
// writes it with a tiny promoted-index block size so the partition spans
// many index blocks. Then reads triples of disjoint singular clustering
// ranges and asserts the resulting fragment stream has monotonically
// increasing positions, i.e. index-driven tombstone skipping never replays
// or reorders fragments.
SEASTAR_TEST_CASE(test_repeated_tombstone_skipping) {
    return test_env::do_with_async([] (test_env& env) {
        for (const auto version : writable_sstable_versions) {
            simple_schema table;
            auto permit = env.make_reader_permit();
            std::vector<mutation_fragment> fragments;
            uint32_t count = 1000; // large enough to cross index block several times
            // One range tombstone covering every clustering key we generate.
            auto rt = table.make_range_tombstone(query::clustering_range::make(
                    query::clustering_range::bound(table.make_ckey(0), true),
                    query::clustering_range::bound(table.make_ckey(count - 1), true)
            ));
            fragments.push_back(mutation_fragment(*table.schema(), permit, range_tombstone(rt)));
            std::vector<range_tombstone> rts;
            // Alternate small range tombstones [seq, seq + 1) with live rows
            // at the following clustering key.
            uint32_t seq = 1;
            while (seq < count) {
                rts.push_back(table.make_range_tombstone(query::clustering_range::make(
                        query::clustering_range::bound(table.make_ckey(seq), true),
                        query::clustering_range::bound(table.make_ckey(seq + 1), false)
                )));
                fragments.emplace_back(*table.schema(), permit, range_tombstone(rts.back()));
                ++seq;
                fragments.emplace_back(*table.schema(), permit, table.make_row(permit, table.make_ckey(seq), make_random_string(1)));
                ++seq;
            }
            sstable_writer_config cfg = env.manager().configure_writer();
            cfg.promoted_index_block_size = 100; // tiny blocks => frequent promoted-index entries
            auto mut = mutation(table.schema(), table.make_pkey("key"));
            for (auto&& mf : fragments) {
                mut.apply(mf);
            }
            auto ms = make_sstable_easy(env, make_mutation_reader_from_mutations(table.schema(), std::move(permit), std::move(mut)), cfg, version)->as_mutation_source();
            // Query three singular ranges spread across the partition (a low
            // key, a midpoint, and a moving upper key) and check that the
            // stream stays position-monotonic in every combination.
            for (uint32_t i = 3; i < seq; i++) {
                auto ck1 = table.make_ckey(1);
                auto ck2 = table.make_ckey((1 + i) / 2);
                auto ck3 = table.make_ckey(i);
                testlog.info("checking {} {}", ck2, ck3);
                auto slice = partition_slice_builder(*table.schema())
                        .with_range(query::clustering_range::make_singular(ck1))
                        .with_range(query::clustering_range::make_singular(ck2))
                        .with_range(query::clustering_range::make_singular(ck3))
                        .build();
                auto rd = ms.make_mutation_reader(table.schema(), env.make_reader_permit(), query::full_partition_range, slice);
                assert_that(std::move(rd)).has_monotonic_positions();
            }
        }
    });
}
// Exercises intra-partition skipping through the promoted index: with every
// fragment indexed (promoted_index_block_size = 1), a forwarding reader
// fast-forwards within and across partitions and must produce exactly the
// rows that fall inside each requested position range.
SEASTAR_TEST_CASE(test_skipping_using_index) {
    return test_env::do_with_async([] (test_env& env) {
        for (const auto version : writable_sstable_versions) {
            simple_schema table;
            const unsigned rows_per_part = 10;
            const unsigned partition_count = 10;
            std::vector<dht::decorated_key> keys;
            for (unsigned i = 0; i < partition_count; ++i) {
                keys.push_back(table.make_pkey(i));
            }
            std::sort(keys.begin(), keys.end(), dht::decorated_key::less_comparator(table.schema()));
            utils::chunked_vector<mutation> partitions;
            // ckeys are globally unique across partitions: partition k holds rows
            // [k*rows_per_part, (k+1)*rows_per_part).
            uint32_t row_id = 0;
            for (auto&& key : keys) {
                mutation m(table.schema(), key);
                for (unsigned j = 0; j < rows_per_part; ++j) {
                    table.add_row(m, table.make_ckey(row_id++), make_random_string(1));
                }
                partitions.emplace_back(std::move(m));
            }
            std::sort(partitions.begin(), partitions.end(), mutation_decorated_key_less_comparator());
            sstable_writer_config cfg = env.manager().configure_writer();
            cfg.promoted_index_block_size = 1; // So that every fragment is indexed
            cfg.promoted_index_auto_scale_threshold = 0; // disable auto-scaling
            auto ms = make_sstable_easy(env, make_mutation_reader_from_mutations(table.schema(), env.make_reader_permit(), partitions), cfg, version)->as_mutation_source();
            auto rd = ms.make_mutation_reader(table.schema(),
                env.make_reader_permit(),
                query::full_partition_range,
                table.schema()->full_slice(),
                nullptr,
                streamed_mutation::forwarding::yes,
                mutation_reader::forwarding::yes);
            auto assertions = assert_that(std::move(rd));
            // Consume first partition completely so that index is stale
            {
                assertions
                    .produces_partition_start(keys[0])
                    .fast_forward_to(position_range::all_clustered_rows());
                for (auto i = 0u; i < rows_per_part; i++) {
                    assertions.produces_row_with_key(table.make_ckey(i));
                }
                assertions.produces_end_of_stream();
            }
            // Forward within the partition: a prefix range, a middle range, then a
            // range past the last row (which must yield nothing).
            {
                auto base = rows_per_part;
                assertions
                    .next_partition()
                    .produces_partition_start(keys[1])
                    .fast_forward_to(position_range(
                        position_in_partition::for_key(table.make_ckey(base)),
                        position_in_partition::for_key(table.make_ckey(base + 3))))
                    .produces_row_with_key(table.make_ckey(base))
                    .produces_row_with_key(table.make_ckey(base + 1))
                    .produces_row_with_key(table.make_ckey(base + 2))
                    .fast_forward_to(position_range(
                        position_in_partition::for_key(table.make_ckey(base + 5)),
                        position_in_partition::for_key(table.make_ckey(base + 6))))
                    .produces_row_with_key(table.make_ckey(base + 5))
                    .produces_end_of_stream()
                    .fast_forward_to(position_range(
                        position_in_partition::for_key(table.make_ckey(base + rows_per_part)), // Skip all rows in current partition
                        position_in_partition::after_all_clustered_rows()))
                    .produces_end_of_stream();
            }
            // Consume few fragments then skip
            {
                auto base = rows_per_part * 2;
                assertions
                    .next_partition()
                    .produces_partition_start(keys[2])
                    .fast_forward_to(position_range(
                        position_in_partition::for_key(table.make_ckey(base)),
                        position_in_partition::for_key(table.make_ckey(base + 3))))
                    .produces_row_with_key(table.make_ckey(base))
                    .produces_row_with_key(table.make_ckey(base + 1))
                    .produces_row_with_key(table.make_ckey(base + 2))
                    .fast_forward_to(position_range(
                        position_in_partition::for_key(table.make_ckey(base + rows_per_part - 1)), // last row
                        position_in_partition::after_all_clustered_rows()))
                    .produces_row_with_key(table.make_ckey(base + rows_per_part - 1))
                    .produces_end_of_stream();
            }
            // Consume nothing from the next partition
            {
                assertions
                    .next_partition()
                    .produces_partition_start(keys[3])
                    .next_partition();
            }
            // Jump straight to the last row of a later partition.
            {
                auto base = rows_per_part * 4;
                assertions
                    .next_partition()
                    .produces_partition_start(keys[4])
                    .fast_forward_to(position_range(
                        position_in_partition::for_key(table.make_ckey(base + rows_per_part - 1)), // last row
                        position_in_partition::after_all_clustered_rows()))
                    .produces_row_with_key(table.make_ckey(base + rows_per_part - 1))
                    .produces_end_of_stream();
            }
        }
    });
}
// An unknown component file must follow the sstable through create_links()
// and must be removed together with the sstable by atomic deletion.
SEASTAR_TEST_CASE(test_unknown_component) {
    return test_env::do_with_async([] (test_env& env) {
        const auto dir = std::string(env.tempdir().path().string());
        copy_directory("test/resource/sstables/unknown_component", dir + "/unknown_component");
        auto sstp = env.reusable_sst(uncompressed_schema(), dir + "/unknown_component").get();
        test::create_links(*sstp, dir).get();
        // create_links() must have carried the unknown component to the new dir
        BOOST_REQUIRE(file_exists(dir + "/la-1-big-UNKNOWN.txt").get());
        sstp = env.reusable_sst(uncompressed_schema(), generation_type{1}).get();
        env.manager().delete_atomically({sstp}).get();
        // atomic deletion must also have removed the unknown component
        BOOST_REQUIRE(!file_exists(dir + "/la-1-big-UNKNOWN.txt").get());
    });
}
// Verifies sstable_set::incremental_selector: selecting each key must return
// exactly the sstables whose [first, last] key range contains it. The second
// scope additionally inserts an L0 sstable spanning most of the key range,
// which must show up for every key alongside the per-level matches.
// (Fix: removed an unused local `map` that was declared in the second scope
// but never read or written.)
SEASTAR_TEST_CASE(sstable_set_incremental_selector) {
    return test_env::do_with_async([] (test_env& env) {
        auto s = schema_builder(some_keyspace, some_column_family).with_column("p1", utf8_type, column_kind::partition_key).build();
        auto cs = compaction::make_compaction_strategy(compaction::compaction_strategy_type::leveled, s->compaction_strategy_options());
        const auto decorated_keys = tests::generate_partition_keys(8, s);
        // Creates an sstable spanning [decorated_keys[k0], decorated_keys[k1]]
        // at the given level and inserts it into the set.
        auto new_sstable = [&] (sstable_set& set, size_t k0, size_t k1, uint32_t level) {
            auto key0 = decorated_keys[k0];
            auto tok0 = key0.token();
            auto key1 = decorated_keys[k1];
            auto tok1 = key1.token();
            testlog.debug("creating sstable with k[{}] token={} k[{}] token={} level={}", k0, tok0, k1, tok1, level);
            auto sst = sstable_for_overlapping_test(env, s, key0.key(), key1.key(), level);
            set.insert(sst);
            return sst;
        };
        // Asserts that selecting decorated_keys[k] yields exactly expected_ssts.
        auto check = [&] (sstable_set::incremental_selector& selector, size_t k, std::unordered_set<shared_sstable> expected_ssts) {
            const dht::decorated_key& key = decorated_keys[k];
            auto sstables = selector.select(key).sstables;
            testlog.debug("checking sstables for key[{}] token={} found={} expected={}", k, decorated_keys[k].token(), sstables.size(), expected_ssts.size());
            BOOST_REQUIRE_EQUAL(sstables.size(), expected_ssts.size());
            for (auto& sst : sstables) {
                BOOST_REQUIRE(expected_ssts.contains(sst));
                expected_ssts.erase(sst);
            }
            BOOST_REQUIRE(expected_ssts.empty());
        };
        // Leveled sstables only: selection follows the interval map exactly.
        {
            sstable_set set = env.make_sstable_set(cs, s);
            std::vector<shared_sstable> ssts;
            ssts.push_back(new_sstable(set, 0, 1, 1));
            ssts.push_back(new_sstable(set, 0, 1, 1));
            ssts.push_back(new_sstable(set, 3, 4, 1));
            ssts.push_back(new_sstable(set, 4, 4, 1));
            ssts.push_back(new_sstable(set, 4, 5, 1));
            sstable_set::incremental_selector sel = set.make_incremental_selector();
            check(sel, 0, std::unordered_set<shared_sstable>{ssts[0], ssts[1]});
            check(sel, 1, std::unordered_set<shared_sstable>{ssts[0], ssts[1]});
            check(sel, 2, std::unordered_set<shared_sstable>{});
            check(sel, 3, std::unordered_set<shared_sstable>{ssts[2]});
            check(sel, 4, std::unordered_set<shared_sstable>{ssts[2], ssts[3], ssts[4]});
            check(sel, 5, std::unordered_set<shared_sstable>{ssts[4]});
            check(sel, 6, std::unordered_set<shared_sstable>{});
            check(sel, 7, std::unordered_set<shared_sstable>{});
        }
        // Same layout plus an L0 sstable that spans keys 0..7.
        {
            sstable_set set = env.make_sstable_set(cs, s);
            std::vector<shared_sstable> ssts;
            ssts.push_back(new_sstable(set, 0, 7, 0)); // simulates L0 sstable spanning most of the range.
            ssts.push_back(new_sstable(set, 0, 1, 1));
            ssts.push_back(new_sstable(set, 0, 1, 1));
            ssts.push_back(new_sstable(set, 3, 4, 1));
            ssts.push_back(new_sstable(set, 4, 4, 1));
            ssts.push_back(new_sstable(set, 4, 5, 1));
            sstable_set::incremental_selector sel = set.make_incremental_selector();
            check(sel, 0, std::unordered_set<shared_sstable>{ssts[0], ssts[1], ssts[2]});
            check(sel, 1, std::unordered_set<shared_sstable>{ssts[0], ssts[1], ssts[2]});
            check(sel, 2, std::unordered_set<shared_sstable>{ssts[0]});
            check(sel, 3, std::unordered_set<shared_sstable>{ssts[0], ssts[3]});
            check(sel, 4, std::unordered_set<shared_sstable>{ssts[0], ssts[3], ssts[4], ssts[5]});
            check(sel, 5, std::unordered_set<shared_sstable>{ssts[0], ssts[5]});
            check(sel, 6, std::unordered_set<shared_sstable>{ssts[0]});
            check(sel, 7, std::unordered_set<shared_sstable>{ssts[0]});
        }
    });
}
// sstable_set::erase() must be a no-op for elements that are not members of
// the set, and must not touch state of already-destroyed sstables (#4572).
SEASTAR_TEST_CASE(sstable_set_erase) {
    return test_env::do_with_async([] (test_env& env) {
        auto s = schema_builder(some_keyspace, some_column_family).with_column("p1", utf8_type, column_kind::partition_key).build();
        const auto key = tests::generate_partition_key(s).key();
        // Erasing sstables that were never inserted must leave a leveled set intact.
        {
            auto strategy = compaction::make_compaction_strategy(compaction::compaction_strategy_type::leveled, s->compaction_strategy_options());
            sstable_set set = env.make_sstable_set(strategy, s);
            auto member = sstable_for_overlapping_test(env, s, key, key, 0);
            set.insert(member);
            assert_sstable_set_size(set, 1);
            auto stranger_unleveled = sstable_for_overlapping_test(env, s, key, key, 0);
            auto stranger_leveled = sstable_for_overlapping_test(env, s, key, key, 1);
            set.erase(stranger_unleveled);
            set.erase(stranger_leveled);
            assert_sstable_set_size(set, 1);
            BOOST_REQUIRE(set.all()->contains(member));
        }
        // Regression test for #4572: operating on an interval must not rely on
        // info of an sstable object that has since been destroyed.
        {
            auto strategy = compaction::make_compaction_strategy(compaction::compaction_strategy_type::leveled, s->compaction_strategy_options());
            sstable_set set = env.make_sstable_set(strategy, s);
            {
                // this sstable object dies here, while the set still references its interval
                auto short_lived = sstable_for_overlapping_test(env, s, key, key, 1);
                set.insert(short_lived);
                assert_sstable_set_size(set, 1);
            }
            auto survivor = sstable_for_overlapping_test(env, s, key, key, 1);
            set.insert(survivor);
            assert_sstable_set_size(set, 2);
            set.erase(survivor);
        }
        // Same non-member erase check for a size-tiered set.
        {
            auto strategy = compaction::make_compaction_strategy(compaction::compaction_strategy_type::size_tiered, s->compaction_strategy_options());
            sstable_set set = env.make_sstable_set(strategy, s);
            auto member = sstable_for_overlapping_test(env, s, key, key, 0);
            set.insert(member);
            assert_sstable_set_size(set, 1);
            auto stranger = sstable_for_overlapping_test(env, s, key, key, 0);
            set.erase(stranger);
            assert_sstable_set_size(set, 1);
            BOOST_REQUIRE(set.all()->contains(member));
        }
    });
}
// Writes 2*TOMBSTONE_HISTOGRAM_BIN_SIZE partition tombstones with distinct
// deletion times and checks that the estimated_tombstone_drop_time histogram
// is capped at the bin limit and survives a round trip through the
// statistics component on reload.
SEASTAR_TEST_CASE(sstable_tombstone_histogram_test) {
    return test_env::do_with_async([] (test_env& env) {
        for (auto version : writable_sstable_versions) {
            auto builder = schema_builder("tests", "tombstone_histogram_test")
                .with_column("id", utf8_type, column_kind::partition_key)
                .with_column("value", int32_type);
            auto s = builder.build();
            // monotonically increasing write timestamps
            auto next_timestamp = [] {
                static thread_local api::timestamp_type next = 1;
                return next++;
            };
            // partition tombstone with the next timestamp and current deletion time
            auto make_delete = [&](partition_key key) {
                mutation m(s, key);
                tombstone tomb(next_timestamp(), gc_clock::now());
                m.partition().apply(tomb);
                return m;
            };
            utils::chunked_vector<mutation> mutations;
            for (auto i = 0; i < sstables::TOMBSTONE_HISTOGRAM_BIN_SIZE * 2; i++) {
                auto key = partition_key::from_exploded(*s, {to_bytes("key" + to_sstring(i))});
                mutations.push_back(make_delete(key));
                // advance the clock so each tombstone lands in a different second
                forward_jump_clocks(std::chrono::seconds(1));
            }
            auto sst = make_sstable_containing(env.make_sstable(s, version), mutations);
            auto histogram = sst->get_stats_metadata().estimated_tombstone_drop_time;
            sst = env.reusable_sst(sst).get();
            auto histogram2 = sst->get_stats_metadata().estimated_tombstone_drop_time;
            // check that histogram respected limit
            BOOST_REQUIRE(histogram.bin.size() == TOMBSTONE_HISTOGRAM_BIN_SIZE);
            // check that load procedure will properly load histogram from statistics component
            BOOST_REQUIRE(histogram.bin == histogram2.bin);
        }
    });
}
// Loads a fixture sstable whose on-disk tombstone histogram is invalid and
// verifies that the histogram is discarded on load (empty bins) while its
// configured capacity is still the standard bin limit.
SEASTAR_TEST_CASE(sstable_bad_tombstone_histogram_test) {
    return test_env::do_with_async([] (test_env& env) {
        auto schema = schema_builder("tests", "tombstone_histogram_test")
            .with_column("id", utf8_type, column_kind::partition_key)
            .with_column("value", int32_type)
            .build();
        auto sst = env.reusable_sst(schema, "test/resource/sstables/bad_tombstone_histogram").get();
        const auto& histogram = sst->get_stats_metadata().estimated_tombstone_drop_time;
        BOOST_REQUIRE(histogram.max_bin_size == sstables::TOMBSTONE_HISTOGRAM_BIN_SIZE);
        // a histogram that fails validation on load must be dropped entirely
        BOOST_REQUIRE(histogram.bin.empty());
    });
}
// Checks sstable::get_shards_for_this_sstable(): partitions owned by a chosen
// set of shards (for a sharder with the given shard count and msb-ignore
// bits) are written into one sstable, which is then reloaded under that
// sharder; every expected shard must be reported as an owner.
SEASTAR_TEST_CASE(sstable_owner_shards) {
    return test_env::do_with_async([] (test_env& env) {
        auto builder = schema_builder("tests", "test")
            .with_column("id", utf8_type, column_kind::partition_key)
            .with_column("value", int32_type);
        auto s = builder.build();
        auto make_insert = [&] (const dht::decorated_key& key) {
            mutation m(s, key);
            m.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), 1);
            return m;
        };
        // Writes one partition per requested shard (keys generated under the
        // target sharder), using a single-shard sharder for the write so all
        // partitions land in the same sstable, then reloads the sstable under
        // the target sharder.
        auto make_shared_sstable = [&] (std::unordered_set<unsigned> shards, unsigned ignore_msb, unsigned smp_count) {
            auto key_schema = schema_builder(s).with_sharder(smp_count, ignore_msb).build();
            auto mut = [&] (auto shard) {
                return make_insert(tests::generate_partition_key(key_schema, shard));
            };
            auto muts = shards
                | std::views::transform([&] (auto shard) { return mut(shard); })
                | std::ranges::to<utils::chunked_vector<mutation>>();
            auto sst_gen = [&] () mutable {
                // single-shard sharder: the write is not split across shards
                auto schema = schema_builder(s).with_sharder(1, ignore_msb).build();
                auto sst = env.make_sstable(std::move(schema));
                return sst;
            };
            auto sst = make_sstable_containing(sst_gen, std::move(muts));
            auto schema = schema_builder(s).with_sharder(smp_count, ignore_msb).build();
            sst = env.reusable_sst(std::move(schema), sst).get();
            return sst;
        };
        auto assert_sstable_owners = [&] (std::unordered_set<unsigned> expected_owners, unsigned ignore_msb, unsigned smp_count) {
            SCYLLA_ASSERT(expected_owners.size() <= smp_count);
            auto sst = make_shared_sstable(expected_owners, ignore_msb, smp_count);
            auto owners = sst->get_shards_for_this_sstable() | std::ranges::to<std::unordered_set<unsigned>>();
            // every expected shard must be reported; additional owners are tolerated
            BOOST_REQUIRE(std::ranges::all_of(expected_owners, [&] (unsigned expected_owner) {
                return owners.contains(expected_owner);
            }));
        };
        assert_sstable_owners({ 0 }, 0, 1);
        // NOTE(review): duplicate of the previous line — presumably an intentional repeat; confirm
        assert_sstable_owners({ 0 }, 0, 1);
        assert_sstable_owners({ 0 }, 0, 4);
        assert_sstable_owners({ 0, 1 }, 0, 4);
        assert_sstable_owners({ 0, 2 }, 0, 4);
        assert_sstable_owners({ 0, 1, 2, 3 }, 0, 4);
        assert_sstable_owners({ 0 }, 12, 4);
        assert_sstable_owners({ 0, 1 }, 12, 4);
        assert_sstable_owners({ 0, 2 }, 12, 4);
        assert_sstable_owners({ 0, 1, 2, 3 }, 12, 4);
        assert_sstable_owners({ 10 }, 0, 63);
        assert_sstable_owners({ 10 }, 12, 63);
        assert_sstable_owners({ 10, 15 }, 0, 63);
        assert_sstable_owners({ 10, 15 }, 12, 63);
        assert_sstable_owners({ 0, 10, 15, 20, 30, 40, 50 }, 0, 63);
        assert_sstable_owners({ 0, 10, 15, 20, 30, 40, 50 }, 12, 63);
    });
}
// Writes min_index_interval*1.5 small partitions and checks that the summary
// ends up with a single entry spanning all of them, while full-range and
// single-partition reads still return every written key.
SEASTAR_TEST_CASE(test_summary_entry_spanning_more_keys_than_min_interval) {
    return test_env::do_with_async([] (test_env& env) {
        auto s = schema_builder(some_keyspace, some_column_family)
            .with_column("p1", int32_type, column_kind::partition_key)
            .with_column("c1", utf8_type, column_kind::clustering_key)
            .with_column("r1", int32_type)
            .build();
        const column_definition& r1_col = *s->get_column_definition("r1");
        utils::chunked_vector<mutation> mutations;
        auto keys_written = 0;
        for (auto i = 0; i < s->min_index_interval()*1.5; i++) {
            auto key = partition_key::from_exploded(*s, {int32_type->decompose(i)});
            auto c_key = clustering_key::from_exploded(*s, {to_bytes("abc")});
            mutation m(s, key);
            m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
            mutations.push_back(std::move(m));
            keys_written++;
        }
        auto version = sstable_version_types::me;
        auto sst = make_sstable_containing(env.make_sstable(s, version), mutations);
        const summary& sum = sst->get_summary();
        // all partitions fit under one summary entry
        BOOST_REQUIRE(sum.entries.size() == 1);
        // read back in token order and make sure nothing was lost
        std::set<mutation, mutation_decorated_key_less_comparator> merged;
        merged.insert(mutations.begin(), mutations.end());
        auto rd = assert_that(sst->as_mutation_source().make_mutation_reader(s, env.make_reader_permit(), query::full_partition_range));
        auto keys_read = 0;
        for (auto&& m : merged) {
            keys_read++;
            rd.produces(m);
        }
        rd.produces_end_of_stream();
        BOOST_REQUIRE(keys_read == keys_written);
        // a singular range on the last key must still be answerable despite the
        // coarse summary
        auto r = dht::partition_range::make({mutations.back().decorated_key(), true}, {mutations.back().decorated_key(), true});
        assert_that(sst->as_mutation_source().make_mutation_reader(s, env.make_reader_permit(), r))
            .produces(slice(mutations, r))
            .produces_end_of_stream();
    });
}
SEASTAR_TEST_CASE(test_wrong_counter_shard_order) {
    // CREATE TABLE IF NOT EXISTS scylla_bench.test_counters (
    //     pk bigint,
    //     ck bigint,
    //     c1 counter,
    //     c2 counter,
    //     c3 counter,
    //     c4 counter,
    //     c5 counter,
    //     PRIMARY KEY(pk, ck)
    // ) WITH compression = { }
    //
    // Populated with:
    //     scylla-bench -mode counter_update -workload uniform -duration 15s
    //          -replication-factor 3 -partition-count 2 -clustering-row-count 4
    // on a three-node Scylla 1.7.4 cluster.
    //
    // The fixture sstable's name implies its counter shards are stored out of
    // order; the test requires that, when read back, every counter cell's
    // shards come out sorted by id and that totals match the expected values.
    return test_env::do_with_async([] (test_env& env) {
        for (const auto version : all_sstable_versions) {
            auto s = schema_builder("scylla_bench", "test_counters")
                .with_column("pk", long_type, column_kind::partition_key)
                .with_column("ck", long_type, column_kind::clustering_key)
                .with_column("c1", counter_type)
                .with_column("c2", counter_type)
                .with_column("c3", counter_type)
                .with_column("c4", counter_type)
                .with_column("c5", counter_type)
                .build();
            auto sst = env.reusable_sst(s, get_test_dir("wrong_counter_shard_order", s), 2, version).get();
            auto reader = sstable_mutation_reader(sst, s, env.make_reader_permit());
            auto close_reader = deferred_close(reader);
            // Checks that the next fragment is a clustering row with 5 counter
            // cells, each with id-sorted shards summing to expected_value.
            auto verify_row = [&s] (mutation_fragment_v2_opt mfopt, int64_t expected_value) {
                BOOST_REQUIRE(bool(mfopt));
                auto& mf = *mfopt;
                BOOST_REQUIRE(mf.is_clustering_row());
                auto& row = mf.as_clustering_row();
                size_t n = 0;
                row.cells().for_each_cell([&] (column_id id, const atomic_cell_or_collection& ac_o_c) {
                    auto acv = ac_o_c.as_atomic_cell(s->regular_column_at(id));
                    counter_cell_view ccv(acv);
                    counter_shard_view::less_compare_by_id cmp;
                    BOOST_REQUIRE_MESSAGE(std::ranges::is_sorted(ccv.shards(), cmp),
                                          fmt::format("{} is expected to be sorted", ccv));
                    BOOST_REQUIRE_EQUAL(ccv.total_value(), expected_value);
                    n++;
                });
                // all five counter columns must be present
                BOOST_REQUIRE_EQUAL(n, 5);
            };
            // first partition: 4 clustering rows with known counter totals
            {
                auto mfopt = reader().get();
                BOOST_REQUIRE(mfopt);
                BOOST_REQUIRE(mfopt->is_partition_start());
                verify_row(reader().get(), 28545);
                verify_row(reader().get(), 27967);
                verify_row(reader().get(), 28342);
                verify_row(reader().get(), 28325);
                mfopt = reader().get();
                BOOST_REQUIRE(mfopt);
                BOOST_REQUIRE(mfopt->is_end_of_partition());
            }
            // second partition: another 4 rows
            {
                auto mfopt = reader().get();
                BOOST_REQUIRE(mfopt);
                BOOST_REQUIRE(mfopt->is_partition_start());
                verify_row(reader().get(), 28386);
                verify_row(reader().get(), 28378);
                verify_row(reader().get(), 28129);
                verify_row(reader().get(), 28260);
                mfopt = reader().get();
                BOOST_REQUIRE(mfopt);
                BOOST_REQUIRE(mfopt->is_end_of_partition());
            }
            // nothing after the second partition
            BOOST_REQUIRE(!reader().get());
        }
    });
}
// Wraps sstable::make_index_reader() so tests can hand the type-erased
// abstract_index_reader to the index assertion helpers.
static std::unique_ptr<abstract_index_reader> get_index_reader(shared_sstable sst, reader_permit permit) {
    return sst->make_index_reader(std::move(permit));
}
SEASTAR_TEST_CASE(test_broken_promoted_index_is_skipped) {
    // create table ks.test (pk int, ck int, v int, primary key(pk, ck)) with compact storage;
    //
    // Populated with:
    //
    // insert into ks.test (pk, ck, v) values (1, 1, 1);
    // insert into ks.test (pk, ck, v) values (1, 2, 1);
    // insert into ks.test (pk, ck, v) values (1, 3, 1);
    // delete from ks.test where pk = 1 and ck = 2;
    //
    // Loading this fixture (whose promoted index is broken) is expected to
    // fail with malformed_sstable_exception, after which the index reader
    // must appear empty.
    return test_env::do_with_async([] (test_env& env) {
        for (const auto version : all_sstable_versions) {
            if (!has_summary_and_index(version)) {
                // This is an old test for some workaround for
                // incorrectly-generated promoted indexes.
                // It doesn't make sense to port this test to
                // newer sstable formats.
                continue;
            }
            auto s = schema_builder("ks", "test")
                .with_column("pk", int32_type, column_kind::partition_key)
                .with_column("ck", int32_type, column_kind::clustering_key)
                .with_column("v", int32_type)
                .build(schema_builder::compact_storage::yes);
            auto sst = env.make_sstable(s, get_test_dir("broken_non_compound_pi_and_range_tombstone", s), sstables::generation_type(1), version);
            try {
                sst->load(sst->get_schema()->get_sharder()).get();
            } catch (...) {
                // re-raise the in-flight exception so BOOST_REQUIRE_EXCEPTION can
                // check its type and message
                BOOST_REQUIRE_EXCEPTION(current_exception_as_future().get(), sstables::malformed_sstable_exception, exception_predicate::message_contains(
                        "Failed to read partition from SSTable "));
            }
            {
                assert_that(get_index_reader(sst, env.make_reader_permit())).is_empty(*s);
            }
        }
    });
}
SEASTAR_TEST_CASE(test_old_format_non_compound_range_tombstone_is_read) {
    // create table ks.test (pk int, ck int, v int, primary key(pk, ck)) with compact storage;
    //
    // Populated with:
    //
    // insert into ks.test (pk, ck, v) values (1, 1, 1);
    // insert into ks.test (pk, ck, v) values (1, 2, 1);
    // insert into ks.test (pk, ck, v) values (1, 3, 1);
    // delete from ks.test where pk = 1 and ck = 2;
    //
    // A non-compound range tombstone written by the old ("la"/"ka") formats
    // must be read back as a row deletion for ck=2 together with the live cell
    // that was written before it.
    return test_env::do_with_async([] (test_env& env) {
        for (const auto version : all_sstable_versions) {
            if (version < sstable_version_types::mc) { // Applies only to formats older than 'm'
                auto s = schema_builder("ks", "test")
                    .with_column("pk", int32_type, column_kind::partition_key)
                    .with_column("ck", int32_type, column_kind::clustering_key)
                    .with_column("v", int32_type)
                    .build(schema_builder::compact_storage::yes);
                auto sst = env.reusable_sst(s, get_test_dir("broken_non_compound_pi_and_range_tombstone", s), 1, version).get();
                auto pk = partition_key::from_exploded(*s, { int32_type->decompose(1) });
                auto dk = dht::decorate_key(*s, pk);
                auto ck = clustering_key::from_exploded(*s, {int32_type->decompose(2)});
                // expected content: the insert at ck=2 plus the later row delete
                // (timestamps match what the fixture was written with)
                mutation m(s, dk);
                m.set_clustered_cell(ck, *s->get_column_definition("v"), atomic_cell::make_live(*int32_type, 1511270919978349, int32_type->decompose(1), { }));
                m.partition().apply_delete(*s, ck, {1511270943827278, gc_clock::from_time_t(1511270943)});
                {
                    auto slice = partition_slice_builder(*s).with_range(query::clustering_range::make_singular({ck})).build();
                    assert_that(sst->as_mutation_source().make_mutation_reader(s, env.make_reader_permit(), dht::partition_range::make_singular(dk), slice))
                        .produces(m)
                        .produces_end_of_stream();
                }
            }
        }
    });
}
// Builds an sstable large enough for a multi-entry summary, deletes its
// Summary component on disk, reloads the sstable, and checks that the rebuilt
// summary matches the original field-for-field.
SEASTAR_TEST_CASE(summary_rebuild_sanity) {
    return test_env::do_with_async([] (test_env& env) {
        auto builder = schema_builder("tests", "test")
            .with_column("id", utf8_type, column_kind::partition_key)
            .with_column("value", utf8_type);
        builder.set_compressor_params(compression_parameters::no_compression());
        auto s = builder.build(schema_builder::compact_storage::no);
        const column_definition& col = *s->get_column_definition("value");
        // 1 KiB cell per partition so the index grows enough for several summary entries
        auto make_insert = [&] (partition_key key) {
            mutation m(s, key);
            m.set_clustered_cell(clustering_key::make_empty(), col, make_atomic_cell(utf8_type, bytes(1024, 'a')));
            return m;
        };
        utils::chunked_vector<mutation> mutations;
        for (auto i = 0; i < s->min_index_interval()*2; i++) {
            auto key = to_bytes("key" + to_sstring(i));
            mutations.push_back(make_insert(partition_key::from_exploded(*s, {std::move(key)})));
        }
        auto version = sstable_version_types::me;
        auto sst = make_sstable_containing(env.make_sstable(s, version), mutations);
        // steal the in-memory summary before removing the on-disk component
        summary s1 = std::move(sstables::test(sst)._summary());
        BOOST_REQUIRE(!(bool)sstables::test(sst)._summary()); // make sure std::move above took place
        BOOST_REQUIRE(s1.entries.size() > 1);
        sstables::test(sst).remove_component(component_type::Summary).get();
        sst = env.reusable_sst(sst).get();
        // reload must have rebuilt the summary from the index
        const summary& s2 = sst->get_summary();
        BOOST_REQUIRE(::memcmp(&s1.header, &s2.header, sizeof(summary::header)) == 0);
        BOOST_REQUIRE(s1.positions == s2.positions);
        BOOST_REQUIRE(s1.entries == s2.entries);
        BOOST_REQUIRE(s1.first_key.value == s2.first_key.value);
        BOOST_REQUIRE(s1.last_key.value == s2.last_key.value);
    });
}
// The estimated key count derived from the summary must stay within one
// sampling interval of the real partition count, for both large and small
// partitions.
SEASTAR_TEST_CASE(sstable_partition_estimation_sanity_test) {
    return test_env::do_with_async([] (test_env& env) {
        auto builder = schema_builder("tests", "test")
            .with_column("id", utf8_type, column_kind::partition_key)
            .with_column("value", utf8_type);
        builder.set_compressor_params(compression_parameters::no_compression());
        auto s = builder.build(schema_builder::compact_storage::no);
        const column_definition& col = *s->get_column_definition("value");
        auto summary_byte_cost = sstables::index_sampling_state::default_summary_byte_cost;
        // Builds a partition whose single cell carries a payload of the given size.
        auto make_partition = [&] (partition_key key, size_t payload_size) {
            mutation m(s, key);
            m.set_clustered_cell(clustering_key::make_empty(), col, make_atomic_cell(utf8_type, bytes(payload_size, 'a')));
            return m;
        };
        // Writes min_index_interval*2 partitions of the given payload size and
        // checks the estimate against the true count.
        auto check_estimation = [&] (size_t payload_size) {
            auto total_partitions = s->min_index_interval()*2;
            utils::chunked_vector<mutation> mutations;
            for (auto i = 0; i < total_partitions; i++) {
                auto key = to_bytes("key" + to_sstring(i));
                mutations.push_back(make_partition(partition_key::from_exploded(*s, {std::move(key)}), payload_size));
            }
            auto sst = make_sstable_containing(env.make_sstable(s), mutations);
            BOOST_REQUIRE(std::abs(int64_t(total_partitions) - int64_t(sst->get_estimated_key_count())) <= s->min_index_interval());
        };
        check_estimation(20 * summary_byte_cost); // large partitions
        check_estimation(100);                    // small partitions
    });
}
// The min/max timestamps in the stats metadata must be tracked correctly
// even when some write timestamps are negative.
SEASTAR_TEST_CASE(sstable_timestamp_metadata_correcness_with_negative) {
    BOOST_REQUIRE(smp::count == 1);
    return test_env::do_with_async([] (test_env& env) {
        for (auto version : writable_sstable_versions) {
            auto s = schema_builder("tests", "ts_correcness_test")
                .with_column("id", utf8_type, column_kind::partition_key)
                .with_column("value", int32_type).build();
            // Writes a single int cell under the given key at timestamp ts.
            auto insert_at = [&](partition_key key, api::timestamp_type ts) {
                mutation m(s, key);
                m.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), ts);
                return m;
            };
            auto negative_ts_mut = insert_at(partition_key::from_exploded(*s, {to_bytes("alpha")}), -50);
            auto positive_ts_mut = insert_at(partition_key::from_exploded(*s, {to_bytes("beta")}), 5);
            auto sst = make_sstable_containing(env.make_sstable(s, version), {negative_ts_mut, positive_ts_mut});
            BOOST_REQUIRE(sst->get_stats_metadata().min_timestamp == -50);
            BOOST_REQUIRE(sst->get_stats_metadata().max_timestamp == 5);
        }
    });
}
// A run identifier passed through the writer config must round-trip into the
// written sstable.
SEASTAR_TEST_CASE(sstable_run_identifier_correctness) {
    BOOST_REQUIRE(smp::count == 1);
    return test_env::do_with_async([] (test_env& env) {
        auto s = schema_builder("tests", "ts_correcness_test")
            .with_column("id", utf8_type, column_kind::partition_key)
            .with_column("value", int32_type).build();
        mutation m(s, partition_key::from_exploded(*s, {to_bytes("alpha")}));
        m.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), 0);
        // stamp the write with an explicit, randomly generated run id
        sstable_writer_config cfg = env.manager().configure_writer();
        cfg.run_identifier = sstables::run_id::create_random_id();
        auto sst = make_sstable_easy(env, make_mutation_reader_from_mutations(s, env.make_reader_permit(), std::move(m)), cfg);
        BOOST_REQUIRE(sst->run_identifier() == cfg.run_identifier);
    });
}
// sstable_run must only accept sstables whose partition-key ranges are
// disjoint from every member already in the run.
SEASTAR_TEST_CASE(sstable_run_disjoint_invariant_test) {
    return test_env::do_with_async([] (test_env& env) {
        simple_schema ss;
        auto s = ss.schema();
        const auto keys = tests::generate_partition_keys(6, s);
        sstables::sstable_run run;
        // Offers an sstable spanning [keys[first], keys[last]] to the run and
        // reports whether it was accepted.
        auto try_insert = [&] (int first, int last) {
            auto candidate = sstable_for_overlapping_test(env, s, keys[first].key(), keys[last].key());
            return run.insert(candidate);
        };
        // seed the run with the disjoint ranges [0, 0], [1, 1], [3, 4]
        BOOST_REQUIRE(try_insert(0, 0) == true);
        BOOST_REQUIRE(try_insert(1, 1) == true);
        BOOST_REQUIRE(try_insert(3, 4) == true);
        BOOST_REQUIRE(run.all().size() == 3);
        // overlapping candidates must be rejected
        BOOST_REQUIRE(try_insert(0, 4) == false);
        BOOST_REQUIRE(try_insert(4, 5) == false);
        BOOST_REQUIRE(run.all().size() == 3);
        // non-overlapping candidates must be accepted
        BOOST_REQUIRE(try_insert(2, 2) == true);
        BOOST_REQUIRE(try_insert(5, 5) == true);
        BOOST_REQUIRE(run.all().size() == 5);
    });
}
// Same disjointness invariant as above, but within a single partition: when
// all sstables share one partition key, sstable_run must only accept members
// whose clustering ranges [min_position, max_position] do not overlap.
SEASTAR_TEST_CASE(sstable_run_clustering_disjoint_invariant_test) {
    return test_env::do_with_async([] (test_env& env) {
        simple_schema ss;
        auto s = ss.schema();
        auto pks = ss.make_pkeys(1);
        // Builds a one-partition sstable whose rows span clustering keys
        // [first_ckey_idx, last_ckey_idx], and sanity-checks its min/max positions.
        auto make_sstable = [&] (int first_ckey_idx, int last_ckey_idx) {
            utils::chunked_vector<mutation> muts;
            auto mut = mutation(s, pks[0]);
            auto first_ckey_prefix = ss.make_ckey(first_ckey_idx);
            mut.partition().apply_insert(*s, first_ckey_prefix, ss.new_timestamp());
            auto last_ckey_prefix = ss.make_ckey(last_ckey_idx);
            if (first_ckey_prefix != last_ckey_prefix) {
                mut.partition().apply_insert(*s, last_ckey_prefix, ss.new_timestamp());
            }
            muts.push_back(std::move(mut));
            auto sst = make_sstable_containing(env.make_sstable(s), std::move(muts));
            BOOST_REQUIRE(sst->min_position().key() == first_ckey_prefix);
            BOOST_REQUIRE(sst->max_position().key() == last_ckey_prefix);
            testlog.info("sstable: {} {} -> {} {}", first_ckey_idx, last_ckey_idx, sst->first_partition_first_position(), sst->last_partition_last_position());
            return sst;
        };
        sstables::sstable_run run;
        auto insert = [&] (int first_ckey_idx, int last_ckey_idx) {
            auto sst = make_sstable(first_ckey_idx, last_ckey_idx);
            return run.insert(sst);
        };
        // insert sstables with disjoint clustering ranges [0, 1], [4, 5], [6, 7]
        BOOST_REQUIRE(insert(0, 1) == true);
        BOOST_REQUIRE(insert(4, 5) == true);
        BOOST_REQUIRE(insert(6, 7) == true);
        BOOST_REQUIRE(run.all().size() == 3);
        // check overlapping candidates won't be inserted
        BOOST_REQUIRE(insert(0, 4) == false);
        BOOST_REQUIRE(insert(1, 3) == false);
        BOOST_REQUIRE(insert(5, 6) == false);
        BOOST_REQUIRE(insert(7, 8) == false);
        BOOST_REQUIRE(run.all().size() == 3);
        // check non-overlapping candidates will be inserted
        BOOST_REQUIRE(insert(2, 3) == true);
        BOOST_REQUIRE(insert(8, 9) == true);
        BOOST_REQUIRE(run.all().size() == 5);
    });
}
// Reads a Cassandra-written static compact-storage sstable and checks it
// decodes to the expected mutation.
SEASTAR_TEST_CASE(test_reads_cassandra_static_compact) {
    return test_env::do_with_async([] (test_env& env) {
        // CREATE COLUMNFAMILY cf (key varchar PRIMARY KEY, c2 text, c1 text) WITH COMPACT STORAGE ;
        auto s = schema_builder("ks", "cf")
            .with_column("pk", utf8_type, column_kind::partition_key)
            .with_column("c1", utf8_type)
            .with_column("c2", utf8_type)
            .build(schema_builder::compact_storage::yes);
        // INSERT INTO ks.cf (key, c1, c2) VALUES ('a', 'abc', 'cde');
        auto sst = env.reusable_sst(s, get_test_dir("cassandra_static_compact", s), 1, sstables::sstable::version_types::mc).get();
        auto dkey = dht::decorate_key(*s, partition_key::from_exploded(*s, { utf8_type->decompose("a") }));
        // expected mutation mirroring the INSERT above; the timestamp is the one
        // the fixture was written with
        mutation expected(s, dkey);
        auto set_cell = [&] (const char* name, const char* value) {
            expected.set_clustered_cell(clustering_key::make_empty(), *s->get_column_definition(name),
                    atomic_cell::make_live(*utf8_type, 1551785032379079, utf8_type->decompose(value), {}));
        };
        set_cell("c1", "abc");
        set_cell("c2", "cde");
        assert_that(sst->as_mutation_source().make_mutation_reader(s, env.make_reader_permit()))
            .produces(expected)
            .produces_end_of_stream();
    });
}
// Builds a dht::token directly from its raw signed 64-bit value,
// bypassing key hashing.
static dht::token token_from_long(int64_t value) {
    return dht::token{ value };
}
// Sanity test for boost::icl::interval_map keyed by
// dht::compatible_ring_position_or_view (the structure used to index sstables
// by token range). There are no explicit assertions: the test passes if this
// hard-coded insert/subtract sequence completes without tripping icl's
// internal invariants or crashing.
SEASTAR_TEST_CASE(basic_interval_map_testing_for_sstable_set) {
    using value_set = std::unordered_set<int64_t>;
    using interval_map_type = boost::icl::interval_map<dht::compatible_ring_position_or_view, value_set>;
    using interval_type = interval_map_type::interval_type;
    interval_map_type map;
    auto builder = schema_builder("tests", "test")
        .with_column("id", utf8_type, column_kind::partition_key)
        .with_column("value", int32_type);
    auto s = builder.build();
    // ring position for a raw token value
    auto make_pos = [&] (int64_t token) {
        return dht::compatible_ring_position_or_view(s, dht::ring_position::starting_at(token_from_long(token)));
    };
    // maps the closed token interval [start, end] to the singleton set {gen}
    auto add = [&] (int64_t start, int64_t end, int gen) {
        map.insert({interval_type::closed(make_pos(start), make_pos(end)), value_set({gen})});
    };
    // removes the same association again
    auto subtract = [&] (int64_t start, int64_t end, int gen) {
        map.subtract({interval_type::closed(make_pos(start), make_pos(end)), value_set({gen})});
    };
    add(6052159333454473039, 9223347124876901511, 0);
    add(957694089857623813, 6052133625299168475, 1);
    add(-9223359752074096060, -4134836824175349559, 2);
    add(-4134776408386727187, 957682147550689253, 3);
    add(6092345676202690928, 9223332435915649914, 4);
    add(-5395436281861775460, -1589168419922166021, 5);
    add(-1589165560271708558, 6092259415972553765, 6);
    add(-9223362900961284625, -5395452288575292639, 7);
    subtract(-9223359752074096060, -4134836824175349559, 2);
    subtract(-9223362900961284625, -5395452288575292639, 7);
    subtract(-4134776408386727187, 957682147550689253, 3);
    subtract(-5395436281861775460, -1589168419922166021, 5);
    subtract(957694089857623813, 6052133625299168475, 1);
    return make_ready_future<>();
}
// Writes an sstable while advertising zero estimated partitions to the
// writer, for every writable sstable version, then reads the data back.
SEASTAR_TEST_CASE(test_zero_estimated_partitions) {
    return test_env::do_with_async([] (test_env& env) {
        simple_schema ss;
        auto s = ss.schema();
        const auto pk = tests::generate_partition_key(s);

        auto mut = mutation(s, pk);
        ss.add_row(mut, ss.make_ckey(0), "val");

        for (const auto version : writable_sstable_versions) {
            testlog.info("version={}", version);
            auto mr = make_mutation_reader_from_mutations(ss.schema(), env.make_reader_permit(), mut);
            sstable_writer_config cfg = env.manager().configure_writer();
            // Last argument: estimated_partitions == 0.
            auto sst = make_sstable_easy(env, std::move(mr), cfg, version, 0);

            auto sst_mr = sst->as_mutation_source().make_mutation_reader(s, env.make_reader_permit(), query::full_partition_range, s->full_slice());
            auto close_mr = deferred_close(sst_mr);
            auto sst_mut = read_mutation_from_mutation_reader(sst_mr).get();

            // The real test here is that we don't SCYLLA_ASSERT() in
            // sstables::prepare_summary() while writing the sstable above with
            // zero estimated partitions; the checks below are only a sanity check.
            BOOST_REQUIRE(sst_mr.is_buffer_empty());
            BOOST_REQUIRE(sst_mr.is_end_of_stream());
            BOOST_REQUIRE(sst_mut);
            BOOST_REQUIRE_EQUAL(mut, *sst_mut);
        }
    });
}
SEASTAR_TEST_CASE(test_may_have_partition_tombstones) {
    return test_env::do_with_async([] (test_env& env) {
        simple_schema ss;
        auto schema = ss.schema();
        auto keys = ss.make_pkeys(2);

        for (auto version : all_sstable_versions) {
            // Versions older than 'md' are skipped.
            if (version < sstable_version_types::md) {
                continue;
            }
            auto m1 = mutation(schema, keys[0]);
            auto m2 = mutation(schema, keys[1]);

            // Fill the partitions with everything except a partition
            // tombstone: row insert, row delete, live rows and a range
            // tombstone.
            m1.partition().apply_insert(*schema, ss.make_ckey(0), ss.new_timestamp());
            m1.partition().apply_delete(*schema, ss.make_ckey(1), ss.new_tombstone());
            ss.add_row(m1, ss.make_ckey(2), "val");
            ss.delete_range(m1, query::clustering_range::make({ss.make_ckey(3)}, {ss.make_ckey(5)}));
            ss.add_row(m2, ss.make_ckey(6), "val");

            // No partition tombstone written => flag must be off.
            {
                auto sst = make_sstable_containing(env.make_sstable(schema, version), {m1, m2});
                BOOST_REQUIRE(!sst->may_have_partition_tombstones());
            }

            // Add an actual partition tombstone => flag must turn on.
            m2.partition().apply(ss.new_tombstone());
            auto sst = make_sstable_containing(env.make_sstable(schema, version), {m1, m2});
            BOOST_REQUIRE(sst->may_have_partition_tombstones());
        }
    });
}
// Feeds the sstable writer a fragment stream in which the first partition is
// not closed by a partition_end, and expects the write to fail instead of
// producing a malformed sstable.
SEASTAR_TEST_CASE(test_missing_partition_end_fragment) {
    return test_env::do_with_async([] (test_env& env) {
        simple_schema ss;
        auto s = ss.schema();
        auto pkeys = ss.make_pkeys(2);

        // The invalid stream triggers an internal error; disable aborting so
        // the error surfaces as an exception we can catch below.
        set_abort_on_internal_error(false);
        auto enable_aborts = defer([] { set_abort_on_internal_error(true); }); // FIXME: restore to previous value

        for (const auto version : writable_sstable_versions) {
            testlog.info("version={}", version);
            std::deque<mutation_fragment_v2> frags;
            frags.push_back(mutation_fragment_v2(*s, env.make_reader_permit(), partition_start(pkeys[0], tombstone())));
            frags.push_back(mutation_fragment_v2(*s, env.make_reader_permit(), clustering_row(ss.make_ckey(0))));
            // partition_end is missing
            frags.push_back(mutation_fragment_v2(*s, env.make_reader_permit(), partition_start(pkeys[1], tombstone())));
            frags.push_back(mutation_fragment_v2(*s, env.make_reader_permit(), clustering_row(ss.make_ckey(0))));
            frags.push_back(mutation_fragment_v2(*s, env.make_reader_permit(), partition_end()));

            auto mr = make_mutation_reader_from_fragments(s, env.make_reader_permit(), std::move(frags));
            auto close_mr = deferred_close(mr);

            auto sst = env.make_sstable(s, version);
            sstable_writer_config cfg = env.manager().configure_writer();

            try {
                auto wr = sst->get_writer(*s, 1, cfg, encoding_stats{});
                mr.consume_in_thread(std::move(wr));
                BOOST_FAIL("write_components() should have failed");
            } catch (const std::runtime_error&) {
                testlog.info("failed as expected: {}", std::current_exception());
            }
        }
    });
}
SEASTAR_TEST_CASE(test_sstable_origin) {
    return test_env::do_with_async([] (test_env& env) {
        simple_schema ss;
        auto schema = ss.schema();
        const auto pk = tests::generate_partition_key(schema);
        auto m = mutation(schema, pk);
        ss.add_row(m, ss.make_ckey(0), "val");

        for (const auto version : all_sstable_versions) {
            // Versions older than 'mc' are skipped.
            if (version < sstable_version_types::mc) {
                continue;
            }

            // An empty origin must round-trip as an empty string.
            {
                auto reader = make_mutation_reader_from_mutations(schema, env.make_reader_permit(), m);
                sstable_writer_config cfg = env.manager().configure_writer("");
                auto sst = make_sstable_easy(env, std::move(reader), cfg, version, 0);
                BOOST_REQUIRE_EQUAL(sst->get_origin(), "");
            }

            // A random origin string must be stored and retrieved verbatim.
            {
                auto reader = make_mutation_reader_from_mutations(schema, env.make_reader_permit(), m);
                sstring origin = fmt::format("test-{}", tests::random::get_sstring());
                sstable_writer_config cfg = env.manager().configure_writer(origin);
                auto sst = make_sstable_easy(env, std::move(reader), cfg, version, 0);
                BOOST_REQUIRE_EQUAL(sst->get_origin(), origin);
            }
        }
    });
}
// Basic coverage for sstables::make_compound_sstable_set(): iteration over
// all underlying sets, copying the compound set, and rebuilding it after one
// of the underlying sets is replaced.
SEASTAR_TEST_CASE(compound_sstable_set_basic_test) {
    return test_env::do_with_async([] (test_env& env) {
        auto s = schema_builder(some_keyspace, some_column_family).with_column("p1", utf8_type, column_kind::partition_key).build();
        auto cs = compaction::make_compaction_strategy(compaction::compaction_strategy_type::size_tiered, s->compaction_strategy_options());
        // A compound set wrapping two independent underlying sets.
        lw_shared_ptr<sstables::sstable_set> set1 = make_lw_shared(env.make_sstable_set(cs, s));
        lw_shared_ptr<sstables::sstable_set> set2 = make_lw_shared(env.make_sstable_set(cs, s));
        lw_shared_ptr<sstables::sstable_set> compound = make_lw_shared(sstables::make_compound_sstable_set(s, {set1, set2}));

        const auto keys = tests::generate_partition_keys(2, s);
        std::array<sstables::shared_sstable, 3> sstables_for_overlapping_test = {
            sstable_for_overlapping_test(env, s, keys[0].key(), keys[1].key(), 0),
            sstable_for_overlapping_test(env, s, keys[0].key(), keys[1].key(), 0),
            sstable_for_overlapping_test(env, s, keys[0].key(), keys[1].key(), 0)
        };
        // Distribute one sstable into set1 and two into set2.
        set1->insert(sstables_for_overlapping_test[0]);
        set2->insert(sstables_for_overlapping_test[1]);
        set2->insert(sstables_for_overlapping_test[2]);

        // Iterating the compound set must yield exactly the union of the
        // underlying sets (compared by generation).
        std::unordered_set<utils::UUID> expected_generations;
        for (const auto& sst : sstables_for_overlapping_test) {
            expected_generations.insert(sst->generation().as_uuid());
        }

        std::unordered_set<utils::UUID> found_generations;
        for (const auto& sst : *compound->all()) {
            found_generations.insert(sst->generation().as_uuid());
        }
        BOOST_REQUIRE(found_generations == expected_generations);

        // A copy of the compound set must see the same three sstables.
        {
            auto cloned_compound = *compound;
            assert_sstable_set_size(cloned_compound, 3);
        }

        // Replace set2 with an empty set and rebuild the compound set: only
        // the single sstable left in set1 should be visible.
        set2 = make_lw_shared(env.make_sstable_set(cs, s));
        compound = make_lw_shared(sstables::make_compound_sstable_set(s, {set1, set2}));
        {
            unsigned found = 0;
            for (auto sstables = compound->all(); [[maybe_unused]] auto& sst : *sstables) {
                found++;
            }
            size_t compound_size = compound->all()->size();
            BOOST_REQUIRE(compound_size == 1);
            BOOST_REQUIRE(compound_size == found);
        }
    });
}
SEASTAR_TEST_CASE(sstable_reader_with_timeout) {
    return test_env::do_with_async([] (test_env& env) {
        auto schema = complex_schema();
        auto pkey = partition_key::from_exploded(*schema, {to_bytes("key1")});
        auto ckey = clustering_key_prefix::from_exploded(*schema, {to_bytes("c1")});

        // Write a single partition containing one row tombstone.
        mutation m(schema, pkey);
        tombstone tomb(api::new_timestamp(), gc_clock::now());
        m.partition().apply_delete(*schema, ckey, tomb);
        auto sst = make_sstable_containing(env.make_sstable(schema), {std::move(m)});

        // Read it back through a permit whose deadline has already passed:
        // the read must fail with timed_out_error instead of producing data.
        auto pr = dht::partition_range::make_singular(make_dkey(schema, "key1"));
        auto deadline = db::timeout_clock::now();
        auto reader = sst->make_reader(schema, env.make_reader_permit(deadline), pr, schema->full_slice());
        auto close_reader = deferred_close(reader);
        auto fut = read_mutation_from_mutation_reader(reader);
        BOOST_REQUIRE_THROW(fut.get(), timed_out_error);
    });
}
// Exhaustively exercises sstables::validate_checksums() against an intact
// sstable and several corruption scenarios (bad digest, overwritten data,
// truncation before/after load, appended garbage, missing Digest/CRC
// components), for every writable version with and without compression.
// NOTE: the scenarios mutate the same on-disk Data file in sequence, so
// their order matters.
SEASTAR_TEST_CASE(test_validate_checksums) {
    return test_env::do_with_async([&] (test_env& env) {
        auto random_spec = tests::make_random_schema_specification(
                get_name(),
                std::uniform_int_distribution<size_t>(1, 4),
                std::uniform_int_distribution<size_t>(2, 4),
                std::uniform_int_distribution<size_t>(2, 8),
                std::uniform_int_distribution<size_t>(2, 8));
        auto random_schema = tests::random_schema{tests::random::get_int<uint32_t>(), *random_spec};
        auto schema = random_schema.schema();
        auto permit = env.make_reader_permit();

        testlog.info("Random schema:\n{}", random_schema.cql());

        const auto muts = tests::generate_random_mutations(random_schema).get();

        // Write 'muts' into a fresh (not yet loaded) sstable of the given version.
        auto make_sstable = [&env, &permit, &muts] (schema_ptr schema, sstable_version_types version) {
            auto mr = make_mutation_reader_from_mutations(schema, permit, muts);
            auto close_mr = deferred_close(mr);
            auto sst = env.make_sstable(schema, version);
            sstable_writer_config cfg = env.manager().configure_writer();
            auto wr = sst->get_writer(*schema, 1, cfg, encoding_stats{});
            mr.consume_in_thread(std::move(wr));
            return sst;
        };

        const std::map<sstring, sstring> no_compression_params = {};
        const std::map<sstring, sstring> lz4_compression_params = {{compression_parameters::SSTABLE_COMPRESSION, "LZ4Compressor"}};

        for (const auto version : writable_sstable_versions) {
            testlog.info("version={}", version);
            for (const auto& compression_params : {no_compression_params, lz4_compression_params}) {
                testlog.info("compression={}", compression_params);

                auto sst_schema = schema_builder(schema).set_compressor_params(compression_params).build();

                auto sst = make_sstable(sst_schema, version);
                sst->load(sst->get_schema()->get_sharder()).get();

                validate_checksums_result res;

                // Baseline: an untouched sstable validates cleanly and has a digest.
                testlog.info("Validating intact {}", sst->get_filename());

                res = sstables::validate_checksums(sst, permit).get();
                BOOST_REQUIRE(res.status == validate_checksums_status::valid);
                BOOST_REQUIRE(res.has_digest);

                auto sst_file = open_file_dma(test(sst).filename(sstables::component_type::Data).native(), open_flags::wo).get();
                auto close_sst_file = defer([&sst_file] { sst_file.close().get(); });

                // Data intact, but in-memory digest made wrong => invalid.
                testlog.info("Validating digest-corrupted {}", sst->get_filename());

                auto valid_digest = sst->read_digest().get();
                BOOST_REQUIRE(valid_digest.has_value());

                sstables::test(sst).set_digest(valid_digest.value() + 1);
                res = sstables::validate_checksums(sst, permit).get();
                BOOST_REQUIRE(res.status == validate_checksums_status::invalid);
                BOOST_REQUIRE(res.has_digest);
                sstables::test(sst).set_digest(valid_digest); // restore it for next test cases

                // Overwrite data mid-file => checksum mismatch => invalid.
                testlog.info("Validating checksum-corrupted {}", sst->get_filename());

                { // corrupt the sstable
                    const auto size = std::min(sst->ondisk_data_size() / 2, uint64_t(1024));
                    auto buf = temporary_buffer<char>::aligned(sst_file.disk_write_dma_alignment(), size);
                    std::fill(buf.get_write(), buf.get_write() + size, 0xba);
                    sst_file.dma_write(sst->ondisk_data_size() / 2, buf.begin(), buf.size()).get();
                }

                res = sstables::validate_checksums(sst, permit).get();
                BOOST_REQUIRE(res.status == validate_checksums_status::invalid);
                BOOST_REQUIRE(res.has_digest);

                // Truncation after the sstable was loaded: first by one byte...
                testlog.info("Validating post-load minor truncation (last byte removed) on {}", sst->get_filename());

                { // truncate the sstable
                    sst_file.truncate(sst->ondisk_data_size() - 1).get();
                }

                res = sstables::validate_checksums(sst, permit).get();
                BOOST_REQUIRE(res.status == validate_checksums_status::invalid);
                BOOST_REQUIRE(res.has_digest);

                // ...then by half of the file.
                testlog.info("Validating post-load major truncation (half of data removed) on {}", sst->get_filename());

                { // truncate the sstable
                    sst_file.truncate(sst->ondisk_data_size() / 2).get();
                }

                res = sstables::validate_checksums(sst, permit).get();
                BOOST_REQUIRE(res.status == validate_checksums_status::invalid);
                BOOST_REQUIRE(res.has_digest);

                // Drop the Digest component: still invalid (data is corrupted)
                // and has_digest must now be false.
                testlog.info("Validating with no digest {}", sst->get_filename());
                sstables::test(sst).set_digest(std::nullopt);
                sstables::test(sst).rewrite_toc_without_component(component_type::Digest);
                res = sstables::validate_checksums(sst, permit).get();
                BOOST_REQUIRE(res.status == validate_checksums_status::invalid);
                BOOST_REQUIRE(!res.has_digest);

                // Only uncompressed sstables have a separate CRC component;
                // removing it leaves nothing to validate against.
                if (compression_params == no_compression_params) {
                    testlog.info("Validating with no checksums {}", sst->get_filename());
                    sstables::test(sst).rewrite_toc_without_component(component_type::CRC);
                    auto res = sstables::validate_checksums(sst, permit).get();
                    BOOST_REQUIRE(res.status == validate_checksums_status::no_checksum);
                    BOOST_REQUIRE(!res.has_digest);
                }

                // Truncation applied before load(): one byte removed.
                { // truncate the sstable
                    auto sst = make_sstable(sst_schema, version);
                    testlog.info("Validating pre-load minor truncation (last byte removed) on {}", sst->get_filename());

                    auto sst_file = open_file_dma(test(sst).filename(sstables::component_type::Data).native(), open_flags::wo).get();
                    auto close_sst_file = defer([&sst_file] { sst_file.close().get(); });
                    sst_file.truncate(sst_file.size().get() - 1).get();

                    sst->load(sst->get_schema()->get_sharder()).get();

                    res = sstables::validate_checksums(sst, permit).get();
                    BOOST_REQUIRE(res.status == validate_checksums_status::invalid);
                    BOOST_REQUIRE(res.has_digest);
                }

                // Truncation applied before load(): half the file removed.
                { // truncate the sstable
                    auto sst = make_sstable(sst_schema, version);
                    testlog.info("Validating pre-load major truncation (half of data removed) on {}", sst->get_filename());

                    auto sst_file = open_file_dma(test(sst).filename(sstables::component_type::Data).native(), open_flags::wo).get();
                    auto close_sst_file = defer([&sst_file] { sst_file.close().get(); });
                    sst_file.truncate(sst_file.size().get() / 2).get();

                    sst->load(sst->get_schema()->get_sharder()).get();

                    res = sstables::validate_checksums(sst, permit).get();
                    BOOST_REQUIRE(res.status == validate_checksums_status::invalid);
                    BOOST_REQUIRE(res.has_digest);
                }

                // Garbage appended past the original end of the Data file
                // (re-writing the last partial block to keep DMA alignment).
                { // append data to the sstable
                    auto sst = make_sstable(sst_schema, version);
                    testlog.info("Validating appended {}", sst->get_filename());

                    auto sst_file = open_file_dma(test(sst).filename(sstables::component_type::Data).native(), open_flags::rw).get();
                    auto close_sst_file = defer([&sst_file] { sst_file.close().get(); });
                    auto buf = temporary_buffer<char>::aligned(sst_file.disk_write_dma_alignment(), 2 * 64 * 1024);
                    std::fill(buf.get_write(), buf.get_write() + buf.size(), 0xba);
                    auto fsize = sst_file.size().get();
                    auto wpos = align_down(fsize, sst_file.disk_write_dma_alignment());
                    // Preserve the existing bytes of the last (partial) block
                    // before appending the garbage after them.
                    sst_file.dma_read<char>(wpos, buf.get_write(), fsize - wpos).get();
                    sst_file.dma_write(wpos, buf.begin(), buf.size()).get();

                    sst->load(sst->get_schema()->get_sharder()).get();

                    res = sstables::validate_checksums(sst, permit).get();
                    BOOST_REQUIRE(res.status == validate_checksums_status::invalid);
                    BOOST_REQUIRE(res.has_digest);
                }
            }
        }
    });
}
SEASTAR_TEST_CASE(partial_sstable_deletion_test) {
    return test_env::do_with_async([] (test_env& env) {
        simple_schema ss;
        auto schema = ss.schema();
        auto keys = ss.make_pkeys(1);

        auto m = mutation(schema, keys[0]);
        m.partition().apply_insert(*schema, ss.make_ckey(0), ss.new_timestamp());
        auto sst = make_sstable_containing(env.make_sstable(schema), {std::move(m)});

        // Rename TOC into TMP toc, to stress deletion path for partial files
        rename_file(test(sst).filename(sstables::component_type::TOC).native(), test(sst).filename(sstables::component_type::TemporaryTOC).native()).get();
        sst->unlink().get();
    });
}
// Reads an sstable over several disjoint partition ranges, the latter ones
// lying entirely past the sstable's last key, and fast-forwards the reader
// through them. Verifies the reader (and its index) keeps working after
// reaching EOF, including when cached index pages are evicted in between.
SEASTAR_TEST_CASE(test_index_fast_forwarding_after_eof) {
    return test_env::do_with_async([&] (test_env& env) {
        auto random_spec = tests::make_random_schema_specification(
                get_name(),
                std::uniform_int_distribution<size_t>(1, 4),
                std::uniform_int_distribution<size_t>(2, 4),
                std::uniform_int_distribution<size_t>(2, 8),
                std::uniform_int_distribution<size_t>(2, 8));
        auto random_schema = tests::random_schema{tests::random::get_int<uint32_t>(), *random_spec};
        auto schema = random_schema.schema();
        auto permit = env.make_reader_permit();

        testlog.info("Random schema:\n{}", random_schema.cql());

        const auto muts = tests::generate_random_mutations(random_schema, 2).get();

        auto sst = env.make_sstable(schema, writable_sstable_versions.back());
        {
            auto mr = make_mutation_reader_from_mutations(schema, permit, muts);
            auto close_mr = deferred_close(mr);

            sstable_writer_config cfg = env.manager().configure_writer();
            auto wr = sst->get_writer(*schema, 1, cfg, encoding_stats{});
            mr.consume_in_thread(std::move(wr));

            sst->load(sst->get_schema()->get_sharder()).get();
        }

        // Build ranges relative to the tokens of the first (t1) and last (t2)
        // partitions present in the sstable.
        const auto t1 = muts.front().decorated_key()._token;
        const auto t2 = muts.back().decorated_key()._token;

        dht::partition_range_vector prs;

        // Entirely before the first partition.
        prs.emplace_back(dht::ring_position::starting_at(dht::token{t1.raw() - 200}), dht::ring_position::ending_at(dht::token{t1.raw() - 100}));
        // Covers the data up to just past the last partition.
        prs.emplace_back(dht::ring_position::starting_at(dht::token{t1.raw() + 2}), dht::ring_position::ending_at(dht::token{t2.raw() + 2}));
        // Should be at eof() after the above range is finished
        prs.emplace_back(dht::ring_position::starting_at(dht::token{t2.raw() + 100}), dht::ring_position::ending_at(dht::token{t2.raw() + 200}));
        prs.emplace_back(dht::ring_position::starting_at(dht::token{t2.raw() + 300}), dht::ring_position::ending_at(dht::token{t2.raw() + 400}));

        auto reader = sst->make_reader(schema, permit, prs.front(), schema->full_slice());
        auto close_reader = deferred_close(reader);

        // Drain the reader for the initial range.
        while (reader().get());

        auto& region = env.manager().get_cache_tracker().region();

        for (auto it = std::next(prs.begin()); it != prs.end(); ++it) {
            testlog.info("fast_forward_to({})", *it);
            reader.fast_forward_to(*it).get();
            while (reader().get());
            // Make sure the index page linked into LRU after EOF is evicted.
            while (region.evict_some() == memory::reclaiming_result::reclaimed_something);
        }
    });
}
SEASTAR_TEST_CASE(test_full_scan_reader_out_of_range_last_range_tombstone_change) {
    return test_env::do_with_async([] (test_env& env) {
        simple_schema table;
        auto m = table.new_mutation("pk0");
        auto keys = table.make_ckeys(4);

        // Three live rows followed by a range tombstone whose end bound is an
        // empty (i.e. unbounded) clustering key.
        table.add_row(m, keys[0], "v0");
        table.add_row(m, keys[1], "v1");
        table.add_row(m, keys[2], "v2");
        using bound = query::clustering_range::bound;
        table.delete_range(m, query::clustering_range::make(bound{keys[3], true}, bound{clustering_key::make_empty(), true}), tombstone(1, gc_clock::now()));

        // The full-scan reader must still emit fragments with monotonically
        // increasing positions.
        auto sst = make_sstable_containing(env.make_sstable(table.schema()), {m});
        assert_that(sst->make_full_scan_reader(table.schema(), env.make_reader_permit())).has_monotonic_positions();
    });
}
SEASTAR_TEST_CASE(test_full_scan_reader_random_schema_random_mutations) {
    return test_env::do_with_async([] (test_env& env) {
        auto spec = tests::make_random_schema_specification(
                get_name(),
                std::uniform_int_distribution<size_t>(1, 4),
                std::uniform_int_distribution<size_t>(2, 4),
                std::uniform_int_distribution<size_t>(2, 8),
                std::uniform_int_distribution<size_t>(2, 8));
        auto rnd_schema = tests::random_schema{tests::random::get_int<uint32_t>(), *spec};
        auto schema = rnd_schema.schema();

        testlog.info("Random schema:\n{}", rnd_schema.cql());

        const auto mutations = tests::generate_random_mutations(rnd_schema, 20).get();
        auto sst = make_sstable_containing(env.make_sstable(schema), mutations);

        // The full-scan reader must reproduce every mutation, in order...
        {
            auto rd = assert_that(sst->make_full_scan_reader(schema, env.make_reader_permit()));
            for (const auto& m : mutations) {
                rd.produces(m);
            }
        }
        // ...and emit fragments with monotonically increasing positions.
        assert_that(sst->make_full_scan_reader(schema, env.make_reader_permit())).has_monotonic_positions();
    });
}
// Verifies sstable::find_first_position_in_partition() (and the related
// min/max position and first/last-partition position accessors) against
// positions tracked while building the mutations, across combinations of
// partition counts, range tombstones and static rows.
SEASTAR_TEST_CASE(find_first_position_in_partition_from_sstable_test) {
    return test_env::do_with_async([] (test_env& env) {
        class with_range_tombstone_tag;
        using with_range_tombstone = bool_class<with_range_tombstone_tag>;
        class with_static_row_tag;
        using with_static_row = bool_class<with_static_row_tag>;

        auto check_sstable_first_and_last_positions = [&] (size_t partitions, with_range_tombstone with_range_tombstone, with_static_row with_static_row) {
            testlog.info("check_sstable_first_and_last_positions: partitions={}, with_range_tombstone={}, with_static_row={}",
                         partitions, bool(with_range_tombstone), bool(with_static_row));
            simple_schema ss;
            auto s = ss.schema();
            auto pks = ss.make_pkeys(partitions);
            auto tmp = env.tempdir().make_sweeper();

            utils::chunked_vector<mutation> muts;
            // Expected positions: first position of the first partition and
            // last position of the last partition, tracked as rows are added.
            std::optional<position_in_partition> first_position, last_position;

            static constexpr size_t ckeys_per_partition = 10;
            // Global clustering-key counter so keys keep growing across partitions.
            size_t ck_idx = 0;

            for (size_t pr = 0; pr < partitions; pr++) {
                auto mut1 = mutation(s, pks[pr]);
                if (with_static_row) {
                    ss.add_static_row(mut1, "svalue");
                    // The static row sorts before any clustering row, so it is
                    // the first position if present in the first partition.
                    if (!first_position) {
                        first_position = position_in_partition::for_static_row();
                    }
                }

                for (size_t ck = 0; ck < ckeys_per_partition; ck++) {
                    auto ckey = ss.make_ckey(ck_idx + ck);
                    if (!first_position) {
                        first_position = position_in_partition::for_key(ckey);
                    }
                    last_position = position_in_partition::for_key(ckey);

                    // Alternate live rows with single-key range tombstones
                    // when requested.
                    if (with_range_tombstone && ck % 2 == 0) {
                        tombstone tomb(ss.new_timestamp(), gc_clock::now());
                        range_tombstone rt(
                                bound_view(ckey, bound_kind::incl_start),
                                bound_view(ckey, bound_kind::incl_end),
                                tomb);
                        mut1.partition().apply_delete(*s, std::move(rt));
                    } else {
                        mut1.partition().apply_insert(*s, ckey, ss.new_timestamp());
                    }
                }
                muts.push_back(std::move(mut1));
            }

            auto sst = make_sstable_containing(env.make_sstable(s), std::move(muts));
            position_in_partition::equal_compare eq(*s);

            // min/max position compare only the clustering key, so skip this
            // check when the first position is the (keyless) static row.
            if (!with_static_row) {
                BOOST_REQUIRE(sst->min_position().key() == first_position->key());
                BOOST_REQUIRE(sst->max_position().key() == last_position->key());
            }

            // find_first_position_in_partition() with reversed=false/true must
            // return the first/last position of the respective partition.
            auto first_position_opt = sst->find_first_position_in_partition(env.make_reader_permit(), sst->get_first_decorated_key(), false).get();
            BOOST_REQUIRE(first_position_opt);

            auto last_position_opt = sst->find_first_position_in_partition(env.make_reader_permit(), sst->get_last_decorated_key(), true).get();
            BOOST_REQUIRE(last_position_opt);

            BOOST_REQUIRE(eq(*first_position_opt, *first_position));
            BOOST_REQUIRE(eq(*last_position_opt, *last_position));

            BOOST_REQUIRE(eq(sst->first_partition_first_position(), *first_position));
            BOOST_REQUIRE(eq(sst->last_partition_last_position(), *last_position));
        };

        // Run the check for every combination of the three dimensions.
        std::array<size_t, 2> partitions_options = { 1, 5 };
        std::array<with_range_tombstone, 2> range_tombstone_options = { with_range_tombstone::no, with_range_tombstone::yes };
        std::array<with_static_row, 2> static_row_options = { with_static_row::no, with_static_row::yes };

        for (size_t partitions : partitions_options) {
            for (with_range_tombstone range_tombstone_opt : range_tombstone_options) {
                for (with_static_row static_row_opt : static_row_options) {
                    check_sstable_first_and_last_positions(partitions, range_tombstone_opt, static_row_opt);
                }
            }
        }
    });
}
// Shared body for the bytes_on_disk() accounting tests: writes random
// mutations into an sstable on the storage backend selected by 'cfg' and
// checks that the sstable's reported size matches the sum of its component
// file sizes as seen by the storage layer.
future<> test_sstable_bytes_correctness(sstring tname, test_env_config cfg) {
    return test_env::do_with_async([tname] (test_env& env) {
        auto spec = tests::make_random_schema_specification(
                tname,
                std::uniform_int_distribution<size_t>(1, 4),
                std::uniform_int_distribution<size_t>(2, 4),
                std::uniform_int_distribution<size_t>(2, 8),
                std::uniform_int_distribution<size_t>(2, 8));
        auto rnd_schema = tests::random_schema{tests::random::get_int<uint32_t>(), *spec};
        auto schema = rnd_schema.schema();

        testlog.info("Random schema:\n{}", rnd_schema.cql());

        const auto mutations = tests::generate_random_mutations(rnd_schema, 20).get();
        auto sst = make_sstable_containing(env.make_sstable(schema), mutations);

        auto free_space = sst->get_storage().free_space().get();
        BOOST_REQUIRE(free_space > 0);
        testlog.info("prefix: {}, free space: {}", sst->get_storage().prefix(), free_space);

        // Sum the size of every component by opening each one directly
        // through the storage layer.
        auto sum_component_sizes = [&] (const sstables::shared_sstable& sst) {
            uint64_t total = 0;
            auto& underlying_storage = const_cast<sstables::storage&>(sst->get_storage());
            for (auto& component_type : sstables::test(sst).get_components()) {
                file f = underlying_storage.open_component(*sst, component_type, open_flags::ro, file_open_options{}, true).get();
                total += f.size().get();
            }
            return total;
        };

        auto expected_bytes_on_disk = sum_component_sizes(sst);
        testlog.info("expected={}, actual={}", expected_bytes_on_disk, sst->bytes_on_disk());
        BOOST_REQUIRE(sst->bytes_on_disk() == expected_bytes_on_disk);
    }, std::move(cfg));
}
SEASTAR_TEST_CASE(test_sstable_bytes_on_disk_correctness) {
    // Local-disk storage variant of the bytes-on-disk accounting check.
    return test_sstable_bytes_correctness(get_name() + "_disk", test_env_config{});
}
SEASTAR_TEST_CASE(test_sstable_bytes_on_s3_correctness) {
    // S3 object-storage variant of the bytes-on-disk accounting check.
    auto cfg = test_env_config{ .storage = make_test_object_storage_options("S3") };
    return test_sstable_bytes_correctness(get_name() + "_s3", std::move(cfg));
}
SEASTAR_FIXTURE_TEST_CASE(test_sstable_bytes_on_gs_correctness, gcs_fixture, *check_run_test_decorator("ENABLE_GCP_STORAGE_TEST", true)) {
    // Google Cloud Storage variant; gated on ENABLE_GCP_STORAGE_TEST.
    auto cfg = test_env_config{ .storage = make_test_object_storage_options("GS") };
    return test_sstable_bytes_correctness(seastar::testing::seastar_test::get_name() + "_gs", std::move(cfg));
}
// Checks that the sstable-filtering predicate passed to the sstable_set
// reader factories is honored by both the single-key reader and the
// full-scan reader: an always-false predicate must yield an empty stream,
// an always-true one must yield data.
SEASTAR_TEST_CASE(test_sstable_set_predicate) {
    return test_env::do_with_async([] (test_env& env) {
        auto random_spec = tests::make_random_schema_specification(
                get_name(),
                std::uniform_int_distribution<size_t>(1, 4),
                std::uniform_int_distribution<size_t>(2, 4),
                std::uniform_int_distribution<size_t>(2, 8),
                std::uniform_int_distribution<size_t>(2, 8));
        auto random_schema = tests::random_schema{tests::random::get_int<uint32_t>(), *random_spec};
        auto s = random_schema.schema();

        testlog.info("Random schema:\n{}", random_schema.cql());

        const auto muts = tests::generate_random_mutations(random_schema, 20).get();

        // A leveled sstable_set holding a single sstable with all mutations.
        auto sst = make_sstable_containing(env.make_sstable(s), muts);
        auto cs = compaction::make_compaction_strategy(compaction::compaction_strategy_type::leveled, s->compaction_strategy_options());
        sstable_set set = env.make_sstable_set(cs, s);
        set.insert(sst);

        auto first_key_pr = dht::partition_range::make_singular(sst->get_first_decorated_key());

        // Single-partition read (first key of the sstable) through the
        // predicate-aware reader factory.
        auto make_point_query_reader = [&] (std::predicate<const sstable&> auto& pred) {
            auto t = env.make_table_for_tests(s);
            auto close_t = deferred_stop(t);
            utils::estimated_histogram eh;
            return set.create_single_key_sstable_reader(&*t, s, env.make_reader_permit(), eh,
                                                        first_key_pr,
                                                        s->full_slice(),
                                                        tracing::trace_state_ptr(),
                                                        ::streamed_mutation::forwarding::no,
                                                        ::mutation_reader::forwarding::no,
                                                        pred);
        };

        // Full-range read through the predicate-aware reader factory.
        auto make_full_scan_reader = [&] (std::predicate<const sstable&> auto& pred) {
            return set.make_local_shard_sstable_reader(s, env.make_reader_permit(),
                                                       query::full_partition_range,
                                                       s->full_slice(),
                                                       tracing::trace_state_ptr(),
                                                       ::streamed_mutation::forwarding::no,
                                                       ::mutation_reader::forwarding::no,
                                                       default_read_monitor_generator(),
                                                       pred);
        };

        // expect_eos == true: predicate filtered everything out, the stream
        // must be empty; otherwise at least one mutation must come back.
        auto verify_reader_result = [&] (mutation_reader sst_mr, bool expect_eos) {
            auto close_mr = deferred_close(sst_mr);
            auto sst_mut = read_mutation_from_mutation_reader(sst_mr).get();

            if (expect_eos) {
                BOOST_REQUIRE(sst_mr.is_buffer_empty());
                BOOST_REQUIRE(sst_mr.is_end_of_stream());
                BOOST_REQUIRE(!sst_mut);
            } else {
                BOOST_REQUIRE(sst_mut);
            }
        };

        {
            static std::predicate<const sstable&> auto excluding_pred = [] (const sstable&) {
                return false;
            };
            testlog.info("excluding_pred: point query");
            verify_reader_result(make_point_query_reader(excluding_pred), true);
            testlog.info("excluding_pred: range query");
            verify_reader_result(make_full_scan_reader(excluding_pred), true);
        }
        {
            static std::predicate<const sstable&> auto inclusive_pred = [] (const sstable&) {
                return true;
            };
            testlog.info("inclusive_pred: point query");
            verify_reader_result(make_point_query_reader(inclusive_pred), false);
            testlog.info("inclusive_pred: range query");
            verify_reader_result(make_full_scan_reader(inclusive_pred), false);
        }
    });
}
SEASTAR_TEST_CASE(sstable_identifier_correctness) {
    // This test assumes a single-shard environment.
    BOOST_REQUIRE(smp::count == 1);
    return test_env::do_with_async([] (test_env& env) {
        simple_schema ss;
        auto schema = ss.schema();
        auto keys = ss.make_pkeys(1);

        auto m = mutation(schema, keys[0]);
        m.partition().apply_insert(*schema, ss.make_ckey(0), ss.new_timestamp());
        auto sst = make_sstable_containing(env.make_sstable(schema), {std::move(m)});

        // The identifier must be present and match the generation's UUID.
        BOOST_REQUIRE(sst->sstable_identifier());
        BOOST_REQUIRE_EQUAL(sst->sstable_identifier()->uuid(), sst->generation().as_uuid());
    });
}
SEASTAR_TEST_CASE(test_non_full_and_empty_row_keys) {
return do_with_cql_env_thread([] (cql_test_env& env) {
const auto seed = tests::random::get_int<uint32_t>();
auto random_spec = tests::make_random_schema_specification(
"ks",
std::uniform_int_distribution<size_t>(1, 4),
std::uniform_int_distribution<size_t>(1, 4),
std::uniform_int_distribution<size_t>(2, 8),
std::uniform_int_distribution<size_t>(2, 8));
auto random_schema = tests::random_schema(seed, *random_spec);
testlog.info("Random schema:\n{}", random_schema.cql());
random_schema.create_with_cql(env).get();
auto schema = random_schema.schema();
auto& db = env.local_db();
auto& table = db.find_column_family(schema);
auto& manager = db.get_user_sstables_manager();
const auto generated_mutations = tests::generate_random_mutations(random_schema).get();
auto mutation_description = random_schema.new_mutation(0);
auto engine = std::mt19937(seed);
random_schema.add_row(engine, mutation_description, 0, [] (std::mt19937& engine, tests::timestamp_destination destination, api::timestamp_type min_timestamp) {
switch (destination) {
case tests::timestamp_destination::partition_tombstone:
case tests::timestamp_destination::row_tombstone:
case tests::timestamp_destination::collection_tombstone:
case tests::timestamp_destination::range_tombstone:
return api::missing_timestamp;
default:
return api::timestamp_type(100);
}
});
const auto row_mutation = mutation_description.build(schema);
// Write an sstable from a stream that interleaves the valid generated
// mutations with one clustering row keyed by the bad key `ck` (empty or
// non-full). Verifies that:
//  - the bad row is diverted to the sstable's corrupt_data_handler instead of
//    being written (the sstable reads back only the valid mutations);
//  - the handler's reported/recorded counters each grow by exactly one;
//  - a single matching entry lands in system.corrupt_data, with every column
//    (id, keys, fragment kind, frozen fragment, origin, sstable name) checked;
//  - the system.corrupt_data entry is removed afterwards so later invocations
//    of check() start from a clean slate.
auto check = [&] (const clustering_key& ck) {
    testlog.info("check({})", ck);
    auto permit = db.obtain_reader_permit(schema, "test_non_full_and_empty_row_keys::write", db::no_timeout, {}).get();
    // Inject the bad row into the first generated partition.
    const auto dk = generated_mutations.front().decorated_key();
    // Reuse the row content from row_mutation, but key it with the bad `ck`.
    const auto row_mutation_fragment = mutation_fragment_v2(*schema, permit,
            clustering_row(ck, deletable_row(*schema, row_mutation.partition().clustered_rows().begin()->row())));
    std::deque<mutation_fragment_v2> fragments;
    fragments.emplace_back(*schema, permit, partition_start(dk, {}));
    fragments.emplace_back(*schema, permit, row_mutation_fragment);
    fragments.emplace_back(*schema, permit, partition_end());
    // Keep a copy of the bad clustering-row fragment (fragments[1]) before the
    // deque is moved away, so it can be compared against the frozen fragment
    // recorded in system.corrupt_data further down.
    const auto original_mutation_fragment = mutation_fragment_v2(*schema, permit, fragments[1]);
    // Combine the valid mutations with the hand-built fragment stream carrying
    // the bad row, and write all of it into a fresh sstable.
    auto reader = make_combined_reader(schema, permit,
            make_mutation_reader_from_mutations(schema, permit, generated_mutations, query::full_partition_range),
            make_mutation_reader_from_fragments(schema, permit, std::move(fragments)));
    auto sst = table.make_sstable();
    auto& corrupt_data_handler = sst->get_corrupt_data_handler();
    // Snapshot the handler stats so the per-call delta can be asserted below.
    const auto stats_before = corrupt_data_handler.get_stats();
    sst->write_components(std::move(reader), generated_mutations.size(), schema, manager.configure_writer("test"), encoding_stats{}).get();
    sst->load(schema->get_sharder(), {}).get();
    testlog.info("mutations written to : {}", sst->get_filename());
    // The sstable should not contain the row with the bad key -- that should be passed to the corrupt_data_handler.
    assert_that(sst->make_reader(schema, permit, query::full_partition_range, schema->full_slice()))
        .produces(generated_mutations);
    const auto stats_after = corrupt_data_handler.get_stats();
    // Exactly one corrupt clustering row must have been both reported and
    // recorded; iterate over the relevant stats counters via member pointers.
    for (const uint64_t db::corrupt_data_handler::stats::* member : {
            &db::corrupt_data_handler::stats::corrupt_data_reported,
            &db::corrupt_data_handler::stats::corrupt_data_recorded,
            &db::corrupt_data_handler::stats::corrupt_clustering_rows_reported,
            &db::corrupt_data_handler::stats::corrupt_clustering_rows_recorded}) {
        BOOST_REQUIRE_EQUAL(stats_after.*member, stats_before.*member + 1);
    }
    // The bad row must have been recorded in system.corrupt_data; verify each
    // column of the single recorded entry.
    auto res = env.execute_cql(format("SELECT * FROM {}.{} WHERE keyspace_name = '{}' AND table_name = '{}'",
            db::system_keyspace::NAME, db::system_keyspace::CORRUPT_DATA, schema->ks_name(), schema->cf_name())).get();
    assert_that(res)
        .is_rows()
        .with_size(1)
        .with_columns_of_row(0)
        .with_typed_column<timeuuid_native_type>("id", [] (const timeuuid_native_type& v) { return !v.uuid.is_null(); })
        .with_typed_column<bytes>("partition_key", [&] (const bytes& v) {
            return partition_key::from_bytes(v).equal(*schema, dk.key());
        })
        // T* overload of with_typed_column(): the clustering_key column is
        // expected to be null when the recorded key is empty.
        .with_typed_column<bytes>("clustering_key", [&] (const bytes* v) {
            if (!v) {
                return ck.is_empty();
            }
            return clustering_key::from_bytes(*v).equal(*schema, ck);
        })
        .with_typed_column<sstring>("mutation_fragment_kind", "clustering row")
        // The recorded fragment, once unfrozen, must equal the bad clustering
        // row that was fed to the writer.
        .with_typed_column<bytes>("frozen_mutation_fragment", [&] (const bytes& v) {
            bytes_ostream fmf_bytes;
            fmf_bytes.write(v);
            frozen_mutation_fragment_v2 fmf(std::move(fmf_bytes));
            const auto unfreezed_mutation_fragment = fmf.unfreeze(*schema, permit);
            return unfreezed_mutation_fragment.equal(*schema, original_mutation_fragment);
        })
        .with_typed_column<sstring>("origin", "sstable-write")
        .with_typed_column<sstring>("sstable_name", fmt::to_string(sst->get_filename()))
    ;
    // Clear the corrupt data table so that it doesn't affect other checks.
    env.execute_cql(format("DELETE FROM {}.{} WHERE keyspace_name = '{}' AND table_name = '{}'",
            db::system_keyspace::NAME, db::system_keyspace::CORRUPT_DATA, schema->ks_name(), schema->cf_name())).get();
};
check(clustering_key::make_empty());
if (schema->clustering_key_size() > 1) {
auto full_ckey = random_schema.make_ckey(0);
full_ckey.erase(full_ckey.end() - 1);
check(clustering_key::from_exploded(*schema, full_ckey));
}
});
}