These commands will be used by strongly consistent tablets to submit
mutations to Raft. A simple state_machine implementation is introduced
to apply these commands.
We apply commands in batches to reduce commitlog I/O overhead. The
batched variant of database::apply has known atomicity issues. For
example, it does not guarantee atomicity under memory pressure: some
mutations may be published to the memtable while others are blocked in
run_when_memory_available. We will address these issues later.
Bound_weight and partition_region are defined in both paging_state.idl.hh and
position_in_partition.idl.hh. This isn't currently causing any issues, but if
a future RPC uses both the paging_state and position_in_partition, after
including both files we'll get a duplicate error.
In this patch we prevent this by removing the definitions from paging_state.idl.hh
and including position_in_partition.idl.hh in their place.
Closesscylladb/scylladb#28228
Currently direct_fd_ping runs without timeout, but the verb is not
waited forever, the wait is canceled after a timeout, this timeout
simply is not passed to the rpc. It may create a situation where the
rpc callback can runs on a destination but it is no longer waited on.
Change the code to pass timeout to rpc as well and return earlier from
the rpc handler if the timeout is reached by the time the callback is
called. This is backwards compatible since timeout is passed as
optional.
Previously, the view building coordinator relied on setting each task's state to STARTED and then explicitly removing these state entries once tasks finished, before scheduling new ones. This approach induced a significant number of group0 commits, particularly in large clusters with many nodes and tablets, negatively impacting performance and scalability.
With the update, the coordinator and worker logic has been restructured to operate without maintaining per-task states. Instead, tasks are simply tracked with an aborted boolean flag, which is still essential for certain tablet operations. This change removes much of the coordination complexity, simplifies the view building code, and reduces operational overhead.
In addition, the coordinator now batches reports of finished tasks before making commits. Rather than committing task completions individually, it aggregates them and reports in groups, significantly minimizing the frequency of group0 commits. This new approach is expected to improve efficiency and scalability during materialized view construction, especially in large deployments.
Fixes https://github.com/scylladb/scylladb/issues/26311
This patch needs to be backported to 2025.4.
Closesscylladb/scylladb#26897
* github.com:scylladb/scylladb:
docs/dev/view-building-coordinator: update the docs after recent changes
db/view/view_building: send coordinator's term in the RPC
db/view/view_building_state: replace task's state with `aborted` flag
db/view/view_building_coordinator: batch finished tasks reporting
db/view/view_building_worker: change internal implementation
db/view/view_building_coordinator: change `work_on_tasks` RPC return type
To avoid case when an old coordinator (which hasn't been stopped yet)
dictates what should be done, add raft term to the `work_on_view_building_tasks`
RPC.
The worker needs to check if the term matches the current term from raft
server, and deny the request when the term is bad.
This reverts commit 11f045bb7c.
The rpc was added together with colocated tablets in 2025.4 to support a
"shared repair" operation of a group of colocated tablets that repairs
all of them and allows also for special behavior as opposed to repairing
a single specific tablet.
It is not used anymore because we decided to not repair all colocated
tablets in a single shared operation, but to repair only the base table,
and in a later release support repairing colocated tables individually.
We can remove the rpc in 2025.4 because it is introduced in the same
version.
Currently if a banned node tries to connect to a cluster it fails to
create connections, but has no idea why, so from inside the node it
looks like it has communication problems. This patch adds new rpc
NOTIFY_BANNED which is sent back to the node when its connection is
dropped. On receiving the rpc the node isolates itself and print an
informative message about why it did so.
Closesscylladb/scylladb#26943
This commit doesn't change the logic behind the view building worker but
it changes how the worker is executing view building tasks.
Previously, the worker had a state only on shard0 and it was reacting to
changes in group0 state. When it noticed some tasks were moved to
`STARTED` state, the worker was creating a batch for it on the shard0
state.
The RPC call was used only to start the batch and to get its result.
Now, the main logic of batch management was moved to the RPC call
handler.
The worker has a local state on each shard and the state
contains:
- unique ptr to the batch
- set of completed tasks
- information for which views the base table was flushed
So currently, each batch lives on a shard where it has its work to do
exclusively. This eliminates a need to do a synchronization between
shard0 and work shard, which was a painful point in previous
implementation.
The worker still reacts to changes in group0 view building state, but
currently it's only used to observe whether any view building tasks was
aborted by setting `ABORTED` state.
To prepare for further changes to drop the view building task state,
the worker ignores `IDLE` and `STARTED` states completely.
During the initial implementation of the view builing coordinator,
we decided that if a view building task fails locally on the worker
(example reason: view update's target replica is not available),
the worker will retry this work instead of reporting a failure to the
coordinator.
However, we left return type of the RPC, which was telling if a task was
finished successfully or aborted.
But the worker doesn't need to report that a task was aborted, because
it's the coordinator, who decides to abort a task.
So, this commit changes the return type to list of UUIDs of completed
tasks.
Previously length of the returned vector needed to be the same as length
of the vector sent in the request.
No we can drop this restriction and the RPC handler return list of UUIDs
of completed tasks (subset of vector sent in the request).
This change is required to drop `STARTED` state in next commits.
Since Scylla 2025.4 wasn't released yet and we're going to merge this
patch before releasing, no RPC versioning or cluster feature is needed.
This patch changes the tablet size map in load_stats. Previously, this
data structure was:
std::unordered_map<range_based_tablet_id, uint64_t> tablet_sizes;
and is changed into:
std::unordered_map<table_id, std::unordered_map<dht::token_range, uint64_t>> tablet_sizes;
This allows for improved performance of tablet tablet size reconciliation.
Using the name regular as the incremental mode could be confusing, since
regular might be interpreted as the non-incremental repair. It is better
to use incremental directly.
Before:
- regular (standard incremental repair)
- full (full incremental repair)
- disabled (incremental repair disabled)
After:
- incremental (standard incremental repair)
- full (full incremental repair)
- disabled (incremental repair disabled)
Fixes#26503Closesscylladb/scylladb#26504
This commit extend the TABLE_LOAD_STATS RPC with data about the tablet
replica sizes and effective disk capacity.
Effective disk capacity of a node is computed as a sum of the sizes of
all tablet replicas on a node and available disk space.
This is the first change in the size based load balancing series.
Closesscylladb/scylladb#26035
This PR refactors the can_vote function in the Raft algorithms for improved clarity and maintainability by providing safer strong boolean types to the raft algorithm.
Fixes: #21937
Backport: No backport required
Closesscylladb/scylladb#25787
As requested in #22104, moved the files and fixed other includes and build system.
Moved files:
- combine.hh
- collection_mutation.hh
- collection_mutation.cc
- converting_mutation_partition_applier.hh
- converting_mutation_partition_applier.cc
- counters.hh
- counters.cc
- timestamp.hh
Fixes: #22104
This is a cleanup, no need to backport
Closesscylladb/scylladb#25085
Currently, in repair_tablet we retrieve session_id from tablet
map (and throw if it isn't specified). In case of topology
coordinator failover, we may end up in a situation where a node
runs outdated repair, treating session of a different operation
as the repair's session:
- topology coordinator starts repair transition (A);
- topology coordinator sends tablet repair rpc to node1;
- topology coordinator is separated from the cluster;
- new topology coordinator is elected;
- new topology coordinator sees waiting repair request (A_2)
and executes it;
- new repair of the same tablet is requested (B);
- new topology coordinator starts repair transition (B);
- new topology coordinator sends tablet repair rpc to node2;
- node2 starts repair (B) as repair master;
- node1 starts repair (A), checks the current session (B), proceeds
with repair (B) as repair master.
Send current session_id in repair_tablet rpc. If this session_id
and session id got from tablet map don't match, an exception
is thrown.
Fixes: https://github.com/scylladb/scylladb/issues/23318.
No backport; changes in rpc signatures
Closesscylladb/scylladb#25369
* github.com:scylladb/scylladb:
test: check that repair with outdated session_id fails
service: pass current session_id to repair rpc
This PR consists of three parts:
* Small refactoring of the fencing APIs in storage_proxy (renames + comments + some functions were extracted)
* Implement the fencing for LWT verbs itself. This includes checking the fencing token before and after local replica data accesses.
* Two new `test.py` tests in `test_fencing.py`, which check the fencing in some real-world scenarios.
Backport: no need -- fencing for LWT requests is needed primarily for LWT over tablets, which is not released yet.
Fixesscylladb/scylladb#22332Closesscylladb/scylladb#25550
* https://github.com/scylladb/scylladb:
test_tablets_lwt: eliminate redundant disable_tablet_balancing
test_fencing: add test_lwt_fencing_upgrade
pylib: extract upgrade helpers from test_sstable_compression_dictionaries_upgrade.py
test_fencing: add test_fenced_out_on_tablet_migration_while_handling_paxos_verb
test_fencing: test_fence_lwt_during_bootstap
pylib/rest_client.py: encode injection name
storage_proxy_stats: add fenced_out_requests metric
storage_proxy: add fencing to Paxos verbs
storage_proxy::apply_fence: add overload that throws on failure
storage_proxy: extract apply_fence_result
sp::apply_fence: rename to apply_fence_on_ready
sp::apply_fence: rename to check_fence
sp::apply_fence: make non-generic
As requested in #22120, moved the files and fixed other includes and build system.
Moved files:
- query.cc
- query-request.hh
- query-result.hh
- query-result-reader.hh
- query-result-set.cc
- query-result-set.hh
- query-result-writer.hh
- query_id.hh
- query_result_merger.hh
Fixes: #22120
This is a cleanup, no need to backport
Closesscylladb/scylladb#25105
This commit adds fencing support to all Paxos verbs:
* Pass an optional (for backward compatibility) fencing_token as a
parameter to the prepare, accept, learn, and prune verbs.
* Call apply_fence twice — before and after accessing local data. This
ensures that if the coordinator is fenced out mid-request, the replica
does not return success, which would otherwise incorrectly contribute
to achieving the target CL. Without this, a user might observe
successful writes that become unreadable after the topology
operation completes.
* For prune, call apply_fence only once because it does not return a
response to the LWT coordinator.
Fixesscylladb/scylladb#22332
The version keyword is missed for the optional mark_as_repaired
parameter. This causes the new node to expect more data to come:
INFO 2025-09-01 19:23:05,332 [shard 0:strm] rpc - client
127.0.7.6:50116 msg_id 8: caught exception while processing a message:
std::out_of_range (deserialization buffer underflow)
When the sender is an old node in a mixed cluster, the data will never
come. To fix, add the missing version keyword.
Our idl-compiler.py should have caught the typo since the keyword
was missing in the [[]] tag.
Fixes#25666Closesscylladb/scylladb#25782
This patch introduces a new `incremental_mode` parameter to the tablet
repair REST API, providing more fine-grained control over the
incremental repair process.
Previously, incremental repair was on and could not be turned off. This
change allows users to select from three distinct modes:
- `regular`: This is the default mode. It performs a standard
incremental repair, processing only unrepaired sstables and skipping
those that are already repaired. The repair state (`repaired_at`,
`sstables_repaired_at`) is updated.
- `full`: This mode forces the repair to process all sstables, including
those that have been previously repaired. This is useful when a full
data validation is needed without disabling the incremental repair
feature. The repair state is updated.
- `disabled`: This mode completely disables the incremental repair logic
for the current repair operation. It behaves like a classic
(pre-incremental) repair, and it does not update any incremental
repair state (`repaired_at` in sstables or `sstables_repaired_at` in
the system.tablets table).
The implementation includes:
- Adding the `incremental_mode` parameter to the
`/storage_service/repair/tablet` API endpoint.
- Updating the internal repair logic to handle the different modes.
- Adding a new test case to verify the behavior of each mode.
- Updating the API documentation and developer documentation.
Fixes#25605Closesscylladb/scylladb#25693
When a scaling out is delayed or fails, it is crucial to ensure that clusters remain operational
and recoverable even under extreme conditions. To achieve this, the following proactive measures
are implemented:
- reject writes
- includes: inserts, updates, deletes, counter updates, hints, read+repair and lwt writes
- applicable to: user tables, views, CDC log, audit, cql tracing
- stop running compactions/repairs and prevent from starting new ones
- reject incoming tablet migrations
The aforementioned mechanisms are automatically enabled when node's disk utilization reaches
the critical level (default: 98%) and disabled when the utilization drop below the threshold.
Apart from that, the series add tests that require mounted volumes to simulate out of space.
The paths to the volumes can be provided using the a pytest argument, i.e. `--space-limited-dirs`.
When not provided, tests are skipped.
Test scenarios:
1. Start a cluster and write data until one of the nodes reaches 90% of the disk utilization
2. Perform an **operation** that would take the nodes over 100%
3. The nodes should not exceed the critical disk utilization (98% by default)
4. Scale out the cluster by adding one node per rack
5. Retry or wait for the **operation** from step 2
The **operation** is: writing data, running compactions, building materialized views, running repair,
migrating tablets (caused by RF change, decommission).
The test is successful, if no nodes run out of space, the **operation** from step 2 is
aborted/paused/timed out and the **operation** from step 5 is successful.
`perf-simple-query --smp 1 -m 1G` results obtained for fixed 400MHz frequency:
Read path (before)
```
instructions_per_op:
mean= 39661.51 standard-deviation=34.53
median= 39655.39 median-absolute-deviation=23.33
maximum=39708.71 minimum=39622.61
```
Read path (after)
```
instructions_per_op:
mean= 39691.68 standard-deviation=34.54
median= 39683.14 median-absolute-deviation=11.94
maximum=39749.32 minimum=39656.63
```
Write path (before):
```
instructions_per_op:
mean= 50942.86 standard-deviation=97.69
median= 50974.11 median-absolute-deviation=34.25
maximum=51019.23 minimum=50771.60
```
Write path (after):
```
instructions_per_op:
mean= 51000.15 standard-deviation=115.04
median= 51043.93 median-absolute-deviation=52.19
maximum=51065.81 minimum=50795.00
```
Fixes: https://github.com/scylladb/scylladb/issues/14067
Refs: https://github.com/scylladb/scylladb/issues/2871
No backport, as it is a new feature.
Closesscylladb/scylladb#23917
* github.com:scylladb/scylladb:
tests/cluster: Add new storage tests
test/scylla_cluster: Override workdir when passed via cmdline
streaming: Reject incoming migrations
storage_service: extend locator::load_stats to collect per-node critical disk utilization flag
repair_service: Add a facility to disable the service
compaction_manager: Subscribe to out of space controller
compaction_manager: Replace enabled/disabled states with running state
database: Add critical_disk_utilization mode database can be moved to
disk_space_monitor: add subscription API for threshold-based disk space monitoring
docs: Add feature documentation
config: Add critical_disk_utilization_level option
replica/exceptions: Add a new custom replica exception
Currently, in repair_tablet we retrieve session_id from tablet
map (and throw if it isn't specified). In case of topology
coordinator failover, we may end up in a situation where a node
runs outdated repair, treating session of a different operation
as the repair's session:
- topology coordinator starts repair transition (A);
- topology coordinator sends tablet repair rpc to node1;
- topology coordinator is separated from the cluster;
- new topology coordinator is elected;
- new topology coordinator sees waiting repair request (A_2)
and executes it;
- new repair of the same tablet is requested (B);
- new topology coordinator starts repair transition (B);
- new topology coordinator sends tablet repair rpc to node2;
- node2 starts repair (B) as repair master;
- node1 starts repair (A), checks the current session (B), proceeds
with repair (B) as repair master.
Send current session_id in repair_tablet rpc. If this session_id
and session id got from tablet map don't match, an exception
is thrown.
This commit extends the TABLE_LOAD_STATS RPC with information whether
a node operates in the critical disk utilization mode.
This information will be needed to distict between the causes why
a table migration/repair was interrupted.
The new exception `critical_disk_utilization_exception` is thrown
when the user table mutation writes are being blocked due to e.g.
reaching a critical disk utilization level.
This new exception, is then correctly handled on the coordinator
side when transforming into `mutation_write_failure_exception` with
a meaningful error message: "Write rejected due to critical disk
utilization".
The RPC will be used by view building coordinator to attach to and wait
for tasks performed by view building worker (introduced in later
commit).
The RPC gets vector of tasks' ids and returns vector of
`view_task_result`s.
i-th task result reffers to i-th task id.
The central idea of incremental repair is to allow repair participants
to select and repair only a portion of the dataset to speed up the
repair process. All repair participants must utilize an identical
selection method to repair and synchronize the same selected dataset.
There are two primary selection methods: time-based and file-based. The
time-based method selects data within a specified time frame. It is
versatile but it is less efficient because it requires reading all of
the dataset and omitting data beyond the time frame. The file-based
method selects data from unrepaired SSTables and is more efficient
because it allows the entire SSTable to be omitted. This document patch
implements the file-based selection method.
Incremental repair will only be supported for tablet tables; it will not
be supported for vnode tables. On one hand, the legacy vnode is less
important to support. On the other hand, the incremental repair for
vnode is much harder to implement. With vnodes, a SSTalbe could contain
data for multiple vnode ranges. When a given vnode range is repaired,
only a portion of the SSTable is repaired. This complicates the
manipulation of SSTables significantly during both repair and
compaction. With tablets, an entire tablet is repaired so that a
sstable is either fully repaired or not repaired which is a huge
simplification.
This patch uses the repaired_at from sstables::statistics component to
mark a sstable as repaired. It uses a virtual clock as the repair
timestamp, i.e., using a monotonically increasing number for the
repaired_at field of a SSTable and sstables_repaired_at column in
system.tablets table. Notice that when a sstable is not repaired, the
repaired_at field will be set to the default value 0 by default. The
being_repaired in memory field of a SSTable is used to explicitly mark
that a SSTable is being selected. The following variables are used for
incremental repair:
The repaired_at on disk field of a SSTable is used.
- A 64-bit number increases sequentially
The sstables_repaired_at is added to the system.tablets table.
- repaired_at <= sstables_repaired_at means the sstable is repaired
The being_repaired in memory field of a SSTable is added.
- A repair UUID tells which sstable has participated in the repair
Initial test results:
1) Medium dataset results
Node amount: 3
Instance type: i4i.2xlarge
Disk usage per node: ~500GB
Cluster pre-populated with ~500GB of data before starting repairs job.
Results for Repair Timings:
The regular repair run took 210 mins.
Incremental repair 1st run took 183 mins, 2nd and 3rd runs took around 48s
The speedup is: 183 mins / 48s = 228X
2) Small dataset results
Node amount: 3
Instance type: i4i.2xlarge
Disk usage per node: ~167GB
Cluster pre-populated with ~167GB of data before starting the repairs job.
Regular repair 1st run took 110s, 2nd and 3rd runs took 110s.
Incremental repair 1st run took 110 seconds, 2nd and 3rd run took 1.5 seconds.
The speedup is: 110s / 1.5s = 73X
3) Large dataset results
Node amount: 6
Instance type: i4i.2xlarge, 3 racks
50% of base load, 50% read/write
Dataset == Sum of data on each node
Dataset Non-incremental repair (minutes)
1.3 TiB 31:07
3.5 TiB 25:10
5.0 TiB 19:03
6.3 TiB 31:42
Dataset Incremental repair (minutes)
1.3 TiB 24:32
3.0 TiB 13:06
4.0 TiB 5:23
4.8 TiB 7:14
5.6 TiB 3:58
6.3 TiB 7:33
7.0 TiB 6:55
Fixes#22472
The following steps are performed in sequence as part of the
Raft-based recovery procedure:
- set `recovery_leader` to the host ID of the recovery leader in
`scylla.yaml` on all live nodes,
- send the `SIGHUP` signal to all Scylla processes to reload the config,
- perform a rolling restart (with the recovery leader being restarted
first).
These steps are not intuitive and more complicated than they could be.
In this PR, we simplify these steps. From now on, we will be able to
simply set `recovery_leader` on each node just before restarting it.
Apart from making necessary changes in the code, we also update all
tests of the Raft-based recovery procedure and the user-facing
documentation.
Fixesscylladb/scylladb#25015
The Raft-based procedure was added in 2025.2. This PR makes the
procedure simpler and less error-prone, so it should be backported
to 2025.2 and 2025.3.
Closesscylladb/scylladb#25032
* github.com:scylladb/scylladb:
docs: document the option to set recovery_leader later
test: delay setting recovery_leader in the recovery procedure tests
gossip: add recovery_leader to gossip_digest_syn
db: system_keyspace: peers_table_read_fixup: remove rows with null host_id
db/config, gms/gossiper: change recovery_leader to UUID
db/config, utils: allow using UUID as a config option
When repairing a partition with many rows, we can store many fragments
in a repair_row_on_wire object which is sent as a rpc stream message.
This could cause reactor stalls when the rpc stream compression is
turned on, because the compression compresses the whole message without
any split and compression.
This patch solves the problem at the higher level by reducing the
message size that is sent to the rpc stream.
Tests are added to make sure the message split works.
Fixes#24808
With the change in "repair: Avoid too many fragments in a single
repair_row_on_wire", the
std::list<frozen_mutation_fragment> _mfs;
in partition_key_and_mutation_fragments will not contain large number of
fragments any more. Switch to use chunked_vector.
In the new Raft-based recovery procedure, live nodes join the new
group 0 one by one during a rolling restart. There is a time window when
some of them are in the old group 0, while others are in the new group
0. This causes a group 0 mismatch in `gossiper::handle_syn_msg`. The
current solution for this problem is to ignore group 0 mismatches if
`recovery_leader` is set on the local node and to ask the administrator
to perform the rolling restart in the following way:
- set `recovery_leader` in `scylla.yaml` on all live nodes,
- send the `SIGHUP` signal to all Scylla processes to reload the config,
- proceed with the rolling restart.
This commit makes `gossiper::handle_syn_msg` ignore group 0 mismatches
when exactly one of the two gossiping nodes has `recovery_leader` set.
We achieve this by adding `recovery_leader` to `gossip_digest_syn`.
This change makes setting `recovery_leader` earlier on all nodes and
reloading the config unnecessary. From now on, the administrator can
simply restart each node with `recovery_leader` set.
However, note that nodes that join group 0 must have `recovery_leader`
set until all nodes join the new group 0. For example, assume that we
are in the middle of the rolling restart and one of the nodes in the new
group 0 crashes. It must be restarted with `recovery_leader` set, or
else it would reject `gossip_digest_syn` messages from nodes in the old
group 0. To avoid problems in such cases, we will continue to recommend
setting `recovery_leader` in `scylla.yaml` instead of passing it as
a command line argument.
add a new RPC repair_colocated_tablets which is similar to the RPC
tablet_repair, but instead of repairing a single tablet it takes a set
of co-located tablets, repairs them and returns a shared repair_time
result.
This is useful because the way co-located tablets are represented
doesn't allow to repair tablets independently but only as a group
operation, and the repair_time which is stored in the tablet map is
shared with the entire co-location group.
But when repairing a group of co-located tablets we may require a
different behavior, especially considering that co-located tablets are
derived tablets of a special type. For example, we may want to skip
running repair on CDC tablets when repairing the base table.
The new RPC and the storage service function repair_colocated_tablets
allow the flexibility to implement different strategies when repairing
co-located groups.
Currently the implementation is simply to repair each tablet and return
the minimum repair_time as the shared repair time.
Although valid for compact tables, non-full (or empty) clustering key prefixes are not handled for row keys when writing sstables. Only the present components are written, consequently if the key is empty, it is omitted entirely.
When parsing sstables, the parsing code unconditionally parses a full prefix.
This mis-match results in parsing failures, as the parser parses part of the row content as a key resulting in a garbage key and subsequent mis-parsing of the row content and maybe even subsequent partitions.
Introduce a new system table: `system.corrupt_data` and infrastructure similar to `large_data_handler`: `corrupt_data_handler` which abstracts how corrupt data is handled. The sstable writer now passes rows such corrupt keys to the corrupt data handler. This way, we avoid corrupting the sstables beyond parsing and the rows are also kept around in system.corrupt_data for later inspection and possible recovery.
Add a full-stack test which checks that rows with bad keys are correctly handled.
Fixes: https://github.com/scylladb/scylladb/issues/24489
The bug is present in all versions, has to be backported to all supported versions.
Closesscylladb/scylladb#24492
* github.com:scylladb/scylladb:
test/boost/sstable_datafile_test: add test for corrupt data
sstables/mx/writer: handler rows with empty keys
test/lib/cql_assertions: introduce columns_assertions
sstables: add corrupt_data_handler to sstables::sstables
tools/scylla-sstable: make large_data_handler a local
db: introduce corrupt_data_handler
mutation: introduce frozen_mutation_fragment_v2
mutation/mutation_partition_view: read_{clustering,static}_row(): return row type
mutation/mutation_partition_view: extract de-ser of {clustering,static} row
idl-compiler.py: generate skip() definition for enums serializers
idl: extract full_position.idl from position_in_partition.idl
db/system_keyspace: add apply_mutation()
db/system_keyspace: introduce the corrupt_data table
The primary motivation for this change is to reduce the time during which the Effective Replication Map (ERM) is retained by the mapreduce service. This ensures that long aggregate queries do not block topology operations. As ScyllaDB is generally transitioning towards tablets, and using tablets simplifies work dispatching, the decision was made to design the new algorithm specifically for tablets. The goal of the algorithm is to divide the work in such a way that each `tablet_replica` (that is <host, shard> pair) processes two tablets at a time.
The new algorithm can be summarized as follows:
1. Prepare a tablet_replica -> partition_range mapping where the values cover the entire space.
2. For each tablet_replica, in parallel, take two partition ranges and dispatch them to the node hosting the replica. The ERM is released and re-acquired in each iteration, allowing the destination (i.e., tablet_replica) to change for each
artition range (in such cases, the partition range is assigned to the appropriate tablet_replica).
In step 1, the main difference compared to the old algorithm (dispatch_to_vnodes) is that partition ranges are assigned to a tablet_replica rather than just the host.
In step 2, the main difference is that the work is divided into smaller batches, and the ERM is released and re-acquired for each batch.
In the current implementation, each node can correctly handle every partition range, even if the mapreduce supercoordinator does not retain the ERM and the range is absent locally. This is because mapreduce_service::execute_on_this_shard creates a new pager that coordinates the partition range read, including obtaining its own ERM. However, every partition range that is absent locally is handled by shard 0. Therefore, proper routing of partition ranges is necessary to avoid shard 0 overload. This is why, in step 2, the ERM is retained during each batch processing, and the tablet_replica is refreshed for each processed range.
Additionally, shard_id is added to mapreduce request. When shard_id is set, the entire partition range is handled by the specified shard. As the new tablet-aware mapreduce algorithm balances the workload across shards, shard_id ensure that the balance is preserved, even during events such as tablet splits.
This patch series:
- Refactors a bit mapreduce service, to facilitate having two algorithm versions (one for vnodes and one for tablets).
- Implements tablet-aware dispatching algorithm.
- Adds shard_id to mapreduce request and uses the information to handle requests entirely by selected shard.
- Adds test_long_query_timeout_erm to verify the new functionality.
Fixes: scylladb#21831
No backport, as it is rather new feature than a bugfix.
Closesscylladb/scylladb#24383
* github.com:scylladb/scylladb:
mapreduce: add missing comma and space in mapreduce_request operator<<
mapreduce: add shard_id_hint to mapreduce request
test: add test_long_query_timeout_erm
mapreduce: add tablet-aware dispatching algorithm
storage_proxy: make storage_proxy::is_alive public
mapreduce: remove _shared_token_metadata from mapreduce_service
mapreduce: move dispatching logic to dispatch_to_vnodes
mapreduce: remove underscores from variable names
mapreduce: move req_with_modified_pr handling to a new function
mapreduce: change next_vnode lambda to get_next_partition_range function
If a partition range is not present locally,
`partition_ranges_owned_by_this_shard` assigns it to shard 0, which can
overload shard 0. To address this, this commit adds a `shard_id_hint`
to the mapreduce request. When `shard_id_hint` is set, the entire
partition range in the request is handled by the specified shard.
The `shard_id_hint` is set by the new tablet-aware mapreduce algorithm,
introduced in `dispatch_to_tablets`. This algorithm balances the
workload across shards, so the changes in this commit ensure that
load balancing is preserved, even during events such as tablet splits.
Fixes: scylladb#21831
Mirrors frozen_mutation_fragment and shares most of the underlying
serialization code, the only exception is replacing range_tombstone with
range_tombstone_change in the mutation fragment variant.
A future user of position_in_partition.idl doesn't need full_position
and so doesn't want to include full_position.hh to fix compile errors
when including position_in_partition.idl.hh.
Extract it to a separate idl file: it has a single user in a
storage_proxy VERB.
We'd like to change the data layout of `interval` to save space.
As a result, start() and end() which return references to data
members must return objects (not references). Since we'd like
to maintain zero-copy for these functions, we change them to
return objects containing references (rather than references
to objects), avoiding copying of potentially expensive objects.
We repurpose the interval_bound class to hold references (by
instantiating it with `const T&` instead of `T`) and provide
converting constructors. To make transform_bounds() retain
zero-copy, we add start() and end() that take *this by
rvalue reference.
We are about to change start() to return a proxy object rather
than a `const interval_bound<T>&`. This is generally transparent,
except in one case: `auto x = i.start()`. With the current implementation,
we'll copy object referred to and assign it to x. With the planned
implementation, the proxy object will be assigned to `x`, but it
will keep referring to `i`.
To prevent such problems, rename start() to start_ref() and end()
to end_ref(). This forces us to audit all calls, and redirect calls
that will break to new start_copy() and end_copy() methods.
token_range_vector is a linear vector containing intervals
of tokens. It can grow quite large in certain places
and so cause stalls.
Convert it to utils::chunked_vector, which prevents allocation
stalls.
It is not used in any hot path, as it usually describes
vnodes or similar things.
Fixes#3335.
Currently send_gossip_echo has a 22 seconds timeout
during which _abort_source is ignored.
Mark the verb as cancellable so it can be canceled
on shutdown / abort.
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
"
The series makes endpoint state map in the gossiper addressable by host
id instead of ips. The transition has implication outside of the
gossiper as well. Gossiper based topology operations are affected by
this change since they assume that the mapping is ip based.
On wire protocol is not affected by the change as maps that are sent by
the gossiper protocol remain ip based. If old node sends two different
entries for the same host id the one with newer generation is applied.
If new node has two ids that are mapped to the same ip the newer one is
added to the outgoing map.
Interoperability was verified manually by running mixed cluster.
The series concludes the conversion of the system to be host id based.
"
* 'gleb/gossipper-endpoint-map-to-host-id-v2' of github.com:scylladb/scylla-dev:
gossiper: make examine_gossiper private
gossiper: rename get_nodes_with_host_id to get_node_ip
treewide: drop id parameter from gossiper::for_each_endpoint_state
treewide: move gossiper to index nodes by host id
gossiper: drop ip from replicate function parameters
gossiper: drop ip from apply_new_states parameters
gossiper: drop address from handle_major_state_change parameter list
gossiper: pass rpc::client_info to gossiper_shutdown verb handler
gossiper: add try_get_host_id function
gossiper: add ip to endpoint_state
serialization: fix std::map de-serializer to not invoke value's default constructor
gossiper: drop template from wait_alive_helper function
gossiper: move get_supported_features and its users to host id
storage_service: make candidates_for_removal host id based
gossiper: use peers table to detect address change
storage_service: use std::views::keys instead of std::views::transform that returns a key
gossiper: move _pending_mark_alive_endpoints to host id
gossiper: do not allow to assassinate endpoint in raft topology mode
gossiper: fix indentation after previous patch
gossiper: do not allow to assassinate non existing endpoint
Add two verbs needed to implement dictionary training for SSTable
compression.
SAMPLE_SSTABLES returns a list of randomly-selected chunks of Data files
with a given cardinality and using a given chunk size,
for the given table.
ESTIMATE_SSTABLE_VOLUME returns the total uncompressed size of all Data
files the given table.
"
This is series starts conversion of the gossiper to use host ids to
index nodes. It does not touch the main map yet, but converts a lot of
internal code to host id. There are also some unrelated cleanups that
were done while working on the series. On of which is dropping code
related to old shadow round. We replaced shadow round with explicit
GOSSIP_GET_ENDPOINT_STATES verb in cd7d64f588
which is in scylla-4.3.0, so there should be no compatibility problem.
We already dropped a lot of old shadow round code in previous patches
anyway.
I tested manually that old and new node can co-exist in the same
cluster,
"
* 'gleb/gossiper-host-id-v2' of github.com:scylladb/scylla-dev: (33 commits)
gossiper: drop unneeded code
gossiper: move _expire_time_endpoint_map to host_id
gossiper: move _just_removed_endpoints to host id
gossiper: drop unused get_msg_addr function
messaging_service: change connection dropping notification to pass host id only
messaging_service: pass host id to remove_rpc_client in down notification
treewide: pass host id to endpoint_lifecycle_subscriber
treewide: drop endpoint life cycle subscribers that do nothing
load_meter: move to host id
treewide: use host id directly in endpoint state change subscribers
treewide: pass host id to endpoint state change subscribers
gossiper: drop deprecated unsafe_assassinate_endpoint operation
storage_service: drop unused code in handle_state_removed
treewide: drop endpoint state change subscribers that do nothing
gossiper: drop ip address from handle_echo_msg and simplify code since host_id is now mandatory
gossiper: start using host ids to send messages earlier
messaging_service: add temporary address map entry on incoming connection
topology_coordinator: notify about IP change from sync_raft_topology_nodes as well
treewide: move everyone to use host id based gossiper::is_alive and drop ip based one
storage_proxy: drop unused template
...
Send digest ack and ack2 by host ids as well now since the id->ip
mapping is available after receiving digest syn. It allows to convert
more code to host id here.
The patch adds new marker for a verb - [[ip]] that means that for this
verb ip version of the verbs needs to be generated. Most of the verbs
do not need it.