Compare commits

...

38 Commits

Author SHA1 Message Date
Konstantin Osipov
fd293768e7 storage_proxy: do not touch all_replicas.front() if it's empty.
The list of all endpoints for a query can be empty if we have
replication_factor 0 or there are no live endpoints for this token.
Do not access all_replicas.front() in this case.

Fixes #5935.
Message-Id: <20200306192521.73486-2-kostja@scylladb.com>

(cherry picked from commit 9827efe554)
2020-06-22 18:29:15 +03:00
Gleb Natapov
22dfa48585 cql transport: do not log broken pipe error when a client closes its side of a connection abruptly
Fixes #5661

Message-Id: <20200615075958.GL335449@scylladb.com>
(cherry picked from commit 7ca937778d)
2020-06-21 13:09:22 +03:00
Benny Halevy
2f3d7f1408 cql3::util::maybe_quote: avoid stack overflow and fix quote doubling
The function was reimplemented to solve the following issues.
The cutom implementation also improved its performance in
close to 19%

Using regex_match("[a-z][a-z0-9_]*") may cause stack overflow on long input strings
as found with the limits_test.py:TestLimits.max_key_length_test dtest.

std::regex_replace does not replace in-place so no doubling of
quotes was actually done.

Add unit test that reproduces the crash without this fix
and tests various string patterns for correctness.

Note that defining the regex with std::regex::optimize
still ended up with stack overflow.

Fixes #5671

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
(cherry picked from commit 0329fe1fd1)
2020-06-21 13:07:21 +03:00
Gleb Natapov
76a08df939 commitlog: fix size of a write used to zero a segment
Due to a bug the entire segment is written in one huge write of 32Mb.
The idea was to split it to writes of 128K, so fix it.

Fixes #5857

Message-Id: <20200220102939.30769-1-gleb@scylladb.com>
(cherry picked from commit df2f67626b)
2020-06-21 13:03:05 +03:00
Amnon Heiman
6aa129d3b0 api/storage_service.cc: stream result of token_range
The get token range API can become big which can cause large allocation
and stalls.

This patch replace the implementation so it would stream the results
using the http stream capabilities instead of serialization and sending
one big buffer.

Fixes #6297

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
(cherry picked from commit 7c4562d532)
2020-06-21 12:57:48 +03:00
Takuya ASADA
b4f781e4eb scylla_post_install.sh: fix operator precedence issue with multiple statements
In bash, 'A || B && C' will be problem because when A is true, then it will be
evaluates C, since && and || have the same precedence.
To avoid the issue we need make B && C in one statement.

Fixes #5764

(cherry picked from commit b6988112b4)
2020-06-21 12:47:05 +03:00
Takuya ASADA
27594ca50e scylla_raid_setup: create missing directories
We need to create hints, view_hints, saved_caches directories
on RAID volume.

Fixes #5811

(cherry picked from commit 086f0ffd5a)
2020-06-21 12:45:27 +03:00
Rafael Ávila de Espíndola
0f2f0d65d7 configure: Reduce the dynamic linker path size
gdb has a SO_NAME_MAX_PATH_SIZE of 512, so we use that as the path
size.

Fixes: #6494

Signed-off-by: Rafael Ávila de Espíndola <espindola@scylladb.com>
Message-Id: <20200528202741.398695-2-espindola@scylladb.com>
(cherry picked from commit aa778ec152)
2020-06-21 12:29:16 +03:00
Tomasz Grabiec
31c2f8a3ae row_cache: Fix undefined behavior on key linearization
This is relevant only when using partition or clustering keys which
have a representation in memory which is larger than 12.8 KB (10% of
LSA segment size).

There are several places in code (cache, background garbage
collection) which may need to linearize keys because of performing key
comparison, but it's not done safely:

 1) the code does not run with the LSA region locked, so pointers may
get invalidated on linearization if it needs to reclaim memory. This
is fixed by running the code inside an allocating section.

 2) LSA region is locked, but the scope of
with_linearized_managed_bytes() encloses the allocating section. If
allocating section needs to reclaim, linearization context will
contain invalidated pointers. The fix is to reorder the scopes so
that linearization context lives within an allocating section.

Example of 1 can be found in
range_populating_reader::handle_end_of_stream() where it performs a
lookup:

  auto prev = std::prev(it);
  if (prev->key().equal(*_cache._schema, *_last_key->_key)) {
     it->set_continuous(true);

but handle_end_of_stream() is not invoked under allocating section.

Example of 2 can be found in mutation_cleaner_impl::merge_some() where
it does:

  return with_linearized_managed_bytes([&] {
  ...
    return _worker_state->alloc_section(region, [&] {

Fixes #6637.
Refs #6108.

Tests:

  - unit (all)

Message-Id: <1592218544-9435-1-git-send-email-tgrabiec@scylladb.com>
(cherry picked from commit e81fc1f095)
2020-06-21 11:58:59 +03:00
Yaron Kaikov
ec12331f11 release: prepare for 3.3.4 2020-06-15 21:19:02 +03:00
Avi Kivity
ccc463b5e5 tools: toolchain: regenerate for gnutls 3.6.14
CVE-2020-13777.

Fixes #6627.

Toolchain source image registry disambiguated due to tighter podman defaults.
2020-06-15 08:05:58 +03:00
Calle Wilund
4a9676f6b7 gms::inet_address: Fix sign extension error in custom address formatting
Fixes #5808

Seems some gcc:s will generate the code as sign extending. Mine does not,
but this should be more correct anyhow.

Added small stringify test to serialization_test for inet_address

(cherry picked from commit a14a28cdf4)
2020-06-09 20:16:50 +03:00
Takuya ASADA
aaf4989c31 aws: update enhanced networking supported instance list
Sync enhanced networking supported instance list to latest one.

Reference: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/enhanced-networking.html

Fixes #6540

(cherry picked from commit 969c4258cf)
2020-06-09 16:03:00 +03:00
Asias He
b29f954f20 gossip: Make is_safe_for_bootstrap more strict
Consider

1. Start n1, n2 in the cluster
2. Stop n2 and delete all data for n2
3. Start n2 to replace itself with replace_address_first_boot: n2
4. Kill n2 before n2 finishes the replace operation
5. Remove replace_address_first_boot: n2 from scylla.yaml of n2
6. Delete all data for n2
7. Start n2

At step 7, n2 will be allowed to bootstrap as a new node, because the
application state of n2 in the cluster is HIBERNATE which is not
rejected in the check of is_safe_for_bootstrap. As a result, n2 will
replace n2 with a different tokens and a different host_id, as if the
old n2 node was removed from the cluster silently.

Fixes #5172

(cherry picked from commit cdcedf5eb9)
2020-05-25 14:30:53 +03:00
Eliran Sinvani
5546d5df7b Auth: return correct error code when role is not found
Scylla returns the wrong error code (0000 - server internal error)
in response to trying to do authentication/authorization operations
that involves a non-existing role.
This commit changes those cases to return error code 2200 (invalid
query) which is the correct one and also the one that Cassandra
returns.
Tests:
    Unit tests (Dev)
    All auth and auth_role dtests

(cherry picked from commit ce8cebe34801f0ef0e327a32f37442b513ffc214)

Fixes #6363.
2020-05-25 12:58:38 +03:00
Amnon Heiman
541c29677f storage_service: get_range_to_address_map prevent use after free
The implementation of get_range_to_address_map has a default behaviour,
when getting an empty keypsace, it uses the first non-system keyspace
(first here is basically, just a keyspace).

The current implementation has two issues, first, it uses a reference to
a string that is held on a stack of another function. In other word,
there's a use after free that is not clear why we never hit.

The second, it calls get_non_system_keyspaces twice. Though this is not
a bug, it's redundant (get_non_system_keyspaces uses a loop, so calling
that function does have a cost).

This patch solves both issues, by chaning the implementation to hold a
string instead of a reference to a string.

Second, it stores the results from get_non_system_keyspaces and reuse
them it's more efficient and holds the returned values on the local
stack.

Fixes #6465

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
(cherry picked from commit 69a46d4179)
2020-05-25 12:48:48 +03:00
Hagit Segev
06f18108c0 release: prepare for 3.3.3 2020-05-24 23:28:07 +03:00
Tomasz Grabiec
90002ca3d2 sstables: index_reader: Fix overflow when calculating promoted index end
When index file is larger than 4GB, offset calculation will overflow
uint32_t and _promoted_index_end will be too small.

As a result, promoted_index_size calculation will underflow and the
rest of the page will be interpretd as a promoted index.

The partitions which are in the remainder of the index page will not
be found by single-partition queries.

Data is not lost.

Introduced in 6c5f8e0eda.

Fixes #6040
Message-Id: <20200521174822.8350-1-tgrabiec@scylladb.com>

(cherry picked from commit a6c87a7b9e)
2020-05-24 09:46:11 +03:00
Rafael Ávila de Espíndola
da23902311 repair: Make sure sinks are always closed
In a recent next failure I got the following backtrace

    function=function@entry=0x270360 "seastar::rpc::sink_impl<Serializer, Out>::~sink_impl() [with Serializer = netw::serializer; Out = {repair_row_on_wire_with_cmd}]") at assert.c:101
    at ./seastar/include/seastar/core/shared_ptr.hh:463
    at repair/row_level.cc:2059

This patch changes a few functions to use finally to make sure the sink
is always closed.

Signed-off-by: Rafael Ávila de Espíndola <espindola@scylladb.com>
Message-Id: <20200515202803.60020-1-espindola@scylladb.com>
(cherry picked from commit 311fbe2f0a)

Ref #6414
2020-05-20 09:00:57 +03:00
Asias He
2b0dc21f97 repair: Fix race between write_end_of_stream and apply_rows
Consider: n1, n2, n1 is the repair master, n2 is the repair follower.

=== Case 1 ===
1) n1 sends missing rows {r1, r2} to n2
2) n2 runs apply_rows_on_follower to apply rows, e.g., {r1, r2}, r1
   is written to sstable, r2 is not written yet, r1 belongs to
   partition 1, r2 belongs to partition 2. It yields after row r1 is
   written.
   data: partition_start, r1
3) n1 sends repair_row_level_stop to n2 because error has happened on n1
4) n2 calls wait_for_writer_done() which in turn calls write_end_of_stream()
   data: partition_start, r1, partition_end
5) Step 2 resumes to apply the rows.
   data: partition_start, r1, partition_end, partition_end, partition_start, r2

