Compare commits

...

20 Commits

Author SHA1 Message Date
Pekka Enberg
19b35e812b release: prepare for 1.4.rc2 2016-10-10 16:09:16 +03:00
Takuya ASADA
d9ac058bff dist/ubuntu: add realpath to dependency, requires for scylla_setup
We need a dependency on realpath, since scylla_setup uses it.

Fixes #1740.

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1475788340-22939-1-git-send-email-syuu@scylladb.com>
(cherry picked from commit 8452045b85)
2016-10-10 15:59:10 +03:00
Pekka Enberg
766367a6c5 dist/docker: Use Scylla 1.4 RPM repository 2016-10-10 15:21:08 +03:00
Pekka Enberg
7ac9b6e9ca docs/docker: Tag --listen-address as 1.4 feature
The Docker Hub documentation is the same for all image versions, so tag
`--listen-address` as a 1.4 feature.

Message-Id: <1475819164-7865-1-git-send-email-penberg@scylladb.com>
(cherry picked from commit 3b75ff1496)
2016-10-10 14:34:04 +03:00
Vlad Zolotarov
b5bab524e1 api::storage_service::slow_query: don't use duration_cast in GET
The slow_query_record_ttl() and slow_query_threshold() functions already return
durations of the appropriate type, so no additional cast is needed.
In addition, the cast of ttl contained a mistake.

Fixes #1734

Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>
Message-Id: <1475669400-5925-1-git-send-email-vladz@cloudius-systems.com>
(cherry picked from commit 006999f46c)
2016-10-09 19:27:09 +03:00
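The redundant conversion can be illustrated with a Python `timedelta` analogy (a sketch only — the actual code uses C++ `std::chrono`, and `count_us` is a hypothetical stand-in for `duration::count()`):

```python
from datetime import timedelta

def count_us(d):
    # Analogue of std::chrono::microseconds::count(): the raw tick count.
    return int(d / timedelta(microseconds=1))

# The getter already hands back a duration in the unit the API reports;
# converting it to the same unit again before taking the count is a no-op,
# which is why the extra cast was dropped.
ttl = timedelta(seconds=5)
assert count_us(ttl) == 5_000_000
```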
Takuya ASADA
f4bb52096b dist/common/scripts/scylla_setup: use 'swapon -s' instead of 'swapon --show'
Since Ubuntu 14.04 doesn't support the --show option, we need to avoid using it.
Fixes #1740

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1475788340-22939-2-git-send-email-syuu@scylladb.com>
(cherry picked from commit 469e9af1f4)
2016-10-09 19:27:07 +03:00
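The portability point can be sketched by parsing the table that `swapon -s` prints (the parser below is illustrative; the actual script greps the output rather than parsing it in Python):

```python
def parse_swapon_s(output):
    """Return swap device paths from 'swapon -s' table output.

    'swapon -s' works on Ubuntu 14.04's util-linux, which lacks --show.
    """
    lines = output.strip().splitlines()
    return [line.split()[0] for line in lines[1:]]  # skip the header row

sample = """Filename\t\t\t\tType\t\tSize\tUsed\tPriority
/dev/sda2                               partition\t999420\t0\t-1
"""
print(parse_swapon_s(sample))  # ['/dev/sda2']
```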
Raphael S. Carvalho
af26e7a691 lcs: fix starvation at higher levels
When the max sstable size is increased, higher levels suffer from
starvation because we decide to compact a given level only if the following
calculation results in a number greater than 1.001:
level_size(L) / max_size_for_level_l(L)

Fixes #1720.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit a8ab4b8f37)
2016-10-09 11:07:48 +03:00
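The scoring rule quoted above can be sketched in Python (function and names are illustrative, not the actual implementation):

```python
def levels_needing_compaction(level_sizes, max_level_sizes, threshold=1.001):
    """Return indices of levels whose size ratio exceeds the threshold."""
    return [
        level
        for level, (size, cap) in enumerate(zip(level_sizes, max_level_sizes))
        if cap > 0 and size / cap > threshold
    ]

# With a larger max sstable size, per-level caps grow and ratios shrink,
# so higher levels can starve: here only L1 crosses the threshold.
print(levels_needing_compaction([10, 160, 900], [10, 100, 1000]))  # [1]
```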
Raphael S. Carvalho
770c982541 lcs: fix broken token range distribution at higher levels
Uniform token range distribution across sstables in a level > 1 was broken,
because we were only choosing the sstable with the lowest first key when
compacting a level > 0. This caused a performance problem because, for
example, L1->L2 may develop a huge overlap over time.
The last compacted key is now stored for each level to ensure a sort of
round-robin selection of sstables for compaction at levels >= 1.
That's also done by C*, which was once affected by the same problem, as described in
https://issues.apache.org/jira/browse/CASSANDRA-6284.

Fixes #1719.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit a3bf7558f2)
2016-10-09 11:07:39 +03:00
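The round-robin selection described above can be sketched with integer keys (a simplified Python model; names are hypothetical):

```python
def pick_candidate(sstables, last_compacted_key):
    """sstables: (first_key, name) pairs for one level, in any order."""
    sstables = sorted(sstables)  # non-L0 candidates are scanned by first key
    for first_key, name in sstables:
        # pick up where the previous compaction in this level left off
        if last_compacted_key is None or first_key > last_compacted_key:
            return name
    # wrap around: the prior compaction touched the very last range
    return sstables[0][1]

print(pick_candidate([(0, 'a'), (40, 'b'), (80, 'c')], 40))  # 'c'
print(pick_candidate([(0, 'a'), (40, 'b'), (80, 'c')], 90))  # 'a' (wraps)
```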
Tomasz Grabiec
2e22c027b2 db: Do not timeout streaming readers
There is a limit on the concurrency of sstable readers on each shard. When
this limit is exhausted (currently 100 readers), readers queue. There
is a timeout after which queued readers are failed, equal to
read_request_timeout_in_ms (5s by default). The reason we have the
timeout here is primarily because the readers created for the purpose
of serving a CQL request no longer need to execute after waiting
longer than read_request_timeout_in_ms. The coordinator no longer
waits for the result so there is no point in proceeding with the read.

This timeout should not apply for readers created for streaming. The
streaming client currently times out after 10 minutes, so we could
wait at least that long. Timing out sooner makes streaming unreliable,
which under high load may prevent streaming from completing.

The change sets no timeout for streaming readers at the replica level,
similarly to what we do for system table readers.

Fixes #1741.

Message-Id: <1475840678-25606-1-git-send-email-tgrabiec@scylladb.com>
(cherry picked from commit 2a5a90f391)
2016-10-09 10:33:00 +03:00
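The queueing behaviour can be modelled with a counting semaphore; this Python sketch (class name hypothetical) shows why a `None` timeout lets streaming readers wait indefinitely while CQL readers fail fast:

```python
import threading

class ReaderQueue:
    """Sketch of per-shard reader concurrency.

    CQL readers pass a timeout and fail fast; streaming readers pass None
    and wait as long as the streaming layer allows.
    """
    def __init__(self, limit=100):
        self._sem = threading.Semaphore(limit)

    def acquire(self, timeout=None):
        # timeout=None blocks indefinitely -- the "no timeout for streaming
        # readers" behaviour this commit introduces at the replica level.
        if not self._sem.acquire(timeout=timeout):
            raise TimeoutError("queued reader timed out")

    def release(self):
        self._sem.release()
```

A CQL reader would call `q.acquire(timeout=5.0)`; a streaming reader simply `q.acquire()`.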
Avi Kivity
d2c0a5c318 Update seastar submodule
* seastar c2489c6...f9f4746 (2):
  > prometheus CPU should start in 0
  > Collectd: bytes ordering depends on the type

Fixes #1726.
Fixes #1727.
2016-10-06 11:26:17 +03:00
Pekka Enberg
52e2688c7b Update seastar submodule
* seastar 777ab50...c2489c6 (1):
  > Merge "Fix signal mask corruption" from Tomasz
2016-10-05 12:39:45 +03:00
Pekka Enberg
08b71b25c1 dist/docker: Add '--listen-address' to 'docker run'
Add a '--listen-address' command line parameter to the Docker image,
which can be used to set Scylla's listen address.

Refs #1723

Message-Id: <1475485165-6772-1-git-send-email-penberg@scylladb.com>
(cherry picked from commit c3bebea1ef)
2016-10-04 14:08:36 +03:00
Avi Kivity
9682b36cdc Update seastar submodule
* seastar 9e1d5db...777ab50 (1):
  > prometheus: remove invalid chars from meric names

Fixes #1710.
2016-10-02 11:39:52 +03:00
Avi Kivity
e953955515 seastar: switch to branch-1.4 on scylla-seastar submodule
This allows us to backport individual commits, rather than the entire
master branch.
2016-10-02 11:37:48 +03:00
Tomasz Grabiec
c1f93c461c transport: Extend request memory footprint accounting to also cover execution
CQL server is supposed to throttle requests so that they don't
overflow memory. The problem is that it currently accounts for a
request's memory only while reading its frame from the connection,
not during actual request execution. As a result, too many requests
may be allowed to execute and we may run out of memory.

Fixes #1708.
Message-Id: <1475149302-11517-1-git-send-email-tgrabiec@scylladb.com>

(cherry picked from commit 7e25b958ac)
2016-10-02 11:34:35 +03:00
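The fix widens the window during which a request's memory is accounted, from frame reading only to the whole request lifetime. A minimal Python sketch of the accounting (all names hypothetical):

```python
class Limiter:
    """Tracks memory units consumed by in-flight requests."""
    def __init__(self, total):
        self.total, self.used = total, 0

    def consume(self, units):
        if self.used + units > self.total:
            raise MemoryError("request memory limit exceeded")
        self.used += units

    def release(self, units):
        self.used -= units

class MemoryPermit:
    """Holds a request's units for its whole lifetime (read + execute),
    releasing only once the request completes."""
    def __init__(self, limiter, units):
        self._limiter, self._units = limiter, units

    def __enter__(self):
        self._limiter.consume(self._units)
        return self

    def __exit__(self, *exc):
        self._limiter.release(self._units)
```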
Vlad Zolotarov
c0d32a8297 tracing: introduce the tracing::global_trace_state_ptr class
This object, similarly to a global_schema_ptr, allows dynamically
creating trace_state_ptr objects on different shards in the context
of the original tracing session.

It creates a secondary tracing session object from the original
trace_state_ptr object whenever a trace_state_ptr object is needed
on a "remote" shard, similarly to what we do when one is needed on a
remote node.

Fixes #1678
Fixes #1647

Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>
Message-Id: <1474387767-21910-1-git-send-email-vladz@cloudius-systems.com>
(cherry picked from commit 7e180c7bd3)
2016-10-02 11:33:16 +03:00
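The pattern is a handle that is safe to copy between shards and materialises a shard-local session only when first used there; a Python sketch (class and field names hypothetical):

```python
class GlobalHandle:
    """Sketch of the global_trace_state_ptr idea."""
    def __init__(self, session_info):
        self._info = session_info   # context of the original tracing session
        self._local = {}            # shard id -> shard-local session object

    def get(self, shard):
        # Create a secondary session for this shard on first use, in the
        # context of the original session, and reuse it afterwards.
        if shard not in self._local:
            self._local[shard] = dict(self._info, shard=shard)
        return self._local[shard]
```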
Paweł Dziepak
a9bd9289a4 query_pagers: fix clustering key range calculation
Paging code assumes that the clustering row range [a, a] contains only one
row, which may not be true. Another problem is that it tries to use the
range<> interface for dealing with clustering key ranges, which doesn't
work because of the lack of a correct comparator.

Refs #1446.
Fixes #1684.

Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
Message-Id: <1475236805-16223-1-git-send-email-pdziepak@scylladb.com>
(cherry picked from commit eb1fcf3ecc)
2016-10-02 10:55:43 +03:00
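The paging adjustment can be sketched with integer keys, where, unlike clustering prefixes, [a, a] really does contain exactly one row; that difference is exactly why prefix ranges need the custom comparator logic this commit adds:

```python
def trim_ranges(ranges, last_key):
    """Restart paging after last_key. Ranges are inclusive (lo, hi) pairs."""
    out = []
    for lo, hi in ranges:
        if hi <= last_key:
            continue                # entirely served by previous pages
        if lo <= last_key:
            lo = last_key + 1       # resume strictly after the last row
        out.append((lo, hi))
    return out

print(trim_ranges([(0, 5), (5, 9), (12, 20)], 5))  # [(6, 9), (12, 20)]
```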
Takuya ASADA
a2feaa998c dist/redhat: add missing build time dependency for libunwind
There was a missing build-time dependency on libunwind, so add it.
Fixes #1722

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1475260099-25881-1-git-send-email-syuu@scylladb.com>
(cherry picked from commit 9639cc840e)
2016-09-30 21:34:20 +03:00
Takuya ASADA
11950dcba3 dist/ubuntu: add missing build time dependency for libunwind
There was a missing build-time dependency on libunwind, so add it.
Fixes #1721

Signed-off-by: Takuya ASADA <syuu@scylladb.com>
Message-Id: <1475255706-26434-1-git-send-email-syuu@scylladb.com>
(cherry picked from commit c89d9599b1)
2016-09-30 21:34:13 +03:00
Pekka Enberg
08ce047792 release: prepare for 1.4.rc1 2016-09-30 13:59:46 +03:00
22 changed files with 267 additions and 135 deletions

.gitmodules vendored
View File

@@ -1,6 +1,6 @@
[submodule "seastar"]
path = seastar
url = ../seastar
url = ../scylla-seastar
ignore = dirty
[submodule "swagger-ui"]
path = swagger-ui

View File

@@ -1,6 +1,6 @@
#!/bin/sh
VERSION=666.development
VERSION=1.4.rc2
if test -f version
then

View File

@@ -684,8 +684,8 @@ void set_storage_service(http_context& ctx, routes& r) {
ss::get_slow_query_info.set(r, [](const_req req) {
ss::slow_query_info res;
res.enable = tracing::tracing::get_local_tracing_instance().slow_query_tracing_enabled();
res.ttl = std::chrono::duration_cast<std::chrono::microseconds>(tracing::tracing::get_local_tracing_instance().slow_query_record_ttl()).count() ;
res.threshold = std::chrono::duration_cast<std::chrono::microseconds>(tracing::tracing::get_local_tracing_instance().slow_query_threshold()).count();
res.ttl = tracing::tracing::get_local_tracing_instance().slow_query_record_ttl().count() ;
res.threshold = tracing::tracing::get_local_tracing_instance().slow_query_threshold().count();
return res;
});

View File

@@ -54,6 +54,10 @@ public:
// Return a list of sstables to be compacted after applying the strategy.
compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<lw_shared_ptr<sstable>> candidates);
// Some strategies may look at the compacted and resulting sstables to
// get some useful information for subsequent compactions.
void notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added);
// Return if parallel compaction is allowed by strategy.
bool parallel_compaction() const;

View File

@@ -427,8 +427,14 @@ column_family::make_sstable_reader(schema_ptr s,
tracing::trace_state_ptr trace_state) const {
// restricts a reader's concurrency if the configuration specifies it
auto restrict_reader = [&] (mutation_reader&& in) {
if (_config.read_concurrency_config.sem) {
return make_restricted_reader(_config.read_concurrency_config, 1, std::move(in));
auto&& config = [this, &pc] () -> const restricted_mutation_reader_config& {
if (service::get_local_streaming_read_priority().id() == pc.id()) {
return _config.streaming_read_concurrency_config;
}
return _config.read_concurrency_config;
}();
if (config.sem) {
return make_restricted_reader(config, 1, std::move(in));
} else {
return std::move(in);
}
@@ -1283,6 +1289,7 @@ column_family::compact_sstables(sstables::compaction_descriptor descriptor, bool
};
return sstables::compact_sstables(*sstables_to_compact, *this, create_sstable, descriptor.max_sstable_bytes, descriptor.level,
cleanup).then([this, sstables_to_compact] (auto new_sstables) {
_compaction_strategy.notify_completion(*sstables_to_compact, new_sstables);
return this->rebuild_sstable_list(new_sstables, *sstables_to_compact);
});
});
@@ -2070,6 +2077,7 @@ keyspace::make_column_family_config(const schema& s, const db::config& db_config
cfg.dirty_memory_manager = _config.dirty_memory_manager;
cfg.streaming_dirty_memory_manager = _config.streaming_dirty_memory_manager;
cfg.read_concurrency_config = _config.read_concurrency_config;
cfg.streaming_read_concurrency_config = _config.streaming_read_concurrency_config;
cfg.cf_stats = _config.cf_stats;
cfg.enable_incremental_backups = _config.enable_incremental_backups;
cfg.max_cached_partition_size_in_bytes = db_config.max_cached_partition_size_in_kb() * 1024;
@@ -2559,6 +2567,8 @@ database::make_keyspace_config(const keyspace_metadata& ksm) {
++_stats->sstable_read_queue_overloaded;
throw std::runtime_error("sstable inactive read queue overloaded");
};
cfg.streaming_read_concurrency_config = cfg.read_concurrency_config;
cfg.streaming_read_concurrency_config.timeout = {};
cfg.cf_stats = &_cf_stats;
cfg.enable_incremental_backups = _enable_incremental_backups;
return cfg;

View File

@@ -325,6 +325,7 @@ public:
::dirty_memory_manager* dirty_memory_manager = &default_dirty_memory_manager;
::dirty_memory_manager* streaming_dirty_memory_manager = &default_dirty_memory_manager;
restricted_mutation_reader_config read_concurrency_config;
restricted_mutation_reader_config streaming_read_concurrency_config;
::cf_stats* cf_stats = nullptr;
uint64_t max_cached_partition_size_in_bytes;
};
@@ -879,6 +880,7 @@ public:
::dirty_memory_manager* dirty_memory_manager = &default_dirty_memory_manager;
::dirty_memory_manager* streaming_dirty_memory_manager = &default_dirty_memory_manager;
restricted_mutation_reader_config read_concurrency_config;
restricted_mutation_reader_config streaming_read_concurrency_config;
::cf_stats* cf_stats = nullptr;
};
private:

View File

@@ -84,7 +84,7 @@ get_unused_disks() {
if [ -f /usr/sbin/pvs ]; then
count_pvs=$(pvs|grep $dev|wc -l)
fi
count_swap=$(swapon --show |grep `realpath $dev`|wc -l)
count_swap=$(swapon -s |grep `realpath $dev`|wc -l)
if [ $count_raw -eq 0 -a $count_pvs -eq 0 -a $count_swap -eq 0 ]; then
echo -n "$dev "
fi

View File

@@ -7,7 +7,7 @@ ENV container docker
VOLUME [ "/sys/fs/cgroup" ]
#install scylla
RUN curl http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo -o /etc/yum.repos.d/scylla.repo
RUN curl http://downloads.scylladb.com/rpm/centos/scylla-1.4.repo -o /etc/yum.repos.d/scylla.repo
RUN yum -y install epel-release
RUN yum -y clean expire-cache
RUN yum -y update

View File

@@ -9,6 +9,7 @@ def parse():
parser.add_argument('--smp', default=None, help="e.g --smp 2 to use two CPUs")
parser.add_argument('--memory', default=None, help="e.g. --memory 1G to use 1 GB of RAM")
parser.add_argument('--overprovisioned', default='0', choices=['0', '1'], help="run in overprovisioned environment")
parser.add_argument('--listen-address', default=None, dest='listenAddress')
parser.add_argument('--broadcast-address', default=None, dest='broadcastAddress')
parser.add_argument('--broadcast-rpc-address', default=None, dest='broadcastRpcAddress')
return parser.parse_args()

View File

@@ -8,6 +8,7 @@ class ScyllaSetup:
self._developerMode = arguments.developerMode
self._seeds = arguments.seeds
self._cpuset = arguments.cpuset
self._listenAddress = arguments.listenAddress
self._broadcastAddress = arguments.broadcastAddress
self._broadcastRpcAddress = arguments.broadcastRpcAddress
self._smp = arguments.smp
@@ -31,14 +32,15 @@ class ScyllaSetup:
def scyllaYAML(self):
configuration = yaml.load(open('/etc/scylla/scylla.yaml'))
IP = subprocess.check_output(['hostname', '-i']).decode('ascii').strip()
configuration['listen_address'] = IP
configuration['rpc_address'] = IP
if self._listenAddress is None:
self._listenAddress = subprocess.check_output(['hostname', '-i']).decode('ascii').strip()
configuration['listen_address'] = self._listenAddress
configuration['rpc_address'] = self._listenAddress
if self._seeds is None:
if self._broadcastAddress is not None:
self._seeds = self._broadcastAddress
else:
self._seeds = IP
self._seeds = self._listenAddress
configuration['seed_provider'] = [
{'class_name': 'org.apache.cassandra.locator.SimpleSeedProvider',
'parameters': [{'seeds': self._seeds}]}

View File

@@ -27,7 +27,7 @@ Group: Applications/Databases
Summary: The Scylla database server
License: AGPLv3
URL: http://www.scylladb.com/
BuildRequires: libaio-devel libstdc++-devel cryptopp-devel hwloc-devel numactl-devel libpciaccess-devel libxml2-devel zlib-devel thrift-devel yaml-cpp-devel lz4-devel snappy-devel jsoncpp-devel systemd-devel xz-devel openssl-devel libcap-devel libselinux-devel libgcrypt-devel libgpg-error-devel elfutils-devel krb5-devel libcom_err-devel libattr-devel pcre-devel elfutils-libelf-devel bzip2-devel keyutils-libs-devel xfsprogs-devel make gnutls-devel systemd-devel lksctp-tools-devel protobuf-devel protobuf-compiler
BuildRequires: libaio-devel libstdc++-devel cryptopp-devel hwloc-devel numactl-devel libpciaccess-devel libxml2-devel zlib-devel thrift-devel yaml-cpp-devel lz4-devel snappy-devel jsoncpp-devel systemd-devel xz-devel openssl-devel libcap-devel libselinux-devel libgcrypt-devel libgpg-error-devel elfutils-devel krb5-devel libcom_err-devel libattr-devel pcre-devel elfutils-libelf-devel bzip2-devel keyutils-libs-devel xfsprogs-devel make gnutls-devel systemd-devel lksctp-tools-devel protobuf-devel protobuf-compiler libunwind-devel
%{?fedora:BuildRequires: boost-devel ninja-build ragel antlr3-tool antlr3-C++-devel python3 gcc-c++ libasan libubsan python3-pyparsing dnf-yum}
%{?rhel:BuildRequires: scylla-libstdc++-static scylla-boost-devel scylla-ninja-build scylla-ragel scylla-antlr3-tool scylla-antlr3-C++-devel python34 scylla-gcc-c++ >= 5.1.1, python34-pyparsing}
Requires: scylla-conf systemd-libs hwloc collectd PyYAML python-urwid pciutils pyparsing python-requests curl bc util-linux

View File

@@ -78,13 +78,13 @@ cp dist/ubuntu/scylla-server.install.in debian/scylla-server.install
if [ "$RELEASE" = "14.04" ]; then
sed -i -e "s/@@DH_INSTALLINIT@@/--upstart-only/g" debian/rules
sed -i -e "s/@@COMPILER@@/g++-5/g" debian/rules
sed -i -e "s/@@BUILD_DEPENDS@@/g++-5/g" debian/control
sed -i -e "s/@@BUILD_DEPENDS@@/g++-5, libunwind8-dev/g" debian/control
sed -i -e "s#@@INSTALL@@#dist/ubuntu/sudoers.d/scylla etc/sudoers.d#g" debian/scylla-server.install
sed -i -e "s#@@HKDOTTIMER@@##g" debian/scylla-server.install
else
sed -i -e "s/@@DH_INSTALLINIT@@//g" debian/rules
sed -i -e "s/@@COMPILER@@/g++/g" debian/rules
sed -i -e "s/@@BUILD_DEPENDS@@/libsystemd-dev, g++/g" debian/control
sed -i -e "s/@@BUILD_DEPENDS@@/libsystemd-dev, g++, libunwind-dev/g" debian/control
sed -i -e "s#@@INSTALL@@##g" debian/scylla-server.install
sed -i -e "s#@@HKDOTTIMER@@#dist/common/systemd/scylla-housekeeping.timer /lib/systemd/system#g" debian/scylla-server.install
fi

View File

@@ -16,7 +16,7 @@ Conflicts: scylla-server (<< 1.1)
Package: scylla-server
Architecture: amd64
Depends: ${shlibs:Depends}, ${misc:Depends}, adduser, hwloc-nox, collectd, scylla-conf, python-yaml, python-urwid, python-requests, curl, bc, util-linux, @@DEPENDS@@
Depends: ${shlibs:Depends}, ${misc:Depends}, adduser, hwloc-nox, collectd, scylla-conf, python-yaml, python-urwid, python-requests, curl, bc, util-linux, realpath, @@DEPENDS@@
Description: Scylla database server binaries
Scylla is a highly scalable, eventually consistent, distributed,
partitioned row DB.

View File

@@ -97,6 +97,18 @@ For example, to configure Scylla to run with two seed nodes `192.168.0.100` and
$ docker run --name some-scylla -d scylladb/scylla --seeds 192.168.0.100,192.168.0.200
```
### `--listen-address ADDR`
The `--listen-address` command line option configures the IP address on which the Scylla instance listens for client connections.
For example, to configure Scylla to use listen address `10.0.0.5`:
```console
$ docker run --name some-scylla -d scylladb/scylla --listen-address 10.0.0.5
```
**Since: 1.4**
### `--broadcast-address ADDR`
The `--broadcast-address` command line option configures the IP address the Scylla instance tells other Scylla nodes in the cluster to connect to.

Submodule seastar updated: 9e1d5dbc66...f9f474663b

View File

@@ -124,6 +124,43 @@ private:
logger.trace("Result ranges {}", ranges);
};
// Because of #1446 we don't have a comparator to use with
// range<clustering_key_prefix> which would produce correct results.
// This means we cannot reuse the same logic for dealing with
// partition and clustering keys.
auto modify_ck_ranges = [reversed] (const schema& s, auto& ranges, auto& lo) {
typedef typename std::remove_reference_t<decltype(ranges)>::value_type range_type;
typedef typename range_type::bound bound_type;
auto cmp = [reversed, bv_cmp = bound_view::compare(s)] (const auto& a, const auto& b) {
return reversed ? bv_cmp(b, a) : bv_cmp(a, b);
};
auto start_bound = [reversed] (const auto& range) -> const bound_view& {
return reversed ? range.second : range.first;
};
auto end_bound = [reversed] (const auto& range) -> const bound_view& {
return reversed ? range.first : range.second;
};
clustering_key_prefix::equality eq(s);
auto it = ranges.begin();
while (it != ranges.end()) {
auto range = bound_view::from_range(*it);
if (cmp(end_bound(range), lo) || eq(end_bound(range).prefix, lo)) {
logger.trace("Remove ck range {}", *it);
it = ranges.erase(it);
continue;
} else if (cmp(start_bound(range), lo)) {
assert(cmp(lo, end_bound(range)));
auto r = reversed ? range_type(it->start(), bound_type { lo, false })
: range_type(bound_type { lo, false }, it->end());
logger.trace("Modify ck range {} -> {}", *it, r);
*it = std::move(r);
}
++it;
}
};
// last ck can be empty depending on whether we
// deserialized state or not. This case means "last page ended on
// something-not-bound-by-clustering" (i.e. a static row, alone)
@@ -136,15 +173,7 @@ private:
if (has_ck) {
query::clustering_row_ranges row_ranges = _cmd->slice.default_row_ranges();
clustering_key_prefix ckp = clustering_key_prefix::from_exploded(*_schema, _last_ckey->explode(*_schema));
clustering_key_prefix::less_compare cmp_rt(*_schema);
modify_ranges(row_ranges, ckp, false, [&cmp_rt](auto& c1, auto c2) {
if (cmp_rt(c1, c2)) {
return -1;
} else if (cmp_rt(c2, c1)) {
return 1;
}
return 0;
});
modify_ck_ranges(*_schema, row_ranges, ckp);
_cmd->slice.set_range(*_schema, *_last_pkey, row_ranges);
}

View File

@@ -3452,8 +3452,8 @@ storage_proxy::query_mutations_locally(schema_ptr s, lw_shared_ptr<query::read_c
});
});
} else {
return _db.map_reduce(mutation_result_merger{cmd, s}, [cmd, &pr, gs = global_schema_ptr(s), trace_state = std::move(trace_state)] (database& db) {
return db.query_mutations(gs, *cmd, pr, trace_state).then([] (reconcilable_result&& result) {
return _db.map_reduce(mutation_result_merger{ cmd, s }, [cmd, &pr, gs=global_schema_ptr(s), gt = tracing::global_trace_state_ptr(std::move(trace_state))] (database& db) {
return db.query_mutations(gs, *cmd, pr, gt).then([] (reconcilable_result&& result) {
return make_foreign(make_lw_shared(std::move(result)));
});
}).then([] (reconcilable_result&& result) {

View File

@@ -216,6 +216,7 @@ protected:
public:
virtual ~compaction_strategy_impl() {}
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) = 0;
virtual void notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) { }
virtual compaction_strategy_type type() const = 0;
virtual bool parallel_compaction() const {
return true;
@@ -583,6 +584,8 @@ class leveled_compaction_strategy : public compaction_strategy_impl {
const sstring SSTABLE_SIZE_OPTION = "sstable_size_in_mb";
int32_t _max_sstable_size_in_mb = DEFAULT_MAX_SSTABLE_SIZE_IN_MB;
std::vector<stdx::optional<dht::decorated_key>> _last_compacted_keys;
std::vector<int> _compaction_counter;
public:
leveled_compaction_strategy(const std::map<sstring, sstring>& options) {
using namespace cql3::statements;
@@ -596,10 +599,14 @@ public:
logger.warn("Max sstable size of {}MB is configured. Testing done for CASSANDRA-5727 indicates that performance improves up to 160MB",
_max_sstable_size_in_mb);
}
_last_compacted_keys.resize(leveled_manifest::MAX_LEVELS);
_compaction_counter.resize(leveled_manifest::MAX_LEVELS);
}
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) override;
virtual void notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) override;
virtual int64_t estimated_pending_compactions(column_family& cf) const override;
virtual bool parallel_compaction() const override {
@@ -621,7 +628,7 @@ compaction_descriptor leveled_compaction_strategy::get_sstables_for_compaction(c
// sstable in it may be marked for deletion after compacted.
// Currently, we create a new manifest whenever it's time for compaction.
leveled_manifest manifest = leveled_manifest::create(cfs, candidates, _max_sstable_size_in_mb);
auto candidate = manifest.get_compaction_candidates();
auto candidate = manifest.get_compaction_candidates(_last_compacted_keys, _compaction_counter);
if (candidate.sstables.empty()) {
return sstables::compaction_descriptor();
@@ -632,6 +639,24 @@ compaction_descriptor leveled_compaction_strategy::get_sstables_for_compaction(c
return std::move(candidate);
}
void leveled_compaction_strategy::notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) {
if (removed.empty() || added.empty()) {
return;
}
auto min_level = std::numeric_limits<uint32_t>::max();
for (auto& sstable : removed) {
min_level = std::min(min_level, sstable->get_sstable_level());
}
const sstables::sstable *last = nullptr;
for (auto& candidate : added) {
if (!last || last->compare_by_first_key(*candidate) < 0) {
last = &*candidate;
}
}
_last_compacted_keys[min_level] = last->get_last_decorated_key();
}
int64_t leveled_compaction_strategy::estimated_pending_compactions(column_family& cf) const {
std::vector<sstables::shared_sstable> sstables;
sstables.reserve(cf.sstables_count());
@@ -686,6 +711,10 @@ compaction_descriptor compaction_strategy::get_sstables_for_compaction(column_fa
return _compaction_strategy_impl->get_sstables_for_compaction(cfs, std::move(candidates));
}
void compaction_strategy::notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) {
_compaction_strategy_impl->notify_completion(removed, added);
}
bool compaction_strategy::parallel_compaction() const {
return _compaction_strategy_impl->parallel_compaction();
}

View File

@@ -64,16 +64,14 @@ class leveled_manifest {
schema_ptr _schema;
std::vector<std::list<sstables::shared_sstable>> _generations;
#if 0
private final RowPosition[] lastCompactedKeys;
#endif
uint64_t _max_sstable_size_in_bytes;
#if 0
private final SizeTieredCompactionStrategyOptions options;
private final int [] compactionCounter;
#endif
public:
static constexpr int MAX_LEVELS = 9; // log10(1000^3);
leveled_manifest(column_family& cfs, int max_sstable_size_in_MB)
: logger("LeveledManifest")
, _schema(cfs.schema())
@@ -82,15 +80,8 @@ public:
// allocate enough generations for a PB of data, with a 1-MB sstable size. (Note that if maxSSTableSize is
// updated, we will still have sstables of the older, potentially smaller size. So don't make this
// dependent on maxSSTableSize.)
uint64_t n = 9; // log10(1000^3)
_generations.resize(n);
_generations.resize(MAX_LEVELS);
#if 0
lastCompactedKeys = new RowPosition[n];
for (int i = 0; i < generations.length; i++)
{
generations[i] = new ArrayList<>();
lastCompactedKeys[i] = cfs.partitioner.getMinimumToken().minKeyBound();
}
compactionCounter = new int[n];
#endif
}
@@ -129,37 +120,6 @@ public:
_generations[level].push_back(sstable);
}
#if 0
public synchronized void replace(Collection<SSTableReader> removed, Collection<SSTableReader> added)
{
assert !removed.isEmpty(); // use add() instead of promote when adding new sstables
logDistribution();
if (logger.isDebugEnabled())
logger.debug("Replacing [{}]", toString(removed));
// the level for the added sstables is the max of the removed ones,
// plus one if the removed were all on the same level
int minLevel = Integer.MAX_VALUE;
for (SSTableReader sstable : removed)
{
int thisLevel = remove(sstable);
minLevel = Math.min(minLevel, thisLevel);
}
// it's valid to do a remove w/o an add (e.g. on truncate)
if (added.isEmpty())
return;
if (logger.isDebugEnabled())
logger.debug("Adding [{}]", toString(added));
for (SSTableReader ssTableReader : added)
add(ssTableReader);
lastCompactedKeys[minLevel] = SSTableReader.sstableOrdering.max(added).last;
}
#endif
void repair_overlapping_sstables(int level) {
const sstables::sstable *previous = nullptr;
const schema& s = *_schema;
@@ -272,7 +232,8 @@ public:
* @return highest-priority sstables to compact, and level to compact them to
* If no compactions are necessary, will return null
*/
sstables::compaction_descriptor get_compaction_candidates() {
sstables::compaction_descriptor get_compaction_candidates(const std::vector<stdx::optional<dht::decorated_key>>& last_compacted_keys,
std::vector<int>& compaction_counter) {
#if 0
// during bootstrap we only do size tiering in L0 to make sure
// the streamed files can be placed in their original levels
@@ -339,11 +300,12 @@ public:
}
}
// L0 is fine, proceed with this level
auto candidates = get_candidates_for(i);
auto candidates = get_candidates_for(i, last_compacted_keys);
if (!candidates.empty()) {
int next_level = get_next_level(candidates);
candidates = get_overlapping_starved_sstables(next_level, std::move(candidates), compaction_counter);
#if 0
candidates = getOverlappingStarvedSSTables(nextLevel, candidates);
if (logger.isDebugEnabled())
logger.debug("Compaction candidates for L{} are {}", i, toString(candidates));
#endif
@@ -359,7 +321,7 @@ public:
if (get_level(0).empty()) {
return sstables::compaction_descriptor();
}
auto candidates = get_candidates_for(0);
auto candidates = get_candidates_for(0, last_compacted_keys);
if (candidates.empty()) {
return sstables::compaction_descriptor();
}
@@ -391,49 +353,57 @@ public:
* @param candidates the original sstables to compact
* @return
*/
#if 0
private Collection<SSTableReader> getOverlappingStarvedSSTables(int targetLevel, Collection<SSTableReader> candidates)
{
Set<SSTableReader> withStarvedCandidate = new HashSet<>(candidates);
std::vector<sstables::shared_sstable>
get_overlapping_starved_sstables(int target_level, std::vector<sstables::shared_sstable>&& candidates, std::vector<int>& compaction_counter) {
for (int i = _generations.size() - 1; i > 0; i--) {
compaction_counter[i]++;
}
compaction_counter[target_level] = 0;
for (int i = generations.length - 1; i > 0; i--)
compactionCounter[i]++;
compactionCounter[targetLevel] = 0;
if (logger.isDebugEnabled())
{
for (int j = 0; j < compactionCounter.length; j++)
logger.debug("CompactionCounter: {}: {}", j, compactionCounter[j]);
if (logger.level() == logging::log_level::debug) {
for (auto j = 0U; j < compaction_counter.size(); j++) {
logger.debug("CompactionCounter: {}: {}", j, compaction_counter[j]);
}
}
for (int i = generations.length - 1; i > 0; i--)
{
if (getLevelSize(i) > 0)
{
if (compactionCounter[i] > NO_COMPACTION_LIMIT)
{
for (int i = _generations.size() - 1; i > 0; i--) {
if (get_level_size(i) > 0) {
if (compaction_counter[i] > NO_COMPACTION_LIMIT) {
// we try to find an sstable that is fully contained within the boundaries we are compacting;
// say we are compacting 3 sstables: 0->30 in L1 and 0->12, 12->33 in L2
// this means that we will not create overlap in L2 if we add an sstable
// contained within 0 -> 33 to the compaction
RowPosition max = null;
RowPosition min = null;
for (SSTableReader candidate : candidates)
{
if (min == null || candidate.first.compareTo(min) < 0)
min = candidate.first;
if (max == null || candidate.last.compareTo(max) > 0)
max = candidate.last;
stdx::optional<dht::decorated_key> max;
stdx::optional<dht::decorated_key> min;
for (auto& candidate : candidates) {
auto& candidate_first = candidate->get_first_decorated_key();
if (!min || candidate_first.tri_compare(*_schema, *min) < 0) {
min = candidate_first;
}
auto& candidate_last = candidate->get_first_decorated_key();
if (!max || candidate_last.tri_compare(*_schema, *max) > 0) {
max = candidate_last;
}
}
#if 0
// NOTE: We don't need to filter out compacting sstables by now because strategy only deals with
// uncompacting sstables and parallel compaction is also disabled for lcs.
Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting();
Range<RowPosition> boundaries = new Range<>(min, max);
for (SSTableReader sstable : getLevel(i))
{
Range<RowPosition> r = new Range<RowPosition>(sstable.first, sstable.last);
if (boundaries.contains(r) && !compacting.contains(sstable))
{
logger.info("Adding high-level (L{}) {} to candidates", sstable.getSSTableLevel(), sstable);
withStarvedCandidate.add(sstable);
return withStarvedCandidate;
#endif
auto boundaries = ::range<dht::decorated_key>::make(*min, *max);
for (auto& sstable : get_level(i)) {
auto r = ::range<dht::decorated_key>::make(sstable->get_first_decorated_key(), sstable->get_last_decorated_key());
if (boundaries.contains(r, dht::ring_position_comparator(*_schema))) {
logger.info("Adding high-level (L{}) {} to candidates", sstable->get_sstable_level(), sstable->get_filename());
auto result = std::find_if(std::begin(candidates), std::end(candidates), [&sstable] (auto& candidate) {
return sstable->generation() == candidate->generation();
});
if (result != std::end(candidates)) {
continue;
}
candidates.push_back(sstable);
return candidates;
}
}
}
@@ -443,7 +413,6 @@ public:
return candidates;
}
#endif
size_t get_level_size(uint32_t level) {
#if 0
@@ -557,7 +526,7 @@ public:
* If no compactions are possible (because of concurrent compactions or because some sstables are blacklisted
* for prior failure), will return an empty list. Never returns null.
*/
std::vector<sstables::shared_sstable> get_candidates_for(int level) {
std::vector<sstables::shared_sstable> get_candidates_for(int level, const std::vector<stdx::optional<dht::decorated_key>>& last_compacted_keys) {
const schema& s = *_schema;
assert(!get_level(level).empty());
@@ -657,31 +626,35 @@ public:
}
// for non-L0 compactions, pick up where we left off last time
get_level(level).sort([&s] (auto& i, auto& j) {
std::list<sstables::shared_sstable>& sstables = get_level(level);
sstables.sort([&s] (auto& i, auto& j) {
return i->compare_by_first_key(*j) < 0;
});
int start = 0; // handles case where the prior compaction touched the very last range
#if 0
for (int i = 0; i < getLevel(level).size(); i++)
{
SSTableReader sstable = getLevel(level).get(i);
if (sstable.first.compareTo(lastCompactedKeys[level]) > 0)
{
start = i;
int idx = 0;
for (auto& sstable : sstables) {
if (uint32_t(level) >= last_compacted_keys.size()) {
throw std::runtime_error(sprint("Invalid level %u out of %ld", level, (last_compacted_keys.size() - 1)));
}
auto& sstable_first = sstable->get_first_decorated_key();
if (!last_compacted_keys[level] || sstable_first.tri_compare(s, *last_compacted_keys[level]) > 0) {
start = idx;
break;
}
idx++;
}
#endif
// look for a non-suspect keyspace to compact with, starting with where we left off last time,
// and wrapping back to the beginning of the generation if necessary
for (auto i = 0U; i < get_level(level).size(); i++) {
for (auto i = 0U; i < sstables.size(); i++) {
// get an iterator to the element of position pos from the list get_level(level).
auto pos = (start + i) % get_level(level).size();
auto it = get_level(level).begin();
auto pos = (start + i) % sstables.size();
auto it = sstables.begin();
std::advance(it, pos);
auto sstable = *it;
auto& sstable = *it;
auto candidates = overlapping(*_schema, sstable, get_level(level + 1));
candidates.push_back(sstable);
#if 0
if (Iterables.any(candidates, suspectP))


@@ -1252,7 +1252,9 @@ static future<std::vector<unsigned long>> compact_sstables(std::vector<unsigned
}
auto candidates = get_candidates_for_leveled_strategy(*cf);
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, 1);
auto candidate = manifest.get_compaction_candidates();
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
BOOST_REQUIRE(candidate.sstables.size() == sstables->size());
BOOST_REQUIRE(candidate.level == 1);
BOOST_REQUIRE(candidate.max_sstable_bytes == 1024*1024);
@@ -1731,7 +1733,9 @@ SEASTAR_TEST_CASE(leveled_01) {
auto candidates = get_candidates_for_leveled_strategy(*cf);
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, max_sstable_size_in_mb);
BOOST_REQUIRE(manifest.get_level_size(0) == 2);
auto candidate = manifest.get_compaction_candidates();
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
BOOST_REQUIRE(candidate.sstables.size() == 2);
BOOST_REQUIRE(candidate.level == 0);
@@ -1786,7 +1790,9 @@ SEASTAR_TEST_CASE(leveled_02) {
auto candidates = get_candidates_for_leveled_strategy(*cf);
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, max_sstable_size_in_mb);
BOOST_REQUIRE(manifest.get_level_size(0) == 3);
auto candidate = manifest.get_compaction_candidates();
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
BOOST_REQUIRE(candidate.sstables.size() == 3);
BOOST_REQUIRE(candidate.level == 0);
@@ -1844,7 +1850,9 @@ SEASTAR_TEST_CASE(leveled_03) {
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, max_sstable_size_in_mb);
BOOST_REQUIRE(manifest.get_level_size(0) == 2);
BOOST_REQUIRE(manifest.get_level_size(1) == 2);
auto candidate = manifest.get_compaction_candidates();
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
BOOST_REQUIRE(candidate.sstables.size() == 3);
BOOST_REQUIRE(candidate.level == 1);
@@ -1914,7 +1922,9 @@ SEASTAR_TEST_CASE(leveled_04) {
auto level2_score = (double) manifest.get_total_bytes(manifest.get_level(2)) / (double) manifest.max_bytes_for_level(2);
BOOST_REQUIRE(level2_score < 1.001);
auto candidate = manifest.get_compaction_candidates();
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
BOOST_REQUIRE(candidate.sstables.size() == 2);
BOOST_REQUIRE(candidate.level == 2);
@@ -1976,7 +1986,9 @@ SEASTAR_TEST_CASE(leveled_06) {
BOOST_REQUIRE(manifest.get_level_size(1) == 1);
BOOST_REQUIRE(manifest.get_level_size(2) == 0);
auto candidate = manifest.get_compaction_candidates();
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
BOOST_REQUIRE(candidate.level == 2);
BOOST_REQUIRE(candidate.sstables.size() == 1);
auto& sst = (candidate.sstables)[0];


@@ -600,4 +600,62 @@ inline void stop_foreground(const trace_state_ptr& state) {
state->stop_foreground_and_write();
}
}
// global_trace_state_ptr is a helper class that may be used for creating spans
// of an existing tracing session on other shards. When a tracing span on a
// different shard is needed, global_trace_state_ptr creates a secondary
// tracing session on that shard, similarly to what we do when we create
// tracing spans on remote nodes.
//
// The usage is straightforward:
// 1. Create a global_trace_state_ptr from the existing trace_state_ptr object.
// 2. Pass the global_trace_state_ptr object, instead of a trace_state_ptr
// object, to the execution unit that (possibly) runs on a different shard.
class global_trace_state_ptr {
unsigned _cpu_of_origin;
trace_state_ptr _ptr;
public:
// Note: the trace_state_ptr must come from the current shard
global_trace_state_ptr(trace_state_ptr t)
: _cpu_of_origin(engine().cpu_id())
, _ptr(std::move(t))
{ }
// May be invoked across shards.
global_trace_state_ptr(const global_trace_state_ptr& other)
: global_trace_state_ptr(other.get())
{ }
// May be invoked across shards.
global_trace_state_ptr(global_trace_state_ptr&& other)
: global_trace_state_ptr(other.get())
{ }
global_trace_state_ptr& operator=(const global_trace_state_ptr&) = delete;
// May be invoked across shards.
trace_state_ptr get() const {
// optimize the "tracing not enabled" case
if (!_ptr) {
return nullptr;
}
if (_cpu_of_origin != engine().cpu_id()) {
auto opt_trace_info = make_trace_info(_ptr);
if (opt_trace_info) {
trace_state_ptr new_trace_state = tracing::get_local_tracing_instance().create_session(*opt_trace_info);
begin(new_trace_state);
return new_trace_state;
} else {
return nullptr;
}
}
return _ptr;
}
// May be invoked across shards.
operator trace_state_ptr() const { return get(); }
};
}


@@ -652,13 +652,13 @@ future<> cql_server::connection::process_request() {
f.length, mem_estimate, _server._max_request_size));
}
return with_semaphore(_server._memory_available, mem_estimate, [this, length = f.length, flags = f.flags, op, stream, tracing_requested] {
return read_and_decompress_frame(length, flags).then([this, flags, op, stream, tracing_requested] (temporary_buffer<char> buf) {
return get_units(_server._memory_available, mem_estimate).then([this, length = f.length, flags = f.flags, op, stream, tracing_requested] (semaphore_units<> mem_permit) {
return this->read_and_decompress_frame(length, flags).then([this, flags, op, stream, tracing_requested, mem_permit = std::move(mem_permit)] (temporary_buffer<char> buf) mutable {
++_server._requests_served;
++_server._requests_serving;
with_gate(_pending_requests_gate, [this, flags, op, stream, buf = std::move(buf), tracing_requested] () mutable {
with_gate(_pending_requests_gate, [this, flags, op, stream, buf = std::move(buf), tracing_requested, mem_permit = std::move(mem_permit)] () mutable {
auto bv = bytes_view{reinterpret_cast<const int8_t*>(buf.begin()), buf.size()};
auto cpu = pick_request_cpu();
return smp::submit_to(cpu, [this, bv = std::move(bv), op, stream, client_state = _client_state, tracing_requested] () mutable {
@@ -672,7 +672,7 @@ future<> cql_server::connection::process_request() {
}).then([this, flags] (auto&& response) {
_client_state.merge(response.second);
return this->write_response(std::move(response.first), _compression);
}).then([buf = std::move(buf)] {
}).then([buf = std::move(buf), mem_permit = std::move(mem_permit)] {
// Keep buf alive.
});
}).handle_exception([] (std::exception_ptr ex) {