sp::cas_shard: rename to get_cas_shard
We intend to introduce a separate cas_shard class in subsequent commits, so we rename the existing function here in advance to avoid a name conflict.
This commit is contained in:
@@ -2386,7 +2386,7 @@ rmw_operation::write_isolation rmw_operation::get_write_isolation_for_schema(sch
|
||||
// shard_for_execute() checks whether execute() must be called on a specific
|
||||
// other shard. Running execute() on a specific shard is necessary only if it
|
||||
// will use LWT (storage_proxy::cas()). This is because cas() can only be
|
||||
// called on the specific shard owning (as per cas_shard()) _pk's token.
|
||||
// called on the specific shard owning (as per get_cas_shard()) _pk's token.
|
||||
// Knowing if execute() will call cas() or not may depend on whether there is
|
||||
// a read-before-write, but not just on it - depending on configuration,
|
||||
// execute() may unconditionally use cas() for every write. Unfortunately,
|
||||
@@ -2400,7 +2400,7 @@ std::optional<shard_id> rmw_operation::shard_for_execute(bool needs_read_before_
|
||||
// If we're still here, cas() *will* be called by execute(), so let's
|
||||
// find the appropriate shard to run it on:
|
||||
auto token = dht::get_token(*_schema, _pk);
|
||||
auto desired_shard = service::storage_proxy::cas_shard(*_schema, token);
|
||||
auto desired_shard = service::storage_proxy::get_cas_shard(*_schema, token);
|
||||
if (desired_shard == this_shard_id()) {
|
||||
return {};
|
||||
}
|
||||
@@ -2872,7 +2872,7 @@ static future<> do_batch_write(service::storage_proxy& proxy,
|
||||
}
|
||||
return parallel_for_each(std::move(key_builders), [&proxy, &client_state, &stats, trace_state, ssg, permit = std::move(permit)] (auto& e) {
|
||||
stats.write_using_lwt++;
|
||||
auto desired_shard = service::storage_proxy::cas_shard(*e.first.schema, e.first.dk.token());
|
||||
auto desired_shard = service::storage_proxy::get_cas_shard(*e.first.schema, e.first.dk.token());
|
||||
if (desired_shard == this_shard_id()) {
|
||||
return cas_write(proxy, e.first.schema, e.first.dk, std::move(e.second), client_state, trace_state, permit);
|
||||
} else {
|
||||
|
||||
@@ -367,7 +367,7 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
|
||||
throw exceptions::invalid_request_exception(format("Unrestricted partition key in a conditional BATCH"));
|
||||
}
|
||||
|
||||
auto shard = service::storage_proxy::cas_shard(*_statements[0].statement->s, request->key()[0].start()->value().as_decorated_key().token());
|
||||
auto shard = service::storage_proxy::get_cas_shard(*_statements[0].statement->s, request->key()[0].start()->value().as_decorated_key().token());
|
||||
if (shard != this_shard_id()) {
|
||||
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(
|
||||
qp.bounce_to_shard(shard, std::move(cached_fn_calls))
|
||||
|
||||
@@ -403,7 +403,7 @@ modification_statement::execute_with_condition(query_processor& qp, service::que
|
||||
|
||||
auto token = request->key()[0].start()->value().as_decorated_key().token();
|
||||
|
||||
auto shard = service::storage_proxy::cas_shard(*s, token);
|
||||
auto shard = service::storage_proxy::get_cas_shard(*s, token);
|
||||
|
||||
if (utils::get_local_injector().is_enabled("forced_bounce_to_shard_counter")) {
|
||||
return process_forced_rebounce(shard, qp, options);
|
||||
|
||||
@@ -1144,7 +1144,7 @@ const dht::token& end_token(const dht::partition_range& r) {
|
||||
return r.end() ? r.end()->value().token() : max_token;
|
||||
}
|
||||
|
||||
unsigned storage_proxy::cas_shard(const schema& s, dht::token token) {
|
||||
unsigned storage_proxy::get_cas_shard(const schema& s, dht::token token) {
|
||||
return s.table().shard_for_reads(token);
|
||||
}
|
||||
|
||||
@@ -6365,7 +6365,7 @@ storage_proxy::do_query_with_paxos(schema_ptr s,
|
||||
exceptions::invalid_request_exception("SERIAL/LOCAL_SERIAL consistency may only be requested for one partition at a time"));
|
||||
}
|
||||
|
||||
if (cas_shard(*s, partition_ranges[0].start()->value().as_decorated_key().token()) != this_shard_id()) {
|
||||
if (get_cas_shard(*s, partition_ranges[0].start()->value().as_decorated_key().token()) != this_shard_id()) {
|
||||
return make_exception_future<storage_proxy::coordinator_query_result>(std::logic_error("storage_proxy::do_query_with_paxos called on a wrong shard"));
|
||||
}
|
||||
// All cas networking operations run with query provided timeout
|
||||
@@ -6468,7 +6468,7 @@ future<bool> storage_proxy::cas(schema_ptr schema, shared_ptr<cas_request> reque
|
||||
db::validate_for_cas(cl_for_paxos);
|
||||
db::validate_for_cas_learn(cl_for_learn, schema->ks_name());
|
||||
|
||||
if (cas_shard(*schema, partition_ranges[0].start()->value().as_decorated_key().token()) != this_shard_id()) {
|
||||
if (get_cas_shard(*schema, partition_ranges[0].start()->value().as_decorated_key().token()) != this_shard_id()) {
|
||||
co_await coroutine::return_exception(std::logic_error("storage_proxy::cas called on a wrong shard"));
|
||||
}
|
||||
|
||||
|
||||
@@ -735,7 +735,7 @@ public:
|
||||
return _stats_key;
|
||||
}
|
||||
|
||||
static unsigned cas_shard(const schema& s, dht::token token);
|
||||
static unsigned get_cas_shard(const schema& s, dht::token token);
|
||||
|
||||
future<> await_pending_writes() noexcept {
|
||||
return _pending_writes_phaser.advance_and_await();
|
||||
|
||||
Reference in New Issue
Block a user