Compare commits

...

28 Commits

Author SHA1 Message Date
Botond Dénes
c37f5938fd mutation_writer: feed_writer(): handle exceptions from consume_end_of_stream()
Currently the exception-handling code of feed_writer() assumes
consume_end_of_stream() doesn't throw. This is false, and an exception
from that method can lead to an unclean destruction of the writer
and reader. Fix by handling exceptions from consume_end_of_stream()
as well.

Closes #10147

(cherry picked from commit 1963d1cc25)
2022-03-03 10:45:40 +01:00
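The fixed control flow can be sketched in Python (a hypothetical simplification; `feed_writer`'s real signature and the writer/reader types are Scylla internals):

```python
class ClosedTracker:
    """Hypothetical stand-in for the writer/reader; only records clean closure."""
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def feed_writer(fragments, consume, consume_end_of_stream, writer, reader):
    # The fix: exceptions from consume_end_of_stream() are handled on the
    # same path as exceptions from consume(), so the writer and reader are
    # always torn down cleanly instead of being destroyed mid-flight.
    try:
        for fragment in fragments:
            consume(fragment)
        consume_end_of_stream()  # this call can throw, too
    finally:
        writer.close()
        reader.close()
```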
Yaron Kaikov
fa90112787 release: prepare for 4.4.9 2022-02-16 14:24:54 +02:00
Nadav Har'El
f5895e5c04 docker: don't repeat "--alternator-address" option twice
If the Docker startup script is passed both "--alternator-port" and
"--alternator-https-port", a combination which is supposed to be
allowed, it passes the "--alternator-address" option to Scylla twice.
This isn't necessary and, worse, is not allowed.

So this patch fixes the scyllasetup.py script to only pass this
parameter once.

Fixes #10016.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20220202165814.1700047-1-nyh@scylladb.com>
(cherry picked from commit cb6630040d)
2022-02-03 18:40:12 +02:00
Avi Kivity
ce944911f2 Update seastar submodule (gratuitous exceptions on allocation failure)
* seastar 59eeadc720...1fb2187322 (1):
  > core: memory: Avoid current_backtrace() on alloc failure when logging suppressed

Fixes #9982.
2022-01-30 20:08:43 +02:00
Avi Kivity
b220130e4a Revert "Merge 'scylla_raid_setup: use mdmonitor only when RAID level > 0' from Takuya ASADA"
This reverts commit de4f5b3b1f. This branch
doesn't support RAID 5, so it breaks at runtime.

Ref #9540.
2022-01-30 11:00:21 +02:00
Avi Kivity
de4f5b3b1f Merge 'scylla_raid_setup: use mdmonitor only when RAID level > 0' from Takuya ASADA
We found that mdadm's monitor mode does not work on RAID0; this is
not a bug but expected behavior, according to a RHEL developer.
Therefore, we should stop enabling mdmonitor when RAID0 is specified.

Fixes #9540

----

This reverts 0d8f932 and introduces the correct fix.

Closes #9970

* github.com:scylladb/scylla:
  scylla_raid_setup: use mdmonitor only when RAID level > 0
  Revert "scylla_raid_setup: workaround for mdmonitor.service issue on CentOS8"

(cherry picked from commit df22396a34)
2022-01-27 10:27:45 +02:00
Avi Kivity
84a42570ec Update tools/java submodule (maxPendingPerConnection default)
* tools/java 14e635e5de...e8accfbf45 (2):
  > Fix NullPointerException in SettingsMode
  > cassandra-stress: Remove maxPendingPerConnection default

Ref #7748.
2022-01-12 21:38:48 +02:00
Nadav Har'El
001f57ec0c alternator: allow Authorization header to be without spaces
The "Authorization" HTTP header is used in DynamoDB API to sign
requests. Our parser for this header, in server::verify_signature(),
required the different components of this header to be separated by
a comma followed by a whitespace - but it turns out that in DynamoDB
both spaces and commas are optional - one of them is enough.

At least one DynamoDB client library - the old "boto" (which predated
boto3) - builds this header without spaces.

In this patch we add a test that shows that an Authorization header
with spaces removed works fine in DynamoDB but didn't work in
Alternator, and after this patch modifies the parsing code for this
header, the test begins to pass (and the other tests show that the
previously-working cases didn't break).

Fixes #9568

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20211101214114.35693-1-nyh@scylladb.com>
(cherry picked from commit 56eb994d8f)
2021-12-29 15:07:47 +02:00
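The relaxed separator rules can be sketched in Python (an illustrative re-implementation, not Alternator's actual parser in server::verify_signature()):

```python
import re


def parse_authorization(header: str) -> dict:
    """Parse the components of an AWS4-HMAC-SHA256 Authorization header,
    accepting entries separated by a comma, a space, or both."""
    algorithm, sep, rest = header.partition(' ')
    if not sep or algorithm != "AWS4-HMAC-SHA256":
        raise ValueError("Only AWS4-HMAC-SHA256 is supported")
    fields = {}
    # Either a comma or a space ends an entry; ", " produces an empty
    # entry between the two delimiters, which is simply skipped.
    for entry in re.split(r'[ ,]', rest):
        if not entry:
            continue
        key, eq, value = entry.partition('=')
        if eq:
            fields[key] = value
    return fields
```

Headers built with commas only (like old boto's) and headers with comma-plus-space separators parse identically.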
Nadav Har'El
3279718d52 alternator: return the correct Content-Type header
Although the DynamoDB API responses are JSON, additional conventions apply
to these responses - such as how error codes are encoded in JSON. For this
reason, DynamoDB uses the content type `application/x-amz-json-1.0` instead
of the standard `application/json` in its responses.

Until this patch, Scylla used `application/json` in its responses. This
unexpected content-type didn't bother any of the AWS libraries which we
tested, but it does bother the aiodynamo library (see HENNGE/aiodynamo#27).

Moreover, we should return the x-amz-json-1.0 content type for future
proofing: It turns out that AWS already defined x-amz-json-1.1 - see:
https://awslabs.github.io/smithy/1.0/spec/aws/aws-json-1_1-protocol.html
The 1.1 content type differs (only) in how it encodes error replies.
If one day DynamoDB starts to use this new reply format (it doesn't yet)
and DynamoDB libraries need to differentiate between the two
reply formats, Alternator had better return the right one.

This patch also includes a new test that the Content-Type header is
returned with the expected value. The test passes on DynamoDB, and
after this patch it starts to pass on Alternator as well.

Fixes #9554.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20211031094621.1193387-1-nyh@scylladb.com>
(cherry picked from commit 6ae0ea0c48)
2021-12-29 14:14:24 +02:00
Takuya ASADA
c128994f90 scylla_raid_setup: workaround for mdmonitor.service issue on CentOS8
On CentOS 8, mdmonitor.service does not work correctly with
mdadm-4.1-15.el8.x86_64 and later versions.
Until we find a solution, let's pin the package version to an older one
that does not cause the issue (4.1-14.el8.x86_64).

Fixes #9540

Closes #9782

(cherry picked from commit 0d8f932f0b)
2021-12-28 11:38:33 +02:00
Nadav Har'El
9af2e5ead1 Update Seastar module with additional backports
Backported an additional Seastar patch:

  > Merge 'metrics: Fix dtest->ulong conversion error' from Benny Halevy

Fixes #9794.
2021-12-14 13:06:02 +02:00
Avi Kivity
be695a7353 Revert "cql3: Reject updates with NULL key values"
This reverts commit 146f7b5421. It
causes a regression, and needs an additional fix. The bug is not
important enough to merit this complication.

Ref #9311.
2021-12-08 15:17:45 +02:00
Botond Dénes
cc9285697d mutation_reader: shard_reader: ensure referenced objects are kept alive
The shard reader can outlive its parent reader (the multishard reader).
This creates a problem for lifecycle management: readers take the range
and slice parameters by reference, and users must keep these alive for
as long as the reader is alive.
that any background read-ahead that it has to wait on will potentially
have stale references to the range and the slice. This was seen in the
wild recently when the evictable reader wrapped by the shard reader hit
a use-after-free while wrapping up a background read-ahead.
This problem was solved by fa43d76, but all previous versions are
susceptible to it.

This patch solves this problem by having the shard reader copy and keep
the range and slice parameters in stable storage, before passing them
further down.

Fixes: #9719

Tests: unit(dev)
Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <20211202113910.484591-1-bdenes@scylladb.com>
(cherry picked from commit 417e853b9b)
2021-12-06 15:25:57 +02:00
Nadav Har'El
21d140febc alternator: add missing BatchGetItem metric
Unfortunately, defining metrics in Scylla requires some code
duplication, with the metrics declared in one place but exported in a
different place in the code. When we duplicated this code in Alternator,
we accidentally dropped the first metric - for BatchGetItem. The metric
was accounted in the code, but not exported to Prometheus.

In addition to fixing the missing metric, this patch also adds a test
that confirms that the BatchGetItem metric increases when the
BatchGetItem operation is used. This test failed before this patch, and
passes with it. The test only currently tests this for BatchGetItem
(and BatchWriteItem) but it can be later expanded to cover all the other
operations as well.

Fixes #9406

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20210929121611.373074-1-nyh@scylladb.com>
(cherry picked from commit 5cbe9178fd)
2021-12-06 12:45:34 +02:00
Yaron Kaikov
77e05ca482 release: prepare for 4.4.8 2021-12-05 21:52:32 +02:00
Eliran Sinvani
5375b8f1a1 testlib: close index_reader to avoid race condition
In order to avoid the race condition introduced in 9dce1e4, the
index_reader should be closed prior to its destruction.
Only 4.4 and earlier releases are exposed to this specific race.
However, it is always a good idea to close the index reader before
destroying it, since developers who change the index reader in the
future will most likely assume it is closed first.

Ref #9704 (because 4.4 and earlier releases are vulnerable).

Signed-off-by: Eliran Sinvani <eliransin@scylladb.com>

Fixes #9704

(cherry picked from commit ddd7248b3b)

Closes #9717
2021-12-05 12:02:13 +01:00
Juliusz Stasiewicz
7a82432e38 transport: Fix abort on certain configurations of native_transport_port(_ssl)
The cause was an out-of-bounds access of the `configs` table. Also,
the native_transport_port options can no longer be disabled by setting
them to 0, as per the table below.

Rules for port/encryption (the same apply to shard_aware counterpart):

np  := native_transport_port.is_set()
nps := native_transport_port_ssl.is_set()
ceo := ceo.at("enabled") == "true"
eq  := native_transport_port_ssl() == native_transport_port()

+-----+-----+-----+-----+
|  np | nps | ceo |  eq |
+-----+-----+-----+-----+
|  0  |  0  |  0  |  *  |   =>   listen on native_transport_port, unencrypted
|  0  |  0  |  1  |  *  |   =>   listen on native_transport_port, encrypted
|  0  |  1  |  0  |  *  |   =>   nonsense, don't listen
|  0  |  1  |  1  |  *  |   =>   listen on native_transport_port_ssl, encrypted
|  1  |  0  |  0  |  *  |   =>   listen on native_transport_port, unencrypted
|  1  |  0  |  1  |  *  |   =>   listen on native_transport_port, encrypted
|  1  |  1  |  0  |  *  |   =>   listen on native_transport_port, unencrypted
|  1  |  1  |  1  |  0  |   =>   listen on native_transport_port, unencrypted + native_transport_port_ssl, encrypted
|  1  |  1  |  1  |  1  |   =>   native_transport_port(_ssl), encrypted
+-----+-----+-----+-----+

Fixes #7783
Fixes #7866

Closes #7992

(cherry picked from commit 29e4737a9b)
2021-11-29 17:37:31 +02:00
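The truth table above translates into straightforward branching; a Python sketch (the function and option names are illustrative, not the actual transport code):

```python
def choose_listeners(np, nps, ceo, eq):
    """Return a set of (port_option, encrypted) pairs per the rules above.

    np  -- native_transport_port is set
    nps -- native_transport_port_ssl is set
    ceo -- client encryption is enabled
    eq  -- both ports have the same value
    """
    if nps and not np:
        # Only the SSL port is set: listen on it iff encryption is enabled;
        # an SSL port without encryption is nonsense, so don't listen.
        return {("native_transport_port_ssl", True)} if ceo else set()
    if np and nps and ceo:
        if eq:
            return {("native_transport_port", True)}  # one encrypted listener
        return {("native_transport_port", False),
                ("native_transport_port_ssl", True)}
    # In every remaining case, listen on native_transport_port; it is
    # encrypted exactly when client encryption is enabled and the SSL
    # port is not separately set.
    return {("native_transport_port", ceo and not nps)}
```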
Dejan Mircevski
146f7b5421 cql3: Reject updates with NULL key values
We were silently ignoring INSERTs with NULL values for primary-key
columns, which Cassandra rejects.  Fix it by rejecting any
modification_statement that would operate on empty partition or
clustering range.

This is the most direct fix, because range and slice are calculated in
one place for all modification statements.  It covers not only NULL
cases, but also impossible restrictions like c>0 AND c<0.
Unfortunately, Cassandra doesn't treat all modification statements
consistently, so this fix cannot fully match its behavior.  We err on
the side of tolerance, accepting some DELETE statements that Cassandra
rejects.  We add a TODO for rejecting such DELETEs later.

Fixes #7852.

Tests: unit (dev), cql-pytest against Cassandra 4.0

Signed-off-by: Dejan Mircevski <dejan@scylladb.com>

Closes #9286

(cherry picked from commit 1fdaeca7d0)
2021-11-29 17:31:54 +02:00
Nadav Har'El
e1c7a906f0 cql: fix error return from execution of fromJson() and other functions
As reproduced in cql-pytest/test_json.py and reported in issue #7911,
failing fromJson() calls should return a FUNCTION_FAILURE error, but
currently produce a generic SERVER_ERROR, which can lead the client
to think the server experienced some unknown internal error and the
query can be retried on another server.

This patch adds a new cassandra_exception subclass that we were missing -
function_execution_exception - properly formats this error message (as
described in the CQL protocol documentation), and uses this exception
in two cases:

1. Parse errors in fromJson()'s parameters are converted into a
   function_execution_exception.

2. Any exception during the execute() of a native_scalar_function_for
   function is converted into a function_execution_exception.
   In particular, fromJson() uses a native_scalar_function_for.

   Note, however, that when a function has already taken care to produce
   a specific Cassandra error, that error is passed through and not
   converted to a function_execution_exception. An example is
   blobAsText(), which can return an invalid_request error, so
   it is left as such and not converted. This also happens in Cassandra.

All relevant tests in cql-pytest/test_json.py now pass, and are
no longer marked xfail. This patch also includes a few more improvements
to test_json.py.

Fixes #7911

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20210118140114.4149997-1-nyh@scylladb.com>
(cherry picked from commit 702b1b97bf)
2021-11-29 16:59:56 +02:00
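The pass-through/wrap pattern from this patch can be sketched as follows (the class names are stand-ins for Scylla's C++ exception hierarchy):

```python
class CassandraException(Exception):
    """Stand-in for cassandra_exception: an error already carrying a
    specific CQL error code."""


class FunctionExecutionException(CassandraException):
    """Stand-in for function_execution_exception (FUNCTION_FAILURE)."""
    def __init__(self, func_name, detail):
        super().__init__(f"execution of {func_name} failed: {detail}")


def execute_native_function(func_name, func, *args):
    try:
        return func(*args)
    except CassandraException:
        # The function already produced a specific Cassandra error
        # (e.g. invalid_request): pass it through untouched.
        raise
    except Exception as e:
        # Anything else becomes a FUNCTION_FAILURE instead of a
        # generic SERVER_ERROR.
        raise FunctionExecutionException(func_name, e)
```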
Piotr Jastrzebski
c5d6e75db8 sstables: Fix writing KA/LA sstables index
Before this patch when writing an index block, the sstables writer was
storing range tombstones that span the boundary of the block in order
of end bounds. This led to a range tombstone being ignored by a reader
if there was a row tombstone inside it.

This patch sorts the range tombstones based on start bound before
writing them to the index file.

The assumption is that writing an index block is rare, so we can afford
sorting the tombstones at that point. Additionally, this is a writer for
an old format; support for writing it will be dropped in the next major
release, so it should rarely be used already.

Kudos to Kamil Braun <kbraun@scylladb.com> for finding the reproducer.

Test: unit(dev)

Fixes #9690

Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
(cherry picked from commit scylladb/scylla-enterprise@eb093afd6f)
(cherry picked from commit ab425a11a8)
2021-11-28 11:09:39 +02:00
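The ordering change itself is small; a sketch with tombstones modeled as (start, end) pairs (a simplification of the real sstable structures):

```python
def order_spanning_tombstones(tombstones):
    """Order range tombstones that span an index-block boundary by their
    start bound, so a reader entering the block encounters a tombstone
    before any row it covers. The old, buggy order was by end bound,
    under which a row tombstone inside a range could hide it."""
    return sorted(tombstones, key=lambda t: t[0])
```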
Tomasz Grabiec
da630e80ea cql: Fix missing data in indexed queries with base table short reads
Indexed queries are using paging over the materialized view
table. Results of the view read are then used to issue reads of the
base table. If base table reads are short reads, the page is returned
to the user and paging state is adjusted accordingly so that when
paging is resumed it will query the view starting from the row
corresponding to the next row in the base which was not yet
returned. However, paging state's "remaining" count was not reset, so
if the view read was exhausted the reading will stop even though the
base table read was short.

Fix by restoring the "remaining" count when adjusting the paging state
on short read.

Tests:

  - index_with_paging_test
  - secondary_index_test

Fixes #9198
Message-Id: <20210818131840.1160267-1-tgrabiec@scylladb.com>

(cherry picked from commit 1e4da2dcce)
2021-11-23 11:22:30 +02:00
Takuya ASADA
8ea1cbe78d docker: add stopwaitsecs
We need stopwaitsecs=900, just like TimeoutStopSec=900 on
scylla-server.service, to avoid a timeout on scylla-server shutdown.

Fixes #9485

Closes #9545

(cherry picked from commit c9499230c3)
2021-11-15 13:36:48 +02:00
Asias He
03b04d40f2 gossip: Fix use-after-free in real_mark_alive and mark_dead
In commit 11a8912093 (gossiper:
get_gossip_status: return string_view and make noexcept)
get_gossip_status returns a string_view that references an endpoint_state
in endpoint_state_map.

After commit 425e3b1182 (gossip: Introduce
direct failure detector), gossiper::mark_dead and gossiper::real_mark_alive
can yield in the middle of the function. It is possible for the
endpoint_state to be removed meanwhile, causing a use-after-free when
it is accessed.

To fix, make a copy before we yield.

Fixes #8859

Closes #8862

(cherry picked from commit 7a32cab524)
2021-11-15 13:23:11 +02:00
Takuya ASADA
175d004513 scylla_util.py: On is_gce(), return False when it's on GKE
The GKE metadata server does not provide the same metadata as GCE, so we
should not return True from is_gce().
Instead, try to fetch machine-type from the metadata server and return
False if it answers 404 Not Found.

Fixes #9471

Signed-off-by: Takuya ASADA <syuu@scylladb.com>

Closes #9582

(cherry picked from commit 9b4cf8c532)
2021-11-15 13:18:13 +02:00
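A Python sketch of the check; `fetch_metadata` is a hypothetical seam standing in for the metadata-server request so the logic can be exercised without a network:

```python
import urllib.error


def is_gce(fetch_metadata):
    """Return True only on real GCE. GKE's metadata server answers 404
    for machine-type, so an HTTPError means we are not on GCE."""
    try:
        fetch_metadata("machine-type")
    except urllib.error.HTTPError:
        return False
    return True
```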
Asias He
091b794742 repair: Return HTTP 400 when repair id is not found
There are two APIs for checking the repair status and they behave
differently in case the id is not found.

```
{"host": "192.168.100.11:10001", "method": "GET", "uri":
"/storage_service/repair_async/system_auth?id=999", "duration": "1ms",
"status": 400, "bytes": 49, "dump": "HTTP/1.1 400 Bad
Request\r\nContent-Length: 49\r\nContent-Type: application/json\r\nDate:
Wed, 03 Nov 2021 10:49:33 GMT\r\nServer: Seastar
httpd\r\n\r\n{\"message\": \"unknown repair id 999\", \"code\": 400}"}

{"host": "192.168.100.11:10001", "method": "GET", "uri":
"/storage_service/repair_status?id=999&timeout=1", "duration": "0ms",
"status": 500, "bytes": 49, "dump": "HTTP/1.1 500 Internal Server
Error\r\nContent-Length: 49\r\nContent-Type: application/json\r\nDate:
Wed, 03 Nov 2021 10:49:33 GMT\r\nServer: Seastar
httpd\r\n\r\n{\"message\": \"unknown repair id 999\", \"code\": 500}"}
```

The correct status code is 400 as this is a parameter error and should
not be retried.

Returning status code 500 makes smarter HTTP clients retry the request
in the hope that the server recovers.

After this patch:

curl -X GET
'http://127.0.0.1:10000/storage_service/repair_async/system_auth?id=9999'
{"message": "unknown repair id 9999", "code": 400}

curl -X GET
'http://127.0.0.1:10000/storage_service/repair_status?id=9999'
{"message": "unknown repair id 9999", "code": 400}

Fixes #9576

Closes #9578

(cherry picked from commit f5f5714aa6)
2021-11-15 13:16:08 +02:00
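The status-code choice can be sketched as a tiny handler (illustrative only, not the Seastar httpd code):

```python
def repair_status_reply(known_ids, repair_id):
    """An unknown repair id is a caller (parameter) error, so answer 400
    (don't retry) rather than 500 (server fault, worth retrying)."""
    if repair_id not in known_ids:
        return 400, {"message": f"unknown repair id {repair_id}", "code": 400}
    return 200, {"repair_status": "ok"}  # illustrative success body
```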
Calle Wilund
8be87bb0b1 cdc: fix broken function signature in maybe_back_insert_iterator
Fixes #9103

The compare overload was declared as "bool" even though it is a
three-way comparison. This causes us to never use the speed-up shortcut
(lessening the search set), in turn meaning more overhead for collections.

Closes #9104

(cherry picked from commit 59555fa363)
2021-11-15 13:13:51 +02:00
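The effect of truncating a three-way comparison to bool is easy to demonstrate (a generic sketch, not the cdc code):

```python
def tri_compare(a, b):
    """Three-way comparison: negative, zero, or positive."""
    return (a > b) - (a < b)


# Declared-as-bool version: both "less" and "greater" collapse to True,
# so the sign needed for the ordering-based shortcut is lost.
def compare_as_bool(a, b):
    return bool(tri_compare(a, b))
```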
Takuya ASADA
a84142705a scylla_io_setup: handle nr_disks on GCP correctly
nr_disks is an int, so it must not be compared against a string.

Fixes #9429

Closes #9430

(cherry picked from commit 3b798afc1e)
2021-11-15 13:06:40 +02:00
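The bug is the classic int-vs-string comparison: in Python, an int never compares equal to the string of its digits, so every guard was silently false. A minimal sketch of the fixed dispatch (function name is illustrative):

```python
def classify_disks(nr_disks):
    """nr_disks is an int, so the branch guards must compare against
    ints, not digit strings."""
    if nr_disks == "16":     # the old, buggy guard: never true for an int
        return "string-branch"
    if nr_disks == 16:       # the fixed guard
        return "int-branch"
    return "other"
```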
Michał Chojnowski
fc32534aee utils: fragment_range: fix FragmentedView utils for views with empty fragments
The copying and comparing utilities for FragmentedView are not prepared to
deal with empty fragments in non-empty views, and will fall into an
infinite loop in such a case.
But the data arriving in a result_row_view can contain such fragments,
so we need to fix that.

Fixes #8398.

Closes #8397

(cherry picked from commit f23a47e365)
2021-11-15 12:57:21 +02:00
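The failure mode is a copy loop that advances by the number of bytes consumed: an empty fragment consumes zero bytes, so the loop never progresses. Advancing fragment-by-fragment avoids it, as this sketch shows (a simplification of the C++ FragmentedView utilities):

```python
def copy_fragmented(fragments):
    """Concatenate a fragmented view, tolerating empty fragments: advance
    one fragment per iteration rather than by the (possibly zero) number
    of bytes consumed, so an empty fragment cannot stall the loop."""
    out = bytearray()
    for fragment in fragments:
        out += fragment
    return bytes(out)
```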
34 changed files with 501 additions and 76 deletions


@@ -1,7 +1,7 @@
#!/bin/sh
PRODUCT=scylla
VERSION=4.4.7
VERSION=4.4.9
if test -f version
then


@@ -93,6 +93,10 @@ public:
[&] (const json::json_return_type& json_return_value) {
slogger.trace("api_handler success case");
if (json_return_value._body_writer) {
// Unfortunately, write_body() forces us to choose
// from a fixed and irrelevant list of "mime-types"
// at this point. But we'll override it with the
// one (application/x-amz-json-1.0) below.
rep->write_body("json", std::move(json_return_value._body_writer));
} else {
rep->_content += json_return_value._res;
@@ -105,14 +109,15 @@ public:
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
});
}), _type("json") { }
}) { }
api_handler(const api_handler&) = default;
future<std::unique_ptr<reply>> handle(const sstring& path,
std::unique_ptr<request> req, std::unique_ptr<reply> rep) override {
return _f_handle(std::move(req), std::move(rep)).then(
[this](std::unique_ptr<reply> rep) {
rep->done(_type);
rep->set_mime_type("application/x-amz-json-1.0");
rep->done();
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
});
}
@@ -126,7 +131,6 @@ protected:
}
future_handler_function _f_handle;
sstring _type;
};
class gated_handler : public handler_base {
@@ -192,24 +196,31 @@ future<> server::verify_signature(const request& req) {
throw api_error::missing_authentication_token("Authorization header is mandatory for signature verification");
}
std::string host = host_it->second;
std::vector<std::string_view> credentials_raw = split(authorization_it->second, ' ');
std::string_view authorization_header = authorization_it->second;
auto pos = authorization_header.find_first_of(' ');
if (pos == std::string_view::npos || authorization_header.substr(0, pos) != "AWS4-HMAC-SHA256") {
throw api_error::invalid_signature(format("Authorization header must use AWS4-HMAC-SHA256 algorithm: {}", authorization_header));
}
authorization_header.remove_prefix(pos+1);
std::string credential;
std::string user_signature;
std::string signed_headers_str;
std::vector<std::string_view> signed_headers;
for (std::string_view entry : credentials_raw) {
do {
// Either one of a comma or space can mark the end of an entry
pos = authorization_header.find_first_of(" ,");
std::string_view entry = authorization_header.substr(0, pos);
if (pos != std::string_view::npos) {
authorization_header.remove_prefix(pos + 1);
}
if (entry.empty()) {
continue;
}
std::vector<std::string_view> entry_split = split(entry, '=');
if (entry_split.size() != 2) {
if (entry != "AWS4-HMAC-SHA256") {
throw api_error::invalid_signature(format("Only AWS4-HMAC-SHA256 algorithm is supported. Found: {}", entry));
}
continue;
}
std::string_view auth_value = entry_split[1];
// Commas appear as an additional (quite redundant) delimiter
if (auth_value.back() == ',') {
auth_value.remove_suffix(1);
}
if (entry_split[0] == "Credential") {
credential = std::string(auth_value);
} else if (entry_split[0] == "Signature") {
@@ -219,7 +230,8 @@ future<> server::verify_signature(const request& req) {
signed_headers = split(auth_value, ';');
std::sort(signed_headers.begin(), signed_headers.end());
}
}
} while (pos != std::string_view::npos);
std::vector<std::string_view> credential_split = split(credential, '/');
if (credential_split.size() != 5) {
throw api_error::validation(format("Incorrect credential information format: {}", credential));


@@ -38,6 +38,7 @@ stats::stats() : api_operations{} {
#define OPERATION_LATENCY(name, CamelCaseName) \
seastar::metrics::make_histogram("op_latency", \
seastar::metrics::description("Latency histogram of an operation via Alternator API"), {op(CamelCaseName)}, [this]{return to_metrics_histogram(api_operations.name);}),
OPERATION(batch_get_item, "BatchGetItem")
OPERATION(batch_write_item, "BatchWriteItem")
OPERATION(create_backup, "CreateBackup")
OPERATION(create_global_table, "CreateGlobalTable")


@@ -225,7 +225,7 @@ void set_repair(http_context& ctx, routes& r, sharded<netw::messaging_service>&
try {
res = fut.get0();
} catch (std::exception& e) {
return make_exception_future<json::json_return_type>(httpd::server_error_exception(e.what()));
return make_exception_future<json::json_return_type>(httpd::bad_param_exception(e.what()));
}
return make_ready_future<json::json_return_type>(json::json_return_type(res));
});


@@ -709,16 +709,16 @@ private:
}
return false;
}
bool compare(const T&, const value_type& v);
int32_t compare(const T&, const value_type& v);
};
template<>
bool maybe_back_insert_iterator<std::vector<std::pair<bytes_view, bytes_view>>, bytes_view>::compare(const bytes_view& t, const value_type& v) {
int32_t maybe_back_insert_iterator<std::vector<std::pair<bytes_view, bytes_view>>, bytes_view>::compare(const bytes_view& t, const value_type& v) {
return _type.compare(t, v.first);
}
template<>
bool maybe_back_insert_iterator<std::vector<bytes_view>, bytes_view>::compare(const bytes_view& t, const value_type& v) {
int32_t maybe_back_insert_iterator<std::vector<bytes_view>, bytes_view>::compare(const bytes_view& t, const value_type& v) {
return _type.compare(t, v);
}


@@ -99,8 +99,8 @@ listen_address: localhost
# listen_on_broadcast_address: false
# port for the CQL native transport to listen for clients on
# For security reasons, you should not expose this port to the internet. Firewall it if needed.
# To disable the CQL native transport, set this option to 0.
# For security reasons, you should not expose this port to the internet. Firewall it if needed.
# To disable the CQL native transport, remove this option and configure native_transport_port_ssl.
native_transport_port: 9042
# Like native_transport_port, but clients are forwarded to specific shards, based on the


@@ -181,13 +181,18 @@ inline
shared_ptr<function>
make_from_json_function(database& db, const sstring& keyspace, data_type t) {
return make_native_scalar_function<true>("fromjson", t, {utf8_type},
[&db, &keyspace, t](cql_serialization_format sf, const std::vector<bytes_opt>& parameters) -> bytes_opt {
rjson::value json_value = rjson::parse(utf8_type->to_string(parameters[0].value()));
bytes_opt parsed_json_value;
if (!json_value.IsNull()) {
parsed_json_value.emplace(from_json_object(*t, json_value, sf));
[&db, keyspace, t](cql_serialization_format sf, const std::vector<bytes_opt>& parameters) -> bytes_opt {
try {
rjson::value json_value = rjson::parse(utf8_type->to_string(parameters[0].value()));
bytes_opt parsed_json_value;
if (!json_value.IsNull()) {
parsed_json_value.emplace(from_json_object(*t, json_value, sf));
}
return parsed_json_value;
} catch(rjson::error& e) {
throw exceptions::function_execution_exception("fromJson",
format("Failed parsing fromJson parameter: {}", e.what()), keyspace, {t->name()});
}
return parsed_json_value;
});
}


@@ -78,7 +78,22 @@ public:
return Pure;
}
virtual bytes_opt execute(cql_serialization_format sf, const std::vector<bytes_opt>& parameters) override {
return _func(sf, parameters);
try {
return _func(sf, parameters);
} catch(exceptions::cassandra_exception&) {
// If the function's code took the time to produce an official
// cassandra_exception, pass it through. Otherwise, below we will
// wrap the unknown exception in a function_execution_exception.
throw;
} catch(...) {
std::vector<sstring> args;
args.reserve(arg_types().size());
for (const data_type& a : arg_types()) {
args.push_back(a->name());
}
throw exceptions::function_execution_exception(name().name,
format("Failed execution of function {}: {}", name(), std::current_exception()), name().keyspace, std::move(args));
}
}
};


@@ -964,6 +964,7 @@ lw_shared_ptr<const service::pager::paging_state> indexed_table_select_statement
}
auto paging_state_copy = make_lw_shared<service::pager::paging_state>(service::pager::paging_state(*paging_state));
paging_state_copy->set_remaining(internal_paging_size);
paging_state_copy->set_partition_key(std::move(index_pk));
paging_state_copy->set_clustering_key(std::move(index_ck));
return std::move(paging_state_copy);


@@ -254,7 +254,7 @@ if __name__ == "__main__":
disk_properties["read_bandwidth"] = 2650 * mbs
disk_properties["write_iops"] = 360000
disk_properties["write_bandwidth"] = 1400 * mbs
elif nr_disks == "16":
elif nr_disks == 16:
disk_properties["read_iops"] = 1600000
disk_properties["read_bandwidth"] = 4521251328
#below is google, above is our measured
@@ -263,7 +263,7 @@ if __name__ == "__main__":
disk_properties["write_bandwidth"] = 2759452672
#below is google, above is our measured
#disk_properties["write_bandwidth"] = 3120 * mbs
elif nr_disks == "24":
elif nr_disks == 24:
disk_properties["read_iops"] = 2400000
disk_properties["read_bandwidth"] = 5921532416
#below is google, above is our measured


@@ -30,6 +30,8 @@ import distro
from pathlib import Path
from scylla_util import *
from subprocess import run
import distro
from pkg_resources import parse_version
if __name__ == '__main__':
if os.getuid() > 0:
@@ -115,6 +117,25 @@ if __name__ == '__main__':
pkg_install('xfsprogs')
if not shutil.which('mdadm'):
pkg_install('mdadm')
# XXX: Workaround for mdmonitor.service issue on CentOS8
if is_redhat_variant() and distro.version() == '8':
mdadm_rpm = run('rpm -q mdadm', shell=True, check=True, capture_output=True, encoding='utf-8').stdout.strip()
match = re.match(r'^mdadm-([0-9]+\.[0-9]+-[a-zA-Z0-9]+)\.', mdadm_rpm)
mdadm_version = match.group(1)
if parse_version('4.1-14') < parse_version(mdadm_version):
repo_data = '''
[BaseOS_8_3_2011]
name=CentOS8.3.2011 - Base
baseurl=http://vault.centos.org/8.3.2011/BaseOS/$basearch/os/
gpgcheck=1
enabled=0
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-centosofficial
'''[1:-1]
with open('/etc/yum.repos.d/CentOS-Vault-8.3.repo', 'w') as f:
f.write(repo_data)
run('dnf downgrade --enablerepo=BaseOS_8_3_2011 -y mdadm', shell=True, check=True)
run('dnf install -y python3-dnf-plugin-versionlock', shell=True, check=True)
run('dnf versionlock add mdadm', shell=True, check=True)
try:
md_service = systemd_unit('mdmonitor.service')
except SystemdException:


@@ -147,6 +147,11 @@ class gcp_instance:
if af == socket.AF_INET:
addr, port = sa
if addr == "169.254.169.254":
# Make sure it is not on GKE
try:
gcp_instance().__instance_metadata("machine-type")
except urllib.error.HTTPError:
return False
return True
return False


@@ -6,7 +6,7 @@ ENV container docker
# The SCYLLA_REPO_URL argument specifies the URL to the RPM repository this Docker image uses to install Scylla. The default value is the Scylla's unstable RPM repository, which contains the daily build.
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/branch-4.4/latest/scylla.repo
ARG VERSION=4.4.7
ARG VERSION=4.4.9
ADD scylla_bashrc /scylla_bashrc


@@ -4,3 +4,4 @@ stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
stopwaitsecs=900


@@ -121,12 +121,13 @@ class ScyllaSetup:
if self._apiAddress is not None:
args += ["--api-address %s" % self._apiAddress]
if self._alternatorPort is not None:
if self._alternatorAddress is not None:
args += ["--alternator-address %s" % self._alternatorAddress]
if self._alternatorPort is not None:
args += ["--alternator-port %s" % self._alternatorPort]
if self._alternatorHttpsPort is not None:
args += ["--alternator-address %s" % self._alternatorAddress]
args += ["--alternator-https-port %s" % self._alternatorHttpsPort]
if self._alternatorWriteIsolation is not None:


@@ -340,4 +340,18 @@ public:
unsupported_operation_exception(const sstring& msg) : std::runtime_error("unsupported operation: " + msg) {}
};
class function_execution_exception : public cassandra_exception {
public:
const sstring ks_name;
const sstring func_name;
const std::vector<sstring> args;
function_execution_exception(sstring func_name_, sstring detail, sstring ks_name_, std::vector<sstring> args_) noexcept
: cassandra_exception{exception_code::FUNCTION_FAILURE,
format("execution of {} failed: {}", func_name_, detail)}
, ks_name(std::move(ks_name_))
, func_name(std::move(func_name_))
, args(std::move(args_))
{ }
};
}


@@ -1445,7 +1445,7 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
logger.trace("marking as alive {}", addr);
// Do not mark a node with status shutdown as UP.
auto status = get_gossip_status(local_state);
auto status = sstring(get_gossip_status(local_state));
if (status == sstring(versioned_value::SHUTDOWN)) {
logger.warn("Skip marking node {} with status = {} as UP", addr, status);
return;
@@ -1464,6 +1464,8 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
return;
}
// Make a copy for endpoint_state because the code below can yield
endpoint_state state = local_state;
_live_endpoints.push_back(addr);
if (_endpoints_to_talk_with.empty()) {
_endpoints_to_talk_with.push_back({addr});
@@ -1475,8 +1477,8 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
logger.info("InetAddress {} is now UP, status = {}", addr, status);
}
_subscribers.for_each([addr, local_state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
subscriber->on_alive(addr, local_state);
_subscribers.for_each([addr, state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
subscriber->on_alive(addr, state);
logger.trace("Notified {}", fmt::ptr(subscriber.get()));
});
}
@@ -1485,11 +1487,12 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
void gossiper::mark_dead(inet_address addr, endpoint_state& local_state) {
logger.trace("marking as down {}", addr);
local_state.mark_dead();
endpoint_state state = local_state;
_live_endpoints.resize(std::distance(_live_endpoints.begin(), std::remove(_live_endpoints.begin(), _live_endpoints.end(), addr)));
_unreachable_endpoints[addr] = now();
logger.info("InetAddress {} is now DOWN, status = {}", addr, get_gossip_status(local_state));
_subscribers.for_each([addr, local_state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
subscriber->on_dead(addr, local_state);
logger.info("InetAddress {} is now DOWN, status = {}", addr, get_gossip_status(state));
_subscribers.for_each([addr, state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
subscriber->on_dead(addr, state);
logger.trace("Notified {}", fmt::ptr(subscriber.get()));
});
}
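The copy-before-yield fix above has a direct analogue in any cooperative-scheduling runtime: snapshot shared state before the first suspension point, so subscribers see a stable value even if the caller mutates the original while the notification is suspended. A minimal asyncio sketch (names invented for illustration):

```python
import asyncio

async def notify_subscribers(subscribers, addr, local_state):
    # Copy the state before any await: the awaits below can yield, and the
    # caller may mutate local_state in the meantime (the bug fixed above).
    state = dict(local_state)
    for subscriber in subscribers:
        await subscriber.on_alive(addr, state)

class Recorder:
    def __init__(self):
        self.seen = []
    async def on_alive(self, addr, state):
        await asyncio.sleep(0)          # a yield point, like the C++ code's
        self.seen.append(dict(state))

async def demo():
    recorder = Recorder()
    state = {"status": "NORMAL"}
    task = asyncio.create_task(notify_subscribers([recorder], "10.0.0.1", state))
    await asyncio.sleep(0)              # let the task run up to its first yield
    state["status"] = "shutdown"        # mutate the original while it's suspended
    await task
    return recorder.seen

seen = asyncio.run(demo())
```

Without the `dict(local_state)` copy the recorder would observe the later "shutdown" mutation instead of the snapshot taken at notification time.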

View File

@@ -1590,8 +1590,8 @@ class shard_reader : public enable_lw_shared_from_this<shard_reader>, public fla
private:
shared_ptr<reader_lifecycle_policy> _lifecycle_policy;
const unsigned _shard;
const dht::partition_range* _pr;
const query::partition_slice& _ps;
dht::partition_range _pr;
query::partition_slice _ps;
const io_priority_class& _pc;
tracing::global_trace_state_ptr _trace_state;
const mutation_reader::forwarding _fwd_mr;
@@ -1617,7 +1617,7 @@ public:
: impl(std::move(schema), std::move(permit))
, _lifecycle_policy(std::move(lifecycle_policy))
, _shard(shard)
, _pr(&pr)
, _pr(pr)
, _ps(ps)
, _pc(pc)
, _trace_state(std::move(trace_state))
@@ -1702,7 +1702,7 @@ future<> shard_reader::do_fill_buffer(db::timeout_clock::time_point timeout) {
});
auto s = gs.get();
auto rreader = make_foreign(std::make_unique<evictable_reader>(evictable_reader::auto_pause::yes, std::move(ms),
s, _lifecycle_policy->semaphore().make_permit(s.get(), "shard-reader"), *_pr, _ps, _pc, _trace_state, _fwd_mr));
s, _lifecycle_policy->semaphore().make_permit(s.get(), "shard-reader"), _pr, _ps, _pc, _trace_state, _fwd_mr));
tracing::trace(_trace_state, "Creating shard reader on shard: {}", this_shard_id());
auto f = rreader->fill_buffer(timeout);
return f.then([rreader = std::move(rreader)] () mutable {
@@ -1751,7 +1751,7 @@ void shard_reader::next_partition() {
}
future<> shard_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
_pr = &pr;
_pr = pr;
if (!_reader && !_read_ahead) {
// No need to fast-forward uncreated readers, they will be passed the new
@@ -1760,12 +1760,12 @@ future<> shard_reader::fast_forward_to(const dht::partition_range& pr, db::timeo
}
auto f = _read_ahead ? *std::exchange(_read_ahead, std::nullopt) : make_ready_future<>();
return f.then([this, &pr, timeout] {
return f.then([this, timeout] {
_end_of_stream = false;
clear_buffer();
return smp::submit_to(_shard, [this, &pr, timeout] {
return _reader->fast_forward_to(pr, timeout);
return smp::submit_to(_shard, [this, timeout] {
return _reader->fast_forward_to(_pr, timeout);
});
});
}

View File

@@ -57,6 +57,8 @@ future<> feed_writer(flat_mutation_reader&& rd, Writer&& wr) {
auto f2 = rd.is_buffer_empty() ? rd.fill_buffer(db::no_timeout) : make_ready_future<>();
return when_all_succeed(std::move(f1), std::move(f2)).discard_result();
});
}).then([&wr] {
wr.consume_end_of_stream();
}).then_wrapped([&wr] (future<> f) {
if (f.failed()) {
auto ex = f.get_exception();
@@ -70,7 +72,6 @@ future<> feed_writer(flat_mutation_reader&& rd, Writer&& wr) {
return make_exception_future<>(std::move(ex));
});
} else {
wr.consume_end_of_stream();
return wr.close();
}
});
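The patch moves `consume_end_of_stream()` into the guarded continuation chain, so an exception from it flows into the same handler that aborts and closes the writer. A rough Python equivalent of the fixed control flow, with a hypothetical writer used only to demonstrate it:

```python
class Writer:
    """Hypothetical writer, only to demonstrate the control flow."""
    def __init__(self, fail_at_eos=False):
        self.fail_at_eos = fail_at_eos
        self.aborted = self.closed = False
    def consume(self, fragment):
        pass
    def consume_end_of_stream(self):
        if self.fail_at_eos:
            raise RuntimeError("EOS handling failed")
    def abort(self):
        self.aborted = True
    def close(self):
        self.closed = True

def feed_writer(fragments, writer):
    # After the patch, consume_end_of_stream() runs inside the guarded
    # region, so an exception from it still aborts and closes the writer
    # instead of leaving it to be destroyed unclean.
    try:
        for fragment in fragments:
            writer.consume(fragment)
        writer.consume_end_of_stream()
    except Exception:
        writer.abort()
        writer.close()
        raise
    writer.close()
```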

View File

@@ -267,9 +267,14 @@ public:
return _current_tombstone;
}
const std::deque<range_tombstone>& range_tombstones_for_row(const clustering_key_prefix& ck) {
std::vector<range_tombstone> range_tombstones_for_row(const clustering_key_prefix& ck) {
drop_unneeded_tombstones(ck);
return _range_tombstones;
std::vector<range_tombstone> result(_range_tombstones.begin(), _range_tombstones.end());
auto cmp = [&] (const range_tombstone& rt1, const range_tombstone& rt2) {
return _cmp(rt1.start_bound(), rt2.start_bound());
};
std::sort(result.begin(), result.end(), cmp);
return result;
}
std::deque<range_tombstone> range_tombstones() && {

Submodule seastar updated: 4a58d76fea...1fb2187322

View File

@@ -129,7 +129,7 @@ void sstable_writer_k_l::maybe_flush_pi_block(file_writer& out,
// block includes them), but we set block_next_start_offset after - so
// even if we wrote a lot of open tombstones, we still get a full
// block size of new data.
auto& rts = _pi_write.tombstone_accumulator->range_tombstones_for_row(
auto rts = _pi_write.tombstone_accumulator->range_tombstones_for_row(
clustering_key_prefix::from_range(clustering_key.values()));
for (const auto& rt : rts) {
auto start = composite::from_clustering_element(*_pi_write.schemap, rt.start);

View File

@@ -85,3 +85,20 @@ def test_signature_too_futuristic(dynamodb, test_table):
response = requests.post(url, headers=headers, verify=False)
assert not response.ok
assert "InvalidSignatureException" in response.text and "Signature not yet current" in response.text
# A test that commas can be used instead of whitespace to separate components
# of the Authorization header - reproducing issue #9568.
def test_authorization_no_whitespace(dynamodb, test_table):
# Unlike the above tests, which checked error cases and so didn't need to
# calculate a real signature, in this test we really need a correct signature,
# so we use a function we already have in test_manual_requests.py.
from test_manual_requests import get_signed_request
payload = '{"TableName": "' + test_table.name + '", "Item": {"p": {"S": "x"}, "c": {"S": "x"}}}'
req = get_signed_request(dynamodb, 'PutItem', payload)
# Boto3 separates the components of the Authorization header by spaces.
# Let's remove all of them except the first one (which separates the
# signature algorithm name from the rest) and check the result still works:
a = req.headers['Authorization'].split()
req.headers['Authorization'] = a[0] + ' ' + ''.join(a[1:])
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
assert response.ok

View File

@@ -154,3 +154,25 @@ def test_incorrect_numbers(dynamodb, test_table):
req = get_signed_request(dynamodb, 'PutItem', payload)
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
assert "ValidationException" in response.text and "numeric" in response.text
# Although the DynamoDB API responses are JSON, additional conventions apply
# to these responses - such as how error codes are encoded in JSON. For this
# reason, DynamoDB uses the content type 'application/x-amz-json-1.0' instead
# of the standard 'application/json'. This test verifies that we return the
# correct content type header.
# While most DynamoDB libraries we tried do not care about an unexpected
# content-type, it turns out that one (aiodynamo) does. Moreover, AWS already
# defined x-amz-json-1.1 - see
# https://awslabs.github.io/smithy/1.0/spec/aws/aws-json-1_1-protocol.html
# which differs (only) in how it encodes error replies.
# So in the future it may become even more important that Scylla return the
# correct content type.
def test_content_type(dynamodb, test_table):
payload = '{"TableName": "' + test_table.name + '", "Item": {"p": {"S": "x"}, "c": {"S": "x"}}}'
# Note that get_signed_request() uses x-amz-json-1.0 to encode the
# *request*. In the future this may or may not affect the content type
# in the response (today, DynamoDB doesn't allow any other content type
# in the request anyway).
req = get_signed_request(dynamodb, 'PutItem', payload)
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
assert response.headers['Content-Type'] == 'application/x-amz-json-1.0'

View File

@@ -0,0 +1,113 @@
# Copyright 2021-present ScyllaDB
#
# This file is part of Scylla.
#
# Scylla is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Scylla is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Scylla. If not, see <http://www.gnu.org/licenses/>.
##############################################################################
# Tests for Scylla's metrics (see docs/design-notes/metrics.md) for Alternator
# queries. Reproduces issue #9406, where although metrics were implemented for
# Alternator requests, they were missing for some operations (BatchGetItem).
# In the tests here we attempt to ensure that the metrics continue to work
# for the relevant operations as the code evolves.
#
# Note that all tests in this file test Scylla-specific features, and are
# "skipped" when not running against Scylla, or when unable to retrieve
# metrics through out-of-band HTTP requests to Scylla's Prometheus port (9180).
#
# IMPORTANT: we do not want these tests to assume that they are not running in
# parallel with any other tests or workload - because such an assumption
# would limit our test deployment options in the future. NOT making this
# assumption means that these tests can't check that a certain operation
# increases a certain counter by exactly 1 - because other concurrent
# operations might increase it further! So our test can only check that the
# counter increases.
##############################################################################
import pytest
import requests
import re
from util import random_string
# Fixture for checking if we are able to test Scylla metrics. Scylla metrics
# are not available on AWS (of course), but may also not be available for
# Scylla if for some reason we have only access to the Alternator protocol
# port but no access to the metrics port (9180).
# If metrics are *not* available, tests using this fixture will be skipped.
# Tests using this fixture may call get_metrics(metrics).
@pytest.fixture(scope="module")
def metrics(dynamodb):
if dynamodb.meta.client._endpoint.host.endswith('.amazonaws.com'):
pytest.skip('Scylla-only feature not supported by AWS')
url = dynamodb.meta.client._endpoint.host
# The Prometheus API is on port 9180, and always http, not https.
url = re.sub(r':[0-9]+(/|$)', ':9180', url)
url = re.sub(r'^https:', 'http:', url)
url = url + '/metrics'
resp = requests.get(url)
if resp.status_code != 200:
pytest.skip('Metrics port 9180 is not available')
yield url
# Utility function for fetching all metrics from Scylla, using an HTTP request
# to port 9180. The response format is defined by the Prometheus protocol.
# Only use get_metrics() in a test using the metrics fixture.
def get_metrics(metrics):
response = requests.get(metrics)
assert response.status_code == 200
return response.text
# Utility function for fetching a metric with a given name and optionally a
# given sub-metric label (which should be a name-value map). If multiple
# matches are found, they are summed - this is useful for summing up the
# counts from multiple shards.
def get_metric(metrics, name, requested_labels=None):
total = 0.0
lines = re.compile('^'+name+'{.*$', re.MULTILINE)
for match in re.findall(lines, get_metrics(metrics)):
a = match.split()
metric = a[0]
val = float(a[1])
# Check if match also matches the requested labels
if requested_labels:
# we know metric begins with name{ and ends with } - the labels
# are what we have between those
got_labels = metric[len(name)+1:-1].split(',')
# Check that every one of the requested labels is in got_labels:
for k, v in requested_labels.items():
if not f'{k}="{v}"' in got_labels:
# No match for requested label, skip this metric (python
# doesn't have "continue 2" so let's just set val to 0...)
val = 0
break
total += float(val)
return total
def test_batch_write_item(test_table_s, metrics):
n1 = get_metric(metrics, 'scylla_alternator_operation', {'op': 'BatchWriteItem'})
test_table_s.meta.client.batch_write_item(RequestItems = {
test_table_s.name: [{'PutRequest': {'Item': {'p': random_string(), 'a': 'hi'}}}]})
n2 = get_metric(metrics, 'scylla_alternator_operation', {'op': 'BatchWriteItem'})
assert n2 > n1
# Reproduces issue #9406:
def test_batch_get_item(test_table_s, metrics):
n1 = get_metric(metrics, 'scylla_alternator_operation', {'op': 'BatchGetItem'})
test_table_s.meta.client.batch_get_item(RequestItems = {
test_table_s.name: {'Keys': [{'p': random_string()}], 'ConsistentRead': True}})
n2 = get_metric(metrics, 'scylla_alternator_operation', {'op': 'BatchGetItem'})
assert n2 > n1
# TODO: check the rest of the operations

View File

@@ -22,6 +22,8 @@
#include <seastar/testing/test_case.hh>
#include "test/lib/cql_test_env.hh"
#include "test/lib/cql_assertions.hh"
#include "cql3/untyped_result_set.hh"
#include "cql3/query_processor.hh"
#include "transport/messages/result_message.hh"
SEASTAR_TEST_CASE(test_index_with_paging) {
@@ -48,3 +50,51 @@ SEASTAR_TEST_CASE(test_index_with_paging) {
});
});
}
SEASTAR_TEST_CASE(test_index_with_paging_with_base_short_read) {
return do_with_cql_env_thread([] (auto& e) {
e.execute_cql("CREATE TABLE tab (pk int, ck text, v int, v2 int, v3 text, PRIMARY KEY (pk, ck))").get();
e.execute_cql("CREATE INDEX ON tab (v)").get();
// Enough to trigger a short read on the base table during scan
sstring big_string(2 * query::result_memory_limiter::maximum_result_size, 'j');
const int row_count = 67;
for (int i = 0; i < row_count; ++i) {
e.execute_cql(format("INSERT INTO tab (pk, ck, v, v2, v3) VALUES ({}, 'hello{}', 1, {}, '{}')", i % 3, i, i, big_string)).get();
}
eventually([&] {
uint64_t count = 0;
e.qp().local().query_internal("SELECT * FROM ks.tab WHERE v = 1", [&] (const cql3::untyped_result_set_row&) {
++count;
return make_ready_future<stop_iteration>(stop_iteration::no);
}).get();
BOOST_REQUIRE_EQUAL(count, row_count);
});
});
}
SEASTAR_TEST_CASE(test_index_with_paging_with_base_short_read_no_ck) {
return do_with_cql_env_thread([] (auto& e) {
e.execute_cql("CREATE TABLE tab (pk int, v int, v2 int, v3 text, PRIMARY KEY (pk))").get();
e.execute_cql("CREATE INDEX ON tab (v)").get();
// Enough to trigger a short read on the base table during scan
sstring big_string(2 * query::result_memory_limiter::maximum_result_size, 'j');
const int row_count = 67;
for (int i = 0; i < row_count; ++i) {
e.execute_cql(format("INSERT INTO tab (pk, v, v2, v3) VALUES ({}, 1, {}, '{}')", i, i, big_string)).get();
}
eventually([&] {
uint64_t count = 0;
e.qp().local().query_internal("SELECT * FROM ks.tab WHERE v = 1", [&] (const cql3::untyped_result_set_row&) {
++count;
return make_ready_future<stop_iteration>(stop_iteration::no);
}).get();
BOOST_REQUIRE_EQUAL(count, row_count);
});
});
}

View File

@@ -28,6 +28,8 @@
#include "sstables/sstables.hh"
#include "test/lib/mutation_source_test.hh"
#include "test/lib/sstable_utils.hh"
#include "test/lib/mutation_assertions.hh"
#include "partition_slice_builder.hh"
using namespace sstables;
using namespace std::chrono_literals;
@@ -62,3 +64,69 @@ SEASTAR_TEST_CASE(test_sstable_conforms_to_mutation_source) {
}
});
}
// Regression test for scylladb/scylla-enterprise#2016
SEASTAR_THREAD_TEST_CASE(test_produces_range_tombstone) {
auto s = schema_builder("ks", "cf")
.with_column("pk", int32_type, column_kind::partition_key)
.with_column("ck", int32_type, column_kind::clustering_key)
.with_column("v", int32_type, column_kind::regular_column)
.build();
mutation m(s, partition_key::from_single_value(*s, int32_type->decompose(0)));
m.partition().apply_row_tombstone(*s, range_tombstone{
clustering_key::from_exploded(*s, {int32_type->decompose(6)}), bound_kind::excl_start,
clustering_key::from_exploded(*s, {int32_type->decompose(10)}), bound_kind::incl_end,
tombstone(0, gc_clock::time_point())
});
{
auto ckey = clustering_key::from_exploded(*s, {int32_type->decompose(6)});
deletable_row& row = m.partition().clustered_row(*s, ckey, is_dummy::no, is_continuous(false));
row.marker() = row_marker(4);
}
{
auto ckey = clustering_key::from_exploded(*s, {int32_type->decompose(8)});
deletable_row& row = m.partition().clustered_row(*s, ckey, is_dummy::no, is_continuous(false));
row.apply(tombstone(2, gc_clock::time_point()));
row.marker() = row_marker(5);
}
testlog.info("m: {}", m);
auto slice = partition_slice_builder(*s)
.with_range(query::clustering_range::make(
{clustering_key::from_exploded(*s, {int32_type->decompose(8)}), false},
{clustering_key::from_exploded(*s, {int32_type->decompose(10)}), true}
))
.build();
auto pr = dht::partition_range::make_singular(m.decorated_key());
std::vector<tmpdir> dirs;
dirs.emplace_back();
sstables::test_env::do_with_async([&] (sstables::test_env& env) {
storage_service_for_tests ssft;
auto version = sstable_version_types::la;
auto index_block_size = 1;
sstable_writer_config cfg = env.manager().configure_writer();
cfg.promoted_index_block_size = index_block_size;
auto source = make_sstable_mutation_source(env, s, dirs.back().path().string(), {m}, cfg, version, gc_clock::now());
{
auto rd = source.make_reader(s, tests::make_permit(), pr, slice);
while (auto mf = rd(db::no_timeout).get0()) {
testlog.info("produced {}", mutation_fragment::printer(*s, *mf));
}
}
{
auto rd = source.make_reader(s, tests::make_permit(), pr, slice);
mutation_opt sliced_m = read_mutation_from_flat_mutation_reader(rd, db::no_timeout).get0();
BOOST_REQUIRE(bool(sliced_m));
assert_that(*sliced_m).is_equal_to(m, slice.row_ranges(*m.schema(), m.key()));
}
}).get();
}

View File

@@ -26,7 +26,7 @@
from util import unique_name, new_test_table
from cassandra.protocol import FunctionFailure
from cassandra.protocol import FunctionFailure, InvalidRequest
import pytest
import random
@@ -34,58 +34,62 @@ import random
@pytest.fixture(scope="session")
def table1(cql, test_keyspace):
table = test_keyspace + "." + unique_name()
cql.execute(f"CREATE TABLE {table} (p int PRIMARY KEY, v int, a ascii)")
cql.execute(f"CREATE TABLE {table} (p int PRIMARY KEY, v int, a ascii, b boolean)")
yield table
cql.execute("DROP TABLE " + table)
# Test that failed fromJson() parsing an invalid JSON results in the expected
# error - FunctionFailure - and not some weird internal error.
# Reproduces issue #7911.
@pytest.mark.xfail(reason="issue #7911")
def test_failed_json_parsing_unprepared(cql, table1):
p = random.randint(1,1000000000)
with pytest.raises(FunctionFailure):
cql.execute(f"INSERT INTO {table1} (p, v) VALUES ({p}, fromJson('dog'))")
@pytest.mark.xfail(reason="issue #7911")
def test_failed_json_parsing_prepared(cql, table1):
p = random.randint(1,1000000000)
stmt = cql.prepare(f"INSERT INTO {table1} (p, v) VALUES (?, fromJson(?))")
with pytest.raises(FunctionFailure):
cql.execute(stmt, [0, 'dog'])
cql.execute(stmt, [p, 'dog'])
# Similarly, if the JSON parsing did not fail, but yielded a type which is
# incompatible with the type we want it to yield, we should get a clean
# FunctionFailure, not some internal server error.
# We have here examples of returning a string where a number was expected,
# and returning a unicode string where ASCII was expected.
# and returning a unicode string where ASCII was expected, and returning
# a number of the wrong type.
# Reproduces issue #7911.
@pytest.mark.xfail(reason="issue #7911")
def test_fromjson_wrong_type_unprepared(cql, table1):
p = random.randint(1,1000000000)
with pytest.raises(FunctionFailure):
cql.execute(f"INSERT INTO {table1} (p, v) VALUES ({p}, fromJson('\"dog\"'))")
with pytest.raises(FunctionFailure):
cql.execute(f"INSERT INTO {table1} (p, a) VALUES ({p}, fromJson('3'))")
@pytest.mark.xfail(reason="issue #7911")
def test_fromjson_wrong_type_prepared(cql, table1):
p = random.randint(1,1000000000)
stmt = cql.prepare(f"INSERT INTO {table1} (p, v) VALUES (?, fromJson(?))")
with pytest.raises(FunctionFailure):
cql.execute(stmt, [0, '"dog"'])
cql.execute(stmt, [p, '"dog"'])
stmt = cql.prepare(f"INSERT INTO {table1} (p, a) VALUES (?, fromJson(?))")
with pytest.raises(FunctionFailure):
cql.execute(stmt, [0, '3'])
@pytest.mark.xfail(reason="issue #7911")
cql.execute(stmt, [p, '3'])
def test_fromjson_bad_ascii_unprepared(cql, table1):
p = random.randint(1,1000000000)
with pytest.raises(FunctionFailure):
cql.execute(f"INSERT INTO {table1} (p, a) VALUES ({p}, fromJson('\"שלום\"'))")
@pytest.mark.xfail(reason="issue #7911")
def test_fromjson_bad_ascii_prepared(cql, table1):
p = random.randint(1,1000000000)
stmt = cql.prepare(f"INSERT INTO {table1} (p, a) VALUES (?, fromJson(?))")
with pytest.raises(FunctionFailure):
cql.execute(stmt, [0, '"שלום"'])
cql.execute(stmt, [p, '"שלום"'])
def test_fromjson_nonint_unprepared(cql, table1):
p = random.randint(1,1000000000)
with pytest.raises(FunctionFailure):
cql.execute(f"INSERT INTO {table1} (p, v) VALUES ({p}, fromJson('1.2'))")
def test_fromjson_nonint_prepared(cql, table1):
p = random.randint(1,1000000000)
stmt = cql.prepare(f"INSERT INTO {table1} (p, v) VALUES (?, fromJson(?))")
with pytest.raises(FunctionFailure):
cql.execute(stmt, [p, '1.2'])
# The JSON standard does not define or limit the range or precision of
# numbers. However, if a number is assigned to a Scylla number type, the
@@ -105,7 +109,27 @@ def test_fromjson_int_overflow_prepared(cql, table1):
p = random.randint(1,1000000000)
stmt = cql.prepare(f"INSERT INTO {table1} (p, v) VALUES (?, fromJson(?))")
with pytest.raises(FunctionFailure):
cql.execute(stmt, [0, '2147483648'])
cql.execute(stmt, [p, '2147483648'])
# Cassandra allows the strings "true" and "false", not just the JSON constants
# true and false, to be assigned to a boolean column. However, very strangely,
# it only allows this for prepared statements, and *not* for unprepared
# statements - which result in an InvalidRequest!
# Reproduces #7915.
def test_fromjson_boolean_string_unprepared(cql, table1):
p = random.randint(1,1000000000)
with pytest.raises(InvalidRequest):
cql.execute(f"INSERT INTO {table1} (p, b) VALUES ({p}, '\"true\"')")
with pytest.raises(InvalidRequest):
cql.execute(f"INSERT INTO {table1} (p, b) VALUES ({p}, '\"false\"')")
@pytest.mark.xfail(reason="issue #7915")
def test_fromjson_boolean_string_prepared(cql, table1):
p = random.randint(1,1000000000)
stmt = cql.prepare(f"INSERT INTO {table1} (p, b) VALUES (?, fromJson(?))")
cql.execute(stmt, [p, '"true"'])
assert list(cql.execute(f"SELECT p, b from {table1} where p = {p}")) == [(p, True)]
cql.execute(stmt, [p, '"false"'])
assert list(cql.execute(f"SELECT p, b from {table1} where p = {p}")) == [(p, False)]
# Test that null argument is allowed for fromJson(), with unprepared statement
# Reproduces issue #7912.

View File

@@ -118,6 +118,8 @@ public:
return stop_iteration::no;
});
});
}).finally([&ir] () {
return ir->close();
});
}).then([l] {
return std::move(*l);

View File

@@ -97,12 +97,18 @@ future<> controller::do_start_server() {
};
std::vector<listen_cfg> configs;
int native_port_idx = -1, native_shard_aware_port_idx = -1;
if (cfg.native_transport_port() != 0) {
configs.push_back(listen_cfg{ socket_address{ip, cfg.native_transport_port()}, false });
if (cfg.native_transport_port.is_set() ||
(!cfg.native_transport_port_ssl.is_set() && !cfg.native_transport_port.is_set())) {
// Non-SSL port is specified || neither SSL nor non-SSL ports are specified
configs.emplace_back(listen_cfg{ socket_address{ip, cfg.native_transport_port()}, false });
native_port_idx = 0;
}
if (cfg.native_shard_aware_transport_port.is_set()) {
configs.push_back(listen_cfg{ socket_address{ip, cfg.native_shard_aware_transport_port()}, true });
if (cfg.native_shard_aware_transport_port.is_set() ||
(!cfg.native_shard_aware_transport_port_ssl.is_set() && !cfg.native_shard_aware_transport_port.is_set())) {
configs.emplace_back(listen_cfg{ socket_address{ip, cfg.native_shard_aware_transport_port()}, true });
native_shard_aware_port_idx = native_port_idx + 1;
}
// main should have made sure values are clean and neatish
@@ -127,15 +133,20 @@ future<> controller::do_start_server() {
logger.info("Enabling encrypted CQL connections between client and server");
if (cfg.native_transport_port_ssl.is_set() && cfg.native_transport_port_ssl() != cfg.native_transport_port()) {
if (cfg.native_transport_port_ssl.is_set() &&
(!cfg.native_transport_port.is_set() ||
cfg.native_transport_port_ssl() != cfg.native_transport_port())) {
// SSL port is specified && non-SSL port is either left out or set to a different value
configs.emplace_back(listen_cfg{{ip, cfg.native_transport_port_ssl()}, false, cred});
} else {
configs[0].cred = cred;
} else if (native_port_idx >= 0) {
configs[native_port_idx].cred = cred;
}
if (cfg.native_shard_aware_transport_port_ssl.is_set() && cfg.native_shard_aware_transport_port_ssl() != cfg.native_shard_aware_transport_port()) {
if (cfg.native_shard_aware_transport_port_ssl.is_set() &&
(!cfg.native_shard_aware_transport_port.is_set() ||
cfg.native_shard_aware_transport_port_ssl() != cfg.native_shard_aware_transport_port())) {
configs.emplace_back(listen_cfg{{ip, cfg.native_shard_aware_transport_port_ssl()}, true, std::move(cred)});
} else if (cfg.native_shard_aware_transport_port.is_set()) {
configs[1].cred = std::move(cred);
} else if (native_shard_aware_port_idx >= 0) {
configs[native_shard_aware_port_idx].cred = std::move(cred);
}
}
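Assuming client encryption is enabled, the patched selection logic reduces to a small decision table: create the plain listener only when the non-SSL port is set (or neither port is set), create a separate SSL listener only when the SSL port is set and distinct, and otherwise attach credentials to the plain listener only if that slot exists. A hedged Python sketch with invented flag names:

```python
def plan_listeners(port_set, ssl_port_set, ports_differ=True):
    """Which CQL listeners to create, assuming client encryption is enabled.
    port_set / ssl_port_set: whether native_transport_port / _ssl were set
    explicitly; ports_differ: whether the two configured ports differ."""
    listeners = []
    native_idx = -1
    # Non-SSL port is specified, or neither SSL nor non-SSL is specified.
    if port_set or (not ssl_port_set and not port_set):
        listeners.append('plain')
        native_idx = len(listeners) - 1
    # SSL port is specified and the non-SSL port is left out or different.
    if ssl_port_set and (not port_set or ports_differ):
        listeners.append('ssl')
    elif native_idx >= 0:
        # Attach TLS credentials to the plain listener only if it exists --
        # before the patch this indexed configs[0] unconditionally.
        listeners[native_idx] = 'plain+ssl'
    return listeners
```

The SSL-only configuration (`port_set=False, ssl_port_set=True`) is the case the old code mishandled by assuming `configs[0]` was always the plain listener.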

View File

@@ -572,7 +572,17 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
} catch (const exceptions::prepared_query_not_found_exception& ex) {
try { ++_server._stats.errors[ex.code()]; } catch(...) {}
return make_unprepared_error(stream, ex.code(), ex.what(), ex.id, trace_state);
} catch (const exceptions::function_execution_exception& ex) {
try { ++_server._stats.errors[ex.code()]; } catch(...) {}
return make_function_failure_error(stream, ex.code(), ex.what(), ex.ks_name, ex.func_name, ex.args, trace_state);
} catch (const exceptions::cassandra_exception& ex) {
// Note: the CQL protocol specifies that many types of errors have
// mandatory parameters. These cassandra_exception subclasses MUST
// be handled above. This default "cassandra_exception" case is
// only appropriate for the specific types of errors which do not have
// additional information, such as invalid_request_exception.
// TODO: consider listing those types explicitly, instead of the
// catch-all type cassandra_exception.
try { ++_server._stats.errors[ex.code()]; } catch(...) {}
return make_error(stream, ex.code(), ex.what(), trace_state);
} catch (std::exception& ex) {
@@ -1334,6 +1344,17 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_unprepared_er
return response;
}
std::unique_ptr<cql_server::response> cql_server::connection::make_function_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring func_name, std::vector<sstring> args, const tracing::trace_state_ptr& tr_state) const
{
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
response->write_int(static_cast<int32_t>(err));
response->write_string(msg);
response->write_string(ks_name);
response->write_string(func_name);
response->write_string_list(args);
return response;
}
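The new response writes the mandatory FUNCTION_FAILURE parameters in the order the CQL binary protocol defines for that error code: the int code and string message common to all errors, then the keyspace, function name, and argument-type string list. A standalone sketch of that wire layout (0x1400 is Function_failure in the v4 protocol spec):

```python
import struct

def encode_function_failure(msg, ks_name, func_name, args, code=0x1400):
    # [int code][string msg][string ks][string func][string list args],
    # where [string] is a big-endian short length followed by UTF-8 bytes
    # and [string list] is a short count followed by that many [string]s.
    def string(s):
        b = s.encode('utf-8')
        return struct.pack('!H', len(b)) + b
    body = struct.pack('!i', code) + string(msg)
    body += string(ks_name) + string(func_name)
    body += struct.pack('!H', len(args)) + b''.join(string(a) for a in args)
    return body
```

This mirrors the sequence of `write_int`, `write_string`, and `write_string_list` calls in `make_function_failure_error` above.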
std::unique_ptr<cql_server::response> cql_server::connection::make_error(int16_t stream, exceptions::exception_code err, sstring msg, const tracing::trace_state_ptr& tr_state) const
{
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);

View File

@@ -235,6 +235,7 @@ private:
std::unique_ptr<cql_server::response> make_mutation_write_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, db::write_type type, const tracing::trace_state_ptr& tr_state) const;
std::unique_ptr<cql_server::response> make_already_exists_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring cf_name, const tracing::trace_state_ptr& tr_state) const;
std::unique_ptr<cql_server::response> make_unprepared_error(int16_t stream, exceptions::exception_code err, sstring msg, bytes id, const tracing::trace_state_ptr& tr_state) const;
std::unique_ptr<cql_server::response> make_function_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring func_name, std::vector<sstring> args, const tracing::trace_state_ptr& tr_state) const;
std::unique_ptr<cql_server::response> make_error(int16_t stream, exceptions::exception_code err, sstring msg, const tracing::trace_state_ptr& tr_state) const;
std::unique_ptr<cql_server::response> make_ready(int16_t stream, const tracing::trace_state_ptr& tr_state) const;
std::unique_ptr<cql_server::response> make_supported(int16_t stream, const tracing::trace_state_ptr& tr_state) const;

View File

@@ -263,6 +263,13 @@ decltype(auto) with_simplified(const View& v, Function&& fn)
}
}
template<FragmentedView View>
void skip_empty_fragments(View& v) {
while (!v.empty() && v.current_fragment().empty()) {
v.remove_current();
}
}
template<FragmentedView V1, FragmentedView V2>
int compare_unsigned(V1 v1, V2 v2) {
while (!v1.empty() && !v2.empty()) {
@@ -272,6 +279,8 @@ int compare_unsigned(V1 v1, V2 v2) {
}
v1.remove_prefix(n);
v2.remove_prefix(n);
skip_empty_fragments(v1);
skip_empty_fragments(v2);
}
return v1.size_bytes() - v2.size_bytes();
}
@@ -286,5 +295,7 @@ void write_fragmented(Dest& dest, Src src) {
memcpy(dest.current_fragment().data(), src.current_fragment().data(), n);
dest.remove_prefix(n);
src.remove_prefix(n);
skip_empty_fragments(dest);
skip_empty_fragments(src);
}
}
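Without the skipping added above, a leading or newly emptied fragment makes the per-fragment step consume zero bytes and the loop spin forever. A Python sketch of the comparison with the same fix, modeling a fragmented view as a list of bytes objects:

```python
def skip_empty_fragments(frags):
    # Drop leading empty fragments so the loop below always makes progress.
    while frags and not frags[0]:
        frags.pop(0)

def compare_unsigned(v1, v2):
    """Lexicographically compare two fragmented byte strings.
    Without skip_empty_fragments, an empty fragment would make
    each iteration compare zero bytes and never advance."""
    v1, v2 = list(v1), list(v2)
    skip_empty_fragments(v1)
    skip_empty_fragments(v2)
    while v1 and v2:
        n = min(len(v1[0]), len(v2[0]))
        a, b = v1[0][:n], v2[0][:n]
        if a != b:
            return -1 if a < b else 1
        v1[0] = v1[0][n:]
        v2[0] = v2[0][n:]
        skip_empty_fragments(v1)   # the consumed fragment may now be empty
        skip_empty_fragments(v2)
    return sum(map(len, v1)) - sum(map(len, v2))
```

The same consideration applies to `write_fragmented` above: after a partial `memcpy`, either side's current fragment may be exhausted and must be skipped before the next round.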