Compare commits


22 Commits

Author SHA1 Message Date
Tomasz Grabiec
dce549f44f tests: Add unit tests for schema_registry
(cherry picked from commit 90c31701e3)
2016-05-16 10:52:02 +03:00
Tomasz Grabiec
26a3302957 schema_registry: Fix possible hang in maybe_sync() if syncer doesn't defer
Spotted during code review.

If the syncer doesn't defer, we may execute the then_wrapped() body before we
change the state. Fix by moving the then_wrapped() body to after the state changes.

(cherry picked from commit 443e5aef5a)
2016-05-16 10:51:54 +03:00
Tomasz Grabiec
f796d8081b migration_manager: Fix schema syncing with older version
The problem was that "s" would not be marked as synced if it came from a
shard != 0.

As a result, mutation using that schema would fail to apply with an exception:

  "attempted to mutate using not synced schema of ..."

The problem could surface when altering schema without changing
columns and restarting one of the nodes so that it forgets past
versions.

Fixes #1258.

Will be covered by dtest:

  SchemaManagementTest.test_prepared_statements_work_after_node_restart_after_altering_schema_without_changing_columns

(cherry picked from commit 8703136a4f)
2016-05-16 10:51:48 +03:00
Pekka Enberg
b850cb991c release: prepare for 1.1.0 2016-05-16 09:33:26 +03:00
Tomasz Grabiec
734cfa949a migration_manager: Invalidate prepared statements on every schema change
Currently we only do that when the column set changes. When prepared
statements are executed, parameters like read repair chance are read
from the schema version stored in the statement. Not invalidating prepared
statements on changes of such parameters makes it appear as if the alter took
no effect.

Fixes #1255.
Message-Id: <1462985495-9767-1-git-send-email-tgrabiec@scylladb.com>

(cherry picked from commit 13d8cd0ae9)
2016-05-12 09:18:00 +03:00
Calle Wilund
3606e3ab29 transport::server: Do not treat accept exception as fatal
1.) It most likely is not fatal, i.e. either a tcp or, more likely, an ssl
    negotiation failure. In any case, we can still try the next
    connection.
2.) Not retrying will cause us to "leak" the accept, and then hang
    on shutdown.

Also, promote logging message on accept exception to "warn", since
dtest(s?) depend on seeing log output.

Message-Id: <1462283265-27051-4-git-send-email-calle@scylladb.com>
(cherry picked from commit 917bf850fa)
2016-05-10 19:26:29 +03:00
Calle Wilund
014284de00 cql_server: Use credentials_builder to init tls
Slightly cleaner, and shard-safe tls init.

Message-Id: <1462283265-27051-3-git-send-email-calle@scylladb.com>
(cherry picked from commit 437ebe7128)
2016-05-10 19:26:23 +03:00
Calle Wilund
f17764e74a messaging_service: Change tls init to use credentials_builder
To simplify init of the msg service, use credentials_builder
to encapsulate the tls options so the actual credentials can be
more easily created in each shard.

Message-Id: <1462283265-27051-2-git-send-email-calle@scylladb.com>
(cherry picked from commit 58f7edb04f)
2016-05-10 19:26:18 +03:00
Avi Kivity
8643028d0c Update seastar submodule
* seastar 73d5583...85bdfb7 (4):
  > tests/mkcert.gmk: Fix makefile bug in snakeoil cert generator
  > tls_test: Add case to do a little checking of credentials_builder
  > tls: Add credentials_builder - copyable credentials "factory"
  > tls_test: Add test for large-ish buffer send/receive
2016-05-10 19:24:49 +03:00
Avi Kivity
a35f1d765a Backport seastar iotune fixes
* seastar dab58e4...73d5583 (2):
  > iotune: don't coredump when directory fails to be created
  > iotune: improve recommendation in case we timeout

