diff --git a/cql3/cql_config.hh b/cql3/cql_config.hh new file mode 100644 index 0000000000..73e7d73976 --- /dev/null +++ b/cql3/cql_config.hh @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2019 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + + + +#pragma once + +#include "restrictions/restrictions_config.hh" + +namespace cql3 { + +struct cql_config { + restrictions::restrictions_config restrictions; +}; + +extern const cql_config default_cql_config; + +} diff --git a/cql3/query_options.cc b/cql3/query_options.cc index 082b9799d8..b45d2d32c4 100644 --- a/cql3/query_options.cc +++ b/cql3/query_options.cc @@ -39,17 +39,22 @@ * along with Scylla. If not, see . 
*/ +#include "cql3/cql_config.hh" #include "query_options.hh" #include "version.hh" namespace cql3 { +const cql_config default_cql_config; + thread_local const query_options::specific_options query_options::specific_options::DEFAULT{-1, {}, {}, api::missing_timestamp}; -thread_local query_options query_options::DEFAULT{db::consistency_level::ONE, infinite_timeout_config, std::nullopt, +thread_local query_options query_options::DEFAULT{default_cql_config, + db::consistency_level::ONE, infinite_timeout_config, std::nullopt, std::vector(), false, query_options::specific_options::DEFAULT, cql_serialization_format::latest()}; -query_options::query_options(db::consistency_level consistency, +query_options::query_options(const cql_config& cfg, + db::consistency_level consistency, const ::timeout_config& timeout_config, std::optional> names, std::vector values, @@ -57,7 +62,8 @@ query_options::query_options(db::consistency_level consistency, bool skip_metadata, specific_options options, cql_serialization_format sf) - : _consistency(consistency) + : _cql_config(cfg) + , _consistency(consistency) , _timeout_config(timeout_config) , _names(std::move(names)) , _values(std::move(values)) @@ -68,14 +74,16 @@ query_options::query_options(db::consistency_level consistency, { } -query_options::query_options(db::consistency_level consistency, +query_options::query_options(const cql_config& cfg, + db::consistency_level consistency, const ::timeout_config& timeout_config, std::optional> names, std::vector values, bool skip_metadata, specific_options options, cql_serialization_format sf) - : _consistency(consistency) + : _cql_config(cfg) + , _consistency(consistency) , _timeout_config(timeout_config) , _names(std::move(names)) , _values(std::move(values)) @@ -87,14 +95,16 @@ query_options::query_options(db::consistency_level consistency, fill_value_views(); } -query_options::query_options(db::consistency_level consistency, +query_options::query_options(const cql_config& cfg, + 
db::consistency_level consistency, const ::timeout_config& timeout_config, std::optional> names, std::vector value_views, bool skip_metadata, specific_options options, cql_serialization_format sf) - : _consistency(consistency) + : _cql_config(cfg) + , _consistency(consistency) , _timeout_config(timeout_config) , _names(std::move(names)) , _values() @@ -105,8 +115,10 @@ query_options::query_options(db::consistency_level consistency, { } -query_options::query_options(db::consistency_level cl, const ::timeout_config& timeout_config, std::vector values, specific_options options) +query_options::query_options(db::consistency_level cl, const ::timeout_config& timeout_config, std::vector values, + specific_options options) : query_options( + default_cql_config, cl, timeout_config, {}, @@ -119,7 +131,8 @@ query_options::query_options(db::consistency_level cl, const ::timeout_config& t } query_options::query_options(std::unique_ptr qo, ::shared_ptr paging_state) - : query_options(qo->_consistency, + : query_options(qo->_cql_config, + qo->_consistency, qo->get_timeout_config(), std::move(qo->_names), std::move(qo->_values), @@ -131,7 +144,8 @@ query_options::query_options(std::unique_ptr qo, ::shared_ptr qo, ::shared_ptr paging_state, int32_t page_size) - : query_options(qo->_consistency, + : query_options(qo->_cql_config, + qo->_consistency, qo->get_timeout_config(), std::move(qo->_names), std::move(qo->_values), diff --git a/cql3/query_options.hh b/cql3/query_options.hh index be66b5e905..40c0334b14 100644 --- a/cql3/query_options.hh +++ b/cql3/query_options.hh @@ -55,6 +55,9 @@ namespace cql3 { +class cql_config; +extern const cql_config default_cql_config; + /** * Options for a query. 
*/ @@ -70,6 +73,7 @@ public: const api::timestamp_type timestamp; }; private: + const cql_config& _cql_config; const db::consistency_level _consistency; const timeout_config& _timeout_config; const std::optional> _names; @@ -104,14 +108,16 @@ public: query_options(query_options&&) = default; explicit query_options(const query_options&) = default; - explicit query_options(db::consistency_level consistency, + explicit query_options(const cql_config& cfg, + db::consistency_level consistency, const timeout_config& timeouts, std::optional> names, std::vector values, bool skip_metadata, specific_options options, cql_serialization_format sf); - explicit query_options(db::consistency_level consistency, + explicit query_options(const cql_config& cfg, + db::consistency_level consistency, const timeout_config& timeouts, std::optional> names, std::vector values, @@ -119,7 +125,8 @@ public: bool skip_metadata, specific_options options, cql_serialization_format sf); - explicit query_options(db::consistency_level consistency, + explicit query_options(const cql_config& cfg, + db::consistency_level consistency, const timeout_config& timeouts, std::optional> names, std::vector value_views, @@ -227,6 +234,10 @@ public: return _names; } + const cql_config& get_cql_config() const { + return _cql_config; + } + void prepare(const std::vector<::shared_ptr>& specs); private: void fill_value_views(); @@ -244,7 +255,7 @@ query_options::query_options(query_options&& o, std::vector tmp; tmp.reserve(values_ranges.size()); std::transform(values_ranges.begin(), values_ranges.end(), std::back_inserter(tmp), [this](auto& values_range) { - return query_options(_consistency, _timeout_config, {}, std::move(values_range), _skip_metadata, _options, _cql_serialization_format); + return query_options(_cql_config, _consistency, _timeout_config, {}, std::move(values_range), _skip_metadata, _options, _cql_serialization_format); }); _batch_options = std::move(tmp); } diff --git 
a/cql3/restrictions/restrictions_config.hh b/cql3/restrictions/restrictions_config.hh new file mode 100644 index 0000000000..2f3ea3db72 --- /dev/null +++ b/cql3/restrictions/restrictions_config.hh @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2019 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + + + +#pragma once + +#include + +namespace cql3::restrictions { + +struct restrictions_config { + uint32_t partition_key_restrictions_max_cartesian_product_size = 100; + uint32_t clustering_key_restrictions_max_cartesian_product_size = 100; +}; + +} diff --git a/cql3/restrictions/single_column_primary_key_restrictions.hh b/cql3/restrictions/single_column_primary_key_restrictions.hh index 2f0e33aabe..f7c3421079 100644 --- a/cql3/restrictions/single_column_primary_key_restrictions.hh +++ b/cql3/restrictions/single_column_primary_key_restrictions.hh @@ -46,6 +46,7 @@ #include "cartesian_product.hh" #include "cql3/restrictions/primary_key_restrictions.hh" #include "cql3/restrictions/single_column_restrictions.hh" +#include "cql3/cql_config.hh" #include #include #include @@ -55,6 +56,29 @@ namespace cql3 { namespace restrictions { +namespace { + +template +const char* +restricted_component_name_v; + +template <> +const char* restricted_component_name_v = "partition key"; + +template <> +const char* restricted_component_name_v = "clustering key"; + + +inline +void 
check_cartesian_product_size(size_t size, size_t max, const char* component_name) { + if (size > max) { + throw std::runtime_error(fmt::format("{} cartesian product size {} is greater than maximum {}", + component_name, size, max)); + } +} + +} + /** * A set of single column restrictions on a primary key part (partition key or clustering key). */ @@ -69,6 +93,8 @@ private: schema_ptr _schema; bool _allow_filtering; ::shared_ptr _restrictions; +private: + static uint32_t max_cartesian_product_size(const restrictions_config& config); public: single_column_primary_key_restrictions(schema_ptr schema, bool allow_filtering) : _schema(schema) @@ -184,7 +210,10 @@ public: } std::vector result; - result.reserve(cartesian_product_size(value_vector)); + auto size = cartesian_product_size(value_vector); + check_cartesian_product_size(size, max_cartesian_product_size(options.get_cql_config().restrictions), + restricted_component_name_v); + result.reserve(size); for (auto&& v : make_cartesian_product(value_vector)) { result.emplace_back(ValueType::from_optional_exploded(*_schema, std::move(v))); } @@ -259,7 +288,10 @@ private: return ranges; } - ranges.reserve(cartesian_product_size(vec_of_values)); + auto size = cartesian_product_size(vec_of_values); + check_cartesian_product_size(size, max_cartesian_product_size(options.get_cql_config().restrictions), + restricted_component_name_v); + ranges.reserve(size); for (auto&& prefix : make_cartesian_product(vec_of_values)) { auto read_bound = [r, &prefix, &options, this](statements::bound bound) -> range_bound { if (r->has_bound(bound)) { @@ -300,7 +332,10 @@ private: vec_of_values.emplace_back(std::move(values)); } - ranges.reserve(cartesian_product_size(vec_of_values)); + auto size = cartesian_product_size(vec_of_values); + check_cartesian_product_size(size, max_cartesian_product_size(options.get_cql_config().restrictions), + restricted_component_name_v); + ranges.reserve(size); for (auto&& prefix : 
make_cartesian_product(vec_of_values)) { ranges.emplace_back(range_type::make_singular(ValueType::from_optional_exploded(*_schema, std::move(prefix)))); } @@ -489,6 +524,19 @@ inline unsigned single_column_primary_key_restrictions::num_prefi using single_column_partition_key_restrictions = single_column_primary_key_restrictions; using single_column_clustering_key_restrictions = single_column_primary_key_restrictions; +template <> +inline +uint32_t single_column_primary_key_restrictions::max_cartesian_product_size(const restrictions_config& config) { + return config.partition_key_restrictions_max_cartesian_product_size; +} + +template <> +inline +uint32_t single_column_primary_key_restrictions::max_cartesian_product_size(const restrictions_config& config) { + return config.clustering_key_restrictions_max_cartesian_product_size; +} + + } } diff --git a/db/config.cc b/db/config.cc index e4943c74b4..a9c5a910ad 100644 --- a/db/config.cc +++ b/db/config.cc @@ -686,6 +686,13 @@ db::config::config(std::shared_ptr exts) , enable_shard_aware_drivers(this, "enable_shard_aware_drivers", value_status::Used, true, "Enable native transport drivers to use connection-per-shard for better performance") , enable_ipv6_dns_lookup(this, "enable_ipv6_dns_lookup", value_status::Used, false, "Use IPv6 address resolution") , abort_on_internal_error(this, "abort_on_internal_error", liveness::LiveUpdate, value_status::Used, false, "Abort the server instead of throwing exception when internal invariants are violated") + , max_partition_key_restrictions_per_query(this, "max_partition_key_restrictions_per_query", liveness::LiveUpdate, value_status::Used, 100, + "Maximum number of distinct partition keys restrictions per query. This limit places a bound on the size of IN tuples, " + "especially when multiple partition key columns have IN restrictions. 
Increasing this value can result in server instability.") + , max_clustering_key_restrictions_per_query(this, "max_clustering_key_restrictions_per_query", liveness::LiveUpdate, value_status::Used, 100, + "Maximum number of distinct clustering key restrictions per query. This limit places a bound on the size of IN tuples, " + "especially when multiple clustering key columns have IN restrictions. Increasing this value can result in server instability.") + , default_log_level(this, "default_log_level", value_status::Used) , logger_log_level(this, "logger_log_level", value_status::Used) , log_to_stdout(this, "log_to_stdout", value_status::Used) diff --git a/db/config.hh b/db/config.hh index 72f7b96ee7..768e2580b1 100644 --- a/db/config.hh +++ b/db/config.hh @@ -282,6 +282,8 @@ public: named_value enable_shard_aware_drivers; named_value enable_ipv6_dns_lookup; named_value abort_on_internal_error; + named_value max_partition_key_restrictions_per_query; + named_value max_clustering_key_restrictions_per_query; seastar::logging_settings logging_settings(const boost::program_options::variables_map&) const; diff --git a/init.cc b/init.cc index 523a1add85..22d2bb79a0 100644 --- a/init.cc +++ b/init.cc @@ -38,9 +38,11 @@ logging::logger startlog("init"); // until proper shutdown is done. 
void init_storage_service(sharded& abort_source, - distributed& db, sharded& gossiper, sharded& auth_service, sharded& sys_dist_ks, + distributed& db, sharded& gossiper, sharded& auth_service, + sharded& cql_config, + sharded& sys_dist_ks, sharded& view_update_generator, sharded& feature_service, service::storage_service_config config) { - service::init_storage_service(abort_source, db, gossiper, auth_service, sys_dist_ks, view_update_generator, feature_service, config).get(); + service::init_storage_service(abort_source, db, gossiper, auth_service, cql_config, sys_dist_ks, view_update_generator, feature_service, config).get(); // #293 - do not stop anything //engine().at_exit([] { return service::deinit_storage_service(); }); } diff --git a/init.hh b/init.hh index 583aa6e85f..287002d41f 100644 --- a/init.hh +++ b/init.hh @@ -48,7 +48,9 @@ extern logging::logger startlog; class bad_configuration_error : public std::exception {}; void init_storage_service(sharded& abort_sources, - distributed& db, sharded& gossiper, sharded& auth_service, sharded& sys_dist_ks, + distributed& db, sharded& gossiper, sharded& auth_service, + sharded& cql_config, + sharded& sys_dist_ks, sharded& view_update_generator, sharded& feature_service); struct init_scheduling_config { diff --git a/main.cc b/main.cc index 51757665a2..70bc03243e 100644 --- a/main.cc +++ b/main.cc @@ -69,6 +69,7 @@ #include "sstables/sstables.hh" #include "gms/feature_service.hh" #include "distributed_loader.hh" +#include "cql3/cql_config.hh" namespace fs = std::filesystem; @@ -426,6 +427,25 @@ void print_starting_message(int ac, char** av, const bpo::parsed_options& opts) fmt::print("parsed command line options: {}\n", format_parsed_options(opts.options)); } +// Glue logic between db::config and cql3::cql_config +class cql_config_updater { + cql3::cql_config& _cql_config; + const db::config& _cfg; + std::vector _observers; +private: + template + void tie(T& dest, const db::config::named_value& src) { + dest = 
src(); + _observers.emplace_back(make_lw_shared>(src.observe([&dest] (const T& value) { dest = value; }))); + } +public: + cql_config_updater(cql3::cql_config& cql_config, const db::config& cfg) + : _cql_config(cql_config), _cfg(cfg) { + tie(_cql_config.restrictions.partition_key_restrictions_max_cartesian_product_size, _cfg.max_partition_key_restrictions_per_query); + tie(_cql_config.restrictions.clustering_key_restrictions_max_cartesian_product_size, _cfg.max_clustering_key_restrictions_per_query); + } +}; + int main(int ac, char** av) { int return_value = 0; try { @@ -676,6 +696,11 @@ int main(int ac, char** av) { static sharded auth_service; static sharded sys_dist_ks; static sharded view_update_generator; + static sharded cql_config; + static sharded<::cql_config_updater> cql_config_updater; + cql_config.start().get(); + cql_config_updater.start(std::ref(cql_config), std::ref(*cfg)).get(); /* wait for start() to resolve, as done for cql_config above, instead of discarding the returned future before the deferred stop() is registered */ + auto stop_cql_config_updater = defer([&] { cql_config_updater.stop().get(); }); auto& gossiper = gms::get_gossiper(); gossiper.start(std::ref(feature_service), std::ref(*cfg)).get(); // #293 - do not stop anything @@ -683,7 +708,7 @@ int main(int ac, char** av) { supervisor::notify("initializing storage service"); service::storage_service_config sscfg; sscfg.available_memory = memory::stats().total_memory(); - init_storage_service(stop_signal.as_sharded_abort_source(), db, gossiper, auth_service, sys_dist_ks, view_update_generator, feature_service, sscfg); + init_storage_service(stop_signal.as_sharded_abort_source(), db, gossiper, auth_service, cql_config, sys_dist_ks, view_update_generator, feature_service, sscfg); supervisor::notify("starting per-shard database core"); // Note: changed from using a move here, because we want the config object intact. 
diff --git a/service/storage_service.cc b/service/storage_service.cc index 5ba2c860db..ee8bcd7a4a 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -136,13 +136,14 @@ int get_generation_number() { return generation_number; } -storage_service::storage_service(abort_source& abort_source, distributed& db, gms::gossiper& gossiper, sharded& auth_service, sharded& sys_dist_ks, +storage_service::storage_service(abort_source& abort_source, distributed& db, gms::gossiper& gossiper, sharded& auth_service, sharded& cql_config, sharded& sys_dist_ks, sharded& view_update_generator, gms::feature_service& feature_service, storage_service_config config, bool for_testing, std::set disabled_features) : _abort_source(abort_source) , _feature_service(feature_service) , _db(db) , _gossiper(gossiper) , _auth_service(auth_service) + , _cql_config(cql_config) , _disabled_features(std::move(disabled_features)) , _service_memory_total(config.available_memory / 10) , _service_memory_limiter(_service_memory_total) @@ -2217,7 +2218,7 @@ future<> storage_service::start_rpc_server() { tsc.timeout_config = make_timeout_config(cfg); tsc.max_request_size = cfg.thrift_max_message_length_in_mb() * (uint64_t(1) << 20); return gms::inet_address::lookup(addr, family, preferred).then([&ss, tserver, addr, port, keepalive, tsc] (gms::inet_address ip) { - return tserver->start(std::ref(ss._db), std::ref(cql3::get_query_processor()), std::ref(ss._auth_service), tsc).then([tserver, port, addr, ip, keepalive] { + return tserver->start(std::ref(ss._db), std::ref(cql3::get_query_processor()), std::ref(ss._auth_service), std::ref(ss._cql_config), tsc).then([tserver, port, addr, ip, keepalive] { // #293 - do not stop anything //engine().at_exit([tserver] { // return tserver->stop(); @@ -2274,7 +2275,7 @@ future<> storage_service::start_native_transport() { cql_server_config.get_service_memory_limiter_semaphore = [ss = std::ref(get_storage_service())] () -> semaphore& { return 
ss.get().local()._service_memory_limiter; }; cql_server_config.allow_shard_aware_drivers = cfg.enable_shard_aware_drivers(); return gms::inet_address::lookup(addr, family, preferred).then([&ss, cserver, addr, &cfg, keepalive, ceo = std::move(ceo), cql_server_config] (seastar::net::inet_address ip) { - return cserver->start(std::ref(service::get_storage_proxy()), std::ref(cql3::get_query_processor()), std::ref(ss._auth_service), cql_server_config).then([cserver, &cfg, addr, ip, ceo, keepalive]() { + return cserver->start(std::ref(service::get_storage_proxy()), std::ref(cql3::get_query_processor()), std::ref(ss._auth_service), std::ref(ss._cql_config), cql_server_config).then([cserver, &cfg, addr, ip, ceo, keepalive]() { auto f = make_ready_future(); @@ -3409,9 +3410,11 @@ storage_service::view_build_statuses(sstring keyspace, sstring view_name) const }); } -future<> init_storage_service(sharded& abort_source, distributed& db, sharded& gossiper, sharded& auth_service, sharded& sys_dist_ks, +future<> init_storage_service(sharded& abort_source, distributed& db, sharded& gossiper, sharded& auth_service, + sharded& cql_config, + sharded& sys_dist_ks, sharded& view_update_generator, sharded& feature_service, storage_service_config config) { - return service::get_storage_service().start(std::ref(abort_source), std::ref(db), std::ref(gossiper), std::ref(auth_service), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service), config); + return service::get_storage_service().start(std::ref(abort_source), std::ref(db), std::ref(gossiper), std::ref(auth_service), std::ref(cql_config), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service), config); } future<> deinit_storage_service() { diff --git a/service/storage_service.hh b/service/storage_service.hh index ec77b57364..df56b1f124 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -150,6 +150,7 @@ private: distributed& _db; gms::gossiper& _gossiper; 
sharded& _auth_service; + sharded& _cql_config; int _update_jobs{0}; // Note that this is obviously only valid for the current shard. Users of // this facility should elect a shard to be the coordinator based on any @@ -170,7 +171,7 @@ private: size_t _service_memory_total; semaphore _service_memory_limiter; public: - storage_service(abort_source& as, distributed& db, gms::gossiper& gossiper, sharded&, sharded&, sharded&, gms::feature_service& feature_service, storage_service_config config, /* only for tests */ bool for_testing = false, /* only for tests */ std::set disabled_features = {}); + storage_service(abort_source& as, distributed& db, gms::gossiper& gossiper, sharded&, sharded& cql_config, sharded&, sharded&, gms::feature_service& feature_service, storage_service_config config, /* only for tests */ bool for_testing = false, /* only for tests */ std::set disabled_features = {}); void isolate_on_error(); void isolate_on_commit_error(); @@ -2377,7 +2378,8 @@ private: void notify_cql_change(inet_address endpoint, bool ready); }; -future<> init_storage_service(sharded& abort_sources, distributed& db, sharded& gossiper, sharded& auth_service, sharded& sys_dist_ks, +future<> init_storage_service(sharded& abort_sources, distributed& db, sharded& gossiper, sharded& auth_service, + sharded& cql_config, sharded& sys_dist_ks, sharded& view_update_generator, sharded& feature_service, storage_service_config config); future<> deinit_storage_service(); diff --git a/tests/cql_query_test.cc b/tests/cql_query_test.cc index f7ba3dc077..06869f7727 100644 --- a/tests/cql_query_test.cc +++ b/tests/cql_query_test.cc @@ -43,6 +43,7 @@ #include "types/list.hh" #include "types/set.hh" #include "db/config.hh" +#include "cql3/cql_config.hh" #include "sstables/compaction_manager.hh" #include "exception_utils.hh" @@ -364,6 +365,56 @@ SEASTAR_TEST_CASE(test_in_clause_validation) { }); } +SEASTAR_THREAD_TEST_CASE(test_in_clause_cartesian_product_limits) { + do_with_cql_env_thread([] 
(cql_test_env& e) { + e.execute_cql("CREATE TABLE tab1 (pk1 int, pk2 int, PRIMARY KEY ((pk1, pk2)))").get(); + + // 100 partitions, should pass + e.execute_cql("SELECT * FROM tab1 WHERE pk1 IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)" + " AND pk2 IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)").get(); + // 110 partitions, should fail + BOOST_REQUIRE_THROW( + e.execute_cql("SELECT * FROM tab1 WHERE pk1 IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)" + " AND pk2 IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)").get(), + std::runtime_error); + + e.execute_cql("CREATE TABLE tab2 (pk1 int, ck1 int, ck2 int, PRIMARY KEY (pk1, ck1, ck2))").get(); + + // 100 clustering rows, should pass + e.execute_cql("SELECT * FROM tab2 WHERE pk1 = 1" + " AND ck1 IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)" + " AND ck2 IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)").get(); + // 110 clustering rows, should fail + BOOST_REQUIRE_THROW( + e.execute_cql("SELECT * FROM tab2 WHERE pk1 = 1" + " AND ck1 IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)" + " AND ck2 IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)").get(), + std::runtime_error); + auto make_tuple = [] (unsigned count) -> sstring { + std::ostringstream os; + os << "(0"; + for (unsigned i = 1; i < count; ++i) { + os << "," << i; + } + os << ")"; + return os.str(); + }; + e.execute_cql("CREATE TABLE tab3 (pk1 int, ck1 int, PRIMARY KEY (pk1, ck1))").get(); + // tuple with 100 keys, should pass + e.execute_cql(fmt::format("SELECT * FROM tab3 WHERE pk1 IN {}", make_tuple(100))).get(); + e.execute_cql(fmt::format("SELECT * FROM tab3 WHERE pk1 = 1 AND ck1 IN {}", make_tuple(100))).get(); + // tuple with 101 keys, should fail + BOOST_REQUIRE_THROW( + e.execute_cql(fmt::format("SELECT * FROM tab3 WHERE pk1 IN {}", make_tuple(101))).get(), + std::runtime_error + ); + BOOST_REQUIRE_THROW( + e.execute_cql(fmt::format("SELECT * FROM tab3 WHERE pk1 = 3 AND ck1 IN {}", make_tuple(101))).get(), + std::runtime_error + ); + }).get(); +} + SEASTAR_TEST_CASE(test_tuple_elements_validation) { return do_with_cql_env_thread([](cql_test_env& e) { 
auto test_inline = [&] (sstring value, bool should_throw) { @@ -3030,7 +3081,7 @@ SEASTAR_TEST_CASE(test_insert_large_collection_values) { BOOST_REQUIRE_THROW(e.execute_cql(format("INSERT INTO tbl (pk, m) VALUES ('Golding', {{'{}': 'value'}});", long_value)).get(), std::exception); auto make_query_options = [] (cql_protocol_version_type version) { - return std::make_unique(db::consistency_level::ONE, infinite_timeout_config, std::nullopt, + return std::make_unique(cql3::default_cql_config, db::consistency_level::ONE, infinite_timeout_config, std::nullopt, std::vector(), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format{version}); }; diff --git a/tests/cql_test_env.cc b/tests/cql_test_env.cc index a6023d0ca3..d1e5620ed1 100644 --- a/tests/cql_test_env.cc +++ b/tests/cql_test_env.cc @@ -29,6 +29,7 @@ #include "cql3/query_processor.hh" #include "cql3/query_options.hh" #include "cql3/statements/batch_statement.hh" +#include "cql3/cql_config.hh" #include #include #include @@ -385,13 +386,16 @@ public: distributed& proxy = service::get_storage_proxy(); distributed& mm = service::get_migration_manager(); distributed& bm = db::get_batchlog_manager(); + sharded cql_config; + cql_config.start().get(); + auto stop_cql_config = defer([&] { cql_config.stop().get(); }); auto view_update_generator = ::make_shared>(); auto& ss = service::get_storage_service(); service::storage_service_config sscfg; sscfg.available_memory = memory::stats().total_memory(); - ss.start(std::ref(abort_sources), std::ref(*db), std::ref(gms::get_gossiper()), std::ref(*auth_service), std::ref(sys_dist_ks), std::ref(*view_update_generator), std::ref(*feature_service), sscfg, true, cfg_in.disabled_features).get(); + ss.start(std::ref(abort_sources), std::ref(*db), std::ref(gms::get_gossiper()), std::ref(*auth_service), std::ref(cql_config), std::ref(sys_dist_ks), std::ref(*view_update_generator), std::ref(*feature_service), sscfg, true, cfg_in.disabled_features).get(); auto 
stop_storage_service = defer([&ss] { ss.stop().get(); }); database_config dbcfg; diff --git a/tests/gossip.cc b/tests/gossip.cc index ee1a903201..b77bfc51e2 100644 --- a/tests/gossip.cc +++ b/tests/gossip.cc @@ -35,6 +35,7 @@ #include #include #include "db/config.hh" +#include "cql3/cql_config.hh" namespace bpo = boost::program_options; @@ -60,11 +61,12 @@ namespace bpo = boost::program_options; int main(int ac, char ** av) { distributed db; sharded auth_service; + sharded cql_config; app_template app; app.add_options() ("seed", bpo::value>(), "IP address of seed node") ("listen-address", bpo::value()->default_value("0.0.0.0"), "IP address to listen"); - return app.run_deprecated(ac, av, [&auth_service, &db, &app] { + return app.run_deprecated(ac, av, [&auth_service, &db, &app, &cql_config] { auto config = app.configuration(); logging::logger_registry().set_logger_level("gossip", logging::log_level::trace); const gms::inet_address listen = gms::inet_address(config["listen-address"].as()); @@ -83,7 +85,8 @@ int main(int ac, char ** av) { auto stop_abort_source = defer([&] { abort_sources.stop().get(); }); service::storage_service_config sscfg; sscfg.available_memory = memory::stats().total_memory(); - service::init_storage_service(std::ref(abort_sources), db, gms::get_gossiper(), auth_service, sys_dist_ks, view_update_generator, feature_service, sscfg).get(); + cql_config.start().get(); + service::init_storage_service(std::ref(abort_sources), db, gms::get_gossiper(), auth_service, cql_config, sys_dist_ks, view_update_generator, feature_service, sscfg).get(); netw::get_messaging_service().start(listen).get(); auto& server = netw::get_local_messaging_service(); auto port = server.port(); diff --git a/tests/gossip_test.cc b/tests/gossip_test.cc index 06bb3a467c..52e56e1be8 100644 --- a/tests/gossip_test.cc +++ b/tests/gossip_test.cc @@ -37,6 +37,7 @@ #include "database.hh" #include "db/system_distributed_keyspace.hh" #include "db/config.hh" +#include 
"cql3/cql_config.hh" namespace db::view { class view_update_generator; @@ -71,7 +72,11 @@ SEASTAR_TEST_CASE(test_boot_shutdown){ service::storage_service_config sscfg; sscfg.available_memory = memory::stats().total_memory(); - service::get_storage_service().start(std::ref(abort_sources), std::ref(db), std::ref(gms::get_gossiper()), std::ref(auth_service), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service), sscfg, true).get(); + sharded cql_config; + cql_config.start().get(); /* wait for start() to resolve before the storage service takes a reference to cql_config, matching the start().get() used in tests/gossip.cc */ + auto stop_cql_config = defer([&] { cql_config.stop().get(); }); + + service::get_storage_service().start(std::ref(abort_sources), std::ref(db), std::ref(gms::get_gossiper()), std::ref(auth_service), std::ref(cql_config), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service), sscfg, true).get(); auto stop_ss = defer([&] { service::get_storage_service().stop().get(); }); db.start(std::ref(cfg), dbcfg).get(); diff --git a/tests/query_processor_test.cc b/tests/query_processor_test.cc index a624aa5441..b147b8e2fa 100644 --- a/tests/query_processor_test.cc +++ b/tests/query_processor_test.cc @@ -36,6 +36,7 @@ #include "transport/messages/result_message.hh" #include "cql3/query_processor.hh" #include "cql3/untyped_result_set.hh" +#include "cql3/cql_config.hh" SEASTAR_TEST_CASE(test_execute_internal_insert) { return do_with_cql_env([] (auto& e) { diff --git a/tests/test_services.cc b/tests/test_services.cc index 3a0837d69f..f86aef89fe 100644 --- a/tests/test_services.cc +++ b/tests/test_services.cc @@ -30,6 +30,7 @@ #include "gms/gossiper.hh" #include "message/messaging_service.hh" #include "service/storage_service.hh" +#include "cql3/cql_config.hh" class storage_service_for_tests::impl { @@ -39,6 +40,7 @@ class storage_service_for_tests::impl { distributed _db; db::config _cfg; sharded _auth_service; + sharded _cql_config; sharded _sys_dist_ks; sharded _view_update_generator; public: @@ -54,13 +56,15 @@ public: 
netw::get_messaging_service().start(gms::inet_address("127.0.0.1"), 7000, false).get(); service::storage_service_config sscfg; sscfg.available_memory = memory::stats().total_memory(); - service::get_storage_service().start(std::ref(_abort_source), std::ref(_db), std::ref(_gossiper), std::ref(_auth_service), std::ref(_sys_dist_ks), std::ref(_view_update_generator), std::ref(_feature_service), sscfg, true).get(); + _cql_config.start().get(); + service::get_storage_service().start(std::ref(_abort_source), std::ref(_db), std::ref(_gossiper), std::ref(_auth_service), std::ref(_cql_config), std::ref(_sys_dist_ks), std::ref(_view_update_generator), std::ref(_feature_service), sscfg, true).get(); service::get_storage_service().invoke_on_all([] (auto& ss) { ss.enable_all_features(); }).get(); } ~impl() { service::get_storage_service().stop().get(); + _cql_config.stop().get(); netw::get_messaging_service().stop().get(); _db.stop().get(); _gossiper.stop().get(); diff --git a/thrift/handler.cc b/thrift/handler.cc index 8971a05614..786517645a 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -203,6 +203,7 @@ enum class query_order { no, yes }; class thrift_handler : public CassandraCobSvIf { distributed& _db; distributed& _query_processor; + const cql3::cql_config& _cql_config; service::query_state _query_state; ::timeout_config _timeout_config; private: @@ -218,9 +219,10 @@ private: }); } public: - explicit thrift_handler(distributed& db, distributed& qp, auth::service& auth_service, ::timeout_config timeout_config) + explicit thrift_handler(distributed& db, distributed& qp, auth::service& auth_service, const cql3::cql_config& cql_config, ::timeout_config timeout_config) : _db(db) , _query_processor(qp) + , _cql_config(cql_config) , _query_state(service::client_state::for_external_thrift_calls(auth_service), /*FIXME: pass real permit*/empty_service_permit()) , _timeout_config(timeout_config) { } @@ -964,7 +966,7 @@ public: if (compression != Compression::type::NONE) { 
throw make_exception("Compressed query strings are not supported"); } - auto opts = std::make_unique(cl_from_thrift(consistency), _timeout_config, std::nullopt, std::vector(), + auto opts = std::make_unique(_cql_config, cl_from_thrift(consistency), _timeout_config, std::nullopt, std::vector(), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); auto f = _query_processor.local().process(query, _query_state, *opts); return f.then([cob = std::move(cob), opts = std::move(opts)](auto&& ret) { @@ -1043,7 +1045,7 @@ public: std::transform(values.begin(), values.end(), std::back_inserter(bytes_values), [](auto&& s) { return cql3::raw_value::make_value(to_bytes(s)); }); - auto opts = std::make_unique(cl_from_thrift(consistency), _timeout_config, std::nullopt, std::move(bytes_values), + auto opts = std::make_unique(_cql_config, cl_from_thrift(consistency), _timeout_config, std::nullopt, std::move(bytes_values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); auto f = _query_processor.local().process_statement_prepared(std::move(prepared), std::move(cache_key), _query_state, *opts, needs_authorization); return f.then([cob = std::move(cob), opts = std::move(opts)](auto&& ret) { @@ -1942,15 +1944,18 @@ class handler_factory : public CassandraCobSvIfFactory { distributed& _db; distributed& _query_processor; auth::service& _auth_service; + const cql3::cql_config& _cql_config; timeout_config _timeout_config; public: explicit handler_factory(distributed& db, distributed& qp, - auth::service& auth_service, ::timeout_config timeout_config) - : _db(db), _query_processor(qp), _auth_service(auth_service), _timeout_config(timeout_config) {} + auth::service& auth_service, + const cql3::cql_config& cql_config, + ::timeout_config timeout_config) + : _db(db), _query_processor(qp), _auth_service(auth_service), _cql_config(cql_config), _timeout_config(timeout_config) {} typedef CassandraCobSvIf Handler; virtual 
CassandraCobSvIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) { - return new thrift_handler(_db, _query_processor, _auth_service, _timeout_config); + return new thrift_handler(_db, _query_processor, _auth_service, _cql_config, _timeout_config); } virtual void releaseHandler(CassandraCobSvIf* handler) { delete handler; @@ -1958,6 +1963,7 @@ public: }; std::unique_ptr -create_handler_factory(distributed& db, distributed& qp, auth::service& auth_service, ::timeout_config timeout_config) { - return std::make_unique(db, qp, auth_service, timeout_config); +create_handler_factory(distributed& db, distributed& qp, auth::service& auth_service, + const cql3::cql_config& cql_config, ::timeout_config timeout_config) { + return std::make_unique(db, qp, auth_service, cql_config, timeout_config); } diff --git a/thrift/handler.hh b/thrift/handler.hh index 2765fa9417..0bfdea0815 100644 --- a/thrift/handler.hh +++ b/thrift/handler.hh @@ -31,6 +31,13 @@ struct timeout_config; -std::unique_ptr<::cassandra::CassandraCobSvIfFactory> create_handler_factory(distributed& db, distributed& qp, auth::service&, timeout_config); +namespace cql3 { + +class cql_config; + +} + +std::unique_ptr<::cassandra::CassandraCobSvIfFactory> create_handler_factory(distributed& db, distributed& qp, auth::service&, + const cql3::cql_config& cql_config, timeout_config); #endif /* APPS_SEASTAR_THRIFT_HANDLER_HH_ */ diff --git a/thrift/server.cc b/thrift/server.cc index 00e382370e..ebeaef213a 100644 --- a/thrift/server.cc +++ b/thrift/server.cc @@ -65,9 +65,10 @@ public: thrift_server::thrift_server(distributed& db, distributed& qp, auth::service& auth_service, + const cql3::cql_config& cql_config, thrift_server_config config) : _stats(new thrift_stats(*this)) - , _handler_factory(create_handler_factory(db, qp, auth_service, config.timeout_config).release()) + , _handler_factory(create_handler_factory(db, qp, auth_service, cql_config, config.timeout_config).release()) , _protocol_factory(new 
TBinaryProtocolFactoryT()) , _processor_factory(new CassandraAsyncProcessorFactory(_handler_factory)) , _config(config) { diff --git a/thrift/server.hh b/thrift/server.hh index 8825e4385d..0605621c08 100644 --- a/thrift/server.hh +++ b/thrift/server.hh @@ -41,6 +41,12 @@ namespace thrift_std = boost; namespace thrift_std = std; #endif +namespace cql3 { + +class cql_config; + +} + namespace cassandra { static const sstring thrift_version = "20.1.0"; @@ -117,7 +123,7 @@ private: boost::intrusive::list _connections_list; seastar::gate _stop_gate; public: - thrift_server(distributed& db, distributed& qp, auth::service&, thrift_server_config config); + thrift_server(distributed& db, distributed& qp, auth::service&, const cql3::cql_config& cql_config, thrift_server_config config); ~thrift_server(); future<> listen(socket_address addr, bool keepalive); future<> stop(); diff --git a/tracing/trace_keyspace_helper.cc b/tracing/trace_keyspace_helper.cc index e90abf3314..459597b3a9 100644 --- a/tracing/trace_keyspace_helper.cc +++ b/tracing/trace_keyspace_helper.cc @@ -44,6 +44,7 @@ #include "tracing/tracing_backend_registry.hh" #include "cql3/statements/batch_statement.hh" #include "cql3/statements/modification_statement.hh" +#include "cql3/cql_config.hh" namespace tracing { @@ -258,7 +259,8 @@ cql3::query_options trace_keyspace_helper::make_session_mutation_data(const one_ cql3::raw_value::make_value(int32_type->decompose((int32_t)(session_records.ttl.count()))) }; - return cql3::query_options(db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); + return cql3::query_options(cql3::default_cql_config, + db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); } cql3::query_options 
trace_keyspace_helper::make_session_time_idx_mutation_data(const one_session_records& session_records) { @@ -275,7 +277,8 @@ cql3::query_options trace_keyspace_helper::make_session_time_idx_mutation_data(c cql3::raw_value::make_value(int32_type->decompose(int32_t(session_records.ttl.count()))) }; - return cql3::query_options(db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); + return cql3::query_options(cql3::default_cql_config, + db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); } cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const one_session_records& session_records, const utils::UUID& start_time_id) { @@ -315,7 +318,8 @@ cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const o cql3::raw_value::make_value(int32_type->decompose((int32_t)(record.slow_query_record_ttl.count()))) }); - return cql3::query_options(db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); + return cql3::query_options(cql3::default_cql_config, + db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); } cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_data(const one_session_records& session_records, const utils::UUID& start_time_id) { @@ -335,7 +339,8 @@ cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_dat cql3::raw_value::make_value(int32_type->decompose(int32_t(session_records.session_rec.slow_query_record_ttl.count()))) }); - return cql3::query_options(db::consistency_level::ANY, 
tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); + return cql3::query_options(cql3::default_cql_config, + db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); } std::vector trace_keyspace_helper::make_event_mutation_data(one_session_records& session_records, const event_record& record) { @@ -372,7 +377,7 @@ future<> trace_keyspace_helper::apply_events_mutation(lw_shared_ptr{}, false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()), std::move(values)), + cql3::query_options::make_batch_options(cql3::query_options(cql3::default_cql_config, db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::vector{}, false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()), std::move(values)), cql3::statements::batch_statement(cql3::statements::batch_statement::type::UNLOGGED, std::move(modifications), cql3::attributes::none(), qp.get_cql_stats()), [this] (auto& batch_options, auto& batch) { return batch.execute(service::get_storage_proxy().local(), _dummy_query_state, batch_options).then([] (shared_ptr res) { return now(); }); diff --git a/transport/request.hh b/transport/request.hh index 9bed742b49..764b79b081 100644 --- a/transport/request.hh +++ b/transport/request.hh @@ -215,10 +215,10 @@ private: options_flag::NAMES_FOR_VALUES >; public: - std::unique_ptr read_options(uint8_t version, cql_serialization_format cql_ser_format, const timeout_config& timeouts) { + std::unique_ptr read_options(uint8_t version, cql_serialization_format cql_ser_format, const timeout_config& timeouts, const cql3::cql_config& cql_config) { auto consistency = read_consistency(); if (version == 1) { - return std::make_unique(consistency, timeouts, std::nullopt, std::vector{}, + return 
std::make_unique(cql_config, consistency, timeouts, std::nullopt, std::vector{}, false, cql3::query_options::specific_options::DEFAULT, cql_ser_format); } @@ -266,11 +266,11 @@ public: if (!names.empty()) { onames = std::move(names); } - options = std::make_unique(consistency, timeouts, std::move(onames), std::move(values), skip_metadata, + options = std::make_unique(cql_config, consistency, timeouts, std::move(onames), std::move(values), skip_metadata, cql3::query_options::specific_options{page_size, std::move(paging_state), serial_consistency, ts}, cql_ser_format); } else { - options = std::make_unique(consistency, timeouts, std::nullopt, std::move(values), skip_metadata, + options = std::make_unique(cql_config, consistency, timeouts, std::nullopt, std::move(values), skip_metadata, cql3::query_options::specific_options::DEFAULT, cql_ser_format); } diff --git a/transport/server.cc b/transport/server.cc index b4a36c8eb2..3c7eb7e9f0 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -145,7 +145,7 @@ event::event_type parse_event_type(const sstring& value) } cql_server::cql_server(distributed& proxy, distributed& qp, auth::service& auth_service, - cql_server_config config) + const cql3::cql_config& cql_config, cql_server_config config) : _proxy(proxy) , _query_processor(qp) , _config(config) @@ -153,6 +153,7 @@ cql_server::cql_server(distributed& proxy, distributed()) , _auth_service(auth_service) + , _cql_config(cql_config) { namespace sm = seastar::metrics; @@ -748,7 +749,7 @@ future cql_server::connection::process_query(uint16_t stream, req auto query = in.read_long_string_view(); auto q_state = std::make_unique(client_state, std::move(permit)); auto& query_state = q_state->query_state; - q_state->options = in.read_options(_version, _cql_serialization_format, this->timeout_config()); + q_state->options = in.read_options(_version, _cql_serialization_format, this->timeout_config(), _server._cql_config); auto& options = *q_state->options; auto 
skip_metadata = options.skip_metadata(); @@ -826,10 +827,10 @@ future cql_server::connection::process_execute(uint16_t stream, r std::vector values; in.read_value_view_list(_version, values); auto consistency = in.read_consistency(); - q_state->options = std::make_unique(consistency, timeout_config(), std::nullopt, values, false, + q_state->options = std::make_unique(_server._cql_config, consistency, timeout_config(), std::nullopt, values, false, cql3::query_options::specific_options::DEFAULT, _cql_serialization_format); } else { - q_state->options = in.read_options(_version, _cql_serialization_format, this->timeout_config()); + q_state->options = in.read_options(_version, _cql_serialization_format, this->timeout_config(), _server._cql_config); } auto& options = *q_state->options; auto skip_metadata = options.skip_metadata(); @@ -946,7 +947,7 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic auto q_state = std::make_unique(client_state, std::move(permit)); auto& query_state = q_state->query_state; // #563. CQL v2 encodes query_options in v1 format for batch requests. - q_state->options = std::make_unique(cql3::query_options::make_batch_options(std::move(*in.read_options(_version < 3 ? 1 : _version, _cql_serialization_format, this->timeout_config())), std::move(values))); + q_state->options = std::make_unique(cql3::query_options::make_batch_options(std::move(*in.read_options(_version < 3 ? 
1 : _version, _cql_serialization_format, this->timeout_config(), _server._cql_config)), std::move(values))); auto& options = *q_state->options; tracing::set_consistency_level(client_state.get_trace_state(), options.get_consistency()); diff --git a/transport/server.hh b/transport/server.hh index 1734af78d8..09099ea998 100644 --- a/transport/server.hh +++ b/transport/server.hh @@ -127,9 +127,10 @@ private: uint64_t _requests_serving = 0; uint64_t _requests_blocked_memory = 0; auth::service& _auth_service; + const cql3::cql_config& _cql_config; public: cql_server(distributed& proxy, distributed& qp, auth::service&, - cql_server_config config); + const cql3::cql_config& cql_config, cql_server_config config); future<> listen(socket_address addr, std::shared_ptr = {}, bool keepalive = false); future<> do_accepts(int which, bool keepalive, socket_address server_addr); future<> stop();