mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-28 20:27:03 +00:00
Compare commits
3 Commits
copilot/fi
...
copilot/ad
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d01358cecd | ||
|
|
35830b34df | ||
|
|
5bc015549f |
@@ -716,7 +716,7 @@ batchStatement returns [std::unique_ptr<cql3::statements::raw::batch_statement>
|
|||||||
auto attrs = std::make_unique<cql3::attributes::raw>();
|
auto attrs = std::make_unique<cql3::attributes::raw>();
|
||||||
}
|
}
|
||||||
: K_BEGIN
|
: K_BEGIN
|
||||||
( K_UNLOGGED { type = btype::UNLOGGED; } | K_COUNTER { type = btype::COUNTER; } )?
|
( K_UNLOGGED { type = btype::UNLOGGED; } | K_COUNTER { type = btype::COUNTER; } | K_GROUP0 { type = btype::GROUP0; } )?
|
||||||
K_BATCH ( usingClause[attrs] )?
|
K_BATCH ( usingClause[attrs] )?
|
||||||
( s=batchStatementObjective ';'?
|
( s=batchStatementObjective ';'?
|
||||||
{
|
{
|
||||||
@@ -2374,6 +2374,7 @@ K_SCYLLA_CLUSTERING_BOUND: S C Y L L A '_' C L U S T E R I N G '_' B O U N D;
|
|||||||
|
|
||||||
|
|
||||||
K_GROUP: G R O U P;
|
K_GROUP: G R O U P;
|
||||||
|
K_GROUP0: G R O U P '0';
|
||||||
|
|
||||||
K_LIKE: L I K E;
|
K_LIKE: L I K E;
|
||||||
|
|
||||||
|
|||||||
@@ -31,9 +31,13 @@ logging::logger batch_statement::_logger("BatchStatement");
|
|||||||
|
|
||||||
timeout_config_selector
|
timeout_config_selector
|
||||||
timeout_for_type(batch_statement::type t) {
|
timeout_for_type(batch_statement::type t) {
|
||||||
return t == batch_statement::type::COUNTER
|
if (t == batch_statement::type::COUNTER) {
|
||||||
? &timeout_config::counter_write_timeout
|
return &timeout_config::counter_write_timeout;
|
||||||
: &timeout_config::write_timeout;
|
} else if (t == batch_statement::type::GROUP0) {
|
||||||
|
return &timeout_config::other_timeout;
|
||||||
|
} else {
|
||||||
|
return &timeout_config::write_timeout;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
db::timeout_clock::duration batch_statement::get_timeout(const service::client_state& state, const query_options& options) const {
|
db::timeout_clock::duration batch_statement::get_timeout(const service::client_state& state, const query_options& options) const {
|
||||||
@@ -90,6 +94,11 @@ future<> batch_statement::check_access(query_processor& qp, const service::clien
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool batch_statement::needs_guard(query_processor& qp, service::query_state& state) const
|
||||||
|
{
|
||||||
|
return _type == type::GROUP0;
|
||||||
|
}
|
||||||
|
|
||||||
void batch_statement::validate()
|
void batch_statement::validate()
|
||||||
{
|
{
|
||||||
if (_attrs->is_time_to_live_set()) {
|
if (_attrs->is_time_to_live_set()) {
|
||||||
@@ -104,6 +113,22 @@ void batch_statement::validate()
|
|||||||
if (_type == type::COUNTER) {
|
if (_type == type::COUNTER) {
|
||||||
throw exceptions::invalid_request_exception("Cannot provide custom timestamp for counter BATCH");
|
throw exceptions::invalid_request_exception("Cannot provide custom timestamp for counter BATCH");
|
||||||
}
|
}
|
||||||
|
if (_type == type::GROUP0) {
|
||||||
|
throw exceptions::invalid_request_exception("Cannot provide custom timestamp for GROUP0 BATCH");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (_type == type::GROUP0) {
|
||||||
|
if (_has_conditions) {
|
||||||
|
throw exceptions::invalid_request_exception("Cannot use conditions in GROUP0 BATCH");
|
||||||
|
}
|
||||||
|
// Validate that all statements target system keyspace tables managed by group0
|
||||||
|
for (auto& s : _statements) {
|
||||||
|
if (s.statement->keyspace() != "system") {
|
||||||
|
throw exceptions::invalid_request_exception("GROUP0 BATCH can only modify system keyspace tables");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool has_counters = std::ranges::any_of(_statements, [] (auto&& s) { return s.statement->is_counter(); });
|
bool has_counters = std::ranges::any_of(_statements, [] (auto&& s) { return s.statement->is_counter(); });
|
||||||
@@ -235,6 +260,9 @@ static thread_local inheriting_concrete_execution_stage<
|
|||||||
|
|
||||||
future<shared_ptr<cql_transport::messages::result_message>> batch_statement::execute(
|
future<shared_ptr<cql_transport::messages::result_message>> batch_statement::execute(
|
||||||
query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
|
query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||||
|
if (_type == type::GROUP0) {
|
||||||
|
return execute_group0_batch(qp, state, options, std::move(guard));
|
||||||
|
}
|
||||||
return execute_without_checking_exception_message(qp, state, options, std::move(guard))
|
return execute_without_checking_exception_message(qp, state, options, std::move(guard))
|
||||||
.then(cql_transport::messages::propagate_exception_as_future<shared_ptr<cql_transport::messages::result_message>>);
|
.then(cql_transport::messages::propagate_exception_as_future<shared_ptr<cql_transport::messages::result_message>>);
|
||||||
}
|
}
|
||||||
@@ -285,6 +313,39 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::do_
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
future<shared_ptr<cql_transport::messages::result_message>> batch_statement::execute_group0_batch(
|
||||||
|
query_processor& qp,
|
||||||
|
service::query_state& query_state, const query_options& options,
|
||||||
|
std::optional<service::group0_guard> guard) const
|
||||||
|
{
|
||||||
|
if (!guard) {
|
||||||
|
throw exceptions::invalid_request_exception("GROUP0 BATCH requires a guard");
|
||||||
|
}
|
||||||
|
|
||||||
|
auto timeout = db::timeout_clock::now() + get_timeout(query_state.get_client_state(), options);
|
||||||
|
|
||||||
|
// Create group0_batch and get the timestamp from it
|
||||||
|
service::group0_batch mc{std::move(guard)};
|
||||||
|
auto now = mc.write_timestamp();
|
||||||
|
|
||||||
|
// Get mutations from all statements
|
||||||
|
auto mutations = co_await get_mutations(qp, options, timeout, false, now, query_state);
|
||||||
|
|
||||||
|
// Add mutations to the group0_batch
|
||||||
|
mc.add_mutations(std::move(mutations), format("CQL GROUP0 BATCH: \"{}\"", raw_cql_statement));
|
||||||
|
|
||||||
|
// Announce the batch via group0
|
||||||
|
auto description = format("CQL GROUP0 BATCH: \"{}\"", raw_cql_statement);
|
||||||
|
auto [remote_, holder] = qp.remote();
|
||||||
|
auto [m, g] = co_await std::move(mc).extract();
|
||||||
|
|
||||||
|
if (!m.empty()) {
|
||||||
|
co_await remote_.get().mm.announce(std::move(m), std::move(g), description);
|
||||||
|
}
|
||||||
|
|
||||||
|
co_return make_shared<cql_transport::messages::result_message::void_message>();
|
||||||
|
}
|
||||||
|
|
||||||
future<coordinator_result<>> batch_statement::execute_without_conditions(
|
future<coordinator_result<>> batch_statement::execute_without_conditions(
|
||||||
query_processor& qp,
|
query_processor& qp,
|
||||||
utils::chunked_vector<mutation> mutations,
|
utils::chunked_vector<mutation> mutations,
|
||||||
|
|||||||
@@ -95,6 +95,8 @@ public:
|
|||||||
|
|
||||||
virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;
|
virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;
|
||||||
|
|
||||||
|
virtual bool needs_guard(query_processor& qp, service::query_state& state) const override;
|
||||||
|
|
||||||
// Validates a prepared batch statement without validating its nested statements.
|
// Validates a prepared batch statement without validating its nested statements.
|
||||||
void validate();
|
void validate();
|
||||||
|
|
||||||
@@ -130,6 +132,11 @@ private:
|
|||||||
service::query_state& query_state, const query_options& options,
|
service::query_state& query_state, const query_options& options,
|
||||||
bool local, api::timestamp_type now) const;
|
bool local, api::timestamp_type now) const;
|
||||||
|
|
||||||
|
future<shared_ptr<cql_transport::messages::result_message>> execute_group0_batch(
|
||||||
|
query_processor& qp,
|
||||||
|
service::query_state& query_state, const query_options& options,
|
||||||
|
std::optional<service::group0_guard> guard) const;
|
||||||
|
|
||||||
future<exceptions::coordinator_result<>> execute_without_conditions(
|
future<exceptions::coordinator_result<>> execute_without_conditions(
|
||||||
query_processor& qp,
|
query_processor& qp,
|
||||||
utils::chunked_vector<mutation> mutations,
|
utils::chunked_vector<mutation> mutations,
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ class modification_statement;
|
|||||||
class batch_statement : public raw::cf_statement {
|
class batch_statement : public raw::cf_statement {
|
||||||
public:
|
public:
|
||||||
enum class type {
|
enum class type {
|
||||||
LOGGED, UNLOGGED, COUNTER
|
LOGGED, UNLOGGED, COUNTER, GROUP0
|
||||||
};
|
};
|
||||||
private:
|
private:
|
||||||
type _type;
|
type _type;
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ Multiple ``INSERT``, ``UPDATE`` and ``DELETE`` can be executed in a single state
|
|||||||
|
|
||||||
.. code-block::
|
.. code-block::
|
||||||
|
|
||||||
batch_statement: BEGIN [ UNLOGGED | COUNTER ] BATCH
|
batch_statement: BEGIN [ UNLOGGED | COUNTER | GROUP0 ] BATCH
|
||||||
: [ USING `update_parameter` ( AND `update_parameter` )* ]
|
: [ USING `update_parameter` ( AND `update_parameter` )* ]
|
||||||
: `modification_statement` ( ';' `modification_statement` )*
|
: `modification_statement` ( ';' `modification_statement` )*
|
||||||
: APPLY BATCH
|
: APPLY BATCH
|
||||||
@@ -67,6 +67,29 @@ used, a failed batch might leave the batch only partly applied.
|
|||||||
Use the ``COUNTER`` option for batched counter updates. Unlike other
|
Use the ``COUNTER`` option for batched counter updates. Unlike other
|
||||||
updates in ScyllaDB, counter updates are not idempotent.
|
updates in ScyllaDB, counter updates are not idempotent.
|
||||||
|
|
||||||
|
``GROUP0`` batches
|
||||||
|
~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
|
Use the ``GROUP0`` option for batched modifications to system tables that are managed by group0
|
||||||
|
(e.g., ``system.topology``). GROUP0 batches execute mutations as a group0 command, ensuring they
|
||||||
|
are replicated through the Raft consensus protocol.
|
||||||
|
|
||||||
|
GROUP0 batches have the following restrictions:
|
||||||
|
|
||||||
|
- Can only modify tables in the ``system`` keyspace
|
||||||
|
- Cannot use custom timestamps (``USING TIMESTAMP`` is not allowed)
|
||||||
|
- Cannot use conditional statements (``IF EXISTS``, ``IF NOT EXISTS``, etc.)
|
||||||
|
- Requires a group0 guard to be taken before execution
|
||||||
|
|
||||||
|
Example:
|
||||||
|
|
||||||
|
.. code-block:: cql
|
||||||
|
|
||||||
|
BEGIN GROUP0 BATCH
|
||||||
|
INSERT INTO system.topology (key, value) VALUES ('node1', 'data1');
|
||||||
|
UPDATE system.topology SET value = 'data2' WHERE key = 'node2';
|
||||||
|
APPLY BATCH;
|
||||||
|
|
||||||
|
|
||||||
:doc:`Apache Cassandra Query Language (CQL) Reference </cql/index>`
|
:doc:`Apache Cassandra Query Language (CQL) Reference </cql/index>`
|
||||||
|
|
||||||
|
|||||||
63
test/cqlpy/test_group0_batch.py
Normal file
63
test/cqlpy/test_group0_batch.py
Normal file
@@ -0,0 +1,63 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2024-present ScyllaDB
|
||||||
|
#
|
||||||
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||||
|
#############################################################################
|
||||||
|
# Tests for GROUP0 BATCH operations
|
||||||
|
#############################################################################
|
||||||
|
from cassandra import InvalidRequest
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
def test_group0_batch_syntax_error_for_non_system_table(cql, test_keyspace):
|
||||||
|
"""Verifies that GROUP0 BATCH can only be used with system keyspace tables"""
|
||||||
|
with pytest.raises(InvalidRequest, match="GROUP0 BATCH can only modify system keyspace tables"):
|
||||||
|
# Create a test table in a non-system keyspace
|
||||||
|
table_name = f"{test_keyspace}.test_table"
|
||||||
|
cql.execute(f"CREATE TABLE {table_name} (k int PRIMARY KEY, v int)")
|
||||||
|
try:
|
||||||
|
# Try to use GROUP0 BATCH with a non-system table
|
||||||
|
cql.execute(f"""
|
||||||
|
BEGIN GROUP0 BATCH
|
||||||
|
INSERT INTO {table_name} (k, v) VALUES (1, 1)
|
||||||
|
APPLY BATCH
|
||||||
|
""")
|
||||||
|
finally:
|
||||||
|
cql.execute(f"DROP TABLE IF EXISTS {table_name}")
|
||||||
|
|
||||||
|
|
||||||
|
def test_group0_batch_with_timestamp_error(cql):
|
||||||
|
"""Verifies that GROUP0 BATCH cannot have custom timestamp"""
|
||||||
|
with pytest.raises(InvalidRequest, match="Cannot provide custom timestamp for GROUP0 BATCH"):
|
||||||
|
cql.execute("""
|
||||||
|
BEGIN GROUP0 BATCH USING TIMESTAMP 12345
|
||||||
|
INSERT INTO system.topology (key) VALUES ('test')
|
||||||
|
APPLY BATCH
|
||||||
|
""")
|
||||||
|
|
||||||
|
|
||||||
|
def test_group0_batch_with_conditions_error(cql):
|
||||||
|
"""Verifies that GROUP0 BATCH cannot have conditions"""
|
||||||
|
with pytest.raises(InvalidRequest, match="Cannot use conditions in GROUP0 BATCH"):
|
||||||
|
cql.execute("""
|
||||||
|
BEGIN GROUP0 BATCH
|
||||||
|
INSERT INTO system.topology (key) VALUES ('test') IF NOT EXISTS
|
||||||
|
APPLY BATCH
|
||||||
|
""")
|
||||||
|
|
||||||
|
|
||||||
|
def test_group0_batch_basic_syntax(cql):
|
||||||
|
"""Verifies that GROUP0 BATCH has correct basic syntax"""
|
||||||
|
# This test just checks that the syntax is recognized
|
||||||
|
# The actual execution will fail if not properly set up with group0
|
||||||
|
# but the syntax should be accepted
|
||||||
|
try:
|
||||||
|
cql.execute("""
|
||||||
|
BEGIN GROUP0 BATCH
|
||||||
|
APPLY BATCH
|
||||||
|
""")
|
||||||
|
except Exception as e:
|
||||||
|
# Accept either success or group0-related errors, but not syntax errors
|
||||||
|
error_msg = str(e).lower()
|
||||||
|
assert "syntax" not in error_msg and "unexpected" not in error_msg, \
|
||||||
|
f"GROUP0 BATCH should be valid syntax, but got: {e}"
|
||||||
Reference in New Issue
Block a user