=== Case 2 ===
1) n1 sends missing rows {r1, r2} to n2
2) n2 runs apply_rows_on_follower to apply rows, e.g., {r1, r2}, r1
   is written to sstable, r2 is not written yet, r1 belongs to partition
   1, r2 belongs to partition 2. It yields after partition_start for r2
   is written but before _partition_opened is set to true.
   data: partition_start, r1, partition_end, partition_start
3) n1 sends repair_row_level_stop to n2 because error has happened on n1
4) n2 calls wait_for_writer_done() which in turn calls write_end_of_stream().
   Since _partition_opened[node_idx] is false, partition_end is skipped,
   end_of_stream is written.
   data: partition_start, r1, partition_end, partition_start, end_of_stream

This causes unbalanced partition_start and partition_end in the stream
written to sstables.

To fix, serialize the write_end_of_stream and apply_rows with a semaphore.

Fixes: #6394
Fixes: #6296
Fixes: #6414
(cherry picked from commit b2c4d9fdbc)
2020-05-20 08:22:05 +03:00
Piotr Dulikowski
b544691493 hinted handoff: don't keep positions of old hints in rps_set
When sending hints from one file, rps_set field in send_one_file_ctx
keeps track of commitlog positions of hints that are being currently
sent, or have failed to be sent. At the end of the operation, if sending
of some hints failed, we will choose position of the earliest hint that
failed to be sent, and will retry sending that file later, starting from
that position. This position is stored in _last_not_complete_rp.

Usually, this set has a bounded size, because we impose a limit of at
most 128 hints being sent concurrently. Because we do not attempt to
send any more hints after a failure is detected, rps_set should not have
more than 128 elements at a time.

Due to a bug, commitlog positions of old hints (older than
gc_grace_seconds of the destination table) were inserted into rps_set
but not removed after checking their age. This could cause rps_set to
grow very large when replaying a file with old hints.

Moreover, if the file mixed expired and non-expired hints (which could
happen if it had hints to two tables with different gc_grace_seconds),
and sending of some non-expired hints failed, then positions of expired
hints could influence calculation _last_not_complete_rp, and more hints
than necessary would be resent on the next retry.

This simple patch removes commitlog position of a hint from rps_set when
it is detected to be too old.

Fixes #6422

(cherry picked from commit 85d5c3d5ee)
2020-05-20 08:06:17 +03:00
Piotr Dulikowski
d420b06844 hinted handoff: remove discarded hint positions from rps_set
Related commit: 85d5c3d

When attempting to send a hint, an exception might occur that results in
that hint being discarded (e.g. keyspace or table of the hint was
removed).

When such an exception is thrown, position of the hint will already be
stored in rps_set. We are only allowed to retain positions of hints that
failed to be sent and needed to be retried later. Dropping a hint is not
an error, therefore its position should be removed from rps_set - but
current logic does not do that.

Because of that bug, hint files with many discardable hints might cause
rps_set to grow large when the file is replayed. Furthermore, leaving
positions of such hints in rps_set might cause more hints than necessary
to be re-sent if some non-discarded hints fail to be sent.

This commit fixes the problem by removing positions of discarded hints
from rps_set.

Fixes #6433

(cherry picked from commit 0c5ac0da98)
2020-05-20 08:04:10 +03:00
Avi Kivity
b3a2cb2f68 Update seastar submodule
* seastar 0ebd89a858...30f03aeba9 (1):
  > timer: add scheduling_group awareness

Fixes #6170.
2020-05-10 18:39:20 +03:00
Hagit Segev
c8c057f5f8 release: prepare for 3.3.2 2020-05-10 18:16:28 +03:00
Gleb Natapov
038bfc925c storage_proxy: limit read repair only to replicas that answered during speculative reads
Speculative reader has more targets that needed for CL. In case there is
a digest mismatch the repair runs between all of them, but that violates
provided CL. The patch makes it so that repair runs only between
replicas that answered (there will be CL of them).

Fixes #6123

Reviewed-by: Glauber Costa <glauber@scylladb.com>
Message-Id: <20200402132245.GA21956@scylladb.com>
(cherry picked from commit 36a24bbb70)
2020-05-07 19:48:37 +03:00
Mike Goltsov
13a4e7db83 fix error in fstrim service (scylla_util.py)
On Centos 7 machine:

fstrim.timer not enabled, only unmasked due scylla_fstrim_setup on installation
When trying run scylla-fstrim service manually you get error:

Traceback (most recent call last):
File "/opt/scylladb/scripts/libexec/scylla_fstrim", line 60, in <module>
main()
File "/opt/scylladb/scripts/libexec/scylla_fstrim", line 44, in main
cfg = parse_scylla_dirs_with_default(conf=args.config)
File "/opt/scylladb/scripts/scylla_util.py", line 484, in parse_scylla_dirs_with_default
if key not in y or not y[k]:
NameError: name 'k' is not defined

It caused by error in scylla_util.py

Fixes #6294.

(cherry picked from commit 068bb3a5bf)
2020-05-07 19:45:50 +03:00
Juliusz Stasiewicz
727d6cf8f3 atomic_cell: special rule for printing counter cells
Until now, attempts to print counter update cell would end up
calling abort() because `atomic_cell_view::value()` has no
specialized visitor for `imr::pod<int64_t>::basic_view<is_mutable>`,
i.e. counter update IMR type. Such visitor is not easy to write
if we want to intercept counters only (and not all int64_t values).

Anyway, linearized byte representation of counter cell would not
be helpful without knowing if it consists of counter shards or
counter update (delta) - and this must be known upon `deserialize`.

This commit introduces simple approach: it determines cell type on
high level (from `atomic_cell_view`) and prints counter contents by
`counter_cell_view` or `atomic_cell_view::counter_update_value()`.

Fixes #5616

(cherry picked from commit 0ea17216fe)
2020-05-07 19:40:47 +03:00
Tomasz Grabiec
6d6d7b4abe sstables: Release reserved space for sharding metadata
The intention of the code was to clear sharding metadata
chunked_vector so that it doesn't bloat memory.

The type of c is `chunked_vector*`. Assigning `{}`
clears the pointer while the intended behavior was to reset the
`chunked_vector` instance. The original instance is left unmodified
with all its reserved space.

Because of this, the previous fix had no effect because token ranges
are stored entirely inline and popping them doesn't realease memory.

Fixes #4951

Tests:
  - sstable_mutation_test (dev)
  - manual using scylla binary on customer data on top of 2019.1.5

Reviewed-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <1584559892-27653-1-git-send-email-tgrabiec@scylladb.com>
(cherry picked from commit 5fe626a887)
2020-05-07 19:06:22 +03:00
Tomasz Grabiec
28f974b810 Merge "Don't return stale data by properly invalidating row cache after cleanup" from Raphael
Row cache needs to be invalidated whenever data in sstables
changes. Cleanup removes data from sstables which doesn't belong to
the node anymore, which means cache must be invalidated on cleanup.
Currently, stale data can be returned when a node re-owns ranges which
data are still stored in the node's row cache, because cleanup didn't
invalidate the cache."

Fixes #4446.

tests:
- unit tests (dev mode)
- dtests:
    update_cluster_layout_tests.py:TestUpdateClusterLayout.simple_decommission_node_2_test
    cleanup_test.py

(cherry picked from commit d0b6be0820)
2020-05-07 16:24:51 +03:00
Piotr Sarna
5fdadcaf3b network_topology_strategy: validate integers
In order to prevent users from creating a network topology
strategy instance with invalid inputs, it's not enough to use
std::stol() on the input: a string "3abc" still returns the number '3',
but will later confuse cqlsh and other drivers, when they ask for
topology strategy details.
The error message is now more human readable, since for incorrect
numeric inputs it used to return a rather cryptic message:
    ServerError: stol()
This commit fixes the issue and comes with a simple test.

Fixes #3801
Tests: unit(dev)
Message-Id: <7aaae83d003738f047d28727430ca0a5cec6b9c6.1583478000.git.sarna@scylladb.com>

(cherry picked from commit 5b7a35e02b)
2020-05-07 16:24:49 +03:00
Pekka Enberg
a960394f27 scripts/jobs: Keep memory reserve when calculating parallelism
The "jobs" script is used to determine the amount of compilation
parallelism on a machine. It attempts to ensure each GCC process has at
least 4 GB of memory per core. However, in the worst case scenario, we
could end up having the GCC processes take up all the system memory,
forcin swapping or OOM killer to kick in. For example, on a 4 core
machine with 16 GB of memory, this worst case scenario seems easy to
trigger in practice.

Fix up the problem by keeping a 1 GB of memory reserve for other
processes and calculating parallelism based on that.

Message-Id: <20200423082753.31162-1-penberg@scylladb.com>
(cherry picked from commit 7304a795e5)
2020-05-04 19:01:54 +03:00
Piotr Sarna
3216a1a70a alternator: fix signature timestamps
Generating timestamps for auth signatures used a non-thread-safe
::gmtime function instead of thread-safe ::gmtime_r.

Tests: unit(dev)
Fixes #6345

(cherry picked from commit fb7fa7f442)
2020-05-04 17:08:13 +03:00
Avi Kivity
5a7fd41618 Merge 'Fix hang in multishard_writer' from Asias
"
This series fix hang in multishard_writer when error happens. It contains
- multishard_writer: Abort the queue attached to consumers when producer fails
- repair: Fix hang when the writer is dead

Fixes #6241
Refs: #6248
"

* asias-stream_fix_multishard_writer_hang:
  repair: Fix hang when the writer is dead
  mutation_writer_test: Add test_multishard_writer_producer_aborts
  multishard_writer: Abort the queue attached to consumers when producer fails

(cherry picked from commit 8925e00e96)
2020-05-01 20:13:00 +03:00
Raphael S. Carvalho
dd24ba7a62 api/service: fix segfault when taking a snapshot without keyspace specified
If no keyspace is specified when taking snapshot, there will be a segfault
because keynames is unconditionally dereferenced. Let's return an error
because a keyspace must be specified when column families are specified.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20200427195634.99940-1-raphaelsc@scylladb.com>
(cherry picked from commit 02e046608f)

Fixes #6336.
2020-04-30 12:57:14 +03:00
Avi Kivity
204f6dd393 Update seastar submodule
* seastar a0bdc6cd85...0ebd89a858 (1):
  > http server: fix "Date" header format

Fixes #6253.
2020-04-26 19:31:44 +03:00
Nadav Har'El
b1278adc15 alternator: unzero "scylla_alternator_total_operations" metric
In commit 388b492040, which was only supposed
to move around code, we accidentally lost the line which does

    _executor.local()._stats.total_operations++;

So after this commit this counter was always zero...
This patch returns the line incrementing this counter.

