Compare commits
28 Commits
scylla-4.4
...
next-4.4
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c37f5938fd | ||
|
|
fa90112787 | ||
|
|
f5895e5c04 | ||
|
|
ce944911f2 | ||
|
|
b220130e4a | ||
|
|
de4f5b3b1f | ||
|
|
84a42570ec | ||
|
|
001f57ec0c | ||
|
|
3279718d52 | ||
|
|
c128994f90 | ||
|
|
9af2e5ead1 | ||
|
|
be695a7353 | ||
|
|
cc9285697d | ||
|
|
21d140febc | ||
|
|
77e05ca482 | ||
|
|
5375b8f1a1 | ||
|
|
7a82432e38 | ||
|
|
146f7b5421 | ||
|
|
e1c7a906f0 | ||
|
|
c5d6e75db8 | ||
|
|
da630e80ea | ||
|
|
8ea1cbe78d | ||
|
|
03b04d40f2 | ||
|
|
175d004513 | ||
|
|
091b794742 | ||
|
|
8be87bb0b1 | ||
|
|
a84142705a | ||
|
|
fc32534aee |
@@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
|
||||
PRODUCT=scylla
|
||||
VERSION=4.4.7
|
||||
VERSION=4.4.9
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -93,6 +93,10 @@ public:
|
||||
[&] (const json::json_return_type& json_return_value) {
|
||||
slogger.trace("api_handler success case");
|
||||
if (json_return_value._body_writer) {
|
||||
// Unfortunately, write_body() forces us to choose
|
||||
// from a fixed and irrelevant list of "mime-types"
|
||||
// at this point. But we'll override it with the
|
||||
// one (application/x-amz-json-1.0) below.
|
||||
rep->write_body("json", std::move(json_return_value._body_writer));
|
||||
} else {
|
||||
rep->_content += json_return_value._res;
|
||||
@@ -105,14 +109,15 @@ public:
|
||||
|
||||
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
|
||||
});
|
||||
}), _type("json") { }
|
||||
}) { }
|
||||
|
||||
api_handler(const api_handler&) = default;
|
||||
future<std::unique_ptr<reply>> handle(const sstring& path,
|
||||
std::unique_ptr<request> req, std::unique_ptr<reply> rep) override {
|
||||
return _f_handle(std::move(req), std::move(rep)).then(
|
||||
[this](std::unique_ptr<reply> rep) {
|
||||
rep->done(_type);
|
||||
rep->set_mime_type("application/x-amz-json-1.0");
|
||||
rep->done();
|
||||
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
|
||||
});
|
||||
}
|
||||
@@ -126,7 +131,6 @@ protected:
|
||||
}
|
||||
|
||||
future_handler_function _f_handle;
|
||||
sstring _type;
|
||||
};
|
||||
|
||||
class gated_handler : public handler_base {
|
||||
@@ -192,24 +196,31 @@ future<> server::verify_signature(const request& req) {
|
||||
throw api_error::missing_authentication_token("Authorization header is mandatory for signature verification");
|
||||
}
|
||||
std::string host = host_it->second;
|
||||
std::vector<std::string_view> credentials_raw = split(authorization_it->second, ' ');
|
||||
std::string_view authorization_header = authorization_it->second;
|
||||
auto pos = authorization_header.find_first_of(' ');
|
||||
if (pos == std::string_view::npos || authorization_header.substr(0, pos) != "AWS4-HMAC-SHA256") {
|
||||
throw api_error::invalid_signature(format("Authorization header must use AWS4-HMAC-SHA256 algorithm: {}", authorization_header));
|
||||
}
|
||||
authorization_header.remove_prefix(pos+1);
|
||||
std::string credential;
|
||||
std::string user_signature;
|
||||
std::string signed_headers_str;
|
||||
std::vector<std::string_view> signed_headers;
|
||||
for (std::string_view entry : credentials_raw) {
|
||||
do {
|
||||
// Either one of a comma or space can mark the end of an entry
|
||||
pos = authorization_header.find_first_of(" ,");
|
||||
std::string_view entry = authorization_header.substr(0, pos);
|
||||
if (pos != std::string_view::npos) {
|
||||
authorization_header.remove_prefix(pos + 1);
|
||||
}
|
||||
if (entry.empty()) {
|
||||
continue;
|
||||
}
|
||||
std::vector<std::string_view> entry_split = split(entry, '=');
|
||||
if (entry_split.size() != 2) {
|
||||
if (entry != "AWS4-HMAC-SHA256") {
|
||||
throw api_error::invalid_signature(format("Only AWS4-HMAC-SHA256 algorithm is supported. Found: {}", entry));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
std::string_view auth_value = entry_split[1];
|
||||
// Commas appear as an additional (quite redundant) delimiter
|
||||
if (auth_value.back() == ',') {
|
||||
auth_value.remove_suffix(1);
|
||||
}
|
||||
if (entry_split[0] == "Credential") {
|
||||
credential = std::string(auth_value);
|
||||
} else if (entry_split[0] == "Signature") {
|
||||
@@ -219,7 +230,8 @@ future<> server::verify_signature(const request& req) {
|
||||
signed_headers = split(auth_value, ';');
|
||||
std::sort(signed_headers.begin(), signed_headers.end());
|
||||
}
|
||||
}
|
||||
} while (pos != std::string_view::npos);
|
||||
|
||||
std::vector<std::string_view> credential_split = split(credential, '/');
|
||||
if (credential_split.size() != 5) {
|
||||
throw api_error::validation(format("Incorrect credential information format: {}", credential));
|
||||
|
||||
@@ -38,6 +38,7 @@ stats::stats() : api_operations{} {
|
||||
#define OPERATION_LATENCY(name, CamelCaseName) \
|
||||
seastar::metrics::make_histogram("op_latency", \
|
||||
seastar::metrics::description("Latency histogram of an operation via Alternator API"), {op(CamelCaseName)}, [this]{return to_metrics_histogram(api_operations.name);}),
|
||||
OPERATION(batch_get_item, "BatchGetItem")
|
||||
OPERATION(batch_write_item, "BatchWriteItem")
|
||||
OPERATION(create_backup, "CreateBackup")
|
||||
OPERATION(create_global_table, "CreateGlobalTable")
|
||||
|
||||
@@ -225,7 +225,7 @@ void set_repair(http_context& ctx, routes& r, sharded<netw::messaging_service>&
|
||||
try {
|
||||
res = fut.get0();
|
||||
} catch (std::exception& e) {
|
||||
return make_exception_future<json::json_return_type>(httpd::server_error_exception(e.what()));
|
||||
return make_exception_future<json::json_return_type>(httpd::bad_param_exception(e.what()));
|
||||
}
|
||||
return make_ready_future<json::json_return_type>(json::json_return_type(res));
|
||||
});
|
||||
|
||||
@@ -709,16 +709,16 @@ private:
|
||||
}
|
||||
return false;
|
||||
}
|
||||
bool compare(const T&, const value_type& v);
|
||||
int32_t compare(const T&, const value_type& v);
|
||||
};
|
||||
|
||||
template<>
|
||||
bool maybe_back_insert_iterator<std::vector<std::pair<bytes_view, bytes_view>>, bytes_view>::compare(const bytes_view& t, const value_type& v) {
|
||||
int32_t maybe_back_insert_iterator<std::vector<std::pair<bytes_view, bytes_view>>, bytes_view>::compare(const bytes_view& t, const value_type& v) {
|
||||
return _type.compare(t, v.first);
|
||||
}
|
||||
|
||||
template<>
|
||||
bool maybe_back_insert_iterator<std::vector<bytes_view>, bytes_view>::compare(const bytes_view& t, const value_type& v) {
|
||||
int32_t maybe_back_insert_iterator<std::vector<bytes_view>, bytes_view>::compare(const bytes_view& t, const value_type& v) {
|
||||
return _type.compare(t, v);
|
||||
}
|
||||
|
||||
|
||||
@@ -99,8 +99,8 @@ listen_address: localhost
|
||||
# listen_on_broadcast_address: false
|
||||
|
||||
# port for the CQL native transport to listen for clients on
|
||||
# For security reasons, you should not expose this port to the internet. Firewall it if needed.
|
||||
# To disable the CQL native transport, set this option to 0.
|
||||
# For security reasons, you should not expose this port to the internet. Firewall it if needed.
|
||||
# To disable the CQL native transport, remove this option and configure native_transport_port_ssl.
|
||||
native_transport_port: 9042
|
||||
|
||||
# Like native_transport_port, but clients are forwarded to specific shards, based on the
|
||||
|
||||
@@ -181,13 +181,18 @@ inline
|
||||
shared_ptr<function>
|
||||
make_from_json_function(database& db, const sstring& keyspace, data_type t) {
|
||||
return make_native_scalar_function<true>("fromjson", t, {utf8_type},
|
||||
[&db, &keyspace, t](cql_serialization_format sf, const std::vector<bytes_opt>& parameters) -> bytes_opt {
|
||||
rjson::value json_value = rjson::parse(utf8_type->to_string(parameters[0].value()));
|
||||
bytes_opt parsed_json_value;
|
||||
if (!json_value.IsNull()) {
|
||||
parsed_json_value.emplace(from_json_object(*t, json_value, sf));
|
||||
[&db, keyspace, t](cql_serialization_format sf, const std::vector<bytes_opt>& parameters) -> bytes_opt {
|
||||
try {
|
||||
rjson::value json_value = rjson::parse(utf8_type->to_string(parameters[0].value()));
|
||||
bytes_opt parsed_json_value;
|
||||
if (!json_value.IsNull()) {
|
||||
parsed_json_value.emplace(from_json_object(*t, json_value, sf));
|
||||
}
|
||||
return parsed_json_value;
|
||||
} catch(rjson::error& e) {
|
||||
throw exceptions::function_execution_exception("fromJson",
|
||||
format("Failed parsing fromJson parameter: {}", e.what()), keyspace, {t->name()});
|
||||
}
|
||||
return parsed_json_value;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -78,7 +78,22 @@ public:
|
||||
return Pure;
|
||||
}
|
||||
virtual bytes_opt execute(cql_serialization_format sf, const std::vector<bytes_opt>& parameters) override {
|
||||
return _func(sf, parameters);
|
||||
try {
|
||||
return _func(sf, parameters);
|
||||
} catch(exceptions::cassandra_exception&) {
|
||||
// If the function's code took the time to produce an official
|
||||
// cassandra_exception, pass it through. Otherwise, below we will
|
||||
// wrap the unknown exception in a function_execution_exception.
|
||||
throw;
|
||||
} catch(...) {
|
||||
std::vector<sstring> args;
|
||||
args.reserve(arg_types().size());
|
||||
for (const data_type& a : arg_types()) {
|
||||
args.push_back(a->name());
|
||||
}
|
||||
throw exceptions::function_execution_exception(name().name,
|
||||
format("Failed execution of function {}: {}", name(), std::current_exception()), name().keyspace, std::move(args));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -964,6 +964,7 @@ lw_shared_ptr<const service::pager::paging_state> indexed_table_select_statement
|
||||
}
|
||||
|
||||
auto paging_state_copy = make_lw_shared<service::pager::paging_state>(service::pager::paging_state(*paging_state));
|
||||
paging_state_copy->set_remaining(internal_paging_size);
|
||||
paging_state_copy->set_partition_key(std::move(index_pk));
|
||||
paging_state_copy->set_clustering_key(std::move(index_ck));
|
||||
return std::move(paging_state_copy);
|
||||
|
||||
4
dist/common/scripts/scylla_io_setup
vendored
4
dist/common/scripts/scylla_io_setup
vendored
@@ -254,7 +254,7 @@ if __name__ == "__main__":
|
||||
disk_properties["read_bandwidth"] = 2650 * mbs
|
||||
disk_properties["write_iops"] = 360000
|
||||
disk_properties["write_bandwidth"] = 1400 * mbs
|
||||
elif nr_disks == "16":
|
||||
elif nr_disks == 16:
|
||||
disk_properties["read_iops"] = 1600000
|
||||
disk_properties["read_bandwidth"] = 4521251328
|
||||
#below is google, above is our measured
|
||||
@@ -263,7 +263,7 @@ if __name__ == "__main__":
|
||||
disk_properties["write_bandwidth"] = 2759452672
|
||||
#below is google, above is our measured
|
||||
#disk_properties["write_bandwidth"] = 3120 * mbs
|
||||
elif nr_disks == "24":
|
||||
elif nr_disks == 24:
|
||||
disk_properties["read_iops"] = 2400000
|
||||
disk_properties["read_bandwidth"] = 5921532416
|
||||
#below is google, above is our measured
|
||||
|
||||
21
dist/common/scripts/scylla_raid_setup
vendored
21
dist/common/scripts/scylla_raid_setup
vendored
@@ -30,6 +30,8 @@ import distro
|
||||
from pathlib import Path
|
||||
from scylla_util import *
|
||||
from subprocess import run
|
||||
import distro
|
||||
from pkg_resources import parse_version
|
||||
|
||||
if __name__ == '__main__':
|
||||
if os.getuid() > 0:
|
||||
@@ -115,6 +117,25 @@ if __name__ == '__main__':
|
||||
pkg_install('xfsprogs')
|
||||
if not shutil.which('mdadm'):
|
||||
pkg_install('mdadm')
|
||||
# XXX: Workaround for mdmonitor.service issue on CentOS8
|
||||
if is_redhat_variant() and distro.version() == '8':
|
||||
mdadm_rpm = run('rpm -q mdadm', shell=True, check=True, capture_output=True, encoding='utf-8').stdout.strip()
|
||||
match = re.match(r'^mdadm-([0-9]+\.[0-9]+-[a-zA-Z0-9]+)\.', mdadm_rpm)
|
||||
mdadm_version = match.group(1)
|
||||
if parse_version('4.1-14') < parse_version(mdadm_version):
|
||||
repo_data = '''
|
||||
[BaseOS_8_3_2011]
|
||||
name=CentOS8.3.2011 - Base
|
||||
baseurl=http://vault.centos.org/8.3.2011/BaseOS/$basearch/os/
|
||||
gpgcheck=1
|
||||
enabled=0
|
||||
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-centosofficial
|
||||
'''[1:-1]
|
||||
with open('/etc/yum.repos.d/CentOS-Vault-8.3.repo', 'w') as f:
|
||||
f.write(repo_data)
|
||||
run('dnf downgrade --enablerepo=BaseOS_8_3_2011 -y mdadm', shell=True, check=True)
|
||||
run('dnf install -y python3-dnf-plugin-versionlock', shell=True, check=True)
|
||||
run('dnf versionlock add mdadm', shell=True, check=True)
|
||||
try:
|
||||
md_service = systemd_unit('mdmonitor.service')
|
||||
except SystemdException:
|
||||
|
||||
5
dist/common/scripts/scylla_util.py
vendored
5
dist/common/scripts/scylla_util.py
vendored
@@ -147,6 +147,11 @@ class gcp_instance:
|
||||
if af == socket.AF_INET:
|
||||
addr, port = sa
|
||||
if addr == "169.254.169.254":
|
||||
# Make sure it is not on GKE
|
||||
try:
|
||||
gcp_instance().__instance_metadata("machine-type")
|
||||
except urllib.error.HTTPError:
|
||||
return False
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
2
dist/docker/redhat/Dockerfile
vendored
2
dist/docker/redhat/Dockerfile
vendored
@@ -6,7 +6,7 @@ ENV container docker
|
||||
|
||||
# The SCYLLA_REPO_URL argument specifies the URL to the RPM repository this Docker image uses to install Scylla. The default value is the Scylla's unstable RPM repository, which contains the daily build.
|
||||
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/branch-4.4/latest/scylla.repo
|
||||
ARG VERSION=4.4.7
|
||||
ARG VERSION=4.4.9
|
||||
|
||||
ADD scylla_bashrc /scylla_bashrc
|
||||
|
||||
|
||||
@@ -4,3 +4,4 @@ stdout_logfile=/dev/stdout
|
||||
stdout_logfile_maxbytes=0
|
||||
stderr_logfile=/dev/stderr
|
||||
stderr_logfile_maxbytes=0
|
||||
stopwaitsecs=900
|
||||
|
||||
5
dist/docker/redhat/scyllasetup.py
vendored
5
dist/docker/redhat/scyllasetup.py
vendored
@@ -121,12 +121,13 @@ class ScyllaSetup:
|
||||
if self._apiAddress is not None:
|
||||
args += ["--api-address %s" % self._apiAddress]
|
||||
|
||||
if self._alternatorPort is not None:
|
||||
if self._alternatorAddress is not None:
|
||||
args += ["--alternator-address %s" % self._alternatorAddress]
|
||||
|
||||
if self._alternatorPort is not None:
|
||||
args += ["--alternator-port %s" % self._alternatorPort]
|
||||
|
||||
if self._alternatorHttpsPort is not None:
|
||||
args += ["--alternator-address %s" % self._alternatorAddress]
|
||||
args += ["--alternator-https-port %s" % self._alternatorHttpsPort]
|
||||
|
||||
if self._alternatorWriteIsolation is not None:
|
||||
|
||||
@@ -340,4 +340,18 @@ public:
|
||||
unsupported_operation_exception(const sstring& msg) : std::runtime_error("unsupported operation: " + msg) {}
|
||||
};
|
||||
|
||||
class function_execution_exception : public cassandra_exception {
|
||||
public:
|
||||
const sstring ks_name;
|
||||
const sstring func_name;
|
||||
const std::vector<sstring> args;
|
||||
function_execution_exception(sstring func_name_, sstring detail, sstring ks_name_, std::vector<sstring> args_) noexcept
|
||||
: cassandra_exception{exception_code::FUNCTION_FAILURE,
|
||||
format("execution of {} failed: {}", func_name_, detail)}
|
||||
, ks_name(std::move(ks_name_))
|
||||
, func_name(std::move(func_name_))
|
||||
, args(std::move(args_))
|
||||
{ }
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -1445,7 +1445,7 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
|
||||
logger.trace("marking as alive {}", addr);
|
||||
|
||||
// Do not mark a node with status shutdown as UP.
|
||||
auto status = get_gossip_status(local_state);
|
||||
auto status = sstring(get_gossip_status(local_state));
|
||||
if (status == sstring(versioned_value::SHUTDOWN)) {
|
||||
logger.warn("Skip marking node {} with status = {} as UP", addr, status);
|
||||
return;
|
||||
@@ -1464,6 +1464,8 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Make a copy for endpoint_state because the code below can yield
|
||||
endpoint_state state = local_state;
|
||||
_live_endpoints.push_back(addr);
|
||||
if (_endpoints_to_talk_with.empty()) {
|
||||
_endpoints_to_talk_with.push_back({addr});
|
||||
@@ -1475,8 +1477,8 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
|
||||
logger.info("InetAddress {} is now UP, status = {}", addr, status);
|
||||
}
|
||||
|
||||
_subscribers.for_each([addr, local_state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
subscriber->on_alive(addr, local_state);
|
||||
_subscribers.for_each([addr, state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
subscriber->on_alive(addr, state);
|
||||
logger.trace("Notified {}", fmt::ptr(subscriber.get()));
|
||||
});
|
||||
}
|
||||
@@ -1485,11 +1487,12 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
|
||||
void gossiper::mark_dead(inet_address addr, endpoint_state& local_state) {
|
||||
logger.trace("marking as down {}", addr);
|
||||
local_state.mark_dead();
|
||||
endpoint_state state = local_state;
|
||||
_live_endpoints.resize(std::distance(_live_endpoints.begin(), std::remove(_live_endpoints.begin(), _live_endpoints.end(), addr)));
|
||||
_unreachable_endpoints[addr] = now();
|
||||
logger.info("InetAddress {} is now DOWN, status = {}", addr, get_gossip_status(local_state));
|
||||
_subscribers.for_each([addr, local_state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
subscriber->on_dead(addr, local_state);
|
||||
logger.info("InetAddress {} is now DOWN, status = {}", addr, get_gossip_status(state));
|
||||
_subscribers.for_each([addr, state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
subscriber->on_dead(addr, state);
|
||||
logger.trace("Notified {}", fmt::ptr(subscriber.get()));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1590,8 +1590,8 @@ class shard_reader : public enable_lw_shared_from_this<shard_reader>, public fla
|
||||
private:
|
||||
shared_ptr<reader_lifecycle_policy> _lifecycle_policy;
|
||||
const unsigned _shard;
|
||||
const dht::partition_range* _pr;
|
||||
const query::partition_slice& _ps;
|
||||
dht::partition_range _pr;
|
||||
query::partition_slice _ps;
|
||||
const io_priority_class& _pc;
|
||||
tracing::global_trace_state_ptr _trace_state;
|
||||
const mutation_reader::forwarding _fwd_mr;
|
||||
@@ -1617,7 +1617,7 @@ public:
|
||||
: impl(std::move(schema), std::move(permit))
|
||||
, _lifecycle_policy(std::move(lifecycle_policy))
|
||||
, _shard(shard)
|
||||
, _pr(&pr)
|
||||
, _pr(pr)
|
||||
, _ps(ps)
|
||||
, _pc(pc)
|
||||
, _trace_state(std::move(trace_state))
|
||||
@@ -1702,7 +1702,7 @@ future<> shard_reader::do_fill_buffer(db::timeout_clock::time_point timeout) {
|
||||
});
|
||||
auto s = gs.get();
|
||||
auto rreader = make_foreign(std::make_unique<evictable_reader>(evictable_reader::auto_pause::yes, std::move(ms),
|
||||
s, _lifecycle_policy->semaphore().make_permit(s.get(), "shard-reader"), *_pr, _ps, _pc, _trace_state, _fwd_mr));
|
||||
s, _lifecycle_policy->semaphore().make_permit(s.get(), "shard-reader"), _pr, _ps, _pc, _trace_state, _fwd_mr));
|
||||
tracing::trace(_trace_state, "Creating shard reader on shard: {}", this_shard_id());
|
||||
auto f = rreader->fill_buffer(timeout);
|
||||
return f.then([rreader = std::move(rreader)] () mutable {
|
||||
@@ -1751,7 +1751,7 @@ void shard_reader::next_partition() {
|
||||
}
|
||||
|
||||
future<> shard_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
|
||||
_pr = ≺
|
||||
_pr = pr;
|
||||
|
||||
if (!_reader && !_read_ahead) {
|
||||
// No need to fast-forward uncreated readers, they will be passed the new
|
||||
@@ -1760,12 +1760,12 @@ future<> shard_reader::fast_forward_to(const dht::partition_range& pr, db::timeo
|
||||
}
|
||||
|
||||
auto f = _read_ahead ? *std::exchange(_read_ahead, std::nullopt) : make_ready_future<>();
|
||||
return f.then([this, &pr, timeout] {
|
||||
return f.then([this, timeout] {
|
||||
_end_of_stream = false;
|
||||
clear_buffer();
|
||||
|
||||
return smp::submit_to(_shard, [this, &pr, timeout] {
|
||||
return _reader->fast_forward_to(pr, timeout);
|
||||
return smp::submit_to(_shard, [this, timeout] {
|
||||
return _reader->fast_forward_to(_pr, timeout);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -57,6 +57,8 @@ future<> feed_writer(flat_mutation_reader&& rd, Writer&& wr) {
|
||||
auto f2 = rd.is_buffer_empty() ? rd.fill_buffer(db::no_timeout) : make_ready_future<>();
|
||||
return when_all_succeed(std::move(f1), std::move(f2)).discard_result();
|
||||
});
|
||||
}).then([&wr] {
|
||||
wr.consume_end_of_stream();
|
||||
}).then_wrapped([&wr] (future<> f) {
|
||||
if (f.failed()) {
|
||||
auto ex = f.get_exception();
|
||||
@@ -70,7 +72,6 @@ future<> feed_writer(flat_mutation_reader&& rd, Writer&& wr) {
|
||||
return make_exception_future<>(std::move(ex));
|
||||
});
|
||||
} else {
|
||||
wr.consume_end_of_stream();
|
||||
return wr.close();
|
||||
}
|
||||
});
|
||||
|
||||
@@ -267,9 +267,14 @@ public:
|
||||
return _current_tombstone;
|
||||
}
|
||||
|
||||
const std::deque<range_tombstone>& range_tombstones_for_row(const clustering_key_prefix& ck) {
|
||||
std::vector<range_tombstone> range_tombstones_for_row(const clustering_key_prefix& ck) {
|
||||
drop_unneeded_tombstones(ck);
|
||||
return _range_tombstones;
|
||||
std::vector<range_tombstone> result(_range_tombstones.begin(), _range_tombstones.end());
|
||||
auto cmp = [&] (const range_tombstone& rt1, const range_tombstone& rt2) {
|
||||
return _cmp(rt1.start_bound(), rt2.start_bound());
|
||||
};
|
||||
std::sort(result.begin(), result.end(), cmp);
|
||||
return result;
|
||||
}
|
||||
|
||||
std::deque<range_tombstone> range_tombstones() && {
|
||||
|
||||
2
seastar
2
seastar
Submodule seastar updated: 4a58d76fea...1fb2187322
@@ -129,7 +129,7 @@ void sstable_writer_k_l::maybe_flush_pi_block(file_writer& out,
|
||||
// block includes them), but we set block_next_start_offset after - so
|
||||
// even if we wrote a lot of open tombstones, we still get a full
|
||||
// block size of new data.
|
||||
auto& rts = _pi_write.tombstone_accumulator->range_tombstones_for_row(
|
||||
auto rts = _pi_write.tombstone_accumulator->range_tombstones_for_row(
|
||||
clustering_key_prefix::from_range(clustering_key.values()));
|
||||
for (const auto& rt : rts) {
|
||||
auto start = composite::from_clustering_element(*_pi_write.schemap, rt.start);
|
||||
|
||||
@@ -85,3 +85,20 @@ def test_signature_too_futuristic(dynamodb, test_table):
|
||||
response = requests.post(url, headers=headers, verify=False)
|
||||
assert not response.ok
|
||||
assert "InvalidSignatureException" in response.text and "Signature not yet current" in response.text
|
||||
|
||||
# A test that commas can be uses instead of whitespace to separate components
|
||||
# of the Authorization headers - reproducing issue #9568.
|
||||
def test_authorization_no_whitespace(dynamodb, test_table):
|
||||
# Unlike the above tests which checked error cases so didn't need to
|
||||
# calculate a real signature, in this test we really a correct signature,
|
||||
# so we use a function we already have in test_manual_requests.py.
|
||||
from test_manual_requests import get_signed_request
|
||||
payload = '{"TableName": "' + test_table.name + '", "Item": {"p": {"S": "x"}, "c": {"S": "x"}}}'
|
||||
req = get_signed_request(dynamodb, 'PutItem', payload)
|
||||
# Boto3 separates the components of the Authorization header by spaces.
|
||||
# Let's remove all of them except the first one (which separates the
|
||||
# signature algorithm name from the rest) and check the result still works:
|
||||
a = req.headers['Authorization'].split()
|
||||
req.headers['Authorization'] = a[0] + ' ' + ''.join(a[1:])
|
||||
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
|
||||
assert response.ok
|
||||
|
||||
@@ -154,3 +154,25 @@ def test_incorrect_numbers(dynamodb, test_table):
|
||||
req = get_signed_request(dynamodb, 'PutItem', payload)
|
||||
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
|
||||
assert "ValidationException" in response.text and "numeric" in response.text
|
||||
|
||||
# Although the DynamoDB API responses are JSON, additional conventions apply
|
||||
# to these responses - such as how error codes are encoded in JSON. For this
|
||||
# reason, DynamoDB uses the content type 'application/x-amz-json-1.0' instead
|
||||
# of the standard 'application/json'. This test verifies that we return the
|
||||
# correct content type header.
|
||||
# While most DynamoDB libraries we tried do not care about an unexpected
|
||||
# content-type, it turns out that one (aiodynamo) does. Moreover, AWS already
|
||||
# defined x-amz-json-1.1 - see
|
||||
# https://awslabs.github.io/smithy/1.0/spec/aws/aws-json-1_1-protocol.html
|
||||
# which differs (only) in how it encodes error replies.
|
||||
# So in the future it may become even more important that Scylla return the
|
||||
# correct content type.
|
||||
def test_content_type(dynamodb, test_table):
|
||||
payload = '{"TableName": "' + test_table.name + '", "Item": {"p": {"S": "x"}, "c": {"S": "x"}}}'
|
||||
# Note that get_signed_request() uses x-amz-json-1.0 to encode the
|
||||
# *request*. In the future this may or may not effect the content type
|
||||
# in the response (today, DynamoDB doesn't allow any other content type
|
||||
# in the request anyway).
|
||||
req = get_signed_request(dynamodb, 'PutItem', payload)
|
||||
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
|
||||
assert response.headers['Content-Type'] == 'application/x-amz-json-1.0'
|
||||
|
||||
113
test/alternator/test_metrics.py
Normal file
113
test/alternator/test_metrics.py
Normal file
@@ -0,0 +1,113 @@
|
||||
# Copyright 2021-present ScyllaDB
|
||||
#
|
||||
# This file is part of Scylla.
|
||||
#
|
||||
# Scylla is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# Scylla is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
##############################################################################
|
||||
# Tests for Scylla's metrics (see docs/design-notes/metrics.md) for Alternator
|
||||
# queries. Reproduces issue #9406, where although metrics was implemented for
|
||||
# Alternator requests, they were missing for some operations (BatchGetItem).
|
||||
# In the tests here we attempt to ensure that the metrics continue to work
|
||||
# for the relevant operations as the code evolves.
|
||||
#
|
||||
# Note that all tests in this file test Scylla-specific features, and are
|
||||
# "skipped" when not running against Scylla, or when unable to retrieve
|
||||
# metrics through out-of-band HTTP requests to Scylla's Prometheus port (9180).
|
||||
#
|
||||
# IMPORTANT: we do not want these tests to assume that are not running in
|
||||
# parallel with any other tests or workload - because such an assumption
|
||||
# would limit our test deployment options in the future. NOT making this
|
||||
# assumption means that these tests can't check that a certain operation
|
||||
# increases a certain counter by exactly 1 - because other concurrent
|
||||
# operations might increase it further! So our test can only check that the
|
||||
# counter increases.
|
||||
##############################################################################
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
import re
|
||||
|
||||
from util import random_string
|
||||
|
||||
# Fixture for checking if we are able to test Scylla metrics. Scylla metrics
|
||||
# are not available on AWS (of course), but may also not be available for
|
||||
# Scylla if for some reason we have only access to the Alternator protocol
|
||||
# port but no access to the metrics port (9180).
|
||||
# If metrics are *not* available, tests using this fixture will be skipped.
|
||||
# Tests using this fixture may call get_metrics(metrics).
|
||||
@pytest.fixture(scope="module")
|
||||
def metrics(dynamodb):
|
||||
if dynamodb.meta.client._endpoint.host.endswith('.amazonaws.com'):
|
||||
pytest.skip('Scylla-only feature not supported by AWS')
|
||||
url = dynamodb.meta.client._endpoint.host
|
||||
# The Prometheus API is on port 9180, and always http, not https.
|
||||
url = re.sub(r':[0-9]+(/|$)', ':9180', url)
|
||||
url = re.sub(r'^https:', 'http:', url)
|
||||
url = url + '/metrics'
|
||||
resp = requests.get(url)
|
||||
if resp.status_code != 200:
|
||||
pytest.skip('Metrics port 9180 is not available')
|
||||
yield url
|
||||
|
||||
# Utility function for fetching all metrics from Scylla, using an HTTP request
|
||||
# to port 9180. The response format is defined by the Prometheus protocol.
|
||||
# Only use get_metrics() in a test using the metrics_available fixture.
|
||||
def get_metrics(metrics):
|
||||
response = requests.get(metrics)
|
||||
assert response.status_code == 200
|
||||
return response.text
|
||||
|
||||
# Utility function for fetching a metric with a given name and optionally a
|
||||
# given sub-metric label (which should be a name-value map). If multiple
|
||||
# matches are found, they are summed - this is useful for summing up the
|
||||
# counts from multiple shards.
|
||||
def get_metric(metrics, name, requested_labels=None):
|
||||
total = 0.0
|
||||
lines = re.compile('^'+name+'{.*$', re.MULTILINE)
|
||||
for match in re.findall(lines, get_metrics(metrics)):
|
||||
a = match.split()
|
||||
metric = a[0]
|
||||
val = float(a[1])
|
||||
# Check if match also matches the requested labels
|
||||
if requested_labels:
|
||||
# we know metric begins with name{ and ends with } - the labels
|
||||
# are what we have between those
|
||||
got_labels = metric[len(name)+1:-1].split(',')
|
||||
# Check that every one of the requested labels is in got_labels:
|
||||
for k, v in requested_labels.items():
|
||||
if not f'{k}="{v}"' in got_labels:
|
||||
# No match for requested label, skip this metric (python
|
||||
# doesn't have "continue 2" so let's just set val to 0...
|
||||
val = 0
|
||||
break
|
||||
total += float(val)
|
||||
return total
|
||||
|
||||
def test_batch_write_item(test_table_s, metrics):
|
||||
n1 = get_metric(metrics, 'scylla_alternator_operation', {'op': 'BatchWriteItem'})
|
||||
test_table_s.meta.client.batch_write_item(RequestItems = {
|
||||
test_table_s.name: [{'PutRequest': {'Item': {'p': random_string(), 'a': 'hi'}}}]})
|
||||
n2 = get_metric(metrics, 'scylla_alternator_operation', {'op': 'BatchWriteItem'})
|
||||
assert n2 > n1
|
||||
|
||||
# Reproduces issue #9406:
|
||||
def test_batch_get_item(test_table_s, metrics):
|
||||
n1 = get_metric(metrics, 'scylla_alternator_operation', {'op': 'BatchGetItem'})
|
||||
test_table_s.meta.client.batch_get_item(RequestItems = {
|
||||
test_table_s.name: {'Keys': [{'p': random_string()}], 'ConsistentRead': True}})
|
||||
n2 = get_metric(metrics, 'scylla_alternator_operation', {'op': 'BatchGetItem'})
|
||||
assert n2 > n1
|
||||
|
||||
# TODO: check the rest of the operations
|
||||
@@ -22,6 +22,8 @@
|
||||
#include <seastar/testing/test_case.hh>
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
#include "test/lib/cql_assertions.hh"
|
||||
#include "cql3/untyped_result_set.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
|
||||
SEASTAR_TEST_CASE(test_index_with_paging) {
|
||||
@@ -48,3 +50,51 @@ SEASTAR_TEST_CASE(test_index_with_paging) {
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_index_with_paging_with_base_short_read) {
|
||||
return do_with_cql_env_thread([] (auto& e) {
|
||||
e.execute_cql("CREATE TABLE tab (pk int, ck text, v int, v2 int, v3 text, PRIMARY KEY (pk, ck))").get();
|
||||
e.execute_cql("CREATE INDEX ON tab (v)").get();
|
||||
|
||||
// Enough to trigger a short read on the base table during scan
|
||||
sstring big_string(2 * query::result_memory_limiter::maximum_result_size, 'j');
|
||||
|
||||
const int row_count = 67;
|
||||
for (int i = 0; i < row_count; ++i) {
|
||||
e.execute_cql(format("INSERT INTO tab (pk, ck, v, v2, v3) VALUES ({}, 'hello{}', 1, {}, '{}')", i % 3, i, i, big_string)).get();
|
||||
}
|
||||
|
||||
eventually([&] {
|
||||
uint64_t count = 0;
|
||||
e.qp().local().query_internal("SELECT * FROM ks.tab WHERE v = 1", [&] (const cql3::untyped_result_set_row&) {
|
||||
++count;
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}).get();
|
||||
BOOST_REQUIRE_EQUAL(count, row_count);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_index_with_paging_with_base_short_read_no_ck) {
|
||||
return do_with_cql_env_thread([] (auto& e) {
|
||||
e.execute_cql("CREATE TABLE tab (pk int, v int, v2 int, v3 text, PRIMARY KEY (pk))").get();
|
||||
e.execute_cql("CREATE INDEX ON tab (v)").get();
|
||||
|
||||
// Enough to trigger a short read on the base table during scan
|
||||
sstring big_string(2 * query::result_memory_limiter::maximum_result_size, 'j');
|
||||
|
||||
const int row_count = 67;
|
||||
for (int i = 0; i < row_count; ++i) {
|
||||
e.execute_cql(format("INSERT INTO tab (pk, v, v2, v3) VALUES ({}, 1, {}, '{}')", i, i, big_string)).get();
|
||||
}
|
||||
|
||||
eventually([&] {
|
||||
uint64_t count = 0;
|
||||
e.qp().local().query_internal("SELECT * FROM ks.tab WHERE v = 1", [&] (const cql3::untyped_result_set_row&) {
|
||||
++count;
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
}).get();
|
||||
BOOST_REQUIRE_EQUAL(count, row_count);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -28,6 +28,8 @@
|
||||
#include "sstables/sstables.hh"
|
||||
#include "test/lib/mutation_source_test.hh"
|
||||
#include "test/lib/sstable_utils.hh"
|
||||
#include "test/lib/mutation_assertions.hh"
|
||||
#include "partition_slice_builder.hh"
|
||||
|
||||
using namespace sstables;
|
||||
using namespace std::chrono_literals;
|
||||
@@ -62,3 +64,69 @@ SEASTAR_TEST_CASE(test_sstable_conforms_to_mutation_source) {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Regression test for scylladb/scylla-enterprise#2016
|
||||
SEASTAR_THREAD_TEST_CASE(test_produces_range_tombstone) {
|
||||
auto s = schema_builder("ks", "cf")
|
||||
.with_column("pk", int32_type, column_kind::partition_key)
|
||||
.with_column("ck", int32_type, column_kind::clustering_key)
|
||||
.with_column("v", int32_type, column_kind::regular_column)
|
||||
.build();
|
||||
|
||||
mutation m(s, partition_key::from_single_value(*s, int32_type->decompose(0)));
|
||||
m.partition().apply_row_tombstone(*s, range_tombstone{
|
||||
clustering_key::from_exploded(*s, {int32_type->decompose(6)}), bound_kind::excl_start,
|
||||
clustering_key::from_exploded(*s, {int32_type->decompose(10)}), bound_kind::incl_end,
|
||||
tombstone(0, gc_clock::time_point())
|
||||
});
|
||||
|
||||
{
|
||||
auto ckey = clustering_key::from_exploded(*s, {int32_type->decompose(6)});
|
||||
deletable_row& row = m.partition().clustered_row(*s, ckey, is_dummy::no, is_continuous(false));
|
||||
row.marker() = row_marker(4);
|
||||
}
|
||||
{
|
||||
auto ckey = clustering_key::from_exploded(*s, {int32_type->decompose(8)});
|
||||
deletable_row& row = m.partition().clustered_row(*s, ckey, is_dummy::no, is_continuous(false));
|
||||
row.apply(tombstone(2, gc_clock::time_point()));
|
||||
row.marker() = row_marker(5);
|
||||
}
|
||||
|
||||
testlog.info("m: {}", m);
|
||||
|
||||
auto slice = partition_slice_builder(*s)
|
||||
.with_range(query::clustering_range::make(
|
||||
{clustering_key::from_exploded(*s, {int32_type->decompose(8)}), false},
|
||||
{clustering_key::from_exploded(*s, {int32_type->decompose(10)}), true}
|
||||
))
|
||||
.build();
|
||||
|
||||
auto pr = dht::partition_range::make_singular(m.decorated_key());
|
||||
|
||||
std::vector<tmpdir> dirs;
|
||||
dirs.emplace_back();
|
||||
sstables::test_env::do_with_async([&] (sstables::test_env& env) {
|
||||
storage_service_for_tests ssft;
|
||||
auto version = sstable_version_types::la;
|
||||
auto index_block_size = 1;
|
||||
sstable_writer_config cfg = env.manager().configure_writer();
|
||||
cfg.promoted_index_block_size = index_block_size;
|
||||
|
||||
auto source = make_sstable_mutation_source(env, s, dirs.back().path().string(), {m}, cfg, version, gc_clock::now());
|
||||
|
||||
{
|
||||
auto rd = source.make_reader(s, tests::make_permit(), pr, slice);
|
||||
while (auto mf = rd(db::no_timeout).get0()) {
|
||||
testlog.info("produced {}", mutation_fragment::printer(*s, *mf));
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
auto rd = source.make_reader(s, tests::make_permit(), pr, slice);
|
||||
mutation_opt sliced_m = read_mutation_from_flat_mutation_reader(rd, db::no_timeout).get0();
|
||||
BOOST_REQUIRE(bool(sliced_m));
|
||||
|
||||
assert_that(*sliced_m).is_equal_to(m, slice.row_ranges(*m.schema(), m.key()));
|
||||
}
|
||||
}).get();
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@
|
||||
|
||||
from util import unique_name, new_test_table
|
||||
|
||||
from cassandra.protocol import FunctionFailure
|
||||
from cassandra.protocol import FunctionFailure, InvalidRequest
|
||||
|
||||
import pytest
|
||||
import random
|
||||
@@ -34,58 +34,62 @@ import random
|
||||
@pytest.fixture(scope="session")
|
||||
def table1(cql, test_keyspace):
|
||||
table = test_keyspace + "." + unique_name()
|
||||
cql.execute(f"CREATE TABLE {table} (p int PRIMARY KEY, v int, a ascii)")
|
||||
cql.execute(f"CREATE TABLE {table} (p int PRIMARY KEY, v int, a ascii, b boolean)")
|
||||
yield table
|
||||
cql.execute("DROP TABLE " + table)
|
||||
|
||||
# Test that failed fromJson() parsing an invalid JSON results in the expected
|
||||
# error - FunctionFailure - and not some weird internal error.
|
||||
# Reproduces issue #7911.
|
||||
@pytest.mark.xfail(reason="issue #7911")
|
||||
def test_failed_json_parsing_unprepared(cql, table1):
|
||||
p = random.randint(1,1000000000)
|
||||
with pytest.raises(FunctionFailure):
|
||||
cql.execute(f"INSERT INTO {table1} (p, v) VALUES ({p}, fromJson('dog'))")
|
||||
@pytest.mark.xfail(reason="issue #7911")
|
||||
def test_failed_json_parsing_prepared(cql, table1):
|
||||
p = random.randint(1,1000000000)
|
||||
stmt = cql.prepare(f"INSERT INTO {table1} (p, v) VALUES (?, fromJson(?))")
|
||||
with pytest.raises(FunctionFailure):
|
||||
cql.execute(stmt, [0, 'dog'])
|
||||
cql.execute(stmt, [p, 'dog'])
|
||||
|
||||
# Similarly, if the JSON parsing did not fail, but yielded a type which is
|
||||
# incompatible with the type we want it to yield, we should get a clean
|
||||
# FunctionFailure, not some internal server error.
|
||||
# We have here examples of returning a string where a number was expected,
|
||||
# and returning a unicode string where ASCII was expected.
|
||||
# and returning a unicode string where ASCII was expected, and returning
|
||||
# a number of the wrong type
|
||||
# Reproduces issue #7911.
|
||||
@pytest.mark.xfail(reason="issue #7911")
|
||||
def test_fromjson_wrong_type_unprepared(cql, table1):
|
||||
p = random.randint(1,1000000000)
|
||||
with pytest.raises(FunctionFailure):
|
||||
cql.execute(f"INSERT INTO {table1} (p, v) VALUES ({p}, fromJson('\"dog\"'))")
|
||||
with pytest.raises(FunctionFailure):
|
||||
cql.execute(f"INSERT INTO {table1} (p, a) VALUES ({p}, fromJson('3'))")
|
||||
@pytest.mark.xfail(reason="issue #7911")
|
||||
def test_fromjson_wrong_type_prepared(cql, table1):
|
||||
p = random.randint(1,1000000000)
|
||||
stmt = cql.prepare(f"INSERT INTO {table1} (p, v) VALUES (?, fromJson(?))")
|
||||
with pytest.raises(FunctionFailure):
|
||||
cql.execute(stmt, [0, '"dog"'])
|
||||
cql.execute(stmt, [p, '"dog"'])
|
||||
stmt = cql.prepare(f"INSERT INTO {table1} (p, a) VALUES (?, fromJson(?))")
|
||||
with pytest.raises(FunctionFailure):
|
||||
cql.execute(stmt, [0, '3'])
|
||||
@pytest.mark.xfail(reason="issue #7911")
|
||||
cql.execute(stmt, [p, '3'])
|
||||
def test_fromjson_bad_ascii_unprepared(cql, table1):
|
||||
p = random.randint(1,1000000000)
|
||||
with pytest.raises(FunctionFailure):
|
||||
cql.execute(f"INSERT INTO {table1} (p, a) VALUES ({p}, fromJson('\"שלום\"'))")
|
||||
@pytest.mark.xfail(reason="issue #7911")
|
||||
def test_fromjson_bad_ascii_prepared(cql, table1):
|
||||
p = random.randint(1,1000000000)
|
||||
stmt = cql.prepare(f"INSERT INTO {table1} (p, a) VALUES (?, fromJson(?))")
|
||||
with pytest.raises(FunctionFailure):
|
||||
cql.execute(stmt, [0, '"שלום"'])
|
||||
cql.execute(stmt, [p, '"שלום"'])
|
||||
def test_fromjson_nonint_unprepared(cql, table1):
|
||||
p = random.randint(1,1000000000)
|
||||
with pytest.raises(FunctionFailure):
|
||||
cql.execute(f"INSERT INTO {table1} (p, v) VALUES ({p}, fromJson('1.2'))")
|
||||
def test_fromjson_nonint_prepared(cql, table1):
|
||||
p = random.randint(1,1000000000)
|
||||
stmt = cql.prepare(f"INSERT INTO {table1} (p, v) VALUES (?, fromJson(?))")
|
||||
with pytest.raises(FunctionFailure):
|
||||
cql.execute(stmt, [p, '1.2'])
|
||||
|
||||
# The JSON standard does not define or limit the range or precision of
|
||||
# numbers. However, if a number is assigned to a Scylla number type, the
|
||||
@@ -105,7 +109,27 @@ def test_fromjson_int_overflow_prepared(cql, table1):
|
||||
p = random.randint(1,1000000000)
|
||||
stmt = cql.prepare(f"INSERT INTO {table1} (p, v) VALUES (?, fromJson(?))")
|
||||
with pytest.raises(FunctionFailure):
|
||||
cql.execute(stmt, [0, '2147483648'])
|
||||
cql.execute(stmt, [p, '2147483648'])
|
||||
|
||||
# Cassandra allows the strings "true" and "false", not just the JSON constants
|
||||
# true and false, to be assigned to a boolean column. However, very strangely,
|
||||
# it only allows this for prepared statements, and *not* for unprepared
|
||||
# statements - which result in an InvalidRequest!
|
||||
# Reproduces #7915.
|
||||
def test_fromjson_boolean_string_unprepared(cql, table1):
|
||||
p = random.randint(1,1000000000)
|
||||
with pytest.raises(InvalidRequest):
|
||||
cql.execute(f"INSERT INTO {table1} (p, b) VALUES ({p}, '\"true\"')")
|
||||
with pytest.raises(InvalidRequest):
|
||||
cql.execute(f"INSERT INTO {table1} (p, b) VALUES ({p}, '\"false\"')")
|
||||
@pytest.mark.xfail(reason="issue #7915")
|
||||
def test_fromjson_boolean_string_prepared(cql, table1):
|
||||
p = random.randint(1,1000000000)
|
||||
stmt = cql.prepare(f"INSERT INTO {table1} (p, b) VALUES (?, fromJson(?))")
|
||||
cql.execute(stmt, [p, '"true"'])
|
||||
assert list(cql.execute(f"SELECT p, b from {table1} where p = {p}")) == [(p, True)]
|
||||
cql.execute(stmt, [p, '"false"'])
|
||||
assert list(cql.execute(f"SELECT p, b from {table1} where p = {p}")) == [(p, False)]
|
||||
|
||||
# Test that null argument is allowed for fromJson(), with unprepared statement
|
||||
# Reproduces issue #7912.
|
||||
|
||||
@@ -118,6 +118,8 @@ public:
|
||||
return stop_iteration::no;
|
||||
});
|
||||
});
|
||||
}).finally([&ir] () {
|
||||
return ir->close();
|
||||
});
|
||||
}).then([l] {
|
||||
return std::move(*l);
|
||||
|
||||
Submodule tools/java updated: 14e635e5de...e8accfbf45
@@ -97,12 +97,18 @@ future<> controller::do_start_server() {
|
||||
};
|
||||
|
||||
std::vector<listen_cfg> configs;
|
||||
int native_port_idx = -1, native_shard_aware_port_idx = -1;
|
||||
|
||||
if (cfg.native_transport_port() != 0) {
|
||||
configs.push_back(listen_cfg{ socket_address{ip, cfg.native_transport_port()}, false });
|
||||
if (cfg.native_transport_port.is_set() ||
|
||||
(!cfg.native_transport_port_ssl.is_set() && !cfg.native_transport_port.is_set())) {
|
||||
// Non-SSL port is specified || neither SSL nor non-SSL ports are specified
|
||||
configs.emplace_back(listen_cfg{ socket_address{ip, cfg.native_transport_port()}, false });
|
||||
native_port_idx = 0;
|
||||
}
|
||||
if (cfg.native_shard_aware_transport_port.is_set()) {
|
||||
configs.push_back(listen_cfg{ socket_address{ip, cfg.native_shard_aware_transport_port()}, true });
|
||||
if (cfg.native_shard_aware_transport_port.is_set() ||
|
||||
(!cfg.native_shard_aware_transport_port_ssl.is_set() && !cfg.native_shard_aware_transport_port.is_set())) {
|
||||
configs.emplace_back(listen_cfg{ socket_address{ip, cfg.native_shard_aware_transport_port()}, true });
|
||||
native_shard_aware_port_idx = native_port_idx + 1;
|
||||
}
|
||||
|
||||
// main should have made sure values are clean and neatish
|
||||
@@ -127,15 +133,20 @@ future<> controller::do_start_server() {
|
||||
|
||||
logger.info("Enabling encrypted CQL connections between client and server");
|
||||
|
||||
if (cfg.native_transport_port_ssl.is_set() && cfg.native_transport_port_ssl() != cfg.native_transport_port()) {
|
||||
if (cfg.native_transport_port_ssl.is_set() &&
|
||||
(!cfg.native_transport_port.is_set() ||
|
||||
cfg.native_transport_port_ssl() != cfg.native_transport_port())) {
|
||||
// SSL port is specified && non-SSL port is either left out or set to a different value
|
||||
configs.emplace_back(listen_cfg{{ip, cfg.native_transport_port_ssl()}, false, cred});
|
||||
} else {
|
||||
configs[0].cred = cred;
|
||||
} else if (native_port_idx >= 0) {
|
||||
configs[native_port_idx].cred = cred;
|
||||
}
|
||||
if (cfg.native_shard_aware_transport_port_ssl.is_set() && cfg.native_shard_aware_transport_port_ssl() != cfg.native_shard_aware_transport_port()) {
|
||||
if (cfg.native_shard_aware_transport_port_ssl.is_set() &&
|
||||
(!cfg.native_shard_aware_transport_port.is_set() ||
|
||||
cfg.native_shard_aware_transport_port_ssl() != cfg.native_shard_aware_transport_port())) {
|
||||
configs.emplace_back(listen_cfg{{ip, cfg.native_shard_aware_transport_port_ssl()}, true, std::move(cred)});
|
||||
} else if (cfg.native_shard_aware_transport_port.is_set()) {
|
||||
configs[1].cred = std::move(cred);
|
||||
} else if (native_shard_aware_port_idx >= 0) {
|
||||
configs[native_shard_aware_port_idx].cred = std::move(cred);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -572,7 +572,17 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
|
||||
} catch (const exceptions::prepared_query_not_found_exception& ex) {
|
||||
try { ++_server._stats.errors[ex.code()]; } catch(...) {}
|
||||
return make_unprepared_error(stream, ex.code(), ex.what(), ex.id, trace_state);
|
||||
} catch (const exceptions::function_execution_exception& ex) {
|
||||
try { ++_server._stats.errors[ex.code()]; } catch(...) {}
|
||||
return make_function_failure_error(stream, ex.code(), ex.what(), ex.ks_name, ex.func_name, ex.args, trace_state);
|
||||
} catch (const exceptions::cassandra_exception& ex) {
|
||||
// Note: the CQL protocol specifies that many types of errors have
|
||||
// mandatory parameters. These cassandra_exception subclasses MUST
|
||||
// be handled above. This default "cassandra_exception" case is
|
||||
// only appropriate for the specific types of errors which do not have
|
||||
// additional information, such as invalid_request_exception.
|
||||
// TODO: consider listing those types explicitly, instead of the
|
||||
// catch-all type cassandra_exception.
|
||||
try { ++_server._stats.errors[ex.code()]; } catch(...) {}
|
||||
return make_error(stream, ex.code(), ex.what(), trace_state);
|
||||
} catch (std::exception& ex) {
|
||||
@@ -1334,6 +1344,17 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_unprepared_er
|
||||
return response;
|
||||
}
|
||||
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_function_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring func_name, std::vector<sstring> args, const tracing::trace_state_ptr& tr_state) const
|
||||
{
|
||||
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
|
||||
response->write_int(static_cast<int32_t>(err));
|
||||
response->write_string(msg);
|
||||
response->write_string(ks_name);
|
||||
response->write_string(func_name);
|
||||
response->write_string_list(args);
|
||||
return response;
|
||||
}
|
||||
|
||||
std::unique_ptr<cql_server::response> cql_server::connection::make_error(int16_t stream, exceptions::exception_code err, sstring msg, const tracing::trace_state_ptr& tr_state) const
|
||||
{
|
||||
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
|
||||
|
||||
@@ -235,6 +235,7 @@ private:
|
||||
std::unique_ptr<cql_server::response> make_mutation_write_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, db::write_type type, const tracing::trace_state_ptr& tr_state) const;
|
||||
std::unique_ptr<cql_server::response> make_already_exists_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring cf_name, const tracing::trace_state_ptr& tr_state) const;
|
||||
std::unique_ptr<cql_server::response> make_unprepared_error(int16_t stream, exceptions::exception_code err, sstring msg, bytes id, const tracing::trace_state_ptr& tr_state) const;
|
||||
std::unique_ptr<cql_server::response> make_function_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring func_name, std::vector<sstring> args, const tracing::trace_state_ptr& tr_state) const;
|
||||
std::unique_ptr<cql_server::response> make_error(int16_t stream, exceptions::exception_code err, sstring msg, const tracing::trace_state_ptr& tr_state) const;
|
||||
std::unique_ptr<cql_server::response> make_ready(int16_t stream, const tracing::trace_state_ptr& tr_state) const;
|
||||
std::unique_ptr<cql_server::response> make_supported(int16_t stream, const tracing::trace_state_ptr& tr_state) const;
|
||||
|
||||
@@ -263,6 +263,13 @@ decltype(auto) with_simplified(const View& v, Function&& fn)
|
||||
}
|
||||
}
|
||||
|
||||
template<FragmentedView View>
|
||||
void skip_empty_fragments(View& v) {
|
||||
while (!v.empty() && v.current_fragment().empty()) {
|
||||
v.remove_current();
|
||||
}
|
||||
}
|
||||
|
||||
template<FragmentedView V1, FragmentedView V2>
|
||||
int compare_unsigned(V1 v1, V2 v2) {
|
||||
while (!v1.empty() && !v2.empty()) {
|
||||
@@ -272,6 +279,8 @@ int compare_unsigned(V1 v1, V2 v2) {
|
||||
}
|
||||
v1.remove_prefix(n);
|
||||
v2.remove_prefix(n);
|
||||
skip_empty_fragments(v1);
|
||||
skip_empty_fragments(v2);
|
||||
}
|
||||
return v1.size_bytes() - v2.size_bytes();
|
||||
}
|
||||
@@ -286,5 +295,7 @@ void write_fragmented(Dest& dest, Src src) {
|
||||
memcpy(dest.current_fragment().data(), src.current_fragment().data(), n);
|
||||
dest.remove_prefix(n);
|
||||
src.remove_prefix(n);
|
||||
skip_empty_fragments(dest);
|
||||
skip_empty_fragments(src);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user