Compare commits

...

47 Commits

Author SHA1 Message Date
Botond Dénes
c37f5938fd mutation_writer: feed_writer(): handle exceptions from consume_end_of_stream()
Currently the exception handling code of feed_writer() assumes
consume_end_of_stream() doesn't throw. This is false and an exception
from said method can currently lead to an unclean destroy of the writer
and reader. Fix by also handling exceptions from
consume_end_of_stream() too.

Closes #10147

(cherry picked from commit 1963d1cc25)
2022-03-03 10:45:40 +01:00
Yaron Kaikov
fa90112787 release: prepare for 4.4.9 2022-02-16 14:24:54 +02:00
Nadav Har'El
f5895e5c04 docker: don't repeat "--alternator-address" option twice
If the Docker startup script is passed both "--alternator-port" and
"--alternator-https-port", a combination which is supposed to be
allowed, it passes to Scylla the "--alternator-address" option twice.
This isn't necessary, and worse - not allowed.

So this patch fixes the scyllasetup.py script to only pass this
parameter once.

Fixes #10016.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20220202165814.1700047-1-nyh@scylladb.com>
(cherry picked from commit cb6630040d)
2022-02-03 18:40:12 +02:00
Avi Kivity
ce944911f2 Update seastar submodule (gratuitous exceptions on allocation failure)
* seastar 59eeadc720...1fb2187322 (1):
  > core: memory: Avoid current_backtrace() on alloc failure when logging suppressed

Fixes #9982.
2022-01-30 20:08:43 +02:00
Avi Kivity
b220130e4a Revert "Merge 'scylla_raid_setup: use mdmonitor only when RAID level > 0' from Takuya ASADA"
This reverts commit de4f5b3b1f. This branch
doesn't support RAID 5, so it breaks at runtime.

Ref #9540.
2022-01-30 11:00:21 +02:00
Avi Kivity
de4f5b3b1f Merge 'scylla_raid_setup: use mdmonitor only when RAID level > 0' from Takuya ASADA
We found that monitor mode of mdadm does not work on RAID0, and it is
not a bug, expected behavior according to RHEL developer.
Therefore, we should stop enabling mdmonitor when RAID0 is specified.

Fixes #9540

----

This reverts 0d8f932 and introduce correct fix.

Closes #9970

* github.com:scylladb/scylla:
  scylla_raid_setup: use mdmonitor only when RAID level > 0
  Revert "scylla_raid_setup: workaround for mdmonitor.service issue on CentOS8"

(cherry picked from commit df22396a34)
2022-01-27 10:27:45 +02:00
Avi Kivity
84a42570ec Update tools/java submodule (maxPendingPerConnection default)
* tools/java 14e635e5de...e8accfbf45 (2):
  > Fix NullPointerException in SettingsMode
  > cassandra-stress: Remove maxPendingPerConnection default

Ref #7748.
2022-01-12 21:38:48 +02:00
Nadav Har'El
001f57ec0c alternator: allow Authorization header to be without spaces
The "Authorization" HTTP header is used in DynamoDB API to sign
requests. Our parser for this header, in server::verify_signature(),
required the different components of this header to be separated by
a comma followed by a whitespace - but it turns out that in DynamoDB
both spaces and commas are optional - one of them is enough.

At least one DynamoDB client library - the old "boto" (which predated
boto3) - builds this header without spaces.

