Compare commits

...

35 Commits

Author SHA1 Message Date
Avi Kivity
e87bed5816 Update seastar submodule
* seastar b7be36a...28aeb47 (1):
  > rpc: Avoid using zero-copy interface of output_stream (Fixes #1786)
2016-10-28 14:15:02 +03:00
Pekka Enberg
577ffc5851 auth: Fix resource level handling
We use the `data_resource` class in the CQL parser, which lets users refer
to a table resource without specifying a keyspace. This asserts out in
get_level() for no good reason, as we already know the intended level
based on the constructor. Therefore, change `data_resource` to track the
level like upstream Cassandra does and use that.
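
The shape of the change can be sketched with a simplified stand-in for `auth::data_resource` (std::string replaces sstring, and the real class has more constructors and state):

```cpp
#include <string>
#include <utility>

class data_resource {
public:
    enum class level { ROOT, KEYSPACE, COLUMN_FAMILY };

    data_resource() : _level(level::ROOT) {}
    explicit data_resource(std::string ks)
        : _level(level::KEYSPACE), _ks(std::move(ks)) {}
    data_resource(std::string ks, std::string cf)
        : _level(level::COLUMN_FAMILY), _ks(std::move(ks)), _cf(std::move(cf)) {}

    // The level is now fixed by the constructor that was used, so there
    // is no inference from empty/non-empty name parts and no assert.
    level get_level() const { return _level; }

private:
    level _level;
    std::string _ks;
    std::string _cf;
};
```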

Fixes #1790

Message-Id: <1477599169-2945-1-git-send-email-penberg@scylladb.com>
(cherry picked from commit b54870764f)
2016-10-27 23:37:57 +03:00
Glauber Costa
a4fffc9c5d auth: always convert string to upper case before comparing
We store all auth permission strings in upper case, but the user might very
well pass them in lower case.

We could use a case-insensitive key comparator / hash here, but since the
strings tend to be small, the new sstring will likely be allocated on
the stack, and this approach yields significantly less code.
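
A minimal sketch of the approach, with std::string and &lt;algorithm&gt; standing in for sstring and Boost's to_upper, and an illustrative permission set:

```cpp
#include <algorithm>
#include <cctype>
#include <string>
#include <unordered_map>

enum class permission { CREATE, ALTER, DROP, SELECT, MODIFY };

// Keys are stored in upper case, as the commit message describes.
static const std::unordered_map<std::string, permission> permission_names = {
    {"CREATE", permission::CREATE}, {"ALTER", permission::ALTER},
    {"DROP", permission::DROP},     {"SELECT", permission::SELECT},
    {"MODIFY", permission::MODIFY},
};

// Upper-case a copy of the (small) input before the lookup, instead of
// installing a case-insensitive hash/comparator on the map.
permission from_string(const std::string& s) {
    std::string upper(s);
    std::transform(upper.begin(), upper.end(), upper.begin(),
                   [](unsigned char c) { return std::toupper(c); });
    return permission_names.at(upper);
}
```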

Fixes #1791.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
Message-Id: <51df92451e6e0a6325a005c19c95eaa55270da61.1477594199.git.glauber@scylladb.com>
(cherry picked from commit ef3c7ab38e)
2016-10-27 22:09:42 +03:00
Pekka Enberg
10ba47674a release: prepare for 1.4.rc3 2016-10-26 12:20:13 +03:00
Tomasz Grabiec
1c278d9abf Update seastar submodule
* seastar 810ef2b...b7be36a (2):
  > rpc: Fix crash during connection teardown
  > rpc: Move _connected flag to protocol::connection
2016-10-26 10:03:58 +02:00
Tomasz Grabiec
be0b5ad962 Merge seastar upstream
* seastar 742eb00...810ef2b (1):
  > rpc: Do not close client connection on error response for a timed out request

Refs #1778
2016-10-25 13:55:19 +02:00
Vlad Zolotarov
707b59100c service::storage_proxy: use global_trace_state_ptr when using invoke_on
When trace_state may migrate to a different shard, a global_trace_state_ptr
has to be used.

This patch completes the patch below:

commit 7e180c7bd3
Author: Vlad Zolotarov <vladz@cloudius-systems.com>
Date:   Tue Sep 20 19:09:27 2016 +0300

    tracing: introduce the tracing::global_trace_state_ptr class

Fixes #1770

Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>
Message-Id: <1476993537-27388-1-git-send-email-vladz@cloudius-systems.com>
(cherry picked from commit f75a350a8f)
2016-10-25 11:36:16 +03:00
Takuya ASADA
dc8fa5090d dist/ami: fix incorrect /etc/fstab entry on CentOS7 base image
There was an incorrect rootfs entry in /etc/fstab:
 /dev/sda1 / xfs defaults,noatime 1 1
This causes a boot error when updating to a new kernel.
(see:
https://github.com/scylladb/scylla/issues/1597#issuecomment-250243187)

So the entry was replaced with
 UUID=<uuid>  / xfs defaults,noatime 1 1
All recent security updates were also applied.

Fixes #1597
Fixes #1707

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1475094957-9464-1-git-send-email-syuu@scylladb.com>
(cherry picked from commit 80e3d8286c)
2016-10-20 11:53:04 +03:00
Avi Kivity
ea9a8e7f65 Update seastar submodule
* seastar c960804...742eb00 (1):
  > rpc: Add missing adjustment of snd_buf::size

Fixes #1767.
2016-10-19 19:45:16 +03:00
Tomasz Grabiec
8d91b8652f partition_version: Fix corruption of partition_version list
The move constructor of partition_version was not invoking move
constructor of anchorless_list_base_hook. As a result, when
partition_version objects were moved, e.g. during LSA compaction, they
were unlinked from their lists.

This can make readers return invalid data, because not all versions
will be reachable.

It also causes leaks of the versions which are not directly attached
to a memtable entry. This will trigger an assertion failure in the LSA
region destructor. This assertion triggers with the row cache disabled. With
the cache enabled (the default), all segments are merged into the cache
region, which currently is not destroyed on shutdown, so this problem would
go unnoticed. With the cache disabled, the memtable region is destroyed after
the memtable is flushed and after all readers stop using that memtable.
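
The bug pattern is general C++: a derived move constructor that forgets to move its intrusive base-class hook. A simplified stand-in (the real anchorless_list_base_hook is more involved) shows why the explicit base move matters:

```cpp
#include <utility>

// Hypothetical stand-in for anchorless_list_base_hook: on move, the base
// re-points its neighbours at the new object and detaches the source.
struct list_hook {
    list_hook* prev = nullptr;
    list_hook* next = nullptr;
    list_hook() = default;
    list_hook(list_hook&& other) noexcept : prev(other.prev), next(other.next) {
        if (prev) prev->next = this;
        if (next) next->prev = this;
        other.prev = other.next = nullptr;
    }
};

struct partition_version : list_hook {
    int data;
    explicit partition_version(int d) : data(d) {}
    // The fix: explicitly move the base hook. Without the
    // list_hook(std::move(pv)) call, the new object would start with null
    // links and silently drop out of its list, as the commit describes.
    partition_version(partition_version&& pv) noexcept
        : list_hook(std::move(pv)), data(pv.data) {}
};
```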

Fixes #1753.
Message-Id: <1476778472-5711-1-git-send-email-tgrabiec@scylladb.com>

(cherry picked from commit fe387f8ba0)
2016-10-18 10:58:47 +02:00
Pekka Enberg
830df18df5 release: prepare for 1.4.0 2016-10-14 14:37:13 +03:00
Amnon Heiman
ced171c28b scylla_setup: Reorder questions and actions
The expected behaviour in the scylla_setup script is that each question
is immediately followed by its corresponding action.

For example, after asking whether scylla should be run as a service, the
relevant actions should be taken before the following question.

This patch addresses two such mis-orderings:
1. scylla-housekeeping depends on scylla-server, so the setup
should first set up the scylla-server service and only then ask about
(and, if needed, install) scylla-housekeeping.
2. The node_exporter step should be placed after io_setup is done.

Fixes #1739

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
Message-Id: <1476370098-25617-1-git-send-email-amnon@scylladb.com>
(cherry picked from commit 7829da13b4)
2016-10-13 18:29:52 +03:00
Avi Kivity
34bf40b552 Merge "node_exporter service on ubuntu 16" from Amnon
"This series addresses two issues that interfere with running the node_exporter as a service on Ubuntu 16.
1. The service file should be packed in the deb file.
2. When setting up the node_exporter as a service, it doesn't need to run as the scylla user."

* 'amnon/node_exporter_ubuntu_v2' of github.com:cloudius-systems/seastar-dev:
  node-exporter service: No need to run as scylla user
  debian package: Include the node_exporter service file

(cherry picked from commit 1506b06617)
2016-10-13 15:54:41 +03:00
Avi Kivity
a68d829644 Update seastar submodule
* seastar f9f4746...c960804 (1):
  > Merge "Prometheus API with grafana uses labels" from Amnon
2016-10-13 15:53:51 +03:00
Takuya ASADA
551c4ff965 dist/common/script/scylla_io_setup: handle comma correctly when parsing cpuset
The script mistakenly split the value at "," when the cpuset list is
separated by commas. Instead of matching all possible patterns of the
argument, let's pass all characters through until we reach a space
delimiter or the end of the line.

Fixes #1716

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1476171037-32373-1-git-send-email-syuu@scylladb.com>
(cherry picked from commit ccad720bb1)
2016-10-11 10:43:17 +03:00
Pekka Enberg
19b35e812b release: prepare for 1.4.rc2 2016-10-10 16:09:16 +03:00
Takuya ASADA
d9ac058bff dist/ubuntu: add realpath to dependency, requires for scylla_setup
We need a dependency on realpath, since scylla_setup uses it.

Fixes #1740.

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1475788340-22939-1-git-send-email-syuu@scylladb.com>
(cherry picked from commit 8452045b85)
2016-10-10 15:59:10 +03:00
Pekka Enberg
766367a6c5 dist/docker: Use Scylla 1.4 RPM repository 2016-10-10 15:21:08 +03:00
Pekka Enberg
7ac9b6e9ca docs/docker: Tag --listen-address as 1.4 feature
The Docker Hub documentation is the same for all image versions. Tag
`--listen-address` as a 1.4 feature.

Message-Id: <1475819164-7865-1-git-send-email-penberg@scylladb.com>
(cherry picked from commit 3b75ff1496)
2016-10-10 14:34:04 +03:00
Vlad Zolotarov
b5bab524e1 api::storage_service::slow_query: don't use duration_cast in GET
slow_query_record_ttl() and slow_query_threshold() already return a
duration of the appropriate type - no need for an additional cast.
In addition, there was a mistake in the cast of ttl.
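
The point can be illustrated with plain std::chrono; the accessors below are stand-ins for the real tracing getters, which already return the advertised duration types:

```cpp
#include <chrono>

// Hypothetical stand-ins for the tracing getters: each already returns
// a duration in the unit the REST API reports.
std::chrono::seconds slow_query_record_ttl() {
    return std::chrono::seconds(10);
}
std::chrono::microseconds slow_query_threshold() {
    return std::chrono::microseconds(500000);
}

// No duration_cast needed: .count() on the returned duration already
// yields the value in the right unit. A redundant cast to the wrong
// target type is exactly the kind of mistake the commit fixes.
long long ttl_seconds()  { return slow_query_record_ttl().count(); }
long long threshold_us() { return slow_query_threshold().count(); }
```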

Fixes #1734

Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>
Message-Id: <1475669400-5925-1-git-send-email-vladz@cloudius-systems.com>
(cherry picked from commit 006999f46c)
2016-10-09 19:27:09 +03:00
Takuya ASADA
f4bb52096b dist/common/scripts/scylla_setup: use 'swapon -s' instead of 'swapon --show'
Since Ubuntu 14.04 doesn't support the --show option, we need to avoid using it.
Fixes #1740

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1475788340-22939-2-git-send-email-syuu@scylladb.com>
(cherry picked from commit 469e9af1f4)
2016-10-09 19:27:07 +03:00
Raphael S. Carvalho
af26e7a691 lcs: fix starvation at higher levels
When the max sstable size is increased, higher levels suffer from
starvation, because we only decide to compact a given level if the
following calculation results in a number greater than 1.001:
level_size(L) / max_size_for_level_l(L)
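
The score described above can be written down directly (function names are illustrative, not Scylla's):

```cpp
#include <cstdint>

// The score from the commit message: level_size(L) / max_size_for_level(L).
// A level is picked for compaction only when the score exceeds 1.001, so
// when the max sstable size grows, higher levels' scores shrink and those
// levels starve.
double level_score(uint64_t level_size, uint64_t max_size_for_level) {
    return static_cast<double>(level_size) /
           static_cast<double>(max_size_for_level);
}

bool wants_compaction(uint64_t level_size, uint64_t max_size_for_level) {
    return level_score(level_size, max_size_for_level) > 1.001;
}
```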

Fixes #1720.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit a8ab4b8f37)
2016-10-09 11:07:48 +03:00
Raphael S. Carvalho
770c982541 lcs: fix broken token range distribution at higher levels
Uniform token range distribution across sstables in a level > 1 was broken,
because we were only choosing the sstable with the lowest first key when
compacting a level > 0. This resulted in a performance problem, because
L1->L2 may build up a huge overlap over time, for example.
The last compacted key will now be stored for each level, to ensure a sort
of "round robin" selection of sstables for compactions at level >= 1.
That's also done by C*, which was once affected by the same problem, as
described in https://issues.apache.org/jira/browse/CASSANDRA-6284.
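
The selection scheme can be sketched with sstable first keys modeled as sorted integers (a simplification; real sstables compare tokens/keys):

```cpp
#include <algorithm>
#include <vector>

// Round-robin pick for a level: choose the first sstable whose first key
// comes after the last compacted key for that level; wrap around to the
// smallest one otherwise. first_keys must be sorted ascending.
int pick_sstable(const std::vector<int>& first_keys, int last_compacted_key) {
    auto it = std::upper_bound(first_keys.begin(), first_keys.end(),
                               last_compacted_key);
    if (it == first_keys.end()) {
        it = first_keys.begin();  // wrap: restart from the lowest first key
    }
    return *it;
}
```

Always picking the lowest first key, as before the fix, is the special case `last_compacted_key = -infinity`, which never advances around the ring.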

Fixes #1719.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit a3bf7558f2)
2016-10-09 11:07:39 +03:00
Tomasz Grabiec
2e22c027b2 db: Do not timeout streaming readers
There is a limit to the concurrency of sstable readers on each shard. When
this limit is exhausted (currently 100 readers), readers queue. There
is a timeout after which queued readers are failed, equal to
read_request_timeout_in_ms (5s by default). The reason we have the
timeout here is primarily because the readers created for the purpose
of serving a CQL request no longer need to execute after waiting
longer than read_request_timeout_in_ms. The coordinator no longer
waits for the result so there is no point in proceeding with the read.

This timeout should not apply for readers created for streaming. The
streaming client currently times out after 10 minutes, so we could
wait at least that long. Timing out sooner makes streaming unreliable,
which under high load may prevent streaming from completing.

The change sets no timeout for streaming readers at the replica level,
similarly to what we do for system table readers.
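
The configuration change can be sketched with a plain struct standing in for restricted_mutation_reader_config; an empty optional models "no timeout":

```cpp
#include <chrono>
#include <optional>

// Simplified stand-in for restricted_mutation_reader_config.
struct reader_concurrency_config {
    int max_concurrent_readers = 100;
    std::optional<std::chrono::milliseconds> timeout;  // empty = wait forever
};

struct cf_config {
    reader_concurrency_config read_concurrency_config;
    reader_concurrency_config streaming_read_concurrency_config;
};

cf_config make_config() {
    cf_config cfg;
    // CQL readers give up after read_request_timeout_in_ms (5s default).
    cfg.read_concurrency_config.timeout = std::chrono::milliseconds(5000);
    // Streaming readers share the concurrency limit but, as in the patch
    // (`streaming_read_concurrency_config.timeout = {}`), never time out.
    cfg.streaming_read_concurrency_config = cfg.read_concurrency_config;
    cfg.streaming_read_concurrency_config.timeout = {};
    return cfg;
}
```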

Fixes #1741.

Message-Id: <1475840678-25606-1-git-send-email-tgrabiec@scylladb.com>
(cherry picked from commit 2a5a90f391)
2016-10-09 10:33:00 +03:00
Avi Kivity
d2c0a5c318 Update seastar submodule
* seastar c2489c6...f9f4746 (2):
  > prometheus CPU should start in 0
  > Collectd: bytes ordering depends on the type

Fixes #1726.
Fixes #1727.
2016-10-06 11:26:17 +03:00
Pekka Enberg
52e2688c7b Update seastar submodule
* seastar 777ab50...c2489c6 (1):
  > Merge "Fix signal mask corruption" from Tomasz
2016-10-05 12:39:45 +03:00
Pekka Enberg
08b71b25c1 dist/docker: Add '--listen-address' to 'docker run'
Add a '--listen-address' command line parameter to the Docker image,
which can be used to set Scylla's listen address.

Refs #1723

Message-Id: <1475485165-6772-1-git-send-email-penberg@scylladb.com>
(cherry picked from commit c3bebea1ef)
2016-10-04 14:08:36 +03:00
Avi Kivity
9682b36cdc Update seastar submodule
* seastar 9e1d5db...777ab50 (1):
  > prometheus: remove invalid chars from metric names

Fixes #1710.
2016-10-02 11:39:52 +03:00
Avi Kivity
e953955515 seastar: switch to branch-1.4 on scylla-seastar submodule
This allows us to backport individual commits, rather than the entire
master branch.
2016-10-02 11:37:48 +03:00
Tomasz Grabiec
c1f93c461c transport: Extend request memory footprint accounting to also cover execution
The CQL server is supposed to throttle requests so that they don't
overflow memory. The problem is that it currently accounts for a
request's memory only around the reading of its frame from the
connection, and not during actual request execution. As a result, too
many requests may be allowed to execute and we may run out of memory.
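
The accounting fix amounts to holding the memory units for the request's whole lifetime, not just while its frame is read. A toy, non-Scylla sketch:

```cpp
#include <cstddef>

// Toy per-shard memory accountant with a hard limit.
struct memory_accountant {
    std::size_t used = 0;
    std::size_t limit;
    explicit memory_accountant(std::size_t l) : limit(l) {}
};

// RAII guard: units are acquired when the frame is read and, crucially,
// released only when the guard (spanning execution too) is destroyed.
struct request_memory {
    memory_accountant& acct;
    std::size_t units = 0;
    bool ok = false;
    request_memory(memory_accountant& a, std::size_t n) : acct(a) {
        if (a.used + n <= a.limit) {  // throttle: refuse past the limit
            a.used += n;
            units = n;
            ok = true;
        }
    }
    ~request_memory() { acct.used -= units; }
};
```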

Fixes #1708.
Message-Id: <1475149302-11517-1-git-send-email-tgrabiec@scylladb.com>

(cherry picked from commit 7e25b958ac)
2016-10-02 11:34:35 +03:00
Vlad Zolotarov
c0d32a8297 tracing: introduce the tracing::global_trace_state_ptr class
This object, similarly to a global_schema_ptr, allows trace_state_ptr
objects to be dynamically created on different shards in the context
of the original tracing session.

It creates a secondary tracing session object from the
original trace_state_ptr object when a trace_state_ptr object is needed
on a "remote" shard, similarly to what we do when we need one on a remote
node.

Fixes #1678
Fixes #1647

Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>
Message-Id: <1474387767-21910-1-git-send-email-vladz@cloudius-systems.com>
(cherry picked from commit 7e180c7bd3)
2016-10-02 11:33:16 +03:00
Paweł Dziepak
a9bd9289a4 query_pagers: fix clustering key range calculation
The paging code assumes that the clustering row range [a, a] contains only
one row, which may not be true. Another problem is that it tries to use the
range<> interface for dealing with clustering key ranges, which doesn't
work because of the lack of a correct comparator.

Refs #1446.
Fixes #1684.

Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
Message-Id: <1475236805-16223-1-git-send-email-pdziepak@scylladb.com>
(cherry picked from commit eb1fcf3ecc)
2016-10-02 10:55:43 +03:00
Takuya ASADA
a2feaa998c dist/redhat: add missing build time dependency for libunwind
There was a missing build-time dependency on libunwind, so add it.
Fixes #1722

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1475260099-25881-1-git-send-email-syuu@scylladb.com>
(cherry picked from commit 9639cc840e)
2016-09-30 21:34:20 +03:00
Takuya ASADA
11950dcba3 dist/ubuntu: add missing build time dependency for libunwind
There was a missing build-time dependency on libunwind, so add it.
Fixes #1721

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1475255706-26434-1-git-send-email-syuu@scylladb.com>
(cherry picked from commit c89d9599b1)
2016-09-30 21:34:13 +03:00
Pekka Enberg
08ce047792 release: prepare for 1.4.rc1 2016-09-30 13:59:46 +03:00
30 changed files with 300 additions and 172 deletions

.gitmodules vendored

@@ -1,6 +1,6 @@
[submodule "seastar"]
path = seastar
url = ../seastar
url = ../scylla-seastar
ignore = dirty
[submodule "swagger-ui"]
path = swagger-ui


@@ -1,6 +1,6 @@
#!/bin/sh
VERSION=666.development
VERSION=1.4.rc3
if test -f version
then


@@ -684,8 +684,8 @@ void set_storage_service(http_context& ctx, routes& r) {
ss::get_slow_query_info.set(r, [](const_req req) {
ss::slow_query_info res;
res.enable = tracing::tracing::get_local_tracing_instance().slow_query_tracing_enabled();
res.ttl = std::chrono::duration_cast<std::chrono::microseconds>(tracing::tracing::get_local_tracing_instance().slow_query_record_ttl()).count() ;
res.threshold = std::chrono::duration_cast<std::chrono::microseconds>(tracing::tracing::get_local_tracing_instance().slow_query_threshold()).count();
res.ttl = tracing::tracing::get_local_tracing_instance().slow_query_record_ttl().count() ;
res.threshold = tracing::tracing::get_local_tracing_instance().slow_query_threshold().count();
return res;
});


@@ -47,11 +47,8 @@
const sstring auth::data_resource::ROOT_NAME("data");
auth::data_resource::data_resource(level l, const sstring& ks, const sstring& cf)
: _ks(ks), _cf(cf)
: _level(l), _ks(ks), _cf(cf)
{
if (l != get_level()) {
throw std::invalid_argument("level/keyspace/column mismatch");
}
}
auth::data_resource::data_resource()
@@ -67,14 +64,7 @@ auth::data_resource::data_resource(const sstring& ks, const sstring& cf)
{}
auth::data_resource::level auth::data_resource::get_level() const {
if (!_cf.empty()) {
assert(!_ks.empty());
return level::COLUMN_FAMILY;
}
if (!_ks.empty()) {
return level::KEYSPACE;
}
return level::ROOT;
return _level;
}
auth::data_resource auth::data_resource::from_name(


@@ -56,6 +56,7 @@ private:
static const sstring ROOT_NAME;
level _level;
sstring _ks;
sstring _cf;


@@ -40,6 +40,7 @@
*/
#include <unordered_map>
#include <boost/algorithm/string.hpp>
#include "permission.hh"
const auth::permission_set auth::permissions::ALL_DATA =
@@ -75,7 +76,9 @@ const sstring& auth::permissions::to_string(permission p) {
}
auth::permission auth::permissions::from_string(const sstring& s) {
return permission_names.at(s);
sstring upper(s);
boost::to_upper(upper);
return permission_names.at(upper);
}
std::unordered_set<sstring> auth::permissions::to_strings(const permission_set& set) {


@@ -54,6 +54,10 @@ public:
// Return a list of sstables to be compacted after applying the strategy.
compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<lw_shared_ptr<sstable>> candidates);
// Some strategies may look at the compacted and resulting sstables to
// get some useful information for subsequent compactions.
void notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added);
// Return if parallel compaction is allowed by strategy.
bool parallel_compaction() const;


@@ -427,8 +427,14 @@ column_family::make_sstable_reader(schema_ptr s,
tracing::trace_state_ptr trace_state) const {
// restricts a reader's concurrency if the configuration specifies it
auto restrict_reader = [&] (mutation_reader&& in) {
if (_config.read_concurrency_config.sem) {
return make_restricted_reader(_config.read_concurrency_config, 1, std::move(in));
auto&& config = [this, &pc] () -> const restricted_mutation_reader_config& {
if (service::get_local_streaming_read_priority().id() == pc.id()) {
return _config.streaming_read_concurrency_config;
}
return _config.read_concurrency_config;
}();
if (config.sem) {
return make_restricted_reader(config, 1, std::move(in));
} else {
return std::move(in);
}
@@ -1283,6 +1289,7 @@ column_family::compact_sstables(sstables::compaction_descriptor descriptor, bool
};
return sstables::compact_sstables(*sstables_to_compact, *this, create_sstable, descriptor.max_sstable_bytes, descriptor.level,
cleanup).then([this, sstables_to_compact] (auto new_sstables) {
_compaction_strategy.notify_completion(*sstables_to_compact, new_sstables);
return this->rebuild_sstable_list(new_sstables, *sstables_to_compact);
});
});
@@ -2070,6 +2077,7 @@ keyspace::make_column_family_config(const schema& s, const db::config& db_config
cfg.dirty_memory_manager = _config.dirty_memory_manager;
cfg.streaming_dirty_memory_manager = _config.streaming_dirty_memory_manager;
cfg.read_concurrency_config = _config.read_concurrency_config;
cfg.streaming_read_concurrency_config = _config.streaming_read_concurrency_config;
cfg.cf_stats = _config.cf_stats;
cfg.enable_incremental_backups = _config.enable_incremental_backups;
cfg.max_cached_partition_size_in_bytes = db_config.max_cached_partition_size_in_kb() * 1024;
@@ -2559,6 +2567,8 @@ database::make_keyspace_config(const keyspace_metadata& ksm) {
++_stats->sstable_read_queue_overloaded;
throw std::runtime_error("sstable inactive read queue overloaded");
};
cfg.streaming_read_concurrency_config = cfg.read_concurrency_config;
cfg.streaming_read_concurrency_config.timeout = {};
cfg.cf_stats = &_cf_stats;
cfg.enable_incremental_backups = _enable_incremental_backups;
return cfg;


@@ -325,6 +325,7 @@ public:
::dirty_memory_manager* dirty_memory_manager = &default_dirty_memory_manager;
::dirty_memory_manager* streaming_dirty_memory_manager = &default_dirty_memory_manager;
restricted_mutation_reader_config read_concurrency_config;
restricted_mutation_reader_config streaming_read_concurrency_config;
::cf_stats* cf_stats = nullptr;
uint64_t max_cached_partition_size_in_bytes;
};
@@ -879,6 +880,7 @@ public:
::dirty_memory_manager* dirty_memory_manager = &default_dirty_memory_manager;
::dirty_memory_manager* streaming_dirty_memory_manager = &default_dirty_memory_manager;
restricted_mutation_reader_config read_concurrency_config;
restricted_mutation_reader_config streaming_read_concurrency_config;
::cf_stats* cf_stats = nullptr;
};
private:


@@ -33,7 +33,7 @@ done
. /etc/os-release
case "$ID" in
"centos")
AMI=ami-f3102499
AMI=ami-4e1d5b59
REGION=us-east-1
SSH_USERNAME=centos
;;


@@ -44,8 +44,8 @@ output_to_user()
}
if [ `is_developer_mode` -eq 0 ]; then
SMP=`echo $CPUSET|grep smp|sed -e "s/^.*smp\(\s\+\|=\)\([0-9]*\).*$/\2/"`
CPUSET=`echo $CPUSET|grep cpuset|sed -e "s/^.*\(--cpuset\(\s\+\|=\)[0-9\-]*\).*$/\1/"`
SMP=`echo $CPUSET|grep smp|sed -e "s/^.*smp\(\s\+\|=\)\([^ ]*\).*$/\2/"`
CPUSET=`echo $CPUSET|grep cpuset|sed -e "s/^.*\(--cpuset\(\s\+\|=\)[^ ]*\).*$/\1/"`
if [ $AMI_OPT -eq 1 ]; then
NR_CPU=`cat /proc/cpuinfo |grep processor|wc -l`
NR_DISKS=`lsblk --list --nodeps --noheadings | grep -v xvda | grep xvd | wc -l`


@@ -84,7 +84,7 @@ get_unused_disks() {
if [ -f /usr/sbin/pvs ]; then
count_pvs=$(pvs|grep $dev|wc -l)
fi
count_swap=$(swapon --show |grep `realpath $dev`|wc -l)
count_swap=$(swapon -s |grep `realpath $dev`|wc -l)
if [ $count_raw -eq 0 -a $count_pvs -eq 0 -a $count_swap -eq 0 ]; then
echo -n "$dev "
fi
@@ -226,31 +226,32 @@ fi
if [ $INTERACTIVE -eq 1 ]; then
interactive_ask_service "Do you want to enable ScyllaDB services?" "Answer yes to automatically start Scylla when the node boots; answer no to skip this step." "yes" &&:
ENABLE_SERVICE=$?
if [ $ENABLE_SERVICE -eq 1 ] && [ ! -f /etc/scylla.d/housekeeping.cfg ]; then
interactive_ask_service "Do you want to enable ScyllaDB version check?" "Answer yes to automatically start Scylla-housekeeping that check for newer version, when the node boots; answer no to skip this step." "yes" &&:
ENABLE_CHECK_VERSION=$?
fi
fi
if [ $ENABLE_SERVICE -eq 1 ]; then
if [ "$ID" = "fedora" ] || [ "$ID" = "centos" ] || [ "$ID" = "ubuntu" -a "$VERSION_ID" != "14.04" ]; then
systemctl enable scylla-server.service
systemctl enable collectd.service
if [ $ENABLE_CHECK_VERSION -eq 1 ]; then
systemctl unmask scylla-housekeeping.timer
else
systemctl mask scylla-housekeeping.timer
systemctl stop scylla-housekeeping.timer || true
fi
fi
if [ $INTERACTIVE -eq 1 ] && [ ! -f /etc/scylla.d/housekeeping.cfg ]; then
interactive_ask_service "Do you want to enable ScyllaDB version check?" "Answer yes to automatically start Scylla-housekeeping that check for newer version, when the node boots; answer no to skip this step." "yes" &&:
ENABLE_CHECK_VERSION=$?
fi
if [ $ENABLE_CHECK_VERSION -eq 1 ]; then
if [ ! -f /etc/scylla.d/housekeeping.cfg ]; then
printf "[housekeeping]\ncheck-version: True\n" > /etc/scylla.d/housekeeping.cfg
fi
if [ "$ID" = "fedora" ] || [ "$ID" = "centos" ] || [ "$ID" = "ubuntu" -a "$VERSION_ID" != "14.04" ]; then
systemctl unmask scylla-housekeeping.timer
fi
else
if [ ! -f /etc/scylla.d/housekeeping.cfg ]; then
printf "[housekeeping]\ncheck-version: False\n" > /etc/scylla.d/housekeeping.cfg
fi
if [ "$ID" = "fedora" ] || [ "$ID" = "centos" ] || [ "$ID" = "ubuntu" -a "$VERSION_ID" != "14.04" ]; then
systemctl mask scylla-housekeeping.timer
systemctl stop scylla-housekeeping.timer || true
fi
fi
fi
@@ -374,6 +375,10 @@ if [ $INTERACTIVE -eq 1 ]; then
IO_SETUP=$?
fi
if [ $IO_SETUP -eq 1 ]; then
/usr/lib/scylla/scylla_io_setup
fi
if [ $INTERACTIVE -eq 1 ]; then
interactive_ask_service "Do you want to install node exporter, that exports prometheus data from the node?" "Answer yes to install it; answer no to skip this installation." "yes" &&:
NODE_EXPORTER=$?
@@ -383,10 +388,6 @@ if [ $NODE_EXPORTER -eq 1 ]; then
/usr/lib/scylla/node_exporter_install
fi
if [ $IO_SETUP -eq 1 ]; then
/usr/lib/scylla/scylla_io_setup
fi
if [ $DEV_MODE -eq 1 ]; then
/usr/lib/scylla/scylla_dev_mode_setup --developer-mode 1
fi


@@ -3,8 +3,6 @@ Description=Node Exporter
[Service]
Type=simple
User=scylla
Group=scylla
ExecStart=/usr/bin/node_exporter
[Install]


@@ -7,7 +7,7 @@ ENV container docker
VOLUME [ "/sys/fs/cgroup" ]
#install scylla
RUN curl http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo -o /etc/yum.repos.d/scylla.repo
RUN curl http://downloads.scylladb.com/rpm/centos/scylla-1.4.repo -o /etc/yum.repos.d/scylla.repo
RUN yum -y install epel-release
RUN yum -y clean expire-cache
RUN yum -y update


@@ -9,6 +9,7 @@ def parse():
parser.add_argument('--smp', default=None, help="e.g --smp 2 to use two CPUs")
parser.add_argument('--memory', default=None, help="e.g. --memory 1G to use 1 GB of RAM")
parser.add_argument('--overprovisioned', default='0', choices=['0', '1'], help="run in overprovisioned environment")
parser.add_argument('--listen-address', default=None, dest='listenAddress')
parser.add_argument('--broadcast-address', default=None, dest='broadcastAddress')
parser.add_argument('--broadcast-rpc-address', default=None, dest='broadcastRpcAddress')
return parser.parse_args()


@@ -8,6 +8,7 @@ class ScyllaSetup:
self._developerMode = arguments.developerMode
self._seeds = arguments.seeds
self._cpuset = arguments.cpuset
self._listenAddress = arguments.listenAddress
self._broadcastAddress = arguments.broadcastAddress
self._broadcastRpcAddress = arguments.broadcastRpcAddress
self._smp = arguments.smp
@@ -31,14 +32,15 @@ class ScyllaSetup:
def scyllaYAML(self):
configuration = yaml.load(open('/etc/scylla/scylla.yaml'))
IP = subprocess.check_output(['hostname', '-i']).decode('ascii').strip()
configuration['listen_address'] = IP
configuration['rpc_address'] = IP
if self._listenAddress is None:
self._listenAddress = subprocess.check_output(['hostname', '-i']).decode('ascii').strip()
configuration['listen_address'] = self._listenAddress
configuration['rpc_address'] = self._listenAddress
if self._seeds is None:
if self._broadcastAddress is not None:
self._seeds = self._broadcastAddress
else:
self._seeds = IP
self._seeds = self._listenAddress
configuration['seed_provider'] = [
{'class_name': 'org.apache.cassandra.locator.SimpleSeedProvider',
'parameters': [{'seeds': self._seeds}]}


@@ -27,7 +27,7 @@ Group: Applications/Databases
Summary: The Scylla database server
License: AGPLv3
URL: http://www.scylladb.com/
BuildRequires: libaio-devel libstdc++-devel cryptopp-devel hwloc-devel numactl-devel libpciaccess-devel libxml2-devel zlib-devel thrift-devel yaml-cpp-devel lz4-devel snappy-devel jsoncpp-devel systemd-devel xz-devel openssl-devel libcap-devel libselinux-devel libgcrypt-devel libgpg-error-devel elfutils-devel krb5-devel libcom_err-devel libattr-devel pcre-devel elfutils-libelf-devel bzip2-devel keyutils-libs-devel xfsprogs-devel make gnutls-devel systemd-devel lksctp-tools-devel protobuf-devel protobuf-compiler
BuildRequires: libaio-devel libstdc++-devel cryptopp-devel hwloc-devel numactl-devel libpciaccess-devel libxml2-devel zlib-devel thrift-devel yaml-cpp-devel lz4-devel snappy-devel jsoncpp-devel systemd-devel xz-devel openssl-devel libcap-devel libselinux-devel libgcrypt-devel libgpg-error-devel elfutils-devel krb5-devel libcom_err-devel libattr-devel pcre-devel elfutils-libelf-devel bzip2-devel keyutils-libs-devel xfsprogs-devel make gnutls-devel systemd-devel lksctp-tools-devel protobuf-devel protobuf-compiler libunwind-devel
%{?fedora:BuildRequires: boost-devel ninja-build ragel antlr3-tool antlr3-C++-devel python3 gcc-c++ libasan libubsan python3-pyparsing dnf-yum}
%{?rhel:BuildRequires: scylla-libstdc++-static scylla-boost-devel scylla-ninja-build scylla-ragel scylla-antlr3-tool scylla-antlr3-C++-devel python34 scylla-gcc-c++ >= 5.1.1, python34-pyparsing}
Requires: scylla-conf systemd-libs hwloc collectd PyYAML python-urwid pciutils pyparsing python-requests curl bc util-linux


@@ -78,13 +78,13 @@ cp dist/ubuntu/scylla-server.install.in debian/scylla-server.install
if [ "$RELEASE" = "14.04" ]; then
sed -i -e "s/@@DH_INSTALLINIT@@/--upstart-only/g" debian/rules
sed -i -e "s/@@COMPILER@@/g++-5/g" debian/rules
sed -i -e "s/@@BUILD_DEPENDS@@/g++-5/g" debian/control
sed -i -e "s/@@BUILD_DEPENDS@@/g++-5, libunwind8-dev/g" debian/control
sed -i -e "s#@@INSTALL@@#dist/ubuntu/sudoers.d/scylla etc/sudoers.d#g" debian/scylla-server.install
sed -i -e "s#@@HKDOTTIMER@@##g" debian/scylla-server.install
else
sed -i -e "s/@@DH_INSTALLINIT@@//g" debian/rules
sed -i -e "s/@@COMPILER@@/g++/g" debian/rules
sed -i -e "s/@@BUILD_DEPENDS@@/libsystemd-dev, g++/g" debian/control
sed -i -e "s/@@BUILD_DEPENDS@@/libsystemd-dev, g++, libunwind-dev/g" debian/control
sed -i -e "s#@@INSTALL@@##g" debian/scylla-server.install
sed -i -e "s#@@HKDOTTIMER@@#dist/common/systemd/scylla-housekeeping.timer /lib/systemd/system#g" debian/scylla-server.install
fi
@@ -102,6 +102,7 @@ fi
cp dist/common/systemd/scylla-server.service.in debian/scylla-server.service
sed -i -e "s#@@SYSCONFDIR@@#/etc/default#g" debian/scylla-server.service
cp dist/common/systemd/scylla-housekeeping.service debian/scylla-server.scylla-housekeeping.service
cp dist/common/systemd/node-exporter.service debian/scylla-server.node-exporter.service
if [ "$RELEASE" = "14.04" ] && [ $REBUILD -eq 0 ]; then
if [ ! -f /etc/apt/sources.list.d/scylla-3rdparty-trusty.list ]; then


@@ -16,7 +16,7 @@ Conflicts: scylla-server (<< 1.1)
Package: scylla-server
Architecture: amd64
Depends: ${shlibs:Depends}, ${misc:Depends}, adduser, hwloc-nox, collectd, scylla-conf, python-yaml, python-urwid, python-requests, curl, bc, util-linux, @@DEPENDS@@
Depends: ${shlibs:Depends}, ${misc:Depends}, adduser, hwloc-nox, collectd, scylla-conf, python-yaml, python-urwid, python-requests, curl, bc, util-linux, realpath, @@DEPENDS@@
Description: Scylla database server binaries
Scylla is a highly scalable, eventually consistent, distributed,
partitioned row DB.


@@ -12,6 +12,7 @@ override_dh_auto_clean:
override_dh_installinit:
dh_installinit --no-start @@DH_INSTALLINIT@@
dh_installinit --no-start --name scylla-housekeeping @@DH_INSTALLINIT@@
dh_installinit --no-start --name node-exporter @@DH_INSTALLINIT@@
override_dh_strip:
dh_strip --dbg-package=scylla-server-dbg


@@ -97,6 +97,18 @@ For example, to configure Scylla to run with two seed nodes `192.168.0.100` and
$ docker run --name some-scylla -d scylladb/scylla --seeds 192.168.0.100,192.168.0.200
```
### `--listen-address ADDR`
The `--listen-address` command line option configures the IP address the Scylla instance listens for client connections.
For example, to configure Scylla to use listen address `10.0.0.5`:
```console
$ docker run --name some-scylla -d scylladb/scylla --listen-address 10.0.0.5
```
**Since: 1.4**
### `--broadcast-address ADDR`
The `--broadcast-address` command line option configures the IP address the Scylla instance tells other Scylla nodes in the cluster to connect to.


@@ -36,7 +36,8 @@ static void remove_or_mark_as_unique_owner(partition_version* current)
}
partition_version::partition_version(partition_version&& pv) noexcept
: _backref(pv._backref)
: anchorless_list_base_hook(std::move(pv))
, _backref(pv._backref)
, _partition(std::move(pv._partition))
{
if (_backref) {

Submodule seastar updated: 9e1d5dbc66...28aeb47d83


@@ -124,6 +124,43 @@ private:
logger.trace("Result ranges {}", ranges);
};
// Because of #1446 we don't have a comparator to use with
// range<clustering_key_prefix> which would produce correct results.
// This means we cannot reuse the same logic for dealing with
// partition and clustering keys.
auto modify_ck_ranges = [reversed] (const schema& s, auto& ranges, auto& lo) {
typedef typename std::remove_reference_t<decltype(ranges)>::value_type range_type;
typedef typename range_type::bound bound_type;
auto cmp = [reversed, bv_cmp = bound_view::compare(s)] (const auto& a, const auto& b) {
return reversed ? bv_cmp(b, a) : bv_cmp(a, b);
};
auto start_bound = [reversed] (const auto& range) -> const bound_view& {
return reversed ? range.second : range.first;
};
auto end_bound = [reversed] (const auto& range) -> const bound_view& {
return reversed ? range.first : range.second;
};
clustering_key_prefix::equality eq(s);
auto it = ranges.begin();
while (it != ranges.end()) {
auto range = bound_view::from_range(*it);
if (cmp(end_bound(range), lo) || eq(end_bound(range).prefix, lo)) {
logger.trace("Remove ck range {}", *it);
it = ranges.erase(it);
continue;
} else if (cmp(start_bound(range), lo)) {
assert(cmp(lo, end_bound(range)));
auto r = reversed ? range_type(it->start(), bound_type { lo, false })
: range_type(bound_type { lo, false }, it->end());
logger.trace("Modify ck range {} -> {}", *it, r);
*it = std::move(r);
}
++it;
}
};
// The last ck can be empty, depending on whether we deserialized state or
// not. An empty value means "the last page ended on something not bound
// by clustering" (i.e. on a static row alone).
@@ -136,15 +173,7 @@ private:
if (has_ck) {
query::clustering_row_ranges row_ranges = _cmd->slice.default_row_ranges();
clustering_key_prefix ckp = clustering_key_prefix::from_exploded(*_schema, _last_ckey->explode(*_schema));
clustering_key_prefix::less_compare cmp_rt(*_schema);
modify_ranges(row_ranges, ckp, false, [&cmp_rt](auto& c1, auto c2) {
if (cmp_rt(c1, c2)) {
return -1;
} else if (cmp_rt(c2, c1)) {
return 1;
}
return 0;
});
modify_ck_ranges(*_schema, row_ranges, ckp);
_cmd->slice.set_range(*_schema, *_last_pkey, row_ranges);
}
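The range-trimming loop in `modify_ck_ranges` (drop ranges that end at or before the last returned key, clip ranges that straddle it) can be sketched on plain integer ranges. This is a simplified, hypothetical stand-in: half-open `int` pairs replace clustering ranges, and reversal is ignored:

```cpp
#include <utility>
#include <vector>

// Simplified stand-in for modify_ck_ranges: ranges are half-open [first, second),
// and 'lo' is the last key the previous page returned. Ranges entirely at or
// before 'lo' are dropped; ranges straddling 'lo' are clipped so the next page
// resumes strictly after it.
std::vector<std::pair<int, int>> trim_ranges(std::vector<std::pair<int, int>> ranges, int lo) {
    std::vector<std::pair<int, int>> out;
    for (auto& r : ranges) {
        if (r.second <= lo) {
            continue;           // range ends before lo: remove it
        }
        if (r.first <= lo) {
            r.first = lo + 1;   // range straddles lo: clip its start, lo itself excluded
        }
        if (r.first < r.second) {
            out.push_back(r);   // keep only non-empty ranges
        }
    }
    return out;
}
```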


@@ -2496,8 +2496,8 @@ storage_proxy::query_singular_local_digest(schema_ptr s, lw_shared_ptr<query::re
future<foreign_ptr<lw_shared_ptr<query::result>>>
storage_proxy::query_singular_local(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr, query::result_request request, tracing::trace_state_ptr trace_state) {
unsigned shard = _db.local().shard_of(pr.start()->value().token());
return _db.invoke_on(shard, [gs = global_schema_ptr(s), prv = std::vector<query::partition_range>({pr}) /* FIXME: pr is copied */, cmd, request, trace_state = std::move(trace_state)] (database& db) mutable {
return db.query(gs, *cmd, request, prv, std::move(trace_state)).then([](auto&& f) {
return _db.invoke_on(shard, [gs = global_schema_ptr(s), prv = std::vector<query::partition_range>({pr}) /* FIXME: pr is copied */, cmd, request, gt = tracing::global_trace_state_ptr(std::move(trace_state))] (database& db) mutable {
return db.query(gs, *cmd, request, prv, gt).then([](auto&& f) {
return make_foreign(std::move(f));
});
});
@@ -3446,14 +3446,14 @@ future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
storage_proxy::query_mutations_locally(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const query::partition_range& pr, tracing::trace_state_ptr trace_state) {
if (pr.is_singular()) {
unsigned shard = _db.local().shard_of(pr.start()->value().token());
return _db.invoke_on(shard, [cmd, &pr, gs = global_schema_ptr(s), trace_state = std::move(trace_state)] (database& db) mutable {
return db.query_mutations(gs, *cmd, pr, std::move(trace_state)).then([] (reconcilable_result&& result) {
return _db.invoke_on(shard, [cmd, &pr, gs=global_schema_ptr(s), gt = tracing::global_trace_state_ptr(std::move(trace_state))] (database& db) mutable {
return db.query_mutations(gs, *cmd, pr, gt).then([] (reconcilable_result&& result) {
return make_foreign(make_lw_shared(std::move(result)));
});
});
} else {
return _db.map_reduce(mutation_result_merger{cmd, s}, [cmd, &pr, gs = global_schema_ptr(s), trace_state = std::move(trace_state)] (database& db) {
return db.query_mutations(gs, *cmd, pr, trace_state).then([] (reconcilable_result&& result) {
return _db.map_reduce(mutation_result_merger{ cmd, s }, [cmd, &pr, gs=global_schema_ptr(s), gt = tracing::global_trace_state_ptr(std::move(trace_state))] (database& db) {
return db.query_mutations(gs, *cmd, pr, gt).then([] (reconcilable_result&& result) {
return make_foreign(make_lw_shared(std::move(result)));
});
}).then([] (reconcilable_result&& result) {


@@ -216,6 +216,7 @@ protected:
public:
virtual ~compaction_strategy_impl() {}
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) = 0;
virtual void notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) { }
virtual compaction_strategy_type type() const = 0;
virtual bool parallel_compaction() const {
return true;
@@ -583,6 +584,8 @@ class leveled_compaction_strategy : public compaction_strategy_impl {
const sstring SSTABLE_SIZE_OPTION = "sstable_size_in_mb";
int32_t _max_sstable_size_in_mb = DEFAULT_MAX_SSTABLE_SIZE_IN_MB;
std::vector<stdx::optional<dht::decorated_key>> _last_compacted_keys;
std::vector<int> _compaction_counter;
public:
leveled_compaction_strategy(const std::map<sstring, sstring>& options) {
using namespace cql3::statements;
@@ -596,10 +599,14 @@ public:
logger.warn("Max sstable size of {}MB is configured. Testing done for CASSANDRA-5727 indicates that performance improves up to 160MB",
_max_sstable_size_in_mb);
}
_last_compacted_keys.resize(leveled_manifest::MAX_LEVELS);
_compaction_counter.resize(leveled_manifest::MAX_LEVELS);
}
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) override;
virtual void notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) override;
virtual int64_t estimated_pending_compactions(column_family& cf) const override;
virtual bool parallel_compaction() const override {
@@ -621,7 +628,7 @@ compaction_descriptor leveled_compaction_strategy::get_sstables_for_compaction(c
// sstable in it may be marked for deletion after compacted.
// Currently, we create a new manifest whenever it's time for compaction.
leveled_manifest manifest = leveled_manifest::create(cfs, candidates, _max_sstable_size_in_mb);
auto candidate = manifest.get_compaction_candidates();
auto candidate = manifest.get_compaction_candidates(_last_compacted_keys, _compaction_counter);
if (candidate.sstables.empty()) {
return sstables::compaction_descriptor();
@@ -632,6 +639,24 @@ compaction_descriptor leveled_compaction_strategy::get_sstables_for_compaction(c
return std::move(candidate);
}
void leveled_compaction_strategy::notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) {
if (removed.empty() || added.empty()) {
return;
}
auto min_level = std::numeric_limits<uint32_t>::max();
for (auto& sstable : removed) {
min_level = std::min(min_level, sstable->get_sstable_level());
}
const sstables::sstable *last = nullptr;
for (auto& candidate : added) {
if (!last || last->compare_by_first_key(*candidate) < 0) {
last = &*candidate;
}
}
_last_compacted_keys[min_level] = last->get_last_decorated_key();
}
int64_t leveled_compaction_strategy::estimated_pending_compactions(column_family& cf) const {
std::vector<sstables::shared_sstable> sstables;
sstables.reserve(cf.sstables_count());
@@ -686,6 +711,10 @@ compaction_descriptor compaction_strategy::get_sstables_for_compaction(column_fa
return _compaction_strategy_impl->get_sstables_for_compaction(cfs, std::move(candidates));
}
void compaction_strategy::notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) {
_compaction_strategy_impl->notify_completion(removed, added);
}
bool compaction_strategy::parallel_compaction() const {
return _compaction_strategy_impl->parallel_compaction();
}
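`notify_completion` records, for the lowest level among the removed sstables, the last key of the added sstable whose first key sorts highest. A simplified stand-in with plain structs instead of sstables (the names here are illustrative, not Scylla's types):

```cpp
#include <algorithm>
#include <limits>
#include <optional>
#include <vector>

// Hypothetical simplified sstable: a level plus a [first, last] key span.
struct sst { unsigned level; int first; int last; };

// Mirrors leveled_compaction_strategy::notify_completion on plain structs:
// find the lowest level among the removed sstables, then record the last key
// of the added sstable that sorts highest by first key.
void note_completion(const std::vector<sst>& removed, const std::vector<sst>& added,
                     std::vector<std::optional<int>>& last_compacted_keys) {
    if (removed.empty() || added.empty()) {
        return;
    }
    unsigned min_level = std::numeric_limits<unsigned>::max();
    for (auto& s : removed) {
        min_level = std::min(min_level, s.level);
    }
    const sst* last = nullptr;
    for (auto& s : added) {
        if (!last || last->first < s.first) {
            last = &s;  // highest by first key
        }
    }
    last_compacted_keys[min_level] = last->last;
}
```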


@@ -64,16 +64,14 @@ class leveled_manifest {
schema_ptr _schema;
std::vector<std::list<sstables::shared_sstable>> _generations;
#if 0
private final RowPosition[] lastCompactedKeys;
#endif
uint64_t _max_sstable_size_in_bytes;
#if 0
private final SizeTieredCompactionStrategyOptions options;
private final int [] compactionCounter;
#endif
public:
static constexpr int MAX_LEVELS = 9; // log10(1000^3);
leveled_manifest(column_family& cfs, int max_sstable_size_in_MB)
: logger("LeveledManifest")
, _schema(cfs.schema())
@@ -82,15 +80,8 @@ public:
// allocate enough generations for a PB of data, with a 1-MB sstable size. (Note that if maxSSTableSize is
// updated, we will still have sstables of the older, potentially smaller size. So don't make this
// dependent on maxSSTableSize.)
uint64_t n = 9; // log10(1000^3)
_generations.resize(n);
_generations.resize(MAX_LEVELS);
#if 0
lastCompactedKeys = new RowPosition[n];
for (int i = 0; i < generations.length; i++)
{
generations[i] = new ArrayList<>();
lastCompactedKeys[i] = cfs.partitioner.getMinimumToken().minKeyBound();
}
compactionCounter = new int[n];
#endif
}
@@ -129,37 +120,6 @@ public:
_generations[level].push_back(sstable);
}
#if 0
public synchronized void replace(Collection<SSTableReader> removed, Collection<SSTableReader> added)
{
assert !removed.isEmpty(); // use add() instead of promote when adding new sstables
logDistribution();
if (logger.isDebugEnabled())
logger.debug("Replacing [{}]", toString(removed));
// the level for the added sstables is the max of the removed ones,
// plus one if the removed were all on the same level
int minLevel = Integer.MAX_VALUE;
for (SSTableReader sstable : removed)
{
int thisLevel = remove(sstable);
minLevel = Math.min(minLevel, thisLevel);
}
// it's valid to do a remove w/o an add (e.g. on truncate)
if (added.isEmpty())
return;
if (logger.isDebugEnabled())
logger.debug("Adding [{}]", toString(added));
for (SSTableReader ssTableReader : added)
add(ssTableReader);
lastCompactedKeys[minLevel] = SSTableReader.sstableOrdering.max(added).last;
}
#endif
void repair_overlapping_sstables(int level) {
const sstables::sstable *previous = nullptr;
const schema& s = *_schema;
@@ -272,7 +232,8 @@ public:
* @return highest-priority sstables to compact, and level to compact them to
* If no compactions are necessary, will return null
*/
sstables::compaction_descriptor get_compaction_candidates() {
sstables::compaction_descriptor get_compaction_candidates(const std::vector<stdx::optional<dht::decorated_key>>& last_compacted_keys,
std::vector<int>& compaction_counter) {
#if 0
// during bootstrap we only do size tiering in L0 to make sure
// the streamed files can be placed in their original levels
@@ -339,11 +300,12 @@ public:
}
}
// L0 is fine, proceed with this level
auto candidates = get_candidates_for(i);
auto candidates = get_candidates_for(i, last_compacted_keys);
if (!candidates.empty()) {
int next_level = get_next_level(candidates);
candidates = get_overlapping_starved_sstables(next_level, std::move(candidates), compaction_counter);
#if 0
candidates = getOverlappingStarvedSSTables(nextLevel, candidates);
if (logger.isDebugEnabled())
logger.debug("Compaction candidates for L{} are {}", i, toString(candidates));
#endif
@@ -359,7 +321,7 @@ public:
if (get_level(0).empty()) {
return sstables::compaction_descriptor();
}
auto candidates = get_candidates_for(0);
auto candidates = get_candidates_for(0, last_compacted_keys);
if (candidates.empty()) {
return sstables::compaction_descriptor();
}
@@ -391,49 +353,57 @@ public:
* @param candidates the original sstables to compact
* @return
*/
#if 0
private Collection<SSTableReader> getOverlappingStarvedSSTables(int targetLevel, Collection<SSTableReader> candidates)
{
Set<SSTableReader> withStarvedCandidate = new HashSet<>(candidates);
std::vector<sstables::shared_sstable>
get_overlapping_starved_sstables(int target_level, std::vector<sstables::shared_sstable>&& candidates, std::vector<int>& compaction_counter) {
for (int i = _generations.size() - 1; i > 0; i--) {
compaction_counter[i]++;
}
compaction_counter[target_level] = 0;
for (int i = generations.length - 1; i > 0; i--)
compactionCounter[i]++;
compactionCounter[targetLevel] = 0;
if (logger.isDebugEnabled())
{
for (int j = 0; j < compactionCounter.length; j++)
logger.debug("CompactionCounter: {}: {}", j, compactionCounter[j]);
if (logger.level() == logging::log_level::debug) {
for (auto j = 0U; j < compaction_counter.size(); j++) {
logger.debug("CompactionCounter: {}: {}", j, compaction_counter[j]);
}
}
for (int i = generations.length - 1; i > 0; i--)
{
if (getLevelSize(i) > 0)
{
if (compactionCounter[i] > NO_COMPACTION_LIMIT)
{
for (int i = _generations.size() - 1; i > 0; i--) {
if (get_level_size(i) > 0) {
if (compaction_counter[i] > NO_COMPACTION_LIMIT) {
// we try to find an sstable that is fully contained within the boundaries we are compacting;
// say we are compacting 3 sstables: 0->30 in L1 and 0->12, 12->33 in L2
// this means that we will not create overlap in L2 if we add an sstable
// contained within 0 -> 33 to the compaction
RowPosition max = null;
RowPosition min = null;
for (SSTableReader candidate : candidates)
{
if (min == null || candidate.first.compareTo(min) < 0)
min = candidate.first;
if (max == null || candidate.last.compareTo(max) > 0)
max = candidate.last;
stdx::optional<dht::decorated_key> max;
stdx::optional<dht::decorated_key> min;
for (auto& candidate : candidates) {
auto& candidate_first = candidate->get_first_decorated_key();
if (!min || candidate_first.tri_compare(*_schema, *min) < 0) {
min = candidate_first;
}
auto& candidate_last = candidate->get_last_decorated_key();
if (!max || candidate_last.tri_compare(*_schema, *max) > 0) {
max = candidate_last;
}
}
#if 0
// NOTE: We don't need to filter out compacting sstables by now because strategy only deals with
// uncompacting sstables and parallel compaction is also disabled for lcs.
Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting();
Range<RowPosition> boundaries = new Range<>(min, max);
for (SSTableReader sstable : getLevel(i))
{
Range<RowPosition> r = new Range<RowPosition>(sstable.first, sstable.last);
if (boundaries.contains(r) && !compacting.contains(sstable))
{
logger.info("Adding high-level (L{}) {} to candidates", sstable.getSSTableLevel(), sstable);
withStarvedCandidate.add(sstable);
return withStarvedCandidate;
#endif
auto boundaries = ::range<dht::decorated_key>::make(*min, *max);
for (auto& sstable : get_level(i)) {
auto r = ::range<dht::decorated_key>::make(sstable->get_first_decorated_key(), sstable->get_last_decorated_key());
if (boundaries.contains(r, dht::ring_position_comparator(*_schema))) {
logger.info("Adding high-level (L{}) {} to candidates", sstable->get_sstable_level(), sstable->get_filename());
auto result = std::find_if(std::begin(candidates), std::end(candidates), [&sstable] (auto& candidate) {
return sstable->generation() == candidate->generation();
});
if (result != std::end(candidates)) {
continue;
}
candidates.push_back(sstable);
return candidates;
}
}
}
@@ -443,7 +413,6 @@ public:
return candidates;
}
#endif
size_t get_level_size(uint32_t level) {
#if 0
@@ -557,7 +526,7 @@ public:
* If no compactions are possible (because of concurrent compactions or because some sstables are blacklisted
* for prior failure), will return an empty list. Never returns null.
*/
std::vector<sstables::shared_sstable> get_candidates_for(int level) {
std::vector<sstables::shared_sstable> get_candidates_for(int level, const std::vector<stdx::optional<dht::decorated_key>>& last_compacted_keys) {
const schema& s = *_schema;
assert(!get_level(level).empty());
@@ -657,31 +626,35 @@ public:
}
// for non-L0 compactions, pick up where we left off last time
get_level(level).sort([&s] (auto& i, auto& j) {
std::list<sstables::shared_sstable>& sstables = get_level(level);
sstables.sort([&s] (auto& i, auto& j) {
return i->compare_by_first_key(*j) < 0;
});
int start = 0; // handles case where the prior compaction touched the very last range
#if 0
for (int i = 0; i < getLevel(level).size(); i++)
{
SSTableReader sstable = getLevel(level).get(i);
if (sstable.first.compareTo(lastCompactedKeys[level]) > 0)
{
start = i;
int idx = 0;
for (auto& sstable : sstables) {
if (uint32_t(level) >= last_compacted_keys.size()) {
throw std::runtime_error(sprint("Invalid level %u out of %ld", level, (last_compacted_keys.size() - 1)));
}
auto& sstable_first = sstable->get_first_decorated_key();
if (!last_compacted_keys[level] || sstable_first.tri_compare(s, *last_compacted_keys[level]) > 0) {
start = idx;
break;
}
idx++;
}
#endif
// look for a non-suspect keyspace to compact with, starting with where we left off last time,
// and wrapping back to the beginning of the generation if necessary
for (auto i = 0U; i < get_level(level).size(); i++) {
for (auto i = 0U; i < sstables.size(); i++) {
// get an iterator to the element at position pos in the sstables list.
auto pos = (start + i) % get_level(level).size();
auto it = get_level(level).begin();
auto pos = (start + i) % sstables.size();
auto it = sstables.begin();
std::advance(it, pos);
auto sstable = *it;
auto& sstable = *it;
auto candidates = overlapping(*_schema, sstable, get_level(level + 1));
candidates.push_back(sstable);
#if 0
if (Iterables.any(candidates, suspectP))
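The "pick up where we left off" selection can be sketched on plain ints standing in for first keys: sort, find the first entry past the last compacted key, then scan with wrap-around. A hypothetical simplified version:

```cpp
#include <algorithm>
#include <optional>
#include <vector>

// Simplified stand-in for the selection in get_candidates_for: sstables are
// plain ints (their first keys). Start scanning just past the last compacted
// key, wrap back to the beginning, and return the first entry accepted by 'pred'.
std::optional<int> pick_candidate(std::vector<int> firsts,
                                  std::optional<int> last_compacted,
                                  bool (*pred)(int)) {
    if (firsts.empty()) {
        return std::nullopt;
    }
    std::sort(firsts.begin(), firsts.end());
    std::size_t start = 0; // handles the case where the prior compaction touched the very last range
    for (std::size_t i = 0; i < firsts.size(); ++i) {
        if (!last_compacted || firsts[i] > *last_compacted) {
            start = i;
            break;
        }
    }
    for (std::size_t i = 0; i < firsts.size(); ++i) {
        std::size_t pos = (start + i) % firsts.size(); // wrap-around scan
        if (pred(firsts[pos])) {
            return firsts[pos];
        }
    }
    return std::nullopt;
}
```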


@@ -1252,7 +1252,9 @@ static future<std::vector<unsigned long>> compact_sstables(std::vector<unsigned
}
auto candidates = get_candidates_for_leveled_strategy(*cf);
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, 1);
auto candidate = manifest.get_compaction_candidates();
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
BOOST_REQUIRE(candidate.sstables.size() == sstables->size());
BOOST_REQUIRE(candidate.level == 1);
BOOST_REQUIRE(candidate.max_sstable_bytes == 1024*1024);
@@ -1731,7 +1733,9 @@ SEASTAR_TEST_CASE(leveled_01) {
auto candidates = get_candidates_for_leveled_strategy(*cf);
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, max_sstable_size_in_mb);
BOOST_REQUIRE(manifest.get_level_size(0) == 2);
auto candidate = manifest.get_compaction_candidates();
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
BOOST_REQUIRE(candidate.sstables.size() == 2);
BOOST_REQUIRE(candidate.level == 0);
@@ -1786,7 +1790,9 @@ SEASTAR_TEST_CASE(leveled_02) {
auto candidates = get_candidates_for_leveled_strategy(*cf);
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, max_sstable_size_in_mb);
BOOST_REQUIRE(manifest.get_level_size(0) == 3);
auto candidate = manifest.get_compaction_candidates();
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
BOOST_REQUIRE(candidate.sstables.size() == 3);
BOOST_REQUIRE(candidate.level == 0);
@@ -1844,7 +1850,9 @@ SEASTAR_TEST_CASE(leveled_03) {
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, max_sstable_size_in_mb);
BOOST_REQUIRE(manifest.get_level_size(0) == 2);
BOOST_REQUIRE(manifest.get_level_size(1) == 2);
auto candidate = manifest.get_compaction_candidates();
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
BOOST_REQUIRE(candidate.sstables.size() == 3);
BOOST_REQUIRE(candidate.level == 1);
@@ -1914,7 +1922,9 @@ SEASTAR_TEST_CASE(leveled_04) {
auto level2_score = (double) manifest.get_total_bytes(manifest.get_level(2)) / (double) manifest.max_bytes_for_level(2);
BOOST_REQUIRE(level2_score < 1.001);
auto candidate = manifest.get_compaction_candidates();
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
BOOST_REQUIRE(candidate.sstables.size() == 2);
BOOST_REQUIRE(candidate.level == 2);
@@ -1976,7 +1986,9 @@ SEASTAR_TEST_CASE(leveled_06) {
BOOST_REQUIRE(manifest.get_level_size(1) == 1);
BOOST_REQUIRE(manifest.get_level_size(2) == 0);
auto candidate = manifest.get_compaction_candidates();
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
BOOST_REQUIRE(candidate.level == 2);
BOOST_REQUIRE(candidate.sstables.size() == 1);
auto& sst = (candidate.sstables)[0];


@@ -600,4 +600,62 @@ inline void stop_foreground(const trace_state_ptr& state) {
state->stop_foreground_and_write();
}
}
// global_trace_state_ptr is a helper class that may be used for creating spans
// of an existing tracing session on other shards. When a tracing span on a
// different shard is needed, global_trace_state_ptr creates a secondary
// tracing session on that shard, similarly to what we do when we create
// tracing spans on remote nodes.
//
// The usage is straightforward:
// 1. Create a global_trace_state_ptr from the existing trace_state_ptr object.
// 2. Pass the global_trace_state_ptr object, instead of a trace_state_ptr
//    object, to the execution unit that (possibly) runs on a different shard.
class global_trace_state_ptr {
unsigned _cpu_of_origin;
trace_state_ptr _ptr;
public:
// Note: the trace_state_ptr must come from the current shard
global_trace_state_ptr(trace_state_ptr t)
: _cpu_of_origin(engine().cpu_id())
, _ptr(std::move(t))
{ }
// May be invoked across shards.
global_trace_state_ptr(const global_trace_state_ptr& other)
: global_trace_state_ptr(other.get())
{ }
// May be invoked across shards.
global_trace_state_ptr(global_trace_state_ptr&& other)
: global_trace_state_ptr(other.get())
{ }
global_trace_state_ptr& operator=(const global_trace_state_ptr&) = delete;
// May be invoked across shards.
trace_state_ptr get() const {
// optimize the "tracing not enabled" case
if (!_ptr) {
return nullptr;
}
if (_cpu_of_origin != engine().cpu_id()) {
auto opt_trace_info = make_trace_info(_ptr);
if (opt_trace_info) {
trace_state_ptr new_trace_state = tracing::get_local_tracing_instance().create_session(*opt_trace_info);
begin(new_trace_state);
return new_trace_state;
} else {
return nullptr;
}
}
return _ptr;
}
// May be invoked across shards.
operator trace_state_ptr() const { return get(); }
};
}
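A usage sketch following the class comment (a code fragment, not standalone-compilable; `gs`, `cmd`, and `pr` stand for captures as in the storage_proxy call sites in this changeset):

```cpp
// On the origin shard: wrap the local trace_state_ptr.
// On the target shard: the implicit conversion back to trace_state_ptr
// creates a secondary tracing session there if the shard differs.
return _db.invoke_on(shard,
        [gt = tracing::global_trace_state_ptr(std::move(trace_state))] (database& db) {
    tracing::trace_state_ptr local = gt; // secondary session if we crossed shards
    return db.query_mutations(gs, *cmd, pr, local);
});
```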


@@ -652,13 +652,13 @@ future<> cql_server::connection::process_request() {
f.length, mem_estimate, _server._max_request_size));
}
return with_semaphore(_server._memory_available, mem_estimate, [this, length = f.length, flags = f.flags, op, stream, tracing_requested] {
return read_and_decompress_frame(length, flags).then([this, flags, op, stream, tracing_requested] (temporary_buffer<char> buf) {
return get_units(_server._memory_available, mem_estimate).then([this, length = f.length, flags = f.flags, op, stream, tracing_requested] (semaphore_units<> mem_permit) {
return this->read_and_decompress_frame(length, flags).then([this, flags, op, stream, tracing_requested, mem_permit = std::move(mem_permit)] (temporary_buffer<char> buf) mutable {
++_server._requests_served;
++_server._requests_serving;
with_gate(_pending_requests_gate, [this, flags, op, stream, buf = std::move(buf), tracing_requested] () mutable {
with_gate(_pending_requests_gate, [this, flags, op, stream, buf = std::move(buf), tracing_requested, mem_permit = std::move(mem_permit)] () mutable {
auto bv = bytes_view{reinterpret_cast<const int8_t*>(buf.begin()), buf.size()};
auto cpu = pick_request_cpu();
return smp::submit_to(cpu, [this, bv = std::move(bv), op, stream, client_state = _client_state, tracing_requested] () mutable {
@@ -672,7 +672,7 @@ future<> cql_server::connection::process_request() {
}).then([this, flags] (auto&& response) {
_client_state.merge(response.second);
return this->write_response(std::move(response.first), _compression);
}).then([buf = std::move(buf)] {
}).then([buf = std::move(buf), mem_permit = std::move(mem_permit)] {
// Keep buf and the memory permit alive.
});
}).handle_exception([] (std::exception_ptr ex) {
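The point of switching from `with_semaphore` to `get_units` in this hunk is that the returned permit is a movable RAII object; it can be threaded through the continuations and released only when the final one drops it. A minimal stand-in (`mem_permit` is hypothetical; `semaphore_units` is the real seastar type):

```cpp
#include <utility>

// Hypothetical stand-in for seastar's semaphore_units: holds 'count' units of
// a shared budget and returns them on destruction. It is movable, so each
// continuation can hand it to the next, keeping the memory reserved until the
// last holder finishes; this is why the permit is moved through the lambdas
// above instead of being scoped to a with_semaphore block.
struct mem_permit {
    long* budget = nullptr;
    long count = 0;
    mem_permit(long& b, long c) : budget(&b), count(c) { b -= c; }
    mem_permit(mem_permit&& o) noexcept
        : budget(std::exchange(o.budget, nullptr)), count(o.count) {}
    mem_permit(const mem_permit&) = delete;
    ~mem_permit() {
        if (budget) {
            *budget += count;  // release only from the last holder
        }
    }
};
```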