diff --git a/main.cc b/main.cc index 29cec748fc..3e9ce2cdb4 100644 --- a/main.cc +++ b/main.cc @@ -1304,7 +1304,7 @@ int main(int ac, char** av) { api::unset_transport_controller(ctx).get(); }); - ::thrift_controller thrift_ctl(db, auth_service, qp); + ::thrift_controller thrift_ctl(db, auth_service, qp, service_memory_limiter); ss.register_client_shutdown_hook("rpc server", [&thrift_ctl] { thrift_ctl.stop().get(); diff --git a/thrift/controller.cc b/thrift/controller.cc index 510543efae..b1f5975247 100644 --- a/thrift/controller.cc +++ b/thrift/controller.cc @@ -24,14 +24,16 @@ #include "database.hh" #include "db/config.hh" #include "log.hh" +#include "service/storage_service.hh" static logging::logger clogger("thrift_controller"); -thrift_controller::thrift_controller(distributed& db, sharded& auth, sharded& qp) +thrift_controller::thrift_controller(distributed& db, sharded& auth, sharded& qp, sharded& ml) : _ops_sem(1) , _db(db) , _auth_service(auth) - , _qp(qp) { + , _qp(qp) + , _mem_limiter(ml) { } future<> thrift_controller::start_server() { @@ -62,7 +64,7 @@ future<> thrift_controller::do_start_server() { tsc.timeout_config = make_timeout_config(cfg); tsc.max_request_size = cfg.thrift_max_message_length_in_mb() * (uint64_t(1) << 20); return gms::inet_address::lookup(addr, family, preferred).then([this, tserver, addr, port, keepalive, tsc] (gms::inet_address ip) { - return tserver->start(std::ref(_db), std::ref(_qp), std::ref(_auth_service), tsc).then([tserver, port, addr, ip, keepalive] { + return tserver->start(std::ref(_db), std::ref(_qp), std::ref(_auth_service), std::ref(_mem_limiter), tsc).then([tserver, port, addr, ip, keepalive] { // #293 - do not stop anything //engine().at_exit([tserver] { // return tserver->stop(); diff --git a/thrift/controller.hh b/thrift/controller.hh index 0a404d21a9..69c2bc6dea 100644 --- a/thrift/controller.hh +++ b/thrift/controller.hh @@ -24,6 +24,7 @@ #include #include #include +#include "service/memory_limiter.hh" using 
namespace seastar; @@ -40,12 +41,13 @@ class thrift_controller { distributed& _db; sharded& _auth_service; sharded& _qp; + sharded& _mem_limiter; future<> do_start_server(); future<> do_stop_server(); public: - thrift_controller(distributed&, sharded&, sharded&); + thrift_controller(distributed&, sharded&, sharded&, sharded&); future<> start_server(); future<> stop_server(); future<> stop(); diff --git a/thrift/handler.cc b/thrift/handler.cc index 1a0f31f3e7..8d1199162d 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -204,6 +204,7 @@ class thrift_handler : public CassandraCobSvIf { ::timeout_config _timeout_config; service::client_state _client_state; service::query_state _query_state; + service_permit& _current_permit; private: template void @@ -217,12 +218,16 @@ private: }); } public: - explicit thrift_handler(distributed& db, distributed& qp, auth::service& auth_service, ::timeout_config timeout_config) + explicit thrift_handler(distributed& db, distributed& qp, auth::service& auth_service, ::timeout_config timeout_config, service_permit& current_permit) : _db(db) , _query_processor(qp) , _timeout_config(timeout_config) , _client_state(service::client_state::external_tag{}, auth_service, _timeout_config, socket_address(), true) + // FIXME: Handlers are not created per query, but rather per connection, so it makes little sense to store + // service permits in here. The query state should be reinstantiated per query - AFAIK it's only used + // for CQL queries which piggy-back on Thrift protocol. 
, _query_state(_client_state, /*FIXME: pass real permit*/empty_service_permit()) + , _current_permit(current_permit) { } const sstring& current_keyspace() const { @@ -234,6 +239,7 @@ public: }; void login(thrift_fn::function cob, thrift_fn::function exn_cob, const AuthenticationRequest& auth_request) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { auth::authenticator::credentials_map creds(auth_request.credentials.begin(), auth_request.credentials.end()); auto& auth_service = *_query_state.get_client_state().get_auth_service(); @@ -244,12 +250,14 @@ public: } void set_keyspace(thrift_fn::function cob, thrift_fn::function exn_cob, const std::string& keyspace) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { _query_state.get_client_state().set_keyspace(_db.local(), keyspace); }); } void get(thrift_fn::function cob, thrift_fn::function exn_cob, const std::string& key, const ColumnPath& column_path, const ConsistencyLevel::type consistency_level) { + /* No obtain_permit() here: this call delegates synchronously to get_slice() and then multiget_slice(), which takes the permit and hands it to the storage proxy query. Taking it here would leave the query with a moved-from permit and release the units as soon as this wrapper returns. */ return get_slice([cob = std::move(cob), &column_path](auto&& results) { if (results.empty()) { throw NotFoundException(); @@ -259,6 +267,7 @@ public: } void get_slice(thrift_fn::function const& _return)> cob, thrift_fn::function exn_cob, const std::string& key, const ColumnParent& column_parent, const SlicePredicate& predicate, const ConsistencyLevel::type consistency_level) { + /* No obtain_permit() here: multiget_slice(), called synchronously below, takes the permit and hands it to the storage proxy query. Taking it here would leave the query with a moved-from permit. */ return multiget_slice([cob = std::move(cob)](auto&& results) { if (!results.empty()) { return cob(std::move(results.begin()->second)); @@ -268,6 +277,7 @@ public: } void get_count(thrift_fn::function cob, thrift_fn::function exn_cob, const std::string& key, const ColumnParent& column_parent, const SlicePredicate& predicate, const ConsistencyLevel::type consistency_level) { + /* No obtain_permit() here: multiget_count(), called synchronously below, takes the permit and hands it to the storage proxy query. Taking it here would leave the query with a moved-from permit. */ return multiget_count([cob = std::move(cob)](auto&& results) 
{ if (!results.empty()) { return cob(results.begin()->second); @@ -277,6 +287,7 @@ public: } void multiget_slice(thrift_fn::function > const& _return)> cob, thrift_fn::function exn_cob, const std::vector & keys, const ColumnParent& column_parent, const SlicePredicate& predicate, const ConsistencyLevel::type consistency_level) { + service_permit permit = obtain_permit(); with_schema(std::move(cob), std::move(exn_cob), column_parent.column_family, [&](schema_ptr schema) { if (!column_parent.super_column.empty()) { fail(unimplemented::cause::SUPER); @@ -286,9 +297,9 @@ public: auto cell_limit = predicate.__isset.slice_range ? static_cast(predicate.slice_range.count) : std::numeric_limits::max(); auto pranges = make_partition_ranges(*schema, keys); auto f = _query_state.get_client_state().has_schema_access(*schema, auth::permission::SELECT); - return f.then([this, &proxy, schema, cmd, pranges = std::move(pranges), cell_limit, consistency_level, keys]() mutable { + return f.then([this, &proxy, schema, cmd, pranges = std::move(pranges), cell_limit, consistency_level, keys, permit = std::move(permit)]() mutable { auto timeout = db::timeout_clock::now() + _timeout_config.read_timeout; - return proxy.query(schema, cmd, std::move(pranges), cl_from_thrift(consistency_level), {timeout, empty_service_permit(), _query_state.get_client_state()}).then( + return proxy.query(schema, cmd, std::move(pranges), cl_from_thrift(consistency_level), {timeout, std::move(permit), _query_state.get_client_state()}).then( [schema, cmd, cell_limit, keys = std::move(keys)](service::storage_proxy::coordinator_query_result qr) { return query::result_view::do_with(*qr.query_result, [schema, cmd, cell_limit, keys = std::move(keys)](query::result_view v) mutable { if (schema->is_counter()) { @@ -306,6 +317,7 @@ public: } void multiget_count(thrift_fn::function const& _return)> cob, thrift_fn::function exn_cob, const std::vector & keys, const ColumnParent& column_parent, const SlicePredicate& predicate, 
const ConsistencyLevel::type consistency_level) { + service_permit permit = obtain_permit(); with_schema(std::move(cob), std::move(exn_cob), column_parent.column_family, [&](schema_ptr schema) { if (!column_parent.super_column.empty()) { fail(unimplemented::cause::SUPER); @@ -315,9 +327,9 @@ public: auto cell_limit = predicate.__isset.slice_range ? static_cast(predicate.slice_range.count) : std::numeric_limits::max(); auto pranges = make_partition_ranges(*schema, keys); auto f = _query_state.get_client_state().has_schema_access(*schema, auth::permission::SELECT); - return f.then([this, &proxy, schema, cmd, pranges = std::move(pranges), cell_limit, consistency_level, keys]() mutable { + return f.then([this, &proxy, schema, cmd, pranges = std::move(pranges), cell_limit, consistency_level, keys, permit = std::move(permit)]() mutable { auto timeout = db::timeout_clock::now() + _timeout_config.read_timeout; - return proxy.query(schema, cmd, std::move(pranges), cl_from_thrift(consistency_level), {timeout, empty_service_permit(), _query_state.get_client_state()}).then( + return proxy.query(schema, cmd, std::move(pranges), cl_from_thrift(consistency_level), {timeout, std::move(permit), _query_state.get_client_state()}).then( [schema, cmd, cell_limit, keys = std::move(keys)](service::storage_proxy::coordinator_query_result qr) { return query::result_view::do_with(*qr.query_result, [schema, cmd, cell_limit, keys = std::move(keys)](query::result_view v) mutable { column_counter counter(*schema, cmd->slice, cell_limit, std::move(keys)); @@ -336,6 +348,7 @@ public: * now our behavior differs from Origin. 
*/ void get_range_slices(thrift_fn::function const& _return)> cob, thrift_fn::function exn_cob, const ColumnParent& column_parent, const SlicePredicate& predicate, const KeyRange& range, const ConsistencyLevel::type consistency_level) { + service_permit permit = obtain_permit(); with_schema(std::move(cob), std::move(exn_cob), column_parent.column_family, [&](schema_ptr schema) { if (!column_parent.super_column.empty()) { fail(unimplemented::cause::SUPER); @@ -353,9 +366,9 @@ public: cmd->set_row_limit(static_cast(range.count)); } auto f = _query_state.get_client_state().has_schema_access(*schema, auth::permission::SELECT); - return f.then([this, &proxy, schema, cmd, prange = std::move(prange), consistency_level] () mutable { + return f.then([this, &proxy, schema, cmd, prange = std::move(prange), consistency_level, permit = std::move(permit)] () mutable { auto timeout = db::timeout_clock::now() + _timeout_config.range_read_timeout; - return proxy.query(schema, cmd, std::move(prange), cl_from_thrift(consistency_level), {timeout, empty_service_permit(), _query_state.get_client_state()}).then( + return proxy.query(schema, cmd, std::move(prange), cl_from_thrift(consistency_level), {timeout, std::move(permit), _query_state.get_client_state()}).then( [schema, cmd](service::storage_proxy::coordinator_query_result qr) { return query::result_view::do_with(*qr.query_result, [schema, cmd](query::result_view v) { return to_key_slices(*schema, cmd->slice, v, std::numeric_limits::max()); @@ -409,7 +422,8 @@ public: db::consistency_level consistency_level, const ::timeout_config& timeout_config, std::vector& output, - service::query_state& qs) { + service::query_state& qs, + service_permit permit) { auto& proxy = service::get_local_storage_proxy(); auto cmd = make_paged_read_cmd(proxy, *schema, column_limit, start_column, range); std::optional start_key; @@ -422,12 +436,12 @@ public: } auto range1 = range; // query() below accepts an rvalue, so need a copy to reuse later auto 
timeout = db::timeout_clock::now() + timeout_config.range_read_timeout; - return proxy.query(schema, cmd, std::move(range), consistency_level, {timeout, empty_service_permit(), qs.get_client_state()}).then( + return proxy.query(schema, cmd, std::move(range), consistency_level, {timeout, /* copy, not move: the final continuation below still needs the permit for the follow-up page queries */ permit, qs.get_client_state()}).then( [schema, cmd, column_limit](service::storage_proxy::coordinator_query_result qr) { return query::result_view::do_with(*qr.query_result, [schema, cmd, column_limit](query::result_view v) { return to_key_slices(*schema, cmd->slice, v, column_limit); }); - }).then([schema, cmd, column_limit, range = std::move(range1), consistency_level, start_key = std::move(start_key), end = std::move(end), &timeout_config, &output, &qs](auto&& slices) mutable { + }).then([schema, cmd, column_limit, range = std::move(range1), consistency_level, start_key = std::move(start_key), end = std::move(end), &timeout_config, &output, &qs, permit = std::move(permit)](auto&& slices) mutable { auto columns = std::accumulate(slices.begin(), slices.end(), 0u, [](auto&& acc, auto&& ks) { return acc + ks.columns.size(); }); @@ -436,7 +450,7 @@ public: if (!output.empty() || !start_key) { if (range.size() > 1 && columns < column_limit) { range.erase(range.begin()); - return do_get_paged_slice(std::move(schema), column_limit - columns, std::move(range), nullptr, consistency_level, timeout_config, output, qs); + return do_get_paged_slice(std::move(schema), column_limit - columns, std::move(range), nullptr, consistency_level, timeout_config, output, qs, std::move(permit)); } return make_ready_future(); } @@ -446,11 +460,12 @@ public: } auto start = dht::decorate_key(*schema, std::move(*start_key)); range[0] = dht::partition_range(dht::partition_range::bound(std::move(start), false), std::move(end)); - return do_get_paged_slice(schema, column_limit - columns, std::move(range), nullptr, consistency_level, timeout_config, output, qs); + return do_get_paged_slice(schema, 
column_limit - columns, std::move(range), nullptr, consistency_level, timeout_config, output, qs, std::move(permit)); }); } void get_paged_slice(thrift_fn::function const& _return)> cob, thrift_fn::function exn_cob, const std::string& column_family, const KeyRange& range, const std::string& start_column, const ConsistencyLevel::type consistency_level) { + service_permit permit = obtain_permit(); with_schema(std::move(cob), std::move(exn_cob), column_family, [&](schema_ptr schema) { return do_with(std::vector(), [&](auto& output) { if (range.__isset.row_filter) { @@ -468,9 +483,9 @@ public: } } auto f = _query_state.get_client_state().has_schema_access(*schema, auth::permission::SELECT); - return f.then([this, schema, count = range.count, start_column, prange = std::move(prange), consistency_level, &output] () mutable { + return f.then([this, schema, count = range.count, start_column, prange = std::move(prange), consistency_level, &output, permit = std::move(permit)] () mutable { return do_get_paged_slice(std::move(schema), count, std::move(prange), &start_column, - cl_from_thrift(consistency_level), _timeout_config, output, _query_state).then([&output] { + cl_from_thrift(consistency_level), _timeout_config, output, _query_state, std::move(permit)).then([&output] { return std::move(output); }); }); @@ -479,6 +494,7 @@ public: } void get_indexed_slices(thrift_fn::function const& _return)> cob, thrift_fn::function exn_cob, const ColumnParent& column_parent, const IndexClause& index_clause, const SlicePredicate& column_predicate, const ConsistencyLevel::type consistency_level) { + service_permit permit = obtain_permit(); std::vector _return; warn(unimplemented::cause::INDEXES); // FIXME: implement @@ -486,6 +502,7 @@ public: } void insert(thrift_fn::function cob, thrift_fn::function exn_cob, const std::string& key, const ColumnParent& column_parent, const Column& column, const ConsistencyLevel::type consistency_level) { + service_permit permit = obtain_permit(); 
with_schema(std::move(cob), std::move(exn_cob), column_parent.column_family, [&](schema_ptr schema) { if (column_parent.__isset.super_column) { fail(unimplemented::cause::SUPER); @@ -497,14 +514,15 @@ public: mutation m_to_apply(schema, key_from_thrift(*schema, to_bytes_view(key))); add_to_mutation(*schema, column, m_to_apply); - return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level] { + return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level, permit = std::move(permit)] () mutable { auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout; - return service::get_local_storage_proxy().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, /*FIXME: pass real permit*/empty_service_permit()); + return service::get_local_storage_proxy().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit)); }); }); } void add(thrift_fn::function cob, thrift_fn::function exn_cob, const std::string& key, const ColumnParent& column_parent, const CounterColumn& column, const ConsistencyLevel::type consistency_level) { + service_permit permit = obtain_permit(); with_schema(std::move(cob), std::move(exn_cob), column_parent.column_family, [&](schema_ptr schema) { if (column_parent.__isset.super_column) { fail(unimplemented::cause::SUPER); @@ -512,14 +530,15 @@ public: mutation m_to_apply(schema, key_from_thrift(*schema, to_bytes_view(key))); add_to_mutation(*schema, column, m_to_apply); - return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level] { + return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), 
consistency_level, permit = std::move(permit)] () mutable { auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout; - return service::get_local_storage_proxy().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, /*FIXME: pass real permit*/empty_service_permit()); + return service::get_local_storage_proxy().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit)); }); }); } void cas(thrift_fn::function cob, thrift_fn::function exn_cob, const std::string& key, const std::string& column_family, const std::vector & expected, const std::vector & updates, const ConsistencyLevel::type serial_consistency_level, const ConsistencyLevel::type commit_consistency_level) { + service_permit permit = obtain_permit(); CASResult _return; warn(unimplemented::cause::LWT); // FIXME: implement @@ -527,6 +546,7 @@ public: } void remove(thrift_fn::function cob, thrift_fn::function exn_cob, const std::string& key, const ColumnPath& column_path, const int64_t timestamp, const ConsistencyLevel::type consistency_level) { + service_permit permit = obtain_permit(); with_schema(std::move(cob), std::move(exn_cob), column_path.column_family, [&](schema_ptr schema) { if (schema->is_view()) { throw make_exception("Cannot modify Materialized Views directly"); @@ -547,14 +567,15 @@ public: m_to_apply.partition().apply(tombstone(timestamp, gc_clock::now())); } - return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level] { + return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level, permit = std::move(permit)] () mutable { auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout; - return service::get_local_storage_proxy().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, 
nullptr, /*FIXME: pass real permit*/empty_service_permit()); + return service::get_local_storage_proxy().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit)); }); }); } void remove_counter(thrift_fn::function cob, thrift_fn::function exn_cob, const std::string& key, const ColumnPath& column_path, const ConsistencyLevel::type consistency_level) { + service_permit permit = obtain_permit(); with_schema(std::move(cob), std::move(exn_cob), column_path.column_family, [&](schema_ptr schema) { mutation m_to_apply(schema, key_from_thrift(*schema, to_bytes_view(key))); @@ -572,39 +593,42 @@ public: m_to_apply.partition().apply(tombstone(timestamp, gc_clock::now())); } - return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level] { + return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level, permit = std::move(permit)] () mutable { // This mutation contains only counter tombstones so it can be applied like non-counter mutations. 
auto timeout = db::timeout_clock::now() + _timeout_config.counter_write_timeout; - return service::get_local_storage_proxy().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, /*FIXME: pass real permit*/empty_service_permit()); + return service::get_local_storage_proxy().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit)); }); }); } void batch_mutate(thrift_fn::function cob, thrift_fn::function exn_cob, const std::map > > & mutation_map, const ConsistencyLevel::type consistency_level) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { auto p = prepare_mutations(_db.local(), current_keyspace(), mutation_map); return parallel_for_each(std::move(p.second), [this](auto&& schema) { return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY); - }).then([this, muts = std::move(p.first), consistency_level] { + }).then([this, muts = std::move(p.first), consistency_level, permit = std::move(permit)] () mutable { auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout; - return service::get_local_storage_proxy().mutate(std::move(muts), cl_from_thrift(consistency_level), timeout, nullptr, /*FIXME: pass real permit*/empty_service_permit()); + return service::get_local_storage_proxy().mutate(std::move(muts), cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit)); }); }); } void atomic_batch_mutate(thrift_fn::function cob, thrift_fn::function exn_cob, const std::map > > & mutation_map, const ConsistencyLevel::type consistency_level) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { auto p = prepare_mutations(_db.local(), current_keyspace(), mutation_map); return parallel_for_each(std::move(p.second), [this](auto&& schema) { return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY); - }).then([this, muts = 
std::move(p.first), consistency_level] { + }).then([this, muts = std::move(p.first), consistency_level, permit = std::move(permit)] () mutable { auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout; - return service::get_local_storage_proxy().mutate_atomically(std::move(muts), cl_from_thrift(consistency_level), timeout, nullptr, /*FIXME: pass real permit*/empty_service_permit()); + return service::get_local_storage_proxy().mutate_atomically(std::move(muts), cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit)); }); }); } void truncate(thrift_fn::function cob, thrift_fn::function exn_cob, const std::string& cfname) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { if (current_keyspace().empty()) { throw make_exception("keyspace not set"); @@ -620,6 +644,7 @@ public: } void get_multi_slice(thrift_fn::function const& _return)> cob, thrift_fn::function exn_cob, const MultiSliceRequest& request) { + service_permit permit = obtain_permit(); with_schema(std::move(cob), std::move(exn_cob), request.column_parent.column_family, [&](schema_ptr schema) { if (!request.__isset.key) { throw make_exception("Key may not be empty"); @@ -673,9 +698,9 @@ public: auto cmd = make_lw_shared(schema->id(), schema->version(), std::move(slice), proxy.get_max_result_size(slice), query::row_limit(row_limit)); auto f = _query_state.get_client_state().has_schema_access(*schema, auth::permission::SELECT); - return f.then([this, &proxy, dk = std::move(dk), cmd, schema, column_limit = request.count, cl = request.consistency_level] { + return f.then([this, &proxy, dk = std::move(dk), cmd, schema, column_limit = request.count, cl = request.consistency_level, permit = std::move(permit)] () mutable { auto timeout = db::timeout_clock::now() + _timeout_config.read_timeout; - return proxy.query(schema, cmd, {dht::partition_range::make_singular(dk)}, cl_from_thrift(cl), {timeout, /* FIXME: pass real permit 
*/empty_service_permit(), _query_state.get_client_state()}).then( + return proxy.query(schema, cmd, {dht::partition_range::make_singular(dk)}, cl_from_thrift(cl), {timeout, std::move(permit), _query_state.get_client_state()}).then( [schema, cmd, column_limit](service::storage_proxy::coordinator_query_result qr) { return query::result_view::do_with(*qr.query_result, [schema, cmd, column_limit](query::result_view v) { column_aggregator aggregator(*schema, cmd->slice, column_limit, { }); @@ -689,6 +714,7 @@ public: } void describe_schema_versions(thrift_fn::function > const& _return)> cob, thrift_fn::function exn_cob) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [] { return service::get_local_storage_service().describe_schema_versions().then([](auto&& m) { std::map> ret; @@ -701,6 +727,7 @@ public: } void describe_keyspaces(thrift_fn::function const& _return)> cob, thrift_fn::function exn_cob) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { validate_login(); std::vector ret; @@ -712,14 +739,17 @@ public: } void describe_cluster_name(thrift_fn::function cob) { + service_permit permit = obtain_permit(); cob(_db.local().get_config().cluster_name()); } void describe_version(thrift_fn::function cob) { + service_permit permit = obtain_permit(); cob(::cassandra::thrift_version); } void do_describe_ring(thrift_fn::function const& _return)> cob, thrift_fn::function exn_cob, const std::string& keyspace, bool local) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { auto& ks = _db.local().find_keyspace(keyspace); if (ks.get_replication_strategy().get_type() == locator::replication_strategy_type::local) { @@ -759,6 +789,7 @@ public: } void describe_token_map(thrift_fn::function const& _return)> cob, thrift_fn::function exn_cob) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [] { auto m = 
service::get_local_storage_service().get_token_to_endpoint_map(); std::map ret; @@ -770,14 +801,17 @@ public: } void describe_partitioner(thrift_fn::function cob) { + service_permit permit = obtain_permit(); cob(_db.local().get_config().partitioner()); } void describe_snitch(thrift_fn::function cob) { + service_permit permit = obtain_permit(); cob(format("org.apache.cassandra.locator.{}", _db.local().get_snitch_name())); } void describe_keyspace(thrift_fn::function cob, thrift_fn::function exn_cob, const std::string& keyspace) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { validate_login(); auto& ks = _db.local().find_keyspace(keyspace); @@ -786,6 +820,7 @@ public: } void describe_splits(thrift_fn::function const& _return)> cob, thrift_fn::function exn_cob, const std::string& cfName, const std::string& start_token, const std::string& end_token, const int32_t keys_per_split) { + service_permit permit = obtain_permit(); return describe_splits_ex([cob = std::move(cob)](auto&& results) { std::vector res; res.reserve(results.size() + 1); @@ -798,12 +833,14 @@ public: } void trace_next_query(thrift_fn::function cob) { + service_permit permit = obtain_permit(); std::string _return; // FIXME: implement return cob("dummy trace"); } void describe_splits_ex(thrift_fn::function const& _return)> cob, thrift_fn::function exn_cob, const std::string& cfName, const std::string& start_token, const std::string& end_token, const int32_t keys_per_split) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&]{ dht::token_range_vector ranges; auto tstart = start_token.empty() ? 
dht::minimum_token() : dht::token::from_sstring(sstring(start_token)); @@ -827,6 +864,7 @@ public: } void system_add_column_family(thrift_fn::function cob, thrift_fn::function exn_cob, const CfDef& cf_def) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { if (!_db.local().has_keyspace(cf_def.keyspace)) { throw NotFoundException(); @@ -844,6 +882,7 @@ public: }); } void system_drop_column_family(thrift_fn::function cob, thrift_fn::function exn_cob, const std::string& column_family) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { return _query_state.get_client_state().has_column_family_access(_db.local(), current_keyspace(), column_family, auth::permission::DROP).then([this, column_family] { auto& cf = _db.local().find_column_family(current_keyspace(), column_family); @@ -861,6 +900,7 @@ public: } void system_add_keyspace(thrift_fn::function cob, thrift_fn::function exn_cob, const KsDef& ks_def) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { auto ksm = keyspace_from_thrift(ks_def); return _query_state.get_client_state().has_all_keyspaces_access(auth::permission::CREATE).then([this, ksm = std::move(ksm)] { @@ -872,6 +912,7 @@ public: } void system_drop_keyspace(thrift_fn::function cob, thrift_fn::function exn_cob, const std::string& keyspace) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { thrift_validation::validate_keyspace_not_system(keyspace); if (!_db.local().has_keyspace(keyspace)) { @@ -887,6 +928,7 @@ public: } void system_update_keyspace(thrift_fn::function cob, thrift_fn::function exn_cob, const KsDef& ks_def) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { thrift_validation::validate_keyspace_not_system(ks_def.name); @@ -907,6 +949,7 @@ public: } void system_update_column_family(thrift_fn::function cob, thrift_fn::function 
exn_cob, const CfDef& cf_def) { + service_permit permit = obtain_permit(); with_cob(std::move(cob), std::move(exn_cob), [&] { auto& cf = _db.local().find_column_family(cf_def.keyspace, cf_def.name); auto schema = cf.schema(); @@ -1964,6 +2007,10 @@ private: } return {std::move(muts), std::move(schemas)}; } +protected: + service_permit obtain_permit() { + return std::move(_current_permit); + } }; class handler_factory : public CassandraCobSvIfFactory { @@ -1971,15 +2018,17 @@ class handler_factory : public CassandraCobSvIfFactory { distributed& _query_processor; auth::service& _auth_service; timeout_config _timeout_config; + service_permit& _current_permit; public: explicit handler_factory(distributed& db, distributed& qp, auth::service& auth_service, - ::timeout_config timeout_config) - : _db(db), _query_processor(qp), _auth_service(auth_service), _timeout_config(timeout_config) {} + ::timeout_config timeout_config, + service_permit& current_permit) + : _db(db), _query_processor(qp), _auth_service(auth_service), _timeout_config(timeout_config), _current_permit(current_permit) {} typedef CassandraCobSvIf Handler; virtual CassandraCobSvIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) { - return new thrift_handler(_db, _query_processor, _auth_service, _timeout_config); + return new thrift_handler(_db, _query_processor, _auth_service, _timeout_config, _current_permit); } virtual void releaseHandler(CassandraCobSvIf* handler) { delete handler; @@ -1988,6 +2037,6 @@ public: std::unique_ptr create_handler_factory(distributed& db, distributed& qp, auth::service& auth_service, - ::timeout_config timeout_config) { - return std::make_unique(db, qp, auth_service, timeout_config); + ::timeout_config timeout_config, service_permit& current_permit) { + return std::make_unique(db, qp, auth_service, timeout_config, current_permit); } diff --git a/thrift/handler.hh b/thrift/handler.hh index 2765fa9417..c56f0e4923 100644 --- a/thrift/handler.hh +++ 
b/thrift/handler.hh @@ -31,6 +31,6 @@ struct timeout_config; -std::unique_ptr<::cassandra::CassandraCobSvIfFactory> create_handler_factory(distributed& db, distributed& qp, auth::service&, timeout_config); +std::unique_ptr<::cassandra::CassandraCobSvIfFactory> create_handler_factory(distributed& db, distributed& qp, auth::service&, timeout_config, service_permit& current_permit); #endif /* APPS_SEASTAR_THRIFT_HANDLER_HH_ */ diff --git a/thrift/server.cc b/thrift/server.cc index 320c4fba68..f3ad1ae61c 100644 --- a/thrift/server.cc +++ b/thrift/server.cc @@ -28,6 +28,7 @@ #include #include #include +#include #include "log.hh" #include #include @@ -66,11 +67,13 @@ public: thrift_server::thrift_server(distributed& db, distributed& qp, auth::service& auth_service, + service::memory_limiter& ml, thrift_server_config config) : _stats(new thrift_stats(*this)) - , _handler_factory(create_handler_factory(db, qp, auth_service, config.timeout_config).release()) + , _handler_factory(create_handler_factory(db, qp, auth_service, config.timeout_config, _current_permit).release()) , _protocol_factory(new TBinaryProtocolFactoryT()) , _processor_factory(new CassandraAsyncProcessorFactory(_handler_factory)) + , _memory_available(ml.get_semaphore()) , _config(config) { } @@ -158,8 +161,23 @@ thrift_server::connection::process_one_request() { write().forward_to(std::move(_processor_promise)); _processor_promise = promise<>(); }; + // Heuristics copied from transport/server.cc + size_t mem_estimate = 8000 + 2 * _input->available_read(); + auto units = co_await get_units(_server._memory_available, mem_estimate); + // NOTICE: this permit is put in the server under the assumption that no other + // connection will overwrite this permit *until* it's extracted by the code + // which handles the Thrift request (via calling obtain_permit()). 
+ // This assumption is true because there are no preemption points between this + // insertion and the call to obtain_permit(), which was verified both by + // code inspection and confirmed empirically by running manual tests. + if (_server._current_permit.count() > 0) { + tlogger.debug("Current service permit is overwritten while its units are still held ({}). " + "This situation likely means that there's a bug in passing service permits to message handlers.", + _server._current_permit.count()); + } + _server._current_permit = make_service_permit(std::move(units)); _processor->process(complete, _in_proto, _out_proto); - co_return co_await ret; + co_return co_await std::move(ret); } future<> @@ -289,6 +307,16 @@ thrift_server::requests_served() const { return _requests_served; } +size_t +thrift_server::max_request_size() const { + return _config.max_request_size; +} + +const semaphore& +thrift_server::memory_available() const { + return _memory_available; +} + thrift_stats::thrift_stats(thrift_server& server) { namespace sm = seastar::metrics; diff --git a/thrift/server.hh b/thrift/server.hh index 80b4c539a9..25228c0185 100644 --- a/thrift/server.hh +++ b/thrift/server.hh @@ -26,6 +26,7 @@ #include #include "cql3/query_processor.hh" #include "timeout_config.hh" +#include "service/memory_limiter.hh" #include #include #include @@ -76,6 +77,7 @@ class service; struct thrift_server_config { ::timeout_config timeout_config; uint64_t max_request_size; + std::function get_service_memory_limiter_semaphore; }; class thrift_server { @@ -107,17 +109,19 @@ class thrift_server { private: std::vector _listeners; std::unique_ptr _stats; + service_permit _current_permit = empty_service_permit(); thrift_std::shared_ptr<::cassandra::CassandraCobSvIfFactory> _handler_factory; std::unique_ptr _protocol_factory; thrift_std::shared_ptr _processor_factory; uint64_t _total_connections = 0; uint64_t _current_connections = 0; uint64_t _requests_served = 0; + semaphore& _memory_available; 
thrift_server_config _config; boost::intrusive::list _connections_list; seastar::gate _stop_gate; public: - thrift_server(distributed& db, distributed& qp, auth::service&, thrift_server_config config); + thrift_server(distributed& db, distributed& qp, auth::service&, service::memory_limiter& ml, thrift_server_config config); ~thrift_server(); future<> listen(socket_address addr, bool keepalive); future<> stop(); @@ -125,6 +129,11 @@ public: uint64_t total_connections() const; uint64_t current_connections() const; uint64_t requests_served() const; + size_t max_request_size() const; + const semaphore& memory_available() const; + +private: + void maybe_retry_accept(int which, bool keepalive, std::exception_ptr ex); }; #endif /* APPS_SEASTAR_THRIFT_SERVER_HH_ */