If a keyspace has a numeric replication factor (RF) in a DC and RF < #racks,
the replicas of a tablet in this keyspace can be spread over any of the racks
in the DC, and different tablets may use different racks. With a rack list, all
tablet replicas must be placed on the listed racks. Hence, the conversion
requires tablet co-location.
After this series, the conversion can be done with an ALTER KEYSPACE
statement. A statement that performs this conversion in any DC is not
allowed to change the RF in any DC. So, if we have dc1 and dc2 with 3 racks
each and a keyspace ks, then a single ALTER KEYSPACE can do:
- {dc1 : 2} -> {dc1 : [r1, r2]}
- {dc1 : 2, dc2 : 2} -> {dc1 : [r1, r2], dc2 : [r2, r3]}
- {dc1 : 2, dc2 : 2} -> {dc1 : [r1, r2], dc2 : 2}
- {dc1 : 2} -> {dc1 : 2, dc2 : [r1]}
But we cannot do:
- {dc1 : 2} -> {dc1 : [r1, r2, r3]}
- {dc1 : 1, dc2 : [r1, r2]} -> {dc1 : [r1], dc2 : [r1]}
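As a minimal sketch, the first allowed conversion above could look like this
(keyspace, DC and rack names are illustrative, and the rack-list value mirrors
the notation used in the examples):

  CREATE KEYSPACE ks WITH replication =
      {'class': 'NetworkTopologyStrategy', 'dc1': 2}
      AND tablets = {'enabled': true};

  -- convert the numeric RF of 2 in dc1 into an explicit rack list, keeping RF = 2
  ALTER KEYSPACE ks WITH replication =
      {'class': 'NetworkTopologyStrategy', 'dc1': ['r1', 'r2']};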
In order to perform the co-location, the RF change request is paused. The
tablet load balancer examines the paused RF change requests and schedules
the necessary tablet migrations. While co-location is in progress, no other
cross-rack migration is allowed.
The load balancer also checks whether any paused RF change request is
ready to be resumed. If so, it puts the request back into the global topology
request queue.
While an RF change request for a keyspace is running, any other RF change
for this keyspace will fail.
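For example (a sketch; keyspace and rack names are illustrative, and the exact
error surfaced to the second client may differ):

  -- first request is accepted and may be paused while co-location runs
  ALTER KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['r1', 'r2']};

  -- a second RF change for ks, issued while the first is still in progress, is rejected
  ALTER KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['r1', 'r3']};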
Fixes: #26398.
New feature, no backport
Closes scylladb/scylladb#27279
* github.com:scylladb/scylladb:
test: add est_rack_list_conversion_with_two_replicas_in_rack
test: test creating tablet_rack_list_colocation_plan
test: add test_numeric_rf_to_rack_list_conversion test
tasks: service: add global_topology_request_virtual_task
cql3: statements: allow altering from numeric rf to rack list
service: topology_coordinator: pause keyspace_rf_change request
service: implement make_rack_list_colocation_plan
service: add tablet_rack_list_colocation_plan
cql3: reject concurrent alter of the same keyspace
test: check paused rf change requests persistence
db: service: add paused_rf_change_requests to system.topology
service: pass topology and system_keyspace to load_balancer ctor
service: tablet_allocator: extract load updates
service: tablet_allocator: extract ensure_node
tasks, system_keyspace: Introduce get_topology_request_entry_opt()
node_ops: Drop get_pending_ids()
node_ops: Drop redundant get_status_helper()
/*
 *
 * Modified by ScyllaDB
 * Copyright (C) 2015-present ScyllaDB
 *
 */

/*
 * SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
 */

#pragma once

#include <variant>
#include <seastar/core/shared_future.hh>
#include "absl-flat_hash_map.hh"
#include "gms/endpoint_state.hh"
#include "gms/i_endpoint_state_change_subscriber.hh"
#include "schema/schema_fwd.hh"
#include "service/client_routes.hh"
#include "service/endpoint_lifecycle_subscriber.hh"
#include "service/qos/service_level_controller.hh"
#include "service/task_manager_module.hh"
#include "service/topology_guard.hh"
#include "locator/abstract_replication_strategy.hh"
#include "locator/snitch_base.hh"
#include "locator/tablets.hh"
#include "locator/tablet_metadata_guard.hh"
#include "inet_address_vectors.hh"
#include <seastar/core/sharded.hh>
#include <seastar/core/condition-variable.hh>
#include "dht/token_range_endpoints.hh"
#include "gms/application_state.hh"
#include "gms/feature.hh"
#include <seastar/core/semaphore.hh>
#include <seastar/core/gate.hh>
#include "replica/database_fwd.hh"
#include "streaming/stream_reason.hh"
#include <seastar/core/sharded.hh>
#include "service/migration_listener.hh"
#include <seastar/core/metrics_registration.hh>
#include <seastar/core/shared_ptr.hh>
#include "cdc/generation_id.hh"
#include "db/system_keyspace.hh"
#include "raft/raft.hh"
#include "node_ops/id.hh"
#include "raft/server.hh"
#include "service/topology_state_machine.hh"
#include "db/view/view_building_state.hh"
#include "service/tablet_allocator.hh"
#include "service/tablet_operation.hh"
#include "mutation/timestamp.hh"
#include "utils/UUID.hh"
#include "utils/user_provided_param.hh"
#include "utils/sequenced_set.hh"
#include "service/topology_coordinator.hh"

class node_ops_cmd_request;
class node_ops_cmd_response;
struct node_ops_ctl;
class node_ops_info;
enum class node_ops_cmd : uint32_t;
class repair_service;
class protocol_server;

namespace cql3 { class query_processor; }

namespace cql_transport { class controller; }

namespace cdc {
class generation_service;
class metadata;
}

namespace streaming {
class stream_manager;
}

namespace db {
class system_distributed_keyspace;
class system_keyspace;
class batchlog_manager;
namespace view {
class view_builder;
class view_building_worker;
}
namespace schema_tables {
class schema_applier;
}
}

namespace netw {
class messaging_service;
}

namespace dht {
class boot_strapper;
class range_streamer;
}

namespace gms {
class feature_service;
class gossiper;
class loaded_endpoint_state;
};

namespace node_ops {
class node_ops_virtual_task;
class task_manager_module;
}

namespace tasks {
class task_manager;
}

namespace replica {
class tablet_mutation_builder;
}

namespace auth { class cache; }

namespace utils {
class disk_space_monitor;
}

namespace service {

class storage_service;
class storage_proxy;
class migration_manager;
class raft_group0;
class group0_guard;
class group0_info;
class raft_group0_client;
class tablet_virtual_task;
class task_manager_module;

struct join_node_request_params;
struct join_node_request_result;
struct join_node_response_params;
struct join_node_response_result;

enum class disk_error { regular, commit };

class node_ops_meta_data;

using start_hint_manager = seastar::bool_class<class start_hint_manager_tag>;
using loosen_constraints = seastar::bool_class<class loosen_constraints_tag>;

struct token_metadata_change {
    std::vector<locator::mutable_token_metadata_ptr> pending_token_metadata_ptr{smp::count};
    std::vector<std::unordered_map<sstring, locator::static_effective_replication_map_ptr>> pending_effective_replication_maps{smp::count};
    std::vector<std::unordered_map<table_id, locator::effective_replication_map_ptr>> pending_table_erms{smp::count};
    std::vector<std::unordered_map<table_id, locator::effective_replication_map_ptr>> pending_view_erms{smp::count};
    std::unordered_set<session_id> open_sessions;

    future<> destroy();
};

class schema_getter {
public:
    virtual flat_hash_map<sstring, locator::replication_strategy_ptr> get_keyspaces_replication() const = 0;
    virtual future<> for_each_table_schema_gently(std::function<future<>(table_id, schema_ptr)> f) const = 0;
    virtual ~schema_getter() {};
};

/**
 * This abstraction contains the token/identifier of this node
 * on the identifier space. This token gets gossiped around.
 * This class will also maintain histograms of the load information
 * of other nodes in the cluster.
 */
class storage_service : public service::migration_listener, public gms::i_endpoint_state_change_subscriber,
        public seastar::async_sharded_service<storage_service>, public seastar::peering_sharded_service<storage_service> {
private:
    using token = dht::token;
    using token_range_endpoints = dht::token_range_endpoints;
    using endpoint_details = dht::endpoint_details;
    using boot_strapper = dht::boot_strapper;
    using token_metadata = locator::token_metadata;
    using shared_token_metadata = locator::shared_token_metadata;
    using token_metadata_ptr = locator::token_metadata_ptr;
    using mutable_token_metadata_ptr = locator::mutable_token_metadata_ptr;
    using token_metadata_lock = locator::token_metadata_lock;
    using application_state = gms::application_state;
    using inet_address = gms::inet_address;
    using versioned_value = gms::versioned_value;

    struct tablet_operation {
        sstring name;
        shared_future<service::tablet_operation_result> done;
    };

    using tablet_op_registry = std::unordered_map<locator::global_tablet_id, tablet_operation>;

    abort_source& _abort_source;
    gms::feature_service& _feature_service;
    sharded<replica::database>& _db;
    gms::gossiper& _gossiper;
    sharded<netw::messaging_service>& _messaging;
    sharded<service::migration_manager>& _migration_manager;
    cql3::query_processor& _qp;
    sharded<repair_service>& _repair;
    sharded<streaming::stream_manager>& _stream_manager;
    sharded<locator::snitch_ptr>& _snitch;
    sharded<qos::service_level_controller>& _sl_controller;
    auth::cache& _auth_cache;
    sharded<client_routes_service>& _client_routes;

    // Engaged on shard 0 before `join_cluster`.
    service::raft_group0* _group0;

    sstring _operation_in_progress;
    seastar::metrics::metric_groups _metrics;
    using client_shutdown_hook = noncopyable_function<void()>;
    std::vector<protocol_server*> _protocol_servers;
    std::vector<std::any> _listeners;
    gms::feature::listener_registration _workload_prioritization_registration;
    named_gate _async_gate;

    condition_variable _tablet_split_monitor_event;
    utils::sequenced_set<table_id> _tablet_split_candidates;
    future<> _tablet_split_monitor = make_ready_future<>();

    std::unordered_map<node_ops_id, node_ops_meta_data> _node_ops;
    std::list<std::optional<node_ops_id>> _node_ops_abort_queue;
    seastar::condition_variable _node_ops_abort_cond;
    named_semaphore _node_ops_abort_sem{1, named_semaphore_exception_factory{"node_ops_abort_sem"}};
    future<> _node_ops_abort_thread;
    shared_ptr<node_ops::task_manager_module> _node_ops_module;
    shared_ptr<service::task_manager_module> _tablets_module;
    shared_ptr<service::topo::task_manager_module> _global_topology_requests_module;
    gms::gossip_address_map& _address_map;
    void node_ops_insert(node_ops_id, gms::inet_address coordinator, std::list<inet_address> ignore_nodes,
            std::function<future<>()> abort_func);
    future<> node_ops_update_heartbeat(node_ops_id ops_uuid);
    future<> node_ops_done(node_ops_id ops_uuid);
    future<> node_ops_abort(node_ops_id ops_uuid);
    void node_ops_signal_abort(std::optional<node_ops_id> ops_uuid);
    future<> node_ops_abort_thread();
    future<service::tablet_operation_result> do_tablet_operation(locator::global_tablet_id tablet,
            sstring op_name,
            std::function<future<service::tablet_operation_result>(locator::tablet_metadata_guard&)> op);
    future<service::tablet_operation_repair_result> repair_tablet(locator::global_tablet_id, service::session_id);
    future<> stream_tablet(locator::global_tablet_id);
    // Clones storage of leaving tablet into pending one. Done in the context of intra-node migration,
    // when both of which sit on the same node. So all the movement is local.
    future<> clone_locally_tablet_storage(locator::global_tablet_id, locator::tablet_replica leaving, locator::tablet_replica pending);
    future<> cleanup_tablet(locator::global_tablet_id);
    // Handler for table load stats RPC.
    future<locator::load_stats> load_stats_for_tablet_based_tables();
    future<> process_tablet_split_candidate(table_id) noexcept;
    void register_tablet_split_candidate(table_id) noexcept;
    future<> run_tablet_split_monitor();
public:
    storage_service(abort_source& as, sharded<replica::database>& db,
        gms::gossiper& gossiper,
        sharded<db::system_keyspace>&,
        sharded<db::system_distributed_keyspace>&,
        gms::feature_service& feature_service,
        sharded<service::migration_manager>& mm,
        locator::shared_token_metadata& stm,
        locator::effective_replication_map_factory& erm_factory,
        sharded<netw::messaging_service>& ms,
        sharded<repair_service>& repair,
        sharded<streaming::stream_manager>& stream_manager,
        endpoint_lifecycle_notifier& elc_notif,
        sharded<db::batchlog_manager>& bm,
        sharded<locator::snitch_ptr>& snitch,
        sharded<service::tablet_allocator>& tablet_allocator,
        sharded<cdc::generation_service>& cdc_gs,
        sharded<db::view::view_builder>& view_builder,
        sharded<db::view::view_building_worker>& view_building_worker,
        cql3::query_processor& qp,
        sharded<qos::service_level_controller>& sl_controller,
        auth::cache& auth_cache,
        sharded<client_routes_service>& _client_routes,
        topology_state_machine& topology_state_machine,
        db::view::view_building_state_machine& view_building_state_machine,
        tasks::task_manager& tm,
        gms::gossip_address_map& address_map,
        std::function<future<void>(std::string_view)> compression_dictionary_updated_callback,
        utils::disk_space_monitor* disk_space_minitor);
    ~storage_service();

    node_ops::task_manager_module& get_node_ops_module() noexcept;
    // Needed by sharded<>
    future<> stop();
    void init_messaging_service();
    future<> uninit_messaging_service();

    // If a hint is provided, only the changed parts of the tablet metadata will be (re)loaded.
    future<locator::mutable_token_metadata_ptr> prepare_tablet_metadata(const locator::tablet_metadata_change_hint& hint, mutable_token_metadata_ptr pending_token_metadata);
    void wake_up_topology_state_machine() noexcept;
    future<> update_tablet_metadata(const locator::tablet_metadata_change_hint& hint);

    void start_tablet_split_monitor();
private:
    using acquire_merge_lock = bool_class<class acquire_merge_lock_tag>;

    // Token metadata changes are serialized
    // using the schema_tables merge_lock.
    //
    // Must be called on shard 0.
    future<token_metadata_lock> get_token_metadata_lock() noexcept;

    // Acquire the token_metadata lock and get a mutable_token_metadata_ptr.
    // Pass that ptr to \c func, and when successfully done,
    // replicate it to all cores.
    //
    // By default the merge_lock (that is unified with the token_metadata_lock)
    // is acquired for mutating the token_metadata. Pass acquire_merge_lock::no
    // when called from paths that already acquire the merge_lock, like
    // db::schema_tables::do_merge_schema.
    //
    // Note: must be called on shard 0.
    future<> mutate_token_metadata(std::function<future<> (mutable_token_metadata_ptr)> func, acquire_merge_lock aml = acquire_merge_lock::yes) noexcept;

    // Prepares token metadata change without making it visible. Combined with commit function
    // and appropriate lock it does exactly the same as mutate_token_metadata.
    // Note: prepare_token_metadata_change must be called on shard 0.
    future<token_metadata_change> prepare_token_metadata_change(mutable_token_metadata_ptr tmptr,
            const schema_getter& loader);

    // Commits prepared token metadata changes. Must be called under token_metadata_lock
    // and on all shards.
    void commit_token_metadata_change(token_metadata_change& change) noexcept;

    // Update pending ranges locally and then replicate to all cores.
    // Should be serialized under token_metadata_lock.
    // Must be called on shard 0.
    future<> update_topology_change_info(mutable_token_metadata_ptr tmptr, sstring reason);
    future<> update_topology_change_info(sstring reason, acquire_merge_lock aml = acquire_merge_lock::yes);
    future<> keyspace_changed(const sstring& ks_name);
    void register_metrics();
    future<> snitch_reconfigured();

    future<mutable_token_metadata_ptr> get_mutable_token_metadata_ptr() noexcept {
        return _shared_token_metadata.get()->clone_async().then([this] (token_metadata tm) {
            // bump the token_metadata ring_version
            // to invalidate cached token/replication mappings
            // when the modified token_metadata is committed.
            tm.invalidate_cached_rings();
            return _shared_token_metadata.make_token_metadata_ptr(std::move(tm));
        });
    }

    sharded<db::batchlog_manager>& get_batchlog_manager() noexcept {
        return _batchlog_manager;
    }

    friend struct ::node_ops_ctl;
    friend void check_raft_rpc_scheduling_group(storage_service&, std::string_view);
    friend class db::schema_tables::schema_applier;
public:

    const gms::gossiper& gossiper() const noexcept {
        return _gossiper;
    };

    gms::gossiper& gossiper() noexcept {
        return _gossiper;
    };

    locator::effective_replication_map_factory& get_erm_factory() noexcept {
        return _erm_factory;
    }

    const locator::effective_replication_map_factory& get_erm_factory() const noexcept {
        return _erm_factory;
    }

    token_metadata_ptr get_token_metadata_ptr() const noexcept {
        return _shared_token_metadata.get();
    }

    const locator::token_metadata& get_token_metadata() const noexcept {
        return *_shared_token_metadata.get();
    }

    abort_source& get_abort_source() noexcept {
        return _abort_source;
    }

    gms::feature_service& get_feature_service() noexcept {
        return _feature_service;
    }

    replica::database& get_database() noexcept {
        return _db.local();
    }

    db::system_keyspace& get_system_keyspace() noexcept {
        return _sys_ks.local();
    }

    bool is_raft_leader() const noexcept;

    std::vector<table_id> get_tables_with_cdc_tablet_streams() const;

    future<> query_cdc_timestamps(table_id table, bool ascending, noncopyable_function<future<>(db_clock::time_point)> f);
    future<> query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f);

private:
    inet_address get_broadcast_address() const noexcept {
        return get_token_metadata_ptr()->get_topology().my_address();
    }
    locator::host_id my_host_id() const noexcept {
        return get_token_metadata_ptr()->get_topology().my_host_id();
    }
    bool is_me(inet_address addr) const noexcept {
        return addr == get_broadcast_address();
    }
    bool is_me(locator::host_id id) const noexcept {
        return get_token_metadata_ptr()->get_topology().is_me(id);
    }

    // When we create a tablet mutation, usually we want to write it to the base table.
    // In a group of co-located tables, the tablet info is stored on the base table partition only.
    // Other tables which are co-located with the base table have only a static row that points to the base table.
    // So if for example a tablet migration is requested for a co-located table, we need to write the
    // tablet mutation with the transition stage to the base table, and then the entire co-location group
    // will be migrated.
    replica::tablet_mutation_builder tablet_mutation_builder_for_base_table(api::timestamp_type ts, table_id table);

    /* This abstraction maintains the token/endpoint metadata information */
    shared_token_metadata& _shared_token_metadata;
    locator::effective_replication_map_factory& _erm_factory;

public:
    std::chrono::milliseconds get_ring_delay();
    enum class mode { NONE, STARTING, JOINING, BOOTSTRAP, NORMAL, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED, MAINTENANCE };
private:
    mode _operation_mode = mode::NONE;
    /* Used for tracking drain progress */

    endpoint_lifecycle_notifier& _lifecycle_notifier;
    sharded<db::batchlog_manager>& _batchlog_manager;

public:
    // should only be called via JMX
    future<> stop_gossiping();

    // should only be called via JMX
    future<> start_gossiping();

    // should only be called via JMX
    future<bool> is_gossip_running();

    future<> register_protocol_server(protocol_server& server, bool start_instantly = false);

    // All pointers are valid.
    const std::vector<protocol_server*>& protocol_servers() const {
        return _protocol_servers;
    }
private:
    future<> shutdown_protocol_servers();

    struct replacement_info {
        std::unordered_set<token> tokens;
        locator::endpoint_dc_rack dc_rack;
        locator::host_id host_id;
        gms::inet_address address;
        std::unordered_map<locator::host_id, gms::loaded_endpoint_state> ignore_nodes;
    };
    future<replacement_info> prepare_replacement_info(std::unordered_set<gms::inet_address> initial_contact_nodes,
            const std::unordered_map<locator::host_id, sstring>& loaded_peer_features);

    void run_replace_ops(std::unordered_set<token>& bootstrap_tokens, replacement_info replace_info);
    void run_bootstrap_ops(std::unordered_set<token>& bootstrap_tokens);

    future<> wait_for_ring_to_settle();

public:

    future<> check_for_endpoint_collision(std::unordered_set<gms::inet_address> initial_contact_nodes,
            const std::unordered_map<locator::host_id, sstring>& loaded_peer_features);

    future<> join_cluster(sharded<service::storage_proxy>& proxy,
            start_hint_manager start_hm, gms::generation_type new_generation);

    void set_group0(service::raft_group0&);

    future<> init_address_map(gms::gossip_address_map& address_map);

    future<> uninit_address_map();
    bool is_topology_coordinator_enabled() const;

    future<> drain_on_shutdown();

    future<> stop_transport();

    future<> wait_for_group0_stop();

private:
    bool should_bootstrap();
    bool is_replacing();
    bool is_first_node();
    raft::server* get_group_server_if_raft_topolgy_enabled();
    future<> start_sys_dist_ks() const;
    future<> join_topology(sharded<service::storage_proxy>& proxy,
            std::unordered_set<gms::inet_address> initial_contact_nodes,
            std::unordered_map<locator::host_id, gms::loaded_endpoint_state> loaded_endpoints,
            std::unordered_map<locator::host_id, sstring> loaded_peer_features,
            std::chrono::milliseconds,
            start_hint_manager start_hm,
            gms::generation_type new_generation);
public:

    future<> rebuild(utils::optional_param source_dc);

private:
    void set_mode(mode m);

    // Stream data for which we become a new replica.
    // Before that, if we're not replacing another node, inform other nodes about our chosen tokens
    // and wait for RING_DELAY ms so that we receive new writes from coordinators during streaming.
    future<> bootstrap(std::unordered_set<token>& bootstrap_tokens, std::optional<cdc::generation_id>& cdc_gen_id, const std::optional<replacement_info>& replacement_info);

public:
    future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>> get_range_to_address_map(sstring keyspace, std::optional<table_id> tid) const;

    /**
     * The same as {@code describeRing(String)} but converts TokenRange to the String for JMX compatibility
     *
     * @param keyspace The keyspace to fetch information about
     *
     * @return a List of TokenRange(s) converted to String for the given keyspace
     */

    /*
     * describeRingJMX will be implemented in the API
     * It is left here just as a marker that there is no need to implement it
     * here
     */
    //std::vector<sstring> describeRingJMX(const sstring& keyspace) const {

    future<utils::chunked_vector<token_range_endpoints>> describe_ring(const sstring& keyspace, bool include_only_local_dc = false) const;

    future<utils::chunked_vector<dht::token_range_endpoints>> describe_ring_for_table(const sstring& keyspace_name, const sstring& table_name) const;

    /**
     * Retrieve a map of tokens to endpoints, including the bootstrapping ones.
     *
     * @return a map of tokens to endpoints in ascending order
     */
    std::map<token, inet_address> get_token_to_endpoint_map();

    /**
     * Retrieve a map of tablet tokens to endpoints.
     *
     * Tablet variant of get_token_to_endpoint_map().
     *
     * @return a map of tablet tokens to endpoints in ascending order
     */
    future<std::map<token, inet_address>> get_tablet_to_endpoint_map(table_id table);

public:
    virtual future<> on_join(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr ep_state, gms::permit_id) override;
    /*
     * Handle the reception of a new particular ApplicationState for a particular endpoint. Note that the value of the
     * ApplicationState has not necessarily "changed" since the last known value, if we already received the same update
     * from somewhere else.
     *
     * onChange only ever sees one ApplicationState piece change at a time (even if many ApplicationState updates were
     * received at the same time), so we perform a kind of state machine here. We are concerned with two events: knowing
     * the token associated with an endpoint, and knowing its operation mode. Nodes can start in either bootstrap or
     * normal mode, and from bootstrap mode can change mode to normal. A node in bootstrap mode needs to have
     * pendingranges set in TokenMetadata; a node in normal mode should instead be part of the token ring.
     *
     * Normal progression of ApplicationState.STATUS values for a node should be like this:
     * STATUS_BOOTSTRAPPING,token
     *   if bootstrapping. stays this way until all files are received.
     * STATUS_NORMAL,token
     *   ready to serve reads and writes.
     * STATUS_LEFT,token
     *   set after decommission is completed.
     *
     * Other STATUS values that may be seen (possibly anywhere in the normal progression):
     * REMOVING_TOKEN,deadtoken
     *   set if the node is dead and is being removed by its REMOVAL_COORDINATOR
     * REMOVED_TOKEN,deadtoken
     *   set if the node is dead and has been removed by its REMOVAL_COORDINATOR
     *
     * Note: Any time a node state changes from STATUS_NORMAL, it will not be visible to new nodes. So it follows that
     * you should never bootstrap a new node during a removenode, decommission or move.
     */
    virtual future<> on_change(gms::inet_address endpoint, locator::host_id id, const gms::application_state_map& states, gms::permit_id) override;
    virtual future<> on_alive(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr state, gms::permit_id) override;
    virtual future<> on_dead(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr state, gms::permit_id) override;
    virtual future<> on_remove(gms::inet_address endpoint, locator::host_id id, gms::permit_id) override;
    virtual future<> on_restart(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr state, gms::permit_id) override;

public:
    // For migration_listener
    virtual void on_create_keyspace(const sstring& ks_name) override { keyspace_changed(ks_name).get(); }
    virtual void on_create_column_family(const sstring& ks_name, const sstring& cf_name) override {}
    virtual void on_create_user_type(const sstring& ks_name, const sstring& type_name) override {}
    virtual void on_create_function(const sstring& ks_name, const sstring& function_name) override {}
    virtual void on_create_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
    virtual void on_create_view(const sstring& ks_name, const sstring& view_name) override {}

    virtual void on_update_keyspace(const sstring& ks_name) override { keyspace_changed(ks_name).get(); }
    virtual void on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool) override {}
    virtual void on_update_user_type(const sstring& ks_name, const sstring& type_name) override {}
    virtual void on_update_function(const sstring& ks_name, const sstring& function_name) override {}
    virtual void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
    virtual void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override {}

    virtual void on_drop_keyspace(const sstring& ks_name) override { keyspace_changed(ks_name).get(); }
    virtual void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override {}
    virtual void on_drop_user_type(const sstring& ks_name, const sstring& type_name) override {}
    virtual void on_drop_function(const sstring& ks_name, const sstring& function_name) override {}
    virtual void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
    virtual void on_drop_view(const sstring& ks_name, const sstring& view_name) override {}
private:
    std::optional<db::system_keyspace::peer_info> get_peer_info_for_update(locator::host_id endpoint);
    // return an engaged value iff app_state_map has changes to the peer info
    std::optional<db::system_keyspace::peer_info> get_peer_info_for_update(locator::host_id endpoint, const gms::application_state_map& app_state_map);

    std::unordered_set<token> get_tokens_for(locator::host_id endpoint);
    std::optional<locator::endpoint_dc_rack> get_dc_rack_for(const gms::endpoint_state& ep_state);
    std::optional<locator::endpoint_dc_rack> get_dc_rack_for(locator::host_id endpoint);
private:
    // Should be serialized under token_metadata_lock.
    future<> replicate_to_all_cores(mutable_token_metadata_ptr tmptr) noexcept;
    sharded<db::system_keyspace>& _sys_ks;
    sharded<db::system_distributed_keyspace>& _sys_dist_ks;
    locator::snitch_signal_slot_t _snitch_reconfigure;
    sharded<service::tablet_allocator>& _tablet_allocator;
    sharded<cdc::generation_service>& _cdc_gens;
    sharded<db::view::view_builder>& _view_builder;
    sharded<db::view::view_building_worker>& _view_building_worker;
    bool _isolated = false;
private:
    /**
     * Handle node bootstrap
     *
     * @param endpoint bootstrapping node
     */
    future<> handle_state_bootstrap(inet_address endpoint, locator::host_id id, gms::permit_id);

    /**
     * Handle node move to normal state. That is, node is entering token ring and participating
     * in reads.
     *
     * @param endpoint node
     */
    future<> handle_state_normal(inet_address endpoint, locator::host_id id, gms::permit_id);

    /**
     * Handle node leaving the ring. This will happen when a node is decommissioned
     *
     * @param endpoint If reason for leaving is decommission, endpoint is the leaving node.
     * @param pieces STATE_LEFT,token
     */
    future<> handle_state_left(inet_address endpoint, locator::host_id id, std::vector<sstring> pieces, gms::permit_id);

    /**
     * Handle notification that a node being actively removed from the ring via 'removenode'
     *
     * @param endpoint node
     * @param pieces is REMOVED_TOKEN (node is gone)
     */
    future<> handle_state_removed(inet_address endpoint, locator::host_id id, std::vector<sstring> pieces, gms::permit_id);

private:
    future<> excise(std::unordered_set<token> tokens, inet_address endpoint_ip, locator::host_id endpoint_hid,
            gms::permit_id);
    future<> excise(std::unordered_set<token> tokens, inet_address endpoint_ip, locator::host_id endpoint_hid,
            long expire_time, gms::permit_id);

    /** unlike excise we just need this endpoint gone without going through any notifications **/
    future<> remove_endpoint(inet_address endpoint, gms::permit_id pid);

    void add_expire_time_if_found(locator::host_id endpoint, int64_t expire_time);

    int64_t extract_expire_time(const std::vector<sstring>& pieces) const {
        return std::stoll(pieces[2]);
    }

    /**
     * Finds living endpoints responsible for the given ranges
     *
     * @param erm the keyspace effective_replication_map ranges belong to
     * @param ranges the ranges to find sources for
     * @return multimap of addresses to ranges the address is responsible for
     */
    future<std::unordered_multimap<locator::host_id, dht::token_range>> get_new_source_ranges(const locator::vnode_effective_replication_map* erm, const dht::token_range_vector& ranges) const;

    future<> removenode_with_stream(locator::host_id leaving_node, frozen_topology_guard, shared_ptr<abort_source> as_ptr);
    future<> removenode_add_ranges(lw_shared_ptr<dht::range_streamer> streamer, locator::host_id leaving_node);

    // needs to be modified to accept either a keyspace or ARS.
    future<std::unordered_multimap<dht::token_range, locator::host_id>> get_changed_ranges_for_leaving(const locator::vnode_effective_replication_map* erm, locator::host_id endpoint);

    future<> maybe_reconnect_to_preferred_ip(inet_address ep, inet_address local_ip, locator::host_id host_id);

public:

    sstring get_release_version();

    sstring get_schema_version();

    future<std::unordered_map<sstring, std::vector<sstring>>> describe_schema_versions();


    /**
     * Get all ranges an endpoint is responsible for (by keyspace effective_replication_map)
     * Replication strategy's get_ranges() guarantees that no wrap-around range is returned.
     * @param ep endpoint we are interested in.
     * @return ranges for the specified endpoint.
     */
    future<dht::token_range_vector> get_ranges_for_endpoint(const locator::effective_replication_map& erm, const locator::host_id& ep) const;

    /**
     * Get all ranges that span the ring given a set
     * of tokens. All ranges are in sorted order of
     * ranges.
     * @return ranges in sorted order
     */
    future<dht::token_range_vector> get_all_ranges(const std::vector<token>& sorted_tokens) const;
    /**
     * This method returns the N endpoints that are responsible for storing the
     * specified key i.e for replication.
     *
     * @param keyspaceName keyspace name also known as keyspace
     * @param cf Column family name
     * @param key Nodetool style key for which we need to find the endpoint
     * @return the endpoint responsible for this key
     */
    inet_address_vector_replica_set get_natural_endpoints(const sstring& keyspace, const sstring& cf, const sstring& key) const;
    /**
     * This method returns the N endpoints that are responsible for storing the
     * specified key i.e for replication.
     *
     * @param keyspaceName keyspace name also known as keyspace
     * @param cf Column family name
     * @param key_components the components of the partition key for which we need to find the endpoint
     * @return the endpoint responsible for this key
     */
    inet_address_vector_replica_set get_natural_endpoints(const sstring& keyspace, const sstring& cf, const std::vector<sstring>& key_components) const;

private:
    inet_address_vector_replica_set get_natural_endpoints(const sstring& keyspace, const schema_ptr& schema, const replica::column_family& cf, const partition_key& pk) const;

public:
    future<> decommission();

private:
    /**
     * Broadcast leaving status and update local _token_metadata accordingly
     */
    future<> leave_ring();
    future<> unbootstrap();

public:
    future<> move(sstring new_token) {
        // FIXME: getPartitioner().getTokenFactory().validate(newToken);
        return move(dht::token::from_sstring(new_token));
    }

private:
    /**
     * move the node to new token or find a new token to boot to according to load
     *
     * @param newToken new token to boot to, or if null, find balanced token to boot to
     *
     * @throws IOException on any I/O operation error
     */
    future<> move(token new_token);
public:

    /**
     * Get the status of a token removal.
     */
    future<sstring> get_removal_status();

    /**
     * Force a remove operation to complete. This may be necessary if a remove operation
     * blocks forever due to node/stream failure. removeToken() must be called
     * first, this is a last resort measure. No further attempt will be made to restore replicas.
     */
    future<> force_remove_completion();

public:
    /**
     * Remove a node that has died, attempting to restore the replica count.
     * If the node is alive, decommission should be attempted. If decommission
     * fails, then removeToken should be called. If we fail while trying to
     * restore the replica count, finally forceRemoveCompleteion should be
     * called to forcibly remove the node without regard to replica count.
     *
     * @param hostIdString token for the node
     */
    future<> removenode(locator::host_id host_id, locator::host_id_or_endpoint_list ignore_nodes);
    future<> mark_excluded(const std::vector<locator::host_id>&);
    future<node_ops_cmd_response> node_ops_cmd_handler(gms::inet_address coordinator, std::optional<locator::host_id> coordinator_host_id, node_ops_cmd_request req);
    void node_ops_cmd_check(gms::inet_address coordinator, const node_ops_cmd_request& req);
    future<> node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, node_ops_id uuid, std::list<gms::inet_address> nodes, lw_shared_ptr<bool> heartbeat_updater_done);
    void on_node_ops_registered(node_ops_id);

    future<mode> get_operation_mode();

    /**
     * Shuts node off to writes, empties memtables and the commit log.
     * There are two differences between drain and the normal shutdown hook:
     * - Drain waits for in-progress streaming to complete
     * - Drain flushes *all* columnfamilies (shutdown hook only flushes non-durable CFs)
     */
    future<> drain();

    // Recalculates schema digests on this node from contents of tables on disk.
    future<> reload_schema();

    future<std::map<gms::inet_address, float>> get_ownership();

    future<std::map<gms::inet_address, float>> effective_ownership(sstring keyspace_name, sstring table_name);

    // Must run on shard 0.
    future<> check_and_repair_cdc_streams();

    // for testing
    bool is_isolated() const {
        return _isolated;
    }
private:
    promise<> _drain_finished;
    std::optional<shared_future<>> _transport_stopped;
    future<> do_drain();
    /**
     * Seed data to the endpoints that will be responsible for it at the future
     *
     * @param rangesToStreamByKeyspace keyspaces and data ranges with endpoints included for each
     * @return async Future for whether stream was success
     */
    future<> stream_ranges(std::unordered_map<sstring, std::unordered_multimap<dht::token_range, locator::host_id>> ranges_to_stream_by_keyspace);

    template <typename Func>
    auto run_with_api_lock_internal(storage_service& ss, Func&& func, sstring& operation) {
        if (!ss._operation_in_progress.empty()) {
            throw std::runtime_error(format("Operation {} is in progress, try again", ss._operation_in_progress));
        }
        ss._operation_in_progress = std::move(operation);
        return func(ss).finally([&ss] {
            ss._operation_in_progress = sstring();
        });
    }

public:
    int32_t get_exception_count();

    template <typename Func>
    auto run_with_api_lock(sstring operation, Func&& func) {
        return container().invoke_on(0, [operation = std::move(operation),
                func = std::forward<Func>(func)] (storage_service& ss) mutable {
            return ss.run_with_api_lock_internal(ss, std::forward<Func>(func), operation);
        });
    }

    template <typename Func>
    auto run_with_no_api_lock(Func&& func) {
        return container().invoke_on(0, [func = std::forward<Func>(func)] (storage_service& ss) mutable {
            return func(ss);
        });
    }

    template <typename Func>
    auto run_with_api_lock_in_gossiper_mode_only(sstring operation, Func&& func) {
        return container().invoke_on(0, [operation = std::move(operation),
                func = std::forward<Func>(func)] (storage_service& ss) mutable {
            if (ss.raft_topology_change_enabled()) {
                return func(ss);
            }
            return ss.run_with_api_lock_internal(ss, std::forward<Func>(func), operation);
        });
    }

private:
    void do_isolate_on_error(disk_error type);
    future<> isolate();

    future<> notify_down(inet_address endpoint, locator::host_id hid);
    future<> notify_left(inet_address endpoint, locator::host_id hid);
    future<> notify_released(locator::host_id hid);
    future<> notify_up(inet_address endpoint, locator::host_id hid);
    future<> notify_joined(inet_address endpoint, locator::host_id hid);
    future<> notify_cql_change(inet_address endpoint, locator::host_id hid,bool ready);
    future<> remove_rpc_client_with_ignored_topology(inet_address endpoint, locator::host_id id);
public:
    future<bool> is_vnodes_cleanup_allowed(sstring keyspace);
    bool is_repair_based_node_ops_enabled(streaming::stream_reason reason);
    future<> update_fence_version(token_metadata::version_t version);

private:
    std::unordered_set<locator::host_id> _normal_state_handled_on_boot;
    bool is_normal_state_handled_on_boot(locator::host_id);
    future<> wait_for_normal_state_handled_on_boot();

    friend class group0_state_machine;

    enum class topology_change_kind {
        // The node is still starting and didn't determine yet which ops kind to use
        unknown,
        // The node uses legacy, gossip-based topology operations
        legacy,
        // The node is in the process of upgrading to raft-based topology operations
        upgrading_to_raft,
        // The node uses raft-based topology operations
        raft
    };
    // The _topology_change_kind_enabled variable is first initialized in `join_cluster`.
    // After the node successfully joins, the control over the variable is yielded
    // to `topology_state_load`, so that it can control it during the upgrade from gossiper
    // based topology to raft-based topology.
    topology_change_kind _topology_change_kind_enabled = topology_change_kind::unknown;

    // Throws an exception if the node is either starting and didn't determine which
    // topology operations to use, or if it is in the process of upgrade to topology
    // on raft. The name only serves for display purposes (i.e. it will be included
    // in the exception, if one is thrown).
    void check_ability_to_perform_topology_operation(std::string_view operation_name) const;

    topology_change_kind upgrade_state_to_topology_op_kind(topology::upgrade_state_type upgrade_state) const;

public:
    bool raft_topology_change_enabled() const;
    bool legacy_topology_change_enabled() const;

private:
    future<> _raft_state_monitor = make_ready_future<>();
    // This fibers monitors raft state and start/stops the topology change
    // coordinator fiber
    future<> raft_state_monitor_fiber(raft::server&, gate::holder);

public:
    bool topology_global_queue_empty() const {
        return !_topology_state_machine._topology.global_request.has_value();
    }
    future<bool> ongoing_rf_change(const group0_guard& guard, sstring ks) const;
    future<> raft_initialize_discovery_leader(const join_node_request_params& params);
    future<> initialize_done_topology_upgrade_state();
private:
    // State machine that is responsible for topology change
    topology_state_machine& _topology_state_machine;
    db::view::view_building_state_machine& _view_building_state_machine;

    future<> _topology_change_coordinator = make_ready_future<>();
    future<> topology_change_coordinator_fiber(raft::server&, raft::term_t, cdc::generation_service&, sharded<db::system_distributed_keyspace>&, abort_source&);

    // Those futures hold results of streaming for various operations
    std::optional<shared_future<>> _bootstrap_result;
    std::optional<shared_future<>> _decommission_result;
    std::optional<shared_future<>> _rebuild_result;
    std::unordered_map<raft::server_id, std::optional<shared_future<>>> _remove_result;
    tablet_op_registry _tablet_ops;
    // This tracks active topology cmd rpc. There can be only one active
    // cmd running and by inspecting this structure it can be checked which
    // cmd is current executing and which nodes are still did not reply.
    // Needed for debugging.
    topology_coordinator_cmd_rpc_tracker _topology_cmd_rpc_tracker;
    struct {
        raft::term_t term{0};
        uint64_t last_index{0};
    } _raft_topology_cmd_handler_state;
    class ip_address_updater;
    // Represents a subscription to gossiper on_change events,
    // updating the raft data structures that depend on
    // IP addresses (token_metadata.topology, erm-s),
    // as well as the system.peers table.
    shared_ptr<ip_address_updater> _ip_address_updater;

    std::unordered_set<raft::server_id> find_raft_nodes_from_hoeps(const locator::host_id_or_endpoint_list& hoeps) const;

    future<raft_topology_cmd_result> raft_topology_cmd_handler(raft::term_t term, uint64_t cmd_index, const raft_topology_cmd& cmd);

    future<> raft_decommission();
    future<> raft_removenode(locator::host_id host_id, locator::host_id_or_endpoint_list ignore_nodes_params);
    future<> raft_rebuild(utils::optional_param source_dc);
    future<> raft_check_and_repair_cdc_streams();
    future<> update_topology_with_local_metadata(raft::server&);
    void set_topology_change_kind(topology_change_kind kind);

    struct state_change_hint {
        std::optional<locator::tablet_metadata_change_hint> tablets_hint;
    };

    // This is called on all nodes for each new command received through raft
    // raft_group0_client::_read_apply_mutex must be held
    // Precondition: the topology mutations were already written to disk; the function only transitions the in-memory state machine.
    future<> topology_transition(state_change_hint hint = {});

    // This is called on all nodes for each new command related to tablet-view-building, received through raft
    // raft_group0_client::_read_apply_mutex must be held
    // Precondition: the mutations were already written to disk; the function only transitions the in-memory state machine.
    future<> view_building_transition();
public:
    // Reloads in-memory topology state safely under group 0 read_apply mutex.
    // Does not modify on-disk state.
    //
    // Must be called on shard 0.
    future<> reload_raft_topology_state(service::raft_group0_client&);

    // Service levels cache consists of two levels: service levels cache and effective service levels cache
    // The second one is dependent on the first one.
    // Must be called on shard 0.
    //
    // update_both_cache_levels::yes - updates both levels of the cache
    // update_both_cache_levels::no - update only effective service levels cache
    future<> update_service_levels_cache(qos::update_both_cache_levels update_only_effective_cache = qos::update_both_cache_levels::yes, qos::query_context ctx = qos::query_context::unspecified);

    auth::cache& auth_cache() noexcept;

    // Should be called whenever new compression dictionaries are published to system.dicts.
    // This is an arbitrary callback passed through the constructor,
    // but its intended usage is to set up the RPC connections to use the new dictionaries.
    //
    // Must be called on shard 0.
    future<> compression_dictionary_updated_callback(std::string_view name);
    future<> compression_dictionary_updated_callback_all();

    future<> load_cdc_streams(std::optional<std::unordered_set<table_id>> changed_tables = std::nullopt);

    future<> do_clusterwide_vnodes_cleanup();
    future<> reset_cleanup_needed();

    // Starts the upgrade procedure to topology on raft.
    // Must be called on shard 0.
    future<> start_upgrade_to_raft_topology();

    // Must be called on shard 0.
    topology::upgrade_state_type get_topology_upgrade_state() const;

    node_state get_node_state(locator::host_id id);

    // Waits for topology state in which none of tablets has replaced_id as a replica.
    // Must be called on shard 0.
    future<> await_tablets_rebuilt(raft::server_id replaced_id);

    topology_coordinator_cmd_rpc_tracker get_topology_cmd_status() {
        return _topology_cmd_rpc_tracker;
    }
private:
    // Tracks progress of the upgrade to topology coordinator.
    future<> _upgrade_to_topology_coordinator_fiber = make_ready_future<>();
    future<> track_upgrade_progress_to_topology_coordinator(sharded<service::storage_proxy>& proxy);

    future<> transit_tablet(table_id, dht::token, noncopyable_function<std::tuple<utils::chunked_vector<canonical_mutation>, sstring>(const locator::tablet_map& tmap, api::timestamp_type)> prepare_mutations);
    future<service::group0_guard> get_guard_for_tablet_update();
    future<bool> exec_tablet_update(service::group0_guard guard, utils::chunked_vector<canonical_mutation> updates, sstring reason);
public:
    struct all_tokens_tag {};
    future<std::unordered_map<sstring, sstring>> add_repair_tablet_request(table_id table, std::variant<utils::chunked_vector<dht::token>, all_tokens_tag> tokens_variant, std::unordered_set<locator::host_id> hosts_filter, std::unordered_set<sstring> dcs_filter, bool await_completion, locator::tablet_repair_incremental_mode incremental_mode);
    future<> del_repair_tablet_request(table_id table, locator::tablet_task_id);
    future<> move_tablet(table_id, dht::token, locator::tablet_replica src, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no);
    future<> add_tablet_replica(table_id, dht::token, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no);
    future<> del_tablet_replica(table_id, dht::token, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no);
    future<> set_tablet_balancing_enabled(bool);

    future<> await_topology_quiesced();
    // Verifies topology is not busy, and also that topology version hasn't changed since the one provided
    // by the caller.
    future<bool> verify_topology_quiesced(token_metadata::version_t expected_version);

    // In the maintenance mode, other nodes won't be available thus we disabled joining
    // the token ring and the token metadata won't be populated with the local node's endpoint.
    // When a CQL query is executed it checks the `token_metadata` structure and fails if it is empty.
    //
    // This method initialises `token_metadata` with the local node as the only node in the token ring.
    // It is incompatible with the `join_cluster` method.
    future<> start_maintenance_mode();

    // Waits for a topology request with a given ID to complete and return non empty error string
    // if request completes with an error
    future<sstring> wait_for_topology_request_completion(utils::UUID id, bool require_entry = true);
    future<> wait_for_topology_not_busy();

    future<> abort_paused_rf_change(utils::UUID request_id);

private:
    semaphore _do_sample_sstables_concurrency_limiter{1};
    // To avoid overly-large RPC messages, `do_sample_sstables` is broken up into several rounds.
    // This implements a single round.
    future<utils::chunked_vector<temporary_buffer<char>>> do_sample_sstables_oneshot(table_id, uint64_t chunk_size, uint64_t n_chunks);
public:
    // SSTable sampling results can occupy a considerable amount of memory.
    // Callers of `do_sample_sstables` should hold this semaphore until they are done with the sample,
    // to ensure that there's only one sample around.
    semaphore& get_do_sample_sstables_concurrency_limiter();
    // Gathers a randomly-selected sample of chunks of (decompressed) Data files for the given table,
    // from across the entire cluster.
    future<utils::chunked_vector<temporary_buffer<char>>> do_sample_sstables(table_id, uint64_t chunk_size, uint64_t n_chunks);
private:
    future<utils::chunked_vector<canonical_mutation>> get_system_mutations(schema_ptr schema);
    future<utils::chunked_vector<canonical_mutation>> get_system_mutations(const sstring& ks_name, const sstring& cf_name);

    struct nodes_to_notify_after_sync {
        std::vector<std::pair<gms::inet_address, locator::host_id>> left;
        std::vector<std::pair<gms::inet_address, locator::host_id>> joined;
        std::vector<locator::host_id> released;
    };

    using host_id_to_ip_map_t = db::system_keyspace::host_id_to_ip_map_t;
    future<> raft_topology_update_ip(locator::host_id id, gms::inet_address ip, const host_id_to_ip_map_t& map, nodes_to_notify_after_sync* nodes_to_notify);
    // Synchronizes the local node state (token_metadata, system.peers/system.local tables,
    // gossiper) to align it with the other raft topology nodes.
    // Optional target_node can be provided to restrict the synchronization to the specified node.
    // Returns a structure that describes which notifications to trigger after token metadata is updated.
    future<nodes_to_notify_after_sync> sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::unordered_set<raft::server_id> prev_normal);
    // Triggers notifications (on_joined, on_left) based on the recent changes to token metadata, as described by the passed in structure.
    // This function should be called on the result of `sync_raft_topology_nodes`, after the global token metadata is updated.
    future<> notify_nodes_after_sync(nodes_to_notify_after_sync&& nodes_to_notify);
    // load topology state machine snapshot into memory
    // raft_group0_client::_read_apply_mutex must be held
    future<> topology_state_load(state_change_hint hint = {});
    // Applies received raft snapshot to local state machine persistent storage
    // raft_group0_client::_read_apply_mutex must be held
    future<> merge_topology_snapshot(raft_snapshot snp);
    // load view building state machine snapshot into memory
    // raft_group0_client::_read_apply_mutex must be held
    future<> view_building_state_load();

    utils::chunked_vector<canonical_mutation> build_mutation_from_join_params(const join_node_request_params& params, api::timestamp_type write_timestamp);
    std::unordered_set<raft::server_id> ignored_nodes_from_join_params(const join_node_request_params& params);

    future<join_node_request_result> join_node_request_handler(join_node_request_params params);
    future<join_node_response_result> join_node_response_handler(join_node_response_params params);
    shared_promise<> _join_node_request_done;
    shared_promise<> _join_node_group0_started;
    shared_promise<> _join_node_response_done;
    semaphore _join_node_response_handler_mutex{1};

    future<> _sstable_vnodes_cleanup_fiber = make_ready_future<>();
    future<> sstable_vnodes_cleanup_fiber(raft::server& raft, gate::holder, sharded<service::storage_proxy>& proxy) noexcept;

    // We need to be able to abort all group0 operation during shutdown, so we need special abort source for that
    abort_source _group0_as;

    std::function<future<void>(std::string_view)> _compression_dictionary_updated_callback;
    using byte_vector = std::vector<std::byte>;
    std::function<future<byte_vector>(std::vector<byte_vector>)> _train_dict;

    utils::disk_space_monitor* _disk_space_monitor; // != nullptr only on shard0.

public:
    future<uint64_t> estimate_total_sstable_volume(table_id);
    future<std::vector<std::byte>> train_dict(utils::chunked_vector<temporary_buffer<char>> sample);
    future<> publish_new_sstable_dict(table_id, std::span<const std::byte>, service::raft_group0_client&);
    void set_train_dict_callback(decltype(_train_dict));
    seastar::future<> notify_client_routes_change(const client_routes_service::client_route_keys& client_route_keys);


    friend class join_node_rpc_handshaker;
    friend class node_ops::node_ops_virtual_task;
    friend class tasks::task_manager;
    friend class tablet_virtual_task;
    friend class topo::global_topology_request_virtual_task;
};

}

template <>
struct fmt::formatter<service::storage_service::mode> : fmt::formatter<string_view> {
    template <typename FormatContext>
    auto format(service::storage_service::mode mode, FormatContext& ctx) const {
        std::string_view name;
        using enum service::storage_service::mode;
        switch (mode) {
        case NONE: name = "STARTING"; break;
        case STARTING: name = "STARTING"; break;
        case NORMAL: name = "NORMAL"; break;
        case JOINING: name = "JOINING"; break;
        case BOOTSTRAP: name = "BOOTSTRAP"; break;
        case LEAVING: name = "LEAVING"; break;
        case DECOMMISSIONED: name = "DECOMMISSIONED"; break;
        case MOVING: name = "MOVING"; break;
        case DRAINING: name = "DRAINING"; break;
        case DRAINED: name = "DRAINED"; break;
        case MAINTENANCE: name = "MAINTENANCE"; break;
        }
        return fmt::format_to(ctx.out(), "{}", name);
    }
};