Compare commits
35 Commits
master
...
scylla-1.4
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e87bed5816 | ||
|
|
577ffc5851 | ||
|
|
a4fffc9c5d | ||
|
|
10ba47674a | ||
|
|
1c278d9abf | ||
|
|
be0b5ad962 | ||
|
|
707b59100c | ||
|
|
dc8fa5090d | ||
|
|
ea9a8e7f65 | ||
|
|
8d91b8652f | ||
|
|
830df18df5 | ||
|
|
ced171c28b | ||
|
|
34bf40b552 | ||
|
|
a68d829644 | ||
|
|
551c4ff965 | ||
|
|
19b35e812b | ||
|
|
d9ac058bff | ||
|
|
766367a6c5 | ||
|
|
7ac9b6e9ca | ||
|
|
b5bab524e1 | ||
|
|
f4bb52096b | ||
|
|
af26e7a691 | ||
|
|
770c982541 | ||
|
|
2e22c027b2 | ||
|
|
d2c0a5c318 | ||
|
|
52e2688c7b | ||
|
|
08b71b25c1 | ||
|
|
9682b36cdc | ||
|
|
e953955515 | ||
|
|
c1f93c461c | ||
|
|
c0d32a8297 | ||
|
|
a9bd9289a4 | ||
|
|
a2feaa998c | ||
|
|
11950dcba3 | ||
|
|
08ce047792 |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#!/bin/sh
|
||||
|
||||
VERSION=666.development
|
||||
VERSION=1.4.rc3
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -684,8 +684,8 @@ void set_storage_service(http_context& ctx, routes& r) {
|
||||
ss::get_slow_query_info.set(r, [](const_req req) {
|
||||
ss::slow_query_info res;
|
||||
res.enable = tracing::tracing::get_local_tracing_instance().slow_query_tracing_enabled();
|
||||
res.ttl = std::chrono::duration_cast<std::chrono::microseconds>(tracing::tracing::get_local_tracing_instance().slow_query_record_ttl()).count() ;
|
||||
res.threshold = std::chrono::duration_cast<std::chrono::microseconds>(tracing::tracing::get_local_tracing_instance().slow_query_threshold()).count();
|
||||
res.ttl = tracing::tracing::get_local_tracing_instance().slow_query_record_ttl().count() ;
|
||||
res.threshold = tracing::tracing::get_local_tracing_instance().slow_query_threshold().count();
|
||||
return res;
|
||||
});
|
||||
|
||||
|
||||
@@ -47,11 +47,8 @@
|
||||
const sstring auth::data_resource::ROOT_NAME("data");
|
||||
|
||||
auth::data_resource::data_resource(level l, const sstring& ks, const sstring& cf)
|
||||
: _ks(ks), _cf(cf)
|
||||
: _level(l), _ks(ks), _cf(cf)
|
||||
{
|
||||
if (l != get_level()) {
|
||||
throw std::invalid_argument("level/keyspace/column mismatch");
|
||||
}
|
||||
}
|
||||
|
||||
auth::data_resource::data_resource()
|
||||
@@ -67,14 +64,7 @@ auth::data_resource::data_resource(const sstring& ks, const sstring& cf)
|
||||
{}
|
||||
|
||||
auth::data_resource::level auth::data_resource::get_level() const {
|
||||
if (!_cf.empty()) {
|
||||
assert(!_ks.empty());
|
||||
return level::COLUMN_FAMILY;
|
||||
}
|
||||
if (!_ks.empty()) {
|
||||
return level::KEYSPACE;
|
||||
}
|
||||
return level::ROOT;
|
||||
return _level;
|
||||
}
|
||||
|
||||
auth::data_resource auth::data_resource::from_name(
|
||||
|
||||
@@ -56,6 +56,7 @@ private:
|
||||
|
||||
static const sstring ROOT_NAME;
|
||||
|
||||
level _level;
|
||||
sstring _ks;
|
||||
sstring _cf;
|
||||
|
||||
|
||||
@@ -40,6 +40,7 @@
|
||||
*/
|
||||
|
||||
#include <unordered_map>
|
||||
#include <boost/algorithm/string.hpp>
|
||||
#include "permission.hh"
|
||||
|
||||
const auth::permission_set auth::permissions::ALL_DATA =
|
||||
@@ -75,7 +76,9 @@ const sstring& auth::permissions::to_string(permission p) {
|
||||
}
|
||||
|
||||
auth::permission auth::permissions::from_string(const sstring& s) {
|
||||
return permission_names.at(s);
|
||||
sstring upper(s);
|
||||
boost::to_upper(upper);
|
||||
return permission_names.at(upper);
|
||||
}
|
||||
|
||||
std::unordered_set<sstring> auth::permissions::to_strings(const permission_set& set) {
|
||||
|
||||
@@ -54,6 +54,10 @@ public:
|
||||
// Return a list of sstables to be compacted after applying the strategy.
|
||||
compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<lw_shared_ptr<sstable>> candidates);
|
||||
|
||||
// Some strategies may look at the compacted and resulting sstables to
|
||||
// get some useful information for subsequent compactions.
|
||||
void notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added);
|
||||
|
||||
// Return whether parallel compaction is allowed by the strategy.
|
||||
bool parallel_compaction() const;
|
||||
|
||||
|
||||
14
database.cc
14
database.cc
@@ -427,8 +427,14 @@ column_family::make_sstable_reader(schema_ptr s,
|
||||
tracing::trace_state_ptr trace_state) const {
|
||||
// restricts a reader's concurrency if the configuration specifies it
|
||||
auto restrict_reader = [&] (mutation_reader&& in) {
|
||||
if (_config.read_concurrency_config.sem) {
|
||||
return make_restricted_reader(_config.read_concurrency_config, 1, std::move(in));
|
||||
auto&& config = [this, &pc] () -> const restricted_mutation_reader_config& {
|
||||
if (service::get_local_streaming_read_priority().id() == pc.id()) {
|
||||
return _config.streaming_read_concurrency_config;
|
||||
}
|
||||
return _config.read_concurrency_config;
|
||||
}();
|
||||
if (config.sem) {
|
||||
return make_restricted_reader(config, 1, std::move(in));
|
||||
} else {
|
||||
return std::move(in);
|
||||
}
|
||||
@@ -1283,6 +1289,7 @@ column_family::compact_sstables(sstables::compaction_descriptor descriptor, bool
|
||||
};
|
||||
return sstables::compact_sstables(*sstables_to_compact, *this, create_sstable, descriptor.max_sstable_bytes, descriptor.level,
|
||||
cleanup).then([this, sstables_to_compact] (auto new_sstables) {
|
||||
_compaction_strategy.notify_completion(*sstables_to_compact, new_sstables);
|
||||
return this->rebuild_sstable_list(new_sstables, *sstables_to_compact);
|
||||
});
|
||||
});
|
||||
@@ -2070,6 +2077,7 @@ keyspace::make_column_family_config(const schema& s, const db::config& db_config
|
||||
cfg.dirty_memory_manager = _config.dirty_memory_manager;
|
||||
cfg.streaming_dirty_memory_manager = _config.streaming_dirty_memory_manager;
|
||||
cfg.read_concurrency_config = _config.read_concurrency_config;
|
||||
cfg.streaming_read_concurrency_config = _config.streaming_read_concurrency_config;
|
||||
cfg.cf_stats = _config.cf_stats;
|
||||
cfg.enable_incremental_backups = _config.enable_incremental_backups;
|
||||
cfg.max_cached_partition_size_in_bytes = db_config.max_cached_partition_size_in_kb() * 1024;
|
||||
@@ -2559,6 +2567,8 @@ database::make_keyspace_config(const keyspace_metadata& ksm) {
|
||||
++_stats->sstable_read_queue_overloaded;
|
||||
throw std::runtime_error("sstable inactive read queue overloaded");
|
||||
};
|
||||
cfg.streaming_read_concurrency_config = cfg.read_concurrency_config;
|
||||
cfg.streaming_read_concurrency_config.timeout = {};
|
||||
cfg.cf_stats = &_cf_stats;
|
||||
cfg.enable_incremental_backups = _enable_incremental_backups;
|
||||
return cfg;
|
||||
|
||||
@@ -325,6 +325,7 @@ public:
|
||||
::dirty_memory_manager* dirty_memory_manager = &default_dirty_memory_manager;
|
||||
::dirty_memory_manager* streaming_dirty_memory_manager = &default_dirty_memory_manager;
|
||||
restricted_mutation_reader_config read_concurrency_config;
|
||||
restricted_mutation_reader_config streaming_read_concurrency_config;
|
||||
::cf_stats* cf_stats = nullptr;
|
||||
uint64_t max_cached_partition_size_in_bytes;
|
||||
};
|
||||
@@ -879,6 +880,7 @@ public:
|
||||
::dirty_memory_manager* dirty_memory_manager = &default_dirty_memory_manager;
|
||||
::dirty_memory_manager* streaming_dirty_memory_manager = &default_dirty_memory_manager;
|
||||
restricted_mutation_reader_config read_concurrency_config;
|
||||
restricted_mutation_reader_config streaming_read_concurrency_config;
|
||||
::cf_stats* cf_stats = nullptr;
|
||||
};
|
||||
private:
|
||||
|
||||
2
dist/ami/build_ami.sh
vendored
2
dist/ami/build_ami.sh
vendored
@@ -33,7 +33,7 @@ done
|
||||
. /etc/os-release
|
||||
case "$ID" in
|
||||
"centos")
|
||||
AMI=ami-f3102499
|
||||
AMI=ami-4e1d5b59
|
||||
REGION=us-east-1
|
||||
SSH_USERNAME=centos
|
||||
;;
|
||||
|
||||
4
dist/common/scripts/scylla_io_setup
vendored
4
dist/common/scripts/scylla_io_setup
vendored
@@ -44,8 +44,8 @@ output_to_user()
|
||||
}
|
||||
|
||||
if [ `is_developer_mode` -eq 0 ]; then
|
||||
SMP=`echo $CPUSET|grep smp|sed -e "s/^.*smp\(\s\+\|=\)\([0-9]*\).*$/\2/"`
|
||||
CPUSET=`echo $CPUSET|grep cpuset|sed -e "s/^.*\(--cpuset\(\s\+\|=\)[0-9\-]*\).*$/\1/"`
|
||||
SMP=`echo $CPUSET|grep smp|sed -e "s/^.*smp\(\s\+\|=\)\([^ ]*\).*$/\2/"`
|
||||
CPUSET=`echo $CPUSET|grep cpuset|sed -e "s/^.*\(--cpuset\(\s\+\|=\)[^ ]*\).*$/\1/"`
|
||||
if [ $AMI_OPT -eq 1 ]; then
|
||||
NR_CPU=`cat /proc/cpuinfo |grep processor|wc -l`
|
||||
NR_DISKS=`lsblk --list --nodeps --noheadings | grep -v xvda | grep xvd | wc -l`
|
||||
|
||||
31
dist/common/scripts/scylla_setup
vendored
31
dist/common/scripts/scylla_setup
vendored
@@ -84,7 +84,7 @@ get_unused_disks() {
|
||||
if [ -f /usr/sbin/pvs ]; then
|
||||
count_pvs=$(pvs|grep $dev|wc -l)
|
||||
fi
|
||||
count_swap=$(swapon --show |grep `realpath $dev`|wc -l)
|
||||
count_swap=$(swapon -s |grep `realpath $dev`|wc -l)
|
||||
if [ $count_raw -eq 0 -a $count_pvs -eq 0 -a $count_swap -eq 0 ]; then
|
||||
echo -n "$dev "
|
||||
fi
|
||||
@@ -226,31 +226,32 @@ fi
|
||||
if [ $INTERACTIVE -eq 1 ]; then
|
||||
interactive_ask_service "Do you want to enable ScyllaDB services?" "Answer yes to automatically start Scylla when the node boots; answer no to skip this step." "yes" &&:
|
||||
ENABLE_SERVICE=$?
|
||||
if [ $ENABLE_SERVICE -eq 1 ] && [ ! -f /etc/scylla.d/housekeeping.cfg ]; then
|
||||
interactive_ask_service "Do you want to enable ScyllaDB version check?" "Answer yes to automatically start Scylla-housekeeping that check for newer version, when the node boots; answer no to skip this step." "yes" &&:
|
||||
ENABLE_CHECK_VERSION=$?
|
||||
fi
|
||||
fi
|
||||
|
||||
if [ $ENABLE_SERVICE -eq 1 ]; then
|
||||
if [ "$ID" = "fedora" ] || [ "$ID" = "centos" ] || [ "$ID" = "ubuntu" -a "$VERSION_ID" != "14.04" ]; then
|
||||
systemctl enable scylla-server.service
|
||||
systemctl enable collectd.service
|
||||
if [ $ENABLE_CHECK_VERSION -eq 1 ]; then
|
||||
systemctl unmask scylla-housekeeping.timer
|
||||
else
|
||||
systemctl mask scylla-housekeeping.timer
|
||||
systemctl stop scylla-housekeeping.timer || true
|
||||
fi
|
||||
fi
|
||||
if [ $INTERACTIVE -eq 1 ] && [ ! -f /etc/scylla.d/housekeeping.cfg ]; then
|
||||
interactive_ask_service "Do you want to enable ScyllaDB version check?" "Answer yes to automatically start Scylla-housekeeping that check for newer version, when the node boots; answer no to skip this step." "yes" &&:
|
||||
ENABLE_CHECK_VERSION=$?
|
||||
fi
|
||||
if [ $ENABLE_CHECK_VERSION -eq 1 ]; then
|
||||
if [ ! -f /etc/scylla.d/housekeeping.cfg ]; then
|
||||
printf "[housekeeping]\ncheck-version: True\n" > /etc/scylla.d/housekeeping.cfg
|
||||
fi
|
||||
if [ "$ID" = "fedora" ] || [ "$ID" = "centos" ] || [ "$ID" = "ubuntu" -a "$VERSION_ID" != "14.04" ]; then
|
||||
systemctl unmask scylla-housekeeping.timer
|
||||
fi
|
||||
else
|
||||
if [ ! -f /etc/scylla.d/housekeeping.cfg ]; then
|
||||
printf "[housekeeping]\ncheck-version: False\n" > /etc/scylla.d/housekeeping.cfg
|
||||
fi
|
||||
if [ "$ID" = "fedora" ] || [ "$ID" = "centos" ] || [ "$ID" = "ubuntu" -a "$VERSION_ID" != "14.04" ]; then
|
||||
systemctl mask scylla-housekeeping.timer
|
||||
systemctl stop scylla-housekeeping.timer || true
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
|
||||
@@ -374,6 +375,10 @@ if [ $INTERACTIVE -eq 1 ]; then
|
||||
IO_SETUP=$?
|
||||
fi
|
||||
|
||||
if [ $IO_SETUP -eq 1 ]; then
|
||||
/usr/lib/scylla/scylla_io_setup
|
||||
fi
|
||||
|
||||
if [ $INTERACTIVE -eq 1 ]; then
|
||||
interactive_ask_service "Do you want to install node exporter, that exports prometheus data from the node?" "Answer yes to install it; answer no to skip this installation." "yes" &&:
|
||||
NODE_EXPORTER=$?
|
||||
@@ -383,10 +388,6 @@ if [ $NODE_EXPORTER -eq 1 ]; then
|
||||
/usr/lib/scylla/node_exporter_install
|
||||
fi
|
||||
|
||||
if [ $IO_SETUP -eq 1 ]; then
|
||||
/usr/lib/scylla/scylla_io_setup
|
||||
fi
|
||||
|
||||
if [ $DEV_MODE -eq 1 ]; then
|
||||
/usr/lib/scylla/scylla_dev_mode_setup --developer-mode 1
|
||||
fi
|
||||
|
||||
2
dist/common/systemd/node-exporter.service
vendored
2
dist/common/systemd/node-exporter.service
vendored
@@ -3,8 +3,6 @@ Description=Node Exporter
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
User=scylla
|
||||
Group=scylla
|
||||
ExecStart=/usr/bin/node_exporter
|
||||
|
||||
[Install]
|
||||
|
||||
2
dist/docker/redhat/Dockerfile
vendored
2
dist/docker/redhat/Dockerfile
vendored
@@ -7,7 +7,7 @@ ENV container docker
|
||||
VOLUME [ "/sys/fs/cgroup" ]
|
||||
|
||||
#install scylla
|
||||
RUN curl http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo -o /etc/yum.repos.d/scylla.repo
|
||||
RUN curl http://downloads.scylladb.com/rpm/centos/scylla-1.4.repo -o /etc/yum.repos.d/scylla.repo
|
||||
RUN yum -y install epel-release
|
||||
RUN yum -y clean expire-cache
|
||||
RUN yum -y update
|
||||
|
||||
1
dist/docker/redhat/commandlineparser.py
vendored
1
dist/docker/redhat/commandlineparser.py
vendored
@@ -9,6 +9,7 @@ def parse():
|
||||
parser.add_argument('--smp', default=None, help="e.g --smp 2 to use two CPUs")
|
||||
parser.add_argument('--memory', default=None, help="e.g. --memory 1G to use 1 GB of RAM")
|
||||
parser.add_argument('--overprovisioned', default='0', choices=['0', '1'], help="run in overprovisioned environment")
|
||||
parser.add_argument('--listen-address', default=None, dest='listenAddress')
|
||||
parser.add_argument('--broadcast-address', default=None, dest='broadcastAddress')
|
||||
parser.add_argument('--broadcast-rpc-address', default=None, dest='broadcastRpcAddress')
|
||||
return parser.parse_args()
|
||||
|
||||
10
dist/docker/redhat/scyllasetup.py
vendored
10
dist/docker/redhat/scyllasetup.py
vendored
@@ -8,6 +8,7 @@ class ScyllaSetup:
|
||||
self._developerMode = arguments.developerMode
|
||||
self._seeds = arguments.seeds
|
||||
self._cpuset = arguments.cpuset
|
||||
self._listenAddress = arguments.listenAddress
|
||||
self._broadcastAddress = arguments.broadcastAddress
|
||||
self._broadcastRpcAddress = arguments.broadcastRpcAddress
|
||||
self._smp = arguments.smp
|
||||
@@ -31,14 +32,15 @@ class ScyllaSetup:
|
||||
|
||||
def scyllaYAML(self):
|
||||
configuration = yaml.load(open('/etc/scylla/scylla.yaml'))
|
||||
IP = subprocess.check_output(['hostname', '-i']).decode('ascii').strip()
|
||||
configuration['listen_address'] = IP
|
||||
configuration['rpc_address'] = IP
|
||||
if self._listenAddress is None:
|
||||
self._listenAddress = subprocess.check_output(['hostname', '-i']).decode('ascii').strip()
|
||||
configuration['listen_address'] = self._listenAddress
|
||||
configuration['rpc_address'] = self._listenAddress
|
||||
if self._seeds is None:
|
||||
if self._broadcastAddress is not None:
|
||||
self._seeds = self._broadcastAddress
|
||||
else:
|
||||
self._seeds = IP
|
||||
self._seeds = self._listenAddress
|
||||
configuration['seed_provider'] = [
|
||||
{'class_name': 'org.apache.cassandra.locator.SimpleSeedProvider',
|
||||
'parameters': [{'seeds': self._seeds}]}
|
||||
|
||||
2
dist/redhat/scylla.spec.in
vendored
2
dist/redhat/scylla.spec.in
vendored
@@ -27,7 +27,7 @@ Group: Applications/Databases
|
||||
Summary: The Scylla database server
|
||||
License: AGPLv3
|
||||
URL: http://www.scylladb.com/
|
||||
BuildRequires: libaio-devel libstdc++-devel cryptopp-devel hwloc-devel numactl-devel libpciaccess-devel libxml2-devel zlib-devel thrift-devel yaml-cpp-devel lz4-devel snappy-devel jsoncpp-devel systemd-devel xz-devel openssl-devel libcap-devel libselinux-devel libgcrypt-devel libgpg-error-devel elfutils-devel krb5-devel libcom_err-devel libattr-devel pcre-devel elfutils-libelf-devel bzip2-devel keyutils-libs-devel xfsprogs-devel make gnutls-devel systemd-devel lksctp-tools-devel protobuf-devel protobuf-compiler
|
||||
BuildRequires: libaio-devel libstdc++-devel cryptopp-devel hwloc-devel numactl-devel libpciaccess-devel libxml2-devel zlib-devel thrift-devel yaml-cpp-devel lz4-devel snappy-devel jsoncpp-devel systemd-devel xz-devel openssl-devel libcap-devel libselinux-devel libgcrypt-devel libgpg-error-devel elfutils-devel krb5-devel libcom_err-devel libattr-devel pcre-devel elfutils-libelf-devel bzip2-devel keyutils-libs-devel xfsprogs-devel make gnutls-devel systemd-devel lksctp-tools-devel protobuf-devel protobuf-compiler libunwind-devel
|
||||
%{?fedora:BuildRequires: boost-devel ninja-build ragel antlr3-tool antlr3-C++-devel python3 gcc-c++ libasan libubsan python3-pyparsing dnf-yum}
|
||||
%{?rhel:BuildRequires: scylla-libstdc++-static scylla-boost-devel scylla-ninja-build scylla-ragel scylla-antlr3-tool scylla-antlr3-C++-devel python34 scylla-gcc-c++ >= 5.1.1, python34-pyparsing}
|
||||
Requires: scylla-conf systemd-libs hwloc collectd PyYAML python-urwid pciutils pyparsing python-requests curl bc util-linux
|
||||
|
||||
5
dist/ubuntu/build_deb.sh
vendored
5
dist/ubuntu/build_deb.sh
vendored
@@ -78,13 +78,13 @@ cp dist/ubuntu/scylla-server.install.in debian/scylla-server.install
|
||||
if [ "$RELEASE" = "14.04" ]; then
|
||||
sed -i -e "s/@@DH_INSTALLINIT@@/--upstart-only/g" debian/rules
|
||||
sed -i -e "s/@@COMPILER@@/g++-5/g" debian/rules
|
||||
sed -i -e "s/@@BUILD_DEPENDS@@/g++-5/g" debian/control
|
||||
sed -i -e "s/@@BUILD_DEPENDS@@/g++-5, libunwind8-dev/g" debian/control
|
||||
sed -i -e "s#@@INSTALL@@#dist/ubuntu/sudoers.d/scylla etc/sudoers.d#g" debian/scylla-server.install
|
||||
sed -i -e "s#@@HKDOTTIMER@@##g" debian/scylla-server.install
|
||||
else
|
||||
sed -i -e "s/@@DH_INSTALLINIT@@//g" debian/rules
|
||||
sed -i -e "s/@@COMPILER@@/g++/g" debian/rules
|
||||
sed -i -e "s/@@BUILD_DEPENDS@@/libsystemd-dev, g++/g" debian/control
|
||||
sed -i -e "s/@@BUILD_DEPENDS@@/libsystemd-dev, g++, libunwind-dev/g" debian/control
|
||||
sed -i -e "s#@@INSTALL@@##g" debian/scylla-server.install
|
||||
sed -i -e "s#@@HKDOTTIMER@@#dist/common/systemd/scylla-housekeeping.timer /lib/systemd/system#g" debian/scylla-server.install
|
||||
fi
|
||||
@@ -102,6 +102,7 @@ fi
|
||||
cp dist/common/systemd/scylla-server.service.in debian/scylla-server.service
|
||||
sed -i -e "s#@@SYSCONFDIR@@#/etc/default#g" debian/scylla-server.service
|
||||
cp dist/common/systemd/scylla-housekeeping.service debian/scylla-server.scylla-housekeeping.service
|
||||
cp dist/common/systemd/node-exporter.service debian/scylla-server.node-exporter.service
|
||||
|
||||
if [ "$RELEASE" = "14.04" ] && [ $REBUILD -eq 0 ]; then
|
||||
if [ ! -f /etc/apt/sources.list.d/scylla-3rdparty-trusty.list ]; then
|
||||
|
||||
2
dist/ubuntu/control.in
vendored
2
dist/ubuntu/control.in
vendored
@@ -16,7 +16,7 @@ Conflicts: scylla-server (<< 1.1)
|
||||
|
||||
Package: scylla-server
|
||||
Architecture: amd64
|
||||
Depends: ${shlibs:Depends}, ${misc:Depends}, adduser, hwloc-nox, collectd, scylla-conf, python-yaml, python-urwid, python-requests, curl, bc, util-linux, @@DEPENDS@@
|
||||
Depends: ${shlibs:Depends}, ${misc:Depends}, adduser, hwloc-nox, collectd, scylla-conf, python-yaml, python-urwid, python-requests, curl, bc, util-linux, realpath, @@DEPENDS@@
|
||||
Description: Scylla database server binaries
|
||||
Scylla is a highly scalable, eventually consistent, distributed,
|
||||
partitioned row DB.
|
||||
|
||||
1
dist/ubuntu/rules.in
vendored
1
dist/ubuntu/rules.in
vendored
@@ -12,6 +12,7 @@ override_dh_auto_clean:
|
||||
override_dh_installinit:
|
||||
dh_installinit --no-start @@DH_INSTALLINIT@@
|
||||
dh_installinit --no-start --name scylla-housekeeping @@DH_INSTALLINIT@@
|
||||
dh_installinit --no-start --name node-exporter @@DH_INSTALLINIT@@
|
||||
|
||||
override_dh_strip:
|
||||
dh_strip --dbg-package=scylla-server-dbg
|
||||
|
||||
@@ -97,6 +97,18 @@ For example, to configure Scylla to run with two seed nodes `192.168.0.100` and
|
||||
$ docker run --name some-scylla -d scylladb/scylla --seeds 192.168.0.100,192.168.0.200
|
||||
```
|
||||
|
||||
### `--listen-address ADDR`
|
||||
|
||||
The `--listen-address` command line option configures the IP address on which the Scylla instance listens for client connections.
|
||||
|
||||
For example, to configure Scylla to use listen address `10.0.0.5`:
|
||||
|
||||
```console
|
||||
$ docker run --name some-scylla -d scylladb/scylla --listen-address 10.0.0.5
|
||||
```
|
||||
|
||||
**Since: 1.4**
|
||||
|
||||
### `--broadcast-address ADDR`
|
||||
|
||||
The `--broadcast-address` command line option configures the IP address the Scylla instance tells other Scylla nodes in the cluster to connect to.
|
||||
|
||||
@@ -36,7 +36,8 @@ static void remove_or_mark_as_unique_owner(partition_version* current)
|
||||
}
|
||||
|
||||
partition_version::partition_version(partition_version&& pv) noexcept
|
||||
: _backref(pv._backref)
|
||||
: anchorless_list_base_hook(std::move(pv))
|
||||
, _backref(pv._backref)
|
||||
, _partition(std::move(pv._partition))
|
||||
{
|
||||
if (_backref) {
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 9e1d5dbc66...28aeb47d83
@@ -124,6 +124,43 @@ private:
|
||||
logger.trace("Result ranges {}", ranges);
|
||||
};
|
||||
|
||||
// Because of #1446 we don't have a comparator to use with
|
||||
// range<clustering_key_prefix> which would produce correct results.
|
||||
// This means we cannot reuse the same logic for dealing with
|
||||
// partition and clustering keys.
|
||||
auto modify_ck_ranges = [reversed] (const schema& s, auto& ranges, auto& lo) {
|
||||
typedef typename std::remove_reference_t<decltype(ranges)>::value_type range_type;
|
||||
typedef typename range_type::bound bound_type;
|
||||
|
||||
auto cmp = [reversed, bv_cmp = bound_view::compare(s)] (const auto& a, const auto& b) {
|
||||
return reversed ? bv_cmp(b, a) : bv_cmp(a, b);
|
||||
};
|
||||
auto start_bound = [reversed] (const auto& range) -> const bound_view& {
|
||||
return reversed ? range.second : range.first;
|
||||
};
|
||||
auto end_bound = [reversed] (const auto& range) -> const bound_view& {
|
||||
return reversed ? range.first : range.second;
|
||||
};
|
||||
clustering_key_prefix::equality eq(s);
|
||||
|
||||
auto it = ranges.begin();
|
||||
while (it != ranges.end()) {
|
||||
auto range = bound_view::from_range(*it);
|
||||
if (cmp(end_bound(range), lo) || eq(end_bound(range).prefix, lo)) {
|
||||
logger.trace("Remove ck range {}", *it);
|
||||
it = ranges.erase(it);
|
||||
continue;
|
||||
} else if (cmp(start_bound(range), lo)) {
|
||||
assert(cmp(lo, end_bound(range)));
|
||||
auto r = reversed ? range_type(it->start(), bound_type { lo, false })
|
||||
: range_type(bound_type { lo, false }, it->end());
|
||||
logger.trace("Modify ck range {} -> {}", *it, r);
|
||||
*it = std::move(r);
|
||||
}
|
||||
++it;
|
||||
}
|
||||
};
|
||||
|
||||
// last ck can be empty depending on whether we
|
||||
// deserialized state or not. This case means "last page ended on
|
||||
// something-not-bound-by-clustering" (i.e. a static row, alone)
|
||||
@@ -136,15 +173,7 @@ private:
|
||||
if (has_ck) {
|
||||
query::clustering_row_ranges row_ranges = _cmd->slice.default_row_ranges();
|
||||
clustering_key_prefix ckp = clustering_key_prefix::from_exploded(*_schema, _last_ckey->explode(*_schema));
|
||||
clustering_key_prefix::less_compare cmp_rt(*_schema);
|
||||
modify_ranges(row_ranges, ckp, false, [&cmp_rt](auto& c1, auto c2) {
|
||||
if (cmp_rt(c1, c2)) {
|
||||
return -1;
|
||||
} else if (cmp_rt(c2, c1)) {
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
});
|
||||
modify_ck_ranges(*_schema, row_ranges, ckp);
|
||||
|
||||
_cmd->slice.set_range(*_schema, *_last_pkey, row_ranges);
|
||||
}
|
||||
|
||||
@@ -2496,8 +2496,8 @@ storage_proxy::query_singular_local_digest(schema_ptr s, lw_shared_ptr<query::re
|
||||
future<foreign_ptr<lw_shared_ptr<query::result>>>
|
||||
storage_proxy::query_singular_local(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr, query::result_request request, tracing::trace_state_ptr trace_state) {
|
||||
unsigned shard = _db.local().shard_of(pr.start()->value().token());
|
||||
return _db.invoke_on(shard, [gs = global_schema_ptr(s), prv = std::vector<query::partition_range>({pr}) /* FIXME: pr is copied */, cmd, request, trace_state = std::move(trace_state)] (database& db) mutable {
|
||||
return db.query(gs, *cmd, request, prv, std::move(trace_state)).then([](auto&& f) {
|
||||
return _db.invoke_on(shard, [gs = global_schema_ptr(s), prv = std::vector<query::partition_range>({pr}) /* FIXME: pr is copied */, cmd, request, gt = tracing::global_trace_state_ptr(std::move(trace_state))] (database& db) mutable {
|
||||
return db.query(gs, *cmd, request, prv, gt).then([](auto&& f) {
|
||||
return make_foreign(std::move(f));
|
||||
});
|
||||
});
|
||||
@@ -3446,14 +3446,14 @@ future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
|
||||
storage_proxy::query_mutations_locally(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr, tracing::trace_state_ptr trace_state) {
|
||||
if (pr.is_singular()) {
|
||||
unsigned shard = _db.local().shard_of(pr.start()->value().token());
|
||||
return _db.invoke_on(shard, [cmd, &pr, gs = global_schema_ptr(s), trace_state = std::move(trace_state)] (database& db) mutable {
|
||||
return db.query_mutations(gs, *cmd, pr, std::move(trace_state)).then([] (reconcilable_result&& result) {
|
||||
return _db.invoke_on(shard, [cmd, &pr, gs=global_schema_ptr(s), gt = tracing::global_trace_state_ptr(std::move(trace_state))] (database& db) mutable {
|
||||
return db.query_mutations(gs, *cmd, pr, gt).then([] (reconcilable_result&& result) {
|
||||
return make_foreign(make_lw_shared(std::move(result)));
|
||||
});
|
||||
});
|
||||
} else {
|
||||
return _db.map_reduce(mutation_result_merger{cmd, s}, [cmd, &pr, gs = global_schema_ptr(s), trace_state = std::move(trace_state)] (database& db) {
|
||||
return db.query_mutations(gs, *cmd, pr, trace_state).then([] (reconcilable_result&& result) {
|
||||
return _db.map_reduce(mutation_result_merger{ cmd, s }, [cmd, &pr, gs=global_schema_ptr(s), gt = tracing::global_trace_state_ptr(std::move(trace_state))] (database& db) {
|
||||
return db.query_mutations(gs, *cmd, pr, gt).then([] (reconcilable_result&& result) {
|
||||
return make_foreign(make_lw_shared(std::move(result)));
|
||||
});
|
||||
}).then([] (reconcilable_result&& result) {
|
||||
|
||||
@@ -216,6 +216,7 @@ protected:
|
||||
public:
|
||||
virtual ~compaction_strategy_impl() {}
|
||||
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) = 0;
|
||||
virtual void notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) { }
|
||||
virtual compaction_strategy_type type() const = 0;
|
||||
virtual bool parallel_compaction() const {
|
||||
return true;
|
||||
@@ -583,6 +584,8 @@ class leveled_compaction_strategy : public compaction_strategy_impl {
|
||||
const sstring SSTABLE_SIZE_OPTION = "sstable_size_in_mb";
|
||||
|
||||
int32_t _max_sstable_size_in_mb = DEFAULT_MAX_SSTABLE_SIZE_IN_MB;
|
||||
std::vector<stdx::optional<dht::decorated_key>> _last_compacted_keys;
|
||||
std::vector<int> _compaction_counter;
|
||||
public:
|
||||
leveled_compaction_strategy(const std::map<sstring, sstring>& options) {
|
||||
using namespace cql3::statements;
|
||||
@@ -596,10 +599,14 @@ public:
|
||||
logger.warn("Max sstable size of {}MB is configured. Testing done for CASSANDRA-5727 indicates that performance improves up to 160MB",
|
||||
_max_sstable_size_in_mb);
|
||||
}
|
||||
_last_compacted_keys.resize(leveled_manifest::MAX_LEVELS);
|
||||
_compaction_counter.resize(leveled_manifest::MAX_LEVELS);
|
||||
}
|
||||
|
||||
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) override;
|
||||
|
||||
virtual void notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) override;
|
||||
|
||||
virtual int64_t estimated_pending_compactions(column_family& cf) const override;
|
||||
|
||||
virtual bool parallel_compaction() const override {
|
||||
@@ -621,7 +628,7 @@ compaction_descriptor leveled_compaction_strategy::get_sstables_for_compaction(c
|
||||
// sstable in it may be marked for deletion after compacted.
|
||||
// Currently, we create a new manifest whenever it's time for compaction.
|
||||
leveled_manifest manifest = leveled_manifest::create(cfs, candidates, _max_sstable_size_in_mb);
|
||||
auto candidate = manifest.get_compaction_candidates();
|
||||
auto candidate = manifest.get_compaction_candidates(_last_compacted_keys, _compaction_counter);
|
||||
|
||||
if (candidate.sstables.empty()) {
|
||||
return sstables::compaction_descriptor();
|
||||
@@ -632,6 +639,24 @@ compaction_descriptor leveled_compaction_strategy::get_sstables_for_compaction(c
|
||||
return std::move(candidate);
|
||||
}
|
||||
|
||||
void leveled_compaction_strategy::notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) {
|
||||
if (removed.empty() || added.empty()) {
|
||||
return;
|
||||
}
|
||||
auto min_level = std::numeric_limits<uint32_t>::max();
|
||||
for (auto& sstable : removed) {
|
||||
min_level = std::min(min_level, sstable->get_sstable_level());
|
||||
}
|
||||
|
||||
const sstables::sstable *last = nullptr;
|
||||
for (auto& candidate : added) {
|
||||
if (!last || last->compare_by_first_key(*candidate) < 0) {
|
||||
last = &*candidate;
|
||||
}
|
||||
}
|
||||
_last_compacted_keys[min_level] = last->get_last_decorated_key();
|
||||
}
|
||||
|
||||
int64_t leveled_compaction_strategy::estimated_pending_compactions(column_family& cf) const {
|
||||
std::vector<sstables::shared_sstable> sstables;
|
||||
sstables.reserve(cf.sstables_count());
|
||||
@@ -686,6 +711,10 @@ compaction_descriptor compaction_strategy::get_sstables_for_compaction(column_fa
|
||||
return _compaction_strategy_impl->get_sstables_for_compaction(cfs, std::move(candidates));
|
||||
}
|
||||
|
||||
void compaction_strategy::notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) {
|
||||
_compaction_strategy_impl->notify_completion(removed, added);
|
||||
}
|
||||
|
||||
bool compaction_strategy::parallel_compaction() const {
|
||||
return _compaction_strategy_impl->parallel_compaction();
|
||||
}
|
||||
|
||||
@@ -64,16 +64,14 @@ class leveled_manifest {
|
||||
|
||||
schema_ptr _schema;
|
||||
std::vector<std::list<sstables::shared_sstable>> _generations;
|
||||
#if 0
|
||||
private final RowPosition[] lastCompactedKeys;
|
||||
#endif
|
||||
uint64_t _max_sstable_size_in_bytes;
|
||||
#if 0
|
||||
private final SizeTieredCompactionStrategyOptions options;
|
||||
private final int [] compactionCounter;
|
||||
#endif
|
||||
|
||||
public:
|
||||
static constexpr int MAX_LEVELS = 9; // log10(1000^3);
|
||||
|
||||
leveled_manifest(column_family& cfs, int max_sstable_size_in_MB)
|
||||
: logger("LeveledManifest")
|
||||
, _schema(cfs.schema())
|
||||
@@ -82,15 +80,8 @@ public:
|
||||
// allocate enough generations for a PB of data, with a 1-MB sstable size. (Note that if maxSSTableSize is
|
||||
// updated, we will still have sstables of the older, potentially smaller size. So don't make this
|
||||
// dependent on maxSSTableSize.)
|
||||
uint64_t n = 9; // log10(1000^3)
|
||||
_generations.resize(n);
|
||||
_generations.resize(MAX_LEVELS);
|
||||
#if 0
|
||||
lastCompactedKeys = new RowPosition[n];
|
||||
for (int i = 0; i < generations.length; i++)
|
||||
{
|
||||
generations[i] = new ArrayList<>();
|
||||
lastCompactedKeys[i] = cfs.partitioner.getMinimumToken().minKeyBound();
|
||||
}
|
||||
compactionCounter = new int[n];
|
||||
#endif
|
||||
}
|
||||
@@ -129,37 +120,6 @@ public:
|
||||
_generations[level].push_back(sstable);
|
||||
}
|
||||
|
||||
#if 0
|
||||
public synchronized void replace(Collection<SSTableReader> removed, Collection<SSTableReader> added)
|
||||
{
|
||||
assert !removed.isEmpty(); // use add() instead of promote when adding new sstables
|
||||
logDistribution();
|
||||
if (logger.isDebugEnabled())
|
||||
logger.debug("Replacing [{}]", toString(removed));
|
||||
|
||||
// the level for the added sstables is the max of the removed ones,
|
||||
// plus one if the removed were all on the same level
|
||||
int minLevel = Integer.MAX_VALUE;
|
||||
|
||||
for (SSTableReader sstable : removed)
|
||||
{
|
||||
int thisLevel = remove(sstable);
|
||||
minLevel = Math.min(minLevel, thisLevel);
|
||||
}
|
||||
|
||||
// it's valid to do a remove w/o an add (e.g. on truncate)
|
||||
if (added.isEmpty())
|
||||
return;
|
||||
|
||||
if (logger.isDebugEnabled())
|
||||
logger.debug("Adding [{}]", toString(added));
|
||||
|
||||
for (SSTableReader ssTableReader : added)
|
||||
add(ssTableReader);
|
||||
lastCompactedKeys[minLevel] = SSTableReader.sstableOrdering.max(added).last;
|
||||
}
|
||||
#endif
|
||||
|
||||
void repair_overlapping_sstables(int level) {
|
||||
const sstables::sstable *previous = nullptr;
|
||||
const schema& s = *_schema;
|
||||
@@ -272,7 +232,8 @@ public:
|
||||
* @return highest-priority sstables to compact, and level to compact them to
|
||||
* If no compactions are necessary, will return null
|
||||
*/
|
||||
sstables::compaction_descriptor get_compaction_candidates() {
|
||||
sstables::compaction_descriptor get_compaction_candidates(const std::vector<stdx::optional<dht::decorated_key>>& last_compacted_keys,
|
||||
std::vector<int>& compaction_counter) {
|
||||
#if 0
|
||||
// during bootstrap we only do size tiering in L0 to make sure
|
||||
// the streamed files can be placed in their original levels
|
||||
@@ -339,11 +300,12 @@ public:
|
||||
}
|
||||
}
|
||||
// L0 is fine, proceed with this level
|
||||
auto candidates = get_candidates_for(i);
|
||||
auto candidates = get_candidates_for(i, last_compacted_keys);
|
||||
if (!candidates.empty()) {
|
||||
int next_level = get_next_level(candidates);
|
||||
|
||||
candidates = get_overlapping_starved_sstables(next_level, std::move(candidates), compaction_counter);
|
||||
#if 0
|
||||
candidates = getOverlappingStarvedSSTables(nextLevel, candidates);
|
||||
if (logger.isDebugEnabled())
|
||||
logger.debug("Compaction candidates for L{} are {}", i, toString(candidates));
|
||||
#endif
|
||||
@@ -359,7 +321,7 @@ public:
|
||||
if (get_level(0).empty()) {
|
||||
return sstables::compaction_descriptor();
|
||||
}
|
||||
auto candidates = get_candidates_for(0);
|
||||
auto candidates = get_candidates_for(0, last_compacted_keys);
|
||||
if (candidates.empty()) {
|
||||
return sstables::compaction_descriptor();
|
||||
}
|
||||
@@ -391,49 +353,57 @@ public:
|
||||
* @param candidates the original sstables to compact
|
||||
* @return
|
||||
*/
|
||||
#if 0
|
||||
private Collection<SSTableReader> getOverlappingStarvedSSTables(int targetLevel, Collection<SSTableReader> candidates)
|
||||
{
|
||||
Set<SSTableReader> withStarvedCandidate = new HashSet<>(candidates);
|
||||
std::vector<sstables::shared_sstable>
|
||||
get_overlapping_starved_sstables(int target_level, std::vector<sstables::shared_sstable>&& candidates, std::vector<int>& compaction_counter) {
|
||||
for (int i = _generations.size() - 1; i > 0; i--) {
|
||||
compaction_counter[i]++;
|
||||
}
|
||||
compaction_counter[target_level] = 0;
|
||||
|
||||
for (int i = generations.length - 1; i > 0; i--)
|
||||
compactionCounter[i]++;
|
||||
compactionCounter[targetLevel] = 0;
|
||||
if (logger.isDebugEnabled())
|
||||
{
|
||||
for (int j = 0; j < compactionCounter.length; j++)
|
||||
logger.debug("CompactionCounter: {}: {}", j, compactionCounter[j]);
|
||||
if (logger.level() == logging::log_level::debug) {
|
||||
for (auto j = 0U; j < compaction_counter.size(); j++) {
|
||||
logger.debug("CompactionCounter: {}: {}", j, compaction_counter[j]);
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = generations.length - 1; i > 0; i--)
|
||||
{
|
||||
if (getLevelSize(i) > 0)
|
||||
{
|
||||
if (compactionCounter[i] > NO_COMPACTION_LIMIT)
|
||||
{
|
||||
for (int i = _generations.size() - 1; i > 0; i--) {
|
||||
if (get_level_size(i) > 0) {
|
||||
if (compaction_counter[i] > NO_COMPACTION_LIMIT) {
|
||||
// we try to find an sstable that is fully contained within the boundaries we are compacting;
|
||||
// say we are compacting 3 sstables: 0->30 in L1 and 0->12, 12->33 in L2
|
||||
// this means that we will not create overlap in L2 if we add an sstable
|
||||
// contained within 0 -> 33 to the compaction
|
||||
RowPosition max = null;
|
||||
RowPosition min = null;
|
||||
for (SSTableReader candidate : candidates)
|
||||
{
|
||||
if (min == null || candidate.first.compareTo(min) < 0)
|
||||
min = candidate.first;
|
||||
if (max == null || candidate.last.compareTo(max) > 0)
|
||||
max = candidate.last;
|
||||
stdx::optional<dht::decorated_key> max;
|
||||
stdx::optional<dht::decorated_key> min;
|
||||
// Compute the key span covered by the current candidates: min is the smallest
// first key and max is the largest last key among them.
for (auto& candidate : candidates) {
|
||||
auto& candidate_first = candidate->get_first_decorated_key();
|
||||
if (!min || candidate_first.tri_compare(*_schema, *min) < 0) {
|
||||
min = candidate_first;
|
||||
}
|
||||
// The upper bound has to come from the sstable's last key. Reading the first
// key here would make max the largest *first* key, shrinking the boundaries
// and wrongly excluding starved sstables that are fully contained in the span.
auto& candidate_last = candidate->get_last_decorated_key();
|
||||
if (!max || candidate_last.tri_compare(*_schema, *max) > 0) {
|
||||
max = candidate_last;
|
||||
}
|
||||
}
|
||||
#if 0
|
||||
// NOTE: We don't need to filter out compacting sstables by now because strategy only deals with
|
||||
// uncompacting sstables and parallel compaction is also disabled for lcs.
|
||||
Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting();
|
||||
Range<RowPosition> boundaries = new Range<>(min, max);
|
||||
for (SSTableReader sstable : getLevel(i))
|
||||
{
|
||||
Range<RowPosition> r = new Range<RowPosition>(sstable.first, sstable.last);
|
||||
if (boundaries.contains(r) && !compacting.contains(sstable))
|
||||
{
|
||||
logger.info("Adding high-level (L{}) {} to candidates", sstable.getSSTableLevel(), sstable);
|
||||
withStarvedCandidate.add(sstable);
|
||||
return withStarvedCandidate;
|
||||
#endif
|
||||
auto boundaries = ::range<dht::decorated_key>::make(*min, *max);
|
||||
for (auto& sstable : get_level(i)) {
|
||||
auto r = ::range<dht::decorated_key>::make(sstable->get_first_decorated_key(), sstable->get_last_decorated_key());
|
||||
if (boundaries.contains(r, dht::ring_position_comparator(*_schema))) {
|
||||
logger.info("Adding high-level (L{}) {} to candidates", sstable->get_sstable_level(), sstable->get_filename());
|
||||
|
||||
auto result = std::find_if(std::begin(candidates), std::end(candidates), [&sstable] (auto& candidate) {
|
||||
return sstable->generation() == candidate->generation();
|
||||
});
|
||||
if (result != std::end(candidates)) {
|
||||
continue;
|
||||
}
|
||||
candidates.push_back(sstable);
|
||||
return candidates;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -443,7 +413,6 @@ public:
|
||||
|
||||
return candidates;
|
||||
}
|
||||
#endif
|
||||
|
||||
size_t get_level_size(uint32_t level) {
|
||||
#if 0
|
||||
@@ -557,7 +526,7 @@ public:
|
||||
* If no compactions are possible (because of concurrent compactions or because some sstables are blacklisted
|
||||
* for prior failure), will return an empty list. Never returns null.
|
||||
*/
|
||||
std::vector<sstables::shared_sstable> get_candidates_for(int level) {
|
||||
std::vector<sstables::shared_sstable> get_candidates_for(int level, const std::vector<stdx::optional<dht::decorated_key>>& last_compacted_keys) {
|
||||
const schema& s = *_schema;
|
||||
assert(!get_level(level).empty());
|
||||
|
||||
@@ -657,31 +626,35 @@ public:
|
||||
}
|
||||
|
||||
// for non-L0 compactions, pick up where we left off last time
|
||||
get_level(level).sort([&s] (auto& i, auto& j) {
|
||||
std::list<sstables::shared_sstable>& sstables = get_level(level);
|
||||
sstables.sort([&s] (auto& i, auto& j) {
|
||||
return i->compare_by_first_key(*j) < 0;
|
||||
});
|
||||
int start = 0; // handles case where the prior compaction touched the very last range
|
||||
#if 0
|
||||
for (int i = 0; i < getLevel(level).size(); i++)
|
||||
{
|
||||
SSTableReader sstable = getLevel(level).get(i);
|
||||
if (sstable.first.compareTo(lastCompactedKeys[level]) > 0)
|
||||
{
|
||||
start = i;
|
||||
int idx = 0;
|
||||
for (auto& sstable : sstables) {
|
||||
if (uint32_t(level) >= last_compacted_keys.size()) {
|
||||
throw std::runtime_error(sprint("Invalid level %u out of %ld", level, (last_compacted_keys.size() - 1)));
|
||||
}
|
||||
auto& sstable_first = sstable->get_first_decorated_key();
|
||||
if (!last_compacted_keys[level] || sstable_first.tri_compare(s, *last_compacted_keys[level]) > 0) {
|
||||
start = idx;
|
||||
break;
|
||||
}
|
||||
idx++;
|
||||
}
|
||||
#endif
|
||||
|
||||
// look for a non-suspect keyspace to compact with, starting with where we left off last time,
|
||||
// and wrapping back to the beginning of the generation if necessary
|
||||
for (auto i = 0U; i < get_level(level).size(); i++) {
|
||||
for (auto i = 0U; i < sstables.size(); i++) {
|
||||
// get an iterator to the element of position pos from the list get_level(level).
|
||||
auto pos = (start + i) % get_level(level).size();
|
||||
auto it = get_level(level).begin();
|
||||
auto pos = (start + i) % sstables.size();
|
||||
auto it = sstables.begin();
|
||||
std::advance(it, pos);
|
||||
|
||||
auto sstable = *it;
|
||||
auto& sstable = *it;
|
||||
auto candidates = overlapping(*_schema, sstable, get_level(level + 1));
|
||||
|
||||
candidates.push_back(sstable);
|
||||
#if 0
|
||||
if (Iterables.any(candidates, suspectP))
|
||||
|
||||
@@ -1252,7 +1252,9 @@ static future<std::vector<unsigned long>> compact_sstables(std::vector<unsigned
|
||||
}
|
||||
auto candidates = get_candidates_for_leveled_strategy(*cf);
|
||||
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, 1);
|
||||
auto candidate = manifest.get_compaction_candidates();
|
||||
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
|
||||
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
|
||||
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
|
||||
BOOST_REQUIRE(candidate.sstables.size() == sstables->size());
|
||||
BOOST_REQUIRE(candidate.level == 1);
|
||||
BOOST_REQUIRE(candidate.max_sstable_bytes == 1024*1024);
|
||||
@@ -1731,7 +1733,9 @@ SEASTAR_TEST_CASE(leveled_01) {
|
||||
auto candidates = get_candidates_for_leveled_strategy(*cf);
|
||||
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, max_sstable_size_in_mb);
|
||||
BOOST_REQUIRE(manifest.get_level_size(0) == 2);
|
||||
auto candidate = manifest.get_compaction_candidates();
|
||||
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
|
||||
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
|
||||
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
|
||||
BOOST_REQUIRE(candidate.sstables.size() == 2);
|
||||
BOOST_REQUIRE(candidate.level == 0);
|
||||
|
||||
@@ -1786,7 +1790,9 @@ SEASTAR_TEST_CASE(leveled_02) {
|
||||
auto candidates = get_candidates_for_leveled_strategy(*cf);
|
||||
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, max_sstable_size_in_mb);
|
||||
BOOST_REQUIRE(manifest.get_level_size(0) == 3);
|
||||
auto candidate = manifest.get_compaction_candidates();
|
||||
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
|
||||
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
|
||||
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
|
||||
BOOST_REQUIRE(candidate.sstables.size() == 3);
|
||||
BOOST_REQUIRE(candidate.level == 0);
|
||||
|
||||
@@ -1844,7 +1850,9 @@ SEASTAR_TEST_CASE(leveled_03) {
|
||||
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, max_sstable_size_in_mb);
|
||||
BOOST_REQUIRE(manifest.get_level_size(0) == 2);
|
||||
BOOST_REQUIRE(manifest.get_level_size(1) == 2);
|
||||
auto candidate = manifest.get_compaction_candidates();
|
||||
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
|
||||
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
|
||||
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
|
||||
BOOST_REQUIRE(candidate.sstables.size() == 3);
|
||||
BOOST_REQUIRE(candidate.level == 1);
|
||||
|
||||
@@ -1914,7 +1922,9 @@ SEASTAR_TEST_CASE(leveled_04) {
|
||||
auto level2_score = (double) manifest.get_total_bytes(manifest.get_level(2)) / (double) manifest.max_bytes_for_level(2);
|
||||
BOOST_REQUIRE(level2_score < 1.001);
|
||||
|
||||
auto candidate = manifest.get_compaction_candidates();
|
||||
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
|
||||
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
|
||||
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
|
||||
BOOST_REQUIRE(candidate.sstables.size() == 2);
|
||||
BOOST_REQUIRE(candidate.level == 2);
|
||||
|
||||
@@ -1976,7 +1986,9 @@ SEASTAR_TEST_CASE(leveled_06) {
|
||||
BOOST_REQUIRE(manifest.get_level_size(1) == 1);
|
||||
BOOST_REQUIRE(manifest.get_level_size(2) == 0);
|
||||
|
||||
auto candidate = manifest.get_compaction_candidates();
|
||||
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
|
||||
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
|
||||
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
|
||||
BOOST_REQUIRE(candidate.level == 2);
|
||||
BOOST_REQUIRE(candidate.sstables.size() == 1);
|
||||
auto& sst = (candidate.sstables)[0];
|
||||
|
||||
@@ -600,4 +600,62 @@ inline void stop_foreground(const trace_state_ptr& state) {
|
||||
state->stop_foreground_and_write();
|
||||
}
|
||||
}
|
||||
|
||||
// global_trace_state_ptr is a helper class that may be used for creating spans
|
||||
// of an existing tracing session on other shards. When a tracing span on a
|
||||
// different shard is needed global_trace_state_ptr would create a secondary
|
||||
// tracing session on that shard similarly to what we do when we create tracing
|
||||
// spans on remote Nodes.
|
||||
//
|
||||
// The usage is straightforward:
|
||||
// 1. Create a global_trace_state_ptr from the existing trace_state_ptr object.
|
||||
// 2. Pass the global_trace_state_ptr object, instead of a trace_state_ptr
|
||||
// object, to the execution unit that (possibly) runs on a different
|
||||
// shard.
|
||||
// Wraps a trace_state_ptr together with the id of the CPU on which the wrapper
// was constructed, so that get() can tell whether it is being called on that
// CPU or on a different one.
class global_trace_state_ptr {
|
||||
// Value of engine().cpu_id() at the time this object was constructed.
unsigned _cpu_of_origin;
|
||||
// The wrapped tracing state; may be empty, in which case get() returns nullptr.
trace_state_ptr _ptr;
|
||||
public:
|
||||
// Note: the trace_state_ptr must come from the current shard
// Records the current CPU id as the origin and takes ownership of t.
global_trace_state_ptr(trace_state_ptr t)
|
||||
: _cpu_of_origin(engine().cpu_id())
|
||||
, _ptr(std::move(t))
|
||||
{ }
|
||||
|
||||
// May be invoked across shards.
// Delegates to the trace_state_ptr constructor with other.get(), so the copy
// is bound to the CPU it is created on and holds whatever get() yields there.
global_trace_state_ptr(const global_trace_state_ptr& other)
|
||||
: global_trace_state_ptr(other.get())
|
||||
{ }
|
||||
|
||||
// May be invoked across shards.
// Same delegation as the copy constructor: only other.get() is called, which
// is const, so `other` itself is not modified by this "move".
global_trace_state_ptr(global_trace_state_ptr&& other)
|
||||
: global_trace_state_ptr(other.get())
|
||||
{ }
|
||||
|
||||
// Copy assignment is disabled; construct a new object instead.
global_trace_state_ptr& operator=(const global_trace_state_ptr&) = delete;
|
||||
|
||||
// May be invoked across shards.
// Returns a trace_state_ptr usable on the calling CPU:
//  - nullptr when no tracing state is wrapped;
//  - the wrapped pointer when called on the CPU of origin;
//  - otherwise a state obtained from create_session() on the local tracing
//    instance, or nullptr when make_trace_info() yields nothing.
// Nothing is cached here, so each call on a non-origin CPU goes through
// create_session() again.
trace_state_ptr get() const {
|
||||
// optimize the "tracing not enabled" case
|
||||
if (!_ptr) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if (_cpu_of_origin != engine().cpu_id()) {
|
||||
// Running on a different CPU than the one that built this object: derive
// trace info from the wrapped state and start a local session from it.
auto opt_trace_info = make_trace_info(_ptr);
|
||||
if (opt_trace_info) {
|
||||
trace_state_ptr new_trace_state = tracing::get_local_tracing_instance().create_session(*opt_trace_info);
|
||||
begin(new_trace_state);
|
||||
return new_trace_state;
|
||||
} else {
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
// Same CPU as the origin: hand back the wrapped pointer as is.
return _ptr;
|
||||
}
|
||||
|
||||
// May be invoked across shards.
// Implicit conversion; equivalent to calling get().
operator trace_state_ptr() const { return get(); }
|
||||
};
|
||||
}
|
||||
|
||||
@@ -652,13 +652,13 @@ future<> cql_server::connection::process_request() {
|
||||
f.length, mem_estimate, _server._max_request_size));
|
||||
}
|
||||
|
||||
return with_semaphore(_server._memory_available, mem_estimate, [this, length = f.length, flags = f.flags, op, stream, tracing_requested] {
|
||||
return read_and_decompress_frame(length, flags).then([this, flags, op, stream, tracing_requested] (temporary_buffer<char> buf) {
|
||||
return get_units(_server._memory_available, mem_estimate).then([this, length = f.length, flags = f.flags, op, stream, tracing_requested] (semaphore_units<> mem_permit) {
|
||||
return this->read_and_decompress_frame(length, flags).then([this, flags, op, stream, tracing_requested, mem_permit = std::move(mem_permit)] (temporary_buffer<char> buf) mutable {
|
||||
|
||||
++_server._requests_served;
|
||||
++_server._requests_serving;
|
||||
|
||||
with_gate(_pending_requests_gate, [this, flags, op, stream, buf = std::move(buf), tracing_requested] () mutable {
|
||||
with_gate(_pending_requests_gate, [this, flags, op, stream, buf = std::move(buf), tracing_requested, mem_permit = std::move(mem_permit)] () mutable {
|
||||
auto bv = bytes_view{reinterpret_cast<const int8_t*>(buf.begin()), buf.size()};
|
||||
auto cpu = pick_request_cpu();
|
||||
return smp::submit_to(cpu, [this, bv = std::move(bv), op, stream, client_state = _client_state, tracing_requested] () mutable {
|
||||
@@ -672,7 +672,7 @@ future<> cql_server::connection::process_request() {
|
||||
}).then([this, flags] (auto&& response) {
|
||||
_client_state.merge(response.second);
|
||||
return this->write_response(std::move(response.first), _compression);
|
||||
}).then([buf = std::move(buf)] {
|
||||
}).then([buf = std::move(buf), mem_permit = std::move(mem_permit)] {
|
||||
// Keep buf alive.
|
||||
});
|
||||
}).handle_exception([] (std::exception_ptr ex) {
|
||||
|
||||
Reference in New Issue
Block a user