From ef1de114f09f84d58038614858608ec3ffca07aa Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Thu, 18 Mar 2021 09:48:23 +0100 Subject: [PATCH] thrift: partially add admission control This commit adds admission control in the form of passing service permits to the Thrift server. The support is partial, because Thrift also supports running CQL queries, and for that purpose a query_state object is kept in the Thrift handler. However, the handler is generally created once per connection, not once per query, and the query_state object is supposed to keep the state of a single query only. In order to keep this series simpler, the CQL-on-top-of-Thrift layer is not touched and is left as TODO. Moreover, the Thrift layer does not make it easy to pass custom per-query context (like service_permit), so the implementation uses a trick: the service permit is created on the server and then passed by reference to its connections and their respective Thrift handlers. Then, each time a query is read from the socket, this service permit is overwritten and then read back by the Thrift handler. This mechanism heavily relies on the fact that there are zero preemption points between overwriting the service permit and reading it back by the handler. Otherwise, races may occur. This assumption was verified by code inspection + empirical tests, but if somebody is aware that it may not always hold, please speak up. 
--- main.cc | 2 +- thrift/controller.cc | 8 +-- thrift/controller.hh | 4 +- thrift/handler.cc | 115 ++++++++++++++++++++++++++++++------------- thrift/handler.hh | 2 +- thrift/server.cc | 32 +++++++++++- thrift/server.hh | 11 ++++- 7 files changed, 132 insertions(+), 42 deletions(-) diff --git a/main.cc b/main.cc index 29cec748fc..3e9ce2cdb4 100644 --- a/main.cc +++ b/main.cc @@ -1304,7 +1304,7 @@ int main(int ac, char** av) { api::unset_transport_controller(ctx).get(); }); - ::thrift_controller thrift_ctl(db, auth_service, qp); + ::thrift_controller thrift_ctl(db, auth_service, qp, service_memory_limiter); ss.register_client_shutdown_hook("rpc server", [&thrift_ctl] { thrift_ctl.stop().get(); diff --git a/thrift/controller.cc b/thrift/controller.cc index 510543efae..b1f5975247 100644 --- a/thrift/controller.cc +++ b/thrift/controller.cc @@ -24,14 +24,16 @@ #include "database.hh" #include "db/config.hh" #include "log.hh" +#include "service/storage_service.hh" static logging::logger clogger("thrift_controller"); -thrift_controller::thrift_controller(distributed& db, sharded& auth, sharded& qp) +thrift_controller::thrift_controller(distributed& db, sharded& auth, sharded& qp, sharded& ml) : _ops_sem(1) , _db(db) , _auth_service(auth) - , _qp(qp) { + , _qp(qp) + , _mem_limiter(ml) { } future<> thrift_controller::start_server() { @@ -62,7 +64,7 @@ future<> thrift_controller::do_start_server() { tsc.timeout_config = make_timeout_config(cfg); tsc.max_request_size = cfg.thrift_max_message_length_in_mb() * (uint64_t(1) << 20); return gms::inet_address::lookup(addr, family, preferred).then([this, tserver, addr, port, keepalive, tsc] (gms::inet_address ip) { - return tserver->start(std::ref(_db), std::ref(_qp), std::ref(_auth_service), tsc).then([tserver, port, addr, ip, keepalive] { + return tserver->start(std::ref(_db), std::ref(_qp), std::ref(_auth_service), std::ref(_mem_limiter), tsc).then([tserver, port, addr, ip, keepalive] { // #293 - do not stop anything 
//engine().at_exit([tserver] { // return tserver->stop(); diff --git a/thrift/controller.hh b/thrift/controller.hh index 0a404d21a9..69c2bc6dea 100644 --- a/thrift/controller.hh +++ b/thrift/controller.hh @@ -24,6 +24,7 @@ #include #include #include +#include "service/memory_limiter.hh" using namespace seastar; @@ -40,12 +41,13 @@ class thrift_controller { distributed& _db; sharded& _auth_service; sharded& _qp; + sharded& _mem_limiter; future<> do_start_server(); future<> do_stop_server(); public: - thrift_controller(distributed&, sharded&, sharded&); + thrift_controller(distributed&, sharded&, sharded&, sharded&); future<> start_server(); future<> stop_server(); future<> stop(); diff --git a/thrift/handler.cc b/thrift/handler.cc index 1a0f31f3e7..8d1199162d 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -204,6 +204,7 @@ class thrift_handler : public CassandraCobSvIf { ::timeout_config _timeout_config; service::client_state _client_state; service::query_state _query_state; + service_permit& _current_permit; private: template void @@ -217,12 +218,16 @@ private: }); } public: - explicit thrift_handler(distributed& db, distributed& qp, auth::service& auth_service, ::timeout_config timeout_config) + explicit thrift_handler(distributed& db, distributed& qp, auth::service& auth_service, ::timeout_config timeout_config, service_permit& current_permit) : _db(db) , _query_processor(qp) , _timeout_config(timeout_config) , _client_state(service::client_state::external_tag{}, auth_service, _timeout_config, socket_address(), true) + // FIXME: Handlers are not created per query, but rather per connection, so it makes little sense to store + // service permits in here. The query state should be reinstantiated per query - AFAIK it's only used + // for CQL queries which piggy-back on Thrift protocol. 
, _query_state(_client_state, /*FIXME: pass real permit*/empty_service_permit()) + , _current_permit(current_permit) { } const sstring& current_keyspace() const { @@ -234,6 +239,7 @@ public: }; void login(thrift_fn::function cob, thrift_fn::function exn_cob, const AuthenticationRequest& auth_request) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { auth::authenticator::credentials_map creds(auth_request.credentials.begin(), auth_request.credentials.end()); auto& auth_service = *_query_state.get_client_state().get_auth_service(); @@ -244,12 +250,14 @@ public: } void set_keyspace(thrift_fn::function cob, thrift_fn::function exn_cob, const std::string& keyspace) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { _query_state.get_client_state().set_keyspace(_db.local(), keyspace); }); } void get(thrift_fn::function cob, thrift_fn::function exn_cob, const std::string& key, const ColumnPath& column_path, const ConsistencyLevel::type consistency_level) { + service_permit permit = obtain_permit(); return get_slice([cob = std::move(cob), &column_path](auto&& results) { if (results.empty()) { throw NotFoundException(); @@ -259,6 +267,7 @@ public: } void get_slice(thrift_fn::function const& _return)> cob, thrift_fn::function exn_cob, const std::string& key, const ColumnParent& column_parent, const SlicePredicate& predicate, const ConsistencyLevel::type consistency_level) { + service_permit permit = obtain_permit(); return multiget_slice([cob = std::move(cob)](auto&& results) { if (!results.empty()) { return cob(std::move(results.begin()->second)); @@ -268,6 +277,7 @@ public: } void get_count(thrift_fn::function cob, thrift_fn::function exn_cob, const std::string& key, const ColumnParent& column_parent, const SlicePredicate& predicate, const ConsistencyLevel::type consistency_level) { + service_permit permit = obtain_permit(); return multiget_count([cob = std::move(cob)](auto&& results) 
{ if (!results.empty()) { return cob(results.begin()->second); @@ -277,6 +287,7 @@ public: } void multiget_slice(thrift_fn::function > const& _return)> cob, thrift_fn::function exn_cob, const std::vector & keys, const ColumnParent& column_parent, const SlicePredicate& predicate, const ConsistencyLevel::type consistency_level) { + service_permit permit = obtain_permit(); with_schema(std::move(cob), std::move(exn_cob), column_parent.column_family, [&](schema_ptr schema) { if (!column_parent.super_column.empty()) { fail(unimplemented::cause::SUPER); @@ -286,9 +297,9 @@ public: auto cell_limit = predicate.__isset.slice_range ? static_cast(predicate.slice_range.count) : std::numeric_limits::max(); auto pranges = make_partition_ranges(*schema, keys); auto f = _query_state.get_client_state().has_schema_access(*schema, auth::permission::SELECT); - return f.then([this, &proxy, schema, cmd, pranges = std::move(pranges), cell_limit, consistency_level, keys]() mutable { + return f.then([this, &proxy, schema, cmd, pranges = std::move(pranges), cell_limit, consistency_level, keys, permit = std::move(permit)]() mutable { auto timeout = db::timeout_clock::now() + _timeout_config.read_timeout; - return proxy.query(schema, cmd, std::move(pranges), cl_from_thrift(consistency_level), {timeout, empty_service_permit(), _query_state.get_client_state()}).then( + return proxy.query(schema, cmd, std::move(pranges), cl_from_thrift(consistency_level), {timeout, std::move(permit), _query_state.get_client_state()}).then( [schema, cmd, cell_limit, keys = std::move(keys)](service::storage_proxy::coordinator_query_result qr) { return query::result_view::do_with(*qr.query_result, [schema, cmd, cell_limit, keys = std::move(keys)](query::result_view v) mutable { if (schema->is_counter()) { @@ -306,6 +317,7 @@ public: } void multiget_count(thrift_fn::function const& _return)> cob, thrift_fn::function exn_cob, const std::vector & keys, const ColumnParent& column_parent, const SlicePredicate& predicate, 
const ConsistencyLevel::type consistency_level) { + service_permit permit = obtain_permit(); with_schema(std::move(cob), std::move(exn_cob), column_parent.column_family, [&](schema_ptr schema) { if (!column_parent.super_column.empty()) { fail(unimplemented::cause::SUPER); @@ -315,9 +327,9 @@ public: auto cell_limit = predicate.__isset.slice_range ? static_cast(predicate.slice_range.count) : std::numeric_limits::max(); auto pranges = make_partition_ranges(*schema, keys); auto f = _query_state.get_client_state().has_schema_access(*schema, auth::permission::SELECT); - return f.then([this, &proxy, schema, cmd, pranges = std::move(pranges), cell_limit, consistency_level, keys]() mutable { + return f.then([this, &proxy, schema, cmd, pranges = std::move(pranges), cell_limit, consistency_level, keys, permit = std::move(permit)]() mutable { auto timeout = db::timeout_clock::now() + _timeout_config.read_timeout; - return proxy.query(schema, cmd, std::move(pranges), cl_from_thrift(consistency_level), {timeout, empty_service_permit(), _query_state.get_client_state()}).then( + return proxy.query(schema, cmd, std::move(pranges), cl_from_thrift(consistency_level), {timeout, std::move(permit), _query_state.get_client_state()}).then( [schema, cmd, cell_limit, keys = std::move(keys)](service::storage_proxy::coordinator_query_result qr) { return query::result_view::do_with(*qr.query_result, [schema, cmd, cell_limit, keys = std::move(keys)](query::result_view v) mutable { column_counter counter(*schema, cmd->slice, cell_limit, std::move(keys)); @@ -336,6 +348,7 @@ public: * now our behavior differs from Origin. 
*/ void get_range_slices(thrift_fn::function const& _return)> cob, thrift_fn::function exn_cob, const ColumnParent& column_parent, const SlicePredicate& predicate, const KeyRange& range, const ConsistencyLevel::type consistency_level) { + service_permit permit = obtain_permit(); with_schema(std::move(cob), std::move(exn_cob), column_parent.column_family, [&](schema_ptr schema) { if (!column_parent.super_column.empty()) { fail(unimplemented::cause::SUPER); @@ -353,9 +366,9 @@ public: cmd->set_row_limit(static_cast(range.count)); } auto f = _query_state.get_client_state().has_schema_access(*schema, auth::permission::SELECT); - return f.then([this, &proxy, schema, cmd, prange = std::move(prange), consistency_level] () mutable { + return f.then([this, &proxy, schema, cmd, prange = std::move(prange), consistency_level, permit = std::move(permit)] () mutable { auto timeout = db::timeout_clock::now() + _timeout_config.range_read_timeout; - return proxy.query(schema, cmd, std::move(prange), cl_from_thrift(consistency_level), {timeout, empty_service_permit(), _query_state.get_client_state()}).then( + return proxy.query(schema, cmd, std::move(prange), cl_from_thrift(consistency_level), {timeout, std::move(permit), _query_state.get_client_state()}).then( [schema, cmd](service::storage_proxy::coordinator_query_result qr) { return query::result_view::do_with(*qr.query_result, [schema, cmd](query::result_view v) { return to_key_slices(*schema, cmd->slice, v, std::numeric_limits::max()); @@ -409,7 +422,8 @@ public: db::consistency_level consistency_level, const ::timeout_config& timeout_config, std::vector& output, - service::query_state& qs) { + service::query_state& qs, + service_permit permit) { auto& proxy = service::get_local_storage_proxy(); auto cmd = make_paged_read_cmd(proxy, *schema, column_limit, start_column, range); std::optional start_key; @@ -422,12 +436,12 @@ public: } auto range1 = range; // query() below accepts an rvalue, so need a copy to reuse later auto 
timeout = db::timeout_clock::now() + timeout_config.range_read_timeout; - return proxy.query(schema, cmd, std::move(range), consistency_level, {timeout, empty_service_permit(), qs.get_client_state()}).then( + return proxy.query(schema, cmd, std::move(range), consistency_level, {timeout, std::move(permit), qs.get_client_state()}).then( [schema, cmd, column_limit](service::storage_proxy::coordinator_query_result qr) { return query::result_view::do_with(*qr.query_result, [schema, cmd, column_limit](query::result_view v) { return to_key_slices(*schema, cmd->slice, v, column_limit); }); - }).then([schema, cmd, column_limit, range = std::move(range1), consistency_level, start_key = std::move(start_key), end = std::move(end), &timeout_config, &output, &qs](auto&& slices) mutable { + }).then([schema, cmd, column_limit, range = std::move(range1), consistency_level, start_key = std::move(start_key), end = std::move(end), &timeout_config, &output, &qs, permit = std::move(permit)](auto&& slices) mutable { auto columns = std::accumulate(slices.begin(), slices.end(), 0u, [](auto&& acc, auto&& ks) { return acc + ks.columns.size(); }); @@ -436,7 +450,7 @@ public: if (!output.empty() || !start_key) { if (range.size() > 1 && columns < column_limit) { range.erase(range.begin()); - return do_get_paged_slice(std::move(schema), column_limit - columns, std::move(range), nullptr, consistency_level, timeout_config, output, qs); + return do_get_paged_slice(std::move(schema), column_limit - columns, std::move(range), nullptr, consistency_level, timeout_config, output, qs, std::move(permit)); } return make_ready_future(); } @@ -446,11 +460,12 @@ public: } auto start = dht::decorate_key(*schema, std::move(*start_key)); range[0] = dht::partition_range(dht::partition_range::bound(std::move(start), false), std::move(end)); - return do_get_paged_slice(schema, column_limit - columns, std::move(range), nullptr, consistency_level, timeout_config, output, qs); + return do_get_paged_slice(schema, 
column_limit - columns, std::move(range), nullptr, consistency_level, timeout_config, output, qs, std::move(permit)); }); } void get_paged_slice(thrift_fn::function const& _return)> cob, thrift_fn::function exn_cob, const std::string& column_family, const KeyRange& range, const std::string& start_column, const ConsistencyLevel::type consistency_level) { + service_permit permit = obtain_permit(); with_schema(std::move(cob), std::move(exn_cob), column_family, [&](schema_ptr schema) { return do_with(std::vector(), [&](auto& output) { if (range.__isset.row_filter) { @@ -468,9 +483,9 @@ public: } } auto f = _query_state.get_client_state().has_schema_access(*schema, auth::permission::SELECT); - return f.then([this, schema, count = range.count, start_column, prange = std::move(prange), consistency_level, &output] () mutable { + return f.then([this, schema, count = range.count, start_column, prange = std::move(prange), consistency_level, &output, permit = std::move(permit)] () mutable { return do_get_paged_slice(std::move(schema), count, std::move(prange), &start_column, - cl_from_thrift(consistency_level), _timeout_config, output, _query_state).then([&output] { + cl_from_thrift(consistency_level), _timeout_config, output, _query_state, std::move(permit)).then([&output] { return std::move(output); }); }); @@ -479,6 +494,7 @@ public: } void get_indexed_slices(thrift_fn::function const& _return)> cob, thrift_fn::function exn_cob, const ColumnParent& column_parent, const IndexClause& index_clause, const SlicePredicate& column_predicate, const ConsistencyLevel::type consistency_level) { + service_permit permit = obtain_permit(); std::vector _return; warn(unimplemented::cause::INDEXES); // FIXME: implement @@ -486,6 +502,7 @@ public: } void insert(thrift_fn::function cob, thrift_fn::function exn_cob, const std::string& key, const ColumnParent& column_parent, const Column& column, const ConsistencyLevel::type consistency_level) { + service_permit permit = obtain_permit(); 
with_schema(std::move(cob), std::move(exn_cob), column_parent.column_family, [&](schema_ptr schema) { if (column_parent.__isset.super_column) { fail(unimplemented::cause::SUPER); @@ -497,14 +514,15 @@ public: mutation m_to_apply(schema, key_from_thrift(*schema, to_bytes_view(key))); add_to_mutation(*schema, column, m_to_apply); - return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level] { + return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level, permit = std::move(permit)] () mutable { auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout; - return service::get_local_storage_proxy().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, /*FIXME: pass real permit*/empty_service_permit()); + return service::get_local_storage_proxy().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit)); }); }); } void add(thrift_fn::function cob, thrift_fn::function exn_cob, const std::string& key, const ColumnParent& column_parent, const CounterColumn& column, const ConsistencyLevel::type consistency_level) { + service_permit permit = obtain_permit(); with_schema(std::move(cob), std::move(exn_cob), column_parent.column_family, [&](schema_ptr schema) { if (column_parent.__isset.super_column) { fail(unimplemented::cause::SUPER); @@ -512,14 +530,15 @@ public: mutation m_to_apply(schema, key_from_thrift(*schema, to_bytes_view(key))); add_to_mutation(*schema, column, m_to_apply); - return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level] { + return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), 
consistency_level, permit = std::move(permit)] () mutable { auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout; - return service::get_local_storage_proxy().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, /*FIXME: pass real permit*/empty_service_permit()); + return service::get_local_storage_proxy().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit)); }); }); } void cas(thrift_fn::function cob, thrift_fn::function exn_cob, const std::string& key, const std::string& column_family, const std::vector & expected, const std::vector & updates, const ConsistencyLevel::type serial_consistency_level, const ConsistencyLevel::type commit_consistency_level) { + service_permit permit = obtain_permit(); CASResult _return; warn(unimplemented::cause::LWT); // FIXME: implement @@ -527,6 +546,7 @@ public: } void remove(thrift_fn::function cob, thrift_fn::function exn_cob, const std::string& key, const ColumnPath& column_path, const int64_t timestamp, const ConsistencyLevel::type consistency_level) { + service_permit permit = obtain_permit(); with_schema(std::move(cob), std::move(exn_cob), column_path.column_family, [&](schema_ptr schema) { if (schema->is_view()) { throw make_exception("Cannot modify Materialized Views directly"); @@ -547,14 +567,15 @@ public: m_to_apply.partition().apply(tombstone(timestamp, gc_clock::now())); } - return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level] { + return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level, permit = std::move(permit)] () mutable { auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout; - return service::get_local_storage_proxy().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, 
nullptr, /*FIXME: pass real permit*/empty_service_permit()); + return service::get_local_storage_proxy().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit)); }); }); } void remove_counter(thrift_fn::function cob, thrift_fn::function exn_cob, const std::string& key, const ColumnPath& column_path, const ConsistencyLevel::type consistency_level) { + service_permit permit = obtain_permit(); with_schema(std::move(cob), std::move(exn_cob), column_path.column_family, [&](schema_ptr schema) { mutation m_to_apply(schema, key_from_thrift(*schema, to_bytes_view(key))); @@ -572,39 +593,42 @@ public: m_to_apply.partition().apply(tombstone(timestamp, gc_clock::now())); } - return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level] { + return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level, permit = std::move(permit)] () mutable { // This mutation contains only counter tombstones so it can be applied like non-counter mutations. 
auto timeout = db::timeout_clock::now() + _timeout_config.counter_write_timeout; - return service::get_local_storage_proxy().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, /*FIXME: pass real permit*/empty_service_permit()); + return service::get_local_storage_proxy().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit)); }); }); } void batch_mutate(thrift_fn::function cob, thrift_fn::function exn_cob, const std::map > > & mutation_map, const ConsistencyLevel::type consistency_level) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { auto p = prepare_mutations(_db.local(), current_keyspace(), mutation_map); return parallel_for_each(std::move(p.second), [this](auto&& schema) { return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY); - }).then([this, muts = std::move(p.first), consistency_level] { + }).then([this, muts = std::move(p.first), consistency_level, permit = std::move(permit)] () mutable { auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout; - return service::get_local_storage_proxy().mutate(std::move(muts), cl_from_thrift(consistency_level), timeout, nullptr, /*FIXME: pass real permit*/empty_service_permit()); + return service::get_local_storage_proxy().mutate(std::move(muts), cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit)); }); }); } void atomic_batch_mutate(thrift_fn::function cob, thrift_fn::function exn_cob, const std::map > > & mutation_map, const ConsistencyLevel::type consistency_level) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { auto p = prepare_mutations(_db.local(), current_keyspace(), mutation_map); return parallel_for_each(std::move(p.second), [this](auto&& schema) { return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY); - }).then([this, muts = 
std::move(p.first), consistency_level] { + }).then([this, muts = std::move(p.first), consistency_level, permit = std::move(permit)] () mutable { auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout; - return service::get_local_storage_proxy().mutate_atomically(std::move(muts), cl_from_thrift(consistency_level), timeout, nullptr, /*FIXME: pass real permit*/empty_service_permit()); + return service::get_local_storage_proxy().mutate_atomically(std::move(muts), cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit)); }); }); } void truncate(thrift_fn::function cob, thrift_fn::function exn_cob, const std::string& cfname) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { if (current_keyspace().empty()) { throw make_exception("keyspace not set"); @@ -620,6 +644,7 @@ public: } void get_multi_slice(thrift_fn::function const& _return)> cob, thrift_fn::function exn_cob, const MultiSliceRequest& request) { + service_permit permit = obtain_permit(); with_schema(std::move(cob), std::move(exn_cob), request.column_parent.column_family, [&](schema_ptr schema) { if (!request.__isset.key) { throw make_exception("Key may not be empty"); @@ -673,9 +698,9 @@ public: auto cmd = make_lw_shared(schema->id(), schema->version(), std::move(slice), proxy.get_max_result_size(slice), query::row_limit(row_limit)); auto f = _query_state.get_client_state().has_schema_access(*schema, auth::permission::SELECT); - return f.then([this, &proxy, dk = std::move(dk), cmd, schema, column_limit = request.count, cl = request.consistency_level] { + return f.then([this, &proxy, dk = std::move(dk), cmd, schema, column_limit = request.count, cl = request.consistency_level, permit = std::move(permit)] () mutable { auto timeout = db::timeout_clock::now() + _timeout_config.read_timeout; - return proxy.query(schema, cmd, {dht::partition_range::make_singular(dk)}, cl_from_thrift(cl), {timeout, /* FIXME: pass real permit 
*/empty_service_permit(), _query_state.get_client_state()}).then( + return proxy.query(schema, cmd, {dht::partition_range::make_singular(dk)}, cl_from_thrift(cl), {timeout, std::move(permit), _query_state.get_client_state()}).then( [schema, cmd, column_limit](service::storage_proxy::coordinator_query_result qr) { return query::result_view::do_with(*qr.query_result, [schema, cmd, column_limit](query::result_view v) { column_aggregator aggregator(*schema, cmd->slice, column_limit, { }); @@ -689,6 +714,7 @@ public: } void describe_schema_versions(thrift_fn::function > const& _return)> cob, thrift_fn::function exn_cob) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [] { return service::get_local_storage_service().describe_schema_versions().then([](auto&& m) { std::map> ret; @@ -701,6 +727,7 @@ public: } void describe_keyspaces(thrift_fn::function const& _return)> cob, thrift_fn::function exn_cob) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { validate_login(); std::vector ret; @@ -712,14 +739,17 @@ public: } void describe_cluster_name(thrift_fn::function cob) { + service_permit permit = obtain_permit(); cob(_db.local().get_config().cluster_name()); } void describe_version(thrift_fn::function cob) { + service_permit permit = obtain_permit(); cob(::cassandra::thrift_version); } void do_describe_ring(thrift_fn::function const& _return)> cob, thrift_fn::function exn_cob, const std::string& keyspace, bool local) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { auto& ks = _db.local().find_keyspace(keyspace); if (ks.get_replication_strategy().get_type() == locator::replication_strategy_type::local) { @@ -759,6 +789,7 @@ public: } void describe_token_map(thrift_fn::function const& _return)> cob, thrift_fn::function exn_cob) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [] { auto m = 
service::get_local_storage_service().get_token_to_endpoint_map(); std::map ret; @@ -770,14 +801,17 @@ public: } void describe_partitioner(thrift_fn::function cob) { + service_permit permit = obtain_permit(); cob(_db.local().get_config().partitioner()); } void describe_snitch(thrift_fn::function cob) { + service_permit permit = obtain_permit(); cob(format("org.apache.cassandra.locator.{}", _db.local().get_snitch_name())); } void describe_keyspace(thrift_fn::function cob, thrift_fn::function exn_cob, const std::string& keyspace) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { validate_login(); auto& ks = _db.local().find_keyspace(keyspace); @@ -786,6 +820,7 @@ public: } void describe_splits(thrift_fn::function const& _return)> cob, thrift_fn::function exn_cob, const std::string& cfName, const std::string& start_token, const std::string& end_token, const int32_t keys_per_split) { + service_permit permit = obtain_permit(); return describe_splits_ex([cob = std::move(cob)](auto&& results) { std::vector res; res.reserve(results.size() + 1); @@ -798,12 +833,14 @@ public: } void trace_next_query(thrift_fn::function cob) { + service_permit permit = obtain_permit(); std::string _return; // FIXME: implement return cob("dummy trace"); } void describe_splits_ex(thrift_fn::function const& _return)> cob, thrift_fn::function exn_cob, const std::string& cfName, const std::string& start_token, const std::string& end_token, const int32_t keys_per_split) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&]{ dht::token_range_vector ranges; auto tstart = start_token.empty() ? 
dht::minimum_token() : dht::token::from_sstring(sstring(start_token)); @@ -827,6 +864,7 @@ public: } void system_add_column_family(thrift_fn::function cob, thrift_fn::function exn_cob, const CfDef& cf_def) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { if (!_db.local().has_keyspace(cf_def.keyspace)) { throw NotFoundException(); @@ -844,6 +882,7 @@ public: }); } void system_drop_column_family(thrift_fn::function cob, thrift_fn::function exn_cob, const std::string& column_family) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { return _query_state.get_client_state().has_column_family_access(_db.local(), current_keyspace(), column_family, auth::permission::DROP).then([this, column_family] { auto& cf = _db.local().find_column_family(current_keyspace(), column_family); @@ -861,6 +900,7 @@ public: } void system_add_keyspace(thrift_fn::function cob, thrift_fn::function exn_cob, const KsDef& ks_def) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { auto ksm = keyspace_from_thrift(ks_def); return _query_state.get_client_state().has_all_keyspaces_access(auth::permission::CREATE).then([this, ksm = std::move(ksm)] { @@ -872,6 +912,7 @@ public: } void system_drop_keyspace(thrift_fn::function cob, thrift_fn::function exn_cob, const std::string& keyspace) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { thrift_validation::validate_keyspace_not_system(keyspace); if (!_db.local().has_keyspace(keyspace)) { @@ -887,6 +928,7 @@ public: } void system_update_keyspace(thrift_fn::function cob, thrift_fn::function exn_cob, const KsDef& ks_def) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { thrift_validation::validate_keyspace_not_system(ks_def.name); @@ -907,6 +949,7 @@ public: } void system_update_column_family(thrift_fn::function cob, thrift_fn::function 
exn_cob, const CfDef& cf_def) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { auto& cf = _db.local().find_column_family(cf_def.keyspace, cf_def.name); auto schema = cf.schema(); @@ -1964,6 +2007,10 @@ private: } return {std::move(muts), std::move(schemas)}; } +protected: + service_permit obtain_permit() { + return std::move(_current_permit); + } }; class handler_factory : public CassandraCobSvIfFactory { @@ -1971,15 +2018,17 @@ class handler_factory : public CassandraCobSvIfFactory { distributed& _query_processor; auth::service& _auth_service; timeout_config _timeout_config; + service_permit& _current_permit; public: explicit handler_factory(distributed& db, distributed& qp, auth::service& auth_service, - ::timeout_config timeout_config) - : _db(db), _query_processor(qp), _auth_service(auth_service), _timeout_config(timeout_config) {} + ::timeout_config timeout_config, + service_permit& current_permit) + : _db(db), _query_processor(qp), _auth_service(auth_service), _timeout_config(timeout_config), _current_permit(current_permit) {} typedef CassandraCobSvIf Handler; virtual CassandraCobSvIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) { - return new thrift_handler(_db, _query_processor, _auth_service, _timeout_config); + return new thrift_handler(_db, _query_processor, _auth_service, _timeout_config, _current_permit); } virtual void releaseHandler(CassandraCobSvIf* handler) { delete handler; @@ -1988,6 +2037,6 @@ public: std::unique_ptr create_handler_factory(distributed& db, distributed& qp, auth::service& auth_service, - ::timeout_config timeout_config) { - return std::make_unique(db, qp, auth_service, timeout_config); + ::timeout_config timeout_config, service_permit& current_permit) { + return std::make_unique(db, qp, auth_service, timeout_config, current_permit); } diff --git a/thrift/handler.hh b/thrift/handler.hh index 2765fa9417..c56f0e4923 100644 --- a/thrift/handler.hh +++ 
b/thrift/handler.hh @@ -31,6 +31,6 @@ struct timeout_config; -std::unique_ptr<::cassandra::CassandraCobSvIfFactory> create_handler_factory(distributed& db, distributed& qp, auth::service&, timeout_config); +std::unique_ptr<::cassandra::CassandraCobSvIfFactory> create_handler_factory(distributed& db, distributed& qp, auth::service&, timeout_config, service_permit& current_permit); #endif /* APPS_SEASTAR_THRIFT_HANDLER_HH_ */ diff --git a/thrift/server.cc b/thrift/server.cc index 320c4fba68..f3ad1ae61c 100644 --- a/thrift/server.cc +++ b/thrift/server.cc @@ -28,6 +28,7 @@ #include #include #include +#include #include "log.hh" #include #include @@ -66,11 +67,13 @@ public: thrift_server::thrift_server(distributed& db, distributed& qp, auth::service& auth_service, + service::memory_limiter& ml, thrift_server_config config) : _stats(new thrift_stats(*this)) - , _handler_factory(create_handler_factory(db, qp, auth_service, config.timeout_config).release()) + , _handler_factory(create_handler_factory(db, qp, auth_service, config.timeout_config, _current_permit).release()) , _protocol_factory(new TBinaryProtocolFactoryT()) , _processor_factory(new CassandraAsyncProcessorFactory(_handler_factory)) + , _memory_available(ml.get_semaphore()) , _config(config) { } @@ -158,8 +161,23 @@ thrift_server::connection::process_one_request() { write().forward_to(std::move(_processor_promise)); _processor_promise = promise<>(); }; + // Heuristics copied from transport/server.cc + size_t mem_estimate = 8000 + 2 * _input->available_read(); + auto units = co_await get_units(_server._memory_available, mem_estimate); + // NOTICE: this permit is put in the server under the assumption that no other + // connection will overwrite this permit *until* it's extracted by the code + // which handles the Thrift request (via calling obtain_permit()). 
+ // This assumption is true because there are no preemption points between this + // insertion and the call to obtain_permit(), which was verified both by + // code inspection and confirmed empirically by running manual tests. + if (_server._current_permit.count() > 0) { + tlogger.debug("Current service permit is overwritten while its units are still held ({}). " + "This situation likely means that there's a bug in passing service permits to message handlers.", + _server._current_permit.count()); + } + _server._current_permit = make_service_permit(std::move(units)); _processor->process(complete, _in_proto, _out_proto); - co_return co_await ret; + co_return co_await std::move(ret); } future<> @@ -289,6 +307,16 @@ thrift_server::requests_served() const { return _requests_served; } +size_t +thrift_server::max_request_size() const { + return _config.max_request_size; +} + +const semaphore& +thrift_server::memory_available() const { + return _memory_available; +} + thrift_stats::thrift_stats(thrift_server& server) { namespace sm = seastar::metrics; diff --git a/thrift/server.hh b/thrift/server.hh index 80b4c539a9..25228c0185 100644 --- a/thrift/server.hh +++ b/thrift/server.hh @@ -26,6 +26,7 @@ #include #include "cql3/query_processor.hh" #include "timeout_config.hh" +#include "service/memory_limiter.hh" #include #include #include @@ -76,6 +77,7 @@ class service; struct thrift_server_config { ::timeout_config timeout_config; uint64_t max_request_size; + std::function get_service_memory_limiter_semaphore; }; class thrift_server { @@ -107,17 +109,19 @@ class thrift_server { private: std::vector _listeners; std::unique_ptr _stats; + service_permit _current_permit = empty_service_permit(); thrift_std::shared_ptr<::cassandra::CassandraCobSvIfFactory> _handler_factory; std::unique_ptr _protocol_factory; thrift_std::shared_ptr _processor_factory; uint64_t _total_connections = 0; uint64_t _current_connections = 0; uint64_t _requests_served = 0; + semaphore& _memory_available; 
thrift_server_config _config; boost::intrusive::list _connections_list; seastar::gate _stop_gate; public: - thrift_server(distributed& db, distributed& qp, auth::service&, thrift_server_config config); + thrift_server(distributed& db, distributed& qp, auth::service&, service::memory_limiter& ml, thrift_server_config config); ~thrift_server(); future<> listen(socket_address addr, bool keepalive); future<> stop(); @@ -125,6 +129,11 @@ public: uint64_t total_connections() const; uint64_t current_connections() const; uint64_t requests_served() const; + size_t max_request_size() const; + const semaphore& memory_available() const; + +private: + void maybe_retry_accept(int which, bool keepalive, std::exception_ptr ex); }; #endif /* APPS_SEASTAR_THRIFT_SERVER_HH_ */