Compare commits
22 Commits
debug_form
...
scylla-1.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dce549f44f | ||
|
|
26a3302957 | ||
|
|
f796d8081b | ||
|
|
b850cb991c | ||
|
|
734cfa949a | ||
|
|
3606e3ab29 | ||
|
|
014284de00 | ||
|
|
f17764e74a | ||
|
|
8643028d0c | ||
|
|
a35f1d765a | ||
|
|
3116a92b0e | ||
|
|
dad312ce0a | ||
|
|
3cae56f3e3 | ||
|
|
656a10c4b8 | ||
|
|
c04b3de564 | ||
|
|
4964fe4cf0 | ||
|
|
e3ad3cf7d9 | ||
|
|
cef40627a7 | ||
|
|
995820c08a | ||
|
|
b78abd7649 | ||
|
|
d47c62b51c | ||
|
|
bef19e7f9e |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#!/bin/sh
|
||||
|
||||
VERSION=666.development
|
||||
VERSION=1.1.0
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -354,9 +354,12 @@ future<> auth::auth::setup_table(const sstring& name, const sstring& cql) {
|
||||
::shared_ptr<cql3::statements::create_table_statement> statement =
|
||||
static_pointer_cast<cql3::statements::create_table_statement>(
|
||||
parsed->prepare(db)->statement);
|
||||
// Origin sets "Legacy Cf Id" for the new table. We have no need to be
|
||||
// pre-2.1 compatible (afaik), so lets skip a whole lotta hoolaballo
|
||||
return statement->announce_migration(qp.proxy(), false).then([statement](bool) {});
|
||||
auto schema = statement->get_cf_meta_data();
|
||||
auto uuid = generate_legacy_id(schema->ks_name(), schema->cf_name());
|
||||
|
||||
schema_builder b(schema);
|
||||
b.set_uuid(uuid);
|
||||
return service::get_local_migration_manager().announce_new_column_family(b.build(), false);
|
||||
}
|
||||
|
||||
future<bool> auth::auth::has_existing_users(const sstring& cfname, const sstring& def_user_name, const sstring& name_column) {
|
||||
|
||||
@@ -162,6 +162,7 @@ modes = {
|
||||
|
||||
scylla_tests = [
|
||||
'tests/mutation_test',
|
||||
'tests/schema_registry_test',
|
||||
'tests/canonical_mutation_test',
|
||||
'tests/range_test',
|
||||
'tests/types_test',
|
||||
|
||||
@@ -432,10 +432,9 @@ void query_processor::migration_subscriber::on_update_keyspace(const sstring& ks
|
||||
|
||||
void query_processor::migration_subscriber::on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool columns_changed)
|
||||
{
|
||||
if (columns_changed) {
|
||||
log.info("Column definitions for {}.{} changed, invalidating related prepared statements", ks_name, cf_name);
|
||||
remove_invalid_prepared_statements(ks_name, cf_name);
|
||||
}
|
||||
// #1255: Ignoring columns_changed deliberately.
|
||||
log.info("Column definitions for {}.{} changed, invalidating related prepared statements", ks_name, cf_name);
|
||||
remove_invalid_prepared_statements(ks_name, cf_name);
|
||||
}
|
||||
|
||||
void query_processor::migration_subscriber::on_update_user_type(const sstring& ks_name, const sstring& type_name)
|
||||
|
||||
15
init.cc
15
init.cc
@@ -64,18 +64,23 @@ void init_ms_fd_gossiper(sstring listen_address
|
||||
}
|
||||
|
||||
future<> f = make_ready_future<>();
|
||||
::shared_ptr<server_credentials> creds;
|
||||
std::shared_ptr<credentials_builder> creds;
|
||||
|
||||
if (ew != encrypt_what::none) {
|
||||
// note: credentials are immutable after this, and ok to share across shards
|
||||
creds = ::make_shared<server_credentials>(::make_shared<dh_params>(dh_params::level::MEDIUM));
|
||||
creds = std::make_shared<credentials_builder>();
|
||||
creds->set_dh_level(dh_params::level::MEDIUM);
|
||||
|
||||
creds->set_x509_key_file(ms_cert, ms_key, x509_crt_format::PEM).get();
|
||||
ms_trust_store.empty() ? creds->set_system_trust().get() :
|
||||
creds->set_x509_trust_file(ms_trust_store, x509_crt_format::PEM).get();
|
||||
if (ms_trust_store.empty()) {
|
||||
creds->set_system_trust().get();
|
||||
} else {
|
||||
creds->set_x509_trust_file(ms_trust_store, x509_crt_format::PEM).get();
|
||||
}
|
||||
}
|
||||
|
||||
// Init messaging_service
|
||||
net::get_messaging_service().start(listen, storage_port, ew, ssl_storage_port, creds).get();
|
||||
|
||||
// #293 - do not stop anything
|
||||
//engine().at_exit([] { return net::get_messaging_service().stop(); });
|
||||
// Init failure_detector
|
||||
|
||||
@@ -225,7 +225,7 @@ messaging_service::messaging_service(gms::inet_address ip
|
||||
, uint16_t port
|
||||
, encrypt_what ew
|
||||
, uint16_t ssl_port
|
||||
, ::shared_ptr<seastar::tls::server_credentials> credentials
|
||||
, std::shared_ptr<seastar::tls::credentials_builder> credentials
|
||||
)
|
||||
: _listen_address(ip)
|
||||
, _port(port)
|
||||
@@ -233,7 +233,7 @@ messaging_service::messaging_service(gms::inet_address ip
|
||||
, _encrypt_what(ew)
|
||||
, _rpc(new rpc_protocol_wrapper(serializer { }))
|
||||
, _server(new rpc_protocol_server_wrapper(*_rpc, ipv4_addr { _listen_address.raw_addr(), _port }, rpc_resource_limits()))
|
||||
, _credentials(std::move(credentials))
|
||||
, _credentials(credentials ? credentials->build_server_credentials() : nullptr)
|
||||
, _server_tls([this]() -> std::unique_ptr<rpc_protocol_server_wrapper>{
|
||||
if (_encrypt_what == encrypt_what::none) {
|
||||
return nullptr;
|
||||
@@ -255,6 +255,13 @@ messaging_service::messaging_service(gms::inet_address ip
|
||||
ci.attach_auxiliary("src_cpu_id", src_cpu_id);
|
||||
return rpc::no_wait;
|
||||
});
|
||||
// Do this on just cpu 0, to avoid duplicate logs.
|
||||
if (engine().cpu_id() == 0) {
|
||||
if (_server_tls) {
|
||||
logger.info("Starting Encrypted Messaging Service on SSL port {}", _ssl_port);
|
||||
}
|
||||
logger.info("Starting Messaging Service on port {}", _port);
|
||||
}
|
||||
}
|
||||
|
||||
msg_addr messaging_service::get_source(const rpc::client_info& cinfo) {
|
||||
|
||||
@@ -186,7 +186,7 @@ public:
|
||||
public:
|
||||
messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0"), uint16_t port = 7000);
|
||||
messaging_service(gms::inet_address ip, uint16_t port, encrypt_what,
|
||||
uint16_t ssl_port, ::shared_ptr<seastar::tls::server_credentials>);
|
||||
uint16_t ssl_port, std::shared_ptr<seastar::tls::credentials_builder>);
|
||||
~messaging_service();
|
||||
public:
|
||||
uint16_t port();
|
||||
|
||||
@@ -706,6 +706,7 @@ mutation_partition::query_compacted(query::result::partition_writer& pw, const s
|
||||
|| !has_any_live_data(s, column_kind::static_column, static_row()))) {
|
||||
pw.retract();
|
||||
} else {
|
||||
pw.row_count() += row_count ? : 1;
|
||||
std::move(rows_wr).end_rows().end_qr_partition();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,6 +50,7 @@ class result::partition_writer {
|
||||
bool _static_row_added = false;
|
||||
md5_hasher& _digest;
|
||||
md5_hasher _digest_pos;
|
||||
uint32_t& _row_count;
|
||||
public:
|
||||
partition_writer(
|
||||
result_request request,
|
||||
@@ -58,7 +59,8 @@ public:
|
||||
ser::query_result__partitions& pw,
|
||||
ser::vector_position pos,
|
||||
ser::after_qr_partition__key w,
|
||||
md5_hasher& digest)
|
||||
md5_hasher& digest,
|
||||
uint32_t& row_count)
|
||||
: _request(request)
|
||||
, _w(std::move(w))
|
||||
, _slice(slice)
|
||||
@@ -67,6 +69,7 @@ public:
|
||||
, _pos(std::move(pos))
|
||||
, _digest(digest)
|
||||
, _digest_pos(digest)
|
||||
, _row_count(row_count)
|
||||
{ }
|
||||
|
||||
bool requested_digest() const {
|
||||
@@ -98,6 +101,9 @@ public:
|
||||
md5_hasher& digest() {
|
||||
return _digest;
|
||||
}
|
||||
uint32_t& row_count() {
|
||||
return _row_count;
|
||||
}
|
||||
};
|
||||
|
||||
class result::builder {
|
||||
@@ -106,6 +112,7 @@ class result::builder {
|
||||
const partition_slice& _slice;
|
||||
ser::query_result__partitions _w;
|
||||
result_request _request;
|
||||
uint32_t _row_count = 0;
|
||||
public:
|
||||
builder(const partition_slice& slice, result_request request)
|
||||
: _slice(slice)
|
||||
@@ -130,21 +137,21 @@ public:
|
||||
if (_request != result_request::only_result) {
|
||||
key.feed_hash(_digest, s);
|
||||
}
|
||||
return partition_writer(_request, _slice, ranges, _w, std::move(pos), std::move(after_key), _digest);
|
||||
return partition_writer(_request, _slice, ranges, _w, std::move(pos), std::move(after_key), _digest, _row_count);
|
||||
}
|
||||
|
||||
result build() {
|
||||
std::move(_w).end_partitions().end_query_result();
|
||||
switch (_request) {
|
||||
case result_request::only_result:
|
||||
return result(std::move(_out));
|
||||
return result(std::move(_out), _row_count);
|
||||
case result_request::only_digest: {
|
||||
bytes_ostream buf;
|
||||
ser::writer_of_query_result(buf).start_partitions().end_partitions().end_query_result();
|
||||
return result(std::move(buf), result_digest(_digest.finalize_array()));
|
||||
}
|
||||
case result_request::result_and_digest:
|
||||
return result(std::move(_out), result_digest(_digest.finalize_array()));
|
||||
return result(std::move(_out), result_digest(_digest.finalize_array()), _row_count);
|
||||
}
|
||||
abort();
|
||||
}
|
||||
|
||||
@@ -96,14 +96,16 @@ public:
|
||||
class result {
|
||||
bytes_ostream _w;
|
||||
stdx::optional<result_digest> _digest;
|
||||
stdx::optional<uint32_t> _row_count;
|
||||
|
||||
public:
|
||||
class builder;
|
||||
class partition_writer;
|
||||
friend class result_merger;
|
||||
|
||||
result();
|
||||
result(bytes_ostream&& w) : _w(std::move(w)) {}
|
||||
result(bytes_ostream&& w, stdx::optional<result_digest> d) : _w(std::move(w)), _digest(d) {}
|
||||
result(bytes_ostream&& w, stdx::optional<uint32_t> c = {}) : _w(std::move(w)), _row_count(c) {}
|
||||
result(bytes_ostream&& w, stdx::optional<result_digest> d, stdx::optional<uint32_t> c = {}) : _w(std::move(w)), _digest(d), _row_count(c) {}
|
||||
result(result&&) = default;
|
||||
result(const result&) = default;
|
||||
result& operator=(result&&) = default;
|
||||
@@ -117,6 +119,10 @@ public:
|
||||
return _digest;
|
||||
}
|
||||
|
||||
const stdx::optional<uint32_t>& row_count() const {
|
||||
return _row_count;
|
||||
}
|
||||
|
||||
uint32_t calculate_row_count(const query::partition_slice&);
|
||||
|
||||
struct printer {
|
||||
|
||||
10
query.cc
10
query.cc
@@ -213,8 +213,16 @@ foreign_ptr<lw_shared_ptr<query::result>> result_merger::get() {
|
||||
|
||||
bytes_ostream w;
|
||||
auto partitions = ser::writer_of_query_result(w).start_partitions();
|
||||
std::experimental::optional<uint32_t> row_count = 0;
|
||||
|
||||
for (auto&& r : _partial) {
|
||||
if (row_count) {
|
||||
if (r->row_count()) {
|
||||
row_count = row_count.value() + r->row_count().value();
|
||||
} else {
|
||||
row_count = std::experimental::nullopt;
|
||||
}
|
||||
}
|
||||
result_view::do_with(*r, [&] (result_view rv) {
|
||||
for (auto&& pv : rv._v.partitions()) {
|
||||
partitions.add(pv);
|
||||
@@ -224,7 +232,7 @@ foreign_ptr<lw_shared_ptr<query::result>> result_merger::get() {
|
||||
|
||||
std::move(partitions).end_partitions().end_query_result();
|
||||
|
||||
return make_foreign(make_lw_shared<query::result>(std::move(w)));
|
||||
return make_foreign(make_lw_shared<query::result>(std::move(w), row_count));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -203,12 +203,15 @@ future<> schema_registry_entry::maybe_sync(std::function<future<>()> syncer) {
|
||||
return make_ready_future<>();
|
||||
case schema_registry_entry::sync_state::SYNCING:
|
||||
return _synced_future;
|
||||
case schema_registry_entry::sync_state::NOT_SYNCED:
|
||||
case schema_registry_entry::sync_state::NOT_SYNCED: {
|
||||
logger.debug("Syncing {}", _version);
|
||||
_synced_promise = {};
|
||||
do_with(std::move(syncer), [] (auto& syncer) {
|
||||
auto f = do_with(std::move(syncer), [] (auto& syncer) {
|
||||
return syncer();
|
||||
}).then_wrapped([this, self = shared_from_this()] (auto&& f) {
|
||||
});
|
||||
_synced_future = _synced_promise.get_future();
|
||||
_sync_state = schema_registry_entry::sync_state::SYNCING;
|
||||
f.then_wrapped([this, self = shared_from_this()] (auto&& f) {
|
||||
if (_sync_state != sync_state::SYNCING) {
|
||||
return;
|
||||
}
|
||||
@@ -222,9 +225,8 @@ future<> schema_registry_entry::maybe_sync(std::function<future<>()> syncer) {
|
||||
_synced_promise.set_value();
|
||||
}
|
||||
});
|
||||
_synced_future = _synced_promise.get_future();
|
||||
_sync_state = schema_registry_entry::sync_state::SYNCING;
|
||||
return _synced_future;
|
||||
}
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: dab58e4562...85bdfb7ad9
@@ -710,20 +710,28 @@ public static class MigrationsSerializer implements IVersionedSerializer<Collect
|
||||
//
|
||||
// The endpoint is the node from which 's' originated.
|
||||
//
|
||||
// FIXME: Avoid the sync if the source was/is synced by schema_tables::merge_schema().
|
||||
static future<> maybe_sync(const schema_ptr& s, net::messaging_service::msg_addr endpoint) {
|
||||
if (s->is_synced()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
// Serialize schema sync by always doing it on shard 0.
|
||||
return smp::submit_to(0, [gs = global_schema_ptr(s), endpoint] {
|
||||
schema_ptr s = gs.get();
|
||||
schema_registry_entry& e = *s->registry_entry();
|
||||
return e.maybe_sync([endpoint, s] {
|
||||
return s->registry_entry()->maybe_sync([s, endpoint] {
|
||||
auto merge = [gs = global_schema_ptr(s), endpoint] {
|
||||
schema_ptr s = gs.get();
|
||||
logger.debug("Syncing schema of {}.{} (v={}) with {}", s->ks_name(), s->cf_name(), s->version(), endpoint);
|
||||
return get_local_migration_manager().merge_schema_from(endpoint);
|
||||
});
|
||||
};
|
||||
|
||||
// Serialize schema sync by always doing it on shard 0.
|
||||
if (engine().cpu_id() == 0) {
|
||||
return merge();
|
||||
} else {
|
||||
return smp::submit_to(0, [gs = global_schema_ptr(s), endpoint, merge] {
|
||||
schema_ptr s = gs.get();
|
||||
schema_registry_entry& e = *s->registry_entry();
|
||||
return e.maybe_sync(merge);
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -2260,14 +2260,14 @@ storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd, std::vecto
|
||||
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>>
|
||||
storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout, std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results,
|
||||
lw_shared_ptr<query::read_command> cmd, db::consistency_level cl, std::vector<query::partition_range>::iterator&& i,
|
||||
std::vector<query::partition_range>&& ranges, int concurrency_factor) {
|
||||
std::vector<query::partition_range>&& ranges, int concurrency_factor, uint32_t total_row_count) {
|
||||
schema_ptr schema = local_schema_registry().get(cmd->schema_version);
|
||||
keyspace& ks = _db.local().find_keyspace(schema->ks_name());
|
||||
std::vector<::shared_ptr<abstract_read_executor>> exec;
|
||||
auto concurrent_fetch_starting_index = i;
|
||||
auto p = shared_from_this();
|
||||
|
||||
while (i != ranges.end() && std::distance(i, concurrent_fetch_starting_index) < concurrency_factor) {
|
||||
while (i != ranges.end() && std::distance(concurrent_fetch_starting_index, i) < concurrency_factor) {
|
||||
query::partition_range& range = *i;
|
||||
std::vector<gms::inet_address> live_endpoints = get_live_sorted_endpoints(ks, end_token(range));
|
||||
std::vector<gms::inet_address> filtered_endpoints = filter_for_query(cl, ks, live_endpoints);
|
||||
@@ -2325,13 +2325,15 @@ storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::t
|
||||
return rex->execute(timeout);
|
||||
}, std::move(merger));
|
||||
|
||||
return f.then([p, exec = std::move(exec), results = std::move(results), i = std::move(i), ranges = std::move(ranges), cl, cmd, concurrency_factor, timeout]
|
||||
return f.then([p, exec = std::move(exec), results = std::move(results), i = std::move(i), ranges = std::move(ranges), cl, cmd, concurrency_factor, timeout, total_row_count]
|
||||
(foreign_ptr<lw_shared_ptr<query::result>>&& result) mutable {
|
||||
total_row_count += result->row_count() ? result->row_count().value() :
|
||||
(logger.error("no row count in query result, should not happen here"), result->calculate_row_count(cmd->slice));
|
||||
results.emplace_back(std::move(result));
|
||||
if (i == ranges.end()) {
|
||||
if (i == ranges.end() || total_row_count >= cmd->row_limit) {
|
||||
return make_ready_future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>>(std::move(results));
|
||||
} else {
|
||||
return p->query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, std::move(i), std::move(ranges), concurrency_factor);
|
||||
return p->query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, std::move(i), std::move(ranges), concurrency_factor, total_row_count);
|
||||
}
|
||||
}).handle_exception([p] (std::exception_ptr eptr) {
|
||||
p->handle_read_error(eptr);
|
||||
@@ -2363,6 +2365,8 @@ storage_proxy::query_partition_key_range(lw_shared_ptr<query::read_command> cmd,
|
||||
|
||||
std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results;
|
||||
results.reserve(ranges.size()/concurrency_factor + 1);
|
||||
logger.debug("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
|
||||
result_rows_per_range, cmd->row_limit, ranges.size(), concurrency_factor);
|
||||
|
||||
return query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, ranges.begin(), std::move(ranges), concurrency_factor)
|
||||
.then([](std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results) {
|
||||
|
||||
@@ -219,7 +219,7 @@ private:
|
||||
static std::vector<gms::inet_address> intersection(const std::vector<gms::inet_address>& l1, const std::vector<gms::inet_address>& l2);
|
||||
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>> query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout,
|
||||
std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results, lw_shared_ptr<query::read_command> cmd, db::consistency_level cl, std::vector<query::partition_range>::iterator&& i,
|
||||
std::vector<query::partition_range>&& ranges, int concurrency_factor);
|
||||
std::vector<query::partition_range>&& ranges, int concurrency_factor, uint32_t total_row_count = 0);
|
||||
|
||||
future<foreign_ptr<lw_shared_ptr<query::result>>> do_query(schema_ptr,
|
||||
lw_shared_ptr<query::read_command> cmd,
|
||||
|
||||
@@ -1797,16 +1797,18 @@ future<> storage_service::start_native_transport() {
|
||||
// return cserver->stop();
|
||||
//});
|
||||
|
||||
::shared_ptr<seastar::tls::server_credentials> cred;
|
||||
std::shared_ptr<seastar::tls::credentials_builder> cred;
|
||||
auto addr = ipv4_addr{ip, port};
|
||||
auto f = make_ready_future();
|
||||
|
||||
// main should have made sure values are clean and neatish
|
||||
if (ceo.at("enabled") == "true") {
|
||||
cred = ::make_shared<seastar::tls::server_credentials>(::make_shared<seastar::tls::dh_params>(seastar::tls::dh_params::level::MEDIUM));
|
||||
cred = std::make_shared<seastar::tls::credentials_builder>();
|
||||
cred->set_dh_level(seastar::tls::dh_params::level::MEDIUM);
|
||||
f = cred->set_x509_key_file(ceo.at("certificate"), ceo.at("keyfile"), seastar::tls::x509_crt_format::PEM);
|
||||
logger.info("Enabling encrypted CQL connections between client and server");
|
||||
}
|
||||
return f.then([cserver, addr, cred, keepalive] {
|
||||
return f.then([cserver, addr, cred = std::move(cred), keepalive] {
|
||||
return cserver->invoke_on_all(&transport::cql_server::listen, addr, cred, keepalive);
|
||||
});
|
||||
});
|
||||
|
||||
1
test.py
1
test.py
@@ -32,6 +32,7 @@ boost_tests = [
|
||||
'types_test',
|
||||
'keys_test',
|
||||
'mutation_test',
|
||||
'schema_registry_test',
|
||||
'range_test',
|
||||
'mutation_reader_test',
|
||||
'cql_query_test',
|
||||
|
||||
@@ -444,3 +444,36 @@ SEASTAR_TEST_CASE(test_partitions_with_only_expired_tombstones_are_dropped) {
|
||||
BOOST_REQUIRE_EQUAL(result.row_count(), 2);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_result_row_count) {
|
||||
return seastar::async([] {
|
||||
auto s = make_schema();
|
||||
auto now = gc_clock::now();
|
||||
auto slice = partition_slice_builder(*s).build();
|
||||
|
||||
mutation m1(partition_key::from_single_value(*s, "key1"), s);
|
||||
|
||||
auto src = make_source({m1});
|
||||
|
||||
|
||||
auto r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 0);
|
||||
|
||||
m1.set_static_cell("s1", data_value(bytes("S_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
|
||||
|
||||
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("A")), "v1", data_value(bytes("A_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
|
||||
|
||||
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("B")), "v1", data_value(bytes("B_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 2);
|
||||
|
||||
mutation m2(partition_key::from_single_value(*s, "key2"), s);
|
||||
m2.set_static_cell("s1", data_value(bytes("S_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1, m2}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 3);
|
||||
});
|
||||
}
|
||||
|
||||
130
tests/schema_registry_test.cc
Normal file
130
tests/schema_registry_test.cc
Normal file
@@ -0,0 +1,130 @@
|
||||
/*
|
||||
* Copyright (C) 2016 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define BOOST_TEST_DYN_LINK
|
||||
|
||||
#include <seastar/core/thread.hh>
|
||||
|
||||
#include "tests/test-utils.hh"
|
||||
#include "schema_registry.hh"
|
||||
#include "schema_builder.hh"
|
||||
#include "mutation_source_test.hh"
|
||||
|
||||
#include "disk-error-handler.hh"
|
||||
|
||||
thread_local disk_error_signal_type commit_error;
|
||||
thread_local disk_error_signal_type general_disk_error;
|
||||
|
||||
static bytes random_column_name() {
|
||||
return to_bytes(to_hex(make_blob(32)));
|
||||
}
|
||||
|
||||
static schema_ptr random_schema() {
|
||||
return schema_builder("ks", "cf")
|
||||
.with_column("pk", bytes_type, column_kind::partition_key)
|
||||
.with_column(random_column_name(), bytes_type)
|
||||
.build();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_async_loading) {
|
||||
return seastar::async([] {
|
||||
auto s1 = random_schema();
|
||||
auto s2 = random_schema();
|
||||
|
||||
auto s1_loaded = local_schema_registry().get_or_load(s1->version(), [s1] (table_schema_version) {
|
||||
return make_ready_future<frozen_schema>(frozen_schema(s1));
|
||||
}).get0();
|
||||
|
||||
BOOST_REQUIRE(s1_loaded);
|
||||
BOOST_REQUIRE(s1_loaded->version() == s1->version());
|
||||
auto s1_later = local_schema_registry().get_or_null(s1->version());
|
||||
BOOST_REQUIRE(s1_later);
|
||||
|
||||
auto s2_loaded = local_schema_registry().get_or_load(s2->version(), [s2] (table_schema_version) {
|
||||
return later().then([s2] { return frozen_schema(s2); });
|
||||
}).get0();
|
||||
|
||||
BOOST_REQUIRE(s2_loaded);
|
||||
BOOST_REQUIRE(s2_loaded->version() == s2->version());
|
||||
auto s2_later = local_schema_registry().get_or_null(s2_loaded->version());
|
||||
BOOST_REQUIRE(s2_later);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_schema_is_synced_when_syncer_doesnt_defer) {
|
||||
return seastar::async([] {
|
||||
auto s = random_schema();
|
||||
s = local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) { return frozen_schema(s); });
|
||||
BOOST_REQUIRE(!s->is_synced());
|
||||
s->registry_entry()->maybe_sync([] { return make_ready_future<>(); }).get();
|
||||
BOOST_REQUIRE(s->is_synced());
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_schema_is_synced_when_syncer_defers) {
|
||||
return seastar::async([] {
|
||||
auto s = random_schema();
|
||||
s = local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) { return frozen_schema(s); });
|
||||
BOOST_REQUIRE(!s->is_synced());
|
||||
s->registry_entry()->maybe_sync([] { return later(); }).get();
|
||||
BOOST_REQUIRE(s->is_synced());
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_failed_sync_can_be_retried) {
|
||||
return seastar::async([] {
|
||||
auto s = random_schema();
|
||||
s = local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) { return frozen_schema(s); });
|
||||
BOOST_REQUIRE(!s->is_synced());
|
||||
|
||||
promise<> fail_sync;
|
||||
|
||||
auto f1 = s->registry_entry()->maybe_sync([&fail_sync] () mutable {
|
||||
return fail_sync.get_future().then([] {
|
||||
throw std::runtime_error("sync failed");
|
||||
});
|
||||
});
|
||||
|
||||
// concurrent maybe_sync should attach the the current one
|
||||
auto f2 = s->registry_entry()->maybe_sync([] { return make_ready_future<>(); });
|
||||
|
||||
fail_sync.set_value();
|
||||
|
||||
try {
|
||||
f1.get();
|
||||
BOOST_FAIL("Should have failed");
|
||||
} catch (...) {
|
||||
// expected
|
||||
}
|
||||
|
||||
try {
|
||||
f2.get();
|
||||
BOOST_FAIL("Should have failed");
|
||||
} catch (...) {
|
||||
// expected
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(!s->is_synced());
|
||||
|
||||
s->registry_entry()->maybe_sync([] { return make_ready_future<>(); }).get();
|
||||
BOOST_REQUIRE(s->is_synced());
|
||||
});
|
||||
}
|
||||
@@ -275,7 +275,7 @@ future<> cql_server::stop() {
|
||||
}
|
||||
|
||||
future<>
|
||||
cql_server::listen(ipv4_addr addr, ::shared_ptr<seastar::tls::server_credentials> creds, bool keepalive) {
|
||||
cql_server::listen(ipv4_addr addr, std::shared_ptr<seastar::tls::credentials_builder> creds, bool keepalive) {
|
||||
_notifier = std::make_unique<event_notifier>(addr.port);
|
||||
service::get_local_migration_manager().register_listener(_notifier.get());
|
||||
service::get_local_storage_service().register_subscriber(_notifier.get());
|
||||
@@ -285,7 +285,7 @@ cql_server::listen(ipv4_addr addr, ::shared_ptr<seastar::tls::server_credentials
|
||||
server_socket ss;
|
||||
try {
|
||||
ss = creds
|
||||
? seastar::tls::listen(creds, make_ipv4_address(addr), lo)
|
||||
? seastar::tls::listen(creds->build_server_credentials(), make_ipv4_address(addr), lo)
|
||||
: engine().listen(make_ipv4_address(addr), lo);
|
||||
} catch (...) {
|
||||
throw std::runtime_error(sprint("CQLServer error while listening on %s -> %s", make_ipv4_address(addr), std::current_exception()));
|
||||
@@ -325,11 +325,9 @@ cql_server::do_accepts(int which, bool keepalive) {
|
||||
}).then_wrapped([this, which, keepalive] (future<> f) {
|
||||
try {
|
||||
f.get();
|
||||
} catch (const std::bad_alloc&) {
|
||||
logger.debug("accept failed: {}, retrying", std::current_exception());
|
||||
do_accepts(which, keepalive);
|
||||
} catch (...) {
|
||||
logger.debug("accept failed: {}", std::current_exception());
|
||||
logger.warn("acccept failed: {}", std::current_exception());
|
||||
do_accepts(which, keepalive);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -107,7 +107,7 @@ private:
|
||||
cql_load_balance _lb;
|
||||
public:
|
||||
cql_server(distributed<service::storage_proxy>& proxy, distributed<cql3::query_processor>& qp, cql_load_balance lb);
|
||||
future<> listen(ipv4_addr addr, ::shared_ptr<seastar::tls::server_credentials> = {}, bool keepalive = false);
|
||||
future<> listen(ipv4_addr addr, std::shared_ptr<seastar::tls::credentials_builder> = {}, bool keepalive = false);
|
||||
future<> do_accepts(int which, bool keepalive);
|
||||
future<> stop();
|
||||
public:
|
||||
|
||||
Reference in New Issue
Block a user