strong_consistency: wire up metrics to operations

Track write and read latency using latency_counter in
coordinator::mutate() and coordinator::query().

Count commit_status_unknown errors in coordinator::mutate().

Count node and shard bounces in redirect_statement(), passing the
coordinator's stats from both modification_statement and
select_statement.
This commit is contained in:
Michał Jadwiszczak
2026-04-02 16:43:27 +02:00
parent 55293c34f8
commit f77c258c8e
5 changed files with 30 additions and 4 deletions

View File

@@ -71,7 +71,7 @@ future<shared_ptr<result_message>> modification_statement::execute_without_check
using namespace service::strong_consistency;
if (const auto* redirect = get_if<need_redirect>(&mutate_result)) {
bool is_write = true;
co_return co_await redirect_statement(qp, options, redirect->target, timeout, is_write);
co_return co_await redirect_statement(qp, options, redirect->target, timeout, is_write, coordinator.get().get_stats());
}
utils::get_local_injector().inject("sc_modification_statement_timeout", [&] {
throw exceptions::mutation_write_timeout_exception{"", "", options.get_consistency(), 0, 0, db::write_type::SIMPLE};

View File

@@ -47,7 +47,7 @@ future<::shared_ptr<result_message>> select_statement::do_execute(query_processo
using namespace service::strong_consistency;
if (const auto* redirect = get_if<need_redirect>(&query_result)) {
bool is_write = false;
co_return co_await redirect_statement(qp, options, redirect->target, timeout, is_write);
co_return co_await redirect_statement(qp, options, redirect->target, timeout, is_write, coordinator.get().get_stats());
}
co_return co_await process_results(get<lw_shared_ptr<query::result>>(std::move(query_result)),

View File

@@ -12,19 +12,23 @@
#include "cql3/query_processor.hh"
#include "replica/database.hh"
#include "locator/tablet_replication_strategy.hh"
#include "service/strong_consistency/coordinator.hh"
namespace cql3::statements::strong_consistency {
future<::shared_ptr<cql_transport::messages::result_message>> redirect_statement(query_processor& qp,
const query_options& options,
const locator::tablet_replica& target,
db::timeout_clock::time_point timeout,
bool is_write)
bool is_write,
service::strong_consistency::stats& stats)
{
auto&& func_values_cache = const_cast<cql3::query_options&>(options).take_cached_pk_function_calls();
const auto my_host_id = qp.db().real_database().get_token_metadata().get_topology().my_host_id();
if (target.host != my_host_id) {
++(is_write ? stats.write_node_bounces : stats.read_node_bounces);
co_return qp.bounce_to_node(target, std::move(func_values_cache), timeout, is_write);
}
++(is_write ? stats.write_shard_bounces : stats.read_shard_bounces);
co_return qp.bounce_to_shard(target.shard, std::move(func_values_cache));
}

View File

@@ -11,6 +11,8 @@
#include "cql3/cql_statement.hh"
#include "locator/tablets.hh"
namespace service::strong_consistency { struct stats; }
namespace cql3::statements::strong_consistency {
future<::shared_ptr<cql_transport::messages::result_message>> redirect_statement(
@@ -18,7 +20,8 @@ future<::shared_ptr<cql_transport::messages::result_message>> redirect_statement
const query_options& options,
const locator::tablet_replica& target,
db::timeout_clock::time_point timeout,
bool is_write);
bool is_write,
service::strong_consistency::stats& stats);
bool is_strongly_consistent(data_dictionary::database db, std::string_view ks_name);

View File

@@ -244,6 +244,11 @@ future<value_or_redirect<>> coordinator::mutate(schema_ptr schema,
auto aoe = abort_on_expiry<timeout_clock>(timeout);
[[maybe_unused]] const auto subs = chain_abort_sources(aoe.abort_source(), as);
utils::latency_counter lc;
lc.start();
auto mark_write_latency = defer([this, &lc] { _stats.write.mark(lc.stop().latency()); });
bool commit_status_unknown_ex = false;
try {
auto op_result = co_await create_operation_ctx(*schema, token, aoe.abort_source());
if (const auto* redirect = get_if<need_redirect>(&op_result)) {
@@ -308,7 +313,11 @@ future<value_or_redirect<>> coordinator::mutate(schema_ptr schema,
logger.debug("mutate(): add_entry, got commit_status_unknown {}, table {}.{}, tablet {}, term {}",
ex, schema->ks_name(), schema->cf_name(), op.tablet_id, term);
++_stats.write_errors_status_unknown;
// FIXME: use a dedicated ERROR_CODE instead of SERVER_ERROR
// FIXME: when a dedicated ERROR_CODE will be used,
// we can get rid of the boolean flag
commit_status_unknown_ex = true;
throw exceptions::server_exception(
"The outcome of this statement is unknown. It may or may not have been applied. "
"Retrying the statement may be necessary.");
@@ -334,8 +343,12 @@ future<value_or_redirect<>> coordinator::mutate(schema_ptr schema,
|| try_catch<seastar::timed_out_error>(ex) || try_catch<seastar::condition_variable_timed_out>(ex)) {
logger.trace("mutate(): request timed out with error {}, table {}.{}, token {}",
ex, schema->ks_name(), schema->cf_name(), token);
++_stats.write_errors_timeout;
co_return coroutine::return_exception(write_timeout(schema->ks_name(), schema->cf_name()));
} else {
if (!commit_status_unknown_ex) {
++_stats.write_errors_other;
}
logger.trace("mutate(): unknown exception {}, table {}.{}, token {}",
ex, schema->ks_name(), schema->cf_name(), token);
// We know nothing about other errors. Let the CQL server convert them to SERVER_ERROR.
@@ -355,6 +368,10 @@ auto coordinator::query(schema_ptr schema,
auto aoe = abort_on_expiry<timeout_clock>(timeout);
[[maybe_unused]] const auto subs = chain_abort_sources(aoe.abort_source(), as);
utils::latency_counter lc;
lc.start();
auto mark_read_latency = defer([this, &lc] { _stats.read.mark(lc.stop().latency()); });
try {
auto op_result = co_await create_operation_ctx(*schema, ranges[0].start()->value().token(), aoe.abort_source());
if (const auto* redirect = get_if<need_redirect>(&op_result)) {
@@ -386,10 +403,12 @@ auto coordinator::query(schema_ptr schema,
|| try_catch<timed_out_error>(ex)) {
logger.trace("query(): request timed out with error {}, table {}.{}, read cmd {}",
ex, schema->ks_name(), schema->cf_name(), cmd);
++_stats.read_errors_timeout;
co_return coroutine::return_exception(read_timeout(schema->ks_name(), schema->cf_name()));
} else {
logger.trace("mutate(): unknown exception {}, table {}.{}, read cmd {}",
ex, schema->ks_name(), schema->cf_name(), cmd);
++_stats.read_errors_other;
// We know nothing about other errors. Let the CQL server convert them to SERVER_ERROR.
throw;
}