Compare commits

...

32 Commits

Author SHA1 Message Date
Avi Kivity
f9b11c9b30 cql3: update_statement: do not set query option always_return_static_content for list read-before-write
The query option always_return_static_content was added for lightweight
transactions in commits e0b31dd273 (infrastructure) and 65b86d155e
(actual use). However, the flag was added unconditionally to
update_parameters::options. This caused it to be set for list
read-modify-write operations, not just for lightweight transactions.
This is a little wasteful, and worse, it breaks compatibility as old
nodes do not understand the always_return_static_content flag and
complain when they see it.

To fix, remove the always_return_static_content from
update_parameters::options and only set it from compare-and-swap
operations that are used to implement lightweight transactions.

Fixes #5593.

Reviewed-by: Gleb Natapov <gleb@scylladb.com>
Message-Id: <20200114135133.2338238-1-avi@scylladb.com>
(cherry picked from commit 6c84dd0045)
2020-01-15 09:15:55 +02:00
Hagit Segev
798357f656 release: prepare for 3.2.0 2020-01-13 14:10:46 +02:00
Takuya ASADA
7eb86fbbb4 docker: fix typo of scylla-jmx script path (#5551)
The path should be /opt/scylladb/jmx, not /opt/scylladb/scripts/jmx.

Fixes #5542

(cherry picked from commit 238a25a0f4)
2020-01-08 10:51:34 +02:00
Nadav Har'El
edf431f581 merge: CDC rolling upgrade
Merged pull request https://github.com/scylladb/scylla/pull/5538 from
Avi Kivity and Piotr Jastrzębski.

This series prepares CDC for rolling upgrade. This consists of
reducing the footprint of cdc, when disabled, on the schema, adding
a cluster feature, and redacting the cdc column when transferring
it to other nodes. The latter is needed because we'll want to backport
this to 3.2, which doesn't have canonical_mutations yet.

Fixes #5191.

(cherry picked from commit f0d8dd4094)
2020-01-07 08:19:11 +02:00
Avi Kivity
3f358c9772 tests: schema_change_test: add ability to adjust the schema that we test
This is part of original commit 52b48b415c
("Test that schema digests with UDFs don't change"). It is needed to
test tables with CDC enabled.

Ref #5191.

Signed-off-by: Rafael Ávila de Espíndola <espindola@scylladb.com>
2020-01-07 08:13:30 +02:00
Yaron Kaikov
c8d5738a48 release: prepare for 3.2.rc4 2020-01-01 15:19:46 +02:00
Takuya ASADA
de5c06414b dist: stop replacing /usr/lib/scylla with symlink (#5530)
Since we merged /usr/lib/scylla with /opt/scylladb, we removed
/usr/lib/scylla and replaced it with a symlink pointing to /opt/scylladb.
However, RPM does not support replacing a directory with a symlink, so we
were doing a dirty hack using an RPM scriptlet, which caused
multiple issues on upgrade/downgrade.
(See: https://docs.fedoraproject.org/en-US/packaging-guidelines/Directory_Replacement/)

To minimize Scylla upgrade/downgrade issues on the user side, it's better
to keep the /usr/lib/scylla directory.
Instead of creating a single symlink /usr/lib/scylla -> /opt/scylladb,
we can create a symlink for each setup script, like
/usr/lib/scylla/<script> -> /opt/scylladb/scripts/<script>.

Fixes #5522
Fixes #4585
Fixes #4611

(cherry picked from commit 263385cb4b)
2019-12-30 22:02:29 +02:00
Benny Halevy
de314dfe30 tracing: one_session_records: keep local tracing ptr
Similar to trace_state, keep a shared_ptr<tracing> _local_tracing_ptr
in one_session_records when it is constructed, so it can be used
during shutdown.

Fixes #5243

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
(cherry picked from commit 7aef39e400)
2019-12-24 18:42:07 +02:00
Avi Kivity
40a077bf93 database: fix schema use-after-move in make_multishard_streaming_reader
On aarch64, asan detected a use-after-move. It doesn't happen on x86_64,
likely due to different argument evaluation order.

Fix by evaluating full_slice before moving the schema.

Note: I used "auto&&" and "std::move()" even though full_slice()
returns a reference. I think this is safer in case full_slice()
changes, and works just as well with a reference.

Fixes #5419.

(cherry picked from commit 85822c7786)
2019-12-24 18:34:46 +02:00
Pavel Solodovnikov
d488e762cf LWT: Fix required participants calculation for LOCAL_SERIAL CL
Suppose we have a multi-dc setup (e.g. 9 nodes distributed across
3 datacenters: [dc1, dc2, dc3] -> [3, 3, 3]).

When a query that uses LWT is executed with LOCAL_SERIAL consistency
level, the `storage_proxy::get_paxos_participants` function
incorrectly calculates the number of required participants to serve
the query.

In the example above it's calculated to be 5 (i.e. the number of
nodes needed for a regular QUORUM) instead of 2 (for LOCAL_SERIAL,
which is equivalent to LOCAL_QUORUM cl in this case).

This behavior results in an exception being thrown when executing
the following query with LOCAL_SERIAL cl:

INSERT INTO users (userid, firstname, lastname, age) VALUES (0, 'first0', 'last0', 30) IF NOT EXISTS

Unavailable: Error from server: code=1000 [Unavailable exception] message="Cannot achieve consistency level for cl LOCAL_SERIAL. Requires 5, alive 3" info={'required_replicas': 5, 'alive_replicas': 3, 'consistency': 'LOCAL_SERIAL'}

Tests: unit(dev), dtest(consistency_test.py)

Fixes #5477.

Signed-off-by: Pavel Solodovnikov <pa.solodovnikov@scylladb.com>
Message-Id: <20191216151732.64230-1-pa.solodovnikov@scylladb.com>
(cherry picked from commit c451f6d82a)
2019-12-23 15:21:32 +02:00
Asias He
637d80ffcf repair: Do not return working_row_buf_nr in get combined row hash verb
In commit b463d7039c (repair: Introduce
get_combined_row_hash_response), working_row_buf_nr is returned in
REPAIR_GET_COMBINED_ROW_HASH in addition to the combined hash. It was
scheduled to be part of the 3.1 release; however, by accident it was not
backported to 3.1.

To keep repair compatible between 3.1 and 3.2, we need to drop
the working_row_buf_nr in the 3.2 release.

Fixes: #5490
Backports: 3.2
Tests: Run repair in a mixed 3.1 and 3.2 cluster
(cherry picked from commit 7322b749e0)
2019-12-22 15:53:18 +02:00
Hagit Segev
39b17be562 release: prepare for 3.2.rc3 2019-12-15 10:33:13 +02:00
Dejan Mircevski
e54df0585e cql3: Fix needs_filtering() for clustering columns
The LIKE operator requires filtering, so needs_filtering() must check
is_LIKE().  This already happens for partition columns, but it was
overlooked for clustering columns in the initial implementation of
LIKE.

Fixes #5400.

Tests: unit(dev)

Signed-off-by: Dejan Mircevski <dejan@scylladb.com>
(cherry picked from commit 27b8b6fe9d)
2019-12-12 14:40:09 +02:00
Avi Kivity
7d113bd1e9 Merge "Add experimental_features option" from Dejan
"
Add --experimental-features -- a vector of features to unlock. Make corresponding changes in the YAML parser.

Fixes #5338
"

* 'vecexper' of https://github.com/dekimir/scylla:
  config: Add `experimental_features` option
  utils: Add enum_option

(cherry picked from commit 63474a3380)
2019-12-12 14:39:42 +02:00
Avi Kivity
2e7cd77bc4 Update seastar submodule
* seastar 8837a3fdf1...8e236efda9 (1):
  > Merge "reactor: fix iocb pool underflow due to unaccounted aio fsync" from Avi

Fixes #5443.
2019-12-12 14:17:32 +02:00
Cem Sancak
0a38d2b0ee Fix DPDK mode in prepare script
Fixes #5455.

(cherry picked from commit 86b8036502)
2019-12-12 14:15:37 +02:00
Piotr Sarna
2ff26d1160 table: Reduce read amplification in view update generation
This commit makes sure that single-partition readers for
read-before-write do not have fast-forwarding enabled,
as it may lead to huge read amplification. The observed case was:
1. Creating an index.
  CREATE INDEX index1  ON myks2.standard1 ("C1");
2. Running cassandra-stress in order to generate view updates.
cassandra-stress write no-warmup n=1000000 cl=ONE -schema \
  'replication(factor=2) compaction(strategy=LeveledCompactionStrategy)' \
  keyspace=myks2 -pop seq=4000000..8000000 -rate threads=100 -errors
  skip-read-validation -node 127.0.0.1;

Without disabling fast-forwarding, single-partition readers
were turned into scanning readers in cache, which resulted
in reading 36GB (sic!) on a workload which generates less
than 1GB of view updates. After applying the fix, the number
dropped down to less than 1GB, as expected.

Refs #5409
Fixes #4615
Fixes #5418

(cherry picked from commit 79c3a508f4)
2019-12-05 22:35:05 +02:00
Calle Wilund
9dd714ae64 commitlog_replayer: Ensure applied frozen_mutation is safe during apply
Fixes #5211

In 79935df959 the replay apply call was
changed from one with no continuation to one with a continuation. But the
frozen mutation argument was still just a lambda local.

Change to use do_with for this case as well.

Message-Id: <20191203162606.1664-1-calle@scylladb.com>
(cherry picked from commit 56a5e0a251)
2019-12-04 15:02:15 +02:00
Hagit Segev
3980570520 release: prepare for 3.2.rc2 2019-12-03 18:16:37 +02:00
Avi Kivity
9889e553e6 Update seastar submodule
* seastar 6f0ef3251...8837a3fdf (1):
  > shared_future: Fix crash when all returned futures time out

Fixes #5322
2019-11-29 11:47:24 +02:00
Avi Kivity
3e0b09faa1 Update seastar submodule to point to scylla-seastar.git
This allows us to add 3.2 specific patches to Seastar.
2019-11-29 11:45:44 +02:00
Asias He
bc4106ff45 repair: Fix rx_hashes_nr metrics (#5213)
In get_full_row_hashes_with_rpc_stream and
repair_get_row_diff_with_rpc_stream_process_op which were introduced in
the "Repair switch to rpc stream" series, rx_hashes_nr metrics are not
updated correctly.

In the test we have 3 nodes and run repair on node3; we make sure the
following metrics are correct.

assertEqual(node1_metrics['scylla_repair_tx_hashes_nr'] + node2_metrics['scylla_repair_tx_hashes_nr'],
            node3_metrics['scylla_repair_rx_hashes_nr'])
assertEqual(node1_metrics['scylla_repair_rx_hashes_nr'] + node2_metrics['scylla_repair_rx_hashes_nr'],
            node3_metrics['scylla_repair_tx_hashes_nr'])
assertEqual(node1_metrics['scylla_repair_tx_row_nr'] + node2_metrics['scylla_repair_tx_row_nr'],
            node3_metrics['scylla_repair_rx_row_nr'])
assertEqual(node1_metrics['scylla_repair_rx_row_nr'] + node2_metrics['scylla_repair_rx_row_nr'],
            node3_metrics['scylla_repair_tx_row_nr'])
assertEqual(node1_metrics['scylla_repair_tx_row_bytes'] + node2_metrics['scylla_repair_tx_row_bytes'],
            node3_metrics['scylla_repair_rx_row_bytes'])
assertEqual(node1_metrics['scylla_repair_rx_row_bytes'] + node2_metrics['scylla_repair_rx_row_bytes'],
            node3_metrics['scylla_repair_tx_row_bytes'])

Tests: repair_additional_test.py:RepairAdditionalTest.repair_almost_synced_3nodes_test
Fixes: #5339
Backports: 3.2
(cherry picked from commit 6ec602ff2c)
2019-11-25 18:18:12 +02:00
Rafael Ávila de Espíndola
df3563c1ae rpmbuild: don't use dwz
By default rpm uses dwz to merge the debug info from various
binaries. Unfortunately, it looks like addr2line has not been updated
to handle this:

// This works
$ addr2line  -e build/release/scylla 0x1234567

$ dwz -m build/release/common.debug build/release/scylla.debug build/release/iotune.debug

// now this fails
$ addr2line -e build/release/scylla 0x1234567

I think the issue is

https://sourceware.org/bugzilla/show_bug.cgi?id=23652

Fixes #5289

Signed-off-by: Rafael Ávila de Espíndola <espindola@scylladb.com>
Message-Id: <20191123015734.89331-1-espindola@scylladb.com>
(cherry picked from commit 8599f8205b)
2019-11-25 11:45:15 +02:00
Rafael Ávila de Espíndola
1c89961c4f commitlog: make sure a file is closed
If allocate or truncate throws, we have to close the file.

Fixes #4877

Signed-off-by: Rafael Ávila de Espíndola <espindola@scylladb.com>
Message-Id: <20191114174810.49004-1-espindola@scylladb.com>
(cherry picked from commit 6160b9017d)
2019-11-24 17:47:08 +02:00
Tomasz Grabiec
85b1a45252 row_cache: Fix abort on bad_alloc during cache update
Since 90d6c0b, cache will abort when trying to detach partition
entries while they're updated. This should never happen. It can happen
though, when the update fails on bad_alloc, because the cleanup guard
invalidates the cache before it releases partition snapshots (held by
"update" coroutine).

Fix by destroying the coroutine first.

Fixes #5327.

Tests:
  - row_cache_test (dev)

Message-Id: <1574360259-10132-1-git-send-email-tgrabiec@scylladb.com>
(cherry picked from commit e3d025d014)
2019-11-24 17:42:41 +02:00
Pekka Enberg
6a847e2242 test.py: Append test repeat cycle to output XML filename
Currently, we overwrite the same XML output file for each test repeat
cycle. This can cause invalid XML to be generated if the XML contents
don't match exactly for every iteration.

Fix the problem by appending the test repeat cycle in the XML filename
as follows:

  $ ./test.py --repeat 3 --name vint_serialization_test --mode dev --jenkins jenkins_test

  $ ls -1 *.xml
  jenkins_test.release.vint_serialization_test.0.boost.xml
  jenkins_test.release.vint_serialization_test.1.boost.xml
  jenkins_test.release.vint_serialization_test.2.boost.xml

Fixes #5303.

Message-Id: <20191119092048.16419-1-penberg@scylladb.com>
(cherry picked from commit 505f2c1008)
2019-11-20 22:26:29 +02:00
Nadav Har'El
10cf0e0d91 merge: row_marker: correct row expiry condition
Merged patch set by Piotr Dulikowski:

This change corrects condition on which a row was considered expired by its
TTL.

The logic that decides when a row becomes expired was inconsistent with the
logic that decides if a single cell is expired. A single cell becomes expired
when expiry_timestamp <= now, while a row became expired when
expiry_timestamp < now (notice the strict inequality). For rows inserted
with TTL, this caused non-key cells to expire (change their values to null)
one second before the row disappeared. Now, row expiry logic uses non-strict
inequality.

Fixes #4263,
Fixes #5290.

Tests:

    unit(dev)
    python test described in issue #5290

(cherry picked from commit 9b9609c65b)
2019-11-20 21:37:16 +02:00
Hagit Segev
8c1474c039 release: prepare for 3.2.rc1 2019-11-19 21:38:26 +02:00
Yaron Kaikov
bb5e9527bb dist/docker: Switch to 3.2 release repository (#5296)
Modify Dockerfile SCYLLA_REPO_URL argument to point to the 3.2 repository.
2019-11-18 11:59:51 +02:00
Nadav Har'El
4dae72b2cd sstables: allow non-traditional characters in table name
The goal of this patch is to fix issue #5280, a rather serious Alternator
bug, where Scylla fails to restart when an Alternator table has secondary
indexes (LSI or GSI).

Traditionally, Cassandra allows table names to contain only alphanumeric
characters and underscores. However, most of our internal implementation
doesn't actually have this restriction. So Alternator uses the characters
':' and '!' in the table names to mark global and local secondary indexes,
respectively. And this actually works. Or almost...

This patch fixes a problem of listing, during boot, the sstables stored
for tables with such non-traditional names. The sstable listing code
needlessly assumes that the *directory* name, i.e., the CF name, matches
the "\w+" regular expression. When an sstable is found in a directory not
matching such regular expression, the boot fails. But there is no real
reason to require such a strict regular expression. So this patch relaxes
this requirement, and allows Scylla to boot with Alternator's GSI and LSI
tables, whose names include the ":" and "!" characters, and in fact with
any other name allowed as a directory name.

Fixes #5280.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20191114153811.17386-1-nyh@scylladb.com>
(cherry picked from commit 2fb2eb27a2)
2019-11-17 18:07:52 +02:00
Kamil Braun
1e444a3dd5 sstables: fix sstable file I/O CQL tracing when reading multiple files (#5285)
CQL tracing would only report file I/O involving one sstable, even if
multiple sstables were read from during the query.

Steps to reproduce:

1. create a table with NullCompactionStrategy
2. insert row, flush memtables
3. insert row, flush memtables
4. restart Scylla
5. tracing on
6. select * from table
The trace would only report DMA reads from one of the two sstables.

Kudos to @denesb for catching this.

Related issue: #4908

(cherry picked from commit a67e887dea)
2019-11-17 16:09:35 +02:00
Yaron Kaikov
76906d6134 release: prepare for 3.2.rc0 2019-11-17 16:08:11 +02:00
346 changed files with 1082 additions and 110 deletions

.gitmodules vendored

@@ -1,6 +1,6 @@
[submodule "seastar"]
path = seastar
url = ../seastar
url = ../scylla-seastar
ignore = dirty
[submodule "swagger-ui"]
path = swagger-ui


@@ -1,7 +1,7 @@
#!/bin/sh
PRODUCT=scylla
VERSION=666.development
VERSION=3.2.0
if test -f version
then


@@ -74,7 +74,7 @@ public:
options() = default;
options(const std::map<sstring, sstring>& map) {
if (map.find("enabled") == std::end(map)) {
throw exceptions::configuration_exception("Missing enabled CDC option");
return;
}
for (auto& p : map) {
@@ -92,6 +92,9 @@ public:
}
}
std::map<sstring, sstring> to_map() const {
if (!_enabled) {
return {};
}
return {
{ "enabled", _enabled ? "true" : "false" },
{ "preimage", _preimage ? "true" : "false" },


@@ -241,7 +241,10 @@ batch_size_fail_threshold_in_kb: 50
# broadcast_rpc_address: 1.2.3.4
# Uncomment to enable experimental features
# experimental: true
# experimental_features:
# - cdc
# - lwt
# - udf
# The directory where hints files are stored if hinted handoff is enabled.
# hints_directory: /var/lib/scylla/hints


@@ -381,6 +381,7 @@ scylla_tests = [
'tests/data_listeners_test',
'tests/truncation_migration_test',
'tests/like_matcher_test',
'tests/enum_option_test',
]
perf_tests = [
@@ -875,6 +876,7 @@ pure_boost_tests = set([
'tests/top_k_test',
'tests/small_vector_test',
'tests/like_matcher_test',
'tests/enum_option_test',
])
tests_not_using_seastar_test_framework = set([


@@ -478,7 +478,7 @@ inline bool single_column_primary_key_restrictions<clustering_key>::needs_filter
// 3. a SLICE restriction isn't on a last place
column_id position = 0;
for (const auto& restriction : _restrictions->restrictions() | boost::adaptors::map_values) {
if (restriction->is_contains() || position != restriction->get_column_def().id) {
if (restriction->is_contains() || restriction->is_LIKE() || position != restriction->get_column_def().id) {
return true;
}
if (!restriction->is_slice()) {


@@ -111,7 +111,9 @@ lw_shared_ptr<query::read_command> cas_request::read_command() const {
} else {
ranges = query::clustering_range::deoverlap(std::move(ranges), clustering_key::tri_compare(*_schema));
}
query::partition_slice ps(std::move(ranges), *_schema, columns_to_read, update_parameters::options);
auto options = update_parameters::options;
options.set(query::partition_slice::option::always_return_static_content);
query::partition_slice ps(std::move(ranges), *_schema, columns_to_read, options);
ps.set_partition_row_limit(max_rows);
return make_lw_shared<query::read_command>(_schema->id(), _schema->version(), std::move(ps));
}


@@ -60,7 +60,6 @@ public:
static constexpr query::partition_slice::option_set options = query::partition_slice::option_set::of<
query::partition_slice::option::send_partition_key,
query::partition_slice::option::send_clustering_key,
query::partition_slice::option::always_return_static_content,
query::partition_slice::option::collections_as_maps>();
// Holder for data for


@@ -1984,7 +1984,8 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
return make_multishard_combining_reader(make_shared<streaming_reader_lifecycle_policy>(db), partitioner, std::move(s), pr, ps, pc,
std::move(trace_state), fwd_mr);
});
return make_flat_multi_range_reader(std::move(schema), std::move(ms), std::move(range_generator), schema->full_slice(),
auto&& full_slice = schema->full_slice();
return make_flat_multi_range_reader(std::move(schema), std::move(ms), std::move(range_generator), std::move(full_slice),
service::get_local_streaming_read_priority(), {}, mutation_reader::forwarding::no);
}


@@ -1241,6 +1241,34 @@ void db::commitlog::segment_manager::flush_segments(bool force) {
}
}
/// \brief Helper for ensuring a file is closed if an exception is thrown.
///
/// The file provided by the file_fut future is passed to func.
/// * If func throws an exception E, the file is closed and we return
/// a failed future with E.
/// * If func returns a value V, the file is not closed and we return
/// a future with V.
/// Note that when an exception is not thrown, it is the
/// responsibility of func to make sure the file will be closed. It
/// can close the file itself, return it, or store it somewhere.
///
/// \tparam Func The type of function this wraps
/// \param file_fut A future that produces a file
/// \param func A function that uses a file
/// \return A future that passes the file produced by file_fut to func
/// and closes it if func fails
template <typename Func>
static auto close_on_failure(future<file> file_fut, Func func) {
return file_fut.then([func = std::move(func)](file f) {
return futurize_apply(func, f).handle_exception([f] (std::exception_ptr e) mutable {
return f.close().then_wrapped([f, e = std::move(e)] (future<> x) {
using futurator = futurize<std::result_of_t<Func(file)>>;
return futurator::make_exception_future(e);
});
});
});
}
future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager::allocate_segment_ex(const descriptor& d, sstring filename, open_flags flags, bool active) {
file_open_options opt;
opt.extent_allocation_size_hint = max_size;
@@ -1258,7 +1286,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
return fut;
});
return fut.then([this, d, active, filename, flags](file f) {
return close_on_failure(std::move(fut), [this, d, active, filename, flags] (file f) {
f = make_checked_file(commit_error_handler, f);
// xfs doesn't like files extended betond eof, so enlarge the file
auto fut = make_ready_future<>();


@@ -276,7 +276,7 @@ future<> db::commitlog_replayer::impl::process(stats* s, commitlog::buffer_and_r
}
auto shard = _db.local().shard_of(fm);
return _db.invoke_on(shard, [this, cer = std::move(cer), &src_cm, rp, shard, s] (database& db) -> future<> {
return _db.invoke_on(shard, [this, cer = std::move(cer), &src_cm, rp, shard, s] (database& db) mutable -> future<> {
auto& fm = cer.mutation();
// TODO: might need better verification that the deserialized mutation
// is schema compatible. My guess is that just applying the mutation
@@ -306,7 +306,9 @@ future<> db::commitlog_replayer::impl::process(stats* s, commitlog::buffer_and_r
return db.apply_in_memory(m, cf, db::rp_handle(), db::no_timeout);
});
} else {
return db.apply_in_memory(fm, cf.schema(), db::rp_handle(), db::no_timeout);
return do_with(std::move(cer).mutation(), [&](const frozen_mutation& m) {
return db.apply_in_memory(m, cf.schema(), db::rp_handle(), db::no_timeout);
});
}
}).then_wrapped([s] (future<> f) {
try {


@@ -22,6 +22,7 @@
#include <unordered_map>
#include <regex>
#include <sstream>
#include <boost/any.hpp>
#include <boost/program_options.hpp>
@@ -108,6 +109,10 @@ const config_type config_type_for<int32_t> = config_type("integer", value_to_jso
template <>
const config_type config_type_for<db::seed_provider_type> = config_type("seed provider", seed_provider_to_json);
template <>
const config_type config_type_for<std::vector<enum_option<db::experimental_features_t>>> = config_type(
"experimental features", value_to_json<std::vector<sstring>>);
}
namespace YAML {
@@ -153,6 +158,23 @@ struct convert<db::config::seed_provider_type> {
}
};
template <>
class convert<enum_option<db::experimental_features_t>> {
public:
static bool decode(const Node& node, enum_option<db::experimental_features_t>& rhs) {
std::string name;
if (!convert<std::string>::decode(node, name)) {
return false;
}
try {
std::istringstream(name) >> rhs;
} catch (boost::program_options::invalid_option_value&) {
return false;
}
return true;
}
};
}
#if defined(DEBUG)
@@ -669,7 +691,8 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, shutdown_announce_in_ms(this, "shutdown_announce_in_ms", value_status::Used, 2 * 1000, "Time a node waits after sending gossip shutdown message in milliseconds. Same as -Dcassandra.shutdown_announce_in_ms in cassandra.")
, developer_mode(this, "developer_mode", value_status::Used, false, "Relax environment checks. Setting to true can reduce performance and reliability significantly.")
, skip_wait_for_gossip_to_settle(this, "skip_wait_for_gossip_to_settle", value_status::Used, -1, "An integer to configure the wait for gossip to settle. -1: wait normally, 0: do not wait at all, n: wait for at most n polls. Same as -Dcassandra.skip_wait_for_gossip_to_settle in cassandra.")
, experimental(this, "experimental", value_status::Used, false, "Set to true to unlock experimental features.")
, experimental(this, "experimental", value_status::Used, false, "Set to true to unlock all experimental features.")
, experimental_features(this, "experimental_features", value_status::Used, {}, "Unlock experimental features provided as the option arguments (possible values: 'lwt', 'cdc', 'udf'). Can be repeated.")
, lsa_reclamation_step(this, "lsa_reclamation_step", value_status::Used, 1, "Minimum number of segments to reclaim in a single step")
, prometheus_port(this, "prometheus_port", value_status::Used, 9180, "Prometheus port, set to zero to disable")
, prometheus_address(this, "prometheus_address", value_status::Used, "0.0.0.0", "Prometheus listening address")
@@ -779,10 +802,12 @@ db::fs::path db::config::get_conf_dir() {
return confdir;
}
void db::config::check_experimental(const sstring& what) const {
if (!experimental()) {
throw std::runtime_error(format("{} is currently disabled. Start Scylla with --experimental=on to enable.", what));
bool db::config::check_experimental(experimental_features_t::feature f) const {
if (experimental()) {
return true;
}
const auto& optval = experimental_features();
return find(begin(optval), end(optval), enum_option<experimental_features_t>{f}) != end(optval);
}
namespace bpo = boost::program_options;
@@ -827,6 +852,12 @@ const db::extensions& db::config::extensions() const {
return *_extensions;
}
std::unordered_map<sstring, db::experimental_features_t::feature> db::experimental_features_t::map() {
// We decided against using the construct-on-first-use idiom here:
// https://github.com/scylladb/scylla/pull/5369#discussion_r353614807
return {{"lwt", LWT}, {"udf", UDF}, {"cdc", CDC}};
}
template struct utils::config_file::named_value<seastar::log_level>;
namespace utils {


@@ -33,6 +33,7 @@
#include "seastarx.hh"
#include "utils/config_file.hh"
#include "utils/enum_option.hh"
namespace seastar { class file; struct logging_settings; }
@@ -75,14 +76,20 @@ sstring config_value_as_json(const std::unordered_map<sstring, log_level>& v);
namespace db {
/// Enumeration of all valid values for the `experimental` config entry.
struct experimental_features_t {
enum feature { LWT, UDF, CDC };
static std::unordered_map<sstring, feature> map(); // See enum_option.
};
class config : public utils::config_file {
public:
config();
config(std::shared_ptr<db::extensions>);
~config();
// Throws exception if experimental feature is disabled.
void check_experimental(const sstring& what) const;
/// True iff the feature is enabled.
bool check_experimental(experimental_features_t::feature f) const;
/**
* Scans the environment variables for configuration files directory
@@ -263,6 +270,7 @@ public:
named_value<bool> developer_mode;
named_value<int32_t> skip_wait_for_gossip_to_settle;
named_value<bool> experimental;
named_value<std::vector<enum_option<experimental_features_t>>> experimental_features;
named_value<size_t> lsa_reclamation_step;
named_value<uint16_t> prometheus_port;
named_value<sstring> prometheus_address;


@@ -33,12 +33,14 @@ enum class schema_feature {
// See https://github.com/scylladb/scylla/issues/4485
DIGEST_INSENSITIVE_TO_EXPIRY,
COMPUTED_COLUMNS,
CDC_OPTIONS,
};
using schema_features = enum_set<super_enum<schema_feature,
schema_feature::VIEW_VIRTUAL_COLUMNS,
schema_feature::DIGEST_INSENSITIVE_TO_EXPIRY,
schema_feature::COMPUTED_COLUMNS
schema_feature::COMPUTED_COLUMNS,
schema_feature::CDC_OPTIONS
>>;
}


@@ -294,19 +294,24 @@ schema_ptr tables() {
}
// Holds Scylla-specific table metadata.
schema_ptr scylla_tables() {
static thread_local auto schema = [] {
schema_ptr scylla_tables(schema_features features) {
static auto make = [] (bool has_cdc_options) -> schema_ptr {
auto id = generate_legacy_id(NAME, SCYLLA_TABLES);
return schema_builder(NAME, SCYLLA_TABLES, std::make_optional(id))
auto sb = schema_builder(NAME, SCYLLA_TABLES, std::make_optional(id))
.with_column("keyspace_name", utf8_type, column_kind::partition_key)
.with_column("table_name", utf8_type, column_kind::clustering_key)
.with_column("version", uuid_type)
.with_column("cdc", map_type_impl::get_instance(utf8_type, utf8_type, false))
.set_gc_grace_seconds(schema_gc_grace)
.with_version(generate_schema_version(id))
.build();
}();
return schema;
.set_gc_grace_seconds(schema_gc_grace);
if (has_cdc_options) {
sb.with_column("cdc", map_type_impl::get_instance(utf8_type, utf8_type, false));
sb.with_version(generate_schema_version(id, 1));
} else {
sb.with_version(generate_schema_version(id));
}
return sb.build();
};
static thread_local schema_ptr schemas[2] = { make(false), make(true) };
return schemas[features.contains(schema_feature::CDC_OPTIONS)];
}
// The "columns" table lists the definitions of all columns in all tables
@@ -608,14 +613,28 @@ schema_ptr aggregates() {
}
#endif
static
mutation
redact_columns_for_missing_features(mutation m, schema_features features) {
if (features.contains(schema_feature::CDC_OPTIONS)) {
return std::move(m);
}
if (m.schema()->cf_name() != SCYLLA_TABLES) {
return std::move(m);
}
slogger.debug("adjusting schema_tables mutation due to possible in-progress cluster upgrade");
m.upgrade(scylla_tables(features));
return std::move(m);
}
/**
* Read schema from system keyspace and calculate MD5 digest of every row, resulting digest
* will be converted into UUID which would act as content-based version of the schema.
*/
future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features features)
{
auto map = [&proxy] (sstring table) {
return db::system_keyspace::query_mutations(proxy, NAME, table).then([&proxy, table] (auto rs) {
auto map = [&proxy, features] (sstring table) {
return db::system_keyspace::query_mutations(proxy, NAME, table).then([&proxy, table, features] (auto rs) {
auto s = proxy.local().get_db().local().find_schema(NAME, table);
std::vector<mutation> mutations;
for (auto&& p : rs->partitions()) {
@@ -624,6 +643,7 @@ future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>&
if (is_system_keyspace(partition_key)) {
continue;
}
mut = redact_columns_for_missing_features(std::move(mut), features);
mutations.emplace_back(std::move(mut));
}
return mutations;
@@ -647,8 +667,8 @@ future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>&
future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<service::storage_proxy>& proxy, schema_features features)
{
auto map = [&proxy] (sstring table) {
return db::system_keyspace::query_mutations(proxy, NAME, table).then([&proxy, table] (auto rs) {
auto map = [&proxy, features] (sstring table) {
return db::system_keyspace::query_mutations(proxy, NAME, table).then([&proxy, table, features] (auto rs) {
auto s = proxy.local().get_db().local().find_schema(NAME, table);
std::vector<canonical_mutation> results;
for (auto&& p : rs->partitions()) {
@@ -657,6 +677,7 @@ future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<
if (is_system_keyspace(partition_key)) {
continue;
}
mut = redact_columns_for_missing_features(std::move(mut), features);
results.emplace_back(mut);
}
return results;
@@ -669,6 +690,14 @@ future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<
return map_reduce(all_table_names(features), map, std::vector<canonical_mutation>{}, reduce);
}
std::vector<mutation>
adjust_schema_for_schema_features(std::vector<mutation> schema, schema_features features) {
for (auto& m : schema) {
m = redact_columns_for_missing_features(m, features);
}
return std::move(schema);
}
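The function above applies the same redaction to every mutation before it leaves the node. A minimal Python sketch of that pattern (the names and the column-to-feature mapping here are illustrative, not Scylla's actual schema):

```python
def adjust_schema_for_features(mutations, cluster_features):
    # Before shipping schema mutations to other nodes, drop any column
    # tied to a schema feature the whole cluster does not yet support.
    # Mutations are modeled as plain dicts for illustration.
    FEATURE_COLUMNS = {"cdc": "CDC_OPTIONS"}  # assumed mapping
    return [
        {col: val for col, val in m.items()
         if FEATURE_COLUMNS.get(col) is None
         or FEATURE_COLUMNS[col] in cluster_features}
        for m in mutations
    ]
```

An old node that never sees the redacted column cannot complain about it, which is what makes the rolling upgrade safe.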
future<schema_result>
read_schema_for_keyspaces(distributed<service::storage_proxy>& proxy, const sstring& schema_table_name, const std::set<sstring>& keyspace_names)
{
@@ -1673,7 +1702,19 @@ mutation make_scylla_tables_mutation(schema_ptr table, api::timestamp_type times
auto ckey = clustering_key::from_singular(*s, table->cf_name());
mutation m(scylla_tables(), pkey);
m.set_clustered_cell(ckey, "version", utils::UUID(table->version()), timestamp);
store_map(m, ckey, "cdc", timestamp, table->cdc_options().to_map());
auto cdc_options = table->cdc_options().to_map();
if (!cdc_options.empty()) {
store_map(m, ckey, "cdc", timestamp, cdc_options);
} else {
// Avoid storing anything for cdc disabled, so we don't end up with
// different digests on different nodes due to the other node redacting
// the cdc column when the cdc cluster feature is disabled.
//
// Tombstones are not considered for schema digest, so this is okay (and
// needed in order for disabling of cdc to have effect).
auto& cdc_cdef = *scylla_tables()->get_column_definition("cdc");
m.set_clustered_cell(ckey, cdc_cdef, atomic_cell::make_dead(timestamp, gc_clock::now()));
}
return m;
}
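The comment above relies on the schema digest skipping tombstones: writing a dead `cdc` cell must hash identically to writing nothing at all. A toy digest demonstrating that property (a sketch of the idea, not Scylla's actual digest):

```python
import hashlib

def schema_digest(cells):
    # Digest over (name, value, live) cells. Dead cells (tombstones) are
    # skipped, so a dead "cdc" cell contributes nothing -- matching a node
    # that redacted the column entirely.
    h = hashlib.md5()
    for name, value, live in sorted(cells):
        if live:
            h.update(name.encode())
            h.update(value.encode())
    return h.hexdigest()
```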


@@ -109,7 +109,7 @@ schema_ptr view_virtual_columns();
schema_ptr dropped_columns();
schema_ptr indexes();
schema_ptr tables();
schema_ptr scylla_tables();
schema_ptr scylla_tables(schema_features features = schema_features::full());
schema_ptr views();
schema_ptr computed_columns();
@@ -154,6 +154,7 @@ future<> save_system_keyspace_schema();
future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features);
future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<service::storage_proxy>& proxy, schema_features);
std::vector<mutation> adjust_schema_for_schema_features(std::vector<mutation> schema, schema_features features);
future<schema_result_value_type>
read_schema_partition_for_keyspace(distributed<service::storage_proxy>& proxy, const sstring& schema_table_name, const sstring& keyspace_name);


@@ -104,10 +104,10 @@ api::timestamp_type schema_creation_timestamp() {
// FIXME: Make automatic by calculating from schema structure.
static const uint16_t version_sequence_number = 1;
table_schema_version generate_schema_version(utils::UUID table_id) {
table_schema_version generate_schema_version(utils::UUID table_id, uint16_t offset) {
md5_hasher h;
feed_hash(h, table_id);
feed_hash(h, version_sequence_number);
feed_hash(h, version_sequence_number + offset);
return utils::UUID_gen::get_name_UUID(h.finalize());
}
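The change above threads an `offset` into the version hash, so a node can deterministically derive an alternate schema version for the same table. A Python sketch of the scheme (the exact byte layout is ours, not Scylla's):

```python
import hashlib
import uuid

VERSION_SEQUENCE_NUMBER = 1  # mirrors the fixed sequence number above

def generate_schema_version(table_id: uuid.UUID, offset: int = 0) -> uuid.UUID:
    # Hash the table id together with the offset-adjusted sequence number,
    # then derive a name-based UUID from the 16-byte digest. Same inputs
    # always give the same version; bumping `offset` gives a distinct one.
    h = hashlib.md5()
    h.update(table_id.bytes)
    h.update((VERSION_SEQUENCE_NUMBER + offset).to_bytes(2, "little"))
    return uuid.UUID(bytes=h.digest())
```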


@@ -152,7 +152,7 @@ schema_ptr aggregates();
}
table_schema_version generate_schema_version(utils::UUID table_id);
table_schema_version generate_schema_version(utils::UUID table_id, uint16_t offset = 0);
// Only for testing.
void minimal_setup(distributed<database>& db, distributed<cql3::query_processor>& qp);


@@ -63,7 +63,7 @@ if __name__ == '__main__':
run('ip link set dev {TAP} master {BRIDGE}'.format(TAP=tap, BRIDGE=bridge))
run('chown {USER}.{GROUP} /dev/vhost-net'.format(USER=user, GROUP=group))
elif mode == 'dpdk':
ethpcciid = cfg.get('ETHPCIID')
ethpciid = cfg.get('ETHPCIID')
nr_hugepages = cfg.get('NR_HUGEPAGES')
run('modprobe uio')
run('modprobe uio_pci_generic')
@@ -73,7 +73,6 @@ if __name__ == '__main__':
f.write(nr_hugepages)
if dist_name() == 'Ubuntu':
run('hugeadm --create-mounts')
fi
else:
set_nic_and_disks = get_set_nic_and_disks_config_value(cfg)
ifname = cfg.get('IFNAME')


@@ -21,6 +21,7 @@ opt/scylladb/scripts/libexec/*
opt/scylladb/bin/*
opt/scylladb/libreloc/*
opt/scylladb/libexec/*
usr/lib/scylla/*
var/lib/scylla/data
var/lib/scylla/commitlog
var/lib/scylla/hints


@@ -24,10 +24,6 @@ if [ "$1" = configure ]; then
fi
ln -sfT /etc/scylla /var/lib/scylla/conf
if [ -d /usr/lib/scylla ]; then
mv /usr/lib/scylla /usr/lib/scylla.old
fi
ln -sfT /opt/scylladb/scripts /usr/lib/scylla
grep -v api_ui_dir /etc/scylla/scylla.yaml | grep -v api_doc_dir > /tmp/scylla.yaml
echo "api_ui_dir: /opt/scylladb/swagger-ui/dist/" >> /tmp/scylla.yaml


@@ -5,7 +5,7 @@ MAINTAINER Avi Kivity <avi@cloudius-systems.com>
ENV container docker
# The SCYLLA_REPO_URL argument specifies the URL to the RPM repository this Docker image uses to install Scylla. The default value is the Scylla's unstable RPM repository, which contains the daily build.
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/master/latest/scylla.repo
ARG SCYLLA_REPO_URL=http://downloads.scylladb.com/rpm/unstable/centos/branch-3.2/latest/scylla.repo
ADD scylla_bashrc /scylla_bashrc


@@ -2,4 +2,4 @@
source /etc/sysconfig/scylla-jmx
exec /opt/scylladb/scripts/jmx/scylla-jmx -l /opt/scylladb/scripts/jmx
exec /opt/scylladb/jmx/scylla-jmx -l /opt/scylladb/jmx


@@ -15,6 +15,8 @@ Obsoletes: scylla-server < 1.1
%global __brp_python_bytecompile %{nil}
%global __brp_mangle_shebangs %{nil}
%undefine _find_debuginfo_dwz_opts
%description
Scylla is a highly scalable, eventually consistent, distributed,
partitioned row DB.
@@ -75,9 +77,6 @@ getent passwd scylla || /usr/sbin/useradd -g scylla -s /sbin/nologin -r -d %{_sh
if [ -f /etc/systemd/coredump.conf ];then
/opt/scylladb/scripts/scylla_coredump_setup
fi
if [ -d /usr/lib/scylla ]; then
mv /usr/lib/scylla /usr/lib/scylla.old
fi
/opt/scylladb/scripts/scylla_post_install.sh
@@ -95,10 +94,6 @@ if [ -d /tmp/%{name}-%{version}-%{release} ]; then
rm -rf /tmp/%{name}-%{version}-%{release}/
fi
ln -sfT /etc/scylla /var/lib/scylla/conf
if [ -d /usr/lib/scylla ]; then
mv /usr/lib/scylla /usr/lib/scylla.old
fi
ln -sfT /opt/scylladb/scripts /usr/lib/scylla
%clean
rm -rf $RPM_BUILD_ROOT
@@ -130,6 +125,7 @@ rm -rf $RPM_BUILD_ROOT
/opt/scylladb/bin/*
/opt/scylladb/libreloc/*
/opt/scylladb/libexec/*
%{_prefix}/lib/scylla/*
%attr(0755,scylla,scylla) %dir %{_sharedstatedir}/scylla/
%attr(0755,scylla,scylla) %dir %{_sharedstatedir}/scylla/data
%attr(0755,scylla,scylla) %dir %{_sharedstatedir}/scylla/commitlog


@@ -69,11 +69,6 @@ struct get_sync_boundary_response {
uint64_t new_rows_nr;
};
struct get_combined_row_hash_response {
repair_hash working_row_buf_combined_csum;
uint64_t working_row_buf_nr;
};
enum class row_level_diff_detect_algorithm : uint8_t {
send_full_set,
send_full_set_rpc_stream,


@@ -219,6 +219,20 @@ EOS
for i in $SBINFILES; do
ln -srf "$rprefix/scripts/$i" "$rusr/sbin/$i"
done
# we need keep /usr/lib/scylla directory to support upgrade/downgrade
# without error, so we need to create symlink for each script on the
# directory
install -m755 -d "$rusr"/lib/scylla/scyllatop/views
for i in $(find "$rprefix"/scripts/ -maxdepth 1 -type f); do
ln -srf $i "$rusr"/lib/scylla/
done
for i in $(find "$rprefix"/scyllatop/ -maxdepth 1 -type f); do
ln -srf $i "$rusr"/lib/scylla/scyllatop
done
for i in $(find "$rprefix"/scyllatop/views -maxdepth 1 -type f); do
ln -srf $i "$rusr"/lib/scylla/scyllatop/views
done
else
install -m755 -d "$rdata"/saved_caches
install -d -m755 "$retc"/systemd/system/scylla-server.service.d


@@ -1876,7 +1876,7 @@ bool row_marker::compact_and_expire(tombstone tomb, gc_clock::time_point now,
_timestamp = api::missing_timestamp;
return false;
}
if (_ttl > no_ttl && _expiry < now) {
if (_ttl > no_ttl && _expiry <= now) {
_expiry -= _ttl;
_ttl = dead;
}
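The one-character change above (`<` to `<=`) fixes an off-by-one at the expiry boundary: a TTL'd marker should be dead from the exact expiry instant onward. A sketch of the corrected predicate (the `DEAD` sentinel encoding is assumed for illustration):

```python
DEAD = -1  # assumed sentinel, mirroring row_marker's `dead` ttl state

def is_dead(ttl, expiry, now):
    # A marker already compacted to the dead state stays dead; otherwise a
    # TTL'd marker is dead once `now` reaches `expiry`, inclusive. Using
    # `<` would treat the marker as live for one extra tick, making
    # is_live/is_dead/compact_and_expire disagree at the boundary.
    if ttl == DEAD:
        return True
    return ttl is not None and expiry <= now
```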


@@ -679,7 +679,7 @@ public:
if (is_missing() || _ttl == dead) {
return false;
}
if (_ttl != no_ttl && _expiry < now) {
if (_ttl != no_ttl && _expiry <= now) {
return false;
}
return _timestamp > t.timestamp;
@@ -689,7 +689,7 @@ public:
if (_ttl == dead) {
return true;
}
return _ttl != no_ttl && _expiry < now;
return _ttl != no_ttl && _expiry <= now;
}
// Can be called only when is_live().
bool is_expiring() const {


@@ -321,11 +321,7 @@ struct get_sync_boundary_response {
};
// Return value of the REPAIR_GET_COMBINED_ROW_HASH RPC verb
struct get_combined_row_hash_response {
repair_hash working_row_buf_combined_csum;
// The number of rows in the working row buf
uint64_t working_row_buf_nr;
};
using get_combined_row_hash_response = repair_hash;
struct node_repair_meta_id {
gms::inet_address ip;


@@ -1098,14 +1098,14 @@ private:
_working_row_buf_combined_hash.clear();
if (_row_buf.empty()) {
return make_ready_future<get_combined_row_hash_response>(get_combined_row_hash_response{repair_hash(), 0});
return make_ready_future<get_combined_row_hash_response>(get_combined_row_hash_response());
}
return move_row_buf_to_working_row_buf().then([this] {
return do_for_each(_working_row_buf, [this] (repair_row& r) {
_working_row_buf_combined_hash.add(r.hash());
return make_ready_future<>();
}).then([this] {
return get_combined_row_hash_response{_working_row_buf_combined_hash, _working_row_buf.size()};
return get_combined_row_hash_response{_working_row_buf_combined_hash};
});
});
}
@@ -1352,7 +1352,9 @@ public:
auto source_op = get_full_row_hashes_source_op(current_hashes, remote_node, node_idx, source);
auto sink_op = get_full_row_hashes_sink_op(sink);
return when_all_succeed(std::move(source_op), std::move(sink_op));
}).then([current_hashes] () mutable {
}).then([this, current_hashes] () mutable {
stats().rx_hashes_nr += current_hashes->size();
_metrics.rx_hashes_nr += current_hashes->size();
return std::move(*current_hashes);
});
}
@@ -1763,6 +1765,7 @@ static future<stop_iteration> repair_get_row_diff_with_rpc_stream_process_op(
return make_exception_future<stop_iteration>(std::runtime_error("get_row_diff_with_rpc_stream: Inject error in handler loop"));
}
bool needs_all_rows = hash_cmd.cmd == repair_stream_cmd::needs_all_rows;
_metrics.rx_hashes_nr += current_set_diff.size();
auto fp = make_foreign(std::make_unique<std::unordered_set<repair_hash>>(std::move(current_set_diff)));
return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id, needs_all_rows, fp = std::move(fp)] {
auto rm = repair_meta::get_repair_meta(from, repair_meta_id);
@@ -2067,6 +2070,7 @@ future<> repair_init_messaging_service_handler(repair_service& rs, distributed<d
std::unordered_set<repair_hash> set_diff, bool needs_all_rows) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
_metrics.rx_hashes_nr += set_diff.size();
auto fp = make_foreign(std::make_unique<std::unordered_set<repair_hash>>(std::move(set_diff)));
return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id, fp = std::move(fp), needs_all_rows] () mutable {
auto rm = repair_meta::get_repair_meta(from, repair_meta_id);
@@ -2292,8 +2296,8 @@ private:
// are identical, there is no need to transfer each and every
// row hashes to the repair master.
return master.get_combined_row_hash(_common_sync_boundary, _all_nodes[idx]).then([&, this, idx] (get_combined_row_hash_response resp) {
rlogger.debug("Calling master.get_combined_row_hash for node {}, got combined_hash={}, rows_nr={}", _all_nodes[idx], resp.working_row_buf_combined_csum, resp.working_row_buf_nr);
combined_hashes[idx]= std::move(resp.working_row_buf_combined_csum);
rlogger.debug("Calling master.get_combined_row_hash for node {}, got combined_hash={}", _all_nodes[idx], resp);
combined_hashes[idx]= std::move(resp);
});
}).get();


@@ -931,7 +931,6 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
});
return seastar::async([this, &m, updater = std::move(updater), real_dirty_acc = std::move(real_dirty_acc)] () mutable {
coroutine update;
size_t size_entry;
// In case updater fails, we must bring the cache to consistency without deferring.
auto cleanup = defer([&m, this] {
@@ -939,6 +938,7 @@ future<> row_cache::do_update(external_updater eu, memtable& m, Updater updater)
_prev_snapshot_pos = {};
_prev_snapshot = {};
});
coroutine update; // Destroy before cleanup to release snapshots before invalidating.
partition_presence_checker is_present = _prev_snapshot->make_partition_presence_checker();
while (!m.partitions.empty()) {
with_allocator(_tracker.allocator(), [&] () {


@@ -109,7 +109,10 @@ std::optional<std::map<sstring, sstring>> schema_mutations::cdc_options() const
if (_scylla_tables) {
auto rs = query::result_set(*_scylla_tables);
if (!rs.empty()) {
return db::schema_tables::get_map<sstring, sstring>(rs.row(0), "cdc");
auto map = db::schema_tables::get_map<sstring, sstring>(rs.row(0), "cdc");
if (map && !map->empty()) {
return map;
}
}
}
return { };

Submodule seastar updated: 6f0ef32514...8e236efda9


@@ -93,6 +93,7 @@ void migration_manager::init_messaging_service()
_feature_listeners.push_back(ss.cluster_supports_view_virtual_columns().when_enabled(update_schema));
_feature_listeners.push_back(ss.cluster_supports_digest_insensitive_to_expiry().when_enabled(update_schema));
_feature_listeners.push_back(ss.cluster_supports_cdc().when_enabled(update_schema));
auto& ms = netw::get_local_messaging_service();
ms.register_definitions_update([this] (const rpc::client_info& cinfo, std::vector<frozen_mutation> fm, rpc::optional<std::vector<canonical_mutation>> cm) {
@@ -311,7 +312,8 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr
try {
for (const auto& cm : canonical_mutations) {
auto& tbl = db.find_column_family(cm.column_family_id());
mutations.emplace_back(cm.to_mutation(tbl.schema()));
mutations.emplace_back(cm.to_mutation(
tbl.schema()));
}
} catch (no_such_column_family& e) {
mlogger.error("Error while applying schema mutations from {}: {}", src, e);
@@ -902,8 +904,9 @@ future<> migration_manager::announce(std::vector<mutation> mutations, bool annou
future<> migration_manager::push_schema_mutation(const gms::inet_address& endpoint, const std::vector<mutation>& schema)
{
netw::messaging_service::msg_addr id{endpoint, 0};
auto fm = std::vector<frozen_mutation>(schema.begin(), schema.end());
auto cm = std::vector<canonical_mutation>(schema.begin(), schema.end());
auto adjusted_schema = db::schema_tables::adjust_schema_for_schema_features(schema, get_local_storage_service().cluster_schema_features());
auto fm = std::vector<frozen_mutation>(adjusted_schema.begin(), adjusted_schema.end());
auto cm = std::vector<canonical_mutation>(adjusted_schema.begin(), adjusted_schema.end());
return netw::get_local_messaging_service().send_definitions_update(id, std::move(fm), std::move(cm));
}


@@ -706,7 +706,9 @@ static future<std::optional<utils::UUID>> sleep_and_restart() {
* nodes have seen the most recent commit. Otherwise, return null.
*/
future<utils::UUID> paxos_response_handler::begin_and_repair_paxos(client_state& cs, unsigned& contentions, bool is_write) {
_proxy->get_db().local().get_config().check_experimental("Paxos");
if (!_proxy->get_db().local().get_config().check_experimental(db::experimental_features_t::LWT)) {
throw std::runtime_error("Paxos is currently disabled. Start Scylla with --experimental-features=lwt to enable.");
}
return do_with(api::timestamp_type(0), shared_from_this(), [this, &cs, &contentions, is_write]
(api::timestamp_type& min_timestamp_micros_to_use, shared_ptr<paxos_response_handler>& prh) {
return repeat_until_value([this, &contentions, &cs, &min_timestamp_micros_to_use, is_write] {
@@ -1883,8 +1885,9 @@ storage_proxy::get_paxos_participants(const sstring& ks_name, const dht::token &
});
pending_endpoints.erase(itend, pending_endpoints.end());
size_t participants = pending_endpoints.size() + natural_endpoints.size();
size_t required_participants = db::quorum_for(ks) + pending_endpoints.size();
const size_t participants = pending_endpoints.size() + natural_endpoints.size();
const size_t quorum_size = natural_endpoints.size() / 2 + 1;
const size_t required_participants = quorum_size + pending_endpoints.size();
std::vector<gms::inet_address> live_endpoints;
live_endpoints.reserve(participants);
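The hunk above changes the Paxos participant math: the quorum is now computed over the natural (non-pending) replica count only, with pending endpoints added on top. A small sketch of that arithmetic:

```python
def required_participants(natural_endpoints: int, pending_endpoints: int) -> int:
    # Quorum over natural replicas only (majority of non-pending nodes);
    # pending endpoints are then required in addition, so nodes joining
    # during a range movement still observe every commit.
    quorum = natural_endpoints // 2 + 1
    return quorum + pending_endpoints
```

For example, with 3 natural replicas and 1 pending node, 3 of the 4 participants must respond rather than the bare quorum of 2.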


@@ -344,12 +344,11 @@ std::set<sstring> storage_service::get_config_supported_features_set() {
// This should only be true in tests (see cql_test_env.cc:storage_service_for_tests)
auto& db = service::get_local_storage_service().db();
if (db.local_is_initialized()) {
auto& config = service::get_local_storage_service().db().local().get_config();
auto& config = db.local().get_config();
if (config.enable_sstables_mc_format()) {
features.insert(MC_SSTABLE_FEATURE);
}
if (config.experimental()) {
// push additional experimental features
if (config.check_experimental(db::experimental_features_t::CDC)) {
features.insert(CDC_FEATURE);
}
}
@@ -3533,6 +3532,7 @@ db::schema_features storage_service::cluster_schema_features() const {
f.set_if<db::schema_feature::VIEW_VIRTUAL_COLUMNS>(bool(_view_virtual_columns));
f.set_if<db::schema_feature::DIGEST_INSENSITIVE_TO_EXPIRY>(bool(_digest_insensitive_to_expiry));
f.set_if<db::schema_feature::COMPUTED_COLUMNS>(bool(_computed_columns));
f.set_if<db::schema_feature::CDC_OPTIONS>(bool(_cdc_feature));
return f;
}


@@ -2341,8 +2341,8 @@ public:
return bool(_mc_sstable_feature);
}
bool cluster_supports_cdc() const {
return bool(_cdc_feature);
const gms::feature& cluster_supports_cdc() const {
return _cdc_feature;
}
bool cluster_supports_row_level_repair() const {


@@ -2699,7 +2699,7 @@ entry_descriptor entry_descriptor::make_descriptor(sstring sstdir, sstring fname
static std::regex la_mc("(la|mc)-(\\d+)-(\\w+)-(.*)");
static std::regex ka("(\\w+)-(\\w+)-ka-(\\d+)-(.*)");
static std::regex dir(".*/([^/]*)/(\\w+)-[\\da-fA-F]+(?:/staging|/upload|/snapshots/[^/]+)?/?");
static std::regex dir(".*/([^/]*)/([^/]+)-[\\da-fA-F]+(?:/staging|/upload|/snapshots/[^/]+)?/?");
std::smatch match;


@@ -292,7 +292,7 @@ create_single_key_sstable_reader(column_family* cf,
filter_sstable_for_reader(sstables->select(pr), *cf, schema, pr, key, slice)
| boost::adaptors::transformed([&] (const sstables::shared_sstable& sstable) {
tracing::trace(trace_state, "Reading key {} from sstable {}", pr, seastar::value_of([&sstable] { return sstable->get_filename(); }));
return sstable->read_row_flat(schema, pr.start()->value(), slice, pc, resource_tracker, std::move(trace_state), fwd);
return sstable->read_row_flat(schema, pr.start()->value(), slice, pc, resource_tracker, trace_state, fwd);
})
);
if (readers.empty()) {
@@ -315,7 +315,7 @@ flat_mutation_reader make_range_sstable_reader(schema_ptr s,
{
auto reader_factory_fn = [s, &slice, &pc, resource_tracker, trace_state, fwd, fwd_mr, &monitor_generator]
(sstables::shared_sstable& sst, const dht::partition_range& pr) mutable {
return sst->read_range_rows_flat(s, pr, slice, pc, resource_tracker, std::move(trace_state), fwd, fwd_mr, monitor_generator(sst));
return sst->read_range_rows_flat(s, pr, slice, pc, resource_tracker, trace_state, fwd, fwd_mr, monitor_generator(sst));
};
return make_combined_reader(s, std::make_unique<incremental_reader_selector>(s,
std::move(sstables),
@@ -587,7 +587,7 @@ flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s,
auto reader_factory_fn = [s, &slice, &pc, resource_tracker, trace_state, fwd, fwd_mr, &monitor_generator]
(sstables::shared_sstable& sst, const dht::partition_range& pr) mutable {
flat_mutation_reader reader = sst->read_range_rows_flat(s, pr, slice, pc,
resource_tracker, std::move(trace_state), fwd, fwd_mr, monitor_generator(sst));
resource_tracker, trace_state, fwd, fwd_mr, monitor_generator(sst));
if (sst->is_shared()) {
using sig = bool (&)(const dht::decorated_key&);
reader = make_filtering_reader(std::move(reader), sig(belongs_to_current_shard));
@@ -2543,7 +2543,7 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema
std::move(slice),
std::move(m),
[base, views = std::move(views), lock = std::move(lock), this, timeout, source = std::move(source), &io_priority] (auto& pk, auto& slice, auto& m) mutable {
auto reader = source.make_reader(base, pk, slice, io_priority);
auto reader = source.make_reader(base, pk, slice, io_priority, nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
return this->generate_and_propagate_view_updates(base, std::move(views), std::move(m), std::move(reader)).then([lock = std::move(lock)] () mutable {
// return the local partition/row lock we have taken so it
// remains locked until the caller is done modifying this


@@ -131,6 +131,7 @@ boost_tests = [
'data_listeners_test',
'truncation_migration_test',
'like_matcher_test',
'enum_option_test',
]
other_tests = [
@@ -265,7 +266,7 @@ if __name__ == "__main__":
env['UBSAN_OPTIONS'] = 'print_stacktrace=1'
env['BOOST_TEST_CATCH_SYSTEM_ERRORS'] = 'no'
def run_test(path, type, exec_args):
def run_test(path, repeat, type, exec_args):
boost_args = []
# avoid modifying in-place, it will change test_to_run
exec_args = exec_args + '--collectd 0'.split()
@@ -274,7 +275,7 @@ if __name__ == "__main__":
mode = 'release'
if path.startswith(os.path.join('build', 'debug')):
mode = 'debug'
xmlout = (args.jenkins + "." + mode + "." + os.path.basename(path.split()[0]) + ".boost.xml")
xmlout = (args.jenkins + "." + mode + "." + os.path.basename(path.split()[0]) + "." + str(repeat) + ".boost.xml")
boost_args += ['--report_level=no', '--logger=HRF,test_suite:XML,test_suite,' + xmlout]
if type == 'boost':
boost_args += ['--']
@@ -312,8 +313,8 @@ if __name__ == "__main__":
path = test[0]
test_type = test[1]
exec_args = test[2] if len(test) >= 3 else []
for _ in range(args.repeat):
futures.append(executor.submit(run_test, path, test_type, exec_args))
for repeat in range(args.repeat):
futures.append(executor.submit(run_test, path, repeat, test_type, exec_args))
results = []
cookie = len(futures)
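The `test.py` change above threads the repeat index into the XML report name so parallel repeats of the same test stop overwriting each other's output. The naming scheme, factored out as a small sketch:

```python
import os

def xml_output_name(jenkins: str, mode: str, path: str, repeat: int) -> str:
    # One report file per (test, repeat) pair; previously all repeats of a
    # test shared a single file name and clobbered one another.
    base = os.path.basename(path.split()[0])
    return f"{jenkins}.{mode}.{base}.{repeat}.boost.xml"
```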


@@ -844,14 +844,20 @@ inline std::basic_ostream<Args...> & operator<<(std::basic_ostream<Args...> & os
}
}
namespace {
void throw_on_error(const sstring& opt, const sstring& msg, std::optional<utils::config_file::value_status> status) {
if (status != config::value_status::Invalid) {
throw std::invalid_argument(msg + " : " + opt);
}
}
} // anonymous namespace
SEASTAR_TEST_CASE(test_parse_yaml) {
config cfg;
cfg.read_from_yaml(cassandra_conf, [](auto& opt, auto& msg, auto status) {
if (status != config::value_status::Invalid) {
throw std::invalid_argument(msg + " : " + opt);
}
});
cfg.read_from_yaml(cassandra_conf, throw_on_error);
BOOST_CHECK_EQUAL(cfg.cluster_name(), "Test Cluster");
BOOST_CHECK_EQUAL(cfg.cluster_name.is_set(), true);
@@ -917,3 +923,78 @@ SEASTAR_TEST_CASE(test_parse_broken) {
return make_ready_future<>();
}
using ef = experimental_features_t;
using features = std::vector<enum_option<ef>>;
SEASTAR_TEST_CASE(test_parse_experimental_features_cdc) {
config cfg;
cfg.read_from_yaml("experimental_features:\n - cdc\n", throw_on_error);
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::CDC});
BOOST_CHECK(cfg.check_experimental(ef::CDC));
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
return make_ready_future();
}
SEASTAR_TEST_CASE(test_parse_experimental_features_lwt) {
config cfg;
cfg.read_from_yaml("experimental_features:\n - lwt\n", throw_on_error);
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::LWT});
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
BOOST_CHECK(cfg.check_experimental(ef::LWT));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
return make_ready_future();
}
SEASTAR_TEST_CASE(test_parse_experimental_features_udf) {
config cfg;
cfg.read_from_yaml("experimental_features:\n - udf\n", throw_on_error);
BOOST_CHECK_EQUAL(cfg.experimental_features(), features{ef::UDF});
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
BOOST_CHECK(cfg.check_experimental(ef::UDF));
return make_ready_future();
}
SEASTAR_TEST_CASE(test_parse_experimental_features_multiple) {
config cfg;
cfg.read_from_yaml("experimental_features:\n - cdc\n - lwt\n - cdc\n", throw_on_error);
BOOST_CHECK_EQUAL(cfg.experimental_features(), (features{ef::CDC, ef::LWT, ef::CDC}));
BOOST_CHECK(cfg.check_experimental(ef::CDC));
BOOST_CHECK(cfg.check_experimental(ef::LWT));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
return make_ready_future();
}
SEASTAR_TEST_CASE(test_parse_experimental_features_invalid) {
config cfg;
using value_status = utils::config_file::value_status;
cfg.read_from_yaml("experimental_features:\n - invalidoptiontvaluedonotuse\n",
[&cfg] (const sstring& opt, const sstring& msg, std::optional<value_status> status) {
BOOST_REQUIRE_EQUAL(opt, "experimental_features");
BOOST_REQUIRE_NE(msg.find("line 2, column 7"), msg.npos);
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
});
return make_ready_future();
}
SEASTAR_TEST_CASE(test_parse_experimental_true) {
config cfg;
cfg.read_from_yaml("experimental: true", throw_on_error);
BOOST_CHECK(cfg.check_experimental(ef::CDC));
BOOST_CHECK(cfg.check_experimental(ef::LWT));
BOOST_CHECK(cfg.check_experimental(ef::UDF));
return make_ready_future();
}
SEASTAR_TEST_CASE(test_parse_experimental_false) {
config cfg;
cfg.read_from_yaml("experimental: false", throw_on_error);
BOOST_CHECK(!cfg.check_experimental(ef::CDC));
BOOST_CHECK(!cfg.check_experimental(ef::LWT));
BOOST_CHECK(!cfg.check_experimental(ef::UDF));
return make_ready_future();
}
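Taken together, the tests above pin down the `check_experimental` semantics: the umbrella `experimental: true` enables every feature, while the `experimental_features` list enables them individually. A sketch of that logic (over a plain dict, not the real config class):

```python
def check_experimental(cfg: dict, feature: str) -> bool:
    # `experimental: true` is an umbrella switch enabling all experimental
    # features; otherwise the feature must be listed explicitly.
    if cfg.get("experimental", False):
        return True
    return feature in cfg.get("experimental_features", [])
```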


@@ -4070,6 +4070,8 @@ SEASTAR_TEST_CASE(test_like_operator_on_clustering_key) {
require_rows(e, "select s from t where s like '%c' allow filtering", {{T("abc")}});
cquery_nofail(e, "insert into t (p, s) values (2, 'acc')");
require_rows(e, "select s from t where s like '%c' allow filtering", {{T("abc")}, {T("acc")}});
cquery_nofail(e, "insert into t (p, s) values (2, 'acd')");
require_rows(e, "select s from t where p = 2 and s like '%c' allow filtering", {{T("acc")}});
});
}


@@ -352,7 +352,10 @@ public:
cfg->view_hints_directory.set(data_dir_path + "/view_hints.dir");
cfg->num_tokens.set(256);
cfg->ring_delay_ms.set(500);
cfg->experimental.set(true);
auto features = cfg->experimental_features();
features.emplace_back(db::experimental_features_t::CDC);
features.emplace_back(db::experimental_features_t::LWT);
cfg->experimental_features(features);
cfg->shutdown_announce_in_ms.set(0);
cfg->broadcast_to_all_shards().get();
create_directories((data_dir_path + "/system").c_str());

tests/enum_option_test.cc (new file, 175 lines)

@@ -0,0 +1,175 @@
/*
* Copyright (C) 2019 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#define BOOST_TEST_MODULE core
#include <boost/test/unit_test.hpp>
#include <array>
#include <boost/program_options.hpp>
#include <map>
#include <set>
#include <sstream>
#include <string>
#include <unordered_map>
#include "utils/enum_option.hh"
namespace po = boost::program_options;
namespace {
struct days {
enum enumeration { Mo, Tu, We, Th, Fr, Sa, Su };
static std::unordered_map<std::string, enumeration> map() {
return {{"Mon", Mo}, {"Tue", Tu}, {"Wed", We}, {"Thu", Th}, {"Fri", Fr}, {"Sat", Sa}, {"Sun", Su}};
}
};
template <typename T>
enum_option<T> parse(const char* value) {
po::options_description desc("Allowed options");
desc.add_options()("opt", po::value<enum_option<T>>(), "Option");
po::variables_map vm;
const char* argv[] = {"$0", "--opt", value};
po::store(po::parse_command_line(3, argv, desc), vm);
return vm["opt"].as<enum_option<T>>();
}
template <typename T>
std::string format(typename T::enumeration d) {
std::ostringstream os;
os << enum_option<T>(d);
return os.str();
}
} // anonymous namespace
BOOST_AUTO_TEST_CASE(test_parsing) {
BOOST_CHECK_EQUAL(parse<days>("Sun"), days::Su);
BOOST_CHECK_EQUAL(parse<days>("Mon"), days::Mo);
BOOST_CHECK_EQUAL(parse<days>("Tue"), days::Tu);
BOOST_CHECK_EQUAL(parse<days>("Wed"), days::We);
BOOST_CHECK_EQUAL(parse<days>("Thu"), days::Th);
BOOST_CHECK_EQUAL(parse<days>("Fri"), days::Fr);
BOOST_CHECK_EQUAL(parse<days>("Sat"), days::Sa);
}
BOOST_AUTO_TEST_CASE(test_parsing_error) {
BOOST_REQUIRE_THROW(parse<days>("Sunday"), po::invalid_option_value);
BOOST_REQUIRE_THROW(parse<days>(""), po::invalid_option_value);
BOOST_REQUIRE_THROW(parse<days>(" "), po::invalid_option_value);
BOOST_REQUIRE_THROW(parse<days>(" Sun"), po::invalid_option_value);
}
BOOST_AUTO_TEST_CASE(test_formatting) {
BOOST_CHECK_EQUAL(format<days>(days::Mo), "Mon");
BOOST_CHECK_EQUAL(format<days>(days::Tu), "Tue");
BOOST_CHECK_EQUAL(format<days>(days::We), "Wed");
BOOST_CHECK_EQUAL(format<days>(days::Th), "Thu");
BOOST_CHECK_EQUAL(format<days>(days::Fr), "Fri");
BOOST_CHECK_EQUAL(format<days>(days::Sa), "Sat");
BOOST_CHECK_EQUAL(format<days>(days::Su), "Sun");
}
BOOST_AUTO_TEST_CASE(test_formatting_unknown) {
BOOST_CHECK_EQUAL(format<days>(static_cast<days::enumeration>(77)), "?unknown");
}
namespace {
struct names {
enum enumeration { John, Jane, Jim };
static std::map<std::string, enumeration> map() {
return {{"John", John}, {"Jane", Jane}, {"James", Jim}};
}
};
} // anonymous namespace
BOOST_AUTO_TEST_CASE(test_ordered_map) {
BOOST_CHECK_EQUAL(parse<names>("James"), names::Jim);
BOOST_CHECK_EQUAL(format<names>(names::Jim), "James");
BOOST_CHECK_EQUAL(parse<names>("John"), names::John);
BOOST_CHECK_EQUAL(format<names>(names::John), "John");
BOOST_CHECK_EQUAL(parse<names>("Jane"), names::Jane);
BOOST_CHECK_EQUAL(format<names>(names::Jane), "Jane");
BOOST_CHECK_THROW(parse<names>("Jimbo"), po::invalid_option_value);
BOOST_CHECK_EQUAL(format<names>(static_cast<names::enumeration>(77)), "?unknown");
}
namespace {
struct cities {
enum enumeration { SF, TO, NY };
static std::unordered_map<std::string, enumeration> map() {
return {
{"SanFrancisco", SF}, {"SF", SF}, {"SFO", SF}, {"Frisco", SF},
{"Toronto", TO}, {"TO", TO}, {"YYZ", TO}, {"TheSix", TO},
{"NewYork", NY}, {"NY", NY}, {"NYC", NY}, {"BigApple", NY},
};
}
};
} // anonymous namespace
BOOST_AUTO_TEST_CASE(test_multiple_parse) {
BOOST_CHECK_EQUAL(parse<cities>("SanFrancisco"), cities::SF);
BOOST_CHECK_EQUAL(parse<cities>("SF"), cities::SF);
BOOST_CHECK_EQUAL(parse<cities>("SFO"), cities::SF);
BOOST_CHECK_EQUAL(parse<cities>("Frisco"), cities::SF);
BOOST_CHECK_EQUAL(parse<cities>("Toronto"), cities::TO);
BOOST_CHECK_EQUAL(parse<cities>("TO"), cities::TO);
BOOST_CHECK_EQUAL(parse<cities>("YYZ"), cities::TO);
BOOST_CHECK_EQUAL(parse<cities>("TheSix"), cities::TO);
BOOST_CHECK_EQUAL(parse<cities>("NewYork"), cities::NY);
BOOST_CHECK_EQUAL(parse<cities>("NY"), cities::NY);
BOOST_CHECK_EQUAL(parse<cities>("NYC"), cities::NY);
BOOST_CHECK_EQUAL(parse<cities>("BigApple"), cities::NY);
}
BOOST_AUTO_TEST_CASE(test_multiple_format) {
BOOST_CHECK((std::set<std::string>{"SanFrancisco", "SF", "SFO", "Frisco"}).count(format<cities>(cities::SF)));
BOOST_CHECK((std::set<std::string>{"Toronto", "TO", "YYZ", "TheSix"}).count(format<cities>(cities::TO)));
BOOST_CHECK((std::set<std::string>{"NewYork", "NY", "NYC", "BigApple"}).count(format<cities>(cities::NY)));
}
namespace {
struct numbers {
enum enumeration { ONE, TWO };
static std::unordered_map<int, enumeration> map() {
return {{1, ONE}, {2, TWO}};
}
};
} // anonymous namespace
BOOST_AUTO_TEST_CASE(test_non_string) {
BOOST_CHECK_EQUAL(parse<numbers>("1"), numbers::ONE);
BOOST_CHECK_EQUAL(parse<numbers>("2"), numbers::TWO);
BOOST_CHECK_THROW(parse<numbers>("3"), po::invalid_option_value);
BOOST_CHECK_THROW(parse<numbers>("xx"), po::invalid_option_value);
BOOST_CHECK_THROW(parse<numbers>(""), po::invalid_option_value);
BOOST_CHECK_EQUAL(format<numbers>(numbers::ONE), "1");
BOOST_CHECK_EQUAL(format<numbers>(numbers::TWO), "2");
BOOST_CHECK_EQUAL(format<numbers>(static_cast<numbers::enumeration>(77)), "?unknown");
}


@@ -1320,6 +1320,104 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_upgrade_type_change) {
assert_that(m).is_equal_to(m2);
}
// This test checks the behavior of row_marker::{is_live, is_dead, compact_and_expire}. Those functions have some
// duplicated logic that decides if a row is expired, and this test verifies that they behave the same with respect
// to TTL.
SEASTAR_THREAD_TEST_CASE(test_row_marker_expiry) {
can_gc_fn never_gc = [] (tombstone) { return false; };
auto must_be_alive = [&] (row_marker mark, gc_clock::time_point t) {
BOOST_TEST_MESSAGE(format("must_be_alive({}, {})", mark, t));
BOOST_REQUIRE(mark.is_live(tombstone(), t));
BOOST_REQUIRE(mark.is_missing() || !mark.is_dead(t));
BOOST_REQUIRE(mark.compact_and_expire(tombstone(), t, never_gc, gc_clock::time_point()));
};
auto must_be_dead = [&] (row_marker mark, gc_clock::time_point t) {
BOOST_TEST_MESSAGE(format("must_be_dead({}, {})", mark, t));
BOOST_REQUIRE(!mark.is_live(tombstone(), t));
BOOST_REQUIRE(mark.is_missing() || mark.is_dead(t));
BOOST_REQUIRE(!mark.compact_and_expire(tombstone(), t, never_gc, gc_clock::time_point()));
};
const auto timestamp = api::timestamp_type(1);
const auto t0 = gc_clock::now();
const auto t1 = t0 + 1s;
const auto t2 = t0 + 2s;
const auto t3 = t0 + 3s;
// Without a timestamp, the marker is missing (the row doesn't exist)
const row_marker m1;
must_be_dead(m1, t0);
must_be_dead(m1, t1);
must_be_dead(m1, t2);
must_be_dead(m1, t3);
// With a timestamp and without a TTL, a row_marker is always alive
const row_marker m2(timestamp);
must_be_alive(m2, t0);
must_be_alive(m2, t1);
must_be_alive(m2, t2);
must_be_alive(m2, t3);
// A row_marker becomes dead exactly at the moment of expiry
// Reproduces #4263, #5290
const auto ttl = 1s;
const row_marker m3(timestamp, ttl, t2);
must_be_alive(m3, t0);
must_be_alive(m3, t1);
must_be_dead(m3, t2);
must_be_dead(m3, t3);
}
SEASTAR_THREAD_TEST_CASE(test_querying_expired_rows) {
auto s = schema_builder("ks", "cf")
.with_column("pk", bytes_type, column_kind::partition_key)
.with_column("ck", bytes_type, column_kind::clustering_key)
.build();
auto pk = partition_key::from_singular(*s, data_value(bytes("key1")));
auto ckey1 = clustering_key::from_singular(*s, data_value(bytes("A")));
auto ckey2 = clustering_key::from_singular(*s, data_value(bytes("B")));
auto ckey3 = clustering_key::from_singular(*s, data_value(bytes("C")));
auto ttl = 1s;
auto t0 = gc_clock::now();
auto t1 = t0 + 1s;
auto t2 = t0 + 2s;
auto t3 = t0 + 3s;
auto results_at_time = [s] (const mutation& m, gc_clock::time_point t) {
auto slice = partition_slice_builder(*s)
.without_partition_key_columns()
.build();
auto opts = query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash};
return query::result_set::from_raw_result(s, slice, m.query(slice, opts, t));
};
mutation m(s, pk);
m.partition().clustered_row(*m.schema(), ckey1).apply(row_marker(api::new_timestamp(), ttl, t1));
m.partition().clustered_row(*m.schema(), ckey2).apply(row_marker(api::new_timestamp(), ttl, t2));
m.partition().clustered_row(*m.schema(), ckey3).apply(row_marker(api::new_timestamp(), ttl, t3));
assert_that(results_at_time(m, t0))
.has_size(3)
.has(a_row().with_column("ck", data_value(bytes("A"))))
.has(a_row().with_column("ck", data_value(bytes("B"))))
.has(a_row().with_column("ck", data_value(bytes("C"))));
assert_that(results_at_time(m, t1))
.has_size(2)
.has(a_row().with_column("ck", data_value(bytes("B"))))
.has(a_row().with_column("ck", data_value(bytes("C"))));
assert_that(results_at_time(m, t2))
.has_size(1)
.has(a_row().with_column("ck", data_value(bytes("C"))));
assert_that(results_at_time(m, t3)).is_empty();
}
SEASTAR_TEST_CASE(test_querying_expired_cells) {
return seastar::async([] {
auto s = schema_builder("ks", "cf")


@@ -371,10 +371,8 @@ SEASTAR_TEST_CASE(test_merging_does_not_alter_tables_which_didnt_change) {
muts2.push_back(db::schema_tables::make_scylla_tables_mutation(s0, api::new_timestamp()));
mm.announce(muts2).get();
-// SCYLLA_TABLES have additional columns so announcing its mutation
-// changes the tables
-BOOST_REQUIRE(s1 != find_table().schema());
-BOOST_REQUIRE(legacy_version != find_table().schema()->version());
+BOOST_REQUIRE(s1 == find_table().schema());
+BOOST_REQUIRE_EQUAL(legacy_version, find_table().schema()->version());
});
});
}
@@ -575,7 +573,7 @@ SEASTAR_TEST_CASE(test_prepared_statement_is_invalidated_by_schema_change) {
// We don't want schema digest to change between Scylla versions because that results in a schema disagreement
// during rolling upgrade.
-future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_dir, std::set<sstring> disabled_features, std::vector<utils::UUID> expected_digests) {
+future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_dir, std::set<sstring> disabled_features, std::vector<utils::UUID> expected_digests, std::function<void(cql_test_env& e)> extra_schema_changes) {
using namespace db;
using namespace db::schema_tables;
@@ -588,6 +586,7 @@ future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_
auto db_cfg_ptr = make_shared<db::config>();
auto& db_cfg = *db_cfg_ptr;
db_cfg.experimental_features({experimental_features_t::UDF}, db::config::config_source::CommandLine);
if (regenerate) {
db_cfg.data_file_directories({data_dir}, db::config::config_source::CommandLine);
} else {
@@ -597,7 +596,7 @@ future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_
cql_test_config cfg_in(db_cfg_ptr);
cfg_in.disabled_features = std::move(disabled_features);
-return do_with_cql_env_thread([regenerate, expected_digests = std::move(expected_digests)](cql_test_env& e) {
+return do_with_cql_env_thread([regenerate, expected_digests = std::move(expected_digests), extra_schema_changes = std::move(extra_schema_changes)] (cql_test_env& e) {
if (regenerate) {
// Exercise many different kinds of schema changes.
e.execute_cql(
@@ -613,6 +612,7 @@ future<> test_schema_digest_does_not_change_with_disabled_features(sstring data_
e.execute_cql(
"create keyspace tests2 with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };").get();
e.execute_cql("drop keyspace tests2;").get();
+extra_schema_changes(e);
}
auto expect_digest = [&] (schema_features sf, utils::UUID expected) {
@@ -673,7 +673,7 @@ SEASTAR_TEST_CASE(test_schema_digest_does_not_change) {
utils::UUID("1d91ad22-ea7c-3e7f-9557-87f0f3bb94d7"),
utils::UUID("2dcd4a37-cbb5-399b-b3c9-8eb1398b096b")
};
-return test_schema_digest_does_not_change_with_disabled_features("./tests/sstables/schema_digest_test", std::set<sstring>{"COMPUTED_COLUMNS"}, std::move(expected_digests));
+return test_schema_digest_does_not_change_with_disabled_features("./tests/sstables/schema_digest_test", std::set<sstring>{"COMPUTED_COLUMNS", "CDC"}, std::move(expected_digests), [] (cql_test_env& e) {});
}
SEASTAR_TEST_CASE(test_schema_digest_does_not_change_after_computed_columns) {
@@ -688,5 +688,26 @@ SEASTAR_TEST_CASE(test_schema_digest_does_not_change_after_computed_columns) {
utils::UUID("d58e5214-516e-3d0b-95b5-01ab71584a8d"),
utils::UUID("e1b50bed-2ab8-3759-92c7-1f4288046ae6")
};
-return test_schema_digest_does_not_change_with_disabled_features("./tests/sstables/schema_digest_test_computed_columns", std::set<sstring>{}, std::move(expected_digests));
+return test_schema_digest_does_not_change_with_disabled_features("./tests/sstables/schema_digest_test_computed_columns", std::set<sstring>{"CDC"}, std::move(expected_digests), [] (cql_test_env& e) {});
}
SEASTAR_TEST_CASE(test_schema_digest_does_not_change_with_cdc_options) {
std::vector<utils::UUID> expected_digests{
utils::UUID("a1f07f31-59d6-372a-8c94-7ea467354b39"),
utils::UUID("524d418d-a2e2-3fc3-bf45-5fb79b33c7e4"),
utils::UUID("524d418d-a2e2-3fc3-bf45-5fb79b33c7e4"),
utils::UUID("018fccba-8050-3bb9-a0a5-2b3c5f0371fe"),
utils::UUID("018fccba-8050-3bb9-a0a5-2b3c5f0371fe"),
utils::UUID("58f4254e-cc3b-3d56-8a45-167f9a3ea423"),
utils::UUID("48fda4f8-d7b5-3e59-a47a-7397989a9bf8"),
utils::UUID("8049bcfe-eb01-3a59-af33-16cef8a34b45"),
utils::UUID("2195a821-b2b8-3cb8-a179-2f5042e90841")
};
return test_schema_digest_does_not_change_with_disabled_features(
"./tests/sstables/schema_digest_test_cdc_options",
std::set<sstring>{},
std::move(expected_digests),
[] (cql_test_env& e) {
e.execute_cql("create table tests.table_cdc (pk int primary key, c1 int, c2 int) with cdc = {'enabled':'true'};").get();
});
}


@@ -0,0 +1,9 @@
CompressionInfo.db
Filter.db
Data.db
Statistics.db
TOC.txt
Digest.crc32
Scylla.db
Index.db
Summary.db


@@ -0,0 +1,9 @@
CompressionInfo.db
Filter.db
Data.db
Statistics.db
TOC.txt
Digest.crc32
Scylla.db
Index.db
Summary.db


@@ -0,0 +1,9 @@
CompressionInfo.db
Filter.db
Data.db
Statistics.db
TOC.txt
Digest.crc32
Scylla.db
Index.db
Summary.db


@@ -0,0 +1,9 @@
CompressionInfo.db
Filter.db
Data.db
Statistics.db
TOC.txt
Digest.crc32
Scylla.db
Index.db
Summary.db


@@ -0,0 +1,9 @@
CompressionInfo.db
Filter.db
Data.db
Statistics.db
TOC.txt
Digest.crc32
Scylla.db
Index.db
Summary.db


@@ -0,0 +1,9 @@
CompressionInfo.db
Filter.db
Data.db
Statistics.db
TOC.txt
Digest.crc32
Scylla.db
Index.db
Summary.db

Some files were not shown because too many files have changed in this diff.