diff --git a/alternator/controller.cc b/alternator/controller.cc index 40c8388f78..94f64e408d 100644 --- a/alternator/controller.cc +++ b/alternator/controller.cc @@ -38,6 +38,7 @@ controller::controller( sharded& auth_service, sharded& sl_controller, sharded& vsc, + sharded& timeout_config, const db::config& config, seastar::scheduling_group sg) : protocol_server(sg) @@ -52,6 +53,7 @@ controller::controller( , _auth_service(auth_service) , _sl_controller(sl_controller) , _vsc(vsc) + , _timeout_config(timeout_config) , _config(config) { } @@ -99,7 +101,7 @@ future<> controller::start_server() { _executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_ss), std::ref(_mm), std::ref(_sys_dist_ks), std::ref(_sys_ks), sharded_parameter(get_cdc_metadata, std::ref(_cdc_gen_svc)), std::ref(_vsc), _ssg.value(), sharded_parameter(get_timeout_in_ms, std::ref(_config))).get(); - _server.start(std::ref(_executor), std::ref(_proxy), std::ref(_gossiper), std::ref(_auth_service), std::ref(_sl_controller)).get(); + _server.start(std::ref(_executor), std::ref(_proxy), std::ref(_gossiper), std::ref(_auth_service), std::ref(_sl_controller), std::ref(_timeout_config)).get(); // Note: from this point on, if start_server() throws for any reason, // it must first call stop_server() to stop the executor and server // services we just started - or Scylla will cause an assertion diff --git a/alternator/controller.hh b/alternator/controller.hh index cd8bdf7f55..f058720e50 100644 --- a/alternator/controller.hh +++ b/alternator/controller.hh @@ -48,6 +48,8 @@ namespace vector_search { class vector_store_client; } +class updateable_timeout_config; + namespace alternator { // This is the official DynamoDB API version. @@ -72,6 +74,7 @@ class controller : public protocol_server { sharded& _auth_service; sharded& _sl_controller; sharded& _vsc; + sharded& _timeout_config; const db::config& _config; std::vector _listen_addresses; @@ -92,6 +95,7 @@ public: sharded& auth_service, sharded& sl_controller, sharded& vsc, + sharded& timeout_config, const db::config& config, seastar::scheduling_group sg); diff --git a/alternator/server.cc b/alternator/server.cc index 68f7cacf7a..6c932808b8 100644 --- a/alternator/server.cc +++ b/alternator/server.cc @@ -835,7 +835,7 @@ void server::set_routes(routes& r) { //FIXME: A way to immediately invalidate the cache should be considered, // e.g. when the system table which stores the keys is changed. // For now, this propagation may take up to 1 minute. -server::server(executor& exec, service::storage_proxy& proxy, gms::gossiper& gossiper, auth::service& auth_service, qos::service_level_controller& sl_controller) +server::server(executor& exec, service::storage_proxy& proxy, gms::gossiper& gossiper, auth::service& auth_service, qos::service_level_controller& sl_controller, updateable_timeout_config& timeout_config) : _http_server("http-alternator") , _https_server("https-alternator") , _executor(exec) @@ -847,7 +847,7 @@ server::server(executor& exec, service::storage_proxy& proxy, gms::gossiper& gos , _max_users_query_size_in_trace_output(1024) , _enabled_servers{} , _pending_requests("alternator::server::pending_requests") - , _timeout_config(_proxy.data_dictionary().get_config()) + , _timeout_config(timeout_config) , _callbacks{ {"CreateTable", [] (executor& e, executor::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value json_request, std::unique_ptr req, std::unique_ptr& audit_info) { return e.create_table(client_state, std::move(trace_state), std::move(permit), std::move(json_request), audit_info); diff --git a/alternator/server.hh b/alternator/server.hh index f105510bcc..26db39a044 100644 --- a/alternator/server.hh +++ b/alternator/server.hh @@ -16,6 +16,7 @@ #include #include #include "alternator/auth.hh" +#include "timeout_config.hh" #include "service/qos/service_level_controller.hh" #include "utils/small_vector.hh" #include "utils/updateable_value.hh" @@ -53,8 +54,8 @@ class server : public peering_sharded_service { named_gate _pending_requests; // In some places we will need a CQL updateable_timeout_config object even // though it isn't really relevant for Alternator which defines its own - // timeouts separately. We can create this object only once. - updateable_timeout_config _timeout_config; + // timeouts separately. + updateable_timeout_config& _timeout_config; client_options_cache_type _connection_options_keys_and_values; alternator_callbacks_map _callbacks; @@ -98,7 +99,7 @@ class server : public peering_sharded_service { utils::scoped_item_list _ongoing_requests; public: - server(executor& executor, service::storage_proxy& proxy, gms::gossiper& gossiper, auth::service& service, qos::service_level_controller& sl_controller); + server(executor& executor, service::storage_proxy& proxy, gms::gossiper& gossiper, auth::service& service, qos::service_level_controller& sl_controller, updateable_timeout_config& timeout_config); future<> init(net::inet_address addr, std::optional port, std::optional https_port, std::optional port_proxy_protocol, std::optional https_port_proxy_protocol, diff --git a/main.cc b/main.cc index f890b0831e..2bc1cec115 100644 --- a/main.cc +++ b/main.cc @@ -30,6 +30,7 @@ #include "utils/build_id.hh" #include "utils/only_on_shard0.hh" #include "supervisor.hh" +#include "timeout_config.hh" #include "replica/database.hh" #include #include @@ -1368,6 +1369,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl spcfg.hints_write_smp_service_group = create_smp_service_group(storage_proxy_smp_service_group_config).get(); spcfg.write_ack_smp_service_group = create_smp_service_group(storage_proxy_smp_service_group_config).get(); static db::view::node_update_backlog node_backlog(smp::count, 10ms, cfg->view_flow_control_delay_limit_in_ms); + + static sharded timeout_cfg; + timeout_cfg.start(std::ref(*cfg)).get(); + auto stop_timeout_cfg = defer_verbose_shutdown("updateable timeout config", [] { timeout_cfg.stop().get(); }); + scheduling_group_key_config storage_proxy_stats_cfg = make_scheduling_group_key_config(); storage_proxy_stats_cfg.constructor = [plain_constructor = storage_proxy_stats_cfg.constructor] (void* ptr) { @@ -1381,7 +1387,8 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl }; proxy.start(std::ref(db), spcfg, std::ref(node_backlog), scheduling_group_key_create(storage_proxy_stats_cfg).get(), - std::ref(feature_service), std::ref(token_metadata), std::ref(erm_factory)).get(); + std::ref(feature_service), std::ref(token_metadata), std::ref(erm_factory), + std::ref(timeout_cfg)).get(); // #293 - do not stop anything // engine().at_exit([&proxy] { return proxy.stop(); }); @@ -2186,7 +2193,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl auth::make_maintenance_socket_role_manager_factory(qp, group0_client, mm, auth_cache), maintenance_socket_enabled::yes, std::ref(auth_cache)).get(); - cql_maintenance_server_ctl.emplace(maintenance_auth_service, mm_notifier, gossiper, qp, service_memory_limiter, sl_controller, lifecycle_notifier, messaging, *cfg, maintenance_cql_sg_stats_key, maintenance_socket_enabled::yes, dbcfg.statement_scheduling_group); + cql_maintenance_server_ctl.emplace(maintenance_auth_service, mm_notifier, gossiper, qp, service_memory_limiter, sl_controller, lifecycle_notifier, messaging, timeout_cfg, *cfg, maintenance_cql_sg_stats_key, maintenance_socket_enabled::yes, dbcfg.statement_scheduling_group); start_auth_service(maintenance_auth_service, stop_maintenance_auth_service, "maintenance auth service"); } @@ -2618,11 +2625,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl // after drain stops them in stop_transport() // Register controllers after drain_on_shutdown() below, so that even on start // failure drain is called and stops controllers - cql_transport::controller cql_server_ctl(auth_service, mm_notifier, gossiper, qp, service_memory_limiter, sl_controller, lifecycle_notifier, messaging, *cfg, cql_sg_stats_key, maintenance_socket_enabled::no, dbcfg.statement_scheduling_group); + cql_transport::controller cql_server_ctl(auth_service, mm_notifier, gossiper, qp, service_memory_limiter, sl_controller, lifecycle_notifier, messaging, timeout_cfg, *cfg, cql_sg_stats_key, maintenance_socket_enabled::no, dbcfg.statement_scheduling_group); api::set_server_service_levels(ctx, cql_server_ctl, qp).get(); - alternator::controller alternator_ctl(gossiper, proxy, ss, mm, sys_dist_ks, sys_ks, cdc_generation_service, service_memory_limiter, auth_service, sl_controller, vector_store_client, *cfg, dbcfg.statement_scheduling_group); + alternator::controller alternator_ctl(gossiper, proxy, ss, mm, sys_dist_ks, sys_ks, cdc_generation_service, service_memory_limiter, auth_service, sl_controller, vector_store_client, timeout_cfg, *cfg, dbcfg.statement_scheduling_group); // Register at_exit last, so that storage_service::drain_on_shutdown will be called first auto do_drain = defer_verbose_shutdown("local storage", [&ss] { diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index e22fb9d8de..ffb73f72a3 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -590,7 +590,7 @@ private: storage_proxy::clock_type::time_point timeout; if (!t) { - auto timeout_in_ms = _sp._db.local().get_config().write_request_timeout_in_ms(); + auto timeout_in_ms = _sp._timeout_config.write_timeout_in_ms(); timeout = clock_type::now() + std::chrono::milliseconds(timeout_in_ms); } else { timeout = *t; @@ -3321,7 +3321,8 @@ storage_proxy::~storage_proxy() { } storage_proxy::storage_proxy(sharded& db, storage_proxy::config cfg, db::view::node_update_backlog& max_view_update_backlog, - scheduling_group_key stats_key, gms::feature_service& feat, const locator::shared_token_metadata& stm, locator::effective_replication_map_factory& erm_factory) + scheduling_group_key stats_key, gms::feature_service& feat, const locator::shared_token_metadata& stm, locator::effective_replication_map_factory& erm_factory, + updateable_timeout_config& timeout_config) : _db(db) , _shared_token_metadata(stm) , _erm_factory(erm_factory) @@ -3341,6 +3342,7 @@ storage_proxy::storage_proxy(sharded& db, storage_proxy::conf , _background_write_throttle_threahsold(cfg.available_memory / 10) , _mutate_stage{"storage_proxy_mutate", &storage_proxy::do_mutate} , _max_view_update_backlog(max_view_update_backlog) + , _timeout_config(timeout_config) , _cancellable_write_handlers_list(std::make_unique()) { namespace sm = seastar::metrics; @@ -3970,7 +3972,7 @@ future> storage_proxy::mutate_begin(unique_response_handler_vector ids, // frozen_mutation copy, or manage handler live time differently. hint_to_dead_endpoints(response_id, cl); - auto timeout = timeout_opt.value_or(clock_type::now() + std::chrono::milliseconds(_db.local().get_config().write_request_timeout_in_ms())); + auto timeout = timeout_opt.value_or(clock_type::now() + std::chrono::milliseconds(_timeout_config.write_timeout_in_ms())); // call before send_to_live_endpoints() for the same reason as above auto f = response_wait(response_id, timeout); send_to_live_endpoints(protected_response.release(), timeout); // response is now running and it will either complete or timeout @@ -5942,7 +5944,7 @@ public: // occur within write_timeout of a write, as these are the cases where repair is most // beneficial. if (is_datacenter_local(exec->_cl) && exec->_cmd->read_timestamp >= 0 && digest_resolver->last_modified() >= 0) { - auto write_timeout = exec->_proxy->_db.local().get_config().write_request_timeout_in_ms() * 1000; + auto write_timeout = exec->_proxy->_timeout_config.write_timeout_in_ms() * 1000; auto delta = int64_t(digest_resolver->last_modified()) - int64_t(exec->_cmd->read_timestamp); if (std::abs(delta) <= write_timeout) { exec->_proxy->get_stats().global_read_repairs_canceled_due_to_concurrent_write++; @@ -6066,7 +6068,7 @@ public: }); auto& sr = _schema->speculative_retry(); auto t = (sr.get_type() == speculative_retry::type::PERCENTILE) ? - std::min(_cf->get_coordinator_read_latency_percentile(sr.get_value()), std::chrono::milliseconds(_proxy->get_db().local().get_config().read_request_timeout_in_ms()/2)) : + std::min(_cf->get_coordinator_read_latency_percentile(sr.get_value()), std::chrono::milliseconds(_proxy->_timeout_config.read_timeout_in_ms()/2)) : std::chrono::milliseconds(unsigned(sr.get_value())); _speculate_timer.arm(t); resolver->set_on_disconnect([this] { @@ -6784,7 +6786,7 @@ storage_proxy::do_query_with_paxos(schema_ptr s, db::timeout_clock::time_point timeout = query_options.timeout(*this); // When to give up due to contention db::timeout_clock::time_point cas_timeout = db::timeout_clock::now() + - std::chrono::milliseconds(_db.local().get_config().cas_contention_timeout_in_ms()); + std::chrono::milliseconds(_timeout_config.cas_timeout_in_ms()); struct read_cas_request : public cas_request { foreign_ptr> res; diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index 96d3485cfb..a2723f2f3d 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -41,6 +41,7 @@ #include "service/storage_service.hh" #include "service/cas_shard.hh" #include "service/maintenance_mode.hh" +#include "timeout_config.hh" #include "service/storage_proxy_fwd.hh" class reconcilable_result; @@ -319,6 +320,7 @@ private: lw_shared_ptr, coordinator_mutate_options> _mutate_stage; db::view::node_update_backlog& _max_view_update_backlog; + updateable_timeout_config& _timeout_config; std::unordered_map _view_update_backlogs; //NOTICE(sarna): This opaque pointer is here just to avoid moving write handler class definitions from .cc to .hh. It's slow path. @@ -528,7 +530,7 @@ private: public: storage_proxy(sharded& db, config cfg, db::view::node_update_backlog& max_view_update_backlog, scheduling_group_key stats_key, gms::feature_service& feat, const locator::shared_token_metadata& stm, - locator::effective_replication_map_factory& erm_factory); + locator::effective_replication_map_factory& erm_factory, updateable_timeout_config& timeout_config); ~storage_proxy(); const sharded& get_db() const { diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 1e2cfac53a..89bf5367d1 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -23,6 +23,7 @@ #include "cql3/statements/batch_statement.hh" #include "cql3/statements/modification_statement.hh" #include "cql3/cql_config.hh" +#include "timeout_config.hh" #include #include #include @@ -177,6 +178,7 @@ private: std::optional _disk_space_monitor_shard0; sharded _lang_manager; sharded _cql_config; + sharded _timeout_config; sharded _elc_notif; sharded _cdc_generation_service; sharded _repair; @@ -735,9 +737,13 @@ private: }; spcfg.available_memory = memory::stats().total_memory(); db::view::node_update_backlog b(smp::count, 10ms); + + _timeout_config.start(std::ref(*cfg)).get(); + auto stop_timeout_config = defer_verbose_shutdown("updateable timeout config", [this] { _timeout_config.stop().get(); }); + scheduling_group_key_config sg_conf = make_scheduling_group_key_config(); - _proxy.start(std::ref(_db), spcfg, std::ref(b), scheduling_group_key_create(sg_conf).get(), std::ref(_feature_service), std::ref(_token_metadata), std::ref(_erm_factory)).get(); + _proxy.start(std::ref(_db), spcfg, std::ref(b), scheduling_group_key_create(sg_conf).get(), std::ref(_feature_service), std::ref(_token_metadata), std::ref(_erm_factory), std::ref(_timeout_config)).get(); auto stop_proxy = defer_verbose_shutdown("storage proxy", [this] { _proxy.stop().get(); }); _cql_config.start(seastar::sharded_parameter([&] { return cql3::cql_config(*cfg); })).get(); diff --git a/transport/controller.cc b/transport/controller.cc index 077e8972b0..2993cfec59 100644 --- a/transport/controller.cc +++ b/transport/controller.cc @@ -31,7 +31,7 @@ static logging::logger logger("cql_server_controller"); controller::controller(sharded& auth, sharded& mn, sharded& gossiper, sharded& qp, sharded& ml, sharded& sl_controller, sharded& elc_notif, - sharded& ms, + sharded& ms, sharded& timeout_config, const db::config& cfg, scheduling_group_key cql_opcode_stats_key, maintenance_socket_enabled used_by_maintenance_socket, seastar::scheduling_group sg) : protocol_server(sg) @@ -45,6 +45,7 @@ controller::controller(sharded& auth, sharded controller::do_start_server() { shard_aware_transport_port_ssl = cfg.native_shard_aware_transport_port_ssl(); } return cql_server_config { - .timeout_config = updateable_timeout_config(cfg), + .timeout_config = _timeout_config.local(), .max_request_size = _mem_limiter.local().total_memory(), .partitioner_name = cfg.partitioner(), .sharding_ignore_msb = cfg.murmur3_partitioner_ignore_msb_bits(), diff --git a/transport/controller.hh b/transport/controller.hh index de13e773ec..cd0e822ae3 100644 --- a/transport/controller.hh +++ b/transport/controller.hh @@ -30,6 +30,7 @@ namespace qos { class service_level_controller; } namespace netw { class messaging_service; } namespace db { class config; } struct client_data; +class updateable_timeout_config; namespace cql_transport { @@ -50,6 +51,7 @@ class controller : public protocol_server { sharded& _mem_limiter; sharded& _sl_controller; sharded& _messaging; + sharded& _timeout_config; const db::config& _config; scheduling_group_key _cql_opcode_stats_key; @@ -70,7 +72,7 @@ public: controller(sharded&, sharded&, sharded&, sharded&, sharded&, sharded&, sharded&, - sharded&, + sharded&, sharded& timeout_config, const db::config& cfg, scheduling_group_key cql_opcode_stats_key, maintenance_socket_enabled used_by_maintenance_socket, seastar::scheduling_group sg); virtual sstring name() const override; diff --git a/transport/server.hh b/transport/server.hh index 64342ca04d..f522909e3e 100644 --- a/transport/server.hh +++ b/transport/server.hh @@ -117,7 +117,7 @@ struct cql_query_state { }; struct cql_server_config { - updateable_timeout_config timeout_config; + updateable_timeout_config& timeout_config; size_t max_request_size; sstring partitioner_name; unsigned sharding_ignore_msb;