Arguably, this counter is not very important - a user can also calculate
this number by summing up all the counters in the scylla_alternator_operation
array (these are counters for individual types of operations). Nevertheless,
as long as we do export a "scylla_alternator_total_operations" metric,
we need to correctly calculate it and can't leave it zero :-)

Fixes #5836

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20200219162820.14205-1-nyh@scylladb.com>
(cherry picked from commit b8aed18a24)
2020-04-19 19:07:31 +03:00
Botond Dénes
ee9677ef71 schema: schema(): use std::stable_sort() to sort key columns
When multiple key columns (clustering or partition) are passed to
the schema constructor, all having the same column id, the expectation
is that these columns will retain the order in which they were passed to
`schema_builder::with_column()`. Currently however this is not
guaranteed as the schema constructor sort key columns by column id with
`std::sort()`, which doesn't guarantee that equally comparing elements
retain their order. This can be an issue for indexes, the schemas of
which are built independently on each node. If there is any room for
variance between for the key column order, this can result in different
nodes having incompatible schemas for the same index.
The fix is to use `std::stable_sort()` which guarantees that the order
of equally comparing elements won't change.

This is a suspected cause of #5856, although we don't have hard proof.

Fixes: #5856
Signed-off-by: Botond Dénes <bdenes@scylladb.com>
[avi: upgraded "Refs" to "Fixes", since we saw that std::sort() becomes
      unstable at 17 elements, and the failing schema had a
      clustering key with 23 elements]
Message-Id: <20200417121848.1456817-1-bdenes@scylladb.com>
(cherry picked from commit a4aa753f0f)
2020-04-19 18:19:05 +03:00
Nadav Har'El
2060e361cf materialized views: fix corner case of view updates used by Alternator
While CQL does not allow creation of a materialized view with more than one
base regular column in the view's key, in Alternator we do allow this - both
partition and clustering key may be a base regular column. We had a bug in
the logic handling this case:

If the new base row is missing a value for *one* of the view key columns,
we shouldn't create a view row. Similarly, if the existing base row was
missing a value for *one* of the view key columns, a view row does not
exist and doesn't need to be deleted.  This was done incorrectly, and made
decisions based on just one of the key columns, and the logic is now
fixed (and I think, simplified) in this patch.

With this patch, the Alternator test which previously failed because of
this problem now passes. The patch also includes new tests in the existing
C++ unit test test_view_with_two_regular_base_columns_in_key. This tests
was already supposed to be testing various cases of two-new-key-columns
updates, but missed the cases explained above. These new tests failed
badly before this patch - some of them had clean write errors, others
caused crashes. With this patch, they pass.

Fixes #6008.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20200312162503.8944-1-nyh@scylladb.com>
(cherry picked from commit 635e6d887c)
2020-04-19 15:24:19 +03:00
47 changed files with 534 additions and 202 deletions

View File

@@ -1,7 +1,7 @@
#!/bin/sh
PRODUCT=scylla
VERSION=3.3.1
VERSION=3.3.4
if test -f version
then

View File

@@ -388,6 +388,34 @@ def test_gsi_update_second_regular_base_column(test_table_gsi_3):
KeyConditions={'a': {'AttributeValueList': [items[3]['a']], 'ComparisonOperator': 'EQ'},
'b': {'AttributeValueList': [items[3]['b']], 'ComparisonOperator': 'EQ'}})
# Test that when a table has a GSI, if the indexed attribute is missing, the
# item is added to the base table but not the index.
# This is the same feature we already tested in test_gsi_missing_attribute()
# above, but on a different table: In that test we used test_table_gsi_2,
# with one indexed attribute, and in this test we use test_table_gsi_3 which
# has two base regular attributes in the view key, and more possibilities
# of which value might be missing. Reproduces issue #6008.
def test_gsi_missing_attribute_3(test_table_gsi_3):
p = random_string()
a = random_string()
b = random_string()
# First, add an item with a missing "a" value. It should appear in the
# base table, but not in the index:
test_table_gsi_3.put_item(Item={'p': p, 'b': b})
assert test_table_gsi_3.get_item(Key={'p': p})['Item'] == {'p': p, 'b': b}
# Note: with eventually consistent read, we can't really be sure that
# an item will "never" appear in the index. We hope that if a bug exists
# and such an item did appear, sometimes the delay here will be enough
# for the unexpected item to become visible.
assert not any([i['p'] == p for i in full_scan(test_table_gsi_3, IndexName='hello')])
# Same thing for an item with a missing "b" value:
test_table_gsi_3.put_item(Item={'p': p, 'a': a})
assert test_table_gsi_3.get_item(Key={'p': p})['Item'] == {'p': p, 'a': a}
assert not any([i['p'] == p for i in full_scan(test_table_gsi_3, IndexName='hello')])
# And for an item missing both:
test_table_gsi_3.put_item(Item={'p': p})
assert test_table_gsi_3.get_item(Key={'p': p})['Item'] == {'p': p}
assert not any([i['p'] == p for i in full_scan(test_table_gsi_3, IndexName='hello')])
# A fourth scenario of GSI. Two GSIs on a single base table.
@pytest.fixture(scope="session")

View File

@@ -66,8 +66,9 @@ static std::string format_time_point(db_clock::time_point tp) {
time_t time_point_repr = db_clock::to_time_t(tp);
std::string time_point_str;
time_point_str.resize(17);
::tm time_buf;
// strftime prints the terminating null character as well
std::strftime(time_point_str.data(), time_point_str.size(), "%Y%m%dT%H%M%SZ", std::gmtime(&time_point_repr));
std::strftime(time_point_str.data(), time_point_str.size(), "%Y%m%dT%H%M%SZ", ::gmtime_r(&time_point_repr, &time_buf));
time_point_str.resize(16);
return time_point_str;
}

View File

