Compare commits
34 Commits
next
...
scylla-4.5
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d92a26636a | ||
|
|
7445bfec86 | ||
|
|
b077b198bf | ||
|
|
44f85d2ba0 | ||
|
|
b81919dbe2 | ||
|
|
5651a20ba1 | ||
|
|
8c30b83ea4 | ||
|
|
fce7eab9ac | ||
|
|
ab8eefade7 | ||
|
|
e2704554b5 | ||
|
|
e36e490469 | ||
|
|
c97005fbb8 | ||
|
|
d881d539f3 | ||
|
|
b8a502fab0 | ||
|
|
f7f2bb482f | ||
|
|
b16db6512c | ||
|
|
1a7c8223fe | ||
|
|
ac6aa66a7b | ||
|
|
98a39884c3 | ||
|
|
88192811e7 | ||
|
|
32f21f7281 | ||
|
|
c9eaf95750 | ||
|
|
44c6d0fcf9 | ||
|
|
4bcc0badb2 | ||
|
|
97664e63fe | ||
|
|
204964637a | ||
|
|
c402abe8e9 | ||
|
|
4a78d6403e | ||
|
|
2f20d52ac7 | ||
|
|
540439ee46 | ||
|
|
a0622e85ab | ||
|
|
90741dc62c | ||
|
|
83cfa6a63c | ||
|
|
1816c6df8c |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=4.5.dev
|
||||
VERSION=4.5.rc2
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -267,6 +267,9 @@ future<> cache_flat_mutation_reader::do_fill_buffer(db::timeout_clock::time_poin
|
||||
}
|
||||
_state = state::reading_from_underlying;
|
||||
_population_range_starts_before_all_rows = _lower_bound.is_before_all_clustered_rows(*_schema);
|
||||
if (!_read_context->partition_exists()) {
|
||||
return read_from_underlying(timeout);
|
||||
}
|
||||
auto end = _next_row_in_range ? position_in_partition(_next_row.position())
|
||||
: position_in_partition(_upper_bound);
|
||||
return _underlying->fast_forward_to(position_range{_lower_bound, std::move(end)}, timeout).then([this, timeout] {
|
||||
|
||||
@@ -461,7 +461,7 @@ generate_base_key_from_index_pk(const partition_key& index_pk, const std::option
|
||||
if (!view_col) {
|
||||
throw std::runtime_error(format("Base key column not found in the view: {}", base_col.name_as_text()));
|
||||
}
|
||||
if (base_col.type != view_col->type) {
|
||||
if (base_col.type->without_reversed() != *view_col->type) {
|
||||
throw std::runtime_error(format("Mismatched types for base and view columns {}: {} and {}",
|
||||
base_col.name_as_text(), base_col.type->cql3_type_name(), view_col->type->cql3_type_name()));
|
||||
}
|
||||
|
||||
@@ -747,10 +747,8 @@ void database::set_format(sstables::sstable_version_types format) {
|
||||
void database::set_format_by_config() {
|
||||
if (_cfg.enable_sstables_md_format()) {
|
||||
set_format(sstables::sstable_version_types::md);
|
||||
} else if (_cfg.enable_sstables_mc_format()) {
|
||||
set_format(sstables::sstable_version_types::mc);
|
||||
} else {
|
||||
set_format(sstables::sstable_version_types::la);
|
||||
set_format(sstables::sstable_version_types::mc);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1511,7 +1511,9 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
|
||||
if (!cfg.allow_going_over_size_limit && max_disk_size != 0 && totals.total_size_on_disk >= max_disk_size) {
|
||||
clogger.debug("Disk usage ({} MB) exceeds maximum ({} MB) - allocation will wait...", totals.total_size_on_disk/(1024*1024), max_disk_size/(1024*1024));
|
||||
auto f = cfg.reuse_segments ? _recycled_segments.not_empty() : _disk_deletions.get_shared_future();
|
||||
return f.then([this] {
|
||||
return f.handle_exception([this](auto ep) {
|
||||
clogger.warn("Exception while waiting for segments {}. Will retry allocation...", ep);
|
||||
}).then([this] {
|
||||
return allocate_segment();
|
||||
});
|
||||
}
|
||||
@@ -1741,41 +1743,79 @@ future<> db::commitlog::segment_manager::delete_file(const sstring& filename) {
|
||||
}
|
||||
|
||||
future<> db::commitlog::segment_manager::delete_segments(std::vector<sstring> files) {
|
||||
auto i = files.begin();
|
||||
auto e = files.end();
|
||||
if (files.empty()) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
return parallel_for_each(i, e, [this](auto& filename) {
|
||||
auto f = make_ready_future();
|
||||
auto exts = cfg.extensions;
|
||||
if (exts && !exts->commitlog_file_extensions().empty()) {
|
||||
f = parallel_for_each(exts->commitlog_file_extensions(), [&](auto& ext) {
|
||||
return ext->before_delete(filename);
|
||||
});
|
||||
}
|
||||
return f.finally([&] {
|
||||
// We allow reuse of the segment if the current disk size is less than shard max.
|
||||
auto usage = totals.total_size_on_disk;
|
||||
if (!_shutdown && cfg.reuse_segments && usage <= max_disk_size) {
|
||||
descriptor d(next_id(), "Recycled-" + cfg.fname_prefix);
|
||||
auto dst = this->filename(d);
|
||||
clogger.debug("Delete segments {}", files);
|
||||
|
||||
clogger.debug("Recycling segment file {}", filename);
|
||||
// must rename the file since we must ensure the
|
||||
// data is not replayed. Changing the name will
|
||||
// cause header ID to be invalid in the file -> ignored
|
||||
return rename_file(filename, dst).then([this, dst]() mutable {
|
||||
auto b = _recycled_segments.push(std::move(dst));
|
||||
assert(b); // we set this to max_size_t so...
|
||||
return make_ready_future<>();
|
||||
}).handle_exception([this, filename](auto&&) {
|
||||
return delete_file(filename);
|
||||
});
|
||||
std::exception_ptr recycle_error;
|
||||
|
||||
while (!files.empty()) {
|
||||
auto filename = std::move(files.back());
|
||||
files.pop_back();
|
||||
|
||||
try {
|
||||
auto exts = cfg.extensions;
|
||||
if (exts && !exts->commitlog_file_extensions().empty()) {
|
||||
for (auto& ext : exts->commitlog_file_extensions()) {
|
||||
co_await ext->before_delete(filename);
|
||||
}
|
||||
}
|
||||
return delete_file(filename);
|
||||
}).handle_exception([&filename](auto ep) {
|
||||
clogger.error("Could not delete segment {}: {}", filename, ep);
|
||||
});
|
||||
}).finally([files = std::move(files)] {});
|
||||
|
||||
// We allow reuse of the segment if the current disk size is less than shard max.
|
||||
if (!_shutdown && cfg.reuse_segments) {
|
||||
auto usage = totals.total_size_on_disk;
|
||||
auto recycle = usage <= max_disk_size;
|
||||
|
||||
// if total size is not a multiple of segment size, we need
|
||||
// to check if we are the overlap segment, and noone else
|
||||
// can be recycled. If so, let this one live so allocation
|
||||
// can proceed. We assume/hope a future delete will kill
|
||||
// files down to under the threshold, but we should expect
|
||||
// to stomp around nearest multiple of segment size, not
|
||||
// the actual limit.
|
||||
if (!recycle && _recycled_segments.empty() && files.empty()) {
|
||||
auto size = co_await seastar::file_size(filename);
|
||||
recycle = (usage - size) <= max_disk_size;
|
||||
}
|
||||
|
||||
if (recycle) {
|
||||
descriptor d(next_id(), "Recycled-" + cfg.fname_prefix);
|
||||
auto dst = this->filename(d);
|
||||
|
||||
clogger.debug("Recycling segment file {}", filename);
|
||||
// must rename the file since we must ensure the
|
||||
// data is not replayed. Changing the name will
|
||||
// cause header ID to be invalid in the file -> ignored
|
||||
try {
|
||||
co_await rename_file(filename, dst);
|
||||
auto b = _recycled_segments.push(std::move(dst));
|
||||
assert(b); // we set this to max_size_t so...
|
||||
continue;
|
||||
} catch (...) {
|
||||
recycle_error = std::current_exception();
|
||||
// fallthrough
|
||||
}
|
||||
}
|
||||
}
|
||||
co_await delete_file(filename);
|
||||
} catch (...) {
|
||||
clogger.error("Could not delete segment {}: {}", filename, std::current_exception());
|
||||
}
|
||||
}
|
||||
|
||||
// #8376 - if we had an error in recycling (disk rename?), and no elements
|
||||
// are available, we could have waiters hoping they will get segements.
|
||||
// abort the queue (wakes up any existing waiters - futures), and let them
|
||||
// retry. Since we did deletions instead, disk footprint should allow
|
||||
// for new allocs at least. Or more likely, everything is broken, but
|
||||
// we will at least make more noise.
|
||||
if (recycle_error && _recycled_segments.empty()) {
|
||||
_recycled_segments.abort(recycle_error);
|
||||
// and ensure next lap(s) still has a queue
|
||||
_recycled_segments = queue<sstring>(std::numeric_limits<size_t>::max());
|
||||
}
|
||||
}
|
||||
|
||||
future<> db::commitlog::segment_manager::do_pending_deletes() {
|
||||
@@ -2449,6 +2489,14 @@ std::vector<sstring> db::commitlog::get_active_segment_names() const {
|
||||
return _segment_manager->get_active_names();
|
||||
}
|
||||
|
||||
uint64_t db::commitlog::disk_limit() const {
|
||||
return _segment_manager->max_disk_size;
|
||||
}
|
||||
|
||||
uint64_t db::commitlog::disk_footprint() const {
|
||||
return _segment_manager->totals.total_size_on_disk;
|
||||
}
|
||||
|
||||
uint64_t db::commitlog::get_total_size() const {
|
||||
return _segment_manager->totals.active_size_on_disk + _segment_manager->totals.buffer_list_bytes;
|
||||
}
|
||||
|
||||
@@ -336,6 +336,16 @@ public:
|
||||
*/
|
||||
uint64_t max_active_flushes() const;
|
||||
|
||||
/**
|
||||
* Return disk footprint
|
||||
*/
|
||||
uint64_t disk_footprint() const;
|
||||
|
||||
/**
|
||||
* Return configured disk footprint limit
|
||||
*/
|
||||
uint64_t disk_limit() const;
|
||||
|
||||
future<> clear();
|
||||
|
||||
const config& active_config() const;
|
||||
|
||||
@@ -747,8 +747,8 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
" Performance is affected to some extent as a result. Useful to help debugging problems that may arise at another layers.")
|
||||
, cpu_scheduler(this, "cpu_scheduler", value_status::Used, true, "Enable cpu scheduling")
|
||||
, view_building(this, "view_building", value_status::Used, true, "Enable view building; should only be set to false when the node is experience issues due to view building")
|
||||
, enable_sstables_mc_format(this, "enable_sstables_mc_format", value_status::Used, true, "Enable SSTables 'mc' format to be used as the default file format")
|
||||
, enable_sstables_md_format(this, "enable_sstables_md_format", value_status::Used, true, "Enable SSTables 'md' format to be used as the default file format (requires enable_sstables_mc_format)")
|
||||
, enable_sstables_mc_format(this, "enable_sstables_mc_format", value_status::Unused, true, "Enable SSTables 'mc' format to be used as the default file format")
|
||||
, enable_sstables_md_format(this, "enable_sstables_md_format", value_status::Used, true, "Enable SSTables 'md' format to be used as the default file format")
|
||||
, enable_dangerous_direct_import_of_cassandra_counters(this, "enable_dangerous_direct_import_of_cassandra_counters", value_status::Used, false, "Only turn this option on if you want to import tables from Cassandra containing counters, and you are SURE that no counters in that table were created in a version earlier than Cassandra 2.1."
|
||||
" It is not enough to have ever since upgraded to newer versions of Cassandra. If you EVER used a version earlier than 2.1 in the cluster where these SSTables come from, DO NOT TURN ON THIS OPTION! You will corrupt your data. You have been warned.")
|
||||
, enable_shard_aware_drivers(this, "enable_shard_aware_drivers", value_status::Used, true, "Enable native transport drivers to use connection-per-shard for better performance")
|
||||
|
||||
1
dist/common/scripts/scylla_coredump_setup
vendored
1
dist/common/scripts/scylla_coredump_setup
vendored
@@ -67,6 +67,7 @@ Description=Save coredump to scylla data directory
|
||||
Conflicts=umount.target
|
||||
Before=scylla-server.service
|
||||
After=local-fs.target
|
||||
DefaultDependencies=no
|
||||
|
||||
[Mount]
|
||||
What=/var/lib/scylla/coredump
|
||||
|
||||
14
dist/common/scripts/scylla_prepare
vendored
14
dist/common/scripts/scylla_prepare
vendored
@@ -28,7 +28,6 @@ import distro
|
||||
|
||||
from scylla_util import *
|
||||
from subprocess import run
|
||||
from multiprocessing import cpu_count
|
||||
|
||||
def get_mode_cpuset(nic, mode):
|
||||
mode_cpu_mask = run('/opt/scylladb/scripts/perftune.py --tune net --nic {} --mode {} --get-cpu-mask-quiet'.format(nic, mode), shell=True, check=True, capture_output=True, encoding='utf-8').stdout.strip()
|
||||
@@ -100,16 +99,6 @@ def verify_cpu():
|
||||
print('\nIf this is a virtual machine, please update its CPU feature configuration or upgrade to a newer hypervisor.')
|
||||
sys.exit(1)
|
||||
|
||||
def configure_aio_slots():
|
||||
with open('/proc/sys/fs/aio-max-nr') as f:
|
||||
aio_max_nr = int(f.read())
|
||||
# (10000 + 1024 + 2) * ncpus for scylla,
|
||||
# 65536 for other apps
|
||||
required_aio_slots = cpu_count() * 11026 + 65536
|
||||
if aio_max_nr < required_aio_slots:
|
||||
with open('/proc/sys/fs/aio-max-nr', 'w') as f:
|
||||
f.write(str(required_aio_slots))
|
||||
|
||||
if __name__ == '__main__':
|
||||
verify_cpu()
|
||||
|
||||
@@ -124,8 +113,6 @@ if __name__ == '__main__':
|
||||
os.remove('/etc/scylla/ami_disabled')
|
||||
sys.exit(1)
|
||||
|
||||
configure_aio_slots()
|
||||
|
||||
if mode == 'virtio':
|
||||
tap = cfg.get('TAP')
|
||||
user = cfg.get('USER')
|
||||
@@ -155,4 +142,3 @@ if __name__ == '__main__':
|
||||
print(f'Exception occurred while creating perftune.yaml: {e}')
|
||||
print('To fix the error, please re-run scylla_setup.')
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
3
dist/common/scripts/scylla_raid_setup
vendored
3
dist/common/scripts/scylla_raid_setup
vendored
@@ -161,9 +161,10 @@ if __name__ == '__main__':
|
||||
Description=Scylla data directory
|
||||
Before=scylla-server.service
|
||||
After={after}
|
||||
DefaultDependencies=no
|
||||
|
||||
[Mount]
|
||||
What=UUID={uuid}
|
||||
What=/dev/disk/by-uuid/{uuid}
|
||||
Where={mount_at}
|
||||
Type=xfs
|
||||
Options=noatime
|
||||
|
||||
1
dist/common/scripts/scylla_util.py
vendored
1
dist/common/scripts/scylla_util.py
vendored
@@ -36,6 +36,7 @@ from subprocess import run, DEVNULL
|
||||
import distro
|
||||
from scylla_sysconfdir import SYSCONFDIR
|
||||
|
||||
from multiprocessing import cpu_count
|
||||
|
||||
def scriptsdir_p():
|
||||
p = Path(sys.argv[0]).resolve()
|
||||
|
||||
2
dist/common/sysctl.d/99-scylla-aio.conf
vendored
Normal file
2
dist/common/sysctl.d/99-scylla-aio.conf
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
# Raise max AIO events
|
||||
fs.aio-max-nr = 5578536
|
||||
@@ -11,6 +11,7 @@ else
|
||||
sysctl -p/usr/lib/sysctl.d/99-scylla-sched.conf || :
|
||||
sysctl -p/usr/lib/sysctl.d/99-scylla-vm.conf || :
|
||||
sysctl -p/usr/lib/sysctl.d/99-scylla-inotify.conf || :
|
||||
sysctl -p/usr/lib/sysctl.d/99-scylla-aio.conf || :
|
||||
fi
|
||||
|
||||
#DEBHELPER#
|
||||
|
||||
4
dist/docker/redhat/Dockerfile
vendored
4
dist/docker/redhat/Dockerfile
vendored
@@ -5,8 +5,8 @@ MAINTAINER Avi Kivity <avi@cloudius-systems.com>
|
||||
ENV container docker
|
||||
|
||||
# The SCYLLA_REPO_URL argument specifies the URL to the RPM repository this Docker image uses to install Scylla. The default value is the Scylla's unstable RPM repository, which contains the daily build.
|
||||
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo
|
||||
ARG VERSION=4.5.dev
|
||||
ARG SCYLLA_REPO_URL=downloads.scylladb.com/unstable/scylla/branch-4.5/rpm/centos/latest/
|
||||
ARG VERSION=4.5.rc2
|
||||
|
||||
ADD scylla_bashrc /scylla_bashrc
|
||||
|
||||
|
||||
1
dist/redhat/scylla.spec
vendored
1
dist/redhat/scylla.spec
vendored
@@ -211,6 +211,7 @@ if Scylla is the main application on your server and you wish to optimize its la
|
||||
/usr/lib/systemd/systemd-sysctl 99-scylla-sched.conf >/dev/null 2>&1 || :
|
||||
/usr/lib/systemd/systemd-sysctl 99-scylla-vm.conf >/dev/null 2>&1 || :
|
||||
/usr/lib/systemd/systemd-sysctl 99-scylla-inotify.conf >/dev/null 2>&1 || :
|
||||
/usr/lib/systemd/systemd-sysctl 99-scylla-aio.conf >/dev/null 2>&1 || :
|
||||
|
||||
%files kernel-conf
|
||||
%defattr(-,root,root)
|
||||
|
||||
@@ -98,14 +98,6 @@ feature_config feature_config_from_db_config(db::config& cfg, std::set<sstring>
|
||||
|
||||
fcfg._disabled_features = std::move(disabled);
|
||||
|
||||
if (!cfg.enable_sstables_mc_format()) {
|
||||
if (cfg.enable_sstables_md_format()) {
|
||||
throw std::runtime_error(
|
||||
"You must use both enable_sstables_mc_format and enable_sstables_md_format "
|
||||
"to enable SSTables md format support");
|
||||
}
|
||||
fcfg._disabled_features.insert(sstring(gms::features::MC_SSTABLE));
|
||||
}
|
||||
if (!cfg.enable_sstables_md_format()) {
|
||||
fcfg._disabled_features.insert(sstring(gms::features::MD_SSTABLE));
|
||||
}
|
||||
|
||||
20
install.sh
20
install.sh
@@ -150,6 +150,10 @@ EOF
|
||||
chmod +x "$install"
|
||||
}
|
||||
|
||||
install() {
|
||||
command install -Z "$@"
|
||||
}
|
||||
|
||||
installconfig() {
|
||||
local perm="$1"
|
||||
local src="$2"
|
||||
@@ -210,13 +214,13 @@ if [ -z "$python3" ]; then
|
||||
fi
|
||||
rpython3=$(realpath -m "$root/$python3")
|
||||
if ! $nonroot; then
|
||||
retc="$root/etc"
|
||||
rsysconfdir="$root/$sysconfdir"
|
||||
rusr="$root/usr"
|
||||
rsystemd="$rusr/lib/systemd/system"
|
||||
retc=$(realpath -m "$root/etc")
|
||||
rsysconfdir=$(realpath -m "$root/$sysconfdir")
|
||||
rusr=$(realpath -m "$root/usr")
|
||||
rsystemd=$(realpath -m "$rusr/lib/systemd/system")
|
||||
rdoc="$rprefix/share/doc"
|
||||
rdata="$root/var/lib/scylla"
|
||||
rhkdata="$root/var/lib/scylla-housekeeping"
|
||||
rdata=$(realpath -m "$root/var/lib/scylla")
|
||||
rhkdata=$(realpath -m "$root/var/lib/scylla-housekeeping")
|
||||
else
|
||||
retc="$rprefix/etc"
|
||||
rsysconfdir="$rprefix/$sysconfdir"
|
||||
@@ -245,6 +249,7 @@ if ! $nonroot; then
|
||||
done
|
||||
fi
|
||||
# scylla-node-exporter
|
||||
install -d -m755 "$rsysconfdir" "$rsystemd"
|
||||
install -d -m755 "$rprefix"/node_exporter
|
||||
install -d -m755 "$rprefix"/node_exporter/licenses
|
||||
install -m755 node_exporter/node_exporter "$rprefix"/node_exporter
|
||||
@@ -278,7 +283,6 @@ fi
|
||||
|
||||
# scylla-server
|
||||
install -m755 -d "$rprefix"
|
||||
install -m755 -d "$rsysconfdir"
|
||||
install -m755 -d "$retc/scylla.d"
|
||||
installconfig 644 dist/common/sysconfig/scylla-housekeeping "$rsysconfdir"
|
||||
installconfig 644 dist/common/sysconfig/scylla-server "$rsysconfdir"
|
||||
@@ -286,7 +290,7 @@ for file in dist/common/scylla.d/*.conf; do
|
||||
installconfig 644 "$file" "$retc"/scylla.d
|
||||
done
|
||||
|
||||
install -d -m755 "$retc"/scylla "$rsystemd" "$rprefix/bin" "$rprefix/libexec" "$rprefix/libreloc" "$rprefix/scripts" "$rprefix/bin"
|
||||
install -d -m755 "$retc"/scylla "$rprefix/bin" "$rprefix/libexec" "$rprefix/libreloc" "$rprefix/scripts" "$rprefix/bin"
|
||||
install -m644 dist/common/systemd/scylla-fstrim.service -Dt "$rsystemd"
|
||||
install -m644 dist/common/systemd/scylla-housekeeping-daily.service -Dt "$rsystemd"
|
||||
install -m644 dist/common/systemd/scylla-housekeeping-restart.service -Dt "$rsystemd"
|
||||
|
||||
12
main.cc
12
main.cc
@@ -716,7 +716,7 @@ int main(int ac, char** av) {
|
||||
tracing::backend_registry tracing_backend_registry;
|
||||
tracing::register_tracing_keyspace_backend(tracing_backend_registry);
|
||||
tracing::tracing::create_tracing(tracing_backend_registry, "trace_keyspace_helper").get();
|
||||
auto stop_tracing = defer_verbose_shutdown("tracing", [] {
|
||||
auto destroy_tracing = defer_verbose_shutdown("tracing instance", [] {
|
||||
tracing::tracing::tracing_instance().stop().get();
|
||||
});
|
||||
supervisor::notify("creating snitch");
|
||||
@@ -1172,13 +1172,9 @@ int main(int ac, char** av) {
|
||||
|
||||
supervisor::notify("starting tracing");
|
||||
tracing::tracing::start_tracing(qp).get();
|
||||
/*
|
||||
* FIXME -- tracing is stopped inside drain_on_shutdown, which
|
||||
* is deferred later on. If the start aborts before it, the
|
||||
* tracing will remain started and will continue referencing
|
||||
* the query processor. Nowadays the latter is not stopped
|
||||
* either, but when it will, this place shold be fixed too.
|
||||
*/
|
||||
auto stop_tracing = defer_verbose_shutdown("tracing", [] {
|
||||
tracing::tracing::stop_tracing().get();
|
||||
});
|
||||
|
||||
startlog.info("SSTable data integrity checker is {}.",
|
||||
cfg->enable_sstable_data_integrity_check() ? "enabled" : "disabled");
|
||||
|
||||
@@ -2275,9 +2275,9 @@ position_reader_queue::~position_reader_queue() {}
|
||||
// are not implemented and throw an error; the reader is only used for single partition queries.
|
||||
//
|
||||
// Assumes that:
|
||||
// - the queue contains at least one reader,
|
||||
// - there are no static rows,
|
||||
// - the returned fragments do not contain partition tombstones.
|
||||
// - the returned fragments do not contain partition tombstones,
|
||||
// - the merged readers return fragments from the same partition (but some or even all of them may be empty).
|
||||
class clustering_order_reader_merger {
|
||||
const schema_ptr _schema;
|
||||
const reader_permit _permit;
|
||||
@@ -2389,12 +2389,17 @@ class clustering_order_reader_merger {
|
||||
if (!mf) {
|
||||
// The reader returned end-of-stream before returning end-of-partition
|
||||
// (otherwise we would have removed it in a previous peek). This means that
|
||||
// we are in forwarding mode and the reader won't return any more fragments in the current range.
|
||||
// either the reader was empty from the beginning (not even returning a `partition_start`)
|
||||
// or we are in forwarding mode and the reader won't return any more fragments in the current range.
|
||||
// If the reader's upper bound is smaller then the end of the current range then it won't
|
||||
// return any more fragments in later ranges as well (subsequent fast-forward-to ranges
|
||||
// are non-overlapping and strictly increasing), so we can remove it now.
|
||||
// Otherwise it may start returning fragments later, so we save it for the moment
|
||||
// in _halted_readers and will bring it back when we get fast-forwarded.
|
||||
// Otherwise, if it previously returned a `partition_start`, it may start returning more fragments
|
||||
// later (after we fast-forward) so we save it for the moment in _halted_readers and will bring it
|
||||
// back when we get fast-forwarded.
|
||||
// We also save the reader if it was empty from the beginning (no `partition_start`) since
|
||||
// it makes the code simpler (to check for this here we would need additional state); it is a bit wasteful
|
||||
// but completely empty readers should be rare.
|
||||
if (_cmp(it->upper_bound, _pr_end) < 0) {
|
||||
_all_readers.erase(it);
|
||||
} else {
|
||||
@@ -2524,19 +2529,6 @@ public:
|
||||
: position_in_partition_view::after_all_clustered_rows())
|
||||
, _should_emit_partition_end(fwd_sm == streamed_mutation::forwarding::no)
|
||||
{
|
||||
// The first call to `_reader_queue::pop` uses `after_all_clustered_rows`
|
||||
// so we obtain at least one reader; we will return this reader's `partition_start`
|
||||
// as the first fragment.
|
||||
auto rs = _reader_queue->pop(position_in_partition_view::after_all_clustered_rows());
|
||||
for (auto& r: rs) {
|
||||
_all_readers.push_front(std::move(r));
|
||||
_unpeeked_readers.push_back(_all_readers.begin());
|
||||
}
|
||||
|
||||
if (rs.empty()) {
|
||||
// No readers, no partition.
|
||||
_should_emit_partition_end = false;
|
||||
}
|
||||
}
|
||||
|
||||
// We assume that operator() is called sequentially and that the caller doesn't use the batch
|
||||
@@ -2553,8 +2545,22 @@ public:
|
||||
return peek_readers(timeout).then([this, timeout] { return (*this)(timeout); });
|
||||
}
|
||||
|
||||
auto next_peeked_pos = _peeked_readers.empty() ? _pr_end : _peeked_readers.front()->reader.peek_buffer().position();
|
||||
// There might be queued readers containing fragments with positions <= next_peeked_pos:
|
||||
// Before we return a batch of fragments using currently opened readers we must check the queue
|
||||
// for potential new readers that must be opened. There are three cases which determine how ``far''
|
||||
// should we look:
|
||||
// - If there are some peeked readers in the heap, we must check for new readers
|
||||
// whose `min_position`s are <= the position of the first peeked reader; there is no need
|
||||
// to check for ``later'' readers (yet).
|
||||
// - Otherwise, if we already fetched a partition start fragment, we need to look no further
|
||||
// than the end of the current position range (_pr_end).
|
||||
// - Otherwise we need to look for any reader (by calling the queue with `after_all_clustered_rows`),
|
||||
// even for readers whose `min_position`s may be outside the current position range since they
|
||||
// may be the only readers which have a `partition_start` fragment which we need to return
|
||||
// before end-of-stream.
|
||||
auto next_peeked_pos =
|
||||
_peeked_readers.empty()
|
||||
? (_partition_start_fetched ? _pr_end : position_in_partition_view::after_all_clustered_rows())
|
||||
: _peeked_readers.front()->reader.peek_buffer().position();
|
||||
if (!_reader_queue->empty(next_peeked_pos)) {
|
||||
auto rs = _reader_queue->pop(next_peeked_pos);
|
||||
for (auto& r: rs) {
|
||||
@@ -2568,8 +2574,11 @@ public:
|
||||
// We are either in forwarding mode and waiting for a fast-forward,
|
||||
// or we've exhausted all the readers.
|
||||
if (_should_emit_partition_end) {
|
||||
// Not forwarding, so all readers must be exhausted. Return the last fragment.
|
||||
_current_batch.push_back(mutation_fragment(*_schema, _permit, partition_end()));
|
||||
// Not forwarding, so all readers must be exhausted.
|
||||
// Return a partition end fragment unless all readers have been empty from the beginning.
|
||||
if (_partition_start_fetched) {
|
||||
_current_batch.push_back(mutation_fragment(*_schema, _permit, partition_end()));
|
||||
}
|
||||
_should_emit_partition_end = false;
|
||||
}
|
||||
return make_ready_future<mutation_fragment_batch>(_current_batch);
|
||||
|
||||
@@ -142,6 +142,7 @@ class read_context final : public enable_lw_shared_from_this<read_context> {
|
||||
mutation_source_opt _underlying_snapshot;
|
||||
dht::partition_range _sm_range;
|
||||
std::optional<dht::decorated_key> _key;
|
||||
bool _partition_exists;
|
||||
row_cache::phase_type _phase;
|
||||
public:
|
||||
read_context(row_cache& cache,
|
||||
@@ -190,22 +191,34 @@ public:
|
||||
autoupdating_underlying_reader& underlying() { return _underlying; }
|
||||
row_cache::phase_type phase() const { return _phase; }
|
||||
const dht::decorated_key& key() const { return *_key; }
|
||||
bool partition_exists() const { return _partition_exists; }
|
||||
void on_underlying_created() { ++_underlying_created; }
|
||||
bool digest_requested() const { return _slice.options.contains<query::partition_slice::option::with_digest>(); }
|
||||
public:
|
||||
future<> ensure_underlying(db::timeout_clock::time_point timeout) {
|
||||
if (_underlying_snapshot) {
|
||||
return create_underlying(true, timeout);
|
||||
return create_underlying(timeout).then([this, timeout] {
|
||||
return _underlying.underlying()(timeout).then([this] (mutation_fragment_opt&& mfopt) {
|
||||
_partition_exists = bool(mfopt);
|
||||
});
|
||||
});
|
||||
}
|
||||
// We know that partition exists because all the callers of
|
||||
// enter_partition(const dht::decorated_key&, row_cache::phase_type)
|
||||
// check that and there's no other way of setting _underlying_snapshot
|
||||
// to empty. Except for calling create_underlying.
|
||||
_partition_exists = true;
|
||||
return make_ready_future<>();
|
||||
}
|
||||
public:
|
||||
future<> create_underlying(bool skip_first_fragment, db::timeout_clock::time_point timeout);
|
||||
future<> create_underlying(db::timeout_clock::time_point timeout);
|
||||
void enter_partition(const dht::decorated_key& dk, mutation_source& snapshot, row_cache::phase_type phase) {
|
||||
_phase = phase;
|
||||
_underlying_snapshot = snapshot;
|
||||
_key = dk;
|
||||
}
|
||||
// Precondition: each caller needs to make sure that partition with |dk| key
|
||||
// exists in underlying before calling this function.
|
||||
void enter_partition(const dht::decorated_key& dk, row_cache::phase_type phase) {
|
||||
_phase = phase;
|
||||
_underlying_snapshot = {};
|
||||
|
||||
@@ -77,7 +77,7 @@ class reader_permit::impl : public boost::intrusive::list_base_hook<boost::intru
|
||||
sstring _op_name;
|
||||
std::string_view _op_name_view;
|
||||
reader_resources _resources;
|
||||
reader_permit::state _state = reader_permit::state::registered;
|
||||
reader_permit::state _state = reader_permit::state::active;
|
||||
|
||||
public:
|
||||
struct value_tag {};
|
||||
@@ -126,40 +126,25 @@ public:
|
||||
}
|
||||
|
||||
void on_admission() {
|
||||
_state = reader_permit::state::admitted;
|
||||
_semaphore.consume(_resources);
|
||||
_state = reader_permit::state::active;
|
||||
}
|
||||
|
||||
void on_register_as_inactive() {
|
||||
if (_state != reader_permit::state::admitted) {
|
||||
_state = reader_permit::state::inactive;
|
||||
_semaphore.consume(_resources);
|
||||
}
|
||||
_state = reader_permit::state::inactive;
|
||||
}
|
||||
|
||||
void on_unregister_as_inactive() {
|
||||
if (_state == reader_permit::state::inactive) {
|
||||
_state = reader_permit::state::registered;
|
||||
_semaphore.signal(_resources);
|
||||
}
|
||||
}
|
||||
|
||||
bool should_forward_cost() const {
|
||||
return _state == reader_permit::state::admitted || _state == reader_permit::state::inactive;
|
||||
_state = reader_permit::state::active;
|
||||
}
|
||||
|
||||
void consume(reader_resources res) {
|
||||
_resources += res;
|
||||
if (should_forward_cost()) {
|
||||
_semaphore.consume(res);
|
||||
}
|
||||
_semaphore.consume(res);
|
||||
}
|
||||
|
||||
void signal(reader_resources res) {
|
||||
_resources -= res;
|
||||
if (should_forward_cost()) {
|
||||
_semaphore.signal(res);
|
||||
}
|
||||
_semaphore.signal(res);
|
||||
}
|
||||
|
||||
reader_resources resources() const {
|
||||
@@ -226,14 +211,11 @@ reader_resources reader_permit::consumed_resources() const {
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, reader_permit::state s) {
|
||||
switch (s) {
|
||||
case reader_permit::state::registered:
|
||||
os << "registered";
|
||||
break;
|
||||
case reader_permit::state::waiting:
|
||||
os << "waiting";
|
||||
break;
|
||||
case reader_permit::state::admitted:
|
||||
os << "admitted";
|
||||
case reader_permit::state::active:
|
||||
os << "active";
|
||||
break;
|
||||
case reader_permit::state::inactive:
|
||||
os << "inactive";
|
||||
@@ -273,7 +255,7 @@ struct permit_group_key_hash {
|
||||
|
||||
using permit_groups = std::unordered_map<permit_group_key, permit_stats, permit_group_key_hash>;
|
||||
|
||||
static permit_stats do_dump_reader_permit_diagnostics(std::ostream& os, const permit_groups& permits, reader_permit::state state, bool sort_by_memory) {
|
||||
static permit_stats do_dump_reader_permit_diagnostics(std::ostream& os, const permit_groups& permits, reader_permit::state state) {
|
||||
struct permit_summary {
|
||||
const schema* s;
|
||||
std::string_view op_name;
|
||||
@@ -289,25 +271,17 @@ static permit_stats do_dump_reader_permit_diagnostics(std::ostream& os, const pe
|
||||
}
|
||||
}
|
||||
|
||||
std::ranges::sort(permit_summaries, [sort_by_memory] (const permit_summary& a, const permit_summary& b) {
|
||||
if (sort_by_memory) {
|
||||
return a.memory < b.memory;
|
||||
} else {
|
||||
return a.count < b.count;
|
||||
}
|
||||
std::ranges::sort(permit_summaries, [] (const permit_summary& a, const permit_summary& b) {
|
||||
return a.memory < b.memory;
|
||||
});
|
||||
|
||||
permit_stats total;
|
||||
|
||||
auto print_line = [&os, sort_by_memory] (auto col1, auto col2, auto col3) {
|
||||
if (sort_by_memory) {
|
||||
fmt::print(os, "{}\t{}\t{}\n", col2, col1, col3);
|
||||
} else {
|
||||
fmt::print(os, "{}\t{}\t{}\n", col1, col2, col3);
|
||||
}
|
||||
auto print_line = [&os] (auto col1, auto col2, auto col3) {
|
||||
fmt::print(os, "{}\t{}\t{}\n", col2, col1, col3);
|
||||
};
|
||||
|
||||
fmt::print(os, "Permits with state {}, sorted by {}\n", state, sort_by_memory ? "memory" : "count");
|
||||
fmt::print(os, "Permits with state {}\n", state);
|
||||
print_line("count", "memory", "name");
|
||||
for (const auto& summary : permit_summaries) {
|
||||
total.count += summary.count;
|
||||
@@ -333,13 +307,11 @@ static void do_dump_reader_permit_diagnostics(std::ostream& os, const reader_con
|
||||
permit_stats total;
|
||||
|
||||
fmt::print(os, "Semaphore {}: {}, dumping permit diagnostics:\n", semaphore.name(), problem);
|
||||
total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::admitted, true);
|
||||
total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::active);
|
||||
fmt::print(os, "\n");
|
||||
total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::inactive, false);
|
||||
total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::inactive);
|
||||
fmt::print(os, "\n");
|
||||
total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::waiting, false);
|
||||
fmt::print(os, "\n");
|
||||
total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::registered, false);
|
||||
total += do_dump_reader_permit_diagnostics(os, permits, reader_permit::state::waiting);
|
||||
fmt::print(os, "\n");
|
||||
fmt::print(os, "Total: permits: {}, memory: {}\n", total.count, utils::to_hr_size(total.memory));
|
||||
}
|
||||
@@ -417,11 +389,9 @@ reader_concurrency_semaphore::inactive_read_handle reader_concurrency_semaphore:
|
||||
auto& permit_impl = *reader.permit()._impl;
|
||||
// Implies _inactive_reads.empty(), we don't queue new readers before
|
||||
// evicting all inactive reads.
|
||||
// FIXME: #4758, workaround for keeping tabs on un-admitted reads that are
|
||||
// still registered as inactive. Without the below check, these can
|
||||
// accumulate without limit. The real fix is #4758 -- that is to make all
|
||||
// reads pass admission before getting started.
|
||||
if (_wait_list.empty() && (permit_impl.get_state() == reader_permit::state::admitted || _resources >= permit_impl.resources())) {
|
||||
// Checking the _wait_list covers the count resources only, so check memory
|
||||
// separately.
|
||||
if (_wait_list.empty() && _resources.memory > 0) {
|
||||
try {
|
||||
auto irp = std::make_unique<inactive_read>(std::move(reader));
|
||||
auto& ir = *irp;
|
||||
@@ -514,13 +484,13 @@ void reader_concurrency_semaphore::evict(inactive_read& ir, evict_reason reason)
|
||||
}
|
||||
|
||||
bool reader_concurrency_semaphore::has_available_units(const resources& r) const {
|
||||
return bool(_resources) && _resources >= r;
|
||||
// Special case: when there is no active reader (based on count) admit one
|
||||
// regardless of availability of memory.
|
||||
return (bool(_resources) && _resources >= r) || _resources.count == _initial_resources.count;
|
||||
}
|
||||
|
||||
bool reader_concurrency_semaphore::may_proceed(const resources& r) const {
|
||||
// Special case: when there is no active reader (based on count) admit one
|
||||
// regardless of availability of memory.
|
||||
return _wait_list.empty() && (has_available_units(r) || _resources.count == _initial_resources.count);
|
||||
return _wait_list.empty() && has_available_units(r);
|
||||
}
|
||||
|
||||
future<reader_permit::resource_units> reader_concurrency_semaphore::do_wait_admission(reader_permit permit, size_t memory,
|
||||
@@ -567,6 +537,12 @@ void reader_concurrency_semaphore::broken(std::exception_ptr ex) {
|
||||
}
|
||||
}
|
||||
|
||||
std::string reader_concurrency_semaphore::dump_diagnostics() const {
|
||||
std::ostringstream os;
|
||||
do_dump_reader_permit_diagnostics(os, *this, *_permit_list, "user request");
|
||||
return os.str();
|
||||
}
|
||||
|
||||
// A file that tracks the memory usage of buffers resulting from read
|
||||
// operations.
|
||||
class tracking_file_impl : public file_impl {
|
||||
|
||||
@@ -293,4 +293,6 @@ public:
|
||||
}
|
||||
|
||||
void broken(std::exception_ptr ex);
|
||||
|
||||
std::string dump_diagnostics() const;
|
||||
};
|
||||
|
||||
@@ -91,10 +91,9 @@ public:
|
||||
class resource_units;
|
||||
|
||||
enum class state {
|
||||
registered, // read is registered, but didn't attempt admission yet
|
||||
waiting, // waiting for admission
|
||||
admitted,
|
||||
inactive, // un-admitted reads that are registered as inactive
|
||||
active,
|
||||
inactive,
|
||||
};
|
||||
|
||||
class impl;
|
||||
|
||||
13
row_cache.cc
13
row_cache.cc
@@ -332,7 +332,7 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
future<> read_context::create_underlying(bool skip_first_fragment, db::timeout_clock::time_point timeout) {
|
||||
future<> read_context::create_underlying(db::timeout_clock::time_point timeout) {
|
||||
if (_range_query) {
|
||||
// FIXME: Singular-range mutation readers don't support fast_forward_to(), so need to use a wide range
|
||||
// here in case the same reader will need to be fast forwarded later.
|
||||
@@ -340,13 +340,8 @@ future<> read_context::create_underlying(bool skip_first_fragment, db::timeout_c
|
||||
} else {
|
||||
_sm_range = dht::partition_range::make_singular({dht::ring_position(*_key)});
|
||||
}
|
||||
return _underlying.fast_forward_to(std::move(_sm_range), *_underlying_snapshot, _phase, timeout).then([this, skip_first_fragment, timeout] {
|
||||
return _underlying.fast_forward_to(std::move(_sm_range), *_underlying_snapshot, _phase, timeout).then([this] {
|
||||
_underlying_snapshot = {};
|
||||
if (skip_first_fragment) {
|
||||
return _underlying.underlying()(timeout).then([](auto &&mf) {});
|
||||
} else {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -366,7 +361,7 @@ private:
|
||||
auto src_and_phase = _cache.snapshot_of(_read_context->range().start()->value());
|
||||
auto phase = src_and_phase.phase;
|
||||
_read_context->enter_partition(_read_context->range().start()->value().as_decorated_key(), src_and_phase.snapshot, phase);
|
||||
return _read_context->create_underlying(false, timeout).then([this, phase, timeout] {
|
||||
return _read_context->create_underlying(timeout).then([this, phase, timeout] {
|
||||
return _read_context->underlying().underlying()(timeout).then([this, phase] (auto&& mfopt) {
|
||||
if (!mfopt) {
|
||||
if (phase == _cache.phase_of(_read_context->range().start()->value())) {
|
||||
@@ -728,7 +723,7 @@ row_cache::make_reader(schema_ptr s,
|
||||
auto&& pos = ctx->range().start()->value();
|
||||
partitions_type::bound_hint hint;
|
||||
auto i = _partitions.lower_bound(pos, cmp, hint);
|
||||
if (i != _partitions.end() && hint.match) {
|
||||
if (hint.match) {
|
||||
cache_entry& e = *i;
|
||||
upgrade_entry(e);
|
||||
on_partition_hit();
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 72e3baed9c...dab10ba6ad
@@ -53,6 +53,7 @@
|
||||
#include "database.hh"
|
||||
#include "db/schema_tables.hh"
|
||||
#include "types/user.hh"
|
||||
#include "db/schema_tables.hh"
|
||||
|
||||
namespace service {
|
||||
|
||||
@@ -1075,8 +1076,19 @@ future<schema_ptr> get_schema_definition(table_schema_version v, netw::messaging
|
||||
// referenced by the incoming request.
|
||||
// That means the column mapping for the schema should always be inserted
|
||||
// with TTL (refresh TTL in case column mapping already existed prior to that).
|
||||
return db::schema_tables::store_column_mapping(proxy, s.unfreeze(db::schema_ctxt(proxy)), true).then([s] {
|
||||
return s;
|
||||
auto us = s.unfreeze(db::schema_ctxt(proxy));
|
||||
// if this is a view - we might need to fix it's schema before registering it.
|
||||
if (us->is_view()) {
|
||||
auto& db = proxy.local().local_db();
|
||||
schema_ptr base_schema = db.find_schema(us->view_info()->base_id());
|
||||
auto fixed_view = db::schema_tables::maybe_fix_legacy_secondary_index_mv_schema(db, view_ptr(us), base_schema,
|
||||
db::schema_tables::preserve_version::yes);
|
||||
if (fixed_view) {
|
||||
us = fixed_view;
|
||||
}
|
||||
}
|
||||
return db::schema_tables::store_column_mapping(proxy, us, true).then([us] {
|
||||
return frozen_schema{us};
|
||||
});
|
||||
});
|
||||
}).then([] (schema_ptr s) {
|
||||
@@ -1084,7 +1096,7 @@ future<schema_ptr> get_schema_definition(table_schema_version v, netw::messaging
|
||||
// table.
|
||||
if (s->is_view()) {
|
||||
if (!s->view_info()->base_info()) {
|
||||
auto& db = service::get_local_storage_proxy().get_db().local();
|
||||
auto& db = service::get_local_storage_proxy().local_db();
|
||||
// This line might throw a no_such_column_family
|
||||
// It should be fine since if we tried to register a view for which
|
||||
// we don't know the base table, our registry is broken.
|
||||
|
||||
@@ -3643,7 +3643,12 @@ protected:
|
||||
}
|
||||
|
||||
public:
|
||||
virtual future<foreign_ptr<lw_shared_ptr<query::result>>> execute(storage_proxy::clock_type::time_point timeout) {
|
||||
future<foreign_ptr<lw_shared_ptr<query::result>>> execute(storage_proxy::clock_type::time_point timeout) {
|
||||
if (_targets.empty()) {
|
||||
// We may have no targets to read from if a DC with zero replication is queried with LOCACL_QUORUM.
|
||||
// Return an empty result in this case
|
||||
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>>(make_foreign(make_lw_shared(query::result())));
|
||||
}
|
||||
digest_resolver_ptr digest_resolver = ::make_shared<digest_read_resolver>(_schema, _cl, _block_for,
|
||||
db::is_datacenter_local(_cl) ? db::count_local_endpoints(_targets): _targets.size(), timeout);
|
||||
auto exec = shared_from_this();
|
||||
|
||||
@@ -314,6 +314,7 @@ future<> compaction_manager::run_custom_job(column_family* cf, sstring name, non
|
||||
cmlog.info("{} was abruptly stopped, reason: {}", name, e.what());
|
||||
} catch (...) {
|
||||
cmlog.error("{} failed: {}", name, std::current_exception());
|
||||
throw;
|
||||
}
|
||||
});
|
||||
return task->compaction_done.get_future().then([task] {});
|
||||
|
||||
@@ -424,6 +424,11 @@ std::unique_ptr<incremental_selector_impl> time_series_sstable_set::make_increme
|
||||
// exactly once after all sstables are iterated over.
|
||||
//
|
||||
// The readers are created lazily on-demand using the supplied factory function.
|
||||
//
|
||||
// Additionally to the sstable readers, the queue always returns one ``dummy reader''
|
||||
// that contains only the partition_start/end markers. This dummy reader is always
|
||||
// returned as the first on the first `pop(b)` call for any `b`. Its upper bound
|
||||
// is `before_all_clustered_rows`.
|
||||
class min_position_reader_queue : public position_reader_queue {
|
||||
using container_t = time_series_sstable_set::container_t;
|
||||
using value_t = container_t::value_type;
|
||||
@@ -441,6 +446,11 @@ class min_position_reader_queue : public position_reader_queue {
|
||||
std::function<flat_mutation_reader(sstable&)> _create_reader;
|
||||
std::function<bool(const sstable&)> _filter;
|
||||
|
||||
// After construction contains a reader which returns only the partition
|
||||
// start (and end, if not in forwarding mode) markers. This is the first
|
||||
// returned reader.
|
||||
std::optional<flat_mutation_reader> _dummy_reader;
|
||||
|
||||
flat_mutation_reader create_reader(sstable& sst) {
|
||||
return _create_reader(sst);
|
||||
}
|
||||
@@ -450,10 +460,14 @@ class min_position_reader_queue : public position_reader_queue {
|
||||
}
|
||||
|
||||
public:
|
||||
// Assumes that `create_reader` returns readers that emit only fragments from partition `pk`.
|
||||
min_position_reader_queue(schema_ptr schema,
|
||||
lw_shared_ptr<const time_series_sstable_set::container_t> sstables,
|
||||
std::function<flat_mutation_reader(sstable&)> create_reader,
|
||||
std::function<bool(const sstable&)> filter)
|
||||
std::function<bool(const sstable&)> filter,
|
||||
partition_key pk,
|
||||
reader_permit permit,
|
||||
streamed_mutation::forwarding fwd_sm)
|
||||
: _schema(std::move(schema))
|
||||
, _sstables(std::move(sstables))
|
||||
, _it(_sstables->begin())
|
||||
@@ -461,6 +475,8 @@ public:
|
||||
, _cmp(*_schema)
|
||||
, _create_reader(std::move(create_reader))
|
||||
, _filter(std::move(filter))
|
||||
, _dummy_reader(flat_mutation_reader_from_mutations(
|
||||
std::move(permit), {mutation(_schema, std::move(pk))}, _schema->full_slice(), fwd_sm))
|
||||
{
|
||||
while (_it != _end && !this->filter(*_it->second)) {
|
||||
++_it;
|
||||
@@ -469,7 +485,8 @@ public:
|
||||
|
||||
virtual ~min_position_reader_queue() override = default;
|
||||
|
||||
// Open sstable readers to all sstables with smallest min_position() from the set
|
||||
// If the dummy reader was not yet returned, return the dummy reader.
|
||||
// Otherwise, open sstable readers to all sstables with smallest min_position() from the set
|
||||
// {S: filter(S) and prev_min_pos < S.min_position() <= bound}, where `prev_min_pos` is the min_position()
|
||||
// of the sstables returned from last non-empty pop() or -infinity if no sstables were previously returned,
|
||||
// and `filter` is the filtering function provided when creating the queue.
|
||||
@@ -483,6 +500,12 @@ public:
|
||||
return {};
|
||||
}
|
||||
|
||||
if (_dummy_reader) {
|
||||
std::vector<reader_and_upper_bound> ret;
|
||||
ret.emplace_back(*std::exchange(_dummy_reader, std::nullopt), position_in_partition::before_all_clustered_rows());
|
||||
return ret;
|
||||
}
|
||||
|
||||
// by !empty(bound) and `_it` invariant:
|
||||
// _it != _end, _it->first <= bound, and filter(*_it->second) == true
|
||||
assert(_cmp(_it->first, bound) <= 0);
|
||||
@@ -511,17 +534,22 @@ public:
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Is the set of sstables {S: filter(S) and prev_min_pos < S.min_position() <= bound} empty?
|
||||
// (see pop() for definition of `prev_min_pos`)
|
||||
// If the dummy reader was not returned yet, returns false.
|
||||
// Otherwise checks if the set of sstables {S: filter(S) and prev_min_pos < S.min_position() <= bound}
|
||||
// is empty (see pop() for definition of `prev_min_pos`).
|
||||
virtual bool empty(position_in_partition_view bound) const override {
|
||||
return _it == _end || _cmp(_it->first, bound) > 0;
|
||||
return !_dummy_reader && (_it == _end || _cmp(_it->first, bound) > 0);
|
||||
}
|
||||
};
|
||||
|
||||
std::unique_ptr<position_reader_queue> time_series_sstable_set::make_min_position_reader_queue(
|
||||
std::function<flat_mutation_reader(sstable&)> create_reader,
|
||||
std::function<bool(const sstable&)> filter) const {
|
||||
return std::make_unique<min_position_reader_queue>(_schema, _sstables, std::move(create_reader), std::move(filter));
|
||||
std::function<bool(const sstable&)> filter,
|
||||
partition_key pk, schema_ptr schema, reader_permit permit,
|
||||
streamed_mutation::forwarding fwd_sm) const {
|
||||
return std::make_unique<min_position_reader_queue>(
|
||||
std::move(schema), _sstables, std::move(create_reader), std::move(filter),
|
||||
std::move(pk), std::move(permit), fwd_sm);
|
||||
}
|
||||
|
||||
std::unique_ptr<incremental_selector_impl> partitioned_sstable_set::make_incremental_selector() const {
|
||||
@@ -787,30 +815,19 @@ time_series_sstable_set::create_single_key_sstable_reader(
|
||||
auto& stats = *cf->cf_stats();
|
||||
stats.clustering_filter_count++;
|
||||
|
||||
auto ck_filter = [ranges = slice.get_all_ranges()] (const sstable& sst) { return sst.may_contain_rows(ranges); };
|
||||
{
|
||||
auto next = std::find_if(it, _sstables->end(), [&] (const sst_entry& e) { return ck_filter(*e.second); });
|
||||
stats.sstables_checked_by_clustering_filter += std::distance(it, next);
|
||||
it = next;
|
||||
}
|
||||
if (it == _sstables->end()) {
|
||||
// Some sstables passed the partition key filter, but none passed the clustering key filter.
|
||||
// However, we still have to emit a partition (even though it will be empty) so we don't fool the cache
|
||||
// into thinking this partition doesn't exist in any sstable (#3552).
|
||||
return flat_mutation_reader_from_mutations(std::move(permit), {mutation(schema, *pos.key())}, slice, fwd_sm);
|
||||
}
|
||||
|
||||
auto create_reader = [schema, permit, &pr, &slice, &pc, trace_state, fwd_sm] (sstable& sst) {
|
||||
return sst.make_reader(schema, permit, pr, slice, pc, trace_state, fwd_sm);
|
||||
};
|
||||
|
||||
auto ck_filter = [ranges = slice.get_all_ranges()] (const sstable& sst) { return sst.may_contain_rows(ranges); };
|
||||
|
||||
// We're going to pass this filter into min_position_reader_queue. The queue guarantees that
|
||||
// the filter is going to be called at most once for each sstable and exactly once after
|
||||
// the queue is exhausted. We use that fact to gather statistics.
|
||||
auto filter = [pk_filter = std::move(pk_filter), ck_filter = std::move(ck_filter), &stats]
|
||||
(const sstable& sst) {
|
||||
if (pk_filter(sst)) {
|
||||
return true;
|
||||
if (!pk_filter(sst)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
++stats.sstables_checked_by_clustering_filter;
|
||||
@@ -822,9 +839,12 @@ time_series_sstable_set::create_single_key_sstable_reader(
|
||||
return false;
|
||||
};
|
||||
|
||||
// Note that `min_position_reader_queue` always includes a reader which emits a `partition_start` fragment,
|
||||
// guaranteeing that the reader we return emits it as well; this helps us avoid the problem from #3552.
|
||||
return make_clustering_combined_reader(
|
||||
std::move(schema), std::move(permit), fwd_sm,
|
||||
make_min_position_reader_queue(std::move(create_reader), std::move(filter)));
|
||||
schema, permit, fwd_sm,
|
||||
make_min_position_reader_queue(
|
||||
std::move(create_reader), std::move(filter), *pos.key(), schema, permit, fwd_sm));
|
||||
}
|
||||
|
||||
compound_sstable_set::compound_sstable_set(schema_ptr schema, std::vector<lw_shared_ptr<sstable_set>> sets)
|
||||
@@ -954,6 +974,40 @@ sstable_set make_compound_sstable_set(schema_ptr schema, std::vector<lw_shared_p
|
||||
return sstable_set(std::make_unique<compound_sstable_set>(schema, std::move(sets)), schema);
|
||||
}
|
||||
|
||||
flat_mutation_reader
|
||||
compound_sstable_set::create_single_key_sstable_reader(
|
||||
column_family* cf,
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
utils::estimated_histogram& sstable_histogram,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) const {
|
||||
auto sets = _sets;
|
||||
auto it = std::partition(sets.begin(), sets.end(), [] (const auto& set) { return set->all()->size() > 0; });
|
||||
auto non_empty_set_count = std::distance(sets.begin(), it);
|
||||
|
||||
if (!non_empty_set_count) {
|
||||
return make_empty_flat_reader(schema, permit);
|
||||
}
|
||||
// optimize for common case where only 1 set is populated, avoiding the expensive combined reader
|
||||
if (non_empty_set_count == 1) {
|
||||
const auto& non_empty_set = *std::begin(sets);
|
||||
return non_empty_set->create_single_key_sstable_reader(cf, std::move(schema), std::move(permit), sstable_histogram, pr, slice, pc, trace_state, fwd, fwd_mr);
|
||||
}
|
||||
|
||||
auto readers = boost::copy_range<std::vector<flat_mutation_reader>>(
|
||||
boost::make_iterator_range(sets.begin(), it)
|
||||
| boost::adaptors::transformed([&] (const lw_shared_ptr<sstable_set>& non_empty_set) {
|
||||
return non_empty_set->create_single_key_sstable_reader(cf, schema, permit, sstable_histogram, pr, slice, pc, trace_state, fwd, fwd_mr);
|
||||
})
|
||||
);
|
||||
return make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr);
|
||||
}
|
||||
|
||||
flat_mutation_reader
|
||||
sstable_set::create_single_key_sstable_reader(
|
||||
column_family* cf,
|
||||
|
||||
@@ -136,7 +136,9 @@ public:
|
||||
|
||||
std::unique_ptr<position_reader_queue> make_min_position_reader_queue(
|
||||
std::function<flat_mutation_reader(sstable&)> create_reader,
|
||||
std::function<bool(const sstable&)> filter) const;
|
||||
std::function<bool(const sstable&)> filter,
|
||||
partition_key pk, schema_ptr schema, reader_permit permit,
|
||||
streamed_mutation::forwarding fwd_sm) const;
|
||||
|
||||
virtual flat_mutation_reader create_single_key_sstable_reader(
|
||||
column_family*,
|
||||
@@ -167,6 +169,19 @@ public:
|
||||
virtual void insert(shared_sstable sst) override;
|
||||
virtual void erase(shared_sstable sst) override;
|
||||
virtual std::unique_ptr<incremental_selector_impl> make_incremental_selector() const override;
|
||||
|
||||
virtual flat_mutation_reader create_single_key_sstable_reader(
|
||||
column_family*,
|
||||
schema_ptr,
|
||||
reader_permit,
|
||||
utils::estimated_histogram&,
|
||||
const dht::partition_range&,
|
||||
const query::partition_slice&,
|
||||
const io_priority_class&,
|
||||
tracing::trace_state_ptr,
|
||||
streamed_mutation::forwarding,
|
||||
mutation_reader::forwarding) const override;
|
||||
|
||||
class incremental_selector;
|
||||
};
|
||||
|
||||
|
||||
@@ -101,7 +101,8 @@ class time_window_compaction_strategy : public compaction_strategy_impl {
|
||||
time_window_compaction_strategy_options _options;
|
||||
int64_t _estimated_remaining_tasks = 0;
|
||||
db_clock::time_point _last_expired_check;
|
||||
timestamp_type _highest_window_seen;
|
||||
// As timestamp_type is an int64_t, a primitive type, it must be initialized here.
|
||||
timestamp_type _highest_window_seen = 0;
|
||||
// Keep track of all recent active windows that still need to be compacted into a single SSTable
|
||||
std::unordered_set<timestamp_type> _recent_active_windows;
|
||||
size_tiered_compaction_strategy_options _stcs_options;
|
||||
|
||||
@@ -28,6 +28,7 @@
|
||||
#include <unordered_map>
|
||||
#include <unordered_set>
|
||||
#include <set>
|
||||
#include <deque>
|
||||
|
||||
#include <seastar/testing/test_case.hh>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
@@ -748,3 +749,75 @@ SEASTAR_TEST_CASE(test_commitlog_new_segment_odsync){
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Test for #8363
|
||||
// try to provoke edge case where we race segment deletion
|
||||
// and waiting for recycled to be replenished.
|
||||
SEASTAR_TEST_CASE(test_commitlog_deadlock_in_recycle) {
|
||||
commitlog::config cfg;
|
||||
|
||||
constexpr auto max_size_mb = 2;
|
||||
cfg.commitlog_segment_size_in_mb = max_size_mb;
|
||||
// ensure total size per shard is not multiple of segment size.
|
||||
cfg.commitlog_total_space_in_mb = 5 * smp::count;
|
||||
cfg.commitlog_sync_period_in_ms = 10;
|
||||
cfg.reuse_segments = true;
|
||||
cfg.allow_going_over_size_limit = false;
|
||||
cfg.use_o_dsync = true; // make sure we pre-allocate.
|
||||
|
||||
// not using cl_test, because we need to be able to abandon
|
||||
// the log.
|
||||
|
||||
tmpdir tmp;
|
||||
cfg.commit_log_location = tmp.path().string();
|
||||
auto log = co_await commitlog::create_commitlog(cfg);
|
||||
|
||||
rp_set rps;
|
||||
std::deque<rp_set> queue;
|
||||
size_t n = 0;
|
||||
|
||||
// uncomment for verbosity
|
||||
// logging::logger_registry().set_logger_level("commitlog", logging::log_level::debug);
|
||||
|
||||
auto uuid = utils::UUID_gen::get_time_UUID();
|
||||
auto size = log.max_record_size() / 2;
|
||||
|
||||
timer<> t;
|
||||
t.set_callback([&] {
|
||||
while (!queue.empty()) {
|
||||
auto flush = std::move(queue.front());
|
||||
queue.pop_front();
|
||||
log.discard_completed_segments(uuid, flush);
|
||||
++n;
|
||||
};
|
||||
});
|
||||
|
||||
// add a flush handler that delays releasing things until disk threshold is reached.
|
||||
auto r = log.add_flush_handler([&](cf_id_type, replay_position pos) {
|
||||
auto old = std::exchange(rps, rp_set{});
|
||||
queue.emplace_back(std::move(old));
|
||||
if (log.disk_footprint() >= log.disk_limit() && !t.armed()) {
|
||||
t.arm(5s);
|
||||
}
|
||||
});
|
||||
|
||||
bool release = true;
|
||||
|
||||
try {
|
||||
while (n < 10) {
|
||||
auto now = timeout_clock::now();
|
||||
rp_handle h = co_await with_timeout(now + 30s, log.add_mutation(uuid, size, db::commitlog::force_sync::no, [&](db::commitlog::output& dst) {
|
||||
dst.fill('1', size);
|
||||
}));
|
||||
rps.put(std::move(h));
|
||||
}
|
||||
} catch (timed_out_error&) {
|
||||
BOOST_FAIL("log write timed out. maybe it is deadlocked... Will not free log. ASAN errors and leaks will follow...");
|
||||
release = false;
|
||||
}
|
||||
|
||||
if (release) {
|
||||
co_await log.shutdown();
|
||||
co_await log.clear();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -974,14 +974,7 @@ SEASTAR_THREAD_TEST_CASE(fuzzy_test) {
|
||||
|
||||
const auto& partitions = pop_desc.partitions;
|
||||
smp::invoke_on_all([cfg, db = &env.db(), gs = global_schema_ptr(pop_desc.schema), &partitions] {
|
||||
auto s = gs.get();
|
||||
auto& sem = db->local().get_reader_concurrency_semaphore();
|
||||
|
||||
auto resources = sem.available_resources();
|
||||
resources -= reader_concurrency_semaphore::resources{1, 0};
|
||||
auto permit = sem.make_permit(s.get(), "fuzzy-test");
|
||||
|
||||
return run_fuzzy_test_workload(cfg, *db, std::move(s), partitions).finally([units = permit.consume_resources(resources)] {});
|
||||
return run_fuzzy_test_workload(cfg, *db, gs.get(), partitions);
|
||||
}).handle_exception([seed] (std::exception_ptr e) {
|
||||
testlog.error("Test workload failed with exception {}."
|
||||
" To repeat this particular run, replace the random seed of the test, with that of this run ({})."
|
||||
|
||||
@@ -970,6 +970,192 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_destroyed_permit_rele
|
||||
BOOST_REQUIRE(semaphore.available_resources() == initial_resources);
|
||||
}
|
||||
|
||||
// This unit test passes a read through admission again-and-again, just
|
||||
// like an evictable reader would be during its lifetime. When readmitted
|
||||
// the read sometimes has to wait and sometimes not. This is to check that
|
||||
// the readmitting a previously admitted reader doesn't leak any units.
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves_units) {
|
||||
simple_schema s;
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{10, 1024 * 1024};
|
||||
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name());
|
||||
|
||||
auto permit = semaphore.make_permit(s.schema().get(), get_name());
|
||||
|
||||
std::optional<reader_permit::resource_units> residue_units;
|
||||
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
const auto have_residue_units = bool(residue_units);
|
||||
|
||||
auto current_resources = initial_resources;
|
||||
if (have_residue_units) {
|
||||
current_resources -= residue_units->resources();
|
||||
}
|
||||
BOOST_REQUIRE(semaphore.available_resources() == current_resources);
|
||||
|
||||
std::optional<reader_permit::resource_units> admitted_units;
|
||||
if (i % 2) {
|
||||
const auto consumed_resources = semaphore.available_resources();
|
||||
semaphore.consume(consumed_resources);
|
||||
|
||||
auto units_fut = permit.wait_admission(1024, db::no_timeout);
|
||||
BOOST_REQUIRE(!units_fut.available());
|
||||
|
||||
semaphore.signal(consumed_resources);
|
||||
admitted_units = units_fut.get();
|
||||
} else {
|
||||
admitted_units = permit.wait_admission(1024, db::no_timeout).get();
|
||||
}
|
||||
|
||||
current_resources -= admitted_units->resources();
|
||||
BOOST_REQUIRE(semaphore.available_resources() == current_resources);
|
||||
|
||||
residue_units.emplace(permit.consume_resources(reader_resources(0, 100)));
|
||||
if (!have_residue_units) {
|
||||
current_resources -= residue_units->resources();
|
||||
}
|
||||
BOOST_REQUIRE(semaphore.available_resources() == current_resources);
|
||||
|
||||
auto handle = semaphore.register_inactive_read(make_empty_flat_reader(s.schema(), permit));
|
||||
BOOST_REQUIRE(semaphore.try_evict_one_inactive_read());
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(semaphore.available_resources() == initial_resources - residue_units->resources());
|
||||
|
||||
residue_units.reset();
|
||||
|
||||
BOOST_REQUIRE(semaphore.available_resources() == initial_resources);
|
||||
}
|
||||
|
||||
// This unit test checks that the semaphore doesn't get into a deadlock
|
||||
// when contended, in the presence of many memory-only reads (that don't
|
||||
// wait for admission). This is tested by simulating the 3 kind of reads we
|
||||
// currently have in the system:
|
||||
// * memory-only: reads that don't pass admission and only own memory.
|
||||
// * admitted: reads that pass admission.
|
||||
// * evictable: admitted reads that are furthermore evictable.
|
||||
//
|
||||
// The test creates and runs a large number of these reads in parallel,
|
||||
// read kinds being selected randomly, then creates a watchdog which
|
||||
// kills the test if no progress is being made.
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_forward_progress) {
|
||||
class reader {
|
||||
class skeleton_reader : public flat_mutation_reader::impl {
|
||||
reader_permit::resource_units _base_resources;
|
||||
std::optional<reader_permit::resource_units> _resources;
|
||||
public:
|
||||
skeleton_reader(schema_ptr s, reader_permit permit, reader_permit::resource_units res)
|
||||
: impl(std::move(s), std::move(permit)), _base_resources(std::move(res)) { }
|
||||
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override {
|
||||
_resources.emplace(_permit.consume_resources(reader_resources(0, tests::random::get_int(1024, 2048))));
|
||||
return make_ready_future<>();
|
||||
}
|
||||
virtual future<> next_partition() override { return make_ready_future<>(); }
|
||||
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { return make_ready_future<>(); }
|
||||
virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point timeout) override { return make_ready_future<>(); }
|
||||
};
|
||||
struct reader_visitor {
|
||||
reader& r;
|
||||
future<> operator()(std::monostate& ms) { return r.tick(ms); }
|
||||
future<> operator()(flat_mutation_reader& reader) { return r.tick(reader); }
|
||||
future<> operator()(reader_concurrency_semaphore::inactive_read_handle& handle) { return r.tick(handle); }
|
||||
};
|
||||
|
||||
private:
|
||||
schema_ptr _schema;
|
||||
reader_permit _permit;
|
||||
bool _memory_only = true;
|
||||
bool _evictable = false;
|
||||
std::optional<reader_permit::resource_units> _units;
|
||||
std::variant<std::monostate, flat_mutation_reader, reader_concurrency_semaphore::inactive_read_handle> _reader;
|
||||
|
||||
private:
|
||||
future<> make_reader() {
|
||||
auto res = _permit.consume_memory();
|
||||
if (!_memory_only) {
|
||||
res = co_await _permit.wait_admission(1024, db::no_timeout);
|
||||
}
|
||||
_reader = make_flat_mutation_reader<skeleton_reader>(_schema, _permit, std::move(res));
|
||||
}
|
||||
future<> tick(std::monostate&) {
|
||||
co_await make_reader();
|
||||
co_await tick(std::get<flat_mutation_reader>(_reader));
|
||||
}
|
||||
future<> tick(flat_mutation_reader& reader) {
|
||||
co_await reader.fill_buffer(db::no_timeout);
|
||||
if (_evictable) {
|
||||
_reader = _permit.semaphore().register_inactive_read(std::move(reader));
|
||||
}
|
||||
}
|
||||
future<> tick(reader_concurrency_semaphore::inactive_read_handle& handle) {
|
||||
if (auto reader = _permit.semaphore().unregister_inactive_read(std::move(handle)); reader) {
|
||||
_reader = std::move(*reader);
|
||||
} else {
|
||||
co_await make_reader();
|
||||
}
|
||||
co_await tick(std::get<flat_mutation_reader>(_reader));
|
||||
}
|
||||
|
||||
public:
|
||||
reader(schema_ptr s, reader_permit permit, bool memory_only, bool evictable)
|
||||
: _schema(std::move(s))
|
||||
, _permit(std::move(permit))
|
||||
, _memory_only(memory_only)
|
||||
, _evictable(evictable)
|
||||
, _units(_permit.consume_memory(tests::random::get_int(128, 1024)))
|
||||
{
|
||||
}
|
||||
future<> tick() {
|
||||
return std::visit(reader_visitor{*this}, _reader);
|
||||
}
|
||||
};
|
||||
|
||||
const auto count = 10;
|
||||
const auto num_readers = 512;
|
||||
const auto ticks = 1000;
|
||||
|
||||
simple_schema s;
|
||||
reader_concurrency_semaphore semaphore(count, count * 1024, get_name());
|
||||
|
||||
std::list<std::optional<reader>> readers;
|
||||
unsigned nr_memory_only = 0;
|
||||
unsigned nr_admitted = 0;
|
||||
unsigned nr_evictable = 0;
|
||||
|
||||
for (auto i = 0; i < num_readers; ++i) {
|
||||
const auto memory_only = tests::random::get_bool();
|
||||
const auto evictable = !memory_only && tests::random::get_bool();
|
||||
if (memory_only) {
|
||||
++nr_memory_only;
|
||||
} else if (evictable) {
|
||||
++nr_evictable;
|
||||
} else {
|
||||
++nr_admitted;
|
||||
}
|
||||
readers.emplace_back(reader(s.schema(), semaphore.make_permit(s.schema().get(), fmt::format("reader{}", i)), memory_only, evictable));
|
||||
}
|
||||
|
||||
testlog.info("Created {} readers, memory_only={}, admitted={}, evictable={}", readers.size(), nr_memory_only, nr_admitted, nr_evictable);
|
||||
|
||||
bool watchdog_touched = false;
|
||||
auto watchdog = timer<db::timeout_clock>([&semaphore, &watchdog_touched] {
|
||||
if (!watchdog_touched) {
|
||||
testlog.error("Watchdog detected a deadlock, dumping diagnostics before killing the test: {}", semaphore.dump_diagnostics());
|
||||
semaphore.broken(std::make_exception_ptr(std::runtime_error("test killed by watchdog")));
|
||||
}
|
||||
watchdog_touched = false;
|
||||
});
|
||||
watchdog.arm_periodic(std::chrono::seconds(30));
|
||||
|
||||
parallel_for_each(readers, [&] (std::optional<reader>& r) -> future<> {
|
||||
for (auto i = 0; i < ticks; ++i) {
|
||||
watchdog_touched = true;
|
||||
co_await r->tick();
|
||||
}
|
||||
r.reset();
|
||||
watchdog_touched = true;
|
||||
}).get();
|
||||
}
|
||||
|
||||
static
|
||||
sstables::shared_sstable create_sstable(sstables::test_env& env, schema_ptr s, std::vector<mutation> mutations) {
|
||||
static thread_local auto tmp = tmpdir();
|
||||
@@ -3829,11 +4015,26 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_recreate_before_fast_forward_to)
|
||||
}
|
||||
|
||||
struct mutation_bounds {
|
||||
mutation m;
|
||||
std::optional<mutation> m;
|
||||
position_in_partition lower;
|
||||
position_in_partition upper;
|
||||
};
|
||||
|
||||
static reader_bounds make_reader_bounds(
|
||||
schema_ptr s, reader_permit permit, mutation_bounds mb, streamed_mutation::forwarding fwd,
|
||||
const query::partition_slice* slice = nullptr) {
|
||||
if (!slice) {
|
||||
slice = &s->full_slice();
|
||||
}
|
||||
|
||||
return reader_bounds {
|
||||
.r = mb.m ? flat_mutation_reader_from_mutations(permit, {std::move(*mb.m)}, *slice, fwd)
|
||||
: make_empty_flat_reader(s, permit),
|
||||
.lower = std::move(mb.lower),
|
||||
.upper = std::move(mb.upper)
|
||||
};
|
||||
}
|
||||
|
||||
struct clustering_order_merger_test_generator {
|
||||
struct scenario {
|
||||
std::vector<mutation_bounds> readers_data;
|
||||
@@ -3843,13 +4044,13 @@ struct clustering_order_merger_test_generator {
|
||||
schema_ptr _s;
|
||||
partition_key _pk;
|
||||
|
||||
clustering_order_merger_test_generator()
|
||||
: _s(make_schema()), _pk(partition_key::from_single_value(*_s, int32_type->decompose(0)))
|
||||
clustering_order_merger_test_generator(std::optional<sstring> pk = std::nullopt)
|
||||
: _s(make_schema()), _pk(partition_key::from_single_value(*_s, utf8_type->decompose(pk ? *pk : make_local_key(make_schema()))))
|
||||
{}
|
||||
|
||||
static schema_ptr make_schema() {
|
||||
return schema_builder("ks", "t")
|
||||
.with_column("pk", int32_type, column_kind::partition_key)
|
||||
.with_column("pk", utf8_type, column_kind::partition_key)
|
||||
.with_column("ck", int32_type, column_kind::clustering_key)
|
||||
.with_column("v", int32_type, column_kind::regular_column)
|
||||
.build();
|
||||
@@ -3873,15 +4074,18 @@ struct clustering_order_merger_test_generator {
|
||||
return m;
|
||||
}
|
||||
|
||||
dht::decorated_key decorated_pk() const {
|
||||
return dht::decorate_key(*_s, _pk);
|
||||
}
|
||||
|
||||
scenario generate_scenario(std::mt19937& engine) const {
|
||||
std::set<int> all_ks;
|
||||
std::vector<mutation_bounds> readers_data;
|
||||
|
||||
auto num_readers = tests::random::get_int(1, 10, engine);
|
||||
auto num_empty_readers = tests::random::get_int(1, num_readers, engine);
|
||||
while (num_empty_readers--) {
|
||||
auto lower = -tests::random::get_int(0, 5, engine);
|
||||
auto upper = tests::random::get_int(0, 5, engine);
|
||||
readers_data.push_back(mutation_bounds{std::nullopt, mk_pos_for(lower), mk_pos_for(upper)});
|
||||
num_readers--;
|
||||
}
|
||||
while (num_readers--) {
|
||||
auto len = tests::random::get_int(0, 15, engine);
|
||||
auto ks = tests::random::random_subset<int>(100, len, engine);
|
||||
@@ -3929,16 +4133,17 @@ struct clustering_order_merger_test_generator {
|
||||
SEASTAR_THREAD_TEST_CASE(test_clustering_order_merger_in_memory) {
|
||||
clustering_order_merger_test_generator g;
|
||||
|
||||
auto make_authority = [] (mutation mut, streamed_mutation::forwarding fwd) {
|
||||
return flat_mutation_reader_from_mutations(tests::make_permit(), {std::move(mut)}, fwd);
|
||||
auto make_authority = [s = g._s] (std::optional<mutation> mut, streamed_mutation::forwarding fwd) {
|
||||
if (mut) {
|
||||
return flat_mutation_reader_from_mutations(tests::make_permit(), {std::move(*mut)}, fwd);
|
||||
}
|
||||
return make_empty_flat_reader(s, tests::make_permit());
|
||||
};
|
||||
|
||||
auto make_tested = [s = g._s] (std::vector<mutation_bounds> ms, streamed_mutation::forwarding fwd) {
|
||||
auto rs = boost::copy_range<std::vector<reader_bounds>>(std::move(ms)
|
||||
| boost::adaptors::transformed([fwd] (auto&& mb) {
|
||||
return reader_bounds{
|
||||
flat_mutation_reader_from_mutations(tests::make_permit(), {std::move(mb.m)}, fwd),
|
||||
std::move(mb.lower), std::move(mb.upper)};
|
||||
| boost::adaptors::transformed([s, fwd] (auto&& mb) {
|
||||
return make_reader_bounds(s, tests::make_permit(), std::move(mb), fwd);
|
||||
}));
|
||||
auto q = std::make_unique<simple_position_reader_queue>(*s, std::move(rs));
|
||||
return make_clustering_combined_reader(s, tests::make_permit(), fwd, std::move(q));
|
||||
@@ -3951,7 +4156,15 @@ SEASTAR_THREAD_TEST_CASE(test_clustering_order_merger_in_memory) {
|
||||
for (int run = 0; run < 1000; ++run) {
|
||||
auto scenario = g.generate_scenario(engine);
|
||||
auto merged = std::accumulate(scenario.readers_data.begin(), scenario.readers_data.end(),
|
||||
mutation(g._s, g._pk), [] (mutation curr, const mutation_bounds& mb) { return std::move(curr) + mb.m; });
|
||||
std::optional<mutation>{}, [&g] (std::optional<mutation> curr, const mutation_bounds& mb) {
|
||||
if (mb.m) {
|
||||
if (!curr) {
|
||||
curr = mutation(g._s, g._pk);
|
||||
}
|
||||
*curr += *mb.m;
|
||||
}
|
||||
return curr;
|
||||
});
|
||||
|
||||
{
|
||||
auto fwd = streamed_mutation::forwarding::no;
|
||||
@@ -3974,13 +4187,16 @@ SEASTAR_THREAD_TEST_CASE(test_clustering_order_merger_in_memory) {
|
||||
SEASTAR_THREAD_TEST_CASE(test_clustering_order_merger_sstable_set) {
|
||||
sstables::test_env::do_with_async([] (sstables::test_env& env) {
|
||||
storage_service_for_tests ssft;
|
||||
clustering_order_merger_test_generator g;
|
||||
|
||||
auto make_authority = [] (mutation mut, streamed_mutation::forwarding fwd) {
|
||||
auto pkeys = make_local_keys(2, clustering_order_merger_test_generator::make_schema());
|
||||
clustering_order_merger_test_generator g(pkeys[0]);
|
||||
|
||||
auto make_authority = [s = g._s] (mutation mut, streamed_mutation::forwarding fwd) {
|
||||
return flat_mutation_reader_from_mutations(tests::make_permit(), {std::move(mut)}, fwd);
|
||||
};
|
||||
|
||||
auto make_tested = [s = g._s, pr = dht::partition_range::make_singular(dht::ring_position(g.decorated_pk()))]
|
||||
auto pr = dht::partition_range::make_singular(dht::ring_position(dht::decorate_key(*g._s, g._pk)));
|
||||
auto make_tested = [s = g._s, pk = g._pk, &pr]
|
||||
(const time_series_sstable_set& sst_set,
|
||||
const std::unordered_set<int64_t>& included_gens, streamed_mutation::forwarding fwd) {
|
||||
auto q = sst_set.make_min_position_reader_queue(
|
||||
@@ -3988,7 +4204,8 @@ SEASTAR_THREAD_TEST_CASE(test_clustering_order_merger_sstable_set) {
|
||||
return sst.make_reader(s, tests::make_permit(), pr,
|
||||
s->full_slice(), seastar::default_priority_class(), nullptr, fwd);
|
||||
},
|
||||
[included_gens] (const sstable& sst) { return included_gens.contains(sst.generation()); });
|
||||
[included_gens] (const sstable& sst) { return included_gens.contains(sst.generation()); },
|
||||
pk, s, tests::make_permit(), fwd);
|
||||
return make_clustering_combined_reader(s, tests::make_permit(), fwd, std::move(q));
|
||||
};
|
||||
|
||||
@@ -4006,32 +4223,39 @@ SEASTAR_THREAD_TEST_CASE(test_clustering_order_merger_sstable_set) {
|
||||
std::unordered_set<int64_t> included_gens;
|
||||
int64_t gen = 0;
|
||||
for (auto& mb: scenario.readers_data) {
|
||||
sst_set.insert(make_sstable_containing([s = g._s, &env, &tmp, gen = ++gen] () {
|
||||
auto sst_factory = [s = g._s, &env, &tmp, gen = ++gen] () {
|
||||
return env.make_sstable(std::move(s), tmp.path().string(), gen,
|
||||
sstables::sstable::version_types::md, sstables::sstable::format_types::big);
|
||||
}, {mb.m}));
|
||||
};
|
||||
|
||||
if (mb.m) {
|
||||
sst_set.insert(make_sstable_containing(std::move(sst_factory), {*mb.m}));
|
||||
} else {
|
||||
// We want to have an sstable that won't return any fragments when we query it
|
||||
// for our partition (not even `partition_start`). For that we create an sstable
|
||||
// with a different partition.
|
||||
auto pk = partition_key::from_single_value(*g._s, utf8_type->decompose(pkeys[1]));
|
||||
assert(pk != g._pk);
|
||||
|
||||
sst_set.insert(make_sstable_containing(std::move(sst_factory), {mutation(g._s, pk)}));
|
||||
}
|
||||
|
||||
if (dist(engine)) {
|
||||
included_gens.insert(gen);
|
||||
merged += mb.m;
|
||||
if (mb.m) {
|
||||
merged += *mb.m;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (included_gens.empty()) {
|
||||
for (auto fwd: {streamed_mutation::forwarding::no, streamed_mutation::forwarding::yes}) {
|
||||
assert_that(make_tested(sst_set, included_gens, fwd)).produces_end_of_stream();
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
{
|
||||
auto fwd = streamed_mutation::forwarding::no;
|
||||
compare_readers(*g._s, make_authority(merged, fwd), make_tested(sst_set, included_gens, fwd));
|
||||
}
|
||||
|
||||
auto fwd = streamed_mutation::forwarding::yes;
|
||||
compare_readers(*g._s, make_authority(std::move(merged), fwd), make_tested(sst_set, included_gens, fwd), scenario.fwd_ranges);
|
||||
compare_readers(*g._s, make_authority(std::move(merged), fwd),
|
||||
make_tested(sst_set, included_gens, fwd), scenario.fwd_ranges);
|
||||
}
|
||||
|
||||
}).get();
|
||||
@@ -4220,9 +4444,7 @@ SEASTAR_THREAD_TEST_CASE(clustering_combined_reader_mutation_source_test) {
|
||||
for (auto& [k, ms]: good) {
|
||||
auto rs = boost::copy_range<std::vector<reader_bounds>>(std::move(ms)
|
||||
| boost::adaptors::transformed([&] (auto&& mb) {
|
||||
return reader_bounds{
|
||||
flat_mutation_reader_from_mutations(permit, {std::move(mb.m)}, slice, fwd_sm),
|
||||
std::move(mb.lower), std::move(mb.upper)};
|
||||
return make_reader_bounds(s, permit, std::move(mb), fwd_sm, &slice);
|
||||
}));
|
||||
std::sort(rs.begin(), rs.end(), [less = position_in_partition::less_compare(*s)]
|
||||
(const reader_bounds& a, const reader_bounds& b) { return less(a.lower, b.lower); });
|
||||
@@ -4242,3 +4464,23 @@ SEASTAR_THREAD_TEST_CASE(clustering_combined_reader_mutation_source_test) {
|
||||
|
||||
run_mutation_source_tests(std::move(populate));
|
||||
}
|
||||
|
||||
// Regression test for #8445.
|
||||
SEASTAR_THREAD_TEST_CASE(test_clustering_combining_of_empty_readers) {
|
||||
auto s = clustering_order_merger_test_generator::make_schema();
|
||||
|
||||
std::vector<reader_bounds> rs;
|
||||
rs.push_back({
|
||||
.r = make_empty_flat_reader(s, tests::make_permit()),
|
||||
.lower = position_in_partition::before_all_clustered_rows(),
|
||||
.upper = position_in_partition::after_all_clustered_rows()
|
||||
});
|
||||
auto r = make_clustering_combined_reader(
|
||||
s, tests::make_permit(), streamed_mutation::forwarding::no,
|
||||
std::make_unique<simple_position_reader_queue>(*s, std::move(rs)));
|
||||
|
||||
auto mf = r(db::no_timeout).get0();
|
||||
if (mf) {
|
||||
BOOST_FAIL(format("reader combined of empty readers returned fragment {}", mutation_fragment::printer(*s, *mf)));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -702,7 +702,10 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) {
|
||||
nullptr,
|
||||
db::no_timeout).get();
|
||||
|
||||
BOOST_CHECK_EQUAL(db.get_querier_cache_stats().resource_based_evictions, 1);
|
||||
// The second read might be evicted too if it consumes more
|
||||
// memory than the first and hence triggers memory control when
|
||||
// saved in the querier cache.
|
||||
BOOST_CHECK_GE(db.get_querier_cache_stats().resource_based_evictions, 1);
|
||||
|
||||
// We want to read the entire partition so that the querier
|
||||
// is not saved at the end and thus ensure it is destroyed.
|
||||
|
||||
@@ -7041,3 +7041,154 @@ SEASTAR_TEST_CASE(test_offstrategy_sstable_compaction) {
|
||||
cf->stop().get();
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(single_key_reader_through_compound_set_test) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
auto builder = schema_builder("tests", "single_key_reader_through_compound_set_test")
|
||||
.with_column("id", utf8_type, column_kind::partition_key)
|
||||
.with_column("cl", ::timestamp_type, column_kind::clustering_key)
|
||||
.with_column("value", int32_type);
|
||||
builder.set_compaction_strategy(sstables::compaction_strategy_type::time_window);
|
||||
std::map <sstring, sstring> opts = {
|
||||
{time_window_compaction_strategy_options::COMPACTION_WINDOW_UNIT_KEY, "HOURS"},
|
||||
{time_window_compaction_strategy_options::COMPACTION_WINDOW_SIZE_KEY, "1"},
|
||||
};
|
||||
builder.set_compaction_strategy_options(std::move(opts));
|
||||
auto s = builder.build();
|
||||
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::time_window, std::move(opts));
|
||||
|
||||
auto next_timestamp = [](auto step) {
|
||||
using namespace std::chrono;
|
||||
return (gc_clock::now().time_since_epoch() + duration_cast<microseconds>(step)).count();
|
||||
};
|
||||
auto tokens = token_generation_for_shard(1, this_shard_id(), test_db_config.murmur3_partitioner_ignore_msb_bits(), smp::count);
|
||||
|
||||
auto make_row = [&](std::chrono::hours step) {
|
||||
static thread_local int32_t value = 1;
|
||||
auto key_str = tokens[0].first;
|
||||
auto key = partition_key::from_exploded(*s, {to_bytes(key_str)});
|
||||
|
||||
mutation m(s, key);
|
||||
auto next_ts = next_timestamp(step);
|
||||
auto c_key = clustering_key::from_exploded(*s, {::timestamp_type->decompose(next_ts)});
|
||||
m.set_clustered_cell(c_key, bytes("value"), data_value(int32_t(value++)), next_ts);
|
||||
return m;
|
||||
};
|
||||
|
||||
auto tmp = tmpdir();
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
column_family::config cfg = column_family_test_config(env.manager());
|
||||
::cf_stats cf_stats{0};
|
||||
cfg.cf_stats = &cf_stats;
|
||||
cfg.datadir = tmp.path().string();
|
||||
cfg.enable_disk_writes = true;
|
||||
cfg.enable_cache = false;
|
||||
auto tracker = make_lw_shared<cache_tracker>();
|
||||
cell_locker_stats cl_stats;
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm, cl_stats, *tracker);
|
||||
cf->mark_ready_for_writes();
|
||||
cf->start();
|
||||
|
||||
auto set1 = make_lw_shared<sstable_set>(cs.make_sstable_set(s));
|
||||
auto set2 = make_lw_shared<sstable_set>(cs.make_sstable_set(s));
|
||||
|
||||
auto sst_gen = [&env, s, &tmp, gen = make_lw_shared<unsigned>(1)]() {
|
||||
return env.make_sstable(s, tmp.path().string(), (*gen)++, sstables::sstable::version_types::md, big);
|
||||
};
|
||||
|
||||
// sstables with same key but belonging to different windows
|
||||
auto sst1 = make_sstable_containing(sst_gen, {make_row(std::chrono::hours(1))});
|
||||
auto sst2 = make_sstable_containing(sst_gen, {make_row(std::chrono::hours(5))});
|
||||
BOOST_REQUIRE(sst1->get_first_decorated_key().token() == sst2->get_last_decorated_key().token());
|
||||
auto dkey = sst1->get_first_decorated_key();
|
||||
|
||||
set1->insert(std::move(sst1));
|
||||
set2->insert(std::move(sst2));
|
||||
sstable_set compound = sstables::make_compound_sstable_set(s, {set1, set2});
|
||||
|
||||
reader_permit permit = tests::make_permit();
|
||||
utils::estimated_histogram eh;
|
||||
auto pr = dht::partition_range::make_singular(dkey);
|
||||
|
||||
auto reader = compound.create_single_key_sstable_reader(&*cf, s, permit, eh, pr, s->full_slice(), default_priority_class(),
|
||||
tracing::trace_state_ptr(), ::streamed_mutation::forwarding::no,
|
||||
::mutation_reader::forwarding::no);
|
||||
auto mfopt = read_mutation_from_flat_mutation_reader(reader, db::no_timeout).get0();
|
||||
BOOST_REQUIRE(mfopt);
|
||||
mfopt = read_mutation_from_flat_mutation_reader(reader, db::no_timeout).get0();
|
||||
BOOST_REQUIRE(!mfopt);
|
||||
BOOST_REQUIRE(cf_stats.clustering_filter_count > 0);
|
||||
});
|
||||
}
|
||||
|
||||
// Regression test for #8432
|
||||
SEASTAR_TEST_CASE(test_twcs_single_key_reader_filtering) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
auto builder = schema_builder("tests", "twcs_single_key_reader_filtering")
|
||||
.with_column("pk", int32_type, column_kind::partition_key)
|
||||
.with_column("ck", int32_type, column_kind::clustering_key)
|
||||
.with_column("v", int32_type);
|
||||
builder.set_compaction_strategy(sstables::compaction_strategy_type::time_window);
|
||||
auto s = builder.build();
|
||||
|
||||
auto tmp = tmpdir();
|
||||
auto sst_gen = [&env, s, &tmp, gen = make_lw_shared<unsigned>(1)]() {
|
||||
return env.make_sstable(s, tmp.path().string(), (*gen)++, sstables::sstable::version_types::md, big);
|
||||
};
|
||||
|
||||
auto make_row = [&] (int32_t pk, int32_t ck) {
|
||||
mutation m(s, partition_key::from_single_value(*s, int32_type->decompose(pk)));
|
||||
m.set_clustered_cell(clustering_key::from_single_value(*s, int32_type->decompose(ck)), to_bytes("v"), int32_t(0), api::new_timestamp());
|
||||
return m;
|
||||
};
|
||||
|
||||
auto sst1 = make_sstable_containing(sst_gen, {make_row(0, 0)});
|
||||
auto sst2 = make_sstable_containing(sst_gen, {make_row(0, 1)});
|
||||
auto dkey = sst1->get_first_decorated_key();
|
||||
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
column_family::config cfg = column_family_test_config(env.manager());
|
||||
::cf_stats cf_stats{0};
|
||||
cfg.cf_stats = &cf_stats;
|
||||
cfg.datadir = tmp.path().string();
|
||||
auto tracker = make_lw_shared<cache_tracker>();
|
||||
cell_locker_stats cl_stats;
|
||||
column_family cf(s, cfg, column_family::no_commitlog(), *cm, cl_stats, *tracker);
|
||||
cf.mark_ready_for_writes();
|
||||
cf.start();
|
||||
|
||||
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::time_window, {});
|
||||
|
||||
auto set = cs.make_sstable_set(s);
|
||||
set.insert(std::move(sst1));
|
||||
set.insert(std::move(sst2));
|
||||
|
||||
reader_permit permit = tests::make_permit();
|
||||
utils::estimated_histogram eh;
|
||||
auto pr = dht::partition_range::make_singular(dkey);
|
||||
|
||||
auto slice = partition_slice_builder(*s)
|
||||
.with_range(query::clustering_range {
|
||||
query::clustering_range::bound { clustering_key_prefix::from_single_value(*s, int32_type->decompose(0)) },
|
||||
query::clustering_range::bound { clustering_key_prefix::from_single_value(*s, int32_type->decompose(1)) },
|
||||
}).build();
|
||||
|
||||
auto reader = set.create_single_key_sstable_reader(
|
||||
&cf, s, permit, eh, pr, slice, default_priority_class(),
|
||||
tracing::trace_state_ptr(), ::streamed_mutation::forwarding::no,
|
||||
::mutation_reader::forwarding::no);
|
||||
|
||||
auto checked_by_ck = cf_stats.sstables_checked_by_clustering_filter;
|
||||
auto surviving_after_ck = cf_stats.surviving_sstables_after_clustering_filter;
|
||||
|
||||
// consume all fragments
|
||||
while (reader(db::no_timeout).get());
|
||||
|
||||
// At least sst2 should be checked by the CK filter during fragment consumption and should pass.
|
||||
// With the bug in #8432, sst2 wouldn't even be checked by the CK filter since it would pass right after checking the PK filter.
|
||||
BOOST_REQUIRE_GE(cf_stats.sstables_checked_by_clustering_filter - checked_by_ck, 1);
|
||||
BOOST_REQUIRE_EQUAL(
|
||||
cf_stats.surviving_sstables_after_clustering_filter - surviving_after_ck,
|
||||
cf_stats.sstables_checked_by_clustering_filter - checked_by_ck);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
import time
|
||||
import pytest
|
||||
from cassandra.protocol import SyntaxException, AlreadyExists, InvalidRequest, ConfigurationException, ReadFailure
|
||||
from cassandra.query import SimpleStatement
|
||||
|
||||
from util import new_test_table
|
||||
|
||||
@@ -64,3 +65,19 @@ def test_partition_order_with_si(cql, test_keyspace):
|
||||
pass
|
||||
time.sleep(0.1)
|
||||
assert tokens2 == tokens
|
||||
|
||||
# Test that the paging state works properly for indexes on tables
|
||||
# with descending clustering order. There was a problem with indexes
|
||||
# created on clustering keys with DESC clustering order - they are represented
|
||||
# as "reverse" types internally and Scylla assertions failed that the base type
|
||||
# is different from the underlying view type, even though, from the perspective
|
||||
# of deserialization, they're equal. Issue #8666
|
||||
def test_paging_with_desc_clustering_order(cql, test_keyspace):
|
||||
schema = 'p int, c int, primary key (p,c)'
|
||||
extra = 'with clustering order by (c desc)'
|
||||
with new_test_table(cql, test_keyspace, schema, extra) as table:
|
||||
cql.execute(f"CREATE INDEX ON {table}(c)")
|
||||
for i in range(3):
|
||||
cql.execute(f"INSERT INTO {table}(p,c) VALUES ({i}, 42)")
|
||||
stmt = SimpleStatement(f"SELECT * FROM {table} WHERE c = 42", fetch_size=1)
|
||||
assert len([row for row in cql.execute(stmt)]) == 3
|
||||
|
||||
@@ -2327,11 +2327,15 @@ void for_each_schema_change(std::function<void(schema_ptr, const std::vector<mut
|
||||
test_mutated_schemas();
|
||||
}
|
||||
|
||||
static void compare_readers(const schema& s, flat_mutation_reader& authority, flat_reader_assertions& tested) {
|
||||
// Returns true iff the readers were non-empty.
|
||||
static bool compare_readers(const schema& s, flat_mutation_reader& authority, flat_reader_assertions& tested) {
|
||||
bool empty = true;
|
||||
while (auto expected = authority(db::no_timeout).get()) {
|
||||
tested.produces(s, *expected);
|
||||
empty = false;
|
||||
}
|
||||
tested.produces_end_of_stream();
|
||||
return !empty;
|
||||
}
|
||||
|
||||
void compare_readers(const schema& s, flat_mutation_reader authority, flat_mutation_reader tested) {
|
||||
@@ -2339,12 +2343,14 @@ void compare_readers(const schema& s, flat_mutation_reader authority, flat_mutat
|
||||
compare_readers(s, authority, assertions);
|
||||
}
|
||||
|
||||
// Assumes that the readers return fragments from (at most) a single (and the same) partition.
|
||||
void compare_readers(const schema& s, flat_mutation_reader authority, flat_mutation_reader tested, const std::vector<position_range>& fwd_ranges) {
|
||||
auto assertions = assert_that(std::move(tested));
|
||||
compare_readers(s, authority, assertions);
|
||||
for (auto& r: fwd_ranges) {
|
||||
authority.fast_forward_to(r, db::no_timeout).get();
|
||||
assertions.fast_forward_to(r);
|
||||
compare_readers(s, authority, assertions);
|
||||
if (compare_readers(s, authority, assertions)) {
|
||||
for (auto& r: fwd_ranges) {
|
||||
authority.fast_forward_to(r, db::no_timeout).get();
|
||||
assertions.fast_forward_to(r);
|
||||
compare_readers(s, authority, assertions);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -333,13 +333,8 @@ int main(int argc, char** argv) {
|
||||
|
||||
auto sstable_format_name = app.configuration()["sstable-format"].as<std::string>();
|
||||
if (sstable_format_name == "md") {
|
||||
db_cfg.enable_sstables_mc_format(true);
|
||||
db_cfg.enable_sstables_md_format(true);
|
||||
} else if (sstable_format_name == "mc") {
|
||||
db_cfg.enable_sstables_mc_format(true);
|
||||
db_cfg.enable_sstables_md_format(false);
|
||||
} else if (sstable_format_name == "la") {
|
||||
db_cfg.enable_sstables_mc_format(false);
|
||||
db_cfg.enable_sstables_md_format(false);
|
||||
} else {
|
||||
throw std::runtime_error(format("Unsupported sstable format: {}", sstable_format_name));
|
||||
|
||||
@@ -1817,13 +1817,8 @@ int main(int argc, char** argv) {
|
||||
|
||||
auto sstable_format_name = app.configuration()["sstable-format"].as<std::string>();
|
||||
if (sstable_format_name == "md") {
|
||||
db_cfg.enable_sstables_mc_format(true);
|
||||
db_cfg.enable_sstables_md_format(true);
|
||||
} else if (sstable_format_name == "mc") {
|
||||
db_cfg.enable_sstables_mc_format(true);
|
||||
db_cfg.enable_sstables_md_format(false);
|
||||
} else if (sstable_format_name == "la") {
|
||||
db_cfg.enable_sstables_mc_format(false);
|
||||
db_cfg.enable_sstables_md_format(false);
|
||||
} else {
|
||||
throw std::runtime_error(format("Unsupported sstable format: {}", sstable_format_name));
|
||||
|
||||
Submodule tools/java updated: ccc4201ded...dbcea78e7d
@@ -108,6 +108,13 @@ future<> tracing::start_tracing(sharded<cql3::query_processor>& qp) {
|
||||
});
|
||||
}
|
||||
|
||||
future<> tracing::stop_tracing() {
|
||||
return tracing_instance().invoke_on_all([] (tracing& local_tracing) {
|
||||
// It might have been shut down while draining
|
||||
return local_tracing._down ? make_ready_future<>() : local_tracing.shutdown();
|
||||
});
|
||||
}
|
||||
|
||||
trace_state_ptr tracing::create_session(trace_type type, trace_state_props_set props) noexcept {
|
||||
if (!started()) {
|
||||
return nullptr;
|
||||
|
||||
@@ -443,6 +443,7 @@ public:
|
||||
|
||||
static future<> create_tracing(const backend_registry& br, sstring tracing_backend_helper_class_name);
|
||||
static future<> start_tracing(sharded<cql3::query_processor>& qp);
|
||||
static future<> stop_tracing();
|
||||
tracing(const backend_registry& br, sstring tracing_backend_helper_class_name);
|
||||
|
||||
// Initialize a tracing backend (e.g. tracing_keyspace or logstash)
|
||||
|
||||
Reference in New Issue
Block a user