Compare commits
7 Commits
master
...
scylla-4.5
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4a78d6403e | ||
|
|
2f20d52ac7 | ||
|
|
540439ee46 | ||
|
|
a0622e85ab | ||
|
|
90741dc62c | ||
|
|
83cfa6a63c | ||
|
|
1816c6df8c |
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=4.5.dev
|
||||
VERSION=4.5.rc1
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -747,10 +747,8 @@ void database::set_format(sstables::sstable_version_types format) {
|
||||
void database::set_format_by_config() {
|
||||
if (_cfg.enable_sstables_md_format()) {
|
||||
set_format(sstables::sstable_version_types::md);
|
||||
} else if (_cfg.enable_sstables_mc_format()) {
|
||||
set_format(sstables::sstable_version_types::mc);
|
||||
} else {
|
||||
set_format(sstables::sstable_version_types::la);
|
||||
set_format(sstables::sstable_version_types::mc);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -747,8 +747,8 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
" Performance is affected to some extent as a result. Useful to help debugging problems that may arise at another layers.")
|
||||
, cpu_scheduler(this, "cpu_scheduler", value_status::Used, true, "Enable cpu scheduling")
|
||||
, view_building(this, "view_building", value_status::Used, true, "Enable view building; should only be set to false when the node is experience issues due to view building")
|
||||
, enable_sstables_mc_format(this, "enable_sstables_mc_format", value_status::Used, true, "Enable SSTables 'mc' format to be used as the default file format")
|
||||
, enable_sstables_md_format(this, "enable_sstables_md_format", value_status::Used, true, "Enable SSTables 'md' format to be used as the default file format (requires enable_sstables_mc_format)")
|
||||
, enable_sstables_mc_format(this, "enable_sstables_mc_format", value_status::Unused, true, "Enable SSTables 'mc' format to be used as the default file format")
|
||||
, enable_sstables_md_format(this, "enable_sstables_md_format", value_status::Used, true, "Enable SSTables 'md' format to be used as the default file format")
|
||||
, enable_dangerous_direct_import_of_cassandra_counters(this, "enable_dangerous_direct_import_of_cassandra_counters", value_status::Used, false, "Only turn this option on if you want to import tables from Cassandra containing counters, and you are SURE that no counters in that table were created in a version earlier than Cassandra 2.1."
|
||||
" It is not enough to have ever since upgraded to newer versions of Cassandra. If you EVER used a version earlier than 2.1 in the cluster where these SSTables come from, DO NOT TURN ON THIS OPTION! You will corrupt your data. You have been warned.")
|
||||
, enable_shard_aware_drivers(this, "enable_shard_aware_drivers", value_status::Used, true, "Enable native transport drivers to use connection-per-shard for better performance")
|
||||
|
||||
4
dist/docker/redhat/Dockerfile
vendored
4
dist/docker/redhat/Dockerfile
vendored
@@ -5,8 +5,8 @@ MAINTAINER Avi Kivity <avi@cloudius-systems.com>
|
||||
ENV container docker
|
||||
|
||||
# The SCYLLA_REPO_URL argument specifies the URL to the RPM repository this Docker image uses to install Scylla. The default value is the Scylla's unstable RPM repository, which contains the daily build.
|
||||
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo
|
||||
ARG VERSION=4.5.dev
|
||||
ARG SCYLLA_REPO_URL=downloads.scylladb.com/unstable/scylla/branch-4.5/rpm/centos/latest/
|
||||
ARG VERSION=4.5.rc1
|
||||
|
||||
ADD scylla_bashrc /scylla_bashrc
|
||||
|
||||
|
||||
@@ -98,14 +98,6 @@ feature_config feature_config_from_db_config(db::config& cfg, std::set<sstring>
|
||||
|
||||
fcfg._disabled_features = std::move(disabled);
|
||||
|
||||
if (!cfg.enable_sstables_mc_format()) {
|
||||
if (cfg.enable_sstables_md_format()) {
|
||||
throw std::runtime_error(
|
||||
"You must use both enable_sstables_mc_format and enable_sstables_md_format "
|
||||
"to enable SSTables md format support");
|
||||
}
|
||||
fcfg._disabled_features.insert(sstring(gms::features::MC_SSTABLE));
|
||||
}
|
||||
if (!cfg.enable_sstables_md_format()) {
|
||||
fcfg._disabled_features.insert(sstring(gms::features::MD_SSTABLE));
|
||||
}
|
||||
|
||||
@@ -3643,7 +3643,12 @@ protected:
|
||||
}
|
||||
|
||||
public:
|
||||
virtual future<foreign_ptr<lw_shared_ptr<query::result>>> execute(storage_proxy::clock_type::time_point timeout) {
|
||||
future<foreign_ptr<lw_shared_ptr<query::result>>> execute(storage_proxy::clock_type::time_point timeout) {
|
||||
if (_targets.empty()) {
|
||||
// We may have no targets to read from if a DC with zero replication is queried with LOCACL_QUORUM.
|
||||
// Return an empty result in this case
|
||||
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>>(make_foreign(make_lw_shared(query::result())));
|
||||
}
|
||||
digest_resolver_ptr digest_resolver = ::make_shared<digest_read_resolver>(_schema, _cl, _block_for,
|
||||
db::is_datacenter_local(_cl) ? db::count_local_endpoints(_targets): _targets.size(), timeout);
|
||||
auto exec = shared_from_this();
|
||||
|
||||
@@ -809,8 +809,8 @@ time_series_sstable_set::create_single_key_sstable_reader(
|
||||
// the queue is exhausted. We use that fact to gather statistics.
|
||||
auto filter = [pk_filter = std::move(pk_filter), ck_filter = std::move(ck_filter), &stats]
|
||||
(const sstable& sst) {
|
||||
if (pk_filter(sst)) {
|
||||
return true;
|
||||
if (!pk_filter(sst)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
++stats.sstables_checked_by_clustering_filter;
|
||||
@@ -954,6 +954,40 @@ sstable_set make_compound_sstable_set(schema_ptr schema, std::vector<lw_shared_p
|
||||
return sstable_set(std::make_unique<compound_sstable_set>(schema, std::move(sets)), schema);
|
||||
}
|
||||
|
||||
flat_mutation_reader
|
||||
compound_sstable_set::create_single_key_sstable_reader(
|
||||
column_family* cf,
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
utils::estimated_histogram& sstable_histogram,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) const {
|
||||
auto sets = _sets;
|
||||
auto it = std::partition(sets.begin(), sets.end(), [] (const auto& set) { return set->all()->size() > 0; });
|
||||
auto non_empty_set_count = std::distance(sets.begin(), it);
|
||||
|
||||
if (!non_empty_set_count) {
|
||||
return make_empty_flat_reader(schema, permit);
|
||||
}
|
||||
// optimize for common case where only 1 set is populated, avoiding the expensive combined reader
|
||||
if (non_empty_set_count == 1) {
|
||||
const auto& non_empty_set = *std::begin(sets);
|
||||
return non_empty_set->create_single_key_sstable_reader(cf, std::move(schema), std::move(permit), sstable_histogram, pr, slice, pc, trace_state, fwd, fwd_mr);
|
||||
}
|
||||
|
||||
auto readers = boost::copy_range<std::vector<flat_mutation_reader>>(
|
||||
boost::make_iterator_range(sets.begin(), it)
|
||||
| boost::adaptors::transformed([&] (const lw_shared_ptr<sstable_set>& non_empty_set) {
|
||||
return non_empty_set->create_single_key_sstable_reader(cf, schema, permit, sstable_histogram, pr, slice, pc, trace_state, fwd, fwd_mr);
|
||||
})
|
||||
);
|
||||
return make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr);
|
||||
}
|
||||
|
||||
flat_mutation_reader
|
||||
sstable_set::create_single_key_sstable_reader(
|
||||
column_family* cf,
|
||||
|
||||
@@ -167,6 +167,19 @@ public:
|
||||
virtual void insert(shared_sstable sst) override;
|
||||
virtual void erase(shared_sstable sst) override;
|
||||
virtual std::unique_ptr<incremental_selector_impl> make_incremental_selector() const override;
|
||||
|
||||
virtual flat_mutation_reader create_single_key_sstable_reader(
|
||||
column_family*,
|
||||
schema_ptr,
|
||||
reader_permit,
|
||||
utils::estimated_histogram&,
|
||||
const dht::partition_range&,
|
||||
const query::partition_slice&,
|
||||
const io_priority_class&,
|
||||
tracing::trace_state_ptr,
|
||||
streamed_mutation::forwarding,
|
||||
mutation_reader::forwarding) const override;
|
||||
|
||||
class incremental_selector;
|
||||
};
|
||||
|
||||
|
||||
@@ -7041,3 +7041,155 @@ SEASTAR_TEST_CASE(test_offstrategy_sstable_compaction) {
|
||||
cf->stop().get();
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(single_key_reader_through_compound_set_test) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
auto builder = schema_builder("tests", "single_key_reader_through_compound_set_test")
|
||||
.with_column("id", utf8_type, column_kind::partition_key)
|
||||
.with_column("cl", ::timestamp_type, column_kind::clustering_key)
|
||||
.with_column("value", int32_type);
|
||||
builder.set_compaction_strategy(sstables::compaction_strategy_type::time_window);
|
||||
std::map <sstring, sstring> opts = {
|
||||
{time_window_compaction_strategy_options::COMPACTION_WINDOW_UNIT_KEY, "HOURS"},
|
||||
{time_window_compaction_strategy_options::COMPACTION_WINDOW_SIZE_KEY, "1"},
|
||||
};
|
||||
builder.set_compaction_strategy_options(std::move(opts));
|
||||
auto s = builder.build();
|
||||
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::time_window, std::move(opts));
|
||||
|
||||
auto next_timestamp = [](auto step) {
|
||||
using namespace std::chrono;
|
||||
return (gc_clock::now().time_since_epoch() + duration_cast<microseconds>(step)).count();
|
||||
};
|
||||
auto tokens = token_generation_for_shard(1, this_shard_id(), test_db_config.murmur3_partitioner_ignore_msb_bits(), smp::count);
|
||||
|
||||
auto make_row = [&](std::chrono::hours step) {
|
||||
static thread_local int32_t value = 1;
|
||||
auto key_str = tokens[0].first;
|
||||
auto key = partition_key::from_exploded(*s, {to_bytes(key_str)});
|
||||
|
||||
mutation m(s, key);
|
||||
auto next_ts = next_timestamp(step);
|
||||
auto c_key = clustering_key::from_exploded(*s, {::timestamp_type->decompose(next_ts)});
|
||||
m.set_clustered_cell(c_key, bytes("value"), data_value(int32_t(value++)), next_ts);
|
||||
return m;
|
||||
};
|
||||
|
||||
auto tmp = tmpdir();
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
column_family::config cfg = column_family_test_config(env.manager());
|
||||
::cf_stats cf_stats{0};
|
||||
cfg.cf_stats = &cf_stats;
|
||||
cfg.datadir = tmp.path().string();
|
||||
cfg.enable_disk_writes = true;
|
||||
cfg.enable_cache = false;
|
||||
auto tracker = make_lw_shared<cache_tracker>();
|
||||
cell_locker_stats cl_stats;
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm, cl_stats, *tracker);
|
||||
cf->mark_ready_for_writes();
|
||||
cf->start();
|
||||
|
||||
auto set1 = make_lw_shared<sstable_set>(cs.make_sstable_set(s));
|
||||
auto set2 = make_lw_shared<sstable_set>(cs.make_sstable_set(s));
|
||||
|
||||
auto sst_gen = [&env, s, &tmp, gen = make_lw_shared<unsigned>(1)]() {
|
||||
return env.make_sstable(s, tmp.path().string(), (*gen)++, sstables::sstable::version_types::md, big);
|
||||
};
|
||||
|
||||
// sstables with same key but belonging to different windows
|
||||
auto sst1 = make_sstable_containing(sst_gen, {make_row(std::chrono::hours(1))});
|
||||
auto sst2 = make_sstable_containing(sst_gen, {make_row(std::chrono::hours(5))});
|
||||
BOOST_REQUIRE(sst1->get_first_decorated_key().token() == sst2->get_last_decorated_key().token());
|
||||
auto dkey = sst1->get_first_decorated_key();
|
||||
|
||||
set1->insert(std::move(sst1));
|
||||
set2->insert(std::move(sst2));
|
||||
sstable_set compound = sstables::make_compound_sstable_set(s, {set1, set2});
|
||||
|
||||
reader_permit permit = tests::make_permit();
|
||||
utils::estimated_histogram eh;
|
||||
auto pr = dht::partition_range::make_singular(dkey);
|
||||
|
||||
auto reader = compound.create_single_key_sstable_reader(&*cf, s, permit, eh, pr, s->full_slice(), default_priority_class(),
|
||||
tracing::trace_state_ptr(), ::streamed_mutation::forwarding::no,
|
||||
::mutation_reader::forwarding::no);
|
||||
auto mfopt = read_mutation_from_flat_mutation_reader(reader, db::no_timeout).get0();
|
||||
BOOST_REQUIRE(mfopt);
|
||||
mfopt = read_mutation_from_flat_mutation_reader(reader, db::no_timeout).get0();
|
||||
BOOST_REQUIRE(!mfopt);
|
||||
BOOST_REQUIRE(cf_stats.clustering_filter_count > 0);
|
||||
});
|
||||
}
|
||||
|
||||
// Regression test for #8432
|
||||
SEASTAR_TEST_CASE(test_twcs_single_key_reader_filtering) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
auto builder = schema_builder("tests", "twcs_single_key_reader_filtering")
|
||||
.with_column("pk", int32_type, column_kind::partition_key)
|
||||
.with_column("ck", int32_type, column_kind::clustering_key)
|
||||
.with_column("v", int32_type);
|
||||
builder.set_compaction_strategy(sstables::compaction_strategy_type::time_window);
|
||||
auto s = builder.build();
|
||||
|
||||
auto tmp = tmpdir();
|
||||
auto sst_gen = [&env, s, &tmp, gen = make_lw_shared<unsigned>(1)]() {
|
||||
return env.make_sstable(s, tmp.path().string(), (*gen)++, sstables::sstable::version_types::md, big);
|
||||
};
|
||||
|
||||
auto make_row = [&] (int32_t pk, int32_t ck) {
|
||||
mutation m(s, partition_key::from_single_value(*s, int32_type->decompose(pk)));
|
||||
m.set_clustered_cell(clustering_key::from_single_value(*s, int32_type->decompose(ck)), to_bytes("v"), int32_t(0), api::new_timestamp());
|
||||
return m;
|
||||
};
|
||||
|
||||
auto sst1 = make_sstable_containing(sst_gen, {make_row(0, 0)});
|
||||
auto sst2 = make_sstable_containing(sst_gen, {make_row(0, 1)});
|
||||
auto sst3 = make_sstable_containing(sst_gen, {make_row(0, 2)});
|
||||
auto dkey = sst1->get_first_decorated_key();
|
||||
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
column_family::config cfg = column_family_test_config(env.manager());
|
||||
::cf_stats cf_stats{0};
|
||||
cfg.cf_stats = &cf_stats;
|
||||
cfg.datadir = tmp.path().string();
|
||||
auto tracker = make_lw_shared<cache_tracker>();
|
||||
cell_locker_stats cl_stats;
|
||||
column_family cf(s, cfg, column_family::no_commitlog(), *cm, cl_stats, *tracker);
|
||||
cf.mark_ready_for_writes();
|
||||
cf.start();
|
||||
|
||||
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::time_window, {});
|
||||
|
||||
auto set = cs.make_sstable_set(s);
|
||||
set.insert(std::move(sst1));
|
||||
set.insert(std::move(sst2));
|
||||
set.insert(std::move(sst3));
|
||||
|
||||
reader_permit permit = tests::make_permit();
|
||||
utils::estimated_histogram eh;
|
||||
auto pr = dht::partition_range::make_singular(dkey);
|
||||
|
||||
auto slice = partition_slice_builder(*s)
|
||||
.with_range(query::clustering_range {
|
||||
query::clustering_range::bound { clustering_key_prefix::from_single_value(*s, int32_type->decompose(0)) },
|
||||
query::clustering_range::bound { clustering_key_prefix::from_single_value(*s, int32_type->decompose(1)) },
|
||||
}).build();
|
||||
|
||||
auto reader = set.create_single_key_sstable_reader(
|
||||
&cf, s, permit, eh, pr, slice, default_priority_class(),
|
||||
tracing::trace_state_ptr(), ::streamed_mutation::forwarding::no,
|
||||
::mutation_reader::forwarding::no);
|
||||
|
||||
auto checked_by_ck = cf_stats.sstables_checked_by_clustering_filter;
|
||||
auto surviving_after_ck = cf_stats.surviving_sstables_after_clustering_filter;
|
||||
|
||||
// consume all fragments
|
||||
while (reader(db::no_timeout).get());
|
||||
|
||||
// sst1 and sst2 should have been checked by the CK filter before we started reading (when we created the reader).
|
||||
// sst3 should have been checked by the CK filter during fragment consumption and shouldn't have passed.
|
||||
// With the bug in #8432, sst3 wouldn't even be checked by the CK filter since it would pass right after checking the PK filter.
|
||||
BOOST_REQUIRE_EQUAL(cf_stats.sstables_checked_by_clustering_filter - checked_by_ck, 1);
|
||||
BOOST_REQUIRE_EQUAL(cf_stats.surviving_sstables_after_clustering_filter - surviving_after_ck, 0);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -333,13 +333,8 @@ int main(int argc, char** argv) {
|
||||
|
||||
auto sstable_format_name = app.configuration()["sstable-format"].as<std::string>();
|
||||
if (sstable_format_name == "md") {
|
||||
db_cfg.enable_sstables_mc_format(true);
|
||||
db_cfg.enable_sstables_md_format(true);
|
||||
} else if (sstable_format_name == "mc") {
|
||||
db_cfg.enable_sstables_mc_format(true);
|
||||
db_cfg.enable_sstables_md_format(false);
|
||||
} else if (sstable_format_name == "la") {
|
||||
db_cfg.enable_sstables_mc_format(false);
|
||||
db_cfg.enable_sstables_md_format(false);
|
||||
} else {
|
||||
throw std::runtime_error(format("Unsupported sstable format: {}", sstable_format_name));
|
||||
|
||||
@@ -1817,13 +1817,8 @@ int main(int argc, char** argv) {
|
||||
|
||||
auto sstable_format_name = app.configuration()["sstable-format"].as<std::string>();
|
||||
if (sstable_format_name == "md") {
|
||||
db_cfg.enable_sstables_mc_format(true);
|
||||
db_cfg.enable_sstables_md_format(true);
|
||||
} else if (sstable_format_name == "mc") {
|
||||
db_cfg.enable_sstables_mc_format(true);
|
||||
db_cfg.enable_sstables_md_format(false);
|
||||
} else if (sstable_format_name == "la") {
|
||||
db_cfg.enable_sstables_mc_format(false);
|
||||
db_cfg.enable_sstables_md_format(false);
|
||||
} else {
|
||||
throw std::runtime_error(format("Unsupported sstable format: {}", sstable_format_name));
|
||||
|
||||
Submodule tools/java updated: ccc4201ded...768a59a6f1
Reference in New Issue
Block a user