@@ -215,6 +215,7 @@ future<> server::verify_signature(const request& req) {
}
future<json::json_return_type> server::handle_api_request(std::unique_ptr<request>&& req) {
_executor.local()._stats.total_operations++;
sstring target = req->get_header(TARGET);
std::vector<std::string_view> split_target = split(target, '.');
//NOTICE(sarna): Target consists of Dynamo API version followed by a dot '.' and operation type (e.g. CreateTable)

View File

@@ -56,26 +56,22 @@ static sstring validate_keyspace(http_context& ctx, const parameters& param) {
throw bad_param_exception("Keyspace " + param["keyspace"] + " Does not exist");
}
static std::vector<ss::token_range> describe_ring(const sstring& keyspace) {
std::vector<ss::token_range> res;
for (auto d : service::get_local_storage_service().describe_ring(keyspace)) {
ss::token_range r;
r.start_token = d._start_token;
r.end_token = d._end_token;
r.endpoints = d._endpoints;
r.rpc_endpoints = d._rpc_endpoints;
for (auto det : d._endpoint_details) {
ss::endpoint_detail ed;
ed.host = det._host;
ed.datacenter = det._datacenter;
if (det._rack != "") {
ed.rack = det._rack;
}
r.endpoint_details.push(ed);
static ss::token_range token_range_endpoints_to_json(const dht::token_range_endpoints& d) {
ss::token_range r;
r.start_token = d._start_token;
r.end_token = d._end_token;
r.endpoints = d._endpoints;
r.rpc_endpoints = d._rpc_endpoints;
for (auto det : d._endpoint_details) {
ss::endpoint_detail ed;
ed.host = det._host;
ed.datacenter = det._datacenter;
if (det._rack != "") {
ed.rack = det._rack;
}
res.push_back(r);
r.endpoint_details.push(ed);
}
return res;
return r;
}
void set_storage_service(http_context& ctx, routes& r) {
@@ -177,13 +173,13 @@ void set_storage_service(http_context& ctx, routes& r) {
return make_ready_future<json::json_return_type>(res);
});
ss::describe_any_ring.set(r, [&ctx](const_req req) {
return describe_ring("");
ss::describe_any_ring.set(r, [&ctx](std::unique_ptr<request> req) {
return make_ready_future<json::json_return_type>(stream_range_as_array(service::get_local_storage_service().describe_ring(""), token_range_endpoints_to_json));
});
ss::describe_ring.set(r, [&ctx](const_req req) {
auto keyspace = validate_keyspace(ctx, req.param);
return describe_ring(keyspace);
ss::describe_ring.set(r, [&ctx](std::unique_ptr<request> req) {
auto keyspace = validate_keyspace(ctx, req->param);
return make_ready_future<json::json_return_type>(stream_range_as_array(service::get_local_storage_service().describe_ring(keyspace), token_range_endpoints_to_json));
});
ss::get_host_id_map.set(r, [](const_req req) {
@@ -255,6 +251,9 @@ void set_storage_service(http_context& ctx, routes& r) {
if (column_family.empty()) {
resp = service::get_local_storage_service().take_snapshot(tag, keynames);
} else {
if (keynames.empty()) {
throw httpd::bad_param_exception("The keyspace of column families must be specified");
}
if (keynames.size() > 1) {
throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family");
}

View File

@@ -21,6 +21,7 @@
#include "atomic_cell.hh"
#include "atomic_cell_or_collection.hh"
#include "counters.hh"
#include "types.hh"
/// LSA mirator for cells with irrelevant type
@@ -218,7 +219,9 @@ std::ostream&
operator<<(std::ostream& os, const atomic_cell_view& acv) {
if (acv.is_live()) {
return fmt_print(os, "atomic_cell{{{},ts={:d},expiry={:d},ttl={:d}}}",
to_hex(acv.value().linearize()),
acv.is_counter_update()
? "counter_update_value=" + to_sstring(acv.counter_update_value())
: to_hex(acv.value().linearize()),
acv.timestamp(),
acv.is_live_and_has_ttl() ? acv.expiry().time_since_epoch().count() : -1,
acv.is_live_and_has_ttl() ? acv.ttl().count() : 0);
@@ -238,8 +241,21 @@ operator<<(std::ostream& os, const atomic_cell_view::printer& acvp) {
auto& type = acvp._type;
auto& acv = acvp._cell;
if (acv.is_live()) {
std::ostringstream cell_value_string_builder;
if (type.is_counter()) {
if (acv.is_counter_update()) {
cell_value_string_builder << "counter_update_value=" << acv.counter_update_value();
} else {
cell_value_string_builder << "shards: ";
counter_cell_view::with_linearized(acv, [&cell_value_string_builder] (counter_cell_view& ccv) {
cell_value_string_builder << ::join(", ", ccv.shards());
});
}
} else {
cell_value_string_builder << type.to_string(acv.value().linearize());
}
return fmt_print(os, "atomic_cell{{{},ts={:d},expiry={:d},ttl={:d}}}",
type.to_string(acv.value().linearize()),
cell_value_string_builder.str(),
acv.timestamp(),
acv.is_live_and_has_ttl() ? acv.expiry().time_since_epoch().count() : -1,
acv.is_live_and_has_ttl() ? acv.ttl().count() : 0);

View File

@@ -33,6 +33,7 @@
#include "auth/resource.hh"
#include "seastarx.hh"
#include "exceptions/exceptions.hh"
namespace auth {
@@ -52,9 +53,9 @@ struct role_config_update final {
///
/// A logical argument error for a role-management operation.
///
class roles_argument_exception : public std::invalid_argument {
class roles_argument_exception : public exceptions::invalid_request_exception {
public:
using std::invalid_argument::invalid_argument;
using exceptions::invalid_request_exception::invalid_request_exception;
};
class role_already_exists : public roles_argument_exception {

View File

@@ -430,14 +430,42 @@ operator<<(std::ostream& os, const cql3_type::raw& r) {
namespace util {
sstring maybe_quote(const sstring& identifier) {
static const std::regex unquoted_identifier_re("[a-z][a-z0-9_]*");
if (std::regex_match(identifier.begin(), identifier.end(), unquoted_identifier_re)) {
const auto* p = identifier.begin();
const auto* ep = identifier.end();
// quote empty string
if (__builtin_expect(p == ep, false)) {
return "\"\"";
}
// string needs no quoting if it matches [a-z][a-z0-9_]*
// quotes ('"') in the string are doubled
bool need_quotes;
bool has_quotes;
auto c = *p;
if ('a' <= c && c <= 'z') {
need_quotes = false;
has_quotes = false;
} else {
need_quotes = true;
has_quotes = (c == '"');
}
while ((++p != ep) && !has_quotes) {
c = *p;
if (!(('a' <= c && c <= 'z') || ('0' <= c && c <= '9') || (c == '_'))) {
need_quotes = true;
has_quotes = (c == '"');
}
}
if (!need_quotes) {
return identifier;
}
if (!has_quotes) {
return make_sstring("\"", identifier, "\"");
}
static const std::regex double_quote_re("\"");
std::string result = identifier;
std::regex_replace(result, double_quote_re, "\"\"");
return '"' + result + '"';
return '"' + std::regex_replace(identifier.c_str(), double_quote_re, "\"\"") + '"';
}
}

View File

@@ -116,6 +116,7 @@ namespace sstables {
class sstable;
class entry_descriptor;
class compaction_descriptor;
class compaction_completion_desc;
class foreign_sstable_open_info;
class sstables_manager;
@@ -592,9 +593,8 @@ private:
void rebuild_sstable_list(const std::vector<sstables::shared_sstable>& new_sstables,
const std::vector<sstables::shared_sstable>& old_sstables);
// Rebuilds the sstable set right away and schedule deletion of old sstables.
void on_compaction_completion(const std::vector<sstables::shared_sstable>& new_sstables,
const std::vector<sstables::shared_sstable>& sstables_to_remove);
// Rebuild sstable set, delete input sstables right away, and update row cache and statistics.
void on_compaction_completion(sstables::compaction_completion_desc& desc);
void rebuild_statistics();

View File

@@ -1323,7 +1323,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
std::vector<iovec> v;
v.reserve(n);
size_t m = 0;
while (m < rem && n < max_write) {
while (m < rem && n--) {
auto s = std::min(rem - m, buf_size);
v.emplace_back(iovec{ buf.get_write(), s});
m += s;

View File

@@ -704,6 +704,7 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr<s
// Files are aggregated for at most manager::hints_timer_period therefore the oldest hint there is
// (last_modification - manager::hints_timer_period) old.
if (gc_clock::now().time_since_epoch() - secs_since_file_mod > gc_grace_sec - manager::hints_flush_period) {
ctx_ptr->rps_set.erase(rp);
return make_ready_future<>();
}
@@ -726,6 +727,7 @@ future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr<s
manager_logger.debug("send_hints(): {} at {}: {}", fname, rp, e.what());
++this->shard_stats().discarded;
}
ctx_ptr->rps_set.erase(rp);
return make_ready_future<>();
}).finally([units = std::move(units), ctx_ptr] {});
}).handle_exception([this, ctx_ptr] (auto eptr) {

View File

@@ -678,9 +678,14 @@ void view_updates::generate_update(
return;
}
bool should_update = false;
bool should_replace = false;
bool should_create = false;
// If one of the key columns is missing, set has_new_row = false
// meaning that after the update there will be no view row.
// If one of the key columns is missing in the existing value,
// set has_old_row = false meaning we don't have an old row to
// delete.
bool has_old_row = true;
bool has_new_row = true;
bool same_row = true;
for (auto col_id : col_ids) {
auto* after = update.cells().find_cell(col_id);
// Note: multi-cell columns can't be part of the primary key.
@@ -690,27 +695,31 @@ void view_updates::generate_update(
if (before && before->as_atomic_cell(cdef).is_live()) {
if (after && after->as_atomic_cell(cdef).is_live()) {
auto cmp = compare_atomic_cell_for_merge(before->as_atomic_cell(cdef), after->as_atomic_cell(cdef));
if (cmp == 0) {
should_update = true;
} else {
should_replace = true;
if (cmp != 0) {
same_row = false;
}
} else {
delete_old_entry(base_key, *existing, update, now);
return;
}
} else {
has_old_row = false;
}
} else {
has_old_row = false;
}
if (after && after->as_atomic_cell(cdef).is_live()) {
should_create = true;
if (!after || !after->as_atomic_cell(cdef).is_live()) {
has_new_row = false;
}
}
if (should_replace) {
replace_entry(base_key, update, *existing, now);
} else if (should_update) {
update_entry(base_key, update, *existing, now);
} else if (should_create) {
if (has_old_row) {
if (has_new_row) {
if (same_row) {
update_entry(base_key, update, *existing, now);
} else {
replace_entry(base_key, update, *existing, now);
}
} else {
delete_old_entry(base_key, *existing, update, now);
}
} else if (has_new_row) {
create_entry(base_key, update, now);
}
}

View File

@@ -543,6 +543,15 @@ to_partition_range(dht::token_range r) {
return { std::move(start), std::move(end) };
}
dht::partition_range_vector to_partition_ranges(const dht::token_range_vector& ranges) {
dht::partition_range_vector prs;
prs.reserve(ranges.size());
for (auto& range : ranges) {
prs.push_back(dht::to_partition_range(range));
}
return prs;
}
std::map<unsigned, dht::partition_range_vector>
split_range_to_shards(dht::partition_range pr, const schema& s) {
std::map<unsigned, dht::partition_range_vector> ret;

View File

@@ -903,6 +903,7 @@ public:
};
dht::partition_range to_partition_range(dht::token_range);
dht::partition_range_vector to_partition_ranges(const dht::token_range_vector& ranges);
// Each shard gets a sorted, disjoint vector of ranges
std::map<unsigned, dht::partition_range_vector>

View File

@@ -31,6 +31,7 @@ if __name__ == '__main__':
sys.exit(1)
if is_systemd():
systemd_unit('scylla-fstrim.timer').unmask()
systemd_unit('scylla-fstrim.timer').enable()
if is_redhat_variant():
systemd_unit('fstrim.timer').disable()
if dist_name() == 'Ubuntu' and os.path.exists('/etc/cron.weekly/fstrim'):

View File

@@ -130,17 +130,14 @@ if __name__ == '__main__':
makedirs(mount_at)
run('mount -t xfs -o noatime {raid} "{mount_at}"'.format(raid=fsdev, mount_at=mount_at))
makedirs('{}/data'.format(root))
makedirs('{}/commitlog'.format(root))
makedirs('{}/coredump'.format(root))
uid = pwd.getpwnam('scylla').pw_uid
gid = grp.getgrnam('scylla').gr_gid
os.chown(root, uid, gid)
os.chown('{}/data'.format(root), uid, gid)
os.chown('{}/commitlog'.format(root), uid, gid)
os.chown('{}/coredump'.format(root), uid, gid)
for d in ['coredump', 'data', 'commitlog', 'hints', 'view_hints', 'saved_caches']:
dpath = '{}/{}'.format(root, d)
makedirs(dpath)
os.chown(dpath, uid, gid)
if args.update_fstab:
res = out('blkid {}'.format(fsdev))

View File

@@ -182,7 +182,7 @@ class aws_instance:
instance_size = self.instance_size()
if instance_class in ['c3', 'c4', 'd2', 'i2', 'r3']:
return 'ixgbevf'
if instance_class in ['c5', 'c5d', 'f1', 'g3', 'h1', 'i3', 'i3en', 'm5', 'm5d', 'p2', 'p3', 'r4', 'x1']:
if instance_class in ['a1', 'c5', 'c5d', 'f1', 'g3', 'g4', 'h1', 'i3', 'i3en', 'inf1', 'm5', 'm5a', 'm5ad', 'm5d', 'm5dn', 'm5n', 'm6g', 'p2', 'p3', 'r4', 'r5', 'r5a', 'r5ad', 'r5d', 'r5dn', 'r5n', 't3', 't3a', 'u-6tb1', 'u-9tb1', 'u-12tb1', 'u-18tn1', 'u-24tb1', 'x1', 'x1e', 'z1d']:
return 'ena'
if instance_class == 'm4':
if instance_size == '16xlarge':
@@ -481,8 +481,8 @@ def parse_scylla_dirs_with_default(conf='/etc/scylla/scylla.yaml'):
y['data_file_directories'] = [os.path.join(y['workdir'], 'data')]
for t in [ "commitlog", "hints", "view_hints", "saved_caches" ]:
key = "%s_directory" % t
if key not in y or not y[k]:
y[k] = os.path.join(y['workdir'], t)
if key not in y or not y[key]:
y[key] = os.path.join(y['workdir'], t)
return y

View File

@@ -2171,25 +2171,24 @@ future<> gossiper::wait_for_range_setup() {
}
bool gossiper::is_safe_for_bootstrap(inet_address endpoint) {
// We allow to bootstrap a new node in only two cases:
// 1) The node is a completely new node and no state in gossip at all
// 2) The node has state in gossip and it is already removed from the
// cluster either by nodetool decommission or nodetool removenode
auto* eps = get_endpoint_state_for_endpoint_ptr(endpoint);
// if there's no previous state, or the node was previously removed from the cluster, we're good
if (!eps || is_dead_state(*eps)) {
return true;
bool allowed = true;
if (!eps) {
logger.debug("is_safe_for_bootstrap: node={}, status=no state in gossip, allowed_to_bootstrap={}", endpoint, allowed);
return allowed;
}
sstring status = get_gossip_status(*eps);
logger.debug("is_safe_for_bootstrap: node {} status {}", endpoint, status);
// these states are not allowed to join the cluster as it would not be safe
std::unordered_set<sstring> unsafe_statuses{
sstring(""), // failed bootstrap but we did start gossiping
sstring(versioned_value::STATUS_NORMAL), // node is legit in the cluster or it was stopped with kill -9
sstring(versioned_value::SHUTDOWN) // node was shutdown
std::unordered_set<sstring> allowed_statuses{
sstring(versioned_value::STATUS_LEFT),
sstring(versioned_value::REMOVED_TOKEN),
};
return !unsafe_statuses.count(status);
allowed = allowed_statuses.count(status);
logger.debug("is_safe_for_bootstrap: node={}, status={}, allowed_to_bootstrap={}", endpoint, status, allowed);
return allowed;
}
std::set<sstring> to_feature_set(sstring features_string) {

View File

@@ -69,7 +69,8 @@ std::ostream& gms::operator<<(std::ostream& os, const inet_address& x) {
auto&& bytes = x.bytes();
auto i = 0u;
auto acc = 0u;
for (auto b : bytes) {
// extra paranoid sign extension evasion - #5808
for (uint8_t b : bytes) {
acc <<= 8;
acc |= b;
if ((++i & 1) == 0) {

View File

@@ -85,6 +85,10 @@ network_topology_strategy::network_topology_strategy(
"NetworkTopologyStrategy");
}
if (val.empty() || std::any_of(val.begin(), val.end(), [] (char c) {return !isdigit(c);})) {
throw exceptions::configuration_exception(
format("Replication factor must be numeric and non-negative, found '{}'", val));
}
_dc_rep_factor.emplace(key, std::stol(val));
_datacenteres.push_back(key);
}

View File

@@ -2602,7 +2602,7 @@ void mutation_cleaner_impl::start_worker() {
stop_iteration mutation_cleaner_impl::merge_some(partition_snapshot& snp) noexcept {
auto&& region = snp.region();
return with_allocator(region.allocator(), [&] {
return with_linearized_managed_bytes([&] {
{
// Allocating sections require the region to be reclaimable
// which means that they cannot be nested.
// It is, however, possible, that if the snapshot is taken
@@ -2614,13 +2614,15 @@ stop_iteration mutation_cleaner_impl::merge_some(partition_snapshot& snp) noexce
}
try {
return _worker_state->alloc_section(region, [&] {
return with_linearized_managed_bytes([&] {
return snp.merge_partition_versions(_app_stats);
});
});
} catch (...) {
// Merging failed, give up as there is no guarantee of forward progress.
return stop_iteration::yes;
}
});
}
});
}

View File

@@ -177,6 +177,13 @@ future<> multishard_writer::distribute_mutation_fragments() {
return handle_end_of_stream();
}
});
}).handle_exception([this] (std::exception_ptr ep) {
for (auto& q : _queue_reader_handles) {
if (q) {
q->abort(ep);
}
}
return make_exception_future<>(std::move(ep));
});
}

View File

@@ -21,8 +21,12 @@
# At the end of the build we check that the build-id is indeed in the
# first page. At install time we check that patchelf doesn't modify
# the program headers.
# gdb has a SO_NAME_MAX_PATH_SIZE of 512, so limit the path size to
# that. The 512 includes the null at the end, hence the 511 bellow.
ORIGINAL_DYNAMIC_LINKER=$(gcc -### /dev/null -o t 2>&1 | perl -n -e '/-dynamic-linker ([^ ]*) / && print $1')
DYNAMIC_LINKER=$(printf "%2000s$ORIGINAL_DYNAMIC_LINKER" | sed 's| |/|g')
DYNAMIC_LINKER=$(printf "%511s$ORIGINAL_DYNAMIC_LINKER" | sed 's| |/|g')
COMMON_FLAGS="--enable-dpdk --cflags=-ffile-prefix-map=$PWD=. --ldflags=-Wl,--build-id=sha1,--dynamic-linker=$DYNAMIC_LINKER"

View File

@@ -444,7 +444,7 @@ class repair_writer {
uint64_t _estimated_partitions;
size_t _nr_peer_nodes;
// Needs more than one for repair master
std::vector<std::optional<future<uint64_t>>> _writer_done;
std::vector<std::optional<future<>>> _writer_done;
std::vector<std::optional<seastar::queue<mutation_fragment_opt>>> _mq;
// Current partition written to disk
std::vector<lw_shared_ptr<const decorated_key_with_hash>> _current_dk_written_to_sstable;
@@ -452,6 +452,7 @@ class repair_writer {
// partition_start is written and is closed when a partition_end is
// written.
std::vector<bool> _partition_opened;
named_semaphore _sem{1, named_semaphore_exception_factory{"repair_writer"}};
public:
repair_writer(
schema_ptr schema,
@@ -524,7 +525,15 @@ public:
return consumer(std::move(reader));
});
},
t.stream_in_progress());
t.stream_in_progress()).then([this, node_idx] (uint64_t partitions) {
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
_schema->ks_name(), _schema->cf_name(), partitions);
}).handle_exception([this, node_idx] (std::exception_ptr ep) {
rlogger.warn("repair_writer: keyspace={}, table={}, multishard_writer failed: {}",
_schema->ks_name(), _schema->cf_name(), ep);
_mq[node_idx]->abort(ep);
return make_exception_future<>(std::move(ep));
});
}
future<> write_partition_end(unsigned node_idx) {
@@ -551,23 +560,41 @@ public:
}
}
future<> write_end_of_stream(unsigned node_idx) {
if (_mq[node_idx]) {
return with_semaphore(_sem, 1, [this, node_idx] {
// Partition_end is never sent on wire, so we have to write one ourselves.
return write_partition_end(node_idx).then([this, node_idx] () mutable {
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
return _mq[node_idx]->push_eventually(mutation_fragment_opt());
});
});
} else {
return make_ready_future<>();
}
}
future<> do_wait_for_writer_done(unsigned node_idx) {
if (_writer_done[node_idx]) {
return std::move(*(_writer_done[node_idx]));
} else {
return make_ready_future<>();
}
}
future<> wait_for_writer_done() {
return parallel_for_each(boost::irange(unsigned(0), unsigned(_nr_peer_nodes)), [this] (unsigned node_idx) {
if (_writer_done[node_idx] && _mq[node_idx]) {
// Partition_end is never sent on wire, so we have to write one ourselves.
return write_partition_end(node_idx).then([this, node_idx] () mutable {
// Empty mutation_fragment_opt means no more data, so the writer can seal the sstables.
return _mq[node_idx]->push_eventually(mutation_fragment_opt()).then([this, node_idx] () mutable {
return (*_writer_done[node_idx]).then([] (uint64_t partitions) {
rlogger.debug("Managed to write partitions={} to sstable", partitions);
return make_ready_future<>();
});
});
});
}
return make_ready_future<>();
return when_all_succeed(write_end_of_stream(node_idx), do_wait_for_writer_done(node_idx));
}).handle_exception([this] (std::exception_ptr ep) {
rlogger.warn("repair_writer: keyspace={}, table={}, wait_for_writer_done failed: {}",
_schema->ks_name(), _schema->cf_name(), ep);
return make_exception_future<>(std::move(ep));
});
}
named_semaphore& sem() {
return _sem;
}
};
class repair_meta {
@@ -1167,6 +1194,23 @@ private:
}
}
future<> do_apply_rows(std::list<repair_row>& row_diff, unsigned node_idx, update_working_row_buf update_buf) {
return with_semaphore(_repair_writer.sem(), 1, [this, node_idx, update_buf, &row_diff] {
_repair_writer.create_writer(node_idx);
return do_for_each(row_diff, [this, node_idx, update_buf] (repair_row& r) {
if (update_buf) {
_working_row_buf_combined_hash.add(r.hash());
}
// The repair_row here is supposed to have
// mutation_fragment attached because we have stored it in
// to_repair_rows_list above where the repair_row is created.
mutation_fragment mf = std::move(r.get_mutation_fragment());
auto dk_with_hash = r.get_dk_with_hash();
return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf));
});
});
}
// Give a list of rows, apply the rows to disk and update the _working_row_buf and _peer_row_hash_sets if requested
// Must run inside a seastar thread
void apply_rows_on_master_in_thread(repair_rows_on_wire rows, gms::inet_address from, update_working_row_buf update_buf,
@@ -1192,18 +1236,7 @@ private:
_peer_row_hash_sets[node_idx] = boost::copy_range<std::unordered_set<repair_hash>>(row_diff |
boost::adaptors::transformed([] (repair_row& r) { thread::maybe_yield(); return r.hash(); }));
}
_repair_writer.create_writer(node_idx);
for (auto& r : row_diff) {
if (update_buf) {
_working_row_buf_combined_hash.add(r.hash());
}
// The repair_row here is supposed to have
// mutation_fragment attached because we have stored it in
// to_repair_rows_list above where the repair_row is created.
mutation_fragment mf = std::move(r.get_mutation_fragment());
auto dk_with_hash = r.get_dk_with_hash();
_repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf)).get();
}
do_apply_rows(row_diff, node_idx, update_buf).get();
}
future<>
@@ -1214,15 +1247,7 @@ private:
return to_repair_rows_list(rows).then([this] (std::list<repair_row> row_diff) {
return do_with(std::move(row_diff), [this] (std::list<repair_row>& row_diff) {
unsigned node_idx = 0;
_repair_writer.create_writer(node_idx);
return do_for_each(row_diff, [this, node_idx] (repair_row& r) {
// The repair_row here is supposed to have
// mutation_fragment attached because we have stored it in
// to_repair_rows_list above where the repair_row is created.
mutation_fragment mf = std::move(r.get_mutation_fragment());
auto dk_with_hash = r.get_dk_with_hash();
return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf));
});
return do_apply_rows(row_diff, node_idx, update_working_row_buf::no);
});
});
}
@@ -1905,22 +1930,17 @@ static future<> repair_get_row_diff_with_rpc_stream_handler(
current_set_diff,
std::move(hash_cmd_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
error = true;
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()}).then([sink] () mutable {
return sink.close();
}).then([sink] {
return sink(repair_row_on_wire_with_cmd{repair_stream_cmd::error, repair_row_on_wire()}).then([] {
return make_ready_future<stop_iteration>(stop_iteration::no);
});
});
} else {
if (error) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
return sink.close().then([sink] {
return make_ready_future<stop_iteration>(stop_iteration::yes);
});
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
});
});
}).finally([sink] () mutable {
return sink.close().finally([sink] { });
});
}
@@ -1946,22 +1966,17 @@ static future<> repair_put_row_diff_with_rpc_stream_handler(
current_rows,
std::move(row_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
error = true;
return sink(repair_stream_cmd::error).then([sink] () mutable {
return sink.close();
}).then([sink] {
return sink(repair_stream_cmd::error).then([] {
return make_ready_future<stop_iteration>(stop_iteration::no);
});
});
} else {
if (error) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
return sink.close().then([sink] {
return make_ready_future<stop_iteration>(stop_iteration::yes);
});
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
});
});
}).finally([sink] () mutable {
return sink.close().finally([sink] { });
});
}
@@ -1986,22 +2001,17 @@ static future<> repair_get_full_row_hashes_with_rpc_stream_handler(
error,
std::move(status_opt)).handle_exception([sink, &error] (std::exception_ptr ep) mutable {
error = true;
return sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()}).then([sink] () mutable {
return sink.close();
}).then([sink] {
return sink(repair_hash_with_cmd{repair_stream_cmd::error, repair_hash()}).then([] () {
return make_ready_future<stop_iteration>(stop_iteration::no);
});
});
} else {
if (error) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
return sink.close().then([sink] {
return make_ready_future<stop_iteration>(stop_iteration::yes);
});
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
});
});
}).finally([sink] () mutable {
return sink.close().finally([sink] { });
});
}

