diff --git a/cql3/statements/strong_consistency/select_statement.cc b/cql3/statements/strong_consistency/select_statement.cc index 612eb3978a..b0b352e17f 100644 --- a/cql3/statements/strong_consistency/select_statement.cc +++ b/cql3/statements/strong_consistency/select_statement.cc @@ -18,9 +18,15 @@ namespace cql3::statements::strong_consistency { using result_message = cql_transport::messages::result_message; -static void validate_consistency_level(const db::consistency_level& cl) { - if (cl != db::consistency_level::QUORUM && cl != db::consistency_level::LOCAL_QUORUM && - cl != db::consistency_level::ONE && cl != db::consistency_level::LOCAL_ONE) { +static service::strong_consistency::read_type parse_consistency_level(const db::consistency_level& cl) { + switch (cl) { + case db::consistency_level::QUORUM: + case db::consistency_level::LOCAL_QUORUM: + return service::strong_consistency::read_type::linearizable; + case db::consistency_level::ONE: + case db::consistency_level::LOCAL_ONE: + return service::strong_consistency::read_type::non_linearizable; + default: throw exceptions::invalid_request_exception("Strongly consistent reads must use QUORUM/LOCAL_QUORUM or ONE/LOCAL_ONE consistency level"); } } @@ -29,7 +35,7 @@ future<::shared_ptr> select_statement::do_execute(query_processo service::query_state& state, const query_options& options) const { - validate_consistency_level(options.get_consistency()); + auto read_type = parse_consistency_level(options.get_consistency()); const auto key_ranges = _restrictions->get_partition_key_ranges(options); if (key_ranges.size() != 1 || !query::is_single_partition(key_ranges[0])) { @@ -52,7 +58,7 @@ future<::shared_ptr> select_statement::do_execute(query_processo const auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options); auto [coordinator, holder] = qp.acquire_strongly_consistent_coordinator(); auto query_result = co_await coordinator.get().query(_query_schema, *read_command, - 
key_ranges, state.get_trace_state(), timeout, state.get_client_state().get_abort_source()); + key_ranges, read_type, state.get_trace_state(), timeout, state.get_client_state().get_abort_source()); using namespace service::strong_consistency; if (auto* redirect = get_if(&query_result)) { diff --git a/service/strong_consistency/coordinator.cc b/service/strong_consistency/coordinator.cc index 4e71f19ece..378dcbe935 100644 --- a/service/strong_consistency/coordinator.cc +++ b/service/strong_consistency/coordinator.cc @@ -390,6 +390,7 @@ future> coordinator::mutate(schema_ptr schema, auto coordinator::query(schema_ptr schema, const query::read_command& cmd, const dht::partition_range_vector& ranges, + read_type rtype, tracing::trace_state_ptr trace_state, timeout_clock::time_point timeout, abort_source& as diff --git a/service/strong_consistency/coordinator.hh b/service/strong_consistency/coordinator.hh index dd1500b345..3d4a614207 100644 --- a/service/strong_consistency/coordinator.hh +++ b/service/strong_consistency/coordinator.hh @@ -24,6 +24,20 @@ namespace service::strong_consistency { class groups_manager; +// Classifies a strongly consistent read based on the CQL consistency level. +// +// linearizable: the read is forwarded to the raft leader, where read_barrier() +// is performed locally. This way we read the most up-to-date applied data. +// Currently mapped from CL=QUORUM (the default) and CL=LOCAL_QUORUM. +// +// non_linearizable: the read is performed on the local replica without +// a read_barrier. Because of this, the read may return slightly stale data, +// so reads from different nodes may not be linearizable. Currently mapped from CL=ONE and CL=LOCAL_ONE. 
+enum class read_type { + linearizable, + non_linearizable, +}; + struct need_redirect { locator::tablet_replica target; noncopyable_function on_node_resolved; @@ -81,6 +95,7 @@ public: future query(schema_ptr schema, const query::read_command& cmd, const dht::partition_range_vector& ranges, + read_type rtype, tracing::trace_state_ptr trace_state, timeout_clock::time_point timeout, abort_source& as);