transport: use result_try in process_request_one

Adapts the exception handling logic in process_request_one so that it
uses utils::result_try to handle both C++ exceptions and failed results
in a unified way.
This commit is contained in:
Piotr Dulikowski
2022-02-07 01:41:45 +01:00
parent 98bde8d6d2
commit 049564bd2d

View File

@@ -34,7 +34,7 @@
#include <seastar/net/byteorder.hh>
#include <seastar/util/lazy.hh>
#include <seastar/core/execution_stage.hh>
#include "utils/overloaded_functor.hh"
#include "utils/result_try.hh"
#include "enum_set.hh"
#include "service/query_state.hh"
@@ -425,23 +425,16 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
case cql_binary_opcode::REGISTER: return wrap_in_foreign(process_register(stream, std::move(in), client_state, trace_state));
default: throw exceptions::protocol_exception(format("Unknown opcode {:d}", int(cqlop)));
}
}).then_wrapped([this, cqlop, stream, &client_state, linearization_buffer = std::move(linearization_buffer), trace_state] (future<result_with_foreign_response_ptr> f) -> foreign_ptr<std::unique_ptr<cql_server::response>> {
}).then_wrapped([this, cqlop, stream, &client_state, linearization_buffer = std::move(linearization_buffer), trace_state] (future<result_with_foreign_response_ptr> f) {
auto stop_trace = defer([&] {
tracing::stop_foreground(trace_state);
});
--_server._stats.requests_serving;
auto exception_handler = overloaded_functor {
[&] (const exceptions::mutation_write_timeout_exception& ex) {
try { ++_server._stats.errors[ex.code()]; } catch(...) {}
return make_mutation_write_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.type, trace_state);
}
};
try {
return utils::result_into_future<result_with_foreign_response_ptr>(utils::result_try([&] () -> result_with_foreign_response_ptr {
result_with_foreign_response_ptr res = f.get0();
if (!res) {
return res.assume_error().accept(exception_handler);
return res;
}
auto response = std::move(res).assume_value();
@@ -472,30 +465,31 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
tracing::set_response_size(trace_state, response->size());
return response;
} catch (const exceptions::unavailable_exception& ex) {
}, utils::result_catch<exceptions::unavailable_exception>([&] (const auto& ex) {
try { ++_server._stats.errors[ex.code()]; } catch(...) {}
return make_unavailable_error(stream, ex.code(), ex.what(), ex.consistency, ex.required, ex.alive, trace_state);
} catch (const exceptions::read_timeout_exception& ex) {
}), utils::result_catch<exceptions::read_timeout_exception>([&] (const auto& ex) {
try { ++_server._stats.errors[ex.code()]; } catch(...) {}
return make_read_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.data_present, trace_state);
} catch (const exceptions::read_failure_exception& ex) {
}), utils::result_catch<exceptions::read_failure_exception>([&] (const auto& ex) {
try { ++_server._stats.errors[ex.code()]; } catch(...) {}
return make_read_failure_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.failures, ex.block_for, ex.data_present, trace_state);
} catch (const exceptions::mutation_write_timeout_exception& ex) {
return exception_handler(ex);
} catch (const exceptions::mutation_write_failure_exception& ex) {
}), utils::result_catch<exceptions::mutation_write_timeout_exception>([&] (const auto& ex) {
try { ++_server._stats.errors[ex.code()]; } catch(...) {}
return make_mutation_write_timeout_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.block_for, ex.type, trace_state);
}), utils::result_catch<exceptions::mutation_write_failure_exception>([&] (const auto& ex) {
try { ++_server._stats.errors[ex.code()]; } catch(...) {}
return make_mutation_write_failure_error(stream, ex.code(), ex.what(), ex.consistency, ex.received, ex.failures, ex.block_for, ex.type, trace_state);
} catch (const exceptions::already_exists_exception& ex) {
}), utils::result_catch<exceptions::already_exists_exception>([&] (const auto& ex) {
try { ++_server._stats.errors[ex.code()]; } catch(...) {}
return make_already_exists_error(stream, ex.code(), ex.what(), ex.ks_name, ex.cf_name, trace_state);
} catch (const exceptions::prepared_query_not_found_exception& ex) {
}), utils::result_catch<exceptions::prepared_query_not_found_exception>([&] (const auto& ex) {
try { ++_server._stats.errors[ex.code()]; } catch(...) {}
return make_unprepared_error(stream, ex.code(), ex.what(), ex.id, trace_state);
} catch (const exceptions::function_execution_exception& ex) {
}), utils::result_catch<exceptions::function_execution_exception>([&] (const auto& ex) {
try { ++_server._stats.errors[ex.code()]; } catch(...) {}
return make_function_failure_error(stream, ex.code(), ex.what(), ex.ks_name, ex.func_name, ex.args, trace_state);
} catch (const exceptions::cassandra_exception& ex) {
}), utils::result_catch<exceptions::cassandra_exception>([&] (const auto& ex) {
// Note: the CQL protocol specifies that many types of errors have
// mandatory parameters. These cassandra_exception subclasses MUST
// be handled above. This default "cassandra_exception" case is
@@ -505,7 +499,7 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
// catch-all type cassandra_exception.
try { ++_server._stats.errors[ex.code()]; } catch(...) {}
return make_error(stream, ex.code(), ex.what(), trace_state);
} catch (std::exception& ex) {
}), utils::result_catch<std::exception>([&] (const auto& ex) {
try { ++_server._stats.errors[exceptions::exception_code::SERVER_ERROR]; } catch(...) {}
sstring msg = ex.what();
try {
@@ -516,10 +510,10 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
msg = ss.str();
}
return make_error(stream, exceptions::exception_code::SERVER_ERROR, msg, trace_state);
} catch (...) {
}), utils::result_catch_dots([&] () {
try { ++_server._stats.errors[exceptions::exception_code::SERVER_ERROR]; } catch(...) {}
return make_error(stream, exceptions::exception_code::SERVER_ERROR, "unknown error", trace_state);
}
})));
});
}