Compare commits

...

37 Commits

Author SHA1 Message Date
Pekka Enberg
426316a4b7 release: prepare for 1.1.2 2016-06-06 14:47:32 +03:00
Pekka Enberg
3289010910 Revert "dist/common/scripts: update SET_NIC when --setup-nic passed to scylla_sysconfig_setup"
This reverts commit 73fa36b416.

Fixes #1301.
2016-06-06 14:46:32 +03:00
Asias He
52c9723e04 streaming: Reduce memory usage when sending mutations
Limit disk bandwidth to 5MB/s to emulate a slow disk:
echo "8:0 5000000" >
/cgroup/blkio/limit/blkio.throttle.write_bps_device
echo "8:0 5000000" >
/cgroup/blkio/limit/blkio.throttle.read_bps_device

Start scylla node 1 with low memory:
scylla -c 1 -m 128M --auto-bootstrap false

Run c-s:
taskset -c 7 cassandra-stress write duration=5m cl=ONE -schema
'replication(factor=1)' -pop seq=1..100000  -rate threads=20
limit=2000/s -node 127.0.0.1

Start scylla node 2 with low memory:
scylla -c 1 -m 128M --auto-bootstrap true

Without this patch, I saw std::bad_alloc during streaming:

ERROR 2016-06-01 14:31:00,196 [shard 0] storage_proxy - exception during
mutation write to 127.0.0.1: std::bad_alloc (std::bad_alloc)
...
ERROR 2016-06-01 14:31:10,172 [shard 0] database - failed to move
memtable to cache: std::bad_alloc (std::bad_alloc)
...

To fix:

1. Apply the streaming mutation limiter before we read the mutation into
memory, to avoid wasting memory holding a mutation which we cannot
send.

2. Reduce the parallelism of sending streaming mutations. Before, we sent
each range in parallel; after, we send the ranges one by one.

   before: nr_vnode * nr_shard * (send_info + cf.make_reader memory usage)

   after: nr_shard * (send_info + cf.make_reader memory usage)

This saves memory usage by at least a factor of nr_vnode, which is 256
by default.

In my setup, fix 1) alone is not enough; with both fixes 1) and 2), I
saw no std::bad_alloc. I also did not see streaming bandwidth drop
because of 2).

In addition, I tested grow_cluster_test.py:GrowClusterTest.test_grow_3_to_4,
as described in:

https://github.com/scylladb/scylla/issues/1270#issuecomment-222585375

With this patch, I saw no std::bad_alloc anymore.

Fixes: #1270

Message-Id: <7703cf7a9db40e53a87f0f7b5acbb03fff2daf43.1464785542.git.asias@scylladb.com>
(cherry picked from commit 206955e47c)

Conflicts:
	streaming/stream_transfer_task.cc
2016-06-02 11:08:19 +03:00
Pekka Enberg
3cc91eeb84 release: prepare for 1.1.1 2016-05-26 12:58:39 +03:00
Pekka Enberg
d67ee37bbc Update seastar submodule
* seastar 85bdfb7...b80564d (1):
  > reactor: advertise the logging_failures metric as a DERIVE counter
2016-05-26 12:57:54 +03:00
Raphael S. Carvalho
fcbe43cc87 sstables: optimize leveled compaction strategy
Leveled compaction strategy does a lot of work whenever it is asked for
a list of sstables to compact. It checks whether an sstable overlaps with
another sstable in the same level twice: first, when adding an sstable to
a list of sstables at the same level, and second, after adding all
sstables to their respective lists.

It is enough to check that an sstable creates an overlap in its level only
once. So I am changing the code to unconditionally insert an sstable into
its respective list, and after that call repair_overlapping_sstables(),
which sends any sstable that creates an overlap in its level to the L0 list.

Note that the optimization is not in the compaction itself, but in the
strategy code that picks the set of sstables to be compacted.

Reviewed-by: Nadav Har'El <nyh@scylladb.com>
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <8c8526737277cb47987a3a5dbd5ff3bb81a6d038.1461965074.git.raphaelsc@scylladb.com>
(cherry picked from commit ae95ce1bd7)
2016-05-24 15:56:10 +03:00
Pekka Enberg
ef79310b3c dist/docker: Use Scylla 1.1 RPM repository 2016-05-23 10:16:13 +03:00
Pekka Enberg
f7e81c7b7d dist/docker: Fetch RPM repository from Scylla web site
Fix the hard-coded Scylla RPM repository by downloading it from Scylla
web site. This makes it easier to switch between different versions.

Message-Id: <1463981271-25231-1-git-send-email-penberg@scylladb.com>
(cherry picked from commit 8a7197e390)
2016-05-23 10:15:17 +03:00
Raphael S. Carvalho
54224dfaa0 tests: check that overlapping sstable has its level changed to 0
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit cbc2e96a58)
2016-05-20 13:42:32 +03:00
Raphael S. Carvalho
e30199119c db: fix migration of sstables with level greater than 0
Refresh will rewrite the statistics of any migrated sstable with level
> 0. However, this operation is currently broken because the O_EXCL
flag is used, meaning that the create will fail.

It turns out that we don't actually need to change the on-disk level of
an sstable by overwriting its statistics file. It is enough to set the
in-memory level of the sstable to 0. If Scylla reboots before all
migrated sstables are compacted, leveled strategy is smart enough to
detect sstables that overlap, and set their in-memory level to 0.

Fixes #1124.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit ee0f66eef6)
2016-05-20 13:42:27 +03:00
Raphael S. Carvalho
07ce4ec032 main: stop compaction manager earlier
Avi says:
"During shutdown, we prevent new compactions, but perhaps too late.
Memtables are flushed and these can trigger compaction."

To solve that, let's stop the compaction manager at a very early step
of shutdown. We will still try to stop the compaction manager in
database::stop(), because the user may ask for a shutdown before Scylla
has fully started. It is fine to stop the compaction manager twice:
only the first call will actually stop it.

Fixes #1238.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <c64ab11f3c91129c424259d317e48abc5bde6ff3.1462496694.git.raphaelsc@scylladb.com>
(cherry picked from commit bf18025937)
2016-05-20 13:41:11 +03:00
Asias He
c7c18d9c0c gms: Optimize gossiper::is_alive
In a perf flame graph, before this patch:

service::storage_proxy::create_write_response_handler (2.66% cpu)

  gossiper::is_alive takes 0.72% cpu
  locator::token_metadata::pending_endpoints_for takes 1.2% cpu

After this patch:

service::storage_proxy::create_write_response_handler (2.17% cpu)

  gossiper::is_alive does not show up at all
  locator::token_metadata::pending_endpoints_for takes 1.3% cpu

There is no need to copy the endpoint_state out of the endpoint_state_map
just to check whether a node is alive. Optimize this, since
gossiper::is_alive is called on the fast path.

Message-Id: <2144310aef8d170cab34a2c96cb67cabca761ca8.1463540290.git.asias@scylladb.com>
(cherry picked from commit eb9ac9ab91)
2016-05-20 13:03:44 +03:00
Asias He
2a4582ab9f token_metadata: Speed up pending_endpoints_for
pending_endpoints_for is called frequently by
storage_proxy::create_write_response_handler when executing CQL queries.

Before this patch, each call to pending_endpoints_for involves
converting a multimap (std::unordered_multimap<range<token>,
inet_address>>) to map (std::unordered_map<range<token>,
std::unordered_set<inet_address>>).

To speed up the token to pending endpoint mapping search, an interval map
is introduced. It is faster than searching the map linearly and avoids
the need to cache the token/pending endpoint mapping.

With this patch, the drop in operations per second while a node is being
added is much smaller.

Before:
45K to 10K

After:
45K to 38K

(The numbers were measured with the streaming code skipping the actual
data send, to rule out the streaming factor.)

Refs: #1223
(cherry picked from commit 089734474b)
2016-05-20 13:03:31 +03:00
Asias He
ab5e23f6e7 dht: Add default constructor for token
It is needed to put token into a boost interval_map in the following
patch.

(cherry picked from commit ee0585cee9)
2016-05-20 13:03:24 +03:00
Calle Wilund
fecea15a25 cql3::statements::cf_prop_defs: Fix compaction min/max not handled
The property parsing code was looking at the wrong property level
for the initial guard statement.

Fixes #1257

Message-Id: <1462967584-2875-1-git-send-email-calle@scylladb.com>
(cherry picked from commit 5604fb8aa3)
2016-05-18 14:19:27 +03:00
Tomasz Grabiec
dce549f44f tests: Add unit tests for schema_registry
(cherry picked from commit 90c31701e3)
2016-05-16 10:52:02 +03:00
Tomasz Grabiec
26a3302957 schema_registry: Fix possible hang in maybe_sync() if syncer doesn't defer
Spotted during code review.

If the syncer doesn't defer, we may execute the then_wrapped() body before
we change the state. Fix by moving the then_wrapped() body after the state
changes.

(cherry picked from commit 443e5aef5a)
2016-05-16 10:51:54 +03:00
Tomasz Grabiec
f796d8081b migration_manager: Fix schema syncing with older version
The problem was that "s" would not be marked as synced-with if it came from
shard != 0.

As a result, mutation using that schema would fail to apply with an exception:

  "attempted to mutate using not synced schema of ..."

The problem could surface when altering schema without changing
columns and restarting one of the nodes so that it forgets past
versions.

Fixes #1258.

Will be covered by dtest:

  SchemaManagementTest.test_prepared_statements_work_after_node_restart_after_altering_schema_without_changing_columns

(cherry picked from commit 8703136a4f)
2016-05-16 10:51:48 +03:00
Pekka Enberg
b850cb991c release: prepare for 1.1.0 2016-05-16 09:33:26 +03:00
Tomasz Grabiec
734cfa949a migration_manager: Invalidate prepared statements on every schema change
Currently we only do that when column set changes. When prepared
statements are executed, paramaters like read repair chance are read
from schema version stored in the statement. Not invalidating prepared
statements on changes of such parameters will appear as if alter took
no effect.

Fixes #1255.
Message-Id: <1462985495-9767-1-git-send-email-tgrabiec@scylladb.com>

(cherry picked from commit 13d8cd0ae9)
2016-05-12 09:18:00 +03:00
Calle Wilund
3606e3ab29 transport::server: Do not treat accept exception as fatal
1.) It most likely is not fatal, i.e. it is either a TCP or, more
    likely, an SSL negotiation failure. In any case, we can still try
    the next connection.
2.) Not retrying would cause us to "leak" the accept, and then hang
    on shutdown.

Also, promote logging message on accept exception to "warn", since
dtest(s?) depend on seeing log output.

Message-Id: <1462283265-27051-4-git-send-email-calle@scylladb.com>
(cherry picked from commit 917bf850fa)
2016-05-10 19:26:29 +03:00
Calle Wilund
014284de00 cql_server: Use credentials_builder to init tls
A slightly cleaner, shard-safe TLS init.

Message-Id: <1462283265-27051-3-git-send-email-calle@scylladb.com>
(cherry picked from commit 437ebe7128)
2016-05-10 19:26:23 +03:00
Calle Wilund
f17764e74a messaging_service: Change tls init to use credentials_builder
To simplify initialization of the messaging service, use
credentials_builder to encapsulate the TLS options so the actual
credentials can be created more easily on each shard.

Message-Id: <1462283265-27051-2-git-send-email-calle@scylladb.com>
(cherry picked from commit 58f7edb04f)
2016-05-10 19:26:18 +03:00
Avi Kivity
8643028d0c Update seastar submodule
* seastar 73d5583...85bdfb7 (4):
  > tests/mkcert.gmk: Fix makefile bug in snakeoil cert generator
  > tls_test: Add case to do a little checking of credentials_builder
  > tls: Add credentials_builder - copyable credentials "factory"
  > tls_test: Add test for large-ish buffer send/recieve
2016-05-10 19:24:49 +03:00
Avi Kivity
a35f1d765a Backport seastar iotune fixes
* seastar dab58e4...73d5583 (2):
  > iotune: don't coredump when directory fails to be created
  > iotune: improve recommendation in case we timeout

Fixes #1243.
2016-05-09 10:49:15 +03:00
Avi Kivity
3116a92b0e Point seastar submodule at scylla-seastar repository
Allows us to backport seastar fixes to branch-1.1.
2016-05-08 14:49:02 +03:00
Gleb Natapov
dad312ce0a tests: test for result row counting
Message-Id: <1462377579-2419-2-git-send-email-gleb@scylladb.com>
(cherry picked from commit f1cd52ff3f)
2016-05-06 13:32:36 +03:00
Gleb Natapov
3cae56f3e3 query: fix result row counting for results with multiple partitions
Message-Id: <1462377579-2419-1-git-send-email-gleb@scylladb.com>
(cherry picked from commit b75475de80)
2016-05-06 13:32:29 +03:00
Calle Wilund
656a10c4b8 storage_service: Add logging to match origin
Points out whether the CQL server is listening in SSL mode.
Message-Id: <1462368016-32394-2-git-send-email-calle@scylladb.com>

(cherry picked from commit 709dd82d59)
2016-05-06 13:30:29 +03:00
Calle Wilund
c04b3de564 messaging_service: Add logging to match origin
Announces the RPC port, plus SSL if it is on.

Message-Id: <1462368016-32394-1-git-send-email-calle@scylladb.com>
(cherry picked from commit d8ea85cd90)
2016-05-06 13:26:01 +03:00
Gleb Natapov
4964fe4cf0 storage_proxy: stop range query with limit after the limit is reached
(cherry picked from commit 3039e4c7de)
2016-05-06 12:50:51 +03:00
Gleb Natapov
e3ad3cf7d9 query: put live row count into query::result
The patch calculates the row count during result building and while
merging. If one of the results being merged does not have a row count,
the merged result will not have one either.

(cherry picked from commit db322d8f74)
2016-05-06 12:50:47 +03:00
Gleb Natapov
cef40627a7 storage_proxy: fix calculation of concurrency queried ranges
(cherry picked from commit 41c586313a)
2016-05-06 12:50:37 +03:00
Gleb Natapov
995820c08a storage_proxy: add logging for range query row count estimation
(cherry picked from commit c364ab9121)
2016-05-06 12:50:32 +03:00
Calle Wilund
b78abd7649 auth: Make auth.* schemas use deterministic UUIDs
In the initial implementation I figured this was not required, but
we get issues communicating across nodes if system tables
don't have the same UUID, since creation is forcefully local, yet
shared.

Just manually re-create the schema with a name UUID, and use the
migration manager directly.
Message-Id: <1462194588-11964-1-git-send-email-calle@scylladb.com>

(cherry picked from commit 6d2caedafd)
2016-05-03 10:49:16 +03:00
Calle Wilund
d47c62b51c messaging_service: Change init to use per-shard tls credentials
Fixes: #1220

While the server_credentials object is technically immutable
(especially with the last change in seastar), the ::shared_ptr holding
it is not safe to share across shards.

Instead, pre-create one credentials object per CPU and hand them out
by move during service start-up.

Fixes an assertion error in debug builds, and possibly real memory
corruption in release builds.

Requires seastar tls change:
"Change server_credentials to copy dh_params input"

Message-Id: <1462187704-2056-1-git-send-email-calle@scylladb.com>
(cherry picked from commit 751ba2f0bf)
2016-05-02 15:37:43 +03:00
Pekka Enberg
bef19e7f9e release: prepare for 1.1.rc1 2016-04-29 08:49:10 +03:00
39 changed files with 505 additions and 165 deletions

.gitmodules

@@ -1,6 +1,6 @@
 [submodule "seastar"]
 path = seastar
-url = ../seastar
+url = ../scylla-seastar
 ignore = dirty
 [submodule "swagger-ui"]
 path = swagger-ui


@@ -1,6 +1,6 @@
 #!/bin/sh
-VERSION=666.development
+VERSION=1.1.2
 if test -f version
 then


@@ -354,9 +354,12 @@ future<> auth::auth::setup_table(const sstring& name, const sstring& cql) {
 ::shared_ptr<cql3::statements::create_table_statement> statement =
 static_pointer_cast<cql3::statements::create_table_statement>(
 parsed->prepare(db)->statement);
-// Origin sets "Legacy Cf Id" for the new table. We have no need to be
-// pre-2.1 compatible (afaik), so lets skip a whole lotta hoolaballo
-return statement->announce_migration(qp.proxy(), false).then([statement](bool) {});
+auto schema = statement->get_cf_meta_data();
+auto uuid = generate_legacy_id(schema->ks_name(), schema->cf_name());
+schema_builder b(schema);
+b.set_uuid(uuid);
+return service::get_local_migration_manager().announce_new_column_family(b.build(), false);
 }
 future<bool> auth::auth::has_existing_users(const sstring& cfname, const sstring& def_user_name, const sstring& name_column) {


@@ -162,6 +162,7 @@ modes = {
 scylla_tests = [
 'tests/mutation_test',
+'tests/schema_registry_test',
 'tests/canonical_mutation_test',
 'tests/range_test',
 'tests/types_test',


@@ -432,10 +432,9 @@ void query_processor::migration_subscriber::on_update_keyspace(const sstring& ks
 void query_processor::migration_subscriber::on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool columns_changed)
 {
-if (columns_changed) {
-log.info("Column definitions for {}.{} changed, invalidating related prepared statements", ks_name, cf_name);
-remove_invalid_prepared_statements(ks_name, cf_name);
-}
+// #1255: Ignoring columns_changed deliberately.
+log.info("Column definitions for {}.{} changed, invalidating related prepared statements", ks_name, cf_name);
+remove_invalid_prepared_statements(ks_name, cf_name);
 }
 void query_processor::migration_subscriber::on_update_user_type(const sstring& ks_name, const sstring& type_name)


@@ -162,7 +162,7 @@ void cf_prop_defs::apply_to_builder(schema_builder& builder) {
 }
 std::experimental::optional<sstring> tmp_value = {};
-if (has_property(KW_MINCOMPACTIONTHRESHOLD)) {
+if (has_property(KW_COMPACTION)) {
 if (get_compaction_options().count(KW_MINCOMPACTIONTHRESHOLD)) {
 tmp_value = get_compaction_options().at(KW_MINCOMPACTIONTHRESHOLD);
 }
@@ -170,7 +170,7 @@ void cf_prop_defs::apply_to_builder(schema_builder& builder) {
 int min_compaction_threshold = to_int(KW_MINCOMPACTIONTHRESHOLD, tmp_value, builder.get_min_compaction_threshold());
 tmp_value = {};
-if (has_property(KW_MAXCOMPACTIONTHRESHOLD)) {
+if (has_property(KW_COMPACTION)) {
 if (get_compaction_options().count(KW_MAXCOMPACTIONTHRESHOLD)) {
 tmp_value = get_compaction_options().at(KW_MAXCOMPACTIONTHRESHOLD);
 }


@@ -973,7 +973,13 @@ column_family::load_new_sstables(std::vector<sstables::entry_descriptor> new_tab
 return parallel_for_each(new_tables, [this] (auto comps) {
 auto sst = make_lw_shared<sstables::sstable>(_schema->ks_name(), _schema->cf_name(), _config.datadir, comps.generation, comps.version, comps.format);
 return sst->load().then([this, sst] {
-return sst->mutate_sstable_level(0);
+// This sets in-memory level of sstable to 0.
+// When loading a migrated sstable, it's important to set it to level 0 because
+// leveled compaction relies on a level > 0 having no overlapping sstables.
+// If Scylla reboots before migrated sstable gets compacted, leveled strategy
+// is smart enough to detect a sstable that overlaps and set its in-memory
+// level to 0.
+return sst->set_sstable_level(0);
 }).then([this, sst] {
 auto first = sst->get_first_partition_key(*_schema);
 auto last = sst->get_last_partition_key(*_schema);


@@ -87,6 +87,10 @@ public:
 // [0x00, 0x80] == 1/512
 // [0xff, 0x80] == 1 - 1/512
 managed_bytes _data;
+token() : _kind(kind::before_all_keys) {
+}
 token(kind k, managed_bytes d) : _kind(std::move(k)), _data(std::move(d)) {
 }


@@ -58,7 +58,7 @@ while [ $# -gt 0 ]; do
 shift 2
 ;;
 "--setup-nic")
-SET_NIC=yes
+SETUP_NIC=1
 shift 1
 ;;
 "--ami")


@@ -2,8 +2,8 @@ FROM centos:7
 MAINTAINER Avi Kivity <avi@cloudius-systems.com>
+RUN curl http://downloads.scylladb.com/rpm/centos/scylla-1.1.repo -o /etc/yum.repos.d/scylla.repo
 RUN yum -y install epel-release
-ADD scylla.repo /etc/yum.repos.d/
 RUN yum -y clean expire-cache
 RUN yum -y update
 RUN yum -y remove boost-thread boost-system


@@ -1,23 +0,0 @@
[scylla]
name=Scylla for Centos $releasever - $basearch
baseurl=https://s3.amazonaws.com/downloads.scylladb.com/rpm/centos/$releasever/$basearch/
enabled=1
gpgcheck=0
[scylla-generic]
name=Scylla for centos $releasever
baseurl=https://s3.amazonaws.com/downloads.scylladb.com/rpm/centos/$releasever/noarch/
enabled=1
gpgcheck=0
[scylla-3rdparty]
name=Scylla 3rdParty for Centos $releasever - $basearch
baseurl=https://s3.amazonaws.com/downloads.scylladb.com/rpm/3rdparty/centos/$releasever/$basearch/
enabled=1
gpgcheck=0
[scylla-3rdparty-generic]
name=Scylla 3rdParty for Centos $releasever
baseurl=https://s3.amazonaws.com/downloads.scylladb.com/rpm/3rdparty/centos/$releasever/noarch/
enabled=1
gpgcheck=0


@@ -1590,12 +1590,12 @@ bool gossiper::is_alive(inet_address ep) {
 if (ep == get_broadcast_address()) {
 return true;
 }
-auto eps = get_endpoint_state_for_endpoint(ep);
+auto it = endpoint_state_map.find(ep);
 // we could assert not-null, but having isAlive fail screws a node over so badly that
 // it's worth being defensive here so minor bugs don't cause disproportionate
 // badness. (See CASSANDRA-1463 for an example).
-if (eps) {
-return eps->is_alive();
+if (it != endpoint_state_map.end()) {
+return it->second.is_alive();
 } else {
 logger.warn("unknown endpoint {}", ep);
 return false;

init.cc

@@ -64,18 +64,23 @@ void init_ms_fd_gossiper(sstring listen_address
 }
 future<> f = make_ready_future<>();
-::shared_ptr<server_credentials> creds;
+std::shared_ptr<credentials_builder> creds;
 if (ew != encrypt_what::none) {
-// note: credentials are immutable after this, and ok to share across shards
-creds = ::make_shared<server_credentials>(::make_shared<dh_params>(dh_params::level::MEDIUM));
+creds = std::make_shared<credentials_builder>();
+creds->set_dh_level(dh_params::level::MEDIUM);
 creds->set_x509_key_file(ms_cert, ms_key, x509_crt_format::PEM).get();
-ms_trust_store.empty() ? creds->set_system_trust().get() :
-creds->set_x509_trust_file(ms_trust_store, x509_crt_format::PEM).get();
+if (ms_trust_store.empty()) {
+creds->set_system_trust().get();
+} else {
+creds->set_x509_trust_file(ms_trust_store, x509_crt_format::PEM).get();
+}
 }
 // Init messaging_service
 net::get_messaging_service().start(listen, storage_port, ew, ssl_storage_port, creds).get();
 // #293 - do not stop anything
 //engine().at_exit([] { return net::get_messaging_service().stop(); });
 // Init failure_detector


@@ -27,6 +27,8 @@
 #include "log.hh"
 #include <unordered_map>
 #include <algorithm>
+#include <boost/icl/interval.hpp>
+#include <boost/icl/interval_map.hpp>
 namespace locator {
@@ -339,25 +341,65 @@ range<token> token_metadata::get_primary_range_for(token right) {
 return get_primary_ranges_for({right}).front();
 }
+boost::icl::interval<token>::interval_type
+token_metadata::range_to_interval(range<dht::token> r) {
+bool start_inclusive = false;
+bool end_inclusive = false;
+token start = dht::minimum_token();
+token end = dht::maximum_token();
+if (r.start()) {
+start = r.start()->value();
+start_inclusive = r.start()->is_inclusive();
+}
+if (r.end()) {
+end = r.end()->value();
+end_inclusive = r.end()->is_inclusive();
+}
+if (start_inclusive == false && end_inclusive == false) {
+return boost::icl::interval<token>::open(std::move(start), std::move(end));
+} else if (start_inclusive == false && end_inclusive == true) {
+return boost::icl::interval<token>::left_open(std::move(start), std::move(end));
+} else if (start_inclusive == true && end_inclusive == false) {
+return boost::icl::interval<token>::right_open(std::move(start), std::move(end));
+} else {
+return boost::icl::interval<token>::closed(std::move(start), std::move(end));
+}
+}
+void token_metadata::set_pending_ranges(const sstring& keyspace_name,
+std::unordered_multimap<range<token>, inet_address> new_pending_ranges) {
+if (new_pending_ranges.empty()) {
+_pending_ranges.erase(keyspace_name);
+_pending_ranges_map.erase(keyspace_name);
+_pending_ranges_interval_map.erase(keyspace_name);
+return;
+}
+std::unordered_map<range<token>, std::unordered_set<inet_address>> map;
+for (const auto& x : new_pending_ranges) {
+map[x.first].emplace(x.second);
+}
+// construct a interval map to speed up the search
+_pending_ranges_interval_map[keyspace_name] = {};
+for (const auto& m : map) {
+_pending_ranges_interval_map[keyspace_name] +=
+std::make_pair(range_to_interval(m.first), m.second);
+}
+_pending_ranges[keyspace_name] = std::move(new_pending_ranges);
+_pending_ranges_map[keyspace_name] = std::move(map);
+}
 std::unordered_multimap<range<token>, inet_address>&
 token_metadata::get_pending_ranges_mm(sstring keyspace_name) {
 return _pending_ranges[keyspace_name];
 }
-std::unordered_map<range<token>, std::unordered_set<inet_address>>
+const std::unordered_map<range<token>, std::unordered_set<inet_address>>&
 token_metadata::get_pending_ranges(sstring keyspace_name) {
-std::unordered_map<range<token>, std::unordered_set<inet_address>> ret;
-for (auto x : get_pending_ranges_mm(keyspace_name)) {
-auto& range_token = x.first;
-auto& ep = x.second;
-auto it = ret.find(range_token);
-if (it != ret.end()) {
-it->second.emplace(ep);
-} else {
-ret.emplace(range_token, std::unordered_set<inet_address>{ep});
-}
-}
-return ret;
+return _pending_ranges_map[keyspace_name];
 }
 std::vector<range<token>>
@@ -378,7 +420,7 @@ void token_metadata::calculate_pending_ranges(abstract_replication_strategy& str
 if (_bootstrap_tokens.empty() && _leaving_endpoints.empty() && _moving_endpoints.empty()) {
 logger.debug("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspace_name);
-_pending_ranges[keyspace_name] = std::move(new_pending_ranges);
+set_pending_ranges(keyspace_name, std::move(new_pending_ranges));
 return;
 }
@@ -463,7 +505,7 @@ void token_metadata::calculate_pending_ranges(abstract_replication_strategy& str
 all_left_metadata.remove_endpoint(endpoint);
 }
-_pending_ranges[keyspace_name] = std::move(new_pending_ranges);
+set_pending_ranges(keyspace_name, std::move(new_pending_ranges));
 if (logger.is_enabled(logging::log_level::debug)) {
 logger.debug("Pending ranges: {}", (_pending_ranges.empty() ? "<empty>" : print_pending_ranges()));
@@ -508,14 +550,23 @@ void token_metadata::add_moving_endpoint(token t, inet_address endpoint) {
 }
 std::vector<gms::inet_address> token_metadata::pending_endpoints_for(const token& token, const sstring& keyspace_name) {
+// Fast path 0: no pending ranges at all
+if (_pending_ranges_interval_map.empty()) {
+return {};
+}
+// Fast path 1: no pending ranges for this keyspace_name
+if (_pending_ranges_interval_map[keyspace_name].empty()) {
+return {};
+}
+// Slow path: lookup pending ranges
 std::vector<gms::inet_address> endpoints;
-auto ranges = get_pending_ranges(keyspace_name);
-for (auto& x : ranges) {
-if (x.first.contains(token, dht::token_comparator())) {
-for (auto& addr : x.second) {
-endpoints.push_back(addr);
-}
-}
+auto interval = range_to_interval(range<dht::token>(token));
+auto it = _pending_ranges_interval_map[keyspace_name].find(interval);
+if (it != _pending_ranges_interval_map[keyspace_name].end()) {
+// interval_map does not work with std::vector, convert to std::vector of ips
+endpoints = std::vector<gms::inet_address>(it->second.begin(), it->second.end());
 }
 return endpoints;
 }


@@ -46,6 +46,8 @@
 #include "utils/UUID.hh"
 #include <experimental/optional>
 #include <boost/range/iterator_range.hpp>
+#include <boost/icl/interval.hpp>
+#include <boost/icl/interval_map.hpp>
 #include "query-request.hh"
 #include "range.hh"
@@ -144,6 +146,8 @@ private:
 std::unordered_map<token, inet_address> _moving_endpoints;
 std::unordered_map<sstring, std::unordered_multimap<range<token>, inet_address>> _pending_ranges;
+std::unordered_map<sstring, std::unordered_map<range<token>, std::unordered_set<inet_address>>> _pending_ranges_map;
+std::unordered_map<sstring, boost::icl::interval_map<token, std::unordered_set<inet_address>>> _pending_ranges_interval_map;
 std::vector<token> _sorted_tokens;
@@ -608,13 +612,15 @@ public:
 std::vector<range<token>> get_primary_ranges_for(std::unordered_set<token> tokens);
 range<token> get_primary_range_for(token right);
+static boost::icl::interval<token>::interval_type range_to_interval(range<dht::token> r);
 private:
 std::unordered_multimap<range<token>, inet_address>& get_pending_ranges_mm(sstring keyspace_name);
+void set_pending_ranges(const sstring& keyspace_name, std::unordered_multimap<range<token>, inet_address> new_pending_ranges);
 public:
 /** a mutable map may be returned but caller should not modify it */
-std::unordered_map<range<token>, std::unordered_set<inet_address>> get_pending_ranges(sstring keyspace_name);
+const std::unordered_map<range<token>, std::unordered_set<inet_address>>& get_pending_ranges(sstring keyspace_name);
 std::vector<range<token>> get_pending_ranges(sstring keyspace_name, inet_address endpoint);
 /**
/**


@@ -592,6 +592,11 @@ int main(int ac, char** av) {
 engine().at_exit([] {
 return repair_shutdown(service::get_local_storage_service().db());
 });
+engine().at_exit([&db] {
+return db.invoke_on_all([](auto& db) {
+return db.get_compaction_manager().stop();
+});
+});
 }).or_terminate();
 });
 }


@@ -225,7 +225,7 @@ messaging_service::messaging_service(gms::inet_address ip
 , uint16_t port
 , encrypt_what ew
 , uint16_t ssl_port
-, ::shared_ptr<seastar::tls::server_credentials> credentials
+, std::shared_ptr<seastar::tls::credentials_builder> credentials
 )
 : _listen_address(ip)
 , _port(port)
@@ -233,7 +233,7 @@ messaging_service::messaging_service(gms::inet_address ip
 , _encrypt_what(ew)
 , _rpc(new rpc_protocol_wrapper(serializer { }))
 , _server(new rpc_protocol_server_wrapper(*_rpc, ipv4_addr { _listen_address.raw_addr(), _port }, rpc_resource_limits()))
-, _credentials(std::move(credentials))
+, _credentials(credentials ? credentials->build_server_credentials() : nullptr)
 , _server_tls([this]() -> std::unique_ptr<rpc_protocol_server_wrapper>{
 if (_encrypt_what == encrypt_what::none) {
 return nullptr;
@@ -255,6 +255,13 @@ messaging_service::messaging_service(gms::inet_address ip
 ci.attach_auxiliary("src_cpu_id", src_cpu_id);
 return rpc::no_wait;
 });
+// Do this on just cpu 0, to avoid duplicate logs.
+if (engine().cpu_id() == 0) {
+if (_server_tls) {
+logger.info("Starting Encrypted Messaging Service on SSL port {}", _ssl_port);
+}
+logger.info("Starting Messaging Service on port {}", _port);
+}
 }
 msg_addr messaging_service::get_source(const rpc::client_info& cinfo) {


@@ -186,7 +186,7 @@ public:
 public:
 messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0"), uint16_t port = 7000);
 messaging_service(gms::inet_address ip, uint16_t port, encrypt_what,
-uint16_t ssl_port, ::shared_ptr<seastar::tls::server_credentials>);
+uint16_t ssl_port, std::shared_ptr<seastar::tls::credentials_builder>);
 ~messaging_service();
 public:
 uint16_t port();


@@ -706,6 +706,7 @@ mutation_partition::query_compacted(query::result::partition_writer& pw, const s
 || !has_any_live_data(s, column_kind::static_column, static_row()))) {
 pw.retract();
 } else {
+pw.row_count() += row_count ? : 1;
 std::move(rows_wr).end_rows().end_qr_partition();
 }
 }


@@ -50,6 +50,7 @@ class result::partition_writer {
 bool _static_row_added = false;
 md5_hasher& _digest;
 md5_hasher _digest_pos;
+uint32_t& _row_count;
 public:
 partition_writer(
 result_request request,
@@ -58,7 +59,8 @@ public:
 ser::query_result__partitions& pw,
 ser::vector_position pos,
 ser::after_qr_partition__key w,
-md5_hasher& digest)
+md5_hasher& digest,
+uint32_t& row_count)
 : _request(request)
 , _w(std::move(w))
 , _slice(slice)
@@ -67,6 +69,7 @@ public:
 , _pos(std::move(pos))
 , _digest(digest)
 , _digest_pos(digest)
+, _row_count(row_count)
 { }
 bool requested_digest() const {
@@ -98,6 +101,9 @@ public:
 md5_hasher& digest() {
 return _digest;
 }
+uint32_t& row_count() {
+return _row_count;
+}
 };
 class result::builder {
@@ -106,6 +112,7 @@ class result::builder {
 const partition_slice& _slice;
 ser::query_result__partitions _w;
 result_request _request;
+uint32_t _row_count = 0;
 public:
 builder(const partition_slice& slice, result_request request)
 : _slice(slice)
@@ -130,21 +137,21 @@ public:
 if (_request != result_request::only_result) {
 key.feed_hash(_digest, s);
 }
-return partition_writer(_request, _slice, ranges, _w, std::move(pos), std::move(after_key), _digest);
+return partition_writer(_request, _slice, ranges, _w, std::move(pos), std::move(after_key), _digest, _row_count);
 }
 result build() {
 std::move(_w).end_partitions().end_query_result();
 switch (_request) {
 case result_request::only_result:
-return result(std::move(_out));
+return result(std::move(_out), _row_count);
 case result_request::only_digest: {
 bytes_ostream buf;
 ser::writer_of_query_result(buf).start_partitions().end_partitions().end_query_result();
 return result(std::move(buf), result_digest(_digest.finalize_array()));
 }
 case result_request::result_and_digest:
-return result(std::move(_out), result_digest(_digest.finalize_array()));
+return result(std::move(_out), result_digest(_digest.finalize_array()), _row_count);
 }
 abort();
 }


@@ -96,14 +96,16 @@ public:
class result {
bytes_ostream _w;
stdx::optional<result_digest> _digest;
stdx::optional<uint32_t> _row_count;
public:
class builder;
class partition_writer;
friend class result_merger;
result();
result(bytes_ostream&& w) : _w(std::move(w)) {}
result(bytes_ostream&& w, stdx::optional<result_digest> d) : _w(std::move(w)), _digest(d) {}
result(bytes_ostream&& w, stdx::optional<uint32_t> c = {}) : _w(std::move(w)), _row_count(c) {}
result(bytes_ostream&& w, stdx::optional<result_digest> d, stdx::optional<uint32_t> c = {}) : _w(std::move(w)), _digest(d), _row_count(c) {}
result(result&&) = default;
result(const result&) = default;
result& operator=(result&&) = default;
@@ -117,6 +119,10 @@ public:
return _digest;
}
const stdx::optional<uint32_t>& row_count() const {
return _row_count;
}
uint32_t calculate_row_count(const query::partition_slice&);
struct printer {


@@ -213,8 +213,16 @@ foreign_ptr<lw_shared_ptr<query::result>> result_merger::get() {
bytes_ostream w;
auto partitions = ser::writer_of_query_result(w).start_partitions();
std::experimental::optional<uint32_t> row_count = 0;
for (auto&& r : _partial) {
if (row_count) {
if (r->row_count()) {
row_count = row_count.value() + r->row_count().value();
} else {
row_count = std::experimental::nullopt;
}
}
result_view::do_with(*r, [&] (result_view rv) {
for (auto&& pv : rv._v.partitions()) {
partitions.add(pv);
@@ -224,7 +232,7 @@ foreign_ptr<lw_shared_ptr<query::result>> result_merger::get() {
std::move(partitions).end_partitions().end_query_result();
return make_foreign(make_lw_shared<query::result>(std::move(w)));
return make_foreign(make_lw_shared<query::result>(std::move(w), row_count));
}
}
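The merging rule in the hunk above can be modeled standalone (plain `std::optional` instead of the tree's `stdx::optional`; the function name is hypothetical): the merged count is the sum of the per-shard counts, but becomes empty as soon as any partial result lacks one.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical model of result_merger's row-count aggregation: sum the
// partial counts, but "poison" the total to nullopt if any partial result
// carries no count.
static std::optional<uint32_t>
merge_row_counts(const std::vector<std::optional<uint32_t>>& partials) {
    std::optional<uint32_t> total = 0;
    for (const auto& c : partials) {
        if (!total) {
            break; // already poisoned; the remaining counts are irrelevant
        }
        if (c) {
            total = *total + *c;
        } else {
            total = std::nullopt; // one missing count invalidates the sum
        }
    }
    return total;
}
```

With no partial results the total stays 0, matching the `row_count = 0` initialization in the diff.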


@@ -203,12 +203,15 @@ future<> schema_registry_entry::maybe_sync(std::function<future<>()> syncer) {
return make_ready_future<>();
case schema_registry_entry::sync_state::SYNCING:
return _synced_future;
case schema_registry_entry::sync_state::NOT_SYNCED:
case schema_registry_entry::sync_state::NOT_SYNCED: {
logger.debug("Syncing {}", _version);
_synced_promise = {};
do_with(std::move(syncer), [] (auto& syncer) {
auto f = do_with(std::move(syncer), [] (auto& syncer) {
return syncer();
}).then_wrapped([this, self = shared_from_this()] (auto&& f) {
});
_synced_future = _synced_promise.get_future();
_sync_state = schema_registry_entry::sync_state::SYNCING;
f.then_wrapped([this, self = shared_from_this()] (auto&& f) {
if (_sync_state != sync_state::SYNCING) {
return;
}
@@ -222,9 +225,8 @@ future<> schema_registry_entry::maybe_sync(std::function<future<>()> syncer) {
_synced_promise.set_value();
}
});
_synced_future = _synced_promise.get_future();
_sync_state = schema_registry_entry::sync_state::SYNCING;
return _synced_future;
}
default:
assert(0);
}
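The reordering in this hunk moves `_synced_future`/`_sync_state` assignment ahead of chaining the continuation. A minimal Seastar-free sketch (hypothetical names, callbacks standing in for futures) shows why: a syncer that completes inline would otherwise fire the continuation while the entry still reads NOT_SYNCED.

```cpp
#include <cassert>
#include <functional>

// Toy model of maybe_sync's state machine. The entry is marked SYNCING
// *before* the syncer is invoked, so even a syncer that completes
// synchronously observes consistent state in its continuation.
enum class sync_state { NOT_SYNCED, SYNCING, SYNCED };

struct registry_entry {
    sync_state state = sync_state::NOT_SYNCED;

    // syncer receives a completion callback, mimicking future chaining
    void maybe_sync(const std::function<void(std::function<void()>)>& syncer) {
        if (state != sync_state::NOT_SYNCED) {
            return;
        }
        state = sync_state::SYNCING;     // publish state first (the fix)
        syncer([this] {                  // completion continuation
            if (state == sync_state::SYNCING) {
                state = sync_state::SYNCED;
            }
        });
    }
};
```

An inline syncer such as `[](auto done) { done(); }` now sees SYNCING, not NOT_SYNCED, when its continuation runs.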

Submodule seastar updated: dab58e4562...b80564dff1


@@ -710,20 +710,28 @@ public static class MigrationsSerializer implements IVersionedSerializer<Collect
//
// The endpoint is the node from which 's' originated.
//
// FIXME: Avoid the sync if the source was/is synced by schema_tables::merge_schema().
static future<> maybe_sync(const schema_ptr& s, net::messaging_service::msg_addr endpoint) {
if (s->is_synced()) {
return make_ready_future<>();
}
// Serialize schema sync by always doing it on shard 0.
return smp::submit_to(0, [gs = global_schema_ptr(s), endpoint] {
schema_ptr s = gs.get();
schema_registry_entry& e = *s->registry_entry();
return e.maybe_sync([endpoint, s] {
return s->registry_entry()->maybe_sync([s, endpoint] {
auto merge = [gs = global_schema_ptr(s), endpoint] {
schema_ptr s = gs.get();
logger.debug("Syncing schema of {}.{} (v={}) with {}", s->ks_name(), s->cf_name(), s->version(), endpoint);
return get_local_migration_manager().merge_schema_from(endpoint);
});
};
// Serialize schema sync by always doing it on shard 0.
if (engine().cpu_id() == 0) {
return merge();
} else {
return smp::submit_to(0, [gs = global_schema_ptr(s), endpoint, merge] {
schema_ptr s = gs.get();
schema_registry_entry& e = *s->registry_entry();
return e.maybe_sync(merge);
});
}
});
}
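The shard-0 shortcut above can be sketched without Seastar (`smp::submit_to` and `engine().cpu_id()` are modeled by stand-ins; all names here are hypothetical): when the caller is already on shard 0, the merge runs directly instead of paying for a cross-shard submit.

```cpp
#include <cassert>

// Counts simulated cross-shard submissions so the optimization is observable.
static int cross_shard_hops = 0;

// Stand-in for smp::submit_to(0, ...): runs the task, recording one hop.
template <typename Func>
void submit_to_shard0(Func f) {
    ++cross_shard_hops;
    f();
}

// Stand-in for the maybe_sync dispatch: serialize on shard 0, but skip the
// submit entirely when we are already there.
template <typename Func>
void sync_on_shard0(unsigned current_shard, Func merge) {
    if (current_shard == 0) {
        merge();                 // already on shard 0: no hop
    } else {
        submit_to_shard0(merge); // otherwise hop to shard 0
    }
}
```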


@@ -2260,14 +2260,14 @@ storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd, std::vecto
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>>
storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout, std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results,
lw_shared_ptr<query::read_command> cmd, db::consistency_level cl, std::vector<query::partition_range>::iterator&& i,
std::vector<query::partition_range>&& ranges, int concurrency_factor) {
std::vector<query::partition_range>&& ranges, int concurrency_factor, uint32_t total_row_count) {
schema_ptr schema = local_schema_registry().get(cmd->schema_version);
keyspace& ks = _db.local().find_keyspace(schema->ks_name());
std::vector<::shared_ptr<abstract_read_executor>> exec;
auto concurrent_fetch_starting_index = i;
auto p = shared_from_this();
while (i != ranges.end() && std::distance(i, concurrent_fetch_starting_index) < concurrency_factor) {
while (i != ranges.end() && std::distance(concurrent_fetch_starting_index, i) < concurrency_factor) {
query::partition_range& range = *i;
std::vector<gms::inet_address> live_endpoints = get_live_sorted_endpoints(ks, end_token(range));
std::vector<gms::inet_address> filtered_endpoints = filter_for_query(cl, ks, live_endpoints);
@@ -2325,13 +2325,15 @@ storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::t
return rex->execute(timeout);
}, std::move(merger));
return f.then([p, exec = std::move(exec), results = std::move(results), i = std::move(i), ranges = std::move(ranges), cl, cmd, concurrency_factor, timeout]
return f.then([p, exec = std::move(exec), results = std::move(results), i = std::move(i), ranges = std::move(ranges), cl, cmd, concurrency_factor, timeout, total_row_count]
(foreign_ptr<lw_shared_ptr<query::result>>&& result) mutable {
total_row_count += result->row_count() ? result->row_count().value() :
(logger.error("no row count in query result, should not happen here"), result->calculate_row_count(cmd->slice));
results.emplace_back(std::move(result));
if (i == ranges.end()) {
if (i == ranges.end() || total_row_count >= cmd->row_limit) {
return make_ready_future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>>(std::move(results));
} else {
return p->query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, std::move(i), std::move(ranges), concurrency_factor);
return p->query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, std::move(i), std::move(ranges), concurrency_factor, total_row_count);
}
}).handle_exception([p] (std::exception_ptr eptr) {
p->handle_read_error(eptr);
@@ -2363,6 +2365,8 @@ storage_proxy::query_partition_key_range(lw_shared_ptr<query::read_command> cmd,
std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results;
results.reserve(ranges.size()/concurrency_factor + 1);
logger.debug("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
result_rows_per_range, cmd->row_limit, ranges.size(), concurrency_factor);
return query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, ranges.begin(), std::move(ranges), concurrency_factor)
.then([](std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results) {
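The early-stop logic threaded through `query_partition_key_range_concurrent` above can be reduced to a small sketch (hypothetical function, synchronous for clarity): ranges are fetched in batches of `concurrency_factor`, and fetching stops once the accumulated row count reaches the command's row limit instead of touching every remaining range.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Returns how many ranges are actually queried before the row limit is
// reached, modeling the total_row_count >= cmd->row_limit cutoff.
static size_t ranges_queried(const std::vector<uint32_t>& rows_per_range,
                             int concurrency_factor, uint32_t row_limit) {
    uint32_t total = 0;
    size_t i = 0;
    while (i < rows_per_range.size()) {
        // one concurrent batch of up to concurrency_factor ranges
        size_t batch_end = std::min(rows_per_range.size(),
                                    i + static_cast<size_t>(concurrency_factor));
        for (; i < batch_end; ++i) {
            total += rows_per_range[i];
        }
        if (total >= row_limit) {
            break; // enough rows collected; skip the remaining ranges
        }
    }
    return i;
}
```

With 4 ranges of 5 rows each, a concurrency factor of 1, and a limit of 8, only 2 ranges are fetched.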


@@ -219,7 +219,7 @@ private:
static std::vector<gms::inet_address> intersection(const std::vector<gms::inet_address>& l1, const std::vector<gms::inet_address>& l2);
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>> query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout,
std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results, lw_shared_ptr<query::read_command> cmd, db::consistency_level cl, std::vector<query::partition_range>::iterator&& i,
std::vector<query::partition_range>&& ranges, int concurrency_factor);
std::vector<query::partition_range>&& ranges, int concurrency_factor, uint32_t total_row_count = 0);
future<foreign_ptr<lw_shared_ptr<query::result>>> do_query(schema_ptr,
lw_shared_ptr<query::read_command> cmd,


@@ -1797,16 +1797,18 @@ future<> storage_service::start_native_transport() {
// return cserver->stop();
//});
::shared_ptr<seastar::tls::server_credentials> cred;
std::shared_ptr<seastar::tls::credentials_builder> cred;
auto addr = ipv4_addr{ip, port};
auto f = make_ready_future();
// main should have made sure values are clean and neatish
if (ceo.at("enabled") == "true") {
cred = ::make_shared<seastar::tls::server_credentials>(::make_shared<seastar::tls::dh_params>(seastar::tls::dh_params::level::MEDIUM));
cred = std::make_shared<seastar::tls::credentials_builder>();
cred->set_dh_level(seastar::tls::dh_params::level::MEDIUM);
f = cred->set_x509_key_file(ceo.at("certificate"), ceo.at("keyfile"), seastar::tls::x509_crt_format::PEM);
logger.info("Enabling encrypted CQL connections between client and server");
}
return f.then([cserver, addr, cred, keepalive] {
return f.then([cserver, addr, cred = std::move(cred), keepalive] {
return cserver->invoke_on_all(&transport::cql_server::listen, addr, cred, keepalive);
});
});


@@ -107,10 +107,12 @@ public:
// ensure all SSTables are in the manifest
for (auto& sstable : sstables) {
// unconditionally add a sstable to a list of its level.
manifest.add(sstable);
}
for (auto i = 1U; i < manifest._generations.size(); i++) {
// send overlapping sstables (with level > 0) to level 0, if any.
manifest.repair_overlapping_sstables(i);
}
@@ -123,36 +125,8 @@ public:
if (level >= _generations.size()) {
throw std::runtime_error(sprint("Invalid level %u out of %ld", level, (_generations.size() - 1)));
}
#if 0
logDistribution();
#endif
if (can_add_sstable(sstable)) {
// adding the sstable does not cause overlap in the level
logger.debug("Adding {} to L{}", sstable->get_filename(), level);
_generations[level].push_back(sstable);
} else {
// this can happen if:
// * a compaction has promoted an overlapping sstable to the given level, or
// was also supposed to add an sstable at the given level.
// * we are moving sstables from unrepaired to repaired and the sstable
// would cause overlap
//
// The add(..):ed sstable will be sent to level 0
#if 0
try
{
reader.descriptor.getMetadataSerializer().mutateLevel(reader.descriptor, 0);
reader.reloadSSTableMetadata();
}
catch (IOException e)
{
logger.error("Could not change sstable level - adding it at level 0 anyway, we will find it at restart.", e);
}
#endif
_generations[0].push_back(sstable);
}
logger.debug("Adding {} to L{}", sstable->get_filename(), level);
_generations[level].push_back(sstable);
}
#if 0
@@ -258,20 +232,8 @@ public:
void send_back_to_L0(sstables::shared_sstable& sstable) {
remove(sstable);
#if 0
try
{
sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 0);
sstable.reloadSSTableMetadata();
add(sstable);
}
catch (IOException e)
{
throw new RuntimeException("Could not reload sstable meta data", e);
}
#else
_generations[0].push_back(sstable);
#endif
sstable->set_sstable_level(0);
}
#if 0


@@ -1794,6 +1794,20 @@ double sstable::get_compression_ratio() const {
}
}
void sstable::set_sstable_level(uint32_t new_level) {
auto entry = _statistics.contents.find(metadata_type::Stats);
if (entry == _statistics.contents.end()) {
return;
}
auto& p = entry->second;
if (!p) {
throw std::runtime_error("Statistics is malformed");
}
stats_metadata& s = *static_cast<stats_metadata *>(p.get());
sstlog.debug("set level of {} with generation {} from {} to {}", get_filename(), _generation, s.sstable_level, new_level);
s.sstable_level = new_level;
}
future<> sstable::mutate_sstable_level(uint32_t new_level) {
if (!has_component(component_type::Statistics)) {
return make_ready_future<>();


@@ -529,6 +529,9 @@ public:
return get_stats_metadata().sstable_level;
}
// This will change sstable level only in memory.
void set_sstable_level(uint32_t);
double get_compression_ratio() const;
future<> mutate_sstable_level(uint32_t);


@@ -85,7 +85,6 @@ struct send_info {
};
future<stop_iteration> do_send_mutations(auto si, auto fm) {
return get_local_stream_manager().mutation_send_limiter().wait().then([si, fm = std::move(fm)] () mutable {
sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id);
auto fm_size = fm.representation().size();
net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id).then([si, fm_size] {
@@ -100,26 +99,27 @@ future<stop_iteration> do_send_mutations(auto si, auto fm) {
sslog.error("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep);
}
si->mutations_done.broken();
}).finally([] {
get_local_stream_manager().mutation_send_limiter().signal();
});
return stop_iteration::no;
});
return make_ready_future<stop_iteration>(stop_iteration::no);
}
future<> send_mutations(auto si) {
auto& cf = si->db.find_column_family(si->cf_id);
auto& priority = service::get_local_streaming_read_priority();
return do_with(cf.make_reader(cf.schema(), si->pr, priority), [si] (auto& reader) {
return repeat([si, &reader] () {
return reader().then([si] (auto mopt) {
if (mopt && si->db.column_family_exists(si->cf_id)) {
si->mutations_nr++;
auto fm = frozen_mutation(*mopt);
return do_send_mutations(si, std::move(fm));
} else {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
return repeat([si, &reader] {
return get_local_stream_manager().mutation_send_limiter().wait().then([si, &reader] {
return reader().then([si] (auto mopt) {
if (mopt && si->db.column_family_exists(si->cf_id)) {
si->mutations_nr++;
auto fm = frozen_mutation(*mopt);
return do_send_mutations(si, std::move(fm));
} else {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
});
}).finally([] {
get_local_stream_manager().mutation_send_limiter().signal();
});
});
}).then([si] {
@@ -132,7 +132,7 @@ void stream_transfer_task::start() {
auto cf_id = this->cf_id;
auto id = net::messaging_service::msg_addr{session->peer, session->dst_cpu_id};
sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}", plan_id, cf_id);
parallel_for_each(_ranges.begin(), _ranges.end(), [this, plan_id, cf_id, id] (auto range) {
do_for_each(_ranges.begin(), _ranges.end(), [this, plan_id, cf_id, id] (auto range) {
unsigned shard_begin = range.start() ? dht::shard_of(range.start()->value()) : 0;
unsigned shard_end = range.end() ? dht::shard_of(range.end()->value()) + 1 : smp::count;
auto cf_id = this->cf_id;
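The `parallel_for_each` → `do_for_each` switch is the memory-bound change described in the commit message: per-range state (reader plus `send_info`) is alive for one range at a time instead of all `nr_vnode` ranges at once. A back-of-the-envelope sketch (the per-range byte figure is an assumed illustrative number, not measured):

```cpp
#include <cassert>
#include <cstdint>

// Estimates peak streaming memory: parallel iteration keeps every vnode
// range's reader/send_info in flight; sequential iteration keeps one.
static uint64_t peak_bytes(bool parallel, uint64_t nr_vnode,
                           uint64_t nr_shard, uint64_t per_range_bytes) {
    uint64_t ranges_in_flight = parallel ? nr_vnode : 1;
    return ranges_in_flight * nr_shard * per_range_bytes;
}
```

With the default 256 vnodes, the sequential form is a factor-of-256 reduction in this estimate, matching the commit message.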


@@ -32,6 +32,7 @@ boost_tests = [
'types_test',
'keys_test',
'mutation_test',
'schema_registry_test',
'range_test',
'mutation_reader_test',
'cql_query_test',


@@ -444,3 +444,36 @@ SEASTAR_TEST_CASE(test_partitions_with_only_expired_tombstones_are_dropped) {
BOOST_REQUIRE_EQUAL(result.row_count(), 2);
});
}
SEASTAR_TEST_CASE(test_result_row_count) {
return seastar::async([] {
auto s = make_schema();
auto now = gc_clock::now();
auto slice = partition_slice_builder(*s).build();
mutation m1(partition_key::from_single_value(*s, "key1"), s);
auto src = make_source({m1});
auto r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
BOOST_REQUIRE_EQUAL(r.row_count().value(), 0);
m1.set_static_cell("s1", data_value(bytes("S_v1")), 1);
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("A")), "v1", data_value(bytes("A_v1")), 1);
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("B")), "v1", data_value(bytes("B_v1")), 1);
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
BOOST_REQUIRE_EQUAL(r.row_count().value(), 2);
mutation m2(partition_key::from_single_value(*s, "key2"), s);
m2.set_static_cell("s1", data_value(bytes("S_v1")), 1);
r = to_data_query_result(mutation_query(s, make_source({m1, m2}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
BOOST_REQUIRE_EQUAL(r.row_count().value(), 3);
});
}


@@ -23,10 +23,15 @@
#define BOOST_TEST_MODULE core
#include <boost/test/unit_test.hpp>
#include "boost/icl/interval.hpp"
#include "boost/icl/interval_map.hpp"
#include <unordered_set>
#include "query-request.hh"
#include "schema_builder.hh"
#include "disk-error-handler.hh"
#include "locator/token_metadata.hh"
thread_local disk_error_signal_type commit_error;
thread_local disk_error_signal_type general_disk_error;
@@ -447,3 +452,56 @@ BOOST_AUTO_TEST_CASE(range_overlap_tests) {
// [3,4) and (4,5]
BOOST_REQUIRE(range<unsigned>({3}, {{4, false}}).overlaps(range<unsigned>({{4, false}}, {5}), unsigned_comparator()) == false);
}
auto get_item(std::string left, std::string right, std::string val) {
using value_type = std::unordered_set<std::string>;
auto l = dht::global_partitioner().from_sstring(left);
auto r = dht::global_partitioner().from_sstring(right);
auto rg = range<dht::token>({{l, false}}, {r});
value_type v{val};
return std::make_pair(locator::token_metadata::range_to_interval(rg), v);
}
BOOST_AUTO_TEST_CASE(test_range_interval_map) {
using value_type = std::unordered_set<std::string>;
using token = dht::token;
boost::icl::interval_map<token, value_type> mymap;
mymap += get_item("1", "5", "A");
mymap += get_item("5", "8", "B");
mymap += get_item("1", "3", "C");
mymap += get_item("3", "8", "D");
std::cout << "my map: " << "\n";
for (auto x : mymap) {
std::cout << x.first << " -> ";
for (auto s : x.second) {
std::cout << s << ", ";
}
std::cout << "\n";
}
auto search_item = [&mymap] (std::string val) {
auto tok = dht::global_partitioner().from_sstring(val);
auto search = range<token>(tok);
auto it = mymap.find(locator::token_metadata::range_to_interval(search));
if (it != mymap.end()) {
std::cout << "Found OK:" << " token = " << tok << " in range: " << it->first << "\n";
return true;
} else {
std::cout << "Found NO:" << " token = " << tok << "\n";
return false;
}
};
BOOST_REQUIRE(search_item("0") == false);
BOOST_REQUIRE(search_item("1") == false);
BOOST_REQUIRE(search_item("2") == true);
BOOST_REQUIRE(search_item("3") == true);
BOOST_REQUIRE(search_item("4") == true);
BOOST_REQUIRE(search_item("5") == true);
BOOST_REQUIRE(search_item("6") == true);
BOOST_REQUIRE(search_item("7") == true);
BOOST_REQUIRE(search_item("8") == true);
BOOST_REQUIRE(search_item("9") == false);
}


@@ -0,0 +1,130 @@
/*
* Copyright (C) 2016 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#define BOOST_TEST_DYN_LINK
#include <seastar/core/thread.hh>
#include "tests/test-utils.hh"
#include "schema_registry.hh"
#include "schema_builder.hh"
#include "mutation_source_test.hh"
#include "disk-error-handler.hh"
thread_local disk_error_signal_type commit_error;
thread_local disk_error_signal_type general_disk_error;
static bytes random_column_name() {
return to_bytes(to_hex(make_blob(32)));
}
static schema_ptr random_schema() {
return schema_builder("ks", "cf")
.with_column("pk", bytes_type, column_kind::partition_key)
.with_column(random_column_name(), bytes_type)
.build();
}
SEASTAR_TEST_CASE(test_async_loading) {
return seastar::async([] {
auto s1 = random_schema();
auto s2 = random_schema();
auto s1_loaded = local_schema_registry().get_or_load(s1->version(), [s1] (table_schema_version) {
return make_ready_future<frozen_schema>(frozen_schema(s1));
}).get0();
BOOST_REQUIRE(s1_loaded);
BOOST_REQUIRE(s1_loaded->version() == s1->version());
auto s1_later = local_schema_registry().get_or_null(s1->version());
BOOST_REQUIRE(s1_later);
auto s2_loaded = local_schema_registry().get_or_load(s2->version(), [s2] (table_schema_version) {
return later().then([s2] { return frozen_schema(s2); });
}).get0();
BOOST_REQUIRE(s2_loaded);
BOOST_REQUIRE(s2_loaded->version() == s2->version());
auto s2_later = local_schema_registry().get_or_null(s2_loaded->version());
BOOST_REQUIRE(s2_later);
});
}
SEASTAR_TEST_CASE(test_schema_is_synced_when_syncer_doesnt_defer) {
return seastar::async([] {
auto s = random_schema();
s = local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) { return frozen_schema(s); });
BOOST_REQUIRE(!s->is_synced());
s->registry_entry()->maybe_sync([] { return make_ready_future<>(); }).get();
BOOST_REQUIRE(s->is_synced());
});
}
SEASTAR_TEST_CASE(test_schema_is_synced_when_syncer_defers) {
return seastar::async([] {
auto s = random_schema();
s = local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) { return frozen_schema(s); });
BOOST_REQUIRE(!s->is_synced());
s->registry_entry()->maybe_sync([] { return later(); }).get();
BOOST_REQUIRE(s->is_synced());
});
}
SEASTAR_TEST_CASE(test_failed_sync_can_be_retried) {
return seastar::async([] {
auto s = random_schema();
s = local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) { return frozen_schema(s); });
BOOST_REQUIRE(!s->is_synced());
promise<> fail_sync;
auto f1 = s->registry_entry()->maybe_sync([&fail_sync] () mutable {
return fail_sync.get_future().then([] {
throw std::runtime_error("sync failed");
});
});
// concurrent maybe_sync should attach to the current one
auto f2 = s->registry_entry()->maybe_sync([] { return make_ready_future<>(); });
fail_sync.set_value();
try {
f1.get();
BOOST_FAIL("Should have failed");
} catch (...) {
// expected
}
try {
f2.get();
BOOST_FAIL("Should have failed");
} catch (...) {
// expected
}
BOOST_REQUIRE(!s->is_synced());
s->registry_entry()->maybe_sync([] { return make_ready_future<>(); }).get();
BOOST_REQUIRE(s->is_synced());
});
}


@@ -1948,6 +1948,45 @@ SEASTAR_TEST_CASE(leveled_06) {
return make_ready_future<>();
}
SEASTAR_TEST_CASE(leveled_07) {
// Check that sstable, with level > 0, that overlaps with another in the same level is sent back to L0.
auto s = make_lw_shared(schema({}, some_keyspace, some_column_family,
{{"p1", utf8_type}}, {}, {}, {}, utf8_type));
column_family::config cfg;
compaction_manager cm;
cfg.enable_disk_writes = false;
cfg.enable_commitlog = false;
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), cm);
cf->mark_ready_for_writes();
auto key_and_token_pair = token_generation_for_current_shard(5);
auto min_key = key_and_token_pair[0].first;
auto max_key = key_and_token_pair[key_and_token_pair.size()-1].first;
// Creating two sstables which key range overlap.
add_sstable_for_leveled_test(cf, /*gen*/1, /*data_size*/0, /*level*/1, min_key, max_key);
BOOST_REQUIRE(cf->get_sstables()->size() == 1);
add_sstable_for_leveled_test(cf, /*gen*/2, /*data_size*/0, /*level*/1, key_and_token_pair[1].first, max_key);
BOOST_REQUIRE(cf->get_sstables()->size() == 2);
BOOST_REQUIRE(sstable_overlaps(cf, 1, 2) == true);
auto max_sstable_size_in_mb = 1;
auto candidates = get_candidates_for_leveled_strategy(*cf);
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, max_sstable_size_in_mb);
BOOST_REQUIRE(manifest.get_level_size(0) == 1);
BOOST_REQUIRE(manifest.get_level_size(1) == 1);
auto& l0 = manifest.get_level(0);
auto& sst = l0.front();
BOOST_REQUIRE(sst->generation() == 2);
BOOST_REQUIRE(sst->get_sstable_level() == 0);
return make_ready_future<>();
}
static lw_shared_ptr<key_reader> prepare_key_reader(schema_ptr s,
const std::vector<shared_sstable>& ssts, const query::partition_range& range)
{


@@ -275,7 +275,7 @@ future<> cql_server::stop() {
}
future<>
cql_server::listen(ipv4_addr addr, ::shared_ptr<seastar::tls::server_credentials> creds, bool keepalive) {
cql_server::listen(ipv4_addr addr, std::shared_ptr<seastar::tls::credentials_builder> creds, bool keepalive) {
_notifier = std::make_unique<event_notifier>(addr.port);
service::get_local_migration_manager().register_listener(_notifier.get());
service::get_local_storage_service().register_subscriber(_notifier.get());
@@ -285,7 +285,7 @@ cql_server::listen(ipv4_addr addr, ::shared_ptr<seastar::tls::server_credentials
server_socket ss;
try {
ss = creds
? seastar::tls::listen(creds, make_ipv4_address(addr), lo)
? seastar::tls::listen(creds->build_server_credentials(), make_ipv4_address(addr), lo)
: engine().listen(make_ipv4_address(addr), lo);
} catch (...) {
throw std::runtime_error(sprint("CQLServer error while listening on %s -> %s", make_ipv4_address(addr), std::current_exception()));
@@ -325,11 +325,9 @@ cql_server::do_accepts(int which, bool keepalive) {
}).then_wrapped([this, which, keepalive] (future<> f) {
try {
f.get();
} catch (const std::bad_alloc&) {
logger.debug("accept failed: {}, retrying", std::current_exception());
do_accepts(which, keepalive);
} catch (...) {
logger.debug("accept failed: {}", std::current_exception());
logger.warn("accept failed: {}", std::current_exception());
do_accepts(which, keepalive);
}
});
}
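The `do_accepts` hunk above appears to unify the two catch blocks so that any accept failure is logged (at warn) and the accept loop restarted, rather than singling out `std::bad_alloc`. That retry-on-any-exception loop, modeled as a bounded synchronous driver (hypothetical names, no Seastar):

```cpp
#include <cassert>
#include <functional>
#include <stdexcept>

// Drives try_accept until it succeeds or max_tries is exhausted; every
// exception type is treated the same way: count the attempt and retry.
static int accept_attempts(std::function<void(int)> try_accept, int max_tries) {
    int attempts = 0;
    while (attempts < max_tries) {
        try {
            try_accept(attempts);
            ++attempts;
            break;            // accept succeeded
        } catch (...) {
            ++attempts;       // any failure: log-and-retry, not just bad_alloc
        }
    }
    return attempts;
}
```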


@@ -107,7 +107,7 @@ private:
cql_load_balance _lb;
public:
cql_server(distributed<service::storage_proxy>& proxy, distributed<cql3::query_processor>& qp, cql_load_balance lb);
future<> listen(ipv4_addr addr, ::shared_ptr<seastar::tls::server_credentials> = {}, bool keepalive = false);
future<> listen(ipv4_addr addr, std::shared_ptr<seastar::tls::credentials_builder> = {}, bool keepalive = false);
future<> do_accepts(int which, bool keepalive);
future<> stop();
public: