Compare commits

...

42 Commits

Author SHA1 Message Date
Hagit Segev
000585522e release: prepare for 4.3.0 2021-01-10 10:04:40 +02:00
Evgeniy Naydanov
47b121130a scylla_raid_setup: try /dev/md[0-9] if no --raiddev provided
If scylla_raid_setup script called without --raiddev argument
then try to use any of /dev/md[0-9] devices instead of only
one /dev/md0.  Do it in this way because on Ubuntu 20.04
/dev/md0 used by OS already.

Closes #7628

(cherry picked from commit 587b909c5c)

Fixes #7627.
2021-01-03 16:46:16 +02:00
Takuya ASADA
15f55141ec scylla_raid_setup: use sysfs to detect existing RAID volume
We may not able to detect existing RAID volume by device file existance,
we should use sysfs instead to make sure it's running.

Fixes #7383

Closes #7399

(cherry picked from commit fc1c4f2261)
2021-01-03 16:45:36 +02:00
Avi Kivity
69fbeaa27e Update tools/jmx submodule
* tools/jmx c51906e...47b355e (1):
  > install.sh: set a valid WorkingDirectory for nonroot offline install

Ref scylladb/scylla-jmx#151
2020-12-31 14:11:51 +02:00
Benny Halevy
a366de2a63 compaction: compaction_writer: destroy shared_sstable after the sstable_writer
sstable_writer may depend on the sstable throughout its whole lifecycle.
If the sstable is freed before the sstable_writer we might hit use-after-free
as in the follwing case:
```
std::_Deque_iterator<sstables::compression::segmented_offsets::bucket, sstables::compression::segmented_offsets::bucket&, sstables::compression::segmented_offsets::bucket*>::operator+=(long) at /usr/include/c++/10/bits/stl_deque.h:240
 (inlined by) std::operator+(std::_Deque_iterator<sstables::compression::segmented_offsets::bucket, sstables::compression::segmented_offsets::bucket&, sstables::compression::segmented_offsets::bucket*> const&, long) at /usr/include/c++/10/bits/stl_deque.h:378
 (inlined by) std::_Deque_iterator<sstables::compression::segmented_offsets::bucket, sstables::compression::segmented_offsets::bucket&, sstables::compression::segmented_offsets::bucket*>::operator[](long) const at /usr/include/c++/10/bits/stl_deque.h:252
 (inlined by) std::deque<sstables::compression::segmented_offsets::bucket, std::allocator<sstables::compression::segmented_offsets::bucket> >::operator[](unsigned long) at /usr/include/c++/10/bits/stl_deque.h:1327
 (inlined by) sstables::compression::segmented_offsets::push_back(unsigned long, sstables::compression::segmented_offsets::state&) at ./sstables/compress.cc:214
sstables::compression::segmented_offsets::writer::push_back(unsigned long) at ./sstables/compress.hh:123
 (inlined by) compressed_file_data_sink_impl<crc32_utils, (compressed_checksum_mode)1>::put(seastar::temporary_buffer<char>) at ./sstables/compress.cc:519
seastar::output_stream<char>::put(seastar::temporary_buffer<char>) at table.cc:?
 (inlined by) seastar::output_stream<char>::put(seastar::temporary_buffer<char>) at ././seastar/include/seastar/core/iostream-impl.hh:432
seastar::output_stream<char>::flush() at table.cc:?
seastar::output_stream<char>::close() at table.cc:?
sstables::file_writer::close() at sstables.cc:?
sstables::mc::writer::~writer() at writer.cc:?
 (inlined by) sstables::mc::writer::~writer() at ./sstables/mx/writer.cc:790
sstables::mc::writer::~writer() at writer.cc:?
flat_mutation_reader::impl::consumer_adapter<stable_flattened_mutations_consumer<compact_for_compaction<sstables::compacting_sstable_writer, noop_compacted_fragments_consumer> > >::~consumer_adapter() at compaction.cc:?
 (inlined by) std::_Optional_payload_base<sstables::compaction_writer>::_M_destroy() at /usr/include/c++/10/optional:260
 (inlined by) std::_Optional_payload_base<sstables::compaction_writer>::_M_reset() at /usr/include/c++/10/optional:280
 (inlined by) std::_Optional_payload<sstables::compaction_writer, false, false, false>::~_Optional_payload() at /usr/include/c++/10/optional:401
 (inlined by) std::_Optional_base<sstables::compaction_writer, false, false>::~_Optional_base() at /usr/include/c++/10/optional:474
 (inlined by) std::optional<sstables::compaction_writer>::~optional() at /usr/include/c++/10/optional:659
 (inlined by) sstables::compacting_sstable_writer::~compacting_sstable_writer() at ./sstables/compaction.cc:229
 (inlined by) compact_mutation<(emit_only_live_rows)0, (compact_for_sstables)1, sstables::compacting_sstable_writer, noop_compacted_fragments_consumer>::~compact_mutation() at ././mutation_compactor.hh:468
 (inlined by) compact_for_compaction<sstables::compacting_sstable_writer, noop_compacted_fragments_consumer>::~compact_for_compaction() at ././mutation_compactor.hh:538
 (inlined by) std::default_delete<compact_for_compaction<sstables::compacting_sstable_writer, noop_compacted_fragments_consumer> >::operator()(compact_for_compaction<sstables::compacting_sstable_writer, noop_compacted_fragments_consumer>*) const at /usr/include/c++/10/bits/unique_ptr.h:85
 (inlined by) std::unique_ptr<compact_for_compaction<sstables::compacting_sstable_writer, noop_compacted_fragments_consumer>, std::default_delete<compact_for_compaction<sstables::compacting_sstable_writer, noop_compacted_fragments_consumer> > >::~unique_ptr() at /usr/include/c++/10/bits/unique_ptr.h:361
 (inlined by) stable_flattened_mutations_consumer<compact_for_compaction<sstables::compacting_sstable_writer, noop_compacted_fragments_consumer> >::~stable_flattened_mutations_consumer() at ././mutation_reader.hh:342
 (inlined by) flat_mutation_reader::impl::consumer_adapter<stable_flattened_mutations_consumer<compact_for_compaction<sstables::compacting_sstable_writer, noop_compacted_fragments_consumer> > >::~consumer_adapter() at ././flat_mutation_reader.hh:201
auto flat_mutation_reader::impl::consume_in_thread<stable_flattened_mutations_consumer<compact_for_compaction<sstables::compacting_sstable_writer, noop_compacted_fragments_consumer> >, flat_mutation_reader::no_filter>(stable_flattened_mutations_consumer<compact_for_compaction<sstables::compacting_sstable_writer, noop_compacted_fragments_consumer> >, flat_mutation_reader::no_filter, std::chrono::time_point<seastar::lowres_clock, std::chrono::duration<long, std::ratio<1l, 1000l> > >) at ././flat_mutation_reader.hh:272
 (inlined by) auto flat_mutation_reader::consume_in_thread<stable_flattened_mutations_consumer<compact_for_compaction<sstables::compacting_sstable_writer, noop_compacted_fragments_consumer> >, flat_mutation_reader::no_filter>(stable_flattened_mutations_consumer<compact_for_compaction<sstables::compacting_sstable_writer, noop_compacted_fragments_consumer> >, flat_mutation_reader::no_filter, std::chrono::time_point<seastar::lowres_clock, std::chrono::duration<long, std::ratio<1l, 1000l> > >) at ././flat_mutation_reader.hh:383
 (inlined by) auto flat_mutation_reader::consume_in_thread<stable_flattened_mutations_consumer<compact_for_compaction<sstables::compacting_sstable_writer, noop_compacted_fragments_consumer> > >(stable_flattened_mutations_consumer<compact_for_compaction<sstables::compacting_sstable_writer, noop_compacted_fragments_consumer> >, std::chrono::time_point<seastar::lowres_clock, std::chrono::duration<long, std::ratio<1l, 1000l> > >) at ././flat_mutation_reader.hh:389
 (inlined by) seastar::future<void> sstables::compaction::setup<noop_compacted_fragments_consumer>(noop_compacted_fragments_consumer)::{lambda(flat_mutation_reader)#1}::operator()(flat_mutation_reader)::{lambda()#1}::operator()() at ./sstables/compaction.cc:612
```

What happens here is that:

    compressed_file_data_sink_impl(output_stream<char> out, sstables::compression* cm, sstables::local_compression lc)
            : _out(std::move(out))
            , _compression_metadata(cm)
            , _offsets(_compression_metadata->offsets.get_writer())
            , _compression(lc)
            , _full_checksum(ChecksumType::init_checksum())

_compression_metadata points to a buffer held by the sstable object.
and _compression_metadata->offsets.get_writer returns a writer that keeps
a reference to the segmented_offsets in the sstables::compression
that is used in the ~writer -> close path.

Fixes #7821

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20201227145726.33319-1-bhalevy@scylladb.com>
(cherry picked from commit 8a745a0ee0)
2020-12-29 15:07:12 +02:00
Yaron Kaikov
5bd52e4dba release: prepare for 4.3.rc3 2020-12-17 14:27:38 +02:00
Gleb Natapov
8a3a69bc3e mutation_writer: pass exceptions through feed_writer
feed_writer() eats exception and transforms it into an end of stream
instead. Downstream validators hate when this happens.

Fixes #7482
Message-Id: <20201216090038.GB3244976@scylladb.com>

(cherry picked from commit 61520a33d6)
2020-12-16 17:19:39 +02:00
Aleksandr Bykov
50c01f7331 dist: scylla_util: fix aws_instance.ebs_disks method
aws_instance.ebs_disks() method should return ebs disk
instead of ephemeral

Signed-off-by: Aleksandr Bykov <alex.bykov@scylladb.com>

Closes #7780

(cherry picked from commit e74dc311e7)
2020-12-16 11:58:19 +02:00
Avi Kivity
ecfe466e7b dist: rpm: uninstall tuned when installing scylla-kernel-conf
tuned 2.11.0-9 and later writes to kerned.sched_wakeup_granularity_ns
and other sysctl tunables that we so laboriously tuned, dropping
performance by a factor of 5 (due to increased latency). Fix by
obsoleting tuned during install (in effect, we are a better tuned,
at least for us).

Not needed for .deb, since debian/ubunto do not install tuned by
default.

Fixes #7696

Closes #7776

(cherry picked from commit 615b8e8184)
2020-12-12 14:29:58 +02:00
Kamil Braun
69e5caadb6 cdc: produce postimage when inserting with no regular columns
When a row was inserted into a table with no regular columns, and no
such row existed in the first place, postimage would not be produced.
Fix this.

Fixes #7716.

Closes #7723

(cherry picked from commit 2da723b9c8)
2020-12-11 20:14:03 +02:00
Piotr Sarna
0ff3c0dcb5 Merge 'Cleanup CDC tests after CDC became GA' from Piotr Jastrzębski
Now that CDC is GA, it should be enabled in all the tests by default.
To achieve that the PR adds a special db::config::add_cdc_extension()
helper which is used in cql_test_envm to make sure CDC is usable in
all the tests that use cql_test_env.m As a result, cdc_tests can be
simplified.
Finally, some trailing whitespaces are removed from cdc_tests.

Tests: unit(dev)

Closes #7657

* github.com:scylladb/scylla:
  cdc: Remove trailing whitespaces from cdc_tests
  cdc: Remove mk_cdc_test_config from tests
  config: Add add_cdc_extension function for testing
  cdc: Add missing includes to cdc_extension.hh

(cherry picked from commit 5a9dc6a3cc)
2020-12-11 20:13:08 +02:00
Nadav Har'El
2148a194c2 alternator: fix broken Scan/Query paging with bytes keys
When an Alternator table has partition keys or sort keys of type "bytes"
(blobs), a Scan or Query which required paging used to fail - we used
an incorrect function to output LastEvaluatedKey (which tells the user
where to continue at the next page), and this incorrect function was
correct for strings and numbers - but NOT for bytes (for bytes, we
need to encode them as base-64).

This patch also includes two tests - for bytes partition key and
for bytes sort key - that failed before this patch and now pass.
The test test_fetch_from_system_tables also used to fail after a
Limit was added to it, because one of the tables it scans had a bytes
key. That test is also fixed by this patch.

Fixes #7768

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20201207175957.2585456-1-nyh@scylladb.com>
(cherry picked from commit 86779664f4)
2020-12-10 18:58:36 +02:00
Piotr Sarna
77ab7b1221 db: fix getting local ranges for size estimates table
When getting local ranges, an assumption is made that
if a range does not contain an end or when its end is a maximum token,
then it must contain a start. This assumption proven not true
during manual tests, so it's now fortified with an additional check.

Here's a gdb output for a set of local ranges which causes an assertion
failure when calling `get_local_ranges` on it:

(gdb) p ranges
$1 = std::vector of length 2, capacity 2 = {{_interval = {_start = std::optional<interval_bound<dht::token>> = {[contained value] = {_value = {_kind = dht::token_kind::before_all_keys,
            _data = 0}, _inclusive = false}}, _end = std::optional<interval_bound<dht::token>> [no contained value], _singular = false}}, {_interval = {
      _start = std::optional<interval_bound<dht::token>> [no contained value], _end = std::optional<interval_bound<dht::token>> = {[contained value] = {_value = {
            _kind = dht::token_kind::before_all_keys, _data = 0}, _inclusive = true}}, _singular = false}}}

Closes #7764

(cherry picked from commit 1cc4ed50c1)
2020-12-10 18:58:36 +02:00
Nadav Har'El
59bcd7f029 alternator, test: make test_fetch_from_system_tables faster
The test test_fetch_from_system_tables tests Alternator's system-table
feature by reading from all system tables. The intention was to confirm
we don't crash reading any of them - as they have different schemas and
can run into different problems (we had such problems in the initial
implementation). The intention was not to read *a lot* from each table -
we only make a single "Scan" call on each, to read one page of data.
However, the Scan call did not set a Limit, so the single page can get
pretty big.

