Compare commits

36 Commits

Author SHA1 Message Date
Hagit Segev
89e79023ae release: prepare for 4.0.rc2 2020-04-21 16:26:09 +03:00
Nadav Har'El
bc67da1a21 alternator-test: comment out an error-path test that doesn't work on newer boto3
Unfortunately, the boto3 library doesn't allow us to check some of the
input error cases because it unnecessarily tests its input instead of
just passing it to Alternator and allowing Alternator to report the error.
In this patch we comment out a test case which used to work fine - i.e.,
the error was reported by Alternator - until recent changes to boto3
made it catch the problem without passing it to Alternator :-(

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20200330190521.19526-2-nyh@scylladb.com>
(cherry picked from commit fe6cecb26d)
2020-04-21 07:19:54 +02:00
Botond Dénes
0c7643f1fe schema: schema(): use std::stable_sort() to sort key columns
When multiple key columns (clustering or partition) are passed to
the schema constructor, all having the same column id, the expectation
is that these columns will retain the order in which they were passed to
`schema_builder::with_column()`. Currently however this is not
guaranteed as the schema constructor sorts key columns by column id with
`std::sort()`, which doesn't guarantee that equally comparing elements
retain their order. This can be an issue for indexes, the schemas of
which are built independently on each node. If there is any room for
variance in the key column order, this can result in different
nodes having incompatible schemas for the same index.
The fix is to use `std::stable_sort()` which guarantees that the order
of equally comparing elements won't change.
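
A minimal sketch of the difference, for illustration only. The `column` struct and the `sort_by_id` helper are hypothetical and are not Scylla's `schema` code; they only show that `std::stable_sort()` keeps elements with equal ids in insertion order, which `std::sort()` does not promise.

```cpp
#include <algorithm>
#include <string>
#include <vector>

// Hypothetical stand-in for a key column: several columns may share one id.
struct column {
    int id;
    std::string name;
};

// Sort by id only. std::stable_sort() guarantees that columns comparing
// equal keep the relative order in which they were added.
std::vector<std::string> sort_by_id(std::vector<column> cols) {
    std::stable_sort(cols.begin(), cols.end(),
                     [](const column& a, const column& b) { return a.id < b.id; });
    std::vector<std::string> names;
    names.reserve(cols.size());
    for (const auto& c : cols) {
        names.push_back(c.name);
    }
    return names;
}
```

Calling `sort_by_id()` on 23 columns that all carry the same id returns the names in exactly the order they were passed in, on every node.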

This is a suspected cause of #5856, although we don't have hard proof.

Fixes: #5856
Signed-off-by: Botond Dénes <bdenes@scylladb.com>
[avi: upgraded "Refs" to "Fixes", since we saw that std::sort() becomes
      unstable at 17 elements, and the failing schema had a
      clustering key with 23 elements]
Message-Id: <20200417121848.1456817-1-bdenes@scylladb.com>
(cherry picked from commit a4aa753f0f)
2020-04-19 18:18:45 +03:00
Rafael Ávila de Espíndola
c563234f40 dht: Use get_random_number<uint64_t> instead of int64_t in token::get_random_token
I bisected the opposite change in
9c202b52da as the cause of issue 6193. I
don't know why. Maybe get_random_number<signed_type> is buggy?

In any case, reverting to uint64_t solves the issue.

Fixes #6193

Signed-off-by: Rafael Ávila de Espíndola <espindola@scylladb.com>
Message-Id: <20200418001611.440733-1-espindola@scylladb.com>
(cherry picked from commit f3fd466156)
2020-04-19 16:20:40 +03:00
Nadav Har'El
77b7a48a02 alternator: remove mentions of experimental status of LWT
Since commit 9948f548a5, the LWT no longer
requires an "experimental" flag, so Alternator documents and scripts
which referred to the need for enabling experimental LWT, are fixed here
to no longer do that.

Fixes #6118.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20200405143237.12693-1-nyh@scylladb.com>
(cherry picked from commit d9d50362af)
2020-04-19 15:10:32 +03:00
Piotr Sarna
b2b1bfb159 alternator: fix failure on incorrect table name with no indexes
If a table name is not found, it may still exist as a local index,
but the check tried to fetch a local index name regardless of whether it was
present in the request, which was a nullptr dereference bug.

Fixes #6161
Tests: alternator-test(local, remote)
Message-Id: <428c21e94f6c9e450b1766943677613bd46cbc68.1586347130.git.sarna@scylladb.com>

(cherry picked from commit 123edfc10c)
2020-04-19 15:07:25 +03:00
Nadav Har'El
d72cbe37aa docs/alternator/alternator.md: fix typos
Fix a couple of typos in the Alternator documentation.
Fixes scylladb/scylla-doc-issues#280
Fixes scylladb/scylla-doc-issues#281

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20200419091900.23030-1-nyh@scylladb.com>
(cherry picked from commit 7e7c688946)
2020-04-19 15:03:22 +03:00
Nadav Har'El
9f7b560771 docs, alternator: alternator.md cleanup
Clean up the alternator.md document, by:

* Updating out-of-date information that outstayed its welcome.
* When Scylla does have a feature but it's just not supported via the
  DynamoDB API (e.g., CDC and on-demand backups) mention that.
* Remove mention of Alternator being experimental and users should not
  store important data on it :-)
* Miscellaneous cleanups.

Fixes #6179.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20200412094641.27186-1-nyh@scylladb.com>
(cherry picked from commit 606ae0744c)
2020-04-19 15:00:53 +03:00
Nadav Har'El
06af9c028c alternator-test: make Alternator tests runnable from test.py
To make the tests in alternator-test runnable by test.py, we need to
move the directory alternator-test/ to test/alternator, because test.py
only looks for tests in subdirectories of test/. Then, we need to create
a test/alternator/suite.yaml saying that this test directory is of type
"Run", i.e., it has a single run script "run" which runs all its tests.

The "run" script had to be slightly modified to be aware of its new
location relative to the source directory.

To run the Alternator tests from test.py, do:

	./test.py --mode dev alternator

Note that in this version, the "--mode" has no effect - test/alternator/run
always runs the latest compiled Scylla, regardless of the chosen mode.

The Alternator tests can still be run manually and individually against
a running Scylla or DynamoDB as before - just go to the test/alternator
directory (instead of alternator-test previously) and run "pytest" with
the desired parameters.

Fixes #6046

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
(cherry picked from commit 4e2bf28b84)
2020-04-19 11:19:15 +03:00
Nadav Har'El
c74ab3ae80 test.py: add xunit XML output file for "Run" tests
Assume that "Run" tests can take the --junit-xml=<path> option, and
pass it to ask the test to generate an XML summary of the run to a file
like testlog/dev/xml/run.1.xunit.xml.

This option is honored by the Alternator tests.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
(cherry picked from commit 0cccb5a630)
2020-04-19 11:19:06 +03:00
Nadav Har'El
32cd3a070a test.py: add new test type "Run"
This patch adds a new test type, "Run". A test subdirectory of type "Run"
has a script called "run" which is expected to run all the tests in that
directory.

This will be used, in the next patch, by the Alternator functional tests.
These tests indeed have a "run" script, which runs Scylla and then runs
*all* of Alternator's tests, finishing fairly quickly (in less than a
minute). All of that will become one test.py test.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
(cherry picked from commit 0ae3136900)
2020-04-19 11:18:01 +03:00
Nadav Har'El
bb1554f09e test.py: flag for aborting tests with SIGTERM, not SIGKILL
Today, if test.py is interrupted with SIGINT or SIGTERM, the ongoing test
is killed with SIGKILL. Some types of tests - such as Alternator's test -
may depend on being killed politely (e.g., with SIGTERM) to clean up
files.

We cannot yet change the signal to SIGTERM for all tests, because Seastar
tests often don't deal well with signals, but we can at least add a flag
that certain test types - that know they can be killed gently - will use.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
(cherry picked from commit 36e44972f1)
2020-04-19 11:17:51 +03:00
Nadav Har'El
2037d7550e alternator-test: change "run" script to pick random IP address
Before this patch, the Alternator tests "run" script ran Scylla on a fixed
listening address, 127.0.0.1. There is a problem that there might be other
concurrent runs of Scylla using the same IP address - e.g., CCM (used by
dtest) uses exactly this IP address for its first node.

Luckily, Linux's loopback device actually allows us to pick any of over
a million addresses in 127.0.0.0/8 to listen on - we don't need to use
127.0.0.1 specifically. So the code in this patch picks an address in
127.1.*.*, so it cannot collide with CCM (which uses 127.0.0.* for up to
255 nodes). Moreover, the last two bytes of the listen address are picked
based on the process ID of the run script; this allows multiple copies
of this script to run concurrently - in case anybody wishes to do that.
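
The address selection described above can be sketched as follows. This is an illustration, not the "run" script itself: the function name is hypothetical, and the exact way the script maps the process ID onto the last two bytes is an assumption (here, the low 16 bits of the PID).

```cpp
#include <cstdint>
#include <string>

// Hypothetical mapping: the low 16 bits of the PID become the last two
// bytes of an address in 127.1.0.0/16. The real "run" script may derive
// the bytes differently.
std::string loopback_address_for_pid(std::uint32_t pid) {
    const std::uint32_t third = (pid >> 8) & 0xff;  // high byte of the low 16 bits
    const std::uint32_t fourth = pid & 0xff;        // low byte
    return "127.1." + std::to_string(third) + "." + std::to_string(fourth);
}
```

Because the second byte is always 1, an address produced this way cannot fall in the 127.0.0.* range that CCM uses.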

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
(cherry picked from commit 24fcc0c0ff)
2020-04-19 11:17:39 +03:00
Nadav Har'El
c320c3f6da install-dependencies.sh: add dependencies for Alternator tests
To run Alternator tests, only two additional dependencies need to be added to
install-dependencies.sh: pytest, and python3-boto3. We also need
python3-cassandra-driver, but this dependency is already listed.

This patch only updates the dependencies for Fedora, which is what we need
for dbuild and our Jenkins setups.

Tested by building a new dbuild docker image and verifying that the
Alternator tests pass.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
[avi: update toolchain image; note this upgrades gcc to 9.3.1]
Message-Id: <20200330181128.18582-1-nyh@scylladb.com>
(cherry picked from commit 8627ae42a6)
2020-04-19 11:17:07 +03:00
Nadav Har'El
0ed70944aa alternator-test: run: use the Python driver, not cqlsh
The "run" script for the Alternator tests needs to set a system table for
authentication credentials, so we can test this feature.
So far we did this with cqlsh, but cqlsh isn't always installed on build
machines. But install-dependencies.sh already installs the Cassandra driver
for Python, so it makes more sense to use that, so this patch switches to
use it.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20200331131522.28056-1-nyh@scylladb.com>
(cherry picked from commit 55f02c00f2)
2020-04-19 11:16:54 +03:00
Nadav Har'El
89f860d409 alternator-test: add "--url" option to choose Alternator's URL
The "--aws" and "--local" test options choose between two useful default
URLs - Amazon's, or http://localhost:8000 for a local installation.
However, sometimes one wants to run Scylla on a different IP address or
port, so in this patch we add a "--url" option to choose a specific URL to
connect to. For example, "--url http://127.1.2.3:1234".

We will later use this option in the alternator-test/run script, to pick
a random IP address on which to run Scylla, and then run the test against
this address.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
(cherry picked from commit 1aec4baa51)
2020-04-19 11:13:13 +03:00
Piotr Sarna
0819d221f4 test: add cases for empty paging state for index queries
In order to check regressions related to #6136 and similar issues,
test cases for handling paging state with empty partition/clustering
key pair are added.

(cherry picked from commit 88913e9d44)
2020-04-19 10:35:26 +03:00
Piotr Sarna
53f47d4e67 cql3: fix generating base keys from empty index paging state
An empty partition/clustering key pair is a valid state of the
query paging state. Unfortunately, recent attempts at debugging
a flaky test resulted in introducing an assertion which breaks
when trying to generate a key from such a pair.
In order to keep the assertion (since it still makes sense in its
scope), but at the same time translate empty keys properly,
empty keys are now explicitly processed at the beginning of the
function.
This behaviour was 100% reproducible in a secondary index dtest below.

Fixes #6134
Refs #5856
Tests: unit(dev),
       dtest(TestSecondaryIndexes.test_truncate_base)

(cherry picked from commit 45751ee24f)
2020-04-19 10:35:09 +03:00
Kamil Braun
21ad12669a sstables: freeze types nested in collection types in legacy sstables
Some legacy `mc` SSTables (created in Scylla 3.0) may contain incorrect
serialization headers, which don't wrap frozen UDTs nested inside collections
with the FrozenType<...> tag. When reading such an SSTable,
Scylla would detect a mismatch between the schema saved in schema
tables (which correctly wraps UDTs in the FrozenType<...> tag) and the schema
from the serialization header (which doesn't have these tags).

SSTables created in Scylla versions 3.1 and above, in particular in
Scylla versions that contain this commit, create correct serialization
headers (which wrap UDTs in the FrozenType<...> tag).

This commit does two things:
1. for all SSTables created after this commit, include a new feature
   flag, CorrectUDTsInCollections, presence of which implies that frozen
   UDTs inside collections have the FrozenType<...> tag.
2. when reading a Scylla SSTable without the feature flag, we assume that UDTs
   nested inside collections are always frozen, even if they don't have
   the tag. This assumption is safe to be made, because at the time of
   this commit, Scylla does not allow non-frozen (multi-cell) types inside
   collections or UDTs, and because of point 1 above.

There is one edge case not covered: if we don't know whether the SSTable
comes from Scylla or from C*. In that case we won't make the assumption
described in 2. Therefore, if we get a mismatch between schema and
serialization headers of a table which we couldn't confirm to come from
Scylla, we will still reject the table. If any user encounters such an
issue (unlikely), we will have to use another solution, e.g. using a
separate tool to rewrite the SSTable.

Fixes #6130.

(cherry picked from commit 3d811e2f95)
2020-04-17 09:11:53 +03:00
Kamil Braun
c812359383 sstables: move definition of column_translation::state::build to a .cc file
Ref #6130
2020-04-17 09:11:38 +03:00
Piotr Sarna
1bd79705fb alternator: use partition tombstone if there's no clustering key
As @tgrabiec helpfully pointed out, creating a row tombstone
for a table which does not have a clustering key in its schema
creates something that looks like an open-ended range tombstone.
That's problematic for KA/LA sstable formats, which are incapable
of writing such tombstones, so a workaround is provided
in order to allow using KA/LA in alternator.

Fixes #6035

(cherry picked from commit 0a2d7addc0)
2020-04-16 12:01:51 +03:00
Avi Kivity
7e2ef386cc Update seastar submodule
* seastar 92c488706...76260705e (1):
  > rpc: always shutdown socket when stopping a client

Fixes #6060.
2020-04-16 10:56:31 +03:00
Avi Kivity
51bad7e72c Point seastar submodule at scylla-seastar.git branch-4.0
This allows us to backport seastar patches to Scylla 4.0.
2020-04-16 10:10:40 +03:00
Asias He
0379d0c031 repair: Send reason for node operations
Since 956b092012 (Merge "Repair based node
operation" from Asias), repair is used by other node operations like
bootstrap, decommission and so on.

Send the reason for the repair, so that we can handle the materialized
view update correctly according to the reason of the operation. We want
to trigger the view update only if the repair is used by repair
operation. Otherwise, the view table will be handled twice, 1) when the
view table is synced using repair 2) when the base table is synced using
repair and view table update is triggered.
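
The rule stated above can be sketched in a few lines. The enum and function names are hypothetical; the patch's actual types are not shown on this page.

```cpp
// Hypothetical list of operations that run on top of repair.
enum class operation_reason { repair, bootstrap, decommission };

// View updates are triggered only for a genuine repair; for the other
// node operations the view table is already synced by its own repair.
bool should_trigger_view_update(operation_reason reason) {
    return reason == operation_reason::repair;
}
```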

Fixes #5930
Fixes #5998

(cherry picked from commit 066934f7c4)
2020-04-16 10:06:17 +03:00
Gleb Natapov
a8ef820f27 lwt: fix cas_now_pruning counter
Due to a copy-and-paste error, the cas_now_pruning counter is increased
instead of decreased after an operation completes. Fix it.
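
One way to make this class of bug harder to write is to pair the increment and the decrement in a single guard object. The sketch below shows that alternative technique; it is not what the patch does (the patch only corrects the sign), it is synchronous rather than future-based, and the class name is hypothetical.

```cpp
#include <cstdint>

// Hypothetical gauge guard: counts operations currently in flight.
class in_flight_guard {
    std::uint64_t& _counter;
public:
    // Entering the operation increments the gauge...
    explicit in_flight_guard(std::uint64_t& counter) : _counter(counter) { ++_counter; }
    // ...and leaving it, by any path, decrements it again.
    ~in_flight_guard() { --_counter; }
    in_flight_guard(const in_flight_guard&) = delete;
    in_flight_guard& operator=(const in_flight_guard&) = delete;
};
```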

Fixes #6116

Message-Id: <20200401142859.GA16953@scylladb.com>
(cherry picked from commit 4d9d226596)
2020-04-06 13:06:11 +02:00
Yaron Kaikov
9908f009a4 release: prepare for 4.0.rc1 2020-04-06 10:22:45 +03:00
Pavel Emelyanov
48d8a075b4 main: Do not destroy token_metadata
The storage_proxy instances hold references to token_metadata ones and
leave unwaited futures continuing to its query_partition_key_range_concurrent
method.

The latter is called from do_query so it's not that easy to find
out who is leaking. Keep the tokens not freed for a while.

Fixes: #6093
Test: manual start-stop

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
Message-Id: <20200402183538.9674-1-xemul@scylladb.com>
(cherry picked from commit 86296ba557)
2020-04-05 13:47:57 +03:00
Konstantin Osipov
e3ddd607bc lwt: remove Paxos from experimental list
Always enable lightweight transactions. Remove the check for the command
line switch from the feature service, assuming LWT is always enabled.

Remove the check for LWT from Alternator.

Note that in order for the cluster to work with LWT, all nodes need
to support it.

Rename LWT to UNUSED in db/config.hh, to keep accepting lwt keyword in
--experimental-features command line option, but do nothing with it.

Changes in v2:
* remove enable_lwt feature flag, it's always there

Closes #6102

test: unit (dev, debug)
Message-Id: <20200401071149.41921-1-kostja@scylladb.com>
(cherry picked from commit 9948f548a5)
2020-04-05 08:56:42 +03:00
Piotr Jastrzebski
511773d466 token: relax the condition of the sanity check
When we switched token representation to int64_t
we added some sanity checks that byte representation
is always 8 bytes long.

It turns out that for token_kind::before_all_keys and
token_kind::after_all_keys bytes can sometimes be empty
because for those tokens they are just ignored. The check
introduced with the change is too strict and sometimes
throws the exception for tokens before/after all keys
created with empty bytes.

This patch relaxes the condition of the check and always
uses 0 as value of _data for special before/after all keys
tokens.
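
A minimal sketch of the relaxed check, under stated assumptions: the `token_data` function is hypothetical, and the big-endian decoding of the 8 bytes is assumed for illustration. Only the shape of the check follows the description above: special tokens ignore their bytes and use 0, ordinary tokens must be exactly 8 bytes long.

```cpp
#include <cstdint>
#include <stdexcept>
#include <string>

enum class token_kind { before_all_keys, key, after_all_keys };

// Hypothetical decoder for the int64_t token representation.
std::int64_t token_data(token_kind kind, const std::string& bytes) {
    if (kind != token_kind::key) {
        // Bytes are ignored for before/after-all-keys tokens, so an
        // empty buffer is acceptable here.
        return 0;
    }
    if (bytes.size() != sizeof(std::int64_t)) {
        throw std::invalid_argument("token bytes must be exactly 8 bytes long");
    }
    std::uint64_t value = 0;
    for (unsigned char c : bytes) {
        value = (value << 8) | c;  // assumed big-endian layout
    }
    return static_cast<std::int64_t>(value);
}
```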

Fixes #6131

Tests: unit(dev, sct)

Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
(cherry picked from commit a15b32c9d9)
2020-04-04 20:19:10 +03:00
Gleb Natapov
121cd383fa lwt: remove entries from system.paxos table after successful learn stage
The learning stage of PAXOS protocol leaves behind an entry in
system.paxos table with the last learned value (which can be large). In
case not all participants learned it successfully, the next round on the same
key may complete the learning using this info. But if all nodes learned
the value, the entry no longer serves a useful purpose.

The patch adds another round, "prune", which is executed in background
(limited to 1000 simultaneous instances) and removes the entry in
case all nodes replied successfully to the "learn" round.  It uses the
ballot's timestamp to do the deletion, so as not to interfere with the
next round. Since deletion happens very close to previous writes it will
likely happen in memtable and will never reach sstable, so that reduces
memtable flush and compaction overhead.

Fixes #5779

Message-Id: <20200330154853.GA31074@scylladb.com>
(cherry picked from commit 8a408ac5a8)
2020-04-02 15:36:52 +02:00
Gleb Natapov
90639f48e5 lwt: rename "in_progress_ballot" cell to "promise" in system.paxos table
The value that is stored in "in_progress_ballot" cell is the value of
promised ballot, so call the cell accordingly to avoid confusion
especially as we have a notion of "in progress" proposal in the code
which is not the same as in_progress_ballot here.

We can still do it without care about backwards compatibility since LWT
is still marked as experimental.

Fixes #6087.

Message-Id: <20200326095758.GA10219@scylladb.com>
(cherry picked from commit b3db6f5b04)
2020-04-02 15:36:49 +02:00
Calle Wilund
8d029a04aa db::commitlog: Don't write trailing zero block unless needed
Fixes #5899

When terminating (closing) a segment, we write a trailing block
of zero so reader can have an empty region after last used chunk
as end marker. This is due to using recycled, pre-allocated
segments with potentially non-zero data extending over the point
where we are ending the segment (i.e. we are not fully filling
the segment due to a huge mutation or similar).

However, if we reach end of segment writing the final block
(typically many small mutations), the file will end naturally
after the data written, and any trailing zero block would in fact
just extend the file further. While this will only happen once per
segment recycled (independent of how many times it is recycled),
it is still both slightly breaking the disk usage contract and
also potentially causing some disk stalls due to metadata changes
(though of course very infrequent).

We should only write trailing zero if we are below the max_size
file size when terminating.

Adds a small size check to commitlog test to verify size bounds.
(Which breaks without the patch)

v2:
- Fix test to take into account that files might be deleted
  behind our backs.
v3:
- Fix test better, by doing verification _before_ segments are
  queued for delete.

Message-Id: <20200226121601.15347-2-calle@scylladb.com>
Message-Id: <20200324100235.23982-1-calle@scylladb.com>
(cherry picked from commit 9fee712d62)
2020-03-31 14:22:20 +03:00
Asias He
67995db899 gossip: Add an option to force gossip generation
Consider 3 nodes in the cluster, n1, n2, n3 with gossip generation
number g1, g2, g3.

n1, n2, n3 running scylla version with commit
0a52ecb6df (gossip: Fix max generation
drift measure)

One year later, the user wants to upgrade n1, n2, n3 to a new version.

When n3 does a rolling restart with a new version, n3 will use a
generation number g3'. Because g3' - g2 > MAX_GENERATION_DIFFERENCE and
g3' - g1 > MAX_GENERATION_DIFFERENCE, n1 and n2 will reject n3's
gossip update and mark n3 as down.

Such unnecessary marking of node down can cause availability issues.
For example:

DC1: n1, n2
DC2: n3, n4

When n3 and n4 restart, n1 and n2 will mark n3 and n4 as down, which
causes the whole DC2 to be unavailable.

To fix, we can start the node with a gossip generation within
MAX_GENERATION_DIFFERENCE difference for the new node.
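
The check and the workaround can be sketched as follows. Several things here are assumptions, not taken from the patch: the function names, the value of the constant (one year in seconds), the use of the start time in seconds as the default generation, and the reading of a negative option value as "not forced" (the config diff on this page only shows a default of -1).

```cpp
#include <cstdint>

// Assumed value: one year, in seconds. The message only names the constant.
constexpr std::int64_t max_generation_difference = 86400LL * 365;

// A peer's generation is rejected when it is too far ahead of the
// generation the local node is running with.
bool accepts_generation(std::int64_t local_generation, std::int64_t remote_generation) {
    return remote_generation - local_generation <= max_generation_difference;
}

// Hypothetical use of the new option: a forced value overrides the default
// generation, so the operator can pick one the older nodes still accept.
std::int64_t startup_generation(std::int64_t now_seconds, std::int64_t forced) {
    return forced >= 0 ? forced : now_seconds;
}
```

In the scenario above, n3 would be started with a forced generation no larger than g1 + MAX_GENERATION_DIFFERENCE and g2 + MAX_GENERATION_DIFFERENCE, so that n1 and n2 accept its gossip updates.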

Once all the nodes run the version with commit
0a52ecb6df, the option is no longer
needed.

Fixes #5164

(cherry picked from commit 743b529c2b)
2020-03-30 12:36:20 +02:00
Yaron Kaikov
282cd0df7c dist/docker: Update SCYLLA_REPO_URL and VERSION defaults
Update the SCYLLA_REPO_URL and VERSION defaults to point to the latest
unstable 4.0 version. This will be used if someone runs "docker build"
locally. For the releases, the release pipelines will pass the stable
version repository URL and a specific release version.
2020-03-26 09:54:44 +02:00
Nadav Har'El
ce58994d30 sstable: default to LA format instead of KA format
Over the years, Scylla updated the sstable format from the KA format to
the LA format, and most recently to the MC format. On a mixed cluster -
as occurs during a rolling upgrade - we want all the nodes, even new ones,
to write sstables in the format preferred by the old version. The thinking
is that if the upgrade fails, and we want to downgrade all nodes back to
the older version, we don't want to lose data because we already have
too-new sstables.

So the current code starts by selecting the oldest format we ever had - KA,
and only switching this choice to LA and MC after we verify that all the
nodes in the cluster support these newer formats.

But before an agreement is reached on the new format, sstables may already
be created in the antique KA format. This is usually harmless - we can
read this format just fine. However, the KA format has a problem that it is
unable to represent table names or keyspaces with the "-" character in them,
because this character is used to separate the keyspace and table names in
the file name. For CQL, a "-" is not allowed anyway in keyspace or table
names; but for Alternator, this character is allowed - and if a KA table
happens to be created by accident (before the LA or MC formats are chosen),
it cannot be read again during boot, and Scylla cannot reboot.
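
The ambiguity can be illustrated with a small sketch. It assumes KA file names of the form `<keyspace>-<table>-ka-<generation>-<component>` and uses a deliberately naive, hypothetical parser; Scylla's real file name parsing is not shown on this page.

```cpp
#include <string>
#include <utility>

// Assumed KA-style layout: "<keyspace>-<table>-ka-<generation>-<component>".
// A naive reader takes everything before the first '-' as the keyspace and
// everything up to the next '-' as the table.
std::pair<std::string, std::string> parse_ka_prefix(const std::string& file_name) {
    const auto first = file_name.find('-');
    if (first == std::string::npos) {
        return {"", ""};
    }
    const auto second = file_name.find('-', first + 1);
    if (second == std::string::npos) {
        return {file_name.substr(0, first), ""};
    }
    return {file_name.substr(0, first),
            file_name.substr(first + 1, second - first - 1)};
}
```

For `ks-tbl-ka-1-Data.db` this yields keyspace `ks` and table `tbl`, but for a table named `my-table` the same name layout gives `ks-my-table-ka-1-Data.db`, which is read back as table `my`. No split on "-" can tell the two cases apart.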

The solution that this patch takes is to change Scylla's default sstable
format to LA (and, as before, if the entire cluster agrees, the newer MC
format will be used). From now on, new KA tables will never be written.
But we still fully support *reading* the KA format - this is important in
case some very old sstables never underwent compaction.

The old code had, confusingly, two places where the default KA format
was chosen. This patch fixes it so the new default (LA) is specified in
only one place.

Fixes #6071.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20200324232607.4215-2-nyh@scylladb.com>
(cherry picked from commit 91aba40114)
2020-03-25 13:27:51 +01:00
Yaron Kaikov
78f5afec30 release: prepare for 4.0.rc0 2020-03-24 23:33:23 +02:00
81 changed files with 760 additions and 247 deletions

2
.gitmodules vendored
View File

@@ -1,6 +1,6 @@
[submodule "seastar"]
path = seastar
url = ../seastar
url = ../scylla-seastar
ignore = dirty
[submodule "swagger-ui"]
path = swagger-ui

View File

@@ -1,7 +1,7 @@
#!/bin/sh
PRODUCT=scylla
VERSION=666.development
VERSION=4.0.rc2
if test -f version
then

View File

@@ -208,12 +208,11 @@ get_table_or_view(service::storage_proxy& proxy, const rjson::value& request) {
throw api_error("ValidationException",
format("Non-string IndexName '{}'", index_name->GetString()));
}
}
// If no tables for global indexes were found, the index may be local
if (!proxy.get_db().local().has_schema(keyspace_name, table_name)) {
type = table_or_view_type::lsi;
table_name = lsi_name(orig_table_name, index_name->GetString());
// If no tables for global indexes were found, the index may be local
if (!proxy.get_db().local().has_schema(keyspace_name, table_name)) {
type = table_or_view_type::lsi;
table_name = lsi_name(orig_table_name, index_name->GetString());
}
}
try {
@@ -1019,13 +1018,22 @@ put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr sche
mutation put_or_delete_item::build(schema_ptr schema, api::timestamp_type ts) {
mutation m(schema, _pk);
auto& row = m.partition().clustered_row(*schema, _ck);
// If there's no clustering key, a tombstone should be created directly
// on a partition, not on a clustering row - otherwise it will look like
// an open-ended range tombstone, which will crash on KA/LA sstable format.
// Ref: #6035
const bool use_partition_tombstone = schema->clustering_key_size() == 0;
if (!_cells) {
// a DeleteItem operation:
row.apply(tombstone(ts, gc_clock::now()));
if (use_partition_tombstone) {
m.partition().apply(tombstone(ts, gc_clock::now()));
} else {
// a DeleteItem operation:
m.partition().clustered_row(*schema, _ck).apply(tombstone(ts, gc_clock::now()));
}
return m;
}
// else, a PutItem operation:
auto& row = m.partition().clustered_row(*schema, _ck);
attribute_collector attrs_collector;
for (auto& c : *_cells) {
const column_definition* cdef = schema->get_column_definition(c.column_name);
@@ -1048,7 +1056,11 @@ mutation put_or_delete_item::build(schema_ptr schema, api::timestamp_type ts) {
// Scylla proper, to implement the operation to replace an entire
// collection ("UPDATE .. SET x = ..") - see
// cql3::update_parameters::make_tombstone_just_before().
row.apply(tombstone(ts-1, gc_clock::now()));
if (use_partition_tombstone) {
m.partition().apply(tombstone(ts-1, gc_clock::now()));
} else {
row.apply(tombstone(ts-1, gc_clock::now()));
}
return m;
}

View File

@@ -434,6 +434,12 @@ GCC6_CONCEPT(
static KeyType
generate_base_key_from_index_pk(const partition_key& index_pk, const std::optional<clustering_key>& index_ck, const schema& base_schema, const schema& view_schema) {
const auto& base_columns = std::is_same_v<KeyType, partition_key> ? base_schema.partition_key_columns() : base_schema.clustering_key_columns();
// An empty key in the index paging state translates to an empty base key
if (index_pk.is_empty() && !index_ck) {
return KeyType::make_empty();
}
std::vector<bytes_view> exploded_base_key;
exploded_base_key.reserve(base_columns.size());
@@ -507,8 +513,7 @@ indexed_table_select_statement::do_execute_base_query(
if (old_paging_state && concurrency == 1) {
auto base_pk = generate_base_key_from_index_pk<partition_key>(old_paging_state->get_partition_key(),
old_paging_state->get_clustering_key(), *_schema, *_view_schema);
if (_schema->clustering_key_size() > 0) {
assert(old_paging_state->get_clustering_key().has_value());
if (old_paging_state->get_clustering_key() && _schema->clustering_key_size() > 0) {
auto base_ck = generate_base_key_from_index_pk<clustering_key>(old_paging_state->get_partition_key(),
old_paging_state->get_clustering_key(), *_schema, *_view_schema);
command->slice.set_range(*_schema, base_pk,

View File

@@ -614,11 +614,17 @@ public:
future<sseg_ptr> terminate() {
assert(_closed);
if (!std::exchange(_terminated, true)) {
clogger.trace("{} is closed but not terminated.", *this);
if (_buffer.empty()) {
new_buffer(0);
// write a terminating zero block iff we are ending (a reused)
// block before actual file end.
// we should only get here when all actual data is
// already flushed (see below, close()).
if (size_on_disk() < _segment_manager->max_size) {
clogger.trace("{} is closed but not terminated.", *this);
if (_buffer.empty()) {
new_buffer(0);
}
return cycle(true, true);
}
return cycle(true, true);
}
return make_ready_future<sseg_ptr>(shared_from_this());
}

View File

@@ -689,6 +689,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, shutdown_announce_in_ms(this, "shutdown_announce_in_ms", value_status::Used, 2 * 1000, "Time a node waits after sending gossip shutdown message in milliseconds. Same as -Dcassandra.shutdown_announce_in_ms in cassandra.")
, developer_mode(this, "developer_mode", value_status::Used, false, "Relax environment checks. Setting to true can reduce performance and reliability significantly.")
, skip_wait_for_gossip_to_settle(this, "skip_wait_for_gossip_to_settle", value_status::Used, -1, "An integer to configure the wait for gossip to settle. -1: wait normally, 0: do not wait at all, n: wait for at most n polls. Same as -Dcassandra.skip_wait_for_gossip_to_settle in cassandra.")
, force_gossip_generation(this, "force_gossip_generation", liveness::LiveUpdate, value_status::Used, -1 , "Force gossip to use the generation number provided by user")
, experimental(this, "experimental", value_status::Used, false, "Set to true to unlock all experimental features.")
, experimental_features(this, "experimental_features", value_status::Used, {}, "Unlock experimental features provided as the option arguments (possible values: 'lwt', 'cdc', 'udf'). Can be repeated.")
, lsa_reclamation_step(this, "lsa_reclamation_step", value_status::Used, 1, "Minimum number of segments to reclaim in a single step")
@@ -859,7 +860,7 @@ db::fs::path db::config::get_conf_sub(db::fs::path sub) {
}
bool db::config::check_experimental(experimental_features_t::feature f) const {
if (experimental()) {
if (experimental() && f != experimental_features_t::UNUSED) {
return true;
}
const auto& optval = experimental_features();
@@ -911,11 +912,13 @@ const db::extensions& db::config::extensions() const {
std::unordered_map<sstring, db::experimental_features_t::feature> db::experimental_features_t::map() {
// We decided against using the construct-on-first-use idiom here:
// https://github.com/scylladb/scylla/pull/5369#discussion_r353614807
return {{"lwt", LWT}, {"udf", UDF}, {"cdc", CDC}};
// Lightweight transactions are no longer experimental. Map them
// to UNUSED switch for a while, then remove altogether.
return {{"lwt", UNUSED}, {"udf", UDF}, {"cdc", CDC}};
}
std::vector<enum_option<db::experimental_features_t>> db::experimental_features_t::all() {
return {LWT, UDF, CDC};
return {UDF, CDC};
}
template struct utils::config_file::named_value<seastar::log_level>;

View File

@@ -81,7 +81,7 @@ namespace db {
/// Enumeration of all valid values for the `experimental` config entry.
struct experimental_features_t {
enum feature { LWT, UDF, CDC };
enum feature { UNUSED, UDF, CDC };
static std::unordered_map<sstring, feature> map(); // See enum_option.
static std::vector<enum_option<experimental_features_t>> all();
};
@@ -278,6 +278,7 @@ public:
named_value<uint32_t> shutdown_announce_in_ms;
named_value<bool> developer_mode;
named_value<int32_t> skip_wait_for_gossip_to_settle;
named_value<int32_t> force_gossip_generation;
named_value<bool> experimental;
named_value<std::vector<enum_option<experimental_features_t>>> experimental_features;
named_value<size_t> lsa_reclamation_step;

View File

@@ -187,7 +187,7 @@ schema_ptr batchlog() {
{{"cf_id", uuid_type}},
// regular columns
{
{"in_progress_ballot", timeuuid_type},
{"promise", timeuuid_type},
{"most_recent_commit", bytes_type}, // serialization format is defined by frozen_mutation idl
{"most_recent_commit_at", timeuuid_type},
{"proposal", bytes_type}, // serialization format is defined by frozen_mutation idl
@@ -2201,8 +2201,8 @@ future<service::paxos::paxos_state> load_paxos_state(const partition_key& key, s
return service::paxos::paxos_state();
}
auto& row = results->one();
auto promised = row.has("in_progress_ballot")
? row.get_as<utils::UUID>("in_progress_ballot") : utils::UUID_gen::min_time_UUID(0);
auto promised = row.has("promise")
? row.get_as<utils::UUID>("promise") : utils::UUID_gen::min_time_UUID(0);
std::optional<service::paxos::proposal> accepted;
if (row.has("proposal")) {
@@ -2228,7 +2228,7 @@ static int32_t paxos_ttl_sec(const schema& s) {
}
future<> save_paxos_promise(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout) {
static auto cql = format("UPDATE system.{} USING TIMESTAMP ? AND TTL ? SET in_progress_ballot = ? WHERE row_key = ? AND cf_id = ?", PAXOS);
static auto cql = format("UPDATE system.{} USING TIMESTAMP ? AND TTL ? SET promise = ? WHERE row_key = ? AND cf_id = ?", PAXOS);
return execute_cql_with_timeout(cql,
timeout,
utils::UUID_gen::micros_timestamp(ballot),
@@ -2274,6 +2274,20 @@ future<> save_paxos_decision(const schema& s, const service::paxos::proposal& de
).discard_result();
}
future<> delete_paxos_decision(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout) {
// This should be called only if a learn stage succeeded on all replicas.
// In this case we can remove the paxos row using ballot's timestamp which
// guarantees that if there is more recent round it will not be affected.
static auto cql = format("DELETE FROM system.{} USING TIMESTAMP ? WHERE row_key = ? AND cf_id = ?", PAXOS);
return execute_cql_with_timeout(cql,
timeout,
utils::UUID_gen::micros_timestamp(ballot),
to_legacy(*key.get_compound_type(s), key.representation()),
s.id()
).discard_result();
}
} // namespace system_keyspace
sstring system_keyspace_name() {


@@ -647,6 +647,7 @@ future<service::paxos::paxos_state> load_paxos_state(const partition_key& key, s
future<> save_paxos_promise(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout);
future<> save_paxos_proposal(const schema& s, const service::paxos::proposal& proposal, db::timeout_clock::time_point timeout);
future<> save_paxos_decision(const schema& s, const service::paxos::proposal& decision, db::timeout_clock::time_point timeout);
future<> delete_paxos_decision(const schema& s, const partition_key& key, const utils::UUID& ballot, db::timeout_clock::time_point timeout);
} // namespace system_keyspace
} // namespace db


@@ -118,7 +118,7 @@ token token::midpoint(const token& t1, const token& t2) {
}
token token::get_random_token() {
return {kind::key, dht::get_random_number<int64_t>()};
return token(kind::key, dht::get_random_number<uint64_t>());
}
token token::from_sstring(const sstring& t) {


@@ -58,19 +58,27 @@ public:
, _data(normalize(d)) { }
token(kind k, const bytes& b) : _kind(std::move(k)) {
if (b.size() != sizeof(_data)) {
throw std::runtime_error(fmt::format("Wrong token bytes size: expected {} but got {}", sizeof(_data), b.size()));
if (_kind != kind::key) {
_data = 0;
} else {
if (b.size() != sizeof(_data)) {
throw std::runtime_error(fmt::format("Wrong token bytes size: expected {} but got {}", sizeof(_data), b.size()));
}
std::copy_n(b.begin(), sizeof(_data), reinterpret_cast<int8_t *>(&_data));
_data = net::ntoh(_data);
}
std::copy_n(b.begin(), sizeof(_data), reinterpret_cast<int8_t *>(&_data));
_data = net::ntoh(_data);
}
token(kind k, bytes_view b) : _kind(std::move(k)) {
if (b.size() != sizeof(_data)) {
throw std::runtime_error(fmt::format("Wrong token bytes size: expected {} but got {}", sizeof(_data), b.size()));
if (_kind != kind::key) {
_data = 0;
} else {
if (b.size() != sizeof(_data)) {
throw std::runtime_error(fmt::format("Wrong token bytes size: expected {} but got {}", sizeof(_data), b.size()));
}
std::copy_n(b.begin(), sizeof(_data), reinterpret_cast<int8_t *>(&_data));
_data = net::ntoh(_data);
}
std::copy_n(b.begin(), sizeof(_data), reinterpret_cast<int8_t *>(&_data));
_data = net::ntoh(_data);
}
bool is_minimum() const {
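
Note on the hunk above: the new constructor bodies only validate and decode the byte buffer when the token kind is `key`, and zero the payload otherwise. A minimal sketch of that decoding, assuming a 64-bit signed payload stored in network (big-endian) byte order; `token_kind` and `decode_token_data` are hypothetical names used for illustration only.

```cpp
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for the token kind enumeration.
enum class token_kind { before_all_keys, key, after_all_keys };

// Hypothetical helper mirroring the constructor logic: only `key` tokens
// carry a payload, so other kinds ignore the buffer and yield 0.
inline int64_t decode_token_data(token_kind k, const std::vector<uint8_t>& b) {
    if (k != token_kind::key) {
        return 0;
    }
    if (b.size() != sizeof(int64_t)) {
        throw std::runtime_error("Wrong token bytes size: expected " +
            std::to_string(sizeof(int64_t)) + " but got " + std::to_string(b.size()));
    }
    // Assemble the value most significant byte first, which is what a
    // byte copy followed by a network-to-host conversion produces.
    uint64_t v = 0;
    for (uint8_t byte : b) {
        v = (v << 8) | byte;
    }
    return static_cast<int64_t>(v);
}
```

The size check is skipped on purpose for non-key kinds, so an empty buffer is accepted for the minimum and maximum sentinels.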


@@ -5,8 +5,8 @@ MAINTAINER Avi Kivity <avi@cloudius-systems.com>
ENV container docker
# The SCYLLA_REPO_URL argument specifies the URL to the RPM repository this Docker image uses to install Scylla. The default value is the Scylla's unstable RPM repository, which contains the daily build.
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo
ARG VERSION=666.development
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/branch-4.0/latest/scylla.repo
ARG VERSION=4.0.*
ADD scylla_bashrc /scylla_bashrc


@@ -21,10 +21,6 @@ DynamoDB API requests.
For example., "`--alternator-port=8000`" on the command line will run
Alternator on port 8000 - the traditional port used by DynamoDB.
Alternator uses Scylla's LWT feature, which is currently considered
experimental and needs to be seperately enabled as well, e.g. with the
"`--experimental=on`" option.
By default, Scylla listens on this port on all network interfaces.
To listen only on a specific interface, pass also an "`alternator-address`"
option.
@@ -55,9 +51,8 @@ Alternator's compatibility with DynamoDB, and will be updated as the work
progresses and compatibility continues to improve.
### API Server
* Transport: HTTP mostly supported, but small features like CRC header and
compression are still missing. HTTPS supported on top of HTTP, so small
features may still be missing.
* Transport: HTTP and HTTPS are mostly supported, but small features like CRC
header and compression are still missing.
* Authorization (verifying the originator of the request): implemented
on top of system\_auth.roles table. The secret key used for authorization
is the salted\_hash column from the roles table, selected with:
@@ -65,20 +60,19 @@ progresses and compatibility continues to improve.
By default, authorization is not enforced at all. It can be turned on
by providing an entry in Scylla configuration:
alternator\_enforce\_authorization: true
* DNS server for load balancing: Not yet supported. Client needs to pick
one of the live Scylla nodes and send a request to it.
* Load balancing: Not a part of Alternator. One should use an external load
balancer or DNS server to balance the requests between the live Scylla
nodes. We plan to publish a reference example soon.
### Table Operations
* CreateTable: Supported. Note our implementation is synchronous.
* CreateTable and DeleteTable: Supported. Note our implementation is synchronous.
* DescribeTable: Partial implementation. Missing creation date and size estimate.
* UpdateTable: Not supported.
* DescribeTable: Partial implementation. Missing creation date and size esitmate.
* DeleteTable: Supported. Note our implementation is synchronous.
* ListTables: Supported.
### Item Operations
* GetItem: Support almost complete except that projection expressions can
only ask for top-level attributes.
* PutItem: Support almost complete except that condition expressions can
only refer to to-level attributes.
pre-put content) not yet supported.
* UpdateItem: Nested documents are supported but updates to nested attributes
are not (e.g., `SET a.b[3].c=val`), and neither are nested attributes in
condition expressions.
@@ -90,15 +84,14 @@ progresses and compatibility continues to improve.
* BatchWriteItem: Supported. Doesn't limit the number of items (DynamoDB
limits to 25) or size of items (400 KB) or total request size (16 MB).
### Scans
* Scan: As usual, projection expressions only support top-level attributes.
Filter expressions (to filter some of the items) partially supported:
The ScanFilter syntax is supported but FilterExpression is not yet, and
only equality operator is supported so far.
The "Select" options which allows to count items instead of returning them
is not yet supported. Parallel scan is not yet supported.
* Query: Same issues as Scan above. Additionally, missing support for
KeyConditionExpression (an alternative syntax replacing the older
KeyConditions parameter which we do support).
Scan and Query are mostly supported, with the following limitations:
* As above, projection expressions only support top-level attributes.
* Filter expressions (to filter some of the items) are only partially
supported: The ScanFilter syntax is currently only supports the equality
operator, and the FilterExpression syntax is not yet supported at all.
* The "Select" options which allows to count items instead of returning them
is not yet supported.
* Parallel scan is not yet supported.
### Secondary Indexes
Global Secondary Indexes (GSI) and Local Secondary Indexes (LSI) are
implemented, with the following limitations:
@@ -116,24 +109,28 @@ implemented, with the following limitations:
Writes are done in LOCAL_QURUM and reads in LOCAL_ONE (eventual consistency)
or LOCAL_QUORUM (strong consistency).
### Global Tables
* Not yet supported: CreateGlobalTable, UpdateGlobalTable,
DescribeGlobalTable, ListGlobalTables, UpdateGlobalTableSettings,
DescribeGlobalTableSettings. Implementation will use Scylla's multi-DC
features.
* Currently, *all* Alternator tables are created as "Global Tables", i.e., can
be accessed from all of Scylla's DCs.
* We do not yet support the DynamoDB API calls to make some of the tables
global and others local to a particular DC: CreateGlobalTable,
UpdateGlobalTable, DescribeGlobalTable, ListGlobalTables,
UpdateGlobalTableSettings, DescribeGlobalTableSettings, and UpdateTable.
### Backup and Restore
* On-demand backup: Not yet supported: CreateBackup, DescribeBackup,
DeleteBackup, ListBackups, RestoreTableFromBackup. Implementation will
use Scylla's snapshots
* On-demand backup: the DynamoDB APIs are not yet supported: CreateBackup,
DescribeBackup, DeleteBackup, ListBackups, RestoreTableFromBackup.
Users can use Scylla's [snapshots](https://docs.scylladb.com/operating-scylla/procedures/backup-restore/)
or [Scylla Manager](https://docs.scylladb.com/operating-scylla/manager/2.0/backup/).
* Continuous backup: Not yet supported: UpdateContinuousBackups,
DescribeContinuousBackups, RestoreTableToPoinInTime.
### Transations
### Transactions
* Not yet supported: TransactWriteItems, TransactGetItems.
Note that this is a new DynamoDB feature - these are more powerful than
the old conditional updates which were "lightweight transactions".
### Streams (CDC)
* Not yet supported
### Streams
* Scylla has experimental support for [CDC](https://docs.scylladb.com/using-scylla/cdc/)
(change data capture), but the "DynamoDB Streams" API is not yet supported.
### Encryption at rest
* Supported natively by Scylla, but needs to be enabled by default.
* Supported by Scylla Enterprise (not in open-source). Needs to be enabled.
### ARNs and tags
* ARN is generated for every alternator table
* Tagging can be used with the help of the following requests:
@@ -166,7 +163,9 @@ implemented, with the following limitations:
* Not required. Scylla cache is rather advanced and there is no need to place
a cache in front of the database: https://www.scylladb.com/2017/07/31/database-caches-not-good/
### Metrics
* Several metrics are available through the Grafana/Promethues stack: https://docs.scylladb.com/operating-scylla/monitoring/ It is different than the expectations of the current DynamoDB implementation. However, our
* Several metrics are available through the Grafana/Prometheus stack:
https://docs.scylladb.com/operating-scylla/monitoring/
Those are different from the current DynamoDB metrics, but Scylla's
monitoring is rather advanced and provide more insights to the internals.
## Alternator design and implementation
@@ -229,8 +228,3 @@ one DynamoDB feature which we cannot support safely: we cannot modify
a non-top-level attribute (e.g., a.b[3].c) directly without RMW. We plan
to fix this in a future version by rethinking the data model we use for
attributes, or rethinking our implementation of RMW (as explained above).
For reasons explained above, the data model used by Alternator to store
data on disk is still in a state of flux, and may change in future versions.
Therefore, in this early stage it is not recommended to store important
production data using Alternator.


@@ -10,12 +10,10 @@ This section will guide you through the steps for setting up the cluster:
nightly image by running: `docker pull scylladb/scylla-nightly:latest`
2. Follow the steps in the [Scylla official download web page](https://www.scylladb.com/download/open-source/#docker)
add to every "docker run" command: `-p 8000:8000` before the image name
and `--alternator-port=8000 --experimental 1` at the end. The
"alternator-port" option specifies on which port Scylla will listen for
the (unencrypted) DynamoDB API, and "--experimental 1" is required to
enable the experimental LWT feature which Alternator uses.
and `--alternator-port=8000` at the end. The "alternator-port" option
specifies on which port Scylla will listen for the (unencrypted) DynamoDB API.
For example,
`docker run --name scylla -d -p 8000:8000 scylladb/scylla-nightly:latest --alternator-port=8000 --experimental 1`
`docker run --name scylla -d -p 8000:8000 scylladb/scylla-nightly:latest --alternator-port=8000`
## Testing Scylla's DynamoDB API support:
### Running AWS Tic Tac Toe demo app to test the cluster:


@@ -76,6 +76,9 @@ Scylla with issue #4139 fixed)
bit 4: CorrectEmptyCounters (if set, indicates the sstable was generated by
Scylla with issue #4363 fixed)
bit 5: CorrectUDTsInCollections (if set, indicates that the sstable was generated
by Scylla with issue #6130 fixed)
## extension_attributes subcomponent
extension_attributes = extension_attribute_count extension_attribute*


@@ -110,10 +110,6 @@ feature_config feature_config_from_db_config(db::config& cfg) {
fcfg.enable_cdc = true;
}
if (cfg.check_experimental(db::experimental_features_t::LWT)) {
fcfg.enable_lwt = true;
}
return fcfg;
}
@@ -178,9 +174,7 @@ std::set<std::string_view> feature_service::known_feature_set() {
if (_config.enable_cdc) {
features.insert(gms::features::CDC);
}
if (_config.enable_lwt) {
features.insert(gms::features::LWT);
}
features.insert(gms::features::LWT);
for (const sstring& s : _config.disabled_features) {
features.erase(s);


@@ -41,7 +41,6 @@ struct feature_config {
bool enable_sstables_mc_format = false;
bool enable_user_defined_functions = false;
bool enable_cdc = false;
bool enable_lwt = false;
std::set<sstring> disabled_features;
feature_config();
};


@@ -1725,8 +1725,12 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map<application_stat
// message on all cpus and forard them to cpu0 to process.
return get_gossiper().invoke_on_all([do_bind] (gossiper& g) {
g.init_messaging_service_handler(do_bind);
}).then([this, generation_nbr, preload_local_states] {
}).then([this, generation_nbr, preload_local_states] () mutable {
build_seeds_list();
if (_cfg.force_gossip_generation() > 0) {
generation_nbr = _cfg.force_gossip_generation();
logger.warn("Use the generation number provided by user: generation = {}", generation_nbr);
}
endpoint_state& local_state = endpoint_state_map[get_broadcast_address()];
local_state.set_heart_beat_state_and_update_timestamp(heart_beat_state(generation_nbr));
local_state.mark_alive();
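
Note on the hunk above: the continuation gains `() mutable` because it now assigns to `generation_nbr`, which is captured by copy. A by-copy capture is const inside the lambda body unless the lambda is declared `mutable`. A minimal sketch of the same pattern; `pick_generation` and its parameters are hypothetical names, not part of the gossiper.

```cpp
#include <cstdint>

// Hypothetical helper: returns the generation number that would be used,
// given the computed value and a user-forced override (0 means "not set").
inline int32_t pick_generation(int32_t generation_nbr, int32_t forced) {
    // Without `mutable`, the assignment to the captured copy below
    // would fail to compile.
    auto choose = [generation_nbr, forced] () mutable {
        if (forced > 0) {
            generation_nbr = forced;
        }
        return generation_nbr;
    };
    return choose();
}
```

The assignment only changes the lambda's own copy. The caller's variable is untouched, which matches the hunk, where the overridden value is used only inside the continuation.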


@@ -76,6 +76,8 @@ fedora_packages=(
python3-psutil
python3-cassandra-driver
python3-colorama
python3-boto3
python3-pytest
dnf-utils
pigz
net-tools

main.cc

@@ -662,9 +662,17 @@ int main(int ac, char** av) {
supervisor::notify("starting tokens manager");
token_metadata.start().get();
auto stop_token_metadata = defer_verbose_shutdown("token metadata", [ &token_metadata ] {
token_metadata.stop().get();
});
// storage_proxy holds a reference on it and is not yet stopped.
// what's worse is that the calltrace
// storage_proxy::do_query
// ::query_partition_key_range
// ::query_partition_key_range_concurrent
// leaves unwaited futures on the reactor and once it gets there
// the token_metadata instance is accessed and ...
//
//auto stop_token_metadata = defer_verbose_shutdown("token metadata", [ &token_metadata ] {
// token_metadata.stop().get();
//});
supervisor::notify("starting migration manager notifier");
mm_notifier.start().get();
@@ -1071,9 +1079,6 @@ int main(int ac, char** av) {
static sharded<alternator::executor> alternator_executor;
static sharded<alternator::server> alternator_server;
if (!cfg->check_experimental(db::experimental_features_t::LWT)) {
throw std::runtime_error("Alternator enabled, but needs experimental LWT feature which wasn't enabled");
}
net::inet_address addr;
try {
addr = net::dns::get_host_by_name(cfg->alternator_address(), family).get0().addr_list.front();


@@ -452,6 +452,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::PAXOS_PREPARE:
case messaging_verb::PAXOS_ACCEPT:
case messaging_verb::PAXOS_LEARN:
case messaging_verb::PAXOS_PRUNE:
return 0;
// GET_SCHEMA_VERSION is sent from read/mutate verbs so should be
// sent on a different connection to avoid potential deadlocks
@@ -1179,14 +1180,14 @@ future<> messaging_service::send_repair_put_row_diff(msg_addr id, uint32_t repai
}
// Wrapper for REPAIR_ROW_LEVEL_START
void messaging_service::register_repair_row_level_start(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version)>&& func) {
void messaging_service::register_repair_row_level_start(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional<streaming::stream_reason> reason)>&& func) {
register_handler(this, messaging_verb::REPAIR_ROW_LEVEL_START, std::move(func));
}
future<> messaging_service::unregister_repair_row_level_start() {
return unregister_handler(messaging_verb::REPAIR_ROW_LEVEL_START);
}
future<> messaging_service::send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version) {
return send_message<void>(this, messaging_verb::REPAIR_ROW_LEVEL_START, std::move(id), repair_meta_id, std::move(keyspace_name), std::move(cf_name), std::move(range), algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, std::move(remote_partitioner_name), std::move(schema_version));
future<> messaging_service::send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, streaming::stream_reason reason) {
return send_message<void>(this, messaging_verb::REPAIR_ROW_LEVEL_START, std::move(id), repair_meta_id, std::move(keyspace_name), std::move(cf_name), std::move(range), algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, std::move(remote_partitioner_name), std::move(schema_version), reason);
}
// Wrapper for REPAIR_ROW_LEVEL_STOP
@@ -1281,6 +1282,19 @@ future<> messaging_service::send_paxos_learn(msg_addr id, clock_type::time_point
std::move(reply_to), shard, std::move(response_id), std::move(trace_info));
}
void messaging_service::register_paxos_prune(std::function<future<rpc::no_wait_type>(
const rpc::client_info&, rpc::opt_time_point, UUID schema_id, partition_key key, utils::UUID ballot, std::optional<tracing::trace_info>)>&& func) {
register_handler(this, messaging_verb::PAXOS_PRUNE, std::move(func));
}
future<> messaging_service::unregister_paxos_prune() {
return unregister_handler(netw::messaging_verb::PAXOS_PRUNE);
}
future<>
messaging_service::send_paxos_prune(gms::inet_address peer, clock_type::time_point timeout, UUID schema_id,
const partition_key& key, utils::UUID ballot, std::optional<tracing::trace_info> trace_info) {
return send_message_oneway_timeout(this, timeout, messaging_verb::PAXOS_PRUNE, netw::msg_addr(peer), schema_id, key, ballot, std::move(trace_info));
}
void messaging_service::register_hint_mutation(std::function<future<rpc::no_wait_type> (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector<inet_address> forward,
inet_address reply_to, unsigned shard, response_id_type response_id, rpc::optional<std::optional<tracing::trace_info>> trace_info)>&& func) {
register_handler(this, netw::messaging_verb::HINT_MUTATION, std::move(func));


@@ -139,7 +139,8 @@ enum class messaging_verb : int32_t {
PAXOS_ACCEPT = 40,
PAXOS_LEARN = 41,
HINT_MUTATION = 42,
LAST = 43,
PAXOS_PRUNE = 43,
LAST = 44,
};
} // namespace netw
@@ -341,9 +342,9 @@ public:
future<> send_repair_put_row_diff(msg_addr id, uint32_t repair_meta_id, repair_rows_on_wire row_diff);
// Wrapper for REPAIR_ROW_LEVEL_START
void register_repair_row_level_start(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version)>&& func);
void register_repair_row_level_start(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional<streaming::stream_reason> reason)>&& func);
future<> unregister_repair_row_level_start();
future<> send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version);
future<> send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, streaming::stream_reason reason);
// Wrapper for REPAIR_ROW_LEVEL_STOP
void register_repair_row_level_stop(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range)>&& func);
@@ -493,6 +494,14 @@ public:
std::vector<inet_address> forward, inet_address reply_to, unsigned shard, response_id_type response_id,
std::optional<tracing::trace_info> trace_info = std::nullopt);
void register_paxos_prune(std::function<future<rpc::no_wait_type>(const rpc::client_info&, rpc::opt_time_point, UUID schema_id, partition_key key,
utils::UUID ballot, std::optional<tracing::trace_info>)>&& func);
future<> unregister_paxos_prune();
future<> send_paxos_prune(gms::inet_address peer, clock_type::time_point timeout, UUID schema_id, const partition_key& key,
utils::UUID ballot, std::optional<tracing::trace_info> trace_info);
void register_hint_mutation(std::function<future<rpc::no_wait_type> (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector<inet_address> forward,
inet_address reply_to, unsigned shard, response_id_type response_id, rpc::optional<std::optional<tracing::trace_info>> trace_info)>&& func);
future<> unregister_hint_mutation();


@@ -672,7 +672,8 @@ repair_info::repair_info(seastar::sharded<database>& db_,
const std::vector<sstring>& cfs_,
int id_,
const std::vector<sstring>& data_centers_,
const std::vector<sstring>& hosts_)
const std::vector<sstring>& hosts_,
streaming::stream_reason reason_)
: db(db_)
, partitioner(get_partitioner_for_tables(db_, keyspace_, cfs_))
, keyspace(keyspace_)
@@ -682,6 +683,7 @@ repair_info::repair_info(seastar::sharded<database>& db_,
, shard(engine().cpu_id())
, data_centers(data_centers_)
, hosts(hosts_)
, reason(reason_)
, _row_level_repair(db.local().features().cluster_supports_row_level_repair()) {
}
@@ -1462,7 +1464,7 @@ static int do_repair_start(seastar::sharded<database>& db, sstring keyspace,
data_centers = options.data_centers, hosts = options.hosts] (database& localdb) mutable {
auto ri = make_lw_shared<repair_info>(db,
std::move(keyspace), std::move(ranges), std::move(cfs),
id, std::move(data_centers), std::move(hosts));
id, std::move(data_centers), std::move(hosts), streaming::stream_reason::repair);
return repair_ranges(ri);
});
repair_results.push_back(std::move(f));
@@ -1524,14 +1526,15 @@ future<> repair_abort_all(seastar::sharded<database>& db) {
future<> sync_data_using_repair(seastar::sharded<database>& db,
sstring keyspace,
dht::token_range_vector ranges,
std::unordered_map<dht::token_range, repair_neighbors> neighbors) {
std::unordered_map<dht::token_range, repair_neighbors> neighbors,
streaming::stream_reason reason) {
if (ranges.empty()) {
return make_ready_future<>();
}
return smp::submit_to(0, [&db, keyspace = std::move(keyspace), ranges = std::move(ranges), neighbors = std::move(neighbors)] () mutable {
return smp::submit_to(0, [&db, keyspace = std::move(keyspace), ranges = std::move(ranges), neighbors = std::move(neighbors), reason] () mutable {
int id = repair_tracker().next_repair_command();
rlogger.info("repair id {} to sync data for keyspace={}, status=started", id, keyspace);
return repair_tracker().run(id, [id, &db, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors)] () mutable {
return repair_tracker().run(id, [id, &db, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason] () mutable {
auto cfs = list_column_families(db.local(), keyspace);
if (cfs.empty()) {
rlogger.warn("repair id {} to sync data for keyspace={}, no table in this keyspace", id, keyspace);
@@ -1540,12 +1543,12 @@ future<> sync_data_using_repair(seastar::sharded<database>& db,
std::vector<future<>> repair_results;
repair_results.reserve(smp::count);
for (auto shard : boost::irange(unsigned(0), smp::count)) {
auto f = db.invoke_on(shard, [keyspace, cfs, id, ranges, neighbors] (database& localdb) mutable {
auto f = db.invoke_on(shard, [keyspace, cfs, id, ranges, neighbors, reason] (database& localdb) mutable {
auto data_centers = std::vector<sstring>();
auto hosts = std::vector<sstring>();
auto ri = make_lw_shared<repair_info>(service::get_local_storage_service().db(),
std::move(keyspace), std::move(ranges), std::move(cfs),
id, std::move(data_centers), std::move(hosts));
id, std::move(data_centers), std::move(hosts), reason);
ri->neighbors = std::move(neighbors);
return repair_ranges(ri);
});
@@ -1584,6 +1587,7 @@ future<> bootstrap_with_repair(seastar::sharded<database>& db, locator::token_me
auto keyspaces = db.local().get_non_system_keyspaces();
rlogger.info("bootstrap_with_repair: started with keyspaces={}", keyspaces);
auto myip = utils::fb_utilities::get_broadcast_address();
auto reason = streaming::stream_reason::bootstrap;
for (auto& keyspace_name : keyspaces) {
if (!db.local().has_keyspace(keyspace_name)) {
rlogger.info("bootstrap_with_repair: keyspace={} does not exist any more, ignoring it", keyspace_name);
@@ -1716,7 +1720,7 @@ future<> bootstrap_with_repair(seastar::sharded<database>& db, locator::token_me
}
}
auto nr_ranges = desired_ranges.size();
sync_data_using_repair(db, keyspace_name, std::move(desired_ranges), std::move(range_sources)).get();
sync_data_using_repair(db, keyspace_name, std::move(desired_ranges), std::move(range_sources), reason).get();
rlogger.info("bootstrap_with_repair: finished with keyspace={}, nr_ranges={}", keyspace_name, nr_ranges);
}
rlogger.info("bootstrap_with_repair: finished with keyspaces={}", keyspaces);
@@ -1730,6 +1734,7 @@ future<> do_decommission_removenode_with_repair(seastar::sharded<database>& db,
auto keyspaces = db.local().get_non_system_keyspaces();
bool is_removenode = myip != leaving_node;
auto op = is_removenode ? "removenode_with_repair" : "decommission_with_repair";
streaming::stream_reason reason = is_removenode ? streaming::stream_reason::removenode : streaming::stream_reason::decommission;
rlogger.info("{}: started with keyspaces={}, leaving_node={}", op, keyspaces, leaving_node);
for (auto& keyspace_name : keyspaces) {
if (!db.local().has_keyspace(keyspace_name)) {
@@ -1867,7 +1872,7 @@ future<> do_decommission_removenode_with_repair(seastar::sharded<database>& db,
ranges.swap(ranges_for_removenode);
}
auto nr_ranges_synced = ranges.size();
sync_data_using_repair(db, keyspace_name, std::move(ranges), std::move(range_sources)).get();
sync_data_using_repair(db, keyspace_name, std::move(ranges), std::move(range_sources), reason).get();
rlogger.info("{}: finished with keyspace={}, leaving_node={}, nr_ranges={}, nr_ranges_synced={}, nr_ranges_skipped={}",
op, keyspace_name, leaving_node, nr_ranges_total, nr_ranges_synced, nr_ranges_skipped);
}
@@ -1883,8 +1888,8 @@ future<> removenode_with_repair(seastar::sharded<database>& db, locator::token_m
return do_decommission_removenode_with_repair(db, std::move(tm), std::move(leaving_node));
}
future<> do_rebuild_replace_with_repair(seastar::sharded<database>& db, locator::token_metadata tm, sstring op, sstring source_dc) {
return seastar::async([&db, tm = std::move(tm), source_dc = std::move(source_dc), op = std::move(op)] () mutable {
future<> do_rebuild_replace_with_repair(seastar::sharded<database>& db, locator::token_metadata tm, sstring op, sstring source_dc, streaming::stream_reason reason) {
return seastar::async([&db, tm = std::move(tm), source_dc = std::move(source_dc), op = std::move(op), reason] () mutable {
auto keyspaces = db.local().get_non_system_keyspaces();
rlogger.info("{}: started with keyspaces={}, source_dc={}", op, keyspaces, source_dc);
auto myip = utils::fb_utilities::get_broadcast_address();
@@ -1921,7 +1926,7 @@ future<> do_rebuild_replace_with_repair(seastar::sharded<database>& db, locator:
}
}
auto nr_ranges = ranges.size();
sync_data_using_repair(db, keyspace_name, std::move(ranges), std::move(range_sources)).get();
sync_data_using_repair(db, keyspace_name, std::move(ranges), std::move(range_sources), reason).get();
rlogger.info("{}: finished with keyspace={}, source_dc={}, nr_ranges={}", op, keyspace_name, source_dc, nr_ranges);
}
rlogger.info("{}: finished with keyspaces={}, source_dc={}", op, keyspaces, source_dc);
@@ -1933,11 +1938,13 @@ future<> rebuild_with_repair(seastar::sharded<database>& db, locator::token_meta
if (source_dc.empty()) {
source_dc = get_local_dc();
}
return do_rebuild_replace_with_repair(db, std::move(tm), std::move(op), std::move(source_dc));
auto reason = streaming::stream_reason::rebuild;
return do_rebuild_replace_with_repair(db, std::move(tm), std::move(op), std::move(source_dc), reason);
}
future<> replace_with_repair(seastar::sharded<database>& db, locator::token_metadata tm) {
auto op = sstring("replace_with_repair");
auto source_dc = get_local_dc();
return do_rebuild_replace_with_repair(db, std::move(tm), std::move(op), std::move(source_dc));
auto reason = streaming::stream_reason::bootstrap;
return do_rebuild_replace_with_repair(db, std::move(tm), std::move(op), std::move(source_dc), reason);
}


@@ -181,6 +181,7 @@ public:
shard_id shard;
std::vector<sstring> data_centers;
std::vector<sstring> hosts;
streaming::stream_reason reason;
std::unordered_map<dht::token_range, repair_neighbors> neighbors;
size_t nr_failed_ranges = 0;
bool aborted = false;
@@ -211,7 +212,8 @@ public:
const std::vector<sstring>& cfs_,
int id_,
const std::vector<sstring>& data_centers_,
const std::vector<sstring>& hosts_);
const std::vector<sstring>& hosts_,
streaming::stream_reason reason_);
future<> do_streaming();
void check_failed_ranges();
future<> request_transfer_ranges(const sstring& cf,


@@ -451,14 +451,17 @@ class repair_writer {
// partition_start is written and is closed when a partition_end is
// written.
std::vector<bool> _partition_opened;
streaming::stream_reason _reason;
public:
repair_writer(
schema_ptr schema,
uint64_t estimated_partitions,
size_t nr_peer_nodes)
size_t nr_peer_nodes,
streaming::stream_reason reason)
: _schema(std::move(schema))
, _estimated_partitions(estimated_partitions)
, _nr_peer_nodes(nr_peer_nodes) {
, _nr_peer_nodes(nr_peer_nodes)
, _reason(reason) {
init_writer();
}
@@ -495,9 +498,9 @@ public:
table& t = db.local().find_column_family(_schema->id());
_writer_done[node_idx] = mutation_writer::distribute_reader_and_consume_on_shards(_schema,
make_generating_reader(_schema, std::move(get_next_mutation_fragment)),
[&db, estimated_partitions = this->_estimated_partitions] (flat_mutation_reader reader) {
[&db, reason = this->_reason, estimated_partitions = this->_estimated_partitions] (flat_mutation_reader reader) {
auto& t = db.local().find_column_family(reader.schema());
return db::view::check_needs_view_update_path(_sys_dist_ks->local(), t, streaming::stream_reason::repair).then([t = t.shared_from_this(), estimated_partitions, reader = std::move(reader)] (bool use_view_update_path) mutable {
return db::view::check_needs_view_update_path(_sys_dist_ks->local(), t, reason).then([t = t.shared_from_this(), estimated_partitions, reader = std::move(reader)] (bool use_view_update_path) mutable {
//FIXME: for better estimations this should be transmitted from remote
auto metadata = mutation_source_metadata{};
auto& cs = t->get_compaction_strategy();
@@ -590,6 +593,7 @@ private:
repair_master _repair_master;
gms::inet_address _myip;
uint32_t _repair_meta_id;
streaming::stream_reason _reason;
// Repair master's sharding configuration
shard_config _master_node_shard_config;
// Partitioner of repair master
@@ -653,6 +657,7 @@ public:
uint64_t seed,
repair_master master,
uint32_t repair_meta_id,
streaming::stream_reason reason,
shard_config master_node_shard_config,
size_t nr_peer_nodes = 1)
: _db(db)
@@ -666,6 +671,7 @@ public:
, _repair_master(master)
, _myip(utils::fb_utilities::get_broadcast_address())
, _repair_meta_id(repair_meta_id)
, _reason(reason)
, _master_node_shard_config(std::move(master_node_shard_config))
, _remote_partitioner(make_remote_partitioner())
, _same_sharding_config(is_same_sharding_config())
@@ -681,7 +687,7 @@ public:
_seed,
repair_reader::is_local_reader(_repair_master || _same_sharding_config)
)
, _repair_writer(_schema, _estimated_partitions, _nr_peer_nodes)
, _repair_writer(_schema, _estimated_partitions, _nr_peer_nodes, _reason)
, _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes,
[] (uint32_t repair_meta_id, netw::messaging_service::msg_addr addr) {
return netw::get_local_messaging_service().make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(repair_meta_id, addr);
@@ -731,7 +737,8 @@ public:
uint64_t max_row_buf_size,
uint64_t seed,
shard_config master_node_shard_config,
table_schema_version schema_version) {
table_schema_version schema_version,
streaming::stream_reason reason) {
return service::get_schema_for_write(schema_version, {from, src_cpu_id}).then([from,
repair_meta_id,
range,
@@ -739,7 +746,8 @@ public:
max_row_buf_size,
seed,
master_node_shard_config,
schema_version] (schema_ptr s) {
schema_version,
reason] (schema_ptr s) {
auto& db = service::get_local_storage_proxy().get_db();
auto& cf = db.local().find_column_family(s->id());
node_repair_meta_id id{from, repair_meta_id};
@@ -752,6 +760,7 @@ public:
seed,
repair_meta::repair_master::no,
repair_meta_id,
reason,
std::move(master_node_shard_config));
bool insertion = repair_meta_map().emplace(id, rm).second;
if (!insertion) {
@@ -1412,28 +1421,28 @@ public:
// RPC API
future<>
repair_row_level_start(gms::inet_address remote_node, sstring ks_name, sstring cf_name, dht::token_range range, table_schema_version schema_version) {
repair_row_level_start(gms::inet_address remote_node, sstring ks_name, sstring cf_name, dht::token_range range, table_schema_version schema_version, streaming::stream_reason reason) {
if (remote_node == _myip) {
return make_ready_future<>();
}
stats().rpc_call_nr++;
return netw::get_local_messaging_service().send_repair_row_level_start(msg_addr(remote_node),
_repair_meta_id, std::move(ks_name), std::move(cf_name), std::move(range), _algo, _max_row_buf_size, _seed,
_master_node_shard_config.shard, _master_node_shard_config.shard_count, _master_node_shard_config.ignore_msb, _master_node_shard_config.partitioner_name, std::move(schema_version));
_master_node_shard_config.shard, _master_node_shard_config.shard_count, _master_node_shard_config.ignore_msb, _master_node_shard_config.partitioner_name, std::move(schema_version), reason);
}
// RPC handler
static future<>
repair_row_level_start_handler(gms::inet_address from, uint32_t src_cpu_id, uint32_t repair_meta_id, sstring ks_name, sstring cf_name,
dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size,
uint64_t seed, shard_config master_node_shard_config, table_schema_version schema_version) {
uint64_t seed, shard_config master_node_shard_config, table_schema_version schema_version, streaming::stream_reason reason) {
if (!_sys_dist_ks->local_is_initialized() || !_view_update_generator->local_is_initialized()) {
return make_exception_future<>(std::runtime_error(format("Node {} is not fully initialized for repair, try again later",
utils::fb_utilities::get_broadcast_address())));
}
rlogger.debug(">>> Started Row Level Repair (Follower): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, schema_version={}, range={}, seed={}, max_row_buf_size={}",
utils::fb_utilities::get_broadcast_address(), from, repair_meta_id, ks_name, cf_name, schema_version, range, seed, max_row_buf_size);
return insert_repair_meta(from, src_cpu_id, repair_meta_id, std::move(range), algo, max_row_buf_size, seed, std::move(master_node_shard_config), std::move(schema_version));
return insert_repair_meta(from, src_cpu_id, repair_meta_id, std::move(range), algo, max_row_buf_size, seed, std::move(master_node_shard_config), std::move(schema_version), reason);
}
// RPC API
@@ -2104,15 +2113,16 @@ future<> repair_init_messaging_service_handler(repair_service& rs, distributed<d
});
ms.register_repair_row_level_start([] (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring ks_name,
sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed,
unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version) {
unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional<streaming::stream_reason> reason) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return smp::submit_to(src_cpu_id % smp::count, [from, src_cpu_id, repair_meta_id, ks_name, cf_name,
range, algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, remote_partitioner_name, schema_version] () mutable {
range, algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, remote_partitioner_name, schema_version, reason] () mutable {
streaming::stream_reason r = reason ? *reason : streaming::stream_reason::repair;
return repair_meta::repair_row_level_start_handler(from, src_cpu_id, repair_meta_id, std::move(ks_name),
std::move(cf_name), std::move(range), algo, max_row_buf_size, seed,
shard_config{remote_shard, remote_shard_count, remote_ignore_msb, std::move(remote_partitioner_name)},
schema_version);
schema_version, r);
});
});
ms.register_repair_row_level_stop([] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
@@ -2442,6 +2452,7 @@ public:
_seed,
repair_meta::repair_master::yes,
repair_meta_id,
_ri.reason,
std::move(master_node_shard_config),
_all_live_peer_nodes.size());
@@ -2456,7 +2467,7 @@ public:
nodes_to_stop.reserve(_all_nodes.size());
try {
parallel_for_each(_all_nodes, [&, this] (const gms::inet_address& node) {
return master.repair_row_level_start(node, _ri.keyspace, _cf_name, _range, schema_version).then([&] () {
return master.repair_row_level_start(node, _ri.keyspace, _cf_name, _range, schema_version, _ri.reason).then([&] () {
nodes_to_stop.push_back(node);
return master.repair_get_estimated_partitions(node).then([this, node] (uint64_t partitions) {
rlogger.trace("Get repair_get_estimated_partitions for node={}, estimated_partitions={}", node, partitions);
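
The handler above receives `reason` as an optional trailing RPC argument and falls back to `stream_reason::repair` when the sender did not provide one. A minimal sketch of that fallback, assuming `std::optional` as a stand-in for `rpc::optional` and a simplified, hypothetical copy of the enum (`effective_reason` is an illustrative name, not a function from the patch):

```cpp
#include <cassert>
#include <optional>

// Simplified stand-in for streaming::stream_reason (assumption: only the
// values needed for this illustration are listed).
enum class stream_reason { bootstrap, rebuild, repair };

// Hypothetical helper: a sender that predates the new argument transmits no
// reason, so the receiver treats the request as an ordinary repair.
stream_reason effective_reason(std::optional<stream_reason> received) {
    return received.value_or(stream_reason::repair);
}
```

With this shape, a mixed-version cluster keeps the old behavior for old senders, while new senders can mark the stream as rebuild or bootstrap.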


@@ -319,10 +319,10 @@ schema::schema(const raw_schema& raw, std::optional<raw_view_info> raw_view_info
+ column_offset(column_kind::regular_column),
_raw._columns.end(), column_definition::name_comparator(regular_column_name_type()));
std::sort(_raw._columns.begin(),
std::stable_sort(_raw._columns.begin(),
_raw._columns.begin() + column_offset(column_kind::clustering_key),
[] (auto x, auto y) { return x.id < y.id; });
std::sort(_raw._columns.begin() + column_offset(column_kind::clustering_key),
std::stable_sort(_raw._columns.begin() + column_offset(column_kind::clustering_key),
_raw._columns.begin() + column_offset(column_kind::static_column),
[] (auto x, auto y) { return x.id < y.id; });
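
The switch to `std::stable_sort()` matters only when several key columns compare equal by id. A small sketch of the property being relied on, using a hypothetical `column` struct rather than the real `column_definition`:

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for a column definition; not the real Scylla type.
struct column {
    std::string name;
    int id;
};

// Sorts by id only. std::stable_sort keeps columns with equal ids in their
// original (insertion) order; std::sort gives no such guarantee.
std::vector<std::string> sorted_names(std::vector<column> cols) {
    std::stable_sort(cols.begin(), cols.end(),
                     [] (const column& x, const column& y) { return x.id < y.id; });
    std::vector<std::string> names;
    for (const auto& c : cols) {
        names.push_back(c.name);
    }
    return names;
}
```

Because the comparator ignores the name, only a stable sort guarantees that every node building the same schema ends up with the same column order.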

Submodule seastar updated: 92c488706c...76260705ef


@@ -190,4 +190,11 @@ future<> paxos_state::learn(schema_ptr schema, proposal decision, clock_type::ti
});
}
future<> paxos_state::prune(schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) {
logger.debug("Delete paxos state for ballot {}", ballot);
tracing::trace(tr_state, "Delete paxos state for ballot {}", ballot);
return db::system_keyspace::delete_paxos_decision(*schema, key, ballot, timeout);
}
} // end of namespace "service::paxos"


@@ -124,6 +124,9 @@ public:
clock_type::time_point timeout);
// Replica RPC endpoint for Paxos "learn".
static future<> learn(schema_ptr schema, proposal decision, clock_type::time_point timeout, tracing::trace_state_ptr tr_state);
// Replica RPC endpoint for pruning Paxos table
static future<> prune(schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout,
tracing::trace_state_ptr tr_state);
};
} // end of namespace "service::paxos"


@@ -171,6 +171,7 @@ public:
const schema_ptr& schema() {
return _schema;
}
// called only when all replicas replied
virtual void release_mutation() = 0;
};
@@ -300,9 +301,10 @@ public:
class cas_mutation : public mutation_holder {
lw_shared_ptr<paxos::proposal> _proposal;
shared_ptr<paxos_response_handler> _handler;
public:
explicit cas_mutation(paxos::proposal proposal , schema_ptr s)
: _proposal(make_lw_shared<paxos::proposal>(std::move(proposal))) {
explicit cas_mutation(paxos::proposal proposal, schema_ptr s, shared_ptr<paxos_response_handler> handler)
: _proposal(make_lw_shared<paxos::proposal>(std::move(proposal))), _handler(std::move(handler)) {
_size = _proposal->update.representation().size();
_schema = std::move(s);
}
@@ -327,7 +329,11 @@ public:
return true;
}
virtual void release_mutation() override {
_proposal.release();
// The handler will be set for "learn", but not for PAXOS repair
// since repair may not include all replicas
if (_handler) {
_handler->prune(_proposal->ballot);
}
}
};
@@ -1184,6 +1190,12 @@ future<bool> paxos_response_handler::accept_proposal(const paxos::proposal& prop
return f;
}
// debug output in mutate_internal needs this
std::ostream& operator<<(std::ostream& os, const paxos_response_handler& h) {
os << "paxos_response_handler{" << h.id() << "}";
return os;
}
// This function implements learning stage of Paxos protocol
future<> paxos_response_handler::learn_decision(paxos::proposal decision, bool allow_hints) {
tracing::trace(tr_state, "learn_decision: committing {} with cl={}", decision, _cl_for_learn);
@@ -1219,12 +1231,41 @@ future<> paxos_response_handler::learn_decision(paxos::proposal decision, bool a
}
// Path for the "base" mutations
std::array<std::tuple<paxos::proposal, schema_ptr, dht::token>, 1> m{std::make_tuple(std::move(decision), _schema, _key.token())};
std::array<std::tuple<paxos::proposal, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>, 1> m{std::make_tuple(std::move(decision), _schema, shared_from_this(), _key.token())};
future<> f_lwt = _proxy->mutate_internal(std::move(m), _cl_for_learn, false, tr_state, _permit, _timeout);
return when_all_succeed(std::move(f_cdc), std::move(f_lwt));
}
void paxos_response_handler::prune(utils::UUID ballot) {
if (_has_dead_endpoints) {
return;
}
if ( _proxy->get_stats().cas_now_pruning >= pruning_limit) {
_proxy->get_stats().cas_coordinator_dropped_prune++;
return;
}
_proxy->get_stats().cas_now_pruning++;
_proxy->get_stats().cas_prune++;
// Runs in the background, but the number of background jobs is limited by pruning_limit.
// Completion is awaited by holding a shared pointer to storage_proxy, which guarantees
// that storage_proxy::stop() will wait for this to complete.
(void)parallel_for_each(_live_endpoints, [this, ballot] (gms::inet_address peer) mutable {
return futurize_apply([&] {
if (fbu::is_me(peer)) {
tracing::trace(tr_state, "prune: prune {} locally", ballot);
return paxos::paxos_state::prune(_schema, _key.key(), ballot, _timeout, tr_state);
} else {
tracing::trace(tr_state, "prune: send prune of {} to {}", ballot, peer);
netw::messaging_service& ms = netw::get_local_messaging_service();
return ms.send_paxos_prune(peer, _timeout, _schema->version(), _key.key(), ballot, tracing::make_trace_info(tr_state));
}
});
}).finally([h = shared_from_this()] {
h->_proxy->get_stats().cas_now_pruning--;
});
}
static std::vector<gms::inet_address>
replica_ids_to_endpoints(locator::token_metadata& tm, const std::vector<utils::UUID>& replica_ids) {
std::vector<gms::inet_address> endpoints;
@@ -1571,6 +1612,14 @@ void storage_proxy_stats::stats::register_stats() {
sm::make_histogram("cas_write_contention", sm::description("how many contended writes were encountered"),
{storage_proxy_stats::current_scheduling_group_label()},
[this]{ return cas_write_contention.get_histogram(1, 8);}),
sm::make_total_operations("cas_prune", cas_prune,
sm::description("how many times paxos prune was done after successful cas operation"),
{storage_proxy_stats::current_scheduling_group_label()}),
sm::make_total_operations("cas_dropped_prune", cas_coordinator_dropped_prune,
sm::description("how many times a coordinator did not perform prune after cas"),
{storage_proxy_stats::current_scheduling_group_label()}),
});
_metrics.add_group(REPLICA_STATS_CATEGORY, {
@@ -1606,6 +1655,9 @@ void storage_proxy_stats::stats::register_stats() {
sm::description("number of operations that crossed a shard boundary"),
{storage_proxy_stats::current_scheduling_group_label()}),
sm::make_total_operations("cas_dropped_prune", cas_replica_dropped_prune,
sm::description("how many times a replica did not perform prune after cas"),
{storage_proxy_stats::current_scheduling_group_label()}),
});
}
@@ -1879,11 +1931,11 @@ storage_proxy::create_write_response_handler(const std::unordered_map<gms::inet_
}
storage_proxy::response_id_type
storage_proxy::create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, dht::token>& meta,
storage_proxy::create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>& meta,
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit) {
auto& [commit, s, t] = meta;
auto& [commit, s, h, t] = meta;
return create_write_response_handler_helper(s, t, std::make_unique<cas_mutation>(std::move(commit), s), cl,
return create_write_response_handler_helper(s, t, std::make_unique<cas_mutation>(std::move(commit), s, std::move(h)), cl,
db::write_type::CAS, tr_state, std::move(permit));
}
@@ -1898,7 +1950,7 @@ storage_proxy::create_write_response_handler(const std::tuple<paxos::proposal, s
auto keyspace_name = s->ks_name();
keyspace& ks = _db.local().find_keyspace(keyspace_name);
return create_write_response_handler(ks, cl, db::write_type::CAS, std::make_unique<cas_mutation>(std::move(commit), s), std::move(endpoints),
return create_write_response_handler(ks, cl, db::write_type::CAS, std::make_unique<cas_mutation>(std::move(commit), s, nullptr), std::move(endpoints),
std::vector<gms::inet_address>(), std::vector<gms::inet_address>(), std::move(tr_state), get_stats(), std::move(permit));
}
@@ -2146,6 +2198,8 @@ storage_proxy::get_paxos_participants(const sstring& ks_name, const dht::token &
cl_for_paxos, participants + 1, live_endpoints.size());
}
bool dead = participants != live_endpoints.size();
// Apart from the ballot, paxos_state::prepare() also sends the current value of the requested key.
// If the values received from different replicas match, we skip a separate query stage thus saving
// one network round trip. To generate less traffic, only closest replicas send data, others send
@@ -2153,7 +2207,7 @@ storage_proxy::get_paxos_participants(const sstring& ks_name, const dht::token &
// list of participants by proximity to this instance.
sort_endpoints_by_proximity(live_endpoints);
return paxos_participants{std::move(live_endpoints), required_participants};
return paxos_participants{std::move(live_endpoints), required_participants, dead};
}
@@ -4942,6 +4996,42 @@ void storage_proxy::init_messaging_service() {
return f;
});
ms.register_paxos_prune([this] (const rpc::client_info& cinfo, rpc::opt_time_point timeout,
utils::UUID schema_id, partition_key key, utils::UUID ballot, std::optional<tracing::trace_info> trace_info) {
static thread_local uint16_t pruning = 0;
static constexpr uint16_t pruning_limit = 1000; // the PRUNE verb is one-way, so the replica side has its own queue limit
auto src_addr = netw::messaging_service::get_source(cinfo);
auto src_ip = src_addr.addr;
tracing::trace_state_ptr tr_state;
if (trace_info) {
tr_state = tracing::tracing::get_local_tracing_instance().create_session(*trace_info);
tracing::begin(tr_state);
tracing::trace(tr_state, "paxos_prune: message received from /{} ballot {}", src_ip, ballot);
}
if (pruning >= pruning_limit) {
get_stats().cas_replica_dropped_prune++;
tracing::trace(tr_state, "paxos_prune: do not prune due to overload");
return make_ready_future<seastar::rpc::no_wait_type>(netw::messaging_service::no_wait());
}
pruning++;
return get_schema_for_read(schema_id, src_addr).then([this, key = std::move(key), ballot,
timeout, tr_state = std::move(tr_state), src_ip] (schema_ptr schema) mutable {
dht::token token = dht::get_token(*schema, key);
unsigned shard = dht::shard_of(*schema, token);
bool local = shard == engine().cpu_id();
get_stats().replica_cross_shard_ops += !local;
return smp::submit_to(shard, _write_smp_service_group, [gs = global_schema_ptr(schema), gt = tracing::global_trace_state_ptr(std::move(tr_state)),
local, key = std::move(key), ballot, timeout, src_ip, d = defer([] { pruning--; })] () {
tracing::trace_state_ptr tr_state = gt;
return paxos::paxos_state::prune(gs, key, ballot, *timeout, tr_state).then([src_ip, tr_state] () {
tracing::trace(tr_state, "paxos_prune: handling is done, sending a response to /{}", src_ip);
return netw::messaging_service::no_wait();
});
});
});
});
}
future<> storage_proxy::uninit_messaging_service() {
@@ -4956,7 +5046,8 @@ future<> storage_proxy::uninit_messaging_service() {
ms.unregister_truncate(),
ms.unregister_paxos_prepare(),
ms.unregister_paxos_accept(),
ms.unregister_paxos_learn()
ms.unregister_paxos_learn(),
ms.unregister_paxos_prune()
);
}
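
Both the coordinator (`paxos_response_handler::prune`) and the replica handler above cap the number of in-flight prune operations with a plain counter and drop any request that arrives over the limit. A minimal sketch of that pattern, with a hypothetical `prune_limiter` type that does not exist in the patch:

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical illustration of the "counter plus limit" admission check.
// Not thread-safe; the patch uses per-shard (thread_local) counters, so no
// locking is needed there either.
struct prune_limiter {
    uint16_t in_flight = 0;   // operations currently running
    uint64_t dropped = 0;     // requests rejected because of the limit
    uint16_t limit;

    explicit prune_limiter(uint16_t l) : limit(l) {}

    // Returns true if the operation may start; otherwise records a drop.
    bool try_start() {
        if (in_flight >= limit) {
            ++dropped;
            return false;
        }
        ++in_flight;
        return true;
    }

    // Must be called exactly once for every successful try_start().
    void finish() { --in_flight; }
};
```

Dropping instead of queueing is acceptable here because pruning is best-effort cleanup: a skipped prune leaves extra rows in the Paxos table but does not affect correctness.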


@@ -242,6 +242,7 @@ public:
std::vector<gms::inet_address> endpoints;
// How many participants are required for a quorum (i.e. is it SERIAL or LOCAL_SERIAL).
size_t required_participants;
bool has_dead_endpoints;
};
const gms::feature_service& features() const { return _features; }
@@ -317,7 +318,7 @@ private:
response_id_type create_write_response_handler(const mutation&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
response_id_type create_write_response_handler(const hint_wrapper&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
response_id_type create_write_response_handler(const std::unordered_map<gms::inet_address, std::optional<mutation>>&, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
response_id_type create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, dht::token>& proposal,
response_id_type create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>& proposal,
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
response_id_type create_write_response_handler(const std::tuple<paxos::proposal, schema_ptr, dht::token, std::unordered_set<gms::inet_address>>& meta,
db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit);
@@ -634,6 +635,11 @@ private:
db::consistency_level _cl_for_learn;
// Live endpoints, as per get_paxos_participants()
std::vector<gms::inet_address> _live_endpoints;
// True if there are dead endpoints
// We don't include endpoints known to be unavailable in pending
// endpoints list, but need to be aware of them to avoid pruning
// system.paxos data if some endpoint is missing a Paxos write.
bool _has_dead_endpoints;
// How many endpoints need to respond favourably for the protocol to progress to the next step.
size_t _required_participants;
// A deadline when the entire CAS operation timeout expires, derived from write_request_timeout_in_ms
@@ -651,6 +657,9 @@ private:
// Unique request id for logging purposes.
const uint64_t _id = next_id++;
// max pruning operations to run in parallel
static constexpr uint16_t pruning_limit = 1000;
public:
tracing::trace_state_ptr tr_state;
@@ -674,6 +683,7 @@ public:
storage_proxy::paxos_participants pp = _proxy->get_paxos_participants(_schema->ks_name(), _key.token(), _cl_for_paxos);
_live_endpoints = std::move(pp.endpoints);
_required_participants = pp.required_participants;
_has_dead_endpoints = pp.has_dead_endpoints;
tracing::trace(tr_state, "Create paxos_response_handler for token {} with live: {} and required participants: {}",
_key.token(), _live_endpoints, _required_participants);
}
@@ -691,6 +701,7 @@ public:
future<paxos::prepare_summary> prepare_ballot(utils::UUID ballot);
future<bool> accept_proposal(const paxos::proposal& proposal, bool timeout_if_partially_accepted = true);
future<> learn_decision(paxos::proposal decision, bool allow_hints = false);
void prune(utils::UUID ballot);
uint64_t id() const {
return _id;
}


@@ -116,6 +116,11 @@ struct write_stats {
uint64_t cas_write_condition_not_met = 0;
uint64_t cas_write_timeout_due_to_uncertainty = 0;
uint64_t cas_failed_read_round_optimization = 0;
uint16_t cas_now_pruning = 0;
uint64_t cas_prune = 0;
uint64_t cas_coordinator_dropped_prune = 0;
uint64_t cas_replica_dropped_prune = 0;
std::chrono::microseconds last_mv_flow_control_delay; // delay added for MV flow control in the last request
public:


@@ -3409,10 +3409,13 @@ void feature_enabled_listener::on_enabled() {
future<> read_sstables_format(distributed<storage_service>& ss) {
return db::system_keyspace::get_scylla_local_param(SSTABLE_FORMAT_PARAM_NAME).then([&ss] (std::optional<sstring> format_opt) {
sstables::sstable_version_types format = sstables::from_string(format_opt.value_or("ka"));
return ss.invoke_on_all([format] (storage_service& s) {
s._sstables_format = format;
});
if (format_opt) {
sstables::sstable_version_types format = sstables::from_string(*format_opt);
return ss.invoke_on_all([format] (storage_service& s) {
s._sstables_format = format;
});
}
return make_ready_future<>();
});
}
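
The new `read_sstables_format()` overwrites the in-memory format only when a value was actually persisted, instead of defaulting a missing value to "ka". A minimal sketch of that behavior, with hypothetical names (`format_holder`, `apply_persisted_format`) and strings standing in for `sstable_version_types`:

```cpp
#include <cassert>
#include <optional>
#include <string>

// Hypothetical stand-in for the storage_service member; "la" mirrors the new
// compiled-in default shown in the header change.
struct format_holder {
    std::string format = "la";
};

// Overwrite the default only if a format was persisted earlier.
void apply_persisted_format(format_holder& h, const std::optional<std::string>& persisted) {
    if (persisted) {
        h.format = *persisted;
    }
}
```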


@@ -312,7 +312,13 @@ private:
*/
std::optional<db_clock::time_point> _cdc_streams_ts;
sstables::sstable_version_types _sstables_format = sstables::sstable_version_types::ka;
// _sstables_format is the format used for writing new sstables.
// Here we set its default value, but if we discover that all the nodes
// in the cluster support a newer format, _sstables_format will be set to
// that format. read_sstables_format() also overwrites _sstables_format
// if an sstable format was chosen earlier (and this choice was persisted
// in the system table).
sstables::sstable_version_types _sstables_format = sstables::sstable_version_types::la;
seastar::named_semaphore _feature_listeners_sem = {1, named_semaphore_exception_factory{"feature listeners"}};
feature_enabled_listener _la_feature_listener;
feature_enabled_listener _mc_feature_listener;


@@ -72,47 +72,8 @@ private:
static std::vector<column_info> build(
const schema& s,
const utils::chunked_vector<serialization_header::column_desc>& src,
bool is_static) {
std::vector<column_info> cols;
if (s.is_dense()) {
const column_definition& col = is_static ? *s.static_begin() : *s.regular_begin();
cols.push_back(column_info{
&col.name(),
col.type,
col.id,
col.type->value_length_if_fixed(),
col.is_multi_cell(),
col.is_counter(),
false
});
} else {
cols.reserve(src.size());
for (auto&& desc : src) {
const bytes& type_name = desc.type_name.value;
data_type type = db::marshal::type_parser::parse(to_sstring_view(type_name));
const column_definition* def = s.get_column_definition(desc.name.value);
std::optional<column_id> id;
bool schema_mismatch = false;
if (def) {
id = def->id;
schema_mismatch = def->is_multi_cell() != type->is_multi_cell() ||
def->is_counter() != type->is_counter() ||
!def->type->is_value_compatible_with(*type);
}
cols.push_back(column_info{
&desc.name.value,
type,
id,
type->value_length_if_fixed(),
type->is_multi_cell(),
type->is_counter(),
schema_mismatch
});
}
boost::range::stable_partition(cols, [](const column_info& column) { return !column.is_collection; });
}
return cols;
}
const sstable_enabled_features& features,
bool is_static);
utils::UUID schema_uuid;
std::vector<column_info> regular_schema_columns_from_sstable;
@@ -125,10 +86,10 @@ private:
state(state&&) = default;
state& operator=(state&&) = default;
state(const schema& s, const serialization_header& header)
state(const schema& s, const serialization_header& header, const sstable_enabled_features& features)
: schema_uuid(s.version())
, regular_schema_columns_from_sstable(build(s, header.regular_columns.elements, false))
, static_schema_columns_from_sstable(build(s, header.static_columns.elements, true))
, regular_schema_columns_from_sstable(build(s, header.regular_columns.elements, features, false))
, static_schema_columns_from_sstable(build(s, header.static_columns.elements, features, true))
, clustering_column_value_fix_lengths (get_clustering_values_fixed_lengths(header))
{}
};
@@ -136,9 +97,10 @@ private:
lw_shared_ptr<const state> _state = make_lw_shared<const state>();
public:
column_translation get_for_schema(const schema& s, const serialization_header& header) {
column_translation get_for_schema(
const schema& s, const serialization_header& header, const sstable_enabled_features& features) {
if (s.version() != _state->schema_uuid) {
_state = make_lw_shared(state(s, header));
_state = make_lw_shared(state(s, header, features));
}
return *this;
}


@@ -38,6 +38,8 @@
*/
#include "mp_row_consumer.hh"
#include "column_translation.hh"
#include "concrete_types.hh"
namespace sstables {
@@ -79,4 +81,86 @@ atomic_cell make_counter_cell(api::timestamp_type timestamp, bytes_view value) {
return ccb.build(timestamp);
}
// See #6130.
static data_type freeze_types_in_collections(data_type t) {
return ::visit(*t, make_visitor(
[] (const map_type_impl& typ) -> data_type {
return map_type_impl::get_instance(
freeze_types_in_collections(typ.get_keys_type()->freeze()),
freeze_types_in_collections(typ.get_values_type()->freeze()),
typ.is_multi_cell());
},
[] (const set_type_impl& typ) -> data_type {
return set_type_impl::get_instance(
freeze_types_in_collections(typ.get_elements_type()->freeze()),
typ.is_multi_cell());
},
[] (const list_type_impl& typ) -> data_type {
return list_type_impl::get_instance(
freeze_types_in_collections(typ.get_elements_type()->freeze()),
typ.is_multi_cell());
},
[&] (const abstract_type& typ) -> data_type {
return std::move(t);
}
));
}
/* If this function returns false, the caller cannot assume that the SSTable comes from Scylla.
* It might, if for some reason a table was created using Scylla that didn't contain any feature bit,
* but that should never happen. */
static bool is_certainly_scylla_sstable(const sstable_enabled_features& features) {
return features.enabled_features;
}
std::vector<column_translation::column_info> column_translation::state::build(
const schema& s,
const utils::chunked_vector<serialization_header::column_desc>& src,
const sstable_enabled_features& features,
bool is_static) {
std::vector<column_info> cols;
if (s.is_dense()) {
const column_definition& col = is_static ? *s.static_begin() : *s.regular_begin();
cols.push_back(column_info{
&col.name(),
col.type,
col.id,
col.type->value_length_if_fixed(),
col.is_multi_cell(),
col.is_counter(),
false
});
} else {
cols.reserve(src.size());
for (auto&& desc : src) {
const bytes& type_name = desc.type_name.value;
data_type type = db::marshal::type_parser::parse(to_sstring_view(type_name));
if (!features.is_enabled(CorrectUDTsInCollections) && is_certainly_scylla_sstable(features)) {
// See #6130.
type = freeze_types_in_collections(std::move(type));
}
const column_definition* def = s.get_column_definition(desc.name.value);
std::optional<column_id> id;
bool schema_mismatch = false;
if (def) {
id = def->id;
schema_mismatch = def->is_multi_cell() != type->is_multi_cell() ||
def->is_counter() != type->is_counter() ||
!def->type->is_value_compatible_with(*type);
}
cols.push_back(column_info{
&desc.name.value,
type,
id,
type->value_length_if_fixed(),
type->is_multi_cell(),
type->is_counter(),
schema_mismatch
});
}
boost::range::stable_partition(cols, [](const column_info& column) { return !column.is_collection; });
}
return cols;
}
}
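
The workaround in `build()` is applied only to sstables that carry at least one Scylla feature bit but lack `CorrectUDTsInCollections`. A minimal sketch of that condition, assuming the feature set is a bitmask indexed by the enum value; `enabled_features` and `needs_udt_freeze_workaround` are hypothetical names, and only the enum values visible in this diff are listed:

```cpp
#include <cassert>
#include <cstdint>

// Subset of sstable_feature as shown in the diff.
enum sstable_feature : uint8_t {
    ShadowableTombstones = 2,
    CorrectStaticCompact = 3,
    CorrectEmptyCounters = 4,
    CorrectUDTsInCollections = 5,
    End = 6,
};

// Hypothetical stand-in for sstable_enabled_features (assumed to be a bitmask).
struct enabled_features {
    uint64_t bits = 0;
    bool is_enabled(sstable_feature f) const { return (bits & (uint64_t(1) << f)) != 0; }
    void enable(sstable_feature f) { bits |= uint64_t(1) << f; }
};

// True when the sstable certainly comes from Scylla (some bit is set) and was
// written before the UDT-in-collections fix.
bool needs_udt_freeze_workaround(const enabled_features& f) {
    return f.bits != 0 && !f.is_enabled(CorrectUDTsInCollections);
}
```

An sstable with no feature bits at all is left alone, since it may not have been written by Scylla.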


@@ -1348,7 +1348,7 @@ public:
, _consumer(consumer)
, _sst(sst)
, _header(sst->get_serialization_header())
, _column_translation(sst->get_column_translation(s, _header))
, _column_translation(sst->get_column_translation(s, _header, sst->features()))
, _has_shadowable_tombstones(sst->has_shadowable_tombstones())
{
setup_columns(_regular_row, _column_translation.regular_columns());

@@ -792,8 +792,9 @@ public:
const serialization_header& get_serialization_header() const {
return get_mutable_serialization_header(*_components);
}
column_translation get_column_translation(const schema& s, const serialization_header& h) {
return _column_translation.get_for_schema(s, h);
column_translation get_column_translation(
const schema& s, const serialization_header& h, const sstable_enabled_features& f) {
return _column_translation.get_for_schema(s, h, f);
}
const std::vector<unsigned>& get_shards_for_this_sstable() const {
return _shards;

@@ -459,7 +459,8 @@ enum sstable_feature : uint8_t {
ShadowableTombstones = 2, // See #3885
CorrectStaticCompact = 3, // See #4139
CorrectEmptyCounters = 4, // See #4363
End = 5,
CorrectUDTsInCollections = 5, // See #6130
End = 6,
};
// Scylla-specific features enabled for a particular sstable.
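
Note on the hunk above: the new `CorrectUDTsInCollections` bit drives the check in `column_translation::state::build()`, where types are frozen only when the sstable certainly comes from Scylla and lacks that bit. The sketch below restates that decision in Python. It assumes the enabled features are stored as an integer bitmask in which bit N corresponds to enum value N; that layout and the helper names are assumptions, not taken from the diff.

```python
from enum import IntEnum


class SSTableFeature(IntEnum):
    # Values copied from the enum in the diff; earlier entries are omitted.
    ShadowableTombstones = 2
    CorrectStaticCompact = 3
    CorrectEmptyCounters = 4
    CorrectUDTsInCollections = 5


def is_enabled(mask: int, feature: SSTableFeature) -> bool:
    # Assumption: bit N of the mask corresponds to feature value N.
    return bool(mask & (1 << feature))


def is_certainly_scylla_sstable(mask: int) -> bool:
    # Any set feature bit means the sstable was written by Scylla.
    return mask != 0


def must_freeze_types_in_collections(mask: int) -> bool:
    """Mirror of the condition guarding freeze_types_in_collections()."""
    return (not is_enabled(mask, SSTableFeature.CorrectUDTsInCollections)
            and is_certainly_scylla_sstable(mask))
```

An sstable with no feature bits at all is left untouched, since it cannot be attributed to Scylla with certainty.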

test.py

@@ -203,6 +203,17 @@ class CqlTestSuite(TestSuite):
def pattern(self):
return "*_test.cql"
class RunTestSuite(TestSuite):
"""TestSuite for test directory with a 'run' script """
def add_test(self, shortname, mode, options):
test = RunTest(self.next_id, shortname, self, mode, options)
self.tests.append(test)
@property
def pattern(self):
return "run"
class Test:
"""Base class for CQL, Unit and Boost tests"""
@@ -332,6 +343,24 @@ class CqlTest(Test):
if self.is_equal_result is False:
print_unidiff(self.result, self.reject)
class RunTest(Test):
"""Run tests in a directory started by a run script"""
def __init__(self, test_no, shortname, suite, mode, options):
super().__init__(test_no, shortname, suite, mode, options)
self.path = os.path.join(suite.path, shortname)
self.xmlout = os.path.join(options.tmpdir, self.mode, "xml", self.uname + ".xunit.xml")
self.args = ["--junit-xml={}".format(self.xmlout)]
def print_summary(self):
print("Output of {} {}:".format(self.path, " ".join(self.args)))
print(read_log(self.log_filename))
async def run(self, options):
# This test can and should be killed gently, with SIGTERM, not with SIGKILL
self.success = await run_test(self, options, gentle_kill=True)
logging.info("Test #%d %s", self.id, "succeeded" if self.success else "failed ")
return self
class TabularConsoleOutput:
"""Print test progress to the console"""
@@ -375,7 +404,7 @@ class TabularConsoleOutput:
print(msg)
async def run_test(test, options):
async def run_test(test, options, gentle_kill=False):
"""Run test program, return True if success else False"""
with open(test.log_filename, "wb") as log:
@@ -423,7 +452,10 @@ async def run_test(test, options):
return True
except (asyncio.TimeoutError, asyncio.CancelledError) as e:
if process is not None:
process.kill()
if gentle_kill:
process.terminate()
else:
process.kill()
stdout, _ = await process.communicate()
if isinstance(e, asyncio.TimeoutError):
report_error("Test timed out")
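
Note on the hunk above: `gentle_kill=True` swaps `process.kill()` (SIGKILL) for `process.terminate()` (SIGTERM), which gives a wrapper script the chance to run its cleanup handler. The following is a minimal, self-contained sketch of that pattern on a POSIX system. `run_with_timeout` and `sleeper` are hypothetical names and this is not the `run_test()` function from test.py.

```python
import asyncio
import signal
import sys


async def run_with_timeout(argv, timeout, gentle_kill=False):
    """Run argv; on timeout stop it with SIGTERM (gentle) or SIGKILL."""
    process = await asyncio.create_subprocess_exec(*argv)
    try:
        await asyncio.wait_for(process.wait(), timeout)
    except asyncio.TimeoutError:
        if gentle_kill:
            process.terminate()  # SIGTERM: the child may clean up first
        else:
            process.kill()       # SIGKILL: cannot be caught by the child
        await process.wait()
    return process.returncode


# A child process that outlives any short timeout.
sleeper = [sys.executable, "-c", "import time; time.sleep(30)"]
```

On POSIX, calling `asyncio.run(run_with_timeout(sleeper, 0.5, gentle_kill=True))` returns the negated signal number, so the caller can tell which signal ended the child.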

@@ -54,6 +54,8 @@ def pytest_addoption(parser):
parser.addoption("--https", action="store_true",
help="communicate via HTTPS protocol on port 8043 instead of HTTP when"
" running against a local Scylla installation")
parser.addoption("--url", action="store",
help="communicate with given URL instead of defaults")
# "dynamodb" fixture: set up client object for communicating with the DynamoDB
# API. Currently this chooses either Amazon's DynamoDB in the default region
@@ -70,7 +72,10 @@ def dynamodb(request):
# requires us to specify dummy region and credential parameters,
# otherwise the user is forced to properly configure ~/.aws even
# for local runs.
local_url = 'https://localhost:8043' if request.config.getoption('https') else 'http://localhost:8000'
if request.config.getoption('url') is not None:
local_url = request.config.getoption('url')
else:
local_url = 'https://localhost:8043' if request.config.getoption('https') else 'http://localhost:8000'
# Disable verifying in order to be able to use self-signed TLS certificates
verify = not request.config.getoption('https')
return boto3.resource('dynamodb', endpoint_url=local_url, verify=verify,
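
Note on the hunk above: the endpoint is now chosen in order of precedence, with an explicit `--url` first and the HTTPS or HTTP localhost default after it. A small sketch of that selection as a pure function follows; `pick_endpoint` is a hypothetical helper, and the default URLs are the ones in the diff.

```python
from typing import Optional


def pick_endpoint(url: Optional[str], https: bool) -> str:
    """Return the endpoint URL: explicit --url wins, else a localhost default."""
    if url is not None:
        return url
    return "https://localhost:8043" if https else "http://localhost:8000"


print(pick_endpoint(None, False))  # prints http://localhost:8000
```

Keeping the choice in one function makes the precedence easy to test without starting a server.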

@@ -4,24 +4,31 @@
set -e
script_path=$(dirname $(readlink -e $0))
source_path=$script_path/../..
# By default, we take the latest build/*/scylla as the executable:
SCYLLA=${SCYLLA-$(ls -t "$script_path/../build/"*"/scylla" | head -1)}
SCYLLA=${SCYLLA-$(ls -t "$source_path/build/"*"/scylla" | head -1)}
SCYLLA=$(readlink -f "$SCYLLA")
SCYLLA_IP=${IP-127.0.0.1}
CPUSET=${CPUSET-0}
CQLSH=${CQLSH-cqlsh}
# We need to use cqlsh to set up the authentication credentials expected by
# some of the tests that check authentication. If cqlsh is not installed
# there isn't much point in even starting Scylla
if ! type "$CQLSH" >/dev/null 2>&1
# Below, we need to use python3 and the Cassandra driver to set up the
# authentication credentials expected by some of the tests that check
# authentication. If they are not installed, there isn't much point in
# even starting Scylla.
if ! python3 -c 'from cassandra.cluster import Cluster' >/dev/null 2>&1
then
echo "Error: cannot find '$CQLSH', needed for configuring Alternator authentication." >&2
echo "Please install $CQLSH in your path, or set CQLSH to its location." >&2
echo "Error: python3 and python3-cassandra-driver must be installed to configure Alternator authentication." >&2
exit 1
fi
# Pick a loopback IP address for Scylla to run on, in an attempt not to
# collide with other concurrent runs of Scylla. CCM uses 127.0.0.<nodenum>,
# so we use 127.1.*.*, which cannot collide with it. Moreover, we take the
# last two bytes of the address from the current process ID, so that multiple
# concurrent runs of this code use different addresses.
SCYLLA_IP=127.1.$(($$ >> 8 & 255)).$(($$ & 255))
echo "Running Scylla on $SCYLLA_IP"
tmp_dir=/tmp/alternator-test-$$
mkdir $tmp_dir
@@ -52,6 +59,7 @@ trap 'cleanup' EXIT
# to work. We only need to do this if the "--https" option was explicitly
# passed - otherwise the test would not use HTTPS anyway.
alternator_port_option="--alternator-port=8000"
alternator_url="http://$SCYLLA_IP:8000"
for i
do
if [ "$i" = --https ]
@@ -59,17 +67,20 @@ do
openssl genrsa 2048 > "$tmp_dir/scylla.key"
openssl req -new -x509 -nodes -sha256 -days 365 -subj "/C=IL/ST=None/L=None/O=None/OU=None/CN=example.com" -key "$tmp_dir/scylla.key" -out "$tmp_dir/scylla.crt"
alternator_port_option="--alternator-https-port=8043"
alternator_url="https://$SCYLLA_IP:8043"
fi
done
"$SCYLLA" --options-file "$script_path/../conf/scylla.yaml" \
--alternator-address $SCYLLA_IP \
"$SCYLLA" --options-file "$source_path/conf/scylla.yaml" \
--alternator-address $SCYLLA_IP \
$alternator_port_option \
--alternator-enforce-authorization=1 \
--experimental=on --developer-mode=1 \
--developer-mode=1 \
--ring-delay-ms 0 --collectd 0 \
--cpuset "$CPUSET" -m 1G \
--api-address $SCYLLA_IP --rpc-address $SCYLLA_IP \
--api-address $SCYLLA_IP \
--rpc-address $SCYLLA_IP \
--listen-address $SCYLLA_IP \
--prometheus-address $SCYLLA_IP \
--seed-provider-parameters seeds=$SCYLLA_IP \
--workdir "$tmp_dir" \
--server-encryption-options keyfile="$tmp_dir/scylla.key" \
@@ -79,8 +90,11 @@ done
SCYLLA_PROCESS=$!
# Set up the proper authentication credentials needed by the Alternator
# test. This requires connecting to Scylla with cqlsh - we'll wait up to
# test. This requires connecting to Scylla with CQL - we'll wait up to
# one minute for this to work:
setup_authentication() {
python3 -c 'from cassandra.cluster import Cluster; Cluster(["'$SCYLLA_IP'"]).connect().execute("INSERT INTO system_auth.roles (role, salted_hash) VALUES ('\''alternator'\'', '\''secret_pass'\'')")'
}
echo "Scylla is: $SCYLLA."
echo -n "Booting Scylla..."
ok=
@@ -94,18 +108,18 @@ do
summary="Error: Scylla failed to boot after $SECONDS seconds."
break
fi
err=`"$CQLSH" -e "INSERT INTO system_auth.roles (role, salted_hash) VALUES ('alternator', 'secret_pass')" 2>&1` && ok=yes && break
err=`setup_authentication 2>&1` && ok=yes && break
case "$err" in
"Connection error:"*)
*NoHostAvailable:*)
# This is what we expect while Scylla is still booting.
;;
*"command not found")
summary="Error: need 'cqlsh' in your path, to configure Alternator authentication."
*ImportError:*|*"command not found"*)
summary="Error: need python3 and python3-cassandra-driver to configure Alternator authentication."
echo
echo $summary
break;;
*)
summary="Unknown cqlsh error, can't set authentication credentials: '$err'"
summary="Unknown error trying to set authentication credentials: '$err'"
echo
echo $summary
break;;
@@ -125,7 +139,8 @@ else
fi
cd "$script_path"
pytest "$@"
set +e
pytest --url $alternator_url "$@"
code=$?
case $code in
0) summary="Alternator tests pass";;
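
Note on the run script above: `SCYLLA_IP` is built from the low 16 bits of the shell's process ID, with the high byte as the third octet and the low byte as the fourth. The sketch below reproduces that arithmetic in Python; `loopback_ip_for_pid` is a hypothetical name used only for illustration.

```python
def loopback_ip_for_pid(pid: int) -> str:
    """Map a process ID to 127.1.<high byte>.<low byte> of its low 16 bits."""
    return "127.1.{}.{}".format((pid >> 8) & 255, pid & 255)


print(loopback_ip_for_pid(0x1234))  # prints 127.1.18.52
```

Two processes whose IDs differ by a multiple of 65536 map to the same address, so the scheme reduces collisions between concurrent runs rather than ruling them out.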

@@ -0,0 +1 @@
type: Run

@@ -100,6 +100,14 @@ def test_query_basic_restrictions(dynamodb, filled_test_table):
print(got_items)
assert multiset([item for item in items if item['p'] == 'long' and item['c'].startswith('11')]) == multiset(got_items)
def test_query_nonexistent_table(dynamodb):
client = dynamodb.meta.client
with pytest.raises(ClientError, match="ResourceNotFoundException"):
client.query(TableName="i_do_not_exist", KeyConditions={
'p' : {'AttributeValueList': ['long'], 'ComparisonOperator': 'EQ'},
'c' : {'AttributeValueList': ['11'], 'ComparisonOperator': 'BEGINS_WITH'}
})
def test_begins_with(dynamodb, test_table):
paginator = dynamodb.meta.client.get_paginator('query')
items = [{'p': 'unorthodox_chars', 'c': sort_key, 'str': 'a'} for sort_key in [u'ÿÿÿ', u'cÿbÿ', u'cÿbÿÿabg'] ]

@@ -42,6 +42,11 @@ def test_scan_basic(filled_test_table):
assert len(items) == len(got_items)
assert multiset(items) == multiset(got_items)
def test_scan_nonexistent_table(dynamodb):
client = dynamodb.meta.client
with pytest.raises(ClientError, match="ResourceNotFoundException"):
client.scan(TableName="i_do_not_exist")
def test_scan_with_paginator(dynamodb, filled_test_table):
test_table, items = filled_test_table
paginator = dynamodb.meta.client.get_paginator('scan')

@@ -244,11 +244,12 @@ def test_table_streams_off(dynamodb):
table.delete();
# DynamoDB doesn't allow StreamSpecification to be empty map - if it
# exists, it must have a StreamEnabled
with pytest.raises(ClientError, match='ValidationException'):
table = create_test_table(dynamodb, StreamSpecification={},
KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }],
AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' }]);
table.delete();
# Unfortunately, new versions of boto3 don't let us pass this...
#with pytest.raises(ClientError, match='ValidationException'):
# table = create_test_table(dynamodb, StreamSpecification={},
# KeySchema=[{ 'AttributeName': 'p', 'KeyType': 'HASH' }],
# AttributeDefinitions=[{ 'AttributeName': 'p', 'AttributeType': 'S' }]);
# table.delete();
# Unfortunately, boto3 doesn't allow us to pass StreamSpecification=None.
# This is what we had in issue #5796.

@@ -296,7 +296,9 @@ SEASTAR_TEST_CASE(test_commitlog_closed) {
SEASTAR_TEST_CASE(test_commitlog_delete_when_over_disk_limit) {
commitlog::config cfg;
cfg.commitlog_segment_size_in_mb = 2;
constexpr auto max_size_mb = 2;
cfg.commitlog_segment_size_in_mb = max_size_mb;
cfg.commitlog_total_space_in_mb = 1;
cfg.commitlog_sync_period_in_ms = 1;
return cl_test(cfg, [](commitlog& log) {
@@ -306,8 +308,15 @@ SEASTAR_TEST_CASE(test_commitlog_delete_when_over_disk_limit) {
// add a flush handler that simply says we're done with the range.
auto r = log.add_flush_handler([&log, sem, segments](cf_id_type id, replay_position pos) {
*segments = log.get_active_segment_names();
log.discard_completed_segments(id);
sem->signal();
// Verify #5899 - file size should not exceed the config max.
return parallel_for_each(*segments, [](sstring filename) {
return file_size(filename).then([](uint64_t size) {
BOOST_REQUIRE_LE(size, max_size_mb * 1024 * 1024);
});
}).then([&log, sem, id] {
log.discard_completed_segments(id);
sem->signal();
});
});
auto set = make_lw_shared<std::set<segment_id_type>>();

@@ -930,17 +930,17 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_cdc) {
cfg.read_from_yaml("experimental_features:\n - cdc\n", throw_on_error);
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::CDC});
BOOST_CHECK(cfg.check_experimental(ef::CDC));
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
return make_ready_future();
}
SEASTAR_TEST_CASE(test_parse_experimental_features_lwt) {
SEASTAR_TEST_CASE(test_parse_experimental_features_unused) {
config cfg;
cfg.read_from_yaml("experimental_features:\n - lwt\n", throw_on_error);
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::LWT});
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UNUSED});
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
BOOST_CHECK(cfg.check_experimental(ef::LWT));
BOOST_CHECK(cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
return make_ready_future();
}
@@ -950,7 +950,7 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_udf) {
cfg.read_from_yaml("experimental_features:\n - udf\n", throw_on_error);
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UDF});
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(cfg.check_experimental(ef::UDF));
return make_ready_future();
}
@@ -958,9 +958,9 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_udf) {
SEASTAR_TEST_CASE(test_parse_experimental_features_multiple) {
config cfg;
cfg.read_from_yaml("experimental_features:\n - cdc\n - lwt\n - cdc\n", throw_on_error);
BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::CDC, ef::LWT, ef::CDC}));
BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::CDC, ef::UNUSED, ef::CDC}));
BOOST_CHECK(cfg.check_experimental(ef::CDC));
BOOST_CHECK(cfg.check_experimental(ef::LWT));
BOOST_CHECK(cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
return make_ready_future();
}
@@ -973,7 +973,7 @@ SEASTAR_TEST_CASE(test_parse_experimental_features_invalid) {
BOOST_REQUIRE_EQUAL(opt, "experimental_features");
BOOST_REQUIRE_NE(msg.find("line 2, column 7"), msg.npos);
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
});
return make_ready_future();
@@ -983,7 +983,7 @@ SEASTAR_TEST_CASE(test_parse_experimental_true) {
config cfg;
cfg.read_from_yaml("experimental: true", throw_on_error);
BOOST_CHECK(cfg.check_experimental(ef::CDC));
BOOST_CHECK(cfg.check_experimental(ef::LWT));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(cfg.check_experimental(ef::UDF));
return make_ready_future();
}
@@ -992,7 +992,7 @@ SEASTAR_TEST_CASE(test_parse_experimental_false) {
config cfg;
cfg.read_from_yaml("experimental: false", throw_on_error);
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
BOOST_CHECK(!cfg.check_experimental(ef::UNUSED));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
return make_ready_future();
}

@@ -531,6 +531,43 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
{int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
}});
});
{
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()});
auto res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0();
auto paging_state = extract_paging_state(res);
assert_that(res).is_rows().with_rows({{
{int32_type->decompose(3)}, {int32_type->decompose(2)}, {int32_type->decompose(1)},
}});
// Override the actual paging state with one with empty keys,
// which is a valid paging state as well, and should return
// no rows.
paging_state = make_lw_shared<service::pager::paging_state>(partition_key::make_empty(),
std::nullopt, paging_state->get_remaining(), paging_state->get_query_uuid(),
paging_state->get_last_replicas(), paging_state->get_query_read_repair_decision(),
paging_state->get_rows_fetched_for_last_partition());
qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0();
assert_that(res).is_rows().with_size(0);
}
{
// An artificial paging state with an empty key pair is also valid and is expected
// not to return rows (since no row matches an empty partition key)
auto paging_state = make_lw_shared<service::pager::paging_state>(partition_key::make_empty(), std::nullopt,
1, utils::make_random_uuid(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 1);
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});
auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0();
assert_that(res).is_rows().with_size(0);
}
});
}

@@ -5256,3 +5256,131 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_log_too_many_rows) {
test_sstable_log_too_many_rows_f(random, (random + 1), false);
test_sstable_log_too_many_rows_f((random + 1), random, true);
}
// The following test runs on test/resource/sstables/3.x/uncompressed/legacy_udt_in_collection
// It was created using Scylla 3.0.x using the following CQL statements:
//
// CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
// CREATE TYPE ks.ut (a int, b int);
// CREATE TABLE ks.t ( pk int PRIMARY KEY,
// m map<int, frozen<ut>>,
// fm frozen<map<int, frozen<ut>>>,
// mm map<int, frozen<map<int, frozen<ut>>>>,
// fmm frozen<map<int, frozen<map<int, frozen<ut>>>>>,
// s set<frozen<ut>>,
// fs frozen<set<frozen<ut>>>,
// l list<frozen<ut>>,
// fl frozen<list<frozen<ut>>>
// ) WITH compression = {};
// UPDATE ks.t USING TIMESTAMP 1525385507816568 SET
// m[0] = {a: 0, b: 0},
// fm = {0: {a: 0, b: 0}},
// mm[0] = {0: {a: 0, b: 0}},
// fmm = {0: {0: {a: 0, b: 0}}},
// s = s + {{a: 0, b: 0}},
// fs = {{a: 0, b: 0}},
// l[scylla_timeuuid_list_index(7fb27e80-7b12-11ea-9fad-f4d108a9e4a3)] = {a: 0, b: 0},
// fl = [{a: 0, b: 0}]
// WHERE pk = 0;
//
// It checks whether an SSTable containing UDTs nested in collections, whose serialization header is incorrect
// (it doesn't wrap nested UDTs in the FrozenType<...> tag), can be loaded by new versions of Scylla.
static const sstring LEGACY_UDT_IN_COLLECTION_PATH =
"test/resource/sstables/3.x/uncompressed/legacy_udt_in_collection";
SEASTAR_THREAD_TEST_CASE(test_legacy_udt_in_collection_table) {
auto abj = defer([] { await_background_jobs().get(); });
auto ut = user_type_impl::get_instance("ks", to_bytes("ut"),
{to_bytes("a"), to_bytes("b")},
{int32_type, int32_type}, false);
auto m_type = map_type_impl::get_instance(int32_type, ut, true);
auto fm_type = map_type_impl::get_instance(int32_type, ut, false);
auto mm_type = map_type_impl::get_instance(int32_type, fm_type, true);
auto fmm_type = map_type_impl::get_instance(int32_type, fm_type, false);
auto s_type = set_type_impl::get_instance(ut, true);
auto fs_type = set_type_impl::get_instance(ut, false);
auto l_type = list_type_impl::get_instance(ut, true);
auto fl_type = list_type_impl::get_instance(ut, false);
auto s = schema_builder("ks", "t")
.with_column("pk", int32_type, column_kind::partition_key)
.with_column("m", m_type)
.with_column("fm", fm_type)
.with_column("mm", mm_type)
.with_column("fmm", fmm_type)
.with_column("s", s_type)
.with_column("fs", fs_type)
.with_column("l", l_type)
.with_column("fl", fl_type)
.set_compressor_params(compression_parameters::no_compression())
.build();
auto m_cdef = s->get_column_definition(to_bytes("m"));
auto fm_cdef = s->get_column_definition(to_bytes("fm"));
auto mm_cdef = s->get_column_definition(to_bytes("mm"));
auto fmm_cdef = s->get_column_definition(to_bytes("fmm"));
auto s_cdef = s->get_column_definition(to_bytes("s"));
auto fs_cdef = s->get_column_definition(to_bytes("fs"));
auto l_cdef = s->get_column_definition(to_bytes("l"));
auto fl_cdef = s->get_column_definition(to_bytes("fl"));
BOOST_REQUIRE(m_cdef && fm_cdef && mm_cdef && fmm_cdef && s_cdef && fs_cdef && l_cdef && fl_cdef);
auto ut_val = make_user_value(ut, {int32_t(0), int32_t(0)});
auto fm_val = make_map_value(fm_type, {{int32_t(0), ut_val}});
auto fmm_val = make_map_value(fmm_type, {{int32_t(0), fm_val}});
auto fs_val = make_set_value(fs_type, {ut_val});
auto fl_val = make_list_value(fl_type, {ut_val});
mutation mut{s, partition_key::from_deeply_exploded(*s, {0})};
auto ckey = clustering_key::make_empty();
// m[0] = {a: 0, b: 0}
{
collection_mutation_description desc;
desc.cells.emplace_back(int32_type->decompose(0),
atomic_cell::make_live(*ut, write_timestamp, ut->decompose(ut_val), atomic_cell::collection_member::yes));
mut.set_clustered_cell(ckey, *m_cdef, desc.serialize(*m_type));
}
// fm = {0: {a: 0, b: 0}}
mut.set_clustered_cell(ckey, *fm_cdef, atomic_cell::make_live(*fm_type, write_timestamp, fm_type->decompose(fm_val)));
// mm[0] = {0: {a: 0, b: 0}},
{
collection_mutation_description desc;
desc.cells.emplace_back(int32_type->decompose(0),
atomic_cell::make_live(*fm_type, write_timestamp, fm_type->decompose(fm_val), atomic_cell::collection_member::yes));
mut.set_clustered_cell(ckey, *mm_cdef, desc.serialize(*mm_type));
}
// fmm = {0: {0: {a: 0, b: 0}}},
mut.set_clustered_cell(ckey, *fmm_cdef, atomic_cell::make_live(*fmm_type, write_timestamp, fmm_type->decompose(fmm_val)));
// s = s + {{a: 0, b: 0}},
{
collection_mutation_description desc;
desc.cells.emplace_back(ut->decompose(ut_val),
atomic_cell::make_live(*bytes_type, write_timestamp, bytes{}, atomic_cell::collection_member::yes));
mut.set_clustered_cell(ckey, *s_cdef, desc.serialize(*s_type));
}
// fs = {{a: 0, b: 0}},
mut.set_clustered_cell(ckey, *fs_cdef, atomic_cell::make_live(*fs_type, write_timestamp, fs_type->decompose(fs_val)));
// l[scylla_timeuuid_list_index(7fb27e80-7b12-11ea-9fad-f4d108a9e4a3)] = {a: 0, b: 0},
{
collection_mutation_description desc;
desc.cells.emplace_back(timeuuid_type->decompose(utils::UUID("7fb27e80-7b12-11ea-9fad-f4d108a9e4a3")),
atomic_cell::make_live(*ut, write_timestamp, ut->decompose(ut_val), atomic_cell::collection_member::yes));
mut.set_clustered_cell(ckey, *l_cdef, desc.serialize(*l_type));
}
// fl = [{a: 0, b: 0}]
mut.set_clustered_cell(ckey, *fl_cdef, atomic_cell::make_live(*fl_type, write_timestamp, fl_type->decompose(fl_val)));
sstable_assertions sst(s, LEGACY_UDT_IN_COLLECTION_PATH);
sst.load();
assert_that(sst.read_rows_flat()).produces(mut).produces_end_of_stream();
}

@@ -397,9 +397,6 @@ public:
cfg->view_hints_directory.set(data_dir_path + "/view_hints.dir");
cfg->num_tokens.set(256);
cfg->ring_delay_ms.set(500);
auto features = cfg->experimental_features();
features.emplace_back(db::experimental_features_t::LWT);
cfg->experimental_features(features);
cfg->shutdown_announce_in_ms.set(0);
cfg->broadcast_to_all_shards().get();
create_directories((data_dir_path + "/system").c_str());
@@ -439,7 +436,6 @@ public:
gms::feature_config fcfg;
fcfg.enable_cdc = true;
fcfg.enable_lwt = true;
fcfg.enable_sstables_mc_format = true;
if (cfg->enable_user_defined_functions()) {
fcfg.enable_user_defined_functions = true;

@@ -0,0 +1,9 @@
Scylla.db
CRC.db
Filter.db
Statistics.db
TOC.txt
Digest.crc32
Index.db
Summary.db
Data.db

@@ -1 +1 @@
docker.io/scylladb/scylla-toolchain:fedora-31-20200128
docker.io/scylladb/scylla-toolchain:fedora-31-20200402