Compare commits

..

6 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
1eca538536 Use "this auto" instead of coroutine::lambda wrapper
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-06 18:21:38 +00:00
copilot-swe-agent[bot]
1ad7a5e0f1 Revert false positives - only keep fix for hint_endpoint_manager.cc
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-06 18:08:39 +00:00
copilot-swe-agent[bot]
6ed8f1a24b Fix additional lambda-coroutine fiasco in streaming/consumer.cc
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-06 17:05:59 +00:00
copilot-swe-agent[bot]
a87e72004d Fix lambda-coroutine fiasco issues in 4 files
Co-authored-by: tgrabiec <283695+tgrabiec@users.noreply.github.com>
2025-12-06 17:01:45 +00:00
copilot-swe-agent[bot]
e7317e9f39 Initial plan 2025-12-06 16:50:52 +00:00
Nadav Har'El
350cbd1d66 alternator: fix typo of BatchWriteItem in comments
The DynamoDB API's "BatchWriteItem" operation is spelled like this, in
singular. Some comments incorrectly referred to it as BatchWriteItems - in
plural. This patch fixes those mistakes.

There are no functional changes here or changes to user-facing documents -
these mistakes were only in code comments.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>

Closes scylladb/scylladb#27446
2025-12-05 15:08:58 +02:00
9 changed files with 13 additions and 168 deletions

View File

@@ -2223,12 +2223,12 @@ void validate_value(const rjson::value& v, const char* caller) {
// The put_or_delete_item class builds the mutations needed by the PutItem and
// DeleteItem operations - either as stand-alone commands or part of a list
// of commands in BatchWriteItems.
// of commands in BatchWriteItem.
// put_or_delete_item splits each operation into two stages: Constructing the
// object parses and validates the user input (throwing exceptions if there
// are input errors). Later, build() generates the actual mutation, with a
// specified timestamp. This split is needed because of the peculiar needs of
// BatchWriteItems and LWT. BatchWriteItems needs all parsing to happen before
// BatchWriteItem and LWT. BatchWriteItem needs all parsing to happen before
// any writing happens (if one of the commands has an error, none of the
// writes should be done). LWT makes it impossible for the parse step to
// generate "mutation" objects, because the timestamp still isn't known.
@@ -3026,7 +3026,7 @@ struct primary_key_equal {
};
// This is a cas_request subclass for applying given put_or_delete_items to
// one partition using LWT as part as BatchWriteItems. This is a write-only
// one partition using LWT as part of BatchWriteItem. This is a write-only
// operation, not needing the previous value of the item (the mutation to be
// done is known prior to starting the operation). Nevertheless, we want to
// do this mutation via LWT to ensure that it is serialized with other LWT
@@ -3065,7 +3065,7 @@ static future<> cas_write(service::storage_proxy& proxy, schema_ptr schema, serv
{timeout, std::move(permit), client_state, trace_state},
db::consistency_level::LOCAL_SERIAL, db::consistency_level::LOCAL_QUORUM,
timeout, timeout, true, std::move(cdc_opts)).discard_result();
// We discarded cas()'s future value ("is_applied") because BatchWriteItems
// We discarded cas()'s future value ("is_applied") because BatchWriteItem
// does not need to support conditional updates.
}

View File

