Compare commits

...

36 Commits

Author SHA1 Message Date
Raphael S. Carvalho
67a62b3e8d sstables: Fix sstable reshaping for STCS
The heuristic of STCS reshape is correct, and it built the compaction
descriptor correctly, but forgot to return it to the caller, so no
reshape was ever done on behalf of STCS even when the strategy
needed it.

Fixes #7774.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20201209175044.1609102-1-raphaelsc@scylladb.com>
(cherry picked from commit e4b55f40f3)
2021-11-15 13:28:52 +02:00
Takuya ASADA
92effccf52 scylla_ntp_setup: support 'pool' directive on ntp.conf
Currently, scylla_ntp_setup only supports 'server' directive, we should
support 'pool' too.

Fixes #9393

Closes #9397

(cherry picked from commit 61469d62b8)
2021-10-10 19:42:14 +03:00
Takuya ASADA
7357529834 scylla_cpuscaling_setup: add --force option
To building Ubuntu AMI with CPU scaling configuration, we need force
running mode for scylla_cpuscaling_setup, which run setup without
checking scaling_governor support.

See scylladb/scylla-machine-image#204

Closes #9326

(cherry picked from commit f928dced0c)
2021-10-05 16:20:30 +03:00
Takuya ASADA
3dd7874f08 scylla_cpuscaling_setup: disable ondemand.service on Ubuntu
On Ubuntu, scaling_governor becomes powersave after rebooted, even we configured cpufrequtils.
This is because ondemand.service, it unconditionally change scaling_governor to ondemand or powersave.
cpufrequtils will start before ondemand.service, scaling_governor overwrite by ondemand.service.
To configure scaling_governor correctly, we have to disable this service.

Fixes #9324

Closes #9325

(cherry picked from commit cd7fe9a998)
2021-10-03 14:09:37 +03:00
Raphael S. Carvalho
1bf218c29e compaction_manager: prevent unbounded growth of pending tasks
There will be unbounded growth of pending tasks if they are submitted
faster than retiring them. That can potentially happen if memtables
are frequently flushed too early. It was observed that this unbounded
growth caused task queue violations as the queue will be filled
with tons of tasks being reevaluated. By avoiding duplication in
pending task list for a given table T, growth is no longer unbounded
and consequently reevaluation is no longer aggressive.

Refs #9331.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20210930125718.41243-1-raphaelsc@scylladb.com>
(cherry picked from commit 52302c3238)
2021-10-03 13:11:30 +03:00
Yaron Kaikov
89c47a44dc release: prepare for 4.3.7 2021-09-23 15:18:22 +03:00
Eliran Sinvani
dd93f297c1 dist: rpm: Add specific versioning and python3 dependency
The Red Hat packages were missing two things, first the metapackage
wasn't dependant at all in the python3 package and second, the
scylla-server package dependencies didn't contain a version as part
of the dependency which can cause to some problems during upgrade.
Doing both of the things listed here is a bit of an overkill as either
one of them separately would solve the problem described in #XXXX
but both should be applied in order to express the correct concept.

Fixes #8829

Closes #8832

(cherry picked from commit 9bfb2754eb)
2021-09-12 16:04:11 +03:00
Calle Wilund
b0b2606a8c snapshot: Add filter to check for existing snapshot
Fixes #8212

Some snapshotting operations call in on a single table at a time.
When checking for existing snapshots in this case, we should not
bother with snapshots in other tables. Add an optional "filter"
to check routine, which if non-empty includes tables to check.

Use case is "scrub" which calls with a limited set of tables
to snapshot.

Closes #8240

(cherry picked from commit f44420f2c9)
2021-09-12 11:16:29 +03:00
Avi Kivity
6de458e915 Merge "evictable_readers: don't drop static rows, drop assumption about snapshot isolation" from Botond
"
This mini-series fixes two loosely related bugs around reader recreation
in the evictable reader (related by both being around reader
recreation). A unit test is also added which reproduces both of them and
checks that the fixes indeed work. More details in the patches
themselves.
This series replaces the two independent patches sent before:
* [PATCH v1] evictable_reader: always reset static row drop flag
* [PATCH v1] evictable_reader: relax partition key check on reader
  recreation

As they depend on each other, it is easier to add a test if they are in
a series.

Fixes: #8923
Fixes: #8893

Tests: unit(dev, mutation_reader_test:debug)
"

* 'evictable-reader-recreation-more-bugs/v1' of https://github.com/denesb/scylla:
  test: mutation_reader_test: add more test for reader recreation
  evictable_reader: relax partition key check on reader recreation
  evictable_reader: always reset static row drop flag

(cherry picked from commit 4209dfd753)
2021-09-06 17:30:29 +03:00
Takuya ASADA
b6aa5ab2d4 scylla_cpuscaling_setup: change scaling_governor path
On some environment /sys/devices/system/cpu/cpufreq/policy0/scaling_governor
does not exist even it supported CPU scaling.
Instead, /sys/devices/system/cpu/cpu0/cpufreq/scaling_governor is
avaliable on both environment, so we should switch to it.

Fixes #9191

Closes #9193

(cherry picked from commit e5bb88b69a)
2021-08-12 12:10:24 +03:00
Raphael S. Carvalho
08cbd180ff compaction: Prevent tons of compaction of fully expired sstable from happening in parallel
Compaction manager can start tons of compaction of fully expired sstable in
parallel, which may consume a significant amount of resources.
This problem is caused by weight being released too early in compaction, after
data is all compacted but before table is called to update its state, like
replacing sstables and so on.
Fully expired sstables aren't actually compacted, so the following can happen:
- compaction 1 starts for expired sst A with weight W, but there's nothing to
be compacted, so weight W is released, then calls table to update state.
- compaction 2 starts for expired sst B with weight W, but there's nothing to
be compacted, so weight W is released, then calls table to update state.
- compaction 3 starts for expired sst C with weight W, but there's nothing to
be compacted, so weight W is released, then calls table to update state.
- compaction 1 is done updating table state, so it finally completes and
releases all the resources.
- compaction 2 is done updating table state, so it finally completes and
releases all the resources.
- compaction 3 is done updating table state, so it finally completes and
releases all the resources.

This happens because, with expired sstable, compaction will release weight
faster than it will update table state, as there's nothing to be compacted.

With my reproducer, it's very easy to reach 50 parallel compactions on a single
shard, but that number can be easily worse depending on the amount of sstables
with fully expired data, across all tables. This high parallelism can happen
only with a couple of tables, if there are many time windows with expired data,
as they can be compacted in parallel.

Prior to 55a8b6e3c9, weight was released earlier in compaction, before
last sstable was sealed, but right now, there's no need to release weight
earlier. Weight can be released in a much simpler way, after the compaction is
actually done. So such compactions will be serialized from now on.

Fixes #8710.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20210527165443.165198-1-raphaelsc@scylladb.com>

[avi: drop now unneeded storage_service_for_tests]

(cherry picked from commit a7cdd846da)
2021-08-10 18:18:35 +03:00
Nadav Har'El
693c7b300a secondary index: fix regression in CREATE INDEX IF NOT EXISTS
The recent commit 0ef0a4c78d added helpful
error messages in case an index cannot be created because the intended
name of its materialized view is already taken - but accidentally broke
the "CREATE INDEX IF NOT EXISTS" feature.

The checking code was correct, but in the wrong place: we need to first
check maybe the index already exists and "IF NOT EXISTS" was chosen -
and only do this new error checking if this is not the case.

This patch also includes a cql-pytest test for reproducing this bug.
The bug is also reproduced by the translated Cassandra unit tests
    cassandra_tests/validation/entities/secondary_index_test.py::
    testCreateAndDropIndex
and this is how I found this bug. After these patch, all these tests
pass.

Fixes #8717.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20210526143635.624398-1-nyh@scylladb.com>
(cherry picked from commit 97e827e3e1)
2021-08-10 17:35:43 +03:00
Nadav Har'El
2e7f618632 Merge 'Fix index name conflicts with regular tables' from Piotr Sarna
When an index is created without an explicit name, a default name
is chosen. However, there was no check if a table with conflicting
name already exists. The check is now in place and if any conflicts
are found, a new index name is chosen instead.
When an index is created *with* an explicit name and a conflicting
regular table is found, index creation should simply fail.

This series comes with a test.

Fixes #8620
Tests: unit(release)

Closes #8632

* github.com:scylladb/scylla:
  cql-pytest: add regression tests for index creation
  cql3: fail to create an index if there is a name conflict
  database: check for conflicting table names for indexes

(cherry picked from commit cee4c075d2)
2021-08-10 15:59:30 +03:00
Hagit Segev
5cd698c89d release: prepare for 4.3.6 2021-08-01 20:21:20 +03:00
Piotr Jastrzebski
482fa83a0e api: use proper type to reduce partition count
Partition count is of a type size_t but we use std::plus<int>
to reduce values of partition count in various column families.
This patch changes the argument of std::plus to the right type.
Using std::plus<int> for size_t compiles but does not work as expected.
For example plus<int>(2147483648LL, 1LL) = -2147483647 while the code
would probably want 2147483649.

Fixes #9090

Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>

Closes #9074

(cherry picked from commit 90a607e844)
2021-07-27 12:38:34 +03:00
Raphael S. Carvalho
cabb7fbd3b sstables: Close promoted index readers when advancing to next summary index
Problem fixed on master since 5ed559c. So branch-4.5 and up aren't affected.

Index reader fails to close input streams of promoted index readers when advancing
to next summary entry, so Scylla can abort as a result of a stream being destroyed
while there were reads in progress. This problem was seen when row cache issued
a fast forward, so index reader was asked to advance to next summary entry while
the previous one still had reads in progress.
By closing the list of index readers when there's only one owner holding it,
the problem is safely fixed, because it cannot happen that an index_bound like
_lower_bound or _upper_bound will be left with a list that's already closed.

Fixes #9049.

test: mode(dev, debug).

No observable perf regression:

BEFORE:

   read    skip      time (s)   iterations     frags     frag/s    mad f/s    max f/s    min f/s    avg aio    aio      (KiB) blocked dropped  idx hit idx miss  idx blk    c hit   c miss    c blk    cpu
-> 1       0         8.168640            4    100000      12242        108      12262      11982    50032.2  50049    6403116   20707       0        0        8        8        0        0        0  83.3%
-> 1       1        22.257916            4     50000       2246          3       2249       2238   150025.0 150025    6454272  100001       0    49999   100000   149999        0        0        0  54.7%
-> 1       8         9.384961            4     11112       1184          5       1184       1178    77781.2  77781    1439328   66618   11111        1    33334    44444        0        0        0  44.0%
-> 1       16        4.976144            4      5883       1182          6       1184       1173    41180.0  41180     762053   35264    5882        0    17648    23530        0        0        0  44.1%
-> 1       32        2.582744            4      3031       1174          4       1175       1167    21216.0  21216     392619   18176    3031        0     9092    12122        0        0        0  43.8%
-> 1       64        1.308410            4      1539       1176          2       1178       1173    10772.0  10772     199353    9233    1539        0     4616     6154        0        0        0  44.0%
-> 1       256       0.331037            4       390       1178         12       1190       1165     2729.0   2729      50519    2338     390        0     1169     1558        0        0        0  44.0%
-> 1       1024      0.085108            4        98       1151          7       1155       1141      685.0    685      12694     587      98        0      293      390        0        0        0  42.9%
-> 1       4096      0.024393            6        25       1025          5       1029       1020      174.0    174       3238     149      25        0       74       98        0        0        0  37.4%
-> 64      1         8.765446            4     98462      11233         16      11236      11182    54642.0  54648    6405470   23632       1     1538     4615     4615        0        0        0  79.3%
-> 64      8         8.456430            4     88896      10512         48      10582      10464    55578.0  55578    6405971   24031    4166        0     5553     5553        0        0        0  77.3%
-> 64      16        7.798197            4     80000      10259        108      10299      10077    51248.0  51248    5922500   22160    4996        0     4998     4998        0        0        0  74.8%
-> 64      32        6.605148            4     66688      10096         64      10168      10033    42715.0  42715    4936359   18796    4164        0     4165     4165        0        0        0  75.5%
-> 64      64        4.933287            4     50016      10138         28      10189      10111    32039.0  32039    3702428   14106    3124        0     3125     3125        0        0        0  75.3%
-> 64      256       1.971701            4     20032      10160         57      10347      10103    12831.0  12831    1482993    5731    1252        0     1250     1250        0        0        0  74.1%
-> 64      1024      0.587026            4      5888      10030         84      10277       9946     3770.0   3770     435895    1635     368        0      366      366        0        0        0  74.6%
-> 64      4096      0.157401            4      1600      10165         69      10202       9698     1023.0   1023     118449     455     100        0       98       98        0        0        0  73.9%

