Compare commits

...

57 Commits

Author SHA1 Message Date
Avi Kivity
f0a8465345 Update seastar submodule (json crash in describe_ring)
* seastar 0568c231cd...fd0d7c1c9a (1):
  > Merge 'stream_range_as_array: always close output stream' from Benny Halevy

Fixes #10592.
2022-06-08 16:50:51 +03:00
Beni Peled
1e6fe6391f release: prepare for 4.5.7 2022-05-16 15:27:49 +03:00
Benny Halevy
237c67367c table: clear: serialize with ongoing flush
Get all flush permits to serialize with any
ongoing flushes and to prevent further flushes
during table::clear, in particular when calling
discard_completed_segments for every table and
clearing the memtables in clear_and_add.

Fixes #10423

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
(cherry picked from commit aae532a96b)
2022-05-15 13:44:15 +03:00
Raphael S. Carvalho
f5707399c3 compaction: LCS: don't write to disengaged optional on compaction completion
Dtest triggers the problem by:
1) creating table with LCS
2) disabling regular compaction
3) writing a few sstables
4) running maintenance compaction, e.g. cleanup

Once the maintenance compaction completes, the disengaged optional
_last_compacted_keys triggers an exception in notify_completion().

_last_compacted_keys is used by regular compaction for its round-robin
file-picking policy. It stores the last compacted key for each level,
meaning it's irrelevant for any other compaction type.

Regular compaction is responsible for initializing it when it runs for
the first time to pick files. But with it disabled, notify_completion()
will find it uninitialized, therefore resulting in bad_optional_access.

To fix this, the procedure is skipped if _last_compacted_keys is
disengaged. Regular compaction, once re-enabled, will be able to
fill _last_compacted_keys by looking at metadata of the files.
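
The fix described above amounts to an early return on a disengaged optional; a minimal sketch, with hypothetical stand-in types (only `_last_compacted_keys` and `notify_completion` come from the commit message):

```cpp
#include <map>
#include <optional>
#include <string>

// Illustrative stand-in for the LCS strategy state; not Scylla's real types.
struct lcs_state {
    // Engaged only after regular compaction has run at least once.
    std::optional<std::map<int, std::string>> _last_compacted_keys;

    // Record the last compacted key for a level on compaction completion.
    void notify_completion(int level, const std::string& key) {
        if (!_last_compacted_keys) {
            // Disengaged: only maintenance compaction (e.g. cleanup) has run.
            // Dereferencing here would throw std::bad_optional_access, so
            // skip; regular compaction will repopulate the map when enabled.
            return;
        }
        (*_last_compacted_keys)[level] = key;
    }
};
```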

compaction_test.py::TestCompaction::test_disable_autocompaction_doesnt_
block_user_initiated_compactions[CLEANUP-LeveledCompactionStrategy]
now passes.

Fixes #10378.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>

Closes #10508

(cherry picked from commit 8e99d3912e)
2022-05-15 13:20:40 +03:00
Juliusz Stasiewicz
4c28744bfd CQL: Replace assert by exception on invalid auth opcode
One user observed this assertion fail, but it's an extremely rare event.
The root cause - interlacing of processing STARTUP and OPTIONS messages -
is still there, but now it's harmless enough to leave it as is.

Fixes #10487

Closes #10503

(cherry picked from commit 603dd72f9e)
2022-05-10 14:09:54 +02:00
Eliran Sinvani
3b253e1166 prepared_statements: Invalidate batch statement too
It seems that batch prepared statements always return false from
depends_on; this in turn renders the removal criterion of the
prepared statements cache always false, with the result that the
queries are never evicted.
Here we change the function to return the true state, meaning
it will return true if one of the sub-queries depends on the
keyspace and/or column family.

Fixes #10129

Signed-off-by: Eliran Sinvani <eliransin@scylladb.com>
(cherry picked from commit 4eb0398457)
2022-05-08 12:37:00 +03:00
Eliran Sinvani
2dedfacabf cql3 statements: Change dependency test API to better express
its purpose

Cql statements used to have two API functions, depends_on_keyspace and
depends_on_column_family. The former took as a parameter only a table
name, which makes no sense: there could be multiple tables with the same
name, each in a different keyspace, and it doesn't make sense to
generalize the test - i.e. to ask "Does a statement depend on any table
named XXX?"
In this change we unify the two calls into one - depends_on - which takes
a keyspace name and optionally also a table name; that way every logical
dependency test that makes sense is supported by a single API call.

(cherry picked from commit bf50dbd35b)

Ref #10129
2022-05-08 12:36:48 +03:00
Calle Wilund
414e2687d4 cdc: Ensure columns removed from log table are registered as dropped
If we are redefining the log table, we need to ensure any dropped
columns are registered in "dropped_columns" table, otherwise clients will not
be able to read data older than now.
Includes unit test.

Should probably be backported to all CDC enabled versions.

Fixes #10473
Closes #10474

(cherry picked from commit 78350a7e1b)

Backport notes: removed cql-pytest test from the original commit, since
the cql-pytest framework on branch-4.5 is missing the `nodetool`
package.
2022-05-05 11:31:33 +02:00
Tomasz Grabiec
3d63f12d3b loading_cache: Make invalidation take immediate effect
There are two issues with the current implementation of remove/remove_if:

  1) If it happens concurrently with get_ptr(), the latter may still
  populate the cache using value obtained from before remove() was
  called. remove() is used to invalidate caches, e.g. the prepared
  statements cache, and the expected semantic is that values
  calculated from before remove() should not be present in the cache
  after invalidation.

  2) As long as there is any active pointer to the cached value
  (obtained by get_ptr()), the old value from before remove() will be
  still accessible and returned by get_ptr(). This can make remove()
  have no effect indefinitely if there is persistent use of the cache.

One of the user-perceived effects of this bug is that some prepared
statements may not get invalidated after a schema change and still use
the old schema (until next invalidation). If the schema change was
modifying UDT, this can cause statement execution failures. CQL
coordinator will try to interpret bound values using the old set of
fields. If the driver uses the new schema, the coordinator will fail
to process the value with the following exception:

  User Defined Type value contained too many fields (expected 5, got 6)

The patch fixes the problem by making remove()/remove_if() erase old
entries from _loading_values immediately.

The predicate-based remove_if() variant also has to invalidate values
which are concurrently loading to be safe. The predicate cannot be
evaluated on values which are not ready. This may invalidate some
values unnecessarily, but I think it's fine.
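
The intended semantics can be sketched with a deliberately simplified, single-threaded cache (illustrative only; the real loading_cache handles concurrent asynchronous loads):

```cpp
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>

// After remove(), a value computed before the removal must not be
// observable, so the entry is erased from the backing map immediately
// and the next get() reloads it.
template <typename K, typename V>
class tiny_cache {
    std::unordered_map<K, V> _loading_values;
    std::function<V(const K&)> _loader;
public:
    explicit tiny_cache(std::function<V(const K&)> loader)
        : _loader(std::move(loader)) {}

    const V& get(const K& k) {
        auto it = _loading_values.find(k);
        if (it == _loading_values.end()) {
            it = _loading_values.emplace(k, _loader(k)).first;
        }
        return it->second;
    }

    // Invalidation takes immediate effect.
    void remove(const K& k) { _loading_values.erase(k); }
};
```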

Fixes #10117

Message-Id: <20220309135902.261734-1-tgrabiec@scylladb.com>
(cherry picked from commit 8fa704972f)
2022-05-04 15:50:16 +03:00
Vlad Zolotarov
e09f2d0ea0 loading_shared_values/loading_cache: get rid of iterators interface and return value_ptr from find(...) instead
The loading_shared_values/loading_cache iterator interface is dangerous
and fragile: an iterator doesn't "lock" the entry it points to, and if
there is a preemption point between acquiring a non-end() iterator and
dereferencing it, the corresponding cache entry may already have been
evicted (for whatever reason, e.g. cache size constraints or expiration),
so dereferencing may end up in a use-after-free; we have no protection
against that in the value_extractor_fn today.

And this is in addition to #8920.

So, instead of trying to fix the iterator interface, this patch kills two
birds with one stone: we ditch the iterator interface completely and
return value_ptr from find(...) instead - the same one we return from
the loading_cache::get_ptr(...) asynchronous APIs.

A similar rework is done to loading_shared_values, which loading_cache
is based on: we drop the iterator interface and return
loading_shared_values::entry_ptr from find(...) instead.

loading_cache::value_ptr already takes care of "locking" the returned
value so that it remains readable even if it's evicted from the cache by
the time one tries to read it. And of course it also takes care of
updating the last-read timestamp and moving the corresponding item to
the top of the MRU list.

Fixes #8920

Signed-off-by: Vlad Zolotarov <vladz@scylladb.com>
Message-Id: <20210817222404.3097708-1-vladz@scylladb.com>
(cherry picked from commit 7bd1bcd779)

[avi: prerequisite to backporting #10117]
2022-05-04 15:42:21 +03:00
Avi Kivity
484b23c08e Merge 'replica/database: drop_column_family(): properly cleanup stale querier cache entries' from Botond Dénes
Said method has to evict all querier cache entries belonging to the to-be-dropped table. This was already the case, but there was a window where new entries could sneak in, causing a stale reference to the table to be dereferenced later when they are evicted due to TTL. This window is now closed: the entries are evicted after the method has waited for all ongoing operations on said table to stop.

Fixes: #10450

Closes #10451

* github.com:scylladb/scylla:
  replica/database: drop_column_family(): drop querier cache entries after waiting for ops
  replica/database: finish coroutinizing drop_column_family()
  replica/database: make remove(const column_family&) private

(cherry picked from commit 7f1e368e92)
2022-05-01 18:41:36 +03:00
Avi Kivity
97bcbd3c1f Update tools/java submodule (bad IPv6 addresses in nodetool)
* tools/java 8700c89b07...42151ec974 (1):
  > CASSANDRA-17581 fix NodeProbe: Malformed IPv6 address at index

Fixes #10442
2022-04-28 11:36:12 +03:00
Yaron Kaikov
85d3fe744b release: prepare for 4.5.6 2022-04-24 14:42:47 +03:00
Tomasz Grabiec
48d4759fad utils/chunked_vector: Fix sigsegv during reserve()
Fixes the case of make_room() invoked with last_chunk_capacity_deficit
but _size not in the last reserved chunk.

Found during code review, no known user impact.

Fixes #10363.

Message-Id: <20220411222605.641614-1-tgrabiec@scylladb.com>
(cherry picked from commit 01eeb33c6e)

[avi: make max_chunk_capacity() public for backport]
2022-04-13 10:30:57 +03:00
Avi Kivity
c4e8d2e761 transport: return correct error codes when downgrading v4 {WRITE,READ}_FAILURE to {WRITE,READ}_TIMEOUT
Protocol v4 added WRITE_FAILURE and READ_FAILURE. When running under v3
we downgrade these exceptions to WRITE_TIMEOUT and READ_TIMEOUT (since
the client won't understand the v4 errors), but we still send the new
error codes. This causes the client to become confused.

Fix by updating the error codes.

A better fix is to move the error code from the constructor parameter
list and hard-code it in the constructor, but that is left for a follow-up
after this minimal fix.
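
A sketch of the downgrade, assuming the error codes defined by the CQL binary protocol specification (the helper name is hypothetical):

```cpp
#include <cstdint>

// Error codes from the CQL binary protocol spec.
enum class error_code : int32_t {
    write_timeout = 0x1100,
    read_timeout  = 0x1200,
    read_failure  = 0x1300, // v4-only
    write_failure = 0x1500, // v4-only
};

// Under protocol v3, turn the v4-only failure exceptions into timeouts --
// and make sure the code sent on the wire matches the downgraded exception.
error_code downgrade_for_v3(error_code ec) {
    switch (ec) {
    case error_code::write_failure: return error_code::write_timeout;
    case error_code::read_failure:  return error_code::read_timeout;
    default:                        return ec; // everything else passes through
    }
}
```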

Fixes #5610.

Closes #10362

(cherry picked from commit 987e6533d2)
2022-04-13 09:49:36 +03:00
Avi Kivity
45c93db71f Update seastar submodule (logger deadlock with large messages)
* seastar 4456fcdfc0...0568c231cd (2):
  > log: Fix silencer to be shard-local and logger-global
  > log: Silence logger when logging

Fixes #10336.
2022-04-05 19:50:13 +03:00
Piotr Sarna
bab3afca5e cql3: fix qualifying restrictions with IN for indexing
When a query contains IN restriction on its partition key,
it's currently not eligible for indexing. It was however
erroneously qualified as such, which led to fetching incorrect
results. This commit fixes the issue by not allowing such queries
to undergo indexing, and comes with a regression test.

Fixes #10300

Closes #10302

(cherry picked from commit c0fd53a9d7)
(cherry picked from commit ded169476b0498aeedeeca40a612f2cebb54348a)
2022-04-04 10:27:25 +02:00
Avi Kivity
8a0f3cc136 Update seastar submodule (pidof command not installed)
* seastar 7487cae646...4456fcdfc0 (1):
  > seastar-cpu-map.sh: switch from pidof to pgrep
Fixes #10238.
2022-03-29 12:41:53 +03:00
Yaron Kaikov
c789a18195 release: prepare for 4.5.5 2022-03-28 10:44:46 +03:00
Benny Halevy
195c8a6f80 atomic_cell: compare_atomic_cell_for_merge: compare ttl if expiry is equal
Following up on a57c087c89,
compare_atomic_cell_for_merge should compare the ttl values in
reverse order: when comparing two cells that are identical
in all attributes but their ttl, we want to keep the cell with the
smaller ttl value, since it was written at a later (wall-clock)
time, and so would remain longer after it expires, until purged
after gc_grace seconds.

Fixes #10173

Test: mutation_test.test_cell_ordering, unit(dev)

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20220302154328.2400717-1-bhalevy@scylladb.com>
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20220306091913.106508-1-bhalevy@scylladb.com>
(cherry picked from commit a085ef74ff)
2022-03-24 18:18:15 +02:00
Benny Halevy
8a263600dd atomic_cell: compare_atomic_cell_for_merge: compare ttl if expiry is equal
Unlike atomic_cell_or_collection::equals, compare_atomic_cell_for_merge
currently returns std::strong_ordering::equal if two cells are equal in
every way except their ttls.

The problem with that is that the cells' hashes are different and this
will cause repair to keep trying to repair discrepancies caused by the
ttl being different.

This may be triggered by e.g. the spark migrator, which computes the ttl
from the expiry time by subtracting the current time from the expiry
time.

If the cell is migrated multiple times at different times, it will generate
cells that have the same expiry (by design) but different ttl values.
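
The reverse ttl comparison can be sketched as follows (the struct is an illustrative stand-in, not the real atomic_cell):

```cpp
#include <cstdint>

// Illustrative cell attributes; not the real atomic_cell layout.
struct cell {
    int64_t timestamp;
    int32_t expiry; // absolute expiration point
    int32_t ttl;    // seconds the cell was supposed to live
};

// Attributes are compared in turn; when everything else is equal, the ttl
// is compared in *reverse*, so the cell with the smaller ttl (written later
// in wall-clock time) wins the merge.
int compare_for_merge(const cell& a, const cell& b) {
    if (a.timestamp != b.timestamp) return a.timestamp < b.timestamp ? -1 : 1;
    if (a.expiry != b.expiry)       return a.expiry < b.expiry ? -1 : 1;
    if (a.ttl != b.ttl)             return a.ttl > b.ttl ? -1 : 1; // reversed
    return 0;
}
```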

Fixes #10156

Test: mutation_test.test_cell_ordering, unit(dev)

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20220302154328.2400717-1-bhalevy@scylladb.com>
(cherry picked from commit a57c087c89)
2022-03-24 18:17:28 +02:00
Avi Kivity
da806c4451 replica, atomic_cell: move atomic_cell merge code from replica module to atomic_cell.cc
compare_atomic_cell_for_merge() was placed in database.cc, before
atomic_cell.cc existed. Move it to its correct place.

Closes #9889

(cherry picked from commit 6c53717a39)

[avi: 4.5 backport: retain pre-spaceship-operator code]
2022-03-24 18:11:42 +02:00
Benny Halevy
619bfb7c4e main: shutdown: do not abort on certain system errors
Currently any unhandled error during deferred shutdown
is rethrown in a noexcept context (in ~deferred_action),
generating a core dump.

The core dump is not helpful if the cause of the
error is "environmental", i.e. in the system, rather
than in scylla itself.

This change detects several such errors and calls
_Exit(255) to exit the process early, without leaving
a coredump behind.  Otherwise, call abort() explicitly,
rather than letting terminate() be called implicitly
by the destructor exception handling code.

Fixes #9573

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20220227101054.1294368-1-bhalevy@scylladb.com>
(cherry picked from commit 132c9d5933)
2022-03-24 14:50:12 +02:00
Nadav Har'El
06795d29c5 Seastar: backport Seastar fix for missing string escape in JSON output
Backported Seastar fix:
  > Merge 'json/formatter: Escape strings' from Juliusz Stasiewicz

Fixes #9061

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
2022-03-23 22:32:15 +02:00
Asias He
7e625f3397 storage_service: Generate view update for load and stream
Currently, the view will not be updated because the streaming reason is set
to streaming::stream_reason::rebuild. On the receiver side, only
streaming with the reason streaming::stream_reason::repair will trigger
a view update.

Change the stream reason to repair to trigger view updates for load and
stream. This makes load_and_stream behave the same as nodetool refresh.

Note: this is not very efficient, though.

Consider RF = 3, sst1, sst2, sst3 from the older cluster. When sst1 is
loaded, it streams to 3 replica nodes, if we generate view updates, we
will have 3 view updates for this replica (each of the peer nodes finds
its peer and writes the view update to peer). After loading sst2 and
sst3, we will have 9 view updates in total for a single partition.
If we create the view after the load and stream process, we will only
have 3 view updates for a single partition.

Fixes #9205

Closes #9213

(cherry picked from commit eaf4d2afb4)
2022-03-09 16:37:13 +02:00
Nadav Har'El
865cfebfed cql: INSERT JSON should refuse empty-string partition key
Add the missing partition-key validation in INSERT JSON statements.

Scylla, following the lead of Cassandra, forbids an empty-string partition
key (please note that this is not the same as a null partition key, and
that null clustering keys *are* allowed).

Trying to INSERT, UPDATE or DELETE a partition with an empty string as
the partition key fails with a "Key may not be empty". However, we had a
loophole - you could insert such empty-string partition keys using an
"INSERT ... JSON" statement.

The problem was that the partition key validation was done in one place -
`modification_statement::build_partition_keys()`. The INSERT, UPDATE and
DELETE statements all inherited this same method and got the correct
validation. But the INSERT JSON statement - insert_prepared_json_statement
overrode the build_partition_keys() method and this override forgot to call
the validation function. So in this patch we add the missing validation.

Note that the validation function checks for more than just empty strings -
there is also a length limit for partition keys.
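
A minimal sketch of such a shared validation function (hypothetical code; the 65535-byte key limit is Cassandra's, assumed here for illustration):

```cpp
#include <stdexcept>
#include <string>

// Both the empty-string case and the length limit are enforced in one
// place, which every statement kind -- including INSERT JSON -- must call.
void validate_partition_key(const std::string& key) {
    if (key.empty()) {
        throw std::invalid_argument("Key may not be empty");
    }
    if (key.size() > 65535) { // assumed limit, matching Cassandra's
        throw std::invalid_argument(
            "Key length of " + std::to_string(key.size()) +
            " is longer than maximum of 65535");
    }
}
```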

This patch also adds a cql-pytest reproducer for this bug. Before this
patch, the test passed on Cassandra but failed on Scylla.

Reported by @FortTell
Fixes #9853.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20220116085216.21774-1-nyh@scylladb.com>
(cherry picked from commit 8fd5041092)
2022-03-02 23:02:01 +02:00
Botond Dénes
1963d1cc25 mutation_writer: feed_writer(): handle exceptions from consume_end_of_stream()
Currently the exception handling code of feed_writer() assumes
consume_end_of_stream() doesn't throw. This is false, and an exception
from said method can currently lead to an unclean destruction of the
writer and reader. Fix by handling exceptions from
consume_end_of_stream() too.

Closes #10147
2022-03-02 14:20:20 +01:00
Takuya ASADA
53b0aaa4e8 scylla_raid_setup: revert workaround patch and stop using mdmonitor
We found that the monitor mode of mdadm does not work on RAID0, and
that this is not a bug but expected behavior, according to a RHEL
developer. Therefore, we should revert the workaround patch which
downgraded mdadm, and stop enabling mdmonitor, since we always use RAID0.

See #9540

Closes #10077
2022-02-17 18:36:42 +02:00
Yaron Kaikov
ebfa2279a4 release: prepare for 4.5.4 2022-02-14 08:24:59 +02:00
Nadav Har'El
5e38a69f6d alternator: allow REMOVE of non-existent nested attribute
DynamoDB allows an UpdateItem operation "REMOVE x.y" when a map x
exists in the item, but x.y doesn't - the removal silently does
nothing. Alternator incorrectly generated an error in this case,
and unfortunately we didn't have a test for this case.

So in this patch we add the missing test (which fails on Alternator
before this patch - and passes on DynamoDB) and then fix the behavior.
After this patch, "REMOVE x.y" will remain an error if "x" doesn't
exist (saying "document paths not valid for this item"), but if "x"
exists and is a map, but "x.y" doesn't, the removal will silently
do nothing and will not be an error.

Fixes #10043.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20220207133652.181994-1-nyh@scylladb.com>
(cherry picked from commit 9982a28007)
2022-02-08 12:09:38 +02:00
Nadav Har'El
6a54033a63 docker: don't repeat "--alternator-address" option twice
If the Docker startup script is passed both "--alternator-port" and
"--alternator-https-port", a combination which is supposed to be
allowed, it passes to Scylla the "--alternator-address" option twice.
This isn't necessary, and worse - not allowed.

So this patch fixes the scyllasetup.py script to only pass this
parameter once.

Fixes #10016.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20220202165814.1700047-1-nyh@scylladb.com>
(cherry picked from commit cb6630040d)
2022-02-03 18:40:03 +02:00
Avi Kivity
ec44412cd9 Update seastar submodule (gratuitous exceptions on allocation failure)
* seastar e26dc6665f...770167e835 (1):
  > core: memory: Avoid current_backtrace() on alloc failure when logging suppressed

Fixes #9982.
2022-01-30 20:05:06 +02:00
Avi Kivity
8f45f65b09 Merge 'scylla_raid_setup: use mdmonitor only when RAID level > 0' from Takuya ASADA
We found that the monitor mode of mdadm does not work on RAID0, and
that this is not a bug but expected behavior, according to a RHEL
developer. Therefore, we should stop enabling mdmonitor when RAID0 is
specified.

Fixes #9540

----

This reverts 0d8f932 and introduces the correct fix.

Closes #9970

* github.com:scylladb/scylla:
  scylla_raid_setup: use mdmonitor only when RAID level > 0
  Revert "scylla_raid_setup: workaround for mdmonitor.service issue on CentOS8"

(cherry picked from commit df22396a34)
2022-01-27 10:24:53 +02:00
Avi Kivity
5a7324c423 Update tools/java submodule (maxPendingPerConnection default)
* tools/java dbcea78e7d...8700c89b07 (2):
  > Fix NullPointerException in SettingsMode
  > cassandra-stress: Remove maxPendingPerConnection default

Ref #7748.
2022-01-12 21:37:07 +02:00
Nadav Har'El
2eb0ad7b4f alternator: allow Authorization header to be without spaces
The "Authorization" HTTP header is used in DynamoDB API to sign
requests. Our parser for this header, in server::verify_signature(),
required the different components of this header to be separated by
a comma followed by a whitespace - but it turns out that in DynamoDB
both spaces and commas are optional - one of them is enough.

At least one DynamoDB client library - the old "boto" (which predated
boto3) - builds this header without spaces.

In this patch we add a test that shows that an Authorization header
with spaces removed works fine in DynamoDB but didn't work in
Alternator, and after this patch modifies the parsing code for this
header, the test begins to pass (and the other tests show that the
previously-working cases didn't break).
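
A sketch of the tolerant parsing (illustrative, not the actual verify_signature() code): treat both ',' and ' ' as separators instead of requiring the literal ", " sequence, so headers with or without spaces parse identically.

```cpp
#include <string>
#include <vector>

// Split an Authorization header's components on commas and/or spaces,
// skipping empty tokens, so "a=1,b=2", "a=1 b=2" and "a=1, b=2" all
// yield the same component list.
std::vector<std::string> split_auth_components(const std::string& header) {
    std::vector<std::string> parts;
    std::string cur;
    for (char c : header) {
        if (c == ',' || c == ' ') {
            if (!cur.empty()) { parts.push_back(cur); cur.clear(); }
        } else {
            cur.push_back(c);
        }
    }
    if (!cur.empty()) parts.push_back(cur);
    return parts;
}
```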

Fixes #9568

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20211101214114.35693-1-nyh@scylladb.com>
(cherry picked from commit 56eb994d8f)
2021-12-29 14:40:02 +02:00
Nadav Har'El
5d7064e00e alternator: return the correct Content-Type header
Although the DynamoDB API responses are JSON, additional conventions apply
to these responses - such as how error codes are encoded in JSON. For this
reason, DynamoDB uses the content type `application/x-amz-json-1.0` instead
of the standard `application/json` in its responses.

Until this patch, Scylla used `application/json` in its responses. This
unexpected content-type didn't bother any of the AWS libraries which we
tested, but it does bother the aiodynamo library (see HENNGE/aiodynamo#27).

Moreover, we should return the x-amz-json-1.0 content type for future
proofing: It turns out that AWS already defined x-amz-json-1.1 - see:
https://awslabs.github.io/smithy/1.0/spec/aws/aws-json-1_1-protocol.html
The 1.1 content type differs (only) in how it encodes error replies.
If one day DynamoDB starts to use this new reply format (it doesn't yet)
and if DynamoDB libraries will need to differentiate between the two
reply formats, Alternator better return the right one.

This patch also includes a new test that the Content-Type header is
returned with the expected value. The test passes on DynamoDB, and
after this patch it starts to pass on Alternator as well.

Fixes #9554.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20211031094621.1193387-1-nyh@scylladb.com>
(cherry picked from commit 6ae0ea0c48)
2021-12-29 12:22:49 +02:00
Takuya ASADA
b56b9f5ed5 scylla_raid_setup: workaround for mdmonitor.service issue on CentOS8
On CentOS8, mdmonitor.service does not work correctly when using
mdadm-4.1-15.el8.x86_64 and later versions.
Until we find a solution, let's pin the package version to an older one
which does not cause the issue (4.1-14.el8.x86_64).

Fixes #9540

Closes #9782

(cherry picked from commit 0d8f932f0b)
2021-12-28 11:38:24 +02:00
Avi Kivity
c8f14886dc Revert "cql3: Reject updates with NULL key values"
This reverts commit 44c784cb79. It
causes a regression without 6afdc6004c,
and it is too complicated to backport at this time.

Ref #9311.
2021-12-23 13:03:13 +02:00
Botond Dénes
5c8057749b treewide: distinguish truncated frame errors
We have two identical "Truncated frame" errors, at:
* read_frame_size() in serialization_visitors.hh;
* cql_server::connection::read_and_decompress_frame() in
  transport/server.cc;

When such an exception is thrown, it is impossible to tell where it was
thrown from, and it doesn't carry any further information (beyond the
basic information its being thrown implies).
This patch solves both problems: it makes the exception messages unique
per location and it adds information about why it was thrown (the
expected vs. real size of the frame).
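
The pattern can be sketched as follows (messages and the helper name are illustrative, not the exact ones the patch adds):

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>

// Each call site passes a unique location tag, and the message carries
// the expected vs. actual frame size, so the error is traceable.
void check_frame_size(std::size_t actual, std::size_t expected,
                      const char* where) {
    if (actual < expected) {
        throw std::runtime_error(std::string("truncated frame in ") + where +
            ": expected " + std::to_string(expected) + " bytes, got " +
            std::to_string(actual));
    }
}
```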

Ref: #9482

Closes #9520

(cherry picked from commit 9ec55e054d)
2021-12-21 15:27:17 +02:00
Pavel Emelyanov
a35646b874 row-cache: Handle exception (un)safety of rows_entry insertion
The B-tree's insert_before() is a throwing operation; its caller
must account for that. When the rows_entry collection was
switched to a B-tree, all the risky places were fixed by ee9e1045,
but a few places slipped under the radar.

In cache_flat_mutation_reader there's a place where a C-pointer
is inserted into the tree, potentially leaking the entry.

In partition_snapshot_row_cursor there are two places that not
only leak the entry, but also leave it in the LRU list. The latter
is quite nasty, because such entries can be evicted; the eviction code
tries to get a rows_entry iterator from "this", but the hook happens
to be unattached (because the insertion threw) and the assert fails.
fixes: #9728

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
(cherry picked from commit ee103636ac)
2021-12-14 16:01:55 +02:00
Pavel Emelyanov
da8708932d partition_snapshot_row_cursor: Shuffle ensure_result creation
Both places get a C-pointer to the freshly allocated rows_entry,
insert it where needed and return the dereferenced pointer.

The C-pointer is going to become a smart pointer that would go out
of scope before return. This change prepares for that by constructing
the ensure_result from the iterator that is returned from insertion
of the entry.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
(cherry picked from commit 9fd8db318d)

Ref #9728
2021-12-14 16:01:48 +02:00
Nadav Har'El
78a545716a Update Seastar module with additional backports
Backported a Seastar fix:
  > Merge 'metrics: Fix dtest->ulong conversion error' from Benny Halevy

Fixes #9794
2021-12-14 13:18:07 +02:00
Asias He
1488278fc1 storage_service: Wait for seastar::get_units in node_ops
seastar::get_units() returns a future; we have to wait for it.

Fixes #9767

Closes #9768

(cherry picked from commit 9859c76de1)
2021-12-12 18:42:33 +02:00
Nadav Har'El
d9455a910f alternator: add missing BatchGetItem metric
Unfortunately, defining metrics in Scylla requires some code
duplication, with the metrics declared in one place but exported in a
different place in the code. When we duplicated this code in Alternator,
we accidentally dropped the first metric - for BatchGetItem. The metric
was accounted in the code, but not exported to Prometheus.

In addition to fixing the missing metric, this patch also adds a test
that confirms that the BatchGetItem metric increases when the
BatchGetItem operation is used. This test failed before this patch, and
passes with it. The test only currently tests this for BatchGetItem
(and BatchWriteItem) but it can be later expanded to cover all the other
operations as well.

Fixes #9406

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20210929121611.373074-1-nyh@scylladb.com>
(cherry picked from commit 5cbe9178fd)
2021-12-06 12:43:49 +02:00
Yaron Kaikov
406b4bce8d release: prepare for 4.5.3 2021-12-05 14:48:33 +02:00
Botond Dénes
417e853b9b mutation_reader: shard_reader: ensure referenced objects are kept alive
The shard reader can outlive its parent reader (the multishard reader).
This creates a problem for lifecycle management: readers take the range
and slice parameters by reference, and users must keep these alive for as
long as the reader is alive. The shard reader outliving the top-level reader means
that any background read-ahead that it has to wait on will potentially
have stale references to the range and the slice. This was seen in the
wild recently when the evictable reader wrapped by the shard reader hit
a use-after-free while wrapping up a background read-ahead.
This problem was solved by fa43d76 but any previous versions are
susceptible to it.

This patch solves this problem by having the shard reader copy and keep
the range and slice parameters in stable storage, before passing them
further down.

Fixes: #9719

Tests: unit(dev)
Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <20211202113910.484591-1-bdenes@scylladb.com>
2021-12-02 17:49:36 +02:00
Dejan Mircevski
44c784cb79 cql3: Reject updates with NULL key values
We were silently ignoring INSERTs with NULL values for primary-key
columns, which Cassandra rejects.  Fix it by rejecting any
modification_statement that would operate on empty partition or
clustering range.

This is the most direct fix, because range and slice are calculated in
one place for all modification statements.  It covers not only NULL
cases, but also impossible restrictions like c>0 AND c<0.
Unfortunately, Cassandra doesn't treat all modification statements
consistently, so this fix cannot fully match its behavior.  We err on
the side of tolerance, accepting some DELETE statements that Cassandra
rejects.  We add a TODO for rejecting such DELETEs later.

Fixes #7852.

Tests: unit (dev), cql-pytest against Cassandra 4.0

Signed-off-by: Dejan Mircevski <dejan@scylladb.com>

Closes #9286

(cherry picked from commit 1fdaeca7d0)
2021-11-29 17:19:51 +02:00
Piotr Jastrzebski
ab425a11a8 sstables: Fix writing KA/LA sstables index
Before this patch when writing an index block, the sstables writer was
storing range tombstones that span the boundary of the block in order
of end bounds. This led to a range tombstone being ignored by a reader
if there was a row tombstone inside it.

This patch sorts the range tombstones based on start bound before
writing them to the index file.

The assumption is that writing an index block is rare, so we can afford
sorting the tombstones at that point. Additionally, this is a writer for
an old format, and support for writing it will be dropped in the next
major release, so it should rarely be used already.

Kudos to Kamil Braun <kbraun@scylladb.com> for finding the reproducer.

Test: unit(dev)

Fixes #9690

Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
(cherry picked from commit scylladb/scylla-enterprise@eb093afd6f)
2021-11-26 15:36:03 +01:00
Tomasz Grabiec
2228a1a92a cql: Fix missing data in indexed queries with base table short reads
Indexed queries are using paging over the materialized view
table. Results of the view read are then used to issue reads of the
base table. If base table reads are short reads, the page is returned
to the user and paging state is adjusted accordingly so that when
paging is resumed it will query the view starting from the row
corresponding to the next row in the base which was not yet
returned. However, paging state's "remaining" count was not reset, so
if the view read was exhausted the reading will stop even though the
base table read was short.

Fix by restoring the "remaining" count when adjusting the paging state
on short read.

Tests:

  - index_with_paging_test
  - secondary_index_test

Fixes #9198
Message-Id: <20210818131840.1160267-1-tgrabiec@scylladb.com>

(cherry picked from commit 1e4da2dcce)
2021-11-23 11:22:13 +02:00
Takuya ASADA
36b190a65e docker: add stopwaitsecs
We need stopwaitsecs just like we use TimeoutStopSec=900 in
scylla-server.service, to avoid a timeout on scylla-server shutdown.

Fixes #9485

Closes #9545

(cherry picked from commit c9499230c3)
2021-11-15 13:36:40 +02:00
Takuya ASADA
f7e5339c14 scylla_io_setup: support ARM instances on AWS
Add preset parameters for AWS ARM instances.

Fixes #9493

(cherry picked from commit 4e8060ba72)
2021-11-15 13:35:15 +02:00
Asias He
4c4972cb33 gossip: Fix use-after-free in real_mark_alive and mark_dead
In commit 11a8912093 (gossiper:
get_gossip_status: return string_view and make noexcept)
get_gossip_status returns a pointer to an endpoint_state in
endpoint_state_map.

After commit 425e3b1182 (gossip: Introduce
direct failure detector), gossiper::mark_dead and gossiper::real_mark_alive
can yield in the middle of the function. It is possible that the
endpoint_state is removed in the meantime, causing a use-after-free
when it is later accessed.

To fix, make a copy before we yield.
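The hazard and the fix can be illustrated in plain C++ (a simulation, not Seastar code, with illustrative names): a reference into the state map dangles if the entry is erased while the coroutine is suspended, so the state is copied before any yield point.

```cpp
#include <map>
#include <string>

// Stand-in for the gossiper's endpoint_state_map.
std::map<std::string, std::string> endpoint_state_map;

std::string mark_alive_safely(const std::string& ep) {
    // Copy the state up front; after a yield point the map entry may be
    // gone, so a pointer or reference into the map must not be kept.
    std::string state_copy = endpoint_state_map.at(ep);
    // Simulate a concurrent removal happening during the yield:
    endpoint_state_map.erase(ep);
    return state_copy; // still valid: we hold our own copy
}
```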

Fixes #8859

Closes #8862

(cherry picked from commit 7a32cab524)
2021-11-15 13:23:02 +02:00
Takuya ASADA
50ce5bef2c scylla_util.py: On is_gce(), return False when it's on GKE
The GKE metadata server does not provide the same metadata as GCE, so
we should not return True from is_gce() there.
Instead, try to fetch machine-type from the metadata server, and return
False if it responds with 404 Not Found.

Fixes #9471

Signed-off-by: Takuya ASADA <syuu@scylladb.com>

Closes #9582

(cherry picked from commit 9b4cf8c532)
2021-11-15 13:18:02 +02:00
Asias He
f864eea844 repair: Return HTTP 400 when repair id is not found
There are two APIs for checking the repair status and they behave
differently in case the id is not found.

```
{"host": "192.168.100.11:10001", "method": "GET", "uri":
"/storage_service/repair_async/system_auth?id=999", "duration": "1ms",
"status": 400, "bytes": 49, "dump": "HTTP/1.1 400 Bad
Request\r\nContent-Length: 49\r\nContent-Type: application/json\r\nDate:
Wed, 03 Nov 2021 10:49:33 GMT\r\nServer: Seastar
httpd\r\n\r\n{\"message\": \"unknown repair id 999\", \"code\": 400}"}

{"host": "192.168.100.11:10001", "method": "GET", "uri":
"/storage_service/repair_status?id=999&timeout=1", "duration": "0ms",
"status": 500, "bytes": 49, "dump": "HTTP/1.1 500 Internal Server
Error\r\nContent-Length: 49\r\nContent-Type: application/json\r\nDate:
Wed, 03 Nov 2021 10:49:33 GMT\r\nServer: Seastar
httpd\r\n\r\n{\"message\": \"unknown repair id 999\", \"code\": 500}"}
```

The correct status code is 400, as this is a parameter error and should
not be retried.

Returning status code 500 makes smarter HTTP clients retry the request
in the hope that the server recovers.

After this patch:

curl -X GET
'http://127.0.0.1:10000/storage_service/repair_async/system_auth?id=9999'
{"message": "unknown repair id 9999", "code": 400}

curl -X GET
'http://127.0.0.1:10000/storage_service/repair_status?id=9999'
{"message": "unknown repair id 9999", "code": 400}
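The status-code choice can be sketched as follows (illustrative helper names, not the actual repair API): an unknown repair id is a caller error, so it maps to 400 rather than 500, which would invite clients to retry a request that can never succeed.

```cpp
#include <optional>
#include <string>

// Illustrative lookup: pretend no repair with this id exists.
std::optional<std::string> lookup_repair(int id) {
    (void)id;
    return std::nullopt;
}

int repair_status_http_code(int id) {
    if (!lookup_repair(id)) {
        // Parameter error: 400, so clients do not treat it as a
        // transient server failure and retry.
        return 400;
    }
    return 200;
}
```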

Fixes #9576

Closes #9578

(cherry picked from commit f5f5714aa6)
2021-11-15 13:15:59 +02:00
Calle Wilund
b9735ab079 cdc: fix broken function signature in maybe_back_insert_iterator
Fixes #9103

The compare overload was declared as "bool" even though it is a
three-way comparison. This caused us to never use the speed-up shortcut
(lessening the search set), in turn meaning more overhead for
collections.
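Why declaring a three-way comparison as bool is harmful can be shown with a generic illustration (not the cdc code itself): the implicit int-to-bool conversion collapses -1 and 1 into `true`, so the caller can no longer tell "less" from "greater".

```cpp
// A proper three-way comparison: negative, zero, or positive.
int tri_compare(int a, int b) {
    return a < b ? -1 : (a > b ? 1 : 0);
}

// The bug class: same body, but the return type truncates the result.
bool truncated_compare(int a, int b) {
    return tri_compare(a, b); // implicit int -> bool: the sign is lost
}
```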

Closes #9104

(cherry picked from commit 59555fa363)
2021-11-15 13:13:04 +02:00
Takuya ASADA
766e16f19e scylla_io_setup: handle nr_disks on GCP correctly
nr_disks is an int; it should not be a string.

Fixes #9429

Closes #9430

(cherry picked from commit 3b798afc1e)
2021-11-15 13:06:30 +02:00
Michał Chojnowski
e6520df41c utils: fragment_range: fix FragmentedView utils for views with empty fragments
The copying and comparing utilities for FragmentedView are not prepared to
deal with empty fragments in non-empty views, and will fall into an infinite
loop in such a case. But data arriving in a result_row_view can contain such
fragments, so we need to fix that.
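The bug class can be illustrated generically (this is not the FragmentedView code): a loop that only advances when bytes are consumed from the current fragment never progresses past an empty fragment, so empty fragments must be skipped explicitly.

```cpp
#include <cstddef>
#include <string_view>
#include <vector>

// Counts bytes across fragments, advancing fragment by fragment.
size_t total_size(const std::vector<std::string_view>& fragments) {
    size_t total = 0;
    for (auto it = fragments.begin(); it != fragments.end(); ) {
        if (it->empty()) {
            // Without this explicit skip, an algorithm that advances by
            // "bytes consumed from the current fragment" would spin
            // forever on an empty fragment inside a non-empty view.
            ++it;
            continue;
        }
        total += it->size();
        ++it;
    }
    return total;
}
```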

Fixes #8398.

Closes #8397

(cherry picked from commit f23a47e365)
2021-11-15 12:55:25 +02:00
71 changed files with 1102 additions and 389 deletions

View File

@@ -1,7 +1,7 @@
#!/bin/sh
PRODUCT=scylla
VERSION=4.5.2
VERSION=4.5.7
if test -f version
then

View File

@@ -2442,8 +2442,8 @@ static bool hierarchy_actions(
if (newv) {
rjson::set_with_string_name(v, attr, std::move(*newv));
} else {
throw api_error::validation(format("Can't remove document path {} - not present in item",
subh.get_value()._path));
// Removing a.b when a is a map but a.b doesn't exist
// is silently ignored. It's not considered an error.
}
} else {
throw api_error::validation(format("UpdateExpression: document paths not valid for this item:{}", h));

View File

@@ -129,6 +129,10 @@ public:
[&] (const json::json_return_type& json_return_value) {
slogger.trace("api_handler success case");
if (json_return_value._body_writer) {
// Unfortunately, write_body() forces us to choose
// from a fixed and irrelevant list of "mime-types"
// at this point. But we'll override it with the
// one (application/x-amz-json-1.0) below.
rep->write_body("json", std::move(json_return_value._body_writer));
} else {
rep->_content += json_return_value._res;
@@ -141,7 +145,7 @@ public:
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
});
}), _type("json") { }
}) { }
api_handler(const api_handler&) = default;
future<std::unique_ptr<reply>> handle(const sstring& path,
@@ -149,7 +153,8 @@ public:
handle_CORS(*req, *rep, false);
return _f_handle(std::move(req), std::move(rep)).then(
[this](std::unique_ptr<reply> rep) {
rep->done(_type);
rep->set_mime_type("application/x-amz-json-1.0");
rep->done();
return make_ready_future<std::unique_ptr<reply>>(std::move(rep));
});
}
@@ -163,7 +168,6 @@ protected:
}
future_handler_function _f_handle;
sstring _type;
};
class gated_handler : public handler_base {
@@ -246,24 +250,31 @@ future<> server::verify_signature(const request& req, const chunked_content& con
throw api_error::missing_authentication_token("Authorization header is mandatory for signature verification");
}
std::string host = host_it->second;
std::vector<std::string_view> credentials_raw = split(authorization_it->second, ' ');
std::string_view authorization_header = authorization_it->second;
auto pos = authorization_header.find_first_of(' ');
if (pos == std::string_view::npos || authorization_header.substr(0, pos) != "AWS4-HMAC-SHA256") {
throw api_error::invalid_signature(format("Authorization header must use AWS4-HMAC-SHA256 algorithm: {}", authorization_header));
}
authorization_header.remove_prefix(pos+1);
std::string credential;
std::string user_signature;
std::string signed_headers_str;
std::vector<std::string_view> signed_headers;
for (std::string_view entry : credentials_raw) {
do {
// Either one of a comma or space can mark the end of an entry
pos = authorization_header.find_first_of(" ,");
std::string_view entry = authorization_header.substr(0, pos);
if (pos != std::string_view::npos) {
authorization_header.remove_prefix(pos + 1);
}
if (entry.empty()) {
continue;
}
std::vector<std::string_view> entry_split = split(entry, '=');
if (entry_split.size() != 2) {
if (entry != "AWS4-HMAC-SHA256") {
throw api_error::invalid_signature(format("Only AWS4-HMAC-SHA256 algorithm is supported. Found: {}", entry));
}
continue;
}
std::string_view auth_value = entry_split[1];
// Commas appear as an additional (quite redundant) delimiter
if (auth_value.back() == ',') {
auth_value.remove_suffix(1);
}
if (entry_split[0] == "Credential") {
credential = std::string(auth_value);
} else if (entry_split[0] == "Signature") {
@@ -273,7 +284,8 @@ future<> server::verify_signature(const request& req, const chunked_content& con
signed_headers = split(auth_value, ';');
std::sort(signed_headers.begin(), signed_headers.end());
}
}
} while (pos != std::string_view::npos);
std::vector<std::string_view> credential_split = split(credential, '/');
if (credential_split.size() != 5) {
throw api_error::validation(format("Incorrect credential information format: {}", credential));

View File

@@ -38,6 +38,7 @@ stats::stats() : api_operations{} {
#define OPERATION_LATENCY(name, CamelCaseName) \
seastar::metrics::make_histogram("op_latency", \
seastar::metrics::description("Latency histogram of an operation via Alternator API"), {op(CamelCaseName)}, [this]{return to_metrics_histogram(api_operations.name);}),
OPERATION(batch_get_item, "BatchGetItem")
OPERATION(batch_write_item, "BatchWriteItem")
OPERATION(create_backup, "CreateBackup")
OPERATION(create_global_table, "CreateGlobalTable")

View File

@@ -262,7 +262,7 @@ void set_repair(http_context& ctx, routes& r, sharded<netw::messaging_service>&
try {
res = fut.get0();
} catch (std::exception& e) {
return make_exception_future<json::json_return_type>(httpd::server_error_exception(e.what()));
return make_exception_future<json::json_return_type>(httpd::bad_param_exception(e.what()));
}
return make_ready_future<json::json_return_type>(json::json_return_type(res));
});

View File

@@ -79,6 +79,54 @@ atomic_cell::atomic_cell(const abstract_type& type, atomic_cell_view other)
set_view(_data);
}
// Based on:
// - org.apache.cassandra.db.AbstractCell#reconcile()
// - org.apache.cassandra.db.BufferExpiringCell#reconcile()
// - org.apache.cassandra.db.BufferDeletedCell#reconcile()
int
compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) {
if (left.timestamp() != right.timestamp()) {
return left.timestamp() > right.timestamp() ? 1 : -1;
}
if (left.is_live() != right.is_live()) {
return left.is_live() ? -1 : 1;
}
if (left.is_live()) {
auto c = compare_unsigned(left.value(), right.value());
if (c != 0) {
return c;
}
if (left.is_live_and_has_ttl() != right.is_live_and_has_ttl()) {
// prefer expiring cells.
return left.is_live_and_has_ttl() ? 1 : -1;
}
if (left.is_live_and_has_ttl()) {
if (left.expiry() != right.expiry()) {
return left.expiry() < right.expiry() ? -1 : 1;
} else {
// prefer the cell that was written later,
// so it survives longer after it expires, until purged.
if (left.ttl() != right.ttl()) {
return left.ttl() < right.ttl() ? 1 : -1;
} else {
return 0;
}
}
}
} else {
// Both are deleted
if (left.deletion_time() != right.deletion_time()) {
// Origin compares big-endian serialized deletion time. That's because it
// delegates to AbstractCell.reconcile() which compares values after
// comparing timestamps, which in case of deleted cells will hold
// serialized expiry.
return (uint64_t) left.deletion_time().time_since_epoch().count()
< (uint64_t) right.deletion_time().time_since_epoch().count() ? -1 : 1;
}
}
return 0;
}
atomic_cell_or_collection atomic_cell_or_collection::copy(const abstract_type& type) const {
if (_data.empty()) {
return atomic_cell_or_collection();

View File

@@ -576,8 +576,8 @@ void cache_flat_mutation_reader::move_to_range(query::clustering_row_ranges::con
clogger.trace("csm {}: insert dummy at {}", fmt::ptr(this), _lower_bound);
auto it = with_allocator(_lsa_manager.region().allocator(), [&] {
auto& rows = _snp->version()->partition().clustered_rows();
auto new_entry = current_allocator().construct<rows_entry>(*_schema, _lower_bound, is_dummy::yes, is_continuous::no);
return rows.insert_before(_next_row.get_iterator_in_latest_version(), *new_entry);
auto new_entry = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(*_schema, _lower_bound, is_dummy::yes, is_continuous::no));
return rows.insert_before(_next_row.get_iterator_in_latest_version(), std::move(new_entry));
});
_snp->tracker()->insert(*it);
_last_row = partition_snapshot_row_weakref(*_snp, it, true);

View File

@@ -74,7 +74,7 @@ using namespace std::chrono_literals;
logging::logger cdc_log("cdc");
namespace cdc {
static schema_ptr create_log_schema(const schema&, std::optional<utils::UUID> = {});
static schema_ptr create_log_schema(const schema&, std::optional<utils::UUID> = {}, schema_ptr = nullptr);
}
static constexpr auto cdc_group_name = "cdc";
@@ -221,7 +221,7 @@ public:
return;
}
auto new_log_schema = create_log_schema(new_schema, log_schema ? std::make_optional(log_schema->id()) : std::nullopt);
auto new_log_schema = create_log_schema(new_schema, log_schema ? std::make_optional(log_schema->id()) : std::nullopt, log_schema);
auto log_mut = log_schema
? db::schema_tables::make_update_table_mutations(db, keyspace.metadata(), log_schema, new_log_schema, timestamp, false)
@@ -490,7 +490,7 @@ bytes log_data_column_deleted_elements_name_bytes(const bytes& column_name) {
return to_bytes(cdc_deleted_elements_column_prefix) + column_name;
}
static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID> uuid) {
static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID> uuid, schema_ptr old) {
schema_builder b(s.ks_name(), log_name(s.cf_name()));
b.with_partitioner("com.scylladb.dht.CDCPartitioner");
b.set_compaction_strategy(sstables::compaction_strategy_type::time_window);
@@ -571,6 +571,20 @@ static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID>
b.set_uuid(*uuid);
}
/**
* #10473 - if we are redefining the log table, we need to ensure any dropped
* columns are registered in "dropped_columns" table, otherwise clients will not
* be able to read data older than now.
*/
if (old) {
// not super efficient, but we don't do this often.
for (auto& col : old->all_columns()) {
if (!b.has_column({col.name(), col.name_as_text() })) {
b.without_column(col.name_as_text(), col.type, api::new_timestamp());
}
}
}
return b.build();
}
@@ -716,16 +730,16 @@ private:
}
return false;
}
bool compare(const T&, const value_type& v);
int32_t compare(const T&, const value_type& v);
};
template<>
bool maybe_back_insert_iterator<std::vector<std::pair<bytes_view, bytes_view>>, bytes_view>::compare(const bytes_view& t, const value_type& v) {
int32_t maybe_back_insert_iterator<std::vector<std::pair<bytes_view, bytes_view>>, bytes_view>::compare(const bytes_view& t, const value_type& v) {
return _type.compare(t, v.first);
}
template<>
bool maybe_back_insert_iterator<std::vector<bytes_view>, bytes_view>::compare(const bytes_view& t, const value_type& v) {
int32_t maybe_back_insert_iterator<std::vector<bytes_view>, bytes_view>::compare(const bytes_view& t, const value_type& v) {
return _type.compare(t, v);
}

View File

@@ -35,6 +35,28 @@ struct authorized_prepared_statements_cache_size {
class authorized_prepared_statements_cache_key {
public:
using cache_key_type = std::pair<auth::authenticated_user, typename cql3::prepared_cache_key_type::cache_key_type>;
struct view {
const auth::authenticated_user& user_ref;
const cql3::prepared_cache_key_type& prep_cache_key_ref;
};
struct view_hasher {
size_t operator()(const view& kv) {
return cql3::authorized_prepared_statements_cache_key::hash(kv.user_ref, kv.prep_cache_key_ref.key());
}
};
struct view_equal {
bool operator()(const authorized_prepared_statements_cache_key& k1, const view& k2) {
return k1.key().first == k2.user_ref && k1.key().second == k2.prep_cache_key_ref.key();
}
bool operator()(const view& k2, const authorized_prepared_statements_cache_key& k1) {
return operator()(k1, k2);
}
};
private:
cache_key_type _key;
@@ -100,10 +122,12 @@ private:
public:
using key_type = cache_key_type;
using key_view_type = typename key_type::view;
using key_view_hasher = typename key_type::view_hasher;
using key_view_equal = typename key_type::view_equal;
using value_type = checked_weak_ptr;
using entry_is_too_big = typename cache_type::entry_is_too_big;
using iterator = typename cache_type::iterator;
using value_ptr = typename cache_type::value_ptr;
private:
cache_type _cache;
logging::logger& _logger;
@@ -124,38 +148,12 @@ public:
}).discard_result();
}
iterator find(const auth::authenticated_user& user, const cql3::prepared_cache_key_type& prep_cache_key) {
struct key_view {
const auth::authenticated_user& user_ref;
const cql3::prepared_cache_key_type& prep_cache_key_ref;
};
struct hasher {
size_t operator()(const key_view& kv) {
return cql3::authorized_prepared_statements_cache_key::hash(kv.user_ref, kv.prep_cache_key_ref.key());
}
};
struct equal {
bool operator()(const key_type& k1, const key_view& k2) {
return k1.key().first == k2.user_ref && k1.key().second == k2.prep_cache_key_ref.key();
}
bool operator()(const key_view& k2, const key_type& k1) {
return operator()(k1, k2);
}
};
return _cache.find(key_view{user, prep_cache_key}, hasher(), equal());
}
iterator end() {
return _cache.end();
value_ptr find(const auth::authenticated_user& user, const cql3::prepared_cache_key_type& prep_cache_key) {
return _cache.find(key_view_type{user, prep_cache_key}, key_view_hasher(), key_view_equal());
}
void remove(const auth::authenticated_user& user, const cql3::prepared_cache_key_type& prep_cache_key) {
iterator it = find(user, prep_cache_key);
_cache.remove(it);
_cache.remove(key_view_type{user, prep_cache_key}, key_view_hasher(), key_view_equal());
}
size_t size() const {

View File

@@ -103,9 +103,7 @@ public:
virtual future<::shared_ptr<cql_transport::messages::result_message>>
execute(query_processor& qp, service::query_state& state, const query_options& options) const = 0;
virtual bool depends_on_keyspace(const sstring& ks_name) const = 0;
virtual bool depends_on_column_family(const sstring& cf_name) const = 0;
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const = 0;
virtual shared_ptr<const metadata> get_result_metadata() const = 0;

View File

@@ -102,13 +102,7 @@ private:
using cache_key_type = typename prepared_cache_key_type::cache_key_type;
using cache_type = utils::loading_cache<cache_key_type, prepared_cache_entry, utils::loading_cache_reload_enabled::no, prepared_cache_entry_size, utils::tuple_hash, std::equal_to<cache_key_type>, prepared_cache_stats_updater>;
using cache_value_ptr = typename cache_type::value_ptr;
using cache_iterator = typename cache_type::iterator;
using checked_weak_ptr = typename statements::prepared_statement::checked_weak_ptr;
struct value_extractor_fn {
checked_weak_ptr operator()(prepared_cache_entry& e) const {
return e->checked_weak_from_this();
}
};
public:
static const std::chrono::minutes entry_expiry;
@@ -116,12 +110,9 @@ public:
using key_type = prepared_cache_key_type;
using value_type = checked_weak_ptr;
using statement_is_too_big = typename cache_type::entry_is_too_big;
/// \note both iterator::reference and iterator::value_type are checked_weak_ptr
using iterator = boost::transform_iterator<value_extractor_fn, cache_iterator>;
private:
cache_type _cache;
value_extractor_fn _value_extractor_fn;
public:
prepared_statements_cache(logging::logger& logger, size_t size)
@@ -135,16 +126,12 @@ public:
});
}
iterator find(const key_type& key) {
return boost::make_transform_iterator(_cache.find(key.key()), _value_extractor_fn);
}
iterator end() {
return boost::make_transform_iterator(_cache.end(), _value_extractor_fn);
}
iterator begin() {
return boost::make_transform_iterator(_cache.begin(), _value_extractor_fn);
value_type find(const key_type& key) {
cache_value_ptr vp = _cache.find(key.key());
if (vp) {
return (*vp)->checked_weak_from_this();
}
return value_type();
}
template <typename Pred>

View File

@@ -943,7 +943,7 @@ bool query_processor::migration_subscriber::should_invalidate(
sstring ks_name,
std::optional<sstring> cf_name,
::shared_ptr<cql_statement> statement) {
return statement->depends_on_keyspace(ks_name) && (!cf_name || statement->depends_on_column_family(*cf_name));
return statement->depends_on(ks_name, cf_name);
}
future<> query_processor::query_internal(

View File

@@ -179,10 +179,10 @@ public:
statements::prepared_statement::checked_weak_ptr get_prepared(const std::optional<auth::authenticated_user>& user, const prepared_cache_key_type& key) {
if (user) {
auto it = _authorized_prepared_cache.find(*user, key);
if (it != _authorized_prepared_cache.end()) {
auto vp = _authorized_prepared_cache.find(*user, key);
if (vp) {
try {
return it->get()->checked_weak_from_this();
return vp->get()->checked_weak_from_this();
} catch (seastar::checked_ptr_is_null_exception&) {
// If the prepared statement got invalidated - remove the corresponding authorized_prepared_statements_cache entry as well.
_authorized_prepared_cache.remove(*user, key);
@@ -193,11 +193,7 @@ public:
}
statements::prepared_statement::checked_weak_ptr get_prepared(const prepared_cache_key_type& key) {
auto it = _prepared_cache.find(key);
if (it == _prepared_cache.end()) {
return statements::prepared_statement::checked_weak_ptr();
}
return *it;
return _prepared_cache.find(key);
}
future<::shared_ptr<cql_transport::messages::result_message>>

View File

@@ -133,6 +133,7 @@ statement_restrictions::statement_restrictions(schema_ptr schema, bool allow_fil
, _partition_key_restrictions(get_initial_partition_key_restrictions(allow_filtering))
, _clustering_columns_restrictions(get_initial_clustering_key_restrictions(allow_filtering))
, _nonprimary_key_restrictions(::make_shared<single_column_restrictions>(schema))
, _partition_range_is_simple(true)
{ }
#if 0
static const column_definition*
@@ -335,7 +336,7 @@ statement_restrictions::statement_restrictions(database& db,
}
if (!_nonprimary_key_restrictions->empty()) {
if (_has_queriable_regular_index) {
if (_has_queriable_regular_index && _partition_range_is_simple) {
_uses_secondary_indexing = true;
} else if (!allow_filtering) {
throw exceptions::invalid_request_exception("Cannot execute this query as it might involve data filtering and "
@@ -377,6 +378,7 @@ void statement_restrictions::add_single_column_restriction(::shared_ptr<single_c
"Only EQ and IN relation are supported on the partition key (unless you use the token() function or allow filtering)");
}
_partition_key_restrictions = _partition_key_restrictions->merge_to(_schema, restriction);
_partition_range_is_simple &= !find(restriction->expression, expr::oper_t::IN);
} else if (def.is_clustering_key()) {
_clustering_columns_restrictions = _clustering_columns_restrictions->merge_to(_schema, restriction);
} else {

View File

@@ -107,6 +107,8 @@ private:
std::optional<expr::expression> _where; ///< The entire WHERE clause.
std::vector<expr::expression> _clustering_prefix_restrictions; ///< Parts of _where defining the clustering slice.
bool _partition_range_is_simple; ///< False iff _partition_range_restrictions imply a Cartesian product.
public:
/**
* Creates a new empty <code>StatementRestrictions</code>.

View File

@@ -46,13 +46,7 @@ uint32_t cql3::statements::authentication_statement::get_bound_terms() const {
return 0;
}
bool cql3::statements::authentication_statement::depends_on_keyspace(
const sstring& ks_name) const {
return false;
}
bool cql3::statements::authentication_statement::depends_on_column_family(
const sstring& cf_name) const {
bool cql3::statements::authentication_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
return false;
}

View File

@@ -56,9 +56,7 @@ public:
uint32_t get_bound_terms() const override;
bool depends_on_keyspace(const sstring& ks_name) const override;
bool depends_on_column_family(const sstring& cf_name) const override;
bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
future<> check_access(service::storage_proxy& proxy, const service::client_state& state) const override;

View File

@@ -46,13 +46,7 @@ uint32_t cql3::statements::authorization_statement::get_bound_terms() const {
return 0;
}
bool cql3::statements::authorization_statement::depends_on_keyspace(
const sstring& ks_name) const {
return false;
}
bool cql3::statements::authorization_statement::depends_on_column_family(
const sstring& cf_name) const {
bool cql3::statements::authorization_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
return false;
}

View File

@@ -60,9 +60,7 @@ public:
uint32_t get_bound_terms() const override;
bool depends_on_keyspace(const sstring& ks_name) const override;
bool depends_on_column_family(const sstring& cf_name) const override;
bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
future<> check_access(service::storage_proxy& proxy, const service::client_state& state) const override;

View File

@@ -93,14 +93,9 @@ batch_statement::batch_statement(type type_,
{
}
bool batch_statement::depends_on_keyspace(const sstring& ks_name) const
bool batch_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const
{
return false;
}
bool batch_statement::depends_on_column_family(const sstring& cf_name) const
{
return false;
return boost::algorithm::any_of(_statements, [&ks_name, &cf_name] (auto&& s) { return s.statement->depends_on(ks_name, cf_name); });
}
uint32_t batch_statement::get_bound_terms() const

View File

@@ -120,9 +120,7 @@ public:
std::unique_ptr<attributes> attrs,
cql_stats& stats);
virtual bool depends_on_keyspace(const sstring& ks_name) const override;
virtual bool depends_on_column_family(const sstring& cf_name) const override;
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
virtual uint32_t get_bound_terms() const override;

View File

@@ -540,12 +540,8 @@ modification_statement::validate(service::storage_proxy&, const service::client_
}
}
bool modification_statement::depends_on_keyspace(const sstring& ks_name) const {
return keyspace() == ks_name;
}
bool modification_statement::depends_on_column_family(const sstring& cf_name) const {
return column_family() == cf_name;
bool modification_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
return keyspace() == ks_name && (!cf_name || column_family() == *cf_name);
}
void modification_statement::add_operation(::shared_ptr<operation> op) {

View File

@@ -173,9 +173,7 @@ public:
// Validate before execute, using client state and current schema
void validate(service::storage_proxy&, const service::client_state& state) const override;
virtual bool depends_on_keyspace(const sstring& ks_name) const override;
virtual bool depends_on_column_family(const sstring& cf_name) const override;
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
void add_operation(::shared_ptr<operation> op);

View File

@@ -67,12 +67,7 @@ future<> schema_altering_statement::grant_permissions_to_creator(const service::
return make_ready_future<>();
}
bool schema_altering_statement::depends_on_keyspace(const sstring& ks_name) const
{
return false;
}
bool schema_altering_statement::depends_on_column_family(const sstring& cf_name) const
bool schema_altering_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const
{
return false;
}

View File

@@ -81,9 +81,7 @@ protected:
*/
virtual future<> grant_permissions_to_creator(const service::client_state&) const;
virtual bool depends_on_keyspace(const sstring& ks_name) const override;
virtual bool depends_on_column_family(const sstring& cf_name) const override;
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
virtual uint32_t get_bound_terms() const override;

View File

@@ -190,12 +190,8 @@ void select_statement::validate(service::storage_proxy&, const service::client_s
// Nothing to do, all validation has been done by raw_statemet::prepare()
}
bool select_statement::depends_on_keyspace(const sstring& ks_name) const {
return keyspace() == ks_name;
}
bool select_statement::depends_on_column_family(const sstring& cf_name) const {
return column_family() == cf_name;
bool select_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
return keyspace() == ks_name && (!cf_name || column_family() == *cf_name);
}
const sstring& select_statement::keyspace() const {
@@ -965,6 +961,7 @@ lw_shared_ptr<const service::pager::paging_state> indexed_table_select_statement
}
auto paging_state_copy = make_lw_shared<service::pager::paging_state>(service::pager::paging_state(*paging_state));
paging_state_copy->set_remaining(internal_paging_size);
paging_state_copy->set_partition_key(std::move(index_pk));
paging_state_copy->set_clustering_key(std::move(index_ck));
return std::move(paging_state_copy);

View File

@@ -121,8 +121,7 @@ public:
virtual uint32_t get_bound_terms() const override;
virtual future<> check_access(service::storage_proxy& proxy, const service::client_state& state) const override;
virtual void validate(service::storage_proxy&, const service::client_state& state) const override;
virtual bool depends_on_keyspace(const sstring& ks_name) const;
virtual bool depends_on_column_family(const sstring& cf_name) const;
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
virtual future<::shared_ptr<cql_transport::messages::result_message>> execute(query_processor& qp,
service::query_state& state, const query_options& options) const override;

View File

@@ -66,12 +66,7 @@ std::unique_ptr<prepared_statement> truncate_statement::prepare(database& db,cql
return std::make_unique<prepared_statement>(::make_shared<truncate_statement>(*this));
}
bool truncate_statement::depends_on_keyspace(const sstring& ks_name) const
{
return false;
}
bool truncate_statement::depends_on_column_family(const sstring& cf_name) const
bool truncate_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const
{
return false;
}

View File

@@ -60,9 +60,7 @@ public:
virtual std::unique_ptr<prepared_statement> prepare(database& db, cql_stats& stats) override;
virtual bool depends_on_keyspace(const sstring& ks_name) const override;
virtual bool depends_on_column_family(const sstring& cf_name) const override;
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
virtual future<> check_access(service::storage_proxy& proxy, const service::client_state& state) const override;

View File

@@ -52,6 +52,7 @@
#include "types/list.hh"
#include "types/user.hh"
#include "concrete_types.hh"
#include "validation.hh"
namespace cql3 {
@@ -252,6 +253,7 @@ insert_prepared_json_statement::build_partition_keys(const query_options& option
exploded.emplace_back(json_value->second);
}
auto pkey = partition_key::from_optional_exploded(*s, std::move(exploded));
validation::validate_cql_key(*s, pkey);
auto k = query::range<query::ring_position>::make_singular(dht::decorate_key(*s, std::move(pkey)));
ranges.emplace_back(std::move(k));
return ranges;

View File

@@ -73,12 +73,7 @@ std::unique_ptr<prepared_statement> use_statement::prepare(database& db, cql_sta
}
bool use_statement::depends_on_keyspace(const sstring& ks_name) const
{
return false;
}
bool use_statement::depends_on_column_family(const sstring& cf_name) const
bool use_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const
{
return false;
}

View File

@@ -61,9 +61,7 @@ public:
virtual uint32_t get_bound_terms() const override;
virtual bool depends_on_keyspace(const sstring& ks_name) const override;
virtual bool depends_on_column_family(const sstring& cf_name) const override;
virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
virtual future<> check_access(service::storage_proxy& proxy, const service::client_state& state) const override;

View File

@@ -975,10 +975,9 @@ bool database::update_column_family(schema_ptr new_schema) {
return columns_changed;
}
future<> database::remove(const column_family& cf) noexcept {
void database::remove(const table& cf) noexcept {
auto s = cf.schema();
auto& ks = find_keyspace(s->ks_name());
_querier_cache.evict_all_for_table(s->id());
_column_families.erase(s->id());
ks.metadata()->remove_column_family(s);
_ks_cf_to_uuid.erase(std::make_pair(s->ks_name(), s->cf_name()));
@@ -989,20 +988,26 @@ future<> database::remove(const column_family& cf) noexcept {
// Drop view mutations received after base table drop.
}
}
co_return;
}
future<> database::drop_column_family(const sstring& ks_name, const sstring& cf_name, timestamp_func tsf, bool snapshot) {
auto& ks = find_keyspace(ks_name);
auto uuid = find_uuid(ks_name, cf_name);
auto cf = _column_families.at(uuid);
co_await remove(*cf);
remove(*cf);
cf->clear_views();
co_return co_await cf->await_pending_ops().then([this, &ks, cf, tsf = std::move(tsf), snapshot] {
return truncate(ks, *cf, std::move(tsf), snapshot).finally([this, cf] {
return cf->stop();
});
}).finally([cf] {});
co_await cf->await_pending_ops();
_querier_cache.evict_all_for_table(cf->schema()->id());
std::exception_ptr ex;
try {
co_await truncate(ks, *cf, std::move(tsf), snapshot);
} catch (...) {
ex = std::current_exception();
}
co_await cf->stop();
if (ex) {
std::rethrow_exception(std::move(ex));
}
}
const utils::UUID& database::find_uuid(std::string_view ks, std::string_view cf) const {
@@ -1384,44 +1389,6 @@ database::existing_index_names(const sstring& ks_name, const sstring& cf_to_excl
return names;
}
// Based on:
// - org.apache.cassandra.db.AbstractCell#reconcile()
// - org.apache.cassandra.db.BufferExpiringCell#reconcile()
// - org.apache.cassandra.db.BufferDeletedCell#reconcile()
int
compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) {
if (left.timestamp() != right.timestamp()) {
return left.timestamp() > right.timestamp() ? 1 : -1;
}
if (left.is_live() != right.is_live()) {
return left.is_live() ? -1 : 1;
}
if (left.is_live()) {
auto c = compare_unsigned(left.value(), right.value());
if (c != 0) {
return c;
}
if (left.is_live_and_has_ttl() != right.is_live_and_has_ttl()) {
// prefer expiring cells.
return left.is_live_and_has_ttl() ? 1 : -1;
}
if (left.is_live_and_has_ttl() && left.expiry() != right.expiry()) {
return left.expiry() < right.expiry() ? -1 : 1;
}
} else {
// Both are deleted
if (left.deletion_time() != right.deletion_time()) {
// Origin compares big-endian serialized deletion time. That's because it
// delegates to AbstractCell.reconcile() which compares values after
// comparing timestamps, which in case of deleted cells will hold
// serialized expiry.
return (uint64_t) left.deletion_time().time_since_epoch().count()
< (uint64_t) right.deletion_time().time_since_epoch().count() ? -1 : 1;
}
}
return 0;
}
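The reconciliation rules above (timestamp first, then liveness, then value, TTL preference, expiry, and finally deletion time) can be sketched outside the C++ code. A minimal Python model of the same ordering; the `Cell` fields are chosen for illustration and mirror the accessors used in `compare_atomic_cell_for_merge`:

```python
from collections import namedtuple

# Illustrative cell representation; not Scylla's actual layout.
Cell = namedtuple("Cell", "timestamp live value has_ttl expiry deletion_time")

def compare_for_merge(left, right):
    # Higher timestamp wins outright.
    if left.timestamp != right.timestamp:
        return 1 if left.timestamp > right.timestamp else -1
    # At equal timestamps, a tombstone (dead cell) beats a live cell.
    if left.live != right.live:
        return -1 if left.live else 1
    if left.live:
        # Compare values bytewise, like compare_unsigned().
        if left.value != right.value:
            return 1 if left.value > right.value else -1
        # Prefer expiring cells over non-expiring ones.
        if left.has_ttl != right.has_ttl:
            return 1 if left.has_ttl else -1
        if left.has_ttl and left.expiry != right.expiry:
            return -1 if left.expiry < right.expiry else 1
    else:
        # Both deleted: compare deletion times (Origin compares the
        # big-endian serialized expiry, which orders the same way).
        if left.deletion_time != right.deletion_time:
            return -1 if left.deletion_time < right.deletion_time else 1
    return 0
```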
future<std::tuple<lw_shared_ptr<query::result>, cache_temperature>>
database::query(schema_ptr s, const query::read_command& cmd, query::result_options opts, const dht::partition_range_vector& ranges,
tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) {

View File

@@ -1367,6 +1367,7 @@ private:
Future update_write_metrics(Future&& f);
void update_write_metrics_for_timed_out_write();
future<> create_keyspace(const lw_shared_ptr<keyspace_metadata>&, bool is_bootstrap, system_keyspace system);
void remove(const table&) noexcept;
public:
static utils::UUID empty_version;
@@ -1550,7 +1551,6 @@ public:
bool update_column_family(schema_ptr s);
future<> drop_column_family(const sstring& ks_name, const sstring& cf_name, timestamp_func, bool with_snapshot = true);
future<> remove(const column_family&) noexcept;
const logalloc::region_group& dirty_memory_region_group() const {
return _dirty_memory_manager.region_group();

View File

@@ -215,6 +215,12 @@ public:
});
}
future<flush_permit> get_all_flush_permits() {
return get_units(_background_work_flush_serializer, _max_background_work).then([this] (auto&& units) {
return this->get_flush_permit(std::move(units));
});
}
bool has_extraneous_flushes_requested() const {
return _extraneous_flushes > 0;
}
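`get_all_flush_permits()` above takes every unit of the background-work semaphore, so the caller waits out ongoing flushes and prevents new ones from starting until the permits are released. A sketch of the idea with an asyncio semaphore; the class and field names here are illustrative, not Scylla's API:

```python
import asyncio

class FlushSerializer:
    """Illustrative model of the flush-permit serializer."""
    def __init__(self, max_background_work):
        self._max = max_background_work
        self._sem = asyncio.Semaphore(max_background_work)

    async def flush(self, work):
        # A normal flush takes a single permit.
        async with self._sem:
            await work()

    async def get_all_flush_permits(self):
        # Taking every permit waits for ongoing flushes to finish and
        # blocks new ones until the caller releases the permits.
        for _ in range(self._max):
            await self._sem.acquire()
        return self._sem  # caller releases when done
```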

View File

@@ -229,6 +229,52 @@ if __name__ == "__main__":
disk_properties["read_bandwidth"] = 507338935 * nr_disks
disk_properties["write_iops"] = 57100 * nr_disks
disk_properties["write_bandwidth"] = 483141731 * nr_disks
elif idata.instance_class() in ("c6gd", "m6gd", "r6gd", "x2gd"):
if idata.instance_size() == "medium":
disk_properties["read_iops"] = 14808
disk_properties["read_bandwidth"] = 77869147
disk_properties["write_iops"] = 5972
disk_properties["write_bandwidth"] = 32820302
elif idata.instance_size() == "large":
disk_properties["read_iops"] = 29690
disk_properties["read_bandwidth"] = 157712240
disk_properties["write_iops"] = 12148
disk_properties["write_bandwidth"] = 65978069
elif idata.instance_size() == "xlarge":
disk_properties["read_iops"] = 59688
disk_properties["read_bandwidth"] = 318762880
disk_properties["write_iops"] = 24449
disk_properties["write_bandwidth"] = 133311808
elif idata.instance_size() == "2xlarge":
disk_properties["read_iops"] = 119353
disk_properties["read_bandwidth"] = 634795733
disk_properties["write_iops"] = 49069
disk_properties["write_bandwidth"] = 266841680
elif idata.instance_size() == "4xlarge":
disk_properties["read_iops"] = 237196
disk_properties["read_bandwidth"] = 1262309504
disk_properties["write_iops"] = 98884
disk_properties["write_bandwidth"] = 533938080
elif idata.instance_size() == "8xlarge":
disk_properties["read_iops"] = 442945
disk_properties["read_bandwidth"] = 2522688939
disk_properties["write_iops"] = 166021
disk_properties["write_bandwidth"] = 1063041152
elif idata.instance_size() == "12xlarge":
disk_properties["read_iops"] = 353691 * nr_disks
disk_properties["read_bandwidth"] = 1908192256 * nr_disks
disk_properties["write_iops"] = 146732 * nr_disks
disk_properties["write_bandwidth"] = 806399360 * nr_disks
elif idata.instance_size() == "16xlarge":
disk_properties["read_iops"] = 426893 * nr_disks
disk_properties["read_bandwidth"] = 2525781589 * nr_disks
disk_properties["write_iops"] = 161740 * nr_disks
disk_properties["write_bandwidth"] = 1063389952 * nr_disks
elif idata.instance_size() == "metal":
disk_properties["read_iops"] = 416257 * nr_disks
disk_properties["read_bandwidth"] = 2527296683 * nr_disks
disk_properties["write_iops"] = 156326 * nr_disks
disk_properties["write_bandwidth"] = 1063657088 * nr_disks
properties_file = open(etcdir() + "/scylla.d/io_properties.yaml", "w")
yaml.dump({ "disks": [ disk_properties ] }, properties_file, default_flow_style=False)
ioconf = open(etcdir() + "/scylla.d/io.conf", "w")
@@ -254,7 +300,7 @@ if __name__ == "__main__":
disk_properties["read_bandwidth"] = 2650 * mbs
disk_properties["write_iops"] = 360000
disk_properties["write_bandwidth"] = 1400 * mbs
elif nr_disks == "16":
elif nr_disks == 16:
disk_properties["read_iops"] = 1600000
disk_properties["read_bandwidth"] = 4521251328
#below is google, above is our measured
@@ -263,7 +309,7 @@ if __name__ == "__main__":
disk_properties["write_bandwidth"] = 2759452672
#below is google, above is our measured
#disk_properties["write_bandwidth"] = 3120 * mbs
elif nr_disks == "24":
elif nr_disks == 24:
disk_properties["read_iops"] = 2400000
disk_properties["read_bandwidth"] = 5921532416
#below is google, above is our measured

View File

@@ -115,10 +115,6 @@ if __name__ == '__main__':
pkg_install('xfsprogs')
if not shutil.which('mdadm'):
pkg_install('mdadm')
try:
md_service = systemd_unit('mdmonitor.service')
except SystemdException:
md_service = systemd_unit('mdadm.service')
print('Creating {type} for scylla using {nr_disk} disk(s): {disks}'.format(type='RAID0' if raid else 'XFS volume', nr_disk=len(disks), disks=args.disks))
procs=[]
@@ -153,15 +149,11 @@ if __name__ == '__main__':
os.makedirs(mount_at, exist_ok=True)
uuid = run(f'blkid -s UUID -o value {fsdev}', shell=True, check=True, capture_output=True, encoding='utf-8').stdout.strip()
after = 'local-fs.target'
if raid:
after += f' {md_service}'
unit_data = f'''
[Unit]
Description=Scylla data directory
Before=scylla-server.service
After={after}
Wants={md_service}
After=local-fs.target
DefaultDependencies=no
[Mount]
@@ -185,7 +177,6 @@ WantedBy=multi-user.target
f.write(f'RequiresMountsFor={mount_at}\n')
systemd_unit.reload()
md_service.start()
mount = systemd_unit(mntunit_bn)
mount.start()
if args.enable_on_nextboot:

View File

@@ -147,6 +147,11 @@ class gcp_instance:
if af == socket.AF_INET:
addr, port = sa
if addr == "169.254.169.254":
# Make sure it is not on GKE
try:
gcp_instance().__instance_metadata("machine-type")
except urllib.error.HTTPError:
return False
return True
return False

View File

@@ -6,7 +6,7 @@ ENV container docker
# The SCYLLA_REPO_URL argument specifies the URL to the RPM repository this Docker image uses to install Scylla. The default value is Scylla's unstable RPM repository, which contains the daily build.
ARG SCYLLA_REPO_URL=downloads.scylladb.com/unstable/scylla/branch-4.5/rpm/centos/latest/
ARG VERSION=4.5.2
ARG VERSION=4.5.7
ADD scylla_bashrc /scylla_bashrc

View File

@@ -4,3 +4,4 @@ stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
stopwaitsecs=900

View File

@@ -121,12 +121,13 @@ class ScyllaSetup:
if self._apiAddress is not None:
args += ["--api-address %s" % self._apiAddress]
if self._alternatorPort is not None:
if self._alternatorAddress is not None:
args += ["--alternator-address %s" % self._alternatorAddress]
if self._alternatorPort is not None:
args += ["--alternator-port %s" % self._alternatorPort]
if self._alternatorHttpsPort is not None:
args += ["--alternator-address %s" % self._alternatorAddress]
args += ["--alternator-https-port %s" % self._alternatorHttpsPort]
if self._alternatorWriteIsolation is not None:

View File

@@ -1448,7 +1448,7 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
logger.trace("marking as alive {}", addr);
// Do not mark a node with status shutdown as UP.
auto status = get_gossip_status(local_state);
auto status = sstring(get_gossip_status(local_state));
if (status == sstring(versioned_value::SHUTDOWN)) {
logger.warn("Skip marking node {} with status = {} as UP", addr, status);
return;
@@ -1467,6 +1467,8 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
return;
}
// Make a copy for endpoint_state because the code below can yield
endpoint_state state = local_state;
_live_endpoints.push_back(addr);
if (_endpoints_to_talk_with.empty()) {
_endpoints_to_talk_with.push_back({addr});
@@ -1478,8 +1480,8 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
logger.info("InetAddress {} is now UP, status = {}", addr, status);
}
_subscribers.for_each([addr, local_state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
subscriber->on_alive(addr, local_state);
_subscribers.for_each([addr, state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
subscriber->on_alive(addr, state);
logger.trace("Notified {}", fmt::ptr(subscriber.get()));
});
}
@@ -1488,11 +1490,12 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
void gossiper::mark_dead(inet_address addr, endpoint_state& local_state) {
logger.trace("marking as down {}", addr);
local_state.mark_dead();
endpoint_state state = local_state;
_live_endpoints.resize(std::distance(_live_endpoints.begin(), std::remove(_live_endpoints.begin(), _live_endpoints.end(), addr)));
_unreachable_endpoints[addr] = now();
logger.info("InetAddress {} is now DOWN, status = {}", addr, get_gossip_status(local_state));
_subscribers.for_each([addr, local_state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
subscriber->on_dead(addr, local_state);
logger.info("InetAddress {} is now DOWN, status = {}", addr, get_gossip_status(state));
_subscribers.for_each([addr, state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
subscriber->on_dead(addr, state);
logger.trace("Notified {}", fmt::ptr(subscriber.get()));
});
}
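The copies introduced above exist because the notification path can yield, so subscribers must see a stable snapshot of the endpoint state rather than a reference into mutable state. A Python sketch of the pattern; the dict-based state and function names are illustrative only:

```python
import copy

def notify_subscribers(subscribers, addr, local_state):
    # Snapshot the state before notifying, because the notification path
    # can yield and local_state may change (or be freed) underneath a
    # reference held across the yield.
    state = copy.deepcopy(local_state)
    for sub in subscribers:
        sub(addr, state)
```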

main.cc
View File

@@ -388,11 +388,38 @@ static auto defer_verbose_shutdown(const char* what, Func&& func) {
startlog.info("Shutting down {}", what);
try {
func();
startlog.info("Shutting down {} was successful", what);
} catch (...) {
startlog.error("Unexpected error shutting down {}: {}", what, std::current_exception());
throw;
auto ex = std::current_exception();
bool do_abort = true;
try {
std::rethrow_exception(ex);
} catch (const std::system_error& e) {
// System error codes we consider "environmental",
// i.e. not scylla's fault, therefore there is no point in
// aborting and dumping core.
for (int i : {EIO, EACCES, ENOSPC}) {
if (e.code() == std::error_code(i, std::system_category())) {
do_abort = false;
break;
}
}
} catch (...) {
}
auto msg = fmt::format("Unexpected error shutting down {}: {}", what, ex);
if (do_abort) {
startlog.error("{}: aborting", msg);
abort();
} else {
startlog.error("{}: exiting, at {}", msg, current_backtrace());
// Call _exit() rather than exit() to exit immediately
// without calling exit handlers, avoiding
// boost::intrusive::detail::destructor_impl assert failure
// from ~segment_pool exit handler.
_exit(255);
}
}
startlog.info("Shutting down {} was successful", what);
};
auto ret = deferred_action(std::move(vfunc));
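The shutdown handler above aborts (and dumps core) only for errors that look like bugs, while "environmental" errors (EIO, EACCES, ENOSPC) lead to a plain exit. The classification can be modeled in Python, with `errno` standing in for `std::system_category` error codes:

```python
import errno

# Error codes considered "environmental" (not the program's fault), per the
# shutdown handler above: I/O failure, permission denied, disk full.
ENVIRONMENTAL = {errno.EIO, errno.EACCES, errno.ENOSPC}

def should_abort(exc: BaseException) -> bool:
    """Abort (and dump core) only for errors that look like bugs."""
    if isinstance(exc, OSError) and exc.errno in ENVIRONMENTAL:
        return False
    return True
```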

View File

@@ -1557,8 +1557,8 @@ class shard_reader : public enable_lw_shared_from_this<shard_reader>, public fla
private:
shared_ptr<reader_lifecycle_policy> _lifecycle_policy;
const unsigned _shard;
const dht::partition_range* _pr;
const query::partition_slice& _ps;
dht::partition_range _pr;
query::partition_slice _ps;
const io_priority_class& _pc;
tracing::global_trace_state_ptr _trace_state;
const mutation_reader::forwarding _fwd_mr;
@@ -1583,7 +1583,7 @@ public:
: impl(std::move(schema), std::move(permit))
, _lifecycle_policy(std::move(lifecycle_policy))
, _shard(shard)
, _pr(&pr)
, _pr(pr)
, _ps(ps)
, _pc(pc)
, _trace_state(std::move(trace_state))
@@ -1667,7 +1667,7 @@ future<> shard_reader::do_fill_buffer(db::timeout_clock::time_point timeout) {
});
auto s = gs.get();
auto rreader = make_foreign(std::make_unique<evictable_reader>(evictable_reader::auto_pause::yes, std::move(ms),
s, _lifecycle_policy->semaphore().make_permit(s.get(), "shard-reader"), *_pr, _ps, _pc, _trace_state, _fwd_mr));
s, _lifecycle_policy->semaphore().make_permit(s.get(), "shard-reader"), _pr, _ps, _pc, _trace_state, _fwd_mr));
tracing::trace(_trace_state, "Creating shard reader on shard: {}", this_shard_id());
auto f = rreader->fill_buffer(timeout);
return f.then([rreader = std::move(rreader)] () mutable {
@@ -1721,7 +1721,7 @@ future<> shard_reader::next_partition() {
}
future<> shard_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
_pr = &pr;
_pr = pr;
if (!_reader && !_read_ahead) {
// No need to fast-forward uncreated readers, they will be passed the new
@@ -1730,12 +1730,12 @@ future<> shard_reader::fast_forward_to(const dht::partition_range& pr, db::timeo
}
auto f = _read_ahead ? *std::exchange(_read_ahead, std::nullopt) : make_ready_future<>();
return f.then([this, &pr, timeout] {
return f.then([this, timeout] {
_end_of_stream = false;
clear_buffer();
return smp::submit_to(_shard, [this, &pr, timeout] {
return _reader->fast_forward_to(pr, timeout);
return smp::submit_to(_shard, [this, timeout] {
return _reader->fast_forward_to(_pr, timeout);
});
});
}

View File

@@ -57,6 +57,8 @@ future<> feed_writer(flat_mutation_reader&& rd, Writer&& wr) {
auto f2 = rd.is_buffer_empty() ? rd.fill_buffer(db::no_timeout) : make_ready_future<>();
return when_all_succeed(std::move(f1), std::move(f2)).discard_result();
});
}).then([&wr] {
wr.consume_end_of_stream();
}).then_wrapped([&wr] (future<> f) {
if (f.failed()) {
auto ex = f.get_exception();
@@ -70,7 +72,6 @@ future<> feed_writer(flat_mutation_reader&& rd, Writer&& wr) {
return make_exception_future<>(std::move(ex));
});
} else {
wr.consume_end_of_stream();
return wr.close();
}
});

View File

@@ -417,11 +417,11 @@ public:
} else {
// Copy row from older version because rows in evictable versions must
// hold values which are independently complete to be consistent on eviction.
auto e = current_allocator().construct<rows_entry>(_schema, *_current_row[0].it);
auto e = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(_schema, *_current_row[0].it));
e->set_continuous(latest_i != rows.end() && latest_i->continuous());
_snp.tracker()->insert(*e);
rows.insert_before(latest_i, *e);
return {*e, true};
auto e_i = rows.insert_before(latest_i, std::move(e));
return ensure_result{*e_i, true};
}
}
@@ -453,11 +453,11 @@ public:
}
auto&& rows = _snp.version()->partition().clustered_rows();
auto latest_i = get_iterator_in_latest_version();
auto e = current_allocator().construct<rows_entry>(_schema, pos, is_dummy(!pos.is_clustering_row()),
is_continuous(latest_i != rows.end() && latest_i->continuous()));
auto e = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(_schema, pos, is_dummy(!pos.is_clustering_row()),
is_continuous(latest_i != rows.end() && latest_i->continuous())));
_snp.tracker()->insert(*e);
rows.insert_before(latest_i, *e);
return ensure_result{*e, true};
auto e_i = rows.insert_before(latest_i, std::move(e));
return ensure_result{*e_i, true};
}
// Brings the entry pointed to by the cursor to the front of the LRU

View File

@@ -267,9 +267,14 @@ public:
return _current_tombstone;
}
const std::deque<range_tombstone>& range_tombstones_for_row(const clustering_key_prefix& ck) {
std::vector<range_tombstone> range_tombstones_for_row(const clustering_key_prefix& ck) {
drop_unneeded_tombstones(ck);
return _range_tombstones;
std::vector<range_tombstone> result(_range_tombstones.begin(), _range_tombstones.end());
auto cmp = [&] (const range_tombstone& rt1, const range_tombstone& rt2) {
return _cmp(rt1.start_bound(), rt2.start_bound());
};
std::sort(result.begin(), result.end(), cmp);
return result;
}
std::deque<range_tombstone> range_tombstones() && {
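The change above returns a copy sorted by each tombstone's start bound, instead of handing out a reference to the internal deque. A minimal Python sketch, with `(start, end)` tuples standing in for `range_tombstone`:

```python
def range_tombstones_for_row(tombstones):
    # Return a sorted copy rather than a reference to the internal
    # container; callers get a stable, start-bound-ordered snapshot.
    return sorted(tombstones, key=lambda rt: rt[0])
```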

Submodule seastar updated: 70ea9312a1...fd0d7c1c9a

View File

@@ -89,7 +89,7 @@ template<typename Input>
size_type read_frame_size(Input& in) {
auto sz = deserialize(in, boost::type<size_type>());
if (sz < sizeof(size_type)) {
throw std::runtime_error("Truncated frame");
throw std::runtime_error(fmt::format("IDL frame truncated: expected to have at least {} bytes, got {}", sizeof(size_type), sz));
}
return sz - sizeof(size_type);
}
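The improved error message above fires when a frame's declared size is smaller than the size field itself. A minimal Python sketch of the same validation, assuming a 4-byte little-endian size prefix (the C++ `size_type` width is an assumption here):

```python
import struct

FRAME_SIZE_BYTES = 4  # models sizeof(size_type); an assumption

def read_frame_size(buf: bytes) -> int:
    """Return the payload size, raising on a truncated frame header."""
    (sz,) = struct.unpack_from("<I", buf)
    if sz < FRAME_SIZE_BYTES:
        raise ValueError(
            f"IDL frame truncated: expected at least {FRAME_SIZE_BYTES} bytes, got {sz}")
    return sz - FRAME_SIZE_BYTES
```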

View File

@@ -2623,7 +2623,7 @@ future<> storage_service::load_and_stream(sstring ks_name, sstring cf_name,
auto& table = _db.local().find_column_family(table_id);
auto s = table.schema();
const auto cf_id = s->id();
const auto reason = streaming::stream_reason::rebuild;
const auto reason = streaming::stream_reason::repair;
auto& rs = _db.local().find_keyspace(ks_name).get_replication_strategy();
size_t nr_sst_total = sstables.size();
@@ -3348,7 +3348,7 @@ shared_ptr<node_ops_info> node_ops_meta_data::get_ops_info() {
void storage_service::node_ops_update_heartbeat(utils::UUID ops_uuid) {
slogger.debug("node_ops_update_heartbeat: ops_uuid={}", ops_uuid);
auto permit = seastar::get_units(_node_ops_abort_sem, 1);
auto permit = seastar::get_units(_node_ops_abort_sem, 1).get0();
auto it = _node_ops.find(ops_uuid);
if (it != _node_ops.end()) {
node_ops_meta_data& meta = it->second;
@@ -3358,7 +3358,7 @@ void storage_service::node_ops_update_heartbeat(utils::UUID ops_uuid) {
void storage_service::node_ops_done(utils::UUID ops_uuid) {
slogger.debug("node_ops_done: ops_uuid={}", ops_uuid);
auto permit = seastar::get_units(_node_ops_abort_sem, 1);
auto permit = seastar::get_units(_node_ops_abort_sem, 1).get0();
auto it = _node_ops.find(ops_uuid);
if (it != _node_ops.end()) {
node_ops_meta_data& meta = it->second;
@@ -3369,7 +3369,7 @@ void storage_service::node_ops_done(utils::UUID ops_uuid) {
void storage_service::node_ops_abort(utils::UUID ops_uuid) {
slogger.debug("node_ops_abort: ops_uuid={}", ops_uuid);
auto permit = seastar::get_units(_node_ops_abort_sem, 1);
auto permit = seastar::get_units(_node_ops_abort_sem, 1).get0();
auto it = _node_ops.find(ops_uuid);
if (it != _node_ops.end()) {
node_ops_meta_data& meta = it->second;

View File

@@ -124,7 +124,7 @@ void sstable_writer_k_l::maybe_flush_pi_block(file_writer& out,
// block includes them), but we set block_next_start_offset after - so
// even if we wrote a lot of open tombstones, we still get a full
// block size of new data.
auto& rts = _pi_write.tombstone_accumulator->range_tombstones_for_row(
auto rts = _pi_write.tombstone_accumulator->range_tombstones_for_row(
clustering_key_prefix::from_range(clustering_key.values()));
for (const auto& rt : rts) {
auto start = composite::from_clustering_element(*_pi_write.schemap, rt.start);

View File

@@ -78,7 +78,11 @@ compaction_descriptor leveled_compaction_strategy::get_major_compaction_job(colu
}
void leveled_compaction_strategy::notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) {
if (removed.empty() || added.empty()) {
// All the updates here are only relevant for regular compaction's round-robin picking policy. If
// last_compacted_keys wasn't generated by regular compaction, it means regular compaction has been
// disabled since the last restart, so we can skip the updates here until it runs for the first time.
// Once it runs, it will be able to generate last_compacted_keys correctly by looking at the metadata of the files.
if (removed.empty() || added.empty() || !_last_compacted_keys) {
return;
}
auto min_level = std::numeric_limits<uint32_t>::max();
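The guard above skips the round-robin bookkeeping entirely when `_last_compacted_keys` is disengaged, which is what previously triggered `bad_optional_access`. A Python model of the fix, using `None` for the disengaged optional; the level/last-key bookkeeping below is an illustrative simplification, not Scylla's actual logic:

```python
class LeveledStrategy:
    """Illustrative model; not Scylla's actual class layout."""
    def __init__(self):
        # None models the disengaged std::optional _last_compacted_keys.
        self.last_compacted_keys = None

    def notify_completion(self, removed, added):
        # Skip bookkeeping if nothing happened, or if regular compaction
        # never initialized the round-robin state (the fix above).
        if not removed or not added or self.last_compacted_keys is None:
            return
        min_level = min(s["level"] for s in removed)
        self.last_compacted_keys[min_level] = max(s["last_key"] for s in removed)
```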

View File

@@ -1473,12 +1473,13 @@ bool table::can_flush() const {
}
future<> table::clear() {
auto permits = co_await _config.dirty_memory_manager->get_all_flush_permits();
if (_commitlog) {
_commitlog->discard_completed_segments(_schema->id());
}
_memtables->clear();
_memtables->add_memtable();
return _cache.invalidate(row_cache::external_updater([] { /* There is no underlying mutation source */ }));
co_await _cache.invalidate(row_cache::external_updater([] { /* There is no underlying mutation source */ }));
}
// NOTE: does not need to be futurized, but might eventually, depending on

View File

@@ -85,3 +85,20 @@ def test_signature_too_futuristic(dynamodb, test_table):
response = requests.post(url, headers=headers, verify=False)
assert not response.ok
assert "InvalidSignatureException" in response.text and "Signature not yet current" in response.text
# A test that commas can be used instead of whitespace to separate components
# of the Authorization headers - reproducing issue #9568.
def test_authorization_no_whitespace(dynamodb, test_table):
# Unlike the above tests which checked error cases so didn't need to
# calculate a real signature, in this test we really need a correct signature,
# so we use a function we already have in test_manual_requests.py.
from test_manual_requests import get_signed_request
payload = '{"TableName": "' + test_table.name + '", "Item": {"p": {"S": "x"}, "c": {"S": "x"}}}'
req = get_signed_request(dynamodb, 'PutItem', payload)
# Boto3 separates the components of the Authorization header by spaces.
# Let's remove all of them except the first one (which separates the
# signature algorithm name from the rest) and check the result still works:
a = req.headers['Authorization'].split()
req.headers['Authorization'] = a[0] + ' ' + ''.join(a[1:])
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
assert response.ok

View File

@@ -186,3 +186,25 @@ def test_incorrect_numbers(dynamodb, test_table):
req = get_signed_request(dynamodb, 'PutItem', payload)
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
assert "ValidationException" in response.text and "numeric" in response.text
# Although the DynamoDB API responses are JSON, additional conventions apply
# to these responses - such as how error codes are encoded in JSON. For this
# reason, DynamoDB uses the content type 'application/x-amz-json-1.0' instead
# of the standard 'application/json'. This test verifies that we return the
# correct content type header.
# While most DynamoDB libraries we tried do not care about an unexpected
# content-type, it turns out that one (aiodynamo) does. Moreover, AWS already
# defined x-amz-json-1.1 - see
# https://awslabs.github.io/smithy/1.0/spec/aws/aws-json-1_1-protocol.html
# which differs (only) in how it encodes error replies.
# So in the future it may become even more important that Scylla return the
# correct content type.
def test_content_type(dynamodb, test_table):
payload = '{"TableName": "' + test_table.name + '", "Item": {"p": {"S": "x"}, "c": {"S": "x"}}}'
# Note that get_signed_request() uses x-amz-json-1.0 to encode the
# *request*. In the future this may or may not effect the content type
# in the response (today, DynamoDB doesn't allow any other content type
# in the request anyway).
req = get_signed_request(dynamodb, 'PutItem', payload)
response = requests.post(req.url, headers=req.headers, data=req.body, verify=False)
assert response.headers['Content-Type'] == 'application/x-amz-json-1.0'

View File

@@ -0,0 +1,113 @@
# Copyright 2021-present ScyllaDB
#
# This file is part of Scylla.
#
# Scylla is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Scylla is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Scylla. If not, see <http://www.gnu.org/licenses/>.
##############################################################################
# Tests for Scylla's metrics (see docs/design-notes/metrics.md) for Alternator
# queries. Reproduces issue #9406, where although metrics were implemented for
# Alternator requests, they were missing for some operations (BatchGetItem).
# In the tests here we attempt to ensure that the metrics continue to work
# for the relevant operations as the code evolves.
#
# Note that all tests in this file test Scylla-specific features, and are
# "skipped" when not running against Scylla, or when unable to retrieve
# metrics through out-of-band HTTP requests to Scylla's Prometheus port (9180).
#
# IMPORTANT: we do not want these tests to assume that they are not running in
# parallel with any other tests or workload - because such an assumption
# would limit our test deployment options in the future. NOT making this
# assumption means that these tests can't check that a certain operation
# increases a certain counter by exactly 1 - because other concurrent
# operations might increase it further! So our test can only check that the
# counter increases.
##############################################################################
import pytest
import requests
import re
from util import random_string
# Fixture for checking if we are able to test Scylla metrics. Scylla metrics
# are not available on AWS (of course), but may also not be available for
# Scylla if for some reason we have only access to the Alternator protocol
# port but no access to the metrics port (9180).
# If metrics are *not* available, tests using this fixture will be skipped.
# Tests using this fixture may call get_metrics(metrics).
@pytest.fixture(scope="module")
def metrics(dynamodb):
if dynamodb.meta.client._endpoint.host.endswith('.amazonaws.com'):
pytest.skip('Scylla-only feature not supported by AWS')
url = dynamodb.meta.client._endpoint.host
# The Prometheus API is on port 9180, and always http, not https.
url = re.sub(r':[0-9]+(/|$)', ':9180', url)
url = re.sub(r'^https:', 'http:', url)
url = url + '/metrics'
resp = requests.get(url)
if resp.status_code != 200:
pytest.skip('Metrics port 9180 is not available')
yield url
# Utility function for fetching all metrics from Scylla, using an HTTP request
# to port 9180. The response format is defined by the Prometheus protocol.
# Only use get_metrics() in a test using the metrics fixture.
def get_metrics(metrics):
response = requests.get(metrics)
assert response.status_code == 200
return response.text
# Utility function for fetching a metric with a given name and optionally a
# given sub-metric label (which should be a name-value map). If multiple
# matches are found, they are summed - this is useful for summing up the
# counts from multiple shards.
def get_metric(metrics, name, requested_labels=None):
total = 0.0
lines = re.compile('^'+name+'{.*$', re.MULTILINE)
for match in re.findall(lines, get_metrics(metrics)):
a = match.split()
metric = a[0]
val = float(a[1])
# Check if match also matches the requested labels
if requested_labels:
# we know metric begins with name{ and ends with } - the labels
# are what we have between those
got_labels = metric[len(name)+1:-1].split(',')
# Check that every one of the requested labels is in got_labels:
for k, v in requested_labels.items():
if not f'{k}="{v}"' in got_labels:
# No match for requested label, skip this metric (python
# doesn't have "continue 2" so let's just set val to 0...
val = 0
break
total += float(val)
return total
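The label-matching and shard-summing logic of `get_metric()` above can be exercised offline against a captured Prometheus text sample, with no live Scylla. A self-contained variant taking the text directly; the sample values below are made up:

```python
import re

SAMPLE = (
    'scylla_alternator_operation{op="BatchWriteItem",shard="0"} 2\n'
    'scylla_alternator_operation{op="BatchWriteItem",shard="1"} 3\n'
    'scylla_alternator_operation{op="BatchGetItem",shard="0"} 7\n'
)

def get_metric_from_text(text, name, requested_labels=None):
    # Same summing-over-shards logic as get_metric() above, but taking the
    # Prometheus exposition text directly instead of fetching it over HTTP.
    total = 0.0
    for match in re.findall(re.compile('^' + name + '{.*$', re.MULTILINE), text):
        a = match.split()
        metric, val = a[0], float(a[1])
        if requested_labels:
            # The labels sit between 'name{' and the trailing '}'.
            got_labels = metric[len(name) + 1:-1].split(',')
            for k, v in requested_labels.items():
                if f'{k}="{v}"' not in got_labels:
                    val = 0
                    break
        total += val
    return total
```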
def test_batch_write_item(test_table_s, metrics):
n1 = get_metric(metrics, 'scylla_alternator_operation', {'op': 'BatchWriteItem'})
test_table_s.meta.client.batch_write_item(RequestItems = {
test_table_s.name: [{'PutRequest': {'Item': {'p': random_string(), 'a': 'hi'}}}]})
n2 = get_metric(metrics, 'scylla_alternator_operation', {'op': 'BatchWriteItem'})
assert n2 > n1
# Reproduces issue #9406:
def test_batch_get_item(test_table_s, metrics):
n1 = get_metric(metrics, 'scylla_alternator_operation', {'op': 'BatchGetItem'})
test_table_s.meta.client.batch_get_item(RequestItems = {
test_table_s.name: {'Keys': [{'p': random_string()}], 'ConsistentRead': True}})
n2 = get_metric(metrics, 'scylla_alternator_operation', {'op': 'BatchGetItem'})
assert n2 > n1
# TODO: check the rest of the operations

View File

@@ -1014,6 +1014,20 @@ def test_nested_attribute_remove_from_missing_item(test_table_s):
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE x.y')
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE x[0]')
# In an above test (test_nested_attribute_update_bad_path_dot) we showed
# that DynamoDB does not allow REMOVE x.y if attribute x doesn't exist -
# it generates a ValidationException. However, if x *does* exist but y
# doesn't, the removal is fine and should just be silently ignored.
def test_nested_attribute_remove_missing_leaf(test_table_s):
p = random_string()
item = {'p': p, 'a': {'x': 3}, 'b': ['hi']}
test_table_s.put_item(Item=item)
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE a.y')
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE b[7]')
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE c')
# The above UpdateItem calls didn't change anything...
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item'] == item
# Similarly for other types of bad paths - using [0] on something which
# doesn't exist or isn't an array.
def test_nested_attribute_update_bad_path_array(test_table_s):

View File

@@ -191,3 +191,32 @@ BOOST_AUTO_TEST_CASE(tests_reserve_partial) {
BOOST_REQUIRE_EQUAL(v.capacity(), orig_size);
}
}
// Tests the case of make_room() invoked with last_chunk_capacity_deficit but _size not in
// the last reserved chunk.
BOOST_AUTO_TEST_CASE(test_shrinking_and_expansion_involving_chunk_boundary) {
using vector_type = utils::chunked_vector<std::unique_ptr<uint64_t>>;
vector_type v;
// Fill two chunks
v.reserve(vector_type::max_chunk_capacity() * 3 / 2);
for (uint64_t i = 0; i < vector_type::max_chunk_capacity() * 3 / 2; ++i) {
v.emplace_back(std::make_unique<uint64_t>(i));
}
// Make the last chunk smaller than max size to trigger the last_chunk_capacity_deficit path in make_room()
v.shrink_to_fit();
// Leave the last chunk reserved but empty
for (uint64_t i = 0; i < vector_type::max_chunk_capacity(); ++i) {
v.pop_back();
}
// Try to reserve more than the currently reserved capacity and trigger last_chunk_capacity_deficit path
// with _size not in the last chunk. Should not sigsegv.
v.reserve(vector_type::max_chunk_capacity() * 4);
for (uint64_t i = 0; i < vector_type::max_chunk_capacity() * 2; ++i) {
v.emplace_back(std::make_unique<uint64_t>(i));
}
}

View File

@@ -618,3 +618,38 @@ SEASTAR_THREAD_TEST_CASE(unpaged_mutation_read_global_limit) {
}
}, std::move(cfg)).get();
}
SEASTAR_TEST_CASE(database_drop_column_family_clears_querier_cache) {
return do_with_cql_env_thread([] (cql_test_env& e) {
e.execute_cql("create table ks.cf (k text, v int, primary key (k));").get();
auto& db = e.local_db();
const auto ts = db_clock::now();
auto& tbl = db.find_column_family("ks", "cf");
auto op = std::optional(tbl.read_in_progress());
auto s = tbl.schema();
auto q = query::data_querier(
tbl.as_mutation_source(),
tbl.schema(),
tests::make_permit(),
query::full_partition_range,
s->full_slice(),
default_priority_class(),
nullptr);
auto f = e.db().invoke_on_all([ts] (database& db) {
return db.drop_column_family("ks", "cf", [ts] { return make_ready_future<db_clock::time_point>(ts); });
});
// we add a querier to the querier cache while the drop is ongoing
auto& qc = db.get_querier_cache();
qc.insert(utils::make_random_uuid(), std::move(q), nullptr);
BOOST_REQUIRE_EQUAL(qc.get_stats().population, 1);
op.reset(); // this should allow the drop to finish
f.get();
// the drop should have cleaned up all entries belonging to that table
BOOST_REQUIRE_EQUAL(qc.get_stats().population, 0);
});
}
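The invariant the test above checks — dropping a table must also purge that table's entries from the querier cache — can be sketched as a toy model. All names (`QuerierCache`, `evict_for_table`, `population`) are illustrative stand-ins, not the real Scylla API:

```python
import uuid

class QuerierCache:
    """Toy model: entries are keyed by a random id and tagged with their table."""
    def __init__(self):
        self._entries = {}   # uuid -> table name

    def insert(self, key, table):
        self._entries[key] = table

    @property
    def population(self):
        return len(self._entries)

    def evict_for_table(self, table):
        self._entries = {k: t for k, t in self._entries.items() if t != table}

class Database:
    def __init__(self):
        self.querier_cache = QuerierCache()
        self.tables = set()

    def drop_column_family(self, table):
        self.tables.discard(table)
        # the behavior under test: dropping a table also purges its queriers
        self.querier_cache.evict_for_table(table)

db = Database()
db.tables.add("ks.cf")
db.querier_cache.insert(uuid.uuid4(), "ks.cf")
assert db.querier_cache.population == 1
db.drop_column_family("ks.cf")
assert db.querier_cache.population == 0
```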


@@ -22,6 +22,8 @@
#include <seastar/testing/test_case.hh>
#include "test/lib/cql_test_env.hh"
#include "test/lib/cql_assertions.hh"
#include "cql3/untyped_result_set.hh"
#include "cql3/query_processor.hh"
#include "transport/messages/result_message.hh"
SEASTAR_TEST_CASE(test_index_with_paging) {
@@ -48,3 +50,51 @@ SEASTAR_TEST_CASE(test_index_with_paging) {
});
});
}
SEASTAR_TEST_CASE(test_index_with_paging_with_base_short_read) {
return do_with_cql_env_thread([] (auto& e) {
e.execute_cql("CREATE TABLE tab (pk int, ck text, v int, v2 int, v3 text, PRIMARY KEY (pk, ck))").get();
e.execute_cql("CREATE INDEX ON tab (v)").get();
// Enough to trigger a short read on the base table during scan
sstring big_string(2 * query::result_memory_limiter::maximum_result_size, 'j');
const int row_count = 67;
for (int i = 0; i < row_count; ++i) {
e.execute_cql(format("INSERT INTO tab (pk, ck, v, v2, v3) VALUES ({}, 'hello{}', 1, {}, '{}')", i % 3, i, i, big_string)).get();
}
eventually([&] {
uint64_t count = 0;
e.qp().local().query_internal("SELECT * FROM ks.tab WHERE v = 1", [&] (const cql3::untyped_result_set_row&) {
++count;
return make_ready_future<stop_iteration>(stop_iteration::no);
}).get();
BOOST_REQUIRE_EQUAL(count, row_count);
});
});
}
SEASTAR_TEST_CASE(test_index_with_paging_with_base_short_read_no_ck) {
return do_with_cql_env_thread([] (auto& e) {
e.execute_cql("CREATE TABLE tab (pk int, v int, v2 int, v3 text, PRIMARY KEY (pk))").get();
e.execute_cql("CREATE INDEX ON tab (v)").get();
// Enough to trigger a short read on the base table during scan
sstring big_string(2 * query::result_memory_limiter::maximum_result_size, 'j');
const int row_count = 67;
for (int i = 0; i < row_count; ++i) {
e.execute_cql(format("INSERT INTO tab (pk, v, v2, v3) VALUES ({}, 1, {}, '{}')", i, i, big_string)).get();
}
eventually([&] {
uint64_t count = 0;
e.qp().local().query_internal("SELECT * FROM ks.tab WHERE v = 1", [&] (const cql3::untyped_result_set_row&) {
++count;
return make_ready_future<stop_iteration>(stop_iteration::no);
}).get();
BOOST_REQUIRE_EQUAL(count, row_count);
});
});
}


@@ -197,9 +197,9 @@ SEASTAR_TEST_CASE(test_loading_shared_values_parallel_loading_explicit_eviction)
}).get();
int rand_key = rand_int(num_loaders);
BOOST_REQUIRE(shared_values.find(rand_key) != shared_values.end());
BOOST_REQUIRE(shared_values.find(rand_key) != nullptr);
anchors_vec[rand_key] = nullptr;
BOOST_REQUIRE_MESSAGE(shared_values.find(rand_key) == shared_values.end(), format("explicit removal for key {} failed", rand_key));
BOOST_REQUIRE_MESSAGE(shared_values.find(rand_key) == nullptr, format("explicit removal for key {} failed", rand_key));
anchors_vec.clear();
});
}
@@ -236,29 +236,10 @@ SEASTAR_THREAD_TEST_CASE(test_loading_cache_removing_key) {
loading_cache.get_ptr(0, loader).discard_result().get();
BOOST_REQUIRE_EQUAL(load_count, 1);
BOOST_REQUIRE(loading_cache.find(0) != loading_cache.end());
BOOST_REQUIRE(loading_cache.find(0) != nullptr);
loading_cache.remove(0);
BOOST_REQUIRE(loading_cache.find(0) == loading_cache.end());
}
SEASTAR_THREAD_TEST_CASE(test_loading_cache_removing_iterator) {
using namespace std::chrono;
load_count = 0;
utils::loading_cache<int, sstring> loading_cache(num_loaders, 100s, testlog);
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });
prepare().get();
loading_cache.get_ptr(0, loader).discard_result().get();
BOOST_REQUIRE_EQUAL(load_count, 1);
auto it = loading_cache.find(0);
BOOST_REQUIRE(it != loading_cache.end());
loading_cache.remove(it);
BOOST_REQUIRE(loading_cache.find(0) == loading_cache.end());
BOOST_REQUIRE(loading_cache.find(0) == nullptr);
}
SEASTAR_TEST_CASE(test_loading_cache_loading_different_keys) {
@@ -292,10 +273,69 @@ SEASTAR_TEST_CASE(test_loading_cache_loading_expiry_eviction) {
loading_cache.get_ptr(0, loader).discard_result().get();
BOOST_REQUIRE(loading_cache.find(0) != loading_cache.end());
BOOST_REQUIRE(loading_cache.find(0) != nullptr);
sleep(20ms).get();
REQUIRE_EVENTUALLY_EQUAL(loading_cache.find(0), loading_cache.end());
REQUIRE_EVENTUALLY_EQUAL(loading_cache.find(0), nullptr);
});
}
SEASTAR_TEST_CASE(test_loading_cache_loading_expiry_reset_on_sync_op) {
return seastar::async([] {
using namespace std::chrono;
utils::loading_cache<int, sstring> loading_cache(num_loaders, 30ms, testlog);
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });
prepare().get();
loading_cache.get_ptr(0, loader).discard_result().get();
auto vp = loading_cache.find(0);
auto load_time = steady_clock::now();
// Check that the expiration timer is reset every time we call a find()
for (int i = 0; i < 10; ++i) {
// seastar::lowres_clock has 10ms resolution. This means that we should use a 10ms threshold to compensate.
if (steady_clock::now() <= load_time + 20ms) {
BOOST_REQUIRE(vp != nullptr);
} else {
// If there was a delay and we weren't able to execute the next loop iteration within 20ms, let's repopulate
// the cache.
loading_cache.get_ptr(0, loader).discard_result().get();
BOOST_TEST_MESSAGE("Test " << i << " was skipped. Repopulating...");
}
vp = loading_cache.find(0);
load_time = steady_clock::now();
sleep(10ms).get();
}
sleep(30ms).get();
REQUIRE_EVENTUALLY_EQUAL(loading_cache.size(), 0);
});
}
SEASTAR_TEST_CASE(test_loading_cache_move_item_to_mru_list_front_on_sync_op) {
return seastar::async([] {
using namespace std::chrono;
utils::loading_cache<int, sstring> loading_cache(2, 1h, testlog);
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });
prepare().get();
for (int i = 0; i < 2; ++i) {
loading_cache.get_ptr(i, loader).discard_result().get();
}
auto vp0 = loading_cache.find(0);
BOOST_REQUIRE(vp0 != nullptr);
loading_cache.get_ptr(2, loader).discard_result().get();
// "0" should be at the beginning of the list and "1" right after it before we try to add a new entry to the
// cache ("2"). And hence "1" should get evicted.
vp0 = loading_cache.find(0);
auto vp1 = loading_cache.find(1);
BOOST_REQUIRE(vp0 != nullptr);
BOOST_REQUIRE(vp1 == nullptr);
});
}
@@ -351,3 +391,87 @@ SEASTAR_TEST_CASE(test_loading_cache_reload_during_eviction) {
BOOST_REQUIRE_EQUAL(loading_cache.size(), 1);
});
}
SEASTAR_THREAD_TEST_CASE(test_loading_cache_remove_leaves_no_old_entries_behind) {
using namespace std::chrono;
load_count = 0;
auto load_v1 = [] (auto key) { return make_ready_future<sstring>("v1"); };
auto load_v2 = [] (auto key) { return make_ready_future<sstring>("v2"); };
auto load_v3 = [] (auto key) { return make_ready_future<sstring>("v3"); };
{
utils::loading_cache<int, sstring> loading_cache(num_loaders, 100s, testlog);
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });
//
// Test remove() concurrent with loading
//
auto f = loading_cache.get_ptr(0, [&](auto key) {
return later().then([&] {
return load_v1(key);
});
});
loading_cache.remove(0);
BOOST_REQUIRE_EQUAL(loading_cache.find(0), nullptr);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 0);
auto ptr1 = f.get0();
BOOST_REQUIRE_EQUAL(*ptr1, "v1");
BOOST_REQUIRE_EQUAL(loading_cache.find(0), nullptr);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 0);
ptr1 = loading_cache.get_ptr(0, load_v2).get0();
loading_cache.remove(0);
BOOST_REQUIRE_EQUAL(*ptr1, "v2");
//
// Test that live ptr1, removed from cache, does not prevent reload of new value
//
auto ptr2 = loading_cache.get_ptr(0, load_v3).get0();
ptr1 = nullptr;
BOOST_REQUIRE_EQUAL(*ptr2, "v3");
}
// Test remove_if()
{
utils::loading_cache<int, sstring> loading_cache(num_loaders, 100s, testlog);
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });
//
// Test remove_if() concurrent with loading
//
auto f = loading_cache.get_ptr(0, [&](auto key) {
return later().then([&] {
return load_v1(key);
});
});
loading_cache.remove_if([] (auto&& v) { return v == "v1"; });
BOOST_REQUIRE_EQUAL(loading_cache.find(0), nullptr);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 0);
auto ptr1 = f.get0();
BOOST_REQUIRE_EQUAL(*ptr1, "v1");
BOOST_REQUIRE_EQUAL(loading_cache.find(0), nullptr);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 0);
ptr1 = loading_cache.get_ptr(0, load_v2).get0();
loading_cache.remove_if([] (auto&& v) { return v == "v2"; });
BOOST_REQUIRE_EQUAL(*ptr1, "v2");
//
// Test that live ptr1, removed from cache, does not prevent reload of new value
//
auto ptr2 = loading_cache.get_ptr(0, load_v3).get0();
ptr1 = nullptr;
BOOST_REQUIRE_EQUAL(*ptr2, "v3");
ptr2 = nullptr;
}
}
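The guarantee exercised by the tests above — a load that started before a `remove()` may still hand its value to the caller, but must never populate the cache — can be sketched with a generation counter. This is a minimal model, not the real implementation; the class and the generation trick are assumptions made for illustration:

```python
class LoadingCache:
    """Toy model of the remove()-vs-concurrent-load guarantee."""

    def __init__(self):
        self._vals = {}
        self._gen = {}   # per-key generation, bumped by remove()

    def get_ptr(self, key, loader):
        if key in self._vals:
            return self._vals[key]
        gen = self._gen.get(key, 0)
        value = loader(key)                 # stands in for the async load
        if self._gen.get(key, 0) == gen:    # no remove() raced with the load
            self._vals[key] = value
        return value                        # the caller still gets the value

    def find(self, key):
        return self._vals.get(key)

    def remove(self, key):
        self._vals.pop(key, None)
        self._gen[key] = self._gen.get(key, 0) + 1


cache = LoadingCache()

def loader(k):
    cache.remove(k)   # simulate remove() arriving while the load is in flight
    return "v1"

ptr1 = cache.get_ptr(0, loader)
assert ptr1 == "v1"           # the in-flight load still yields its value
assert cache.find(0) is None  # ...but it did not populate the cache
```

A subsequent `get_ptr()` with a fresh loader repopulates the key normally, mirroring the `load_v2` step in the test.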


@@ -687,6 +687,7 @@ SEASTAR_TEST_CASE(test_cell_ordering) {
};
auto assert_equal = [] (atomic_cell_view c1, atomic_cell_view c2) {
testlog.trace("Expected {} == {}", c1, c2);
BOOST_REQUIRE(compare_atomic_cell_for_merge(c1, c2) == 0);
BOOST_REQUIRE(compare_atomic_cell_for_merge(c2, c1) == 0);
};
@@ -708,9 +709,11 @@ SEASTAR_TEST_CASE(test_cell_ordering) {
atomic_cell::make_live(*bytes_type, 1, bytes(), expiry_2, ttl_2));
// Origin doesn't compare ttl (is it wise?)
assert_equal(
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_1),
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_2));
// But we do. See https://github.com/scylladb/scylla/issues/10156
// and https://github.com/scylladb/scylla/issues/10173
assert_order(
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_2),
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_1));
assert_order(
atomic_cell::make_live(*bytes_type, 0, bytes("value1")),


@@ -28,6 +28,8 @@
#include "sstables/sstables.hh"
#include "test/lib/mutation_source_test.hh"
#include "test/lib/sstable_utils.hh"
#include "test/lib/mutation_assertions.hh"
#include "partition_slice_builder.hh"
using namespace sstables;
using namespace std::chrono_literals;
@@ -62,3 +64,69 @@ SEASTAR_TEST_CASE(test_sstable_conforms_to_mutation_source) {
}
});
}
// Regression test for scylladb/scylla-enterprise#2016
SEASTAR_THREAD_TEST_CASE(test_produces_range_tombstone) {
auto s = schema_builder("ks", "cf")
.with_column("pk", int32_type, column_kind::partition_key)
.with_column("ck", int32_type, column_kind::clustering_key)
.with_column("v", int32_type, column_kind::regular_column)
.build();
mutation m(s, partition_key::from_single_value(*s, int32_type->decompose(0)));
m.partition().apply_row_tombstone(*s, range_tombstone{
clustering_key::from_exploded(*s, {int32_type->decompose(6)}), bound_kind::excl_start,
clustering_key::from_exploded(*s, {int32_type->decompose(10)}), bound_kind::incl_end,
tombstone(0, gc_clock::time_point())
});
{
auto ckey = clustering_key::from_exploded(*s, {int32_type->decompose(6)});
deletable_row& row = m.partition().clustered_row(*s, ckey, is_dummy::no, is_continuous(false));
row.marker() = row_marker(4);
}
{
auto ckey = clustering_key::from_exploded(*s, {int32_type->decompose(8)});
deletable_row& row = m.partition().clustered_row(*s, ckey, is_dummy::no, is_continuous(false));
row.apply(tombstone(2, gc_clock::time_point()));
row.marker() = row_marker(5);
}
testlog.info("m: {}", m);
auto slice = partition_slice_builder(*s)
.with_range(query::clustering_range::make(
{clustering_key::from_exploded(*s, {int32_type->decompose(8)}), false},
{clustering_key::from_exploded(*s, {int32_type->decompose(10)}), true}
))
.build();
auto pr = dht::partition_range::make_singular(m.decorated_key());
std::vector<tmpdir> dirs;
dirs.emplace_back();
sstables::test_env::do_with_async([&] (sstables::test_env& env) {
storage_service_for_tests ssft;
auto version = sstable_version_types::la;
auto index_block_size = 1;
sstable_writer_config cfg = env.manager().configure_writer();
cfg.promoted_index_block_size = index_block_size;
auto source = make_sstable_mutation_source(env, s, dirs.back().path().string(), {m}, cfg, version, gc_clock::now());
{
auto rd = source.make_reader(s, tests::make_permit(), pr, slice);
while (auto mf = rd(db::no_timeout).get0()) {
testlog.info("produced {}", mutation_fragment::printer(*s, *mf));
}
}
{
auto rd = source.make_reader(s, tests::make_permit(), pr, slice);
mutation_opt sliced_m = read_mutation_from_flat_mutation_reader(rd, db::no_timeout).get0();
BOOST_REQUIRE(bool(sliced_m));
assert_that(*sliced_m).is_equal_to(m, slice.row_ranges(*m.schema(), m.key()));
}
}).get();
}


@@ -86,3 +86,16 @@ def test_filtering_contiguous_nonmatching_single_partition(cql, test_keyspace):
# Scylla won't count its size as part of the 1MB limit, and will not
# return empty pages - the first page will contain the result.
assert list(cql.execute(f"SELECT c, s FROM {table} WHERE p=1 AND v={count-1} ALLOW FILTERING")) == [(count-1, long)]
# Test that the fact that a column is indexed does not cause us to fetch
# incorrect results from a filtering query (issue #10300).
def test_index_with_in_relation(scylla_only, cql, test_keyspace):
schema = 'p int, c int, v boolean, primary key (p,c)'
with new_test_table(cql, test_keyspace, schema) as table:
cql.execute(f"create index on {table}(v)")
for p, c, v in [(0,0,True),(0,1,False),(0,2,True),(0,3,False),
(1,0,True),(1,1,False),(1,2,True),(1,3,False),
(2,0,True),(2,1,False),(2,2,True),(2,3,False)]:
cql.execute(f"insert into {table} (p,c,v) values ({p}, {c}, {v})")
res = cql.execute(f"select * from {table} where p in (0,1) and v = False ALLOW FILTERING")
assert set(res) == set([(0,1,False),(0,3,False),(1,1,False), (1,3,False)])


@@ -64,8 +64,9 @@ def test_insert_null_key(cql, table1):
with pytest.raises(InvalidRequest, match='null value'):
cql.execute(stmt, [None, s])
# Tests handling of "key_column in ?" where ? is bound to null.
# Reproduces issue #8265.
def test_primary_key_in_null(cql, table1):
'''Tests handling of "key_column in ?" where ? is bound to null.'''
with pytest.raises(InvalidRequest, match='null value'):
cql.execute(cql.prepare(f"SELECT p FROM {table1} WHERE p IN ?"), [None])
with pytest.raises(InvalidRequest, match='null value'):
@@ -80,3 +81,98 @@ def test_regular_column_in_null(scylla_only, cql, table1):
cql.execute(f"INSERT INTO {table1} (p,c) VALUES ('p', 'c')")
with pytest.raises(InvalidRequest, match='null value'):
cql.execute(cql.prepare(f"SELECT v FROM {table1} WHERE v IN ? ALLOW FILTERING"), [None])
# Though nonsensical, this operation is allowed by Cassandra. Ensure we allow it, too.
def test_delete_impossible_clustering_range(cql, table1):
cql.execute(f"DELETE FROM {table1} WHERE p='p' and c<'a' and c>'a'")
@pytest.mark.xfail(reason="issue #7852")
def test_delete_null_key(cql, table1):
with pytest.raises(InvalidRequest, match='null value'):
cql.execute(f"DELETE FROM {table1} WHERE p=null")
with pytest.raises(InvalidRequest, match='null value'):
cql.execute(cql.prepare(f"DELETE FROM {table1} WHERE p=?"), [None])
with pytest.raises(InvalidRequest, match='null value'):
cql.execute(f"DELETE FROM {table1} WHERE p='p' AND c=null")
with pytest.raises(InvalidRequest, match='null value'):
cql.execute(cql.prepare(f"DELETE FROM {table1} WHERE p='p' AND c=?"), [None])
# Test what SELECT does with the restriction "WHERE v=NULL".
# In SQL, "WHERE v=NULL" doesn't match anything - because nothing is equal
# to null - not even null. SQL also provides a more useful restriction
# "WHERE v IS NULL" which matches all rows where v is unset.
# Scylla and Cassandra do *not* support the "IS NULL" syntax yet (they do
# have "IS NOT NULL" but only in a definition of a materialized view),
# so it is commonly requested that "WHERE v=NULL" should do what "IS NULL"
# is supposed to do - see issues #4776 and #8489 for Scylla and
# CASSANDRA-10715 for Cassandra, where this feature was requested.
# Nevertheless, in Scylla we decided to follow SQL: "WHERE v=NULL" should
match nothing, not even rows where v is unset. This is what the following
# test verifies.
# This test fails on Cassandra (hence cassandra_bug) because Cassandra
# refuses the "WHERE v=NULL" relation, rather than matching nothing.
# We consider this a mistake, and not something we want to emulate in Scylla.
def test_filtering_eq_null(cassandra_bug, cql, table1):
p = random_string()
cql.execute(f"INSERT INTO {table1} (p,c,v) VALUES ('{p}', '1', 'hello')")
cql.execute(f"INSERT INTO {table1} (p,c,v) VALUES ('{p}', '2', '')")
cql.execute(f"INSERT INTO {table1} (p,c) VALUES ('{p}', '3')")
# As explained above, none of the above-inserted rows should match -
# not even the one with an unset v:
assert list(cql.execute(f"SELECT c FROM {table1} WHERE p='{p}' AND v=NULL ALLOW FILTERING")) == []
# In test_insert_null_key() above we verified that a null value is not
# allowed as a key column - neither as a partition key nor clustering key.
An *empty string*, in contrast, is NOT a null. So it ideally should have been
# allowed as a key. However, for undocumented reasons (having to do with how
# partition keys are serialized in sstables), an empty string is NOT allowed
# as a partition key. It is allowed as a clustering key, though. In the
# following test we confirm those things.
# See issue #9352.
def test_insert_empty_string_key(cql, table1):
s = random_string()
# An empty-string clustering *is* allowed:
cql.execute(f"INSERT INTO {table1} (p,c,v) VALUES ('{s}', '', 'cat')")
assert list(cql.execute(f"SELECT v FROM {table1} WHERE p='{s}' AND c=''")) == [('cat',)]
# But an empty-string partition key is *not* allowed, with a specific
# error that a "Key may not be empty":
with pytest.raises(InvalidRequest, match='Key may not be empty'):
cql.execute(f"INSERT INTO {table1} (p,c,v) VALUES ('', '{s}', 'dog')")
# test_update_empty_string_key() is the same as test_insert_empty_string_key()
but uses an UPDATE instead of an INSERT. It turns out that exactly the cases
# which are allowed by INSERT are also allowed by UPDATE.
def test_update_empty_string_key(cql, table1):
s = random_string()
# An empty-string clustering *is* allowed:
cql.execute(f"UPDATE {table1} SET v = 'cat' WHERE p='{s}' AND c=''")
assert list(cql.execute(f"SELECT v FROM {table1} WHERE p='{s}' AND c=''")) == [('cat',)]
# But an empty-string partition key is *not* allowed, with a specific
# error that a "Key may not be empty":
with pytest.raises(InvalidRequest, match='Key may not be empty'):
cql.execute(f"UPDATE {table1} SET v = 'dog' WHERE p='' AND c='{s}'")
# ... and same for DELETE
def test_delete_empty_string_key(cql, table1):
s = random_string()
# An empty-string clustering *is* allowed:
cql.execute(f"DELETE FROM {table1} WHERE p='{s}' AND c=''")
# But an empty-string partition key is *not* allowed, with a specific
# error that a "Key may not be empty":
with pytest.raises(InvalidRequest, match='Key may not be empty'):
cql.execute(f"DELETE FROM {table1} WHERE p='' AND c='{s}'")
# Another test like test_insert_empty_string_key() just using an INSERT JSON
# instead of a regular INSERT. Because INSERT JSON takes a different code path
# from regular INSERT, we need the emptiness test in yet another place.
# Reproduces issue #9853 (the empty-string partition key was allowed, and
# actually inserted into the table.)
def test_insert_json_empty_string_key(cql, table1):
s = random_string()
# An empty-string clustering *is* allowed:
cql.execute("""INSERT INTO %s JSON '{"p": "%s", "c": "", "v": "cat"}'""" % (table1, s))
assert list(cql.execute(f"SELECT v FROM {table1} WHERE p='{s}' AND c=''")) == [('cat',)]
# But an empty-string partition key is *not* allowed, with a specific
# error that a "Key may not be empty":
with pytest.raises(InvalidRequest, match='Key may not be empty'):
cql.execute("""INSERT INTO %s JSON '{"p": "", "c": "%s", "v": "cat"}'""" % (table1, s))


@@ -548,7 +548,9 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
break;
case auth_state::AUTHENTICATION:
// Support both SASL auth from protocol v2 and the older style Credentials auth from v1
assert(cqlop == cql_binary_opcode::AUTH_RESPONSE || cqlop == cql_binary_opcode::CREDENTIALS);
if (cqlop != cql_binary_opcode::AUTH_RESPONSE && cqlop != cql_binary_opcode::CREDENTIALS) {
throw exceptions::protocol_exception(format("Unexpected message {:d}, expecting AUTH_RESPONSE or CREDENTIALS", int(cqlop)));
}
if (res_op == cql_binary_opcode::READY || res_op == cql_binary_opcode::AUTH_SUCCESS) {
client_state.set_auth_state(auth_state::READY);
}
@@ -793,7 +795,7 @@ future<fragmented_temporary_buffer> cql_server::connection::read_and_decompress_
if (flags & cql_frame_flags::compression) {
if (_compression == cql_compression::lz4) {
if (length < 4) {
throw std::runtime_error("Truncated frame");
throw std::runtime_error(fmt::format("CQL frame truncated: expected to have at least 4 bytes, got {}", length));
}
return _buffer_reader.read_exactly(_read_buf, length).then([this] (fragmented_temporary_buffer buf) {
auto linearization_buffer = bytes_ostream();
@@ -1294,7 +1296,7 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_read_timeout_
std::unique_ptr<cql_server::response> cql_server::connection::make_read_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, bool data_present, const tracing::trace_state_ptr& tr_state) const
{
if (_version < 4) {
return make_read_timeout_error(stream, err, std::move(msg), cl, received, blockfor, data_present, tr_state);
return make_read_timeout_error(stream, exceptions::exception_code::READ_TIMEOUT, std::move(msg), cl, received, blockfor, data_present, tr_state);
}
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
response->write_int(static_cast<int32_t>(err));
@@ -1322,7 +1324,7 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_mutation_writ
std::unique_ptr<cql_server::response> cql_server::connection::make_mutation_write_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, db::write_type type, const tracing::trace_state_ptr& tr_state) const
{
if (_version < 4) {
return make_mutation_write_timeout_error(stream, err, std::move(msg), cl, received, blockfor, type, tr_state);
return make_mutation_write_timeout_error(stream, exceptions::exception_code::WRITE_TIMEOUT, std::move(msg), cl, received, blockfor, type, tr_state);
}
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
response->write_int(static_cast<int32_t>(err));


@@ -52,10 +52,11 @@ class chunked_vector {
utils::small_vector<chunk_ptr, 1> _chunks;
size_t _size = 0;
size_t _capacity = 0;
private:
public:
static size_t max_chunk_capacity() {
return std::max(max_contiguous_allocation / sizeof(T), size_t(1));
}
private:
void reserve_for_push_back() {
if (_size == _capacity) {
do_reserve_for_push_back();
@@ -387,7 +388,9 @@ chunked_vector<T, max_contiguous_allocation>::make_room(size_t n, bool stop_afte
auto new_last_chunk_capacity = last_chunk_capacity + capacity_increase;
// FIXME: realloc? maybe not worth the complication; only works for PODs
auto new_last_chunk = new_chunk(new_last_chunk_capacity);
migrate(addr(_capacity - last_chunk_capacity), addr(_size), new_last_chunk.get());
if (_size > _capacity - last_chunk_capacity) {
migrate(addr(_capacity - last_chunk_capacity), addr(_size), new_last_chunk.get());
}
_chunks.back() = std::move(new_last_chunk);
_capacity += capacity_increase;
}
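The guard added to `make_room()` above can be reduced to one comparison: elements are copied into the regrown last chunk only when `_size` extends past that chunk's start. A minimal sketch of the condition, with descriptive stand-in names rather than the real `chunked_vector` API:

```python
MAX_CHUNK = 4  # stand-in for max_chunk_capacity(); the real value depends on sizeof(T)

def make_room_migrates(size, capacity, last_chunk_capacity):
    """Mirror of the guard above: migrate only when _size lies past the start
    of the last reserved chunk. Without it, a size below the chunk start
    produced a reversed [begin, end) range and an out-of-bounds migrate."""
    last_chunk_start = capacity - last_chunk_capacity
    return size > last_chunk_start

# The regression scenario from the test earlier in this series, scaled down to
# MAX_CHUNK == 4: fill 1.5 chunks (size 6, capacity 6), shrink_to_fit leaves a
# partial last chunk of 2, then pop a full chunk's worth so size drops to 2 --
# below the last chunk's start at 4.
assert make_room_migrates(size=6, capacity=6, last_chunk_capacity=2) is True
assert make_room_migrates(size=2, capacity=6, last_chunk_capacity=2) is False
```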


@@ -278,6 +278,13 @@ decltype(auto) with_simplified(const View& v, Function&& fn)
}
}
template<FragmentedView View>
void skip_empty_fragments(View& v) {
while (!v.empty() && v.current_fragment().empty()) {
v.remove_current();
}
}
template<FragmentedView V1, FragmentedView V2>
int compare_unsigned(V1 v1, V2 v2) {
while (!v1.empty() && !v2.empty()) {
@@ -287,6 +294,8 @@ int compare_unsigned(V1 v1, V2 v2) {
}
v1.remove_prefix(n);
v2.remove_prefix(n);
skip_empty_fragments(v1);
skip_empty_fragments(v2);
}
return v1.size_bytes() - v2.size_bytes();
}
@@ -306,6 +315,8 @@ void write_fragmented(Dest& dest, Src src) {
memcpy(dest.current_fragment().data(), src.current_fragment().data(), n);
dest.remove_prefix(n);
src.remove_prefix(n);
skip_empty_fragments(dest);
skip_empty_fragments(src);
}
}
@@ -319,6 +330,8 @@ void copy_fragmented_view(Dest dest, Src src) {
memcpy(dest.current_fragment().data(), src.current_fragment().data(), n);
dest.remove_prefix(n);
src.remove_prefix(n);
skip_empty_fragments(dest);
skip_empty_fragments(src);
}
}
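The three hunks above fix the same hazard: `remove_prefix(n)` can leave the current fragment empty, after which `n` computes as 0 and the loop stops making progress. A minimal Python sketch of the comparison loop, with fragmented views modeled as lists of `bytes` (the real types are C++ `FragmentedView`s, so these names are illustrative only):

```python
def skip_empty_fragments(frags):
    # Drop leading empty fragments so the "current fragment" is non-empty.
    while frags and not frags[0]:
        frags.pop(0)

def compare_unsigned(v1, v2):
    v1, v2 = list(v1), list(v2)
    skip_empty_fragments(v1)
    skip_empty_fragments(v2)
    while v1 and v2:
        n = min(len(v1[0]), len(v2[0]))
        if v1[0][:n] != v2[0][:n]:
            return -1 if v1[0][:n] < v2[0][:n] else 1
        v1[0] = v1[0][n:]         # remove_prefix(n) may leave an empty fragment...
        v2[0] = v2[0][n:]
        skip_empty_fragments(v1)  # ...which must be skipped, or n becomes 0
        skip_empty_fragments(v2)  # and no progress is made
    return sum(map(len, v1)) - sum(map(len, v2))

assert compare_unsigned([b"ab", b"", b"c"], [b"abc"]) == 0
assert compare_unsigned([b"ab"], [b"abc"]) < 0
```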


@@ -111,9 +111,10 @@ public:
private:
void touch() noexcept {
assert(_lru_entry_ptr);
_last_read = loading_cache_clock_type::now();
_lru_entry_ptr->touch();
if (_lru_entry_ptr) {
_lru_entry_ptr->touch();
}
}
void set_anchor_back_reference(lru_entry* lru_entry_ptr) noexcept {
@@ -142,10 +143,21 @@ private:
timestamped_val_ptr _ts_val_ptr;
public:
value_ptr(timestamped_val_ptr ts_val_ptr) : _ts_val_ptr(std::move(ts_val_ptr)) { _ts_val_ptr->touch(); }
value_ptr(timestamped_val_ptr ts_val_ptr) : _ts_val_ptr(std::move(ts_val_ptr)) {
if (_ts_val_ptr) {
_ts_val_ptr->touch();
}
}
value_ptr(std::nullptr_t) noexcept : _ts_val_ptr() {}
bool operator==(const value_ptr& x) const { return _ts_val_ptr == x._ts_val_ptr; }
bool operator!=(const value_ptr& x) const { return !operator==(x); }
explicit operator bool() const noexcept { return bool(_ts_val_ptr); }
value_type& operator*() const noexcept { return _ts_val_ptr->value(); }
value_type* operator->() const noexcept { return &_ts_val_ptr->value(); }
friend std::ostream& operator<<(std::ostream& os, const value_ptr& vp) {
return os << vp._ts_val_ptr;
}
};
/// \brief This is an LRU list entry which is also an anchor for a loading_cache value.
@@ -258,14 +270,8 @@ private:
using loading_values_type = typename ts_value_type::loading_values_type;
using timestamped_val_ptr = typename loading_values_type::entry_ptr;
using ts_value_lru_entry = typename ts_value_type::lru_entry;
using set_iterator = typename loading_values_type::iterator;
using lru_list_type = typename ts_value_lru_entry::lru_list_type;
using list_iterator = typename lru_list_type::iterator;
struct value_extractor_fn {
Tp& operator()(ts_value_lru_entry& le) const {
return le.timestamped_value().value();
}
};
public:
using value_type = Tp;
@@ -273,7 +279,6 @@ public:
using value_ptr = typename ts_value_type::value_ptr;
class entry_is_too_big : public std::exception {};
using iterator = boost::transform_iterator<value_extractor_fn, list_iterator>;
private:
loading_cache(size_t max_size, std::chrono::milliseconds expiry, std::chrono::milliseconds refresh, logging::logger& logger)
@@ -338,7 +343,7 @@ public:
});
}).then([this, k] (timestamped_val_ptr ts_val_ptr) {
// check again since it could have already been inserted and initialized
if (!ts_val_ptr->ready()) {
if (!ts_val_ptr->ready() && !ts_val_ptr.orphaned()) {
_logger.trace("{}: storing the value for the first time", k);
if (ts_val_ptr->size() > _max_size) {
@@ -383,23 +388,31 @@ public:
return _timer_reads_gate.close().finally([this] { _timer.cancel(); });
}
/// Find a value for a specific Key value and touch() it.
/// \tparam KeyType Key type
/// \tparam KeyHasher Hash functor type
/// \tparam KeyEqual Equality functor type
///
/// \param key Key value to look for
/// \param key_hasher_func Hash functor
/// \param key_equal_func Equality functor
/// \return cache_value_ptr object pointing to the found value or nullptr otherwise.
template<typename KeyType, typename KeyHasher, typename KeyEqual>
iterator find(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) noexcept {
return boost::make_transform_iterator(to_list_iterator(set_find(key, std::move(key_hasher_func), std::move(key_equal_func))), _value_extractor_fn);
value_ptr find(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) noexcept {
// cache_value_ptr constructor is going to update a "last read" timestamp of the corresponding object and move
// the object to the front of the LRU
return set_find(key, std::move(key_hasher_func), std::move(key_equal_func));
};
iterator find(const Key& k) noexcept {
return boost::make_transform_iterator(to_list_iterator(set_find(k)), _value_extractor_fn);
}
iterator end() {
return boost::make_transform_iterator(list_end(), _value_extractor_fn);
}
iterator begin() {
return boost::make_transform_iterator(list_begin(), _value_extractor_fn);
value_ptr find(const Key& k) noexcept {
return set_find(k);
}
// Removes all values matching a given predicate and values which are currently loading.
// Guarantees that no values which match the predicate and whose loading was initiated
// before this call will be present after this call (or appear at any time later).
// The predicate may be invoked multiple times on the same value.
// It must return the same result for a given value (it must be a pure function).
template <typename Pred>
void remove_if(Pred&& pred) {
static_assert(std::is_same<bool, std::result_of_t<Pred(const value_type&)>>::value, "Bad Pred signature");
@@ -409,24 +422,29 @@ public:
}, [this] (ts_value_lru_entry* p) {
loading_cache::destroy_ts_value(p);
});
_loading_values.remove_if([&pred] (const ts_value_type& v) {
return pred(v.value());
});
}
// Removes a given key from the cache.
// The key is removed immediately.
// After this, get_ptr() is guaranteed to reload the value before returning it.
// As a consequence of the above, if there is a concurrent get_ptr() in progress with this,
// its value will not populate the cache. It will still succeed.
void remove(const Key& k) {
auto it = set_find(k);
if (it == set_end()) {
return;
}
_lru_list.erase_and_dispose(_lru_list.iterator_to(*it->lru_entry_ptr()), [this] (ts_value_lru_entry* p) { loading_cache::destroy_ts_value(p); });
remove_ts_value(set_find(k));
// set_find() returns nullptr for a key which is currently loading, which we want to remove too.
_loading_values.remove(k);
}
void remove(iterator it) {
if (it == end()) {
return;
}
const ts_value_type& val = ts_value_type::container_of(*it);
_lru_list.erase_and_dispose(_lru_list.iterator_to(*val.lru_entry_ptr()), [this] (ts_value_lru_entry* p) { loading_cache::destroy_ts_value(p); });
// Removes a given key from the cache.
// Same guarantees as with remove(key).
template<typename KeyType, typename KeyHasher, typename KeyEqual>
void remove(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) noexcept {
remove_ts_value(set_find(key, key_hasher_func, key_equal_func));
// set_find() returns nullptr for a key which is currently loading, which we want to remove too.
_loading_values.remove(key, key_hasher_func, key_equal_func);
}
size_t size() const {
@@ -439,49 +457,29 @@ public:
}
private:
/// Should only be called on values for which the following holds: set_it == set_end() || set_it->ready()
/// For instance this always holds for iterators returned by set_find(...).
list_iterator to_list_iterator(set_iterator set_it) {
if (set_it != set_end()) {
return _lru_list.iterator_to(*set_it->lru_entry_ptr());
void remove_ts_value(timestamped_val_ptr ts_ptr) {
if (!ts_ptr) {
return;
}
return list_end();
_lru_list.erase_and_dispose(_lru_list.iterator_to(*ts_ptr->lru_entry_ptr()), [this] (ts_value_lru_entry* p) { loading_cache::destroy_ts_value(p); });
}
timestamped_val_ptr ready_entry_ptr(timestamped_val_ptr tv_ptr) {
if (!tv_ptr || !tv_ptr->ready()) {
return nullptr;
}
return std::move(tv_ptr);
}
template<typename KeyType, typename KeyHasher, typename KeyEqual>
timestamped_val_ptr set_find(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) noexcept {
return ready_entry_ptr(_loading_values.find(key, std::move(key_hasher_func), std::move(key_equal_func)));
}
// keep the default non-templated overloads to ease the load on the compiler for instantiations
// that do not require the templated find().
timestamped_val_ptr set_find(const Key& key) noexcept {
return ready_entry_ptr(_loading_values.find(key));
}
bool caching_enabled() const {
@@ -613,7 +611,6 @@ private:
std::function<future<Tp>(const Key&)> _load;
timer<loading_cache_clock_type> _timer;
seastar::gate _timer_reads_gate;
};
}


@@ -28,7 +28,6 @@
#include <seastar/core/future.hh>
#include <seastar/core/bitops.hh>
#include <boost/intrusive/unordered_set.hpp>
#include <boost/lambda/bind.hpp>
#include "seastarx.hh"
@@ -98,6 +97,10 @@ private:
_val.emplace(std::move(new_val));
}
bool orphaned() const {
return !is_linked();
}
shared_promise<>& loaded() {
return _loaded;
}
@@ -110,7 +113,9 @@ private:
: _parent(parent), _key(std::move(k)) {}
~entry() {
if (is_linked()) {
_parent._set.erase(_parent._set.iterator_to(*this));
}
Stats::inc_evictions();
}
@@ -137,16 +142,8 @@ private:
using set_type = bi::unordered_set<entry, bi::power_2_buckets<true>, bi::compare_hash<true>>;
using bi_set_bucket_traits = typename set_type::bucket_traits;
using set_iterator = typename set_type::iterator;
enum class shrinking_is_allowed { no, yes };
public:
// Pointer to entry value
class entry_ptr {
@@ -154,12 +151,15 @@ public:
public:
using element_type = value_type;
entry_ptr() = default;
entry_ptr(std::nullptr_t) noexcept : _e() {};
explicit entry_ptr(lw_shared_ptr<entry> e) : _e(std::move(e)) {}
entry_ptr& operator=(std::nullptr_t) noexcept {
_e = nullptr;
return *this;
}
explicit operator bool() const noexcept { return bool(_e); }
bool operator==(const entry_ptr& x) const { return _e == x._e; }
bool operator!=(const entry_ptr& x) const { return !operator==(x); }
element_type& operator*() const noexcept { return _e->value(); }
element_type* operator->() const noexcept { return &_e->value(); }
@@ -173,13 +173,27 @@ public:
return res;
}
// Returns the key this entry is associated with.
// Valid if bool(*this).
const key_type& key() const {
return _e->key();
}
// Returns true iff the entry is not linked in the set.
// Call only when bool(*this).
bool orphaned() const {
return _e->orphaned();
}
friend class loading_shared_values;
friend std::ostream& operator<<(std::ostream& os, const entry_ptr& ep) {
return os << ep._e.get();
}
};
private:
std::vector<typename set_type::bucket_type> _buckets;
set_type _set;
public:
static const key_type& to_key(const entry_ptr& e_ptr) noexcept {
@@ -274,26 +288,62 @@ public:
return _set.size();
}
template<typename KeyType, typename KeyHasher, typename KeyEqual>
entry_ptr find(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) noexcept {
set_iterator it = _set.find(key, std::move(key_hasher_func), key_eq<KeyType, KeyEqual>());
if (it == _set.end() || !it->ready()) {
return entry_ptr();
}
return entry_ptr(it->shared_from_this());
};
// Removes a given key from this container.
// If the key is currently loading, the load will still succeed and return an entry_ptr
// to the caller, but the value will not be present in the container. It will be removed
// when the last entry_ptr dies, as usual.
//
// Post-condition: !find(key)
template<typename KeyType, typename KeyHasher, typename KeyEqual>
void remove(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) {
set_iterator it = _set.find(key, std::move(key_hasher_func), key_eq<KeyType, KeyEqual>());
if (it != _set.end()) {
_set.erase(it);
}
}
// Removes a given key from this container.
// If the key is currently loading, the load will still succeed and return an entry_ptr
// to the caller, but the value will not be present in the container. It will be removed
// when the last entry_ptr dies, as usual.
//
// Post-condition: !find(key)
template<typename KeyType>
void remove(const KeyType& key) {
remove(key, Hash(), EqualPred());
}
// Removes all values which match a given predicate or are currently loading.
// Guarantees that no values which match the predicate and whose loading was initiated
// before this call will be present after this call (or appear at any time later).
// Same effects as if remove(e.key()) was called on each matching entry.
template<typename Pred>
requires std::is_invocable_r_v<bool, Pred, const Tp&>
void remove_if(const Pred& pred) {
auto it = _set.begin();
while (it != _set.end()) {
if (!it->ready() || pred(it->value())) {
auto next = std::next(it);
_set.erase(it);
it = next;
} else {
++it;
}
}
}
// keep the default non-templated overloads to ease the load on the compiler for instantiations
// that do not require the templated find().
entry_ptr find(const key_type& key) noexcept {
return find(key, Hash(), EqualPred());
}