Compare commits

...

27 Commits

Author SHA1 Message Date
Yaron Kaikov
6c0825e2a6 release: prepare for 4.6.11 2022-11-28 15:45:26 +02:00
Nadav Har'El
db3dd3bdf6 Merge 'cql3: don't ignore other restrictions when a multi column restriction is present during filtering' from Jan Ciołek
When filtering with a multi-column restriction present, all other restrictions were ignored.
So a query like:
`SELECT * FROM WHERE pk = 0 AND (ck1, ck2) < (0, 0) AND regular_col = 0 ALLOW FILTERING;`
would ignore the restriction `regular_col = 0`.

This was caused by a bug in the filtering code:
2779a171fc/cql3/selection/selection.cc (L433-L449)

When multi-column restrictions were detected, the code checked whether they were satisfied and returned immediately.
This is fixed by returning early only when these restrictions are not satisfied. When they are satisfied, the other restrictions are checked as well to ensure all of them hold.

This code was introduced back in 2019, when fixing #3574.
Perhaps back then it was impossible to mix multi-column and regular-column restrictions, and this approach was correct.

Fixes: #6200
Fixes: #12014

Closes #12031

* github.com:scylladb/scylladb:
  cql-pytest: add a reproducer for #12014, verify that filtering multi column and regular restrictions works
  boost/restrictions-test: uncomment part of the test that passes now
  cql-pytest: enable test for filtering combined multi column and regular column restrictions
  cql3: don't ignore other restrictions when a multi column restriction is present during filtering

(cherry picked from commit 2d2034ea28)

Closes #12086
2022-11-27 00:15:04 +02:00
Pavel Emelyanov
4ad24180f5 Merge '[branch-4.6] multishard_mutation_query: don't unpop partition header of spent partition ' from Botond Dénes
When stopping the read, the multishard reader will dismantle the
compaction state, pushing back (unpopping) the currently processed
partition's header to its originating reader. This ensures that if the
reader stops in the middle of a partition, on the next page the
partition-header is re-emitted as the compactor (and everything
downstream from it) expects.
It can happen, however, that there is nothing more for the current
partition in the reader and the next fragment is another partition.
Since we only push back the partition header (without a partition-end)
this can result in two partitions being emitted without being separated
by a partition end.
We could just add the missing partition-end when needed, but that is
pointless: if the partition has no more data, we can simply drop the
header, as we won't need it on the next page.

The missing partition-end can generate an "IDL frame truncated" message
as it ends up causing the query result writer to create a corrupt
partition entry.

Fixes: https://github.com/scylladb/scylladb/issues/9482

Closes #11914

* github.com:scylladb/scylladb:
  test/cql-pytest: add regression test for "IDL frame truncated" error
  mutation_compactor: detach_state(): make it no-op if partition was exhausted
  treewide: fix headers
2022-11-16 11:52:51 +03:00
Anna Mikhlin
755c7eeb6a release: prepare for 4.6.10 2022-11-14 10:30:20 +02:00
Eliran Sinvani
8914ca8c58 cql: Fix crash upon use of the word empty for service level name
Wrong access to an uninitialized token instead of the actual
generated string caused the parser to crash. This wasn't
detected by the ANTLR3 compiler because all the temporary
variables defined in the ANTLR3 statements are global in the
generated code. This essentially caused a null dereference.

Tests: 1. The fixed issue scenario from github.
       2. Unit tests in release mode.

Fixes #11774

Signed-off-by: Eliran Sinvani <eliransin@scylladb.com>
Message-Id: <20190612133151.20609-1-eliransin@scylladb.com>

Closes #11777

(cherry picked from commit ab7429b77d)
2022-11-10 20:43:44 +02:00
Botond Dénes
e82e4bbed3 test/cql-pytest: add regression test for "IDL frame truncated" error
(cherry picked from commit 11af489e84)
2022-11-07 16:51:14 +02:00
Botond Dénes
f9c457778e mutation_compactor: detach_state(): make it no-op if partition was exhausted
detach_state() allows the user to resume a compaction process later,
without having to keep the compactor object alive. This happens by
generating and returning the mutation fragments the user has to re-feed
to a newly constructed compactor to bring it into the exact same state
the current compactor was at the point of stopping the compaction.
This state includes the partition-header (partition-start and static-row
if any) and the currently active range tombstone.
Detaching the state is pointless however when the compaction was stopped
such that the currently compacted partition was completely exhausted.
Allowing the state to be detached in this case seems benign but it
caused a subtle bug in the main user of this feature: the partition
range scan algorithm, where the fragments included in the detached state
were pushed back into the reader which produced them. If the partition
happened to be exhausted -- meaning the next fragment in the reader was
a partition-start or EOS -- this resulted in the partition being
re-emitted later without a partition-end, resulting in corrupt
query-result being generated, in turn resulting in an obscure "IDL frame
truncated" error.

This patch solves this seemingly benign but sinister bug by making the
return value of `detach_state()` an std::optional and returning a
disengaged optional when the partition was exhausted.

(cherry picked from commit 70b4158ce0)
2022-11-07 16:51:14 +02:00
Botond Dénes
8315a7b164 treewide: fix headers
To fix CI.
2022-11-07 16:51:14 +02:00
Nadav Har'El
291ca8db60 cql3: fix cql3::util::maybe_quote() for keywords
cql3::util::maybe_quote() is a utility function formatting an identifier
name (table name, column name, etc.) that needs to be embedded in a CQL
statement - and might require quoting if it contains non-alphanumeric
characters, uppercase characters, or a CQL keyword.

maybe_quote() made an effort to only quote the identifier name if necessary,
e.g., a lowercase name usually does not need quoting. But lowercase names
that are CQL keywords - e.g., to or where - cannot be used as identifiers
without quoting. This can cause problems for code that wants to generate
CQL statements, such as the materialized-view problem in issue #9450 - where
a user had a column called "to" and wanted to create a materialized view
for it.

So in this patch we fix maybe_quote() to recognize invalid identifiers by
using the CQL parser, and quote them. This will quote reserved keywords,
but not so-called unreserved keywords, which *are* allowed as identifiers
and don't need quoting. This addition slows down maybe_quote(), but
maybe_quote() is anyway only used in heavy operations which need to
generate CQL.

This patch also adds two tests that reproduce the bug and verify its
fix:

1. Add to the low-level maybe_quote() test (a C++ unit test) also tests
   that maybe_quote() quotes reserved keywords like "to", but doesn't
   quote unreserved keywords like "int".

2. Add a test reproducing issue #9450 - creating a materialized view
   whose key column is a keyword. This new test passes on Cassandra,
   failed on Scylla before this patch, and passes after this patch.

It is worth noting that maybe_quote() now has a "forward compatibility"
problem: If we save CQL statements generated by maybe_quote(), and a
future version introduces a new reserved keyword, the parser of the
future version may not be able to parse the saved CQL statement that
was generated with the old maybe_quote() and didn't quote what is now
a keyword. This problem can be solved in two ways:

1. Try hard not to introduce new reserved keywords. Instead, introduce
   unreserved keywords. We've been doing this even before recognizing
   this maybe_quote() future-compatibility problem.

2. In the next patch we will introduce quote() - which unconditionally
   quotes identifier names, even if lowercase. These quoted names will
   be uglier for lowercase names - but will be safe from future
   introduction of new keywords. So we can consider switching some or
   all uses of maybe_quote() to quote().

Fixes #9450

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20220118161217.231811-1-nyh@scylladb.com>
(cherry picked from commit 5d2f694a90)
2022-11-07 10:38:10 +02:00
Jadw1
4da5fbaa24 CQL3: fromJson accepts string as bool
The problem was an incompatibility with Cassandra, which accepts a bool
given as a string in the `fromJson()` function. The remaining difference
between Cassandra and Scylla is that Scylla accepts whitespace around
the word in the string, while Cassandra doesn't. Both are case-insensitive.

Fixes: #7915
(cherry picked from commit 1902dbc9ff)
2022-11-07 10:38:10 +02:00
Takuya ASADA
fc16664d81 locator::ec2_snitch: Retry HTTP request to EC2 instance metadata service
The EC2 instance metadata service can be busy; let's retry connecting with
an interval, just like we do in scylla-machine-image.

Fixes #10250

Signed-off-by: Takuya ASADA <syuu@scylladb.com>

Closes #11688

(cherry picked from commit 6b246dc119)
(cherry picked from commit e2809674d2)
2022-11-06 15:43:58 +02:00
Botond Dénes
80bea5341e Merge 'Alternator, MV: fix bug in some view updates which set the view key to its existing value' from Nadav Har'El
As described in issue #11801, we saw cases in Alternator where, when a GSI has both a partition and a sort key that are non-key attributes in the base table, updating the GSI sort-key attribute to the same value it already had caused the entire GSI row to be deleted.

In this series we fix this bug (it was a bug in our materialized views implementation) and add a reproducing test (plus a few more tests for similar situations which worked before the patch, and continue to work after it).

Fixes #11801

Closes #11808

* github.com:scylladb/scylladb:
  test/alternator: add test for issue 11801
  MV: fix handling of view update which reassign the same key value
  materialized views: inline used-once and confusing function, replace_entry()

(cherry picked from commit e981bd4f21)
2022-11-01 13:31:51 +02:00
Botond Dénes
6ecc772b56 mutation_partition: deletable_row::apply(shadowable_tombstone): remove redundant maybe_shadow()
Shadowing is already checked by the underlying row_tombstone::apply().
This redundant check was introduced by a previous fix to #9483
(6a76e12768). The rest of that patch is
good.

Refs: #9483
Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <20211115091513.181233-1-bdenes@scylladb.com>
(cherry picked from commit b136746040)
2022-10-16 11:53:04 +03:00
Benny Halevy
0b2e951954 range_tombstone_list: insert_from: correct rev.update range_tombstone in not overlapping case
2nd std::move(start) looks like a typo
in fe2fa3f20d.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20220404124741.1775076-1-bhalevy@scylladb.com>
(cherry picked from commit 2d80057617)
2022-10-14 12:29:56 +02:00
Pavel Emelyanov
f2a738497f compaction_manager: Swallow ENOSPCs in ::stop()
When being stopped, the compaction manager may run into ENOSPC. This is
not a reason to fail the stopping process with an abort; it is better to
log a warning and proceed as if nothing happened.

refs: #11245

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
2022-10-13 16:02:33 +03:00
Pavel Emelyanov
badf7c816f exceptions: Mark storage_io_error::code() with noexcept
Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
2022-10-13 16:02:32 +03:00
Pavel Emelyanov
bfb86f2c78 compaction_manager: Shuffle really_do_stop()
Make it a future-returning method and set up the _stop_future in its
only caller. This makes the next patch much simpler.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
2022-10-13 16:02:31 +03:00
Beni Peled
18e7a46038 release: prepare for 4.6.9 2022-10-09 08:54:33 +03:00
Nadav Har'El
cbcfa31e51 cql: validate bloom_filter_fp_chance up-front
Scylla's Bloom filter implementation has a minimal false-positive rate
that it can support (6.71e-5). When setting bloom_filter_fp_chance any
lower than that, the compute_bloom_spec() function, which writes the bloom
filter, throws an exception. However, this is too late - it only happens
while flushing the memtable to disk, and a failure at that point causes
Scylla to crash.

Instead, we should refuse the table creation with the unsupported
bloom_filter_fp_chance. This is also what Cassandra did six years ago -
see CASSANDRA-11920.

This patch also includes a regression test, which crashes Scylla before
this patch but passes after the patch (and also passes on Cassandra).

Fixes #11524.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>

Closes #11576

(cherry picked from commit 4c93a694b7)
2022-10-04 16:23:25 +03:00
Nadav Har'El
5ee69ff3a9 alternator: return ProvisionedThroughput in DescribeTable
DescribeTable is currently hard-coded to return PAY_PER_REQUEST billing
mode. Nevertheless, even in PAY_PER_REQUEST mode, the DescribeTable
operation must return a ProvisionedThroughput structure, listing both
ReadCapacityUnits and WriteCapacityUnits as 0. This requirement is not
stated in some DynamoDB documentation but is explicitly mentioned in
https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_ProvisionedThroughput.html
Also, empirically, DynamoDB returns ProvisionedThroughput with zeros
even in PAY_PER_REQUEST mode. We even had an xfailing test to confirm this.

The missing ProvisionedThroughput structure was a problem for
applications like DynamoDB connectors for Spark, if they implicitly
assume that ProvisionedThroughput is returned by DescribeTable, and
fail (as described in issue #11222) if it's missing outright.

So this patch adds the missing ProvisionedThroughput structure, and
the xfailing test starts to pass.

Note that this patch doesn't change the fact that attempting to set
a table to PROVISIONED billing mode is ignored: DescribeTable continues
to always return PAY_PER_REQUEST as the billing mode and zero as the
provisioned capacities.

Fixes #11222

Signed-off-by: Nadav Har'El <nyh@scylladb.com>

Closes #11298

(cherry picked from commit 941c719a23)
2022-10-03 14:29:22 +03:00
Tomasz Grabiec
949103d22a test: lib: random_mutation_generator: Don't generate mutations with marker uncompacted with shadowable tombstone
The generator first set the marker and then applied the tombstones.

The marker was set like this:

  row.marker() = random_row_marker();

Later, when shadowable tombstones were applied, they were compacted
with the marker as expected.

However, the key for the row was chosen randomly in each iteration from
multiple possible keys, so there was a possibility of a key clash with an
earlier row. This could overwrite the marker without applying any
tombstones, since applying them is conditional on a random choice.

This could generate rows with markers uncompacted with shadowable tombstones.

This broke row_cache_test::test_concurrent_reads_and_eviction on
comparison between expected and read mutations. The latter was
compacted because it went through an extra merge path, which compacts
the row.

Fix by making sure there are no key clashes.

Closes #11663

(cherry picked from commit 5268f0f837)
2022-10-03 09:00:28 +03:00
Botond Dénes
549cb60f4c sstables: crawling mx-reader: make on_out_of_clustering_range() no-op
Said method currently emits a partition-end. This method is only called
when the last fragment in the stream is a range tombstone change with a
position after all clustered rows. The problem is that
consume_partition_end() is also called unconditionally, resulting in two
partition-end fragments being emitted. The fix is simple: make this
method a no-op, there is nothing to do there.

Also add two tests: one targeted to this bug and another one testing the
crawling reader with random mutations generated for random schema.

Fixes: #11421

Closes #11422

(cherry picked from commit be9d1c4df4)
2022-09-30 17:56:58 +03:00
Botond Dénes
37633c5576 test/lib/random_schema: add a simpler overload for fixed partition count
Some tests want to generate a fixed number of random partitions; make
their life easier.

(cherry picked from commit 98f3d516a2)

Ref #11421 (prerequisite)
2022-09-30 17:56:10 +03:00
Michael Livshin
abd9f43fa7 batchlog_manager: warn when a batch fails to replay
Only for reasons other than "no such KS", i.e. when the failure is
presumed transient and the batch in question is not deleted from
batchlog and will be retried in the future.

(Would info be more appropriate here than warning?)

Signed-off-by: Michael Livshin <michael.livshin@scylladb.com>

Closes #10556

Fixes #10636

(cherry picked from commit 00ed4ac74c)
2022-09-29 12:13:21 +03:00
Raphael S. Carvalho
d41d4db5c0 compaction: Make cleanup withstand better disk pressure scenario
It's not uncommon for cleanup to be issued against an entire keyspace,
which may be composed of tons of tables. To increase chances of success
if low on space, cleanup will now start from smaller tables first, such
that bigger tables will have more space available, once they're reached,
to satisfy their space requirement.

parallel_for_each() is dropped; it wasn't needed, given that the manager
performs per-shard serialization of cleanup jobs.

Refs #9504.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20211130133712.64517-1-raphaelsc@scylladb.com>
(cherry picked from commit 0d5ac845e1)
2022-09-29 10:15:29 +03:00
Michał Radwański
c500043a78 flat_mutation_reader: allow destructing readers which are not closed and didn't initiate any IO.
In functions such as upgrade_to_v2 (excerpt below), if the constructor
of transforming_reader throws, r needs to be destroyed even though it
hasn't been closed. However, a reader that didn't start any operations
is safe to destroy. This issue can potentially manifest itself in many
more readers and might be hard to track down. This commit adds a bool
indicating whether a close is anticipated, thus avoiding errors in the
destructor.

Code excerpt:
flat_mutation_reader_v2 upgrade_to_v2(flat_mutation_reader r) {
    class transforming_reader : public flat_mutation_reader_v2::impl {
        // ...
    };
    return make_flat_mutation_reader_v2<transforming_reader>(std::move(r));
}

Fixes #9065.

(cherry picked from commit 9ada63a9cb)
2022-09-29 09:40:07 +03:00
Pavel Emelyanov
af4752a526 messaging_service: Fix gossiper verb group
When configuring tcp-nodelay unconditionally, the messaging service
thought the gossiper used verb group index 1, though that had changed
some time ago and those verbs now belong to group 0.

fixes: #11465

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
(cherry picked from commit 2c74062962)
2022-09-19 10:32:49 +03:00
45 changed files with 591 additions and 78 deletions

View File

@@ -60,7 +60,7 @@ fi
# Default scylla product/version tags
PRODUCT=scylla
VERSION=4.6.8
VERSION=4.6.11
if test -f version
then

View File

@@ -415,6 +415,11 @@ future<executor::request_return_type> executor::describe_table(client_state& cli
rjson::add(table_description, "BillingModeSummary", rjson::empty_object());
rjson::add(table_description["BillingModeSummary"], "BillingMode", "PAY_PER_REQUEST");
rjson::add(table_description["BillingModeSummary"], "LastUpdateToPayPerRequestDateTime", rjson::value(creation_date_seconds));
// In PAY_PER_REQUEST billing mode, provisioned capacity should return 0
rjson::add(table_description, "ProvisionedThroughput", rjson::empty_object());
rjson::add(table_description["ProvisionedThroughput"], "ReadCapacityUnits", 0);
rjson::add(table_description["ProvisionedThroughput"], "WriteCapacityUnits", 0);
rjson::add(table_description["ProvisionedThroughput"], "NumberOfDecreasesToday", 0);
std::unordered_map<std::string,std::string> key_attribute_types;
// Add base table's KeySchema and collect types for AttributeDefinitions:

View File

@@ -604,15 +604,21 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return make_exception_future<json::json_return_type>(
std::runtime_error("Can not perform cleanup operation when topology changes"));
}
return ctx.db.invoke_on_all([keyspace, column_families] (database& db) {
std::vector<column_family*> column_families_vec;
auto& cm = db.get_compaction_manager();
for (auto cf : column_families) {
column_families_vec.push_back(&db.find_column_family(keyspace, cf));
}
return parallel_for_each(column_families_vec, [&cm, &db] (column_family* cf) {
return cm.perform_cleanup(db, cf);
return ctx.db.invoke_on_all([keyspace, column_families] (database& db) -> future<> {
auto table_ids = boost::copy_range<std::vector<utils::UUID>>(column_families | boost::adaptors::transformed([&] (auto& table_name) {
return db.find_uuid(keyspace, table_name);
}));
// cleanup smaller tables first, to increase chances of success if low on space.
std::ranges::sort(table_ids, std::less<>(), [&] (const utils::UUID& id) {
return db.find_column_family(id).get_stats().live_disk_space_used;
});
auto& cm = db.get_compaction_manager();
// as a table can be dropped during loop below, let's find it before issuing the cleanup request.
for (auto& id : table_ids) {
table& t = db.find_column_family(id);
co_await cm.perform_cleanup(db, &t);
}
co_return;
}).then([]{
return make_ready_future<json::json_return_type>(0);
});

View File

@@ -527,16 +527,11 @@ future<> compaction_manager::stop() {
}
}
void compaction_manager::really_do_stop() {
if (_state == state::none || _state == state::stopped) {
return;
}
_state = state::stopped;
future<> compaction_manager::really_do_stop() {
cmlog.info("Asked to stop");
// Reset the metrics registry
_metrics.clear();
_stop_future.emplace(stop_ongoing_compactions("shutdown").then([this] () mutable {
return stop_ongoing_compactions("shutdown").then([this] () mutable {
reevaluate_postponed_compactions();
return std::move(_waiting_reevalution);
}).then([this] {
@@ -544,12 +539,34 @@ void compaction_manager::really_do_stop() {
_compaction_submission_timer.cancel();
cmlog.info("Stopped");
return _compaction_controller.shutdown();
}));
});
}
template <typename Ex>
requires std::is_base_of_v<std::exception, Ex> &&
requires (const Ex& ex) {
{ ex.code() } noexcept -> std::same_as<const std::error_code&>;
}
auto swallow_enospc(const Ex& ex) noexcept {
if (ex.code().value() != ENOSPC) {
return make_exception_future<>(std::make_exception_ptr(ex));
}
cmlog.warn("Got ENOSPC on stop, ignoring...");
return make_ready_future<>();
}
void compaction_manager::do_stop() noexcept {
if (_state == state::none || _state == state::stopped) {
return;
}
try {
really_do_stop();
_state = state::stopped;
_stop_future = really_do_stop()
.handle_exception_type([] (const std::system_error& ex) { return swallow_enospc(ex); })
.handle_exception_type([] (const storage_io_error& ex) { return swallow_enospc(ex); })
;
} catch (...) {
try {
cmlog.error("Failed to stop the manager: {}", std::current_exception());

View File

@@ -209,7 +209,7 @@ public:
// Stop all fibers, without waiting. Safe to be called multiple times.
void do_stop() noexcept;
void really_do_stop();
future<> really_do_stop();
// Submit a column family to be compacted.
void submit(column_family* cf);

View File

@@ -1403,7 +1403,7 @@ serviceLevelOrRoleName returns [sstring name]
std::transform($name.begin(), $name.end(), $name.begin(), ::tolower); }
| t=STRING_LITERAL { $name = sstring($t.text); }
| t=QUOTED_NAME { $name = sstring($t.text); }
| k=unreserved_keyword { $name = sstring($t.text);
| k=unreserved_keyword { $name = k;
std::transform($name.begin(), $name.end(), $name.begin(), ::tolower);}
| QMARK {add_recognition_error("Bind variables cannot be used for service levels or role names");}
;

View File

@@ -25,6 +25,7 @@
#include "cql3_type.hh"
#include "cql3/util.hh"
#include "exceptions/exceptions.hh"
#include "ut_name.hh"
#include "database.hh"
#include "user_types_metadata.hh"
@@ -448,7 +449,20 @@ sstring maybe_quote(const sstring& identifier) {
}
if (!need_quotes) {
return identifier;
// A seemingly valid identifier matching [a-z][a-z0-9_]* may still
// need quoting if it is a CQL keyword, e.g., "to" (see issue #9450).
// While our parser Cql.g has different production rules for different
// types of identifiers (column names, table names, etc.), all of
// these behave identically for alphanumeric strings: they exclude
// many keywords but allow keywords listed as "unreserved keywords".
// So we can use any of them, for example cident.
try {
cql3::util::do_with_parser(identifier, std::mem_fn(&cql3_parser::CqlParser::cident));
return identifier;
} catch(exceptions::syntax_exception&) {
// This alphanumeric string is not a valid identifier, so fall
// through to have it quoted:
}
}
if (num_quotes == 0) {
return make_sstring("\"", identifier, "\"");

View File

@@ -450,11 +450,16 @@ bool result_set_builder::restrictions_filter::do_filter(const selection& selecti
}
auto clustering_columns_restrictions = _restrictions->get_clustering_columns_restrictions();
if (dynamic_pointer_cast<cql3::restrictions::multi_column_restriction>(clustering_columns_restrictions)) {
bool has_multi_col_clustering_restrictions =
dynamic_pointer_cast<cql3::restrictions::multi_column_restriction>(clustering_columns_restrictions) != nullptr;
if (has_multi_col_clustering_restrictions) {
clustering_key_prefix ckey = clustering_key_prefix::from_exploded(clustering_key);
return expr::is_satisfied_by(
bool multi_col_clustering_satisfied = expr::is_satisfied_by(
clustering_columns_restrictions->expression,
partition_key, clustering_key, static_row, row, selection, _options);
if (!multi_col_clustering_satisfied) {
return false;
}
}
auto static_row_iterator = static_row.iterator();
@@ -502,6 +507,13 @@ bool result_set_builder::restrictions_filter::do_filter(const selection& selecti
if (_skip_ck_restrictions) {
continue;
}
if (has_multi_col_clustering_restrictions) {
// Mixing multi column and single column restrictions on clustering
// key columns is forbidden.
// Since there are multi column restrictions we have to skip
// evaluating single column restrictions or we will get an error.
continue;
}
auto clustering_key_restrictions_map = _restrictions->get_single_column_clustering_key_restrictions();
auto restr_it = clustering_key_restrictions_map.find(cdef);
if (restr_it == clustering_key_restrictions_map.end()) {

View File

@@ -46,6 +46,7 @@
#include "cdc/cdc_extension.hh"
#include "gms/feature.hh"
#include "gms/feature_service.hh"
#include "utils/bloom_calculations.hh"
#include <boost/algorithm/string/predicate.hpp>
@@ -168,6 +169,16 @@ void cf_prop_defs::validate(const database& db, const schema::extensions_map& sc
throw exceptions::configuration_exception(KW_MAX_INDEX_INTERVAL + " must be greater than " + KW_MIN_INDEX_INTERVAL);
}
if (get_simple(KW_BF_FP_CHANCE)) {
double bloom_filter_fp_chance = get_double(KW_BF_FP_CHANCE, 0/*not used*/);
double min_bloom_filter_fp_chance = utils::bloom_calculations::min_supported_bloom_filter_fp_chance();
if (bloom_filter_fp_chance <= min_bloom_filter_fp_chance || bloom_filter_fp_chance > 1.0) {
throw exceptions::configuration_exception(format(
"{} must be larger than {} and less than or equal to 1.0 (got {})",
KW_BF_FP_CHANCE, min_bloom_filter_fp_chance, bloom_filter_fp_chance));
}
}
speculative_retry::from_sstring(get_string(KW_SPECULATIVE_RETRY, speculative_retry(speculative_retry::type::NONE, 0).to_sstring()));
}

View File

@@ -31,6 +31,8 @@
#include "types/listlike_partial_deserializing_iterator.hh"
#include "utils/managed_bytes.hh"
#include "exceptions/exceptions.hh"
#include <boost/algorithm/string/trim_all.hpp>
#include <boost/algorithm/string.hpp>
static inline bool is_control_char(char c) {
return c >= 0 && c <= 0x1F;
@@ -212,6 +214,17 @@ struct from_json_object_visitor {
}
bytes operator()(const boolean_type_impl& t) {
if (!value.IsBool()) {
if (value.IsString()) {
std::string str(rjson::to_string_view(value));
boost::trim_all(str);
boost::to_lower(str);
if (str == "true") {
return t.decompose(true);
} else if (str == "false") {
return t.decompose(false);
}
}
throw marshal_exception(format("Invalid JSON object {}", value));
}
return t.decompose(value.GetBool());

View File

@@ -87,6 +87,13 @@ std::unique_ptr<cql3::statements::raw::select_statement> build_select_statement(
/// forbids non-alpha-numeric characters in identifier names.
/// Quoting involves wrapping the string in double-quotes ("). A double-quote
/// character itself is quoted by doubling it.
/// maybe_quote() also quotes reserved CQL keywords (e.g., "to", "where")
/// but doesn't quote *unreserved* keywords (like ttl, int or as).
/// Note that this means that if new reserved keywords are added to the
/// parser, a saved output of maybe_quote() may no longer be parsable by
/// parser. To avoid this forward-compatibility issue, use quote() instead
/// of maybe_quote() - to unconditionally quote an identifier even if it is
/// lowercase and not (yet) a keyword.
sstring maybe_quote(const sstring& s);
// Check whether timestamp is not too far in the future as this probably

View File

@@ -39,6 +39,7 @@
*/
#include <chrono>
#include <exception>
#include <seastar/core/future-util.hh>
#include <seastar/core/do_with.hh>
#include <seastar/core/semaphore.hh>
@@ -306,6 +307,7 @@ future<> db::batchlog_manager::replay_all_failed_batches() {
} catch (no_such_keyspace& ex) {
// should probably ignore and drop the batch
} catch (...) {
blogger.warn("Replay failed (will retry): {}", std::current_exception());
// timeout, overload etc.
// Do _not_ remove the batch, assuning we got a node write error.
// Since we don't have hints (which origin is satisfied with),

View File

@@ -891,13 +891,18 @@ void view_updates::generate_update(
bool same_row = true;
for (auto col_id : col_ids) {
auto* after = update.cells().find_cell(col_id);
// Note: multi-cell columns can't be part of the primary key.
auto& cdef = _base->regular_column_at(col_id);
if (existing) {
auto* before = existing->cells().find_cell(col_id);
// Note that this cell is necessarily atomic, because col_ids are
// view key columns, and keys must be atomic.
if (before && before->as_atomic_cell(cdef).is_live()) {
if (after && after->as_atomic_cell(cdef).is_live()) {
auto cmp = compare_atomic_cell_for_merge(before->as_atomic_cell(cdef), after->as_atomic_cell(cdef));
// We need to compare just the values of the keys, not
// metadata like the timestamp. This is because below,
// if the old and new view row have the same key, we need
// to be sure to reach the update_entry() case.
auto cmp = compare_unsigned(before->as_atomic_cell(cdef).value(), after->as_atomic_cell(cdef).value());
if (cmp != 0) {
same_row = false;
}
@@ -917,7 +922,13 @@ void view_updates::generate_update(
if (same_row) {
update_entry(base_key, update, *existing, now);
} else {
replace_entry(base_key, update, *existing, now);
// This code doesn't work if the old and new view row have the
// same key, because if they do we get both data and tombstone
// for the same timestamp (now) and the tombstone wins. This
// is why we need the "same_row" case above - it's not just a
// performance optimization.
delete_old_entry(base_key, *existing, update, now);
create_entry(base_key, update, now);
}
} else {
delete_old_entry(base_key, *existing, update, now);

View File

@@ -164,10 +164,7 @@ private:
void delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now);
void do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now);
void update_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now);
void replace_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now) {
create_entry(base_key, update, now);
delete_old_entry(base_key, existing, update, now);
}
void update_entry_for_computed_column(const partition_key& base_key, const clustering_row& update, const std::optional<clustering_row>& existing, gc_clock::time_point now);
};
class view_update_builder {


@@ -45,7 +45,7 @@
logging::logger fmr_logger("flat_mutation_reader");
flat_mutation_reader& flat_mutation_reader::operator=(flat_mutation_reader&& o) noexcept {
if (_impl) {
if (_impl && _impl->is_close_required()) {
impl* ip = _impl.get();
// Abort to enforce calling close() before readers are closed
// to prevent leaks and potential use-after-free due to background
@@ -58,7 +58,7 @@ flat_mutation_reader& flat_mutation_reader::operator=(flat_mutation_reader&& o)
}
flat_mutation_reader::~flat_mutation_reader() {
if (_impl) {
if (_impl && _impl->is_close_required()) {
impl* ip = _impl.get();
// Abort to enforce calling close() before readers are closed
// to prevent leaks and potential use-after-free due to background
@@ -1344,7 +1344,7 @@ void mutation_fragment_stream_validating_filter::on_end_of_stream() {
}
flat_mutation_reader_v2& flat_mutation_reader_v2::operator=(flat_mutation_reader_v2&& o) noexcept {
if (_impl) {
if (_impl && _impl->is_close_required()) {
impl* ip = _impl.get();
// Abort to enforce calling close() before readers are closed
// to prevent leaks and potential use-after-free due to background
@@ -1357,7 +1357,7 @@ flat_mutation_reader_v2& flat_mutation_reader_v2::operator=(flat_mutation_reader
}
flat_mutation_reader_v2::~flat_mutation_reader_v2() {
if (_impl) {
if (_impl && _impl->is_close_required()) {
impl* ip = _impl.get();
// Abort to enforce calling close() before readers are closed
// to prevent leaks and potential use-after-free due to background


@@ -142,6 +142,7 @@ public:
private:
tracked_buffer _buffer;
size_t _buffer_size = 0;
bool _close_required = false;
protected:
size_t max_buffer_size_in_bytes = default_max_buffer_size_in_bytes();
bool _end_of_stream = false;
@@ -175,6 +176,8 @@ public:
bool is_end_of_stream() const { return _end_of_stream; }
bool is_buffer_empty() const { return _buffer.empty(); }
bool is_buffer_full() const { return _buffer_size >= max_buffer_size_in_bytes; }
bool is_close_required() const { return _close_required; }
void set_close_required() { _close_required = true; }
static constexpr size_t default_max_buffer_size_in_bytes() { return 8 * 1024; }
mutation_fragment pop_mutation_fragment() {
@@ -506,9 +509,15 @@ public:
//
// Can be used to skip over entire partitions if interleaved with
// `operator()()` calls.
future<> next_partition() { return _impl->next_partition(); }
future<> next_partition() {
_impl->set_close_required();
return _impl->next_partition();
}
future<> fill_buffer() { return _impl->fill_buffer(); }
future<> fill_buffer() {
_impl->set_close_required();
return _impl->fill_buffer();
}
// Changes the range of partitions to pr. The range can only be moved
// forwards. pr.begin() needs to be larger than pr.end() of the previously
@@ -517,6 +526,7 @@ public:
// pr needs to be valid until the reader is destroyed or fast_forward_to()
// is called again.
future<> fast_forward_to(const dht::partition_range& pr) {
_impl->set_close_required();
return _impl->fast_forward_to(pr);
}
// Skips to a later range of rows.
@@ -546,6 +556,7 @@ public:
// In particular one must first enter a partition by fetching a `partition_start`
// fragment before calling `fast_forward_to`.
future<> fast_forward_to(position_range cr) {
_impl->set_close_required();
return _impl->fast_forward_to(std::move(cr));
}
// Closes the reader.
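The protocol these hunks implement — destruction without `close()` is only an error once the reader has started an operation — can be sketched in a few lines. This is a Python analogue of the idea, not the actual C++:

```python
# Sketch of the close-required protocol: the flag is set by the operations
# that may leave background work behind (fill_buffer, next_partition,
# fast_forward_to); a reader that never ran any of them may be destroyed
# without close(), which is exactly what the relaxed destructor check allows.
class Reader:
    def __init__(self):
        self._close_required = False
        self._closed = False

    def fill_buffer(self):
        self._close_required = True  # operation started; close() now mandatory

    def close(self):
        self._closed = True

    def destroy(self):
        # stand-in for the C++ destructor's abort-on-leak check
        if self._close_required and not self._closed:
            raise RuntimeError("reader destroyed without close()")

r = Reader()
r.destroy()        # never used: fine
r2 = Reader()
r2.fill_buffer()
try:
    r2.destroy()   # used but not closed: error
except RuntimeError:
    pass
```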


@@ -177,6 +177,7 @@ public:
private:
tracked_buffer _buffer;
size_t _buffer_size = 0;
bool _close_required = false;
protected:
size_t max_buffer_size_in_bytes = default_max_buffer_size_in_bytes();
@@ -216,6 +217,8 @@ public:
bool is_end_of_stream() const { return _end_of_stream; }
bool is_buffer_empty() const { return _buffer.empty(); }
bool is_buffer_full() const { return _buffer_size >= max_buffer_size_in_bytes; }
bool is_close_required() const { return _close_required; }
void set_close_required() { _close_required = true; }
static constexpr size_t default_max_buffer_size_in_bytes() { return 8 * 1024; }
mutation_fragment_v2 pop_mutation_fragment() {
@@ -547,9 +550,15 @@ public:
//
// Can be used to skip over entire partitions if interleaved with
// `operator()()` calls.
future<> next_partition() { return _impl->next_partition(); }
future<> next_partition() {
_impl->set_close_required();
return _impl->next_partition();
}
future<> fill_buffer() { return _impl->fill_buffer(); }
future<> fill_buffer() {
_impl->set_close_required();
return _impl->fill_buffer();
}
// Changes the range of partitions to pr. The range can only be moved
// forwards. pr.begin() needs to be larger than pr.end() of the previously
@@ -558,6 +567,7 @@ public:
// pr needs to be valid until the reader is destroyed or fast_forward_to()
// is called again.
future<> fast_forward_to(const dht::partition_range& pr) {
_impl->set_close_required();
return _impl->fast_forward_to(pr);
}
// Skips to a later range of rows.
@@ -587,6 +597,7 @@ public:
// In particular one must first enter a partition by fetching a `partition_start`
// fragment before calling `fast_forward_to`.
future<> fast_forward_to(position_range cr) {
_impl->set_close_required();
return _impl->fast_forward_to(std::move(cr));
}
// Closes the reader.


@@ -1,5 +1,7 @@
#include "locator/ec2_snitch.hh"
#include <seastar/core/seastar.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/do_with.hh>
#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/split.hpp>
@@ -67,6 +69,30 @@ future<> ec2_snitch::start() {
}
future<sstring> ec2_snitch::aws_api_call(sstring addr, uint16_t port, sstring cmd) {
return do_with(int(0), [this, addr, port, cmd] (int& i) {
return repeat_until_value([this, addr, port, cmd, &i]() -> future<std::optional<sstring>> {
++i;
return aws_api_call_once(addr, port, cmd).then([] (auto res) {
return make_ready_future<std::optional<sstring>>(std::move(res));
}).handle_exception([&i] (auto ep) {
try {
std::rethrow_exception(ep);
} catch (const std::system_error &e) {
logger().error(e.what());
if (i >= AWS_API_CALL_RETRIES - 1) {
logger().error("Maximum number of retries exceeded");
throw e;
}
}
return sleep(AWS_API_CALL_RETRY_INTERVAL).then([] {
return make_ready_future<std::optional<sstring>>(std::nullopt);
});
});
});
});
}
future<sstring> ec2_snitch::aws_api_call_once(sstring addr, uint16_t port, sstring cmd) {
return connect(socket_address(inet_address{addr}, port))
.then([this, addr, cmd] (connected_socket fd) {
_sd = std::move(fd);
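The retry loop added to `aws_api_call` — up to `AWS_API_CALL_RETRIES` attempts with a sleep between failures, re-raising the last error — follows a pattern that can be sketched synchronously. The names below are illustrative, not the snitch's API:

```python
import time

# Bounded-retry sketch mirroring the repeat_until_value() loop in
# aws_api_call(): N attempts total, sleep between failures, re-raise
# once the attempt budget is exhausted.
def call_with_retries(fn, retries=5, interval=0.0):
    for attempt in range(retries):
        try:
            return fn()
        except OSError:
            if attempt >= retries - 1:
                raise  # maximum number of retries exceeded
            time.sleep(interval)

attempts = {"n": 0}
def flaky():
    # fails twice, then succeeds -- like a metadata server that is
    # briefly unreachable at boot
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise OSError("metadata server not ready")
    return "us-east-1a"

assert call_with_retries(flaky) == "us-east-1a"
assert attempts["n"] == 3
```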


@@ -29,6 +29,8 @@ public:
static constexpr const char* ZONE_NAME_QUERY_REQ = "/latest/meta-data/placement/availability-zone";
static constexpr const char* AWS_QUERY_SERVER_ADDR = "169.254.169.254";
static constexpr uint16_t AWS_QUERY_SERVER_PORT = 80;
static constexpr int AWS_API_CALL_RETRIES = 5;
static constexpr auto AWS_API_CALL_RETRY_INTERVAL = std::chrono::seconds{5};
ec2_snitch(const sstring& fname = "", unsigned io_cpu_id = 0);
virtual future<> start() override;
@@ -45,5 +47,6 @@ private:
output_stream<char> _out;
http_response_parser _parser;
sstring _zone_req;
future<sstring> aws_api_call_once(sstring addr, uint16_t port, const sstring cmd);
};
} // namespace locator


@@ -30,6 +30,7 @@
#include <seastar/core/io_priority_class.hh>
class memtable;
class reader_permit;
class flat_mutation_reader;
namespace sstables {


@@ -442,6 +442,8 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::GOSSIP_ECHO:
case messaging_verb::GOSSIP_GET_ENDPOINT_STATES:
case messaging_verb::GET_SCHEMA_VERSION:
// ATTN -- if moving GOSSIP_ verbs elsewhere, mind updating the tcp_nodelay
// setting in get_rpc_client(), which assumes gossiper verbs live in idx 0
return 0;
case messaging_verb::PREPARE_MESSAGE:
case messaging_verb::PREPARE_DONE_MESSAGE:
@@ -689,7 +691,7 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
}();
auto must_tcp_nodelay = [&] {
if (idx == 1) {
if (idx == 0) {
return true; // gossip
}
if (_cfg.tcp_nodelay == tcp_nodelay_what::local) {
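The one-character fix above matters because of the verb-to-connection-index mapping in the earlier hunk: gossip verbs return index 0, so the `tcp_nodelay` predicate must test `idx == 0`. A toy model of the invariant (the verb set is abbreviated):

```python
# Toy model of the verb -> rpc-client-index mapping and the fixed check.
# Only the invariant matters here: gossip verbs map to index 0, and that
# is the index which must force TCP_NODELAY.
GOSSIP_IDX = 0

def rpc_client_idx(verb):
    if verb in {"GOSSIP_ECHO", "GOSSIP_GET_ENDPOINT_STATES", "GET_SCHEMA_VERSION"}:
        return GOSSIP_IDX
    return 1

def must_tcp_nodelay(idx):
    return idx == GOSSIP_IDX  # was `idx == 1` before the fix

assert must_tcp_nodelay(rpc_client_idx("GOSSIP_ECHO"))
assert not must_tcp_nodelay(rpc_client_idx("PREPARE_MESSAGE"))
```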


@@ -283,8 +283,8 @@ public:
future<> lookup_readers(db::timeout_clock::time_point timeout);
future<> save_readers(flat_mutation_reader::tracked_buffer unconsumed_buffer, detached_compaction_state compaction_state,
std::optional<clustering_key_prefix> last_ckey);
future<> save_readers(flat_mutation_reader::tracked_buffer unconsumed_buffer, std::optional<detached_compaction_state> compaction_state,
dht::decorated_key last_pkey, std::optional<clustering_key_prefix> last_ckey);
future<> stop();
};
@@ -583,19 +583,22 @@ future<> read_context::lookup_readers(db::timeout_clock::time_point timeout) {
});
}
future<> read_context::save_readers(flat_mutation_reader::tracked_buffer unconsumed_buffer, detached_compaction_state compaction_state,
std::optional<clustering_key_prefix> last_ckey) {
future<> read_context::save_readers(flat_mutation_reader::tracked_buffer unconsumed_buffer, std::optional<detached_compaction_state> compaction_state,
dht::decorated_key last_pkey, std::optional<clustering_key_prefix> last_ckey) {
if (_cmd.query_uuid == utils::UUID{}) {
return make_ready_future<>();
}
auto last_pkey = compaction_state.partition_start.key();
const auto cb_stats = dismantle_combined_buffer(std::move(unconsumed_buffer), last_pkey);
tracing::trace(_trace_state, "Dismantled combined buffer: {}", cb_stats);
const auto cs_stats = dismantle_compaction_state(std::move(compaction_state));
tracing::trace(_trace_state, "Dismantled compaction state: {}", cs_stats);
auto cs_stats = dismantle_buffer_stats{};
if (compaction_state) {
cs_stats = dismantle_compaction_state(std::move(*compaction_state));
tracing::trace(_trace_state, "Dismantled compaction state: {}", cs_stats);
} else {
tracing::trace(_trace_state, "No compaction state to dismantle, partition exhausted");
}
return do_with(std::move(last_pkey), std::move(last_ckey), [this] (const dht::decorated_key& last_pkey,
const std::optional<clustering_key_prefix>& last_ckey) {
@@ -703,7 +706,9 @@ future<typename ResultBuilder::result_type> do_query(
std::move(result_builder));
if (compaction_state->are_limits_reached() || result.is_short_read()) {
co_await ctx->save_readers(std::move(unconsumed_buffer), std::move(*compaction_state).detach_state(), std::move(last_ckey));
// Must be called before calling `detach_state()`.
auto last_pkey = *compaction_state->current_partition();
co_await ctx->save_readers(std::move(unconsumed_buffer), std::move(*compaction_state).detach_state(), std::move(last_pkey), std::move(last_ckey));
}
co_await ctx->stop();


@@ -175,6 +175,9 @@ class compact_mutation_state {
std::unique_ptr<mutation_compactor_garbage_collector> _collector;
compaction_stats _stats;
// Remember if we requested to stop mid-partition.
stop_iteration _stop = stop_iteration::no;
private:
static constexpr bool only_live() {
return OnlyLive == emit_only_live_rows::yes;
@@ -270,6 +273,7 @@ public:
}
void consume_new_partition(const dht::decorated_key& dk) {
_stop = stop_iteration::no;
auto& pk = dk.key();
_dk = &dk;
_return_static_content_on_partition_with_no_rows =
@@ -323,9 +327,9 @@ public:
_static_row_live = is_live;
if (is_live || (!only_live() && !sr.empty())) {
partition_is_not_empty(consumer);
return consumer.consume(std::move(sr), current_tombstone, is_live);
_stop = consumer.consume(std::move(sr), current_tombstone, is_live);
}
return stop_iteration::no;
return _stop;
}
template <typename Consumer, typename GCConsumer>
@@ -370,23 +374,22 @@ public:
if (only_live() && is_live) {
partition_is_not_empty(consumer);
auto stop = consumer.consume(std::move(cr), t, true);
_stop = consumer.consume(std::move(cr), t, true);
if (++_rows_in_current_partition == _current_partition_limit) {
return stop_iteration::yes;
_stop = stop_iteration::yes;
}
return stop;
return _stop;
} else if (!only_live()) {
auto stop = stop_iteration::no;
if (!cr.empty()) {
partition_is_not_empty(consumer);
stop = consumer.consume(std::move(cr), t, is_live);
_stop = consumer.consume(std::move(cr), t, is_live);
}
if (!sstable_compaction() && is_live && ++_rows_in_current_partition == _current_partition_limit) {
return stop_iteration::yes;
_stop = stop_iteration::yes;
}
return stop;
return _stop;
}
return stop_iteration::no;
return _stop;
}
template <typename Consumer, typename GCConsumer>
@@ -398,13 +401,13 @@ public:
if (rt.tomb > _range_tombstones.get_partition_tombstone()) {
if (can_purge_tombstone(rt.tomb)) {
partition_is_not_empty_for_gc_consumer(gc_consumer);
return gc_consumer.consume(std::move(rt));
_stop = gc_consumer.consume(std::move(rt));
} else {
partition_is_not_empty(consumer);
return consumer.consume(std::move(rt));
_stop = consumer.consume(std::move(rt));
}
}
return stop_iteration::no;
return _stop;
}
template <typename Consumer, typename GCConsumer>
@@ -492,9 +495,24 @@ public:
/// compactor will result in the new compactor being in the same state *this
/// is (given the same outside parameters of course). Practically this
/// allows the compaction state to be stored in the compacted reader.
detached_compaction_state detach_state() && {
/// If the currently compacted partition is exhausted a disengaged optional
/// is returned -- in this case there is no state to detach.
std::optional<detached_compaction_state> detach_state() && {
// If we exhausted the partition, there is no need to detach-restore the
// compaction state.
// We exhausted the partition if `consume_partition_end()` was called
// without us requesting the consumption to stop (remembered in _stop)
// from one of the consume() overloads.
// The consume algorithm calls `consume_partition_end()` in two cases:
// * on a partition-end fragment
// * consume() requested to stop
// In the latter case, the partition is not exhausted. Even if the next
// fragment to process is a partition-end, it will not be consumed.
if (!_stop) {
return {};
}
partition_start ps(std::move(_last_dk), _range_tombstones.get_partition_tombstone());
return {std::move(ps), std::move(_last_static_row), std::move(_range_tombstones).range_tombstones()};
return detached_compaction_state{std::move(ps), std::move(_last_static_row), std::move(_range_tombstones).range_tombstones()};
}
const compaction_stats& stats() const { return _stats; }
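The rule the long comment in `detach_state()` describes — state is detachable only when compaction stopped mid-partition, because a normally consumed partition-end means there is nothing to restore on the next page — can be modelled in a few lines. This is a Python sketch with hypothetical names, not the compactor's real interface:

```python
# Sketch of the detach_state() rule: _stop records whether a consume()
# callback asked to stop mid-partition. If the partition-end fragment was
# consumed normally (_stop still False), the partition is exhausted and
# detach_state() returns nothing (the disengaged optional in the C++).
class Compactor:
    def __init__(self):
        self._stop = False
        self._state = None

    def consume_new_partition(self, key):
        self._stop = False
        self._state = {"partition_start": key}

    def consume_row(self, row, stop=False):
        self._stop = stop

    def detach_state(self):
        return self._state if self._stop else None

c = Compactor()
c.consume_new_partition("pk0")
c.consume_row("r0")              # consumed through to partition end
assert c.detach_state() is None  # nothing to restore

c.consume_new_partition("pk1")
c.consume_row("r1", stop=True)   # stopped mid-partition
assert c.detach_state() == {"partition_start": "pk1"}
```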


@@ -843,7 +843,6 @@ public:
void apply(shadowable_tombstone deleted_at) {
_deleted_at.apply(deleted_at, _marker);
maybe_shadow();
}
void apply(row_tombstone deleted_at) {


@@ -109,7 +109,7 @@ void range_tombstone_list::insert_from(const schema& s,
if (cmp(end, it->position()) < 0) {
// not overlapping
if (it->tombstone().tomb == tomb && cmp(end, it->position()) == 0) {
rev.update(it, {std::move(start), std::move(start), tomb});
rev.update(it, {std::move(start), std::move(end), tomb});
} else {
auto rt = construct_range_tombstone_entry(std::move(start), std::move(end), tomb);
rev.insert(it, *rt);


@@ -22,6 +22,7 @@
#pragma once
#include "mutation_fragment.hh"
#include "mutation_fragment_v2.hh"
#include "converting_mutation_partition_applier.hh"
// A StreamedMutationTransformer which transforms the stream to a different schema


@@ -1745,9 +1745,7 @@ public:
_monitor.on_read_started(_context->reader_position());
}
public:
void on_out_of_clustering_range() override {
push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, partition_end()));
}
void on_out_of_clustering_range() override { }
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
on_internal_error(sstlog, "mx_crawling_sstable_mutation_reader: doesn't support fast_forward_to(const dht::partition_range&)");
}


@@ -107,9 +107,9 @@ def test_describe_table_size(test_table):
# Test the ProvisionedThroughput attribute returned by DescribeTable.
# This is a very partial test: Our test table is configured without
# provisioned throughput, so obviously it will not have interesting settings
# for it. DynamoDB returns zeros for some of the attributes, even though
# the documentation suggests missing values should have been fine too.
@pytest.mark.xfail(reason="DescribeTable does not return provisioned throughput")
# for it. But DynamoDB documents that zeros be returned for WriteCapacityUnits
# and ReadCapacityUnits, and does this in practice as well - and some
# applications assume these numbers are always there (even if 0).
def test_describe_table_provisioned_throughput(test_table):
got = test_table.meta.client.describe_table(TableName=test_table.name)['Table']
assert got['ProvisionedThroughput']['NumberOfDecreasesToday'] == 0


@@ -438,6 +438,126 @@ def test_gsi_update_second_regular_base_column(test_table_gsi_3):
KeyConditions={'a': {'AttributeValueList': [items[3]['a']], 'ComparisonOperator': 'EQ'},
'b': {'AttributeValueList': [items[3]['b']], 'ComparisonOperator': 'EQ'}})
# Test reproducing issue #11801: In issue #5006 we noticed that in the special
# case of a GSI with two non-key attributes as keys (test_table_gsi_3),
# an update of the second attribute forgot to delete the old row. We fixed
# that bug, but a bug remained for updates which update the value to the *same*
# value - in that case the old row shouldn't be deleted, but we did - as
# noticed in issue #11801.
def test_11801(test_table_gsi_3):
p = random_string()
a = random_string()
b = random_string()
item = {'p': p, 'a': a, 'b': b, 'd': random_string()}
test_table_gsi_3.put_item(Item=item)
assert_index_query(test_table_gsi_3, 'hello', [item],
KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
# Update the attribute 'b' to the same value b that it already had.
# This shouldn't change anything in the base table or in the GSI
test_table_gsi_3.update_item(Key={'p': p}, AttributeUpdates={'b': {'Value': b, 'Action': 'PUT'}})
assert item == test_table_gsi_3.get_item(Key={'p': p}, ConsistentRead=True)['Item']
# In issue #11801, the following assertion failed (the view row was
# deleted and nothing matched the query).
assert_index_query(test_table_gsi_3, 'hello', [item],
KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
# Above we checked that setting 'b' to the same value didn't remove
# the old GSI row. But the same update may actually modify the GSI row
# (e.g., an unrelated attribute d) - check this modification took place:
item['d'] = random_string()
test_table_gsi_3.update_item(Key={'p': p},
AttributeUpdates={'b': {'Value': b, 'Action': 'PUT'},
'd': {'Value': item['d'], 'Action': 'PUT'}})
assert item == test_table_gsi_3.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert_index_query(test_table_gsi_3, 'hello', [item],
KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
# This test is the same as test_11801, but updating the first attribute (a)
# instead of the second (b). This test didn't fail, showing that issue #11801
# is - like #5006 - specific to the case of updating the second attribute.
def test_11801_variant1(test_table_gsi_3):
p = random_string()
a = random_string()
b = random_string()
d = random_string()
item = {'p': p, 'a': a, 'b': b, 'd': d}
test_table_gsi_3.put_item(Item=item)
assert_index_query(test_table_gsi_3, 'hello', [item],
KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
test_table_gsi_3.update_item(Key={'p': p}, AttributeUpdates={'a': {'Value': a, 'Action': 'PUT'}})
assert_index_query(test_table_gsi_3, 'hello', [item],
KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
# This test is the same as test_11801, but updates b to a different value
# (newb) instead of to the same one. This test didn't fail, showing that
# issue #11801 is specific to updates to the same value. This test basically
# reproduces the already-fixed #5006 (we also have another test above which
# reproduces that issue - test_gsi_update_second_regular_base_column())
def test_11801_variant2(test_table_gsi_3):
p = random_string()
a = random_string()
b = random_string()
item = {'p': p, 'a': a, 'b': b, 'd': random_string()}
test_table_gsi_3.put_item(Item=item)
assert_index_query(test_table_gsi_3, 'hello', [item],
KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
newb = random_string()
item['b'] = newb
test_table_gsi_3.update_item(Key={'p': p}, AttributeUpdates={'b': {'Value': newb, 'Action': 'PUT'}})
assert_index_query(test_table_gsi_3, 'hello', [],
KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
assert_index_query(test_table_gsi_3, 'hello', [item],
KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
'b': {'AttributeValueList': [newb], 'ComparisonOperator': 'EQ'}})
# This test is the same as test_11801, but uses a different table schema
# (test_table_gsi_5) where there is only one new key column in the view (x).
# This test passed, showing that issue #11801 was specific to the special
# case of a view with two new key columns (test_table_gsi_3).
def test_11801_variant3(test_table_gsi_5):
p = random_string()
c = random_string()
x = random_string()
item = {'p': p, 'c': c, 'x': x, 'd': random_string()}
test_table_gsi_5.put_item(Item=item)
assert_index_query(test_table_gsi_5, 'hello', [item],
KeyConditions={'p': {'AttributeValueList': [p], 'ComparisonOperator': 'EQ'},
'x': {'AttributeValueList': [x], 'ComparisonOperator': 'EQ'}})
test_table_gsi_5.update_item(Key={'p': p, 'c': c}, AttributeUpdates={'x': {'Value': x, 'Action': 'PUT'}})
assert item == test_table_gsi_5.get_item(Key={'p': p, 'c': c}, ConsistentRead=True)['Item']
assert_index_query(test_table_gsi_5, 'hello', [item],
KeyConditions={'p': {'AttributeValueList': [p], 'ComparisonOperator': 'EQ'},
'x': {'AttributeValueList': [x], 'ComparisonOperator': 'EQ'}})
# Another test similar to test_11801, but instead of updating a view key
# column to the same value it already has, simply don't update it at all
# (and just modify some other regular column). This test passed, showing
# that issue #11801 is specific to the case of updating a view key column
# to the same value it already had.
def test_11801_variant4(test_table_gsi_3):
p = random_string()
a = random_string()
b = random_string()
item = {'p': p, 'a': a, 'b': b, 'd': random_string()}
test_table_gsi_3.put_item(Item=item)
assert_index_query(test_table_gsi_3, 'hello', [item],
KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
# An update that doesn't change the GSI keys (a or b), just a regular
# column d.
item['d'] = random_string()
test_table_gsi_3.update_item(Key={'p': p}, AttributeUpdates={'d': {'Value': item['d'], 'Action': 'PUT'}})
assert item == test_table_gsi_3.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert_index_query(test_table_gsi_3, 'hello', [item],
KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
# Test that when a table has a GSI, if the indexed attribute is missing, the
# item is added to the base table but not the index.
# This is the same feature we already tested in test_gsi_missing_attribute()


@@ -32,7 +32,7 @@
#include "cql3/util.hh"
//
// Test basic CQL string quoting
// Test basic CQL identifier quoting
//
BOOST_AUTO_TEST_CASE(maybe_quote) {
std::string s(65536, 'x');
@@ -67,6 +67,16 @@ BOOST_AUTO_TEST_CASE(maybe_quote) {
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("\"\""), "\"\"\"\"\"\"");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("\"hell0\""), "\"\"\"hell0\"\"\"");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("hello \"my\" world"), "\"hello \"\"my\"\" world\"");
// Reproducer for issue #9450. Reserved keywords like "to" or "where"
// need quoting, but unreserved keywords like "ttl", "int" or "as",
// do not.
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("to"), "\"to\"");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("where"), "\"where\"");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("ttl"), "ttl");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("int"), "int");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("as"), "as");
BOOST_REQUIRE_EQUAL(cql3::util::maybe_quote("ttl hi"), "\"ttl hi\"");
}
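A rough Python model of the quoting rule these assertions pin down (the keyword set is abbreviated; the real implementation consults the CQL grammar's reserved-keyword list):

```python
import re

# Rough model of cql3::util::maybe_quote: plain lowercase identifiers
# stay bare unless they are *reserved* keywords; anything else is
# quoted, with embedded double quotes doubled. RESERVED is abbreviated
# to the keywords exercised by the test.
RESERVED = {"to", "where", "select", "from"}

def maybe_quote(ident):
    if re.fullmatch(r"[a-z][a-z0-9_]*", ident) and ident not in RESERVED:
        return ident
    return '"' + ident.replace('"', '""') + '"'

assert maybe_quote("to") == '"to"'        # reserved: needs quoting
assert maybe_quote("ttl") == "ttl"        # unreserved: stays bare
assert maybe_quote('hello "my" world') == '"hello ""my"" world"'
```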
//


@@ -941,3 +941,28 @@ SEASTAR_THREAD_TEST_CASE(test_reverse_reader_is_mutation_source) {
};
run_mutation_source_tests(populate);
}
SEASTAR_THREAD_TEST_CASE(test_allow_reader_early_destruction) {
struct test_reader_impl : public flat_mutation_reader::impl {
using flat_mutation_reader::impl::impl;
virtual future<> fill_buffer() override { return make_ready_future<>(); }
virtual future<> next_partition() override { return make_ready_future<>(); }
virtual future<> fast_forward_to(const dht::partition_range&) override { return make_ready_future<>(); }
virtual future<> fast_forward_to(position_range) override { return make_ready_future<>(); }
virtual future<> close() noexcept override { return make_ready_future<>(); };
};
struct test_reader_v2_impl : public flat_mutation_reader_v2::impl {
using flat_mutation_reader_v2::impl::impl;
virtual future<> fill_buffer() override { return make_ready_future<>(); }
virtual future<> next_partition() override { return make_ready_future<>(); }
virtual future<> fast_forward_to(const dht::partition_range&) override { return make_ready_future<>(); }
virtual future<> fast_forward_to(position_range) override { return make_ready_future<>(); }
virtual future<> close() noexcept override { return make_ready_future<>(); };
};
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
// These readers are not closed, but didn't start any operations, so it's safe for them to be destroyed.
auto reader = make_flat_mutation_reader<test_reader_impl>(s.schema(), semaphore.make_permit());
auto reader_v2 = make_flat_mutation_reader_v2<test_reader_v2_impl>(s.schema(), semaphore.make_permit());
}


@@ -763,9 +763,8 @@ SEASTAR_THREAD_TEST_CASE(multi_col_in) {
cquery_nofail(e, "insert into t(pk,ck1,ck2,r) values (4,13,23,'a')");
require_rows(e, "select pk from t where (ck1,ck2) in ((13,23)) allow filtering", {{I(3)}, {I(4)}});
require_rows(e, "select pk from t where (ck1) in ((13),(33),(44)) allow filtering", {{I(3)}, {I(4)}});
// TODO: uncomment when #6200 is fixed.
// require_rows(e, "select pk from t where (ck1,ck2) in ((13,23)) and r='a' allow filtering",
// {{I(4), I(13), F(23), T("a")}});
require_rows(e, "select pk from t where (ck1,ck2) in ((13,23)) and r='a' allow filtering",
{{I(4), I(13), F(23), T("a")}});
cquery_nofail(e, "delete from t where pk=4");
require_rows(e, "select pk from t where (ck1,ck2) in ((13,23)) allow filtering", {{I(3)}});
auto stmt = e.prepare("select ck1 from t where (ck1,ck2) in ? allow filtering").get0();


@@ -72,6 +72,7 @@
#include "test/lib/reader_concurrency_semaphore.hh"
#include "test/lib/sstable_utils.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/random_schema.hh"
namespace fs = std::filesystem;
@@ -3003,3 +3004,58 @@ SEASTAR_TEST_CASE(sstable_reader_with_timeout) {
});
});
}
SEASTAR_TEST_CASE(test_crawling_reader_out_of_range_last_range_tombstone_change) {
return test_env::do_with_async([] (test_env& env) {
simple_schema table;
auto mut = table.new_mutation("pk0");
auto ckeys = table.make_ckeys(4);
table.add_row(mut, ckeys[0], "v0");
table.add_row(mut, ckeys[1], "v1");
table.add_row(mut, ckeys[2], "v2");
using bound = query::clustering_range::bound;
table.delete_range(mut, query::clustering_range::make(bound{ckeys[3], true}, bound{clustering_key::make_empty(), true}), tombstone(1, gc_clock::now()));
auto tmp = tmpdir();
auto sst_gen = [&env, &table, &tmp] () {
return env.make_sstable(table.schema(), tmp.path().string(), 1, sstables::get_highest_sstable_version(), big);
};
auto sst = make_sstable_containing(sst_gen, {mut});
assert_that(sst->make_crawling_reader(table.schema(), env.make_reader_permit())).has_monotonic_positions();
});
}
SEASTAR_TEST_CASE(test_crawling_reader_random_schema_random_mutations) {
return test_env::do_with_async([this] (test_env& env) {
auto random_spec = tests::make_random_schema_specification(
get_name(),
std::uniform_int_distribution<size_t>(1, 4),
std::uniform_int_distribution<size_t>(2, 4),
std::uniform_int_distribution<size_t>(2, 8),
std::uniform_int_distribution<size_t>(2, 8));
auto random_schema = tests::random_schema{tests::random::get_int<uint32_t>(), *random_spec};
auto schema = random_schema.schema();
testlog.info("Random schema:\n{}", random_schema.cql());
const auto muts = tests::generate_random_mutations(random_schema, 20).get();
auto tmp = tmpdir();
auto sst_gen = [&env, schema, &tmp] () {
return env.make_sstable(schema, tmp.path().string(), 1, sstables::get_highest_sstable_version(), big);
};
auto sst = make_sstable_containing(sst_gen, muts);
{
auto rd = assert_that(sst->make_crawling_reader(schema, env.make_reader_permit()));
for (const auto& mut : muts) {
rd.produces(mut);
}
}
assert_that(sst->make_crawling_reader(schema, env.make_reader_permit())).has_monotonic_positions();
});
}


@@ -112,3 +112,37 @@ def test_filter_with_unused_static_column(cql, test_keyspace):
cql.execute(f"INSERT INTO {table} (p,c,v) VALUES (42,43,44)")
cql.execute(f"INSERT INTO {table} (p,c,v) VALUES (1,2,3)")
assert list(cql.execute(f"SELECT * FROM {mv}")) == [(42, 43, 44)]
# Reproducer for issue #9450 - when a view's key column name is a (quoted)
# keyword, writes used to fail because they generated internally broken CQL
# with the column name not quoted.
def test_mv_quoted_column_names(cql, test_keyspace):
for colname in ['"dog"', '"Dog"', 'DOG', '"to"', 'int']:
with new_test_table(cql, test_keyspace, f'p int primary key, {colname} int') as table:
with new_materialized_view(cql, table, '*', f'{colname}, p', f'{colname} is not null and p is not null') as mv:
cql.execute(f'INSERT INTO {table} (p, {colname}) values (1, 2)')
# Validate that not only the write didn't fail, it actually
# wrote the right thing to the view. NOTE: on a single-node
# Scylla, view update is synchronous so we can just read and
# don't need to wait or retry.
assert list(cql.execute(f'SELECT * from {mv}')) == [(2, 1)]
# Same as test_mv_quoted_column_names above (reproducing issue #9450), just
# check *view building* - i.e., pre-existing data in the base table that
# needs to be copied to the view. The view building cannot return an error
# to the user, but can fail to write the desired data into the view.
def test_mv_quoted_column_names_build(cql, test_keyspace):
for colname in ['"dog"', '"Dog"', 'DOG', '"to"', 'int']:
with new_test_table(cql, test_keyspace, f'p int primary key, {colname} int') as table:
cql.execute(f'INSERT INTO {table} (p, {colname}) values (1, 2)')
with new_materialized_view(cql, table, '*', f'{colname}, p', f'{colname} is not null and p is not null') as mv:
# When Scylla's view builder fails as it did in issue #9450,
# there is no way to tell this state apart from a view build
# that simply hasn't completed (besides looking at the logs,
# which we don't). This means, unfortunately, that a failure
# of this test is slow - it needs to wait for a timeout.
start_time = time.time()
while time.time() < start_time + 30:
if list(cql.execute(f'SELECT * from {mv}')) == [(2, 1)]:
break
assert list(cql.execute(f'SELECT * from {mv}')) == [(2, 1)]


@@ -0,0 +1,64 @@
# Copyright 2022-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#############################################################################
# Tests for scanning SELECT requests (which read many rows and/or many
# partitions).
# We have a separate test file test_filtering.py for scans which also involve
# filtering, and test_allow_filtering.py for checking when "ALLOW FILTERING"
# is needed in scans. test_secondary_index.py also contains tests for scanning
# using a secondary index.
#############################################################################
import pytest
from util import new_test_table
from cassandra.query import SimpleStatement

# Regression test for #9482
def test_scan_ending_with_static_row(cql, test_keyspace):
    with new_test_table(cql, test_keyspace, "pk int, ck int, s int STATIC, v int, PRIMARY KEY (pk, ck)") as table:
        stmt = cql.prepare(f"UPDATE {table} SET s = ? WHERE pk = ?")
        for pk in range(100):
            cql.execute(stmt, (0, pk))
        statement = SimpleStatement(f"SELECT * FROM {table}", fetch_size=10)
        # This will trigger an error in either processing or building the query
        # results. The success criterion for this test is that the query
        # finishes without errors.
        res = list(cql.execute(statement))
# Test that if we have multi-column restrictions on the clustering key
# and additional filtering on regular columns, both restrictions are obeyed.
# Reproduces #6200.
def test_multi_column_restrictions_and_filtering(cql, test_keyspace):
    with new_test_table(cql, test_keyspace, "p int, c1 int, c2 int, r int, PRIMARY KEY (p, c1, c2)") as table:
        stmt = cql.prepare(f"INSERT INTO {table} (p, c1, c2, r) VALUES (1, ?, ?, ?)")
        for i in range(2):
            for j in range(2):
                cql.execute(stmt, [i, j, j])
        assert list(cql.execute(f"SELECT c1,c2,r FROM {table} WHERE p=1 AND (c1, c2) = (0,1)")) == [(0,1,1)]
        # Since in that result r=1, adding "AND r=1" should return the same
        # result, and adding "AND r=0" should return nothing.
        assert list(cql.execute(f"SELECT c1,c2,r FROM {table} WHERE p=1 AND (c1, c2) = (0,1) AND r=1 ALLOW FILTERING")) == [(0,1,1)]
        # Reproduces #6200:
        assert list(cql.execute(f"SELECT c1,c2,r FROM {table} WHERE p=1 AND (c1, c2) = (0,1) AND r=0 ALLOW FILTERING")) == []
# Test that if we have a range multi-column restriction on the clustering key
# and additional filtering on regular columns, both restrictions are obeyed.
# Similar to test_multi_column_restrictions_and_filtering, but uses a range
# restriction on the clustering key columns.
# Reproduces #12014; the code is taken from a reproducer provided by a user.
def test_multi_column_range_restrictions_and_filtering(cql, test_keyspace):
    with new_test_table(cql, test_keyspace, "pk int, ts timestamp, id int, processed boolean, PRIMARY KEY (pk, ts, id)") as table:
        cql.execute(f"INSERT INTO {table} (pk, ts, id, processed) VALUES (0, currentTimestamp(), 0, true)")
        cql.execute(f"INSERT INTO {table} (pk, ts, id, processed) VALUES (0, currentTimestamp(), 1, true)")
        cql.execute(f"INSERT INTO {table} (pk, ts, id, processed) VALUES (0, currentTimestamp(), 2, false)")
        cql.execute(f"INSERT INTO {table} (pk, ts, id, processed) VALUES (0, currentTimestamp(), 3, false)")
        # This SELECT doesn't use multi-column restrictions; the result
        # shouldn't change when it does.
        rows1 = list(cql.execute(f"SELECT id, processed FROM {table} WHERE pk = 0 AND ts >= 0 AND processed = false ALLOW FILTERING"))
        assert rows1 == [(2, False), (3, False)]
        # Reproduces #12014
        rows2 = list(cql.execute(f"SELECT id, processed FROM {table} WHERE pk = 0 AND (ts, id) >= (0, 0) AND processed = false ALLOW FILTERING"))
        assert rows1 == rows2
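The semantics the test above checks can be modeled in plain Python (a minimal illustrative model, not Scylla's implementation): a row must satisfy both the multi-column clustering bound and the regular-column filter.

```python
# Rows as (ts, id, processed); the ts values are arbitrary timestamps.
rows = [(10, 0, True), (11, 1, True), (12, 2, False), (13, 3, False)]

# "(ts, id) >= (0, 0)" is a lexicographic comparison, which Python's tuple
# ordering models directly. The bug in #12014 effectively skipped the
# "processed = false" check whenever the multi-column bound was present.
filtered = [(id_, p) for (ts, id_, p) in rows if (ts, id_) >= (0, 0) and p is False]
assert filtered == [(2, False), (3, False)]
```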

View File

@@ -2298,8 +2298,14 @@ public:
        };
        size_t row_count = row_count_dist(_gen);
-       for (size_t i = 0; i < row_count; ++i) {
-           auto ckey = make_random_key();
+       std::unordered_set<clustering_key, clustering_key::hashing, clustering_key::equality> keys(
+               0, clustering_key::hashing(*_schema), clustering_key::equality(*_schema));
+       while (keys.size() < row_count) {
+           keys.emplace(make_random_key());
+       }
+       for (auto&& ckey : keys) {
            is_continuous continuous = is_continuous(_bool_dist(_gen));
            if (_not_dummy_dist(_gen)) {
                deletable_row& row = m.partition().clustered_row(*_schema, ckey, is_dummy::no, continuous);
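The change above draws clustering keys into a set until it holds row_count elements, so duplicates from the random generator can no longer shrink the number of rows actually produced. The same idea in a small Python sketch (names here are my own, for illustration only):

```python
import random

def unique_random_keys(row_count, key_space=1000, seed=None):
    # Drawing row_count keys independently may repeat values, yielding fewer
    # than row_count distinct rows. Inserting into a set and drawing until it
    # is full guarantees exactly row_count distinct keys, mirroring the
    # unordered_set used in the C++ change.
    rng = random.Random(seed)
    keys = set()
    while len(keys) < row_count:
        keys.add(rng.randrange(key_space))
    return keys

keys = unique_random_keys(16, seed=42)
assert len(keys) == 16  # exactly row_count distinct keys
```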


@@ -1115,4 +1115,12 @@ future<std::vector<mutation>> generate_random_mutations(
    });
}

future<std::vector<mutation>> generate_random_mutations(tests::random_schema& random_schema, size_t partition_count) {
    return generate_random_mutations(
            random_schema,
            default_timestamp_generator(),
            no_expiry_expiry_generator(),
            std::uniform_int_distribution<size_t>(partition_count, partition_count));
}

} // namespace tests


@@ -255,4 +255,7 @@ future<std::vector<mutation>> generate_random_mutations(
        std::uniform_int_distribution<size_t> clustering_row_count_dist = std::uniform_int_distribution<size_t>(16, 128),
        std::uniform_int_distribution<size_t> range_tombstone_count_dist = std::uniform_int_distribution<size_t>(4, 16));

/// Generate exactly partition_count partitions. See the more general overload above.
future<std::vector<mutation>> generate_random_mutations(tests::random_schema& random_schema, size_t partition_count);

} // namespace tests


@@ -25,6 +25,8 @@
#include <seastar/core/weak_ptr.hh>
#include <seastar/core/condition-variable.hh>
#include "test/raft/logical_timer.hh"
using namespace seastar;
// A set of futures that can be polled to obtain the result of some ready future in the set.


@@ -26,6 +26,7 @@
#include <seastar/core/future-util.hh>
#include "raft/logical_clock.hh"
#include "raft/raft.hh"
using namespace seastar;


@@ -22,6 +22,7 @@
#pragma once
#include <utility>
#include <fmt/format.h>
namespace thrift {


@@ -153,6 +153,18 @@ namespace bloom_calculations {
    }
    return std::min(probs.size() - 1, size_t(v));
}

/**
 * Retrieves the minimum supported bloom_filter_fp_chance value.
 * If compute_bloom_spec() above is attempted with a bloom_filter_fp_chance
 * lower than this, it will throw an unsupported_operation_exception.
 */
inline double min_supported_bloom_filter_fp_chance() {
    int max_buckets = probs.size() - 1;
    int max_K = probs[max_buckets].size() - 1;
    return probs[max_buckets][max_K];
}

}
}
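min_supported_bloom_filter_fp_chance() returns the smallest false-positive probability in the precomputed probs table, i.e. the one achieved at the largest supported buckets-per-element and hash count. For intuition, here is a hedged Python sketch of the standard Bloom filter formula such a table is built from; the specific limits used (20 buckets, 14 hashes) are illustrative assumptions, not Scylla's actual table bounds:

```python
import math

def bloom_fp_chance(buckets_per_element, num_hashes):
    # Standard asymptotic false-positive probability of a Bloom filter with
    # m/n = buckets_per_element bits per element and k = num_hashes hash
    # functions: (1 - e^(-k / (m/n)))^k.
    k = num_hashes
    return (1.0 - math.exp(-k / buckets_per_element)) ** k

# The minimum supported fp chance corresponds to the largest supported
# buckets-per-element and hash count (values below are assumptions).
min_fp = bloom_fp_chance(20, 14)
assert 0.0 < min_fp < 1e-4
```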


@@ -21,6 +21,7 @@
#pragma once
#include <cassert>
#include <boost/intrusive/parent_from_member.hpp>
// A movable pointer-like object paired with exactly one other object of the same type.


@@ -49,5 +49,5 @@ public:
        return _what.c_str();
    }
-   const std::error_code& code() const { return _code; }
+   const std::error_code& code() const noexcept { return _code; }
};


@@ -21,6 +21,7 @@
#pragma once
#include <type_traits>
#include <utility>
#include <seastar/util/concepts.hh>
namespace utils {