AFTER:

   read    skip      time (s)   iterations     frags     frag/s    mad f/s    max f/s    min f/s    avg aio    aio      (KiB) blocked dropped  idx hit idx miss  idx blk    c hit   c miss    c blk    cpu
-> 1       0         8.191639            4    100000      12208         46      12279      12161    50031.2  50025    6403108   20243       0        0        0        0        0        0        0  87.0%
-> 1       1        22.933121            4     50000       2180         36       2198       2115   150025.0 150025    6454272  100001       0    49999   100000   149999        0        0        0  54.9%
-> 1       8         9.471735            4     11112       1173          5       1178       1168    77781.2  77781    1439328   66663   11111        0    33334    44445        0        0        0  44.6%
-> 1       16        5.001569            4      5883       1176          2       1176       1170    41180.0  41180     762053   35296    5882        1    17648    23529        0        0        0  44.6%
-> 1       32        2.587069            4      3031       1172          1       1173       1164    21216.0  21216     392619   18185    3031        1     9092    12121        0        0        0  44.8%
-> 1       64        1.310747            4      1539       1174          3       1177       1171    10772.0  10772     199353    9233    1539        0     4616     6154        0        0        0  44.9%
-> 1       256       0.335490            4       390       1162          2       1167       1161     2729.0   2729      50519    2338     390        0     1169     1558        0        0        0  45.7%
-> 1       1024      0.081944            4        98       1196         21       1210       1162      685.0    685      12694     585      98        0      293      390        0        0        0  46.2%
-> 1       4096      0.022266            6        25       1123          3       1125       1105      174.0    174       3238     149      24        0       74       98        0        0        0  41.9%
-> 64      1         8.731741            4     98462      11276         45      11417      11231    54642.0  54640    6405470   23686       0     1538     4615     4615        0        0        0  80.2%
-> 64      8         8.396247            4     88896      10588         19      10596      10560    55578.0  55578    6405971   24275    4166        0     5553     5553        0        0        0  77.6%
-> 64      16        7.700995            4     80000      10388         88      10405      10221    51248.0  51248    5922500   22100    5000        0     4998     4998        0        0        0  76.4%
-> 64      32        6.517276            4     66688      10232         31      10342      10201    42715.0  42715    4936359   19013    4164        0     4165     4165        0        0        0  75.3%
-> 64      64        4.898669            4     50016      10210         60      10291      10150    32039.0  32039    3702428   14110    3124        0     3125     3125        0        0        0  74.4%
-> 64      256       1.969972            4     20032      10169         22      10173      10091    12831.0  12831    1482993    5660    1252        0     1250     1250        0        0        0  74.3%
-> 64      1024      0.575180            4      5888      10237         84      10316      10028     3770.0   3770     435895    1656     368        0      366      366        0        0        0  74.6%
-> 64      4096      0.158503            4      1600      10094         81      10195      10014     1023.0   1023     118449     460     100        0       98       98        0        0        0  73.5%

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20210722180302.64675-1-raphaelsc@scylladb.com>
(cherry picked from commit 9dce1e4b2b)
2021-07-25 17:24:09 +03:00
Asias He
4d1c83a4e8 repair: Consider memory bloat when calculate repair parallelism
The repair parallelism is calculated by the number of memory allocated to
repair and memory usage per repair instance. Currently, it does not
consider memory bloat issues (e.g., issue #8640) which cause repair to
use more memory and cause std::bad_alloc.

Be more conservative when calculating the parallelism to avoid repair
using too much memory.

Fixes #8641

Closes #8652

(cherry picked from commit b8749f51cb)
2021-07-15 13:02:18 +03:00
Takuya ASADA
7da9884d09 scylla-fstrim.timer: drop BindsTo=scylla-server.service
To avoid restart scylla-server.service unexpectedly, drop BindsTo=
from scylla-fstrim.timer.

Fixes #8921

Closes #8973

(cherry picked from commit def81807aa)
2021-07-08 10:07:04 +03:00
Avi Kivity
b4242f01a8 Update tools/java submodule for rack/dc properties
* tools/java 7afe7018a5...86fb5c826d (1):
  > cassandra.in.sh: Add path to rack/dc properties file to classpath

Fixes #7930.
2021-07-08 09:54:23 +03:00
Takuya ASADA
27cd231f61 dist: stop removing /etc/systemd/system/*.mount on package uninstall
Listing /etc/systemd/system/*.mount as ghost file seems incorrect,
since user may want to keep using RAID volume / coredump directory after
uninstalling Scylla, or user may want to upgrade enterprise version.

Also, we mixed two types of files as ghost file, it should handle differently:
 1. automatically generated by postinst scriptlet
 2. generated by user invoked scylla_setup

The package should remove only 1, since 2 is generated by user decision.

However, just dropping .mount from %files section causes another
problem, rpm will remove these files during upgrade, instead of
uninstall (#8924).

To fix both problem, specify .mount files as "%ghost %config".
It will keep files both package upgrade and package remove.

See scylladb/scylla-enterprise#1780

Closes #8810
Closes #8924

Closes #8959

(cherry picked from commit f71f9786c7)
2021-07-07 18:38:06 +03:00
Pavel Emelyanov
030419d5ed hasher: More picky noexcept marking of feed_hash()
Commit 5adb8e555c marked the ::feed_hash() and a visitor lambda of
digester::feed_hash() as noexcept. This was quite recklesl as the
appending_hash<>::operator()s called by ::feed_hash() are not all
marked noexcept. In particular, the appending_hash<row>() is not
such and seem to throw.

The original intent of the mentioned commit was to facilitate the
partition_hasher in repair/ code. The hasher itself had been removed
by the 0af7a22c21, so it no longer needs the feed_hash-s to be
noexcepts.

The fix is to inherit noexcept from the called hashers, but for the
digester::feed_hash part the noexcept is just removed until clang
compilation bug #50994 is fixed.

fixes: #8983
tests: unit(dev)

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
Message-Id: <20210706153608.4299-1-xemul@scylladb.com>
(cherry picked from commit 63a2fed585)
2021-07-07 18:36:32 +03:00
Raphael S. Carvalho
0d1362fc31 LCS: reshape: Fix overlapping check when determining if a sstable set is disjoint
Wrong comparison operator is used when checking for overlapping. It
would miss overlapping when last key of a sstable is equal to the first
key of another sstable that comes next in the set, which is sorted by
first key.

Fixes #8531.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit 39ecddbd34)
2021-07-07 14:04:36 +03:00
Raphael S. Carvalho
0888aa1717 LCS: Fix terrible write amplification when reshaping level 0
LCS reshape is basically 'major compacting' level 0 until it contains less than
N sstables.

That produces terrible write amplification, because any given byte will be
compacted (initial # of sstables / max_threshold (32)) times. So if L0 initially
contained 256 ssts, there would be a WA of about 8.

This terrible write amplification can be reduced by performing STCS instead on
L0, which will leave L0 in a good shape without hurting WA as it happens
now.

Fixes #8345.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20210322150655.27011-1-raphaelsc@scylladb.com>
(cherry picked from commit bcbb39999b)
2021-06-14 20:28:31 +03:00
Hagit Segev
690a96ff54 release: prepare for 4.3.5 2021-06-14 14:19:58 +03:00
Michał Chojnowski
38cdf30a35 cdc: log: fix use-after-free in process_bytes_visitor
Due to small value optimization used in `bytes`, views to `bytes` stored
in `vector` can be invalidated when the vector resizes, resulting in
use-after-free and data corruption. Fix that.

Fixes #8117

(cherry picked from commit 8cc4f39472)
2021-06-13 19:06:40 +03:00
Botond Dénes
61b71e4da0 mutation_test: test_mutation_diff_with_random_generator: compact input mutations
This test checks that `mutation_partition::difference()` works correctly.
One of the checks it does is: m1 + m2 == m1 + (m2 - m1).
If the two mutations are identical but have compactable data, e.g. a
shadowable tombstone shadowed by a row marker, the apply will collapse
these, causing the above equality check to fail (as m2 - m1 is null).
To prevent this, compact the two input mutations.

Fixes: #8221
Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <20210310141118.212538-1-bdenes@scylladb.com>
(cherry picked from commit cf28552357)
2021-06-13 18:30:56 +03:00
Takuya ASADA
e4b42e622e scylla_coredump_setup: avoid coredump failure when hard limit of coredump is set to zero
On the environment hard limit of coredump is set to zero, coredump test
script will fail since the system does not generate coredump.
To avoid such issue, set ulimit -c 0 before generating SEGV on the script.

Note that scylla-server.service can generate coredump even ulimit -c 0
because we set LimitCORE=infinity on its systemd unit file.

Fixes #8238

Closes #8245

(cherry picked from commit af8eae317b)
2021-06-13 18:27:13 +03:00
Avi Kivity
e625144d6e Update seastar submodule (nested exception logging)
* seastar b70b444924...5ef45afa4d (2):
  > utils/log.cc: fix nested_exception logging (again)
  > log: skip on unknown nested mixing instead of stopping the logging

Fixes #8327.
2021-06-13 18:24:52 +03:00
Yaron Kaikov
76ec7513f1 install.sh: Setup aio-max-nr upon installation
This is a follow up change to #8512.

Let's add aio conf file during scylla installation process and make sure
we also remove this file when uninstall Scylla

As per Avi Kivity's suggestion, let's set aio value as static
configuration, and make it large enough to work with 500 cpus.

Closes #8650

Refs: #8713

(cherry picked from commit dd453ffe6a)
2021-06-13 14:15:24 +03:00
Yaron Kaikov
f36f7035c8 scylla_io_setup: configure "aio-max-nr" before iotune
On severl instance types in AWS and Azure, we get the following failure
during scylla_io_setup process:
```
ERROR 2021-04-14 07:50:35,666 [shard 5] seastar - Could not setup Async
I/O: Resource temporarily unavailable. The most common cause is not
enough request capacity in /proc/sys/fs/aio-max-nr. Try increasing that
number or reducing the amount of logical CPUs available for your
application
```

We have scylla_prepare:configure_io_slots() running before the
scylla-server.service start, but the scylla_io_setup is taking place
before

1) Let's move configure_io_slots() to scylla_util.py since both
   scylla_io_setup and scylla_prepare are import functions from it
2) cleanup scylla_prepare since we don't need the same function twice
3) Let's use configure_io_slots() during scylla_io_setup to avoid such
failure

Fixes: #8587

Closes #8512

Refs: #8713

(cherry picked from commit 588a065304)
2021-06-13 14:14:59 +03:00
Benny Halevy
709e934164 test: commitlog_test: test_allocation_failure: fill memory using smaller allocations
commitlog was changed to use fragmented_temporary_buffer::ostream (db::commitlog::output).
So if there are discontiguous small memory blocks, they can be used to satisfy
an allocation even if no contiguous memory blocks are available.

To prevent that, as Avi suggested, this change allocates in 128K blocks
and frees the last one to succeed (so that we won't fail on allocating continuations).

Fixes #8028

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20210203100333.862036-1-bhalevy@scylladb.com>
(cherry picked from commit ca6f5cb0bc)
2021-06-10 19:37:37 +03:00
Dejan Mircevski
13428d56f6 cql3: Skip indexed column for CK restrictions
When querying an index table, we assemble clustering-column
restrictions for that query by going over the base table token,
partition columns, and clustering columns.  But if one of those
columns is the indexed column, there is a problem; the indexed column
is the index table's partition key, not clustering key.  We end up
with invalid clustering slice, which can cause problems downstream.

Fix this by skipping the indexed column when assembling the clustering
restrictions.

Tests: unit (dev)

Fixes #7888

Signed-off-by: Dejan Mircevski <dejan@scylladb.com>

Closes #8320

(cherry picked from commit 0bd201d3ca)
2021-06-10 12:06:53 +03:00
Nadav Har'El
2c1f5e5225 Update tools/java submodule with backported patches
* tools/java 1489e7c539...7afe7018a5 (2):
  > nodetool: alternate way to specify table name which includes a dot
  > nodetool: do no treat table name with dot as a secondary index

Fixes #6521

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
2021-06-07 09:49:55 +03:00
Nadav Har'El
9ae3edb102 alternator: fix equality check of nested document containing a set
In issue #5021 we noticed that the equality check in Alternator's condition
expressions needs to handle sets differently - we need to compare the set's
elements ignoring their order. But the implementation we added to fix that
issue was only correct when the entire attribute was a set... In the
general case, an attribute can be a nested document, with only some
inner set. The equality-checking function needs to tranverse this nested
document, and compare the sets inside it as appropriate. This is what
we do in this patch.

This patch also adds a new test comparing equality of a nested document with
some inner sets. This test passes on DynamoDB, failed on Alternator before
this patch, and passes with this patch.

Refs #5021
Fixes #8514

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20210419184840.471858-1-nyh@scylladb.com>
(cherry picked from commit dae7528fe5)
In the backport to 4.3, I had to disable (skip) the new test, because it
relies on nested attribute updates to build an unsorted set - and those
were not yet supported in branch 4.3.
2021-06-07 09:25:19 +03:00
Nadav Har'El
11851fa4d9 alternator: fix inequality check of two sets
In issue #5021 we noted that Alternator's equality operator needs to be
fixed for the case of comparing two sets, because the equality check needs
to take into account the possibility of different element order.

Unfortunately, we fixed only the equality check operator, but forgot there
is also an inequality operator!

So in this patch we fix the inequality operator, and also add a test for
it that was previously missing.

The implementation of the inequality operator is trivial - it's just the
negation of the equality test. Our pre-existing tests verify that this is
the correct implementation (e.g., if attribute x doesn't exist, then "x = 3"
is false but "x <> 3" is true).

Refs #5021
Fixes #8513

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20210419141450.464968-1-nyh@scylladb.com>
(cherry picked from commit 50f3201ee2)
2021-06-07 09:00:35 +03:00
Nadav Har'El
1a56e41f44 alternator: fix equality check of two unset attributes
When a condition expression (ConditionExpression, FilterExpression, etc.)
checks for equality of two item attributes, i.e., "x = y", and when one of
these attributes was missing we correctly returned false.
However, we also need to return false when *both* attributes are missing in
the item, because this is what DynamoDB does in this case. In other words
an unset attribute is never equal to anything - not even to another unset
attribute. This was not happening before this patch:

When x and y were both missing attributes, Alternator incorrectly returned
true for "x = y", and this patch fixes this case. It also fixes "x <> y"
which should to be true when both x and y are unset (but was false
before this patch).

The other comparison operators - <, <=, >, >=, BETWEEN, were all
implemented correctly even before this patch.

This patch also includes tests for all the two-unset-attribute cases of
all the operators listed above. As usual, we check that these tests pass
on both DynamoDB and Alternator to confirm our new behavior is the correct
one - before this patch, two of the new tests failed on Alternator and
passed on DynamoDB.

Fixes #8511

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20210419123911.462579-1-nyh@scylladb.com>
(cherry picked from commit 46448b0983)
2021-06-06 17:08:25 +03:00
39 changed files with 897 additions and 129 deletions

View File

@@ -1,7 +1,7 @@
#!/bin/sh
PRODUCT=scylla
VERSION=4.3.4
VERSION=4.3.7
if test -f version
then

View File

@@ -123,7 +123,7 @@ struct rjson_engaged_ptr_comp {
// as internally they're stored in an array, and the order of elements is
// not important in set equality. See issue #5021
static bool check_EQ_for_sets(const rjson::value& set1, const rjson::value& set2) {
if (set1.Size() != set2.Size()) {
if (!set1.IsArray() || !set2.IsArray() || set1.Size() != set2.Size()) {
return false;
}
std::set<const rjson::value*, rjson_engaged_ptr_comp> set1_raw;
@@ -137,25 +137,70 @@ static bool check_EQ_for_sets(const rjson::value& set1, const rjson::value& set2
}
return true;
}
// Moreover, the JSON being compared can be a nested document with outer
// layers of lists and maps and some inner set - and we need to get to that
// inner set to compare it correctly with check_EQ_for_sets() (issue #8514).
static bool check_EQ(const rjson::value* v1, const rjson::value& v2);
static bool check_EQ_for_lists(const rjson::value& list1, const rjson::value& list2) {
if (!list1.IsArray() || !list2.IsArray() || list1.Size() != list2.Size()) {
return false;
}
auto it1 = list1.Begin();
auto it2 = list2.Begin();
while (it1 != list1.End()) {
// Note: Alternator limits an item's depth (rjson::parse() limits
// it to around 37 levels), so this recursion is safe.
if (!check_EQ(&*it1, *it2)) {
return false;
}
++it1;
++it2;
}
return true;
}
static bool check_EQ_for_maps(const rjson::value& list1, const rjson::value& list2) {
if (!list1.IsObject() || !list2.IsObject() || list1.MemberCount() != list2.MemberCount()) {
return false;
}
for (auto it1 = list1.MemberBegin(); it1 != list1.MemberEnd(); ++it1) {
auto it2 = list2.FindMember(it1->name);
if (it2 == list2.MemberEnd() || !check_EQ(&it1->value, it2->value)) {
return false;
}
}
return true;
}
// Check if two JSON-encoded values match with the EQ relation
static bool check_EQ(const rjson::value* v1, const rjson::value& v2) {
if (!v1) {
return false;
}
if (v1->IsObject() && v1->MemberCount() == 1 && v2.IsObject() && v2.MemberCount() == 1) {
if (v1 && v1->IsObject() && v1->MemberCount() == 1 && v2.IsObject() && v2.MemberCount() == 1) {
auto it1 = v1->MemberBegin();
auto it2 = v2.MemberBegin();
if ((it1->name == "SS" && it2->name == "SS") || (it1->name == "NS" && it2->name == "NS") || (it1->name == "BS" && it2->name == "BS")) {
return check_EQ_for_sets(it1->value, it2->value);
if (it1->name != it2->name) {
return false;
}
if (it1->name == "SS" || it1->name == "NS" || it1->name == "BS") {
return check_EQ_for_sets(it1->value, it2->value);
} else if(it1->name == "L") {
return check_EQ_for_lists(it1->value, it2->value);
} else if(it1->name == "M") {
return check_EQ_for_maps(it1->value, it2->value);
} else {
// Other, non-nested types (number, string, etc.) can be compared
// literally, comparing their JSON representation.
return it1->value == it2->value;
}
} else {
// If v1 and/or v2 are missing (IsNull()) the result should be false.
// In the unlikely case that the object is malformed (issue #8070),
// let's also return false.
return false;
}
return *v1 == v2;
}
// Check if two JSON-encoded values match with the NE relation
static bool check_NE(const rjson::value* v1, const rjson::value& v2) {
return !v1 || *v1 != v2; // null is unequal to anything.
return !check_EQ(v1, v2);
}
// Check if two JSON-encoded values match with the BEGINS_WITH relation
@@ -298,6 +343,8 @@ static bool check_NOT_NULL(const rjson::value* val) {
// Only types S, N or B (string, number or bytes) may be compared by the
// various comparion operators - lt, le, gt, ge, and between.
// Note that in particular, if the value is missing (v->IsNull()), this
// check returns false.
static bool check_comparable_type(const rjson::value& v) {
if (!v.IsObject() || v.MemberCount() != 1) {
return false;

View File

@@ -331,15 +331,15 @@ void set_column_family(http_context& ctx, routes& r) {
});
cf::get_memtable_columns_count.set(r, [&ctx] (std::unique_ptr<request> req) {
return map_reduce_cf(ctx, req->param["name"], 0, [](column_family& cf) {
return map_reduce_cf(ctx, req->param["name"], uint64_t{0}, [](column_family& cf) {
return cf.active_memtable().partition_count();
}, std::plus<int>());
}, std::plus<>());
});
cf::get_all_memtable_columns_count.set(r, [&ctx] (std::unique_ptr<request> req) {
return map_reduce_cf(ctx, 0, [](column_family& cf) {
return map_reduce_cf(ctx, uint64_t{0}, [](column_family& cf) {
return cf.active_memtable().partition_count();
}, std::plus<int>());
}, std::plus<>());
});
cf::get_memtable_on_heap_size.set(r, [] (const_req req) {

View File

@@ -980,9 +980,9 @@ static bytes get_bytes(const atomic_cell_view& acv) {
return acv.value().linearize();
}
static bytes_view get_bytes_view(const atomic_cell_view& acv, std::vector<bytes>& buf) {
static bytes_view get_bytes_view(const atomic_cell_view& acv, std::forward_list<bytes>& buf) {
return acv.value().is_fragmented()
? bytes_view{buf.emplace_back(acv.value().linearize())}
? bytes_view{buf.emplace_front(acv.value().linearize())}
: acv.value().first_fragment();
}
@@ -1137,9 +1137,9 @@ struct process_row_visitor {
struct udt_visitor : public collection_visitor {
std::vector<bytes_opt> _added_cells;
std::vector<bytes>& _buf;
std::forward_list<bytes>& _buf;
udt_visitor(ttl_opt& ttl_column, size_t num_keys, std::vector<bytes>& buf)
udt_visitor(ttl_opt& ttl_column, size_t num_keys, std::forward_list<bytes>& buf)
: collection_visitor(ttl_column), _added_cells(num_keys), _buf(buf) {}
void live_collection_cell(bytes_view key, const atomic_cell_view& cell) {
@@ -1148,7 +1148,7 @@ struct process_row_visitor {
}
};
std::vector<bytes> buf;
std::forward_list<bytes> buf;
udt_visitor v(_ttl_column, type.size(), buf);
visit_collection(v);
@@ -1167,9 +1167,9 @@ struct process_row_visitor {
struct map_or_list_visitor : public collection_visitor {
std::vector<std::pair<bytes_view, bytes_view>> _added_cells;
std::vector<bytes>& _buf;
std::forward_list<bytes>& _buf;
map_or_list_visitor(ttl_opt& ttl_column, std::vector<bytes>& buf)
map_or_list_visitor(ttl_opt& ttl_column, std::forward_list<bytes>& buf)
: collection_visitor(ttl_column), _buf(buf) {}
void live_collection_cell(bytes_view key, const atomic_cell_view& cell) {
@@ -1178,7 +1178,7 @@ struct process_row_visitor {
}
};
std::vector<bytes> buf;
std::forward_list<bytes> buf;
map_or_list_visitor v(_ttl_column, buf);
visit_collection(v);

View File

@@ -306,6 +306,13 @@ create_index_statement::announce_migration(service::storage_proxy& proxy, bool i
format("Index {} is a duplicate of existing index {}", index.name(), existing_index.value().name()));
}
}
auto index_table_name = secondary_index::index_table_name(accepted_name);
if (db.has_schema(keyspace(), index_table_name)) {
return make_exception_future<::shared_ptr<cql_transport::event::schema_change>>(
exceptions::invalid_request_exception(format("Index {} cannot be created, because table {} already exists",
accepted_name, index_table_name))
);
}
++_cql_stats->secondary_index_creates;
schema_builder builder{schema};
builder.with_index(index);

View File

@@ -1120,7 +1120,11 @@ query::partition_slice indexed_table_select_statement::get_partition_slice_for_g
if (single_ck_restrictions) {
auto prefix_restrictions = single_ck_restrictions->get_longest_prefix_restrictions();
auto clustering_restrictions_from_base = ::make_shared<restrictions::single_column_clustering_key_restrictions>(_view_schema, *prefix_restrictions);
const auto indexed_column = _view_schema->get_column_definition(to_bytes(_index.target_column()));
for (auto restriction_it : clustering_restrictions_from_base->restrictions()) {
if (restriction_it.first == indexed_column) {
continue; // In the index table, the indexed column is the partition (not clustering) key.
}
clustering_restrictions->merge_with(restriction_it.second);
}
}

View File

@@ -1751,7 +1751,11 @@ sstring database::get_available_index_name(const sstring &ks_name, const sstring
auto base_name = index_metadata::get_default_index_name(cf_name, index_name_root);
sstring accepted_name = base_name;
int i = 0;
while (existing_names.contains(accepted_name)) {
auto name_accepted = [&] {
auto index_table_name = secondary_index::index_table_name(accepted_name);
return !has_schema(ks_name, index_table_name) && !existing_names.contains(accepted_name);
};
while (!name_accepted()) {
accepted_name = base_name + "_" + std::to_string(++i);
}
return accepted_name;

View File

@@ -43,9 +43,13 @@
namespace db {
future<> snapshot_ctl::check_snapshot_not_exist(sstring ks_name, sstring name) {
future<> snapshot_ctl::check_snapshot_not_exist(sstring ks_name, sstring name, std::optional<std::vector<sstring>> filter) {
auto& ks = _db.local().find_keyspace(ks_name);
return parallel_for_each(ks.metadata()->cf_meta_data(), [this, ks_name = std::move(ks_name), name = std::move(name)] (auto& pair) {
return parallel_for_each(ks.metadata()->cf_meta_data(), [this, ks_name = std::move(ks_name), name = std::move(name), filter = std::move(filter)] (auto& pair) {
auto& cf_name = pair.first;
if (filter && std::find(filter->begin(), filter->end(), cf_name) == filter->end()) {
return make_ready_future<>();
}
auto& cf = _db.local().find_column_family(pair.second);
return cf.snapshot_exists(name).then([ks_name = std::move(ks_name), name] (bool exists) {
if (exists) {
@@ -111,7 +115,7 @@ future<> snapshot_ctl::take_column_family_snapshot(sstring ks_name, std::vector<
}
return run_snapshot_modify_operation([this, ks_name = std::move(ks_name), tables = std::move(tables), tag = std::move(tag)] {
return check_snapshot_not_exist(ks_name, tag).then([this, ks_name, tables = std::move(tables), tag] {
return check_snapshot_not_exist(ks_name, tag, tables).then([this, ks_name, tables, tag] {
return do_with(std::vector<sstring>(std::move(tables)),[this, ks_name, tag](const std::vector<sstring>& tables) {
return do_for_each(tables, [ks_name, tag, this] (const sstring& table_name) {
if (table_name.find(".") != sstring::npos) {

View File

@@ -40,6 +40,8 @@
#pragma once
#include <vector>
#include <seastar/core/sharded.hh>
#include <seastar/core/future.hh>
#include "database.hh"
@@ -112,7 +114,7 @@ private:
seastar::rwlock _lock;
seastar::gate _ops;
future<> check_snapshot_not_exist(sstring ks_name, sstring name);
future<> check_snapshot_not_exist(sstring ks_name, sstring name, std::optional<std::vector<sstring>> filter = {});
template <typename Func>
std::result_of_t<Func()> run_snapshot_modify_operation(Func&&);

View File

@@ -58,7 +58,8 @@ public:
template<typename T, typename... Args>
void feed_hash(const T& value, Args&&... args) {
std::visit([&] (auto& hasher) noexcept -> void {
// FIXME uncomment the noexcept marking once clang bug 50994 is fixed or gcc compilation is turned on
std::visit([&] (auto& hasher) /* noexcept(noexcept(::feed_hash(hasher, value, args...))) */ -> void {
::feed_hash(hasher, value, std::forward<Args>(args)...);
}, _impl);
};