View File

@@ -528,8 +528,12 @@ public:
return _reader.move_to_next_partition(timeout).then([this] (auto&& mfopt) mutable {
{
if (!mfopt) {
this->handle_end_of_stream();
return make_ready_future<flat_mutation_reader_opt, mutation_fragment_opt>(std::nullopt, std::nullopt);
return _cache._read_section(_cache._tracker.region(), [&] {
return with_linearized_managed_bytes([&] {
this->handle_end_of_stream();
return make_ready_future<flat_mutation_reader_opt, mutation_fragment_opt>(std::nullopt, std::nullopt);
});
});
}
_cache.on_partition_miss();
const partition_start& ps = mfopt->as_partition_start();
@@ -952,13 +956,15 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
// expensive and we need to amortize it somehow.
do {
STAP_PROBE(scylla, row_cache_update_partition_start);
with_linearized_managed_bytes([&] {
{
if (!update) {
_update_section(_tracker.region(), [&] {
with_linearized_managed_bytes([&] {
memtable_entry& mem_e = *m.partitions.begin();
size_entry = mem_e.size_in_allocator_without_rows(_tracker.allocator());
auto cache_i = _partitions.lower_bound(mem_e.key(), cmp);
update = updater(_update_section, cache_i, mem_e, is_present, real_dirty_acc);
});
});
}
// We use cooperative deferring instead of futures so that
@@ -970,14 +976,16 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
update = {};
real_dirty_acc.unpin_memory(size_entry);
_update_section(_tracker.region(), [&] {
with_linearized_managed_bytes([&] {
auto i = m.partitions.begin();
memtable_entry& mem_e = *i;
m.partitions.erase(i);
mem_e.partition().evict(_tracker.memtable_cleaner());
current_allocator().destroy(&mem_e);
});
});
++partition_count;
});
}
STAP_PROBE(scylla, row_cache_update_partition_end);
} while (!m.partitions.empty() && !need_preempt());
with_allocator(standard_allocator(), [&] {
@@ -1124,8 +1132,8 @@ future<> row_cache::invalidate(external_updater eu, dht::partition_range_vector&
seastar::thread::maybe_yield();
while (true) {
auto done = with_linearized_managed_bytes([&] {
return _update_section(_tracker.region(), [&] {
auto done = _update_section(_tracker.region(), [&] {
return with_linearized_managed_bytes([&] {
auto cmp = cache_entry::compare(_schema);
auto it = _partitions.lower_bound(*_prev_snapshot_pos, cmp);
auto end = _partitions.lower_bound(dht::ring_position_view::for_range_end(range), cmp);

View File

@@ -290,10 +290,10 @@ schema::schema(const raw_schema& raw, std::optional<raw_view_info> raw_view_info
+ column_offset(column_kind::regular_column),
_raw._columns.end(), column_definition::name_comparator(regular_column_name_type()));
std::sort(_raw._columns.begin(),
std::stable_sort(_raw._columns.begin(),
_raw._columns.begin() + column_offset(column_kind::clustering_key),
[] (auto x, auto y) { return x.id < y.id; });
std::sort(_raw._columns.begin() + column_offset(column_kind::clustering_key),
std::stable_sort(_raw._columns.begin() + column_offset(column_kind::clustering_key),
_raw._columns.begin() + column_offset(column_kind::static_column),
[] (auto x, auto y) { return x.id < y.id; });

View File

@@ -33,9 +33,10 @@ import os
procs = os.sysconf('SC_NPROCESSORS_ONLN')
mem = os.sysconf('SC_PHYS_PAGES') * os.sysconf('SC_PAGESIZE')
mem_reserve = 1000000000
job_mem = 4000000000
jobs = min(procs, mem // job_mem)
jobs = min(procs, (mem-mem_reserve) // job_mem)
jobs = max(jobs, 1)
print(jobs)

View File

@@ -45,7 +45,7 @@ if [[ "$ID" = "debian" && "$VERSION_ID" = "8" ]] || [[ "$ID" = "ubuntu" && "$VER
echo "scylla ALL=(ALL) NOPASSWD: /opt/scylladb/scripts/scylla_prepare,/opt/scylladb/scripts/scylla_stop,/opt/scylladb/scripts/scylla_io_setup,/opt/scylladb/scripts/scylla-ami/scylla_ami_setup" > /etc/sudoers.d/scylla
else
# AmbientCapabilities supported from v229 but it backported to v219-33 on RHEL7
if [ $SYSTEMD_VER -ge 229 ] || [ $SYSTEMD_VER -eq 219 ] && [ $SYSTEMD_REL -ge 33 ]; then
if [ $SYSTEMD_VER -ge 229 ] || [[ $SYSTEMD_VER -eq 219 && $SYSTEMD_REL -ge 33 ]]; then
if [ $AMB_SUPPORT -eq 1 ]; then
mkdir -p /etc/systemd/system/scylla-server.service.d/
cat << EOS > /etc/systemd/system/scylla-server.service.d/capabilities.conf

Submodule seastar updated: a0bdc6cd85...30f03aeba9

View File

@@ -3254,7 +3254,9 @@ protected:
uint32_t original_partition_limit() const {
return _cmd->partition_limit;
}
virtual void adjust_targets_for_reconciliation() {}
void reconcile(db::consistency_level cl, storage_proxy::clock_type::time_point timeout, lw_shared_ptr<query::read_command> cmd) {
adjust_targets_for_reconciliation();
data_resolver_ptr data_resolver = ::make_shared<data_read_resolver>(_schema, cl, _targets.size(), timeout);
auto exec = shared_from_this();
@@ -3481,6 +3483,9 @@ public:
virtual void got_cl() override {
_speculate_timer.cancel();
}
virtual void adjust_targets_for_reconciliation() override {
_targets = used_targets();
}
};
class range_slice_read_executor : public never_speculating_read_executor {
@@ -3527,7 +3532,7 @@ db::read_repair_decision storage_proxy::new_read_repair_decision(const schema& s
// reordering of endpoints happens. The local endpoint, if
// present, is always first in the list, as get_live_sorted_endpoints()
// orders the list by proximity to the local endpoint.
is_read_non_local |= all_replicas.front() != utils::fb_utilities::get_broadcast_address();
is_read_non_local |= !all_replicas.empty() && all_replicas.front() != utils::fb_utilities::get_broadcast_address();
auto cf = _db.local().find_column_family(schema).shared_from_this();
std::vector<gms::inet_address> target_replicas = db::filter_for_query(cl, ks, all_replicas, preferred_endpoints, repair_decision,

View File

@@ -903,12 +903,16 @@ storage_service::is_local_dc(const inet_address& targetHost) const {
std::unordered_map<dht::token_range, std::vector<inet_address>>
storage_service::get_range_to_address_map(const sstring& keyspace,
const std::vector<token>& sorted_tokens) const {
sstring ks = keyspace;
// some people just want to get a visual representation of things. Allow null and set it to the first
// non-system keyspace.
if (keyspace == "" && _db.local().get_non_system_keyspaces().empty()) {
throw std::runtime_error("No keyspace provided and no non system kespace exist");
if (keyspace == "") {
auto keyspaces = _db.local().get_non_system_keyspaces();
if (keyspaces.empty()) {
throw std::runtime_error("No keyspace provided and no non system kespace exist");
}
ks = keyspaces[0];
}
const sstring& ks = (keyspace == "") ? _db.local().get_non_system_keyspaces()[0] : keyspace;
return construct_range_to_endpoint_map(ks, get_all_ranges(sorted_tokens));
}

View File

@@ -452,6 +452,11 @@ protected:
encoding_stats get_encoding_stats() const {
return _stats_collector.get();
}
virtual compaction_completion_desc
get_compaction_completion_desc(std::vector<shared_sstable> input_sstables, std::vector<shared_sstable> output_sstables) {
return compaction_completion_desc{std::move(input_sstables), std::move(output_sstables)};
}
public:
compaction& operator=(const compaction&) = delete;
compaction(const compaction&) = delete;
@@ -828,7 +833,7 @@ private:
_compacting->erase(sst);
});
auto exhausted_ssts = std::vector<shared_sstable>(exhausted, _sstables.end());
_replacer(exhausted_ssts, std::move(_new_unused_sstables));
_replacer(get_compaction_completion_desc(exhausted_ssts, std::move(_new_unused_sstables)));
_sstables.erase(exhausted, _sstables.end());
backlog_tracker_incrementally_adjust_charges(std::move(exhausted_ssts));
}
@@ -838,7 +843,7 @@ private:
if (!_sstables.empty()) {
std::vector<shared_sstable> sstables_compacted;
std::move(_sstables.begin(), _sstables.end(), std::back_inserter(sstables_compacted));
_replacer(std::move(sstables_compacted), std::move(_new_unused_sstables));
_replacer(get_compaction_completion_desc(std::move(sstables_compacted), std::move(_new_unused_sstables)));
}
}
@@ -869,9 +874,42 @@ private:
};
class cleanup_compaction final : public regular_compaction {
dht::token_range_vector _owned_ranges;
private:
dht::partition_range_vector
get_ranges_for_invalidation(const std::vector<shared_sstable>& sstables) {
auto owned_ranges = dht::to_partition_ranges(_owned_ranges);
auto non_owned_ranges = boost::copy_range<dht::partition_range_vector>(sstables
| boost::adaptors::transformed([] (const shared_sstable& sst) {
return dht::partition_range::make({sst->get_first_decorated_key(), true},
{sst->get_last_decorated_key(), true});
}));
// optimize set of potentially overlapping ranges by deoverlapping them.
non_owned_ranges = dht::partition_range::deoverlap(std::move(non_owned_ranges), dht::ring_position_comparator(*_schema));
// subtract *each* owned range from the partition range of *each* sstable*,
// such that we'll be left only with a set of non-owned ranges.
for (auto& owned_range : owned_ranges) {
dht::partition_range_vector new_non_owned_ranges;
for (auto& non_owned_range : non_owned_ranges) {
auto ret = non_owned_range.subtract(owned_range, dht::ring_position_comparator(*_schema));
new_non_owned_ranges.insert(new_non_owned_ranges.end(), ret.begin(), ret.end());
}
non_owned_ranges = std::move(new_non_owned_ranges);
}
return non_owned_ranges;
}
protected:
virtual compaction_completion_desc
get_compaction_completion_desc(std::vector<shared_sstable> input_sstables, std::vector<shared_sstable> output_sstables) override {
auto ranges_for_for_invalidation = get_ranges_for_invalidation(input_sstables);
return compaction_completion_desc{std::move(input_sstables), std::move(output_sstables), std::move(ranges_for_for_invalidation)};
}
public:
cleanup_compaction(column_family& cf, compaction_descriptor descriptor, std::function<shared_sstable()> creator, replacer_fn replacer)
: regular_compaction(cf, std::move(descriptor), std::move(creator), std::move(replacer))
, _owned_ranges(service::get_local_storage_service().get_local_ranges(_schema->ks_name()))
{
_info->type = compaction_type::Cleanup;
}
@@ -885,15 +923,13 @@ public:
}
flat_mutation_reader::filter make_partition_filter() const override {
dht::token_range_vector owned_ranges = service::get_local_storage_service().get_local_ranges(_schema->ks_name());
return [this, owned_ranges = std::move(owned_ranges)] (const dht::decorated_key& dk) {
return [this] (const dht::decorated_key& dk) {
if (dht::shard_of(dk.token()) != engine().cpu_id()) {
clogger.trace("Token {} does not belong to CPU {}, skipping", dk.token(), engine().cpu_id());
return false;
}
if (!belongs_to_current_node(dk.token(), owned_ranges)) {
if (!belongs_to_current_node(dk.token(), _owned_ranges)) {
clogger.trace("Token {} does not belong to this node, skipping", dk.token());
return false;
}

View File

@@ -27,6 +27,7 @@
#include "gc_clock.hh"
#include "compaction_weight_registration.hh"
#include "utils/UUID.hh"
#include "dht/i_partitioner.hh"
#include <seastar/core/thread.hh>
#include <functional>
@@ -133,8 +134,15 @@ namespace sstables {
}
};
struct compaction_completion_desc {
std::vector<shared_sstable> input_sstables;
std::vector<shared_sstable> output_sstables;
// Set of compacted partition ranges that should be invalidated in the cache.
dht::partition_range_vector ranges_for_cache_invalidation;
};
// Replaces old sstable(s) by new one(s) which contain all non-expired data.
using replacer_fn = std::function<void(std::vector<shared_sstable> removed, std::vector<shared_sstable> added)>;
using replacer_fn = std::function<void(compaction_completion_desc)>;
// Compact a list of N sstables into M sstables.
// Returns info about the finished compaction, which includes vector to new sstables.

View File

@@ -85,7 +85,7 @@ private:
} _state = state::START;
temporary_buffer<char> _key;
uint32_t _promoted_index_end;
uint64_t _promoted_index_end;
uint64_t _position;
uint64_t _partition_header_length = 0;
std::optional<deletion_time> _deletion_time;

View File

@@ -1310,7 +1310,7 @@ future<> sstable::open_data() {
c->pop_back();
return make_ready_future<>();
}).then([this, c] () mutable {
c = {};
*c = {};
_open = true;
return make_ready_future<>();
});

View File

@@ -69,15 +69,6 @@ stream_transfer_task::stream_transfer_task(shared_ptr<stream_session> session, U
stream_transfer_task::~stream_transfer_task() = default;
dht::partition_range_vector to_partition_ranges(const dht::token_range_vector& ranges) {
dht::partition_range_vector prs;
prs.reserve(ranges.size());
for (auto& range : ranges) {
prs.push_back(dht::to_partition_range(range));
}
return prs;
}
struct send_info {
database& db;
utils::UUID plan_id;
@@ -103,7 +94,7 @@ struct send_info {
, reason(reason_)
, cf(db.find_column_family(cf_id))
, ranges(std::move(ranges_))
, prs(to_partition_ranges(ranges))
, prs(dht::to_partition_ranges(ranges))
, reader(cf.make_streaming_reader(cf.schema(), prs)) {
}
future<bool> has_relevant_range_on_this_shard() {

View File

@@ -1181,8 +1181,7 @@ table::rebuild_sstable_list(const std::vector<sstables::shared_sstable>& new_sst
// Note: must run in a seastar thread
void
table::on_compaction_completion(const std::vector<sstables::shared_sstable>& new_sstables,
const std::vector<sstables::shared_sstable>& sstables_to_remove) {
table::on_compaction_completion(sstables::compaction_completion_desc& desc) {
// Build a new list of _sstables: We remove from the existing list the
// tables we compacted (by now, there might be more sstables flushed
// later), and we add the new tables generated by the compaction.
@@ -1195,7 +1194,7 @@ table::on_compaction_completion(const std::vector<sstables::shared_sstable>& new
// unbounded time, because all shards must agree on the deletion).
// make sure all old sstables belong *ONLY* to current shard before we proceed to their deletion.
for (auto& sst : sstables_to_remove) {
for (auto& sst : desc.input_sstables) {
auto shards = sst->get_shards_for_this_sstable();
if (shards.size() > 1) {
throw std::runtime_error(format("A regular compaction for {}.{} INCORRECTLY used shared sstable {}. Only resharding work with those!",
@@ -1209,9 +1208,12 @@ table::on_compaction_completion(const std::vector<sstables::shared_sstable>& new
auto new_compacted_but_not_deleted = _sstables_compacted_but_not_deleted;
// rebuilding _sstables_compacted_but_not_deleted first to make the entire rebuild operation exception safe.
new_compacted_but_not_deleted.insert(new_compacted_but_not_deleted.end(), sstables_to_remove.begin(), sstables_to_remove.end());
new_compacted_but_not_deleted.insert(new_compacted_but_not_deleted.end(), desc.input_sstables.begin(), desc.input_sstables.end());
rebuild_sstable_list(new_sstables, sstables_to_remove);
_cache.invalidate([this, &desc] () noexcept {
// FIXME: this is not really noexcept, but we need to provide strong exception guarantees.
rebuild_sstable_list(desc.output_sstables, desc.input_sstables);
}, std::move(desc.ranges_for_cache_invalidation)).get();
// refresh underlying data source in row cache to prevent it from holding reference
// to sstables files that are about to be deleted.
@@ -1221,7 +1223,7 @@ table::on_compaction_completion(const std::vector<sstables::shared_sstable>& new
rebuild_statistics();
auto f = seastar::with_gate(_sstable_deletion_gate, [this, sstables_to_remove] {
auto f = seastar::with_gate(_sstable_deletion_gate, [this, sstables_to_remove = desc.input_sstables] {
return with_semaphore(_sstable_deletion_sem, 1, [sstables_to_remove = std::move(sstables_to_remove)] {
return sstables::delete_atomically(std::move(sstables_to_remove));
});
@@ -1239,7 +1241,7 @@ table::on_compaction_completion(const std::vector<sstables::shared_sstable>& new
// or they could stay forever in the set, resulting in deleted files remaining
// opened and disk space not being released until shutdown.
std::unordered_set<sstables::shared_sstable> s(
sstables_to_remove.begin(), sstables_to_remove.end());
desc.input_sstables.begin(), desc.input_sstables.end());
auto e = boost::range::remove_if(_sstables_compacted_but_not_deleted, [&] (sstables::shared_sstable sst) -> bool {
return s.count(sst);
});
@@ -1296,13 +1298,12 @@ table::compact_sstables(sstables::compaction_descriptor descriptor) {
sst->set_unshared();
return sst;
};
auto replace_sstables = [this, release_exhausted = descriptor.release_exhausted] (std::vector<sstables::shared_sstable> old_ssts,
std::vector<sstables::shared_sstable> new_ssts) {
_compaction_strategy.notify_completion(old_ssts, new_ssts);
_compaction_manager.propagate_replacement(this, old_ssts, new_ssts);
this->on_compaction_completion(new_ssts, old_ssts);
auto replace_sstables = [this, release_exhausted = descriptor.release_exhausted] (sstables::compaction_completion_desc desc) {
_compaction_strategy.notify_completion(desc.input_sstables, desc.output_sstables);
_compaction_manager.propagate_replacement(this, desc.input_sstables, desc.output_sstables);
this->on_compaction_completion(desc);
if (release_exhausted) {
release_exhausted(old_ssts);
release_exhausted(desc.input_sstables);
}
};
@@ -1889,6 +1890,9 @@ future<> table::flush_streaming_mutations(utils::UUID plan_id, dht::partition_ra
return _streaming_memtables->seal_active_memtable_delayed().then([this] {
return _streaming_flush_phaser.advance_and_await();
}).then([this, sstables = std::move(sstables), ranges = std::move(ranges)] () mutable {
if (sstables.empty()) {
return make_ready_future<>();
}
return _cache.invalidate([this, sstables = std::move(sstables)] () mutable noexcept {
// FIXME: this is not really noexcept, but we need to provide strong exception guarantees.
for (auto&& sst : sstables) {

View File

@@ -31,6 +31,44 @@
#include "cql3/statements/raw/parsed_statement.hh"
#include "cql3/util.hh"
//
// Test basic CQL string quoting
//
BOOST_AUTO_TEST_CASE(maybe_quote) {
std::string s(65536, 'x');
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote(s), s);
s += " " + std::string(65536, 'y');
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote(s), "\"" + s + "\"");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("a"), "a");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("z"), "z");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("b0"), "b0");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("y9"), "y9");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("c_d"), "c_d");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("x8_"), "x8_");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote(""), "\"\"");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("0"), "\"0\"");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("9"), "\"9\"");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("_"), "\"_\"");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("A"), "\"A\"");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("To"), "\"To\"");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("zeD"), "\"zeD\"");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("hello world"), "\"hello world\"");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("hello_world01234"), "hello_world01234");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("hello world01234"), "\"hello world01234\"");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("hello world\"1234"), "\"hello world\"\"1234\"");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("hello_world01234hello_world01234"), "hello_world01234hello_world01234");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("hello world01234hello_world01234"), "\"hello world01234hello_world01234\"");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("hello world\"1234hello_world\"1234"), "\"hello world\"\"1234hello_world\"\"1234\"");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("\""), "\"\"\"\"");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("[\"]"), "\"[\"\"]\"");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("\"\""), "\"\"\"\"\"\"");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("\"hell0\""), "\"\"\"hell0\"\"\"");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("hello \"my\" world"), "\"hello \"\"my\"\" world\"");
}
//
// These tests verify that all excepted variations of CQL syntax related to access-control ("auth") functionality are
// accepted by the parser. They do not verify that invalid syntax is rejected, nor do they verify the correctness of

View File

@@ -4491,6 +4491,44 @@ SEASTAR_TEST_CASE(test_view_with_two_regular_base_columns_in_key) {
assert_that(msg).is_rows().with_rows({
{{int32_type->decompose(11), int32_type->decompose(13), int32_type->decompose(1), int32_type->decompose(2)}},
});
// Reproduce issue #6008 - updates with not-previously-existing row,
// not setting both v1 and v2 - should not create a view row, and
// definitely not cause a crash as they did in #6008. Same for
// deletes when no previous row exists.
cquery_nofail(e, "DELETE FROM t WHERE p = 1 AND c = 2");
msg = cquery_nofail(e, "SELECT * FROM tv");
assert_that(msg).is_rows().with_size(0);
cquery_nofail(e, "UPDATE t SET v1 = 17 WHERE p = 1 AND c = 2");
msg = cquery_nofail(e, "SELECT * FROM tv");
assert_that(msg).is_rows().with_size(0);
cquery_nofail(e, "DELETE FROM t WHERE p = 1 AND c = 2");
msg = cquery_nofail(e, "SELECT * FROM tv");
assert_that(msg).is_rows().with_size(0);
cquery_nofail(e, "UPDATE t SET v2 = 7 WHERE p = 1 AND c = 2");
msg = cquery_nofail(e, "SELECT * FROM tv");
assert_that(msg).is_rows().with_size(0);
// Same tests as above, but with a row marker left behind, so there
// is an existing base row - it's just empty.
cquery_nofail(e, "INSERT INTO t (p, c, v1, v2) VALUES (1, 2, 3, 4)");
msg = cquery_nofail(e, "SELECT * FROM tv");
assert_that(msg).is_rows().with_rows({
{{int32_type->decompose(3), int32_type->decompose(4), int32_type->decompose(1), int32_type->decompose(2)}},
});
cquery_nofail(e, "UPDATE t SET v1 = NULL, v2 = NULL WHERE p = 1 AND c = 2");
msg = cquery_nofail(e, "SELECT * FROM tv");
assert_that(msg).is_rows().with_size(0);
cquery_nofail(e, "UPDATE t SET v1 = 17 WHERE p = 1 AND c = 2");
msg = cquery_nofail(e, "SELECT * FROM tv");
assert_that(msg).is_rows().with_size(0);
cquery_nofail(e, "UPDATE t SET v1 = NULL, v2 = NULL WHERE p = 1 AND c = 2");
msg = cquery_nofail(e, "SELECT * FROM tv");
assert_that(msg).is_rows().with_size(0);
cquery_nofail(e, "UPDATE t SET v2 = 7 WHERE p = 1 AND c = 2");
msg = cquery_nofail(e, "SELECT * FROM tv");
assert_that(msg).is_rows().with_size(0);
});
}

View File

@@ -118,6 +118,53 @@ SEASTAR_TEST_CASE(test_multishard_writer) {
});
}
SEASTAR_TEST_CASE(test_multishard_writer_producer_aborts) {
return do_with_cql_env_thread([] (cql_test_env& e) {
auto test_random_streams = [] (random_mutation_generator&& gen, size_t partition_nr, generate_error error = generate_error::no) {
auto muts = gen(partition_nr);
schema_ptr s = gen.schema();
auto source_reader = partition_nr > 0 ? flat_mutation_reader_from_mutations(muts) : make_empty_flat_reader(s);
int mf_produced = 0;
auto get_next_mutation_fragment = [&source_reader, &mf_produced] () mutable {
if (mf_produced++ > 800) {
return make_exception_future<mutation_fragment_opt>(std::runtime_error("the producer failed"));
} else {
return source_reader(db::no_timeout);
}
};
auto& partitioner = dht::global_partitioner();
try {
distribute_reader_and_consume_on_shards(s, partitioner,
make_generating_reader(s, std::move(get_next_mutation_fragment)),
[&partitioner, error] (flat_mutation_reader reader) mutable {
if (error) {
return make_exception_future<>(std::runtime_error("Failed to write"));
}
return repeat([&partitioner, reader = std::move(reader), error] () mutable {
return reader(db::no_timeout).then([&partitioner, error] (mutation_fragment_opt mf_opt) mutable {
if (mf_opt) {
if (mf_opt->is_partition_start()) {
auto shard = partitioner.shard_of(mf_opt->as_partition_start().key().token());
BOOST_REQUIRE_EQUAL(shard, this_shard_id());
}
return make_ready_future<stop_iteration>(stop_iteration::no);
} else {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
});
});
}
).get0();
} catch (...) {
// The distribute_reader_and_consume_on_shards is expected to fail and not block forever
}
};
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no, local_shard_only::yes), 1000, generate_error::no);
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no, local_shard_only::yes), 1000, generate_error::yes);
});
}
namespace {
class bucket_writer {

View File

@@ -36,6 +36,7 @@
#include <sstream>
#include <boost/range/algorithm/adjacent_find.hpp>
#include <boost/algorithm/cxx11/iota.hpp>
#include "test/lib/cql_test_env.hh"
static logging::logger nlogger("NetworkTopologyStrategyLogger");
@@ -607,4 +608,13 @@ SEASTAR_TEST_CASE(testCalculateEndpoints) {
});
}
SEASTAR_TEST_CASE(test_invalid_dcs) {
return do_with_cql_env_thread([] (auto& e) {
for (auto& incorrect : std::vector<std::string>{"3\"", "", "!!!", "abcb", "!3", "-5", "0x123"}) {
BOOST_REQUIRE_THROW(e.execute_cql("CREATE KEYSPACE abc WITH REPLICATION "
"= {'class': 'NetworkTopologyStrategy', 'dc1':'" + incorrect + "'}").get(),
exceptions::configuration_exception);
};
});
}

View File

@@ -200,5 +200,16 @@ BOOST_AUTO_TEST_CASE(inet_address) {
auto res = ser::deserialize_from_buffer(buf, boost::type<gms::inet_address>{});
BOOST_CHECK_EQUAL(res, ip);
}
// stringify tests
{
for (sstring s : { "2001:6b0:8:2::232", "2a05:d018:223:f00:97af:f4d9:eac2:6a0f", "fe80::8898:3e04:215b:2cd6" }) {
gms::inet_address ip(s);
BOOST_CHECK(ip.addr().is_ipv6());
auto s2 = boost::lexical_cast<std::string>(ip);
gms::inet_address ip2(s);
BOOST_CHECK_EQUAL(ip2, ip);
}
}
}

View File

@@ -4835,7 +4835,9 @@ SEASTAR_TEST_CASE(sstable_run_based_compaction_test) {
| boost::adaptors::transformed([] (auto& sst) { return sst->generation(); }));
auto expected_sst = sstable_run.begin();
auto closed_sstables_tracker = sstable_run.begin();
auto replacer = [&] (auto old_sstables, auto new_sstables) {
auto replacer = [&] (sstables::compaction_completion_desc desc) {
auto old_sstables = std::move(desc.input_sstables);
auto new_sstables = std::move(desc.output_sstables);
BOOST_REQUIRE(expected_sst != sstable_run.end());
if (incremental_enabled) {
do_incremental_replace(std::move(old_sstables), std::move(new_sstables), expected_sst, closed_sstables_tracker);
@@ -5371,8 +5373,9 @@ SEASTAR_TEST_CASE(incremental_compaction_data_resurrection_test) {
cf->add_sstable_and_update_cache(expired_sst).get();
BOOST_REQUIRE(is_partition_dead(alpha));
auto replacer = [&] (std::vector<sstables::shared_sstable> old_sstables, std::vector<sstables::shared_sstable> new_sstables) {
auto replacer = [&] (sstables::compaction_completion_desc desc) {
auto old_sstables = std::move(desc.input_sstables);
auto new_sstables = std::move(desc.output_sstables);
// expired_sst is exhausted, and new sstable is written with mut 2.
BOOST_REQUIRE(old_sstables.size() == 1);
BOOST_REQUIRE(old_sstables.front() == expired_sst);

View File

@@ -223,7 +223,7 @@ public:
};
inline auto replacer_fn_no_op() {
return [](std::vector<shared_sstable> removed, std::vector<shared_sstable> added) -> void {};
return [](sstables::compaction_completion_desc desc) -> void {};
}
inline sstring get_test_dir(const sstring& name, const sstring& ks, const sstring& cf)

View File

@@ -1,4 +1,4 @@
FROM fedora:31
FROM docker.io/fedora:31
ADD ./install-dependencies.sh ./
ADD ./seastar/install-dependencies.sh ./seastar/
ADD ./tools/toolchain/system-auth ./

View File

@@ -1 +1 @@
docker.io/scylladb/scylla-toolchain:fedora-31-20200115
docker.io/scylladb/scylla-toolchain:fedora-31-branch-3.3-20200615

View File

@@ -254,6 +254,14 @@ cql_server::do_accepts(int which, bool keepalive, socket_address server_addr) {
--_connections;
return unadvertise_connection(conn);
}).handle_exception([] (std::exception_ptr ep) {
try {
std::rethrow_exception(ep);
} catch(std::system_error& serr) {
if (serr.code().category() == std::system_category() &&
serr.code().value() == EPIPE) { // expected if another side closes a connection
return;
}
} catch(...) {};
clogger.info("exception while processing connection: {}", ep);
});
});