From 3a44fa998831ea01906cb9310e40ace108ac09fb Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Sun, 4 Aug 2019 18:27:43 +0300 Subject: [PATCH 1/3] cql3, treewide: introduce empty cql3::cql_config class and propagate it We need a way to configure the cql interpreter and runtime. So far we relied on accessing the configuration class via various backdoors, but that causes its own problems around initialization order and testability. To avoid that, this patch adds an empty cql_config class and propagates it from main.cc (and from tests) to the cql interpreter via the query_options class, which is already passed everywhere. Later patches will fill it with contents. --- cql3/cql_config.hh | 33 +++++++++++++++++++++++++++++++ cql3/query_options.cc | 34 ++++++++++++++++++++++---------- cql3/query_options.hh | 19 ++++++++++++++---- init.cc | 6 ++++-- init.hh | 4 +++- main.cc | 5 ++++- service/storage_service.cc | 13 +++++++----- service/storage_service.hh | 6 ++++-- tests/cql_query_test.cc | 3 ++- tests/cql_test_env.cc | 6 +++++- tests/gossip.cc | 7 +++++-- tests/gossip_test.cc | 7 ++++++- tests/query_processor_test.cc | 1 + tests/test_services.cc | 6 +++++- thrift/handler.cc | 22 +++++++++++++-------- thrift/handler.hh | 9 ++++++++- thrift/server.cc | 3 ++- thrift/server.hh | 8 +++++++- tracing/trace_keyspace_helper.cc | 15 +++++++++----- transport/request.hh | 8 ++++---- transport/server.cc | 11 ++++++----- transport/server.hh | 3 ++- 22 files changed, 172 insertions(+), 57 deletions(-) create mode 100644 cql3/cql_config.hh diff --git a/cql3/cql_config.hh b/cql3/cql_config.hh new file mode 100644 index 0000000000..e7b3f38967 --- /dev/null +++ b/cql3/cql_config.hh @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2019 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + + + +#pragma once + +namespace cql3 { + +struct cql_config { +}; + +extern const cql_config default_cql_config; + +} diff --git a/cql3/query_options.cc b/cql3/query_options.cc index 082b9799d8..b45d2d32c4 100644 --- a/cql3/query_options.cc +++ b/cql3/query_options.cc @@ -39,17 +39,22 @@ * along with Scylla. If not, see . */ +#include "cql3/cql_config.hh" #include "query_options.hh" #include "version.hh" namespace cql3 { +const cql_config default_cql_config; + thread_local const query_options::specific_options query_options::specific_options::DEFAULT{-1, {}, {}, api::missing_timestamp}; -thread_local query_options query_options::DEFAULT{db::consistency_level::ONE, infinite_timeout_config, std::nullopt, +thread_local query_options query_options::DEFAULT{default_cql_config, + db::consistency_level::ONE, infinite_timeout_config, std::nullopt, std::vector(), false, query_options::specific_options::DEFAULT, cql_serialization_format::latest()}; -query_options::query_options(db::consistency_level consistency, +query_options::query_options(const cql_config& cfg, + db::consistency_level consistency, const ::timeout_config& timeout_config, std::optional> names, std::vector values, @@ -57,7 +62,8 @@ query_options::query_options(db::consistency_level consistency, bool skip_metadata, specific_options options, cql_serialization_format sf) - : _consistency(consistency) + : _cql_config(cfg) + , _consistency(consistency) , _timeout_config(timeout_config) , _names(std::move(names)) , _values(std::move(values)) @@ -68,14 +74,16 @@ query_options::query_options(db::consistency_level consistency, { } -query_options::query_options(db::consistency_level consistency, +query_options::query_options(const cql_config& cfg, + db::consistency_level consistency, const ::timeout_config& timeout_config, std::optional> names, std::vector values, bool skip_metadata, specific_options options, cql_serialization_format sf) - : _consistency(consistency) + : _cql_config(cfg) + , _consistency(consistency) , _timeout_config(timeout_config) , _names(std::move(names)) , _values(std::move(values)) @@ -87,14 +95,16 @@ query_options::query_options(db::consistency_level consistency, fill_value_views(); } -query_options::query_options(db::consistency_level consistency, +query_options::query_options(const cql_config& cfg, + db::consistency_level consistency, const ::timeout_config& timeout_config, std::optional> names, std::vector value_views, bool skip_metadata, specific_options options, cql_serialization_format sf) - : _consistency(consistency) + : _cql_config(cfg) + , _consistency(consistency) , _timeout_config(timeout_config) , _names(std::move(names)) , _values() @@ -105,8 +115,10 @@ query_options::query_options(db::consistency_level consistency, { } -query_options::query_options(db::consistency_level cl, const ::timeout_config& timeout_config, std::vector values, specific_options options) +query_options::query_options(db::consistency_level cl, const ::timeout_config& timeout_config, std::vector values, + specific_options options) : query_options( + default_cql_config, cl, timeout_config, {}, @@ -119,7 +131,8 @@ query_options::query_options(db::consistency_level cl, const ::timeout_config& t } query_options::query_options(std::unique_ptr qo, ::shared_ptr paging_state) - : query_options(qo->_consistency, + : query_options(qo->_cql_config, + qo->_consistency, qo->get_timeout_config(), std::move(qo->_names), std::move(qo->_values), @@ -131,7 +144,8 @@ query_options::query_options(std::unique_ptr qo, ::shared_ptr qo, ::shared_ptr paging_state, int32_t page_size) - : query_options(qo->_consistency, + : query_options(qo->_cql_config, + qo->_consistency, qo->get_timeout_config(), std::move(qo->_names), std::move(qo->_values), diff --git a/cql3/query_options.hh b/cql3/query_options.hh index be66b5e905..40c0334b14 100644 --- a/cql3/query_options.hh +++ b/cql3/query_options.hh @@ -55,6 +55,9 @@ namespace cql3 { +class cql_config; +extern const cql_config default_cql_config; + /** * Options for a query. */ @@ -70,6 +73,7 @@ public: const api::timestamp_type timestamp; }; private: + const cql_config& _cql_config; const db::consistency_level _consistency; const timeout_config& _timeout_config; const std::optional> _names; @@ -104,14 +108,16 @@ public: query_options(query_options&&) = default; explicit query_options(const query_options&) = default; - explicit query_options(db::consistency_level consistency, + explicit query_options(const cql_config& cfg, + db::consistency_level consistency, const timeout_config& timeouts, std::optional> names, std::vector values, bool skip_metadata, specific_options options, cql_serialization_format sf); - explicit query_options(db::consistency_level consistency, + explicit query_options(const cql_config& cfg, + db::consistency_level consistency, const timeout_config& timeouts, std::optional> names, std::vector values, @@ -119,7 +125,8 @@ public: bool skip_metadata, specific_options options, cql_serialization_format sf); - explicit query_options(db::consistency_level consistency, + explicit query_options(const cql_config& cfg, + db::consistency_level consistency, const timeout_config& timeouts, std::optional> names, std::vector value_views, @@ -227,6 +234,10 @@ public: return _names; } + const cql_config& get_cql_config() const { + return _cql_config; + } + void prepare(const std::vector<::shared_ptr>& specs); private: void fill_value_views(); @@ -244,7 +255,7 @@ query_options::query_options(query_options&& o, std::vector tmp; tmp.reserve(values_ranges.size()); std::transform(values_ranges.begin(), values_ranges.end(), std::back_inserter(tmp), [this](auto& values_range) { - return query_options(_consistency, _timeout_config, {}, std::move(values_range), _skip_metadata, _options, _cql_serialization_format); + return query_options(_cql_config, _consistency, _timeout_config, {}, std::move(values_range), _skip_metadata, _options, _cql_serialization_format); }); _batch_options = std::move(tmp); } diff --git a/init.cc b/init.cc index 523a1add85..22d2bb79a0 100644 --- a/init.cc +++ b/init.cc @@ -38,9 +38,11 @@ logging::logger startlog("init"); // until proper shutdown is done. void init_storage_service(sharded& abort_source, - distributed& db, sharded& gossiper, sharded& auth_service, sharded& sys_dist_ks, + distributed& db, sharded& gossiper, sharded& auth_service, + sharded& cql_config, + sharded& sys_dist_ks, sharded& view_update_generator, sharded& feature_service, service::storage_service_config config) { - service::init_storage_service(abort_source, db, gossiper, auth_service, sys_dist_ks, view_update_generator, feature_service, config).get(); + service::init_storage_service(abort_source, db, gossiper, auth_service, cql_config, sys_dist_ks, view_update_generator, feature_service, config).get(); // #293 - do not stop anything //engine().at_exit([] { return service::deinit_storage_service(); }); } diff --git a/init.hh b/init.hh index 583aa6e85f..287002d41f 100644 --- a/init.hh +++ b/init.hh @@ -48,7 +48,9 @@ extern logging::logger startlog; class bad_configuration_error : public std::exception {}; void init_storage_service(sharded& abort_sources, - distributed& db, sharded& gossiper, sharded& auth_service, sharded& sys_dist_ks, + distributed& db, sharded& gossiper, sharded& auth_service, + sharded& cql_config, + sharded& sys_dist_ks, sharded& view_update_generator, sharded& feature_service); struct init_scheduling_config { diff --git a/main.cc b/main.cc index 51757665a2..adb06b5024 100644 --- a/main.cc +++ b/main.cc @@ -69,6 +69,7 @@ #include "sstables/sstables.hh" #include "gms/feature_service.hh" #include "distributed_loader.hh" +#include "cql3/cql_config.hh" namespace fs = std::filesystem; @@ -676,6 +677,8 @@ int main(int ac, char** av) { static sharded auth_service; static sharded sys_dist_ks; static sharded view_update_generator; + static sharded cql_config; + cql_config.start().get(); auto& gossiper = gms::get_gossiper(); gossiper.start(std::ref(feature_service), std::ref(*cfg)).get(); // #293 - do not stop anything @@ -683,7 +686,7 @@ int main(int ac, char** av) { supervisor::notify("initializing storage service"); service::storage_service_config sscfg; sscfg.available_memory = memory::stats().total_memory(); - init_storage_service(stop_signal.as_sharded_abort_source(), db, gossiper, auth_service, sys_dist_ks, view_update_generator, feature_service, sscfg); + init_storage_service(stop_signal.as_sharded_abort_source(), db, gossiper, auth_service, cql_config, sys_dist_ks, view_update_generator, feature_service, sscfg); supervisor::notify("starting per-shard database core"); // Note: changed from using a move here, because we want the config object intact. diff --git a/service/storage_service.cc b/service/storage_service.cc index 5ba2c860db..ee8bcd7a4a 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -136,13 +136,14 @@ int get_generation_number() { return generation_number; } -storage_service::storage_service(abort_source& abort_source, distributed& db, gms::gossiper& gossiper, sharded& auth_service, sharded& sys_dist_ks, +storage_service::storage_service(abort_source& abort_source, distributed& db, gms::gossiper& gossiper, sharded& auth_service, sharded& cql_config, sharded& sys_dist_ks, sharded& view_update_generator, gms::feature_service& feature_service, storage_service_config config, bool for_testing, std::set disabled_features) : _abort_source(abort_source) , _feature_service(feature_service) , _db(db) , _gossiper(gossiper) , _auth_service(auth_service) + , _cql_config(cql_config) , _disabled_features(std::move(disabled_features)) , _service_memory_total(config.available_memory / 10) , _service_memory_limiter(_service_memory_total) @@ -2217,7 +2218,7 @@ future<> storage_service::start_rpc_server() { tsc.timeout_config = make_timeout_config(cfg); tsc.max_request_size = cfg.thrift_max_message_length_in_mb() * (uint64_t(1) << 20); return gms::inet_address::lookup(addr, family, preferred).then([&ss, tserver, addr, port, keepalive, tsc] (gms::inet_address ip) { - return tserver->start(std::ref(ss._db), std::ref(cql3::get_query_processor()), std::ref(ss._auth_service), tsc).then([tserver, port, addr, ip, keepalive] { + return tserver->start(std::ref(ss._db), std::ref(cql3::get_query_processor()), std::ref(ss._auth_service), std::ref(ss._cql_config), tsc).then([tserver, port, addr, ip, keepalive] { // #293 - do not stop anything //engine().at_exit([tserver] { // return tserver->stop(); @@ -2274,7 +2275,7 @@ future<> storage_service::start_native_transport() { cql_server_config.get_service_memory_limiter_semaphore = [ss = std::ref(get_storage_service())] () -> semaphore& { return ss.get().local()._service_memory_limiter; }; cql_server_config.allow_shard_aware_drivers = cfg.enable_shard_aware_drivers(); return gms::inet_address::lookup(addr, family, preferred).then([&ss, cserver, addr, &cfg, keepalive, ceo = std::move(ceo), cql_server_config] (seastar::net::inet_address ip) { - return cserver->start(std::ref(service::get_storage_proxy()), std::ref(cql3::get_query_processor()), std::ref(ss._auth_service), cql_server_config).then([cserver, &cfg, addr, ip, ceo, keepalive]() { + return cserver->start(std::ref(service::get_storage_proxy()), std::ref(cql3::get_query_processor()), std::ref(ss._auth_service), std::ref(ss._cql_config), cql_server_config).then([cserver, &cfg, addr, ip, ceo, keepalive]() { auto f = make_ready_future(); @@ -3409,9 +3410,11 @@ storage_service::view_build_statuses(sstring keyspace, sstring view_name) const }); } -future<> init_storage_service(sharded& abort_source, distributed& db, sharded& gossiper, sharded& auth_service, sharded& sys_dist_ks, +future<> init_storage_service(sharded& abort_source, distributed& db, sharded& gossiper, sharded& auth_service, + sharded& cql_config, + sharded& sys_dist_ks, sharded& view_update_generator, sharded& feature_service, storage_service_config config) { - return service::get_storage_service().start(std::ref(abort_source), std::ref(db), std::ref(gossiper), std::ref(auth_service), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service), config); + return service::get_storage_service().start(std::ref(abort_source), std::ref(db), std::ref(gossiper), std::ref(auth_service), std::ref(cql_config), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service), config); } future<> deinit_storage_service() { diff --git a/service/storage_service.hh b/service/storage_service.hh index ec77b57364..df56b1f124 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -150,6 +150,7 @@ private: distributed& _db; gms::gossiper& _gossiper; sharded& _auth_service; + sharded& _cql_config; int _update_jobs{0}; // Note that this is obviously only valid for the current shard. Users of // this facility should elect a shard to be the coordinator based on any @@ -170,7 +171,7 @@ private: size_t _service_memory_total; semaphore _service_memory_limiter; public: - storage_service(abort_source& as, distributed& db, gms::gossiper& gossiper, sharded&, sharded&, sharded&, gms::feature_service& feature_service, storage_service_config config, /* only for tests */ bool for_testing = false, /* only for tests */ std::set disabled_features = {}); + storage_service(abort_source& as, distributed& db, gms::gossiper& gossiper, sharded&, sharded& cql_config, sharded&, sharded&, gms::feature_service& feature_service, storage_service_config config, /* only for tests */ bool for_testing = false, /* only for tests */ std::set disabled_features = {}); void isolate_on_error(); void isolate_on_commit_error(); @@ -2377,7 +2378,8 @@ private: void notify_cql_change(inet_address endpoint, bool ready); }; -future<> init_storage_service(sharded& abort_sources, distributed& db, sharded& gossiper, sharded& auth_service, sharded& sys_dist_ks, +future<> init_storage_service(sharded& abort_sources, distributed& db, sharded& gossiper, sharded& auth_service, + sharded& cql_config, sharded& sys_dist_ks, sharded& view_update_generator, sharded& feature_service, storage_service_config config); future<> deinit_storage_service(); diff --git a/tests/cql_query_test.cc b/tests/cql_query_test.cc index f7ba3dc077..b7805c7a48 100644 --- a/tests/cql_query_test.cc +++ b/tests/cql_query_test.cc @@ -43,6 +43,7 @@ #include "types/list.hh" #include "types/set.hh" #include "db/config.hh" +#include "cql3/cql_config.hh" #include "sstables/compaction_manager.hh" #include "exception_utils.hh" @@ -3030,7 +3031,7 @@ SEASTAR_TEST_CASE(test_insert_large_collection_values) { BOOST_REQUIRE_THROW(e.execute_cql(format("INSERT INTO tbl (pk, m) VALUES ('Golding', {{'{}': 'value'}});", long_value)).get(), std::exception); auto make_query_options = [] (cql_protocol_version_type version) { - return std::make_unique(db::consistency_level::ONE, infinite_timeout_config, std::nullopt, + return std::make_unique(cql3::default_cql_config, db::consistency_level::ONE, infinite_timeout_config, std::nullopt, std::vector(), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format{version}); }; diff --git a/tests/cql_test_env.cc b/tests/cql_test_env.cc index a6023d0ca3..d1e5620ed1 100644 --- a/tests/cql_test_env.cc +++ b/tests/cql_test_env.cc @@ -29,6 +29,7 @@ #include "cql3/query_processor.hh" #include "cql3/query_options.hh" #include "cql3/statements/batch_statement.hh" +#include "cql3/cql_config.hh" #include #include #include @@ -385,13 +386,16 @@ public: distributed& proxy = service::get_storage_proxy(); distributed& mm = service::get_migration_manager(); distributed& bm = db::get_batchlog_manager(); + sharded cql_config; + cql_config.start().get(); + auto stop_cql_config = defer([&] { cql_config.stop().get(); }); auto view_update_generator = ::make_shared>(); auto& ss = service::get_storage_service(); service::storage_service_config sscfg; sscfg.available_memory = memory::stats().total_memory(); - ss.start(std::ref(abort_sources), std::ref(*db), std::ref(gms::get_gossiper()), std::ref(*auth_service), std::ref(sys_dist_ks), std::ref(*view_update_generator), std::ref(*feature_service), sscfg, true, cfg_in.disabled_features).get(); + ss.start(std::ref(abort_sources), std::ref(*db), std::ref(gms::get_gossiper()), std::ref(*auth_service), std::ref(cql_config), std::ref(sys_dist_ks), std::ref(*view_update_generator), std::ref(*feature_service), sscfg, true, cfg_in.disabled_features).get(); auto stop_storage_service = defer([&ss] { ss.stop().get(); }); database_config dbcfg; diff --git a/tests/gossip.cc b/tests/gossip.cc index ee1a903201..b77bfc51e2 100644 --- a/tests/gossip.cc +++ b/tests/gossip.cc @@ -35,6 +35,7 @@ #include #include #include "db/config.hh" +#include "cql3/cql_config.hh" namespace bpo = boost::program_options; @@ -60,11 +61,12 @@ namespace bpo = boost::program_options; int main(int ac, char ** av) { distributed db; sharded auth_service; + sharded cql_config; app_template app; app.add_options() ("seed", bpo::value>(), "IP address of seed node") ("listen-address", bpo::value()->default_value("0.0.0.0"), "IP address to listen"); - return app.run_deprecated(ac, av, [&auth_service, &db, &app] { + return app.run_deprecated(ac, av, [&auth_service, &db, &app, &cql_config] { auto config = app.configuration(); logging::logger_registry().set_logger_level("gossip", logging::log_level::trace); const gms::inet_address listen = gms::inet_address(config["listen-address"].as()); @@ -83,7 +85,8 @@ int main(int ac, char ** av) { auto stop_abort_source = defer([&] { abort_sources.stop().get(); }); service::storage_service_config sscfg; sscfg.available_memory = memory::stats().total_memory(); - service::init_storage_service(std::ref(abort_sources), db, gms::get_gossiper(), auth_service, sys_dist_ks, view_update_generator, feature_service, sscfg).get(); + cql_config.start().get(); + service::init_storage_service(std::ref(abort_sources), db, gms::get_gossiper(), auth_service, cql_config, sys_dist_ks, view_update_generator, feature_service, sscfg).get(); netw::get_messaging_service().start(listen).get(); auto& server = netw::get_local_messaging_service(); auto port = server.port(); diff --git a/tests/gossip_test.cc b/tests/gossip_test.cc index 06bb3a467c..52e56e1be8 100644 --- a/tests/gossip_test.cc +++ b/tests/gossip_test.cc @@ -37,6 +37,7 @@ #include "database.hh" #include "db/system_distributed_keyspace.hh" #include "db/config.hh" +#include "cql3/cql_config.hh" namespace db::view { class view_update_generator; @@ -71,7 +72,11 @@ SEASTAR_TEST_CASE(test_boot_shutdown){ service::storage_service_config sscfg; sscfg.available_memory = memory::stats().total_memory(); - service::get_storage_service().start(std::ref(abort_sources), std::ref(db), std::ref(gms::get_gossiper()), std::ref(auth_service), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service), sscfg, true).get(); + sharded cql_config; + cql_config.start(); + auto stop_cql_config = defer([&] { cql_config.stop().get(); }); + + service::get_storage_service().start(std::ref(abort_sources), std::ref(db), std::ref(gms::get_gossiper()), std::ref(auth_service), std::ref(cql_config), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service), sscfg, true).get(); auto stop_ss = defer([&] { service::get_storage_service().stop().get(); }); db.start(std::ref(cfg), dbcfg).get(); diff --git a/tests/query_processor_test.cc b/tests/query_processor_test.cc index a624aa5441..b147b8e2fa 100644 --- a/tests/query_processor_test.cc +++ b/tests/query_processor_test.cc @@ -36,6 +36,7 @@ #include "transport/messages/result_message.hh" #include "cql3/query_processor.hh" #include "cql3/untyped_result_set.hh" +#include "cql3/cql_config.hh" SEASTAR_TEST_CASE(test_execute_internal_insert) { return do_with_cql_env([] (auto& e) { diff --git a/tests/test_services.cc b/tests/test_services.cc index 3a0837d69f..f86aef89fe 100644 --- a/tests/test_services.cc +++ b/tests/test_services.cc @@ -30,6 +30,7 @@ #include "gms/gossiper.hh" #include "message/messaging_service.hh" #include "service/storage_service.hh" +#include "cql3/cql_config.hh" class storage_service_for_tests::impl { @@ -39,6 +40,7 @@ class storage_service_for_tests::impl { distributed _db; db::config _cfg; sharded _auth_service; + sharded _cql_config; sharded _sys_dist_ks; sharded _view_update_generator; public: @@ -54,13 +56,15 @@ public: netw::get_messaging_service().start(gms::inet_address("127.0.0.1"), 7000, false).get(); service::storage_service_config sscfg; sscfg.available_memory = memory::stats().total_memory(); - service::get_storage_service().start(std::ref(_abort_source), std::ref(_db), std::ref(_gossiper), std::ref(_auth_service), std::ref(_sys_dist_ks), std::ref(_view_update_generator), std::ref(_feature_service), sscfg, true).get(); + _cql_config.start().get(); + service::get_storage_service().start(std::ref(_abort_source), std::ref(_db), std::ref(_gossiper), std::ref(_auth_service), std::ref(_cql_config), std::ref(_sys_dist_ks), std::ref(_view_update_generator), std::ref(_feature_service), sscfg, true).get(); service::get_storage_service().invoke_on_all([] (auto& ss) { ss.enable_all_features(); }).get(); } ~impl() { service::get_storage_service().stop().get(); + _cql_config.stop().get(); netw::get_messaging_service().stop().get(); _db.stop().get(); _gossiper.stop().get(); diff --git a/thrift/handler.cc b/thrift/handler.cc index 8971a05614..786517645a 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -203,6 +203,7 @@ enum class query_order { no, yes }; class thrift_handler : public CassandraCobSvIf { distributed& _db; distributed& _query_processor; + const cql3::cql_config& _cql_config; service::query_state _query_state; ::timeout_config _timeout_config; private: @@ -218,9 +219,10 @@ private: }); } public: - explicit thrift_handler(distributed& db, distributed& qp, auth::service& auth_service, ::timeout_config timeout_config) + explicit thrift_handler(distributed& db, distributed& qp, auth::service& auth_service, const cql3::cql_config& cql_config, ::timeout_config timeout_config) : _db(db) , _query_processor(qp) + , _cql_config(cql_config) , _query_state(service::client_state::for_external_thrift_calls(auth_service), /*FIXME: pass real permit*/empty_service_permit()) , _timeout_config(timeout_config) { } @@ -964,7 +966,7 @@ public: if (compression != Compression::type::NONE) { throw make_exception("Compressed query strings are not supported"); } - auto opts = std::make_unique(cl_from_thrift(consistency), _timeout_config, std::nullopt, std::vector(), + auto opts = std::make_unique(_cql_config, cl_from_thrift(consistency), _timeout_config, std::nullopt, std::vector(), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); auto f = _query_processor.local().process(query, _query_state, *opts); return f.then([cob = std::move(cob), opts = std::move(opts)](auto&& ret) { @@ -1043,7 +1045,7 @@ public: std::transform(values.begin(), values.end(), std::back_inserter(bytes_values), [](auto&& s) { return cql3::raw_value::make_value(to_bytes(s)); }); - auto opts = std::make_unique(cl_from_thrift(consistency), _timeout_config, std::nullopt, std::move(bytes_values), + auto opts = std::make_unique(_cql_config, cl_from_thrift(consistency), _timeout_config, std::nullopt, std::move(bytes_values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); auto f = _query_processor.local().process_statement_prepared(std::move(prepared), std::move(cache_key), _query_state, *opts, needs_authorization); return f.then([cob = std::move(cob), opts = std::move(opts)](auto&& ret) { @@ -1942,15 +1944,18 @@ class handler_factory : public CassandraCobSvIfFactory { distributed& _db; distributed& _query_processor; auth::service& _auth_service; + const cql3::cql_config& _cql_config; timeout_config _timeout_config; public: explicit handler_factory(distributed& db, distributed& qp, - auth::service& auth_service, ::timeout_config timeout_config) - : _db(db), _query_processor(qp), _auth_service(auth_service), _timeout_config(timeout_config) {} + auth::service& auth_service, + const cql3::cql_config& cql_config, + ::timeout_config timeout_config) + : _db(db), _query_processor(qp), _auth_service(auth_service), _cql_config(cql_config), _timeout_config(timeout_config) {} typedef CassandraCobSvIf Handler; virtual CassandraCobSvIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) { - return new thrift_handler(_db, _query_processor, _auth_service, _timeout_config); + return new thrift_handler(_db, _query_processor, _auth_service, _cql_config, _timeout_config); } virtual void releaseHandler(CassandraCobSvIf* handler) { delete handler; @@ -1958,6 +1963,7 @@ public: }; std::unique_ptr -create_handler_factory(distributed& db, distributed& qp, auth::service& auth_service, ::timeout_config timeout_config) { - return std::make_unique(db, qp, auth_service, timeout_config); +create_handler_factory(distributed& db, distributed& qp, auth::service& auth_service, + const cql3::cql_config& cql_config, ::timeout_config timeout_config) { + return std::make_unique(db, qp, auth_service, cql_config, timeout_config); } diff --git a/thrift/handler.hh b/thrift/handler.hh index 2765fa9417..0bfdea0815 100644 --- a/thrift/handler.hh +++ b/thrift/handler.hh @@ -31,6 +31,13 @@ struct timeout_config; -std::unique_ptr<::cassandra::CassandraCobSvIfFactory> create_handler_factory(distributed& db, distributed& qp, auth::service&, timeout_config); +namespace cql3 { + +class cql_config; + +} + +std::unique_ptr<::cassandra::CassandraCobSvIfFactory> create_handler_factory(distributed& db, distributed& qp, auth::service&, + const cql3::cql_config& cql_config, timeout_config); #endif /* APPS_SEASTAR_THRIFT_HANDLER_HH_ */ diff --git a/thrift/server.cc b/thrift/server.cc index 00e382370e..ebeaef213a 100644 --- a/thrift/server.cc +++ b/thrift/server.cc @@ -65,9 +65,10 @@ public: thrift_server::thrift_server(distributed& db, distributed& qp, auth::service& auth_service, + const cql3::cql_config& cql_config, thrift_server_config config) : _stats(new thrift_stats(*this)) - , _handler_factory(create_handler_factory(db, qp, auth_service, config.timeout_config).release()) + , _handler_factory(create_handler_factory(db, qp, auth_service, cql_config, config.timeout_config).release()) , _protocol_factory(new TBinaryProtocolFactoryT()) , _processor_factory(new CassandraAsyncProcessorFactory(_handler_factory)) , _config(config) { diff --git a/thrift/server.hh b/thrift/server.hh index 8825e4385d..0605621c08 100644 --- a/thrift/server.hh +++ b/thrift/server.hh @@ -41,6 +41,12 @@ namespace thrift_std = boost; namespace thrift_std = std; #endif +namespace cql3 { + +class cql_config; + +} + namespace cassandra { static const sstring thrift_version = "20.1.0"; @@ -117,7 +123,7 @@ private: boost::intrusive::list _connections_list; seastar::gate _stop_gate; public: - thrift_server(distributed& db, distributed& qp, auth::service&, thrift_server_config config); + thrift_server(distributed& db, distributed& qp, auth::service&, const cql3::cql_config& cql_config, thrift_server_config config); ~thrift_server(); future<> listen(socket_address addr, bool keepalive); future<> stop(); diff --git a/tracing/trace_keyspace_helper.cc b/tracing/trace_keyspace_helper.cc index e90abf3314..459597b3a9 100644 --- a/tracing/trace_keyspace_helper.cc +++ b/tracing/trace_keyspace_helper.cc @@ -44,6 +44,7 @@ #include "tracing/tracing_backend_registry.hh" #include "cql3/statements/batch_statement.hh" #include "cql3/statements/modification_statement.hh" +#include "cql3/cql_config.hh" namespace tracing { @@ -258,7 +259,8 @@ cql3::query_options trace_keyspace_helper::make_session_mutation_data(const one_ cql3::raw_value::make_value(int32_type->decompose((int32_t)(session_records.ttl.count()))) }; - return cql3::query_options(db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); + return cql3::query_options(cql3::default_cql_config, + db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); } cql3::query_options trace_keyspace_helper::make_session_time_idx_mutation_data(const one_session_records& session_records) { @@ -275,7 +277,8 @@ cql3::query_options trace_keyspace_helper::make_session_time_idx_mutation_data(c cql3::raw_value::make_value(int32_type->decompose(int32_t(session_records.ttl.count()))) }; - return cql3::query_options(db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); + return cql3::query_options(cql3::default_cql_config, + db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); } cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const one_session_records& session_records, const utils::UUID& start_time_id) { @@ -315,7 +318,8 @@ cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const o cql3::raw_value::make_value(int32_type->decompose((int32_t)(record.slow_query_record_ttl.count()))) }); - return cql3::query_options(db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); + return cql3::query_options(cql3::default_cql_config, + db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); } cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_data(const one_session_records& session_records, const utils::UUID& start_time_id) { @@ -335,7 +339,8 @@ cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_dat cql3::raw_value::make_value(int32_type->decompose(int32_t(session_records.session_rec.slow_query_record_ttl.count()))) }); - return cql3::query_options(db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); + return cql3::query_options(cql3::default_cql_config, + db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); } std::vector trace_keyspace_helper::make_event_mutation_data(one_session_records& session_records, const event_record& record) { @@ -372,7 +377,7 @@ future<> trace_keyspace_helper::apply_events_mutation(lw_shared_ptr{}, false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()), std::move(values)), + cql3::query_options::make_batch_options(cql3::query_options(cql3::default_cql_config, db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::vector{}, false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()), std::move(values)), cql3::statements::batch_statement(cql3::statements::batch_statement::type::UNLOGGED, std::move(modifications), cql3::attributes::none(), qp.get_cql_stats()), [this] (auto& batch_options, auto& batch) { return batch.execute(service::get_storage_proxy().local(), _dummy_query_state, batch_options).then([] (shared_ptr res) { return now(); }); diff --git a/transport/request.hh b/transport/request.hh index 9bed742b49..764b79b081 100644 --- a/transport/request.hh +++ b/transport/request.hh @@ -215,10 +215,10 @@ private: options_flag::NAMES_FOR_VALUES >; public: - std::unique_ptr read_options(uint8_t version, cql_serialization_format cql_ser_format, const timeout_config& timeouts) { + std::unique_ptr read_options(uint8_t version, cql_serialization_format cql_ser_format, const timeout_config& timeouts, const cql3::cql_config& cql_config) { auto consistency = read_consistency(); if (version == 1) { - return std::make_unique(consistency, timeouts, std::nullopt, std::vector{}, + return std::make_unique(cql_config, consistency, timeouts, std::nullopt, std::vector{}, false, cql3::query_options::specific_options::DEFAULT, cql_ser_format); } @@ -266,11 +266,11 @@ public: if (!names.empty()) { onames = std::move(names); } - options = std::make_unique(consistency, timeouts, std::move(onames), std::move(values), skip_metadata, + options = std::make_unique(cql_config, consistency, timeouts, std::move(onames), std::move(values), skip_metadata, cql3::query_options::specific_options{page_size, std::move(paging_state), serial_consistency, ts}, cql_ser_format); } else { - options = std::make_unique(consistency, timeouts, std::nullopt, std::move(values), skip_metadata, + options = std::make_unique(cql_config, consistency, timeouts, std::nullopt, std::move(values), skip_metadata, cql3::query_options::specific_options::DEFAULT, cql_ser_format); } diff --git a/transport/server.cc b/transport/server.cc index b4a36c8eb2..3c7eb7e9f0 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -145,7 +145,7 @@ event::event_type parse_event_type(const sstring& value) } cql_server::cql_server(distributed& proxy, distributed& qp, auth::service& auth_service, - cql_server_config config) + const cql3::cql_config& cql_config, cql_server_config config) : _proxy(proxy) , _query_processor(qp) , _config(config) @@ -153,6 +153,7 @@ cql_server::cql_server(distributed& proxy, distributed()) , _auth_service(auth_service) + , _cql_config(cql_config) { namespace sm = seastar::metrics; @@ -748,7 +749,7 @@ future cql_server::connection::process_query(uint16_t stream, req auto query = in.read_long_string_view(); auto q_state = std::make_unique(client_state, std::move(permit)); auto& query_state = q_state->query_state; - q_state->options = in.read_options(_version, _cql_serialization_format, this->timeout_config()); + q_state->options = in.read_options(_version, _cql_serialization_format, this->timeout_config(), _server._cql_config); auto& options = *q_state->options; auto skip_metadata = options.skip_metadata(); @@ -826,10 +827,10 @@ future cql_server::connection::process_execute(uint16_t stream, r std::vector values; in.read_value_view_list(_version, values); auto consistency = in.read_consistency(); - q_state->options = std::make_unique(consistency, timeout_config(), std::nullopt, values, false, + q_state->options = std::make_unique(_server._cql_config, consistency, timeout_config(), std::nullopt, values, false, cql3::query_options::specific_options::DEFAULT, _cql_serialization_format); } else { - q_state->options = in.read_options(_version, _cql_serialization_format, this->timeout_config()); + q_state->options = in.read_options(_version, _cql_serialization_format, this->timeout_config(), _server._cql_config); } auto& options = *q_state->options; auto skip_metadata = options.skip_metadata(); @@ -946,7 +947,7 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic auto q_state = std::make_unique(client_state, std::move(permit)); auto& query_state = q_state->query_state; // #563. CQL v2 encodes query_options in v1 format for batch requests. - q_state->options = std::make_unique(cql3::query_options::make_batch_options(std::move(*in.read_options(_version < 3 ? 1 : _version, _cql_serialization_format, this->timeout_config())), std::move(values))); + q_state->options = std::make_unique(cql3::query_options::make_batch_options(std::move(*in.read_options(_version < 3 ? 1 : _version, _cql_serialization_format, this->timeout_config(), _server._cql_config)), std::move(values))); auto& options = *q_state->options; tracing::set_consistency_level(client_state.get_trace_state(), options.get_consistency()); diff --git a/transport/server.hh b/transport/server.hh index 1734af78d8..09099ea998 100644 --- a/transport/server.hh +++ b/transport/server.hh @@ -127,9 +127,10 @@ private: uint64_t _requests_serving = 0; uint64_t _requests_blocked_memory = 0; auth::service& _auth_service; + const cql3::cql_config& _cql_config; public: cql_server(distributed& proxy, distributed& qp, auth::service&, - cql_server_config config); + const cql3::cql_config& cql_config, cql_server_config config); future<> listen(socket_address addr, std::shared_ptr = {}, bool keepalive = false); future<> do_accepts(int which, bool keepalive, socket_address server_addr); future<> stop(); From 8c7ad1d4cd58bbd0e1233acadf34063049dd980c Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Sun, 4 Aug 2019 20:58:19 +0300 Subject: [PATCH 2/3] cql: single_column_clustering_key_restrictions: limit cartesian products Cartesian products (via IN restrictions) make it easy to generate huge primary key sets with simple queries, overflowing server resources. Limit them in the coordinator and report an exception instead of trying to execute a query that would consume all of our memory. A unit test is added. --- cql3/cql_config.hh | 3 ++ cql3/restrictions/restrictions_config.hh | 35 ++++++++++++ .../single_column_primary_key_restrictions.hh | 54 +++++++++++++++++-- tests/cql_query_test.cc | 50 +++++++++++++++++ 4 files changed, 139 insertions(+), 3 deletions(-) create mode 100644 cql3/restrictions/restrictions_config.hh diff --git a/cql3/cql_config.hh b/cql3/cql_config.hh index e7b3f38967..73e7d73976 100644 --- a/cql3/cql_config.hh +++ b/cql3/cql_config.hh @@ -23,9 +23,12 @@ #pragma once +#include "restrictions/restrictions_config.hh" + namespace cql3 { struct cql_config { + restrictions::restrictions_config restrictions; }; extern const cql_config default_cql_config; diff --git a/cql3/restrictions/restrictions_config.hh b/cql3/restrictions/restrictions_config.hh new file mode 100644 index 0000000000..2f3ea3db72 --- /dev/null +++ b/cql3/restrictions/restrictions_config.hh @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2019 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + + + +#pragma once + +#include + +namespace cql3::restrictions { + +struct restrictions_config { + uint32_t partition_key_restrictions_max_cartesian_product_size = 100; + uint32_t clustering_key_restrictions_max_cartesian_product_size = 100; +}; + +} diff --git a/cql3/restrictions/single_column_primary_key_restrictions.hh b/cql3/restrictions/single_column_primary_key_restrictions.hh index 2f0e33aabe..f7c3421079 100644 --- a/cql3/restrictions/single_column_primary_key_restrictions.hh +++ b/cql3/restrictions/single_column_primary_key_restrictions.hh @@ -46,6 +46,7 @@ #include "cartesian_product.hh" #include "cql3/restrictions/primary_key_restrictions.hh" #include "cql3/restrictions/single_column_restrictions.hh" +#include "cql3/cql_config.hh" #include #include #include @@ -55,6 +56,29 @@ namespace cql3 { namespace restrictions { +namespace { + +template +const char* +restricted_component_name_v; + +template <> +const char* restricted_component_name_v = "partition key"; + +template <> +const char* restricted_component_name_v = "clustering key"; + + +inline +void check_cartesian_product_size(size_t size, size_t max, const char* component_name) { + if (size > max) { + throw std::runtime_error(fmt::format("{} cartesian product size {} is greater than maximum {}", + component_name, size, max)); + } +} + +} + /** * A set of single column restrictions on a primary key part (partition key or clustering key). */ @@ -69,6 +93,8 @@ private: schema_ptr _schema; bool _allow_filtering; ::shared_ptr _restrictions; +private: + static uint32_t max_cartesian_product_size(const restrictions_config& config); public: single_column_primary_key_restrictions(schema_ptr schema, bool allow_filtering) : _schema(schema) @@ -184,7 +210,10 @@ public: } std::vector result; - result.reserve(cartesian_product_size(value_vector)); + auto size = cartesian_product_size(value_vector); + check_cartesian_product_size(size, max_cartesian_product_size(options.get_cql_config().restrictions), + restricted_component_name_v); + result.reserve(size); for (auto&& v : make_cartesian_product(value_vector)) { result.emplace_back(ValueType::from_optional_exploded(*_schema, std::move(v))); } @@ -259,7 +288,10 @@ private: return ranges; } - ranges.reserve(cartesian_product_size(vec_of_values)); + auto size = cartesian_product_size(vec_of_values); + check_cartesian_product_size(size, max_cartesian_product_size(options.get_cql_config().restrictions), + restricted_component_name_v); + ranges.reserve(size); for (auto&& prefix : make_cartesian_product(vec_of_values)) { auto read_bound = [r, &prefix, &options, this](statements::bound bound) -> range_bound { if (r->has_bound(bound)) { @@ -300,7 +332,10 @@ private: vec_of_values.emplace_back(std::move(values)); } - ranges.reserve(cartesian_product_size(vec_of_values)); + auto size = cartesian_product_size(vec_of_values); + check_cartesian_product_size(size, max_cartesian_product_size(options.get_cql_config().restrictions), + restricted_component_name_v); + ranges.reserve(size); for (auto&& prefix : make_cartesian_product(vec_of_values)) { ranges.emplace_back(range_type::make_singular(ValueType::from_optional_exploded(*_schema, std::move(prefix)))); } @@ -489,6 +524,19 @@ inline unsigned single_column_primary_key_restrictions::num_prefi using single_column_partition_key_restrictions = single_column_primary_key_restrictions; using single_column_clustering_key_restrictions = single_column_primary_key_restrictions; +template <> +inline +uint32_t single_column_primary_key_restrictions::max_cartesian_product_size(const restrictions_config& config) { + return config.partition_key_restrictions_max_cartesian_product_size; +} + +template <> +inline +uint32_t single_column_primary_key_restrictions::max_cartesian_product_size(const restrictions_config& config) { + return config.clustering_key_restrictions_max_cartesian_product_size; +} + + } } diff --git a/tests/cql_query_test.cc b/tests/cql_query_test.cc index b7805c7a48..06869f7727 100644 --- a/tests/cql_query_test.cc +++ b/tests/cql_query_test.cc @@ -365,6 +365,56 @@ SEASTAR_TEST_CASE(test_in_clause_validation) { }); } +SEASTAR_THREAD_TEST_CASE(test_in_clause_cartesian_product_limits) { + do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("CREATE TABLE tab1 (pk1 int, pk2 int, PRIMARY KEY ((pk1, pk2)))").get(); + + // 100 partitions, should pass + e.execute_cql("SELECT * FROM tab1 WHERE pk1 IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)" + " AND pk2 IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)").get(); + // 110 partitions, should fail + BOOST_REQUIRE_THROW( + e.execute_cql("SELECT * FROM tab1 WHERE pk1 IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)" + " AND pk2 IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)").get(), + std::runtime_error); + + e.execute_cql("CREATE TABLE tab2 (pk1 int, ck1 int, ck2 int, PRIMARY KEY (pk1, ck1, ck2))").get(); + + // 100 clustering rows, should pass + e.execute_cql("SELECT * FROM tab2 WHERE pk1 = 1" + " AND ck1 IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)" + " AND ck2 IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)").get(); + // 110 clustering rows, should fail + BOOST_REQUIRE_THROW( + e.execute_cql("SELECT * FROM tab2 WHERE pk1 = 1" + " AND ck1 IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)" + " AND ck2 IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)").get(), + std::runtime_error); + auto make_tuple = [] (unsigned count) -> sstring { + std::ostringstream os; + os << "(0"; + for (unsigned i = 1; i < count; ++i) { + os << "," << i; + } + os << ")"; + return os.str(); + }; + e.execute_cql("CREATE TABLE tab3 (pk1 int, ck1 int, PRIMARY KEY (pk1, ck1))").get(); + // tuple with 100 keys, should pass + e.execute_cql(fmt::format("SELECT * FROM tab3 WHERE pk1 IN {}", make_tuple(100))).get(); + e.execute_cql(fmt::format("SELECT * FROM tab3 WHERE pk1 = 1 AND ck1 IN {}", make_tuple(100))).get(); + // tuple with 101 keys, should fail + BOOST_REQUIRE_THROW( + e.execute_cql(fmt::format("SELECT * FROM tab3 WHERE pk1 IN {}", make_tuple(101))).get(), + std::runtime_error + ); + BOOST_REQUIRE_THROW( + e.execute_cql(fmt::format("SELECT * FROM tab3 WHERE pk1 = 3 AND ck1 IN {}", make_tuple(101))).get(), + std::runtime_error + ); + }).get(); +} + SEASTAR_TEST_CASE(test_tuple_elements_validation) { return do_with_cql_env_thread([](cql_test_env& e) { auto test_inline = [&] (sstring value, bool should_throw) { From 67b0d379e0815795c19c88a10861e4f17b5c7dcc Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Sun, 4 Aug 2019 21:46:19 +0300 Subject: [PATCH 3/3] main: add glue between db::config and cql3::cql_config Copy values between the flat db::config and the hierarchical cql_config, adding observers to keep the values updated. --- db/config.cc | 7 +++++++ db/config.hh | 2 ++ main.cc | 22 ++++++++++++++++++++++ 3 files changed, 31 insertions(+) diff --git a/db/config.cc b/db/config.cc index e4943c74b4..a9c5a910ad 100644 --- a/db/config.cc +++ b/db/config.cc @@ -686,6 +686,13 @@ db::config::config(std::shared_ptr exts) , enable_shard_aware_drivers(this, "enable_shard_aware_drivers", value_status::Used, true, "Enable native transport drivers to use connection-per-shard for better performance") , enable_ipv6_dns_lookup(this, "enable_ipv6_dns_lookup", value_status::Used, false, "Use IPv6 address resolution") , abort_on_internal_error(this, "abort_on_internal_error", liveness::LiveUpdate, value_status::Used, false, "Abort the server instead of throwing exception when internal invariants are violated") + , max_partition_key_restrictions_per_query(this, "max_partition_key_restrictions_per_query", liveness::LiveUpdate, value_status::Used, 100, + "Maximum number of distinct partition keys restrictions per query. This limit places a bound on the size of IN tuples, " + "especially when multiple partition key columns have IN restrictions. Increasing this value can result in server instability.") + , max_clustering_key_restrictions_per_query(this, "max_clustering_key_restrictions_per_query", liveness::LiveUpdate, value_status::Used, 100, + "Maximum number of distinct clustering key restrictions per query. This limit places a bound on the size of IN tuples, " + "especially when multiple clustering key columns have IN restrictions. Increasing this value can result in server instability.") + , default_log_level(this, "default_log_level", value_status::Used) , logger_log_level(this, "logger_log_level", value_status::Used) , log_to_stdout(this, "log_to_stdout", value_status::Used) diff --git a/db/config.hh b/db/config.hh index 72f7b96ee7..768e2580b1 100644 --- a/db/config.hh +++ b/db/config.hh @@ -282,6 +282,8 @@ public: named_value enable_shard_aware_drivers; named_value enable_ipv6_dns_lookup; named_value abort_on_internal_error; + named_value max_partition_key_restrictions_per_query; + named_value max_clustering_key_restrictions_per_query; seastar::logging_settings logging_settings(const boost::program_options::variables_map&) const; diff --git a/main.cc b/main.cc index adb06b5024..70bc03243e 100644 --- a/main.cc +++ b/main.cc @@ -427,6 +427,25 @@ void print_starting_message(int ac, char** av, const bpo::parsed_options& opts) fmt::print("parsed command line options: {}\n", format_parsed_options(opts.options)); } +// Glue logic between db::config and cql3::cql_config +class cql_config_updater { + cql3::cql_config& _cql_config; + const db::config& _cfg; + std::vector _observers; +private: + template + void tie(T& dest, const db::config::named_value& src) { + dest = src(); + _observers.emplace_back(make_lw_shared>(src.observe([&dest] (const T& value) { dest = value; }))); + } +public: + cql_config_updater(cql3::cql_config& cql_config, const db::config& cfg) + : _cql_config(cql_config), _cfg(cfg) { + tie(_cql_config.restrictions.partition_key_restrictions_max_cartesian_product_size, _cfg.max_partition_key_restrictions_per_query); + tie(_cql_config.restrictions.clustering_key_restrictions_max_cartesian_product_size, _cfg.max_clustering_key_restrictions_per_query); + } +}; + int main(int ac, char** av) { int return_value = 0; try { @@ -678,7 +697,10 @@ int main(int ac, char** av) { static sharded sys_dist_ks; static sharded view_update_generator; static sharded cql_config; + static sharded<::cql_config_updater> cql_config_updater; cql_config.start().get(); + cql_config_updater.start(std::ref(cql_config), std::ref(*cfg)); + auto stop_cql_config_updater = defer([&] { cql_config_updater.stop().get(); }); auto& gossiper = gms::get_gossiper(); gossiper.start(std::ref(feature_service), std::ref(*cfg)).get(); // #293 - do not stop anything