Files
scylladb/service/storage_service.hh
Marcin Maliszkiewicz 2cf1ca43b5 service: add auth cache getter to storage service
Prepare for use in a subsequent commit in group0_state_machine,
where the auth cache will be integrated. This follows the same
pattern as updates to the service-level cache, view-building
state, and CDC streams.
2025-11-26 12:00:50 +01:00

1172 lines
52 KiB
C++

/*
*
* Modified by ScyllaDB
* Copyright (C) 2015-present ScyllaDB
*
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#pragma once
#include <variant>
#include <seastar/core/shared_future.hh>
#include "absl-flat_hash_map.hh"
#include "gms/endpoint_state.hh"
#include "gms/i_endpoint_state_change_subscriber.hh"
#include "schema/schema_fwd.hh"
#include "service/endpoint_lifecycle_subscriber.hh"
#include "service/qos/service_level_controller.hh"
#include "service/topology_guard.hh"
#include "locator/abstract_replication_strategy.hh"
#include "locator/snitch_base.hh"
#include "locator/tablets.hh"
#include "locator/tablet_metadata_guard.hh"
#include "inet_address_vectors.hh"
#include <seastar/core/sharded.hh>
#include <seastar/core/condition-variable.hh>
#include "dht/token_range_endpoints.hh"
#include "gms/application_state.hh"
#include "gms/feature.hh"
#include <seastar/core/semaphore.hh>
#include <seastar/core/gate.hh>
#include "replica/database_fwd.hh"
#include "streaming/stream_reason.hh"
#include <seastar/core/sharded.hh>
#include "service/migration_listener.hh"
#include <seastar/core/metrics_registration.hh>
#include <seastar/core/shared_ptr.hh>
#include "cdc/generation_id.hh"
#include "db/system_keyspace.hh"
#include "raft/raft.hh"
#include "node_ops/id.hh"
#include "raft/server.hh"
#include "service/topology_state_machine.hh"
#include "db/view/view_building_state.hh"
#include "service/tablet_allocator.hh"
#include "service/tablet_operation.hh"
#include "mutation/timestamp.hh"
#include "utils/user_provided_param.hh"
#include "utils/sequenced_set.hh"
#include "service/topology_coordinator.hh"
class node_ops_cmd_request;
class node_ops_cmd_response;
struct node_ops_ctl;
class node_ops_info;
enum class node_ops_cmd : uint32_t;
class repair_service;
class protocol_server;
namespace cql3 { class query_processor; }
namespace cql_transport { class controller; }
namespace cdc {
class generation_service;
class metadata;
}
namespace streaming {
class stream_manager;
}
namespace db {
class system_distributed_keyspace;
class system_keyspace;
class batchlog_manager;
namespace view {
class view_builder;
class view_building_worker;
}
namespace schema_tables {
class schema_applier;
}
}
namespace netw {
class messaging_service;
}
namespace dht {
class boot_strapper;
class range_streamer;
}
namespace gms {
class feature_service;
class gossiper;
class loaded_endpoint_state;
};
namespace node_ops {
class node_ops_virtual_task;
class task_manager_module;
}
namespace tasks {
class task_manager;
}
namespace replica {
class tablet_mutation_builder;
}
namespace auth { class cache; }
namespace utils {
class disk_space_monitor;
}
namespace service {
class storage_service;
class storage_proxy;
class migration_manager;
class raft_group0;
class group0_guard;
class group0_info;
class raft_group0_client;
class tablet_virtual_task;
class task_manager_module;
struct join_node_request_params;
struct join_node_request_result;
struct join_node_response_params;
struct join_node_response_result;
enum class disk_error { regular, commit };
class node_ops_meta_data;
using start_hint_manager = seastar::bool_class<class start_hint_manager_tag>;
using loosen_constraints = seastar::bool_class<class loosen_constraints_tag>;
struct token_metadata_change {
std::vector<locator::mutable_token_metadata_ptr> pending_token_metadata_ptr{smp::count};
std::vector<std::unordered_map<sstring, locator::static_effective_replication_map_ptr>> pending_effective_replication_maps{smp::count};
std::vector<std::unordered_map<table_id, locator::effective_replication_map_ptr>> pending_table_erms{smp::count};
std::vector<std::unordered_map<table_id, locator::effective_replication_map_ptr>> pending_view_erms{smp::count};
std::unordered_set<session_id> open_sessions;
future<> destroy();
};
class schema_getter {
public:
virtual flat_hash_map<sstring, locator::replication_strategy_ptr> get_keyspaces_replication() const = 0;
virtual future<> for_each_table_schema_gently(std::function<future<>(table_id, schema_ptr)> f) const = 0;
virtual ~schema_getter() {};
};
/**
* This abstraction contains the token/identifier of this node
* on the identifier space. This token gets gossiped around.
* This class will also maintain histograms of the load information
* of other nodes in the cluster.
*/
class storage_service : public service::migration_listener, public gms::i_endpoint_state_change_subscriber,
public seastar::async_sharded_service<storage_service>, public seastar::peering_sharded_service<storage_service> {
private:
using token = dht::token;
using token_range_endpoints = dht::token_range_endpoints;
using endpoint_details = dht::endpoint_details;
using boot_strapper = dht::boot_strapper;
using token_metadata = locator::token_metadata;
using shared_token_metadata = locator::shared_token_metadata;
using token_metadata_ptr = locator::token_metadata_ptr;
using mutable_token_metadata_ptr = locator::mutable_token_metadata_ptr;
using token_metadata_lock = locator::token_metadata_lock;
using application_state = gms::application_state;
using inet_address = gms::inet_address;
using versioned_value = gms::versioned_value;
struct tablet_operation {
sstring name;
shared_future<service::tablet_operation_result> done;
};
using tablet_op_registry = std::unordered_map<locator::global_tablet_id, tablet_operation>;
abort_source& _abort_source;
gms::feature_service& _feature_service;
sharded<replica::database>& _db;
gms::gossiper& _gossiper;
sharded<netw::messaging_service>& _messaging;
sharded<service::migration_manager>& _migration_manager;
cql3::query_processor& _qp;
sharded<repair_service>& _repair;
sharded<streaming::stream_manager>& _stream_manager;
sharded<locator::snitch_ptr>& _snitch;
sharded<qos::service_level_controller>& _sl_controller;
auth::cache& _auth_cache;
// Engaged on shard 0 before `join_cluster`.
service::raft_group0* _group0;
sstring _operation_in_progress;
seastar::metrics::metric_groups _metrics;
using client_shutdown_hook = noncopyable_function<void()>;
std::vector<protocol_server*> _protocol_servers;
std::vector<std::any> _listeners;
gms::feature::listener_registration _workload_prioritization_registration;
named_gate _async_gate;
condition_variable _tablet_split_monitor_event;
utils::sequenced_set<table_id> _tablet_split_candidates;
future<> _tablet_split_monitor = make_ready_future<>();
std::unordered_map<node_ops_id, node_ops_meta_data> _node_ops;
std::list<std::optional<node_ops_id>> _node_ops_abort_queue;
seastar::condition_variable _node_ops_abort_cond;
named_semaphore _node_ops_abort_sem{1, named_semaphore_exception_factory{"node_ops_abort_sem"}};
future<> _node_ops_abort_thread;
shared_ptr<node_ops::task_manager_module> _node_ops_module;
shared_ptr<service::task_manager_module> _tablets_module;
gms::gossip_address_map& _address_map;
void node_ops_insert(node_ops_id, gms::inet_address coordinator, std::list<inet_address> ignore_nodes,
std::function<future<>()> abort_func);
future<> node_ops_update_heartbeat(node_ops_id ops_uuid);
future<> node_ops_done(node_ops_id ops_uuid);
future<> node_ops_abort(node_ops_id ops_uuid);
void node_ops_signal_abort(std::optional<node_ops_id> ops_uuid);
future<> node_ops_abort_thread();
future<service::tablet_operation_result> do_tablet_operation(locator::global_tablet_id tablet,
sstring op_name,
std::function<future<service::tablet_operation_result>(locator::tablet_metadata_guard&)> op);
future<service::tablet_operation_repair_result> repair_tablet(locator::global_tablet_id, service::session_id);
future<> stream_tablet(locator::global_tablet_id);
// Clones storage of leaving tablet into pending one. Done in the context of intra-node migration,
// when both of which sit on the same node. So all the movement is local.
future<> clone_locally_tablet_storage(locator::global_tablet_id, locator::tablet_replica leaving, locator::tablet_replica pending);
future<> cleanup_tablet(locator::global_tablet_id);
// Handler for table load stats RPC.
future<locator::load_stats> load_stats_for_tablet_based_tables();
future<> process_tablet_split_candidate(table_id) noexcept;
void register_tablet_split_candidate(table_id) noexcept;
future<> run_tablet_split_monitor();
public:
storage_service(abort_source& as, sharded<replica::database>& db,
gms::gossiper& gossiper,
sharded<db::system_keyspace>&,
sharded<db::system_distributed_keyspace>&,
gms::feature_service& feature_service,
sharded<service::migration_manager>& mm,
locator::shared_token_metadata& stm,
locator::effective_replication_map_factory& erm_factory,
sharded<netw::messaging_service>& ms,
sharded<repair_service>& repair,
sharded<streaming::stream_manager>& stream_manager,
endpoint_lifecycle_notifier& elc_notif,
sharded<db::batchlog_manager>& bm,
sharded<locator::snitch_ptr>& snitch,
sharded<service::tablet_allocator>& tablet_allocator,
sharded<cdc::generation_service>& cdc_gs,
sharded<db::view::view_builder>& view_builder,
sharded<db::view::view_building_worker>& view_building_worker,
cql3::query_processor& qp,
sharded<qos::service_level_controller>& sl_controller,
auth::cache& auth_cache,
topology_state_machine& topology_state_machine,
db::view::view_building_state_machine& view_building_state_machine,
tasks::task_manager& tm,
gms::gossip_address_map& address_map,
std::function<future<void>(std::string_view)> compression_dictionary_updated_callback,
utils::disk_space_monitor* disk_space_minitor);
~storage_service();
node_ops::task_manager_module& get_node_ops_module() noexcept;
// Needed by sharded<>
future<> stop();
void init_messaging_service();
future<> uninit_messaging_service();
// If a hint is provided, only the changed parts of the tablet metadata will be (re)loaded.
future<locator::mutable_token_metadata_ptr> prepare_tablet_metadata(const locator::tablet_metadata_change_hint& hint, mutable_token_metadata_ptr pending_token_metadata);
void wake_up_topology_state_machine() noexcept;
future<> update_tablet_metadata(const locator::tablet_metadata_change_hint& hint);
void start_tablet_split_monitor();
private:
using acquire_merge_lock = bool_class<class acquire_merge_lock_tag>;
// Token metadata changes are serialized
// using the schema_tables merge_lock.
//
// Must be called on shard 0.
future<token_metadata_lock> get_token_metadata_lock() noexcept;
// Acquire the token_metadata lock and get a mutable_token_metadata_ptr.
// Pass that ptr to \c func, and when successfully done,
// replicate it to all cores.
//
// By default the merge_lock (that is unified with the token_metadata_lock)
// is acquired for mutating the token_metadata. Pass acquire_merge_lock::no
// when called from paths that already acquire the merge_lock, like
// db::schema_tables::do_merge_schema.
//
// Note: must be called on shard 0.
future<> mutate_token_metadata(std::function<future<> (mutable_token_metadata_ptr)> func, acquire_merge_lock aml = acquire_merge_lock::yes) noexcept;
// Prepares token metadata change without making it visible. Combined with commit function
// and appropriate lock it does exactly the same as mutate_token_metadata.
// Note: prepare_token_metadata_change must be called on shard 0.
future<token_metadata_change> prepare_token_metadata_change(mutable_token_metadata_ptr tmptr,
const schema_getter& loader);
// Commits prepared token metadata changes. Must be called under token_metadata_lock
// and on all shards.
void commit_token_metadata_change(token_metadata_change& change) noexcept;
// Update pending ranges locally and then replicate to all cores.
// Should be serialized under token_metadata_lock.
// Must be called on shard 0.
future<> update_topology_change_info(mutable_token_metadata_ptr tmptr, sstring reason);
future<> update_topology_change_info(sstring reason, acquire_merge_lock aml = acquire_merge_lock::yes);
future<> keyspace_changed(const sstring& ks_name);
void register_metrics();
future<> snitch_reconfigured();
future<mutable_token_metadata_ptr> get_mutable_token_metadata_ptr() noexcept {
return _shared_token_metadata.get()->clone_async().then([this] (token_metadata tm) {
// bump the token_metadata ring_version
// to invalidate cached token/replication mappings
// when the modified token_metadata is committed.
tm.invalidate_cached_rings();
return _shared_token_metadata.make_token_metadata_ptr(std::move(tm));
});
}
sharded<db::batchlog_manager>& get_batchlog_manager() noexcept {
return _batchlog_manager;
}
friend struct ::node_ops_ctl;
friend void check_raft_rpc_scheduling_group(storage_service&, std::string_view);
friend class db::schema_tables::schema_applier;
public:
const gms::gossiper& gossiper() const noexcept {
return _gossiper;
};
gms::gossiper& gossiper() noexcept {
return _gossiper;
};
locator::effective_replication_map_factory& get_erm_factory() noexcept {
return _erm_factory;
}
const locator::effective_replication_map_factory& get_erm_factory() const noexcept {
return _erm_factory;
}
token_metadata_ptr get_token_metadata_ptr() const noexcept {
return _shared_token_metadata.get();
}
const locator::token_metadata& get_token_metadata() const noexcept {
return *_shared_token_metadata.get();
}
abort_source& get_abort_source() noexcept {
return _abort_source;
}
gms::feature_service& get_feature_service() noexcept {
return _feature_service;
}
replica::database& get_database() noexcept {
return _db.local();
}
db::system_keyspace& get_system_keyspace() noexcept {
return _sys_ks.local();
}
bool is_raft_leader() const noexcept;
std::vector<table_id> get_tables_with_cdc_tablet_streams() const;
future<> query_cdc_timestamps(table_id table, bool ascending, noncopyable_function<future<>(db_clock::time_point)> f);
future<> query_cdc_streams(table_id table, noncopyable_function<future<>(db_clock::time_point, const utils::chunked_vector<cdc::stream_id>& current, cdc::cdc_stream_diff)> f);
private:
inet_address get_broadcast_address() const noexcept {
return get_token_metadata_ptr()->get_topology().my_address();
}
locator::host_id my_host_id() const noexcept {
return get_token_metadata_ptr()->get_topology().my_host_id();
}
bool is_me(inet_address addr) const noexcept {
return addr == get_broadcast_address();
}
bool is_me(locator::host_id id) const noexcept {
return get_token_metadata_ptr()->get_topology().is_me(id);
}
// When we create a tablet mutation, usually we want to write it to the base table.
// In a group of co-located tables, the tablet info is stored on the base table partition only.
// Other tables which are co-located with the base table have only a static row that points to the base table.
// So if for example a tablet migration is requested for a co-located table, we need to write the
// tablet mutation with the transition stage to the base table, and then the entire co-location group
// will be migrated.
replica::tablet_mutation_builder tablet_mutation_builder_for_base_table(api::timestamp_type ts, table_id table);
/* This abstraction maintains the token/endpoint metadata information */
shared_token_metadata& _shared_token_metadata;
locator::effective_replication_map_factory& _erm_factory;
public:
std::chrono::milliseconds get_ring_delay();
enum class mode { NONE, STARTING, JOINING, BOOTSTRAP, NORMAL, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED, MAINTENANCE };
private:
mode _operation_mode = mode::NONE;
/* Used for tracking drain progress */
endpoint_lifecycle_notifier& _lifecycle_notifier;
sharded<db::batchlog_manager>& _batchlog_manager;
public:
// should only be called via JMX
future<> stop_gossiping();
// should only be called via JMX
future<> start_gossiping();
// should only be called via JMX
future<bool> is_gossip_running();
future<> register_protocol_server(protocol_server& server, bool start_instantly = false);
// All pointers are valid.
const std::vector<protocol_server*>& protocol_servers() const {
return _protocol_servers;
}
private:
future<> shutdown_protocol_servers();
struct replacement_info {
std::unordered_set<token> tokens;
locator::endpoint_dc_rack dc_rack;
locator::host_id host_id;
gms::inet_address address;
std::unordered_map<locator::host_id, gms::loaded_endpoint_state> ignore_nodes;
};
future<replacement_info> prepare_replacement_info(std::unordered_set<gms::inet_address> initial_contact_nodes,
const std::unordered_map<locator::host_id, sstring>& loaded_peer_features);
void run_replace_ops(std::unordered_set<token>& bootstrap_tokens, replacement_info replace_info);
void run_bootstrap_ops(std::unordered_set<token>& bootstrap_tokens);
future<> wait_for_ring_to_settle();
public:
future<> check_for_endpoint_collision(std::unordered_set<gms::inet_address> initial_contact_nodes,
const std::unordered_map<locator::host_id, sstring>& loaded_peer_features);
future<> join_cluster(sharded<service::storage_proxy>& proxy,
start_hint_manager start_hm, gms::generation_type new_generation);
void set_group0(service::raft_group0&);
future<> init_address_map(gms::gossip_address_map& address_map);
future<> uninit_address_map();
bool is_topology_coordinator_enabled() const;
future<> drain_on_shutdown();
future<> stop_transport();
future<> wait_for_group0_stop();
private:
bool should_bootstrap();
bool is_replacing();
bool is_first_node();
raft::server* get_group_server_if_raft_topolgy_enabled();
future<> start_sys_dist_ks() const;
future<> join_topology(sharded<service::storage_proxy>& proxy,
std::unordered_set<gms::inet_address> initial_contact_nodes,
std::unordered_map<locator::host_id, gms::loaded_endpoint_state> loaded_endpoints,
std::unordered_map<locator::host_id, sstring> loaded_peer_features,
std::chrono::milliseconds,
start_hint_manager start_hm,
gms::generation_type new_generation);
public:
future<> rebuild(utils::optional_param source_dc);
private:
void set_mode(mode m);
// Stream data for which we become a new replica.
// Before that, if we're not replacing another node, inform other nodes about our chosen tokens
// and wait for RING_DELAY ms so that we receive new writes from coordinators during streaming.
future<> bootstrap(std::unordered_set<token>& bootstrap_tokens, std::optional<cdc::generation_id>& cdc_gen_id, const std::optional<replacement_info>& replacement_info);
public:
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>> get_range_to_address_map(sstring keyspace, std::optional<table_id> tid) const;
/**
* The same as {@code describeRing(String)} but converts TokenRange to the String for JMX compatibility
*
* @param keyspace The keyspace to fetch information about
*
* @return a List of TokenRange(s) converted to String for the given keyspace
*/
/*
* describeRingJMX will be implemented in the API
* It is left here just as a marker that there is no need to implement it
* here
*/
//std::vector<sstring> describeRingJMX(const sstring& keyspace) const {
future<utils::chunked_vector<token_range_endpoints>> describe_ring(const sstring& keyspace, bool include_only_local_dc = false) const;
future<utils::chunked_vector<dht::token_range_endpoints>> describe_ring_for_table(const sstring& keyspace_name, const sstring& table_name) const;
/**
* Retrieve a map of tokens to endpoints, including the bootstrapping ones.
*
* @return a map of tokens to endpoints in ascending order
*/
std::map<token, inet_address> get_token_to_endpoint_map();
/**
* Retrieve a map of tablet tokens to endpoints.
*
* Tablet variant of get_token_to_endpoint_map().
*
* @return a map of tablet tokens to endpoints in ascending order
*/
future<std::map<token, inet_address>> get_tablet_to_endpoint_map(table_id table);
public:
virtual future<> on_join(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr ep_state, gms::permit_id) override;
/*
* Handle the reception of a new particular ApplicationState for a particular endpoint. Note that the value of the
* ApplicationState has not necessarily "changed" since the last known value, if we already received the same update
* from somewhere else.
*
* onChange only ever sees one ApplicationState piece change at a time (even if many ApplicationState updates were
* received at the same time), so we perform a kind of state machine here. We are concerned with two events: knowing
* the token associated with an endpoint, and knowing its operation mode. Nodes can start in either bootstrap or
* normal mode, and from bootstrap mode can change mode to normal. A node in bootstrap mode needs to have
* pendingranges set in TokenMetadata; a node in normal mode should instead be part of the token ring.
*
* Normal progression of ApplicationState.STATUS values for a node should be like this:
* STATUS_BOOTSTRAPPING,token
* if bootstrapping. stays this way until all files are received.
* STATUS_NORMAL,token
* ready to serve reads and writes.
* STATUS_LEFT,token
* set after decommission is completed.
*
* Other STATUS values that may be seen (possibly anywhere in the normal progression):
* REMOVING_TOKEN,deadtoken
* set if the node is dead and is being removed by its REMOVAL_COORDINATOR
* REMOVED_TOKEN,deadtoken
* set if the node is dead and has been removed by its REMOVAL_COORDINATOR
*
* Note: Any time a node state changes from STATUS_NORMAL, it will not be visible to new nodes. So it follows that
* you should never bootstrap a new node during a removenode, decommission or move.
*/
virtual future<> on_change(gms::inet_address endpoint, locator::host_id id, const gms::application_state_map& states, gms::permit_id) override;
virtual future<> on_alive(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr state, gms::permit_id) override;
virtual future<> on_dead(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr state, gms::permit_id) override;
virtual future<> on_remove(gms::inet_address endpoint, locator::host_id id, gms::permit_id) override;
virtual future<> on_restart(gms::inet_address endpoint, locator::host_id id, gms::endpoint_state_ptr state, gms::permit_id) override;
public:
// For migration_listener
virtual void on_create_keyspace(const sstring& ks_name) override { keyspace_changed(ks_name).get(); }
virtual void on_create_column_family(const sstring& ks_name, const sstring& cf_name) override {}
virtual void on_create_user_type(const sstring& ks_name, const sstring& type_name) override {}
virtual void on_create_function(const sstring& ks_name, const sstring& function_name) override {}
virtual void on_create_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
virtual void on_create_view(const sstring& ks_name, const sstring& view_name) override {}
virtual void on_update_keyspace(const sstring& ks_name) override { keyspace_changed(ks_name).get(); }
virtual void on_update_column_family(const sstring& ks_name, const sstring& cf_name, bool) override {}
virtual void on_update_user_type(const sstring& ks_name, const sstring& type_name) override {}
virtual void on_update_function(const sstring& ks_name, const sstring& function_name) override {}
virtual void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
virtual void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override {}
virtual void on_drop_keyspace(const sstring& ks_name) override { keyspace_changed(ks_name).get(); }
virtual void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override {}
virtual void on_drop_user_type(const sstring& ks_name, const sstring& type_name) override {}
virtual void on_drop_function(const sstring& ks_name, const sstring& function_name) override {}
virtual void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
virtual void on_drop_view(const sstring& ks_name, const sstring& view_name) override {}
private:
std::optional<db::system_keyspace::peer_info> get_peer_info_for_update(locator::host_id endpoint);
// return an engaged value iff app_state_map has changes to the peer info
std::optional<db::system_keyspace::peer_info> get_peer_info_for_update(locator::host_id endpoint, const gms::application_state_map& app_state_map);
std::unordered_set<token> get_tokens_for(locator::host_id endpoint);
std::optional<locator::endpoint_dc_rack> get_dc_rack_for(const gms::endpoint_state& ep_state);
std::optional<locator::endpoint_dc_rack> get_dc_rack_for(locator::host_id endpoint);
private:
// Should be serialized under token_metadata_lock.
future<> replicate_to_all_cores(mutable_token_metadata_ptr tmptr) noexcept;
sharded<db::system_keyspace>& _sys_ks;
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
locator::snitch_signal_slot_t _snitch_reconfigure;
sharded<service::tablet_allocator>& _tablet_allocator;
sharded<cdc::generation_service>& _cdc_gens;
sharded<db::view::view_builder>& _view_builder;
sharded<db::view::view_building_worker>& _view_building_worker;
bool _isolated = false;
private:
/**
* Handle node bootstrap
*
* @param endpoint bootstrapping node
*/
future<> handle_state_bootstrap(inet_address endpoint, locator::host_id id, gms::permit_id);
/**
* Handle node move to normal state. That is, node is entering token ring and participating
* in reads.
*
* @param endpoint node
*/
future<> handle_state_normal(inet_address endpoint, locator::host_id id, gms::permit_id);
/**
* Handle node leaving the ring. This will happen when a node is decommissioned
*
* @param endpoint If reason for leaving is decommission, endpoint is the leaving node.
* @param pieces STATE_LEFT,token
*/
future<> handle_state_left(inet_address endpoint, locator::host_id id, std::vector<sstring> pieces, gms::permit_id);
/**
* Handle notification that a node being actively removed from the ring via 'removenode'
*
* @param endpoint node
* @param pieces is REMOVED_TOKEN (node is gone)
*/
future<> handle_state_removed(inet_address endpoint, locator::host_id id, std::vector<sstring> pieces, gms::permit_id);
private:
future<> excise(std::unordered_set<token> tokens, inet_address endpoint_ip, locator::host_id endpoint_hid,
gms::permit_id);
future<> excise(std::unordered_set<token> tokens, inet_address endpoint_ip, locator::host_id endpoint_hid,
long expire_time, gms::permit_id);
/** unlike excise we just need this endpoint gone without going through any notifications **/
future<> remove_endpoint(inet_address endpoint, gms::permit_id pid);
void add_expire_time_if_found(locator::host_id endpoint, int64_t expire_time);
int64_t extract_expire_time(const std::vector<sstring>& pieces) const {
return std::stoll(pieces[2]);
}
/**
* Finds living endpoints responsible for the given ranges
*
* @param erm the keyspace effective_replication_map ranges belong to
* @param ranges the ranges to find sources for
* @return multimap of addresses to ranges the address is responsible for
*/
future<std::unordered_multimap<locator::host_id, dht::token_range>> get_new_source_ranges(const locator::vnode_effective_replication_map* erm, const dht::token_range_vector& ranges) const;
future<> removenode_with_stream(locator::host_id leaving_node, frozen_topology_guard, shared_ptr<abort_source> as_ptr);
future<> removenode_add_ranges(lw_shared_ptr<dht::range_streamer> streamer, locator::host_id leaving_node);
// needs to be modified to accept either a keyspace or ARS.
future<std::unordered_multimap<dht::token_range, locator::host_id>> get_changed_ranges_for_leaving(const locator::vnode_effective_replication_map* erm, locator::host_id endpoint);
future<> maybe_reconnect_to_preferred_ip(inet_address ep, inet_address local_ip, locator::host_id host_id);
public:
sstring get_release_version();
sstring get_schema_version();
future<std::unordered_map<sstring, std::vector<sstring>>> describe_schema_versions();
/**
* Get all ranges an endpoint is responsible for (by keyspace effective_replication_map)
* Replication strategy's get_ranges() guarantees that no wrap-around range is returned.
* @param ep endpoint we are interested in.
* @return ranges for the specified endpoint.
*/
future<dht::token_range_vector> get_ranges_for_endpoint(const locator::effective_replication_map& erm, const locator::host_id& ep) const;
/**
* Get all ranges that span the ring given a set
* of tokens. All ranges are in sorted order of
* ranges.
* @return ranges in sorted order
*/
future<dht::token_range_vector> get_all_ranges(const std::vector<token>& sorted_tokens) const;
/**
* This method returns the N endpoints that are responsible for storing the
* specified key i.e for replication.
*
* @param keyspaceName keyspace name also known as keyspace
* @param cf Column family name
* @param key Nodetool style key for which we need to find the endpoint
* @return the endpoint responsible for this key
*/
inet_address_vector_replica_set get_natural_endpoints(const sstring& keyspace, const sstring& cf, const sstring& key) const;
/**
* This method returns the N endpoints that are responsible for storing the
* specified key i.e for replication.
*
* @param keyspaceName keyspace name also known as keyspace
* @param cf Column family name
* @param key_components the components of the partition key for which we need to find the endpoint
* @return the endpoint responsible for this key
*/
inet_address_vector_replica_set get_natural_endpoints(const sstring& keyspace, const sstring& cf, const std::vector<sstring>& key_components) const;
private:
inet_address_vector_replica_set get_natural_endpoints(const sstring& keyspace, const schema_ptr& schema, const replica::column_family& cf, const partition_key& pk) const;
public:
future<> decommission();
private:
/**
* Broadcast leaving status and update local _token_metadata accordingly
*/
future<> leave_ring();
future<> unbootstrap();
public:
future<> move(sstring new_token) {
// FIXME: getPartitioner().getTokenFactory().validate(newToken);
return move(dht::token::from_sstring(new_token));
}
private:
/**
* move the node to new token or find a new token to boot to according to load
*
* @param newToken new token to boot to, or if null, find balanced token to boot to
*
* @throws IOException on any I/O operation error
*/
future<> move(token new_token);
public:
/**
* Get the status of a token removal.
*/
future<sstring> get_removal_status();
/**
* Force a remove operation to complete. This may be necessary if a remove operation
* blocks forever due to node/stream failure. removeToken() must be called
* first, this is a last resort measure. No further attempt will be made to restore replicas.
*/
future<> force_remove_completion();
public:
/**
* Remove a node that has died, attempting to restore the replica count.
* If the node is alive, decommission should be attempted. If decommission
* fails, then removeToken should be called. If we fail while trying to
* restore the replica count, finally forceRemoveCompleteion should be
* called to forcibly remove the node without regard to replica count.
*
* @param hostIdString token for the node
*/
future<> removenode(locator::host_id host_id, locator::host_id_or_endpoint_list ignore_nodes);
future<> mark_excluded(const std::vector<locator::host_id>&);
future<node_ops_cmd_response> node_ops_cmd_handler(gms::inet_address coordinator, std::optional<locator::host_id> coordinator_host_id, node_ops_cmd_request req);
void node_ops_cmd_check(gms::inet_address coordinator, const node_ops_cmd_request& req);
future<> node_ops_cmd_heartbeat_updater(node_ops_cmd cmd, node_ops_id uuid, std::list<gms::inet_address> nodes, lw_shared_ptr<bool> heartbeat_updater_done);
void on_node_ops_registered(node_ops_id);
future<mode> get_operation_mode();
/**
* Shuts node off to writes, empties memtables and the commit log.
* There are two differences between drain and the normal shutdown hook:
* - Drain waits for in-progress streaming to complete
* - Drain flushes *all* columnfamilies (shutdown hook only flushes non-durable CFs)
*/
future<> drain();
// Recalculates schema digests on this node from contents of tables on disk.
future<> reload_schema();
future<std::map<gms::inet_address, float>> get_ownership();
future<std::map<gms::inet_address, float>> effective_ownership(sstring keyspace_name, sstring table_name);
// Must run on shard 0.
future<> check_and_repair_cdc_streams();
// for testing
bool is_isolated() const {
return _isolated;
}
private:
promise<> _drain_finished;
std::optional<shared_future<>> _transport_stopped;
future<> do_drain();
/**
* Seed data to the endpoints that will be responsible for it at the future
*
* @param rangesToStreamByKeyspace keyspaces and data ranges with endpoints included for each
* @return async Future for whether stream was success
*/
future<> stream_ranges(std::unordered_map<sstring, std::unordered_multimap<dht::token_range, locator::host_id>> ranges_to_stream_by_keyspace);
template <typename Func>
auto run_with_api_lock_internal(storage_service& ss, Func&& func, sstring& operation) {
if (!ss._operation_in_progress.empty()) {
throw std::runtime_error(format("Operation {} is in progress, try again", ss._operation_in_progress));
}
ss._operation_in_progress = std::move(operation);
return func(ss).finally([&ss] {
ss._operation_in_progress = sstring();
});
}
public:
int32_t get_exception_count();
template <typename Func>
auto run_with_api_lock(sstring operation, Func&& func) {
return container().invoke_on(0, [operation = std::move(operation),
func = std::forward<Func>(func)] (storage_service& ss) mutable {
return ss.run_with_api_lock_internal(ss, std::forward<Func>(func), operation);
});
}
template <typename Func>
auto run_with_no_api_lock(Func&& func) {
return container().invoke_on(0, [func = std::forward<Func>(func)] (storage_service& ss) mutable {
return func(ss);
});
}
template <typename Func>
auto run_with_api_lock_in_gossiper_mode_only(sstring operation, Func&& func) {
return container().invoke_on(0, [operation = std::move(operation),
func = std::forward<Func>(func)] (storage_service& ss) mutable {
if (ss.raft_topology_change_enabled()) {
return func(ss);
}
return ss.run_with_api_lock_internal(ss, std::forward<Func>(func), operation);
});
}
private:
void do_isolate_on_error(disk_error type);
future<> isolate();
future<> notify_down(inet_address endpoint, locator::host_id hid);
future<> notify_left(inet_address endpoint, locator::host_id hid);
future<> notify_released(locator::host_id hid);
future<> notify_up(inet_address endpoint, locator::host_id hid);
future<> notify_joined(inet_address endpoint, locator::host_id hid);
future<> notify_cql_change(inet_address endpoint, locator::host_id hid,bool ready);
future<> remove_rpc_client_with_ignored_topology(inet_address endpoint, locator::host_id id);
public:
future<bool> is_vnodes_cleanup_allowed(sstring keyspace);
bool is_repair_based_node_ops_enabled(streaming::stream_reason reason);
future<> update_fence_version(token_metadata::version_t version);
private:
std::unordered_set<locator::host_id> _normal_state_handled_on_boot;
bool is_normal_state_handled_on_boot(locator::host_id);
future<> wait_for_normal_state_handled_on_boot();
friend class group0_state_machine;
enum class topology_change_kind {
// The node is still starting and didn't determine yet which ops kind to use
unknown,
// The node uses legacy, gossip-based topology operations
legacy,
// The node is in the process of upgrading to raft-based topology operations
upgrading_to_raft,
// The node uses raft-based topology operations
raft
};
// The _topology_change_kind_enabled variable is first initialized in `join_cluster`.
// After the node successfully joins, the control over the variable is yielded
// to `topology_state_load`, so that it can control it during the upgrade from gossiper
// based topology to raft-based topology.
topology_change_kind _topology_change_kind_enabled = topology_change_kind::unknown;
// Throws an exception if the node is either starting and didn't determine which
// topology operations to use, or if it is in the process of upgrade to topology
// on raft. The name only serves for display purposes (i.e. it will be included
// in the exception, if one is thrown).
void check_ability_to_perform_topology_operation(std::string_view operation_name) const;
topology_change_kind upgrade_state_to_topology_op_kind(topology::upgrade_state_type upgrade_state) const;
public:
bool raft_topology_change_enabled() const;
bool legacy_topology_change_enabled() const;
private:
future<> _raft_state_monitor = make_ready_future<>();
// This fibers monitors raft state and start/stops the topology change
// coordinator fiber
future<> raft_state_monitor_fiber(raft::server&, gate::holder);
public:
bool topology_global_queue_empty() const {
return !_topology_state_machine._topology.global_request.has_value();
}
future<> raft_initialize_discovery_leader(const join_node_request_params& params);
future<> initialize_done_topology_upgrade_state();
private:
// State machine that is responsible for topology change
topology_state_machine& _topology_state_machine;
db::view::view_building_state_machine& _view_building_state_machine;
future<> _topology_change_coordinator = make_ready_future<>();
future<> topology_change_coordinator_fiber(raft::server&, raft::term_t, cdc::generation_service&, sharded<db::system_distributed_keyspace>&, abort_source&);
// Those futures hold results of streaming for various operations
std::optional<shared_future<>> _bootstrap_result;
std::optional<shared_future<>> _decommission_result;
std::optional<shared_future<>> _rebuild_result;
std::unordered_map<raft::server_id, std::optional<shared_future<>>> _remove_result;
tablet_op_registry _tablet_ops;
// This tracks active topology cmd rpc. There can be only one active
// cmd running and by inspecting this structure it can be checked which
// cmd is current executing and which nodes are still did not reply.
// Needed for debugging.
topology_coordinator_cmd_rpc_tracker _topology_cmd_rpc_tracker;
struct {
raft::term_t term{0};
uint64_t last_index{0};
} _raft_topology_cmd_handler_state;
class ip_address_updater;
// Represents a subscription to gossiper on_change events,
// updating the raft data structures that depend on
// IP addresses (token_metadata.topology, erm-s),
// as well as the system.peers table.
shared_ptr<ip_address_updater> _ip_address_updater;
std::unordered_set<raft::server_id> find_raft_nodes_from_hoeps(const locator::host_id_or_endpoint_list& hoeps) const;
future<raft_topology_cmd_result> raft_topology_cmd_handler(raft::term_t term, uint64_t cmd_index, const raft_topology_cmd& cmd);
future<> raft_decommission();
future<> raft_removenode(locator::host_id host_id, locator::host_id_or_endpoint_list ignore_nodes_params);
future<> raft_rebuild(utils::optional_param source_dc);
future<> raft_check_and_repair_cdc_streams();
future<> update_topology_with_local_metadata(raft::server&);
void set_topology_change_kind(topology_change_kind kind);
struct state_change_hint {
std::optional<locator::tablet_metadata_change_hint> tablets_hint;
};
// This is called on all nodes for each new command received through raft
// raft_group0_client::_read_apply_mutex must be held
// Precondition: the topology mutations were already written to disk; the function only transitions the in-memory state machine.
future<> topology_transition(state_change_hint hint = {});
// This is called on all nodes for each new command related to tablet-view-building, received through raft
// raft_group0_client::_read_apply_mutex must be held
// Precondition: the mutations were already written to disk; the function only transitions the in-memory state machine.
future<> view_building_transition();
public:
// Reloads in-memory topology state safely under group 0 read_apply mutex.
// Does not modify on-disk state.
//
// Must be called on shard 0.
future<> reload_raft_topology_state(service::raft_group0_client&);
// Service levels cache consists of two levels: service levels cache and effective service levels cache
// The second one is dependent on the first one.
// Must be called on shard 0.
//
// update_both_cache_levels::yes - updates both levels of the cache
// update_both_cache_levels::no - update only effective service levels cache
future<> update_service_levels_cache(qos::update_both_cache_levels update_only_effective_cache = qos::update_both_cache_levels::yes, qos::query_context ctx = qos::query_context::unspecified);
auth::cache& auth_cache() noexcept;
// Should be called whenever new compression dictionaries are published to system.dicts.
// This is an arbitrary callback passed through the constructor,
// but its intended usage is to set up the RPC connections to use the new dictionaries.
//
// Must be called on shard 0.
future<> compression_dictionary_updated_callback(std::string_view name);
future<> compression_dictionary_updated_callback_all();
future<> load_cdc_streams(std::optional<std::unordered_set<table_id>> changed_tables = std::nullopt);
future<> do_clusterwide_vnodes_cleanup();
future<> reset_cleanup_needed();
// Starts the upgrade procedure to topology on raft.
// Must be called on shard 0.
future<> start_upgrade_to_raft_topology();
// Must be called on shard 0.
topology::upgrade_state_type get_topology_upgrade_state() const;
node_state get_node_state(locator::host_id id);
// Waits for topology state in which none of tablets has replaced_id as a replica.
// Must be called on shard 0.
future<> await_tablets_rebuilt(raft::server_id replaced_id);
topology_coordinator_cmd_rpc_tracker get_topology_cmd_status() {
return _topology_cmd_rpc_tracker;
}
private:
// Tracks progress of the upgrade to topology coordinator.
future<> _upgrade_to_topology_coordinator_fiber = make_ready_future<>();
future<> track_upgrade_progress_to_topology_coordinator(sharded<service::storage_proxy>& proxy);
future<> transit_tablet(table_id, dht::token, noncopyable_function<std::tuple<utils::chunked_vector<canonical_mutation>, sstring>(const locator::tablet_map& tmap, api::timestamp_type)> prepare_mutations);
future<service::group0_guard> get_guard_for_tablet_update();
future<bool> exec_tablet_update(service::group0_guard guard, utils::chunked_vector<canonical_mutation> updates, sstring reason);
public:
struct all_tokens_tag {};
future<std::unordered_map<sstring, sstring>> add_repair_tablet_request(table_id table, std::variant<utils::chunked_vector<dht::token>, all_tokens_tag> tokens_variant, std::unordered_set<locator::host_id> hosts_filter, std::unordered_set<sstring> dcs_filter, bool await_completion, locator::tablet_repair_incremental_mode incremental_mode);
future<> del_repair_tablet_request(table_id table, locator::tablet_task_id);
future<> move_tablet(table_id, dht::token, locator::tablet_replica src, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no);
future<> add_tablet_replica(table_id, dht::token, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no);
future<> del_tablet_replica(table_id, dht::token, locator::tablet_replica dst, loosen_constraints force = loosen_constraints::no);
future<> set_tablet_balancing_enabled(bool);
future<> await_topology_quiesced();
// Verifies topology is not busy, and also that topology version hasn't changed since the one provided
// by the caller.
future<bool> verify_topology_quiesced(token_metadata::version_t expected_version);
// In the maintenance mode, other nodes won't be available thus we disabled joining
// the token ring and the token metadata won't be populated with the local node's endpoint.
// When a CQL query is executed it checks the `token_metadata` structure and fails if it is empty.
//
// This method initialises `token_metadata` with the local node as the only node in the token ring.
// It is incompatible with the `join_cluster` method.
future<> start_maintenance_mode();
// Waits for a topology request with a given ID to complete and return non empty error string
// if request completes with an error
future<sstring> wait_for_topology_request_completion(utils::UUID id, bool require_entry = true);
future<> wait_for_topology_not_busy();
private:
semaphore _do_sample_sstables_concurrency_limiter{1};
// To avoid overly-large RPC messages, `do_sample_sstables` is broken up into several rounds.
// This implements a single round.
future<utils::chunked_vector<temporary_buffer<char>>> do_sample_sstables_oneshot(table_id, uint64_t chunk_size, uint64_t n_chunks);
public:
// SSTable sampling results can occupy a considerable amount of memory.
// Callers of `do_sample_sstables` should hold this semaphore until they are done with the sample,
// to ensure that there's only one sample around.
semaphore& get_do_sample_sstables_concurrency_limiter();
// Gathers a randomly-selected sample of chunks of (decompressed) Data files for the given table,
// from across the entire cluster.
future<utils::chunked_vector<temporary_buffer<char>>> do_sample_sstables(table_id, uint64_t chunk_size, uint64_t n_chunks);
private:
future<utils::chunked_vector<canonical_mutation>> get_system_mutations(schema_ptr schema);
future<utils::chunked_vector<canonical_mutation>> get_system_mutations(const sstring& ks_name, const sstring& cf_name);
struct nodes_to_notify_after_sync {
std::vector<std::pair<gms::inet_address, locator::host_id>> left;
std::vector<std::pair<gms::inet_address, locator::host_id>> joined;
std::vector<locator::host_id> released;
};
using host_id_to_ip_map_t = db::system_keyspace::host_id_to_ip_map_t;
future<> raft_topology_update_ip(locator::host_id id, gms::inet_address ip, const host_id_to_ip_map_t& map, nodes_to_notify_after_sync* nodes_to_notify);
// Synchronizes the local node state (token_metadata, system.peers/system.local tables,
// gossiper) to align it with the other raft topology nodes.
// Optional target_node can be provided to restrict the synchronization to the specified node.
// Returns a structure that describes which notifications to trigger after token metadata is updated.
future<nodes_to_notify_after_sync> sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::unordered_set<raft::server_id> prev_normal);
// Triggers notifications (on_joined, on_left) based on the recent changes to token metadata, as described by the passed in structure.
// This function should be called on the result of `sync_raft_topology_nodes`, after the global token metadata is updated.
future<> notify_nodes_after_sync(nodes_to_notify_after_sync&& nodes_to_notify);
// load topology state machine snapshot into memory
// raft_group0_client::_read_apply_mutex must be held
future<> topology_state_load(state_change_hint hint = {});
// Applies received raft snapshot to local state machine persistent storage
// raft_group0_client::_read_apply_mutex must be held
future<> merge_topology_snapshot(raft_snapshot snp);
// load view building state machine snapshot into memory
// raft_group0_client::_read_apply_mutex must be held
future<> view_building_state_load();
utils::chunked_vector<canonical_mutation> build_mutation_from_join_params(const join_node_request_params& params, api::timestamp_type write_timestamp);
std::unordered_set<raft::server_id> ignored_nodes_from_join_params(const join_node_request_params& params);
future<join_node_request_result> join_node_request_handler(join_node_request_params params);
future<join_node_response_result> join_node_response_handler(join_node_response_params params);
shared_promise<> _join_node_request_done;
shared_promise<> _join_node_group0_started;
shared_promise<> _join_node_response_done;
semaphore _join_node_response_handler_mutex{1};
future<> _sstable_vnodes_cleanup_fiber = make_ready_future<>();
future<> sstable_vnodes_cleanup_fiber(raft::server& raft, gate::holder, sharded<service::storage_proxy>& proxy) noexcept;
// We need to be able to abort all group0 operation during shutdown, so we need special abort source for that
abort_source _group0_as;
std::function<future<void>(std::string_view)> _compression_dictionary_updated_callback;
using byte_vector = std::vector<std::byte>;
std::function<future<byte_vector>(std::vector<byte_vector>)> _train_dict;
utils::disk_space_monitor* _disk_space_monitor; // != nullptr only on shard0.
public:
future<uint64_t> estimate_total_sstable_volume(table_id);
future<std::vector<std::byte>> train_dict(utils::chunked_vector<temporary_buffer<char>> sample);
future<> publish_new_sstable_dict(table_id, std::span<const std::byte>, service::raft_group0_client&);
void set_train_dict_callback(decltype(_train_dict));
friend class join_node_rpc_handshaker;
friend class node_ops::node_ops_virtual_task;
friend class tasks::task_manager;
friend class tablet_virtual_task;
};
}
template <>
struct fmt::formatter<service::storage_service::mode> : fmt::formatter<string_view> {
template <typename FormatContext>
auto format(service::storage_service::mode mode, FormatContext& ctx) const {
std::string_view name;
using enum service::storage_service::mode;
switch (mode) {
case NONE: name = "STARTING"; break;
case STARTING: name = "STARTING"; break;
case NORMAL: name = "NORMAL"; break;
case JOINING: name = "JOINING"; break;
case BOOTSTRAP: name = "BOOTSTRAP"; break;
case LEAVING: name = "LEAVING"; break;
case DECOMMISSIONED: name = "DECOMMISSIONED"; break;
case MOVING: name = "MOVING"; break;
case DRAINING: name = "DRAINING"; break;
case DRAINED: name = "DRAINED"; break;
case MAINTENANCE: name = "MAINTENANCE"; break;
}
return fmt::format_to(ctx.out(), "{}", name);
}
};