thrift: partially add admission control

This commit adds admission control in the form of passing
service permits to the Thrift server.
The support is partial, because Thrift also supports running CQL
queries, and for that purpose a query_state object is kept
in the Thrift handler. However, the handler is generally created
once per connection, not once per query, and the query_state object
is supposed to keep the state of a single query only.
In order to keep this series simpler, the CQL-on-top-of-Thrift
layer is not touched and is left as TODO.
Moreover, the Thrift layer does not make it easy to pass custom
per-query context (like service_permit), so the implementation
uses a trick: the service permit is created on the server
and then passed as reference to its connections and their respective
Thrift handlers. Then, each time a query is read from the socket,
this service permit is overwritten and then read back from the Thrift
handler. This mechanism heavily relies on the fact that there are
zero preemption points between overwriting the service permit
and reading it back by the handler. Otherwise, races may occur.
This assumption was verified by code inspection + empirical tests,
but if somebody is aware that it may not always hold, please speak up.
This commit is contained in:
Piotr Sarna
2021-03-18 09:48:23 +01:00
parent 3388694e69
commit ef1de114f0
7 changed files with 132 additions and 42 deletions

View File

@@ -1304,7 +1304,7 @@ int main(int ac, char** av) {
api::unset_transport_controller(ctx).get();
});
::thrift_controller thrift_ctl(db, auth_service, qp);
::thrift_controller thrift_ctl(db, auth_service, qp, service_memory_limiter);
ss.register_client_shutdown_hook("rpc server", [&thrift_ctl] {
thrift_ctl.stop().get();

View File

@@ -24,14 +24,16 @@
#include "database.hh"
#include "db/config.hh"
#include "log.hh"
#include "service/storage_service.hh"
static logging::logger clogger("thrift_controller");
thrift_controller::thrift_controller(distributed<database>& db, sharded<auth::service>& auth, sharded<cql3::query_processor>& qp)
thrift_controller::thrift_controller(distributed<database>& db, sharded<auth::service>& auth, sharded<cql3::query_processor>& qp, sharded<service::memory_limiter>& ml)
: _ops_sem(1)
, _db(db)
, _auth_service(auth)
, _qp(qp) {
, _qp(qp)
, _mem_limiter(ml) {
}
future<> thrift_controller::start_server() {
@@ -62,7 +64,7 @@ future<> thrift_controller::do_start_server() {
tsc.timeout_config = make_timeout_config(cfg);
tsc.max_request_size = cfg.thrift_max_message_length_in_mb() * (uint64_t(1) << 20);
return gms::inet_address::lookup(addr, family, preferred).then([this, tserver, addr, port, keepalive, tsc] (gms::inet_address ip) {
return tserver->start(std::ref(_db), std::ref(_qp), std::ref(_auth_service), tsc).then([tserver, port, addr, ip, keepalive] {
return tserver->start(std::ref(_db), std::ref(_qp), std::ref(_auth_service), std::ref(_mem_limiter), tsc).then([tserver, port, addr, ip, keepalive] {
// #293 - do not stop anything
//engine().at_exit([tserver] {
// return tserver->stop();

View File

@@ -24,6 +24,7 @@
#include <seastar/core/semaphore.hh>
#include <seastar/core/distributed.hh>
#include <seastar/core/future.hh>
#include "service/memory_limiter.hh"
using namespace seastar;
@@ -40,12 +41,13 @@ class thrift_controller {
distributed<database>& _db;
sharded<auth::service>& _auth_service;
sharded<cql3::query_processor>& _qp;
sharded<service::memory_limiter>& _mem_limiter;
future<> do_start_server();
future<> do_stop_server();
public:
thrift_controller(distributed<database>&, sharded<auth::service>&, sharded<cql3::query_processor>&);
thrift_controller(distributed<database>&, sharded<auth::service>&, sharded<cql3::query_processor>&, sharded<service::memory_limiter>&);
future<> start_server();
future<> stop_server();
future<> stop();

View File

@@ -204,6 +204,7 @@ class thrift_handler : public CassandraCobSvIf {
::timeout_config _timeout_config;
service::client_state _client_state;
service::query_state _query_state;
service_permit& _current_permit;
private:
template <typename Cob, typename Func>
void
@@ -217,12 +218,16 @@ private:
});
}
public:
explicit thrift_handler(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service& auth_service, ::timeout_config timeout_config)
explicit thrift_handler(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service& auth_service, ::timeout_config timeout_config, service_permit& current_permit)
: _db(db)
, _query_processor(qp)
, _timeout_config(timeout_config)
, _client_state(service::client_state::external_tag{}, auth_service, _timeout_config, socket_address(), true)
// FIXME: Handlers are not created per query, but rather per connection, so it makes little sense to store
// service permits in here. The query state should be reinstantiated per query - AFAIK it's only used
// for CQL queries which piggy-back on Thrift protocol.
, _query_state(_client_state, /*FIXME: pass real permit*/empty_service_permit())
, _current_permit(current_permit)
{ }
const sstring& current_keyspace() const {
@@ -234,6 +239,7 @@ public:
};
void login(thrift_fn::function<void()> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const AuthenticationRequest& auth_request) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [&] {
auth::authenticator::credentials_map creds(auth_request.credentials.begin(), auth_request.credentials.end());
auto& auth_service = *_query_state.get_client_state().get_auth_service();
@@ -244,12 +250,14 @@ public:
}
void set_keyspace(thrift_fn::function<void()> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& keyspace) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [&] {
_query_state.get_client_state().set_keyspace(_db.local(), keyspace);
});
}
void get(thrift_fn::function<void(ColumnOrSuperColumn const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& key, const ColumnPath& column_path, const ConsistencyLevel::type consistency_level) {
service_permit permit = obtain_permit();
return get_slice([cob = std::move(cob), &column_path](auto&& results) {
if (results.empty()) {
throw NotFoundException();
@@ -259,6 +267,7 @@ public:
}
void get_slice(thrift_fn::function<void(std::vector<ColumnOrSuperColumn> const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& key, const ColumnParent& column_parent, const SlicePredicate& predicate, const ConsistencyLevel::type consistency_level) {
service_permit permit = obtain_permit();
return multiget_slice([cob = std::move(cob)](auto&& results) {
if (!results.empty()) {
return cob(std::move(results.begin()->second));
@@ -268,6 +277,7 @@ public:
}
void get_count(thrift_fn::function<void(int32_t const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& key, const ColumnParent& column_parent, const SlicePredicate& predicate, const ConsistencyLevel::type consistency_level) {
service_permit permit = obtain_permit();
return multiget_count([cob = std::move(cob)](auto&& results) {
if (!results.empty()) {
return cob(results.begin()->second);
@@ -277,6 +287,7 @@ public:
}
void multiget_slice(thrift_fn::function<void(std::map<std::string, std::vector<ColumnOrSuperColumn> > const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::vector<std::string> & keys, const ColumnParent& column_parent, const SlicePredicate& predicate, const ConsistencyLevel::type consistency_level) {
service_permit permit = obtain_permit();
with_schema(std::move(cob), std::move(exn_cob), column_parent.column_family, [&](schema_ptr schema) {
if (!column_parent.super_column.empty()) {
fail(unimplemented::cause::SUPER);
@@ -286,9 +297,9 @@ public:
auto cell_limit = predicate.__isset.slice_range ? static_cast<uint32_t>(predicate.slice_range.count) : std::numeric_limits<uint32_t>::max();
auto pranges = make_partition_ranges(*schema, keys);
auto f = _query_state.get_client_state().has_schema_access(*schema, auth::permission::SELECT);
return f.then([this, &proxy, schema, cmd, pranges = std::move(pranges), cell_limit, consistency_level, keys]() mutable {
return f.then([this, &proxy, schema, cmd, pranges = std::move(pranges), cell_limit, consistency_level, keys, permit = std::move(permit)]() mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.read_timeout;
return proxy.query(schema, cmd, std::move(pranges), cl_from_thrift(consistency_level), {timeout, empty_service_permit(), _query_state.get_client_state()}).then(
return proxy.query(schema, cmd, std::move(pranges), cl_from_thrift(consistency_level), {timeout, std::move(permit), _query_state.get_client_state()}).then(
[schema, cmd, cell_limit, keys = std::move(keys)](service::storage_proxy::coordinator_query_result qr) {
return query::result_view::do_with(*qr.query_result, [schema, cmd, cell_limit, keys = std::move(keys)](query::result_view v) mutable {
if (schema->is_counter()) {
@@ -306,6 +317,7 @@ public:
}
void multiget_count(thrift_fn::function<void(std::map<std::string, int32_t> const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::vector<std::string> & keys, const ColumnParent& column_parent, const SlicePredicate& predicate, const ConsistencyLevel::type consistency_level) {
service_permit permit = obtain_permit();
with_schema(std::move(cob), std::move(exn_cob), column_parent.column_family, [&](schema_ptr schema) {
if (!column_parent.super_column.empty()) {
fail(unimplemented::cause::SUPER);
@@ -315,9 +327,9 @@ public:
auto cell_limit = predicate.__isset.slice_range ? static_cast<uint32_t>(predicate.slice_range.count) : std::numeric_limits<uint32_t>::max();
auto pranges = make_partition_ranges(*schema, keys);
auto f = _query_state.get_client_state().has_schema_access(*schema, auth::permission::SELECT);
return f.then([this, &proxy, schema, cmd, pranges = std::move(pranges), cell_limit, consistency_level, keys]() mutable {
return f.then([this, &proxy, schema, cmd, pranges = std::move(pranges), cell_limit, consistency_level, keys, permit = std::move(permit)]() mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.read_timeout;
return proxy.query(schema, cmd, std::move(pranges), cl_from_thrift(consistency_level), {timeout, empty_service_permit(), _query_state.get_client_state()}).then(
return proxy.query(schema, cmd, std::move(pranges), cl_from_thrift(consistency_level), {timeout, std::move(permit), _query_state.get_client_state()}).then(
[schema, cmd, cell_limit, keys = std::move(keys)](service::storage_proxy::coordinator_query_result qr) {
return query::result_view::do_with(*qr.query_result, [schema, cmd, cell_limit, keys = std::move(keys)](query::result_view v) mutable {
column_counter counter(*schema, cmd->slice, cell_limit, std::move(keys));
@@ -336,6 +348,7 @@ public:
* now our behavior differs from Origin.
*/
void get_range_slices(thrift_fn::function<void(std::vector<KeySlice> const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const ColumnParent& column_parent, const SlicePredicate& predicate, const KeyRange& range, const ConsistencyLevel::type consistency_level) {
service_permit permit = obtain_permit();
with_schema(std::move(cob), std::move(exn_cob), column_parent.column_family, [&](schema_ptr schema) {
if (!column_parent.super_column.empty()) {
fail(unimplemented::cause::SUPER);
@@ -353,9 +366,9 @@ public:
cmd->set_row_limit(static_cast<uint64_t>(range.count));
}
auto f = _query_state.get_client_state().has_schema_access(*schema, auth::permission::SELECT);
return f.then([this, &proxy, schema, cmd, prange = std::move(prange), consistency_level] () mutable {
return f.then([this, &proxy, schema, cmd, prange = std::move(prange), consistency_level, permit = std::move(permit)] () mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.range_read_timeout;
return proxy.query(schema, cmd, std::move(prange), cl_from_thrift(consistency_level), {timeout, empty_service_permit(), _query_state.get_client_state()}).then(
return proxy.query(schema, cmd, std::move(prange), cl_from_thrift(consistency_level), {timeout, std::move(permit), _query_state.get_client_state()}).then(
[schema, cmd](service::storage_proxy::coordinator_query_result qr) {
return query::result_view::do_with(*qr.query_result, [schema, cmd](query::result_view v) {
return to_key_slices(*schema, cmd->slice, v, std::numeric_limits<uint32_t>::max());
@@ -409,7 +422,8 @@ public:
db::consistency_level consistency_level,
const ::timeout_config& timeout_config,
std::vector<KeySlice>& output,
service::query_state& qs) {
service::query_state& qs,
service_permit permit) {
auto& proxy = service::get_local_storage_proxy();
auto cmd = make_paged_read_cmd(proxy, *schema, column_limit, start_column, range);
std::optional<partition_key> start_key;
@@ -422,12 +436,12 @@ public:
}
auto range1 = range; // query() below accepts an rvalue, so need a copy to reuse later
auto timeout = db::timeout_clock::now() + timeout_config.range_read_timeout;
return proxy.query(schema, cmd, std::move(range), consistency_level, {timeout, empty_service_permit(), qs.get_client_state()}).then(
return proxy.query(schema, cmd, std::move(range), consistency_level, {timeout, std::move(permit), qs.get_client_state()}).then(
[schema, cmd, column_limit](service::storage_proxy::coordinator_query_result qr) {
return query::result_view::do_with(*qr.query_result, [schema, cmd, column_limit](query::result_view v) {
return to_key_slices(*schema, cmd->slice, v, column_limit);
});
}).then([schema, cmd, column_limit, range = std::move(range1), consistency_level, start_key = std::move(start_key), end = std::move(end), &timeout_config, &output, &qs](auto&& slices) mutable {
}).then([schema, cmd, column_limit, range = std::move(range1), consistency_level, start_key = std::move(start_key), end = std::move(end), &timeout_config, &output, &qs, permit = std::move(permit)](auto&& slices) mutable {
auto columns = std::accumulate(slices.begin(), slices.end(), 0u, [](auto&& acc, auto&& ks) {
return acc + ks.columns.size();
});
@@ -436,7 +450,7 @@ public:
if (!output.empty() || !start_key) {
if (range.size() > 1 && columns < column_limit) {
range.erase(range.begin());
return do_get_paged_slice(std::move(schema), column_limit - columns, std::move(range), nullptr, consistency_level, timeout_config, output, qs);
return do_get_paged_slice(std::move(schema), column_limit - columns, std::move(range), nullptr, consistency_level, timeout_config, output, qs, std::move(permit));
}
return make_ready_future();
}
@@ -446,11 +460,12 @@ public:
}
auto start = dht::decorate_key(*schema, std::move(*start_key));
range[0] = dht::partition_range(dht::partition_range::bound(std::move(start), false), std::move(end));
return do_get_paged_slice(schema, column_limit - columns, std::move(range), nullptr, consistency_level, timeout_config, output, qs);
return do_get_paged_slice(schema, column_limit - columns, std::move(range), nullptr, consistency_level, timeout_config, output, qs, std::move(permit));
});
}
void get_paged_slice(thrift_fn::function<void(std::vector<KeySlice> const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& column_family, const KeyRange& range, const std::string& start_column, const ConsistencyLevel::type consistency_level) {
service_permit permit = obtain_permit();
with_schema(std::move(cob), std::move(exn_cob), column_family, [&](schema_ptr schema) {
return do_with(std::vector<KeySlice>(), [&](auto& output) {
if (range.__isset.row_filter) {
@@ -468,9 +483,9 @@ public:
}
}
auto f = _query_state.get_client_state().has_schema_access(*schema, auth::permission::SELECT);
return f.then([this, schema, count = range.count, start_column, prange = std::move(prange), consistency_level, &output] () mutable {
return f.then([this, schema, count = range.count, start_column, prange = std::move(prange), consistency_level, &output, permit = std::move(permit)] () mutable {
return do_get_paged_slice(std::move(schema), count, std::move(prange), &start_column,
cl_from_thrift(consistency_level), _timeout_config, output, _query_state).then([&output] {
cl_from_thrift(consistency_level), _timeout_config, output, _query_state, std::move(permit)).then([&output] {
return std::move(output);
});
});
@@ -479,6 +494,7 @@ public:
}
void get_indexed_slices(thrift_fn::function<void(std::vector<KeySlice> const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const ColumnParent& column_parent, const IndexClause& index_clause, const SlicePredicate& column_predicate, const ConsistencyLevel::type consistency_level) {
service_permit permit = obtain_permit();
std::vector<KeySlice> _return;
warn(unimplemented::cause::INDEXES);
// FIXME: implement
@@ -486,6 +502,7 @@ public:
}
void insert(thrift_fn::function<void()> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& key, const ColumnParent& column_parent, const Column& column, const ConsistencyLevel::type consistency_level) {
service_permit permit = obtain_permit();
with_schema(std::move(cob), std::move(exn_cob), column_parent.column_family, [&](schema_ptr schema) {
if (column_parent.__isset.super_column) {
fail(unimplemented::cause::SUPER);
@@ -497,14 +514,15 @@ public:
mutation m_to_apply(schema, key_from_thrift(*schema, to_bytes_view(key)));
add_to_mutation(*schema, column, m_to_apply);
return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level] {
return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level, permit = std::move(permit)] () mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout;
return service::get_local_storage_proxy().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, /*FIXME: pass real permit*/empty_service_permit());
return service::get_local_storage_proxy().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit));
});
});
}
void add(thrift_fn::function<void()> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& key, const ColumnParent& column_parent, const CounterColumn& column, const ConsistencyLevel::type consistency_level) {
service_permit permit = obtain_permit();
with_schema(std::move(cob), std::move(exn_cob), column_parent.column_family, [&](schema_ptr schema) {
if (column_parent.__isset.super_column) {
fail(unimplemented::cause::SUPER);
@@ -512,14 +530,15 @@ public:
mutation m_to_apply(schema, key_from_thrift(*schema, to_bytes_view(key)));
add_to_mutation(*schema, column, m_to_apply);
return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level] {
return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level, permit = std::move(permit)] () mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout;
return service::get_local_storage_proxy().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, /*FIXME: pass real permit*/empty_service_permit());
return service::get_local_storage_proxy().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit));
});
});
}
void cas(thrift_fn::function<void(CASResult const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& key, const std::string& column_family, const std::vector<Column> & expected, const std::vector<Column> & updates, const ConsistencyLevel::type serial_consistency_level, const ConsistencyLevel::type commit_consistency_level) {
service_permit permit = obtain_permit();
CASResult _return;
warn(unimplemented::cause::LWT);
// FIXME: implement
@@ -527,6 +546,7 @@ public:
}
void remove(thrift_fn::function<void()> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& key, const ColumnPath& column_path, const int64_t timestamp, const ConsistencyLevel::type consistency_level) {
service_permit permit = obtain_permit();
with_schema(std::move(cob), std::move(exn_cob), column_path.column_family, [&](schema_ptr schema) {
if (schema->is_view()) {
throw make_exception<InvalidRequestException>("Cannot modify Materialized Views directly");
@@ -547,14 +567,15 @@ public:
m_to_apply.partition().apply(tombstone(timestamp, gc_clock::now()));
}
return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level] {
return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level, permit = std::move(permit)] () mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout;
return service::get_local_storage_proxy().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, /*FIXME: pass real permit*/empty_service_permit());
return service::get_local_storage_proxy().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit));
});
});
}
void remove_counter(thrift_fn::function<void()> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& key, const ColumnPath& column_path, const ConsistencyLevel::type consistency_level) {
service_permit permit = obtain_permit();
with_schema(std::move(cob), std::move(exn_cob), column_path.column_family, [&](schema_ptr schema) {
mutation m_to_apply(schema, key_from_thrift(*schema, to_bytes_view(key)));
@@ -572,39 +593,42 @@ public:
m_to_apply.partition().apply(tombstone(timestamp, gc_clock::now()));
}
return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level] {
return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY).then([this, m_to_apply = std::move(m_to_apply), consistency_level, permit = std::move(permit)] () mutable {
// This mutation contains only counter tombstones so it can be applied like non-counter mutations.
auto timeout = db::timeout_clock::now() + _timeout_config.counter_write_timeout;
return service::get_local_storage_proxy().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, /*FIXME: pass real permit*/empty_service_permit());
return service::get_local_storage_proxy().mutate({std::move(m_to_apply)}, cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit));
});
});
}
void batch_mutate(thrift_fn::function<void()> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::map<std::string, std::map<std::string, std::vector<Mutation> > > & mutation_map, const ConsistencyLevel::type consistency_level) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [&] {
auto p = prepare_mutations(_db.local(), current_keyspace(), mutation_map);
return parallel_for_each(std::move(p.second), [this](auto&& schema) {
return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY);
}).then([this, muts = std::move(p.first), consistency_level] {
}).then([this, muts = std::move(p.first), consistency_level, permit = std::move(permit)] () mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout;
return service::get_local_storage_proxy().mutate(std::move(muts), cl_from_thrift(consistency_level), timeout, nullptr, /*FIXME: pass real permit*/empty_service_permit());
return service::get_local_storage_proxy().mutate(std::move(muts), cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit));
});
});
}
void atomic_batch_mutate(thrift_fn::function<void()> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::map<std::string, std::map<std::string, std::vector<Mutation> > > & mutation_map, const ConsistencyLevel::type consistency_level) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [&] {
auto p = prepare_mutations(_db.local(), current_keyspace(), mutation_map);
return parallel_for_each(std::move(p.second), [this](auto&& schema) {
return _query_state.get_client_state().has_schema_access(*schema, auth::permission::MODIFY);
}).then([this, muts = std::move(p.first), consistency_level] {
}).then([this, muts = std::move(p.first), consistency_level, permit = std::move(permit)] () mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.write_timeout;
return service::get_local_storage_proxy().mutate_atomically(std::move(muts), cl_from_thrift(consistency_level), timeout, nullptr, /*FIXME: pass real permit*/empty_service_permit());
return service::get_local_storage_proxy().mutate_atomically(std::move(muts), cl_from_thrift(consistency_level), timeout, nullptr, std::move(permit));
});
});
}
void truncate(thrift_fn::function<void()> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& cfname) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [&] {
if (current_keyspace().empty()) {
throw make_exception<InvalidRequestException>("keyspace not set");
@@ -620,6 +644,7 @@ public:
}
void get_multi_slice(thrift_fn::function<void(std::vector<ColumnOrSuperColumn> const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const MultiSliceRequest& request) {
service_permit permit = obtain_permit();
with_schema(std::move(cob), std::move(exn_cob), request.column_parent.column_family, [&](schema_ptr schema) {
if (!request.__isset.key) {
throw make_exception<InvalidRequestException>("Key may not be empty");
@@ -673,9 +698,9 @@ public:
auto cmd = make_lw_shared<query::read_command>(schema->id(), schema->version(), std::move(slice), proxy.get_max_result_size(slice),
query::row_limit(row_limit));
auto f = _query_state.get_client_state().has_schema_access(*schema, auth::permission::SELECT);
return f.then([this, &proxy, dk = std::move(dk), cmd, schema, column_limit = request.count, cl = request.consistency_level] {
return f.then([this, &proxy, dk = std::move(dk), cmd, schema, column_limit = request.count, cl = request.consistency_level, permit = std::move(permit)] () mutable {
auto timeout = db::timeout_clock::now() + _timeout_config.read_timeout;
return proxy.query(schema, cmd, {dht::partition_range::make_singular(dk)}, cl_from_thrift(cl), {timeout, /* FIXME: pass real permit */empty_service_permit(), _query_state.get_client_state()}).then(
return proxy.query(schema, cmd, {dht::partition_range::make_singular(dk)}, cl_from_thrift(cl), {timeout, std::move(permit), _query_state.get_client_state()}).then(
[schema, cmd, column_limit](service::storage_proxy::coordinator_query_result qr) {
return query::result_view::do_with(*qr.query_result, [schema, cmd, column_limit](query::result_view v) {
column_aggregator<query_order::no> aggregator(*schema, cmd->slice, column_limit, { });
@@ -689,6 +714,7 @@ public:
}
void describe_schema_versions(thrift_fn::function<void(std::map<std::string, std::vector<std::string> > const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [] {
return service::get_local_storage_service().describe_schema_versions().then([](auto&& m) {
std::map<std::string, std::vector<std::string>> ret;
@@ -701,6 +727,7 @@ public:
}
void describe_keyspaces(thrift_fn::function<void(std::vector<KsDef> const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [&] {
validate_login();
std::vector<KsDef> ret;
@@ -712,14 +739,17 @@ public:
}
void describe_cluster_name(thrift_fn::function<void(std::string const& _return)> cob) {
service_permit permit = obtain_permit();
cob(_db.local().get_config().cluster_name());
}
void describe_version(thrift_fn::function<void(std::string const& _return)> cob) {
service_permit permit = obtain_permit();
cob(::cassandra::thrift_version);
}
void do_describe_ring(thrift_fn::function<void(std::vector<TokenRange> const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& keyspace, bool local) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [&] {
auto& ks = _db.local().find_keyspace(keyspace);
if (ks.get_replication_strategy().get_type() == locator::replication_strategy_type::local) {
@@ -759,6 +789,7 @@ public:
}
void describe_token_map(thrift_fn::function<void(std::map<std::string, std::string> const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [] {
auto m = service::get_local_storage_service().get_token_to_endpoint_map();
std::map<std::string, std::string> ret;
@@ -770,14 +801,17 @@ public:
}
void describe_partitioner(thrift_fn::function<void(std::string const& _return)> cob) {
service_permit permit = obtain_permit();
cob(_db.local().get_config().partitioner());
}
void describe_snitch(thrift_fn::function<void(std::string const& _return)> cob) {
service_permit permit = obtain_permit();
cob(format("org.apache.cassandra.locator.{}", _db.local().get_snitch_name()));
}
void describe_keyspace(thrift_fn::function<void(KsDef const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& keyspace) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [&] {
validate_login();
auto& ks = _db.local().find_keyspace(keyspace);
@@ -786,6 +820,7 @@ public:
}
void describe_splits(thrift_fn::function<void(std::vector<std::string> const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& cfName, const std::string& start_token, const std::string& end_token, const int32_t keys_per_split) {
service_permit permit = obtain_permit();
return describe_splits_ex([cob = std::move(cob)](auto&& results) {
std::vector<std::string> res;
res.reserve(results.size() + 1);
@@ -798,12 +833,14 @@ public:
}
void trace_next_query(thrift_fn::function<void(std::string const& _return)> cob) {
service_permit permit = obtain_permit();
std::string _return;
// FIXME: implement
return cob("dummy trace");
}
void describe_splits_ex(thrift_fn::function<void(std::vector<CfSplit> const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& cfName, const std::string& start_token, const std::string& end_token, const int32_t keys_per_split) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [&]{
dht::token_range_vector ranges;
auto tstart = start_token.empty() ? dht::minimum_token() : dht::token::from_sstring(sstring(start_token));
@@ -827,6 +864,7 @@ public:
}
void system_add_column_family(thrift_fn::function<void(std::string const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const CfDef& cf_def) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [&] {
if (!_db.local().has_keyspace(cf_def.keyspace)) {
throw NotFoundException();
@@ -844,6 +882,7 @@ public:
});
}
void system_drop_column_family(thrift_fn::function<void(std::string const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& column_family) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [&] {
return _query_state.get_client_state().has_column_family_access(_db.local(), current_keyspace(), column_family, auth::permission::DROP).then([this, column_family] {
auto& cf = _db.local().find_column_family(current_keyspace(), column_family);
@@ -861,6 +900,7 @@ public:
}
void system_add_keyspace(thrift_fn::function<void(std::string const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const KsDef& ks_def) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [&] {
auto ksm = keyspace_from_thrift(ks_def);
return _query_state.get_client_state().has_all_keyspaces_access(auth::permission::CREATE).then([this, ksm = std::move(ksm)] {
@@ -872,6 +912,7 @@ public:
}
void system_drop_keyspace(thrift_fn::function<void(std::string const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& keyspace) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [&] {
thrift_validation::validate_keyspace_not_system(keyspace);
if (!_db.local().has_keyspace(keyspace)) {
@@ -887,6 +928,7 @@ public:
}
void system_update_keyspace(thrift_fn::function<void(std::string const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const KsDef& ks_def) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [&] {
thrift_validation::validate_keyspace_not_system(ks_def.name);
@@ -907,6 +949,7 @@ public:
}
void system_update_column_family(thrift_fn::function<void(std::string const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const CfDef& cf_def) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [&] {
auto& cf = _db.local().find_column_family(cf_def.keyspace, cf_def.name);
auto schema = cf.schema();
@@ -1964,6 +2007,10 @@ private:
}
return {std::move(muts), std::move(schemas)};
}
protected:
service_permit obtain_permit() {
return std::move(_current_permit);
}
};
class handler_factory : public CassandraCobSvIfFactory {
@@ -1971,15 +2018,17 @@ class handler_factory : public CassandraCobSvIfFactory {
distributed<cql3::query_processor>& _query_processor;
auth::service& _auth_service;
timeout_config _timeout_config;
service_permit& _current_permit;
public:
explicit handler_factory(distributed<database>& db,
distributed<cql3::query_processor>& qp,
auth::service& auth_service,
::timeout_config timeout_config)
: _db(db), _query_processor(qp), _auth_service(auth_service), _timeout_config(timeout_config) {}
::timeout_config timeout_config,
service_permit& current_permit)
: _db(db), _query_processor(qp), _auth_service(auth_service), _timeout_config(timeout_config), _current_permit(current_permit) {}
typedef CassandraCobSvIf Handler;
virtual CassandraCobSvIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) {
return new thrift_handler(_db, _query_processor, _auth_service, _timeout_config);
return new thrift_handler(_db, _query_processor, _auth_service, _timeout_config, _current_permit);
}
virtual void releaseHandler(CassandraCobSvIf* handler) {
delete handler;
@@ -1988,6 +2037,6 @@ public:
std::unique_ptr<CassandraCobSvIfFactory>
create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service& auth_service,
::timeout_config timeout_config) {
return std::make_unique<handler_factory>(db, qp, auth_service, timeout_config);
::timeout_config timeout_config, service_permit& current_permit) {
return std::make_unique<handler_factory>(db, qp, auth_service, timeout_config, current_permit);
}

View File

@@ -31,6 +31,6 @@
struct timeout_config;
std::unique_ptr<::cassandra::CassandraCobSvIfFactory> create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service&, timeout_config);
std::unique_ptr<::cassandra::CassandraCobSvIfFactory> create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service&, timeout_config, service_permit& current_permit);
#endif /* APPS_SEASTAR_THRIFT_HANDLER_HH_ */

View File

@@ -28,6 +28,7 @@
#include <seastar/core/scattered_message.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/semaphore.hh>
#include "log.hh"
#include <thrift/server/TServer.h>
#include <thrift/transport/TBufferTransports.h>
@@ -66,11 +67,13 @@ public:
thrift_server::thrift_server(distributed<database>& db,
distributed<cql3::query_processor>& qp,
auth::service& auth_service,
service::memory_limiter& ml,
thrift_server_config config)
: _stats(new thrift_stats(*this))
, _handler_factory(create_handler_factory(db, qp, auth_service, config.timeout_config).release())
, _handler_factory(create_handler_factory(db, qp, auth_service, config.timeout_config, _current_permit).release())
, _protocol_factory(new TBinaryProtocolFactoryT<TMemoryBuffer>())
, _processor_factory(new CassandraAsyncProcessorFactory(_handler_factory))
, _memory_available(ml.get_semaphore())
, _config(config) {
}
@@ -158,8 +161,23 @@ thrift_server::connection::process_one_request() {
write().forward_to(std::move(_processor_promise));
_processor_promise = promise<>();
};
// Heuristics copied from transport/server.cc
size_t mem_estimate = 8000 + 2 * _input->available_read();
auto units = co_await get_units(_server._memory_available, mem_estimate);
// NOTICE: this permit is put in the server under the assumption that no other
// connection will overwrite this permit *until* it's extracted by the code
// which handles the Thrift request (via calling obtain_permit()).
// This assumption is true because there are no preemption points between this
// insertion and the call to obtain_permit(), which was verified both by
// code inspection and confirmed empirically by running manual tests.
if (_server._current_permit.count() > 0) {
tlogger.debug("Current service permit is overwritten while its units are still held ({}). "
"This situation likely means that there's a bug in passing service permits to message handlers.",
_server._current_permit.count());
}
_server._current_permit = make_service_permit(std::move(units));
_processor->process(complete, _in_proto, _out_proto);
co_return co_await ret;
co_return co_await std::move(ret);
}
future<>
@@ -289,6 +307,16 @@ thrift_server::requests_served() const {
return _requests_served;
}
size_t
thrift_server::max_request_size() const {
return _config.max_request_size;
}
const semaphore&
thrift_server::memory_available() const {
return _memory_available;
}
thrift_stats::thrift_stats(thrift_server& server) {
namespace sm = seastar::metrics;

View File

@@ -26,6 +26,7 @@
#include <seastar/core/distributed.hh>
#include "cql3/query_processor.hh"
#include "timeout_config.hh"
#include "service/memory_limiter.hh"
#include <seastar/core/gate.hh>
#include <memory>
#include <cstdint>
@@ -76,6 +77,7 @@ class service;
struct thrift_server_config {
::timeout_config timeout_config;
uint64_t max_request_size;
std::function<semaphore& ()> get_service_memory_limiter_semaphore;
};
class thrift_server {
@@ -107,17 +109,19 @@ class thrift_server {
private:
std::vector<server_socket> _listeners;
std::unique_ptr<thrift_stats> _stats;
service_permit _current_permit = empty_service_permit();
thrift_std::shared_ptr<::cassandra::CassandraCobSvIfFactory> _handler_factory;
std::unique_ptr<apache::thrift::protocol::TProtocolFactory> _protocol_factory;
thrift_std::shared_ptr<apache::thrift::async::TAsyncProcessorFactory> _processor_factory;
uint64_t _total_connections = 0;
uint64_t _current_connections = 0;
uint64_t _requests_served = 0;
semaphore& _memory_available;
thrift_server_config _config;
boost::intrusive::list<connection> _connections_list;
seastar::gate _stop_gate;
public:
thrift_server(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service&, thrift_server_config config);
thrift_server(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service&, service::memory_limiter& ml, thrift_server_config config);
~thrift_server();
future<> listen(socket_address addr, bool keepalive);
future<> stop();
@@ -125,6 +129,11 @@ public:
uint64_t total_connections() const;
uint64_t current_connections() const;
uint64_t requests_served() const;
size_t max_request_size() const;
const semaphore& memory_available() const;
private:
void maybe_retry_accept(int which, bool keepalive, std::exception_ptr ex);
};
#endif /* APPS_SEASTAR_THRIFT_SERVER_HH_ */