Compare commits

...

32 Commits

Author SHA1 Message Date
Yaron Kaikov
0295d0c5c8 release: prepare for 5.1.0-rc4 2022-11-06 14:49:29 +02:00
Botond Dénes
fa94222662 Merge 'Alternator, MV: fix bug in some view updates which set the view key to its existing value' from Nadav Har'El
As described in issue #11801, we saw cases in Alternator where, when a GSI has both partition and sort keys that are non-key attributes in the base table, updating the GSI-sort-key attribute to the same value it already had caused the entire GSI row to be deleted.

In this series we fix this bug (it was a bug in our materialized views implementation) and add a reproducing test (plus a few more tests for similar situations which worked before the patch, and continue to work after it).

Fixes #11801

Closes #11808

* github.com:scylladb/scylladb:
  test/alternator: add test for issue 11801
  MV: fix handling of view update which reassign the same key value
  materialized views: inline used-once and confusing function, replace_entry()

(cherry picked from commit e981bd4f21)
2022-11-01 13:14:21 +02:00
Pavel Emelyanov
dff7f3c5ba compaction_manager: Swallow ENOSPCs in ::stop()
When being stopped, the compaction manager may step on ENOSPC. This is
not a reason to fail the stopping process with an abort; it is better to
warn about this in the logs and proceed as if nothing happened.

refs: #11245

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
2022-10-13 15:36:44 +03:00
Pavel Emelyanov
3723713130 exceptions: Mark storage_io_error::code() with noexcept
Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
2022-10-13 15:36:38 +03:00
Pavel Emelyanov
03f8411e38 table: Handle storage_io_error's ENOSPC when flushing
Commit a9805106 (table: seal_active_memtable: handle ENOSPC error)
made the memtable flushing code withstand ENOSPC and continue flushing
in the hope that the node administrator would provide some free space.

However, it looks like the IO code may report ENOSPC with an exception
type this code doesn't expect. This patch tries to fix that.

refs: #11245

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
2022-10-13 15:35:26 +03:00
Pavel Emelyanov
0e391d67d1 table: Rewrap retry loop
The existing loop is very branchy in its attempts to find out whether or
not to abort. The "allowed_retries" count can be a good indicator of the
decision taken. This makes the code notably shorter and easier to extend
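The idea can be sketched outside Scylla as follows (a simplified sketch, not the actual Scylla code; `run_with_retries`, `flush_once`, and the exception type are illustrative stand-ins):

```cpp
#include <cassert>
#include <stdexcept>

// Simplified sketch: instead of many nested branches deciding whether
// to abort, drive the retry decision off a single "allowed_retries"
// counter. `flush_once` stands in for the operation being retried.
template <typename Func>
int run_with_retries(Func flush_once, int allowed_retries) {
    int attempts = 0;
    for (;;) {
        ++attempts;
        try {
            flush_once();
            return attempts;   // success: report how many tries it took
        } catch (const std::runtime_error&) {
            if (allowed_retries-- == 0) {
                throw;         // retry budget exhausted: propagate
            }
            // otherwise fall through and retry, e.g. after the
            // administrator has freed some disk space
        }
    }
}
```

The single counter replaces the branchy "should we abort now?" logic: exhausting it is the one and only abort condition.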

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
2022-10-13 15:35:25 +03:00
Benny Halevy
f76989285e table: seal_active_memtable: handle ENOSPC error
Aborting too soon on ENOSPC is too harsh, leading to loss of
availability of the node for reads, while restarting it won't
solve the ENOSPC condition.

Fixes #11245

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>

Closes #11246
2022-10-13 15:35:19 +03:00
Beni Peled
9deeeb4db1 release: prepare for 5.1.0-rc3 2022-10-09 08:36:06 +03:00
Avi Kivity
1f3196735f Update tools/java submodule (cqlsh permissions)
* tools/java ad6764b506...b3959948dd (1):
  > install.sh is using wrong permissions for install cqlsh files

Fixes #11584.
2022-10-04 18:02:03 +03:00
Nadav Har'El
abb6817261 cql: validate bloom_filter_fp_chance up-front
Scylla's Bloom filter implementation has a minimal false-positive rate
that it can support (6.71e-5). When setting bloom_filter_fp_chance any
lower than that, the compute_bloom_spec() function, which writes the bloom
filter, throws an exception. However, this is too late - it only happens
while flushing the memtable to disk, and a failure at that point causes
Scylla to crash.

Instead, we should refuse the table creation with the unsupported
bloom_filter_fp_chance. This is also what Cassandra did six years ago -
see CASSANDRA-11920.
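The up-front check can be sketched like this (names are illustrative, not the exact Scylla code; the minimum supported rate is the 6.71e-5 figure from the message above):

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Minimum false-positive rate the Bloom filter implementation supports,
// per the commit message above.
constexpr double min_supported_bloom_filter_fp_chance = 6.71e-5;

void validate_bloom_filter_fp_chance(double fp_chance) {
    // Reject unsupported values at CREATE TABLE time, instead of
    // crashing later while flushing the memtable to disk.
    if (fp_chance <= min_supported_bloom_filter_fp_chance || fp_chance > 1.0) {
        throw std::invalid_argument(
            "bloom_filter_fp_chance must be larger than "
            + std::to_string(min_supported_bloom_filter_fp_chance)
            + " and less than or equal to 1.0 (got "
            + std::to_string(fp_chance) + ")");
    }
}
```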

This patch also includes a regression test, which crashes Scylla before
this patch but passes after the patch (and also passes on Cassandra).

Fixes #11524.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>

Closes #11576

(cherry picked from commit 4c93a694b7)
2022-10-04 16:21:48 +03:00
Nadav Har'El
d3fd090429 alternator: return ProvisionedThroughput in DescribeTable
DescribeTable is currently hard-coded to return PAY_PER_REQUEST billing
mode. Nevertheless, even in PAY_PER_REQUEST mode, the DescribeTable
operation must return a ProvisionedThroughput structure, listing both
ReadCapacityUnits and WriteCapacityUnits as 0. This requirement is not
stated in some of the DynamoDB documentation, but is explicitly mentioned in
https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_ProvisionedThroughput.html
Also, empirically, DynamoDB returns ProvisionedThroughput with zeros
even in PAY_PER_REQUEST mode. We even had an xfailing test to confirm this.

The ProvisionedThroughput structure being missing was a problem for
applications like DynamoDB connectors for Spark, if they implicitly
assume that ProvisionedThroughput is returned by DescribeTable, and
fail (as described in issue #11222) if it's outright missing.

So this patch adds the missing ProvisionedThroughput structure, and
the xfailing test starts to pass.

Note that this patch doesn't change the fact that attempting to set
a table to PROVISIONED billing mode is ignored: DescribeTable continues
to always return PAY_PER_REQUEST as the billing mode and zero as the
provisioned capacities.
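After this change, the relevant fragment of a DescribeTable response looks roughly like the following (an abridged sketch; the table name is illustrative and unrelated fields are omitted):

```json
{
  "Table": {
    "TableName": "example",
    "BillingModeSummary": {
      "BillingMode": "PAY_PER_REQUEST"
    },
    "ProvisionedThroughput": {
      "ReadCapacityUnits": 0,
      "WriteCapacityUnits": 0,
      "NumberOfDecreasesToday": 0
    }
  }
}
```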

Fixes #11222

Signed-off-by: Nadav Har'El <nyh@scylladb.com>

Closes #11298

(cherry picked from commit 941c719a23)
2022-10-03 14:26:55 +03:00
Pavel Emelyanov
3e7c57d162 cross-shard-barrier: Capture shared barrier in complete
When the cross-shard barrier is abort()-ed, it spawns a background fiber
that will wake up other shards (if they are sleeping) with an exception.

This fiber is implicitly waited on by the owning sharded service's
.stop(), because barrier usage looks like this:

    sharded<service> s;
    co_await s.invoke_on_all([] {
        ...
        barrier.abort();
    });
    ...
    co_await s.stop();

If an abort happens, the invoke_on_all() will only resolve _after_ it
queues up the waking lambdas into the smp queues, so the subsequent stop
will queue its stopping lambdas after the barrier's ones.

However, in debug mode the queue can be shuffled, so the owning service
can suddenly be freed from under the barrier's feet, causing a
use-after-free. Fortunately, this is easily fixed by capturing a shared
pointer to the shared barrier instead of a regular pointer to the
shard-local barrier.
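The fix can be sketched without seastar as follows (a simplified illustration; `barrier_state` and `make_wakeup` are hypothetical names):

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Sketch: a deferred wake-up must co-own the shared state via a
// shared_ptr, rather than hold a raw pointer to a shard-local object,
// so the state cannot be freed from under it even if the owner is
// destroyed first.
struct barrier_state {
    bool woken = false;
};

std::function<void()> make_wakeup(std::shared_ptr<barrier_state> state) {
    // Capturing the shared_ptr by value extends the state's lifetime
    // until the wake-up callable itself is destroyed.
    return [state] { state->woken = true; };
}
```

With a raw-pointer capture, destroying the owner before the background fiber runs would be a use-after-free; the shared_ptr capture makes the destruction order irrelevant.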

fixes: #11303

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Closes #11553
2022-10-03 13:20:28 +03:00
Tomasz Grabiec
f878a34da3 test: lib: random_mutation_generator: Don't generate mutations with marker uncompacted with shadowable tombstone
The generator was first setting the marker then applied tombstones.

The marker was set like this:

  row.marker() = random_row_marker();

Later, when shadowable tombstones were applied, they were compacted
with the marker as expected.

However, the key for the row was chosen randomly in each iteration, and
there are multiple keys in the set, so there was a possibility of a key
clash with an earlier row. This could override the marker without
applying any tombstones, which is conditional on a random choice.

This could generate rows with markers uncompacted with shadowable tombstones.

This broke row_cache_test::test_concurrent_reads_and_eviction on the
comparison between expected and read mutations. The latter was
compacted because it went through an extra merge path, which compacts
the row.

Fix by making sure there are no key clashes.

Closes #11663

(cherry picked from commit 5268f0f837)
2022-10-02 16:44:57 +03:00
Raphael S. Carvalho
eaded57b2e compaction: Properly handle stop request for off-strategy
If the user stops off-strategy compaction via the API, the compaction
manager can decide to give up on it completely, so data will sit
unreshaped in the maintenance set, preventing it from being compacted
with data in the main set. That's problematic because it will probably
lead to a significant increase in read and space amplification until
off-strategy is triggered again, which cannot happen anytime soon.

Let's handle it by moving data in maintenance set into main one,
even if unreshaped. Then regular compaction will be able to
continue from where off-strategy left off.

Fixes #11543.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>

Closes #11545

(cherry picked from commit a04047f390)
2022-10-02 14:20:17 +03:00
Tomasz Grabiec
25d2da08d1 db: range_tombstone_list: Avoid quadratic behavior when applying
Range tombstones are kept in memory (cache/memtable) in
range_tombstone_list. It keeps them deoverlapped, so applying a range
tombstone which covers many range tombstones will erase existing range
tombstones from the list. This operation needs to be exception-safe,
so range_tombstone_list maintains an undo log. This undo log will
receive a record for each range tombstone which is removed. For
exception safety reasons, before pushing an undo log entry, we reserve
space in the log by calling std::vector::reserve(size() + 1). This is
O(N) where N is the number of undo log entries. Therefore, the whole
application is O(N^2).

This can cause reactor stalls and availability issues when replicas
apply such deletions.

This patch avoids the problem by reserving an exponentially increasing
amount of space. Also, to avoid large allocations, it switches the
container to chunked_vector.
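The reservation strategy can be sketched like this (a minimal illustration of the amortized-reserve idea; std::vector stands in for the undo log, whereas the real code uses utils/amortized_reserve.hh and chunked_vector):

```cpp
#include <cassert>
#include <vector>

// Reserving space *before* each push keeps the push itself
// non-throwing (exception safety), but reserve(size() + 1) makes N
// pushes cost O(N^2) element moves. Growing capacity geometrically
// keeps the total amortized O(N).
template <typename T>
void amortized_reserve_one_more(std::vector<T>& v) {
    if (v.size() == v.capacity()) {
        v.reserve(v.capacity() ? v.capacity() * 2 : 1);
    }
}
```

After the reserve, the subsequent push_back is guaranteed not to reallocate, so it cannot throw for lack of memory mid-operation.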

Fixes #11211

Closes #11215

(cherry picked from commit 7f80602b01)
2022-09-30 00:01:26 +03:00
Botond Dénes
9b1a570f6f sstables: crawling mx-reader: make on_out_of_clustering_range() no-op
Said method currently emits a partition-end. This method is only called
when the last fragment in the stream is a range tombstone change with a
position after all clustered rows. The problem is that
consume_partition_end() is also called unconditionally, resulting in two
partition-end fragments being emitted. The fix is simple: make this
method a no-op, there is nothing to do there.

Also add two tests: one targeted to this bug and another one testing the
crawling reader with random mutations generated for random schema.

Fixes: #11421

Closes #11422

(cherry picked from commit be9d1c4df4)
2022-09-29 23:42:01 +03:00
Piotr Dulikowski
426d045249 exception: fix the error code used for rate_limit_exception
Per-partition rate limiting added a new error type which should be
returned when Scylla decides to reject an operation due to the
per-partition rate limit being exceeded. The new error code requires
drivers to negotiate support for it; otherwise Scylla will report the
error as `Config_error`. The existing error-code override logic works
properly; however, due to a mistake, Scylla would report the
`Config_error` code even if the driver correctly negotiated support.

This commit fixes the problem by specifying the correct error code in
`rate_limit_exception`'s constructor.

Tested manually with a modified version of the Rust driver which
negotiates support for the new error. Additionally, tested what happens
when the driver doesn't negotiate support (Scylla properly falls back to
`Config_error`).

Branches: 5.1
Fixes: #11517

Closes #11518

(cherry picked from commit e69b44a60f)
2022-09-29 23:39:25 +03:00
Botond Dénes
86dbbf12cc shard_reader: do_fill_buffer(): only update _end_of_stream after buffer is copied
Commit 8ab57aa added a yield to the buffer-copy loop, which means that
the copy can yield before it is done and the multishard reader might see
the half-copied buffer and consider the reader done (because
`_end_of_stream` is already set), resulting in dropping the remaining
part of the buffer, and in an invalid stream if the last copied fragment
wasn't a partition-end.

Fixes: #11561
(cherry picked from commit 0c450c9d4c)
2022-09-29 19:11:52 +03:00
Pavel Emelyanov
b05903eddd messaging_service: Fix gossiper verb group
When configuring tcp-nodelay unconditionally, the messaging service
thinks the gossiper uses group index 1, though that changed some time
ago and those verbs now belong to group 0.

fixes: #11465

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
(cherry picked from commit 2c74062962)
2022-09-29 19:05:11 +03:00
Piotr Sarna
26ead53304 Merge 'Fix mutation commutativity with shadowable tombstone'
from Tomasz Grabiec

This series fixes a lack of mutation associativity, which manifests as
sporadic failures in
row_cache_test.cc::test_concurrent_reads_and_eviction due to differences
between the mutations applied and those read.

No known production impact.

Refs https://github.com/scylladb/scylladb/issues/11307

Closes #11312

* github.com:scylladb/scylladb:
  test: mutation_test: Add explicit test for mutation commutativity
  test: random_mutation_generator: Workaround for non-associativity of mutations with shadowable tombstones
  db: mutation_partition: Drop unnecessary maybe_shadow()
  db: mutation_partition: Maintain shadowable tombstone invariant when applying a hard tombstone
  mutation_partition: row: make row marker shadowing symmetric

(cherry picked from commit 484004e766)
2022-09-20 23:21:58 +02:00
Tomasz Grabiec
f60bab9471 test: row_cache: Use more narrow key range to stress overlapping reads more
This makes catching issues related to concurrent access of same or
adjacent entries more likely. For example, catches #11239.

Closes #11260

(cherry picked from commit 8ee5b69f80)
2022-09-20 23:21:54 +02:00
Yaron Kaikov
66f34245fc release: prepare for 5.1.0-rc2 2022-09-19 14:35:28 +03:00
Michał Chojnowski
4047528bd9 db: commitlog: don't print INFO logs on shutdown
The intention was for these logs to be printed during the
database shutdown sequence, but it was overlooked that it's not
the only place where commitlog::shutdown is called.
Commitlogs are started and shut down periodically by hinted handoff.
When that happens, these messages spam the log.

Fix that by adding INFO commitlog shutdown logs to database::stop,
and change the level of the commitlog::shutdown log call to DEBUG.

Fixes #11508

Closes #11536

(cherry picked from commit 9b6fc553b4)
2022-09-18 13:33:05 +03:00
Michał Chojnowski
1a82c61452 sstables: add a flag for disabling long-term index caching
Long-term index caching in the global cache, as introduced in 4.6, is a major
pessimization for workloads where accesses to the index are (spatially) sparse.
We want to have a way to disable it for the affected workloads.

There is already infrastructure in place for disabling it for BYPASS CACHE
queries. One way of solving the issue is hijacking that infrastructure.

This patch adds a global flag (and a corresponding CLI option) which controls
index caching. Setting the flag to `false` causes all index reads to behave
like they would in BYPASS CACHE queries.

Consequences of this choice:

- The per-SSTable partition_index_cache is unused. Every index_reader has
  its own, and they die together. Independent reads can no longer reuse the
  work of other reads which hit the same index pages. This is not crucial,
  since partition accesses have no (natural) spatial locality. Note that
  the original reason for partition_index_cache -- the ability to share
  reads for the lower and upper bound of the query -- is unaffected.
- The per-SSTable cached_file is unused. Every index_reader has its own
  (uncached) input stream from the index file, and every
  bsearch_clustered_cursor has its own cached_file, which dies together with
  the cursor. Note that the cursor still can perform its binary search with
  caching. However, it won't be able to reuse the file pages read by
  index_reader. In particular, if the promoted index is small, and fits inside
  the same file page as its index_entry, that page will be re-read.
  It can also happen that index_reader will read the same index file page
  multiple times. When the summary is so dense that multiple index pages fit in
  one index file page, advancing the upper bound, which reads the next index
  page, will read the same index file page. Since summary:disk ratio is 1:2000,
  this is expected to happen for partitions with size greater than 2000
  partition keys.

Fixes #11202

(cherry picked from commit cdb3e71045)
2022-09-18 13:27:46 +03:00
Avi Kivity
3d9800eb1c logalloc: don't crash while reporting reclaim stalls if --abort-on-seastar-bad-alloc is specified
The logger is proof against allocation failures, except if
--abort-on-seastar-bad-alloc is specified. If it is, it will crash.

The reclaim stall report is likely to be called in low memory conditions
(reclaim's job is to alleviate these conditions after all), so we're
likely to crash here if we're reclaiming a very low memory condition
and have a large stall simultaneously (AND we're running in a debug
environment).

Prevent all this by disabling --abort-on-seastar-bad-alloc temporarily.

Fixes #11549

Closes #11555

(cherry picked from commit d3b8c0c8a6)
2022-09-18 13:24:21 +03:00
Karol Baryła
c48e9b47dd transport/server.cc: Return correct size of decompressed lz4 buffer
An incorrect size is returned from the function, which could lead to
crashes or undefined behavior. Fix by erroring out in these cases.

Fixes #11476

(cherry picked from commit 1c2eef384d)
2022-09-07 10:58:30 +03:00
Avi Kivity
2eadaad9f7 Merge 'database: evict all inactive reads for table when detaching table' from Botond Dénes
Currently, when detaching a table from the database, we force-evict all queriers for said table. This series broadens the scope of this force-evict to include all inactive reads registered at the semaphore. This ensures that any regular inactive read "forgotten" for any reason in the semaphore will not end up accessing a dangling table reference when destroyed later.

Fixes: https://github.com/scylladb/scylladb/issues/11264

Closes #11273

* github.com:scylladb/scylladb:
  querier: querier_cache: remove now unused evict_all_for_table()
  database: detach_column_family(): use reader_concurrency_semaphore::evict_inactive_reads_for_table()
  reader_concurrency_semaphore: add evict_inactive_reads_for_table()

(cherry picked from commit afa7960926)
2022-09-02 10:41:22 +03:00
Yaron Kaikov
d10aee15e7 release: prepare for 5.1.0-rc1 2022-09-02 06:15:05 +03:00
Avi Kivity
9e017cb1e6 Update seastar submodule (tls error handling)
* seastar f9f5228b74...3aa91b4d2d (1):
  > Merge 'tls: vec_push: handle async errors rather than throwing on_internal_error' from Benny Halevy

Fixes #11252
2022-09-01 13:10:13 +03:00
Avi Kivity
b8504cc9b2 .gitmodules: switch seastar to scylla-seastar.git
This allows us to backport seastar patches to branch-5.1 on
scylla-seastar.git.
2022-09-01 13:08:22 +03:00
Avi Kivity
856703a85e Merge 'row_cache: Fix missing row if upper bound of population range is evicted and has adjacent dummy' from Tomasz Grabiec
Scenario:

cache = [
    row(pos=2, continuous=false),
    row(pos=after(2), dummy=true)
]

Scanning read starts, starts populating [-inf, before(2)] from sstables.

row(pos=2) is evicted.

cache = [
    row(pos=after(2), dummy=true)
]

Scanning read finishes reading from sstables.

Refreshes cache cursor via
partition_snapshot_row_cursor::maybe_refresh(), which calls
partition_snapshot_row_cursor::advance_to() because iterators are
invalidated. This advances the cursor to
after(2). no_clustering_row_between(2, after(2)) returns true, so
advance_to() returns true, and maybe_refresh() returns true. This is
interpreted by the cache reader as "the cursor has not moved forward",
so it marks the range as complete, without emitting the row with
pos=2. Also, it marks row(pos=after(2)) as continuous, so later reads
will also miss the row.

The bug is in advance_to(), which is using
no_clustering_row_between(a, b) to determine its result, which by
definition excludes the starting key.

Discovered by row_cache_test.cc::test_concurrent_reads_and_eviction
with reduced key range in the random_mutation_generator (1024 -> 16).

Fixes #11239

Closes #11240

* github.com:scylladb/scylladb:
  test: mvcc: Fix illegal use of maybe_refresh()
  tests: row_cache_test: Add test_eviction_of_upper_bound_of_population_range()
  tests: row_cache_test: Introduce one_shot mode to throttle
  row_cache: Fix missing row if upper bound of population range is evicted and has adjacent dummy
2022-08-11 16:51:59 +02:00
Yaron Kaikov
86a6c1fb2b release: prepare for 5.1.0-rc0 2022-08-09 18:48:43 +03:00
51 changed files with 734 additions and 104 deletions

.gitmodules

@@ -1,6 +1,6 @@
[submodule "seastar"]
path = seastar
-url = ../seastar
+url = ../scylla-seastar
ignore = dirty
[submodule "swagger-ui"]
path = swagger-ui

View File

@@ -60,7 +60,7 @@ fi
# Default scylla product/version tags
PRODUCT=scylla
-VERSION=5.1.0-dev
+VERSION=5.1.0-rc4
if test -f version
then

View File

@@ -438,6 +438,11 @@ future<executor::request_return_type> executor::describe_table(client_state& cli
rjson::add(table_description, "BillingModeSummary", rjson::empty_object());
rjson::add(table_description["BillingModeSummary"], "BillingMode", "PAY_PER_REQUEST");
rjson::add(table_description["BillingModeSummary"], "LastUpdateToPayPerRequestDateTime", rjson::value(creation_date_seconds));
// In PAY_PER_REQUEST billing mode, provisioned capacity should return 0
rjson::add(table_description, "ProvisionedThroughput", rjson::empty_object());
rjson::add(table_description["ProvisionedThroughput"], "ReadCapacityUnits", 0);
rjson::add(table_description["ProvisionedThroughput"], "WriteCapacityUnits", 0);
rjson::add(table_description["ProvisionedThroughput"], "NumberOfDecreasesToday", 0);
std::unordered_map<std::string,std::string> key_attribute_types;
// Add base table's KeySchema and collect types for AttributeDefinitions:

View File

@@ -842,6 +842,20 @@ future<> compaction_manager::really_do_stop() {
cmlog.info("Stopped");
}
template <typename Ex>
requires std::is_base_of_v<std::exception, Ex> &&
requires (const Ex& ex) {
{ ex.code() } noexcept -> std::same_as<const std::error_code&>;
}
auto swallow_enospc(const Ex& ex) noexcept {
if (ex.code().value() != ENOSPC) {
return make_exception_future<>(std::make_exception_ptr(ex));
}
cmlog.warn("Got ENOSPC on stop, ignoring...");
return make_ready_future<>();
}
void compaction_manager::do_stop() noexcept {
if (_state == state::none || _state == state::stopped) {
return;
@@ -849,7 +863,10 @@ void compaction_manager::do_stop() noexcept {
try {
_state = state::stopped;
-_stop_future = really_do_stop();
+_stop_future = really_do_stop()
+.handle_exception_type([] (const std::system_error& ex) { return swallow_enospc(ex); })
+.handle_exception_type([] (const storage_io_error& ex) { return swallow_enospc(ex); })
+;
} catch (...) {
cmlog.error("Failed to stop the manager: {}", std::current_exception());
}
@@ -1050,7 +1067,7 @@ public:
bool performed() const noexcept {
return _performed;
}
private:
future<> run_offstrategy_compaction(sstables::compaction_data& cdata) {
// This procedure will reshape sstables in maintenance set until it's ready for
// integration into main set.
@@ -1083,6 +1100,7 @@ public:
return desc.sstables.size() ? std::make_optional(std::move(desc)) : std::nullopt;
};
std::exception_ptr err;
while (auto desc = get_next_job()) {
desc->creator = [this, &new_unused_sstables, &t] (shard_id dummy) {
auto sst = t.make_sstable();
@@ -1091,7 +1109,16 @@ public:
};
auto input = boost::copy_range<std::unordered_set<sstables::shared_sstable>>(desc->sstables);
auto ret = co_await sstables::compact_sstables(std::move(*desc), cdata, t);
sstables::compaction_result ret;
try {
ret = co_await sstables::compact_sstables(std::move(*desc), cdata, t);
} catch (sstables::compaction_stopped_exception&) {
// If off-strategy compaction stopped on user request, let's not discard the partial work.
// Therefore, both un-reshaped and reshaped data will be integrated into main set, allowing
// regular compaction to continue from where off-strategy left off.
err = std::current_exception();
break;
}
_performed = true;
// update list of reshape candidates without input but with output added to it
@@ -1128,6 +1155,9 @@ public:
for (auto& sst : sstables_to_remove) {
sst->mark_for_deletion();
}
if (err) {
co_await coroutine::return_exception_ptr(std::move(err));
}
}
protected:
virtual future<compaction_stats_opt> do_run() override {

View File

@@ -20,6 +20,7 @@
#include "tombstone_gc.hh"
#include "db/per_partition_rate_limit_extension.hh"
#include "db/per_partition_rate_limit_options.hh"
#include "utils/bloom_calculations.hh"
#include <boost/algorithm/string/predicate.hpp>
@@ -152,6 +153,16 @@ void cf_prop_defs::validate(const data_dictionary::database db, sstring ks_name,
throw exceptions::configuration_exception(KW_MAX_INDEX_INTERVAL + " must be greater than " + KW_MIN_INDEX_INTERVAL);
}
if (get_simple(KW_BF_FP_CHANCE)) {
double bloom_filter_fp_chance = get_double(KW_BF_FP_CHANCE, 0/*not used*/);
double min_bloom_filter_fp_chance = utils::bloom_calculations::min_supported_bloom_filter_fp_chance();
if (bloom_filter_fp_chance <= min_bloom_filter_fp_chance || bloom_filter_fp_chance > 1.0) {
throw exceptions::configuration_exception(format(
"{} must be larger than {} and less than or equal to 1.0 (got {})",
KW_BF_FP_CHANCE, min_bloom_filter_fp_chance, bloom_filter_fp_chance));
}
}
speculative_retry::from_sstring(get_string(KW_SPECULATIVE_RETRY, speculative_retry(speculative_retry::type::NONE, 0).to_sstring()));
}

View File

@@ -2031,7 +2031,7 @@ future<> db::commitlog::segment_manager::shutdown() {
}
}
co_await _shutdown_promise->get_shared_future();
-clogger.info("Commitlog shutdown complete");
+clogger.debug("Commitlog shutdown complete");
}
void db::commitlog::segment_manager::add_file_to_dispose(named_file f, dispose_mode mode) {

View File

@@ -899,6 +899,8 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"Ignore truncation record stored in system tables as if tables were never truncated.")
, force_schema_commit_log(this, "force_schema_commit_log", value_status::Used, false,
"Use separate schema commit log unconditionally rater than after restart following discovery of cluster-wide support for it.")
, cache_index_pages(this, "cache_index_pages", liveness::LiveUpdate, value_status::Used, true,
"Keep SSTable index pages in the global cache after a SSTable read. Expected to improve performance for workloads with big partitions, but may degrade performance for workloads with small partitions.")
, default_log_level(this, "default_log_level", value_status::Used)
, logger_log_level(this, "logger_log_level", value_status::Used)
, log_to_stdout(this, "log_to_stdout", value_status::Used)

View File

@@ -379,6 +379,8 @@ public:
named_value<bool> ignore_truncation_record;
named_value<bool> force_schema_commit_log;
named_value<bool> cache_index_pages;
seastar::logging_settings logging_settings(const log_cli::options&) const;
const db::extensions& extensions() const;

View File

@@ -868,13 +868,18 @@ void view_updates::generate_update(
bool same_row = true;
for (auto col_id : col_ids) {
auto* after = update.cells().find_cell(col_id);
// Note: multi-cell columns can't be part of the primary key.
auto& cdef = _base->regular_column_at(col_id);
if (existing) {
auto* before = existing->cells().find_cell(col_id);
// Note that this cell is necessarily atomic, because col_ids are
// view key columns, and keys must be atomic.
if (before && before->as_atomic_cell(cdef).is_live()) {
if (after && after->as_atomic_cell(cdef).is_live()) {
-auto cmp = compare_atomic_cell_for_merge(before->as_atomic_cell(cdef), after->as_atomic_cell(cdef));
+// We need to compare just the values of the keys, not
+// metadata like the timestamp. This is because below,
+// if the old and new view row have the same key, we need
+// to be sure to reach the update_entry() case.
+auto cmp = compare_unsigned(before->as_atomic_cell(cdef).value(), after->as_atomic_cell(cdef).value());
if (cmp != 0) {
same_row = false;
}
@@ -894,7 +899,13 @@ void view_updates::generate_update(
if (same_row) {
update_entry(base_key, update, *existing, now);
} else {
replace_entry(base_key, update, *existing, now);
// This code doesn't work if the old and new view row have the
// same key, because if they do we get both data and tombstone
// for the same timestamp (now) and the tombstone wins. This
// is why we need the "same_row" case above - it's not just a
// performance optimization.
delete_old_entry(base_key, *existing, update, now);
create_entry(base_key, update, now);
}
} else {
delete_old_entry(base_key, *existing, update, now);

View File

@@ -154,10 +154,7 @@ private:
void delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now);
void do_delete_old_entry(const partition_key& base_key, const clustering_row& existing, const clustering_row& update, gc_clock::time_point now);
void update_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now);
void replace_entry(const partition_key& base_key, const clustering_row& update, const clustering_row& existing, gc_clock::time_point now) {
create_entry(base_key, update, now);
delete_old_entry(base_key, existing, update, now);
}
void update_entry_for_computed_column(const partition_key& base_key, const clustering_row& update, const std::optional<clustering_row>& existing, gc_clock::time_point now);
};
class view_update_builder {

View File

@@ -83,7 +83,7 @@ overloaded_exception::overloaded_exception(size_t c) noexcept
{}
rate_limit_exception::rate_limit_exception(const sstring& ks, const sstring& cf, db::operation_type op_type_, bool rejected_by_coordinator_) noexcept
-: cassandra_exception(exception_code::CONFIG_ERROR, prepare_message("Per-partition rate limit reached for {} in table {}.{}, rejected by {}", op_type_, ks, cf, rejected_by_coordinator_ ? "coordinator" : "replicas"))
+: cassandra_exception(exception_code::RATE_LIMIT_ERROR, prepare_message("Per-partition rate limit reached for {} in table {}.{}, rejected by {}", op_type_, ks, cf, rejected_by_coordinator_ ? "coordinator" : "replicas"))
, op_type(op_type_)
, rejected_by_coordinator(rejected_by_coordinator_)
{ }

View File

@@ -600,6 +600,12 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
cfg->broadcast_to_all_shards().get();
// We pass this piece of config through a global as a temporary hack.
// See the comment at the definition of sstables::global_cache_index_pages.
smp::invoke_on_all([&cfg] {
sstables::global_cache_index_pages = cfg->cache_index_pages.operator utils::updateable_value<bool>();
}).get();
::sighup_handler sighup_handler(opts, *cfg);
auto stop_sighup_handler = defer_verbose_shutdown("sighup", [&] {
sighup_handler.stop().get();

View File

@@ -467,6 +467,8 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
// should not be blocked by any data requests.
case messaging_verb::GROUP0_PEER_EXCHANGE:
case messaging_verb::GROUP0_MODIFY_CONFIG:
// ATTN -- if moving GOSSIP_ verbs elsewhere, mind updating the tcp_nodelay
// setting in get_rpc_client(), which assumes gossiper verbs live in idx 0
return 0;
case messaging_verb::PREPARE_MESSAGE:
case messaging_verb::PREPARE_DONE_MESSAGE:
@@ -737,7 +739,7 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
}();
auto must_tcp_nodelay = [&] {
-if (idx == 1) {
+if (idx == 0) {
return true; // gossip
}
if (_cfg.tcp_nodelay == tcp_nodelay_what::local) {

View File

@@ -826,6 +826,7 @@ public:
void apply(tombstone deleted_at) {
_deleted_at.apply(deleted_at);
maybe_shadow();
}
void apply(shadowable_tombstone deleted_at) {

View File

@@ -444,7 +444,7 @@ public:
// When throws, the cursor is invalidated and its position is not changed.
bool advance_to(position_in_partition_view lower_bound) {
maybe_advance_to(lower_bound);
-return no_clustering_row_between(_schema, lower_bound, position());
+return no_clustering_row_between_weak(_schema, lower_bound, position());
}
// Call only when valid.

View File

@@ -571,6 +571,20 @@ bool no_clustering_row_between(const schema& s, position_in_partition_view a, po
}
}
// Returns true if and only if there can't be any clustering_row with position >= a and < b.
// It is assumed that a <= b.
inline
bool no_clustering_row_between_weak(const schema& s, position_in_partition_view a, position_in_partition_view b) {
clustering_key_prefix::equality eq(s);
if (a.has_key() && b.has_key()) {
return eq(a.key(), b.key())
&& (a.get_bound_weight() == bound_weight::after_all_prefixed
|| b.get_bound_weight() != bound_weight::after_all_prefixed);
} else {
return !a.has_key() && !b.has_key();
}
}
// Includes all position_in_partition objects "p" for which: start <= p < end
// And only those.
class position_range {


@@ -413,25 +413,6 @@ future<bool> querier_cache::evict_one() noexcept {
co_return false;
}
future<> querier_cache::evict_all_for_table(const utils::UUID& schema_id) noexcept {
for (auto ip : {&_data_querier_index, &_mutation_querier_index, &_shard_mutation_querier_index}) {
auto& idx = *ip;
for (auto it = idx.begin(); it != idx.end();) {
if (it->second->schema().id() == schema_id) {
auto reader_opt = it->second->permit().semaphore().unregister_inactive_read(querier_utils::get_inactive_read_handle(*it->second));
it = idx.erase(it);
--_stats.population;
if (reader_opt) {
co_await reader_opt->close();
}
} else {
++it;
}
}
}
co_return;
}
future<> querier_cache::stop() noexcept {
co_await _closing_gate.close();


@@ -383,11 +383,6 @@ public:
/// is empty).
future<bool> evict_one() noexcept;
/// Evict all queriers that belong to a table.
///
/// Should be used when dropping a table.
future<> evict_all_for_table(const utils::UUID& schema_id) noexcept;
/// Close all queriers and wait on background work.
///
/// Should be used before destroying the querier_cache.


@@ -9,6 +9,7 @@
#include <boost/range/adaptor/reversed.hpp>
#include "range_tombstone_list.hh"
#include "utils/allocation_strategy.hh"
#include "utils/amortized_reserve.hh"
#include <seastar/util/variant_utils.hh>
range_tombstone_list::range_tombstone_list(const range_tombstone_list& x)
@@ -375,13 +376,13 @@ range_tombstone_list::reverter::insert(range_tombstones_type::iterator it, range
range_tombstone_list::range_tombstones_type::iterator
range_tombstone_list::reverter::erase(range_tombstones_type::iterator it) {
_ops.reserve(_ops.size() + 1);
amortized_reserve(_ops, _ops.size() + 1);
_ops.emplace_back(erase_undo_op(*it));
return _dst._tombstones.erase(it);
}
void range_tombstone_list::reverter::update(range_tombstones_type::iterator it, range_tombstone&& new_rt) {
_ops.reserve(_ops.size() + 1);
amortized_reserve(_ops, _ops.size() + 1);
swap(it->tombstone(), new_rt);
_ops.emplace_back(update_undo_op(std::move(new_rt), *it));
}


@@ -12,6 +12,7 @@
#include "range_tombstone.hh"
#include "query-request.hh"
#include "utils/preempt.hh"
#include "utils/chunked_vector.hh"
#include <iosfwd>
#include <variant>
@@ -106,7 +107,7 @@ class range_tombstone_list final {
class reverter {
private:
using op = std::variant<erase_undo_op, insert_undo_op, update_undo_op>;
std::vector<op> _ops;
utils::chunked_vector<op> _ops;
const schema& _s;
protected:
range_tombstone_list& _dst;


@@ -749,6 +749,25 @@ void reader_concurrency_semaphore::clear_inactive_reads() {
}
}
future<> reader_concurrency_semaphore::evict_inactive_reads_for_table(utils::UUID id) noexcept {
inactive_reads_type evicted_readers;
auto it = _inactive_reads.begin();
while (it != _inactive_reads.end()) {
auto& ir = *it;
++it;
if (ir.reader.schema()->id() == id) {
do_detach_inactive_reader(ir, evict_reason::manual);
ir.ttl_timer.cancel();
ir.unlink();
evicted_readers.push_back(ir);
}
}
while (!evicted_readers.empty()) {
std::unique_ptr<inactive_read> irp(&evicted_readers.front());
co_await irp->reader.close();
}
}
std::runtime_error reader_concurrency_semaphore::stopped_exception() {
return std::runtime_error(format("{} was stopped", _name));
}
@@ -771,11 +790,9 @@ future<> reader_concurrency_semaphore::stop() noexcept {
co_return;
}
flat_mutation_reader_v2 reader_concurrency_semaphore::detach_inactive_reader(inactive_read& ir, evict_reason reason) noexcept {
auto reader = std::move(ir.reader);
void reader_concurrency_semaphore::do_detach_inactive_reader(inactive_read& ir, evict_reason reason) noexcept {
ir.detach();
reader.permit()._impl->on_evicted();
std::unique_ptr<inactive_read> irp(&ir);
ir.reader.permit()._impl->on_evicted();
try {
if (ir.notify_handler) {
ir.notify_handler(reason);
@@ -794,7 +811,12 @@ flat_mutation_reader_v2 reader_concurrency_semaphore::detach_inactive_reader(ina
break;
}
--_stats.inactive_reads;
return reader;
}
flat_mutation_reader_v2 reader_concurrency_semaphore::detach_inactive_reader(inactive_read& ir, evict_reason reason) noexcept {
std::unique_ptr<inactive_read> irp(&ir);
do_detach_inactive_reader(ir, reason);
return std::move(irp->reader);
}
void reader_concurrency_semaphore::evict(inactive_read& ir, evict_reason reason) noexcept {


@@ -187,6 +187,7 @@ private:
std::optional<future<>> _execution_loop_future;
private:
void do_detach_inactive_reader(inactive_read&, evict_reason reason) noexcept;
[[nodiscard]] flat_mutation_reader_v2 detach_inactive_reader(inactive_read&, evict_reason reason) noexcept;
void evict(inactive_read&, evict_reason reason) noexcept;
@@ -302,6 +303,9 @@ public:
/// Clear all inactive reads.
void clear_inactive_reads();
/// Evict all inactive reads that belong to the table designated by the id.
future<> evict_inactive_reads_for_table(utils::UUID id) noexcept;
private:
// The following two functions are extension points for
// future inheriting classes that needs to run some stop


@@ -847,12 +847,12 @@ future<> shard_reader_v2::do_fill_buffer() {
}
auto res = co_await(std::move(fill_buf_fut));
_end_of_stream = res.end_of_stream;
reserve_additional(res.buffer->size());
for (const auto& mf : *res.buffer) {
push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, mf));
co_await coroutine::maybe_yield();
}
_end_of_stream = res.end_of_stream;
}
future<> shard_reader_v2::fill_buffer() {


@@ -1017,7 +1017,9 @@ future<> database::drop_column_family(const sstring& ks_name, const sstring& cf_
remove(*cf);
cf->clear_views();
co_await cf->await_pending_ops();
co_await _querier_cache.evict_all_for_table(cf->schema()->id());
for (auto* sem : {&_read_concurrency_sem, &_streaming_concurrency_sem, &_compaction_concurrency_sem, &_system_read_concurrency_sem}) {
co_await sem->evict_inactive_reads_for_table(uuid);
}
auto f = co_await coroutine::as_future(truncate(ks, *cf, std::move(tsf), snapshot));
co_await cf->stop();
f.get(); // re-throw exception from truncate() if any
@@ -2238,10 +2240,14 @@ future<> database::stop() {
// try to ensure that CL has done disk flushing
if (_commitlog) {
dblog.info("Shutting down commitlog");
co_await _commitlog->shutdown();
dblog.info("Shutting down commitlog complete");
}
if (_schema_commitlog) {
dblog.info("Shutting down schema commitlog");
co_await _schema_commitlog->shutdown();
dblog.info("Shutting down schema commitlog complete");
}
co_await _view_update_concurrency_sem.wait(max_memory_pending_view_updates());
if (_commitlog) {


@@ -586,7 +586,8 @@ table::seal_active_memtable(flush_permit&& flush_permit) noexcept {
auto permit = std::move(flush_permit);
auto r = exponential_backoff_retry(100ms, 10s);
// Try flushing for around half an hour (30 minutes every 10 seconds)
int allowed_retries = 30 * 60 / 10;
int default_retries = 30 * 60 / 10;
int allowed_retries = default_retries;
std::optional<utils::phased_barrier::operation> op;
size_t memtable_size;
future<> previous_flush = make_ready_future<>();
@@ -599,7 +600,19 @@ table::seal_active_memtable(flush_permit&& flush_permit) noexcept {
} catch (...) {
ex = std::current_exception();
_config.cf_stats->failed_memtables_flushes_count++;
auto abort_on_error = [ex] () {
if (try_catch<std::bad_alloc>(ex)) {
// There is a chance something else will free the memory, so we can try again
allowed_retries--;
} else if (auto ep = try_catch<std::system_error>(ex)) {
allowed_retries = ep->code().value() == ENOSPC ? default_retries : 0;
} else if (auto ep = try_catch<storage_io_error>(ex)) {
allowed_retries = ep->code().value() == ENOSPC ? default_retries : 0;
} else {
allowed_retries = 0;
}
if (allowed_retries <= 0) {
// At this point we don't know what has happened and it's better to potentially
// take the node down and rely on commitlog to replay.
//
@@ -608,14 +621,6 @@ table::seal_active_memtable(flush_permit&& flush_permit) noexcept {
// may end up in an infinite crash loop.
tlogger.error("Memtable flush failed due to: {}. Aborting, at {}", ex, current_backtrace());
std::abort();
};
if (try_catch<std::bad_alloc>(ex)) {
// There is a chance something else will free the memory, so we can try again
if (allowed_retries-- <= 0) {
abort_on_error();
}
} else {
abort_on_error();
}
}
if (_async_gate.is_closed()) {
@@ -681,7 +686,7 @@ table::seal_active_memtable(flush_permit&& flush_permit) noexcept {
auto write_permit = permit.release_sstable_write_permit();
utils::get_local_injector().inject("table_seal_active_memtable_try_flush", []() {
throw std::bad_alloc();
throw std::system_error(ENOSPC, std::system_category(), "Injected error");
});
co_return co_await this->try_flush_memtable_to_sstable(old, std::move(write_permit));
});

Submodule seastar updated: f9f5228b74...3aa91b4d2d


@@ -1189,7 +1189,7 @@ private:
}
index_reader& get_index_reader() {
if (!_index_reader) {
auto caching = use_caching(!_slice.options.contains(query::partition_slice::option::bypass_cache));
auto caching = use_caching(global_cache_index_pages && !_slice.options.contains(query::partition_slice::option::bypass_cache));
_index_reader = std::make_unique<index_reader>(_sst, _consumer.permit(), _consumer.io_priority(),
_consumer.trace_state(), caching, _single_partition_read);
}


@@ -1319,7 +1319,7 @@ private:
}
index_reader& get_index_reader() {
if (!_index_reader) {
auto caching = use_caching(!_slice.options.contains(query::partition_slice::option::bypass_cache));
auto caching = use_caching(global_cache_index_pages && !_slice.options.contains(query::partition_slice::option::bypass_cache));
_index_reader = std::make_unique<index_reader>(_sst, _consumer.permit(), _consumer.io_priority(),
_consumer.trace_state(), caching, _single_partition_read);
}
@@ -1754,9 +1754,7 @@ public:
_monitor.on_read_started(_context->reader_position());
}
public:
void on_out_of_clustering_range() override {
push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, partition_end()));
}
void on_out_of_clustering_range() override { }
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
on_internal_error(sstlog, "mx_crawling_sstable_mutation_reader: doesn't support fast_forward_to(const dht::partition_range&)");
}


@@ -87,6 +87,18 @@ thread_local disk_error_signal_type sstable_write_error;
namespace sstables {
// The below flag governs the mode of index file page caching used by the index
// reader.
//
// If set to true, the reader will read and/or populate a common global cache,
// which shares its capacity with the row cache. If false, the reader will use
// BYPASS CACHE semantics for index caching.
//
// This flag is intended to be a temporary hack. The goal is to eventually
// solve index caching problems via a smart cache replacement policy.
//
thread_local utils::updateable_value<bool> global_cache_index_pages(false);
logging::logger sstlog("sstable");
// Because this is a noop and won't hold any state, it is better to use a global than a


@@ -50,6 +50,7 @@
#include "mutation_fragment_stream_validator.hh"
#include "readers/flat_mutation_reader_fwd.hh"
#include "tracing/trace_state.hh"
#include "utils/updateable_value.hh"
#include <seastar/util/optimized_optional.hh>
@@ -58,6 +59,8 @@ class cached_file;
namespace sstables {
extern thread_local utils::updateable_value<bool> global_cache_index_pages;
namespace mc {
class writer;
}


@@ -94,9 +94,9 @@ def test_describe_table_size(test_table):
# Test the ProvisionedThroughput attribute returned by DescribeTable.
# This is a very partial test: Our test table is configured without
# provisioned throughput, so obviously it will not have interesting settings
# for it. DynamoDB returns zeros for some of the attributes, even though
# the documentation suggests missing values should have been fine too.
@pytest.mark.xfail(reason="DescribeTable does not return provisioned throughput")
# for it. But DynamoDB documents that zeros be returned for WriteCapacityUnits
# and ReadCapacityUnits, and does this in practice as well - and some
# applications assume these numbers are always there (even if 0).
def test_describe_table_provisioned_throughput(test_table):
got = test_table.meta.client.describe_table(TableName=test_table.name)['Table']
assert got['ProvisionedThroughput']['NumberOfDecreasesToday'] == 0


@@ -427,6 +427,126 @@ def test_gsi_update_second_regular_base_column(test_table_gsi_3):
KeyConditions={'a': {'AttributeValueList': [items[3]['a']], 'ComparisonOperator': 'EQ'},
'b': {'AttributeValueList': [items[3]['b']], 'ComparisonOperator': 'EQ'}})
# Test reproducing issue #11801: In issue #5006 we noticed that in the special
# case of a GSI with two non-key attributes as keys (test_table_gsi_3),
# an update of the second attribute forgot to delete the old row. We fixed
# that bug, but a bug remained for updates which update the value to the *same*
# value - in that case the old row shouldn't be deleted, but we did - as
# noticed in issue #11801.
def test_11801(test_table_gsi_3):
p = random_string()
a = random_string()
b = random_string()
item = {'p': p, 'a': a, 'b': b, 'd': random_string()}
test_table_gsi_3.put_item(Item=item)
assert_index_query(test_table_gsi_3, 'hello', [item],
KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
# Update the attribute 'b' to the same value b that it already had.
# This shouldn't change anything in the base table or in the GSI
test_table_gsi_3.update_item(Key={'p': p}, AttributeUpdates={'b': {'Value': b, 'Action': 'PUT'}})
assert item == test_table_gsi_3.get_item(Key={'p': p}, ConsistentRead=True)['Item']
# In issue #11801, the following assertion failed (the view row was
# deleted and nothing matched the query).
assert_index_query(test_table_gsi_3, 'hello', [item],
KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
# Above we checked that setting 'b' to the same value didn't remove
# the old GSI row. But the same update may actually modify the GSI row
# (e.g., an unrelated attribute d) - check this modification took place:
item['d'] = random_string()
test_table_gsi_3.update_item(Key={'p': p},
AttributeUpdates={'b': {'Value': b, 'Action': 'PUT'},
'd': {'Value': item['d'], 'Action': 'PUT'}})
assert item == test_table_gsi_3.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert_index_query(test_table_gsi_3, 'hello', [item],
KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
# This test is the same as test_11801, but updating the first attribute (a)
# instead of the second (b). This test didn't fail, showing that issue #11801
# is - like #5006 - specific to the case of updating the second attribute.
def test_11801_variant1(test_table_gsi_3):
p = random_string()
a = random_string()
b = random_string()
d = random_string()
item = {'p': p, 'a': a, 'b': b, 'd': d}
test_table_gsi_3.put_item(Item=item)
assert_index_query(test_table_gsi_3, 'hello', [item],
KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
test_table_gsi_3.update_item(Key={'p': p}, AttributeUpdates={'a': {'Value': a, 'Action': 'PUT'}})
assert_index_query(test_table_gsi_3, 'hello', [item],
KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
# This test is the same as test_11801, but updates b to a different value
# (newb) instead of to the same one. This test didn't fail, showing that
# issue #11801 is specific to updates to the same value. This test basically
# reproduces the already-fixed #5006 (we also have another test above which
# reproduces that issue - test_gsi_update_second_regular_base_column())
def test_11801_variant2(test_table_gsi_3):
p = random_string()
a = random_string()
b = random_string()
item = {'p': p, 'a': a, 'b': b, 'd': random_string()}
test_table_gsi_3.put_item(Item=item)
assert_index_query(test_table_gsi_3, 'hello', [item],
KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
newb = random_string()
item['b'] = newb
test_table_gsi_3.update_item(Key={'p': p}, AttributeUpdates={'b': {'Value': newb, 'Action': 'PUT'}})
assert_index_query(test_table_gsi_3, 'hello', [],
KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
assert_index_query(test_table_gsi_3, 'hello', [item],
KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
'b': {'AttributeValueList': [newb], 'ComparisonOperator': 'EQ'}})
# This test is the same as test_11801, but uses a different table schema
# (test_table_gsi_5) where there is only one new key column in the view (x).
# This test passed, showing that issue #11801 was specific to the special
# case of a view with two new key columns (test_table_gsi_3).
def test_11801_variant3(test_table_gsi_5):
p = random_string()
c = random_string()
x = random_string()
item = {'p': p, 'c': c, 'x': x, 'd': random_string()}
test_table_gsi_5.put_item(Item=item)
assert_index_query(test_table_gsi_5, 'hello', [item],
KeyConditions={'p': {'AttributeValueList': [p], 'ComparisonOperator': 'EQ'},
'x': {'AttributeValueList': [x], 'ComparisonOperator': 'EQ'}})
test_table_gsi_5.update_item(Key={'p': p, 'c': c}, AttributeUpdates={'x': {'Value': x, 'Action': 'PUT'}})
assert item == test_table_gsi_5.get_item(Key={'p': p, 'c': c}, ConsistentRead=True)['Item']
assert_index_query(test_table_gsi_5, 'hello', [item],
KeyConditions={'p': {'AttributeValueList': [p], 'ComparisonOperator': 'EQ'},
'x': {'AttributeValueList': [x], 'ComparisonOperator': 'EQ'}})
# Another test similar to test_11801, but instead of updating a view key
# column to the same value it already has, simply don't update it at all
# (and just modify some other regular column). This test passed, showing
# that issue #11801 is specific to the case of updating a view key column
# to the same value it already had.
def test_11801_variant4(test_table_gsi_3):
p = random_string()
a = random_string()
b = random_string()
item = {'p': p, 'a': a, 'b': b, 'd': random_string()}
test_table_gsi_3.put_item(Item=item)
assert_index_query(test_table_gsi_3, 'hello', [item],
KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
# An update that doesn't change the GSI keys (a or b), just a regular
# column d.
item['d'] = random_string()
test_table_gsi_3.update_item(Key={'p': p}, AttributeUpdates={'d': {'Value': item['d'], 'Action': 'PUT'}})
assert item == test_table_gsi_3.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert_index_query(test_table_gsi_3, 'hello', [item],
KeyConditions={'a': {'AttributeValueList': [a], 'ComparisonOperator': 'EQ'},
'b': {'AttributeValueList': [b], 'ComparisonOperator': 'EQ'}})
# Test that when a table has a GSI, if the indexed attribute is missing, the
# item is added to the base table but not the index.
# This is the same feature we already tested in test_gsi_missing_attribute()


@@ -12,6 +12,7 @@
#include <deque>
#include <random>
#include "utils/chunked_vector.hh"
#include "utils/amortized_reserve.hh"
#include <boost/range/algorithm/sort.hpp>
#include <boost/range/algorithm/equal.hpp>
@@ -207,3 +208,37 @@ BOOST_AUTO_TEST_CASE(test_shrinking_and_expansion_involving_chunk_boundary) {
v.emplace_back(std::make_unique<uint64_t>(i));
}
}
BOOST_AUTO_TEST_CASE(test_amoritzed_reserve) {
utils::chunked_vector<int> v;
v.reserve(10);
amortized_reserve(v, 1);
BOOST_REQUIRE_EQUAL(v.capacity(), 10);
BOOST_REQUIRE_EQUAL(v.size(), 0);
v = {};
amortized_reserve(v, 1);
BOOST_REQUIRE_EQUAL(v.capacity(), 1);
BOOST_REQUIRE_EQUAL(v.size(), 0);
v = {};
amortized_reserve(v, 1);
BOOST_REQUIRE_EQUAL(v.capacity(), 1);
amortized_reserve(v, 2);
BOOST_REQUIRE_EQUAL(v.capacity(), 2);
amortized_reserve(v, 3);
BOOST_REQUIRE_EQUAL(v.capacity(), 4);
amortized_reserve(v, 4);
BOOST_REQUIRE_EQUAL(v.capacity(), 4);
amortized_reserve(v, 5);
BOOST_REQUIRE_EQUAL(v.capacity(), 8);
amortized_reserve(v, 6);
BOOST_REQUIRE_EQUAL(v.capacity(), 8);
amortized_reserve(v, 7);
BOOST_REQUIRE_EQUAL(v.capacity(), 8);
amortized_reserve(v, 7);
BOOST_REQUIRE_EQUAL(v.capacity(), 8);
amortized_reserve(v, 1);
BOOST_REQUIRE_EQUAL(v.capacity(), 8);
}


@@ -1852,6 +1852,29 @@ SEASTAR_TEST_CASE(test_continuity_merging_of_complete_mutations) {
return make_ready_future<>();
}
SEASTAR_TEST_CASE(test_commutativity_and_associativity) {
random_mutation_generator gen(random_mutation_generator::generate_counters::no);
gen.set_key_cardinality(7);
for (int i = 0; i < 10; ++i) {
mutation m1 = gen();
m1.partition().make_fully_continuous();
mutation m2 = gen();
m2.partition().make_fully_continuous();
mutation m3 = gen();
m3.partition().make_fully_continuous();
assert_that(m1 + m2 + m3)
.is_equal_to(m1 + m3 + m2)
.is_equal_to(m2 + m1 + m3)
.is_equal_to(m2 + m3 + m1)
.is_equal_to(m3 + m1 + m2)
.is_equal_to(m3 + m2 + m1);
}
return make_ready_future<>();
}
SEASTAR_TEST_CASE(test_continuity_merging) {
return seastar::async([] {
simple_schema table;


@@ -641,7 +641,7 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete_respects_continuity) {
static mutation_partition read_using_cursor(partition_snapshot& snap) {
tests::reader_concurrency_semaphore_wrapper semaphore;
partition_snapshot_row_cursor cur(*snap.schema(), snap);
cur.maybe_refresh();
cur.advance_to(position_in_partition::before_all_clustered_rows());
auto mp = read_partition_from(*snap.schema(), cur);
for (auto&& rt : snap.range_tombstones()) {
mp.apply_delete(*snap.schema(), rt);


@@ -327,11 +327,6 @@ public:
return *this;
}
test_querier_cache& evict_all_for_table() {
_cache.evict_all_for_table(get_schema()->id()).get();
return *this;
}
test_querier_cache& no_misses() {
BOOST_REQUIRE_EQUAL(_cache.get_stats().misses, _expected_stats.misses);
return *this;
@@ -727,21 +722,6 @@ SEASTAR_THREAD_TEST_CASE(test_resources_based_cache_eviction) {
}, std::move(db_cfg_ptr)).get();
}
SEASTAR_THREAD_TEST_CASE(test_evict_all_for_table) {
test_querier_cache t;
const auto entry = t.produce_first_page_and_save_mutation_querier();
t.evict_all_for_table();
t.assert_cache_lookup_mutation_querier(entry.key, *t.get_schema(), entry.expected_range, entry.expected_slice)
.misses()
.no_drops()
.no_evictions();
// Check that the querier was removed from the semaphore too.
BOOST_CHECK(!t.get_semaphore().try_evict_one_inactive_read());
}
SEASTAR_THREAD_TEST_CASE(test_immediate_evict_on_insert) {
test_querier_cache t;


@@ -10,6 +10,7 @@
#include "test/lib/simple_schema.hh"
#include "test/lib/eventually.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/random_schema.hh"
#include <seastar/core/coroutine.hh>
#include <seastar/testing/test_case.hh>
@@ -915,3 +916,44 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_used_blocked) {
}
}
}
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_for_table) {
auto spec = tests::make_random_schema_specification(get_name());
std::list<tests::random_schema> schemas;
std::unordered_map<tests::random_schema*, std::vector<reader_concurrency_semaphore::inactive_read_handle>> schema_handles;
for (unsigned i = 0; i < 4; ++i) {
auto& s = schemas.emplace_back(tests::random_schema(i, *spec));
schema_handles.emplace(&s, std::vector<reader_concurrency_semaphore::inactive_read_handle>{});
}
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
auto stop_sem = deferred_stop(semaphore);
for (auto& s : schemas) {
auto& handles = schema_handles[&s];
for (int i = 0; i < 10; ++i) {
handles.emplace_back(semaphore.register_inactive_read(make_empty_flat_reader_v2(s.schema(), semaphore.make_tracking_only_permit(s.schema().get(), get_name(), db::no_timeout))));
}
}
for (auto& s : schemas) {
auto& handles = schema_handles[&s];
BOOST_REQUIRE(std::all_of(handles.begin(), handles.end(), [] (const reader_concurrency_semaphore::inactive_read_handle& handle) { return bool(handle); }));
}
for (auto& s : schemas) {
auto& handles = schema_handles[&s];
BOOST_REQUIRE(std::all_of(handles.begin(), handles.end(), [] (const reader_concurrency_semaphore::inactive_read_handle& handle) { return bool(handle); }));
semaphore.evict_inactive_reads_for_table(s.schema()->id()).get();
for (const auto& [k, v] : schema_handles) {
if (k == &s) {
BOOST_REQUIRE(std::all_of(v.begin(), v.end(), [] (const reader_concurrency_semaphore::inactive_read_handle& handle) { return !bool(handle); }));
} else if (!v.empty()) {
BOOST_REQUIRE(std::all_of(v.begin(), v.end(), [] (const reader_concurrency_semaphore::inactive_read_handle& handle) { return bool(handle); }));
}
}
handles.clear();
}
}


@@ -1235,9 +1235,13 @@ SEASTAR_TEST_CASE(test_update_failure) {
class throttle {
unsigned _block_counter = 0;
promise<> _p; // valid when _block_counter != 0, resolves when goes down to 0
std::optional<promise<>> _entered;
bool _one_shot;
public:
// one_shot means whether only the first enter() after block() will block.
throttle(bool one_shot = false) : _one_shot(one_shot) {}
future<> enter() {
if (_block_counter) {
if (_block_counter && (!_one_shot || _entered)) {
promise<> p1;
promise<> p2;
@@ -1249,16 +1253,21 @@ public:
p3.set_value();
});
_p = std::move(p2);
if (_entered) {
_entered->set_value();
_entered.reset();
}
return f1;
} else {
return make_ready_future<>();
}
}
void block() {
future<> block() {
++_block_counter;
_p = promise<>();
_entered = promise<>();
return _entered->get_future();
}
void unblock() {
@@ -1402,7 +1411,7 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) {
mt2->apply(m);
}
thr.block();
auto f = thr.block();
auto m0_range = dht::partition_range::make_singular(ring[0].ring_position());
auto rd1 = cache.make_reader(s, semaphore.make_permit(), m0_range);
@@ -1413,6 +1422,7 @@ SEASTAR_TEST_CASE(test_cache_population_and_update_race) {
rd2.set_max_buffer_size(1);
auto rd2_fill_buffer = rd2.fill_buffer();
f.get();
sleep(10ms).get();
// This update should miss on all partitions
@@ -1540,12 +1550,13 @@ SEASTAR_TEST_CASE(test_cache_population_and_clear_race) {
mt2->apply(m);
}
thr.block();
auto f = thr.block();
auto rd1 = cache.make_reader(s, semaphore.make_permit());
rd1.set_max_buffer_size(1);
auto rd1_fill_buffer = rd1.fill_buffer();
f.get();
sleep(10ms).get();
// This update should miss on all partitions
@@ -3341,6 +3352,7 @@ SEASTAR_TEST_CASE(test_tombstone_merging_of_overlapping_tombstones_in_many_versi
SEASTAR_TEST_CASE(test_concurrent_reads_and_eviction) {
return seastar::async([] {
random_mutation_generator gen(random_mutation_generator::generate_counters::no);
gen.set_key_cardinality(16);
memtable_snapshot_source underlying(gen.schema());
schema_ptr s = gen.schema();
schema_ptr rev_s = s->make_reversed();
@@ -3994,3 +4006,81 @@ SEASTAR_TEST_CASE(row_cache_is_populated_using_compacting_sstable_reader) {
BOOST_ASSERT(rt.calculate_size() == 1);
});
}
SEASTAR_TEST_CASE(test_eviction_of_upper_bound_of_population_range) {
return seastar::async([] {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
auto cache_mt = make_lw_shared<replica::memtable>(s.schema());
auto pkey = s.make_pkey("pk");
mutation m1(s.schema(), pkey);
s.add_row(m1, s.make_ckey(1), "v1");
s.add_row(m1, s.make_ckey(2), "v2");
cache_mt->apply(m1);
cache_tracker tracker;
throttle thr(true);
auto cache_source = make_decorated_snapshot_source(snapshot_source([&] { return cache_mt->as_data_source(); }),
[&] (mutation_source src) {
return throttled_mutation_source(thr, std::move(src));
});
row_cache cache(s.schema(), cache_source, tracker);
auto pr = dht::partition_range::make_singular(pkey);
auto read = [&] (int start, int end) {
auto slice = partition_slice_builder(*s.schema())
.with_range(query::clustering_range::make(s.make_ckey(start), s.make_ckey(end)))
.build();
auto rd = cache.make_reader(s.schema(), semaphore.make_permit(), pr, slice);
auto close_rd = deferred_close(rd);
auto m_cache = read_mutation_from_flat_mutation_reader(rd).get0();
close_rd.close_now();
rd = cache_mt->make_flat_reader(s.schema(), semaphore.make_permit(), pr, slice);
auto close_rd2 = deferred_close(rd);
auto m_mt = read_mutation_from_flat_mutation_reader(rd).get0();
BOOST_REQUIRE(m_mt);
assert_that(m_cache).has_mutation().is_equal_to(*m_mt);
};
// populate [2]
{
auto slice = partition_slice_builder(*s.schema())
.with_range(query::clustering_range::make_singular(s.make_ckey(2)))
.build();
assert_that(cache.make_reader(s.schema(), semaphore.make_permit(), pr, slice))
.has_monotonic_positions();
}
auto arrived = thr.block();
// Read [0, 2]
auto f = seastar::async([&] {
read(0, 2);
});
arrived.get();
// populate (2, 3]
{
auto slice = partition_slice_builder(*s.schema())
.with_range(query::clustering_range::make(query::clustering_range::bound(s.make_ckey(2), false),
query::clustering_range::bound(s.make_ckey(3), true)))
.build();
assert_that(cache.make_reader(s.schema(), semaphore.make_permit(), pr, slice))
.has_monotonic_positions();
}
testlog.trace("Evicting");
evict_one_row(tracker); // Evicts before(0)
evict_one_row(tracker); // Evicts ck(2)
testlog.trace("Unblocking");
thr.unblock();
f.get();
read(0, 3);
});
}


@@ -3149,3 +3149,58 @@ SEASTAR_TEST_CASE(test_index_fast_forwarding_after_eof) {
return make_ready_future<>();
});
}
SEASTAR_TEST_CASE(test_crawling_reader_out_of_range_last_range_tombstone_change) {
return test_env::do_with_async([] (test_env& env) {
simple_schema table;
auto mut = table.new_mutation("pk0");
auto ckeys = table.make_ckeys(4);
table.add_row(mut, ckeys[0], "v0");
table.add_row(mut, ckeys[1], "v1");
table.add_row(mut, ckeys[2], "v2");
using bound = query::clustering_range::bound;
table.delete_range(mut, query::clustering_range::make(bound{ckeys[3], true}, bound{clustering_key::make_empty(), true}), tombstone(1, gc_clock::now()));
auto tmp = tmpdir();
auto sst_gen = [&env, &table, &tmp] () {
return env.make_sstable(table.schema(), tmp.path().string(), 1, sstables::get_highest_sstable_version(), big);
};
auto sst = make_sstable_containing(sst_gen, {mut});
assert_that(sst->make_crawling_reader(table.schema(), env.make_reader_permit())).has_monotonic_positions();
});
}
SEASTAR_TEST_CASE(test_crawling_reader_random_schema_random_mutations) {
return test_env::do_with_async([this] (test_env& env) {
auto random_spec = tests::make_random_schema_specification(
get_name(),
std::uniform_int_distribution<size_t>(1, 4),
std::uniform_int_distribution<size_t>(2, 4),
std::uniform_int_distribution<size_t>(2, 8),
std::uniform_int_distribution<size_t>(2, 8));
auto random_schema = tests::random_schema{tests::random::get_int<uint32_t>(), *random_spec};
auto schema = random_schema.schema();
testlog.info("Random schema:\n{}", random_schema.cql());
const auto muts = tests::generate_random_mutations(random_schema, 20).get();
auto tmp = tmpdir();
auto sst_gen = [&env, schema, &tmp] () {
return env.make_sstable(schema, tmp.path().string(), 1, sstables::get_highest_sstable_version(), big);
};
auto sst = make_sstable_containing(sst_gen, muts);
{
auto rd = assert_that(sst->make_crawling_reader(schema, env.make_reader_permit()));
for (const auto& mut : muts) {
rd.produces(mut);
}
}
assert_that(sst->make_crawling_reader(schema, env.make_reader_permit())).has_monotonic_positions();
});
}
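The property asserted by `has_monotonic_positions()` in the tests above is that a reader never moves backwards: each fragment's position must compare greater than or equal to the previous one. A minimal Python sketch of the check, with positions reduced to plain comparable values (the real assertion compares them with the schema's position comparator):

```python
def has_monotonic_positions(positions):
    # A reader is well-behaved if the stream of positions it emits
    # never decreases; a backwards jump indicates a reader bug.
    return all(a <= b for a, b in zip(positions, positions[1:]))
```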

View File

@@ -6,6 +6,7 @@ import pytest
import rest_api
import nodetool
from util import new_test_table
from cassandra.protocol import ConfigurationException
# The test inserts `N` rows into a table, flushes it,
# and tries to read `M` non-existing keys.
@@ -29,3 +30,27 @@ def test_bloom_filter(scylla_only, cql, test_keyspace, N, M, fp_chance):
ratio = fp / M
assert ratio >= fp_chance * 0.7 and ratio <= fp_chance * 1.15
# Test very small bloom_filter_fp_chance settings.
# The Cassandra documentation suggests that bloom_filter_fp_chance can be set
# to anything between 0 and 1, and the Datastax documentation even goes further
# and explains that 0 means "the largest possible Bloom filter".
# But in practice, there is a minimal false-positive chance that the Bloom
# filter can possibly achieve and Cassandra refuses lower settings (see
# CASSANDRA-11920) and Scylla should do the same instead of crashing much
# later during a memtable flush as it did in issue #11524.
@pytest.mark.parametrize("fp_chance", [1e-5, 0])
def test_small_bloom_filter_fp_chance(cql, test_keyspace, fp_chance):
with pytest.raises(ConfigurationException):
with new_test_table(cql, test_keyspace, 'a int PRIMARY KEY', f'WITH bloom_filter_fp_chance = {fp_chance}') as table:
cql.execute(f'INSERT INTO {table} (a) VALUES (1)')
# In issue #11524, Scylla used to crash during this flush after the
# table creation succeeded above.
nodetool.flush(cql, table)
# Check that bloom_filter_fp_chance outside [0, 1] (i.e., > 1 or < 0)
# is, unsurprisingly, forbidden.
@pytest.mark.parametrize("fp_chance", [-0.1, 1.1])
def test_invalid_bloom_filter_fp_chance(cql, test_keyspace, fp_chance):
with pytest.raises(ConfigurationException):
with new_test_table(cql, test_keyspace, 'a int PRIMARY KEY', f'WITH bloom_filter_fp_chance = {fp_chance}') as table:
pass

View File

@@ -2013,6 +2013,11 @@ public:
_blobs = boost::copy_range<std::vector<bytes>>(keys | boost::adaptors::transformed([this] (sstring& k) { return to_bytes(k); }));
}
void set_key_cardinality(size_t n_keys) {
assert(n_keys <= n_blobs);
_ck_index_dist = std::uniform_int_distribution<size_t>{0, n_keys - 1};
}
bytes random_blob() {
return _blobs[std::min(_blobs.size() - 1, std::max<size_t>(0, _ck_index_dist(_gen)))];
}
@@ -2236,12 +2241,23 @@ public:
};
size_t row_count = row_count_dist(_gen);
for (size_t i = 0; i < row_count; ++i) {
auto ckey = make_random_key();
std::unordered_set<clustering_key, clustering_key::hashing, clustering_key::equality> keys(
0, clustering_key::hashing(*_schema), clustering_key::equality(*_schema));
while (keys.size() < row_count) {
keys.emplace(make_random_key());
}
for (auto&& ckey : keys) {
is_continuous continuous = is_continuous(_bool_dist(_gen));
if (_not_dummy_dist(_gen)) {
deletable_row& row = m.partition().clustered_row(*_schema, ckey, is_dummy::no, continuous);
row.apply(random_row_marker());
if (!row.marker().is_missing() && !row.marker().is_live()) {
// Mutations are not associative if a dead marker is not matched with a dead row
// due to shadowable tombstone merging rules. See #11307.
row.apply(tombstone(row.marker().timestamp(), row.marker().deletion_time()));
}
if (_bool_dist(_gen)) {
set_random_cells(row.cells(), column_kind::regular_column);
} else {
@@ -2332,6 +2348,10 @@ std::vector<query::clustering_range> random_mutation_generator::make_random_rang
return _impl->make_random_ranges(n_ranges);
}
void random_mutation_generator::set_key_cardinality(size_t n_keys) {
_impl->set_key_cardinality(n_keys);
}
void for_each_schema_change(std::function<void(schema_ptr, const std::vector<mutation>&,
schema_ptr, const std::vector<mutation>&)> fn) {
auto map_of_int_to_int = map_type_impl::get_instance(int32_type, int32_type, true);
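The loop in the hunk above replaces independent random draws with draws into an `unordered_set`, so the generator produces exactly `row_count` distinct clustering keys instead of silently colliding. The same rejection-sampling pattern in Python (illustrative names; the key space must hold at least `row_count` values for the loop to terminate):

```python
import random

def distinct_random_keys(row_count, key_space, rng=random):
    # Keep drawing until the set holds row_count distinct keys,
    # mirroring the unordered_set loop in the patch above.
    assert row_count <= len(key_space)
    keys = set()
    while len(keys) < row_count:
        keys.add(rng.choice(key_space))
    return keys
```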

View File

@@ -64,6 +64,8 @@ public:
range_tombstone make_random_range_tombstone();
std::vector<dht::decorated_key> make_partition_keys(size_t n);
std::vector<query::clustering_range> make_random_ranges(unsigned n_ranges);
// Sets the number of distinct clustering keys which will be used in generated mutations.
void set_key_cardinality(size_t);
};
bytes make_blob(size_t blob_size);

View File

@@ -43,6 +43,9 @@ void run_test(const sstring& name, schema_ptr s, MutationGenerator&& gen) {
auto prefill_compacted = logalloc::memory_compacted();
auto prefill_allocated = logalloc::memory_allocated();
scheduling_latency_measurer memtable_slm;
memtable_slm.start();
auto mt = make_lw_shared<replica::memtable>(s);
auto fill_d = duration_in_seconds([&] {
while (mt->occupancy().total_space() < memtable_size) {
@@ -54,7 +57,8 @@ void run_test(const sstring& name, schema_ptr s, MutationGenerator&& gen) {
}
}
});
std::cout << format("Memtable fill took {:.6f} [ms]", fill_d.count() * 1000) << std::endl;
memtable_slm.stop();
std::cout << format("Memtable fill took {:.6f} [ms], {}", fill_d.count() * 1000, memtable_slm) << std::endl;
std::cout << "Draining..." << std::endl;
auto drain_d = duration_in_seconds([&] {
@@ -223,6 +227,40 @@ void test_partition_with_lots_of_range_tombstones() {
});
}
// This test case stresses handling of overlapping range tombstones
void test_partition_with_lots_of_range_tombstones_with_residuals() {
auto s = schema_builder("ks", "cf")
.with_column("pk", uuid_type, column_kind::partition_key)
.with_column("ck", int32_type, column_kind::clustering_key)
.with_column("v1", bytes_type, column_kind::regular_column)
.with_column("v2", bytes_type, column_kind::regular_column)
.with_column("v3", bytes_type, column_kind::regular_column)
.build();
auto pk = dht::decorate_key(*s, partition_key::from_single_value(*s,
serialized(utils::UUID_gen::get_time_UUID())));
int ck_idx = 0;
run_test("Large partition, lots of range tombstones with residuals", s, [&] {
mutation m(s, pk);
auto val = data_value(bytes(bytes::initialized_later(), cell_size));
auto ck = clustering_key::from_single_value(*s, serialized(ck_idx++));
auto r = query::clustering_range::make({ck}, {ck});
tombstone tomb(api::new_timestamp(), gc_clock::now());
m.partition().apply_row_tombstone(*s, range_tombstone(bound_view::from_range_start(r), bound_view::top(), tomb));
// Stress a range tombstone overlapping with lots of range tombstones
auto stride = 1'000'000;
if (ck_idx == stride) {
ck = clustering_key::from_single_value(*s, serialized(ck_idx - stride));
r = query::clustering_range::make({ck}, {ck});
m.partition().apply_row_tombstone(*s, range_tombstone(bound_view::from_range_start(r), bound_view::top(), tomb));
}
return m;
});
}
int main(int argc, char** argv) {
app_template app;
return app.run(argc, argv, [&app] {
@@ -236,6 +274,7 @@ int main(int argc, char** argv) {
test_partition_with_few_small_rows();
test_partition_with_lots_of_small_rows();
test_partition_with_lots_of_range_tombstones();
test_partition_with_lots_of_range_tombstones_with_residuals();
});
});
}

View File

@@ -81,6 +81,11 @@ public:
}
}
future<> error() {
_barrier.abort();
return make_ready_future<>();
}
unsigned get_phase() const noexcept { return _phase.load(); }
};
@@ -115,6 +120,16 @@ int main(int argc, char **argv) {
}
w.stop().get();
}
std::vector<int> count(64);
parallel_for_each(count, [] (auto& cnt) -> future<> {
std::vector<sharded<worker>> w(32);
co_await parallel_for_each(w, [] (auto &sw) -> future<> {
co_await sw.start(utils::cross_shard_barrier());
co_await sw.invoke_on_all(&worker::error);
co_await sw.stop();
});
}).get();
});
});
}
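The new test arranges for every worker to call `error()`, which aborts the barrier so that shards blocked on it fail fast instead of hanging during stop. Python's `threading.Barrier` offers analogous abort semantics; a minimal sketch of the idea (stdlib only, not the seastar `cross_shard_barrier` API):

```python
import threading

def demo_barrier_abort():
    barrier = threading.Barrier(2)   # nobody will ever be the 2nd arriver
    results = []

    def waiter():
        try:
            barrier.wait(timeout=5)
            results.append("passed")
        except threading.BrokenBarrierError:
            results.append("broken")

    t = threading.Thread(target=waiter)
    t.start()
    # Break the barrier: current and future waiters get
    # BrokenBarrierError instead of blocking forever.
    barrier.abort()
    t.join()
    return results
```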

View File

@@ -727,7 +727,10 @@ future<fragmented_temporary_buffer> cql_server::connection::read_and_decompress_
if (ret < 0) {
throw std::runtime_error("CQL frame LZ4 uncompression failure");
}
return out.size();
if (ret != out.size()) {
throw std::runtime_error("Malformed CQL frame - declared uncompressed size differs from actual uncompressed size");
}
return static_cast<size_t>(ret);
});
on_compression_buffer_use();
return uncomp;

View File

@@ -0,0 +1,54 @@
/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include <concepts>
#include <vector>
#include <memory>
/// Represents a container which can preallocate space for future insertions,
/// reducing the overall number of memory re-allocations and item movements.
///
/// The number of items for which space is currently reserved is returned by capacity().
/// This includes items currently present in the container.
///
/// The number of items currently present is returned by size().
///
/// Invariant:
///
/// size() <= capacity()
///
/// Space is reserved by calling reserve(desired_capacity).
/// The post-condition of calling reserve() is:
///
/// capacity() >= desired_capacity
///
/// It is guaranteed that inserting (capacity() - size()) items does not
/// throw, provided T::value_type's constructor and move constructor do not throw.
template <typename T>
concept ContainerWithCapacity = requires (T x, size_t desired_capacity, typename T::value_type e) {
{ x.reserve(desired_capacity) } -> std::same_as<void>;
{ x.capacity() } -> std::same_as<size_t>;
{ x.size() } -> std::same_as<size_t>;
};
static_assert(ContainerWithCapacity<std::vector<int>>);
/// Reserves space for at least desired_capacity - v.size() elements.
///
/// Amortizes space expansion so that a series of N calls to amortized_reserve(v, v.size() + 1)
/// starting from an empty container takes O(N) time overall.
///
/// Post-condition: v.capacity() >= desired_capacity
template <ContainerWithCapacity T>
void amortized_reserve(T& v, size_t desired_capacity) {
if (desired_capacity > v.capacity()) {
v.reserve(std::max(desired_capacity, v.capacity() * 2));
}
}
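The doubling rule above is what gives the O(N) bound: growing to at least twice the current capacity means N one-at-a-time reservations trigger only O(log N) reallocations. A Python model of the policy (the `Reserving` class is a toy stand-in for any container with `reserve()`/`capacity()`):

```python
class Reserving:
    # Toy container tracking only size, capacity and reallocation count.
    def __init__(self):
        self.size = 0
        self.capacity = 0
        self.reallocations = 0

    def reserve(self, n):
        if n > self.capacity:
            self.capacity = n
            self.reallocations += 1

def amortized_reserve(v, desired_capacity):
    # Same rule as the C++ helper: grow to at least double the
    # current capacity instead of just to desired_capacity.
    if desired_capacity > v.capacity:
        v.reserve(max(desired_capacity, v.capacity * 2))

v = Reserving()
for _ in range(1000):
    amortized_reserve(v, v.size + 1)
    v.size += 1
```

After 1000 incremental reservations the capacity has been raised only 11 times (1, 2, 4, 8, ..., 1024), versus 1000 times with a naive `reserve(size + 1)`.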

View File

@@ -123,6 +123,18 @@ namespace bloom_calculations {
}
return std::min(probs.size() - 1, size_t(v));
}
/**
* Retrieves the minimum supported bloom_filter_fp_chance value.
* If compute_bloom_spec() above is attempted with a bloom_filter_fp_chance
* lower than this, it will throw an unsupported_operation_exception.
*/
inline double min_supported_bloom_filter_fp_chance() {
int max_buckets = probs.size() - 1;
int max_K = probs[max_buckets].size() - 1;
return probs[max_buckets][max_K];
}
}
}
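The returned value is simply the corner of the precomputed `probs` table: the false-positive rate at the maximum bucket count and maximum hash count. A hedged Python sketch of the same idea using the standard Bloom-filter approximation p ≈ (1 − e^(−k/b))^k in place of the real table (the `max_buckets=20`/`max_k=15` limits mirror Cassandra's table dimensions but are assumptions here, not taken from this patch):

```python
import math

def bloom_fp(bits_per_element, hashes):
    # Standard approximation of a Bloom filter's false-positive rate
    # with b bits per element and k hash functions.
    return (1.0 - math.exp(-hashes / bits_per_element)) ** hashes

def min_supported_fp_chance(max_buckets=20, max_k=15):
    # Smallest false-positive chance achievable within the table's
    # limits; the real code reads the corner of its precomputed
    # probs table instead of evaluating the formula.
    return min(bloom_fp(b, k)
               for b in range(1, max_buckets + 1)
               for k in range(1, max_k + 1))
```

With these assumed limits the minimum comes out on the order of 7e-5, which is consistent with the tests above expecting `bloom_filter_fp_chance = 1e-5` (and 0) to be rejected with a ConfigurationException.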

View File

@@ -126,9 +126,9 @@ private:
future<> complete() {
_b->counter.fetch_add(smp::count);
bool alive = _b->alive.load(std::memory_order_relaxed);
return smp::invoke_on_all([this, sid = this_shard_id(), alive] {
return smp::invoke_on_all([b = _b, sid = this_shard_id(), alive] {
if (this_shard_id() != sid) {
std::optional<promise<>>& w = _b->wakeup[this_shard_id()];
std::optional<promise<>>& w = b->wakeup[this_shard_id()];
if (alive) {
assert(w.has_value());
w->set_value();

View File

@@ -52,7 +52,7 @@ public:
return _what.c_str();
}
const std::error_code& code() const { return _code; }
const std::error_code& code() const noexcept { return _code; }
};
// Rethrow exception if not null

View File

@@ -1329,6 +1329,12 @@ void reclaim_timer::sample_stats(stats& data) {
}
void reclaim_timer::report() const noexcept {
// The logger can allocate (and will recover from allocation failure), and
// we're in a memory-sensitive situation here and allocation can easily fail.
// Prevent --abort-on-seastar-bad-alloc from crashing us in a situation that
// we're likely to recover from, by reclaiming more.
auto guard = memory::disable_abort_on_alloc_failure_temporarily();
auto time_level = _stall_detected ? log_level::warn : log_level::debug;
auto info_level = _stall_detected ? log_level::info : log_level::debug;
auto MiB = 1024*1024;