Merge "Support md format" from Benny
" This series adds support for the "md" sstable format. Support is based on the following: * do not use clustering based filtering in the presence of static row, tombstones. * Disabling min/max column names in the metadata for formats older than "md". * When updating the metadata, reset and disable min/max in the presence of range tombstones (like Cassandra does and until we process them accurately). * Fix the way we maintain min/max column names by: keeping whole clustering key prefixes as min/max rather than calculating min/max independently for each component, like Cassandra does in the "md" format. Fixes #4442 Tests: unit(dev), cql_query_test -t test_clustering_filtering* (debug) md migration_test dtest from git@github.com:bhalevy/scylla-dtest.git migration_test-md-v1 " * tag 'md-format-v4' of github.com:bhalevy/scylla: (27 commits) config: enable_sstables_md_format by default test: cql_query_test: add test_clustering_filtering unit tests table: filter_sstable_for_reader: allow clustering filtering md-format sstables table: create_single_key_sstable_reader: emit partition_start/end for empty filtered results table: filter_sstable_for_reader: adjust to md-format table: filter_sstable_for_reader: include non-scylla sstables with tombstones table: filter_sstable_for_reader: do not filter if static column is requested table: filter_sstable_for_reader: refactor clustering filtering conditional expression features: add MD_SSTABLE_FORMAT cluster feature config: add enable_sstables_md_format database: add set_format_by_config test: sstable_3_x_test: test both mc and md versions test: Add support for the "md" format sstables: mx/writer: use version from sstable for write calls sstables: mx/writer: update_min_max_components for partition tombstone sstables: metadata_collector: support min_max_components for range tombstones sstable: validate_min_max_metadata: drop outdated logic sstables: rename mc folder to mx sstables: may_contain_rows: always true for old formats 
sstables: add may_contain_rows ...
This commit is contained in:
@@ -2468,7 +2468,7 @@
|
||||
"version":{
|
||||
"type":"string",
|
||||
"enum":[
|
||||
"ka", "la", "mc"
|
||||
"ka", "la", "mc", "md"
|
||||
],
|
||||
"description":"SSTable version"
|
||||
},
|
||||
|
||||
@@ -545,7 +545,7 @@ scylla_core = (['database.cc',
|
||||
'sstables/mp_row_consumer.cc',
|
||||
'sstables/sstables.cc',
|
||||
'sstables/sstables_manager.cc',
|
||||
'sstables/mc/writer.cc',
|
||||
'sstables/mx/writer.cc',
|
||||
'sstables/sstable_version.cc',
|
||||
'sstables/compress.cc',
|
||||
'sstables/partition.cc',
|
||||
@@ -560,6 +560,7 @@ scylla_core = (['database.cc',
|
||||
'sstables/m_format_read_helpers.cc',
|
||||
'sstables/sstable_directory.cc',
|
||||
'sstables/random_access_reader.cc',
|
||||
'sstables/metadata_collector.cc',
|
||||
'transport/cql_protocol_extension.cc',
|
||||
'transport/event.cc',
|
||||
'transport/event_notifier.cc',
|
||||
|
||||
10
database.cc
10
database.cc
@@ -544,6 +544,16 @@ void database::set_format(sstables::sstable_version_types format) {
|
||||
get_system_sstables_manager().set_format(format);
|
||||
}
|
||||
|
||||
void database::set_format_by_config() {
|
||||
if (_cfg.enable_sstables_md_format()) {
|
||||
set_format(sstables::sstable_version_types::md);
|
||||
} else if (_cfg.enable_sstables_mc_format()) {
|
||||
set_format(sstables::sstable_version_types::mc);
|
||||
} else {
|
||||
set_format(sstables::sstable_version_types::la);
|
||||
}
|
||||
}
|
||||
|
||||
database::~database() {
|
||||
_read_concurrency_sem.clear_inactive_reads();
|
||||
_streaming_concurrency_sem.clear_inactive_reads();
|
||||
|
||||
@@ -1534,6 +1534,7 @@ public:
|
||||
}
|
||||
|
||||
void set_format(sstables::sstable_version_types format);
|
||||
void set_format_by_config();
|
||||
|
||||
future<> flush_all_memtables();
|
||||
|
||||
|
||||
@@ -715,6 +715,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, cpu_scheduler(this, "cpu_scheduler", value_status::Used, true, "Enable cpu scheduling")
|
||||
, view_building(this, "view_building", value_status::Used, true, "Enable view building; should only be set to false when the node is experience issues due to view building")
|
||||
, enable_sstables_mc_format(this, "enable_sstables_mc_format", value_status::Used, true, "Enable SSTables 'mc' format to be used as the default file format")
|
||||
, enable_sstables_md_format(this, "enable_sstables_md_format", value_status::Used, true, "Enable SSTables 'md' format to be used as the default file format (requires enable_sstables_mc_format)")
|
||||
, enable_dangerous_direct_import_of_cassandra_counters(this, "enable_dangerous_direct_import_of_cassandra_counters", value_status::Used, false, "Only turn this option on if you want to import tables from Cassandra containing counters, and you are SURE that no counters in that table were created in a version earlier than Cassandra 2.1."
|
||||
" It is not enough to have ever since upgraded to newer versions of Cassandra. If you EVER used a version earlier than 2.1 in the cluster where these SSTables come from, DO NOT TURN ON THIS OPTION! You will corrupt your data. You have been warned.")
|
||||
, enable_shard_aware_drivers(this, "enable_shard_aware_drivers", value_status::Used, true, "Enable native transport drivers to use connection-per-shard for better performance")
|
||||
|
||||
@@ -299,6 +299,7 @@ public:
|
||||
named_value<bool> cpu_scheduler;
|
||||
named_value<bool> view_building;
|
||||
named_value<bool> enable_sstables_mc_format;
|
||||
named_value<bool> enable_sstables_md_format;
|
||||
named_value<bool> enable_dangerous_direct_import_of_cassandra_counters;
|
||||
named_value<bool> enable_shard_aware_drivers;
|
||||
named_value<bool> enable_ipv6_dns_lookup;
|
||||
|
||||
@@ -45,8 +45,9 @@ sstables_format_selector::sstables_format_selector(gms::gossiper& g, sharded<gms
|
||||
: _gossiper(g)
|
||||
, _features(f)
|
||||
, _db(db)
|
||||
, _mc_feature_listener(*this, sstables::sstable_version_types::mc) {
|
||||
}
|
||||
, _mc_feature_listener(*this, sstables::sstable_version_types::mc)
|
||||
, _md_feature_listener(*this, sstables::sstable_version_types::md)
|
||||
{ }
|
||||
|
||||
future<> sstables_format_selector::maybe_select_format(sstables::sstable_version_types new_format) {
|
||||
return with_gate(_sel, [this, new_format] {
|
||||
@@ -75,6 +76,7 @@ future<> sstables_format_selector::start() {
|
||||
assert(this_shard_id() == 0);
|
||||
return read_sstables_format().then([this] {
|
||||
_features.local().cluster_supports_mc_sstable().when_enabled(_mc_feature_listener);
|
||||
_features.local().cluster_supports_md_sstable().when_enabled(_md_feature_listener);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -62,6 +62,7 @@ class sstables_format_selector {
|
||||
seastar::gate _sel;
|
||||
|
||||
feature_enabled_listener _mc_feature_listener;
|
||||
feature_enabled_listener _md_feature_listener;
|
||||
|
||||
sstables::sstable_version_types _selected_format = sstables::sstable_version_types::la;
|
||||
future<> select_format(sstables::sstable_version_types new_format);
|
||||
|
||||
@@ -128,6 +128,7 @@ extern const std::string_view ROLES;
|
||||
extern const std::string_view LA_SSTABLE;
|
||||
extern const std::string_view STREAM_WITH_RPC_STREAM;
|
||||
extern const std::string_view MC_SSTABLE;
|
||||
extern const std::string_view MD_SSTABLE;
|
||||
extern const std::string_view ROW_LEVEL_REPAIR;
|
||||
extern const std::string_view TRUNCATION_TABLE;
|
||||
extern const std::string_view CORRECT_STATIC_COMPACT_IN_MC;
|
||||
|
||||
@@ -43,6 +43,7 @@ constexpr std::string_view features::ROLES = "ROLES";
|
||||
constexpr std::string_view features::LA_SSTABLE = "LA_SSTABLE_FORMAT";
|
||||
constexpr std::string_view features::STREAM_WITH_RPC_STREAM = "STREAM_WITH_RPC_STREAM";
|
||||
constexpr std::string_view features::MC_SSTABLE = "MC_SSTABLE_FORMAT";
|
||||
constexpr std::string_view features::MD_SSTABLE = "MD_SSTABLE_FORMAT";
|
||||
constexpr std::string_view features::ROW_LEVEL_REPAIR = "ROW_LEVEL_REPAIR";
|
||||
constexpr std::string_view features::TRUNCATION_TABLE = "TRUNCATION_TABLE";
|
||||
constexpr std::string_view features::CORRECT_STATIC_COMPACT_IN_MC = "CORRECT_STATIC_COMPACT_IN_MC";
|
||||
@@ -78,6 +79,7 @@ feature_service::feature_service(feature_config cfg) : _config(cfg)
|
||||
, _roles_feature(*this, features::ROLES)
|
||||
, _stream_with_rpc_stream_feature(*this, features::STREAM_WITH_RPC_STREAM)
|
||||
, _mc_sstable_feature(*this, features::MC_SSTABLE)
|
||||
, _md_sstable_feature(*this, features::MD_SSTABLE)
|
||||
, _row_level_repair_feature(*this, features::ROW_LEVEL_REPAIR)
|
||||
, _truncation_table(*this, features::TRUNCATION_TABLE)
|
||||
, _correct_static_compact_in_mc(*this, features::CORRECT_STATIC_COMPACT_IN_MC)
|
||||
@@ -101,8 +103,16 @@ feature_config feature_config_from_db_config(db::config& cfg, std::set<sstring>
|
||||
fcfg._disabled_features = std::move(disabled);
|
||||
|
||||
if (!cfg.enable_sstables_mc_format()) {
|
||||
if (cfg.enable_sstables_md_format()) {
|
||||
throw std::runtime_error(
|
||||
"You must use both enable_sstables_mc_format and enable_sstables_md_format "
|
||||
"to enable SSTables md format support");
|
||||
}
|
||||
fcfg._disabled_features.insert(sstring(gms::features::MC_SSTABLE));
|
||||
}
|
||||
if (!cfg.enable_sstables_md_format()) {
|
||||
fcfg._disabled_features.insert(sstring(gms::features::MD_SSTABLE));
|
||||
}
|
||||
if (!cfg.enable_user_defined_functions()) {
|
||||
fcfg._disabled_features.insert(sstring(gms::features::UDF));
|
||||
} else {
|
||||
@@ -177,6 +187,7 @@ std::set<std::string_view> feature_service::known_feature_set() {
|
||||
gms::features::PER_TABLE_CACHING,
|
||||
gms::features::LWT,
|
||||
gms::features::MC_SSTABLE,
|
||||
gms::features::MD_SSTABLE,
|
||||
gms::features::UDF,
|
||||
gms::features::CDC,
|
||||
};
|
||||
@@ -256,6 +267,7 @@ void feature_service::enable(const std::set<std::string_view>& list) {
|
||||
std::ref(_roles_feature),
|
||||
std::ref(_stream_with_rpc_stream_feature),
|
||||
std::ref(_mc_sstable_feature),
|
||||
std::ref(_md_sstable_feature),
|
||||
std::ref(_row_level_repair_feature),
|
||||
std::ref(_truncation_table),
|
||||
std::ref(_correct_static_compact_in_mc),
|
||||
|
||||
@@ -90,6 +90,7 @@ private:
|
||||
gms::feature _roles_feature;
|
||||
gms::feature _stream_with_rpc_stream_feature;
|
||||
gms::feature _mc_sstable_feature;
|
||||
gms::feature _md_sstable_feature;
|
||||
gms::feature _row_level_repair_feature;
|
||||
gms::feature _truncation_table;
|
||||
gms::feature _correct_static_compact_in_mc;
|
||||
@@ -165,6 +166,10 @@ public:
|
||||
return _mc_sstable_feature;
|
||||
}
|
||||
|
||||
// Returns the cluster feature object stored in _md_sstable_feature
// (registered under features::MD_SSTABLE).
const feature& cluster_supports_md_sstable() const {
|
||||
return _md_sstable_feature;
|
||||
}
|
||||
|
||||
// Returns the cluster feature object stored in _cdc_feature.
const feature& cluster_supports_cdc() const {
|
||||
return _cdc_feature;
|
||||
}
|
||||
|
||||
@@ -3699,6 +3699,7 @@ class scylla_features(gdb.Command):
|
||||
"ROLES": true
|
||||
"LA_SSTABLE_FORMAT": true
|
||||
"MC_SSTABLE_FORMAT": true
|
||||
"MD_SSTABLE_FORMAT": true
|
||||
"STREAM_WITH_RPC_STREAM": true
|
||||
"ROW_LEVEL_REPAIR": true
|
||||
"TRUNCATION_TABLE": true
|
||||
|
||||
@@ -1,76 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Copyright (C) 2015 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include "schema_fwd.hh"
|
||||
#include "compound_compat.hh"
|
||||
#include <cmath>
|
||||
#include <algorithm>
|
||||
#include <vector>
|
||||
|
||||
class column_name_helper {
|
||||
private:
|
||||
static inline void may_grow(std::vector<bytes_opt>& v, size_t target_size) {
|
||||
if (target_size > v.size()) {
|
||||
v.resize(target_size);
|
||||
}
|
||||
}
|
||||
public:
|
||||
template <typename T>
|
||||
static void min_max_components(const schema& schema, std::vector<bytes_opt>& min_seen, std::vector<bytes_opt>& max_seen, T components) {
|
||||
may_grow(min_seen, schema.clustering_key_size());
|
||||
may_grow(max_seen, schema.clustering_key_size());
|
||||
|
||||
auto& types = schema.clustering_key_type()->types();
|
||||
auto i = 0U;
|
||||
for (auto& value : components) {
|
||||
auto& type = types[i];
|
||||
|
||||
if (!max_seen[i] || type->compare(value, max_seen[i].value()) > 0) {
|
||||
max_seen[i] = bytes(value.data(), value.size());
|
||||
}
|
||||
if (!min_seen[i] || type->compare(value, min_seen[i].value()) < 0) {
|
||||
min_seen[i] = bytes(value.data(), value.size());
|
||||
}
|
||||
i++;
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -486,6 +486,7 @@ protected:
|
||||
void setup_new_sstable(shared_sstable& sst) {
|
||||
_info->new_sstables.push_back(sst);
|
||||
_new_unused_sstables.push_back(sst);
|
||||
sst->make_metadata_collector();
|
||||
sst->get_metadata_collector().set_replay_position(_rp);
|
||||
sst->get_metadata_collector().sstable_level(_sstable_level);
|
||||
for (auto ancestor : _ancestors) {
|
||||
@@ -1482,6 +1483,9 @@ get_fully_expired_sstables(column_family& cf, const std::vector<sstables::shared
|
||||
// Get ancestors from metadata collector which is empty after restart. It works for this purpose because
|
||||
// we only need to check that a sstable compacted *in this instance* hasn't an ancestor undeleted.
|
||||
// Not getting it from sstable metadata because the mc format does not have it available.
|
||||
if (!candidate->has_metadata_collector()) {
|
||||
return false;
|
||||
}
|
||||
return boost::algorithm::any_of(candidate->get_metadata_collector().ancestors(), [&compacted_undeleted_gens] (auto gen) {
|
||||
return compacted_undeleted_gens.count(gen);
|
||||
});
|
||||
|
||||
@@ -28,7 +28,7 @@
|
||||
#include "column_translation.hh"
|
||||
#include "m_format_read_helpers.hh"
|
||||
#include "utils/overloaded_functor.hh"
|
||||
#include "sstables/mc/parsers.hh"
|
||||
#include "sstables/mx/parsers.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
|
||||
@@ -29,7 +29,7 @@
|
||||
#include "sstables/prepended_input_stream.hh"
|
||||
#include "tracing/traced_file.hh"
|
||||
#include "sstables/scanning_clustered_index_cursor.hh"
|
||||
#include "sstables/mc/bsearch_clustered_cursor.hh"
|
||||
#include "sstables/mx/bsearch_clustered_cursor.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
@@ -376,7 +376,7 @@ class index_reader {
|
||||
trace_state ? sst->filename(component_type::Index) : sstring(),
|
||||
get_file(*sst, permit, trace_state),
|
||||
get_file_input_stream_options(sst, pc), begin, end - begin,
|
||||
(sst->get_version() == sstable_version_types::mc
|
||||
(sst->get_version() >= sstable_version_types::mc
|
||||
? std::make_optional(get_clustering_values_fixed_lengths(sst->get_serialization_header()))
|
||||
: std::optional<column_values_fixed_lengths>{}),
|
||||
trace_state)
|
||||
|
||||
@@ -29,7 +29,7 @@
|
||||
#include "sstables/types.hh"
|
||||
#include "sstables/exceptions.hh"
|
||||
#include "clustering_bounds_comparator.hh"
|
||||
#include "sstables/mc/types.hh"
|
||||
#include "sstables/mx/types.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
|
||||
84
sstables/metadata_collector.cc
Normal file
84
sstables/metadata_collector.cc
Normal file
@@ -0,0 +1,84 @@
|
||||
/*
|
||||
* Copyright (C) 2020 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "log.hh"
|
||||
#include "metadata_collector.hh"
|
||||
#include "range_tombstone.hh"
|
||||
|
||||
logging::logger mdclogger("metadata_collector");
|
||||
|
||||
namespace sstables {
|
||||
|
||||
// Appends every component of the clustering key prefix `from` to
// `to.elements`, each wrapped in a disk_string<uint16_t>.
// When `from` holds no value, nothing is appended and only a trace
// message is logged.
void metadata_collector::convert(disk_array<uint32_t, disk_string<uint16_t>>& to, const std::optional<clustering_key_prefix>& from) {
    if (from) {
        mdclogger.trace("{}: convert: {}", _name, clustering_key_prefix::with_schema_wrapper(_schema, *from));
        for (auto& component : from->components()) {
            to.elements.push_back(disk_string<uint16_t>{bytes(component.data(), component.size())});
        }
        return;
    }
    mdclogger.trace("{}: convert: empty", _name);
}
|
||||
|
||||
void metadata_collector::update_min_max_components(const clustering_key_prefix& key) {
|
||||
if (!_min_clustering_key) {
|
||||
mdclogger.trace("{}: initializing min/max clustering keys={}", _name, clustering_key_prefix::with_schema_wrapper(_schema, key));
|
||||
_min_clustering_key.emplace(key);
|
||||
_max_clustering_key.emplace(key);
|
||||
return;
|
||||
}
|
||||
|
||||
const bound_view::tri_compare cmp(_schema);
|
||||
|
||||
auto res = cmp(bound_view(key, bound_kind::incl_start), bound_view(*_min_clustering_key, bound_kind::incl_start));
|
||||
if (res < 0) {
|
||||
mdclogger.trace("{}: setting min_clustering_key={}", _name, clustering_key_prefix::with_schema_wrapper(_schema, key));
|
||||
_min_clustering_key.emplace(key);
|
||||
}
|
||||
|
||||
res = cmp(bound_view(key, bound_kind::incl_end), bound_view(*_max_clustering_key, bound_kind::incl_end));
|
||||
if (res > 0) {
|
||||
mdclogger.trace("{}: setting max_clustering_key={}", _name, clustering_key_prefix::with_schema_wrapper(_schema, key));
|
||||
_max_clustering_key.emplace(key);
|
||||
}
|
||||
}
|
||||
|
||||
void metadata_collector::update_min_max_components(const range_tombstone& rt) {
|
||||
const bound_view::tri_compare cmp(_schema);
|
||||
|
||||
if (!_min_clustering_key) {
|
||||
mdclogger.trace("{}: initializing min_clustering_key to rt.start={}", _name, clustering_key_prefix::with_schema_wrapper(_schema, rt.start));
|
||||
_min_clustering_key.emplace(rt.start);
|
||||
} else if (cmp(rt.start_bound(), bound_view(*_min_clustering_key, bound_kind::incl_start)) < 0) {
|
||||
mdclogger.trace("{}: updating min_clustering_key to rt.start={}", _name, clustering_key_prefix::with_schema_wrapper(_schema, rt.start));
|
||||
_min_clustering_key.emplace(rt.start);
|
||||
}
|
||||
|
||||
if (!_max_clustering_key) {
|
||||
mdclogger.trace("{}: initializing max_clustering_key to rt.end={}", _name, clustering_key_prefix::with_schema_wrapper(_schema, rt.end));
|
||||
_max_clustering_key.emplace(rt.end);
|
||||
} else if (cmp(rt.end_bound(), bound_view(*_max_clustering_key, bound_kind::incl_end)) > 0) {
|
||||
mdclogger.trace("{}: updating max_clustering_key to rt.end={}", _name, clustering_key_prefix::with_schema_wrapper(_schema, rt.end));
|
||||
_max_clustering_key.emplace(rt.end);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace sstables
|
||||
@@ -46,8 +46,12 @@
|
||||
#include "utils/murmur_hash.hh"
|
||||
#include "hyperloglog.hh"
|
||||
#include "db/commitlog/replay_position.hh"
|
||||
#include "clustering_bounds_comparator.hh"
|
||||
|
||||
#include <algorithm>
|
||||
|
||||
class range_tombstone;
|
||||
|
||||
namespace sstables {
|
||||
|
||||
static constexpr int TOMBSTONE_HISTOGRAM_BIN_SIZE = 100;
|
||||
@@ -145,6 +149,8 @@ public:
|
||||
return hll::HyperLogLog();
|
||||
}
|
||||
private:
|
||||
const schema& _schema;
|
||||
sstring _name;
|
||||
// EH of 150 can track a max value of 1697806495183, i.e., > 1.5PB
|
||||
utils::estimated_histogram _estimated_partition_size{150};
|
||||
// EH of 114 can track a max value of 2395318855, i.e., > 2B cells
|
||||
@@ -158,8 +164,8 @@ private:
|
||||
std::set<int> _ancestors;
|
||||
utils::streaming_histogram _estimated_tombstone_drop_time{TOMBSTONE_HISTOGRAM_BIN_SIZE};
|
||||
int _sstable_level = 0;
|
||||
std::vector<bytes_opt> _min_column_names;
|
||||
std::vector<bytes_opt> _max_column_names;
|
||||
std::optional<clustering_key_prefix> _min_clustering_key;
|
||||
std::optional<clustering_key_prefix> _max_clustering_key;
|
||||
bool _has_legacy_counter_shards = false;
|
||||
uint64_t _columns_count = 0;
|
||||
uint64_t _rows_count = 0;
|
||||
@@ -172,20 +178,23 @@ private:
|
||||
*/
|
||||
hll::HyperLogLog _cardinality = hyperloglog(13, 25);
|
||||
private:
|
||||
/*
|
||||
* Convert a vector of bytes into a disk array of disk_string<uint16_t>.
|
||||
*/
|
||||
static void convert(disk_array<uint32_t, disk_string<uint16_t>>&to, std::vector<bytes_opt>&& from) {
|
||||
for (auto i = 0U; i < from.size(); i++) {
|
||||
if (!from[i]) {
|
||||
break;
|
||||
}
|
||||
disk_string<uint16_t> s;
|
||||
s.value = std::move(from[i].value());
|
||||
to.elements.push_back(std::move(s));
|
||||
void convert(disk_array<uint32_t, disk_string<uint16_t>>&to, const std::optional<clustering_key_prefix>& from);
|
||||
public:
|
||||
explicit metadata_collector(const schema& schema, sstring name)
|
||||
: _schema(schema)
|
||||
, _name(name)
|
||||
{
|
||||
if (!schema.clustering_key_size()) {
|
||||
// Empty min/max components represent the full range
|
||||
// And so they will never be narrowed down.
|
||||
update_min_max_components(clustering_key_prefix::make_empty(_schema));
|
||||
}
|
||||
}
|
||||
public:
|
||||
|
||||
// Returns the schema reference this collector was constructed with.
const schema& get_schema() {
|
||||
return _schema;
|
||||
}
|
||||
|
||||
void add_key(bytes_view key) {
|
||||
long hashed = utils::murmur_hash::hash2_64(key, 0);
|
||||
_cardinality.offer_hashed(hashed);
|
||||
@@ -231,18 +240,14 @@ public:
|
||||
_sstable_level = sstable_level;
|
||||
}
|
||||
|
||||
// Gives mutable access to _min_column_names, so callers can update the
// collected minimum column name components in place.
std::vector<bytes_opt>& min_column_names() {
|
||||
return _min_column_names;
|
||||
}
|
||||
|
||||
// Gives mutable access to _max_column_names, so callers can update the
// collected maximum column name components in place.
std::vector<bytes_opt>& max_column_names() {
|
||||
return _max_column_names;
|
||||
}
|
||||
|
||||
// ORs the argument into _has_legacy_counter_shards: once the flag has been
// set to true, later calls with false do not clear it.
void update_has_legacy_counter_shards(bool has_legacy_counter_shards) {
|
||||
_has_legacy_counter_shards = _has_legacy_counter_shards || has_legacy_counter_shards;
|
||||
}
|
||||
|
||||
void update_min_max_components(const clustering_key_prefix& key);
|
||||
|
||||
void update_min_max_components(const range_tombstone& rt);
|
||||
|
||||
void update(column_stats&& stats) {
|
||||
_timestamp_tracker.update(stats.timestamp_tracker);
|
||||
_local_deletion_time_tracker.update(stats.local_deletion_time_tracker);
|
||||
@@ -277,8 +282,8 @@ public:
|
||||
m.estimated_tombstone_drop_time = std::move(_estimated_tombstone_drop_time);
|
||||
m.sstable_level = _sstable_level;
|
||||
m.repaired_at = _repaired_at;
|
||||
convert(m.min_column_names, std::move(_min_column_names));
|
||||
convert(m.max_column_names, std::move(_max_column_names));
|
||||
convert(m.min_column_names, _min_clustering_key);
|
||||
convert(m.max_column_names, _max_clustering_key);
|
||||
m.has_legacy_counter_shards = _has_legacy_counter_shards;
|
||||
m.columns_count = _columns_count;
|
||||
m.rows_count = _rows_count;
|
||||
|
||||
@@ -19,14 +19,14 @@
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "sstables/mc/writer.hh"
|
||||
#include "sstables/mx/writer.hh"
|
||||
#include "sstables/writer.hh"
|
||||
#include "encoding_stats.hh"
|
||||
#include "schema.hh"
|
||||
#include "mutation_fragment.hh"
|
||||
#include "vint-serialization.hh"
|
||||
#include "sstables/types.hh"
|
||||
#include "sstables/mc/types.hh"
|
||||
#include "sstables/mx/types.hh"
|
||||
#include "db/config.hh"
|
||||
#include "atomic_cell.hh"
|
||||
#include "utils/exceptions.hh"
|
||||
@@ -192,20 +192,20 @@ public:
|
||||
|
||||
template <typename W>
|
||||
requires Writer<W>
|
||||
static void write(W& out, const clustering_block& block) {
|
||||
static void write(sstable_version_types v, W& out, const clustering_block& block) {
|
||||
write_vint(out, block.header);
|
||||
for (const auto& [value, type]: block.values) {
|
||||
write_cell_value(out, type, value);
|
||||
write_cell_value(v, out, type, value);
|
||||
}
|
||||
}
|
||||
|
||||
template <typename W>
|
||||
requires Writer<W>
|
||||
void write_clustering_prefix(W& out, const schema& s,
|
||||
void write_clustering_prefix(sstable_version_types v, W& out, const schema& s,
|
||||
const clustering_key_prefix& prefix, ephemerally_full_prefix is_ephemerally_full) {
|
||||
clustering_blocks_input_range range{s, prefix, is_ephemerally_full};
|
||||
for (const auto block: range) {
|
||||
write(out, block);
|
||||
write(v, out, block);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -896,6 +896,9 @@ void writer::drain_tombstones(std::optional<position_in_partition_view> pos) {
|
||||
position_in_partition::equal_compare eq{_schema};
|
||||
while (auto mfo = get_next_rt()) {
|
||||
range_tombstone rt {std::move(mfo)->as_range_tombstone()};
|
||||
|
||||
_sst.get_metadata_collector().update_min_max_components(rt);
|
||||
|
||||
bool need_write_start = true;
|
||||
if (_end_open_marker) {
|
||||
if (eq(_end_open_marker->position(), rt.position())) {
|
||||
@@ -979,6 +982,10 @@ void writer::consume(tombstone t) {
|
||||
|
||||
_pi_write_m.tomb = t;
|
||||
_tombstone_written = true;
|
||||
|
||||
if (t) {
|
||||
_sst.get_metadata_collector().update_min_max_components(clustering_key_prefix::make_empty(_schema));
|
||||
}
|
||||
}
|
||||
|
||||
void writer::write_cell(bytes_ostream& writer, const clustering_key_prefix* clustering_key, atomic_cell_view cell,
|
||||
@@ -1033,14 +1040,14 @@ void writer::write_cell(bytes_ostream& writer, const clustering_key_prefix* clus
|
||||
if (!is_deleted) {
|
||||
assert(!cell.is_counter_update());
|
||||
counter_cell_view::with_linearized(cell, [&] (counter_cell_view ccv) {
|
||||
write_counter_value(ccv, writer, sstable_version_types::mc, [] (bytes_ostream& out, uint32_t value) {
|
||||
write_counter_value(ccv, writer, _sst.get_version(), [] (bytes_ostream& out, uint32_t value) {
|
||||
return write_vint(out, value);
|
||||
});
|
||||
});
|
||||
}
|
||||
} else {
|
||||
if (has_value) {
|
||||
write_cell_value(writer, *cdef.type, cell.value());
|
||||
write_cell_value(_sst.get_version(), writer, *cdef.type, cell.value());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1249,7 +1256,7 @@ void writer::write_clustered(const clustering_row& clustered_row, uint64_t prev_
|
||||
write(_sst.get_version(), *_data_writer, ext_flags);
|
||||
}
|
||||
|
||||
write_clustering_prefix(*_data_writer, _schema, clustered_row.key(), ephemerally_full_prefix{_schema.is_compact_table()});
|
||||
write_clustering_prefix(_sst.get_version(), *_data_writer, _schema, clustered_row.key(), ephemerally_full_prefix{_schema.is_compact_table()});
|
||||
|
||||
write_vint(_tmp_bufs, prev_row_size);
|
||||
write_row_body(_tmp_bufs, clustered_row, has_complex_deletion);
|
||||
@@ -1259,10 +1266,7 @@ void writer::write_clustered(const clustering_row& clustered_row, uint64_t prev_
|
||||
flush_tmp_bufs();
|
||||
|
||||
// Collect statistics
|
||||
if (_schema.clustering_key_size()) {
|
||||
column_name_helper::min_max_components(_schema, _sst.get_metadata_collector().min_column_names(),
|
||||
_sst.get_metadata_collector().max_column_names(), clustered_row.key().components());
|
||||
}
|
||||
_sst.get_metadata_collector().update_min_max_components(clustered_row.key());
|
||||
collect_row_stats(_data_writer->offset() - current_pos, &clustered_row.key());
|
||||
}
|
||||
|
||||
@@ -1280,17 +1284,17 @@ stop_iteration writer::consume(clustering_row&& cr) {
|
||||
// Write clustering prefix along with its bound kind and, if not full, its size
|
||||
template <typename W>
|
||||
requires Writer<W>
|
||||
static void write_clustering_prefix(W& writer, bound_kind_m kind,
|
||||
static void write_clustering_prefix(sstable_version_types v, W& writer, bound_kind_m kind,
|
||||
const schema& s, const clustering_key_prefix& clustering) {
|
||||
assert(kind != bound_kind_m::static_clustering);
|
||||
write(sstable_version_types::mc, writer, kind);
|
||||
write(v, writer, kind);
|
||||
auto is_ephemerally_full = ephemerally_full_prefix{s.is_compact_table()};
|
||||
if (kind != bound_kind_m::clustering) {
|
||||
// Don't use ephemerally full for RT bounds as they're always non-full
|
||||
is_ephemerally_full = ephemerally_full_prefix::no;
|
||||
write(sstable_version_types::mc, writer, static_cast<uint16_t>(clustering.size(s)));
|
||||
write(v, writer, static_cast<uint16_t>(clustering.size(s)));
|
||||
}
|
||||
write_clustering_prefix(writer, s, clustering, is_ephemerally_full);
|
||||
write_clustering_prefix(v, writer, s, clustering, is_ephemerally_full);
|
||||
}
|
||||
|
||||
void writer::write_promoted_index() {
|
||||
@@ -1313,19 +1317,19 @@ void writer::write_pi_block(const pi_block& block) {
|
||||
bytes_ostream& blocks = _pi_write_m.blocks;
|
||||
uint32_t offset = blocks.size();
|
||||
write(_sst.get_version(), _pi_write_m.offsets, offset);
|
||||
write_clustering_prefix(blocks, block.first.kind, _schema, block.first.clustering);
|
||||
write_clustering_prefix(blocks, block.last.kind, _schema, block.last.clustering);
|
||||
write_clustering_prefix(_sst.get_version(), blocks, block.first.kind, _schema, block.first.clustering);
|
||||
write_clustering_prefix(_sst.get_version(), blocks, block.last.kind, _schema, block.last.clustering);
|
||||
write_vint(blocks, block.offset);
|
||||
write_signed_vint(blocks, block.width - width_base);
|
||||
write(_sst.get_version(), blocks, static_cast<std::byte>(block.open_marker ? 1 : 0));
|
||||
if (block.open_marker) {
|
||||
write(sstable_version_types::mc, blocks, to_deletion_time(*block.open_marker));
|
||||
write(_sst.get_version(), blocks, to_deletion_time(*block.open_marker));
|
||||
}
|
||||
}
|
||||
|
||||
void writer::write_clustered(const rt_marker& marker, uint64_t prev_row_size) {
|
||||
write(sstable_version_types::mc, *_data_writer, row_flags::is_marker);
|
||||
write_clustering_prefix(*_data_writer, marker.kind, _schema, marker.clustering);
|
||||
write(_sst.get_version(), *_data_writer, row_flags::is_marker);
|
||||
write_clustering_prefix(_sst.get_version(), *_data_writer, marker.kind, _schema, marker.clustering);
|
||||
auto write_marker_body = [this, &marker] (bytes_ostream& writer) {
|
||||
write_delta_deletion_time(writer, marker.tomb);
|
||||
_c_stats.update(marker.tomb);
|
||||
@@ -1339,11 +1343,6 @@ void writer::write_clustered(const rt_marker& marker, uint64_t prev_row_size) {
|
||||
write_marker_body(_tmp_bufs);
|
||||
write_vint(*_data_writer, _tmp_bufs.size());
|
||||
flush_tmp_bufs();
|
||||
|
||||
if (_schema.clustering_key_size()) {
|
||||
column_name_helper::min_max_components(_schema, _sst.get_metadata_collector().min_column_names(),
|
||||
_sst.get_metadata_collector().max_column_names(), marker.clustering.components());
|
||||
}
|
||||
}
|
||||
|
||||
void writer::consume(rt_marker&& marker) {
|
||||
@@ -480,7 +480,7 @@ void mp_row_consumer_reader::on_next_partition(dht::decorated_key key, tombstone
|
||||
flat_mutation_reader sstable::read_rows_flat(schema_ptr schema, reader_permit permit, const io_priority_class& pc,
|
||||
streamed_mutation::forwarding fwd) {
|
||||
get_stats().on_sstable_partition_read();
|
||||
if (_version == version_types::mc) {
|
||||
if (_version >= version_types::mc) {
|
||||
return make_flat_mutation_reader<sstable_mutation_reader<data_consume_rows_context_m, mp_row_consumer_m>>(
|
||||
shared_from_this(), std::move(schema), std::move(permit), pc, tracing::trace_state_ptr(), fwd, default_read_monitor());
|
||||
}
|
||||
@@ -499,7 +499,7 @@ sstables::sstable::read_row_flat(schema_ptr schema,
|
||||
read_monitor& mon)
|
||||
{
|
||||
get_stats().on_single_partition_read();
|
||||
if (_version == version_types::mc) {
|
||||
if (_version >= version_types::mc) {
|
||||
return make_flat_mutation_reader<sstable_mutation_reader<data_consume_rows_context_m, mp_row_consumer_m>>(
|
||||
shared_from_this(), std::move(schema), std::move(permit), std::move(key), slice, pc,
|
||||
std::move(trace_state), fwd, mutation_reader::forwarding::no, mon);
|
||||
@@ -519,7 +519,7 @@ sstable::read_range_rows_flat(schema_ptr schema,
|
||||
mutation_reader::forwarding fwd_mr,
|
||||
read_monitor& mon) {
|
||||
get_stats().on_range_partition_read();
|
||||
if (_version == version_types::mc) {
|
||||
if (_version >= version_types::mc) {
|
||||
return make_flat_mutation_reader<sstable_mutation_reader<data_consume_rows_context_m, mp_row_consumer_m>>(
|
||||
shared_from_this(), std::move(schema), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr, mon);
|
||||
}
|
||||
|
||||
@@ -51,6 +51,7 @@ sstable_version_constants::get_component_map(sstable_version_types version) {
|
||||
case sstable_version_types::la:
|
||||
return sstable_version_constants_k_l::_component_map;
|
||||
case sstable_version_types::mc:
|
||||
case sstable_version_types::md:
|
||||
return sstable_version_constants_m::_component_map;
|
||||
}
|
||||
// Should never reach this.
|
||||
|
||||
@@ -38,7 +38,7 @@
|
||||
|
||||
#include "dht/sharder.hh"
|
||||
#include "types.hh"
|
||||
#include "mc/writer.hh"
|
||||
#include "sstables/mx/writer.hh"
|
||||
#include "writer.hh"
|
||||
#include "writer_impl.hh"
|
||||
#include "m_format_read_helpers.hh"
|
||||
@@ -199,6 +199,7 @@ std::unordered_map<sstable::version_types, sstring, enum_hash<sstable::version_t
|
||||
{ sstable::version_types::ka , "ka" },
|
||||
{ sstable::version_types::la , "la" },
|
||||
{ sstable::version_types::mc , "mc" },
|
||||
{ sstable::version_types::md , "md" },
|
||||
};
|
||||
|
||||
std::unordered_map<sstable::format_types, sstring, enum_hash<sstable::format_types>> sstable::_format_string = {
|
||||
@@ -663,7 +664,7 @@ future<> parse(const schema& schema, sstable_version_types v, random_access_read
|
||||
case metadata_type::Stats:
|
||||
return parse<stats_metadata>(schema, v, in, s.contents[type]);
|
||||
case metadata_type::Serialization:
|
||||
if (v != sstable_version_types::mc) {
|
||||
if (v < sstable_version_types::mc) {
|
||||
throw std::runtime_error(
|
||||
"Statistics is malformed: SSTable is in 2.x format but contains serialization header.");
|
||||
} else {
|
||||
@@ -1209,28 +1210,21 @@ void sstable::validate_min_max_metadata() {
|
||||
}
|
||||
|
||||
// The min/max metadata is wrong if:
|
||||
// 1) it's not empty and schema defines no clustering key.
|
||||
// 2) their size differ.
|
||||
// 3) column name is stored instead of clustering value.
|
||||
// 4) clustering component is stored as composite.
|
||||
if ((!_schema->clustering_key_size() && (min_column_names.size() || max_column_names.size())) ||
|
||||
(min_column_names.size() != max_column_names.size())) {
|
||||
// - it's not empty and schema defines no clustering key.
|
||||
//
|
||||
// Notes:
|
||||
// - we are going to rely on min/max column names for
|
||||
// clustering filtering only from md-format sstables,
|
||||
// see sstable::may_contain_rows().
|
||||
// We choose not to clear_incorrect_min_max_column_names
|
||||
// from older versions here as this disturbs sstable unit tests.
|
||||
//
|
||||
// - now that we store min/max metadata for range tombstones,
|
||||
// their size may differ.
|
||||
if (!_schema->clustering_key_size()) {
|
||||
clear_incorrect_min_max_column_names();
|
||||
return;
|
||||
}
|
||||
|
||||
for (auto i = 0U; i < min_column_names.size(); i++) {
|
||||
if (_schema->get_column_definition(min_column_names[i].value) || _schema->get_column_definition(max_column_names[i].value)) {
|
||||
clear_incorrect_min_max_column_names();
|
||||
break;
|
||||
}
|
||||
|
||||
if (_schema->is_compound() && _schema->clustering_key_size() > 1 && _schema->is_dense() &&
|
||||
(composite_view(min_column_names[i].value).is_valid() || composite_view(max_column_names[i].value).is_valid())) {
|
||||
clear_incorrect_min_max_column_names();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void sstable::validate_max_local_deletion_time() {
|
||||
@@ -1241,23 +1235,29 @@ void sstable::validate_max_local_deletion_time() {
|
||||
}
|
||||
}
|
||||
|
||||
void sstable::set_clustering_components_ranges() {
|
||||
void sstable::set_position_range() {
|
||||
if (!_schema->clustering_key_size()) {
|
||||
return;
|
||||
}
|
||||
auto& min_column_names = get_stats_metadata().min_column_names.elements;
|
||||
auto& max_column_names = get_stats_metadata().max_column_names.elements;
|
||||
|
||||
auto s = std::min(min_column_names.size(), max_column_names.size());
|
||||
_clustering_components_ranges.reserve(s);
|
||||
for (auto i = 0U; i < s; i++) {
|
||||
auto r = nonwrapping_range<bytes_view>({{ min_column_names[i].value, true }}, {{ max_column_names[i].value, true }});
|
||||
_clustering_components_ranges.push_back(std::move(r));
|
||||
auto& min_elements = get_stats_metadata().min_column_names.elements;
|
||||
auto& max_elements = get_stats_metadata().max_column_names.elements;
|
||||
|
||||
if (min_elements.empty() && max_elements.empty()) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
const std::vector<nonwrapping_range<bytes_view>>& sstable::clustering_components_ranges() const {
|
||||
return _clustering_components_ranges;
|
||||
auto pip = [] (const utils::chunked_vector<disk_string<uint16_t>>& column_names, bound_kind kind) {
|
||||
std::vector<bytes> key_bytes;
|
||||
key_bytes.reserve(column_names.size());
|
||||
for (auto& value : column_names) {
|
||||
key_bytes.emplace_back(bytes_view(value));
|
||||
}
|
||||
auto ckp = clustering_key_prefix(std::move(key_bytes));
|
||||
return position_in_partition(position_in_partition::range_tag_t(), kind, std::move(ckp));
|
||||
};
|
||||
|
||||
_position_range = position_range(pip(min_elements, bound_kind::incl_start), pip(max_elements, bound_kind::incl_end));
|
||||
}
|
||||
|
||||
double sstable::estimate_droppable_tombstone_ratio(gc_clock::time_point gc_before) const {
|
||||
@@ -1370,7 +1370,7 @@ future<> sstable::update_info_for_opened_data() {
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}).then([this] {
|
||||
this->set_clustering_components_ranges();
|
||||
this->set_position_range();
|
||||
this->set_first_and_last_keys();
|
||||
_run_identifier = _components->scylla_metadata->get_optional_run_identifier().value_or(utils::make_random_uuid());
|
||||
|
||||
@@ -1411,7 +1411,7 @@ future<> sstable::read_filter(const io_priority_class& pc) {
|
||||
read_simple<component_type::Filter>(filter, pc).get();
|
||||
auto nr_bits = filter.buckets.elements.size() * std::numeric_limits<typename decltype(filter.buckets.elements)::value_type>::digits;
|
||||
large_bitset bs(nr_bits, std::move(filter.buckets.elements));
|
||||
utils::filter_format format = (_version == sstable_version_types::mc)
|
||||
utils::filter_format format = (_version >= sstable_version_types::mc)
|
||||
? utils::filter_format::m_format
|
||||
: utils::filter_format::k_l_format;
|
||||
_components->filter = utils::filter::create_filter(filter.hashes, std::move(bs), format);
|
||||
@@ -1760,10 +1760,7 @@ void sstable::write_clustered_row(file_writer& out, const schema& schema, const
|
||||
maybe_write_row_marker(out, schema, clustered_row.marker(), clustering_key);
|
||||
maybe_write_row_tombstone(out, clustering_key, clustered_row);
|
||||
|
||||
if (schema.clustering_key_size()) {
|
||||
column_name_helper::min_max_components(schema, _collector.min_column_names(), _collector.max_column_names(),
|
||||
clustered_row.key().components());
|
||||
}
|
||||
get_metadata_collector().update_min_max_components(clustered_row.key());
|
||||
|
||||
// Write all cells of a partition's row.
|
||||
clustered_row.cells().for_each_cell([&] (column_id id, const atomic_cell_or_collection& c) {
|
||||
@@ -2058,7 +2055,7 @@ void components_writer::consume_new_partition(const dht::decorated_key& dk) {
|
||||
|
||||
maybe_add_summary_entry(dk.token(), bytes_view(*_partition_key));
|
||||
_sst._components->filter->add(bytes_view(*_partition_key));
|
||||
_sst._collector.add_key(bytes_view(*_partition_key));
|
||||
_sst.get_metadata_collector().add_key(bytes_view(*_partition_key));
|
||||
|
||||
auto p_key = disk_string_view<uint16_t>();
|
||||
p_key.value = bytes_view(*_partition_key);
|
||||
@@ -2172,7 +2169,7 @@ stop_iteration components_writer::consume_end_of_partition() {
|
||||
_sst.get_large_data_handler().maybe_log_too_many_rows(_sst, *_partition_key, _sst._c_stats.rows_count);
|
||||
|
||||
// update is about merging column_stats with the data being stored by collector.
|
||||
_sst._collector.update(std::move(_sst._c_stats));
|
||||
_sst.get_metadata_collector().update(std::move(_sst._c_stats));
|
||||
_sst._c_stats.reset();
|
||||
|
||||
if (!_first_key) {
|
||||
@@ -2195,11 +2192,11 @@ void components_writer::consume_end_of_stream() {
|
||||
_index.close();
|
||||
|
||||
if (_sst.has_component(component_type::CompressionInfo)) {
|
||||
_sst._collector.add_compression_ratio(_sst._components->compression.compressed_file_length(), _sst._components->compression.uncompressed_file_length());
|
||||
_sst.get_metadata_collector().add_compression_ratio(_sst._components->compression.compressed_file_length(), _sst._components->compression.uncompressed_file_length());
|
||||
}
|
||||
|
||||
_sst.set_first_and_last_keys();
|
||||
seal_statistics(_sst.get_version(), _sst._components->statistics, _sst._collector, _schema.get_partitioner().name(), _schema.bloom_filter_fp_chance(),
|
||||
seal_statistics(_sst.get_version(), _sst._components->statistics, _sst.get_metadata_collector(), _schema.get_partitioner().name(), _schema.bloom_filter_fp_chance(),
|
||||
_sst._schema, _sst.get_first_decorated_key(), _sst.get_last_decorated_key());
|
||||
}
|
||||
|
||||
@@ -2250,6 +2247,13 @@ sstable::write_scylla_metadata(const io_priority_class& pc, shard_id shard, ssta
|
||||
write_simple<component_type::Scylla>(*_components->scylla_metadata, pc);
|
||||
}
|
||||
|
||||
void
|
||||
sstable::make_metadata_collector() {
|
||||
if (!_collector) {
|
||||
_collector.emplace(*get_schema(), get_filename());
|
||||
}
|
||||
}
|
||||
|
||||
void sstable::update_stats_on_end_of_stream()
|
||||
{
|
||||
if (_c_stats.capped_local_deletion_time) {
|
||||
@@ -2257,6 +2261,29 @@ void sstable::update_stats_on_end_of_stream()
|
||||
}
|
||||
}
|
||||
|
||||
// Returns true if this sstable possibly stores clustering row(s) that fall
// into any of the given ranges; false only when the min/max based position
// range of the sstable overlaps none of them.
bool sstable::may_contain_rows(const query::clustering_row_ranges& ranges) const {
    // Formats older than "md" always answer true: no filtering is attempted for them.
    if (_version < sstables::sstable_version_types::md) {
        return true;
    }

    // Sstables that were not written by scylla and that carry tombstones are
    // always included: they may contain partition tombstones that are not
    // taken into account in the min/max column names metadata.
    // (For partition tombstones we clear the min/max metadata, so such
    // sstables match as containing the rows we're looking for.)
    const bool foreign_with_tombstones =
            !has_scylla_component() && get_stats_metadata().estimated_tombstone_drop_time.bin.size() != 0;
    if (foreign_with_tombstones) {
        return true;
    }

    // One overlapping range is enough.
    for (const query::clustering_range& range : ranges) {
        const bool overlapping = _position_range.overlaps(*_schema,
                position_in_partition_view::for_range_start(range),
                position_in_partition_view::for_range_end(range));
        if (overlapping) {
            return true;
        }
    }
    return false;
}
|
||||
|
||||
class sstable_writer_k_l : public sstable_writer::writer_impl {
|
||||
bool _backup;
|
||||
bool _leave_unsealed;
|
||||
@@ -2379,7 +2406,8 @@ void sstable_writer_k_l::consume_end_of_stream()
|
||||
|
||||
sstable_writer::sstable_writer(sstable& sst, const schema& s, uint64_t estimated_partitions,
|
||||
const sstable_writer_config& cfg, encoding_stats enc_stats, const io_priority_class& pc, shard_id shard) {
|
||||
if (sst.get_version() == sstable_version_types::mc) {
|
||||
sst.make_metadata_collector();
|
||||
if (sst.get_version() >= sstable_version_types::mc) {
|
||||
_impl = mc::make_writer(sst, s, estimated_partitions, cfg, enc_stats, pc, shard);
|
||||
} else {
|
||||
_impl = std::make_unique<sstable_writer_k_l>(sst, s, estimated_partitions, cfg, pc, shard);
|
||||
@@ -2477,11 +2505,11 @@ future<> sstable::write_components(
|
||||
encoding_stats stats,
|
||||
const io_priority_class& pc) {
|
||||
assert_large_data_handler_is_running();
|
||||
if (cfg.replay_position) {
|
||||
_collector.set_replay_position(cfg.replay_position.value());
|
||||
}
|
||||
return seastar::async([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema), cfg, stats, &pc] () mutable {
|
||||
auto wr = get_writer(*schema, estimated_partitions, cfg, stats, pc);
|
||||
if (cfg.replay_position) {
|
||||
get_metadata_collector().set_replay_position(cfg.replay_position.value());
|
||||
}
|
||||
auto validator = mutation_fragment_stream_validating_filter(format("sstable writer {}", get_filename()), *schema,
|
||||
cfg.validate_keys);
|
||||
mr.consume_in_thread(std::move(wr), std::move(validator), db::no_timeout);
|
||||
@@ -2539,7 +2567,7 @@ future<> sstable::generate_summary(const io_priority_class& pc) {
|
||||
auto sem = std::make_unique<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits{});
|
||||
auto ctx = make_lw_shared<index_consume_entry_context<summary_generator>>(
|
||||
sem->make_permit(), s, trust_promoted_index::yes, *_schema, "", index_file, std::move(options), 0, index_size,
|
||||
(_version == sstable_version_types::mc
|
||||
(_version >= sstable_version_types::mc
|
||||
? std::make_optional(get_clustering_values_fixed_lengths(get_serialization_header()))
|
||||
: std::optional<column_values_fixed_lengths>{}));
|
||||
return ctx->consume_input().finally([ctx] {
|
||||
@@ -2638,6 +2666,7 @@ sstring sstable::component_basename(const sstring& ks, const sstring& cf, versio
|
||||
case sstable::version_types::la:
|
||||
return v + "-" + g + "-" + f + "-" + component;
|
||||
case sstable::version_types::mc:
|
||||
case sstable::version_types::md:
|
||||
return v + "-" + g + "-" + f + "-" + component;
|
||||
}
|
||||
assert(0 && "invalid version");
|
||||
@@ -2742,7 +2771,7 @@ future<> sstable::move_to_new_dir(sstring new_dir, int64_t new_generation, bool
|
||||
}
|
||||
|
||||
entry_descriptor entry_descriptor::make_descriptor(sstring sstdir, sstring fname) {
|
||||
static std::regex la_mc("(la|mc)-(\\d+)-(\\w+)-(.*)");
|
||||
static std::regex la_mx("(la|m[cd])-(\\d+)-(\\w+)-(.*)");
|
||||
static std::regex ka("(\\w+)-(\\w+)-ka-(\\d+)-(.*)");
|
||||
|
||||
static std::regex dir(".*/([^/]*)/([^/]+)-[\\da-fA-F]+(?:/staging|/upload|/snapshots/[^/]+)?/?");
|
||||
@@ -2759,7 +2788,7 @@ entry_descriptor entry_descriptor::make_descriptor(sstring sstdir, sstring fname
|
||||
|
||||
sstlog.debug("Make descriptor sstdir: {}; fname: {}", sstdir, fname);
|
||||
std::string s(fname);
|
||||
if (std::regex_match(s, match, la_mc)) {
|
||||
if (std::regex_match(s, match, la_mx)) {
|
||||
std::string sdir(sstdir);
|
||||
std::smatch dirmatch;
|
||||
if (std::regex_match(sdir, dirmatch, dir)) {
|
||||
@@ -2824,7 +2853,7 @@ input_stream<char> sstable::data_stream(uint64_t pos, size_t len, const io_prior
|
||||
|
||||
input_stream<char> stream;
|
||||
if (_components->compression) {
|
||||
if (_version == sstable_version_types::mc) {
|
||||
if (_version >= sstable_version_types::mc) {
|
||||
return make_compressed_file_m_format_input_stream(f, &_components->compression,
|
||||
pos, len, std::move(options));
|
||||
} else {
|
||||
|
||||
@@ -61,6 +61,7 @@
|
||||
#include "utils/observable.hh"
|
||||
#include "sstables/shareable_components.hh"
|
||||
#include "sstables/open_info.hh"
|
||||
#include "query-request.hh"
|
||||
|
||||
#include <seastar/util/optimized_optional.hh>
|
||||
#include <boost/intrusive/list.hpp>
|
||||
@@ -305,7 +306,7 @@ public:
|
||||
}
|
||||
|
||||
void add_ancestor(int64_t generation) {
|
||||
_collector.add_ancestor(generation);
|
||||
_collector->add_ancestor(generation);
|
||||
}
|
||||
|
||||
// Returns true iff this sstable contains data which belongs to many shards.
|
||||
@@ -408,8 +409,15 @@ public:
|
||||
|
||||
bool requires_view_building() const;
|
||||
|
||||
bool has_metadata_collector() const {
|
||||
return _collector.has_value();
|
||||
}
|
||||
|
||||
metadata_collector& get_metadata_collector() {
|
||||
return _collector;
|
||||
if (!_collector.has_value()) {
|
||||
on_internal_error(sstlog, "No metadata collector");
|
||||
}
|
||||
return *_collector;
|
||||
}
|
||||
|
||||
std::vector<std::pair<component_type, sstring>> all_components() const;
|
||||
@@ -468,7 +476,7 @@ private:
|
||||
bool _open = false;
|
||||
// NOTE: _collector and _c_stats are used to generate the statistics file
|
||||
// when writing a new sstable.
|
||||
metadata_collector _collector;
|
||||
std::optional<metadata_collector> _collector;
|
||||
column_stats _c_stats;
|
||||
file _index_file;
|
||||
file _data_file;
|
||||
@@ -477,7 +485,7 @@ private:
|
||||
uint64_t _filter_file_size = 0;
|
||||
uint64_t _bytes_on_disk = 0;
|
||||
db_clock::time_point _data_file_write_time;
|
||||
std::vector<nonwrapping_range<bytes_view>> _clustering_components_ranges;
|
||||
position_range _position_range = position_range::all_clustered_rows();
|
||||
std::vector<unsigned> _shards;
|
||||
std::optional<dht::decorated_key> _first;
|
||||
std::optional<dht::decorated_key> _last;
|
||||
@@ -611,11 +619,10 @@ private:
|
||||
|
||||
void set_first_and_last_keys();
|
||||
|
||||
// Create one range for each clustering component of this sstable.
|
||||
// Each range stores min and max value for that specific component.
|
||||
// Create a position range based on the min/max_column_names metadata of this sstable.
|
||||
// It does nothing if schema defines no clustering key, and it's supposed
|
||||
// to be called when loading an existing sstable or after writing a new one.
|
||||
void set_clustering_components_ranges();
|
||||
void set_position_range();
|
||||
|
||||
future<> create_data() noexcept;
|
||||
|
||||
@@ -680,6 +687,8 @@ private:
|
||||
}
|
||||
|
||||
future<> open_or_create_data(open_flags oflags, file_open_options options = {}) noexcept;
|
||||
|
||||
void make_metadata_collector();
|
||||
public:
|
||||
future<> read_toc() noexcept;
|
||||
|
||||
@@ -715,7 +724,7 @@ public:
|
||||
}
|
||||
|
||||
bool has_correct_max_deletion_time() const {
|
||||
return (_version == sstable_version_types::mc) || has_scylla_component();
|
||||
return (_version >= sstable_version_types::mc) || has_scylla_component();
|
||||
}
|
||||
|
||||
bool filter_has_key(const key& key) const {
|
||||
@@ -817,8 +826,6 @@ public:
|
||||
return _components->summary;
|
||||
}
|
||||
|
||||
const std::vector<nonwrapping_range<bytes_view>>& clustering_components_ranges() const;
|
||||
|
||||
// Gets ratio of droppable tombstone. A tombstone is considered droppable here
|
||||
// for cells expired before gc_before and regular tombstones older than gc_before.
|
||||
double estimate_droppable_tombstone_ratio(gc_clock::time_point gc_before) const;
|
||||
@@ -833,6 +840,13 @@ public:
|
||||
|
||||
void update_stats_on_end_of_stream();
|
||||
|
||||
bool has_correct_min_max_column_names() const noexcept {
|
||||
return _version >= sstable_version_types::md;
|
||||
}
|
||||
|
||||
// Return true if this sstable possibly stores clustering row(s) specified by ranges.
|
||||
bool may_contain_rows(const query::clustering_row_ranges& ranges) const;
|
||||
|
||||
// Allow the test cases from sstable_test.cc to test private methods. We use
|
||||
// a placeholder to avoid cluttering this class too much. The sstable_test class
|
||||
// will then re-export as public every method it needs.
|
||||
@@ -842,6 +856,8 @@ public:
|
||||
friend class sstable_writer_k_l;
|
||||
friend class mc::writer;
|
||||
friend class index_reader;
|
||||
friend class sstable_writer;
|
||||
friend class compaction;
|
||||
template <typename DataConsumeRowsContext>
|
||||
friend data_consume_context<DataConsumeRowsContext>
|
||||
data_consume_rows(const schema&, shared_sstable, typename DataConsumeRowsContext::consumer&, disk_read_range, uint64_t);
|
||||
|
||||
@@ -28,7 +28,6 @@
|
||||
#include "tombstone.hh"
|
||||
#include "utils/streaming_histogram.hh"
|
||||
#include "utils/estimated_histogram.hh"
|
||||
#include "column_name_helper.hh"
|
||||
#include "sstables/key.hh"
|
||||
#include "db/commitlog/replay_position.hh"
|
||||
#include "version.hh"
|
||||
@@ -278,6 +277,7 @@ struct compaction_metadata : public metadata_base<compaction_metadata> {
|
||||
auto describe_type(sstable_version_types v, Describer f) {
|
||||
switch (v) {
|
||||
case sstable_version_types::mc:
|
||||
case sstable_version_types::md:
|
||||
return f(
|
||||
cardinality
|
||||
);
|
||||
@@ -319,6 +319,7 @@ struct stats_metadata : public metadata_base<stats_metadata> {
|
||||
auto describe_type(sstable_version_types v, Describer f) {
|
||||
switch (v) {
|
||||
case sstable_version_types::mc:
|
||||
case sstable_version_types::md:
|
||||
return f(
|
||||
estimated_partition_size,
|
||||
estimated_cells_count,
|
||||
@@ -389,6 +390,7 @@ struct serialization_header : public metadata_base<serialization_header> {
|
||||
auto describe_type(sstable_version_types v, Describer f) {
|
||||
switch (v) {
|
||||
case sstable_version_types::mc:
|
||||
case sstable_version_types::md:
|
||||
return f(
|
||||
min_timestamp_base,
|
||||
min_local_deletion_time_base,
|
||||
|
||||
@@ -27,13 +27,14 @@
|
||||
|
||||
namespace sstables {
|
||||
|
||||
enum class sstable_version_types { ka, la, mc };
|
||||
enum class sstable_version_types { ka, la, mc, md };
|
||||
enum class sstable_format_types { big };
|
||||
|
||||
inline std::array<sstable_version_types, 3> all_sstable_versions = {
|
||||
inline std::array<sstable_version_types, 4> all_sstable_versions = {
|
||||
sstable_version_types::ka,
|
||||
sstable_version_types::la,
|
||||
sstable_version_types::mc,
|
||||
sstable_version_types::md,
|
||||
};
|
||||
|
||||
inline sstable_version_types from_string(const seastar::sstring& format) {
|
||||
@@ -46,6 +47,9 @@ inline sstable_version_types from_string(const seastar::sstring& format) {
|
||||
if (format == "mc") {
|
||||
return sstable_version_types::mc;
|
||||
}
|
||||
if (format == "md") {
|
||||
return sstable_version_types::md;
|
||||
}
|
||||
throw std::invalid_argument("Wrong sstable format name: " + format);
|
||||
}
|
||||
|
||||
@@ -54,14 +58,11 @@ inline seastar::sstring to_string(sstable_version_types format) {
|
||||
case sstable_version_types::ka: return "ka";
|
||||
case sstable_version_types::la: return "la";
|
||||
case sstable_version_types::mc: return "mc";
|
||||
case sstable_version_types::md: return "md";
|
||||
}
|
||||
throw std::runtime_error("Wrong sstable format");
|
||||
}
|
||||
|
||||
inline bool is_latest_supported(sstable_version_types format) {
|
||||
return format == sstable_version_types::mc;
|
||||
}
|
||||
|
||||
inline int operator<=>(sstable_version_types a, sstable_version_types b) {
|
||||
auto to_int = [] (sstable_version_types x) {
|
||||
return static_cast<std::underlying_type_t<sstable_version_types>>(x);
|
||||
|
||||
@@ -544,26 +544,26 @@ void write_column_name(sstable_version_types v, Writer& out, const schema& s, co
|
||||
|
||||
template <typename W>
|
||||
requires Writer<W>
|
||||
void write_cell_value(W& out, const abstract_type& type, bytes_view value) {
|
||||
void write_cell_value(sstable_version_types v, W& out, const abstract_type& type, bytes_view value) {
|
||||
if (!value.empty()) {
|
||||
if (type.value_length_if_fixed()) {
|
||||
write(sstable_version_types::mc, out, value);
|
||||
write(v, out, value);
|
||||
} else {
|
||||
write_vint(out, value.size());
|
||||
write(sstable_version_types::mc, out, value);
|
||||
write(v, out, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template <typename W>
|
||||
requires Writer<W>
|
||||
void write_cell_value(W& out, const abstract_type& type, atomic_cell_value_view value) {
|
||||
void write_cell_value(sstable_version_types v, W& out, const abstract_type& type, atomic_cell_value_view value) {
|
||||
if (!value.empty()) {
|
||||
if (!type.value_length_if_fixed()) {
|
||||
write_vint(out, value.size_bytes());
|
||||
}
|
||||
using boost::range::for_each;
|
||||
for_each(value, [&] (bytes_view fragment) { write(sstable_version_types::mc, out, fragment); });
|
||||
for_each(value, [&] (bytes_view fragment) { write(v, out, fragment); });
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
157
table.cc
157
table.cc
@@ -40,8 +40,6 @@
|
||||
#include "query-result-writer.hh"
|
||||
#include "db/view/view.hh"
|
||||
#include <seastar/core/seastar.hh>
|
||||
#include <boost/algorithm/cxx11/all_of.hpp>
|
||||
#include <boost/algorithm/cxx11/any_of.hpp>
|
||||
#include <boost/range/adaptor/transformed.hpp>
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
#include "utils/error_injection.hh"
|
||||
@@ -54,89 +52,10 @@ static seastar::metrics::label keyspace_label("ks");
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
// Stores ranges for all components of the same clustering key, index 0 referring to component
|
||||
// range 0, and so on.
|
||||
using ck_filter_clustering_key_components = std::vector<nonwrapping_range<bytes_view>>;
|
||||
// Stores an entry for each clustering key range specified by the filter.
|
||||
using ck_filter_clustering_key_ranges = std::vector<ck_filter_clustering_key_components>;
|
||||
|
||||
// Used to split a clustering key range into a range for each component.
|
||||
// If a range in ck_filtering_all_ranges is composite, a range will be created
|
||||
// for each component. If it's not composite, a single range is created.
|
||||
// This split is needed to check for overlap in each component individually.
|
||||
static ck_filter_clustering_key_ranges
|
||||
ranges_for_clustering_key_filter(const schema_ptr& schema, const query::clustering_row_ranges& ck_filtering_all_ranges) {
|
||||
ck_filter_clustering_key_ranges ranges;
|
||||
|
||||
for (auto& r : ck_filtering_all_ranges) {
|
||||
// this vector stores a range for each component of a key, only one if not composite.
|
||||
ck_filter_clustering_key_components composite_ranges;
|
||||
|
||||
if (r.is_full()) {
|
||||
ranges.push_back({ nonwrapping_range<bytes_view>::make_open_ended_both_sides() });
|
||||
continue;
|
||||
}
|
||||
auto start = r.start() ? r.start()->value().components() : clustering_key_prefix::make_empty().components();
|
||||
auto end = r.end() ? r.end()->value().components() : clustering_key_prefix::make_empty().components();
|
||||
auto start_it = start.begin();
|
||||
auto end_it = end.begin();
|
||||
|
||||
// This test is enough because equal bounds in nonwrapping_range are inclusive.
|
||||
auto is_singular = [&schema] (const auto& type_it, const bytes_view& b1, const bytes_view& b2) {
|
||||
if (type_it == schema->clustering_key_type()->types().end()) {
|
||||
throw std::runtime_error(format("clustering key filter passed more components than defined in schema of {}.{}",
|
||||
schema->ks_name(), schema->cf_name()));
|
||||
}
|
||||
return (*type_it)->compare(b1, b2) == 0;
|
||||
};
|
||||
auto type_it = schema->clustering_key_type()->types().begin();
|
||||
composite_ranges.reserve(schema->clustering_key_size());
|
||||
|
||||
// the rule is to ignore any component cn if another component ck (k < n) is not if the form [v, v].
|
||||
// If we have [v1, v1], [v2, v2], ... {vl3, vr3}, ....
|
||||
// then we generate [v1, v1], [v2, v2], ... {vl3, vr3}. Where { = '(' or '[', etc.
|
||||
while (start_it != start.end() && end_it != end.end() && is_singular(type_it++, *start_it, *end_it)) {
|
||||
composite_ranges.push_back(nonwrapping_range<bytes_view>({{ std::move(*start_it++), true }},
|
||||
{{ std::move(*end_it++), true }}));
|
||||
}
|
||||
// handle a single non-singular tail element, if present
|
||||
if (start_it != start.end() && end_it != end.end()) {
|
||||
composite_ranges.push_back(nonwrapping_range<bytes_view>({{ std::move(*start_it), r.start()->is_inclusive() }},
|
||||
{{ std::move(*end_it), r.end()->is_inclusive() }}));
|
||||
} else if (start_it != start.end()) {
|
||||
composite_ranges.push_back(nonwrapping_range<bytes_view>({{ std::move(*start_it), r.start()->is_inclusive() }}, {}));
|
||||
} else if (end_it != end.end()) {
|
||||
composite_ranges.push_back(nonwrapping_range<bytes_view>({}, {{ std::move(*end_it), r.end()->is_inclusive() }}));
|
||||
}
|
||||
|
||||
ranges.push_back(std::move(composite_ranges));
|
||||
}
|
||||
return ranges;
|
||||
}
|
||||
|
||||
// Return true if this sstable possibly stores clustering row(s) specified by ranges.
|
||||
static inline bool
|
||||
contains_rows(const sstables::sstable& sst, const schema_ptr& schema, const ck_filter_clustering_key_ranges& ranges) {
|
||||
auto& clustering_key_types = schema->clustering_key_type()->types();
|
||||
auto& clustering_components_ranges = sst.clustering_components_ranges();
|
||||
|
||||
if (!schema->clustering_key_size() || clustering_components_ranges.empty()) {
|
||||
return true;
|
||||
}
|
||||
return boost::algorithm::any_of(ranges, [&] (const ck_filter_clustering_key_components& range) {
|
||||
auto s = std::min(range.size(), clustering_components_ranges.size());
|
||||
return boost::algorithm::all_of(boost::irange<unsigned>(0, s), [&] (unsigned i) {
|
||||
auto& type = clustering_key_types[i];
|
||||
return range[i].is_full() || range[i].overlaps(clustering_components_ranges[i], type->as_tri_comparator());
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Filter out sstables for reader using bloom filter and sstable metadata that keeps track
|
||||
// of a range for each clustering component.
|
||||
// Filter out sstables for reader using bloom filter
|
||||
static std::vector<sstables::shared_sstable>
|
||||
filter_sstable_for_reader(std::vector<sstables::shared_sstable>&& sstables, column_family& cf, const schema_ptr& schema,
|
||||
const dht::partition_range& pr, const sstables::key& key, const query::partition_slice& slice) {
|
||||
filter_sstable_for_reader_by_pk(std::vector<sstables::shared_sstable>&& sstables, column_family& cf, const schema_ptr& schema,
|
||||
const dht::partition_range& pr, const sstables::key& key) {
|
||||
const dht::ring_position& pr_key = pr.start()->value();
|
||||
auto sstable_has_not_key = [&, cmp = dht::ring_position_comparator(*schema)] (const sstables::shared_sstable& sst) {
|
||||
return cmp(pr_key, sst->get_first_decorated_key()) < 0 ||
|
||||
@@ -144,16 +63,21 @@ filter_sstable_for_reader(std::vector<sstables::shared_sstable>&& sstables, colu
|
||||
!sst->filter_has_key(key);
|
||||
};
|
||||
sstables.erase(boost::remove_if(sstables, sstable_has_not_key), sstables.end());
|
||||
return sstables;
|
||||
}
|
||||
|
||||
// FIXME: Workaround for https://github.com/scylladb/scylla/issues/3552
|
||||
// and https://github.com/scylladb/scylla/issues/3553
|
||||
// While this is true, filter_sstable_for_reader_by_ck returns the sstables it
// was given without applying any clustering-key based filtering.
const bool filtering_broken = true;
|
||||
|
||||
// Filter out sstables for reader using sstable metadata that keeps track
|
||||
// of a range for each clustering component.
|
||||
static std::vector<sstables::shared_sstable>
|
||||
filter_sstable_for_reader_by_ck(std::vector<sstables::shared_sstable>&& sstables, column_family& cf, const schema_ptr& schema,
|
||||
const query::partition_slice& slice) {
|
||||
// no clustering filtering is applied if schema defines no clustering key or
|
||||
// compaction strategy thinks it will not benefit from such an optimization.
|
||||
if (filtering_broken || !schema->clustering_key_size() || !cf.get_compaction_strategy().use_clustering_key_filter()) {
|
||||
return sstables;
|
||||
// compaction strategy thinks it will not benefit from such an optimization,
|
||||
// or the partition_slice includes static columns.
|
||||
if (!schema->clustering_key_size() || !cf.get_compaction_strategy().use_clustering_key_filter() || slice.static_columns.size()) {
|
||||
return sstables;
|
||||
}
|
||||
|
||||
::cf_stats* stats = cf.cf_stats();
|
||||
stats->clustering_filter_count++;
|
||||
stats->sstables_checked_by_clustering_filter += sstables.size();
|
||||
@@ -166,28 +90,11 @@ filter_sstable_for_reader(std::vector<sstables::shared_sstable>&& sstables, colu
|
||||
stats->surviving_sstables_after_clustering_filter += sstables.size();
|
||||
return sstables;
|
||||
}
|
||||
auto ranges = ranges_for_clustering_key_filter(schema, ck_filtering_all_ranges);
|
||||
if (ranges.empty()) {
|
||||
return {};
|
||||
}
|
||||
|
||||
int64_t min_timestamp = std::numeric_limits<int64_t>::max();
|
||||
auto sstable_has_clustering_key = [&min_timestamp, &schema, &ranges] (const sstables::shared_sstable& sst) {
|
||||
if (!contains_rows(*sst, schema, ranges)) {
|
||||
return false; // ordered after sstables that contain clustering rows.
|
||||
} else {
|
||||
min_timestamp = std::min(min_timestamp, sst->get_stats_metadata().min_timestamp);
|
||||
return true;
|
||||
}
|
||||
};
|
||||
auto sstable_has_relevant_tombstone = [&min_timestamp] (const sstables::shared_sstable& sst) {
|
||||
const auto& stats = sst->get_stats_metadata();
|
||||
// re-add sstable as candidate if it contains a tombstone that may cover a row in an included sstable.
|
||||
return (stats.max_timestamp > min_timestamp && stats.estimated_tombstone_drop_time.bin.size());
|
||||
};
|
||||
auto skipped = std::partition(sstables.begin(), sstables.end(), sstable_has_clustering_key);
|
||||
auto actually_skipped = std::partition(skipped, sstables.end(), sstable_has_relevant_tombstone);
|
||||
sstables.erase(actually_skipped, sstables.end());
|
||||
auto skipped = std::partition(sstables.begin(), sstables.end(), [&ranges = ck_filtering_all_ranges] (const sstables::shared_sstable& sst) {
|
||||
return sst->may_contain_rows(ranges);
|
||||
});
|
||||
sstables.erase(skipped, sstables.end());
|
||||
stats->surviving_sstables_after_clustering_filter += sstables.size();
|
||||
|
||||
return sstables;
|
||||
@@ -287,18 +194,34 @@ create_single_key_sstable_reader(column_family* cf,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
{
|
||||
auto key = sstables::key::from_partition_key(*schema, *pr.start()->value().key());
|
||||
auto& pk = pr.start()->value().key();
|
||||
auto key = sstables::key::from_partition_key(*schema, *pk);
|
||||
auto selected_sstables = filter_sstable_for_reader_by_pk(sstables->select(pr), *cf, schema, pr, key);
|
||||
auto num_sstables = selected_sstables.size();
|
||||
if (!num_sstables) {
|
||||
return make_empty_flat_reader(schema);
|
||||
}
|
||||
auto readers = boost::copy_range<std::vector<flat_mutation_reader>>(
|
||||
filter_sstable_for_reader(sstables->select(pr), *cf, schema, pr, key, slice)
|
||||
filter_sstable_for_reader_by_ck(std::move(selected_sstables), *cf, schema, slice)
|
||||
| boost::adaptors::transformed([&] (const sstables::shared_sstable& sstable) {
|
||||
tracing::trace(trace_state, "Reading key {} from sstable {}", pr, seastar::value_of([&sstable] { return sstable->get_filename(); }));
|
||||
return sstable->read_row_flat(schema, permit, pr.start()->value(), slice, pc, trace_state, fwd);
|
||||
})
|
||||
);
|
||||
if (readers.empty()) {
|
||||
return make_empty_flat_reader(schema);
|
||||
|
||||
// If filter_sstable_for_reader_by_ck filtered any sstable that contains the partition
|
||||
// we want to emit partition_start/end if no rows were found,
|
||||
// to prevent https://github.com/scylladb/scylla/issues/3552.
|
||||
//
|
||||
// Use `flat_mutation_reader_from_mutations` with an empty mutation to emit
|
||||
// the partition_start/end pair and append it to the list of readers passed
|
||||
// to make_combined_reader to ensure partition_start/end are emitted even if
|
||||
// all sstables actually containing the partition were filtered.
|
||||
auto num_readers = readers.size();
|
||||
if (num_readers != num_sstables) {
|
||||
readers.push_back(flat_mutation_reader_from_mutations({mutation(schema, *pk)}, slice, fwd));
|
||||
}
|
||||
sstable_histogram.add(readers.size());
|
||||
sstable_histogram.add(num_readers);
|
||||
return make_combined_reader(schema, std::move(readers), fwd, fwd_mr);
|
||||
}
|
||||
|
||||
|
||||
@@ -4653,3 +4653,84 @@ SEASTAR_THREAD_TEST_CASE(test_query_limit) {
|
||||
}
|
||||
}, std::move(cfg)).get();
|
||||
}
|
||||
|
||||
// reproduces https://github.com/scylladb/scylla/issues/3552
|
||||
// when clustering-key filtering is enabled in filter_sstable_for_reader
|
||||
static future<> test_clustering_filtering_with_compaction_strategy(const std::string_view& cs) {
|
||||
auto db_config = make_shared<db::config>();
|
||||
db_config->enable_sstables_md_format.set(true);
|
||||
|
||||
return do_with_cql_env_thread([&cs] (cql_test_env& e) {
|
||||
cquery_nofail(e, format("CREATE TABLE cf(pk text, ck int, v text, PRIMARY KEY(pk, ck)) WITH COMPACTION = {{'class': '{}'}}", cs));
|
||||
cquery_nofail(e, "INSERT INTO cf(pk, ck, v) VALUES ('a', 1, 'a1')");
|
||||
e.db().invoke_on_all([] (database& db) { return db.flush_all_memtables(); }).get();
|
||||
e.db().invoke_on_all([] (database& db) { db.row_cache_tracker().clear(); }).get();
|
||||
require_rows(e, "SELECT v FROM cf WHERE pk='a' AND ck=0 ALLOW FILTERING", {});
|
||||
require_rows(e, "SELECT v FROM cf", {{T("a1")}});
|
||||
}, cql_test_config(db_config));
|
||||
}
|
||||
|
||||
// Runs the single-row clustering-filtering scenario once per listed
// compaction strategy.
SEASTAR_TEST_CASE(test_clustering_filtering) {
    // Kept in static storage, as the helper receives each name by reference.
    static std::array<std::string_view, 2> strategies = {
        "SizeTieredCompactionStrategy",
        "TimeWindowCompactionStrategy",
    };

    return do_for_each(std::begin(strategies), std::end(strategies),
            test_clustering_filtering_with_compaction_strategy);
}
|
||||
|
||||
// Variant of the clustering-filtering scenario with two partitions written
// before a single flush.
//
// Creates table `cf` with compaction strategy class `cs` (md sstable format
// enabled), inserts rows (pk='a', ck=1) and (pk='b', ck=2), flushes all
// memtables and clears the row cache. It then requires that a query for
// pk='a' AND ck=0 yields no rows and that a full scan yields both rows.
static future<> test_clustering_filtering_2_with_compaction_strategy(const std::string_view& cs) {
    auto db_config = make_shared<db::config>();
    db_config->enable_sstables_md_format.set(true);

    // The view is captured by value so the callback does not depend on the
    // lifetime of the caller's argument while the returned future is pending.
    return do_with_cql_env_thread([cs] (cql_test_env& e) {
        cquery_nofail(e, format("CREATE TABLE cf(pk text, ck int, v text, PRIMARY KEY(pk, ck)) WITH COMPACTION = {{'class': '{}'}}", cs));
        cquery_nofail(e, "INSERT INTO cf(pk, ck, v) VALUES ('a', 1, 'a1')");
        cquery_nofail(e, "INSERT INTO cf(pk, ck, v) VALUES ('b', 2, 'b2')");
        e.db().invoke_on_all([] (database& db) { return db.flush_all_memtables(); }).get();
        e.db().invoke_on_all([] (database& db) { db.row_cache_tracker().clear(); }).get();
        require_rows(e, "SELECT v FROM cf WHERE pk='a' AND ck=0 ALLOW FILTERING", {});
        require_rows(e, "SELECT v FROM cf", {{T("a1")}, {T("b2")}});
    }, cql_test_config(db_config));
}
|
||||
|
||||
// Runs the two-partition clustering-filtering scenario once per listed
// compaction strategy.
SEASTAR_TEST_CASE(test_clustering_filtering_2) {
    // Kept in static storage, as the helper receives each name by reference.
    static std::array<std::string_view, 2> strategies = {
        "SizeTieredCompactionStrategy",
        "TimeWindowCompactionStrategy",
    };

    return do_for_each(std::begin(strategies), std::end(strategies),
            test_clustering_filtering_2_with_compaction_strategy);
}
|
||||
|
||||
// Variant of the clustering-filtering scenario where the two partitions are
// flushed separately.
//
// Creates table `cf` with compaction strategy class `cs` (md sstable format
// enabled) and disables auto compaction on it on every shard (presumably so
// the two flushes below stay in separate sstables -- confirm). Inserts
// (pk='a', ck=1), flushes, inserts (pk='b', ck=0), flushes again and clears the
// row cache. It then requires that a query for pk='a' AND ck=0 yields no rows
// and that a full scan yields both rows.
static future<> test_clustering_filtering_3_with_compaction_strategy(const std::string_view& cs) {
    auto db_config = make_shared<db::config>();
    db_config->enable_sstables_md_format.set(true);

    // The view is captured by value so the callback does not depend on the
    // lifetime of the caller's argument while the returned future is pending.
    return do_with_cql_env_thread([cs] (cql_test_env& e) {
        cquery_nofail(e, format("CREATE TABLE cf(pk text, ck int, v text, PRIMARY KEY(pk, ck)) WITH COMPACTION = {{'class': '{}'}}", cs));
        e.db().invoke_on_all([] (database& db) {
            auto& table = db.find_column_family("ks", "cf");
            table.disable_auto_compaction();
        }).get();
        cquery_nofail(e, "INSERT INTO cf(pk, ck, v) VALUES ('a', 1, 'a1')");
        e.db().invoke_on_all([] (database& db) { return db.flush_all_memtables(); }).get();
        cquery_nofail(e, "INSERT INTO cf(pk, ck, v) VALUES ('b', 0, 'b0')");
        e.db().invoke_on_all([] (database& db) { return db.flush_all_memtables(); }).get();
        e.db().invoke_on_all([] (database& db) { db.row_cache_tracker().clear(); }).get();
        require_rows(e, "SELECT v FROM cf WHERE pk='a' AND ck=0 ALLOW FILTERING", {});
        require_rows(e, "SELECT v FROM cf", {{T("a1")}, {T("b0")}});
    }, cql_test_config(db_config));
}
|
||||
|
||||
// Runs the separately-flushed clustering-filtering scenario once per listed
// compaction strategy.
SEASTAR_TEST_CASE(test_clustering_filtering_3) {
    // Kept in static storage, as the helper receives each name by reference.
    static std::array<std::string_view, 2> strategies = {
        "SizeTieredCompactionStrategy",
        "TimeWindowCompactionStrategy",
    };

    return do_for_each(std::begin(strategies), std::end(strategies),
            test_clustering_filtering_3_with_compaction_strategy);
}
|
||||
|
||||
@@ -563,8 +563,7 @@ struct sst_factory {
|
||||
|
||||
sstables::shared_sstable operator()() {
|
||||
auto sst = env.make_sstable(s, path, gen, sstables::sstable::version_types::la, sstables::sstable::format_types::big);
|
||||
//sst->set_sstable_level(level);
|
||||
sst->get_metadata_collector().sstable_level(level);
|
||||
sst->set_sstable_level(level);
|
||||
|
||||
return sst;
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -3282,10 +3282,13 @@ SEASTAR_TEST_CASE(test_promoted_index_read) {
|
||||
|
||||
static void check_min_max_column_names(const sstable_ptr& sst, std::vector<bytes> min_components, std::vector<bytes> max_components) {
|
||||
const auto& st = sst->get_stats_metadata();
|
||||
BOOST_TEST_MESSAGE(fmt::format("min {}/{} max {}/{}", st.min_column_names.elements.size(), min_components.size(), st.max_column_names.elements.size(), max_components.size()));
|
||||
BOOST_REQUIRE(st.min_column_names.elements.size() == min_components.size());
|
||||
BOOST_REQUIRE(st.min_column_names.elements.size() == st.max_column_names.elements.size());
|
||||
for (auto i = 0U; i < st.min_column_names.elements.size(); i++) {
|
||||
BOOST_REQUIRE(min_components[i] == st.min_column_names.elements[i].value);
|
||||
}
|
||||
BOOST_REQUIRE(st.max_column_names.elements.size() == max_components.size());
|
||||
for (auto i = 0U; i < st.max_column_names.elements.size(); i++) {
|
||||
BOOST_REQUIRE(max_components[i] == st.max_column_names.elements[i].value);
|
||||
}
|
||||
}
|
||||
@@ -3357,6 +3360,47 @@ SEASTAR_TEST_CASE(min_max_clustering_key_test) {
|
||||
test_min_max_clustering_key(s, {"key1"}, {{"a", "b"},
|
||||
{"a", "c"}}, {"a", "b"}, {"a", "c"}, version);
|
||||
}
|
||||
{
|
||||
auto s = schema_builder("ks", "cf")
|
||||
.with_column("pk", utf8_type, column_kind::partition_key)
|
||||
.with_column("ck1", utf8_type, column_kind::clustering_key)
|
||||
.with_column("ck2", utf8_type, column_kind::clustering_key)
|
||||
.with_column("r1", int32_type)
|
||||
.build();
|
||||
BOOST_TEST_MESSAGE(fmt::format("min_max_clustering_key_test: min={{\"a\", \"c\"}} max={{\"b\", \"a\"}} version={}", to_string(version)));
|
||||
test_min_max_clustering_key(s, {"key1"}, {{"b", "a"}, {"a", "c"}}, {"a", "c"}, {"b", "a"}, version);
|
||||
}
|
||||
{
|
||||
auto s = schema_builder("ks", "cf")
|
||||
.with(schema_builder::compact_storage::yes)
|
||||
.with_column("pk", utf8_type, column_kind::partition_key)
|
||||
.with_column("ck1", utf8_type, column_kind::clustering_key)
|
||||
.with_column("ck2", utf8_type, column_kind::clustering_key)
|
||||
.with_column("r1", int32_type)
|
||||
.build();
|
||||
BOOST_TEST_MESSAGE(fmt::format("min_max_clustering_key_test: min={{\"a\", \"c\"}} max={{\"b\", \"a\"}} with compact storage version={}", to_string(version)));
|
||||
test_min_max_clustering_key(s, {"key1"}, {{"b", "a"}, {"a", "c"}}, {"a", "c"}, {"b", "a"}, version);
|
||||
}
|
||||
{
|
||||
auto s = schema_builder("ks", "cf")
|
||||
.with_column("pk", utf8_type, column_kind::partition_key)
|
||||
.with_column("ck1", utf8_type, column_kind::clustering_key)
|
||||
.with_column("ck2", reversed_type_impl::get_instance(utf8_type), column_kind::clustering_key)
|
||||
.with_column("r1", int32_type)
|
||||
.build();
|
||||
BOOST_TEST_MESSAGE(fmt::format("min_max_clustering_key_test: reversed order: min={{\"a\", \"z\"}} max={{\"a\", \"a\"}} version={}", to_string(version)));
|
||||
test_min_max_clustering_key(s, {"key1"}, {{"a", "a"}, {"a", "z"}}, {"a", "z"}, {"a", "a"}, version);
|
||||
}
|
||||
{
|
||||
auto s = schema_builder("ks", "cf")
|
||||
.with_column("pk", utf8_type, column_kind::partition_key)
|
||||
.with_column("ck1", utf8_type, column_kind::clustering_key)
|
||||
.with_column("ck2", reversed_type_impl::get_instance(utf8_type), column_kind::clustering_key)
|
||||
.with_column("r1", int32_type)
|
||||
.build();
|
||||
BOOST_TEST_MESSAGE(fmt::format("min_max_clustering_key_test: reversed order: min={{\"a\", \"a\"}} max={{\"b\", \"z\"}} version={}", to_string(version)));
|
||||
test_min_max_clustering_key(s, {"key1"}, {{"b", "z"}, {"a", "a"}}, {"a", "a"}, {"b", "z"}, version);
|
||||
}
|
||||
{
|
||||
auto s = schema_builder("ks", "cf")
|
||||
.with_column("pk", utf8_type, column_kind::partition_key)
|
||||
@@ -3382,6 +3426,19 @@ SEASTAR_TEST_CASE(min_max_clustering_key_test) {
|
||||
.build();
|
||||
test_min_max_clustering_key(s, {"key1"}, {}, {}, {}, version);
|
||||
}
|
||||
if (version >= sstable_version_types::mc) {
|
||||
{
|
||||
auto s = schema_builder("ks", "cf")
|
||||
.with(schema_builder::compact_storage::yes)
|
||||
.with_column("pk", utf8_type, column_kind::partition_key)
|
||||
.with_column("ck1", utf8_type, column_kind::clustering_key)
|
||||
.with_column("ck2", reversed_type_impl::get_instance(utf8_type), column_kind::clustering_key)
|
||||
.with_column("r1", int32_type)
|
||||
.build();
|
||||
BOOST_TEST_MESSAGE(fmt::format("min_max_clustering_key_test: reversed order: min={{\"a\"}} max={{\"a\"}} with compact storage version={}", to_string(version)));
|
||||
test_min_max_clustering_key(s, {"key1"}, {{"a", "z"}, {"a"}}, {"a"}, {"a"}, version);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -3449,6 +3506,8 @@ SEASTAR_TEST_CASE(sstable_tombstone_metadata_check) {
|
||||
auto c_key = clustering_key_prefix::from_exploded(*s, {to_bytes("c1")});
|
||||
const column_definition& r1_col = *s->get_column_definition("r1");
|
||||
|
||||
BOOST_TEST_MESSAGE(fmt::format("version {}", to_string(version)));
|
||||
|
||||
{
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
mutation m(s, key);
|
||||
@@ -3458,8 +3517,8 @@ SEASTAR_TEST_CASE(sstable_tombstone_metadata_check) {
|
||||
auto sst = env.make_sstable(s, tmp.path().string(), 1, version, big);
|
||||
write_memtable_to_sstable_for_test(*mt, sst).get();
|
||||
sst = env.reusable_sst(s, tmp.path().string(), 1, version).get0();
|
||||
sstables::sstlog.warn("Version {}", (int)version);
|
||||
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
|
||||
check_min_max_column_names(sst, {"c1"}, {"c1"});
|
||||
}
|
||||
|
||||
{
|
||||
@@ -3471,6 +3530,7 @@ SEASTAR_TEST_CASE(sstable_tombstone_metadata_check) {
|
||||
write_memtable_to_sstable_for_test(*mt, sst).get();
|
||||
sst = env.reusable_sst(s, tmp.path().string(), 2, version).get0();
|
||||
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
|
||||
check_min_max_column_names(sst, {"c1"}, {"c1"});
|
||||
}
|
||||
|
||||
{
|
||||
@@ -3482,6 +3542,7 @@ SEASTAR_TEST_CASE(sstable_tombstone_metadata_check) {
|
||||
write_memtable_to_sstable_for_test(*mt, sst).get();
|
||||
sst = env.reusable_sst(s, tmp.path().string(), 3, version).get0();
|
||||
BOOST_REQUIRE(!sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
|
||||
check_min_max_column_names(sst, {"c1"}, {"c1"});
|
||||
}
|
||||
|
||||
{
|
||||
@@ -3501,6 +3562,7 @@ SEASTAR_TEST_CASE(sstable_tombstone_metadata_check) {
|
||||
write_memtable_to_sstable_for_test(*mt, sst).get();
|
||||
sst = env.reusable_sst(s, tmp.path().string(), 4, version).get0();
|
||||
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
|
||||
check_min_max_column_names(sst, {"c1"}, {"c1"});
|
||||
}
|
||||
|
||||
{
|
||||
@@ -3513,6 +3575,7 @@ SEASTAR_TEST_CASE(sstable_tombstone_metadata_check) {
|
||||
write_memtable_to_sstable_for_test(*mt, sst).get();
|
||||
sst = env.reusable_sst(s, tmp.path().string(), 5, version).get0();
|
||||
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
|
||||
check_min_max_column_names(sst, {}, {});
|
||||
}
|
||||
|
||||
{
|
||||
@@ -3527,6 +3590,535 @@ SEASTAR_TEST_CASE(sstable_tombstone_metadata_check) {
|
||||
write_memtable_to_sstable_for_test(*mt, sst).get();
|
||||
sst = env.reusable_sst(s, tmp.path().string(), 6, version).get0();
|
||||
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
|
||||
if (version >= sstable_version_types::mc) {
|
||||
check_min_max_column_names(sst, {"a"}, {"a"});
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
mutation m(s, key);
|
||||
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
|
||||
tombstone tomb(api::new_timestamp(), gc_clock::now());
|
||||
range_tombstone rt(
|
||||
clustering_key_prefix::from_single_value(*s, bytes("a")),
|
||||
clustering_key_prefix::from_single_value(*s, bytes("a")),
|
||||
tomb);
|
||||
m.partition().apply_delete(*s, std::move(rt));
|
||||
mt->apply(std::move(m));
|
||||
auto sst = env.make_sstable(s, tmp.path().string(), 7, version, big);
|
||||
write_memtable_to_sstable_for_test(*mt, sst).get();
|
||||
sst = env.reusable_sst(s, tmp.path().string(), 7, version).get0();
|
||||
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
|
||||
if (version >= sstable_version_types::mc) {
|
||||
check_min_max_column_names(sst, {"a"}, {"c1"});
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
mutation m(s, key);
|
||||
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
|
||||
tombstone tomb(api::new_timestamp(), gc_clock::now());
|
||||
range_tombstone rt(
|
||||
clustering_key_prefix::from_single_value(*s, bytes("c")),
|
||||
clustering_key_prefix::from_single_value(*s, bytes("d")),
|
||||
tomb);
|
||||
m.partition().apply_delete(*s, std::move(rt));
|
||||
mt->apply(std::move(m));
|
||||
auto sst = env.make_sstable(s, tmp.path().string(), 8, version, big);
|
||||
write_memtable_to_sstable_for_test(*mt, sst).get();
|
||||
sst = env.reusable_sst(s, tmp.path().string(), 8, version).get0();
|
||||
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
|
||||
if (version >= sstable_version_types::mc) {
|
||||
check_min_max_column_names(sst, {"c"}, {"d"});
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
mutation m(s, key);
|
||||
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
|
||||
tombstone tomb(api::new_timestamp(), gc_clock::now());
|
||||
range_tombstone rt(
|
||||
clustering_key_prefix::from_single_value(*s, bytes("d")),
|
||||
clustering_key_prefix::from_single_value(*s, bytes("z")),
|
||||
tomb);
|
||||
m.partition().apply_delete(*s, std::move(rt));
|
||||
mt->apply(std::move(m));
|
||||
auto sst = env.make_sstable(s, tmp.path().string(), 9, version, big);
|
||||
write_memtable_to_sstable_for_test(*mt, sst).get();
|
||||
sst = env.reusable_sst(s, tmp.path().string(), 9, version).get0();
|
||||
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
|
||||
if (version >= sstable_version_types::mc) {
|
||||
check_min_max_column_names(sst, {"c1"}, {"z"});
|
||||
}
|
||||
}
|
||||
|
||||
if (version >= sstable_version_types::mc) {
|
||||
{
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
mutation m(s, key);
|
||||
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
|
||||
tombstone tomb(api::new_timestamp(), gc_clock::now());
|
||||
range_tombstone rt(
|
||||
bound_view::bottom(),
|
||||
bound_view(clustering_key_prefix::from_single_value(*s, bytes("z")), bound_kind::incl_end),
|
||||
tomb);
|
||||
m.partition().apply_delete(*s, std::move(rt));
|
||||
mt->apply(std::move(m));
|
||||
auto sst = env.make_sstable(s, tmp.path().string(), 10, version, big);
|
||||
write_memtable_to_sstable_for_test(*mt, sst).get();
|
||||
sst = env.reusable_sst(s, tmp.path().string(), 10, version).get0();
|
||||
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
|
||||
check_min_max_column_names(sst, {}, {"z"});
|
||||
}
|
||||
|
||||
{
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
mutation m(s, key);
|
||||
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
|
||||
tombstone tomb(api::new_timestamp(), gc_clock::now());
|
||||
range_tombstone rt(
|
||||
bound_view(clustering_key_prefix::from_single_value(*s, bytes("a")), bound_kind::incl_start),
|
||||
bound_view::top(),
|
||||
tomb);
|
||||
m.partition().apply_delete(*s, std::move(rt));
|
||||
mt->apply(std::move(m));
|
||||
auto sst = env.make_sstable(s, tmp.path().string(), 11, version, big);
|
||||
write_memtable_to_sstable_for_test(*mt, sst).get();
|
||||
sst = env.reusable_sst(s, tmp.path().string(), 11, version).get0();
|
||||
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
|
||||
check_min_max_column_names(sst, {"a"}, {});
|
||||
}
|
||||
|
||||
{
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
mutation m(s, key);
|
||||
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
|
||||
tombstone tomb(api::new_timestamp(), gc_clock::now());
|
||||
m.partition().apply_delete(*s, clustering_key_prefix::make_empty(), tomb);
|
||||
mt->apply(std::move(m));
|
||||
auto sst = env.make_sstable(s, tmp.path().string(), 12, version, big);
|
||||
write_memtable_to_sstable_for_test(*mt, sst).get();
|
||||
sst = env.reusable_sst(s, tmp.path().string(), 12, version).get0();
|
||||
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
|
||||
check_min_max_column_names(sst, {}, {});
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Checks the tombstone drop-time histogram and the min/max column names
// recorded in sstable stats metadata for a table with a two-component
// clustering key (ck1, ck2), for every version in all_sstable_versions.
//
// Each scenario fills a fresh memtable, writes it out under its own sstable
// generation, loads that sstable back and inspects its stats metadata.
// Expectations that involve range tombstones are only asserted for versions
// >= mc; scenarios with open-ended range tombstones only run for those versions.
SEASTAR_TEST_CASE(sstable_composite_tombstone_metadata_check) {
    return test_env::do_with_async([] (test_env& env) {
        storage_service_for_tests ssft;
        for (const auto version : all_sstable_versions) {
            auto s = schema_builder("ks", "cf")
                    .with_column("pk", utf8_type, column_kind::partition_key)
                    .with_column("ck1", utf8_type, column_kind::clustering_key)
                    .with_column("ck2", utf8_type, column_kind::clustering_key)
                    .with_column("r1", int32_type)
                    .build();
            auto tmp = tmpdir();
            auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
            auto c_key = clustering_key_prefix::from_exploded(*s, {to_bytes("c1"), to_bytes("c2")});
            const column_definition& r1_col = *s->get_column_definition("r1");

            BOOST_TEST_MESSAGE(fmt::format("version {}", to_string(version)));

            const bool checks_range_tombstone_bounds = version >= sstable_version_types::mc;

            // Writes `mt` as sstable `generation` in the temporary directory and
            // returns the sstable as loaded back from disk.
            auto flush_and_reload = [&] (const auto& mt, auto generation) {
                auto sst = env.make_sstable(s, tmp.path().string(), generation, version, big);
                write_memtable_to_sstable_for_test(*mt, sst).get();
                sst = env.reusable_sst(s, tmp.path().string(), generation, version).get0();
                return sst;
            };

            // Row tombstone on the full clustering key.
            {
                auto mt = make_lw_shared<memtable>(s);
                mutation mut(s, key);
                tombstone deletion(api::new_timestamp(), gc_clock::now());
                mut.partition().apply_delete(*s, c_key, deletion);
                mt->apply(std::move(mut));
                auto sst = flush_and_reload(mt, 1);
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                check_min_max_column_names(sst, {"c1", "c2"}, {"c1", "c2"});
            }

            // Dead cell in a clustered row.
            {
                auto mt = make_lw_shared<memtable>(s);
                mutation mut(s, key);
                mut.set_clustered_cell(c_key, r1_col, make_dead_atomic_cell(3600));
                mt->apply(std::move(mut));
                auto sst = flush_and_reload(mt, 2);
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                check_min_max_column_names(sst, {"c1", "c2"}, {"c1", "c2"});
            }

            // Live cell only: the tombstone histogram is required to be empty.
            {
                auto mt = make_lw_shared<memtable>(s);
                mutation mut(s, key);
                mut.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                mt->apply(std::move(mut));
                auto sst = flush_and_reload(mt, 3);
                BOOST_REQUIRE(!sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                check_min_max_column_names(sst, {"c1", "c2"}, {"c1", "c2"});
            }

            // Row tombstone in one partition, live cell in another.
            {
                auto mt = make_lw_shared<memtable>(s);

                mutation deleted_row(s, key);
                tombstone deletion(api::new_timestamp(), gc_clock::now());
                deleted_row.partition().apply_delete(*s, c_key, deletion);
                mt->apply(std::move(deleted_row));

                auto key2 = partition_key::from_exploded(*s, {to_bytes("key2")});
                mutation live_row(s, key2);
                live_row.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                mt->apply(std::move(live_row));

                auto sst = flush_and_reload(mt, 4);
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                check_min_max_column_names(sst, {"c1", "c2"}, {"c1", "c2"});
            }

            // Partition tombstone: no min/max column names expected.
            {
                auto mt = make_lw_shared<memtable>(s);
                mutation mut(s, key);
                tombstone deletion(api::new_timestamp(), gc_clock::now());
                mut.partition().apply(deletion);
                mt->apply(std::move(mut));
                auto sst = flush_and_reload(mt, 5);
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                check_min_max_column_names(sst, {}, {});
            }

            // Range tombstone only, from {"a", "aa"} to {"z", "zz"}.
            {
                auto mt = make_lw_shared<memtable>(s);
                mutation mut(s, key);
                tombstone deletion(api::new_timestamp(), gc_clock::now());
                range_tombstone range_deletion(
                        clustering_key_prefix::from_exploded(*s, {to_bytes("a"), to_bytes("aa")}),
                        clustering_key_prefix::from_exploded(*s, {to_bytes("z"), to_bytes("zz")}),
                        deletion);
                mut.partition().apply_delete(*s, std::move(range_deletion));
                mt->apply(std::move(mut));
                auto sst = flush_and_reload(mt, 6);
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                if (checks_range_tombstone_bounds) {
                    check_min_max_column_names(sst, {"a", "aa"}, {"z", "zz"});
                }
            }

            // Live row plus a range tombstone from {"a"} to {"a", "zz"}.
            {
                auto mt = make_lw_shared<memtable>(s);
                mutation mut(s, key);
                mut.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                tombstone deletion(api::new_timestamp(), gc_clock::now());
                range_tombstone range_deletion(
                        clustering_key_prefix::from_exploded(*s, {to_bytes("a")}),
                        clustering_key_prefix::from_exploded(*s, {to_bytes("a"), to_bytes("zz")}),
                        deletion);
                mut.partition().apply_delete(*s, std::move(range_deletion));
                mt->apply(std::move(mut));
                auto sst = flush_and_reload(mt, 7);
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                if (checks_range_tombstone_bounds) {
                    check_min_max_column_names(sst, {"a"}, {"c1", "c2"});
                }
            }

            // Live row plus a range tombstone from {"c1", "aa"} to {"c1", "zz"}.
            {
                auto mt = make_lw_shared<memtable>(s);
                mutation mut(s, key);
                mut.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                tombstone deletion(api::new_timestamp(), gc_clock::now());
                range_tombstone range_deletion(
                        clustering_key_prefix::from_exploded(*s, {to_bytes("c1"), to_bytes("aa")}),
                        clustering_key_prefix::from_exploded(*s, {to_bytes("c1"), to_bytes("zz")}),
                        deletion);
                mut.partition().apply_delete(*s, std::move(range_deletion));
                mt->apply(std::move(mut));
                auto sst = flush_and_reload(mt, 8);
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                if (checks_range_tombstone_bounds) {
                    check_min_max_column_names(sst, {"c1", "aa"}, {"c1", "zz"});
                }
            }

            // Live row plus a range tombstone from {"c1", "d"} to {"z", "zz"}.
            {
                auto mt = make_lw_shared<memtable>(s);
                mutation mut(s, key);
                mut.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                tombstone deletion(api::new_timestamp(), gc_clock::now());
                range_tombstone range_deletion(
                        clustering_key_prefix::from_exploded(*s, {to_bytes("c1"), to_bytes("d")}),
                        clustering_key_prefix::from_exploded(*s, {to_bytes("z"), to_bytes("zz")}),
                        deletion);
                mut.partition().apply_delete(*s, std::move(range_deletion));
                mt->apply(std::move(mut));
                auto sst = flush_and_reload(mt, 9);
                BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                if (checks_range_tombstone_bounds) {
                    check_min_max_column_names(sst, {"c1", "c2"}, {"z", "zz"});
                }
            }

            if (checks_range_tombstone_bounds) {
                // Live row plus a range tombstone open at the start, ending at "z" inclusive.
                {
                    auto mt = make_lw_shared<memtable>(s);
                    mutation mut(s, key);
                    mut.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                    tombstone deletion(api::new_timestamp(), gc_clock::now());
                    range_tombstone range_deletion(
                            bound_view::bottom(),
                            bound_view(clustering_key_prefix::from_single_value(*s, bytes("z")), bound_kind::incl_end),
                            deletion);
                    mut.partition().apply_delete(*s, std::move(range_deletion));
                    mt->apply(std::move(mut));
                    auto sst = flush_and_reload(mt, 10);
                    BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                    check_min_max_column_names(sst, {}, {"z"});
                }

                // Live row plus a range tombstone starting at "a" inclusive, open at the end.
                {
                    auto mt = make_lw_shared<memtable>(s);
                    mutation mut(s, key);
                    mut.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
                    tombstone deletion(api::new_timestamp(), gc_clock::now());
                    range_tombstone range_deletion(
                            bound_view(clustering_key_prefix::from_single_value(*s, bytes("a")), bound_kind::incl_start),
                            bound_view::top(),
                            deletion);
                    mut.partition().apply_delete(*s, std::move(range_deletion));
                    mt->apply(std::move(mut));
                    auto sst = flush_and_reload(mt, 11);
                    BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
                    check_min_max_column_names(sst, {"a"}, {});
                }
            }
        }
    });
}
|
||||
|
||||
SEASTAR_TEST_CASE(sstable_composite_reverse_tombstone_metadata_check) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
storage_service_for_tests ssft;
|
||||
for (const auto version : all_sstable_versions) {
|
||||
auto s = schema_builder("ks", "cf")
|
||||
.with_column("pk", utf8_type, column_kind::partition_key)
|
||||
.with_column("ck1", utf8_type, column_kind::clustering_key)
|
||||
.with_column("ck2", reversed_type_impl::get_instance(utf8_type), column_kind::clustering_key)
|
||||
.with_column("r1", int32_type)
|
||||
.build();
|
||||
auto tmp = tmpdir();
|
||||
auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
|
||||
auto c_key = clustering_key_prefix::from_exploded(*s, {to_bytes("c1"), to_bytes("c2")});
|
||||
const column_definition& r1_col = *s->get_column_definition("r1");
|
||||
|
||||
BOOST_TEST_MESSAGE(fmt::format("version {}", to_string(version)));
|
||||
|
||||
{
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
mutation m(s, key);
|
||||
tombstone tomb(api::new_timestamp(), gc_clock::now());
|
||||
m.partition().apply_delete(*s, c_key, tomb);
|
||||
mt->apply(std::move(m));
|
||||
auto sst = env.make_sstable(s, tmp.path().string(), 1, version, big);
|
||||
write_memtable_to_sstable_for_test(*mt, sst).get();
|
||||
sst = env.reusable_sst(s, tmp.path().string(), 1, version).get0();
|
||||
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
|
||||
check_min_max_column_names(sst, {"c1", "c2"}, {"c1", "c2"});
|
||||
}
|
||||
|
||||
{
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
mutation m(s, key);
|
||||
m.set_clustered_cell(c_key, r1_col, make_dead_atomic_cell(3600));
|
||||
mt->apply(std::move(m));
|
||||
auto sst = env.make_sstable(s, tmp.path().string(), 2, version, big);
|
||||
write_memtable_to_sstable_for_test(*mt, sst).get();
|
||||
sst = env.reusable_sst(s, tmp.path().string(), 2, version).get0();
|
||||
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
|
||||
check_min_max_column_names(sst, {"c1", "c2"}, {"c1", "c2"});
|
||||
}
|
||||
|
||||
{
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
mutation m(s, key);
|
||||
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
|
||||
mt->apply(std::move(m));
|
||||
auto sst = env.make_sstable(s, tmp.path().string(), 3, version, big);
|
||||
write_memtable_to_sstable_for_test(*mt, sst).get();
|
||||
sst = env.reusable_sst(s, tmp.path().string(), 3, version).get0();
|
||||
BOOST_REQUIRE(!sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
|
||||
check_min_max_column_names(sst, {"c1", "c2"}, {"c1", "c2"});
|
||||
}
|
||||
|
||||
{
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
|
||||
mutation m(s, key);
|
||||
tombstone tomb(api::new_timestamp(), gc_clock::now());
|
||||
m.partition().apply_delete(*s, c_key, tomb);
|
||||
mt->apply(std::move(m));
|
||||
|
||||
auto key2 = partition_key::from_exploded(*s, {to_bytes("key2")});
|
||||
mutation m2(s, key2);
|
||||
m2.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
|
||||
mt->apply(std::move(m2));
|
||||
|
||||
auto sst = env.make_sstable(s, tmp.path().string(), 4, version, big);
|
||||
write_memtable_to_sstable_for_test(*mt, sst).get();
|
||||
sst = env.reusable_sst(s, tmp.path().string(), 4, version).get0();
|
||||
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
|
||||
check_min_max_column_names(sst, {"c1", "c2"}, {"c1", "c2"});
|
||||
}
|
||||
|
||||
{
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
mutation m(s, key);
|
||||
tombstone tomb(api::new_timestamp(), gc_clock::now());
|
||||
m.partition().apply(tomb);
|
||||
mt->apply(std::move(m));
|
||||
auto sst = env.make_sstable(s, tmp.path().string(), 5, version, big);
|
||||
write_memtable_to_sstable_for_test(*mt, sst).get();
|
||||
sst = env.reusable_sst(s, tmp.path().string(), 5, version).get0();
|
||||
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
|
||||
check_min_max_column_names(sst, {}, {});
|
||||
}
|
||||
|
||||
{
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
mutation m(s, key);
|
||||
tombstone tomb(api::new_timestamp(), gc_clock::now());
|
||||
range_tombstone rt(
|
||||
clustering_key_prefix::from_exploded(*s, {to_bytes("a"), to_bytes("zz")}),
|
||||
clustering_key_prefix::from_exploded(*s, {to_bytes("a"), to_bytes("aa")}),
|
||||
tomb);
|
||||
m.partition().apply_delete(*s, std::move(rt));
|
||||
mt->apply(std::move(m));
|
||||
auto sst = env.make_sstable(s, tmp.path().string(), 6, version, big);
|
||||
write_memtable_to_sstable_for_test(*mt, sst).get();
|
||||
sst = env.reusable_sst(s, tmp.path().string(), 6, version).get0();
|
||||
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
|
||||
if (version >= sstable_version_types::mc) {
|
||||
check_min_max_column_names(sst, {"a", "zz"}, {"a", "aa"});
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
mutation m(s, key);
|
||||
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
|
||||
tombstone tomb(api::new_timestamp(), gc_clock::now());
|
||||
range_tombstone rt(
|
||||
clustering_key_prefix::from_exploded(*s, {to_bytes("a"), to_bytes("zz")}),
|
||||
clustering_key_prefix::from_exploded(*s, {to_bytes("a")}),
|
||||
tomb);
|
||||
m.partition().apply_delete(*s, std::move(rt));
|
||||
mt->apply(std::move(m));
|
||||
auto sst = env.make_sstable(s, tmp.path().string(), 7, version, big);
|
||||
write_memtable_to_sstable_for_test(*mt, sst).get();
|
||||
sst = env.reusable_sst(s, tmp.path().string(), 7, version).get0();
|
||||
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
|
||||
if (version >= sstable_version_types::mc) {
|
||||
check_min_max_column_names(sst, {"a", "zz"}, {"c1", "c2"});
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
mutation m(s, key);
|
||||
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
|
||||
tombstone tomb(api::new_timestamp(), gc_clock::now());
|
||||
range_tombstone rt(
|
||||
clustering_key_prefix::from_exploded(*s, {to_bytes("c1"), to_bytes("zz")}),
|
||||
clustering_key_prefix::from_exploded(*s, {to_bytes("c1")}),
|
||||
tomb);
|
||||
m.partition().apply_delete(*s, std::move(rt));
|
||||
mt->apply(std::move(m));
|
||||
auto sst = env.make_sstable(s, tmp.path().string(), 8, version, big);
|
||||
write_memtable_to_sstable_for_test(*mt, sst).get();
|
||||
sst = env.reusable_sst(s, tmp.path().string(), 8, version).get0();
|
||||
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
|
||||
if (version >= sstable_version_types::mc) {
|
||||
check_min_max_column_names(sst, {"c1", "zz"}, {"c1"});
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
mutation m(s, key);
|
||||
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
|
||||
tombstone tomb(api::new_timestamp(), gc_clock::now());
|
||||
range_tombstone rt(
|
||||
clustering_key_prefix::from_exploded(*s, {to_bytes("c1"), to_bytes("zz")}),
|
||||
clustering_key_prefix::from_exploded(*s, {to_bytes("c1"), to_bytes("d")}),
|
||||
tomb);
|
||||
m.partition().apply_delete(*s, std::move(rt));
|
||||
mt->apply(std::move(m));
|
||||
auto sst = env.make_sstable(s, tmp.path().string(), 9, version, big);
|
||||
write_memtable_to_sstable_for_test(*mt, sst).get();
|
||||
sst = env.reusable_sst(s, tmp.path().string(), 9, version).get0();
|
||||
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
|
||||
if (version >= sstable_version_types::mc) {
|
||||
check_min_max_column_names(sst, {"c1", "zz"}, {"c1", "c2"});
|
||||
}
|
||||
}
|
||||
|
||||
if (version >= sstable_version_types::mc) {
|
||||
{
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
mutation m(s, key);
|
||||
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
|
||||
tombstone tomb(api::new_timestamp(), gc_clock::now());
|
||||
range_tombstone rt(
|
||||
bound_view::bottom(),
|
||||
bound_view(clustering_key_prefix::from_single_value(*s, bytes("z")), bound_kind::incl_end),
|
||||
tomb);
|
||||
m.partition().apply_delete(*s, std::move(rt));
|
||||
mt->apply(std::move(m));
|
||||
auto sst = env.make_sstable(s, tmp.path().string(), 10, version, big);
|
||||
write_memtable_to_sstable_for_test(*mt, sst).get();
|
||||
sst = env.reusable_sst(s, tmp.path().string(), 10, version).get0();
|
||||
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
|
||||
check_min_max_column_names(sst, {}, {"z"});
|
||||
}
|
||||
|
||||
{
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
mutation m(s, key);
|
||||
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, int32_type->decompose(1)));
|
||||
tombstone tomb(api::new_timestamp(), gc_clock::now());
|
||||
range_tombstone rt(
|
||||
bound_view(clustering_key_prefix::from_single_value(*s, bytes("a")), bound_kind::incl_start),
|
||||
bound_view::top(),
|
||||
tomb);
|
||||
m.partition().apply_delete(*s, std::move(rt));
|
||||
mt->apply(std::move(m));
|
||||
auto sst = env.make_sstable(s, tmp.path().string(), 11, version, big);
|
||||
write_memtable_to_sstable_for_test(*mt, sst).get();
|
||||
sst = env.reusable_sst(s, tmp.path().string(), 11, version).get0();
|
||||
BOOST_REQUIRE(sst->get_stats_metadata().estimated_tombstone_drop_time.bin.size());
|
||||
check_min_max_column_names(sst, {"a"}, {});
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -4467,7 +5059,7 @@ SEASTAR_TEST_CASE(test_old_format_non_compound_range_tombstone_is_read) {
|
||||
// delete from ks.test where pk = 1 and ck = 2;
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
for (const auto version : all_sstable_versions) {
|
||||
if (version != sstables::sstable::version_types::mc) { // Does not apply to 'mc' format
|
||||
if (version < sstable_version_types::mc) { // Applies only to formats older than 'm'
|
||||
auto s = schema_builder("ks", "test")
|
||||
.with_column("pk", int32_type, column_kind::partition_key)
|
||||
.with_column("ck", int32_type, column_kind::clustering_key)
|
||||
|
||||
@@ -59,6 +59,7 @@
|
||||
#include "service/storage_service.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
#include "db/sstables-format-selector.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
@@ -476,6 +477,10 @@ public:
|
||||
db.stop().get();
|
||||
});
|
||||
|
||||
db.invoke_on_all([] (database& db) {
|
||||
db.set_format_by_config();
|
||||
}).get();
|
||||
|
||||
auto stop_ms_fd_gossiper = defer([] {
|
||||
gms::get_gossiper().stop().get();
|
||||
});
|
||||
|
||||
@@ -211,7 +211,7 @@ int main(int argc, char** argv) {
|
||||
("read-concurrency", bpo::value<unsigned>()->default_value(1), "Concurrency of reads, the amount of reads to fire at once")
|
||||
("sstables", bpo::value<uint64_t>()->default_value(100), "")
|
||||
("sstable-size", bpo::value<uint64_t>()->default_value(10000000), "")
|
||||
("sstable-format", bpo::value<std::string>()->default_value("mc"), "Sstable format version to use during population")
|
||||
("sstable-format", bpo::value<std::string>()->default_value("md"), "Sstable format version to use during population")
|
||||
("collect-stats", "Enable collecting statistics.")
|
||||
("stats-file", bpo::value<sstring>()->default_value("stats.csv"), "Store statistics in the specified file.")
|
||||
("stats-period-ms", bpo::value<unsigned>()->default_value(100), "Tick period of the stats collection.")
|
||||
@@ -229,10 +229,15 @@ int main(int argc, char** argv) {
|
||||
db_cfg.virtual_dirty_soft_limit(1.0);
|
||||
|
||||
auto sstable_format_name = app.configuration()["sstable-format"].as<std::string>();
|
||||
if (sstable_format_name == "mc") {
|
||||
if (sstable_format_name == "md") {
|
||||
db_cfg.enable_sstables_mc_format(true);
|
||||
db_cfg.enable_sstables_md_format(true);
|
||||
} else if (sstable_format_name == "mc") {
|
||||
db_cfg.enable_sstables_mc_format(true);
|
||||
db_cfg.enable_sstables_md_format(false);
|
||||
} else if (sstable_format_name == "la") {
|
||||
db_cfg.enable_sstables_mc_format(false);
|
||||
db_cfg.enable_sstables_md_format(false);
|
||||
} else {
|
||||
throw std::runtime_error(format("Unsupported sstable format: {}", sstable_format_name));
|
||||
}
|
||||
|
||||
@@ -1774,7 +1774,7 @@ int main(int argc, char** argv) {
|
||||
("test-case-duration", bpo::value<double>()->default_value(1), "Duration in seconds of a single test case (0 for a single run).")
|
||||
("data-directory", bpo::value<sstring>()->default_value("./perf_large_partition_data"), "Data directory")
|
||||
("output-directory", bpo::value<sstring>()->default_value("./perf_fast_forward_output"), "Results output directory (for 'json')")
|
||||
("sstable-format", bpo::value<std::string>()->default_value("mc"), "Sstable format version to use during population")
|
||||
("sstable-format", bpo::value<std::string>()->default_value("md"), "Sstable format version to use during population")
|
||||
("use-binary-search-in-promoted-index", bpo::value<bool>()->default_value(true), "Use binary search based variant of the promoted index cursor")
|
||||
("dump-all-results", "Write results of all iterations of all tests to text files in the output directory")
|
||||
;
|
||||
@@ -1815,10 +1815,15 @@ int main(int argc, char** argv) {
|
||||
db_cfg.virtual_dirty_soft_limit(1.0); // prevent background memtable flushes.
|
||||
|
||||
auto sstable_format_name = app.configuration()["sstable-format"].as<std::string>();
|
||||
if (sstable_format_name == "mc") {
|
||||
if (sstable_format_name == "md") {
|
||||
db_cfg.enable_sstables_mc_format(true);
|
||||
db_cfg.enable_sstables_md_format(true);
|
||||
} else if (sstable_format_name == "mc") {
|
||||
db_cfg.enable_sstables_mc_format(true);
|
||||
db_cfg.enable_sstables_md_format(false);
|
||||
} else if (sstable_format_name == "la") {
|
||||
db_cfg.enable_sstables_mc_format(false);
|
||||
db_cfg.enable_sstables_md_format(false);
|
||||
} else {
|
||||
throw std::runtime_error(format("Unsupported sstable format: {}", sstable_format_name));
|
||||
}
|
||||
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
2474332411
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,8 @@
|
||||
Digest.crc32
|
||||
Filter.db
|
||||
Index.db
|
||||
Statistics.db
|
||||
Summary.db
|
||||
Data.db
|
||||
CompressionInfo.db
|
||||
TOC.txt
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
1110194964
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,8 @@
|
||||
Digest.crc32
|
||||
Filter.db
|
||||
Index.db
|
||||
Statistics.db
|
||||
Summary.db
|
||||
Data.db
|
||||
CompressionInfo.db
|
||||
TOC.txt
|
||||
Binary file not shown.
Binary file not shown.
|
After Width: | Height: | Size: 141 KiB |
@@ -0,0 +1 @@
|
||||
772386724
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,8 @@
|
||||
Digest.crc32
|
||||
Filter.db
|
||||
Index.db
|
||||
Statistics.db
|
||||
Summary.db
|
||||
Data.db
|
||||
CompressionInfo.db
|
||||
TOC.txt
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
2903198582
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,8 @@
|
||||
CRC.db
|
||||
Data.db
|
||||
Digest.crc32
|
||||
Index.db
|
||||
TOC.txt
|
||||
Statistics.db
|
||||
Filter.db
|
||||
Summary.db
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
712389452
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,8 @@
|
||||
Statistics.db
|
||||
Summary.db
|
||||
CRC.db
|
||||
TOC.txt
|
||||
Filter.db
|
||||
Digest.crc32
|
||||
Data.db
|
||||
Index.db
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
3963475046
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,8 @@
|
||||
Data.db
|
||||
Digest.crc32
|
||||
Index.db
|
||||
TOC.txt
|
||||
Filter.db
|
||||
Statistics.db
|
||||
Summary.db
|
||||
CompressionInfo.db
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
3809570971
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,8 @@
|
||||
Statistics.db
|
||||
Summary.db
|
||||
CRC.db
|
||||
TOC.txt
|
||||
Filter.db
|
||||
Digest.crc32
|
||||
Data.db
|
||||
Index.db
|
||||
Binary file not shown.
Binary file not shown.
@@ -0,0 +1 @@
|
||||
1423862028
|
||||
Binary file not shown.
Binary file not shown.
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user