mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-05 14:33:08 +00:00
Merge "cql3: cartesian product limits" from Avi
Cartesian products (generated by IN restrictions) can grow very large, even for short queries. This can overwhelm server resources. Add limit checking for cartesian products, and configuration items for users that are not satisfied with the default of 100 records fetched. Fixes #4752. Tests: unit (dev), manual test with SIGHUP.
This commit is contained in:
36
cql3/cql_config.hh
Normal file
36
cql3/cql_config.hh
Normal file
@@ -0,0 +1,36 @@
|
||||
/*
|
||||
* Copyright (C) 2019 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "restrictions/restrictions_config.hh"
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
struct cql_config {
|
||||
restrictions::restrictions_config restrictions;
|
||||
};
|
||||
|
||||
extern const cql_config default_cql_config;
|
||||
|
||||
}
|
||||
@@ -39,17 +39,22 @@
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "cql3/cql_config.hh"
|
||||
#include "query_options.hh"
|
||||
#include "version.hh"
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
const cql_config default_cql_config;
|
||||
|
||||
thread_local const query_options::specific_options query_options::specific_options::DEFAULT{-1, {}, {}, api::missing_timestamp};
|
||||
|
||||
thread_local query_options query_options::DEFAULT{db::consistency_level::ONE, infinite_timeout_config, std::nullopt,
|
||||
thread_local query_options query_options::DEFAULT{default_cql_config,
|
||||
db::consistency_level::ONE, infinite_timeout_config, std::nullopt,
|
||||
std::vector<cql3::raw_value_view>(), false, query_options::specific_options::DEFAULT, cql_serialization_format::latest()};
|
||||
|
||||
query_options::query_options(db::consistency_level consistency,
|
||||
query_options::query_options(const cql_config& cfg,
|
||||
db::consistency_level consistency,
|
||||
const ::timeout_config& timeout_config,
|
||||
std::optional<std::vector<sstring_view>> names,
|
||||
std::vector<cql3::raw_value> values,
|
||||
@@ -57,7 +62,8 @@ query_options::query_options(db::consistency_level consistency,
|
||||
bool skip_metadata,
|
||||
specific_options options,
|
||||
cql_serialization_format sf)
|
||||
: _consistency(consistency)
|
||||
: _cql_config(cfg)
|
||||
, _consistency(consistency)
|
||||
, _timeout_config(timeout_config)
|
||||
, _names(std::move(names))
|
||||
, _values(std::move(values))
|
||||
@@ -68,14 +74,16 @@ query_options::query_options(db::consistency_level consistency,
|
||||
{
|
||||
}
|
||||
|
||||
query_options::query_options(db::consistency_level consistency,
|
||||
query_options::query_options(const cql_config& cfg,
|
||||
db::consistency_level consistency,
|
||||
const ::timeout_config& timeout_config,
|
||||
std::optional<std::vector<sstring_view>> names,
|
||||
std::vector<cql3::raw_value> values,
|
||||
bool skip_metadata,
|
||||
specific_options options,
|
||||
cql_serialization_format sf)
|
||||
: _consistency(consistency)
|
||||
: _cql_config(cfg)
|
||||
, _consistency(consistency)
|
||||
, _timeout_config(timeout_config)
|
||||
, _names(std::move(names))
|
||||
, _values(std::move(values))
|
||||
@@ -87,14 +95,16 @@ query_options::query_options(db::consistency_level consistency,
|
||||
fill_value_views();
|
||||
}
|
||||
|
||||
query_options::query_options(db::consistency_level consistency,
|
||||
query_options::query_options(const cql_config& cfg,
|
||||
db::consistency_level consistency,
|
||||
const ::timeout_config& timeout_config,
|
||||
std::optional<std::vector<sstring_view>> names,
|
||||
std::vector<cql3::raw_value_view> value_views,
|
||||
bool skip_metadata,
|
||||
specific_options options,
|
||||
cql_serialization_format sf)
|
||||
: _consistency(consistency)
|
||||
: _cql_config(cfg)
|
||||
, _consistency(consistency)
|
||||
, _timeout_config(timeout_config)
|
||||
, _names(std::move(names))
|
||||
, _values()
|
||||
@@ -105,8 +115,10 @@ query_options::query_options(db::consistency_level consistency,
|
||||
{
|
||||
}
|
||||
|
||||
query_options::query_options(db::consistency_level cl, const ::timeout_config& timeout_config, std::vector<cql3::raw_value> values, specific_options options)
|
||||
query_options::query_options(db::consistency_level cl, const ::timeout_config& timeout_config, std::vector<cql3::raw_value> values,
|
||||
specific_options options)
|
||||
: query_options(
|
||||
default_cql_config,
|
||||
cl,
|
||||
timeout_config,
|
||||
{},
|
||||
@@ -119,7 +131,8 @@ query_options::query_options(db::consistency_level cl, const ::timeout_config& t
|
||||
}
|
||||
|
||||
query_options::query_options(std::unique_ptr<query_options> qo, ::shared_ptr<service::pager::paging_state> paging_state)
|
||||
: query_options(qo->_consistency,
|
||||
: query_options(qo->_cql_config,
|
||||
qo->_consistency,
|
||||
qo->get_timeout_config(),
|
||||
std::move(qo->_names),
|
||||
std::move(qo->_values),
|
||||
@@ -131,7 +144,8 @@ query_options::query_options(std::unique_ptr<query_options> qo, ::shared_ptr<ser
|
||||
}
|
||||
|
||||
query_options::query_options(std::unique_ptr<query_options> qo, ::shared_ptr<service::pager::paging_state> paging_state, int32_t page_size)
|
||||
: query_options(qo->_consistency,
|
||||
: query_options(qo->_cql_config,
|
||||
qo->_consistency,
|
||||
qo->get_timeout_config(),
|
||||
std::move(qo->_names),
|
||||
std::move(qo->_values),
|
||||
|
||||
@@ -55,6 +55,9 @@
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
class cql_config;
|
||||
extern const cql_config default_cql_config;
|
||||
|
||||
/**
|
||||
* Options for a query.
|
||||
*/
|
||||
@@ -70,6 +73,7 @@ public:
|
||||
const api::timestamp_type timestamp;
|
||||
};
|
||||
private:
|
||||
const cql_config& _cql_config;
|
||||
const db::consistency_level _consistency;
|
||||
const timeout_config& _timeout_config;
|
||||
const std::optional<std::vector<sstring_view>> _names;
|
||||
@@ -104,14 +108,16 @@ public:
|
||||
query_options(query_options&&) = default;
|
||||
explicit query_options(const query_options&) = default;
|
||||
|
||||
explicit query_options(db::consistency_level consistency,
|
||||
explicit query_options(const cql_config& cfg,
|
||||
db::consistency_level consistency,
|
||||
const timeout_config& timeouts,
|
||||
std::optional<std::vector<sstring_view>> names,
|
||||
std::vector<cql3::raw_value> values,
|
||||
bool skip_metadata,
|
||||
specific_options options,
|
||||
cql_serialization_format sf);
|
||||
explicit query_options(db::consistency_level consistency,
|
||||
explicit query_options(const cql_config& cfg,
|
||||
db::consistency_level consistency,
|
||||
const timeout_config& timeouts,
|
||||
std::optional<std::vector<sstring_view>> names,
|
||||
std::vector<cql3::raw_value> values,
|
||||
@@ -119,7 +125,8 @@ public:
|
||||
bool skip_metadata,
|
||||
specific_options options,
|
||||
cql_serialization_format sf);
|
||||
explicit query_options(db::consistency_level consistency,
|
||||
explicit query_options(const cql_config& cfg,
|
||||
db::consistency_level consistency,
|
||||
const timeout_config& timeouts,
|
||||
std::optional<std::vector<sstring_view>> names,
|
||||
std::vector<cql3::raw_value_view> value_views,
|
||||
@@ -227,6 +234,10 @@ public:
|
||||
return _names;
|
||||
}
|
||||
|
||||
const cql_config& get_cql_config() const {
|
||||
return _cql_config;
|
||||
}
|
||||
|
||||
void prepare(const std::vector<::shared_ptr<column_specification>>& specs);
|
||||
private:
|
||||
void fill_value_views();
|
||||
@@ -244,7 +255,7 @@ query_options::query_options(query_options&& o, std::vector<OneMutationDataRange
|
||||
std::vector<query_options> tmp;
|
||||
tmp.reserve(values_ranges.size());
|
||||
std::transform(values_ranges.begin(), values_ranges.end(), std::back_inserter(tmp), [this](auto& values_range) {
|
||||
return query_options(_consistency, _timeout_config, {}, std::move(values_range), _skip_metadata, _options, _cql_serialization_format);
|
||||
return query_options(_cql_config, _consistency, _timeout_config, {}, std::move(values_range), _skip_metadata, _options, _cql_serialization_format);
|
||||
});
|
||||
_batch_options = std::move(tmp);
|
||||
}
|
||||
|
||||
35
cql3/restrictions/restrictions_config.hh
Normal file
35
cql3/restrictions/restrictions_config.hh
Normal file
@@ -0,0 +1,35 @@
|
||||
/*
|
||||
* Copyright (C) 2019 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
|
||||
namespace cql3::restrictions {
|
||||
|
||||
struct restrictions_config {
|
||||
uint32_t partition_key_restrictions_max_cartesian_product_size = 100;
|
||||
uint32_t clustering_key_restrictions_max_cartesian_product_size = 100;
|
||||
};
|
||||
|
||||
}
|
||||
@@ -46,6 +46,7 @@
|
||||
#include "cartesian_product.hh"
|
||||
#include "cql3/restrictions/primary_key_restrictions.hh"
|
||||
#include "cql3/restrictions/single_column_restrictions.hh"
|
||||
#include "cql3/cql_config.hh"
|
||||
#include <boost/algorithm/cxx11/all_of.hpp>
|
||||
#include <boost/range/adaptor/transformed.hpp>
|
||||
#include <boost/range/adaptor/filtered.hpp>
|
||||
@@ -55,6 +56,29 @@ namespace cql3 {
|
||||
|
||||
namespace restrictions {
|
||||
|
||||
namespace {
|
||||
|
||||
template <typename ValueType>
|
||||
const char*
|
||||
restricted_component_name_v;
|
||||
|
||||
template <>
|
||||
const char* restricted_component_name_v<partition_key> = "partition key";
|
||||
|
||||
template <>
|
||||
const char* restricted_component_name_v<clustering_key> = "clustering key";
|
||||
|
||||
|
||||
inline
|
||||
void check_cartesian_product_size(size_t size, size_t max, const char* component_name) {
|
||||
if (size > max) {
|
||||
throw std::runtime_error(fmt::format("{} cartesian product size {} is greater than maximum {}",
|
||||
component_name, size, max));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* A set of single column restrictions on a primary key part (partition key or clustering key).
|
||||
*/
|
||||
@@ -69,6 +93,8 @@ private:
|
||||
schema_ptr _schema;
|
||||
bool _allow_filtering;
|
||||
::shared_ptr<single_column_restrictions> _restrictions;
|
||||
private:
|
||||
static uint32_t max_cartesian_product_size(const restrictions_config& config);
|
||||
public:
|
||||
single_column_primary_key_restrictions(schema_ptr schema, bool allow_filtering)
|
||||
: _schema(schema)
|
||||
@@ -184,7 +210,10 @@ public:
|
||||
}
|
||||
|
||||
std::vector<ValueType> result;
|
||||
result.reserve(cartesian_product_size(value_vector));
|
||||
auto size = cartesian_product_size(value_vector);
|
||||
check_cartesian_product_size(size, max_cartesian_product_size(options.get_cql_config().restrictions),
|
||||
restricted_component_name_v<ValueType>);
|
||||
result.reserve(size);
|
||||
for (auto&& v : make_cartesian_product(value_vector)) {
|
||||
result.emplace_back(ValueType::from_optional_exploded(*_schema, std::move(v)));
|
||||
}
|
||||
@@ -259,7 +288,10 @@ private:
|
||||
return ranges;
|
||||
}
|
||||
|
||||
ranges.reserve(cartesian_product_size(vec_of_values));
|
||||
auto size = cartesian_product_size(vec_of_values);
|
||||
check_cartesian_product_size(size, max_cartesian_product_size(options.get_cql_config().restrictions),
|
||||
restricted_component_name_v<ValueType>);
|
||||
ranges.reserve(size);
|
||||
for (auto&& prefix : make_cartesian_product(vec_of_values)) {
|
||||
auto read_bound = [r, &prefix, &options, this](statements::bound bound) -> range_bound {
|
||||
if (r->has_bound(bound)) {
|
||||
@@ -300,7 +332,10 @@ private:
|
||||
vec_of_values.emplace_back(std::move(values));
|
||||
}
|
||||
|
||||
ranges.reserve(cartesian_product_size(vec_of_values));
|
||||
auto size = cartesian_product_size(vec_of_values);
|
||||
check_cartesian_product_size(size, max_cartesian_product_size(options.get_cql_config().restrictions),
|
||||
restricted_component_name_v<ValueType>);
|
||||
ranges.reserve(size);
|
||||
for (auto&& prefix : make_cartesian_product(vec_of_values)) {
|
||||
ranges.emplace_back(range_type::make_singular(ValueType::from_optional_exploded(*_schema, std::move(prefix))));
|
||||
}
|
||||
@@ -489,6 +524,19 @@ inline unsigned single_column_primary_key_restrictions<partition_key>::num_prefi
|
||||
using single_column_partition_key_restrictions = single_column_primary_key_restrictions<partition_key>;
|
||||
using single_column_clustering_key_restrictions = single_column_primary_key_restrictions<clustering_key>;
|
||||
|
||||
template <>
|
||||
inline
|
||||
uint32_t single_column_primary_key_restrictions<partition_key>::max_cartesian_product_size(const restrictions_config& config) {
|
||||
return config.partition_key_restrictions_max_cartesian_product_size;
|
||||
}
|
||||
|
||||
template <>
|
||||
inline
|
||||
uint32_t single_column_primary_key_restrictions<clustering_key>::max_cartesian_product_size(const restrictions_config& config) {
|
||||
return config.clustering_key_restrictions_max_cartesian_product_size;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -686,6 +686,13 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
, enable_shard_aware_drivers(this, "enable_shard_aware_drivers", value_status::Used, true, "Enable native transport drivers to use connection-per-shard for better performance")
|
||||
, enable_ipv6_dns_lookup(this, "enable_ipv6_dns_lookup", value_status::Used, false, "Use IPv6 address resolution")
|
||||
, abort_on_internal_error(this, "abort_on_internal_error", liveness::LiveUpdate, value_status::Used, false, "Abort the server instead of throwing exception when internal invariants are violated")
|
||||
, max_partition_key_restrictions_per_query(this, "max_partition_key_restrictions_per_query", liveness::LiveUpdate, value_status::Used, 100,
|
||||
"Maximum number of distinct partition keys restrictions per query. This limit places a bound on the size of IN tuples, "
|
||||
"especially when multiple partition key columns have IN restrictions. Increasing this value can result in server instability.")
|
||||
, max_clustering_key_restrictions_per_query(this, "max_clustering_key_restrictions_per_query", liveness::LiveUpdate, value_status::Used, 100,
|
||||
"Maximum number of distinct clustering key restrictions per query. This limit places a bound on the size of IN tuples, "
|
||||
"especially when multiple clustering key columns have IN restrictions. Increasing this value can result in server instability.")
|
||||
|
||||
, default_log_level(this, "default_log_level", value_status::Used)
|
||||
, logger_log_level(this, "logger_log_level", value_status::Used)
|
||||
, log_to_stdout(this, "log_to_stdout", value_status::Used)
|
||||
|
||||
@@ -282,6 +282,8 @@ public:
|
||||
named_value<bool> enable_shard_aware_drivers;
|
||||
named_value<bool> enable_ipv6_dns_lookup;
|
||||
named_value<bool> abort_on_internal_error;
|
||||
named_value<uint32_t> max_partition_key_restrictions_per_query;
|
||||
named_value<uint32_t> max_clustering_key_restrictions_per_query;
|
||||
|
||||
seastar::logging_settings logging_settings(const boost::program_options::variables_map&) const;
|
||||
|
||||
|
||||
6
init.cc
6
init.cc
@@ -38,9 +38,11 @@ logging::logger startlog("init");
|
||||
// until proper shutdown is done.
|
||||
|
||||
void init_storage_service(sharded<abort_source>& abort_source,
|
||||
distributed<database>& db, sharded<gms::gossiper>& gossiper, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
distributed<database>& db, sharded<gms::gossiper>& gossiper, sharded<auth::service>& auth_service,
|
||||
sharded<cql3::cql_config>& cql_config,
|
||||
sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
sharded<db::view::view_update_generator>& view_update_generator, sharded<gms::feature_service>& feature_service, service::storage_service_config config) {
|
||||
service::init_storage_service(abort_source, db, gossiper, auth_service, sys_dist_ks, view_update_generator, feature_service, config).get();
|
||||
service::init_storage_service(abort_source, db, gossiper, auth_service, cql_config, sys_dist_ks, view_update_generator, feature_service, config).get();
|
||||
// #293 - do not stop anything
|
||||
//engine().at_exit([] { return service::deinit_storage_service(); });
|
||||
}
|
||||
|
||||
4
init.hh
4
init.hh
@@ -48,7 +48,9 @@ extern logging::logger startlog;
|
||||
class bad_configuration_error : public std::exception {};
|
||||
|
||||
void init_storage_service(sharded<abort_source>& abort_sources,
|
||||
distributed<database>& db, sharded<gms::gossiper>& gossiper, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
distributed<database>& db, sharded<gms::gossiper>& gossiper, sharded<auth::service>& auth_service,
|
||||
sharded<cql3::cql_config>& cql_config,
|
||||
sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
sharded<db::view::view_update_generator>& view_update_generator, sharded<gms::feature_service>& feature_service);
|
||||
|
||||
struct init_scheduling_config {
|
||||
|
||||
27
main.cc
27
main.cc
@@ -69,6 +69,7 @@
|
||||
#include "sstables/sstables.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
#include "distributed_loader.hh"
|
||||
#include "cql3/cql_config.hh"
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
@@ -426,6 +427,25 @@ void print_starting_message(int ac, char** av, const bpo::parsed_options& opts)
|
||||
fmt::print("parsed command line options: {}\n", format_parsed_options(opts.options));
|
||||
}
|
||||
|
||||
// Glue logic between db::config and cql3::cql_config
|
||||
class cql_config_updater {
|
||||
cql3::cql_config& _cql_config;
|
||||
const db::config& _cfg;
|
||||
std::vector<std::any> _observers;
|
||||
private:
|
||||
template <typename T>
|
||||
void tie(T& dest, const db::config::named_value<T>& src) {
|
||||
dest = src();
|
||||
_observers.emplace_back(make_lw_shared<utils::observer<T>>(src.observe([&dest] (const T& value) { dest = value; })));
|
||||
}
|
||||
public:
|
||||
cql_config_updater(cql3::cql_config& cql_config, const db::config& cfg)
|
||||
: _cql_config(cql_config), _cfg(cfg) {
|
||||
tie(_cql_config.restrictions.partition_key_restrictions_max_cartesian_product_size, _cfg.max_partition_key_restrictions_per_query);
|
||||
tie(_cql_config.restrictions.clustering_key_restrictions_max_cartesian_product_size, _cfg.max_clustering_key_restrictions_per_query);
|
||||
}
|
||||
};
|
||||
|
||||
int main(int ac, char** av) {
|
||||
int return_value = 0;
|
||||
try {
|
||||
@@ -676,6 +696,11 @@ int main(int ac, char** av) {
|
||||
static sharded<auth::service> auth_service;
|
||||
static sharded<db::system_distributed_keyspace> sys_dist_ks;
|
||||
static sharded<db::view::view_update_generator> view_update_generator;
|
||||
static sharded<cql3::cql_config> cql_config;
|
||||
static sharded<::cql_config_updater> cql_config_updater;
|
||||
cql_config.start().get();
|
||||
cql_config_updater.start(std::ref(cql_config), std::ref(*cfg));
|
||||
auto stop_cql_config_updater = defer([&] { cql_config_updater.stop().get(); });
|
||||
auto& gossiper = gms::get_gossiper();
|
||||
gossiper.start(std::ref(feature_service), std::ref(*cfg)).get();
|
||||
// #293 - do not stop anything
|
||||
@@ -683,7 +708,7 @@ int main(int ac, char** av) {
|
||||
supervisor::notify("initializing storage service");
|
||||
service::storage_service_config sscfg;
|
||||
sscfg.available_memory = memory::stats().total_memory();
|
||||
init_storage_service(stop_signal.as_sharded_abort_source(), db, gossiper, auth_service, sys_dist_ks, view_update_generator, feature_service, sscfg);
|
||||
init_storage_service(stop_signal.as_sharded_abort_source(), db, gossiper, auth_service, cql_config, sys_dist_ks, view_update_generator, feature_service, sscfg);
|
||||
supervisor::notify("starting per-shard database core");
|
||||
|
||||
// Note: changed from using a move here, because we want the config object intact.
|
||||
|
||||
@@ -136,13 +136,14 @@ int get_generation_number() {
|
||||
return generation_number;
|
||||
}
|
||||
|
||||
storage_service::storage_service(abort_source& abort_source, distributed<database>& db, gms::gossiper& gossiper, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
storage_service::storage_service(abort_source& abort_source, distributed<database>& db, gms::gossiper& gossiper, sharded<auth::service>& auth_service, sharded<cql3::cql_config>& cql_config, sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
sharded<db::view::view_update_generator>& view_update_generator, gms::feature_service& feature_service, storage_service_config config, bool for_testing, std::set<sstring> disabled_features)
|
||||
: _abort_source(abort_source)
|
||||
, _feature_service(feature_service)
|
||||
, _db(db)
|
||||
, _gossiper(gossiper)
|
||||
, _auth_service(auth_service)
|
||||
, _cql_config(cql_config)
|
||||
, _disabled_features(std::move(disabled_features))
|
||||
, _service_memory_total(config.available_memory / 10)
|
||||
, _service_memory_limiter(_service_memory_total)
|
||||
@@ -2217,7 +2218,7 @@ future<> storage_service::start_rpc_server() {
|
||||
tsc.timeout_config = make_timeout_config(cfg);
|
||||
tsc.max_request_size = cfg.thrift_max_message_length_in_mb() * (uint64_t(1) << 20);
|
||||
return gms::inet_address::lookup(addr, family, preferred).then([&ss, tserver, addr, port, keepalive, tsc] (gms::inet_address ip) {
|
||||
return tserver->start(std::ref(ss._db), std::ref(cql3::get_query_processor()), std::ref(ss._auth_service), tsc).then([tserver, port, addr, ip, keepalive] {
|
||||
return tserver->start(std::ref(ss._db), std::ref(cql3::get_query_processor()), std::ref(ss._auth_service), std::ref(ss._cql_config), tsc).then([tserver, port, addr, ip, keepalive] {
|
||||
// #293 - do not stop anything
|
||||
//engine().at_exit([tserver] {
|
||||
// return tserver->stop();
|
||||
@@ -2274,7 +2275,7 @@ future<> storage_service::start_native_transport() {
|
||||
cql_server_config.get_service_memory_limiter_semaphore = [ss = std::ref(get_storage_service())] () -> semaphore& { return ss.get().local()._service_memory_limiter; };
|
||||
cql_server_config.allow_shard_aware_drivers = cfg.enable_shard_aware_drivers();
|
||||
return gms::inet_address::lookup(addr, family, preferred).then([&ss, cserver, addr, &cfg, keepalive, ceo = std::move(ceo), cql_server_config] (seastar::net::inet_address ip) {
|
||||
return cserver->start(std::ref(service::get_storage_proxy()), std::ref(cql3::get_query_processor()), std::ref(ss._auth_service), cql_server_config).then([cserver, &cfg, addr, ip, ceo, keepalive]() {
|
||||
return cserver->start(std::ref(service::get_storage_proxy()), std::ref(cql3::get_query_processor()), std::ref(ss._auth_service), std::ref(ss._cql_config), cql_server_config).then([cserver, &cfg, addr, ip, ceo, keepalive]() {
|
||||
|
||||
auto f = make_ready_future();
|
||||
|
||||
@@ -3409,9 +3410,11 @@ storage_service::view_build_statuses(sstring keyspace, sstring view_name) const
|
||||
});
|
||||
}
|
||||
|
||||
future<> init_storage_service(sharded<abort_source>& abort_source, distributed<database>& db, sharded<gms::gossiper>& gossiper, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
future<> init_storage_service(sharded<abort_source>& abort_source, distributed<database>& db, sharded<gms::gossiper>& gossiper, sharded<auth::service>& auth_service,
|
||||
sharded<cql3::cql_config>& cql_config,
|
||||
sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
sharded<db::view::view_update_generator>& view_update_generator, sharded<gms::feature_service>& feature_service, storage_service_config config) {
|
||||
return service::get_storage_service().start(std::ref(abort_source), std::ref(db), std::ref(gossiper), std::ref(auth_service), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service), config);
|
||||
return service::get_storage_service().start(std::ref(abort_source), std::ref(db), std::ref(gossiper), std::ref(auth_service), std::ref(cql_config), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service), config);
|
||||
}
|
||||
|
||||
future<> deinit_storage_service() {
|
||||
|
||||
@@ -150,6 +150,7 @@ private:
|
||||
distributed<database>& _db;
|
||||
gms::gossiper& _gossiper;
|
||||
sharded<auth::service>& _auth_service;
|
||||
sharded<cql3::cql_config>& _cql_config;
|
||||
int _update_jobs{0};
|
||||
// Note that this is obviously only valid for the current shard. Users of
|
||||
// this facility should elect a shard to be the coordinator based on any
|
||||
@@ -170,7 +171,7 @@ private:
|
||||
size_t _service_memory_total;
|
||||
semaphore _service_memory_limiter;
|
||||
public:
|
||||
storage_service(abort_source& as, distributed<database>& db, gms::gossiper& gossiper, sharded<auth::service>&, sharded<db::system_distributed_keyspace>&, sharded<db::view::view_update_generator>&, gms::feature_service& feature_service, storage_service_config config, /* only for tests */ bool for_testing = false, /* only for tests */ std::set<sstring> disabled_features = {});
|
||||
storage_service(abort_source& as, distributed<database>& db, gms::gossiper& gossiper, sharded<auth::service>&, sharded<cql3::cql_config>& cql_config, sharded<db::system_distributed_keyspace>&, sharded<db::view::view_update_generator>&, gms::feature_service& feature_service, storage_service_config config, /* only for tests */ bool for_testing = false, /* only for tests */ std::set<sstring> disabled_features = {});
|
||||
void isolate_on_error();
|
||||
void isolate_on_commit_error();
|
||||
|
||||
@@ -2377,7 +2378,8 @@ private:
|
||||
void notify_cql_change(inet_address endpoint, bool ready);
|
||||
};
|
||||
|
||||
future<> init_storage_service(sharded<abort_source>& abort_sources, distributed<database>& db, sharded<gms::gossiper>& gossiper, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
future<> init_storage_service(sharded<abort_source>& abort_sources, distributed<database>& db, sharded<gms::gossiper>& gossiper, sharded<auth::service>& auth_service,
|
||||
sharded<cql3::cql_config>& cql_config, sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
sharded<db::view::view_update_generator>& view_update_generator, sharded<gms::feature_service>& feature_service, storage_service_config config);
|
||||
future<> deinit_storage_service();
|
||||
|
||||
|
||||
@@ -43,6 +43,7 @@
|
||||
#include "types/list.hh"
|
||||
#include "types/set.hh"
|
||||
#include "db/config.hh"
|
||||
#include "cql3/cql_config.hh"
|
||||
#include "sstables/compaction_manager.hh"
|
||||
#include "exception_utils.hh"
|
||||
|
||||
@@ -364,6 +365,56 @@ SEASTAR_TEST_CASE(test_in_clause_validation) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_in_clause_cartesian_product_limits) {
|
||||
do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
e.execute_cql("CREATE TABLE tab1 (pk1 int, pk2 int, PRIMARY KEY ((pk1, pk2)))").get();
|
||||
|
||||
// 100 partitions, should pass
|
||||
e.execute_cql("SELECT * FROM tab1 WHERE pk1 IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)"
|
||||
" AND pk2 IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)").get();
|
||||
// 110 partitions, should fail
|
||||
BOOST_REQUIRE_THROW(
|
||||
e.execute_cql("SELECT * FROM tab1 WHERE pk1 IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)"
|
||||
" AND pk2 IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)").get(),
|
||||
std::runtime_error);
|
||||
|
||||
e.execute_cql("CREATE TABLE tab2 (pk1 int, ck1 int, ck2 int, PRIMARY KEY (pk1, ck1, ck2))").get();
|
||||
|
||||
// 100 clustering rows, should pass
|
||||
e.execute_cql("SELECT * FROM tab2 WHERE pk1 = 1"
|
||||
" AND ck1 IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)"
|
||||
" AND ck2 IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)").get();
|
||||
// 110 clustering rows, should fail
|
||||
BOOST_REQUIRE_THROW(
|
||||
e.execute_cql("SELECT * FROM tab2 WHERE pk1 = 1"
|
||||
" AND ck1 IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)"
|
||||
" AND ck2 IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)").get(),
|
||||
std::runtime_error);
|
||||
auto make_tuple = [] (unsigned count) -> sstring {
|
||||
std::ostringstream os;
|
||||
os << "(0";
|
||||
for (unsigned i = 1; i < count; ++i) {
|
||||
os << "," << i;
|
||||
}
|
||||
os << ")";
|
||||
return os.str();
|
||||
};
|
||||
e.execute_cql("CREATE TABLE tab3 (pk1 int, ck1 int, PRIMARY KEY (pk1, ck1))").get();
|
||||
// tuple with 100 keys, should pass
|
||||
e.execute_cql(fmt::format("SELECT * FROM tab3 WHERE pk1 IN {}", make_tuple(100))).get();
|
||||
e.execute_cql(fmt::format("SELECT * FROM tab3 WHERE pk1 = 1 AND ck1 IN {}", make_tuple(100))).get();
|
||||
// tuple with 101 keys, should fail
|
||||
BOOST_REQUIRE_THROW(
|
||||
e.execute_cql(fmt::format("SELECT * FROM tab3 WHERE pk1 IN {}", make_tuple(101))).get(),
|
||||
std::runtime_error
|
||||
);
|
||||
BOOST_REQUIRE_THROW(
|
||||
e.execute_cql(fmt::format("SELECT * FROM tab3 WHERE pk1 = 3 AND ck1 IN {}", make_tuple(101))).get(),
|
||||
std::runtime_error
|
||||
);
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_tuple_elements_validation) {
|
||||
return do_with_cql_env_thread([](cql_test_env& e) {
|
||||
auto test_inline = [&] (sstring value, bool should_throw) {
|
||||
@@ -3030,7 +3081,7 @@ SEASTAR_TEST_CASE(test_insert_large_collection_values) {
|
||||
BOOST_REQUIRE_THROW(e.execute_cql(format("INSERT INTO tbl (pk, m) VALUES ('Golding', {{'{}': 'value'}});", long_value)).get(), std::exception);
|
||||
|
||||
auto make_query_options = [] (cql_protocol_version_type version) {
|
||||
return std::make_unique<cql3::query_options>(db::consistency_level::ONE, infinite_timeout_config, std::nullopt,
|
||||
return std::make_unique<cql3::query_options>(cql3::default_cql_config, db::consistency_level::ONE, infinite_timeout_config, std::nullopt,
|
||||
std::vector<cql3::raw_value_view>(), false,
|
||||
cql3::query_options::specific_options::DEFAULT, cql_serialization_format{version});
|
||||
};
|
||||
|
||||
@@ -29,6 +29,7 @@
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "cql3/query_options.hh"
|
||||
#include "cql3/statements/batch_statement.hh"
|
||||
#include "cql3/cql_config.hh"
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
@@ -385,13 +386,16 @@ public:
|
||||
distributed<service::storage_proxy>& proxy = service::get_storage_proxy();
|
||||
distributed<service::migration_manager>& mm = service::get_migration_manager();
|
||||
distributed<db::batchlog_manager>& bm = db::get_batchlog_manager();
|
||||
sharded<cql3::cql_config> cql_config;
|
||||
cql_config.start().get();
|
||||
auto stop_cql_config = defer([&] { cql_config.stop().get(); });
|
||||
|
||||
auto view_update_generator = ::make_shared<seastar::sharded<db::view::view_update_generator>>();
|
||||
|
||||
auto& ss = service::get_storage_service();
|
||||
service::storage_service_config sscfg;
|
||||
sscfg.available_memory = memory::stats().total_memory();
|
||||
ss.start(std::ref(abort_sources), std::ref(*db), std::ref(gms::get_gossiper()), std::ref(*auth_service), std::ref(sys_dist_ks), std::ref(*view_update_generator), std::ref(*feature_service), sscfg, true, cfg_in.disabled_features).get();
|
||||
ss.start(std::ref(abort_sources), std::ref(*db), std::ref(gms::get_gossiper()), std::ref(*auth_service), std::ref(cql_config), std::ref(sys_dist_ks), std::ref(*view_update_generator), std::ref(*feature_service), sscfg, true, cfg_in.disabled_features).get();
|
||||
auto stop_storage_service = defer([&ss] { ss.stop().get(); });
|
||||
|
||||
database_config dbcfg;
|
||||
|
||||
@@ -35,6 +35,7 @@
|
||||
#include <seastar/core/thread.hh>
|
||||
#include <chrono>
|
||||
#include "db/config.hh"
|
||||
#include "cql3/cql_config.hh"
|
||||
|
||||
namespace bpo = boost::program_options;
|
||||
|
||||
@@ -60,11 +61,12 @@ namespace bpo = boost::program_options;
|
||||
int main(int ac, char ** av) {
|
||||
distributed<database> db;
|
||||
sharded<auth::service> auth_service;
|
||||
sharded<cql3::cql_config> cql_config;
|
||||
app_template app;
|
||||
app.add_options()
|
||||
("seed", bpo::value<std::vector<std::string>>(), "IP address of seed node")
|
||||
("listen-address", bpo::value<std::string>()->default_value("0.0.0.0"), "IP address to listen");
|
||||
return app.run_deprecated(ac, av, [&auth_service, &db, &app] {
|
||||
return app.run_deprecated(ac, av, [&auth_service, &db, &app, &cql_config] {
|
||||
auto config = app.configuration();
|
||||
logging::logger_registry().set_logger_level("gossip", logging::log_level::trace);
|
||||
const gms::inet_address listen = gms::inet_address(config["listen-address"].as<std::string>());
|
||||
@@ -83,7 +85,8 @@ int main(int ac, char ** av) {
|
||||
auto stop_abort_source = defer([&] { abort_sources.stop().get(); });
|
||||
service::storage_service_config sscfg;
|
||||
sscfg.available_memory = memory::stats().total_memory();
|
||||
service::init_storage_service(std::ref(abort_sources), db, gms::get_gossiper(), auth_service, sys_dist_ks, view_update_generator, feature_service, sscfg).get();
|
||||
cql_config.start().get();
|
||||
service::init_storage_service(std::ref(abort_sources), db, gms::get_gossiper(), auth_service, cql_config, sys_dist_ks, view_update_generator, feature_service, sscfg).get();
|
||||
netw::get_messaging_service().start(listen).get();
|
||||
auto& server = netw::get_local_messaging_service();
|
||||
auto port = server.port();
|
||||
|
||||
@@ -37,6 +37,7 @@
|
||||
#include "database.hh"
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
#include "db/config.hh"
|
||||
#include "cql3/cql_config.hh"
|
||||
|
||||
namespace db::view {
|
||||
class view_update_generator;
|
||||
@@ -71,7 +72,11 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
|
||||
|
||||
service::storage_service_config sscfg;
|
||||
sscfg.available_memory = memory::stats().total_memory();
|
||||
service::get_storage_service().start(std::ref(abort_sources), std::ref(db), std::ref(gms::get_gossiper()), std::ref(auth_service), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service), sscfg, true).get();
|
||||
sharded<cql3::cql_config> cql_config;
|
||||
cql_config.start();
|
||||
auto stop_cql_config = defer([&] { cql_config.stop().get(); });
|
||||
|
||||
service::get_storage_service().start(std::ref(abort_sources), std::ref(db), std::ref(gms::get_gossiper()), std::ref(auth_service), std::ref(cql_config), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service), sscfg, true).get();
|
||||
auto stop_ss = defer([&] { service::get_storage_service().stop().get(); });
|
||||
|
||||
db.start(std::ref(cfg), dbcfg).get();
|
||||
|
||||
@@ -36,6 +36,7 @@
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "cql3/untyped_result_set.hh"
|
||||
#include "cql3/cql_config.hh"
|
||||
|
||||
SEASTAR_TEST_CASE(test_execute_internal_insert) {
|
||||
return do_with_cql_env([] (auto& e) {
|
||||
|
||||
@@ -30,6 +30,7 @@
|
||||
#include "gms/gossiper.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "cql3/cql_config.hh"
|
||||
|
||||
|
||||
class storage_service_for_tests::impl {
|
||||
@@ -39,6 +40,7 @@ class storage_service_for_tests::impl {
|
||||
distributed<database> _db;
|
||||
db::config _cfg;
|
||||
sharded<auth::service> _auth_service;
|
||||
sharded<cql3::cql_config> _cql_config;
|
||||
sharded<db::system_distributed_keyspace> _sys_dist_ks;
|
||||
sharded<db::view::view_update_generator> _view_update_generator;
|
||||
public:
|
||||
@@ -54,13 +56,15 @@ public:
|
||||
netw::get_messaging_service().start(gms::inet_address("127.0.0.1"), 7000, false).get();
|
||||
service::storage_service_config sscfg;
|
||||
sscfg.available_memory = memory::stats().total_memory();
|
||||
service::get_storage_service().start(std::ref(_abort_source), std::ref(_db), std::ref(_gossiper), std::ref(_auth_service), std::ref(_sys_dist_ks), std::ref(_view_update_generator), std::ref(_feature_service), sscfg, true).get();
|
||||
_cql_config.start().get();
|
||||
service::get_storage_service().start(std::ref(_abort_source), std::ref(_db), std::ref(_gossiper), std::ref(_auth_service), std::ref(_cql_config), std::ref(_sys_dist_ks), std::ref(_view_update_generator), std::ref(_feature_service), sscfg, true).get();
|
||||
service::get_storage_service().invoke_on_all([] (auto& ss) {
|
||||
ss.enable_all_features();
|
||||
}).get();
|
||||
}
|
||||
~impl() {
|
||||
service::get_storage_service().stop().get();
|
||||
_cql_config.stop().get();
|
||||
netw::get_messaging_service().stop().get();
|
||||
_db.stop().get();
|
||||
_gossiper.stop().get();
|
||||
|
||||
@@ -203,6 +203,7 @@ enum class query_order { no, yes };
|
||||
class thrift_handler : public CassandraCobSvIf {
|
||||
distributed<database>& _db;
|
||||
distributed<cql3::query_processor>& _query_processor;
|
||||
const cql3::cql_config& _cql_config;
|
||||
service::query_state _query_state;
|
||||
::timeout_config _timeout_config;
|
||||
private:
|
||||
@@ -218,9 +219,10 @@ private:
|
||||
});
|
||||
}
|
||||
public:
|
||||
explicit thrift_handler(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service& auth_service, ::timeout_config timeout_config)
|
||||
explicit thrift_handler(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service& auth_service, const cql3::cql_config& cql_config, ::timeout_config timeout_config)
|
||||
: _db(db)
|
||||
, _query_processor(qp)
|
||||
, _cql_config(cql_config)
|
||||
, _query_state(service::client_state::for_external_thrift_calls(auth_service), /*FIXME: pass real permit*/empty_service_permit())
|
||||
, _timeout_config(timeout_config)
|
||||
{ }
|
||||
@@ -964,7 +966,7 @@ public:
|
||||
if (compression != Compression::type::NONE) {
|
||||
throw make_exception<InvalidRequestException>("Compressed query strings are not supported");
|
||||
}
|
||||
auto opts = std::make_unique<cql3::query_options>(cl_from_thrift(consistency), _timeout_config, std::nullopt, std::vector<cql3::raw_value_view>(),
|
||||
auto opts = std::make_unique<cql3::query_options>(_cql_config, cl_from_thrift(consistency), _timeout_config, std::nullopt, std::vector<cql3::raw_value_view>(),
|
||||
false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
auto f = _query_processor.local().process(query, _query_state, *opts);
|
||||
return f.then([cob = std::move(cob), opts = std::move(opts)](auto&& ret) {
|
||||
@@ -1043,7 +1045,7 @@ public:
|
||||
std::transform(values.begin(), values.end(), std::back_inserter(bytes_values), [](auto&& s) {
|
||||
return cql3::raw_value::make_value(to_bytes(s));
|
||||
});
|
||||
auto opts = std::make_unique<cql3::query_options>(cl_from_thrift(consistency), _timeout_config, std::nullopt, std::move(bytes_values),
|
||||
auto opts = std::make_unique<cql3::query_options>(_cql_config, cl_from_thrift(consistency), _timeout_config, std::nullopt, std::move(bytes_values),
|
||||
false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
auto f = _query_processor.local().process_statement_prepared(std::move(prepared), std::move(cache_key), _query_state, *opts, needs_authorization);
|
||||
return f.then([cob = std::move(cob), opts = std::move(opts)](auto&& ret) {
|
||||
@@ -1942,15 +1944,18 @@ class handler_factory : public CassandraCobSvIfFactory {
|
||||
distributed<database>& _db;
|
||||
distributed<cql3::query_processor>& _query_processor;
|
||||
auth::service& _auth_service;
|
||||
const cql3::cql_config& _cql_config;
|
||||
timeout_config _timeout_config;
|
||||
public:
|
||||
explicit handler_factory(distributed<database>& db,
|
||||
distributed<cql3::query_processor>& qp,
|
||||
auth::service& auth_service, ::timeout_config timeout_config)
|
||||
: _db(db), _query_processor(qp), _auth_service(auth_service), _timeout_config(timeout_config) {}
|
||||
auth::service& auth_service,
|
||||
const cql3::cql_config& cql_config,
|
||||
::timeout_config timeout_config)
|
||||
: _db(db), _query_processor(qp), _auth_service(auth_service), _cql_config(cql_config), _timeout_config(timeout_config) {}
|
||||
typedef CassandraCobSvIf Handler;
|
||||
virtual CassandraCobSvIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) {
|
||||
return new thrift_handler(_db, _query_processor, _auth_service, _timeout_config);
|
||||
return new thrift_handler(_db, _query_processor, _auth_service, _cql_config, _timeout_config);
|
||||
}
|
||||
virtual void releaseHandler(CassandraCobSvIf* handler) {
|
||||
delete handler;
|
||||
@@ -1958,6 +1963,7 @@ public:
|
||||
};
|
||||
|
||||
std::unique_ptr<CassandraCobSvIfFactory>
|
||||
create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service& auth_service, ::timeout_config timeout_config) {
|
||||
return std::make_unique<handler_factory>(db, qp, auth_service, timeout_config);
|
||||
create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service& auth_service,
|
||||
const cql3::cql_config& cql_config, ::timeout_config timeout_config) {
|
||||
return std::make_unique<handler_factory>(db, qp, auth_service, cql_config, timeout_config);
|
||||
}
|
||||
|
||||
@@ -31,6 +31,13 @@
|
||||
|
||||
struct timeout_config;
|
||||
|
||||
std::unique_ptr<::cassandra::CassandraCobSvIfFactory> create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service&, timeout_config);
|
||||
namespace cql3 {
|
||||
|
||||
class cql_config;
|
||||
|
||||
}
|
||||
|
||||
std::unique_ptr<::cassandra::CassandraCobSvIfFactory> create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service&,
|
||||
const cql3::cql_config& cql_config, timeout_config);
|
||||
|
||||
#endif /* APPS_SEASTAR_THRIFT_HANDLER_HH_ */
|
||||
|
||||
@@ -65,9 +65,10 @@ public:
|
||||
thrift_server::thrift_server(distributed<database>& db,
|
||||
distributed<cql3::query_processor>& qp,
|
||||
auth::service& auth_service,
|
||||
const cql3::cql_config& cql_config,
|
||||
thrift_server_config config)
|
||||
: _stats(new thrift_stats(*this))
|
||||
, _handler_factory(create_handler_factory(db, qp, auth_service, config.timeout_config).release())
|
||||
, _handler_factory(create_handler_factory(db, qp, auth_service, cql_config, config.timeout_config).release())
|
||||
, _protocol_factory(new TBinaryProtocolFactoryT<TMemoryBuffer>())
|
||||
, _processor_factory(new CassandraAsyncProcessorFactory(_handler_factory))
|
||||
, _config(config) {
|
||||
|
||||
@@ -41,6 +41,12 @@ namespace thrift_std = boost;
|
||||
namespace thrift_std = std;
|
||||
#endif
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
class cql_config;
|
||||
|
||||
}
|
||||
|
||||
namespace cassandra {
|
||||
|
||||
static const sstring thrift_version = "20.1.0";
|
||||
@@ -117,7 +123,7 @@ private:
|
||||
boost::intrusive::list<connection> _connections_list;
|
||||
seastar::gate _stop_gate;
|
||||
public:
|
||||
thrift_server(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service&, thrift_server_config config);
|
||||
thrift_server(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service&, const cql3::cql_config& cql_config, thrift_server_config config);
|
||||
~thrift_server();
|
||||
future<> listen(socket_address addr, bool keepalive);
|
||||
future<> stop();
|
||||
|
||||
@@ -44,6 +44,7 @@
|
||||
#include "tracing/tracing_backend_registry.hh"
|
||||
#include "cql3/statements/batch_statement.hh"
|
||||
#include "cql3/statements/modification_statement.hh"
|
||||
#include "cql3/cql_config.hh"
|
||||
|
||||
namespace tracing {
|
||||
|
||||
@@ -258,7 +259,8 @@ cql3::query_options trace_keyspace_helper::make_session_mutation_data(const one_
|
||||
cql3::raw_value::make_value(int32_type->decompose((int32_t)(session_records.ttl.count())))
|
||||
};
|
||||
|
||||
return cql3::query_options(db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
return cql3::query_options(cql3::default_cql_config,
|
||||
db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
}
|
||||
|
||||
cql3::query_options trace_keyspace_helper::make_session_time_idx_mutation_data(const one_session_records& session_records) {
|
||||
@@ -275,7 +277,8 @@ cql3::query_options trace_keyspace_helper::make_session_time_idx_mutation_data(c
|
||||
cql3::raw_value::make_value(int32_type->decompose(int32_t(session_records.ttl.count())))
|
||||
};
|
||||
|
||||
return cql3::query_options(db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
return cql3::query_options(cql3::default_cql_config,
|
||||
db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
}
|
||||
|
||||
cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const one_session_records& session_records, const utils::UUID& start_time_id) {
|
||||
@@ -315,7 +318,8 @@ cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const o
|
||||
cql3::raw_value::make_value(int32_type->decompose((int32_t)(record.slow_query_record_ttl.count())))
|
||||
});
|
||||
|
||||
return cql3::query_options(db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
return cql3::query_options(cql3::default_cql_config,
|
||||
db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
}
|
||||
|
||||
cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_data(const one_session_records& session_records, const utils::UUID& start_time_id) {
|
||||
@@ -335,7 +339,8 @@ cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_dat
|
||||
cql3::raw_value::make_value(int32_type->decompose(int32_t(session_records.session_rec.slow_query_record_ttl.count())))
|
||||
});
|
||||
|
||||
return cql3::query_options(db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
return cql3::query_options(cql3::default_cql_config,
|
||||
db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest());
|
||||
}
|
||||
|
||||
std::vector<cql3::raw_value> trace_keyspace_helper::make_event_mutation_data(one_session_records& session_records, const event_record& record) {
|
||||
@@ -372,7 +377,7 @@ future<> trace_keyspace_helper::apply_events_mutation(lw_shared_ptr<one_session_
|
||||
std::for_each(events_records.begin(), events_records.end(), [&values, all_records = records, this] (event_record& one_event_record) { values.emplace_back(make_event_mutation_data(*all_records, one_event_record)); });
|
||||
|
||||
return do_with(
|
||||
cql3::query_options::make_batch_options(cql3::query_options(db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::vector<cql3::raw_value>{}, false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()), std::move(values)),
|
||||
cql3::query_options::make_batch_options(cql3::query_options(cql3::default_cql_config, db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::vector<cql3::raw_value>{}, false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()), std::move(values)),
|
||||
cql3::statements::batch_statement(cql3::statements::batch_statement::type::UNLOGGED, std::move(modifications), cql3::attributes::none(), qp.get_cql_stats()),
|
||||
[this] (auto& batch_options, auto& batch) {
|
||||
return batch.execute(service::get_storage_proxy().local(), _dummy_query_state, batch_options).then([] (shared_ptr<cql_transport::messages::result_message> res) { return now(); });
|
||||
|
||||
@@ -215,10 +215,10 @@ private:
|
||||
options_flag::NAMES_FOR_VALUES
|
||||
>;
|
||||
public:
|
||||
std::unique_ptr<cql3::query_options> read_options(uint8_t version, cql_serialization_format cql_ser_format, const timeout_config& timeouts) {
|
||||
std::unique_ptr<cql3::query_options> read_options(uint8_t version, cql_serialization_format cql_ser_format, const timeout_config& timeouts, const cql3::cql_config& cql_config) {
|
||||
auto consistency = read_consistency();
|
||||
if (version == 1) {
|
||||
return std::make_unique<cql3::query_options>(consistency, timeouts, std::nullopt, std::vector<cql3::raw_value_view>{},
|
||||
return std::make_unique<cql3::query_options>(cql_config, consistency, timeouts, std::nullopt, std::vector<cql3::raw_value_view>{},
|
||||
false, cql3::query_options::specific_options::DEFAULT, cql_ser_format);
|
||||
}
|
||||
|
||||
@@ -266,11 +266,11 @@ public:
|
||||
if (!names.empty()) {
|
||||
onames = std::move(names);
|
||||
}
|
||||
options = std::make_unique<cql3::query_options>(consistency, timeouts, std::move(onames), std::move(values), skip_metadata,
|
||||
options = std::make_unique<cql3::query_options>(cql_config, consistency, timeouts, std::move(onames), std::move(values), skip_metadata,
|
||||
cql3::query_options::specific_options{page_size, std::move(paging_state), serial_consistency, ts},
|
||||
cql_ser_format);
|
||||
} else {
|
||||
options = std::make_unique<cql3::query_options>(consistency, timeouts, std::nullopt, std::move(values), skip_metadata,
|
||||
options = std::make_unique<cql3::query_options>(cql_config, consistency, timeouts, std::nullopt, std::move(values), skip_metadata,
|
||||
cql3::query_options::specific_options::DEFAULT, cql_ser_format);
|
||||
}
|
||||
|
||||
|
||||
@@ -145,7 +145,7 @@ event::event_type parse_event_type(const sstring& value)
|
||||
}
|
||||
|
||||
cql_server::cql_server(distributed<service::storage_proxy>& proxy, distributed<cql3::query_processor>& qp, auth::service& auth_service,
|
||||
cql_server_config config)
|
||||
const cql3::cql_config& cql_config, cql_server_config config)
|
||||
: _proxy(proxy)
|
||||
, _query_processor(qp)
|
||||
, _config(config)
|
||||
@@ -153,6 +153,7 @@ cql_server::cql_server(distributed<service::storage_proxy>& proxy, distributed<c
|
||||
, _memory_available(config.get_service_memory_limiter_semaphore())
|
||||
, _notifier(std::make_unique<event_notifier>())
|
||||
, _auth_service(auth_service)
|
||||
, _cql_config(cql_config)
|
||||
{
|
||||
namespace sm = seastar::metrics;
|
||||
|
||||
@@ -748,7 +749,7 @@ future<response_type> cql_server::connection::process_query(uint16_t stream, req
|
||||
auto query = in.read_long_string_view();
|
||||
auto q_state = std::make_unique<cql_query_state>(client_state, std::move(permit));
|
||||
auto& query_state = q_state->query_state;
|
||||
q_state->options = in.read_options(_version, _cql_serialization_format, this->timeout_config());
|
||||
q_state->options = in.read_options(_version, _cql_serialization_format, this->timeout_config(), _server._cql_config);
|
||||
auto& options = *q_state->options;
|
||||
auto skip_metadata = options.skip_metadata();
|
||||
|
||||
@@ -826,10 +827,10 @@ future<response_type> cql_server::connection::process_execute(uint16_t stream, r
|
||||
std::vector<cql3::raw_value_view> values;
|
||||
in.read_value_view_list(_version, values);
|
||||
auto consistency = in.read_consistency();
|
||||
q_state->options = std::make_unique<cql3::query_options>(consistency, timeout_config(), std::nullopt, values, false,
|
||||
q_state->options = std::make_unique<cql3::query_options>(_server._cql_config, consistency, timeout_config(), std::nullopt, values, false,
|
||||
cql3::query_options::specific_options::DEFAULT, _cql_serialization_format);
|
||||
} else {
|
||||
q_state->options = in.read_options(_version, _cql_serialization_format, this->timeout_config());
|
||||
q_state->options = in.read_options(_version, _cql_serialization_format, this->timeout_config(), _server._cql_config);
|
||||
}
|
||||
auto& options = *q_state->options;
|
||||
auto skip_metadata = options.skip_metadata();
|
||||
@@ -946,7 +947,7 @@ cql_server::connection::process_batch(uint16_t stream, request_reader in, servic
|
||||
auto q_state = std::make_unique<cql_query_state>(client_state, std::move(permit));
|
||||
auto& query_state = q_state->query_state;
|
||||
// #563. CQL v2 encodes query_options in v1 format for batch requests.
|
||||
q_state->options = std::make_unique<cql3::query_options>(cql3::query_options::make_batch_options(std::move(*in.read_options(_version < 3 ? 1 : _version, _cql_serialization_format, this->timeout_config())), std::move(values)));
|
||||
q_state->options = std::make_unique<cql3::query_options>(cql3::query_options::make_batch_options(std::move(*in.read_options(_version < 3 ? 1 : _version, _cql_serialization_format, this->timeout_config(), _server._cql_config)), std::move(values)));
|
||||
auto& options = *q_state->options;
|
||||
|
||||
tracing::set_consistency_level(client_state.get_trace_state(), options.get_consistency());
|
||||
|
||||
@@ -127,9 +127,10 @@ private:
|
||||
uint64_t _requests_serving = 0;
|
||||
uint64_t _requests_blocked_memory = 0;
|
||||
auth::service& _auth_service;
|
||||
const cql3::cql_config& _cql_config;
|
||||
public:
|
||||
cql_server(distributed<service::storage_proxy>& proxy, distributed<cql3::query_processor>& qp, auth::service&,
|
||||
cql_server_config config);
|
||||
const cql3::cql_config& cql_config, cql_server_config config);
|
||||
future<> listen(socket_address addr, std::shared_ptr<seastar::tls::credentials_builder> = {}, bool keepalive = false);
|
||||
future<> do_accepts(int which, bool keepalive, socket_address server_addr);
|
||||
future<> stop();
|
||||
|
||||
Reference in New Issue
Block a user