In this patch we add a test that shows that an Authorization header
with spaces removed works fine in DynamoDB but didn't work in
Alternator, and after this patch modifies the parsing code for this
header, the test begins to pass (and the other tests show that the
previously-working cases didn't break).

Fixes #9568

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20211101214114.35693-1-nyh@scylladb.com>
(cherry picked from commit 56eb994d8f)
2021-12-29 15:07:47 +02:00
Nadav Har'El
3279718d52 alternator: return the correct Content-Type header
Although the DynamoDB API responses are JSON, additional conventions apply
to these responses - such as how error codes are encoded in JSON. For this
reason, DynamoDB uses the content type `application/x-amz-json-1.0` instead
of the standard `application/json` in its responses.

Until this patch, Scylla used `application/json` in its responses. This
unexpected content-type didn't bother any of the AWS libraries which we
tested, but it does bother the aiodynamo library (see HENNGE/aiodynamo#27).

Moreover, we should return the x-amz-json-1.0 content type for future
proofing: It turns out that AWS already defined x-amz-json-1.1 - see:
https://awslabs.github.io/smithy/1.0/spec/aws/aws-json-1_1-protocol.html
The 1.1 content type differs (only) in how it encodes error replies.
If one day DynamoDB starts to use this new reply format (it doesn't yet)
and if DynamoDB libraries will need to differenciate between the two
reply formats, Alternator better return the right one.

This patch also includes a new test that the Content-Type header is
returned with the expected value. The test passes on DynamoDB, and
after this patch it starts to pass on Alternator as well.

Fixes #9554.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20211031094621.1193387-1-nyh@scylladb.com>
(cherry picked from commit 6ae0ea0c48)
2021-12-29 14:14:24 +02:00
Takuya ASADA
c128994f90 scylla_raid_setup: workaround for mdmonitor.service issue on CentOS8
On CentOS8, mdmonitor.service does not works correctly when using
mdadm-4.1-15.el8.x86_64 and later versions.
Until we find a solution, let's pinning the package version to older one
which does not cause the issue (4.1-14.el8.x86_64).

Fixes #9540

Closes #9782

(cherry picked from commit 0d8f932f0b)
2021-12-28 11:38:33 +02:00
Nadav Har'El
9af2e5ead1 Update Seastar module with additional backports
Backported an additional Seastar patch:

  > Merge 'metrics: Fix dtest->ulong conversion error' from Benny Halevy

Fixes #9794.
2021-12-14 13:06:02 +02:00
Avi Kivity
be695a7353 Revert "cql3: Reject updates with NULL key values"
This reverts commit 146f7b5421. It
causes a regression, and needs an additional fix. The bug is not
important enough to merit this complication.

Ref #9311.
2021-12-08 15:17:45 +02:00
Botond Dénes
cc9285697d mutation_reader: shard_reader: ensure referenced objects are kept alive
The shard reader can outlive its parent reader (the multishard reader).
This creates a problem for lifecycle management: readers take the range
and slice parameters by reference and users keep these alive until the
reader is alive. The shard reader outliving the top-level reader means
that any background read-ahead that it has to wait on will potentially
have stale references to the range and the slice. This was seen in the
wild recently when the evictable reader wrapped by the shard reader hit
a use-after-free while wrapping up a background read-ahead.
This problem was solved by fa43d76 but any previous versions are
susceptible to it.

This patch solves this problem by having the shard reader copy and keep
the range and slice parameters in stable storage, before passing them
further down.

Fixes: #9719

Tests: unit(dev)
Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <20211202113910.484591-1-bdenes@scylladb.com>
(cherry picked from commit 417e853b9b)
2021-12-06 15:25:57 +02:00
Nadav Har'El
21d140febc alternator: add missing BatchGetItem metric
Unfortunately, defining metrics in Scylla requires some code
duplication, with the metrics declared in one place but exported in a
different place in the code. When we duplicated this code in Alternator,
we accidentally dropped the first metric - for BatchGetItem. The metric
was accounted in the code, but not exported to Prometheus.

In addition to fixing the missing metric, this patch also adds a test
that confirms that the BatchGetItem metric increases when the
BatchGetItem operation is used. This test failed before this patch, and
passes with it. The test only currently tests this for BatchGetItem
(and BatchWriteItem) but it can be later expanded to cover all the other
operations as well.

Fixes #9406

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20210929121611.373074-1-nyh@scylladb.com>
(cherry picked from commit 5cbe9178fd)
2021-12-06 12:45:34 +02:00
Yaron Kaikov
77e05ca482 release: prepare for 4.4.8 2021-12-05 21:52:32 +02:00
Eliran Sinvani
5375b8f1a1 testlib: close index_reader to avoid racing condition
In order to avoid race condition introduced in 9dce1e4 the
index_reader should be closed prior to it's destruction.
This only exposes 4.4 and earlier releases to this specific race.
However, it is always a good idea to first close the index reader
and only then destroy it since it is most likely to be assumed by
all developers that will change the reader index in the future.

Ref #9704 (because on 4.4 and earlier releases are vulnerable).

Signed-off-by: Eliran Sinvani <eliransin@scylladb.com>

Fixes #9704

(cherry picked from commit ddd7248b3b)

Closes #9717
2021-12-05 12:02:13 +01:00
Juliusz Stasiewicz
7a82432e38 transport: Fix abort on certain configurations of native_transport_port(_ssl)
The reason was accessing the `configs` table out of index. Also,
native_transport_port-s can no longer be disabled by setting to 0,
as per the table below.

Rules for port/encryption (the same apply to shard_aware counterpart):

np  := native_transport_port.is_set()
nps := native_transport_port_ssl.is_set()
ceo := ceo.at("enabled") == "true"
eq  := native_transport_port_ssl() == native_transport_port()

+-----+-----+-----+-----+
|  np | nps | ceo |  eq |
+-----+-----+-----+-----+
|  0  |  0  |  0  |  *  |   =>   listen on native_transport_port, unencrypted
|  0  |  0  |  1  |  *  |   =>   listen on native_transport_port, encrypted
|  0  |  1  |  0  |  *  |   =>   nonsense, don't listen
|  0  |  1  |  1  |  *  |   =>   listen on native_transport_port_ssl, encrypted
|  1  |  0  |  0  |  *  |   =>   listen on native_transport_port, unencrypted
|  1  |  0  |  1  |  *  |   =>   listen on native_transport_port, encrypted
|  1  |  1  |  0  |  *  |   =>   listen on native_transport_port, unencrypted
|  1  |  1  |  1  |  0  |   =>   listen on native_transport_port, unencrypted + native_transport_port_ssl, encrypted
|  1  |  1  |  1  |  1  |   =>   native_transport_port(_ssl), encrypted
+-----+-----+-----+-----+

Fixes #7783
Fixes #7866

Closes #7992

(cherry picked from commit 29e4737a9b)
2021-11-29 17:37:31 +02:00
Dejan Mircevski
146f7b5421 cql3: Reject updates with NULL key values
We were silently ignoring INSERTs with NULL values for primary-key
columns, which Cassandra rejects.  Fix it by rejecting any
modification_statement that would operate on empty partition or
clustering range.

This is the most direct fix, because range and slice are calculated in
one place for all modification statements.  It covers not only NULL
cases, but also impossible restrictions like c>0 AND c<0.
Unfortunately, Cassandra doesn't treat all modification statements
consistently, so this fix cannot fully match its behavior.  We err on
the side of tolerance, accepting some DELETE statements that Cassandra
rejects.  We add a TODO for rejecting such DELETEs later.

Fixes #7852.

Tests: unit (dev), cql-pytest against Cassandra 4.0

Signed-off-by: Dejan Mircevski <dejan@scylladb.com>

Closes #9286

(cherry picked from commit 1fdaeca7d0)
2021-11-29 17:31:54 +02:00
Nadav Har'El
e1c7a906f0 cql: fix error return from execution of fromJson() and other functions
As reproduced in cql-pytest/test_json.py and reported in issue #7911,
failing fromJson() calls should return a FUNCTION_FAILURE error, but
currently produce a generic SERVER_ERROR, which can lead the client
to think the server experienced some unknown internal error and the
query can be retried on another server.

This patch adds a new cassandra_exception subclass that we were missing -
function_execution_exception - properly formats this error message (as
described in the CQL protocol documentation), and uses this exception
in two cases:

1. Parse errors in fromJson()'s parameters are converted into a
   function_execution_exception.

2. Any exceptions during the execute() of a native_scalar_function_for
   function is converted into a function_execution_exception.
   In particular, fromJson() uses a native_scalar_function_for.

   Note, however, that functions which already took care to produce
   a specific Cassandra error, this error is passed through and not
   converted to a function_execution_exception. An example is
   the blobAsText() which can return an invalid_request error, so
   it is left as such and not converted. This also happens in Cassandra.

All relevant tests in cql-pytest/test_json.py now pass, and are
no longer marked xfail. This patch also includes a few more improvements
to test_json.py.

Fixes #7911

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20210118140114.4149997-1-nyh@scylladb.com>
(cherry picked from commit 702b1b97bf)
2021-11-29 16:59:56 +02:00
Piotr Jastrzebski
c5d6e75db8 sstables: Fix writing KA/LA sstables index
Before this patch when writing an index block, the sstables writer was
storing range tombstones that span the boundary of the block in order
of end bounds. This led to a range tombstone being ignored by a reader
if there was a row tombstone inside it.

This patch sorts the range tombstones based on start bound before
writing them to the index file.

The assumption is that writing an index block is rare so we can afford
sortting the tombstones at that point. Additionally this is a writer of
an old format and writing to it will be dropped in the next major
release so it should be rarely used already.

Kudos to Kamil Braun <kbraun@scylladb.com> for finding the reproducer.

Test: unit(dev)

Fixes #9690

Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
(cherry picked from commit scylladb/scylla-enterprise@eb093afd6f)
(cherry picked from commit ab425a11a8)
2021-11-28 11:09:39 +02:00
Tomasz Grabiec
da630e80ea cql: Fix missing data in indexed queries with base table short reads
Indexed queries are using paging over the materialized view
table. Results of the view read are then used to issue reads of the
base table. If base table reads are short reads, the page is returned
to the user and paging state is adjusted accordingly so that when
paging is resumed it will query the view starting from the row
corresponding to the next row in the base which was not yet
returned. However, paging state's "remaining" count was not reset, so
if the view read was exhausted the reading will stop even though the
base table read was short.

Fix by restoring the "remaining" count when adjusting the paging state
on short read.

Tests:

  - index_with_paging_test
  - secondary_index_test

Fixes #9198
Message-Id: <20210818131840.1160267-1-tgrabiec@scylladb.com>

(cherry picked from commit 1e4da2dcce)
2021-11-23 11:22:30 +02:00
Takuya ASADA
8ea1cbe78d docker: add stopwaitsecs
We need stopwaitsecs just like we do TimeoutStpSec=900 on
scylla-server.service, to avoid timeout on scylla-server shutdown.

Fixes #9485

Closes #9545

(cherry picked from commit c9499230c3)
2021-11-15 13:36:48 +02:00
Asias He
03b04d40f2 gossip: Fix use-after-free in real_mark_alive and mark_dead
In commit 11a8912093 (gossiper:
get_gossip_status: return string_view and make noexcept)
get_gossip_status returns a pointer to an endpoint_state in
endpoint_state_map.

After commit 425e3b1182 (gossip: Introduce
direct failure detector), gossiper::mark_dead and gossiper::real_mark_alive
can yield in the middle of the function. It is possible that
endpoint_state can be removed, causing use-after-free to access it.

To fix, make a copy before we yield.

Fixes #8859

Closes #8862

(cherry picked from commit 7a32cab524)
2021-11-15 13:23:11 +02:00
Takuya ASADA
175d004513 scylla_util.py: On is_gce(), return False when it's on GKE
GKE metadata server does not provide same metadata as GCE, we should not
return True on is_gce().
So try to fetch machine-type from metadata server, return False if it
404 not found.

Fixes #9471

Signed-off-by: Takuya ASADA <syuu@scylladb.com>

Closes #9582

(cherry picked from commit 9b4cf8c532)
2021-11-15 13:18:13 +02:00
Asias He
091b794742 repair: Return HTTP 400 when repiar id is not found
There are two APIs for checking the repair status and they behave
differently in case the id is not found.

```
{"host": "192.168.100.11:10001", "method": "GET", "uri":
"/storage_service/repair_async/system_auth?id=999", "duration": "1ms",
"status": 400, "bytes": 49, "dump": "HTTP/1.1 400 Bad
Request\r\nContent-Length: 49\r\nContent-Type: application/json\r\nDate:
Wed, 03 Nov 2021 10:49:33 GMT\r\nServer: Seastar
httpd\r\n\r\n{\"message\": \"unknown repair id 999\", \"code\": 400}"}

{"host": "192.168.100.11:10001", "method": "GET", "uri":
"/storage_service/repair_status?id=999&timeout=1", "duration": "0ms",
"status": 500, "bytes": 49, "dump": "HTTP/1.1 500 Internal Server
Error\r\nContent-Length: 49\r\nContent-Type: application/json\r\nDate:
Wed, 03 Nov 2021 10:49:33 GMT\r\nServer: Seastar
httpd\r\n\r\n{\"message\": \"unknown repair id 999\", \"code\": 500}"}
```

The correct status code is 400 as this is a parameter error and should
not be retried.

Returning status code 500 makes smarter http clients retry the request
in hopes of server recovering.

After this patch:

curl -X PGET
'http://127.0.0.1:10000/storage_service/repair_async/system_auth?id=9999'
{"message": "unknown repair id 9999", "code": 400}

curl -X GET
'http://127.0.0.1:10000/storage_service/repair_status?id=9999'
{"message": "unknown repair id 9999", "code": 400}

Fixes #9576

Closes #9578

(cherry picked from commit f5f5714aa6)
2021-11-15 13:16:08 +02:00
Calle Wilund
8be87bb0b1 cdc: fix broken function signature in maybe_back_insert_iterator
Fixes #9103

compare overload was declared as "bool" even though it is a tri-cmp.
causes us to never use the speed-up shortcut (lessen search set),
in turn meaning more overhead for collections.

Closes #9104

(cherry picked from commit 59555fa363)
2021-11-15 13:13:51 +02:00
Takuya ASADA
a84142705a scylla_io_setup: handle nr_disks on GCP correctly
nr_disks is int, should not be string.

Fixes #9429

Closes #9430

(cherry picked from commit 3b798afc1e)
2021-11-15 13:06:40 +02:00
Michał Chojnowski
fc32534aee utils: fragment_range: fix FragmentedView utils for views with empty fragments
The copying and comparing utilities for FragmentedView are not prepared to
deal with empty fragments in non-empty views, and will fall into an infinite
loop in such case.
But data coming in result_row_view can contain such fragments, so we need to
fix that.

Fixes #8398.

Closes #8397

(cherry picked from commit f23a47e365)
2021-11-15 12:57:21 +02:00
Hagit Segev
4e526ad88a release: prepare for 4.4.7 2021-11-14 19:54:05 +02:00
Avi Kivity
176f253aa3 build: clobber user/group info from node_exporter tarball
node_exporter is packaged with some random uid/gid in the tarball.
When extracting it as an ordinary user this isn't a problem, since
the uid/gid are reset to the current user, but that doesn't happen
under dbuild since `tar` thinks the current user is root. This causes
a problem if one wants to delete the build directory later, since it
becomes owned by some random user (see /etc/subuid)

Reset the uid/gid infomation so this doesn't happen.

Closes #9579

Fixes #9610.

(cherry picked from commit e1817b536f)
2021-11-10 14:19:28 +02:00
Nadav Har'El
c49cd5d9b6 alternator: fix bug in ReturnValues=ALL_NEW
This patch fixes a bug in UpdateItem's ReturnValues=ALL_NEW, which in
some cases returned the OLD (pre-modification) value of some of the
attributes, instead of its NEW value.

The bug was caused by a confusion in our JSON utility function,
rjson::set(), which sounds like it can set any member of a map, but in
fact may only be used to add a *new* member - if a member with the same
name (key) already existed, the result is undefined (two values for the
same key). In ReturnValues=ALL_NEW we did exactly this: we started with
a copy of the original item, and then used set() to override some of the
members. This is not allowed.

So in this patch, we introduce a new function, rjson::replace(), which
does what we previously thought that rjson::set() does - i.e., replace a
member if it exists, or if not, add it. We call this function in
the ReturnValues=ALL_NEW code.

This patch also adds a test case that reproduces the incorrect ALL_NEW
results - and gets fixed by this patch.

In an upcoming patch, we should rename the confusingly-named set()
functions and audit all their uses. But we don't do this in this patch
yet. We just add some comments to clarify what set() does - but don't
change it, and just add one new function for replace().

Fixes #9542

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20211104134937.40797-1-nyh@scylladb.com>
(cherry picked from commit b95e431228)
2021-11-08 14:10:36 +02:00
Dejan Mircevski
5d4abb521b types: Unreverse tuple subtype for serialization
When a tuple value is serialized, we go through every element type and
use it to serialize element values.  But an element type can be
reversed, which is artificially different from the type of the value
being read.  This results in a server error due to the type mismatch.
Fix it by unreversing the element type prior to comparing it to the
value type.

Fixes #7902

Tests: unit (dev)

Signed-off-by: Dejan Mircevski <dejan@scylladb.com>

Closes #8316

(cherry picked from commit 318f773d81)
2021-11-07 19:25:43 +02:00
Asias He
cfc2562dec storage_service: Abort restore_replica_count when node is removed from the cluster
Consider the following procedure:

- n1, n2, n3
- n3 is down
- n1 runs nodetool removenode uuid_of_n3 to removenode from n3 the
  cluster
- n1 is down in the middle of removenode operation

Node n1 will set n3 to removing gossip status during removenode
operation. Whenever existing nodes learn a node is in removing gossip
status, they will call restore_replica_count to stream data from other
nodes for the ranges n3 loses if n3 was removed from the cluster. If
the streaming fails, the streaming will sleep and retry. The current
max number of retry attempts is 5. The sleep interval starts at 60
seconds and increases 1.5 times per sleep.

This can leave the cluster in a bad state. For example, nodes can go
out of disk space if the streaming continues.  We need a way to abort
such streaming attempts.

To abort the removenode operation and forcely remove the node, users
can run `nodetool removenode force` on any existing nodes to move the
node from removing gossip status to removed gossip status. However,
the restore_replica_count will not be aborted.

In this patch, a status checker is added in restore_replica_count, so
that once a node is in removed gossip status, restore_replica_count
will be aborted.

This patch is for older releases without the new NODE_OPS_CMD
infrastructure where such abort will happen automatically in case of
error.

Fixes #8651

Closes #8655

(cherry picked from commit 0858619cba)
2021-11-02 17:26:35 +02:00
Benny Halevy
4a1171e2fa large_data_handle: add sstable name to log messages
Although the sstable name is part of the system.large_* records,
it is not printed in the log.
In particular, this is essential for the "too many rows" warning
that currently does not record a row in any large_* table
so we can't correlate it with a sstable.

Fixes #9524

Test: unit(dev)
DTest: wide_rows_test.py

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20211027074104.1753093-1-bhalevy@scylladb.com>
(cherry picked from commit a21b1fbb2f)
2021-10-29 10:48:35 +03:00
Asias He
542a508c50 repair: Handle everywhere_topology in bootstrap_with_repair
The everywhere_topology returns the number of nodes in the cluster as
RF. This makes only streaming from the node losing the range impossible
since no node is losing the range after bootstrap.

Shortcut to stream from all nodes in local dc in case the keyspace is
everywhere_topology.

Fixes #8503

(cherry picked from commit 3c36517598)
2021-10-28 18:56:23 +03:00
Hagit Segev
dd018d4de4 release: prepare for 4.4.6 2021-10-28 18:00:13 +03:00
Benny Halevy
70098a1991 date_tiered_manifest: get_now: fix use after free of sstable_list
The sstable_list is destroyed right after the temporary
lw_shared_ptr<sstable_list> returned from `cf.get_sstables()`
is dereferenced.

Fixes #9138

Test: unit(dev)
DTest: resharding_test.py:ReshardingTombstones_with_DateTieredCompactionStrategy.disable_tombstone_removal_during_reshard_test (debug)

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20210804075813.42526-1-bhalevy@scylladb.com>
(cherry picked from commit 3ad0067272)
2021-10-28 11:24:03 +03:00
Jan Ciolek
008f2ff370 cql3: Fix need_filtering on indexed table
There were cases where a query on an indexed table
needed filtering but need_filtering returned false.

This is fixed by using new conditions in cases where
we are using an index.

Fixes #8991.
Fixes #7708.

For now this is an overly conservative implementation
that returns true in some cases where filtering
is not needed.

Signed-off-by: Jan Ciolek <jan.ciolek@scylladb.com>
(cherry picked from commit 54149242b4)
2021-10-28 11:24:03 +03:00
Benny Halevy
f71cdede5e bytes_ostream: max_chunk_size: account for chunk header
Currently, if the data_size is greater than
max_chunk_size - sizeof(chunk), we end up
allocating up to max_chunk_size + sizeof(chunk) bytes,
exceeding buf.max_chunk_size().

This may lead to allocation failures, as seen in
https://github.com/scylladb/scylla/issues/7950,
where we couldn't allocate 131088 (= 128K + 16) bytes.

This change adjusted the expose max_chunk_size()
to be max_alloc_size (128KB) - sizeof(chunk)
so that the allocated chunks would normally be allocated
in 128KB chunks in the write() path.

Added a unit test - test_large_placeholder that
stresses the chunk allocation path from the
write_place_holder(size) entry point to make
sure it handles large chunk allocations correctly.

Refs #7950
Refs #8081

Test: unit(release), bytes_ostream_test(debug)
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20210303143413.902968-1-bhalevy@scylladb.com>
(cherry picked from commit ff5b42a0fa)
2021-10-28 11:24:03 +03:00
Botond Dénes
0fd17af2ee evictable_reader: reset _range_override after fast-forwarding
`_range_override` is used to store the modified range the reader reads
after it has to be recreated (when recreating a reader it's read range
is reduced to account for partitions it already read). When engaged,
this field overrides the `_pr` field as the definitive range the reader
is supposed to be currently reading. Fast forwarding conceptually
overrides the range the reader is currently reading, however currently
it doesn't reset the `_range_override` field. This resulted in
`_range_override` (containing the modified pre-fast-forward range)
incorrectly overriding the fast-forwarded-to range in `_pr` when
validating the first partition produced by the just recreated reader,
resulting in a false-positive validation failure.

Fixes: #8059

Tests: unit(release)
Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <20210217164744.420100-1-bdenes@scylladb.com>
[avi: add #include]
(cherry picked from commit c3b4c3f451)
2021-10-28 11:12:01 +03:00
Benny Halevy
77cb6596c4 utils: phased_barrier: advance_and_await: make noexcept
As a function returning a future, simplify
its interface by handling any exceptions and
returning an exceptional future instead of
propagating the exception.

In this specific case, throwing from advance_and_await()
will propagate through table::await_pending_* calls
short-circuiting a .finally clause in table::stop().

Also, mark as noexcept methods of class table calling
advance_and_await and table::await_pending_ops that depends on them.

Fixes #8636

A followup patch will convert advance_and_await to a coroutine.
This is done separately to facilitate backporting of this patch.

Test: unit(dev)
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20210511161407.218402-1-bhalevy@scylladb.com>
(cherry picked from commit c0dafa75d9)
2021-10-13 12:26:12 +03:00
Avi Kivity
c81c7d2d89 Merge 'rjson: Add throwing allocator' from Piotr Sarna
This series adds a wrapper for the default rjson allocator which throws on allocation/reallocation failures. It's done to work around several rapidjson (the underlying JSON parsing library) bugs - in a few cases, malloc/realloc return value is not checked, which results in dereferencing a null pointer (or an arbitrary pointer computed as 0 + `size`, with the `size` parameter being provided by the user). The new allocator will throw an `rjson:error` if it fails to allocate or reallocate memory.
This series comes with unit tests which checks the new allocator behavior and also validates that an internal rapidjson structure which we indirectly rely upon (Stack) is not left in invalid state after throwing. The last part is verified by the fact that its destructor ran without errors.

Fixes #8521
Refs #8515

Tests:
 * unit(release)
 * YCSB: inserting data similar to the one mentioned in #8515 - 1.5MB objects clustered in partitions 30k objects in size - nothing crashed during various YCSB workloads, but nothing also crashed for me locally before this patch, so it's not 100% robust
 relevant YCSB workload config for using 1.5MB objects:
```yaml
fieldcount=150
fieldlength=10000
```

Closes #8529

* github.com:scylladb/scylla:
  test: add a test for rjson allocation
  test: rename alternator_base64_test to alternator_unit_test
  rjson: add a throwing allocator

(cherry picked from commit c36549b22e)
2021-10-12 13:57:15 +03:00
Benny Halevy
b3a762f179 streaming: stream_session: do not escape curly braces in format strings
Those turn into '{}' in the formatted strings and trigger
a logger error in the following sstlog.warn(err.c_str())
call.

Fixes #8436

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20210408173048.124417-1-bhalevy@scylladb.com>
(cherry picked from commit 76cd315c42)
2021-10-12 13:49:24 +03:00
Calle Wilund
2bba07bdf4 table: ensure memtable is actually in memtable list before erasing
Fixes #8749

if a table::clear() was issued while we were flushing a memtable,
the memtable is already gone from list. We need to check this before
erase. Otherwise we get random memory corruption via
std::vector::erase

v2:
* Make interface more set-like (tolerate non-existance in erase).

Closes #8904

(cherry picked from commit 373fa3fa07)
2021-10-12 13:47:33 +03:00
Benny Halevy
87bfb57ccf utils: merge_to_gently: prevent stall in std::copy_if
std::copy_if runs without yielding.

See https://github.com/scylladb/scylla/issues/8897#issuecomment-867522480

Note that the standard states that no iterators or references are invalidated
on insert so we can keep inserting before last1 when merging the
remainder of list2 at the tail of list1.

Fixes #8897

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
(cherry picked from commit 453e7c8795)
2021-10-12 13:05:58 +03:00
Michael Livshin
6ca8590540 avoid race between compaction and table stop
Also add a debug-only compaction-manager-side assertion that tests
that no new compaction tasks were submitted for a table that is being
removed (debug-only because not constant-time).

Fixes #9448.

Signed-off-by: Michael Livshin <michael.livshin@scylladb.com>
Message-Id: <20211007110416.159110-1-michael.livshin@scylladb.com>
(cherry picked from commit e88891a8af)
2021-10-12 12:51:44 +03:00
Takuya ASADA
da57d6c7cd scylla_cpuscaling_setup: add --force option
To building Ubuntu AMI with CPU scaling configuration, we need force
running mode for scylla_cpuscaling_setup, which run setup without
checking scaling_governor support.

See scylladb/scylla-machine-image#204

Closes #9326

(cherry picked from commit f928dced0c)
2021-10-05 16:20:22 +03:00
58 changed files with 900 additions and 136 deletions

View File

@@ -1,7 +1,7 @@
#!/bin/sh
PRODUCT=scylla
VERSION=4.4.5
VERSION=4.4.9
if test -f version
then

View File

@@ -2509,7 +2509,7 @@ update_item_operation::apply(std::unique_ptr<rjson::value> previous_item, api::t
const attribute_path_map_node<parsed::update_expression::action>* h = nullptr) {
any_updates = true;
if (_returnvalues == returnvalues::ALL_NEW) {
rjson::set_with_string_name(_return_attributes,
rjson::replace_with_string_name(_return_attributes,
to_sstring_view(column_name), rjson::copy(json_value));
} else if (_returnvalues == returnvalues::UPDATED_NEW) {
rjson::value&& v = rjson::copy(json_value);

View File

@@ -93,6 +93,10 @@ public:
[&] (const json::json_return_type& json_return_value) {
slogger.trace("api_handler success case");
if (json_return_value._body_writer) {
// Unfortunately, write_body() forces us to choose
// from a fixed and irrelevant list of "mime-types"
// at this point. But we'll override it with the
// one (application/x-amz-json-1.0) below.
rep->write_body("json", std::move(json_return_value._body_writer));
} else {
rep->_content += json_return_value._res;
@@ -105,14 +109,15 @@ public:
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
});
}), _type("json") { }
}) { }
api_handler(const api_handler&) = default;
future<std::unique_ptr<reply>> handle(const sstring& path,
std::unique_ptr<request> req, std::unique_ptr<reply> rep) override {
return _f_handle(std::move(req), std::move(rep)).then(
[this](std::unique_ptr<reply> rep) {
rep->done(_type);
rep->set_mime_type("application/x-amz-json-1.0");
rep->done();
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
});
}
@@ -126,7 +131,6 @@ protected:
}
future_handler_function _f_handle;
sstring _type;
};
class gated_handler : public handler_base {
@@ -192,24 +196,31 @@ future<> server::verify_signature(const request& req) {
throw api_error::missing_authentication_token("Authorization header is mandatory for signature verification");
}
std::string host = host_it->second;
std::vector<std::string_view> credentials_raw = split(authorization_it->second, ' ');
std::string_view authorization_header = authorization_it->second;
auto pos = authorization_header.find_first_of(' ');
if (pos == std::string_view::npos || authorization_header.substr(0, pos) != "AWS4-HMAC-SHA256") {
throw api_error::invalid_signature(format("Authorization header must use AWS4-HMAC-SHA256 algorithm: {}", authorization_header));
}
authorization_header.remove_prefix(pos+1);
std::string credential;
std::string user_signature;
std::string signed_headers_str;
std::vector<std::string_view> signed_headers;
for (std::string_view entry : credentials_raw) {
do {
// Either one of a comma or space can mark the end of an entry
pos = authorization_header.find_first_of(" ,");
std::string_view entry = authorization_header.substr(0, pos);
if (pos != std::string_view::npos) {
authorization_header.remove_prefix(pos + 1);
}
if (entry.empty()) {
continue;
}
std::vector<std::string_view> entry_split = split(entry, '=');
if (entry_split.size() != 2) {
if (entry != "AWS4-HMAC-SHA256") {
throw api_error::invalid_signature(format("Only AWS4-HMAC-SHA256 algorithm is supported. Found: {}", entry));
}
continue;
}
std::string_view auth_value = entry_split[1];
// Commas appear as an additional (quite redundant) delimiter
if (auth_value.back() == ',') {
auth_value.remove_suffix(1);
}
if (entry_split[0] == "Credential") {
credential = std::string(auth_value);
} else if (entry_split[0] == "Signature") {
@@ -219,7 +230,8 @@ future<> server::verify_signature(const request& req) {
signed_headers = split(auth_value, ';');
std::sort(signed_headers.begin(), signed_headers.end());
}
}
} while (pos != std::string_view::npos);
std::vector<std::string_view> credential_split = split(credential, '/');
if (credential_split.size() != 5) {
throw api_error::validation(format("Incorrect credential information format: {}", credential));

View File

@@ -38,6 +38,7 @@ stats::stats() : api_operations{} {
#define OPERATION_LATENCY(name, CamelCaseName) \
seastar::metrics::make_histogram("op_latency", \
seastar::metrics::description("Latency histogram of an operation via Alternator API"), {op(CamelCaseName)}, [this]{return to_metrics_histogram(api_operations.name);}),
OPERATION(batch_get_item, "BatchGetItem")
OPERATION(batch_write_item, "BatchWriteItem")
OPERATION(create_backup, "CreateBackup")
OPERATION(create_global_table, "CreateGlobalTable")

View File

@@ -225,7 +225,7 @@ void set_repair(http_context& ctx, routes& r, sharded<netw::messaging_service>&
try {
res = fut.get0();
} catch (std::exception& e) {
return make_exception_future<json::json_return_type>(httpd::server_error_exception(e.what()));
return make_exception_future<json::json_return_type>(httpd::bad_param_exception(e.what()));
}
return make_ready_future<json::json_return_type>(json::json_return_type(res));
});

View File

@@ -39,7 +39,7 @@ public:
using size_type = bytes::size_type;
using value_type = bytes::value_type;
using fragment_type = bytes_view;
static constexpr size_type max_chunk_size() { return 128 * 1024; }
static constexpr size_type max_chunk_size() { return max_alloc_size() - sizeof(chunk); }
private:
static_assert(sizeof(value_type) == 1, "value_type is assumed to be one byte long");
struct chunk {
@@ -59,6 +59,7 @@ private:
void operator delete(void* ptr) { free(ptr); }
};
static constexpr size_type default_chunk_size{512};
static constexpr size_type max_alloc_size() { return 128 * 1024; }
private:
std::unique_ptr<chunk> _begin;
chunk* _current;
@@ -132,16 +133,15 @@ private:
return _current->size - _current->offset;
}
// Figure out next chunk size.
// - must be enough for data_size
// - must be enough for data_size + sizeof(chunk)
// - must be at least _initial_chunk_size
// - try to double each time to prevent too many allocations
// - do not exceed max_chunk_size
// - should not exceed max_alloc_size, unless data_size requires so
size_type next_alloc_size(size_t data_size) const {
auto next_size = _current
? _current->size * 2
: _initial_chunk_size;
next_size = std::min(next_size, max_chunk_size());
// FIXME: check for overflow?
next_size = std::min(next_size, max_alloc_size());
return std::max<size_type>(next_size, data_size + sizeof(chunk));
}
// Makes room for a contiguous region of given size.

View File

@@ -709,16 +709,16 @@ private:
}
return false;
}
bool compare(const T&, const value_type& v);
int32_t compare(const T&, const value_type& v);
};
template<>
bool maybe_back_insert_iterator<std::vector<std::pair<bytes_view, bytes_view>>, bytes_view>::compare(const bytes_view& t, const value_type& v) {
int32_t maybe_back_insert_iterator<std::vector<std::pair<bytes_view, bytes_view>>, bytes_view>::compare(const bytes_view& t, const value_type& v) {
return _type.compare(t, v.first);
}
template<>
bool maybe_back_insert_iterator<std::vector<bytes_view>, bytes_view>::compare(const bytes_view& t, const value_type& v) {
int32_t maybe_back_insert_iterator<std::vector<bytes_view>, bytes_view>::compare(const bytes_view& t, const value_type& v) {
return _type.compare(t, v);
}

View File

@@ -99,8 +99,8 @@ listen_address: localhost
# listen_on_broadcast_address: false
# port for the CQL native transport to listen for clients on
# For security reasons, you should not expose this port to the internet. Firewall it if needed.
# To disable the CQL native transport, set this option to 0.
# For security reasons, you should not expose this port to the internet. Firewall it if needed.
# To disable the CQL native transport, remove this option and configure native_transport_port_ssl.
native_transport_port: 9042
# Like native_transport_port, but clients are forwarded to specific shards, based on the

View File

@@ -281,7 +281,7 @@ scylla_tests = set([
'test/boost/cdc_generation_test',
'test/boost/aggregate_fcts_test',
'test/boost/allocation_strategy_test',
'test/boost/alternator_base64_test',
'test/boost/alternator_unit_test',
'test/boost/anchorless_list_test',
'test/boost/auth_passwords_test',
'test/boost/auth_resource_test',
@@ -1033,7 +1033,7 @@ pure_boost_tests = set([
])
tests_not_using_seastar_test_framework = set([
'test/boost/alternator_base64_test',
'test/boost/alternator_unit_test',
'test/boost/small_vector_test',
'test/manual/gossip',
'test/manual/message',
@@ -1107,7 +1107,7 @@ deps['test/boost/linearizing_input_stream_test'] = [
]
deps['test/boost/duration_test'] += ['test/lib/exception_utils.cc']
deps['test/boost/alternator_base64_test'] += ['alternator/base64.cc']
deps['test/boost/alternator_unit_test'] += ['alternator/base64.cc']
deps['test/raft/replication_test'] = ['test/raft/replication_test.cc'] + scylla_raft_dependencies
deps['test/boost/raft_fsm_test'] = ['test/boost/raft_fsm_test.cc', 'test/lib/log.cc'] + scylla_raft_dependencies
@@ -1969,7 +1969,7 @@ with open(buildfile_tmp, 'w') as f:
command = ./dist/debian/debian_files_gen.py
build $builddir/debian/debian: debian_files_gen | always
rule extract_node_exporter
command = tar -C build -xvpf {node_exporter_filename} && rm -rfv build/node_exporter && mv -v build/{node_exporter_dirname} build/node_exporter
command = tar -C build -xvpf {node_exporter_filename} --no-same-owner && rm -rfv build/node_exporter && mv -v build/{node_exporter_dirname} build/node_exporter
build $builddir/node_exporter: extract_node_exporter | always
''').format(**globals()))

View File

@@ -181,13 +181,18 @@ inline
shared_ptr<function>
make_from_json_function(database& db, const sstring& keyspace, data_type t) {
return make_native_scalar_function<true>("fromjson", t, {utf8_type},
[&db, &keyspace, t](cql_serialization_format sf, const std::vector<bytes_opt>& parameters) -> bytes_opt {
rjson::value json_value = rjson::parse(utf8_type->to_string(parameters[0].value()));
bytes_opt parsed_json_value;
if (!json_value.IsNull()) {
parsed_json_value.emplace(from_json_object(*t, json_value, sf));
[&db, keyspace, t](cql_serialization_format sf, const std::vector<bytes_opt>& parameters) -> bytes_opt {
try {
rjson::value json_value = rjson::parse(utf8_type->to_string(parameters[0].value()));
bytes_opt parsed_json_value;
if (!json_value.IsNull()) {
parsed_json_value.emplace(from_json_object(*t, json_value, sf));
}
return parsed_json_value;
} catch(rjson::error& e) {
throw exceptions::function_execution_exception("fromJson",
format("Failed parsing fromJson parameter: {}", e.what()), keyspace, {t->name()});
}
return parsed_json_value;
});
}

View File

@@ -78,7 +78,22 @@ public:
return Pure;
}
virtual bytes_opt execute(cql_serialization_format sf, const std::vector<bytes_opt>& parameters) override {
return _func(sf, parameters);
try {
return _func(sf, parameters);
} catch(exceptions::cassandra_exception&) {
// If the function's code took the time to produce an official
// cassandra_exception, pass it through. Otherwise, below we will
// wrap the unknown exception in a function_execution_exception.
throw;
} catch(...) {
std::vector<sstring> args;
args.reserve(arg_types().size());
for (const data_type& a : arg_types()) {
args.push_back(a->name());
}
throw exceptions::function_execution_exception(name().name,
format("Failed execution of function {}: {}", name(), std::current_exception()), name().keyspace, std::move(args));
}
}
};

View File

@@ -551,16 +551,27 @@ bool statement_restrictions::need_filtering() const {
// clustering restrictions. Therefore, a continuous clustering range is guaranteed.
return false;
}
if (!_clustering_columns_restrictions->needs_filtering(*_schema)) { // Guaranteed continuous clustering range.
return false;
}
// Now we know there are some clustering-column restrictions that are out-of-order or not EQ. A naive base-table
// query must be filtered. What about an index-table query? That can only avoid filtering if there is exactly one
// EQ supported by an index.
return !(_clustering_columns_restrictions->size() == 1 && _has_queriable_ck_index);
// TODO: it is also possible to avoid filtering here if a non-empty CK prefix is specified and token_known, plus
// there's exactly one out-of-order-but-index-supported clustering-column restriction.
if (_has_queriable_ck_index && _uses_secondary_indexing) {
// In cases where we use an index, clustering column restrictions might cause the need for filtering.
// TODO: This is overly conservative, there are some cases when this returns true but filtering
// is not needed. Because of that the database will sometimes perform filtering when it's not actually needed.
// Query performance shouldn't be affected much, at most we will filter rows that are all correct.
// Here are some cases to consider:
// On a table with primary key (p, c1, c2, c3) with an index on c3
// WHERE c3 = ? - doesn't require filtering
// WHERE c1 = ? AND c2 = ? AND c3 = ? - requires filtering
// WHERE p = ? AND c1 = ? AND c3 = ? - doesn't require filtering, but we conservatively report it does
// WHERE p = ? AND c1 LIKE ? AND c3 = ? - requires filtering
// WHERE p = ? AND c1 = ? AND c2 LIKE ? AND c3 = ? - requires filtering
// WHERE p = ? AND c1 = ? AND c2 = ? AND c3 = ? - doesn't use an index
// WHERE p = ? AND c1 = ? AND c2 < ? AND c3 = ? - doesn't require filtering, but we report it does
return _clustering_columns_restrictions->size() > 1;
}
// Now we know that the query doesn't use an index.
// The only thing that can cause filtering now are the clustering columns.
return _clustering_columns_restrictions->needs_filtering(*_schema);
}
void statement_restrictions::validate_secondary_index_selections(bool selects_only_static_columns) {

View File

@@ -964,6 +964,7 @@ lw_shared_ptr<const service::pager::paging_state> indexed_table_select_statement
}
auto paging_state_copy = make_lw_shared<service::pager::paging_state>(service::pager::paging_state(*paging_state));
paging_state_copy->set_remaining(internal_paging_size);
paging_state_copy->set_partition_key(std::move(index_pk));
paging_state_copy->set_clustering_key(std::move(index_ck));
return std::move(paging_state_copy);

View File

@@ -240,9 +240,13 @@ public:
return _memtables.back();
}
// The caller has to make sure the element exist before calling this.
// # 8904 - this method is akin to std::set::erase(key_type), not
// erase(iterator). Should be tolerant against non-existing.
void erase(const shared_memtable& element) {
_memtables.erase(boost::range::find(_memtables, element));
auto i = boost::range::find(_memtables, element);
if (i != _memtables.end()) {
_memtables.erase(i);
}
}
void clear() {
_memtables.clear();
@@ -893,7 +897,7 @@ public:
return _pending_writes_phaser.start();
}
future<> await_pending_writes() {
future<> await_pending_writes() noexcept {
return _pending_writes_phaser.advance_and_await();
}
@@ -905,7 +909,7 @@ public:
return _pending_reads_phaser.start();
}
future<> await_pending_reads() {
future<> await_pending_reads() noexcept {
return _pending_reads_phaser.advance_and_await();
}
@@ -917,7 +921,7 @@ public:
return _pending_streams_phaser.start();
}
future<> await_pending_streams() {
future<> await_pending_streams() noexcept {
return _pending_streams_phaser.advance_and_await();
}
@@ -925,11 +929,11 @@ public:
return _pending_streams_phaser.operations_in_progress();
}
future<> await_pending_flushes() {
future<> await_pending_flushes() noexcept {
return _pending_flushes_phaser.advance_and_await();
}
future<> await_pending_ops() {
future<> await_pending_ops() noexcept {
return when_all(await_pending_reads(), await_pending_writes(), await_pending_streams(), await_pending_flushes()).discard_result();
}

View File

@@ -124,7 +124,7 @@ static future<> try_record(std::string_view large_table, const sstables::sstable
const auto sstable_name = sst.get_filename();
std::string pk_str = key_to_str(partition_key.to_partition_key(s), s);
auto timestamp = db_clock::now();
large_data_logger.warn("Writing large {} {}/{}: {}{} ({} bytes)", desc, ks_name, cf_name, pk_str, extra_path, size);
large_data_logger.warn("Writing large {} {}/{}: {}{} ({} bytes) to {}", desc, ks_name, cf_name, pk_str, extra_path, size, sstable_name);
return db::qctx->execute_cql(req, ks_name, cf_name, sstable_name, size, pk_str, timestamp, args...)
.discard_result()
.handle_exception([ks_name, cf_name, large_table, sstable_name] (std::exception_ptr ep) {
@@ -140,9 +140,10 @@ future<> cql_table_large_data_handler::record_large_partitions(const sstables::s
void cql_table_large_data_handler::log_too_many_rows(const sstables::sstable& sst, const sstables::key& partition_key,
uint64_t rows_count) const {
const schema& s = *sst.get_schema();
large_data_logger.warn("Writing a partition with too many rows [{}/{}:{}] ({} rows)",
const auto sstable_name = sst.get_filename();
large_data_logger.warn("Writing a partition with too many rows [{}/{}:{}] ({} rows) to {}",
s.ks_name(), s.cf_name(), partition_key.to_partition_key(s).with_schema(s),
rows_count);
rows_count, sstable_name);
}
future<> cql_table_large_data_handler::record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,

View File

@@ -22,6 +22,7 @@
import os
import sys
import argparse
import shlex
import distro
from scylla_util import *
@@ -46,7 +47,12 @@ if __name__ == '__main__':
if os.getuid() > 0:
print('Requires root permission.')
sys.exit(1)
if not os.path.exists('/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor'):
parser = argparse.ArgumentParser(description='CPU scaling setup script for Scylla.')
parser.add_argument('--force', dest='force', action='store_true',
help='force running setup even CPU scaling unsupported')
args = parser.parse_args()
if not args.force and not os.path.exists('/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor'):
print('This computer doesn\'t supported CPU scaling configuration.')
sys.exit(0)
if not is_debian_variant():

View File

@@ -254,7 +254,7 @@ if __name__ == "__main__":
disk_properties["read_bandwidth"] = 2650 * mbs
disk_properties["write_iops"] = 360000
disk_properties["write_bandwidth"] = 1400 * mbs
elif nr_disks == "16":
elif nr_disks == 16:
disk_properties["read_iops"] = 1600000
disk_properties["read_bandwidth"] = 4521251328
#below is google, above is our measured
@@ -263,7 +263,7 @@ if __name__ == "__main__":
disk_properties["write_bandwidth"] = 2759452672
#below is google, above is our measured
#disk_properties["write_bandwidth"] = 3120 * mbs
elif nr_disks == "24":
elif nr_disks == 24:
disk_properties["read_iops"] = 2400000
disk_properties["read_bandwidth"] = 5921532416
#below is google, above is our measured

View File

@@ -30,6 +30,8 @@ import distro
from pathlib import Path
from scylla_util import *
from subprocess import run
import distro
from pkg_resources import parse_version
if __name__ == '__main__':
if os.getuid() > 0:
@@ -115,6 +117,25 @@ if __name__ == '__main__':
pkg_install('xfsprogs')
if not shutil.which('mdadm'):
pkg_install('mdadm')
# XXX: Workaround for mdmonitor.service issue on CentOS8
if is_redhat_variant() and distro.version() == '8':
mdadm_rpm = run('rpm -q mdadm', shell=True, check=True, capture_output=True, encoding='utf-8').stdout.strip()
match = re.match(r'^mdadm-([0-9]+\.[0-9]+-[a-zA-Z0-9]+)\.', mdadm_rpm)
mdadm_version = match.group(1)
if parse_version('4.1-14') < parse_version(mdadm_version):
repo_data = '''
[BaseOS_8_3_2011]
name=CentOS8.3.2011 - Base
baseurl=http://vault.centos.org/8.3.2011/BaseOS/$basearch/os/
gpgcheck=1
enabled=0
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-centosofficial
'''[1:-1]
with open('/etc/yum.repos.d/CentOS-Vault-8.3.repo', 'w') as f:
f.write(repo_data)
run('dnf downgrade --enablerepo=BaseOS_8_3_2011 -y mdadm', shell=True, check=True)
run('dnf install -y python3-dnf-plugin-versionlock', shell=True, check=True)
run('dnf versionlock add mdadm', shell=True, check=True)
try:
md_service = systemd_unit('mdmonitor.service')
except SystemdException:

View File

@@ -147,6 +147,11 @@ class gcp_instance:
if af == socket.AF_INET:
addr, port = sa
if addr == "169.254.169.254":
# Make sure it is not on GKE
try:
gcp_instance().__instance_metadata("machine-type")
except urllib.error.HTTPError:
return False
return True
return False

View File

@@ -6,7 +6,7 @@ ENV container docker
# The SCYLLA_REPO_URL argument specifies the URL to the RPM repository this Docker image uses to install Scylla. The default value is the Scylla's unstable RPM repository, which contains the daily build.
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/branch-4.4/latest/scylla.repo
ARG VERSION=4.4.5
ARG VERSION=4.4.9
ADD scylla_bashrc /scylla_bashrc

View File

@@ -4,3 +4,4 @@ stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
stopwaitsecs=900

View File

@@ -121,12 +121,13 @@ class ScyllaSetup:
if self._apiAddress is not None:
args += ["--api-address %s" % self._apiAddress]
if self._alternatorPort is not None:
if self._alternatorAddress is not None:
args += ["--alternator-address %s" % self._alternatorAddress]
if self._alternatorPort is not None:
args += ["--alternator-port %s" % self._alternatorPort]
if self._alternatorHttpsPort is not None:
args += ["--alternator-address %s" % self._alternatorAddress]
args += ["--alternator-https-port %s" % self._alternatorHttpsPort]
if self._alternatorWriteIsolation is not None:

View File

@@ -340,4 +340,18 @@ public:
unsupported_operation_exception(const sstring& msg) : std::runtime_error("unsupported operation: " + msg) {}
};
class function_execution_exception : public cassandra_exception {
public:
const sstring ks_name;
const sstring func_name;
const std::vector<sstring> args;
function_execution_exception(sstring func_name_, sstring detail, sstring ks_name_, std::vector<sstring> args_) noexcept
: cassandra_exception{exception_code::FUNCTION_FAILURE,
format("execution of {} failed: {}", func_name_, detail)}
, ks_name(std::move(ks_name_))
, func_name(std::move(func_name_))
, args(std::move(args_))
{ }
};
}

View File

@@ -1445,7 +1445,7 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
logger.trace("marking as alive {}", addr);
// Do not mark a node with status shutdown as UP.
auto status = get_gossip_status(local_state);
auto status = sstring(get_gossip_status(local_state));
if (status == sstring(versioned_value::SHUTDOWN)) {
logger.warn("Skip marking node {} with status = {} as UP", addr, status);
return;
@@ -1464,6 +1464,8 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
return;
}
// Make a copy for endpoint_state because the code below can yield
endpoint_state state = local_state;
_live_endpoints.push_back(addr);
if (_endpoints_to_talk_with.empty()) {
_endpoints_to_talk_with.push_back({addr});
@@ -1475,8 +1477,8 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
logger.info("InetAddress {} is now UP, status = {}", addr, status);
}
_subscribers.for_each([addr, local_state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
subscriber->on_alive(addr, local_state);
_subscribers.for_each([addr, state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
subscriber->on_alive(addr, state);
logger.trace("Notified {}", fmt::ptr(subscriber.get()));
});
}
@@ -1485,11 +1487,12 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
void gossiper::mark_dead(inet_address addr, endpoint_state& local_state) {
logger.trace("marking as down {}", addr);
local_state.mark_dead();
endpoint_state state = local_state;
_live_endpoints.resize(std::distance(_live_endpoints.begin(), std::remove(_live_endpoints.begin(), _live_endpoints.end(), addr)));
_unreachable_endpoints[addr] = now();
logger.info("InetAddress {} is now DOWN, status = {}", addr, get_gossip_status(local_state));
_subscribers.for_each([addr, local_state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
subscriber->on_dead(addr, local_state);
logger.info("InetAddress {} is now DOWN, status = {}", addr, get_gossip_status(state));
_subscribers.for_each([addr, state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
subscriber->on_dead(addr, state);
logger.trace("Notified {}", fmt::ptr(subscriber.get()));
});
}

View File

@@ -26,6 +26,7 @@
#include "mutation_reader.hh"
#include <seastar/core/future-util.hh>
#include <seastar/core/coroutine.hh>
#include "flat_mutation_reader.hh"
#include "schema_registry.hh"
#include "mutation_compactor.hh"
@@ -1525,18 +1526,18 @@ future<> evictable_reader::fast_forward_to(const dht::partition_range& pr, db::t
_end_of_stream = false;
if (_reader) {
return _reader->fast_forward_to(pr, timeout);
co_await _reader->fast_forward_to(pr, timeout);
_range_override.reset();
co_return;
}
if (!_reader_created || !_irh) {
return make_ready_future<>();
co_return;
}
if (auto reader_opt = try_resume()) {
auto f = reader_opt->fast_forward_to(pr, timeout);
return f.then([this, reader = std::move(*reader_opt)] () mutable {
maybe_pause(std::move(reader));
});
co_await reader_opt->fast_forward_to(pr, timeout);
_range_override.reset();
maybe_pause(std::move(*reader_opt));
}
return make_ready_future<>();
}
evictable_reader_handle::evictable_reader_handle(evictable_reader& r) : _r(&r)
@@ -1589,8 +1590,8 @@ class shard_reader : public enable_lw_shared_from_this<shard_reader>, public fla
private:
shared_ptr<reader_lifecycle_policy> _lifecycle_policy;
const unsigned _shard;
const dht::partition_range* _pr;
const query::partition_slice& _ps;
dht::partition_range _pr;
query::partition_slice _ps;
const io_priority_class& _pc;
tracing::global_trace_state_ptr _trace_state;
const mutation_reader::forwarding _fwd_mr;
@@ -1616,7 +1617,7 @@ public:
: impl(std::move(schema), std::move(permit))
, _lifecycle_policy(std::move(lifecycle_policy))
, _shard(shard)
, _pr(&pr)
, _pr(pr)
, _ps(ps)
, _pc(pc)
, _trace_state(std::move(trace_state))
@@ -1701,7 +1702,7 @@ future<> shard_reader::do_fill_buffer(db::timeout_clock::time_point timeout) {
});
auto s = gs.get();
auto rreader = make_foreign(std::make_unique<evictable_reader>(evictable_reader::auto_pause::yes, std::move(ms),
s, _lifecycle_policy->semaphore().make_permit(s.get(), "shard-reader"), *_pr, _ps, _pc, _trace_state, _fwd_mr));
s, _lifecycle_policy->semaphore().make_permit(s.get(), "shard-reader"), _pr, _ps, _pc, _trace_state, _fwd_mr));
tracing::trace(_trace_state, "Creating shard reader on shard: {}", this_shard_id());
auto f = rreader->fill_buffer(timeout);
return f.then([rreader = std::move(rreader)] () mutable {
@@ -1750,7 +1751,7 @@ void shard_reader::next_partition() {
}
future<> shard_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
_pr = &pr;
_pr = pr;
if (!_reader && !_read_ahead) {
// No need to fast-forward uncreated readers, they will be passed the new
@@ -1759,12 +1760,12 @@ future<> shard_reader::fast_forward_to(const dht::partition_range& pr, db::timeo
}
auto f = _read_ahead ? *std::exchange(_read_ahead, std::nullopt) : make_ready_future<>();
return f.then([this, &pr, timeout] {
return f.then([this, timeout] {
_end_of_stream = false;
clear_buffer();
return smp::submit_to(_shard, [this, &pr, timeout] {
return _reader->fast_forward_to(pr, timeout);
return smp::submit_to(_shard, [this, timeout] {
return _reader->fast_forward_to(_pr, timeout);
});
});
}

View File

@@ -57,6 +57,8 @@ future<> feed_writer(flat_mutation_reader&& rd, Writer&& wr) {
auto f2 = rd.is_buffer_empty() ? rd.fill_buffer(db::no_timeout) : make_ready_future<>();
return when_all_succeed(std::move(f1), std::move(f2)).discard_result();
});
}).then([&wr] {
wr.consume_end_of_stream();
}).then_wrapped([&wr] (future<> f) {
if (f.failed()) {
auto ex = f.get_exception();
@@ -70,7 +72,6 @@ future<> feed_writer(flat_mutation_reader&& rd, Writer&& wr) {
return make_exception_future<>(std::move(ex));
});
} else {
wr.consume_end_of_stream();
return wr.close();
}
});

View File

@@ -267,9 +267,14 @@ public:
return _current_tombstone;
}
const std::deque<range_tombstone>& range_tombstones_for_row(const clustering_key_prefix& ck) {
std::vector<range_tombstone> range_tombstones_for_row(const clustering_key_prefix& ck) {
drop_unneeded_tombstones(ck);
return _range_tombstones;
std::vector<range_tombstone> result(_range_tombstones.begin(), _range_tombstones.end());
auto cmp = [&] (const range_tombstone& rt1, const range_tombstone& rt2) {
return _cmp(rt1.start_bound(), rt2.start_bound());
};
std::sort(result.begin(), result.end(), cmp);
return result;
}
std::deque<range_tombstone> range_tombstones() && {

View File

@@ -1783,6 +1783,7 @@ future<> bootstrap_with_repair(seastar::sharded<database>& db, seastar::sharded<
auto& strat = ks.get_replication_strategy();
dht::token_range_vector desired_ranges = strat.get_pending_address_ranges(tmptr, tokens, myip, utils::can_yield::yes);
bool find_node_in_local_dc_only = strat.get_type() == locator::replication_strategy_type::network_topology;
bool everywhere_topology = strat.get_type() == locator::replication_strategy_type::everywhere_topology;
//Active ranges
auto metadata_clone = tmptr->clone_only_token_map().get0();
@@ -1860,7 +1861,9 @@ future<> bootstrap_with_repair(seastar::sharded<database>& db, seastar::sharded<
};
auto old_endpoints_in_local_dc = get_old_endpoints_in_local_dc();
auto rf_in_local_dc = get_rf_in_local_dc();
if (old_endpoints.size() == strat.get_replication_factor()) {
if (everywhere_topology) {
neighbors = old_endpoints_in_local_dc;
} else if (old_endpoints.size() == strat.get_replication_factor()) {
// For example, with RF = 3 and 3 nodes n1, n2, n3
// in the cluster, n4 is bootstrapped, old_replicas
// = {n1, n2, n3}, new_replicas = {n1, n2, n4}, n3

Submodule seastar updated: 4a58d76fea...1fb2187322

View File

@@ -2532,7 +2532,13 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr
}
return seastar::async([this, endpoint, notify_endpoint] {
auto tmptr = get_token_metadata_ptr();
auto streamer = make_lw_shared<dht::range_streamer>(_db, tmptr, _abort_source, get_broadcast_address(), "Restore_replica_count", streaming::stream_reason::removenode);
abort_source as;
auto sub = _abort_source.subscribe([&as] () noexcept {
if (!as.abort_requested()) {
as.request_abort();
}
});
auto streamer = make_lw_shared<dht::range_streamer>(_db, tmptr, as, get_broadcast_address(), "Restore_replica_count", streaming::stream_reason::removenode);
auto my_address = get_broadcast_address();
auto non_system_keyspaces = _db.local().get_non_system_keyspaces();
for (const auto& keyspace_name : non_system_keyspaces) {
@@ -2550,6 +2556,42 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr
}
streamer->add_rx_ranges(keyspace_name, std::move(ranges_per_endpoint));
}
auto status_checker = seastar::async([this, endpoint, &as] {
slogger.info("restore_replica_count: Started status checker for removing node {}", endpoint);
while (!as.abort_requested()) {
auto status = _gossiper.get_gossip_status(endpoint);
// If the node to be removed is already in removed status, it has
// probably been removed forcely with `nodetool removenode force`.
// Abort the restore_replica_count in such case to avoid streaming
// attempt since the user has removed the node forcely.
if (status == sstring(versioned_value::REMOVED_TOKEN)) {
slogger.info("restore_replica_count: Detected node {} has left the cluster, status={}, abort restore_replica_count for removing node {}",
endpoint, status, endpoint);
if (!as.abort_requested()) {
as.request_abort();
}
return;
}
slogger.debug("restore_replica_count: Sleep and detect removing node {}, status={}", endpoint, status);
sleep_abortable(std::chrono::seconds(10), as).get();
}
});
auto stop_status_checker = defer([endpoint, &status_checker, &as] () mutable {
try {
slogger.info("restore_replica_count: Started to stop status checker for removing node {}", endpoint);
if (!as.abort_requested()) {
as.request_abort();
}
status_checker.get();
} catch (const seastar::sleep_aborted& ignored) {
slogger.debug("restore_replica_count: Got sleep_abort to stop status checker for removing node {}: {}", endpoint, ignored);
} catch (...) {
slogger.warn("restore_replica_count: Found error in status checker for removing node {}: {}",
endpoint, std::current_exception());
}
slogger.info("restore_replica_count: Finished to stop status checker for removing node {}", endpoint);
});
streamer->stream_async().then_wrapped([this, streamer, notify_endpoint] (auto&& f) {
try {
f.get();

View File

@@ -808,6 +808,9 @@ future<> compaction_manager::remove(column_family* cf) {
return do_for_each(*tasks_to_stop, [this, cf] (auto& task) {
return this->task_stop(task);
}).then([this, cf, tasks_to_stop] {
#ifdef DEBUG
assert(std::find_if(_tasks.begin(), _tasks.end(), [cf] (auto& task) { return task->compacting_cf == cf; }) == _tasks.end());
#endif
_compaction_locks.erase(cf);
});
}

View File

@@ -503,7 +503,8 @@ date_tiered_manifest::get_compaction_candidates(column_family& cf, std::vector<s
int64_t date_tiered_manifest::get_now(column_family& cf) {
int64_t max_timestamp = 0;
for (auto& sst : *cf.get_sstables()) {
auto shared_set = cf.get_sstables();
for (auto& sst : *shared_set) {
int64_t candidate = sst->get_stats_metadata().max_timestamp;
max_timestamp = candidate > max_timestamp ? candidate : max_timestamp;
}

View File

@@ -129,7 +129,7 @@ void sstable_writer_k_l::maybe_flush_pi_block(file_writer& out,
// block includes them), but we set block_next_start_offset after - so
// even if we wrote a lot of open tombstones, we still get a full
// block size of new data.
auto& rts = _pi_write.tombstone_accumulator->range_tombstones_for_row(
auto rts = _pi_write.tombstone_accumulator->range_tombstones_for_row(
clustering_key_prefix::from_range(clustering_key.values()));
for (const auto& rt : rts) {
auto start = composite::from_clustering_element(*_pi_write.schemap, rt.start);

View File

@@ -380,7 +380,7 @@ future<prepare_message> stream_session::prepare(std::vector<stream_request> requ
try {
db.find_column_family(ks, cf);
} catch (no_such_column_family&) {
auto err = format("[Stream #{{}}] prepare requested ks={{}} cf={{}} does not exist", plan_id, ks, cf);
auto err = format("[Stream #{}] prepare requested ks={} cf={} does not exist", plan_id, ks, cf);
sslog.warn(err.c_str());
throw std::runtime_error(err);
}
@@ -394,7 +394,7 @@ future<prepare_message> stream_session::prepare(std::vector<stream_request> requ
try {
db.find_column_family(cf_id);
} catch (no_such_column_family&) {
auto err = format("[Stream #{{}}] prepare cf_id={} does not exist", plan_id, cf_id);
auto err = format("[Stream #{}] prepare cf_id={} does not exist", plan_id, cf_id);
sslog.warn(err.c_str());
throw std::runtime_error(err);
}

View File

@@ -864,8 +864,8 @@ void table::try_trigger_compaction() noexcept {
}
void table::do_trigger_compaction() {
// But only submit if we're not locked out
if (!_compaction_disabled) {
// But not if we're locked out or stopping
if (!_compaction_disabled && !_async_gate.is_closed()) {
_compaction_manager.submit(this);
}
}

View File

@@ -85,3 +85,20 @@ def test_signature_too_futuristic(dynamodb, test_table):
response = requests.post(url, headers=headers, verify=False)
assert not response.ok
assert "InvalidSignatureException" in response.text and "Signature not yet current" in response.text
# A test that commas can be uses instead of whitespace to separate components
# of the Authorization headers - reproducing issue #9568.
def test_authorization_no_whitespace(dynamodb, test_table):
# Unlike the above tests which checked error cases so didn't need to
# calculate a real signature, in this test we really a correct signature,
# so we use a function we already have in test_manual_requests.py.
from test_manual_requests import get_signed_request
payload = '{"TableName": "' + test_table.name + '", "Item": {"p": {"S": "x"}, "c": {"S": "x"}}}'
req = get_signed_request(dynamodb, 'PutItem', payload)
# Boto3 separates the components of the Authorization header by spaces.
# Let's remove all of them except the first one (which separates the
# signature algorithm name from the rest) and check the result still works:
a = req.headers['Authorization'].split()
req.headers['Authorization'] = a[0] + ' ' + ''.join(a[1:])
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
assert response.ok

View File

@@ -154,3 +154,25 @@ def test_incorrect_numbers(dynamodb, test_table):
req = get_signed_request(dynamodb, 'PutItem', payload)
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
assert "ValidationException" in response.text and "numeric" in response.text
# Although the DynamoDB API responses are JSON, additional conventions apply
# to these responses - such as how error codes are encoded in JSON. For this
# reason, DynamoDB uses the content type 'application/x-amz-json-1.0' instead
# of the standard 'application/json'. This test verifies that we return the
# correct content type header.
# While most DynamoDB libraries we tried do not care about an unexpected
# content-type, it turns out that one (aiodynamo) does. Moreover, AWS already
# defined x-amz-json-1.1 - see
# https://awslabs.github.io/smithy/1.0/spec/aws/aws-json-1_1-protocol.html
# which differs (only) in how it encodes error replies.
# So in the future it may become even more important that Scylla return the
# correct content type.
def test_content_type(dynamodb, test_table):
payload = '{"TableName": "' + test_table.name + '", "Item": {"p": {"S": "x"}, "c": {"S": "x"}}}'
# Note that get_signed_request() uses x-amz-json-1.0 to encode the
# *request*. In the future this may or may not effect the content type
# in the response (today, DynamoDB doesn't allow any other content type
# in the request anyway).
req = get_signed_request(dynamodb, 'PutItem', payload)
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
assert response.headers['Content-Type'] == 'application/x-amz-json-1.0'

View File

@@ -0,0 +1,113 @@
# Copyright 2021-present ScyllaDB
#
# This file is part of Scylla.
#
# Scylla is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Scylla is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Scylla. If not, see <http://www.gnu.org/licenses/>.
##############################################################################
# Tests for Scylla's metrics (see docs/design-notes/metrics.md) for Alternator
# queries. Reproduces issue #9406, where although metrics was implemented for
# Alternator requests, they were missing for some operations (BatchGetItem).
# In the tests here we attempt to ensure that the metrics continue to work
# for the relevant operations as the code evolves.
#
# Note that all tests in this file test Scylla-specific features, and are
# "skipped" when not running against Scylla, or when unable to retrieve
# metrics through out-of-band HTTP requests to Scylla's Prometheus port (9180).
#
# IMPORTANT: we do not want these tests to assume that are not running in
# parallel with any other tests or workload - because such an assumption
# would limit our test deployment options in the future. NOT making this
# assumption means that these tests can't check that a certain operation
# increases a certain counter by exactly 1 - because other concurrent
# operations might increase it further! So our test can only check that the
# counter increases.
##############################################################################
import pytest
import requests
import re
from util import random_string
# Fixture for checking if we are able to test Scylla metrics. Scylla metrics
# are not available on AWS (of course), but may also not be available for
# Scylla if for some reason we have only access to the Alternator protocol
# port but no access to the metrics port (9180).
# If metrics are *not* available, tests using this fixture will be skipped.
# Tests using this fixture may call get_metrics(metrics).
@pytest.fixture(scope="module")
def metrics(dynamodb):
if dynamodb.meta.client._endpoint.host.endswith('.amazonaws.com'):
pytest.skip('Scylla-only feature not supported by AWS')
url = dynamodb.meta.client._endpoint.host
# The Prometheus API is on port 9180, and always http, not https.
url = re.sub(r':[0-9]+(/|$)', ':9180', url)
url = re.sub(r'^https:', 'http:', url)
url = url + '/metrics'
resp = requests.get(url)
if resp.status_code != 200:
pytest.skip('Metrics port 9180 is not available')
yield url
# Utility function for fetching all metrics from Scylla, using an HTTP request
# to port 9180. The response format is defined by the Prometheus protocol.
# Only use get_metrics() in a test using the metrics_available fixture.
def get_metrics(metrics):
response = requests.get(metrics)
assert response.status_code == 200
return response.text
# Utility function for fetching a metric with a given name and optionally a
# given sub-metric label (which should be a name-value map). If multiple
# matches are found, they are summed - this is useful for summing up the
# counts from multiple shards.
def get_metric(metrics, name, requested_labels=None):
total = 0.0
lines = re.compile('^'+name+'{.*$', re.MULTILINE)
for match in re.findall(lines, get_metrics(metrics)):
a = match.split()
metric = a[0]
val = float(a[1])
# Check if match also matches the requested labels
if requested_labels:
# we know metric begins with name{ and ends with } - the labels
# are what we have between those
got_labels = metric[len(name)+1:-1].split(',')
# Check that every one of the requested labels is in got_labels:
for k, v in requested_labels.items():
if not f'{k}="{v}"' in got_labels:
# No match for requested label, skip this metric (python
# doesn't have "continue 2" so let's just set val to 0...
val = 0
break
total += float(val)
return total
def test_batch_write_item(test_table_s, metrics):
n1 = get_metric(metrics, 'scylla_alternator_operation', {'op': 'BatchWriteItem'})
test_table_s.meta.client.batch_write_item(RequestItems = {
test_table_s.name: [{'PutRequest': {'Item': {'p': random_string(), 'a': 'hi'}}}]})
n2 = get_metric(metrics, 'scylla_alternator_operation', {'op': 'BatchWriteItem'})
assert n2 > n1
# Reproduces issue #9406:
def test_batch_get_item(test_table_s, metrics):
n1 = get_metric(metrics, 'scylla_alternator_operation', {'op': 'BatchGetItem'})
test_table_s.meta.client.batch_get_item(RequestItems = {
test_table_s.name: {'Keys': [{'p': random_string()}], 'ConsistentRead': True}})
n2 = get_metric(metrics, 'scylla_alternator_operation', {'op': 'BatchGetItem'})
assert n2 > n1
# TODO: check the rest of the operations

View File

@@ -431,3 +431,14 @@ def test_update_item_returnvalues_nested(test_table_s):
ret=test_table_s.update_item(Key={'p': p}, ReturnValues='UPDATED_NEW',
UpdateExpression='REMOVE a.c[1]')
assert ret['Attributes'] == {'a': {'c': [70]}}
# A reproducer for issue #9542 - when UpdateExpression's REMOVE operation
# actually deletes an existing attribute, it breaks the ALL_NEW ReturnValues
# for other attributes set in the same command.
def test_update_item_returnvalues_all_new_remove_etc(test_table_s):
p = random_string()
test_table_s.put_item(Item={'p': p, 's': 'dog', 'd': 'foo'})
ret=test_table_s.update_item(Key={'p': p}, ReturnValues='ALL_NEW',
UpdateExpression='REMOVE d SET s = :v',
ExpressionAttributeValues={':v': 'cat'})
assert ret['Attributes']['s'] == 'cat'

View File

@@ -22,6 +22,7 @@
#define BOOST_TEST_MODULE alternator
#include <boost/test/included/unit_test.hpp>
#include <seastar/util/defer.hh>
#include "alternator/base64.hh"
static bytes_view to_bytes_view(const std::string& s) {
@@ -78,3 +79,22 @@ BOOST_AUTO_TEST_CASE(test_base64_begins_with) {
BOOST_REQUIRE(!base64_begins_with(encoded_str3, encoded_non_prefix));
}
}
BOOST_AUTO_TEST_CASE(test_allocator_fail_gracefully) {
// Unfortunately the address sanitizer fails if the allocator is not able
// to allocate the requested memory. The test is therefore skipped for debug mode
#ifndef DEBUG
static constexpr size_t too_large_alloc_size = 0xffffffffff;
rjson::allocator allocator;
// Impossible allocation should throw
BOOST_REQUIRE_THROW(allocator.Malloc(too_large_alloc_size), rjson::error);
// So should impossible reallocation
void* memory = allocator.Malloc(1);
auto release = defer([memory] { rjson::allocator::Free(memory); });
BOOST_REQUIRE_THROW(allocator.Realloc(memory, 1, too_large_alloc_size), rjson::error);
// Internal rapidjson stack should also throw
// and also be destroyed gracefully later
rapidjson::internal::Stack stack(&allocator, 0);
BOOST_REQUIRE_THROW(stack.Push<char>(too_large_alloc_size), rjson::error);
#endif
}

View File

@@ -269,6 +269,21 @@ BOOST_AUTO_TEST_CASE(test_writing_placeholders) {
BOOST_REQUIRE(in.size() == 0);
}
BOOST_AUTO_TEST_CASE(test_large_placeholder) {
bytes_ostream::size_type size;
try {
for (size = 1; (int32_t)size > 0; size *= 2) {
bytes_ostream buf;
int8_t* ph;
BOOST_TEST_MESSAGE(fmt::format("try size={}", size));
ph = buf.write_place_holder(size);
std::fill(ph, ph + size, 0);
}
} catch (const std::bad_alloc&) {
}
BOOST_REQUIRE(size >= bytes_ostream::max_chunk_size());
}
BOOST_AUTO_TEST_CASE(test_append_big_and_small_chunks) {
bytes_ostream small;
append_sequence(small, 12);

View File

@@ -22,6 +22,8 @@
#include <seastar/testing/test_case.hh>
#include "test/lib/cql_test_env.hh"
#include "test/lib/cql_assertions.hh"
#include "cql3/untyped_result_set.hh"
#include "cql3/query_processor.hh"
#include "transport/messages/result_message.hh"
SEASTAR_TEST_CASE(test_index_with_paging) {
@@ -48,3 +50,51 @@ SEASTAR_TEST_CASE(test_index_with_paging) {
});
});
}
SEASTAR_TEST_CASE(test_index_with_paging_with_base_short_read) {
return do_with_cql_env_thread([] (auto& e) {
e.execute_cql("CREATE TABLE tab (pk int, ck text, v int, v2 int, v3 text, PRIMARY KEY (pk, ck))").get();
e.execute_cql("CREATE INDEX ON tab (v)").get();
// Enough to trigger a short read on the base table during scan
sstring big_string(2 * query::result_memory_limiter::maximum_result_size, 'j');
const int row_count = 67;
for (int i = 0; i < row_count; ++i) {
e.execute_cql(format("INSERT INTO tab (pk, ck, v, v2, v3) VALUES ({}, 'hello{}', 1, {}, '{}')", i % 3, i, i, big_string)).get();
}
eventually([&] {
uint64_t count = 0;
e.qp().local().query_internal("SELECT * FROM ks.tab WHERE v = 1", [&] (const cql3::untyped_result_set_row&) {
++count;
return make_ready_future<stop_iteration>(stop_iteration::no);
}).get();
BOOST_REQUIRE_EQUAL(count, row_count);
});
});
}
SEASTAR_TEST_CASE(test_index_with_paging_with_base_short_read_no_ck) {
return do_with_cql_env_thread([] (auto& e) {
e.execute_cql("CREATE TABLE tab (pk int, v int, v2 int, v3 text, PRIMARY KEY (pk))").get();
e.execute_cql("CREATE INDEX ON tab (v)").get();
// Enough to trigger a short read on the base table during scan
sstring big_string(2 * query::result_memory_limiter::maximum_result_size, 'j');
const int row_count = 67;
for (int i = 0; i < row_count; ++i) {
e.execute_cql(format("INSERT INTO tab (pk, v, v2, v3) VALUES ({}, 1, {}, '{}')", i, i, big_string)).get();
}
eventually([&] {
uint64_t count = 0;
e.qp().local().query_internal("SELECT * FROM ks.tab WHERE v = 1", [&] (const cql3::untyped_result_set_row&) {
++count;
return make_ready_future<stop_iteration>(stop_iteration::no);
}).get();
BOOST_REQUIRE_EQUAL(count, row_count);
});
});
}

View File

@@ -3749,6 +3749,112 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_self_validation) {
max_buffer_size);
}
SEASTAR_THREAD_TEST_CASE(test_evictable_reader_recreate_before_fast_forward_to) {
class test_reader : public flat_mutation_reader::impl {
simple_schema _s;
const std::vector<dht::decorated_key> _pkeys;
std::vector<dht::decorated_key>::const_iterator _it;
std::vector<dht::decorated_key>::const_iterator _end;
private:
void on_range_change(const dht::partition_range& pr) {
dht::ring_position_comparator cmp(*_schema);
_it = _pkeys.begin();
while (_it != _pkeys.end() && !pr.contains(*_it, cmp)) {
++_it;
}
_end = _it;
while (_end != _pkeys.end() && pr.contains(*_end, cmp)) {
++_end;
}
}
public:
test_reader(simple_schema s, reader_permit permit, const dht::partition_range& pr, std::vector<dht::decorated_key> pkeys)
: impl(s.schema(), std::move(permit))
, _s(std::move(s))
, _pkeys(std::move(pkeys)) {
on_range_change(pr);
}
virtual future<> fill_buffer(db::timeout_clock::time_point) override {
if (_it == _end) {
_end_of_stream = true;
return make_ready_future<>();
}
push_mutation_fragment(*_schema, _permit, partition_start(*_it++, {}));
uint32_t ck = 0;
while (!is_buffer_full()) {
auto ckey = _s.make_ckey(ck);
push_mutation_fragment(*_schema, _permit, _s.make_row(_s.make_ckey(ck++), make_random_string(1024)));
++ck;
}
push_mutation_fragment(*_schema, _permit, partition_end());
return make_ready_future<>();
}
virtual void next_partition() override {
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point) override {
on_range_change(pr);
clear_buffer();
_end_of_stream = false;
return make_ready_future<>();
}
virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point) override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}
};
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
simple_schema s;
auto permit = semaphore.make_permit(s.schema().get(), get_name());
auto pkeys = s.make_pkeys(6);
boost::sort(pkeys, dht::decorated_key::less_comparator(s.schema()));
auto ms = mutation_source([&] (schema_ptr schema,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr tr,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) {
std::vector<dht::decorated_key> pkeys_with_data;
bool empty = false;
for (const auto& pkey : pkeys) {
empty = !empty;
if (empty) {
pkeys_with_data.push_back(pkey);
}
}
return make_flat_mutation_reader<test_reader>(
s,
std::move(permit),
range,
std::move(pkeys_with_data));
});
auto pr0 = dht::partition_range::make({pkeys[0], true}, {pkeys[3], true});
auto [reader, handle] = make_manually_paused_evictable_reader(std::move(ms), s.schema(), permit, pr0, s.schema()->full_slice(),
seastar::default_priority_class(), {}, mutation_reader::forwarding::yes);
auto reader_assert = assert_that(std::move(reader));
reader_assert.produces(pkeys[0]);
reader_assert.produces(pkeys[2]);
handle.pause();
BOOST_REQUIRE(semaphore.try_evict_one_inactive_read());
reader_assert.produces_end_of_stream();
auto pr1 = dht::partition_range::make({pkeys[4], true}, {pkeys[5], true});
reader_assert.fast_forward_to(pr1);
// Failure will happen in the form of `on_internal_error()`.
reader_assert.produces(pkeys[4]);
}
SEASTAR_THREAD_TEST_CASE(test_evictable_reader_drop_flags) {
reader_concurrency_semaphore semaphore(1, 0, get_name());
simple_schema s;

View File

@@ -28,6 +28,8 @@
#include "sstables/sstables.hh"
#include "test/lib/mutation_source_test.hh"
#include "test/lib/sstable_utils.hh"
#include "test/lib/mutation_assertions.hh"
#include "partition_slice_builder.hh"
using namespace sstables;
using namespace std::chrono_literals;
@@ -62,3 +64,69 @@ SEASTAR_TEST_CASE(test_sstable_conforms_to_mutation_source) {
}
});
}
// Regression test for scylladb/scylla-enterprise#2016
SEASTAR_THREAD_TEST_CASE(test_produces_range_tombstone) {
auto s = schema_builder("ks", "cf")
.with_column("pk", int32_type, column_kind::partition_key)
.with_column("ck", int32_type, column_kind::clustering_key)
.with_column("v", int32_type, column_kind::regular_column)
.build();
mutation m(s, partition_key::from_single_value(*s, int32_type->decompose(0)));
m.partition().apply_row_tombstone(*s, range_tombstone{
clustering_key::from_exploded(*s, {int32_type->decompose(6)}), bound_kind::excl_start,
clustering_key::from_exploded(*s, {int32_type->decompose(10)}), bound_kind::incl_end,
tombstone(0, gc_clock::time_point())
});
{
auto ckey = clustering_key::from_exploded(*s, {int32_type->decompose(6)});
deletable_row& row = m.partition().clustered_row(*s, ckey, is_dummy::no, is_continuous(false));
row.marker() = row_marker(4);
}
{
auto ckey = clustering_key::from_exploded(*s, {int32_type->decompose(8)});
deletable_row& row = m.partition().clustered_row(*s, ckey, is_dummy::no, is_continuous(false));
row.apply(tombstone(2, gc_clock::time_point()));
row.marker() = row_marker(5);
}
testlog.info("m: {}", m);
auto slice = partition_slice_builder(*s)
.with_range(query::clustering_range::make(
{clustering_key::from_exploded(*s, {int32_type->decompose(8)}), false},
{clustering_key::from_exploded(*s, {int32_type->decompose(10)}), true}
))
.build();
auto pr = dht::partition_range::make_singular(m.decorated_key());
std::vector<tmpdir> dirs;
dirs.emplace_back();
sstables::test_env::do_with_async([&] (sstables::test_env& env) {
storage_service_for_tests ssft;
auto version = sstable_version_types::la;
auto index_block_size = 1;
sstable_writer_config cfg = env.manager().configure_writer();
cfg.promoted_index_block_size = index_block_size;
auto source = make_sstable_mutation_source(env, s, dirs.back().path().string(), {m}, cfg, version, gc_clock::now());
{
auto rd = source.make_reader(s, tests::make_permit(), pr, slice);
while (auto mf = rd(db::no_timeout).get0()) {
testlog.info("produced {}", mutation_fragment::printer(*s, *mf));
}
}
{
auto rd = source.make_reader(s, tests::make_permit(), pr, slice);
mutation_opt sliced_m = read_mutation_from_flat_mutation_reader(rd, db::no_timeout).get0();
BOOST_REQUIRE(bool(sliced_m));
assert_that(*sliced_m).is_equal_to(m, slice.row_ranges(*m.schema(), m.key()));
}
}).get();
}

View File

@@ -440,7 +440,6 @@ def testNestedClusteringKeyUsage(cql, test_keyspace):
)
# Reproduces issue #7868 and #7902
@pytest.mark.xfail(reason="fails because of issue #7902")
def testNestedClusteringKeyUsageWithReverseOrder(cql, test_keyspace):
with create_table(cql, test_keyspace, "(a int, b frozen<map<set<int>, list<int>>>, c frozen<set<int>>, d int, PRIMARY KEY (a, b, c)) WITH CLUSTERING ORDER BY (b DESC)") as table:
execute(cql, table, "INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, {}, set(), 0)

View File

@@ -175,9 +175,12 @@ def wait_for_index(cql, table, column, everything):
results = []
for v in column_values:
results.extend(list(cql.execute(f'SELECT * FROM {table} WHERE {column}={v}')))
if set(results) == set(everything):
if sorted(results) == sorted(everything):
return
time.sleep(0.1)
pytest.fail('Timeout waiting for index to become up to date.')
@pytest.fixture(scope="session")
@@ -291,3 +294,46 @@ def test_contains_frozen_collection_ck(cql, test_keyspace):
"SELECT * FROM " + table + " WHERE a=0 AND c=0 AND b CONTAINS 0 ALLOW FILTERING")))
assert 1 == len(list(cql.execute(
"SELECT * FROM " + table + " WHERE a=0 AND c=0 AND b CONTAINS KEY 0 ALLOW FILTERING")))
# table5 contains an indexed table with 3 clustering columns.
# used to test correct filtering of rows fetched from an index table.
@pytest.fixture(scope="module")
def table5(cql, test_keyspace):
table = test_keyspace + "." + unique_name()
cql.execute(f"CREATE TABLE {table} (p int, c1 frozen<list<int>>, c2 frozen<list<int>>, c3 int, PRIMARY KEY (p,c1,c2,c3))")
cql.execute(f"CREATE INDEX ON {table} (c3)")
cql.execute(f"INSERT INTO {table} (p, c1, c2, c3) VALUES (0, [1], [2], 0)")
cql.execute(f"INSERT INTO {table} (p, c1, c2, c3) VALUES (0, [2], [2], 0)")
cql.execute(f"INSERT INTO {table} (p, c1, c2, c3) VALUES (0, [1], [3], 0)")
cql.execute(f"INSERT INTO {table} (p, c1, c2, c3) VALUES (0, [1], [2], 1)")
everything = list(cql.execute(f"SELECT * FROM {table}"))
wait_for_index(cql, table, 'c3', everything)
yield (table, everything)
cql.execute(f"DROP TABLE {table}")
# Test that implementation of filtering for indexes works ok.
# Current implementation is a bit conservative - it might sometimes state
# that filtering is needed when it isn't actually required, but at least it's safe.
def test_select_indexed_cluster_three_keys(cql, table5):
def check_good_row(row):
return row.p == 0 and row.c1 == [1] and row.c2 == [2] and row.c3 == 0
check_af_optional(cql, table5, "c3 = 0", lambda r : r.c3 == 0)
check_af_mandatory(cql, table5, "c1 = [1] AND c2 = [2] AND c3 = 0", check_good_row)
check_af_mandatory(cql, table5, "p = 0 AND c1 CONTAINS 1 AND c3 = 0", lambda r : r.p == 0 and r.c1 == [1] and r.c3 == 0)
check_af_mandatory(cql, table5, "p = 0 AND c1 = [1] AND c2 CONTAINS 2 AND c3 = 0", check_good_row)
# Doesn't use an index - shouldn't be affected
check_af_optional(cql, table5, "p = 0 AND c1 = [1] AND c2 = [2] AND c3 = 0", check_good_row)
# Here are the cases where current implementation of need_filtering() fails
# By coincidence they also fail on cassandra, it looks like cassandra is buggy
@pytest.mark.xfail(reason="Too conservative need_filtering() implementation")
def test_select_indexed_cluster_three_keys_conservative(cql, table5, cassandra_bug):
def check_good_row(row):
return row.p == 0 and row.c1 == [1] and row.c3 == 0
# Don't require filtering, but for now we report they do
check_af_optional(cql, table5, "p = 0 AND c1 = [1] AND c3 = 0", check_good_row)
check_af_optional(cql, table5, "p = 0 AND c1 = [1] AND c2 < [3] AND c3 = 0", lambda r : check_good_row(r) and r.c2 < [3])

View File

@@ -26,7 +26,7 @@
from util import unique_name, new_test_table
from cassandra.protocol import FunctionFailure
from cassandra.protocol import FunctionFailure, InvalidRequest
import pytest
import random
@@ -34,58 +34,62 @@ import random
@pytest.fixture(scope="session")
def table1(cql, test_keyspace):
table = test_keyspace + "." + unique_name()
cql.execute(f"CREATE TABLE {table} (p int PRIMARY KEY, v int, a ascii)")
cql.execute(f"CREATE TABLE {table} (p int PRIMARY KEY, v int, a ascii, b boolean)")
yield table
cql.execute("DROP TABLE " + table)
# Test that failed fromJson() parsing an invalid JSON results in the expected
# error - FunctionFailure - and not some weird internal error.
# Reproduces issue #7911.
@pytest.mark.xfail(reason="issue #7911")
def test_failed_json_parsing_unprepared(cql, table1):
p = random.randint(1,1000000000)
with pytest.raises(FunctionFailure):
cql.execute(f"INSERT INTO {table1} (p, v) VALUES ({p}, fromJson('dog'))")
@pytest.mark.xfail(reason="issue #7911")
def test_failed_json_parsing_prepared(cql, table1):
p = random.randint(1,1000000000)
stmt = cql.prepare(f"INSERT INTO {table1} (p, v) VALUES (?, fromJson(?))")
with pytest.raises(FunctionFailure):
cql.execute(stmt, [0, 'dog'])
cql.execute(stmt, [p, 'dog'])
# Similarly, if the JSON parsing did not fail, but yielded a type which is
# incompatible with the type we want it to yield, we should get a clean
# FunctionFailure, not some internal server error.
# We have here examples of returning a string where a number was expected,
# and returning a unicode string where ASCII was expected.
# and returning a unicode string where ASCII was expected, and returning
# a number of the wrong type
# Reproduces issue #7911.
@pytest.mark.xfail(reason="issue #7911")
def test_fromjson_wrong_type_unprepared(cql, table1):
p = random.randint(1,1000000000)
with pytest.raises(FunctionFailure):
cql.execute(f"INSERT INTO {table1} (p, v) VALUES ({p}, fromJson('\"dog\"'))")
with pytest.raises(FunctionFailure):
cql.execute(f"INSERT INTO {table1} (p, a) VALUES ({p}, fromJson('3'))")
@pytest.mark.xfail(reason="issue #7911")
def test_fromjson_wrong_type_prepared(cql, table1):
p = random.randint(1,1000000000)
stmt = cql.prepare(f"INSERT INTO {table1} (p, v) VALUES (?, fromJson(?))")
with pytest.raises(FunctionFailure):
cql.execute(stmt, [0, '"dog"'])
cql.execute(stmt, [p, '"dog"'])
stmt = cql.prepare(f"INSERT INTO {table1} (p, a) VALUES (?, fromJson(?))")
with pytest.raises(FunctionFailure):
cql.execute(stmt, [0, '3'])
@pytest.mark.xfail(reason="issue #7911")
cql.execute(stmt, [p, '3'])
def test_fromjson_bad_ascii_unprepared(cql, table1):
p = random.randint(1,1000000000)
with pytest.raises(FunctionFailure):
cql.execute(f"INSERT INTO {table1} (p, a) VALUES ({p}, fromJson('\"שלום\"'))")
@pytest.mark.xfail(reason="issue #7911")
def test_fromjson_bad_ascii_prepared(cql, table1):
p = random.randint(1,1000000000)
stmt = cql.prepare(f"INSERT INTO {table1} (p, a) VALUES (?, fromJson(?))")
with pytest.raises(FunctionFailure):
cql.execute(stmt, [0, '"שלום"'])
cql.execute(stmt, [p, '"שלום"'])
def test_fromjson_nonint_unprepared(cql, table1):
p = random.randint(1,1000000000)
with pytest.raises(FunctionFailure):
cql.execute(f"INSERT INTO {table1} (p, v) VALUES ({p}, fromJson('1.2'))")
def test_fromjson_nonint_prepared(cql, table1):
p = random.randint(1,1000000000)
stmt = cql.prepare(f"INSERT INTO {table1} (p, v) VALUES (?, fromJson(?))")
with pytest.raises(FunctionFailure):
cql.execute(stmt, [p, '1.2'])
# The JSON standard does not define or limit the range or precision of
# numbers. However, if a number is assigned to a Scylla number type, the
@@ -105,7 +109,27 @@ def test_fromjson_int_overflow_prepared(cql, table1):
p = random.randint(1,1000000000)
stmt = cql.prepare(f"INSERT INTO {table1} (p, v) VALUES (?, fromJson(?))")
with pytest.raises(FunctionFailure):
cql.execute(stmt, [0, '2147483648'])
cql.execute(stmt, [p, '2147483648'])
# Cassandra allows the strings "true" and "false", not just the JSON constants
# true and false, to be assigned to a boolean column. However, very strangely,
# it only allows this for prepared statements, and *not* for unprepared
# statements - which result in an InvalidRequest!
# Reproduces #7915.
def test_fromjson_boolean_string_unprepared(cql, table1):
p = random.randint(1,1000000000)
with pytest.raises(InvalidRequest):
cql.execute(f"INSERT INTO {table1} (p, b) VALUES ({p}, '\"true\"')")
with pytest.raises(InvalidRequest):
cql.execute(f"INSERT INTO {table1} (p, b) VALUES ({p}, '\"false\"')")
@pytest.mark.xfail(reason="issue #7915")
def test_fromjson_boolean_string_prepared(cql, table1):
p = random.randint(1,1000000000)
stmt = cql.prepare(f"INSERT INTO {table1} (p, b) VALUES (?, fromJson(?))")
cql.execute(stmt, [p, '"true"'])
assert list(cql.execute(f"SELECT p, b from {table1} where p = {p}")) == [(p, True)]
cql.execute(stmt, [p, '"false"'])
assert list(cql.execute(f"SELECT p, b from {table1} where p = {p}")) == [(p, False)]
# Test that null argument is allowed for fromJson(), with unprepared statement
# Reproduces issue #7912.

View File

@@ -118,6 +118,8 @@ public:
return stop_iteration::no;
});
});
}).finally([&ir] () {
return ir->close();
});
}).then([l] {
return std::move(*l);

View File

@@ -97,12 +97,18 @@ future<> controller::do_start_server() {
};
std::vector<listen_cfg> configs;
int native_port_idx = -1, native_shard_aware_port_idx = -1;
if (cfg.native_transport_port() != 0) {
configs.push_back(listen_cfg{ socket_address{ip, cfg.native_transport_port()}, false });
if (cfg.native_transport_port.is_set() ||
(!cfg.native_transport_port_ssl.is_set() && !cfg.native_transport_port.is_set())) {
// Non-SSL port is specified || neither SSL nor non-SSL ports are specified
configs.emplace_back(listen_cfg{ socket_address{ip, cfg.native_transport_port()}, false });
native_port_idx = 0;
}
if (cfg.native_shard_aware_transport_port.is_set()) {
configs.push_back(listen_cfg{ socket_address{ip, cfg.native_shard_aware_transport_port()}, true });
if (cfg.native_shard_aware_transport_port.is_set() ||
(!cfg.native_shard_aware_transport_port_ssl.is_set() && !cfg.native_shard_aware_transport_port.is_set())) {
configs.emplace_back(listen_cfg{ socket_address{ip, cfg.native_shard_aware_transport_port()}, true });
native_shard_aware_port_idx = native_port_idx + 1;
}
// main should have made sure values are clean and neatish
@@ -127,15 +133,20 @@ future<> controller::do_start_server() {
logger.info("Enabling encrypted CQL connections between client and server");
if (cfg.native_transport_port_ssl.is_set() && cfg.native_transport_port_ssl() != cfg.native_transport_port()) {
if (cfg.native_transport_port_ssl.is_set() &&
(!cfg.native_transport_port.is_set() ||
cfg.native_transport_port_ssl() != cfg.native_transport_port())) {
// SSL port is specified && non-SSL port is either left out or set to a different value
configs.emplace_back(listen_cfg{{ip, cfg.native_transport_port_ssl()}, false, cred});
} else {
configs[0].cred = cred;
} else if (native_port_idx >= 0) {
configs[native_port_idx].cred = cred;
}
if (cfg.native_shard_aware_transport_port_ssl.is_set() && cfg.native_shard_aware_transport_port_ssl() != cfg.native_shard_aware_transport_port()) {
if (cfg.native_shard_aware_transport_port_ssl.is_set() &&
(!cfg.native_shard_aware_transport_port.is_set() ||
cfg.native_shard_aware_transport_port_ssl() != cfg.native_shard_aware_transport_port())) {
configs.emplace_back(listen_cfg{{ip, cfg.native_shard_aware_transport_port_ssl()}, true, std::move(cred)});
} else if (cfg.native_shard_aware_transport_port.is_set()) {
configs[1].cred = std::move(cred);
} else if (native_shard_aware_port_idx >= 0) {
configs[native_shard_aware_port_idx].cred = std::move(cred);
}
}

View File

@@ -572,7 +572,17 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
} catch (const exceptions::prepared_query_not_found_exception& ex) {
try { ++_server._stats.errors[ex.code()]; } catch(...) {}
return make_unprepared_error(stream, ex.code(), ex.what(), ex.id, trace_state);
} catch (const exceptions::function_execution_exception& ex) {
try { ++_server._stats.errors[ex.code()]; } catch(...) {}
return make_function_failure_error(stream, ex.code(), ex.what(), ex.ks_name, ex.func_name, ex.args, trace_state);
} catch (const exceptions::cassandra_exception& ex) {
// Note: the CQL protocol specifies that many types of errors have
// mandatory parameters. These cassandra_exception subclasses MUST
// be handled above. This default "cassandra_exception" case is
// only appropriate for the specific types of errors which do not have
// additional information, such as invalid_request_exception.
// TODO: consider listing those types explicitly, instead of the
// catch-all type cassandra_exception.
try { ++_server._stats.errors[ex.code()]; } catch(...) {}
return make_error(stream, ex.code(), ex.what(), trace_state);
} catch (std::exception& ex) {
@@ -1334,6 +1344,17 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_unprepared_er
return response;
}
std::unique_ptr<cql_server::response> cql_server::connection::make_function_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring func_name, std::vector<sstring> args, const tracing::trace_state_ptr& tr_state) const
{
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
response->write_int(static_cast<int32_t>(err));
response->write_string(msg);
response->write_string(ks_name);
response->write_string(func_name);
response->write_string_list(args);
return response;
}
std::unique_ptr<cql_server::response> cql_server::connection::make_error(int16_t stream, exceptions::exception_code err, sstring msg, const tracing::trace_state_ptr& tr_state) const
{
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);

View File

@@ -235,6 +235,7 @@ private:
std::unique_ptr<cql_server::response> make_mutation_write_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, db::write_type type, const tracing::trace_state_ptr& tr_state) const;
std::unique_ptr<cql_server::response> make_already_exists_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring cf_name, const tracing::trace_state_ptr& tr_state) const;
std::unique_ptr<cql_server::response> make_unprepared_error(int16_t stream, exceptions::exception_code err, sstring msg, bytes id, const tracing::trace_state_ptr& tr_state) const;
std::unique_ptr<cql_server::response> make_function_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, sstring ks_name, sstring func_name, std::vector<sstring> args, const tracing::trace_state_ptr& tr_state) const;
std::unique_ptr<cql_server::response> make_error(int16_t stream, exceptions::exception_code err, sstring msg, const tracing::trace_state_ptr& tr_state) const;
std::unique_ptr<cql_server::response> make_ready(int16_t stream, const tracing::trace_state_ptr& tr_state) const;
std::unique_ptr<cql_server::response> make_supported(int16_t stream, const tracing::trace_state_ptr& tr_state) const;

View File

@@ -1637,10 +1637,10 @@ static void serialize_aux(const tuple_type_impl& type, const tuple_type_impl::na
assert(elems.size() <= type.size());
for (size_t i = 0; i < elems.size(); ++i) {
const data_type& t = type.type(i);
const abstract_type& t = type.type(i)->without_reversed();
const data_value& v = elems[i];
if (!v.is_null() && t != v.type()) {
throw std::runtime_error(format("tuple element type mismatch: expected {}, got {}", t->name(), v.type()->name()));
if (!v.is_null() && t != *v.type()) {
throw std::runtime_error(format("tuple element type mismatch: expected {}, got {}", t.name(), v.type()->name()));
}
if (v.is_null()) {

View File

@@ -263,6 +263,13 @@ decltype(auto) with_simplified(const View& v, Function&& fn)
}
}
template<FragmentedView View>
void skip_empty_fragments(View& v) {
while (!v.empty() && v.current_fragment().empty()) {
v.remove_current();
}
}
template<FragmentedView V1, FragmentedView V2>
int compare_unsigned(V1 v1, V2 v2) {
while (!v1.empty() && !v2.empty()) {
@@ -272,6 +279,8 @@ int compare_unsigned(V1 v1, V2 v2) {
}
v1.remove_prefix(n);
v2.remove_prefix(n);
skip_empty_fragments(v1);
skip_empty_fragments(v2);
}
return v1.size_bytes() - v2.size_bytes();
}
@@ -286,5 +295,7 @@ void write_fragmented(Dest& dest, Src src) {
memcpy(dest.current_fragment().data(), src.current_fragment().data(), n);
dest.remove_prefix(n);
src.remove_prefix(n);
skip_empty_fragments(dest);
skip_empty_fragments(src);
}
}

View File

@@ -69,11 +69,15 @@ public:
// Starts a new phase and waits for all operations started in any of the earlier phases.
// It is fine to start multiple awaits in parallel.
// Strong exception guarantees.
future<> advance_and_await() {
future<> advance_and_await() noexcept {
try {
auto new_gate = make_lw_shared<gate>();
++_phase;
auto old_gate = std::exchange(_gate, std::move(new_gate));
return old_gate->close().then([old_gate, op = start()] {});
} catch (...) {
return current_exception_as_future();
}
}
// Returns current phase number. The smallest value returned is 0.

View File

@@ -120,6 +120,26 @@ protected:
}
};
void* internal::throwing_allocator::Malloc(size_t size) {
void* ret = base::Malloc(size);
if (size > 0 && !ret) {
throw rjson::error(format("Failed to allocate {} bytes", size));
}
return ret;
}
void* internal::throwing_allocator::Realloc(void* orig_ptr, size_t orig_size, size_t new_size) {
void* ret = base::Realloc(orig_ptr, orig_size, new_size);
if (new_size > 0 && !ret) {
throw rjson::error(format("Failed to reallocate {} bytes to {} bytes from {}", orig_size, new_size, orig_ptr));
}
return ret;
}
void internal::throwing_allocator::Free(void* ptr) {
base::Free(ptr);
}
std::string print(const rjson::value& value) {
string_buffer buffer;
guarded_yieldable_json_handler<writer, false> writer(buffer, 78);
@@ -262,6 +282,15 @@ void set(rjson::value& base, rjson::string_ref_type name, rjson::string_ref_type
base.AddMember(name, rjson::value(member), the_allocator);
}
void replace_with_string_name(rjson::value& base, const std::string_view name, rjson::value&& member) {
rjson::value *m = rjson::find(base, name);
if (m) {
*m = std::move(member);
} else {
set_with_string_name(base, name, std::move(member));
}
}
void push_back(rjson::value& base_array, rjson::value&& item) {
base_array.PushBack(std::move(item), the_allocator);

View File

@@ -66,18 +66,35 @@ public:
#include <rapidjson/writer.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/error/en.h>
#include <rapidjson/allocators.h>
#include <seastar/core/sstring.hh>
#include "seastarx.hh"
namespace rjson {
using allocator = rapidjson::CrtAllocator;
// The internal namespace is a workaround for the fact that fmt::format
// also has a to_string_view function and erroneously looks up our rjson::to_string_view
// if this allocator is in the rjson namespace.
namespace internal {
// Implements an interface conforming to the one in rapidjson/allocators.h,
// but throws rjson::error on allocation failures
class throwing_allocator : public rapidjson::CrtAllocator {
using base = rapidjson::CrtAllocator;
public:
static const bool kNeedFree = base::kNeedFree;
void* Malloc(size_t size);
void* Realloc(void* orig_ptr, size_t orig_size, size_t new_size);
static void Free(void* ptr);
};
}
using allocator = internal::throwing_allocator;
using encoding = rapidjson::UTF8<>;
using document = rapidjson::GenericDocument<encoding, allocator>;
using document = rapidjson::GenericDocument<encoding, allocator, allocator>;
using value = rapidjson::GenericValue<encoding, allocator>;
using string_ref_type = value::StringRefType;
using string_buffer = rapidjson::GenericStringBuffer<encoding>;
using writer = rapidjson::Writer<string_buffer, encoding>;
using string_buffer = rapidjson::GenericStringBuffer<encoding, allocator>;
using writer = rapidjson::Writer<string_buffer, encoding, encoding, allocator>;
using type = rapidjson::Type;
/**
@@ -186,24 +203,37 @@ std::optional<T> get_opt(const rjson::value& value, std::string_view name) {
}
}
// Sets a member in given JSON object by moving the member - allocates the name.
// The various set*() functions below *add* a new member to a JSON object.
// They all assume that a member with the same key (name) doesn't already
// exist in that object, so they are meant to be used just to build a new
// object from scratch. If a member with the same name *may* exist, and
// might need to be replaced, use the replace*() functions instead.
// The benefit of the set*() functions is that they are faster (O(1),
// compared to O(n) for the replace* function that need to inspect the
// existing members).
// Adds a member to a given JSON object by moving the member - allocates the name.
// Throws if base is not a JSON object.
// Assumes a member with the same name does not yet exist in base.
void set_with_string_name(rjson::value& base, std::string_view name, rjson::value&& member);
// Sets a string member in given JSON object by assigning its reference - allocates the name.
// Adds a string member to a given JSON object by assigning its reference - allocates the name.
// NOTICE: member string liveness must be ensured to be at least as long as base's.
// Throws if base is not a JSON object.
// Assumes a member with the same name does not yet exist in base.
void set_with_string_name(rjson::value& base, std::string_view name, rjson::string_ref_type member);
// Sets a member in given JSON object by moving the member.
// Adds a member to a given JSON object by moving the member.
// NOTICE: name liveness must be ensured to be at least as long as base's.
// Throws if base is not a JSON object.
// Assumes a member with the same name does not yet exist in base.
void set(rjson::value& base, rjson::string_ref_type name, rjson::value&& member);
// Sets a string member in given JSON object by assigning its reference.
// Adds a string member to a given JSON object by assigning its reference.
// NOTICE: name liveness must be ensured to be at least as long as base's.
// NOTICE: member liveness must be ensured to be at least as long as base's.
// Throws if base is not a JSON object.
// Assumes a member with the same name does not yet exist in base.
void set(rjson::value& base, rjson::string_ref_type name, rjson::string_ref_type member);
/**
@@ -224,6 +254,12 @@ set(rjson::value& base, rjson::string_ref_type name, T&& member) {
set(base, std::move(name), std::move(v));
}
// Set a member in a given JSON object by moving the member - allocates the name.
// If a member with the same name already exist in base, it is replaced.
// Throws if base is not a JSON object.
void replace_with_string_name(rjson::value& base, std::string_view name, rjson::value&& member);
// Adds a value to a JSON list by moving the item to its end.
// Throws if base_array is not a JSON array.
void push_back(rjson::value& base_array, rjson::value&& item);

View File

@@ -44,8 +44,9 @@ void merge_to_gently(std::list<T>& list1, const std::list<T>& list2, Compare com
seastar::thread::maybe_yield();
if (first1 == last1) {
// Copy remaining items of list2 into list1
std::copy_if(first2, last2, std::back_inserter(list1), [] (const auto&) { return true; });
return;
list1.insert(last1, *first2);
++first2;
continue;
}
if (comp(*first2, *first1)) {
first1 = list1.insert(first1, *first2);