Fixes #1243.
2016-05-09 10:49:15 +03:00
Avi Kivity
3116a92b0e Point seastar submodule at scylla-seastar repository
Allows us to backport seastar fixes to branch-1.1.
2016-05-08 14:49:02 +03:00
Gleb Natapov
dad312ce0a tests: test for result row counting
Message-Id: <1462377579-2419-2-git-send-email-gleb@scylladb.com>
(cherry picked from commit f1cd52ff3f)
2016-05-06 13:32:36 +03:00
Gleb Natapov
3cae56f3e3 query: fix result row counting for results with multiple partitions
Message-Id: <1462377579-2419-1-git-send-email-gleb@scylladb.com>
(cherry picked from commit b75475de80)
2016-05-06 13:32:29 +03:00
Calle Wilund
656a10c4b8 storage_service: Add logging to match origin
Pointing out if the CQL server is listening in SSL mode.
Message-Id: <1462368016-32394-2-git-send-email-calle@scylladb.com>

(cherry picked from commit 709dd82d59)
2016-05-06 13:30:29 +03:00
Calle Wilund
c04b3de564 messaging_service: Add logging to match origin
To announce rpc port + ssl if on.

Message-Id: <1462368016-32394-1-git-send-email-calle@scylladb.com>
(cherry picked from commit d8ea85cd90)
2016-05-06 13:26:01 +03:00
Gleb Natapov
4964fe4cf0 storage_proxy: stop range query with limit after the limit is reached
(cherry picked from commit 3039e4c7de)
2016-05-06 12:50:51 +03:00
Gleb Natapov
e3ad3cf7d9 query: put live row count into query::result
The patch calculates the row count during result building and while merging.
If one of the results being merged does not have a row count, the
merged result will not have one either.

(cherry picked from commit db322d8f74)
2016-05-06 12:50:47 +03:00
Gleb Natapov
cef40627a7 storage_proxy: fix calculation of concurrently queried ranges
(cherry picked from commit 41c586313a)
2016-05-06 12:50:37 +03:00
Gleb Natapov
995820c08a storage_proxy: add logging for range query row count estimation
(cherry picked from commit c364ab9121)
2016-05-06 12:50:32 +03:00
Calle Wilund
b78abd7649 auth: Make auth.* schemas use deterministic UUIDs
In the initial implementation I figured this was not required, but
we get issues communicating across nodes if the system tables
don't have the same UUID, since creation is forcefully local, yet
shared.

Just do a manual re-create of the schema with a name-based UUID, and
use the migration manager directly.
Message-Id: <1462194588-11964-1-git-send-email-calle@scylladb.com>

(cherry picked from commit 6d2caedafd)
2016-05-03 10:49:16 +03:00
Calle Wilund
d47c62b51c messaging_service: Change init to use per-shard tls credentials
Fixes: #1220

While the server_credentials object is technically immutable
(esp. with the last change in seastar), the ::shared_ptr holding them
is not safe to share across shards.

Pre-create one credentials object per cpu and then hand them out in service
start-up instead.

Fixes an assertion error in debug builds, and just maybe real memory
corruption in release.

Requires seastar tls change:
"Change server_credentials to copy dh_params input"

Message-Id: <1462187704-2056-1-git-send-email-calle@scylladb.com>
(cherry picked from commit 751ba2f0bf)
2016-05-02 15:37:43 +03:00
Pekka Enberg
bef19e7f9e release: prepare for 1.1.rc1 2016-04-29 08:49:10 +03:00
23 changed files with 268 additions and 53 deletions

.gitmodules

@@ -1,6 +1,6 @@
[submodule "seastar"]
path = seastar
url = ../seastar
url = ../scylla-seastar
ignore = dirty
[submodule "swagger-ui"]
path = swagger-ui


@@ -1,6 +1,6 @@
#!/bin/sh
VERSION=666.development
VERSION=1.1.0
if test -f version
then


