Compare commits
133 Commits
copilot/fi
...
scylla-6.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bc89aac9d0 | ||
|
|
2306c3b522 | ||
|
|
6d90ff84d9 | ||
|
|
80ac0da11c | ||
|
|
d4c3a43b34 | ||
|
|
31ba5561e7 | ||
|
|
7479167af2 | ||
|
|
e35ab96f8b | ||
|
|
1ace370ecd | ||
|
|
3e7de910ab | ||
|
|
9cf0d618d0 | ||
|
|
4810937ddf | ||
|
|
a3e4dc7b6c | ||
|
|
7a6ff12ace | ||
|
|
e38d675cb9 | ||
|
|
45ff4d2c41 | ||
|
|
0662e80917 | ||
|
|
5b546ad4b1 | ||
|
|
e04378fdf0 | ||
|
|
f8243cbf19 | ||
|
|
27f01bf4e3 | ||
|
|
ded9aca6ee | ||
|
|
ccd441a4de | ||
|
|
79e4e411b3 | ||
|
|
f8ba94a960 | ||
|
|
dfe89157c6 | ||
|
|
50d8fa6b77 | ||
|
|
a77615adf3 | ||
|
|
e518bb68b2 | ||
|
|
af2caeb2de | ||
|
|
d5ebfea1ff | ||
|
|
3fec9e1344 | ||
|
|
5d3dde50f4 | ||
|
|
b7fe4412d0 | ||
|
|
fd7284ec06 | ||
|
|
8d12eeee62 | ||
|
|
e11827f37e | ||
|
|
0acfc223ab | ||
|
|
c53cd98a41 | ||
|
|
fa6a7cf144 | ||
|
|
65021c4b1c | ||
|
|
341c29bd74 | ||
|
|
e963631859 | ||
|
|
c6f0a3267e | ||
|
|
f02f2fef40 | ||
|
|
f8ae38a68c | ||
|
|
8a064daccf | ||
|
|
7f540407c9 | ||
|
|
50e1369d1d | ||
|
|
21e860453c | ||
|
|
fc3d2d8fde | ||
|
|
1d34da21a9 | ||
|
|
377bc345f1 | ||
|
|
607be221b8 | ||
|
|
cb242ad48c | ||
|
|
7258f4f73c | ||
|
|
82d635b6a7 | ||
|
|
baf0385728 | ||
|
|
a373ed52a5 | ||
|
|
9a341a65af | ||
|
|
35b4b47d74 | ||
|
|
6d7388c689 | ||
|
|
6ac34f7acf | ||
|
|
bdf3e71f62 | ||
|
|
ec30bdc483 | ||
|
|
21f87c9cfa | ||
|
|
a38d5463ef | ||
|
|
3cb71c5b88 | ||
|
|
85805f6472 | ||
|
|
62a23fd86a | ||
|
|
b9c88fdf4b | ||
|
|
0c1b6fed16 | ||
|
|
fb7a33be13 | ||
|
|
b208953e07 | ||
|
|
803662351d | ||
|
|
cbf47319c1 | ||
|
|
64388bcf22 | ||
|
|
83dfe6bfd6 | ||
|
|
3c47ab9851 | ||
|
|
bef3777a5f | ||
|
|
b25dd2696f | ||
|
|
57d267a97e | ||
|
|
5b8523273b | ||
|
|
6497ed68ed | ||
|
|
39c1237e25 | ||
|
|
e04964ba17 | ||
|
|
fb5b9012e6 | ||
|
|
749197f0a4 | ||
|
|
1f4428153f | ||
|
|
544c424e89 | ||
|
|
73b59b244d | ||
|
|
5afa3028a3 | ||
|
|
885c7309ee | ||
|
|
adfad686b3 | ||
|
|
1a70db17a6 | ||
|
|
bd4b781dc8 | ||
|
|
51b8b04d97 | ||
|
|
242caa14fe | ||
|
|
cedb47d843 | ||
|
|
da816bf50c | ||
|
|
8bff078a89 | ||
|
|
68d12daa7b | ||
|
|
e1616a2970 | ||
|
|
62f5171a55 | ||
|
|
fd928601ad | ||
|
|
ae474f6897 | ||
|
|
099338b766 | ||
|
|
375610ace8 | ||
|
|
1b64e80393 | ||
|
|
fa330a6a4d | ||
|
|
68544d5bb3 | ||
|
|
bc711a169d | ||
|
|
0d0c037e1d | ||
|
|
4d616ccb8c | ||
|
|
25d3398b93 | ||
|
|
ed3ac1eea4 | ||
|
|
7229c820cf | ||
|
|
67878af591 | ||
|
|
2dbc555933 | ||
|
|
3b9c86dcf5 | ||
|
|
b6f3891282 | ||
|
|
46220bd839 | ||
|
|
55a45e3486 | ||
|
|
1dd522edc8 | ||
|
|
6d655e6766 | ||
|
|
54b9fdab03 | ||
|
|
13f8486cd7 | ||
|
|
747ffd8776 | ||
|
|
a87683c7be | ||
|
|
eff7b0d42d | ||
|
|
7dbcfe5a39 | ||
|
|
d078bafa00 | ||
|
|
1b4d5d02ef |
@@ -78,7 +78,7 @@ fi
|
||||
|
||||
# Default scylla product/version tags
|
||||
PRODUCT=scylla
|
||||
VERSION=5.5.0-dev
|
||||
VERSION=6.0.1
|
||||
|
||||
if test -f version
|
||||
then
|
||||
|
||||
@@ -4576,7 +4576,7 @@ static lw_shared_ptr<keyspace_metadata> create_keyspace_metadata(std::string_vie
|
||||
// used by default on new Alternator tables. Change this initialization
|
||||
// to 0 enable tablets by default, with automatic number of tablets.
|
||||
std::optional<unsigned> initial_tablets;
|
||||
if (sp.get_db().local().get_config().check_experimental(db::experimental_features_t::feature::TABLETS)) {
|
||||
if (sp.get_db().local().get_config().enable_tablets()) {
|
||||
auto it = tags_map.find(INITIAL_TABLETS_TAG_KEY);
|
||||
if (it != tags_map.end()) {
|
||||
// Tag set. If it's a valid number, use it. If not - e.g., it's
|
||||
|
||||
@@ -23,6 +23,8 @@ namespace tm = httpd::task_manager_json;
|
||||
using namespace json;
|
||||
using namespace seastar::httpd;
|
||||
|
||||
using task_variant = std::variant<tasks::task_manager::foreign_task_ptr, tasks::task_manager::task::task_essentials>;
|
||||
|
||||
inline bool filter_tasks(tasks::task_manager::task_ptr task, std::unordered_map<sstring, sstring>& query_params) {
|
||||
return (!query_params.contains("keyspace") || query_params["keyspace"] == task->get_status().keyspace) &&
|
||||
(!query_params.contains("table") || query_params["table"] == task->get_status().table);
|
||||
@@ -102,13 +104,14 @@ future<full_task_status> retrieve_status(const tasks::task_manager::foreign_task
|
||||
s.module = task->get_module_name();
|
||||
s.progress.completed = progress.completed;
|
||||
s.progress.total = progress.total;
|
||||
std::vector<std::string> ct{task->get_children().size()};
|
||||
boost::transform(task->get_children(), ct.begin(), [] (const auto& child) {
|
||||
std::vector<std::string> ct = co_await task->get_children().map_each_task<std::string>([] (const tasks::task_manager::foreign_task_ptr& child) {
|
||||
return child->id().to_sstring();
|
||||
}, [] (const tasks::task_manager::task::task_essentials& child) {
|
||||
return child.task_status.id.to_sstring();
|
||||
});
|
||||
s.children_ids = std::move(ct);
|
||||
co_return s;
|
||||
}
|
||||
};
|
||||
|
||||
void set_task_manager(http_context& ctx, routes& r, sharded<tasks::task_manager>& tm, db::config& cfg) {
|
||||
tm::get_modules.set(r, [&tm] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
|
||||
@@ -193,7 +196,6 @@ void set_task_manager(http_context& ctx, routes& r, sharded<tasks::task_manager>
|
||||
try {
|
||||
task = co_await tasks::task_manager::invoke_on_task(tm, id, std::function([] (tasks::task_manager::task_ptr task) {
|
||||
return task->done().then_wrapped([task] (auto f) {
|
||||
task->unregister_task();
|
||||
// done() is called only because we want the task to be complete before getting its status.
|
||||
// The future should be ignored here as the result does not matter.
|
||||
f.ignore_ready_future();
|
||||
@@ -210,7 +212,7 @@ void set_task_manager(http_context& ctx, routes& r, sharded<tasks::task_manager>
|
||||
tm::get_task_status_recursively.set(r, [&_tm = tm] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
|
||||
auto& tm = _tm;
|
||||
auto id = tasks::task_id{utils::UUID{req->get_path_param("task_id")}};
|
||||
std::queue<tasks::task_manager::foreign_task_ptr> q;
|
||||
std::queue<task_variant> q;
|
||||
utils::chunked_vector<full_task_status> res;
|
||||
|
||||
tasks::task_manager::foreign_task_ptr task;
|
||||
@@ -230,10 +232,33 @@ void set_task_manager(http_context& ctx, routes& r, sharded<tasks::task_manager>
|
||||
q.push(co_await task.copy()); // Task cannot be moved since we need it to be alive during whole loop execution.
|
||||
while (!q.empty()) {
|
||||
auto& current = q.front();
|
||||
res.push_back(co_await retrieve_status(current));
|
||||
for (auto& child: current->get_children()) {
|
||||
q.push(co_await child.copy());
|
||||
}
|
||||
co_await std::visit(overloaded_functor {
|
||||
[&] (const tasks::task_manager::foreign_task_ptr& task) -> future<> {
|
||||
res.push_back(co_await retrieve_status(task));
|
||||
co_await task->get_children().for_each_task([&q] (const tasks::task_manager::foreign_task_ptr& child) -> future<> {
|
||||
q.push(co_await child.copy());
|
||||
}, [&] (const tasks::task_manager::task::task_essentials& child) {
|
||||
q.push(child);
|
||||
return make_ready_future();
|
||||
});
|
||||
},
|
||||
[&] (const tasks::task_manager::task::task_essentials& task) -> future<> {
|
||||
res.push_back(full_task_status{
|
||||
.task_status = task.task_status,
|
||||
.type = task.type,
|
||||
.progress = task.task_progress,
|
||||
.parent_id = task.parent_id,
|
||||
.abortable = task.abortable,
|
||||
.children_ids = boost::copy_range<std::vector<std::string>>(task.failed_children | boost::adaptors::transformed([] (auto& child) {
|
||||
return child.task_status.id.to_sstring();
|
||||
}))
|
||||
});
|
||||
for (auto& child: task.failed_children) {
|
||||
q.push(child);
|
||||
}
|
||||
return make_ready_future();
|
||||
}
|
||||
}, current);
|
||||
q.pop();
|
||||
}
|
||||
|
||||
|
||||
@@ -89,14 +89,13 @@ void set_task_manager_test(http_context& ctx, routes& r, sharded<tasks::task_man
|
||||
std::string error = fail ? it->second : "";
|
||||
|
||||
try {
|
||||
co_await tasks::task_manager::invoke_on_task(tm, id, [fail, error = std::move(error)] (tasks::task_manager::task_ptr task) {
|
||||
co_await tasks::task_manager::invoke_on_task(tm, id, [fail, error = std::move(error)] (tasks::task_manager::task_ptr task) -> future<> {
|
||||
tasks::test_task test_task{task};
|
||||
if (fail) {
|
||||
test_task.finish_failed(std::make_exception_ptr(std::runtime_error(error)));
|
||||
co_await test_task.finish_failed(std::make_exception_ptr(std::runtime_error(error)));
|
||||
} else {
|
||||
test_task.finish();
|
||||
co_await test_task.finish();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
} catch (tasks::task_manager::task_not_found& e) {
|
||||
throw bad_param_exception(e.what());
|
||||
|
||||
@@ -24,7 +24,6 @@
|
||||
#include "service/raft/group0_state_machine.hh"
|
||||
#include "timeout_config.hh"
|
||||
#include "db/config.hh"
|
||||
#include "db/system_auth_keyspace.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
|
||||
namespace auth {
|
||||
@@ -41,14 +40,14 @@ constinit const std::string_view AUTH_PACKAGE_NAME("org.apache.cassandra.auth.")
|
||||
static logging::logger auth_log("auth");
|
||||
|
||||
bool legacy_mode(cql3::query_processor& qp) {
|
||||
return qp.auth_version < db::system_auth_keyspace::version_t::v2;
|
||||
return qp.auth_version < db::system_keyspace::auth_version_t::v2;
|
||||
}
|
||||
|
||||
std::string_view get_auth_ks_name(cql3::query_processor& qp) {
|
||||
if (legacy_mode(qp)) {
|
||||
return meta::legacy::AUTH_KS;
|
||||
}
|
||||
return db::system_auth_keyspace::NAME;
|
||||
return db::system_keyspace::NAME;
|
||||
}
|
||||
|
||||
// Func must support being invoked more than once.
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
*/
|
||||
|
||||
#include "auth/default_authorizer.hh"
|
||||
#include "db/system_auth_keyspace.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
|
||||
extern "C" {
|
||||
#include <crypt.h>
|
||||
|
||||
@@ -28,7 +28,6 @@
|
||||
#include "db/config.hh"
|
||||
#include "db/consistency_level_type.hh"
|
||||
#include "db/functions/function_name.hh"
|
||||
#include "db/system_auth_keyspace.hh"
|
||||
#include "log.hh"
|
||||
#include "schema/schema_fwd.hh"
|
||||
#include <seastar/core/future.hh>
|
||||
@@ -644,7 +643,7 @@ future<> migrate_to_auth_v2(db::system_keyspace& sys_ks, ::service::raft_group0_
|
||||
}
|
||||
auto muts = co_await qp.get_mutations_internal(
|
||||
format("INSERT INTO {}.{} ({}) VALUES ({})",
|
||||
db::system_auth_keyspace::NAME,
|
||||
db::system_keyspace::NAME,
|
||||
cf_name,
|
||||
col_names_str,
|
||||
val_binders_str),
|
||||
@@ -659,7 +658,7 @@ future<> migrate_to_auth_v2(db::system_keyspace& sys_ks, ::service::raft_group0_
|
||||
}
|
||||
}
|
||||
co_yield co_await sys_ks.make_auth_version_mutation(ts,
|
||||
db::system_auth_keyspace::version_t::v2);
|
||||
db::system_keyspace::auth_version_t::v2);
|
||||
};
|
||||
co_await announce_mutations_with_batching(g0,
|
||||
start_operation_func,
|
||||
|
||||
@@ -618,3 +618,6 @@ maintenance_socket: ignore
|
||||
# replication_strategy_warn_list:
|
||||
# - SimpleStrategy
|
||||
# replication_strategy_fail_list:
|
||||
|
||||
# This enables tablets on newly created keyspaces
|
||||
enable_tablets: true
|
||||
|
||||
21
configure.py
21
configure.py
@@ -1015,7 +1015,6 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'cql3/result_set.cc',
|
||||
'cql3/prepare_context.cc',
|
||||
'db/consistency_level.cc',
|
||||
'db/system_auth_keyspace.cc',
|
||||
'db/system_keyspace.cc',
|
||||
'db/virtual_table.cc',
|
||||
'db/virtual_tables.cc',
|
||||
@@ -1358,6 +1357,7 @@ scylla_perfs = ['test/perf/perf_alternator.cc',
|
||||
'test/perf/perf_simple_query.cc',
|
||||
'test/perf/perf_sstable.cc',
|
||||
'test/perf/perf_tablets.cc',
|
||||
'test/perf/tablet_load_balancing.cc',
|
||||
'test/perf/perf.cc',
|
||||
'test/lib/alternator_test_env.cc',
|
||||
'test/lib/cql_test_env.cc',
|
||||
@@ -1753,33 +1753,32 @@ def configure_seastar(build_dir, mode, mode_config):
|
||||
|
||||
|
||||
def configure_abseil(build_dir, mode, mode_config):
|
||||
# for sanitizer cflags
|
||||
seastar_flags = query_seastar_flags(f'{outdir}/{mode}/seastar/seastar.pc',
|
||||
mode_config['build_seastar_shared_libs'],
|
||||
args.staticcxx)
|
||||
seastar_cflags = seastar_flags['seastar_cflags']
|
||||
abseil_cflags = mode_config['lib_cflags']
|
||||
cxx_flags = mode_config['cxxflags']
|
||||
if '-DSANITIZE' in cxx_flags:
|
||||
abseil_cflags += ' -fsanitize=address -fsanitize=undefined -fno-sanitize=vptr'
|
||||
|
||||
abseil_build_dir = os.path.join(build_dir, mode, 'abseil')
|
||||
|
||||
abseil_cflags = seastar_cflags + ' ' + modes[mode]['cxx_ld_flags']
|
||||
# We want to "undo" coverage for abseil if we have it enabled, as we are not
|
||||
# interested in the coverage of the abseil library. these flags were previously
|
||||
# added to cxx_ld_flags
|
||||
if args.coverage:
|
||||
for flag in COVERAGE_INST_FLAGS:
|
||||
abseil_cflags = abseil_cflags.replace(f' {flag}', '')
|
||||
cxx_flags = cxx_flags.replace(f' {flag}', '')
|
||||
|
||||
cxx_flags += ' ' + abseil_cflags.strip()
|
||||
cmake_mode = mode_config['cmake_build_type']
|
||||
abseil_cmake_args = [
|
||||
'-DCMAKE_BUILD_TYPE={}'.format(cmake_mode),
|
||||
'-DCMAKE_INSTALL_PREFIX={}'.format(build_dir + '/inst'), # just to avoid a warning from absl
|
||||
'-DCMAKE_C_COMPILER={}'.format(args.cc),
|
||||
'-DCMAKE_CXX_COMPILER={}'.format(args.cxx),
|
||||
'-DCMAKE_CXX_FLAGS_{}={}'.format(cmake_mode.upper(), abseil_cflags),
|
||||
'-DCMAKE_CXX_FLAGS_{}={}'.format(cmake_mode.upper(), cxx_flags),
|
||||
'-DCMAKE_EXPORT_COMPILE_COMMANDS=ON',
|
||||
'-DCMAKE_CXX_STANDARD=20',
|
||||
'-DABSL_PROPAGATE_CXX_STD=ON',
|
||||
]
|
||||
|
||||
abseil_build_dir = os.path.join(build_dir, mode, 'abseil')
|
||||
abseil_cmd = ['cmake', '-G', 'Ninja', real_relpath('abseil', abseil_build_dir)] + abseil_cmake_args
|
||||
|
||||
os.makedirs(abseil_build_dir, exist_ok=True)
|
||||
|
||||
@@ -14,9 +14,11 @@
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "service/topology_mutation.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "service/forward_service.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "cql3/CqlParser.hpp"
|
||||
#include "cql3/statements/batch_statement.hh"
|
||||
#include "cql3/statements/modification_statement.hh"
|
||||
@@ -42,16 +44,22 @@ const sstring query_processor::CQL_VERSION = "3.3.1";
|
||||
const std::chrono::minutes prepared_statements_cache::entry_expiry = std::chrono::minutes(60);
|
||||
|
||||
struct query_processor::remote {
|
||||
remote(service::migration_manager& mm, service::forward_service& fwd, service::raft_group0_client& group0_client)
|
||||
: mm(mm), forwarder(fwd), group0_client(group0_client) {}
|
||||
remote(service::migration_manager& mm, service::forward_service& fwd,
|
||||
service::storage_service& ss, service::raft_group0_client& group0_client)
|
||||
: mm(mm), forwarder(fwd), ss(ss), group0_client(group0_client) {}
|
||||
|
||||
service::migration_manager& mm;
|
||||
service::forward_service& forwarder;
|
||||
service::storage_service& ss;
|
||||
service::raft_group0_client& group0_client;
|
||||
|
||||
seastar::gate gate;
|
||||
};
|
||||
|
||||
bool query_processor::topology_global_queue_empty() {
|
||||
return remote().first.get().ss.topology_global_queue_empty();
|
||||
}
|
||||
|
||||
static service::query_state query_state_for_internal_call() {
|
||||
return {service::client_state::for_internal_calls(), empty_service_permit()};
|
||||
}
|
||||
@@ -498,8 +506,8 @@ query_processor::~query_processor() {
|
||||
}
|
||||
|
||||
void query_processor::start_remote(service::migration_manager& mm, service::forward_service& forwarder,
|
||||
service::raft_group0_client& group0_client) {
|
||||
_remote = std::make_unique<struct remote>(mm, forwarder, group0_client);
|
||||
service::storage_service& ss, service::raft_group0_client& group0_client) {
|
||||
_remote = std::make_unique<struct remote>(mm, forwarder, ss, group0_client);
|
||||
}
|
||||
|
||||
future<> query_processor::stop_remote() {
|
||||
@@ -1018,16 +1026,29 @@ query_processor::execute_schema_statement(const statements::schema_altering_stat
|
||||
|
||||
cql3::cql_warnings_vec warnings;
|
||||
|
||||
auto request_id = guard->new_group0_state_id();
|
||||
stmt.global_req_id = request_id;
|
||||
|
||||
auto [ret, m, cql_warnings] = co_await stmt.prepare_schema_mutations(*this, options, guard->write_timestamp());
|
||||
warnings = std::move(cql_warnings);
|
||||
|
||||
ce = std::move(ret);
|
||||
if (!m.empty()) {
|
||||
auto description = format("CQL DDL statement: \"{}\"", stmt.raw_cql_statement);
|
||||
co_await remote_.get().mm.announce(std::move(m), std::move(*guard), description);
|
||||
if (ce && ce->target == cql_transport::event::schema_change::target_type::TABLET_KEYSPACE) {
|
||||
co_await remote_.get().mm.announce<service::topology_change>(std::move(m), std::move(*guard), description);
|
||||
// TODO: eliminate timeout from alter ks statement on the cqlsh/driver side
|
||||
auto error = co_await remote_.get().ss.wait_for_topology_request_completion(request_id);
|
||||
co_await remote_.get().ss.wait_for_topology_not_busy();
|
||||
if (!error.empty()) {
|
||||
log.error("CQL statement \"{}\" with topology request_id \"{}\" failed with error: \"{}\"", stmt.raw_cql_statement, request_id, error);
|
||||
throw exceptions::request_execution_exception(exceptions::exception_code::INVALID, error);
|
||||
}
|
||||
} else {
|
||||
co_await remote_.get().mm.announce<service::schema_change>(std::move(m), std::move(*guard), description);
|
||||
}
|
||||
}
|
||||
|
||||
ce = std::move(ret);
|
||||
|
||||
// If an IF [NOT] EXISTS clause was used, this may not result in an actual schema change. To avoid doing
|
||||
// extra work in the drivers to handle schema changes, we return an empty message in this case. (CASSANDRA-7600)
|
||||
::shared_ptr<messages::result_message> result;
|
||||
|
||||
@@ -31,7 +31,6 @@
|
||||
#include "lang/wasm.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
#include "types/types.hh"
|
||||
#include "db/system_auth_keyspace.hh"
|
||||
|
||||
|
||||
namespace service {
|
||||
@@ -151,7 +150,8 @@ public:
|
||||
|
||||
~query_processor();
|
||||
|
||||
void start_remote(service::migration_manager&, service::forward_service&, service::raft_group0_client&);
|
||||
void start_remote(service::migration_manager&, service::forward_service&,
|
||||
service::storage_service& ss, service::raft_group0_client&);
|
||||
future<> stop_remote();
|
||||
|
||||
data_dictionary::database db() {
|
||||
@@ -176,7 +176,7 @@ public:
|
||||
|
||||
wasm::manager& wasm() { return _wasm; }
|
||||
|
||||
db::system_auth_keyspace::version_t auth_version;
|
||||
db::system_keyspace::auth_version_t auth_version;
|
||||
|
||||
statements::prepared_statement::checked_weak_ptr get_prepared(const std::optional<auth::authenticated_user>& user, const prepared_cache_key_type& key) {
|
||||
if (user) {
|
||||
@@ -461,6 +461,8 @@ public:
|
||||
|
||||
void reset_cache();
|
||||
|
||||
bool topology_global_queue_empty();
|
||||
|
||||
private:
|
||||
// Keep the holder until you stop using the `remote` services.
|
||||
std::pair<std::reference_wrapper<remote>, gate::holder> remote();
|
||||
|
||||
@@ -8,11 +8,15 @@
|
||||
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
|
||||
*/
|
||||
|
||||
#include <boost/range/algorithm.hpp>
|
||||
#include <fmt/format.h>
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <stdexcept>
|
||||
#include "alter_keyspace_statement.hh"
|
||||
#include "prepared_statement.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "service/topology_mutation.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "data_dictionary/data_dictionary.hh"
|
||||
#include "data_dictionary/keyspace_metadata.hh"
|
||||
@@ -21,6 +25,8 @@
|
||||
#include "create_keyspace_statement.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
|
||||
static logging::logger mylogger("alter_keyspace");
|
||||
|
||||
bool is_system_keyspace(std::string_view keyspace);
|
||||
|
||||
cql3::statements::alter_keyspace_statement::alter_keyspace_statement(sstring name, ::shared_ptr<ks_prop_defs> attrs)
|
||||
@@ -36,6 +42,20 @@ future<> cql3::statements::alter_keyspace_statement::check_access(query_processo
|
||||
return state.has_keyspace_access(_name, auth::permission::ALTER);
|
||||
}
|
||||
|
||||
static bool validate_rf_difference(const std::string_view curr_rf, const std::string_view new_rf) {
|
||||
auto to_number = [] (const std::string_view rf) {
|
||||
int result;
|
||||
// We assume the passed string view represents a valid decimal number,
|
||||
// so we don't need the error code.
|
||||
(void) std::from_chars(rf.begin(), rf.end(), result);
|
||||
return result;
|
||||
};
|
||||
|
||||
// We want to ensure that each DC's RF is going to change by at most 1
|
||||
// because in that case the old and new quorums must overlap.
|
||||
return std::abs(to_number(curr_rf) - to_number(new_rf)) <= 1;
|
||||
}
|
||||
|
||||
void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, const service::client_state& state) const {
|
||||
auto tmp = _name;
|
||||
std::transform(tmp.begin(), tmp.end(), tmp.begin(), ::tolower);
|
||||
@@ -61,6 +81,17 @@ void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, c
|
||||
}
|
||||
|
||||
auto new_ks = _attrs->as_ks_metadata_update(ks.metadata(), *qp.proxy().get_token_metadata_ptr(), qp.proxy().features());
|
||||
|
||||
if (ks.get_replication_strategy().uses_tablets()) {
|
||||
const std::map<sstring, sstring>& current_rfs = ks.metadata()->strategy_options();
|
||||
for (const auto& [new_dc, new_rf] : _attrs->get_replication_options()) {
|
||||
auto it = current_rfs.find(new_dc);
|
||||
if (it != current_rfs.end() && !validate_rf_difference(it->second, new_rf)) {
|
||||
throw exceptions::invalid_request_exception("Cannot modify replication factor of any DC by more than 1 at a time.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
locator::replication_strategy_params params(new_ks->strategy_options(), new_ks->initial_tablets());
|
||||
auto new_rs = locator::abstract_replication_strategy::create_replication_strategy(new_ks->strategy_name(), params);
|
||||
if (new_rs->is_per_table() != ks.get_replication_strategy().is_per_table()) {
|
||||
@@ -83,20 +114,63 @@ void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, c
|
||||
|
||||
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>
|
||||
cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_processor& qp, const query_options&, api::timestamp_type ts) const {
|
||||
using namespace cql_transport;
|
||||
try {
|
||||
auto old_ksm = qp.db().find_keyspace(_name).metadata();
|
||||
event::schema_change::target_type target_type = event::schema_change::target_type::KEYSPACE;
|
||||
auto ks = qp.db().find_keyspace(_name);
|
||||
auto ks_md = ks.metadata();
|
||||
const auto& tm = *qp.proxy().get_token_metadata_ptr();
|
||||
const auto& feat = qp.proxy().features();
|
||||
auto ks_md_update = _attrs->as_ks_metadata_update(ks_md, tm, feat);
|
||||
std::vector<mutation> muts;
|
||||
std::vector<sstring> warnings;
|
||||
auto ks_options = _attrs->get_all_options_flattened(feat);
|
||||
|
||||
auto m = service::prepare_keyspace_update_announcement(qp.db().real_database(), _attrs->as_ks_metadata_update(old_ksm, tm, feat), ts);
|
||||
// we only want to run the tablets path if there are actually any tablets changes, not only schema changes
|
||||
if (ks.get_replication_strategy().uses_tablets() && !_attrs->get_replication_options().empty()) {
|
||||
if (!qp.topology_global_queue_empty()) {
|
||||
return make_exception_future<std::tuple<::shared_ptr<::cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>(
|
||||
exceptions::invalid_request_exception("Another global topology request is ongoing, please retry."));
|
||||
}
|
||||
if (_attrs->get_replication_options().contains(ks_prop_defs::REPLICATION_FACTOR_KEY)) {
|
||||
return make_exception_future<std::tuple<::shared_ptr<::cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>(
|
||||
exceptions::invalid_request_exception("'replication_factor' tag is not allowed when executing ALTER KEYSPACE with tablets, please list the DCs explicitly"));
|
||||
}
|
||||
qp.db().real_database().validate_keyspace_update(*ks_md_update);
|
||||
|
||||
service::topology_mutation_builder builder(ts);
|
||||
builder.set_global_topology_request(service::global_topology_request::keyspace_rf_change);
|
||||
builder.set_global_topology_request_id(this->global_req_id);
|
||||
builder.set_new_keyspace_rf_change_data(_name, ks_options);
|
||||
service::topology_change change{{builder.build()}};
|
||||
|
||||
auto topo_schema = qp.db().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
|
||||
boost::transform(change.mutations, std::back_inserter(muts), [topo_schema] (const canonical_mutation& cm) {
|
||||
return cm.to_mutation(topo_schema);
|
||||
});
|
||||
|
||||
service::topology_request_tracking_mutation_builder rtbuilder{utils::UUID{this->global_req_id}};
|
||||
rtbuilder.set("done", false)
|
||||
.set("start_time", db_clock::now());
|
||||
service::topology_change req_change{{rtbuilder.build()}};
|
||||
|
||||
auto topo_req_schema = qp.db().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY_REQUESTS);
|
||||
boost::transform(req_change.mutations, std::back_inserter(muts), [topo_req_schema] (const canonical_mutation& cm) {
|
||||
return cm.to_mutation(topo_req_schema);
|
||||
});
|
||||
|
||||
target_type = event::schema_change::target_type::TABLET_KEYSPACE;
|
||||
} else {
|
||||
auto schema_mutations = service::prepare_keyspace_update_announcement(qp.db().real_database(), ks_md_update, ts);
|
||||
muts.insert(muts.begin(), schema_mutations.begin(), schema_mutations.end());
|
||||
}
|
||||
|
||||
using namespace cql_transport;
|
||||
auto ret = ::make_shared<event::schema_change>(
|
||||
event::schema_change::change_type::UPDATED,
|
||||
event::schema_change::target_type::KEYSPACE,
|
||||
target_type,
|
||||
keyspace());
|
||||
|
||||
return make_ready_future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>(std::make_tuple(std::move(ret), std::move(m), std::vector<sstring>()));
|
||||
return make_ready_future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>(std::make_tuple(std::move(ret), std::move(muts), warnings));
|
||||
} catch (data_dictionary::no_such_keyspace& e) {
|
||||
return make_exception_future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>(exceptions::invalid_request_exception("Unknown keyspace " + _name));
|
||||
}
|
||||
@@ -107,7 +181,6 @@ cql3::statements::alter_keyspace_statement::prepare(data_dictionary::database db
|
||||
return std::make_unique<prepared_statement>(make_shared<alter_keyspace_statement>(*this));
|
||||
}
|
||||
|
||||
static logging::logger mylogger("alter_keyspace");
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
cql3::statements::alter_keyspace_statement::execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
|
||||
@@ -24,7 +24,6 @@ static std::map<sstring, sstring> prepare_options(
|
||||
const sstring& strategy_class,
|
||||
const locator::token_metadata& tm,
|
||||
std::map<sstring, sstring> options,
|
||||
std::optional<unsigned>& initial_tablets,
|
||||
const std::map<sstring, sstring>& old_options = {}) {
|
||||
options.erase(ks_prop_defs::REPLICATION_STRATEGY_CLASS_KEY);
|
||||
|
||||
@@ -72,6 +71,35 @@ static std::map<sstring, sstring> prepare_options(
|
||||
return options;
|
||||
}
|
||||
|
||||
ks_prop_defs::ks_prop_defs(std::map<sstring, sstring> options) {
|
||||
std::map<sstring, sstring> replication_opts, storage_opts, tablets_opts, durable_writes_opts;
|
||||
|
||||
auto read_property_into = [] (auto& map, const sstring& name, const sstring& value, const sstring& tag) {
|
||||
map[name.substr(sstring(tag).size() + 1)] = value;
|
||||
};
|
||||
|
||||
for (const auto& [name, value] : options) {
|
||||
if (name.starts_with(KW_DURABLE_WRITES)) {
|
||||
read_property_into(durable_writes_opts, name, value, KW_DURABLE_WRITES);
|
||||
} else if (name.starts_with(KW_REPLICATION)) {
|
||||
read_property_into(replication_opts, name, value, KW_REPLICATION);
|
||||
} else if (name.starts_with(KW_TABLETS)) {
|
||||
read_property_into(tablets_opts, name, value, KW_TABLETS);
|
||||
} else if (name.starts_with(KW_STORAGE)) {
|
||||
read_property_into(storage_opts, name, value, KW_STORAGE);
|
||||
}
|
||||
}
|
||||
|
||||
if (!replication_opts.empty())
|
||||
add_property(KW_REPLICATION, replication_opts);
|
||||
if (!storage_opts.empty())
|
||||
add_property(KW_STORAGE, storage_opts);
|
||||
if (!tablets_opts.empty())
|
||||
add_property(KW_TABLETS, tablets_opts);
|
||||
if (!durable_writes_opts.empty())
|
||||
add_property(KW_DURABLE_WRITES, durable_writes_opts.begin()->second);
|
||||
}
|
||||
|
||||
void ks_prop_defs::validate() {
|
||||
// Skip validation if the strategy class is already set as it means we've already
|
||||
// prepared (and redoing it would set strategyClass back to null, which we don't want)
|
||||
@@ -134,7 +162,7 @@ std::optional<unsigned> ks_prop_defs::get_initial_tablets(const sstring& strateg
|
||||
assert(!ret.has_value());
|
||||
return ret;
|
||||
} else {
|
||||
throw exceptions::configuration_exception(sstring("Tablets enabled value must be true or false; found ") + it->second);
|
||||
throw exceptions::configuration_exception(sstring("Tablets enabled value must be true or false; found: ") + enabled);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -159,10 +187,30 @@ std::optional<sstring> ks_prop_defs::get_replication_strategy_class() const {
|
||||
return _strategy_class;
|
||||
}
|
||||
|
||||
bool ks_prop_defs::get_durable_writes() const {
|
||||
return get_boolean(KW_DURABLE_WRITES, true);
|
||||
}
|
||||
|
||||
std::map<sstring, sstring> ks_prop_defs::get_all_options_flattened(const gms::feature_service& feat) const {
|
||||
std::map<sstring, sstring> all_options;
|
||||
|
||||
auto ingest_flattened_options = [&all_options](const std::map<sstring, sstring>& options, const sstring& prefix) {
|
||||
for (auto& option: options) {
|
||||
all_options[prefix + ":" + option.first] = option.second;
|
||||
}
|
||||
};
|
||||
ingest_flattened_options(get_replication_options(), KW_REPLICATION);
|
||||
ingest_flattened_options(get_storage_options().to_map(), KW_STORAGE);
|
||||
ingest_flattened_options(get_map(KW_TABLETS).value_or(std::map<sstring, sstring>{}), KW_TABLETS);
|
||||
ingest_flattened_options({{sstring(KW_DURABLE_WRITES), to_sstring(get_boolean(KW_DURABLE_WRITES, true))}}, KW_DURABLE_WRITES);
|
||||
|
||||
return all_options;
|
||||
}
|
||||
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata(sstring ks_name, const locator::token_metadata& tm, const gms::feature_service& feat) {
|
||||
auto sc = get_replication_strategy_class().value();
|
||||
std::optional<unsigned> initial_tablets = get_initial_tablets(sc, feat.tablets);
|
||||
auto options = prepare_options(sc, tm, get_replication_options(), initial_tablets);
|
||||
auto options = prepare_options(sc, tm, get_replication_options());
|
||||
return data_dictionary::keyspace_metadata::new_keyspace(ks_name, sc,
|
||||
std::move(options), initial_tablets, get_boolean(KW_DURABLE_WRITES, true), get_storage_options());
|
||||
}
|
||||
@@ -171,13 +219,14 @@ lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata_u
|
||||
std::map<sstring, sstring> options;
|
||||
const auto& old_options = old->strategy_options();
|
||||
auto sc = get_replication_strategy_class();
|
||||
std::optional<unsigned> initial_tablets;
|
||||
if (sc) {
|
||||
initial_tablets = get_initial_tablets(*sc, old->initial_tablets().has_value());
|
||||
options = prepare_options(*sc, tm, get_replication_options(), initial_tablets, old_options);
|
||||
options = prepare_options(*sc, tm, get_replication_options(), old_options);
|
||||
} else {
|
||||
sc = old->strategy_name();
|
||||
options = old_options;
|
||||
}
|
||||
std::optional<unsigned> initial_tablets = get_initial_tablets(*sc, old->initial_tablets().has_value());
|
||||
if (!initial_tablets) {
|
||||
initial_tablets = old->initial_tablets();
|
||||
}
|
||||
|
||||
|
||||
@@ -49,11 +49,16 @@ public:
|
||||
private:
|
||||
std::optional<sstring> _strategy_class;
|
||||
public:
|
||||
ks_prop_defs() = default;
|
||||
explicit ks_prop_defs(std::map<sstring, sstring> options);
|
||||
|
||||
void validate();
|
||||
std::map<sstring, sstring> get_replication_options() const;
|
||||
std::optional<sstring> get_replication_strategy_class() const;
|
||||
std::optional<unsigned> get_initial_tablets(const sstring& strategy_class, bool enabled_by_default) const;
|
||||
data_dictionary::storage_options get_storage_options() const;
|
||||
bool get_durable_writes() const;
|
||||
std::map<sstring, sstring> get_all_options_flattened(const gms::feature_service& feat) const;
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> as_ks_metadata(sstring ks_name, const locator::token_metadata&, const gms::feature_service&);
|
||||
lw_shared_ptr<data_dictionary::keyspace_metadata> as_ks_metadata_update(lw_shared_ptr<data_dictionary::keyspace_metadata> old, const locator::token_metadata&, const gms::feature_service&);
|
||||
};
|
||||
|
||||
@@ -63,6 +63,7 @@ protected:
|
||||
|
||||
public:
|
||||
virtual future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, const query_options& options, api::timestamp_type) const = 0;
|
||||
mutable utils::UUID global_req_id;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -390,6 +390,12 @@ struct fmt::formatter<data_dictionary::user_types_metadata> {
|
||||
};
|
||||
|
||||
auto fmt::formatter<data_dictionary::keyspace_metadata>::format(const data_dictionary::keyspace_metadata& m, fmt::format_context& ctx) const -> decltype(ctx.out()) {
|
||||
return fmt::format_to(ctx.out(), "KSMetaData{{name={}, strategyClass={}, strategyOptions={}, cfMetaData={}, durable_writes={}, userTypes={}}}",
|
||||
m.name(), m.strategy_name(), m.strategy_options(), m.cf_meta_data(), m.durable_writes(), m.user_types());
|
||||
fmt::format_to(ctx.out(), "KSMetaData{{name={}, strategyClass={}, strategyOptions={}, cfMetaData={}, durable_writes={}, tablets=",
|
||||
m.name(), m.strategy_name(), m.strategy_options(), m.cf_meta_data(), m.durable_writes());
|
||||
if (m.initial_tablets()) {
|
||||
fmt::format_to(ctx.out(), "{{\"initial\":{}}}", m.initial_tablets().value());
|
||||
} else {
|
||||
fmt::format_to(ctx.out(), "{{\"enabled\":false}}");
|
||||
}
|
||||
return fmt::format_to(ctx.out(), ", userTypes={}}}", m.user_types());
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ add_library(db STATIC)
|
||||
target_sources(db
|
||||
PRIVATE
|
||||
consistency_level.cc
|
||||
system_auth_keyspace.cc
|
||||
system_keyspace.cc
|
||||
virtual_table.cc
|
||||
virtual_tables.cc
|
||||
|
||||
@@ -991,7 +991,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, unspooled_dirty_soft_limit(this, "unspooled_dirty_soft_limit", value_status::Used, 0.6, "Soft limit of unspooled dirty memory expressed as a portion of the hard limit.")
|
||||
, sstable_summary_ratio(this, "sstable_summary_ratio", value_status::Used, 0.0005, "Enforces that 1 byte of summary is written for every N (2000 by default)"
|
||||
"bytes written to data file. Value must be between 0 and 1.")
|
||||
, components_memory_reclaim_threshold(this, "components_memory_reclaim_threshold", liveness::LiveUpdate, value_status::Used, .1, "Ratio of available memory for all in-memory components of SSTables in a shard beyond which the memory will be reclaimed from components until it falls back under the threshold. Currently, this limit is only enforced for bloom filters.")
|
||||
, components_memory_reclaim_threshold(this, "components_memory_reclaim_threshold", liveness::LiveUpdate, value_status::Used, .2, "Ratio of available memory for all in-memory components of SSTables in a shard beyond which the memory will be reclaimed from components until it falls back under the threshold. Currently, this limit is only enforced for bloom filters.")
|
||||
, large_memory_allocation_warning_threshold(this, "large_memory_allocation_warning_threshold", value_status::Used, size_t(1) << 20, "Warn about memory allocations above this size; set to zero to disable.")
|
||||
, enable_deprecated_partitioners(this, "enable_deprecated_partitioners", value_status::Used, false, "Enable the byteordered and random partitioners. These partitioners are deprecated and will be removed in a future version.")
|
||||
, enable_keyspace_column_family_metrics(this, "enable_keyspace_column_family_metrics", value_status::Used, false, "Enable per keyspace and per column family metrics reporting.")
|
||||
@@ -1157,6 +1157,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, service_levels_interval(this, "service_levels_interval_ms", liveness::LiveUpdate, value_status::Used, 10000, "Controls how often service levels module polls configuration table")
|
||||
, error_injections_at_startup(this, "error_injections_at_startup", error_injection_value_status, {}, "List of error injections that should be enabled on startup.")
|
||||
, topology_barrier_stall_detector_threshold_seconds(this, "topology_barrier_stall_detector_threshold_seconds", value_status::Used, 2, "Report sites blocking topology barrier if it takes longer than this.")
|
||||
, enable_tablets(this, "enable_tablets", value_status::Used, false, "Enable tablets for newly created keyspaces")
|
||||
, default_log_level(this, "default_log_level", value_status::Used)
|
||||
, logger_log_level(this, "logger_log_level", value_status::Used)
|
||||
, log_to_stdout(this, "log_to_stdout", value_status::Used)
|
||||
@@ -1347,7 +1348,6 @@ std::map<sstring, db::experimental_features_t::feature> db::experimental_feature
|
||||
{"consistent-topology-changes", feature::UNUSED},
|
||||
{"broadcast-tables", feature::BROADCAST_TABLES},
|
||||
{"keyspace-storage-options", feature::KEYSPACE_STORAGE_OPTIONS},
|
||||
{"tablets", feature::TABLETS},
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -111,7 +111,6 @@ struct experimental_features_t {
|
||||
ALTERNATOR_STREAMS,
|
||||
BROADCAST_TABLES,
|
||||
KEYSPACE_STORAGE_OPTIONS,
|
||||
TABLETS,
|
||||
};
|
||||
static std::map<sstring, feature> map(); // See enum_option.
|
||||
static std::vector<enum_option<experimental_features_t>> all();
|
||||
@@ -495,6 +494,7 @@ public:
|
||||
|
||||
named_value<std::vector<error_injection_at_startup>> error_injections_at_startup;
|
||||
named_value<double> topology_barrier_stall_detector_threshold_seconds;
|
||||
named_value<bool> enable_tablets;
|
||||
|
||||
static const sstring default_tls_priority;
|
||||
private:
|
||||
|
||||
@@ -409,6 +409,12 @@ bool manager::have_ep_manager(const std::variant<locator::host_id, gms::inet_add
|
||||
bool manager::store_hint(endpoint_id host_id, gms::inet_address ip, schema_ptr s, lw_shared_ptr<const frozen_mutation> fm,
|
||||
tracing::trace_state_ptr tr_state) noexcept
|
||||
{
|
||||
if (utils::get_local_injector().enter("reject_incoming_hints")) {
|
||||
manager_logger.debug("Rejecting a hint to {} / {} due to an error injection", host_id, ip);
|
||||
++_stats.dropped;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (stopping() || draining_all() || !started() || !can_hint_for(host_id)) {
|
||||
manager_logger.trace("Can't store a hint to {}", host_id);
|
||||
++_stats.dropped;
|
||||
@@ -618,12 +624,12 @@ bool manager::check_dc_for(endpoint_id ep) const noexcept {
|
||||
}
|
||||
}
|
||||
|
||||
future<> manager::drain_for(endpoint_id endpoint) noexcept {
|
||||
future<> manager::drain_for(endpoint_id host_id, gms::inet_address ip) noexcept {
|
||||
if (!started() || stopping() || draining_all()) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
manager_logger.trace("on_leave_cluster: {} is removed/decommissioned", endpoint);
|
||||
manager_logger.trace("on_leave_cluster: {} is removed/decommissioned", host_id);
|
||||
|
||||
const auto holder = seastar::gate::holder{_draining_eps_gate};
|
||||
// As long as we hold on to this lock, no migration of hinted handoff to host IDs
|
||||
@@ -642,7 +648,7 @@ future<> manager::drain_for(endpoint_id endpoint) noexcept {
|
||||
|
||||
std::exception_ptr eptr = nullptr;
|
||||
|
||||
if (_proxy.local_db().get_token_metadata().get_topology().is_me(endpoint)) {
|
||||
if (_proxy.local_db().get_token_metadata().get_topology().is_me(host_id)) {
|
||||
set_draining_all();
|
||||
|
||||
try {
|
||||
@@ -657,28 +663,45 @@ future<> manager::drain_for(endpoint_id endpoint) noexcept {
|
||||
_ep_managers.clear();
|
||||
_hint_directory_manager.clear();
|
||||
} else {
|
||||
auto it = _ep_managers.find(endpoint);
|
||||
|
||||
if (it != _ep_managers.end()) {
|
||||
try {
|
||||
co_await drain_ep_manager(it->second);
|
||||
} catch (...) {
|
||||
eptr = std::current_exception();
|
||||
const auto maybe_host_id = std::invoke([&] () -> std::optional<locator::host_id> {
|
||||
if (_uses_host_id) {
|
||||
return host_id;
|
||||
}
|
||||
// Before the whole cluster is migrated to the host-ID-based hinted handoff,
|
||||
// one hint directory may correspond to multiple target nodes. If *any* of them
|
||||
// leaves the cluster, we should drain the hint directory. This is why we need
|
||||
// to rely on this mapping here.
|
||||
const auto maybe_mapping = _hint_directory_manager.get_mapping(host_id, ip);
|
||||
if (maybe_mapping) {
|
||||
return maybe_mapping->first;
|
||||
}
|
||||
return std::nullopt;
|
||||
});
|
||||
|
||||
// We can't provide the function with `it` here because we co_await above,
|
||||
// so iterators could have been invalidated.
|
||||
// This never throws.
|
||||
_ep_managers.erase(endpoint);
|
||||
_hint_directory_manager.remove_mapping(endpoint);
|
||||
if (maybe_host_id) {
|
||||
auto it = _ep_managers.find(*maybe_host_id);
|
||||
|
||||
if (it != _ep_managers.end()) {
|
||||
try {
|
||||
co_await drain_ep_manager(it->second);
|
||||
} catch (...) {
|
||||
eptr = std::current_exception();
|
||||
}
|
||||
|
||||
// We can't provide the function with `it` here because we co_await above,
|
||||
// so iterators could have been invalidated.
|
||||
// This never throws.
|
||||
_ep_managers.erase(*maybe_host_id);
|
||||
_hint_directory_manager.remove_mapping(*maybe_host_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (eptr) {
|
||||
manager_logger.error("Exception when draining {}: {}", endpoint, eptr);
|
||||
manager_logger.error("Exception when draining {}: {}", host_id, eptr);
|
||||
}
|
||||
|
||||
manager_logger.trace("drain_for: finished draining {}", endpoint);
|
||||
manager_logger.trace("drain_for: finished draining {}", host_id);
|
||||
}
|
||||
|
||||
void manager::update_backlog(size_t backlog, size_t max_backlog) {
|
||||
|
||||
@@ -317,8 +317,9 @@ public:
|
||||
/// In both cases - removes the corresponding hints' directories after all hints have been drained and erases the
|
||||
/// corresponding hint_endpoint_manager objects.
|
||||
///
|
||||
/// \param endpoint node that left the cluster
|
||||
future<> drain_for(endpoint_id endpoint) noexcept;
|
||||
/// \param host_id host ID of the node that left the cluster
|
||||
/// \param ip the IP of the node that left the cluster
|
||||
future<> drain_for(endpoint_id host_id, gms::inet_address ip) noexcept;
|
||||
|
||||
void update_backlog(size_t backlog, size_t max_backlog);
|
||||
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
#include "gms/feature_service.hh"
|
||||
#include "partition_slice_builder.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "system_auth_keyspace.hh"
|
||||
#include "system_keyspace.hh"
|
||||
#include "query-result-set.hh"
|
||||
#include "query-result-writer.hh"
|
||||
@@ -235,7 +234,6 @@ future<> save_system_schema(cql3::query_processor& qp) {
|
||||
co_await save_system_schema_to_keyspace(qp, schema_tables::NAME);
|
||||
// #2514 - make sure "system" is written to system_schema.keyspaces.
|
||||
co_await save_system_schema_to_keyspace(qp, system_keyspace::NAME);
|
||||
co_await save_system_schema_to_keyspace(qp, system_auth_keyspace::NAME);
|
||||
}
|
||||
|
||||
namespace v3 {
|
||||
@@ -1296,7 +1294,6 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, shar
|
||||
schema_ptr s = keyspaces();
|
||||
// compare before/after schemas of the affected keyspaces only
|
||||
std::set<sstring> keyspaces;
|
||||
std::set<table_id> column_families;
|
||||
std::unordered_map<keyspace_name, table_selector> affected_tables;
|
||||
bool has_tablet_mutations = false;
|
||||
for (auto&& mutation : mutations) {
|
||||
@@ -1311,7 +1308,6 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, shar
|
||||
}
|
||||
|
||||
keyspaces.emplace(std::move(keyspace_name));
|
||||
column_families.emplace(mutation.column_family_id());
|
||||
// We must force recalculation of schema version after the merge, since the resulting
|
||||
// schema may be a mix of the old and new schemas, with the exception of entries
|
||||
// that originate from group 0.
|
||||
|
||||
@@ -1,141 +0,0 @@
|
||||
/*
|
||||
* Modified by ScyllaDB
|
||||
* Copyright (C) 2024-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
|
||||
*/
|
||||
|
||||
#include "system_auth_keyspace.hh"
|
||||
#include "system_keyspace.hh"
|
||||
#include "db/schema_tables.hh"
|
||||
#include "schema/schema_builder.hh"
|
||||
#include "types/set.hh"
|
||||
|
||||
namespace db {
|
||||
|
||||
// all system auth tables use schema commitlog
|
||||
namespace {
|
||||
const auto set_use_schema_commitlog = schema_builder::register_static_configurator([](const sstring& ks_name, const sstring& cf_name, schema_static_props& props) {
|
||||
if (ks_name == system_auth_keyspace::NAME) {
|
||||
props.enable_schema_commitlog();
|
||||
}
|
||||
});
|
||||
} // anonymous namespace
|
||||
|
||||
namespace system_auth_keyspace {
|
||||
|
||||
// use the same gc setting as system_schema tables
|
||||
using days = std::chrono::duration<int, std::ratio<24 * 3600>>;
|
||||
// FIXME: in some cases time-based gc may cause data resurrection,
|
||||
// for more info see https://github.com/scylladb/scylladb/issues/15607
|
||||
static constexpr auto auth_gc_grace = std::chrono::duration_cast<std::chrono::seconds>(days(7)).count();
|
||||
|
||||
schema_ptr roles() {
|
||||
static thread_local auto schema = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, ROLES), NAME, ROLES,
|
||||
// partition key
|
||||
{{"role", utf8_type}},
|
||||
// clustering key
|
||||
{},
|
||||
// regular columns
|
||||
{
|
||||
{"can_login", boolean_type},
|
||||
{"is_superuser", boolean_type},
|
||||
{"member_of", set_type_impl::get_instance(utf8_type, true)},
|
||||
{"salted_hash", utf8_type}
|
||||
},
|
||||
// static columns
|
||||
{},
|
||||
// regular column name type
|
||||
utf8_type,
|
||||
// comment
|
||||
"roles for authentication and RBAC"
|
||||
);
|
||||
builder.set_gc_grace_seconds(auth_gc_grace);
|
||||
builder.with_version(system_keyspace::generate_schema_version(builder.uuid()));
|
||||
return builder.build();
|
||||
}();
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr role_members() {
|
||||
static thread_local auto schema = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, ROLE_MEMBERS), NAME, ROLE_MEMBERS,
|
||||
// partition key
|
||||
{{"role", utf8_type}},
|
||||
// clustering key
|
||||
{{"member", utf8_type}},
|
||||
// regular columns
|
||||
{},
|
||||
// static columns
|
||||
{},
|
||||
// regular column name type
|
||||
utf8_type,
|
||||
// comment
|
||||
"joins users and their granted roles in RBAC"
|
||||
);
|
||||
builder.set_gc_grace_seconds(auth_gc_grace);
|
||||
builder.with_version(system_keyspace::generate_schema_version(builder.uuid()));
|
||||
return builder.build();
|
||||
}();
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr role_attributes() {
|
||||
static thread_local auto schema = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, ROLE_ATTRIBUTES), NAME, ROLE_ATTRIBUTES,
|
||||
// partition key
|
||||
{{"role", utf8_type}},
|
||||
// clustering key
|
||||
{{"name", utf8_type}},
|
||||
// regular columns
|
||||
{
|
||||
{"value", utf8_type}
|
||||
},
|
||||
// static columns
|
||||
{},
|
||||
// regular column name type
|
||||
utf8_type,
|
||||
// comment
|
||||
"role permissions in RBAC"
|
||||
);
|
||||
builder.set_gc_grace_seconds(auth_gc_grace);
|
||||
builder.with_version(system_keyspace::generate_schema_version(builder.uuid()));
|
||||
return builder.build();
|
||||
}();
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr role_permissions() {
|
||||
static thread_local auto schema = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, ROLE_PERMISSIONS), NAME, ROLE_PERMISSIONS,
|
||||
// partition key
|
||||
{{"role", utf8_type}},
|
||||
// clustering key
|
||||
{{"resource", utf8_type}},
|
||||
// regular columns
|
||||
{
|
||||
{"permissions", set_type_impl::get_instance(utf8_type, true)}
|
||||
},
|
||||
// static columns
|
||||
{},
|
||||
// regular column name type
|
||||
utf8_type,
|
||||
// comment
|
||||
"role permissions for CassandraAuthorizer"
|
||||
);
|
||||
builder.set_gc_grace_seconds(auth_gc_grace);
|
||||
builder.with_version(system_keyspace::generate_schema_version(builder.uuid()));
|
||||
return builder.build();
|
||||
}();
|
||||
return schema;
|
||||
}
|
||||
|
||||
std::vector<schema_ptr> all_tables() {
|
||||
return {roles(), role_members(), role_attributes(), role_permissions()};
|
||||
}
|
||||
|
||||
} // namespace system_auth_keyspace
|
||||
} // namespace db
|
||||
@@ -1,38 +0,0 @@
|
||||
/*
|
||||
* Modified by ScyllaDB
|
||||
* Copyright (C) 2024-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "schema/schema_fwd.hh"
|
||||
#include <vector>
|
||||
|
||||
namespace db {
|
||||
|
||||
namespace system_auth_keyspace {
|
||||
enum class version_t: int64_t {
|
||||
v1 = 1,
|
||||
v2 = 2,
|
||||
};
|
||||
static constexpr auto NAME = "system_auth_v2";
|
||||
// tables
|
||||
static constexpr auto ROLES = "roles";
|
||||
static constexpr auto ROLE_MEMBERS = "role_members";
|
||||
static constexpr auto ROLE_ATTRIBUTES = "role_attributes";
|
||||
static constexpr auto ROLE_PERMISSIONS = "role_permissions";
|
||||
|
||||
|
||||
schema_ptr roles();
|
||||
schema_ptr role_members();
|
||||
schema_ptr role_attributes();
|
||||
schema_ptr role_permissions();
|
||||
|
||||
std::vector<schema_ptr> all_tables();
|
||||
}; // namespace system_auth_keyspace
|
||||
|
||||
} // namespace db
|
||||
@@ -18,7 +18,6 @@
|
||||
#include <seastar/core/on_internal_error.hh>
|
||||
#include "system_keyspace.hh"
|
||||
#include "cql3/untyped_result_set.hh"
|
||||
#include "db/system_auth_keyspace.hh"
|
||||
#include "thrift/server.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "partition_slice_builder.hh"
|
||||
@@ -88,6 +87,10 @@ namespace {
|
||||
system_keyspace::SCYLLA_LOCAL,
|
||||
system_keyspace::COMMITLOG_CLEANUPS,
|
||||
system_keyspace::SERVICE_LEVELS_V2,
|
||||
system_keyspace::ROLES,
|
||||
system_keyspace::ROLE_MEMBERS,
|
||||
system_keyspace::ROLE_ATTRIBUTES,
|
||||
system_keyspace::ROLE_PERMISSIONS,
|
||||
system_keyspace::v3::CDC_LOCAL
|
||||
};
|
||||
if (ks_name == system_keyspace::NAME && tables.contains(cf_name)) {
|
||||
@@ -233,12 +236,15 @@ schema_ptr system_keyspace::topology() {
|
||||
.with_column("request_id", timeuuid_type)
|
||||
.with_column("ignore_nodes", set_type_impl::get_instance(uuid_type, true), column_kind::static_column)
|
||||
.with_column("new_cdc_generation_data_uuid", timeuuid_type, column_kind::static_column)
|
||||
.with_column("new_keyspace_rf_change_ks_name", utf8_type, column_kind::static_column)
|
||||
.with_column("new_keyspace_rf_change_data", map_type_impl::get_instance(utf8_type, utf8_type, false), column_kind::static_column)
|
||||
.with_column("version", long_type, column_kind::static_column)
|
||||
.with_column("fence_version", long_type, column_kind::static_column)
|
||||
.with_column("transition_state", utf8_type, column_kind::static_column)
|
||||
.with_column("committed_cdc_generations", set_type_impl::get_instance(cdc_generation_ts_id_type, true), column_kind::static_column)
|
||||
.with_column("unpublished_cdc_generations", set_type_impl::get_instance(cdc_generation_ts_id_type, true), column_kind::static_column)
|
||||
.with_column("global_topology_request", utf8_type, column_kind::static_column)
|
||||
.with_column("global_topology_request_id", timeuuid_type, column_kind::static_column)
|
||||
.with_column("enabled_features", set_type_impl::get_instance(utf8_type, true), column_kind::static_column)
|
||||
.with_column("session", uuid_type, column_kind::static_column)
|
||||
.with_column("tablet_balancing_enabled", boolean_type, column_kind::static_column)
|
||||
@@ -1139,6 +1145,103 @@ schema_ptr system_keyspace::service_levels_v2() {
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::roles() {
|
||||
static thread_local auto schema = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, ROLES), NAME, ROLES,
|
||||
// partition key
|
||||
{{"role", utf8_type}},
|
||||
// clustering key
|
||||
{},
|
||||
// regular columns
|
||||
{
|
||||
{"can_login", boolean_type},
|
||||
{"is_superuser", boolean_type},
|
||||
{"member_of", set_type_impl::get_instance(utf8_type, true)},
|
||||
{"salted_hash", utf8_type}
|
||||
},
|
||||
// static columns
|
||||
{},
|
||||
// regular column name type
|
||||
utf8_type,
|
||||
// comment
|
||||
"roles for authentication and RBAC"
|
||||
);
|
||||
builder.with_version(system_keyspace::generate_schema_version(builder.uuid()));
|
||||
return builder.build();
|
||||
}();
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::role_members() {
|
||||
static thread_local auto schema = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, ROLE_MEMBERS), NAME, ROLE_MEMBERS,
|
||||
// partition key
|
||||
{{"role", utf8_type}},
|
||||
// clustering key
|
||||
{{"member", utf8_type}},
|
||||
// regular columns
|
||||
{},
|
||||
// static columns
|
||||
{},
|
||||
// regular column name type
|
||||
utf8_type,
|
||||
// comment
|
||||
"joins users and their granted roles in RBAC"
|
||||
);
|
||||
builder.with_version(system_keyspace::generate_schema_version(builder.uuid()));
|
||||
return builder.build();
|
||||
}();
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::role_attributes() {
|
||||
static thread_local auto schema = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, ROLE_ATTRIBUTES), NAME, ROLE_ATTRIBUTES,
|
||||
// partition key
|
||||
{{"role", utf8_type}},
|
||||
// clustering key
|
||||
{{"name", utf8_type}},
|
||||
// regular columns
|
||||
{
|
||||
{"value", utf8_type}
|
||||
},
|
||||
// static columns
|
||||
{},
|
||||
// regular column name type
|
||||
utf8_type,
|
||||
// comment
|
||||
"role permissions in RBAC"
|
||||
);
|
||||
builder.with_version(system_keyspace::generate_schema_version(builder.uuid()));
|
||||
return builder.build();
|
||||
}();
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::role_permissions() {
|
||||
static thread_local auto schema = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, ROLE_PERMISSIONS), NAME, ROLE_PERMISSIONS,
|
||||
// partition key
|
||||
{{"role", utf8_type}},
|
||||
// clustering key
|
||||
{{"resource", utf8_type}},
|
||||
// regular columns
|
||||
{
|
||||
{"permissions", set_type_impl::get_instance(utf8_type, true)}
|
||||
},
|
||||
// static columns
|
||||
{},
|
||||
// regular column name type
|
||||
utf8_type,
|
||||
// comment
|
||||
"role permissions for CassandraAuthorizer"
|
||||
);
|
||||
builder.with_version(system_keyspace::generate_schema_version(builder.uuid()));
|
||||
return builder.build();
|
||||
}();
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::legacy::hints() {
|
||||
static thread_local auto schema = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, HINTS), NAME, HINTS,
|
||||
@@ -2130,10 +2233,16 @@ future<> system_keyspace::set_bootstrap_state(bootstrap_state state) {
|
||||
});
|
||||
}
|
||||
|
||||
std::vector<schema_ptr> system_keyspace::auth_tables() {
|
||||
return {roles(), role_members(), role_attributes(), role_permissions()};
|
||||
}
|
||||
|
||||
std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
|
||||
std::vector<schema_ptr> r;
|
||||
auto schema_tables = db::schema_tables::all_tables(schema_features::full());
|
||||
std::copy(schema_tables.begin(), schema_tables.end(), std::back_inserter(r));
|
||||
auto auth_tables = system_keyspace::auth_tables();
|
||||
std::copy(auth_tables.begin(), auth_tables.end(), std::back_inserter(r));
|
||||
r.insert(r.end(), { built_indexes(), hints(), batchlog(), paxos(), local(),
|
||||
peers(), peer_events(), range_xfers(),
|
||||
compactions_in_progress(), compaction_history(),
|
||||
@@ -2149,14 +2258,11 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
|
||||
topology(), cdc_generations_v3(), topology_requests(), service_levels_v2(),
|
||||
});
|
||||
|
||||
auto auth_tables = db::system_auth_keyspace::all_tables();
|
||||
std::copy(auth_tables.begin(), auth_tables.end(), std::back_inserter(r));
|
||||
|
||||
if (cfg.check_experimental(db::experimental_features_t::feature::BROADCAST_TABLES)) {
|
||||
r.insert(r.end(), {broadcast_kv_store()});
|
||||
}
|
||||
|
||||
if (cfg.check_experimental(db::experimental_features_t::feature::TABLETS)) {
|
||||
if (cfg.enable_tablets()) {
|
||||
r.insert(r.end(), {tablets()});
|
||||
}
|
||||
|
||||
@@ -2691,17 +2797,17 @@ future<std::optional<mutation>> system_keyspace::get_group0_schema_version() {
|
||||
|
||||
static constexpr auto AUTH_VERSION_KEY = "auth_version";
|
||||
|
||||
future<system_auth_keyspace::version_t> system_keyspace::get_auth_version() {
|
||||
future<system_keyspace::auth_version_t> system_keyspace::get_auth_version() {
|
||||
auto str_opt = co_await get_scylla_local_param(AUTH_VERSION_KEY);
|
||||
if (!str_opt) {
|
||||
co_return db::system_auth_keyspace::version_t::v1;
|
||||
co_return auth_version_t::v1;
|
||||
}
|
||||
auto& str = *str_opt;
|
||||
if (str == "" || str == "1") {
|
||||
co_return db::system_auth_keyspace::version_t::v1;
|
||||
co_return auth_version_t::v1;
|
||||
}
|
||||
if (str == "2") {
|
||||
co_return db::system_auth_keyspace::version_t::v2;
|
||||
co_return auth_version_t::v2;
|
||||
}
|
||||
on_internal_error(slogger, fmt::format("unexpected auth_version in scylla_local got {}", str));
|
||||
}
|
||||
@@ -2719,7 +2825,7 @@ static service::query_state& internal_system_query_state() {
|
||||
return qs;
|
||||
};
|
||||
|
||||
future<mutation> system_keyspace::make_auth_version_mutation(api::timestamp_type ts, db::system_auth_keyspace::version_t version) {
|
||||
future<mutation> system_keyspace::make_auth_version_mutation(api::timestamp_type ts, db::system_keyspace::auth_version_t version) {
|
||||
static sstring query = format("INSERT INTO {}.{} (key, value) VALUES (?, ?);", db::system_keyspace::NAME, db::system_keyspace::SCYLLA_LOCAL);
|
||||
auto muts = co_await _qp.get_mutations_internal(query, internal_system_query_state(), ts, {AUTH_VERSION_KEY, std::to_string(int64_t(version))});
|
||||
if (muts.size() != 1) {
|
||||
@@ -2967,6 +3073,11 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
|
||||
ret.committed_cdc_generations = decode_cdc_generations_ids(deserialize_set_column(*topology(), some_row, "committed_cdc_generations"));
|
||||
}
|
||||
|
||||
if (some_row.has("new_keyspace_rf_change_data")) {
|
||||
ret.new_keyspace_rf_change_ks_name = some_row.get_as<sstring>("new_keyspace_rf_change_ks_name");
|
||||
ret.new_keyspace_rf_change_data = some_row.get_map<sstring,sstring>("new_keyspace_rf_change_data");
|
||||
}
|
||||
|
||||
if (!ret.committed_cdc_generations.empty()) {
|
||||
// Sanity check for CDC generation data consistency.
|
||||
auto gen_id = ret.committed_cdc_generations.back();
|
||||
@@ -2998,6 +3109,10 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
|
||||
ret.global_request.emplace(req);
|
||||
}
|
||||
|
||||
if (some_row.has("global_topology_request_id")) {
|
||||
ret.global_request_id = some_row.get_as<utils::UUID>("global_topology_request_id");
|
||||
}
|
||||
|
||||
if (some_row.has("enabled_features")) {
|
||||
ret.enabled_features = decode_features(deserialize_set_column(*topology(), some_row, "enabled_features"));
|
||||
}
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
#include "db/system_auth_keyspace.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "schema/schema_fwd.hh"
|
||||
#include "utils/UUID.hh"
|
||||
@@ -180,6 +179,12 @@ public:
|
||||
static constexpr auto TABLETS = "tablets";
|
||||
static constexpr auto SERVICE_LEVELS_V2 = "service_levels_v2";
|
||||
|
||||
// auth
|
||||
static constexpr auto ROLES = "roles";
|
||||
static constexpr auto ROLE_MEMBERS = "role_members";
|
||||
static constexpr auto ROLE_ATTRIBUTES = "role_attributes";
|
||||
static constexpr auto ROLE_PERMISSIONS = "role_permissions";
|
||||
|
||||
struct v3 {
|
||||
static constexpr auto BATCHES = "batches";
|
||||
static constexpr auto PAXOS = "paxos";
|
||||
@@ -267,6 +272,12 @@ public:
|
||||
static schema_ptr tablets();
|
||||
static schema_ptr service_levels_v2();
|
||||
|
||||
// auth
|
||||
static schema_ptr roles();
|
||||
static schema_ptr role_members();
|
||||
static schema_ptr role_attributes();
|
||||
static schema_ptr role_permissions();
|
||||
|
||||
static table_schema_version generate_schema_version(table_id table_id, uint16_t offset = 0);
|
||||
|
||||
future<> build_bootstrap_info();
|
||||
@@ -310,7 +321,9 @@ public:
|
||||
template <typename T>
|
||||
future<std::optional<T>> get_scylla_local_param_as(const sstring& key);
|
||||
|
||||
static std::vector<schema_ptr> auth_tables();
|
||||
static std::vector<schema_ptr> all_tables(const db::config& cfg);
|
||||
|
||||
future<> make(
|
||||
locator::effective_replication_map_factory&,
|
||||
replica::database&);
|
||||
@@ -577,11 +590,16 @@ public:
|
||||
// returns the corresponding mutation. Otherwise returns nullopt.
|
||||
future<std::optional<mutation>> get_group0_schema_version();
|
||||
|
||||
enum class auth_version_t: int64_t {
|
||||
v1 = 1,
|
||||
v2 = 2,
|
||||
};
|
||||
|
||||
// If the `auth_version` key in `system.scylla_local` is present (either live or tombstone),
|
||||
// returns the corresponding mutation. Otherwise returns nullopt.
|
||||
future<std::optional<mutation>> get_auth_version_mutation();
|
||||
future<mutation> make_auth_version_mutation(api::timestamp_type ts, db::system_auth_keyspace::version_t version);
|
||||
future<system_auth_keyspace::version_t> get_auth_version();
|
||||
future<mutation> make_auth_version_mutation(api::timestamp_type ts, auth_version_t version);
|
||||
future<auth_version_t> get_auth_version();
|
||||
|
||||
future<> sstables_registry_create_entry(sstring location, sstring status, sstables::sstable_state state, sstables::entry_descriptor desc);
|
||||
future<> sstables_registry_update_entry_status(sstring location, sstables::generation_type gen, sstring status);
|
||||
|
||||
@@ -1625,25 +1625,26 @@ get_view_natural_endpoint(
|
||||
}
|
||||
}
|
||||
|
||||
auto& view_topology = view_erm->get_token_metadata_ptr()->get_topology();
|
||||
for (auto&& view_endpoint : view_erm->get_replicas(view_token)) {
|
||||
if (use_legacy_self_pairing) {
|
||||
auto it = std::find(base_endpoints.begin(), base_endpoints.end(),
|
||||
view_endpoint);
|
||||
// If this base replica is also one of the view replicas, we use
|
||||
// ourselves as the view replica.
|
||||
if (view_endpoint == me) {
|
||||
if (view_endpoint == me && it != base_endpoints.end()) {
|
||||
return topology.my_address();
|
||||
}
|
||||
// We have to remove any endpoint which is shared between the base
|
||||
// and the view, as it will select itself and throw off the counts
|
||||
// otherwise.
|
||||
auto it = std::find(base_endpoints.begin(), base_endpoints.end(),
|
||||
view_endpoint);
|
||||
if (it != base_endpoints.end()) {
|
||||
base_endpoints.erase(it);
|
||||
} else if (!network_topology || topology.get_datacenter(view_endpoint) == my_datacenter) {
|
||||
} else if (!network_topology || view_topology.get_datacenter(view_endpoint) == my_datacenter) {
|
||||
view_endpoints.push_back(view_endpoint);
|
||||
}
|
||||
} else {
|
||||
if (!network_topology || topology.get_datacenter(view_endpoint) == my_datacenter) {
|
||||
if (!network_topology || view_topology.get_datacenter(view_endpoint) == my_datacenter) {
|
||||
view_endpoints.push_back(view_endpoint);
|
||||
}
|
||||
}
|
||||
@@ -1658,7 +1659,7 @@ get_view_natural_endpoint(
|
||||
return {};
|
||||
}
|
||||
auto replica = view_endpoints[base_it - base_endpoints.begin()];
|
||||
return topology.get_node(replica).endpoint();
|
||||
return view_topology.get_node(replica).endpoint();
|
||||
}
|
||||
|
||||
static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, locator::effective_replication_map_ptr ermp,
|
||||
@@ -1715,6 +1716,7 @@ future<> view_update_generator::mutate_MV(
|
||||
{
|
||||
auto base_ermp = base->table().get_effective_replication_map();
|
||||
static constexpr size_t max_concurrent_updates = 128;
|
||||
co_await utils::get_local_injector().inject("delay_before_get_view_natural_endpoint", 8000ms);
|
||||
co_await max_concurrent_for_each(view_updates, max_concurrent_updates,
|
||||
[this, base_token, &stats, &cf_stats, tr_state, &pending_view_updates, allow_hints, wait_for_all, base_ermp] (frozen_mutation_and_schema mut) mutable -> future<> {
|
||||
auto view_token = dht::get_token(*mut.s, mut.fm.key());
|
||||
|
||||
@@ -85,7 +85,7 @@ redirects: setup
|
||||
# Preview commands
|
||||
.PHONY: preview
|
||||
preview: setup
|
||||
$(POETRY) run sphinx-autobuild -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml --host $(PREVIEW_HOST) --port 5500 --ignore *.csv --ignore *.yaml
|
||||
$(POETRY) run sphinx-autobuild -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml --host $(PREVIEW_HOST) --port 5500 --ignore *.csv --ignore *.json --ignore *.yaml
|
||||
|
||||
.PHONY: multiversionpreview
|
||||
multiversionpreview: multiversion
|
||||
|
||||
@@ -1,23 +1,19 @@
|
||||
import os
|
||||
import re
|
||||
import yaml
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import jinja2
|
||||
|
||||
from sphinx import addnodes
|
||||
from sphinx.application import Sphinx
|
||||
from sphinx.directives import ObjectDescription
|
||||
from sphinx.util import logging, ws_re
|
||||
from sphinx.util.display import status_iterator
|
||||
from sphinx.util.docfields import Field
|
||||
from sphinx.util.docutils import switch_source_input, SphinxDirective
|
||||
from sphinx.util.nodes import make_id, nested_parse_with_titles
|
||||
from sphinx.jinja2glue import BuiltinTemplateLoader
|
||||
from docutils import nodes
|
||||
from docutils.parsers.rst import directives
|
||||
from docutils.statemachine import StringList
|
||||
|
||||
from utils import maybe_add_filters
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class DBConfigParser:
|
||||
@@ -152,51 +148,6 @@ class DBConfigParser:
|
||||
return DBConfigParser.all_properties[name]
|
||||
|
||||
|
||||
def readable_desc(description: str) -> str:
|
||||
"""
|
||||
This function is deprecated and maintained only for backward compatibility
|
||||
with previous versions. Use ``readable_desc_rst``instead.
|
||||
"""
|
||||
return (
|
||||
description.replace("\\n", "")
|
||||
.replace('<', '<')
|
||||
.replace('>', '>')
|
||||
.replace("\n", "<br>")
|
||||
.replace("\\t", "- ")
|
||||
.replace('"', "")
|
||||
)
|
||||
|
||||
|
||||
def readable_desc_rst(description):
|
||||
indent = ' ' * 3
|
||||
lines = description.split('\n')
|
||||
cleaned_lines = []
|
||||
|
||||
for line in lines:
|
||||
|
||||
cleaned_line = line.replace('\\n', '\n')
|
||||
|
||||
if line.endswith('"'):
|
||||
cleaned_line = cleaned_line[:-1] + ' '
|
||||
|
||||
cleaned_line = cleaned_line.lstrip()
|
||||
cleaned_line = cleaned_line.replace('"', '')
|
||||
|
||||
if cleaned_line != '':
|
||||
cleaned_line = indent + cleaned_line
|
||||
cleaned_lines.append(cleaned_line)
|
||||
|
||||
return ''.join(cleaned_lines)
|
||||
|
||||
|
||||
def maybe_add_filters(builder):
|
||||
env = builder.templates.environment
|
||||
if 'readable_desc' not in env.filters:
|
||||
env.filters['readable_desc'] = readable_desc
|
||||
|
||||
if 'readable_desc_rst' not in env.filters:
|
||||
env.filters['readable_desc_rst'] = readable_desc_rst
|
||||
|
||||
|
||||
class ConfigOption(ObjectDescription):
|
||||
has_content = True
|
||||
|
||||
188
docs/_ext/scylladb_metrics.py
Normal file
188
docs/_ext/scylladb_metrics.py
Normal file
@@ -0,0 +1,188 @@
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
from sphinx import addnodes
|
||||
from sphinx.directives import ObjectDescription
|
||||
from sphinx.util.docfields import Field
|
||||
from sphinx.util.docutils import switch_source_input
|
||||
from sphinx.util.nodes import make_id
|
||||
from sphinx.util import logging, ws_re
|
||||
from docutils.parsers.rst import Directive, directives
|
||||
from docutils.statemachine import StringList
|
||||
from sphinxcontrib.datatemplates.directive import DataTemplateJSON
|
||||
from utils import maybe_add_filters
|
||||
|
||||
sys.path.insert(0, os.path.abspath("../../scripts"))
|
||||
import scripts.get_description as metrics
|
||||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MetricsProcessor:
    """Scans the repository's C++ sources and extracts metrics descriptions
    into per-file JSON documents under the configured metrics directory.

    Intended to be connected to the Sphinx ``builder-inited`` event via
    :meth:`run`.
    """

    # Only .cc files containing this marker carry extractable metrics docs.
    MARKER = "::description"

    def _create_output_directory(self, app, metrics_directory):
        # Resolve the output directory relative to the docs source tree and
        # make sure it exists before any JSON file is written into it.
        output_directory = os.path.join(app.builder.srcdir, metrics_directory)
        os.makedirs(output_directory, exist_ok=True)
        return output_directory

    def _process_single_file(self, file_path, destination_path, metrics_config_path):
        # Extract metrics from one .cc file into a JSON document.
        # Check the destination first: the original read the whole source
        # file before discovering the output already existed, wasting I/O
        # on every incremental rebuild.
        if os.path.exists(destination_path):
            return
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
        if self.MARKER not in content:
            return
        try:
            metrics_file = metrics.get_metrics_from_file(file_path, "scylla", metrics.get_metrics_information(metrics_config_path))
            with open(destination_path, 'w+', encoding='utf-8') as f:
                json.dump(metrics_file, f, indent=4)
        except SystemExit:
            # The extraction script exits for files it cannot handle;
            # treat that as a skip rather than aborting the docs build.
            LOGGER.info(f'Skipping file: {file_path}')
        except Exception as error:
            # Best-effort: a single bad file must not break the build.
            LOGGER.info(error)

    def _process_metrics_files(self, repo_dir, output_directory, metrics_config_path):
        # Walk the whole repository and process every C++ translation unit.
        for root, _, files in os.walk(repo_dir):
            for file in files:
                if file.endswith(".cc"):
                    file_path = os.path.join(root, file)
                    file_name = os.path.splitext(file)[0] + ".json"
                    destination_path = os.path.join(output_directory, file_name)
                    self._process_single_file(file_path, destination_path, metrics_config_path)

    def run(self, app, exception=None):
        """Sphinx event handler: generate the metrics JSON data files.

        ``exception`` is accepted (and ignored) so the same callable can be
        connected to events that pass one.
        """
        repo_dir = os.path.abspath(os.path.join(app.srcdir, ".."))
        metrics_config_path = os.path.join(repo_dir, app.config.scylladb_metrics_config_path)
        output_directory = self._create_output_directory(app, app.config.scylladb_metrics_directory)

        self._process_metrics_files(repo_dir, output_directory, metrics_config_path)
|
||||
|
||||
|
||||
class MetricsTemplateDirective(DataTemplateJSON):
    """A ``datatemplate``-style JSON directive that additionally passes a
    ``title`` option through to the template rendering context.
    """

    # Extend the inherited option spec with a free-form "title" string.
    option_spec = DataTemplateJSON.option_spec.copy()
    option_spec["title"] = lambda x: x

    def _make_context(self, data, config, env):
        # Build the base context, then expose the title to the template.
        context = super()._make_context(data, config, env)
        context["title"] = self.options.get("title")
        return context

    # NOTE: the previous `run` override only delegated to super().run();
    # it was dead code and has been removed — the inherited behavior is
    # identical.
|
||||
|
||||
|
||||
class MetricsOption(ObjectDescription):
    """Sphinx object directive describing a single metric.

    Renders the metric's name as the signature line, its type/component/
    key/source fields via a Jinja template, and registers the metric as a
    cross-referenceable object in the ``std`` domain.
    """

    has_content = True
    required_arguments = 1          # the metric name
    optional_arguments = 0
    final_argument_whitespace = False
    option_spec = {
        'type': directives.unchanged,
        'component': directives.unchanged,
        'key': directives.unchanged,
        'source': directives.unchanged,
    }

    # Field descriptors used when the options are parsed as doc fields.
    doc_field_types = [
        Field('type', label='Type', has_arg=False, names=('type',)),
        Field('component', label='Component', has_arg=False, names=('component',)),
        Field('key', label='Key', has_arg=False, names=('key',)),
        Field('source', label='Source', has_arg=False, names=('source',)),
    ]

    def handle_signature(self, sig: str, signode: addnodes.desc_signature):
        # Show the raw metric name as the signature; the normalized
        # (whitespace-collapsed) form becomes the object name.
        signode.clear()
        signode += addnodes.desc_name(sig, sig)
        return ws_re.sub(' ', sig)

    @property
    def env(self):
        # Shortcut to the Sphinx build environment.
        return self.state.document.settings.env

    def _render(self, name, option_type, component, key, source):
        # Render the metric's field block through the configured template.
        item = {'name': name, 'type': option_type, 'component': component, 'key': key, 'source': source }
        template = self.config.scylladb_metrics_option_template
        return self.env.app.builder.templates.render(template, item)

    def transform_content(self, contentnode: addnodes.desc_content) -> None:
        # Render the option metadata to RST and parse it into the content
        # node, so the template output appears beneath the signature.
        name = self.arguments[0]
        option_type = self.options.get('type', '')
        component = self.options.get('component', '')
        key = self.options.get('key', '')
        source_file = self.options.get('source', '')
        _, lineno = self.get_source_info()
        # Synthetic source name so parse errors point back at this metric.
        source = f'scylladb_metrics:{lineno}:<{name}>'
        fields = StringList(self._render(name, option_type, component, key, source_file).splitlines(), source=source, parent_offset=lineno)
        with switch_source_input(self.state, fields):
            self.state.nested_parse(fields, 0, contentnode)

    def add_target_and_index(self, name: str, sig: str, signode: addnodes.desc_signature) -> None:
        # Create a link target on the signature, add an index entry, and
        # register the metric in the std domain for cross-referencing.
        node_id = make_id(self.env, self.state.document, self.objtype, name)
        signode['ids'].append(node_id)
        self.state.document.note_explicit_target(signode)
        entry = f'{name}; metrics option'
        self.indexnode['entries'].append(('pair', entry, node_id, '', None))
        self.env.get_domain('std').note_object(self.objtype, name, node_id, location=signode)
|
||||
|
||||
class MetricsDirective(Directive):
    """``scylladb_metrics`` directive: renders every JSON data file in the
    configured metrics directory through a template, one section per file.
    """

    # Default template used when the directive has no :template: option.
    TEMPLATE = 'metrics.tmpl'
    required_arguments = 0
    optional_arguments = 1
    option_spec = {'template': directives.path}
    has_content = True

    def _process_file(self, file, relative_path_from_current_rst):
        # Delegate the rendering of one JSON file to the data-template
        # directive, forwarding our own directive state verbatim.
        data_directive = MetricsTemplateDirective(
            name=self.name,
            arguments=[os.path.join(relative_path_from_current_rst, file)],
            options=self.options,
            content=self.content,
            lineno=self.lineno,
            content_offset=self.content_offset,
            block_text=self.block_text,
            state=self.state,
            state_machine=self.state_machine,
        )
        data_directive.options["template"] = self.options.get('template', self.TEMPLATE)
        # Derive a human-readable section title from the file name,
        # e.g. "cache_metrics.json" -> "Cache metrics".
        data_directive.options["title"] = file.replace('_', ' ').replace('.json','').capitalize()
        return data_directive.run()

    def _get_relative_path(self, output_directory, app, docname):
        # datatemplate arguments are resolved relative to the current .rst
        # document, so compute the path from that document's directory.
        current_rst_path = os.path.join(app.builder.srcdir, docname + ".rst")
        return os.path.relpath(output_directory, os.path.dirname(current_rst_path))

    def run(self):
        maybe_add_filters(self.state.document.settings.env.app.builder)
        app = self.state.document.settings.env.app
        docname = self.state.document.settings.env.docname
        metrics_directory = os.path.join(app.builder.srcdir, app.config.scylladb_metrics_directory)
        output = []
        try:
            relative_path_from_current_rst = self._get_relative_path(metrics_directory, app, docname)
            # Sort for a deterministic section order; os.listdir() order is
            # filesystem-dependent and made the rendered page unstable.
            for file in sorted(os.listdir(metrics_directory)):
                output.extend(self._process_file(file, relative_path_from_current_rst))
        except Exception as error:
            # Best-effort: a missing or empty metrics directory simply
            # renders nothing instead of failing the build.
            LOGGER.info(error)
        return output
|
||||
|
||||
def setup(app):
    # Sphinx extension entry point: register config values, the metrics
    # data generator, and the directives this extension provides.
    app.add_config_value("scylladb_metrics_directory", default="_data/metrics", rebuild="html")
    app.add_config_value("scylladb_metrics_config_path", default='scripts/metrics-config.yml', rebuild="html")
    app.add_config_value('scylladb_metrics_option_template', default='metrics_option.tmpl', rebuild='html', types=[str])
    # Generate the per-file metrics JSON data as soon as the builder exists.
    app.connect("builder-inited", MetricsProcessor().run)
    # Register "metrics_option" as a std-domain object type (directive +
    # cross-reference role) ...
    app.add_object_type(
        'metrics_option',
        'metrics_option',
        objname='metrics option')
    # ... then replace the generic directive that add_object_type installed
    # with our specialized MetricsOption implementation; override=True is
    # required because the name is already taken. The order of these calls
    # matters.
    app.add_directive_to_domain('std', 'metrics_option', MetricsOption, override=True)
    app.add_directive("metrics_option", MetricsOption)
    app.add_directive("scylladb_metrics", MetricsDirective)


    # Safe for parallel builds: the JSON generation happens once at
    # builder-inited time, before documents are read.
    return {
        "version": "0.1",
        "parallel_read_safe": True,
        "parallel_write_safe": True,
    }
|
||||
|
||||
44
docs/_ext/utils.py
Normal file
44
docs/_ext/utils.py
Normal file
@@ -0,0 +1,44 @@
|
||||
def readable_desc(description: str) -> str:
    """
    Convert a raw configuration-option description into HTML-ish text:
    unescape ``&lt;``/``&gt;`` entities, turn real newlines into ``<br>``,
    turn literal ``\\t`` sequences into list dashes, and strip literal
    ``\\n`` sequences and double quotes.

    This function is deprecated and maintained only for backward compatibility
    with previous versions. Use ``readable_desc_rst`` instead.
    """
    # The entity arguments below were rendered as bare '<'/'>' in some
    # copies (making those replace() calls no-ops); the intent is to
    # unescape HTML entities embedded in the descriptions.
    return (
        description.replace("\\n", "")
        .replace('&lt;', '<')
        .replace('&gt;', '>')
        .replace("\n", "<br>")
        .replace("\\t", "- ")
        .replace('"', "")
    )
|
||||
|
||||
|
||||
def readable_desc_rst(description):
    """
    Normalize a raw option description for embedding in an RST body.

    Each physical line is processed independently: literal ``\\n`` escape
    sequences become real newlines, a trailing double quote is swapped for
    a space, remaining double quotes are dropped, leading whitespace is
    stripped, and non-empty lines are indented by three spaces. The
    processed pieces are concatenated without separators.
    """
    INDENT = ' ' * 3
    pieces = []
    for raw in description.split('\n'):
        text = raw.replace('\\n', '\n')
        # A trailing quote marks the end of the quoted description;
        # replace it with a space so words don't run together.
        if raw.endswith('"'):
            text = text[:-1] + ' '
        text = text.lstrip().replace('"', '')
        pieces.append(INDENT + text if text else text)
    return ''.join(pieces)
|
||||
|
||||
|
||||
def maybe_add_filters(builder):
    """Register the description filters on the builder's Jinja environment.

    Idempotent: an already-registered filter is left untouched, so this can
    be called from every directive invocation.
    """
    filters = builder.templates.environment.filters
    filters.setdefault('readable_desc', readable_desc)
    filters.setdefault('readable_desc_rst', readable_desc_rst)
|
||||
2
docs/_static/css/custom.css
vendored
2
docs/_static/css/custom.css
vendored
@@ -41,6 +41,6 @@ dl dt:hover > a.headerlink {
|
||||
visibility: visible;
|
||||
}
|
||||
|
||||
dl.confval {
|
||||
dl.confval, dl.metrics_option {
|
||||
border-bottom: 1px solid #cacaca;
|
||||
}
|
||||
|
||||
19
docs/_templates/metrics.tmpl
vendored
Normal file
19
docs/_templates/metrics.tmpl
vendored
Normal file
@@ -0,0 +1,19 @@
|
||||
.. -*- mode: rst -*-
|
||||
|
||||
{{title}}
|
||||
{{ '-' * title|length }}
|
||||
|
||||
{% if data %}
|
||||
{% for key, value in data.items() %}
|
||||
.. _metricsprop_{{ key }}:
|
||||
|
||||
.. metrics_option:: {{ key }}
|
||||
:type: {{value[0]}}
|
||||
:source: {{value[4]}}
|
||||
:component: {{value[2]}}
|
||||
:key: {{value[3]}}
|
||||
|
||||
{{value[1] | readable_desc_rst}}
|
||||
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
3
docs/_templates/metrics_option.tmpl
vendored
Normal file
3
docs/_templates/metrics_option.tmpl
vendored
Normal file
@@ -0,0 +1,3 @@
|
||||
{% if type %}* **Type:** ``{{ type }}``{% endif %}
|
||||
{% if component %}* **Component:** ``{{ component }}``{% endif %}
|
||||
{% if key %}* **Key:** ``{{ key }}``{% endif %}
|
||||
@@ -21,6 +21,9 @@
|
||||
# remove the Open Source vs. Enterprise Matrix from the Open Source docs
|
||||
|
||||
/stable/reference/versions-matrix-enterprise-oss.html: https://enterprise.docs.scylladb.com/stable/reference/versions-matrix-enterprise-oss.html
|
||||
# Remove the outdated Troubleshooting article
|
||||
|
||||
/stable/troubleshooting/error-messages/create-mv.html: /stable/troubleshooting/index.html
|
||||
|
||||
# Remove the Learn page (replaced with a link to a page in a different repo)
|
||||
|
||||
|
||||
@@ -117,9 +117,9 @@ request. Alternator can then validate the authenticity and authorization of
|
||||
each request using a known list of authorized key pairs.
|
||||
|
||||
In the current implementation, the user stores the list of allowed key pairs
|
||||
in the `system_auth_v2.roles` table: The access key ID is the `role` column, and
|
||||
in the `system.roles` table: The access key ID is the `role` column, and
|
||||
the secret key is the `salted_hash`, i.e., the secret key can be found by
|
||||
`SELECT salted_hash from system_auth_v2.roles WHERE role = ID;`.
|
||||
`SELECT salted_hash from system.roles WHERE role = ID;`.
|
||||
|
||||
<!--- REMOVE IN FUTURE VERSIONS - Remove the note below in version 6.1 -->
|
||||
|
||||
|
||||
BIN
docs/architecture/images/tablets-cluster.png
Normal file
BIN
docs/architecture/images/tablets-cluster.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 18 KiB |
BIN
docs/architecture/images/tablets-load-balancing.png
Normal file
BIN
docs/architecture/images/tablets-load-balancing.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 21 KiB |
@@ -4,6 +4,7 @@ ScyllaDB Architecture
|
||||
:titlesonly:
|
||||
:hidden:
|
||||
|
||||
Data Distribution with Tablets </architecture/tablets>
|
||||
ScyllaDB Ring Architecture <ringarchitecture/index/>
|
||||
ScyllaDB Fault Tolerance <architecture-fault-tolerance>
|
||||
Consistency Level Console Demo <console-CL-full-demo>
|
||||
@@ -13,6 +14,7 @@ ScyllaDB Architecture
|
||||
Raft Consensus Algorithm in ScyllaDB </architecture/raft>
|
||||
|
||||
|
||||
* :doc:`Data Distribution with Tablets </architecture/tablets/>` - Tablets in ScyllaDB
|
||||
* :doc:`ScyllaDB Ring Architecture </architecture/ringarchitecture/index/>` - High-Level view of ScyllaDB Ring Architecture
|
||||
* :doc:`ScyllaDB Fault Tolerance </architecture/architecture-fault-tolerance>` - Deep dive into ScyllaDB Fault Tolerance
|
||||
* :doc:`Consistency Level Console Demo </architecture/console-CL-full-demo>` - Console Demos of Consistency Level Settings
|
||||
|
||||
131
docs/architecture/tablets.rst
Normal file
131
docs/architecture/tablets.rst
Normal file
@@ -0,0 +1,131 @@
|
||||
=========================================
|
||||
Data Distribution with Tablets
|
||||
=========================================
|
||||
|
||||
A ScyllaDB cluster is a group of interconnected nodes. The data of the entire
|
||||
cluster has to be distributed as evenly as possible across those nodes.
|
||||
|
||||
ScyllaDB is designed to ensure a balanced distribution of data by storing data
|
||||
in tablets. When you add or remove nodes to scale your cluster, add or remove
|
||||
a datacenter, or replace a node, tablets are moved between the nodes to keep
|
||||
the same number on each node. In addition, tablets are balanced across shards
|
||||
in each node.
|
||||
|
||||
This article explains the concept of tablets and how they let you scale your
|
||||
cluster quickly and seamlessly.
|
||||
|
||||
Data Distribution
|
||||
-------------------
|
||||
|
||||
ScyllaDB distributes data by splitting tables into tablets. Each tablet has
|
||||
its replicas on different nodes, depending on the RF (replication factor). Each
|
||||
partition of a table is mapped to a single tablet in a deterministic way. When you
|
||||
query or update the data, ScyllaDB can quickly identify the tablet that stores
|
||||
the relevant partition.
|
||||
|
||||
The following example shows a 3-node cluster with a replication factor (RF) of
|
||||
3. The data is stored in a table (Table 1) with two rows. Both rows are mapped
|
||||
to one tablet (T1) with replicas on all three nodes.
|
||||
|
||||
.. image:: images/tablets-cluster.png
|
||||
|
||||
.. TODO - Add a section about tablet splitting when there are more triggers,
|
||||
like throughput. In 6.0, tablets only split when reaching a threshold size
|
||||
(the threshold is based on the average tablet data size).
|
||||
|
||||
Load Balancing
|
||||
==================
|
||||
|
||||
ScyllaDB autonomously moves tablets to balance the load. This process
|
||||
is managed by a load balancer mechanism and happens independently of
|
||||
the administrator. The tablet load balancer decides where to migrate
|
||||
the tablets, either within the same node to balance the shards or across
|
||||
the nodes to balance the global load in the cluster.
|
||||
|
||||
As a table grows, each tablet can split into two, creating a new tablet.
|
||||
The load balancer can migrate the split halves independently to different nodes
|
||||
or shards.
|
||||
|
||||
The load-balancing process takes place in the background and is performed
|
||||
without any service interruption.
|
||||
|
||||
Scaling Out
|
||||
=============
|
||||
|
||||
A tablet can be dynamically migrated to an existing node or a newly added
|
||||
empty node. Paired with consistent topology updates with Raft, tablets allow
|
||||
you to add multiple nodes simultaneously. After nodes are added to the cluster,
|
||||
existing nodes stream data to the new ones, and the system load eventually
|
||||
converges to an even distribution as the process completes.
|
||||
|
||||
With tablets enabled, manual cleanup is not required.
|
||||
Cleanup is performed automatically per tablet,
|
||||
making tablets-based streaming user-independent and safer.
|
||||
|
||||
In addition, tablet cleanup is lightweight and efficient, as it doesn't
|
||||
involve rewriting SStables on the existing nodes, which makes data ownership
|
||||
changes faster. This dramatically reduces
|
||||
the impact of cleanup on the performance of user queries.
|
||||
|
||||
The following diagrams show migrating tablets from heavily loaded nodes A and B
|
||||
to a new node.
|
||||
|
||||
.. image:: images/tablets-load-balancing.png
|
||||
|
||||
.. _tablets-enable-tablets:
|
||||
|
||||
Enabling Tablets
|
||||
-------------------
|
||||
|
||||
ScyllaDB now uses tablets by default for data distribution. This functionality is
|
||||
controlled by the :confval:`enable_tablets` option. However, tablets only work if
|
||||
enabled on all nodes within the cluster.
|
||||
|
||||
When creating a new keyspace with tablets enabled (the default), you can still disable
them on a per-keyspace basis. Note that ``NetworkTopologyStrategy``, which is recommended
for all keyspaces, is *required* when using tablets.
|
||||
|
||||
You can create a keyspace with tablets
|
||||
disabled with the ``tablets = {'enabled': false}`` option:
|
||||
|
||||
.. code:: cql
|
||||
|
||||
CREATE KEYSPACE my_keyspace
|
||||
WITH replication = {
|
||||
'class': 'NetworkTopologyStrategy',
|
||||
'replication_factor': 3
|
||||
} AND tablets = {
|
||||
'enabled': false
|
||||
};
|
||||
|
||||
|
||||
|
||||
.. warning::
|
||||
|
||||
You cannot ALTER a keyspace to enable or disable tablets.
|
||||
The only way to update the tablet support for a keyspace is to DROP it
|
||||
(losing the schema and data) and then recreate it after redefining
|
||||
the keyspace schema with ``tablets = { 'enabled': false }`` or
|
||||
``tablets = { 'enabled': true }``.
|
||||
|
||||
Limitations and Unsupported Features
|
||||
--------------------------------------
|
||||
|
||||
The following ScyllaDB features are not supported if a keyspace has tablets
|
||||
enabled:
|
||||
|
||||
* Counters
|
||||
* Change Data Capture (CDC)
|
||||
* Lightweight Transactions (LWT)
|
||||
* Alternator (as it uses LWT)
|
||||
|
||||
If you plan to use any of the above features, CREATE your keyspace
|
||||
:ref:`with tablets disabled <tablets-enable-tablets>`.
|
||||
|
||||
Resharding in keyspaces with tablets enabled has the following limitations:
|
||||
|
||||
* ScyllaDB does not support reducing the number of shards after node restart.
|
||||
* ScyllaDB does not reshard data on node restart. Tablet replicas remain
|
||||
allocated to the old shards on restart and are subject to background
|
||||
load-balancing to additional shards after restart completes and the node
|
||||
starts serving CQL.
|
||||
@@ -44,7 +44,8 @@ extensions = [
|
||||
"scylladb_gcp_images",
|
||||
"scylladb_include_flag",
|
||||
"scylladb_dynamic_substitutions",
|
||||
"scylladb_swagger"
|
||||
"scylladb_swagger",
|
||||
"scylladb_metrics"
|
||||
]
|
||||
|
||||
# The suffix(es) of source filenames.
|
||||
@@ -127,6 +128,10 @@ scylladb_swagger_origin_api = "../api"
|
||||
scylladb_swagger_template = "swagger.tmpl"
|
||||
scylladb_swagger_inc_template = "swagger_inc.tmpl"
|
||||
|
||||
# -- Options for scylladb_metrics
|
||||
scylladb_metrics_directory = "_data/opensource/metrics"
|
||||
|
||||
|
||||
# -- Options for HTML output
|
||||
|
||||
# The theme to use for pages.
|
||||
|
||||
@@ -107,12 +107,6 @@ For example:
|
||||
WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1' : 1, 'DC2' : 3}
|
||||
AND durable_writes = true;
|
||||
|
||||
.. TODO Add a link to the description of minimum_keyspace_rf when the ScyllaDB options section is added to the docs.
|
||||
|
||||
You can configure the minimum acceptable replication factor using the ``minimum_keyspace_rf`` option.
|
||||
Attempting to create a keyspace with a replication factor lower than the value set with
|
||||
``minimum_keyspace_rf`` will return an error (the default value is 0).
|
||||
|
||||
The supported ``options`` are:
|
||||
|
||||
=================== ========== =========== ========= ===================================================================
|
||||
@@ -122,7 +116,7 @@ name kind mandatory default description
|
||||
details below).
|
||||
``durable_writes`` *simple* no true Whether to use the commit log for updates on this keyspace
|
||||
(disable this option at your own risk!).
|
||||
``tablets`` *map* no Experimental - enables tablets for this keyspace (see :ref:`tablets<tablets>`)
|
||||
``tablets`` *map* no Enables or disables tablets for the keyspace (see :ref:`tablets<tablets>`)
|
||||
=================== ========== =========== ========= ===================================================================
|
||||
|
||||
The ``replication`` property is mandatory and must at least contains the ``'class'`` sub-option, which defines the
|
||||
@@ -142,7 +136,12 @@ query latency. For a production ready strategy, see *NetworkTopologyStrategy* .
|
||||
========================= ====== ======= =============================================
|
||||
sub-option type since description
|
||||
========================= ====== ======= =============================================
|
||||
``'replication_factor'`` int all The number of replicas to store per range
|
||||
``'replication_factor'`` int all The number of replicas to store per range.
|
||||
|
||||
The replication factor should be equal to
|
||||
or lower than the number of nodes.
|
||||
Configuring a higher RF may prevent
|
||||
creating tables in that keyspace.
|
||||
========================= ====== ======= =============================================
|
||||
|
||||
.. note:: Using NetworkTopologyStrategy is recommended. Using SimpleStrategy will make it harder to add Data Center in the future.
|
||||
@@ -166,6 +165,11 @@ sub-option type description
|
||||
definitions or explicit datacenter settings.
|
||||
For example, to have three replicas per
|
||||
datacenter, supply this with a value of 3.
|
||||
|
||||
The replication factor configured for a DC
|
||||
should be equal to or lower than the number
|
||||
of nodes in that DC. Configuring a higher RF
|
||||
may prevent creating tables in that keyspace.
|
||||
===================================== ====== =============================================
|
||||
|
||||
Note that when ``ALTER`` ing keyspaces and supplying ``replication_factor``,
|
||||
@@ -213,39 +217,30 @@ An example that excludes a datacenter while using ``replication_factor``::
|
||||
|
||||
.. _tablets:
|
||||
|
||||
The ``tablets`` property :label-caution:`Experimental`
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
The ``tablets`` property
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
The ``tablets`` property is used to make keyspace replication tablets-based.
|
||||
It is only valid when ``experimental_features: tablets`` is specified in ``scylla.yaml`` (which
|
||||
in turn requires ``consistent_cluster_management: true``); it must be a power of two.
|
||||
The ``tablets`` property enables or disables tablets-based distribution
|
||||
for a keyspace.
|
||||
|
||||
Options:
|
||||
|
||||
===================================== ====== =============================================
|
||||
sub-option type description
|
||||
===================================== ====== =============================================
|
||||
``'enabled'`` bool Whether or not to enable tablets for keyspace
|
||||
``'enabled'`` bool Whether or not to enable tablets for a keyspace
|
||||
``'initial'`` int The number of tablets to start with
|
||||
===================================== ====== =============================================
|
||||
|
||||
By default if tablets cluster feature is enabled, any keyspace will be created with tablets
|
||||
enabled. The ``tablets`` option is used to opt-out a keyspace from tablets replication.
|
||||
By default, a keyspace is created with tablets enabled. The ``tablets`` option
|
||||
is used to opt out a keyspace from tablets-based distribution; see :ref:`Enabling Tablets <tablets-enable-tablets>`
|
||||
for details.
|
||||
|
||||
A good rule of thumb to calculate initial tablets is to divide the expected total storage used
|
||||
by tables in this keyspace by (``replication_factor`` * 5GB). For example, if you expect a 30TB
|
||||
table and have a replication factor of 3, divide 30TB by (3*5GB) for a result of 2000. Since the
|
||||
value must be a power of two, round up to 2048.
|
||||
|
||||
.. note::
|
||||
The calculation applies to every table in the keyspace independently; so it can only realistically be
|
||||
used for a keyspace containing a single table. It is expected that per-table controls will be available
|
||||
in the future.
|
||||
|
||||
.. caution::
|
||||
The ``initial`` option may change its definition or be completely removed as it is part
|
||||
of an experimental feature.
|
||||
|
||||
The calculation applies to every table in the keyspace.
|
||||
|
||||
An example that creates a keyspace with 2048 tablets per table::
|
||||
|
||||
@@ -257,6 +252,9 @@ An example that creates a keyspace with 2048 tablets per table::
|
||||
'initial': 2048
|
||||
};
|
||||
|
||||
|
||||
See :doc:`Data Distribution with Tablets </architecture/tablets>` for more information about tablets.
|
||||
|
||||
.. _use-statement:
|
||||
|
||||
USE
|
||||
@@ -289,6 +287,17 @@ For instance::
|
||||
|
||||
The supported options are the same as :ref:`creating a keyspace <create-keyspace-statement>`.
|
||||
|
||||
ALTER KEYSPACE with Tablets :label-caution:`Experimental`
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Modifying a keyspace with tablets enabled is possible and doesn't require any special CQL syntax. However, there are some limitations:
|
||||
|
||||
- The replication factor (RF) can be increased or decreased by at most 1 at a time. To reach the desired RF value, modify the RF repeatedly.
|
||||
- The ``ALTER`` statement rejects the ``replication_factor`` tag. List the DCs explicitly when altering a keyspace. See :ref:`NetworkTopologyStrategy <replication-strategy>`.
|
||||
- If there's any other ongoing global topology operation, executing the ``ALTER`` statement will fail (with an explicit and specific error) and needs to be repeated.
|
||||
- The ``ALTER`` statement may take longer than the regular query timeout, and even if it times out, it will continue to execute in the background.
|
||||
- The replication strategy cannot be modified, as keyspaces with tablets only support ``NetworkTopologyStrategy``.
|
||||
|
||||
.. _drop-keyspace-statement:
|
||||
|
||||
DROP KEYSPACE
|
||||
|
||||
@@ -341,7 +341,7 @@ The `--authenticator` command lines option allows to provide the authenticator c
|
||||
|
||||
#### `--authorizer AUTHORIZER`
|
||||
|
||||
The `--authorizer` command line option allows you to provide the authorizer class ScyllaDB will use. By default ScyllaDB uses the `AllowAllAuthorizer` which allows any action to any user. The second option is using the `CassandraAuthorizer` parameter, which stores permissions in `system_auth_v2.permissions` table.
|
||||
The `--authorizer` command line option allows you to provide the authorizer class ScyllaDB will use. By default ScyllaDB uses the `AllowAllAuthorizer` which allows any action to any user. The second option is using the `CassandraAuthorizer` parameter, which stores permissions in `system.permissions` table.
|
||||
|
||||
**Since: 2.3**
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ There are two system tables that are used to facilitate the service level featur
|
||||
### Service Level Attachment Table
|
||||
|
||||
```
|
||||
CREATE TABLE system_auth_v2.role_attributes (
|
||||
CREATE TABLE system.role_attributes (
|
||||
role text,
|
||||
attribute_name text,
|
||||
attribute_value text,
|
||||
@@ -23,7 +23,7 @@ So for example in order to find out which `service_level` is attached to role `r
|
||||
one can run the following query:
|
||||
|
||||
```
|
||||
SELECT * FROM system_auth_v2.role_attributes WHERE role='r' and attribute_name='service_level'
|
||||
SELECT * FROM system.role_attributes WHERE role='r' and attribute_name='service_level'
|
||||
|
||||
```
|
||||
|
||||
@@ -157,4 +157,4 @@ The command displays a table with: option name, effective service level the valu
|
||||
----------------------+-------------------------+-------------
|
||||
workload_type | sl2 | batch
|
||||
timeout | sl1 | 2s
|
||||
```
|
||||
```
|
||||
|
||||
63
docs/dev/task_manager.md
Normal file
63
docs/dev/task_manager.md
Normal file
@@ -0,0 +1,63 @@
|
||||
Task manager is a tool for tracking long-running background
|
||||
operations.
|
||||
|
||||
# Structure overview
|
||||
|
||||
Task manager is divided into modules, e.g. repair or compaction
|
||||
module, which keep track of operations of similar nature. Operations
|
||||
are tracked with tasks.
|
||||
|
||||
Each task covers a logical part of the operation, e.g repair
|
||||
of a keyspace or a table. Each operation is covered by a tree
|
||||
of tasks, e.g. global repair task is a parent of tasks covering
|
||||
a single keyspace, which are parents of table tasks.
|
||||
|
||||
# Time to live of a task
|
||||
|
||||
Root tasks are kept in task manager for `task_ttl` time after they are
|
||||
finished. `task_ttl` value can be set in node configuration with
|
||||
`--task-ttl-in-seconds` option or changed with task manager API
|
||||
(`/task_manager/ttl`).
|
||||
|
||||
A task which isn't a root is unregistered immediately after it is
|
||||
finished and its status is folded into its parent. When a task
|
||||
is being folded into its parent, info about each of its children is
|
||||
lost unless the child or any child's descendant failed.
|
||||
|
||||
# Internal
|
||||
|
||||
Tasks can be marked as `internal`, which means they are not listed
|
||||
by default. A task should be marked as internal if it has a parent
|
||||
or if it's supposed to be unregistered immediately after it's finished.
|
||||
|
||||
# Abortable
|
||||
|
||||
A flag which determines if a task can be aborted through API.
|
||||
|
||||
# Type vs scope
|
||||
|
||||
`type` of a task describes what operation is covered by a task,
|
||||
e.g. "major compaction".
|
||||
|
||||
`scope` of a task describes for which part of the operation
|
||||
the task is responsible, e.g. "shard".
|
||||
|
||||
# API
|
||||
|
||||
Documentation for task manager API is available under `api/api-doc/task_manager.json`.
|
||||
Briefly:
|
||||
- `/task_manager/list_modules` -
|
||||
lists module supported by task manager;
|
||||
- `/task_manager/list_module_tasks/{module}` -
|
||||
lists (by default non-internal) tasks in the module;
|
||||
- `/task_manager/task_status/{task_id}` -
|
||||
gets the task's status, unregisters the task if it's finished;
|
||||
- `/task_manager/abort_task/{task_id}` -
|
||||
aborts the task if it's abortable;
|
||||
- `/task_manager/wait_task/{task_id}` -
|
||||
waits for the task and gets its status;
|
||||
- `/task_manager/task_status_recursive/{task_id}` -
|
||||
gets statuses of the task and all its descendants in BFS
|
||||
order, unregisters the task;
|
||||
- `/task_manager/ttl` -
|
||||
sets new ttl, returns old value.
|
||||
@@ -549,7 +549,10 @@ CREATE TABLE system.topology (
|
||||
committed_cdc_generations set<tuple<timestamp, timeuuid>> static,
|
||||
unpublished_cdc_generations set<tuple<timestamp, timeuuid>> static,
|
||||
global_topology_request text static,
|
||||
global_topology_request_id timeuuid static,
|
||||
new_cdc_generation_data_uuid timeuuid static,
|
||||
new_keyspace_rf_change_ks_name text static,
|
||||
new_keyspace_rf_change_data frozen<map<text, text>> static,
|
||||
PRIMARY KEY (key, host_id)
|
||||
)
|
||||
```
|
||||
@@ -575,8 +578,11 @@ There are also a few static columns for cluster-global properties:
|
||||
- `committed_cdc_generations` - the IDs of the committed CDC generations
|
||||
- `unpublished_cdc_generations` - the IDs of the committed yet unpublished CDC generations
|
||||
- `global_topology_request` - if set, contains one of the supported global topology requests
|
||||
- `global_topology_request_id` - if set, contains global topology request's id, which is a new group0's state id
|
||||
- `new_cdc_generation_data_uuid` - used in `commit_cdc_generation` state, the time UUID of the generation to be committed
|
||||
- `upgrade_state` - describes the progress of the upgrade to raft-based topology.
|
||||
- 'new_keyspace_rf_change_ks_name' - the name of the KS that is being the target of the scheduled ALTER KS statement
|
||||
- 'new_keyspace_rf_change_data' - the KS options to be used when executing the scheduled ALTER KS statement
|
||||
|
||||
# Join procedure
|
||||
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
You can `build ScyllaDB from source <https://github.com/scylladb/scylladb#build-prerequisites>`_ on other x86_64 or aarch64 platforms, without any guarantees.
|
||||
|
||||
+----------------------------+-------------+---------------+---------------+
|
||||
| Linux Distributions |Ubuntu | Debian | Rocky / |
|
||||
| | | | RHEL |
|
||||
+----------------------------+------+------+-------+-------+-------+-------+
|
||||
| ScyllaDB Version / Version |20.04 |22.04 | 10 | 11 | 8 | 9 |
|
||||
+============================+======+======+=======+=======+=======+=======+
|
||||
| 6.0 | |v| | |v| | |v| | |v| | |v| | |v| |
|
||||
+----------------------------+------+------+-------+-------+-------+-------+
|
||||
| 5.4 | |v| | |v| | |v| | |v| | |v| | |v| |
|
||||
+----------------------------+------+------+-------+-------+-------+-------+
|
||||
+----------------------------+--------------------+---------------+---------------+
|
||||
| Linux Distributions |Ubuntu | Debian | Rocky / |
|
||||
| | | | RHEL |
|
||||
+----------------------------+------+------+------+-------+-------+-------+-------+
|
||||
| ScyllaDB Version / Version |20.04 |22.04 |24.04 | 10 | 11 | 8 | 9 |
|
||||
+============================+======+======+======+=======+=======+=======+=======+
|
||||
| 6.0 | |v| | |v| | |v| | |v| | |v| | |v| | |v| |
|
||||
+----------------------------+------+------+------+-------+-------+-------+-------+
|
||||
| 5.4 | |v| | |v| | |x| | |v| | |v| | |v| | |v| |
|
||||
+----------------------------+------+------+------+-------+-------+-------+-------+
|
||||
|
||||
* The recommended OS for ScyllaDB Open Source is Ubuntu 22.04.
|
||||
* All releases are available as a Docker container and EC2 AMI, GCP, and Azure images.
|
||||
|
||||
@@ -3,16 +3,31 @@ nodetool decommission
|
||||
|
||||
**decommission** - Deactivate a selected node by streaming its data to the next node in the ring.
|
||||
|
||||
.. note::
|
||||
|
||||
   You cannot decommission a node if any existing node is down.
|
||||
|
||||
For example:
|
||||
|
||||
``nodetool decommission``
|
||||
|
||||
.. include:: /operating-scylla/_common/decommission_warning.rst
|
||||
|
||||
Use the ``nodetool netstats`` command to monitor the progress of the token reallocation.
|
||||
|
||||
.. note::
|
||||
|
||||
   You cannot decommission a node if any existing node is down.
|
||||
|
||||
See :doc:`Remove a Node from a ScyllaDB Cluster (Down Scale) </operating-scylla/procedures/cluster-management/remove-node>`
|
||||
for procedure details.
|
||||
|
||||
Before you run ``nodetool decommission``:
|
||||
|
||||
* Review current disk space utilization on existing nodes and make sure the amount
|
||||
of data streamed from the node being removed can fit into the disk space available
|
||||
on the remaining nodes. If there is not enough disk space on the remaining nodes,
|
||||
the removal of a node will fail. Add more storage to remaining nodes **before**
|
||||
starting the removal procedure.
|
||||
* Make sure that the number of nodes remaining in the DC after you decommission a node
|
||||
will be the same or higher than the Replication Factor configured for the keyspace
|
||||
in this DC. If the number of remaining nodes is lower than the RF, the decommission
|
||||
request may fail.
|
||||
In such a case, ALTER the keyspace to reduce the RF before running ``nodetool decommission``.
|
||||
|
||||
|
||||
.. include:: nodetool-index.rst
|
||||
|
||||
@@ -2,14 +2,28 @@ Nodetool describering
|
||||
=====================
|
||||
|
||||
**describering** - :code:`<keyspace>` - Shows the partition ranges of a given keyspace.
|
||||
|
||||
For example:
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
nodetool describering nba
|
||||
|
||||
Example output (for three node cluster on AWS):
|
||||
If :doc:`tablets </architecture/tablets>` are enabled for your keyspace, you
|
||||
need to additionally specify the table name. The command will display the ring
|
||||
of the table.
|
||||
|
||||
.. code:: shell
|
||||
|
||||
nodetool describering <keyspace> <table>
|
||||
|
||||
For example:
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
nodetool describering nba player_name
|
||||
|
||||
|
||||
Example output (for a three-node cluster on AWS with tablets disabled):
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
|
||||
@@ -21,9 +21,16 @@ is removed from the cluster or replaced.
|
||||
Prerequisites
|
||||
------------------------
|
||||
|
||||
Using ``removenode`` requires at least a quorum of nodes in a cluster to be available.
|
||||
If the quorum is lost, it must be restored before you change the cluster topology.
|
||||
See :doc:`Handling Node Failures </troubleshooting/handling-node-failures>` for details.
|
||||
* Using ``removenode`` requires at least a quorum of nodes in a cluster to be available.
|
||||
If the quorum is lost, it must be restored before you change the cluster topology.
|
||||
See :doc:`Handling Node Failures </troubleshooting/handling-node-failures>` for details.
|
||||
|
||||
* Make sure that the number of nodes remaining in the DC after you remove a node
|
||||
will be the same or higher than the Replication Factor configured for the keyspace
|
||||
in this DC. If the number of remaining nodes is lower than the RF, the removenode
|
||||
request may fail. In such a case, you should follow the procedure to
|
||||
:doc:`replace a dead node </operating-scylla/procedures/cluster-management/replace-dead-node>`
|
||||
instead of running ``nodetool removenode``.
|
||||
|
||||
Usage
|
||||
--------
|
||||
|
||||
@@ -29,11 +29,16 @@ With time, SSTables are compacted, but the hard link keeps a copy of each file.
|
||||
|
||||
| 1. Data can only be restored from a snapshot of the table schema, where data exists in a backup. Backup your schema with the following command:
|
||||
|
||||
| ``$: cqlsh -e "DESC SCHEMA" > <schema_name.cql>``
|
||||
| ``$: cqlsh -e "DESC SCHEMA WITH INTERNALS" > <schema_name.cql>``
|
||||
|
||||
For example:
|
||||
|
||||
| ``$: cqlsh -e "DESC SCHEMA" > db_schema.cql``
|
||||
| ``$: cqlsh -e "DESC SCHEMA WITH INTERNALS" > db_schema.cql``
|
||||
|
||||
.. warning::
|
||||
|
||||
To get a proper schema description, you need to use cqlsh at least in version ``6.0.19``. Restoring a schema backup created by
|
||||
an older version of cqlsh may lead to data resurrection or data loss. To check the version of your cqlsh, you can use ``cqlsh --version``.
|
||||
|
||||
|
|
||||
| 2. Take a snapshot, including every keyspace you want to backup.
|
||||
|
||||
@@ -17,8 +17,8 @@ limitations while applying the procedure:
|
||||
retry, or the node refuses to boot on subsequent attempts, consult the
|
||||
:doc:`Handling Membership Change Failures </operating-scylla/procedures/cluster-management/handling-membership-change-failures>`
|
||||
document.
|
||||
* The ``system_auth`` keyspace has not been upgraded to ``system_auth_v2``.
|
||||
* The ``system_auth`` keyspace has not been upgraded to ``system``.
|
||||
As a result, if ``authenticator`` is set to ``PasswordAuthenticator``, you must
|
||||
increase the replication factor of the ``system_auth`` keyspace. It is
|
||||
recommended to set ``system_auth`` replication factor to the number of nodes
|
||||
in each DC.
|
||||
in each DC.
|
||||
|
||||
@@ -156,7 +156,9 @@ Add New DC
|
||||
UN 54.160.174.243 109.54 KB 256 ? c7686ffd-7a5b-4124-858e-df2e61130aaa RACK1
|
||||
UN 54.235.9.159 109.75 KB 256 ? 39798227-9f6f-4868-8193-08570856c09a RACK1
|
||||
UN 54.146.228.25 128.33 KB 256 ? 7a4957a1-9590-4434-9746-9c8a6f796a0c RACK1
|
||||
|
||||
|
||||
.. TODO possibly provide additional information WRT how ALTER works with tablets
|
||||
|
||||
#. When all nodes are up and running ``ALTER`` the following Keyspaces in the new nodes:
|
||||
|
||||
* Keyspace created by the user (which needed to replicate to the new DC).
|
||||
|
||||
@@ -70,11 +70,46 @@ Step One: Determining Host IDs of Ghost Members
|
||||
If you cannot determine the ghost members' host ID using the suggestions above, use the method described below.
|
||||
|
||||
#. Make sure there are no ongoing membership changes.
|
||||
#. Execute the following CQL query on one of your nodes to obtain the host IDs of all token ring members:
|
||||
|
||||
#. Execute the following CQL query on one of your nodes to retrieve the Raft group 0 ID:
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
select peer, host_id, up from system.cluster_status;
|
||||
select value from system.scylla_local where key = 'raft_group0_id'
|
||||
|
||||
For example:
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
cqlsh> select value from system.scylla_local where key = 'raft_group0_id';
|
||||
|
||||
value
|
||||
--------------------------------------
|
||||
607fef80-c276-11ed-a6f6-3075f294cc65
|
||||
|
||||
#. Use the obtained Raft group 0 ID to query the set of all cluster members' host IDs (which includes the ghost members), by executing the following query:
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
select server_id from system.raft_state where group_id = <group0_id>
|
||||
|
||||
replace ``<group0_id>`` with the group 0 ID that you obtained. For example:
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
cqlsh> select server_id from system.raft_state where group_id = 607fef80-c276-11ed-a6f6-3075f294cc65;
|
||||
|
||||
server_id
|
||||
--------------------------------------
|
||||
26a9badc-6e96-4b86-a8df-5173e5ab47fe
|
||||
7991e7f5-692e-45a0-8ae5-438be5bc7c4f
|
||||
aff11c6d-fbe7-4395-b7ca-3912d7dba2c6
|
||||
|
||||
#. Execute the following CQL query to obtain the host IDs of all token ring members:
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
select host_id, up from system.cluster_status;
|
||||
|
||||
For example:
|
||||
|
||||
@@ -83,25 +118,28 @@ If you cannot determine the ghost members' host ID using the suggestions above,
|
||||
cqlsh> select peer, host_id, up from system.cluster_status;
|
||||
|
||||
peer | host_id | up
|
||||
-----------+--------------------------------------+-------
|
||||
127.0.0.3 | 42405b3b-487e-4759-8590-ddb9bdcebdc5 | False
|
||||
127.0.0.1 | 4e3ee715-528f-4dc9-b10f-7cf294655a9e | True
|
||||
127.0.0.2 | 225a80d0-633d-45d2-afeb-a5fa422c9bd5 | True
|
||||
-----------+--------------------------------------+-------
|
||||
127.0.0.3 | null | False
|
||||
127.0.0.1 | 26a9badc-6e96-4b86-a8df-5173e5ab47fe | True
|
||||
127.0.0.2 | 7991e7f5-692e-45a0-8ae5-438be5bc7c4f | True
|
||||
|
||||
The output of this query is similar to the output of ``nodetool status``.
|
||||
|
||||
We included the ``up`` column to see which nodes are down.
|
||||
We included the ``up`` column to see which nodes are down and the ``peer`` column to see their IP addresses.
|
||||
|
||||
In this example, one of the 3 nodes tried to decommission but crashed while it was leaving the token ring. The node is in a partially left state and will refuse to restart, but other nodes still consider it as a normal member. We'll have to use ``removenode`` to clean up after it.
|
||||
In this example, one of the nodes tried to decommission and crashed as soon as it left the token ring but before it left the Raft group. Its entry will show up in ``system.cluster_status`` queries with ``host_id = null``, like above, until the cluster is restarted.
|
||||
|
||||
#. A host ID belongs to a ghost member if it appears in the ``system.cluster_status`` query but does not correspond to any remaining node in your cluster.
|
||||
#. A host ID belongs to a ghost member if:
|
||||
|
||||
* It appears in the ``system.raft_state`` query but not in the ``system.cluster_status`` query,
|
||||
* Or it appears in the ``system.cluster_status`` query but does not correspond to any remaining node in your cluster.
|
||||
|
||||
In our example, the ghost member's host ID was ``aff11c6d-fbe7-4395-b7ca-3912d7dba2c6`` because it appeared in the ``system.raft_state`` query but not in the ``system.cluster_status`` query.
|
||||
|
||||
If you're unsure whether a given row in the ``system.cluster_status`` query corresponds to a node in your cluster, you can connect to each node in the cluster and execute ``select host_id from system.local`` (or search the node's logs) to obtain that node's host ID, collecting the host IDs of all nodes in your cluster. Then check if each host ID from the ``system.cluster_status`` query appears in your collected set; if not, it's a ghost member.
|
||||
|
||||
A good rule of thumb is to look at the members marked as down (``up = False`` in ``system.cluster_status``) - ghost members are eventually marked as down by the remaining members of the cluster. But remember that a real member might also be marked as down if it was shutdown or partitioned away from the rest of the cluster. If in doubt, connect to each node and collect their host IDs, as described in the previous paragraph.
|
||||
|
||||
In our example, the ghost member's host ID is ``42405b3b-487e-4759-8590-ddb9bdcebdc5`` because it is the only member marked as down and we can verify that the other two rows appearing in ``system.cluster_status`` belong to the remaining 2 nodes in the cluster.
|
||||
|
||||
In some cases, even after a failed topology change, there may be no ghost members left - for example, if a bootstrapping node crashed very early in the procedure or a decommissioning node crashed after it committed the membership change but before it finalized its own shutdown steps.
|
||||
|
||||
If any ghost members are present, proceed to the next step.
|
||||
|
||||
@@ -190,11 +190,11 @@ In this case, the node's data will be cleaned after restart. To remedy this, you
|
||||
|
||||
#. Start Scylla Server
|
||||
|
||||
.. include:: /rst_include/scylla-commands-stop-index.rst
|
||||
.. include:: /rst_include/scylla-commands-start-index.rst
|
||||
|
||||
Sometimes the public/private IP of the instance is changed after restart. If so, refer to the Replace Procedure_ above.
|
||||
|
||||
|
||||
.. _replace-node-upgrade-info:
|
||||
|
||||
.. scylladb_include_flag:: upgrade-warning-replace-node.rst
|
||||
.. scylladb_include_flag:: upgrade-warning-replace-node.rst
|
||||
|
||||
@@ -31,10 +31,10 @@ Procedure
|
||||
|
||||
cqlsh -u cassandra -p cassandra
|
||||
|
||||
.. warning::
|
||||
.. note::
|
||||
|
||||
Before proceeding to the next step, we highly recommend creating a custom superuser
|
||||
to ensure security and prevent performance degradation.
|
||||
Before proceeding to the next step, we recommend creating a custom superuser
|
||||
to improve security.
|
||||
See :doc:`Creating a Custom Superuser </operating-scylla/security/create-superuser/>` for instructions.
|
||||
|
||||
#. If you want to create users and roles, continue to :doc:`Enable Authorization </operating-scylla/security/enable-authorization>`.
|
||||
|
||||
@@ -6,12 +6,7 @@ The default ScyllaDB superuser role is ``cassandra`` with password ``cassandra``
|
||||
Users with the ``cassandra`` role have full access to the database and can run
|
||||
any CQL command on the database resources.
|
||||
|
||||
During login, the credentials for the default superuser ``cassandra`` are read with
|
||||
a consistency level of QUORUM, whereas those for all other roles are read at LOCAL_ONE.
|
||||
QUORUM may significantly impact performance, especially in multi-datacenter deployments.
|
||||
|
||||
To prevent performance degradation and ensure better security, we highly recommend creating
|
||||
a custom superuser. You should:
|
||||
To improve security, we recommend creating a custom superuser. You should:
|
||||
|
||||
#. Use the default ``cassandra`` superuser to log in.
|
||||
#. Create a custom superuser.
|
||||
|
||||
@@ -57,13 +57,13 @@ Set a Superuser
|
||||
The default ScyllaDB superuser role is ``cassandra`` with password ``cassandra``. Using the default
|
||||
superuser is unsafe and may significantly impact performance.
|
||||
|
||||
If you haven't created a custom superuser while enablint authentication, you should create a custom superuser
|
||||
If you haven't created a custom superuser while enabling authentication, you should create a custom superuser
|
||||
before creating additional roles.
|
||||
See :doc:`Creating a Custom Superuser </operating-scylla/security/create-superuser/>` for instructions.
|
||||
|
||||
.. warning::
|
||||
.. note::
|
||||
|
||||
We highly recommend creating a custom superuser to ensure security and avoid performance degradation.
|
||||
We recommend creating a custom superuser to improve security.
|
||||
|
||||
.. _roles:
|
||||
|
||||
|
||||
@@ -3,7 +3,24 @@ Reference
|
||||
===============
|
||||
|
||||
.. toctree::
|
||||
:maxdepth: 1
|
||||
:glob:
|
||||
:maxdepth: 2
|
||||
:hidden:
|
||||
|
||||
/reference/*
|
||||
AWS Images </reference/aws-images>
|
||||
Azure Images </reference/azure-images>
|
||||
GCP Images </reference/gcp-images>
|
||||
Configuration Parameters </reference/configuration-parameters>
|
||||
Glossary </reference/glossary>
|
||||
API Reference (BETA) </reference/api-reference>
|
||||
Metrics (BETA) </reference/metrics>
|
||||
|
||||
|
||||
* ScyllaDB images for AWS, Azure, and GCP.
|
||||
|
||||
* :doc:`AWS Images </reference/aws-images>`
|
||||
* :doc:`Azure Images </reference/azure-images>`
|
||||
* :doc:`GCP Images </reference/gcp-images>`
|
||||
* :doc:`Configuration Parameters </reference/configuration-parameters>` - ScyllaDB properties configurable in the ``scylla.yaml`` configuration file.
|
||||
* :doc:`Glossary </reference/glossary>` - ScyllaDB-related terms and definitions.
|
||||
* :doc:`API Reference (BETA) </reference/api-reference>`
|
||||
* :doc:`Metrics (BETA) </reference/metrics>`
|
||||
6
docs/reference/metrics.rst
Normal file
6
docs/reference/metrics.rst
Normal file
@@ -0,0 +1,6 @@
|
||||
==============
|
||||
Metrics (BETA)
|
||||
==============
|
||||
|
||||
.. scylladb_metrics::
|
||||
:template: metrics.tmpl
|
||||
@@ -1,95 +0,0 @@
|
||||
A Removed Node was not Removed Properly from the Seed Node List
|
||||
===============================================================
|
||||
|
||||
Phenomenon
|
||||
^^^^^^^^^^
|
||||
|
||||
Failed to create :doc:`materialized view </cql/mv>` after node was removed from the cluster.
|
||||
|
||||
|
||||
Error message:
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
InvalidRequest: Error from server: code=2200 [Invalid query] message="Can't create materialized views until the whole cluster has been upgraded"
|
||||
|
||||
Problem
|
||||
^^^^^^^
|
||||
|
||||
A removed node was not removed properly from the seed node list.
|
||||
|
||||
Scylla Open Source 4.3 and later and Scylla Enterprise 2021.1 and later are seedless. See :doc:`Scylla Seed Nodes </kb/seed-nodes/>` for details.
|
||||
This problem may occur in an earlier version of Scylla.
|
||||
|
||||
How to Verify
|
||||
^^^^^^^^^^^^^
|
||||
|
||||
Scylla logs show the error message above.
|
||||
|
||||
To verify that the node wasn't removed properly, use the :doc:`nodetool gossipinfo </operating-scylla/nodetool-commands/gossipinfo>` command.
|
||||
|
||||
For example:
|
||||
|
||||
A three-node cluster, with one node (54.62.0.101) removed.
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
nodetool gossipinfo
|
||||
|
||||
/54.62.0.99
|
||||
generation:1172279348
|
||||
heartbeat:7212
|
||||
LOAD:2.0293227179E10
|
||||
INTERNAL_IP:10.240.0.83
|
||||
DC:E1
|
||||
STATUS:NORMAL,-872190912874367364312
|
||||
HOST_ID:12fdcf43-4642-53b1-a987-c0e825e4e10a
|
||||
RPC_ADDRESS:10.240.0.83
|
||||
RACK:R1
|
||||
|
||||
/54.62.0.100
|
||||
generation:1657463198
|
||||
heartbeat:8135
|
||||
LOAD:2.0114638716E12
|
||||
INTERNAL_IP:10.240.0.93
|
||||
DC:E1
|
||||
STATUS:NORMAL,-258152127640110957173
|
||||
HOST_ID:99acbh55-1013-24a1-a987-s1w718c1e01b
|
||||
RPC_ADDRESS:10.240.0.93
|
||||
RACK:R1
|
||||
|
||||
/54.62.0.101
|
||||
generation:1657463198
|
||||
heartbeat:7022
|
||||
LOAD:2.5173672157E48
|
||||
INTERNAL_IP:10.240.0.103
|
||||
DC:E1
|
||||
STATUS:NORMAL,-365481201980413697284
|
||||
HOST_ID:99acbh55-1301-55a1-a628-s4w254c1e01b
|
||||
RPC_ADDRESS:10.240.0.103
|
||||
RACK:R1
|
||||
|
||||
We can see that node ``54.62.0.101`` is still part of the cluster and needs to be removed.
|
||||
|
||||
Solution
|
||||
^^^^^^^^
|
||||
|
||||
Remove the relevant node from the other nodes' seed list (under scylla.yaml) and restart the nodes one by one.
|
||||
|
||||
For example:
|
||||
|
||||
Seed list before removing the node
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
- seeds: "10.240.0.83,10.240.0.93,10.240.0.103"
|
||||
|
||||
Seed list after removing the node
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
- seeds: "10.240.0.83,10.240.0.93"
|
||||
|
||||
Restart Scylla nodes
|
||||
|
||||
.. include:: /rst_include/scylla-commands-restart-index.rst
|
||||
@@ -6,7 +6,6 @@ Error Messages
|
||||
|
||||
kb-fs-not-qualified-aio
|
||||
address-already-in-use
|
||||
create-mv
|
||||
schema-mismatch
|
||||
invalid-ssl-prot-error
|
||||
|
||||
@@ -20,8 +19,6 @@ Error Messages
|
||||
|
||||
* :doc:`"Address already in use" messages </troubleshooting/error-messages/address-already-in-use/>`
|
||||
|
||||
* :doc:`"Error from server: code=2200 [Invalid query] message="Can't create materialized views until the whole cluster has been upgraded" </troubleshooting/error-messages/create-mv/>`
|
||||
|
||||
* :doc:`Schema Mismatch </troubleshooting/error-messages/schema-mismatch>`
|
||||
|
||||
* :doc:`Invalid SSL Protocol </troubleshooting/error-messages/invalid-ssl-prot-error>`
|
||||
|
||||
@@ -78,16 +78,10 @@ You can follow the manual recovery procedure when:
|
||||
**irrecoverable** nodes. If possible, restart your nodes, and use the manual
|
||||
recovery procedure as a last resort.
|
||||
|
||||
.. note::
|
||||
.. warning::
|
||||
|
||||
Before proceeding, make sure that the irrecoverable nodes are truly dead, and not,
|
||||
for example, temporarily partitioned away due to a network failure. If it is
|
||||
possible for the 'dead' nodes to come back to life, they might communicate and
|
||||
interfere with the recovery procedure and cause unpredictable problems.
|
||||
|
||||
If you have no means of ensuring that these irrecoverable nodes won't come back
|
||||
to life and communicate with the rest of the cluster, setup firewall rules or otherwise
|
||||
isolate your alive nodes to reject any communication attempts from these dead nodes.
|
||||
The manual recovery procedure is not supported :doc:`if tablets are enabled on any of your keyspaces </architecture/tablets/>`.
|
||||
In such a case, you need to :doc:`restore from backup </operating-scylla/procedures/backup-restore/restore>`.
|
||||
|
||||
During the manual recovery procedure you'll enter a special ``RECOVERY`` mode, remove
|
||||
all faulty nodes (using the standard :doc:`node removal procedure </operating-scylla/procedures/cluster-management/remove-node/>`),
|
||||
@@ -97,15 +91,26 @@ perform the Raft upgrade procedure again, initializing the Raft algorithm from s
|
||||
The manual recovery procedure is applicable both to clusters that were not running Raft
|
||||
in the past and then had Raft enabled, and to clusters that were bootstrapped using Raft.
|
||||
|
||||
.. note::
|
||||
**Prerequisites**
|
||||
|
||||
Entering ``RECOVERY`` mode requires a node restart. Restarting an additional node while
|
||||
some nodes are already dead may lead to unavailability of data queries (assuming that
|
||||
you haven't lost it already). For example, if you're using the standard RF=3,
|
||||
CL=QUORUM setup, and you're recovering from a stuck of upgrade procedure because one
|
||||
of your nodes is dead, restarting another node will cause temporary data query
|
||||
unavailability (until the node finishes restarting). Prepare your service for
|
||||
downtime before proceeding.
|
||||
* Before proceeding, make sure that the irrecoverable nodes are truly dead, and not,
|
||||
for example, temporarily partitioned away due to a network failure. If it is
|
||||
possible for the 'dead' nodes to come back to life, they might communicate and
|
||||
interfere with the recovery procedure and cause unpredictable problems.
|
||||
|
||||
If you have no means of ensuring that these irrecoverable nodes won't come back
|
||||
to life and communicate with the rest of the cluster, setup firewall rules or otherwise
|
||||
isolate your alive nodes to reject any communication attempts from these dead nodes.
|
||||
|
||||
* Prepare your service for downtime before proceeding.
|
||||
Entering ``RECOVERY`` mode requires a node restart. Restarting an additional node while
|
||||
some nodes are already dead may lead to unavailability of data queries (assuming that
|
||||
you haven't lost it already). For example, if you're using the standard RF=3,
|
||||
CL=QUORUM setup, and you're recovering from a stuck upgrade procedure because one
|
||||
of your nodes is dead, restarting another node will cause temporary data query
|
||||
unavailability (until the node finishes restarting).
|
||||
|
||||
**Procedure**
|
||||
|
||||
#. Perform the following query on **every alive node** in the cluster, using e.g. ``cqlsh``:
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@ Reset Authenticator Password
|
||||
============================
|
||||
|
||||
This procedure describes what to do when a user loses his password and can not reset it with a superuser role.
|
||||
The procedure requires cluster downtime and as a result, all of the ``system_auth_v2`` data is deleted.
|
||||
The procedure requires cluster downtime and as a result, all auth data is deleted.
|
||||
|
||||
.. scylladb_include_flag:: system-auth-name-info.rst
|
||||
|
||||
@@ -15,11 +15,11 @@ Procedure
|
||||
|
||||
sudo systemctl stop scylla-server
|
||||
|
||||
| 2. Remove your tables under ``/var/lib/scylla/data/system_auth_v2/``.
|
||||
| 2. Remove system tables starting with ``role`` prefix from ``/var/lib/scylla/data/system`` directory.
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
rm -rf /var/lib/scylla/data/ssystem_auth_v2/
|
||||
rm -rf /var/lib/scylla/data/system/role*
|
||||
|
||||
| 3. Start Scylla nodes.
|
||||
|
||||
|
||||
@@ -31,7 +31,7 @@ Apply the following procedure **serially** on each node. Do not move to the next
|
||||
* Not to run administration functions, like repairs, refresh, rebuild or add or remove nodes. See `sctool <https://manager.docs.scylladb.com/stable/sctool/index.html>`_ for suspending ScyllaDB Manager (only available for ScyllaDB Enterprise) scheduled or running repairs.
|
||||
* Not to apply schema changes
|
||||
|
||||
.. note:: Before upgrading, make sure to use the latest `ScyllaDB Montioring <https://monitoring.docs.scylladb.com/>`_ stack.
|
||||
.. note:: Before upgrading, make sure to use the latest `ScyllaDB Monitoring <https://monitoring.docs.scylladb.com/>`_ stack.
|
||||
|
||||
Upgrade Steps
|
||||
=============
|
||||
@@ -182,4 +182,4 @@ Start the node
|
||||
|
||||
Validate
|
||||
--------
|
||||
Check the upgrade instructions above for validation. Once you are sure the node rollback is successful, move to the next node in the cluster.
|
||||
Check the upgrade instructions above for validation. Once you are sure the node rollback is successful, move to the next node in the cluster.
|
||||
|
||||
@@ -34,7 +34,7 @@ Apply the following procedure **serially** on each node. Do not move to the next
|
||||
* Not to run administration functions, like repairs, refresh, rebuild or add or remove nodes. See `sctool <https://manager.docs.scylladb.com/stable/sctool/index.html>`_ for suspending Scylla Manager (only available Scylla Enterprise) scheduled or running repairs.
|
||||
* Not to apply schema changes
|
||||
|
||||
.. note:: Before upgrading, make sure to use the latest `Scylla Montioring <https://monitoring.docs.scylladb.com/>`_ stack.
|
||||
.. note:: Before upgrading, make sure to use the latest `Scylla Monitoring <https://monitoring.docs.scylladb.com/>`_ stack.
|
||||
|
||||
Upgrade steps
|
||||
=============
|
||||
|
||||
@@ -32,7 +32,7 @@ Apply the following procedure **serially** on each node. Do not move to the next
|
||||
* Not to run administration functions, like repairs, refresh, rebuild or add or remove nodes. See `sctool <https://manager.docs.scylladb.com/stable/sctool/>`_ for suspending ScyllaDB Manager (only available for ScyllaDB Enterprise) scheduled or running repairs.
|
||||
* Not to apply schema changes
|
||||
|
||||
.. note:: Before upgrading, make sure to use the latest `ScyllaDB Montioring <https://monitoring.docs.scylladb.com/>`_ stack.
|
||||
.. note:: Before upgrading, make sure to use the latest `ScyllaDB Monitoring <https://monitoring.docs.scylladb.com/>`_ stack.
|
||||
|
||||
Upgrade Steps
|
||||
=============
|
||||
|
||||
@@ -85,7 +85,7 @@ The following libraries are available:
|
||||
More information
|
||||
----------------
|
||||
|
||||
`Scylla University: Change Data Capture (CDC) lesson <https://university.scylladb.com/courses/scylla-operations/lessons/change-data-capture-cdc/>`_ - Learn how to use CDC. Some of the topics covered are:
|
||||
`Scylla University: Change Data Capture (CDC) lesson <https://university.scylladb.com/courses/data-modeling/lessons/change-data-capture-cdc/>`_ - Learn how to use CDC. Some of the topics covered are:
|
||||
|
||||
* An overview of Change Data Capture, what exactly is it, what are some common use cases, what does it do, and an overview of how it works
|
||||
* How can that data be consumed? Different options for consuming the data changes including normal CQL, a layered approach, and integrators
|
||||
|
||||
@@ -65,15 +65,12 @@ locator::host_id endpoint_state::get_host_id() const noexcept {
|
||||
}
|
||||
|
||||
std::optional<locator::endpoint_dc_rack> endpoint_state::get_dc_rack() const {
|
||||
if (auto app_state = get_application_state_ptr(application_state::DC)) {
|
||||
std::optional<locator::endpoint_dc_rack> ret;
|
||||
ret->dc = app_state->value();
|
||||
if ((app_state = get_application_state_ptr(application_state::RACK))) {
|
||||
ret->rack = app_state->value();
|
||||
if (ret->dc.empty() || ret->rack.empty()) {
|
||||
on_internal_error_noexcept(logger, format("Node {} has empty dc={} or rack={}", get_host_id(), ret->dc, ret->rack));
|
||||
}
|
||||
return ret;
|
||||
if (const auto* dc_state = get_application_state_ptr(application_state::DC)) {
|
||||
const auto* rack_state = get_application_state_ptr(application_state::RACK);
|
||||
if (dc_state->value().empty() || !rack_state || rack_state->value().empty()) {
|
||||
on_internal_error_noexcept(logger, format("Node {} has empty dc={} or rack={}", get_host_id(), dc_state->value(), rack_state ? rack_state->value() : "(null)"));
|
||||
} else {
|
||||
return std::make_optional<locator::endpoint_dc_rack>(dc_state->value(), rack_state->value());
|
||||
}
|
||||
}
|
||||
return std::nullopt;
|
||||
|
||||
@@ -78,7 +78,7 @@ feature_config feature_config_from_db_config(const db::config& cfg, std::set<sst
|
||||
if (!cfg.check_experimental(db::experimental_features_t::feature::KEYSPACE_STORAGE_OPTIONS)) {
|
||||
fcfg._disabled_features.insert("KEYSPACE_STORAGE_OPTIONS"s);
|
||||
}
|
||||
if (!cfg.check_experimental(db::experimental_features_t::feature::TABLETS)) {
|
||||
if (!cfg.enable_tablets()) {
|
||||
fcfg._disabled_features.insert("TABLETS"s);
|
||||
}
|
||||
if (!cfg.uuid_sstable_identifiers_enabled()) {
|
||||
|
||||
@@ -27,12 +27,16 @@ struct topology_change {
|
||||
std::vector<canonical_mutation> mutations;
|
||||
};
|
||||
|
||||
struct mixed_change {
|
||||
std::vector<canonical_mutation> mutations;
|
||||
};
|
||||
|
||||
struct write_mutations {
|
||||
std::vector<canonical_mutation> mutations;
|
||||
};
|
||||
|
||||
struct group0_command {
|
||||
std::variant<service::schema_change, service::broadcast_table_query, service::topology_change, service::write_mutations> change;
|
||||
std::variant<service::schema_change, service::broadcast_table_query, service::topology_change, service::write_mutations, service::mixed_change> change;
|
||||
canonical_mutation history_append;
|
||||
|
||||
std::optional<utils::UUID> prev_state_id;
|
||||
|
||||
@@ -53,7 +53,7 @@ using can_yield = utils::can_yield;
|
||||
|
||||
using replication_strategy_config_options = std::map<sstring, sstring>;
|
||||
struct replication_strategy_params {
|
||||
const replication_strategy_config_options& options;
|
||||
const replication_strategy_config_options options;
|
||||
std::optional<unsigned> initial_tablets;
|
||||
explicit replication_strategy_params(const replication_strategy_config_options& o, std::optional<unsigned> it) noexcept : options(o), initial_tablets(it) {}
|
||||
};
|
||||
|
||||
@@ -65,33 +65,42 @@ private:
|
||||
return trinfo ? trinfo->next : ti.replicas;
|
||||
}
|
||||
|
||||
future<> populate_table(const tablet_map& tmap, std::optional<host_id> host) {
|
||||
const topology& topo = _tm->get_topology();
|
||||
co_await tmap.for_each_tablet([&] (tablet_id tid, const tablet_info& ti) -> future<> {
|
||||
for (auto&& replica : get_replicas_for_tablet_load(ti, tmap.get_tablet_transition_info(tid))) {
|
||||
if (host && *host != replica.host) {
|
||||
continue;
|
||||
}
|
||||
if (!_nodes.contains(replica.host)) {
|
||||
_nodes.emplace(replica.host, node_load{topo.find_node(replica.host)->get_shard_count()});
|
||||
}
|
||||
node_load& n = _nodes.at(replica.host);
|
||||
if (replica.shard < n._shards.size()) {
|
||||
n.load() += 1;
|
||||
n._shards[replica.shard].load += 1;
|
||||
}
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
public:
|
||||
load_sketch(token_metadata_ptr tm)
|
||||
: _tm(std::move(tm)) {
|
||||
}
|
||||
|
||||
future<> populate(std::optional<host_id> host = std::nullopt) {
|
||||
const topology& topo = _tm->get_topology();
|
||||
future<> populate(std::optional<host_id> host = std::nullopt, std::optional<table_id> only_table = std::nullopt) {
|
||||
co_await utils::clear_gently(_nodes);
|
||||
for (auto&& [table, tmap_] : _tm->tablets().all_tables()) {
|
||||
auto& tmap = tmap_;
|
||||
co_await tmap.for_each_tablet([&] (tablet_id tid, const tablet_info& ti) -> future<> {
|
||||
for (auto&& replica : get_replicas_for_tablet_load(ti, tmap.get_tablet_transition_info(tid))) {
|
||||
if (host && *host != replica.host) {
|
||||
continue;
|
||||
}
|
||||
if (!_nodes.contains(replica.host)) {
|
||||
_nodes.emplace(replica.host, node_load{topo.find_node(replica.host)->get_shard_count()});
|
||||
}
|
||||
node_load& n = _nodes.at(replica.host);
|
||||
if (replica.shard < n._shards.size()) {
|
||||
n.load() += 1;
|
||||
n._shards[replica.shard].load += 1;
|
||||
}
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
|
||||
if (only_table) {
|
||||
auto& tmap = _tm->tablets().get_tablet_map(*only_table);
|
||||
co_await populate_table(tmap, host);
|
||||
} else {
|
||||
for (auto&& [table, tmap]: _tm->tablets().all_tables()) {
|
||||
co_await populate_table(tmap, host);
|
||||
}
|
||||
}
|
||||
|
||||
for (auto&& n : _nodes) {
|
||||
std::make_heap(n.second._shards.begin(), n.second._shards.end(), shard_load_cmp());
|
||||
}
|
||||
@@ -135,6 +144,14 @@ public:
|
||||
return _nodes.at(node).load();
|
||||
}
|
||||
|
||||
uint64_t total_load() const {
|
||||
uint64_t total = 0;
|
||||
for (auto&& n : _nodes) {
|
||||
total += n.second.load();
|
||||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
uint64_t get_avg_shard_load(host_id node) const {
|
||||
if (!_nodes.contains(node)) {
|
||||
return 0;
|
||||
@@ -143,20 +160,34 @@ public:
|
||||
return div_ceil(n.load(), n._shards.size());
|
||||
}
|
||||
|
||||
double get_real_avg_shard_load(host_id node) const {
|
||||
if (!_nodes.contains(node)) {
|
||||
return 0;
|
||||
}
|
||||
auto& n = _nodes.at(node);
|
||||
return double(n.load()) / n._shards.size();
|
||||
}
|
||||
|
||||
// Returns the difference in tablet count between highest-loaded shard and lowest-loaded shard.
|
||||
// Returns 0 when shards are perfectly balanced.
|
||||
// Returns 1 when shards are imbalanced, but it's not possible to balance them.
|
||||
uint64_t get_shard_imbalance(host_id node) const {
|
||||
if (!_nodes.contains(node)) {
|
||||
return 0; // Node has no tablets.
|
||||
}
|
||||
auto& n = _nodes.at(node);
|
||||
min_max_tracker<uint64_t> minmax;
|
||||
for (auto&& s : n._shards) {
|
||||
minmax.update(s.load);
|
||||
}
|
||||
auto minmax = get_shard_minmax(node);
|
||||
return minmax.max() - minmax.max();
|
||||
}
|
||||
|
||||
min_max_tracker<uint64_t> get_shard_minmax(host_id node) const {
|
||||
min_max_tracker<uint64_t> minmax;
|
||||
if (_nodes.contains(node)) {
|
||||
auto& n = _nodes.at(node);
|
||||
for (auto&& s: n._shards) {
|
||||
minmax.update(s.load);
|
||||
}
|
||||
} else {
|
||||
minmax.update(0);
|
||||
}
|
||||
return minmax;
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace locator
|
||||
|
||||
@@ -245,9 +245,17 @@ dht::token_range tablet_map::get_token_range(tablet_id id) const {
|
||||
}
|
||||
}
|
||||
|
||||
host_id tablet_map::get_primary_replica(tablet_id id) const {
|
||||
const auto info = get_tablet_info(id);
|
||||
return info.replicas.at(size_t(id) % info.replicas.size()).host;
|
||||
tablet_replica tablet_map::get_primary_replica(tablet_id id) const {
|
||||
const auto& replicas = get_tablet_info(id).replicas;
|
||||
return replicas.at(size_t(id) % replicas.size());
|
||||
}
|
||||
|
||||
tablet_replica tablet_map::get_primary_replica_within_dc(tablet_id id, const topology& topo, sstring dc) const {
|
||||
const auto replicas = boost::copy_range<tablet_replica_set>(get_tablet_info(id).replicas | boost::adaptors::filtered([&] (const auto& tr) {
|
||||
const auto& node = topo.get_node(tr.host);
|
||||
return node.dc_rack().dc == dc;
|
||||
}));
|
||||
return replicas.at(size_t(id) % replicas.size());
|
||||
}
|
||||
|
||||
future<std::vector<token>> tablet_map::get_sorted_tokens() const {
|
||||
@@ -551,7 +559,7 @@ private:
|
||||
auto& topo = _tmptr->get_topology();
|
||||
for (auto&& replica : replicas) {
|
||||
auto* node = topo.find_node(replica.host);
|
||||
if (node) {
|
||||
if (node && !node->left()) {
|
||||
result.emplace_back(node->endpoint());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -360,7 +360,8 @@ public:
|
||||
dht::token_range get_token_range(tablet_id id) const;
|
||||
|
||||
/// Returns the primary replica for the tablet
|
||||
host_id get_primary_replica(tablet_id id) const;
|
||||
tablet_replica get_primary_replica(tablet_id id) const;
|
||||
tablet_replica get_primary_replica_within_dc(tablet_id id, const topology& topo, sstring dc) const;
|
||||
|
||||
/// Returns a vector of sorted last tokens for tablets.
|
||||
future<std::vector<token>> get_sorted_tokens() const;
|
||||
@@ -399,6 +400,10 @@ public:
|
||||
return _transitions;
|
||||
}
|
||||
|
||||
bool has_transitions() const {
|
||||
return !_transitions.empty();
|
||||
}
|
||||
|
||||
/// Returns an iterable range over tablet_id:s which includes all tablets in token ring order.
|
||||
auto tablet_ids() const {
|
||||
return boost::irange<size_t>(0, tablet_count()) | boost::adaptors::transformed([] (size_t i) {
|
||||
|
||||
31
main.cc
31
main.cc
@@ -1313,7 +1313,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
}
|
||||
|
||||
netw::messaging_service::scheduling_config scfg;
|
||||
scfg.statement_tenants = { {dbcfg.statement_scheduling_group, "$user"}, {default_scheduling_group(), "$system"} };
|
||||
scfg.statement_tenants = {
|
||||
{dbcfg.statement_scheduling_group, "$user"},
|
||||
{default_scheduling_group(), "$system"},
|
||||
{dbcfg.streaming_scheduling_group, "$maintenance"}
|
||||
};
|
||||
scfg.streaming = dbcfg.streaming_scheduling_group;
|
||||
scfg.gossip = dbcfg.gossip_scheduling_group;
|
||||
|
||||
@@ -1398,7 +1402,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
|
||||
service::raft_group0 group0_service{
|
||||
stop_signal.as_local_abort_source(), raft_gr.local(), messaging,
|
||||
gossiper.local(), feature_service.local(), sys_ks.local(), group0_client};
|
||||
gossiper.local(), feature_service.local(), sys_ks.local(), group0_client, dbcfg.gossip_scheduling_group};
|
||||
|
||||
service::tablet_allocator::config tacfg;
|
||||
tacfg.initial_tablets_scale = cfg->tablets_initial_scale_factor();
|
||||
@@ -1435,11 +1439,10 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
raft_gr.stop().get();
|
||||
});
|
||||
|
||||
supervisor::notify("initializing query processor remote part");
|
||||
// TODO: do this together with proxy.start_remote(...)
|
||||
qp.invoke_on_all(&cql3::query_processor::start_remote, std::ref(mm), std::ref(forward_service), std::ref(group0_client)).get();
|
||||
auto stop_qp_remote = defer_verbose_shutdown("query processor remote part", [&qp] {
|
||||
qp.invoke_on_all(&cql3::query_processor::stop_remote).get();
|
||||
sharded<service::topology_state_machine> tsm;
|
||||
tsm.start().get();
|
||||
auto stop_tsm = defer_verbose_shutdown("topology_state_machine", [&tsm] {
|
||||
tsm.stop().get();
|
||||
});
|
||||
|
||||
supervisor::notify("initializing storage service");
|
||||
@@ -1449,12 +1452,21 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
std::ref(feature_service), std::ref(mm), std::ref(token_metadata), std::ref(erm_factory),
|
||||
std::ref(messaging), std::ref(repair),
|
||||
std::ref(stream_manager), std::ref(lifecycle_notifier), std::ref(bm), std::ref(snitch),
|
||||
std::ref(tablet_allocator), std::ref(cdc_generation_service), std::ref(view_builder), std::ref(qp), std::ref(sl_controller)).get();
|
||||
std::ref(tablet_allocator), std::ref(cdc_generation_service), std::ref(view_builder), std::ref(qp), std::ref(sl_controller),
|
||||
std::ref(tsm)).get();
|
||||
|
||||
auto stop_storage_service = defer_verbose_shutdown("storage_service", [&] {
|
||||
ss.stop().get();
|
||||
});
|
||||
|
||||
supervisor::notify("initializing query processor remote part");
|
||||
// TODO: do this together with proxy.start_remote(...)
|
||||
qp.invoke_on_all(&cql3::query_processor::start_remote, std::ref(mm), std::ref(forward_service),
|
||||
std::ref(ss), std::ref(group0_client)).get();
|
||||
auto stop_qp_remote = defer_verbose_shutdown("query processor remote part", [&qp] {
|
||||
qp.invoke_on_all(&cql3::query_processor::stop_remote).get();
|
||||
});
|
||||
|
||||
api::set_server_storage_service(ctx, ss, group0_client).get();
|
||||
auto stop_ss_api = defer_verbose_shutdown("storage service API", [&ctx] {
|
||||
api::unset_server_storage_service(ctx).get();
|
||||
@@ -1624,7 +1636,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
// both)
|
||||
supervisor::notify("starting repair service");
|
||||
auto max_memory_repair = memory::stats().total_memory() * 0.1;
|
||||
repair.start(std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(raft_address_map), std::ref(bm), std::ref(sys_dist_ks), std::ref(sys_ks), std::ref(view_update_generator), std::ref(task_manager), std::ref(mm), max_memory_repair).get();
|
||||
repair.start(std::ref(tsm), std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(raft_address_map), std::ref(bm), std::ref(sys_dist_ks), std::ref(sys_ks), std::ref(view_update_generator), std::ref(task_manager), std::ref(mm), max_memory_repair).get();
|
||||
auto stop_repair_service = defer_verbose_shutdown("repair service", [&repair] {
|
||||
repair.stop().get();
|
||||
});
|
||||
@@ -2117,6 +2129,7 @@ int main(int ac, char** av) {
|
||||
{"perf-fast-forward", perf::scylla_fast_forward_main, "run performance tests by fast forwarding the reader on this server"},
|
||||
{"perf-row-cache-update", perf::scylla_row_cache_update_main, "run performance tests by updating row cache on this server"},
|
||||
{"perf-tablets", perf::scylla_tablets_main, "run performance tests of tablet metadata management"},
|
||||
{"perf-load-balancing", perf::scylla_tablet_load_balancing_main, "run tablet load balancer tests"},
|
||||
{"perf-simple-query", perf::scylla_simple_query_main, "run performance tests by sending simple queries to this server"},
|
||||
{"perf-sstable", perf::scylla_sstable_main, "run performance tests by exercising sstable related operations on this server"},
|
||||
{"perf-alternator", perf::alternator(scylla_main, &after_init_func), "run performance tests on full alternator stack"}
|
||||
|
||||
@@ -2106,8 +2106,21 @@ future<> repair_service::repair_tablets(repair_uniq_id rid, sstring keyspace_nam
|
||||
rlogger.debug("repair[{}] Table {}.{} does not exist anymore", rid.uuid(), keyspace_name, table_name);
|
||||
continue;
|
||||
}
|
||||
// FIXME: we need to wait for current tablet movement and disable future tablet movement
|
||||
auto erm = t->get_effective_replication_map();
|
||||
locator::effective_replication_map_ptr erm;
|
||||
while (true) {
|
||||
_repair_module->check_in_shutdown();
|
||||
erm = t->get_effective_replication_map();
|
||||
const locator::tablet_map& tmap = erm->get_token_metadata_ptr()->tablets().get_tablet_map(tid);
|
||||
if (!tmap.has_transitions()) {
|
||||
break;
|
||||
}
|
||||
rlogger.info("repair[{}] Table {}.{} has tablet transitions, waiting for topology to quiesce", rid.uuid(), keyspace_name, table_name);
|
||||
erm = nullptr;
|
||||
co_await container().invoke_on(0, [] (repair_service& rs) {
|
||||
return rs._tsm.local().await_not_busy();
|
||||
});
|
||||
rlogger.info("repair[{}] Topology quiesced", rid.uuid());
|
||||
}
|
||||
auto& tmap = erm->get_token_metadata_ptr()->tablets().get_tablet_map(tid);
|
||||
struct repair_tablet_meta {
|
||||
locator::tablet_id id;
|
||||
@@ -2122,6 +2135,7 @@ future<> repair_service::repair_tablets(repair_uniq_id rid, sstring keyspace_nam
|
||||
auto mydc = erm->get_topology().get_datacenter();
|
||||
bool select_primary_ranges_within_dc = false;
|
||||
// If the user specified the ranges option, ignore the primary_replica_only option.
|
||||
// Since the ranges are requested explicitly.
|
||||
if (!ranges_specified.empty()) {
|
||||
primary_replica_only = false;
|
||||
}
|
||||
@@ -2143,28 +2157,24 @@ future<> repair_service::repair_tablets(repair_uniq_id rid, sstring keyspace_nam
|
||||
co_await tmap.for_each_tablet([&] (locator::tablet_id id, const locator::tablet_info& info) -> future<> {
|
||||
auto range = tmap.get_token_range(id);
|
||||
auto& replicas = info.replicas;
|
||||
|
||||
if (primary_replica_only) {
|
||||
const auto pr = select_primary_ranges_within_dc ? tmap.get_primary_replica_within_dc(id, erm->get_topology(), mydc) : tmap.get_primary_replica(id);
|
||||
if (pr.host == myhostid) {
|
||||
metas.push_back(repair_tablet_meta{id, range, myhostid, pr.shard, replicas});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
bool found = false;
|
||||
shard_id master_shard_id;
|
||||
// Repair all tablets belong to this node
|
||||
for (auto& r : replicas) {
|
||||
if (select_primary_ranges_within_dc) {
|
||||
auto dc = erm->get_topology().get_datacenter(r.host);
|
||||
if (dc != mydc) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if (r.host == myhostid) {
|
||||
master_shard_id = r.shard;
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
// If users use both the primary_replica_only and the ranges
|
||||
// option to select which ranges to repair, prefer the more
|
||||
// sophisticated ranges option, since the ranges the requested
|
||||
// explicitly.
|
||||
if (primary_replica_only && ranges_specified.empty()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (found) {
|
||||
metas.push_back(repair_tablet_meta{id, range, myhostid, master_shard_id, replicas});
|
||||
@@ -2240,7 +2250,7 @@ future<> repair_service::repair_tablets(repair_uniq_id rid, sstring keyspace_nam
|
||||
for (auto& r : intersection_ranges) {
|
||||
rlogger.debug("repair[{}] Repair tablet task table={}.{} master_shard_id={} range={} neighbors={} replicas={}",
|
||||
rid.uuid(), keyspace_name, table_name, master_shard_id, r, repair_neighbors(nodes, shards).shard_map, m.replicas);
|
||||
task_metas.push_back(tablet_repair_task_meta{keyspace_name, table_name, tid, master_shard_id, r, repair_neighbors(nodes, shards), m.replicas});
|
||||
task_metas.push_back(tablet_repair_task_meta{keyspace_name, table_name, tid, master_shard_id, r, repair_neighbors(nodes, shards), m.replicas, erm});
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -270,6 +270,7 @@ struct tablet_repair_task_meta {
|
||||
dht::token_range range;
|
||||
repair_neighbors neighbors;
|
||||
locator::tablet_replica_set replicas;
|
||||
locator::effective_replication_map_ptr erm;
|
||||
};
|
||||
|
||||
namespace std {
|
||||
|
||||
@@ -3177,7 +3177,8 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
repair_service::repair_service(distributed<gms::gossiper>& gossiper,
|
||||
repair_service::repair_service(sharded<service::topology_state_machine>& tsm,
|
||||
distributed<gms::gossiper>& gossiper,
|
||||
netw::messaging_service& ms,
|
||||
sharded<replica::database>& db,
|
||||
sharded<service::storage_proxy>& sp,
|
||||
@@ -3189,7 +3190,8 @@ repair_service::repair_service(distributed<gms::gossiper>& gossiper,
|
||||
tasks::task_manager& tm,
|
||||
service::migration_manager& mm,
|
||||
size_t max_repair_memory)
|
||||
: _gossiper(gossiper)
|
||||
: _tsm(tsm)
|
||||
, _gossiper(gossiper)
|
||||
, _messaging(ms)
|
||||
, _db(db)
|
||||
, _sp(sp)
|
||||
@@ -3217,13 +3219,22 @@ future<> repair_service::start() {
|
||||
}
|
||||
|
||||
future<> repair_service::stop() {
|
||||
try {
|
||||
rlogger.debug("Stopping repair task module");
|
||||
co_await _repair_module->stop();
|
||||
rlogger.debug("Waiting on load_history_done");
|
||||
co_await std::move(_load_history_done);
|
||||
rlogger.debug("Uninitializing messaging service handlers");
|
||||
co_await uninit_ms_handlers();
|
||||
if (this_shard_id() == 0) {
|
||||
rlogger.debug("Unregistering gossiper helper");
|
||||
co_await _gossiper.local().unregister_(_gossip_helper);
|
||||
}
|
||||
_stopped = true;
|
||||
rlogger.info("Stopped repair_service");
|
||||
} catch (...) {
|
||||
on_fatal_internal_error(rlogger, format("Failed stopping repair_service: {}", std::current_exception()));
|
||||
}
|
||||
}
|
||||
|
||||
repair_service::~repair_service() {
|
||||
@@ -3266,6 +3277,7 @@ future<> repair_service::cleanup_history(tasks::task_id repair_id) {
|
||||
}
|
||||
|
||||
future<> repair_service::load_history() {
|
||||
try {
|
||||
co_await get_db().local().get_tables_metadata().parallel_for_each_table(coroutine::lambda([&] (table_id table_uuid, lw_shared_ptr<replica::table> table) -> future<> {
|
||||
auto shard = utils::uuid_xor_to_uint32(table_uuid.uuid()) % smp::count;
|
||||
if (shard != this_shard_id()) {
|
||||
@@ -3294,6 +3306,11 @@ future<> repair_service::load_history() {
|
||||
}
|
||||
});
|
||||
}));
|
||||
} catch (const abort_requested_exception&) {
|
||||
// Ignore
|
||||
} catch (...) {
|
||||
rlogger.warn("Failed to update repair history time: {}. Ignored", std::current_exception());
|
||||
}
|
||||
}
|
||||
|
||||
repair_meta_ptr repair_service::get_repair_meta(gms::inet_address from, uint32_t repair_meta_id) {
|
||||
|
||||
@@ -86,6 +86,7 @@ public:
|
||||
using host2ip_t = std::function<future<gms::inet_address> (locator::host_id)>;
|
||||
|
||||
class repair_service : public seastar::peering_sharded_service<repair_service> {
|
||||
sharded<service::topology_state_machine>& _tsm;
|
||||
distributed<gms::gossiper>& _gossiper;
|
||||
netw::messaging_service& _messaging;
|
||||
sharded<replica::database>& _db;
|
||||
@@ -116,7 +117,8 @@ class repair_service : public seastar::peering_sharded_service<repair_service> {
|
||||
future<> uninit_ms_handlers();
|
||||
|
||||
public:
|
||||
repair_service(distributed<gms::gossiper>& gossiper,
|
||||
repair_service(sharded<service::topology_state_machine>& tsm,
|
||||
distributed<gms::gossiper>& gossiper,
|
||||
netw::messaging_service& ms,
|
||||
sharded<replica::database>& db,
|
||||
sharded<service::storage_proxy>& sp,
|
||||
|
||||
@@ -262,7 +262,11 @@ public:
|
||||
|
||||
// Caller must keep the current effective_replication_map_ptr valid
|
||||
// until the storage_group_manager finishes update_effective_replication_map
|
||||
virtual future<> update_effective_replication_map(const locator::effective_replication_map& erm) = 0;
|
||||
//
|
||||
// refresh_mutation_source must be called when there are changes to data source
|
||||
// structures but logical state of data is not changed (e.g. when state for a
|
||||
// new tablet replica is allocated).
|
||||
virtual future<> update_effective_replication_map(const locator::effective_replication_map& erm, noncopyable_function<void()> refresh_mutation_source) = 0;
|
||||
|
||||
virtual compaction_group& compaction_group_for_token(dht::token token) const noexcept = 0;
|
||||
virtual utils::chunked_vector<compaction_group*> compaction_groups_for_token_range(dht::token_range tr) const = 0;
|
||||
@@ -273,7 +277,7 @@ public:
|
||||
virtual size_t log2_storage_groups() const = 0;
|
||||
virtual storage_group* storage_group_for_token(dht::token) const noexcept = 0;
|
||||
|
||||
virtual locator::resize_decision::seq_number_t split_ready_seq_number() const noexcept = 0;
|
||||
virtual locator::table_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept = 0;
|
||||
virtual bool all_storage_groups_split() = 0;
|
||||
virtual future<> split_all_storage_groups() = 0;
|
||||
virtual future<> maybe_split_compaction_group_of(size_t idx) = 0;
|
||||
|
||||
@@ -13,7 +13,6 @@
|
||||
#include "utils/lister.hh"
|
||||
#include "replica/database.hh"
|
||||
#include <seastar/core/future-util.hh>
|
||||
#include "db/system_auth_keyspace.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "db/system_keyspace_sstables_registry.hh"
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
@@ -812,7 +811,6 @@ future<> database::drop_keyspace_on_all_shards(sharded<database>& sharded_db, co
|
||||
static bool is_system_table(const schema& s) {
|
||||
auto& k = s.ks_name();
|
||||
return k == db::system_keyspace::NAME ||
|
||||
k == db::system_auth_keyspace::NAME ||
|
||||
k == db::system_distributed_keyspace::NAME ||
|
||||
k == db::system_distributed_keyspace::NAME_EVERYWHERE;
|
||||
}
|
||||
|
||||
@@ -610,6 +610,13 @@ private:
|
||||
// Safely iterate through compaction groups, while performing async operations on them.
|
||||
future<> parallel_foreach_compaction_group(std::function<future<>(compaction_group&)> action);
|
||||
|
||||
// Safely iterate through SSTables, with deletion guard taken to make sure they're not
|
||||
// removed during iteration.
|
||||
// WARNING: Be careful that the action doesn't perform an operation that will itself
|
||||
// take the deletion guard, as that will cause a deadlock. For example, memtable flush
|
||||
// can wait on compaction (backpressure) which in turn takes deletion guard on completion.
|
||||
future<> safe_foreach_sstable(const sstables::sstable_set&, noncopyable_function<future<>(const sstables::shared_sstable&)> action);
|
||||
|
||||
bool cache_enabled() const {
|
||||
return _config.enable_cache && _schema->caching_options().enabled();
|
||||
}
|
||||
@@ -1023,7 +1030,7 @@ public:
|
||||
|
||||
// The tablet filter is used to not double account migrating tablets, so it's important that
|
||||
// only one of pending or leaving replica is accounted based on current migration stage.
|
||||
locator::table_load_stats table_load_stats(std::function<bool(locator::global_tablet_id)> tablet_filter) const noexcept;
|
||||
locator::table_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept;
|
||||
|
||||
const db::view::stats& get_view_stats() const {
|
||||
return _view_stats;
|
||||
|
||||
@@ -17,7 +17,6 @@
|
||||
#include "replica/global_table_ptr.hh"
|
||||
#include "db/config.hh"
|
||||
#include "db/extensions.hh"
|
||||
#include "db/system_auth_keyspace.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
#include "db/schema_tables.hh"
|
||||
@@ -36,7 +35,7 @@
|
||||
extern logging::logger dblog;
|
||||
|
||||
static const std::unordered_set<std::string_view> system_keyspaces = {
|
||||
db::system_keyspace::NAME, db::system_auth_keyspace::NAME, db::schema_tables::NAME,
|
||||
db::system_keyspace::NAME, db::schema_tables::NAME,
|
||||
};
|
||||
|
||||
// Not super nice. Adding statefulness to the file.
|
||||
@@ -60,7 +59,6 @@ static const std::unordered_set<std::string_view> internal_keyspaces = {
|
||||
db::system_distributed_keyspace::NAME,
|
||||
db::system_distributed_keyspace::NAME_EVERYWHERE,
|
||||
db::system_keyspace::NAME,
|
||||
db::system_auth_keyspace::NAME,
|
||||
db::schema_tables::NAME,
|
||||
auth::meta::legacy::AUTH_KS,
|
||||
tracing::trace_keyspace_helper::KEYSPACE_NAME
|
||||
|
||||
@@ -198,9 +198,11 @@ table::add_memtables_to_reader_list(std::vector<flat_mutation_reader_v2>& reader
|
||||
return;
|
||||
}
|
||||
reserve_fn(boost::accumulate(compaction_groups() | boost::adaptors::transformed(std::mem_fn(&compaction_group::memtable_count)), uint64_t(0)));
|
||||
// TODO: implement a incremental reader selector for memtable, using existing reader_selector interface for combined_reader.
|
||||
auto token_range = range.transform(std::mem_fn(&dht::ring_position::token));
|
||||
for (compaction_group& cg : compaction_groups()) {
|
||||
add_memtables_from_cg(cg);
|
||||
if (cg.token_range().overlaps(token_range, dht::token_comparator())) {
|
||||
add_memtables_from_cg(cg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -596,7 +598,7 @@ public:
|
||||
_storage_groups = std::move(r);
|
||||
}
|
||||
|
||||
future<> update_effective_replication_map(const locator::effective_replication_map& erm) override { return make_ready_future(); }
|
||||
future<> update_effective_replication_map(const locator::effective_replication_map& erm, noncopyable_function<void()> refresh_mutation_source) override { return make_ready_future(); }
|
||||
|
||||
compaction_group& compaction_group_for_token(dht::token token) const noexcept override {
|
||||
return get_compaction_group();
|
||||
@@ -622,8 +624,11 @@ public:
|
||||
return _single_sg;
|
||||
}
|
||||
|
||||
locator::resize_decision::seq_number_t split_ready_seq_number() const noexcept override {
|
||||
return std::numeric_limits<locator::resize_decision::seq_number_t>::min();
|
||||
locator::table_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)>) const noexcept override {
|
||||
return locator::table_load_stats{
|
||||
.size_in_bytes = _single_sg->live_disk_space_used(),
|
||||
.split_ready_seq_number = std::numeric_limits<locator::resize_decision::seq_number_t>::min()
|
||||
};
|
||||
}
|
||||
bool all_storage_groups_split() override { return true; }
|
||||
future<> split_all_storage_groups() override { return make_ready_future(); }
|
||||
@@ -666,6 +671,10 @@ private:
|
||||
storage_group* storage_group_for_id(size_t i) const {
|
||||
return storage_group_manager::storage_group_for_id(schema(), i);
|
||||
}
|
||||
|
||||
size_t tablet_id_for_token(dht::token t) const noexcept {
|
||||
return tablet_map().get_tablet_id(t).value();
|
||||
}
|
||||
public:
|
||||
tablet_storage_group_manager(table& t, const locator::effective_replication_map& erm)
|
||||
: _t(t)
|
||||
@@ -689,7 +698,7 @@ public:
|
||||
_storage_groups = std::move(ret);
|
||||
}
|
||||
|
||||
future<> update_effective_replication_map(const locator::effective_replication_map& erm) override;
|
||||
future<> update_effective_replication_map(const locator::effective_replication_map& erm, noncopyable_function<void()> refresh_mutation_source) override;
|
||||
|
||||
compaction_group& compaction_group_for_token(dht::token token) const noexcept override;
|
||||
utils::chunked_vector<compaction_group*> compaction_groups_for_token_range(dht::token_range tr) const override;
|
||||
@@ -715,16 +724,11 @@ public:
|
||||
size_t log2_storage_groups() const override {
|
||||
return log2ceil(tablet_map().tablet_count());
|
||||
}
|
||||
size_t storage_group_id_for_token(dht::token t) const noexcept {
|
||||
return storage_group_of(t).first;
|
||||
}
|
||||
storage_group* storage_group_for_token(dht::token token) const noexcept override {
|
||||
return storage_group_for_id(storage_group_of(token).first);
|
||||
}
|
||||
|
||||
locator::resize_decision::seq_number_t split_ready_seq_number() const noexcept override {
|
||||
return _split_ready_seq_number;
|
||||
}
|
||||
locator::table_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept override;
|
||||
bool all_storage_groups_split() override;
|
||||
future<> split_all_storage_groups() override;
|
||||
future<> maybe_split_compaction_group_of(size_t idx) override;
|
||||
@@ -924,8 +928,8 @@ utils::chunked_vector<compaction_group*> tablet_storage_group_manager::compactio
|
||||
utils::chunked_vector<compaction_group*> ret;
|
||||
auto cmp = dht::token_comparator();
|
||||
|
||||
size_t candidate_start = tr.start() ? storage_group_id_for_token(tr.start()->value()) : size_t(0);
|
||||
size_t candidate_end = tr.end() ? storage_group_id_for_token(tr.end()->value()) : (tablet_count() - 1);
|
||||
size_t candidate_start = tr.start() ? tablet_id_for_token(tr.start()->value()) : size_t(0);
|
||||
size_t candidate_end = tr.end() ? tablet_id_for_token(tr.end()->value()) : (tablet_count() - 1);
|
||||
|
||||
while (candidate_start <= candidate_end) {
|
||||
auto it = _storage_groups.find(candidate_start++);
|
||||
@@ -988,6 +992,14 @@ future<> table::parallel_foreach_compaction_group(std::function<future<>(compact
|
||||
});
|
||||
}
|
||||
|
||||
future<> table::safe_foreach_sstable(const sstables::sstable_set& set, noncopyable_function<future<>(const sstables::shared_sstable&)> action) {
|
||||
auto deletion_guard = co_await get_units(_sstable_deletion_sem, 1);
|
||||
|
||||
co_await set.for_each_sstable_gently([&] (const sstables::shared_sstable& sst) -> future<> {
|
||||
return action(sst);
|
||||
});
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<sstables::sstable_files_snapshot>> table::take_storage_snapshot(dht::token_range tr) {
|
||||
utils::chunked_vector<sstables::sstable_files_snapshot> ret;
|
||||
|
||||
@@ -1002,12 +1014,12 @@ future<utils::chunked_vector<sstables::sstable_files_snapshot>> table::take_stor
|
||||
|
||||
auto set = cg->make_sstable_set();
|
||||
|
||||
for (auto all_sstables = set->all(); auto& sst : *all_sstables) {
|
||||
co_await safe_foreach_sstable(*set, [&] (const sstables::shared_sstable& sst) -> future<> {
|
||||
ret.push_back({
|
||||
.sst = sst,
|
||||
.files = co_await sst->readable_file_for_all_components(),
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
co_return std::move(ret);
|
||||
@@ -1022,7 +1034,7 @@ table::clone_tablet_storage(locator::tablet_id tid) {
|
||||
auto* sg = storage_group_for_id(tid.value());
|
||||
co_await sg->flush();
|
||||
auto set = sg->make_sstable_set();
|
||||
co_await set->for_each_sstable_gently([this, &ret] (const sstables::shared_sstable& sst) -> future<> {
|
||||
co_await safe_foreach_sstable(*set, [&] (const sstables::shared_sstable& sst) -> future<> {
|
||||
ret.push_back(co_await sst->clone(calculate_generation_for_new_table()));
|
||||
});
|
||||
co_return ret;
|
||||
@@ -2056,20 +2068,23 @@ table::table(schema_ptr schema, config config, lw_shared_ptr<const storage_optio
|
||||
set_metrics();
|
||||
}
|
||||
|
||||
locator::table_load_stats table::table_load_stats(std::function<bool(locator::global_tablet_id)> tablet_filter) const noexcept {
|
||||
locator::table_load_stats tablet_storage_group_manager::table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept {
|
||||
locator::table_load_stats stats;
|
||||
stats.split_ready_seq_number = _sg_manager->split_ready_seq_number();
|
||||
stats.split_ready_seq_number = _split_ready_seq_number;
|
||||
|
||||
_sg_manager->for_each_storage_group([&] (size_t id, storage_group& sg) {
|
||||
locator::global_tablet_id gid { _schema->id(), locator::tablet_id(id) };
|
||||
if (!tablet_filter(gid)) {
|
||||
return;
|
||||
for_each_storage_group([&] (size_t id, storage_group& sg) {
|
||||
locator::global_tablet_id gid { _t.schema()->id(), locator::tablet_id(id) };
|
||||
if (tablet_filter(*_tablet_map, gid)) {
|
||||
stats.size_in_bytes += sg.live_disk_space_used();
|
||||
}
|
||||
stats.size_in_bytes += sg.live_disk_space_used();
|
||||
});
|
||||
return stats;
|
||||
}
|
||||
|
||||
locator::table_load_stats table::table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept {
|
||||
return _sg_manager->table_load_stats(std::move(tablet_filter));
|
||||
}
|
||||
|
||||
future<> tablet_storage_group_manager::handle_tablet_split_completion(const locator::tablet_map& old_tmap, const locator::tablet_map& new_tmap) {
|
||||
auto table_id = schema()->id();
|
||||
size_t old_tablet_count = old_tmap.tablet_count();
|
||||
@@ -2130,7 +2145,7 @@ future<> tablet_storage_group_manager::handle_tablet_split_completion(const loca
|
||||
return stop_fut;
|
||||
}
|
||||
|
||||
future<> tablet_storage_group_manager::update_effective_replication_map(const locator::effective_replication_map& erm) {
|
||||
future<> tablet_storage_group_manager::update_effective_replication_map(const locator::effective_replication_map& erm, noncopyable_function<void()> refresh_mutation_source) {
|
||||
auto* new_tablet_map = &erm.get_token_metadata().tablets().get_tablet_map(schema()->id());
|
||||
auto* old_tablet_map = std::exchange(_tablet_map, new_tablet_map);
|
||||
|
||||
@@ -2151,6 +2166,7 @@ future<> tablet_storage_group_manager::update_effective_replication_map(const lo
|
||||
auto tablet_migrates_in = [this_replica] (locator::tablet_transition_info& transition_info) {
|
||||
return transition_info.stage == locator::tablet_transition_stage::allow_write_both_read_old && transition_info.pending_replica == this_replica;
|
||||
};
|
||||
bool tablet_migrating_in = false;
|
||||
for (auto& transition : new_tablet_map->transitions()) {
|
||||
auto tid = transition.first;
|
||||
auto transition_info = transition.second;
|
||||
@@ -2158,8 +2174,17 @@ future<> tablet_storage_group_manager::update_effective_replication_map(const lo
|
||||
auto range = new_tablet_map->get_token_range(tid);
|
||||
auto cg = std::make_unique<compaction_group>(_t, tid.value(), std::move(range));
|
||||
_storage_groups[tid.value()] = std::make_unique<storage_group>(std::move(cg), &_compaction_groups);
|
||||
tablet_migrating_in = true;
|
||||
}
|
||||
}
|
||||
// TODO: possibly use row_cache::invalidate(external_updater) instead on all ranges of new replicas,
|
||||
// as underlying source will be refreshed and external_updater::execute can refresh the sstable set.
|
||||
// Also serves as a protection for clearing the cache on the new range, although it shouldn't be a
|
||||
// problem as fresh node won't have any data in new range and migration cleanup invalidates the
|
||||
// range being moved away.
|
||||
if (tablet_migrating_in) {
|
||||
refresh_mutation_source();
|
||||
}
|
||||
co_return;
|
||||
}
|
||||
|
||||
@@ -2170,8 +2195,13 @@ future<> table::update_effective_replication_map(locator::effective_replication_
|
||||
|
||||
auto old_erm = std::exchange(_erm, std::move(erm));
|
||||
|
||||
auto refresh_mutation_source = [this] {
|
||||
refresh_compound_sstable_set();
|
||||
_cache.refresh_snapshot();
|
||||
};
|
||||
|
||||
if (uses_tablets()) {
|
||||
co_await _sg_manager->update_effective_replication_map(*_erm);
|
||||
co_await _sg_manager->update_effective_replication_map(*_erm, refresh_mutation_source);
|
||||
}
|
||||
if (old_erm) {
|
||||
old_erm->invalidate();
|
||||
|
||||
@@ -1853,7 +1853,7 @@ class schema_ptr:
|
||||
return self.ptr[item]
|
||||
|
||||
def is_system(self):
|
||||
return self.ks_name in ["system", "system_schema", "system_distributed", "system_traces", "system_auth", "system_auth_v2", "audit"]
|
||||
return self.ks_name in ["system", "system_schema", "system_distributed", "system_traces", "system_auth", "audit"]
|
||||
|
||||
|
||||
class scylla_active_sstables(gdb.Command):
|
||||
|
||||
@@ -159,7 +159,7 @@ void migration_manager::init_messaging_service()
|
||||
auto cm = co_await db::schema_tables::convert_schema_to_mutations(proxy, features);
|
||||
if (options->group0_snapshot_transfer) {
|
||||
cm.emplace_back(co_await db::system_keyspace::get_group0_history(db));
|
||||
if (proxy.local().local_db().get_config().check_experimental(db::experimental_features_t::feature::TABLETS)) {
|
||||
if (proxy.local().local_db().get_config().enable_tablets()) {
|
||||
for (auto&& m: co_await replica::read_tablet_mutations(db)) {
|
||||
cm.emplace_back(std::move(m));
|
||||
}
|
||||
@@ -883,7 +883,7 @@ future<std::vector<mutation>> prepare_new_view_announcement(storage_proxy& sp, v
|
||||
}
|
||||
mlogger.info("Create new view: {}", view);
|
||||
return seastar::async([&db, keyspace = std::move(keyspace), &sp, view = std::move(view), ts] {
|
||||
auto mutations = db::schema_tables::make_create_view_mutations(keyspace, std::move(view), ts);
|
||||
auto mutations = db::schema_tables::make_create_view_mutations(keyspace, view, ts);
|
||||
// We don't have a separate on_before_create_view() listener to
|
||||
// call. But a view is also a column family, and we need to call
|
||||
// the on_before_create_column_family listener - notably, to
|
||||
@@ -954,18 +954,19 @@ future<> migration_manager::push_schema_mutation(const gms::inet_address& endpoi
|
||||
return _messaging.send_definitions_update(id, std::vector<frozen_mutation>{}, std::move(cm));
|
||||
}
|
||||
|
||||
template<typename mutation_type>
|
||||
future<> migration_manager::announce_with_raft(std::vector<mutation> schema, group0_guard guard, std::string_view description) {
|
||||
assert(this_shard_id() == 0);
|
||||
auto schema_features = _feat.cluster_schema_features();
|
||||
auto adjusted_schema = db::schema_tables::adjust_schema_for_schema_features(std::move(schema), schema_features);
|
||||
|
||||
auto group0_cmd = _group0_client.prepare_command(
|
||||
schema_change{
|
||||
.mutations{adjusted_schema.begin(), adjusted_schema.end()},
|
||||
mutation_type {
|
||||
.mutations{adjusted_schema.begin(), adjusted_schema.end()},
|
||||
},
|
||||
guard, std::move(description));
|
||||
|
||||
co_return co_await _group0_client.add_entry(std::move(group0_cmd), std::move(guard), &_as, raft_timeout{});
|
||||
return _group0_client.add_entry(std::move(group0_cmd), std::move(guard), &_as);
|
||||
}
|
||||
|
||||
future<> migration_manager::announce_without_raft(std::vector<mutation> schema, group0_guard guard) {
|
||||
@@ -1027,6 +1028,7 @@ static void add_committed_by_group0_flag(std::vector<mutation>& schema, const gr
|
||||
}
|
||||
|
||||
// Returns a future on the local application of the schema
|
||||
template<typename mutation_type>
|
||||
future<> migration_manager::announce(std::vector<mutation> schema, group0_guard guard, std::string_view description) {
|
||||
if (_feat.group0_schema_versioning) {
|
||||
schema.push_back(make_group0_schema_version_mutation(_storage_proxy.data_dictionary(), guard));
|
||||
@@ -1034,11 +1036,20 @@ future<> migration_manager::announce(std::vector<mutation> schema, group0_guard
|
||||
}
|
||||
|
||||
if (guard.with_raft()) {
|
||||
return announce_with_raft(std::move(schema), std::move(guard), std::move(description));
|
||||
return announce_with_raft<mutation_type>(std::move(schema), std::move(guard), std::move(description));
|
||||
} else {
|
||||
return announce_without_raft(std::move(schema), std::move(guard));
|
||||
}
|
||||
}
|
||||
template
|
||||
future<> migration_manager::announce_with_raft<schema_change>(std::vector<mutation> schema, group0_guard, std::string_view description);
|
||||
template
|
||||
future<> migration_manager::announce_with_raft<topology_change>(std::vector<mutation> schema, group0_guard, std::string_view description);
|
||||
|
||||
template
|
||||
future<> migration_manager::announce<schema_change>(std::vector<mutation> schema, group0_guard, std::string_view description);
|
||||
template
|
||||
future<> migration_manager::announce<topology_change>(std::vector<mutation> schema, group0_guard, std::string_view description);
|
||||
|
||||
future<group0_guard> migration_manager::start_group0_operation() {
|
||||
assert(this_shard_id() == 0);
|
||||
|
||||
@@ -136,6 +136,7 @@ public:
|
||||
|
||||
// Apply a group 0 change.
|
||||
// The future resolves after the change is applied locally.
|
||||
template<typename mutation_type = schema_change>
|
||||
future<> announce(std::vector<mutation> schema, group0_guard, std::string_view description);
|
||||
|
||||
void passive_announce(table_schema_version version);
|
||||
@@ -164,6 +165,7 @@ private:
|
||||
|
||||
future<> maybe_schedule_schema_pull(const table_schema_version& their_version, const gms::inet_address& endpoint);
|
||||
|
||||
template<typename mutation_type = schema_change>
|
||||
future<> announce_with_raft(std::vector<mutation> schema, group0_guard, std::string_view description);
|
||||
future<> announce_without_raft(std::vector<mutation> schema, group0_guard);
|
||||
|
||||
@@ -193,6 +195,17 @@ public:
|
||||
void set_concurrent_ddl_retries(size_t);
|
||||
};
|
||||
|
||||
extern template
|
||||
future<> migration_manager::announce_with_raft<schema_change>(std::vector<mutation> schema, group0_guard, std::string_view description);
|
||||
extern template
|
||||
future<> migration_manager::announce_with_raft<topology_change>(std::vector<mutation> schema, group0_guard, std::string_view description);
|
||||
|
||||
extern template
|
||||
future<> migration_manager::announce<schema_change>(std::vector<mutation> schema, group0_guard, std::string_view description);
|
||||
extern template
|
||||
future<> migration_manager::announce<topology_change>(std::vector<mutation> schema, group0_guard, std::string_view description);
|
||||
|
||||
|
||||
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, table_id, table_schema_version v);
|
||||
|
||||
std::vector<mutation> prepare_keyspace_update_announcement(replica::database& db, lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type ts);
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "db/consistency_level_type.hh"
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "types/types.hh"
|
||||
@@ -55,7 +56,7 @@ future<> raft_service_level_distributed_data_accessor::do_raft_command(service::
|
||||
co_await _group0_client.add_entry(std::move(group0_cmd), std::move(guard), &as);
|
||||
}
|
||||
|
||||
future<> raft_service_level_distributed_data_accessor::set_service_level(sstring service_level_name, qos::service_level_options slo, std::optional<service::group0_guard> guard, abort_source& as) const {
|
||||
static void validate_state(const service::raft_group0_client& group0_client, const std::optional<service::group0_guard>& guard) {
|
||||
if (this_shard_id() != 0) {
|
||||
on_internal_error(logger, "raft_service_level_distributed_data_accessor: must be executed on shard 0");
|
||||
}
|
||||
@@ -63,6 +64,14 @@ future<> raft_service_level_distributed_data_accessor::set_service_level(sstring
|
||||
if (!guard) {
|
||||
on_internal_error(logger, "raft_service_level_distributed_data_accessor: guard must be present");
|
||||
}
|
||||
|
||||
if (group0_client.in_recovery()) {
|
||||
throw exceptions::invalid_request_exception("The cluster is in recovery mode. Changes to service levels are not allowed.");
|
||||
}
|
||||
}
|
||||
|
||||
future<> raft_service_level_distributed_data_accessor::set_service_level(sstring service_level_name, qos::service_level_options slo, std::optional<service::group0_guard> guard, abort_source& as) const {
|
||||
validate_state(_group0_client, guard);
|
||||
|
||||
static sstring insert_query = format("INSERT INTO {}.{} (service_level, timeout, workload_type) VALUES (?, ?, ?);", db::system_keyspace::NAME, db::system_keyspace::SERVICE_LEVELS_V2);
|
||||
data_value workload = slo.workload == qos::service_level_options::workload_type::unspecified
|
||||
@@ -81,13 +90,7 @@ future<> raft_service_level_distributed_data_accessor::drop_service_level(sstrin
|
||||
guard = co_await _group0_client.start_operation(&as);
|
||||
}
|
||||
|
||||
if (this_shard_id() != 0) {
|
||||
on_internal_error(logger, "raft_service_level_distributed_data_accessor: must be executed on shard 0");
|
||||
}
|
||||
|
||||
if (!guard) {
|
||||
on_internal_error(logger, "raft_service_level_distributed_data_accessor: guard must be present");
|
||||
}
|
||||
validate_state(_group0_client, guard);
|
||||
|
||||
static sstring delete_query = format("DELETE FROM {}.{} WHERE service_level= ?;", db::system_keyspace::NAME, db::system_keyspace::SERVICE_LEVELS_V2);
|
||||
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
#include "service/raft/group0_state_machine.hh"
|
||||
#include "db/system_auth_keyspace.hh"
|
||||
#include "mutation/atomic_cell.hh"
|
||||
#include "cql3/selection/selection.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
@@ -169,6 +168,11 @@ future<> group0_state_machine::merge_and_apply(group0_state_machine_merger& merg
|
||||
co_await write_mutations_to_database(_sp, cmd.creator_addr, std::move(chng.mutations));
|
||||
co_await _ss.topology_transition();
|
||||
},
|
||||
[&] (mixed_change& chng) -> future<> {
|
||||
co_await _mm.merge_schema_from(netw::messaging_service::msg_addr(std::move(cmd.creator_addr)), std::move(chng.mutations));
|
||||
co_await _ss.topology_transition();
|
||||
co_return;
|
||||
},
|
||||
[&] (write_mutations& muts) -> future<> {
|
||||
return write_mutations_to_database(_sp, cmd.creator_addr, std::move(muts.mutations));
|
||||
}
|
||||
@@ -274,7 +278,7 @@ future<> group0_state_machine::transfer_snapshot(raft::server_id from_id, raft::
|
||||
std::optional<service::raft_snapshot> raft_snp;
|
||||
|
||||
if (_topology_change_enabled) {
|
||||
auto auth_tables = db::system_auth_keyspace::all_tables();
|
||||
auto auth_tables = db::system_keyspace::auth_tables();
|
||||
std::vector<table_id> tables;
|
||||
tables.reserve(3);
|
||||
tables.push_back(db::system_keyspace::topology()->id());
|
||||
|
||||
@@ -46,6 +46,12 @@ struct topology_change {
|
||||
std::vector<canonical_mutation> mutations;
|
||||
};
|
||||
|
||||
// Allows executing combined topology & schema mutations under a single RAFT command.
|
||||
// The order of the mutations doesn't matter.
|
||||
struct mixed_change {
|
||||
std::vector<canonical_mutation> mutations;
|
||||
};
|
||||
|
||||
// This command is used to write data to tables other than topology or
|
||||
// schema tables and it doesn't update any in-memory data structures.
|
||||
struct write_mutations {
|
||||
@@ -53,7 +59,7 @@ struct write_mutations {
|
||||
};
|
||||
|
||||
struct group0_command {
|
||||
std::variant<schema_change, broadcast_table_query, topology_change, write_mutations> change;
|
||||
std::variant<schema_change, broadcast_table_query, topology_change, write_mutations, mixed_change> change;
|
||||
|
||||
// Mutation of group0 history table, appending a new state ID and optionally a description.
|
||||
canonical_mutation history_append;
|
||||
|
||||
@@ -78,6 +78,9 @@ std::vector<canonical_mutation>& group0_state_machine_merger::get_command_mutati
|
||||
[] (topology_change& chng) -> std::vector<canonical_mutation>& {
|
||||
return chng.mutations;
|
||||
},
|
||||
[] (mixed_change& chng) -> std::vector<canonical_mutation>& {
|
||||
return chng.mutations;
|
||||
},
|
||||
[] (write_mutations& muts) -> std::vector<canonical_mutation>& {
|
||||
return muts.mutations;
|
||||
}
|
||||
|
||||
@@ -138,8 +138,9 @@ raft_group0::raft_group0(seastar::abort_source& abort_source,
|
||||
gms::gossiper& gs,
|
||||
gms::feature_service& feat,
|
||||
db::system_keyspace& sys_ks,
|
||||
raft_group0_client& client)
|
||||
: _abort_source(abort_source), _raft_gr(raft_gr), _ms(ms), _gossiper(gs), _feat(feat), _sys_ks(sys_ks), _client(client)
|
||||
raft_group0_client& client,
|
||||
seastar::scheduling_group sg)
|
||||
: _abort_source(abort_source), _raft_gr(raft_gr), _ms(ms), _gossiper(gs), _feat(feat), _sys_ks(sys_ks), _client(client), _sg(sg)
|
||||
, _status_for_monitoring(status_for_monitoring::normal)
|
||||
{
|
||||
register_metrics();
|
||||
@@ -396,7 +397,10 @@ future<> raft_group0::start_server_for_group0(raft::group_id group0_id, service:
|
||||
auto srv_for_group0 = create_server_for_group0(group0_id, my_id, ss, qp, mm, topology_change_enabled);
|
||||
auto& persistence = srv_for_group0.persistence;
|
||||
auto& server = *srv_for_group0.server;
|
||||
co_await _raft_gr.start_server_for_group(std::move(srv_for_group0));
|
||||
co_await with_scheduling_group(_sg, [this, srv_for_group0 = std::move(srv_for_group0)] () mutable {
|
||||
return _raft_gr.start_server_for_group(std::move(srv_for_group0));
|
||||
});
|
||||
|
||||
_group0.emplace<raft::group_id>(group0_id);
|
||||
|
||||
// Fix for scylladb/scylladb#16683:
|
||||
|
||||
@@ -101,6 +101,7 @@ class raft_group0 {
|
||||
gms::feature_service& _feat;
|
||||
db::system_keyspace& _sys_ks;
|
||||
raft_group0_client& _client;
|
||||
seastar::scheduling_group _sg;
|
||||
|
||||
// Status of leader discovery. Initially there is no group 0,
|
||||
// and the variant contains no state. During initial cluster
|
||||
@@ -140,7 +141,8 @@ public:
|
||||
gms::gossiper& gs,
|
||||
gms::feature_service& feat,
|
||||
db::system_keyspace& sys_ks,
|
||||
raft_group0_client& client);
|
||||
raft_group0_client& client,
|
||||
seastar::scheduling_group sg);
|
||||
|
||||
// Initialises RPC verbs on all shards.
|
||||
// Call after construction but before using the object.
|
||||
@@ -294,6 +296,11 @@ public:
|
||||
|
||||
const raft_address_map& address_map() const;
|
||||
raft_address_map& modifiable_address_map();
|
||||
|
||||
// Returns scheduling group group0 is configured to run with
|
||||
seastar::scheduling_group get_scheduling_group() {
|
||||
return _sg;
|
||||
}
|
||||
private:
|
||||
static void init_rpc_verbs(raft_group0& shard0_this);
|
||||
static future<> uninit_rpc_verbs(netw::messaging_service& ms);
|
||||
|
||||
@@ -297,7 +297,7 @@ future<group0_guard> raft_group0_client::start_operation(seastar::abort_source*
|
||||
}
|
||||
|
||||
template<typename Command>
|
||||
requires std::same_as<Command, schema_change> || std::same_as<Command, topology_change> || std::same_as<Command, write_mutations>
|
||||
requires std::same_as<Command, schema_change> || std::same_as<Command, topology_change> || std::same_as<Command, write_mutations> || std::same_as<Command, mixed_change>
|
||||
group0_command raft_group0_client::prepare_command(Command change, group0_guard& guard, std::string_view description) {
|
||||
group0_command group0_cmd {
|
||||
.change{std::move(change)},
|
||||
@@ -501,5 +501,6 @@ template group0_command raft_group0_client::prepare_command(topology_change chan
|
||||
template group0_command raft_group0_client::prepare_command(write_mutations change, group0_guard& guard, std::string_view description);
|
||||
template group0_command raft_group0_client::prepare_command(broadcast_table_query change, std::string_view description);
|
||||
template group0_command raft_group0_client::prepare_command(write_mutations change, std::string_view description);
|
||||
template group0_command raft_group0_client::prepare_command(mixed_change change, group0_guard& guard, std::string_view description);
|
||||
|
||||
}
|
||||
|
||||
@@ -137,7 +137,7 @@ public:
|
||||
requires std::same_as<Command, broadcast_table_query> || std::same_as<Command, write_mutations>
|
||||
group0_command prepare_command(Command change, std::string_view description);
|
||||
template<typename Command>
|
||||
requires std::same_as<Command, schema_change> || std::same_as<Command, topology_change> || std::same_as<Command, write_mutations>
|
||||
requires std::same_as<Command, schema_change> || std::same_as<Command, topology_change> || std::same_as<Command, write_mutations> || std::same_as<Command, mixed_change>
|
||||
group0_command prepare_command(Command change, group0_guard& guard, std::string_view description);
|
||||
// Checks maximum allowed serialized command size, server rejects bigger commands with command_is_too_big_error exception
|
||||
size_t max_command_size() const;
|
||||
|
||||
@@ -2378,6 +2378,21 @@ bool storage_proxy::need_throttle_writes() const {
|
||||
}
|
||||
|
||||
void storage_proxy::unthrottle() {
|
||||
// Here, we garbage-collect (from _throttled_writes) the response IDs which are no longer
|
||||
// relevant, because their handlers are gone.
|
||||
//
|
||||
// need_throttle_writes() may remain true for an indefinite amount of time, so without this piece of code,
|
||||
// _throttled_writes might also grow without any limit. We saw this happen in a throughput test once.
|
||||
//
|
||||
// Note that we only remove the irrelevant entries which are in front of the list.
|
||||
// We don't touch the middle of the list, so an irrelevant ID will still remain in the list if there is some
|
||||
// earlier ID which is still relevant. But since writes should have some reasonable finite timeout,
|
||||
// we assume that it's not a problem.
|
||||
//
|
||||
while (!_throttled_writes.empty() && !_response_handlers.contains(_throttled_writes.front())) {
|
||||
_throttled_writes.pop_front();
|
||||
}
|
||||
|
||||
while(!need_throttle_writes() && !_throttled_writes.empty()) {
|
||||
auto id = _throttled_writes.front();
|
||||
_throttled_writes.pop_front();
|
||||
@@ -6603,8 +6618,8 @@ void storage_proxy::on_join_cluster(const gms::inet_address& endpoint) {};
|
||||
|
||||
void storage_proxy::on_leave_cluster(const gms::inet_address& endpoint, const locator::host_id& hid) {
|
||||
// Discarding these futures is safe. They're awaited by db::hints::manager::stop().
|
||||
(void) _hints_manager.drain_for(hid);
|
||||
(void) _hints_for_views_manager.drain_for(hid);
|
||||
(void) _hints_manager.drain_for(hid, endpoint);
|
||||
(void) _hints_for_views_manager.drain_for(hid, endpoint);
|
||||
}
|
||||
|
||||
void storage_proxy::on_up(const gms::inet_address& endpoint) {};
|
||||
|
||||
@@ -11,7 +11,6 @@
|
||||
|
||||
#include "storage_service.hh"
|
||||
#include "compaction/task_manager_module.hh"
|
||||
#include "db/system_auth_keyspace.hh"
|
||||
#include "gc_clock.hh"
|
||||
#include "raft/raft.hh"
|
||||
#include "service/qos/raft_service_level_distributed_data_accessor.hh"
|
||||
@@ -143,7 +142,8 @@ storage_service::storage_service(abort_source& abort_source,
|
||||
sharded<cdc::generation_service>& cdc_gens,
|
||||
sharded<db::view::view_builder>& view_builder,
|
||||
cql3::query_processor& qp,
|
||||
sharded<qos::service_level_controller>& sl_controller)
|
||||
sharded<qos::service_level_controller>& sl_controller,
|
||||
topology_state_machine& topology_state_machine)
|
||||
: _abort_source(abort_source)
|
||||
, _feature_service(feature_service)
|
||||
, _db(db)
|
||||
@@ -171,6 +171,7 @@ storage_service::storage_service(abort_source& abort_source,
|
||||
, _tablet_allocator(tablet_allocator)
|
||||
, _cdc_gens(cdc_gens)
|
||||
, _view_builder(view_builder)
|
||||
, _topology_state_machine(topology_state_machine)
|
||||
{
|
||||
register_metrics();
|
||||
|
||||
@@ -503,20 +504,29 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
|
||||
switch (rs.state) {
|
||||
case node_state::bootstrapping:
|
||||
if (rs.ring.has_value()) {
|
||||
if (ip && !is_me(*ip)) {
|
||||
// Save ip -> id mapping in peers table because we need it on restart, but do not save tokens until owned
|
||||
sys_ks_futures.push_back(_sys_ks.local().update_peer_info(*ip, host_id, {}));
|
||||
}
|
||||
update_topology(host_id, ip, rs);
|
||||
if (_topology_state_machine._topology.normal_nodes.empty()) {
|
||||
// This is the first node in the cluster. Insert the tokens as normal to the token ring early
|
||||
// so we can perform writes to regular 'distributed' tables during the bootstrap procedure
|
||||
// (such as the CDC generation write).
|
||||
// It doesn't break anything to set the tokens to normal early in this single-node case.
|
||||
co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id);
|
||||
} else {
|
||||
tmptr->add_bootstrap_tokens(rs.ring.value().tokens, host_id);
|
||||
co_await update_topology_change_info(tmptr, ::format("bootstrapping node {}/{}", id, ip));
|
||||
if (ip) {
|
||||
if (!is_me(*ip)) {
|
||||
utils::get_local_injector().inject("crash-before-bootstrapping-node-added", [] {
|
||||
rtlogger.error("crash-before-bootstrapping-node-added hit, killing the node");
|
||||
_exit(1);
|
||||
});
|
||||
|
||||
// Save ip -> id mapping in peers table because we need it on restart, but do not save tokens until owned
|
||||
sys_ks_futures.push_back(_sys_ks.local().update_peer_info(*ip, host_id, {}));
|
||||
}
|
||||
update_topology(host_id, ip, rs);
|
||||
if (_topology_state_machine._topology.normal_nodes.empty()) {
|
||||
// This is the first node in the cluster. Insert the tokens as normal to the token ring early
|
||||
// so we can perform writes to regular 'distributed' tables during the bootstrap procedure
|
||||
// (such as the CDC generation write).
|
||||
// It doesn't break anything to set the tokens to normal early in this single-node case.
|
||||
co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id);
|
||||
} else {
|
||||
tmptr->add_bootstrap_tokens(rs.ring.value().tokens, host_id);
|
||||
co_await update_topology_change_info(tmptr, ::format("bootstrapping node {}/{}", id, ip));
|
||||
}
|
||||
} else if (_topology_state_machine._topology.tstate == topology::transition_state::write_both_read_new) {
|
||||
on_internal_error(rtlogger, format("Bootstrapping node {} does not have IP mapping but the topology is in the write_both_read_new state", id));
|
||||
}
|
||||
}
|
||||
break;
|
||||
@@ -633,7 +643,7 @@ future<> storage_service::topology_state_load() {
|
||||
std::unordered_set<raft::server_id> prev_normal = boost::copy_range<std::unordered_set<raft::server_id>>(_topology_state_machine._topology.normal_nodes | boost::adaptors::map_keys);
|
||||
|
||||
std::unordered_set<locator::host_id> tablet_hosts;
|
||||
if (_db.local().get_config().check_experimental(db::experimental_features_t::feature::TABLETS)) {
|
||||
if (_db.local().get_config().enable_tablets()) {
|
||||
tablet_hosts = co_await replica::read_required_hosts(_qp);
|
||||
}
|
||||
|
||||
@@ -651,7 +661,7 @@ future<> storage_service::topology_state_load() {
|
||||
co_await _qp.container().invoke_on_all([] (cql3::query_processor& qp) {
|
||||
// auth-v2 gets enabled when consistent topology changes are enabled
|
||||
// (see topology::upgrade_state_type::done above) as we use the same migration procedure
|
||||
qp.auth_version = db::system_auth_keyspace::version_t::v2;
|
||||
qp.auth_version = db::system_keyspace::auth_version_t::v2;
|
||||
});
|
||||
|
||||
co_await _sl_controller.invoke_on_all([this] (qos::service_level_controller& sl_controller) {
|
||||
@@ -707,7 +717,7 @@ future<> storage_service::topology_state_load() {
|
||||
|
||||
auto nodes_to_notify = co_await sync_raft_topology_nodes(tmptr, std::nullopt, std::move(prev_normal));
|
||||
|
||||
if (_db.local().get_config().check_experimental(db::experimental_features_t::feature::TABLETS)) {
|
||||
if (_db.local().get_config().enable_tablets()) {
|
||||
tmptr->set_tablets(co_await replica::read_tablet_metadata(_qp));
|
||||
tmptr->tablets().set_balancing_enabled(_topology_state_machine._topology.tablet_balancing_enabled);
|
||||
}
|
||||
@@ -773,6 +783,7 @@ future<> storage_service::topology_state_load() {
|
||||
for (const auto& gen_id : _topology_state_machine._topology.committed_cdc_generations) {
|
||||
co_await _cdc_gens.local().handle_cdc_generation(gen_id);
|
||||
if (gen_id == _topology_state_machine._topology.committed_cdc_generations.back()) {
|
||||
co_await _sys_ks.local().update_cdc_generation_id(gen_id);
|
||||
rtlogger.debug("topology_state_load: the last committed CDC generation ID: {}", gen_id);
|
||||
}
|
||||
}
|
||||
@@ -1269,7 +1280,7 @@ future<> storage_service::raft_initialize_discovery_leader(const join_node_reque
|
||||
insert_join_request_mutations.emplace_back(std::move(sl_status_mutation));
|
||||
|
||||
insert_join_request_mutations.emplace_back(
|
||||
co_await _sys_ks.local().make_auth_version_mutation(guard.write_timestamp(), db::system_auth_keyspace::version_t::v2));
|
||||
co_await _sys_ks.local().make_auth_version_mutation(guard.write_timestamp(), db::system_keyspace::auth_version_t::v2));
|
||||
|
||||
topology_change change{std::move(insert_join_request_mutations)};
|
||||
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,
|
||||
@@ -3037,6 +3048,18 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
|
||||
for (auto it = table_erms.begin(); it != table_erms.end(); ) {
|
||||
auto& cf = db.find_column_family(it->first);
|
||||
co_await cf.update_effective_replication_map(std::move(it->second));
|
||||
co_await utils::get_local_injector().inject("delay_after_erm_update", [&cf, &ss] (auto& handler) -> future<> {
|
||||
auto& ss_ = ss;
|
||||
const auto ks_name = handler.get("ks_name");
|
||||
const auto cf_name = handler.get("cf_name");
|
||||
assert(ks_name);
|
||||
assert(cf_name);
|
||||
if (cf.schema()->ks_name() != *ks_name || cf.schema()->cf_name() != *cf_name) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
co_await sleep_abortable(std::chrono::seconds{5}, ss_._abort_source);
|
||||
});
|
||||
if (cf.uses_tablets()) {
|
||||
register_tablet_split_candidate(it->first);
|
||||
}
|
||||
@@ -4504,6 +4527,15 @@ future<sstring> storage_service::wait_for_topology_request_completion(utils::UUI
|
||||
co_return sstring();
|
||||
}
|
||||
|
||||
future<> storage_service::wait_for_topology_not_busy() {
|
||||
auto guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
while (_topology_state_machine._topology.is_busy()) {
|
||||
release_guard(std::move(guard));
|
||||
co_await _topology_state_machine.event.wait();
|
||||
guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
}
|
||||
}
|
||||
|
||||
future<> storage_service::raft_rebuild(sstring source_dc) {
|
||||
auto& raft_server = _group0->group0_server();
|
||||
utils::UUID request_id;
|
||||
@@ -5034,7 +5066,7 @@ future<std::map<token, inet_address>> storage_service::get_tablet_to_endpoint_ma
|
||||
const auto& tmap = tm.tablets().get_tablet_map(table);
|
||||
std::map<token, inet_address> result;
|
||||
for (std::optional<locator::tablet_id> tid = tmap.first_tablet(); tid; tid = tmap.next_tablet(*tid)) {
|
||||
result.emplace(tmap.get_last_token(*tid), tm.get_endpoint_for_host_id(tmap.get_primary_replica(*tid)));
|
||||
result.emplace(tmap.get_last_token(*tid), tm.get_endpoint_for_host_id(tmap.get_primary_replica(*tid).host));
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
co_return result;
|
||||
@@ -5131,7 +5163,7 @@ void storage_service::on_update_tablet_metadata() {
|
||||
}
|
||||
|
||||
future<> storage_service::load_tablet_metadata() {
|
||||
if (!_db.local().get_config().check_experimental(db::experimental_features_t::feature::TABLETS)) {
|
||||
if (!_db.local().get_config().enable_tablets()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return mutate_token_metadata([this] (mutable_token_metadata_ptr tmptr) -> future<> {
|
||||
@@ -5216,7 +5248,7 @@ void storage_service::start_tablet_split_monitor() {
|
||||
if (this_shard_id() != 0) {
|
||||
return;
|
||||
}
|
||||
if (!_db.local().get_config().check_experimental(db::experimental_features_t::feature::TABLETS)) {
|
||||
if (!_db.local().get_config().enable_tablets()) {
|
||||
return;
|
||||
}
|
||||
slogger.info("Starting the tablet split monitor...");
|
||||
@@ -5325,6 +5357,14 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
|
||||
}
|
||||
break;
|
||||
case raft_topology_cmd::command::barrier_and_drain: {
|
||||
if (_topology_state_machine._topology.tstate == topology::transition_state::write_both_read_old) {
|
||||
for (auto& n : _topology_state_machine._topology.transition_nodes) {
|
||||
if (!_group0->address_map().find(n.first)) {
|
||||
rtlogger.error("The topology transition is in a double write state but the IP of the node in transition is not known");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
co_await container().invoke_on_all([version] (storage_service& ss) -> future<> {
|
||||
const auto current_version = ss._shared_token_metadata.get()->get_version();
|
||||
rtlogger.debug("Got raft_topology_cmd::barrier_and_drain, version {}, current version {}",
|
||||
@@ -6018,34 +6058,36 @@ future<locator::load_stats> storage_service::load_stats_for_tablet_based_tables(
|
||||
});
|
||||
}
|
||||
|
||||
using table_erms_t = std::unordered_map<table_id, const locator::effective_replication_map_ptr>;
|
||||
// Creates a snapshot of transitions, so different shards will find their tablet replicas in the
|
||||
// same migration stage. Important for intra-node migration.
|
||||
const auto erms = co_await std::invoke([this] () -> future<table_erms_t> {
|
||||
table_erms_t erms;
|
||||
using table_ids_t = std::unordered_set<table_id>;
|
||||
const auto table_ids = co_await std::invoke([this] () -> future<table_ids_t> {
|
||||
table_ids_t ids;
|
||||
co_await _db.local().get_tables_metadata().for_each_table_gently([&] (table_id id, lw_shared_ptr<replica::table> table) mutable {
|
||||
if (table->uses_tablets()) {
|
||||
erms.emplace(id, table->get_effective_replication_map());
|
||||
ids.insert(id);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
co_return std::move(erms);
|
||||
co_return std::move(ids);
|
||||
});
|
||||
|
||||
// Helps with intra-node migration by serializing with changes to token metadata, so shards
|
||||
// participating in the migration will see migration in same stage, therefore preventing
|
||||
// double accounting (anomaly) in the reported size.
|
||||
auto tmlock = co_await get_token_metadata_lock();
|
||||
|
||||
// Each node combines a per-table load map from all of its shards and returns it to the coordinator.
|
||||
// So if there are 1k nodes, there will be 1k RPCs in total.
|
||||
auto load_stats = co_await _db.map_reduce0([&erms] (replica::database& db) -> future<locator::load_stats> {
|
||||
auto load_stats = co_await _db.map_reduce0([&table_ids] (replica::database& db) -> future<locator::load_stats> {
|
||||
locator::load_stats load_stats{};
|
||||
auto& tables_metadata = db.get_tables_metadata();
|
||||
|
||||
for (const auto& [id, erm] : erms) {
|
||||
for (const auto& id : table_ids) {
|
||||
auto table = tables_metadata.get_table_if_exists(id);
|
||||
if (!table) {
|
||||
continue;
|
||||
}
|
||||
|
||||
auto erm = table->get_effective_replication_map();
|
||||
auto& token_metadata = erm->get_token_metadata();
|
||||
auto& tmap = token_metadata.tablets().get_tablet_map(id);
|
||||
auto me = locator::tablet_replica { token_metadata.get_my_id(), this_shard_id() };
|
||||
|
||||
// It's important to tackle the anomaly in reported size, since both leaving and
|
||||
@@ -6053,7 +6095,7 @@ future<locator::load_stats> storage_service::load_stats_for_tablet_based_tables(
|
||||
// If transition hasn't reached cleanup stage, then leaving replicas are accounted.
|
||||
// If transition is past cleanup stage, then pending replicas are accounted.
|
||||
// This helps to reduce the discrepancy window.
|
||||
auto tablet_filter = [&tmap, &me] (locator::global_tablet_id id) {
|
||||
auto tablet_filter = [&me] (const locator::tablet_map& tmap, locator::global_tablet_id id) {
|
||||
auto transition = tmap.get_tablet_transition_info(id.tablet);
|
||||
auto& info = tmap.get_tablet_info(id.tablet);
|
||||
|
||||
@@ -6064,11 +6106,11 @@ future<locator::load_stats> storage_service::load_stats_for_tablet_based_tables(
|
||||
|
||||
bool is_pending = transition->pending_replica == me;
|
||||
bool is_leaving = locator::get_leaving_replica(info, *transition) == me;
|
||||
auto s = transition->stage;
|
||||
auto s = transition->reads; // read selector
|
||||
|
||||
return (!is_pending && !is_leaving)
|
||||
|| (is_leaving && s < locator::tablet_transition_stage::cleanup)
|
||||
|| (is_pending && s >= locator::tablet_transition_stage::cleanup);
|
||||
|| (is_leaving && s == locator::read_replica_set_selector::previous)
|
||||
|| (is_pending && s == locator::read_replica_set_selector::next);
|
||||
};
|
||||
|
||||
load_stats.tables.emplace(id, table->table_load_stats(tablet_filter));
|
||||
@@ -6143,13 +6185,6 @@ future<> storage_service::set_tablet_balancing_enabled(bool enabled) {
|
||||
while (true) {
|
||||
group0_guard guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
|
||||
while (_topology_state_machine._topology.is_busy()) {
|
||||
rtlogger.debug("set_tablet_balancing_enabled(): topology is busy");
|
||||
release_guard(std::move(guard));
|
||||
co_await _topology_state_machine.event.wait();
|
||||
guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
}
|
||||
|
||||
std::vector<canonical_mutation> updates;
|
||||
updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp())
|
||||
.set_tablet_balancing_enabled(enabled)
|
||||
@@ -6166,6 +6201,11 @@ future<> storage_service::set_tablet_balancing_enabled(bool enabled) {
|
||||
rtlogger.debug("set_tablet_balancing_enabled(): concurrent modification");
|
||||
}
|
||||
}
|
||||
|
||||
while (_topology_state_machine._topology.is_busy()) {
|
||||
rtlogger.debug("set_tablet_balancing_enabled(): topology is busy");
|
||||
co_await _topology_state_machine.event.wait();
|
||||
}
|
||||
}
|
||||
|
||||
future<> storage_service::await_topology_quiesced() {
|
||||
@@ -6179,14 +6219,8 @@ future<> storage_service::await_topology_quiesced() {
|
||||
co_return;
|
||||
}
|
||||
|
||||
group0_guard guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
|
||||
while (_topology_state_machine._topology.is_busy()) {
|
||||
rtlogger.debug("await_topology_quiesced(): topology is busy");
|
||||
release_guard(std::move(guard));
|
||||
co_await _topology_state_machine.event.wait();
|
||||
guard = co_await _group0->client().start_operation(&_group0_as, raft_timeout{});
|
||||
}
|
||||
co_await _group0->group0_server().read_barrier(&_group0_as);
|
||||
co_await _topology_state_machine.await_not_busy();
|
||||
}
|
||||
|
||||
future<join_node_request_result> storage_service::join_node_request_handler(join_node_request_params params) {
|
||||
|
||||
@@ -211,7 +211,8 @@ public:
|
||||
sharded<cdc::generation_service>& cdc_gs,
|
||||
sharded<db::view::view_builder>& view_builder,
|
||||
cql3::query_processor& qp,
|
||||
sharded<qos::service_level_controller>& sl_controller);
|
||||
sharded<qos::service_level_controller>& sl_controller,
|
||||
topology_state_machine& topology_state_machine);
|
||||
|
||||
// Needed by distributed<>
|
||||
future<> stop();
|
||||
@@ -818,8 +819,13 @@ private:
|
||||
// coordinator fiber
|
||||
future<> raft_state_monitor_fiber(raft::server&, sharded<db::system_distributed_keyspace>& sys_dist_ks);
|
||||
|
||||
public:
|
||||
bool topology_global_queue_empty() const {
|
||||
return !_topology_state_machine._topology.global_request.has_value();
|
||||
}
|
||||
private:
|
||||
// State machine that is responsible for topology change
|
||||
topology_state_machine _topology_state_machine;
|
||||
topology_state_machine& _topology_state_machine;
|
||||
|
||||
future<> _topology_change_coordinator = make_ready_future<>();
|
||||
future<> topology_change_coordinator_fiber(raft::server&, raft::term_t, cdc::generation_service&, sharded<db::system_distributed_keyspace>&, abort_source&);
|
||||
@@ -895,6 +901,11 @@ public:
|
||||
// It is incompatible with the `join_cluster` method.
|
||||
future<> start_maintenance_mode();
|
||||
|
||||
// Waits for a topology request with a given ID to complete and return non empty error string
|
||||
// if request completes with an error
|
||||
future<sstring> wait_for_topology_request_completion(utils::UUID id);
|
||||
future<> wait_for_topology_not_busy();
|
||||
|
||||
private:
|
||||
future<std::vector<canonical_mutation>> get_system_mutations(schema_ptr schema);
|
||||
future<std::vector<canonical_mutation>> get_system_mutations(const sstring& ks_name, const sstring& cf_name);
|
||||
@@ -931,9 +942,6 @@ private:
|
||||
|
||||
future<> _sstable_cleanup_fiber = make_ready_future<>();
|
||||
future<> sstable_cleanup_fiber(raft::server& raft, sharded<service::storage_proxy>& proxy) noexcept;
|
||||
// Waits for a topology request with a given ID to complete and return non empty error string
|
||||
// if request completes with an error
|
||||
future<sstring> wait_for_topology_request_completion(utils::UUID id);
|
||||
|
||||
// We need to be able to abort all group0 operation during shutdown, so we need special abort source for that
|
||||
abort_source _group0_as;
|
||||
|
||||
@@ -53,6 +53,7 @@ struct load_balancer_cluster_stats {
|
||||
using dc_name = sstring;
|
||||
|
||||
class load_balancer_stats_manager {
|
||||
sstring group_name;
|
||||
std::unordered_map<dc_name, std::unique_ptr<load_balancer_dc_stats>> _dc_stats;
|
||||
std::unordered_map<host_id, std::unique_ptr<load_balancer_node_stats>> _node_stats;
|
||||
load_balancer_cluster_stats _cluster_stats;
|
||||
@@ -63,7 +64,7 @@ class load_balancer_stats_manager {
|
||||
void setup_metrics(const dc_name& dc, load_balancer_dc_stats& stats) {
|
||||
namespace sm = seastar::metrics;
|
||||
auto dc_lb = dc_label(dc);
|
||||
_metrics.add_group("load_balancer", {
|
||||
_metrics.add_group(group_name, {
|
||||
sm::make_counter("calls", sm::description("number of calls to the load balancer"),
|
||||
stats.calls)(dc_lb),
|
||||
sm::make_counter("migrations_produced", sm::description("number of migrations produced by the load balancer"),
|
||||
@@ -77,7 +78,7 @@ class load_balancer_stats_manager {
|
||||
namespace sm = seastar::metrics;
|
||||
auto dc_lb = dc_label(dc);
|
||||
auto node_lb = node_label(node);
|
||||
_metrics.add_group("load_balancer", {
|
||||
_metrics.add_group(group_name, {
|
||||
sm::make_gauge("load", sm::description("node load during last load balancing"),
|
||||
stats.load)(dc_lb)(node_lb)
|
||||
});
|
||||
@@ -86,7 +87,7 @@ class load_balancer_stats_manager {
|
||||
void setup_metrics(load_balancer_cluster_stats& stats) {
|
||||
namespace sm = seastar::metrics;
|
||||
// FIXME: we can probably improve it by making it per resize type (split, merge or none).
|
||||
_metrics.add_group("load_balancer", {
|
||||
_metrics.add_group(group_name, {
|
||||
sm::make_counter("resizes_emitted", sm::description("number of resizes produced by the load balancer"),
|
||||
stats.resizes_emitted),
|
||||
sm::make_counter("resizes_revoked", sm::description("number of resizes revoked by the load balancer"),
|
||||
@@ -96,7 +97,9 @@ class load_balancer_stats_manager {
|
||||
});
|
||||
}
|
||||
public:
|
||||
load_balancer_stats_manager() {
|
||||
load_balancer_stats_manager(sstring group_name):
|
||||
group_name(std::move(group_name))
|
||||
{
|
||||
setup_metrics(_cluster_stats);
|
||||
}
|
||||
|
||||
@@ -216,10 +219,32 @@ class load_balancer {
|
||||
size_t streaming_write_load = 0;
|
||||
|
||||
// Tablets which still have a replica on this shard which are candidates for migrating away from this shard.
|
||||
std::unordered_set<global_tablet_id> candidates;
|
||||
// Grouped by table. Used when _use_table_aware_balancing == true.
|
||||
// The set of candidates per table may be empty.
|
||||
std::unordered_map<table_id, std::unordered_set<global_tablet_id>> candidates;
|
||||
// For all tables. Used when _use_table_aware_balancing == false.
|
||||
std::unordered_set<global_tablet_id> candidates_all_tables;
|
||||
|
||||
future<> clear_gently() {
|
||||
return utils::clear_gently(candidates);
|
||||
co_await utils::clear_gently(candidates);
|
||||
co_await utils::clear_gently(candidates_all_tables);
|
||||
}
|
||||
|
||||
bool has_candidates() const {
|
||||
for (const auto& [table, tablets] : candidates) {
|
||||
if (!tablets.empty()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return !candidates_all_tables.empty();
|
||||
}
|
||||
|
||||
size_t candidate_count() const {
|
||||
size_t result = 0;
|
||||
for (const auto& [table, tablets] : candidates) {
|
||||
result += tablets.size();
|
||||
}
|
||||
return result + candidates_all_tables.size();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -428,6 +453,7 @@ class load_balancer {
|
||||
locator::load_stats_ptr _table_load_stats;
|
||||
load_balancer_stats_manager& _stats;
|
||||
std::unordered_set<host_id> _skiplist;
|
||||
bool _use_table_aware_balancing = true;
|
||||
private:
|
||||
tablet_replica_set get_replicas_for_tablet_load(const tablet_info& ti, const tablet_transition_info* trinfo) const {
|
||||
// We reflect migrations in the load as if they already happened,
|
||||
@@ -490,6 +516,10 @@ public:
|
||||
co_return std::move(plan);
|
||||
}
|
||||
|
||||
void set_use_table_aware_balancing(bool use_table_aware_balancing) {
|
||||
_use_table_aware_balancing = use_table_aware_balancing;
|
||||
}
|
||||
|
||||
const locator::table_load_stats* load_stats_for_table(table_id id) const {
|
||||
if (!_table_load_stats) {
|
||||
return nullptr;
|
||||
@@ -649,10 +679,60 @@ public:
|
||||
return utils::get_local_injector().enter("tablet_allocator_shuffle");
|
||||
}
|
||||
|
||||
shard_id rand_shard(shard_id shard_count) const {
|
||||
size_t rand_int() const {
|
||||
static thread_local std::default_random_engine re{std::random_device{}()};
|
||||
static thread_local std::uniform_int_distribution<shard_id> dist;
|
||||
return dist(re) % shard_count;
|
||||
static thread_local std::uniform_int_distribution<size_t> dist;
|
||||
return dist(re);
|
||||
}
|
||||
|
||||
shard_id rand_shard(shard_id shard_count) const {
|
||||
return rand_int() % shard_count;
|
||||
}
|
||||
|
||||
table_id pick_table(const std::unordered_map<table_id, std::unordered_set<global_tablet_id>>& candidates) {
|
||||
if (!_use_table_aware_balancing) {
|
||||
on_internal_error(lblogger, "pick_table() called when table-aware balancing is disabled");
|
||||
}
|
||||
size_t total = 0;
|
||||
for (auto&& [table, tablets] : candidates) {
|
||||
total += tablets.size();
|
||||
}
|
||||
ssize_t candidate_index = rand_int() % total;
|
||||
for (auto&& [table, tablets] : candidates) {
|
||||
candidate_index -= tablets.size();
|
||||
if (candidate_index <= 0 && !tablets.empty()) {
|
||||
return table;
|
||||
}
|
||||
}
|
||||
on_internal_error(lblogger, "No candidate table");
|
||||
}
|
||||
|
||||
global_tablet_id peek_candidate(shard_load& shard_info) {
|
||||
if (_use_table_aware_balancing) {
|
||||
auto table = pick_table(shard_info.candidates);
|
||||
return *shard_info.candidates[table].begin();
|
||||
}
|
||||
|
||||
return *shard_info.candidates_all_tables.begin();
|
||||
}
|
||||
|
||||
void erase_candidate(shard_load& shard_info, global_tablet_id tablet) {
|
||||
if (_use_table_aware_balancing) {
|
||||
shard_info.candidates[tablet.table].erase(tablet);
|
||||
if (shard_info.candidates[tablet.table].empty()) {
|
||||
shard_info.candidates.erase(tablet.table);
|
||||
}
|
||||
} else {
|
||||
shard_info.candidates_all_tables.erase(tablet);
|
||||
}
|
||||
}
|
||||
|
||||
void add_candidate(shard_load& shard_info, global_tablet_id tablet) {
|
||||
if (_use_table_aware_balancing) {
|
||||
shard_info.candidates[tablet.table].insert(tablet);
|
||||
} else {
|
||||
shard_info.candidates_all_tables.insert(tablet);
|
||||
}
|
||||
}
|
||||
|
||||
future<migration_plan> make_node_plan(node_load_map& nodes, host_id host, node_load& node_load) {
|
||||
@@ -721,7 +801,7 @@ public:
|
||||
break;
|
||||
}
|
||||
|
||||
if (src_info.candidates.empty()) {
|
||||
if (!src_info.has_candidates()) {
|
||||
lblogger.debug("No more candidates on shard {} of {}", src, host);
|
||||
max_load = std::max(max_load, src_info.tablet_count);
|
||||
src_shards.pop_back();
|
||||
@@ -729,7 +809,7 @@ public:
|
||||
continue;
|
||||
}
|
||||
|
||||
auto tablet = *src_info.candidates.begin();
|
||||
global_tablet_id tablet = peek_candidate(src_info);
|
||||
|
||||
// Emit migration.
|
||||
|
||||
@@ -753,7 +833,7 @@ public:
|
||||
|
||||
for (auto&& r : src_tinfo.replicas) {
|
||||
if (nodes.contains(r.host)) {
|
||||
nodes[r.host].shards[r.shard].candidates.erase(tablet);
|
||||
erase_candidate(nodes[r.host].shards[r.shard], tablet);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -810,8 +890,8 @@ public:
|
||||
if (lblogger.is_enabled(seastar::log_level::debug)) {
|
||||
shard_id shard = 0;
|
||||
for (auto&& shard_load : node_load.shards) {
|
||||
lblogger.debug("shard {}: all tablets: {}, candidates: {}", tablet_replica{host, shard},
|
||||
shard_load.tablet_count, shard_load.candidates.size());
|
||||
lblogger.debug("shard {}: all tablets: {}, candidates: {}", tablet_replica {host, shard},
|
||||
shard_load.tablet_count, shard_load.candidate_count());
|
||||
shard++;
|
||||
}
|
||||
}
|
||||
@@ -865,7 +945,7 @@ public:
|
||||
auto src_shard = src_node_info.shards_by_load.back();
|
||||
auto src = tablet_replica{src_host, src_shard};
|
||||
auto&& src_shard_info = src_node_info.shards[src_shard];
|
||||
if (src_shard_info.candidates.empty()) {
|
||||
if (!src_shard_info.has_candidates()) {
|
||||
lblogger.debug("shard {} ran out of candidates with {} tablets remaining.", src, src_shard_info.tablet_count);
|
||||
src_node_info.shards_by_load.pop_back();
|
||||
continue;
|
||||
@@ -874,8 +954,9 @@ public:
|
||||
std::push_heap(src_node_info.shards_by_load.begin(), src_node_info.shards_by_load.end(), src_node_info.shards_by_load_cmp());
|
||||
});
|
||||
|
||||
auto source_tablet = *src_shard_info.candidates.begin();
|
||||
src_shard_info.candidates.erase(source_tablet);
|
||||
global_tablet_id source_tablet = peek_candidate(src_shard_info);
|
||||
erase_candidate(src_shard_info, source_tablet);
|
||||
|
||||
auto& tmap = tmeta.get_tablet_map(source_tablet.table);
|
||||
|
||||
// Pick a target node.
|
||||
@@ -1062,7 +1143,7 @@ public:
|
||||
|
||||
for (auto&& r : src_tinfo.replicas) {
|
||||
if (nodes.contains(r.host)) {
|
||||
nodes[r.host].shards[r.shard].candidates.erase(source_tablet);
|
||||
erase_candidate(nodes[r.host].shards[r.shard], source_tablet);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1111,14 +1192,11 @@ public:
|
||||
future<migration_plan> make_plan(dc_name dc) {
|
||||
migration_plan plan;
|
||||
|
||||
_stats.for_dc(dc).calls++;
|
||||
lblogger.info("Examining DC {}", dc);
|
||||
|
||||
// Causes load balancer to move some tablet even though load is balanced.
|
||||
auto shuffle = in_shuffle_mode();
|
||||
if (shuffle) {
|
||||
lblogger.warn("Running without convergence checks");
|
||||
}
|
||||
|
||||
_stats.for_dc(dc).calls++;
|
||||
lblogger.info("Examining DC {} (shuffle={}, balancing={})", dc, shuffle, _tm->tablets().balancing_enabled());
|
||||
|
||||
const locator::topology& topo = _tm->get_topology();
|
||||
|
||||
@@ -1287,7 +1365,7 @@ public:
|
||||
}
|
||||
shard_load_info.tablet_count += 1;
|
||||
if (!trinfo) { // migrating tablets are not candidates
|
||||
shard_load_info.candidates.emplace(global_tablet_id {table, tid});
|
||||
add_candidate(shard_load_info, global_tablet_id {table, tid});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1295,7 +1373,7 @@ public:
|
||||
});
|
||||
}
|
||||
|
||||
if (!nodes_to_drain.empty() || shuffle || (max_load != min_load && _tm->tablets().balancing_enabled())) {
|
||||
if (!nodes_to_drain.empty() || (_tm->tablets().balancing_enabled() && (shuffle || max_load != min_load))) {
|
||||
host_id target = *min_load_node;
|
||||
lblogger.info("target node: {}, avg_load: {}, max: {}", target, min_load, max_load);
|
||||
plan.merge(co_await make_internode_plan(dc, nodes, nodes_to_drain, target));
|
||||
@@ -1319,15 +1397,17 @@ class tablet_allocator_impl : public tablet_allocator::impl
|
||||
replica::database& _db;
|
||||
load_balancer_stats_manager _load_balancer_stats;
|
||||
bool _stopped = false;
|
||||
bool _use_tablet_aware_balancing = true;
|
||||
public:
|
||||
tablet_allocator_impl(tablet_allocator::config cfg, service::migration_notifier& mn, replica::database& db)
|
||||
: _config(std::move(cfg))
|
||||
, _migration_notifier(mn)
|
||||
, _db(db) {
|
||||
, _db(db)
|
||||
, _load_balancer_stats("load_balancer") {
|
||||
if (_config.initial_tablets_scale == 0) {
|
||||
throw std::runtime_error("Initial tablets scale must be positive");
|
||||
}
|
||||
if (db.get_config().check_experimental(db::experimental_features_t::feature::TABLETS)) {
|
||||
if (db.get_config().enable_tablets()) {
|
||||
_migration_notifier.register_listener(this);
|
||||
}
|
||||
}
|
||||
@@ -1345,9 +1425,14 @@ public:
|
||||
|
||||
future<migration_plan> balance_tablets(token_metadata_ptr tm, locator::load_stats_ptr table_load_stats, std::unordered_set<host_id> skiplist) {
|
||||
load_balancer lb(tm, std::move(table_load_stats), _load_balancer_stats, _db.get_config().target_tablet_size_in_bytes(), std::move(skiplist));
|
||||
lb.set_use_table_aware_balancing(_use_tablet_aware_balancing);
|
||||
co_return co_await lb.make_plan();
|
||||
}
|
||||
|
||||
void set_use_tablet_aware_balancing(bool use_tablet_aware_balancing) {
|
||||
_use_tablet_aware_balancing = use_tablet_aware_balancing;
|
||||
}
|
||||
|
||||
void on_before_create_column_family(const keyspace_metadata& ksm, const schema& s, std::vector<mutation>& muts, api::timestamp_type ts) override {
|
||||
locator::replication_strategy_params params(ksm.strategy_options(), ksm.initial_tablets());
|
||||
auto rs = abstract_replication_strategy::create_replication_strategy(ksm.strategy_name(), params);
|
||||
@@ -1427,6 +1512,10 @@ future<migration_plan> tablet_allocator::balance_tablets(locator::token_metadata
|
||||
return impl().balance_tablets(std::move(tm), std::move(load_stats), std::move(skiplist));
|
||||
}
|
||||
|
||||
void tablet_allocator::set_use_table_aware_balancing(bool use_tablet_aware_balancing) {
|
||||
impl().set_use_tablet_aware_balancing(use_tablet_aware_balancing);
|
||||
}
|
||||
|
||||
future<locator::tablet_map> tablet_allocator::split_tablets(locator::token_metadata_ptr tm, table_id table) {
|
||||
return impl().split_tablets(std::move(tm), table);
|
||||
}
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user