Compare commits
3 Commits
copilot/ad
...
copilot/ad
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d01358cecd | ||
|
|
35830b34df | ||
|
|
5bc015549f |
@@ -1085,7 +1085,6 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'db/view/view_building_task_mutation_builder.cc',
|
||||
'db/virtual_table.cc',
|
||||
'db/virtual_tables.cc',
|
||||
'db/system0_virtual_tables.cc',
|
||||
'db/tablet_options.cc',
|
||||
'db/object_storage_endpoint_param.cc',
|
||||
'index/secondary_index_manager.cc',
|
||||
|
||||
@@ -716,7 +716,7 @@ batchStatement returns [std::unique_ptr<cql3::statements::raw::batch_statement>
|
||||
auto attrs = std::make_unique<cql3::attributes::raw>();
|
||||
}
|
||||
: K_BEGIN
|
||||
( K_UNLOGGED { type = btype::UNLOGGED; } | K_COUNTER { type = btype::COUNTER; } )?
|
||||
( K_UNLOGGED { type = btype::UNLOGGED; } | K_COUNTER { type = btype::COUNTER; } | K_GROUP0 { type = btype::GROUP0; } )?
|
||||
K_BATCH ( usingClause[attrs] )?
|
||||
( s=batchStatementObjective ';'?
|
||||
{
|
||||
@@ -2374,6 +2374,7 @@ K_SCYLLA_CLUSTERING_BOUND: S C Y L L A '_' C L U S T E R I N G '_' B O U N D;
|
||||
|
||||
|
||||
K_GROUP: G R O U P;
|
||||
K_GROUP0: G R O U P '0';
|
||||
|
||||
K_LIKE: L I K E;
|
||||
|
||||
|
||||
@@ -31,9 +31,13 @@ logging::logger batch_statement::_logger("BatchStatement");
|
||||
|
||||
timeout_config_selector
|
||||
timeout_for_type(batch_statement::type t) {
|
||||
return t == batch_statement::type::COUNTER
|
||||
? &timeout_config::counter_write_timeout
|
||||
: &timeout_config::write_timeout;
|
||||
if (t == batch_statement::type::COUNTER) {
|
||||
return &timeout_config::counter_write_timeout;
|
||||
} else if (t == batch_statement::type::GROUP0) {
|
||||
return &timeout_config::other_timeout;
|
||||
} else {
|
||||
return &timeout_config::write_timeout;
|
||||
}
|
||||
}
|
||||
|
||||
db::timeout_clock::duration batch_statement::get_timeout(const service::client_state& state, const query_options& options) const {
|
||||
@@ -90,6 +94,11 @@ future<> batch_statement::check_access(query_processor& qp, const service::clien
|
||||
});
|
||||
}
|
||||
|
||||
bool batch_statement::needs_guard(query_processor& qp, service::query_state& state) const
|
||||
{
|
||||
return _type == type::GROUP0;
|
||||
}
|
||||
|
||||
void batch_statement::validate()
|
||||
{
|
||||
if (_attrs->is_time_to_live_set()) {
|
||||
@@ -104,6 +113,22 @@ void batch_statement::validate()
|
||||
if (_type == type::COUNTER) {
|
||||
throw exceptions::invalid_request_exception("Cannot provide custom timestamp for counter BATCH");
|
||||
}
|
||||
if (_type == type::GROUP0) {
|
||||
throw exceptions::invalid_request_exception("Cannot provide custom timestamp for GROUP0 BATCH");
|
||||
}
|
||||
}
|
||||
|
||||
if (_type == type::GROUP0) {
|
||||
if (_has_conditions) {
|
||||
throw exceptions::invalid_request_exception("Cannot use conditions in GROUP0 BATCH");
|
||||
}
|
||||
// Validate that all statements target system keyspace tables managed by group0
|
||||
for (auto& s : _statements) {
|
||||
if (s.statement->keyspace() != "system") {
|
||||
throw exceptions::invalid_request_exception("GROUP0 BATCH can only modify system keyspace tables");
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
bool has_counters = std::ranges::any_of(_statements, [] (auto&& s) { return s.statement->is_counter(); });
|
||||
@@ -235,6 +260,9 @@ static thread_local inheriting_concrete_execution_stage<
|
||||
|
||||
future<shared_ptr<cql_transport::messages::result_message>> batch_statement::execute(
|
||||
query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
|
||||
if (_type == type::GROUP0) {
|
||||
return execute_group0_batch(qp, state, options, std::move(guard));
|
||||
}
|
||||
return execute_without_checking_exception_message(qp, state, options, std::move(guard))
|
||||
.then(cql_transport::messages::propagate_exception_as_future<shared_ptr<cql_transport::messages::result_message>>);
|
||||
}
|
||||
@@ -285,6 +313,39 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::do_
|
||||
});
|
||||
}
|
||||
|
||||
future<shared_ptr<cql_transport::messages::result_message>> batch_statement::execute_group0_batch(
|
||||
query_processor& qp,
|
||||
service::query_state& query_state, const query_options& options,
|
||||
std::optional<service::group0_guard> guard) const
|
||||
{
|
||||
if (!guard) {
|
||||
throw exceptions::invalid_request_exception("GROUP0 BATCH requires a guard");
|
||||
}
|
||||
|
||||
auto timeout = db::timeout_clock::now() + get_timeout(query_state.get_client_state(), options);
|
||||
|
||||
// Create group0_batch and get the timestamp from it
|
||||
service::group0_batch mc{std::move(guard)};
|
||||
auto now = mc.write_timestamp();
|
||||
|
||||
// Get mutations from all statements
|
||||
auto mutations = co_await get_mutations(qp, options, timeout, false, now, query_state);
|
||||
|
||||
// Add mutations to the group0_batch
|
||||
mc.add_mutations(std::move(mutations), format("CQL GROUP0 BATCH: \"{}\"", raw_cql_statement));
|
||||
|
||||
// Announce the batch via group0
|
||||
auto description = format("CQL GROUP0 BATCH: \"{}\"", raw_cql_statement);
|
||||
auto [remote_, holder] = qp.remote();
|
||||
auto [m, g] = co_await std::move(mc).extract();
|
||||
|
||||
if (!m.empty()) {
|
||||
co_await remote_.get().mm.announce(std::move(m), std::move(g), description);
|
||||
}
|
||||
|
||||
co_return make_shared<cql_transport::messages::result_message::void_message>();
|
||||
}
|
||||
|
||||
future<coordinator_result<>> batch_statement::execute_without_conditions(
|
||||
query_processor& qp,
|
||||
utils::chunked_vector<mutation> mutations,
|
||||
|
||||
@@ -95,6 +95,8 @@ public:
|
||||
|
||||
virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;
|
||||
|
||||
virtual bool needs_guard(query_processor& qp, service::query_state& state) const override;
|
||||
|
||||
// Validates a prepared batch statement without validating its nested statements.
|
||||
void validate();
|
||||
|
||||
@@ -130,6 +132,11 @@ private:
|
||||
service::query_state& query_state, const query_options& options,
|
||||
bool local, api::timestamp_type now) const;
|
||||
|
||||
future<shared_ptr<cql_transport::messages::result_message>> execute_group0_batch(
|
||||
query_processor& qp,
|
||||
service::query_state& query_state, const query_options& options,
|
||||
std::optional<service::group0_guard> guard) const;
|
||||
|
||||
future<exceptions::coordinator_result<>> execute_without_conditions(
|
||||
query_processor& qp,
|
||||
utils::chunked_vector<mutation> mutations,
|
||||
|
||||
@@ -23,7 +23,7 @@ class modification_statement;
|
||||
class batch_statement : public raw::cf_statement {
|
||||
public:
|
||||
enum class type {
|
||||
LOGGED, UNLOGGED, COUNTER
|
||||
LOGGED, UNLOGGED, COUNTER, GROUP0
|
||||
};
|
||||
private:
|
||||
type _type;
|
||||
|
||||
@@ -1,201 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2024-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "db/system0_virtual_tables.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "db/virtual_table.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "replica/tablets.hh"
|
||||
#include "schema/schema.hh"
|
||||
#include "schema/schema_builder.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "mutation/frozen_mutation.hh"
|
||||
#include "types/types.hh"
|
||||
#include "utils/log.hh"
|
||||
#include <seastar/core/coroutine.hh>
|
||||
|
||||
namespace db {
|
||||
|
||||
namespace {
|
||||
|
||||
static constexpr auto SYSTEM0_KEYSPACE_NAME = "system0";
|
||||
|
||||
logging::logger sys0log("system0_virtual_tables");
|
||||
|
||||
// Virtual table that mirrors system.topology but allows writes via group0
|
||||
class system0_topology_table : public memtable_filling_virtual_table {
|
||||
private:
|
||||
cql3::query_processor& _qp;
|
||||
|
||||
public:
|
||||
explicit system0_topology_table(cql3::query_processor& qp)
|
||||
: memtable_filling_virtual_table(build_schema())
|
||||
, _qp(qp)
|
||||
{}
|
||||
|
||||
static schema_ptr build_schema() {
|
||||
// Use the same schema as system.topology but in system0 keyspace
|
||||
auto id = generate_legacy_id(SYSTEM0_KEYSPACE_NAME, system_keyspace::TOPOLOGY);
|
||||
return schema_builder(SYSTEM0_KEYSPACE_NAME, system_keyspace::TOPOLOGY, std::optional(id))
|
||||
.with_column("key", utf8_type, column_kind::partition_key)
|
||||
.with_column("host_id", uuid_type, column_kind::clustering_key)
|
||||
.with_column("datacenter", utf8_type)
|
||||
.with_column("rack", utf8_type)
|
||||
.with_column("tokens", set_type_impl::get_instance(utf8_type, true))
|
||||
.with_column("node_state", utf8_type)
|
||||
.with_column("release_version", utf8_type)
|
||||
.with_column("topology_request", utf8_type)
|
||||
.with_column("replaced_id", uuid_type)
|
||||
.with_column("rebuild_option", utf8_type)
|
||||
.with_column("num_tokens", int32_type)
|
||||
.with_column("tokens_string", utf8_type)
|
||||
.with_column("shard_count", int32_type)
|
||||
.with_column("ignore_msb", int32_type)
|
||||
.with_column("cleanup_status", utf8_type)
|
||||
.with_column("supported_features", set_type_impl::get_instance(utf8_type, true))
|
||||
.with_column("request_id", timeuuid_type)
|
||||
.with_column("ignore_nodes", set_type_impl::get_instance(uuid_type, true), column_kind::static_column)
|
||||
.with_column("new_cdc_generation_data_uuid", timeuuid_type, column_kind::static_column)
|
||||
.with_column("new_keyspace_rf_change_ks_name", utf8_type, column_kind::static_column)
|
||||
.with_column("new_keyspace_rf_change_data", map_type_impl::get_instance(utf8_type, utf8_type, false), column_kind::static_column)
|
||||
.with_column("version", long_type, column_kind::static_column)
|
||||
.with_column("fence_version", long_type, column_kind::static_column)
|
||||
.with_column("transition_state", utf8_type, column_kind::static_column)
|
||||
.with_column("committed_cdc_generations", set_type_impl::get_instance(cdc_generation_ts_id_type, true), column_kind::static_column)
|
||||
.with_column("unpublished_cdc_generations", set_type_impl::get_instance(cdc_generation_ts_id_type, true), column_kind::static_column)
|
||||
.with_column("global_topology_request", utf8_type, column_kind::static_column)
|
||||
.with_column("global_topology_request_id", timeuuid_type, column_kind::static_column)
|
||||
.with_column("enabled_features", set_type_impl::get_instance(utf8_type, true), column_kind::static_column)
|
||||
.with_column("session", uuid_type, column_kind::static_column)
|
||||
.with_column("tablet_balancing_enabled", boolean_type, column_kind::static_column)
|
||||
.with_column("upgrade_state", utf8_type, column_kind::static_column)
|
||||
.with_column("global_requests", set_type_impl::get_instance(timeuuid_type, true), column_kind::static_column)
|
||||
.set_comment("Virtual table for updating system.topology via group0")
|
||||
.with_hash_version()
|
||||
.build();
|
||||
}
|
||||
|
||||
future<> execute(std::function<void(mutation)> mutation_sink) override {
|
||||
// For reads, we mirror the actual system.topology table
|
||||
// This is a simplified placeholder implementation
|
||||
sys0log.debug("system0.topology: read operation");
|
||||
co_return;
|
||||
}
|
||||
|
||||
future<> apply(const frozen_mutation& fm) override {
|
||||
sys0log.info("system0.topology: received write operation");
|
||||
|
||||
// Convert mutation from system0.topology schema to system.topology schema
|
||||
const mutation m = fm.unfreeze(_s);
|
||||
|
||||
// Re-freeze the mutation with the system.topology schema
|
||||
auto system_topology_schema = system_keyspace::topology();
|
||||
mutation target_m(system_topology_schema, m.key());
|
||||
target_m.partition() = m.partition();
|
||||
|
||||
// TODO: Submit mutation to group0 via raft_group0_client
|
||||
// For now, just log a warning
|
||||
sys0log.warn("system0.topology: write operations require group0 integration (not yet implemented)");
|
||||
|
||||
co_return;
|
||||
}
|
||||
};
|
||||
|
||||
// Virtual table that mirrors system.tablets but allows writes via group0
|
||||
class system0_tablets_table : public memtable_filling_virtual_table {
|
||||
private:
|
||||
cql3::query_processor& _qp;
|
||||
|
||||
public:
|
||||
explicit system0_tablets_table(cql3::query_processor& qp)
|
||||
: memtable_filling_virtual_table(build_schema())
|
||||
, _qp(qp)
|
||||
{}
|
||||
|
||||
static schema_ptr build_schema() {
|
||||
// Create a simple schema for tablets in system0 keyspace
|
||||
// This mirrors system.tablets structure
|
||||
auto id = generate_legacy_id(SYSTEM0_KEYSPACE_NAME, system_keyspace::TABLETS);
|
||||
auto replica_set_type = replica::get_replica_set_type();
|
||||
|
||||
return schema_builder(SYSTEM0_KEYSPACE_NAME, system_keyspace::TABLETS, id)
|
||||
.with_column("table_id", uuid_type, column_kind::partition_key)
|
||||
.with_column("tablet_count", int32_type, column_kind::static_column)
|
||||
.with_column("keyspace_name", utf8_type, column_kind::static_column)
|
||||
.with_column("table_name", utf8_type, column_kind::static_column)
|
||||
.with_column("last_token", long_type, column_kind::clustering_key)
|
||||
.with_column("replicas", replica_set_type)
|
||||
.with_column("new_replicas", replica_set_type)
|
||||
.with_column("stage", utf8_type)
|
||||
.with_column("transition", utf8_type)
|
||||
.with_column("session", uuid_type)
|
||||
.set_comment("Virtual table for updating system.tablets via group0")
|
||||
.with_hash_version()
|
||||
.build();
|
||||
}
|
||||
|
||||
future<> execute(std::function<void(mutation)> mutation_sink) override {
|
||||
// For reads, we mirror the actual system.tablets table
|
||||
// This is a simplified placeholder implementation
|
||||
sys0log.debug("system0.tablets: read operation");
|
||||
co_return;
|
||||
}
|
||||
|
||||
future<> apply(const frozen_mutation& fm) override {
|
||||
sys0log.info("system0.tablets: received write operation");
|
||||
|
||||
// Convert mutation from system0.tablets schema to system.tablets schema
|
||||
const mutation m = fm.unfreeze(_s);
|
||||
|
||||
// Re-freeze the mutation with the system.tablets schema
|
||||
auto system_tablets_schema = system_keyspace::tablets();
|
||||
mutation target_m(system_tablets_schema, m.key());
|
||||
target_m.partition() = m.partition();
|
||||
|
||||
// TODO: Submit mutation to group0 via raft_group0_client
|
||||
// For now, just log a warning
|
||||
sys0log.warn("system0.tablets: write operations require group0 integration (not yet implemented)");
|
||||
|
||||
co_return;
|
||||
}
|
||||
};
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
future<> initialize_system0_virtual_tables(
|
||||
sharded<service::raft_group_registry>& dist_raft_gr,
|
||||
sharded<db::system_keyspace>& sys_ks,
|
||||
sharded<cql3::query_processor>& qp) {
|
||||
|
||||
auto& virtual_tables_registry = sys_ks.local().get_virtual_tables_registry();
|
||||
auto& virtual_tables = *virtual_tables_registry;
|
||||
auto& db = sys_ks.local().local_db();
|
||||
|
||||
auto add_table = [&] (std::unique_ptr<virtual_table>&& tbl) -> future<> {
|
||||
auto schema = tbl->schema();
|
||||
virtual_tables[schema->id()] = std::move(tbl);
|
||||
|
||||
// Add the table as a local system table (similar to regular virtual tables)
|
||||
// Note: This creates tables in the system0 keyspace which is treated as internal
|
||||
co_await db.add_column_family_and_make_directory(schema, replica::database::is_new_cf::yes);
|
||||
|
||||
auto& cf = db.find_column_family(schema);
|
||||
cf.mark_ready_for_writes(nullptr);
|
||||
auto& vt = virtual_tables[schema->id()];
|
||||
cf.set_virtual_reader(vt->as_mutation_source());
|
||||
cf.set_virtual_writer([&vt = *vt] (const frozen_mutation& m) { return vt.apply(m); });
|
||||
};
|
||||
|
||||
// Add system0 virtual tables
|
||||
co_await add_table(std::make_unique<system0_topology_table>(qp.local()));
|
||||
co_await add_table(std::make_unique<system0_tablets_table>(qp.local()));
|
||||
|
||||
sys0log.info("system0 virtual tables initialized");
|
||||
}
|
||||
|
||||
} // namespace db
|
||||
@@ -1,33 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2024-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include "schema/schema_fwd.hh"
|
||||
|
||||
namespace service {
|
||||
class raft_group_registry;
|
||||
}
|
||||
|
||||
namespace cql3 {
|
||||
class query_processor;
|
||||
}
|
||||
|
||||
namespace db {
|
||||
|
||||
class system_keyspace;
|
||||
|
||||
// Initialize virtual tables in the system0 keyspace which mirror group0 tables
|
||||
// from the system keyspace but allow writes via group0.
|
||||
future<> initialize_system0_virtual_tables(
|
||||
sharded<service::raft_group_registry>& dist_raft_gr,
|
||||
sharded<db::system_keyspace>& sys_ks,
|
||||
sharded<cql3::query_processor>& qp);
|
||||
|
||||
} // namespace db
|
||||
@@ -10,7 +10,7 @@ Multiple ``INSERT``, ``UPDATE`` and ``DELETE`` can be executed in a single state
|
||||
|
||||
.. code-block::
|
||||
|
||||
batch_statement: BEGIN [ UNLOGGED | COUNTER ] BATCH
|
||||
batch_statement: BEGIN [ UNLOGGED | COUNTER | GROUP0 ] BATCH
|
||||
: [ USING `update_parameter` ( AND `update_parameter` )* ]
|
||||
: `modification_statement` ( ';' `modification_statement` )*
|
||||
: APPLY BATCH
|
||||
@@ -67,6 +67,29 @@ used, a failed batch might leave the batch only partly applied.
|
||||
Use the ``COUNTER`` option for batched counter updates. Unlike other
|
||||
updates in ScyllaDB, counter updates are not idempotent.
|
||||
|
||||
``GROUP0`` batches
|
||||
~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Use the ``GROUP0`` option for batched modifications to system tables that are managed by group0
|
||||
(e.g., ``system.topology``). GROUP0 batches execute mutations as a group0 command, ensuring they
|
||||
are replicated through the Raft consensus protocol.
|
||||
|
||||
GROUP0 batches have the following restrictions:
|
||||
|
||||
- Can only modify tables in the ``system`` keyspace
|
||||
- Cannot use custom timestamps (``USING TIMESTAMP`` is not allowed)
|
||||
- Cannot use conditional statements (``IF EXISTS``, ``IF NOT EXISTS``, etc.)
|
||||
- Requires a group0 guard to be taken before execution
|
||||
|
||||
Example:
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
BEGIN GROUP0 BATCH
|
||||
INSERT INTO system.topology (key, value) VALUES ('node1', 'data1');
|
||||
UPDATE system.topology SET value = 'data2' WHERE key = 'node2';
|
||||
APPLY BATCH;
|
||||
|
||||
|
||||
:doc:`Apache Cassandra Query Language (CQL) Reference </cql/index>`
|
||||
|
||||
|
||||
6
main.cc
6
main.cc
@@ -108,7 +108,6 @@
|
||||
#include "lang/manager.hh"
|
||||
#include "sstables/sstables_manager.hh"
|
||||
#include "db/virtual_tables.hh"
|
||||
#include "db/system0_virtual_tables.hh"
|
||||
|
||||
#include "service/raft/raft_group_registry.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
@@ -1837,11 +1836,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
return db::initialize_virtual_tables(db, ss, gossiper, raft_gr, sys_ks, tablet_allocator, messaging, *cfg);
|
||||
}).get();
|
||||
|
||||
checkpoint(stop_signal, "initializing system0 virtual tables");
|
||||
smp::invoke_on_all([&] {
|
||||
return db::initialize_system0_virtual_tables(raft_gr, sys_ks, qp);
|
||||
}).get();
|
||||
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([&qp] { return qp.stop(); });
|
||||
sstables::init_metrics().get();
|
||||
|
||||
@@ -50,8 +50,7 @@ static const std::unordered_set<std::string_view> internal_keyspaces = {
|
||||
db::system_keyspace::NAME,
|
||||
db::schema_tables::NAME,
|
||||
auth::meta::legacy::AUTH_KS,
|
||||
tracing::trace_keyspace_helper::KEYSPACE_NAME,
|
||||
"system0" // Virtual keyspace for group0 table updates
|
||||
tracing::trace_keyspace_helper::KEYSPACE_NAME
|
||||
};
|
||||
|
||||
bool is_internal_keyspace(std::string_view name) {
|
||||
|
||||
63
test/cqlpy/test_group0_batch.py
Normal file
63
test/cqlpy/test_group0_batch.py
Normal file
@@ -0,0 +1,63 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2024-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
#############################################################################
|
||||
# Tests for GROUP0 BATCH operations
|
||||
#############################################################################
|
||||
from cassandra import InvalidRequest
|
||||
import pytest
|
||||
|
||||
|
||||
def test_group0_batch_syntax_error_for_non_system_table(cql, test_keyspace):
|
||||
"""Verifies that GROUP0 BATCH can only be used with system keyspace tables"""
|
||||
with pytest.raises(InvalidRequest, match="GROUP0 BATCH can only modify system keyspace tables"):
|
||||
# Create a test table in a non-system keyspace
|
||||
table_name = f"{test_keyspace}.test_table"
|
||||
cql.execute(f"CREATE TABLE {table_name} (k int PRIMARY KEY, v int)")
|
||||
try:
|
||||
# Try to use GROUP0 BATCH with a non-system table
|
||||
cql.execute(f"""
|
||||
BEGIN GROUP0 BATCH
|
||||
INSERT INTO {table_name} (k, v) VALUES (1, 1)
|
||||
APPLY BATCH
|
||||
""")
|
||||
finally:
|
||||
cql.execute(f"DROP TABLE IF EXISTS {table_name}")
|
||||
|
||||
|
||||
def test_group0_batch_with_timestamp_error(cql):
|
||||
"""Verifies that GROUP0 BATCH cannot have custom timestamp"""
|
||||
with pytest.raises(InvalidRequest, match="Cannot provide custom timestamp for GROUP0 BATCH"):
|
||||
cql.execute("""
|
||||
BEGIN GROUP0 BATCH USING TIMESTAMP 12345
|
||||
INSERT INTO system.topology (key) VALUES ('test')
|
||||
APPLY BATCH
|
||||
""")
|
||||
|
||||
|
||||
def test_group0_batch_with_conditions_error(cql):
|
||||
"""Verifies that GROUP0 BATCH cannot have conditions"""
|
||||
with pytest.raises(InvalidRequest, match="Cannot use conditions in GROUP0 BATCH"):
|
||||
cql.execute("""
|
||||
BEGIN GROUP0 BATCH
|
||||
INSERT INTO system.topology (key) VALUES ('test') IF NOT EXISTS
|
||||
APPLY BATCH
|
||||
""")
|
||||
|
||||
|
||||
def test_group0_batch_basic_syntax(cql):
|
||||
"""Verifies that GROUP0 BATCH has correct basic syntax"""
|
||||
# This test just checks that the syntax is recognized
|
||||
# The actual execution will fail if not properly set up with group0
|
||||
# but the syntax should be accepted
|
||||
try:
|
||||
cql.execute("""
|
||||
BEGIN GROUP0 BATCH
|
||||
APPLY BATCH
|
||||
""")
|
||||
except Exception as e:
|
||||
# Accept either success or group0-related errors, but not syntax errors
|
||||
error_msg = str(e).lower()
|
||||
assert "syntax" not in error_msg and "unexpected" not in error_msg, \
|
||||
f"GROUP0 BATCH should be valid syntax, but got: {e}"
|
||||
Reference in New Issue
Block a user