This is not normally a problem, but in extremely slow runs - such as when
running the debug build on an extremely overcommitted test machine (e.g.,
issue #7706) reading this large page may take longer than our default
timeout. I'll send a separate patch for the timeout issue, but for now,
there is really no reason why we need to read a big page. It is good
enough to just read 50 rows (with Limit=50). This will still read all
the different types and make the test faster.

As an example, in the debug run on my laptop, this test spent 2.4
seconds to read the "compaction_history" table before this patch,
and only 0.1 seconds after this patch. 2.4 seconds is close to our
default timeout (10 seconds), 0.1 is very far.

Fixes #7706

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20201207075112.2548178-1-nyh@scylladb.com>
(cherry picked from commit 220d6dde17)
2020-12-10 18:58:36 +02:00
Calle Wilund
bc5008b165 alternator::streams: Use end-of-record info in get_records
Fixes #7496

Since cdc log now has an end-of-batch/record marker that tells
us explicitly that we've read the last row of a change, we
can use this instead of timestamp checks + limit extra to
ensure we have complete records.

Note that this does not try to fulfill user query limit
exact. To do this we would need to add a loop and potentially
re-query if quried rows are not enough. But that is a
separate exercise, and superbly suited for coroutines!

(cherry picked from commit c79108edbb)
2020-12-10 18:58:36 +02:00
Nadav Har'El
dd7e3d3eab alternator: fix query with both projection and filtering
We had a bug when a Query/Scan had both projection (ProjectionExpression
or AttributesToGet) and filtering (FilterExpression or Query/ScanFilter).
The problem was that projection left only the requested attributes, and
the filter might have needed - and not got - additional attributes.

The solution in this patch is to add the generated JSON item also
the extra attributes needed by filtering (if any), run the filter on
that, and only at the end remove the extra filtering attributes from
the item to be returned.

The two tests

 test_query_filter.py::test_query_filter_and_attributes_to_get
 test_filter_expression.py::test_filter_expression_and_projection_expression

Which failed before this patch now pass so we drop their "xfail" tag.

Fixes #6951.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
(cherry picked from commit 282742a469)
2020-12-10 18:58:36 +02:00
Lubos Kosco
3b617164dc scylla_util.py: Increase disk to ram ratio for GCP
Increase accepted disk-to-RAM ratio to 105 to accomodate even 7.5GB of
RAM for one NVMe log various reasons for not recommending the instance
type.

Fixes #7587

Closes #7600

(cherry picked from commit a0b1474bba)
2020-12-09 09:38:01 +02:00
Benny Halevy
bb99d7ced6 large_data_handler: disable deletion of large data entries
Currently we decide whether to delete large data entries
based on the overall sstable data_size, since the entries
themselves are typically much smaller than the whole sstable
(especially cells and rows), this causes overzealous
deletions (#7668) and inefficiency in the rows cache
due to the large number of range tombstones created.

Refs #7575

Test: sstable_3_x_test(dev)
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>

This patch is targetted for branch-4.3 or earlier.
In 4.4, the problem was fixed in #7669, but the fix
is out of scope for backporting.

Branch: 4.3
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20201203130018.1920271-1-bhalevy@scylladb.com>
2020-12-06 11:33:41 +02:00
Asias He
9877246251 repair: Make repair_writer a shared pointer
The future of the fiber that writes data into sstables inside
the repair_writer is stored in _writer_done like below:

class repair_writer {
   _writer_done[node_idx] =
      mutation_writer::distribute_reader_and_consume_on_shards().then([this] {
         ...
      }).handle_exception([this] {
         ...
      });
}

The fiber access repair_writer object in the error handling path. We
wait for the _writer_done to finish before we destroy repair_meta
object which contains the repair_writer object to avoid the fiber
accessing already freed repair_writer object.

To be safer, we can make repair_writer a shared pointer and take a
reference in the distribute_reader_and_consume_on_shards code path.

Fixes #7406

Closes #7430

(cherry picked from commit 289a08072a)
2020-11-29 12:03:18 +02:00
Takuya ASADA
d966e2d500 install.sh: set PATH for relocatable CLI tools in python thunk
We currently set PATH for relocatable CLI tools in scylla_util.run() and
scylla_util.out(), but it doesn't work for perftune.py, since it's not part of
Scylla, does not use scylla_util module.
We can set PATH in python thunk instead, it can set PATH for all python scripts.

Fixes #7350

(cherry picked from commit 5867af4edd)
2020-11-29 11:52:24 +02:00
Takuya ASADA
81831d93d2 dist/redhat: packaging dependencies.conf as normal file, not ghost
When we introduced dependencies.conf, we mistakenly added it on rpm as %ghost,
but it should be normal file, should be installed normally on package installation.

Fixes #7703

Closes #7704

(cherry picked from commit ba4d54efa3)
2020-11-29 11:40:15 +02:00
Lubos Kosco
542a7d28a3 scylla_util.py: fix metadata gcp call for disks to get details
disk parsing expects output from recursive listing of GCP
metadata REST call, the method used to do it by default,
but now it requires a boolean flag to run in recursive mode

Fixes #7684

Closes #7685

(cherry picked from commit 4d0587ed11)
2020-11-29 11:39:23 +02:00
Takuya ASADA
1310e6cb48 install.sh: apply sysctl.d files on non-packaging installation
We don't apply sysctl.d files on non-packaging installation, apply them
just like rpm/deb taking care of that.

Fixes #7702

Closes #7705

(cherry picked from commit 5f81f97773)
2020-11-29 11:35:24 +02:00
Avi Kivity
99a6ecb25d dist: sysctl: configure more inotify instances
Since f3bcd4d205 ("Merge 'Support SSL Certificate Hot
Reloading' from Calle"), we reload certificates as they are
modified on disk. This uses inotify, which is limited by a
sysctl fs.inotify.max_user_instances, with a default of 128.

This is enough for 64 shards only, if both rpc and cql are
encrypted; above that startup fails.

Increase to 1200, which is enough for 6 instances * 200 shards.

Fixes #7700.

Closes #7701

(cherry picked from commit 390e07d591)
2020-11-29 10:54:27 +02:00
Hagit Segev
bc922a743f release: prepare for 4.3.rc2 2020-11-23 14:20:17 +02:00
Piotr Sarna
1ec4f50e3c db,view: remove duplicate entries from the list of target endpoints
If a list of target endpoints for sending view updates contains
duplicates, it results in benign (but annoying) broken promise
errors happening due to duplicated write response handlers being
instantiated for a single endpoint.
In order to avoid such errors, target remote endpoints are deduplicated
from the list of pending endpoints.
A similar issue (#5459) solved the case for duplicated local endpoints,
but that didn't solve the general case.

Fixes #7572

Closes #7641

(cherry picked from commit c0d72b4491)
2020-11-22 14:15:53 +02:00
Raphael S. Carvalho
9c7ff01c5d compaction: Make sure a partition is filtered out only by producer
If interposer consumer is enabled, partition filtering will be done by the
consumer instead, but that's not possible because only the producer is able
to skip to the next partition if the current one is filtered out, so scylla
crashes when that happens with a bad function call in queue_reader.
This is a regression which started here: 55a8b6e3c9

To fix this problem, let's make sure that partition filtering will only
happen on the producer side.

Fixes #7590.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20201111221513.312283-1-raphaelsc@scylladb.com>
(cherry picked from commit 13fa2bec4c)
2020-11-19 14:07:54 +02:00
Nadav Har'El
da29b65e04 Merge 'Mark CDC as GA' from Piotr Jastrzębski
CDC is ready to be a non-experimental feature so remove the experimental flag for it.
Also, guard Alternator Streams with their own experimental flag. Previously, they were using CDC experimental flag as they depend on CDC.

Tests: unit(dev)

Closes #7539

* github.com:scylladb/scylla:
  alternator: guard streams with an experimental flag
  Mark CDC as GA
  cdc: Make it possible for CDC generation creation to fail

(cherry picked from commit 78649c2322)
2020-11-17 14:12:42 +02:00
Kamil Braun
8c3e8350d6 cdc: ensure that CDC generation write is flushed to commitlog before ack
When a node bootstraps or upgrades from a pre-CDC version, it creates a
new CDC generation, writes it to a distributed table
(system_distributed.cdc_generation_descriptions), and starts gossiping
its timestamp. When other nodes see the timestamp being gossiped, they
retrieve the generation from the table.

The bootstrapping/upgrading node therefore assumes that the generation
is made durable and other nodes will be able to retrieve it from the
table. This assumption could be invalidated if periodic commitlog mode
was used: replicas would acknowledge the write and then immediately
crash, losing the write if they were unlucky (i.e. commitlog wasn't
synced to disk before the write was acknowledged).

This commit enforces all writes to the generations table to be
synced to commitlog immediately. It does not matter for performance as
these writes are very rare.

Fixes https://github.com/scylladb/scylla/issues/7610.

Closes #7619

(cherry picked from commit d74f303406)
2020-11-17 14:03:31 +02:00
Lubos Kosco
708588bf8b scylla_util.py: properly parse GCP instances without size
fixes #7577

Closes #7592

(cherry picked from commit 5c488b6e9a)
2020-11-16 14:02:36 +02:00
Botond Dénes
b2271800a5 mutation_reader: queue_reader: don't set EOS flag on abort
If the consumer happens to check the EOS flag before it hits the
exception injected by the abort (by calling fill_buffer()), they can
think the stream ended normally and expect it to be valid. However this
is not guaranteed when the reader is aborted. To avoid consumers falsely
thinking the stream ended normally, don't set the EOS flag on abort at
all.

Additionally make sure the producer is aborted too on abort. In theory
this is not needed as they are the one initiating the abort, but better
to be safe then sorry.

Fixes: #7411
Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <20201102100732.35132-1-bdenes@scylladb.com>
(cherry picked from commit f5323b29d9)
2020-11-15 10:50:22 +02:00
Piotr Dulikowski
209c3512e7 main.cc: wait for hints manager to start
In main.cc, we spawn a future which starts the hints manager, but we
don't wait for it to complete. This can have the following consequences:

- The hints manager does some asynchronous operations during startup,
  so it can take some time to start. If it is started after we start
  handling requests, and we admit some requests which would result in
  hints being generated, those hints will be dropped instead because we
  check if hints manager is started before writing them.
- Initialization of hints manager may fail, and Scylla won't be stopped
  because of it (e.g. we don't have permissions to create hints
  directories). The consequence of this is that hints manager won't be
  started, and hints will be dropped instead of being written. This may
  affect both regular hints manager, and the view hints manager.

This commit causes us to wait until hints manager start and see if there
were any errors during initialization.

Fixes #7598

(cherry picked from commit 1e03924509)
2020-11-13 18:53:50 +02:00
Bentsi Magidovich
4896ce0fd4 scylla_util.py: fix exception handling in curl
Retry mechanism didn't work when URLError happend. For example:

  urllib.error.URLError: <urlopen error [Errno 101] Network is unreachable>

Let's catch URLError instead of HTTP since URLError is a base exception
for all exceptions in the urllib module.

Fixes: #7569

Closes #7567

(cherry picked from commit 956b97b2a8)
2020-11-11 14:18:05 +02:00
Avi Kivity
9d84b1f13d Merge 'improvements for GCE image' from Bentsi
when logging in to the GCE instance that is created from the GCE image it takes 10 seconds to understand that we are not running on AWS. Also, some unnecessary debug logging messages are printed:
```
bentsi@bentsi-G3-3590:~/devel/scylladb$ ssh -i ~/.ssh/scylla-qa-ec2 bentsi@35.196.8.86
Warning: Permanently added '35.196.8.86' (ECDSA) to the list of known hosts.
Last login: Sun Nov  1 22:14:57 2020 from 108.128.125.4

   _____            _ _       _____  ____
  / ____|          | | |     |  __ \|  _ \
 | (___   ___ _   _| | | __ _| |  | | |_) |
  \___ \ / __| | | | | |/ _` | |  | |  _ <
  ____) | (__| |_| | | | (_| | |__| | |_) |
 |_____/ \___|\__, |_|_|\__,_|_____/|____/
               __/ |
              |___/

Version:
       666.development-0.20201101.6be9f4938
Nodetool:
	nodetool help
CQL Shell:
	cqlsh
More documentation available at:
	http://www.scylladb.com/doc/
By default, Scylla sends certain information about this node to a data collection server. For information, see http://www.scylladb.com/privacy/

WARNING:root:Failed to grab http://169.254.169.254/latest/...
WARNING:root:Failed to grab http://169.254.169.254/latest/...
    Initial image configuration failed!

To see status, run
 'systemctl status scylla-image-setup'

[bentsi@artifacts-gce-image-jenkins-db-node-aa57409d-0-1 ~]$

```
this PR fixes this

Closes #7523

* github.com:scylladb/scylla:
  scylla_util.py: remove unnecessary logging
  scylla_util.py: make is_aws_instance faster
  scylla_util.py: added ability to control sleep time between retries in curl()

(cherry picked from commit 7a3376907e)
2020-11-11 14:17:59 +02:00
Yaron Kaikov
a8e372bf94 release: prepare for 4.3.rc1 2020-11-09 23:08:50 +02:00
Amnon Heiman
17e5ac9ab1 scyllatop/livedata.py: Safe iteration over metrics
This patch change the code that iterates over the metrics to use a copy
of the metrics names to make it safe to remove the metrics from the
metrics object.

Fixes #7488

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
(cherry picked from commit 52db99f25f)
2020-11-08 19:16:03 +02:00
Calle Wilund
d1d968c6e9 partition_version: Change range_tombstones() to return chunked_vector
Refs #7364

The number of tombstones can be large. As a stopgap measure to
just returning a source range (with keepalive), we can at least
alleviate the problem by using a chunked vector.

Closes #7433

(cherry picked from commit 4b65d67a1a)
2020-11-08 14:38:18 +02:00
Tomasz Grabiec
e186f66bfe sstables: ka/la: Fix abort when next_partition() is called with certain reader state
Cleanup compaction is using consume_pausable_in_thread() to skip over
disowned partitions, which uses flat_mutation_reader::next_partition().

The implementation of next_partition() for the sstable reader has a
bug which may cause the following assertion failure:

  scylla: sstables/mp_row_consumer.hh:422: row_consumer::proceed sstables::mp_row_consumer_k_l::flush(): Assertion `!_ready' failed.

This happens when the sstable reader's buffer gets full when we reach
the partition end. The last fragment of the partition won't be pushed
into the buffer but will stay in the _ready variable. When
next_partition() is called in this state, _ready will not be cleared
and the fragment will be carried over to the next partition. This will
cause assertion failure when the reader attempts to emit the first
fragment of the next partition.

The fix is to clear _ready when entering a partition, just like we
clear _range_tombstones there.

Fixes #7553.
Message-Id: <1604534702-12777-1-git-send-email-tgrabiec@scylladb.com>

(cherry picked from commit fb9b5cae05)
2020-11-08 14:25:27 +02:00
Piotr Sarna
78a39e8364 schema_tables: fix fixing old secondary index schemas
Old secondary index schemas did not have their idx_token column
marked as computed, and there already exists code which updates
them. Unfortunately, the fix itself contains an error and doesn't
fire if computed columns are not yet supported by the whole cluster,
which is a very common situation during upgrades.

Fixes #7515

Closes #7516

(cherry picked from commit b66c285f94)
2020-11-05 17:52:44 +02:00
Calle Wilund
bbef05ae3c cdc: Add an "end-of-record" column to
Fixes #7435

Adds an "eor" (end-of-record) column to cdc log. This is non-null only on
last-in-timestamp group rows, i.e. end of a singular source "event".

A client can use this as a shortcut to knowing whether or not he has a
full cdc "record" for a given source mutation (single row change).

Closes #7436

(cherry picked from commit 46ea8c9b8b)
2020-10-28 12:47:17 +02:00
Yaron Kaikov
6f324cb732 release: prepare for 4.3.rc0
Closes #7484
2020-10-26 14:43:13 +02:00
Avi Kivity
239499a35a Revert "Revert "config: Do not enable repair based node operations by default""
This reverts commit 71d0d58f8c. We still
do not have confirmation that it works reliably.
2020-10-26 14:21:26 +02:00
55 changed files with 579 additions and 253 deletions

View File

@@ -1,7 +1,7 @@
#!/bin/sh
PRODUCT=scylla
VERSION=666.development
VERSION=4.3.0
if test -f version
then

View File

@@ -1881,7 +1881,8 @@ static std::string get_item_type_string(const rjson::value& v) {
// calculate_attrs_to_get() takes either AttributesToGet or
// ProjectionExpression parameters (having both is *not* allowed),
// and returns the list of cells we need to read.
// and returns the list of cells we need to read, or an empty set when
// *all* attributes are to be returned.
// In our current implementation, only top-level attributes are stored
// as cells, and nested documents are stored serialized as JSON.
// So this function currently returns only the the top-level attributes
@@ -2571,6 +2572,10 @@ public:
std::unordered_set<std::string>& used_attribute_values);
bool check(const rjson::value& item) const;
bool filters_on(std::string_view attribute) const;
// for_filters_on() runs the given function on the attributes that the
// filter works on. It may run for the same attribute more than once if
// used more than once in the filter.
void for_filters_on(const noncopyable_function<void(std::string_view)>& func) const;
operator bool() const { return bool(_imp); }
};
@@ -2651,10 +2656,26 @@ bool filter::filters_on(std::string_view attribute) const {
}, *_imp);
}
void filter::for_filters_on(const noncopyable_function<void(std::string_view)>& func) const {
if (_imp) {
std::visit(overloaded_functor {
[&] (const conditions_filter& f) -> void {
for (auto it = f.conditions.MemberBegin(); it != f.conditions.MemberEnd(); ++it) {
func(rjson::to_string_view(it->name));
}
},
[&] (const expression_filter& f) -> void {
return for_condition_expression_on(f.expression, func);
}
}, *_imp);
}
}
class describe_items_visitor {
typedef std::vector<const column_definition*> columns_t;
const columns_t& _columns;
const std::unordered_set<std::string>& _attrs_to_get;
std::unordered_set<std::string> _extra_filter_attrs;
const filter& _filter;
typename columns_t::const_iterator _column_it;
rjson::value _item;
@@ -2670,7 +2691,20 @@ public:
, _item(rjson::empty_object())
, _items(rjson::empty_array())
, _scanned_count(0)
{ }
{
// _filter.check() may need additional attributes not listed in
// _attrs_to_get (i.e., not requested as part of the output).
// We list those in _extra_filter_attrs. We will include them in
// the JSON but take them out before finally returning the JSON.
if (!_attrs_to_get.empty()) {
_filter.for_filters_on([&] (std::string_view attr) {
std::string a(attr); // no heterogenous maps searches :-(
if (!_attrs_to_get.contains(a)) {
_extra_filter_attrs.emplace(std::move(a));
}
});
}
}
void start_row() {
_column_it = _columns.begin();
@@ -2684,7 +2718,7 @@ public:
result_bytes_view->with_linearized([this] (bytes_view bv) {
std::string column_name = (*_column_it)->name_as_text();
if (column_name != executor::ATTRS_COLUMN_NAME) {
if (_attrs_to_get.empty() || _attrs_to_get.contains(column_name)) {
if (_attrs_to_get.empty() || _attrs_to_get.contains(column_name) || _extra_filter_attrs.contains(column_name)) {
if (!_item.HasMember(column_name.c_str())) {
rjson::set_with_string_name(_item, column_name, rjson::empty_object());
}
@@ -2696,7 +2730,7 @@ public:
auto keys_and_values = value_cast<map_type_impl::native_type>(deserialized);
for (auto entry : keys_and_values) {
std::string attr_name = value_cast<sstring>(entry.first);
if (_attrs_to_get.empty() || _attrs_to_get.contains(attr_name)) {
if (_attrs_to_get.empty() || _attrs_to_get.contains(attr_name) || _extra_filter_attrs.contains(attr_name)) {
bytes value = value_cast<bytes>(entry.second);
rjson::set_with_string_name(_item, attr_name, deserialize_item(value));
}
@@ -2708,6 +2742,11 @@ public:
void end_row() {
if (_filter.check(_item)) {
// Remove the extra attributes _extra_filter_attrs which we had
// to add just for the filter, and not requested to be returned:
for (const auto& attr : _extra_filter_attrs) {
rjson::remove_member(_item, attr);
}
rjson::push_back(_items, std::move(_item));
}
_item = rjson::empty_object();
@@ -2742,7 +2781,7 @@ static rjson::value encode_paging_state(const schema& schema, const service::pag
for (const column_definition& cdef : schema.partition_key_columns()) {
rjson::set_with_string_name(last_evaluated_key, std::string_view(cdef.name_as_text()), rjson::empty_object());
rjson::value& key_entry = last_evaluated_key[cdef.name_as_text()];
rjson::set_with_string_name(key_entry, type_to_string(cdef.type), rjson::parse(to_json_string(*cdef.type, *exploded_pk_it)));
rjson::set_with_string_name(key_entry, type_to_string(cdef.type), json_key_column_value(*exploded_pk_it, cdef));
++exploded_pk_it;
}
auto ck = paging_state.get_clustering_key();
@@ -2752,7 +2791,7 @@ static rjson::value encode_paging_state(const schema& schema, const service::pag
for (const column_definition& cdef : schema.clustering_key_columns()) {
rjson::set_with_string_name(last_evaluated_key, std::string_view(cdef.name_as_text()), rjson::empty_object());
rjson::value& key_entry = last_evaluated_key[cdef.name_as_text()];
rjson::set_with_string_name(key_entry, type_to_string(cdef.type), rjson::parse(to_json_string(*cdef.type, *exploded_ck_it)));
rjson::set_with_string_name(key_entry, type_to_string(cdef.type), json_key_column_value(*exploded_ck_it, cdef));
++exploded_ck_it;
}
}

View File

@@ -348,6 +348,39 @@ bool condition_expression_on(const parsed::condition_expression& ce, std::string
}, ce._expression);
}
// for_condition_expression_on() runs a given function over all the attributes
// mentioned in the expression. If the same attribute is mentioned more than
// once, the function will be called more than once for the same attribute.
static void for_value_on(const parsed::value& v, const noncopyable_function<void(std::string_view)>& func) {
std::visit(overloaded_functor {
[&] (const parsed::constant& c) { },
[&] (const parsed::value::function_call& f) {
for (const parsed::value& value : f._parameters) {
for_value_on(value, func);
}
},
[&] (const parsed::path& p) {
func(p.root());
}
}, v._value);
}
void for_condition_expression_on(const parsed::condition_expression& ce, const noncopyable_function<void(std::string_view)>& func) {
std::visit(overloaded_functor {
[&] (const parsed::primitive_condition& cond) {
for (const parsed::value& value : cond._values) {
for_value_on(value, func);
}
},
[&] (const parsed::condition_expression::condition_list& list) {
for (const parsed::condition_expression& cond : list.conditions) {
for_condition_expression_on(cond, func);
}
}
}, ce._expression);
}
// The following calculate_value() functions calculate, or evaluate, a parsed
// expression. The parsed expression is assumed to have been "resolved", with
// the matching resolve_* function.

View File

@@ -27,6 +27,8 @@
#include <unordered_set>
#include <string_view>
#include <seastar/util/noncopyable_function.hh>
#include "expressions_types.hh"
#include "utils/rjson.hh"
@@ -59,6 +61,11 @@ void validate_value(const rjson::value& v, const char* caller);
bool condition_expression_on(const parsed::condition_expression& ce, std::string_view attribute);
// for_condition_expression_on() runs the given function on the attributes
// that the expression uses. It may run for the same attribute more than once
// if the same attribute is used more than once in the expression.
void for_condition_expression_on(const parsed::condition_expression& ce, const noncopyable_function<void(std::string_view)>& func);
// calculate_value() behaves slightly different (especially, different
// functions supported) when used in different types of expressions, as
// enumerated in this enum:

View File

@@ -849,6 +849,7 @@ future<executor::request_return_type> executor::get_records(client_state& client
static const bytes timestamp_column_name = cdc::log_meta_column_name_bytes("time");
static const bytes op_column_name = cdc::log_meta_column_name_bytes("operation");
static const bytes eor_column_name = cdc::log_meta_column_name_bytes("end_of_batch");
auto key_names = boost::copy_range<std::unordered_set<std::string>>(
boost::range::join(std::move(base->partition_key_columns()), std::move(base->clustering_key_columns()))
@@ -872,7 +873,7 @@ future<executor::request_return_type> executor::get_records(client_state& client
std::transform(cks.begin(), cks.end(), std::back_inserter(columns), [](auto& c) { return &c; });
auto regular_columns = boost::copy_range<query::column_id_vector>(schema->regular_columns()
| boost::adaptors::filtered([](const column_definition& cdef) { return cdef.name() == op_column_name || !cdc::is_cdc_metacolumn_name(cdef.name_as_text()); })
| boost::adaptors::filtered([](const column_definition& cdef) { return cdef.name() == op_column_name || cdef.name() == eor_column_name || !cdc::is_cdc_metacolumn_name(cdef.name_as_text()); })
| boost::adaptors::transformed([&] (const column_definition& cdef) { columns.emplace_back(&cdef); return cdef.id; })
);
@@ -905,6 +906,11 @@ future<executor::request_return_type> executor::get_records(client_state& client
return cdef->name->name() == timestamp_column_name;
})
);
auto eor_index = std::distance(metadata.get_names().begin(),
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [](const lw_shared_ptr<cql3::column_specification>& cdef) {
return cdef->name->name() == eor_column_name;
})
);
std::optional<utils::UUID> timestamp;
auto dynamodb = rjson::empty_object();
@@ -930,15 +936,7 @@ future<executor::request_return_type> executor::get_records(client_state& client
for (auto& row : result_set->rows()) {
auto op = static_cast<cdc::operation>(value_cast<op_utype>(data_type_for<op_utype>()->deserialize(*row[op_index])));
auto ts = value_cast<utils::UUID>(data_type_for<utils::UUID>()->deserialize(*row[ts_index]));
if (timestamp && timestamp != ts) {
maybe_add_record();
if (limit == 0) {
break;
}
}
timestamp = ts;
auto eor = row[eor_index].has_value() ? value_cast<bool>(boolean_type->deserialize(*row[eor_index])) : false;
if (!dynamodb.HasMember("Keys")) {
auto keys = rjson::empty_object();
@@ -991,9 +989,13 @@ future<executor::request_return_type> executor::get_records(client_state& client
rjson::set(record, "eventName", "REMOVE");
break;
}
}
if (limit > 0 && timestamp) {
maybe_add_record();
if (eor) {
maybe_add_record();
timestamp = ts;
if (limit == 0) {
break;
}
}
}
auto ret = rjson::empty_object();
@@ -1047,6 +1049,9 @@ void executor::add_stream_options(const rjson::value& stream_specification, sche
if (!db.features().cluster_supports_cdc()) {
throw api_error::validation("StreamSpecification: streams (CDC) feature not enabled in cluster.");
}
if (!db.features().cluster_supports_alternator_streams()) {
throw api_error::validation("StreamSpecification: alternator streams feature not enabled in cluster.");
}
cdc::options opts;
opts.enabled(true);

View File

@@ -20,10 +20,16 @@
#pragma once
#include <map>
#include <seastar/core/sstring.hh>
#include "bytes.hh"
#include "serializer.hh"
#include "db/extensions.hh"
#include "cdc/cdc_options.hh"
#include "schema.hh"
#include "serializer_impl.hh"
namespace cdc {

View File

@@ -154,7 +154,7 @@ bool should_propose_first_generation(const gms::inet_address& me, const gms::gos
future<db_clock::time_point> get_local_streams_timestamp();
/* Generate a new set of CDC streams and insert it into the distributed cdc_generation_descriptions table.
* Returns the timestamp of this new generation.
* Returns the timestamp of this new generation
*
* Should be called when starting the node for the first time (i.e., joining the ring).
*

View File

@@ -519,6 +519,7 @@ static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID>
b.with_column(log_meta_column_name_bytes("batch_seq_no"), int32_type, column_kind::clustering_key);
b.with_column(log_meta_column_name_bytes("operation"), data_type_for<operation_native_type>());
b.with_column(log_meta_column_name_bytes("ttl"), long_type);
b.with_column(log_meta_column_name_bytes("end_of_batch"), boolean_type);
b.set_caching_options(caching_options::get_disabled_caching_options());
auto add_columns = [&] (const schema::const_iterator_range_type& columns, bool is_data_col = false) {
for (const auto& column : columns) {
@@ -880,14 +881,26 @@ public:
return _base_schema;
}
clustering_key create_ck(int batch) const {
return clustering_key::from_exploded(_log_schema, { _tuuid, int32_type->decompose(batch) });
}
// Creates a new clustering row in the mutation, assigning it the next `cdc$batch_seq_no`.
// The numbering of batch sequence numbers starts from 0.
clustering_key allocate_new_log_row() {
auto log_ck = clustering_key::from_exploded(_log_schema, { _tuuid, int32_type->decompose(_batch_no++) });
auto log_ck = create_ck(_batch_no++);
set_key_columns(log_ck, _base_schema.partition_key_columns(), _base_pk);
return log_ck;
}
bool has_rows() const {
return _batch_no != 0;
}
clustering_key last_row_key() const {
return create_ck(_batch_no - 1);
}
// A common pattern is to allocate a row and then immediately set its `cdc$operation` column.
clustering_key allocate_new_log_row(operation op) {
auto log_ck = allocate_new_log_row();
@@ -944,6 +957,11 @@ public:
_log_mut.set_cell(log_ck, log_cdef, atomic_cell::make_live(*log_cdef.type, _ts, deleted_elements, _ttl));
}
void end_record() {
if (has_rows()) {
_log_mut.set_cell(last_row_key(), log_meta_column_name_bytes("end_of_batch"), data_value(true), _ts, _ttl);
}
}
private:
void set_key_columns(const clustering_key& log_ck, schema::const_iterator_range_type columns, const std::vector<bytes>& key) {
size_t pos = 0;
@@ -1272,6 +1290,13 @@ struct process_change_visitor {
_clustering_row_states, _generate_delta_values);
visit_row_cells(v);
if (_enable_updating_state) {
// #7716: if there are no regular columns, our visitor would not have visited any cells,
// hence it would not have created a row_state for this row. In effect, postimage wouldn't be produced.
// Ensure that the row state exists.
_clustering_row_states.try_emplace(ckey);
}
_builder.set_operation(log_ck, v._cdc_op);
_builder.set_ttl(log_ck, v._ttl_column);
}
@@ -1519,6 +1544,11 @@ public:
cdc::inspect_mutation(m, v);
}
void end_record() override {
assert(_builder);
_builder->end_record();
}
// Takes and returns generated cdc log mutations and associated statistics about parts touched during transformer's lifetime.
// The `transformer` object on which this method was called on should not be used anymore.
std::tuple<std::vector<mutation>, stats::part_type_set> finish() && {

View File

@@ -684,6 +684,8 @@ void process_changes_with_splitting(const mutation& base_mutation, change_proces
processor.produce_postimage(&ck);
}
}
processor.end_record();
}
}
@@ -731,6 +733,8 @@ void process_changes_without_splitting(const mutation& base_mutation, change_pro
processor.produce_postimage(&cr.key());
}
}
processor.end_record();
}
} // namespace cdc

View File

@@ -77,6 +77,10 @@ public:
// both columns have different timestamp or TTL set.
// m - the small mutation to be converted into CDC log rows.
virtual void process_change(const mutation& m) = 0;
// Tells processor we have reached end of record - last part
// of a given timestamp batch
virtual void end_record() = 0;
};
bool should_split(const mutation& base_mutation);

View File

@@ -31,6 +31,7 @@
#include <seastar/core/print.hh>
#include <seastar/util/log.hh>
#include "cdc/cdc_extension.hh"
#include "config.hh"
#include "extensions.hh"
#include "log.hh"
@@ -694,7 +695,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, replace_address(this, "replace_address", value_status::Used, "", "The listen_address or broadcast_address of the dead node to replace. Same as -Dcassandra.replace_address.")
, replace_address_first_boot(this, "replace_address_first_boot", value_status::Used, "", "Like replace_address option, but if the node has been bootstrapped successfully it will be ignored. Same as -Dcassandra.replace_address_first_boot.")
, override_decommission(this, "override_decommission", value_status::Used, false, "Set true to force a decommissioned node to join the cluster")
, enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, true, "Set true to use enable repair based node operations instead of streaming based")
, enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, false, "Set true to use enable repair based node operations instead of streaming based")
, ring_delay_ms(this, "ring_delay_ms", value_status::Used, 30 * 1000, "Time a node waits to hear from other nodes before joining the ring in milliseconds. Same as -Dcassandra.ring_delay_ms in cassandra.")
, shadow_round_ms(this, "shadow_round_ms", value_status::Used, 300 * 1000, "The maximum gossip shadow round time. Can be used to reduce the gossip feature check time during node boot up.")
, fd_max_interval_ms(this, "fd_max_interval_ms", value_status::Used, 2 * 1000, "The maximum failure_detector interval time in milliseconds. Interval larger than the maximum will be ignored. Larger cluster may need to increase the default.")
@@ -792,6 +793,10 @@ db::config::config()
db::config::~config()
{}
void db::config::add_cdc_extension() {
_extensions->add_schema_extension<cdc::cdc_extension>(cdc::cdc_extension::NAME);
}
void db::config::setup_directories() {
maybe_in_workdir(commitlog_directory, "commitlog");
maybe_in_workdir(data_file_directories, "data");
@@ -874,7 +879,7 @@ db::fs::path db::config::get_conf_sub(db::fs::path sub) {
}
bool db::config::check_experimental(experimental_features_t::feature f) const {
if (experimental() && f != experimental_features_t::UNUSED) {
if (experimental() && f != experimental_features_t::UNUSED && f != experimental_features_t::UNUSED_CDC) {
return true;
}
const auto& optval = experimental_features();
@@ -928,11 +933,13 @@ std::unordered_map<sstring, db::experimental_features_t::feature> db::experiment
// https://github.com/scylladb/scylla/pull/5369#discussion_r353614807
// Lightweight transactions are no longer experimental. Map them
// to UNUSED switch for a while, then remove altogether.
return {{"lwt", UNUSED}, {"udf", UDF}, {"cdc", CDC}};
// Change Data Capture is no longer experimental. Map it
// to UNUSED_CDC switch for a while, then remove altogether.
return {{"lwt", UNUSED}, {"udf", UDF}, {"cdc", UNUSED_CDC}, {"alternator-streams", ALTERNATOR_STREAMS}};
}
std::vector<enum_option<db::experimental_features_t>> db::experimental_features_t::all() {
return {UDF, CDC};
return {UDF, ALTERNATOR_STREAMS};
}
template struct utils::config_file::named_value<seastar::log_level>;

View File

@@ -81,7 +81,7 @@ namespace db {
/// Enumeration of all valid values for the `experimental` config entry.
struct experimental_features_t {
enum feature { UNUSED, UDF, CDC };
enum feature { UNUSED, UDF, UNUSED_CDC, ALTERNATOR_STREAMS };
static std::unordered_map<sstring, feature> map(); // See enum_option.
static std::vector<enum_option<experimental_features_t>> all();
};
@@ -92,6 +92,9 @@ public:
config(std::shared_ptr<db::extensions>);
~config();
// For testing only
void add_cdc_extension();
/// True iff the feature is enabled.
bool check_experimental(experimental_features_t::feature f) const;

View File

@@ -111,27 +111,12 @@ public:
return make_ready_future<>();
}
future<> maybe_delete_large_data_entries(const schema& s, sstring filename, uint64_t data_size) {
future<> maybe_delete_large_data_entries(const schema& /*s*/, sstring /*filename*/, uint64_t /*data_size*/) {
assert(running());
future<> large_partitions = make_ready_future<>();
if (__builtin_expect(data_size > _partition_threshold_bytes, false)) {
large_partitions = with_sem([&s, filename, this] () mutable {
return delete_large_data_entries(s, std::move(filename), db::system_keyspace::LARGE_PARTITIONS);
});
}
future<> large_rows = make_ready_future<>();
if (__builtin_expect(data_size > _row_threshold_bytes, false)) {
large_rows = with_sem([&s, filename, this] () mutable {
return delete_large_data_entries(s, std::move(filename), db::system_keyspace::LARGE_ROWS);
});
}
future<> large_cells = make_ready_future<>();
if (__builtin_expect(data_size > _cell_threshold_bytes, false)) {
large_cells = with_sem([&s, filename, this] () mutable {
return delete_large_data_entries(s, std::move(filename), db::system_keyspace::LARGE_CELLS);
});
}
return when_all(std::move(large_partitions), std::move(large_rows), std::move(large_cells)).discard_result();
// Deletion of large data entries is disabled due to #7668
// They will evetually expire based on the 30 days TTL.
return make_ready_future<>();
}
const large_data_handler::stats& stats() const { return _stats; }

View File

@@ -58,6 +58,7 @@
#include "schema_registry.hh"
#include "mutation_query.hh"
#include "system_keyspace.hh"
#include "system_distributed_keyspace.hh"
#include "cql3/cql3_type.hh"
#include "cql3/functions/functions.hh"
#include "cql3/util.hh"
@@ -104,6 +105,11 @@ using namespace std::chrono_literals;
static logging::logger diff_logger("schema_diff");
static bool is_extra_durable(const sstring& ks_name, const sstring& cf_name) {
return (is_system_keyspace(ks_name) && db::system_keyspace::is_extra_durable(cf_name))
|| (ks_name == db::system_distributed_keyspace::NAME && db::system_distributed_keyspace::is_extra_durable(cf_name));
}
/** system.schema_* tables used to store keyspace/table/type attributes prior to C* 3.0 */
namespace db {
@@ -2499,7 +2505,7 @@ schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations
builder.with_sharder(smp::count, ctxt.murmur3_partitioner_ignore_msb_bits());
}
if (is_system_keyspace(ks_name) && is_extra_durable(cf_name)) {
if (is_extra_durable(ks_name, cf_name)) {
builder.set_wait_for_sync_to_commitlog(true);
}
@@ -3035,10 +3041,6 @@ future<> maybe_update_legacy_secondary_index_mv_schema(service::migration_manage
// format, where "token" is not marked as computed. Once we're sure that all indexes have their
// columns marked as computed (because they were either created on a node that supports computed
// columns or were fixed by this utility function), it's safe to remove this function altogether.
if (!db.features().cluster_supports_computed_columns()) {
return make_ready_future<>();
}
if (v->clustering_key_size() == 0) {
return make_ready_future<>();
}

View File

@@ -201,10 +201,10 @@ static future<std::vector<token_range>> get_local_ranges(database& db) {
// All queries will be on that table, where all entries are text and there's no notion of
// token ranges form the CQL point of view.
auto left_inf = boost::find_if(ranges, [] (auto&& r) {
return !r.start() || r.start()->value() == dht::minimum_token();
return r.end() && (!r.start() || r.start()->value() == dht::minimum_token());
});
auto right_inf = boost::find_if(ranges, [] (auto&& r) {
return !r.end() || r.start()->value() == dht::maximum_token();
return r.start() && (!r.end() || r.end()->value() == dht::maximum_token());
});
if (left_inf != right_inf && left_inf != ranges.end() && right_inf != ranges.end()) {
local_ranges.push_back(token_range{to_bytes(right_inf->start()), to_bytes(left_inf->end())});

View File

@@ -113,6 +113,10 @@ static std::vector<schema_ptr> all_tables() {
};
}
bool system_distributed_keyspace::is_extra_durable(const sstring& cf_name) {
return cf_name == CDC_TOPOLOGY_DESCRIPTION;
}
system_distributed_keyspace::system_distributed_keyspace(cql3::query_processor& qp, service::migration_manager& mm)
: _qp(qp)
, _mm(mm) {

View File

@@ -64,6 +64,10 @@ private:
service::migration_manager& _mm;
public:
/* Should writes to the given table always be synchronized by commitlog (flushed to disk)
* before being acknowledged? */
static bool is_extra_durable(const sstring& cf_name);
system_distributed_keyspace(cql3::query_processor&, service::migration_manager&);
future<> start();

View File

@@ -1241,6 +1241,14 @@ future<> mutate_MV(
}
}
}
// It's still possible that a target endpoint is dupliated in the remote endpoints list,
// so let's get rid of the duplicate if it exists
if (target_endpoint) {
auto remote_it = std::find(remote_endpoints.begin(), remote_endpoints.end(), *target_endpoint);
if (remote_it != remote_endpoints.end()) {
remote_endpoints.erase(remote_it);
}
}
if (target_endpoint && *target_endpoint == my_address) {
++stats.view_updates_pushed_local;

View File

@@ -36,7 +36,7 @@ if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Configure RAID volume for Scylla.')
parser.add_argument('--disks', required=True,
help='specify disks for RAID')
parser.add_argument('--raiddev', default='/dev/md0',
parser.add_argument('--raiddev',
help='MD device name for RAID')
parser.add_argument('--enable-on-nextboot', '--update-fstab', action='store_true', default=False,
help='mount RAID on next boot')
@@ -73,9 +73,25 @@ if __name__ == '__main__':
print('{} is busy'.format(disk))
sys.exit(1)
if os.path.exists(args.raiddev):
print('{} is already using'.format(args.raiddev))
sys.exit(1)
if len(disks) == 1 and not args.force_raid:
raid = False
fsdev = disks[0]
else:
raid = True
if args.raiddev is None:
raiddevs_to_try = [f'/dev/md{i}' for i in range(10)]
else:
raiddevs_to_try = [args.raiddev, ]
for fsdev in raiddevs_to_try:
raiddevname = os.path.basename(fsdev)
if not os.path.exists(f'/sys/block/{raiddevname}/md/array_state'):
break
print(f'{fsdev} is already using')
else:
if args.raiddev is None:
print("Can't find unused /dev/mdX")
sys.exit(1)
print(f'{fsdev} will be used to setup a RAID')
if os.path.ismount(mount_at):
print('{} is already mounted'.format(mount_at))
@@ -94,13 +110,6 @@ if __name__ == '__main__':
except SystemdException:
md_service = systemd_unit('mdadm.service')
if len(disks) == 1 and not args.force_raid:
raid = False
fsdev = disks[0]
else:
raid = True
fsdev = args.raiddev
print('Creating {type} for scylla using {nr_disk} disk(s): {disks}'.format(type='RAID0' if raid else 'XFS volume', nr_disk=len(disks), disks=args.disks))
if distro.name() == 'Ubuntu' and distro.version() == '14.04':
if raid:

View File

@@ -92,7 +92,7 @@ def scyllabindir():
# @param headers dict of k:v
def curl(url, headers=None, byte=False, timeout=3, max_retries=5):
def curl(url, headers=None, byte=False, timeout=3, max_retries=5, retry_interval=5):
retries = 0
while True:
try:
@@ -102,9 +102,8 @@ def curl(url, headers=None, byte=False, timeout=3, max_retries=5):
return res.read()
else:
return res.read().decode('utf-8')
except urllib.error.HTTPError:
logging.warning("Failed to grab %s..." % url)
time.sleep(5)
except urllib.error.URLError:
time.sleep(retry_interval)
retries += 1
if retries >= max_retries:
raise
@@ -188,7 +187,7 @@ class gcp_instance:
"""get list of nvme disks from metadata server"""
import json
try:
disksREST=self.__instance_metadata("disks")
disksREST=self.__instance_metadata("disks", True)
disksobj=json.loads(disksREST)
nvmedisks=list(filter(self.isNVME, disksobj))
except Exception as e:
@@ -236,7 +235,8 @@ class gcp_instance:
def instance_size(self):
"""Returns the size of the instance we are running in. i.e.: 2"""
return self.instancetype.split("-")[2]
instancetypesplit = self.instancetype.split("-")
return instancetypesplit[2] if len(instancetypesplit)>2 else 0
def instance_class(self):
"""Returns the class of the instance we are running in. i.e.: n2"""
@@ -298,22 +298,30 @@ class gcp_instance:
return self.__firstNvmeSize
def is_recommended_instance(self):
if self.is_recommended_instance_size() and not self.is_unsupported_instance_class() and self.is_supported_instance_class():
if not self.is_unsupported_instance_class() and self.is_supported_instance_class() and self.is_recommended_instance_size():
# at least 1:2GB cpu:ram ratio , GCP is at 1:4, so this should be fine
if self.cpu/self.memoryGB < 0.5:
# 30:1 Disk/RAM ratio must be kept at least(AWS), we relax this a little bit
# on GCP we are OK with 50:1 , n1-standard-2 can cope with 1 disk, not more
diskCount = self.nvmeDiskCount
# to reach max performance for > 16 disks we mandate 32 or more vcpus
# https://cloud.google.com/compute/docs/disks/local-ssd#performance
if diskCount >= 16 and self.cpu < 32:
return False
diskSize= self.firstNvmeSize
if diskCount < 1:
return False
disktoramratio = (diskCount*diskSize)/self.memoryGB
if (disktoramratio <= 50) and (disktoramratio > 0):
return True
diskCount = self.nvmeDiskCount
# to reach max performance for > 16 disks we mandate 32 or more vcpus
# https://cloud.google.com/compute/docs/disks/local-ssd#performance
if diskCount >= 16 and self.cpu < 32:
logging.warning(
"This machine doesn't have enough CPUs for allocated number of NVMEs (at least 32 cpus for >=16 disks). Performance will suffer.")
return False
diskSize = self.firstNvmeSize
if diskCount < 1:
return False
max_disktoramratio = 105
# 30:1 Disk/RAM ratio must be kept at least(AWS), we relax this a little bit
# on GCP we are OK with {max_disktoramratio}:1 , n1-standard-2 can cope with 1 disk, not more
disktoramratio = (diskCount * diskSize) / self.memoryGB
if (disktoramratio > max_disktoramratio):
logging.warning(
f"Instance disk-to-RAM ratio is {disktoramratio}, which is higher than the recommended ratio {max_disktoramratio}. Performance may suffer.")
return False
return True
else:
logging.warning("At least 2G of RAM per CPU is needed. Performance will suffer.")
return False
def private_ipv4(self):
@@ -398,7 +406,7 @@ class aws_instance:
def is_aws_instance(cls):
"""Check if it's AWS instance via query to metadata server."""
try:
curl(cls.META_DATA_BASE_URL, max_retries=2)
curl(cls.META_DATA_BASE_URL, max_retries=2, retry_interval=1)
return True
except (urllib.error.URLError, urllib.error.HTTPError):
return False
@@ -462,7 +470,7 @@ class aws_instance:
def ebs_disks(self):
"""Returns all EBS disks"""
return set(self._disks["ephemeral"])
return set(self._disks["ebs"])
def public_ipv4(self):
"""Returns the public IPv4 address of this instance"""
@@ -490,9 +498,7 @@ class aws_instance:
return curl(self.META_DATA_BASE_URL + "user-data")
# When a CLI tool is not installed, use relocatable CLI tool provided by Scylla
scylla_env = os.environ.copy()
scylla_env['PATH'] = '{}:{}'.format(scyllabindir(), scylla_env['PATH'])
scylla_env['DEBIAN_FRONTEND'] = 'noninteractive'
def run(cmd, shell=False, silent=False, exception=True):

View File

@@ -0,0 +1,4 @@
# allocate enough inotify instances for large machines
# each tls instance needs 1 inotify instance, and there can be
# multiple tls instances per shard.
fs.inotify.max_user_instances = 1200

View File

@@ -11,6 +11,7 @@ else
sysctl -p/usr/lib/sysctl.d/99-scylla-sched.conf || :
sysctl -p/usr/lib/sysctl.d/99-scylla-aio.conf || :
sysctl -p/usr/lib/sysctl.d/99-scylla-vm.conf || :
sysctl -p/usr/lib/sysctl.d/99-scylla-inotify.conf || :
fi
#DEBHELPER#

View File

@@ -5,8 +5,8 @@ MAINTAINER Avi Kivity <avi@cloudius-systems.com>
ENV container docker
# The SCYLLA_REPO_URL argument specifies the URL to the RPM repository this Docker image uses to install Scylla. The default value is the Scylla's unstable RPM repository, which contains the daily build.
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo
ARG VERSION=666.development
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/scylla-4.3/latest/scylla.repo
ARG VERSION=4.3.rc0
ADD scylla_bashrc /scylla_bashrc

View File

@@ -129,10 +129,9 @@ rm -rf $RPM_BUILD_ROOT
%attr(0755,scylla,scylla) %dir %{_sharedstatedir}/scylla-housekeeping
%ghost /etc/systemd/system/scylla-helper.slice.d/
%ghost /etc/systemd/system/scylla-helper.slice.d/memory.conf
%ghost /etc/systemd/system/scylla-server.service.d/
%ghost /etc/systemd/system/scylla-server.service.d/capabilities.conf
%ghost /etc/systemd/system/scylla-server.service.d/mounts.conf
%ghost /etc/systemd/system/scylla-server.service.d/dependencies.conf
/etc/systemd/system/scylla-server.service.d/dependencies.conf
%ghost /etc/systemd/system/var-lib-systemd-coredump.mount
%ghost /etc/systemd/system/scylla-cpupower.service
%ghost /etc/systemd/system/var-lib-scylla.mount
@@ -190,6 +189,8 @@ Summary: Scylla configuration package for the Linux kernel
License: AGPLv3
URL: http://www.scylladb.com/
Requires: kmod
# tuned overwrites our sysctl settings
Obsoletes: tuned
%description kernel-conf
This package contains Linux kernel configuration changes for the Scylla database. Install this package
@@ -201,6 +202,7 @@ if Scylla is the main application on your server and you wish to optimize its la
/usr/lib/systemd/systemd-sysctl 99-scylla-sched.conf >/dev/null 2>&1 || :
/usr/lib/systemd/systemd-sysctl 99-scylla-aio.conf >/dev/null 2>&1 || :
/usr/lib/systemd/systemd-sysctl 99-scylla-vm.conf >/dev/null 2>&1 || :
/usr/lib/systemd/systemd-sysctl 99-scylla-inotify.conf >/dev/null 2>&1 || :
%files kernel-conf
%defattr(-,root,root)

View File

@@ -143,6 +143,7 @@ extern const std::string_view LWT;
extern const std::string_view PER_TABLE_PARTITIONERS;
extern const std::string_view PER_TABLE_CACHING;
extern const std::string_view DIGEST_FOR_NULL_VALUES;
extern const std::string_view ALTERNATOR_STREAMS;
}

View File

@@ -62,6 +62,7 @@ constexpr std::string_view features::LWT = "LWT";
constexpr std::string_view features::PER_TABLE_PARTITIONERS = "PER_TABLE_PARTITIONERS";
constexpr std::string_view features::PER_TABLE_CACHING = "PER_TABLE_CACHING";
constexpr std::string_view features::DIGEST_FOR_NULL_VALUES = "DIGEST_FOR_NULL_VALUES";
constexpr std::string_view features::ALTERNATOR_STREAMS = "ALTERNATOR_STREAMS";
static logging::logger logger("features");
@@ -86,6 +87,7 @@ feature_service::feature_service(feature_config cfg) : _config(cfg)
, _per_table_partitioners_feature(*this, features::PER_TABLE_PARTITIONERS)
, _per_table_caching_feature(*this, features::PER_TABLE_CACHING)
, _digest_for_null_values_feature(*this, features::DIGEST_FOR_NULL_VALUES)
, _alternator_streams_feature(*this, features::ALTERNATOR_STREAMS)
{}
feature_config feature_config_from_db_config(db::config& cfg, std::set<sstring> disabled) {
@@ -116,8 +118,8 @@ feature_config feature_config_from_db_config(db::config& cfg, std::set<sstring>
}
}
if (!cfg.check_experimental(db::experimental_features_t::CDC)) {
fcfg._disabled_features.insert(sstring(gms::features::CDC));
if (!cfg.check_experimental(db::experimental_features_t::ALTERNATOR_STREAMS)) {
fcfg._disabled_features.insert(sstring(gms::features::ALTERNATOR_STREAMS));
}
return fcfg;
@@ -187,6 +189,7 @@ std::set<std::string_view> feature_service::known_feature_set() {
gms::features::UDF,
gms::features::CDC,
gms::features::DIGEST_FOR_NULL_VALUES,
gms::features::ALTERNATOR_STREAMS,
};
for (const sstring& s : _config._disabled_features) {
@@ -266,6 +269,7 @@ void feature_service::enable(const std::set<std::string_view>& list) {
std::ref(_per_table_partitioners_feature),
std::ref(_per_table_caching_feature),
std::ref(_digest_for_null_values_feature),
std::ref(_alternator_streams_feature),
})
{
if (list.contains(f.name())) {

View File

@@ -92,6 +92,7 @@ private:
gms::feature _per_table_partitioners_feature;
gms::feature _per_table_caching_feature;
gms::feature _digest_for_null_values_feature;
gms::feature _alternator_streams_feature;
public:
bool cluster_supports_user_defined_functions() const {
@@ -160,6 +161,10 @@ public:
bool cluster_supports_lwt() const {
return bool(_lwt_feature);
}
bool cluster_supports_alternator_streams() const {
return bool(_alternator_streams_feature);
}
};
} // namespace gms

View File

@@ -142,7 +142,7 @@ DEBIAN_SSL_CERT_FILE="/etc/ssl/certs/ca-certificates.crt"
if [ -f "\${DEBIAN_SSL_CERT_FILE}" ]; then
c=\${DEBIAN_SSL_CERT_FILE}
fi
PYTHONPATH="\${d}:\${d}/libexec:\$PYTHONPATH" PATH="\${d}/$pythonpath:\${PATH}" SSL_CERT_FILE="\${c}" exec -a "\$0" "\${d}/libexec/\${b}" "\$@"
PYTHONPATH="\${d}:\${d}/libexec:\$PYTHONPATH" PATH="\${d}/../bin:\${d}/$pythonpath:\${PATH}" SSL_CERT_FILE="\${c}" exec -a "\$0" "\${d}/libexec/\${b}" "\$@"
EOF
chmod +x "$install"
}
@@ -412,6 +412,10 @@ elif ! $packaging; then
chown -R scylla:scylla $rdata
chown -R scylla:scylla $rhkdata
for file in dist/common/sysctl.d/*.conf; do
bn=$(basename "$file")
sysctl -p "$rusr"/lib/sysctl.d/"$bn"
done
$rprefix/scripts/scylla_post_install.sh
echo "Scylla offline install completed."
fi

View File

@@ -1023,8 +1023,7 @@ int main(int ac, char** av) {
proxy.invoke_on_all([] (service::storage_proxy& local_proxy) {
auto& ss = service::get_local_storage_service();
ss.register_subscriber(&local_proxy);
//FIXME: discarded future
(void)local_proxy.start_hints_manager(gms::get_local_gossiper().shared_from_this(), ss.shared_from_this());
return local_proxy.start_hints_manager(gms::get_local_gossiper().shared_from_this(), ss.shared_from_this());
}).get();
supervisor::notify("starting messaging service");

View File

@@ -2044,11 +2044,13 @@ public:
}
}
void abort(std::exception_ptr ep) {
_end_of_stream = true;
_ex = std::move(ep);
if (_full) {
_full->set_exception(_ex);
_full.reset();
} else if (_not_full) {
_not_full->set_exception(_ex);
_not_full.reset();
}
}
};

View File

@@ -36,8 +36,14 @@ future<> feed_writer(flat_mutation_reader&& rd, Writer&& wr) {
auto f2 = rd.is_buffer_empty() ? rd.fill_buffer(db::no_timeout) : make_ready_future<>();
return when_all_succeed(std::move(f1), std::move(f2)).discard_result();
});
}).finally([&wr] {
return wr.consume_end_of_stream();
}).then_wrapped([&wr] (future<> f) {
if (f.failed()) {
auto ex = f.get_exception();
wr.abort(ex);
return make_exception_future<>(ex);
} else {
return wr.consume_end_of_stream();
}
});
});
}

View File

@@ -57,6 +57,9 @@ class shard_based_splitting_mutation_writer {
}
return std::move(_consume_fut);
}
void abort(std::exception_ptr ep) {
_handle.abort(ep);
}
};
private:
@@ -110,6 +113,13 @@ public:
return shard->consume_end_of_stream();
});
}
void abort(std::exception_ptr ep) {
for (auto&& shard : _shards) {
if (shard) {
shard->abort(ep);
}
}
}
};
future<> segregate_by_shard(flat_mutation_reader producer, reader_consumer consumer) {

View File

@@ -144,6 +144,9 @@ class timestamp_based_splitting_mutation_writer {
}
return std::move(_consume_fut);
}
void abort(std::exception_ptr ep) {
_handle.abort(ep);
}
};
private:
@@ -188,6 +191,11 @@ public:
return bucket.second.consume_end_of_stream();
});
}
void abort(std::exception_ptr ep) {
for (auto&& b : _buckets) {
b.second.abort(ep);
}
}
};
future<> timestamp_based_splitting_mutation_writer::write_to_bucket(bucket_id bucket, mutation_fragment&& mf) {

View File

@@ -542,12 +542,12 @@ partition_snapshot_ptr partition_entry::read(logalloc::region& r,
return partition_snapshot_ptr(std::move(snp));
}
std::vector<range_tombstone>
partition_snapshot::range_tombstone_result
partition_snapshot::range_tombstones(position_in_partition_view start, position_in_partition_view end)
{
partition_version* v = &*version();
if (!v->next()) {
return boost::copy_range<std::vector<range_tombstone>>(
return boost::copy_range<range_tombstone_result>(
v->partition().row_tombstones().slice(*_schema, start, end));
}
range_tombstone_list list(*_schema);
@@ -557,10 +557,10 @@ partition_snapshot::range_tombstones(position_in_partition_view start, position_
}
v = v->next();
}
return boost::copy_range<std::vector<range_tombstone>>(list.slice(*_schema, start, end));
return boost::copy_range<range_tombstone_result>(list.slice(*_schema, start, end));
}
std::vector<range_tombstone>
partition_snapshot::range_tombstone_result
partition_snapshot::range_tombstones()
{
return range_tombstones(

View File

@@ -26,6 +26,7 @@
#include "utils/anchorless_list.hh"
#include "utils/logalloc.hh"
#include "utils/coroutine.hh"
#include "utils/chunked_vector.hh"
#include <boost/intrusive/parent_from_member.hpp>
#include <boost/intrusive/slist.hpp>
@@ -400,10 +401,13 @@ public:
::static_row static_row(bool digest_requested) const;
bool static_row_continuous() const;
mutation_partition squashed() const;
using range_tombstone_result = utils::chunked_vector<range_tombstone>;
// Returns range tombstones overlapping with [start, end)
std::vector<range_tombstone> range_tombstones(position_in_partition_view start, position_in_partition_view end);
range_tombstone_result range_tombstones(position_in_partition_view start, position_in_partition_view end);
// Returns all range tombstones
std::vector<range_tombstone> range_tombstones();
range_tombstone_result range_tombstones();
};
class partition_snapshot_ptr {

View File

@@ -509,7 +509,7 @@ public:
}
};
class repair_writer {
class repair_writer : public enable_lw_shared_from_this<repair_writer> {
schema_ptr _schema;
reader_permit _permit;
uint64_t _estimated_partitions;
@@ -569,8 +569,9 @@ public:
table& t = db.local().find_column_family(_schema->id());
auto [queue_reader, queue_handle] = make_queue_reader(_schema, _permit);
_mq[node_idx] = std::move(queue_handle);
auto writer = shared_from_this();
_writer_done[node_idx] = mutation_writer::distribute_reader_and_consume_on_shards(_schema, std::move(queue_reader),
[&db, reason = this->_reason, estimated_partitions = this->_estimated_partitions] (flat_mutation_reader reader) {
[&db, reason = this->_reason, estimated_partitions = this->_estimated_partitions, writer] (flat_mutation_reader reader) {
auto& t = db.local().find_column_family(reader.schema());
return db::view::check_needs_view_update_path(_sys_dist_ks->local(), t, reason).then([t = t.shared_from_this(), estimated_partitions, reader = std::move(reader)] (bool use_view_update_path) mutable {
//FIXME: for better estimations this should be transmitted from remote
@@ -598,13 +599,13 @@ public:
return consumer(std::move(reader));
});
},
t.stream_in_progress()).then([this, node_idx] (uint64_t partitions) {
t.stream_in_progress()).then([node_idx, writer] (uint64_t partitions) {
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
_schema->ks_name(), _schema->cf_name(), partitions);
}).handle_exception([this, node_idx] (std::exception_ptr ep) {
writer->_schema->ks_name(), writer->_schema->cf_name(), partitions);
}).handle_exception([node_idx, writer] (std::exception_ptr ep) {
rlogger.warn("repair_writer: keyspace={}, table={}, multishard_writer failed: {}",
_schema->ks_name(), _schema->cf_name(), ep);
_mq[node_idx]->abort(ep);
writer->_schema->ks_name(), writer->_schema->cf_name(), ep);
writer->_mq[node_idx]->abort(ep);
return make_exception_future<>(std::move(ep));
});
}
@@ -718,7 +719,7 @@ private:
size_t _nr_peer_nodes= 1;
repair_stats _stats;
repair_reader _repair_reader;
repair_writer _repair_writer;
lw_shared_ptr<repair_writer> _repair_writer;
// Contains rows read from disk
std::list<repair_row> _row_buf;
// Contains rows we are working on to sync between peers
@@ -822,7 +823,7 @@ public:
_seed,
repair_reader::is_local_reader(_repair_master || _same_sharding_config)
)
, _repair_writer(_schema, _permit, _estimated_partitions, _nr_peer_nodes, _reason)
, _repair_writer(make_lw_shared<repair_writer>(_schema, _permit, _estimated_partitions, _nr_peer_nodes, _reason))
, _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes,
[&ms] (uint32_t repair_meta_id, netw::messaging_service::msg_addr addr) {
return ms.local().make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(repair_meta_id, addr);
@@ -855,7 +856,7 @@ public:
auto f2 = _sink_source_for_get_row_diff.close();
auto f3 = _sink_source_for_put_row_diff.close();
return when_all_succeed(std::move(gate_future), std::move(f1), std::move(f2), std::move(f3)).discard_result().finally([this] {
return _repair_writer.wait_for_writer_done();
return _repair_writer->wait_for_writer_done();
});
}
@@ -1340,8 +1341,8 @@ private:
future<> do_apply_rows(std::list<repair_row>&& row_diff, unsigned node_idx, update_working_row_buf update_buf) {
return do_with(std::move(row_diff), [this, node_idx, update_buf] (std::list<repair_row>& row_diff) {
return with_semaphore(_repair_writer.sem(), 1, [this, node_idx, update_buf, &row_diff] {
_repair_writer.create_writer(_db, node_idx);
return with_semaphore(_repair_writer->sem(), 1, [this, node_idx, update_buf, &row_diff] {
_repair_writer->create_writer(_db, node_idx);
return repeat([this, node_idx, update_buf, &row_diff] () mutable {
if (row_diff.empty()) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
@@ -1355,7 +1356,7 @@ private:
// to_repair_rows_list above where the repair_row is created.
mutation_fragment mf = std::move(r.get_mutation_fragment());
auto dk_with_hash = r.get_dk_with_hash();
return _repair_writer.do_write(node_idx, std::move(dk_with_hash), std::move(mf)).then([&row_diff] {
return _repair_writer->do_write(node_idx, std::move(dk_with_hash), std::move(mf)).then([&row_diff] {
row_diff.pop_front();
return make_ready_future<stop_iteration>(stop_iteration::no);
});

View File

@@ -298,7 +298,7 @@ void storage_service::prepare_to_join(
_token_metadata.update_normal_tokens(my_tokens, get_broadcast_address());
_cdc_streams_ts = db::system_keyspace::get_saved_cdc_streams_timestamp().get0();
if (!_cdc_streams_ts && db().local().get_config().check_experimental(db::experimental_features_t::CDC)) {
if (!_cdc_streams_ts) {
// We could not have completed joining if we didn't generate and persist a CDC streams timestamp,
// unless we are restarting after upgrading from non-CDC supported version.
// In that case we won't begin a CDC generation: it should be done by one of the nodes
@@ -550,7 +550,7 @@ void storage_service::join_token_ring(int delay) {
assert(should_bootstrap() || db().local().is_replacing() || !_cdc_streams_ts);
}
if (!_cdc_streams_ts && db().local().get_config().check_experimental(db::experimental_features_t::CDC)) {
if (!_cdc_streams_ts) {
// If we didn't choose a CDC streams timestamp at this point, then either
// 1. we're replacing a node which didn't gossip a CDC streams timestamp for whatever reason,
// 2. we've already bootstrapped, but are upgrading from a non-CDC version,
@@ -570,10 +570,15 @@ void storage_service::join_token_ring(int delay) {
if (!db().local().is_replacing()
&& (!db::system_keyspace::bootstrap_complete()
|| cdc::should_propose_first_generation(get_broadcast_address(), _gossiper))) {
_cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(),
_bootstrap_tokens, _token_metadata, _gossiper,
_sys_dist_ks.local(), get_ring_delay(), _for_testing);
try {
_cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(),
_bootstrap_tokens, _token_metadata, _gossiper,
_sys_dist_ks.local(), get_ring_delay(), _for_testing);
} catch (...) {
cdc_log.warn(
"Could not create a new CDC generation: {}. This may make it impossible to use CDC. Use nodetool checkAndRepairCdcStreams to fix CDC generation",
std::current_exception());
}
}
}
@@ -893,24 +898,18 @@ void storage_service::bootstrap() {
// It doesn't hurt: other nodes will (potentially) just do more generation switches.
// We do this because with this new attempt at bootstrapping we picked a different set of tokens.
if (db().local().get_config().check_experimental(db::experimental_features_t::CDC)) {
// Update pending ranges now, so we correctly count ourselves as a pending replica
// when inserting the new CDC generation.
_token_metadata.add_bootstrap_tokens(_bootstrap_tokens, get_broadcast_address());
update_pending_ranges().get();
// Update pending ranges now, so we correctly count ourselves as a pending replica
// when inserting the new CDC generation.
_token_metadata.add_bootstrap_tokens(_bootstrap_tokens, get_broadcast_address());
update_pending_ranges().get();
// After we pick a generation timestamp, we start gossiping it, and we stick with it.
// We don't do any other generation switches (unless we crash before complecting bootstrap).
assert(!_cdc_streams_ts);
// After we pick a generation timestamp, we start gossiping it, and we stick with it.
// We don't do any other generation switches (unless we crash before complecting bootstrap).
assert(!_cdc_streams_ts);
_cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(),
_bootstrap_tokens, _token_metadata, _gossiper,
_sys_dist_ks.local(), get_ring_delay(), _for_testing);
} else {
// We should not be able to join the cluster if other nodes support CDC but we don't.
// The check should have been made somewhere in prepare_to_join (`check_knows_remote_features`).
assert(!_feature_service.cluster_supports_cdc());
}
_cdc_streams_ts = cdc::make_new_cdc_generation(db().local().get_config(),
_bootstrap_tokens, _token_metadata, _gossiper,
_sys_dist_ks.local(), get_ring_delay(), _for_testing);
_gossiper.add_local_application_state({
// Order is important: both the CDC streams timestamp and tokens must be known when a node handles our status.
@@ -2036,9 +2035,8 @@ future<> storage_service::start_gossiping(bind_messaging_port do_bind) {
return seastar::async([&ss, do_bind] {
if (!ss._initialized) {
slogger.warn("Starting gossip by operator request");
bool cdc_enabled = ss.db().local().get_config().check_experimental(db::experimental_features_t::CDC);
ss.set_gossip_tokens(db::system_keyspace::get_local_tokens().get0(),
cdc_enabled ? std::make_optional(cdc::get_local_streams_timestamp().get0()) : std::nullopt);
std::make_optional(cdc::get_local_streams_timestamp().get0()));
ss._gossiper.force_newer_generation();
ss._gossiper.start_gossiping(utils::get_generation_number(), gms::bind_messaging_port(bool(do_bind))).then([&ss] {
ss._initialized = true;

View File

@@ -212,16 +212,18 @@ public:
};
struct compaction_writer {
shared_sstable sst;
// We use a ptr for pointer stability and so that it can be null
// when using a noop monitor.
sstable_writer writer;
// The order in here is important. A monitor must be destroyed before the writer it is monitoring since it has a
// periodic timer that checks the writer.
// The writer must be destroyed before the shared_sstable since the it may depend on the sstable
// (as in the mx::writer over compressed_file_data_sink_impl case that depends on sstables::compression).
std::unique_ptr<compaction_write_monitor> monitor;
shared_sstable sst;
compaction_writer(std::unique_ptr<compaction_write_monitor> monitor, sstable_writer writer, shared_sstable sst)
: writer(std::move(writer)), monitor(std::move(monitor)), sst(std::move(sst)) {}
: sst(std::move(sst)), writer(std::move(writer)), monitor(std::move(monitor)) {}
compaction_writer(sstable_writer writer, shared_sstable sst)
: compaction_writer(nullptr, std::move(writer), std::move(sst)) {}
};
@@ -609,10 +611,12 @@ private:
std::move(gc_consumer));
return seastar::async([cfc = std::move(cfc), reader = std::move(reader), this] () mutable {
reader.consume_in_thread(std::move(cfc), make_partition_filter(), db::no_timeout);
reader.consume_in_thread(std::move(cfc), db::no_timeout);
});
});
return consumer(make_sstable_reader());
// producer will filter out a partition before it reaches the consumer(s)
auto producer = make_filtering_reader(make_sstable_reader(), make_partition_filter());
return consumer(std::move(producer));
}
virtual reader_consumer make_interposer_consumer(reader_consumer end_consumer) {

View File

@@ -378,6 +378,7 @@ private:
_fwd_end = _fwd ? position_in_partition::before_all_clustered_rows() : position_in_partition::after_all_clustered_rows();
_out_of_range = false;
_range_tombstones.reset();
_ready = {};
_first_row_encountered = false;
}
public:

View File

@@ -86,7 +86,7 @@ ln -s "$SCYLLA" "$SCYLLA_LINK"
--alternator-write-isolation=always_use_lwt \
--alternator-streams-time-window-s=0 \
--developer-mode=1 \
--experimental-features=cdc \
--experimental-features=alternator-streams \
--ring-delay-ms 0 --collectd 0 \
--smp 2 -m 1G \
--overprovisioned --unsafe-bypass-fsync 1 \

View File

@@ -658,7 +658,6 @@ def test_filter_expression_and_sort_key_condition(test_table_sn_with_data):
# In particular, test that FilterExpression may inspect attributes which will
# not be returned by the query, because of the ProjectionExpression.
# This test reproduces issue #6951.
@pytest.mark.xfail(reason="issue #6951: cannot filter on non-returned attributes")
def test_filter_expression_and_projection_expression(test_table):
p = random_string()
test_table.put_item(Item={'p': p, 'c': 'hi', 'x': 'dog', 'y': 'cat'})

View File

@@ -386,3 +386,38 @@ def test_query_missing_key(test_table):
full_query(test_table, KeyConditions={})
with pytest.raises(ClientError, match='ValidationException'):
full_query(test_table)
# The paging tests above used a numeric sort key. Let's now also test paging
# with a bytes sort key. We already have above a test that bytes sort keys
# work and are sorted correctly (test_query_sort_order_bytes), but the
# following test adds a check that *paging* works correctly for such keys.
# We used to have a bug in this (issue #7768) - the returned LastEvaluatedKey
# was incorrectly formatted, breaking the boto3's parsing of the response.
# Note we only check the case of bytes *sort* keys in this test. For bytes
# *partition* keys, see test_scan_paging_bytes().
def test_query_paging_bytes(test_table_sb):
p = random_string()
items = [{'p': p, 'c': random_bytes()} for i in range(10)]
with test_table_sb.batch_writer() as batch:
for item in items:
batch.put_item(item)
# Deliberately pass Limit=1 to enforce paging even though we have
# just 10 items in the partition.
got_items = full_query(test_table_sb, Limit=1,
KeyConditions={'p': {'AttributeValueList': [p], 'ComparisonOperator': 'EQ'}})
got_sort_keys = [x['c'] for x in got_items]
expected_sort_keys = sorted(x['c'] for x in items)
assert got_sort_keys == expected_sort_keys
# Similar for test for string clustering keys
def test_query_paging_string(test_table_ss):
p = random_string()
items = [{'p': p, 'c': random_string()} for i in range(10)]
with test_table_ss.batch_writer() as batch:
for item in items:
batch.put_item(item)
got_items = full_query(test_table_ss, Limit=1,
KeyConditions={'p': {'AttributeValueList': [p], 'ComparisonOperator': 'EQ'}})
got_sort_keys = [x['c'] for x in got_items]
expected_sort_keys = sorted(x['c'] for x in items)
assert got_sort_keys == expected_sort_keys

View File

@@ -539,7 +539,6 @@ def test_query_filter_paging(test_table_sn_with_data):
# In particular, test that QueryFilter may inspect attributes which will
# not be returned by the query, because the AttributesToGet.
# This test reproduces issue #6951.
@pytest.mark.xfail(reason="issue #6951: cannot filter on non-returned attributes")
def test_query_filter_and_attributes_to_get(test_table):
p = random_string()
test_table.put_item(Item={'p': p, 'c': 'hi', 'x': 'dog', 'y': 'cat'})

View File

@@ -19,7 +19,7 @@
import pytest
from botocore.exceptions import ClientError
from util import random_string, full_scan, full_scan_and_count, multiset
from util import random_string, random_bytes, full_scan, full_scan_and_count, multiset
from boto3.dynamodb.conditions import Attr
# Test that scanning works fine with/without pagination
@@ -264,3 +264,20 @@ def test_scan_parallel_incorrect(filled_test_table):
for segment in [7, 9]:
with pytest.raises(ClientError, match='ValidationException.*Segment'):
full_scan(test_table, TotalSegments=5, Segment=segment)
# We used to have a bug with formatting of LastEvaluatedKey in the response
# of Query and Scan with bytes keys (issue #7768). In test_query_paging_byte()
# (test_query.py) we tested the case of bytes *sort* keys. In the following
# test we check bytes *partition* keys.
def test_scan_paging_bytes(test_table_b):
# We will not Scan the entire table - we have no idea what it contains.
# But we don't need to scan the entire table - we just need the table
# to contain at least two items, and then Scan it with Limit=1 and stop
# after one page. Before #7768 was fixed, the test failed when the
# LastEvaluatedKey in the response could not be parsed.
items = [{'p': random_bytes()}, {'p': random_bytes()}]
with test_table_b.batch_writer() as batch:
for item in items:
batch.put_item(item)
response = test_table_b.scan(ConsistentRead=True, Limit=1)
assert 'LastEvaluatedKey' in response

View File

@@ -41,8 +41,10 @@ def test_fetch_from_system_tables(scylla_only, dynamodb):
key_columns = [item['column_name'] for item in col_response['Items'] if item['kind'] == 'clustering' or item['kind'] == 'partition_key']
qualified_name = "{}{}.{}".format(internal_prefix, ks_name, table_name)
response = client.scan(TableName=qualified_name, AttributesToGet=key_columns)
print(ks_name, table_name, response)
import time
start = time.time()
response = client.scan(TableName=qualified_name, AttributesToGet=key_columns, Limit=50)
print(ks_name, table_name, len(str(response)), time.time()-start)
def test_block_access_to_non_system_tables_with_virtual_interface(scylla_only, test_table_s, dynamodb):
client = dynamodb.meta.client

View File

@@ -42,16 +42,6 @@
using namespace std::string_literals;
static cql_test_config mk_cdc_test_config() {
auto ext = std::make_shared<db::extensions>();
ext->add_schema_extension<cdc::cdc_extension>(cdc::cdc_extension::NAME);
auto cfg = ::make_shared<db::config>(std::move(ext));
auto features = cfg->experimental_features();
features.emplace_back(db::experimental_features_t::CDC);
cfg->experimental_features(features);
return cql_test_config(std::move(cfg));
};
namespace cdc {
api::timestamp_type find_timestamp(const mutation&);
utils::UUID generate_timeuuid(api::timestamp_type);
@@ -131,7 +121,7 @@ SEASTAR_THREAD_TEST_CASE(test_find_mutation_timestamp) {
check_stmt("DELETE vut.b FROM t WHERE pk = 0 AND ck = 0");
check_stmt("DELETE vfut FROM t WHERE pk = 0 AND ck = 0");
check_stmt("DELETE vstatic FROM t WHERE pk = 0");
}, mk_cdc_test_config()).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_generate_timeuuid) {
@@ -199,7 +189,7 @@ SEASTAR_THREAD_TEST_CASE(test_with_cdc_parameter) {
test("WITH cdc = {'enabled':'false'}", "{'enabled':'true'}", "{'enabled':'false'}", {false}, {true}, {false});
test("", "{'enabled':'true','preimage':'true','postimage':'true','ttl':'1'}", "{'enabled':'false'}", {false}, {true, true, true, 1}, {false});
test("WITH cdc = {'enabled':'true','preimage':'true','postimage':'true','ttl':'1'}", "{'enabled':'false'}", "{'enabled':'true','preimage':'false','postimage':'true','ttl':'2'}", {true, true, true, 1}, {false}, {true, false, true, 2});
}, mk_cdc_test_config()).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_detecting_conflict_of_cdc_log_table_with_existing_table) {
@@ -213,7 +203,7 @@ SEASTAR_THREAD_TEST_CASE(test_detecting_conflict_of_cdc_log_table_with_existing_
e.execute_cql("CREATE TABLE ks.tbl (a int PRIMARY KEY)").get();
e.require_table_exists("ks", "tbl").get();
BOOST_REQUIRE_THROW(e.execute_cql("ALTER TABLE ks.tbl WITH cdc = {'enabled': true}").get(), exceptions::invalid_request_exception);
}, mk_cdc_test_config()).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_permissions_of_cdc_log_table) {
@@ -247,7 +237,7 @@ SEASTAR_THREAD_TEST_CASE(test_permissions_of_cdc_log_table) {
// Disallow DROP
assert_unauthorized("DROP TABLE " + log_table);
}, mk_cdc_test_config()).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_disallow_cdc_on_materialized_view) {
@@ -257,7 +247,7 @@ SEASTAR_THREAD_TEST_CASE(test_disallow_cdc_on_materialized_view) {
BOOST_REQUIRE_THROW(e.execute_cql("CREATE MATERIALIZED VIEW ks.mv AS SELECT a FROM ks.tbl PRIMARY KEY (a) WITH cdc = {'enabled': true}").get(), exceptions::invalid_request_exception);
e.require_table_does_not_exist("ks", "mv").get();
}, mk_cdc_test_config()).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_permissions_of_cdc_description) {
@@ -285,7 +275,7 @@ SEASTAR_THREAD_TEST_CASE(test_permissions_of_cdc_description) {
test_table("cdc_streams_descriptions");
test_table("cdc_generation_descriptions");
}, mk_cdc_test_config()).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_cdc_log_schema) {
@@ -326,6 +316,7 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_log_schema) {
// cdc log clustering key
assert_has_column(cdc::log_meta_column_name("operation"), byte_type);
assert_has_column(cdc::log_meta_column_name("ttl"), long_type);
assert_has_column(cdc::log_meta_column_name("end_of_batch"), boolean_type);
// pk
assert_has_column(cdc::log_data_column_name("pk"), int32_type);
@@ -370,7 +361,7 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_log_schema) {
// Check if we missed something
BOOST_REQUIRE_EQUAL(required_column_count, log_schema->all_columns_count());
}, mk_cdc_test_config()).get();
}).get();
}
static std::vector<std::vector<bytes_opt>> to_bytes(const cql_transport::messages::result_message::rows& rows) {
@@ -512,7 +503,7 @@ SEASTAR_THREAD_TEST_CASE(test_primary_key_logging) {
// DELETE FROM ks.tbl WHERE pk = 1 AND pk2 = 11
assert_row(1, 11);
BOOST_REQUIRE(actual_i == actual_end);
}, mk_cdc_test_config()).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) {
@@ -534,6 +525,7 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) {
auto val_index = column_index(*rows, cdc::log_data_column_name("val"));
auto val2_index = column_index(*rows, cdc::log_data_column_name("val2"));
auto ttl_index = column_index(*rows, cdc::log_meta_column_name("ttl"));
auto eor_index = column_index(*rows, cdc::log_meta_column_name("end_of_batch"));
auto val_type = int32_type;
auto val = *first[0][val_index];
@@ -567,7 +559,7 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) {
BOOST_REQUIRE_EQUAL(pre_image.size(), i + 1);
val = *pre_image.back()[val_index];
// note: no val2 in pre-image, because we are not modifying it.
// note: no val2 in pre-image, because we are not modifying it.
BOOST_REQUIRE_EQUAL(int32_type->decompose(1111), *pre_image.back()[ck2_index]);
BOOST_REQUIRE_EQUAL(data_value(last), val_type->deserialize(bytes_view(val)));
BOOST_REQUIRE_EQUAL(bytes_opt(), pre_image.back()[ttl_index]);
@@ -583,10 +575,12 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) {
if (post_enabled) {
val = *post_image.back()[val_index];
val2 = *post_image.back()[val2_index];
auto eor = *post_image.back()[eor_index];
BOOST_REQUIRE_EQUAL(int32_type->decompose(1111), *post_image.back()[ck2_index]);
BOOST_REQUIRE_EQUAL(data_value(nv), val_type->deserialize(bytes_view(val)));
BOOST_REQUIRE_EQUAL(data_value(22222), val_type->deserialize(bytes_view(val2)));
BOOST_REQUIRE_EQUAL(data_value(true), boolean_type->deserialize(bytes_view(eor)));
}
const auto& ttl_cell = second[second.size() - 2][ttl_index];
@@ -608,7 +602,7 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging) {
}
}
}
}, mk_cdc_test_config()).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging_static_row) {
@@ -682,7 +676,7 @@ SEASTAR_THREAD_TEST_CASE(test_pre_post_image_logging_static_row) {
test(true, false);
test(false, true);
test(false, false);
}, mk_cdc_test_config()).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_range_deletion) {
@@ -691,7 +685,7 @@ SEASTAR_THREAD_TEST_CASE(test_range_deletion) {
cquery_nofail(e, "DELETE FROM ks.tbl WHERE pk = 123 AND ck > 1 AND ck < 23");
cquery_nofail(e, "DELETE FROM ks.tbl WHERE pk = 123 AND ck >= 4 AND ck <= 56");
auto msg = e.execute_cql(format("SELECT \"{}\", \"{}\", \"{}\", \"{}\" FROM ks.{}",
auto msg = e.execute_cql(format("SELECT \"{}\", \"{}\", \"{}\", \"{}\" FROM ks.{}",
cdc::log_meta_column_name("time"),
cdc::log_data_column_name("pk"),
cdc::log_data_column_name("ck"),
@@ -726,7 +720,7 @@ SEASTAR_THREAD_TEST_CASE(test_range_deletion) {
// ck >= 4 AND ck <= 56
check_row(4, cdc::operation::range_delete_start_inclusive);
check_row(56, cdc::operation::range_delete_end_inclusive);
}, mk_cdc_test_config()).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_add_columns) {
@@ -750,11 +744,11 @@ SEASTAR_THREAD_TEST_CASE(test_add_columns) {
auto kokos = *inserts.back()[kokos_index];
BOOST_REQUIRE_EQUAL(data_value("kaka"), kokos_type->deserialize(bytes_view(kokos)));
}, mk_cdc_test_config()).get();
}).get();
}
// #5582 - just quickly test that we can create the cdc enabled table on a different shard
// and still get the logs proper.
// #5582 - just quickly test that we can create the cdc enabled table on a different shard
// and still get the logs proper.
SEASTAR_THREAD_TEST_CASE(test_cdc_across_shards) {
do_with_cql_env_thread([](cql_test_env& e) {
if (smp::count < 2) {
@@ -772,7 +766,7 @@ SEASTAR_THREAD_TEST_CASE(test_cdc_across_shards) {
auto rows = select_log(e, "tbl");
BOOST_REQUIRE(!to_bytes_filtered(*rows, cdc::operation::insert).empty());
}, mk_cdc_test_config()).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_negative_ttl_fail) {
@@ -780,7 +774,7 @@ SEASTAR_THREAD_TEST_CASE(test_negative_ttl_fail) {
BOOST_REQUIRE_EXCEPTION(e.execute_cql("CREATE TABLE ks.fail (a int PRIMARY KEY, b int) WITH cdc = {'enabled':true,'ttl':'-1'}").get0(),
exceptions::configuration_exception,
exception_predicate::message_contains("ttl"));
}, mk_cdc_test_config()).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_ttls) {
@@ -830,11 +824,11 @@ SEASTAR_THREAD_TEST_CASE(test_ttls) {
auto cell_ttl_seconds = value_cast<int32_t>(cell_ttl);
// 30% tolerance in case of slow execution (a little flaky...)
BOOST_REQUIRE_CLOSE((float)cell_ttl_seconds, (float)ttl_seconds, 30.f);
}
}
};
test_ttl(0);
test_ttl(10);
}, mk_cdc_test_config()).get();
}).get();
}
// helper funcs + structs for collection testing
@@ -851,13 +845,13 @@ struct col_test {
data_value post = data_value::make_null(int32_type); // whatever
};
// iterate a set of updates and verify pre and delta values.
// iterate a set of updates and verify pre and delta values.
static void test_collection(cql_test_env& e, data_type val_type, data_type del_type, std::vector<col_test> tests, translate_func f = [](data_value v) { return v; }) {
auto col_type = val_type;
for (auto& t : tests) {
cquery_nofail(e, t.update);
auto rows = select_log(e, "tbl");
auto pre_image = to_bytes_filtered(*rows, cdc::operation::pre_image);
auto updates = to_bytes_filtered(*rows, cdc::operation::update);
@@ -918,7 +912,7 @@ SEASTAR_THREAD_TEST_CASE(test_map_logging) {
auto map_keys_type = set_type_impl::get_instance(utf8_type, false);
test_collection(e, map_type, map_keys_type, {
{
{
"UPDATE ks.tbl set val = { 'apa':'ko' } where pk=1 and pk2=11 and ck=111",
data_value::make_null(map_type), // no previous value
{
@@ -930,7 +924,7 @@ SEASTAR_THREAD_TEST_CASE(test_map_logging) {
},
::make_map_value(map_type, { { "apa", "ko" } })
},
{
{
"UPDATE ks.tbl set val = val + { 'ninja':'mission' } where pk=1 and pk2=11 and ck=111",
::make_map_value(map_type, { { "apa", "ko" } }),
{
@@ -941,9 +935,9 @@ SEASTAR_THREAD_TEST_CASE(test_map_logging) {
},
::make_map_value(map_type, { { "apa", "ko" }, { "ninja", "mission" } })
},
{
{
"UPDATE ks.tbl set val['ninja'] = 'shuriken' where pk=1 and pk2=11 and ck=111",
::make_map_value(map_type, { { "apa", "ko" }, { "ninja", "mission" } }),
::make_map_value(map_type, { { "apa", "ko" }, { "ninja", "mission" } }),
{
{
::make_map_value(map_type, { { "ninja", "shuriken" } }),
@@ -952,9 +946,9 @@ SEASTAR_THREAD_TEST_CASE(test_map_logging) {
},
::make_map_value(map_type, { { "apa", "ko" }, { "ninja", "shuriken" } })
},
{
{
"UPDATE ks.tbl set val['apa'] = null where pk=1 and pk2=11 and ck=111",
::make_map_value(map_type, { { "apa", "ko" }, { "ninja", "shuriken" } }),
::make_map_value(map_type, { { "apa", "ko" }, { "ninja", "shuriken" } }),
{
{
data_value::make_null(map_type),
@@ -963,9 +957,9 @@ SEASTAR_THREAD_TEST_CASE(test_map_logging) {
},
::make_map_value(map_type, { { "ninja", "shuriken" } })
},
{
{
"UPDATE ks.tbl set val['ninja'] = null, val['ola'] = 'kokos' where pk=1 and pk2=11 and ck=111",
::make_map_value(map_type, { { "ninja", "shuriken" } }),
::make_map_value(map_type, { { "ninja", "shuriken" } }),
{
{
::make_map_value(map_type, { { "ola", "kokos" } }),
@@ -974,9 +968,9 @@ SEASTAR_THREAD_TEST_CASE(test_map_logging) {
},
::make_map_value(map_type, { { "ola", "kokos" } })
},
{
{
"UPDATE ks.tbl set val = { 'bolla':'trolla', 'kork':'skruv' } where pk=1 and pk2=11 and ck=111",
::make_map_value(map_type, { { "ola", "kokos" } }),
::make_map_value(map_type, { { "ola", "kokos" } }),
{
{
::make_map_value(map_type, { { "bolla", "trolla" }, { "kork", "skruv" } }),
@@ -988,7 +982,7 @@ SEASTAR_THREAD_TEST_CASE(test_map_logging) {
}
});
}, mk_cdc_test_config()).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_set_logging) {
@@ -999,7 +993,7 @@ SEASTAR_THREAD_TEST_CASE(test_set_logging) {
});
auto set_type = set_type_impl::get_instance(utf8_type, false);
test_collection(e, set_type, set_type, {
{
"UPDATE ks.tbl set val = { 'apa', 'ko' } where pk=1 and pk2=11 and ck=111",
@@ -1026,7 +1020,7 @@ SEASTAR_THREAD_TEST_CASE(test_set_logging) {
},
{
"UPDATE ks.tbl set val = val - { 'apa' } where pk=1 and pk2=11 and ck=111",
::make_set_value(set_type, { "apa", "ko", "mission", "ninja" }),
::make_set_value(set_type, { "apa", "ko", "mission", "ninja" }),
{
{
data_value::make_null(set_type),
@@ -1037,7 +1031,7 @@ SEASTAR_THREAD_TEST_CASE(test_set_logging) {
},
{
"UPDATE ks.tbl set val = val - { 'mission' }, val = val + { 'nils' } where pk=1 and pk2=11 and ck=111",
::make_set_value(set_type, { "ko", "mission", "ninja" }),
::make_set_value(set_type, { "ko", "mission", "ninja" }),
{
{
::make_set_value(set_type, { "nils" }),
@@ -1059,7 +1053,7 @@ SEASTAR_THREAD_TEST_CASE(test_set_logging) {
::make_set_value(set_type, { "bolla", "trolla" })
}
});
}, mk_cdc_test_config()).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_list_logging) {
@@ -1072,11 +1066,11 @@ SEASTAR_THREAD_TEST_CASE(test_list_logging) {
auto list_type = list_type_impl::get_instance(utf8_type, false);
auto uuids_type = set_type_impl::get_instance(timeuuid_type, false);
auto val_type = map_type_impl::get_instance(list_type->name_comparator(), list_type->value_comparator(), false);
test_collection(e, val_type, uuids_type, {
{
"UPDATE ks.tbl set val = [ 'apa', 'ko' ] where pk=1 and pk2=11 and ck=111",
data_value::make_null(list_type),
data_value::make_null(list_type),
{
{
::make_list_value(list_type, { "apa", "ko" }),
@@ -1121,7 +1115,7 @@ SEASTAR_THREAD_TEST_CASE(test_list_logging) {
},
{
"UPDATE ks.tbl set val[0] = 'babar' where pk=1 and pk2=11 and ck=111",
::make_list_value(list_type, { "apa", "ko", "ninja", "mission" }),
::make_list_value(list_type, { "apa", "ko", "ninja", "mission" }),
{
{
::make_list_value(list_type, { "babar" }),
@@ -1151,7 +1145,7 @@ SEASTAR_THREAD_TEST_CASE(test_list_logging) {
}
return ::make_list_value(list_type, std::move(cpy));
});
}, mk_cdc_test_config()).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_udt_logging) {
@@ -1163,7 +1157,7 @@ SEASTAR_THREAD_TEST_CASE(test_udt_logging) {
e.execute_cql("DROP TYPE ks.mytype").get();
});
auto udt_type = user_type_impl::get_instance("ks", to_bytes("mytype"),
auto udt_type = user_type_impl::get_instance("ks", to_bytes("mytype"),
{ to_bytes("field0"), to_bytes("field1") },
{ int32_type, utf8_type },
false
@@ -1171,18 +1165,18 @@ SEASTAR_THREAD_TEST_CASE(test_udt_logging) {
auto index_set_type = set_type_impl::get_instance(short_type, false);
auto f0_type = int32_type;
auto f1_type = utf8_type;
auto make_tuple = [&](std::optional<std::optional<int32_t>> i, std::optional<std::optional<sstring>> s) {
return ::make_user_value(udt_type, {
i ? ::data_value(*i) : data_value::make_null(f0_type),
s ? ::data_value(*s) : data_value::make_null(f1_type),
});
};
test_collection(e, udt_type, index_set_type, {
{
"UPDATE ks.tbl set val = { field0: 12, field1: 'ko' } where pk=1 and pk2=11 and ck=111",
data_value::make_null(udt_type),
data_value::make_null(udt_type),
{
{
make_tuple(12, "ko"),
@@ -1238,7 +1232,7 @@ SEASTAR_THREAD_TEST_CASE(test_udt_logging) {
make_tuple(1, "bolla")
},
});
}, mk_cdc_test_config()).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_frozen_logging) {
@@ -1289,7 +1283,7 @@ SEASTAR_THREAD_TEST_CASE(test_frozen_logging) {
test_frozen("frozen<set<text>>", "{'a', 'bb', 'ccc'}");
test_frozen("frozen<map<text, text>>", "{'a': 'bb', 'ccc': 'dddd'}");
test_frozen("frozen<udt>", "{a: 'bb', ccc: 'dddd'}");
}, mk_cdc_test_config()).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_update_insert_delete_distinction) {
@@ -1321,7 +1315,32 @@ SEASTAR_THREAD_TEST_CASE(test_update_insert_delete_distinction) {
BOOST_REQUIRE_EQUAL(results[3].size(), 1);
BOOST_REQUIRE_EQUAL(*results[3].front(), data_value(static_cast<int8_t>(cdc::operation::row_delete)).serialize_nonnull()); // log entry from (3)
}, mk_cdc_test_config()).get();
}).get();
}
static std::vector<std::vector<data_value>> get_result(cql_test_env& e,
const std::vector<data_type>& col_types, const sstring& query) {
auto deser = [] (const data_type& t, const bytes_opt& b) -> data_value {
if (!b) {
return data_value::make_null(t);
}
return t->deserialize(*b);
};
auto msg = e.execute_cql(query).get0();
auto rows = dynamic_pointer_cast<cql_transport::messages::result_message::rows>(msg);
BOOST_REQUIRE(rows);
std::vector<std::vector<data_value>> res;
for (auto&& r: to_bytes(*rows)) {
BOOST_REQUIRE_LE(col_types.size(), r.size());
std::vector<data_value> res_r;
for (size_t i = 0; i < col_types.size(); ++i) {
res_r.push_back(deser(col_types[i], r[i]));
}
res.push_back(std::move(res_r));
}
return res;
}
SEASTAR_THREAD_TEST_CASE(test_change_splitting) {
@@ -1346,28 +1365,8 @@ SEASTAR_THREAD_TEST_CASE(test_change_splitting) {
return make_set_value(keys_type, std::move(s));
};
auto deser = [] (const data_type& t, const bytes_opt& b) -> data_value {
if (!b) {
return data_value::make_null(t);
}
return t->deserialize(*b);
};
auto get_result = [&] (const std::vector<data_type>& col_types, const sstring& s) -> std::vector<std::vector<data_value>> {
auto msg = e.execute_cql(s).get0();
auto rows = dynamic_pointer_cast<cql_transport::messages::result_message::rows>(msg);
BOOST_REQUIRE(rows);
std::vector<std::vector<data_value>> res;
for (auto&& r: to_bytes(*rows)) {
BOOST_REQUIRE_LE(col_types.size(), r.size());
std::vector<data_value> res_r;
for (size_t i = 0; i < col_types.size(); ++i) {
res_r.push_back(deser(col_types[i], r[i]));
}
res.push_back(std::move(res_r));
}
return res;
return ::get_result(e, col_types, s);
};
cquery_nofail(e, "create table ks.t (pk int, ck int, s int static, v1 int, v2 int, m map<int, int>, primary key (pk, ck)) with cdc = {'enabled':true}");
@@ -1566,7 +1565,7 @@ SEASTAR_THREAD_TEST_CASE(test_change_splitting) {
};
BOOST_REQUIRE_EQUAL(expected, result);
}
}, mk_cdc_test_config()).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_batch_with_row_delete) {
@@ -1630,7 +1629,7 @@ SEASTAR_THREAD_TEST_CASE(test_batch_with_row_delete) {
BOOST_REQUIRE_EQUAL(deser(s_type, r[3]), er[3]);
BOOST_REQUIRE_EQUAL(deser(oper_type, r[4]), er[4]);
}
}, mk_cdc_test_config()).get();
}).get();
}
struct image_set {
@@ -1939,7 +1938,7 @@ void test_batch_images(bool preimage, bool postimage) {
}
}
}, preimage, postimage);
}, mk_cdc_test_config()).get();
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_batch_pre_image) {
@@ -1953,3 +1952,24 @@ SEASTAR_THREAD_TEST_CASE(test_batch_post_image) {
SEASTAR_THREAD_TEST_CASE(test_batch_pre_post_image) {
test_batch_images(true, true);
}
// Regression test for #7716
SEASTAR_THREAD_TEST_CASE(test_postimage_with_no_regular_columns) {
do_with_cql_env_thread([] (cql_test_env& e) {
using oper_ut = std::underlying_type_t<cdc::operation>;
cquery_nofail(e, "create table ks.t (pk int, ck int, primary key (pk, ck)) with cdc = {'enabled': true, 'postimage': true}");
cquery_nofail(e, "insert into ks.t (pk, ck) values (1, 2)");
auto result = get_result(e,
{data_type_for<oper_ut>(), int32_type, int32_type},
"select \"cdc$operation\", pk, ck from ks.t_scylla_cdc_log");
std::vector<std::vector<data_value>> expected = {
{ oper_ut(cdc::operation::insert), int32_t(1), int32_t(2) },
{ oper_ut(cdc::operation::post_image), int32_t(1), int32_t(2) },
};
BOOST_REQUIRE_EQUAL(expected, result);
}).get();
}

View File

@@ -931,10 +931,11 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_cdc) {
auto cfg_ptr = std::make_unique<config>();
config& cfg = *cfg_ptr;
cfg.read_from_yaml("experimental_features:\n - cdc\n", throw_on_error);
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::CDC});
BOOST_CHECK(cfg.check_experimental(ef::CDC));
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UNUSED_CDC});
BOOST_CHECK(cfg.check_experimental(ef::UNUSED_CDC));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS));
return make_ready_future();
}
@@ -943,9 +944,10 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_unused) {
config& cfg = *cfg_ptr;
cfg.read_from_yaml("experimental_features:\n - lwt\n", throw_on_error);
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UNUSED});
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC));
BOOST_CHECK(cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS));
return make_ready_future();
}
@@ -954,9 +956,22 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_udf) {
config& cfg = *cfg_ptr;
cfg.read_from_yaml("experimental_features:\n - udf\n", throw_on_error);
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UDF});
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(cfg.check_experimental(ef::UDF));
BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS));
return make_ready_future();
}
SEASTAR_TEST_CASE(test_parse_experimental_features_alternator_streams) {
auto cfg_ptr = std::make_unique<config>();
config& cfg = *cfg_ptr;
cfg.read_from_yaml("experimental_features:\n - alternator-streams\n", throw_on_error);
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::ALTERNATOR_STREAMS});
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
BOOST_CHECK(cfg.check_experimental(ef::ALTERNATOR_STREAMS));
return make_ready_future();
}
@@ -964,10 +979,11 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_multiple) {
auto cfg_ptr = std::make_unique<config>();
config& cfg = *cfg_ptr;
cfg.read_from_yaml("experimental_features:\n - cdc\n - lwt\n - cdc\n", throw_on_error);
BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::CDC, ef::UNUSED, ef::CDC}));
BOOST_CHECK(cfg.check_experimental(ef::CDC));
BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::UNUSED_CDC, ef::UNUSED, ef::UNUSED_CDC}));
BOOST_CHECK(cfg.check_experimental(ef::UNUSED_CDC));
BOOST_CHECK(cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS));
return make_ready_future();
}
@@ -979,9 +995,10 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_invalid) {
[&cfg] (const sstring& opt, const sstring& msg, std::optional<value_status> status) {
BOOST_REQUIRE_EQUAL(opt, "experimental_features");
BOOST_REQUIRE_NE(msg.find("line 2, column 7"), msg.npos);
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS));
});
return make_ready_future();
}
@@ -990,9 +1007,10 @@ SEASTAR_TEST_CASE(test_parse_experimental_true) {
auto cfg_ptr = std::make_unique<config>();
config& cfg = *cfg_ptr;
cfg.read_from_yaml("experimental: true", throw_on_error);
BOOST_CHECK(cfg.check_experimental(ef::CDC));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(cfg.check_experimental(ef::UDF));
BOOST_CHECK(cfg.check_experimental(ef::ALTERNATOR_STREAMS));
return make_ready_future();
}
@@ -1000,8 +1018,9 @@ SEASTAR_TEST_CASE(test_parse_experimental_false) {
auto cfg_ptr = std::make_unique<config>();
config& cfg = *cfg_ptr;
cfg.read_from_yaml("experimental: false", throw_on_error);
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED_CDC));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
BOOST_CHECK(!cfg.check_experimental(ef::ALTERNATOR_STREAMS));
return make_ready_future();
}

View File

@@ -128,12 +128,14 @@ SEASTAR_THREAD_TEST_CASE(test_large_data) {
});
}).get();
// Since deletion of large data entries has been deleted,
// expect the record to be present.
assert_that(e.execute_cql("select partition_key from system.large_rows where table_name = 'tbl' allow filtering;").get0())
.is_rows()
.is_empty();
.with_size(1);
assert_that(e.execute_cql("select partition_key from system.large_cells where table_name = 'tbl' allow filtering;").get0())
.is_rows()
.is_empty();
.with_size(1);
return make_ready_future<>();
}, cfg).get();

View File

@@ -118,7 +118,6 @@ SEASTAR_TEST_CASE(cdc_schema_extension) {
// Extensions have to be registered here - config needs to have them before construction of test env.
ext->add_schema_extension<cdc::cdc_extension>(cdc::cdc_extension::NAME);
auto cfg = ::make_shared<db::config>(ext);
cfg->experimental_features({db::experimental_features_t::feature::CDC});
return do_with_cql_env([] (cql_test_env& e) {
auto assert_ext_correctness = [] (cql_test_env& e, cdc::cdc_extension expected_ext) {

View File

@@ -2715,7 +2715,7 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) {
}
}
// abort()
// abort() -- check that consumer is aborted
{
auto [reader, handle] = make_queue_reader(gen.schema(), tests::make_permit());
auto fill_buffer_fut = reader.fill_buffer(db::no_timeout);
@@ -2730,6 +2730,28 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) {
BOOST_REQUIRE_THROW(fill_buffer_fut.get(), std::runtime_error);
BOOST_REQUIRE_THROW(handle.push(mutation_fragment(*gen.schema(), tests::make_permit(), partition_end{})).get(), std::runtime_error);
BOOST_REQUIRE(!reader.is_end_of_stream());
}
// abort() -- check that producer is aborted
{
auto [reader, handle] = make_queue_reader(gen.schema(), tests::make_permit());
reader.set_max_buffer_size(1);
auto expected_reader = flat_mutation_reader_from_mutations(tests::make_permit(), expected_muts);
auto push_fut = make_ready_future<>();
while (push_fut.available()) {
push_fut = handle.push(std::move(*expected_reader(db::no_timeout).get0()));
}
BOOST_REQUIRE(!push_fut.available());
handle.abort(std::make_exception_ptr<std::runtime_error>(std::runtime_error("error")));
BOOST_REQUIRE_THROW(reader.fill_buffer(db::no_timeout).get(), std::runtime_error);
BOOST_REQUIRE_THROW(push_fut.get(), std::runtime_error);
BOOST_REQUIRE(!reader.is_end_of_stream());
}
// Detached handle

View File

@@ -49,7 +49,7 @@ static thread_local mutation_application_stats app_stats_for_tests;
// Verifies that tombstones in "list" are monotonic, overlap with the requested range,
// and have information equivalent with "expected" in that range.
static
void check_tombstone_slice(const schema& s, std::vector<range_tombstone> list,
void check_tombstone_slice(const schema& s, const utils::chunked_vector<range_tombstone>& list,
const query::clustering_range& range,
std::initializer_list<range_tombstone> expected)
{

View File

@@ -607,7 +607,7 @@ future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_
auto db_cfg_ptr = ::make_shared<db::config>(std::move(extensions));
auto& db_cfg = *db_cfg_ptr;
db_cfg.enable_user_defined_functions({true}, db::config::config_source::CommandLine);
db_cfg.experimental_features({experimental_features_t::UDF, experimental_features_t::CDC}, db::config::config_source::CommandLine);
db_cfg.experimental_features({experimental_features_t::UDF}, db::config::config_source::CommandLine);
if (regenerate) {
db_cfg.data_file_directories({data_dir}, db::config::config_source::CommandLine);
} else {

View File

@@ -75,6 +75,8 @@ cql_test_config::cql_test_config(shared_ptr<db::config> cfg)
// which all get thrown away when the test is done. This can cause timeouts
// if /tmp is not tmpfs.
db_config->commitlog_use_o_dsync.set(false);
db_config->add_cdc_extension();
}
cql_test_config::cql_test_config(const cql_test_config&) = default;

View File

@@ -39,7 +39,7 @@ class LiveData(object):
def _discoverMetrics(self):
results = metric.Metric.discover(self._metric_source)
logging.debug('_discoverMetrics: {} results discovered'.format(len(results)))
for symbol in results:
for symbol in list(results):
if not self._matches(symbol, self._metricPatterns):
results.pop(symbol)
logging.debug('_initializeMetrics: {} results matched'.format(len(results)))