Compare commits

...

43 Commits

Author SHA1 Message Date
Jenkins Promoter
93700ff5d1 Update ScyllaDB version to: 6.2.0-rc2 2024-10-02 14:58:37 +03:00
Anna Stuchlik
5e2b4a0e80 doc: add metric updates from 6.1 to 6.2
This commit documents the metrics that are new in version 6.2 compared to 6.1,
as specified in https://github.com/scylladb/scylladb/issues/20176.

Fixes https://github.com/scylladb/scylladb/issues/20176

(cherry picked from commit a97db03448)

Closes scylladb/scylladb#20930
2024-10-02 12:07:06 +03:00
Calle Wilund
bb5dc0771c commitlog: Fix buffer_list_bytes not updated correctly
Fixes #20862

With the change in 60af2f3cb2 the bookkeeping
for buffer memory changed subtly: we would shrink the buffer
before the post-flush code used that buffer's size to
decrement the buffer_list_bytes value, which was previously incremented by the full
allocated size. I.e. we would slowly grow this value instead of adjusting it
to the actual number of used bytes.

Test included.

(cherry picked from commit ee5e71172f)

Closes scylladb/scylladb#20902
2024-10-01 17:41:02 +03:00
Aleksandra Martyniuk
9ed8519362 node_ops: fix task_manager_module::get_nodes()
Currently, node ops virtual task gathers its children from all nodes contained
in a sum of service::topology::normal_nodes and service::topology::transition_nodes.
The maps may contain nodes that are down but weren't removed yet. So, if a user
requests the status of a node ops virtual task, the task's attempt to retrieve
its children list may fail with seastar::rpc::closed_error.

Filter out the nodes that are down in node_ops::task_manager_module::get_nodes().

Fixes: #20843.
(cherry picked from commit a558abeba3)

Closes scylladb/scylladb#20898
2024-10-01 14:52:11 +03:00
Avi Kivity
077d7c06a0 Merge '[Backport 6.2] sstables: Fix use-after-free on page cache buffer when parsing promoted index entries across pages' from ScyllaDB
This fixes a use-after-free bug when parsing clustering key across
pages.

Also includes a fix for allocating section retry, which is potentially unsafe (though not in practice yet).

Details of the first problem:

Clustering key index lookup is based on the index file page cache. We
do a binary search within the index, which involves parsing index
blocks touched by the algorithm. Index file pages are 4 KB chunks
which are stored in LSA.

To parse the first key of the block, we reuse clustering_parser, which
is also used when parsing the data file. The parser is stateful and
accepts consecutive chunks as temporary_buffers. The parser is
supposed to keep its state across chunks.

In 93482439, the promoted index cursor was optimized to avoid
a full page copy when parsing index blocks. Instead, the parser is
given a temporary_buffer which is a view on the page.

A bit earlier, in b1b5bda, the parser was changed to keep shared
fragments of the buffer passed to the parser in its internal state (across pages)
rather than copy the fragments into a new buffer. This is problematic
when buffers come from page cache because LSA buffers may be moved
around or evicted. So a temporary_buffer which is a view on the LSA
buffer is valid only for the duration of a single consume() call to
the parser.

If the blob which is parsed (e.g. variable-length clustering key
component) spans pages, the fragments stored in the parser may be
invalidated before the component is fully parsed. As a result, the
parsed clustering key may have incorrect component values. This never
causes parsing errors because the "length" field is always parsed from
the current buffer, which is valid, and component parsing will end at
the right place in the next (valid) buffer.

The problematic path for clustering_key parsing is the one which calls
primitive_consumer::read_bytes(), which is called for example for text
components. Fixed-size components are not parsed like this, they store
the intermediate state by copying data.

This may cause incorrect clustering keys to be parsed when doing
binary search in the index, diverting the search to an incorrect
block.

Details of the solution:

We adapt page_view to a temporary_buffer-like API. For this, a new concept
called ContiguousSharedBuffer is introduced. We also change the parsers so that
they can be templated on the type of the buffer they work with (page_view vs
temporary_buffer). This way we don't introduce indirection into existing algorithms.

We use page_view instead of temporary_buffer in the promoted
index parser which works with page cache buffers. page_view can be safely
shared via share() and stored across allocating sections. It keeps hold to the
LSA buffer even across allocating sections by the means of cached_file::page_ptr.

Fixes #20766

(cherry picked from commit 8aca93b3ec)

(cherry picked from commit ac823b1050)

(cherry picked from commit 93bfaf4282)

(cherry picked from commit c0fa49bab5)

(cherry picked from commit 29498a97ae)

(cherry picked from commit c15145b71d)

(cherry picked from commit 7670ee701a)

(cherry picked from commit c09fa0cb98)

(cherry picked from commit 0279ac5faa)

(cherry picked from commit 8e54ecd38e)

(cherry picked from commit b5ae7da9d2)

Refs #20837

Closes scylladb/scylladb#20905

* github.com:scylladb/scylladb:
  sstables: bsearch_clustered_cursor: Add trace-level logging
  sstables: bsearch_clustered_cursor: Move definitions out of line
  test, sstables: Verify parsing stability when allocating section is retried
  test, sstables: Verify parsing stability when buffers cross page boundary
  sstables: bsearch_clustered_cursor: Switch parsers to work with page_view
  cached_file: Adapt page_view to ContiguousSharedBuffer
  cached_file: Change meaning of page_view::_size to be relative to _offset rather than page start
  sstables, utils: Allow parsers to work with different buffer types
  sstables: promoted_index_block_parser: Make reset() always bring parser to initial state
  sstables: bsearch_clustered_cursor: Switch read_block_offset() to use the read() method
  sstables: bsearch_clustered_cursor: Fix parsing when allocating section is retried
2024-10-01 14:51:29 +03:00
Tomasz Grabiec
5a1575678b sstables: bsearch_clustered_cursor: Add trace-level logging
(cherry picked from commit b5ae7da9d2)
2024-10-01 01:38:48 +00:00
Tomasz Grabiec
2401f7f9ca sstables: bsearch_clustered_cursor: Move definitions out of line
In order to later use the formatter for the inner class
promoted_index_block, which is defined out of line after
cached_promoted_index class definition.

(cherry picked from commit 8e54ecd38e)
2024-10-01 01:38:47 +00:00
Tomasz Grabiec
906d085289 test, sstables: Verify parsing stability when allocating section is retried
(cherry picked from commit 0279ac5faa)
2024-10-01 01:38:47 +00:00
Tomasz Grabiec
34dd3a6daa test, sstables: Verify parsing stability when buffers cross page boundary
(cherry picked from commit c09fa0cb98)
2024-10-01 01:38:47 +00:00
Tomasz Grabiec
3afa8ee2ca sstables: bsearch_clustered_cursor: Switch parsers to work with page_view
This fixes a use-after-free bug when parsing clustering key across
pages.

Clustering key index lookup is based on the index file page cache. We
do a binary search within the index, which involves parsing index
blocks touched by the algorithm. Index file pages are 4 KB chunks
which are stored in LSA.

To parse the first key of the block, we reuse clustering_parser, which
is also used when parsing the data file. The parser is stateful and
accepts consecutive chunks as temporary_buffers. The parser is
supposed to keep its state across chunks.

In b1b5bda, the parser was changed to keep shared fragments of the
buffer passed to the parser in its internal state (across pages)
rather than copy the fragments into a new buffer. This is problematic
when buffers come from page cache because LSA buffers may be moved
around or evicted. So a temporary_buffer which is a view on the LSA
buffer is valid only for the duration of a single consume() call to
the parser.

If the blob which is parsed (e.g. variable-length clustering key
component) spans pages, the fragments stored in the parser may be
invalidated before the component is fully parsed. As a result, the
parsed clustering key may have incorrect component values. This never
causes parsing errors because the "length" field is always parsed from
the current buffer, which is valid, and component parsing will end at
the right place in the next (valid) buffer.

The problematic path for clustering_key parsing is the one which calls
primitive_consumer::read_bytes(), which is called for example for text
components. Fixed-size components are not parsed like this, they store
the intermediate state by copying data.

This may cause incorrect clustering keys to be parsed when doing
binary search in the index, diverting the search to an incorrect
block.

The solution is to use page_view instead of temporary_buffer; page_view
can be safely shared via share() and stored across allocating
sections. The page_view maintains its hold on the LSA buffer even
across allocating sections.

Fixes #20766

(cherry picked from commit 7670ee701a)
2024-10-01 01:38:47 +00:00
Tomasz Grabiec
3347152ff9 cached_file: Adapt page_view to ContiguousSharedBuffer
(cherry picked from commit c15145b71d)
2024-10-01 01:38:47 +00:00
Tomasz Grabiec
ff7bd937e2 cached_file: Change meaning of page_view::_size to be relative to _offset rather than page start
This will make it easier to implement the ContiguousSharedBuffer API, as the
buffer size will be equal to _size.

(cherry picked from commit 29498a97ae)
2024-10-01 01:38:47 +00:00
Tomasz Grabiec
50ea1dbe32 sstables, utils: Allow parsers to work with different buffer types
Currently, parsers work with temporary_buffer<char>. This is unsafe
when invoked by bsearch_clustered_cursor, which reuses some of the
parsers, and passes temporary_buffer<char> which is a view onto LSA
buffer which comes from the index file page cache. This view is stable
only around consume(). If parsing requires more than one page, it will
continue with a different input buffer. The old buffer will be
invalid, and it's unsafe for the parser to store and access
it. Unfortunately, the temporary_buffer API allows sharing the buffer
via the share() method, which shares the underlying memory area. This
is not correct when the underlying memory is managed by LSA, because the
storage may move. The parser uses this sharing when parsing blobs, e.g.
clustering key components. When parsing resumes in the next page, the
parser will try to access the stored shared buffers pointing to the
previous page, which may result in use-after-free on the memory area.

In preparation for fixing the problem, parametrize parsers to work with
different kinds of buffers. This will allow us to instantiate them
with a buffer kind which supports sharing of LSA buffers properly, in a
safe way.

It's not purely mechanical work. Some parts of the parsing state
machine still work with temporary_buffer<char> and allocate buffers
internally when reading into a linearized destination buffer. They used
to store this destination in the _read_bytes vector, the same field
used to store the shared buffers. Now that's not possible, since the
shared buffer type may differ from temporary_buffer<char>. So those
paths were changed to use a new field: _read_bytes_buf.

(cherry picked from commit c0fa49bab5)
2024-10-01 01:38:47 +00:00
Tomasz Grabiec
45125c4d7d sstables: promoted_index_block_parser: Make reset() always bring parser to initial state
When reset() is done due to an allocating section retry, the parser can
theoretically be at an arbitrary point. So we should not assume that it
finished parsing and that its state was reset by the previous run. We
should reset all the fields.

(cherry picked from commit 93bfaf4282)
2024-10-01 01:38:46 +00:00
Tomasz Grabiec
9207f7823d sstables: bsearch_clustered_cursor: Switch read_block_offset() to use the read() method
To unify logic which handles allocating section retry, and thus
improve safety.

(cherry picked from commit ac823b1050)
2024-10-01 01:38:46 +00:00
Tomasz Grabiec
711864687f sstables: bsearch_clustered_cursor: Fix parsing when allocating section is retried
Parser's state was not reset when allocating section was retried.

This doesn't cause problems in practice, because reserves are enough
to cover allocation demands of parsing clustering keys, which are at
most 64K in size. But it's still potentially unsafe and needs fixing.

(cherry picked from commit 8aca93b3ec)
2024-10-01 01:38:45 +00:00
Kamil Braun
faf11e5bc3 Merge '[Backport 6.2] Populate raft address map from gossiper on raft configuration change' from ScyllaDB
For each new node added to the raft config, populate its ID to IP mapping in the raft address map from the gossiper. The mapping may have expired if a node is added to the raft configuration long after it first appears in the gossiper.

Fixes scylladb/scylladb#20600

Backport to all supported versions since the bug may cause bootstrapping failure.

(cherry picked from commit bddaf498df)

(cherry picked from commit 9e4cd32096)

Refs #20601

Closes scylladb/scylladb#20847

* github.com:scylladb/scylladb:
  test: extend existing test to check that a joining node can map addresses of all pre-existing nodes during join
  group0: make sure that address map has an entry for each new node in the raft configuration
2024-09-30 17:01:52 +02:00
Gleb Natapov
f9215b4d7e test: extend existing test to check that a joining node can map addresses of all pre-existing nodes during join
(cherry picked from commit 9e4cd32096)
2024-09-26 21:13:34 +00:00
Gleb Natapov
469ac9976a group0: make sure that address map has an entry for each new node in the raft configuration
An ID->IP mapping is added to the raft address map when the mapping first
appears in the gossiper, but it is added as an expiring entry. It becomes
non-expiring when a node is added to the raft configuration. But when a node
joins, those two events may be distant in time (since the node's request
may sit in the topology coordinator queue for a while) and the mapping may
have already expired from the map. This patch makes sure to transfer the
mapping from the gossiper for a node that is added to the raft
configuration, instead of assuming that the mapping is already there.

(cherry picked from commit bddaf498df)
2024-09-26 21:13:33 +00:00
Botond Dénes
d341f1ef1e Merge '[Backport 6.2] mark node as being replaced earlier' from ScyllaDB
Before 17f4a151ce the node was marked as
being replaced in the join_group0 state, before it actually joined group0,
so by the time it actually joined and started transferring the snapshot/log,
no traffic was sent to it. The commit changed this to mark the node as
being replaced after the snapshot/log is already transferred, so we can
get traffic to the node while it still has not caught up with the
leader, and this may cause problems since its state is not complete.
Mark the node as being replaced earlier, but still add the new node to
the topology later, as the commit above intended.

Fixes: https://github.com/scylladb/scylladb/issues/20629

Needs to be backported since this is a regression.

(cherry picked from commit 644e7a2012)

(cherry picked from commit c0939d86f9)

(cherry picked from commit 1b4c255ffd)

Refs #20743

Closes scylladb/scylladb#20829

* github.com:scylladb/scylladb:
  test: amend test_replace_reuse_ip test to check that there is no stale writes after snapshot transfer starts
  topology coordinator:: mark node as being replaced earlier
  topology coordinator: do metadata barrier before calling finish_accepting_node() during replace
2024-09-26 10:37:25 +03:00
Kamil Braun
07dfcd1f64 service: raft: fix rpc error message
What it called "leader" is actually the destination of the RPC.

Trivial fix, should be backported to all affected versions.

(cherry picked from commit 09c68c0731)

Closes scylladb/scylladb#20826
2024-09-26 10:33:50 +03:00
Anna Stuchlik
f8d63b5572 doc: add OS support for version 6.2
This commit adds the OS support for version 6.2.
In addition, it removes support for 6.0, as the policy is only to include
information for the supported versions, i.e., the two latest versions.

Fixes https://github.com/scylladb/scylladb/issues/20804

(cherry picked from commit 8145109120)

Closes scylladb/scylladb#20825
2024-09-26 10:29:08 +03:00
Anna Stuchlik
ca83da91d1 doc: add an intro to the Features page
This commit modifies the Features page in the following way:

- It adds a short introduction and descriptions to each listed feature.
- It hides the ToC (required to control and modify the information on the page,
  e.g., to add descriptions, have full control over what is displayed, etc.).
- It removes the info about Enterprise features (following the request not to include
  Enterprise info in the OSS docs).

Fixes https://github.com/scylladb/scylladb/issues/20617
Blocks https://github.com/scylladb/scylla-enterprise/pull/4711

(cherry picked from commit da8047a834)

Closes scylladb/scylladb#20811
2024-09-26 10:22:36 +03:00
Botond Dénes
f55081fb1a Merge '[Backport 6.2] Rename Alternator batch item count metrics' from ScyllaDB
This PR addresses multiple issues with alternator batch metrics:

1. Rename the metrics to scylla_alternator_batch_item_count with op=BatchGetItem/BatchWriteItem.
2. The batch size calculation was wrong and didn't count all items in the batch.
3. Add a test to validate that the metrics increase by the correct value (not just that they increase). This also required extending the tests to validate the ops of different metrics and an exact value change.

Needs backporting to allow the monitoring to use the correct metrics names.

Fixes #20571

(cherry picked from commit 515857a4a9)

(cherry picked from commit 905408f764)

(cherry picked from commit 4d57a43815)

(cherry picked from commit 8dec292698)

Refs #20646

Closes scylladb/scylladb#20758

* github.com:scylladb/scylladb:
  alternator:test_metrics test metrics for batch item count
  alternator:test_metrics Add validating the increased value
  alternator: Fix item counting in batch operations
  Alternator rename batch item count metrics
2024-09-26 10:22:00 +03:00
Anna Stuchlik
aa8cdec5bd doc: fix a broken link
This commit fixes a link to the Manager by adding a missing underscore
to the external link.

(cherry picked from commit aa0c95c95c)

Closes scylladb/scylladb#20710
2024-09-26 10:18:59 +03:00
Anna Stuchlik
75a2484dba doc: update the unified installer instructions
This commit updates the unified installer instructions to avoid specifying a given version.
At the moment, we're technically unable to use variables in URLs, so we need to update
the page every release.

Fixes https://github.com/scylladb/scylladb/issues/20677

(cherry picked from commit 400a14eefa)

Closes scylladb/scylladb#20708
2024-09-26 10:04:35 +03:00
Gleb Natapov
37387135b4 test: amend test_replace_reuse_ip test to check that there is no stale writes after snapshot transfer starts
(cherry picked from commit 1b4c255ffd)
2024-09-26 03:45:50 +00:00
Gleb Natapov
ac24ab5141 topology coordinator:: mark node as being replaced earlier
Before 17f4a151ce the node was marked as
being replaced in the join_group0 state, before it actually joined group0,
so by the time it actually joined and started transferring the snapshot/log,
no traffic was sent to it. The commit changed this to mark the node as
being replaced after the snapshot/log is already transferred, so we can
get traffic to the node while it still has not caught up with the
leader, and this may cause problems since its state is not complete.
Mark the node as being replaced earlier, but still add the new node to
the topology later, as the commit above intended.

(cherry picked from commit c0939d86f9)
2024-09-26 03:45:50 +00:00
Gleb Natapov
729dc03e0c topology coordinator: do metadata barrier before calling finish_accepting_node() during replace
During a replace with the same IP, a node may get queries that were intended
for the node it is replacing, since the new node declares itself UP
before it advertises that it is a replacement. But after the node
starts the replace procedure, the old node is marked as "being replaced"
and queries are no longer sent there. It is important to do this before the
new node starts to receive the raft snapshot, since snapshot application is
not atomic, and queries that run in parallel with it may see partial state
and fail in weird ways. Queries that are sent before that will fail
because the schema is empty, so they will not find any tables in the first
place. This is pre-existing and not addressed by this patch.

(cherry picked from commit 644e7a2012)
2024-09-26 03:45:50 +00:00
Kamil Braun
9d64ced982 test: fix topology_custom/test_raft_recovery_stuck flakiness
The test performs consecutive schema changes in RECOVERY mode. The
second change relies on the first. However the driver might route the
changes to different servers, and we don't have group 0 to guarantee
linearizability. We must rely on the first change's coordinator to push
the schema mutations to other servers before returning, but that only
happens when it sees the other servers as alive while doing the schema
change. That wasn't guaranteed in the test. Fix this.

Fixes scylladb/scylladb#20791

Should be backported to all branches containing this test to reduce
flakiness.

(cherry picked from commit f390d4020a)

Closes scylladb/scylladb#20807
2024-09-25 15:11:10 +02:00
Abhinav
ea6349a6f5 raft topology: add error for removal of non-normal nodes
Currently, we check whether a node being removed is normal
on the node initiating the removenode request. However, we don't have a
similar check on the topology coordinator. The node being removed could be
normal when we initiate the request, but it doesn't have to be normal when
the topology coordinator starts handling the request.
For example, the topology coordinator could have removed this node while
handling another removenode request that was added to the request queue earlier.

This commit fixes the issue by adding more checks in the enqueuing phase
and returning errors for duplicate node removal requests.

This PR fixes a bug. Hence we need to backport it.

Fixes: scylladb/scylladb#20271
(cherry picked from commit b25b8dccbd)

Closes scylladb/scylladb#20799
2024-09-25 11:34:20 +02:00
Benny Halevy
ed9122a84e time_window_compaction_strategy: get_reshaping_job: restrict sort of multi_window vector to its size
Currently the function calls boost::partial_sort with a middle
iterator that might be out of bounds and cause undefined behavior.

Check the vector size, and do a partial sort only if the vector is longer
than `max_sstables`; otherwise sort the whole vector.

Fixes scylladb/scylladb#20608

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>

Closes scylladb/scylladb#20609

(cherry picked from commit 39ce358d82)

Refs: scylladb/scylladb#20609
2024-09-23 16:02:40 +03:00
Amnon Heiman
c7d6b4a194 alternator:test_metrics test metrics for batch item count
This patch adds tests for the batch operations item count.

The tests validate that the metrics tracking the number of items
processed in a batch increase by the correct amount.

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
(cherry picked from commit 8dec292698)
2024-09-23 11:02:55 +00:00
Amnon Heiman
a35e138b22 alternator:test_metrics Add validating the increased value
The `check_increases_operation` helper now allows overriding the checked metric.

Additionally, a custom validation value can now be passed, which makes it
possible to validate the amount by which a value has changed, rather
than just validating that the value increased.

The default behavior of validating that values have increased remains
unchanged, ensuring backward compatibility.

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
(cherry picked from commit 4d57a43815)
2024-09-23 11:02:55 +00:00
Amnon Heiman
3db67faa8a alternator: Fix item counting in batch operations
This patch fixes the logic for counting items in batch operations.
Previously, the item count in requests was inaccurate: it counted the
number of tables in get_item and the request_items in write_items.

The new logic correctly counts each individual item in `BatchGetItem`
and `BatchWriteItem` requests.

Signed-off-by: Amnon Heiman <amnon@scylladb.com>
(cherry picked from commit 905408f764)
2024-09-23 11:02:55 +00:00
Amnon Heiman
6a12174e2d Alternator rename batch item count metrics
This patch renames metrics tracking the total number of items in a batch
to `scylla_alternator_batch_item_count`.  It uses the existing `op` label to
differentiate between `BatchGetItem` and `BatchWriteItem` operations.

Ensures better clarity and distinction for batch operations in monitoring.

This is an example of how it looks:
 # HELP scylla_alternator_batch_item_count The total number of items processed across all batches
 # TYPE scylla_alternator_batch_item_count counter
 scylla_alternator_batch_item_count{op="BatchGetItem",shard="0"} 4
 scylla_alternator_batch_item_count{op="BatchWriteItem",shard="0"} 4

(cherry picked from commit 515857a4a9)
2024-09-23 11:02:55 +00:00
Piotr Dulikowski
ca0096ccb8 Merge '[Backport 6.2] message/messaging_service: guard adding maintenance tenant under cluster feature' from Michał Jadwiszczak
In https://github.com/scylladb/scylladb/pull/18729, we introduced a new statement tenant, $maintenance, but the change wasn't protected by any cluster feature.
This wasn't a problem for OSS, since an unknown isolation cookie just uses the default scheduling group. However, in enterprise it leads to creating a service level on not-yet-upgraded nodes, which may end up in an error if a user creates the maximum number of service levels.

This patch adds a cluster feature to guard adding the new tenant. It's done in a way that handles two upgrade scenarios:

- version without the $maintenance tenant -> version with the $maintenance tenant guarded by a feature
- version with the $maintenance tenant but not guarded by a feature -> version with the $maintenance tenant guarded by a feature

The PR adds an `enabled` flag to statement tenants.
This way, when the tenant is disabled, it cannot be used to create a connection, but it can be used to accept an incoming connection.
The $maintenance tenant is added to the config as disabled, and it gets enabled once the corresponding feature is enabled.

Fixes https://github.com/scylladb/scylladb/issues/20070
Refs https://github.com/scylladb/scylla-enterprise/issues/4403

(cherry picked from commit d44844241d)

(cherry picked from commit 71a03ef6b0)

(cherry picked from commit b4b91ca364)

Refs https://github.com/scylladb/scylladb/pull/19802

Closes scylladb/scylladb#20690

* github.com:scylladb/scylladb:
  message/messaging_service: guard adding maintenance tenant under cluster feature
  message/messaging_service: add feature_service dependency
  message/messaging_service: add `enabled` flag to statement tenants
2024-09-23 09:48:12 +02:00
Jenkins Promoter
a71d4bc49c Update ScyllaDB version to: 6.2.0-rc1 2024-09-19 10:21:33 +03:00
Michał Jadwiszczak
749399e4b8 message/messaging_service: guard adding maintenance tenant under cluster feature
Set `enabled` flag for `$maintenance` tenant to false and
enable it when `MAINTENANCE_TENANT` feature is enabled.

(cherry-picked from b4b91ca364)
2024-09-18 19:10:24 +02:00
Michał Jadwiszczak
bdd97b2950 message/messaging_service: add feature_service dependency
(cherry-picked from 71a03ef6b0)
2024-09-18 19:09:46 +02:00
Michał Jadwiszczak
1a056f0cab message/messaging_service: add enabled flag to statement tenants
Adding a new tenant needs to be done under cluster feature protection.
However, that wasn't the case for adding the `$maintenance` statement tenant,
and to fix it we need to support an upgrade from a node which doesn't
know about the maintenance tenant at all, and from one which uses it without
any cluster feature protection.

This commit adds `enabled` flag to statement tenants.
This way, when the tenant is disabled, it cannot be used to create
a connection, but it can be used to accept an incoming connection.

(cherry-picked from d44844241d)
2024-09-18 19:09:06 +02:00
Tzach Livyatan
cf78a2caca Update client-node-encryption: OpenSSL is FIPS *enabled*
Closes scylladb/scylladb#19705

(cherry picked from commit cb864b11d8)
2024-09-18 11:58:46 +03:00
Anna Mikhlin
cbc53f0e81 Update ScyllaDB version to: 6.2.0-rc0 2024-09-17 13:40:50 +03:00
41 changed files with 738 additions and 228 deletions

View File

@@ -78,7 +78,7 @@ fi
 # Default scylla product/version tags
 PRODUCT=scylla
-VERSION=6.2.0-dev
+VERSION=6.2.0-rc2
 if test -f version
 then

View File

@@ -2195,7 +2195,6 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
 mutation_builders.reserve(request_items.MemberCount());
 uint batch_size = 0;
 for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) {
-batch_size++;
 schema_ptr schema = get_table_from_batch_request(_proxy, it);
 tracing::add_table_name(trace_state, schema->ks_name(), schema->cf_name());
 std::unordered_set<primary_key, primary_key_hash, primary_key_equal> used_keys(
@@ -2216,6 +2215,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
 co_return api_error::validation("Provided list of item keys contains duplicates");
 }
 used_keys.insert(std::move(mut_key));
+batch_size++;
 } else if (r_name == "DeleteRequest") {
 const rjson::value& key = (r->value)["Key"];
 mutation_builders.emplace_back(schema, put_or_delete_item(
@@ -2226,6 +2226,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
 co_return api_error::validation("Provided list of item keys contains duplicates");
 }
 used_keys.insert(std::move(mut_key));
+batch_size++;
 } else {
 co_return api_error::validation(fmt::format("Unknown BatchWriteItem request type: {}", r_name));
 }
@@ -3483,7 +3484,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
 }
 };
 std::vector<table_requests> requests;
+uint batch_size = 0;
 for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) {
 table_requests rs(get_table_from_batch_request(_proxy, it));
 tracing::add_table_name(trace_state, sstring(executor::KEYSPACE_NAME_PREFIX) + rs.schema->cf_name(), rs.schema->cf_name());
@@ -3497,6 +3498,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
 rs.add(key);
 check_key(key, rs.schema);
 }
+batch_size += rs.requests.size();
 requests.emplace_back(std::move(rs));
 }
@@ -3504,7 +3506,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
 co_await verify_permission(client_state, tr.schema, auth::permission::SELECT);
 }
-_stats.api_operations.batch_get_item_batch_total += requests.size();
+_stats.api_operations.batch_get_item_batch_total += batch_size;
 // If we got here, all "requests" are valid, so let's start the
 // requests for the different partitions all in parallel.
 std::vector<future<std::vector<rjson::value>>> response_futures;

View File

@@ -29,8 +29,6 @@ stats::stats() : api_operations{} {
 seastar::metrics::description("Latency summary of an operation via Alternator API"), [this]{return to_metrics_summary(api_operations.name.summary());})(op(CamelCaseName)).set_skip_when_empty(),
 OPERATION(batch_get_item, "BatchGetItem")
 OPERATION(batch_write_item, "BatchWriteItem")
-OPERATION(batch_get_item_batch_total, "BatchGetItemSize")
-OPERATION(batch_write_item_batch_total, "BatchWriteItemSize")
 OPERATION(create_backup, "CreateBackup")
 OPERATION(create_global_table, "CreateGlobalTable")
 OPERATION(create_table, "CreateTable")
@@ -98,6 +96,10 @@ stats::stats() : api_operations{} {
 seastar::metrics::description("number of rows read and matched during filtering operations")),
 seastar::metrics::make_total_operations("filtered_rows_dropped_total", [this] { return cql_stats.filtered_rows_read_total - cql_stats.filtered_rows_matched_total; },
 seastar::metrics::description("number of rows read and dropped during filtering operations")),
+seastar::metrics::make_counter("batch_item_count", seastar::metrics::description("The total number of items processed across all batches"),{op("BatchWriteItem")},
+api_operations.batch_write_item_batch_total).set_skip_when_empty(),
+seastar::metrics::make_counter("batch_item_count", seastar::metrics::description("The total number of items processed across all batches"),{op("BatchGetItem")},
+api_operations.batch_get_item_batch_total).set_skip_when_empty(),
 });
 }

View File

@@ -296,7 +296,8 @@ time_window_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
// When trimming, let's keep sstables with overlapping time window, so as to reduce write amplification.
// For example, if there are N sstables spanning window W, where N <= 32, then we can produce all data for W
// in a single compaction round, removing the need to later compact W to reduce its number of files.
-boost::partial_sort(multi_window, multi_window.begin() + max_sstables, [](const shared_sstable &a, const shared_sstable &b) {
+auto sort_size = std::min(max_sstables, multi_window.size());
+boost::partial_sort(multi_window, multi_window.begin() + sort_size, [](const shared_sstable &a, const shared_sstable &b) {
return a->get_stats_metadata().max_timestamp < b->get_stats_metadata().max_timestamp;
});
maybe_trim_job(multi_window, job_size, disjoint);

View File

@@ -1132,7 +1132,12 @@ public:
write(out, uint64_t(0));
}
-buf.remove_suffix(buf.size_bytes() - size);
+auto to_remove = buf.size_bytes() - size;
+// #20862 - we decrement the usage counter based on buf.size() below.
+// Since we are shrinking the buffer here, we must also decrement the
+// counter by the removed bytes now.
+buf.remove_suffix(to_remove);
+_segment_manager->totals.buffer_list_bytes -= to_remove;
// Build sector checksums.
auto id = net::hton(_desc.id);
@@ -3826,6 +3831,10 @@ uint64_t db::commitlog::get_total_size() const {
;
}
uint64_t db::commitlog::get_buffer_size() const {
return _segment_manager->totals.buffer_list_bytes;
}
uint64_t db::commitlog::get_completed_tasks() const {
return _segment_manager->totals.allocation_count;
}

View File

@@ -306,6 +306,7 @@ public:
future<> delete_segments(std::vector<sstring>) const;
uint64_t get_total_size() const;
uint64_t get_buffer_size() const;
uint64_t get_completed_tasks() const;
uint64_t get_flush_count() const;
uint64_t get_pending_tasks() const;

View File

@@ -35,8 +35,6 @@
#include <span>
#include <unordered_map>
class fragmented_temporary_buffer;
namespace utils {
class directories;
} // namespace utils

View File

@@ -1,8 +1,11 @@
Features
========================
This document highlights ScyllaDB's key features.
.. toctree::
:maxdepth: 1
:hidden:
Lightweight Transactions </features/lwt/>
Global Secondary Indexes </features/secondary-indexes/>
@@ -12,6 +15,23 @@ Features
Change Data Capture </features/cdc/index>
Workload Attributes </features/workload-attributes>
`ScyllaDB Enterprise <https://enterprise.docs.scylladb.com/stable/overview.html#enterprise-only-features>`_
provides additional features, including Encryption at Rest,
workload prioritization, auditing, and more.
.. panel-box::
:title: ScyllaDB Features
:id: "getting-started"
:class: my-panel
* Secondary Indexes and Materialized Views provide efficient search
  on non-partition keys.
* :doc:`Global Secondary Indexes </features/secondary-indexes/>`
* :doc:`Local Secondary Indexes </features/local-secondary-indexes/>`
* :doc:`Materialized Views </features/materialized-views/>`
* :doc:`Lightweight Transactions </features/lwt/>` provide conditional updates
through linearizability.
* :doc:`Counters </features/counters/>` are columns that only allow their values
to be incremented, decremented, read, or deleted.
* :doc:`Change Data Capture </features/cdc/index>` allows you to query the current
state and the history of all changes made to tables in the database.
* :doc:`Workload Attributes </features/workload-attributes>` assigned to your workloads
specify how ScyllaDB will handle requests depending on the workload.

View File

@@ -6,9 +6,9 @@ You can `build ScyllaDB from source <https://github.com/scylladb/scylladb#build-
+----------------------------+------+------+------+-------+-------+-------+
| ScyllaDB Version / Version |20.04 |22.04 |24.04 | 11 | 8 | 9 |
+============================+======+======+======+=======+=======+=======+
-| 6.1 | |v| | |v| | |v| | |v| | |v| | |v| |
+| 6.2 | |v| | |v| | |v| | |v| | |v| | |v| |
+----------------------------+------+------+------+-------+-------+-------+
-| 6.0 | |v| | |v| | |v| | |v| | |v| | |v| |
+| 6.1 | |v| | |v| | |v| | |v| | |v| | |v| |
+----------------------------+------+------+------+-------+-------+-------+
* The recommended OS for ScyllaDB Open Source is Ubuntu 22.04.

View File

@@ -1,8 +1,3 @@
-.. |SCYLLADB_VERSION| replace:: 5.2
-.. update the version folder URL below (variables won't work):
-   https://downloads.scylladb.com/downloads/scylla/relocatable/scylladb-5.2/
====================================================
Install ScyllaDB Without root Privileges
====================================================
@@ -24,14 +19,17 @@ Note that if you're on CentOS 7, only root offline installation is supported.
Download and Install
-----------------------
-#. Download the latest tar.gz file for ScyllaDB |SCYLLADB_VERSION| (x86 or ARM) from https://downloads.scylladb.com/downloads/scylla/relocatable/scylladb-5.2/.
+#. Download the latest tar.gz file for ScyllaDB version (x86 or ARM) from ``https://downloads.scylladb.com/downloads/scylla/relocatable/scylladb-<version>/``.
+   Example for version 6.1: https://downloads.scylladb.com/downloads/scylla/relocatable/scylladb-6.1/
#. Uncompress the downloaded package.
-The following example shows the package for ScyllaDB 5.2.4 (x86):
+The following example shows the package for ScyllaDB 6.1.1 (x86):
.. code:: console
-tar xvfz scylla-unified-5.2.4-0.20230623.cebbf6c5df2b.x86_64.tar.gz
+tar xvfz scylla-unified-6.1.1-0.20240814.8d90b817660a.x86_64.tar.gz
#. Install OpenJDK 8 or 11.

View File

@@ -41,7 +41,7 @@ With the recent addition of the `ScyllaDB Advisor <http://monitoring.docs.scylla
Install ScyllaDB Manager
------------------------
-Install and use `ScyllaDB Manager <https://manager.docs.scylladb.com>` together with the `ScyllaDB Monitoring Stack <http://monitoring.docs.scylladb.com/>`_.
+Install and use `ScyllaDB Manager <https://manager.docs.scylladb.com>`_ together with the `ScyllaDB Monitoring Stack <http://monitoring.docs.scylladb.com/>`_.
ScyllaDB Manager provides automated backups and repairs of your database.
ScyllaDB Manager can manage multiple ScyllaDB clusters and run cluster-wide tasks in a controlled and predictable way.
For example, with ScyllaDB Manager you can control the intensity of a repair, increasing it to speed up the process, or lower the intensity to ensure it minimizes impact on ongoing operations.

View File

@@ -3,7 +3,7 @@ Encryption: Data in Transit Client to Node
Follow the procedures below to enable a client to node encryption.
Once enabled, all communication between the client and the node is transmitted over TLS/SSL.
-The libraries used by ScyllaDB for OpenSSL are FIPS 140-2 certified.
+The libraries used by ScyllaDB for OpenSSL are FIPS 140-2 enabled.
Workflow
^^^^^^^^

View File

@@ -21,8 +21,8 @@ The following metrics are new in ScyllaDB |NEW_VERSION|:
* - Metric
- Description
-* -
-  -
+* - scylla_alternator_batch_item_count
+  - The total number of items processed across all batches

View File

@@ -143,6 +143,7 @@ public:
// whereas without it, it will fail the insert - i.e. for things like raft etc _all_ nodes should
// have it or none, otherwise we can get partial failures on writes.
gms::feature fragmented_commitlog_entries { *this, "FRAGMENTED_COMMITLOG_ENTRIES"sv };
gms::feature maintenance_tenant { *this, "MAINTENANCE_TENANT"sv };
// A feature just for use in tests. It must not be advertised unless
// the "features_enable_test_feature" injection is enabled.

View File

@@ -1389,7 +1389,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
scfg.statement_tenants = {
{dbcfg.statement_scheduling_group, "$user"},
{default_scheduling_group(), "$system"},
-{dbcfg.streaming_scheduling_group, "$maintenance"}
+{dbcfg.streaming_scheduling_group, "$maintenance", false}
};
scfg.streaming = dbcfg.streaming_scheduling_group;
scfg.gossip = dbcfg.gossip_scheduling_group;
@@ -1404,7 +1404,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
}
// Delay listening messaging_service until gossip message handlers are registered
-messaging.start(mscfg, scfg, creds).get();
+messaging.start(mscfg, scfg, creds, std::ref(feature_service)).get();
auto stop_ms = defer_verbose_shutdown("messaging service", [&messaging] {
messaging.invoke_on_all(&netw::messaging_service::stop).get();
});

View File

@@ -119,6 +119,7 @@
#include "idl/mapreduce_request.dist.impl.hh"
#include "idl/storage_service.dist.impl.hh"
#include "idl/join_node.dist.impl.hh"
#include "gms/feature_service.hh"
namespace netw {
@@ -232,9 +233,9 @@ future<> messaging_service::unregister_handler(messaging_verb verb) {
return _rpc->unregister_handler(verb);
}
-messaging_service::messaging_service(locator::host_id id, gms::inet_address ip, uint16_t port)
+messaging_service::messaging_service(locator::host_id id, gms::inet_address ip, uint16_t port, gms::feature_service& feature_service)
: messaging_service(config{std::move(id), ip, ip, port},
-scheduling_config{{{{}, "$default"}}, {}, {}}, nullptr)
+scheduling_config{{{{}, "$default"}}, {}, {}}, nullptr, feature_service)
{}
static
@@ -419,13 +420,14 @@ void messaging_service::do_start_listen() {
}
}
-messaging_service::messaging_service(config cfg, scheduling_config scfg, std::shared_ptr<seastar::tls::credentials_builder> credentials)
+messaging_service::messaging_service(config cfg, scheduling_config scfg, std::shared_ptr<seastar::tls::credentials_builder> credentials, gms::feature_service& feature_service)
: _cfg(std::move(cfg))
, _rpc(new rpc_protocol_wrapper(serializer { }))
, _credentials_builder(credentials ? std::make_unique<seastar::tls::credentials_builder>(*credentials) : nullptr)
, _clients(PER_SHARD_CONNECTION_COUNT + scfg.statement_tenants.size() * PER_TENANT_CONNECTION_COUNT)
, _scheduling_config(scfg)
, _scheduling_info_for_connection_index(initial_scheduling_info())
, _feature_service(feature_service)
{
_rpc->set_logger(&rpc_logger);
@@ -434,7 +436,8 @@ messaging_service::messaging_service(config cfg, scheduling_config scfg, std::sh
// which in turn relies on _connection_index_for_tenant to be initialized.
_connection_index_for_tenant.reserve(_scheduling_config.statement_tenants.size());
for (unsigned i = 0; i < _scheduling_config.statement_tenants.size(); ++i) {
-_connection_index_for_tenant.push_back({_scheduling_config.statement_tenants[i].sched_group, i});
+auto& tenant_cfg = _scheduling_config.statement_tenants[i];
+_connection_index_for_tenant.push_back({tenant_cfg.sched_group, i, tenant_cfg.enabled});
}
register_handler(this, messaging_verb::CLIENT_ID, [this] (rpc::client_info& ci, gms::inet_address broadcast_address, uint32_t src_cpu_id, rpc::optional<uint64_t> max_result_size, rpc::optional<utils::UUID> host_id) {
@@ -457,6 +460,7 @@ messaging_service::messaging_service(config cfg, scheduling_config scfg, std::sh
});
init_local_preferred_ip_cache(_cfg.preferred_ips);
init_feature_listeners();
}
msg_addr messaging_service::get_source(const rpc::client_info& cinfo) {
@@ -679,16 +683,22 @@ messaging_service::get_rpc_client_idx(messaging_verb verb) const {
return idx;
}
// A statement or statement-ack verb
const auto curr_sched_group = current_scheduling_group();
for (unsigned i = 0; i < _connection_index_for_tenant.size(); ++i) {
if (_connection_index_for_tenant[i].sched_group == curr_sched_group) {
-// i == 0: the default tenant maps to the default client indexes belonging to the interval
-// [PER_SHARD_CONNECTION_COUNT, PER_SHARD_CONNECTION_COUNT + PER_TENANT_CONNECTION_COUNT).
-idx += i * PER_TENANT_CONNECTION_COUNT;
-break;
+if (_connection_index_for_tenant[i].enabled) {
+// i == 0: the default tenant maps to the default client indexes belonging to the interval
+// [PER_SHARD_CONNECTION_COUNT, PER_SHARD_CONNECTION_COUNT + PER_TENANT_CONNECTION_COUNT).
+idx += i * PER_TENANT_CONNECTION_COUNT;
+break;
+} else {
+// If the tenant is disabled, immediately return the current index to
+// use the $system tenant.
+return idx;
+}
}
}
return idx;
}
@@ -793,6 +803,22 @@ void messaging_service::cache_preferred_ip(gms::inet_address ep, gms::inet_addre
remove_rpc_client(msg_addr(ep));
}
void messaging_service::init_feature_listeners() {
_maintenance_tenant_enabled_listener = _feature_service.maintenance_tenant.when_enabled([this] {
enable_scheduling_tenant("$maintenance");
});
}
void messaging_service::enable_scheduling_tenant(std::string_view name) {
for (size_t i = 0; i < _scheduling_config.statement_tenants.size(); ++i) {
if (_scheduling_config.statement_tenants[i].name == name) {
_scheduling_config.statement_tenants[i].enabled = true;
_connection_index_for_tenant[i].enabled = true;
return;
}
}
}
gms::inet_address messaging_service::get_public_endpoint_for(const gms::inet_address& ip) const {
auto i = _preferred_to_endpoint.find(ip);
return i != _preferred_to_endpoint.end() ? i->second : ip;

View File

@@ -45,6 +45,7 @@ namespace gms {
class gossip_digest_ack2;
class gossip_get_endpoint_states_request;
class gossip_get_endpoint_states_response;
class feature_service;
}
namespace db {
@@ -299,6 +300,7 @@ public:
struct tenant {
scheduling_group sched_group;
sstring name;
bool enabled = true;
};
// Must have at least one element. No two tenants should have the same
// scheduling group. [0] is the default tenant, that all unknown
@@ -319,6 +321,7 @@ private:
struct tenant_connection_index {
scheduling_group sched_group;
unsigned cliend_idx;
bool enabled;
};
private:
config _cfg;
@@ -337,6 +340,7 @@ private:
scheduling_config _scheduling_config;
std::vector<scheduling_info_for_connection_index> _scheduling_info_for_connection_index;
std::vector<tenant_connection_index> _connection_index_for_tenant;
gms::feature_service& _feature_service;
struct connection_ref;
std::unordered_multimap<locator::host_id, connection_ref> _host_connections;
@@ -351,8 +355,8 @@ private:
public:
using clock_type = lowres_clock;
-messaging_service(locator::host_id id, gms::inet_address ip, uint16_t port);
-messaging_service(config cfg, scheduling_config scfg, std::shared_ptr<seastar::tls::credentials_builder>);
+messaging_service(locator::host_id id, gms::inet_address ip, uint16_t port, gms::feature_service& feature_service);
+messaging_service(config cfg, scheduling_config scfg, std::shared_ptr<seastar::tls::credentials_builder>, gms::feature_service& feature_service);
~messaging_service();
future<> start();
@@ -544,6 +548,12 @@ public:
std::vector<messaging_service::scheduling_info_for_connection_index> initial_scheduling_info() const;
unsigned get_rpc_client_idx(messaging_verb verb) const;
static constexpr std::array<std::string_view, 3> _connection_types_prefix = {"statement:", "statement-ack:", "forward:"}; // "forward" is the old name for "mapreduce"
void init_feature_listeners();
private:
std::any _maintenance_tenant_enabled_listener;
void enable_scheduling_tenant(std::string_view name);
};
} // namespace netw

View File

@@ -186,6 +186,8 @@ std::set<gms::inet_address> task_manager_module::get_nodes() const noexcept {
_ss._topology_state_machine._topology.transition_nodes
) | boost::adaptors::transformed([&ss = _ss] (auto& node) {
return ss.host2ip(locator::host_id{node.first.uuid()});
}) | boost::adaptors::filtered([&ss = _ss] (auto& ip) {
return ss._gossiper.is_alive(ip);
})
);
}

View File

@@ -102,16 +102,27 @@ static const auto raft_manual_recovery_doc = "https://docs.scylladb.com/master/a
class group0_rpc: public service::raft_rpc {
direct_failure_detector::failure_detector& _direct_fd;
gms::gossiper& _gossiper;
public:
explicit group0_rpc(direct_failure_detector::failure_detector& direct_fd,
raft_state_machine& sm, netw::messaging_service& ms,
-raft_address_map& address_map, shared_ptr<raft::failure_detector> raft_fd, raft::group_id gid, raft::server_id srv_id)
+raft_address_map& address_map, shared_ptr<raft::failure_detector> raft_fd, raft::group_id gid, raft::server_id srv_id, gms::gossiper& gossiper)
: raft_rpc(sm, ms, address_map, std::move(raft_fd), gid, srv_id)
-, _direct_fd(direct_fd)
+, _direct_fd(direct_fd), _gossiper(gossiper)
{}
virtual void on_configuration_change(raft::server_address_set add, raft::server_address_set del) override {
for (const auto& addr: add) {
auto ip_for_id = _address_map.find(addr.id);
if (!ip_for_id) {
// Make sure that the addresses of new nodes in the configuration are in the address map
auto ips = _gossiper.get_nodes_with_host_id(locator::host_id(addr.id.uuid()));
for (auto ip : ips) {
if (_gossiper.is_normal(ip)) {
_address_map.add_or_update_entry(addr.id, ip);
}
}
}
// Entries explicitly managed via `rpc::on_configuration_change()` should NOT be
// expirable.
_address_map.set_nonexpiring(addr.id);
@@ -204,7 +215,7 @@ const raft::server_id& raft_group0::load_my_id() {
raft_server_for_group raft_group0::create_server_for_group0(raft::group_id gid, raft::server_id my_id, service::storage_service& ss, cql3::query_processor& qp,
service::migration_manager& mm, bool topology_change_enabled) {
auto state_machine = std::make_unique<group0_state_machine>(_client, mm, qp.proxy(), ss, _raft_gr.address_map(), _feat, topology_change_enabled);
-auto rpc = std::make_unique<group0_rpc>(_raft_gr.direct_fd(), *state_machine, _ms.local(), _raft_gr.address_map(), _raft_gr.failure_detector(), gid, my_id);
+auto rpc = std::make_unique<group0_rpc>(_raft_gr.direct_fd(), *state_machine, _ms.local(), _raft_gr.address_map(), _raft_gr.failure_detector(), gid, my_id, _gossiper);
// Keep a reference to a specific RPC class.
auto& rpc_ref = *rpc;
auto storage = std::make_unique<raft_sys_table_storage>(qp, gid, my_id);

View File

@@ -77,7 +77,7 @@ raft_rpc::two_way_rpc(sloc loc, raft::server_id id,
}
return verb(&_messaging, netw::msg_addr(*ip_addr), db::no_timeout, _group_id, _my_id, id, std::forward<Args>(args)...)
.handle_exception_type([loc= std::move(loc), id] (const seastar::rpc::closed_error& e) {;
-const auto msg = fmt::format("Failed to execute {} on leader {}: {}", loc.function_name(), id, e);
+const auto msg = fmt::format("Failed to execute {}, destination {}: {}", loc.function_name(), id, e);
rlogger.trace("{}", msg);
return make_exception_future<Ret>(raft::transport_error(msg));
});

View File

@@ -572,11 +572,11 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
on_fatal_internal_error(rtlogger, ::format("Cannot map id of a node being replaced {} to its ip", replaced_id));
}
SCYLLA_ASSERT(existing_ip);
-const auto replaced_host_id = locator::host_id(replaced_id.uuid());
-tmptr->update_topology(replaced_host_id, std::nullopt, locator::node::state::being_replaced);
-tmptr->add_replacing_endpoint(replaced_host_id, host_id);
if (rs.ring.has_value()) {
+const auto replaced_host_id = locator::host_id(replaced_id.uuid());
+tmptr->update_topology(replaced_host_id, std::nullopt, locator::node::state::being_replaced);
update_topology(host_id, ip, rs);
+tmptr->add_replacing_endpoint(replaced_host_id, host_id);
co_await update_topology_change_info(tmptr, ::format("replacing {}/{} by {}/{}", replaced_id, *existing_ip, id, ip));
} else {
// After adding replacing endpoint above the node will no longer be reported for reads and writes,
@@ -4062,6 +4062,11 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::list<lo
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("removenode: request remove for {}", id));
request_id = guard.new_group0_state_id();
if (auto itr = _topology_state_machine._topology.requests.find(id);
itr != _topology_state_machine._topology.requests.end() && itr->second == topology_request::remove) {
throw std::runtime_error("Removenode failed. Concurrent request for removal already in progress");
}
try {
// Make non voter during request submission for better HA
co_await _group0->make_nonvoters(ignored_ids, _group0_as, raft_timeout{});
@@ -6673,6 +6678,10 @@ future<join_node_response_result> storage_service::join_node_response_handler(jo
co_return join_node_response_result{};
}
if (utils::get_local_injector().enter("join_node_response_drop_expiring")) {
_group0->modifiable_address_map().force_drop_expiring_entries();
}
try {
co_return co_await std::visit(overloaded_functor {
[&] (const join_node_response_params::accepted& acc) -> future<join_node_response_result> {

View File

@@ -1577,7 +1577,30 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
rtlogger.info("entered `{}` transition state", *tstate);
switch (*tstate) {
case topology::transition_state::join_group0: {
-auto [node, accepted] = co_await finish_accepting_node(get_node_to_work_on(std::move(guard)));
auto node = get_node_to_work_on(std::move(guard));
if (node.rs->state == node_state::replacing) {
// Make sure all nodes are no longer trying to write to a node being replaced. This is important
// if the new node has the same IP, so that old writes will not go to the new node by mistake after this point.
// It is important to do this before the call to finish_accepting_node() below, since after that call the new node becomes
// a full member of the cluster and starts loading an initial snapshot. Since snapshot loading is not atomic, any queries
// run in parallel may see a partial state.
try {
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes(node)), node.id);
} catch (term_changed_error&) {
throw;
} catch (group0_concurrent_modification&) {
throw;
} catch (...) {
rtlogger.error("transition_state::join_group0, "
"global_token_metadata_barrier failed, error {}",
std::current_exception());
_rollback = fmt::format("global_token_metadata_barrier failed in join_group0 state {}", std::current_exception());
break;
}
}
bool accepted;
std::tie(node, accepted) = co_await finish_accepting_node(std::move(node));
// If responding to the joining node failed, move the node to the left state and
// stop the topology transition.
@@ -1649,22 +1672,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
break;
case node_state::replacing: {
SCYLLA_ASSERT(!node.rs->ring);
-// Make sure all nodes are no longer trying to write to a node being replaced. This is important if the new node have the same IP, so that old write will not
-// go to the new node by mistake
-try {
-node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes(node)), node.id);
-} catch (term_changed_error&) {
-throw;
-} catch (group0_concurrent_modification&) {
-throw;
-} catch (...) {
-rtlogger.error("transition_state::join_group0, "
-"global_token_metadata_barrier failed, error {}",
-std::current_exception());
-_rollback = fmt::format("global_token_metadata_barrier failed in join_group0 state {}", std::current_exception());
-break;
-}
auto replaced_id = std::get<replace_param>(node.req_param.value()).replaced_id;
auto it = _topo_sm._topology.normal_nodes.find(replaced_id);
SCYLLA_ASSERT(it != _topo_sm._topology.normal_nodes.end());

View File

@@ -23,8 +23,8 @@
#include <variant>
-template<typename T>
-static inline T consume_be(temporary_buffer<char>& p) {
+template<typename T, ContiguousSharedBuffer Buffer>
+static inline T consume_be(Buffer& p) {
T i = read_be<T>(p.get());
p.trim_front(sizeof(T));
return i;
@@ -60,7 +60,9 @@ enum class read_status { ready, waiting };
// }
// return pc._u32;
//
-class primitive_consumer {
+template<ContiguousSharedBuffer Buffer>
+class primitive_consumer_impl {
+using FragmentedBuffer = basic_fragmented_buffer<Buffer>;
private:
// state machine progress:
enum class prestate {
@@ -103,20 +105,26 @@ private:
// state for READING_BYTES prestate
size_t _read_bytes_len = 0;
-utils::small_vector<temporary_buffer<char>, 1> _read_bytes;
+temporary_buffer<char> _read_bytes_buf; // for contiguous reading.
+utils::small_vector<Buffer, 1> _read_bytes;
temporary_buffer<char>* _read_bytes_where_contiguous; // which buffer to set, _key, _val, _cell_path or _pk?
-fragmented_temporary_buffer* _read_bytes_where;
+FragmentedBuffer* _read_bytes_where;
// Alloc-free
-inline read_status read_partial_int(temporary_buffer<char>& data, prestate next_state) noexcept {
+inline read_status read_partial_int(Buffer& data, prestate next_state) noexcept {
std::copy(data.begin(), data.end(), _read_int.bytes);
_pos = data.size();
data.trim(0);
_prestate = next_state;
return read_status::waiting;
}
inline read_status read_partial_int(prestate next_state) noexcept {
_pos = 0;
_prestate = next_state;
return read_status::waiting;
}
template <typename VintType, prestate ReadingVint, prestate ReadingVintWithLen>
-inline read_status read_vint(temporary_buffer<char>& data, typename VintType::value_type& dest) {
+inline read_status read_vint(Buffer& data, typename VintType::value_type& dest) {
if (data.empty()) {
_prestate = ReadingVint;
return read_status::waiting;
@@ -128,9 +136,8 @@ private:
data.trim_front(len);
return read_status::ready;
} else {
-_read_bytes.clear();
-_read_bytes.push_back(make_new_tracked_temporary_buffer(len, _permit));
-std::copy(data.begin(), data.end(), _read_bytes.front().get_write());
+_read_bytes_buf = make_new_tracked_temporary_buffer(len, _permit);
+std::copy(data.begin(), data.end(), _read_bytes_buf.get_write());
_read_bytes_len = len;
_pos = data.size();
data.trim(0);
@@ -140,23 +147,23 @@ private:
}
}
template <typename VintType>
-inline read_status read_vint_with_len(temporary_buffer<char>& data, typename VintType::value_type& dest) {
+inline read_status read_vint_with_len(Buffer& data, typename VintType::value_type& dest) {
const auto n = std::min(_read_bytes_len - _pos, data.size());
-std::copy_n(data.begin(), n, _read_bytes.front().get_write() + _pos);
+std::copy_n(data.begin(), n, _read_bytes_buf.get_write() + _pos);
data.trim_front(n);
_pos += n;
if (_pos == _read_bytes_len) {
dest = VintType::deserialize(
-bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.front().get_write()), _read_bytes_len));
+bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes_buf.get_write()), _read_bytes_len));
_prestate = prestate::NONE;
return read_status::ready;
}
return read_status::waiting;
};
public:
-primitive_consumer(reader_permit permit) : _permit(std::move(permit)) {}
+primitive_consumer_impl(reader_permit permit) : _permit(std::move(permit)) {}
-inline read_status read_8(temporary_buffer<char>& data) {
+inline read_status read_8(Buffer& data) {
if (data.size() >= sizeof(uint8_t)) {
_u8 = consume_be<uint8_t>(data);
return read_status::ready;
@@ -170,7 +177,7 @@ public:
// (this is the common case), do this immediately. Otherwise, remember
// what we have in the buffer, and remember to continue later by using
// a "prestate":
-inline read_status read_16(temporary_buffer<char>& data) {
+inline read_status read_16(Buffer& data) {
if (data.size() >= sizeof(uint16_t)) {
_u16 = consume_be<uint16_t>(data);
return read_status::ready;
@@ -179,7 +186,7 @@ public:
}
}
// Alloc-free
-inline read_status read_32(temporary_buffer<char>& data) noexcept {
+inline read_status read_32(Buffer& data) noexcept {
if (data.size() >= sizeof(uint32_t)) {
_u32 = consume_be<uint32_t>(data);
return read_status::ready;
@@ -187,7 +194,10 @@ public:
return read_partial_int(data, prestate::READING_U32);
}
}
-inline read_status read_64(temporary_buffer<char>& data) {
+inline read_status read_32() noexcept {
+return read_partial_int(prestate::READING_U32);
+}
+inline read_status read_64(Buffer& data) {
if (data.size() >= sizeof(uint64_t)) {
_u64 = consume_be<uint64_t>(data);
return read_status::ready;
@@ -195,16 +205,24 @@ public:
return read_partial_int(data, prestate::READING_U64);
}
}
-inline read_status read_bytes_contiguous(temporary_buffer<char>& data, uint32_t len, temporary_buffer<char>& where) {
+temporary_buffer<char> share(Buffer& data, uint32_t offset, uint32_t len) {
+if constexpr(std::is_same_v<Buffer, temporary_buffer<char>>) {
+return data.share(offset, len);
+} else {
+auto ret = make_new_tracked_temporary_buffer(len, _permit);
+std::copy(data.begin() + offset, data.begin() + offset + len, ret.get_write());
+return ret;
+}
+}
+inline read_status read_bytes_contiguous(Buffer& data, uint32_t len, temporary_buffer<char>& where) {
if (data.size() >= len) {
-where = data.share(0, len);
+where = share(data, 0, len);
data.trim_front(len);
return read_status::ready;
} else {
// copy what we have so far, read the rest later
-_read_bytes.clear();
-_read_bytes.push_back(make_new_tracked_temporary_buffer(len, _permit));
-std::copy(data.begin(), data.end(),_read_bytes.front().get_write());
+_read_bytes_buf = make_new_tracked_temporary_buffer(len, _permit);
+std::copy(data.begin(), data.end(), _read_bytes_buf.get_write());
_read_bytes_len = len;
_read_bytes_where_contiguous = &where;
_pos = data.size();
@@ -213,12 +231,12 @@ public:
return read_status::waiting;
}
}
-inline read_status read_bytes(temporary_buffer<char>& data, uint32_t len, fragmented_temporary_buffer& where) {
+inline read_status read_bytes(Buffer& data, uint32_t len, FragmentedBuffer& where) {
if (data.size() >= len) {
auto fragments = std::move(where).release();
fragments.clear();
fragments.push_back(data.share(0, len));
-where = fragmented_temporary_buffer(std::move(fragments), len);
+where = FragmentedBuffer(std::move(fragments), len);
data.trim_front(len);
return read_status::ready;
} else {
@@ -233,7 +251,7 @@ public:
return read_status::waiting;
}
}
-inline read_status read_short_length_bytes(temporary_buffer<char>& data, temporary_buffer<char>& where) {
+inline read_status read_short_length_bytes(Buffer& data, temporary_buffer<char>& where) {
if (data.size() >= sizeof(uint16_t)) {
_u16 = consume_be<uint16_t>(data);
} else {
@@ -242,19 +260,19 @@ public:
}
return read_bytes_contiguous(data, uint32_t{_u16}, where);
}
-inline read_status read_unsigned_vint(temporary_buffer<char>& data) {
+inline read_status read_unsigned_vint(Buffer& data) {
return read_vint<
unsigned_vint,
prestate::READING_UNSIGNED_VINT,
prestate::READING_UNSIGNED_VINT_WITH_LEN>(data, _u64);
}
-inline read_status read_signed_vint(temporary_buffer<char>& data) {
+inline read_status read_signed_vint(Buffer& data) {
return read_vint<
signed_vint,
prestate::READING_SIGNED_VINT,
prestate::READING_SIGNED_VINT_WITH_LEN>(data, _i64);
}
-inline read_status read_unsigned_vint_length_bytes_contiguous(temporary_buffer<char>& data, temporary_buffer<char>& where) {
+inline read_status read_unsigned_vint_length_bytes_contiguous(Buffer& data, temporary_buffer<char>& where) {
if (data.empty()) {
_prestate = prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_CONTIGUOUS;
_read_bytes_where_contiguous = &where;
@@ -267,9 +285,8 @@ public:
data.trim_front(len);
return read_bytes_contiguous(data, static_cast<uint32_t>(_u64), where);
} else {
-_read_bytes.clear();
-_read_bytes.push_back(make_new_tracked_temporary_buffer(len, _permit));
-std::copy(data.begin(), data.end(),_read_bytes.front().get_write());
+_read_bytes_buf = make_new_tracked_temporary_buffer(len, _permit);
+std::copy(data.begin(), data.end(), _read_bytes_buf.get_write());
_read_bytes_len = len;
_pos = data.size();
data.trim(0);
@@ -279,7 +296,7 @@ public:
}
}
}
-inline read_status read_unsigned_vint_length_bytes(temporary_buffer<char>& data, fragmented_temporary_buffer& where) {
+inline read_status read_unsigned_vint_length_bytes(Buffer& data, FragmentedBuffer& where) {
if (data.empty()) {
_prestate = prestate::READING_UNSIGNED_VINT_LENGTH_BYTES;
_read_bytes_where = &where;
@@ -292,9 +309,8 @@ public:
data.trim_front(len);
return read_bytes(data, static_cast<uint32_t>(_u64), where);
} else {
-_read_bytes.clear();
-_read_bytes.push_back(make_new_tracked_temporary_buffer(len, _permit));
-std::copy(data.begin(), data.end(),_read_bytes.front().get_write());
+_read_bytes_buf = make_new_tracked_temporary_buffer(len, _permit);
+std::copy(data.begin(), data.end(), _read_bytes_buf.get_write());
_read_bytes_len = len;
_pos = data.size();
data.trim(0);
@@ -307,7 +323,7 @@ public:
private:
// Reads bytes belonging to an integer of size len. Returns true
// if a full integer is now available.
bool process_int(Buffer& data, unsigned len) {
SCYLLA_ASSERT(_pos < len);
auto n = std::min((size_t)(len - _pos), data.size());
std::copy(data.begin(), data.begin() + n, _read_int.bytes + _pos);
@@ -316,9 +332,18 @@ private:
return _pos == len;
}
public:
read_status consume_u32(Buffer& data) {
if (process_int(data, sizeof(uint32_t))) {
_u32 = net::ntoh(_read_int.uint32);
_prestate = prestate::NONE;
return read_status::ready;
}
return read_status::waiting;
}
// Feeds data into the state machine.
// After the call, when data is not empty then active() can be assumed to be false.
read_status consume(Buffer& data) {
if (__builtin_expect(_prestate == prestate::NONE, true)) {
return read_status::ready;
}
@@ -360,12 +385,12 @@ public:
return read_vint_with_len<signed_vint>(data, _i64);
case prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN_CONTIGUOUS: {
const auto n = std::min(_read_bytes_len - _pos, data.size());
std::copy_n(data.begin(), n, _read_bytes_buf.get_write() + _pos);
data.trim_front(n);
_pos += n;
if (_pos == _read_bytes_len) {
_u64 = unsigned_vint::deserialize(
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes_buf.get_write()), _read_bytes_len));
if (read_bytes_contiguous(data, _u64, *_read_bytes_where_contiguous) == read_status::ready) {
_prestate = prestate::NONE;
return read_status::ready;
@@ -375,12 +400,12 @@ public:
}
case prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN: {
const auto n = std::min(_read_bytes_len - _pos, data.size());
std::copy_n(data.begin(), n, _read_bytes_buf.get_write() + _pos);
data.trim_front(n);
_pos += n;
if (_pos == _read_bytes_len) {
_u64 = unsigned_vint::deserialize(
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes_buf.get_write()), _read_bytes_len));
if (read_bytes(data, _u64, *_read_bytes_where) == read_status::ready) {
_prestate = prestate::NONE;
return read_status::ready;
@@ -390,11 +415,11 @@ public:
}
case prestate::READING_BYTES_CONTIGUOUS: {
auto n = std::min(_read_bytes_len - _pos, data.size());
std::copy(data.begin(), data.begin() + n, _read_bytes_buf.get_write() + _pos);
data.trim_front(n);
_pos += n;
if (_pos == _read_bytes_len) {
*_read_bytes_where_contiguous = std::move(_read_bytes_buf);
_prestate = prestate::NONE;
return read_status::ready;
}
@@ -406,8 +431,8 @@ public:
data.trim_front(n);
_pos += n;
if (_pos == _read_bytes_len) {
std::vector<Buffer> fragments(std::make_move_iterator(_read_bytes.begin()), std::make_move_iterator(_read_bytes.end()));
*_read_bytes_where = FragmentedBuffer(std::move(fragments), _read_bytes_len);
_prestate = prestate::NONE;
return read_status::ready;
}
@@ -435,12 +460,7 @@ public:
}
break;
case prestate::READING_U32:
return consume_u32(data);
case prestate::READING_U64:
if (process_int(data, sizeof(uint64_t))) {
_u64 = net::ntoh(_read_int.uint64);
@@ -461,6 +481,8 @@ public:
}
};
using primitive_consumer = primitive_consumer_impl<temporary_buffer<char>>;
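The prestate machinery above implements resumable parsing: `consume()` returns `waiting` when a primitive value is split across buffer boundaries and picks up exactly where it left off when the next buffer arrives. A minimal standalone sketch of that protocol, with hypothetical names (not the ScyllaDB types, which work on `temporary_buffer<char>` or `page_view`):

```cpp
#include <cassert>
#include <cstdint>
#include <string_view>

// Hypothetical stand-in for data_consumer::read_status.
enum class read_status { ready, waiting };

// Accumulates a big-endian uint32 that may arrive split across buffers,
// mirroring the consume_u32()/process_int() protocol above.
struct u32_consumer {
    uint32_t value = 0;
    unsigned pos = 0;
    read_status consume(std::string_view& data) {
        while (pos < 4 && !data.empty()) {
            value = (value << 8) | static_cast<uint8_t>(data.front());
            data.remove_prefix(1);  // trim the consumed byte, like trim_front()
            ++pos;
        }
        return pos == 4 ? read_status::ready : read_status::waiting;
    }
};

// Drives the consumer across a buffer boundary.
inline uint32_t parse_split(std::string_view a, std::string_view b) {
    u32_consumer c;
    if (c.consume(a) == read_status::waiting) {
        c.consume(b);  // resumes exactly where the previous buffer ended
    }
    return c.value;
}
```

The real consumer additionally records which prestate it was in, so a single `consume()` entry point can resume any of the vint/bytes/integer readers.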
template <typename StateProcessor>
class continuous_data_consumer : protected primitive_consumer {
using proceed = data_consumer::proceed;


@@ -1110,7 +1110,7 @@ public:
_consumer.consume_row_end();
return;
}
if (_state != state::ROW_START || data_consumer::primitive_consumer::active()) {
throw malformed_sstable_exception("end of input, but not end of row");
}
}


@@ -145,6 +145,24 @@ private:
//
using block_set_type = std::set<promoted_index_block, block_comparator>;
block_set_type _blocks;
private:
using Buffer = cached_file::page_view;
struct u32_parser {
data_consumer::primitive_consumer_impl<Buffer>& parser;
void reset() {
parser.read_32();
}
data_consumer::read_status consume(Buffer& buf) {
return parser.consume_u32(buf);
}
uint32_t value() const {
return parser._u32;
}
};
public:
const schema& _s;
uint64_t _promoted_index_start;
@@ -152,26 +170,50 @@ public:
metrics& _metrics;
const pi_index_type _blocks_count;
cached_file& _cached_file;
data_consumer::primitive_consumer_impl<Buffer> _primitive_parser;
u32_parser _u32_parser;
clustering_parser<Buffer> _clustering_parser;
promoted_index_block_parser<Buffer> _block_parser;
reader_permit _permit;
cached_file::stream _stream;
logalloc::allocating_section _as;
private:
// Feeds the stream into the consumer until the consumer is satisfied.
// Does not give unconsumed data back to the stream.
template <typename Consumer>
future<> read(cached_file::offset_type pos, tracing::trace_state_ptr trace_state, Consumer& c) {
struct retry_exception : std::exception {};
_stream = _cached_file.read(pos, _permit, trace_state);
c.reset();
return repeat([this, pos, trace_state, &c] {
return _stream.next_page_view().then([this, &c] (cached_file::page_view&& page) {
if (!page) {
on_internal_error(sstlog, "End of stream while parsing");
}
bool retry = false;
return _as(_cached_file.region(), [&] {
if (retry) {
throw retry_exception();
}
retry = true;
auto status = c.consume(page);
utils::get_local_injector().inject("cached_promoted_index_parsing_invalidate_buf_across_page", [&page] {
page.release_and_scramble();
});
utils::get_local_injector().inject("cached_promoted_index_bad_alloc_parsing_across_page", [this] {
// Prevent reserve explosion in testing.
_as.set_lsa_reserve(1);
_as.set_std_reserve(1);
throw std::bad_alloc();
});
return stop_iteration(status == data_consumer::read_status::ready);
});
}).handle_exception_type([this, pos, trace_state, &c] (const retry_exception& e) {
_stream = _cached_file.read(pos, _permit, trace_state);
c.reset();
return stop_iteration::no;
});
});
}
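The retry flow in `read()` above restarts the stream from the original position and resets the consumer when the allocating section invalidated the page buffer mid-parse. Its generic shape can be sketched as follows (a hypothetical helper for illustration, not part of the tree):

```cpp
#include <cassert>
#include <exception>

// Signals that buffers were invalidated while the parser held references
// into them, so the parse must restart from the beginning of the entry.
struct retry_exception : std::exception {};

// Runs step() until it completes without a retry_exception; reset() rewinds
// the stream position and parser state before each retry. Returns the number
// of attempts made, which is useful for asserting the retry actually happened.
template <typename Step, typename Reset>
int parse_with_retry(Step step, Reset reset) {
    int attempts = 0;
    for (;;) {
        try {
            ++attempts;
            step();
            return attempts;
        } catch (const retry_exception&) {
            reset();
        }
    }
}
```

The production code bounds this by only allowing one retry per page (`retry` flag above); this sketch retries indefinitely for simplicity.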
@@ -183,48 +225,17 @@ private:
return _promoted_index_size - (_blocks_count - idx) * sizeof(pi_offset_type);
}
future<pi_offset_type> read_block_offset(pi_index_type idx, tracing::trace_state_ptr trace_state);
// Postconditions:
// - block.start is engaged and valid.
future<> read_block_start(promoted_index_block& block, tracing::trace_state_ptr trace_state);
// Postconditions:
// - block.end is engaged, all fields in the block are valid
future<> read_block(promoted_index_block& block, tracing::trace_state_ptr trace_state);
public:
/// \brief Returns a pointer to promoted_index_block entry which has at least offset and index fields valid.
future<promoted_index_block*> get_block_only_offset(pi_index_type idx, tracing::trace_state_ptr trace_state) {
auto i = _blocks.lower_bound(idx);
@@ -242,6 +253,7 @@ private:
});
}
private:
void erase_range(block_set_type::iterator begin, block_set_type::iterator end) {
while (begin != end) {
--_metrics.block_count;
@@ -267,6 +279,7 @@ public:
, _blocks_count(blocks_count)
, _cached_file(f)
, _primitive_parser(permit)
, _u32_parser(_primitive_parser)
, _clustering_parser(s, permit, cvfl, true)
, _block_parser(s, permit, std::move(cvfl))
, _permit(std::move(permit))
@@ -333,6 +346,10 @@ public:
erase_range(_blocks.begin(), _blocks.lower_bound(block->index));
}
void clear() {
erase_range(_blocks.begin(), _blocks.end());
}
cached_file& file() { return _cached_file; }
};
} // namespace sstables::mc
@@ -350,6 +367,40 @@ struct fmt::formatter<sstables::mc::cached_promoted_index::promoted_index_block>
};
namespace sstables::mc {
inline
future<cached_promoted_index::pi_offset_type>
cached_promoted_index::read_block_offset(pi_index_type idx, tracing::trace_state_ptr trace_state) {
return read(_promoted_index_start + get_offset_entry_pos(idx), trace_state, _u32_parser).then([idx, this] {
sstlog.trace("cached_promoted_index {}: read_block_offset: idx: {}, offset: {}", fmt::ptr(this), idx, _u32_parser.value());
return _u32_parser.value();
});
}
inline
future<> cached_promoted_index::read_block_start(promoted_index_block& block, tracing::trace_state_ptr trace_state) {
return read(_promoted_index_start + block.offset, trace_state, _clustering_parser).then([this, &block] {
auto mem_before = block.memory_usage();
block.start.emplace(_clustering_parser.get_and_reset());
sstlog.trace("cached_promoted_index {}: read_block_start: {}", fmt::ptr(this), block);
_metrics.used_bytes += block.memory_usage() - mem_before;
});
}
inline
future<> cached_promoted_index::read_block(promoted_index_block& block, tracing::trace_state_ptr trace_state) {
return read(_promoted_index_start + block.offset, trace_state, _block_parser).then([this, &block] {
auto mem_before = block.memory_usage();
block.start.emplace(std::move(_block_parser.start()));
block.end.emplace(std::move(_block_parser.end()));
block.end_open_marker = _block_parser.end_open_marker();
block.data_file_offset = _block_parser.offset();
block.width = _block_parser.width();
_metrics.used_bytes += block.memory_usage() - mem_before;
sstlog.trace("cached_promoted_index {}: read_block: {}", fmt::ptr(this), block);
});
}
/// Cursor implementation which does binary search over index entries.
///
/// Memory consumption: O(log(N))
@@ -460,6 +511,8 @@ public:
, _trace_state(std::move(trace_state))
{ }
cached_promoted_index& promoted_index() { return _promoted_index; }
future<std::optional<skip_info>> advance_to(position_in_partition_view pos) override {
position_in_partition::less_compare less(_s);

View File

@@ -26,20 +26,22 @@ namespace mc {
// while (cp.consume(next_buf()) == read_status::waiting) {}
// position_in_partition pos = cp.get();
//
template <ContiguousSharedBuffer Buffer>
class clustering_parser {
using FragmentedBuffer = basic_fragmented_buffer<Buffer>;
const schema& _s;
column_values_fixed_lengths _clustering_values_fixed_lengths;
bool _parsing_start_key;
boost::iterator_range<column_values_fixed_lengths::const_iterator> ck_range;
std::vector<FragmentedBuffer> clustering_key_values;
bound_kind_m kind{};
FragmentedBuffer column_value;
uint64_t ck_blocks_header = 0;
uint32_t ck_blocks_header_offset = 0;
std::optional<position_in_partition> _pos;
data_consumer::primitive_consumer_impl<Buffer> _primitive;
enum class state {
CLUSTERING_START,
@@ -79,7 +81,7 @@ class clustering_parser {
position_in_partition make_position() {
auto key = clustering_key_prefix::from_range(clustering_key_values | boost::adaptors::transformed(
[] (const FragmentedBuffer& b) { return typename FragmentedBuffer::view(b); }));
if (kind == bound_kind_m::clustering) {
return position_in_partition::for_key(std::move(key));
@@ -108,7 +110,7 @@ public:
// Feeds the data into the state machine.
// Returns read_status::ready when !active() after the call.
read_status consume(Buffer& data) {
if (_primitive.consume(data) == read_status::waiting) {
return read_status::waiting;
}
@@ -202,12 +204,15 @@ public:
}
void reset() {
_parsing_start_key = true;
_state = state::CLUSTERING_START;
_primitive.reset();
}
};
template <ContiguousSharedBuffer Buffer>
class promoted_index_block_parser {
clustering_parser<Buffer> _clustering;
std::optional<position_in_partition> _start_pos;
std::optional<position_in_partition> _end_pos;
@@ -228,7 +233,7 @@ class promoted_index_block_parser {
DONE,
} _state = state::START;
data_consumer::primitive_consumer_impl<Buffer> _primitive;
public:
using read_status = data_consumer::read_status;
@@ -246,7 +251,7 @@ public:
// Feeds the data into the state machine.
// Returns read_status::ready when whole block was parsed.
// If returns read_status::waiting then data.empty() after the call.
read_status consume(Buffer& data) {
static constexpr size_t width_base = 65536;
if (_primitive.consume(data) == read_status::waiting) {
return read_status::waiting;
@@ -318,7 +323,7 @@ public:
void reset() {
_end_open_marker.reset();
_clustering.set_parsing_start_key(true);
_clustering.reset();
_state = state::START;
}
};


@@ -71,7 +71,7 @@ private:
};
struct m_parser_context {
mc::promoted_index_block_parser<temporary_buffer<char>> block_parser;
m_parser_context(const schema& s, reader_permit permit, column_values_fixed_lengths cvfl)
: block_parser(s, std::move(permit), std::move(cvfl))


@@ -104,13 +104,16 @@ def check_increases_metric(metrics, metric_names):
assert saved_metrics[n] < get_metric(metrics, n, None, the_metrics), f'metric {n} did not increase'
@contextmanager
def check_increases_operation(metrics, operation_names, metric_name = 'scylla_alternator_operation', expected_value=None):
the_metrics = get_metrics(metrics)
saved_metrics = { x: get_metric(metrics, metric_name, {'op': x}, the_metrics) for x in operation_names }
yield
the_metrics = get_metrics(metrics)
for op in operation_names:
if expected_value:
assert expected_value == get_metric(metrics, metric_name, {'op': op}, the_metrics) - saved_metrics[op]
else:
assert saved_metrics[op] < get_metric(metrics, metric_name, {'op': op}, the_metrics)
###### Test for metrics that count DynamoDB API operations:
@@ -125,6 +128,16 @@ def test_batch_get_item(test_table_s, metrics):
test_table_s.meta.client.batch_get_item(RequestItems = {
test_table_s.name: {'Keys': [{'p': random_string()}], 'ConsistentRead': True}})
def test_batch_write_item_count(test_table_s, metrics):
with check_increases_operation(metrics, ['BatchWriteItem'], metric_name='scylla_alternator_batch_item_count', expected_value=2):
test_table_s.meta.client.batch_write_item(RequestItems = {
test_table_s.name: [{'PutRequest': {'Item': {'p': random_string(), 'a': 'hi'}}}, {'PutRequest': {'Item': {'p': random_string(), 'a': 'hi'}}}]})
def test_batch_get_item_count(test_table_s, metrics):
with check_increases_operation(metrics, ['BatchGetItem'], metric_name='scylla_alternator_batch_item_count', expected_value=2):
test_table_s.meta.client.batch_get_item(RequestItems = {
test_table_s.name: {'Keys': [{'p': random_string()}, {'p': random_string()}], 'ConsistentRead': True}})
# Test counters for CreateTable, DescribeTable, UpdateTable and DeleteTable
def test_table_operations(dynamodb, metrics):
with check_increases_operation(metrics, ['CreateTable', 'DescribeTable', 'UpdateTable', 'DeleteTable']):


@@ -453,3 +453,52 @@ SEASTAR_THREAD_TEST_CASE(test_invalidation) {
BOOST_REQUIRE_EQUAL(2, metrics.page_populations);
BOOST_REQUIRE_EQUAL(0, metrics.page_hits);
}
SEASTAR_THREAD_TEST_CASE(test_page_view_as_contiguous_shared_buffer) {
auto page_size = cached_file::page_size;
test_file tf = make_test_file(page_size);
cached_file_stats metrics;
logalloc::region region;
cached_file cf(tf.f, metrics, cf_lru, region, page_size);
auto s = cf.read(1, std::nullopt);
cached_file::page_view p = s.next_page_view().get();
BOOST_REQUIRE_EQUAL(tf.contents.substr(1, page_size - 1), sstring(p.begin(), p.end()));
BOOST_REQUIRE_EQUAL(p.size(), page_size - 1);
BOOST_REQUIRE(!p.empty());
p.trim(10);
BOOST_REQUIRE_EQUAL(tf.contents.substr(1, 10), sstring(p.begin(), p.end()));
BOOST_REQUIRE_EQUAL(tf.contents.substr(1, 10), sstring(p.get_write(), p.end()));
p.trim_front(1);
BOOST_REQUIRE_EQUAL(tf.contents.substr(2, 9), sstring(p.begin(), p.end()));
// Check movability
{
auto p_cpy = p.share();
auto p1 = std::move(p_cpy);
BOOST_REQUIRE_EQUAL(tf.contents.substr(2, 9), sstring(p1.begin(), p1.end()));
BOOST_REQUIRE(p_cpy.empty());
BOOST_REQUIRE(p_cpy.size() == 0);
BOOST_REQUIRE(!p_cpy);
}
auto p2 = p.share(2, 3);
BOOST_REQUIRE_EQUAL(tf.contents.substr(4, 3), sstring(p2.begin(), p2.end()));
p2.trim_front(1); // should not affect p
p.trim_front(9);
BOOST_REQUIRE_EQUAL(p.size(), 0);
BOOST_REQUIRE(p.begin() == p.end());
p = {};
BOOST_REQUIRE_EQUAL(p.size(), 0);
BOOST_REQUIRE(p.begin() == p.end());
BOOST_REQUIRE(!p);
BOOST_REQUIRE_EQUAL(sstring(p.begin(), p.end()), sstring());
// p should not affect p2
BOOST_REQUIRE_EQUAL(tf.contents.substr(5, 2), sstring(p2.begin(), p2.end()));
}


@@ -2024,3 +2024,35 @@ SEASTAR_TEST_CASE(test_oversized_several_medium) {
SEASTAR_TEST_CASE(test_oversized_several_large) {
co_await test_oversized(8, 32);
}
// tests #20862 - buffer usage counter not being updated correctly
SEASTAR_TEST_CASE(test_commitlog_buffer_size_counter) {
commitlog::config cfg;
tmpdir tmp;
cfg.commit_log_location = tmp.path().string();
auto log = co_await commitlog::create_commitlog(cfg);
rp_set rps;
// uncomment for verbosity
// logging::logger_registry().set_logger_level("commitlog", logging::log_level::debug);
auto uuid = make_table_id();
auto size = 1024;
auto size_before_alloc = log.get_buffer_size();
rp_handle h = co_await log.add_mutation(uuid, size, db::commitlog::force_sync::no, [&](db::commitlog::output& dst) {
dst.fill('1', size);
});
h.release();
auto size_after_alloc = log.get_buffer_size();
co_await log.sync_all_segments();
auto size_after_sync = log.get_buffer_size();
BOOST_CHECK_LE(size_before_alloc, size_after_alloc);
BOOST_CHECK_LE(size_after_sync, size_before_alloc);
co_await log.shutdown();
co_await log.clear();
}


@@ -11,6 +11,7 @@
#include "test/lib/simple_schema.hh"
#include "test/lib/sstable_test_env.hh"
#include "test/lib/sstable_utils.hh"
#include "test/lib/make_random_string.hh"
#include "readers/from_mutations_v2.hh"
@@ -46,3 +47,96 @@ SEASTAR_TEST_CASE(test_abort_during_index_read) {
consumer_ctx.close().get();
});
}
SEASTAR_TEST_CASE(test_promoted_index_parsing_page_crossing_and_retries) {
return test_env::do_with_async([](test_env& env) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
testlog.info("Skipped because error injection is not enabled");
#else
simple_schema ss;
auto s = ss.schema();
auto pk = ss.make_pkey();
auto mut = mutation(s, pk);
// enough to get index blocks whose clustering keys are split across pages
std::vector<clustering_key> keys;
const auto n_keys = 100;
auto key_size = cached_file::page_size / 3; // guarantees that index blocks are not congruent with page size.
keys.reserve(n_keys);
for (int i = 0; i < n_keys; ++i) {
keys.push_back(ss.make_ckey(make_random_string(key_size)));
ss.add_row(mut, keys[i], "v");
}
clustering_key::less_compare less(*s);
std::sort(keys.begin(), keys.end(), less);
env.manager().set_promoted_index_block_size(1); // force entry for each row
auto mut_reader = make_mutation_reader_from_mutations_v2(s, env.make_reader_permit(), std::move(mut));
auto sst = make_sstable_easy(env, std::move(mut_reader), env.manager().configure_writer());
tests::reader_concurrency_semaphore_wrapper semaphore;
auto permit = semaphore.make_permit();
tracing::trace_state_ptr trace = nullptr;
auto index = std::make_unique<index_reader>(sst, permit, trace, use_caching::yes, true);
auto close_index = deferred_close(*index);
index->advance_to(dht::ring_position_view(pk)).get();
index->read_partition_data().get();
auto cur = dynamic_cast<mc::bsearch_clustered_cursor*>(index->current_clustered_cursor());
BOOST_REQUIRE(cur);
std::optional<cached_file::offset_type> prev_offset;
int crossed_page = 0;
utils::get_local_injector().enable("cached_promoted_index_parsing_invalidate_buf_across_page", false);
for (int i = 0; i < n_keys - 1; ++i) {
auto block_offset = cur->promoted_index().get_block_only_offset(i, trace).get()->offset;
auto next_block_offset = cur->promoted_index().get_block_only_offset(i + 1, trace).get()->offset;
auto start_page = block_offset / cached_file::page_size;
auto end_page = (next_block_offset - 1) / cached_file::page_size;
if (start_page != end_page) {
auto pos = position_in_partition::for_key(keys[i]);
position_in_partition::equal_compare eq(*s);
testlog.info("Crossed page at block {}, offset [{}, {})", i, block_offset, next_block_offset);
crossed_page++;
auto* block = cur->promoted_index().get_block(i, trace).get();
testlog.debug("key : {}", pos);
testlog.debug("start : {}", *block->start);
testlog.debug("end : {}", *block->end);
BOOST_REQUIRE(eq(*block->start, pos));
BOOST_REQUIRE(eq(*block->end, pos));
if (prev_offset) {
BOOST_REQUIRE_LT(*prev_offset, block->data_file_offset);
}
cur->promoted_index().clear();
utils::get_local_injector().enable("cached_promoted_index_bad_alloc_parsing_across_page", true);
block = cur->promoted_index().get_block(i, trace).get();
testlog.debug("start : {}", *block->start);
testlog.debug("end : {}", *block->end);
BOOST_REQUIRE(eq(*block->start, pos));
BOOST_REQUIRE(eq(*block->end, pos));
if (prev_offset) {
BOOST_REQUIRE_LT(*prev_offset, block->data_file_offset);
}
prev_offset = block->data_file_offset;
}
}
BOOST_REQUIRE_GE(crossed_page, 6); // If not, increase n_keys
#endif
});
}


@@ -709,7 +709,7 @@ private:
port = tmp.local_address().port();
}
// Don't start listening so tests can be run in parallel if cfg_in.ms_listen is not set to true explicitly.
_ms.start(host_id, listen, std::move(port), std::ref(_feature_service)).get();
stop_ms = defer(stop_type(stop_ms_func));
if (cfg_in.ms_listen) {


@@ -12,7 +12,9 @@
#include <seastar/core/app-template.hh>
#include <seastar/util/closeable.hh>
#include "db/config.hh"
#include "db/system_distributed_keyspace.hh"
#include "gms/feature_service.hh"
#include "message/messaging_service.hh"
#include "gms/gossiper.hh"
#include "gms/application_state.hh"
@@ -56,6 +58,7 @@ int main(int ac, char ** av) {
sharded<abort_source> abort_sources;
sharded<locator::shared_token_metadata> token_metadata;
sharded<gms::feature_service> feature_service;
sharded<netw::messaging_service> messaging;
abort_sources.start().get();
@@ -68,7 +71,10 @@ int main(int ac, char ** av) {
token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg).get();
auto stop_token_mgr = defer([&] { token_metadata.stop().get(); });
auto cfg = gms::feature_config_from_db_config(db::config(), {});
feature_service.start(cfg).get();
messaging.start(locator::host_id{}, listen, 7000, std::ref(feature_service)).get();
auto stop_messaging = deferred_stop(messaging);
gms::gossip_config gcfg;


@@ -15,6 +15,7 @@
#include <seastar/core/thread.hh>
#include <seastar/rpc/rpc_types.hh>
#include <seastar/util/closeable.hh>
#include "gms/feature_service.hh"
#include "message/messaging_service.hh"
#include "gms/gossip_digest_syn.hh"
#include "gms/gossip_digest_ack.hh"
@@ -192,8 +193,11 @@ int main(int ac, char ** av) {
sharded<locator::shared_token_metadata> token_metadata;
token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg).get();
auto stop_tm = deferred_stop(token_metadata);
seastar::sharded<gms::feature_service> feature_service;
auto cfg = gms::feature_config_from_db_config(db::config(), {});
feature_service.start(cfg).get();
seastar::sharded<netw::messaging_service> messaging;
messaging.start(locator::host_id{}, listen, 7000, std::ref(feature_service)).get();
auto stop_messaging = deferred_stop(messaging);
seastar::sharded<tester> testers;
testers.start(std::ref(messaging)).get();


@@ -135,3 +135,19 @@ async def test_rebuild_node(manager: ManagerClient, random_tables: RandomTables)
servers = await manager.running_servers()
await manager.rebuild_node(servers[0].server_id)
await check_token_ring_and_group0_consistency(manager)
@pytest.mark.asyncio
async def test_concurrent_removenode(manager: ManagerClient):
servers = await manager.running_servers()
assert len(servers) >= 3
await manager.server_stop_gracefully(servers[2].server_id)
try:
await asyncio.gather(*[manager.remove_node(servers[0].server_id, servers[2].server_id),
manager.remove_node(servers[1].server_id, servers[2].server_id)])
except Exception as e:
logger.info(f"exception raised due to concurrent remove node requests: {e}")
else:
raise Exception("concurrent removenode request should result in a failure, but unexpectedly succeeded")


@@ -27,16 +27,20 @@ async def test_long_join(manager: ManagerClient) -> None:
await asyncio.gather(task)
@pytest.mark.asyncio
async def test_long_join_drop_entries_on_bootstrapping(manager: ManagerClient) -> None:
"""The test checks that join works even if expiring entries are dropped
on the joining node between placement of the join request and its processing"""
servers = await manager.servers_add(2)
inj = 'topology_coordinator_pause_before_processing_backlog'
[await manager.api.enable_injection(s.ip_addr, inj, one_shot=True) for s in servers]
s = await manager.server_add(start=False, config={
'error_injections_at_startup': ['pre_server_start_drop_expiring']
})
task = asyncio.create_task(manager.server_start(s.server_id))
log = await manager.server_open_log(s.server_id)
await log.wait_for("init - starting gossiper")
servers.append(s)
await manager.servers_see_each_other(servers, interval=300)
await manager.api.enable_injection(s.ip_addr, 'join_node_response_drop_expiring', one_shot=True)
[await manager.api.message_injection(s.ip_addr, inj) for s in servers[:-1]]
await asyncio.gather(task)


@@ -79,6 +79,10 @@ async def test_recover_stuck_raft_recovery(request, manager: ManagerClient):
logging.info(f"Restarting {others}")
await manager.rolling_restart(others)
# Prevent scylladb/scylladb#20791
logging.info(f"Wait until {srv1} sees {others} as alive")
await manager.server_sees_others(srv1.server_id, len(others))
logging.info(f"{others} restarted, waiting until driver reconnects to them")
hosts = await wait_for_cql_and_get_hosts(cql, others, time.time() + 60)


@@ -106,9 +106,15 @@ async def test_replace_reuse_ip(request, manager: ManagerClient) -> None:
parameters=[k, v],
host=host2)
finish_time = time.time()
s = await replace_future
logger.info(f"done, writes count {next_id}, took {finish_time - start_time} seconds")
# make sure that after we start snapshot transfer we no longer have stale writes
log = await manager.server_open_log(s.server_id)
m = await log.wait_for("group0_raft_sm - transfer snapshot from ")
errs = await log.grep("storage_proxy - Failed to apply mutation from", from_mark=m)
assert len(errs) == 0
result_set = await manager.get_cql().run_async(SimpleStatement("select * from ks.test_table",
consistency_level=ConsistencyLevel.QUORUM),
host=host2, all_pages=True)


@@ -79,6 +79,10 @@ private:
}
return std::unique_ptr<cached_page, cached_page_del>(this);
}
bool only_ref() const {
return _use_count <= 1;
}
public:
explicit cached_page(cached_file* parent, page_idx_type idx, temporary_buffer<char> buf)
: parent(parent)
@@ -115,11 +119,10 @@ private:
return temporary_buffer<char>(_buf.get_write(), _buf.size(), make_deleter([self = std::move(self)] {}));
}
// Returns a pointer to the contents of the page.
// The buffer is invalidated when the page is evicted or when the owning LSA region invalidates references.
char* begin() {
return _lsa_buf.get();
}
size_t size_in_allocator() {
@@ -208,10 +211,11 @@ public:
class page_view {
cached_page::ptr_type _page;
size_t _offset;
size_t _size = 0;
std::optional<reader_permit::resource_units> _units;
public:
page_view() = default;
page_view(size_t offset, size_t size, cached_page::ptr_type page, std::optional<reader_permit::resource_units> units)
: _page(std::move(page))
, _offset(offset)
@@ -219,15 +223,64 @@ public:
, _units(std::move(units))
{}
page_view(page_view&& o) noexcept
: _page(std::move(o._page))
, _offset(std::exchange(o._offset, 0))
, _size(std::exchange(o._size, 0))
, _units(std::move(o._units))
{}
page_view& operator=(page_view&& o) noexcept {
_page = std::move(o._page);
_offset = std::exchange(o._offset, 0);
_size = std::exchange(o._size, 0);
_units = std::move(o._units);
return *this;
}
operator bool() const { return bool(_page); }
// Fills the page with garbage, releases the pointer and evicts the page so that it's no longer in cache.
// For testing use-after-free on the buffer space.
// After the call, the object is the same state as after being moved-from.
void release_and_scramble() noexcept {
if (_page->only_ref()) {
std::memset(_page->_lsa_buf.get(), 0xfe, _page->_lsa_buf.size());
cached_page& cp = *_page;
_page = nullptr;
cp.parent->_lru.remove(cp);
cp.on_evicted();
} else {
_page = nullptr;
}
_size = 0;
_offset = 0;
_units = std::nullopt;
}
operator bool() const { return bool(_page) && _size; }
public: // ContiguousSharedBuffer concept
const char* begin() const { return _page ? _page->begin() + _offset : nullptr; }
const char* get() const { return begin(); }
const char* end() const { return begin() + _size; }
size_t size() const { return _size; }
bool empty() const { return !_size; }
char* get_write() { return const_cast<char*>(begin()); }
void trim(size_t pos) {
_size = pos;
}
void trim_front(size_t n) {
_offset += n;
_size -= n;
}
page_view share() {
return share(0, _size);
}
page_view share(size_t pos, size_t len) {
return page_view(_offset + pos, len, _page->share(), {});
}
};
// Generator of subsequent pages of data reflecting the contents of the file.
@@ -306,7 +359,7 @@ public:
? _cached_file->_last_page_size
: page_size;
units = get_page_units(page_size);
page_view buf(_offset_in_page, size, std::move(page), std::move(units));
page_view buf(_offset_in_page, size - _offset_in_page, std::move(page), std::move(units));
_offset_in_page = 0;
++_page_idx;
return buf;


@@ -0,0 +1,40 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include <concepts>
#include <memory>
// A contiguous buffer of char objects which can be trimmed and
// supports zero-copy sharing of its underlying memory.
template<typename T>
concept ContiguousSharedBuffer = std::movable<T>
&& std::default_initializable<T>
&& requires(T& obj, size_t pos, size_t len) {
// Creates a new buffer that shares the memory of the original buffer.
// The lifetime of the new buffer is independent of the original buffer.
{ obj.share() } -> std::same_as<T>;
// Like share() but the new buffer represents a sub-range of the original buffer.
{ obj.share(pos, len) } -> std::same_as<T>;
// Trims the suffix of a buffer so that 'len' is the index of the first removed byte.
{ obj.trim(len) } -> std::same_as<void>;
// Trims the prefix of the buffer so that `pos` is the index of the first byte after the trim.
{ obj.trim_front(pos) } -> std::same_as<void>;
{ obj.begin() } -> std::same_as<const char*>;
{ obj.get() } -> std::same_as<const char*>;
{ obj.get_write() } -> std::same_as<char*>;
{ obj.end() } -> std::same_as<const char*>;
{ obj.size() } -> std::same_as<size_t>;
{ obj.empty() } -> std::same_as<bool>;
};


@@ -17,11 +17,13 @@
#include "bytes.hh"
#include "bytes_ostream.hh"
#include "contiguous_shared_buffer.hh"
#include "fragment_range.hh"
/// Fragmented buffer consisting of multiple temporary_buffer<char>
class fragmented_temporary_buffer {
using vector_type = std::vector<seastar::temporary_buffer<char>>;
/// Fragmented buffer consisting of multiple Buffer objects.
template <ContiguousSharedBuffer Buffer>
class basic_fragmented_buffer {
using vector_type = std::vector<Buffer>;
vector_type _fragments;
size_t _size_bytes = 0;
public:
@@ -30,15 +32,15 @@ public:
class view;
class istream;
class reader;
using ostream = seastar::memory_output_stream<vector_type::iterator>;
using ostream = seastar::memory_output_stream<typename vector_type::iterator>;
fragmented_temporary_buffer() = default;
basic_fragmented_buffer() = default;
fragmented_temporary_buffer(std::vector<seastar::temporary_buffer<char>> fragments, size_t size_bytes) noexcept
basic_fragmented_buffer(std::vector<Buffer> fragments, size_t size_bytes) noexcept
: _fragments(std::move(fragments)), _size_bytes(size_bytes)
{ }
fragmented_temporary_buffer(const char* str, size_t size)
basic_fragmented_buffer(const char* str, size_t size)
{
*this = allocate_to_fit(size);
size_t pos = 0;
@@ -54,10 +56,10 @@ public:
ostream get_ostream() noexcept {
if (_fragments.size() != 1) {
return ostream::fragmented(_fragments.begin(), _size_bytes);
return typename ostream::fragmented(_fragments.begin(), _size_bytes);
}
auto& current = *_fragments.begin();
return ostream::simple(reinterpret_cast<char*>(current.get_write()), current.size());
return typename ostream::simple(reinterpret_cast<char*>(current.get_write()), current.size());
}
using const_fragment_iterator = typename vector_type::const_iterator;
@@ -100,23 +102,23 @@ public:
_fragments.erase(it.base(), _fragments.end());
}
// Creates a fragmented temporary buffer of a specified size, supplied as a parameter.
// Creates a fragmented buffer of a specified size, supplied as a parameter.
// Max chunk size is limited to 128kb (the same limit as `bytes_stream` has).
static fragmented_temporary_buffer allocate_to_fit(size_t data_size) {
static basic_fragmented_buffer allocate_to_fit(size_t data_size) {
constexpr size_t max_fragment_size = default_fragment_size; // 128KB
const size_t full_fragment_count = data_size / max_fragment_size; // number of max-sized fragments
const size_t last_fragment_size = data_size % max_fragment_size;
std::vector<seastar::temporary_buffer<char>> fragments;
std::vector<Buffer> fragments;
fragments.reserve(full_fragment_count + !!last_fragment_size);
for (size_t i = 0; i < full_fragment_count; ++i) {
fragments.emplace_back(seastar::temporary_buffer<char>(max_fragment_size));
fragments.emplace_back(Buffer(max_fragment_size));
}
if (last_fragment_size) {
fragments.emplace_back(seastar::temporary_buffer<char>(last_fragment_size));
fragments.emplace_back(Buffer(last_fragment_size));
}
return fragmented_temporary_buffer(std::move(fragments), data_size);
return basic_fragmented_buffer(std::move(fragments), data_size);
}
vector_type release() && noexcept {
@@ -124,7 +126,8 @@ public:
}
};
class fragmented_temporary_buffer::view {
template <ContiguousSharedBuffer Buffer>
class basic_fragmented_buffer<Buffer>::view {
vector_type::const_iterator _current;
const char* _current_position = nullptr;
size_t _current_size = 0;
@@ -252,7 +255,7 @@ public:
_current_size = std::min(_current_size, _total_size);
}
bool operator==(const fragmented_temporary_buffer::view& other) const noexcept {
bool operator==(const basic_fragmented_buffer::view& other) const noexcept {
auto this_it = begin();
auto other_it = other.begin();
@@ -285,10 +288,14 @@ public:
return this_it == end() && other_it == other.end();
}
};
using fragmented_temporary_buffer = basic_fragmented_buffer<temporary_buffer<char>>;
static_assert(FragmentRange<fragmented_temporary_buffer::view>);
static_assert(FragmentedView<fragmented_temporary_buffer::view>);
inline fragmented_temporary_buffer::operator view() const noexcept
template <ContiguousSharedBuffer Buffer>
inline basic_fragmented_buffer<Buffer>::operator view() const noexcept
{
if (!_size_bytes) {
return view();
@@ -305,7 +312,8 @@ concept ExceptionThrower = requires(T obj, size_t n) {
}
class fragmented_temporary_buffer::istream {
template <ContiguousSharedBuffer Buffer>
class basic_fragmented_buffer<Buffer>::istream {
vector_type::const_iterator _current;
const char* _current_position;
const char* _current_end;
@@ -465,29 +473,32 @@ public:
}
};
inline fragmented_temporary_buffer::istream fragmented_temporary_buffer::get_istream() const noexcept // allow empty (ut for that)
template <ContiguousSharedBuffer Buffer>
inline basic_fragmented_buffer<Buffer>::istream basic_fragmented_buffer<Buffer>::get_istream() const noexcept // allow empty (ut for that)
{
return istream(_fragments, _size_bytes);
}
class fragmented_temporary_buffer::reader {
std::vector<temporary_buffer<char>> _fragments;
template <ContiguousSharedBuffer Buffer>
class basic_fragmented_buffer<Buffer>::reader {
using FragBuffer = basic_fragmented_buffer<Buffer>;
FragBuffer::vector_type _fragments;
size_t _left = 0;
public:
future<fragmented_temporary_buffer> read_exactly(input_stream<char>& in, size_t length) {
_fragments = std::vector<temporary_buffer<char>>();
future<FragBuffer> read_exactly(input_stream<char>& in, size_t length) {
_fragments = FragBuffer::vector_type();
_left = length;
return repeat_until_value([this, length, &in] {
if (!_left) {
return make_ready_future<std::optional<fragmented_temporary_buffer>>(fragmented_temporary_buffer(std::move(_fragments), length));
return make_ready_future<std::optional<FragBuffer>>(FragBuffer(std::move(_fragments), length));
}
return in.read_up_to(_left).then([this] (temporary_buffer<char> buf) {
if (buf.empty()) {
return std::make_optional(fragmented_temporary_buffer());
return std::make_optional(FragBuffer());
}
_left -= buf.size();
_fragments.emplace_back(std::move(buf));
return std::optional<fragmented_temporary_buffer>();
_fragments.emplace_back(Buffer(std::move(buf)));
return std::optional<FragBuffer>();
});
});
}
@@ -495,7 +506,8 @@ public:
// The operator below is used only for logging
inline std::ostream& operator<<(std::ostream& out, const fragmented_temporary_buffer::view& v) {
template <ContiguousSharedBuffer Buffer>
inline std::ostream& operator<<(std::ostream& out, const typename basic_fragmented_buffer<Buffer>::view& v) {
for (bytes_view frag : fragment_range(v)) {
out << to_hex(frag);
}