Compare commits


46 Commits

Author SHA1 Message Date
Kamil Braun
33901e1681 Fix version numbers in upgrade page title 2022-11-02 15:36:51 +01:00
guy9
097a65df9f adding top banner to the Docs website with a link to the ScyllaDB University fall LIVE event
Closes #11873
2022-11-02 10:20:40 +02:00
Nadav Har'El
b9d88a3601 cql/pytest: add reproducer for timestamp column validation issue
This patch adds a reproducing test for issue #11588, which is still open
so the test is expected to fail on Scylla ("xfail") and passes on Cassandra.

The test shows that Scylla allows an out-of-range value to be written to a
timestamp column, but then it can't be read back.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>

Closes #11864
2022-11-01 08:11:01 +02:00
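The read-back failure described above can be illustrated outside Scylla: CQL `timestamp` values are signed 64-bit millisecond counts since the Unix epoch, while client-side date types cover a much narrower range. A minimal Python sketch (not the actual test from the patch) of why an out-of-range stored value cannot be converted back:

```python
from datetime import datetime, timezone

def millis_to_datetime(ms):
    # CQL timestamps are signed 64-bit milliseconds since the Unix epoch;
    # Python's datetime only covers years 1..9999, so huge values overflow.
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)

in_range = millis_to_datetime(1_667_300_000_000)  # a valid 2022 timestamp
try:
    millis_to_datetime(2**62)  # far outside the representable range
    overflowed = False
except (OverflowError, ValueError, OSError):
    overflowed = True
```

A server that accepts the write but cannot render the value on read exhibits exactly this asymmetry.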
Botond Dénes
dc46bfa783 Merge 'Prepare repair for task manager integration' from Aleksandra Martyniuk
The PR prepares repair for task manager integration:
- Creates repair_module
- Keeps repair_module in repair_service
- Moves tracker methods to repair_module
- Changes UUID to task_id in repair module

Closes #11851

* github.com:scylladb/scylladb:
  repair: check shutdown with abort source in repair module
  repair: use generic module gate for repair module operations
  repair: move tracker to repair module
  repair: move next_repair_command to repair_module
  repair: generate repair id in repair module
  repair: keep shard number in repair_uniq_id
  repair: change UUID to task_id
  repair: add task_manager::module to repair_service
  repair: create repair module and task
2022-11-01 08:05:14 +02:00
Aleksandra Martyniuk
f2fe586f03 repair: check shutdown with abort source in repair module
In the repair module, shutdown can be checked using the abort_source.
Thus, we can get rid of the shutdown flag.
2022-10-31 10:57:29 +01:00
Aleksandra Martyniuk
2d878cc9b5 repair: use generic module gate for repair module operations
The repair module uses a gate to prevent starting new tasks on shutdown.
The generic module's gate serves the same purpose, so we can
use it in the repair-specific context as well.
2022-10-31 10:56:36 +01:00
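The gate pattern mentioned above can be sketched in a few lines of Python (a simplified, single-threaded stand-in for Seastar's gate; class and method names here are illustrative, not Scylla's API):

```python
class Gate:
    """Counts in-flight operations and rejects new ones once closed."""
    def __init__(self):
        self._count = 0
        self._closed = False

    def enter(self):
        # starting a new task through a closed gate is an error
        if self._closed:
            raise RuntimeError("gate closed")
        self._count += 1

    def leave(self):
        self._count -= 1

    def close(self):
        # new enter() calls now fail; in-flight operations drain normally
        self._closed = True

g = Gate()
g.enter()
g.leave()
g.close()
try:
    g.enter()
    rejected = False
except RuntimeError:
    rejected = True
```

Sharing one such gate between the generic module and the repair-specific paths avoids tracking shutdown state twice.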
Aleksandra Martyniuk
4aae7e9026 repair: move tracker to repair module
Since the tracker and repair_module serve a similar purpose,
it is confusing where to look for methods connected to them.
Thus, to make it more transparent, the tracker class is deleted and all
its attributes and methods are moved to repair_module.
2022-10-31 10:55:36 +01:00
Aleksandra Martyniuk
a5c05dcb60 repair: move next_repair_command to repair_module
The repair operation number was counted twice: with
next_repair_command from the tracker and with the sequence number
from task_manager::module.

To get rid of this redundancy, next_repair_command was deleted and all
methods using its value were moved to repair_module.
2022-10-31 10:54:39 +01:00
Aleksandra Martyniuk
c81260fb8b repair: generate repair id in repair module
repair_uniq_id for repair task can be generated in repair module
and accessed from the task.
2022-10-31 10:54:24 +01:00
Aleksandra Martyniuk
6432a26ccf repair: keep shard number in repair_uniq_id
The execution shard is one of the traits specific to repair tasks.
A child task should be able to freely access the shard id of its parent.
Thus, the shard number is kept in the repair_uniq_id struct.
2022-10-31 10:41:17 +01:00
guy9
276ec377c0 removed broken roadmap link
Closes #11854
2022-10-31 11:33:03 +02:00
Aleksandra Martyniuk
e2c7c1495d repair: change UUID to task_id
Change the type of the repair id from utils::UUID to task_id to
distinguish repair ids from the ids of other entities.
2022-10-31 10:07:08 +01:00
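A strong-typedef wrapper is the usual way to make ids of different entities non-interchangeable. Scylla's `task_id` is a C++ type; the following hypothetical Python sketch only illustrates the idea:

```python
import uuid

class TaskId:
    """Thin wrapper making task ids a distinct type from raw UUIDs."""
    __slots__ = ("uuid",)

    def __init__(self, u=None):
        self.uuid = u if u is not None else uuid.uuid4()

    def __eq__(self, other):
        # a TaskId never compares equal to a bare UUID, even with the
        # same underlying value -- that is the whole point
        return isinstance(other, TaskId) and self.uuid == other.uuid

    def __hash__(self):
        return hash((TaskId, self.uuid))

raw = uuid.uuid4()
tid = TaskId(raw)
```

With this, accidentally passing a repair id where, say, a table id is expected becomes a visible type error rather than a silent mix-up.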
Aleksandra Martyniuk
dc80af33bc repair: add task_manager::module to repair_service
repair_service keeps a shared pointer to repair_module.
2022-10-31 10:04:50 +01:00
Aleksandra Martyniuk
576277384a repair: create repair module and task
Create repair_task_impl and repair_module, inheriting respectively from
the task manager's task_impl and module, to integrate repair operations
with the task manager.
2022-10-31 10:04:48 +01:00
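The inheritance relationship this commit sets up can be sketched as follows (a hypothetical Python outline; the real types are C++ classes in the task manager):

```python
import uuid

class TaskImpl:
    """Base task implementation, as in task_manager::task::impl."""
    def __init__(self, task_id, module):
        self.task_id = task_id
        self.module = module

class Module:
    """Base task-manager module; tracks the tasks it owns."""
    def __init__(self, name):
        self.name = name
        self.tasks = {}

    def register(self, task):
        self.tasks[task.task_id] = task

class RepairTaskImpl(TaskImpl):
    """Repair-specific task, inheriting the generic task behavior."""
    pass

class RepairModule(Module):
    """Repair-specific module: creates and registers repair tasks."""
    def make_repair_task(self):
        task = RepairTaskImpl(uuid.uuid4(), self)
        self.register(task)
        return task

module = RepairModule("repair")
task = module.make_repair_task()
```

The point of the subclassing is that repair operations become visible to the generic task-manager machinery (listing, filtering, parent/child tracking) for free.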
Takuya ASADA
159bc7c7ea install-dependencies.sh: use binary distributions of PIP package
We currently avoid compiling C code in "pip3 install scylla-driver", but
we actually provide portable binary distributions of the package,
so we should use them via "pip3 install --only-binary=:all: scylla-driver".
The binary distribution contains the dependency libraries, so we won't have
problems loading it on relocatable python3.

Closes #11852
2022-10-31 10:38:36 +02:00
Botond Dénes
139fbb466e Merge 'Task manager extension' from Aleksandra Martyniuk
The PR adds changes to the task manager that allow more convenient integration with modules.

Introduced changes:
- adds an internal flag in task::impl that allows the user to filter out overly specific tasks
- renames `parent_data` to the more appropriate name `task_info`
- creates `tasks/types.hh`, which allows using some types connected with the task manager without including the whole task manager
- adds a more flexible version of the `make_task` method

Closes #11821

* github.com:scylladb/scylladb:
  tasks: add alternative make_task method
  tasks: rename parent_data to task_info and move it
  tasks: move task_id to tasks/types.hh
  tasks: add internal flag for task_manager::task::impl
2022-10-31 09:57:10 +02:00
Botond Dénes
2c021affd1 Merge 'storage_service, repair: use per-shard abort_source' from Benny Halevy
Prevent copying shared_ptr across shards
in do_sync_data_using_repair by allocating
a shared_ptr<abort_source> per shard in
node_ops_meta_data and respectively in node_ops_info.

Fixes #11826

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>

Closes #11827

* github.com:scylladb/scylladb:
  repair: use sharded abort_source to abort repair_info
  repair: node_ops_info: add start and stop methods
  storage_service: node_ops_abort_thread: abort all node ops on shutdown
  storage_service: node_ops_abort_thread: co_return only after printing log message
  storage_service: node_ops_meta_data: add start and stop methods
  repair: node_ops_info: prevent accidental copy
2022-10-31 09:43:34 +02:00
Tenghuan He
e0948ba199 Add directory change instruction
Add a directory change instruction for building Scylla

Closes #11717
2022-10-30 23:53:02 +02:00
Pavel Emelyanov
477e0c967a scylla-gdb: Evaluate LSA object sizes dynamically
The lsa-segment command tries to walk LSA segment objects by decoding
their descriptors and (!) object sizes as well. Some objects in LSA have
dynamic sizes, i.e. sizes that depend on the object contents. The script
tries to drill down into the object internals to get this size, but the
bad news is that nowadays there are many dynamic objects that are not
covered. Once it steps on an unsupported object, scylla-gdb likely stops,
because the "next" descriptor happens to be in the middle of the object
and its parsing throws.

This patch fixes this by taking advantage of the virtual size() call of
the migrate_fn_type all LSA objects are linked with (indirectly). It
gets the migrator object, the LSA object itself and calls

  ((migrate_fn_type*)<migrator_ptr>)->size((const void*)<object_ptr>)

with gdb. The evaluated value is the live dynamic size of the object.

fixes: #11792
refs: #2455

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Closes #11847
2022-10-28 14:11:30 +03:00
Botond Dénes
74c9aa3a3f Merge 'removenode: allow specifying nodes to ignore using host_id' from Benny Halevy
Currently, when specifying nodes to ignore for replace or removenode,
we support specifying them only by their ip address.

As discussed in https://github.com/scylladb/scylladb/issues/11839 for removenode,
we intentionally require the host uuid for specifying the node to remove,
so the nodes to ignore (which are also down, otherwise we would not need to
ignore them) should be consistent with that and be specified using their host_id.

The series extends the apis and allows either the nodes' ip address or their
host_id to be specified, for backward compatibility.

We should deprecate the ip address method over time and convert the tests and
management software to use the ignored nodes' host_ids instead.

Closes #11841

* github.com:scylladb/scylladb:
  api: doc: remove_node: improve summary
  api, service: storage_service: removenode: allow passing ignore_nodes as uuid:s
  storage_service: get_ignore_dead_nodes_for_replace: use tm.parse_host_id_and_endpoint
  locator: token_metadata: add parse_host_id_and_endpoint
  api: storage_service: remove_node: validate host_id
2022-10-28 13:35:04 +03:00
Benny Halevy
335a8cc362 api: doc: remove_node: improve summary
The current summary of the operation is obscure.
It refers to a token in the ring and the endpoint associated with it,
while the operation uses a host_id to identify a whole node.

Instead, clarify the summary to refer to a node in the cluster,
consistent with the description for the host_id parameter.
Also, describe the effect the call has on the data the removed node
logically owned.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2022-10-28 07:52:37 +03:00
Benny Halevy
9ef2631ec2 api, service: storage_service: removenode: allow passing ignore_nodes as uuid:s
Currently the api is inconsistent: it requires a uuid for the
host_id of the node to be removed, while the ignored nodes list
is given as comma-separated ip addresses.

Instead, support identifying the ignored_nodes either
by their host_id (uuid) or ip address.

Also, require all ignore_nodes to be of the same kind:
either UUIDs or ip addresses, as a mix of the two likely
indicates a user error.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2022-10-28 07:49:03 +03:00
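The parsing and same-kind validation described above can be sketched with Python's standard `uuid` and `ipaddress` modules (function names here are illustrative, not the actual C++ API):

```python
import ipaddress
import uuid

def parse_host_id_or_endpoint(s):
    """Accept either a host_id (UUID) or an ip address."""
    try:
        return ("host_id", str(uuid.UUID(s)))
    except ValueError:
        # not a UUID; ip_address() raises ValueError on garbage too
        return ("endpoint", str(ipaddress.ip_address(s)))

def parse_ignore_nodes(csv):
    nodes = [parse_host_id_or_endpoint(n.strip())
             for n in csv.split(",") if n.strip()]
    kinds = {kind for kind, _ in nodes}
    if len(kinds) > 1:
        raise ValueError("All nodes should be identified using the same "
                         "method: either Host IDs or ip addresses.")
    return nodes

nodes = parse_ignore_nodes("192.168.1.4,192.168.1.5")
try:
    parse_ignore_nodes("192.168.1.4,8d5ed9f4-7764-4dbd-bad8-43fddce94b7c")
    mixed_rejected = False
except ValueError:
    mixed_rejected = True
```

Rejecting a mixed list early turns a likely copy-paste mistake into an immediate, explicit error instead of a half-applied operation.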
Benny Halevy
40cd685371 storage_service: get_ignore_dead_nodes_for_replace: use tm.parse_host_id_and_endpoint
Allow specifying the dead node to ignore either as host_id
or ip address.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2022-10-28 07:38:13 +03:00
Benny Halevy
b74807cb8a locator: token_metadata: add parse_host_id_and_endpoint
To be used for specifying nodes either by their
host_id or ip address and using the token_metadata
to resolve the mapping.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2022-10-28 07:38:13 +03:00
Benny Halevy
340a5a0c94 api: storage_service: remove_node: validate host_id
The node to be removed must be identified by its host_id.
Validate that at the api layer and pass the parsed host_id
down to storage_service::removenode.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2022-10-28 07:38:13 +03:00
Takuya ASADA
464b5de99b scylla_setup: allow symlink to --disks option
Currently, the --disks option does not accept symlinks such as
/dev/disk/by-uuid/* or /dev/disk/azure/*.

To allow using them, is_unused_disk() should resolve the symlink to
its realpath before evaluating the disk path.

Fixes #11634

Closes #11646
2022-10-28 07:24:11 +03:00
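The fix boils down to one `os.path.realpath` call before the existing checks. A small self-contained sketch (using a temp directory as a stand-in for /dev, since real device nodes need root):

```python
import os
import tempfile

def resolve_disk(dev):
    # /dev/disk/by-uuid/* entries are symlinks; resolve them to the real
    # device path before evaluating the disk, as is_unused_disk() now does
    return os.path.realpath(dev)

with tempfile.TemporaryDirectory() as d:
    real = os.path.join(d, "nvme0n1")       # stand-in for a device node
    open(real, "w").close()
    link = os.path.join(d, "by-uuid-1234")  # stand-in for the symlink
    os.symlink(real, link)
    resolved = resolve_disk(link)
    matches = resolved == os.path.realpath(real)
    base = os.path.basename(resolved)
```

After resolution, the `/sys/class/block/<name>` lookup works because `<name>` is the real device name, not the symlink's.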
Botond Dénes
b744036840 Merge 'scylla_util.py: on sysconfig_parser, don't use double quote when it's possible' from Takuya ASADA
It seems the distributions' original sysconfig files do not use double
quotes to set a parameter when the value does not contain spaces.
Add a function to detect spaces in the value and skip the double
quotes when none are detected.

Fixes #9149

Closes #9153

* github.com:scylladb/scylladb:
  scylla_util.py: adding unescape for sysconfig_parser
  scylla_util.py: on sysconfig_parser, don't use double quote when it's possible
2022-10-28 07:19:13 +03:00
Benny Halevy
44e1058f63 docs: nodetool/removenode: fix host_id in examples
removenode host_id must specify the host ID as a UUID,
not an ip address.

Fixes #11839

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>

Closes #11840
2022-10-27 14:29:36 +03:00
Benny Halevy
0ea8250e83 repair: use sharded abort_source to abort repair_info
Currently we use a single shared_ptr<abort_source>
that can't be copied across shards.

Instead, use a sharded<abort_source> in node_ops_info so that each
repair_info instance will use an (optional) abort_source*
on its own shard.

Added respective start and stop methods, plus a local_abort_source
getter to get the shard-local abort_source (if available).

Fixes #11826

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2022-10-27 12:18:30 +03:00
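The per-shard layout this series introduces can be sketched as follows (a hypothetical Python model of Seastar's `sharded<abort_source>`; the real thing lives in C++ with one instance per reactor shard):

```python
class AbortSource:
    """One shard-local abort flag."""
    def __init__(self):
        self.aborted = False

    def request_abort(self):
        self.aborted = True

class ShardedAbortSource:
    """One abort_source per shard; each shard only touches its local one,
    so nothing shared needs to be copied across shards."""
    def __init__(self, n_shards):
        self._per_shard = [AbortSource() for _ in range(n_shards)]

    def local(self, shard):
        # the shard-local getter, analogous to local_abort_source()
        return self._per_shard[shard]

    def request_abort_all(self):
        for a in self._per_shard:
            a.request_abort()

s = ShardedAbortSource(4)
s.local(2).request_abort()
shard2_aborted = s.local(2).aborted
shard0_aborted = s.local(0).aborted
```

Each repair_info then holds only a pointer to its own shard's abort_source, which is safe without any cross-shard sharing.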
Benny Halevy
88f993e5ed repair: node_ops_info: add start and stop methods
Prepare for adding a sharded<abort_source> member.

Wire start/stop in storage_service::node_ops_meta_data.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2022-10-27 12:18:30 +03:00
Benny Halevy
c2f384093d storage_service: node_ops_abort_thread: abort all node ops on shutdown
A later patch adds a sharded<abort_source> to node_ops_info.
On shutdown, we must stop it in an orderly way, so use the
node_ops_abort_thread shutdown path (where node_ops_signal_abort is
called with a nullopt) to abort (and stop) all outstanding node ops
by passing a null_uuid to node_ops_abort, letting it iterate over all
node ops to abort and stop them.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2022-10-27 12:14:06 +03:00
Benny Halevy
0efd290378 storage_service: node_ops_abort_thread: co_return only after printing log message
Currently the function co_returns if (!uuid_opt),
so the log info message indicating it has stopped
is not printed.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2022-10-27 12:14:03 +03:00
Benny Halevy
47e4761b4e storage_service: node_ops_meta_data: add start and stop methods
Prepare for starting and stopping repair node_ops_info

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2022-10-27 12:14:03 +03:00
Benny Halevy
5c25066ea7 repair: node_ops_info: prevent accidental copy
Delete node_ops_info copy and move constructors before
we add a sharded<abort_source> member for the per-shard repairs
in the next patch.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2022-10-27 12:14:03 +03:00
Takuya ASADA
cd6030d5df scylla_util.py: adding unescape for sysconfig_parser
Even though we have __escape() for escaping '"' in the middle of a value
when writing the sysconfig file, we didn't unescape when reading from it.
So add __unescape() and call it in get().
2022-10-27 16:39:47 +09:00
Takuya ASADA
de57433bcf scylla_util.py: on sysconfig_parser, don't use double quote when it's possible
It seems the distributions' original sysconfig files do not use double
quotes to set a parameter when the value does not contain spaces.
Add a function to detect spaces in the value and skip the double
quotes when none are detected.

Fixes #9149
2022-10-27 16:36:27 +09:00
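The two sysconfig commits above fit together as a small escape/unescape/quote trio; a simplified sketch of the logic (standalone functions rather than the actual `sysconfig_parser` methods):

```python
def escape(val):
    # protect embedded double quotes before writing KEY="..."
    return val.replace('"', '\\"')

def unescape(val):
    # inverse transform, applied when reading the value back
    return val.replace('\\"', '"')

def format_line(key, val):
    # quote only when the value contains whitespace, matching the style
    # of the distributions' original sysconfig files
    need_quotes = any(ch.isspace() for ch in val)
    esc = escape(val)
    return f'{key}="{esc}"' if need_quotes else f'{key}={esc}'
```

So a space-free value stays unquoted, a spaced value gets quoted, and escaping round-trips.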
Aleksandra Martyniuk
6494de9bb0 tasks: add alternative make_task method
Task manager tasks should be created with the make_task method, since
it properly sets up the child-parent relationship
between tasks. However, sometimes we may want to keep additional
task data in classes inheriting from task_manager::task::impl.
Doing so with the existing make_task method is impossible, since
implementation objects are created internally.

The commit adds a new make_task that allows the caller to provide a
task implementation pointer it has created. All fields except those
connected with children and the parent should be set beforehand.
2022-10-26 14:01:05 +02:00
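The factory shape described above, where the caller constructs the implementation and make_task only wires parent/child links, can be sketched like this (hypothetical Python names, not the C++ signatures):

```python
class TaskImpl:
    """Base implementation; links are filled in by the module."""
    def __init__(self):
        self.parent = None
        self.children = []

class CustomTaskImpl(TaskImpl):
    """Caller-defined subclass carrying module-specific state."""
    def __init__(self, extra_data):
        super().__init__()
        self.extra_data = extra_data

class Module:
    def make_task(self, impl, parent=None):
        # the impl arrives fully constructed by the caller;
        # only the child-parent relationship is set up here
        impl.parent = parent
        if parent is not None:
            parent.children.append(impl)
        return impl

m = Module()
parent = m.make_task(CustomTaskImpl("parent-data"))
child = m.make_task(CustomTaskImpl("child-data"), parent=parent)
```

This keeps subclass construction in the caller's hands while the module still owns the bookkeeping it must control.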
Aleksandra Martyniuk
10d11a7baf tasks: rename parent_data to task_info and move it
The parent_data struct contains info that is common to each task,
not only in the parent-child relationship context. To use it this way
without confusion, its name is changed to task_info.

In order to be able to use task_info widely and comfortably,
it is moved from tasks/task_manager.hh to tasks/types.hh
and slightly extended.
2022-10-26 14:01:05 +02:00
Aleksandra Martyniuk
9ecc2047ac tasks: move task_id to tasks/types.hh 2022-10-26 14:01:05 +02:00
Aleksandra Martyniuk
e2e8a286cc tasks: add internal flag for task_manager::task::impl
It is convenient to create many different task implementations
representing more and more specific parts of an operation in
a module. Presenting all of them through the api makes it cumbersome
for the user to navigate and track, though.

An internal flag is added to task_manager::task::impl so that such tasks
can be filtered out before they are sent to the user.
2022-10-26 14:01:05 +02:00
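The filtering this flag enables (also visible in the task_manager api diff further below, where an `internal` query parameter gates the filter) amounts to the following; a Python sketch with illustrative names:

```python
class Task:
    def __init__(self, name, internal=False):
        self.name = name
        self.internal = internal

def list_tasks(tasks, show_internal=False):
    # hide internal (overly specific) tasks unless explicitly requested
    return [t.name for t in tasks if show_internal or not t.internal]

tasks = [
    Task("repair"),                          # the user-facing task
    Task("repair/shard-3", internal=True),   # an internal sub-task
]
```

By default only the user-facing task is listed; passing the flag exposes the full hierarchy.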
Pavel Emelyanov
e245780d56 gossiper: Request topology states in shadow round
When doing the shadow round for replacement, the bootstrapping node needs
to know the dc/rack info about the node it replaces in order to configure
it in the topology. This topology info is later used by e.g. the repair
service.

fixes: #11829

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Closes #11838
2022-10-25 13:21:20 +03:00
Pavel Emelyanov
64c9359443 storage_proxy: Don't use default-initialized endpoint in get_read_executor()
After calling filter_for_query(), the extra_replica to speculate to may
be left default-initialized, which is the :0 ipv6 address. Later, this
address is used as-is to check whether it belongs to the same DC, which
is not nice, as :0 is not the address of any existing endpoint.

The recent move of dc/rack data onto topology made this place reveal itself
by emitting an internal error due to :0 not being present in the
topology's collection of endpoints. Prior to this move, the dc filter
would count :0 as belonging to the "default_dc" datacenter, which may or
may not match the dc of the local node.

The fix is to explicitly distinguish a set extra_replica from an unset one.

fixes: #11825

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Closes #11833
2022-10-25 09:16:50 +03:00
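The diff further below shows the C++ fix: the `gms::inet_address* extra` out-parameter becomes `std::optional<gms::inet_address>*`. The general technique, returning an explicit "unset" instead of a default-constructed sentinel value that looks like real data, can be sketched in Python:

```python
from typing import Optional

def pick_extra_replica(candidates, preferred) -> Optional[str]:
    """Return a replica to speculate to, or None.

    Returning None instead of a default-initialized sentinel (like the
    ':0' address) means callers cannot mistake 'unset' for a real
    endpoint when doing later checks such as same-DC filtering."""
    for node in candidates:
        if node not in preferred:
            return node
    return None

extra = pick_extra_replica(["10.0.0.1", "10.0.0.2"], {"10.0.0.1"})
none_extra = pick_extra_replica(["10.0.0.1"], {"10.0.0.1"})
```

The node addresses above are made up for illustration; the point is that downstream code must now handle the `None` case explicitly rather than comparing against a bogus address.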
Takuya ASADA
1a11a38add unified: move unified package contents to sub-directory
Most software distribution tar.gz archives have a sub-directory containing
everything, to prevent extracting contents into the current directory.
We should follow this style in our unified package too.

To do this, we need to increment the relocatable package version to '3.0'.

Fixes #8349

Closes #8867
2022-10-25 08:58:15 +03:00
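The packaging convention this commit adopts, every archive member prefixed with a single top-level directory, can be demonstrated with Python's `tarfile` (the directory name below is hypothetical, not the actual package layout):

```python
import io
import tarfile

def make_unified_tar(files, topdir):
    """Build an in-memory tar.gz placing all members under one top-level
    directory, so extraction never spills into the current directory."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        for name, data in files.items():
            info = tarfile.TarInfo(f"{topdir}/{name}")
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))
    buf.seek(0)
    return buf

buf = make_unified_tar({"install.sh": b"#!/bin/sh\n"}, "scylla-unified-5.1")
with tarfile.open(fileobj=buf, mode="r:gz") as tar:
    names = tar.getnames()
```

Every member path starts with the top directory, which is what `tar xf` users expect from a release tarball.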
Takuya ASADA
a938b009ca scylla_raid_setup: run uuidpath existence check only after mount failed
We added a UUID device file existence check in #11399; we expect the UUID
device file to be created before checking, and we wait for its creation with
"udevadm settle" after "mkfs.xfs".

However, we actually get an error saying the UUID device file is missing,
which probably means "udevadm settle" doesn't guarantee the device file has
been created under some conditions.

To avoid the error, use var-lib-scylla.mount to wait for the UUID device
file to be ready, and run the file existence check when the service
fails.

Fixes #11617

Closes #11666
2022-10-25 08:54:21 +03:00
Yaniv Kaul
cec21d10ed docs: Fix typo (patch -> batch)
See subject.

Closes #11837
2022-10-25 08:50:44 +03:00
Tomasz Grabiec
687df05e28 db: make_forwardable::reader: Do not emit range_tombstone_change with position past the range
Since the end bound is exclusive, the end position should be
before_key(), not after_key().

This affects only tests; as far as I know, only there can we get an end
bound which is a clustering row position.

It would cause failures once the row cache is switched to the v2
representation, because of violated assumptions about positions.

Introduced in 76ee3f029c

Closes #11823
2022-10-24 17:06:52 +03:00
41 changed files with 635 additions and 313 deletions


@@ -1228,7 +1228,7 @@
"operations":[
{
"method":"POST",
"summary":"Removes token (and all data associated with enpoint that had it) from the ring",
"summary":"Removes a node from the cluster. Replicated data that logically belonged to this node is redistributed among the remaining nodes.",
"type":"void",
"nickname":"remove_node",
"produces":[
@@ -1245,7 +1245,7 @@
},
{
"name":"ignore_nodes",
"description":"List of dead nodes to ingore in removenode operation",
"description":"Comma-separated list of dead nodes to ignore in removenode operation. Use the same method for all nodes to ignore: either Host IDs or ip addresses.",
"required":false,
"allowMultiple":false,
"type":"string",


@@ -49,6 +49,14 @@
"type":"string",
"paramType":"path"
},
{
"name":"internal",
"description":"Boolean flag indicating whether internal tasks should be shown (false by default)",
"required":false,
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
},
{
"name":"keyspace",
"description":"The keyspace to query about",


@@ -69,6 +69,11 @@ sstring validate_keyspace(http_context& ctx, const parameters& param) {
return validate_keyspace(ctx, param["keyspace"]);
}
locator::host_id validate_host_id(const sstring& param) {
auto hoep = locator::host_id_or_endpoint(param, locator::host_id_or_endpoint::param_type::host_id);
return hoep.id;
}
// splits a request parameter assumed to hold a comma-separated list of table names
// verify that the tables are found, otherwise a bad_param_exception exception is thrown
// containing the description of the respective no_such_column_family error.
@@ -715,20 +720,23 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
});
ss::remove_node.set(r, [&ss](std::unique_ptr<request> req) {
auto host_id = req->get_query_param("host_id");
auto host_id = validate_host_id(req->get_query_param("host_id"));
std::vector<sstring> ignore_nodes_strs= split(req->get_query_param("ignore_nodes"), ",");
auto ignore_nodes = std::list<gms::inet_address>();
auto ignore_nodes = std::list<locator::host_id_or_endpoint>();
for (std::string n : ignore_nodes_strs) {
try {
std::replace(n.begin(), n.end(), '\"', ' ');
std::replace(n.begin(), n.end(), '\'', ' ');
boost::trim_all(n);
if (!n.empty()) {
auto node = gms::inet_address(n);
ignore_nodes.push_back(node);
auto hoep = locator::host_id_or_endpoint(n);
if (!ignore_nodes.empty() && hoep.has_host_id() != ignore_nodes.front().has_host_id()) {
throw std::runtime_error("All nodes should be identified using the same method: either Host IDs or ip addresses.");
}
ignore_nodes.push_back(std::move(hoep));
}
} catch (...) {
throw std::runtime_error(format("Failed to parse ignore_nodes parameter: ignore_nodes={}, node={}", ignore_nodes_strs, n));
throw std::runtime_error(format("Failed to parse ignore_nodes parameter: ignore_nodes={}, node={}: {}", ignore_nodes_strs, n, std::current_exception()));
}
}
return ss.local().removenode(host_id, std::move(ignore_nodes)).then([] {


@@ -93,11 +93,12 @@ void set_task_manager(http_context& ctx, routes& r) {
tm::get_tasks.set(r, [&ctx] (std::unique_ptr<request> req) -> future<json::json_return_type> {
using chunked_stats = utils::chunked_vector<task_stats>;
std::vector<chunked_stats> res = co_await ctx.tm.map([&req] (tasks::task_manager& tm) {
auto internal = tasks::is_internal{req_param<bool>(*req, "internal", false)};
std::vector<chunked_stats> res = co_await ctx.tm.map([&req, internal] (tasks::task_manager& tm) {
chunked_stats local_res;
auto module = tm.find_module(req->param["module"]);
const auto& filtered_tasks = module->get_tasks() | boost::adaptors::filtered([&params = req->query_parameters] (const auto& task) {
return filter_tasks(task.second, params);
const auto& filtered_tasks = module->get_tasks() | boost::adaptors::filtered([&params = req->query_parameters, internal] (const auto& task) {
return (internal || !task.second->is_internal()) && filter_tasks(task.second, params);
});
for (auto& [task_id, task] : filtered_tasks) {
local_res.push_back(task_stats{task});


@@ -52,7 +52,7 @@ void set_task_manager_test(http_context& ctx, routes& r, db::config& cfg) {
it = req->query_parameters.find("entity");
std::string entity = it != req->query_parameters.end() ? it->second : "";
it = req->query_parameters.find("parent_id");
tasks::task_manager::parent_data data;
tasks::task_info data;
if (it != req->query_parameters.end()) {
data.id = tasks::task_id{utils::UUID{it->second}};
auto parent_ptr = co_await tasks::task_manager::lookup_task_on_all_shards(ctx.tm, data.id);


@@ -790,7 +790,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, load_ring_state(this, "load_ring_state", value_status::Used, true, "When set to true, load tokens and host_ids previously saved. Same as -Dcassandra.load_ring_state in cassandra.")
, replace_address(this, "replace_address", value_status::Used, "", "The listen_address or broadcast_address of the dead node to replace. Same as -Dcassandra.replace_address.")
, replace_address_first_boot(this, "replace_address_first_boot", value_status::Used, "", "Like replace_address option, but if the node has been bootstrapped successfully it will be ignored. Same as -Dcassandra.replace_address_first_boot.")
, ignore_dead_nodes_for_replace(this, "ignore_dead_nodes_for_replace", value_status::Used, "", "List dead nodes to ingore for replace operation. E.g., scylla --ignore-dead-nodes-for-replace 127.0.0.1,127.0.0.2")
, ignore_dead_nodes_for_replace(this, "ignore_dead_nodes_for_replace", value_status::Used, "", "List dead nodes to ingore for replace operation using a comman-separated list of either host IDs or ip addresses. E.g., scylla --ignore-dead-nodes-for-replace 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c,125ed9f4-7777-1dbn-mac8-43fddce9123e")
, override_decommission(this, "override_decommission", value_status::Used, false, "Set true to force a decommissioned node to join the cluster")
, enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, true, "Set true to use enable repair based node operations instead of streaming based")
, allowed_repair_based_node_ops(this, "allowed_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, "replace", "A comma separated list of node operations which are allowed to enable repair based node operations. The operations can be bootstrap, replace, removenode, decommission and rebuild")


@@ -237,7 +237,7 @@ filter_for_query(consistency_level cl,
const inet_address_vector_replica_set& preferred_endpoints,
read_repair_decision read_repair,
const gms::gossiper& g,
gms::inet_address* extra,
std::optional<gms::inet_address>* extra,
replica::column_family* cf) {
size_t local_count;


@@ -53,7 +53,7 @@ filter_for_query(consistency_level cl,
const inet_address_vector_replica_set& preferred_endpoints,
read_repair_decision read_repair,
const gms::gossiper& g,
gms::inet_address* extra,
std::optional<gms::inet_address>* extra,
replica::column_family* cf);
inet_address_vector_replica_set filter_for_query(consistency_level cl,


@@ -2874,14 +2874,14 @@ future<> system_keyspace::get_compaction_history(compaction_history_consumer&& f
future<> system_keyspace::update_repair_history(repair_history_entry entry) {
sstring req = format("INSERT INTO system.{} (table_uuid, repair_time, repair_uuid, keyspace_name, table_name, range_start, range_end) VALUES (?, ?, ?, ?, ?, ?, ?)", REPAIR_HISTORY);
co_await execute_cql(req, entry.table_uuid.uuid(), entry.ts, entry.id, entry.ks, entry.cf, entry.range_start, entry.range_end).discard_result();
co_await execute_cql(req, entry.table_uuid.uuid(), entry.ts, entry.id.uuid(), entry.ks, entry.cf, entry.range_start, entry.range_end).discard_result();
}
future<> system_keyspace::get_repair_history(::table_id table_id, repair_history_consumer f) {
sstring req = format("SELECT * from system.{} WHERE table_uuid = {}", REPAIR_HISTORY, table_id);
co_await _qp.local().query_internal(req, [&f] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
repair_history_entry ent;
ent.id = row.get_as<utils::UUID>("repair_uuid");
ent.id = row.get_as<tasks::task_id>("repair_uuid");
ent.table_uuid = ::table_id(row.get_as<utils::UUID>("table_uuid"));
ent.range_start = row.get_as<int64_t>("range_start");
ent.range_end = row.get_as<int64_t>("range_end");


@@ -26,6 +26,7 @@
#include "cdc/generation_id.hh"
#include "locator/host_id.hh"
#include "service/raft/group0_fwd.hh"
#include "tasks/task_manager.hh"
namespace service {
@@ -312,7 +313,7 @@ public:
static future<> get_compaction_history(compaction_history_consumer&& f);
struct repair_history_entry {
utils::UUID id;
tasks::task_id id;
table_id table_uuid;
db_clock::time_point ts;
sstring ks;


@@ -16,7 +16,7 @@ import stat
import distro
from pathlib import Path
from scylla_util import *
from subprocess import run
from subprocess import run, SubprocessError
if __name__ == '__main__':
if os.getuid() > 0:
@@ -159,10 +159,6 @@ if __name__ == '__main__':
raise Exception(f'Failed to get UUID of {fsdev}')
uuidpath = f'/dev/disk/by-uuid/{uuid}'
if not os.path.exists(uuidpath):
raise Exception(f'{uuidpath} is not found')
if not stat.S_ISBLK(os.stat(uuidpath).st_mode):
raise Exception(f'{uuidpath} is not block device')
after = 'local-fs.target'
wants = ''
@@ -202,8 +198,16 @@ WantedBy=multi-user.target
systemd_unit.reload()
if args.raid_level != '0':
md_service.start()
mount = systemd_unit(mntunit_bn)
mount.start()
try:
mount = systemd_unit(mntunit_bn)
mount.start()
except SubprocessError as e:
if not os.path.exists(uuidpath):
print(f'\nERROR: {uuidpath} is not found\n')
elif not stat.S_ISBLK(os.stat(uuidpath).st_mode):
print(f'\nERROR: {uuidpath} is not block device\n')
raise e
if args.enable_on_nextboot:
mount.enable()
uid = pwd.getpwnam('scylla').pw_uid


@@ -197,6 +197,8 @@ def is_system_partition(dev):
return (uuid in SYSTEM_PARTITION_UUIDS)
def is_unused_disk(dev):
# resolve symlink to real path
dev = os.path.realpath(dev)
# dev is not in /sys/class/block/, like /dev/nvme[0-9]+
if not os.path.isdir('/sys/class/block/{dev}'.format(dev=dev.replace('/dev/', ''))):
return False
@@ -424,8 +426,16 @@ class sysconfig_parser:
def __escape(self, val):
return re.sub(r'"', r'\"', val)
def __unescape(self, val):
return re.sub(r'\\"', r'"', val)
def __format_line(self, key, val):
need_quotes = any([ch.isspace() for ch in val])
esc_val = self.__escape(val)
return f'{key}="{esc_val}"' if need_quotes else f'{key}={esc_val}'
def __add(self, key, val):
self._data += '{}="{}"\n'.format(key, self.__escape(val))
self._data += self.__format_line(key, val) + '\n'
self.__load()
def __init__(self, filename):
@@ -440,7 +450,8 @@ class sysconfig_parser:
self.__load()
def get(self, key):
return self._cfg.get('global', key).strip('"')
val = self._cfg.get('global', key).strip('"')
return self.__unescape(val)
def has_option(self, key):
return self._cfg.has_option('global', key)
@@ -448,7 +459,8 @@ class sysconfig_parser:
def set(self, key, val):
if not self.has_option(key):
return self.__add(key, val)
self._data = re.sub('^{}=[^\n]*$'.format(key), '{}="{}"'.format(key, self.__escape(val)), self._data, flags=re.MULTILINE)
new_line = self.__format_line(key, val)
self._data = re.sub(f'^{key}=[^\n]*$', new_line, self._data, flags=re.MULTILINE)
self.__load()
def commit(self):


@@ -104,9 +104,9 @@ html_theme_options = {
"versions_unstable": UNSTABLE_VERSIONS,
"versions_deprecated": DEPRECATED_VERSIONS,
'banner_button_text': 'Learn More',
'banner_button_url': 'https://lp.scylladb.com/university-live-2022-03-registration',
'banner_title_text': 'Join us for Scylla University LIVE, a half-day of FREE instructor-led training with exclusive new Scylla database content. March 22-23. Register for Free',
'hide_banner': 'true',
'banner_button_url': 'https://lp.scylladb.com/university-live-2022-12-registration',
'banner_title_text': 'Join us for ScyllaDB University LIVE, a half-day of FREE instructor-led training with exclusive new ScyllaDB database content. December 1. Register for Free',
'hide_banner': 'false',
"collapse_navigation": 'true',
}


@@ -19,6 +19,7 @@ To build everything, run:
```console
git clone https://github.com/scylladb/scylla
cd ./scylla
git submodule update --init --force --recursive
./tools/toolchain/dbuild ./configure.py --mode=<mode>
./tools/toolchain/dbuild ninja


@@ -120,10 +120,6 @@ You can clean snapshots by using :doc:`nodetool clearsnapshot </operating-scylla
Features
--------
Will Scylla have a certain feature in an upcoming release?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Check the `Roadmap <http://www.scylladb.com/technology/status/>`_ page for features scheduled for our GA release.
I want to try out new features. How do I enable experimental mode?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
You need to add the line :code:`experimental: true` to your :code:`scylla.yaml` file.


@@ -25,7 +25,7 @@ Example:
.. code-block:: console
nodetool removenode 192.168.1.3
nodetool removenode 675ed9f4-6564-6dbd-can8-43fddce952gy
Note that all the nodes in the cluster participate in the ``removenode`` operation to sync data if needed. For this reason, the operation will fail if one or more nodes in the cluster are not available.
In such a case, to ensure that the operation succeeds, you must explicitly specify a list of unavailable nodes with the ``--ignore-dead-nodes`` option.
@@ -41,8 +41,8 @@ Example:
.. code-block:: console
nodetool removenode 192.168.1.3
nodetool removenode --ignore-dead-nodes 192.168.1.4,192.168.1.5 192.168.1.3
nodetool removenode --ignore-dead-nodes 192.168.1.4,192.168.1.5 675ed9f4-6564-6dbd-can8-43fddce952gy
nodetool removenode --ignore-dead-nodes 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c,125ed9f4-7777-1dbn-mac8-43fddce9123e 675ed9f4-6564-6dbd-can8-43fddce952gy
.. versionadded:: version 4.6 ``--ignore-dead-nodes`` option


@@ -1,5 +1,5 @@
====================================
Upgrade Guide - ScyllaDB 4.6 to 5.0
Upgrade Guide - ScyllaDB 5.0 to 5.1
====================================
.. toctree::


@@ -1885,6 +1885,8 @@ future<> gossiper::do_shadow_round(std::unordered_set<gms::inet_address> nodes)
gms::application_state::STATUS,
gms::application_state::HOST_ID,
gms::application_state::TOKENS,
gms::application_state::DC,
gms::application_state::RACK,
gms::application_state::SUPPORTED_FEATURES,
gms::application_state::SNITCH_NAME}};
logger.info("Gossip shadow round started with nodes={}", nodes);


@@ -120,7 +120,7 @@ struct node_ops_cmd_response {
};
struct repair_update_system_table_request {
utils::UUID repair_uuid;
tasks::task_id repair_uuid;
table_id table_uuid;
sstring keyspace_name;
sstring table_name;
@@ -132,7 +132,7 @@ struct repair_update_system_table_response {
};
struct repair_flush_hints_batchlog_request {
utils::UUID repair_uuid;
tasks::task_id repair_uuid;
std::list<gms::inet_address> target_nodes;
std::chrono::seconds hints_timeout;
std::chrono::seconds batchlog_timeout;


@@ -10,6 +10,7 @@
#include "schema_fwd.hh"
#include "query-request.hh"
#include "locator/host_id.hh"
#include "tasks/types.hh"
namespace utils {
class UUID final {
@@ -18,6 +19,10 @@ class UUID final {
};
}
class tasks::task_id final {
utils::UUID uuid();
};
class table_id final {
utils::UUID uuid();
};


@@ -308,11 +308,11 @@ elif [ "$ID" = "fedora" ]; then
exit 1
fi
dnf install -y "${fedora_packages[@]}" "${fedora_python3_packages[@]}"
pip3 install "geomet<0.3,>=0.1"
# Disable C extensions
pip3 install scylla-driver --install-option="--no-murmur3" --install-option="--no-libev" --install-option="--no-cython"
pip3 install traceback-with-variables
pip3 install scylla-api-client
PIP_DEFAULT_ARGS="--only-binary=:all: -v"
pip3 install "$PIP_DEFAULT_ARGS" "geomet<0.3,>=0.1"
pip3 install "$PIP_DEFAULT_ARGS" scylla-driver
pip3 install "$PIP_DEFAULT_ARGS" traceback-with-variables
pip3 install "$PIP_DEFAULT_ARGS" scylla-api-client
cargo install cxxbridge-cmd --root /usr/local
if [ -f "$(node_exporter_fullpath)" ] && node_exporter_checksum; then


@@ -1087,6 +1087,12 @@ token_metadata::get_endpoint_for_host_id(host_id host_id) const {
return _impl->get_endpoint_for_host_id(host_id);
}
host_id_or_endpoint token_metadata::parse_host_id_and_endpoint(const sstring& host_id_string) const {
auto res = host_id_or_endpoint(host_id_string);
res.resolve(*this);
return res;
}
const std::unordered_map<inet_address, host_id>&
token_metadata::get_endpoint_to_host_id_map_for_reading() const {
return _impl->get_endpoint_to_host_id_map_for_reading();
@@ -1432,4 +1438,49 @@ future<> shared_token_metadata::mutate_token_metadata(seastar::noncopyable_funct
set(make_token_metadata_ptr(std::move(tm)));
}
host_id_or_endpoint::host_id_or_endpoint(const sstring& s, param_type restrict) {
switch (restrict) {
case param_type::host_id:
try {
id = host_id(utils::UUID(s));
} catch (const marshal_exception& e) {
throw std::invalid_argument(format("Invalid host_id {}: {}", s, e.what()));
}
break;
case param_type::endpoint:
try {
endpoint = gms::inet_address(s);
} catch (std::invalid_argument& e) {
throw std::invalid_argument(format("Invalid inet_address {}: {}", s, e.what()));
}
break;
case param_type::auto_detect:
try {
id = host_id(utils::UUID(s));
} catch (const marshal_exception& e) {
try {
endpoint = gms::inet_address(s);
} catch (std::invalid_argument& e) {
throw std::invalid_argument(format("Invalid host_id or inet_address {}", s));
}
}
}
}
void host_id_or_endpoint::resolve(const token_metadata& tm) {
if (id) {
auto endpoint_opt = tm.get_endpoint_for_host_id(id);
if (!endpoint_opt) {
throw std::runtime_error(format("Host ID {} not found in the cluster", id));
}
endpoint = *endpoint_opt;
} else {
auto opt_id = tm.get_host_id_if_known(endpoint);
if (!opt_id) {
throw std::runtime_error(format("Host inet address {} not found in the cluster", endpoint));
}
id = *opt_id;
}
}
} // namespace locator

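The `auto_detect` branch above tries a UUID parse first and falls back to an inet address, rejecting strings that are neither. A rough Python equivalent of that classification (function name is illustrative, not part of the codebase):

```python
import ipaddress
import uuid

def parse_host_id_or_endpoint(s: str):
    """Classify a string as a host UUID or an IP address.

    Sketch of the auto_detect branch: try the UUID parse first,
    fall back to an inet address, and raise for anything else
    (the C++ code throws std::invalid_argument).
    """
    try:
        return ('host_id', uuid.UUID(s))
    except ValueError:
        pass
    try:
        return ('endpoint', ipaddress.ip_address(s))
    except ValueError:
        raise ValueError('Invalid host_id or inet_address {}'.format(s))
```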

@@ -133,6 +133,33 @@ private:
bool _sort_by_proximity = true;
};
class token_metadata;
struct host_id_or_endpoint {
host_id id;
gms::inet_address endpoint;
enum class param_type {
host_id,
endpoint,
auto_detect
};
host_id_or_endpoint(const sstring& s, param_type restrict = param_type::auto_detect);
bool has_host_id() const noexcept {
return bool(id);
}
bool has_endpoint() const noexcept {
return endpoint != gms::inet_address();
}
// Map the host_id to endpoint based on whichever of them is set,
// using the token_metadata
void resolve(const token_metadata& tm);
};
using dc_rack_fn = seastar::noncopyable_function<endpoint_dc_rack(inet_address)>;
class token_metadata_impl;
@@ -220,6 +247,10 @@ public:
/** Return the end-point for a unique host ID */
std::optional<inet_address> get_endpoint_for_host_id(locator::host_id host_id) const;
/// Parses the \c host_id_string either as a host uuid or as an ip address and returns the mapping.
/// Throws std::invalid_argument on parse error or std::runtime_error if the host_id wasn't found.
host_id_or_endpoint parse_host_id_and_endpoint(const sstring& host_id_string) const;
/** @return a copy of the endpoint-to-id map for read-only operations */
const std::unordered_map<inet_address, host_id>& get_endpoint_to_host_id_map_for_reading() const;


@@ -1320,7 +1320,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
// both)
supervisor::notify("starting repair service");
auto max_memory_repair = memory::stats().total_memory() * 0.1;
repair.start(std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(bm), std::ref(sys_dist_ks), std::ref(sys_ks), std::ref(view_update_generator), std::ref(mm), max_memory_repair).get();
repair.start(std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(bm), std::ref(sys_dist_ks), std::ref(sys_ks), std::ref(view_update_generator), std::ref(task_manager), std::ref(mm), max_memory_repair).get();
auto stop_repair_service = defer_verbose_shutdown("repair service", [&repair] {
repair.stop().get();
});


@@ -126,7 +126,7 @@ flat_mutation_reader_v2 make_forwardable(flat_mutation_reader_v2 m) {
}
void maybe_emit_end_tombstone() {
if (_active_tombstone) {
push_mutation_fragment(*_schema, _permit, range_tombstone_change(position_in_partition_view::after_key(_current.end()), {}));
push_mutation_fragment(*_schema, _permit, range_tombstone_change(position_in_partition_view::before_key(_current.end()), {}));
}
}
public:


@@ -49,6 +49,12 @@
logging::logger rlogger("repair");
node_ops_info::node_ops_info(utils::UUID ops_uuid_, shared_ptr<abort_source> as_, std::list<gms::inet_address>&& ignore_nodes_) noexcept
: ops_uuid(ops_uuid_)
, as(std::move(as_))
, ignore_nodes(std::move(ignore_nodes_))
{}
void node_ops_info::check_abort() {
if (as && as->abort_requested()) {
auto msg = format("Node operation with ops_uuid={} is aborted", ops_uuid);
@@ -57,8 +63,30 @@ void node_ops_info::check_abort() {
}
}
node_ops_metrics::node_ops_metrics(tracker& tracker)
: _tracker(tracker)
future<> node_ops_info::start() {
if (as) {
co_await _sas.start();
_abort_subscription = as->subscribe([this] () noexcept {
_abort_done = _sas.invoke_on_all([] (abort_source& as) noexcept {
as.request_abort();
});
});
}
}
future<> node_ops_info::stop() noexcept {
if (as) {
co_await std::exchange(_abort_done, make_ready_future<>());
co_await _sas.stop();
}
}
abort_source* node_ops_info::local_abort_source() {
return as ? &_sas.local() : nullptr;
}
node_ops_metrics::node_ops_metrics(shared_ptr<repair_module> module)
: _module(module)
{
namespace sm = seastar::metrics;
auto ops_label_type = sm::label("ops");
@@ -139,7 +167,7 @@ static std::vector<sstring> list_column_families(const replica::database& db, co
}
std::ostream& operator<<(std::ostream& os, const repair_uniq_id& x) {
return os << format("[id={}, uuid={}]", x.id, x.uuid);
return os << format("[id={}, uuid={}]", x.id, x.uuid());
}
// Must run inside a seastar thread
@@ -314,11 +342,12 @@ static future<std::list<gms::inet_address>> get_hosts_participating_in_repair(re
}
float node_ops_metrics::repair_finished_percentage() {
return _tracker.report_progress(streaming::stream_reason::repair);
return _module->report_progress(streaming::stream_reason::repair);
}
tracker::tracker(size_t max_repair_memory)
: _shutdown(false)
repair_module::repair_module(tasks::task_manager& tm, repair_service& rs, size_t max_repair_memory) noexcept
: tasks::task_manager::module(tm, "repair")
, _rs(rs)
, _range_parallelism_semaphore(std::max(size_t(1), size_t(max_repair_memory / max_repair_memory_per_range() / 4)),
named_semaphore_exception_factory{"repair range parallelism"})
{
@@ -327,14 +356,14 @@ tracker::tracker(size_t max_repair_memory)
max_repair_memory, max_repair_memory_per_range(), nr);
}
void tracker::start(repair_uniq_id id) {
_pending_repairs.insert(id.uuid);
void repair_module::start(repair_uniq_id id) {
_pending_repairs.insert(id.uuid());
_status[id.id] = repair_status::RUNNING;
}
void tracker::done(repair_uniq_id id, bool succeeded) {
_pending_repairs.erase(id.uuid);
_aborted_pending_repairs.erase(id.uuid);
void repair_module::done(repair_uniq_id id, bool succeeded) {
_pending_repairs.erase(id.uuid());
_aborted_pending_repairs.erase(id.uuid());
if (succeeded) {
_status.erase(id.id);
} else {
@@ -342,8 +371,9 @@ void tracker::done(repair_uniq_id id, bool succeeded) {
}
_done_cond.broadcast();
}
repair_status tracker::get(int id) const {
if (id >= _next_repair_command) {
repair_status repair_module::get(int id) const {
if (id > _sequence_number) {
throw std::runtime_error(format("unknown repair id {}", id));
}
auto it = _status.find(id);
@@ -354,9 +384,9 @@ repair_status tracker::get(int id) const {
}
}
future<repair_status> tracker::repair_await_completion(int id, std::chrono::steady_clock::time_point timeout) {
return seastar::with_gate(_gate, [this, id, timeout] {
if (id >= _next_repair_command) {
future<repair_status> repair_module::repair_await_completion(int id, std::chrono::steady_clock::time_point timeout) {
return seastar::with_gate(async_gate(), [this, id, timeout] {
if (id > _sequence_number) {
return make_exception_future<repair_status>(std::runtime_error(format("unknown repair id {}", id)));
}
return repeat_until_value([this, id, timeout] {
@@ -378,30 +408,19 @@ future<repair_status> tracker::repair_await_completion(int id, std::chrono::stea
});
}
repair_uniq_id tracker::next_repair_command() {
return repair_uniq_id{_next_repair_command++, utils::make_random_uuid()};
void repair_module::check_in_shutdown() {
abort_source().check();
}
future<> tracker::shutdown() {
_shutdown.store(true, std::memory_order_relaxed);
return _gate.close();
}
void tracker::check_in_shutdown() {
if (_shutdown.load(std::memory_order_relaxed)) {
throw std::runtime_error(format("Repair service is being shutdown"));
}
}
void tracker::add_repair_info(int id, lw_shared_ptr<repair_info> ri) {
void repair_module::add_repair_info(int id, lw_shared_ptr<repair_info> ri) {
_repairs.emplace(id, ri);
}
void tracker::remove_repair_info(int id) {
void repair_module::remove_repair_info(int id) {
_repairs.erase(id);
}
lw_shared_ptr<repair_info> tracker::get_repair_info(int id) {
lw_shared_ptr<repair_info> repair_module::get_repair_info(int id) {
auto it = _repairs.find(id);
if (it != _repairs.end()) {
return it->second;
@@ -409,7 +428,7 @@ lw_shared_ptr<repair_info> tracker::get_repair_info(int id) {
return {};
}
std::vector<int> tracker::get_active() const {
std::vector<int> repair_module::get_active() const {
std::vector<int> res;
boost::push_back(res, _status | boost::adaptors::filtered([] (auto& x) {
return x.second == repair_status::RUNNING;
@@ -417,7 +436,7 @@ std::vector<int> tracker::get_active() const {
return res;
}
size_t tracker::nr_running_repair_jobs() {
size_t repair_module::nr_running_repair_jobs() {
size_t count = 0;
if (this_shard_id() != 0) {
return count;
@@ -431,11 +450,11 @@ size_t tracker::nr_running_repair_jobs() {
return count;
}
bool tracker::is_aborted(const utils::UUID& uuid) {
bool repair_module::is_aborted(const tasks::task_id& uuid) {
return _aborted_pending_repairs.contains(uuid);
}
void tracker::abort_all_repairs() {
void repair_module::abort_all_repairs() {
_aborted_pending_repairs = _pending_repairs;
for (auto& x : _repairs) {
auto& ri = x.second;
@@ -444,7 +463,7 @@ void tracker::abort_all_repairs() {
rlogger.info0("Aborted {} repair job(s), aborted={}", _aborted_pending_repairs.size(), _aborted_pending_repairs);
}
float tracker::report_progress(streaming::stream_reason reason) {
float repair_module::report_progress(streaming::stream_reason reason) {
uint64_t nr_ranges_finished = 0;
uint64_t nr_ranges_total = 0;
for (auto& x : _repairs) {
@@ -457,15 +476,15 @@ float tracker::report_progress(streaming::stream_reason reason) {
return nr_ranges_total == 0 ? 1 : float(nr_ranges_finished) / float(nr_ranges_total);
}
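`report_progress` folds per-repair range counters into one ratio and treats an empty workload as fully done, so the metric never divides by zero. The same arithmetic as a small Python sketch (names are made up for illustration):

```python
def report_progress(repairs):
    """Aggregate finished/total ranges across repairs.

    Mirrors repair_module::report_progress: an empty total counts
    as complete (ratio 1.0) rather than dividing by zero.
    """
    finished = sum(r['finished'] for r in repairs)
    total = sum(r['total'] for r in repairs)
    return 1.0 if total == 0 else finished / total
```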
named_semaphore& tracker::range_parallelism_semaphore() {
named_semaphore& repair_module::range_parallelism_semaphore() {
return _range_parallelism_semaphore;
}
future<> tracker::run(repair_uniq_id id, std::function<void ()> func) {
return seastar::with_gate(_gate, [this, id, func =std::move(func)] {
future<> repair_module::run(repair_uniq_id id, std::function<void ()> func) {
return seastar::with_gate(async_gate(), [this, id, func =std::move(func)] {
start(id);
return seastar::async([func = std::move(func)] { func(); }).then([this, id] {
rlogger.info("repair[{}]: completed successfully", id.uuid);
rlogger.info("repair[{}]: completed successfully", id.uuid());
done(id, true);
}).handle_exception([this, id] (std::exception_ptr ep) {
done(id, false);
@@ -475,7 +494,7 @@ future<> tracker::run(repair_uniq_id id, std::function<void ()> func) {
}
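The module's `start`/`done`/`get` bookkeeping can be sketched in a few lines of Python: running repairs sit in the status map, failures stay behind as `FAILED`, successes are erased, and ids above the sequence number are rejected (the class and method names here are hypothetical, not the actual API):

```python
RUNNING, FAILED = 'RUNNING', 'FAILED'

class RepairStatusTracker:
    """Minimal sketch of the start/done/get bookkeeping above."""

    def __init__(self):
        self._sequence_number = 0
        self._status = {}

    def new_id(self):
        # Real ids start at 1; 0 is reserved for "nothing to repair".
        self._sequence_number += 1
        return self._sequence_number

    def run(self, repair_id, func):
        self._status[repair_id] = RUNNING
        try:
            func()
        except Exception:
            self._status[repair_id] = FAILED
            raise
        else:
            del self._status[repair_id]  # success leaves no entry

    def get(self, repair_id):
        if repair_id > self._sequence_number:
            raise RuntimeError('unknown repair id {}'.format(repair_id))
        # Known ids with no entry finished successfully.
        return self._status.get(repair_id, 'SUCCESSFUL')
```

This mirrors why the map holds no "SUCCESSFUL" entries: a successful repair is simply dropped, while a failed one remains queryable.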
void repair_info::check_in_shutdown() {
rs.repair_tracker().check_in_shutdown();
rs.get_repair_module().check_in_shutdown();
}
future<uint64_t> estimate_partitions(seastar::sharded<replica::database>& db, const sstring& keyspace,
@@ -532,7 +551,7 @@ repair_info::repair_info(repair_service& repair,
const std::vector<sstring>& hosts_,
const std::unordered_set<gms::inet_address>& ignore_nodes_,
streaming::stream_reason reason_,
shared_ptr<abort_source> as,
abort_source* as,
bool hints_batchlog_flushed)
: rs(repair)
, db(repair.get_db())
@@ -547,7 +566,6 @@ repair_info::repair_info(repair_service& repair,
, cfs(get_table_names(db.local(), table_ids_))
, table_ids(std::move(table_ids_))
, id(id_)
, shard(this_shard_id())
, data_centers(data_centers_)
, hosts(hosts_)
, ignore_nodes(ignore_nodes_)
@@ -562,15 +580,15 @@ repair_info::repair_info(repair_service& repair,
void repair_info::check_failed_ranges() {
rlogger.info("repair[{}]: shard {} stats: repair_reason={}, keyspace={}, tables={}, ranges_nr={}, {}",
id.uuid, shard, reason, keyspace, table_names(), ranges.size(), _stats.get_stats());
id.uuid(), id.shard(), reason, keyspace, table_names(), ranges.size(), _stats.get_stats());
if (nr_failed_ranges) {
rlogger.warn("repair[{}]: shard {} failed - {} out of {} ranges failed", id.uuid, shard, nr_failed_ranges, ranges_size());
throw std::runtime_error(format("repair[{}] on shard {} failed to repair {} out of {} ranges", id.uuid, shard, nr_failed_ranges, ranges_size()));
rlogger.warn("repair[{}]: shard {} failed - {} out of {} ranges failed", id.uuid(), id.shard(), nr_failed_ranges, ranges_size());
throw std::runtime_error(format("repair[{}] on shard {} failed to repair {} out of {} ranges", id.uuid(), id.shard(), nr_failed_ranges, ranges_size()));
} else {
if (dropped_tables.size()) {
rlogger.warn("repair[{}]: shard {} completed successfully, keyspace={}, ignoring dropped tables={}", id.uuid, shard, keyspace, dropped_tables);
rlogger.warn("repair[{}]: shard {} completed successfully, keyspace={}, ignoring dropped tables={}", id.uuid(), id.shard(), keyspace, dropped_tables);
} else {
rlogger.info("repair[{}]: shard {} completed successfully, keyspace={}", id.uuid, shard, keyspace);
rlogger.info("repair[{}]: shard {} completed successfully, keyspace={}", id.uuid(), id.shard(), keyspace);
}
}
}
@@ -581,7 +599,7 @@ void repair_info::abort() noexcept {
void repair_info::check_in_abort() {
if (aborted) {
throw std::runtime_error(format("repair[{}]: aborted on shard {}", id.uuid, shard));
throw std::runtime_error(format("repair[{}]: aborted on shard {}", id.uuid(), id.shard()));
}
}
@@ -611,7 +629,7 @@ future<> repair_info::repair_range(const dht::token_range& range, ::table_id tab
nr_failed_ranges++;
auto status = format("failed: mandatory neighbor={} is not alive", node);
rlogger.error("repair[{}]: Repair {} out of {} ranges, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}",
id.uuid, ranges_index, ranges_size(), shard, keyspace, table_names(), range, neighbors, live_neighbors, status);
id.uuid(), ranges_index, ranges_size(), id.shard(), keyspace, table_names(), range, neighbors, live_neighbors, status);
abort();
return make_exception_future<>(std::runtime_error(format("Repair mandatory neighbor={} is not alive, keyspace={}, mandatory_neighbors={}",
node, keyspace, mandatory_neighbors)));
@@ -621,7 +639,7 @@ future<> repair_info::repair_range(const dht::token_range& range, ::table_id tab
nr_failed_ranges++;
auto status = live_neighbors.empty() ? "skipped" : "partial";
rlogger.warn("repair[{}]: Repair {} out of {} ranges, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}",
id.uuid, ranges_index, ranges_size(), shard, keyspace, table_names(), range, neighbors, live_neighbors, status);
id.uuid(), ranges_index, ranges_size(), id.shard(), keyspace, table_names(), range, neighbors, live_neighbors, status);
if (live_neighbors.empty()) {
return make_ready_future<>();
}
@@ -630,11 +648,11 @@ future<> repair_info::repair_range(const dht::token_range& range, ::table_id tab
if (neighbors.empty()) {
auto status = "skipped_no_followers";
rlogger.warn("repair[{}]: Repair {} out of {} ranges, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}",
id.uuid, ranges_index, ranges_size(), shard, keyspace, table_names(), range, neighbors, live_neighbors, status);
id.uuid(), ranges_index, ranges_size(), id.shard(), keyspace, table_names(), range, neighbors, live_neighbors, status);
return make_ready_future<>();
}
rlogger.debug("repair[{}]: Repair {} out of {} ranges, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}",
id.uuid, ranges_index, ranges_size(), shard, keyspace, table_names(), range, neighbors, live_neighbors);
id.uuid(), ranges_index, ranges_size(), id.shard(), keyspace, table_names(), range, neighbors, live_neighbors);
return mm.sync_schema(db.local(), neighbors).then([this, &neighbors, range, table_id] {
sstring cf;
try {
@@ -927,9 +945,9 @@ static future<> do_repair_ranges(lw_shared_ptr<repair_info> ri) {
auto table_name = ri->table_names()[idx];
// repair all the ranges in limited parallelism
rlogger.info("repair[{}]: Started to repair {} out of {} tables in keyspace={}, table={}, table_id={}, repair_reason={}",
ri->id.uuid, idx + 1, ri->table_ids.size(), ri->keyspace, table_name, table_id, ri->reason);
ri->id.uuid(), idx + 1, ri->table_ids.size(), ri->keyspace, table_name, table_id, ri->reason);
co_await coroutine::parallel_for_each(ri->ranges, [ri, table_id] (auto&& range) {
return with_semaphore(ri->rs.repair_tracker().range_parallelism_semaphore(), 1, [ri, &range, table_id] {
return with_semaphore(ri->rs.get_repair_module().range_parallelism_semaphore(), 1, [ri, &range, table_id] {
return ri->repair_range(range, table_id).then([ri] {
if (ri->reason == streaming::stream_reason::bootstrap) {
ri->rs.get_metrics().bootstrap_finished_ranges++;
@@ -946,7 +964,7 @@ static future<> do_repair_ranges(lw_shared_ptr<repair_info> ri) {
ri->nr_ranges_finished++;
}
rlogger.debug("repair[{}]: node ops progress bootstrap={}, replace={}, rebuild={}, decommission={}, removenode={}, repair={}",
ri->id.uuid,
ri->id.uuid(),
ri->rs.get_metrics().bootstrap_finished_percentage(),
ri->rs.get_metrics().replace_finished_percentage(),
ri->rs.get_metrics().rebuild_finished_percentage(),
@@ -961,7 +979,7 @@ static future<> do_repair_ranges(lw_shared_ptr<repair_info> ri) {
try {
auto& table = ri->db.local().find_column_family(table_id);
rlogger.debug("repair[{}]: Trigger off-strategy compaction for keyspace={}, table={}",
ri->id.uuid, table.schema()->ks_name(), table.schema()->cf_name());
ri->id.uuid(), table.schema()->ks_name(), table.schema()->cf_name());
table.trigger_offstrategy_compaction();
} catch (replica::no_such_column_family&) {
// Ignore dropped table
@@ -976,13 +994,13 @@ static future<> do_repair_ranges(lw_shared_ptr<repair_info> ri) {
// is assumed to be indivisible in the sense that all the tokens in it have the
// same nodes as replicas.
static future<> repair_ranges(lw_shared_ptr<repair_info> ri) {
ri->rs.repair_tracker().add_repair_info(ri->id.id, ri);
ri->rs.get_repair_module().add_repair_info(ri->id.id, ri);
return do_repair_ranges(ri).then([ri] {
ri->check_failed_ranges();
ri->rs.repair_tracker().remove_repair_info(ri->id.id);
ri->rs.get_repair_module().remove_repair_info(ri->id.id);
return make_ready_future<>();
}).handle_exception([ri] (std::exception_ptr eptr) {
ri->rs.repair_tracker().remove_repair_info(ri->id.id);
ri->rs.get_repair_module().remove_repair_info(ri->id.id);
return make_exception_future<>(std::move(eptr));
});
}
@@ -995,16 +1013,17 @@ static future<> repair_ranges(lw_shared_ptr<repair_info> ri) {
int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring, sstring> options_map) {
seastar::sharded<replica::database>& db = get_db();
auto& topology = db.local().get_token_metadata().get_topology();
repair_tracker().check_in_shutdown();
get_repair_module().check_in_shutdown();
repair_options options(options_map);
// Note: Cassandra can, in some cases, decide immediately that there is
// nothing to repair, and return 0. "nodetool repair" prints in this case
// that "Nothing to repair for keyspace '...'". We don't have such a case
// yet. Real ids returned by next_repair_command() will be >= 1.
auto id = repair_tracker().next_repair_command();
rlogger.info("repair[{}]: starting user-requested repair for keyspace {}, repair id {}, options {}", id.uuid, keyspace, id.id, options_map);
// yet. The id field of repair_uniq_ids returned by next_repair_command()
// will be >= 1.
auto id = _repair_module->new_repair_uniq_id();
rlogger.info("repair[{}]: starting user-requested repair for keyspace {}, repair id {}, options {}", id.uuid(), keyspace, id.id, options_map);
if (!_gossiper.local().is_normal(utils::fb_utilities::get_broadcast_address())) {
throw std::runtime_error("Node is not in NORMAL status yet!");
@@ -1086,14 +1105,14 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
std::vector<sstring> cfs =
options.column_families.size() ? options.column_families : list_column_families(db.local(), keyspace);
if (cfs.empty()) {
rlogger.info("repair[{}]: completed successfully: no tables to repair", id.uuid);
rlogger.info("repair[{}]: completed successfully: no tables to repair", id.uuid());
return id.id;
}
// Do it in the background.
(void)repair_tracker().run(id, [this, &db, id, keyspace = std::move(keyspace),
(void)get_repair_module().run(id, [this, &db, id, keyspace = std::move(keyspace),
cfs = std::move(cfs), ranges = std::move(ranges), options = std::move(options), ignore_nodes = std::move(ignore_nodes)] () mutable {
auto uuid = id.uuid;
auto uuid = id.uuid();
bool needs_flush_before_repair = false;
if (db.local().features().tombstone_gc_options) {
@@ -1115,7 +1134,7 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
});
auto hints_timeout = std::chrono::seconds(300);
auto batchlog_timeout = std::chrono::seconds(300);
repair_flush_hints_batchlog_request req{id.uuid, participants, hints_timeout, batchlog_timeout};
repair_flush_hints_batchlog_request req{id.uuid(), participants, hints_timeout, batchlog_timeout};
try {
parallel_for_each(waiting_nodes, [this, uuid, &req, &participants] (gms::inet_address node) -> future<> {
@@ -1144,7 +1163,7 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
repair_results.reserve(smp::count);
auto table_ids = get_table_ids(db.local(), keyspace, cfs);
abort_source as;
auto off_strategy_updater = seastar::async([this, uuid, &table_ids, &participants, &as] {
auto off_strategy_updater = seastar::async([this, uuid = uuid.uuid(), &table_ids, &participants, &as] {
auto tables = std::list<table_id>(table_ids.begin(), table_ids.end());
auto req = node_ops_cmd_request(node_ops_cmd::repair_updater, uuid, {}, {}, {}, {}, std::move(tables));
auto update_interval = std::chrono::seconds(30);
@@ -1181,7 +1200,7 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
}
});
if (repair_tracker().is_aborted(id.uuid)) {
if (get_repair_module().is_aborted(id.uuid())) {
throw std::runtime_error("aborted by user request");
}
@@ -1211,7 +1230,7 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
return make_ready_future<>();
}).get();
}).handle_exception([id] (std::exception_ptr ep) {
rlogger.warn("repair[{}]: repair_tracker run failed: {}", id.uuid, ep);
rlogger.warn("repair[{}]: repair_tracker run failed: {}", id.uuid(), ep);
});
return id.id;
@@ -1226,30 +1245,29 @@ future<int> repair_start(seastar::sharded<repair_service>& repair,
future<std::vector<int>> repair_service::get_active_repairs() {
return container().invoke_on(0, [] (repair_service& rs) {
return rs.repair_tracker().get_active();
return rs.get_repair_module().get_active();
});
}
future<repair_status> repair_service::get_status(int id) {
return container().invoke_on(0, [id] (repair_service& rs) {
return rs.repair_tracker().get(id);
return rs.get_repair_module().get(id);
});
}
future<repair_status> repair_service::await_completion(int id, std::chrono::steady_clock::time_point timeout) {
return container().invoke_on(0, [id, timeout] (repair_service& rs) {
return rs.repair_tracker().repair_await_completion(id, timeout);
return rs.get_repair_module().repair_await_completion(id, timeout);
});
}
future<> repair_service::shutdown() {
co_await repair_tracker().shutdown();
co_await remove_repair_meta();
}
future<> repair_service::abort_all() {
return container().invoke_on_all([] (repair_service& rs) {
return rs.repair_tracker().abort_all_repairs();
return rs.get_repair_module().abort_all_repairs();
});
}
@@ -1275,18 +1293,18 @@ future<> repair_service::do_sync_data_using_repair(
shared_ptr<node_ops_info> ops_info) {
seastar::sharded<replica::database>& db = get_db();
repair_uniq_id id = repair_tracker().next_repair_command();
rlogger.info("repair[{}]: sync data for keyspace={}, status=started", id.uuid, keyspace);
return repair_tracker().run(id, [this, id, &db, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_info] () mutable {
repair_uniq_id id = get_repair_module().new_repair_uniq_id();
rlogger.info("repair[{}]: sync data for keyspace={}, status=started", id.uuid(), keyspace);
return get_repair_module().run(id, [this, id, &db, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_info] () mutable {
auto cfs = list_column_families(db.local(), keyspace);
if (cfs.empty()) {
rlogger.warn("repair[{}]: sync data for keyspace={}, no table in this keyspace", id.uuid, keyspace);
rlogger.warn("repair[{}]: sync data for keyspace={}, no table in this keyspace", id.uuid(), keyspace);
return;
}
auto table_ids = get_table_ids(db.local(), keyspace, cfs);
std::vector<future<>> repair_results;
repair_results.reserve(smp::count);
if (repair_tracker().is_aborted(id.uuid)) {
if (get_repair_module().is_aborted(id.uuid())) {
throw std::runtime_error("aborted by user request");
}
for (auto shard : boost::irange(unsigned(0), smp::count)) {
@@ -1295,9 +1313,10 @@ future<> repair_service::do_sync_data_using_repair(
auto hosts = std::vector<sstring>();
auto ignore_nodes = std::unordered_set<gms::inet_address>();
bool hints_batchlog_flushed = false;
abort_source* asp = ops_info ? ops_info->local_abort_source() : nullptr;
auto ri = make_lw_shared<repair_info>(local_repair,
std::move(keyspace), std::move(ranges), std::move(table_ids),
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, ops_info ? ops_info->as : nullptr, hints_batchlog_flushed);
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, asp, hints_batchlog_flushed);
ri->neighbors = std::move(neighbors);
return repair_ranges(ri);
});
@@ -1318,13 +1337,13 @@ future<> repair_service::do_sync_data_using_repair(
return make_ready_future<>();
}).get();
}).then([id, keyspace] {
rlogger.info("repair[{}]: sync data for keyspace={}, status=succeeded", id.uuid, keyspace);
rlogger.info("repair[{}]: sync data for keyspace={}, status=succeeded", id.uuid(), keyspace);
}).handle_exception([&db, id, keyspace] (std::exception_ptr ep) {
if (!db.local().has_keyspace(keyspace)) {
rlogger.warn("repair[{}]: sync data for keyspace={}, status=failed: keyspace does not exist any more, ignoring it, {}", id.uuid, keyspace, ep);
rlogger.warn("repair[{}]: sync data for keyspace={}, status=failed: keyspace does not exist any more, ignoring it, {}", id.uuid(), keyspace, ep);
return make_ready_future<>();
}
rlogger.warn("repair[{}]: sync data for keyspace={}, status=failed: {}", id.uuid, keyspace, ep);
rlogger.warn("repair[{}]: sync data for keyspace={}, status=failed: {}", id.uuid(), keyspace, ep);
return make_exception_future<>(ep);
});
}


@@ -28,6 +28,11 @@
#include "locator/token_metadata.hh"
#include "repair/hash.hh"
#include "repair/sync_boundary.hh"
#include "tasks/types.hh"
namespace tasks {
class repair_module;
}
namespace replica {
class database;
@@ -62,16 +67,42 @@ public:
struct repair_uniq_id {
// The integer ID used to identify a repair job. It is currently used by nodetool and http API.
int id;
// A UUID to identifiy a repair job. We will transit to use UUID over the integer ID.
utils::UUID uuid;
// Task info containing a UUID to identify a repair job, and the shard of the job.
// We will transition to use the UUID over the integer ID.
tasks::task_info task_info;
tasks::task_id uuid() const noexcept {
return task_info.id;
}
unsigned shard() const noexcept {
return task_info.shard;
}
};
std::ostream& operator<<(std::ostream& os, const repair_uniq_id& x);
struct node_ops_info {
class node_ops_info {
public:
utils::UUID ops_uuid;
shared_ptr<abort_source> as;
std::list<gms::inet_address> ignore_nodes;
private:
optimized_optional<abort_source::subscription> _abort_subscription;
sharded<abort_source> _sas;
future<> _abort_done = make_ready_future<>();
public:
node_ops_info(utils::UUID ops_uuid_, shared_ptr<abort_source> as_, std::list<gms::inet_address>&& ignore_nodes_) noexcept;
node_ops_info(const node_ops_info&) = delete;
node_ops_info(node_ops_info&&) = delete;
future<> start();
future<> stop() noexcept;
void check_abort();
abort_source* local_abort_source();
};
// NOTE: repair_start() can be run on any node, but starts a node-global
@@ -153,7 +184,6 @@ public:
std::vector<sstring> cfs;
std::vector<table_id> table_ids;
repair_uniq_id id;
shard_id shard;
std::vector<sstring> data_centers;
std::vector<sstring> hosts;
std::unordered_set<gms::inet_address> ignore_nodes;
@@ -179,7 +209,7 @@ public:
const std::vector<sstring>& hosts_,
const std::unordered_set<gms::inet_address>& ingore_nodes_,
streaming::stream_reason reason_,
shared_ptr<abort_source> as,
abort_source* as,
bool hints_batchlog_flushed);
void check_failed_ranges();
void abort() noexcept;
@@ -202,58 +232,6 @@ public:
size_t ranges_size();
};
// The repair_tracker tracks ongoing repair operations and their progress.
// A repair which has already finished successfully is dropped from this
// table, but a failed repair will remain in the table forever so it can
// be queried about more than once (FIXME: reconsider this. But note that
// failed repairs should be rare anyway).
// This object is not thread safe, and must be used by only one cpu.
class tracker {
private:
// Each repair_start() call returns a unique int which the user can later
// use to follow the status of this repair with repair_status().
// We can't use the number 0 - if repair_start() returns 0, it means it
// decided quickly that there is nothing to repair.
int _next_repair_command = 1;
// Note that there are no "SUCCESSFUL" entries in the "status" map:
// Successfully-finished repairs are those with id < _next_repair_command
// but aren't listed as running or failed in the status map.
std::unordered_map<int, repair_status> _status;
// Used to allow shutting down repairs in progress, and waiting for them.
seastar::gate _gate;
// Set when the repair service is being shutdown
std::atomic_bool _shutdown alignas(seastar::cache_line_size);
// Map repair id into repair_info.
std::unordered_map<int, lw_shared_ptr<repair_info>> _repairs;
std::unordered_set<utils::UUID> _pending_repairs;
std::unordered_set<utils::UUID> _aborted_pending_repairs;
// The semaphore used to control the maximum
// ranges that can be repaired in parallel.
named_semaphore _range_parallelism_semaphore;
static constexpr size_t _max_repair_memory_per_range = 32 * 1024 * 1024;
seastar::condition_variable _done_cond;
void start(repair_uniq_id id);
void done(repair_uniq_id id, bool succeeded);
public:
explicit tracker(size_t max_repair_memory);
repair_status get(int id) const;
repair_uniq_id next_repair_command();
future<> shutdown();
void check_in_shutdown();
void add_repair_info(int id, lw_shared_ptr<repair_info> ri);
void remove_repair_info(int id);
lw_shared_ptr<repair_info> get_repair_info(int id);
std::vector<int> get_active() const;
size_t nr_running_repair_jobs();
void abort_all_repairs();
named_semaphore& range_parallelism_semaphore();
static size_t max_repair_memory_per_range() { return _max_repair_memory_per_range; }
future<> run(repair_uniq_id id, std::function<void ()> func);
future<repair_status> repair_await_completion(int id, std::chrono::steady_clock::time_point timeout);
float report_progress(streaming::stream_reason reason);
bool is_aborted(const utils::UUID& uuid);
};
future<uint64_t> estimate_partitions(seastar::sharded<replica::database>& db, const sstring& keyspace,
const sstring& cf, const dht::token_range& range);
@@ -413,7 +391,7 @@ struct node_ops_cmd_response {
struct repair_update_system_table_request {
utils::UUID repair_uuid;
tasks::task_id repair_uuid;
table_id table_uuid;
sstring keyspace_name;
sstring table_name;
@@ -425,7 +403,7 @@ struct repair_update_system_table_response {
};
struct repair_flush_hints_batchlog_request {
utils::UUID repair_uuid;
tasks::task_id repair_uuid;
std::list<gms::inet_address> target_nodes;
std::chrono::seconds hints_timeout;
std::chrono::seconds batchlog_timeout;

repair/repair_task.hh Normal file

@@ -0,0 +1,82 @@
/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "repair/repair.hh"
#include "tasks/task_manager.hh"
class repair_task_impl : public tasks::task_manager::task::impl {
public:
repair_task_impl(tasks::task_manager::module_ptr module, tasks::task_id id, unsigned sequence_number, std::string keyspace, std::string table, std::string type, std::string entity, tasks::task_id parent_id)
: tasks::task_manager::task::impl(module, id, sequence_number, std::move(keyspace), std::move(table), std::move(type), std::move(entity), parent_id) {
_status.progress_units = "ranges";
}
protected:
repair_uniq_id get_repair_uniq_id() const noexcept {
return repair_uniq_id{
.id = _status.sequence_number,
.task_info = tasks::task_info(_status.id, _status.shard)
};
}
virtual future<> run() override = 0;
};
// The repair_module tracks ongoing repair operations and their progress.
// A repair which has already finished successfully is dropped from this
// table, but a failed repair will remain in the table forever so it can
// be queried about more than once (FIXME: reconsider this. But note that
// failed repairs should be rare anyway).
class repair_module : public tasks::task_manager::module {
private:
repair_service& _rs;
// Note that there are no "SUCCESSFUL" entries in the "status" map:
// Successfully-finished repairs are those with id <= repair_module::_sequence_number
// but aren't listed as running or failed in the status map.
std::unordered_map<int, repair_status> _status;
// Map repair id into repair_info.
std::unordered_map<int, lw_shared_ptr<repair_info>> _repairs;
std::unordered_set<tasks::task_id> _pending_repairs;
std::unordered_set<tasks::task_id> _aborted_pending_repairs;
// The semaphore used to control the maximum
// ranges that can be repaired in parallel.
named_semaphore _range_parallelism_semaphore;
static constexpr size_t _max_repair_memory_per_range = 32 * 1024 * 1024;
seastar::condition_variable _done_cond;
void start(repair_uniq_id id);
void done(repair_uniq_id id, bool succeeded);
public:
repair_module(tasks::task_manager& tm, repair_service& rs, size_t max_repair_memory) noexcept;
repair_service& get_repair_service() noexcept {
return _rs;
}
repair_uniq_id new_repair_uniq_id() noexcept {
return repair_uniq_id{
.id = new_sequence_number(),
.task_info = tasks::task_info(tasks::task_id::create_random_id(), this_shard_id())
};
}
repair_status get(int id) const;
void check_in_shutdown();
void add_repair_info(int id, lw_shared_ptr<repair_info> ri);
void remove_repair_info(int id);
lw_shared_ptr<repair_info> get_repair_info(int id);
std::vector<int> get_active() const;
size_t nr_running_repair_jobs();
void abort_all_repairs();
named_semaphore& range_parallelism_semaphore();
static size_t max_repair_memory_per_range() { return _max_repair_memory_per_range; }
future<> run(repair_uniq_id id, std::function<void ()> func);
future<repair_status> repair_await_completion(int id, std::chrono::steady_clock::time_point timeout);
float report_progress(streaming::stream_reason reason);
bool is_aborted(const tasks::task_id& uuid);
};


@@ -9,6 +9,7 @@
#include <seastar/util/defer.hh>
#include "repair/repair.hh"
#include "message/messaging_service.hh"
#include "repair/repair_task.hh"
#include "sstables/sstables.hh"
#include "sstables/sstables_manager.hh"
#include "mutation_fragment.hh"
@@ -2448,7 +2449,7 @@ private:
size_t get_max_row_buf_size(row_level_diff_detect_algorithm algo) {
// Max buffer size per repair round
return is_rpc_stream_supported(algo) ? tracker::max_repair_memory_per_range() : 256 * 1024;
return is_rpc_stream_supported(algo) ? repair_module::max_repair_memory_per_range() : 256 * 1024;
}
// Step A: Negotiate sync boundary to use
@@ -2684,7 +2685,7 @@ private:
size_t repaired_replicas = _all_live_peer_nodes.size() + 1;
if (_ri.total_rf != repaired_replicas){
rlogger.debug("repair[{}]: Skipped to update system.repair_history total_rf={}, repaired_replicas={}, local={}, peers={}",
_ri.id.uuid, _ri.total_rf, repaired_replicas, utils::fb_utilities::get_broadcast_address(), _all_live_peer_nodes);
_ri.id.uuid(), _ri.total_rf, repaired_replicas, utils::fb_utilities::get_broadcast_address(), _all_live_peer_nodes);
co_return;
}
// Update repair_history table only if both hints and batchlog have been flushed.
@@ -2692,12 +2693,12 @@ private:
co_return;
}
repair_service& rs = _ri.rs;
std::optional<gc_clock::time_point> repair_time_opt = co_await rs.update_history(_ri.id.uuid, _table_id, _range, _start_time);
std::optional<gc_clock::time_point> repair_time_opt = co_await rs.update_history(_ri.id.uuid(), _table_id, _range, _start_time);
if (!repair_time_opt) {
co_return;
}
auto repair_time = repair_time_opt.value();
repair_update_system_table_request req{_ri.id.uuid, _table_id, _ri.keyspace, _cf_name, _range, repair_time};
repair_update_system_table_request req{_ri.id.uuid(), _table_id, _ri.keyspace, _cf_name, _range, repair_time};
auto all_nodes = _all_live_peer_nodes;
all_nodes.push_back(utils::fb_utilities::get_broadcast_address());
co_await coroutine::parallel_for_each(all_nodes, [this, req] (gms::inet_address node) -> future<> {
@@ -2705,9 +2706,9 @@ private:
auto& ms = _ri.messaging.local();
repair_update_system_table_response resp = co_await ser::partition_checksum_rpc_verbs::send_repair_update_system_table(&ms, netw::messaging_service::msg_addr(node), req);
(void)resp; // nothing to do with the response yet
rlogger.debug("repair[{}]: Finished to update system.repair_history table of node {}", _ri.id.uuid, node);
rlogger.debug("repair[{}]: Finished to update system.repair_history table of node {}", _ri.id.uuid(), node);
} catch (...) {
rlogger.warn("repair[{}]: Failed to update system.repair_history table of node {}: {}", _ri.id.uuid, node, std::current_exception());
rlogger.warn("repair[{}]: Failed to update system.repair_history table of node {}: {}", _ri.id.uuid(), node, std::current_exception());
}
});
co_return;
@@ -2732,13 +2733,13 @@ public:
auto& mem_sem = _ri.rs.memory_sem();
auto max = _ri.rs.max_repair_memory();
auto wanted = (_all_live_peer_nodes.size() + 1) * tracker::max_repair_memory_per_range();
auto wanted = (_all_live_peer_nodes.size() + 1) * repair_module::max_repair_memory_per_range();
wanted = std::min(max, wanted);
rlogger.trace("repair[{}]: Started to get memory budget, wanted={}, available={}, max_repair_memory={}",
_ri.id.uuid, wanted, mem_sem.current(), max);
_ri.id.uuid(), wanted, mem_sem.current(), max);
auto mem_permit = seastar::get_units(mem_sem, wanted).get0();
rlogger.trace("repair[{}]: Finished to get memory budget, wanted={}, available={}, max_repair_memory={}",
_ri.id.uuid, wanted, mem_sem.current(), max);
_ri.id.uuid(), wanted, mem_sem.current(), max);
auto permit = _ri.db.local().obtain_reader_permit(_cf, "repair-meta", db::no_timeout).get0();
@@ -2810,11 +2811,11 @@ public:
} catch (replica::no_such_column_family& e) {
table_dropped = true;
rlogger.warn("repair[{}]: shard={}, keyspace={}, cf={}, range={}, got error in row level repair: {}",
_ri.id.uuid, this_shard_id(), _ri.keyspace, _cf_name, _range, e);
_ri.id.uuid(), this_shard_id(), _ri.keyspace, _cf_name, _range, e);
_failed = true;
} catch (std::exception& e) {
rlogger.warn("repair[{}]: shard={}, keyspace={}, cf={}, range={}, got error in row level repair: {}",
_ri.id.uuid, this_shard_id(), _ri.keyspace, _cf_name, _range, e);
_ri.id.uuid(), this_shard_id(), _ri.keyspace, _cf_name, _range, e);
// In case the repair process fail, we need to call repair_row_level_stop to clean up repair followers
_failed = true;
}
@@ -2917,6 +2918,7 @@ repair_service::repair_service(distributed<gms::gossiper>& gossiper,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::system_keyspace>& sys_ks,
sharded<db::view::view_update_generator>& vug,
tasks::task_manager& tm,
service::migration_manager& mm,
size_t max_repair_memory)
: _gossiper(gossiper)
@@ -2927,12 +2929,13 @@ repair_service::repair_service(distributed<gms::gossiper>& gossiper,
, _sys_dist_ks(sys_dist_ks)
, _sys_ks(sys_ks)
, _view_update_generator(vug)
, _repair_module(seastar::make_shared<repair_module>(tm, *this, max_repair_memory))
, _mm(mm)
, _tracker(max_repair_memory)
, _node_ops_metrics(_tracker)
, _node_ops_metrics(_repair_module)
, _max_repair_memory(max_repair_memory)
, _memory_sem(max_repair_memory)
{
tm.register_module("repair", _repair_module);
if (this_shard_id() == 0) {
_gossip_helper = make_shared<row_level_repair_gossip_helper>(*this);
_gossiper.local().register_(_gossip_helper);
@@ -2945,6 +2948,7 @@ future<> repair_service::start() {
}
future<> repair_service::stop() {
co_await _repair_module->stop();
co_await uninit_ms_handlers();
if (this_shard_id() == 0) {
co_await _gossiper.local().unregister_(_gossip_helper);
@@ -2956,12 +2960,12 @@ repair_service::~repair_service() {
assert(_stopped);
}
static shard_id repair_id_to_shard(utils::UUID& repair_id) {
return shard_id(repair_id.get_most_significant_bits()) % smp::count;
static shard_id repair_id_to_shard(tasks::task_id& repair_id) {
return shard_id(repair_id.uuid().get_most_significant_bits()) % smp::count;
}
future<std::optional<gc_clock::time_point>>
repair_service::update_history(utils::UUID repair_id, table_id table_id, dht::token_range range, gc_clock::time_point repair_time) {
repair_service::update_history(tasks::task_id repair_id, table_id table_id, dht::token_range range, gc_clock::time_point repair_time) {
auto shard = repair_id_to_shard(repair_id);
return container().invoke_on(shard, [repair_id, table_id, range, repair_time] (repair_service& rs) mutable -> future<std::optional<gc_clock::time_point>> {
repair_history& rh = rs._finished_ranges_history[repair_id];
@@ -2982,7 +2986,7 @@ repair_service::update_history(utils::UUID repair_id, table_id table_id, dht::to
});
}
future<> repair_service::cleanup_history(utils::UUID repair_id) {
future<> repair_service::cleanup_history(tasks::task_id repair_id) {
auto shard = repair_id_to_shard(repair_id);
return container().invoke_on(shard, [repair_id] (repair_service& rs) mutable {
rs._finished_ranges_history.erase(repair_id);


@@ -11,6 +11,8 @@
#include <vector>
#include "gms/inet_address.hh"
#include "repair/repair.hh"
#include "repair/repair_task.hh"
#include "tasks/task_manager.hh"
#include <seastar/core/distributed.hh>
#include <seastar/util/bool_class.hh>
@@ -52,9 +54,9 @@ public:
};
class node_ops_metrics {
tracker& _tracker;
shared_ptr<repair_module> _module;
public:
node_ops_metrics(tracker& tracker);
node_ops_metrics(shared_ptr<repair_module> module);
uint64_t bootstrap_total_ranges{0};
uint64_t bootstrap_finished_ranges{0};
@@ -88,13 +90,13 @@ class repair_service : public seastar::peering_sharded_service<repair_service> {
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
sharded<db::system_keyspace>& _sys_ks;
sharded<db::view::view_update_generator>& _view_update_generator;
shared_ptr<repair_module> _repair_module;
service::migration_manager& _mm;
tracker _tracker;
node_ops_metrics _node_ops_metrics;
std::unordered_map<node_repair_meta_id, repair_meta_ptr> _repair_metas;
uint32_t _next_repair_meta_id = 0; // used only on shard 0
std::unordered_map<utils::UUID, repair_history> _finished_ranges_history;
std::unordered_map<tasks::task_id, repair_history> _finished_ranges_history;
shared_ptr<row_level_repair_gossip_helper> _gossip_helper;
bool _stopped = false;
@@ -114,6 +116,7 @@ public:
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::system_keyspace>& sys_ks,
sharded<db::view::view_update_generator>& vug,
tasks::task_manager& tm,
service::migration_manager& mm, size_t max_repair_memory);
~repair_service();
future<> start();
@@ -126,8 +129,8 @@ public:
// stop them abruptly).
future<> shutdown();
future<std::optional<gc_clock::time_point>> update_history(utils::UUID repair_id, table_id table_id, dht::token_range range, gc_clock::time_point repair_time);
future<> cleanup_history(utils::UUID repair_id);
future<std::optional<gc_clock::time_point>> update_history(tasks::task_id repair_id, table_id table_id, dht::token_range range, gc_clock::time_point repair_time);
future<> cleanup_history(tasks::task_id repair_id);
future<> load_history();
int do_repair_start(sstring keyspace, std::unordered_map<sstring, sstring> options_map);
@@ -171,11 +174,8 @@ public:
gms::gossiper& get_gossiper() noexcept { return _gossiper.local(); }
size_t max_repair_memory() const { return _max_repair_memory; }
seastar::semaphore& memory_sem() { return _memory_sem; }
tracker& repair_tracker() {
return _tracker;
}
const tracker& repair_tracker() const {
return _tracker;
repair_module& get_repair_module() noexcept {
return *_repair_module;
}
const node_ops_metrics& get_metrics() const noexcept {


@@ -114,11 +114,11 @@ have_gnutls = any([lib.startswith('libgnutls.so')
gzip_process = subprocess.Popen("pigz > "+output, shell=True, stdin=subprocess.PIPE)
ar = tarfile.open(fileobj=gzip_process.stdin, mode='w|')
# relocatable package format version = 2.2
# relocatable package format version = 3.0
shutil.rmtree(f'build/{SCYLLA_DIR}', ignore_errors=True)
os.makedirs(f'build/{SCYLLA_DIR}')
with open(f'build/{SCYLLA_DIR}/.relocatable_package_version', 'w') as f:
f.write('2.2\n')
f.write('3.0\n')
ar.add(f'build/{SCYLLA_DIR}/.relocatable_package_version', arcname='.relocatable_package_version')
for exe in executables_scylla:


@@ -2720,7 +2720,6 @@ class lsa_object_descriptor(object):
value |= (b & 0x3f) << shift
return lsa_object_descriptor(value, start_pos, pos)
mig_re = re.compile(r'.* standard_migrator<(.*)>\+16>,')
vec_ext_re = re.compile(r'managed_vector<(.*), (.*u), (.*)>::external')
def __init__(self, value, desc_pos, obj_pos):
self.value = value
@@ -2733,10 +2732,12 @@ class lsa_object_descriptor(object):
def dead_size(self):
return self.value / 2
def migrator(self):
def migrator_ptr(self):
static_migrators = gdb.parse_and_eval("'::debug::static_migrators'")
migrator = static_migrators['_migrators']['_M_impl']['_M_start'][self.value >> 1]
return migrator.dereference()
return static_migrators['_migrators']['_M_impl']['_M_start'][self.value >> 1]
def migrator(self):
return self.migrator_ptr().dereference()
def migrator_str(self):
mig = str(self.migrator())
@@ -2744,29 +2745,11 @@ class lsa_object_descriptor(object):
return m.group(1)
def live_size(self):
mig = str(self.migrator())
m = re.match(self.mig_re, mig)
if m:
type = m.group(1)
external = self.vec_ext_re.match(type)
if type == 'blob_storage':
t = gdb.lookup_type('blob_storage')
blob = self.obj_pos.cast(t.pointer())
return t.sizeof + blob['frag_size']
elif external:
element_type = external.group(1)
count = external.group(2)
size_type = external.group(3)
vec_type = gdb.lookup_type('managed_vector<%s, %s, %s>' % (element_type, count, size_type))
# gdb doesn't see 'external' for some reason
backref_ptr = self.obj_pos.cast(vec_type.pointer().pointer())
vec = backref_ptr.dereference()
element_count = vec['_capacity']
element_type = gdb.lookup_type(element_type)
return backref_ptr.type.sizeof + element_count * element_type.sizeof
else:
return gdb.lookup_type(type).sizeof
return 0
mig = self.migrator_ptr()
obj = int(self.obj_pos)
cmd = f'((migrate_fn_type*){mig})->size((const void*){obj})'
res = gdb.parse_and_eval(cmd)
return int(res)
def end_pos(self):
if self.is_live():


@@ -5077,7 +5077,7 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
service_permit permit) {
const dht::token& token = pr.start()->value().token();
speculative_retry::type retry_type = schema->speculative_retry().get_type();
gms::inet_address extra_replica;
std::optional<gms::inet_address> extra_replica;
inet_address_vector_replica_set all_replicas = get_live_sorted_endpoints(*erm, token);
// Check for a non-local read before heat-weighted load balancing
@@ -5141,12 +5141,12 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
// RRD.NONE or RRD.DC_LOCAL w/ multiple DCs.
if (target_replicas.size() == block_for) { // If RRD.DC_LOCAL extra replica may already be present
auto local_dc_filter = erm->get_topology().get_local_dc_filter();
if (is_datacenter_local(cl) && !local_dc_filter(extra_replica)) {
if (!extra_replica || (is_datacenter_local(cl) && !local_dc_filter(*extra_replica))) {
slogger.trace("read executor no extra target to speculate");
return ::make_shared<never_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
} else {
target_replicas.push_back(extra_replica);
slogger.trace("creating read executor with extra target {}", extra_replica);
target_replicas.push_back(*extra_replica);
slogger.trace("creating read executor with extra target {}", *extra_replica);
}
}


@@ -618,7 +618,7 @@ future<> storage_service::mark_existing_views_as_built(sharded<db::system_distri
});
}
std::list<gms::inet_address> storage_service::get_ignore_dead_nodes_for_replace() {
std::list<gms::inet_address> storage_service::get_ignore_dead_nodes_for_replace(const token_metadata& tm) {
std::vector<sstring> ignore_nodes_strs;
std::list<gms::inet_address> ignore_nodes;
boost::split(ignore_nodes_strs, _db.local().get_config().ignore_dead_nodes_for_replace(), boost::is_any_of(","));
@@ -628,11 +628,11 @@ std::list<gms::inet_address> storage_service::get_ignore_dead_nodes_for_replace(
std::replace(n.begin(), n.end(), '\'', ' ');
boost::trim_all(n);
if (!n.empty()) {
auto node = gms::inet_address(n);
ignore_nodes.push_back(node);
auto ep_and_id = tm.parse_host_id_and_endpoint(n);
ignore_nodes.push_back(ep_and_id.endpoint);
}
} catch (...) {
throw std::runtime_error(format("Failed to parse --ignore-dead-nodes-for-replace parameter: ignore_nodes={}, node={}", ignore_nodes_strs, n));
throw std::runtime_error(format("Failed to parse --ignore-dead-nodes-for-replace parameter: ignore_nodes={}, node={}: {}", ignore_nodes_strs, n, std::current_exception()));
}
}
return ignore_nodes;
@@ -2204,7 +2204,8 @@ void storage_service::run_bootstrap_ops(std::unordered_set<token>& bootstrap_tok
void storage_service::run_replace_ops(std::unordered_set<token>& bootstrap_tokens) {
auto replace_address = get_replace_address().value();
auto uuid = utils::make_random_uuid();
std::list<gms::inet_address> ignore_nodes = get_ignore_dead_nodes_for_replace();
auto tmptr = get_token_metadata_ptr();
std::list<gms::inet_address> ignore_nodes = get_ignore_dead_nodes_for_replace(*tmptr);
// Step 1: Decide who needs to sync data for replace operation
std::list<gms::inet_address> sync_nodes;
for (const auto& x :_gossiper.get_endpoint_states()) {
@@ -2248,7 +2249,7 @@ void storage_service::run_replace_ops(std::unordered_set<token>& bootstrap_token
throw std::runtime_error(msg);
}
if (!nodes_down.empty()) {
auto msg = format("replace[{}]: Nodes={} needed for replace operation are down. It is highly recommended to fix the down nodes and try again. To proceed with best-effort mode which might cause data inconsistency, add --ignore-dead-nodes-for-replace <list_of_dead_nodes>. E.g., scylla --ignore-dead-nodes-for-replace 127.0.0.1,127.0.0.2", uuid, nodes_down);
auto msg = format("replace[{}]: Nodes={} needed for replace operation are down. It is highly recommended to fix the down nodes and try again. To proceed in best-effort mode, which might cause data inconsistency, add --ignore-dead-nodes-for-replace <list_of_dead_nodes>. E.g., scylla --ignore-dead-nodes-for-replace 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c,125ed9f4-7777-1dbn-mac8-43fddce9123e", uuid, nodes_down);
slogger.warn("{}", msg);
throw std::runtime_error(msg);
}
@@ -2328,12 +2329,11 @@ void storage_service::run_replace_ops(std::unordered_set<token>& bootstrap_token
}
}
future<> storage_service::removenode(sstring host_id_string, std::list<gms::inet_address> ignore_nodes) {
return run_with_api_lock(sstring("removenode"), [host_id_string, ignore_nodes = std::move(ignore_nodes)] (storage_service& ss) mutable {
return seastar::async([&ss, host_id_string, ignore_nodes = std::move(ignore_nodes)] {
future<> storage_service::removenode(locator::host_id host_id, std::list<locator::host_id_or_endpoint> ignore_nodes_params) {
return run_with_api_lock(sstring("removenode"), [host_id, ignore_nodes_params = std::move(ignore_nodes_params)] (storage_service& ss) mutable {
return seastar::async([&ss, host_id, ignore_nodes_params = std::move(ignore_nodes_params)] () mutable {
auto uuid = utils::make_random_uuid();
auto tmptr = ss.get_token_metadata_ptr();
auto host_id = locator::host_id(utils::UUID(host_id_string));
auto endpoint_opt = tmptr->get_endpoint_for_host_id(host_id);
if (!endpoint_opt) {
throw std::runtime_error(format("removenode[{}]: Host ID not found in the cluster", uuid));
@@ -2341,6 +2341,11 @@ future<> storage_service::removenode(sstring host_id_string, std::list<gms::inet
auto endpoint = *endpoint_opt;
auto tokens = tmptr->get_tokens(endpoint);
auto leaving_nodes = std::list<gms::inet_address>{endpoint};
std::list<gms::inet_address> ignore_nodes;
for (auto& hoep : ignore_nodes_params) {
hoep.resolve(*tmptr);
ignore_nodes.push_back(hoep.endpoint);
}
// Step 1: Decide who needs to sync data
//
@@ -2534,6 +2539,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
});
},
[this, ops_uuid] () mutable { node_ops_singal_abort(ops_uuid); });
meta.start().get();
_node_ops.emplace(ops_uuid, std::move(meta));
} else if (req.cmd == node_ops_cmd::removenode_heartbeat) {
slogger.debug("removenode[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
@@ -2582,6 +2588,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
});
},
[this, ops_uuid] () mutable { node_ops_singal_abort(ops_uuid); });
meta.start().get();
_node_ops.emplace(ops_uuid, std::move(meta));
} else if (req.cmd == node_ops_cmd::decommission_heartbeat) {
slogger.debug("decommission[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
@@ -2626,6 +2633,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
});
},
[this, ops_uuid ] { node_ops_singal_abort(ops_uuid); });
meta.start().get();
_node_ops.emplace(ops_uuid, std::move(meta));
} else if (req.cmd == node_ops_cmd::replace_prepare_mark_alive) {
// Wait until the local node has marked the replacing node as alive
@@ -2680,6 +2688,7 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
});
},
[this, ops_uuid ] { node_ops_singal_abort(ops_uuid); });
meta.start().get();
_node_ops.emplace(ops_uuid, std::move(meta));
} else if (req.cmd == node_ops_cmd::bootstrap_heartbeat) {
slogger.debug("bootstrap[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
@@ -2925,7 +2934,7 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node,
future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) {
if (is_repair_based_node_ops_enabled(streaming::stream_reason::removenode)) {
auto ops_uuid = utils::make_random_uuid();
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, nullptr, std::list<gms::inet_address>()});
auto ops = seastar::make_shared<node_ops_info>(ops_uuid, nullptr, std::list<gms::inet_address>());
return _repair.local().removenode_with_repair(get_token_metadata_ptr(), endpoint, ops).finally([this, notify_endpoint] () {
return send_replication_notification(notify_endpoint);
});
@@ -3658,11 +3667,19 @@ node_ops_meta_data::node_ops_meta_data(
, _abort(std::move(abort_func))
, _abort_source(seastar::make_shared<abort_source>())
, _signal(std::move(signal_func))
, _ops(seastar::make_shared<node_ops_info>({_ops_uuid, _abort_source, std::move(ignore_nodes)}))
, _ops(seastar::make_shared<node_ops_info>(_ops_uuid, _abort_source, std::move(ignore_nodes)))
, _watchdog([sig = _signal] { sig(); }) {
_watchdog.arm(_watchdog_interval);
}
future<> node_ops_meta_data::start() {
return _ops ? _ops->start() : make_ready_future<>();
}
future<> node_ops_meta_data::stop() noexcept {
return _ops ? _ops->stop() : make_ready_future<>();
}
future<> node_ops_meta_data::abort() {
slogger.debug("node_ops_meta_data: ops_uuid={} abort", _ops_uuid);
_watchdog.cancel();
@@ -3708,6 +3725,7 @@ future<> storage_service::node_ops_done(utils::UUID ops_uuid) {
if (it != _node_ops.end()) {
node_ops_meta_data& meta = it->second;
meta.cancel_watchdog();
co_await meta.stop();
_node_ops.erase(it);
}
}
@@ -3715,6 +3733,24 @@ future<> storage_service::node_ops_done(utils::UUID ops_uuid) {
future<> storage_service::node_ops_abort(utils::UUID ops_uuid) {
slogger.debug("node_ops_abort: ops_uuid={}", ops_uuid);
auto permit = co_await seastar::get_units(_node_ops_abort_sem, 1);
if (!ops_uuid) {
for (auto& [uuid, meta] : _node_ops) {
co_await meta.abort();
auto as = meta.get_abort_source();
if (as && !as->abort_requested()) {
as->request_abort();
}
}
for (auto it = _node_ops.begin(); it != _node_ops.end(); it = _node_ops.erase(it)) {
node_ops_meta_data& meta = it->second;
co_await meta.stop();
}
co_return;
}
auto it = _node_ops.find(ops_uuid);
if (it != _node_ops.end()) {
node_ops_meta_data& meta = it->second;
@@ -3723,6 +3759,7 @@ future<> storage_service::node_ops_abort(utils::UUID ops_uuid) {
if (as && !as->abort_requested()) {
as->request_abort();
}
co_await meta.stop();
_node_ops.erase(it);
}
}
@@ -3741,17 +3778,18 @@ future<> storage_service::node_ops_abort_thread() {
while (!_node_ops_abort_queue.empty()) {
auto uuid_opt = _node_ops_abort_queue.front();
_node_ops_abort_queue.pop_front();
if (!uuid_opt) {
co_return;
}
try {
co_await node_ops_abort(*uuid_opt);
co_await node_ops_abort(uuid_opt.value_or(utils::null_uuid()));
} catch (...) {
slogger.warn("Failed to abort node operation ops_uuid={}: {}", *uuid_opt, std::current_exception());
}
if (!uuid_opt) {
slogger.info("Stopped node_ops_abort_thread");
co_return;
}
}
}
slogger.info("Stopped node_ops_abort_thread");
__builtin_unreachable();
}
} // namespace service


@@ -105,6 +105,8 @@ public:
std::list<gms::inet_address> ignore_nodes,
std::function<future<> ()> abort_func,
std::function<void ()> signal_func);
future<> start();
future<> stop() noexcept;
shared_ptr<node_ops_info> get_ops_info();
shared_ptr<abort_source> get_abort_source();
future<> abort();
@@ -298,7 +300,7 @@ private:
void run_replace_ops(std::unordered_set<token>& bootstrap_tokens);
void run_bootstrap_ops(std::unordered_set<token>& bootstrap_tokens);
std::list<gms::inet_address> get_ignore_dead_nodes_for_replace();
std::list<gms::inet_address> get_ignore_dead_nodes_for_replace(const locator::token_metadata& tm);
future<> wait_for_ring_to_settle(std::chrono::milliseconds delay);
public:
@@ -704,7 +706,7 @@ public:
*
* @param host_id host ID of the node to remove
*/
future<> removenode(sstring host_id_string, std::list<gms::inet_address> ignore_nodes);
future<> removenode(locator::host_id host_id, std::list<locator::host_id_or_endpoint> ignore_nodes);
future<node_ops_cmd_response> node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req);
void node_ops_cmd_check(gms::inet_address coordinator, const node_ops_cmd_request& req);
future<> node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, utils::UUID uuid, std::list<gms::inet_address> nodes, lw_shared_ptr<bool> heartbeat_updater_done);


@@ -14,14 +14,15 @@
#include <seastar/coroutine/parallel_for_each.hh>
#include "db_clock.hh"
#include "log.hh"
#include "tasks/types.hh"
#include "utils/UUID.hh"
#include "utils/serialized_action.hh"
#include "utils/updateable_value.hh"
namespace tasks {
using task_id = utils::tagged_uuid<struct task_id_tag>;
using is_abortable = bool_class <struct abortable_tag>;
using is_internal = bool_class<struct internal_tag>;
extern logging::logger tmlogger;
@@ -64,17 +65,6 @@ public:
failed
};
struct parent_data {
task_id id;
unsigned shard;
parent_data() : id(task_id::create_null_id()) {}
operator bool() const noexcept {
return bool(id);
}
};
class task : public enable_lw_shared_from_this<task> {
public:
struct progress {
@@ -131,6 +121,10 @@ public:
return is_abortable::no;
}
virtual is_internal is_internal() const noexcept {
return is_internal::no;
}
virtual future<> abort() noexcept {
return make_ready_future<>();
}
@@ -243,6 +237,10 @@ public:
return _impl->is_abortable();
};
is_internal is_internal() const noexcept {
return _impl->is_internal();
}
future<> abort() noexcept {
return _impl->abort();
}
@@ -320,37 +318,37 @@ public:
co_await _gate.close();
_tm.unregister_module(_name);
}
public:
template<typename T>
requires std::is_base_of_v<task_manager::task::impl, T>
future<task_id> make_task(unsigned shard, task_id id = task_id::create_null_id(), std::string keyspace = "", std::string table = "", std::string type = "", std::string entity = "", parent_data parent_d = parent_data{}) {
future<task_id> make_task(unsigned shard, task_id id = task_id::create_null_id(), std::string keyspace = "", std::string table = "", std::string type = "", std::string entity = "", task_info parent_d = task_info{}) {
return _tm.container().invoke_on(shard, [id, module = _name, keyspace = std::move(keyspace), table = std::move(table), type = std::move(type), entity = std::move(entity), parent_d] (task_manager& tm) {
auto module_ptr = tm.find_module(module);
auto task_impl_ptr = std::make_unique<T>(module_ptr, id ? id : task_id::create_random_id(), parent_d ? 0 : module_ptr->new_sequence_number(), std::move(keyspace), std::move(table), std::move(type), std::move(entity), parent_d.id);
return module_ptr->make_task(std::move(task_impl_ptr), parent_d).then([] (auto task) {
return task->id();
});
});
}
// Must be called on target shard.
// If task has a parent, data concerning its children is updated and sequence number is inherited
// from a parent and set. Otherwise, it must be set by caller.
future<task_ptr> make_task(task::task_impl_ptr task_impl_ptr, task_info parent_d = task_info{}) {
auto task = make_lw_shared<task_manager::task>(std::move(task_impl_ptr));
foreign_task_ptr parent;
uint64_t sequence_number = 0;
if (parent_d) {
parent = co_await _tm.container().invoke_on(parent_d.shard, [id = parent_d.id] (task_manager& tm) mutable -> future<foreign_task_ptr> {
task->get_status().sequence_number = co_await _tm.container().invoke_on(parent_d.shard, [id = parent_d.id, task = make_foreign(task)] (task_manager& tm) mutable {
const auto& all_tasks = tm.get_all_tasks();
if (auto it = all_tasks.find(id); it != all_tasks.end()) {
co_return it->second;
it->second->add_child(std::move(task));
return make_ready_future<uint64_t>(it->second->get_sequence_number());
} else {
co_return coroutine::return_exception(task_manager::task_not_found(id));
return make_exception_future<uint64_t>(task_manager::task_not_found(id));
}
});
sequence_number = parent->get_sequence_number();
}
auto task = co_await _tm.container().invoke_on(shard, [id, module = _name, sequence_number, keyspace = std::move(keyspace), table = std::move(table), type = std::move(type), entity = std::move(entity), parent_d] (task_manager& tm) {
auto module_ptr = tm.find_module(module);
auto task_impl_ptr = std::make_unique<T>(module_ptr, id ? id : task_id::create_random_id(), parent_d ? sequence_number : module_ptr->new_sequence_number(), std::move(keyspace), std::move(table), std::move(type), std::move(entity), parent_d.id);
return make_ready_future<foreign_task_ptr>(make_lw_shared<task_manager::task>(std::move(task_impl_ptr)));
});
id = task->id();
if (parent_d) {
co_await _tm.container().invoke_on(parent.get_owner_shard(), [task = std::move(parent), child = std::move(task)] (task_manager& tm) mutable {
task->add_child(std::move(child));
});
}
co_return id;
co_return task;
}
};
public:
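
The rewritten `make_task` above inherits the sequence number from the parent task (found on the parent's shard) and registers the new task as that parent's child; a root task draws a fresh sequence number from the module instead. A standalone Python sketch of just that inheritance rule (toy model, not ScyllaDB code; `Task` and its fields are illustrative):

```python
import itertools
import uuid


class Task:
    """Toy model of task_manager::task: an id, a sequence number, children."""
    _seq = itertools.count(1)  # stands in for module_ptr->new_sequence_number()

    def __init__(self, parent=None):
        self.id = uuid.uuid4()
        self.children = []
        if parent is not None:
            # A child inherits its parent's sequence number and is
            # registered in the parent's children list.
            self.sequence_number = parent.sequence_number
            parent.children.append(self)
        else:
            # A root task gets a fresh sequence number.
            self.sequence_number = next(Task._seq)


root = Task()
child = Task(parent=root)
assert child.sequence_number == root.sequence_number
assert child in root.children
```

In the real code the parent may live on another shard, so the lookup and `add_child` happen via `invoke_on(parent_d.shard, ...)`; the sketch ignores sharding and keeps only the bookkeeping.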

tasks/types.hh (new file)

@@ -0,0 +1,29 @@
/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "utils/UUID.hh"
namespace tasks {
using task_id = utils::tagged_uuid<struct task_id_tag>;
struct task_info {
task_id id;
unsigned shard;
task_info() noexcept : id(task_id::create_null_id()) {}
task_info(task_id id, unsigned parent_shard) noexcept : id(id), shard(parent_shard) {}
operator bool() const noexcept {
return bool(id);
}
};
}
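
`task_info` uses a null UUID as a falsy sentinel: a default-constructed value means "no parent", which is what `parent_d ? ... : ...` relies on in `make_task`. A minimal Python sketch of the same idiom (names are illustrative, not the ScyllaDB API):

```python
import uuid

NULL_ID = uuid.UUID(int=0)  # stands in for task_id::create_null_id()


class TaskInfo:
    """Mirrors tasks::task_info: an (id, shard) pair, falsy when the id is null."""

    def __init__(self, id=NULL_ID, shard=0):
        self.id = id
        self.shard = shard

    def __bool__(self):
        # operator bool() in the C++ struct: "no parent" is the null id.
        return self.id != NULL_ID


no_parent = TaskInfo()
parent = TaskInfo(uuid.uuid4(), shard=2)
assert not no_parent
assert parent
```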


@@ -0,0 +1,46 @@
# Copyright 2022-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#############################################################################
# Tests involving the "timestamp" column type.
#
# Please do not confuse the "timestamp" column type tested here, with the
# unrelated concept of the "timestamp" of a write (the "USING TIMESTAMP").
# That latter concept is tested in test_timestamp.py, not here.
#############################################################################
from util import new_test_table, unique_key_int
import pytest
@pytest.fixture(scope="module")
def table1(cql, test_keyspace):
with new_test_table(cql, test_keyspace, "p int primary key, t timestamp") as table:
yield table
# According to the Cassandra documentation, a timestamp is a 64-bit integer
# "representing a number of milliseconds since the standard base time known
# as the epoch". This underlying "number" can be stored into a timestamp
# column by writing an integer directly to it, and later the number can be
# extracted during select with the built-in tounixtimestamp() function.
#
# In issue #11588, a user who mistakenly used a microsecond count reported
# that the write was successful, but then the read failed reporting that
# "timestamp is out of range. Must be in milliseconds since epoch".
# This is wrong and unhelpful behavior, and the following test reproduces
# it. Cassandra currently allows the out-of-range number to be both written
# and then read - so that is the Cassandra-compatible approach - but
# arguably a more correct and useful behavior would be to fail the write
# up-front, even before reaching the read.
@pytest.mark.xfail(reason="issue #11588")
def test_type_timestamp_overflow(cql, table1):
p = unique_key_int()
t = 1667215862 * 1000000
# t is out of the normal range for milliseconds since the epoch (it
# was calculated as microseconds since the epoch), but Cassandra allows
# to write it. Scylla may decide in the future to fail this write (in
# which case this test should be changed to accept both options, or
# be marked scylla_only), but passing the write and failing the read
# (as in #11588) is a bug.
cql.execute(f"INSERT INTO {table1} (p, t) VALUES ({p}, {t})")
assert list(cql.execute(f"SELECT tounixtimestamp(t) from {table1} where p = {p}")) == [(t,)]
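
The magnitude of the mistake in issue #11588 is easy to see outside CQL: interpreted as milliseconds, a microsecond count lands tens of thousands of years in the future. A quick Python check (standard library only, independent of Scylla):

```python
from datetime import datetime, timezone

ms = 1667215862 * 1000      # correct: milliseconds since the epoch
bad = 1667215862 * 1000000  # the mistake: microseconds since the epoch

# The millisecond count converts to a sane date in late 2022:
dt = datetime.fromtimestamp(ms / 1000, tz=timezone.utc)
assert dt.year == 2022

# Read as milliseconds, the microsecond count lands around year 54800,
# far outside datetime's supported range, so conversion fails:
try:
    datetime.fromtimestamp(bad / 1000, tz=timezone.utc)
    overflowed = False
except (OverflowError, ValueError, OSError):
    overflowed = True
assert overflowed
```

This is the range check the read path performs in #11588; the argument in the test above is that the write path should apply the same check, or neither should.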


@@ -1436,6 +1436,21 @@ void test_range_tombstones_v2(tests::reader_concurrency_semaphore_wrapper& semap
.produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::after_key(s.make_ckey(5)), {}))
.produces_end_of_stream();
assert_that(ms.make_reader_v2(s.schema(), semaphore.make_permit(), pr,
s.schema()->full_slice(),
default_priority_class(),
nullptr,
streamed_mutation::forwarding::yes,
mutation_reader::forwarding::no))
.produces_partition_start(pkey)
.produces_end_of_stream()
.fast_forward_to(position_range(
position_in_partition::after_key(s.make_ckey(0)),
position_in_partition::for_key(s.make_ckey(2))))
.produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::before_key(s.make_ckey(1)), t1))
.produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::before_key(s.make_ckey(2)), {}))
.produces_end_of_stream();
assert_that(ms.make_reader_v2(s.schema(), semaphore.make_permit(), pr,
s.schema()->full_slice(),
default_priority_class(),


@@ -33,6 +33,7 @@
#include "utils/managed_bytes.hh"
#include "utils/bit_cast.hh"
#include "utils/chunked_vector.hh"
#include "tasks/types.hh"
class tuple_type_impl;
class big_decimal;
@@ -1041,6 +1042,12 @@ shared_ptr<const abstract_type> data_type_for<cql_duration>() {
return duration_type;
}
template <>
inline
shared_ptr<const abstract_type> data_type_for<tasks::task_id>() {
return uuid_type;
}
namespace std {
template <>


@@ -51,9 +51,10 @@ done
UNIFIED_PKG="$(realpath -s $UNIFIED_PKG)"
PKGS="build/$MODE/dist/tar/$PRODUCT-$VERSION-$RELEASE.$(arch).tar.gz build/$MODE/dist/tar/$PRODUCT-python3-$VERSION-$RELEASE.$(arch).tar.gz build/$MODE/dist/tar/$PRODUCT-jmx-$VERSION-$RELEASE.noarch.tar.gz build/$MODE/dist/tar/$PRODUCT-tools-$VERSION-$RELEASE.noarch.tar.gz"
BASEDIR="build/$MODE/unified/$PRODUCT-$VERSION"
rm -rf build/"$MODE"/unified/
mkdir -p build/"$MODE"/unified/
mkdir -p "$BASEDIR"
for pkg in $PKGS; do
if [ ! -e "$pkg" ]; then
echo "$pkg not found."
@@ -61,17 +62,17 @@ for pkg in $PKGS; do
exit 1
fi
pkg="$(readlink -f $pkg)"
tar -C build/"$MODE"/unified/ -xpf "$pkg"
tar -C "$BASEDIR" -xpf "$pkg"
dirname=$(basename "$pkg"| sed -e "s/-$VERSION_ESC-$RELEASE_ESC\.[^.]*\.tar\.gz//")
dirname=${dirname/#$PRODUCT/scylla}
if [ ! -d build/"$MODE"/unified/"$dirname" ]; then
if [ ! -d "$BASEDIR/$dirname" ]; then
echo "Directory $dirname not found in $pkg, the package may be corrupted."
exit 1
fi
done
ln -f unified/install.sh build/"$MODE"/unified/
ln -f unified/uninstall.sh build/"$MODE"/unified/
# relocatable package format version = 2.2
echo "2.2" > build/"$MODE"/unified/.relocatable_package_version
ln -f unified/install.sh "$BASEDIR"
ln -f unified/uninstall.sh "$BASEDIR"
# relocatable package format version = 3.0
echo "3.0" > "$BASEDIR"/.relocatable_package_version
cd build/"$MODE"/unified
tar cpf "$UNIFIED_PKG" --use-compress-program=pigz * .relocatable_package_version
tar cpf "$UNIFIED_PKG" --use-compress-program=pigz "$PRODUCT-$VERSION"