Compare commits


1 Commit

Author: Yaniv Kaul
SHA1: 9f03bf2362
Message: Fix typo (patch -> batch)
See subject.
Date: 2022-10-24 17:45:05 +03:00
41 changed files with 313 additions and 635 deletions

View File

@@ -1228,7 +1228,7 @@
"operations":[
{
"method":"POST",
"summary":"Removes a node from the cluster. Replicated data that logically belonged to this node is redistributed among the remaining nodes.",
"summary":"Removes token (and all data associated with enpoint that had it) from the ring",
"type":"void",
"nickname":"remove_node",
"produces":[
@@ -1245,7 +1245,7 @@
},
{
"name":"ignore_nodes",
"description":"Comma-separated list of dead nodes to ignore in removenode operation. Use the same method for all nodes to ignore: either Host IDs or ip addresses.",
"description":"List of dead nodes to ingore in removenode operation",
"required":false,
"allowMultiple":false,
"type":"string",

View File

@@ -49,14 +49,6 @@
"type":"string",
"paramType":"path"
},
{
"name":"internal",
"description":"Boolean flag indicating whether internal tasks should be shown (false by default)",
"required":false,
"allowMultiple":false,
"type":"boolean",
"paramType":"query"
},
{
"name":"keyspace",
"description":"The keyspace to query about",

View File

@@ -69,11 +69,6 @@ sstring validate_keyspace(http_context& ctx, const parameters& param) {
return validate_keyspace(ctx, param["keyspace"]);
}
locator::host_id validate_host_id(const sstring& param) {
auto hoep = locator::host_id_or_endpoint(param, locator::host_id_or_endpoint::param_type::host_id);
return hoep.id;
}
// splits a request parameter assumed to hold a comma-separated list of table names
// verify that the tables are found, otherwise a bad_param_exception exception is thrown
// containing the description of the respective no_such_column_family error.
@@ -720,23 +715,20 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
});
ss::remove_node.set(r, [&ss](std::unique_ptr<request> req) {
auto host_id = validate_host_id(req->get_query_param("host_id"));
auto host_id = req->get_query_param("host_id");
std::vector<sstring> ignore_nodes_strs= split(req->get_query_param("ignore_nodes"), ",");
auto ignore_nodes = std::list<locator::host_id_or_endpoint>();
auto ignore_nodes = std::list<gms::inet_address>();
for (std::string n : ignore_nodes_strs) {
try {
std::replace(n.begin(), n.end(), '\"', ' ');
std::replace(n.begin(), n.end(), '\'', ' ');
boost::trim_all(n);
if (!n.empty()) {
auto hoep = locator::host_id_or_endpoint(n);
if (!ignore_nodes.empty() && hoep.has_host_id() != ignore_nodes.front().has_host_id()) {
throw std::runtime_error("All nodes should be identified using the same method: either Host IDs or ip addresses.");
}
ignore_nodes.push_back(std::move(hoep));
auto node = gms::inet_address(n);
ignore_nodes.push_back(node);
}
} catch (...) {
throw std::runtime_error(format("Failed to parse ignore_nodes parameter: ignore_nodes={}, node={}: {}", ignore_nodes_strs, n, std::current_exception()));
throw std::runtime_error(format("Failed to parse ignore_nodes parameter: ignore_nodes={}, node={}", ignore_nodes_strs, n));
}
}
return ss.local().removenode(host_id, std::move(ignore_nodes)).then([] {

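For readers skimming the hunk above: the handler boils down to a split-trim-parse loop over the comma-separated `ignore_nodes` query parameter. Below is a minimal standalone sketch of the plain-IP variant the diff restores, using POSIX `inet_pton` in place of `gms::inet_address`; the `parse_ignore_nodes` helper name is ours, not Scylla's.

```cpp
#include <arpa/inet.h>
#include <algorithm>
#include <list>
#include <sstream>
#include <stdexcept>
#include <string>

// Parse e.g. "127.0.0.1, '127.0.0.2'" into a list of validated IPv4
// addresses. Quote characters are blanked out and whitespace trimmed,
// mirroring the handler's loop above.
std::list<std::string> parse_ignore_nodes(const std::string& param) {
    std::list<std::string> nodes;
    std::istringstream ss(param);
    std::string n;
    while (std::getline(ss, n, ',')) {
        std::replace(n.begin(), n.end(), '"', ' ');
        std::replace(n.begin(), n.end(), '\'', ' ');
        auto b = n.find_first_not_of(" \t");
        auto e = n.find_last_not_of(" \t");
        if (b == std::string::npos) {
            continue; // empty entry, e.g. a trailing comma
        }
        n = n.substr(b, e - b + 1);
        in_addr addr{};
        if (inet_pton(AF_INET, n.c_str(), &addr) != 1) {
            throw std::runtime_error("Failed to parse ignore_nodes entry: " + n);
        }
        nodes.push_back(n);
    }
    return nodes;
}
```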
View File

@@ -93,12 +93,11 @@ void set_task_manager(http_context& ctx, routes& r) {
tm::get_tasks.set(r, [&ctx] (std::unique_ptr<request> req) -> future<json::json_return_type> {
using chunked_stats = utils::chunked_vector<task_stats>;
auto internal = tasks::is_internal{req_param<bool>(*req, "internal", false)};
std::vector<chunked_stats> res = co_await ctx.tm.map([&req, internal] (tasks::task_manager& tm) {
std::vector<chunked_stats> res = co_await ctx.tm.map([&req] (tasks::task_manager& tm) {
chunked_stats local_res;
auto module = tm.find_module(req->param["module"]);
const auto& filtered_tasks = module->get_tasks() | boost::adaptors::filtered([&params = req->query_parameters, internal] (const auto& task) {
return (internal || !task.second->is_internal()) && filter_tasks(task.second, params);
const auto& filtered_tasks = module->get_tasks() | boost::adaptors::filtered([&params = req->query_parameters] (const auto& task) {
return filter_tasks(task.second, params);
});
for (auto& [task_id, task] : filtered_tasks) {
local_res.push_back(task_stats{task});

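The `internal` flag removed in this hunk gates per-task visibility: a task is listed when the caller asked for internal tasks or the task itself is not internal. A hedged `std::ranges` analogue of that predicate, with a stand-in `task` type and the unrelated `filter_tasks` check dropped:

```cpp
#include <map>
#include <ranges>
#include <string>
#include <vector>

// Stand-in for a task-manager entry; only the field the filter needs.
struct task {
    std::string name;
    bool internal = false;
};

// Keep internal tasks only when the caller passed internal=true, mirroring
// the removed predicate: internal || !task.second->is_internal().
std::vector<task> filtered_tasks(const std::map<int, task>& tasks, bool internal) {
    std::vector<task> out;
    for (const task& t : tasks | std::views::values
                               | std::views::filter([internal](const task& t) {
                                     return internal || !t.internal;
                                 })) {
        out.push_back(t);
    }
    return out;
}
```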
View File

@@ -52,7 +52,7 @@ void set_task_manager_test(http_context& ctx, routes& r, db::config& cfg) {
it = req->query_parameters.find("entity");
std::string entity = it != req->query_parameters.end() ? it->second : "";
it = req->query_parameters.find("parent_id");
tasks::task_info data;
tasks::task_manager::parent_data data;
if (it != req->query_parameters.end()) {
data.id = tasks::task_id{utils::UUID{it->second}};
auto parent_ptr = co_await tasks::task_manager::lookup_task_on_all_shards(ctx.tm, data.id);

View File

@@ -790,7 +790,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, load_ring_state(this, "load_ring_state", value_status::Used, true, "When set to true, load tokens and host_ids previously saved. Same as -Dcassandra.load_ring_state in cassandra.")
, replace_address(this, "replace_address", value_status::Used, "", "The listen_address or broadcast_address of the dead node to replace. Same as -Dcassandra.replace_address.")
, replace_address_first_boot(this, "replace_address_first_boot", value_status::Used, "", "Like replace_address option, but if the node has been bootstrapped successfully it will be ignored. Same as -Dcassandra.replace_address_first_boot.")
, ignore_dead_nodes_for_replace(this, "ignore_dead_nodes_for_replace", value_status::Used, "", "List dead nodes to ingore for replace operation using a comman-separated list of either host IDs or ip addresses. E.g., scylla --ignore-dead-nodes-for-replace 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c,125ed9f4-7777-1dbn-mac8-43fddce9123e")
, ignore_dead_nodes_for_replace(this, "ignore_dead_nodes_for_replace", value_status::Used, "", "List dead nodes to ingore for replace operation. E.g., scylla --ignore-dead-nodes-for-replace 127.0.0.1,127.0.0.2")
, override_decommission(this, "override_decommission", value_status::Used, false, "Set true to force a decommissioned node to join the cluster")
, enable_repair_based_node_ops(this, "enable_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, true, "Set true to use enable repair based node operations instead of streaming based")
, allowed_repair_based_node_ops(this, "allowed_repair_based_node_ops", liveness::LiveUpdate, value_status::Used, "replace", "A comma separated list of node operations which are allowed to enable repair based node operations. The operations can be bootstrap, replace, removenode, decommission and rebuild")

View File

@@ -237,7 +237,7 @@ filter_for_query(consistency_level cl,
const inet_address_vector_replica_set& preferred_endpoints,
read_repair_decision read_repair,
const gms::gossiper& g,
std::optional<gms::inet_address>* extra,
gms::inet_address* extra,
replica::column_family* cf) {
size_t local_count;

View File
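The signature change above swaps `std::optional<gms::inet_address>*` for a bare pointer, which changes how the callee can say "no extra endpoint was picked". A small illustration of the two out-parameter styles, with hypothetical names:

```cpp
#include <optional>

struct address { unsigned v = 0; }; // stands in for gms::inet_address

// With std::optional<address>*, "no extra replica chosen" is representable
// explicitly, without a sentinel value:
void pick_extra_opt(std::optional<address>* extra, bool found) {
    if (extra) {
        *extra = found ? std::optional<address>{address{1}} : std::nullopt;
    }
}

// With a bare address*, the callee either writes a value or leaves the
// caller's default alone, so "none" must be encoded as a sentinel:
void pick_extra_raw(address* extra, bool found) {
    if (extra && found) {
        *extra = address{1};
    }
}
```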

@@ -53,7 +53,7 @@ filter_for_query(consistency_level cl,
const inet_address_vector_replica_set& preferred_endpoints,
read_repair_decision read_repair,
const gms::gossiper& g,
std::optional<gms::inet_address>* extra,
gms::inet_address* extra,
replica::column_family* cf);
inet_address_vector_replica_set filter_for_query(consistency_level cl,

View File

@@ -2874,14 +2874,14 @@ future<> system_keyspace::get_compaction_history(compaction_history_consumer&& f
future<> system_keyspace::update_repair_history(repair_history_entry entry) {
sstring req = format("INSERT INTO system.{} (table_uuid, repair_time, repair_uuid, keyspace_name, table_name, range_start, range_end) VALUES (?, ?, ?, ?, ?, ?, ?)", REPAIR_HISTORY);
co_await execute_cql(req, entry.table_uuid.uuid(), entry.ts, entry.id.uuid(), entry.ks, entry.cf, entry.range_start, entry.range_end).discard_result();
co_await execute_cql(req, entry.table_uuid.uuid(), entry.ts, entry.id, entry.ks, entry.cf, entry.range_start, entry.range_end).discard_result();
}
future<> system_keyspace::get_repair_history(::table_id table_id, repair_history_consumer f) {
sstring req = format("SELECT * from system.{} WHERE table_uuid = {}", REPAIR_HISTORY, table_id);
co_await _qp.local().query_internal(req, [&f] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
repair_history_entry ent;
ent.id = row.get_as<tasks::task_id>("repair_uuid");
ent.id = row.get_as<utils::UUID>("repair_uuid");
ent.table_uuid = ::table_id(row.get_as<utils::UUID>("table_uuid"));
ent.range_start = row.get_as<int64_t>("range_start");
ent.range_end = row.get_as<int64_t>("range_end");

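`tasks::task_id` versus `utils::UUID` in this hunk is an instance of the strongly-typed-id pattern: identical bits, but distinct types that cannot be mixed up at call sites. A minimal sketch with stand-in types:

```cpp
#include <cstdint>

// Minimal stand-in for utils::UUID.
struct uuid {
    std::uint64_t msb = 0, lsb = 0;
};

// Strongly typed wrapper in the spirit of tasks::task_id: same data as a
// raw uuid, but the type system now rejects passing a repair id where a
// table id is expected.
struct task_id {
    uuid value;
    explicit task_id(uuid u) : value(u) {}
    uuid get_uuid() const { return value; } // analogue of task_id::uuid()
};

struct table_id {
    uuid value;
    explicit table_id(uuid u) : value(u) {}
};

void record_repair(task_id id);   // accepts only a task_id;
// record_repair(table_id{uuid{}}); // would not compile
```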
View File

@@ -26,7 +26,6 @@
#include "cdc/generation_id.hh"
#include "locator/host_id.hh"
#include "service/raft/group0_fwd.hh"
#include "tasks/task_manager.hh"
namespace service {
@@ -313,7 +312,7 @@ public:
static future<> get_compaction_history(compaction_history_consumer&& f);
struct repair_history_entry {
tasks::task_id id;
utils::UUID id;
table_id table_uuid;
db_clock::time_point ts;
sstring ks;

View File

@@ -16,7 +16,7 @@ import stat
import distro
from pathlib import Path
from scylla_util import *
from subprocess import run, SubprocessError
from subprocess import run
if __name__ == '__main__':
if os.getuid() > 0:
@@ -159,6 +159,10 @@ if __name__ == '__main__':
raise Exception(f'Failed to get UUID of {fsdev}')
uuidpath = f'/dev/disk/by-uuid/{uuid}'
if not os.path.exists(uuidpath):
raise Exception(f'{uuidpath} is not found')
if not stat.S_ISBLK(os.stat(uuidpath).st_mode):
raise Exception(f'{uuidpath} is not block device')
after = 'local-fs.target'
wants = ''
@@ -198,16 +202,8 @@ WantedBy=multi-user.target
systemd_unit.reload()
if args.raid_level != '0':
md_service.start()
try:
mount = systemd_unit(mntunit_bn)
mount.start()
except SubprocessError as e:
if not os.path.exists(uuidpath):
print(f'\nERROR: {uuidpath} is not found\n')
elif not stat.S_ISBLK(os.stat(uuidpath).st_mode):
print(f'\nERROR: {uuidpath} is not block device\n')
raise e
mount = systemd_unit(mntunit_bn)
mount.start()
if args.enable_on_nextboot:
mount.enable()
uid = pwd.getpwnam('scylla').pw_uid

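The script change above trades fail-fast validation (raising before the mount unit is written) for after-the-fact diagnosis once `mount.start()` fails. For illustration, here is the up-front check expressed in C++ with `stat`/`S_ISBLK`; the `ensure_block_device` helper is hypothetical:

```cpp
#include <sys/stat.h>
#include <stdexcept>
#include <string>

// Fail fast if the resolved /dev/disk/by-uuid/... path is missing or not a
// block device, instead of diagnosing it only after a mount failure.
void ensure_block_device(const std::string& path) {
    struct stat st{};
    if (stat(path.c_str(), &st) != 0) {
        throw std::runtime_error(path + " is not found");
    }
    if (!S_ISBLK(st.st_mode)) {
        throw std::runtime_error(path + " is not a block device");
    }
}
```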
View File

@@ -197,8 +197,6 @@ def is_system_partition(dev):
return (uuid in SYSTEM_PARTITION_UUIDS)
def is_unused_disk(dev):
# resolve symlink to real path
dev = os.path.realpath(dev)
# dev is not in /sys/class/block/, like /dev/nvme[0-9]+
if not os.path.isdir('/sys/class/block/{dev}'.format(dev=dev.replace('/dev/', ''))):
return False
@@ -426,16 +424,8 @@ class sysconfig_parser:
def __escape(self, val):
return re.sub(r'"', r'\"', val)
def __unescape(self, val):
return re.sub(r'\\"', r'"', val)
def __format_line(self, key, val):
need_quotes = any([ch.isspace() for ch in val])
esc_val = self.__escape(val)
return f'{key}="{esc_val}"' if need_quotes else f'{key}={esc_val}'
def __add(self, key, val):
self._data += self.__format_line(key, val) + '\n'
self._data += '{}="{}"\n'.format(key, self.__escape(val))
self.__load()
def __init__(self, filename):
@@ -450,8 +440,7 @@ class sysconfig_parser:
self.__load()
def get(self, key):
val = self._cfg.get('global', key).strip('"')
return self.__unescape(val)
return self._cfg.get('global', key).strip('"')
def has_option(self, key):
return self._cfg.has_option('global', key)
@@ -459,8 +448,7 @@ class sysconfig_parser:
def set(self, key, val):
if not self.has_option(key):
return self.__add(key, val)
new_line = self.__format_line(key, val)
self._data = re.sub(f'^{key}=[^\n]*$', new_line, self._data, flags=re.MULTILINE)
self._data = re.sub('^{}=[^\n]*$'.format(key), '{}="{}"'.format(key, self.__escape(val)), self._data, flags=re.MULTILINE)
self.__load()
def commit(self):

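The removed `__format_line` quotes a value only when it contains whitespace, after escaping embedded double quotes; the older code this diff restores always emits `KEY="value"`. The same conditional-quoting rule rendered in C++, with helper names of our choosing:

```cpp
#include <algorithm>
#include <cctype>
#include <string>

// Escape embedded double quotes: " -> \"
std::string escape(const std::string& val) {
    std::string out;
    for (char c : val) {
        if (c == '"') out += '\\';
        out += c;
    }
    return out;
}

// KEY=value, quoting only when the value contains whitespace, mirroring
// sysconfig_parser.__format_line in the hunk above.
std::string format_line(const std::string& key, const std::string& val) {
    bool need_quotes = std::any_of(val.begin(), val.end(),
                                   [](unsigned char c) { return std::isspace(c); });
    std::string esc = escape(val);
    return need_quotes ? key + "=\"" + esc + "\"" : key + "=" + esc;
}
```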
View File

@@ -104,9 +104,9 @@ html_theme_options = {
"versions_unstable": UNSTABLE_VERSIONS,
"versions_deprecated": DEPRECATED_VERSIONS,
'banner_button_text': 'Learn More',
'banner_button_url': 'https://lp.scylladb.com/university-live-2022-12-registration',
'banner_title_text': 'Join us for ScyllaDB University LIVE, a half-day of FREE instructor-led training with exclusive new ScyllaDB database content. December 1. Register for Free',
'hide_banner': 'false',
'banner_button_url': 'https://lp.scylladb.com/university-live-2022-03-registration',
'banner_title_text': 'Join us for Scylla University LIVE, a half-day of FREE instructor-led training with exclusive new Scylla database content. March 22-23. Register for Free',
'hide_banner': 'true',
"collapse_navigation": 'true',
}

View File

@@ -19,7 +19,6 @@ To build everything, run:
```console
git clone https://github.com/scylladb/scylla
cd ./scylla
git submodule update --init --force --recursive
./tools/toolchain/dbuild ./configure.py --mode=<mode>
./tools/toolchain/dbuild ninja

View File

@@ -120,6 +120,10 @@ You can clean snapshots by using :doc:`nodetool clearsnapshot </operating-scylla
Features
--------
Will Scylla have a certain feature in an upcoming release?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Check the `Roadmap <http://www.scylladb.com/technology/status/>`_ page for features scheduled for our GA release.
I want to try out new features. How do I enable experimental mode?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
You need to add the line :code:`experimental: true` to your :code:`scylla.yaml` file.

View File

@@ -25,7 +25,7 @@ Example:
.. code-block:: console
nodetool removenode 675ed9f4-6564-6dbd-can8-43fddce952gy
nodetool removenode 192.168.1.3
Note that all the nodes in the cluster participate in the ``removenode`` operation to sync data if needed. For this reason, the operation will fail if one or more nodes in the cluster are not available.
In such a case, to ensure that the operation succeeds, you must explicitly specify a list of unavailable nodes with the ``--ignore-dead-nodes`` option.
@@ -41,8 +41,8 @@ Example:
.. code-block:: console
nodetool removenode --ignore-dead-nodes 192.168.1.4,192.168.1.5 675ed9f4-6564-6dbd-can8-43fddce952gy
nodetool removenode --ignore-dead-nodes 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c,125ed9f4-7777-1dbn-mac8-43fddce9123e 675ed9f4-6564-6dbd-can8-43fddce952gy
nodetool removenode 192.168.1.3
nodetool removenode --ignore-dead-nodes 192.168.1.4,192.168.1.5 192.168.1.3
.. versionadded:: version 4.6 ``--ignore-dead-nodes`` option

View File

@@ -1,5 +1,5 @@
====================================
Upgrade Guide - ScyllaDB 5.0 to 5.1
Upgrade Guide - ScyllaDB 4.6 to 5.0
====================================
.. toctree::

View File

@@ -1885,8 +1885,6 @@ future<> gossiper::do_shadow_round(std::unordered_set<gms::inet_address> nodes)
gms::application_state::STATUS,
gms::application_state::HOST_ID,
gms::application_state::TOKENS,
gms::application_state::DC,
gms::application_state::RACK,
gms::application_state::SUPPORTED_FEATURES,
gms::application_state::SNITCH_NAME}};
logger.info("Gossip shadow round started with nodes={}", nodes);

View File

@@ -120,7 +120,7 @@ struct node_ops_cmd_response {
};
struct repair_update_system_table_request {
tasks::task_id repair_uuid;
utils::UUID repair_uuid;
table_id table_uuid;
sstring keyspace_name;
sstring table_name;
@@ -132,7 +132,7 @@ struct repair_update_system_table_response {
};
struct repair_flush_hints_batchlog_request {
tasks::task_id repair_uuid;
utils::UUID repair_uuid;
std::list<gms::inet_address> target_nodes;
std::chrono::seconds hints_timeout;
std::chrono::seconds batchlog_timeout;

View File

@@ -10,7 +10,6 @@
#include "schema_fwd.hh"
#include "query-request.hh"
#include "locator/host_id.hh"
#include "tasks/types.hh"
namespace utils {
class UUID final {
@@ -19,10 +18,6 @@ class UUID final {
};
}
class tasks::task_id final {
utils::UUID uuid();
};
class table_id final {
utils::UUID uuid();
};

View File

@@ -308,11 +308,11 @@ elif [ "$ID" = "fedora" ]; then
exit 1
fi
dnf install -y "${fedora_packages[@]}" "${fedora_python3_packages[@]}"
PIP_DEFAULT_ARGS="--only-binary=:all: -v"
pip3 install "$PIP_DEFAULT_ARGS" "geomet<0.3,>=0.1"
pip3 install "$PIP_DEFAULT_ARGS" scylla-driver
pip3 install "$PIP_DEFAULT_ARGS" traceback-with-variables
pip3 install "$PIP_DEFAULT_ARGS" scylla-api-client
pip3 install "geomet<0.3,>=0.1"
# Disable C extensions
pip3 install scylla-driver --install-option="--no-murmur3" --install-option="--no-libev" --install-option="--no-cython"
pip3 install traceback-with-variables
pip3 install scylla-api-client
cargo install cxxbridge-cmd --root /usr/local
if [ -f "$(node_exporter_fullpath)" ] && node_exporter_checksum; then

View File

@@ -1087,12 +1087,6 @@ token_metadata::get_endpoint_for_host_id(host_id host_id) const {
return _impl->get_endpoint_for_host_id(host_id);
}
host_id_or_endpoint token_metadata::parse_host_id_and_endpoint(const sstring& host_id_string) const {
auto res = host_id_or_endpoint(host_id_string);
res.resolve(*this);
return res;
}
const std::unordered_map<inet_address, host_id>&
token_metadata::get_endpoint_to_host_id_map_for_reading() const {
return _impl->get_endpoint_to_host_id_map_for_reading();
@@ -1438,49 +1432,4 @@ future<> shared_token_metadata::mutate_token_metadata(seastar::noncopyable_funct
set(make_token_metadata_ptr(std::move(tm)));
}
host_id_or_endpoint::host_id_or_endpoint(const sstring& s, param_type restrict) {
switch (restrict) {
case param_type::host_id:
try {
id = host_id(utils::UUID(s));
} catch (const marshal_exception& e) {
throw std::invalid_argument(format("Invalid host_id {}: {}", s, e.what()));
}
break;
case param_type::endpoint:
try {
endpoint = gms::inet_address(s);
} catch (std::invalid_argument& e) {
throw std::invalid_argument(format("Invalid inet_address {}: {}", s, e.what()));
}
break;
case param_type::auto_detect:
try {
id = host_id(utils::UUID(s));
} catch (const marshal_exception& e) {
try {
endpoint = gms::inet_address(s);
} catch (std::invalid_argument& e) {
throw std::invalid_argument(format("Invalid host_id or inet_address {}", s));
}
}
}
}
void host_id_or_endpoint::resolve(const token_metadata& tm) {
if (id) {
auto endpoint_opt = tm.get_endpoint_for_host_id(id);
if (!endpoint_opt) {
throw std::runtime_error(format("Host ID {} not found in the cluster", id));
}
endpoint = *endpoint_opt;
} else {
auto opt_id = tm.get_host_id_if_known(endpoint);
if (!opt_id) {
throw std::runtime_error(format("Host inet address {} not found in the cluster", endpoint));
}
id = *opt_id;
}
}
} // namespace locator

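The removed constructor's `auto_detect` branch tries the string as a host UUID first and falls back to an IP address. A self-contained rendering of that idiom, using a regex for the UUID shape and `inet_pton` for the address instead of Scylla's types:

```cpp
#include <arpa/inet.h>
#include <regex>
#include <stdexcept>
#include <string>
#include <variant>

struct host_id { std::string value; };  // stands in for locator::host_id
struct endpoint { std::string value; }; // stands in for gms::inet_address

// Auto-detect parse, like the removed host_id_or_endpoint constructor:
// try the string as a host UUID first, then fall back to an IP address.
std::variant<host_id, endpoint> parse_host_id_or_endpoint(const std::string& s) {
    static const std::regex uuid_re(
        "[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-"
        "[0-9a-fA-F]{4}-[0-9a-fA-F]{12}");
    if (std::regex_match(s, uuid_re)) {
        return host_id{s};
    }
    in_addr addr{};
    if (inet_pton(AF_INET, s.c_str(), &addr) == 1) {
        return endpoint{s};
    }
    throw std::invalid_argument("Invalid host_id or inet_address " + s);
}
```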
View File

@@ -133,33 +133,6 @@ private:
bool _sort_by_proximity = true;
};
class token_metadata;
struct host_id_or_endpoint {
host_id id;
gms::inet_address endpoint;
enum class param_type {
host_id,
endpoint,
auto_detect
};
host_id_or_endpoint(const sstring& s, param_type restrict = param_type::auto_detect);
bool has_host_id() const noexcept {
return bool(id);
}
bool has_endpoint() const noexcept {
return endpoint != gms::inet_address();
}
// Map the host_id to endpoint based on whichever of them is set,
// using the token_metadata
void resolve(const token_metadata& tm);
};
using dc_rack_fn = seastar::noncopyable_function<endpoint_dc_rack(inet_address)>;
class token_metadata_impl;
@@ -247,10 +220,6 @@ public:
/** Return the end-point for a unique host ID */
std::optional<inet_address> get_endpoint_for_host_id(locator::host_id host_id) const;
/// Parses the \c host_id_string either as a host uuid or as an ip address and returns the mapping.
/// Throws std::invalid_argument on parse error or std::runtime_error if the host_id wasn't found.
host_id_or_endpoint parse_host_id_and_endpoint(const sstring& host_id_string) const;
/** @return a copy of the endpoint-to-id map for read-only operations */
const std::unordered_map<inet_address, host_id>& get_endpoint_to_host_id_map_for_reading() const;

View File

@@ -1320,7 +1320,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
// both)
supervisor::notify("starting repair service");
auto max_memory_repair = memory::stats().total_memory() * 0.1;
repair.start(std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(bm), std::ref(sys_dist_ks), std::ref(sys_ks), std::ref(view_update_generator), std::ref(task_manager), std::ref(mm), max_memory_repair).get();
repair.start(std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(bm), std::ref(sys_dist_ks), std::ref(sys_ks), std::ref(view_update_generator), std::ref(mm), max_memory_repair).get();
auto stop_repair_service = defer_verbose_shutdown("repair service", [&repair] {
repair.stop().get();
});

View File

@@ -126,7 +126,7 @@ flat_mutation_reader_v2 make_forwardable(flat_mutation_reader_v2 m) {
}
void maybe_emit_end_tombstone() {
if (_active_tombstone) {
push_mutation_fragment(*_schema, _permit, range_tombstone_change(position_in_partition_view::before_key(_current.end()), {}));
push_mutation_fragment(*_schema, _permit, range_tombstone_change(position_in_partition_view::after_key(_current.end()), {}));
}
}
public:

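The `before_key`/`after_key` toggle above decides where the synthetic closing `range_tombstone_change` sits relative to `_current.end()`: clustering positions order as a (key, weight) pair with before-key < at-key < after-key, so the choice controls whether a fragment exactly at the end bound falls inside the emitted tombstone. A simplified model of that ordering, not Scylla's real `position_in_partition`:

```cpp
// Simplified model of a clustering position: a key plus a weight, where
// weight -1 = just before the key, 0 = at the key, +1 = just after it.
struct position {
    int key;
    int weight;
    auto operator<=>(const position&) const = default;
};

position before_key(int k) { return {k, -1}; }
position after_key(int k) { return {k, +1}; }

// A row at key 10 sorts between before_key(10) and after_key(10), so a
// tombstone closed at before_key(10) excludes it, while one closed at
// after_key(10) covers it.
static_assert(position{10, -1} < position{10, 0});
static_assert(position{10, 0} < position{10, +1});
```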
View File

@@ -49,12 +49,6 @@
logging::logger rlogger("repair");
node_ops_info::node_ops_info(utils::UUID ops_uuid_, shared_ptr<abort_source> as_, std::list<gms::inet_address>&& ignore_nodes_) noexcept
: ops_uuid(ops_uuid_)
, as(std::move(as_))
, ignore_nodes(std::move(ignore_nodes_))
{}
void node_ops_info::check_abort() {
if (as && as->abort_requested()) {
auto msg = format("Node operation with ops_uuid={} is aborted", ops_uuid);
@@ -63,30 +57,8 @@ void node_ops_info::check_abort() {
}
}
future<> node_ops_info::start() {
if (as) {
co_await _sas.start();
_abort_subscription = as->subscribe([this] () noexcept {
_abort_done = _sas.invoke_on_all([] (abort_source& as) noexcept {
as.request_abort();
});
});
}
}
future<> node_ops_info::stop() noexcept {
if (as) {
co_await std::exchange(_abort_done, make_ready_future<>());
co_await _sas.stop();
}
}
abort_source* node_ops_info::local_abort_source() {
return as ? &_sas.local() : nullptr;
}
node_ops_metrics::node_ops_metrics(shared_ptr<repair_module> module)
: _module(module)
node_ops_metrics::node_ops_metrics(tracker& tracker)
: _tracker(tracker)
{
namespace sm = seastar::metrics;
auto ops_label_type = sm::label("ops");
@@ -167,7 +139,7 @@ static std::vector<sstring> list_column_families(const replica::database& db, co
}
std::ostream& operator<<(std::ostream& os, const repair_uniq_id& x) {
return os << format("[id={}, uuid={}]", x.id, x.uuid());
return os << format("[id={}, uuid={}]", x.id, x.uuid);
}
// Must run inside a seastar thread
@@ -342,12 +314,11 @@ static future<std::list<gms::inet_address>> get_hosts_participating_in_repair(re
}
float node_ops_metrics::repair_finished_percentage() {
return _module->report_progress(streaming::stream_reason::repair);
return _tracker.report_progress(streaming::stream_reason::repair);
}
repair_module::repair_module(tasks::task_manager& tm, repair_service& rs, size_t max_repair_memory) noexcept
: tasks::task_manager::module(tm, "repair")
, _rs(rs)
tracker::tracker(size_t max_repair_memory)
: _shutdown(false)
, _range_parallelism_semaphore(std::max(size_t(1), size_t(max_repair_memory / max_repair_memory_per_range() / 4)),
named_semaphore_exception_factory{"repair range parallelism"})
{
@@ -356,14 +327,14 @@ repair_module::repair_module(tasks::task_manager& tm, repair_service& rs, size_t
max_repair_memory, max_repair_memory_per_range(), nr);
}
void repair_module::start(repair_uniq_id id) {
_pending_repairs.insert(id.uuid());
void tracker::start(repair_uniq_id id) {
_pending_repairs.insert(id.uuid);
_status[id.id] = repair_status::RUNNING;
}
void repair_module::done(repair_uniq_id id, bool succeeded) {
_pending_repairs.erase(id.uuid());
_aborted_pending_repairs.erase(id.uuid());
void tracker::done(repair_uniq_id id, bool succeeded) {
_pending_repairs.erase(id.uuid);
_aborted_pending_repairs.erase(id.uuid);
if (succeeded) {
_status.erase(id.id);
} else {
@@ -371,9 +342,8 @@ void repair_module::done(repair_uniq_id id, bool succeeded) {
}
_done_cond.broadcast();
}
repair_status repair_module::get(int id) const {
if (id > _sequence_number) {
repair_status tracker::get(int id) const {
if (id >= _next_repair_command) {
throw std::runtime_error(format("unknown repair id {}", id));
}
auto it = _status.find(id);
@@ -384,9 +354,9 @@ repair_status repair_module::get(int id) const {
}
}
future<repair_status> repair_module::repair_await_completion(int id, std::chrono::steady_clock::time_point timeout) {
return seastar::with_gate(async_gate(), [this, id, timeout] {
if (id > _sequence_number) {
future<repair_status> tracker::repair_await_completion(int id, std::chrono::steady_clock::time_point timeout) {
return seastar::with_gate(_gate, [this, id, timeout] {
if (id >= _next_repair_command) {
return make_exception_future<repair_status>(std::runtime_error(format("unknown repair id {}", id)));
}
return repeat_until_value([this, id, timeout] {
@@ -408,19 +378,30 @@ future<repair_status> repair_module::repair_await_completion(int id, std::chrono
});
}
void repair_module::check_in_shutdown() {
abort_source().check();
repair_uniq_id tracker::next_repair_command() {
return repair_uniq_id{_next_repair_command++, utils::make_random_uuid()};
}
void repair_module::add_repair_info(int id, lw_shared_ptr<repair_info> ri) {
future<> tracker::shutdown() {
_shutdown.store(true, std::memory_order_relaxed);
return _gate.close();
}
void tracker::check_in_shutdown() {
if (_shutdown.load(std::memory_order_relaxed)) {
throw std::runtime_error(format("Repair service is being shutdown"));
}
}
void tracker::add_repair_info(int id, lw_shared_ptr<repair_info> ri) {
_repairs.emplace(id, ri);
}
void repair_module::remove_repair_info(int id) {
void tracker::remove_repair_info(int id) {
_repairs.erase(id);
}
lw_shared_ptr<repair_info> repair_module::get_repair_info(int id) {
lw_shared_ptr<repair_info> tracker::get_repair_info(int id) {
auto it = _repairs.find(id);
if (it != _repairs.end()) {
return it->second;
@@ -428,7 +409,7 @@ lw_shared_ptr<repair_info> repair_module::get_repair_info(int id) {
return {};
}
std::vector<int> repair_module::get_active() const {
std::vector<int> tracker::get_active() const {
std::vector<int> res;
boost::push_back(res, _status | boost::adaptors::filtered([] (auto& x) {
return x.second == repair_status::RUNNING;
@@ -436,7 +417,7 @@ std::vector<int> repair_module::get_active() const {
return res;
}
size_t repair_module::nr_running_repair_jobs() {
size_t tracker::nr_running_repair_jobs() {
size_t count = 0;
if (this_shard_id() != 0) {
return count;
@@ -450,11 +431,11 @@ size_t repair_module::nr_running_repair_jobs() {
return count;
}
bool repair_module::is_aborted(const tasks::task_id& uuid) {
bool tracker::is_aborted(const utils::UUID& uuid) {
return _aborted_pending_repairs.contains(uuid);
}
void repair_module::abort_all_repairs() {
void tracker::abort_all_repairs() {
_aborted_pending_repairs = _pending_repairs;
for (auto& x : _repairs) {
auto& ri = x.second;
@@ -463,7 +444,7 @@ void repair_module::abort_all_repairs() {
rlogger.info0("Aborted {} repair job(s), aborted={}", _aborted_pending_repairs.size(), _aborted_pending_repairs);
}
float repair_module::report_progress(streaming::stream_reason reason) {
float tracker::report_progress(streaming::stream_reason reason) {
uint64_t nr_ranges_finished = 0;
uint64_t nr_ranges_total = 0;
for (auto& x : _repairs) {
@@ -476,15 +457,15 @@ float repair_module::report_progress(streaming::stream_reason reason) {
return nr_ranges_total == 0 ? 1 : float(nr_ranges_finished) / float(nr_ranges_total);
}
named_semaphore& repair_module::range_parallelism_semaphore() {
named_semaphore& tracker::range_parallelism_semaphore() {
return _range_parallelism_semaphore;
}
future<> repair_module::run(repair_uniq_id id, std::function<void ()> func) {
return seastar::with_gate(async_gate(), [this, id, func =std::move(func)] {
future<> tracker::run(repair_uniq_id id, std::function<void ()> func) {
return seastar::with_gate(_gate, [this, id, func =std::move(func)] {
start(id);
return seastar::async([func = std::move(func)] { func(); }).then([this, id] {
rlogger.info("repair[{}]: completed successfully", id.uuid());
rlogger.info("repair[{}]: completed successfully", id.uuid);
done(id, true);
}).handle_exception([this, id] (std::exception_ptr ep) {
done(id, false);
@@ -494,7 +475,7 @@ future<> repair_module::run(repair_uniq_id id, std::function<void ()> func) {
}
void repair_info::check_in_shutdown() {
rs.get_repair_module().check_in_shutdown();
rs.repair_tracker().check_in_shutdown();
}
future<uint64_t> estimate_partitions(seastar::sharded<replica::database>& db, const sstring& keyspace,
@@ -551,7 +532,7 @@ repair_info::repair_info(repair_service& repair,
const std::vector<sstring>& hosts_,
const std::unordered_set<gms::inet_address>& ignore_nodes_,
streaming::stream_reason reason_,
abort_source* as,
shared_ptr<abort_source> as,
bool hints_batchlog_flushed)
: rs(repair)
, db(repair.get_db())
@@ -566,6 +547,7 @@ repair_info::repair_info(repair_service& repair,
, cfs(get_table_names(db.local(), table_ids_))
, table_ids(std::move(table_ids_))
, id(id_)
, shard(this_shard_id())
, data_centers(data_centers_)
, hosts(hosts_)
, ignore_nodes(ignore_nodes_)
@@ -580,15 +562,15 @@ repair_info::repair_info(repair_service& repair,
void repair_info::check_failed_ranges() {
rlogger.info("repair[{}]: shard {} stats: repair_reason={}, keyspace={}, tables={}, ranges_nr={}, {}",
id.uuid(), id.shard(), reason, keyspace, table_names(), ranges.size(), _stats.get_stats());
id.uuid, shard, reason, keyspace, table_names(), ranges.size(), _stats.get_stats());
if (nr_failed_ranges) {
rlogger.warn("repair[{}]: shard {} failed - {} out of {} ranges failed", id.uuid(), id.shard(), nr_failed_ranges, ranges_size());
throw std::runtime_error(format("repair[{}] on shard {} failed to repair {} out of {} ranges", id.uuid(), id.shard(), nr_failed_ranges, ranges_size()));
rlogger.warn("repair[{}]: shard {} failed - {} out of {} ranges failed", id.uuid, shard, nr_failed_ranges, ranges_size());
throw std::runtime_error(format("repair[{}] on shard {} failed to repair {} out of {} ranges", id.uuid, shard, nr_failed_ranges, ranges_size()));
} else {
if (dropped_tables.size()) {
rlogger.warn("repair[{}]: shard {} completed successfully, keyspace={}, ignoring dropped tables={}", id.uuid(), id.shard(), keyspace, dropped_tables);
rlogger.warn("repair[{}]: shard {} completed successfully, keyspace={}, ignoring dropped tables={}", id.uuid, shard, keyspace, dropped_tables);
} else {
rlogger.info("repair[{}]: shard {} completed successfully, keyspace={}", id.uuid(), id.shard(), keyspace);
rlogger.info("repair[{}]: shard {} completed successfully, keyspace={}", id.uuid, shard, keyspace);
}
}
}
@@ -599,7 +581,7 @@ void repair_info::abort() noexcept {
void repair_info::check_in_abort() {
if (aborted) {
throw std::runtime_error(format("repair[{}]: aborted on shard {}", id.uuid(), id.shard()));
throw std::runtime_error(format("repair[{}]: aborted on shard {}", id.uuid, shard));
}
}
@@ -629,7 +611,7 @@ future<> repair_info::repair_range(const dht::token_range& range, ::table_id tab
nr_failed_ranges++;
auto status = format("failed: mandatory neighbor={} is not alive", node);
rlogger.error("repair[{}]: Repair {} out of {} ranges, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}",
id.uuid(), ranges_index, ranges_size(), id.shard(), keyspace, table_names(), range, neighbors, live_neighbors, status);
id.uuid, ranges_index, ranges_size(), shard, keyspace, table_names(), range, neighbors, live_neighbors, status);
abort();
return make_exception_future<>(std::runtime_error(format("Repair mandatory neighbor={} is not alive, keyspace={}, mandatory_neighbors={}",
node, keyspace, mandatory_neighbors)));
@@ -639,7 +621,7 @@ future<> repair_info::repair_range(const dht::token_range& range, ::table_id tab
nr_failed_ranges++;
auto status = live_neighbors.empty() ? "skipped" : "partial";
rlogger.warn("repair[{}]: Repair {} out of {} ranges, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}",
id.uuid(), ranges_index, ranges_size(), id.shard(), keyspace, table_names(), range, neighbors, live_neighbors, status);
id.uuid, ranges_index, ranges_size(), shard, keyspace, table_names(), range, neighbors, live_neighbors, status);
if (live_neighbors.empty()) {
return make_ready_future<>();
}
@@ -648,11 +630,11 @@ future<> repair_info::repair_range(const dht::token_range& range, ::table_id tab
if (neighbors.empty()) {
auto status = "skipped_no_followers";
rlogger.warn("repair[{}]: Repair {} out of {} ranges, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}, status={}",
id.uuid(), ranges_index, ranges_size(), id.shard(), keyspace, table_names(), range, neighbors, live_neighbors, status);
id.uuid, ranges_index, ranges_size(), shard, keyspace, table_names(), range, neighbors, live_neighbors, status);
return make_ready_future<>();
}
rlogger.debug("repair[{}]: Repair {} out of {} ranges, shard={}, keyspace={}, table={}, range={}, peers={}, live_peers={}",
id.uuid(), ranges_index, ranges_size(), id.shard(), keyspace, table_names(), range, neighbors, live_neighbors);
id.uuid, ranges_index, ranges_size(), shard, keyspace, table_names(), range, neighbors, live_neighbors);
return mm.sync_schema(db.local(), neighbors).then([this, &neighbors, range, table_id] {
sstring cf;
try {
@@ -945,9 +927,9 @@ static future<> do_repair_ranges(lw_shared_ptr<repair_info> ri) {
auto table_name = ri->table_names()[idx];
// repair all the ranges in limited parallelism
rlogger.info("repair[{}]: Started to repair {} out of {} tables in keyspace={}, table={}, table_id={}, repair_reason={}",
ri->id.uuid(), idx + 1, ri->table_ids.size(), ri->keyspace, table_name, table_id, ri->reason);
ri->id.uuid, idx + 1, ri->table_ids.size(), ri->keyspace, table_name, table_id, ri->reason);
co_await coroutine::parallel_for_each(ri->ranges, [ri, table_id] (auto&& range) {
return with_semaphore(ri->rs.get_repair_module().range_parallelism_semaphore(), 1, [ri, &range, table_id] {
return with_semaphore(ri->rs.repair_tracker().range_parallelism_semaphore(), 1, [ri, &range, table_id] {
return ri->repair_range(range, table_id).then([ri] {
if (ri->reason == streaming::stream_reason::bootstrap) {
ri->rs.get_metrics().bootstrap_finished_ranges++;
@@ -964,7 +946,7 @@ static future<> do_repair_ranges(lw_shared_ptr<repair_info> ri) {
ri->nr_ranges_finished++;
}
rlogger.debug("repair[{}]: node ops progress bootstrap={}, replace={}, rebuild={}, decommission={}, removenode={}, repair={}",
ri->id.uuid(),
ri->id.uuid,
ri->rs.get_metrics().bootstrap_finished_percentage(),
ri->rs.get_metrics().replace_finished_percentage(),
ri->rs.get_metrics().rebuild_finished_percentage(),
@@ -979,7 +961,7 @@ static future<> do_repair_ranges(lw_shared_ptr<repair_info> ri) {
try {
auto& table = ri->db.local().find_column_family(table_id);
rlogger.debug("repair[{}]: Trigger off-strategy compaction for keyspace={}, table={}",
ri->id.uuid(), table.schema()->ks_name(), table.schema()->cf_name());
ri->id.uuid, table.schema()->ks_name(), table.schema()->cf_name());
table.trigger_offstrategy_compaction();
} catch (replica::no_such_column_family&) {
// Ignore dropped table
@@ -994,13 +976,13 @@ static future<> do_repair_ranges(lw_shared_ptr<repair_info> ri) {
// is assumed to be a indivisible in the sense that all the tokens in has the
// same nodes as replicas.
static future<> repair_ranges(lw_shared_ptr<repair_info> ri) {
ri->rs.get_repair_module().add_repair_info(ri->id.id, ri);
ri->rs.repair_tracker().add_repair_info(ri->id.id, ri);
return do_repair_ranges(ri).then([ri] {
ri->check_failed_ranges();
ri->rs.get_repair_module().remove_repair_info(ri->id.id);
ri->rs.repair_tracker().remove_repair_info(ri->id.id);
return make_ready_future<>();
}).handle_exception([ri] (std::exception_ptr eptr) {
ri->rs.get_repair_module().remove_repair_info(ri->id.id);
ri->rs.repair_tracker().remove_repair_info(ri->id.id);
return make_exception_future<>(std::move(eptr));
});
}
@@ -1013,17 +995,16 @@ static future<> repair_ranges(lw_shared_ptr<repair_info> ri) {
int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring, sstring> options_map) {
seastar::sharded<replica::database>& db = get_db();
auto& topology = db.local().get_token_metadata().get_topology();
get_repair_module().check_in_shutdown();
repair_tracker().check_in_shutdown();
repair_options options(options_map);
// Note: Cassandra can, in some cases, decide immediately that there is
// nothing to repair, and return 0. "nodetool repair" prints in this case
// that "Nothing to repair for keyspace '...'". We don't have such a case
// yet. The id field of repair_uniq_ids returned by next_repair_command()
// will be >= 1.
auto id = _repair_module->new_repair_uniq_id();
rlogger.info("repair[{}]: starting user-requested repair for keyspace {}, repair id {}, options {}", id.uuid(), keyspace, id.id, options_map);
// yet. Real ids returned by next_repair_command() will be >= 1.
auto id = repair_tracker().next_repair_command();
rlogger.info("repair[{}]: starting user-requested repair for keyspace {}, repair id {}, options {}", id.uuid, keyspace, id.id, options_map);
if (!_gossiper.local().is_normal(utils::fb_utilities::get_broadcast_address())) {
throw std::runtime_error("Node is not in NORMAL status yet!");
@@ -1105,14 +1086,14 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
std::vector<sstring> cfs =
options.column_families.size() ? options.column_families : list_column_families(db.local(), keyspace);
if (cfs.empty()) {
rlogger.info("repair[{}]: completed successfully: no tables to repair", id.uuid());
rlogger.info("repair[{}]: completed successfully: no tables to repair", id.uuid);
return id.id;
}
// Do it in the background.
(void)get_repair_module().run(id, [this, &db, id, keyspace = std::move(keyspace),
(void)repair_tracker().run(id, [this, &db, id, keyspace = std::move(keyspace),
cfs = std::move(cfs), ranges = std::move(ranges), options = std::move(options), ignore_nodes = std::move(ignore_nodes)] () mutable {
auto uuid = id.uuid();
auto uuid = id.uuid;
bool needs_flush_before_repair = false;
if (db.local().features().tombstone_gc_options) {
@@ -1134,7 +1115,7 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
});
auto hints_timeout = std::chrono::seconds(300);
auto batchlog_timeout = std::chrono::seconds(300);
repair_flush_hints_batchlog_request req{id.uuid(), participants, hints_timeout, batchlog_timeout};
repair_flush_hints_batchlog_request req{id.uuid, participants, hints_timeout, batchlog_timeout};
try {
parallel_for_each(waiting_nodes, [this, uuid, &req, &participants] (gms::inet_address node) -> future<> {
@@ -1163,7 +1144,7 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
repair_results.reserve(smp::count);
auto table_ids = get_table_ids(db.local(), keyspace, cfs);
abort_source as;
auto off_strategy_updater = seastar::async([this, uuid = uuid.uuid(), &table_ids, &participants, &as] {
auto off_strategy_updater = seastar::async([this, uuid, &table_ids, &participants, &as] {
auto tables = std::list<table_id>(table_ids.begin(), table_ids.end());
auto req = node_ops_cmd_request(node_ops_cmd::repair_updater, uuid, {}, {}, {}, {}, std::move(tables));
auto update_interval = std::chrono::seconds(30);
@@ -1200,7 +1181,7 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
}
});
if (get_repair_module().is_aborted(id.uuid())) {
if (repair_tracker().is_aborted(id.uuid)) {
throw std::runtime_error("aborted by user request");
}
@@ -1230,7 +1211,7 @@ int repair_service::do_repair_start(sstring keyspace, std::unordered_map<sstring
return make_ready_future<>();
}).get();
}).handle_exception([id] (std::exception_ptr ep) {
rlogger.warn("repair[{}]: repair_tracker run failed: {}", id.uuid(), ep);
rlogger.warn("repair[{}]: repair_tracker run failed: {}", id.uuid, ep);
});
return id.id;
@@ -1245,29 +1226,30 @@ future<int> repair_start(seastar::sharded<repair_service>& repair,
future<std::vector<int>> repair_service::get_active_repairs() {
return container().invoke_on(0, [] (repair_service& rs) {
return rs.get_repair_module().get_active();
return rs.repair_tracker().get_active();
});
}
future<repair_status> repair_service::get_status(int id) {
return container().invoke_on(0, [id] (repair_service& rs) {
return rs.get_repair_module().get(id);
return rs.repair_tracker().get(id);
});
}
future<repair_status> repair_service::await_completion(int id, std::chrono::steady_clock::time_point timeout) {
return container().invoke_on(0, [id, timeout] (repair_service& rs) {
return rs.get_repair_module().repair_await_completion(id, timeout);
return rs.repair_tracker().repair_await_completion(id, timeout);
});
}
future<> repair_service::shutdown() {
co_await repair_tracker().shutdown();
co_await remove_repair_meta();
}
future<> repair_service::abort_all() {
return container().invoke_on_all([] (repair_service& rs) {
return rs.get_repair_module().abort_all_repairs();
return rs.repair_tracker().abort_all_repairs();
});
}
@@ -1293,18 +1275,18 @@ future<> repair_service::do_sync_data_using_repair(
shared_ptr<node_ops_info> ops_info) {
seastar::sharded<replica::database>& db = get_db();
repair_uniq_id id = get_repair_module().new_repair_uniq_id();
rlogger.info("repair[{}]: sync data for keyspace={}, status=started", id.uuid(), keyspace);
return get_repair_module().run(id, [this, id, &db, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_info] () mutable {
repair_uniq_id id = repair_tracker().next_repair_command();
rlogger.info("repair[{}]: sync data for keyspace={}, status=started", id.uuid, keyspace);
return repair_tracker().run(id, [this, id, &db, keyspace, ranges = std::move(ranges), neighbors = std::move(neighbors), reason, ops_info] () mutable {
auto cfs = list_column_families(db.local(), keyspace);
if (cfs.empty()) {
rlogger.warn("repair[{}]: sync data for keyspace={}, no table in this keyspace", id.uuid(), keyspace);
rlogger.warn("repair[{}]: sync data for keyspace={}, no table in this keyspace", id.uuid, keyspace);
return;
}
auto table_ids = get_table_ids(db.local(), keyspace, cfs);
std::vector<future<>> repair_results;
repair_results.reserve(smp::count);
if (get_repair_module().is_aborted(id.uuid())) {
if (repair_tracker().is_aborted(id.uuid)) {
throw std::runtime_error("aborted by user request");
}
for (auto shard : boost::irange(unsigned(0), smp::count)) {
@@ -1313,10 +1295,9 @@ future<> repair_service::do_sync_data_using_repair(
auto hosts = std::vector<sstring>();
auto ignore_nodes = std::unordered_set<gms::inet_address>();
bool hints_batchlog_flushed = false;
abort_source* asp = ops_info ? ops_info->local_abort_source() : nullptr;
auto ri = make_lw_shared<repair_info>(local_repair,
std::move(keyspace), std::move(ranges), std::move(table_ids),
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, asp, hints_batchlog_flushed);
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, ops_info ? ops_info->as : nullptr, hints_batchlog_flushed);
ri->neighbors = std::move(neighbors);
return repair_ranges(ri);
});
@@ -1337,13 +1318,13 @@ future<> repair_service::do_sync_data_using_repair(
return make_ready_future<>();
}).get();
}).then([id, keyspace] {
rlogger.info("repair[{}]: sync data for keyspace={}, status=succeeded", id.uuid(), keyspace);
rlogger.info("repair[{}]: sync data for keyspace={}, status=succeeded", id.uuid, keyspace);
}).handle_exception([&db, id, keyspace] (std::exception_ptr ep) {
if (!db.local().has_keyspace(keyspace)) {
rlogger.warn("repair[{}]: sync data for keyspace={}, status=failed: keyspace does not exist any more, ignoring it, {}", id.uuid(), keyspace, ep);
rlogger.warn("repair[{}]: sync data for keyspace={}, status=failed: keyspace does not exist any more, ignoring it, {}", id.uuid, keyspace, ep);
return make_ready_future<>();
}
rlogger.warn("repair[{}]: sync data for keyspace={}, status=failed: {}", id.uuid(), keyspace, ep);
rlogger.warn("repair[{}]: sync data for keyspace={}, status=failed: {}", id.uuid, keyspace, ep);
return make_exception_future<>(ep);
});
}

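A recurring detail in the hunks above is the hand-rolled id allocator: `_next_repair_command` starts at 1 (0 is reserved to mean "nothing to repair"), and `get()`/`repair_await_completion()` reject any id that was never issued via the `id >= _next_repair_command` check. A minimal sketch of that bookkeeping, leaving out the UUID side:

```cpp
#include <stdexcept>
#include <unordered_map>

enum class repair_status { RUNNING, SUCCESSFUL, FAILED };

// Minimal sketch of the tracker's id bookkeeping: ids start at 1, and any
// id that was never handed out is rejected up front.
class repair_ids {
    int _next_repair_command = 1;
    std::unordered_map<int, repair_status> _status;
public:
    int next() { return _next_repair_command++; }

    repair_status get(int id) const {
        if (id >= _next_repair_command) {
            throw std::runtime_error("unknown repair id");
        }
        auto it = _status.find(id);
        // An issued id with no entry finished successfully: successful
        // repairs are erased from the map, failed ones stay (see
        // tracker::done in the hunk above).
        return it == _status.end() ? repair_status::SUCCESSFUL : it->second;
    }

    void set(int id, repair_status s) { _status[id] = s; }
};
```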
View File

@@ -28,11 +28,6 @@
#include "locator/token_metadata.hh"
#include "repair/hash.hh"
#include "repair/sync_boundary.hh"
#include "tasks/types.hh"
namespace tasks {
class repair_module;
}
namespace replica {
class database;
@@ -67,42 +62,16 @@ public:
struct repair_uniq_id {
// The integer ID used to identify a repair job. It is currently used by nodetool and http API.
int id;
// Task info containing a UUID to identifiy a repair job, and a shard of the job.
// We will transit to use UUID over the integer ID.
tasks::task_info task_info;
tasks::task_id uuid() const noexcept {
return task_info.id;
}
unsigned shard() const noexcept {
return task_info.shard;
}
// A UUID to identifiy a repair job. We will transit to use UUID over the integer ID.
utils::UUID uuid;
};
std::ostream& operator<<(std::ostream& os, const repair_uniq_id& x);
class node_ops_info {
public:
struct node_ops_info {
utils::UUID ops_uuid;
shared_ptr<abort_source> as;
std::list<gms::inet_address> ignore_nodes;
private:
optimized_optional<abort_source::subscription> _abort_subscription;
sharded<abort_source> _sas;
future<> _abort_done = make_ready_future<>();
public:
node_ops_info(utils::UUID ops_uuid_, shared_ptr<abort_source> as_, std::list<gms::inet_address>&& ignore_nodes_) noexcept;
node_ops_info(const node_ops_info&) = delete;
node_ops_info(node_ops_info&&) = delete;
future<> start();
future<> stop() noexcept;
void check_abort();
abort_source* local_abort_source();
};
// NOTE: repair_start() can be run on any node, but starts a node-global
@@ -184,6 +153,7 @@ public:
std::vector<sstring> cfs;
std::vector<table_id> table_ids;
repair_uniq_id id;
shard_id shard;
std::vector<sstring> data_centers;
std::vector<sstring> hosts;
std::unordered_set<gms::inet_address> ignore_nodes;
@@ -209,7 +179,7 @@ public:
const std::vector<sstring>& hosts_,
const std::unordered_set<gms::inet_address>& ingore_nodes_,
streaming::stream_reason reason_,
abort_source* as,
shared_ptr<abort_source> as,
bool hints_batchlog_flushed);
void check_failed_ranges();
void abort() noexcept;
@@ -232,6 +202,58 @@ public:
size_t ranges_size();
};
// The repair_tracker tracks ongoing repair operations and their progress.
// A repair which has already finished successfully is dropped from this
// table, but a failed repair will remain in the table forever so it can
// be queried about more than once (FIXME: reconsider this. But note that
// failed repairs should be rare anwyay).
// This object is not thread safe, and must be used by only one cpu.
class tracker {
private:
// Each repair_start() call returns a unique int which the user can later
// use to follow the status of this repair with repair_status().
// We can't use the number 0 - if repair_start() returns 0, it means it
// decide quickly that there is nothing to repair.
int _next_repair_command = 1;
// Note that there are no "SUCCESSFUL" entries in the "status" map:
// Successfully-finished repairs are those with id < _next_repair_command
// but aren't listed as running or failed the status map.
std::unordered_map<int, repair_status> _status;
// Used to allow shutting down repairs in progress, and waiting for them.
seastar::gate _gate;
// Set when the repair service is being shutdown
std::atomic_bool _shutdown alignas(seastar::cache_line_size);
// Map repair id into repair_info.
std::unordered_map<int, lw_shared_ptr<repair_info>> _repairs;
std::unordered_set<utils::UUID> _pending_repairs;
std::unordered_set<utils::UUID> _aborted_pending_repairs;
// The semaphore used to control the maximum
// ranges that can be repaired in parallel.
named_semaphore _range_parallelism_semaphore;
static constexpr size_t _max_repair_memory_per_range = 32 * 1024 * 1024;
seastar::condition_variable _done_cond;
void start(repair_uniq_id id);
void done(repair_uniq_id id, bool succeeded);
public:
explicit tracker(size_t max_repair_memory);
repair_status get(int id) const;
repair_uniq_id next_repair_command();
future<> shutdown();
void check_in_shutdown();
void add_repair_info(int id, lw_shared_ptr<repair_info> ri);
void remove_repair_info(int id);
lw_shared_ptr<repair_info> get_repair_info(int id);
std::vector<int> get_active() const;
size_t nr_running_repair_jobs();
void abort_all_repairs();
named_semaphore& range_parallelism_semaphore();
static size_t max_repair_memory_per_range() { return _max_repair_memory_per_range; }
future<> run(repair_uniq_id id, std::function<void ()> func);
future<repair_status> repair_await_completion(int id, std::chrono::steady_clock::time_point timeout);
float report_progress(streaming::stream_reason reason);
bool is_aborted(const utils::UUID& uuid);
};
future<uint64_t> estimate_partitions(seastar::sharded<replica::database>& db, const sstring& keyspace,
const sstring& cf, const dht::token_range& range);
@@ -391,7 +413,7 @@ struct node_ops_cmd_response {
struct repair_update_system_table_request {
tasks::task_id repair_uuid;
utils::UUID repair_uuid;
table_id table_uuid;
sstring keyspace_name;
sstring table_name;
@@ -403,7 +425,7 @@ struct repair_update_system_table_response {
};
struct repair_flush_hints_batchlog_request {
tasks::task_id repair_uuid;
utils::UUID repair_uuid;
std::list<gms::inet_address> target_nodes;
std::chrono::seconds hints_timeout;
std::chrono::seconds batchlog_timeout;

View File

@@ -1,82 +0,0 @@
/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "repair/repair.hh"
#include "tasks/task_manager.hh"
class repair_task_impl : public tasks::task_manager::task::impl {
public:
repair_task_impl(tasks::task_manager::module_ptr module, tasks::task_id id, unsigned sequence_number, std::string keyspace, std::string table, std::string type, std::string entity, tasks::task_id parent_id)
: tasks::task_manager::task::impl(module, id, sequence_number, std::move(keyspace), std::move(table), std::move(type), std::move(entity), parent_id) {
_status.progress_units = "ranges";
}
protected:
repair_uniq_id get_repair_uniq_id() const noexcept {
return repair_uniq_id{
.id = _status.sequence_number,
.task_info = tasks::task_info(_status.id, _status.shard)
};
}
virtual future<> run() override = 0;
};
// The repair_module tracks ongoing repair operations and their progress.
// A repair which has already finished successfully is dropped from this
// table, but a failed repair will remain in the table forever so it can
// be queried about more than once (FIXME: reconsider this. But note that
// failed repairs should be rare anwyay).
class repair_module : public tasks::task_manager::module {
private:
repair_service& _rs;
// Note that there are no "SUCCESSFUL" entries in the "status" map:
// Successfully-finished repairs are those with id <= repair_module::_sequence_number
// but aren't listed as running or failed the status map.
std::unordered_map<int, repair_status> _status;
// Map repair id into repair_info.
std::unordered_map<int, lw_shared_ptr<repair_info>> _repairs;
std::unordered_set<tasks::task_id> _pending_repairs;
std::unordered_set<tasks::task_id> _aborted_pending_repairs;
// The semaphore used to control the maximum
// ranges that can be repaired in parallel.
named_semaphore _range_parallelism_semaphore;
static constexpr size_t _max_repair_memory_per_range = 32 * 1024 * 1024;
seastar::condition_variable _done_cond;
void start(repair_uniq_id id);
void done(repair_uniq_id id, bool succeeded);
public:
repair_module(tasks::task_manager& tm, repair_service& rs, size_t max_repair_memory) noexcept;
repair_service& get_repair_service() noexcept {
return _rs;
}
repair_uniq_id new_repair_uniq_id() noexcept {
return repair_uniq_id{
.id = new_sequence_number(),
.task_info = tasks::task_info(tasks::task_id::create_random_id(), this_shard_id())
};
}
repair_status get(int id) const;
void check_in_shutdown();
void add_repair_info(int id, lw_shared_ptr<repair_info> ri);
void remove_repair_info(int id);
lw_shared_ptr<repair_info> get_repair_info(int id);
std::vector<int> get_active() const;
size_t nr_running_repair_jobs();
void abort_all_repairs();
named_semaphore& range_parallelism_semaphore();
static size_t max_repair_memory_per_range() { return _max_repair_memory_per_range; }
future<> run(repair_uniq_id id, std::function<void ()> func);
future<repair_status> repair_await_completion(int id, std::chrono::steady_clock::time_point timeout);
float report_progress(streaming::stream_reason reason);
bool is_aborted(const tasks::task_id& uuid);
};

View File

@@ -9,7 +9,6 @@
#include <seastar/util/defer.hh>
#include "repair/repair.hh"
#include "message/messaging_service.hh"
#include "repair/repair_task.hh"
#include "sstables/sstables.hh"
#include "sstables/sstables_manager.hh"
#include "mutation_fragment.hh"
@@ -2449,7 +2448,7 @@ private:
size_t get_max_row_buf_size(row_level_diff_detect_algorithm algo) {
// Max buffer size per repair round
return is_rpc_stream_supported(algo) ? repair_module::max_repair_memory_per_range() : 256 * 1024;
return is_rpc_stream_supported(algo) ? tracker::max_repair_memory_per_range() : 256 * 1024;
}
// Step A: Negotiate sync boundary to use
@@ -2685,7 +2684,7 @@ private:
size_t repaired_replicas = _all_live_peer_nodes.size() + 1;
if (_ri.total_rf != repaired_replicas){
rlogger.debug("repair[{}]: Skipped to update system.repair_history total_rf={}, repaired_replicas={}, local={}, peers={}",
_ri.id.uuid(), _ri.total_rf, repaired_replicas, utils::fb_utilities::get_broadcast_address(), _all_live_peer_nodes);
_ri.id.uuid, _ri.total_rf, repaired_replicas, utils::fb_utilities::get_broadcast_address(), _all_live_peer_nodes);
co_return;
}
// Update repair_history table only if both hints and batchlog have been flushed.
@@ -2693,12 +2692,12 @@ private:
co_return;
}
repair_service& rs = _ri.rs;
std::optional<gc_clock::time_point> repair_time_opt = co_await rs.update_history(_ri.id.uuid(), _table_id, _range, _start_time);
std::optional<gc_clock::time_point> repair_time_opt = co_await rs.update_history(_ri.id.uuid, _table_id, _range, _start_time);
if (!repair_time_opt) {
co_return;
}
auto repair_time = repair_time_opt.value();
repair_update_system_table_request req{_ri.id.uuid(), _table_id, _ri.keyspace, _cf_name, _range, repair_time};
repair_update_system_table_request req{_ri.id.uuid, _table_id, _ri.keyspace, _cf_name, _range, repair_time};
auto all_nodes = _all_live_peer_nodes;
all_nodes.push_back(utils::fb_utilities::get_broadcast_address());
co_await coroutine::parallel_for_each(all_nodes, [this, req] (gms::inet_address node) -> future<> {
@@ -2706,9 +2705,9 @@ private:
auto& ms = _ri.messaging.local();
repair_update_system_table_response resp = co_await ser::partition_checksum_rpc_verbs::send_repair_update_system_table(&ms, netw::messaging_service::msg_addr(node), req);
(void)resp; // nothing to do with the response yet
rlogger.debug("repair[{}]: Finished to update system.repair_history table of node {}", _ri.id.uuid(), node);
rlogger.debug("repair[{}]: Finished to update system.repair_history table of node {}", _ri.id.uuid, node);
} catch (...) {
rlogger.warn("repair[{}]: Failed to update system.repair_history table of node {}: {}", _ri.id.uuid(), node, std::current_exception());
rlogger.warn("repair[{}]: Failed to update system.repair_history table of node {}: {}", _ri.id.uuid, node, std::current_exception());
}
});
co_return;
@@ -2733,13 +2732,13 @@ public:
auto& mem_sem = _ri.rs.memory_sem();
auto max = _ri.rs.max_repair_memory();
auto wanted = (_all_live_peer_nodes.size() + 1) * repair_module::max_repair_memory_per_range();
auto wanted = (_all_live_peer_nodes.size() + 1) * tracker::max_repair_memory_per_range();
wanted = std::min(max, wanted);
rlogger.trace("repair[{}]: Started to get memory budget, wanted={}, available={}, max_repair_memory={}",
_ri.id.uuid(), wanted, mem_sem.current(), max);
_ri.id.uuid, wanted, mem_sem.current(), max);
auto mem_permit = seastar::get_units(mem_sem, wanted).get0();
rlogger.trace("repair[{}]: Finished to get memory budget, wanted={}, available={}, max_repair_memory={}",
_ri.id.uuid(), wanted, mem_sem.current(), max);
_ri.id.uuid, wanted, mem_sem.current(), max);
auto permit = _ri.db.local().obtain_reader_permit(_cf, "repair-meta", db::no_timeout).get0();
@@ -2811,11 +2810,11 @@ public:
} catch (replica::no_such_column_family& e) {
table_dropped = true;
rlogger.warn("repair[{}]: shard={}, keyspace={}, cf={}, range={}, got error in row level repair: {}",
_ri.id.uuid(), this_shard_id(), _ri.keyspace, _cf_name, _range, e);
_ri.id.uuid, this_shard_id(), _ri.keyspace, _cf_name, _range, e);
_failed = true;
} catch (std::exception& e) {
rlogger.warn("repair[{}]: shard={}, keyspace={}, cf={}, range={}, got error in row level repair: {}",
_ri.id.uuid(), this_shard_id(), _ri.keyspace, _cf_name, _range, e);
_ri.id.uuid, this_shard_id(), _ri.keyspace, _cf_name, _range, e);
// In case the repair process fails, we need to call repair_row_level_stop to clean up repair followers
_failed = true;
}
@@ -2918,7 +2917,6 @@ repair_service::repair_service(distributed<gms::gossiper>& gossiper,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::system_keyspace>& sys_ks,
sharded<db::view::view_update_generator>& vug,
tasks::task_manager& tm,
service::migration_manager& mm,
size_t max_repair_memory)
: _gossiper(gossiper)
@@ -2929,13 +2927,12 @@ repair_service::repair_service(distributed<gms::gossiper>& gossiper,
, _sys_dist_ks(sys_dist_ks)
, _sys_ks(sys_ks)
, _view_update_generator(vug)
, _repair_module(seastar::make_shared<repair_module>(tm, *this, max_repair_memory))
, _mm(mm)
, _node_ops_metrics(_repair_module)
, _tracker(max_repair_memory)
, _node_ops_metrics(_tracker)
, _max_repair_memory(max_repair_memory)
, _memory_sem(max_repair_memory)
{
tm.register_module("repair", _repair_module);
if (this_shard_id() == 0) {
_gossip_helper = make_shared<row_level_repair_gossip_helper>(*this);
_gossiper.local().register_(_gossip_helper);
@@ -2948,7 +2945,6 @@ future<> repair_service::start() {
}
future<> repair_service::stop() {
co_await _repair_module->stop();
co_await uninit_ms_handlers();
if (this_shard_id() == 0) {
co_await _gossiper.local().unregister_(_gossip_helper);
@@ -2960,12 +2956,12 @@ repair_service::~repair_service() {
assert(_stopped);
}
static shard_id repair_id_to_shard(tasks::task_id& repair_id) {
return shard_id(repair_id.uuid().get_most_significant_bits()) % smp::count;
static shard_id repair_id_to_shard(utils::UUID& repair_id) {
return shard_id(repair_id.get_most_significant_bits()) % smp::count;
}
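
Both sides of this hunk compute the owner shard the same way: the repair id's most significant 64 bits, modulo the shard count. A self-contained sketch of the mapping (the msb value and shard count are arbitrary examples):

    #include <cstdint>
    #include <cstdio>

    unsigned repair_id_to_shard(uint64_t uuid_msb, unsigned smp_count) {
        // The same id always maps to the same shard, so history updates
        // for one repair are serialized on a single owner shard.
        return static_cast<unsigned>(uuid_msb % smp_count);
    }

    int main() {
        std::printf("shard=%u\n", repair_id_to_shard(0x1234abcd5678ULL, 8));
    }
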
future<std::optional<gc_clock::time_point>>
repair_service::update_history(tasks::task_id repair_id, table_id table_id, dht::token_range range, gc_clock::time_point repair_time) {
repair_service::update_history(utils::UUID repair_id, table_id table_id, dht::token_range range, gc_clock::time_point repair_time) {
auto shard = repair_id_to_shard(repair_id);
return container().invoke_on(shard, [repair_id, table_id, range, repair_time] (repair_service& rs) mutable -> future<std::optional<gc_clock::time_point>> {
repair_history& rh = rs._finished_ranges_history[repair_id];
@@ -2986,7 +2982,7 @@ repair_service::update_history(tasks::task_id repair_id, table_id table_id, dht:
});
}
future<> repair_service::cleanup_history(tasks::task_id repair_id) {
future<> repair_service::cleanup_history(utils::UUID repair_id) {
auto shard = repair_id_to_shard(repair_id);
return container().invoke_on(shard, [repair_id] (repair_service& rs) mutable {
rs._finished_ranges_history.erase(repair_id);

View File

@@ -11,8 +11,6 @@
#include <vector>
#include "gms/inet_address.hh"
#include "repair/repair.hh"
#include "repair/repair_task.hh"
#include "tasks/task_manager.hh"
#include <seastar/core/distributed.hh>
#include <seastar/util/bool_class.hh>
@@ -54,9 +52,9 @@ public:
};
class node_ops_metrics {
shared_ptr<repair_module> _module;
tracker& _tracker;
public:
node_ops_metrics(shared_ptr<repair_module> module);
node_ops_metrics(tracker& tracker);
uint64_t bootstrap_total_ranges{0};
uint64_t bootstrap_finished_ranges{0};
@@ -90,13 +88,13 @@ class repair_service : public seastar::peering_sharded_service<repair_service> {
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
sharded<db::system_keyspace>& _sys_ks;
sharded<db::view::view_update_generator>& _view_update_generator;
shared_ptr<repair_module> _repair_module;
service::migration_manager& _mm;
tracker _tracker;
node_ops_metrics _node_ops_metrics;
std::unordered_map<node_repair_meta_id, repair_meta_ptr> _repair_metas;
uint32_t _next_repair_meta_id = 0; // used only on shard 0
std::unordered_map<tasks::task_id, repair_history> _finished_ranges_history;
std::unordered_map<utils::UUID, repair_history> _finished_ranges_history;
shared_ptr<row_level_repair_gossip_helper> _gossip_helper;
bool _stopped = false;
@@ -116,7 +114,6 @@ public:
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::system_keyspace>& sys_ks,
sharded<db::view::view_update_generator>& vug,
tasks::task_manager& tm,
service::migration_manager& mm, size_t max_repair_memory);
~repair_service();
future<> start();
@@ -129,8 +126,8 @@ public:
// stop them abruptly).
future<> shutdown();
future<std::optional<gc_clock::time_point>> update_history(tasks::task_id repair_id, table_id table_id, dht::token_range range, gc_clock::time_point repair_time);
future<> cleanup_history(tasks::task_id repair_id);
future<std::optional<gc_clock::time_point>> update_history(utils::UUID repair_id, table_id table_id, dht::token_range range, gc_clock::time_point repair_time);
future<> cleanup_history(utils::UUID repair_id);
future<> load_history();
int do_repair_start(sstring keyspace, std::unordered_map<sstring, sstring> options_map);
@@ -174,8 +171,11 @@ public:
gms::gossiper& get_gossiper() noexcept { return _gossiper.local(); }
size_t max_repair_memory() const { return _max_repair_memory; }
seastar::semaphore& memory_sem() { return _memory_sem; }
repair_module& get_repair_module() noexcept {
return *_repair_module;
tracker& repair_tracker() {
return _tracker;
}
const tracker& repair_tracker() const {
return _tracker;
}
const node_ops_metrics& get_metrics() const noexcept {

View File

@@ -114,11 +114,11 @@ have_gnutls = any([lib.startswith('libgnutls.so')
gzip_process = subprocess.Popen("pigz > "+output, shell=True, stdin=subprocess.PIPE)
ar = tarfile.open(fileobj=gzip_process.stdin, mode='w|')
# relocatable package format version = 3.0
# relocatable package format version = 2.2
shutil.rmtree(f'build/{SCYLLA_DIR}', ignore_errors=True)
os.makedirs(f'build/{SCYLLA_DIR}')
with open(f'build/{SCYLLA_DIR}/.relocatable_package_version', 'w') as f:
f.write('3.0\n')
f.write('2.2\n')
ar.add(f'build/{SCYLLA_DIR}/.relocatable_package_version', arcname='.relocatable_package_version')
for exe in executables_scylla:

View File

@@ -2720,6 +2720,7 @@ class lsa_object_descriptor(object):
value |= (b & 0x3f) << shift
return lsa_object_descriptor(value, start_pos, pos)
mig_re = re.compile(r'.* standard_migrator<(.*)>\+16>,')
vec_ext_re = re.compile(r'managed_vector<(.*), (.*u), (.*)>::external')
def __init__(self, value, desc_pos, obj_pos):
self.value = value
@@ -2732,12 +2733,10 @@ class lsa_object_descriptor(object):
def dead_size(self):
return self.value / 2
def migrator_ptr(self):
static_migrators = gdb.parse_and_eval("'::debug::static_migrators'")
return static_migrators['_migrators']['_M_impl']['_M_start'][self.value >> 1]
def migrator(self):
return self.migrator_ptr().dereference()
static_migrators = gdb.parse_and_eval("'::debug::static_migrators'")
migrator = static_migrators['_migrators']['_M_impl']['_M_start'][self.value >> 1]
return migrator.dereference()
def migrator_str(self):
mig = str(self.migrator())
@@ -2745,11 +2744,29 @@ class lsa_object_descriptor(object):
return m.group(1)
def live_size(self):
mig = self.migrator_ptr()
obj = int(self.obj_pos)
cmd = f'((migrate_fn_type*){mig})->size((const void*){obj})'
res = gdb.parse_and_eval(cmd)
return int(res)
mig = str(self.migrator())
m = re.match(self.mig_re, mig)
if m:
type = m.group(1)
external = self.vec_ext_re.match(type)
if type == 'blob_storage':
t = gdb.lookup_type('blob_storage')
blob = self.obj_pos.cast(t.pointer())
return t.sizeof + blob['frag_size']
elif external:
element_type = external.group(1)
count = external.group(2)
size_type = external.group(3)
vec_type = gdb.lookup_type('managed_vector<%s, %s, %s>' % (element_type, count, size_type))
# gdb doesn't see 'external' for some reason
backref_ptr = self.obj_pos.cast(vec_type.pointer().pointer())
vec = backref_ptr.dereference()
element_count = vec['_capacity']
element_type = gdb.lookup_type(element_type)
return backref_ptr.type.sizeof + element_count * element_type.sizeof
else:
return gdb.lookup_type(type).sizeof
return 0
def end_pos(self):
if self.is_live():
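
One side of this hunk sizes a managed_vector's external storage as the backref pointer header plus capacity times the element size. Roughly, in C++ terms (layout deliberately simplified; the real managed_vector differs):

    #include <cstddef>
    #include <cstdio>

    // Simplified stand-in for managed_vector<T, N, SizeType>::external:
    // a backref pointer followed by the element storage in one allocation.
    template <typename T>
    struct external_storage {
        void* backref;   // points back at the owning vector
        // T elements[capacity] follow
    };

    template <typename T>
    size_t external_live_size(size_t capacity) {
        return sizeof(external_storage<T>) + capacity * sizeof(T);
    }

    int main() {
        std::printf("%zu\n", external_live_size<int>(16));  // 8 + 16*4 on LP64
    }
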

View File

@@ -5077,7 +5077,7 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
service_permit permit) {
const dht::token& token = pr.start()->value().token();
speculative_retry::type retry_type = schema->speculative_retry().get_type();
std::optional<gms::inet_address> extra_replica;
gms::inet_address extra_replica;
inet_address_vector_replica_set all_replicas = get_live_sorted_endpoints(*erm, token);
// Check for a non-local read before heat-weighted load balancing
@@ -5141,12 +5141,12 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
// RRD.NONE or RRD.DC_LOCAL w/ multiple DCs.
if (target_replicas.size() == block_for) { // If RRD.DC_LOCAL extra replica may already be present
auto local_dc_filter = erm->get_topology().get_local_dc_filter();
if (!extra_replica || (is_datacenter_local(cl) && !local_dc_filter(*extra_replica))) {
if (is_datacenter_local(cl) && !local_dc_filter(extra_replica)) {
slogger.trace("read executor no extra target to speculate");
return ::make_shared<never_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
} else {
target_replicas.push_back(*extra_replica);
slogger.trace("creating read executor with extra target {}", *extra_replica);
target_replicas.push_back(extra_replica);
slogger.trace("creating read executor with extra target {}", extra_replica);
}
}
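
The hunk above decides whether a speculative read gets an extra target: with a DC-local consistency level the extra replica is dropped if it is not in the local DC (and, on the std::optional side of the hunk, also when no extra replica exists at all). A rough sketch of that decision with replicas reduced to strings and a stand-in predicate for local_dc_filter:

    #include <functional>
    #include <optional>
    #include <string>
    #include <vector>
    #include <cstdio>

    using replica = std::string;

    std::vector<replica> pick_targets(std::vector<replica> targets,
                                      std::optional<replica> extra,
                                      bool dc_local_cl,
                                      std::function<bool(const replica&)> is_local_dc) {
        if (!extra || (dc_local_cl && !is_local_dc(*extra))) {
            return targets;            // no usable extra target: never speculate
        }
        targets.push_back(*extra);     // speculate on the extra replica
        return targets;
    }

    int main() {
        auto is_local = [](const replica& r) { return r.rfind("dc1:", 0) == 0; };
        auto t = pick_targets({"dc1:a", "dc1:b"}, replica("dc2:c"), true, is_local);
        std::printf("%zu targets\n", t.size());  // 2: dc2:c filtered out
    }
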

View File

@@ -618,7 +618,7 @@ future<> storage_service::mark_existing_views_as_built(sharded<db::system_distri
});
}
std::list<gms::inet_address> storage_service::get_ignore_dead_nodes_for_replace(const token_metadata& tm) {
std::list<gms::inet_address> storage_service::get_ignore_dead_nodes_for_replace() {
std::vector<sstring> ignore_nodes_strs;
std::list<gms::inet_address> ignore_nodes;
boost::split(ignore_nodes_strs, _db.local().get_config().ignore_dead_nodes_for_replace(), boost::is_any_of(","));
@@ -628,11 +628,11 @@ std::list<gms::inet_address> storage_service::get_ignore_dead_nodes_for_replace(
std::replace(n.begin(), n.end(), '\'', ' ');
boost::trim_all(n);
if (!n.empty()) {
auto ep_and_id = tm.parse_host_id_and_endpoint(n);
ignore_nodes.push_back(ep_and_id.endpoint);
auto node = gms::inet_address(n);
ignore_nodes.push_back(node);
}
} catch (...) {
throw std::runtime_error(format("Failed to parse --ignore-dead-nodes-for-replace parameter: ignore_nodes={}, node={}: {}", ignore_nodes_strs, n, std::current_exception()));
throw std::runtime_error(format("Failed to parse --ignore-dead-nodes-for-replace parameter: ignore_nodes={}, node={}", ignore_nodes_strs, n));
}
}
return ignore_nodes;
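
Both versions of the parser above do the same normalization per entry: strip quote characters, trim whitespace, and skip empties. A standalone sketch of that loop with plain strings standing in for gms::inet_address:

    #include <algorithm>
    #include <list>
    #include <sstream>
    #include <string>
    #include <cstdio>

    std::list<std::string> parse_ignore_nodes(const std::string& csv) {
        std::list<std::string> nodes;
        std::stringstream ss(csv);
        std::string n;
        while (std::getline(ss, n, ',')) {
            // Strip quotes, then trim surrounding whitespace.
            std::replace(n.begin(), n.end(), '\"', ' ');
            std::replace(n.begin(), n.end(), '\'', ' ');
            auto b = n.find_first_not_of(" \t");
            auto e = n.find_last_not_of(" \t");
            if (b == std::string::npos) continue;   // all-blank entry
            nodes.push_back(n.substr(b, e - b + 1));
        }
        return nodes;
    }

    int main() {
        for (auto& n : parse_ignore_nodes(" '127.0.0.1', \"127.0.0.2\" "))
            std::printf("%s\n", n.c_str());
    }
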
@@ -2204,8 +2204,7 @@ void storage_service::run_bootstrap_ops(std::unordered_set<token>& bootstrap_tok
void storage_service::run_replace_ops(std::unordered_set<token>& bootstrap_tokens) {
auto replace_address = get_replace_address().value();
auto uuid = utils::make_random_uuid();
auto tmptr = get_token_metadata_ptr();
std::list<gms::inet_address> ignore_nodes = get_ignore_dead_nodes_for_replace(*tmptr);
std::list<gms::inet_address> ignore_nodes = get_ignore_dead_nodes_for_replace();
// Step 1: Decide who needs to sync data for replace operation
std::list<gms::inet_address> sync_nodes;
for (const auto& x :_gossiper.get_endpoint_states()) {
@@ -2249,7 +2248,7 @@ void storage_service::run_replace_ops(std::unordered_set<token>& bootstrap_token
throw std::runtime_error(msg);
}
if (!nodes_down.empty()) {
auto msg = format("replace[{}]: Nodes={} needed for replace operation are down. It is highly recommended to fix the down nodes and try again. To proceed with best-effort mode which might cause data inconsistency, add --ignore-dead-nodes-for-replace <list_of_dead_nodes>. E.g., scylla --ignore-dead-nodes-for-replace 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c,125ed9f4-7777-1dbn-mac8-43fddce9123e", uuid, nodes_down);
auto msg = format("replace[{}]: Nodes={} needed for replace operation are down. It is highly recommended to fix the down nodes and try again. To proceed with best-effort mode which might cause data inconsistency, add --ignore-dead-nodes-for-replace <list_of_dead_nodes>. E.g., scylla --ignore-dead-nodes-for-replace 127.0.0.1,127.0.0.2", uuid, nodes_down);
slogger.warn("{}", msg);
throw std::runtime_error(msg);
}
@@ -2329,11 +2328,12 @@ void storage_service::run_replace_ops(std::unordered_set<token>& bootstrap_token
}
}
future<> storage_service::removenode(locator::host_id host_id, std::list<locator::host_id_or_endpoint> ignore_nodes_params) {
return run_with_api_lock(sstring("removenode"), [host_id, ignore_nodes_params = std::move(ignore_nodes_params)] (storage_service& ss) mutable {
return seastar::async([&ss, host_id, ignore_nodes_params = std::move(ignore_nodes_params)] () mutable {
future<> storage_service::removenode(sstring host_id_string, std::list<gms::inet_address> ignore_nodes) {
return run_with_api_lock(sstring("removenode"), [host_id_string, ignore_nodes = std::move(ignore_nodes)] (storage_service& ss) mutable {
return seastar::async([&ss, host_id_string, ignore_nodes = std::move(ignore_nodes)] {
auto uuid = utils::make_random_uuid();
auto tmptr = ss.get_token_metadata_ptr();
auto host_id = locator::host_id(utils::UUID(host_id_string));
auto endpoint_opt = tmptr->get_endpoint_for_host_id(host_id);
if (!endpoint_opt) {
throw std::runtime_error(format("removenode[{}]: Host ID not found in the cluster", uuid));
@@ -2341,11 +2341,6 @@ future<> storage_service::removenode(locator::host_id host_id, std::list<locator
auto endpoint = *endpoint_opt;
auto tokens = tmptr->get_tokens(endpoint);
auto leaving_nodes = std::list<gms::inet_address>{endpoint};
std::list<gms::inet_address> ignore_nodes;
for (auto& hoep : ignore_nodes_params) {
hoep.resolve(*tmptr);
ignore_nodes.push_back(hoep.endpoint);
}
// Step 1: Decide who needs to sync data
//
@@ -2539,7 +2534,6 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
});
},
[this, ops_uuid] () mutable { node_ops_singal_abort(ops_uuid); });
meta.start().get();
_node_ops.emplace(ops_uuid, std::move(meta));
} else if (req.cmd == node_ops_cmd::removenode_heartbeat) {
slogger.debug("removenode[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
@@ -2588,7 +2582,6 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
});
},
[this, ops_uuid] () mutable { node_ops_singal_abort(ops_uuid); });
meta.start().get();
_node_ops.emplace(ops_uuid, std::move(meta));
} else if (req.cmd == node_ops_cmd::decommission_heartbeat) {
slogger.debug("decommission[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
@@ -2633,7 +2626,6 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
});
},
[this, ops_uuid ] { node_ops_singal_abort(ops_uuid); });
meta.start().get();
_node_ops.emplace(ops_uuid, std::move(meta));
} else if (req.cmd == node_ops_cmd::replace_prepare_mark_alive) {
// Wait for local node has marked replacing node as alive
@@ -2688,7 +2680,6 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
});
},
[this, ops_uuid ] { node_ops_singal_abort(ops_uuid); });
meta.start().get();
_node_ops.emplace(ops_uuid, std::move(meta));
} else if (req.cmd == node_ops_cmd::bootstrap_heartbeat) {
slogger.debug("bootstrap[{}]: Updated heartbeat from coordinator={}", req.ops_uuid, coordinator);
@@ -2934,7 +2925,7 @@ future<> storage_service::removenode_with_stream(gms::inet_address leaving_node,
future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) {
if (is_repair_based_node_ops_enabled(streaming::stream_reason::removenode)) {
auto ops_uuid = utils::make_random_uuid();
auto ops = seastar::make_shared<node_ops_info>(ops_uuid, nullptr, std::list<gms::inet_address>());
auto ops = seastar::make_shared<node_ops_info>(node_ops_info{ops_uuid, nullptr, std::list<gms::inet_address>()});
return _repair.local().removenode_with_repair(get_token_metadata_ptr(), endpoint, ops).finally([this, notify_endpoint] () {
return send_replication_notification(notify_endpoint);
});
@@ -3667,19 +3658,11 @@ node_ops_meta_data::node_ops_meta_data(
, _abort(std::move(abort_func))
, _abort_source(seastar::make_shared<abort_source>())
, _signal(std::move(signal_func))
, _ops(seastar::make_shared<node_ops_info>(_ops_uuid, _abort_source, std::move(ignore_nodes)))
, _ops(seastar::make_shared<node_ops_info>({_ops_uuid, _abort_source, std::move(ignore_nodes)}))
, _watchdog([sig = _signal] { sig(); }) {
_watchdog.arm(_watchdog_interval);
}
future<> node_ops_meta_data::start() {
return _ops ? _ops->start() : make_ready_future<>();
}
future<> node_ops_meta_data::stop() noexcept {
return _ops ? _ops->stop() : make_ready_future<>();
}
future<> node_ops_meta_data::abort() {
slogger.debug("node_ops_meta_data: ops_uuid={} abort", _ops_uuid);
_watchdog.cancel();
@@ -3725,7 +3708,6 @@ future<> storage_service::node_ops_done(utils::UUID ops_uuid) {
if (it != _node_ops.end()) {
node_ops_meta_data& meta = it->second;
meta.cancel_watchdog();
co_await meta.stop();
_node_ops.erase(it);
}
}
@@ -3733,24 +3715,6 @@ future<> storage_service::node_ops_done(utils::UUID ops_uuid) {
future<> storage_service::node_ops_abort(utils::UUID ops_uuid) {
slogger.debug("node_ops_abort: ops_uuid={}", ops_uuid);
auto permit = co_await seastar::get_units(_node_ops_abort_sem, 1);
if (!ops_uuid) {
for (auto& [uuid, meta] : _node_ops) {
co_await meta.abort();
auto as = meta.get_abort_source();
if (as && !as->abort_requested()) {
as->request_abort();
}
}
for (auto it = _node_ops.begin(); it != _node_ops.end(); it = _node_ops.erase(it)) {
node_ops_meta_data& meta = it->second;
co_await meta.stop();
}
co_return;
}
auto it = _node_ops.find(ops_uuid);
if (it != _node_ops.end()) {
node_ops_meta_data& meta = it->second;
@@ -3759,7 +3723,6 @@ future<> storage_service::node_ops_abort(utils::UUID ops_uuid) {
if (as && !as->abort_requested()) {
as->request_abort();
}
co_await meta.stop();
_node_ops.erase(it);
}
}
@@ -3778,18 +3741,17 @@ future<> storage_service::node_ops_abort_thread() {
while (!_node_ops_abort_queue.empty()) {
auto uuid_opt = _node_ops_abort_queue.front();
_node_ops_abort_queue.pop_front();
if (!uuid_opt) {
co_return;
}
try {
co_await node_ops_abort(uuid_opt.value_or(utils::null_uuid()));
co_await node_ops_abort(*uuid_opt);
} catch (...) {
slogger.warn("Failed to abort node operation ops_uuid={}: {}", *uuid_opt, std::current_exception());
}
if (!uuid_opt) {
slogger.info("Stopped node_ops_abort_thread");
co_return;
}
}
}
__builtin_unreachable();
slogger.info("Stopped node_ops_abort_thread");
}
} // namespace service
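
The abort thread consumes a queue of operation ids in which an empty optional is the stop sentinel. A simplified, single-threaded sketch of that control flow (ids reduced to ints, no coroutines), following the check-sentinel-first variant visible in the hunk:

    #include <deque>
    #include <optional>
    #include <cstdio>

    void abort_one(int ops_id) { std::printf("abort %d\n", ops_id); }

    void abort_thread(std::deque<std::optional<int>> queue) {
        while (!queue.empty()) {
            auto id = queue.front();
            queue.pop_front();
            if (!id) {                 // sentinel: stop the thread
                std::printf("stopped node_ops_abort_thread\n");
                return;
            }
            abort_one(*id);            // abort a single operation
        }
    }

    int main() {
        abort_thread({1, 2, std::nullopt, 3});  // 3 is queued after the sentinel, never aborted
    }
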

View File

@@ -105,8 +105,6 @@ public:
std::list<gms::inet_address> ignore_nodes,
std::function<future<> ()> abort_func,
std::function<void ()> signal_func);
future<> start();
future<> stop() noexcept;
shared_ptr<node_ops_info> get_ops_info();
shared_ptr<abort_source> get_abort_source();
future<> abort();
@@ -300,7 +298,7 @@ private:
void run_replace_ops(std::unordered_set<token>& bootstrap_tokens);
void run_bootstrap_ops(std::unordered_set<token>& bootstrap_tokens);
std::list<gms::inet_address> get_ignore_dead_nodes_for_replace(const locator::token_metadata& tm);
std::list<gms::inet_address> get_ignore_dead_nodes_for_replace();
future<> wait_for_ring_to_settle(std::chrono::milliseconds delay);
public:
@@ -706,7 +704,7 @@ public:
*
* @param hostIdString token for the node
*/
future<> removenode(locator::host_id host_id, std::list<locator::host_id_or_endpoint> ignore_nodes);
future<> removenode(sstring host_id_string, std::list<gms::inet_address> ignore_nodes);
future<node_ops_cmd_response> node_ops_cmd_handler(gms::inet_address coordinator, node_ops_cmd_request req);
void node_ops_cmd_check(gms::inet_address coordinator, const node_ops_cmd_request& req);
future<> node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, utils::UUID uuid, std::list<gms::inet_address> nodes, lw_shared_ptr<bool> heartbeat_updater_done);

View File

@@ -14,15 +14,14 @@
#include <seastar/coroutine/parallel_for_each.hh>
#include "db_clock.hh"
#include "log.hh"
#include "tasks/types.hh"
#include "utils/UUID.hh"
#include "utils/serialized_action.hh"
#include "utils/updateable_value.hh"
namespace tasks {
using task_id = utils::tagged_uuid<struct task_id_tag>;
using is_abortable = bool_class<struct abortable_tag>;
using is_internal = bool_class<struct internal_tag>;
extern logging::logger tmlogger;
@@ -65,6 +64,17 @@ public:
failed
};
struct parent_data {
task_id id;
unsigned shard;
parent_data() : id(task_id::create_null_id()) {}
operator bool() const noexcept {
return bool(id);
}
};
class task : public enable_lw_shared_from_this<task> {
public:
struct progress {
@@ -121,10 +131,6 @@ public:
return is_abortable::no;
}
virtual is_internal is_internal() const noexcept {
return is_internal::no;
}
virtual future<> abort() noexcept {
return make_ready_future<>();
}
@@ -237,10 +243,6 @@ public:
return _impl->is_abortable();
};
is_internal is_internal() const noexcept {
return _impl->is_internal();
}
future<> abort() noexcept {
return _impl->abort();
}
@@ -318,37 +320,37 @@ public:
co_await _gate.close();
_tm.unregister_module(_name);
}
public:
template<typename T>
requires std::is_base_of_v<task_manager::task::impl, T>
future<task_id> make_task(unsigned shard, task_id id = task_id::create_null_id(), std::string keyspace = "", std::string table = "", std::string type = "", std::string entity = "", task_info parent_d = task_info{}) {
return _tm.container().invoke_on(shard, [id, module = _name, keyspace = std::move(keyspace), table = std::move(table), type = std::move(type), entity = std::move(entity), parent_d] (task_manager& tm) {
auto module_ptr = tm.find_module(module);
auto task_impl_ptr = std::make_unique<T>(module_ptr, id ? id : task_id::create_random_id(), parent_d ? 0 : module_ptr->new_sequence_number(), std::move(keyspace), std::move(table), std::move(type), std::move(entity), parent_d.id);
return module_ptr->make_task(std::move(task_impl_ptr), parent_d).then([] (auto task) {
return task->id();
});
});
}
// Must be called on target shard.
// If the task has a parent, the parent's children data is updated and the sequence number is
// inherited from the parent. Otherwise, it must be set by the caller.
future<task_ptr> make_task(task::task_impl_ptr task_impl_ptr, task_info parent_d = task_info{}) {
auto task = make_lw_shared<task_manager::task>(std::move(task_impl_ptr));
future<task_id> make_task(unsigned shard, task_id id = task_id::create_null_id(), std::string keyspace = "", std::string table = "", std::string type = "", std::string entity = "", parent_data parent_d = parent_data{}) {
foreign_task_ptr parent;
uint64_t sequence_number = 0;
if (parent_d) {
task->get_status().sequence_number = co_await _tm.container().invoke_on(parent_d.shard, [id = parent_d.id, task = make_foreign(task)] (task_manager& tm) mutable {
parent = co_await _tm.container().invoke_on(parent_d.shard, [id = parent_d.id] (task_manager& tm) mutable -> future<foreign_task_ptr> {
const auto& all_tasks = tm.get_all_tasks();
if (auto it = all_tasks.find(id); it != all_tasks.end()) {
it->second->add_child(std::move(task));
return make_ready_future<uint64_t>(it->second->get_sequence_number());
co_return it->second;
} else {
return make_exception_future<uint64_t>(task_manager::task_not_found(id));
co_return coroutine::return_exception(task_manager::task_not_found(id));
}
});
sequence_number = parent->get_sequence_number();
}
co_return task;
auto task = co_await _tm.container().invoke_on(shard, [id, module = _name, sequence_number, keyspace = std::move(keyspace), table = std::move(table), type = std::move(type), entity = std::move(entity), parent_d] (task_manager& tm) {
auto module_ptr = tm.find_module(module);
auto task_impl_ptr = std::make_unique<T>(module_ptr, id ? id : task_id::create_random_id(), parent_d ? sequence_number : module_ptr->new_sequence_number(), std::move(keyspace), std::move(table), std::move(type), std::move(entity), parent_d.id);
return make_ready_future<foreign_task_ptr>(make_lw_shared<task_manager::task>(std::move(task_impl_ptr)));
});
id = task->id();
if (parent_d) {
co_await _tm.container().invoke_on(parent.get_owner_shard(), [task = std::move(parent), child = std::move(task)] (task_manager& tm) mutable {
task->add_child(std::move(child));
});
}
co_return id;
}
};
public:
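
The parent/child handling above boils down to: a child task inherits the parent's sequence number and is registered in the parent's children list, while a top-level task draws a fresh number from the module counter. A shape-only sketch with the shards collapsed away (all names hypothetical):

    #include <cstdint>
    #include <cstdio>
    #include <memory>
    #include <vector>

    struct task {
        uint64_t sequence_number = 0;
        std::vector<std::shared_ptr<task>> children;
    };

    std::shared_ptr<task> make_task(task* parent, uint64_t next_seq) {
        auto t = std::make_shared<task>();
        // Children share the parent's sequence number; top-level tasks get a new one.
        t->sequence_number = parent ? parent->sequence_number : next_seq;
        if (parent)
            parent->children.push_back(t);   // register the child with its parent
        return t;
    }

    int main() {
        auto root = make_task(nullptr, 42);
        auto child = make_task(root.get(), 0);
        std::printf("%llu %llu\n",
                    (unsigned long long)root->sequence_number,
                    (unsigned long long)child->sequence_number);  // 42 42
    }
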

View File

@@ -1,29 +0,0 @@
/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "utils/UUID.hh"
namespace tasks {
using task_id = utils::tagged_uuid<struct task_id_tag>;
struct task_info {
task_id id;
unsigned shard;
task_info() noexcept : id(task_id::create_null_id()) {}
task_info(task_id id, unsigned parent_shard) noexcept : id(id), shard(parent_shard) {}
operator bool() const noexcept {
return bool(id);
}
};
}

View File

@@ -1,46 +0,0 @@
# Copyright 2022-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#############################################################################
# Tests involving the "timestamp" column type.
#
# Please do not confuse the "timestamp" column type tested here, with the
# unrelated concept of the "timestamp" of a write (the "USING TIMESTAMP").
# That latter concept is tested in test_timestamp.py, not here.
#############################################################################
from util import new_test_table, unique_key_int
import pytest
@pytest.fixture(scope="module")
def table1(cql, test_keyspace):
with new_test_table(cql, test_keyspace, "p int primary key, t timestamp") as table:
yield table
# According to the Cassandra documentation, a timestamp is a 64-bit integer
# "representing a number of milliseconds since the standard base time known
# as the epoch". This underlying "number" can be stored into a timestamp
# column by writing an integer directly to it, and later the number can be
# extracted during select with the built-in tounixtimestamp() function.
#
# In issue #11588, a user who mistakenly used a microsecond count reported
# that the write was successful, but then the read failed reporting that
# "timestamp is out of range. Must be in milliseconds since epoch".
# This is wrong and unhelpful behavior, and the following test reproduces
# it. Cassandra currently allows the out-of-range number to be both written
# and then read - so that is the Cassandra-compatible approach - but
# arguably a more correct and useful behavior would be to fail the write
# up-front, even before reaching the read.
@pytest.mark.xfail(reason="issue #11588")
def test_type_timestamp_overflow(cql, table1):
p = unique_key_int()
t = 1667215862 * 1000000
# t is out of the normal range for milliseconds since the epoch (it
# was calculated as microseconds since the epoch), but Cassandra allows
# to write it. Scylla may decide in the future to fail this write (in
# which case this test should be changed to accept both options, or
# be marked scylla_only), but passing the write and failing the read
# (as in #11588) is a bug.
cql.execute(f"INSERT INTO {table1} (p, t) VALUES ({p}, {t})")
assert list(cql.execute(f"SELECT tounixtimestamp(t) from {table1} where p = {p}")) == [(t,)]
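
For a sense of scale: 1667215862 is a seconds-since-epoch value from late 2022, so multiplying by 1000000 yields microseconds; read back as milliseconds, the same number lands roughly 52,000 years in the future, which is why a range check would reject it. A quick arithmetic check:

    #include <cstdio>

    int main() {
        // 1667215862 seconds since the epoch, mistakenly scaled to microseconds:
        const long long t = 1667215862LL * 1000000;
        // Read back as milliseconds, how far past 1970 does it land?
        const long long years = t / 1000 / 3600 / 24 / 365;
        std::printf("~%lld years after 1970\n", years);  // ~52867
    }
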

View File

@@ -1436,21 +1436,6 @@ void test_range_tombstones_v2(tests::reader_concurrency_semaphore_wrapper& semap
.produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::after_key(s.make_ckey(5)), {}))
.produces_end_of_stream();
assert_that(ms.make_reader_v2(s.schema(), semaphore.make_permit(), pr,
s.schema()->full_slice(),
default_priority_class(),
nullptr,
streamed_mutation::forwarding::yes,
mutation_reader::forwarding::no))
.produces_partition_start(pkey)
.produces_end_of_stream()
.fast_forward_to(position_range(
position_in_partition::after_key(s.make_ckey(0)),
position_in_partition::for_key(s.make_ckey(2))))
.produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::before_key(s.make_ckey(1)), t1))
.produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::before_key(s.make_ckey(2)), {}))
.produces_end_of_stream();
assert_that(ms.make_reader_v2(s.schema(), semaphore.make_permit(), pr,
s.schema()->full_slice(),
default_priority_class(),

View File

@@ -33,7 +33,6 @@
#include "utils/managed_bytes.hh"
#include "utils/bit_cast.hh"
#include "utils/chunked_vector.hh"
#include "tasks/types.hh"
class tuple_type_impl;
class big_decimal;
@@ -1042,12 +1041,6 @@ shared_ptr<const abstract_type> data_type_for<cql_duration>() {
return duration_type;
}
template <>
inline
shared_ptr<const abstract_type> data_type_for<tasks::task_id>() {
return uuid_type;
}
namespace std {
template <>

View File

@@ -51,10 +51,9 @@ done
UNIFIED_PKG="$(realpath -s $UNIFIED_PKG)"
PKGS="build/$MODE/dist/tar/$PRODUCT-$VERSION-$RELEASE.$(arch).tar.gz build/$MODE/dist/tar/$PRODUCT-python3-$VERSION-$RELEASE.$(arch).tar.gz build/$MODE/dist/tar/$PRODUCT-jmx-$VERSION-$RELEASE.noarch.tar.gz build/$MODE/dist/tar/$PRODUCT-tools-$VERSION-$RELEASE.noarch.tar.gz"
BASEDIR="build/$MODE/unified/$PRODUCT-$VERSION"
rm -rf build/"$MODE"/unified/
mkdir -p "$BASEDIR"
mkdir -p build/"$MODE"/unified/
for pkg in $PKGS; do
if [ ! -e "$pkg" ]; then
echo "$pkg not found."
@@ -62,17 +61,17 @@ for pkg in $PKGS; do
exit 1
fi
pkg="$(readlink -f $pkg)"
tar -C "$BASEDIR" -xpf "$pkg"
tar -C build/"$MODE"/unified/ -xpf "$pkg"
dirname=$(basename "$pkg"| sed -e "s/-$VERSION_ESC-$RELEASE_ESC\.[^.]*\.tar\.gz//")
dirname=${dirname/#$PRODUCT/scylla}
if [ ! -d "$BASEDIR/$dirname" ]; then
if [ ! -d build/"$MODE"/unified/"$dirname" ]; then
echo "Directory $dirname not found in $pkg, the pacakge may corrupted."
exit 1
fi
done
ln -f unified/install.sh "$BASEDIR"
ln -f unified/uninstall.sh "$BASEDIR"
# relocatable package format version = 3.0
echo "3.0" > "$BASEDIR"/.relocatable_package_version
ln -f unified/install.sh build/"$MODE"/unified/
ln -f unified/uninstall.sh build/"$MODE"/unified/
# relocatable package format version = 2.2
echo "2.2" > build/"$MODE"/unified/.relocatable_package_version
cd build/"$MODE"/unified
tar cpf "$UNIFIED_PKG" --use-compress-program=pigz "$PRODUCT-$VERSION"
tar cpf "$UNIFIED_PKG" --use-compress-program=pigz * .relocatable_package_version