View File

@@ -87,7 +87,8 @@ WantedBy=multi-user.target
run('sysctl -p /etc/sysctl.d/99-scylla-coredump.conf')
fp = tempfile.NamedTemporaryFile()
fp.write(b'kill -SEGV $$')
fp.write(b'ulimit -c unlimited\n')
fp.write(b'kill -SEGV $$\n')
fp.flush()
p = subprocess.Popen(['/bin/bash', fp.name], stdout=subprocess.PIPE)
pid = p.pid

View File

@@ -22,6 +22,7 @@
import os
import sys
import argparse
import shlex
import distro
from scylla_util import *
@@ -33,12 +34,22 @@ if __name__ == '__main__':
if os.getuid() > 0:
print('Requires root permission.')
sys.exit(1)
if not os.path.exists('/sys/devices/system/cpu/cpufreq/policy0/scaling_governor'):
parser = argparse.ArgumentParser(description='CPU scaling setup script for Scylla.')
parser.add_argument('--force', dest='force', action='store_true',
help='force running setup even CPU scaling unsupported')
args = parser.parse_args()
if not args.force and not os.path.exists('/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor'):
print('This computer doesn\'t supported CPU scaling configuration.')
sys.exit(0)
if is_debian_variant():
if not shutil.which('cpufreq-set'):
apt_install('cpufrequtils')
try:
ondemand = systemd_unit('ondemand')
ondemand.disable()
except:
pass
cfg = sysconfig_parser('/etc/default/cpufrequtils')
cfg.set('GOVERNOR', 'performance')
cfg.commit()

