As requested in #22120, moved the files and fixed other includes and build system.
Moved files:
- query.cc
- query-request.hh
- query-result.hh
- query-result-reader.hh
- query-result-set.cc
- query-result-set.hh
- query-result-writer.hh
- query_id.hh
- query_result_merger.hh
Fixes: #22120
This is a cleanup, no need to backport
Closesscylladb/scylladb#25105
If a partition range is not present locally,
`partition_ranges_owned_by_this_shard` assigns it to shard 0, which can
overload shard 0. To address this, this commit adds a `shard_id_hint`
to the mapreduce request. When `shard_id_hint` is set, the entire
partition range in the request is handled by the specified shard.
The `shard_id_hint` is set by the new tablet-aware mapreduce algorithm,
introduced in `dispatch_to_tablets`. This algorithm balances the
workload across shards, so the changes in this commit ensure that
load balancing is preserved, even during events such as tablet splits.
Fixes: scylladb#21831
This test verifies the effectiveness of the mechanism for releasing ERM
introduced in this patch series. In test scenario, during processing of
a query in mapreduce service, reads are intentionally blocked by
an injected error. However, when table uses tablets, ERM is now often
released by the mapreduce service, so the topology is not blocked to the
end of the request. As a result, it is possible to add a new node
before the query finishes.
Refs. scylladb#21831
The primary goal of this change is to reduce the time during which the
Effective Replication Map (ERM) is retained by the mapreduce service.
This ensures that long aggregate queries do not block topology
operations. As ScyllaDB transitions towards tablets, which simplify
work dispatching, the new algorithm is designed specifically for
tablets.
The algorithm divides work so that each `tablet_replica` (a <host,
shard> pair) processes two tablets at a time. After processing of each
`tablet_replica`, the ERM is released and re-acquired.
The new algorithm can be summarized as follows:
1. Prepare a set of exclusive `partition_ranges`, where each range
represents one tablet. This set is called `ranges_left`, because it
contains ranges that still need processing.
2. Loop until `ranges_left` is empty:
I. Create `tablet_replica` -> `ranges` mapping for the current ERM
and `ranges_left`. Store this mapping and the number
representing current ERM version as `ranges_per_replica`.
II. In parallel, for each tablet_replica, iterate through
ranges_per_tablet_replica. Select independently up to two ranges
that are still existing in ranges_left. Remove each range
selected for processing from ranges_left. Before each iteration,
verify that ERM version has not changed. If it has,
return to Step I.
Steps I and II are exclusive to simplify maintaining `ranges_left` and
`ranges_per_replica`:
- Step I iterates through `ranges_left` and creates
`ranges_per_replica`
- Step II iterates through `ranges_per_replica` and remove processed
ranges from `ranges_left`
To maintain the exclusivity, the algorithm uses `parallel_for_each` in
Step II, requiring all ongoing `tablet_replica` processing to finish
before returning to Step I.
Currently, each node can handle any partition range, even if the
mapreduce supercoordinator does not retain the ERM and the range is
absent locally. This is because `execute_on_this_shard` creates a new
pager to coordinate the partition range read, including obtaining its
own ERM. However, absent ranges are handled by shard 0, so proper
routing is necessary to avoid overloading shard 0. Thus, in Step II,
the ERM is retained during each `tablet_replica` processing.
The tablet split scenario is not well-handled in this implementation.
After a split, the entire pre-split range is sent to a node hosting
the `tablet_replica` containing the range's `end_token`. The node
will typically not have other tablets in the range, and as
aforementioned, absent ranges are handled by shard 0. As a result,
in such scenario, shard 0 handles a significant portion of the range.
This issue is addressed later in this patch series by introducing
`shard_id` in `mapreduce_request`.
Ref. scylladb#21831
Before this change, `mapreduce_service` used `_shared_token_metadata`
to get the topology. However, the token was used in a part of the code
that already had its own ERM with its own metadata token. Moreover,
as mapreduce_service's token and ERM's token are not guaranteed to be
the same, inconsistencies could occur.
Therefore, this commit removes `_shared_token_metadata` and its usage.
This commit moves the current dispatching logic of the mapreduce service
to a new dispatch_to_vnodes function. The moved code was written before
tablets were introduced, and although it works with tablets,
the variable naming still refers to vnodes (e.g., vnodes_per_addr,
vnodes_generator).
The motivation for this change is that later in this patch series,
a new algorithm for tablets is introduced, and both algorithms
need to coexist.
Ref. scylladb#21831
This commit removes unnecessary underscores from tr_state_ and
dispatcher_ variable names, that were left after moving code to
a separate function in the previous commit.
The motivation for this change is to enable code reuse when
a new implementation of the mapreduce algorithm for tablets
is introduced later in this patch series.
Ref. scylladb#21831
In parallelized aggregation functions super-coordinator (node performing final merging step) receives and merges each partial result in parallel coroutines (`parallel_for_each`).
Usually responses are spread over time and actual merging is atomic.
However sometimes partial results are received at the similar time and if an aggregate function (e.g. lua script) yields, two coroutines can try to overwrite the same accumulator one after another,
which leads to losing some of the results.
To prevent this, in this patch each coroutine stores merging results in its own context and overwrites accumulator atomically, only after it was fully merged.
Comparing to the previous implementation order of operands in merging function is swapped, but the order of aggregation is not guaranteed anyway.
Fixes#20662Closesscylladb/scylladb#24106
Mapreduce Service exception handling differs for local and remote RPC
calls of dispatch_to_shards. Whereas local exceptions are handled
normally, the remote exceptions are converted to rpc::remote_verb_error
by the framework. This is a substantial difference when
read_failure_exception_with_timeout is thrown during mapreduce query
execution - CQL server waits for the exception from the local call but
not from the remote one.
As we don't want to wait for the timeout in CQL server in either of
the cases, this commit catches the local exception (especially
read_failure_exception_with_timeout) and converts it to
std::runtime_error (the one from which rpc::remote_verb_error inherits).
Ideally, Mapreduce Service should execute dispatch_to_shards through RPC
for both local and remote calls. However, such change negatively affects
tens of Unit Tests that rely on the possibility to run local mapreduce
service without any RPC.
This change:
- Catch local exceptions in Mapreduce Service and convert them
to std::runtime_error.
Replace boost::remove_if() with the standard library's std::erase_if() or std::ranges::remove_if() to reduce external dependencies and simplify the codebase. This change eliminates the requirement for boost::range and makes the implementation more maintainable.
---
it's a cleanup, hence no need to backport.
Closesscylladb/scylladb#22788
* github.com:scylladb/scylladb:
service: migrate from boost::range::remove_if() to std::ranges::remove_if
sstable: migrate from boost::remove_if() to std::erase_if()
Replace boost::range::remove_if() with the standard library's
std::ranges::remove_if() to reduce external dependencies and simplify
the codebase. This change eliminates the requirement for boost::range
and makes the implementation more maintainable.
Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>
Before these changes, shutting down a node could be prolonged because of
mapreduce_service. `mapreduce_service::stop()` uninitializes messaging
service, which includes waiting for all ongoing RPC handlers. We already
had a mechanism for cancelling local mapreduce tasks, but we were missing
one for cancelling external queries.
In this commit, we modify the signature of the request so it supports
cancelling via an abort source. We also provide a reproducer test
for the problem.
Fixesscylladb/scylladb#22337Closesscylladb/scylladb#22651
This adds to the grammar the option to SELECT a specific key in a
collection column using subscript syntax.
For example:
SELECT map['key'] FROM table
SELECT map['key1']['key2'] FROM table
The key can also be parameterized in a prepared query. For this we need
to pass the query options to result_set_builder where we process the
selectors.
Fixesscylladb/scylladb#7751
"
This rather large patch series moves storage proxy and some adjacent
services (like migration manager) to use host ids to identify nodes rather
than ips. Messaging service gains a capability to address nodes by host
ids (which allows dropping translations from topology coordinator code
that worked on host ids already) and also makes sure that a node with
incorrect host id will reject a message (can happen during address
changes).
The series gets rid of the raft address map completely and replaces it with
the gossiper address map which is managed by the gossiper since translation
is now done in the layer below raft.
Fixes: scylladb/scylladb#6403
perf-simple-query -- smp 1 -m 1G output
Before:
enable-cache=1
Running test with config: {partitions=10000, concurrency=100, mode=read, frontend=cql, query_single_key=no, counters=no}
Disabling auto compaction
Creating 10000 partitions...
64336.82 tps ( 63.1 allocs/op, 0.0 logallocs/op, 14.1 tasks/op, 41291 insns/op, 24485 cycles/op, 0 errors)
62669.58 tps ( 63.1 allocs/op, 0.0 logallocs/op, 14.1 tasks/op, 41277 insns/op, 24695 cycles/op, 0 errors)
69172.12 tps ( 63.1 allocs/op, 0.0 logallocs/op, 14.2 tasks/op, 41326 insns/op, 24463 cycles/op, 0 errors)
56706.60 tps ( 63.1 allocs/op, 0.0 logallocs/op, 14.1 tasks/op, 41143 insns/op, 24513 cycles/op, 0 errors)
56416.65 tps ( 63.1 allocs/op, 0.0 logallocs/op, 14.1 tasks/op, 41186 insns/op, 24851 cycles/op, 0 errors)
throughput: mean=61860.35 standard-deviation=5395.48 median=62669.58 median-absolute-deviation=5153.75 maximum=69172.12 minimum=56416.65
instructions_per_op: mean=41244.62 standard-deviation=76.90 median=41276.94 median-absolute-deviation=58.55 maximum=41326.19 minimum=41142.80
cpu_cycles_per_op: mean=24601.35 standard-deviation=167.39 median=24512.64 median-absolute-deviation=116.65 maximum=24851.45 minimum=24462.70
After:
enable-cache=1
Running test with config: {partitions=10000, concurrency=100, mode=read, frontend=cql, query_single_key=no, counters=no}
Disabling auto compaction
Creating 10000 partitions...
65237.35 tps ( 63.1 allocs/op, 0.0 logallocs/op, 14.2 tasks/op, 40733 insns/op, 23145 cycles/op, 0 errors)
59283.09 tps ( 63.1 allocs/op, 0.0 logallocs/op, 14.1 tasks/op, 40624 insns/op, 23948 cycles/op, 0 errors)
70851.03 tps ( 63.1 allocs/op, 0.0 logallocs/op, 14.1 tasks/op, 40625 insns/op, 23027 cycles/op, 0 errors)
70549.61 tps ( 63.1 allocs/op, 0.0 logallocs/op, 14.1 tasks/op, 40650 insns/op, 23266 cycles/op, 0 errors)
68634.96 tps ( 63.1 allocs/op, 0.0 logallocs/op, 14.1 tasks/op, 40622 insns/op, 22935 cycles/op, 0 errors)
throughput: mean=66911.21 standard-deviation=4814.60 median=68634.96 median-absolute-deviation=3638.40 maximum=70851.03 minimum=59283.09
instructions_per_op: mean=40650.89 standard-deviation=47.55 median=40624.60 median-absolute-deviation=27.11 maximum=40733.37 minimum=40622.33
cpu_cycles_per_op: mean=23264.16 standard-deviation=402.12 median=23145.29 median-absolute-deviation=237.63 maximum=23947.96 minimum=22934.59
CI: https://jenkins.scylladb.com/job/scylla-master/job/scylla-ci/13531/
SCT (longevity-100gb-4h with nemesis_selector: ['topology_changes']): https://jenkins.scylladb.com/view/staging/job/scylla-staging/job/gleb/job/move-to-host-id/3/
Tested mixed cluster manually.
"
* 'gleb/move-to-host-id-v2' of github.com:scylladb/scylla-dev: (55 commits)
group0: drop unused field from replace_info struct
test: rename raft_address_map_test to address_map_test and move if from raft tests
raft_address_map: remove raft address map
topology coordinator: do not modify expire state for left/new nodes any more in raft address map
topology coordinator: drop expiring entries in gossiper address map on error injections since raft one is no longer used
group0: drop raft address map dependency from raft_rpc
group0: move raft_ticker_type definition from raft_address_map.hh
storage_service: do not update raft address map on gossiper events
group0: drop raft address map dependency from raft_server_with_timeouts
group0: move group0 upgrade code to host ids
repair: drop raft address map dependency
group0: remove unused raft address map getter from raft_group0
group0: drop raft address map from group0_state_machine dependency since it is not used there any more
group0: remove dependency on raft address map from group0_state_id_handler
gossiper: add get_application_state_ptr that searches by host_id
gossiper: change get_live_token_owners to return host ids
view: move view building to host id
hints: use host id to send hints
storage_proxy: remove id_vector_to_addr since it is no longer used
db: consistency_level: change is_sufficient_live_nodes to work on host ids
...
now that we are allowed to use C++23. we now have the luxury of using
`std::views::transform`.
in this change, we:
- replace `boost::adaptors::transformed` with `std::views::transform`
- use `fmt::join()` when appropriate where `boost::algorithm::join()`
is not applicable to a range view returned by `std::view::transform`.
- use `std::ranges::fold_left()` to accumulate the range returned by
`std::view::transform`
- use `std::ranges::fold_left()` to get the maximum element in the
range returned by `std::view::transform`
- use `std::ranges::min()` to get the minimal element in the range
returned by `std::view::transform`
- use `std::ranges::equal()` to compare the range views returned
by `std::view::transform`
- remove unused `#include <boost/range/adaptor/transformed.hpp>`
- use `std::ranges::subrange()` instead of `boost::make_iterator_range()`,
to feed `std::views::transform()` a view range.
to reduce the dependency to boost for better maintainability, and
leverage standard library features for better long-term support.
this change is part of our ongoing effort to modernize our codebase
and reduce external dependencies where possible.
limitations:
there are still a couple places where we are still using
`boost::adaptors::transformed` due to the lack of a C++23 alternative
for `boost::join()` and `boost::adaptors::uniqued`.
Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>
Closesscylladb/scylladb#21700
In this rather large path we mode to address nodes in storage proxy by
host ids instead of ips. Some subsystems storage proxy calls to are
not yet converted to host ids, so we translate back and forth when we
interact with them.
these unused includes are identified by clang-include-cleaner. after
auditing the source files, all of the reports have been confirmed.
please note, because `mutation/mutation.hh` does not include
`seastar/coroutine/maybe_yield.hh` anymore, and quite a few source
files were relying on this header to bring in the declaration of
`maybe_yield()`, we have to include this header in the places where
this symbol is used. the same applies to `seastar/core/when_all.hh`.
Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>
`coroutine::parallel_for_each` accepts both a range and a pair of
iterators. let's use the former when appropriate. it is simpler this way.
Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>
Closesscylladb/scylladb#21684
Our "sstring_view" is an historic alias for the standard std::string_view.
The patch changes the last remaining random uses of this old alias across
our source directory to the standard type name.
After this patch, there are no more uses of the "sstring_view" alias.
It will be removed in the following patch.
Refs #4062.
Signed-off-by: Nadav Har'El <nyh@scylladb.com>
the log.hh under the root of the tree was created keep the backward
compatibility when seastar was extracted into a separate library.
so log.hh should belong to `utils` directory, as it is based solely
on seastar, and can be used all subsystems.
in this change, we move log.hh into utils/log.hh to that it is more
modularized. and this also improves the readability, when one see
`#include "utils/log.hh"`, it is obvious that this source file
needs the logging system, instead of its own log facility -- please
note, we do have two other `log.hh` in the tree.
Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>
Simplify the function by converting it to a coroutine.
Note that while the final co_return co_await looks like a loop (and
therefore an await would introduce an O(n) allocation), it really isn't -
we retry at most once.
This is done to ease code reuse in the following commit.
It'd also help should we ever want properly mount functions
class to schema object instead of static storage.
forward_service is nondescriptive and misnamed, as it does more than
forward requests. It's a classic map/reduce algorithm (and in fact one
of its parameters is "reducer"), so name it accordingly.
The name "forward" leaked into the wire protocol for the messaging
service RPC isolation cookie, so it's kept there. It's also maintained
in the name of the logger (for "nodetool setlogginglevel") for
compatibility with tests.
Closesscylladb/scylladb#19444