Compare commits
13 Commits
scylla-4.3
...
next-4.3
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
67a62b3e8d | ||
|
|
92effccf52 | ||
|
|
7357529834 | ||
|
|
3dd7874f08 | ||
|
|
1bf218c29e | ||
|
|
89c47a44dc | ||
|
|
dd93f297c1 | ||
|
|
b0b2606a8c | ||
|
|
6de458e915 | ||
|
|
b6aa5ab2d4 | ||
|
|
08cbd180ff | ||
|
|
693c7b300a | ||
|
|
2e7f618632 |
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=4.3.6
|
||||
VERSION=4.3.7
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -306,6 +306,13 @@ create_index_statement::announce_migration(service::storage_proxy& proxy, bool i
|
||||
format("Index {} is a duplicate of existing index {}", index.name(), existing_index.value().name()));
|
||||
}
|
||||
}
|
||||
auto index_table_name = secondary_index::index_table_name(accepted_name);
|
||||
if (db.has_schema(keyspace(), index_table_name)) {
|
||||
return make_exception_future<::shared_ptr<cql_transport::event::schema_change>>(
|
||||
exceptions::invalid_request_exception(format("Index {} cannot be created, because table {} already exists",
|
||||
accepted_name, index_table_name))
|
||||
);
|
||||
}
|
||||
++_cql_stats->secondary_index_creates;
|
||||
schema_builder builder{schema};
|
||||
builder.with_index(index);
|
||||
|
||||
@@ -1751,7 +1751,11 @@ sstring database::get_available_index_name(const sstring &ks_name, const sstring
|
||||
auto base_name = index_metadata::get_default_index_name(cf_name, index_name_root);
|
||||
sstring accepted_name = base_name;
|
||||
int i = 0;
|
||||
while (existing_names.contains(accepted_name)) {
|
||||
auto name_accepted = [&] {
|
||||
auto index_table_name = secondary_index::index_table_name(accepted_name);
|
||||
return !has_schema(ks_name, index_table_name) && !existing_names.contains(accepted_name);
|
||||
};
|
||||
while (!name_accepted()) {
|
||||
accepted_name = base_name + "_" + std::to_string(++i);
|
||||
}
|
||||
return accepted_name;
|
||||
|
||||
@@ -43,9 +43,13 @@
|
||||
|
||||
namespace db {
|
||||
|
||||
future<> snapshot_ctl::check_snapshot_not_exist(sstring ks_name, sstring name) {
|
||||
future<> snapshot_ctl::check_snapshot_not_exist(sstring ks_name, sstring name, std::optional<std::vector<sstring>> filter) {
|
||||
auto& ks = _db.local().find_keyspace(ks_name);
|
||||
return parallel_for_each(ks.metadata()->cf_meta_data(), [this, ks_name = std::move(ks_name), name = std::move(name)] (auto& pair) {
|
||||
return parallel_for_each(ks.metadata()->cf_meta_data(), [this, ks_name = std::move(ks_name), name = std::move(name), filter = std::move(filter)] (auto& pair) {
|
||||
auto& cf_name = pair.first;
|
||||
if (filter && std::find(filter->begin(), filter->end(), cf_name) == filter->end()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
auto& cf = _db.local().find_column_family(pair.second);
|
||||
return cf.snapshot_exists(name).then([ks_name = std::move(ks_name), name] (bool exists) {
|
||||
if (exists) {
|
||||
@@ -111,7 +115,7 @@ future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<
|
||||
}
|
||||
|
||||
return run_snapshot_modify_operation([this, ks_name = std::move(ks_name), tables = std::move(tables), tag = std::move(tag)] {
|
||||
return check_snapshot_not_exist(ks_name, tag).then([this, ks_name, tables = std::move(tables), tag] {
|
||||
return check_snapshot_not_exist(ks_name, tag, tables).then([this, ks_name, tables, tag] {
|
||||
return do_with(std::vector<sstring>(std::move(tables)),[this, ks_name, tag](const std::vector<sstring>& tables) {
|
||||
return do_for_each(tables, [ks_name, tag, this] (const sstring& table_name) {
|
||||
if (table_name.find(".") != sstring::npos) {
|
||||
|
||||
@@ -40,6 +40,8 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <vector>
|
||||
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/core/future.hh>
|
||||
#include "database.hh"
|
||||
@@ -112,7 +114,7 @@ private:
|
||||
seastar::rwlock _lock;
|
||||
seastar::gate _ops;
|
||||
|
||||
future<> check_snapshot_not_exist(sstring ks_name, sstring name);
|
||||
future<> check_snapshot_not_exist(sstring ks_name, sstring name, std::optional<std::vector<sstring>> filter = {});
|
||||
|
||||
template <typename Func>
|
||||
std::result_of_t<Func()> run_snapshot_modify_operation(Func&&);
|
||||
|
||||
13
dist/common/scripts/scylla_cpuscaling_setup
vendored
13
dist/common/scripts/scylla_cpuscaling_setup
vendored
@@ -22,6 +22,7 @@
|
||||
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
import shlex
|
||||
import distro
|
||||
from scylla_util import *
|
||||
@@ -33,12 +34,22 @@ if __name__ == '__main__':
|
||||
if os.getuid() > 0:
|
||||
print('Requires root permission.')
|
||||
sys.exit(1)
|
||||
if not os.path.exists('/sys/devices/system/cpu/cpufreq/policy0/scaling_governor'):
|
||||
parser = argparse.ArgumentParser(description='CPU scaling setup script for Scylla.')
|
||||
parser.add_argument('--force', dest='force', action='store_true',
|
||||
help='force running setup even CPU scaling unsupported')
|
||||
args = parser.parse_args()
|
||||
|
||||
if not args.force and not os.path.exists('/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor'):
|
||||
print('This computer doesn\'t supported CPU scaling configuration.')
|
||||
sys.exit(0)
|
||||
if is_debian_variant():
|
||||
if not shutil.which('cpufreq-set'):
|
||||
apt_install('cpufrequtils')
|
||||
try:
|
||||
ondemand = systemd_unit('ondemand')
|
||||
ondemand.disable()
|
||||
except:
|
||||
pass
|
||||
cfg = sysconfig_parser('/etc/default/cpufrequtils')
|
||||
cfg.set('GOVERNOR', 'performance')
|
||||
cfg.commit()
|
||||
|
||||
6
dist/common/scripts/scylla_ntp_setup
vendored
6
dist/common/scripts/scylla_ntp_setup
vendored
@@ -91,12 +91,12 @@ if __name__ == '__main__':
|
||||
with open('/etc/ntp.conf') as f:
|
||||
conf = f.read()
|
||||
if args.subdomain:
|
||||
conf2 = re.sub(r'server\s+([0-9]+)\.(\S+)\.pool\.ntp\.org', 'server \\1.{}.pool.ntp.org'.format(args.subdomain), conf, flags=re.MULTILINE)
|
||||
conf2 = re.sub(r'(server|pool)\s+([0-9]+)\.(\S+)\.pool\.ntp\.org', '\\1 \\2.{}.pool.ntp.org'.format(args.subdomain), conf, flags=re.MULTILINE)
|
||||
with open('/etc/ntp.conf', 'w') as f:
|
||||
f.write(conf2)
|
||||
conf = conf2
|
||||
match = re.search(r'^server\s+(\S*)(\s+\S+)?', conf, flags=re.MULTILINE)
|
||||
server = match.group(1)
|
||||
match = re.search(r'^(server|pool)\s+(\S*)(\s+\S+)?', conf, flags=re.MULTILINE)
|
||||
server = match.group(2)
|
||||
ntpd = systemd_unit('ntpd.service')
|
||||
ntpd.stop()
|
||||
# ignore error, ntpd may able to adjust clock later
|
||||
|
||||
4
dist/redhat/scylla.spec
vendored
4
dist/redhat/scylla.spec
vendored
@@ -7,7 +7,7 @@ Group: Applications/Databases
|
||||
License: AGPLv3
|
||||
URL: http://www.scylladb.com/
|
||||
Source0: %{reloc_pkg}
|
||||
Requires: %{product}-server = %{version} %{product}-conf = %{version} %{product}-kernel-conf = %{version} %{product}-jmx = %{version} %{product}-tools = %{version} %{product}-tools-core = %{version}
|
||||
Requires: %{product}-server = %{version} %{product}-conf = %{version} %{product}-python3 = %{version} %{product}-kernel-conf = %{version} %{product}-jmx = %{version} %{product}-tools = %{version} %{product}-tools-core = %{version}
|
||||
Obsoletes: scylla-server < 1.1
|
||||
|
||||
%global _debugsource_template %{nil}
|
||||
@@ -52,7 +52,7 @@ Summary: The Scylla database server
|
||||
License: AGPLv3
|
||||
URL: http://www.scylladb.com/
|
||||
Requires: kernel >= 3.10.0-514
|
||||
Requires: %{product}-conf %{product}-python3
|
||||
Requires: %{product}-conf = %{version} %{product}-python3 = %{version}
|
||||
Conflicts: abrt
|
||||
AutoReqProv: no
|
||||
|
||||
|
||||
@@ -1151,6 +1151,9 @@ flat_mutation_reader evictable_reader::recreate_reader() {
|
||||
_range_override.reset();
|
||||
_slice_override.reset();
|
||||
|
||||
_drop_partition_start = false;
|
||||
_drop_static_row = false;
|
||||
|
||||
if (_last_pkey) {
|
||||
bool partition_range_is_inclusive = true;
|
||||
|
||||
@@ -1236,13 +1239,25 @@ void evictable_reader::maybe_validate_partition_start(const flat_mutation_reader
|
||||
// is in range.
|
||||
if (_last_pkey) {
|
||||
const auto cmp_res = tri_cmp(*_last_pkey, ps.key());
|
||||
if (_drop_partition_start) { // should be the same partition
|
||||
if (_drop_partition_start) { // we expect to continue from the same partition
|
||||
// We cannot assume the partition we stopped the read at is still alive
|
||||
// when we recreate the reader. It might have been compacted away in the
|
||||
// meanwhile, so allow for a larger partition too.
|
||||
require(
|
||||
cmp_res == 0,
|
||||
"{}(): validation failed, expected partition with key equal to _last_pkey {} due to _drop_partition_start being set, but got {}",
|
||||
cmp_res <= 0,
|
||||
"{}(): validation failed, expected partition with key larger or equal to _last_pkey {} due to _drop_partition_start being set, but got {}",
|
||||
__FUNCTION__,
|
||||
*_last_pkey,
|
||||
ps.key());
|
||||
// Reset drop flags and next pos if we are not continuing from the same partition
|
||||
if (cmp_res < 0) {
|
||||
// Close previous partition, we are not going to continue it.
|
||||
push_mutation_fragment(*_schema, _permit, partition_end{});
|
||||
_drop_partition_start = false;
|
||||
_drop_static_row = false;
|
||||
_next_position_in_partition = position_in_partition::for_partition_start();
|
||||
_trim_range_tombstones = false;
|
||||
}
|
||||
} else { // should be a larger partition
|
||||
require(
|
||||
cmp_res < 0,
|
||||
@@ -1293,9 +1308,14 @@ bool evictable_reader::should_drop_fragment(const mutation_fragment& mf) {
|
||||
_drop_partition_start = false;
|
||||
return true;
|
||||
}
|
||||
if (_drop_static_row && mf.is_static_row()) {
|
||||
_drop_static_row = false;
|
||||
return true;
|
||||
// Unlike partition-start above, a partition is not guaranteed to have a
|
||||
// static row fragment. So reset the flag regardless of whether we could
|
||||
// drop one or not.
|
||||
// We are guaranteed to get here only right after dropping a partition-start,
|
||||
// so if we are not seeing a static row here, the partition doesn't have one.
|
||||
if (_drop_static_row) {
|
||||
_drop_static_row = false;
|
||||
return mf.is_static_row();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -438,7 +438,6 @@ protected:
|
||||
mutation_source_metadata _ms_metadata = {};
|
||||
garbage_collected_sstable_writer::data _gc_sstable_writer_data;
|
||||
compaction_sstable_replacer_fn _replacer;
|
||||
std::optional<compaction_weight_registration> _weight_registration;
|
||||
utils::UUID _run_identifier;
|
||||
::io_priority_class _io_priority;
|
||||
// optional clone of sstable set to be used for expiration purposes, so it will be set if expiration is enabled.
|
||||
@@ -457,7 +456,6 @@ protected:
|
||||
, _sstable_level(descriptor.level)
|
||||
, _gc_sstable_writer_data(*this)
|
||||
, _replacer(std::move(descriptor.replacer))
|
||||
, _weight_registration(std::move(descriptor.weight_registration))
|
||||
, _run_identifier(descriptor.run_identifier)
|
||||
, _io_priority(descriptor.io_priority)
|
||||
, _sstable_set(std::move(descriptor.all_sstables_snapshot))
|
||||
@@ -929,9 +927,6 @@ public:
|
||||
}
|
||||
|
||||
virtual void on_end_of_compaction() override {
|
||||
if (_weight_registration) {
|
||||
_cf.get_compaction_manager().on_compaction_complete(*_weight_registration);
|
||||
}
|
||||
replace_remaining_exhausted_sstables();
|
||||
}
|
||||
private:
|
||||
|
||||
@@ -134,8 +134,6 @@ struct compaction_descriptor {
|
||||
uint64_t max_sstable_bytes;
|
||||
// Run identifier of output sstables.
|
||||
utils::UUID run_identifier;
|
||||
// Holds ownership of a weight assigned to this compaction iff it's a regular one.
|
||||
std::optional<compaction_weight_registration> weight_registration;
|
||||
// Calls compaction manager's task for this compaction to release reference to exhausted sstables.
|
||||
std::function<void(const std::vector<shared_sstable>& exhausted_sstables)> release_exhausted;
|
||||
// The options passed down to the compaction code.
|
||||
|
||||
@@ -436,7 +436,7 @@ void compaction_manager::reevaluate_postponed_compactions() {
|
||||
}
|
||||
|
||||
void compaction_manager::postpone_compaction_for_column_family(column_family* cf) {
|
||||
_postponed.push_back(cf);
|
||||
_postponed.insert(cf);
|
||||
}
|
||||
|
||||
future<> compaction_manager::stop_ongoing_compactions(sstring reason) {
|
||||
@@ -576,7 +576,7 @@ void compaction_manager::submit(column_family* cf) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
auto compacting = make_lw_shared<compacting_sstable_registration>(this, descriptor.sstables);
|
||||
descriptor.weight_registration = compaction_weight_registration(this, weight);
|
||||
auto weight_r = compaction_weight_registration(this, weight);
|
||||
descriptor.release_exhausted = [compacting] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
|
||||
compacting->release_compacting(exhausted_sstables);
|
||||
};
|
||||
@@ -586,7 +586,7 @@ void compaction_manager::submit(column_family* cf) {
|
||||
_stats.pending_tasks--;
|
||||
_stats.active_tasks++;
|
||||
task->compaction_running = true;
|
||||
return cf.run_compaction(std::move(descriptor)).then_wrapped([this, task, compacting = std::move(compacting)] (future<> f) mutable {
|
||||
return cf.run_compaction(std::move(descriptor)).then_wrapped([this, task, compacting = std::move(compacting), weight_r = std::move(weight_r)] (future<> f) mutable {
|
||||
_stats.active_tasks--;
|
||||
task->compaction_running = false;
|
||||
|
||||
@@ -799,7 +799,7 @@ future<> compaction_manager::remove(column_family* cf) {
|
||||
task->stopping = true;
|
||||
}
|
||||
}
|
||||
_postponed.erase(boost::remove(_postponed, cf), _postponed.end());
|
||||
_postponed.erase(cf);
|
||||
|
||||
// Wait for the termination of an ongoing compaction on cf, if any.
|
||||
return do_for_each(*tasks_to_stop, [this, cf] (auto& task) {
|
||||
@@ -835,11 +835,6 @@ void compaction_manager::stop_compaction(sstring type) {
|
||||
}
|
||||
}
|
||||
|
||||
void compaction_manager::on_compaction_complete(compaction_weight_registration& weight_registration) {
|
||||
weight_registration.deregister();
|
||||
reevaluate_postponed_compactions();
|
||||
}
|
||||
|
||||
void compaction_manager::propagate_replacement(column_family* cf,
|
||||
const std::vector<sstables::shared_sstable>& removed, const std::vector<sstables::shared_sstable>& added) {
|
||||
for (auto& info : _compactions) {
|
||||
|
||||
@@ -99,7 +99,7 @@ private:
|
||||
future<> _waiting_reevalution = make_ready_future<>();
|
||||
condition_variable _postponed_reevaluation;
|
||||
// column families that wait for compaction but had its submission postponed due to ongoing compaction.
|
||||
std::vector<column_family*> _postponed;
|
||||
std::unordered_set<column_family*> _postponed;
|
||||
// tracks taken weights of ongoing compactions, only one compaction per weight is allowed.
|
||||
// weight is value assigned to a compaction job that is log base N of total size of all input sstables.
|
||||
std::unordered_set<int> _weight_tracker;
|
||||
@@ -256,11 +256,6 @@ public:
|
||||
// Stops ongoing compaction of a given type.
|
||||
void stop_compaction(sstring type);
|
||||
|
||||
// Called by compaction procedure to release the weight lock assigned to it, such that
|
||||
// another compaction waiting on same weight can start as soon as possible. That's usually
|
||||
// called before compaction seals sstable and such and after all compaction work is done.
|
||||
void on_compaction_complete(compaction_weight_registration& weight_registration);
|
||||
|
||||
double backlog() {
|
||||
return _backlog_manager.backlog();
|
||||
}
|
||||
|
||||
@@ -256,6 +256,7 @@ size_tiered_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
|
||||
bucket.resize(std::min(max_sstables, bucket.size()));
|
||||
compaction_descriptor desc(std::move(bucket), std::optional<sstables::sstable_set>(), iop);
|
||||
desc.options = compaction_options::make_reshape();
|
||||
return desc;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3267,39 +3267,30 @@ flat_mutation_reader create_evictable_reader_and_evict_after_first_buffer(
|
||||
reader_permit permit,
|
||||
const dht::partition_range& prange,
|
||||
const query::partition_slice& slice,
|
||||
std::deque<mutation_fragment> first_buffer,
|
||||
position_in_partition_view last_fragment_position,
|
||||
std::deque<mutation_fragment> second_buffer,
|
||||
size_t max_buffer_size) {
|
||||
std::list<std::deque<mutation_fragment>> buffers,
|
||||
position_in_partition_view first_buf_last_fragment_position,
|
||||
size_t max_buffer_size,
|
||||
bool detach_buffer = true) {
|
||||
class factory {
|
||||
schema_ptr _schema;
|
||||
reader_permit _permit;
|
||||
std::optional<std::deque<mutation_fragment>> _first_buffer;
|
||||
std::optional<std::deque<mutation_fragment>> _second_buffer;
|
||||
std::list<std::deque<mutation_fragment>> _buffers;
|
||||
size_t _max_buffer_size;
|
||||
|
||||
private:
|
||||
std::optional<std::deque<mutation_fragment>> copy_buffer(const std::optional<std::deque<mutation_fragment>>& o) {
|
||||
if (!o) {
|
||||
return {};
|
||||
}
|
||||
return copy_fragments(*_schema, _permit, *o);
|
||||
}
|
||||
|
||||
public:
|
||||
factory(schema_ptr schema, reader_permit permit, std::deque<mutation_fragment> first_buffer, std::deque<mutation_fragment> second_buffer, size_t max_buffer_size)
|
||||
factory(schema_ptr schema, reader_permit permit, std::list<std::deque<mutation_fragment>> buffers, size_t max_buffer_size)
|
||||
: _schema(std::move(schema))
|
||||
, _permit(std::move(permit))
|
||||
, _first_buffer(std::move(first_buffer))
|
||||
, _second_buffer(std::move(second_buffer))
|
||||
, _buffers(std::move(buffers))
|
||||
, _max_buffer_size(max_buffer_size) {
|
||||
}
|
||||
|
||||
factory(const factory& o)
|
||||
: _schema(o._schema)
|
||||
, _permit(o._permit)
|
||||
, _first_buffer(copy_buffer(o._first_buffer))
|
||||
, _second_buffer(copy_buffer(o._second_buffer)) {
|
||||
, _permit(o._permit) {
|
||||
for (const auto& buf : o._buffers) {
|
||||
_buffers.emplace_back(copy_fragments(*_schema, _permit, buf));
|
||||
}
|
||||
}
|
||||
factory(factory&& o) = default;
|
||||
|
||||
@@ -3313,14 +3304,9 @@ flat_mutation_reader create_evictable_reader_and_evict_after_first_buffer(
|
||||
streamed_mutation::forwarding fwd_sm,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
BOOST_REQUIRE(s == _schema);
|
||||
if (_first_buffer) {
|
||||
auto buf = *std::exchange(_first_buffer, {});
|
||||
auto rd = make_flat_mutation_reader_from_fragments(_schema, std::move(permit), std::move(buf));
|
||||
rd.set_max_buffer_size(_max_buffer_size);
|
||||
return rd;
|
||||
}
|
||||
if (_second_buffer) {
|
||||
auto buf = *std::exchange(_second_buffer, {});
|
||||
if (!_buffers.empty()) {
|
||||
auto buf = std::move(_buffers.front());
|
||||
_buffers.pop_front();
|
||||
auto rd = make_flat_mutation_reader_from_fragments(_schema, std::move(permit), std::move(buf));
|
||||
rd.set_max_buffer_size(_max_buffer_size);
|
||||
return rd;
|
||||
@@ -3328,9 +3314,9 @@ flat_mutation_reader create_evictable_reader_and_evict_after_first_buffer(
|
||||
return make_empty_flat_reader(_schema, std::move(permit));
|
||||
}
|
||||
};
|
||||
auto ms = mutation_source(factory(schema, permit, std::move(first_buffer), std::move(second_buffer), max_buffer_size));
|
||||
auto ms = mutation_source(factory(schema, permit, std::move(buffers), max_buffer_size));
|
||||
|
||||
auto [rd, handle] = make_manually_paused_evictable_reader(
|
||||
auto rd = make_auto_paused_evictable_reader(
|
||||
std::move(ms),
|
||||
schema,
|
||||
permit,
|
||||
@@ -3346,18 +3332,42 @@ flat_mutation_reader create_evictable_reader_and_evict_after_first_buffer(
|
||||
|
||||
const auto eq_cmp = position_in_partition::equal_compare(*schema);
|
||||
BOOST_REQUIRE(rd.is_buffer_full());
|
||||
BOOST_REQUIRE(eq_cmp(rd.buffer().back().position(), last_fragment_position));
|
||||
BOOST_REQUIRE(eq_cmp(rd.buffer().back().position(), first_buf_last_fragment_position));
|
||||
BOOST_REQUIRE(!rd.is_end_of_stream());
|
||||
|
||||
rd.detach_buffer();
|
||||
|
||||
handle.pause();
|
||||
if (detach_buffer) {
|
||||
rd.detach_buffer();
|
||||
}
|
||||
|
||||
while(permit.semaphore().try_evict_one_inactive_read());
|
||||
|
||||
return std::move(rd);
|
||||
}
|
||||
|
||||
flat_mutation_reader create_evictable_reader_and_evict_after_first_buffer(
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& prange,
|
||||
const query::partition_slice& slice,
|
||||
std::deque<mutation_fragment> first_buffer,
|
||||
position_in_partition_view last_fragment_position,
|
||||
std::deque<mutation_fragment> last_buffer,
|
||||
size_t max_buffer_size,
|
||||
bool detach_buffer = true) {
|
||||
std::list<std::deque<mutation_fragment>> list;
|
||||
list.emplace_back(std::move(first_buffer));
|
||||
list.emplace_back(std::move(last_buffer));
|
||||
return create_evictable_reader_and_evict_after_first_buffer(
|
||||
std::move(schema),
|
||||
std::move(permit),
|
||||
prange,
|
||||
slice,
|
||||
std::move(list),
|
||||
last_fragment_position,
|
||||
max_buffer_size,
|
||||
detach_buffer);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_evictable_reader_trim_range_tombstones) {
|
||||
@@ -3659,7 +3669,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_self_validation) {
|
||||
|
||||
check_evictable_reader_validation_is_triggered(
|
||||
"pkey > _last_pkey; pkey ∈ pkrange",
|
||||
partition_error_prefix,
|
||||
"",
|
||||
s.schema(),
|
||||
permit,
|
||||
prange,
|
||||
@@ -3747,3 +3757,208 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_self_validation) {
|
||||
make_second_buffer(pkeys[3]),
|
||||
max_buffer_size);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_evictable_reader_drop_flags) {
|
||||
reader_concurrency_semaphore semaphore(1, 0, get_name());
|
||||
simple_schema s;
|
||||
auto permit = semaphore.make_permit(s.schema().get(), get_name());
|
||||
|
||||
auto pkeys = s.make_pkeys(2);
|
||||
std::sort(pkeys.begin(), pkeys.end(), [&s] (const auto& pk1, const auto& pk2) {
|
||||
return pk1.less_compare(*s.schema(), pk2);
|
||||
});
|
||||
const auto& pkey1 = pkeys[0];
|
||||
const auto& pkey2 = pkeys[1];
|
||||
const int second_buffer_ck = 10;
|
||||
|
||||
struct buffer {
|
||||
simple_schema& s;
|
||||
reader_permit permit;
|
||||
std::deque<mutation_fragment> frags;
|
||||
std::vector<mutation> muts;
|
||||
size_t size = 0;
|
||||
std::optional<position_in_partition_view> last_pos;
|
||||
|
||||
buffer(simple_schema& s_, reader_permit permit_, dht::decorated_key key)
|
||||
: s(s_), permit(std::move(permit_)) {
|
||||
add_partition(key);
|
||||
}
|
||||
size_t add_partition(dht::decorated_key key) {
|
||||
size += frags.emplace_back(*s.schema(), permit, partition_start{key, {}}).memory_usage();
|
||||
muts.emplace_back(s.schema(), key);
|
||||
return size;
|
||||
}
|
||||
size_t add_mutation_fragment(mutation_fragment&& mf, bool only_to_frags = false) {
|
||||
if (!only_to_frags) {
|
||||
muts.back().apply(mf);
|
||||
}
|
||||
size += frags.emplace_back(*s.schema(), permit, std::move(mf)).memory_usage();
|
||||
return size;
|
||||
}
|
||||
size_t add_static_row(std::optional<mutation_fragment> sr = {}) {
|
||||
auto srow = sr ? std::move(*sr) : s.make_static_row("s");
|
||||
return add_mutation_fragment(std::move(srow));
|
||||
}
|
||||
size_t add_clustering_row(int i, bool only_to_frags = false) {
|
||||
return add_mutation_fragment(mutation_fragment(*s.schema(), permit, s.make_row(s.make_ckey(i), "v")), only_to_frags);
|
||||
}
|
||||
size_t add_clustering_rows(int start, int end) {
|
||||
for (int i = start; i < end; ++i) {
|
||||
add_clustering_row(i);
|
||||
}
|
||||
return size;
|
||||
}
|
||||
size_t add_partition_end() {
|
||||
size += frags.emplace_back(*s.schema(), permit, partition_end{}).memory_usage();
|
||||
return size;
|
||||
}
|
||||
void save_position() { last_pos = frags.back().position(); }
|
||||
void find_position(size_t buf_size) {
|
||||
size_t s = 0;
|
||||
for (const auto& frag : frags) {
|
||||
s += frag.memory_usage();
|
||||
if (s >= buf_size) {
|
||||
last_pos = frag.position();
|
||||
break;
|
||||
}
|
||||
}
|
||||
BOOST_REQUIRE(last_pos);
|
||||
}
|
||||
};
|
||||
|
||||
auto make_reader = [&] (const buffer& first_buffer, const buffer& second_buffer, const buffer* const third_buffer, size_t max_buffer_size) {
|
||||
std::list<std::deque<mutation_fragment>> buffers;
|
||||
buffers.emplace_back(copy_fragments(*s.schema(), permit, first_buffer.frags));
|
||||
buffers.emplace_back(copy_fragments(*s.schema(), permit, second_buffer.frags));
|
||||
if (third_buffer) {
|
||||
buffers.emplace_back(copy_fragments(*s.schema(), permit, third_buffer->frags));
|
||||
}
|
||||
return create_evictable_reader_and_evict_after_first_buffer(
|
||||
s.schema(),
|
||||
permit,
|
||||
query::full_partition_range,
|
||||
s.schema()->full_slice(),
|
||||
std::move(buffers),
|
||||
*first_buffer.last_pos,
|
||||
max_buffer_size,
|
||||
false);
|
||||
};
|
||||
|
||||
testlog.info("Same partition, with static row");
|
||||
{
|
||||
buffer first_buffer(s, permit, pkey1);
|
||||
first_buffer.add_static_row();
|
||||
auto srow = mutation_fragment(*s.schema(), permit, first_buffer.frags.back());
|
||||
const auto buf_size = first_buffer.add_clustering_rows(0, second_buffer_ck);
|
||||
first_buffer.save_position();
|
||||
first_buffer.add_clustering_row(second_buffer_ck);
|
||||
|
||||
buffer second_buffer(s, permit, pkey1);
|
||||
second_buffer.add_static_row(std::move(srow));
|
||||
second_buffer.add_clustering_row(second_buffer_ck);
|
||||
second_buffer.add_clustering_row(second_buffer_ck + 1);
|
||||
second_buffer.add_partition_end();
|
||||
|
||||
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
|
||||
.has_monotonic_positions();
|
||||
|
||||
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
|
||||
.produces(first_buffer.muts[0] + second_buffer.muts[0])
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
|
||||
testlog.info("Same partition, no static row");
|
||||
{
|
||||
buffer first_buffer(s, permit, pkey1);
|
||||
const auto buf_size = first_buffer.add_clustering_rows(0, second_buffer_ck);
|
||||
first_buffer.save_position();
|
||||
first_buffer.add_clustering_row(second_buffer_ck);
|
||||
|
||||
buffer second_buffer(s, permit, pkey1);
|
||||
second_buffer.add_clustering_row(second_buffer_ck);
|
||||
second_buffer.add_clustering_row(second_buffer_ck + 1);
|
||||
second_buffer.add_partition_end();
|
||||
|
||||
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
|
||||
.has_monotonic_positions();
|
||||
|
||||
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
|
||||
.produces(first_buffer.muts[0] + second_buffer.muts[0])
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
|
||||
testlog.info("Same partition as expected, no static row, next partition has static row (#8923)");
|
||||
{
|
||||
buffer second_buffer(s, permit, pkey1);
|
||||
second_buffer.add_clustering_rows(second_buffer_ck, second_buffer_ck + second_buffer_ck / 2);
|
||||
// We want to end the buffer on the partition-start below, but since a
|
||||
// partition start will be dropped from it, we have to use the size
|
||||
// without it.
|
||||
const auto buf_size = second_buffer.add_partition_end();
|
||||
second_buffer.add_partition(pkey2);
|
||||
second_buffer.add_static_row();
|
||||
auto srow = mutation_fragment(*s.schema(), permit, second_buffer.frags.back());
|
||||
second_buffer.add_clustering_rows(0, 2);
|
||||
|
||||
buffer first_buffer(s, permit, pkey1);
|
||||
for (int i = 0; first_buffer.add_clustering_row(i) < buf_size; ++i);
|
||||
first_buffer.save_position();
|
||||
first_buffer.add_mutation_fragment(mutation_fragment(*s.schema(), permit, second_buffer.frags[1]));
|
||||
|
||||
buffer third_buffer(s, permit, pkey2);
|
||||
third_buffer.add_static_row(std::move(srow));
|
||||
third_buffer.add_clustering_rows(0, 2);
|
||||
third_buffer.add_partition_end();
|
||||
|
||||
first_buffer.find_position(buf_size);
|
||||
|
||||
assert_that(make_reader(first_buffer, second_buffer, &third_buffer, buf_size))
|
||||
.has_monotonic_positions();
|
||||
|
||||
assert_that(make_reader(first_buffer, second_buffer, &third_buffer, buf_size))
|
||||
.produces(first_buffer.muts[0] + second_buffer.muts[0])
|
||||
.produces(second_buffer.muts[1] + third_buffer.muts[0])
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
|
||||
testlog.info("Next partition, with no static row");
|
||||
{
|
||||
buffer first_buffer(s, permit, pkey1);
|
||||
const auto buf_size = first_buffer.add_clustering_rows(0, second_buffer_ck);
|
||||
first_buffer.save_position();
|
||||
first_buffer.add_clustering_row(second_buffer_ck + 1, true);
|
||||
|
||||
buffer second_buffer(s, permit, pkey2);
|
||||
second_buffer.add_clustering_rows(0, second_buffer_ck / 2);
|
||||
second_buffer.add_partition_end();
|
||||
|
||||
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
|
||||
.has_monotonic_positions();
|
||||
|
||||
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
|
||||
.produces(first_buffer.muts[0])
|
||||
.produces(second_buffer.muts[0])
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
|
||||
testlog.info("Next partition, with static row");
|
||||
{
|
||||
buffer first_buffer(s, permit, pkey1);
|
||||
const auto buf_size = first_buffer.add_clustering_rows(0, second_buffer_ck);
|
||||
first_buffer.save_position();
|
||||
first_buffer.add_clustering_row(second_buffer_ck + 1, true);
|
||||
|
||||
buffer second_buffer(s, permit, pkey2);
|
||||
second_buffer.add_static_row();
|
||||
second_buffer.add_clustering_rows(0, second_buffer_ck / 2);
|
||||
second_buffer.add_partition_end();
|
||||
|
||||
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
|
||||
.has_monotonic_positions();
|
||||
|
||||
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
|
||||
.produces(first_buffer.muts[0])
|
||||
.produces(second_buffer.muts[0])
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6685,3 +6685,135 @@ SEASTAR_TEST_CASE(test_zero_estimated_partitions) {
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(max_ongoing_compaction_test) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
BOOST_REQUIRE(smp::count == 1);
|
||||
|
||||
auto make_schema = [] (auto idx) {
|
||||
auto builder = schema_builder("tests", std::to_string(idx))
|
||||
.with_column("id", utf8_type, column_kind::partition_key)
|
||||
.with_column("cl", int32_type, column_kind::clustering_key)
|
||||
.with_column("value", int32_type);
|
||||
builder.set_compaction_strategy(sstables::compaction_strategy_type::time_window);
|
||||
std::map <sstring, sstring> opts = {
|
||||
{time_window_compaction_strategy_options::COMPACTION_WINDOW_UNIT_KEY, "HOURS"},
|
||||
{time_window_compaction_strategy_options::COMPACTION_WINDOW_SIZE_KEY, "1"},
|
||||
{time_window_compaction_strategy_options::EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY, "0"},
|
||||
};
|
||||
builder.set_compaction_strategy_options(std::move(opts));
|
||||
builder.set_gc_grace_seconds(0);
|
||||
return builder.build();
|
||||
};
|
||||
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
cm->enable();
|
||||
auto stop_cm = defer([&cm] {
|
||||
cm->stop().get();
|
||||
});
|
||||
|
||||
auto tmp = tmpdir();
|
||||
auto cl_stats = make_lw_shared<cell_locker_stats>();
|
||||
auto tracker = make_lw_shared<cache_tracker>();
|
||||
auto tokens = token_generation_for_shard(1, this_shard_id(), test_db_config.murmur3_partitioner_ignore_msb_bits(), smp::count);
|
||||
|
||||
auto next_timestamp = [] (auto step) {
|
||||
using namespace std::chrono;
|
||||
return (gc_clock::now().time_since_epoch() - duration_cast<microseconds>(step)).count();
|
||||
};
|
||||
auto make_expiring_cell = [&] (schema_ptr s, std::chrono::hours step) {
|
||||
static thread_local int32_t value = 1;
|
||||
|
||||
auto key_str = tokens[0].first;
|
||||
auto key = partition_key::from_exploded(*s, {to_bytes(key_str)});
|
||||
|
||||
mutation m(s, key);
|
||||
auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(value++)});
|
||||
m.set_clustered_cell(c_key, bytes("value"), data_value(int32_t(value)), next_timestamp(step), gc_clock::duration(step + 5s));
|
||||
return m;
|
||||
};
|
||||
|
||||
auto make_table_with_single_fully_expired_sstable = [&] (auto idx) {
|
||||
auto s = make_schema(idx);
|
||||
column_family::config cfg = column_family_test_config(env.manager());
|
||||
cfg.datadir = tmp.path().string() + "/" + std::to_string(idx);
|
||||
touch_directory(cfg.datadir).get();
|
||||
cfg.enable_commitlog = false;
|
||||
cfg.enable_incremental_backups = false;
|
||||
|
||||
auto sst_gen = [&env, s, dir = cfg.datadir, gen = make_lw_shared<unsigned>(1)] () mutable {
|
||||
return env.make_sstable(s, dir, (*gen)++, sstables::sstable::version_types::md, big);
|
||||
};
|
||||
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm, *cl_stats, *tracker);
|
||||
cf->start();
|
||||
cf->mark_ready_for_writes();
|
||||
|
||||
auto muts = { make_expiring_cell(s, std::chrono::hours(1)) };
|
||||
auto sst = make_sstable_containing(sst_gen, muts);
|
||||
column_family_test(cf).add_sstable(sst);
|
||||
return cf;
|
||||
};
|
||||
|
||||
std::vector<lw_shared_ptr<column_family>> tables;
|
||||
auto stop_tables = defer([&tables] {
|
||||
for (auto& t : tables) {
|
||||
t->stop().get();
|
||||
}
|
||||
});
|
||||
for (auto i = 0; i < 100; i++) {
|
||||
tables.push_back(make_table_with_single_fully_expired_sstable(i));
|
||||
}
|
||||
|
||||
// Make sure everything is expired
|
||||
forward_jump_clocks(std::chrono::hours(100));
|
||||
|
||||
for (auto& t : tables) {
|
||||
BOOST_REQUIRE(t->sstables_count() == 1);
|
||||
t->trigger_compaction();
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(cm->get_stats().pending_tasks >= 1 || cm->get_stats().active_tasks >= 1);
|
||||
|
||||
size_t max_ongoing_compaction = 0;
|
||||
|
||||
// wait for submitted jobs to finish.
|
||||
auto end = [cm, &tables] {
|
||||
return cm->get_stats().pending_tasks == 0 && cm->get_stats().active_tasks == 0
|
||||
&& boost::algorithm::all_of(tables, [] (auto& t) { return t->sstables_count() == 0; });
|
||||
};
|
||||
while (!end()) {
|
||||
if (!cm->get_stats().pending_tasks && !cm->get_stats().active_tasks) {
|
||||
for (auto& t : tables) {
|
||||
if (t->sstables_count()) {
|
||||
t->trigger_compaction();
|
||||
}
|
||||
}
|
||||
}
|
||||
max_ongoing_compaction = std::max(cm->get_stats().active_tasks, max_ongoing_compaction);
|
||||
later().get();
|
||||
}
|
||||
BOOST_REQUIRE(cm->get_stats().errors == 0);
|
||||
BOOST_REQUIRE(max_ongoing_compaction == 1);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(stcs_reshape_test) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
std::vector<shared_sstable> sstables;
|
||||
sstables.reserve(s->max_compaction_threshold());
|
||||
for (auto gen = 1; gen <= s->max_compaction_threshold(); gen++) {
|
||||
auto sst = env.make_sstable(s, "", gen, la, big);
|
||||
sstables::test(sst).set_data_file_size(1);
|
||||
sstables.push_back(std::move(sst));
|
||||
}
|
||||
|
||||
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::size_tiered,
|
||||
s->compaction_strategy_options());
|
||||
|
||||
BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, default_priority_class(), reshape_mode::strict).sstables.size());
|
||||
BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, default_priority_class(), reshape_mode::relaxed).sstables.size());
|
||||
});
|
||||
}
|
||||
|
||||
186
test/cql-pytest/test_secondary_index.py
Normal file
186
test/cql-pytest/test_secondary_index.py
Normal file
@@ -0,0 +1,186 @@
|
||||
# Copyright 2020 ScyllaDB
|
||||
#
|
||||
# This file is part of Scylla.
|
||||
#
|
||||
# Scylla is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Scylla is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
# Tests for secondary indexes
|
||||
|
||||
import time
|
||||
import pytest
|
||||
from cassandra.protocol import SyntaxException, AlreadyExists, InvalidRequest, ConfigurationException, ReadFailure
|
||||
|
||||
from util import new_test_table, unique_name
|
||||
|
||||
# A reproducer for issue #7443: Normally, when the entire table is SELECTed,
|
||||
# the partitions are returned sorted by the partitions' token. When there
|
||||
# is filtering, this order is not expected to change. Furthermore, when this
|
||||
# filtering happens to use a secondary index, again the order is not expected
|
||||
# to change.
|
||||
def test_partition_order_with_si(cql, test_keyspace):
|
||||
schema = 'pk int, x int, PRIMARY KEY ((pk))'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
# Insert 20 partitions, all of them with x=1 so that filtering by x=1
|
||||
# will yield the same 20 partitions:
|
||||
N = 20
|
||||
stmt = cql.prepare('INSERT INTO '+table+' (pk, x) VALUES (?, ?)')
|
||||
for i in range(N):
|
||||
cql.execute(stmt, [i, 1])
|
||||
# SELECT all the rows, and verify they are returned in increasing
|
||||
# partition token order (note that the token is a *signed* number):
|
||||
tokens = [row.system_token_pk for row in cql.execute('SELECT token(pk) FROM '+table)]
|
||||
assert len(tokens) == N
|
||||
assert sorted(tokens) == tokens
|
||||
# Now select all the partitions with filtering of x=1. Since all
|
||||
# rows have x=1, this shouldn't change the list of matching rows, and
|
||||
# also shouldn't check their order:
|
||||
tokens1 = [row.system_token_pk for row in cql.execute('SELECT token(pk) FROM '+table+' WHERE x=1 ALLOW FILTERING')]
|
||||
assert tokens1 == tokens
|
||||
# Now add an index on x, which allows implementing the "x=1"
|
||||
# restriction differently. With the index, "ALLOW FILTERING" is
|
||||
# no longer necessary. But the order of the results should
|
||||
# still not change. Issue #7443 is about the order changing here.
|
||||
cql.execute('CREATE INDEX ON '+table+'(x)')
|
||||
# "CREATE INDEX" does not wait until the index is actually available
|
||||
# for use. Reads immediately after the CREATE INDEX may fail or return
|
||||
# partial results. So let's retry until reads resume working:
|
||||
for i in range(100):
|
||||
try:
|
||||
tokens2 = [row.system_token_pk for row in cql.execute('SELECT token(pk) FROM '+table+' WHERE x=1')]
|
||||
if len(tokens2) == N:
|
||||
break
|
||||
except ReadFailure:
|
||||
pass
|
||||
time.sleep(0.1)
|
||||
assert tokens2 == tokens
|
||||
|
||||
# Test which ensures that indexes for a query are picked by the order in which
|
||||
# they appear in restrictions. That way, users can deterministically pick
|
||||
# which indexes are used for which queries.
|
||||
# Note that the order of picking indexing is not set in stone and may be
|
||||
# subject to change - in which case this test case should be amended as well.
|
||||
# The order tested in this case was decided as a good first step in issue
|
||||
# #7969, but it's possible that it will eventually be implemented another
|
||||
# way, e.g. dynamically based on estimated query selectivity statistics.
|
||||
# Ref: #7969
|
||||
@pytest.mark.xfail(reason="The order of picking indexes is currently arbitrary. Issue #7969")
|
||||
def test_order_of_indexes(scylla_only, cql, test_keyspace):
|
||||
schema = 'p int primary key, v1 int, v2 int, v3 int'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
cql.execute(f"CREATE INDEX my_v3_idx ON {table}(v3)")
|
||||
cql.execute(f"CREATE INDEX my_v1_idx ON {table}(v1)")
|
||||
cql.execute(f"CREATE INDEX my_v2_idx ON {table}((p),v2)")
|
||||
# All queries below should use the first index they find in the list
|
||||
# of restrictions. Tracing information will be consulted to ensure
|
||||
# it's true. Currently some of the cases below succeed, because the
|
||||
# order is not well defined (and may, for instance, change upon
|
||||
# server restart), but some of them fail. Once a proper ordering
|
||||
# is implemented, all cases below should succeed.
|
||||
def index_used(query, index_name):
|
||||
assert any([index_name in event.description for event in cql.execute(query, trace=True).get_query_trace().events])
|
||||
index_used(f"SELECT * FROM {table} WHERE v3 = 1", "my_v3_idx")
|
||||
index_used(f"SELECT * FROM {table} WHERE v3 = 1 and v1 = 2 allow filtering", "my_v3_idx")
|
||||
index_used(f"SELECT * FROM {table} WHERE p = 1 and v1 = 1 and v3 = 2 allow filtering", "my_v1_idx")
|
||||
index_used(f"SELECT * FROM {table} WHERE p = 1 and v3 = 1 and v1 = 2 allow filtering", "my_v3_idx")
|
||||
# Local indexes are still skipped if they cannot be used
|
||||
index_used(f"SELECT * FROM {table} WHERE v2 = 1 and v1 = 2 allow filtering", "my_v1_idx")
|
||||
index_used(f"SELECT * FROM {table} WHERE v2 = 1 and v3 = 2 and v1 = 3 allow filtering", "my_v3_idx")
|
||||
index_used(f"SELECT * FROM {table} WHERE v1 = 1 and v2 = 2 and v3 = 3 allow filtering", "my_v1_idx")
|
||||
# Local indexes are still preferred over global ones, if they can be used
|
||||
index_used(f"SELECT * FROM {table} WHERE p = 1 and v1 = 1 and v3 = 2 and v2 = 2 allow filtering", "my_v2_idx")
|
||||
index_used(f"SELECT * FROM {table} WHERE p = 1 and v2 = 1 and v1 = 2 allow filtering", "my_v2_idx")
|
||||
|
||||
# Indexes can be created without an explicit name, in which case a default name is chosen.
|
||||
# However, due to #8620 it was possible to break the index creation mechanism by creating
|
||||
# a properly named regular table, which conflicts with the generated index name.
|
||||
def test_create_unnamed_index_when_its_name_is_taken(cql, test_keyspace):
|
||||
schema = 'p int primary key, v int'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
try:
|
||||
cql.execute(f"CREATE TABLE {table}_v_idx_index (i_do_not_exist_in_the_base_table int primary key)")
|
||||
# Creating an index should succeed, even though its default name is taken
|
||||
# by the table above
|
||||
cql.execute(f"CREATE INDEX ON {table}(v)")
|
||||
finally:
|
||||
cql.execute(f"DROP TABLE {table}_v_idx_index")
|
||||
|
||||
# Indexed created with an explicit name cause a materialized view to be created,
|
||||
# and this view has a specific name - <index-name>_index. If there happens to be
|
||||
# a regular table (or another view) named just like that, index creation should fail.
|
||||
def test_create_named_index_when_its_name_is_taken(scylla_only, cql, test_keyspace):
|
||||
schema = 'p int primary key, v int'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
index_name = unique_name()
|
||||
try:
|
||||
cql.execute(f"CREATE TABLE {test_keyspace}.{index_name}_index (i_do_not_exist_in_the_base_table int primary key)")
|
||||
# Creating an index should fail, because it's impossible to create
|
||||
# its underlying materialized view, because its name is taken by a regular table
|
||||
with pytest.raises(InvalidRequest, match="already exists"):
|
||||
cql.execute(f"CREATE INDEX {index_name} ON {table}(v)")
|
||||
finally:
|
||||
cql.execute(f"DROP TABLE {test_keyspace}.{index_name}_index")
|
||||
|
||||
# Tests for CREATE INDEX IF NOT EXISTS
|
||||
# Reproduces issue #8717.
|
||||
def test_create_index_if_not_exists(cql, test_keyspace):
|
||||
with new_test_table(cql, test_keyspace, 'p int primary key, v int') as table:
|
||||
cql.execute(f"CREATE INDEX ON {table}(v)")
|
||||
# Can't create the same index again without "IF NOT EXISTS", but can
|
||||
# do it with "IF NOT EXISTS":
|
||||
with pytest.raises(InvalidRequest, match="duplicate"):
|
||||
cql.execute(f"CREATE INDEX ON {table}(v)")
|
||||
cql.execute(f"CREATE INDEX IF NOT EXISTS ON {table}(v)")
|
||||
cql.execute(f"DROP INDEX {test_keyspace}.{table.split('.')[1]}_v_idx")
|
||||
|
||||
# Now test the same thing for named indexes. This is what broke in #8717:
|
||||
cql.execute(f"CREATE INDEX xyz ON {table}(v)")
|
||||
with pytest.raises(InvalidRequest, match="already exists"):
|
||||
cql.execute(f"CREATE INDEX xyz ON {table}(v)")
|
||||
cql.execute(f"CREATE INDEX IF NOT EXISTS xyz ON {table}(v)")
|
||||
cql.execute(f"DROP INDEX {test_keyspace}.xyz")
|
||||
|
||||
# Exactly the same with non-lower case name.
|
||||
cql.execute(f'CREATE INDEX "CamelCase" ON {table}(v)')
|
||||
with pytest.raises(InvalidRequest, match="already exists"):
|
||||
cql.execute(f'CREATE INDEX "CamelCase" ON {table}(v)')
|
||||
cql.execute(f'CREATE INDEX IF NOT EXISTS "CamelCase" ON {table}(v)')
|
||||
cql.execute(f'DROP INDEX {test_keyspace}."CamelCase"')
|
||||
|
||||
# Trying to create an index for an attribute that's already indexed,
|
||||
# but with a different name. The "IF NOT EXISTS" appears to succeed
|
||||
# in this case, but does not actually create the new index name -
|
||||
# only the old one remains.
|
||||
cql.execute(f"CREATE INDEX xyz ON {table}(v)")
|
||||
with pytest.raises(InvalidRequest, match="duplicate"):
|
||||
cql.execute(f"CREATE INDEX abc ON {table}(v)")
|
||||
cql.execute(f"CREATE INDEX IF NOT EXISTS abc ON {table}(v)")
|
||||
with pytest.raises(InvalidRequest):
|
||||
cql.execute(f"DROP INDEX {test_keyspace}.abc")
|
||||
cql.execute(f"DROP INDEX {test_keyspace}.xyz")
|
||||
|
||||
# Test that the paging state works properly for indexes on tables
|
||||
# with descending clustering order. There was a problem with indexes
|
||||
# created on clustering keys with DESC clustering order - they are represented
|
||||
# as "reverse" types internally and Scylla assertions failed that the base type
|
||||
# is different from the underlying view type, even though, from the perspective
|
||||
# of deserialization, they're equal. Issue #8666
|
||||
def test_paging_with_desc_clustering_order(cql, test_keyspace):
|
||||
schema = 'p int, c int, primary key (p,c)'
|
||||
extra = 'with clustering order by (c desc)'
|
||||
with new_test_table(cql, test_keyspace, schema, extra) as table:
|
||||
cql.execute(f"CREATE INDEX ON {table}(c)")
|
||||
for i in range(3):
|
||||
cql.execute(f"INSERT INTO {table}(p,c) VALUES ({i}, 42)")
|
||||
stmt = SimpleStatement(f"SELECT * FROM {table} WHERE c = 42", fetch_size=1)
|
||||
assert len([row for row in cql.execute(stmt)]) == 3
|
||||
Reference in New Issue
Block a user