@@ -354,9 +354,12 @@ future<> auth::auth::setup_table(const sstring& name, const sstring& cql) {
::shared_ptr<cql3::statements::create_table_statement> statement =
static_pointer_cast<cql3::statements::create_table_statement>(
parsed->prepare(db)->statement);
// Origin sets "Legacy Cf Id" for the new table. We have no need to be
// pre-2.1 compatible (afaik), so lets skip a whole lotta hoolaballo
return statement->announce_migration(qp.proxy(), false).then([statement](bool) {});
auto schema = statement->get_cf_meta_data();
auto uuid = generate_legacy_id(schema->ks_name(), schema->cf_name());
schema_builder b(schema);
b.set_uuid(uuid);
return service::get_local_migration_manager().announce_new_column_family(b.build(), false);
}
future<bool> auth::auth::has_existing_users(const sstring& cfname, const sstring& def_user_name, const sstring& name_column) {


@@ -162,6 +162,7 @@ modes = {
scylla_tests = [
'tests/mutation_test',
'tests/schema_registry_test',
'tests/canonical_mutation_test',
'tests/range_test',
'tests/types_test',


@@ -432,10 +432,9 @@ void query_processor::migration_subscriber::on_update_keyspace(const sstring& ks
void query_processor::migration_subscriber::on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool columns_changed)
{
if (columns_changed) {
log.info("Column definitions for {}.{} changed, invalidating related prepared statements", ks_name, cf_name);
remove_invalid_prepared_statements(ks_name, cf_name);
}
// #1255: Ignoring columns_changed deliberately.
log.info("Column definitions for {}.{} changed, invalidating related prepared statements", ks_name, cf_name);
remove_invalid_prepared_statements(ks_name, cf_name);
}
void query_processor::migration_subscriber::on_update_user_type(const sstring& ks_name, const sstring& type_name)

init.cc

@@ -64,18 +64,23 @@ void init_ms_fd_gossiper(sstring listen_address
}
future<> f = make_ready_future<>();
::shared_ptr<server_credentials> creds;
std::shared_ptr<credentials_builder> creds;
if (ew != encrypt_what::none) {
// note: credentials are immutable after this, and ok to share across shards
creds = ::make_shared<server_credentials>(::make_shared<dh_params>(dh_params::level::MEDIUM));
creds = std::make_shared<credentials_builder>();
creds->set_dh_level(dh_params::level::MEDIUM);
creds->set_x509_key_file(ms_cert, ms_key, x509_crt_format::PEM).get();
ms_trust_store.empty() ? creds->set_system_trust().get() :
creds->set_x509_trust_file(ms_trust_store, x509_crt_format::PEM).get();
if (ms_trust_store.empty()) {
creds->set_system_trust().get();
} else {
creds->set_x509_trust_file(ms_trust_store, x509_crt_format::PEM).get();
}
}
// Init messaging_service
net::get_messaging_service().start(listen, storage_port, ew, ssl_storage_port, creds).get();
// #293 - do not stop anything
//engine().at_exit([] { return net::get_messaging_service().stop(); });
// Init failure_detector


@@ -225,7 +225,7 @@ messaging_service::messaging_service(gms::inet_address ip
, uint16_t port
, encrypt_what ew
, uint16_t ssl_port
, ::shared_ptr<seastar::tls::server_credentials> credentials
, std::shared_ptr<seastar::tls::credentials_builder> credentials
)
: _listen_address(ip)
, _port(port)
@@ -233,7 +233,7 @@ messaging_service::messaging_service(gms::inet_address ip
, _encrypt_what(ew)
, _rpc(new rpc_protocol_wrapper(serializer { }))
, _server(new rpc_protocol_server_wrapper(*_rpc, ipv4_addr { _listen_address.raw_addr(), _port }, rpc_resource_limits()))
, _credentials(std::move(credentials))
, _credentials(credentials ? credentials->build_server_credentials() : nullptr)
, _server_tls([this]() -> std::unique_ptr<rpc_protocol_server_wrapper>{
if (_encrypt_what == encrypt_what::none) {
return nullptr;
@@ -255,6 +255,13 @@ messaging_service::messaging_service(gms::inet_address ip
ci.attach_auxiliary("src_cpu_id", src_cpu_id);
return rpc::no_wait;
});
// Do this on just cpu 0, to avoid duplicate logs.
if (engine().cpu_id() == 0) {
if (_server_tls) {
logger.info("Starting Encrypted Messaging Service on SSL port {}", _ssl_port);
}
logger.info("Starting Messaging Service on port {}", _port);
}
}
msg_addr messaging_service::get_source(const rpc::client_info& cinfo) {


@@ -186,7 +186,7 @@ public:
public:
messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0"), uint16_t port = 7000);
messaging_service(gms::inet_address ip, uint16_t port, encrypt_what,
uint16_t ssl_port, ::shared_ptr<seastar::tls::server_credentials>);
uint16_t ssl_port, std::shared_ptr<seastar::tls::credentials_builder>);
~messaging_service();
public:
uint16_t port();


@@ -706,6 +706,7 @@ mutation_partition::query_compacted(query::result::partition_writer& pw, const s
|| !has_any_live_data(s, column_kind::static_column, static_row()))) {
pw.retract();
} else {
pw.row_count() += row_count ? : 1;
std::move(rows_wr).end_rows().end_qr_partition();
}
}


@@ -50,6 +50,7 @@ class result::partition_writer {
bool _static_row_added = false;
md5_hasher& _digest;
md5_hasher _digest_pos;
uint32_t& _row_count;
public:
partition_writer(
result_request request,
@@ -58,7 +59,8 @@ public:
ser::query_result__partitions& pw,
ser::vector_position pos,
ser::after_qr_partition__key w,
md5_hasher& digest)
md5_hasher& digest,
uint32_t& row_count)
: _request(request)
, _w(std::move(w))
, _slice(slice)
@@ -67,6 +69,7 @@ public:
, _pos(std::move(pos))
, _digest(digest)
, _digest_pos(digest)
, _row_count(row_count)
{ }
bool requested_digest() const {
@@ -98,6 +101,9 @@ public:
md5_hasher& digest() {
return _digest;
}
uint32_t& row_count() {
return _row_count;
}
};
class result::builder {
@@ -106,6 +112,7 @@ class result::builder {
const partition_slice& _slice;
ser::query_result__partitions _w;
result_request _request;
uint32_t _row_count = 0;
public:
builder(const partition_slice& slice, result_request request)
: _slice(slice)
@@ -130,21 +137,21 @@ public:
if (_request != result_request::only_result) {
key.feed_hash(_digest, s);
}
return partition_writer(_request, _slice, ranges, _w, std::move(pos), std::move(after_key), _digest);
return partition_writer(_request, _slice, ranges, _w, std::move(pos), std::move(after_key), _digest, _row_count);
}
result build() {
std::move(_w).end_partitions().end_query_result();
switch (_request) {
case result_request::only_result:
return result(std::move(_out));
return result(std::move(_out), _row_count);
case result_request::only_digest: {
bytes_ostream buf;
ser::writer_of_query_result(buf).start_partitions().end_partitions().end_query_result();
return result(std::move(buf), result_digest(_digest.finalize_array()));
}
case result_request::result_and_digest:
return result(std::move(_out), result_digest(_digest.finalize_array()));
return result(std::move(_out), result_digest(_digest.finalize_array()), _row_count);
}
abort();
}


@@ -96,14 +96,16 @@ public:
class result {
bytes_ostream _w;
stdx::optional<result_digest> _digest;
stdx::optional<uint32_t> _row_count;
public:
class builder;
class partition_writer;
friend class result_merger;
result();
result(bytes_ostream&& w) : _w(std::move(w)) {}
result(bytes_ostream&& w, stdx::optional<result_digest> d) : _w(std::move(w)), _digest(d) {}
result(bytes_ostream&& w, stdx::optional<uint32_t> c = {}) : _w(std::move(w)), _row_count(c) {}
result(bytes_ostream&& w, stdx::optional<result_digest> d, stdx::optional<uint32_t> c = {}) : _w(std::move(w)), _digest(d), _row_count(c) {}
result(result&&) = default;
result(const result&) = default;
result& operator=(result&&) = default;
@@ -117,6 +119,10 @@ public:
return _digest;
}
const stdx::optional<uint32_t>& row_count() const {
return _row_count;
}
uint32_t calculate_row_count(const query::partition_slice&);
struct printer {


@@ -213,8 +213,16 @@ foreign_ptr<lw_shared_ptr<query::result>> result_merger::get() {
bytes_ostream w;
auto partitions = ser::writer_of_query_result(w).start_partitions();
std::experimental::optional<uint32_t> row_count = 0;
for (auto&& r : _partial) {
if (row_count) {
if (r->row_count()) {
row_count = row_count.value() + r->row_count().value();
} else {
row_count = std::experimental::nullopt;
}
}
result_view::do_with(*r, [&] (result_view rv) {
for (auto&& pv : rv._v.partitions()) {
partitions.add(pv);
@@ -224,7 +232,7 @@ foreign_ptr<lw_shared_ptr<query::result>> result_merger::get() {
std::move(partitions).end_partitions().end_query_result();
return make_foreign(make_lw_shared<query::result>(std::move(w)));
return make_foreign(make_lw_shared<query::result>(std::move(w), row_count));
}
}


@@ -203,12 +203,15 @@ future<> schema_registry_entry::maybe_sync(std::function<future<>()> syncer) {
return make_ready_future<>();
case schema_registry_entry::sync_state::SYNCING:
return _synced_future;
case schema_registry_entry::sync_state::NOT_SYNCED:
case schema_registry_entry::sync_state::NOT_SYNCED: {
logger.debug("Syncing {}", _version);
_synced_promise = {};
do_with(std::move(syncer), [] (auto& syncer) {
auto f = do_with(std::move(syncer), [] (auto& syncer) {
return syncer();
}).then_wrapped([this, self = shared_from_this()] (auto&& f) {
});
_synced_future = _synced_promise.get_future();
_sync_state = schema_registry_entry::sync_state::SYNCING;
f.then_wrapped([this, self = shared_from_this()] (auto&& f) {
if (_sync_state != sync_state::SYNCING) {
return;
}
@@ -222,9 +225,8 @@ future<> schema_registry_entry::maybe_sync(std::function<future<>()> syncer) {
_synced_promise.set_value();
}
});
_synced_future = _synced_promise.get_future();
_sync_state = schema_registry_entry::sync_state::SYNCING;
return _synced_future;
}
default:
assert(0);
}

Submodule seastar updated: dab58e4562...85bdfb7ad9


@@ -710,20 +710,28 @@ public static class MigrationsSerializer implements IVersionedSerializer<Collect
//
// The endpoint is the node from which 's' originated.
//
// FIXME: Avoid the sync if the source was/is synced by schema_tables::merge_schema().
static future<> maybe_sync(const schema_ptr& s, net::messaging_service::msg_addr endpoint) {
if (s->is_synced()) {
return make_ready_future<>();
}
// Serialize schema sync by always doing it on shard 0.
return smp::submit_to(0, [gs = global_schema_ptr(s), endpoint] {
schema_ptr s = gs.get();
schema_registry_entry& e = *s->registry_entry();
return e.maybe_sync([endpoint, s] {
return s->registry_entry()->maybe_sync([s, endpoint] {
auto merge = [gs = global_schema_ptr(s), endpoint] {
schema_ptr s = gs.get();
logger.debug("Syncing schema of {}.{} (v={}) with {}", s->ks_name(), s->cf_name(), s->version(), endpoint);
return get_local_migration_manager().merge_schema_from(endpoint);
});
};
// Serialize schema sync by always doing it on shard 0.
if (engine().cpu_id() == 0) {
return merge();
} else {
return smp::submit_to(0, [gs = global_schema_ptr(s), endpoint, merge] {
schema_ptr s = gs.get();
schema_registry_entry& e = *s->registry_entry();
return e.maybe_sync(merge);
});
}
});
}


@@ -2260,14 +2260,14 @@ storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd, std::vecto
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>>
storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout, std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results,
lw_shared_ptr<query::read_command> cmd, db::consistency_level cl, std::vector<query::partition_range>::iterator&& i,
std::vector<query::partition_range>&& ranges, int concurrency_factor) {
std::vector<query::partition_range>&& ranges, int concurrency_factor, uint32_t total_row_count) {
schema_ptr schema = local_schema_registry().get(cmd->schema_version);
keyspace& ks = _db.local().find_keyspace(schema->ks_name());
std::vector<::shared_ptr<abstract_read_executor>> exec;
auto concurrent_fetch_starting_index = i;
auto p = shared_from_this();
while (i != ranges.end() && std::distance(i, concurrent_fetch_starting_index) < concurrency_factor) {
while (i != ranges.end() && std::distance(concurrent_fetch_starting_index, i) < concurrency_factor) {
query::partition_range& range = *i;
std::vector<gms::inet_address> live_endpoints = get_live_sorted_endpoints(ks, end_token(range));
std::vector<gms::inet_address> filtered_endpoints = filter_for_query(cl, ks, live_endpoints);
@@ -2325,13 +2325,15 @@ storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::t
return rex->execute(timeout);
}, std::move(merger));
return f.then([p, exec = std::move(exec), results = std::move(results), i = std::move(i), ranges = std::move(ranges), cl, cmd, concurrency_factor, timeout]
return f.then([p, exec = std::move(exec), results = std::move(results), i = std::move(i), ranges = std::move(ranges), cl, cmd, concurrency_factor, timeout, total_row_count]
(foreign_ptr<lw_shared_ptr<query::result>>&& result) mutable {
total_row_count += result->row_count() ? result->row_count().value() :
(logger.error("no row count in query result, should not happen here"), result->calculate_row_count(cmd->slice));
results.emplace_back(std::move(result));
if (i == ranges.end()) {
if (i == ranges.end() || total_row_count >= cmd->row_limit) {
return make_ready_future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>>(std::move(results));
} else {
return p->query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, std::move(i), std::move(ranges), concurrency_factor);
return p->query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, std::move(i), std::move(ranges), concurrency_factor, total_row_count);
}
}).handle_exception([p] (std::exception_ptr eptr) {
p->handle_read_error(eptr);
@@ -2363,6 +2365,8 @@ storage_proxy::query_partition_key_range(lw_shared_ptr<query::read_command> cmd,
std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results;
results.reserve(ranges.size()/concurrency_factor + 1);
logger.debug("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
result_rows_per_range, cmd->row_limit, ranges.size(), concurrency_factor);
return query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, ranges.begin(), std::move(ranges), concurrency_factor)
.then([](std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results) {


@@ -219,7 +219,7 @@ private:
static std::vector<gms::inet_address> intersection(const std::vector<gms::inet_address>& l1, const std::vector<gms::inet_address>& l2);
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>> query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout,
std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results, lw_shared_ptr<query::read_command> cmd, db::consistency_level cl, std::vector<query::partition_range>::iterator&& i,
std::vector<query::partition_range>&& ranges, int concurrency_factor);
std::vector<query::partition_range>&& ranges, int concurrency_factor, uint32_t total_row_count = 0);
future<foreign_ptr<lw_shared_ptr<query::result>>> do_query(schema_ptr,
lw_shared_ptr<query::read_command> cmd,


@@ -1797,16 +1797,18 @@ future<> storage_service::start_native_transport() {
// return cserver->stop();
//});
::shared_ptr<seastar::tls::server_credentials> cred;
std::shared_ptr<seastar::tls::credentials_builder> cred;
auto addr = ipv4_addr{ip, port};
auto f = make_ready_future();
// main should have made sure values are clean and neatish
if (ceo.at("enabled") == "true") {
cred = ::make_shared<seastar::tls::server_credentials>(::make_shared<seastar::tls::dh_params>(seastar::tls::dh_params::level::MEDIUM));
cred = std::make_shared<seastar::tls::credentials_builder>();
cred->set_dh_level(seastar::tls::dh_params::level::MEDIUM);
f = cred->set_x509_key_file(ceo.at("certificate"), ceo.at("keyfile"), seastar::tls::x509_crt_format::PEM);
logger.info("Enabling encrypted CQL connections between client and server");
}
return f.then([cserver, addr, cred, keepalive] {
return f.then([cserver, addr, cred = std::move(cred), keepalive] {
return cserver->invoke_on_all(&transport::cql_server::listen, addr, cred, keepalive);
});
});


@@ -32,6 +32,7 @@ boost_tests = [
'types_test',
'keys_test',
'mutation_test',
'schema_registry_test',
'range_test',
'mutation_reader_test',
'cql_query_test',


@@ -444,3 +444,36 @@ SEASTAR_TEST_CASE(test_partitions_with_only_expired_tombstones_are_dropped) {
BOOST_REQUIRE_EQUAL(result.row_count(), 2);
});
}
SEASTAR_TEST_CASE(test_result_row_count) {
return seastar::async([] {
auto s = make_schema();
auto now = gc_clock::now();
auto slice = partition_slice_builder(*s).build();
mutation m1(partition_key::from_single_value(*s, "key1"), s);
auto src = make_source({m1});
auto r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
BOOST_REQUIRE_EQUAL(r.row_count().value(), 0);
m1.set_static_cell("s1", data_value(bytes("S_v1")), 1);
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("A")), "v1", data_value(bytes("A_v1")), 1);
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("B")), "v1", data_value(bytes("B_v1")), 1);
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
BOOST_REQUIRE_EQUAL(r.row_count().value(), 2);
mutation m2(partition_key::from_single_value(*s, "key2"), s);
m2.set_static_cell("s1", data_value(bytes("S_v1")), 1);
r = to_data_query_result(mutation_query(s, make_source({m1, m2}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
BOOST_REQUIRE_EQUAL(r.row_count().value(), 3);
});
}


@@ -0,0 +1,130 @@
/*
* Copyright (C) 2016 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#define BOOST_TEST_DYN_LINK
#include <seastar/core/thread.hh>
#include "tests/test-utils.hh"
#include "schema_registry.hh"
#include "schema_builder.hh"
#include "mutation_source_test.hh"
#include "disk-error-handler.hh"
thread_local disk_error_signal_type commit_error;
thread_local disk_error_signal_type general_disk_error;
static bytes random_column_name() {
return to_bytes(to_hex(make_blob(32)));
}
static schema_ptr random_schema() {
return schema_builder("ks", "cf")
.with_column("pk", bytes_type, column_kind::partition_key)
.with_column(random_column_name(), bytes_type)
.build();
}
SEASTAR_TEST_CASE(test_async_loading) {
return seastar::async([] {
auto s1 = random_schema();
auto s2 = random_schema();
auto s1_loaded = local_schema_registry().get_or_load(s1->version(), [s1] (table_schema_version) {
return make_ready_future<frozen_schema>(frozen_schema(s1));
}).get0();
BOOST_REQUIRE(s1_loaded);
BOOST_REQUIRE(s1_loaded->version() == s1->version());
auto s1_later = local_schema_registry().get_or_null(s1->version());
BOOST_REQUIRE(s1_later);
auto s2_loaded = local_schema_registry().get_or_load(s2->version(), [s2] (table_schema_version) {
return later().then([s2] { return frozen_schema(s2); });
}).get0();
BOOST_REQUIRE(s2_loaded);
BOOST_REQUIRE(s2_loaded->version() == s2->version());
auto s2_later = local_schema_registry().get_or_null(s2_loaded->version());
BOOST_REQUIRE(s2_later);
});
}
SEASTAR_TEST_CASE(test_schema_is_synced_when_syncer_doesnt_defer) {
return seastar::async([] {
auto s = random_schema();
s = local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) { return frozen_schema(s); });
BOOST_REQUIRE(!s->is_synced());
s->registry_entry()->maybe_sync([] { return make_ready_future<>(); }).get();
BOOST_REQUIRE(s->is_synced());
});
}
SEASTAR_TEST_CASE(test_schema_is_synced_when_syncer_defers) {
return seastar::async([] {
auto s = random_schema();
s = local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) { return frozen_schema(s); });
BOOST_REQUIRE(!s->is_synced());
s->registry_entry()->maybe_sync([] { return later(); }).get();
BOOST_REQUIRE(s->is_synced());
});
}
SEASTAR_TEST_CASE(test_failed_sync_can_be_retried) {
return seastar::async([] {
auto s = random_schema();
s = local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) { return frozen_schema(s); });
BOOST_REQUIRE(!s->is_synced());
promise<> fail_sync;
auto f1 = s->registry_entry()->maybe_sync([&fail_sync] () mutable {
return fail_sync.get_future().then([] {
throw std::runtime_error("sync failed");
});
});
// concurrent maybe_sync should attach to the current one
auto f2 = s->registry_entry()->maybe_sync([] { return make_ready_future<>(); });
fail_sync.set_value();
try {
f1.get();
BOOST_FAIL("Should have failed");
} catch (...) {
// expected
}
try {
f2.get();
BOOST_FAIL("Should have failed");
} catch (...) {
// expected
}
BOOST_REQUIRE(!s->is_synced());
s->registry_entry()->maybe_sync([] { return make_ready_future<>(); }).get();
BOOST_REQUIRE(s->is_synced());
});
}


@@ -275,7 +275,7 @@ future<> cql_server::stop() {
}
future<>
cql_server::listen(ipv4_addr addr, ::shared_ptr<seastar::tls::server_credentials> creds, bool keepalive) {
cql_server::listen(ipv4_addr addr, std::shared_ptr<seastar::tls::credentials_builder> creds, bool keepalive) {
_notifier = std::make_unique<event_notifier>(addr.port);
service::get_local_migration_manager().register_listener(_notifier.get());
service::get_local_storage_service().register_subscriber(_notifier.get());
@@ -285,7 +285,7 @@ cql_server::listen(ipv4_addr addr, ::shared_ptr<seastar::tls::server_credentials
server_socket ss;
try {
ss = creds
? seastar::tls::listen(creds, make_ipv4_address(addr), lo)
? seastar::tls::listen(creds->build_server_credentials(), make_ipv4_address(addr), lo)
: engine().listen(make_ipv4_address(addr), lo);
} catch (...) {
throw std::runtime_error(sprint("CQLServer error while listening on %s -> %s", make_ipv4_address(addr), std::current_exception()));
@@ -325,11 +325,9 @@ cql_server::do_accepts(int which, bool keepalive) {
}).then_wrapped([this, which, keepalive] (future<> f) {
try {
f.get();
} catch (const std::bad_alloc&) {
logger.debug("accept failed: {}, retrying", std::current_exception());
do_accepts(which, keepalive);
} catch (...) {
logger.debug("accept failed: {}", std::current_exception());
logger.warn("acccept failed: {}", std::current_exception());
do_accepts(which, keepalive);
}
});
}


@@ -107,7 +107,7 @@ private:
cql_load_balance _lb;
public:
cql_server(distributed<service::storage_proxy>& proxy, distributed<cql3::query_processor>& qp, cql_load_balance lb);
future<> listen(ipv4_addr addr, ::shared_ptr<seastar::tls::server_credentials> = {}, bool keepalive = false);
future<> listen(ipv4_addr addr, std::shared_ptr<seastar::tls::credentials_builder> = {}, bool keepalive = false);
future<> do_accepts(int which, bool keepalive);
future<> stop();
public: