Merge "Support md format" from Benny

"
This series adds support for the "md" sstable format.

Support is based on the following:

* Do not use clustering-based filtering in the presence
  of static rows or tombstones.
* Disable min/max column names in the metadata for
  formats older than "md".
* When updating the metadata, reset and disable min/max
  in the presence of range tombstones (like Cassandra does,
  and until we process them accurately).
* Fix the way we maintain min/max column names by
  keeping whole clustering key prefixes as min/max
  rather than calculating min/max independently for
  each component, like Cassandra does in the "md" format.

Fixes #4442

Tests: unit(dev), cql_query_test -t test_clustering_filtering* (debug)
md migration_test dtest from git@github.com:bhalevy/scylla-dtest.git migration_test-md-v1
"

* tag 'md-format-v4' of github.com:bhalevy/scylla: (27 commits)
  config: enable_sstables_md_format by default
  test: cql_query_test: add test_clustering_filtering unit tests
  table: filter_sstable_for_reader: allow clustering filtering md-format sstables
  table: create_single_key_sstable_reader: emit partition_start/end for empty filtered results
  table: filter_sstable_for_reader: adjust to md-format
  table: filter_sstable_for_reader: include non-scylla sstables with tombstones
  table: filter_sstable_for_reader: do not filter if static column is requested
  table: filter_sstable_for_reader: refactor clustering filtering conditional expression
  features: add MD_SSTABLE_FORMAT cluster feature
  config: add enable_sstables_md_format
  database: add set_format_by_config
  test: sstable_3_x_test: test both mc and md versions
  test: Add support for the "md" format
  sstables: mx/writer: use version from sstable for write calls
  sstables: mx/writer: update_min_max_components for partition tombstone
  sstables: metadata_collector: support min_max_components for range tombstones
  sstable: validate_min_max_metadata: drop outdated logic
  sstables: rename mc folder to mx
  sstables: may_contain_rows: always true for old formats
  sstables: add may_contain_rows
  ...
This commit is contained in:
Avi Kivity
2020-08-11 13:17:24 +03:00
119 changed files with 1347 additions and 592 deletions

View File

@@ -2468,7 +2468,7 @@
"version":{
"type":"string",
"enum":[
"ka", "la", "mc"
"ka", "la", "mc", "md"
],
"description":"SSTable version"
},

View File

@@ -545,7 +545,7 @@ scylla_core = (['database.cc',
'sstables/mp_row_consumer.cc',
'sstables/sstables.cc',
'sstables/sstables_manager.cc',
'sstables/mc/writer.cc',
'sstables/mx/writer.cc',
'sstables/sstable_version.cc',
'sstables/compress.cc',
'sstables/partition.cc',
@@ -560,6 +560,7 @@ scylla_core = (['database.cc',
'sstables/m_format_read_helpers.cc',
'sstables/sstable_directory.cc',
'sstables/random_access_reader.cc',
'sstables/metadata_collector.cc',
'transport/cql_protocol_extension.cc',
'transport/event.cc',
'transport/event_notifier.cc',

View File

@@ -544,6 +544,16 @@ void database::set_format(sstables::sstable_version_types format) {
get_system_sstables_manager().set_format(format);
}
// Applies, through set_format(), the sstable format selected by the
// configuration flags:
//   - "md" when enable_sstables_md_format is set,
//   - otherwise "mc" when enable_sstables_mc_format is set,
//   - otherwise "la".
// The md flag is consulted first, so it takes precedence over the mc flag.
void database::set_format_by_config() {
    auto chosen = sstables::sstable_version_types::la;
    if (_cfg.enable_sstables_md_format()) {
        chosen = sstables::sstable_version_types::md;
    } else if (_cfg.enable_sstables_mc_format()) {
        chosen = sstables::sstable_version_types::mc;
    }
    set_format(chosen);
}
database::~database() {
_read_concurrency_sem.clear_inactive_reads();
_streaming_concurrency_sem.clear_inactive_reads();

View File

@@ -1534,6 +1534,7 @@ public:
}
void set_format(sstables::sstable_version_types format);
void set_format_by_config();
future<> flush_all_memtables();

View File

@@ -715,6 +715,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, cpu_scheduler(this, "cpu_scheduler", value_status::Used, true, "Enable cpu scheduling")
, view_building(this, "view_building", value_status::Used, true, "Enable view building; should only be set to false when the node is experience issues due to view building")
, enable_sstables_mc_format(this, "enable_sstables_mc_format", value_status::Used, true, "Enable SSTables 'mc' format to be used as the default file format")
, enable_sstables_md_format(this, "enable_sstables_md_format", value_status::Used, true, "Enable SSTables 'md' format to be used as the default file format (requires enable_sstables_mc_format)")
, enable_dangerous_direct_import_of_cassandra_counters(this, "enable_dangerous_direct_import_of_cassandra_counters", value_status::Used, false, "Only turn this option on if you want to import tables from Cassandra containing counters, and you are SURE that no counters in that table were created in a version earlier than Cassandra 2.1."
" It is not enough to have ever since upgraded to newer versions of Cassandra. If you EVER used a version earlier than 2.1 in the cluster where these SSTables come from, DO NOT TURN ON THIS OPTION! You will corrupt your data. You have been warned.")
, enable_shard_aware_drivers(this, "enable_shard_aware_drivers", value_status::Used, true, "Enable native transport drivers to use connection-per-shard for better performance")

View File

@@ -299,6 +299,7 @@ public:
named_value<bool> cpu_scheduler;
named_value<bool> view_building;
named_value<bool> enable_sstables_mc_format;
named_value<bool> enable_sstables_md_format;
named_value<bool> enable_dangerous_direct_import_of_cassandra_counters;
named_value<bool> enable_shard_aware_drivers;
named_value<bool> enable_ipv6_dns_lookup;

View File

@@ -45,8 +45,9 @@ sstables_format_selector::sstables_format_selector(gms::gossiper& g, sharded<gms
: _gossiper(g)
, _features(f)
, _db(db)
, _mc_feature_listener(*this, sstables::sstable_version_types::mc) {
}
, _mc_feature_listener(*this, sstables::sstable_version_types::mc)
, _md_feature_listener(*this, sstables::sstable_version_types::md)
{ }
future<> sstables_format_selector::maybe_select_format(sstables::sstable_version_types new_format) {
return with_gate(_sel, [this, new_format] {
@@ -75,6 +76,7 @@ future<> sstables_format_selector::start() {
assert(this_shard_id() == 0);
return read_sstables_format().then([this] {
_features.local().cluster_supports_mc_sstable().when_enabled(_mc_feature_listener);
_features.local().cluster_supports_md_sstable().when_enabled(_md_feature_listener);
return make_ready_future<>();
});
}

View File

@@ -62,6 +62,7 @@ class sstables_format_selector {
seastar::gate _sel;
feature_enabled_listener _mc_feature_listener;
feature_enabled_listener _md_feature_listener;
sstables::sstable_version_types _selected_format = sstables::sstable_version_types::la;
future<> select_format(sstables::sstable_version_types new_format);

View File

@@ -128,6 +128,7 @@ extern const std::string_view ROLES;
extern const std::string_view LA_SSTABLE;
extern const std::string_view STREAM_WITH_RPC_STREAM;
extern const std::string_view MC_SSTABLE;
extern const std::string_view MD_SSTABLE;
extern const std::string_view ROW_LEVEL_REPAIR;
extern const std::string_view TRUNCATION_TABLE;
extern const std::string_view CORRECT_STATIC_COMPACT_IN_MC;

View File

@@ -43,6 +43,7 @@ constexpr std::string_view features::ROLES = "ROLES";
constexpr std::string_view features::LA_SSTABLE = "LA_SSTABLE_FORMAT";
constexpr std::string_view features::STREAM_WITH_RPC_STREAM = "STREAM_WITH_RPC_STREAM";
constexpr std::string_view features::MC_SSTABLE = "MC_SSTABLE_FORMAT";
constexpr std::string_view features::MD_SSTABLE = "MD_SSTABLE_FORMAT";
constexpr std::string_view features::ROW_LEVEL_REPAIR = "ROW_LEVEL_REPAIR";
constexpr std::string_view features::TRUNCATION_TABLE = "TRUNCATION_TABLE";
constexpr std::string_view features::CORRECT_STATIC_COMPACT_IN_MC = "CORRECT_STATIC_COMPACT_IN_MC";
@@ -78,6 +79,7 @@ feature_service::feature_service(feature_config cfg) : _config(cfg)
, _roles_feature(*this, features::ROLES)
, _stream_with_rpc_stream_feature(*this, features::STREAM_WITH_RPC_STREAM)
, _mc_sstable_feature(*this, features::MC_SSTABLE)
, _md_sstable_feature(*this, features::MD_SSTABLE)
, _row_level_repair_feature(*this, features::ROW_LEVEL_REPAIR)
, _truncation_table(*this, features::TRUNCATION_TABLE)
, _correct_static_compact_in_mc(*this, features::CORRECT_STATIC_COMPACT_IN_MC)
@@ -101,8 +103,16 @@ feature_config feature_config_from_db_config(db::config& cfg, std::set<sstring>
fcfg._disabled_features = std::move(disabled);
if (!cfg.enable_sstables_mc_format()) {
if (cfg.enable_sstables_md_format()) {
throw std::runtime_error(
"You must use both enable_sstables_mc_format and enable_sstables_md_format "
"to enable SSTables md format support");
}
fcfg._disabled_features.insert(sstring(gms::features::MC_SSTABLE));
}
if (!cfg.enable_sstables_md_format()) {
fcfg._disabled_features.insert(sstring(gms::features::MD_SSTABLE));
}
if (!cfg.enable_user_defined_functions()) {
fcfg._disabled_features.insert(sstring(gms::features::UDF));
} else {
@@ -177,6 +187,7 @@ std::set<std::string_view> feature_service::known_feature_set() {
gms::features::PER_TABLE_CACHING,
gms::features::LWT,
gms::features::MC_SSTABLE,
gms::features::MD_SSTABLE,
gms::features::UDF,
gms::features::CDC,
};
@@ -256,6 +267,7 @@ void feature_service::enable(const std::set<std::string_view>& list) {
std::ref(_roles_feature),
std::ref(_stream_with_rpc_stream_feature),
std::ref(_mc_sstable_feature),
std::ref(_md_sstable_feature),
std::ref(_row_level_repair_feature),
std::ref(_truncation_table),
std::ref(_correct_static_compact_in_mc),

View File

@@ -90,6 +90,7 @@ private:
gms::feature _roles_feature;
gms::feature _stream_with_rpc_stream_feature;
gms::feature _mc_sstable_feature;
gms::feature _md_sstable_feature;
gms::feature _row_level_repair_feature;
gms::feature _truncation_table;
gms::feature _correct_static_compact_in_mc;
@@ -165,6 +166,10 @@ public:
return _mc_sstable_feature;
}
// Accessor for the gms::feature stored in _md_sstable_feature, i.e. the
// feature registered under features::MD_SSTABLE ("MD_SSTABLE_FORMAT").
const feature& cluster_supports_md_sstable() const {
return _md_sstable_feature;
}
// Accessor for the gms::feature stored in _cdc_feature.
const feature& cluster_supports_cdc() const {
return _cdc_feature;
}

View File

@@ -3699,6 +3699,7 @@ class scylla_features(gdb.Command):
"ROLES": true
"LA_SSTABLE_FORMAT": true
"MC_SSTABLE_FORMAT": true
"MD_SSTABLE_FORMAT": true
"STREAM_WITH_RPC_STREAM": true
"ROW_LEVEL_REPAIR": true
"TRUNCATION_TABLE": true

View File

@@ -1,76 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright (C) 2015 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <seastar/core/sstring.hh>
#include "schema_fwd.hh"
#include "compound_compat.hh"
#include <cmath>
#include <algorithm>
#include <vector>
// Helper that maintains, position by position, the smallest and largest
// clustering component values seen so far.
class column_name_helper {
private:
    // Makes sure v has at least target_size entries; it is never shrunk.
    static inline void may_grow(std::vector<bytes_opt>& v, size_t target_size) {
        if (v.size() < target_size) {
            v.resize(target_size);
        }
    }
public:
    // Folds `components` into the running per-position minimum and maximum.
    //
    // Both vectors are first grown to schema.clustering_key_size() entries.
    // The i-th value of `components` is then compared, using the i-th type
    // of the schema's clustering key, against max_seen[i] and min_seen[i].
    // A slot is overwritten with a copy of the value when it is still unset
    // or when the value compares greater (max) or smaller (min).
    //
    // Each position is tracked independently of the others. The index is not
    // bounds-checked, so `components` is expected to yield no more values
    // than there are clustering key types.
    template <typename T>
    static void min_max_components(const schema& schema, std::vector<bytes_opt>& min_seen, std::vector<bytes_opt>& max_seen, T components) {
        may_grow(min_seen, schema.clustering_key_size());
        may_grow(max_seen, schema.clustering_key_size());

        auto& types = schema.clustering_key_type()->types();
        auto pos = 0U;
        for (auto& value : components) {
            auto& type = types[pos];
            auto& highest = max_seen[pos];
            auto& lowest = min_seen[pos];

            if (!highest || type->compare(value, *highest) > 0) {
                highest = bytes(value.data(), value.size());
            }
            if (!lowest || type->compare(value, *lowest) < 0) {
                lowest = bytes(value.data(), value.size());
            }
            ++pos;
        }
    }
};

View File

@@ -486,6 +486,7 @@ protected:
void setup_new_sstable(shared_sstable& sst) {
_info->new_sstables.push_back(sst);
_new_unused_sstables.push_back(sst);
sst->make_metadata_collector();
sst->get_metadata_collector().set_replay_position(_rp);
sst->get_metadata_collector().sstable_level(_sstable_level);
for (auto ancestor : _ancestors) {
@@ -1482,6 +1483,9 @@ get_fully_expired_sstables(column_family& cf, const std::vector<sstables::shared
// Get ancestors from the metadata collector, which is empty after a restart. This works for our purpose because
// we only need to check that an sstable compacted *in this instance* does not have an undeleted ancestor.
// We do not get them from the sstable metadata because they are not available in the mc format.
if (!candidate->has_metadata_collector()) {
return false;
}
return boost::algorithm::any_of(candidate->get_metadata_collector().ancestors(), [&compacted_undeleted_gens] (auto gen) {
return compacted_undeleted_gens.count(gen);
});

View File

@@ -28,7 +28,7 @@
#include "column_translation.hh"
#include "m_format_read_helpers.hh"
#include "utils/overloaded_functor.hh"
#include "sstables/mc/parsers.hh"
#include "sstables/mx/parsers.hh"
namespace sstables {

View File

@@ -29,7 +29,7 @@
#include "sstables/prepended_input_stream.hh"
#include "tracing/traced_file.hh"
#include "sstables/scanning_clustered_index_cursor.hh"
#include "sstables/mc/bsearch_clustered_cursor.hh"
#include "sstables/mx/bsearch_clustered_cursor.hh"
namespace sstables {
@@ -376,7 +376,7 @@ class index_reader {
trace_state ? sst->filename(component_type::Index) : sstring(),
get_file(*sst, permit, trace_state),
get_file_input_stream_options(sst, pc), begin, end - begin,
(sst->get_version() == sstable_version_types::mc
(sst->get_version() >= sstable_version_types::mc
? std::make_optional(get_clustering_values_fixed_lengths(sst->get_serialization_header()))
: std::optional<column_values_fixed_lengths>{}),
trace_state)

View File

@@ -29,7 +29,7 @@
#include "sstables/types.hh"
#include "sstables/exceptions.hh"
#include "clustering_bounds_comparator.hh"
#include "sstables/mc/types.hh"
#include "sstables/mx/types.hh"
namespace sstables {

View File

@@ -0,0 +1,84 @@
/*
* Copyright (C) 2020 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "log.hh"
#include "metadata_collector.hh"
#include "range_tombstone.hh"
logging::logger mdclogger("metadata_collector");
namespace sstables {
// Appends the components of `from`, in order, to `to.elements`; each one is
// stored as a disk_string<uint16_t> holding a copy of the component's bytes.
// When `from` is disengaged nothing is appended and only a trace message
// is logged.
void metadata_collector::convert(disk_array<uint32_t, disk_string<uint16_t>>& to, const std::optional<clustering_key_prefix>& from) {
    if (from) {
        mdclogger.trace("{}: convert: {}", _name, clustering_key_prefix::with_schema_wrapper(_schema, *from));
        for (const auto& component : from->components()) {
            to.elements.push_back(disk_string<uint16_t>{bytes(component.data(), component.size())});
        }
    } else {
        mdclogger.trace("{}: convert: empty", _name);
    }
}
// Extends the tracked min/max clustering keys so that they cover `key`.
//
// The first key seen (no minimum recorded yet) seeds both ends. Afterwards
// `key` replaces the minimum when it sorts before it (both compared as
// inclusive start bounds) and replaces the maximum when it sorts after it
// (both compared as inclusive end bounds).
//
// The maximum is dereferenced whenever a minimum exists, so the two
// optionals are expected to be set together.
void metadata_collector::update_min_max_components(const clustering_key_prefix& key) {
    if (_min_clustering_key) {
        const bound_view::tri_compare cmp(_schema);

        // Lower end of the tracked range.
        if (cmp(bound_view(key, bound_kind::incl_start), bound_view(*_min_clustering_key, bound_kind::incl_start)) < 0) {
            mdclogger.trace("{}: setting min_clustering_key={}", _name, clustering_key_prefix::with_schema_wrapper(_schema, key));
            _min_clustering_key.emplace(key);
        }

        // Upper end of the tracked range.
        if (cmp(bound_view(key, bound_kind::incl_end), bound_view(*_max_clustering_key, bound_kind::incl_end)) > 0) {
            mdclogger.trace("{}: setting max_clustering_key={}", _name, clustering_key_prefix::with_schema_wrapper(_schema, key));
            _max_clustering_key.emplace(key);
        }
    } else {
        mdclogger.trace("{}: initializing min/max clustering keys={}", _name, clustering_key_prefix::with_schema_wrapper(_schema, key));
        _min_clustering_key.emplace(key);
        _max_clustering_key.emplace(key);
    }
}
// Extends the tracked min/max clustering keys so that they cover the range
// tombstone `rt`.
//
// The two ends are handled independently. rt.start becomes the minimum when
// none is recorded yet, or when rt.start_bound() sorts before the current
// minimum taken as an inclusive start bound. rt.end becomes the maximum when
// none is recorded yet, or when rt.end_bound() sorts after the current
// maximum taken as an inclusive end bound.
void metadata_collector::update_min_max_components(const range_tombstone& rt) {
    const bound_view::tri_compare cmp(_schema);

    if (_min_clustering_key) {
        if (cmp(rt.start_bound(), bound_view(*_min_clustering_key, bound_kind::incl_start)) < 0) {
            mdclogger.trace("{}: updating min_clustering_key to rt.start={}", _name, clustering_key_prefix::with_schema_wrapper(_schema, rt.start));
            _min_clustering_key.emplace(rt.start);
        }
    } else {
        mdclogger.trace("{}: initializing min_clustering_key to rt.start={}", _name, clustering_key_prefix::with_schema_wrapper(_schema, rt.start));
        _min_clustering_key.emplace(rt.start);
    }

    if (_max_clustering_key) {
        if (cmp(rt.end_bound(), bound_view(*_max_clustering_key, bound_kind::incl_end)) > 0) {
            mdclogger.trace("{}: updating max_clustering_key to rt.end={}", _name, clustering_key_prefix::with_schema_wrapper(_schema, rt.end));
            _max_clustering_key.emplace(rt.end);
        }
    } else {
        mdclogger.trace("{}: initializing max_clustering_key to rt.end={}", _name, clustering_key_prefix::with_schema_wrapper(_schema, rt.end));
        _max_clustering_key.emplace(rt.end);
    }
}
} // namespace sstables

View File

@@ -46,8 +46,12 @@
#include "utils/murmur_hash.hh"
#include "hyperloglog.hh"
#include "db/commitlog/replay_position.hh"
#include "clustering_bounds_comparator.hh"
#include <algorithm>
class range_tombstone;
namespace sstables {
static constexpr int TOMBSTONE_HISTOGRAM_BIN_SIZE = 100;
@@ -145,6 +149,8 @@ public:
return hll::HyperLogLog();
}
private:
const schema& _schema;
sstring _name;
// EH of 150 can track a max value of 1697806495183, i.e., > 1.5PB
utils::estimated_histogram _estimated_partition_size{150};
// EH of 114 can track a max value of 2395318855, i.e., > 2B cells
@@ -158,8 +164,8 @@ private:
std::set<int> _ancestors;
utils::streaming_histogram _estimated_tombstone_drop_time{TOMBSTONE_HISTOGRAM_BIN_SIZE};
int _sstable_level = 0;
std::vector<bytes_opt> _min_column_names;
std::vector<bytes_opt> _max_column_names;
std::optional<clustering_key_prefix> _min_clustering_key;
std::optional<clustering_key_prefix> _max_clustering_key;
bool _has_legacy_counter_shards = false;
uint64_t _columns_count = 0;
uint64_t _rows_count = 0;
@@ -172,20 +178,23 @@ private:
*/
hll::HyperLogLog _cardinality = hyperloglog(13, 25);
private:
/*
* Convert a vector of bytes into a disk array of disk_string<uint16_t>.
*/
static void convert(disk_array<uint32_t, disk_string<uint16_t>>&to, std::vector<bytes_opt>&& from) {
for (auto i = 0U; i < from.size(); i++) {
if (!from[i]) {
break;
}
disk_string<uint16_t> s;
s.value = std::move(from[i].value());
to.elements.push_back(std::move(s));
void convert(disk_array<uint32_t, disk_string<uint16_t>>&to, const std::optional<clustering_key_prefix>& from);
public:
// Creates a collector bound to `schema`.
//
// `schema` is kept by reference (_schema is a reference member), so it must
// outlive the collector. `name` is taken by value and moved into _name,
// which avoids a second copy of the string.
//
// When the schema has no clustering key columns, the min/max clustering keys
// are seeded right away with an empty prefix.
explicit metadata_collector(const schema& schema, sstring name)
    : _schema(schema)
    , _name(std::move(name))
{
    if (!schema.clustering_key_size()) {
        // Empty min/max components represent the full range,
        // and so they will never be narrowed down.
        update_min_max_components(clustering_key_prefix::make_empty(_schema));
    }
}
public:
// Returns the schema this collector was constructed with. Const-qualified
// because it only reads state, so it can also be called through a const
// metadata_collector.
const schema& get_schema() const {
    return _schema;
}
void add_key(bytes_view key) {
long hashed = utils::murmur_hash::hash2_64(key, 0);
_cardinality.offer_hashed(hashed);
@@ -231,18 +240,14 @@ public:
_sstable_level = sstable_level;
}
std::vector<bytes_opt>& min_column_names() {
return _min_column_names;
}
std::vector<bytes_opt>& max_column_names() {
return _max_column_names;
}
// Sticky flag: a `true` argument sets _has_legacy_counter_shards, and a
// `false` argument leaves the current value unchanged.
void update_has_legacy_counter_shards(bool has_legacy_counter_shards) {
    if (has_legacy_counter_shards) {
        _has_legacy_counter_shards = true;
    }
}
void update_min_max_components(const clustering_key_prefix& key);
void update_min_max_components(const range_tombstone& rt);
void update(column_stats&& stats) {
_timestamp_tracker.update(stats.timestamp_tracker);
_local_deletion_time_tracker.update(stats.local_deletion_time_tracker);
@@ -277,8 +282,8 @@ public:
m.estimated_tombstone_drop_time = std::move(_estimated_tombstone_drop_time);
m.sstable_level = _sstable_level;
m.repaired_at = _repaired_at;
convert(m.min_column_names, std::move(_min_column_names));
convert(m.max_column_names, std::move(_max_column_names));
convert(m.min_column_names, _min_clustering_key);
convert(m.max_column_names, _max_clustering_key);
m.has_legacy_counter_shards = _has_legacy_counter_shards;
m.columns_count = _columns_count;
m.rows_count = _rows_count;

View File

@@ -19,14 +19,14 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sstables/mc/writer.hh"
#include "sstables/mx/writer.hh"
#include "sstables/writer.hh"
#include "encoding_stats.hh"
#include "schema.hh"
#include "mutation_fragment.hh"
#include "vint-serialization.hh"
#include "sstables/types.hh"
#include "sstables/mc/types.hh"
#include "sstables/mx/types.hh"
#include "db/config.hh"
#include "atomic_cell.hh"
#include "utils/exceptions.hh"
@@ -192,20 +192,20 @@ public:
template <typename W>
requires Writer<W>
static void write(W& out, const clustering_block& block) {
static void write(sstable_version_types v, W& out, const clustering_block& block) {
write_vint(out, block.header);
for (const auto& [value, type]: block.values) {
write_cell_value(out, type, value);
write_cell_value(v, out, type, value);
}
}
template <typename W>
requires Writer<W>
void write_clustering_prefix(W& out, const schema& s,
void write_clustering_prefix(sstable_version_types v, W& out, const schema& s,
const clustering_key_prefix& prefix, ephemerally_full_prefix is_ephemerally_full) {
clustering_blocks_input_range range{s, prefix, is_ephemerally_full};
for (const auto block: range) {
write(out, block);
write(v, out, block);
}
}
@@ -896,6 +896,9 @@ void writer::drain_tombstones(std::optional<position_in_partition_view> pos) {
position_in_partition::equal_compare eq{_schema};
while (auto mfo = get_next_rt()) {
range_tombstone rt {std::move(mfo)->as_range_tombstone()};
_sst.get_metadata_collector().update_min_max_components(rt);
bool need_write_start = true;
if (_end_open_marker) {
if (eq(_end_open_marker->position(), rt.position())) {
@@ -979,6 +982,10 @@ void writer::consume(tombstone t) {
_pi_write_m.tomb = t;
_tombstone_written = true;
if (t) {
_sst.get_metadata_collector().update_min_max_components(clustering_key_prefix::make_empty(_schema));
}
}
void writer::write_cell(bytes_ostream& writer, const clustering_key_prefix* clustering_key, atomic_cell_view cell,
@@ -1033,14 +1040,14 @@ void writer::write_cell(bytes_ostream& writer, const clustering_key_prefix* clus
if (!is_deleted) {
assert(!cell.is_counter_update());
counter_cell_view::with_linearized(cell, [&] (counter_cell_view ccv) {
write_counter_value(ccv, writer, sstable_version_types::mc, [] (bytes_ostream& out, uint32_t value) {
write_counter_value(ccv, writer, _sst.get_version(), [] (bytes_ostream& out, uint32_t value) {
return write_vint(out, value);
});
});
}
} else {
if (has_value) {
write_cell_value(writer, *cdef.type, cell.value());
write_cell_value(_sst.get_version(), writer, *cdef.type, cell.value());
}
}
@@ -1249,7 +1256,7 @@ void writer::write_clustered(const clustering_row& clustered_row, uint64_t prev_
write(_sst.get_version(), *_data_writer, ext_flags);
}
write_clustering_prefix(*_data_writer, _schema, clustered_row.key(), ephemerally_full_prefix{_schema.is_compact_table()});
write_clustering_prefix(_sst.get_version(), *_data_writer, _schema, clustered_row.key(), ephemerally_full_prefix{_schema.is_compact_table()});
write_vint(_tmp_bufs, prev_row_size);
write_row_body(_tmp_bufs, clustered_row, has_complex_deletion);
@@ -1259,10 +1266,7 @@ void writer::write_clustered(const clustering_row& clustered_row, uint64_t prev_
flush_tmp_bufs();
// Collect statistics
if (_schema.clustering_key_size()) {
column_name_helper::min_max_components(_schema, _sst.get_metadata_collector().min_column_names(),
_sst.get_metadata_collector().max_column_names(), clustered_row.key().components());
}
_sst.get_metadata_collector().update_min_max_components(clustered_row.key());
collect_row_stats(_data_writer->offset() - current_pos, &clustered_row.key());
}
@@ -1280,17 +1284,17 @@ stop_iteration writer::consume(clustering_row&& cr) {
// Write clustering prefix along with its bound kind and, if not full, its size
template <typename W>
requires Writer<W>
static void write_clustering_prefix(W& writer, bound_kind_m kind,
static void write_clustering_prefix(sstable_version_types v, W& writer, bound_kind_m kind,
const schema& s, const clustering_key_prefix& clustering) {
assert(kind != bound_kind_m::static_clustering);
write(sstable_version_types::mc, writer, kind);
write(v, writer, kind);
auto is_ephemerally_full = ephemerally_full_prefix{s.is_compact_table()};
if (kind != bound_kind_m::clustering) {
// Don't use ephemerally full for RT bounds as they're always non-full
is_ephemerally_full = ephemerally_full_prefix::no;
write(sstable_version_types::mc, writer, static_cast<uint16_t>(clustering.size(s)));
write(v, writer, static_cast<uint16_t>(clustering.size(s)));
}
write_clustering_prefix(writer, s, clustering, is_ephemerally_full);
write_clustering_prefix(v, writer, s, clustering, is_ephemerally_full);
}
void writer::write_promoted_index() {
@@ -1313,19 +1317,19 @@ void writer::write_pi_block(const pi_block& block) {
bytes_ostream& blocks = _pi_write_m.blocks;
uint32_t offset = blocks.size();
write(_sst.get_version(), _pi_write_m.offsets, offset);
write_clustering_prefix(blocks, block.first.kind, _schema, block.first.clustering);
write_clustering_prefix(blocks, block.last.kind, _schema, block.last.clustering);
write_clustering_prefix(_sst.get_version(), blocks, block.first.kind, _schema, block.first.clustering);
write_clustering_prefix(_sst.get_version(), blocks, block.last.kind, _schema, block.last.clustering);
write_vint(blocks, block.offset);
write_signed_vint(blocks, block.width - width_base);
write(_sst.get_version(), blocks, static_cast<std::byte>(block.open_marker ? 1 : 0));
if (block.open_marker) {
write(sstable_version_types::mc, blocks, to_deletion_time(*block.open_marker));
write(_sst.get_version(), blocks, to_deletion_time(*block.open_marker));
}
}
void writer::write_clustered(const rt_marker& marker, uint64_t prev_row_size) {
write(sstable_version_types::mc, *_data_writer, row_flags::is_marker);
write_clustering_prefix(*_data_writer, marker.kind, _schema, marker.clustering);
write(_sst.get_version(), *_data_writer, row_flags::is_marker);
write_clustering_prefix(_sst.get_version(), *_data_writer, marker.kind, _schema, marker.clustering);
auto write_marker_body = [this, &marker] (bytes_ostream& writer) {
write_delta_deletion_time(writer, marker.tomb);
_c_stats.update(marker.tomb);
@@ -1339,11 +1343,6 @@ void writer::write_clustered(const rt_marker& marker, uint64_t prev_row_size) {
write_marker_body(_tmp_bufs);
write_vint(*_data_writer, _tmp_bufs.size());
flush_tmp_bufs();
if (_schema.clustering_key_size()) {
column_name_helper::min_max_components(_schema, _sst.get_metadata_collector().min_column_names(),
_sst.get_metadata_collector().max_column_names(), marker.clustering.components());
}
}
void writer::consume(rt_marker&& marker) {

View File

@@ -480,7 +480,7 @@ void mp_row_consumer_reader::on_next_partition(dht::decorated_key key, tombstone
flat_mutation_reader sstable::read_rows_flat(schema_ptr schema, reader_permit permit, const io_priority_class& pc,
streamed_mutation::forwarding fwd) {
get_stats().on_sstable_partition_read();
if (_version == version_types::mc) {
if (_version >= version_types::mc) {
return make_flat_mutation_reader<sstable_mutation_reader<data_consume_rows_context_m, mp_row_consumer_m>>(
shared_from_this(), std::move(schema), std::move(permit), pc, tracing::trace_state_ptr(), fwd, default_read_monitor());
}
@@ -499,7 +499,7 @@ sstables::sstable::read_row_flat(schema_ptr schema,
read_monitor& mon)
{
get_stats().on_single_partition_read();
if (_version == version_types::mc) {
if (_version >= version_types::mc) {
return make_flat_mutation_reader<sstable_mutation_reader<data_consume_rows_context_m, mp_row_consumer_m>>(
shared_from_this(), std::move(schema), std::move(permit), std::move(key), slice, pc,
std::move(trace_state), fwd, mutation_reader::forwarding::no, mon);
@@ -519,7 +519,7 @@ sstable::read_range_rows_flat(schema_ptr schema,
mutation_reader::forwarding fwd_mr,
read_monitor& mon) {
get_stats().on_range_partition_read();
if (_version == version_types::mc) {
if (_version >= version_types::mc) {
return make_flat_mutation_reader<sstable_mutation_reader<data_consume_rows_context_m, mp_row_consumer_m>>(
shared_from_this(), std::move(schema), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr, mon);
}

View File

@@ -51,6 +51,7 @@ sstable_version_constants::get_component_map(sstable_version_types version) {
case sstable_version_types::la:
return sstable_version_constants_k_l::_component_map;
case sstable_version_types::mc:
case sstable_version_types::md:
return sstable_version_constants_m::_component_map;
}
// Should never reach this.

View File

@@ -38,7 +38,7 @@
#include "dht/sharder.hh"
#include "types.hh"
#include "mc/writer.hh"
#include "sstables/mx/writer.hh"
#include "writer.hh"
#include "writer_impl.hh"
#include "m_format_read_helpers.hh"
@@ -199,6 +199,7 @@ std::unordered_map<sstable::version_types, sstring, enum_hash<sstable::version_t
{ sstable::version_types::ka , "ka" },
{ sstable::version_types::la , "la" },
{ sstable::version_types::mc , "mc" },
{ sstable::version_types::md , "md" },
};
std::unordered_map<sstable::format_types, sstring, enum_hash<sstable::format_types>> sstable::_format_string = {
@@ -663,7 +664,7 @@ future<> parse(const schema& schema, sstable_version_types v, random_access_read
case metadata_type::Stats:
return parse<stats_metadata>(schema, v, in, s.contents[type]);
case metadata_type::Serialization:
if (v != sstable_version_types::mc) {
if (v < sstable_version_types::mc) {
throw std::runtime_error(
"Statistics is malformed: SSTable is in 2.x format but contains serialization header.");
} else {
@@ -1209,28 +1210,21 @@ void sstable::validate_min_max_metadata() {
}
// The min/max metadata is wrong if:
// 1) it's not empty and schema defines no clustering key.
// 2) their size differ.
// 3) column name is stored instead of clustering value.
// 4) clustering component is stored as composite.
if ((!_schema->clustering_key_size() && (min_column_names.size() || max_column_names.size())) ||
(min_column_names.size() != max_column_names.size())) {
// - it's not empty and schema defines no clustering key.
//
// Notes:
// - we are going to rely on min/max column names for
// clustering filtering only from md-format sstables,
// see sstable::may_contain_rows().
// We choose not to clear_incorrect_min_max_column_names
// from older versions here as this disturbs sstable unit tests.
//
// - now that we store min/max metadata for range tombstones,
// their size may differ.
if (!_schema->clustering_key_size()) {
clear_incorrect_min_max_column_names();
return;
}
for (auto i = 0U; i < min_column_names.size(); i++) {
if (_schema->get_column_definition(min_column_names[i].value) || _schema->get_column_definition(max_column_names[i].value)) {
clear_incorrect_min_max_column_names();
break;
}
if (_schema->is_compound() && _schema->clustering_key_size() > 1 && _schema->is_dense() &&
(composite_view(min_column_names[i].value).is_valid() || composite_view(max_column_names[i].value).is_valid())) {
clear_incorrect_min_max_column_names();
break;
}
}
}
void sstable::validate_max_local_deletion_time() {
@@ -1241,23 +1235,29 @@ void sstable::validate_max_local_deletion_time() {
}
}
void sstable::set_clustering_components_ranges() {
void sstable::set_position_range() {
if (!_schema->clustering_key_size()) {
return;
}
auto& min_column_names = get_stats_metadata().min_column_names.elements;
auto& max_column_names = get_stats_metadata().max_column_names.elements;
auto s = std::min(min_column_names.size(), max_column_names.size());
_clustering_components_ranges.reserve(s);
for (auto i = 0U; i < s; i++) {
auto r = nonwrapping_range<bytes_view>({{ min_column_names[i].value, true }}, {{ max_column_names[i].value, true }});
_clustering_components_ranges.push_back(std::move(r));
auto& min_elements = get_stats_metadata().min_column_names.elements;
auto& max_elements = get_stats_metadata().max_column_names.elements;
if (min_elements.empty() && max_elements.empty()) {
return;
}
}
const std::vector<nonwrapping_range<bytes_view>>& sstable::clustering_components_ranges() const {
return _clustering_components_ranges;
auto pip = [] (const utils::chunked_vector<disk_string<uint16_t>>& column_names, bound_kind kind) {
std::vector<bytes> key_bytes;
key_bytes.reserve(column_names.size());
for (auto& value : column_names) {
key_bytes.emplace_back(bytes_view(value));
}
auto ckp = clustering_key_prefix(std::move(key_bytes));
return position_in_partition(position_in_partition::range_tag_t(), kind, std::move(ckp));
};
_position_range = position_range(pip(min_elements, bound_kind::incl_start), pip(max_elements, bound_kind::incl_end));
}
double sstable::estimate_droppable_tombstone_ratio(gc_clock::time_point gc_before) const {
@@ -1370,7 +1370,7 @@ future<> sstable::update_info_for_opened_data() {
}
return make_ready_future<>();
}).then([this] {
this->set_clustering_components_ranges();
this->set_position_range();
this->set_first_and_last_keys();
_run_identifier = _components->scylla_metadata->get_optional_run_identifier().value_or(utils::make_random_uuid());
@@ -1411,7 +1411,7 @@ future<> sstable::read_filter(const io_priority_class& pc) {
read_simple<component_type::Filter>(filter, pc).get();
auto nr_bits = filter.buckets.elements.size() * std::numeric_limits<typename decltype(filter.buckets.elements)::value_type>::digits;
large_bitset bs(nr_bits, std::move(filter.buckets.elements));
utils::filter_format format = (_version == sstable_version_types::mc)
utils::filter_format format = (_version >= sstable_version_types::mc)
? utils::filter_format::m_format
: utils::filter_format::k_l_format;
_components->filter = utils::filter::create_filter(filter.hashes, std::move(bs), format);
@@ -1760,10 +1760,7 @@ void sstable::write_clustered_row(file_writer& out, const schema& schema, const
maybe_write_row_marker(out, schema, clustered_row.marker(), clustering_key);
maybe_write_row_tombstone(out, clustering_key, clustered_row);
if (schema.clustering_key_size()) {
column_name_helper::min_max_components(schema, _collector.min_column_names(), _collector.max_column_names(),
clustered_row.key().components());
}
get_metadata_collector().update_min_max_components(clustered_row.key());
// Write all cells of a partition's row.
clustered_row.cells().for_each_cell([&] (column_id id, const atomic_cell_or_collection& c) {
@@ -2058,7 +2055,7 @@ void components_writer::consume_new_partition(const dht::decorated_key& dk) {
maybe_add_summary_entry(dk.token(), bytes_view(*_partition_key));
_sst._components->filter->add(bytes_view(*_partition_key));
_sst._collector.add_key(bytes_view(*_partition_key));
_sst.get_metadata_collector().add_key(bytes_view(*_partition_key));
auto p_key = disk_string_view<uint16_t>();
p_key.value = bytes_view(*_partition_key);
@@ -2172,7 +2169,7 @@ stop_iteration components_writer::consume_end_of_partition() {
_sst.get_large_data_handler().maybe_log_too_many_rows(_sst, *_partition_key, _sst._c_stats.rows_count);
// update is about merging column_stats with the data being stored by collector.
_sst._collector.update(std::move(_sst._c_stats));
_sst.get_metadata_collector().update(std::move(_sst._c_stats));
_sst._c_stats.reset();
if (!_first_key) {
@@ -2195,11 +2192,11 @@ void components_writer::consume_end_of_stream() {
_index.close();
if (_sst.has_component(component_type::CompressionInfo)) {
_sst._collector.add_compression_ratio(_sst._components->compression.compressed_file_length(), _sst._components->compression.uncompressed_file_length());
_sst.get_metadata_collector().add_compression_ratio(_sst._components->compression.compressed_file_length(), _sst._components->compression.uncompressed_file_length());
}
_sst.set_first_and_last_keys();
seal_statistics(_sst.get_version(), _sst._components->statistics, _sst._collector, _schema.get_partitioner().name(), _schema.bloom_filter_fp_chance(),
seal_statistics(_sst.get_version(), _sst._components->statistics, _sst.get_metadata_collector(), _schema.get_partitioner().name(), _schema.bloom_filter_fp_chance(),
_sst._schema, _sst.get_first_decorated_key(), _sst.get_last_decorated_key());
}
@@ -2250,6 +2247,13 @@ sstable::write_scylla_metadata(const io_priority_class& pc, shard_id shard, ssta
write_simple<component_type::Scylla>(*_components->scylla_metadata, pc);
}
// Construct the metadata collector for this sstable from its schema and
// file name. Does nothing when a collector has already been created.
void
sstable::make_metadata_collector() {
    if (_collector.has_value()) {
        return;
    }
    _collector.emplace(*get_schema(), get_filename());
}
void sstable::update_stats_on_end_of_stream()
{
if (_c_stats.capped_local_deletion_time) {
@@ -2257,6 +2261,29 @@ void sstable::update_stats_on_end_of_stream()
}
}
// Return true if this sstable possibly stores clustering row(s) that fall
// into any of the given ranges. A false result is only produced for
// md-format (or newer) sstables whose position range overlaps none of them.
bool sstable::may_contain_rows(const query::clustering_row_ranges& ranges) const {
    // Sstables older than the md format are never excluded by this check.
    if (_version < sstables::sstable_version_types::md) {
        return true;
    }

    // Sstables that were not written by scylla and hold tombstones are always
    // included: they may contain partition tombstones that are not taken into
    // account in the min/max column names metadata.
    // (For partition tombstones we clear the min/max metadata, so those
    // sstables match as containing the rows we're looking for.)
    if (!has_scylla_component() && get_stats_metadata().estimated_tombstone_drop_time.bin.size()) {
        return true;
    }

    // Otherwise, a single overlapping range is enough to keep the sstable.
    for (const query::clustering_range& range : ranges) {
        if (_position_range.overlaps(*_schema,
                position_in_partition_view::for_range_start(range),
                position_in_partition_view::for_range_end(range))) {
            return true;
        }
    }
    return false;
}
class sstable_writer_k_l : public sstable_writer::writer_impl {
bool _backup;
bool _leave_unsealed;
@@ -2379,7 +2406,8 @@ void sstable_writer_k_l::consume_end_of_stream()
sstable_writer::sstable_writer(sstable& sst, const schema& s, uint64_t estimated_partitions,
const sstable_writer_config& cfg, encoding_stats enc_stats, const io_priority_class& pc, shard_id shard) {
if (sst.get_version() == sstable_version_types::mc) {
sst.make_metadata_collector();
if (sst.get_version() >= sstable_version_types::mc) {
_impl = mc::make_writer(sst, s, estimated_partitions, cfg, enc_stats, pc, shard);
} else {
_impl = std::make_unique<sstable_writer_k_l>(sst, s, estimated_partitions, cfg, pc, shard);
@@ -2477,11 +2505,11 @@ future<> sstable::write_components(
encoding_stats stats,
const io_priority_class& pc) {
assert_large_data_handler_is_running();
if (cfg.replay_position) {
_collector.set_replay_position(cfg.replay_position.value());
}
return seastar::async([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema), cfg, stats, &pc] () mutable {
auto wr = get_writer(*schema, estimated_partitions, cfg, stats, pc);
if (cfg.replay_position) {
get_metadata_collector().set_replay_position(cfg.replay_position.value());
}
auto validator = mutation_fragment_stream_validating_filter(format("sstable writer {}", get_filename()), *schema,
cfg.validate_keys);
mr.consume_in_thread(std::move(wr), std::move(validator), db::no_timeout);
@@ -2539,7 +2567,7 @@ future<> sstable::generate_summary(const io_priority_class& pc) {
auto sem = std::make_unique<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits{});
auto ctx = make_lw_shared<index_consume_entry_context<summary_generator>>(
sem->make_permit(), s, trust_promoted_index::yes, *_schema, "", index_file, std::move(options), 0, index_size,
(_version == sstable_version_types::mc
(_version >= sstable_version_types::mc
? std::make_optional(get_clustering_values_fixed_lengths(get_serialization_header()))
: std::optional<column_values_fixed_lengths>{}));
return ctx->consume_input().finally([ctx] {
@@ -2638,6 +2666,7 @@ sstring sstable::component_basename(const sstring& ks, const sstring& cf, versio
case sstable::version_types::la:
return v + "-" + g + "-" + f + "-" + component;
case sstable::version_types::mc:
case sstable::version_types::md:
return v + "-" + g + "-" + f + "-" + component;
}
assert(0 && "invalid version");
@@ -2742,7 +2771,7 @@ future<> sstable::move_to_new_dir(sstring new_dir, int64_t new_generation, bool
}
entry_descriptor entry_descriptor::make_descriptor(sstring sstdir, sstring fname) {
static std::regex la_mc("(la|mc)-(\\d+)-(\\w+)-(.*)");
static std::regex la_mx("(la|m[cd])-(\\d+)-(\\w+)-(.*)");
static std::regex ka("(\\w+)-(\\w+)-ka-(\\d+)-(.*)");
static std::regex dir(".*/([^/]*)/([^/]+)-[\\da-fA-F]+(?:/staging|/upload|/snapshots/[^/]+)?/?");
@@ -2759,7 +2788,7 @@ entry_descriptor entry_descriptor::make_descriptor(sstring sstdir, sstring fname
sstlog.debug("Make descriptor sstdir: {}; fname: {}", sstdir, fname);
std::string s(fname);
if (std::regex_match(s, match, la_mc)) {
if (std::regex_match(s, match, la_mx)) {
std::string sdir(sstdir);
std::smatch dirmatch;
if (std::regex_match(sdir, dirmatch, dir)) {
@@ -2824,7 +2853,7 @@ input_stream<char> sstable::data_stream(uint64_t pos, size_t len, const io_prior
input_stream<char> stream;
if (_components->compression) {
if (_version == sstable_version_types::mc) {
if (_version >= sstable_version_types::mc) {
return make_compressed_file_m_format_input_stream(f, &_components->compression,
pos, len, std::move(options));
} else {

View File

@@ -61,6 +61,7 @@
#include "utils/observable.hh"
#include "sstables/shareable_components.hh"
#include "sstables/open_info.hh"
#include "query-request.hh"
#include <seastar/util/optimized_optional.hh>
#include <boost/intrusive/list.hpp>
@@ -305,7 +306,7 @@ public:
}
void add_ancestor(int64_t generation) {
_collector.add_ancestor(generation);
_collector->add_ancestor(generation);
}
// Returns true iff this sstable contains data which belongs to many shards.
@@ -408,8 +409,15 @@ public:
bool requires_view_building() const;
// Tells whether the optional metadata collector has been created.
bool has_metadata_collector() const {
    return static_cast<bool>(_collector);
}
metadata_collector& get_metadata_collector() {
return _collector;
if (!_collector.has_value()) {
on_internal_error(sstlog, "No metadata collector");
}
return *_collector;
}
std::vector<std::pair<component_type, sstring>> all_components() const;
@@ -468,7 +476,7 @@ private:
bool _open = false;
// NOTE: _collector and _c_stats are used to generate the statistics file
// when writing a new sstable.
metadata_collector _collector;
std::optional<metadata_collector> _collector;
column_stats _c_stats;
file _index_file;
file _data_file;
@@ -477,7 +485,7 @@ private:
uint64_t _filter_file_size = 0;
uint64_t _bytes_on_disk = 0;
db_clock::time_point _data_file_write_time;
std::vector<nonwrapping_range<bytes_view>> _clustering_components_ranges;
position_range _position_range = position_range::all_clustered_rows();
std::vector<unsigned> _shards;
std::optional<dht::decorated_key> _first;
std::optional<dht::decorated_key> _last;
@@ -611,11 +619,10 @@ private:
void set_first_and_last_keys();
// Create one range for each clustering component of this sstable.
// Each range stores min and max value for that specific component.
// Create a position range based on the min/max_column_names metadata of this sstable.
// It does nothing if schema defines no clustering key, and it's supposed
// to be called when loading an existing sstable or after writing a new one.
void set_clustering_components_ranges();
void set_position_range();
future<> create_data() noexcept;
@@ -680,6 +687,8 @@ private:
}
future<> open_or_create_data(open_flags oflags, file_open_options options = {}) noexcept;
void make_metadata_collector();
public:
future<> read_toc() noexcept;
@@ -715,7 +724,7 @@ public:
}
bool has_correct_max_deletion_time() const {
return (_version == sstable_version_types::mc) || has_scylla_component();
return (_version >= sstable_version_types::mc) || has_scylla_component();
}
bool filter_has_key(const key& key) const {
@@ -817,8 +826,6 @@ public:
return _components->summary;
}
const std::vector<nonwrapping_range<bytes_view>>& clustering_components_ranges() const;
// Gets ratio of droppable tombstone. A tombstone is considered droppable here
// for cells expired before gc_before and regular tombstones older than gc_before.
double estimate_droppable_tombstone_ratio(gc_clock::time_point gc_before) const;
@@ -833,6 +840,13 @@ public:
void update_stats_on_end_of_stream();
// True when this sstable's version is md or any later version in the
// sstable_version_types ordering; false for every earlier version.
bool has_correct_min_max_column_names() const noexcept {
    return !(_version < sstable_version_types::md);
}
// Return true if this sstable possibly stores clustering row(s) specified by ranges.
bool may_contain_rows(const query::clustering_row_ranges& ranges) const;
// Allow the test cases from sstable_test.cc to test private methods. We use
// a placeholder to avoid cluttering this class too much. The sstable_test class
// will then re-export as public every method it needs.
@@ -842,6 +856,8 @@ public:
friend class sstable_writer_k_l;
friend class mc::writer;
friend class index_reader;
friend class sstable_writer;
friend class compaction;
template <typename DataConsumeRowsContext>
friend data_consume_context<DataConsumeRowsContext>
data_consume_rows(const schema&, shared_sstable, typename DataConsumeRowsContext::consumer&, disk_read_range, uint64_t);

View File

@@ -28,7 +28,6 @@
#include "tombstone.hh"
#include "utils/streaming_histogram.hh"
#include "utils/estimated_histogram.hh"
#include "column_name_helper.hh"
#include "sstables/key.hh"
#include "db/commitlog/replay_position.hh"
#include "version.hh"
@@ -278,6 +277,7 @@ struct compaction_metadata : public metadata_base<compaction_metadata> {
auto describe_type(sstable_version_types v, Describer f) {
switch (v) {
case sstable_version_types::mc:
case sstable_version_types::md:
return f(
cardinality
);
@@ -319,6 +319,7 @@ struct stats_metadata : public metadata_base<stats_metadata> {
auto describe_type(sstable_version_types v, Describer f) {
switch (v) {
case sstable_version_types::mc:
case sstable_version_types::md:
return f(
estimated_partition_size,
estimated_cells_count,
@@ -389,6 +390,7 @@ struct serialization_header : public metadata_base<serialization_header> {
auto describe_type(sstable_version_types v, Describer f) {
switch (v) {
case sstable_version_types::mc:
case sstable_version_types::md:
return f(
min_timestamp_base,
min_local_deletion_time_base,

View File

@@ -27,13 +27,14 @@
namespace sstables {
enum class sstable_version_types { ka, la, mc };
enum class sstable_version_types { ka, la, mc, md };
enum class sstable_format_types { big };
inline std::array<sstable_version_types, 3> all_sstable_versions = {
inline std::array<sstable_version_types, 4> all_sstable_versions = {
sstable_version_types::ka,
sstable_version_types::la,
sstable_version_types::mc,
sstable_version_types::md,
};
inline sstable_version_types from_string(const seastar::sstring& format) {
@@ -46,6 +47,9 @@ inline sstable_version_types from_string(const seastar::sstring& format) {
if (format == "mc") {
return sstable_version_types::mc;
}
if (format == "md") {
return sstable_version_types::md;
}
throw std::invalid_argument("Wrong sstable format name: " + format);
}
@@ -54,14 +58,11 @@ inline seastar::sstring to_string(sstable_version_types format) {
case sstable_version_types::ka: return "ka";
case sstable_version_types::la: return "la";
case sstable_version_types::mc: return "mc";
case sstable_version_types::md: return "md";
}
throw std::runtime_error("Wrong sstable format");
}
// Returns true only when `format` is sstable_version_types::mc.
inline bool is_latest_supported(sstable_version_types format) {
return format == sstable_version_types::mc;
}
inline int operator<=>(sstable_version_types a, sstable_version_types b) {
auto to_int = [] (sstable_version_types x) {
return static_cast<std::underlying_type_t<sstable_version_types>>(x);

View File

@@ -544,26 +544,26 @@ void write_column_name(sstable_version_types v, Writer& out, const schema& s, co
template <typename W>
requires Writer<W>
void write_cell_value(W& out, const abstract_type& type, bytes_view value) {
void write_cell_value(sstable_version_types v, W& out, const abstract_type& type, bytes_view value) {
if (!value.empty()) {
if (type.value_length_if_fixed()) {
write(sstable_version_types::mc, out, value);
write(v, out, value);
} else {
write_vint(out, value.size());
write(sstable_version_types::mc, out, value);
write(v, out, value);
}
}
}
template <typename W>
requires Writer<W>
void write_cell_value(W& out, const abstract_type& type, atomic_cell_value_view value) {
void write_cell_value(sstable_version_types v, W& out, const abstract_type& type, atomic_cell_value_view value) {
if (!value.empty()) {
if (!type.value_length_if_fixed()) {
write_vint(out, value.size_bytes());
}
using boost::range::for_each;
for_each(value, [&] (bytes_view fragment) { write(sstable_version_types::mc, out, fragment); });
for_each(value, [&] (bytes_view fragment) { write(v, out, fragment); });
}
}

157
table.cc
View File

@@ -40,8 +40,6 @@
#include "query-result-writer.hh"
#include "db/view/view.hh"
#include <seastar/core/seastar.hh>
#include <boost/algorithm/cxx11/all_of.hpp>
#include <boost/algorithm/cxx11/any_of.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/adaptor/map.hpp>
#include "utils/error_injection.hh"
@@ -54,89 +52,10 @@ static seastar::metrics::label keyspace_label("ks");
using namespace std::chrono_literals;
// Stores ranges for all components of the same clustering key, index 0 referring to component
// range 0, and so on.
using ck_filter_clustering_key_components = std::vector<nonwrapping_range<bytes_view>>;
// Stores an entry for each clustering key range specified by the filter.
using ck_filter_clustering_key_ranges = std::vector<ck_filter_clustering_key_components>;
// Used to split a clustering key range into a range for each component.
// If a range in ck_filtering_all_ranges is composite, a range will be created
// for each component. If it's not composite, a single range is created.
// This split is needed to check for overlap in each component individually.
//
// Returns one entry per input range, in input order. A full input range is
// represented by a single range that is open-ended on both sides.
// Throws std::runtime_error if a range carries more components than the
// schema's clustering key defines.
static ck_filter_clustering_key_ranges
ranges_for_clustering_key_filter(const schema_ptr& schema, const query::clustering_row_ranges& ck_filtering_all_ranges) {
ck_filter_clustering_key_ranges ranges;
for (auto& r : ck_filtering_all_ranges) {
// this vector stores a range for each component of a key, only one if not composite.
ck_filter_clustering_key_components composite_ranges;
if (r.is_full()) {
ranges.push_back({ nonwrapping_range<bytes_view>::make_open_ended_both_sides() });
continue;
}
// A missing bound is treated as an empty prefix, i.e. it contributes no components.
auto start = r.start() ? r.start()->value().components() : clustering_key_prefix::make_empty().components();
auto end = r.end() ? r.end()->value().components() : clustering_key_prefix::make_empty().components();
auto start_it = start.begin();
auto end_it = end.begin();
// This test is enough because equal bounds in nonwrapping_range are inclusive.
// It also rejects filters that have more components than the schema's clustering key.
auto is_singular = [&schema] (const auto& type_it, const bytes_view& b1, const bytes_view& b2) {
if (type_it == schema->clustering_key_type()->types().end()) {
throw std::runtime_error(format("clustering key filter passed more components than defined in schema of {}.{}",
schema->ks_name(), schema->cf_name()));
}
return (*type_it)->compare(b1, b2) == 0;
};
auto type_it = schema->clustering_key_type()->types().begin();
composite_ranges.reserve(schema->clustering_key_size());
// the rule is to ignore any component cn if another component ck (k < n) is not of the form [v, v].
// If we have [v1, v1], [v2, v2], ... {vl3, vr3}, ....
// then we generate [v1, v1], [v2, v2], ... {vl3, vr3}. Where { = '(' or '[', etc.
// Note: type_it advances once per comparison, in step with start_it/end_it.
while (start_it != start.end() && end_it != end.end() && is_singular(type_it++, *start_it, *end_it)) {
composite_ranges.push_back(nonwrapping_range<bytes_view>({{ std::move(*start_it++), true }},
{{ std::move(*end_it++), true }}));
}
// handle a single non-singular tail element, if present
// (any components after that tail element are not turned into ranges).
if (start_it != start.end() && end_it != end.end()) {
composite_ranges.push_back(nonwrapping_range<bytes_view>({{ std::move(*start_it), r.start()->is_inclusive() }},
{{ std::move(*end_it), r.end()->is_inclusive() }}));
} else if (start_it != start.end()) {
// only the start bound has a component left: the range is open-ended on the right.
composite_ranges.push_back(nonwrapping_range<bytes_view>({{ std::move(*start_it), r.start()->is_inclusive() }}, {}));
} else if (end_it != end.end()) {
// only the end bound has a component left: the range is open-ended on the left.
composite_ranges.push_back(nonwrapping_range<bytes_view>({}, {{ std::move(*end_it), r.end()->is_inclusive() }}));
}
ranges.push_back(std::move(composite_ranges));
}
return ranges;
}
// Return true if this sstable possibly stores clustering row(s) specified by ranges.
//
// The answer is always true when the schema has no clustering key or the
// sstable exposes no per-component ranges. Otherwise a filter entry matches
// when each of its components (up to the number of components both sides
// have) is either a full range or overlaps the sstable's range for that
// component; one matching entry is sufficient.
static inline bool
contains_rows(const sstables::sstable& sst, const schema_ptr& schema, const ck_filter_clustering_key_ranges& ranges) {
    auto& component_types = schema->clustering_key_type()->types();
    auto& sst_component_ranges = sst.clustering_components_ranges();

    if (!schema->clustering_key_size() || sst_component_ranges.empty()) {
        return true;
    }

    for (const ck_filter_clustering_key_components& filter : ranges) {
        // Only compare the components present on both sides.
        const auto common = static_cast<unsigned>(std::min(filter.size(), sst_component_ranges.size()));
        bool every_component_matches = true;
        for (unsigned idx = 0; idx < common; ++idx) {
            auto& type = component_types[idx];
            if (filter[idx].is_full()) {
                continue;
            }
            if (!filter[idx].overlaps(sst_component_ranges[idx], type->as_tri_comparator())) {
                every_component_matches = false;
                break;
            }
        }
        if (every_component_matches) {
            return true;
        }
    }
    return false;
}
// Filter out sstables for reader using bloom filter and sstable metadata that keeps track
// of a range for each clustering component.
// Filter out sstables for reader using bloom filter
static std::vector<sstables::shared_sstable>
filter_sstable_for_reader(std::vector<sstables::shared_sstable>&& sstables, column_family& cf, const schema_ptr& schema,
const dht::partition_range& pr, const sstables::key& key, const query::partition_slice& slice) {
filter_sstable_for_reader_by_pk(std::vector<sstables::shared_sstable>&& sstables, column_family& cf, const schema_ptr& schema,
const dht::partition_range& pr, const sstables::key& key) {
const dht::ring_position& pr_key = pr.start()->value();
auto sstable_has_not_key = [&, cmp = dht::ring_position_comparator(*schema)] (const sstables::shared_sstable& sst) {
return cmp(pr_key, sst->get_first_decorated_key()) < 0 ||
@@ -144,16 +63,21 @@ filter_sstable_for_reader(std::vector<sstables::shared_sstable>&& sstables, colu
!sst->filter_has_key(key);
};
sstables.erase(boost::remove_if(sstables, sstable_has_not_key), sstables.end());
return sstables;
}
// FIXME: Workaround for https://github.com/scylladb/scylla/issues/3552
// and https://github.com/scylladb/scylla/issues/3553
const bool filtering_broken = true;
// Filter out sstables for reader using sstable metadata that keeps track
// of a range for each clustering component.
static std::vector<sstables::shared_sstable>
filter_sstable_for_reader_by_ck(std::vector<sstables::shared_sstable>&& sstables, column_family& cf, const schema_ptr& schema,
const query::partition_slice& slice) {
// no clustering filtering is applied if schema defines no clustering key or
// compaction strategy thinks it will not benefit from such an optimization.
if (filtering_broken || !schema->clustering_key_size() || !cf.get_compaction_strategy().use_clustering_key_filter()) {
return sstables;
// compaction strategy thinks it will not benefit from such an optimization,
// or the partition_slice includes static columns.
if (!schema->clustering_key_size() || !cf.get_compaction_strategy().use_clustering_key_filter() || slice.static_columns.size()) {
return sstables;
}
::cf_stats* stats = cf.cf_stats();
stats->clustering_filter_count++;
stats->sstables_checked_by_clustering_filter += sstables.size();
@@ -166,28 +90,11 @@ filter_sstable_for_reader(std::vector<sstables::shared_sstable>&& sstables, colu
stats->surviving_sstables_after_clustering_filter += sstables.size();
return sstables;
}
auto ranges = ranges_for_clustering_key_filter(schema, ck_filtering_all_ranges);
if (ranges.empty()) {
return {};
}
int64_t min_timestamp = std::numeric_limits<int64_t>::max();
auto sstable_has_clustering_key = [&min_timestamp, &schema, &ranges] (const sstables::shared_sstable& sst) {
if (!contains_rows(*sst, schema, ranges)) {
return false; // ordered after sstables that contain clustering rows.
} else {
min_timestamp = std::min(min_timestamp, sst->get_stats_metadata().min_timestamp);
return true;
}
};
auto sstable_has_relevant_tombstone = [&min_timestamp] (const sstables::shared_sstable& sst) {
const auto& stats = sst->get_stats_metadata();
// re-add sstable as candidate if it contains a tombstone that may cover a row in an included sstable.
return (stats.max_timestamp > min_timestamp && stats.estimated_tombstone_drop_time.bin.size());
};
auto skipped = std::partition(sstables.begin(), sstables.end(), sstable_has_clustering_key);
auto actually_skipped = std::partition(skipped, sstables.end(), sstable_has_relevant_tombstone);
sstables.erase(actually_skipped, sstables.end());
auto skipped = std::partition(sstables.begin(), sstables.end(), [&ranges = ck_filtering_all_ranges] (const sstables::shared_sstable& sst) {
return sst->may_contain_rows(ranges);
});
sstables.erase(skipped, sstables.end());
stats->surviving_sstables_after_clustering_filter += sstables.size();
return sstables;
@@ -287,18 +194,34 @@ create_single_key_sstable_reader(column_family* cf,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr)
{
auto key = sstables::key::from_partition_key(*schema, *pr.start()->value().key());
auto& pk = pr.start()->value().key();
auto key = sstables::key::from_partition_key(*schema, *pk);
auto selected_sstables = filter_sstable_for_reader_by_pk(sstables->select(pr), *cf, schema, pr, key);
auto num_sstables = selected_sstables.size();
if (!num_sstables) {
return make_empty_flat_reader(schema);
}
auto readers = boost::copy_range<std::vector<flat_mutation_reader>>(
filter_sstable_for_reader(sstables->select(pr), *cf, schema, pr, key, slice)
filter_sstable_for_reader_by_ck(std::move(selected_sstables), *cf, schema, slice)
| boost::adaptors::transformed([&] (const sstables::shared_sstable& sstable) {
tracing::trace(trace_state, "Reading key {} from sstable {}", pr, seastar::value_of([&sstable] { return sstable->get_filename(); }));
return sstable->read_row_flat(schema, permit, pr.start()->value(), slice, pc, trace_state, fwd);
})
);
if (readers.empty()) {
return make_empty_flat_reader(schema);
// If filter_sstable_for_reader_by_ck filtered any sstable that contains the partition
// we want to emit partition_start/end if no rows were found,
// to prevent https://github.com/scylladb/scylla/issues/3552.
//
// Use `flat_mutation_reader_from_mutations` with an empty mutation to emit
// the partition_start/end pair and append it to the list of readers passed
// to make_combined_reader to ensure partition_start/end are emitted even if
// all sstables actually containing the partition were filtered.
auto num_readers = readers.size();
if (num_readers != num_sstables) {
readers.push_back(flat_mutation_reader_from_mutations({mutation(schema, *pk)}, slice, fwd));
}
sstable_histogram.add(readers.size());
sstable_histogram.add(num_readers);
return make_combined_reader(schema, std::move(readers), fwd, fwd_mr);
}

View File

@@ -4653,3 +4653,84 @@ SEASTAR_THREAD_TEST_CASE(test_query_limit) {
}
}, std::move(cfg)).get();
}
// reproduces https://github.com/scylladb/scylla/issues/3552
// when clustering-key filtering is enabled in filter_sstable_for_reader
//
// Scenario, run with the md sstable format enabled and with `cs` as the
// table's compaction strategy class: write a single row ('a', 1), flush it,
// then query partition 'a' for a clustering key (0) that was never written.
static future<> test_clustering_filtering_with_compaction_strategy(const std::string_view& cs) {
auto db_config = make_shared<db::config>();
db_config->enable_sstables_md_format.set(true);
return do_with_cql_env_thread([&cs] (cql_test_env& e) {
cquery_nofail(e, format("CREATE TABLE cf(pk text, ck int, v text, PRIMARY KEY(pk, ck)) WITH COMPACTION = {{'class': '{}'}}", cs));
cquery_nofail(e, "INSERT INTO cf(pk, ck, v) VALUES ('a', 1, 'a1')");
// Flush memtables and clear the row cache on all database instances
// (presumably so that the reads below are served from sstables).
e.db().invoke_on_all([] (database& db) { return db.flush_all_memtables(); }).get();
e.db().invoke_on_all([] (database& db) { db.row_cache_tracker().clear(); }).get();
// The partition exists but has no row with ck=0: expect an empty result.
require_rows(e, "SELECT v FROM cf WHERE pk='a' AND ck=0 ALLOW FILTERING", {});
// The written row must still be returned by an unrestricted query.
require_rows(e, "SELECT v FROM cf", {{T("a1")}});
}, cql_test_config(db_config));
}
// Runs test_clustering_filtering_with_compaction_strategy for each of the
// compaction strategies listed below.
SEASTAR_TEST_CASE(test_clustering_filtering) {
static std::array<std::string_view, 2> test_compaction_strategies = {
"SizeTieredCompactionStrategy",
"TimeWindowCompactionStrategy",
};
return do_for_each(test_compaction_strategies,
test_clustering_filtering_with_compaction_strategy);
}
// Variant of the clustering filtering scenario with two partitions written
// before a single flush: rows ('a', 1) and ('b', 2). Runs with the md sstable
// format enabled and with `cs` as the table's compaction strategy class.
static future<> test_clustering_filtering_2_with_compaction_strategy(const std::string_view& cs) {
auto db_config = make_shared<db::config>();
db_config->enable_sstables_md_format.set(true);
return do_with_cql_env_thread([&cs] (cql_test_env& e) {
cquery_nofail(e, format("CREATE TABLE cf(pk text, ck int, v text, PRIMARY KEY(pk, ck)) WITH COMPACTION = {{'class': '{}'}}", cs));
cquery_nofail(e, "INSERT INTO cf(pk, ck, v) VALUES ('a', 1, 'a1')");
cquery_nofail(e, "INSERT INTO cf(pk, ck, v) VALUES ('b', 2, 'b2')");
// Flush memtables and clear the row cache on all database instances
// (presumably so that the reads below are served from sstables).
e.db().invoke_on_all([] (database& db) { return db.flush_all_memtables(); }).get();
e.db().invoke_on_all([] (database& db) { db.row_cache_tracker().clear(); }).get();
// Partition 'a' has no row with ck=0: expect an empty result.
require_rows(e, "SELECT v FROM cf WHERE pk='a' AND ck=0 ALLOW FILTERING", {});
// Both written rows must be returned by an unrestricted query.
require_rows(e, "SELECT v FROM cf", {{T("a1")}, {T("b2")}});
}, cql_test_config(db_config));
}
// Runs test_clustering_filtering_2_with_compaction_strategy for each of the
// compaction strategies listed below.
SEASTAR_TEST_CASE(test_clustering_filtering_2) {
static std::array<std::string_view, 2> test_compaction_strategies = {
"SizeTieredCompactionStrategy",
"TimeWindowCompactionStrategy",
};
return do_for_each(test_compaction_strategies,
test_clustering_filtering_2_with_compaction_strategy);
}
// Variant of the clustering filtering scenario with a flush after each
// insert: row ('a', 1) is flushed first, then row ('b', 0). Auto compaction
// is disabled on the table up front (presumably so the two flushes stay in
// separate sstables - confirm). The queried clustering key (0) exists, but
// only in partition 'b', not in the queried partition 'a'.
static future<> test_clustering_filtering_3_with_compaction_strategy(const std::string_view& cs) {
auto db_config = make_shared<db::config>();
db_config->enable_sstables_md_format.set(true);
return do_with_cql_env_thread([&cs] (cql_test_env& e) {
cquery_nofail(e, format("CREATE TABLE cf(pk text, ck int, v text, PRIMARY KEY(pk, ck)) WITH COMPACTION = {{'class': '{}'}}", cs));
e.db().invoke_on_all([] (database& db) {
auto& table = db.find_column_family("ks", "cf");
table.disable_auto_compaction();
}).get();
cquery_nofail(e, "INSERT INTO cf(pk, ck, v) VALUES ('a', 1, 'a1')");
e.db().invoke_on_all([] (database& db) { return db.flush_all_memtables(); }).get();
cquery_nofail(e, "INSERT INTO cf(pk, ck, v) VALUES ('b', 0, 'b0')");
e.db().invoke_on_all([] (database& db) { return db.flush_all_memtables(); }).get();
// Clear the row cache on all database instances
// (presumably so that the reads below are served from sstables).
e.db().invoke_on_all([] (database& db) { db.row_cache_tracker().clear(); }).get();
// Partition 'a' has no row with ck=0: expect an empty result.
require_rows(e, "SELECT v FROM cf WHERE pk='a' AND ck=0 ALLOW FILTERING", {});
// Both written rows must be returned by an unrestricted query.
require_rows(e, "SELECT v FROM cf", {{T("a1")}, {T("b0")}});
}, cql_test_config(db_config));
}
// Runs test_clustering_filtering_3_with_compaction_strategy for each of the
// compaction strategies listed below.
SEASTAR_TEST_CASE(test_clustering_filtering_3) {
static std::array<std::string_view, 2> test_compaction_strategies = {
"SizeTieredCompactionStrategy",
"TimeWindowCompactionStrategy",
};
return do_for_each(test_compaction_strategies,
test_clustering_filtering_3_with_compaction_strategy);
}

View File

@@ -563,8 +563,7 @@ struct sst_factory {
// Creates an sstable through env.make_sstable() for schema `s` at `path` with
// generation `gen`, using version `la` and format `big`, assigns `level` to it
// and returns it.
sstables::shared_sstable operator()() {
auto sst = env.make_sstable(s, path, gen, sstables::sstable::version_types::la, sstables::sstable::format_types::big);
//sst->set_sstable_level(level);
// NOTE(review): the level is applied both through the metadata collector and
// through set_sstable_level() below; this looks redundant -- confirm whether
// both calls are intended.
sst->get_metadata_collector().sstable_level(level);
sst->set_sstable_level(level);
return sst;
}

File diff suppressed because it is too large Load Diff

View File

@@ -3282,10 +3282,13 @@ SEASTAR_TEST_CASE(test_promoted_index_read) {
// Checks the min/max column names stored in the stats metadata of `sst`
// against the expected component lists.
//
// min_components: expected contents of st.min_column_names, in order.
// max_components: expected contents of st.max_column_names, in order.
//
// The two lists are validated independently (size first, then element by
// element). They are NOT required to have the same length: callers pass
// asymmetric expectations such as ({}, {"z"}) or ({"a"}, {}) for range
// tombstones that are unbounded on one side.
static void check_min_max_column_names(const sstable_ptr& sst, std::vector<bytes> min_components, std::vector<bytes> max_components) {
    const auto& st = sst->get_stats_metadata();
    // Log actual/expected sizes up front so a size mismatch is easy to diagnose.
    BOOST_TEST_MESSAGE(fmt::format("min {}/{} max {}/{}", st.min_column_names.elements.size(), min_components.size(), st.max_column_names.elements.size(), max_components.size()));
    BOOST_REQUIRE(st.min_column_names.elements.size() == min_components.size());
    for (auto i = 0U; i < st.min_column_names.elements.size(); i++) {
        BOOST_REQUIRE(min_components[i] == st.min_column_names.elements[i].value);
    }
    BOOST_REQUIRE(st.max_column_names.elements.size() == max_components.size());
    for (auto i = 0U; i < st.max_column_names.elements.size(); i++) {
        BOOST_REQUIRE(max_components[i] == st.max_column_names.elements[i].value);
    }
}
@@ -3357,6 +3360,47 @@ SEASTAR_TEST_CASE(min_max_clustering_key_test) {
test_min_max_clustering_key(s, {"key1"}, {{"a", "b"},
{"a", "c"}}, {"a", "b"}, {"a", "c"}, version);
}
{
auto s = schema_builder("ks", "cf")
.with_column("pk", utf8_type, column_kind::partition_key)
.with_column("ck1", utf8_type, column_kind::clustering_key)
.with_column("ck2", utf8_type, column_kind::clustering_key)
.with_column("r1", int32_type)
.build();
BOOST_TEST_MESSAGE(fmt::format("min_max_clustering_key_test: min={{\"a\", \"c\"}} max={{\"b\", \"a\"}} version={}", to_string(version)));
test_min_max_clustering_key(s, {"key1"}, {{"b", "a"}, {"a", "c"}}, {"a", "c"}, {"b", "a"}, version);
}
{
auto s = schema_builder("ks", "cf")
.with(schema_builder::compact_storage::yes)
.with_column("pk", utf8_type, column_kind::partition_key)
.with_column("ck1", utf8_type, column_kind::clustering_key)
.with_column("ck2", utf8_type, column_kind::clustering_key)
.with_column("r1", int32_type)
.build();
BOOST_TEST_MESSAGE(fmt::format("min_max_clustering_key_test: min={{\"a\", \"c\"}} max={{\"b\", \"a\"}} with compact storage version={}", to_string(version)));
test_min_max_clustering_key(s, {"key1"}, {{"b", "a"}, {"a", "c"}}, {"a", "c"}, {"b", "a"}, version);
}
{
auto s = schema_builder("ks", "cf")
.with_column("pk", utf8_type, column_kind::partition_key)
.with_column("ck1", utf8_type, column_kind::clustering_key)
.with_column("ck2", reversed_type_impl::get_instance(utf8_type), column_kind::clustering_key)
.with_column("r1", int32_type)
.build();
BOOST_TEST_MESSAGE(fmt::format("min_max_clustering_key_test: reversed order: min={{\"a\", \"z\"}} max={{\"a\", \"a\"}} version={}", to_string(version)));
test_min_max_clustering_key(s, {"key1"}, {{"a", "a"}, {"a", "z"}}, {"a", "z"}, {"a", "a"}, version);
}
{
auto s = schema_builder("ks", "cf")
.with_column("pk", utf8_type, column_kind::partition_key)
.with_column("ck1", utf8_type, column_kind::clustering_key)
.with_column("ck2", reversed_type_impl::get_instance(utf8_type), column_kind::clustering_key)
.with_column("r1", int32_type)
.build();
BOOST_TEST_MESSAGE(fmt::format("min_max_clustering_key_test: reversed order: min={{\"a\", \"a\"}} max={{\"b\", \"z\"}} version={}", to_string(version)));
test_min_max_clustering_key(s, {"key1"}, {{"b", "z"}, {"a", "a"}}, {"a", "a"}, {"b", "z"}, version);
}
{
auto s = schema_builder("ks", "cf")
.with_column("pk", utf8_type, column_kind::partition_key)
@@ -3382,6 +3426,19 @@ SEASTAR_TEST_CASE(min_max_clustering_key_test) {
.build();
test_min_max_clustering_key(s, {"key1"}, {}, {}, {}, version);
}
if (version >= sstable_version_types::mc) {
{
auto s = schema_builder("ks", "cf")
.with(schema_builder::compact_storage::yes)
.with_column("pk", utf8_type, column_kind::partition_key)
.with_column("ck1", utf8_type, column_kind::clustering_key)
.with_column("ck2", reversed_type_impl::get_instance(utf8_type), column_kind::clustering_key)
.with_column("r1", int32_type)
.build();
BOOST_TEST_MESSAGE(fmt::format("min_max_clustering_key_test: reversed order: min={{\"a\"}} max={{\"a\"}} with compact storage version={}", to_string(version)));
test_min_max_clustering_key(s, {"key1"}, {{"a", "z"}, {"a"}}, {"a"}, {"a"}, version);
}
}
}
});
}
@@ -3449,6 +3506,8 @@ SEASTAR_TEST_CASE(sstable_tombstone_metadata_check) {
auto c_key = clustering_key_prefix::from_exploded(*s, {to_bytes("c1")});
const column_definition& r1_col = *s->get_column_definition("r1");
BOOST_TEST_MESSAGE(fmt::format("version {}", to_string(version)));
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
@@ -3458,8 +3517,8 @@ SEASTAR_TEST_CASE(sstable_tombstone_metadata_check) {
auto sst = env.make_sstable(s, tmp.path().string(), 1, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 1, version).get0();
sstables::sstlog.warn("Version {}", (int)version);
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
check_min_max_column_names(sst, {"c1"}, {"c1"});
}
{
@@ -3471,6 +3530,7 @@ SEASTAR_TEST_CASE(sstable_tombstone_metadata_check) {
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 2, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
check_min_max_column_names(sst, {"c1"}, {"c1"});
}
{
@@ -3482,6 +3542,7 @@ SEASTAR_TEST_CASE(sstable_tombstone_metadata_check) {
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 3, version).get0();
BOOST_REQUIRE(!sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
check_min_max_column_names(sst, {"c1"}, {"c1"});
}
{
@@ -3501,6 +3562,7 @@ SEASTAR_TEST_CASE(sstable_tombstone_metadata_check) {
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 4, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
check_min_max_column_names(sst, {"c1"}, {"c1"});
}
{
@@ -3513,6 +3575,7 @@ SEASTAR_TEST_CASE(sstable_tombstone_metadata_check) {
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 5, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
check_min_max_column_names(sst, {}, {});
}
{
@@ -3527,6 +3590,535 @@ SEASTAR_TEST_CASE(sstable_tombstone_metadata_check) {
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 6, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
if (version >= sstable_version_types::mc) {
check_min_max_column_names(sst, {"a"}, {"a"});
}
}
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
tombstone tomb(api::new_timestamp(), gc_clock::now());
range_tombstone rt(
clustering_key_prefix::from_single_value(*s, bytes("a")),
clustering_key_prefix::from_single_value(*s, bytes("a")),
tomb);
m.partition().apply_delete(*s, std::move(rt));
mt->apply(std::move(m));
auto sst = env.make_sstable(s, tmp.path().string(), 7, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 7, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
if (version >= sstable_version_types::mc) {
check_min_max_column_names(sst, {"a"}, {"c1"});
}
}
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
tombstone tomb(api::new_timestamp(), gc_clock::now());
range_tombstone rt(
clustering_key_prefix::from_single_value(*s, bytes("c")),
clustering_key_prefix::from_single_value(*s, bytes("d")),
tomb);
m.partition().apply_delete(*s, std::move(rt));
mt->apply(std::move(m));
auto sst = env.make_sstable(s, tmp.path().string(), 8, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 8, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
if (version >= sstable_version_types::mc) {
check_min_max_column_names(sst, {"c"}, {"d"});
}
}
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
tombstone tomb(api::new_timestamp(), gc_clock::now());
range_tombstone rt(
clustering_key_prefix::from_single_value(*s, bytes("d")),
clustering_key_prefix::from_single_value(*s, bytes("z")),
tomb);
m.partition().apply_delete(*s, std::move(rt));
mt->apply(std::move(m));
auto sst = env.make_sstable(s, tmp.path().string(), 9, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 9, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
if (version >= sstable_version_types::mc) {
check_min_max_column_names(sst, {"c1"}, {"z"});
}
}
if (version >= sstable_version_types::mc) {
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
tombstone tomb(api::new_timestamp(), gc_clock::now());
range_tombstone rt(
bound_view::bottom(),
bound_view(clustering_key_prefix::from_single_value(*s, bytes("z")), bound_kind::incl_end),
tomb);
m.partition().apply_delete(*s, std::move(rt));
mt->apply(std::move(m));
auto sst = env.make_sstable(s, tmp.path().string(), 10, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 10, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
check_min_max_column_names(sst, {}, {"z"});
}
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
tombstone tomb(api::new_timestamp(), gc_clock::now());
range_tombstone rt(
bound_view(clustering_key_prefix::from_single_value(*s, bytes("a")), bound_kind::incl_start),
bound_view::top(),
tomb);
m.partition().apply_delete(*s, std::move(rt));
mt->apply(std::move(m));
auto sst = env.make_sstable(s, tmp.path().string(), 11, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 11, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
check_min_max_column_names(sst, {"a"}, {});
}
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
tombstone tomb(api::new_timestamp(), gc_clock::now());
m.partition().apply_delete(*s, clustering_key_prefix::make_empty(), tomb);
mt->apply(std::move(m));
auto sst = env.make_sstable(s, tmp.path().string(), 12, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 12, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
check_min_max_column_names(sst, {}, {});
}
}
}
});
}
// Checks tombstone-related stats metadata for a schema with a two-component
// clustering key (ck1, ck2), once per entry of all_sstable_versions.
//
// Every scenario below builds a memtable, writes it to an sstable with its own
// generation number (1..11), reloads that sstable with env.reusable_sst() and
// then checks:
//   * whether estimated_tombstone_drop_time has any bins, and
//   * the min/max column names, via check_min_max_column_names().
// For scenarios 6-9 the min/max check only runs for version mc and later;
// scenarios 10-11 are skipped entirely for earlier versions.
SEASTAR_TEST_CASE(sstable_composite_tombstone_metadata_check) {
return test_env::do_with_async([] (test_env& env) {
storage_service_for_tests ssft;
for (const auto version : all_sstable_versions) {
auto s = schema_builder("ks", "cf")
.with_column("pk", utf8_type, column_kind::partition_key)
.with_column("ck1", utf8_type, column_kind::clustering_key)
.with_column("ck2", utf8_type, column_kind::clustering_key)
.with_column("r1", int32_type)
.build();
auto tmp = tmpdir();
// Shared fixtures: partition "key1" and the full clustering key ("c1", "c2").
auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
auto c_key = clustering_key_prefix::from_exploded(*s, {to_bytes("c1"), to_bytes("c2")});
const column_definition& r1_col = *s->get_column_definition("r1");
BOOST_TEST_MESSAGE(fmt::format("version {}", to_string(version)));
// Scenario 1: a delete applied at c_key only.
// Expects drop-time bins and min == max == {"c1", "c2"}.
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
tombstone tomb(api::new_timestamp(), gc_clock::now());
m.partition().apply_delete(*s, c_key, tomb);
mt->apply(std::move(m));
auto sst = env.make_sstable(s, tmp.path().string(), 1, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 1, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
check_min_max_column_names(sst, {"c1", "c2"}, {"c1", "c2"});
}
// Scenario 2: a cell produced by make_dead_atomic_cell(3600) at c_key.
// Expects drop-time bins and min == max == {"c1", "c2"}.
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
m.set_clustered_cell(c_key, r1_col, make_dead_atomic_cell(3600));
mt->apply(std::move(m));
auto sst = env.make_sstable(s, tmp.path().string(), 2, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 2, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
check_min_max_column_names(sst, {"c1", "c2"}, {"c1", "c2"});
}
// Scenario 3: a regular cell at c_key and no deletion of any kind.
// Expects NO drop-time bins; min == max == {"c1", "c2"}.
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
mt->apply(std::move(m));
auto sst = env.make_sstable(s, tmp.path().string(), 3, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 3, version).get0();
BOOST_REQUIRE(!sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
check_min_max_column_names(sst, {"c1", "c2"}, {"c1", "c2"});
}
// Scenario 4: two partitions in one sstable -- "key1" carries a delete at
// c_key, "key2" carries a regular cell at the same clustering key.
// Expects drop-time bins and min == max == {"c1", "c2"}.
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
tombstone tomb(api::new_timestamp(), gc_clock::now());
m.partition().apply_delete(*s, c_key, tomb);
mt->apply(std::move(m));
auto key2 = partition_key::from_exploded(*s, {to_bytes("key2")});
mutation m2(s, key2);
m2.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
mt->apply(std::move(m2));
auto sst = env.make_sstable(s, tmp.path().string(), 4, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 4, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
check_min_max_column_names(sst, {"c1", "c2"}, {"c1", "c2"});
}
// Scenario 5: tombstone applied to the partition itself, no clustered data.
// Expects drop-time bins and empty min/max lists.
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
tombstone tomb(api::new_timestamp(), gc_clock::now());
m.partition().apply(tomb);
mt->apply(std::move(m));
auto sst = env.make_sstable(s, tmp.path().string(), 5, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 5, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
check_min_max_column_names(sst, {}, {});
}
// Scenario 6: only a range tombstone from ("a", "aa") to ("z", "zz").
// Expects drop-time bins; for mc and later, min == {"a", "aa"} and
// max == {"z", "zz"}.
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
tombstone tomb(api::new_timestamp(), gc_clock::now());
range_tombstone rt(
clustering_key_prefix::from_exploded(*s, {to_bytes("a"), to_bytes("aa")}),
clustering_key_prefix::from_exploded(*s, {to_bytes("z"), to_bytes("zz")}),
tomb);
m.partition().apply_delete(*s, std::move(rt));
mt->apply(std::move(m));
auto sst = env.make_sstable(s, tmp.path().string(), 6, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 6, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
if (version >= sstable_version_types::mc) {
check_min_max_column_names(sst, {"a", "aa"}, {"z", "zz"});
}
}
// Scenario 7: regular cell at c_key plus a range tombstone from the
// one-component prefix ("a") to ("a", "zz").
// For mc and later: min == {"a"} (from the tombstone start) and
// max == {"c1", "c2"} (from the row).
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
tombstone tomb(api::new_timestamp(), gc_clock::now());
range_tombstone rt(
clustering_key_prefix::from_exploded(*s, {to_bytes("a")}),
clustering_key_prefix::from_exploded(*s, {to_bytes("a"), to_bytes("zz")}),
tomb);
m.partition().apply_delete(*s, std::move(rt));
mt->apply(std::move(m));
auto sst = env.make_sstable(s, tmp.path().string(), 7, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 7, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
if (version >= sstable_version_types::mc) {
check_min_max_column_names(sst, {"a"}, {"c1", "c2"});
}
}
// Scenario 8: regular cell at c_key plus a range tombstone from
// ("c1", "aa") to ("c1", "zz").
// For mc and later: min == {"c1", "aa"}, max == {"c1", "zz"}.
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
tombstone tomb(api::new_timestamp(), gc_clock::now());
range_tombstone rt(
clustering_key_prefix::from_exploded(*s, {to_bytes("c1"), to_bytes("aa")}),
clustering_key_prefix::from_exploded(*s, {to_bytes("c1"), to_bytes("zz")}),
tomb);
m.partition().apply_delete(*s, std::move(rt));
mt->apply(std::move(m));
auto sst = env.make_sstable(s, tmp.path().string(), 8, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 8, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
if (version >= sstable_version_types::mc) {
check_min_max_column_names(sst, {"c1", "aa"}, {"c1", "zz"});
}
}
// Scenario 9: regular cell at c_key plus a range tombstone from
// ("c1", "d") to ("z", "zz").
// For mc and later: min == {"c1", "c2"} (from the row) and
// max == {"z", "zz"} (from the tombstone end).
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
tombstone tomb(api::new_timestamp(), gc_clock::now());
range_tombstone rt(
clustering_key_prefix::from_exploded(*s, {to_bytes("c1"), to_bytes("d")}),
clustering_key_prefix::from_exploded(*s, {to_bytes("z"), to_bytes("zz")}),
tomb);
m.partition().apply_delete(*s, std::move(rt));
mt->apply(std::move(m));
auto sst = env.make_sstable(s, tmp.path().string(), 9, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 9, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
if (version >= sstable_version_types::mc) {
check_min_max_column_names(sst, {"c1", "c2"}, {"z", "zz"});
}
}
// The remaining scenarios use range tombstones that are open on one side
// and are exercised only for version mc and later.
if (version >= sstable_version_types::mc) {
// Scenario 10: regular cell at c_key plus a range tombstone from
// bound_view::bottom() to an inclusive end at ("z").
// Expects an empty min list and max == {"z"}.
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
tombstone tomb(api::new_timestamp(), gc_clock::now());
range_tombstone rt(
bound_view::bottom(),
bound_view(clustering_key_prefix::from_single_value(*s, bytes("z")), bound_kind::incl_end),
tomb);
m.partition().apply_delete(*s, std::move(rt));
mt->apply(std::move(m));
auto sst = env.make_sstable(s, tmp.path().string(), 10, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 10, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
check_min_max_column_names(sst, {}, {"z"});
}
// Scenario 11: regular cell at c_key plus a range tombstone from an
// inclusive start at ("a") to bound_view::top().
// Expects min == {"a"} and an empty max list.
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
tombstone tomb(api::new_timestamp(), gc_clock::now());
range_tombstone rt(
bound_view(clustering_key_prefix::from_single_value(*s, bytes("a")), bound_kind::incl_start),
bound_view::top(),
tomb);
m.partition().apply_delete(*s, std::move(rt));
mt->apply(std::move(m));
auto sst = env.make_sstable(s, tmp.path().string(), 11, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 11, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
check_min_max_column_names(sst, {"a"}, {});
}
}
}
});
}
SEASTAR_TEST_CASE(sstable_composite_reverse_tombstone_metadata_check) {
return test_env::do_with_async([] (test_env& env) {
storage_service_for_tests ssft;
for (const auto version : all_sstable_versions) {
auto s = schema_builder("ks", "cf")
.with_column("pk", utf8_type, column_kind::partition_key)
.with_column("ck1", utf8_type, column_kind::clustering_key)
.with_column("ck2", reversed_type_impl::get_instance(utf8_type), column_kind::clustering_key)
.with_column("r1", int32_type)
.build();
auto tmp = tmpdir();
auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
auto c_key = clustering_key_prefix::from_exploded(*s, {to_bytes("c1"), to_bytes("c2")});
const column_definition& r1_col = *s->get_column_definition("r1");
BOOST_TEST_MESSAGE(fmt::format("version {}", to_string(version)));
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
tombstone tomb(api::new_timestamp(), gc_clock::now());
m.partition().apply_delete(*s, c_key, tomb);
mt->apply(std::move(m));
auto sst = env.make_sstable(s, tmp.path().string(), 1, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 1, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
check_min_max_column_names(sst, {"c1", "c2"}, {"c1", "c2"});
}
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
m.set_clustered_cell(c_key, r1_col, make_dead_atomic_cell(3600));
mt->apply(std::move(m));
auto sst = env.make_sstable(s, tmp.path().string(), 2, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 2, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
check_min_max_column_names(sst, {"c1", "c2"}, {"c1", "c2"});
}
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
mt->apply(std::move(m));
auto sst = env.make_sstable(s, tmp.path().string(), 3, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 3, version).get0();
BOOST_REQUIRE(!sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
check_min_max_column_names(sst, {"c1", "c2"}, {"c1", "c2"});
}
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
tombstone tomb(api::new_timestamp(), gc_clock::now());
m.partition().apply_delete(*s, c_key, tomb);
mt->apply(std::move(m));
auto key2 = partition_key::from_exploded(*s, {to_bytes("key2")});
mutation m2(s, key2);
m2.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
mt->apply(std::move(m2));
auto sst = env.make_sstable(s, tmp.path().string(), 4, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 4, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
check_min_max_column_names(sst, {"c1", "c2"}, {"c1", "c2"});
}
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
tombstone tomb(api::new_timestamp(), gc_clock::now());
m.partition().apply(tomb);
mt->apply(std::move(m));
auto sst = env.make_sstable(s, tmp.path().string(), 5, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 5, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
check_min_max_column_names(sst, {}, {});
}
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
tombstone tomb(api::new_timestamp(), gc_clock::now());
range_tombstone rt(
clustering_key_prefix::from_exploded(*s, {to_bytes("a"), to_bytes("zz")}),
clustering_key_prefix::from_exploded(*s, {to_bytes("a"), to_bytes("aa")}),
tomb);
m.partition().apply_delete(*s, std::move(rt));
mt->apply(std::move(m));
auto sst = env.make_sstable(s, tmp.path().string(), 6, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 6, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
if (version >= sstable_version_types::mc) {
check_min_max_column_names(sst, {"a", "zz"}, {"a", "aa"});
}
}
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
tombstone tomb(api::new_timestamp(), gc_clock::now());
range_tombstone rt(
clustering_key_prefix::from_exploded(*s, {to_bytes("a"), to_bytes("zz")}),
clustering_key_prefix::from_exploded(*s, {to_bytes("a")}),
tomb);
m.partition().apply_delete(*s, std::move(rt));
mt->apply(std::move(m));
auto sst = env.make_sstable(s, tmp.path().string(), 7, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 7, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
if (version >= sstable_version_types::mc) {
check_min_max_column_names(sst, {"a", "zz"}, {"c1", "c2"});
}
}
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
tombstone tomb(api::new_timestamp(), gc_clock::now());
range_tombstone rt(
clustering_key_prefix::from_exploded(*s, {to_bytes("c1"), to_bytes("zz")}),
clustering_key_prefix::from_exploded(*s, {to_bytes("c1")}),
tomb);
m.partition().apply_delete(*s, std::move(rt));
mt->apply(std::move(m));
auto sst = env.make_sstable(s, tmp.path().string(), 8, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 8, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
if (version >= sstable_version_types::mc) {
check_min_max_column_names(sst, {"c1", "zz"}, {"c1"});
}
}
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
tombstone tomb(api::new_timestamp(), gc_clock::now());
range_tombstone rt(
clustering_key_prefix::from_exploded(*s, {to_bytes("c1"), to_bytes("zz")}),
clustering_key_prefix::from_exploded(*s, {to_bytes("c1"), to_bytes("d")}),
tomb);
m.partition().apply_delete(*s, std::move(rt));
mt->apply(std::move(m));
auto sst = env.make_sstable(s, tmp.path().string(), 9, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 9, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
if (version >= sstable_version_types::mc) {
check_min_max_column_names(sst, {"c1", "zz"}, {"c1", "c2"});
}
}
if (version >= sstable_version_types::mc) {
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
tombstone tomb(api::new_timestamp(), gc_clock::now());
range_tombstone rt(
bound_view::bottom(),
bound_view(clustering_key_prefix::from_single_value(*s, bytes("z")), bound_kind::incl_end),
tomb);
m.partition().apply_delete(*s, std::move(rt));
mt->apply(std::move(m));
auto sst = env.make_sstable(s, tmp.path().string(), 10, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 10, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
check_min_max_column_names(sst, {}, {"z"});
}
{
auto mt = make_lw_shared<memtable>(s);
mutation m(s, key);
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
tombstone tomb(api::new_timestamp(), gc_clock::now());
range_tombstone rt(
bound_view(clustering_key_prefix::from_single_value(*s, bytes("a")), bound_kind::incl_start),
bound_view::top(),
tomb);
m.partition().apply_delete(*s, std::move(rt));
mt->apply(std::move(m));
auto sst = env.make_sstable(s, tmp.path().string(), 11, version, big);
write_memtable_to_sstable_for_test(*mt, sst).get();
sst = env.reusable_sst(s, tmp.path().string(), 11, version).get0();
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
check_min_max_column_names(sst, {"a"}, {});
}
}
}
});
@@ -4467,7 +5059,7 @@ SEASTAR_TEST_CASE(test_old_format_non_compound_range_tombstone_is_read) {
// delete from ks.test where pk = 1 and ck = 2;
return test_env::do_with_async([] (test_env& env) {
for (const auto version : all_sstable_versions) {
if (version != sstables::sstable::version_types::mc) { // Does not apply to 'mc' format
if (version < sstable_version_types::mc) { // Applies only to formats older than 'm'
auto s = schema_builder("ks", "test")
.with_column("pk", int32_type, column_kind::partition_key)
.with_column("ck", int32_type, column_kind::clustering_key)

View File

@@ -59,6 +59,7 @@
#include "service/storage_service.hh"
#include "db/system_keyspace.hh"
#include "db/system_distributed_keyspace.hh"
#include "db/sstables-format-selector.hh"
using namespace std::chrono_literals;
@@ -476,6 +477,10 @@ public:
db.stop().get();
});
db.invoke_on_all([] (database& db) {
db.set_format_by_config();
}).get();
auto stop_ms_fd_gossiper = defer([] {
gms::get_gossiper().stop().get();
});

View File

@@ -211,7 +211,7 @@ int main(int argc, char** argv) {
("read-concurrency", bpo::value<unsigned>()->default_value(1), "Concurrency of reads, the amount of reads to fire at once")
("sstables", bpo::value<uint64_t>()->default_value(100), "")
("sstable-size", bpo::value<uint64_t>()->default_value(10000000), "")
("sstable-format", bpo::value<std::string>()->default_value("mc"), "Sstable format version to use during population")
("sstable-format", bpo::value<std::string>()->default_value("md"), "Sstable format version to use during population")
("collect-stats", "Enable collecting statistics.")
("stats-file", bpo::value<sstring>()->default_value("stats.csv"), "Store statistics in the specified file.")
("stats-period-ms", bpo::value<unsigned>()->default_value(100), "Tick period of the stats collection.")
@@ -229,10 +229,15 @@ int main(int argc, char** argv) {
db_cfg.virtual_dirty_soft_limit(1.0);
auto sstable_format_name = app.configuration()["sstable-format"].as<std::string>();
if (sstable_format_name == "mc") {
if (sstable_format_name == "md") {
db_cfg.enable_sstables_mc_format(true);
db_cfg.enable_sstables_md_format(true);
} else if (sstable_format_name == "mc") {
db_cfg.enable_sstables_mc_format(true);
db_cfg.enable_sstables_md_format(false);
} else if (sstable_format_name == "la") {
db_cfg.enable_sstables_mc_format(false);
db_cfg.enable_sstables_md_format(false);
} else {
throw std::runtime_error(format("Unsupported sstable format: {}", sstable_format_name));
}

View File

@@ -1774,7 +1774,7 @@ int main(int argc, char** argv) {
("test-case-duration", bpo::value<double>()->default_value(1), "Duration in seconds of a single test case (0 for a single run).")
("data-directory", bpo::value<sstring>()->default_value("./perf_large_partition_data"), "Data directory")
("output-directory", bpo::value<sstring>()->default_value("./perf_fast_forward_output"), "Results output directory (for 'json')")
("sstable-format", bpo::value<std::string>()->default_value("mc"), "Sstable format version to use during population")
("sstable-format", bpo::value<std::string>()->default_value("md"), "Sstable format version to use during population")
("use-binary-search-in-promoted-index", bpo::value<bool>()->default_value(true), "Use binary search based variant of the promoted index cursor")
("dump-all-results", "Write results of all iterations of all tests to text files in the output directory")
;
@@ -1815,10 +1815,15 @@ int main(int argc, char** argv) {
db_cfg.virtual_dirty_soft_limit(1.0); // prevent background memtable flushes.
auto sstable_format_name = app.configuration()["sstable-format"].as<std::string>();
if (sstable_format_name == "mc") {
if (sstable_format_name == "md") {
db_cfg.enable_sstables_mc_format(true);
db_cfg.enable_sstables_md_format(true);
} else if (sstable_format_name == "mc") {
db_cfg.enable_sstables_mc_format(true);
db_cfg.enable_sstables_md_format(false);
} else if (sstable_format_name == "la") {
db_cfg.enable_sstables_mc_format(false);
db_cfg.enable_sstables_md_format(false);
} else {
throw std::runtime_error(format("Unsupported sstable format: {}", sstable_format_name));
}

View File

@@ -0,0 +1,8 @@
Digest.crc32
Filter.db
Index.db
Statistics.db
Summary.db
Data.db
CompressionInfo.db
TOC.txt

View File

@@ -0,0 +1,8 @@
Digest.crc32
Filter.db
Index.db
Statistics.db
Summary.db
Data.db
CompressionInfo.db
TOC.txt

Binary file not shown.

After

Width:  |  Height:  |  Size: 141 KiB

View File

@@ -0,0 +1,8 @@
Digest.crc32
Filter.db
Index.db
Statistics.db
Summary.db
Data.db
CompressionInfo.db
TOC.txt

View File

@@ -0,0 +1,8 @@
CRC.db
Data.db
Digest.crc32
Index.db
TOC.txt
Statistics.db
Filter.db
Summary.db

View File

@@ -0,0 +1,8 @@
Statistics.db
Summary.db
CRC.db
TOC.txt
Filter.db
Digest.crc32
Data.db
Index.db

View File

@@ -0,0 +1,8 @@
Data.db
Digest.crc32
Index.db
TOC.txt
Filter.db
Statistics.db
Summary.db
CompressionInfo.db

View File

@@ -0,0 +1,8 @@
Statistics.db
Summary.db
CRC.db
TOC.txt
Filter.db
Digest.crc32
Data.db
Index.db

Some files were not shown because too many files have changed in this diff Show More