Compare commits
1 Commits
kbr--patch
...
mykaul-doc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9f03bf2362 |
@@ -1228,7 +1228,7 @@
|
||||
"operations":[
|
||||
{
|
||||
"method":"POST",
|
||||
"summary":"Removes a node from the cluster. Replicated data that logically belonged to this node is redistributed among the remaining nodes.",
|
||||
"summary":"Removes token (and all data associated with endpoint that had it) from the ring",
|
||||
"type":"void",
|
||||
"nickname":"remove_node",
|
||||
"produces":[
|
||||
@@ -1245,7 +1245,7 @@
|
||||
},
|
||||
{
|
||||
"name":"ignore_nodes",
|
||||
"description":"Comma-separated list of dead nodes to ignore in removenode operation. Use the same method for all nodes to ignore: either Host IDs or ip addresses.",
|
||||
"description":"List of dead nodes to ignore in removenode operation",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
|
||||
@@ -49,14 +49,6 @@
|
||||
"type":"string",
|
||||
"paramType":"path"
|
||||
},
|
||||
{
|
||||
"name":"internal",
|
||||
"description":"Boolean flag indicating whether internal tasks should be shown (false by default)",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"boolean",
|
||||
"paramType":"query"
|
||||
},
|
||||
{
|
||||
"name":"keyspace",
|
||||
"description":"The keyspace to query about",
|
||||
|
||||
@@ -69,11 +69,6 @@ sstring validate_keyspace(http_context& ctx, const parameters& param) {
|
||||
return validate_keyspace(ctx, param["keyspace"]);
|
||||
}
|
||||
|
||||
// Parses a request parameter that must hold a host id and returns it.
// The string is handed to locator::host_id_or_endpoint restricted to
// param_type::host_id; any exception raised by that parser propagates
// to the caller unchanged.
locator::host_id validate_host_id(const sstring& param) {
    using parsed_node = locator::host_id_or_endpoint;
    const auto parsed = parsed_node(param, parsed_node::param_type::host_id);
    return parsed.id;
}
|
||||
|
||||
// splits a request parameter assumed to hold a comma-separated list of table names
|
||||
// verify that the tables are found, otherwise a bad_param_exception exception is thrown
|
||||
// containing the description of the respective no_such_column_family error.
|
||||
@@ -720,23 +715,20 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
|
||||
});
|
||||
|
||||
ss::remove_node.set(r, [&ss](std::unique_ptr<request> req) {
|
||||
auto host_id = validate_host_id(req->get_query_param("host_id"));
|
||||
auto host_id = req->get_query_param("host_id");
|
||||
std::vector<sstring> ignore_nodes_strs= split(req->get_query_param("ignore_nodes"), ",");
|
||||
auto ignore_nodes = std::list<locator::host_id_or_endpoint>();
|
||||
auto ignore_nodes = std::list<gms::inet_address>();
|
||||
for (std::string n : ignore_nodes_strs) {
|
||||
try {
|
||||
std::replace(n.begin(), n.end(), '\"', ' ');
|
||||
std::replace(n.begin(), n.end(), '\'', ' ');
|
||||
boost::trim_all(n);
|
||||
if (!n.empty()) {
|
||||
auto hoep = locator::host_id_or_endpoint(n);
|
||||
if (!ignore_nodes.empty() && hoep.has_host_id() != ignore_nodes.front().has_host_id()) {
|
||||
throw std::runtime_error("All nodes should be identified using the same method: either Host IDs or ip addresses.");
|
||||
}
|
||||
ignore_nodes.push_back(std::move(hoep));
|
||||
auto node = gms::inet_address(n);
|
||||
ignore_nodes.push_back(node);
|
||||
}
|
||||
} catch (...) {
|
||||
throw std::runtime_error(format("Failed to parse ignore_nodes parameter: ignore_nodes={}, node={}: {}", ignore_nodes_strs, n, std::current_exception()));
|
||||
throw std::runtime_error(format("Failed to parse ignore_nodes parameter: ignore_nodes={}, node={}", ignore_nodes_strs, n));
|
||||
}
|
||||
}
|
||||
return ss.local().removenode(host_id, std::move(ignore_nodes)).then([] {
|
||||
|
||||
@@ -93,12 +93,11 @@ void set_task_manager(http_context& ctx, routes& r) {
|
||||
|
||||
tm::get_tasks.set(r, [&ctx] (std::unique_ptr<request> req) -> future<json::json_return_type> {
|
||||
using chunked_stats = utils::chunked_vector<task_stats>;
|
||||
auto internal = tasks::is_internal{req_param<bool>(*req, "internal", false)};
|
||||
std::vector<chunked_stats> res = co_await ctx.tm.map([&req, internal] (tasks::task_manager& tm) {
|
||||
std::vector<chunked_stats> res = co_await ctx.tm.map([&req] (tasks::task_manager& tm) {
|
||||
chunked_stats local_res;
|
||||
auto module = tm.find_module(req->param["module"]);
|
||||
const auto& filtered_tasks = module->get_tasks() | boost::adaptors::filtered([¶ms = req->query_parameters, internal] (const auto& task) {
|
||||
return (internal || !task.second->is_internal()) && filter_tasks(task.second, params);
|
||||
const auto& filtered_tasks = module->get_tasks() | boost::adaptors::filtered([¶ms = req->query_parameters] (const auto& task) {
|
||||
return filter_tasks(task.second, params);
|
||||
});
|
||||
for (auto& [task_id, task] : filtered_tasks) {
|
||||
local_res.push_back(task_stats{task});
|
||||
|
||||
@@ -52,7 +52,7 @@ void set_task_manager_test(http_context& ctx, routes& r, db::config& cfg) {
|
||||
it = req->query_parameters.find("entity");
|
||||
std::string entity = it != req->query_parameters.end() ? it->second : "";
|
||||
it = req->query_parameters.find("parent_id");
|
||||
tasks::task_info data;
|
||||
tasks::task_manager::parent_data data;
|
||||
if (it != req->query_parameters.end()) {
|
||||
data.id = tasks::task_id{utils::UUID{it->second}};
|
||||
auto parent_ptr = co_await tasks::task_manager::lookup_task_on_all_shards(ctx.tm, data.id);
|
||||
|
||||
@@ -790,7 +790,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, load_ring_state(this, "load_ring_state", value_status::Used, true, "When set to true, load tokens and host_ids previously saved. Same as -Dcassandra.load_ring_state in cassandra.")
|
||||
, replace_address(this, "replace_address", value_status::Used, "", "The listen_address or broadcast_address of the dead node to replace. Same as -Dcassandra.replace_address.")
|
||||
, replace_address_first_boot(this, "replace_address_first_boot", value_status::Used, "", "Like replace_address option, but if the node has been bootstrapped successfully it will be ignored. Same as -Dcassandra.replace_address_first_boot.")
|
||||
, ignore_dead_nodes_for_replace(this, "ignore_dead_nodes_for_replace", value_status::Used, "", "List dead nodes to ingore for replace operation using a comman-separated list of either host IDs or ip addresses. E.g., scylla --ignore-dead-nodes-for-replace 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c,125ed9f4-7777-1dbn-mac8-43fddce9123e")
|
||||
, ignore_dead_nodes_for_replace(this, "ignore_dead_nodes_for_replace", value_status::Used, "", "List dead nodes to ingore for replace operation. E.g., scylla --ignore-dead-nodes-for-replace 127.0.0.1,127.0.0.2")
|
||||
, override_decommission(this, "override_decommission", value_status::Used, false, "Set true to force a decommissioned node to join the cluster")
|
||||
, enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, true, "Set true to use enable repair based node operations instead of streaming based")
|
||||
, allowed_repair_based_node_ops(this, "allowed_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, "replace", "A comma separated list of node operations which are allowed to enable repair based node operations. The operations can be bootstrap, replace, removenode, decommission and rebuild")
|
||||
|
||||
@@ -237,7 +237,7 @@ filter_for_query(consistency_level cl,
|
||||
const inet_address_vector_replica_set& preferred_endpoints,
|
||||
read_repair_decision read_repair,
|
||||
const gms::gossiper& g,
|
||||
std::optional<gms::inet_address>* extra,
|
||||
gms::inet_address* extra,
|
||||
replica::column_family* cf) {
|
||||
size_t local_count;
|
||||
|
||||
|
||||
@@ -53,7 +53,7 @@ filter_for_query(consistency_level cl,
|
||||
const inet_address_vector_replica_set& preferred_endpoints,
|
||||
read_repair_decision read_repair,
|
||||
const gms::gossiper& g,
|
||||
std::optional<gms::inet_address>* extra,
|
||||
gms::inet_address* extra,
|
||||
replica::column_family* cf);
|
||||
|
||||
inet_address_vector_replica_set filter_for_query(consistency_level cl,
|
||||
|
||||
@@ -2874,14 +2874,14 @@ future<> system_keyspace::get_compaction_history(compaction_history_consumer&& f
|
||||
|
||||
future<> system_keyspace::update_repair_history(repair_history_entry entry) {
|
||||
sstring req = format("INSERT INTO system.{} (table_uuid, repair_time, repair_uuid, keyspace_name, table_name, range_start, range_end) VALUES (?, ?, ?, ?, ?, ?, ?)", REPAIR_HISTORY);
|
||||
co_await execute_cql(req, entry.table_uuid.uuid(), entry.ts, entry.id.uuid(), entry.ks, entry.cf, entry.range_start, entry.range_end).discard_result();
|
||||
co_await execute_cql(req, entry.table_uuid.uuid(), entry.ts, entry.id, entry.ks, entry.cf, entry.range_start, entry.range_end).discard_result();
|
||||
}
|
||||
|
||||
future<> system_keyspace::get_repair_history(::table_id table_id, repair_history_consumer f) {
|
||||
sstring req = format("SELECT * from system.{} WHERE table_uuid = {}", REPAIR_HISTORY, table_id);
|
||||
co_await _qp.local().query_internal(req, [&f] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
|
||||
repair_history_entry ent;
|
||||
ent.id = row.get_as<tasks::task_id>("repair_uuid");
|
||||
ent.id = row.get_as<utils::UUID>("repair_uuid");
|
||||
ent.table_uuid = ::table_id(row.get_as<utils::UUID>("table_uuid"));
|
||||
ent.range_start = row.get_as<int64_t>("range_start");
|
||||
ent.range_end = row.get_as<int64_t>("range_end");
|
||||
|
||||
@@ -26,7 +26,6 @@
|
||||
#include "cdc/generation_id.hh"
|
||||
#include "locator/host_id.hh"
|
||||
#include "service/raft/group0_fwd.hh"
|
||||
#include "tasks/task_manager.hh"
|
||||
|
||||
namespace service {
|
||||
|
||||
@@ -313,7 +312,7 @@ public:
|
||||
static future<> get_compaction_history(compaction_history_consumer&& f);
|
||||
|
||||
struct repair_history_entry {
|
||||
tasks::task_id id;
|
||||
utils::UUID id;
|
||||
table_id table_uuid;
|
||||
db_clock::time_point ts;
|
||||
sstring ks;
|
||||
|
||||
18
dist/common/scripts/scylla_raid_setup
vendored
18
dist/common/scripts/scylla_raid_setup
vendored
@@ -16,7 +16,7 @@ import stat
|
||||
import distro
|
||||
from pathlib import Path
|
||||
from scylla_util import *
|
||||
from subprocess import run, SubprocessError
|
||||
from subprocess import run
|
||||
|
||||
if __name__ == '__main__':
|
||||
if os.getuid() > 0:
|
||||
@@ -159,6 +159,10 @@ if __name__ == '__main__':
|
||||
raise Exception(f'Failed to get UUID of {fsdev}')
|
||||
|
||||
uuidpath = f'/dev/disk/by-uuid/{uuid}'
|
||||
if not os.path.exists(uuidpath):
|
||||
raise Exception(f'{uuidpath} is not found')
|
||||
if not stat.S_ISBLK(os.stat(uuidpath).st_mode):
|
||||
raise Exception(f'{uuidpath} is not block device')
|
||||
|
||||
after = 'local-fs.target'
|
||||
wants = ''
|
||||
@@ -198,16 +202,8 @@ WantedBy=multi-user.target
|
||||
systemd_unit.reload()
|
||||
if args.raid_level != '0':
|
||||
md_service.start()
|
||||
try:
|
||||
mount = systemd_unit(mntunit_bn)
|
||||
mount.start()
|
||||
except SubprocessError as e:
|
||||
if not os.path.exists(uuidpath):
|
||||
print(f'\nERROR: {uuidpath} is not found\n')
|
||||
elif not stat.S_ISBLK(os.stat(uuidpath).st_mode):
|
||||
print(f'\nERROR: {uuidpath} is not block device\n')
|
||||
raise e
|
||||
|
||||
mount = systemd_unit(mntunit_bn)
|
||||
mount.start()
|
||||
if args.enable_on_nextboot:
|
||||
mount.enable()
|
||||
uid = pwd.getpwnam('scylla').pw_uid
|
||||
|
||||
18
dist/common/scripts/scylla_util.py
vendored
18
dist/common/scripts/scylla_util.py
vendored
@@ -197,8 +197,6 @@ def is_system_partition(dev):
|
||||
return (uuid in SYSTEM_PARTITION_UUIDS)
|
||||
|
||||
def is_unused_disk(dev):
|
||||
# resolve symlink to real path
|
||||
dev = os.path.realpath(dev)
|
||||
# dev is not in /sys/class/block/, like /dev/nvme[0-9]+
|
||||
if not os.path.isdir('/sys/class/block/{dev}'.format(dev=dev.replace('/dev/', ''))):
|
||||
return False
|
||||
@@ -426,16 +424,8 @@ class sysconfig_parser:
|
||||
def __escape(self, val):
|
||||
return re.sub(r'"', r'\"', val)
|
||||
|
||||
def __unescape(self, val):
|
||||
return re.sub(r'\\"', r'"', val)
|
||||
|
||||
def __format_line(self, key, val):
|
||||
need_quotes = any([ch.isspace() for ch in val])
|
||||
esc_val = self.__escape(val)
|
||||
return f'{key}="{esc_val}"' if need_quotes else f'{key}={esc_val}'
|
||||
|
||||
def __add(self, key, val):
|
||||
self._data += self.__format_line(key, val) + '\n'
|
||||
self._data += '{}="{}"\n'.format(key, self.__escape(val))
|
||||
self.__load()
|
||||
|
||||
def __init__(self, filename):
|
||||
@@ -450,8 +440,7 @@ class sysconfig_parser:
|
||||
self.__load()
|
||||
|
||||
def get(self, key):
|
||||
val = self._cfg.get('global', key).strip('"')
|
||||
return self.__unescape(val)
|
||||
return self._cfg.get('global', key).strip('"')
|
||||
|
||||
def has_option(self, key):
|
||||
return self._cfg.has_option('global', key)
|
||||
@@ -459,8 +448,7 @@ class sysconfig_parser:
|
||||
def set(self, key, val):
|
||||
if not self.has_option(key):
|
||||
return self.__add(key, val)
|
||||
new_line = self.__format_line(key, val)
|
||||
self._data = re.sub(f'^{key}=[^\n]*$', new_line, self._data, flags=re.MULTILINE)
|
||||
self._data = re.sub('^{}=[^\n]*$'.format(key), '{}="{}"'.format(key, self.__escape(val)), self._data, flags=re.MULTILINE)
|
||||
self.__load()
|
||||
|
||||
def commit(self):
|
||||
|
||||
@@ -104,9 +104,9 @@ html_theme_options = {
|
||||
"versions_unstable": UNSTABLE_VERSIONS,
|
||||
"versions_deprecated": DEPRECATED_VERSIONS,
|
||||
'banner_button_text': 'Learn More',
|
||||
'banner_button_url': 'https://lp.scylladb.com/university-live-2022-12-registration',
|
||||
'banner_title_text': 'Join us for ScyllaDB University LIVE, a half-day of FREE instructor-led training with exclusive new ScyllaDB database content. December 1. Register for Free',
|
||||
'hide_banner': 'false',
|
||||
'banner_button_url': 'https://lp.scylladb.com/university-live-2022-03-registration',
|
||||
'banner_title_text': 'Join us for Scylla University LIVE, a half-day of FREE instructor-led training with exclusive new Scylla database content. March 22-23. Register for Free',
|
||||
'hide_banner': 'true',
|
||||
"collapse_navigation": 'true',
|
||||
}
|
||||
|
||||
|
||||
@@ -19,7 +19,6 @@ To build everything, run:
|
||||
|
||||
```console
|
||||
git clone https://github.com/scylladb/scylla
|
||||
cd ./scylla
|
||||
git submodule update --init --force --recursive
|
||||
./tools/toolchain/dbuild ./configure.py --mode=<mode>
|
||||
./tools/toolchain/dbuild ninja
|
||||
|
||||
@@ -120,6 +120,10 @@ You can clean snapshots by using :doc:`nodetool clearsnapshot </operating-scylla
|
||||
|
||||
Features
|
||||
--------
|
||||
Will Scylla have a certain feature in an upcoming release?
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
Check the `Roadmap <http://www.scylladb.com/technology/status/>`_ page for features scheduled for our GA release.
|
||||
|
||||
I want to try out new features. How do I enable experimental mode?
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
You need to add the line :code:`experimental: true` to your :code:`scylla.yaml` file.
|
||||
|
||||
@@ -25,7 +25,7 @@ Example:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
nodetool removenode 675ed9f4-6564-6dbd-can8-43fddce952gy
|
||||
nodetool removenode 192.168.1.3
|
||||
|
||||
Note that all the nodes in the cluster participate in the ``removenode`` operation to sync data if needed. For this reason, the operation will fail if one or more nodes in the cluster are not available.
|
||||
In such a case, to ensure that the operation succeeds, you must explicitly specify a list of unavailable nodes with the ``--ignore-dead-nodes`` option.
|
||||
@@ -41,8 +41,8 @@ Example:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
nodetool removenode --ignore-dead-nodes 192.168.1.4,192.168.1.5 675ed9f4-6564-6dbd-can8-43fddce952gy
|
||||
nodetool removenode --ignore-dead-nodes 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c,125ed9f4-7777-1dbn-mac8-43fddce9123e 675ed9f4-6564-6dbd-can8-43fddce952gy
|
||||
nodetool removenode 192.168.1.3
|
||||
nodetool removenode --ignore-dead-nodes 192.168.1.4,192.168.1.5 192.168.1.3
|
||||
|
||||
|
||||
.. versionadded:: version 4.6 ``--ignore-dead-nodes`` option
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
====================================
|
||||
Upgrade Guide - ScyllaDB 5.0 to 5.1
|
||||
Upgrade Guide - ScyllaDB 4.6 to 5.0
|
||||
====================================
|
||||
|
||||
.. toctree::
|
||||
|
||||
@@ -1885,8 +1885,6 @@ future<> gossiper::do_shadow_round(std::unordered_set<gms::inet_address> nodes)
|
||||
gms::application_state::STATUS,
|
||||
gms::application_state::HOST_ID,
|
||||
gms::application_state::TOKENS,
|
||||
gms::application_state::DC,
|
||||
gms::application_state::RACK,
|
||||
gms::application_state::SUPPORTED_FEATURES,
|
||||
gms::application_state::SNITCH_NAME}};
|
||||
logger.info("Gossip shadow round started with nodes={}", nodes);
|
||||
|
||||
@@ -120,7 +120,7 @@ struct node_ops_cmd_response {
|
||||
};
|
||||
|
||||
struct repair_update_system_table_request {
|
||||
tasks::task_id repair_uuid;
|
||||
utils::UUID repair_uuid;
|
||||
table_id table_uuid;
|
||||
sstring keyspace_name;
|
||||
sstring table_name;
|
||||
@@ -132,7 +132,7 @@ struct repair_update_system_table_response {
|
||||
};
|
||||
|
||||
struct repair_flush_hints_batchlog_request {
|
||||
tasks::task_id repair_uuid;
|
||||
utils::UUID repair_uuid;
|
||||
std::list<gms::inet_address> target_nodes;
|
||||
std::chrono::seconds hints_timeout;
|
||||
std::chrono::seconds batchlog_timeout;
|
||||
|
||||
@@ -10,7 +10,6 @@
|
||||
#include "schema_fwd.hh"
|
||||
#include "query-request.hh"
|
||||
#include "locator/host_id.hh"
|
||||
#include "tasks/types.hh"
|
||||
|
||||
namespace utils {
|
||||
class UUID final {
|
||||
@@ -19,10 +18,6 @@ class UUID final {
|
||||
};
|
||||
}
|
||||
|
||||
class tasks::task_id final {
|
||||
utils::UUID uuid();
|
||||
};
|
||||
|
||||
class table_id final {
|
||||
utils::UUID uuid();
|
||||
};
|
||||
|
||||
@@ -308,11 +308,11 @@ elif [ "$ID" = "fedora" ]; then
|
||||
exit 1
|
||||
fi
|
||||
dnf install -y "${fedora_packages[@]}" "${fedora_python3_packages[@]}"
|
||||
# Options shared by every pip3 invocation below. The variable holds two
# separate options, so it is expanded unquoted on purpose: quoting it would
# hand pip the single argument "--only-binary=:all: -v" rather than
# "--only-binary=:all:" and "-v". The value contains no glob characters, so
# plain word splitting is safe here.
PIP_DEFAULT_ARGS="--only-binary=:all: -v"
# shellcheck disable=SC2086
pip3 install $PIP_DEFAULT_ARGS "geomet<0.3,>=0.1"
# shellcheck disable=SC2086
pip3 install $PIP_DEFAULT_ARGS scylla-driver
# shellcheck disable=SC2086
pip3 install $PIP_DEFAULT_ARGS traceback-with-variables
# shellcheck disable=SC2086
pip3 install $PIP_DEFAULT_ARGS scylla-api-client
|
||||
pip3 install "geomet<0.3,>=0.1"
|
||||
# Disable C extensions
|
||||
pip3 install scylla-driver --install-option="--no-murmur3" --install-option="--no-libev" --install-option="--no-cython"
|
||||
pip3 install traceback-with-variables
|
||||
pip3 install scylla-api-client
|
||||
|
||||
cargo install cxxbridge-cmd --root /usr/local
|
||||
if [ -f "$(node_exporter_fullpath)" ] && node_exporter_checksum; then
|
||||
|
||||
@@ -1087,12 +1087,6 @@ token_metadata::get_endpoint_for_host_id(host_id host_id) const {
|
||||
return _impl->get_endpoint_for_host_id(host_id);
|
||||
}
|
||||
|
||||
// Parses host_id_string with host_id_or_endpoint's default (auto-detect)
// mode, then resolves the missing half of the pair against this
// token_metadata. Exceptions from parsing or from resolve() propagate.
host_id_or_endpoint token_metadata::parse_host_id_and_endpoint(const sstring& host_id_string) const {
    host_id_or_endpoint parsed(host_id_string);
    parsed.resolve(*this);
    return parsed;
}
|
||||
|
||||
const std::unordered_map<inet_address, host_id>&
|
||||
token_metadata::get_endpoint_to_host_id_map_for_reading() const {
|
||||
return _impl->get_endpoint_to_host_id_map_for_reading();
|
||||
@@ -1438,49 +1432,4 @@ future<> shared_token_metadata::mutate_token_metadata(seastar::noncopyable_funct
|
||||
set(make_token_metadata_ptr(std::move(tm)));
|
||||
}
|
||||
|
||||
// Parses `s` according to `restrict`:
//   param_type::host_id     - `s` is parsed as a UUID and stored in `id`.
//   param_type::endpoint    - `s` is parsed as an inet_address and stored in `endpoint`.
//   param_type::auto_detect - the UUID form is tried first; if that raises
//                             marshal_exception the inet_address form is tried.
// A marshal_exception from the UUID parser, or a std::invalid_argument from
// the address parser, is rethrown as std::invalid_argument with a message
// naming the offending string.
host_id_or_endpoint::host_id_or_endpoint(const sstring& s, param_type restrict) {
    if (restrict == param_type::host_id) {
        try {
            id = host_id(utils::UUID(s));
        } catch (const marshal_exception& uuid_err) {
            throw std::invalid_argument(format("Invalid host_id {}: {}", s, uuid_err.what()));
        }
        return;
    }

    if (restrict == param_type::endpoint) {
        try {
            endpoint = gms::inet_address(s);
        } catch (const std::invalid_argument& addr_err) {
            throw std::invalid_argument(format("Invalid inet_address {}: {}", s, addr_err.what()));
        }
        return;
    }

    if (restrict != param_type::auto_detect) {
        return;
    }

    // Auto-detect: prefer the host id interpretation, fall back to an address.
    bool parsed_as_uuid = true;
    try {
        id = host_id(utils::UUID(s));
    } catch (const marshal_exception&) {
        parsed_as_uuid = false;
    }
    if (parsed_as_uuid) {
        return;
    }

    try {
        endpoint = gms::inet_address(s);
    } catch (const std::invalid_argument&) {
        throw std::invalid_argument(format("Invalid host_id or inet_address {}", s));
    }
}
|
||||
|
||||
void host_id_or_endpoint::resolve(const token_metadata& tm) {
|
||||
if (id) {
|
||||
auto endpoint_opt = tm.get_endpoint_for_host_id(id);
|
||||
if (!endpoint_opt) {
|
||||
throw std::runtime_error(format("Host ID {} not found in the cluster", id));
|
||||
}
|
||||
endpoint = *endpoint_opt;
|
||||
} else {
|
||||
auto opt_id = tm.get_host_id_if_known(endpoint);
|
||||
if (!opt_id) {
|
||||
throw std::runtime_error(format("Host inet address {} not found in the cluster", endpoint));
|
||||
}
|
||||
id = *opt_id;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace locator
|
||||
|
||||
@@ -133,33 +133,6 @@ private:
|
||||
bool _sort_by_proximity = true;
|
||||
};
|
||||
|
||||
class token_metadata;
|
||||
|
||||
struct host_id_or_endpoint {
|
||||
host_id id;
|
||||
gms::inet_address endpoint;
|
||||
|
||||
enum class param_type {
|
||||
host_id,
|
||||
endpoint,
|
||||
auto_detect
|
||||
};
|
||||
|
||||
host_id_or_endpoint(const sstring& s, param_type restrict = param_type::auto_detect);
|
||||
|
||||
bool has_host_id() const noexcept {
|
||||
return bool(id);
|
||||
}
|
||||
|
||||
bool has_endpoint() const noexcept {
|
||||
return endpoint != gms::inet_address();
|
||||
}
|
||||
|
||||
// Map the host_id to endpoint based on whichever of them is set,
|
||||
// using the token_metadata
|
||||
void resolve(const token_metadata& tm);
|
||||
};
|
||||
|
||||
using dc_rack_fn = seastar::noncopyable_function<endpoint_dc_rack(inet_address)>;
|
||||
class token_metadata_impl;
|
||||
|
||||
@@ -247,10 +220,6 @@ public:
|
||||
/** Return the end-point for a unique host ID */
|
||||
std::optional<inet_address> get_endpoint_for_host_id(locator::host_id host_id) const;
|
||||
|
||||
/// Parses the \c host_id_string either as a host uuid or as an ip address and returns the mapping.
|
||||
/// Throws std::invalid_argument on parse error or std::runtime_error if the host_id wasn't found.
|
||||
host_id_or_endpoint parse_host_id_and_endpoint(const sstring& host_id_string) const;
|
||||
|
||||
/** @return a copy of the endpoint-to-id map for read-only operations */
|
||||
const std::unordered_map<inet_address, host_id>& get_endpoint_to_host_id_map_for_reading() const;
|
||||
|
||||
|
||||
2
main.cc
2
main.cc
@@ -1320,7 +1320,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
// both)
|
||||
supervisor::notify("starting repair service");
|
||||
auto max_memory_repair = memory::stats().total_memory() * 0.1;
|
||||
repair.start(std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(bm), std::ref(sys_dist_ks), std::ref(sys_ks), std::ref(view_update_generator), std::ref(task_manager), std::ref(mm), max_memory_repair).get();
|
||||
repair.start(std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(bm), std::ref(sys_dist_ks), std::ref(sys_ks), std::ref(view_update_generator), std::ref(mm), max_memory_repair).get();
|
||||
auto stop_repair_service = defer_verbose_shutdown("repair service", [&repair] {
|
||||
repair.stop().get();
|
||||
});
|
||||
|
||||
@@ -126,7 +126,7 @@ flat_mutation_reader_v2 make_forwardable(flat_mutation_reader_v2 m) {
|
||||
}
|
||||
void maybe_emit_end_tombstone() {
|
||||
if (_active_tombstone) {
|
||||
push_mutation_fragment(*_schema, _permit, range_tombstone_change(position_in_partition_view::before_key(_current.end()), {}));
|
||||
push_mutation_fragment(*_schema, _permit, range_tombstone_change(position_in_partition_view::after_key(_current.end()), {}));
|
||||
}
|
||||
}
|
||||
public:
|
||||
|
||||
191
repair/repair.cc
191
repair/repair.cc
@@ -49,12 +49,6 @@
|
||||
|
||||
logging::logger rlogger("repair");
|
||||
|
||||
// Stores the operation uuid and takes ownership of the abort source handle
// and of the list of nodes to ignore.
node_ops_info::node_ops_info(utils::UUID ops_uuid_,
                             shared_ptr<abort_source> as_,
                             std::list<gms::inet_address>&& ignore_nodes_) noexcept
    : ops_uuid(ops_uuid_),
      as(std::move(as_)),
      ignore_nodes(std::move(ignore_nodes_))
{
}
|
||||
|
||||
void node_ops_info::check_abort() {
|
||||
if (as && as->abort_requested()) {
|
||||
auto msg = format("Node operation with ops_uuid={} is aborted", ops_uuid);
|
||||
@@ -63,30 +57,8 @@ void node_ops_info::check_abort() {
|
||||
}
|
||||
}
|
||||
|
||||
// When an abort source was supplied, starts `_sas` and subscribes to `as`
// so that an abort request is forwarded to every `_sas` instance through
// invoke_on_all. The forwarding future is kept in `_abort_done`.
// Does nothing when `as` is null.
// NOTE(review): `_sas` is presumably a seastar::sharded<abort_source> - confirm.
// NOTE(review): if `as` was already aborted before this call, subscribe()
// may not run the callback - confirm whether that case needs handling.
future<> node_ops_info::start() {
    if (!as) {
        co_return;
    }

    co_await _sas.start();

    auto forward_abort = [this] () noexcept {
        _abort_done = _sas.invoke_on_all([] (abort_source& shard_as) noexcept {
            shard_as.request_abort();
        });
    };
    _abort_subscription = as->subscribe(std::move(forward_abort));
}
|
||||
|
||||
// When an abort source was supplied, waits for any abort forwarding stored
// in `_abort_done` (leaving a ready future in its place) and then stops
// `_sas`. Does nothing when `as` is null.
future<> node_ops_info::stop() noexcept {
    if (!as) {
        co_return;
    }

    auto pending_abort = std::exchange(_abort_done, make_ready_future<>());
    co_await std::move(pending_abort);
    co_await _sas.stop();
}
|
||||
|
||||
// Returns a pointer to the local `_sas` instance, or nullptr when no
// abort source was supplied.
abort_source* node_ops_info::local_abort_source() {
    if (!as) {
        return nullptr;
    }
    return &_sas.local();
}
|
||||
|
||||
node_ops_metrics::node_ops_metrics(shared_ptr<repair_module> module)
|
||||
: _module(module)
|
||||
node_ops_metrics::node_ops_metrics(tracker& tracker)
|
||||
: _tracker(tracker)
|
||||
{
|
||||
namespace sm = seastar::metrics;
|
||||
auto ops_label_type = sm::label("ops");
|
||||
@@ -167,7 +139,7 @@ static std::vector<sstring> list_column_families(const replica::database& db, co
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const repair_uniq_id& x) {
|
||||
return os << format("[id={}, uuid={}]", x.id, x.uuid());
|
||||
return os << format("[id={}, uuid={}]", x.id, x.uuid);
|
||||
}
|
||||
|
||||
// Must run inside a seastar thread
|
||||
@@ -342,12 +314,11 @@ static future<std::list<gms::inet_address>> get_hosts_participating_in_repair(re
|
||||
}
|
||||
|
||||
float node_ops_metrics::repair_finished_percentage() {
|
||||
return _module->report_progress(streaming::stream_reason::repair);
|
||||
return _tracker.report_progress(streaming::stream_reason::repair);
|
||||
}
|
||||
|
||||
repair_module::repair_module(tasks::task_manager& tm, repair_service& rs, size_t max_repair_memory) noexcept
|
||||
: tasks::task_manager::module(tm, "repair")
|
||||
, _rs(rs)
|
||||
tracker::tracker(size_t max_repair_memory)
|
||||
: _shutdown(false)
|
||||
, _range_parallelism_semaphore(std::max(size_t(1), size_t(max_repair_memory / max_repair_memory_per_range() / 4)),
|
||||
named_semaphore_exception_factory{"repair range parallelism"})
|
||||
{
|
||||
@@ -356,14 +327,14 @@ repair_module::repair_module(tasks::task_manager& tm, repair_service& rs, size_t
|
||||
max_repair_memory, max_repair_memory_per_range(), nr);
|
||||
}
|
||||
|
||||
void repair_module::start(repair_uniq_id id) {
|
||||
_pending_repairs.insert(id.uuid());
|
||||
void tracker::start(repair_uniq_id id) {
|
||||
_pending_repairs.insert(id.uuid);
|
||||
_status[id.id] = repair_status::RUNNING;
|
||||
}
|
||||
|
||||
void repair_module::done(repair_uniq_id id, bool succeeded) {
|
||||
_pending_repairs.erase(id.uuid());
|
||||
_aborted_pending_repairs.erase(id.uuid());
|
||||
void tracker::done(repair_uniq_id id, bool succeeded) {
|
||||
_pending_repairs.erase(id.uuid);
|
||||
_aborted_pending_repairs.erase(id.uuid);
|
||||
if (succeeded) {
|
||||
_status.erase(id.id);
|
||||
} else {
|
||||
@@ -371,9 +342,8 @@ void repair_module::done(repair_uniq_id id, bool succeeded) {
|
||||
}
|
||||
_done_cond.broadcast();
|
||||
}
|
||||
|
||||
repair_status repair_module::get(int id) const {
|
||||
if (id > _sequence_number) {
|
||||
repair_status tracker::get(int id) const {
|
||||
if (id >= _next_repair_command) {
|
||||
throw std::runtime_error(format("unknown repair id {}", id));
|
||||
}
|
||||
auto it = _status.find(id);
|
||||
@@ -384,9 +354,9 @@ repair_status repair_module::get(int id) const {
|
||||
}
|
||||
}
|
||||
|
||||
future<repair_status> repair_module::repair_await_completion(int id, std::chrono::steady_clock::time_point timeout) {
|
||||
return seastar::with_gate(async_gate(), [this, id, timeout] {
|
||||
if (id > _sequence_number) {
|
||||
future<repair_status> tracker::repair_await_completion(int id, std::chrono::steady_clock::time_point timeout) {
|
||||
return seastar::with_gate(_gate, [this, id, timeout] {
|
||||
if (id >= _next_repair_command) {
|
||||
return make_exception_future<repair_status>(std::runtime_error(format("unknown repair id {}", id)));
|
||||
}
|
||||
return repeat_until_value([this, id, timeout] {
|
||||
@@ -408,19 +378,30 @@ future<repair_status> repair_module::repair_await_completion(int id, std::chrono
|
||||
});
|
||||
}
|
||||
|
||||
void repair_module::check_in_shutdown() {
|
||||
abort_source().check();
|
||||
repair_uniq_id tracker::next_repair_command() {
|
||||
return repair_uniq_id{_next_repair_command++, utils::make_random_uuid()};
|
||||
}
|
||||
|
||||
void repair_module::add_repair_info(int id, lw_shared_ptr<repair_info> ri) {
|
||||
// Raises the shutdown flag, then closes the gate and returns the future
// produced by the close.
future<> tracker::shutdown() {
    _shutdown.store(true, std::memory_order_relaxed);
    auto gate_closed = _gate.close();
    return gate_closed;
}
|
||||
|
||||
void tracker::check_in_shutdown() {
|
||||
if (_shutdown.load(std::memory_order_relaxed)) {
|
||||
throw std::runtime_error(format("Repair service is being shutdown"));
|
||||
}
|
||||
}
|
||||
|
||||
void tracker::add_repair_info(int id, lw_shared_ptr<repair_info> ri) {
|
||||
_repairs.emplace(id, ri);
|
||||
}
|
||||
|
||||
void repair_module::remove_repair_info(int id) {
|
||||
void tracker::remove_repair_info(int id) {
|
||||
_repairs.erase(id);
|
||||
}
|
||||
|
||||
lw_shared_ptr<repair_info> repair_module::get_repair_info(int id) {
|
||||
lw_shared_ptr<repair_info> tracker::get_repair_info(int id) {
|
||||
auto it = _repairs.find(id);
|
||||
if (it != _repairs.end()) {
|
||||
return it->second;
|
||||
@@ -428,7 +409,7 @@ lw_shared_ptr<repair_info> repair_module::get_repair_info(int id) {
|
||||
return {};
|
||||
}
|
||||
|
||||
std::vector<int> repair_module::get_active() const {
|
||||
std::vector<int> tracker::get_active() const {
|
||||
std::vector<int> res;
|
||||
boost::push_back(res, _status | boost::adaptors::filtered([] (auto& x) {
|
||||
return x.second == repair_status::RUNNING;
|
||||
@@ -436,7 +417,7 @@ std::vector<int> repair_module::get_active() const {
|
||||
return res;
|
||||
}
|
||||
|
||||
size_t repair_module::nr_running_repair_jobs() {
|
||||
size_t tracker::nr_running_repair_jobs() {
|
||||
size_t count = 0;
|
||||
if (this_shard_id() != 0) {
|
||||
return count;
|
||||
@@ -450,11 +431,11 @@ size_t repair_module::nr_running_repair_jobs() {
|
||||
return count;
|
||||
}
|
||||
|
||||
bool repair_module::is_aborted(const tasks::task_id& uuid) {
|
||||
bool tracker::is_aborted(const utils::UUID& uuid) {
|
||||
return _aborted_pending_repairs.contains(uuid);
|
||||
}
|
||||
|
||||
void repair_module::abort_all_repairs() {
|
||||
void tracker::abort_all_repairs() {
|
||||
_aborted_pending_repairs = _pending_repairs;
|
||||
for (auto& x : _repairs) {
|
||||
auto& ri = x.second;
|
||||
@@ -463,7 +444,7 @@ void repair_module::abort_all_repairs() {
|
||||
rlogger.info0("Aborted {} repair job(s), aborted={}", _aborted_pending_repairs.size(), _aborted_pending_repairs);
|
||||
}
|
||||
|
||||
float repair_module::report_progress(streaming::stream_reason reason) {
|
||||
float tracker::report_progress(streaming::stream_reason reason) {
|
||||
uint64_t nr_ranges_finished = 0;
|
||||
uint64_t nr_ranges_total = 0;
|
||||
for (auto& x : _repairs) {
|
||||
@@ -476,15 +457,15 @@ float repair_module::report_progress(streaming::stream_reason reason) {
|
||||
return nr_ranges_total == 0 ? 1 : float(nr_ranges_finished) / float(nr_ranges_total);
|
||||
}
|
||||
|
||||
named_semaphore& repair_module::range_parallelism_semaphore() {
|
||||
named_semaphore& tracker::range_parallelism_semaphore() {
|
||||
return _range_parallelism_semaphore;
|
||||
}
|
||||
|
||||
future<> repair_module::run(repair_uniq_id id, std::function<void ()> func) {
|
||||
return seastar::with_gate(async_gate(), [this, id, func =std::move(func)] {
|
||||
future<> tracker::run(repair_uniq_id id, std::function<void ()> func) {
|
||||
return seastar::with_gate(_gate, [this, id, func =std::move(func)] {
|
||||
start(id);
|
||||
return seastar::async([func = std::move(func)] { func(); }).then([this, id] {
|
||||
rlogger.info("repair[{}]: completed successfully", id.uuid());
|
||||
rlogger.info("repair[{}]: completed successfully", id.uuid);
|
||||
done(id, true);
|
||||
}).handle_exception([this, id] (std::exception_ptr ep) {
|
||||
done(id, false);
|
||||
@@ -494,7 +475,7 @@ future<> repair_module::run(repair_uniq_id id, std::function<void ()> func) {
|
||||
}
|
||||
|
||||
void repair_info::check_in_shutdown() {
|
||||
rs.get_repair_module().check_in_shutdown();
|
||||
rs.repair_tracker().check_in_shutdown();
|
||||
}
|
||||
|
||||
future<uint64_t> estimate_partitions(seastar::sharded<replica::database>& db, const sstring& keyspace,
|
||||
@@ -551,7 +532,7 @@ repair_info::repair_info(repair_service& repair,
|
||||
const std::vector<sstring>& hosts_,
|
||||
const std::unordered_set<gms::inet_address>& ignore_nodes_,
|
||||
streaming::stream_reason reason_,
|
||||
abort_source* as,
|
||||
shared_ptr<abort_source> as,
|
||||
bool hints_batchlog_flushed)
|
||||
: rs(repair)
|
||||
, db(repair.get_db())
|
||||
@@ -566,6 +547,7 @@ repair_info::repair_info(repair_service& repair,
|
||||
, cfs(get_table_names(db.local(), table_ids_))
|
||||
, table_ids(std::move(table_ids_))
|
||||
, id(id_)
|
||||
, shard(this_shard_id())
|
||||
, data_centers(data_centers_)
|
||||
, hosts(hosts_)
|
||||
, ignore_nodes(ignore_nodes_)
|
||||
@@ -580,15 +562,15 @@ repair_info::repair_info(repair_service& repair,
|
||||
|
||||
void repair_info::check_failed_ranges() {
|
||||
rlogger.info("repair[{}]: shard {} stats: repair_reason={}, keyspace={}, tables={}, ranges_nr={}, {}",
|
||||
id.uuid(), id.shard(), reason, keyspace, table_names(), ranges.size(), _stats.get_stats());
|
||||
id.uuid, shard, reason, keyspace, table_names(), ranges.size(), _stats.get_stats());
|
||||
if (nr_failed_ranges) {
|
||||
rlogger.warn("repair[{}]: shard {} failed - {} out of {} ranges failed", id.uuid(), id.shard(), nr_failed_ranges, ranges_size());
|
||||
throw std::runtime_error(format("repair[{}] on shard {} failed to repair {} out of {} ranges", id.uuid(), id.shard(), nr_failed_ranges, ranges_size()));
|
||||
rlogger.warn("repair[{}]: shard {} failed - {} out of {} ranges failed", id.uuid, shard, nr_failed_ranges, ranges_size());
|
||||
throw std::runtime_error(format("repair[{}] on shard {} failed to repair {} out of {} ranges", id.uuid, shard, nr_failed_ranges, ranges_size()));
|
||||
} else {
|
||||
if (dropped_tables.size()) {
|
||||
rlogger.warn("repair[{}]: shard {} completed successfully, keyspace={}, ignoring dropped tables={}", id.uuid(), id.shard(), keyspace, dropped_tables);
|
||||
rlogger.warn("repair[{}]: shard {} completed successfully, keyspace={}, ignoring dropped tables={}", id.uuid, shard, keyspace, dropped_tables);
|
||||
} else {
|
||||
rlogger.info("repair[{}]: shard {} completed successfully, keyspace={}", id.uuid(), id.shard(), keyspace);
|
||||
rlogger.info("repair[{}]: shard {} completed successfully, keyspace={}", id.uuid, shard, keyspace);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -599,7 +581,7 @@ void repair_info::abort() noexcept {
|
||||
|
||||
void repair_info::check_in_abort() {
|
||||
if (aborted) {
|
||||
throw std::runtime_error(format("repair[{}]: aborted on shard {}", id.uuid(), id.shard()));
|
||||
throw std::runtime_error(format("repair[{}]: aborted on shard {}", id.uuid, shard));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -629,7 +611,7 @@ future<> repair_info::repair_range(const dht::token_range& range, ::table_id tab
|
||||
nr_failed_ranges++;
|
||||
auto status = format("failed: mandatory neighbor={} is not alive", node);
|
||||
rlogger.error("repair[{}]: Repair {} out of {} ranges, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}",
|
||||
id.uuid(), ranges_index, ranges_size(), id.shard(), keyspace, table_names(), range, neighbors, live_neighbors, status);
|
||||
id.uuid, ranges_index, ranges_size(), shard, keyspace, table_names(), range, neighbors, live_neighbors, status);
|
||||
abort();
|
||||
return make_exception_future<>(std::runtime_error(format("Repair mandatory neighbor={} is not alive, keyspace={}, mandatory_neighbors={}",
|
||||
node, keyspace, mandatory_neighbors)));
|
||||
@@ -639,7 +621,7 @@ future<> repair_info::repair_range(const dht::token_range& range, ::table_id tab
|
||||
nr_failed_ranges++;
|
||||
auto status = live_neighbors.empty() ? "skipped" : "partial";
|
||||
rlogger.warn("repair[{}]: Repair {} out of {} ranges, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}",
|
||||
id.uuid(), ranges_index, ranges_size(), id.shard(), keyspace, table_names(), range, neighbors, live_neighbors, status);
|
||||
id.uuid, ranges_index, ranges_size(), shard, keyspace, table_names(), range, neighbors, live_neighbors, status);
|
||||
if (live_neighbors.empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
@@ -648,11 +630,11 @@ future<> repair_info::repair_range(const dht::token_range& range, ::table_id tab
|
||||
if (neighbors.empty()) {
|
||||
auto status = "skipped_no_followers";
|
||||
rlogger.warn("repair[{}]: Repair {} out of {} ranges, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}",
|
||||
id.uuid(), ranges_index, ranges_size(), id.shard(), keyspace, table_names(), range, neighbors, live_neighbors, status);
|
||||
id.uuid, ranges_index, ranges_size(), shard, keyspace, table_names(), range, neighbors, live_neighbors, status);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
rlogger.debug("repair[{}]: Repair {} out of {} ranges, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}",
|
||||
id.uuid(), ranges_index, ranges_size(), id.shard(), keyspace, table_names(), range, neighbors, live_neighbors);
|
||||
id.uuid, ranges_index, ranges_size(), shard, keyspace, table_names(), range, neighbors, live_neighbors);
|
||||
return mm.sync_schema(db.local(), neighbors).then([this, &neighbors, range, table_id] {
|
||||
sstring cf;
|
||||
try {
|
||||
@@ -945,9 +927,9 @@ static future<> do_repair_ranges(lw_shared_ptr<repair_info> ri) {
|
||||
auto table_name = ri->table_names()[idx];
|
||||
// repair all the ranges in limited parallelism
|
||||
rlogger.info("repair[{}]: Started to repair {} out of {} tables in keyspace={}, table={}, table_id={}, repair_reason={}",
|
||||
ri->id.uuid(), idx + 1, ri->table_ids.size(), ri->keyspace, table_name, table_id, ri->reason);
|
||||
ri->id.uuid, idx + 1, ri->table_ids.size(), ri->keyspace, table_name, table_id, ri->reason);
|
||||
co_await coroutine::parallel_for_each(ri->ranges, [ri, table_id] (auto&& range) {
|
||||
return with_semaphore(ri->rs.get_repair_module().range_parallelism_semaphore(), 1, [ri, &range, table_id] {
|
||||
return with_semaphore(ri->rs.repair_tracker().range_parallelism_semaphore(), 1, [ri, &range, table_id] {
|
||||
return ri->repair_range(range, table_id).then([ri] {
|
||||
if (ri->reason == streaming::stream_reason::bootstrap) {
|
||||
ri->rs.get_metrics().bootstrap_finished_ranges++;
|
||||
@@ -964,7 +946,7 @@ static future<> do_repair_ranges(lw_shared_ptr<repair_info> ri) {
|
||||
ri->nr_ranges_finished++;
|
||||
}
|
||||
rlogger.debug("repair[{}]: node ops progress bootstrap={}, replace={}, rebuild={}, decommission={}, removenode={}, repair={}",
|
||||
ri->id.uuid(),
|
||||
ri->id.uuid,
|
||||
ri->rs.get_metrics().bootstrap_finished_percentage(),
|
||||
ri->rs.get_metrics().replace_finished_percentage(),
|
||||
ri->rs.get_metrics().rebuild_finished_percentage(),
|
||||
@@ -979,7 +961,7 @@ static future<> do_repair_ranges(lw_shared_ptr<repair_info> ri) {
|
||||
try {
|
||||
auto& table = ri->db.local().find_column_family(table_id);
|
||||
rlogger.debug("repair[{}]: Trigger off-strategy compaction for keyspace={}, table={}",
|
||||
ri->id.uuid(), table.schema()->ks_name(), table.schema()->cf_name());
|
||||
ri->id.uuid, table.schema()->ks_name(), table.schema()->cf_name());
|
||||
table.trigger_offstrategy_compaction();
|
||||
} catch (replica::no_such_column_family&) {
|
||||
// Ignore dropped table
|
||||
@@ -994,13 +976,13 @@ static future<> do_repair_ranges(lw_shared_ptr<repair_info> ri) {
|
||||
// is assumed to be a indivisible in the sense that all the tokens in has the
|
||||
// same nodes as replicas.
|
||||
static future<> repair_ranges(lw_shared_ptr<repair_info> ri) {
|
||||
ri->rs.get_repair_module().add_repair_info(ri->id.id, ri);
|
||||
ri->rs.repair_tracker().add_repair_info(ri->id.id, ri);
|
||||
return do_repair_ranges(ri).then([ri] {
|
||||
ri->check_failed_ranges();
|
||||
ri->rs.get_repair_module().remove_repair_info(ri->id.id);
|
||||
ri->rs.repair_tracker().remove_repair_info(ri->id.id);
|
||||
return make_ready_future<>();
|
||||
}).handle_exception([ri] (std::exception_ptr eptr) {
|
||||
ri->rs.get_repair_module().remove_repair_info(ri->id.id);
|
||||
ri->rs.repair_tracker().remove_repair_info(ri->id.id);
|
||||
return make_exception_future<>(std::move(eptr));
|
||||
});
|
||||
}
|
||||
@@ -1013,17 +995,16 @@ static future<> repair_ranges(lw_shared_ptr<repair_info> ri) {
|
||||
int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring, sstring> options_map) {
|
||||
seastar::sharded<replica::database>& db = get_db();
|
||||
auto& topology = db.local().get_token_metadata().get_topology();
|
||||
get_repair_module().check_in_shutdown();
|
||||
repair_tracker().check_in_shutdown();
|
||||
|
||||
repair_options options(options_map);
|
||||
|
||||
// Note: Cassandra can, in some cases, decide immediately that there is
|
||||
// nothing to repair, and return 0. "nodetool repair" prints in this case
|
||||
// that "Nothing to repair for keyspace '...'". We don't have such a case
|
||||
// yet. The id field of repair_uniq_ids returned by next_repair_command()
|
||||
// will be >= 1.
|
||||
auto id = _repair_module->new_repair_uniq_id();
|
||||
rlogger.info("repair[{}]: starting user-requested repair for keyspace {}, repair id {}, options {}", id.uuid(), keyspace, id.id, options_map);
|
||||
// yet. Real ids returned by next_repair_command() will be >= 1.
|
||||
auto id = repair_tracker().next_repair_command();
|
||||
rlogger.info("repair[{}]: starting user-requested repair for keyspace {}, repair id {}, options {}", id.uuid, keyspace, id.id, options_map);
|
||||
|
||||
if (!_gossiper.local().is_normal(utils::fb_utilities::get_broadcast_address())) {
|
||||
throw std::runtime_error("Node is not in NORMAL status yet!");
|
||||
@@ -1105,14 +1086,14 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
|
||||
std::vector<sstring> cfs =
|
||||
options.column_families.size() ? options.column_families : list_column_families(db.local(), keyspace);
|
||||
if (cfs.empty()) {
|
||||
rlogger.info("repair[{}]: completed successfully: no tables to repair", id.uuid());
|
||||
rlogger.info("repair[{}]: completed successfully: no tables to repair", id.uuid);
|
||||
return id.id;
|
||||
}
|
||||
|
||||
// Do it in the background.
|
||||
(void)get_repair_module().run(id, [this, &db, id, keyspace = std::move(keyspace),
|
||||
(void)repair_tracker().run(id, [this, &db, id, keyspace = std::move(keyspace),
|
||||
cfs = std::move(cfs), ranges = std::move(ranges), options = std::move(options), ignore_nodes = std::move(ignore_nodes)] () mutable {
|
||||
auto uuid = id.uuid();
|
||||
auto uuid = id.uuid;
|
||||
|
||||
bool needs_flush_before_repair = false;
|
||||
if (db.local().features().tombstone_gc_options) {
|
||||
@@ -1134,7 +1115,7 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
|
||||
});
|
||||
auto hints_timeout = std::chrono::seconds(300);
|
||||
auto batchlog_timeout = std::chrono::seconds(300);
|
||||
repair_flush_hints_batchlog_request req{id.uuid(), participants, hints_timeout, batchlog_timeout};
|
||||
repair_flush_hints_batchlog_request req{id.uuid, participants, hints_timeout, batchlog_timeout};
|
||||
|
||||
try {
|
||||
parallel_for_each(waiting_nodes, [this, uuid, &req, &participants] (gms::inet_address node) -> future<> {
|
||||
@@ -1163,7 +1144,7 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
|
||||
repair_results.reserve(smp::count);
|
||||
auto table_ids = get_table_ids(db.local(), keyspace, cfs);
|
||||
abort_source as;
|
||||
auto off_strategy_updater = seastar::async([this, uuid = uuid.uuid(), &table_ids, &participants, &as] {
|
||||
auto off_strategy_updater = seastar::async([this, uuid, &table_ids, &participants, &as] {
|
||||
auto tables = std::list<table_id>(table_ids.begin(), table_ids.end());
|
||||
auto req = node_ops_cmd_request(node_ops_cmd::repair_updater, uuid, {}, {}, {}, {}, std::move(tables));
|
||||
auto update_interval = std::chrono::seconds(30);
|
||||
@@ -1200,7 +1181,7 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
|
||||
}
|
||||
});
|
||||
|
||||
if (get_repair_module().is_aborted(id.uuid())) {
|
||||
if (repair_tracker().is_aborted(id.uuid)) {
|
||||
throw std::runtime_error("aborted by user request");
|
||||
}
|
||||
|
||||
@@ -1230,7 +1211,7 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
}).handle_exception([id] (std::exception_ptr ep) {
|
||||
rlogger.warn("repair[{}]: repair_tracker run failed: {}", id.uuid(), ep);
|
||||
rlogger.warn("repair[{}]: repair_tracker run failed: {}", id.uuid, ep);
|
||||
});
|
||||
|
||||
return id.id;
|
||||
@@ -1245,29 +1226,30 @@ future<int> repair_start(seastar::sharded<repair_service>& repair,
|
||||
|
||||
future<std::vector<int>> repair_service::get_active_repairs() {
|
||||
return container().invoke_on(0, [] (repair_service& rs) {
|
||||
return rs.get_repair_module().get_active();
|
||||
return rs.repair_tracker().get_active();
|
||||
});
|
||||
}
|
||||
|
||||
future<repair_status> repair_service::get_status(int id) {
|
||||
return container().invoke_on(0, [id] (repair_service& rs) {
|
||||
return rs.get_repair_module().get(id);
|
||||
return rs.repair_tracker().get(id);
|
||||
});
|
||||
}
|
||||
|
||||
future<repair_status> repair_service::await_completion(int id, std::chrono::steady_clock::time_point timeout) {
|
||||
return container().invoke_on(0, [id, timeout] (repair_service& rs) {
|
||||
return rs.get_repair_module().repair_await_completion(id, timeout);
|
||||
return rs.repair_tracker().repair_await_completion(id, timeout);
|
||||
});
|
||||
}
|
||||
|
||||
future<> repair_service::shutdown() {
|
||||
co_await repair_tracker().shutdown();
|
||||
co_await remove_repair_meta();
|
||||
}
|
||||
|
||||
future<> repair_service::abort_all() {
|
||||
return container().invoke_on_all([] (repair_service& rs) {
|
||||
return rs.get_repair_module().abort_all_repairs();
|
||||
return rs.repair_tracker().abort_all_repairs();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1293,18 +1275,18 @@ future<> repair_service::do_sync_data_using_repair(
|
||||
shared_ptr<node_ops_info> ops_info) {
|
||||
seastar::sharded<replica::database>& db = get_db();
|
||||
|
||||
repair_uniq_id id = get_repair_module().new_repair_uniq_id();
|
||||
rlogger.info("repair[{}]: sync data for keyspace={}, status=started", id.uuid(), keyspace);
|
||||
return get_repair_module().run(id, [this, id, &db, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_info] () mutable {
|
||||
repair_uniq_id id = repair_tracker().next_repair_command();
|
||||
rlogger.info("repair[{}]: sync data for keyspace={}, status=started", id.uuid, keyspace);
|
||||
return repair_tracker().run(id, [this, id, &db, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_info] () mutable {
|
||||
auto cfs = list_column_families(db.local(), keyspace);
|
||||
if (cfs.empty()) {
|
||||
rlogger.warn("repair[{}]: sync data for keyspace={}, no table in this keyspace", id.uuid(), keyspace);
|
||||
rlogger.warn("repair[{}]: sync data for keyspace={}, no table in this keyspace", id.uuid, keyspace);
|
||||
return;
|
||||
}
|
||||
auto table_ids = get_table_ids(db.local(), keyspace, cfs);
|
||||
std::vector<future<>> repair_results;
|
||||
repair_results.reserve(smp::count);
|
||||
if (get_repair_module().is_aborted(id.uuid())) {
|
||||
if (repair_tracker().is_aborted(id.uuid)) {
|
||||
throw std::runtime_error("aborted by user request");
|
||||
}
|
||||
for (auto shard : boost::irange(unsigned(0), smp::count)) {
|
||||
@@ -1313,10 +1295,9 @@ future<> repair_service::do_sync_data_using_repair(
|
||||
auto hosts = std::vector<sstring>();
|
||||
auto ignore_nodes = std::unordered_set<gms::inet_address>();
|
||||
bool hints_batchlog_flushed = false;
|
||||
abort_source* asp = ops_info ? ops_info->local_abort_source() : nullptr;
|
||||
auto ri = make_lw_shared<repair_info>(local_repair,
|
||||
std::move(keyspace), std::move(ranges), std::move(table_ids),
|
||||
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, asp, hints_batchlog_flushed);
|
||||
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, ops_info ? ops_info->as : nullptr, hints_batchlog_flushed);
|
||||
ri->neighbors = std::move(neighbors);
|
||||
return repair_ranges(ri);
|
||||
});
|
||||
@@ -1337,13 +1318,13 @@ future<> repair_service::do_sync_data_using_repair(
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
}).then([id, keyspace] {
|
||||
rlogger.info("repair[{}]: sync data for keyspace={}, status=succeeded", id.uuid(), keyspace);
|
||||
rlogger.info("repair[{}]: sync data for keyspace={}, status=succeeded", id.uuid, keyspace);
|
||||
}).handle_exception([&db, id, keyspace] (std::exception_ptr ep) {
|
||||
if (!db.local().has_keyspace(keyspace)) {
|
||||
rlogger.warn("repair[{}]: sync data for keyspace={}, status=failed: keyspace does not exist any more, ignoring it, {}", id.uuid(), keyspace, ep);
|
||||
rlogger.warn("repair[{}]: sync data for keyspace={}, status=failed: keyspace does not exist any more, ignoring it, {}", id.uuid, keyspace, ep);
|
||||
return make_ready_future<>();
|
||||
}
|
||||
rlogger.warn("repair[{}]: sync data for keyspace={}, status=failed: {}", id.uuid(), keyspace, ep);
|
||||
rlogger.warn("repair[{}]: sync data for keyspace={}, status=failed: {}", id.uuid, keyspace, ep);
|
||||
return make_exception_future<>(ep);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -28,11 +28,6 @@
|
||||
#include "locator/token_metadata.hh"
|
||||
#include "repair/hash.hh"
|
||||
#include "repair/sync_boundary.hh"
|
||||
#include "tasks/types.hh"
|
||||
|
||||
namespace tasks {
|
||||
class repair_module;
|
||||
}
|
||||
|
||||
namespace replica {
|
||||
class database;
|
||||
@@ -67,42 +62,16 @@ public:
|
||||
struct repair_uniq_id {
|
||||
// The integer ID used to identify a repair job. It is currently used by nodetool and http API.
|
||||
int id;
|
||||
// Task info containing a UUID to identifiy a repair job, and a shard of the job.
|
||||
// We will transit to use UUID over the integer ID.
|
||||
tasks::task_info task_info;
|
||||
|
||||
tasks::task_id uuid() const noexcept {
|
||||
return task_info.id;
|
||||
}
|
||||
|
||||
unsigned shard() const noexcept {
|
||||
return task_info.shard;
|
||||
}
|
||||
// A UUID to identifiy a repair job. We will transit to use UUID over the integer ID.
|
||||
utils::UUID uuid;
|
||||
};
|
||||
std::ostream& operator<<(std::ostream& os, const repair_uniq_id& x);
|
||||
|
||||
class node_ops_info {
|
||||
public:
|
||||
struct node_ops_info {
|
||||
utils::UUID ops_uuid;
|
||||
shared_ptr<abort_source> as;
|
||||
std::list<gms::inet_address> ignore_nodes;
|
||||
|
||||
private:
|
||||
optimized_optional<abort_source::subscription> _abort_subscription;
|
||||
sharded<abort_source> _sas;
|
||||
future<> _abort_done = make_ready_future<>();
|
||||
|
||||
public:
|
||||
node_ops_info(utils::UUID ops_uuid_, shared_ptr<abort_source> as_, std::list<gms::inet_address>&& ignore_nodes_) noexcept;
|
||||
node_ops_info(const node_ops_info&) = delete;
|
||||
node_ops_info(node_ops_info&&) = delete;
|
||||
|
||||
future<> start();
|
||||
future<> stop() noexcept;
|
||||
|
||||
void check_abort();
|
||||
|
||||
abort_source* local_abort_source();
|
||||
};
|
||||
|
||||
// NOTE: repair_start() can be run on any node, but starts a node-global
|
||||
@@ -184,6 +153,7 @@ public:
|
||||
std::vector<sstring> cfs;
|
||||
std::vector<table_id> table_ids;
|
||||
repair_uniq_id id;
|
||||
shard_id shard;
|
||||
std::vector<sstring> data_centers;
|
||||
std::vector<sstring> hosts;
|
||||
std::unordered_set<gms::inet_address> ignore_nodes;
|
||||
@@ -209,7 +179,7 @@ public:
|
||||
const std::vector<sstring>& hosts_,
|
||||
const std::unordered_set<gms::inet_address>& ingore_nodes_,
|
||||
streaming::stream_reason reason_,
|
||||
abort_source* as,
|
||||
shared_ptr<abort_source> as,
|
||||
bool hints_batchlog_flushed);
|
||||
void check_failed_ranges();
|
||||
void abort() noexcept;
|
||||
@@ -232,6 +202,58 @@ public:
|
||||
size_t ranges_size();
|
||||
};
|
||||
|
||||
// The repair_tracker tracks ongoing repair operations and their progress.
|
||||
// A repair which has already finished successfully is dropped from this
|
||||
// table, but a failed repair will remain in the table forever so it can
|
||||
// be queried about more than once (FIXME: reconsider this. But note that
|
||||
// failed repairs should be rare anwyay).
|
||||
// This object is not thread safe, and must be used by only one cpu.
|
||||
class tracker {
|
||||
private:
|
||||
// Each repair_start() call returns a unique int which the user can later
|
||||
// use to follow the status of this repair with repair_status().
|
||||
// We can't use the number 0 - if repair_start() returns 0, it means it
|
||||
// decide quickly that there is nothing to repair.
|
||||
int _next_repair_command = 1;
|
||||
// Note that there are no "SUCCESSFUL" entries in the "status" map:
|
||||
// Successfully-finished repairs are those with id < _next_repair_command
|
||||
// but aren't listed as running or failed the status map.
|
||||
std::unordered_map<int, repair_status> _status;
|
||||
// Used to allow shutting down repairs in progress, and waiting for them.
|
||||
seastar::gate _gate;
|
||||
// Set when the repair service is being shutdown
|
||||
std::atomic_bool _shutdown alignas(seastar::cache_line_size);
|
||||
// Map repair id into repair_info.
|
||||
std::unordered_map<int, lw_shared_ptr<repair_info>> _repairs;
|
||||
std::unordered_set<utils::UUID> _pending_repairs;
|
||||
std::unordered_set<utils::UUID> _aborted_pending_repairs;
|
||||
// The semaphore used to control the maximum
|
||||
// ranges that can be repaired in parallel.
|
||||
named_semaphore _range_parallelism_semaphore;
|
||||
static constexpr size_t _max_repair_memory_per_range = 32 * 1024 * 1024;
|
||||
seastar::condition_variable _done_cond;
|
||||
void start(repair_uniq_id id);
|
||||
void done(repair_uniq_id id, bool succeeded);
|
||||
public:
|
||||
explicit tracker(size_t max_repair_memory);
|
||||
repair_status get(int id) const;
|
||||
repair_uniq_id next_repair_command();
|
||||
future<> shutdown();
|
||||
void check_in_shutdown();
|
||||
void add_repair_info(int id, lw_shared_ptr<repair_info> ri);
|
||||
void remove_repair_info(int id);
|
||||
lw_shared_ptr<repair_info> get_repair_info(int id);
|
||||
std::vector<int> get_active() const;
|
||||
size_t nr_running_repair_jobs();
|
||||
void abort_all_repairs();
|
||||
named_semaphore& range_parallelism_semaphore();
|
||||
static size_t max_repair_memory_per_range() { return _max_repair_memory_per_range; }
|
||||
future<> run(repair_uniq_id id, std::function<void ()> func);
|
||||
future<repair_status> repair_await_completion(int id, std::chrono::steady_clock::time_point timeout);
|
||||
float report_progress(streaming::stream_reason reason);
|
||||
bool is_aborted(const utils::UUID& uuid);
|
||||
};
|
||||
|
||||
future<uint64_t> estimate_partitions(seastar::sharded<replica::database>& db, const sstring& keyspace,
|
||||
const sstring& cf, const dht::token_range& range);
|
||||
|
||||
@@ -391,7 +413,7 @@ struct node_ops_cmd_response {
|
||||
|
||||
|
||||
struct repair_update_system_table_request {
|
||||
tasks::task_id repair_uuid;
|
||||
utils::UUID repair_uuid;
|
||||
table_id table_uuid;
|
||||
sstring keyspace_name;
|
||||
sstring table_name;
|
||||
@@ -403,7 +425,7 @@ struct repair_update_system_table_response {
|
||||
};
|
||||
|
||||
struct repair_flush_hints_batchlog_request {
|
||||
tasks::task_id repair_uuid;
|
||||
utils::UUID repair_uuid;
|
||||
std::list<gms::inet_address> target_nodes;
|
||||
std::chrono::seconds hints_timeout;
|
||||
std::chrono::seconds batchlog_timeout;
|
||||
|
||||
@@ -1,82 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2022-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "repair/repair.hh"
|
||||
#include "tasks/task_manager.hh"
|
||||
|
||||
class repair_task_impl : public tasks::task_manager::task::impl {
|
||||
public:
|
||||
repair_task_impl(tasks::task_manager::module_ptr module, tasks::task_id id, unsigned sequence_number, std::string keyspace, std::string table, std::string type, std::string entity, tasks::task_id parent_id)
|
||||
: tasks::task_manager::task::impl(module, id, sequence_number, std::move(keyspace), std::move(table), std::move(type), std::move(entity), parent_id) {
|
||||
_status.progress_units = "ranges";
|
||||
}
|
||||
protected:
|
||||
repair_uniq_id get_repair_uniq_id() const noexcept {
|
||||
return repair_uniq_id{
|
||||
.id = _status.sequence_number,
|
||||
.task_info = tasks::task_info(_status.id, _status.shard)
|
||||
};
|
||||
}
|
||||
|
||||
virtual future<> run() override = 0;
|
||||
};
|
||||
|
||||
// The repair_module tracks ongoing repair operations and their progress.
|
||||
// A repair which has already finished successfully is dropped from this
|
||||
// table, but a failed repair will remain in the table forever so it can
|
||||
// be queried about more than once (FIXME: reconsider this. But note that
|
||||
// failed repairs should be rare anwyay).
|
||||
class repair_module : public tasks::task_manager::module {
|
||||
private:
|
||||
repair_service& _rs;
|
||||
// Note that there are no "SUCCESSFUL" entries in the "status" map:
|
||||
// Successfully-finished repairs are those with id <= repair_module::_sequence_number
|
||||
// but aren't listed as running or failed the status map.
|
||||
std::unordered_map<int, repair_status> _status;
|
||||
// Map repair id into repair_info.
|
||||
std::unordered_map<int, lw_shared_ptr<repair_info>> _repairs;
|
||||
std::unordered_set<tasks::task_id> _pending_repairs;
|
||||
std::unordered_set<tasks::task_id> _aborted_pending_repairs;
|
||||
// The semaphore used to control the maximum
|
||||
// ranges that can be repaired in parallel.
|
||||
named_semaphore _range_parallelism_semaphore;
|
||||
static constexpr size_t _max_repair_memory_per_range = 32 * 1024 * 1024;
|
||||
seastar::condition_variable _done_cond;
|
||||
void start(repair_uniq_id id);
|
||||
void done(repair_uniq_id id, bool succeeded);
|
||||
public:
|
||||
repair_module(tasks::task_manager& tm, repair_service& rs, size_t max_repair_memory) noexcept;
|
||||
|
||||
repair_service& get_repair_service() noexcept {
|
||||
return _rs;
|
||||
}
|
||||
|
||||
repair_uniq_id new_repair_uniq_id() noexcept {
|
||||
return repair_uniq_id{
|
||||
.id = new_sequence_number(),
|
||||
.task_info = tasks::task_info(tasks::task_id::create_random_id(), this_shard_id())
|
||||
};
|
||||
}
|
||||
|
||||
repair_status get(int id) const;
|
||||
void check_in_shutdown();
|
||||
void add_repair_info(int id, lw_shared_ptr<repair_info> ri);
|
||||
void remove_repair_info(int id);
|
||||
lw_shared_ptr<repair_info> get_repair_info(int id);
|
||||
std::vector<int> get_active() const;
|
||||
size_t nr_running_repair_jobs();
|
||||
void abort_all_repairs();
|
||||
named_semaphore& range_parallelism_semaphore();
|
||||
static size_t max_repair_memory_per_range() { return _max_repair_memory_per_range; }
|
||||
future<> run(repair_uniq_id id, std::function<void ()> func);
|
||||
future<repair_status> repair_await_completion(int id, std::chrono::steady_clock::time_point timeout);
|
||||
float report_progress(streaming::stream_reason reason);
|
||||
bool is_aborted(const tasks::task_id& uuid);
|
||||
};
|
||||
@@ -9,7 +9,6 @@
|
||||
#include <seastar/util/defer.hh>
|
||||
#include "repair/repair.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
#include "repair/repair_task.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
#include "sstables/sstables_manager.hh"
|
||||
#include "mutation_fragment.hh"
|
||||
@@ -2449,7 +2448,7 @@ private:
|
||||
|
||||
size_t get_max_row_buf_size(row_level_diff_detect_algorithm algo) {
|
||||
// Max buffer size per repair round
|
||||
return is_rpc_stream_supported(algo) ? repair_module::max_repair_memory_per_range() : 256 * 1024;
|
||||
return is_rpc_stream_supported(algo) ? tracker::max_repair_memory_per_range() : 256 * 1024;
|
||||
}
|
||||
|
||||
// Step A: Negotiate sync boundary to use
|
||||
@@ -2685,7 +2684,7 @@ private:
|
||||
size_t repaired_replicas = _all_live_peer_nodes.size() + 1;
|
||||
if (_ri.total_rf != repaired_replicas){
|
||||
rlogger.debug("repair[{}]: Skipped to update system.repair_history total_rf={}, repaired_replicas={}, local={}, peers={}",
|
||||
_ri.id.uuid(), _ri.total_rf, repaired_replicas, utils::fb_utilities::get_broadcast_address(), _all_live_peer_nodes);
|
||||
_ri.id.uuid, _ri.total_rf, repaired_replicas, utils::fb_utilities::get_broadcast_address(), _all_live_peer_nodes);
|
||||
co_return;
|
||||
}
|
||||
// Update repair_history table only if both hints and batchlog have been flushed.
|
||||
@@ -2693,12 +2692,12 @@ private:
|
||||
co_return;
|
||||
}
|
||||
repair_service& rs = _ri.rs;
|
||||
std::optional<gc_clock::time_point> repair_time_opt = co_await rs.update_history(_ri.id.uuid(), _table_id, _range, _start_time);
|
||||
std::optional<gc_clock::time_point> repair_time_opt = co_await rs.update_history(_ri.id.uuid, _table_id, _range, _start_time);
|
||||
if (!repair_time_opt) {
|
||||
co_return;
|
||||
}
|
||||
auto repair_time = repair_time_opt.value();
|
||||
repair_update_system_table_request req{_ri.id.uuid(), _table_id, _ri.keyspace, _cf_name, _range, repair_time};
|
||||
repair_update_system_table_request req{_ri.id.uuid, _table_id, _ri.keyspace, _cf_name, _range, repair_time};
|
||||
auto all_nodes = _all_live_peer_nodes;
|
||||
all_nodes.push_back(utils::fb_utilities::get_broadcast_address());
|
||||
co_await coroutine::parallel_for_each(all_nodes, [this, req] (gms::inet_address node) -> future<> {
|
||||
@@ -2706,9 +2705,9 @@ private:
|
||||
auto& ms = _ri.messaging.local();
|
||||
repair_update_system_table_response resp = co_await ser::partition_checksum_rpc_verbs::send_repair_update_system_table(&ms, netw::messaging_service::msg_addr(node), req);
|
||||
(void)resp; // nothing to do with the response yet
|
||||
rlogger.debug("repair[{}]: Finished to update system.repair_history table of node {}", _ri.id.uuid(), node);
|
||||
rlogger.debug("repair[{}]: Finished to update system.repair_history table of node {}", _ri.id.uuid, node);
|
||||
} catch (...) {
|
||||
rlogger.warn("repair[{}]: Failed to update system.repair_history table of node {}: {}", _ri.id.uuid(), node, std::current_exception());
|
||||
rlogger.warn("repair[{}]: Failed to update system.repair_history table of node {}: {}", _ri.id.uuid, node, std::current_exception());
|
||||
}
|
||||
});
|
||||
co_return;
|
||||
@@ -2733,13 +2732,13 @@ public:
|
||||
|
||||
auto& mem_sem = _ri.rs.memory_sem();
|
||||
auto max = _ri.rs.max_repair_memory();
|
||||
auto wanted = (_all_live_peer_nodes.size() + 1) * repair_module::max_repair_memory_per_range();
|
||||
auto wanted = (_all_live_peer_nodes.size() + 1) * tracker::max_repair_memory_per_range();
|
||||
wanted = std::min(max, wanted);
|
||||
rlogger.trace("repair[{}]: Started to get memory budget, wanted={}, available={}, max_repair_memory={}",
|
||||
_ri.id.uuid(), wanted, mem_sem.current(), max);
|
||||
_ri.id.uuid, wanted, mem_sem.current(), max);
|
||||
auto mem_permit = seastar::get_units(mem_sem, wanted).get0();
|
||||
rlogger.trace("repair[{}]: Finished to get memory budget, wanted={}, available={}, max_repair_memory={}",
|
||||
_ri.id.uuid(), wanted, mem_sem.current(), max);
|
||||
_ri.id.uuid, wanted, mem_sem.current(), max);
|
||||
|
||||
auto permit = _ri.db.local().obtain_reader_permit(_cf, "repair-meta", db::no_timeout).get0();
|
||||
|
||||
@@ -2811,11 +2810,11 @@ public:
|
||||
} catch (replica::no_such_column_family& e) {
|
||||
table_dropped = true;
|
||||
rlogger.warn("repair[{}]: shard={}, keyspace={}, cf={}, range={}, got error in row level repair: {}",
|
||||
_ri.id.uuid(), this_shard_id(), _ri.keyspace, _cf_name, _range, e);
|
||||
_ri.id.uuid, this_shard_id(), _ri.keyspace, _cf_name, _range, e);
|
||||
_failed = true;
|
||||
} catch (std::exception& e) {
|
||||
rlogger.warn("repair[{}]: shard={}, keyspace={}, cf={}, range={}, got error in row level repair: {}",
|
||||
_ri.id.uuid(), this_shard_id(), _ri.keyspace, _cf_name, _range, e);
|
||||
_ri.id.uuid, this_shard_id(), _ri.keyspace, _cf_name, _range, e);
|
||||
// In case the repair process fails, we need to call repair_row_level_stop to clean up repair followers
|
||||
_failed = true;
|
||||
}
|
||||
@@ -2918,7 +2917,6 @@ repair_service::repair_service(distributed<gms::gossiper>& gossiper,
|
||||
sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
sharded<db::system_keyspace>& sys_ks,
|
||||
sharded<db::view::view_update_generator>& vug,
|
||||
tasks::task_manager& tm,
|
||||
service::migration_manager& mm,
|
||||
size_t max_repair_memory)
|
||||
: _gossiper(gossiper)
|
||||
@@ -2929,13 +2927,12 @@ repair_service::repair_service(distributed<gms::gossiper>& gossiper,
|
||||
, _sys_dist_ks(sys_dist_ks)
|
||||
, _sys_ks(sys_ks)
|
||||
, _view_update_generator(vug)
|
||||
, _repair_module(seastar::make_shared<repair_module>(tm, *this, max_repair_memory))
|
||||
, _mm(mm)
|
||||
, _node_ops_metrics(_repair_module)
|
||||
, _tracker(max_repair_memory)
|
||||
, _node_ops_metrics(_tracker)
|
||||
, _max_repair_memory(max_repair_memory)
|
||||
, _memory_sem(max_repair_memory)
|
||||
{
|
||||
tm.register_module("repair", _repair_module);
|
||||
if (this_shard_id() == 0) {
|
||||
_gossip_helper = make_shared<row_level_repair_gossip_helper>(*this);
|
||||
_gossiper.local().register_(_gossip_helper);
|
||||
@@ -2948,7 +2945,6 @@ future<> repair_service::start() {
|
||||
}
|
||||
|
||||
future<> repair_service::stop() {
|
||||
co_await _repair_module->stop();
|
||||
co_await uninit_ms_handlers();
|
||||
if (this_shard_id() == 0) {
|
||||
co_await _gossiper.local().unregister_(_gossip_helper);
|
||||
@@ -2960,12 +2956,12 @@ repair_service::~repair_service() {
|
||||
assert(_stopped);
|
||||
}
|
||||
|
||||
static shard_id repair_id_to_shard(tasks::task_id& repair_id) {
|
||||
return shard_id(repair_id.uuid().get_most_significant_bits()) % smp::count;
|
||||
static shard_id repair_id_to_shard(utils::UUID& repair_id) {
|
||||
return shard_id(repair_id.get_most_significant_bits()) % smp::count;
|
||||
}
|
||||
|
||||
future<std::optional<gc_clock::time_point>>
|
||||
repair_service::update_history(tasks::task_id repair_id, table_id table_id, dht::token_range range, gc_clock::time_point repair_time) {
|
||||
repair_service::update_history(utils::UUID repair_id, table_id table_id, dht::token_range range, gc_clock::time_point repair_time) {
|
||||
auto shard = repair_id_to_shard(repair_id);
|
||||
return container().invoke_on(shard, [repair_id, table_id, range, repair_time] (repair_service& rs) mutable -> future<std::optional<gc_clock::time_point>> {
|
||||
repair_history& rh = rs._finished_ranges_history[repair_id];
|
||||
@@ -2986,7 +2982,7 @@ repair_service::update_history(tasks::task_id repair_id, table_id table_id, dht:
|
||||
});
|
||||
}
|
||||
|
||||
future<> repair_service::cleanup_history(tasks::task_id repair_id) {
|
||||
future<> repair_service::cleanup_history(utils::UUID repair_id) {
|
||||
auto shard = repair_id_to_shard(repair_id);
|
||||
return container().invoke_on(shard, [repair_id] (repair_service& rs) mutable {
|
||||
rs._finished_ranges_history.erase(repair_id);
|
||||
|
||||
@@ -11,8 +11,6 @@
|
||||
#include <vector>
|
||||
#include "gms/inet_address.hh"
|
||||
#include "repair/repair.hh"
|
||||
#include "repair/repair_task.hh"
|
||||
#include "tasks/task_manager.hh"
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include <seastar/util/bool_class.hh>
|
||||
|
||||
@@ -54,9 +52,9 @@ public:
|
||||
};
|
||||
|
||||
class node_ops_metrics {
|
||||
shared_ptr<repair_module> _module;
|
||||
tracker& _tracker;
|
||||
public:
|
||||
node_ops_metrics(shared_ptr<repair_module> module);
|
||||
node_ops_metrics(tracker& tracker);
|
||||
|
||||
uint64_t bootstrap_total_ranges{0};
|
||||
uint64_t bootstrap_finished_ranges{0};
|
||||
@@ -90,13 +88,13 @@ class repair_service : public seastar::peering_sharded_service<repair_service> {
|
||||
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
|
||||
sharded<db::system_keyspace>& _sys_ks;
|
||||
sharded<db::view::view_update_generator>& _view_update_generator;
|
||||
shared_ptr<repair_module> _repair_module;
|
||||
service::migration_manager& _mm;
|
||||
tracker _tracker;
|
||||
node_ops_metrics _node_ops_metrics;
|
||||
std::unordered_map<node_repair_meta_id, repair_meta_ptr> _repair_metas;
|
||||
uint32_t _next_repair_meta_id = 0; // used only on shard 0
|
||||
|
||||
std::unordered_map<tasks::task_id, repair_history> _finished_ranges_history;
|
||||
std::unordered_map<utils::UUID, repair_history> _finished_ranges_history;
|
||||
|
||||
shared_ptr<row_level_repair_gossip_helper> _gossip_helper;
|
||||
bool _stopped = false;
|
||||
@@ -116,7 +114,6 @@ public:
|
||||
sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
sharded<db::system_keyspace>& sys_ks,
|
||||
sharded<db::view::view_update_generator>& vug,
|
||||
tasks::task_manager& tm,
|
||||
service::migration_manager& mm, size_t max_repair_memory);
|
||||
~repair_service();
|
||||
future<> start();
|
||||
@@ -129,8 +126,8 @@ public:
|
||||
// stop them abruptly).
|
||||
future<> shutdown();
|
||||
|
||||
future<std::optional<gc_clock::time_point>> update_history(tasks::task_id repair_id, table_id table_id, dht::token_range range, gc_clock::time_point repair_time);
|
||||
future<> cleanup_history(tasks::task_id repair_id);
|
||||
future<std::optional<gc_clock::time_point>> update_history(utils::UUID repair_id, table_id table_id, dht::token_range range, gc_clock::time_point repair_time);
|
||||
future<> cleanup_history(utils::UUID repair_id);
|
||||
future<> load_history();
|
||||
|
||||
int do_repair_start(sstring keyspace, std::unordered_map<sstring, sstring> options_map);
|
||||
@@ -174,8 +171,11 @@ public:
|
||||
gms::gossiper& get_gossiper() noexcept { return _gossiper.local(); }
|
||||
size_t max_repair_memory() const { return _max_repair_memory; }
|
||||
seastar::semaphore& memory_sem() { return _memory_sem; }
|
||||
repair_module& get_repair_module() noexcept {
|
||||
return *_repair_module;
|
||||
tracker& repair_tracker() {
|
||||
return _tracker;
|
||||
}
|
||||
const tracker& repair_tracker() const {
|
||||
return _tracker;
|
||||
}
|
||||
|
||||
const node_ops_metrics& get_metrics() const noexcept {
|
||||
|
||||
@@ -114,11 +114,11 @@ have_gnutls = any([lib.startswith('libgnutls.so')
|
||||
gzip_process = subprocess.Popen("pigz > "+output, shell=True, stdin=subprocess.PIPE)
|
||||
|
||||
ar = tarfile.open(fileobj=gzip_process.stdin, mode='w|')
|
||||
# relocatable package format version = 3.0
|
||||
# relocatable package format version = 2.2
|
||||
shutil.rmtree(f'build/{SCYLLA_DIR}', ignore_errors=True)
|
||||
os.makedirs(f'build/{SCYLLA_DIR}')
|
||||
with open(f'build/{SCYLLA_DIR}/.relocatable_package_version', 'w') as f:
|
||||
f.write('3.0\n')
|
||||
f.write('2.2\n')
|
||||
ar.add(f'build/{SCYLLA_DIR}/.relocatable_package_version', arcname='.relocatable_package_version')
|
||||
|
||||
for exe in executables_scylla:
|
||||
|
||||
@@ -2720,6 +2720,7 @@ class lsa_object_descriptor(object):
|
||||
value |= (b & 0x3f) << shift
|
||||
return lsa_object_descriptor(value, start_pos, pos)
|
||||
mig_re = re.compile(r'.* standard_migrator<(.*)>\+16>,')
|
||||
vec_ext_re = re.compile(r'managed_vector<(.*), (.*u), (.*)>::external')
|
||||
|
||||
def __init__(self, value, desc_pos, obj_pos):
|
||||
self.value = value
|
||||
@@ -2732,12 +2733,10 @@ class lsa_object_descriptor(object):
|
||||
def dead_size(self):
|
||||
return self.value / 2
|
||||
|
||||
def migrator_ptr(self):
|
||||
static_migrators = gdb.parse_and_eval("'::debug::static_migrators'")
|
||||
return static_migrators['_migrators']['_M_impl']['_M_start'][self.value >> 1]
|
||||
|
||||
def migrator(self):
|
||||
return self.migrator_ptr().dereference()
|
||||
static_migrators = gdb.parse_and_eval("'::debug::static_migrators'")
|
||||
migrator = static_migrators['_migrators']['_M_impl']['_M_start'][self.value >> 1]
|
||||
return migrator.dereference()
|
||||
|
||||
def migrator_str(self):
|
||||
mig = str(self.migrator())
|
||||
@@ -2745,11 +2744,29 @@ class lsa_object_descriptor(object):
|
||||
return m.group(1)
|
||||
|
||||
def live_size(self):
|
||||
mig = self.migrator_ptr()
|
||||
obj = int(self.obj_pos)
|
||||
cmd = f'((migrate_fn_type*){mig})->size((const void*){obj})'
|
||||
res = gdb.parse_and_eval(cmd)
|
||||
return int(res)
|
||||
mig = str(self.migrator())
|
||||
m = re.match(self.mig_re, mig)
|
||||
if m:
|
||||
type = m.group(1)
|
||||
external = self.vec_ext_re.match(type)
|
||||
if type == 'blob_storage':
|
||||
t = gdb.lookup_type('blob_storage')
|
||||
blob = self.obj_pos.cast(t.pointer())
|
||||
return t.sizeof + blob['frag_size']
|
||||
elif external:
|
||||
element_type = external.group(1)
|
||||
count = external.group(2)
|
||||
size_type = external.group(3)
|
||||
vec_type = gdb.lookup_type('managed_vector<%s, %s, %s>' % (element_type, count, size_type))
|
||||
# gdb doesn't see 'external' for some reason
|
||||
backref_ptr = self.obj_pos.cast(vec_type.pointer().pointer())
|
||||
vec = backref_ptr.dereference()
|
||||
element_count = vec['_capacity']
|
||||
element_type = gdb.lookup_type(element_type)
|
||||
return backref_ptr.type.sizeof + element_count * element_type.sizeof
|
||||
else:
|
||||
return gdb.lookup_type(type).sizeof
|
||||
return 0
|
||||
|
||||
def end_pos(self):
|
||||
if self.is_live():
|
||||
|
||||
@@ -5077,7 +5077,7 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
|
||||
service_permit permit) {
|
||||
const dht::token& token = pr.start()->value().token();
|
||||
speculative_retry::type retry_type = schema->speculative_retry().get_type();
|
||||
std::optional<gms::inet_address> extra_replica;
|
||||
gms::inet_address extra_replica;
|
||||
|
||||
inet_address_vector_replica_set all_replicas = get_live_sorted_endpoints(*erm, token);
|
||||
// Check for a non-local read before heat-weighted load balancing
|
||||
@@ -5141,12 +5141,12 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
|
||||
// RRD.NONE or RRD.DC_LOCAL w/ multiple DCs.
|
||||
if (target_replicas.size() == block_for) { // If RRD.DC_LOCAL extra replica may already be present
|
||||
auto local_dc_filter = erm->get_topology().get_local_dc_filter();
|
||||
if (!extra_replica || (is_datacenter_local(cl) && !local_dc_filter(*extra_replica))) {
|
||||
if (is_datacenter_local(cl) && !local_dc_filter(extra_replica)) {
|
||||
slogger.trace("read executor no extra target to speculate");
|
||||
return ::make_shared<never_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
|
||||
} else {
|
||||
target_replicas.push_back(*extra_replica);
|
||||
slogger.trace("creating read executor with extra target {}", *extra_replica);
|
||||
target_replicas.push_back(extra_replica);
|
||||
slogger.trace("creating read executor with extra target {}", extra_replica);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -618,7 +618,7 @@ future<> storage_service::mark_existing_views_as_built(sharded<db::system_distri
|
||||
});
|
||||
}
|
||||
|
||||
std::list<gms::inet_address> storage_service::get_ignore_dead_nodes_for_replace(const token_metadata& tm) {
|
||||
std::list<gms::inet_address> storage_service::get_ignore_dead_nodes_for_replace() {
|
||||
std::vector<sstring> ignore_nodes_strs;
|
||||
std::list<gms::inet_address> ignore_nodes;
|
||||
boost::split(ignore_nodes_strs, _db.local().get_config().ignore_dead_nodes_for_replace(), boost::is_any_of(","));
|
||||
@@ -628,11 +628,11 @@ std::list<gms::inet_address> storage_service::get_ignore_dead_nodes_for_replace(
|
||||
std::replace(n.begin(), n.end(), '\'', ' ');
|
||||
boost::trim_all(n);
|
||||
if (!n.empty()) {
|
||||
auto ep_and_id = tm.parse_host_id_and_endpoint(n);
|
||||
ignore_nodes.push_back(ep_and_id.endpoint);
|
||||
auto node = gms::inet_address(n);
|
||||
ignore_nodes.push_back(node);
|
||||
}
|
||||
} catch (...) {
|
||||
throw std::runtime_error(format("Failed to parse --ignore-dead-nodes-for-replace parameter: ignore_nodes={}, node={}: {}", ignore_nodes_strs, n, std::current_exception()));
|
||||
throw std::runtime_error(format("Failed to parse --ignore-dead-nodes-for-replace parameter: ignore_nodes={}, node={}", ignore_nodes_strs, n));
|
||||
}
|
||||
}
|
||||
return ignore_nodes;
|
||||
@@ -2204,8 +2204,7 @@ void storage_service::run_bootstrap_ops(std::unordered_set<token>& bootstrap_tok
|
||||
void storage_service::run_replace_ops(std::unordered_set<token>& bootstrap_tokens) {
|
||||
auto replace_address = get_replace_address().value();
|
||||
auto uuid = utils::make_random_uuid();
|
||||
auto tmptr = get_token_metadata_ptr();
|
||||
std::list<gms::inet_address> ignore_nodes = get_ignore_dead_nodes_for_replace(*tmptr);
|
||||
std::list<gms::inet_address> ignore_nodes = get_ignore_dead_nodes_for_replace();
|
||||
// Step 1: Decide who needs to sync data for replace operation
|
||||
std::list<gms::inet_address> sync_nodes;
|
||||
for (const auto& x :_gossiper.get_endpoint_states()) {
|
||||
@@ -2249,7 +2248,7 @@ void storage_service::run_replace_ops(std::unordered_set<token>& bootstrap_token
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
if (!nodes_down.empty()) {
|
||||
auto msg = format("replace[{}]: Nodes={} needed for replace operation are down. It is highly recommended to fix the down nodes and try again. To proceed with best-effort mode which might cause data inconsistency, add --ignore-dead-nodes-for-replace <list_of_dead_nodes>. E.g., scylla --ignore-dead-nodes-for-replace 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c,125ed9f4-7777-1dbn-mac8-43fddce9123e", uuid, nodes_down);
|
||||
auto msg = format("replace[{}]: Nodes={} needed for replace operation are down. It is highly recommended to fix the down nodes and try again. To proceed with best-effort mode which might cause data inconsistency, add --ignore-dead-nodes-for-replace <list_of_dead_nodes>. E.g., scylla --ignore-dead-nodes-for-replace 127.0.0.1,127.0.0.2", uuid, nodes_down);
|
||||
slogger.warn("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
@@ -2329,11 +2328,12 @@ void storage_service::run_replace_ops(std::unordered_set<token>& bootstrap_token
|
||||
}
|
||||
}
|
||||
|
||||
future<> storage_service::removenode(locator::host_id host_id, std::list<locator::host_id_or_endpoint> ignore_nodes_params) {
|
||||
return run_with_api_lock(sstring("removenode"), [host_id, ignore_nodes_params = std::move(ignore_nodes_params)] (storage_service& ss) mutable {
|
||||
return seastar::async([&ss, host_id, ignore_nodes_params = std::move(ignore_nodes_params)] () mutable {
|
||||
future<> storage_service::removenode(sstring host_id_string, std::list<gms::inet_address> ignore_nodes) {
|
||||
return run_with_api_lock(sstring("removenode"), [host_id_string, ignore_nodes = std::move(ignore_nodes)] (storage_service& ss) mutable {
|
||||
return seastar::async([&ss, host_id_string, ignore_nodes = std::move(ignore_nodes)] {
|
||||
auto uuid = utils::make_random_uuid();
|
||||
auto tmptr = ss.get_token_metadata_ptr();
|
||||
auto host_id = locator::host_id(utils::UUID(host_id_string));
|
||||
auto endpoint_opt = tmptr->get_endpoint_for_host_id(host_id);
|
||||
if (!endpoint_opt) {
|
||||
throw std::runtime_error(format("removenode[{}]: Host ID not found in the cluster", uuid));
|
||||
@@ -2341,11 +2341,6 @@ future<> storage_service::removenode(locator::host_id host_id, std::list<locator
|
||||
auto endpoint = *endpoint_opt;
|
||||
auto tokens = tmptr->get_tokens(endpoint);
|
||||
auto leaving_nodes = std::list<gms::inet_address>{endpoint};
|
||||
std::list<gms::inet_address> ignore_nodes;
|
||||
for (auto& hoep : ignore_nodes_params) {
|
||||
hoep.resolve(*tmptr);
|
||||
ignore_nodes.push_back(hoep.endpoint);
|
||||
}
|
||||
|
||||
// Step 1: Decide who needs to sync data
|
||||
//
|
||||
@@ -2539,7 +2534,6 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
});
|
||||
},
|
||||
[this, ops_uuid] () mutable { node_ops_singal_abort(ops_uuid); });
|
||||
meta.start().get();
|
||||
_node_ops.emplace(ops_uuid, std::move(meta));
|
||||
} else if (req.cmd == node_ops_cmd::removenode_heartbeat) {
|
||||
slogger.debug("removenode[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
|
||||
@@ -2588,7 +2582,6 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
});
|
||||
},
|
||||
[this, ops_uuid] () mutable { node_ops_singal_abort(ops_uuid); });
|
||||
meta.start().get();
|
||||
_node_ops.emplace(ops_uuid, std::move(meta));
|
||||
} else if (req.cmd == node_ops_cmd::decommission_heartbeat) {
|
||||
slogger.debug("decommission[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
|
||||
@@ -2633,7 +2626,6 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
});
|
||||
},
|
||||
[this, ops_uuid ] { node_ops_singal_abort(ops_uuid); });
|
||||
meta.start().get();
|
||||
_node_ops.emplace(ops_uuid, std::move(meta));
|
||||
} else if (req.cmd == node_ops_cmd::replace_prepare_mark_alive) {
|
||||
// Wait until the local node has marked the replacing node as alive
|
||||
@@ -2688,7 +2680,6 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
|
||||
});
|
||||
},
|
||||
[this, ops_uuid ] { node_ops_singal_abort(ops_uuid); });
|
||||
meta.start().get();
|
||||
_node_ops.emplace(ops_uuid, std::move(meta));
|
||||
} else if (req.cmd == node_ops_cmd::bootstrap_heartbeat) {
|
||||
slogger.debug("bootstrap[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
|
||||
@@ -2934,7 +2925,7 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node,
|
||||
future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) {
|
||||
if (is_repair_based_node_ops_enabled(streaming::stream_reason::removenode)) {
|
||||
auto ops_uuid = utils::make_random_uuid();
|
||||
auto ops = seastar::make_shared<node_ops_info>(ops_uuid, nullptr, std::list<gms::inet_address>());
|
||||
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, nullptr, std::list<gms::inet_address>()});
|
||||
return _repair.local().removenode_with_repair(get_token_metadata_ptr(), endpoint, ops).finally([this, notify_endpoint] () {
|
||||
return send_replication_notification(notify_endpoint);
|
||||
});
|
||||
@@ -3667,19 +3658,11 @@ node_ops_meta_data::node_ops_meta_data(
|
||||
, _abort(std::move(abort_func))
|
||||
, _abort_source(seastar::make_shared<abort_source>())
|
||||
, _signal(std::move(signal_func))
|
||||
, _ops(seastar::make_shared<node_ops_info>(_ops_uuid, _abort_source, std::move(ignore_nodes)))
|
||||
, _ops(seastar::make_shared<node_ops_info>({_ops_uuid, _abort_source, std::move(ignore_nodes)}))
|
||||
, _watchdog([sig = _signal] { sig(); }) {
|
||||
_watchdog.arm(_watchdog_interval);
|
||||
}
|
||||
|
||||
future<> node_ops_meta_data::start() {
|
||||
return _ops ? _ops->start() : make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> node_ops_meta_data::stop() noexcept {
|
||||
return _ops ? _ops->stop() : make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> node_ops_meta_data::abort() {
|
||||
slogger.debug("node_ops_meta_data: ops_uuid={} abort", _ops_uuid);
|
||||
_watchdog.cancel();
|
||||
@@ -3725,7 +3708,6 @@ future<> storage_service::node_ops_done(utils::UUID ops_uuid) {
|
||||
if (it != _node_ops.end()) {
|
||||
node_ops_meta_data& meta = it->second;
|
||||
meta.cancel_watchdog();
|
||||
co_await meta.stop();
|
||||
_node_ops.erase(it);
|
||||
}
|
||||
}
|
||||
@@ -3733,24 +3715,6 @@ future<> storage_service::node_ops_done(utils::UUID ops_uuid) {
|
||||
future<> storage_service::node_ops_abort(utils::UUID ops_uuid) {
|
||||
slogger.debug("node_ops_abort: ops_uuid={}", ops_uuid);
|
||||
auto permit = co_await seastar::get_units(_node_ops_abort_sem, 1);
|
||||
|
||||
if (!ops_uuid) {
|
||||
for (auto& [uuid, meta] : _node_ops) {
|
||||
co_await meta.abort();
|
||||
auto as = meta.get_abort_source();
|
||||
if (as && !as->abort_requested()) {
|
||||
as->request_abort();
|
||||
}
|
||||
}
|
||||
|
||||
for (auto it = _node_ops.begin(); it != _node_ops.end(); it = _node_ops.erase(it)) {
|
||||
node_ops_meta_data& meta = it->second;
|
||||
co_await meta.stop();
|
||||
}
|
||||
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto it = _node_ops.find(ops_uuid);
|
||||
if (it != _node_ops.end()) {
|
||||
node_ops_meta_data& meta = it->second;
|
||||
@@ -3759,7 +3723,6 @@ future<> storage_service::node_ops_abort(utils::UUID ops_uuid) {
|
||||
if (as && !as->abort_requested()) {
|
||||
as->request_abort();
|
||||
}
|
||||
co_await meta.stop();
|
||||
_node_ops.erase(it);
|
||||
}
|
||||
}
|
||||
@@ -3778,18 +3741,17 @@ future<> storage_service::node_ops_abort_thread() {
|
||||
while (!_node_ops_abort_queue.empty()) {
|
||||
auto uuid_opt = _node_ops_abort_queue.front();
|
||||
_node_ops_abort_queue.pop_front();
|
||||
if (!uuid_opt) {
|
||||
co_return;
|
||||
}
|
||||
try {
|
||||
co_await node_ops_abort(uuid_opt.value_or(utils::null_uuid()));
|
||||
co_await node_ops_abort(*uuid_opt);
|
||||
} catch (...) {
|
||||
slogger.warn("Failed to abort node operation ops_uuid={}: {}", *uuid_opt, std::current_exception());
|
||||
}
|
||||
if (!uuid_opt) {
|
||||
slogger.info("Stopped node_ops_abort_thread");
|
||||
co_return;
|
||||
}
|
||||
}
|
||||
}
|
||||
__builtin_unreachable();
|
||||
slogger.info("Stopped node_ops_abort_thread");
|
||||
}
|
||||
|
||||
} // namespace service
|
||||
|
||||
@@ -105,8 +105,6 @@ public:
|
||||
std::list<gms::inet_address> ignore_nodes,
|
||||
std::function<future<> ()> abort_func,
|
||||
std::function<void ()> signal_func);
|
||||
future<> start();
|
||||
future<> stop() noexcept;
|
||||
shared_ptr<node_ops_info> get_ops_info();
|
||||
shared_ptr<abort_source> get_abort_source();
|
||||
future<> abort();
|
||||
@@ -300,7 +298,7 @@ private:
|
||||
void run_replace_ops(std::unordered_set<token>& bootstrap_tokens);
|
||||
void run_bootstrap_ops(std::unordered_set<token>& bootstrap_tokens);
|
||||
|
||||
std::list<gms::inet_address> get_ignore_dead_nodes_for_replace(const locator::token_metadata& tm);
|
||||
std::list<gms::inet_address> get_ignore_dead_nodes_for_replace();
|
||||
future<> wait_for_ring_to_settle(std::chrono::milliseconds delay);
|
||||
|
||||
public:
|
||||
@@ -706,7 +704,7 @@ public:
|
||||
*
|
||||
* @param hostIdString token for the node
|
||||
*/
|
||||
future<> removenode(locator::host_id host_id, std::list<locator::host_id_or_endpoint> ignore_nodes);
|
||||
future<> removenode(sstring host_id_string, std::list<gms::inet_address> ignore_nodes);
|
||||
future<node_ops_cmd_response> node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req);
|
||||
void node_ops_cmd_check(gms::inet_address coordinator, const node_ops_cmd_request& req);
|
||||
future<> node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, utils::UUID uuid, std::list<gms::inet_address> nodes, lw_shared_ptr<bool> heartbeat_updater_done);
|
||||
|
||||
@@ -14,15 +14,14 @@
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
#include "db_clock.hh"
|
||||
#include "log.hh"
|
||||
#include "tasks/types.hh"
|
||||
#include "utils/UUID.hh"
|
||||
#include "utils/serialized_action.hh"
|
||||
#include "utils/updateable_value.hh"
|
||||
|
||||
namespace tasks {
|
||||
|
||||
using task_id = utils::tagged_uuid<struct task_id_tag>;
|
||||
using is_abortable = bool_class <struct abortable_tag>;
|
||||
using is_internal = bool_class<struct internal_tag>;
|
||||
|
||||
extern logging::logger tmlogger;
|
||||
|
||||
@@ -65,6 +64,17 @@ public:
|
||||
failed
|
||||
};
|
||||
|
||||
struct parent_data {
|
||||
task_id id;
|
||||
unsigned shard;
|
||||
|
||||
parent_data() : id(task_id::create_null_id()) {}
|
||||
|
||||
operator bool() const noexcept {
|
||||
return bool(id);
|
||||
}
|
||||
};
|
||||
|
||||
class task : public enable_lw_shared_from_this<task> {
|
||||
public:
|
||||
struct progress {
|
||||
@@ -121,10 +131,6 @@ public:
|
||||
return is_abortable::no;
|
||||
}
|
||||
|
||||
virtual is_internal is_internal() const noexcept {
|
||||
return is_internal::no;
|
||||
}
|
||||
|
||||
virtual future<> abort() noexcept {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
@@ -237,10 +243,6 @@ public:
|
||||
return _impl->is_abortable();
|
||||
};
|
||||
|
||||
is_internal is_internal() const noexcept {
|
||||
return _impl->is_internal();
|
||||
}
|
||||
|
||||
future<> abort() noexcept {
|
||||
return _impl->abort();
|
||||
}
|
||||
@@ -318,37 +320,37 @@ public:
|
||||
co_await _gate.close();
|
||||
_tm.unregister_module(_name);
|
||||
}
|
||||
public:
|
||||
|
||||
template<typename T>
|
||||
requires std::is_base_of_v<task_manager::task::impl, T>
|
||||
future<task_id> make_task(unsigned shard, task_id id = task_id::create_null_id(), std::string keyspace = "", std::string table = "", std::string type = "", std::string entity = "", task_info parent_d = task_info{}) {
|
||||
return _tm.container().invoke_on(shard, [id, module = _name, keyspace = std::move(keyspace), table = std::move(table), type = std::move(type), entity = std::move(entity), parent_d] (task_manager& tm) {
|
||||
auto module_ptr = tm.find_module(module);
|
||||
auto task_impl_ptr = std::make_unique<T>(module_ptr, id ? id : task_id::create_random_id(), parent_d ? 0 : module_ptr->new_sequence_number(), std::move(keyspace), std::move(table), std::move(type), std::move(entity), parent_d.id);
|
||||
return module_ptr->make_task(std::move(task_impl_ptr), parent_d).then([] (auto task) {
|
||||
return task->id();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Must be called on target shard.
|
||||
// If task has a parent, data concerning its children is updated and sequence number is inherited
|
||||
// from a parent and set. Otherwise, it must be set by caller.
|
||||
future<task_ptr> make_task(task::task_impl_ptr task_impl_ptr, task_info parent_d = task_info{}) {
|
||||
auto task = make_lw_shared<task_manager::task>(std::move(task_impl_ptr));
|
||||
future<task_id> make_task(unsigned shard, task_id id = task_id::create_null_id(), std::string keyspace = "", std::string table = "", std::string type = "", std::string entity = "", parent_data parent_d = parent_data{}) {
|
||||
foreign_task_ptr parent;
|
||||
uint64_t sequence_number = 0;
|
||||
if (parent_d) {
|
||||
task->get_status().sequence_number = co_await _tm.container().invoke_on(parent_d.shard, [id = parent_d.id, task = make_foreign(task)] (task_manager& tm) mutable {
|
||||
parent = co_await _tm.container().invoke_on(parent_d.shard, [id = parent_d.id] (task_manager& tm) mutable -> future<foreign_task_ptr> {
|
||||
const auto& all_tasks = tm.get_all_tasks();
|
||||
if (auto it = all_tasks.find(id); it != all_tasks.end()) {
|
||||
it->second->add_child(std::move(task));
|
||||
return make_ready_future<uint64_t>(it->second->get_sequence_number());
|
||||
co_return it->second;
|
||||
} else {
|
||||
return make_exception_future<uint64_t>(task_manager::task_not_found(id));
|
||||
co_return coroutine::return_exception(task_manager::task_not_found(id));
|
||||
}
|
||||
});
|
||||
sequence_number = parent->get_sequence_number();
|
||||
}
|
||||
co_return task;
|
||||
|
||||
auto task = co_await _tm.container().invoke_on(shard, [id, module = _name, sequence_number, keyspace = std::move(keyspace), table = std::move(table), type = std::move(type), entity = std::move(entity), parent_d] (task_manager& tm) {
|
||||
auto module_ptr = tm.find_module(module);
|
||||
auto task_impl_ptr = std::make_unique<T>(module_ptr, id ? id : task_id::create_random_id(), parent_d ? sequence_number : module_ptr->new_sequence_number(), std::move(keyspace), std::move(table), std::move(type), std::move(entity), parent_d.id);
|
||||
return make_ready_future<foreign_task_ptr>(make_lw_shared<task_manager::task>(std::move(task_impl_ptr)));
|
||||
});
|
||||
id = task->id();
|
||||
|
||||
if (parent_d) {
|
||||
co_await _tm.container().invoke_on(parent.get_owner_shard(), [task = std::move(parent), child = std::move(task)] (task_manager& tm) mutable {
|
||||
task->add_child(std::move(child));
|
||||
});
|
||||
}
|
||||
co_return id;
|
||||
}
|
||||
};
|
||||
public:
|
||||
|
||||
@@ -1,29 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2022-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "utils/UUID.hh"
|
||||
|
||||
namespace tasks {
|
||||
|
||||
using task_id = utils::tagged_uuid<struct task_id_tag>;
|
||||
|
||||
struct task_info {
|
||||
task_id id;
|
||||
unsigned shard;
|
||||
|
||||
task_info() noexcept : id(task_id::create_null_id()) {}
|
||||
task_info(task_id id, unsigned parent_shard) noexcept : id(id), shard(parent_shard) {}
|
||||
|
||||
operator bool() const noexcept {
|
||||
return bool(id);
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
@@ -1,46 +0,0 @@
|
||||
# Copyright 2022-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
|
||||
#############################################################################
|
||||
# Tests involving the "timestamp" column type.
|
||||
#
|
||||
# Please do not confuse the "timestamp" column type tested here, with the
|
||||
# unrelated concept of the "timestamp" of a write (the "USING TIMESTAMP").
|
||||
# That latter concept is tested in test_timestamp.py, not here.
|
||||
#############################################################################
|
||||
|
||||
from util import new_test_table, unique_key_int
|
||||
import pytest
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def table1(cql, test_keyspace):
|
||||
with new_test_table(cql, test_keyspace, "p int primary key, t timestamp") as table:
|
||||
yield table
|
||||
|
||||
# According to the Cassandra documentation, a timestamp is a 64-bit integer
|
||||
# "representing a number of milliseconds since the standard base time known
|
||||
# as the epoch". This underlying "number" can be stored into a timestamp
|
||||
# column by writing an integer directly to it, and later the number can be
|
||||
# extracted during select with the built-in tounixtimestamp() function.
|
||||
#
|
||||
# In issue #11588, a user who mistakenly used a microsecond count reported
|
||||
# that the write was successful, but then the read failed reporting that
|
||||
# "timestamp is out of range. Must be in milliseconds since epoch".
|
||||
# This is wrong and unhelpful behavior, and the following test reproduces
|
||||
# it. Cassandra currently allows the out-of-range number to be both written
|
||||
# and then read - so that is the Cassandra-compatible approach - but
|
||||
# arguably a more correct and useful behavior would be to fail the write
|
||||
# up-front, even before reaching the read.
|
||||
@pytest.mark.xfail(reason="issue #11588")
|
||||
def test_type_timestamp_overflow(cql, table1):
|
||||
p = unique_key_int()
|
||||
t = 1667215862 * 1000000
|
||||
# t is out of the normal range for milliseconds since the epoch (it
|
||||
# was calculated as microseconds since the epoch), but Cassandra allows
|
||||
# to write it. Scylla may decide in the future to fail this write (in
|
||||
# which case this test should be changed to accept both options, or
|
||||
# be marked scylla_only), but passing the write and failing the read
|
||||
# (as in #11588) is a bug.
|
||||
cql.execute(f"INSERT INTO {table1} (p, t) VALUES ({p}, {t})")
|
||||
assert list(cql.execute(f"SELECT tounixtimestamp(t) from {table1} where p = {p}")) == [(t,)]
|
||||
@@ -1436,21 +1436,6 @@ void test_range_tombstones_v2(tests::reader_concurrency_semaphore_wrapper& semap
|
||||
.produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::after_key(s.make_ckey(5)), {}))
|
||||
.produces_end_of_stream();
|
||||
|
||||
assert_that(ms.make_reader_v2(s.schema(), semaphore.make_permit(), pr,
|
||||
s.schema()->full_slice(),
|
||||
default_priority_class(),
|
||||
nullptr,
|
||||
streamed_mutation::forwarding::yes,
|
||||
mutation_reader::forwarding::no))
|
||||
.produces_partition_start(pkey)
|
||||
.produces_end_of_stream()
|
||||
.fast_forward_to(position_range(
|
||||
position_in_partition::after_key(s.make_ckey(0)),
|
||||
position_in_partition::for_key(s.make_ckey(2))))
|
||||
.produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::before_key(s.make_ckey(1)), t1))
|
||||
.produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::before_key(s.make_ckey(2)), {}))
|
||||
.produces_end_of_stream();
|
||||
|
||||
assert_that(ms.make_reader_v2(s.schema(), semaphore.make_permit(), pr,
|
||||
s.schema()->full_slice(),
|
||||
default_priority_class(),
|
||||
|
||||
7
types.hh
7
types.hh
@@ -33,7 +33,6 @@
|
||||
#include "utils/managed_bytes.hh"
|
||||
#include "utils/bit_cast.hh"
|
||||
#include "utils/chunked_vector.hh"
|
||||
#include "tasks/types.hh"
|
||||
|
||||
class tuple_type_impl;
|
||||
class big_decimal;
|
||||
@@ -1042,12 +1041,6 @@ shared_ptr<const abstract_type> data_type_for<cql_duration>() {
|
||||
return duration_type;
|
||||
}
|
||||
|
||||
template <>
|
||||
inline
|
||||
shared_ptr<const abstract_type> data_type_for<tasks::task_id>() {
|
||||
return uuid_type;
|
||||
}
|
||||
|
||||
namespace std {
|
||||
|
||||
template <>
|
||||
|
||||
@@ -51,10 +51,9 @@ done
|
||||
|
||||
UNIFIED_PKG="$(realpath -s $UNIFIED_PKG)"
|
||||
PKGS="build/$MODE/dist/tar/$PRODUCT-$VERSION-$RELEASE.$(arch).tar.gz build/$MODE/dist/tar/$PRODUCT-python3-$VERSION-$RELEASE.$(arch).tar.gz build/$MODE/dist/tar/$PRODUCT-jmx-$VERSION-$RELEASE.noarch.tar.gz build/$MODE/dist/tar/$PRODUCT-tools-$VERSION-$RELEASE.noarch.tar.gz"
|
||||
BASEDIR="build/$MODE/unified/$PRODUCT-$VERSION"
|
||||
|
||||
rm -rf build/"$MODE"/unified/
|
||||
mkdir -p "$BASEDIR"
|
||||
mkdir -p build/"$MODE"/unified/
|
||||
for pkg in $PKGS; do
|
||||
if [ ! -e "$pkg" ]; then
|
||||
echo "$pkg not found."
|
||||
@@ -62,17 +61,17 @@ for pkg in $PKGS; do
|
||||
exit 1
|
||||
fi
|
||||
pkg="$(readlink -f $pkg)"
|
||||
tar -C "$BASEDIR" -xpf "$pkg"
|
||||
tar -C build/"$MODE"/unified/ -xpf "$pkg"
|
||||
dirname=$(basename "$pkg"| sed -e "s/-$VERSION_ESC-$RELEASE_ESC\.[^.]*\.tar\.gz//")
|
||||
dirname=${dirname/#$PRODUCT/scylla}
|
||||
if [ ! -d "$BASEDIR/$dirname" ]; then
|
||||
if [ ! -d build/"$MODE"/unified/"$dirname" ]; then
|
||||
echo "Directory $dirname not found in $pkg, the package may be corrupted."
|
||||
exit 1
|
||||
fi
|
||||
done
|
||||
ln -f unified/install.sh "$BASEDIR"
|
||||
ln -f unified/uninstall.sh "$BASEDIR"
|
||||
# relocatable package format version = 3.0
|
||||
echo "3.0" > "$BASEDIR"/.relocatable_package_version
|
||||
ln -f unified/install.sh build/"$MODE"/unified/
|
||||
ln -f unified/uninstall.sh build/"$MODE"/unified/
|
||||
# relocatable package format version = 2.2
|
||||
echo "2.2" > build/"$MODE"/unified/.relocatable_package_version
|
||||
cd build/"$MODE"/unified
|
||||
tar cpf "$UNIFIED_PKG" --use-compress-program=pigz "$PRODUCT-$VERSION"
|
||||
tar cpf "$UNIFIED_PKG" --use-compress-program=pigz * .relocatable_package_version
|
||||
|
||||
Reference in New Issue
Block a user