Compare commits
37 Commits
next
...
scylla-1.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
426316a4b7 | ||
|
|
3289010910 | ||
|
|
52c9723e04 | ||
|
|
3cc91eeb84 | ||
|
|
d67ee37bbc | ||
|
|
fcbe43cc87 | ||
|
|
ef79310b3c | ||
|
|
f7e81c7b7d | ||
|
|
54224dfaa0 | ||
|
|
e30199119c | ||
|
|
07ce4ec032 | ||
|
|
c7c18d9c0c | ||
|
|
2a4582ab9f | ||
|
|
ab5e23f6e7 | ||
|
|
fecea15a25 | ||
|
|
dce549f44f | ||
|
|
26a3302957 | ||
|
|
f796d8081b | ||
|
|
b850cb991c | ||
|
|
734cfa949a | ||
|
|
3606e3ab29 | ||
|
|
014284de00 | ||
|
|
f17764e74a | ||
|
|
8643028d0c | ||
|
|
a35f1d765a | ||
|
|
3116a92b0e | ||
|
|
dad312ce0a | ||
|
|
3cae56f3e3 | ||
|
|
656a10c4b8 | ||
|
|
c04b3de564 | ||
|
|
4964fe4cf0 | ||
|
|
e3ad3cf7d9 | ||
|
|
cef40627a7 | ||
|
|
995820c08a | ||
|
|
b78abd7649 | ||
|
|
d47c62b51c | ||
|
|
bef19e7f9e |
2
.gitmodules
vendored
2
.gitmodules
vendored
@@ -1,6 +1,6 @@
|
||||
[submodule "seastar"]
|
||||
path = seastar
|
||||
url = ../seastar
|
||||
url = ../scylla-seastar
|
||||
ignore = dirty
|
||||
[submodule "swagger-ui"]
|
||||
path = swagger-ui
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#!/bin/sh
|
||||
|
||||
VERSION=666.development
|
||||
VERSION=1.1.2
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -354,9 +354,12 @@ future<> auth::auth::setup_table(const sstring& name, const sstring& cql) {
|
||||
::shared_ptr<cql3::statements::create_table_statement> statement =
|
||||
static_pointer_cast<cql3::statements::create_table_statement>(
|
||||
parsed->prepare(db)->statement);
|
||||
// Origin sets "Legacy Cf Id" for the new table. We have no need to be
|
||||
// pre-2.1 compatible (afaik), so lets skip a whole lotta hoolaballo
|
||||
return statement->announce_migration(qp.proxy(), false).then([statement](bool) {});
|
||||
auto schema = statement->get_cf_meta_data();
|
||||
auto uuid = generate_legacy_id(schema->ks_name(), schema->cf_name());
|
||||
|
||||
schema_builder b(schema);
|
||||
b.set_uuid(uuid);
|
||||
return service::get_local_migration_manager().announce_new_column_family(b.build(), false);
|
||||
}
|
||||
|
||||
future<bool> auth::auth::has_existing_users(const sstring& cfname, const sstring& def_user_name, const sstring& name_column) {
|
||||
|
||||
@@ -162,6 +162,7 @@ modes = {
|
||||
|
||||
scylla_tests = [
|
||||
'tests/mutation_test',
|
||||
'tests/schema_registry_test',
|
||||
'tests/canonical_mutation_test',
|
||||
'tests/range_test',
|
||||
'tests/types_test',
|
||||
|
||||
@@ -432,10 +432,9 @@ void query_processor::migration_subscriber::on_update_keyspace(const sstring& ks
|
||||
|
||||
void query_processor::migration_subscriber::on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool columns_changed)
|
||||
{
|
||||
if (columns_changed) {
|
||||
log.info("Column definitions for {}.{} changed, invalidating related prepared statements", ks_name, cf_name);
|
||||
remove_invalid_prepared_statements(ks_name, cf_name);
|
||||
}
|
||||
// #1255: Ignoring columns_changed deliberately.
|
||||
log.info("Column definitions for {}.{} changed, invalidating related prepared statements", ks_name, cf_name);
|
||||
remove_invalid_prepared_statements(ks_name, cf_name);
|
||||
}
|
||||
|
||||
void query_processor::migration_subscriber::on_update_user_type(const sstring& ks_name, const sstring& type_name)
|
||||
|
||||
@@ -162,7 +162,7 @@ void cf_prop_defs::apply_to_builder(schema_builder& builder) {
|
||||
}
|
||||
|
||||
std::experimental::optional<sstring> tmp_value = {};
|
||||
if (has_property(KW_MINCOMPACTIONTHRESHOLD)) {
|
||||
if (has_property(KW_COMPACTION)) {
|
||||
if (get_compaction_options().count(KW_MINCOMPACTIONTHRESHOLD)) {
|
||||
tmp_value = get_compaction_options().at(KW_MINCOMPACTIONTHRESHOLD);
|
||||
}
|
||||
@@ -170,7 +170,7 @@ void cf_prop_defs::apply_to_builder(schema_builder& builder) {
|
||||
int min_compaction_threshold = to_int(KW_MINCOMPACTIONTHRESHOLD, tmp_value, builder.get_min_compaction_threshold());
|
||||
|
||||
tmp_value = {};
|
||||
if (has_property(KW_MAXCOMPACTIONTHRESHOLD)) {
|
||||
if (has_property(KW_COMPACTION)) {
|
||||
if (get_compaction_options().count(KW_MAXCOMPACTIONTHRESHOLD)) {
|
||||
tmp_value = get_compaction_options().at(KW_MAXCOMPACTIONTHRESHOLD);
|
||||
}
|
||||
|
||||
@@ -973,7 +973,13 @@ column_family::load_new_sstables(std::vector<sstables::entry_descriptor> new_tab
|
||||
return parallel_for_each(new_tables, [this] (auto comps) {
|
||||
auto sst = make_lw_shared<sstables::sstable>(_schema->ks_name(), _schema->cf_name(), _config.datadir, comps.generation, comps.version, comps.format);
|
||||
return sst->load().then([this, sst] {
|
||||
return sst->mutate_sstable_level(0);
|
||||
// This sets in-memory level of sstable to 0.
|
||||
// When loading a migrated sstable, it's important to set it to level 0 because
|
||||
// leveled compaction relies on a level > 0 having no overlapping sstables.
|
||||
// If Scylla reboots before migrated sstable gets compacted, leveled strategy
|
||||
// is smart enough to detect a sstable that overlaps and set its in-memory
|
||||
// level to 0.
|
||||
return sst->set_sstable_level(0);
|
||||
}).then([this, sst] {
|
||||
auto first = sst->get_first_partition_key(*_schema);
|
||||
auto last = sst->get_last_partition_key(*_schema);
|
||||
|
||||
@@ -87,6 +87,10 @@ public:
|
||||
// [0x00, 0x80] == 1/512
|
||||
// [0xff, 0x80] == 1 - 1/512
|
||||
managed_bytes _data;
|
||||
|
||||
token() : _kind(kind::before_all_keys) {
|
||||
}
|
||||
|
||||
token(kind k, managed_bytes d) : _kind(std::move(k)), _data(std::move(d)) {
|
||||
}
|
||||
|
||||
|
||||
2
dist/common/scripts/scylla_sysconfig_setup
vendored
2
dist/common/scripts/scylla_sysconfig_setup
vendored
@@ -58,7 +58,7 @@ while [ $# -gt 0 ]; do
|
||||
shift 2
|
||||
;;
|
||||
"--setup-nic")
|
||||
SET_NIC=yes
|
||||
SETUP_NIC=1
|
||||
shift 1
|
||||
;;
|
||||
"--ami")
|
||||
|
||||
2
dist/docker/redhat/Dockerfile
vendored
2
dist/docker/redhat/Dockerfile
vendored
@@ -2,8 +2,8 @@ FROM centos:7
|
||||
|
||||
MAINTAINER Avi Kivity <avi@cloudius-systems.com>
|
||||
|
||||
RUN curl http://downloads.scylladb.com/rpm/centos/scylla-1.1.repo -o /etc/yum.repos.d/scylla.repo
|
||||
RUN yum -y install epel-release
|
||||
ADD scylla.repo /etc/yum.repos.d/
|
||||
RUN yum -y clean expire-cache
|
||||
RUN yum -y update
|
||||
RUN yum -y remove boost-thread boost-system
|
||||
|
||||
23
dist/docker/redhat/scylla.repo
vendored
23
dist/docker/redhat/scylla.repo
vendored
@@ -1,23 +0,0 @@
|
||||
[scylla]
|
||||
name=Scylla for Centos $releasever - $basearch
|
||||
baseurl=https://s3.amazonaws.com/downloads.scylladb.com/rpm/centos/$releasever/$basearch/
|
||||
enabled=1
|
||||
gpgcheck=0
|
||||
|
||||
[scylla-generic]
|
||||
name=Scylla for centos $releasever
|
||||
baseurl=https://s3.amazonaws.com/downloads.scylladb.com/rpm/centos/$releasever/noarch/
|
||||
enabled=1
|
||||
gpgcheck=0
|
||||
|
||||
[scylla-3rdparty]
|
||||
name=Scylla 3rdParty for Centos $releasever - $basearch
|
||||
baseurl=https://s3.amazonaws.com/downloads.scylladb.com/rpm/3rdparty/centos/$releasever/$basearch/
|
||||
enabled=1
|
||||
gpgcheck=0
|
||||
|
||||
[scylla-3rdparty-generic]
|
||||
name=Scylla 3rdParty for Centos $releasever
|
||||
baseurl=https://s3.amazonaws.com/downloads.scylladb.com/rpm/3rdparty/centos/$releasever/noarch/
|
||||
enabled=1
|
||||
gpgcheck=0
|
||||
@@ -1590,12 +1590,12 @@ bool gossiper::is_alive(inet_address ep) {
|
||||
if (ep == get_broadcast_address()) {
|
||||
return true;
|
||||
}
|
||||
auto eps = get_endpoint_state_for_endpoint(ep);
|
||||
auto it = endpoint_state_map.find(ep);
|
||||
// we could assert not-null, but having isAlive fail screws a node over so badly that
|
||||
// it's worth being defensive here so minor bugs don't cause disproportionate
|
||||
// badness. (See CASSANDRA-1463 for an example).
|
||||
if (eps) {
|
||||
return eps->is_alive();
|
||||
if (it != endpoint_state_map.end()) {
|
||||
return it->second.is_alive();
|
||||
} else {
|
||||
logger.warn("unknown endpoint {}", ep);
|
||||
return false;
|
||||
|
||||
15
init.cc
15
init.cc
@@ -64,18 +64,23 @@ void init_ms_fd_gossiper(sstring listen_address
|
||||
}
|
||||
|
||||
future<> f = make_ready_future<>();
|
||||
::shared_ptr<server_credentials> creds;
|
||||
std::shared_ptr<credentials_builder> creds;
|
||||
|
||||
if (ew != encrypt_what::none) {
|
||||
// note: credentials are immutable after this, and ok to share across shards
|
||||
creds = ::make_shared<server_credentials>(::make_shared<dh_params>(dh_params::level::MEDIUM));
|
||||
creds = std::make_shared<credentials_builder>();
|
||||
creds->set_dh_level(dh_params::level::MEDIUM);
|
||||
|
||||
creds->set_x509_key_file(ms_cert, ms_key, x509_crt_format::PEM).get();
|
||||
ms_trust_store.empty() ? creds->set_system_trust().get() :
|
||||
creds->set_x509_trust_file(ms_trust_store, x509_crt_format::PEM).get();
|
||||
if (ms_trust_store.empty()) {
|
||||
creds->set_system_trust().get();
|
||||
} else {
|
||||
creds->set_x509_trust_file(ms_trust_store, x509_crt_format::PEM).get();
|
||||
}
|
||||
}
|
||||
|
||||
// Init messaging_service
|
||||
net::get_messaging_service().start(listen, storage_port, ew, ssl_storage_port, creds).get();
|
||||
|
||||
// #293 - do not stop anything
|
||||
//engine().at_exit([] { return net::get_messaging_service().stop(); });
|
||||
// Init failure_detector
|
||||
|
||||
@@ -27,6 +27,8 @@
|
||||
#include "log.hh"
|
||||
#include <unordered_map>
|
||||
#include <algorithm>
|
||||
#include <boost/icl/interval.hpp>
|
||||
#include <boost/icl/interval_map.hpp>
|
||||
|
||||
namespace locator {
|
||||
|
||||
@@ -339,25 +341,65 @@ range<token> token_metadata::get_primary_range_for(token right) {
|
||||
return get_primary_ranges_for({right}).front();
|
||||
}
|
||||
|
||||
boost::icl::interval<token>::interval_type
|
||||
token_metadata::range_to_interval(range<dht::token> r) {
|
||||
bool start_inclusive = false;
|
||||
bool end_inclusive = false;
|
||||
token start = dht::minimum_token();
|
||||
token end = dht::maximum_token();
|
||||
|
||||
if (r.start()) {
|
||||
start = r.start()->value();
|
||||
start_inclusive = r.start()->is_inclusive();
|
||||
}
|
||||
|
||||
if (r.end()) {
|
||||
end = r.end()->value();
|
||||
end_inclusive = r.end()->is_inclusive();
|
||||
}
|
||||
|
||||
if (start_inclusive == false && end_inclusive == false) {
|
||||
return boost::icl::interval<token>::open(std::move(start), std::move(end));
|
||||
} else if (start_inclusive == false && end_inclusive == true) {
|
||||
return boost::icl::interval<token>::left_open(std::move(start), std::move(end));
|
||||
} else if (start_inclusive == true && end_inclusive == false) {
|
||||
return boost::icl::interval<token>::right_open(std::move(start), std::move(end));
|
||||
} else {
|
||||
return boost::icl::interval<token>::closed(std::move(start), std::move(end));
|
||||
}
|
||||
}
|
||||
|
||||
void token_metadata::set_pending_ranges(const sstring& keyspace_name,
|
||||
std::unordered_multimap<range<token>, inet_address> new_pending_ranges) {
|
||||
if (new_pending_ranges.empty()) {
|
||||
_pending_ranges.erase(keyspace_name);
|
||||
_pending_ranges_map.erase(keyspace_name);
|
||||
_pending_ranges_interval_map.erase(keyspace_name);
|
||||
return;
|
||||
}
|
||||
std::unordered_map<range<token>, std::unordered_set<inet_address>> map;
|
||||
for (const auto& x : new_pending_ranges) {
|
||||
map[x.first].emplace(x.second);
|
||||
}
|
||||
|
||||
// construct a interval map to speed up the search
|
||||
_pending_ranges_interval_map[keyspace_name] = {};
|
||||
for (const auto& m : map) {
|
||||
_pending_ranges_interval_map[keyspace_name] +=
|
||||
std::make_pair(range_to_interval(m.first), m.second);
|
||||
}
|
||||
_pending_ranges[keyspace_name] = std::move(new_pending_ranges);
|
||||
_pending_ranges_map[keyspace_name] = std::move(map);
|
||||
}
|
||||
|
||||
std::unordered_multimap<range<token>, inet_address>&
|
||||
token_metadata::get_pending_ranges_mm(sstring keyspace_name) {
|
||||
return _pending_ranges[keyspace_name];
|
||||
}
|
||||
|
||||
std::unordered_map<range<token>, std::unordered_set<inet_address>>
|
||||
const std::unordered_map<range<token>, std::unordered_set<inet_address>>&
|
||||
token_metadata::get_pending_ranges(sstring keyspace_name) {
|
||||
std::unordered_map<range<token>, std::unordered_set<inet_address>> ret;
|
||||
for (auto x : get_pending_ranges_mm(keyspace_name)) {
|
||||
auto& range_token = x.first;
|
||||
auto& ep = x.second;
|
||||
auto it = ret.find(range_token);
|
||||
if (it != ret.end()) {
|
||||
it->second.emplace(ep);
|
||||
} else {
|
||||
ret.emplace(range_token, std::unordered_set<inet_address>{ep});
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
return _pending_ranges_map[keyspace_name];
|
||||
}
|
||||
|
||||
std::vector<range<token>>
|
||||
@@ -378,7 +420,7 @@ void token_metadata::calculate_pending_ranges(abstract_replication_strategy& str
|
||||
|
||||
if (_bootstrap_tokens.empty() && _leaving_endpoints.empty() && _moving_endpoints.empty()) {
|
||||
logger.debug("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspace_name);
|
||||
_pending_ranges[keyspace_name] = std::move(new_pending_ranges);
|
||||
set_pending_ranges(keyspace_name, std::move(new_pending_ranges));
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -463,7 +505,7 @@ void token_metadata::calculate_pending_ranges(abstract_replication_strategy& str
|
||||
all_left_metadata.remove_endpoint(endpoint);
|
||||
}
|
||||
|
||||
_pending_ranges[keyspace_name] = std::move(new_pending_ranges);
|
||||
set_pending_ranges(keyspace_name, std::move(new_pending_ranges));
|
||||
|
||||
if (logger.is_enabled(logging::log_level::debug)) {
|
||||
logger.debug("Pending ranges: {}", (_pending_ranges.empty() ? "<empty>" : print_pending_ranges()));
|
||||
@@ -508,14 +550,23 @@ void token_metadata::add_moving_endpoint(token t, inet_address endpoint) {
|
||||
}
|
||||
|
||||
std::vector<gms::inet_address> token_metadata::pending_endpoints_for(const token& token, const sstring& keyspace_name) {
|
||||
// Fast path 0: no pending ranges at all
|
||||
if (_pending_ranges_interval_map.empty()) {
|
||||
return {};
|
||||
}
|
||||
|
||||
// Fast path 1: no pending ranges for this keyspace_name
|
||||
if (_pending_ranges_interval_map[keyspace_name].empty()) {
|
||||
return {};
|
||||
}
|
||||
|
||||
// Slow path: lookup pending ranges
|
||||
std::vector<gms::inet_address> endpoints;
|
||||
auto ranges = get_pending_ranges(keyspace_name);
|
||||
for (auto& x : ranges) {
|
||||
if (x.first.contains(token, dht::token_comparator())) {
|
||||
for (auto& addr : x.second) {
|
||||
endpoints.push_back(addr);
|
||||
}
|
||||
}
|
||||
auto interval = range_to_interval(range<dht::token>(token));
|
||||
auto it = _pending_ranges_interval_map[keyspace_name].find(interval);
|
||||
if (it != _pending_ranges_interval_map[keyspace_name].end()) {
|
||||
// interval_map does not work with std::vector, convert to std::vector of ips
|
||||
endpoints = std::vector<gms::inet_address>(it->second.begin(), it->second.end());
|
||||
}
|
||||
return endpoints;
|
||||
}
|
||||
|
||||
@@ -46,6 +46,8 @@
|
||||
#include "utils/UUID.hh"
|
||||
#include <experimental/optional>
|
||||
#include <boost/range/iterator_range.hpp>
|
||||
#include <boost/icl/interval.hpp>
|
||||
#include <boost/icl/interval_map.hpp>
|
||||
#include "query-request.hh"
|
||||
#include "range.hh"
|
||||
|
||||
@@ -144,6 +146,8 @@ private:
|
||||
std::unordered_map<token, inet_address> _moving_endpoints;
|
||||
|
||||
std::unordered_map<sstring, std::unordered_multimap<range<token>, inet_address>> _pending_ranges;
|
||||
std::unordered_map<sstring, std::unordered_map<range<token>, std::unordered_set<inet_address>>> _pending_ranges_map;
|
||||
std::unordered_map<sstring, boost::icl::interval_map<token, std::unordered_set<inet_address>>> _pending_ranges_interval_map;
|
||||
|
||||
std::vector<token> _sorted_tokens;
|
||||
|
||||
@@ -608,13 +612,15 @@ public:
|
||||
std::vector<range<token>> get_primary_ranges_for(std::unordered_set<token> tokens);
|
||||
|
||||
range<token> get_primary_range_for(token right);
|
||||
static boost::icl::interval<token>::interval_type range_to_interval(range<dht::token> r);
|
||||
|
||||
private:
|
||||
std::unordered_multimap<range<token>, inet_address>& get_pending_ranges_mm(sstring keyspace_name);
|
||||
void set_pending_ranges(const sstring& keyspace_name, std::unordered_multimap<range<token>, inet_address> new_pending_ranges);
|
||||
|
||||
public:
|
||||
/** a mutable map may be returned but caller should not modify it */
|
||||
std::unordered_map<range<token>, std::unordered_set<inet_address>> get_pending_ranges(sstring keyspace_name);
|
||||
const std::unordered_map<range<token>, std::unordered_set<inet_address>>& get_pending_ranges(sstring keyspace_name);
|
||||
|
||||
std::vector<range<token>> get_pending_ranges(sstring keyspace_name, inet_address endpoint);
|
||||
/**
|
||||
|
||||
5
main.cc
5
main.cc
@@ -592,6 +592,11 @@ int main(int ac, char** av) {
|
||||
engine().at_exit([] {
|
||||
return repair_shutdown(service::get_local_storage_service().db());
|
||||
});
|
||||
engine().at_exit([&db] {
|
||||
return db.invoke_on_all([](auto& db) {
|
||||
return db.get_compaction_manager().stop();
|
||||
});
|
||||
});
|
||||
}).or_terminate();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -225,7 +225,7 @@ messaging_service::messaging_service(gms::inet_address ip
|
||||
, uint16_t port
|
||||
, encrypt_what ew
|
||||
, uint16_t ssl_port
|
||||
, ::shared_ptr<seastar::tls::server_credentials> credentials
|
||||
, std::shared_ptr<seastar::tls::credentials_builder> credentials
|
||||
)
|
||||
: _listen_address(ip)
|
||||
, _port(port)
|
||||
@@ -233,7 +233,7 @@ messaging_service::messaging_service(gms::inet_address ip
|
||||
, _encrypt_what(ew)
|
||||
, _rpc(new rpc_protocol_wrapper(serializer { }))
|
||||
, _server(new rpc_protocol_server_wrapper(*_rpc, ipv4_addr { _listen_address.raw_addr(), _port }, rpc_resource_limits()))
|
||||
, _credentials(std::move(credentials))
|
||||
, _credentials(credentials ? credentials->build_server_credentials() : nullptr)
|
||||
, _server_tls([this]() -> std::unique_ptr<rpc_protocol_server_wrapper>{
|
||||
if (_encrypt_what == encrypt_what::none) {
|
||||
return nullptr;
|
||||
@@ -255,6 +255,13 @@ messaging_service::messaging_service(gms::inet_address ip
|
||||
ci.attach_auxiliary("src_cpu_id", src_cpu_id);
|
||||
return rpc::no_wait;
|
||||
});
|
||||
// Do this on just cpu 0, to avoid duplicate logs.
|
||||
if (engine().cpu_id() == 0) {
|
||||
if (_server_tls) {
|
||||
logger.info("Starting Encrypted Messaging Service on SSL port {}", _ssl_port);
|
||||
}
|
||||
logger.info("Starting Messaging Service on port {}", _port);
|
||||
}
|
||||
}
|
||||
|
||||
msg_addr messaging_service::get_source(const rpc::client_info& cinfo) {
|
||||
|
||||
@@ -186,7 +186,7 @@ public:
|
||||
public:
|
||||
messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0"), uint16_t port = 7000);
|
||||
messaging_service(gms::inet_address ip, uint16_t port, encrypt_what,
|
||||
uint16_t ssl_port, ::shared_ptr<seastar::tls::server_credentials>);
|
||||
uint16_t ssl_port, std::shared_ptr<seastar::tls::credentials_builder>);
|
||||
~messaging_service();
|
||||
public:
|
||||
uint16_t port();
|
||||
|
||||
@@ -706,6 +706,7 @@ mutation_partition::query_compacted(query::result::partition_writer& pw, const s
|
||||
|| !has_any_live_data(s, column_kind::static_column, static_row()))) {
|
||||
pw.retract();
|
||||
} else {
|
||||
pw.row_count() += row_count ? : 1;
|
||||
std::move(rows_wr).end_rows().end_qr_partition();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,6 +50,7 @@ class result::partition_writer {
|
||||
bool _static_row_added = false;
|
||||
md5_hasher& _digest;
|
||||
md5_hasher _digest_pos;
|
||||
uint32_t& _row_count;
|
||||
public:
|
||||
partition_writer(
|
||||
result_request request,
|
||||
@@ -58,7 +59,8 @@ public:
|
||||
ser::query_result__partitions& pw,
|
||||
ser::vector_position pos,
|
||||
ser::after_qr_partition__key w,
|
||||
md5_hasher& digest)
|
||||
md5_hasher& digest,
|
||||
uint32_t& row_count)
|
||||
: _request(request)
|
||||
, _w(std::move(w))
|
||||
, _slice(slice)
|
||||
@@ -67,6 +69,7 @@ public:
|
||||
, _pos(std::move(pos))
|
||||
, _digest(digest)
|
||||
, _digest_pos(digest)
|
||||
, _row_count(row_count)
|
||||
{ }
|
||||
|
||||
bool requested_digest() const {
|
||||
@@ -98,6 +101,9 @@ public:
|
||||
md5_hasher& digest() {
|
||||
return _digest;
|
||||
}
|
||||
uint32_t& row_count() {
|
||||
return _row_count;
|
||||
}
|
||||
};
|
||||
|
||||
class result::builder {
|
||||
@@ -106,6 +112,7 @@ class result::builder {
|
||||
const partition_slice& _slice;
|
||||
ser::query_result__partitions _w;
|
||||
result_request _request;
|
||||
uint32_t _row_count = 0;
|
||||
public:
|
||||
builder(const partition_slice& slice, result_request request)
|
||||
: _slice(slice)
|
||||
@@ -130,21 +137,21 @@ public:
|
||||
if (_request != result_request::only_result) {
|
||||
key.feed_hash(_digest, s);
|
||||
}
|
||||
return partition_writer(_request, _slice, ranges, _w, std::move(pos), std::move(after_key), _digest);
|
||||
return partition_writer(_request, _slice, ranges, _w, std::move(pos), std::move(after_key), _digest, _row_count);
|
||||
}
|
||||
|
||||
result build() {
|
||||
std::move(_w).end_partitions().end_query_result();
|
||||
switch (_request) {
|
||||
case result_request::only_result:
|
||||
return result(std::move(_out));
|
||||
return result(std::move(_out), _row_count);
|
||||
case result_request::only_digest: {
|
||||
bytes_ostream buf;
|
||||
ser::writer_of_query_result(buf).start_partitions().end_partitions().end_query_result();
|
||||
return result(std::move(buf), result_digest(_digest.finalize_array()));
|
||||
}
|
||||
case result_request::result_and_digest:
|
||||
return result(std::move(_out), result_digest(_digest.finalize_array()));
|
||||
return result(std::move(_out), result_digest(_digest.finalize_array()), _row_count);
|
||||
}
|
||||
abort();
|
||||
}
|
||||
|
||||
@@ -96,14 +96,16 @@ public:
|
||||
class result {
|
||||
bytes_ostream _w;
|
||||
stdx::optional<result_digest> _digest;
|
||||
stdx::optional<uint32_t> _row_count;
|
||||
|
||||
public:
|
||||
class builder;
|
||||
class partition_writer;
|
||||
friend class result_merger;
|
||||
|
||||
result();
|
||||
result(bytes_ostream&& w) : _w(std::move(w)) {}
|
||||
result(bytes_ostream&& w, stdx::optional<result_digest> d) : _w(std::move(w)), _digest(d) {}
|
||||
result(bytes_ostream&& w, stdx::optional<uint32_t> c = {}) : _w(std::move(w)), _row_count(c) {}
|
||||
result(bytes_ostream&& w, stdx::optional<result_digest> d, stdx::optional<uint32_t> c = {}) : _w(std::move(w)), _digest(d), _row_count(c) {}
|
||||
result(result&&) = default;
|
||||
result(const result&) = default;
|
||||
result& operator=(result&&) = default;
|
||||
@@ -117,6 +119,10 @@ public:
|
||||
return _digest;
|
||||
}
|
||||
|
||||
const stdx::optional<uint32_t>& row_count() const {
|
||||
return _row_count;
|
||||
}
|
||||
|
||||
uint32_t calculate_row_count(const query::partition_slice&);
|
||||
|
||||
struct printer {
|
||||
|
||||
10
query.cc
10
query.cc
@@ -213,8 +213,16 @@ foreign_ptr<lw_shared_ptr<query::result>> result_merger::get() {
|
||||
|
||||
bytes_ostream w;
|
||||
auto partitions = ser::writer_of_query_result(w).start_partitions();
|
||||
std::experimental::optional<uint32_t> row_count = 0;
|
||||
|
||||
for (auto&& r : _partial) {
|
||||
if (row_count) {
|
||||
if (r->row_count()) {
|
||||
row_count = row_count.value() + r->row_count().value();
|
||||
} else {
|
||||
row_count = std::experimental::nullopt;
|
||||
}
|
||||
}
|
||||
result_view::do_with(*r, [&] (result_view rv) {
|
||||
for (auto&& pv : rv._v.partitions()) {
|
||||
partitions.add(pv);
|
||||
@@ -224,7 +232,7 @@ foreign_ptr<lw_shared_ptr<query::result>> result_merger::get() {
|
||||
|
||||
std::move(partitions).end_partitions().end_query_result();
|
||||
|
||||
return make_foreign(make_lw_shared<query::result>(std::move(w)));
|
||||
return make_foreign(make_lw_shared<query::result>(std::move(w), row_count));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -203,12 +203,15 @@ future<> schema_registry_entry::maybe_sync(std::function<future<>()> syncer) {
|
||||
return make_ready_future<>();
|
||||
case schema_registry_entry::sync_state::SYNCING:
|
||||
return _synced_future;
|
||||
case schema_registry_entry::sync_state::NOT_SYNCED:
|
||||
case schema_registry_entry::sync_state::NOT_SYNCED: {
|
||||
logger.debug("Syncing {}", _version);
|
||||
_synced_promise = {};
|
||||
do_with(std::move(syncer), [] (auto& syncer) {
|
||||
auto f = do_with(std::move(syncer), [] (auto& syncer) {
|
||||
return syncer();
|
||||
}).then_wrapped([this, self = shared_from_this()] (auto&& f) {
|
||||
});
|
||||
_synced_future = _synced_promise.get_future();
|
||||
_sync_state = schema_registry_entry::sync_state::SYNCING;
|
||||
f.then_wrapped([this, self = shared_from_this()] (auto&& f) {
|
||||
if (_sync_state != sync_state::SYNCING) {
|
||||
return;
|
||||
}
|
||||
@@ -222,9 +225,8 @@ future<> schema_registry_entry::maybe_sync(std::function<future<>()> syncer) {
|
||||
_synced_promise.set_value();
|
||||
}
|
||||
});
|
||||
_synced_future = _synced_promise.get_future();
|
||||
_sync_state = schema_registry_entry::sync_state::SYNCING;
|
||||
return _synced_future;
|
||||
}
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: dab58e4562...b80564dff1
@@ -710,20 +710,28 @@ public static class MigrationsSerializer implements IVersionedSerializer<Collect
|
||||
//
|
||||
// The endpoint is the node from which 's' originated.
|
||||
//
|
||||
// FIXME: Avoid the sync if the source was/is synced by schema_tables::merge_schema().
|
||||
static future<> maybe_sync(const schema_ptr& s, net::messaging_service::msg_addr endpoint) {
|
||||
if (s->is_synced()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
// Serialize schema sync by always doing it on shard 0.
|
||||
return smp::submit_to(0, [gs = global_schema_ptr(s), endpoint] {
|
||||
schema_ptr s = gs.get();
|
||||
schema_registry_entry& e = *s->registry_entry();
|
||||
return e.maybe_sync([endpoint, s] {
|
||||
return s->registry_entry()->maybe_sync([s, endpoint] {
|
||||
auto merge = [gs = global_schema_ptr(s), endpoint] {
|
||||
schema_ptr s = gs.get();
|
||||
logger.debug("Syncing schema of {}.{} (v={}) with {}", s->ks_name(), s->cf_name(), s->version(), endpoint);
|
||||
return get_local_migration_manager().merge_schema_from(endpoint);
|
||||
});
|
||||
};
|
||||
|
||||
// Serialize schema sync by always doing it on shard 0.
|
||||
if (engine().cpu_id() == 0) {
|
||||
return merge();
|
||||
} else {
|
||||
return smp::submit_to(0, [gs = global_schema_ptr(s), endpoint, merge] {
|
||||
schema_ptr s = gs.get();
|
||||
schema_registry_entry& e = *s->registry_entry();
|
||||
return e.maybe_sync(merge);
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -2260,14 +2260,14 @@ storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd, std::vecto
|
||||
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>>
|
||||
storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout, std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results,
|
||||
lw_shared_ptr<query::read_command> cmd, db::consistency_level cl, std::vector<query::partition_range>::iterator&& i,
|
||||
std::vector<query::partition_range>&& ranges, int concurrency_factor) {
|
||||
std::vector<query::partition_range>&& ranges, int concurrency_factor, uint32_t total_row_count) {
|
||||
schema_ptr schema = local_schema_registry().get(cmd->schema_version);
|
||||
keyspace& ks = _db.local().find_keyspace(schema->ks_name());
|
||||
std::vector<::shared_ptr<abstract_read_executor>> exec;
|
||||
auto concurrent_fetch_starting_index = i;
|
||||
auto p = shared_from_this();
|
||||
|
||||
while (i != ranges.end() && std::distance(i, concurrent_fetch_starting_index) < concurrency_factor) {
|
||||
while (i != ranges.end() && std::distance(concurrent_fetch_starting_index, i) < concurrency_factor) {
|
||||
query::partition_range& range = *i;
|
||||
std::vector<gms::inet_address> live_endpoints = get_live_sorted_endpoints(ks, end_token(range));
|
||||
std::vector<gms::inet_address> filtered_endpoints = filter_for_query(cl, ks, live_endpoints);
|
||||
@@ -2325,13 +2325,15 @@ storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::t
|
||||
return rex->execute(timeout);
|
||||
}, std::move(merger));
|
||||
|
||||
return f.then([p, exec = std::move(exec), results = std::move(results), i = std::move(i), ranges = std::move(ranges), cl, cmd, concurrency_factor, timeout]
|
||||
return f.then([p, exec = std::move(exec), results = std::move(results), i = std::move(i), ranges = std::move(ranges), cl, cmd, concurrency_factor, timeout, total_row_count]
|
||||
(foreign_ptr<lw_shared_ptr<query::result>>&& result) mutable {
|
||||
total_row_count += result->row_count() ? result->row_count().value() :
|
||||
(logger.error("no row count in query result, should not happen here"), result->calculate_row_count(cmd->slice));
|
||||
results.emplace_back(std::move(result));
|
||||
if (i == ranges.end()) {
|
||||
if (i == ranges.end() || total_row_count >= cmd->row_limit) {
|
||||
return make_ready_future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>>(std::move(results));
|
||||
} else {
|
||||
return p->query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, std::move(i), std::move(ranges), concurrency_factor);
|
||||
return p->query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, std::move(i), std::move(ranges), concurrency_factor, total_row_count);
|
||||
}
|
||||
}).handle_exception([p] (std::exception_ptr eptr) {
|
||||
p->handle_read_error(eptr);
|
||||
@@ -2363,6 +2365,8 @@ storage_proxy::query_partition_key_range(lw_shared_ptr<query::read_command> cmd,
|
||||
|
||||
std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results;
|
||||
results.reserve(ranges.size()/concurrency_factor + 1);
|
||||
logger.debug("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
|
||||
result_rows_per_range, cmd->row_limit, ranges.size(), concurrency_factor);
|
||||
|
||||
return query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, ranges.begin(), std::move(ranges), concurrency_factor)
|
||||
.then([](std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results) {
|
||||
|
||||
@@ -219,7 +219,7 @@ private:
|
||||
static std::vector<gms::inet_address> intersection(const std::vector<gms::inet_address>& l1, const std::vector<gms::inet_address>& l2);
|
||||
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>> query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout,
|
||||
std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results, lw_shared_ptr<query::read_command> cmd, db::consistency_level cl, std::vector<query::partition_range>::iterator&& i,
|
||||
std::vector<query::partition_range>&& ranges, int concurrency_factor);
|
||||
std::vector<query::partition_range>&& ranges, int concurrency_factor, uint32_t total_row_count = 0);
|
||||
|
||||
future<foreign_ptr<lw_shared_ptr<query::result>>> do_query(schema_ptr,
|
||||
lw_shared_ptr<query::read_command> cmd,
|
||||
|
||||
@@ -1797,16 +1797,18 @@ future<> storage_service::start_native_transport() {
|
||||
// return cserver->stop();
|
||||
//});
|
||||
|
||||
::shared_ptr<seastar::tls::server_credentials> cred;
|
||||
std::shared_ptr<seastar::tls::credentials_builder> cred;
|
||||
auto addr = ipv4_addr{ip, port};
|
||||
auto f = make_ready_future();
|
||||
|
||||
// main should have made sure values are clean and neatish
|
||||
if (ceo.at("enabled") == "true") {
|
||||
cred = ::make_shared<seastar::tls::server_credentials>(::make_shared<seastar::tls::dh_params>(seastar::tls::dh_params::level::MEDIUM));
|
||||
cred = std::make_shared<seastar::tls::credentials_builder>();
|
||||
cred->set_dh_level(seastar::tls::dh_params::level::MEDIUM);
|
||||
f = cred->set_x509_key_file(ceo.at("certificate"), ceo.at("keyfile"), seastar::tls::x509_crt_format::PEM);
|
||||
logger.info("Enabling encrypted CQL connections between client and server");
|
||||
}
|
||||
return f.then([cserver, addr, cred, keepalive] {
|
||||
return f.then([cserver, addr, cred = std::move(cred), keepalive] {
|
||||
return cserver->invoke_on_all(&transport::cql_server::listen, addr, cred, keepalive);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -107,10 +107,12 @@ public:
|
||||
|
||||
// ensure all SSTables are in the manifest
|
||||
for (auto& sstable : sstables) {
|
||||
// unconditionally add a sstable to a list of its level.
|
||||
manifest.add(sstable);
|
||||
}
|
||||
|
||||
for (auto i = 1U; i < manifest._generations.size(); i++) {
|
||||
// send overlapping sstables (with level > 0) to level 0, if any.
|
||||
manifest.repair_overlapping_sstables(i);
|
||||
}
|
||||
|
||||
@@ -123,36 +125,8 @@ public:
|
||||
if (level >= _generations.size()) {
|
||||
throw std::runtime_error(sprint("Invalid level %u out of %ld", level, (_generations.size() - 1)));
|
||||
}
|
||||
#if 0
|
||||
logDistribution();
|
||||
#endif
|
||||
if (can_add_sstable(sstable)) {
|
||||
// adding the sstable does not cause overlap in the level
|
||||
|
||||
logger.debug("Adding {} to L{}", sstable->get_filename(), level);
|
||||
|
||||
_generations[level].push_back(sstable);
|
||||
} else {
|
||||
// this can happen if:
|
||||
// * a compaction has promoted an overlapping sstable to the given level, or
|
||||
// was also supposed to add an sstable at the given level.
|
||||
// * we are moving sstables from unrepaired to repaired and the sstable
|
||||
// would cause overlap
|
||||
//
|
||||
// The add(..):ed sstable will be sent to level 0
|
||||
#if 0
|
||||
try
|
||||
{
|
||||
reader.descriptor.getMetadataSerializer().mutateLevel(reader.descriptor, 0);
|
||||
reader.reloadSSTableMetadata();
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
logger.error("Could not change sstable level - adding it at level 0 anyway, we will find it at restart.", e);
|
||||
}
|
||||
#endif
|
||||
_generations[0].push_back(sstable);
|
||||
}
|
||||
logger.debug("Adding {} to L{}", sstable->get_filename(), level);
|
||||
_generations[level].push_back(sstable);
|
||||
}
|
||||
|
||||
#if 0
|
||||
@@ -258,20 +232,8 @@ public:
|
||||
|
||||
void send_back_to_L0(sstables::shared_sstable& sstable) {
|
||||
remove(sstable);
|
||||
#if 0
|
||||
try
|
||||
{
|
||||
sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 0);
|
||||
sstable.reloadSSTableMetadata();
|
||||
add(sstable);
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
throw new RuntimeException("Could not reload sstable meta data", e);
|
||||
}
|
||||
#else
|
||||
_generations[0].push_back(sstable);
|
||||
#endif
|
||||
sstable->set_sstable_level(0);
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
||||
@@ -1794,6 +1794,20 @@ double sstable::get_compression_ratio() const {
|
||||
}
|
||||
}
|
||||
|
||||
void sstable::set_sstable_level(uint32_t new_level) {
|
||||
auto entry = _statistics.contents.find(metadata_type::Stats);
|
||||
if (entry == _statistics.contents.end()) {
|
||||
return;
|
||||
}
|
||||
auto& p = entry->second;
|
||||
if (!p) {
|
||||
throw std::runtime_error("Statistics is malformed");
|
||||
}
|
||||
stats_metadata& s = *static_cast<stats_metadata *>(p.get());
|
||||
sstlog.debug("set level of {} with generation {} from {} to {}", get_filename(), _generation, s.sstable_level, new_level);
|
||||
s.sstable_level = new_level;
|
||||
}
|
||||
|
||||
future<> sstable::mutate_sstable_level(uint32_t new_level) {
|
||||
if (!has_component(component_type::Statistics)) {
|
||||
return make_ready_future<>();
|
||||
|
||||
@@ -529,6 +529,9 @@ public:
|
||||
return get_stats_metadata().sstable_level;
|
||||
}
|
||||
|
||||
// This will change sstable level only in memory.
|
||||
void set_sstable_level(uint32_t);
|
||||
|
||||
double get_compression_ratio() const;
|
||||
|
||||
future<> mutate_sstable_level(uint32_t);
|
||||
|
||||
@@ -85,7 +85,6 @@ struct send_info {
|
||||
};
|
||||
|
||||
future<stop_iteration> do_send_mutations(auto si, auto fm) {
|
||||
return get_local_stream_manager().mutation_send_limiter().wait().then([si, fm = std::move(fm)] () mutable {
|
||||
sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id);
|
||||
auto fm_size = fm.representation().size();
|
||||
net::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id).then([si, fm_size] {
|
||||
@@ -100,26 +99,27 @@ future<stop_iteration> do_send_mutations(auto si, auto fm) {
|
||||
sslog.error("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION to {}: {}", si->plan_id, si->id, ep);
|
||||
}
|
||||
si->mutations_done.broken();
|
||||
}).finally([] {
|
||||
get_local_stream_manager().mutation_send_limiter().signal();
|
||||
});
|
||||
return stop_iteration::no;
|
||||
});
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}
|
||||
|
||||
future<> send_mutations(auto si) {
|
||||
auto& cf = si->db.find_column_family(si->cf_id);
|
||||
auto& priority = service::get_local_streaming_read_priority();
|
||||
return do_with(cf.make_reader(cf.schema(), si->pr, priority), [si] (auto& reader) {
|
||||
return repeat([si, &reader] () {
|
||||
return reader().then([si] (auto mopt) {
|
||||
if (mopt && si->db.column_family_exists(si->cf_id)) {
|
||||
si->mutations_nr++;
|
||||
auto fm = frozen_mutation(*mopt);
|
||||
return do_send_mutations(si, std::move(fm));
|
||||
} else {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
return repeat([si, &reader] {
|
||||
return get_local_stream_manager().mutation_send_limiter().wait().then([si, &reader] {
|
||||
return reader().then([si] (auto mopt) {
|
||||
if (mopt && si->db.column_family_exists(si->cf_id)) {
|
||||
si->mutations_nr++;
|
||||
auto fm = frozen_mutation(*mopt);
|
||||
return do_send_mutations(si, std::move(fm));
|
||||
} else {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
}
|
||||
});
|
||||
}).finally([] {
|
||||
get_local_stream_manager().mutation_send_limiter().signal();
|
||||
});
|
||||
});
|
||||
}).then([si] {
|
||||
@@ -132,7 +132,7 @@ void stream_transfer_task::start() {
|
||||
auto cf_id = this->cf_id;
|
||||
auto id = net::messaging_service::msg_addr{session->peer, session->dst_cpu_id};
|
||||
sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}", plan_id, cf_id);
|
||||
parallel_for_each(_ranges.begin(), _ranges.end(), [this, plan_id, cf_id, id] (auto range) {
|
||||
do_for_each(_ranges.begin(), _ranges.end(), [this, plan_id, cf_id, id] (auto range) {
|
||||
unsigned shard_begin = range.start() ? dht::shard_of(range.start()->value()) : 0;
|
||||
unsigned shard_end = range.end() ? dht::shard_of(range.end()->value()) + 1 : smp::count;
|
||||
auto cf_id = this->cf_id;
|
||||
|
||||
1
test.py
1
test.py
@@ -32,6 +32,7 @@ boost_tests = [
|
||||
'types_test',
|
||||
'keys_test',
|
||||
'mutation_test',
|
||||
'schema_registry_test',
|
||||
'range_test',
|
||||
'mutation_reader_test',
|
||||
'cql_query_test',
|
||||
|
||||
@@ -444,3 +444,36 @@ SEASTAR_TEST_CASE(test_partitions_with_only_expired_tombstones_are_dropped) {
|
||||
BOOST_REQUIRE_EQUAL(result.row_count(), 2);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_result_row_count) {
|
||||
return seastar::async([] {
|
||||
auto s = make_schema();
|
||||
auto now = gc_clock::now();
|
||||
auto slice = partition_slice_builder(*s).build();
|
||||
|
||||
mutation m1(partition_key::from_single_value(*s, "key1"), s);
|
||||
|
||||
auto src = make_source({m1});
|
||||
|
||||
|
||||
auto r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 0);
|
||||
|
||||
m1.set_static_cell("s1", data_value(bytes("S_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
|
||||
|
||||
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("A")), "v1", data_value(bytes("A_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
|
||||
|
||||
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("B")), "v1", data_value(bytes("B_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 2);
|
||||
|
||||
mutation m2(partition_key::from_single_value(*s, "key2"), s);
|
||||
m2.set_static_cell("s1", data_value(bytes("S_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1, m2}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 3);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -23,10 +23,15 @@
|
||||
#define BOOST_TEST_MODULE core
|
||||
|
||||
#include <boost/test/unit_test.hpp>
|
||||
#include "boost/icl/interval.hpp"
|
||||
#include "boost/icl/interval_map.hpp"
|
||||
#include <unordered_set>
|
||||
|
||||
#include "query-request.hh"
|
||||
#include "schema_builder.hh"
|
||||
|
||||
#include "disk-error-handler.hh"
|
||||
#include "locator/token_metadata.hh"
|
||||
|
||||
thread_local disk_error_signal_type commit_error;
|
||||
thread_local disk_error_signal_type general_disk_error;
|
||||
@@ -447,3 +452,56 @@ BOOST_AUTO_TEST_CASE(range_overlap_tests) {
|
||||
// [3,4) and (4,5]
|
||||
BOOST_REQUIRE(range<unsigned>({3}, {{4, false}}).overlaps(range<unsigned>({{4, false}}, {5}), unsigned_comparator()) == false);
|
||||
}
|
||||
|
||||
auto get_item(std::string left, std::string right, std::string val) {
|
||||
using value_type = std::unordered_set<std::string>;
|
||||
auto l = dht::global_partitioner().from_sstring(left);
|
||||
auto r = dht::global_partitioner().from_sstring(right);
|
||||
auto rg = range<dht::token>({{l, false}}, {r});
|
||||
value_type v{val};
|
||||
return std::make_pair(locator::token_metadata::range_to_interval(rg), v);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_range_interval_map) {
|
||||
using value_type = std::unordered_set<std::string>;
|
||||
using token = dht::token;
|
||||
boost::icl::interval_map<token, value_type> mymap;
|
||||
|
||||
mymap += get_item("1", "5", "A");
|
||||
mymap += get_item("5", "8", "B");
|
||||
mymap += get_item("1", "3", "C");
|
||||
mymap += get_item("3", "8", "D");
|
||||
|
||||
std::cout << "my map: " << "\n";
|
||||
for (auto x : mymap) {
|
||||
std::cout << x.first << " -> ";
|
||||
for (auto s : x.second) {
|
||||
std::cout << s << ", ";
|
||||
}
|
||||
std::cout << "\n";
|
||||
}
|
||||
|
||||
auto search_item = [&mymap] (std::string val) {
|
||||
auto tok = dht::global_partitioner().from_sstring(val);
|
||||
auto search = range<token>(tok);
|
||||
auto it = mymap.find(locator::token_metadata::range_to_interval(search));
|
||||
if (it != mymap.end()) {
|
||||
std::cout << "Found OK:" << " token = " << tok << " in range: " << it->first << "\n";
|
||||
return true;
|
||||
} else {
|
||||
std::cout << "Found NO:" << " token = " << tok << "\n";
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
BOOST_REQUIRE(search_item("0") == false);
|
||||
BOOST_REQUIRE(search_item("1") == false);
|
||||
BOOST_REQUIRE(search_item("2") == true);
|
||||
BOOST_REQUIRE(search_item("3") == true);
|
||||
BOOST_REQUIRE(search_item("4") == true);
|
||||
BOOST_REQUIRE(search_item("5") == true);
|
||||
BOOST_REQUIRE(search_item("6") == true);
|
||||
BOOST_REQUIRE(search_item("7") == true);
|
||||
BOOST_REQUIRE(search_item("8") == true);
|
||||
BOOST_REQUIRE(search_item("9") == false);
|
||||
}
|
||||
|
||||
130
tests/schema_registry_test.cc
Normal file
130
tests/schema_registry_test.cc
Normal file
@@ -0,0 +1,130 @@
|
||||
/*
|
||||
* Copyright (C) 2016 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define BOOST_TEST_DYN_LINK
|
||||
|
||||
#include <seastar/core/thread.hh>
|
||||
|
||||
#include "tests/test-utils.hh"
|
||||
#include "schema_registry.hh"
|
||||
#include "schema_builder.hh"
|
||||
#include "mutation_source_test.hh"
|
||||
|
||||
#include "disk-error-handler.hh"
|
||||
|
||||
thread_local disk_error_signal_type commit_error;
|
||||
thread_local disk_error_signal_type general_disk_error;
|
||||
|
||||
static bytes random_column_name() {
|
||||
return to_bytes(to_hex(make_blob(32)));
|
||||
}
|
||||
|
||||
static schema_ptr random_schema() {
|
||||
return schema_builder("ks", "cf")
|
||||
.with_column("pk", bytes_type, column_kind::partition_key)
|
||||
.with_column(random_column_name(), bytes_type)
|
||||
.build();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_async_loading) {
|
||||
return seastar::async([] {
|
||||
auto s1 = random_schema();
|
||||
auto s2 = random_schema();
|
||||
|
||||
auto s1_loaded = local_schema_registry().get_or_load(s1->version(), [s1] (table_schema_version) {
|
||||
return make_ready_future<frozen_schema>(frozen_schema(s1));
|
||||
}).get0();
|
||||
|
||||
BOOST_REQUIRE(s1_loaded);
|
||||
BOOST_REQUIRE(s1_loaded->version() == s1->version());
|
||||
auto s1_later = local_schema_registry().get_or_null(s1->version());
|
||||
BOOST_REQUIRE(s1_later);
|
||||
|
||||
auto s2_loaded = local_schema_registry().get_or_load(s2->version(), [s2] (table_schema_version) {
|
||||
return later().then([s2] { return frozen_schema(s2); });
|
||||
}).get0();
|
||||
|
||||
BOOST_REQUIRE(s2_loaded);
|
||||
BOOST_REQUIRE(s2_loaded->version() == s2->version());
|
||||
auto s2_later = local_schema_registry().get_or_null(s2_loaded->version());
|
||||
BOOST_REQUIRE(s2_later);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_schema_is_synced_when_syncer_doesnt_defer) {
|
||||
return seastar::async([] {
|
||||
auto s = random_schema();
|
||||
s = local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) { return frozen_schema(s); });
|
||||
BOOST_REQUIRE(!s->is_synced());
|
||||
s->registry_entry()->maybe_sync([] { return make_ready_future<>(); }).get();
|
||||
BOOST_REQUIRE(s->is_synced());
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_schema_is_synced_when_syncer_defers) {
|
||||
return seastar::async([] {
|
||||
auto s = random_schema();
|
||||
s = local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) { return frozen_schema(s); });
|
||||
BOOST_REQUIRE(!s->is_synced());
|
||||
s->registry_entry()->maybe_sync([] { return later(); }).get();
|
||||
BOOST_REQUIRE(s->is_synced());
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_failed_sync_can_be_retried) {
|
||||
return seastar::async([] {
|
||||
auto s = random_schema();
|
||||
s = local_schema_registry().get_or_load(s->version(), [s] (table_schema_version) { return frozen_schema(s); });
|
||||
BOOST_REQUIRE(!s->is_synced());
|
||||
|
||||
promise<> fail_sync;
|
||||
|
||||
auto f1 = s->registry_entry()->maybe_sync([&fail_sync] () mutable {
|
||||
return fail_sync.get_future().then([] {
|
||||
throw std::runtime_error("sync failed");
|
||||
});
|
||||
});
|
||||
|
||||
// concurrent maybe_sync should attach the the current one
|
||||
auto f2 = s->registry_entry()->maybe_sync([] { return make_ready_future<>(); });
|
||||
|
||||
fail_sync.set_value();
|
||||
|
||||
try {
|
||||
f1.get();
|
||||
BOOST_FAIL("Should have failed");
|
||||
} catch (...) {
|
||||
// expected
|
||||
}
|
||||
|
||||
try {
|
||||
f2.get();
|
||||
BOOST_FAIL("Should have failed");
|
||||
} catch (...) {
|
||||
// expected
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(!s->is_synced());
|
||||
|
||||
s->registry_entry()->maybe_sync([] { return make_ready_future<>(); }).get();
|
||||
BOOST_REQUIRE(s->is_synced());
|
||||
});
|
||||
}
|
||||
@@ -1948,6 +1948,45 @@ SEASTAR_TEST_CASE(leveled_06) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(leveled_07) {
|
||||
// Check that sstable, with level > 0, that overlaps with another in the same level is sent back to L0.
|
||||
auto s = make_lw_shared(schema({}, some_keyspace, some_column_family,
|
||||
{{"p1", utf8_type}}, {}, {}, {}, utf8_type));
|
||||
|
||||
column_family::config cfg;
|
||||
compaction_manager cm;
|
||||
cfg.enable_disk_writes = false;
|
||||
cfg.enable_commitlog = false;
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), cm);
|
||||
cf->mark_ready_for_writes();
|
||||
|
||||
auto key_and_token_pair = token_generation_for_current_shard(5);
|
||||
auto min_key = key_and_token_pair[0].first;
|
||||
auto max_key = key_and_token_pair[key_and_token_pair.size()-1].first;
|
||||
|
||||
// Creating two sstables which key range overlap.
|
||||
add_sstable_for_leveled_test(cf, /*gen*/1, /*data_size*/0, /*level*/1, min_key, max_key);
|
||||
BOOST_REQUIRE(cf->get_sstables()->size() == 1);
|
||||
|
||||
add_sstable_for_leveled_test(cf, /*gen*/2, /*data_size*/0, /*level*/1, key_and_token_pair[1].first, max_key);
|
||||
BOOST_REQUIRE(cf->get_sstables()->size() == 2);
|
||||
|
||||
BOOST_REQUIRE(sstable_overlaps(cf, 1, 2) == true);
|
||||
|
||||
auto max_sstable_size_in_mb = 1;
|
||||
auto candidates = get_candidates_for_leveled_strategy(*cf);
|
||||
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, max_sstable_size_in_mb);
|
||||
BOOST_REQUIRE(manifest.get_level_size(0) == 1);
|
||||
BOOST_REQUIRE(manifest.get_level_size(1) == 1);
|
||||
|
||||
auto& l0 = manifest.get_level(0);
|
||||
auto& sst = l0.front();
|
||||
BOOST_REQUIRE(sst->generation() == 2);
|
||||
BOOST_REQUIRE(sst->get_sstable_level() == 0);
|
||||
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
static lw_shared_ptr<key_reader> prepare_key_reader(schema_ptr s,
|
||||
const std::vector<shared_sstable>& ssts, const query::partition_range& range)
|
||||
{
|
||||
|
||||
@@ -275,7 +275,7 @@ future<> cql_server::stop() {
|
||||
}
|
||||
|
||||
future<>
|
||||
cql_server::listen(ipv4_addr addr, ::shared_ptr<seastar::tls::server_credentials> creds, bool keepalive) {
|
||||
cql_server::listen(ipv4_addr addr, std::shared_ptr<seastar::tls::credentials_builder> creds, bool keepalive) {
|
||||
_notifier = std::make_unique<event_notifier>(addr.port);
|
||||
service::get_local_migration_manager().register_listener(_notifier.get());
|
||||
service::get_local_storage_service().register_subscriber(_notifier.get());
|
||||
@@ -285,7 +285,7 @@ cql_server::listen(ipv4_addr addr, ::shared_ptr<seastar::tls::server_credentials
|
||||
server_socket ss;
|
||||
try {
|
||||
ss = creds
|
||||
? seastar::tls::listen(creds, make_ipv4_address(addr), lo)
|
||||
? seastar::tls::listen(creds->build_server_credentials(), make_ipv4_address(addr), lo)
|
||||
: engine().listen(make_ipv4_address(addr), lo);
|
||||
} catch (...) {
|
||||
throw std::runtime_error(sprint("CQLServer error while listening on %s -> %s", make_ipv4_address(addr), std::current_exception()));
|
||||
@@ -325,11 +325,9 @@ cql_server::do_accepts(int which, bool keepalive) {
|
||||
}).then_wrapped([this, which, keepalive] (future<> f) {
|
||||
try {
|
||||
f.get();
|
||||
} catch (const std::bad_alloc&) {
|
||||
logger.debug("accept failed: {}, retrying", std::current_exception());
|
||||
do_accepts(which, keepalive);
|
||||
} catch (...) {
|
||||
logger.debug("accept failed: {}", std::current_exception());
|
||||
logger.warn("acccept failed: {}", std::current_exception());
|
||||
do_accepts(which, keepalive);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -107,7 +107,7 @@ private:
|
||||
cql_load_balance _lb;
|
||||
public:
|
||||
cql_server(distributed<service::storage_proxy>& proxy, distributed<cql3::query_processor>& qp, cql_load_balance lb);
|
||||
future<> listen(ipv4_addr addr, ::shared_ptr<seastar::tls::server_credentials> = {}, bool keepalive = false);
|
||||
future<> listen(ipv4_addr addr, std::shared_ptr<seastar::tls::credentials_builder> = {}, bool keepalive = false);
|
||||
future<> do_accepts(int which, bool keepalive);
|
||||
future<> stop();
|
||||
public:
|
||||
|
||||
Reference in New Issue
Block a user