@@ -716,7 +716,7 @@ batchStatement returns [std::unique_ptr<cql3::statements::raw::batch_statement>
auto attrs = std::make_unique<cql3::attributes::raw>();
}
: K_BEGIN
( K_UNLOGGED { type = btype::UNLOGGED; } | K_COUNTER { type = btype::COUNTER; } | K_GROUP0 { type = btype::GROUP0; } )?
( K_UNLOGGED { type = btype::UNLOGGED; } | K_COUNTER { type = btype::COUNTER; } )?
K_BATCH ( usingClause[attrs] )?
( s=batchStatementObjective ';'?
{
@@ -2374,7 +2374,6 @@ K_SCYLLA_CLUSTERING_BOUND: S C Y L L A '_' C L U S T E R I N G '_' B O U N D;
K_GROUP: G R O U P;
K_GROUP0: G R O U P '0';
K_LIKE: L I K E;

View File

@@ -31,13 +31,9 @@ logging::logger batch_statement::_logger("BatchStatement");
timeout_config_selector
timeout_for_type(batch_statement::type t) {
if (t == batch_statement::type::COUNTER) {
return &timeout_config::counter_write_timeout;
} else if (t == batch_statement::type::GROUP0) {
return &timeout_config::other_timeout;
} else {
return &timeout_config::write_timeout;
}
return t == batch_statement::type::COUNTER
? &timeout_config::counter_write_timeout
: &timeout_config::write_timeout;
}
db::timeout_clock::duration batch_statement::get_timeout(const service::client_state& state, const query_options& options) const {
@@ -94,11 +90,6 @@ future<> batch_statement::check_access(query_processor& qp, const service::clien
});
}
bool batch_statement::needs_guard(query_processor& qp, service::query_state& state) const
{
return _type == type::GROUP0;
}
void batch_statement::validate()
{
if (_attrs->is_time_to_live_set()) {
@@ -113,22 +104,6 @@ void batch_statement::validate()
if (_type == type::COUNTER) {
throw exceptions::invalid_request_exception("Cannot provide custom timestamp for counter BATCH");
}
if (_type == type::GROUP0) {
throw exceptions::invalid_request_exception("Cannot provide custom timestamp for GROUP0 BATCH");
}
}
if (_type == type::GROUP0) {
if (_has_conditions) {
throw exceptions::invalid_request_exception("Cannot use conditions in GROUP0 BATCH");
}
// Validate that all statements target system keyspace tables managed by group0
for (auto& s : _statements) {
if (s.statement->keyspace() != "system") {
throw exceptions::invalid_request_exception("GROUP0 BATCH can only modify system keyspace tables");
}
}
return;
}
bool has_counters = std::ranges::any_of(_statements, [] (auto&& s) { return s.statement->is_counter(); });
@@ -260,9 +235,6 @@ static thread_local inheriting_concrete_execution_stage<
future<shared_ptr<cql_transport::messages::result_message>> batch_statement::execute(
query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
if (_type == type::GROUP0) {
return execute_group0_batch(qp, state, options, std::move(guard));
}
return execute_without_checking_exception_message(qp, state, options, std::move(guard))
.then(cql_transport::messages::propagate_exception_as_future<shared_ptr<cql_transport::messages::result_message>>);
}
@@ -313,39 +285,6 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::do_
});
}
future<shared_ptr<cql_transport::messages::result_message>> batch_statement::execute_group0_batch(
query_processor& qp,
service::query_state& query_state, const query_options& options,
std::optional<service::group0_guard> guard) const
{
if (!guard) {
throw exceptions::invalid_request_exception("GROUP0 BATCH requires a guard");
}
auto timeout = db::timeout_clock::now() + get_timeout(query_state.get_client_state(), options);
// Create group0_batch and get the timestamp from it
service::group0_batch mc{std::move(guard)};
auto now = mc.write_timestamp();
// Get mutations from all statements
auto mutations = co_await get_mutations(qp, options, timeout, false, now, query_state);
// Add mutations to the group0_batch
mc.add_mutations(std::move(mutations), format("CQL GROUP0 BATCH: \"{}\"", raw_cql_statement));
// Announce the batch via group0
auto description = format("CQL GROUP0 BATCH: \"{}\"", raw_cql_statement);
auto [remote_, holder] = qp.remote();
auto [m, g] = co_await std::move(mc).extract();
if (!m.empty()) {
co_await remote_.get().mm.announce(std::move(m), std::move(g), description);
}
co_return make_shared<cql_transport::messages::result_message::void_message>();
}
future<coordinator_result<>> batch_statement::execute_without_conditions(
query_processor& qp,
utils::chunked_vector<mutation> mutations,

View File

@@ -95,8 +95,6 @@ public:
virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;
virtual bool needs_guard(query_processor& qp, service::query_state& state) const override;
// Validates a prepared batch statement without validating its nested statements.
void validate();
@@ -132,11 +130,6 @@ private:
service::query_state& query_state, const query_options& options,
bool local, api::timestamp_type now) const;
future<shared_ptr<cql_transport::messages::result_message>> execute_group0_batch(
query_processor& qp,
service::query_state& query_state, const query_options& options,
std::optional<service::group0_guard> guard) const;
future<exceptions::coordinator_result<>> execute_without_conditions(
query_processor& qp,
utils::chunked_vector<mutation> mutations,

View File

@@ -23,7 +23,7 @@ class modification_statement;
class batch_statement : public raw::cf_statement {
public:
enum class type {
LOGGED, UNLOGGED, COUNTER, GROUP0
LOGGED, UNLOGGED, COUNTER
};
private:
type _type;

View File

@@ -248,7 +248,7 @@ future<db::commitlog> hint_endpoint_manager::add_store() noexcept {
// which is larger than the segment ID of the RP of the last written hint.
cfg.base_segment_id = _last_written_rp.base_id();
return commitlog::create_commitlog(std::move(cfg)).then([this] (commitlog l) -> future<commitlog> {
return commitlog::create_commitlog(std::move(cfg)).then([this] (this auto, commitlog l) -> future<commitlog> {
// add_store() is triggered every time hint files are forcefully flushed to I/O (every hints_flush_period).
// When this happens we want to refill _sender's segments only if it has finished with the segments he had before.
if (_sender.have_segments()) {

View File

@@ -10,7 +10,7 @@ Multiple ``INSERT``, ``UPDATE`` and ``DELETE`` can be executed in a single state
.. code-block::
batch_statement: BEGIN [ UNLOGGED | COUNTER | GROUP0 ] BATCH
batch_statement: BEGIN [ UNLOGGED | COUNTER ] BATCH
: [ USING `update_parameter` ( AND `update_parameter` )* ]
: `modification_statement` ( ';' `modification_statement` )*
: APPLY BATCH
@@ -67,29 +67,6 @@ used, a failed batch might leave the batch only partly applied.
Use the ``COUNTER`` option for batched counter updates. Unlike other
updates in ScyllaDB, counter updates are not idempotent.
``GROUP0`` batches
~~~~~~~~~~~~~~~~~~
Use the ``GROUP0`` option for batched modifications to system tables that are managed by group0
(e.g., ``system.topology``). GROUP0 batches execute mutations as a group0 command, ensuring they
are replicated through the Raft consensus protocol.
GROUP0 batches have the following restrictions:
- Can only modify tables in the ``system`` keyspace
- Cannot use custom timestamps (``USING TIMESTAMP`` is not allowed)
- Cannot use conditional statements (``IF EXISTS``, ``IF NOT EXISTS``, etc.)
- Requires a group0 guard to be taken before execution
Example:
.. code-block:: cql
BEGIN GROUP0 BATCH
INSERT INTO system.topology (key, value) VALUES ('node1', 'data1');
UPDATE system.topology SET value = 'data2' WHERE key = 'node2';
APPLY BATCH;
:doc:`Apache Cassandra Query Language (CQL) Reference </cql/index>`

View File

@@ -205,7 +205,7 @@ def test_batch_write_invalid_operation(test_table_s):
# In test_item.py we have a bunch of test_empty_* tests on different ways to
# create an empty item (which in Scylla requires the special CQL row marker
# to be supported correctly). BatchWriteItems provides yet another way of
# to be supported correctly). BatchWriteItem provides yet another way of
# creating items, so check the empty case here too:
def test_empty_batch_write(test_table):
p = random_string()
@@ -214,7 +214,7 @@ def test_empty_batch_write(test_table):
batch.put_item({'p': p, 'c': c})
assert test_table.get_item(Key={'p': p, 'c': c}, ConsistentRead=True)['Item'] == {'p': p, 'c': c}
# Test that BatchWriteItems allows writing to multiple tables in one operation
# Test that BatchWriteItem allows writing to multiple tables in one operation
def test_batch_write_multiple_tables(test_table_s, test_table):
p1 = random_string()
c1 = random_string()

View File

@@ -1,63 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#############################################################################
# Tests for GROUP0 BATCH operations
#############################################################################
from cassandra import InvalidRequest
import pytest
def test_group0_batch_syntax_error_for_non_system_table(cql, test_keyspace):
"""Verifies that GROUP0 BATCH can only be used with system keyspace tables"""
with pytest.raises(InvalidRequest, match="GROUP0 BATCH can only modify system keyspace tables"):
# Create a test table in a non-system keyspace
table_name = f"{test_keyspace}.test_table"
cql.execute(f"CREATE TABLE {table_name} (k int PRIMARY KEY, v int)")
try:
# Try to use GROUP0 BATCH with a non-system table
cql.execute(f"""
BEGIN GROUP0 BATCH
INSERT INTO {table_name} (k, v) VALUES (1, 1)
APPLY BATCH
""")
finally:
cql.execute(f"DROP TABLE IF EXISTS {table_name}")
def test_group0_batch_with_timestamp_error(cql):
"""Verifies that GROUP0 BATCH cannot have custom timestamp"""
with pytest.raises(InvalidRequest, match="Cannot provide custom timestamp for GROUP0 BATCH"):
cql.execute("""
BEGIN GROUP0 BATCH USING TIMESTAMP 12345
INSERT INTO system.topology (key) VALUES ('test')
APPLY BATCH
""")
def test_group0_batch_with_conditions_error(cql):
"""Verifies that GROUP0 BATCH cannot have conditions"""
with pytest.raises(InvalidRequest, match="Cannot use conditions in GROUP0 BATCH"):
cql.execute("""
BEGIN GROUP0 BATCH
INSERT INTO system.topology (key) VALUES ('test') IF NOT EXISTS
APPLY BATCH
""")
def test_group0_batch_basic_syntax(cql):
"""Verifies that GROUP0 BATCH has correct basic syntax"""
# This test just checks that the syntax is recognized
# The actual execution will fail if not properly set up with group0
# but the syntax should be accepted
try:
cql.execute("""
BEGIN GROUP0 BATCH
APPLY BATCH
""")
except Exception as e:
# Accept either success or group0-related errors, but not syntax errors
error_msg = str(e).lower()
assert "syntax" not in error_msg and "unexpected" not in error_msg, \
f"GROUP0 BATCH should be valid syntax, but got: {e}"