Compare commits
36 Commits
scylla-4.3
...
next-4.3
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
67a62b3e8d | ||
|
|
92effccf52 | ||
|
|
7357529834 | ||
|
|
3dd7874f08 | ||
|
|
1bf218c29e | ||
|
|
89c47a44dc | ||
|
|
dd93f297c1 | ||
|
|
b0b2606a8c | ||
|
|
6de458e915 | ||
|
|
b6aa5ab2d4 | ||
|
|
08cbd180ff | ||
|
|
693c7b300a | ||
|
|
2e7f618632 | ||
|
|
5cd698c89d | ||
|
|
482fa83a0e | ||
|
|
cabb7fbd3b | ||
|
|
4d1c83a4e8 | ||
|
|
7da9884d09 | ||
|
|
b4242f01a8 | ||
|
|
27cd231f61 | ||
|
|
030419d5ed | ||
|
|
0d1362fc31 | ||
|
|
0888aa1717 | ||
|
|
690a96ff54 | ||
|
|
38cdf30a35 | ||
|
|
61b71e4da0 | ||
|
|
e4b42e622e | ||
|
|
e625144d6e | ||
|
|
76ec7513f1 | ||
|
|
f36f7035c8 | ||
|
|
709e934164 | ||
|
|
13428d56f6 | ||
|
|
2c1f5e5225 | ||
|
|
9ae3edb102 | ||
|
|
11851fa4d9 | ||
|
|
1a56e41f44 |
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=4.3.4
|
||||
VERSION=4.3.7
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -123,7 +123,7 @@ struct rjson_engaged_ptr_comp {
|
||||
// as internally they're stored in an array, and the order of elements is
|
||||
// not important in set equality. See issue #5021
|
||||
static bool check_EQ_for_sets(const rjson::value& set1, const rjson::value& set2) {
|
||||
if (set1.Size() != set2.Size()) {
|
||||
if (!set1.IsArray() || !set2.IsArray() || set1.Size() != set2.Size()) {
|
||||
return false;
|
||||
}
|
||||
std::set<const rjson::value*, rjson_engaged_ptr_comp> set1_raw;
|
||||
@@ -137,25 +137,70 @@ static bool check_EQ_for_sets(const rjson::value& set1, const rjson::value& set2
|
||||
}
|
||||
return true;
|
||||
}
|
||||
// Moreover, the JSON being compared can be a nested document with outer
|
||||
// layers of lists and maps and some inner set - and we need to get to that
|
||||
// inner set to compare it correctly with check_EQ_for_sets() (issue #8514).
|
||||
static bool check_EQ(const rjson::value* v1, const rjson::value& v2);
|
||||
static bool check_EQ_for_lists(const rjson::value& list1, const rjson::value& list2) {
|
||||
if (!list1.IsArray() || !list2.IsArray() || list1.Size() != list2.Size()) {
|
||||
return false;
|
||||
}
|
||||
auto it1 = list1.Begin();
|
||||
auto it2 = list2.Begin();
|
||||
while (it1 != list1.End()) {
|
||||
// Note: Alternator limits an item's depth (rjson::parse() limits
|
||||
// it to around 37 levels), so this recursion is safe.
|
||||
if (!check_EQ(&*it1, *it2)) {
|
||||
return false;
|
||||
}
|
||||
++it1;
|
||||
++it2;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
static bool check_EQ_for_maps(const rjson::value& list1, const rjson::value& list2) {
|
||||
if (!list1.IsObject() || !list2.IsObject() || list1.MemberCount() != list2.MemberCount()) {
|
||||
return false;
|
||||
}
|
||||
for (auto it1 = list1.MemberBegin(); it1 != list1.MemberEnd(); ++it1) {
|
||||
auto it2 = list2.FindMember(it1->name);
|
||||
if (it2 == list2.MemberEnd() || !check_EQ(&it1->value, it2->value)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// Check if two JSON-encoded values match with the EQ relation
|
||||
static bool check_EQ(const rjson::value* v1, const rjson::value& v2) {
|
||||
if (!v1) {
|
||||
return false;
|
||||
}
|
||||
if (v1->IsObject() && v1->MemberCount() == 1 && v2.IsObject() && v2.MemberCount() == 1) {
|
||||
if (v1 && v1->IsObject() && v1->MemberCount() == 1 && v2.IsObject() && v2.MemberCount() == 1) {
|
||||
auto it1 = v1->MemberBegin();
|
||||
auto it2 = v2.MemberBegin();
|
||||
if ((it1->name == "SS" && it2->name == "SS") || (it1->name == "NS" && it2->name == "NS") || (it1->name == "BS" && it2->name == "BS")) {
|
||||
return check_EQ_for_sets(it1->value, it2->value);
|
||||
if (it1->name != it2->name) {
|
||||
return false;
|
||||
}
|
||||
if (it1->name == "SS" || it1->name == "NS" || it1->name == "BS") {
|
||||
return check_EQ_for_sets(it1->value, it2->value);
|
||||
} else if(it1->name == "L") {
|
||||
return check_EQ_for_lists(it1->value, it2->value);
|
||||
} else if(it1->name == "M") {
|
||||
return check_EQ_for_maps(it1->value, it2->value);
|
||||
} else {
|
||||
// Other, non-nested types (number, string, etc.) can be compared
|
||||
// literally, comparing their JSON representation.
|
||||
return it1->value == it2->value;
|
||||
}
|
||||
} else {
|
||||
// If v1 and/or v2 are missing (IsNull()) the result should be false.
|
||||
// In the unlikely case that the object is malformed (issue #8070),
|
||||
// let's also return false.
|
||||
return false;
|
||||
}
|
||||
return *v1 == v2;
|
||||
}
|
||||
|
||||
// Check if two JSON-encoded values match with the NE relation
|
||||
static bool check_NE(const rjson::value* v1, const rjson::value& v2) {
|
||||
return !v1 || *v1 != v2; // null is unequal to anything.
|
||||
return !check_EQ(v1, v2);
|
||||
}
|
||||
|
||||
// Check if two JSON-encoded values match with the BEGINS_WITH relation
|
||||
@@ -298,6 +343,8 @@ static bool check_NOT_NULL(const rjson::value* val) {
|
||||
|
||||
// Only types S, N or B (string, number or bytes) may be compared by the
|
||||
// various comparion operators - lt, le, gt, ge, and between.
|
||||
// Note that in particular, if the value is missing (v->IsNull()), this
|
||||
// check returns false.
|
||||
static bool check_comparable_type(const rjson::value& v) {
|
||||
if (!v.IsObject() || v.MemberCount() != 1) {
|
||||
return false;
|
||||
|
||||
@@ -331,15 +331,15 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
});
|
||||
|
||||
cf::get_memtable_columns_count.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_reduce_cf(ctx, req->param["name"], 0, [](column_family& cf) {
|
||||
return map_reduce_cf(ctx, req->param["name"], uint64_t{0}, [](column_family& cf) {
|
||||
return cf.active_memtable().partition_count();
|
||||
}, std::plus<int>());
|
||||
}, std::plus<>());
|
||||
});
|
||||
|
||||
cf::get_all_memtable_columns_count.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_reduce_cf(ctx, 0, [](column_family& cf) {
|
||||
return map_reduce_cf(ctx, uint64_t{0}, [](column_family& cf) {
|
||||
return cf.active_memtable().partition_count();
|
||||
}, std::plus<int>());
|
||||
}, std::plus<>());
|
||||
});
|
||||
|
||||
cf::get_memtable_on_heap_size.set(r, [] (const_req req) {
|
||||
|
||||
16
cdc/log.cc
16
cdc/log.cc
@@ -980,9 +980,9 @@ static bytes get_bytes(const atomic_cell_view& acv) {
|
||||
return acv.value().linearize();
|
||||
}
|
||||
|
||||
static bytes_view get_bytes_view(const atomic_cell_view& acv, std::vector<bytes>& buf) {
|
||||
static bytes_view get_bytes_view(const atomic_cell_view& acv, std::forward_list<bytes>& buf) {
|
||||
return acv.value().is_fragmented()
|
||||
? bytes_view{buf.emplace_back(acv.value().linearize())}
|
||||
? bytes_view{buf.emplace_front(acv.value().linearize())}
|
||||
: acv.value().first_fragment();
|
||||
}
|
||||
|
||||
@@ -1137,9 +1137,9 @@ struct process_row_visitor {
|
||||
|
||||
struct udt_visitor : public collection_visitor {
|
||||
std::vector<bytes_opt> _added_cells;
|
||||
std::vector<bytes>& _buf;
|
||||
std::forward_list<bytes>& _buf;
|
||||
|
||||
udt_visitor(ttl_opt& ttl_column, size_t num_keys, std::vector<bytes>& buf)
|
||||
udt_visitor(ttl_opt& ttl_column, size_t num_keys, std::forward_list<bytes>& buf)
|
||||
: collection_visitor(ttl_column), _added_cells(num_keys), _buf(buf) {}
|
||||
|
||||
void live_collection_cell(bytes_view key, const atomic_cell_view& cell) {
|
||||
@@ -1148,7 +1148,7 @@ struct process_row_visitor {
|
||||
}
|
||||
};
|
||||
|
||||
std::vector<bytes> buf;
|
||||
std::forward_list<bytes> buf;
|
||||
udt_visitor v(_ttl_column, type.size(), buf);
|
||||
|
||||
visit_collection(v);
|
||||
@@ -1167,9 +1167,9 @@ struct process_row_visitor {
|
||||
|
||||
struct map_or_list_visitor : public collection_visitor {
|
||||
std::vector<std::pair<bytes_view, bytes_view>> _added_cells;
|
||||
std::vector<bytes>& _buf;
|
||||
std::forward_list<bytes>& _buf;
|
||||
|
||||
map_or_list_visitor(ttl_opt& ttl_column, std::vector<bytes>& buf)
|
||||
map_or_list_visitor(ttl_opt& ttl_column, std::forward_list<bytes>& buf)
|
||||
: collection_visitor(ttl_column), _buf(buf) {}
|
||||
|
||||
void live_collection_cell(bytes_view key, const atomic_cell_view& cell) {
|
||||
@@ -1178,7 +1178,7 @@ struct process_row_visitor {
|
||||
}
|
||||
};
|
||||
|
||||
std::vector<bytes> buf;
|
||||
std::forward_list<bytes> buf;
|
||||
map_or_list_visitor v(_ttl_column, buf);
|
||||
|
||||
visit_collection(v);
|
||||
|
||||
@@ -306,6 +306,13 @@ create_index_statement::announce_migration(service::storage_proxy& proxy, bool i
|
||||
format("Index {} is a duplicate of existing index {}", index.name(), existing_index.value().name()));
|
||||
}
|
||||
}
|
||||
auto index_table_name = secondary_index::index_table_name(accepted_name);
|
||||
if (db.has_schema(keyspace(), index_table_name)) {
|
||||
return make_exception_future<::shared_ptr<cql_transport::event::schema_change>>(
|
||||
exceptions::invalid_request_exception(format("Index {} cannot be created, because table {} already exists",
|
||||
accepted_name, index_table_name))
|
||||
);
|
||||
}
|
||||
++_cql_stats->secondary_index_creates;
|
||||
schema_builder builder{schema};
|
||||
builder.with_index(index);
|
||||
|
||||
@@ -1120,7 +1120,11 @@ query::partition_slice indexed_table_select_statement::get_partition_slice_for_g
|
||||
if (single_ck_restrictions) {
|
||||
auto prefix_restrictions = single_ck_restrictions->get_longest_prefix_restrictions();
|
||||
auto clustering_restrictions_from_base = ::make_shared<restrictions::single_column_clustering_key_restrictions>(_view_schema, *prefix_restrictions);
|
||||
const auto indexed_column = _view_schema->get_column_definition(to_bytes(_index.target_column()));
|
||||
for (auto restriction_it : clustering_restrictions_from_base->restrictions()) {
|
||||
if (restriction_it.first == indexed_column) {
|
||||
continue; // In the index table, the indexed column is the partition (not clustering) key.
|
||||
}
|
||||
clustering_restrictions->merge_with(restriction_it.second);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1751,7 +1751,11 @@ sstring database::get_available_index_name(const sstring &ks_name, const sstring
|
||||
auto base_name = index_metadata::get_default_index_name(cf_name, index_name_root);
|
||||
sstring accepted_name = base_name;
|
||||
int i = 0;
|
||||
while (existing_names.contains(accepted_name)) {
|
||||
auto name_accepted = [&] {
|
||||
auto index_table_name = secondary_index::index_table_name(accepted_name);
|
||||
return !has_schema(ks_name, index_table_name) && !existing_names.contains(accepted_name);
|
||||
};
|
||||
while (!name_accepted()) {
|
||||
accepted_name = base_name + "_" + std::to_string(++i);
|
||||
}
|
||||
return accepted_name;
|
||||
|
||||
@@ -43,9 +43,13 @@
|
||||
|
||||
namespace db {
|
||||
|
||||
future<> snapshot_ctl::check_snapshot_not_exist(sstring ks_name, sstring name) {
|
||||
future<> snapshot_ctl::check_snapshot_not_exist(sstring ks_name, sstring name, std::optional<std::vector<sstring>> filter) {
|
||||
auto& ks = _db.local().find_keyspace(ks_name);
|
||||
return parallel_for_each(ks.metadata()->cf_meta_data(), [this, ks_name = std::move(ks_name), name = std::move(name)] (auto& pair) {
|
||||
return parallel_for_each(ks.metadata()->cf_meta_data(), [this, ks_name = std::move(ks_name), name = std::move(name), filter = std::move(filter)] (auto& pair) {
|
||||
auto& cf_name = pair.first;
|
||||
if (filter && std::find(filter->begin(), filter->end(), cf_name) == filter->end()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
auto& cf = _db.local().find_column_family(pair.second);
|
||||
return cf.snapshot_exists(name).then([ks_name = std::move(ks_name), name] (bool exists) {
|
||||
if (exists) {
|
||||
@@ -111,7 +115,7 @@ future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<
|
||||
}
|
||||
|
||||
return run_snapshot_modify_operation([this, ks_name = std::move(ks_name), tables = std::move(tables), tag = std::move(tag)] {
|
||||
return check_snapshot_not_exist(ks_name, tag).then([this, ks_name, tables = std::move(tables), tag] {
|
||||
return check_snapshot_not_exist(ks_name, tag, tables).then([this, ks_name, tables, tag] {
|
||||
return do_with(std::vector<sstring>(std::move(tables)),[this, ks_name, tag](const std::vector<sstring>& tables) {
|
||||
return do_for_each(tables, [ks_name, tag, this] (const sstring& table_name) {
|
||||
if (table_name.find(".") != sstring::npos) {
|
||||
|
||||
@@ -40,6 +40,8 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <vector>
|
||||
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/core/future.hh>
|
||||
#include "database.hh"
|
||||
@@ -112,7 +114,7 @@ private:
|
||||
seastar::rwlock _lock;
|
||||
seastar::gate _ops;
|
||||
|
||||
future<> check_snapshot_not_exist(sstring ks_name, sstring name);
|
||||
future<> check_snapshot_not_exist(sstring ks_name, sstring name, std::optional<std::vector<sstring>> filter = {});
|
||||
|
||||
template <typename Func>
|
||||
std::result_of_t<Func()> run_snapshot_modify_operation(Func&&);
|
||||
|
||||
@@ -58,7 +58,8 @@ public:
|
||||
|
||||
template<typename T, typename... Args>
|
||||
void feed_hash(const T& value, Args&&... args) {
|
||||
std::visit([&] (auto& hasher) noexcept -> void {
|
||||
// FIXME uncomment the noexcept marking once clang bug 50994 is fixed or gcc compilation is turned on
|
||||
std::visit([&] (auto& hasher) /* noexcept(noexcept(::feed_hash(hasher, value, args...))) */ -> void {
|
||||
::feed_hash(hasher, value, std::forward<Args>(args)...);
|
||||
}, _impl);
|
||||
};
|
||||
|
||||
3
dist/common/scripts/scylla_coredump_setup
vendored
3
dist/common/scripts/scylla_coredump_setup
vendored
@@ -87,7 +87,8 @@ WantedBy=multi-user.target
|
||||
run('sysctl -p /etc/sysctl.d/99-scylla-coredump.conf')
|
||||
|
||||
fp = tempfile.NamedTemporaryFile()
|
||||
fp.write(b'kill -SEGV $$')
|
||||
fp.write(b'ulimit -c unlimited\n')
|
||||
fp.write(b'kill -SEGV $$\n')
|
||||
fp.flush()
|
||||
p = subprocess.Popen(['/bin/bash', fp.name], stdout=subprocess.PIPE)
|
||||
pid = p.pid
|
||||
|
||||
13
dist/common/scripts/scylla_cpuscaling_setup
vendored
13
dist/common/scripts/scylla_cpuscaling_setup
vendored
@@ -22,6 +22,7 @@
|
||||
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
import shlex
|
||||
import distro
|
||||
from scylla_util import *
|
||||
@@ -33,12 +34,22 @@ if __name__ == '__main__':
|
||||
if os.getuid() > 0:
|
||||
print('Requires root permission.')
|
||||
sys.exit(1)
|
||||
if not os.path.exists('/sys/devices/system/cpu/cpufreq/policy0/scaling_governor'):
|
||||
parser = argparse.ArgumentParser(description='CPU scaling setup script for Scylla.')
|
||||
parser.add_argument('--force', dest='force', action='store_true',
|
||||
help='force running setup even CPU scaling unsupported')
|
||||
args = parser.parse_args()
|
||||
|
||||
if not args.force and not os.path.exists('/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor'):
|
||||
print('This computer doesn\'t supported CPU scaling configuration.')
|
||||
sys.exit(0)
|
||||
if is_debian_variant():
|
||||
if not shutil.which('cpufreq-set'):
|
||||
apt_install('cpufrequtils')
|
||||
try:
|
||||
ondemand = systemd_unit('ondemand')
|
||||
ondemand.disable()
|
||||
except:
|
||||
pass
|
||||
cfg = sysconfig_parser('/etc/default/cpufrequtils')
|
||||
cfg.set('GOVERNOR', 'performance')
|
||||
cfg.commit()
|
||||
|
||||
6
dist/common/scripts/scylla_ntp_setup
vendored
6
dist/common/scripts/scylla_ntp_setup
vendored
@@ -91,12 +91,12 @@ if __name__ == '__main__':
|
||||
with open('/etc/ntp.conf') as f:
|
||||
conf = f.read()
|
||||
if args.subdomain:
|
||||
conf2 = re.sub(r'server\s+([0-9]+)\.(\S+)\.pool\.ntp\.org', 'server \\1.{}.pool.ntp.org'.format(args.subdomain), conf, flags=re.MULTILINE)
|
||||
conf2 = re.sub(r'(server|pool)\s+([0-9]+)\.(\S+)\.pool\.ntp\.org', '\\1 \\2.{}.pool.ntp.org'.format(args.subdomain), conf, flags=re.MULTILINE)
|
||||
with open('/etc/ntp.conf', 'w') as f:
|
||||
f.write(conf2)
|
||||
conf = conf2
|
||||
match = re.search(r'^server\s+(\S*)(\s+\S+)?', conf, flags=re.MULTILINE)
|
||||
server = match.group(1)
|
||||
match = re.search(r'^(server|pool)\s+(\S*)(\s+\S+)?', conf, flags=re.MULTILINE)
|
||||
server = match.group(2)
|
||||
ntpd = systemd_unit('ntpd.service')
|
||||
ntpd.stop()
|
||||
# ignore error, ntpd may able to adjust clock later
|
||||
|
||||
14
dist/common/scripts/scylla_prepare
vendored
14
dist/common/scripts/scylla_prepare
vendored
@@ -27,7 +27,6 @@ import platform
|
||||
import distro
|
||||
|
||||
from scylla_util import *
|
||||
from multiprocessing import cpu_count
|
||||
|
||||
def get_mode_cpuset(nic, mode):
|
||||
mode_cpu_mask = out('/opt/scylladb/scripts/perftune.py --tune net --nic {} --mode {} --get-cpu-mask-quiet'.format(nic, mode))
|
||||
@@ -98,16 +97,6 @@ def verify_cpu():
|
||||
print('\nIf this is a virtual machine, please update its CPU feature configuration or upgrade to a newer hypervisor.')
|
||||
sys.exit(1)
|
||||
|
||||
def configure_aio_slots():
|
||||
with open('/proc/sys/fs/aio-max-nr') as f:
|
||||
aio_max_nr = int(f.read())
|
||||
# (10000 + 1024 + 2) * ncpus for scylla,
|
||||
# 65536 for other apps
|
||||
required_aio_slots = cpu_count() * 11026 + 65536
|
||||
if aio_max_nr < required_aio_slots:
|
||||
with open('/proc/sys/fs/aio-max-nr', 'w') as f:
|
||||
f.write(str(required_aio_slots))
|
||||
|
||||
if __name__ == '__main__':
|
||||
verify_cpu()
|
||||
|
||||
@@ -125,8 +114,6 @@ if __name__ == '__main__':
|
||||
os.remove('/etc/scylla/ami_disabled')
|
||||
sys.exit(1)
|
||||
|
||||
configure_aio_slots()
|
||||
|
||||
if mode == 'virtio':
|
||||
tap = cfg.get('TAP')
|
||||
user = cfg.get('USER')
|
||||
@@ -156,4 +143,3 @@ if __name__ == '__main__':
|
||||
print(f'Exception occurred while creating perftune.yaml: {e}')
|
||||
print('To fix the error, please re-run scylla_setup.')
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
1
dist/common/scripts/scylla_util.py
vendored
1
dist/common/scripts/scylla_util.py
vendored
@@ -34,6 +34,7 @@ from pathlib import Path
|
||||
|
||||
import distro
|
||||
|
||||
from multiprocessing import cpu_count
|
||||
|
||||
def scriptsdir_p():
|
||||
p = Path(sys.argv[0]).resolve()
|
||||
|
||||
2
dist/common/sysctl.d/99-scylla-aio.conf
vendored
Normal file
2
dist/common/sysctl.d/99-scylla-aio.conf
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
# Raise max AIO events
|
||||
fs.aio-max-nr = 5578536
|
||||
2
dist/common/systemd/scylla-fstrim.timer
vendored
2
dist/common/systemd/scylla-fstrim.timer
vendored
@@ -1,7 +1,5 @@
|
||||
[Unit]
|
||||
Description=Run Scylla fstrim daily
|
||||
After=scylla-server.service
|
||||
BindsTo=scylla-server.service
|
||||
|
||||
[Timer]
|
||||
OnCalendar=Sat *-*-* 00:00:00
|
||||
|
||||
@@ -11,6 +11,7 @@ else
|
||||
sysctl -p/usr/lib/sysctl.d/99-scylla-sched.conf || :
|
||||
sysctl -p/usr/lib/sysctl.d/99-scylla-vm.conf || :
|
||||
sysctl -p/usr/lib/sysctl.d/99-scylla-inotify.conf || :
|
||||
sysctl -p/usr/lib/sysctl.d/99-scylla-aio.conf || :
|
||||
fi
|
||||
|
||||
#DEBHELPER#
|
||||
|
||||
2
dist/debian/debian/scylla-server.postrm
vendored
2
dist/debian/debian/scylla-server.postrm
vendored
@@ -12,8 +12,6 @@ case "$1" in
|
||||
if [ "$1" = "purge" ]; then
|
||||
rm -rf /etc/systemd/system/scylla-server.service.d/
|
||||
fi
|
||||
rm -f /etc/systemd/system/var-lib-systemd-coredump.mount
|
||||
rm -f /etc/systemd/system/var-lib-scylla.mount
|
||||
;;
|
||||
esac
|
||||
|
||||
|
||||
9
dist/redhat/scylla.spec
vendored
9
dist/redhat/scylla.spec
vendored
@@ -7,7 +7,7 @@ Group: Applications/Databases
|
||||
License: AGPLv3
|
||||
URL: http://www.scylladb.com/
|
||||
Source0: %{reloc_pkg}
|
||||
Requires: %{product}-server = %{version} %{product}-conf = %{version} %{product}-kernel-conf = %{version} %{product}-jmx = %{version} %{product}-tools = %{version} %{product}-tools-core = %{version}
|
||||
Requires: %{product}-server = %{version} %{product}-conf = %{version} %{product}-python3 = %{version} %{product}-kernel-conf = %{version} %{product}-jmx = %{version} %{product}-tools = %{version} %{product}-tools-core = %{version}
|
||||
Obsoletes: scylla-server < 1.1
|
||||
|
||||
%global _debugsource_template %{nil}
|
||||
@@ -52,7 +52,7 @@ Summary: The Scylla database server
|
||||
License: AGPLv3
|
||||
URL: http://www.scylladb.com/
|
||||
Requires: kernel >= 3.10.0-514
|
||||
Requires: %{product}-conf %{product}-python3
|
||||
Requires: %{product}-conf = %{version} %{product}-python3 = %{version}
|
||||
Conflicts: abrt
|
||||
AutoReqProv: no
|
||||
|
||||
@@ -137,9 +137,9 @@ rm -rf $RPM_BUILD_ROOT
|
||||
%ghost /etc/systemd/system/scylla-server.service.d/capabilities.conf
|
||||
%ghost /etc/systemd/system/scylla-server.service.d/mounts.conf
|
||||
/etc/systemd/system/scylla-server.service.d/dependencies.conf
|
||||
%ghost /etc/systemd/system/var-lib-systemd-coredump.mount
|
||||
%ghost %config /etc/systemd/system/var-lib-systemd-coredump.mount
|
||||
%ghost /etc/systemd/system/scylla-cpupower.service
|
||||
%ghost /etc/systemd/system/var-lib-scylla.mount
|
||||
%ghost %config /etc/systemd/system/var-lib-scylla.mount
|
||||
|
||||
%package conf
|
||||
Group: Applications/Databases
|
||||
@@ -207,6 +207,7 @@ if Scylla is the main application on your server and you wish to optimize its la
|
||||
/usr/lib/systemd/systemd-sysctl 99-scylla-sched.conf >/dev/null 2>&1 || :
|
||||
/usr/lib/systemd/systemd-sysctl 99-scylla-vm.conf >/dev/null 2>&1 || :
|
||||
/usr/lib/systemd/systemd-sysctl 99-scylla-inotify.conf >/dev/null 2>&1 || :
|
||||
/usr/lib/systemd/systemd-sysctl 99-scylla-aio.conf >/dev/null 2>&1 || :
|
||||
|
||||
%files kernel-conf
|
||||
%defattr(-,root,root)
|
||||
|
||||
@@ -62,7 +62,7 @@ struct appending_hash;
|
||||
template<typename H, typename T, typename... Args>
|
||||
requires Hasher<H>
|
||||
inline
|
||||
void feed_hash(H& h, const T& value, Args&&... args) noexcept {
|
||||
void feed_hash(H& h, const T& value, Args&&... args) noexcept(noexcept(std::declval<appending_hash<T>>()(h, value, args...))) {
|
||||
appending_hash<T>()(h, value, std::forward<Args>(args)...);
|
||||
};
|
||||
|
||||
|
||||
@@ -1151,6 +1151,9 @@ flat_mutation_reader evictable_reader::recreate_reader() {
|
||||
_range_override.reset();
|
||||
_slice_override.reset();
|
||||
|
||||
_drop_partition_start = false;
|
||||
_drop_static_row = false;
|
||||
|
||||
if (_last_pkey) {
|
||||
bool partition_range_is_inclusive = true;
|
||||
|
||||
@@ -1236,13 +1239,25 @@ void evictable_reader::maybe_validate_partition_start(const flat_mutation_reader
|
||||
// is in range.
|
||||
if (_last_pkey) {
|
||||
const auto cmp_res = tri_cmp(*_last_pkey, ps.key());
|
||||
if (_drop_partition_start) { // should be the same partition
|
||||
if (_drop_partition_start) { // we expect to continue from the same partition
|
||||
// We cannot assume the partition we stopped the read at is still alive
|
||||
// when we recreate the reader. It might have been compacted away in the
|
||||
// meanwhile, so allow for a larger partition too.
|
||||
require(
|
||||
cmp_res == 0,
|
||||
"{}(): validation failed, expected partition with key equal to _last_pkey {} due to _drop_partition_start being set, but got {}",
|
||||
cmp_res <= 0,
|
||||
"{}(): validation failed, expected partition with key larger or equal to _last_pkey {} due to _drop_partition_start being set, but got {}",
|
||||
__FUNCTION__,
|
||||
*_last_pkey,
|
||||
ps.key());
|
||||
// Reset drop flags and next pos if we are not continuing from the same partition
|
||||
if (cmp_res < 0) {
|
||||
// Close previous partition, we are not going to continue it.
|
||||
push_mutation_fragment(*_schema, _permit, partition_end{});
|
||||
_drop_partition_start = false;
|
||||
_drop_static_row = false;
|
||||
_next_position_in_partition = position_in_partition::for_partition_start();
|
||||
_trim_range_tombstones = false;
|
||||
}
|
||||
} else { // should be a larger partition
|
||||
require(
|
||||
cmp_res < 0,
|
||||
@@ -1293,9 +1308,14 @@ bool evictable_reader::should_drop_fragment(const mutation_fragment& mf) {
|
||||
_drop_partition_start = false;
|
||||
return true;
|
||||
}
|
||||
if (_drop_static_row && mf.is_static_row()) {
|
||||
_drop_static_row = false;
|
||||
return true;
|
||||
// Unlike partition-start above, a partition is not guaranteed to have a
|
||||
// static row fragment. So reset the flag regardless of whether we could
|
||||
// drop one or not.
|
||||
// We are guaranteed to get here only right after dropping a partition-start,
|
||||
// so if we are not seeing a static row here, the partition doesn't have one.
|
||||
if (_drop_static_row) {
|
||||
_drop_static_row = false;
|
||||
return mf.is_static_row();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -309,7 +309,7 @@ float node_ops_metrics::repair_finished_percentage() {
|
||||
tracker::tracker(size_t nr_shards, size_t max_repair_memory)
|
||||
: _shutdown(false)
|
||||
, _repairs(nr_shards) {
|
||||
auto nr = std::max(size_t(1), size_t(max_repair_memory / max_repair_memory_per_range()));
|
||||
auto nr = std::max(size_t(1), size_t(max_repair_memory / max_repair_memory_per_range() / 4));
|
||||
rlogger.info("Setting max_repair_memory={}, max_repair_memory_per_range={}, max_repair_ranges_in_parallel={}",
|
||||
max_repair_memory, max_repair_memory_per_range(), nr);
|
||||
_range_parallelism_semaphores.reserve(nr_shards);
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: b70b444924...5ef45afa4d
@@ -438,7 +438,6 @@ protected:
|
||||
mutation_source_metadata _ms_metadata = {};
|
||||
garbage_collected_sstable_writer::data _gc_sstable_writer_data;
|
||||
compaction_sstable_replacer_fn _replacer;
|
||||
std::optional<compaction_weight_registration> _weight_registration;
|
||||
utils::UUID _run_identifier;
|
||||
::io_priority_class _io_priority;
|
||||
// optional clone of sstable set to be used for expiration purposes, so it will be set if expiration is enabled.
|
||||
@@ -457,7 +456,6 @@ protected:
|
||||
, _sstable_level(descriptor.level)
|
||||
, _gc_sstable_writer_data(*this)
|
||||
, _replacer(std::move(descriptor.replacer))
|
||||
, _weight_registration(std::move(descriptor.weight_registration))
|
||||
, _run_identifier(descriptor.run_identifier)
|
||||
, _io_priority(descriptor.io_priority)
|
||||
, _sstable_set(std::move(descriptor.all_sstables_snapshot))
|
||||
@@ -929,9 +927,6 @@ public:
|
||||
}
|
||||
|
||||
virtual void on_end_of_compaction() override {
|
||||
if (_weight_registration) {
|
||||
_cf.get_compaction_manager().on_compaction_complete(*_weight_registration);
|
||||
}
|
||||
replace_remaining_exhausted_sstables();
|
||||
}
|
||||
private:
|
||||
|
||||
@@ -134,8 +134,6 @@ struct compaction_descriptor {
|
||||
uint64_t max_sstable_bytes;
|
||||
// Run identifier of output sstables.
|
||||
utils::UUID run_identifier;
|
||||
// Holds ownership of a weight assigned to this compaction iff it's a regular one.
|
||||
std::optional<compaction_weight_registration> weight_registration;
|
||||
// Calls compaction manager's task for this compaction to release reference to exhausted sstables.
|
||||
std::function<void(const std::vector<shared_sstable>& exhausted_sstables)> release_exhausted;
|
||||
// The options passed down to the compaction code.
|
||||
|
||||
@@ -436,7 +436,7 @@ void compaction_manager::reevaluate_postponed_compactions() {
|
||||
}
|
||||
|
||||
void compaction_manager::postpone_compaction_for_column_family(column_family* cf) {
|
||||
_postponed.push_back(cf);
|
||||
_postponed.insert(cf);
|
||||
}
|
||||
|
||||
future<> compaction_manager::stop_ongoing_compactions(sstring reason) {
|
||||
@@ -576,7 +576,7 @@ void compaction_manager::submit(column_family* cf) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
auto compacting = make_lw_shared<compacting_sstable_registration>(this, descriptor.sstables);
|
||||
descriptor.weight_registration = compaction_weight_registration(this, weight);
|
||||
auto weight_r = compaction_weight_registration(this, weight);
|
||||
descriptor.release_exhausted = [compacting] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
|
||||
compacting->release_compacting(exhausted_sstables);
|
||||
};
|
||||
@@ -586,7 +586,7 @@ void compaction_manager::submit(column_family* cf) {
|
||||
_stats.pending_tasks--;
|
||||
_stats.active_tasks++;
|
||||
task->compaction_running = true;
|
||||
return cf.run_compaction(std::move(descriptor)).then_wrapped([this, task, compacting = std::move(compacting)] (future<> f) mutable {
|
||||
return cf.run_compaction(std::move(descriptor)).then_wrapped([this, task, compacting = std::move(compacting), weight_r = std::move(weight_r)] (future<> f) mutable {
|
||||
_stats.active_tasks--;
|
||||
task->compaction_running = false;
|
||||
|
||||
@@ -799,7 +799,7 @@ future<> compaction_manager::remove(column_family* cf) {
|
||||
task->stopping = true;
|
||||
}
|
||||
}
|
||||
_postponed.erase(boost::remove(_postponed, cf), _postponed.end());
|
||||
_postponed.erase(cf);
|
||||
|
||||
// Wait for the termination of an ongoing compaction on cf, if any.
|
||||
return do_for_each(*tasks_to_stop, [this, cf] (auto& task) {
|
||||
@@ -835,11 +835,6 @@ void compaction_manager::stop_compaction(sstring type) {
|
||||
}
|
||||
}
|
||||
|
||||
void compaction_manager::on_compaction_complete(compaction_weight_registration& weight_registration) {
|
||||
weight_registration.deregister();
|
||||
reevaluate_postponed_compactions();
|
||||
}
|
||||
|
||||
void compaction_manager::propagate_replacement(column_family* cf,
|
||||
const std::vector<sstables::shared_sstable>& removed, const std::vector<sstables::shared_sstable>& added) {
|
||||
for (auto& info : _compactions) {
|
||||
|
||||
@@ -99,7 +99,7 @@ private:
|
||||
future<> _waiting_reevalution = make_ready_future<>();
|
||||
condition_variable _postponed_reevaluation;
|
||||
// column families that wait for compaction but had its submission postponed due to ongoing compaction.
|
||||
std::vector<column_family*> _postponed;
|
||||
std::unordered_set<column_family*> _postponed;
|
||||
// tracks taken weights of ongoing compactions, only one compaction per weight is allowed.
|
||||
// weight is value assigned to a compaction job that is log base N of total size of all input sstables.
|
||||
std::unordered_set<int> _weight_tracker;
|
||||
@@ -256,11 +256,6 @@ public:
|
||||
// Stops ongoing compaction of a given type.
|
||||
void stop_compaction(sstring type);
|
||||
|
||||
// Called by compaction procedure to release the weight lock assigned to it, such that
|
||||
// another compaction waiting on same weight can start as soon as possible. That's usually
|
||||
// called before compaction seals sstable and such and after all compaction work is done.
|
||||
void on_compaction_complete(compaction_weight_registration& weight_registration);
|
||||
|
||||
double backlog() {
|
||||
return _backlog_manager.backlog();
|
||||
}
|
||||
|
||||
@@ -367,6 +367,7 @@ class index_reader {
|
||||
const io_priority_class& _pc;
|
||||
tracing::trace_state_ptr _trace_state;
|
||||
shared_index_lists _index_lists;
|
||||
future<> _background_closes = make_ready_future<>();
|
||||
|
||||
struct reader {
|
||||
index_consumer _consumer;
|
||||
@@ -472,6 +473,16 @@ private:
|
||||
};
|
||||
|
||||
return _index_lists.get_or_load(summary_idx, loader).then([this, &bound, summary_idx] (shared_index_lists::list_ptr ref) {
|
||||
// to make sure list is not closed when another bound is still using it, index list will only be closed when there's only one owner holding it
|
||||
if (bound.current_list && bound.current_list.use_count() == 1) {
|
||||
// a new background close will only be initiated when previous ones terminate, so as to limit the concurrency.
|
||||
_background_closes = _background_closes.then_wrapped([current_list = std::move(bound.current_list)] (future<>&& f) mutable {
|
||||
f.ignore_ready_future();
|
||||
return do_with(std::move(current_list), [] (shared_index_lists::list_ptr& current_list) mutable {
|
||||
return close_index_list(current_list);
|
||||
});
|
||||
});
|
||||
}
|
||||
bound.current_list = std::move(ref);
|
||||
bound.current_summary_idx = summary_idx;
|
||||
bound.current_index_idx = 0;
|
||||
@@ -841,6 +852,8 @@ public:
|
||||
return close_index_list(_upper_bound->current_list);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}).then([this] () mutable {
|
||||
return std::move(_background_closes);
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
@@ -147,7 +147,7 @@ leveled_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input
|
||||
unsigned overlapping_sstables = 0;
|
||||
auto prev_last = dht::ring_position::min();
|
||||
for (auto& sst : sstables) {
|
||||
if (dht::ring_position(sst->get_first_decorated_key()).less_compare(*schema, prev_last)) {
|
||||
if (dht::ring_position(sst->get_first_decorated_key()).tri_compare(*schema, prev_last) <= 0) {
|
||||
overlapping_sstables++;
|
||||
}
|
||||
prev_last = dht::ring_position(sst->get_last_decorated_key());
|
||||
@@ -189,10 +189,8 @@ leveled_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input
|
||||
};
|
||||
|
||||
if (level_info[0].size() > offstrategy_threshold) {
|
||||
level_info[0].resize(std::min(level_info[0].size(), max_sstables));
|
||||
compaction_descriptor desc(std::move(level_info[0]), std::optional<sstables::sstable_set>(), iop);
|
||||
desc.options = compaction_options::make_reshape();
|
||||
return desc;
|
||||
size_tiered_compaction_strategy stcs(_stcs_options);
|
||||
return stcs.get_reshaping_job(std::move(level_info[0]), schema, iop, mode);
|
||||
}
|
||||
|
||||
for (unsigned level = leveled_manifest::MAX_LEVELS - 1; level > 0; --level) {
|
||||
|
||||
@@ -256,6 +256,7 @@ size_tiered_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
|
||||
bucket.resize(std::min(max_sstables, bucket.size()));
|
||||
compaction_descriptor desc(std::move(bucket), std::optional<sstables::sstable_set>(), iop);
|
||||
desc.options = compaction_options::make_reshape();
|
||||
return desc;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -154,6 +154,27 @@ def test_update_condition_eq_unequal(test_table_s):
|
||||
ConditionExpression='q = :oldval',
|
||||
ExpressionAttributeValues={':val1': 3, ':oldval': 2})
|
||||
|
||||
# In test_update_condition_eq_unequal() above we saw that a non-existent
|
||||
# attribute is not "=" to a value. Here we check what happens when two
|
||||
# non-existent attributes are checked for equality. It turns out, they should
|
||||
# *not* be considered equal. In short, an unset attribute is never equal to
|
||||
# anything - not even to another unset attribute.
|
||||
# Reproduces issue #8511.
|
||||
def test_update_condition_eq_two_unset(test_table_s):
|
||||
p = random_string()
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET a = :val1',
|
||||
ConditionExpression='q = z',
|
||||
ExpressionAttributeValues={':val1': 2})
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
AttributeUpdates={'a': {'Value': 1, 'Action': 'PUT'}})
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET a = :val1',
|
||||
ConditionExpression='q = z',
|
||||
ExpressionAttributeValues={':val1': 3})
|
||||
|
||||
# Check that set equality is checked correctly. Unlike string equality (for
|
||||
# example), it cannot be done with just naive string comparison of the JSON
|
||||
# representation, and we need to allow for any order. (see issue #5021)
|
||||
@@ -175,6 +196,39 @@ def test_update_condition_eq_set(test_table_s):
|
||||
ExpressionAttributeValues={':val1': 3, ':oldval': set(['chinchilla', 'cat', 'dog', 'mouse'])})
|
||||
assert 'b' in test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']
|
||||
|
||||
# The above test (test_update_condition_eq_set()) checked equality of simple
|
||||
# set attributes. But an attributes can contain a nested document, where the
|
||||
# set sits in a deep level (the set itself is a leaf in this heirarchy because
|
||||
# it can only contain numbers, strings or bytes). We need to correctly support
|
||||
# equality check in that case too.
|
||||
# Reproduces issue #8514.
|
||||
@pytest.mark.skip(reason="test needs nested update not yet in branch 4.3")
|
||||
def test_update_condition_eq_nested_set(test_table_s):
|
||||
p = random_string()
|
||||
# Because boto3 sorts the set values we give it, in order to generate a
|
||||
# set with a different order, we need to build it incrementally.
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
AttributeUpdates={'a': {'Value': {'b': 'c', 'd': ['e', 'f', set(['g', 'h'])], 'i': set(['j', 'k'])}, 'Action': 'PUT'}})
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='ADD a.d[2] :val1, a.i :val2',
|
||||
ExpressionAttributeValues={':val1': set(['l', 'm']), ':val2': set(['n', 'o'])})
|
||||
# Sanity check - the attribute contains the set we think it does
|
||||
expected = {'b': 'c', 'd': ['e', 'f', set(['g', 'h', 'l', 'm'])], 'i': set(['j', 'k', 'n', 'o'])}
|
||||
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['a'] == expected
|
||||
# Now finally check that condition expression check knows the equality too.
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET b = :val1',
|
||||
ConditionExpression='a = :oldval',
|
||||
ExpressionAttributeValues={':val1': 3, ':oldval': expected})
|
||||
assert 'b' in test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']
|
||||
# Check that equality can also fail, if the inner set differs
|
||||
wrong = {'b': 'c', 'd': ['e', 'f', set(['g', 'h', 'l', 'bad'])], 'i': set(['j', 'k', 'n', 'o'])}
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET b = :val1',
|
||||
ConditionExpression='a = :oldval',
|
||||
ExpressionAttributeValues={':val1': 4, ':oldval': wrong})
|
||||
|
||||
# Test for ConditionExpression with operator "<>" (non-equality),
|
||||
def test_update_condition_ne(test_table_s):
|
||||
p = random_string()
|
||||
@@ -215,6 +269,54 @@ def test_update_condition_ne(test_table_s):
|
||||
ExpressionAttributeValues={':newval': 3, ':oldval': 1})
|
||||
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['c'] == 3
|
||||
|
||||
# Check that set inequality is checked correctly. This reproduces the same
|
||||
# bug #5021 that we reproduced above in test_update_condition_eq_set(), just
|
||||
# that here we check the inequality operator instead of equality.
|
||||
# Reproduces issue #8513.
|
||||
def test_update_condition_ne_set(test_table_s):
|
||||
p = random_string()
|
||||
# Because boto3 sorts the set values we give it, in order to generate a
|
||||
# set with a different order, we need to build it incrementally.
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
AttributeUpdates={'a': {'Value': set(['dog', 'chinchilla']), 'Action': 'PUT'}})
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='ADD a :val1',
|
||||
ExpressionAttributeValues={':val1': set(['cat', 'mouse'])})
|
||||
# Sanity check - the attribute contains the set we think it does
|
||||
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['a'] == set(['chinchilla', 'cat', 'dog', 'mouse'])
|
||||
# Now check that condition expression check knows there is no inequality
|
||||
# here.
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET b = :val1',
|
||||
ConditionExpression='a <> :oldval',
|
||||
ExpressionAttributeValues={':val1': 2, ':oldval': set(['chinchilla', 'cat', 'dog', 'mouse'])})
|
||||
# As a sanity check, also check something which should be unequal:
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET b = :val1',
|
||||
ConditionExpression='a <> :oldval',
|
||||
ExpressionAttributeValues={':val1': 3, ':oldval': set(['chinchilla', 'cat', 'dog', 'horse'])})
|
||||
assert 'b' in test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']
|
||||
|
||||
# In test_update_condition_ne() above we saw that a non-existent attribute is
|
||||
# "not equal" to any value. Here we check what happens when two non-existent
|
||||
# attributes are checked for non-equality. It turns out, they are also
|
||||
# considered "not equal". In short, an unset attribute is always "not equal" to
|
||||
# anything - even to another unset attribute.
|
||||
# Reproduces issue #8511.
|
||||
def test_update_condition_ne_two_unset(test_table_s):
|
||||
p = random_string()
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET a = :val1',
|
||||
ConditionExpression='q <> z',
|
||||
ExpressionAttributeValues={':val1': 2})
|
||||
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['a'] == 2
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET a = :val1',
|
||||
ConditionExpression='q <> z',
|
||||
ExpressionAttributeValues={':val1': 3})
|
||||
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['a'] == 3
|
||||
|
||||
# Test for ConditionExpression with operator "<"
|
||||
def test_update_condition_lt(test_table_s):
|
||||
p = random_string()
|
||||
@@ -316,6 +418,45 @@ def test_update_condition_lt(test_table_s):
|
||||
ExpressionAttributeValues={':newval': 2, ':oldval': 1})
|
||||
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['z'] == 4
|
||||
|
||||
# In test_update_condition_lt() above we saw that a non-existent attribute is
|
||||
# not "<" any value. Here we check what happens when two non-existent
|
||||
# attributes are compared with "<". It turns out that the result of such
|
||||
# comparison is also false.
|
||||
# The same is true for other order operators - any order comparison involving
|
||||
# one unset attribute should be false - even if the second operand is an
|
||||
# unset attribute as well. Note that the <> operator is different - it is
|
||||
# always results in true if one of the operands is an unset attribute (see
|
||||
# test_update_condition_ne_two_unset() above).
|
||||
# This test is related to issue #8511 (although it passed even before fixing
|
||||
# that issue).
|
||||
def test_update_condition_comparison_two_unset(test_table_s):
|
||||
p = random_string()
|
||||
ops = ['<', '<=', '>', '>=']
|
||||
for op in ops:
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET a = :val1',
|
||||
ConditionExpression='q ' + op + ' z',
|
||||
ExpressionAttributeValues={':val1': 2})
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET a = :val1',
|
||||
ConditionExpression='q between z and x',
|
||||
ExpressionAttributeValues={':val1': 2})
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
AttributeUpdates={'a': {'Value': 1, 'Action': 'PUT'}})
|
||||
for op in ops:
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET a = :val1',
|
||||
ConditionExpression='q ' + op + ' z',
|
||||
ExpressionAttributeValues={':val1': 3})
|
||||
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
|
||||
test_table_s.update_item(Key={'p': p},
|
||||
UpdateExpression='SET a = :val1',
|
||||
ConditionExpression='q between z and x',
|
||||
ExpressionAttributeValues={':val1': 2})
|
||||
|
||||
# Test for ConditionExpression with operator "<="
|
||||
def test_update_condition_le(test_table_s):
|
||||
p = random_string()
|
||||
|
||||
@@ -578,11 +578,14 @@ SEASTAR_TEST_CASE(test_allocation_failure){
|
||||
|
||||
// Use us loads of memory so we can OOM at the appropriate place
|
||||
try {
|
||||
assert(fragmented_temporary_buffer::default_fragment_size < size);
|
||||
for (;;) {
|
||||
junk->emplace_back(new char[size]);
|
||||
junk->emplace_back(new char[fragmented_temporary_buffer::default_fragment_size]);
|
||||
}
|
||||
} catch (std::bad_alloc&) {
|
||||
}
|
||||
auto last = junk->end();
|
||||
junk->erase(--last);
|
||||
return log.add_mutation(utils::UUID_gen::get_time_UUID(), size, db::commitlog::force_sync::no, [size](db::commitlog::output& dst) {
|
||||
dst.fill(char(1), size);
|
||||
}).then_wrapped([junk, size](future<db::rp_handle> f) {
|
||||
|
||||
@@ -3267,39 +3267,30 @@ flat_mutation_reader create_evictable_reader_and_evict_after_first_buffer(
|
||||
reader_permit permit,
|
||||
const dht::partition_range& prange,
|
||||
const query::partition_slice& slice,
|
||||
std::deque<mutation_fragment> first_buffer,
|
||||
position_in_partition_view last_fragment_position,
|
||||
std::deque<mutation_fragment> second_buffer,
|
||||
size_t max_buffer_size) {
|
||||
std::list<std::deque<mutation_fragment>> buffers,
|
||||
position_in_partition_view first_buf_last_fragment_position,
|
||||
size_t max_buffer_size,
|
||||
bool detach_buffer = true) {
|
||||
class factory {
|
||||
schema_ptr _schema;
|
||||
reader_permit _permit;
|
||||
std::optional<std::deque<mutation_fragment>> _first_buffer;
|
||||
std::optional<std::deque<mutation_fragment>> _second_buffer;
|
||||
std::list<std::deque<mutation_fragment>> _buffers;
|
||||
size_t _max_buffer_size;
|
||||
|
||||
private:
|
||||
std::optional<std::deque<mutation_fragment>> copy_buffer(const std::optional<std::deque<mutation_fragment>>& o) {
|
||||
if (!o) {
|
||||
return {};
|
||||
}
|
||||
return copy_fragments(*_schema, _permit, *o);
|
||||
}
|
||||
|
||||
public:
|
||||
factory(schema_ptr schema, reader_permit permit, std::deque<mutation_fragment> first_buffer, std::deque<mutation_fragment> second_buffer, size_t max_buffer_size)
|
||||
factory(schema_ptr schema, reader_permit permit, std::list<std::deque<mutation_fragment>> buffers, size_t max_buffer_size)
|
||||
: _schema(std::move(schema))
|
||||
, _permit(std::move(permit))
|
||||
, _first_buffer(std::move(first_buffer))
|
||||
, _second_buffer(std::move(second_buffer))
|
||||
, _buffers(std::move(buffers))
|
||||
, _max_buffer_size(max_buffer_size) {
|
||||
}
|
||||
|
||||
factory(const factory& o)
|
||||
: _schema(o._schema)
|
||||
, _permit(o._permit)
|
||||
, _first_buffer(copy_buffer(o._first_buffer))
|
||||
, _second_buffer(copy_buffer(o._second_buffer)) {
|
||||
, _permit(o._permit) {
|
||||
for (const auto& buf : o._buffers) {
|
||||
_buffers.emplace_back(copy_fragments(*_schema, _permit, buf));
|
||||
}
|
||||
}
|
||||
factory(factory&& o) = default;
|
||||
|
||||
@@ -3313,14 +3304,9 @@ flat_mutation_reader create_evictable_reader_and_evict_after_first_buffer(
|
||||
streamed_mutation::forwarding fwd_sm,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
BOOST_REQUIRE(s == _schema);
|
||||
if (_first_buffer) {
|
||||
auto buf = *std::exchange(_first_buffer, {});
|
||||
auto rd = make_flat_mutation_reader_from_fragments(_schema, std::move(permit), std::move(buf));
|
||||
rd.set_max_buffer_size(_max_buffer_size);
|
||||
return rd;
|
||||
}
|
||||
if (_second_buffer) {
|
||||
auto buf = *std::exchange(_second_buffer, {});
|
||||
if (!_buffers.empty()) {
|
||||
auto buf = std::move(_buffers.front());
|
||||
_buffers.pop_front();
|
||||
auto rd = make_flat_mutation_reader_from_fragments(_schema, std::move(permit), std::move(buf));
|
||||
rd.set_max_buffer_size(_max_buffer_size);
|
||||
return rd;
|
||||
@@ -3328,9 +3314,9 @@ flat_mutation_reader create_evictable_reader_and_evict_after_first_buffer(
|
||||
return make_empty_flat_reader(_schema, std::move(permit));
|
||||
}
|
||||
};
|
||||
auto ms = mutation_source(factory(schema, permit, std::move(first_buffer), std::move(second_buffer), max_buffer_size));
|
||||
auto ms = mutation_source(factory(schema, permit, std::move(buffers), max_buffer_size));
|
||||
|
||||
auto [rd, handle] = make_manually_paused_evictable_reader(
|
||||
auto rd = make_auto_paused_evictable_reader(
|
||||
std::move(ms),
|
||||
schema,
|
||||
permit,
|
||||
@@ -3346,18 +3332,42 @@ flat_mutation_reader create_evictable_reader_and_evict_after_first_buffer(
|
||||
|
||||
const auto eq_cmp = position_in_partition::equal_compare(*schema);
|
||||
BOOST_REQUIRE(rd.is_buffer_full());
|
||||
BOOST_REQUIRE(eq_cmp(rd.buffer().back().position(), last_fragment_position));
|
||||
BOOST_REQUIRE(eq_cmp(rd.buffer().back().position(), first_buf_last_fragment_position));
|
||||
BOOST_REQUIRE(!rd.is_end_of_stream());
|
||||
|
||||
rd.detach_buffer();
|
||||
|
||||
handle.pause();
|
||||
if (detach_buffer) {
|
||||
rd.detach_buffer();
|
||||
}
|
||||
|
||||
while(permit.semaphore().try_evict_one_inactive_read());
|
||||
|
||||
return std::move(rd);
|
||||
}
|
||||
|
||||
flat_mutation_reader create_evictable_reader_and_evict_after_first_buffer(
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& prange,
|
||||
const query::partition_slice& slice,
|
||||
std::deque<mutation_fragment> first_buffer,
|
||||
position_in_partition_view last_fragment_position,
|
||||
std::deque<mutation_fragment> last_buffer,
|
||||
size_t max_buffer_size,
|
||||
bool detach_buffer = true) {
|
||||
std::list<std::deque<mutation_fragment>> list;
|
||||
list.emplace_back(std::move(first_buffer));
|
||||
list.emplace_back(std::move(last_buffer));
|
||||
return create_evictable_reader_and_evict_after_first_buffer(
|
||||
std::move(schema),
|
||||
std::move(permit),
|
||||
prange,
|
||||
slice,
|
||||
std::move(list),
|
||||
last_fragment_position,
|
||||
max_buffer_size,
|
||||
detach_buffer);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_evictable_reader_trim_range_tombstones) {
|
||||
@@ -3659,7 +3669,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_self_validation) {
|
||||
|
||||
check_evictable_reader_validation_is_triggered(
|
||||
"pkey > _last_pkey; pkey ∈ pkrange",
|
||||
partition_error_prefix,
|
||||
"",
|
||||
s.schema(),
|
||||
permit,
|
||||
prange,
|
||||
@@ -3747,3 +3757,208 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_self_validation) {
|
||||
make_second_buffer(pkeys[3]),
|
||||
max_buffer_size);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_evictable_reader_drop_flags) {
|
||||
reader_concurrency_semaphore semaphore(1, 0, get_name());
|
||||
simple_schema s;
|
||||
auto permit = semaphore.make_permit(s.schema().get(), get_name());
|
||||
|
||||
auto pkeys = s.make_pkeys(2);
|
||||
std::sort(pkeys.begin(), pkeys.end(), [&s] (const auto& pk1, const auto& pk2) {
|
||||
return pk1.less_compare(*s.schema(), pk2);
|
||||
});
|
||||
const auto& pkey1 = pkeys[0];
|
||||
const auto& pkey2 = pkeys[1];
|
||||
const int second_buffer_ck = 10;
|
||||
|
||||
struct buffer {
|
||||
simple_schema& s;
|
||||
reader_permit permit;
|
||||
std::deque<mutation_fragment> frags;
|
||||
std::vector<mutation> muts;
|
||||
size_t size = 0;
|
||||
std::optional<position_in_partition_view> last_pos;
|
||||
|
||||
buffer(simple_schema& s_, reader_permit permit_, dht::decorated_key key)
|
||||
: s(s_), permit(std::move(permit_)) {
|
||||
add_partition(key);
|
||||
}
|
||||
size_t add_partition(dht::decorated_key key) {
|
||||
size += frags.emplace_back(*s.schema(), permit, partition_start{key, {}}).memory_usage();
|
||||
muts.emplace_back(s.schema(), key);
|
||||
return size;
|
||||
}
|
||||
size_t add_mutation_fragment(mutation_fragment&& mf, bool only_to_frags = false) {
|
||||
if (!only_to_frags) {
|
||||
muts.back().apply(mf);
|
||||
}
|
||||
size += frags.emplace_back(*s.schema(), permit, std::move(mf)).memory_usage();
|
||||
return size;
|
||||
}
|
||||
size_t add_static_row(std::optional<mutation_fragment> sr = {}) {
|
||||
auto srow = sr ? std::move(*sr) : s.make_static_row("s");
|
||||
return add_mutation_fragment(std::move(srow));
|
||||
}
|
||||
size_t add_clustering_row(int i, bool only_to_frags = false) {
|
||||
return add_mutation_fragment(mutation_fragment(*s.schema(), permit, s.make_row(s.make_ckey(i), "v")), only_to_frags);
|
||||
}
|
||||
size_t add_clustering_rows(int start, int end) {
|
||||
for (int i = start; i < end; ++i) {
|
||||
add_clustering_row(i);
|
||||
}
|
||||
return size;
|
||||
}
|
||||
size_t add_partition_end() {
|
||||
size += frags.emplace_back(*s.schema(), permit, partition_end{}).memory_usage();
|
||||
return size;
|
||||
}
|
||||
void save_position() { last_pos = frags.back().position(); }
|
||||
void find_position(size_t buf_size) {
|
||||
size_t s = 0;
|
||||
for (const auto& frag : frags) {
|
||||
s += frag.memory_usage();
|
||||
if (s >= buf_size) {
|
||||
last_pos = frag.position();
|
||||
break;
|
||||
}
|
||||
}
|
||||
BOOST_REQUIRE(last_pos);
|
||||
}
|
||||
};
|
||||
|
||||
auto make_reader = [&] (const buffer& first_buffer, const buffer& second_buffer, const buffer* const third_buffer, size_t max_buffer_size) {
|
||||
std::list<std::deque<mutation_fragment>> buffers;
|
||||
buffers.emplace_back(copy_fragments(*s.schema(), permit, first_buffer.frags));
|
||||
buffers.emplace_back(copy_fragments(*s.schema(), permit, second_buffer.frags));
|
||||
if (third_buffer) {
|
||||
buffers.emplace_back(copy_fragments(*s.schema(), permit, third_buffer->frags));
|
||||
}
|
||||
return create_evictable_reader_and_evict_after_first_buffer(
|
||||
s.schema(),
|
||||
permit,
|
||||
query::full_partition_range,
|
||||
s.schema()->full_slice(),
|
||||
std::move(buffers),
|
||||
*first_buffer.last_pos,
|
||||
max_buffer_size,
|
||||
false);
|
||||
};
|
||||
|
||||
testlog.info("Same partition, with static row");
|
||||
{
|
||||
buffer first_buffer(s, permit, pkey1);
|
||||
first_buffer.add_static_row();
|
||||
auto srow = mutation_fragment(*s.schema(), permit, first_buffer.frags.back());
|
||||
const auto buf_size = first_buffer.add_clustering_rows(0, second_buffer_ck);
|
||||
first_buffer.save_position();
|
||||
first_buffer.add_clustering_row(second_buffer_ck);
|
||||
|
||||
buffer second_buffer(s, permit, pkey1);
|
||||
second_buffer.add_static_row(std::move(srow));
|
||||
second_buffer.add_clustering_row(second_buffer_ck);
|
||||
second_buffer.add_clustering_row(second_buffer_ck + 1);
|
||||
second_buffer.add_partition_end();
|
||||
|
||||
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
|
||||
.has_monotonic_positions();
|
||||
|
||||
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
|
||||
.produces(first_buffer.muts[0] + second_buffer.muts[0])
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
|
||||
testlog.info("Same partition, no static row");
|
||||
{
|
||||
buffer first_buffer(s, permit, pkey1);
|
||||
const auto buf_size = first_buffer.add_clustering_rows(0, second_buffer_ck);
|
||||
first_buffer.save_position();
|
||||
first_buffer.add_clustering_row(second_buffer_ck);
|
||||
|
||||
buffer second_buffer(s, permit, pkey1);
|
||||
second_buffer.add_clustering_row(second_buffer_ck);
|
||||
second_buffer.add_clustering_row(second_buffer_ck + 1);
|
||||
second_buffer.add_partition_end();
|
||||
|
||||
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
|
||||
.has_monotonic_positions();
|
||||
|
||||
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
|
||||
.produces(first_buffer.muts[0] + second_buffer.muts[0])
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
|
||||
testlog.info("Same partition as expected, no static row, next partition has static row (#8923)");
|
||||
{
|
||||
buffer second_buffer(s, permit, pkey1);
|
||||
second_buffer.add_clustering_rows(second_buffer_ck, second_buffer_ck + second_buffer_ck / 2);
|
||||
// We want to end the buffer on the partition-start below, but since a
|
||||
// partition start will be dropped from it, we have to use the size
|
||||
// without it.
|
||||
const auto buf_size = second_buffer.add_partition_end();
|
||||
second_buffer.add_partition(pkey2);
|
||||
second_buffer.add_static_row();
|
||||
auto srow = mutation_fragment(*s.schema(), permit, second_buffer.frags.back());
|
||||
second_buffer.add_clustering_rows(0, 2);
|
||||
|
||||
buffer first_buffer(s, permit, pkey1);
|
||||
for (int i = 0; first_buffer.add_clustering_row(i) < buf_size; ++i);
|
||||
first_buffer.save_position();
|
||||
first_buffer.add_mutation_fragment(mutation_fragment(*s.schema(), permit, second_buffer.frags[1]));
|
||||
|
||||
buffer third_buffer(s, permit, pkey2);
|
||||
third_buffer.add_static_row(std::move(srow));
|
||||
third_buffer.add_clustering_rows(0, 2);
|
||||
third_buffer.add_partition_end();
|
||||
|
||||
first_buffer.find_position(buf_size);
|
||||
|
||||
assert_that(make_reader(first_buffer, second_buffer, &third_buffer, buf_size))
|
||||
.has_monotonic_positions();
|
||||
|
||||
assert_that(make_reader(first_buffer, second_buffer, &third_buffer, buf_size))
|
||||
.produces(first_buffer.muts[0] + second_buffer.muts[0])
|
||||
.produces(second_buffer.muts[1] + third_buffer.muts[0])
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
|
||||
testlog.info("Next partition, with no static row");
|
||||
{
|
||||
buffer first_buffer(s, permit, pkey1);
|
||||
const auto buf_size = first_buffer.add_clustering_rows(0, second_buffer_ck);
|
||||
first_buffer.save_position();
|
||||
first_buffer.add_clustering_row(second_buffer_ck + 1, true);
|
||||
|
||||
buffer second_buffer(s, permit, pkey2);
|
||||
second_buffer.add_clustering_rows(0, second_buffer_ck / 2);
|
||||
second_buffer.add_partition_end();
|
||||
|
||||
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
|
||||
.has_monotonic_positions();
|
||||
|
||||
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
|
||||
.produces(first_buffer.muts[0])
|
||||
.produces(second_buffer.muts[0])
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
|
||||
testlog.info("Next partition, with static row");
|
||||
{
|
||||
buffer first_buffer(s, permit, pkey1);
|
||||
const auto buf_size = first_buffer.add_clustering_rows(0, second_buffer_ck);
|
||||
first_buffer.save_position();
|
||||
first_buffer.add_clustering_row(second_buffer_ck + 1, true);
|
||||
|
||||
buffer second_buffer(s, permit, pkey2);
|
||||
second_buffer.add_static_row();
|
||||
second_buffer.add_clustering_rows(0, second_buffer_ck / 2);
|
||||
second_buffer.add_partition_end();
|
||||
|
||||
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
|
||||
.has_monotonic_positions();
|
||||
|
||||
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
|
||||
.produces(first_buffer.muts[0])
|
||||
.produces(second_buffer.muts[0])
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1805,12 +1805,16 @@ SEASTAR_TEST_CASE(test_mutation_diff_with_random_generator) {
|
||||
BOOST_FAIL(format("Partitions don't match, got: {}\n...and: {}", mutation_partition::printer(s, mp1), mutation_partition::printer(s, mp2)));
|
||||
}
|
||||
};
|
||||
for_each_mutation_pair([&] (auto&& m1, auto&& m2, are_equal eq) {
|
||||
const auto now = gc_clock::now();
|
||||
can_gc_fn never_gc = [] (tombstone) { return false; };
|
||||
for_each_mutation_pair([&] (auto m1, auto m2, are_equal eq) {
|
||||
mutation_application_stats app_stats;
|
||||
auto s = m1.schema();
|
||||
if (s != m2.schema()) {
|
||||
return;
|
||||
}
|
||||
m1.partition().compact_for_compaction(*s, never_gc, now);
|
||||
m2.partition().compact_for_compaction(*s, never_gc, now);
|
||||
auto m12 = m1;
|
||||
m12.apply(m2);
|
||||
auto m12_with_diff = m1;
|
||||
|
||||
@@ -6685,3 +6685,135 @@ SEASTAR_TEST_CASE(test_zero_estimated_partitions) {
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(max_ongoing_compaction_test) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
BOOST_REQUIRE(smp::count == 1);
|
||||
|
||||
auto make_schema = [] (auto idx) {
|
||||
auto builder = schema_builder("tests", std::to_string(idx))
|
||||
.with_column("id", utf8_type, column_kind::partition_key)
|
||||
.with_column("cl", int32_type, column_kind::clustering_key)
|
||||
.with_column("value", int32_type);
|
||||
builder.set_compaction_strategy(sstables::compaction_strategy_type::time_window);
|
||||
std::map <sstring, sstring> opts = {
|
||||
{time_window_compaction_strategy_options::COMPACTION_WINDOW_UNIT_KEY, "HOURS"},
|
||||
{time_window_compaction_strategy_options::COMPACTION_WINDOW_SIZE_KEY, "1"},
|
||||
{time_window_compaction_strategy_options::EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY, "0"},
|
||||
};
|
||||
builder.set_compaction_strategy_options(std::move(opts));
|
||||
builder.set_gc_grace_seconds(0);
|
||||
return builder.build();
|
||||
};
|
||||
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
cm->enable();
|
||||
auto stop_cm = defer([&cm] {
|
||||
cm->stop().get();
|
||||
});
|
||||
|
||||
auto tmp = tmpdir();
|
||||
auto cl_stats = make_lw_shared<cell_locker_stats>();
|
||||
auto tracker = make_lw_shared<cache_tracker>();
|
||||
auto tokens = token_generation_for_shard(1, this_shard_id(), test_db_config.murmur3_partitioner_ignore_msb_bits(), smp::count);
|
||||
|
||||
auto next_timestamp = [] (auto step) {
|
||||
using namespace std::chrono;
|
||||
return (gc_clock::now().time_since_epoch() - duration_cast<microseconds>(step)).count();
|
||||
};
|
||||
auto make_expiring_cell = [&] (schema_ptr s, std::chrono::hours step) {
|
||||
static thread_local int32_t value = 1;
|
||||
|
||||
auto key_str = tokens[0].first;
|
||||
auto key = partition_key::from_exploded(*s, {to_bytes(key_str)});
|
||||
|
||||
mutation m(s, key);
|
||||
auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(value++)});
|
||||
m.set_clustered_cell(c_key, bytes("value"), data_value(int32_t(value)), next_timestamp(step), gc_clock::duration(step + 5s));
|
||||
return m;
|
||||
};
|
||||
|
||||
auto make_table_with_single_fully_expired_sstable = [&] (auto idx) {
|
||||
auto s = make_schema(idx);
|
||||
column_family::config cfg = column_family_test_config(env.manager());
|
||||
cfg.datadir = tmp.path().string() + "/" + std::to_string(idx);
|
||||
touch_directory(cfg.datadir).get();
|
||||
cfg.enable_commitlog = false;
|
||||
cfg.enable_incremental_backups = false;
|
||||
|
||||
auto sst_gen = [&env, s, dir = cfg.datadir, gen = make_lw_shared<unsigned>(1)] () mutable {
|
||||
return env.make_sstable(s, dir, (*gen)++, sstables::sstable::version_types::md, big);
|
||||
};
|
||||
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm, *cl_stats, *tracker);
|
||||
cf->start();
|
||||
cf->mark_ready_for_writes();
|
||||
|
||||
auto muts = { make_expiring_cell(s, std::chrono::hours(1)) };
|
||||
auto sst = make_sstable_containing(sst_gen, muts);
|
||||
column_family_test(cf).add_sstable(sst);
|
||||
return cf;
|
||||
};
|
||||
|
||||
std::vector<lw_shared_ptr<column_family>> tables;
|
||||
auto stop_tables = defer([&tables] {
|
||||
for (auto& t : tables) {
|
||||
t->stop().get();
|
||||
}
|
||||
});
|
||||
for (auto i = 0; i < 100; i++) {
|
||||
tables.push_back(make_table_with_single_fully_expired_sstable(i));
|
||||
}
|
||||
|
||||
// Make sure everything is expired
|
||||
forward_jump_clocks(std::chrono::hours(100));
|
||||
|
||||
for (auto& t : tables) {
|
||||
BOOST_REQUIRE(t->sstables_count() == 1);
|
||||
t->trigger_compaction();
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(cm->get_stats().pending_tasks >= 1 || cm->get_stats().active_tasks >= 1);
|
||||
|
||||
size_t max_ongoing_compaction = 0;
|
||||
|
||||
// wait for submitted jobs to finish.
|
||||
auto end = [cm, &tables] {
|
||||
return cm->get_stats().pending_tasks == 0 && cm->get_stats().active_tasks == 0
|
||||
&& boost::algorithm::all_of(tables, [] (auto& t) { return t->sstables_count() == 0; });
|
||||
};
|
||||
while (!end()) {
|
||||
if (!cm->get_stats().pending_tasks && !cm->get_stats().active_tasks) {
|
||||
for (auto& t : tables) {
|
||||
if (t->sstables_count()) {
|
||||
t->trigger_compaction();
|
||||
}
|
||||
}
|
||||
}
|
||||
max_ongoing_compaction = std::max(cm->get_stats().active_tasks, max_ongoing_compaction);
|
||||
later().get();
|
||||
}
|
||||
BOOST_REQUIRE(cm->get_stats().errors == 0);
|
||||
BOOST_REQUIRE(max_ongoing_compaction == 1);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(stcs_reshape_test) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
std::vector<shared_sstable> sstables;
|
||||
sstables.reserve(s->max_compaction_threshold());
|
||||
for (auto gen = 1; gen <= s->max_compaction_threshold(); gen++) {
|
||||
auto sst = env.make_sstable(s, "", gen, la, big);
|
||||
sstables::test(sst).set_data_file_size(1);
|
||||
sstables.push_back(std::move(sst));
|
||||
}
|
||||
|
||||
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::size_tiered,
|
||||
s->compaction_strategy_options());
|
||||
|
||||
BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, default_priority_class(), reshape_mode::strict).sstables.size());
|
||||
BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, default_priority_class(), reshape_mode::relaxed).sstables.size());
|
||||
});
|
||||
}
|
||||
|
||||
186
test/cql-pytest/test_secondary_index.py
Normal file
186
test/cql-pytest/test_secondary_index.py
Normal file
@@ -0,0 +1,186 @@
|
||||
# Copyright 2020 ScyllaDB
|
||||
#
|
||||
# This file is part of Scylla.
|
||||
#
|
||||
# Scylla is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Scylla is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
# Tests for secondary indexes
|
||||
|
||||
import time
|
||||
import pytest
|
||||
from cassandra.protocol import SyntaxException, AlreadyExists, InvalidRequest, ConfigurationException, ReadFailure
|
||||
|
||||
from util import new_test_table, unique_name
|
||||
|
||||
# A reproducer for issue #7443: Normally, when the entire table is SELECTed,
|
||||
# the partitions are returned sorted by the partitions' token. When there
|
||||
# is filtering, this order is not expected to change. Furthermore, when this
|
||||
# filtering happens to use a secondary index, again the order is not expected
|
||||
# to change.
|
||||
def test_partition_order_with_si(cql, test_keyspace):
|
||||
schema = 'pk int, x int, PRIMARY KEY ((pk))'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
# Insert 20 partitions, all of them with x=1 so that filtering by x=1
|
||||
# will yield the same 20 partitions:
|
||||
N = 20
|
||||
stmt = cql.prepare('INSERT INTO '+table+' (pk, x) VALUES (?, ?)')
|
||||
for i in range(N):
|
||||
cql.execute(stmt, [i, 1])
|
||||
# SELECT all the rows, and verify they are returned in increasing
|
||||
# partition token order (note that the token is a *signed* number):
|
||||
tokens = [row.system_token_pk for row in cql.execute('SELECT token(pk) FROM '+table)]
|
||||
assert len(tokens) == N
|
||||
assert sorted(tokens) == tokens
|
||||
# Now select all the partitions with filtering of x=1. Since all
|
||||
# rows have x=1, this shouldn't change the list of matching rows, and
|
||||
# also shouldn't check their order:
|
||||
tokens1 = [row.system_token_pk for row in cql.execute('SELECT token(pk) FROM '+table+' WHERE x=1 ALLOW FILTERING')]
|
||||
assert tokens1 == tokens
|
||||
# Now add an index on x, which allows implementing the "x=1"
|
||||
# restriction differently. With the index, "ALLOW FILTERING" is
|
||||
# no longer necessary. But the order of the results should
|
||||
# still not change. Issue #7443 is about the order changing here.
|
||||
cql.execute('CREATE INDEX ON '+table+'(x)')
|
||||
# "CREATE INDEX" does not wait until the index is actually available
|
||||
# for use. Reads immediately after the CREATE INDEX may fail or return
|
||||
# partial results. So let's retry until reads resume working:
|
||||
for i in range(100):
|
||||
try:
|
||||
tokens2 = [row.system_token_pk for row in cql.execute('SELECT token(pk) FROM '+table+' WHERE x=1')]
|
||||
if len(tokens2) == N:
|
||||
break
|
||||
except ReadFailure:
|
||||
pass
|
||||
time.sleep(0.1)
|
||||
assert tokens2 == tokens
|
||||
|
||||
# Test which ensures that indexes for a query are picked by the order in which
|
||||
# they appear in restrictions. That way, users can deterministically pick
|
||||
# which indexes are used for which queries.
|
||||
# Note that the order of picking indexing is not set in stone and may be
|
||||
# subject to change - in which case this test case should be amended as well.
|
||||
# The order tested in this case was decided as a good first step in issue
|
||||
# #7969, but it's possible that it will eventually be implemented another
|
||||
# way, e.g. dynamically based on estimated query selectivity statistics.
|
||||
# Ref: #7969
|
||||
@pytest.mark.xfail(reason="The order of picking indexes is currently arbitrary. Issue #7969")
|
||||
def test_order_of_indexes(scylla_only, cql, test_keyspace):
|
||||
schema = 'p int primary key, v1 int, v2 int, v3 int'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
cql.execute(f"CREATE INDEX my_v3_idx ON {table}(v3)")
|
||||
cql.execute(f"CREATE INDEX my_v1_idx ON {table}(v1)")
|
||||
cql.execute(f"CREATE INDEX my_v2_idx ON {table}((p),v2)")
|
||||
# All queries below should use the first index they find in the list
|
||||
# of restrictions. Tracing information will be consulted to ensure
|
||||
# it's true. Currently some of the cases below succeed, because the
|
||||
# order is not well defined (and may, for instance, change upon
|
||||
# server restart), but some of them fail. Once a proper ordering
|
||||
# is implemented, all cases below should succeed.
|
||||
def index_used(query, index_name):
|
||||
assert any([index_name in event.description for event in cql.execute(query, trace=True).get_query_trace().events])
|
||||
index_used(f"SELECT * FROM {table} WHERE v3 = 1", "my_v3_idx")
|
||||
index_used(f"SELECT * FROM {table} WHERE v3 = 1 and v1 = 2 allow filtering", "my_v3_idx")
|
||||
index_used(f"SELECT * FROM {table} WHERE p = 1 and v1 = 1 and v3 = 2 allow filtering", "my_v1_idx")
|
||||
index_used(f"SELECT * FROM {table} WHERE p = 1 and v3 = 1 and v1 = 2 allow filtering", "my_v3_idx")
|
||||
# Local indexes are still skipped if they cannot be used
|
||||
index_used(f"SELECT * FROM {table} WHERE v2 = 1 and v1 = 2 allow filtering", "my_v1_idx")
|
||||
index_used(f"SELECT * FROM {table} WHERE v2 = 1 and v3 = 2 and v1 = 3 allow filtering", "my_v3_idx")
|
||||
index_used(f"SELECT * FROM {table} WHERE v1 = 1 and v2 = 2 and v3 = 3 allow filtering", "my_v1_idx")
|
||||
# Local indexes are still preferred over global ones, if they can be used
|
||||
index_used(f"SELECT * FROM {table} WHERE p = 1 and v1 = 1 and v3 = 2 and v2 = 2 allow filtering", "my_v2_idx")
|
||||
index_used(f"SELECT * FROM {table} WHERE p = 1 and v2 = 1 and v1 = 2 allow filtering", "my_v2_idx")
|
||||
|
||||
# Indexes can be created without an explicit name, in which case a default name is chosen.
|
||||
# However, due to #8620 it was possible to break the index creation mechanism by creating
|
||||
# a properly named regular table, which conflicts with the generated index name.
|
||||
def test_create_unnamed_index_when_its_name_is_taken(cql, test_keyspace):
|
||||
schema = 'p int primary key, v int'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
try:
|
||||
cql.execute(f"CREATE TABLE {table}_v_idx_index (i_do_not_exist_in_the_base_table int primary key)")
|
||||
# Creating an index should succeed, even though its default name is taken
|
||||
# by the table above
|
||||
cql.execute(f"CREATE INDEX ON {table}(v)")
|
||||
finally:
|
||||
cql.execute(f"DROP TABLE {table}_v_idx_index")
|
||||
|
||||
# Indexed created with an explicit name cause a materialized view to be created,
|
||||
# and this view has a specific name - <index-name>_index. If there happens to be
|
||||
# a regular table (or another view) named just like that, index creation should fail.
|
||||
def test_create_named_index_when_its_name_is_taken(scylla_only, cql, test_keyspace):
|
||||
schema = 'p int primary key, v int'
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
index_name = unique_name()
|
||||
try:
|
||||
cql.execute(f"CREATE TABLE {test_keyspace}.{index_name}_index (i_do_not_exist_in_the_base_table int primary key)")
|
||||
# Creating an index should fail, because it's impossible to create
|
||||
# its underlying materialized view, because its name is taken by a regular table
|
||||
with pytest.raises(InvalidRequest, match="already exists"):
|
||||
cql.execute(f"CREATE INDEX {index_name} ON {table}(v)")
|
||||
finally:
|
||||
cql.execute(f"DROP TABLE {test_keyspace}.{index_name}_index")
|
||||
|
||||
# Tests for CREATE INDEX IF NOT EXISTS
|
||||
# Reproduces issue #8717.
|
||||
def test_create_index_if_not_exists(cql, test_keyspace):
|
||||
with new_test_table(cql, test_keyspace, 'p int primary key, v int') as table:
|
||||
cql.execute(f"CREATE INDEX ON {table}(v)")
|
||||
# Can't create the same index again without "IF NOT EXISTS", but can
|
||||
# do it with "IF NOT EXISTS":
|
||||
with pytest.raises(InvalidRequest, match="duplicate"):
|
||||
cql.execute(f"CREATE INDEX ON {table}(v)")
|
||||
cql.execute(f"CREATE INDEX IF NOT EXISTS ON {table}(v)")
|
||||
cql.execute(f"DROP INDEX {test_keyspace}.{table.split('.')[1]}_v_idx")
|
||||
|
||||
# Now test the same thing for named indexes. This is what broke in #8717:
|
||||
cql.execute(f"CREATE INDEX xyz ON {table}(v)")
|
||||
with pytest.raises(InvalidRequest, match="already exists"):
|
||||
cql.execute(f"CREATE INDEX xyz ON {table}(v)")
|
||||
cql.execute(f"CREATE INDEX IF NOT EXISTS xyz ON {table}(v)")
|
||||
cql.execute(f"DROP INDEX {test_keyspace}.xyz")
|
||||
|
||||
# Exactly the same with non-lower case name.
|
||||
cql.execute(f'CREATE INDEX "CamelCase" ON {table}(v)')
|
||||
with pytest.raises(InvalidRequest, match="already exists"):
|
||||
cql.execute(f'CREATE INDEX "CamelCase" ON {table}(v)')
|
||||
cql.execute(f'CREATE INDEX IF NOT EXISTS "CamelCase" ON {table}(v)')
|
||||
cql.execute(f'DROP INDEX {test_keyspace}."CamelCase"')
|
||||
|
||||
# Trying to create an index for an attribute that's already indexed,
|
||||
# but with a different name. The "IF NOT EXISTS" appears to succeed
|
||||
# in this case, but does not actually create the new index name -
|
||||
# only the old one remains.
|
||||
cql.execute(f"CREATE INDEX xyz ON {table}(v)")
|
||||
with pytest.raises(InvalidRequest, match="duplicate"):
|
||||
cql.execute(f"CREATE INDEX abc ON {table}(v)")
|
||||
cql.execute(f"CREATE INDEX IF NOT EXISTS abc ON {table}(v)")
|
||||
with pytest.raises(InvalidRequest):
|
||||
cql.execute(f"DROP INDEX {test_keyspace}.abc")
|
||||
cql.execute(f"DROP INDEX {test_keyspace}.xyz")
|
||||
|
||||
# Test that the paging state works properly for indexes on tables
|
||||
# with descending clustering order. There was a problem with indexes
|
||||
# created on clustering keys with DESC clustering order - they are represented
|
||||
# as "reverse" types internally and Scylla assertions failed that the base type
|
||||
# is different from the underlying view type, even though, from the perspective
|
||||
# of deserialization, they're equal. Issue #8666
|
||||
def test_paging_with_desc_clustering_order(cql, test_keyspace):
|
||||
schema = 'p int, c int, primary key (p,c)'
|
||||
extra = 'with clustering order by (c desc)'
|
||||
with new_test_table(cql, test_keyspace, schema, extra) as table:
|
||||
cql.execute(f"CREATE INDEX ON {table}(c)")
|
||||
for i in range(3):
|
||||
cql.execute(f"INSERT INTO {table}(p,c) VALUES ({i}, 42)")
|
||||
stmt = SimpleStatement(f"SELECT * FROM {table} WHERE c = 42", fetch_size=1)
|
||||
assert len([row for row in cql.execute(stmt)]) == 3
|
||||
Submodule tools/java updated: 1489e7c539...86fb5c826d
@@ -173,6 +173,10 @@ public:
|
||||
return res;
|
||||
}
|
||||
|
||||
long use_count() const noexcept {
|
||||
return _e ? _e.use_count() : 0;
|
||||
}
|
||||
|
||||
friend class loading_shared_values;
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user