View File

@@ -91,12 +91,12 @@ if __name__ == '__main__':
with open('/etc/ntp.conf') as f:
conf = f.read()
if args.subdomain:
conf2 = re.sub(r'server\s+([0-9]+)\.(\S+)\.pool\.ntp\.org', 'server \\1.{}.pool.ntp.org'.format(args.subdomain), conf, flags=re.MULTILINE)
conf2 = re.sub(r'(server|pool)\s+([0-9]+)\.(\S+)\.pool\.ntp\.org', '\\1 \\2.{}.pool.ntp.org'.format(args.subdomain), conf, flags=re.MULTILINE)
with open('/etc/ntp.conf', 'w') as f:
f.write(conf2)
conf = conf2
match = re.search(r'^server\s+(\S*)(\s+\S+)?', conf, flags=re.MULTILINE)
server = match.group(1)
match = re.search(r'^(server|pool)\s+(\S*)(\s+\S+)?', conf, flags=re.MULTILINE)
server = match.group(2)
ntpd = systemd_unit('ntpd.service')
ntpd.stop()
# ignore error, ntpd may able to adjust clock later

View File

@@ -27,7 +27,6 @@ import platform
import distro
from scylla_util import *
from multiprocessing import cpu_count
def get_mode_cpuset(nic, mode):
mode_cpu_mask = out('/opt/scylladb/scripts/perftune.py --tune net --nic {} --mode {} --get-cpu-mask-quiet'.format(nic, mode))
@@ -98,16 +97,6 @@ def verify_cpu():
print('\nIf this is a virtual machine, please update its CPU feature configuration or upgrade to a newer hypervisor.')
sys.exit(1)
def configure_aio_slots():
with open('/proc/sys/fs/aio-max-nr') as f:
aio_max_nr = int(f.read())
# (10000 + 1024 + 2) * ncpus for scylla,
# 65536 for other apps
required_aio_slots = cpu_count() * 11026 + 65536
if aio_max_nr < required_aio_slots:
with open('/proc/sys/fs/aio-max-nr', 'w') as f:
f.write(str(required_aio_slots))
if __name__ == '__main__':
verify_cpu()
@@ -125,8 +114,6 @@ if __name__ == '__main__':
os.remove('/etc/scylla/ami_disabled')
sys.exit(1)
configure_aio_slots()
if mode == 'virtio':
tap = cfg.get('TAP')
user = cfg.get('USER')
@@ -156,4 +143,3 @@ if __name__ == '__main__':
print(f'Exception occurred while creating perftune.yaml: {e}')
print('To fix the error, please re-run scylla_setup.')
sys.exit(1)

View File

@@ -34,6 +34,7 @@ from pathlib import Path
import distro
from multiprocessing import cpu_count
def scriptsdir_p():
p = Path(sys.argv[0]).resolve()

View File

@@ -0,0 +1,2 @@
# Raise max AIO events
fs.aio-max-nr = 5578536

View File

@@ -1,7 +1,5 @@
[Unit]
Description=Run Scylla fstrim daily
After=scylla-server.service
BindsTo=scylla-server.service
[Timer]
OnCalendar=Sat *-*-* 00:00:00

View File

@@ -11,6 +11,7 @@ else
sysctl -p/usr/lib/sysctl.d/99-scylla-sched.conf || :
sysctl -p/usr/lib/sysctl.d/99-scylla-vm.conf || :
sysctl -p/usr/lib/sysctl.d/99-scylla-inotify.conf || :
sysctl -p/usr/lib/sysctl.d/99-scylla-aio.conf || :
fi
#DEBHELPER#

View File

@@ -12,8 +12,6 @@ case "$1" in
if [ "$1" = "purge" ]; then
rm -rf /etc/systemd/system/scylla-server.service.d/
fi
rm -f /etc/systemd/system/var-lib-systemd-coredump.mount
rm -f /etc/systemd/system/var-lib-scylla.mount
;;
esac

View File

@@ -7,7 +7,7 @@ Group: Applications/Databases
License: AGPLv3
URL: http://www.scylladb.com/
Source0: %{reloc_pkg}
Requires: %{product}-server = %{version} %{product}-conf = %{version} %{product}-kernel-conf = %{version} %{product}-jmx = %{version} %{product}-tools = %{version} %{product}-tools-core = %{version}
Requires: %{product}-server = %{version} %{product}-conf = %{version} %{product}-python3 = %{version} %{product}-kernel-conf = %{version} %{product}-jmx = %{version} %{product}-tools = %{version} %{product}-tools-core = %{version}
Obsoletes: scylla-server < 1.1
%global _debugsource_template %{nil}
@@ -52,7 +52,7 @@ Summary: The Scylla database server
License: AGPLv3
URL: http://www.scylladb.com/
Requires: kernel >= 3.10.0-514
Requires: %{product}-conf %{product}-python3
Requires: %{product}-conf = %{version} %{product}-python3 = %{version}
Conflicts: abrt
AutoReqProv: no
@@ -137,9 +137,9 @@ rm -rf $RPM_BUILD_ROOT
%ghost /etc/systemd/system/scylla-server.service.d/capabilities.conf
%ghost /etc/systemd/system/scylla-server.service.d/mounts.conf
/etc/systemd/system/scylla-server.service.d/dependencies.conf
%ghost /etc/systemd/system/var-lib-systemd-coredump.mount
%ghost %config /etc/systemd/system/var-lib-systemd-coredump.mount
%ghost /etc/systemd/system/scylla-cpupower.service
%ghost /etc/systemd/system/var-lib-scylla.mount
%ghost %config /etc/systemd/system/var-lib-scylla.mount
%package conf
Group: Applications/Databases
@@ -207,6 +207,7 @@ if Scylla is the main application on your server and you wish to optimize its la
/usr/lib/systemd/systemd-sysctl 99-scylla-sched.conf >/dev/null 2>&1 || :
/usr/lib/systemd/systemd-sysctl 99-scylla-vm.conf >/dev/null 2>&1 || :
/usr/lib/systemd/systemd-sysctl 99-scylla-inotify.conf >/dev/null 2>&1 || :
/usr/lib/systemd/systemd-sysctl 99-scylla-aio.conf >/dev/null 2>&1 || :
%files kernel-conf
%defattr(-,root,root)

