Compare commits


49 Commits

Author SHA1 Message Date
Yaron Kaikov
9da666e778 release: prepare for 5.0.rc5 2022-05-15 22:09:16 +03:00
Benny Halevy
aca355dec1 table: clear: serialize with ongoing flush
Get all flush permits to serialize with any
ongoing flushes and prevent further flushes
during table::clear, in particular when calling
discard_completed_segments for every table and
clearing the memtables in clear_and_add.

Fixes #10423

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
(cherry picked from commit aae532a96b)
2022-05-15 13:39:03 +03:00
Raphael S. Carvalho
efbb2efd3f compaction: LCS: don't write to disengaged optional on compaction completion
Dtest triggers the problem by:
1) creating table with LCS
2) disabling regular compaction
3) writing a few sstables
4) running maintenance compaction, e.g. cleanup

Once the maintenance compaction completes, disengaged optional _last_compacted_keys
triggers an exception in notify_completion().

_last_compacted_keys is used by regular compaction for its round-robin
file-picking policy. It stores the last compacted key for each level,
meaning it's irrelevant for any other compaction type.

Regular compaction is responsible for initializing it when it runs for
the first time to pick files. But with it disabled, notify_completion()
will find it uninitialized, therefore resulting in bad_optional_access.

To fix this, the procedure is skipped if _last_compacted_keys is
disengaged. Regular compaction, once re-enabled, will be able to
fill _last_compacted_keys by looking at metadata of the files.
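The failure mode and the guard can be sketched as follows (a simplified model with illustrative member names, not the real LCS code): reading a disengaged std::optional throws std::bad_optional_access, so notify_completion() must skip the bookkeeping when regular compaction has not yet engaged it.

```cpp
#include <array>
#include <cassert>
#include <optional>

// Hypothetical sketch of the guarded round-robin bookkeeping.
struct lcs_state {
    std::optional<std::array<int, 4>> _last_compacted_keys; // per-level keys

    // Returns true if the bookkeeping ran, false if it was safely skipped.
    bool notify_completion(int level, int key) {
        if (!_last_compacted_keys) {
            return false; // maintenance compaction: nothing to update
        }
        (*_last_compacted_keys)[level] = key;
        return true;
    }
};
```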

compaction_test.py::TestCompaction::test_disable_autocompaction_doesnt_
block_user_initiated_compactions[CLEANUP-LeveledCompactionStrategy]
now passes.

Fixes #10378.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>

Closes #10508

(cherry picked from commit 8e99d3912e)
2022-05-15 13:20:11 +03:00
Eliran Sinvani
44dc5c4a1d Revert "table: disable_auto_compaction: stop ongoing compactions"
This reverts commit 4affa801a5.
In issue #10146 a write throughput drop of ~50% was reported. After
bisecting, the cause was found to be the addition of code to
table::disable_auto_compaction which stops ongoing compactions and
returns a future that resolves once all the compaction tasks for a
table, if any, have terminated. It turns out that this function is used
only at startup (and in REST API calls, which are not used in the test),
in the distributed loader just before resharding and loading of the
sstable data. Compaction is then re-enabled after the resharding and
loading are done.
For a still-unknown reason, adding the extra logic of stopping ongoing
compactions made the write throughput drop by ~50%.
Strangely enough, this extra logic **should** (still unvalidated) have
no side effects, since no compactions for a table are supposed to be
running prior to loading it.
This regains the performance but also undoes a change which should
eventually go back in once we find the actual culprit.

Signed-off-by: Eliran Sinvani <eliransin@scylladb.com>

Closes #10559

Reopens #9313.

(cherry picked from commit 8e8dc2c930)
2022-05-15 08:50:38 +03:00
Juliusz Stasiewicz
6b34ba3a4f CQL: Replace assert by exception on invalid auth opcode
One user observed this assertion fail, but it's an extremely rare event.
The root cause - interlacing of processing STARTUP and OPTIONS messages -
is still there, but now it's harmless enough to leave it as is.

Fixes #10487

Closes #10503

(cherry picked from commit 603dd72f9e)
2022-05-10 14:04:52 +02:00
Yaron Kaikov
f1e25cb4a6 release: prepare for 5.0.rc4 2022-05-10 07:35:53 +03:00
Benny Halevy
c9798746ae compaction: time_window_compaction_strategy: reset estimated_remaining_tasks when running out of candidates
_estimated_remaining_tasks gets updated via get_next_non_expired_sstables ->
get_compaction_candidates, but otherwise if we return earlier from
get_sstables_for_compaction, it does not get updated and may go out of sync.

Refs #10418
(to be closed when the fix reaches branch-4.6)

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>

Closes #10419

(cherry picked from commit 01f41630a5)
2022-05-09 09:35:53 +03:00
Eliran Sinvani
7f70ffc5ce prepared_statements: Invalidate batch statement too
It seems that batch prepared statements always return false for
depends_on; this in turn causes the removal criterion of the
prepared statements cache to always be false, with the result that
the queries are never evicted.
Here we change the function to return the true state, meaning
it will return true if one of the sub-queries depends on the
keyspace and/or column family.

Fixes #10129

Signed-off-by: Eliran Sinvani <eliransin@scylladb.com>
(cherry picked from commit 4eb0398457)
2022-05-08 12:31:42 +03:00
Eliran Sinvani
551636ec89 cql3 statements: Change dependency test API to better express its purpose

Cql statements used to have two API functions, depends_on_keyspace and
depends_on_column_family. The former took as a parameter only a table
name, which makes no sense: there could be multiple tables with the same
name, each in a different keyspace, and it doesn't make sense to
generalize the test - i.e., to ask "Does a statement depend on any table
named XXX?"
In this change we unify the two calls into one - depends_on - which takes
a keyspace name and optionally also a table name; that way every logical
dependency test that makes sense is supported by a single API call.
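A hedged sketch of what such a unified API could look like (simplified, hypothetical types — the real CQL statement hierarchy is richer), including the batch delegating to its sub-statements:

```cpp
#include <algorithm>
#include <cassert>
#include <optional>
#include <string>
#include <vector>

// Illustrative: one depends_on() taking a keyspace and an optional table.
struct statement {
    std::string ks, cf;
    bool depends_on(const std::string& ks_name,
                    const std::optional<std::string>& cf_name) const {
        return ks == ks_name && (!cf_name || cf == *cf_name);
    }
};

struct batch_statement {
    std::vector<statement> _statements;
    // A batch depends on (ks, cf) if any sub-statement does.
    bool depends_on(const std::string& ks_name,
                    const std::optional<std::string>& cf_name) const {
        return std::any_of(_statements.begin(), _statements.end(),
            [&](const statement& s) { return s.depends_on(ks_name, cf_name); });
    }
};
```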

(cherry picked from commit bf50dbd35b)

Ref #10129
2022-05-08 12:31:02 +03:00
Raphael S. Carvalho
e1130a01e7 table: Close reader if flush fails to peek into fragment
An OOM failure while peeking into a fragment, to determine whether the
reader will produce any fragments, causes Scylla to abort, as
flat_mutation_reader expects the reader to be closed before being
destroyed. Let's close it if peek() fails, to handle the scenario more
gracefully.
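The close-on-failed-peek pattern can be sketched like this (the reader type and its methods are stand-ins, not the real flat_mutation_reader):

```cpp
#include <cassert>
#include <new>

// Illustrative stub of a reader whose peek() fails with OOM.
struct throwing_reader {
    bool closed = false;
    bool peek() { throw std::bad_alloc(); }
    void close() { closed = true; }
};

// If peek() throws, close the reader before propagating the error,
// instead of letting its destructor abort the process.
template <typename Reader>
bool peek_or_close(Reader& reader) {
    try {
        return reader.peek();
    } catch (...) {
        reader.close();
        throw;
    }
}

// Runs the failure scenario; returns whether the reader ended up closed.
inline bool demo_close_on_failed_peek() {
    throwing_reader r;
    try { peek_or_close(r); } catch (const std::bad_alloc&) {}
    return r.closed;
}
```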

Fixes #10027.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20220204031553.124848-1-raphaelsc@scylladb.com>
(cherry picked from commit 755cec1199)
2022-05-08 12:16:15 +03:00
Calle Wilund
b0233cb7c5 cdc: Ensure columns removed from log table are registered as dropped
If we are redefining the log table, we need to ensure any dropped
columns are registered in "dropped_columns" table, otherwise clients will not
be able to read data older than now.
Includes unit test.

Should probably be backported to all CDC enabled versions.

Fixes #10473
Closes #10474

(cherry picked from commit 78350a7e1b)
2022-05-05 11:38:18 +02:00
Avi Kivity
e480c5bf4d Merge 'loading_cache: force minimum size of unprivileged ' from Piotr Grabowski
This series enforces a minimum size of the unprivileged section when
performing the `shrink()` operation.

When the cache is shrunk, we still drop entries first from unprivileged
section (as before this commit), however, if this section is already small
(smaller than `max_size / 2`), we will drop entries from the privileged
section.

This is necessary, as before this change the unprivileged section could
be starved. For example, if the cache could store at most 50 entries and
there are 49 entries in the privileged section, then after adding 5 entries
(which would go to the unprivileged section) 4 of them would get evicted
and only the 5th one would stay. This caused problems with BATCH
statements, where all prepared statements in the batch have to stay in
the cache at the same time for the batch to execute correctly.

To correctly check if the unprivileged section might get too small after
dropping an entry, `_current_size` variable, which tracked the overall size
of cache, is changed to two variables: `_unprivileged_section_size` and
`_privileged_section_size`, tracking section sizes separately.
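A simplified model of the resulting shrink policy (illustrative field names, none of the real cache machinery): evict from the unprivileged section as before, but once that section has dropped below max_size / 2, evict from the privileged section instead.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical sketch of the section bookkeeping described above.
struct cache_sections {
    std::size_t max_size;
    std::size_t privileged = 0;
    std::size_t unprivileged = 0;

    // Drop one entry; returns true if it came from the privileged section.
    bool shrink_one() {
        if (unprivileged > 0 && unprivileged >= max_size / 2) {
            --unprivileged;       // unprivileged section is big enough
            return false;
        }
        if (privileged > 0) {
            --privileged;         // protect the small unprivileged section
            return true;
        }
        if (unprivileged > 0) { --unprivileged; }
        return false;
    }
};
```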

New tests are added to check the new behavior and the bookkeeping of the
section sizes. A test is added that sets up a CQL environment with a very
small prepared statement cache, reproduces the issue in #10440, and
stresses the cache.

Fixes #10440.

Closes #10456

* github.com:scylladb/scylla:
  loading_cache_test: test prepared stmts cache
  loading_cache: force minimum size of unprivileged
  loading_cache: extract dropping entries to lambdas
  loading_cache: separately track size of sections
  loading_cache: fix typo in 'privileged'

(cherry picked from commit 5169ce40ef)
2022-05-04 14:35:53 +03:00
Tomasz Grabiec
7d90f7e93f loading_cache: Make invalidation take immediate effect
There are two issues with the current implementation of remove/remove_if:

  1) If it happens concurrently with get_ptr(), the latter may still
  populate the cache using value obtained from before remove() was
  called. remove() is used to invalidate caches, e.g. the prepared
  statements cache, and the expected semantic is that values
  calculated from before remove() should not be present in the cache
  after invalidation.

  2) As long as there is any active pointer to the cached value
  (obtained by get_ptr()), the old value from before remove() will be
  still accessible and returned by get_ptr(). This can make remove()
  have no effect indefinitely if there is persistent use of the cache.

One of the user-perceived effects of this bug is that some prepared
statements may not get invalidated after a schema change and still use
the old schema (until next invalidation). If the schema change was
modifying a UDT, this can cause statement execution failures. The CQL
coordinator will try to interpret bound values using the old set of
fields. If the driver uses the new schema, the coordinator will fail
to process the value with the following exception:

  User Defined Type value contained too many fields (expected 5, got 6)

The patch fixes the problem by making remove()/remove_if() erase old
entries from _loading_values immediately.

The predicate-based remove_if() variant also has to invalidate values
which are concurrently loading, to be safe. The predicate cannot be
evaluated on values which are not ready. This may invalidate some
values unnecessarily, but I think it's fine.

Fixes #10117

Message-Id: <20220309135902.261734-1-tgrabiec@scylladb.com>
(cherry picked from commit 8fa704972f)
2022-05-04 14:35:37 +03:00
Avi Kivity
3e6e8579c6 loading_cache: fix indentation of timestamped_val and two nested type aliases
timestamped_val (and two other type aliases) are nested inside loading_cache,
but indented as if they were top-level names. Adjust the indent to
avoid confusion.

Closes #10118

(cherry picked from commit d1a394fd97)

Ref #10117 - backport prerequisite
2022-05-04 14:35:15 +03:00
Avi Kivity
3e98e17d18 Merge 'replica/database: drop_column_family(): properly cleanup stale querier cache entries' from Botond Dénes
Said method has to evict all querier cache entries belonging to the to-be-dropped table. This was already the case, but there was a window where new entries could sneak in, causing a stale reference to the table to be dereferenced later when they are evicted due to TTL. This window is now closed: the entries are evicted after the method has waited for all ongoing operations on said table to stop.

Fixes: #10450

Closes #10451

* github.com:scylladb/scylla:
  replica/database: drop_column_family(): drop querier cache entries after waiting for ops
  replica/database: finish coroutinizing drop_column_family()
  replica/database: make remove(const column_family&) private

(cherry picked from commit 7f1e368e92)
2022-05-01 17:22:57 +03:00
Avi Kivity
a214f8cf6e Update tools/java submodule (bad IPv6 addresses in nodetool)
* tools/java b1e09c8b8f...2241a63bda (1):
  > CASSANDRA-17581 fix NodeProbe: Malformed IPv6 address at index

Fixes #10442
2022-04-28 11:33:15 +03:00
Benny Halevy
e8b92fe34d replica: distributed_database: populate_column_family: trigger offstrategy compaction only for the base directory
In https://github.com/scylladb/scylla/issues/10218
we see off-strategy compaction happening on a table
during the initial phases of
`distributed_loader::populate_column_family`.

It is caused by triggering offstrategy compaction
too early, when sstables are populated from the staging
directory in a144d30162.

We need to trigger offstrategy compaction only for the base
table directory, never the staging or quarantine dirs.

Fixes #10218

Test: unit(dev)
DTest: materialized_views_test.py::TestInterruptBuildProcess

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20220316152812.3344634-1-bhalevy@scylladb.com>
(cherry picked from commit a1d0f089c8)
2022-04-24 17:38:53 +03:00
Nadav Har'El
fa479c84ac config: fix some types in system.config virtual table
The system.config virtual table prints each configuration variable of
type T based on the JSON printer specified in config_type_for<T>
in db/config.cc.

For two variable types - experimental_features and tri_mode_restriction,
the specified converter was wrong: We used value_to_json<string> or
value_to_json<vector<string>> on something which was *not* a string.
Unfortunately, value_to_json silently cast the given objects into
strings, and the result was garbage: for example, as noted in #10047,
for experimental_features, instead of printing a list of feature *names*,
e.g., "raft", we got a bizarre list of one-byte strings with each feature's
number (which isn't documented or even guaranteed not to change) as well
as carriage-return characters (!?).

The solution is a new printable_to_json<T> which works on any type T that
can be printed with operator<< - as in fact the above two types can be -
and the type is converted into a string or vector of strings using this
operator<<, not a cast.
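The operator<<-based conversion can be sketched as follows (illustrative types and names, not the actual db/config.cc code):

```cpp
#include <cassert>
#include <ostream>
#include <sstream>
#include <string>

// Convert any type printable with operator<< into a string, instead of
// silently casting it.
template <typename T>
std::string printable_to_string(const T& value) {
    std::ostringstream os;
    os << value;
    return os.str();
}

// Example of a type whose textual form should come from operator<<
// (hypothetical stand-in for tri_mode_restriction).
enum class tri_mode { off, on, forced };
inline std::ostream& operator<<(std::ostream& os, tri_mode m) {
    switch (m) {
        case tri_mode::off: return os << "false";
        case tri_mode::on:  return os << "true";
        default:            return os << "forced";
    }
}
```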

Also added a cql-pytest test for reading system.config and in particular
options of the above two types - checking that they contain sensible
strings and not "garbage" like before this patch.

Fixes #10047.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20220209090421.298849-1-nyh@scylladb.com>
(cherry picked from commit fef7934a2d)
2022-04-14 19:29:08 +03:00
Tomasz Grabiec
40c26dd2c5 utils/chunked_managed_vector: Fix sigsegv during reserve()
Fixes the case of make_room() invoked with last_chunk_capacity_deficit
but _size not in the last reserved chunk.

Found during code review, no user impact.

Fixes #10364.

Message-Id: <20220411224741.644113-1-tgrabiec@scylladb.com>
(cherry picked from commit 0c365818c3)
2022-04-13 09:48:34 +03:00
Tomasz Grabiec
2c6f069fd1 utils/chunked_vector: Fix sigsegv during reserve()
Fixes the case of make_room() invoked with last_chunk_capacity_deficit
but _size not in the last reserved chunk.

Found during code review, no known user impact.

Fixes #10363.

Message-Id: <20220411222605.641614-1-tgrabiec@scylladb.com>
(cherry picked from commit 01eeb33c6e)
2022-04-13 09:47:24 +03:00
Avi Kivity
e27dff0c50 transport: return correct error codes when downgrading v4 {WRITE,READ}_FAILURE to {WRITE,READ}_TIMEOUT
Protocol v4 added WRITE_FAILURE and READ_FAILURE. When running under v3
we downgrade these exceptions to WRITE_TIMEOUT and READ_TIMEOUT (since
the client won't understand the v4 errors), but we still send the new
error codes. This causes the client to become confused.

Fix by updating the error codes.
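The downgrade mapping can be sketched like this (the enum and function names are illustrative; the numeric values are the CQL binary protocol error codes: 0x1100 WRITE_TIMEOUT, 0x1200 READ_TIMEOUT, 0x1300 READ_FAILURE, 0x1500 WRITE_FAILURE):

```cpp
#include <cassert>
#include <cstdint>

enum class error_code : std::uint32_t {
    write_timeout = 0x1100,
    read_timeout  = 0x1200,
    read_failure  = 0x1300,
    write_failure = 0x1500,
};

// When replying over protocol v3, both the exception type *and* the
// error code must be downgraded; the bug was downgrading only the former.
inline error_code downgrade_for_v3(error_code code) {
    switch (code) {
        case error_code::write_failure: return error_code::write_timeout;
        case error_code::read_failure:  return error_code::read_timeout;
        default:                        return code;
    }
}
```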

A better fix is to move the error code from the constructor parameter
list and hard-code it in the constructor, but that is left for a follow-up
after this minimal fix.

Fixes #5610.

Closes #10362

(cherry picked from commit 987e6533d2)
2022-04-13 09:47:24 +03:00
Tomasz Grabiec
3f03260ffb utils/chunked_managed_vector: Fix corruption in case there is more than one chunk
If reserve() allocates more than one chunk, push_back() should not
work with the last chunk. This can result in items being pushed to the
wrong chunk, breaking internal invariants.

Also, pop_back() should not work with the last chunk. This breaks when
there is more than one chunk.
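A toy chunked vector illustrating the invariant (a hypothetical sketch, not the real utils/chunked_managed_vector): push_back must index the chunk that _size falls into, rather than the last chunk reserve() happened to allocate.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

struct chunked_ints {
    static constexpr std::size_t chunk_size = 4;
    std::vector<std::unique_ptr<int[]>> _chunks;
    std::size_t _size = 0;

    void reserve(std::size_t n) {
        while (_chunks.size() * chunk_size < n) {
            _chunks.push_back(std::make_unique<int[]>(chunk_size));
        }
    }
    void push_back(int v) {
        reserve(_size + 1);
        // Correct: index by _size / chunk_size. Writing through
        // _chunks.back() here would hit the wrong chunk whenever a large
        // reserve() allocated chunks beyond the one _size falls into.
        _chunks[_size / chunk_size][_size % chunk_size] = v;
        ++_size;
    }
    int operator[](std::size_t i) const {
        return _chunks[i / chunk_size][i % chunk_size];
    }
};
```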

Currently, the container is only used in the sstable partition index
cache.

Manifests as crashes in the sstable reader when touching sstables which
have partition index pages with more than 1638 partition entries.

Introduced in 78e5b9fd85 (4.6.0)

Fixes #10290

Message-Id: <20220407174023.527059-1-tgrabiec@scylladb.com>
(cherry picked from commit 41fe01ecff)
2022-04-08 10:53:33 +03:00
Takuya ASADA
1315135fca docker: enable --log-to-stdout which was mistakenly disabled
Since our Docker image moved to Ubuntu, we mistakenly copy
dist/docker/etc/sysconfig/scylla-server to /etc/sysconfig, which is not
used on Ubuntu (it should be /etc/default).
So /etc/default/scylla-server is just the default configuration of the
scylla-server .deb package, where --log-to-stdout is 0, the same as in a
normal installation.

We don't want to keep the duplicated configuration file anyway,
so let's drop dist/docker/etc/sysconfig/scylla-server and configure
/etc/default/scylla-server in build_docker.sh.

Fixes #10270

Closes #10280

(cherry picked from commit bdefea7c82)
2022-04-07 12:13:19 +03:00
Yaron Kaikov
f92622e0de release: prepare for 5.0.rc3 2022-04-06 14:31:03 +03:00
Takuya ASADA
3bca608db5 docker: run scylla as root
Previous versions of the Docker image ran scylla as root, but cb19048
accidentally changed it to the scylla user.
To keep compatibility we need to revert this to root.

Fixes #10261

Closes #10325

(cherry picked from commit f95a531407)
2022-04-05 12:46:25 +03:00
Takuya ASADA
a93b72d5dd docker: revert scylla-server.conf service name change
We changed the supervisor service name in cb19048, but this breaks
compatibility with scylla-operator.
To fix the issue we need to revert the service name to the previous one.

Fixes #10269

Closes #10323

(cherry picked from commit 41edc045d9)
2022-04-05 12:40:59 +03:00
Benny Halevy
d58ca2edbd range_tombstone_list: insert_from: correct rev.update range_tombstone in the non-overlapping case
2nd std::move(start) looks like a typo
in fe2fa3f20d.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20220404124741.1775076-1-bhalevy@scylladb.com>
(cherry picked from commit 2d80057617)

Fixes #10326
2022-04-05 12:39:13 +03:00
Alexey Kartashov
75740ace2a dist/docker: fix incorrect locale value
The Docker build script contains an incorrect locale specification for
the LC_ALL setting; this commit fixes that.

Fixes #10310

Closes #10321

(cherry picked from commit d86c3a8061)
2022-04-04 12:51:02 +03:00
Piotr Sarna
d7a1bf6331 cql3: fix qualifying restrictions with IN for indexing
When a query contains an IN restriction on its partition key,
it's currently not eligible for indexing. It was however
erroneously qualified as such, which led to fetching incorrect
results. This commit fixes the issue by not allowing such queries
to undergo indexing, and comes with a regression test.

Fixes #10300

Closes #10302

(cherry picked from commit c0fd53a9d7)
2022-04-03 11:20:49 +03:00
Avi Kivity
bbd7d657cc Update seastar submodule (pidof command not installed)
* seastar 1c0d622ba0...4a30c44c4c (1):
  > seastar-cpu-map.sh: switch from pidof to pgrep
Fixes #10238.
2022-03-29 12:36:06 +03:00
Avi Kivity
f5bf4c81d1 Merge 'replica/database: truncate: temporarily disable compaction on table and views before flush' from Benny Halevy
Flushing the base table triggers view building
and corresponding compactions on the view tables.

Temporarily disable compaction on both the base
table and all its views before flush and snapshot,
since those flushed sstables are about to be truncated
anyway right after the snapshot is taken.

This should make truncate go faster.

In the process, this series also embeds `database::truncate_views`
into `truncate` and coroutinizes both.

Refs #6309

Test: unit(dev)

Closes #10203

* github.com:scylladb/scylla:
  replica/database: truncate: fixup indentation
  replica/database: truncate: temporarily disable compaction on table and views before flush
  replica/database: truncate: coroutinize per-view logic
  replica/database: open-code truncate_view in truncate
  replica/database: truncate: coroutinize run_with_compaction_disabled lambda
  replica/database: coroutinize truncate
  compaction_manager: add disable_compaction method

(cherry picked from commit aab052c0d5)
2022-03-28 15:40:40 +03:00
Benny Halevy
02e8336659 atomic_cell: compare_atomic_cell_for_merge: compare ttl if expiry is equal
Following up on a57c087c89,
compare_atomic_cell_for_merge should compare the ttl value in
reverse order: when comparing two cells that are identical
in all attributes but their ttl, we want to keep the cell with the
smaller ttl value rather than the larger one, since it was written
at a later (wall-clock) time and so will remain longer after it
expires, until purged after gc_grace seconds.

Fixes #10173

Test: mutation_test.test_cell_ordering, unit(dev)

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20220302154328.2400717-1-bhalevy@scylladb.com>
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20220306091913.106508-1-bhalevy@scylladb.com>
(cherry picked from commit a085ef74ff)
2022-03-24 18:00:11 +02:00
Benny Halevy
601812e11b atomic_cell: compare_atomic_cell_for_merge: compare ttl if expiry is equal
Unlike atomic_cell_or_collection::equals, compare_atomic_cell_for_merge
currently returns std::strong_ordering::equal if two cells are equal in
every way except their ttls.

The problem with that is that the cells' hashes are different, and this
will cause repair to keep trying to repair discrepancies caused by the
ttls being different.

This may be triggered by e.g. the spark migrator, which computes the ttl
by subtracting the current time from the expiry time to produce a
respective ttl.

If the cell is migrated multiple times at different times, it will generate
cells that have the same expiry (by design) but different ttl values.

Fixes #10156

Test: mutation_test.test_cell_ordering, unit(dev)

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20220302154328.2400717-1-bhalevy@scylladb.com>
(cherry picked from commit a57c087c89)
2022-03-24 18:00:11 +02:00
Benny Halevy
ea466320d2 atomic_cell: compare_atomic_cell_for_merge: fixup indentation
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20220302113833.2308533-2-bhalevy@scylladb.com>
(cherry picked from commit d43da5d6dc)
2022-03-24 18:00:11 +02:00
Benny Halevy
25ea831a15 atomic_cell: compare_atomic_cell_for_merge: simplify expiry/deletion_time comparison
No need to first check that the cells' expiry is different
or that deletion_time is different before comparing them
with `<=>`.

If they are the same, the function returns std::strong_ordering::equal
anyhow, which is the same as `<=>` comparing identical values.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20220302113833.2308533-1-bhalevy@scylladb.com>
(cherry picked from commit be865a29b8)
2022-03-24 18:00:11 +02:00
Benny Halevy
8648c79c9e main: shutdown: do not abort on certain system errors
Currently any unhandled error during deferred shutdown
is rethrown in a noexcept context (in ~deferred_action),
generating a core dump.

The core dump is not helpful if the cause of the
error is "environmental", i.e. in the system, rather
than in scylla itself.

This change detects several such errors and calls
_Exit(255) to exit the process early, without leaving
a coredump behind.  Otherwise, call abort() explicitly,
rather than letting terminate() be called implicitly
by the destructor exception handling code.

Fixes #9573

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20220227101054.1294368-1-bhalevy@scylladb.com>
(cherry picked from commit 132c9d5933)
2022-03-24 14:48:52 +02:00
Nadav Har'El
7ae4d0e6f8 Seastar: backport Seastar fix for missing string escape in JSON output
Backported Seastar fix:
  > Merge 'json/formatter: Escape strings' from Juliusz Stasiewicz

Fixes #9061

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
2022-03-23 20:29:50 +02:00
Piotr Sarna
f3564db941 expression: fix get_value for mismatched column definitions
As observed in #10026, after schema changes it somehow happened
that a column definition that does not match any of the base table
columns was passed to expression verification code.
The function that looks up the index of a column happens to return
-1 when it doesn't find anything, so using this returned index
without checking if it's nonnegative results in accessing invalid
vector data, and a segfault or silent memory corruption.
Therefore, an explicit check is added to see if the column was actually
found. This serves two purposes:
 - avoiding segfaults/memory corruption
 - making it easier to investigate the root cause of #10026

Closes #10039

(cherry picked from commit 7b364fec9849e9a342af1c240e3a7185bf5401ef)
2022-03-21 10:37:48 +01:00
Pavel Emelyanov
97caf12836 Update seastar submodule (IO preemption overlap)
* seastar 47573503...8ef87d48 (3):
  > io_queue: Don't let preemption overlap requests
  > io_queue: Pending needs to keep capacity instead of ticket
  > io_queue: Extend grab_capacity() return codes

Fixes #10233
2022-03-17 11:26:38 +03:00
Yaron Kaikov
839d9ef41a release: prepare for 5.0.rc2 2022-03-16 14:35:52 +02:00
Benny Halevy
782bd50f92 compaction_manager: rewrite_sstables: do not acquire table write lock
Since regular compaction may run in parallel no lock
is required per-table.

We still acquire a read lock in this patch, for backporting
purposes, in case the branch doesn't contain
6737c88045.
But it can be removed entirely in master in a follow-up patch.

This should solve some of the slowness in cleanup compaction (and
likely in upgrade sstables, seen in #10060, and possibly #10166).

Fixes #10175

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>

Closes #10177

(cherry picked from commit 11ea2ffc3c)
2022-03-14 13:13:48 +02:00
Avi Kivity
0a4d971b4a Merge 'utils: cached_file: Fix alloc-dealloc mismatch during eviction' from Tomasz Grabiec
cached_page::on_evicted() is invoked in the LSA allocator context, set in the
reclaimer callback installed by the cache_tracker. However,
cached_pages are allocated in the standard allocator context (note:
page content is allocated inside LSA via lsa_buffer). The LSA region
will happily deallocate these, thinking that they these are large
objects which were delegated to the standard allocator. But the
_non_lsa_memory_in_use metric will underflow. When it underflows
enough, shard_segment_pool.total_memory() will become 0 and memory
reclamation will stop doing anything, leading to apparent OOM.

The fix is to switch to the standard allocator context inside
cached_page::on_evicted(). evict_range() was also given the same
treatment as a precaution; it is currently only invoked in the
standard allocator context.

The series also adds two safety checks to LSA to catch such problems earlier.

Fixes #10056

\cc @slivne @bhalevy

Closes #10130

* github.com:scylladb/scylla:
  lsa: Abort when trying to free a standard allocator object not allocated through the region
  lsa: Abort when _non_lsa_memory_in_use goes negative
  tests: utils: cached_file: Validate occupancy after eviction
  test: sstable_partition_index_cache_test: Fix alloc-dealloc mismatch
  utils: cached_file: Fix alloc-dealloc mismatch during eviction

(cherry picked from commit ff2cd72766)
2022-02-26 11:28:36 +02:00
Benny Halevy
22562f767f cql3: result_set: remove std::ref from comparator&
Applying std::ref on `RowComparator& cmp` hits the
following compilation error on Fedora 34 with
libstdc++-devel-11.2.1-9.fc34.x86_64

```
FAILED: build/dev/cql3/statements/select_statement.o
clang++ -MD -MT build/dev/cql3/statements/select_statement.o -MF build/dev/cql3/statements/select_statement.o.d -I/home/bhalevy/dev/scylla/seastar/include -I/home/bhalevy/dev/scylla/build/dev/seastar/gen/include -std=gnu++20 -U_FORTIFY_SOURCE -DSEASTAR_SSTRING -Werror=unused-result -fstack-clash-protection -DSEASTAR_API_LEVEL=6 -DSEASTAR_ENABLE_ALLOC_FAILURE_INJECTION -DSEASTAR_SCHEDULING_GROUPS_COUNT=16 -DSEASTAR_TYPE_ERASE_MORE -DFMT_LOCALE -DFMT_SHARED -I/usr/include/p11-kit-1  -DDEVEL -DSEASTAR_ENABLE_ALLOC_FAILURE_INJECTION -DSCYLLA_ENABLE_ERROR_INJECTION -O2 -DSCYLLA_ENABLE_WASMTIME -iquote. -iquote build/dev/gen --std=gnu++20  -ffile-prefix-map=/home/bhalevy/dev/scylla=.  -march=westmere -DBOOST_TEST_DYN_LINK   -Iabseil -fvisibility=hidden  -Wall -Werror -Wno-mismatched-tags -Wno-tautological-compare -Wno-parentheses-equality -Wno-c++11-narrowing -Wno-sometimes-uninitialized -Wno-return-stack-address -Wno-missing-braces -Wno-unused-lambda-capture -Wno-overflow -Wno-noexcept-type -Wno-error=cpp -Wno-ignored-attributes -Wno-overloaded-virtual -Wno-unused-command-line-argument -Wno-defaulted-function-deleted -Wno-redeclared-class-member -Wno-unsupported-friend -Wno-unused-variable -Wno-delete-non-abstract-non-virtual-dtor -Wno-braced-scalar-init -Wno-implicit-int-float-conversion -Wno-delete-abstract-non-virtual-dtor -Wno-uninitialized-const-reference -Wno-psabi -Wno-narrowing -Wno-array-bounds -Wno-nonnull -Wno-error=deprecated-declarations -DXXH_PRIVATE_API -DSEASTAR_TESTING_MAIN -DHAVE_LZ4_COMPRESS_DEFAULT  -c -o build/dev/cql3/statements/select_statement.o cql3/statements/select_statement.cc
In file included from cql3/statements/select_statement.cc:14:
In file included from ./cql3/statements/select_statement.hh:16:
In file included from ./cql3/statements/raw/select_statement.hh:16:
In file included from ./cql3/statements/raw/cf_statement.hh:16:
In file included from ./cql3/cf_name.hh:16:
In file included from ./cql3/keyspace_element_name.hh:16:
In file included from /home/bhalevy/dev/scylla/seastar/include/seastar/core/sstring.hh:25:
In file included from /usr/lib/gcc/x86_64-redhat-linux/11/../../../../include/c++/11/algorithm:74:
In file included from /usr/lib/gcc/x86_64-redhat-linux/11/../../../../include/c++/11/pstl/glue_algorithm_defs.h:13:
In file included from /usr/lib/gcc/x86_64-redhat-linux/11/../../../../include/c++/11/functional:58:
/usr/lib/gcc/x86_64-redhat-linux/11/../../../../include/c++/11/bits/refwrap.h:319:40: error: exception specification of 'function<__gnu_cxx::__ops::_Iter_comp_iter<std::reference_wrapper<const std::function<bool (const std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>> &, const std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>> &)>>>, void>' uses itself
                = decltype(reference_wrapper::_S_fun(std::declval<_Up>()))>
                                                     ^
/usr/lib/gcc/x86_64-redhat-linux/11/../../../../include/c++/11/bits/refwrap.h:319:40: note: in instantiation of exception specification for 'function<__gnu_cxx::__ops::_Iter_comp_iter<std::reference_wrapper<const std::function<bool (const std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>> &, const std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>> &)>>>, void>' requested here
/usr/lib/gcc/x86_64-redhat-linux/11/../../../../include/c++/11/bits/refwrap.h:321:2: note: in instantiation of default argument for 'reference_wrapper<__gnu_cxx::__ops::_Iter_comp_iter<std::reference_wrapper<const std::function<bool (const std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>> &, const std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>> &)>>>, void>' required here
        reference_wrapper(_Up&& __uref)
        ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/usr/lib/gcc/x86_64-redhat-linux/11/../../../../include/c++/11/type_traits:1017:57: note: while substituting deduced template arguments into function template 'reference_wrapper' [with _Up = __gnu_cxx::__ops::_Iter_comp_iter<std::reference_wrapper<const std::function<bool (const std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>> &, const std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>> &)>>>, $1 = (no value), $2 = (no value)]
      = __bool_constant<__is_nothrow_constructible(_Tp, _Args...)>;
                                                        ^
/usr/lib/gcc/x86_64-redhat-linux/11/../../../../include/c++/11/type_traits:1023:14: note: in instantiation of template type alias '__is_nothrow_constructible_impl' requested here
    : public __is_nothrow_constructible_impl<_Tp, _Args...>::type
             ^
/usr/lib/gcc/x86_64-redhat-linux/11/../../../../include/c++/11/type_traits:153:14: note: in instantiation of template class 'std::is_nothrow_constructible<__gnu_cxx::__ops::_Iter_comp_iter<std::reference_wrapper<const std::function<bool (const std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>> &, const std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>> &)>>>, __gnu_cxx::__ops::_Iter_comp_iter<std::reference_wrapper<const std::function<bool (const std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>> &, const std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>> &)>>>>' requested here
    : public conditional<_B1::value, _B2, _B1>::type
             ^
/usr/lib/gcc/x86_64-redhat-linux/11/../../../../include/c++/11/bits/std_function.h:298:11: note: (skipping 8 contexts in backtrace; use -ftemplate-backtrace-limit=0 to see all)
          return __and_<typename _Base::_Local_storage,
                 ^
/usr/lib/gcc/x86_64-redhat-linux/11/../../../../include/c++/11/bits/stl_algo.h:1933:13: note: in instantiation of function template specialization 'std::__partial_sort<utils::chunked_vector<std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>>, 131072>::iterator_type<std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>>>, __gnu_cxx::__ops::_Iter_comp_iter<std::reference_wrapper<const std::function<bool (const std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>> &, const std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>> &)>>>>' requested here
              std::__partial_sort(__first, __last, __last, __comp);
                   ^
/usr/lib/gcc/x86_64-redhat-linux/11/../../../../include/c++/11/bits/stl_algo.h:1954:9: note: in instantiation of function template specialization 'std::__introsort_loop<utils::chunked_vector<std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>>, 131072>::iterator_type<std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>>>, long, __gnu_cxx::__ops::_Iter_comp_iter<std::reference_wrapper<const std::function<bool (const std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>> &, const std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>> &)>>>>' requested here
          std::__introsort_loop(__first, __last,
               ^
/usr/lib/gcc/x86_64-redhat-linux/11/../../../../include/c++/11/bits/stl_algo.h:4875:12: note: in instantiation of function template specialization 'std::__sort<utils::chunked_vector<std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>>, 131072>::iterator_type<std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>>>, __gnu_cxx::__ops::_Iter_comp_iter<std::reference_wrapper<const std::function<bool (const std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>> &, const std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>> &)>>>>' requested here
      std::__sort(__first, __last, __gnu_cxx::__ops::__iter_comp_iter(__comp));
           ^
./cql3/result_set.hh:168:14: note: in instantiation of function template specialization 'std::sort<utils::chunked_vector<std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>>, 131072>::iterator_type<std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>>>, std::reference_wrapper<const std::function<bool (const std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>> &, const std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>> &)>>>' requested here
        std::sort(_rows.begin(), _rows.end(), std::ref(cmp));
             ^
cql3/statements/select_statement.cc:773:21: note: in instantiation of function template specialization 'cql3::result_set::sort<std::function<bool (const std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>> &, const std::vector<std::optional<seastar::basic_sstring<signed char, unsigned int, 31, false>>> &)>>' requested here
                rs->sort(_ordering_comparator);
                    ^
1 error generated.
ninja: build stopped: subcommand failed.
```

Fixes #10079.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
Message-Id: <20220215071955.316895-3-bhalevy@scylladb.com>
(cherry picked from commit 3e20fee070)

[avi: backport for developer quality-of-life rather than as a bug fix]
2022-02-16 10:07:11 +02:00
Raphael S. Carvalho
eb80dd1db5 Revert "sstables/compaction_manager: rewrite_sstables(): resolve maintenance group FIXME"
This reverts commit 4c05e5f966.

Moving cleanup to the maintenance group made its operation take up to
10x longer than in the previous release. It's a blocker for the 4.6
release, so let's revert it until we figure this all out.

This probably happens because the maintenance group is fixed at a
relatively small constant, while cleanup may be incrementally
generating backlog for regular compaction, so the former ends up
fighting the latter for resources.

Fixes #10060.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20220213184306.91585-1-raphaelsc@scylladb.com>
(cherry picked from commit a9427f150a)
2022-02-14 18:05:43 +02:00
Avi Kivity
51d699ee21 Update seastar submodule (overzealous log silencer)
* seastar 0d250d15ac...47573503cd (1):
  > log: Fix silencer to be shard-local and logger-global
Fixes #9784.
2022-02-14 17:54:54 +02:00
Avi Kivity
83a33bff8c Point seastar submodule at scylla-seastar.git
This allows us to backport Seastar fixes to this branch.
2022-02-14 17:54:16 +02:00
Nadav Har'El
273563b9ad alternator: allow REMOVE of non-existent nested attribute
DynamoDB allows an UpdateItem operation "REMOVE x.y" when a map x
exists in the item, but x.y doesn't - the removal silently does
nothing. Alternator incorrectly generated an error in this case,
and unfortunately we didn't have a test for this case.

So in this patch we add the missing test (which fails on Alternator
before this patch - and passes on DynamoDB) and then fix the behavior.
After this patch, "REMOVE x.y" will remain an error if "x" doesn't
exist (saying "document paths not valid for this item"), but if "x"
exists and is a map, but "x.y" doesn't, the removal will silently
do nothing and will not be an error.

Fixes #10043.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20220207133652.181994-1-nyh@scylladb.com>
(cherry picked from commit 9982a28007)
2022-02-08 11:37:31 +02:00
Yaron Kaikov
891990ec09 release: prepare for 5.0.rc1 2022-02-06 16:41:05 +02:00
Yaron Kaikov
da0cd2b107 release: prepare for 5.0.rc0 2022-02-03 08:10:30 +02:00
68 changed files with 1180 additions and 394 deletions

.gitmodules

@@ -1,6 +1,6 @@
[submodule "seastar"]
path = seastar
-url = ../seastar
+url = ../scylla-seastar
ignore = dirty
[submodule "swagger-ui"]
path = swagger-ui


@@ -60,7 +60,7 @@ fi
# Default scylla product/version tags
PRODUCT=scylla
-VERSION=5.0.dev
+VERSION=5.0.rc5
if test -f version
then


@@ -2577,8 +2577,8 @@ static bool hierarchy_actions(
// attr member so we can use add()
rjson::add_with_string_name(v, attr, std::move(*newv));
} else {
-throw api_error::validation(format("Can't remove document path {} - not present in item",
-subh.get_value()._path));
+// Removing a.b when a is a map but a.b doesn't exist
+// is silently ignored. It's not considered an error.
}
} else {
throw api_error::validation(format("UpdateExpression: document paths not valid for this item:{}", h));


@@ -87,19 +87,24 @@ compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) {
// prefer expiring cells.
return left.is_live_and_has_ttl() ? std::strong_ordering::greater : std::strong_ordering::less;
}
-if (left.is_live_and_has_ttl() && left.expiry() != right.expiry()) {
-return left.expiry() <=> right.expiry();
+if (left.is_live_and_has_ttl()) {
+if (left.expiry() != right.expiry()) {
+return left.expiry() <=> right.expiry();
+} else {
+// prefer the cell that was written later,
+// so it survives longer after it expires, until purged.
+return right.ttl() <=> left.ttl();
+}
}
} else {
// Both are deleted
if (left.deletion_time() != right.deletion_time()) {
// Origin compares big-endian serialized deletion time. That's because it
// delegates to AbstractCell.reconcile() which compares values after
// comparing timestamps, which in case of deleted cells will hold
// serialized expiry.
return (uint64_t) left.deletion_time().time_since_epoch().count()
<=> (uint64_t) right.deletion_time().time_since_epoch().count();
}
// Origin compares big-endian serialized deletion time. That's because it
// delegates to AbstractCell.reconcile() which compares values after
// comparing timestamps, which in case of deleted cells will hold
// serialized expiry.
return (uint64_t) left.deletion_time().time_since_epoch().count()
<=> (uint64_t) right.deletion_time().time_since_epoch().count();
}
return std::strong_ordering::equal;
}


@@ -59,7 +59,7 @@ using namespace std::chrono_literals;
logging::logger cdc_log("cdc");
namespace cdc {
-static schema_ptr create_log_schema(const schema&, std::optional<utils::UUID> = {});
+static schema_ptr create_log_schema(const schema&, std::optional<utils::UUID> = {}, schema_ptr = nullptr);
}
static constexpr auto cdc_group_name = "cdc";
@@ -206,7 +206,7 @@ public:
return;
}
-auto new_log_schema = create_log_schema(new_schema, log_schema ? std::make_optional(log_schema->id()) : std::nullopt);
+auto new_log_schema = create_log_schema(new_schema, log_schema ? std::make_optional(log_schema->id()) : std::nullopt, log_schema);
auto log_mut = log_schema
? db::schema_tables::make_update_table_mutations(db, keyspace.metadata(), log_schema, new_log_schema, timestamp, false)
@@ -484,7 +484,7 @@ bytes log_data_column_deleted_elements_name_bytes(const bytes& column_name) {
return to_bytes(cdc_deleted_elements_column_prefix) + column_name;
}
-static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID> uuid) {
+static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID> uuid, schema_ptr old) {
schema_builder b(s.ks_name(), log_name(s.cf_name()));
b.with_partitioner("com.scylladb.dht.CDCPartitioner");
b.set_compaction_strategy(sstables::compaction_strategy_type::time_window);
@@ -571,6 +571,20 @@ static schema_ptr create_log_schema(const schema& s, std::optional<utils::UUID>
b.set_uuid(*uuid);
}
+/**
+* #10473 - if we are redefining the log table, we need to ensure any dropped
+* columns are registered in "dropped_columns" table, otherwise clients will not
+* be able to read data older than now.
+*/
+if (old) {
+// not super efficient, but we don't do this often.
+for (auto& col : old->all_columns()) {
+if (!b.has_column({col.name(), col.name_as_text() })) {
+b.without_column(col.name_as_text(), col.type, api::new_timestamp());
+}
+}
+}
return b.build();
}


@@ -353,32 +353,50 @@ future<> compaction_manager::run_custom_job(replica::table* t, sstables::compact
return task->compaction_done.get_future().then([task] {});
}
+compaction_manager::compaction_reenabler::compaction_reenabler(compaction_manager& cm, replica::table* t)
+: _cm(cm)
+, _table(t)
+, _compaction_state(cm.get_compaction_state(_table))
+, _holder(_compaction_state.gate.hold())
+{
+_compaction_state.compaction_disabled_counter++;
+cmlog.debug("Temporarily disabled compaction for {}.{}. compaction_disabled_counter={}",
+_table->schema()->ks_name(), _table->schema()->cf_name(), _compaction_state.compaction_disabled_counter);
+}
+compaction_manager::compaction_reenabler::compaction_reenabler(compaction_reenabler&& o) noexcept
+: _cm(o._cm)
+, _table(std::exchange(o._table, nullptr))
+, _compaction_state(o._compaction_state)
+, _holder(std::move(o._holder))
+{}
+compaction_manager::compaction_reenabler::~compaction_reenabler() {
+// submit compaction request if we're the last holder of the gate which is still opened.
+if (_table && --_compaction_state.compaction_disabled_counter == 0 && !_compaction_state.gate.is_closed()) {
+cmlog.debug("Reenabling compaction for {}.{}",
+_table->schema()->ks_name(), _table->schema()->cf_name());
+try {
+_cm.submit(_table);
+} catch (...) {
+cmlog.warn("compaction_reenabler could not reenable compaction for {}.{}: {}",
+_table->schema()->ks_name(), _table->schema()->cf_name(), std::current_exception());
+}
+}
+}
+future<compaction_manager::compaction_reenabler>
+compaction_manager::stop_and_disable_compaction(replica::table* t) {
+compaction_reenabler cre(*this, t);
+co_await stop_ongoing_compactions("user-triggered operation", t);
+co_return cre;
+}
future<>
compaction_manager::run_with_compaction_disabled(replica::table* t, std::function<future<> ()> func) {
auto& c_state = _compaction_state[t];
auto holder = c_state.gate.hold();
compaction_reenabler cre = co_await stop_and_disable_compaction(t);
c_state.compaction_disabled_counter++;
std::exception_ptr err;
try {
co_await stop_ongoing_compactions("user-triggered operation", t);
co_await func();
} catch (...) {
err = std::current_exception();
}
#ifdef DEBUG
assert(_compaction_state.contains(t));
#endif
// submit compaction request if we're the last holder of the gate which is still opened.
if (--c_state.compaction_disabled_counter == 0 && !c_state.gate.is_closed()) {
submit(t);
}
if (err) {
std::rethrow_exception(err);
}
co_return;
co_await func();
}
void compaction_manager::task::setup_new_compaction() {
@@ -810,7 +828,8 @@ future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compa
auto sstable_level = sst->get_sstable_level();
auto run_identifier = sst->run_identifier();
auto sstable_set_snapshot = can_purge ? std::make_optional(t.get_sstable_set()) : std::nullopt;
-auto descriptor = sstables::compaction_descriptor({ sst }, std::move(sstable_set_snapshot), _maintenance_sg.io,
+// FIXME: this compaction should run with maintenance priority.
+auto descriptor = sstables::compaction_descriptor({ sst }, std::move(sstable_set_snapshot), service::get_local_compaction_priority(),
sstable_level, sstables::compaction_descriptor::default_max_sstable_bytes, run_identifier, options);
// Releases reference to cleaned sstable such that respective used disk space can be freed.
@@ -819,8 +838,9 @@ future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compa
};
auto maintenance_permit = co_await seastar::get_units(_maintenance_ops_sem, 1);
// Take write lock for table to serialize cleanup/upgrade sstables/scrub with major compaction/reshape/reshard.
-auto write_lock_holder = co_await _compaction_state[&t].lock.hold_write_lock();
+// FIXME: acquiring the read lock is not needed after acquiring the _maintenance_ops_sem
+// only major compaction needs to acquire the write lock to synchronize with regular compaction.
+auto lock_holder = co_await _compaction_state[&t].lock.hold_read_lock();
_stats.pending_tasks--;
_stats.active_tasks++;
@@ -852,7 +872,7 @@ future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compa
};
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_compaction_controller.backlog_of_shares(200), _available_memory));
-completed = co_await with_scheduling_group(_maintenance_sg.cpu, std::ref(perform_rewrite));
+completed = co_await with_scheduling_group(_compaction_controller.sg(), std::ref(perform_rewrite));
} while (!completed);
};


@@ -269,6 +269,31 @@ public:
// parameter job is a function that will carry the operation
future<> run_custom_job(replica::table* t, sstables::compaction_type type, noncopyable_function<future<>(sstables::compaction_data&)> job);
+class compaction_reenabler {
+compaction_manager& _cm;
+replica::table* _table;
+compaction_state& _compaction_state;
+gate::holder _holder;
+public:
+compaction_reenabler(compaction_manager&, replica::table*);
+compaction_reenabler(compaction_reenabler&&) noexcept;
+~compaction_reenabler();
+replica::table* compacting_table() const noexcept {
+return _table;
+}
+const compaction_state& compaction_state() const noexcept {
+return _compaction_state;
+}
+};
+// Disable compaction temporarily for a table t.
+// Caller should call the compaction_reenabler::reenable
+future<compaction_reenabler> stop_and_disable_compaction(replica::table* t);
// Run a function with compaction temporarily disabled for a table T.
future<> run_with_compaction_disabled(replica::table* t, std::function<future<> ()> func);


@@ -69,7 +69,11 @@ compaction_descriptor leveled_compaction_strategy::get_major_compaction_job(tabl
}
void leveled_compaction_strategy::notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) {
-if (removed.empty() || added.empty()) {
+// All the update here is only relevant for regular compaction's round-robin picking policy, and if
+// last_compacted_keys wasn't generated by regular, it means regular is disabled since last restart,
+// therefore we can skip the updates here until regular runs for the first time. Once it runs,
+// it will be able to generate last_compacted_keys correctly by looking at metadata of files.
+if (removed.empty() || added.empty() || !_last_compacted_keys) {
return;
}
auto min_level = std::numeric_limits<uint32_t>::max();


@@ -217,6 +217,7 @@ time_window_compaction_strategy::get_sstables_for_compaction(table_state& table_
auto compaction_time = gc_clock::now();
if (candidates.empty()) {
_estimated_remaining_tasks = 0;
return compaction_descriptor();
}


@@ -81,9 +81,7 @@ public:
virtual seastar::future<seastar::shared_ptr<cql_transport::messages::result_message>>
execute(query_processor& qp, service::query_state& state, const query_options& options) const = 0;
-virtual bool depends_on_keyspace(const seastar::sstring& ks_name) const = 0;
-virtual bool depends_on_column_family(const seastar::sstring& cf_name) const = 0;
+virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const = 0;
virtual seastar::shared_ptr<const metadata> get_result_metadata() const = 0;


@@ -103,7 +103,13 @@ managed_bytes_opt get_value(const column_value& col, const column_value_eval_bag
if (!col_type->is_map()) {
throw exceptions::invalid_request_exception(format("subscripting non-map column {}", cdef->name_as_text()));
}
-const auto deserialized = cdef->type->deserialize(managed_bytes_view(*data.other_columns[data.sel.index_of(*cdef)]));
+int32_t index = data.sel.index_of(*cdef);
+if (index == -1) {
+throw std::runtime_error(
+format("Column definition {} does not match any column in the query selection",
+cdef->name_as_text()));
+}
+const auto deserialized = cdef->type->deserialize(managed_bytes_view(*data.other_columns[index]));
const auto& data_map = value_cast<map_type_impl::native_type>(deserialized);
const auto key = evaluate(*col.sub, options);
auto&& key_type = col_type->name_comparator();
@@ -121,8 +127,16 @@ managed_bytes_opt get_value(const column_value& col, const column_value_eval_bag
case column_kind::clustering_key:
return managed_bytes(data.clustering_key[cdef->id]);
case column_kind::static_column:
-case column_kind::regular_column:
-return managed_bytes_opt(data.other_columns[data.sel.index_of(*cdef)]);
+[[fallthrough]];
+case column_kind::regular_column: {
+int32_t index = data.sel.index_of(*cdef);
+if (index == -1) {
+throw std::runtime_error(
+format("Column definition {} does not match any column in the query selection",
+cdef->name_as_text()));
+}
+return managed_bytes_opt(data.other_columns[index]);
+}
default:
throw exceptions::unsupported_operation_exception("Unknown column kind");
}


@@ -953,7 +953,7 @@ bool query_processor::migration_subscriber::should_invalidate(
sstring ks_name,
std::optional<sstring> cf_name,
::shared_ptr<cql_statement> statement) {
-return statement->depends_on_keyspace(ks_name) && (!cf_name || statement->depends_on_column_family(*cf_name));
+return statement->depends_on(ks_name, cf_name);
}
future<> query_processor::query_internal(


@@ -514,7 +514,7 @@ statement_restrictions::statement_restrictions(data_dictionary::database db,
}
if (!_nonprimary_key_restrictions->empty()) {
-if (_has_queriable_regular_index) {
+if (_has_queriable_regular_index && _partition_range_is_simple) {
_uses_secondary_indexing = true;
} else if (!allow_filtering) {
throw exceptions::invalid_request_exception("Cannot execute this query as it might involve data filtering and "


@@ -165,7 +165,7 @@ public:
template<typename RowComparator>
void sort(const RowComparator& cmp) {
-std::sort(_rows.begin(), _rows.end(), std::ref(cmp));
+std::sort(_rows.begin(), _rows.end(), cmp);
}
metadata& get_metadata();


@@ -18,13 +18,7 @@ uint32_t cql3::statements::authentication_statement::get_bound_terms() const {
return 0;
}
-bool cql3::statements::authentication_statement::depends_on_keyspace(
-const sstring& ks_name) const {
-return false;
-}
-bool cql3::statements::authentication_statement::depends_on_column_family(
-const sstring& cf_name) const {
+bool cql3::statements::authentication_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
return false;
}


@@ -27,9 +27,7 @@ public:
uint32_t get_bound_terms() const override;
-bool depends_on_keyspace(const sstring& ks_name) const override;
-bool depends_on_column_family(const sstring& cf_name) const override;
+bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
future<> check_access(query_processor& qp, const service::client_state& state) const override;


@@ -20,13 +20,7 @@ uint32_t cql3::statements::authorization_statement::get_bound_terms() const {
return 0;
}
-bool cql3::statements::authorization_statement::depends_on_keyspace(
-const sstring& ks_name) const {
-return false;
-}
-bool cql3::statements::authorization_statement::depends_on_column_family(
-const sstring& cf_name) const {
+bool cql3::statements::authorization_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
return false;
}


@@ -31,9 +31,7 @@ public:
uint32_t get_bound_terms() const override;
-bool depends_on_keyspace(const sstring& ks_name) const override;
-bool depends_on_column_family(const sstring& cf_name) const override;
+bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
future<> check_access(query_processor& qp, const service::client_state& state) const override;


@@ -70,14 +70,9 @@ batch_statement::batch_statement(type type_,
{
}
-bool batch_statement::depends_on_keyspace(const sstring& ks_name) const
+bool batch_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const
{
-return false;
-}
-bool batch_statement::depends_on_column_family(const sstring& cf_name) const
-{
-return false;
+return boost::algorithm::any_of(_statements, [&ks_name, &cf_name] (auto&& s) { return s.statement->depends_on(ks_name, cf_name); });
}
uint32_t batch_statement::get_bound_terms() const


@@ -88,9 +88,7 @@ public:
std::unique_ptr<attributes> attrs,
cql_stats& stats);
-virtual bool depends_on_keyspace(const sstring& ks_name) const override;
-virtual bool depends_on_column_family(const sstring& cf_name) const override;
+virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
virtual uint32_t get_bound_terms() const override;


@@ -539,12 +539,8 @@ modification_statement::validate(query_processor&, const service::client_state&
}
}
-bool modification_statement::depends_on_keyspace(const sstring& ks_name) const {
-return keyspace() == ks_name;
-}
-bool modification_statement::depends_on_column_family(const sstring& cf_name) const {
-return column_family() == cf_name;
+bool modification_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
+return keyspace() == ks_name && (!cf_name || column_family() == *cf_name);
}
void modification_statement::add_operation(::shared_ptr<operation> op) {


@@ -137,9 +137,7 @@ public:
// Validate before execute, using client state and current schema
void validate(query_processor&, const service::client_state& state) const override;
-virtual bool depends_on_keyspace(const sstring& ks_name) const override;
-virtual bool depends_on_column_family(const sstring& cf_name) const override;
+virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
void add_operation(::shared_ptr<operation> op);


@@ -45,12 +45,7 @@ future<> schema_altering_statement::grant_permissions_to_creator(const service::
return make_ready_future<>();
}
-bool schema_altering_statement::depends_on_keyspace(const sstring& ks_name) const
-{
-return false;
-}
-bool schema_altering_statement::depends_on_column_family(const sstring& cf_name) const
+bool schema_altering_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const
{
return false;
}


@@ -53,9 +53,7 @@ protected:
*/
virtual future<> grant_permissions_to_creator(const service::client_state&) const;
-virtual bool depends_on_keyspace(const sstring& ks_name) const override;
-virtual bool depends_on_column_family(const sstring& cf_name) const override;
+virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
virtual uint32_t get_bound_terms() const override;


@@ -167,12 +167,8 @@ void select_statement::validate(query_processor&, const service::client_state& s
// Nothing to do, all validation has been done by raw_statemet::prepare()
}
-bool select_statement::depends_on_keyspace(const sstring& ks_name) const {
-return keyspace() == ks_name;
-}
-bool select_statement::depends_on_column_family(const sstring& cf_name) const {
-return column_family() == cf_name;
+bool select_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
+return keyspace() == ks_name && (!cf_name || column_family() == *cf_name);
}
const sstring& select_statement::keyspace() const {


@@ -100,8 +100,7 @@ public:
virtual uint32_t get_bound_terms() const override;
virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;
virtual void validate(query_processor&, const service::client_state& state) const override;
-virtual bool depends_on_keyspace(const sstring& ks_name) const override;
-virtual bool depends_on_column_family(const sstring& cf_name) const override;
+virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
virtual future<::shared_ptr<cql_transport::messages::result_message>> execute(query_processor& qp,
service::query_state& state, const query_options& options) const override;


@@ -17,13 +17,7 @@ uint32_t service_level_statement::get_bound_terms() const {
return 0;
}
-bool service_level_statement::depends_on_keyspace(
-const sstring &ks_name) const {
-return false;
-}
-bool service_level_statement::depends_on_column_family(
-const sstring &cf_name) const {
+bool service_level_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const {
return false;
}


@@ -43,9 +43,7 @@ public:
uint32_t get_bound_terms() const override;
-bool depends_on_keyspace(const sstring& ks_name) const override;
-bool depends_on_column_family(const sstring& cf_name) const override;
+bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
future<> check_access(query_processor& qp, const service::client_state& state) const override;


@@ -39,12 +39,7 @@ std::unique_ptr<prepared_statement> truncate_statement::prepare(data_dictionary:
return std::make_unique<prepared_statement>(::make_shared<truncate_statement>(*this));
}
-bool truncate_statement::depends_on_keyspace(const sstring& ks_name) const
-{
-return false;
-}
-bool truncate_statement::depends_on_column_family(const sstring& cf_name) const
+bool truncate_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const
{
return false;
}


@@ -30,9 +30,7 @@ public:
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
-virtual bool depends_on_keyspace(const sstring& ks_name) const override;
-virtual bool depends_on_column_family(const sstring& cf_name) const override;
+virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;


@@ -46,12 +46,7 @@ std::unique_ptr<prepared_statement> use_statement::prepare(data_dictionary::data
}
-bool use_statement::depends_on_keyspace(const sstring& ks_name) const
-{
-return false;
-}
-bool use_statement::depends_on_column_family(const sstring& cf_name) const
+bool use_statement::depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const
{
return false;
}


@@ -31,9 +31,7 @@ public:
virtual uint32_t get_bound_terms() const override;
-virtual bool depends_on_keyspace(const seastar::sstring& ks_name) const override;
-virtual bool depends_on_column_family(const seastar::sstring& cf_name) const override;
+virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const override;
virtual seastar::future<> check_access(query_processor& qp, const service::client_state& state) const override;


@@ -65,6 +65,25 @@ hinted_handoff_enabled_to_json(const db::config::hinted_handoff_enabled_type& h)
return value_to_json(h.to_configuration_string());
}
+// Convert a value that can be printed with operator<<, or a vector of
+// such values, to JSON. An example is enum_option<T>, because enum_option<T>
+// has a operator<<.
+template <typename T>
+static json::json_return_type
+printable_to_json(const T& e) {
+return value_to_json(format("{}", e));
+}
+template <typename T>
+static json::json_return_type
+printable_vector_to_json(const std::vector<T>& e) {
+std::vector<sstring> converted;
+converted.reserve(e.size());
+for (const auto& option : e) {
+converted.push_back(format("{}", option));
+}
+return value_to_json(converted);
+}
template <>
const config_type config_type_for<bool> = config_type("bool", value_to_json<bool>);
@@ -109,11 +128,11 @@ const config_type config_type_for<db::seed_provider_type> = config_type("seed pr
template <>
const config_type config_type_for<std::vector<enum_option<db::experimental_features_t>>> = config_type(
-"experimental features", value_to_json<std::vector<sstring>>);
+"experimental features", printable_vector_to_json<enum_option<db::experimental_features_t>>);
template <>
const config_type config_type_for<enum_option<db::tri_mode_restriction_t>> = config_type(
-"restriction mode", value_to_json<sstring>);
+"restriction mode", printable_to_json<enum_option<db::tri_mode_restriction_t>>);
template <>
const config_type config_type_for<db::config::hinted_handoff_enabled_type> = config_type("hinted handoff enabled", hinted_handoff_enabled_to_json);


@@ -202,6 +202,12 @@ public:
});
}
+future<flush_permit> get_all_flush_permits() {
+return get_units(_background_work_flush_serializer, _max_background_work).then([this] (auto&& units) {
+return this->get_flush_permit(std::move(units));
+});
+}
bool has_extraneous_flushes_requested() const {
return _extraneous_flushes > 0;
}


@@ -6,12 +6,16 @@ is_nonroot() {
[ -f "$scylladir"/SCYLLA-NONROOT-FILE ]
}
+is_container() {
+[ -f "$scylladir"/SCYLLA-CONTAINER-FILE ]
+}
is_privileged() {
[ ${EUID:-${UID}} = 0 ]
}
execsudo() {
if is_nonroot; then
if is_nonroot || is_container; then
exec "$@"
else
exec sudo -u scylla -g scylla "$@"


@@ -82,15 +82,17 @@ run bash -ec "echo 'debconf debconf/frontend select Noninteractive' | debconf-se
run bash -ec "rm -rf /etc/rsyslog.conf"
run apt-get -y install hostname supervisor openssh-server openssh-client openjdk-11-jre-headless python python-yaml curl rsyslog locales sudo
run locale-gen en_US.UTF-8
run update-locale LANG=en_US.UTF-8 LANGUAGE=en_US:en LC_ALL=en_US.UTF_8
run update-locale LANG=en_US.UTF-8 LANGUAGE=en_US:en LC_ALL=en_US.UTF-8
run bash -ec "dpkg -i packages/*.deb"
run apt-get -y clean all
run bash -ec "cat /scylla_bashrc >> /etc/bash.bashrc"
run mkdir -p /etc/supervisor.conf.d
run mkdir -p /var/log/scylla
run chown -R scylla:scylla /var/lib/scylla
run sed -i -e 's/^SCYLLA_ARGS=".*"$/SCYLLA_ARGS="--log-to-syslog 0 --log-to-stdout 1 --default-log-level info --network-stack posix"/' /etc/default/scylla-server
run mkdir -p /opt/scylladb/supervisor
run touch /opt/scylladb/SCYLLA-CONTAINER-FILE
bcp dist/common/supervisor/scylla-server.sh /opt/scylladb/supervisor/scylla-server.sh
bcp dist/common/supervisor/scylla-jmx.sh /opt/scylladb/supervisor/scylla-jmx.sh
bcp dist/common/supervisor/scylla-node-exporter.sh /opt/scylladb/supervisor/scylla-node-exporter.sh


@@ -1,4 +1,4 @@
[program:scylla-server]
[program:scylla]
command=/opt/scylladb/supervisor/scylla-server.sh
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0


@@ -1,41 +0,0 @@
# choose following mode: virtio, dpdk, posix
NETWORK_MODE=posix
# tap device name(virtio)
TAP=tap0
# bridge device name (virtio)
BRIDGE=virbr0
# ethernet device name
IFNAME=eth0
# setup NIC's and disks' interrupts, RPS, XPS, nomerges and I/O scheduler (posix)
SET_NIC_AND_DISKS=no
# ethernet device driver (dpdk)
ETHDRV=
# ethernet device PCI ID (dpdk)
ETHPCIID=
# number of hugepages
NR_HUGEPAGES=64
# user for process (must be root for dpdk)
USER=scylla
# group for process
GROUP=scylla
# scylla home dir
SCYLLA_HOME=/var/lib/scylla
# scylla config dir
SCYLLA_CONF=/etc/scylla
# scylla arguments
SCYLLA_ARGS="--log-to-syslog 0 --log-to-stdout 1 --default-log-level info --network-stack posix"
# setup as AMI instance
AMI=no


@@ -508,8 +508,13 @@ relocate_python3 "$rprefix"/scripts fix_system_distributed_tables.py
if $supervisor; then
install -d -m755 `supervisor_dir $retc`
for service in scylla-server scylla-jmx scylla-node-exporter; do
if [ "$service" = "scylla-server" ]; then
program="scylla"
else
program=$service
fi
cat << EOS > `supervisor_conf $retc $service`
[program:$service]
[program:$program]
directory=$rprefix
command=/bin/bash -c './supervisor/$service.sh'
EOS

main.cc

@@ -367,11 +367,38 @@ static auto defer_verbose_shutdown(const char* what, Func&& func) {
startlog.info("Shutting down {}", what);
try {
func();
startlog.info("Shutting down {} was successful", what);
} catch (...) {
startlog.error("Unexpected error shutting down {}: {}", what, std::current_exception());
throw;
auto ex = std::current_exception();
bool do_abort = true;
try {
std::rethrow_exception(ex);
} catch (const std::system_error& e) {
// System error codes we consider "environmental",
// i.e. not scylla's fault, therefore there is no point in
// aborting and dumping core.
for (int i : {EIO, EACCES, ENOSPC}) {
if (e.code() == std::error_code(i, std::system_category())) {
do_abort = false;
break;
}
}
} catch (...) {
}
auto msg = fmt::format("Unexpected error shutting down {}: {}", what, ex);
if (do_abort) {
startlog.error("{}: aborting", msg);
abort();
} else {
startlog.error("{}: exiting, at {}", msg, current_backtrace());
// Call _exit() rather than exit() to exit immediately
// without calling exit handlers, avoiding
// boost::intrusive::detail::destructor_impl assert failure
// from ~segment_pool exit handler.
_exit(255);
}
}
startlog.info("Shutting down {} was successful", what);
};
auto ret = deferred_action(std::move(vfunc));


@@ -96,7 +96,7 @@ void range_tombstone_list::insert_from(const schema& s,
if (cmp(end, it->position()) < 0) {
// not overlapping
if (it->tombstone().tomb == tomb && cmp(end, it->position()) == 0) {
rev.update(it, {std::move(start), std::move(start), tomb});
rev.update(it, {std::move(start), std::move(end), tomb});
} else {
auto rt = construct_range_tombstone_entry(std::move(start), std::move(end), tomb);
rev.insert(it, *rt);


@@ -910,10 +910,9 @@ bool database::update_column_family(schema_ptr new_schema) {
return columns_changed;
}
future<> database::remove(const column_family& cf) noexcept {
void database::remove(const table& cf) noexcept {
auto s = cf.schema();
auto& ks = find_keyspace(s->ks_name());
co_await _querier_cache.evict_all_for_table(s->id());
_column_families.erase(s->id());
ks.metadata()->remove_column_family(s);
_ks_cf_to_uuid.erase(std::make_pair(s->ks_name(), s->cf_name()));
@@ -937,13 +936,20 @@ future<> database::drop_column_family(const sstring& ks_name, const sstring& cf_
on_internal_error(dblog, fmt::format("drop_column_family {}.{}: UUID={} not found", ks_name, cf_name, uuid));
}
dblog.debug("Dropping {}.{}", ks_name, cf_name);
co_await remove(*cf);
remove(*cf);
cf->clear_views();
co_return co_await cf->await_pending_ops().then([this, &ks, cf, tsf = std::move(tsf), snapshot] {
return truncate(ks, *cf, std::move(tsf), snapshot).finally([this, cf] {
return cf->stop();
});
}).finally([cf] {});
co_await cf->await_pending_ops();
co_await _querier_cache.evict_all_for_table(cf->schema()->id());
std::exception_ptr ex;
try {
co_await truncate(ks, *cf, std::move(tsf), snapshot);
} catch (...) {
ex = std::current_exception();
}
co_await cf->stop();
if (ex) {
std::rethrow_exception(std::move(ex));
}
}
const utils::UUID& database::find_uuid(std::string_view ks, std::string_view cf) const {
@@ -2062,80 +2068,77 @@ future<> database::truncate(sstring ksname, sstring cfname, timestamp_func tsf)
future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_func tsf, bool with_snapshot) {
dblog.debug("Truncating {}.{}", cf.schema()->ks_name(), cf.schema()->cf_name());
return with_gate(cf.async_gate(), [this, &ks, &cf, tsf = std::move(tsf), with_snapshot] () mutable -> future<> {
const auto auto_snapshot = with_snapshot && get_config().auto_snapshot();
const auto should_flush = auto_snapshot;
auto holder = cf.async_gate().hold();
// Force mutations coming in to re-acquire higher rp:s
// This creates a "soft" ordering, in that we will guarantee that
// any sstable written _after_ we issue the flush below will
// only have higher rp:s than we will get from the discard_sstable
// call.
auto low_mark = cf.set_low_replay_position_mark();
const auto auto_snapshot = with_snapshot && get_config().auto_snapshot();
const auto should_flush = auto_snapshot;
const auto uuid = cf.schema()->id();
// Force mutations coming in to re-acquire higher rp:s
// This creates a "soft" ordering, in that we will guarantee that
// any sstable written _after_ we issue the flush below will
// only have higher rp:s than we will get from the discard_sstable
// call.
auto low_mark = cf.set_low_replay_position_mark();
return _compaction_manager->run_with_compaction_disabled(&cf, [this, &cf, should_flush, auto_snapshot, tsf = std::move(tsf), low_mark]() mutable {
future<> f = make_ready_future<>();
bool did_flush = false;
if (should_flush && cf.can_flush()) {
// TODO:
// this is not really a guarantee at all that we've actually
// gotten all things to disk. Again, need queue-ish or something.
f = cf.flush();
did_flush = true;
} else {
f = cf.clear();
}
return f.then([this, &cf, auto_snapshot, tsf = std::move(tsf), low_mark, should_flush, did_flush] {
dblog.debug("Discarding sstable data for truncated CF + indexes");
// TODO: notify truncation
const auto uuid = cf.schema()->id();
return tsf().then([this, &cf, auto_snapshot, low_mark, should_flush, did_flush](db_clock::time_point truncated_at) {
future<> f = make_ready_future<>();
if (auto_snapshot) {
auto name = format("{:d}-{}", truncated_at.time_since_epoch().count(), cf.schema()->cf_name());
f = cf.snapshot(*this, name);
}
return f.then([this, &cf, truncated_at, low_mark, should_flush, did_flush] {
return cf.discard_sstables(truncated_at).then([this, &cf, truncated_at, low_mark, should_flush, did_flush](db::replay_position rp) {
// TODO: indexes.
// Note: since discard_sstables was changed to only count tables owned by this shard,
// we can get zero rp back. Changed assert, and ensure we save at least low_mark.
// #6995 - the assert below was broken in c2c6c71 and remained so for many years.
// We nowadays do not flush tables with sstables but autosnapshot=false. This means
// the low_mark assertion does not hold, because we maybe/probably never got around to
// creating the sstables that would create them.
assert(!did_flush || low_mark <= rp || rp == db::replay_position());
rp = std::max(low_mark, rp);
return truncate_views(cf, truncated_at, should_flush).then([&cf, truncated_at, rp] {
// save_truncation_record() may actually fail after we cached the truncation time
// but this is no worse than failing without caching: at least the correct time
// will be available until next reboot and a client will have to retry truncation anyway.
cf.cache_truncation_record(truncated_at);
return db::system_keyspace::save_truncation_record(cf, truncated_at, rp);
});
});
});
});
});
}).then([this, uuid] {
drop_repair_history_map_for_table(uuid);
});
});
}
std::vector<compaction_manager::compaction_reenabler> cres;
cres.reserve(1 + cf.views().size());
future<> database::truncate_views(const column_family& base, db_clock::time_point truncated_at, bool should_flush) {
return parallel_for_each(base.views(), [this, truncated_at, should_flush] (view_ptr v) {
cres.emplace_back(co_await _compaction_manager->stop_and_disable_compaction(&cf));
co_await parallel_for_each(cf.views(), [&, this] (view_ptr v) -> future<> {
auto& vcf = find_column_family(v);
return _compaction_manager->run_with_compaction_disabled(&vcf, [&vcf, truncated_at, should_flush] {
return (should_flush ? vcf.flush() : vcf.clear()).then([&vcf, truncated_at, should_flush] {
return vcf.discard_sstables(truncated_at).then([&vcf, truncated_at, should_flush](db::replay_position rp) {
return db::system_keyspace::save_truncation_record(vcf, truncated_at, rp);
});
});
});
cres.emplace_back(co_await _compaction_manager->stop_and_disable_compaction(&vcf));
});
bool did_flush = false;
if (should_flush && cf.can_flush()) {
// TODO:
// this is not really a guarantee at all that we've actually
// gotten all things to disk. Again, need queue-ish or something.
co_await cf.flush();
did_flush = true;
} else {
co_await cf.clear();
}
dblog.debug("Discarding sstable data for truncated CF + indexes");
// TODO: notify truncation
db_clock::time_point truncated_at = co_await tsf();
if (auto_snapshot) {
auto name = format("{:d}-{}", truncated_at.time_since_epoch().count(), cf.schema()->cf_name());
co_await cf.snapshot(*this, name);
}
db::replay_position rp = co_await cf.discard_sstables(truncated_at);
// TODO: indexes.
// Note: since discard_sstables was changed to only count tables owned by this shard,
// we can get zero rp back. Changed assert, and ensure we save at least low_mark.
// #6995 - the assert below was broken in c2c6c71 and remained so for many years.
// We nowadays do not flush tables with sstables but autosnapshot=false. This means
// the low_mark assertion does not hold, because we maybe/probably never got around to
// creating the sstables that would create them.
assert(!did_flush || low_mark <= rp || rp == db::replay_position());
rp = std::max(low_mark, rp);
co_await parallel_for_each(cf.views(), [this, truncated_at, should_flush] (view_ptr v) -> future<> {
auto& vcf = find_column_family(v);
if (should_flush) {
co_await vcf.flush();
} else {
co_await vcf.clear();
}
db::replay_position rp = co_await vcf.discard_sstables(truncated_at);
co_await db::system_keyspace::save_truncation_record(vcf, truncated_at, rp);
});
// save_truncation_record() may actually fail after we cached the truncation time
// but this is no worse than failing without caching: at least the correct time
// will be available until next reboot and a client will have to retry truncation anyway.
cf.cache_truncation_record(truncated_at);
co_await db::system_keyspace::save_truncation_record(cf, truncated_at, rp);
drop_repair_history_map_for_table(uuid);
}
const sstring& database::get_snitch_name() const {


@@ -1371,6 +1371,7 @@ private:
Future update_write_metrics(Future&& f);
void update_write_metrics_for_timed_out_write();
future<> create_keyspace(const lw_shared_ptr<keyspace_metadata>&, locator::effective_replication_map_factory& erm_factory, bool is_bootstrap, system_keyspace system);
void remove(const table&) noexcept;
public:
static utils::UUID empty_version;
@@ -1568,11 +1569,9 @@ public:
/** Truncates the given column family */
future<> truncate(sstring ksname, sstring cfname, timestamp_func);
future<> truncate(const keyspace& ks, column_family& cf, timestamp_func, bool with_snapshot = true);
future<> truncate_views(const column_family& base, db_clock::time_point truncated_at, bool should_flush);
bool update_column_family(schema_ptr s);
future<> drop_column_family(const sstring& ks_name, const sstring& cf_name, timestamp_func, bool with_snapshot = true);
future<> remove(const column_family&) noexcept;
const logalloc::region_group& dirty_memory_region_group() const {
return _dirty_memory_manager.region_group();


@@ -454,12 +454,13 @@ future<> distributed_loader::handle_sstables_pending_delete(sstring pending_dele
});
}
future<> distributed_loader::populate_column_family(distributed<replica::database>& db, sstring sstdir, sstring ks, sstring cf, bool must_exist) {
return async([&db, sstdir = std::move(sstdir), ks = std::move(ks), cf = std::move(cf), must_exist] {
future<> distributed_loader::populate_column_family(distributed<replica::database>& db, sstring sstdir, sstring ks, sstring cf, allow_offstrategy_compaction do_allow_offstrategy_compaction, must_exist dir_must_exist) {
dblog.debug("Populating {}/{}/{} allow_offstrategy_compaction={} must_exist={}", ks, cf, sstdir, do_allow_offstrategy_compaction, dir_must_exist);
return async([&db, sstdir = std::move(sstdir), ks = std::move(ks), cf = std::move(cf), do_allow_offstrategy_compaction, dir_must_exist] {
assert(this_shard_id() == 0);
if (!file_exists(sstdir).get0()) {
if (must_exist) {
if (dir_must_exist) {
throw std::runtime_error(format("Populating {}/{} failed: {} does not exist", ks, cf, sstdir));
}
return;
@@ -529,12 +530,14 @@ future<> distributed_loader::populate_column_family(distributed<replica::databas
return global_table->make_sstable(sstdir, gen, sst_version, sstables::sstable::format_types::big);
}, eligible_for_reshape_on_boot).get();
directory.invoke_on_all([global_table, &eligible_for_reshape_on_boot] (sstables::sstable_directory& dir) {
return dir.do_for_each_sstable([&global_table, &eligible_for_reshape_on_boot] (sstables::shared_sstable sst) {
auto requires_offstrategy = sstables::offstrategy(!eligible_for_reshape_on_boot(sst));
directory.invoke_on_all([global_table, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::sstable_directory& dir) {
return dir.do_for_each_sstable([&global_table, &eligible_for_reshape_on_boot, do_allow_offstrategy_compaction] (sstables::shared_sstable sst) {
auto requires_offstrategy = sstables::offstrategy(do_allow_offstrategy_compaction && !eligible_for_reshape_on_boot(sst));
return global_table->add_sstable_and_update_cache(sst, requires_offstrategy);
}).then([&global_table] {
}).then([&global_table, do_allow_offstrategy_compaction] {
if (do_allow_offstrategy_compaction) {
global_table->trigger_offstrategy_compaction();
}
});
}).get();
});
@@ -560,11 +563,11 @@ future<> distributed_loader::populate_keyspace(distributed<replica::database>& d
auto sstdir = ks.column_family_directory(ksdir, cfname, uuid);
dblog.info("Keyspace {}: Reading CF {} id={} version={}", ks_name, cfname, uuid, s->version());
return ks.make_directory_for_column_family(cfname, uuid).then([&db, sstdir, uuid, ks_name, cfname] {
return distributed_loader::populate_column_family(db, sstdir + "/" + sstables::staging_dir, ks_name, cfname);
return distributed_loader::populate_column_family(db, sstdir + "/" + sstables::staging_dir, ks_name, cfname, allow_offstrategy_compaction::no);
}).then([&db, sstdir, ks_name, cfname] {
return distributed_loader::populate_column_family(db, sstdir + "/" + sstables::quarantine_dir, ks_name, cfname, false /* must_exist */);
return distributed_loader::populate_column_family(db, sstdir + "/" + sstables::quarantine_dir, ks_name, cfname, allow_offstrategy_compaction::no, must_exist::no);
}).then([&db, sstdir, uuid, ks_name, cfname] {
return distributed_loader::populate_column_family(db, sstdir, ks_name, cfname);
return distributed_loader::populate_column_family(db, sstdir, ks_name, cfname, allow_offstrategy_compaction::yes);
}).handle_exception([ks_name, cfname, sstdir](std::exception_ptr eptr) {
std::string msg =
format("Exception while populating keyspace '{}' with column family '{}' from file '{}': {}",


@@ -13,6 +13,7 @@
#include <seastar/core/distributed.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/file.hh>
#include <seastar/util/bool_class.hh>
#include <vector>
#include <functional>
#include <filesystem>
@@ -67,7 +68,9 @@ class distributed_loader {
static future<size_t> make_sstables_available(sstables::sstable_directory& dir,
sharded<replica::database>& db, sharded<db::view::view_update_generator>& view_update_generator,
std::filesystem::path datadir, sstring ks, sstring cf);
static future<> populate_column_family(distributed<replica::database>& db, sstring sstdir, sstring ks, sstring cf, bool must_exist = true);
using allow_offstrategy_compaction = bool_class<struct allow_offstrategy_compaction_tag>;
using must_exist = bool_class<struct must_exist_tag>;
static future<> populate_column_family(distributed<replica::database>& db, sstring sstdir, sstring ks, sstring cf, allow_offstrategy_compaction, must_exist = must_exist::yes);
static future<> populate_keyspace(distributed<replica::database>& db, sstring datadir, sstring ks_name);
static future<> cleanup_column_family_temp_sst_dirs(sstring sstdir);
static future<> handle_sstables_pending_delete(sstring pending_deletes_dir);


@@ -662,11 +662,21 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old, sstable_write_
[] (const dht::decorated_key&) { return api::min_timestamp; });
}
mutation_fragment* fragment = co_await reader.peek();
if (!fragment) {
std::exception_ptr err;
try {
mutation_fragment* fragment = co_await reader.peek();
if (!fragment) {
co_await reader.close();
_memtables->erase(old);
co_return stop_iteration::yes;
}
} catch (...) {
err = std::current_exception();
}
if (err) {
tlogger.error("failed to flush memtable for {}.{}: {}", old->schema()->ks_name(), old->schema()->cf_name(), err);
co_await reader.close();
_memtables->erase(old);
co_return stop_iteration::yes;
co_return stop_iteration(_async_gate.is_closed());
}
auto f = consumer(upgrade_to_v2(std::move(reader)));
@@ -1571,13 +1581,14 @@ bool table::can_flush() const {
}
future<> table::clear() {
auto permits = co_await _config.dirty_memory_manager->get_all_flush_permits();
if (_commitlog) {
for (auto& t : *_memtables) {
_commitlog->discard_completed_segments(_schema->id(), t->get_and_discard_rp_set());
}
}
_memtables->clear_and_add();
return _cache.invalidate(row_cache::external_updater([] { /* There is no underlying mutation source */ }));
co_await _cache.invalidate(row_cache::external_updater([] { /* There is no underlying mutation source */ }));
}
// NOTE: does not need to be futurized, but might eventually, depending on
@@ -2235,7 +2246,7 @@ std::chrono::milliseconds table::get_coordinator_read_latency_percentile(double
void
table::enable_auto_compaction() {
// FIXME: unmute backlog. turn table backlog back on.
// XXX: unmute backlog. turn table backlog back on.
// see table::disable_auto_compaction() notes.
_compaction_disabled_by_user = false;
trigger_compaction();
@@ -2243,7 +2254,7 @@ table::enable_auto_compaction() {
future<>
table::disable_auto_compaction() {
// FIXME: mute backlog. When we disable background compactions
// XXX: mute backlog. When we disable background compactions
// for the table, we must also disable current backlog of the
// table compaction strategy that contributes to the scheduling
// group resources prioritization.
@@ -2270,9 +2281,8 @@ table::disable_auto_compaction() {
// - it will break computation of major compaction descriptor
// for new submissions
_compaction_disabled_by_user = true;
return with_gate(_async_gate, [this] {
return compaction_manager().stop_ongoing_compactions("disable auto-compaction", this, sstables::compaction_type::Compaction);
});
// FIXME: stop ongoing compactions
return make_ready_future<>();
}
flat_mutation_reader

Submodule seastar updated: 0d250d15ac...4a30c44c4c


@@ -1030,6 +1030,20 @@ def test_nested_attribute_remove_from_missing_item(test_table_s):
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE x.y')
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE x[0]')
# Though in an above test (test_nested_attribute_update_bad_path_dot) we
# showed that DynamoDB does not allow REMOVE x.y if attribute x doesn't
# exist - and generates a ValidationException - if x *does* exist but y
# doesn't, it's fine and the removal should just be silently ignored.
def test_nested_attribute_remove_missing_leaf(test_table_s):
p = random_string()
item = {'p': p, 'a': {'x': 3}, 'b': ['hi']}
test_table_s.put_item(Item=item)
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE a.y')
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE b[7]')
test_table_s.update_item(Key={'p': p}, UpdateExpression='REMOVE c')
# The above UpdateItem calls didn't change anything...
assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item'] == item
# Similarly for other types of bad paths - using [0] on something which
# doesn't exist or isn't an array.
def test_nested_attribute_update_bad_path_array(test_table_s):


@@ -207,7 +207,9 @@ SEASTAR_THREAD_TEST_CASE(test_eviction_via_lru) {
}
{
cf_lru.evict_all();
with_allocator(region.allocator(), [] {
cf_lru.evict_all();
});
BOOST_REQUIRE_EQUAL(0, metrics.cached_bytes); // change here
BOOST_REQUIRE_EQUAL(0, cf.cached_bytes()); // change here
@@ -215,6 +217,8 @@ SEASTAR_THREAD_TEST_CASE(test_eviction_via_lru) {
BOOST_REQUIRE_EQUAL(3, metrics.page_evictions); // change here
BOOST_REQUIRE_EQUAL(0, metrics.page_hits);
BOOST_REQUIRE_EQUAL(3, metrics.page_populations);
BOOST_REQUIRE_EQUAL(region.occupancy().used_space(), 0);
}
{


@@ -12,6 +12,8 @@
#include <deque>
#include <random>
#include "utils/lsa/chunked_managed_vector.hh"
#include "utils/managed_ref.hh"
#include "test/lib/log.hh"
#include <boost/range/algorithm/sort.hpp>
#include <boost/range/algorithm/equal.hpp>
@@ -203,3 +205,106 @@ SEASTAR_TEST_CASE(tests_reserve_partial) {
});
return make_ready_future<>();
}
SEASTAR_TEST_CASE(test_clear_and_release) {
region region;
allocating_section as;
with_allocator(region.allocator(), [&] {
lsa::chunked_managed_vector<managed_ref<uint64_t>> v;
for (uint64_t i = 1; i < 4000; ++i) {
as(region, [&] {
v.emplace_back(make_managed<uint64_t>(i));
});
}
v.clear_and_release();
});
return make_ready_future<>();
}
SEASTAR_TEST_CASE(test_chunk_reserve) {
region region;
allocating_section as;
for (auto conf :
{ // std::make_pair(reserve size, push count)
std::make_pair(0, 4000),
std::make_pair(100, 4000),
std::make_pair(200, 4000),
std::make_pair(1000, 4000),
std::make_pair(2000, 4000),
std::make_pair(3000, 4000),
std::make_pair(5000, 4000),
std::make_pair(500, 8000),
std::make_pair(1000, 8000),
std::make_pair(2000, 8000),
std::make_pair(8000, 500),
})
{
with_allocator(region.allocator(), [&] {
auto [reserve_size, push_count] = conf;
testlog.info("Testing reserve({}), {}x emplace_back()", reserve_size, push_count);
lsa::chunked_managed_vector<managed_ref<uint64_t>> v;
v.reserve(reserve_size);
uint64_t seed = rand();
for (uint64_t i = 0; i < push_count; ++i) {
as(region, [&] {
v.emplace_back(make_managed<uint64_t>(seed + i));
BOOST_REQUIRE(**v.begin() == seed);
});
}
auto v_it = v.begin();
for (uint64_t i = 0; i < push_count; ++i) {
BOOST_REQUIRE(**v_it++ == seed + i);
}
v.clear_and_release();
});
}
return make_ready_future<>();
}
// Tests the case of make_room() invoked with last_chunk_capacity_deficit but _size not in
// the last reserved chunk.
SEASTAR_TEST_CASE(test_shrinking_and_expansion_involving_chunk_boundary) {
region region;
allocating_section as;
with_allocator(region.allocator(), [&] {
lsa::chunked_managed_vector<managed_ref<uint64_t>> v;
// Fill two chunks
v.reserve(2000);
for (uint64_t i = 0; i < 2000; ++i) {
as(region, [&] {
v.emplace_back(make_managed<uint64_t>(i));
});
}
// Make the last chunk smaller than max size to trigger the last_chunk_capacity_deficit path in make_room()
v.shrink_to_fit();
// Leave the last chunk reserved but empty
for (uint64_t i = 0; i < 1000; ++i) {
v.pop_back();
}
// Try to reserve more than the currently reserved capacity and trigger last_chunk_capacity_deficit path
// with _size not in the last chunk. Should not sigsegv.
v.reserve(8000);
for (uint64_t i = 0; i < 2000; ++i) {
as(region, [&] {
v.emplace_back(make_managed<uint64_t>(i));
});
}
v.clear_and_release();
});
return make_ready_future<>();
}


@@ -178,3 +178,32 @@ BOOST_AUTO_TEST_CASE(tests_reserve_partial) {
BOOST_REQUIRE_EQUAL(v.capacity(), orig_size);
}
}
// Tests the case of make_room() invoked with last_chunk_capacity_deficit but _size not in
// the last reserved chunk.
BOOST_AUTO_TEST_CASE(test_shrinking_and_expansion_involving_chunk_boundary) {
using vector_type = utils::chunked_vector<std::unique_ptr<uint64_t>>;
vector_type v;
// Fill two chunks
v.reserve(vector_type::max_chunk_capacity() * 3 / 2);
for (uint64_t i = 0; i < vector_type::max_chunk_capacity() * 3 / 2; ++i) {
v.emplace_back(std::make_unique<uint64_t>(i));
}
// Make the last chunk smaller than max size to trigger the last_chunk_capacity_deficit path in make_room()
v.shrink_to_fit();
// Leave the last chunk reserved but empty
for (uint64_t i = 0; i < vector_type::max_chunk_capacity(); ++i) {
v.pop_back();
}
// Try to reserve more than the currently reserved capacity and trigger last_chunk_capacity_deficit path
// with _size not in the last chunk. Should not sigsegv.
v.reserve(vector_type::max_chunk_capacity() * 4);
for (uint64_t i = 0; i < vector_type::max_chunk_capacity() * 2; ++i) {
v.emplace_back(std::make_unique<uint64_t>(i));
}
}


@@ -985,3 +985,38 @@ SEASTAR_TEST_CASE(snapshot_with_quarantine_works) {
BOOST_REQUIRE(expected.empty());
});
}
SEASTAR_TEST_CASE(database_drop_column_family_clears_querier_cache) {
return do_with_cql_env_thread([] (cql_test_env& e) {
e.execute_cql("create table ks.cf (k text, v int, primary key (k));").get();
auto& db = e.local_db();
const auto ts = db_clock::now();
auto& tbl = db.find_column_family("ks", "cf");
auto op = std::optional(tbl.read_in_progress());
auto s = tbl.schema();
auto q = query::data_querier(
tbl.as_mutation_source(),
tbl.schema(),
database_test(db).get_user_read_concurrency_semaphore().make_tracking_only_permit(s.get(), "test", db::no_timeout),
query::full_partition_range,
s->full_slice(),
default_priority_class(),
nullptr);
auto f = e.db().invoke_on_all([ts] (replica::database& db) {
return db.drop_column_family("ks", "cf", [ts] { return make_ready_future<db_clock::time_point>(ts); });
});
// we add a querier to the querier cache while the drop is ongoing
auto& qc = db.get_querier_cache();
qc.insert(utils::make_random_uuid(), std::move(q), nullptr);
BOOST_REQUIRE_EQUAL(qc.get_stats().population, 1);
op.reset(); // this should allow the drop to finish
f.get();
// the drop should have cleaned up all entries belonging to that table
BOOST_REQUIRE_EQUAL(qc.get_stats().population, 0);
});
}


@@ -25,6 +25,7 @@
#include "test/lib/tmpdir.hh"
#include "test/lib/log.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/cql_test_env.hh"
#include <vector>
#include <numeric>
@@ -428,6 +429,163 @@ SEASTAR_TEST_CASE(test_loading_cache_eviction_unprivileged) {
});
}
SEASTAR_TEST_CASE(test_loading_cache_eviction_unprivileged_minimum_size) {
return seastar::async([] {
// Test that unprivileged section is not starved.
//
// This scenario is tested: cache max_size is 50 and there are 49 entries in
// privileged section. After adding 5 elements (that go to unprivileged
// section) all of them should stay in unprivileged section and elements
// in privileged section should get evicted.
//
// Wrong handling of this situation caused problems with BATCH statements
// where all prepared statements in the batch have to stay in cache at
// the same time for the batch to correctly execute.
using namespace std::chrono;
utils::loading_cache<int, sstring, 1> loading_cache(50, 1h, testlog);
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });
prepare().get();
// Add 49 elements to privileged section
for (int i = 0; i < 49; i++) {
// Touch the value with the key "i" twice
loading_cache.get_ptr(i, loader).discard_result().get();
loading_cache.find(i);
}
// Add 5 elements to unprivileged section
for (int i = 50; i < 55; i++) {
loading_cache.get_ptr(i, loader).discard_result().get();
}
// Make sure that none of 5 elements were evicted
for (int i = 50; i < 55; i++) {
BOOST_REQUIRE(loading_cache.find(i) != nullptr);
}
BOOST_REQUIRE_EQUAL(loading_cache.size(), 50);
});
}
struct sstring_length_entry_size {
size_t operator()(const sstring& val) {
return val.size();
}
};
SEASTAR_TEST_CASE(test_loading_cache_section_size_correctly_calculated) {
return seastar::async([] {
auto load_len1 = [] (const int& key) { return make_ready_future<sstring>(tests::random::get_sstring(1)); };
auto load_len5 = [] (const int& key) { return make_ready_future<sstring>(tests::random::get_sstring(5)); };
auto load_len10 = [] (const int& key) { return make_ready_future<sstring>(tests::random::get_sstring(10)); };
auto load_len95 = [] (const int& key) { return make_ready_future<sstring>(tests::random::get_sstring(95)); };
using namespace std::chrono;
utils::loading_cache<int, sstring, 1, utils::loading_cache_reload_enabled::no, sstring_length_entry_size> loading_cache(100, 1h, testlog);
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 0);
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 0);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 0);
loading_cache.get_ptr(1, load_len1).discard_result().get();
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 0);
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 1);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 1);
loading_cache.get_ptr(2, load_len5).discard_result().get();
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 0);
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 6);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 2);
// Move "2" to privileged section by touching it the second time.
loading_cache.get_ptr(2, load_len5).discard_result().get();
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 5);
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 1);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 2);
loading_cache.get_ptr(3, load_len10).discard_result().get();
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 5);
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 11);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 3);
// Move "1" to privileged section. load_len10 should not get executed, as "1"
// is already in the cache.
loading_cache.get_ptr(1, load_len10).discard_result().get();
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 6);
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 10);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 3);
// Flood cache with elements of size 10,
// unprivileged. "1" and "2" should stay in the privileged section.
for (int i = 11; i < 30; i++) {
loading_cache.get_ptr(i, load_len10).discard_result().get();
}
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 6);
// We shrink the cache BEFORE adding an element,
// so right after adding one the cache
// can temporarily exceed max_size.
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 100);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 12);
// Flood cache with elements of size 10, privileged.
for (int i = 11; i < 30; i++) {
loading_cache.get_ptr(i, load_len10).discard_result().get();
loading_cache.get_ptr(i, load_len10).discard_result().get();
}
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 100);
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 0);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 10);
// Add one new unprivileged entry.
loading_cache.get_ptr(31, load_len1).discard_result().get();
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 90);
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 1);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 10);
// Add another unprivileged entry, privileged entry should get evicted.
loading_cache.get_ptr(32, load_len5).discard_result().get();
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 90);
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 6);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 11);
// Make it privileged by touching it again.
loading_cache.get_ptr(32, load_len5).discard_result().get();
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 95);
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 1);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 11);
// Add another unprivileged entry.
loading_cache.get_ptr(33, load_len10).discard_result().get();
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 95);
// We shrink the cache BEFORE adding an element,
// so right after adding one the cache
// can temporarily exceed max_size.
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 11);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 12);
// Add another unprivileged entry, privileged entry should get evicted.
loading_cache.get_ptr(34, load_len10).discard_result().get();
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 85);
// We shrink the cache BEFORE adding an element,
// so right after adding one the cache
// can temporarily exceed max_size.
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 21);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 12);
// Add a big unprivileged entry, filling almost entire cache.
loading_cache.get_ptr(35, load_len95).discard_result().get();
BOOST_REQUIRE_EQUAL(loading_cache.privileged_section_memory_footprint(), 75);
// We shrink the cache BEFORE adding an element,
// so right after adding one the cache
// can temporarily exceed max_size.
BOOST_REQUIRE_EQUAL(loading_cache.unprivileged_section_memory_footprint(), 95 + 21);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 12);
});
}
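The section-size arithmetic exercised by the test above can be modeled in miniature. The following is a hypothetical Python sketch, not Scylla's implementation: `TwoSectionCache`, the promote-on-second-touch rule, and the shrink-before-insert order are assumptions drawn from the test's comments, and the eviction policy is simplified to "unprivileged first".

```python
from collections import OrderedDict

class TwoSectionCache:
    """Toy model of a two-section LRU cache: an entry starts in the
    unprivileged section and is promoted to the privileged section on
    its second access. Sizes are tracked per section."""
    def __init__(self, max_size):
        self.max_size = max_size
        self.unpriv = OrderedDict()  # key -> size; insertion end is MRU
        self.priv = OrderedDict()

    def footprint(self, section):
        return sum(section.values())

    def memory_footprint(self):
        return self.footprint(self.unpriv) + self.footprint(self.priv)

    def _shrink(self):
        # Simplified policy: evict unprivileged LRU entries first,
        # then privileged ones.
        while self.memory_footprint() >= self.max_size and self.unpriv:
            self.unpriv.popitem(last=False)
        while self.memory_footprint() >= self.max_size and self.priv:
            self.priv.popitem(last=False)

    def get(self, key, size):
        if key in self.priv:
            self.priv.move_to_end(key)             # refresh MRU position
        elif key in self.unpriv:
            # Second touch: promote; the cached size is kept, mirroring
            # "the loader should not get executed" for a cache hit.
            self.priv[key] = self.unpriv.pop(key)
        else:
            self._shrink()                          # shrink BEFORE inserting,
            self.unpriv[key] = size                 # so footprint may briefly exceed max
```

Replaying the first steps of the test (sizes 1, 5, 10 and a flood of 10-byte entries) reproduces the asserted footprints of 6 (privileged) and 100 (unprivileged) with 12 entries total.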
SEASTAR_TEST_CASE(test_loading_cache_reload_during_eviction) {
return seastar::async([] {
using namespace std::chrono;
@@ -449,3 +607,169 @@ SEASTAR_TEST_CASE(test_loading_cache_reload_during_eviction) {
BOOST_REQUIRE_EQUAL(loading_cache.size(), 1);
});
}
SEASTAR_THREAD_TEST_CASE(test_loading_cache_remove_leaves_no_old_entries_behind) {
using namespace std::chrono;
load_count = 0;
auto load_v1 = [] (auto key) { return make_ready_future<sstring>("v1"); };
auto load_v2 = [] (auto key) { return make_ready_future<sstring>("v2"); };
auto load_v3 = [] (auto key) { return make_ready_future<sstring>("v3"); };
{
utils::loading_cache<int, sstring> loading_cache(num_loaders, 100s, testlog);
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });
//
// Test remove() concurrent with loading
//
auto f = loading_cache.get_ptr(0, [&](auto key) {
return yield().then([&] {
return load_v1(key);
});
});
loading_cache.remove(0);
BOOST_REQUIRE_EQUAL(loading_cache.find(0), nullptr);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 0);
auto ptr1 = f.get0();
BOOST_REQUIRE_EQUAL(*ptr1, "v1");
BOOST_REQUIRE_EQUAL(loading_cache.find(0), nullptr);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 0);
ptr1 = loading_cache.get_ptr(0, load_v2).get0();
loading_cache.remove(0);
BOOST_REQUIRE_EQUAL(*ptr1, "v2");
//
// Test that live ptr1, removed from cache, does not prevent reload of new value
//
auto ptr2 = loading_cache.get_ptr(0, load_v3).get0();
ptr1 = nullptr;
BOOST_REQUIRE_EQUAL(*ptr2, "v3");
}
// Test remove_if()
{
utils::loading_cache<int, sstring> loading_cache(num_loaders, 100s, testlog);
auto stop_cache_reload = seastar::defer([&loading_cache] { loading_cache.stop().get(); });
//
// Test remove_if() concurrent with loading
//
auto f = loading_cache.get_ptr(0, [&](auto key) {
return yield().then([&] {
return load_v1(key);
});
});
loading_cache.remove_if([] (auto&& v) { return v == "v1"; });
BOOST_REQUIRE_EQUAL(loading_cache.find(0), nullptr);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 0);
auto ptr1 = f.get0();
BOOST_REQUIRE_EQUAL(*ptr1, "v1");
BOOST_REQUIRE_EQUAL(loading_cache.find(0), nullptr);
BOOST_REQUIRE_EQUAL(loading_cache.size(), 0);
ptr1 = loading_cache.get_ptr(0, load_v2).get0();
loading_cache.remove_if([] (auto&& v) { return v == "v2"; });
BOOST_REQUIRE_EQUAL(*ptr1, "v2");
//
// Test that live ptr1, removed from cache, does not prevent reload of new value
//
auto ptr2 = loading_cache.get_ptr(0, load_v3).get0();
ptr1 = nullptr;
BOOST_REQUIRE_EQUAL(*ptr2, "v3");
ptr2 = nullptr;
}
}
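The remove-concurrent-with-load guarantee tested above can be sketched as a tiny synchronous model. This is a hypothetical illustration, not the real asynchronous code: `start_load`/`finish_load` stand in for the future returned by `get_ptr`, and the invariant modeled is that a load removed mid-flight still hands its value to the caller but never populates the cache.

```python
class LoadingCacheModel:
    """Toy model of remove() racing with an in-flight load."""
    def __init__(self):
        self._map = {}        # key -> value (ready entries)
        self._loading = set() # keys with an in-flight load

    def start_load(self, key):
        self._loading.add(key)

    def finish_load(self, key, value):
        # The load always yields its value to the caller; it is only
        # inserted if the key was not removed while loading.
        if key in self._loading:
            self._loading.discard(key)
            self._map[key] = value
        return value          # an orphaned result still reaches the caller

    def remove(self, key):
        self._map.pop(key, None)
        self._loading.discard(key)  # cancels insertion of in-flight loads

    def find(self, key):
        return self._map.get(key)
```

As in the test: removing key 0 while "v1" loads leaves the cache empty, yet the caller still observes "v1"; a later load of "v2" populates the cache normally.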
SEASTAR_TEST_CASE(test_prepared_statement_small_cache) {
// CQL prepared statement cache uses loading_cache
// internally.
constexpr auto CACHE_SIZE = 950000;
cql_test_config small_cache_config;
small_cache_config.qp_mcfg = {CACHE_SIZE, CACHE_SIZE};
return do_with_cql_env_thread([](cql_test_env& e) {
e.execute_cql("CREATE TABLE tbl1 (a int, b int, PRIMARY KEY (a))").get();
auto current_uid = 0;
// Prepare 100 queries and execute them twice,
// filling "privileged section" of loading_cache.
std::vector<cql3::prepared_cache_key_type> prepared_ids_privileged;
for (int i = 0; i < 100; i++) {
auto prepared_id = e.prepare(fmt::format("SELECT * FROM tbl1 WHERE a = {}", current_uid++)).get0();
e.execute_prepared(prepared_id, {}).get();
e.execute_prepared(prepared_id, {}).get();
prepared_ids_privileged.push_back(prepared_id);
}
int how_many_in_cache = 0;
for (auto& prepared_id : prepared_ids_privileged) {
if (e.local_qp().get_prepared(prepared_id)) {
how_many_in_cache++;
}
}
// Assumption: CACHE_SIZE should hold at least 50 queries,
// but not more than 99 queries. Other checks in this
// test rely on that fact.
BOOST_REQUIRE(how_many_in_cache >= 50);
BOOST_REQUIRE(how_many_in_cache <= 99);
// Then prepare 5 queries and execute them one time,
// which will occupy "unprivileged section" of loading_cache.
std::vector<cql3::prepared_cache_key_type> prepared_ids_unprivileged;
for (int i = 0; i < 5; i++) {
auto prepared_id = e.prepare(fmt::format("SELECT * FROM tbl1 WHERE a = {}", current_uid++)).get0();
e.execute_prepared(prepared_id, {}).get();
prepared_ids_unprivileged.push_back(prepared_id);
}
// Check that all of those prepared queries can still be
// executed. This simulates executing a BATCH with all of
// them, which requires all of those prepared statements
// to be in the cache at the same time.
for (auto& prepared_id : prepared_ids_unprivileged) {
e.execute_prepared(prepared_id, {}).get();
}
// Deterministic random for reproducibility.
testing::local_random_engine.seed(12345);
// Prepare 500 queries and execute them a random number of times.
for (int i = 0; i < 500; i++) {
auto prepared_id = e.prepare(fmt::format("SELECT * FROM tbl1 WHERE a = {}", current_uid++)).get0();
auto times = rand_int(4);
for (int j = 0; j < times; j++) {
e.execute_prepared(prepared_id, {}).get();
}
}
// Prepare 100 simulated "batches" and execute them
// a random number of times.
for (int i = 0; i < 100; i++) {
std::vector<cql3::prepared_cache_key_type> prepared_ids_batch;
for (int j = 0; j < 5; j++) {
auto prepared_id = e.prepare(fmt::format("SELECT * FROM tbl1 WHERE a = {}", current_uid++)).get0();
prepared_ids_batch.push_back(prepared_id);
}
auto times = rand_int(4);
for (int j = 0; j < times; j++) {
for (auto& prepared_id : prepared_ids_batch) {
e.execute_prepared(prepared_id, {}).get();
}
}
}
}, small_cache_config);
}


@@ -690,6 +690,7 @@ SEASTAR_TEST_CASE(test_cell_ordering) {
};
auto assert_equal = [] (atomic_cell_view c1, atomic_cell_view c2) {
testlog.trace("Expected {} == {}", c1, c2);
BOOST_REQUIRE(compare_atomic_cell_for_merge(c1, c2) == 0);
BOOST_REQUIRE(compare_atomic_cell_for_merge(c2, c1) == 0);
};
@@ -711,9 +712,11 @@ SEASTAR_TEST_CASE(test_cell_ordering) {
atomic_cell::make_live(*bytes_type, 1, bytes(), expiry_2, ttl_2));
// Origin doesn't compare ttl (is it wise?)
assert_equal(
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_1),
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_2));
// But we do. See https://github.com/scylladb/scylla/issues/10156
// and https://github.com/scylladb/scylla/issues/10173
assert_order(
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_2),
atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_1));
assert_order(
atomic_cell::make_live(*bytes_type, 0, bytes("value1")),


@@ -24,11 +24,13 @@ static void add_entry(logalloc::region& r,
{
logalloc::allocating_section as;
as(r, [&] {
sstables::key sst_key = sstables::key::from_partition_key(s, key);
page._entries.push_back(make_managed<index_entry>(
managed_bytes(sst_key.get_bytes()),
position,
managed_ref<promoted_index>()));
with_allocator(r.allocator(), [&] {
sstables::key sst_key = sstables::key::from_partition_key(s, key);
page._entries.push_back(make_managed<index_entry>(
managed_bytes(sst_key.get_bytes()),
position,
managed_ref<promoted_index>()));
});
});
}


@@ -6,6 +6,7 @@ from cassandra.cluster import ConsistencyLevel
from cassandra.query import SimpleStatement
from util import new_test_table
from nodetool import flush
def test_cdc_log_entries_use_cdc_streams(scylla_only, cql, test_keyspace):
'''Test that the stream IDs chosen for CDC log entries come from the CDC generation
@@ -31,3 +32,16 @@ def test_cdc_log_entries_use_cdc_streams(scylla_only, cql, test_keyspace):
assert(log_stream_ids.issubset(stream_ids))
# Test for #10473 - reading logs (from sstable) after dropping
# column in base.
def test_cdc_alter_table_drop_column(scylla_only, cql, test_keyspace):
schema = "pk int primary key, v int"
extra = " with cdc = {'enabled': true}"
with new_test_table(cql, test_keyspace, schema, extra) as table:
cql.execute(f"insert into {table} (pk, v) values (0, 0)")
cql.execute(f"insert into {table} (pk, v) values (1, null)")
flush(cql, table)
flush(cql, table + "_scylla_cdc_log")
cql.execute(f"alter table {table} drop v")
cql.execute(f"select * from {table}_scylla_cdc_log")


@@ -115,3 +115,16 @@ def test_operator_ne_not_supported(cql, table1):
cql.execute(f'SELECT a FROM {table1} WHERE a != 0')
with pytest.raises(InvalidRequest, match='Unsupported.*!='):
cql.execute(f'SELECT a FROM {table1} WHERE token(a) != 0')
# Test that the fact that a column is indexed does not cause us to fetch
# incorrect results from a filtering query (issue #10300).
def test_index_with_in_relation(scylla_only, cql, test_keyspace):
schema = 'p int, c int, v boolean, primary key (p,c)'
with new_test_table(cql, test_keyspace, schema) as table:
cql.execute(f"create index on {table}(v)")
for p, c, v in [(0,0,True),(0,1,False),(0,2,True),(0,3,False),
(1,0,True),(1,1,False),(1,2,True),(1,3,False),
(2,0,True),(2,1,False),(2,2,True),(2,3,False)]:
cql.execute(f"insert into {table} (p,c,v) values ({p}, {c}, {v})")
res = cql.execute(f"select * from {table} where p in (0,1) and v = False ALLOW FILTERING")
assert set(res) == set([(0,1,False),(0,3,False),(1,1,False), (1,3,False)])


@@ -6,6 +6,7 @@
import pytest
import util
import nodetool
import json
def test_snapshots_table(scylla_only, cql, test_keyspace):
with util.new_test_table(cql, test_keyspace, 'pk int PRIMARY KEY, v int') as table:
@@ -32,3 +33,31 @@ def test_runtime_info(scylla_only, cql):
def test_versions(scylla_only, cql):
_check_exists(cql, "versions", ("key", "build_id", "build_mode", "version"))
# Check reading the system.config table, which should list all configuration
# parameters. As we noticed in issue #10047, each type of configuration
# parameter can have a different function for printing it out, and some of
# those may be wrong so we want to check as many as we can - including
# specifically the experimental_features option which was wrong in #10047.
def test_system_config_read(scylla_only, cql):
# All rows should have the columns name, source, type and value:
rows = list(cql.execute("SELECT name, source, type, value FROM system.config"))
values = dict()
for row in rows:
values[row.name] = row.value
# Check that experimental_features exists and makes sense.
# It needs to be a JSON-formatted list of strings, and the strings need to be
# ASCII feature names - not binary garbage as it was in #10047.
assert 'experimental_features' in values
obj = json.loads(values['experimental_features'])
assert isinstance(obj, list)
assert isinstance(obj[0], str)
assert obj[0] and obj[0].isascii() and obj[0].isprintable()
# Check formatting of tri_mode_restriction like
# restrict_replication_simplestrategy. These need to be one of the
# allowed string values 0, 1, true, false or warn - but in particular
# non-empty and printable ASCII, not garbage.
assert 'restrict_replication_simplestrategy' in values
obj = json.loads(values['restrict_replication_simplestrategy'])
assert isinstance(obj, str)
assert obj and obj.isascii() and obj.isprintable()


@@ -626,7 +626,12 @@ public:
mm.start(std::ref(mm_notif), std::ref(feature_service), std::ref(ms), std::ref(proxy), std::ref(gossiper), std::ref(raft_gr)).get();
auto stop_mm = defer([&mm] { mm.stop().get(); });
cql3::query_processor::memory_config qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560};
cql3::query_processor::memory_config qp_mcfg;
if (cfg_in.qp_mcfg) {
qp_mcfg = *cfg_in.qp_mcfg;
} else {
qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560};
}
auto local_data_dict = seastar::sharded_parameter([] (const replica::database& db) { return db.as_data_dictionary(); }, std::ref(db));
qp.start(std::ref(proxy), std::move(local_data_dict), std::ref(mm_notif), std::ref(mm), qp_mcfg, std::ref(cql_config)).get();
auto stop_qp = defer([&qp] { qp.stop().get(); });


@@ -21,6 +21,7 @@
#include "cql3/query_options_fwd.hh"
#include "cql3/values.hh"
#include "cql3/prepared_statements_cache.hh"
#include "cql3/query_processor.hh"
#include "bytes.hh"
#include "schema.hh"
#include "test/lib/eventually.hh"
@@ -85,6 +86,7 @@ public:
// Scheduling groups are overwritten unconditionally, see get_scheduling_groups().
std::optional<replica::database_config> dbcfg;
std::set<sstring> disabled_features;
std::optional<cql3::query_processor::memory_config> qp_mcfg;
cql_test_config();
cql_test_config(const cql_test_config&);


@@ -444,7 +444,9 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
break;
case auth_state::AUTHENTICATION:
// Support both SASL auth from protocol v2 and the older style Credentials auth from v1
assert(cqlop == cql_binary_opcode::AUTH_RESPONSE || cqlop == cql_binary_opcode::CREDENTIALS);
if (cqlop != cql_binary_opcode::AUTH_RESPONSE && cqlop != cql_binary_opcode::CREDENTIALS) {
throw exceptions::protocol_exception(format("Unexpected message {:d}, expecting AUTH_RESPONSE or CREDENTIALS", int(cqlop)));
}
if (res_op == cql_binary_opcode::READY || res_op == cql_binary_opcode::AUTH_SUCCESS) {
client_state.set_auth_state(auth_state::READY);
}
@@ -1219,7 +1221,7 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_read_timeout_
std::unique_ptr<cql_server::response> cql_server::connection::make_read_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, bool data_present, const tracing::trace_state_ptr& tr_state) const
{
if (_version < 4) {
return make_read_timeout_error(stream, err, std::move(msg), cl, received, blockfor, data_present, tr_state);
return make_read_timeout_error(stream, exceptions::exception_code::READ_TIMEOUT, std::move(msg), cl, received, blockfor, data_present, tr_state);
}
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
response->write_int(static_cast<int32_t>(err));
@@ -1247,7 +1249,7 @@ std::unique_ptr<cql_server::response> cql_server::connection::make_mutation_writ
std::unique_ptr<cql_server::response> cql_server::connection::make_mutation_write_failure_error(int16_t stream, exceptions::exception_code err, sstring msg, db::consistency_level cl, int32_t received, int32_t numfailures, int32_t blockfor, db::write_type type, const tracing::trace_state_ptr& tr_state) const
{
if (_version < 4) {
return make_mutation_write_timeout_error(stream, err, std::move(msg), cl, received, blockfor, type, tr_state);
return make_mutation_write_timeout_error(stream, exceptions::exception_code::WRITE_TIMEOUT, std::move(msg), cl, received, blockfor, type, tr_state);
}
auto response = std::make_unique<cql_server::response>(stream, cql_binary_opcode::ERROR, tr_state);
response->write_int(static_cast<int32_t>(err));


@@ -326,6 +326,7 @@ public:
}
size_t evict_range(cache_type::iterator start, cache_type::iterator end) noexcept {
return with_allocator(standard_allocator(), [&] {
size_t count = 0;
auto disposer = [] (auto* p) noexcept {};
while (start != end) {
@@ -338,6 +339,7 @@ public:
}
}
return count;
});
}
public:
/// \brief Constructs a cached_file.
@@ -464,8 +466,10 @@ public:
inline
void cached_file::cached_page::on_evicted() noexcept {
parent->on_evicted(*this);
cached_file::cache_type::iterator it(this);
it.erase(page_idx_less_comparator());
with_allocator(standard_allocator(), [this] {
cached_file::cache_type::iterator it(this);
it.erase(page_idx_less_comparator());
});
}
class cached_file_impl : public file_impl {


@@ -376,7 +376,9 @@ chunked_vector<T, max_contiguous_allocation>::make_room(size_t n, bool stop_afte
auto new_last_chunk_capacity = last_chunk_capacity + capacity_increase;
// FIXME: realloc? maybe not worth the complication; only works for PODs
auto new_last_chunk = new_chunk(new_last_chunk_capacity);
migrate(addr(_capacity - last_chunk_capacity), addr(_size), new_last_chunk.get());
if (_size > _capacity - last_chunk_capacity) {
migrate(addr(_capacity - last_chunk_capacity), addr(_size), new_last_chunk.get());
}
_chunks.back() = std::move(new_last_chunk);
_capacity += capacity_increase;
}


@@ -108,81 +108,82 @@ template<typename Key,
typename Alloc = std::pmr::polymorphic_allocator<>>
class loading_cache {
using loading_cache_clock_type = seastar::lowres_clock;
using safe_link_list_hook = bi::list_base_hook<bi::link_mode<bi::safe_link>>;
using loading_cache_clock_type = seastar::lowres_clock;
using safe_link_list_hook = bi::list_base_hook<bi::link_mode<bi::safe_link>>;
class timestamped_val {
public:
using value_type = Tp;
using loading_values_type = typename utils::loading_shared_values<Key, timestamped_val, Hash, EqualPred, LoadingSharedValuesStats, 256>;
class lru_entry;
class value_ptr;
class timestamped_val {
public:
using value_type = Tp;
using loading_values_type = typename utils::loading_shared_values<Key, timestamped_val, Hash, EqualPred, LoadingSharedValuesStats, 256>;
class lru_entry;
class value_ptr;
private:
value_type _value;
loading_cache_clock_type::time_point _loaded;
loading_cache_clock_type::time_point _last_read;
lru_entry* _lru_entry_ptr = nullptr; /// MRU item is at the front, LRU - at the back
size_t _size = 0;
private:
value_type _value;
loading_cache_clock_type::time_point _loaded;
loading_cache_clock_type::time_point _last_read;
lru_entry* _lru_entry_ptr = nullptr; /// MRU item is at the front, LRU - at the back
size_t _size = 0;
public:
timestamped_val(value_type val)
: _value(std::move(val))
, _loaded(loading_cache_clock_type::now())
, _last_read(_loaded)
, _size(EntrySize()(_value))
{}
timestamped_val(timestamped_val&&) = default;
public:
timestamped_val(value_type val)
: _value(std::move(val))
, _loaded(loading_cache_clock_type::now())
, _last_read(_loaded)
, _size(EntrySize()(_value))
{}
timestamped_val(timestamped_val&&) = default;
timestamped_val& operator=(value_type new_val) {
assert(_lru_entry_ptr);
timestamped_val& operator=(value_type new_val) {
assert(_lru_entry_ptr);
_value = std::move(new_val);
_loaded = loading_cache_clock_type::now();
_lru_entry_ptr->cache_size() -= _size;
_size = EntrySize()(_value);
_lru_entry_ptr->cache_size() += _size;
return *this;
}
_value = std::move(new_val);
_loaded = loading_cache_clock_type::now();
_lru_entry_ptr->owning_section_size() -= _size;
_size = EntrySize()(_value);
_lru_entry_ptr->owning_section_size() += _size;
return *this;
}
value_type& value() noexcept { return _value; }
const value_type& value() const noexcept { return _value; }
value_type& value() noexcept { return _value; }
const value_type& value() const noexcept { return _value; }
static const timestamped_val& container_of(const value_type& value) {
return *bi::get_parent_from_member(&value, &timestamped_val::_value);
}
static const timestamped_val& container_of(const value_type& value) {
return *bi::get_parent_from_member(&value, &timestamped_val::_value);
}
loading_cache_clock_type::time_point last_read() const noexcept {
return _last_read;
}
loading_cache_clock_type::time_point last_read() const noexcept {
return _last_read;
}
loading_cache_clock_type::time_point loaded() const noexcept {
return _loaded;
}
loading_cache_clock_type::time_point loaded() const noexcept {
return _loaded;
}
size_t size() const {
return _size;
}
size_t size() const {
return _size;
}
bool ready() const noexcept {
return _lru_entry_ptr;
}
bool ready() const noexcept {
return _lru_entry_ptr;
}
lru_entry* lru_entry_ptr() const noexcept {
return _lru_entry_ptr;
}
lru_entry* lru_entry_ptr() const noexcept {
return _lru_entry_ptr;
}
private:
void touch() noexcept {
assert(_lru_entry_ptr);
_last_read = loading_cache_clock_type::now();
_lru_entry_ptr->touch();
}
private:
void touch() noexcept {
_last_read = loading_cache_clock_type::now();
if (_lru_entry_ptr) {
_lru_entry_ptr->touch();
}
}
void set_anchor_back_reference(lru_entry* lru_entry_ptr) noexcept {
_lru_entry_ptr = lru_entry_ptr;
}
};
void set_anchor_back_reference(lru_entry* lru_entry_ptr) noexcept {
_lru_entry_ptr = lru_entry_ptr;
}
};
private:
using loading_values_type = typename timestamped_val::loading_values_type;
@@ -265,7 +266,7 @@ public:
});
}).then([this, k] (timestamped_val_ptr ts_val_ptr) {
// check again since it could have already been inserted and initialized
if (!ts_val_ptr->ready()) {
if (!ts_val_ptr->ready() && !ts_val_ptr.orphaned()) {
_logger.trace("{}: storing the value for the first time", k);
if (ts_val_ptr->size() > _max_size) {
@@ -331,6 +332,11 @@ public:
return set_find(k);
}
// Removes all values matching a given predicate and values which are currently loading.
// Guarantees that no values which match the predicate and whose loading was initiated
// before this call will be present after this call (or appear at any time later).
// The predicate may be invoked multiple times on the same value.
// It must return the same result for a given value (it must be a pure function).
template <typename Pred>
void remove_if(Pred&& pred) {
static_assert(std::is_same<bool, std::result_of_t<Pred(const value_type&)>>::value, "Bad Pred signature");
@@ -344,15 +350,29 @@ public:
_unprivileged_lru_list.remove_and_dispose_if(cond_pred, value_destroyer);
_lru_list.remove_and_dispose_if(cond_pred, value_destroyer);
_loading_values.remove_if([&pred] (const timestamped_val& v) {
return pred(v.value());
});
}
// Removes a given key from the cache.
// The key is removed immediately.
// After this, get_ptr() is guaranteed to reload the value before returning it.
// As a consequence of the above, if there is a concurrent get_ptr() in progress with this,
// its value will not populate the cache. It will still succeed.
void remove(const Key& k) {
remove_ts_value(set_find(k));
// set_find() returns nullptr for a key which is currently loading, which we want to remove too.
_loading_values.remove(k);
}
// Removes a given key from the cache.
// Same guarantees as with remove(key).
template<typename KeyType, typename KeyHasher, typename KeyEqual>
void remove(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) noexcept {
remove_ts_value(set_find(key, std::move(key_hasher_func), std::move(key_equal_func)));
remove_ts_value(set_find(key, key_hasher_func, key_equal_func));
// set_find() returns nullptr for a key which is currently loading, which we want to remove too.
_loading_values.remove(key, key_hasher_func, key_equal_func);
}
size_t size() const {
@@ -361,9 +381,18 @@ public:
/// \brief returns the memory size the currently cached entries occupy according to the EntrySize predicate.
size_t memory_footprint() const {
return _current_size;
return _unprivileged_section_size + _privileged_section_size;
}
/// \brief returns the memory size the currently cached entries occupy in the privileged section according to the EntrySize predicate.
size_t privileged_section_memory_footprint() const noexcept {
return _privileged_section_size;
}
/// \brief returns the memory size the currently cached entries occupy in the unprivileged section according to the EntrySize predicate.
size_t unprivileged_section_memory_footprint() const noexcept {
return _unprivileged_section_size;
}
private:
void remove_ts_value(timestamped_val_ptr ts_ptr) {
if (!ts_ptr) {
@@ -419,16 +448,22 @@ private:
}
if (lru_entry.touch_count() < SectionHitThreshold) {
_logger.trace("Putting key {} into the unpriviledged section", lru_entry.key());
_logger.trace("Putting key {} into the unprivileged section", lru_entry.key());
_unprivileged_lru_list.push_front(lru_entry);
lru_entry.inc_touch_count();
} else {
_logger.trace("Putting key {} into the priviledged section", lru_entry.key());
_logger.trace("Putting key {} into the privileged section", lru_entry.key());
_lru_list.push_front(lru_entry);
// Bump it up only once to avoid a wrap around
if (lru_entry.touch_count() == SectionHitThreshold) {
// This code will run only once, when a promotion
// from unprivileged to privileged section happens.
// Update section size bookkeeping.
lru_entry.owning_section_size() -= lru_entry.timestamped_value().size();
lru_entry.inc_touch_count();
lru_entry.owning_section_size() += lru_entry.timestamped_value().size();
}
}
}
@@ -495,17 +530,44 @@ private:
void shrink() {
using namespace std::chrono;
while (_current_size >= _max_size && !_unprivileged_lru_list.empty()) {
ts_value_lru_entry& lru_entry = *_unprivileged_lru_list.rbegin();
_logger.trace("shrink(): {}: dropping the unpriviledged entry: ms since last_read {}", lru_entry.key(), duration_cast<milliseconds>(loading_cache_clock_type::now() - lru_entry.timestamped_value().last_read()).count());
loading_cache::destroy_ts_value(&lru_entry);
LoadingCacheStats::inc_unprivileged_on_cache_size_eviction();
}
while (_current_size >= _max_size) {
auto drop_privileged_entry = [&] {
ts_value_lru_entry& lru_entry = *_lru_list.rbegin();
_logger.trace("shrink(): {}: dropping the entry: ms since last_read {}", lru_entry.key(), duration_cast<milliseconds>(loading_cache_clock_type::now() - lru_entry.timestamped_value().last_read()).count());
loading_cache::destroy_ts_value(&lru_entry);
};
auto drop_unprivileged_entry = [&] {
ts_value_lru_entry& lru_entry = *_unprivileged_lru_list.rbegin();
_logger.trace("shrink(): {}: dropping the unprivileged entry: ms since last_read {}", lru_entry.key(), duration_cast<milliseconds>(loading_cache_clock_type::now() - lru_entry.timestamped_value().last_read()).count());
loading_cache::destroy_ts_value(&lru_entry);
LoadingCacheStats::inc_unprivileged_on_cache_size_eviction();
};
// When cache entries need to be evicted due to a size restriction,
// unprivileged section entries are evicted first.
//
// However, we make sure that the unprivileged section does not get
// too small, because that would starve newly inserted entries.
// For example if the cache could store at most 50 entries and there are 49 entries in
// privileged section, after adding 5 entries (that would go to unprivileged
// section) 4 of them would get evicted and only the 5th one would stay.
// This caused problems with BATCH statements where all prepared statements
// in the batch have to stay in cache at the same time for the batch to correctly
// execute.
auto minimum_unprivileged_section_size = _max_size / 2;
while (memory_footprint() >= _max_size && _unprivileged_section_size > minimum_unprivileged_section_size) {
drop_unprivileged_entry();
}
while (memory_footprint() >= _max_size && !_lru_list.empty()) {
drop_privileged_entry();
}
// If dropping entries from privileged section did not help,
// we have to drop entries from unprivileged section,
// going below minimum_unprivileged_section_size.
while (memory_footprint() >= _max_size) {
drop_unprivileged_entry();
}
}
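The three-pass eviction policy described in the comments of shrink() can be written out as a standalone sketch. A hedged Python model follows; the plain lists stand in for the intrusive LRU lists (tail = LRU end), and the emptiness guard on the final loop is added for safety in the model.

```python
def shrink(priv, unpriv, max_size):
    """Evict per the commented policy: drop unprivileged entries first,
    but never shrink the unprivileged section below max_size / 2 in the
    first pass; then drop privileged entries; only as a last resort keep
    dropping unprivileged entries below the minimum."""
    minimum_unprivileged = max_size // 2

    def footprint():
        return sum(priv) + sum(unpriv)

    # Pass 1: evict unprivileged entries, keeping the section above the minimum.
    while footprint() >= max_size and sum(unpriv) > minimum_unprivileged:
        unpriv.pop()   # drop the LRU entry (list tail)
    # Pass 2: evict privileged entries.
    while footprint() >= max_size and priv:
        priv.pop()
    # Last resort: shrink the unprivileged section below the minimum.
    while footprint() >= max_size and unpriv:
        unpriv.pop()
    return priv, unpriv
```

Note how a large privileged section (90 out of 100) is what gets trimmed first when the unprivileged section is already at or below half the cache, which is exactly the batch-starvation scenario the comment describes.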
@@ -558,7 +620,8 @@ private:
loading_values_type _loading_values;
lru_list_type _lru_list; // list containing "privileged" section entries
lru_list_type _unprivileged_lru_list; // list containing "unprivileged" section entries
size_t _current_size = 0;
size_t _privileged_section_size = 0;
size_t _unprivileged_section_size = 0;
size_t _max_size = 0;
loading_cache_clock_type::duration _expiry;
loading_cache_clock_type::duration _refresh;
@@ -624,7 +687,7 @@ public:
static_assert(SectionHitThreshold <= std::numeric_limits<typeof(_touch_count)>::max() / 2, "SectionHitThreshold value is too big");
_ts_val_ptr->set_anchor_back_reference(this);
cache_size() += _ts_val_ptr->size();
owning_section_size() += _ts_val_ptr->size();
}
void inc_touch_count() noexcept {
@@ -640,12 +703,12 @@ public:
lru_list_type& lru_list = _parent.container_list(*this);
lru_list.erase(lru_list.iterator_to(*this));
}
cache_size() -= _ts_val_ptr->size();
owning_section_size() -= _ts_val_ptr->size();
_ts_val_ptr->set_anchor_back_reference(nullptr);
}
size_t& cache_size() noexcept {
return _parent._current_size;
size_t& owning_section_size() noexcept {
return _touch_count <= SectionHitThreshold ? _parent._unprivileged_section_size : _parent._privileged_section_size;
}
void touch() noexcept {


@@ -83,6 +83,10 @@ private:
_val.emplace(std::move(new_val));
}
bool orphaned() const {
return !is_linked();
}
shared_promise<>& loaded() {
return _loaded;
}
@@ -95,7 +99,9 @@ private:
: _parent(parent), _key(std::move(k)) {}
~entry() {
_parent._set.erase(_parent._set.iterator_to(*this));
if (is_linked()) {
_parent._set.erase(_parent._set.iterator_to(*this));
}
Stats::inc_evictions();
}
@@ -153,6 +159,18 @@ public:
return res;
}
// Returns the key this entry is associated with.
// Valid if bool(*this).
const key_type& key() const {
return _e->key();
}
// Returns true iff the entry is not linked in the set.
// Call only when bool(*this).
bool orphaned() const {
return _e->orphaned();
}
friend class loading_shared_values;
friend std::ostream& operator<<(std::ostream& os, const entry_ptr& ep) {
return os << ep._e.get();
@@ -265,6 +283,50 @@ public:
return entry_ptr(it->shared_from_this());
};
// Removes a given key from this container.
// If a given key is currently loading, the loading will succeed and will return entry_ptr
// to the caller, but the value will not be present in the container. It will be removed
// when the last entry_ptr dies, as usual.
//
// Post-condition: !find(key)
template<typename KeyType, typename KeyHasher, typename KeyEqual>
void remove(const KeyType& key, KeyHasher key_hasher_func, KeyEqual key_equal_func) {
set_iterator it = _set.find(key, std::move(key_hasher_func), key_eq<KeyType, KeyEqual>());
if (it != _set.end()) {
_set.erase(it);
}
}
// Removes a given key from this container.
// If a given key is currently loading, the loading will succeed and will return entry_ptr
// to the caller, but the value will not be present in the container. It will be removed
// when the last entry_ptr dies, as usual.
//
// Post-condition: !find(key)
template<typename KeyType>
void remove(const KeyType& key) {
remove(key, Hash(), EqualPred());
}
// Removes all values which match a given predicate or are currently loading.
// Guarantees that no values which match the predicate and whose loading was initiated
// before this call will be present after this call (or appear at any time later).
// Same effects as if remove(e.key()) was called on each matching entry.
template<typename Pred>
requires std::is_invocable_r_v<bool, Pred, const Tp&>
void remove_if(const Pred& pred) {
auto it = _set.begin();
while (it != _set.end()) {
if (!it->ready() || pred(it->value())) {
auto next = std::next(it);
_set.erase(it);
it = next;
} else {
++it;
}
}
}
// keep the default non-templated overloads to ease on the compiler for specifications
// that do not require the templated find().
entry_ptr find(const key_type& key) noexcept {


@@ -584,6 +584,10 @@ static constexpr auto max_used_space_ratio_for_compaction = 0.85;
static constexpr size_t max_used_space_for_compaction = segment_size * max_used_space_ratio_for_compaction;
static constexpr size_t min_free_space_for_compaction = segment_size - max_used_space_for_compaction;
struct [[gnu::packed]] non_lsa_object_cookie {
uint64_t value = 0xbadcaffe;
};
static_assert(min_free_space_for_compaction >= max_managed_object_size,
"Segments which cannot fit max_managed_object_size must not be considered compactible for the sake of forward progress of compaction");
@@ -827,9 +831,13 @@ public:
void clear_allocation_failure_flag() { _allocation_failure_flag = false; }
bool allocation_failure_flag() { return _allocation_failure_flag; }
void refill_emergency_reserve();
void update_non_lsa_memory_in_use(ssize_t n) {
void add_non_lsa_memory_in_use(size_t n) {
_non_lsa_memory_in_use += n;
}
void subtract_non_lsa_memory_in_use(size_t n) {
assert(_non_lsa_memory_in_use >= n);
_non_lsa_memory_in_use -= n;
}
size_t non_lsa_memory_in_use() const {
return _non_lsa_memory_in_use;
}
@@ -1630,17 +1638,18 @@ public:
memory::on_alloc_point();
shard_segment_pool.on_memory_allocation(size);
if (size > max_managed_object_size) {
auto ptr = standard_allocator().alloc(migrator, size, alignment);
auto ptr = standard_allocator().alloc(migrator, size + sizeof(non_lsa_object_cookie), alignment);
// This isn't very accurate; the correct free_space value would be
// malloc_usable_size(ptr) - size, but there is no way to get
// the exact object size at free.
auto allocated_size = malloc_usable_size(ptr);
new ((char*)ptr + allocated_size - sizeof(non_lsa_object_cookie)) non_lsa_object_cookie();
_non_lsa_occupancy += occupancy_stats(0, allocated_size);
if (_group) {
_evictable_space += allocated_size;
_group->increase_usage(_heap_handle, allocated_size);
}
shard_segment_pool.update_non_lsa_memory_in_use(allocated_size);
shard_segment_pool.add_non_lsa_memory_in_use(allocated_size);
return ptr;
} else {
auto ptr = alloc_small(object_descriptor(migrator), (segment::size_type) size, alignment);
@@ -1652,12 +1661,14 @@ public:
private:
void on_non_lsa_free(void* obj) noexcept {
auto allocated_size = malloc_usable_size(obj);
auto cookie = (non_lsa_object_cookie*)((char*)obj + allocated_size) - 1;
assert(cookie->value == non_lsa_object_cookie().value);
_non_lsa_occupancy -= occupancy_stats(0, allocated_size);
if (_group) {
_evictable_space -= allocated_size;
_group->decrease_usage(_heap_handle, allocated_size);
}
shard_segment_pool.update_non_lsa_memory_in_use(-allocated_size);
shard_segment_pool.subtract_non_lsa_memory_in_use(allocated_size);
}
public:
virtual void free(void* obj) noexcept override {


@@ -60,6 +60,9 @@ private:
throw std::out_of_range("chunked_managed_vector out of range access");
}
}
chunk_ptr& back_chunk() {
return _chunks[_size / max_chunk_capacity()];
}
static void migrate(T* begin, T* end, managed_vector<T>& result);
public:
using value_type = T;
@@ -106,24 +109,24 @@ public:
void push_back(const T& x) {
reserve_for_push_back();
_chunks.back().emplace_back(x);
back_chunk().emplace_back(x);
++_size;
}
void push_back(T&& x) {
reserve_for_push_back();
_chunks.back().emplace_back(std::move(x));
back_chunk().emplace_back(std::move(x));
++_size;
}
template <typename... Args>
T& emplace_back(Args&&... args) {
reserve_for_push_back();
auto& ret = _chunks.back().emplace_back(std::forward<Args>(args)...);
auto& ret = back_chunk().emplace_back(std::forward<Args>(args)...);
++_size;
return ret;
}
void pop_back() {
--_size;
_chunks.back().pop_back();
back_chunk().pop_back();
}
const T& back() const {
return *addr(_size - 1);
@@ -381,7 +384,9 @@ chunked_managed_vector<T>::make_room(size_t n, bool stop_after_one) {
auto new_last_chunk_capacity = last_chunk_capacity + capacity_increase;
// FIXME: realloc? maybe not worth the complication; only works for PODs
auto new_last_chunk = new_chunk(new_last_chunk_capacity);
migrate(addr(_capacity - last_chunk_capacity), addr(_size), new_last_chunk);
if (_size > _capacity - last_chunk_capacity) {
migrate(addr(_capacity - last_chunk_capacity), addr(_size), new_last_chunk);
}
_chunks.back() = std::move(new_last_chunk);
_capacity += capacity_increase;
}