View File

@@ -62,7 +62,7 @@ struct appending_hash;
template<typename H, typename T, typename... Args>
requires Hasher<H>
inline
void feed_hash(H& h, const T& value, Args&&... args) noexcept {
void feed_hash(H& h, const T& value, Args&&... args) noexcept(noexcept(std::declval<appending_hash<T>>()(h, value, args...))) {
appending_hash<T>()(h, value, std::forward<Args>(args)...);
};

View File

@@ -1151,6 +1151,9 @@ flat_mutation_reader evictable_reader::recreate_reader() {
_range_override.reset();
_slice_override.reset();
_drop_partition_start = false;
_drop_static_row = false;
if (_last_pkey) {
bool partition_range_is_inclusive = true;
@@ -1236,13 +1239,25 @@ void evictable_reader::maybe_validate_partition_start(const flat_mutation_reader
// is in range.
if (_last_pkey) {
const auto cmp_res = tri_cmp(*_last_pkey, ps.key());
if (_drop_partition_start) { // should be the same partition
if (_drop_partition_start) { // we expect to continue from the same partition
// We cannot assume the partition we stopped the read at is still alive
// when we recreate the reader. It might have been compacted away in the
// meanwhile, so allow for a larger partition too.
require(
cmp_res == 0,
"{}(): validation failed, expected partition with key equal to _last_pkey {} due to _drop_partition_start being set, but got {}",
cmp_res <= 0,
"{}(): validation failed, expected partition with key larger or equal to _last_pkey {} due to _drop_partition_start being set, but got {}",
__FUNCTION__,
*_last_pkey,
ps.key());
// Reset drop flags and next pos if we are not continuing from the same partition
if (cmp_res < 0) {
// Close previous partition, we are not going to continue it.
push_mutation_fragment(*_schema, _permit, partition_end{});
_drop_partition_start = false;
_drop_static_row = false;
_next_position_in_partition = position_in_partition::for_partition_start();
_trim_range_tombstones = false;
}
} else { // should be a larger partition
require(
cmp_res < 0,
@@ -1293,9 +1308,14 @@ bool evictable_reader::should_drop_fragment(const mutation_fragment& mf) {
_drop_partition_start = false;
return true;
}
if (_drop_static_row && mf.is_static_row()) {
_drop_static_row = false;
return true;
// Unlike partition-start above, a partition is not guaranteed to have a
// static row fragment. So reset the flag regardless of whether we could
// drop one or not.
// We are guaranteed to get here only right after dropping a partition-start,
// so if we are not seeing a static row here, the partition doesn't have one.
if (_drop_static_row) {
_drop_static_row = false;
return mf.is_static_row();
}
return false;
}

View File

@@ -309,7 +309,7 @@ float node_ops_metrics::repair_finished_percentage() {
tracker::tracker(size_t nr_shards, size_t max_repair_memory)
: _shutdown(false)
, _repairs(nr_shards) {
auto nr = std::max(size_t(1), size_t(max_repair_memory / max_repair_memory_per_range()));
auto nr = std::max(size_t(1), size_t(max_repair_memory / max_repair_memory_per_range() / 4));
rlogger.info("Setting max_repair_memory={}, max_repair_memory_per_range={}, max_repair_ranges_in_parallel={}",
max_repair_memory, max_repair_memory_per_range(), nr);
_range_parallelism_semaphores.reserve(nr_shards);

Submodule seastar updated: b70b444924...5ef45afa4d

View File

@@ -438,7 +438,6 @@ protected:
mutation_source_metadata _ms_metadata = {};
garbage_collected_sstable_writer::data _gc_sstable_writer_data;
compaction_sstable_replacer_fn _replacer;
std::optional<compaction_weight_registration> _weight_registration;
utils::UUID _run_identifier;
::io_priority_class _io_priority;
// optional clone of sstable set to be used for expiration purposes, so it will be set if expiration is enabled.
@@ -457,7 +456,6 @@ protected:
, _sstable_level(descriptor.level)
, _gc_sstable_writer_data(*this)
, _replacer(std::move(descriptor.replacer))
, _weight_registration(std::move(descriptor.weight_registration))
, _run_identifier(descriptor.run_identifier)
, _io_priority(descriptor.io_priority)
, _sstable_set(std::move(descriptor.all_sstables_snapshot))
@@ -929,9 +927,6 @@ public:
}
virtual void on_end_of_compaction() override {
if (_weight_registration) {
_cf.get_compaction_manager().on_compaction_complete(*_weight_registration);
}
replace_remaining_exhausted_sstables();
}
private:

View File

@@ -134,8 +134,6 @@ struct compaction_descriptor {
uint64_t max_sstable_bytes;
// Run identifier of output sstables.
utils::UUID run_identifier;
// Holds ownership of a weight assigned to this compaction iff it's a regular one.
std::optional<compaction_weight_registration> weight_registration;
// Calls compaction manager's task for this compaction to release reference to exhausted sstables.
std::function<void(const std::vector<shared_sstable>& exhausted_sstables)> release_exhausted;
// The options passed down to the compaction code.

View File

@@ -436,7 +436,7 @@ void compaction_manager::reevaluate_postponed_compactions() {
}
void compaction_manager::postpone_compaction_for_column_family(column_family* cf) {
_postponed.push_back(cf);
_postponed.insert(cf);
}
future<> compaction_manager::stop_ongoing_compactions(sstring reason) {
@@ -576,7 +576,7 @@ void compaction_manager::submit(column_family* cf) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
auto compacting = make_lw_shared<compacting_sstable_registration>(this, descriptor.sstables);
descriptor.weight_registration = compaction_weight_registration(this, weight);
auto weight_r = compaction_weight_registration(this, weight);
descriptor.release_exhausted = [compacting] (const std::vector<sstables::shared_sstable>& exhausted_sstables) {
compacting->release_compacting(exhausted_sstables);
};
@@ -586,7 +586,7 @@ void compaction_manager::submit(column_family* cf) {
_stats.pending_tasks--;
_stats.active_tasks++;
task->compaction_running = true;
return cf.run_compaction(std::move(descriptor)).then_wrapped([this, task, compacting = std::move(compacting)] (future<> f) mutable {
return cf.run_compaction(std::move(descriptor)).then_wrapped([this, task, compacting = std::move(compacting), weight_r = std::move(weight_r)] (future<> f) mutable {
_stats.active_tasks--;
task->compaction_running = false;
@@ -799,7 +799,7 @@ future<> compaction_manager::remove(column_family* cf) {
task->stopping = true;
}
}
_postponed.erase(boost::remove(_postponed, cf), _postponed.end());
_postponed.erase(cf);
// Wait for the termination of an ongoing compaction on cf, if any.
return do_for_each(*tasks_to_stop, [this, cf] (auto& task) {
@@ -835,11 +835,6 @@ void compaction_manager::stop_compaction(sstring type) {
}
}
void compaction_manager::on_compaction_complete(compaction_weight_registration& weight_registration) {
weight_registration.deregister();
reevaluate_postponed_compactions();
}
void compaction_manager::propagate_replacement(column_family* cf,
const std::vector<sstables::shared_sstable>& removed, const std::vector<sstables::shared_sstable>& added) {
for (auto& info : _compactions) {

View File

@@ -99,7 +99,7 @@ private:
future<> _waiting_reevalution = make_ready_future<>();
condition_variable _postponed_reevaluation;
// column families that wait for compaction but had its submission postponed due to ongoing compaction.
std::vector<column_family*> _postponed;
std::unordered_set<column_family*> _postponed;
// tracks taken weights of ongoing compactions, only one compaction per weight is allowed.
// weight is value assigned to a compaction job that is log base N of total size of all input sstables.
std::unordered_set<int> _weight_tracker;
@@ -256,11 +256,6 @@ public:
// Stops ongoing compaction of a given type.
void stop_compaction(sstring type);
// Called by compaction procedure to release the weight lock assigned to it, such that
// another compaction waiting on same weight can start as soon as possible. That's usually
// called before compaction seals sstable and such and after all compaction work is done.
void on_compaction_complete(compaction_weight_registration& weight_registration);
double backlog() {
return _backlog_manager.backlog();
}

View File

@@ -367,6 +367,7 @@ class index_reader {
const io_priority_class& _pc;
tracing::trace_state_ptr _trace_state;
shared_index_lists _index_lists;
future<> _background_closes = make_ready_future<>();
struct reader {
index_consumer _consumer;
@@ -472,6 +473,16 @@ private:
};
return _index_lists.get_or_load(summary_idx, loader).then([this, &bound, summary_idx] (shared_index_lists::list_ptr ref) {
// to make sure list is not closed when another bound is still using it, index list will only be closed when there's only one owner holding it
if (bound.current_list && bound.current_list.use_count() == 1) {
// a new background close will only be initiated when previous ones terminate, so as to limit the concurrency.
_background_closes = _background_closes.then_wrapped([current_list = std::move(bound.current_list)] (future<>&& f) mutable {
f.ignore_ready_future();
return do_with(std::move(current_list), [] (shared_index_lists::list_ptr& current_list) mutable {
return close_index_list(current_list);
});
});
}
bound.current_list = std::move(ref);
bound.current_summary_idx = summary_idx;
bound.current_index_idx = 0;
@@ -841,6 +852,8 @@ public:
return close_index_list(_upper_bound->current_list);
}
return make_ready_future<>();
}).then([this] () mutable {
return std::move(_background_closes);
});
}
};

View File

@@ -147,7 +147,7 @@ leveled_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input
unsigned overlapping_sstables = 0;
auto prev_last = dht::ring_position::min();
for (auto& sst : sstables) {
if (dht::ring_position(sst->get_first_decorated_key()).less_compare(*schema, prev_last)) {
if (dht::ring_position(sst->get_first_decorated_key()).tri_compare(*schema, prev_last) <= 0) {
overlapping_sstables++;
}
prev_last = dht::ring_position(sst->get_last_decorated_key());
@@ -189,10 +189,8 @@ leveled_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input
};
if (level_info[0].size() > offstrategy_threshold) {
level_info[0].resize(std::min(level_info[0].size(), max_sstables));
compaction_descriptor desc(std::move(level_info[0]), std::optional<sstables::sstable_set>(), iop);
desc.options = compaction_options::make_reshape();
return desc;
size_tiered_compaction_strategy stcs(_stcs_options);
return stcs.get_reshaping_job(std::move(level_info[0]), schema, iop, mode);
}
for (unsigned level = leveled_manifest::MAX_LEVELS - 1; level > 0; --level) {

View File

@@ -256,6 +256,7 @@ size_tiered_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
bucket.resize(std::min(max_sstables, bucket.size()));
compaction_descriptor desc(std::move(bucket), std::optional<sstables::sstable_set>(), iop);
desc.options = compaction_options::make_reshape();
return desc;
}
}

View File

@@ -154,6 +154,27 @@ def test_update_condition_eq_unequal(test_table_s):
ConditionExpression='q = :oldval',
ExpressionAttributeValues={':val1': 3, ':oldval': 2})
# In test_update_condition_eq_unequal() above we saw that a non-existent
# attribute is not "=" to a value. Here we check what happens when two
# non-existent attributes are checked for equality. It turns out, they should
# *not* be considered equal. In short, an unset attribute is never equal to
# anything - not even to another unset attribute.
# Reproduces issue #8511.
def test_update_condition_eq_two_unset(test_table_s):
p = random_string()
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET a = :val1',
ConditionExpression='q = z',
ExpressionAttributeValues={':val1': 2})
test_table_s.update_item(Key={'p': p},
AttributeUpdates={'a': {'Value': 1, 'Action': 'PUT'}})
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET a = :val1',
ConditionExpression='q = z',
ExpressionAttributeValues={':val1': 3})
# Check that set equality is checked correctly. Unlike string equality (for
# example), it cannot be done with just naive string comparison of the JSON
# representation, and we need to allow for any order. (see issue #5021)
@@ -175,6 +196,39 @@ def test_update_condition_eq_set(test_table_s):
ExpressionAttributeValues={':val1': 3, ':oldval': set(['chinchilla', 'cat', 'dog', 'mouse'])})
assert 'b' in test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']
# The above test (test_update_condition_eq_set()) checked equality of simple
# set attributes. But an attributes can contain a nested document, where the
# set sits in a deep level (the set itself is a leaf in this heirarchy because
# it can only contain numbers, strings or bytes). We need to correctly support
# equality check in that case too.
# Reproduces issue #8514.
@pytest.mark.skip(reason="test needs nested update not yet in branch 4.3")
def test_update_condition_eq_nested_set(test_table_s):
p = random_string()
# Because boto3 sorts the set values we give it, in order to generate a
# set with a different order, we need to build it incrementally.
test_table_s.update_item(Key={'p': p},
AttributeUpdates={'a': {'Value': {'b': 'c', 'd': ['e', 'f', set(['g', 'h'])], 'i': set(['j', 'k'])}, 'Action': 'PUT'}})
test_table_s.update_item(Key={'p': p},
UpdateExpression='ADD a.d[2] :val1, a.i :val2',
ExpressionAttributeValues={':val1': set(['l', 'm']), ':val2': set(['n', 'o'])})
# Sanity check - the attribute contains the set we think it does
expected = {'b': 'c', 'd': ['e', 'f', set(['g', 'h', 'l', 'm'])], 'i': set(['j', 'k', 'n', 'o'])}
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['a'] == expected
# Now finally check that condition expression check knows the equality too.
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET b = :val1',
ConditionExpression='a = :oldval',
ExpressionAttributeValues={':val1': 3, ':oldval': expected})
assert 'b' in test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']
# Check that equality can also fail, if the inner set differs
wrong = {'b': 'c', 'd': ['e', 'f', set(['g', 'h', 'l', 'bad'])], 'i': set(['j', 'k', 'n', 'o'])}
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET b = :val1',
ConditionExpression='a = :oldval',
ExpressionAttributeValues={':val1': 4, ':oldval': wrong})
# Test for ConditionExpression with operator "<>" (non-equality),
def test_update_condition_ne(test_table_s):
p = random_string()
@@ -215,6 +269,54 @@ def test_update_condition_ne(test_table_s):
ExpressionAttributeValues={':newval': 3, ':oldval': 1})
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['c'] == 3
# Check that set inequality is checked correctly. This reproduces the same
# bug #5021 that we reproduced above in test_update_condition_eq_set(), just
# that here we check the inequality operator instead of equality.
# Reproduces issue #8513.
def test_update_condition_ne_set(test_table_s):
p = random_string()
# Because boto3 sorts the set values we give it, in order to generate a
# set with a different order, we need to build it incrementally.
test_table_s.update_item(Key={'p': p},
AttributeUpdates={'a': {'Value': set(['dog', 'chinchilla']), 'Action': 'PUT'}})
test_table_s.update_item(Key={'p': p},
UpdateExpression='ADD a :val1',
ExpressionAttributeValues={':val1': set(['cat', 'mouse'])})
# Sanity check - the attribute contains the set we think it does
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['a'] == set(['chinchilla', 'cat', 'dog', 'mouse'])
# Now check that condition expression check knows there is no inequality
# here.
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET b = :val1',
ConditionExpression='a <> :oldval',
ExpressionAttributeValues={':val1': 2, ':oldval': set(['chinchilla', 'cat', 'dog', 'mouse'])})
# As a sanity check, also check something which should be unequal:
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET b = :val1',
ConditionExpression='a <> :oldval',
ExpressionAttributeValues={':val1': 3, ':oldval': set(['chinchilla', 'cat', 'dog', 'horse'])})
assert 'b' in test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']
# In test_update_condition_ne() above we saw that a non-existent attribute is
# "not equal" to any value. Here we check what happens when two non-existent
# attributes are checked for non-equality. It turns out, they are also
# considered "not equal". In short, an unset attribute is always "not equal" to
# anything - even to another unset attribute.
# Reproduces issue #8511.
def test_update_condition_ne_two_unset(test_table_s):
p = random_string()
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET a = :val1',
ConditionExpression='q <> z',
ExpressionAttributeValues={':val1': 2})
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['a'] == 2
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET a = :val1',
ConditionExpression='q <> z',
ExpressionAttributeValues={':val1': 3})
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['a'] == 3
# Test for ConditionExpression with operator "<"
def test_update_condition_lt(test_table_s):
p = random_string()
@@ -316,6 +418,45 @@ def test_update_condition_lt(test_table_s):
ExpressionAttributeValues={':newval': 2, ':oldval': 1})
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item']['z'] == 4
# In test_update_condition_lt() above we saw that a non-existent attribute is
# not "<" any value. Here we check what happens when two non-existent
# attributes are compared with "<". It turns out that the result of such
# comparison is also false.
# The same is true for other order operators - any order comparison involving
# one unset attribute should be false - even if the second operand is an
# unset attribute as well. Note that the <> operator is different - it is
# always results in true if one of the operands is an unset attribute (see
# test_update_condition_ne_two_unset() above).
# This test is related to issue #8511 (although it passed even before fixing
# that issue).
def test_update_condition_comparison_two_unset(test_table_s):
p = random_string()
ops = ['<', '<=', '>', '>=']
for op in ops:
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET a = :val1',
ConditionExpression='q ' + op + ' z',
ExpressionAttributeValues={':val1': 2})
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET a = :val1',
ConditionExpression='q between z and x',
ExpressionAttributeValues={':val1': 2})
test_table_s.update_item(Key={'p': p},
AttributeUpdates={'a': {'Value': 1, 'Action': 'PUT'}})
for op in ops:
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET a = :val1',
ConditionExpression='q ' + op + ' z',
ExpressionAttributeValues={':val1': 3})
with pytest.raises(ClientError, match='ConditionalCheckFailedException'):
test_table_s.update_item(Key={'p': p},
UpdateExpression='SET a = :val1',
ConditionExpression='q between z and x',
ExpressionAttributeValues={':val1': 2})
# Test for ConditionExpression with operator "<="
def test_update_condition_le(test_table_s):
p = random_string()

View File

@@ -578,11 +578,14 @@ SEASTAR_TEST_CASE(test_allocation_failure){
// Use us loads of memory so we can OOM at the appropriate place
try {
assert(fragmented_temporary_buffer::default_fragment_size < size);
for (;;) {
junk->emplace_back(new char[size]);
junk->emplace_back(new char[fragmented_temporary_buffer::default_fragment_size]);
}
} catch (std::bad_alloc&) {
}
auto last = junk->end();
junk->erase(--last);
return log.add_mutation(utils::UUID_gen::get_time_UUID(), size, db::commitlog::force_sync::no, [size](db::commitlog::output& dst) {
dst.fill(char(1), size);
}).then_wrapped([junk, size](future<db::rp_handle> f) {

View File

@@ -3267,39 +3267,30 @@ flat_mutation_reader create_evictable_reader_and_evict_after_first_buffer(
reader_permit permit,
const dht::partition_range& prange,
const query::partition_slice& slice,
std::deque<mutation_fragment> first_buffer,
position_in_partition_view last_fragment_position,
std::deque<mutation_fragment> second_buffer,
size_t max_buffer_size) {
std::list<std::deque<mutation_fragment>> buffers,
position_in_partition_view first_buf_last_fragment_position,
size_t max_buffer_size,
bool detach_buffer = true) {
class factory {
schema_ptr _schema;
reader_permit _permit;
std::optional<std::deque<mutation_fragment>> _first_buffer;
std::optional<std::deque<mutation_fragment>> _second_buffer;
std::list<std::deque<mutation_fragment>> _buffers;
size_t _max_buffer_size;
private:
std::optional<std::deque<mutation_fragment>> copy_buffer(const std::optional<std::deque<mutation_fragment>>& o) {
if (!o) {
return {};
}
return copy_fragments(*_schema, _permit, *o);
}
public:
factory(schema_ptr schema, reader_permit permit, std::deque<mutation_fragment> first_buffer, std::deque<mutation_fragment> second_buffer, size_t max_buffer_size)
factory(schema_ptr schema, reader_permit permit, std::list<std::deque<mutation_fragment>> buffers, size_t max_buffer_size)
: _schema(std::move(schema))
, _permit(std::move(permit))
, _first_buffer(std::move(first_buffer))
, _second_buffer(std::move(second_buffer))
, _buffers(std::move(buffers))
, _max_buffer_size(max_buffer_size) {
}
factory(const factory& o)
: _schema(o._schema)
, _permit(o._permit)
, _first_buffer(copy_buffer(o._first_buffer))
, _second_buffer(copy_buffer(o._second_buffer)) {
, _permit(o._permit) {
for (const auto& buf : o._buffers) {
_buffers.emplace_back(copy_fragments(*_schema, _permit, buf));
}
}
factory(factory&& o) = default;
@@ -3313,14 +3304,9 @@ flat_mutation_reader create_evictable_reader_and_evict_after_first_buffer(
streamed_mutation::forwarding fwd_sm,
mutation_reader::forwarding fwd_mr) {
BOOST_REQUIRE(s == _schema);
if (_first_buffer) {
auto buf = *std::exchange(_first_buffer, {});
auto rd = make_flat_mutation_reader_from_fragments(_schema, std::move(permit), std::move(buf));
rd.set_max_buffer_size(_max_buffer_size);
return rd;
}
if (_second_buffer) {
auto buf = *std::exchange(_second_buffer, {});
if (!_buffers.empty()) {
auto buf = std::move(_buffers.front());
_buffers.pop_front();
auto rd = make_flat_mutation_reader_from_fragments(_schema, std::move(permit), std::move(buf));
rd.set_max_buffer_size(_max_buffer_size);
return rd;
@@ -3328,9 +3314,9 @@ flat_mutation_reader create_evictable_reader_and_evict_after_first_buffer(
return make_empty_flat_reader(_schema, std::move(permit));
}
};
auto ms = mutation_source(factory(schema, permit, std::move(first_buffer), std::move(second_buffer), max_buffer_size));
auto ms = mutation_source(factory(schema, permit, std::move(buffers), max_buffer_size));
auto [rd, handle] = make_manually_paused_evictable_reader(
auto rd = make_auto_paused_evictable_reader(
std::move(ms),
schema,
permit,
@@ -3346,18 +3332,42 @@ flat_mutation_reader create_evictable_reader_and_evict_after_first_buffer(
const auto eq_cmp = position_in_partition::equal_compare(*schema);
BOOST_REQUIRE(rd.is_buffer_full());
BOOST_REQUIRE(eq_cmp(rd.buffer().back().position(), last_fragment_position));
BOOST_REQUIRE(eq_cmp(rd.buffer().back().position(), first_buf_last_fragment_position));
BOOST_REQUIRE(!rd.is_end_of_stream());
rd.detach_buffer();
handle.pause();
if (detach_buffer) {
rd.detach_buffer();
}
while(permit.semaphore().try_evict_one_inactive_read());
return std::move(rd);
}
flat_mutation_reader create_evictable_reader_and_evict_after_first_buffer(
schema_ptr schema,
reader_permit permit,
const dht::partition_range& prange,
const query::partition_slice& slice,
std::deque<mutation_fragment> first_buffer,
position_in_partition_view last_fragment_position,
std::deque<mutation_fragment> last_buffer,
size_t max_buffer_size,
bool detach_buffer = true) {
std::list<std::deque<mutation_fragment>> list;
list.emplace_back(std::move(first_buffer));
list.emplace_back(std::move(last_buffer));
return create_evictable_reader_and_evict_after_first_buffer(
std::move(schema),
std::move(permit),
prange,
slice,
std::move(list),
last_fragment_position,
max_buffer_size,
detach_buffer);
}
}
SEASTAR_THREAD_TEST_CASE(test_evictable_reader_trim_range_tombstones) {
@@ -3659,7 +3669,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_self_validation) {
check_evictable_reader_validation_is_triggered(
"pkey > _last_pkey; pkey ∈ pkrange",
partition_error_prefix,
"",
s.schema(),
permit,
prange,
@@ -3747,3 +3757,208 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_self_validation) {
make_second_buffer(pkeys[3]),
max_buffer_size);
}
SEASTAR_THREAD_TEST_CASE(test_evictable_reader_drop_flags) {
reader_concurrency_semaphore semaphore(1, 0, get_name());
simple_schema s;
auto permit = semaphore.make_permit(s.schema().get(), get_name());
auto pkeys = s.make_pkeys(2);
std::sort(pkeys.begin(), pkeys.end(), [&s] (const auto& pk1, const auto& pk2) {
return pk1.less_compare(*s.schema(), pk2);
});
const auto& pkey1 = pkeys[0];
const auto& pkey2 = pkeys[1];
const int second_buffer_ck = 10;
struct buffer {
simple_schema& s;
reader_permit permit;
std::deque<mutation_fragment> frags;
std::vector<mutation> muts;
size_t size = 0;
std::optional<position_in_partition_view> last_pos;
buffer(simple_schema& s_, reader_permit permit_, dht::decorated_key key)
: s(s_), permit(std::move(permit_)) {
add_partition(key);
}
size_t add_partition(dht::decorated_key key) {
size += frags.emplace_back(*s.schema(), permit, partition_start{key, {}}).memory_usage();
muts.emplace_back(s.schema(), key);
return size;
}
size_t add_mutation_fragment(mutation_fragment&& mf, bool only_to_frags = false) {
if (!only_to_frags) {
muts.back().apply(mf);
}
size += frags.emplace_back(*s.schema(), permit, std::move(mf)).memory_usage();
return size;
}
size_t add_static_row(std::optional<mutation_fragment> sr = {}) {
auto srow = sr ? std::move(*sr) : s.make_static_row("s");
return add_mutation_fragment(std::move(srow));
}
size_t add_clustering_row(int i, bool only_to_frags = false) {
return add_mutation_fragment(mutation_fragment(*s.schema(), permit, s.make_row(s.make_ckey(i), "v")), only_to_frags);
}
size_t add_clustering_rows(int start, int end) {
for (int i = start; i < end; ++i) {
add_clustering_row(i);
}
return size;
}
size_t add_partition_end() {
size += frags.emplace_back(*s.schema(), permit, partition_end{}).memory_usage();
return size;
}
void save_position() { last_pos = frags.back().position(); }
void find_position(size_t buf_size) {
size_t s = 0;
for (const auto& frag : frags) {
s += frag.memory_usage();
if (s >= buf_size) {
last_pos = frag.position();
break;
}
}
BOOST_REQUIRE(last_pos);
}
};
auto make_reader = [&] (const buffer& first_buffer, const buffer& second_buffer, const buffer* const third_buffer, size_t max_buffer_size) {
std::list<std::deque<mutation_fragment>> buffers;
buffers.emplace_back(copy_fragments(*s.schema(), permit, first_buffer.frags));
buffers.emplace_back(copy_fragments(*s.schema(), permit, second_buffer.frags));
if (third_buffer) {
buffers.emplace_back(copy_fragments(*s.schema(), permit, third_buffer->frags));
}
return create_evictable_reader_and_evict_after_first_buffer(
s.schema(),
permit,
query::full_partition_range,
s.schema()->full_slice(),
std::move(buffers),
*first_buffer.last_pos,
max_buffer_size,
false);
};
testlog.info("Same partition, with static row");
{
buffer first_buffer(s, permit, pkey1);
first_buffer.add_static_row();
auto srow = mutation_fragment(*s.schema(), permit, first_buffer.frags.back());
const auto buf_size = first_buffer.add_clustering_rows(0, second_buffer_ck);
first_buffer.save_position();
first_buffer.add_clustering_row(second_buffer_ck);
buffer second_buffer(s, permit, pkey1);
second_buffer.add_static_row(std::move(srow));
second_buffer.add_clustering_row(second_buffer_ck);
second_buffer.add_clustering_row(second_buffer_ck + 1);
second_buffer.add_partition_end();
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
.has_monotonic_positions();
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
.produces(first_buffer.muts[0] + second_buffer.muts[0])
.produces_end_of_stream();
}
testlog.info("Same partition, no static row");
{
buffer first_buffer(s, permit, pkey1);
const auto buf_size = first_buffer.add_clustering_rows(0, second_buffer_ck);
first_buffer.save_position();
first_buffer.add_clustering_row(second_buffer_ck);
buffer second_buffer(s, permit, pkey1);
second_buffer.add_clustering_row(second_buffer_ck);
second_buffer.add_clustering_row(second_buffer_ck + 1);
second_buffer.add_partition_end();
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
.has_monotonic_positions();
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
.produces(first_buffer.muts[0] + second_buffer.muts[0])
.produces_end_of_stream();
}
testlog.info("Same partition as expected, no static row, next partition has static row (#8923)");
{
buffer second_buffer(s, permit, pkey1);
second_buffer.add_clustering_rows(second_buffer_ck, second_buffer_ck + second_buffer_ck / 2);
// We want to end the buffer on the partition-start below, but since a
// partition start will be dropped from it, we have to use the size
// without it.
const auto buf_size = second_buffer.add_partition_end();
second_buffer.add_partition(pkey2);
second_buffer.add_static_row();
auto srow = mutation_fragment(*s.schema(), permit, second_buffer.frags.back());
second_buffer.add_clustering_rows(0, 2);
buffer first_buffer(s, permit, pkey1);
for (int i = 0; first_buffer.add_clustering_row(i) < buf_size; ++i);
first_buffer.save_position();
first_buffer.add_mutation_fragment(mutation_fragment(*s.schema(), permit, second_buffer.frags[1]));
buffer third_buffer(s, permit, pkey2);
third_buffer.add_static_row(std::move(srow));
third_buffer.add_clustering_rows(0, 2);
third_buffer.add_partition_end();
first_buffer.find_position(buf_size);
assert_that(make_reader(first_buffer, second_buffer, &third_buffer, buf_size))
.has_monotonic_positions();
assert_that(make_reader(first_buffer, second_buffer, &third_buffer, buf_size))
.produces(first_buffer.muts[0] + second_buffer.muts[0])
.produces(second_buffer.muts[1] + third_buffer.muts[0])
.produces_end_of_stream();
}
testlog.info("Next partition, with no static row");
{
buffer first_buffer(s, permit, pkey1);
const auto buf_size = first_buffer.add_clustering_rows(0, second_buffer_ck);
first_buffer.save_position();
first_buffer.add_clustering_row(second_buffer_ck + 1, true);
buffer second_buffer(s, permit, pkey2);
second_buffer.add_clustering_rows(0, second_buffer_ck / 2);
second_buffer.add_partition_end();
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
.has_monotonic_positions();
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
.produces(first_buffer.muts[0])
.produces(second_buffer.muts[0])
.produces_end_of_stream();
}
testlog.info("Next partition, with static row");
{
buffer first_buffer(s, permit, pkey1);
const auto buf_size = first_buffer.add_clustering_rows(0, second_buffer_ck);
first_buffer.save_position();
first_buffer.add_clustering_row(second_buffer_ck + 1, true);
buffer second_buffer(s, permit, pkey2);
second_buffer.add_static_row();
second_buffer.add_clustering_rows(0, second_buffer_ck / 2);
second_buffer.add_partition_end();
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
.has_monotonic_positions();
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
.produces(first_buffer.muts[0])
.produces(second_buffer.muts[0])
.produces_end_of_stream();
}
}

View File

@@ -1805,12 +1805,16 @@ SEASTAR_TEST_CASE(test_mutation_diff_with_random_generator) {
BOOST_FAIL(format("Partitions don't match, got: {}\n...and: {}", mutation_partition::printer(s, mp1), mutation_partition::printer(s, mp2)));
}
};
for_each_mutation_pair([&] (auto&& m1, auto&& m2, are_equal eq) {
const auto now = gc_clock::now();
can_gc_fn never_gc = [] (tombstone) { return false; };
for_each_mutation_pair([&] (auto m1, auto m2, are_equal eq) {
mutation_application_stats app_stats;
auto s = m1.schema();
if (s != m2.schema()) {
return;
}
m1.partition().compact_for_compaction(*s, never_gc, now);
m2.partition().compact_for_compaction(*s, never_gc, now);
auto m12 = m1;
m12.apply(m2);
auto m12_with_diff = m1;

View File

@@ -6685,3 +6685,135 @@ SEASTAR_TEST_CASE(test_zero_estimated_partitions) {
return make_ready_future<>();
});
}
SEASTAR_TEST_CASE(max_ongoing_compaction_test) {
return test_env::do_with_async([] (test_env& env) {
BOOST_REQUIRE(smp::count == 1);
auto make_schema = [] (auto idx) {
auto builder = schema_builder("tests", std::to_string(idx))
.with_column("id", utf8_type, column_kind::partition_key)
.with_column("cl", int32_type, column_kind::clustering_key)
.with_column("value", int32_type);
builder.set_compaction_strategy(sstables::compaction_strategy_type::time_window);
std::map <sstring, sstring> opts = {
{time_window_compaction_strategy_options::COMPACTION_WINDOW_UNIT_KEY, "HOURS"},
{time_window_compaction_strategy_options::COMPACTION_WINDOW_SIZE_KEY, "1"},
{time_window_compaction_strategy_options::EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY, "0"},
};
builder.set_compaction_strategy_options(std::move(opts));
builder.set_gc_grace_seconds(0);
return builder.build();
};
auto cm = make_lw_shared<compaction_manager>();
cm->enable();
auto stop_cm = defer([&cm] {
cm->stop().get();
});
auto tmp = tmpdir();
auto cl_stats = make_lw_shared<cell_locker_stats>();
auto tracker = make_lw_shared<cache_tracker>();
auto tokens = token_generation_for_shard(1, this_shard_id(), test_db_config.murmur3_partitioner_ignore_msb_bits(), smp::count);
auto next_timestamp = [] (auto step) {
using namespace std::chrono;
return (gc_clock::now().time_since_epoch() - duration_cast<microseconds>(step)).count();
};
auto make_expiring_cell = [&] (schema_ptr s, std::chrono::hours step) {
static thread_local int32_t value = 1;
auto key_str = tokens[0].first;
auto key = partition_key::from_exploded(*s, {to_bytes(key_str)});
mutation m(s, key);
auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(value++)});
m.set_clustered_cell(c_key, bytes("value"), data_value(int32_t(value)), next_timestamp(step), gc_clock::duration(step + 5s));
return m;
};
auto make_table_with_single_fully_expired_sstable = [&] (auto idx) {
auto s = make_schema(idx);
column_family::config cfg = column_family_test_config(env.manager());
cfg.datadir = tmp.path().string() + "/" + std::to_string(idx);
touch_directory(cfg.datadir).get();
cfg.enable_commitlog = false;
cfg.enable_incremental_backups = false;
auto sst_gen = [&env, s, dir = cfg.datadir, gen = make_lw_shared<unsigned>(1)] () mutable {
return env.make_sstable(s, dir, (*gen)++, sstables::sstable::version_types::md, big);
};
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm, *cl_stats, *tracker);
cf->start();
cf->mark_ready_for_writes();
auto muts = { make_expiring_cell(s, std::chrono::hours(1)) };
auto sst = make_sstable_containing(sst_gen, muts);
column_family_test(cf).add_sstable(sst);
return cf;
};
std::vector<lw_shared_ptr<column_family>> tables;
auto stop_tables = defer([&tables] {
for (auto& t : tables) {
t->stop().get();
}
});
for (auto i = 0; i < 100; i++) {
tables.push_back(make_table_with_single_fully_expired_sstable(i));
}
// Make sure everything is expired
forward_jump_clocks(std::chrono::hours(100));
for (auto& t : tables) {
BOOST_REQUIRE(t->sstables_count() == 1);
t->trigger_compaction();
}
BOOST_REQUIRE(cm->get_stats().pending_tasks >= 1 || cm->get_stats().active_tasks >= 1);
size_t max_ongoing_compaction = 0;
// wait for submitted jobs to finish.
auto end = [cm, &tables] {
return cm->get_stats().pending_tasks == 0 && cm->get_stats().active_tasks == 0
&& boost::algorithm::all_of(tables, [] (auto& t) { return t->sstables_count() == 0; });
};
while (!end()) {
if (!cm->get_stats().pending_tasks && !cm->get_stats().active_tasks) {
for (auto& t : tables) {
if (t->sstables_count()) {
t->trigger_compaction();
}
}
}
max_ongoing_compaction = std::max(cm->get_stats().active_tasks, max_ongoing_compaction);
later().get();
}
BOOST_REQUIRE(cm->get_stats().errors == 0);
BOOST_REQUIRE(max_ongoing_compaction == 1);
});
}
SEASTAR_TEST_CASE(stcs_reshape_test) {
return test_env::do_with_async([] (test_env& env) {
simple_schema ss;
auto s = ss.schema();
std::vector<shared_sstable> sstables;
sstables.reserve(s->max_compaction_threshold());
for (auto gen = 1; gen <= s->max_compaction_threshold(); gen++) {
auto sst = env.make_sstable(s, "", gen, la, big);
sstables::test(sst).set_data_file_size(1);
sstables.push_back(std::move(sst));
}
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::size_tiered,
s->compaction_strategy_options());
BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, default_priority_class(), reshape_mode::strict).sstables.size());
BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, default_priority_class(), reshape_mode::relaxed).sstables.size());
});
}

View File

@@ -0,0 +1,186 @@
# Copyright 2020 ScyllaDB
#
# This file is part of Scylla.
#
# Scylla is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Scylla is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Scylla. If not, see <http://www.gnu.org/licenses/>.
# Tests for secondary indexes
import time
import pytest
from cassandra.protocol import SyntaxException, AlreadyExists, InvalidRequest, ConfigurationException, ReadFailure
from util import new_test_table, unique_name
# A reproducer for issue #7443: Normally, when the entire table is SELECTed,
# the partitions are returned sorted by the partitions' token. When there
# is filtering, this order is not expected to change. Furthermore, when this
# filtering happens to use a secondary index, again the order is not expected
# to change.
def test_partition_order_with_si(cql, test_keyspace):
schema = 'pk int, x int, PRIMARY KEY ((pk))'
with new_test_table(cql, test_keyspace, schema) as table:
# Insert 20 partitions, all of them with x=1 so that filtering by x=1
# will yield the same 20 partitions:
N = 20
stmt = cql.prepare('INSERT INTO '+table+' (pk, x) VALUES (?, ?)')
for i in range(N):
cql.execute(stmt, [i, 1])
# SELECT all the rows, and verify they are returned in increasing
# partition token order (note that the token is a *signed* number):
tokens = [row.system_token_pk for row in cql.execute('SELECT token(pk) FROM '+table)]
assert len(tokens) == N
assert sorted(tokens) == tokens
# Now select all the partitions with filtering of x=1. Since all
# rows have x=1, this shouldn't change the list of matching rows, and
# also shouldn't check their order:
tokens1 = [row.system_token_pk for row in cql.execute('SELECT token(pk) FROM '+table+' WHERE x=1 ALLOW FILTERING')]
assert tokens1 == tokens
# Now add an index on x, which allows implementing the "x=1"
# restriction differently. With the index, "ALLOW FILTERING" is
# no longer necessary. But the order of the results should
# still not change. Issue #7443 is about the order changing here.
cql.execute('CREATE INDEX ON '+table+'(x)')
# "CREATE INDEX" does not wait until the index is actually available
# for use. Reads immediately after the CREATE INDEX may fail or return
# partial results. So let's retry until reads resume working:
for i in range(100):
try:
tokens2 = [row.system_token_pk for row in cql.execute('SELECT token(pk) FROM '+table+' WHERE x=1')]
if len(tokens2) == N:
break
except ReadFailure:
pass
time.sleep(0.1)
assert tokens2 == tokens
# Test which ensures that indexes for a query are picked by the order in which
# they appear in restrictions. That way, users can deterministically pick
# which indexes are used for which queries.
# Note that the order of picking indexing is not set in stone and may be
# subject to change - in which case this test case should be amended as well.
# The order tested in this case was decided as a good first step in issue
# #7969, but it's possible that it will eventually be implemented another
# way, e.g. dynamically based on estimated query selectivity statistics.
# Ref: #7969
@pytest.mark.xfail(reason="The order of picking indexes is currently arbitrary. Issue #7969")
def test_order_of_indexes(scylla_only, cql, test_keyspace):
schema = 'p int primary key, v1 int, v2 int, v3 int'
with new_test_table(cql, test_keyspace, schema) as table:
cql.execute(f"CREATE INDEX my_v3_idx ON {table}(v3)")
cql.execute(f"CREATE INDEX my_v1_idx ON {table}(v1)")
cql.execute(f"CREATE INDEX my_v2_idx ON {table}((p),v2)")
# All queries below should use the first index they find in the list
# of restrictions. Tracing information will be consulted to ensure
# it's true. Currently some of the cases below succeed, because the
# order is not well defined (and may, for instance, change upon
# server restart), but some of them fail. Once a proper ordering
# is implemented, all cases below should succeed.
def index_used(query, index_name):
assert any([index_name in event.description for event in cql.execute(query, trace=True).get_query_trace().events])
index_used(f"SELECT * FROM {table} WHERE v3 = 1", "my_v3_idx")
index_used(f"SELECT * FROM {table} WHERE v3 = 1 and v1 = 2 allow filtering", "my_v3_idx")
index_used(f"SELECT * FROM {table} WHERE p = 1 and v1 = 1 and v3 = 2 allow filtering", "my_v1_idx")
index_used(f"SELECT * FROM {table} WHERE p = 1 and v3 = 1 and v1 = 2 allow filtering", "my_v3_idx")
# Local indexes are still skipped if they cannot be used
index_used(f"SELECT * FROM {table} WHERE v2 = 1 and v1 = 2 allow filtering", "my_v1_idx")
index_used(f"SELECT * FROM {table} WHERE v2 = 1 and v3 = 2 and v1 = 3 allow filtering", "my_v3_idx")
index_used(f"SELECT * FROM {table} WHERE v1 = 1 and v2 = 2 and v3 = 3 allow filtering", "my_v1_idx")
# Local indexes are still preferred over global ones, if they can be used
index_used(f"SELECT * FROM {table} WHERE p = 1 and v1 = 1 and v3 = 2 and v2 = 2 allow filtering", "my_v2_idx")
index_used(f"SELECT * FROM {table} WHERE p = 1 and v2 = 1 and v1 = 2 allow filtering", "my_v2_idx")
# Indexes can be created without an explicit name, in which case a default name is chosen.
# However, due to #8620 it was possible to break the index creation mechanism by creating
# a properly named regular table, which conflicts with the generated index name.
def test_create_unnamed_index_when_its_name_is_taken(cql, test_keyspace):
schema = 'p int primary key, v int'
with new_test_table(cql, test_keyspace, schema) as table:
try:
cql.execute(f"CREATE TABLE {table}_v_idx_index (i_do_not_exist_in_the_base_table int primary key)")
# Creating an index should succeed, even though its default name is taken
# by the table above
cql.execute(f"CREATE INDEX ON {table}(v)")
finally:
cql.execute(f"DROP TABLE {table}_v_idx_index")
# Indexed created with an explicit name cause a materialized view to be created,
# and this view has a specific name - <index-name>_index. If there happens to be
# a regular table (or another view) named just like that, index creation should fail.
def test_create_named_index_when_its_name_is_taken(scylla_only, cql, test_keyspace):
schema = 'p int primary key, v int'
with new_test_table(cql, test_keyspace, schema) as table:
index_name = unique_name()
try:
cql.execute(f"CREATE TABLE {test_keyspace}.{index_name}_index (i_do_not_exist_in_the_base_table int primary key)")
# Creating an index should fail, because it's impossible to create
# its underlying materialized view, because its name is taken by a regular table
with pytest.raises(InvalidRequest, match="already exists"):
cql.execute(f"CREATE INDEX {index_name} ON {table}(v)")
finally:
cql.execute(f"DROP TABLE {test_keyspace}.{index_name}_index")
# Tests for CREATE INDEX IF NOT EXISTS
# Reproduces issue #8717.
def test_create_index_if_not_exists(cql, test_keyspace):
with new_test_table(cql, test_keyspace, 'p int primary key, v int') as table:
cql.execute(f"CREATE INDEX ON {table}(v)")
# Can't create the same index again without "IF NOT EXISTS", but can
# do it with "IF NOT EXISTS":
with pytest.raises(InvalidRequest, match="duplicate"):
cql.execute(f"CREATE INDEX ON {table}(v)")
cql.execute(f"CREATE INDEX IF NOT EXISTS ON {table}(v)")
cql.execute(f"DROP INDEX {test_keyspace}.{table.split('.')[1]}_v_idx")
# Now test the same thing for named indexes. This is what broke in #8717:
cql.execute(f"CREATE INDEX xyz ON {table}(v)")
with pytest.raises(InvalidRequest, match="already exists"):
cql.execute(f"CREATE INDEX xyz ON {table}(v)")
cql.execute(f"CREATE INDEX IF NOT EXISTS xyz ON {table}(v)")
cql.execute(f"DROP INDEX {test_keyspace}.xyz")
# Exactly the same with non-lower case name.
cql.execute(f'CREATE INDEX "CamelCase" ON {table}(v)')
with pytest.raises(InvalidRequest, match="already exists"):
cql.execute(f'CREATE INDEX "CamelCase" ON {table}(v)')
cql.execute(f'CREATE INDEX IF NOT EXISTS "CamelCase" ON {table}(v)')
cql.execute(f'DROP INDEX {test_keyspace}."CamelCase"')
# Trying to create an index for an attribute that's already indexed,
# but with a different name. The "IF NOT EXISTS" appears to succeed
# in this case, but does not actually create the new index name -
# only the old one remains.
cql.execute(f"CREATE INDEX xyz ON {table}(v)")
with pytest.raises(InvalidRequest, match="duplicate"):
cql.execute(f"CREATE INDEX abc ON {table}(v)")
cql.execute(f"CREATE INDEX IF NOT EXISTS abc ON {table}(v)")
with pytest.raises(InvalidRequest):
cql.execute(f"DROP INDEX {test_keyspace}.abc")
cql.execute(f"DROP INDEX {test_keyspace}.xyz")
# Test that the paging state works properly for indexes on tables
# with descending clustering order. There was a problem with indexes
# created on clustering keys with DESC clustering order - they are represented
# as "reverse" types internally and Scylla assertions failed that the base type
# is different from the underlying view type, even though, from the perspective
# of deserialization, they're equal. Issue #8666
def test_paging_with_desc_clustering_order(cql, test_keyspace):
schema = 'p int, c int, primary key (p,c)'
extra = 'with clustering order by (c desc)'
with new_test_table(cql, test_keyspace, schema, extra) as table:
cql.execute(f"CREATE INDEX ON {table}(c)")
for i in range(3):
cql.execute(f"INSERT INTO {table}(p,c) VALUES ({i}, 42)")
stmt = SimpleStatement(f"SELECT * FROM {table} WHERE c = 42", fetch_size=1)
assert len([row for row in cql.execute(stmt)]) == 3

View File

@@ -173,6 +173,10 @@ public:
return res;
}
long use_count() const noexcept {
return _e ? _e.use_count() : 0;
}
friend class loading_shared_values;
};