Compare commits


2 Commits

Author SHA1 Message Date
Alex
0313d3e291 test/cluster: add cluster test for prepared metadata_id promotion
Exercises the full server-side prepared metadata_id promotion path using
the Scylla Python driver rather than raw sockets, via three lightweight
driver-side patches:

- _UseMetadataId(ApplicationInfoBase) injects SCYLLA_USE_METADATA_ID
  into the CQL STARTUP options so the server enables the v4 metadata_id
  wire exchange for this connection.
- mock.patch on ProtocolVersion.uses_prepared_metadata (forced True for
  all versions) makes the driver write result_metadata_id in EXECUTE
  frames and read it back from PREPARE/ROWS responses on protocol v4.
- A wrapper around ResultMessage.recv_results_metadata captures the
  result_metadata_id set by the driver on the ROWS ResultMessage (the
  driver parses it there but does not propagate it to PreparedStatement
  in the normal rows path).
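The patching approach described above can be sketched roughly as follows. This is a minimal, self-contained illustration using stand-in classes, not the real driver: the actual test patches the scylla-python-driver's `ProtocolVersion` and subclasses its `ApplicationInfoBase`, and the stock driver's version cutoff shown here is an assumption.

```python
from unittest import mock

# Stand-in for the driver class the test patches; in the real driver this
# decides whether EXECUTE frames carry a result_metadata_id.
class ProtocolVersion:
    @staticmethod
    def uses_prepared_metadata(version):
        # Assumed stock behavior: only newer protocol versions use it.
        return version >= 5

class _UseMetadataId:
    """Injects the opt-in flag into the CQL STARTUP options (sketch)."""
    def add_startup_options(self, options):
        options["SCYLLA_USE_METADATA_ID"] = "true"

# Force the metadata_id wire exchange even on protocol v4, as the test does.
with mock.patch.object(ProtocolVersion, "uses_prepared_metadata",
                       staticmethod(lambda version: True)):
    assert ProtocolVersion.uses_prepared_metadata(4) is True

# Outside the patch, the stand-in's original behavior is restored.
assert ProtocolVersion.uses_prepared_metadata(4) is False

opts = {}
_UseMetadataId().add_startup_options(opts)
assert opts["SCYLLA_USE_METADATA_ID"] == "true"
```

The point of `mock.patch` here is that the override is scoped: the driver behaves as if every protocol version supported prepared metadata ids only for the duration of the test body.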

The happy-path test prepares LIST ROLES OF (a statement with empty
result metadata at prepare time), executes it, and verifies:
- The ROWS response carried METADATA_CHANGED (promotion occurred).
- The response includes a non-None result_metadata_id.
- The promoted id differs from the stale empty-metadata id returned
  by PREPARE.

The suppression test injects skip_prepared_result_metadata_promotion to
confirm the happy-path test is not a false positive: the injection skips
the entire promotion block so no real_id is computed, the cache is not
updated, and the response carries no METADATA_CHANGED flag.
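The suppression check can be illustrated with a stand-in guard. Everything here except the injection-point name is hypothetical; the real mechanism is Scylla's error-injection framework inside the transport lambda, and the flag value and field names are placeholders.

```python
def maybe_promote(response, active_injections):
    # Mirrors the guard around the promotion block: when the injection is
    # active, the whole block is skipped, so no real_id is computed, the
    # cache is untouched, and no METADATA_CHANGED flag is set.
    if "skip_prepared_result_metadata_promotion" in active_injections:
        return response
    response["flags"] = response.get("flags", 0) | 0x0008  # METADATA_CHANGED (illustrative bit)
    response["result_metadata_id"] = b"real-id"            # placeholder for the computed id
    return response

suppressed = maybe_promote({}, {"skip_prepared_result_metadata_promotion"})
assert "flags" not in suppressed and "result_metadata_id" not in suppressed
```

This is what makes the happy-path test trustworthy: if the positive assertions passed even with the block skipped, they would be vacuous.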
2026-04-01 13:24:16 +03:00
Alex
4a06935bb0 transport/server: Promote prepared metadata_id after first rows response
When a prepared statement is cached without a result metadata_id (e.g.
LIST ROLES OF, whose result schema is not known at prepare time), the
server now detects this on the first EXECUTE that returns rows.

Promotion logic lives entirely in transport's .then() lambda:
1. Snapshot result_metadata_was_empty before calling execute_prepared_*.
2. After a rows response, calculate the real metadata_id from the result
   metadata.
3. Update the cache via qp.local().update_prepared_result_metadata_id()
   so subsequent EXECUTEs on the same shard skip promotion entirely.
4. Return METADATA_CHANGED to the client so it can update its stale
   cached metadata_id.
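The four steps above can be sketched as a Python control-flow model. All names here (`compute_metadata_id`, the dict-based cache and response, the flag bit) are illustrative stand-ins for the Seastar/C++ code, not the actual API:

```python
import hashlib

METADATA_CHANGED = 0x0008  # illustrative flag bit, not the real wire value

def compute_metadata_id(serialized_metadata: bytes) -> bytes:
    # Stand-in for the server's metadata_id computation (a checksum over
    # the serialized result metadata).
    return hashlib.md5(serialized_metadata).digest()

def on_rows_response(cache, cache_key, result_metadata_was_empty, response):
    # Step 1 (the snapshot) happened before execute; it arrives as a bool.
    if not result_metadata_was_empty:
        return response  # nothing to promote
    # Step 2: compute the real metadata_id from the rows response.
    real_id = compute_metadata_id(response["result_metadata"])
    # Step 3: update the cache by key (not through a captured pointer), so
    # a concurrently evicted entry makes this a harmless no-op.
    entry = cache.get(cache_key)
    if entry is not None:
        entry["result_metadata_id"] = real_id
    # Step 4: tell the client its cached metadata_id is stale.
    response["flags"] = response.get("flags", 0) | METADATA_CHANGED
    response["result_metadata_id"] = real_id
    return response

cache = {"stmt": {"result_metadata_id": None}}
resp = on_rows_response(cache, "stmt", True, {"result_metadata": b"\x00\x01cols"})
assert resp["flags"] & METADATA_CHANGED
assert cache["stmt"]["result_metadata_id"] == resp["result_metadata_id"]
```

Note how step 3 looks the entry up by key at response time rather than holding a reference across the await point; this models why dropping the captured `checked_weak_ptr` avoids the use-after-free described below.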

No promotion logic remains in do_execute_prepared; query_processor.hh
exposes update_prepared_result_metadata_id() which delegates to the
prepared_statements_cache.  The prepared (checked_weak_ptr) is no longer
captured in the lambda, avoiding a use-after-free if a concurrent ALTER
TABLE evicts the entry between scheduling and execution.

Adds a Boost unit test (schema_change_test) that verifies the
METADATA_CHANGED flag is set in the rows response when a stale
metadata_id is supplied.  The skip_prepared_result_metadata_promotion
error injection point in the transport lambda allows suppressing the
entire promotion block (no real_id computation, no cache update) for
negative testing.
2026-04-01 12:32:23 +03:00
58 changed files with 868 additions and 1455 deletions

View File

@@ -7,7 +7,6 @@
*/
#include <fmt/ranges.h>
#include <cstdlib>
#include <seastar/core/on_internal_error.hh>
#include "alternator/executor.hh"
#include "alternator/consumed_capacity.hh"
@@ -109,16 +108,6 @@ const sstring TABLE_CREATION_TIME_TAG_KEY("system:table_creation_time");
// configured by UpdateTimeToLive to be the expiration-time attribute for
// this table.
extern const sstring TTL_TAG_KEY("system:ttl_attribute");
// If this tag is present, it stores the name of an attribute whose numeric
// value (in microseconds since the Unix epoch) is used as the write timestamp
// for PutItem and UpdateItem operations. When the named attribute is present
// in a PutItem or UpdateItem request, its value is used as the timestamp of
// the write, and the attribute itself is NOT stored in the item. This allows
// users to control write ordering for last-write-wins semantics. Because LWT
// does not allow setting a custom write timestamp, operations using this
// feature are incompatible with conditions (which require LWT), and with
// the LWT_ALWAYS write isolation mode; such operations are rejected.
static const sstring TIMESTAMP_TAG_KEY("system:timestamp_attribute");
// This will be set to 1 in the case where the user did NOT specify a range key.
// The way GSI / LSI is implemented by Alternator assumes user-specified keys will come first
// in the materialized view's key list. Then, if needed, missing keys are added (current implementation
@@ -1348,14 +1337,13 @@ void rmw_operation::set_default_write_isolation(std::string_view value) {
// Alternator uses tags whose keys start with the "system:" prefix for
// internal purposes. Those should not be readable by ListTagsOfResource,
// nor writable with TagResource or UntagResource (see #24098).
// Only a few specific system tags, currently only "system:write_isolation",
// "system:initial_tablets", and "system:timestamp_attribute", are deliberately
// intended to be set and read by the user, so are not considered "internal".
// Only a few specific system tags, currently only "system:write_isolation"
// and "system:initial_tablets", are deliberately intended to be set and read
// by the user, so are not considered "internal".
static bool tag_key_is_internal(std::string_view tag_key) {
return tag_key.starts_with("system:")
&& tag_key != rmw_operation::WRITE_ISOLATION_TAG_KEY
&& tag_key != INITIAL_TABLETS_TAG_KEY
&& tag_key != TIMESTAMP_TAG_KEY;
&& tag_key != INITIAL_TABLETS_TAG_KEY;
}
enum class update_tags_action { add_tags, delete_tags };
@@ -2310,11 +2298,8 @@ public:
// After calling pk_from_json() and ck_from_json() to extract the pk and ck
// components of a key, and if that succeeded, call check_key() to further
// check that the key doesn't have any spurious components.
// allow_extra_attribute: set to true when the key may contain one extra
// non-key attribute (e.g., the timestamp pseudo-attribute for DeleteItem).
static void check_key(const rjson::value& key, const schema_ptr& schema, bool allow_extra_attribute = false) {
const unsigned expected = (schema->clustering_key_size() == 0 ? 1 : 2) + (allow_extra_attribute ? 1 : 0);
if (key.MemberCount() != expected) {
static void check_key(const rjson::value& key, const schema_ptr& schema) {
if (key.MemberCount() != (schema->clustering_key_size() == 0 ? 1 : 2)) {
throw api_error::validation("Given key attribute not in schema");
}
}
@@ -2361,57 +2346,6 @@ void validate_value(const rjson::value& v, const char* caller) {
// any writing happens (if one of the commands has an error, none of the
// writes should be done). LWT makes it impossible for the parse step to
// generate "mutation" objects, because the timestamp still isn't known.
// Convert a DynamoDB number (big_decimal) to an api::timestamp_type
// (microseconds since the Unix epoch). Fractional microseconds are truncated.
// Returns nullopt if the value is not positive, or if it is too large to
// represent as an int64_t.
static std::optional<api::timestamp_type> bigdecimal_to_timestamp(const big_decimal& bd) {
if (bd.unscaled_value() <= 0) {
return std::nullopt;
}
if (bd.scale() == 0) {
// Fast path: integer value, no decimal adjustment needed
return static_cast<api::timestamp_type>(bd.unscaled_value());
}
// General case: adjust for decimal scale.
// big_decimal stores value as unscaled_value * 10^(-scale).
// scale > 0 means divide by 10^scale (truncate fractional part).
// scale < 0 means multiply by 10^|scale| (add trailing zeros).
auto str = bd.unscaled_value().str();
if (bd.scale() > 0) {
int len = str.length();
if (len <= bd.scale()) {
return std::nullopt; // Number < 1
}
str = str.substr(0, len - bd.scale());
} else {
if (bd.scale() < -18) {
// Too large to represent as int64_t
return std::nullopt;
}
for (int i = 0; i < -bd.scale(); i++) {
str.push_back('0');
}
}
long long result = strtoll(str.c_str(), nullptr, 10);
if (result <= 0) {
return std::nullopt;
}
return static_cast<api::timestamp_type>(result);
}
// Try to extract a write timestamp from a DynamoDB-typed value.
// The value should be a number ({"N": "..."}), representing microseconds
// since the Unix epoch. Returns nullopt if the value is not a valid number
// or doesn't represent a valid timestamp.
static std::optional<api::timestamp_type> try_get_timestamp(const rjson::value& attr_value) {
std::optional<big_decimal> n = try_unwrap_number(attr_value);
if (!n) {
return std::nullopt;
}
return bigdecimal_to_timestamp(*n);
}
class put_or_delete_item {
private:
partition_key _pk;
@@ -2427,17 +2361,11 @@ private:
// that length can have different meanings depending on the operation, but the
// calculation of length in bytes to WCU is the same.
uint64_t _length_in_bytes = 0;
// If the table has a system:timestamp_attribute tag, and the named
// attribute was found in the item with a valid numeric value, this holds
// the extracted timestamp. The attribute is not added to _cells.
std::optional<api::timestamp_type> _custom_timestamp;
public:
struct delete_item {};
struct put_item {};
put_or_delete_item(const rjson::value& key, schema_ptr schema, delete_item,
const std::optional<bytes>& timestamp_attribute = std::nullopt);
put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item, std::unordered_map<bytes, std::string> key_attributes,
const std::optional<bytes>& timestamp_attribute = std::nullopt);
put_or_delete_item(const rjson::value& key, schema_ptr schema, delete_item);
put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item, std::unordered_map<bytes, std::string> key_attributes);
// put_or_delete_item doesn't keep a reference to schema (so it can be
// moved between shards for LWT) so it needs to be given again to build():
mutation build(schema_ptr schema, api::timestamp_type ts) const;
@@ -2452,32 +2380,11 @@ public:
bool is_put_item() noexcept {
return _cells.has_value();
}
// Returns the custom write timestamp extracted from the timestamp attribute,
// if any. If not set, the caller should use api::new_timestamp() instead.
std::optional<api::timestamp_type> custom_timestamp() const noexcept {
return _custom_timestamp;
}
};
put_or_delete_item::put_or_delete_item(const rjson::value& key, schema_ptr schema, delete_item, const std::optional<bytes>& timestamp_attribute)
put_or_delete_item::put_or_delete_item(const rjson::value& key, schema_ptr schema, delete_item)
: _pk(pk_from_json(key, schema)), _ck(ck_from_json(key, schema)) {
if (timestamp_attribute) {
// The timestamp attribute may be provided as a "pseudo-key": it is
// not a real key column, but can be included in the "Key" object to
// carry the custom write timestamp. If found, extract the timestamp
// and don't store it in the item.
const rjson::value* ts_val = rjson::find(key, to_string_view(*timestamp_attribute));
if (ts_val) {
if (auto t = try_get_timestamp(*ts_val)) {
_custom_timestamp = t;
} else {
throw api_error::validation(fmt::format(
"The '{}' attribute used as a write timestamp must be a positive number (microseconds since epoch)",
to_string_view(*timestamp_attribute)));
}
}
}
check_key(key, schema, _custom_timestamp.has_value());
check_key(key, schema);
}
// find_attribute() checks whether the named attribute is stored in the
@@ -2564,8 +2471,7 @@ static inline void validate_value_if_index_key(
}
}
put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item, std::unordered_map<bytes, std::string> key_attributes,
const std::optional<bytes>& timestamp_attribute)
put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item, std::unordered_map<bytes, std::string> key_attributes)
: _pk(pk_from_json(item, schema)), _ck(ck_from_json(item, schema)) {
_cells = std::vector<cell>();
_cells->reserve(item.MemberCount());
@@ -2574,17 +2480,6 @@ put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr sche
validate_value(it->value, "PutItem");
const column_definition* cdef = find_attribute(*schema, column_name);
validate_attr_name_length("", column_name.size(), cdef && cdef->is_primary_key());
// If this is the timestamp attribute, it must be a valid numeric value
// (microseconds since epoch). Use it as the write timestamp and do not
// store it in the item data. Reject the write if the value is non-numeric.
if (timestamp_attribute && column_name == *timestamp_attribute) {
if (auto t = try_get_timestamp(it->value)) {
_custom_timestamp = t;
// The attribute is consumed as timestamp, not stored in _cells.
continue;
}
throw api_error::validation(fmt::format("The '{}' attribute used as a write timestamp must be a positive number (microseconds since epoch)", to_string_view(*timestamp_attribute)));
}
_length_in_bytes += column_name.size();
if (!cdef) {
// This attribute may be a key column of one of the GSI or LSI,
@@ -2776,13 +2671,6 @@ rmw_operation::rmw_operation(service::storage_proxy& proxy, rjson::value&& reque
// _pk and _ck will be assigned later, by the subclass's constructor
// (each operation puts the key in a slightly different location in
// the request).
const auto tags_ptr = db::get_tags_of_table(_schema);
if (tags_ptr) {
auto it = tags_ptr->find(TIMESTAMP_TAG_KEY);
if (it != tags_ptr->end() && !it->second.empty()) {
_timestamp_attribute = to_bytes(it->second);
}
}
}
std::optional<mutation> rmw_operation::apply(foreign_ptr<lw_shared_ptr<query::result>> qr, const query::partition_slice& slice, api::timestamp_type ts, cdc::per_request_options& cdc_opts) {
@@ -2927,21 +2815,6 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
.alternator = true,
.alternator_streams_increased_compatibility = schema()->cdc_options().enabled() && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
};
// If the operation uses a custom write timestamp (from the
// system:timestamp_attribute tag), LWT is incompatible because LWT
// requires the timestamp to be set by the Paxos protocol. Reject the
// operation if it would need to use LWT.
if (has_custom_timestamp()) {
bool would_use_lwt = _write_isolation == write_isolation::LWT_ALWAYS ||
(needs_read_before_write &&
_write_isolation != write_isolation::FORBID_RMW &&
_write_isolation != write_isolation::UNSAFE_RMW);
if (would_use_lwt) {
throw api_error::validation(
"Using the system:timestamp_attribute is not compatible with "
"conditional writes or the 'always' write isolation policy.");
}
}
if (needs_read_before_write) {
if (_write_isolation == write_isolation::FORBID_RMW) {
throw api_error::validation("Read-modify-write operations are disabled by 'forbid_rmw' write isolation policy. Refer to https://github.com/scylladb/scylla/blob/master/docs/alternator/alternator.md#write-isolation-policies for more information.");
@@ -3040,8 +2913,7 @@ public:
put_item_operation(parsed::expression_cache& parsed_expression_cache, service::storage_proxy& proxy, rjson::value&& request)
: rmw_operation(proxy, std::move(request))
, _mutation_builder(rjson::get(_request, "Item"), schema(), put_or_delete_item::put_item{},
si_key_attributes(proxy.data_dictionary().find_table(schema()->ks_name(), schema()->cf_name())),
_timestamp_attribute) {
si_key_attributes(proxy.data_dictionary().find_table(schema()->ks_name(), schema()->cf_name()))) {
_pk = _mutation_builder.pk();
_ck = _mutation_builder.ck();
if (_returnvalues != returnvalues::NONE && _returnvalues != returnvalues::ALL_OLD) {
@@ -3073,9 +2945,6 @@ public:
check_needs_read_before_write(_condition_expression) ||
_returnvalues == returnvalues::ALL_OLD;
}
bool has_custom_timestamp() const noexcept {
return _mutation_builder.custom_timestamp().has_value();
}
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts, cdc::per_request_options& cdc_opts) const override {
if (!verify_expected(_request, previous_item.get()) ||
!verify_condition_expression(_condition_expression, previous_item.get())) {
@@ -3093,10 +2962,7 @@ public:
} else {
_return_attributes = {};
}
// Use the custom timestamp from the timestamp attribute if available,
// otherwise use the provided timestamp.
api::timestamp_type effective_ts = _mutation_builder.custom_timestamp().value_or(ts);
return _mutation_builder.build(_schema, effective_ts);
return _mutation_builder.build(_schema, ts);
}
virtual ~put_item_operation() = default;
};
@@ -3148,7 +3014,7 @@ public:
parsed::condition_expression _condition_expression;
delete_item_operation(parsed::expression_cache& parsed_expression_cache, service::storage_proxy& proxy, rjson::value&& request)
: rmw_operation(proxy, std::move(request))
, _mutation_builder(rjson::get(_request, "Key"), schema(), put_or_delete_item::delete_item{}, _timestamp_attribute) {
, _mutation_builder(rjson::get(_request, "Key"), schema(), put_or_delete_item::delete_item{}) {
_pk = _mutation_builder.pk();
_ck = _mutation_builder.ck();
if (_returnvalues != returnvalues::NONE && _returnvalues != returnvalues::ALL_OLD) {
@@ -3179,9 +3045,6 @@ public:
check_needs_read_before_write(_condition_expression) ||
_returnvalues == returnvalues::ALL_OLD;
}
bool has_custom_timestamp() const noexcept override {
return _mutation_builder.custom_timestamp().has_value();
}
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts, cdc::per_request_options& cdc_opts) const override {
if (!verify_expected(_request, previous_item.get()) ||
!verify_condition_expression(_condition_expression, previous_item.get())) {
@@ -3202,10 +3065,7 @@ public:
if (_consumed_capacity._total_bytes == 0) {
_consumed_capacity._total_bytes = 1;
}
// Use the custom timestamp from the timestamp attribute if available,
// otherwise use the provided timestamp.
api::timestamp_type effective_ts = _mutation_builder.custom_timestamp().value_or(ts);
return _mutation_builder.build(_schema, effective_ts);
return _mutation_builder.build(_schema, ts);
}
virtual ~delete_item_operation() = default;
};
@@ -3392,13 +3252,10 @@ future<> executor::do_batch_write(
// Do a normal write, without LWT:
utils::chunked_vector<mutation> mutations;
mutations.reserve(mutation_builders.size());
api::timestamp_type default_ts = api::new_timestamp();
api::timestamp_type now = api::new_timestamp();
bool any_cdc_enabled = false;
for (auto& b : mutation_builders) {
// Use custom timestamp from the timestamp attribute if available,
// otherwise use the default timestamp for all items in this batch.
api::timestamp_type ts = b.second.custom_timestamp().value_or(default_ts);
mutations.push_back(b.second.build(b.first, ts));
mutations.push_back(b.second.build(b.first, now));
any_cdc_enabled |= b.first->cdc_options().enabled();
}
return _proxy.mutate(std::move(mutations),
@@ -3498,16 +3355,6 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
std::unordered_set<primary_key, primary_key_hash, primary_key_equal> used_keys(
1, primary_key_hash{schema}, primary_key_equal{schema});
// Look up the timestamp attribute tag once per table (shared by all
// PutRequests and DeleteRequests for this table).
std::optional<bytes> ts_attr;
const auto tags_ptr = db::get_tags_of_table(schema);
if (tags_ptr) {
auto tag_it = tags_ptr->find(TIMESTAMP_TAG_KEY);
if (tag_it != tags_ptr->end() && !tag_it->second.empty()) {
ts_attr = to_bytes(tag_it->second);
}
}
for (auto& request : it->value.GetArray()) {
auto& r = get_single_member(request, "RequestItems element");
const auto r_name = rjson::to_string_view(r.name);
@@ -3516,8 +3363,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
validate_is_object(item, "Item in PutRequest");
auto&& put_item = put_or_delete_item(
item, schema, put_or_delete_item::put_item{},
si_key_attributes(_proxy.data_dictionary().find_table(schema->ks_name(), schema->cf_name())),
ts_attr);
si_key_attributes(_proxy.data_dictionary().find_table(schema->ks_name(), schema->cf_name())));
mutation_builders.emplace_back(schema, std::move(put_item));
auto mut_key = std::make_pair(mutation_builders.back().second.pk(), mutation_builders.back().second.ck());
if (used_keys.contains(mut_key)) {
@@ -3528,7 +3374,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
const rjson::value& key = get_member(r.value, "Key", "DeleteRequest");
validate_is_object(key, "Key in DeleteRequest");
mutation_builders.emplace_back(schema, put_or_delete_item(
key, schema, put_or_delete_item::delete_item{}, ts_attr));
key, schema, put_or_delete_item::delete_item{}));
auto mut_key = std::make_pair(mutation_builders.back().second.pk(),
mutation_builders.back().second.ck());
if (used_keys.contains(mut_key)) {
@@ -4137,10 +3983,6 @@ public:
virtual ~update_item_operation() = default;
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts, cdc::per_request_options& cdc_opts) const override;
bool needs_read_before_write() const;
// Returns true if the timestamp attribute is being set in this update
// (via AttributeUpdates PUT or UpdateExpression SET). Used to detect
// whether a custom write timestamp will be used.
bool has_custom_timestamp() const noexcept;
private:
void delete_attribute(bytes&& column_name, const std::unique_ptr<rjson::value>& previous_item, const api::timestamp_type ts, deletable_row& row,
@@ -4275,44 +4117,6 @@ update_item_operation::needs_read_before_write() const {
(_returnvalues != returnvalues::NONE && _returnvalues != returnvalues::UPDATED_NEW);
}
bool
update_item_operation::has_custom_timestamp() const noexcept {
if (!_timestamp_attribute) {
return false;
}
// Check if the timestamp attribute is being set via AttributeUpdates PUT
// with a valid numeric value.
if (_attribute_updates) {
std::string_view ts_attr = to_string_view(*_timestamp_attribute);
for (auto it = _attribute_updates->MemberBegin(); it != _attribute_updates->MemberEnd(); ++it) {
if (rjson::to_string_view(it->name) == ts_attr) {
const rjson::value* action = rjson::find(it->value, "Action");
if (action && rjson::to_string_view(*action) == "PUT" && it->value.HasMember("Value")) {
// Only consider it a custom timestamp if the value is numeric
if (try_get_timestamp((it->value)["Value"])) {
return true;
}
}
break;
}
}
}
// Check if the timestamp attribute is being set via UpdateExpression SET.
// We can't check the actual value type without resolving the expression
// (which requires previous_item), so we conservatively return true if the
// attribute appears in a SET action, and handle the non-numeric case in apply().
// A non-numeric value will cause apply() to throw a ValidationException.
if (!_update_expression.empty()) {
std::string ts_attr(to_string_view(*_timestamp_attribute));
auto it = _update_expression.find(ts_attr);
if (it != _update_expression.end() && it->second.has_value()) {
const auto& action = it->second.get_value();
return std::holds_alternative<parsed::update_expression::action::set>(action._action);
}
}
return false;
}
// action_result() returns the result of applying an UpdateItem action -
// this result is either a JSON object or an unset optional which indicates
// the action was a deletion. The caller (update_item_operation::apply()
@@ -4588,17 +4392,6 @@ inline void update_item_operation::apply_attribute_updates(const std::unique_ptr
throw api_error::validation(format("UpdateItem cannot update key column {}", rjson::to_string_view(it->name)));
}
std::string action = rjson::to_string((it->value)["Action"]);
// If this is the timestamp attribute being PUT, it must be a valid
// numeric value (microseconds since epoch). Use it as the write
// timestamp and skip storing it. Reject if the value is non-numeric.
if (_timestamp_attribute && column_name == *_timestamp_attribute && action == "PUT") {
if (it->value.HasMember("Value")) {
if (try_get_timestamp((it->value)["Value"])) {
continue;
}
throw api_error::validation(fmt::format("The '{}' attribute used as a write timestamp must be a positive number (microseconds since epoch)", to_string_view(*_timestamp_attribute)));
}
}
if (action == "DELETE") {
// The DELETE operation can do two unrelated tasks. Without a
// "Value" option, it is used to delete an attribute. With a
@@ -4702,20 +4495,6 @@ inline void update_item_operation::apply_update_expression(const std::unique_ptr
if (cdef && cdef->is_primary_key()) {
throw api_error::validation(fmt::format("UpdateItem cannot update key column {}", column_name));
}
// If this is the timestamp attribute being set via UpdateExpression SET,
// it must be a valid numeric value (microseconds since epoch). Use it as
// the write timestamp and skip storing it. Reject if non-numeric.
if (_timestamp_attribute && to_bytes(column_name) == *_timestamp_attribute &&
actions.second.has_value() &&
std::holds_alternative<parsed::update_expression::action::set>(actions.second.get_value()._action)) {
std::optional<rjson::value> result = action_result(actions.second.get_value(), previous_item.get());
if (result) {
if (try_get_timestamp(*result)) {
continue; // Skip - already used as timestamp
}
throw api_error::validation(fmt::format("The '{}' attribute used as a write timestamp must be a positive number (microseconds since epoch)", to_string_view(*_timestamp_attribute)));
}
}
if (actions.second.has_value()) {
// An action on a top-level attribute column_name. The single
// action is actions.second.get_value(). We can simply invoke
@@ -4764,44 +4543,6 @@ std::optional<mutation> update_item_operation::apply(std::unique_ptr<rjson::valu
return {};
}
// If the table has a timestamp attribute, look for it in the update
// (AttributeUpdates PUT or UpdateExpression SET). If found with a valid
// numeric value, use it as the write timestamp instead of the provided ts.
api::timestamp_type effective_ts = ts;
if (_timestamp_attribute) {
bool found_ts = false;
if (_attribute_updates) {
std::string_view ts_attr = to_string_view(*_timestamp_attribute);
for (auto it = _attribute_updates->MemberBegin(); it != _attribute_updates->MemberEnd(); ++it) {
if (rjson::to_string_view(it->name) == ts_attr) {
const rjson::value* action = rjson::find(it->value, "Action");
if (action && rjson::to_string_view(*action) == "PUT" && it->value.HasMember("Value")) {
if (auto t = try_get_timestamp((it->value)["Value"])) {
effective_ts = *t;
found_ts = true;
}
}
break;
}
}
}
if (!found_ts && !_update_expression.empty()) {
std::string ts_attr(to_string_view(*_timestamp_attribute));
auto it = _update_expression.find(ts_attr);
if (it != _update_expression.end() && it->second.has_value()) {
const auto& action = it->second.get_value();
if (std::holds_alternative<parsed::update_expression::action::set>(action._action)) {
std::optional<rjson::value> result = action_result(action, previous_item.get());
if (result) {
if (auto t = try_get_timestamp(*result)) {
effective_ts = *t;
}
}
}
}
}
}
// In the ReturnValues=ALL_NEW case, we make a copy of previous_item into
// _return_attributes and parts of it will be overwritten by the new
// updates (in do_update() and do_delete()). We need to make a copy and
@@ -4830,10 +4571,10 @@ std::optional<mutation> update_item_operation::apply(std::unique_ptr<rjson::valu
auto& row = m.partition().clustered_row(*_schema, _ck);
auto modified_attrs = attribute_collector();
if (!_update_expression.empty()) {
apply_update_expression(previous_item, effective_ts, row, modified_attrs, any_updates, any_deletes);
apply_update_expression(previous_item, ts, row, modified_attrs, any_updates, any_deletes);
}
if (_attribute_updates) {
apply_attribute_updates(previous_item, effective_ts, row, modified_attrs, any_updates, any_deletes);
apply_attribute_updates(previous_item, ts, row, modified_attrs, any_updates, any_deletes);
}
if (!modified_attrs.empty()) {
auto serialized_map = modified_attrs.to_mut().serialize(*attrs_type());
@@ -4844,7 +4585,7 @@ std::optional<mutation> update_item_operation::apply(std::unique_ptr<rjson::valu
// marker. An update with only DELETE operations must not add a row marker
// (this was issue #5862) but any other update, even an empty one, should.
if (any_updates || !any_deletes) {
row.apply(row_marker(effective_ts));
row.apply(row_marker(ts));
} else if (_returnvalues == returnvalues::ALL_NEW && !previous_item) {
// There was no pre-existing item, and we're not creating one, so
// don't report the new item in the returned Attributes.

View File

@@ -18,7 +18,6 @@
#include "executor.hh"
#include "tracing/trace_state.hh"
#include "keys/keys.hh"
#include "bytes.hh"
namespace alternator {
@@ -73,11 +72,6 @@ protected:
clustering_key _ck = clustering_key::make_empty();
write_isolation _write_isolation;
mutable wcu_consumed_capacity_counter _consumed_capacity;
// If the table has a "system:timestamp_attribute" tag, this holds the
// name of the attribute (converted to bytes) whose numeric value should
// be used as the write timestamp instead of the current time. The
// attribute itself is NOT stored in the item data.
std::optional<bytes> _timestamp_attribute;
// All RMW operations can have a ReturnValues parameter from the following
// choices. But note that only UpdateItem actually supports all of them:
enum class returnvalues {
@@ -119,9 +113,6 @@ public:
// Convert the above apply() into the signature needed by cas_request:
virtual std::optional<mutation> apply(foreign_ptr<lw_shared_ptr<query::result>> qr, const query::partition_slice& slice, api::timestamp_type ts, cdc::per_request_options& cdc_opts) override;
virtual ~rmw_operation() = default;
// Returns true if the operation will use a custom write timestamp (from the
// system:timestamp_attribute tag). Subclasses override this as needed.
virtual bool has_custom_timestamp() const noexcept { return false; }
const wcu_consumed_capacity_counter& consumed_capacity() const noexcept { return _consumed_capacity; }
schema_ptr schema() const { return _schema; }
const rjson::value& request() const { return _request; }

View File

@@ -767,7 +767,7 @@ static future<bool> scan_table(
// by tasking another node to take over scanning of the dead node's primary
// ranges. What we do here is that this node will also check expiration
// on its *secondary* ranges - but only those whose primary owner is down.
auto tablet_secondary_replica = tablet_map.get_secondary_replica(*tablet, erm->get_topology()); // throws if no secondary replica
auto tablet_secondary_replica = tablet_map.get_secondary_replica(*tablet); // throws if no secondary replica
if (tablet_secondary_replica.host == my_host_id && tablet_secondary_replica.shard == this_shard_id()) {
if (!gossiper.is_alive(tablet_primary_replica.host)) {
co_await scan_tablet(*tablet, proxy, abort_source, page_sem, expiration_stats, scan_ctx, tablet_map);

View File

@@ -110,23 +110,15 @@ future<> cache::prune(const resource& r) {
future<> cache::reload_all_permissions() noexcept {
SCYLLA_ASSERT(_permission_loader);
auto units = co_await get_units(_loading_sem, 1, _as);
auto copy_keys = [] (const std::unordered_map<resource, permission_set>& m) {
std::vector<resource> keys;
keys.reserve(m.size());
for (const auto& [res, _] : m) {
keys.push_back(res);
}
return keys;
};
const role_or_anonymous anon;
for (const auto& res : copy_keys(_anonymous_permissions)) {
_anonymous_permissions[res] = co_await _permission_loader(anon, res);
for (auto& [res, perms] : _anonymous_permissions) {
perms = co_await _permission_loader(anon, res);
}
for (auto& [role, entry] : _roles) {
auto& perms_cache = entry->cached_permissions;
auto r = role_or_anonymous(role);
for (const auto& res : copy_keys(perms_cache)) {
perms_cache[res] = co_await _permission_loader(r, res);
for (auto& [res, perms] : perms_cache) {
perms = co_await _permission_loader(r, res);
}
}
logger.debug("Reloaded auth cache with {} entries", _roles.size());
@@ -236,7 +228,6 @@ future<> cache::load_all() {
co_await distribute_role(name, role);
}
co_await container().invoke_on_others([this](cache& c) -> future<> {
auto units = co_await get_units(c._loading_sem, 1, c._as);
c._current_version = _current_version;
co_await c.prune_all();
});
@@ -296,11 +287,10 @@ future<> cache::load_roles(std::unordered_set<role_name_t> roles) {
future<> cache::distribute_role(const role_name_t& name, lw_shared_ptr<role_record> role) {
auto role_ptr = role.get();
co_await container().invoke_on_others([&name, role_ptr](cache& c) -> future<> {
auto units = co_await get_units(c._loading_sem, 1, c._as);
co_await container().invoke_on_others([&name, role_ptr](cache& c) {
if (!role_ptr) {
c.remove_role(name);
co_return;
return;
}
auto role_copy = make_lw_shared<role_record>(*role_ptr);
c.add_role(name, std::move(role_copy));

View File
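The removed `copy_keys` helper snapshotted the map's keys before the `co_await` loop so reassigning entries could not disturb the iteration; the refactored code instead mutates the mapped values in place through the structured binding, which is safe because no entries are added or removed. The two patterns in a minimal asyncio sketch (names are illustrative, not Scylla's API):

```python
import asyncio

async def reload_all(cache, loader):
    # Snapshot the keys first: awaiting inside the loop may interleave
    # with tasks that insert or erase entries, and resizing a dict while
    # iterating it directly raises RuntimeError in Python.
    for key in list(cache.keys()):
        cache[key] = await loader(key)

async def reload_all_in_place(cache, loader):
    # The refactored form: update values in place. Safe only as long as
    # nothing adds or removes entries while we iterate.
    for key in cache:
        cache[key] = await loader(key)

async def demo():
    async def loader(key):
        await asyncio.sleep(0)
        return f"perms({key})"
    a = {"r1": None, "r2": None}
    b = {"r1": None, "r2": None}
    await reload_all(a, loader)
    await reload_all_in_place(b, loader)
    return a, b

a, b = asyncio.run(demo())
```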

@@ -1192,7 +1192,6 @@ scylla_core = (['message/messaging_service.cc',
'utils/azure/identity/default_credentials.cc',
'utils/gcp/gcp_credentials.cc',
'utils/gcp/object_storage.cc',
'utils/gcp/object_storage_retry_strategy.cc',
'gms/version_generator.cc',
'gms/versioned_value.cc',
'gms/gossiper.cc',

View File

@@ -69,7 +69,7 @@ float compute_cosine_similarity(std::span<const float> v1, std::span<const float
}
if (squared_norm_a == 0 || squared_norm_b == 0) {
return std::numeric_limits<float>::quiet_NaN();
throw exceptions::invalid_request_exception("Function system.similarity_cosine doesn't support all-zero vectors");
}
// The cosine similarity is in the range [-1, 1].

View File
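Per the hunk above, an all-zero input vector is now rejected with `invalid_request_exception` instead of yielding NaN. A sketch of the check, with `ValueError` standing in for the server exception:

```python
import math

def similarity_cosine(v1, v2):
    dot = sum(a * b for a, b in zip(v1, v2))
    squared_norm_a = sum(a * a for a in v1)
    squared_norm_b = sum(b * b for b in v2)
    if squared_norm_a == 0 or squared_norm_b == 0:
        # Previously returned NaN; now an explicit error.
        raise ValueError(
            "Function system.similarity_cosine doesn't support all-zero vectors")
    # The cosine similarity is in the range [-1, 1].
    return dot / math.sqrt(squared_norm_a * squared_norm_b)

same = similarity_cosine([1.0, 0.0], [2.0, 0.0])       # parallel vectors
opposite = similarity_cosine([1.0, 0.0], [-1.0, 0.0])  # antiparallel vectors
```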

@@ -137,6 +137,15 @@ public:
return value_type();
}
bool update_result_metadata_id(const key_type& key, cql3::cql_metadata_id_type metadata_id) {
cache_value_ptr vp = _cache.find(key.key());
if (!vp) {
return false;
}
(*vp)->update_result_metadata_id(std::move(metadata_id));
return true;
}
template <typename Pred>
requires std::is_invocable_r_v<bool, Pred, ::shared_ptr<cql_statement>>
void remove_if(Pred&& pred) {

View File
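The new cache method has update-if-present semantics: `false` tells the caller the entry was evicted before the promotion landed, so the metadata id is simply dropped. A dictionary-backed sketch (the class and entry shape are illustrative):

```python
class PreparedCache:
    def __init__(self):
        self._cache = {}

    def insert(self, key, statement):
        self._cache[key] = statement

    def evict(self, key):
        self._cache.pop(key, None)

    def update_result_metadata_id(self, key, metadata_id):
        statement = self._cache.get(key)
        if statement is None:
            return False  # evicted in the meantime; nothing to update
        statement["metadata_id"] = metadata_id
        return True

cache = PreparedCache()
cache.insert("stmt-1", {"metadata_id": b""})
updated = cache.update_result_metadata_id("stmt-1", b"real-id")
cache.evict("stmt-1")
updated_after_evict = cache.update_result_metadata_id("stmt-1", b"real-id")
```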

@@ -481,6 +481,12 @@ public:
void update_authorized_prepared_cache_config();
/// Update the result metadata_id of a cached prepared statement.
/// Returns true if the entry was found and updated, false if it was evicted.
bool update_prepared_result_metadata_id(const cql3::prepared_cache_key_type& cache_key, cql3::cql_metadata_id_type metadata_id) {
return _prepared_cache.update_result_metadata_id(cache_key, std::move(metadata_id));
}
void reset_cache();
bool topology_global_queue_empty();

View File

@@ -52,6 +52,7 @@ public:
std::vector<sstring> warnings;
private:
cql_metadata_id_type _metadata_id;
bool _result_metadata_is_empty;
public:
prepared_statement(audit::audit_info_ptr&& audit_info, seastar::shared_ptr<cql_statement> statement_, std::vector<seastar::lw_shared_ptr<column_specification>> bound_names_,
@@ -71,6 +72,15 @@ public:
void calculate_metadata_id();
cql_metadata_id_type get_metadata_id() const;
bool result_metadata_is_empty() const {
return _result_metadata_is_empty;
}
void update_result_metadata_id(cql_metadata_id_type metadata_id) {
_metadata_id = std::move(metadata_id);
_result_metadata_is_empty = false;
}
};
}

View File

@@ -49,6 +49,7 @@ prepared_statement::prepared_statement(
, partition_key_bind_indices(std::move(partition_key_bind_indices))
, warnings(std::move(warnings))
, _metadata_id(bytes{})
, _result_metadata_is_empty(statement->get_result_metadata()->flags().contains<metadata::flag::NO_METADATA>())
{
statement->set_audit_info(std::move(audit_info));
}

View File
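Putting the pieces together: the constructor records whether the result metadata was empty at prepare time (the NO_METADATA case, e.g. LIST ROLES OF), and the first EXECUTE that returns rows promotes the real metadata id and flags the response with METADATA_CHANGED. A sketch of that flow as described in the commit message; the response shape and `metadata_id_for` helper are assumptions, not Scylla's API:

```python
import hashlib

class PreparedStatement:
    def __init__(self, result_columns):
        self._metadata_id = b""
        # NO_METADATA at prepare time, e.g. LIST ROLES OF.
        self._result_metadata_is_empty = len(result_columns) == 0

    def result_metadata_is_empty(self):
        return self._result_metadata_is_empty

    def update_result_metadata_id(self, metadata_id):
        self._metadata_id = metadata_id
        self._result_metadata_is_empty = False

def metadata_id_for(columns):
    # Stand-in for the real metadata-id calculation.
    return hashlib.md5(",".join(columns).encode()).digest()

def execute_prepared(stmt, rows_columns):
    # Snapshot the flag *before* executing, then promote after a rows
    # response, mirroring the .then() lambda described above.
    was_empty = stmt.result_metadata_is_empty()
    response = {"kind": "rows", "columns": rows_columns, "flags": set()}
    if was_empty and response["kind"] == "rows":
        real_id = metadata_id_for(rows_columns)
        stmt.update_result_metadata_id(real_id)
        response["flags"].add("METADATA_CHANGED")
        response["metadata_id"] = real_id
    return response

stmt = PreparedStatement(result_columns=[])
first = execute_prepared(stmt, ["role", "super"])
second = execute_prepared(stmt, ["role", "super"])
```

Only the first execution carries METADATA_CHANGED; once promoted, the cached statement serves subsequent executions with the real id.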

@@ -2308,7 +2308,6 @@ future<> view_builder::drain() {
vlogger.info("Draining view builder");
_as.request_abort();
co_await _mnotifier.unregister_listener(this);
co_await _ops_gate.close();
co_await _vug.drain();
co_await _sem.wait();
_sem.broken();
@@ -2743,48 +2742,30 @@ void view_builder::on_create_view(const sstring& ks_name, const sstring& view_na
}
// Do it in the background, serialized and broadcast from shard 0.
static_cast<void>(with_gate(_ops_gate, [this, ks_name = ks_name, view_name = view_name] () mutable {
return dispatch_create_view(std::move(ks_name), std::move(view_name));
}).handle_exception([ks_name, view_name] (std::exception_ptr ep) {
static_cast<void>(dispatch_create_view(ks_name, view_name).handle_exception([ks_name, view_name] (std::exception_ptr ep) {
vlogger.warn("Failed to dispatch view creation {}.{}: {}", ks_name, view_name, ep);
}));
}
future<> view_builder::dispatch_update_view(sstring ks_name, sstring view_name) {
if (should_ignore_tablet_keyspace(_db, ks_name)) {
co_return;
}
[[maybe_unused]] auto sem_units = co_await get_or_adopt_view_builder_lock(std::nullopt);
auto view = view_ptr(_db.find_schema(ks_name, view_name));
auto step_it = _base_to_build_step.find(view->view_info()->base_id());
if (step_it == _base_to_build_step.end()) {
co_return; // In case all the views for this CF have finished building already.
}
auto status_it = std::ranges::find_if(step_it->second.build_status, [view] (const view_build_status& bs) {
return bs.view->id() == view->id();
});
if (status_it != step_it->second.build_status.end()) {
status_it->view = std::move(view);
}
}
void view_builder::on_update_view(const sstring& ks_name, const sstring& view_name, bool) {
if (should_ignore_tablet_keyspace(_db, ks_name)) {
return;
}
// Do it in the background, serialized.
static_cast<void>(with_gate(_ops_gate, [this, ks_name = ks_name, view_name = view_name] () mutable {
return dispatch_update_view(std::move(ks_name), std::move(view_name));
}).handle_exception([ks_name, view_name] (std::exception_ptr ep) {
try {
std::rethrow_exception(ep);
} catch (const seastar::gate_closed_exception&) {
vlogger.warn("Ignoring gate_closed_exception during view update {}.{}", ks_name, view_name);
} catch (const seastar::broken_named_semaphore&) {
vlogger.warn("Ignoring broken_named_semaphore during view update {}.{}", ks_name, view_name);
} catch (const replica::no_such_column_family&) {
vlogger.warn("Ignoring no_such_column_family during view update {}.{}", ks_name, view_name);
(void)with_semaphore(_sem, view_builder_semaphore_units, [ks_name, view_name, this] {
auto view = view_ptr(_db.find_schema(ks_name, view_name));
auto step_it = _base_to_build_step.find(view->view_info()->base_id());
if (step_it == _base_to_build_step.end()) {
return;// In case all the views for this CF have finished building already.
}
}));
auto status_it = std::ranges::find_if(step_it->second.build_status, [view] (const view_build_status& bs) {
return bs.view->id() == view->id();
});
if (status_it != step_it->second.build_status.end()) {
status_it->view = std::move(view);
}
}).handle_exception_type([] (replica::no_such_column_family&) { });
}
future<> view_builder::dispatch_drop_view(sstring ks_name, sstring view_name) {
@@ -2846,9 +2827,7 @@ void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name
}
// Do it in the background, serialized and broadcast from shard 0.
static_cast<void>(with_gate(_ops_gate, [this, ks_name = ks_name, view_name = view_name] () mutable {
return dispatch_drop_view(std::move(ks_name), std::move(view_name));
}).handle_exception([ks_name, view_name] (std::exception_ptr ep) {
static_cast<void>(dispatch_drop_view(ks_name, view_name).handle_exception([ks_name, view_name] (std::exception_ptr ep) {
vlogger.warn("Failed to dispatch view drop {}.{}: {}", ks_name, view_name, ep);
}));
}

View File

@@ -16,7 +16,6 @@
#include <seastar/core/abort_source.hh>
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/condition-variable.hh>
#include <seastar/core/sharded.hh>
@@ -191,7 +190,6 @@ class view_builder final : public service::migration_listener::only_view_notific
// Guard the whole startup routine with a semaphore so that it's not intercepted by
// `on_drop_view`, `on_create_view`, or `on_update_view` events.
seastar::named_semaphore _sem{view_builder_semaphore_units, named_semaphore_exception_factory{"view builder"}};
seastar::gate _ops_gate;
seastar::abort_source _as;
future<> _step_fiber = make_ready_future<>();
// Used to coordinate between shards the conclusion of the build process for a particular view.
@@ -286,7 +284,6 @@ private:
future<> mark_as_built(view_ptr);
void setup_metrics();
future<> dispatch_create_view(sstring ks_name, sstring view_name);
future<> dispatch_update_view(sstring ks_name, sstring view_name);
future<> dispatch_drop_view(sstring ks_name, sstring view_name);
future<> handle_seed_view_build_progress(const sstring& ks_name, const sstring& view_name);
future<> handle_create_view_local(const sstring& ks_name, const sstring& view_name, view_builder_units_opt units);

View File

@@ -142,6 +142,10 @@ want modify a non-top-level attribute directly (e.g., a.b[3].c) need RMW:
Alternator implements such requests by reading the entire top-level
attribute a, modifying only a.b[3].c, and then writing back a.
Currently, Alternator doesn't use Tablets. That's because Alternator relies
on LWT (lightweight transactions), and LWT is not supported in keyspaces
with Tablets enabled.
```{eval-rst}
.. toctree::
:maxdepth: 2

View File

@@ -213,71 +213,3 @@ Alternator table, the following features will not work for this table:
* Enabling Streams with CreateTable or UpdateTable doesn't work
(results in an error).
See <https://github.com/scylladb/scylla/issues/23838>.
## Custom write timestamps
DynamoDB doesn't allow clients to set the write timestamp of updates. All
updates use the current server time as their timestamp, and ScyllaDB uses
these timestamps for last-write-wins conflict resolution when concurrent
writes reach different replicas.
ScyllaDB Alternator extends this with the `system:timestamp_attribute` tag,
which allows specifying a custom write timestamp for each PutItem,
UpdateItem, DeleteItem, or BatchWriteItem request. To use this feature:
1. Tag the table (at CreateTable time or using TagResource) with
`system:timestamp_attribute` set to the name of an attribute that will
hold the custom write timestamp.
2. When performing a PutItem or UpdateItem, include the named attribute
in the request with a numeric value. The value represents the write
timestamp in **microseconds since the Unix epoch** (this is the same
unit used internally by ScyllaDB for timestamps).
For a DeleteItem or a BatchWriteItem DeleteRequest, include the named
attribute in the `Key` parameter (it will be stripped from the key
before use).
3. The named attribute is **not stored** in the item data - it only
controls the write timestamp. If you also want to record the timestamp
as data, use a separate attribute for that purpose.
4. If the named attribute is absent, the write proceeds normally using the
current server time as the timestamp. If the named attribute is present
but has a non-numeric value, the write is rejected with a ValidationException.
### Limitations
- **Incompatible with conditions**: If the write includes a ConditionExpression
(or uses the `Expected` legacy condition), LWT is needed and the operation
is rejected with a ValidationException, because LWT requires the write
timestamp to be set by the Paxos protocol, not by the client.
- **Incompatible with `always` write isolation**: Tables using the `always`
(or `always_use_lwt`) write isolation policy cannot use the timestamp
attribute feature at all, because every write uses LWT in that mode.
When using `system:timestamp_attribute`, consider tagging the table with
`system:write_isolation=only_rmw_uses_lwt` (or `forbid_rmw`) so that
unconditional writes do not use LWT.
### Example use case
This feature is useful for ingesting data from multiple sources where each
record has a known logical timestamp. By setting the `system:timestamp_attribute`
tag, you can ensure that the record with the highest logical timestamp always
wins, regardless of ingestion order:
```python
# Create table with timestamp attribute
dynamodb.create_table(
TableName='my_table',
...
Tags=[{'Key': 'system:timestamp_attribute', 'Value': 'write_ts'}]
)
# Write a record with a specific timestamp (in microseconds since epoch)
table.put_item(Item={
'pk': 'my_key',
'data': 'new_value',
'write_ts': Decimal('1700000000000000'), # Nov 14, 2023 in microseconds
})
```

View File

@@ -187,23 +187,6 @@ You can create a keyspace with tablets enabled with the ``tablets = {'enabled':
the keyspace schema with ``tablets = { 'enabled': false }`` or
``tablets = { 'enabled': true }``.
.. _keyspace-rf-rack-valid-to-enforce-rack-list:
Enforcing Rack-List Replication for Tablet Keyspaces
------------------------------------------------------------------
``rf_rack_valid_keyspaces`` is a legacy option that ensures that all keyspaces with tablets enabled are
:term:`RF-rack-valid <RF-rack-valid keyspace>`.
Requiring every tablet keyspace to use the rack-list replication factor exclusively is enough to guarantee that the keyspace is
:term:`RF-rack-valid <RF-rack-valid keyspace>`. It reduces restrictions and provides stronger guarantees compared
to the ``rf_rack_valid_keyspaces`` option.
To enforce rack lists in tablet keyspaces, use the ``enforce_rack_list`` option. It can be set only if all tablet keyspaces use
rack lists. To ensure that, follow the procedure of :ref:`conversion to rack-list replication factor <conversion-to-rack-list-rf>`.
After that, restart all nodes in the cluster with ``enforce_rack_list`` enabled and ``rf_rack_valid_keyspaces`` disabled. Make
sure to avoid setting or updating the replication factor (with CREATE KEYSPACE or ALTER KEYSPACE) while nodes are being restarted.
.. _tablets-limitations:
Limitations and Unsupported Features

View File

@@ -200,6 +200,8 @@ for two cases. One is setting replication factor to 0, in which case the number
The other is when the numeric replication factor is equal to the current number of replicas
for a given datacenter, in which case the current rack list is preserved.
Altering from a numeric replication factor to a rack list is not supported yet.
Note that when altering keyspaces with ``ALTER`` and supplying ``replication_factor``,
auto-expansion will only *add* new datacenters for safety, it will not alter
existing datacenters or remove any even if they are no longer in the cluster.
@@ -422,21 +424,6 @@ Altering from a rack list to a numeric replication factor is not supported.
Keyspaces which use rack lists are :term:`RF-rack-valid <RF-rack-valid keyspace>` if each rack in the rack list contains at least one node (excluding :doc:`zero-token nodes </architecture/zero-token-nodes>`).
.. _conversion-to-rack-list-rf:
Conversion to rack-list replication factor
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
To migrate a keyspace from a numeric replication factor to a rack-list replication factor, provide the rack-list replication factor explicitly in the ALTER KEYSPACE statement. The number of racks in the list must equal the numeric replication factor. The replication factor can be converted in any number of DCs at once. In a statement that converts the replication factor, no replication factor updates (increase or decrease) are allowed in any DC.
.. code-block:: cql
CREATE KEYSPACE Excelsior
WITH replication = { 'class' : 'NetworkTopologyStrategy', 'dc1' : 3, 'dc2' : 1} AND tablets = { 'enabled': true };
ALTER KEYSPACE Excelsior
WITH replication = { 'class' : 'NetworkTopologyStrategy', 'dc1' : ['RAC1', 'RAC2', 'RAC3'], 'dc2' : ['RAC4']} AND tablets = { 'enabled': true };
.. _drop-keyspace-statement:
DROP KEYSPACE

View File

@@ -10,6 +10,7 @@ Install ScyllaDB |CURRENT_VERSION|
/getting-started/install-scylla/launch-on-azure
/getting-started/installation-common/scylla-web-installer
/getting-started/install-scylla/install-on-linux
/getting-started/installation-common/install-jmx
/getting-started/install-scylla/run-in-docker
/getting-started/installation-common/unified-installer
/getting-started/installation-common/air-gapped-install
@@ -23,9 +24,9 @@ Keep your versions up-to-date. The two latest versions are supported. Also, alwa
:id: "getting-started"
:class: my-panel
* :doc:`Launch ScyllaDB on AWS </getting-started/install-scylla/launch-on-aws>`
* :doc:`Launch ScyllaDB on GCP </getting-started/install-scylla/launch-on-gcp>`
* :doc:`Launch ScyllaDB on Azure </getting-started/install-scylla/launch-on-azure>`
* :doc:`Launch ScyllaDB |CURRENT_VERSION| on AWS </getting-started/install-scylla/launch-on-aws>`
* :doc:`Launch ScyllaDB |CURRENT_VERSION| on GCP </getting-started/install-scylla/launch-on-gcp>`
* :doc:`Launch ScyllaDB |CURRENT_VERSION| on Azure </getting-started/install-scylla/launch-on-azure>`
.. panel-box::
@@ -34,7 +35,8 @@ Keep your versions up-to-date. The two latest versions are supported. Also, alwa
:class: my-panel
* :doc:`Install ScyllaDB with Web Installer (recommended) </getting-started/installation-common/scylla-web-installer>`
* :doc:`Install ScyllaDB Linux Packages </getting-started/install-scylla/install-on-linux>`
* :doc:`Install ScyllaDB |CURRENT_VERSION| Linux Packages </getting-started/install-scylla/install-on-linux>`
* :doc:`Install scylla-jmx Package </getting-started/installation-common/install-jmx>`
* :doc:`Install ScyllaDB Without root Privileges </getting-started/installation-common/unified-installer>`
* :doc:`Air-gapped Server Installation </getting-started/installation-common/air-gapped-install>`
* :doc:`ScyllaDB Developer Mode </getting-started/installation-common/dev-mod>`

View File

@@ -94,6 +94,16 @@ Install ScyllaDB
apt-get install scylla{,-server,-kernel-conf,-node-exporter,-conf,-python3,-cqlsh}=2025.3.1-0.20250907.2bbf3cf669bb-1
#. (Ubuntu only) Set Java 11.
.. code-block:: console
sudo apt-get update
sudo apt-get install -y openjdk-11-jre-headless
sudo update-java-alternatives --jre-headless -s java-1.11.0-openjdk-amd64
.. group-tab:: Centos/RHEL
#. Install the EPEL repository.
@@ -147,6 +157,14 @@ Install ScyllaDB
sudo yum install scylla-5.2.3
(Optional) Install scylla-jmx
-------------------------------
scylla-jmx is an optional package and is not installed by default.
If you need a JMX server, see :doc:`Install scylla-jmx Package </getting-started/installation-common/install-jmx>`.
.. include:: /getting-started/_common/setup-after-install.rst
Next Steps

View File

@@ -0,0 +1,78 @@
======================================
Install scylla-jmx Package
======================================
scylla-jmx is an optional package and is not installed by default.
If you need a JMX server, you can still install it from the scylla-jmx GitHub page.
.. tabs::
.. group-tab:: Debian/Ubuntu
#. Download the .deb package from the scylla-jmx page.
Go to https://github.com/scylladb/scylla-jmx, select the latest
release under "Releases", and download the file ending with ".deb".
#. (Optional) Transfer the downloaded package to the install node.
If the machine from which you downloaded the package is different from
the node where you install ScyllaDB, you will need to transfer
the file to the node.
#. Install the scylla-jmx package.
.. code-block:: console
sudo apt install -y ./scylla-jmx_<version>_all.deb
.. group-tab:: Centos/RHEL
#. Download the .rpm package from the scylla-jmx page.
Go to https://github.com/scylladb/scylla-jmx, select the latest
release under "Releases", and download the file ending with ".rpm".
#. (Optional) Transfer the downloaded package to the install node.
If the machine from which you downloaded the package is different from
the node where you install ScyllaDB, you will need to transfer
the file to the node.
#. Install the scylla-jmx package.
.. code-block:: console
sudo yum install -y ./scylla-jmx-<version>.noarch.rpm
.. group-tab:: Install without root privileges
#. Download the .tar.gz package from the scylla-jmx page.
Go to https://github.com/scylladb/scylla-jmx, select the latest
release under "Releases", and download the file ending with ".tar.gz".
#. (Optional) Transfer the downloaded package to the install node.
If the machine from which you downloaded the package is different from
the node where you install ScyllaDB, you will need to transfer
the file to the node.
#. Install the scylla-jmx package.
.. code:: console
tar xpf scylla-jmx-<version>.noarch.tar.gz
cd scylla-jmx
./install.sh --nonroot
Next Steps
-----------
* :doc:`Configure ScyllaDB </getting-started/system-configuration>`
* Manage your clusters with `ScyllaDB Manager <https://manager.docs.scylladb.com/>`_
* Monitor your cluster and data with `ScyllaDB Monitoring <https://monitoring.docs.scylladb.com/>`_
* Get familiar with ScyllaDB's :doc:`command line reference guide </operating-scylla/nodetool>`.
* Learn about ScyllaDB at `ScyllaDB University <https://university.scylladb.com/>`_

View File

@@ -49,6 +49,11 @@ Download and Install
./install.sh --nonroot --python3 ~/scylladb/python3/bin/python3
#. (Optional) Install scylla-jmx
scylla-jmx is an optional package and is not installed by default.
If you need a JMX server, see :doc:`Install scylla-jmx Package </getting-started/installation-common/install-jmx>`.
Configure and Run ScyllaDB
----------------------------

View File

@@ -25,8 +25,4 @@ For Example:
nodetool rebuild <source-dc-name>
``nodetool rebuild`` command works only for vnode keyspaces. For tablet keyspaces, use ``nodetool cluster repair`` instead.
See :doc:`Data Distribution with Tablets </architecture/tablets/>`.
.. include:: nodetool-index.rst

View File

@@ -155,6 +155,7 @@ Add New DC
UN 54.235.9.159 109.75 KB 256 ? 39798227-9f6f-4868-8193-08570856c09a RACK1
UN 54.146.228.25 128.33 KB 256 ? 7a4957a1-9590-4434-9746-9c8a6f796a0c RACK1
.. TODO possibly provide additional information WRT how ALTER works with tablets
#. When all nodes are up and running ``ALTER`` the following Keyspaces in the new nodes:
@@ -170,68 +171,26 @@ Add New DC
DESCRIBE KEYSPACE mykeyspace;
CREATE KEYSPACE mykeyspace WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3};
CREATE KEYSPACE mykeyspace WITH replication = { 'class' : 'NetworkTopologyStrategy', '<exiting_dc>' : 3};
ALTER Command
.. code-block:: cql
ALTER KEYSPACE mykeyspace WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 3};
ALTER KEYSPACE system_distributed WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 3};
ALTER KEYSPACE system_traces WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 3};
ALTER KEYSPACE mykeyspace WITH replication = { 'class' : 'NetworkTopologyStrategy', '<exiting_dc>' : 3, <new_dc> : 3};
ALTER KEYSPACE system_distributed WITH replication = { 'class' : 'NetworkTopologyStrategy', '<exiting_dc>' : 3, <new_dc> : 3};
ALTER KEYSPACE system_traces WITH replication = { 'class' : 'NetworkTopologyStrategy', '<exiting_dc>' : 3, <new_dc> : 3};
After
.. code-block:: cql
DESCRIBE KEYSPACE mykeyspace;
CREATE KEYSPACE mykeyspace WITH REPLICATION = {'class': 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 3};
CREATE KEYSPACE system_distributed WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 3};
CREATE KEYSPACE system_traces WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 3};
CREATE KEYSPACE mykeyspace WITH REPLICATION = {'class: 'NetworkTopologyStrategy', <exiting_dc>:3, <new_dc>: 3};
CREATE KEYSPACE system_distributed WITH replication = { 'class' : 'NetworkTopologyStrategy', '<exiting_dc>' : 3, <new_dc> : 3};
CREATE KEYSPACE system_traces WITH replication = { 'class' : 'NetworkTopologyStrategy', '<exiting_dc>' : 3, <new_dc> : 3};
For tablet keyspaces, update the replication factor one by one:
.. code-block:: cql
DESCRIBE KEYSPACE mykeyspace2;
CREATE KEYSPACE mykeyspace2 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3} AND tablets = { 'enabled': true };
.. code-block:: cql
ALTER KEYSPACE mykeyspace2 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 1} AND tablets = { 'enabled': true };
ALTER KEYSPACE mykeyspace2 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 2} AND tablets = { 'enabled': true };
ALTER KEYSPACE mykeyspace2 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : 3, '<new_dc>' : 3} AND tablets = { 'enabled': true };
.. note::
If the ``rf_rack_valid_keyspaces`` option is set, a tablet keyspace needs to use a rack-list replication factor so that a new DC (rack) can be added. See :ref:`the conversion procedure <conversion-to-rack-list-rf>`. In this case, to add a datacenter:
Before
.. code-block:: cql
DESCRIBE KEYSPACE mykeyspace3;
CREATE KEYSPACE mykeyspace3 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : ['<existing_rack1>', '<existing_rack2>', '<existing_rack3>']} AND tablets = { 'enabled': true };
Add all the nodes to the new datacenter and then alter the keyspace one by one:
.. code-block:: cql
ALTER KEYSPACE mykeyspace3 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : ['<existing_rack1>', '<existing_rack2>', '<existing_rack3>'], '<new_dc>' : ['<new_rack1>']} AND tablets = { 'enabled': true };
ALTER KEYSPACE mykeyspace3 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : ['<existing_rack1>', '<existing_rack2>', '<existing_rack3>'], '<new_dc>' : ['<new_rack1>', '<new_rack2>']} AND tablets = { 'enabled': true };
ALTER KEYSPACE mykeyspace3 WITH replication = { 'class' : 'NetworkTopologyStrategy', '<existing_dc>' : ['<existing_rack1>', '<existing_rack2>', '<existing_rack3>'], '<new_dc>' : ['<new_rack1>', '<new_rack2>', '<new_rack3>']} AND tablets = { 'enabled': true };
After
.. code-block:: cql
DESCRIBE KEYSPACE mykeyspace3;
CREATE KEYSPACE mykeyspace3 WITH REPLICATION = {'class': 'NetworkTopologyStrategy', '<existing_dc>' : ['<existing_rack1>', '<existing_rack2>', '<existing_rack3>'], '<new_dc>' : ['<new_rack1>', '<new_rack2>', '<new_rack3>']} AND tablets = { 'enabled': true };
Consider :ref:`upgrading rf_rack_valid_keyspaces option to enforce_rack_list option <keyspace-rf-rack-valid-to-enforce-rack-list>` to ensure all tablet keyspaces use rack lists.
#. If any vnode keyspace was altered, run ``nodetool rebuild`` on each node in the new datacenter, specifying the existing datacenter name in the rebuild command.
#. Run ``nodetool rebuild`` on each node in the new datacenter, specify the existing datacenter name in the rebuild command.
For example:
@@ -239,7 +198,7 @@ Add New DC
The rebuild ensures that the new nodes that were just added to the cluster will recognize the existing datacenters in the cluster.
#. If any vnode keyspace was altered, run a full cluster repair, using :doc:`nodetool repair -pr </operating-scylla/nodetool-commands/repair>` on each node, or using `ScyllaDB Manager ad-hoc repair <https://manager.docs.scylladb.com/stable/repair>`_
#. Run a full cluster repair, using :doc:`nodetool repair -pr </operating-scylla/nodetool-commands/repair>` on each node, or using `ScyllaDB Manager ad-hoc repair <https://manager.docs.scylladb.com/stable/repair>`_
#. If you are using ScyllaDB Monitoring, update the `monitoring stack <https://monitoring.docs.scylladb.com/stable/install/monitoring_stack.html#configure-scylla-nodes-from-files>`_ to monitor it. If you are using ScyllaDB Manager, make sure you install the `Manager Agent <https://manager.docs.scylladb.com/stable/install-scylla-manager-agent.html>`_ and Manager can access the new DC.

View File

@@ -40,14 +40,12 @@ Prerequisites
Procedure
---------
#. If there are vnode keyspaces in this DC, run the ``nodetool repair -pr`` command on each node in the data-center that is going to be decommissioned. This will verify that all the data is in sync between the decommissioned data-center and the other data-centers in the cluster.
#. Run the ``nodetool repair -pr`` command on each node in the data-center that is going to be decommissioned. This will verify that all the data is in sync between the decommissioned data-center and the other data-centers in the cluster.
For example:
If the ASIA-DC is to be removed, run the ``nodetool repair -pr`` command on all the nodes in the ASIA-DC.
#. If there are tablet keyspaces in this DC, run the ``nodetool cluster repair`` on an arbitrary node. The reason for running repair is to ensure that any updates stored only on the about-to-be-decommissioned replicas are propagated to the other replicas, before the replicas on the decommissioned datacenter are dropped.
#. ALTER every cluster KEYSPACE, so that the keyspaces will no longer replicate data to the decommissioned data-center.
For example:
@@ -75,33 +73,6 @@ Procedure
cqlsh> ALTER KEYSPACE nba WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'US-DC' : 3, 'ASIA-DC' : 0, 'EUROPE-DC' : 3};
For tablet keyspaces, update the replication factor one by one:
.. code-block:: shell
cqlsh> DESCRIBE nba2
cqlsh> CREATE KEYSPACE nba2 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'US-DC' : 3, 'ASIA-DC' : 2, 'EUROPE-DC' : 3} AND tablets = { 'enabled': true };
.. code-block:: shell
cqlsh> ALTER KEYSPACE nba2 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'US-DC' : 3, 'ASIA-DC' : 1, 'EUROPE-DC' : 3} AND tablets = { 'enabled': true };
cqlsh> ALTER KEYSPACE nba2 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'US-DC' : 3, 'ASIA-DC' : 0, 'EUROPE-DC' : 3} AND tablets = { 'enabled': true };
.. note::
If the ``rf_rack_valid_keyspaces`` option is set, a tablet keyspace needs to use a rack-list replication factor so that the DC can be removed. See :ref:`the conversion procedure <conversion-to-rack-list-rf>`. In this case, to remove a datacenter:
.. code-block:: shell
cqlsh> DESCRIBE nba3
cqlsh> CREATE KEYSPACE nba3 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'US-DC' : ['RAC1', 'RAC2', 'RAC3'], 'ASIA-DC' : ['RAC4', 'RAC5'], 'EUROPE-DC' : ['RAC6', 'RAC7', 'RAC8']} AND tablets = { 'enabled': true };
.. code-block:: shell
cqlsh> ALTER KEYSPACE nba3 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'US-DC' : ['RAC1', 'RAC2', 'RAC3'], 'ASIA-DC' : ['RAC4'], 'EUROPE-DC' : ['RAC6', 'RAC7', 'RAC8']} AND tablets = { 'enabled': true };
cqlsh> ALTER KEYSPACE nba3 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'US-DC' : ['RAC1', 'RAC2', 'RAC3'], 'ASIA-DC' : [], 'EUROPE-DC' : ['RAC6', 'RAC7', 'RAC8']} AND tablets = { 'enabled': true };
Consider :ref:`upgrading rf_rack_valid_keyspaces option to enforce_rack_list option <keyspace-rf-rack-valid-to-enforce-rack-list>` to ensure all tablet keyspaces use rack lists.
.. note::
If table audit is enabled, the ``audit`` keyspace is automatically created with ``NetworkTopologyStrategy``.

View File

@@ -199,6 +199,9 @@ You should take note of the current version in case you want to |ROLLBACK|_ the
#. Run ``sudo /opt/scylladb/scylla-machine-image/scylla_cloud_io_setup``.
If you need a JMX server, see
:doc:`Install scylla-jmx Package </getting-started/installation-common/install-jmx>`
and install the new version.
Start the node
--------------

View File

@@ -283,6 +283,17 @@ concept CanHandleLuaTypes = requires(Func f) {
{ f(*static_cast<const lua_table*>(nullptr)) } -> std::same_as<lua_visit_ret_type<Func>>;
};
// This is used to test if a double fits in a long long, so
// we expect overflows. Prevent the sanitizer from complaining.
#ifdef __clang__
[[clang::no_sanitize("undefined")]]
#endif
static
long long
cast_to_long_long_allow_overflow(double v) {
return (long long)v;
}
template <typename Func>
requires CanHandleLuaTypes<Func>
static auto visit_lua_value(lua_State* l, int index, Func&& f) {
@@ -293,10 +304,9 @@ static auto visit_lua_value(lua_State* l, int index, Func&& f) {
auto operator()(const long long& v) { return f(utils::multiprecision_int(v)); }
auto operator()(const utils::multiprecision_int& v) { return f(v); }
auto operator()(const double& v) {
auto min = double(std::numeric_limits<long long>::min());
auto max = double(std::numeric_limits<long long>::max());
if (min <= v && v <= max && std::trunc(v) == v) {
return (*this)((long long)v);
long long v2 = cast_to_long_long_allow_overflow(v);
if (v2 == v) {
return (*this)(v2);
}
// FIXME: We could use frexp to produce a decimal instead of a double
return f(v);

View File
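Both versions of the check answer "does this double fit in a long long?"; the helper exists because an out-of-range double-to-integer conversion is undefined behavior in C++, hence the clang `no_sanitize` escape hatch for the cast-and-compare form. Python ints are unbounded, so a sketch can use the explicit bounds check directly (and comparing against the exact integer bound avoids the rounding subtlety of `double(LLONG_MAX)`):

```python
def fits_in_long_long(v):
    # v must be integral and within [LLONG_MIN, LLONG_MAX]. (The C++
    # rewrite instead casts with overflow allowed and compares the
    # round trip: v2 == v.)
    LLONG_MIN, LLONG_MAX = -2**63, 2**63 - 1
    return LLONG_MIN <= v <= LLONG_MAX and float(v).is_integer()

exact = fits_in_long_long(42.0)
fractional = fits_in_long_long(42.5)
too_big = fits_in_long_long(2.0**64)
```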

@@ -616,16 +616,12 @@ tablet_replica tablet_map::get_primary_replica(tablet_id id, const locator::topo
return maybe_get_primary_replica(id, replicas, topo, [&] (const auto& _) { return true; }).value();
}
tablet_replica tablet_map::get_secondary_replica(tablet_id id, const locator::topology& topo) const {
const auto& orig_replicas = get_tablet_info(id).replicas;
if (orig_replicas.size() < 2) {
tablet_replica tablet_map::get_secondary_replica(tablet_id id) const {
if (get_tablet_info(id).replicas.size() < 2) {
throw std::runtime_error(format("No secondary replica for tablet id {}", id));
}
tablet_replica_set replicas = orig_replicas;
std::ranges::sort(replicas, tablet_replica_comparator(topo));
// This formula must match the one in get_primary_replica(),
// just with + 1.
return replicas.at((size_t(id) + size_t(id) / replicas.size() + 1) % replicas.size());
const auto& replicas = get_tablet_info(id).replicas;
return replicas.at((size_t(id)+1) % replicas.size());
}
std::optional<tablet_replica> tablet_map::maybe_get_selected_replica(tablet_id id, const topology& topo, const tablet_task_info& tablet_task_info) const {
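The simplified selection can be sketched in Python. The primary being at index `tablet_id % len(replicas)` is an assumption inferred from the doc comment that the secondary directly follows the primary in the replicas vector:

```python
def get_secondary_replica(replicas: list, tablet_id: int):
    # Mirrors the simplified C++: the secondary is the entry right after
    # the (assumed) primary at index tablet_id % len(replicas).
    if len(replicas) < 2:
        raise RuntimeError(f"No secondary replica for tablet id {tablet_id}")
    return replicas[(tablet_id + 1) % len(replicas)]
```

With at least two replicas, `(tablet_id + 1) % n` can never equal `tablet_id % n`, so the secondary always differs from the assumed primary.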


@@ -648,10 +648,9 @@ public:
/// Returns the primary replica for the tablet
tablet_replica get_primary_replica(tablet_id id, const locator::topology& topo) const;
/// Returns the secondary replica for the tablet: the replica that immediately follows the primary
/// replica in the topology-sorted replica list.
/// Returns the secondary replica for the tablet, which is assumed to be directly following the primary replica in the replicas vector
/// \throws std::runtime_error if the tablet has less than 2 replicas.
tablet_replica get_secondary_replica(tablet_id id, const locator::topology& topo) const;
tablet_replica get_secondary_replica(tablet_id id) const;
// Returns the replica that matches hosts and dcs filters for tablet_task_info.
std::optional<tablet_replica> maybe_get_selected_replica(tablet_id id, const topology& topo, const tablet_task_info& tablet_task_info) const;


@@ -7329,11 +7329,7 @@ future<locator::load_stats> storage_service::load_stats_for_tablet_based_tables(
const locator::host_id this_host = _db.local().get_token_metadata().get_my_id();
// Align to 64 bytes to avoid cache line ping-pong when updating size in map_reduce0() below
struct alignas(64) aligned_tablet_size {
uint64_t size = 0;
};
std::vector<aligned_tablet_size> tablet_sizes_per_shard(smp::count);
std::vector<uint64_t> tablet_sizes_per_shard(smp::count);
// Each node combines a per-table load map from all of its shards and returns it to the coordinator.
// So if there are 1k nodes, there will be 1k RPCs in total.
@@ -7375,7 +7371,7 @@ future<locator::load_stats> storage_service::load_stats_for_tablet_based_tables(
locator::combined_load_stats combined_ls { table->table_load_stats(tablet_filter) };
load_stats.tables.emplace(id, std::move(combined_ls.table_ls));
tablet_sizes_per_shard[this_shard_id()].size += load_stats.tablet_stats[this_host].add_tablet_sizes(combined_ls.tablet_ls);
tablet_sizes_per_shard[this_shard_id()] += load_stats.tablet_stats[this_host].add_tablet_sizes(combined_ls.tablet_ls);
co_await coroutine::maybe_yield();
}
@@ -7394,10 +7390,7 @@ future<locator::load_stats> storage_service::load_stats_for_tablet_based_tables(
if (config_capacity != 0) {
tls.effective_capacity = config_capacity;
} else {
uint64_t sum_tablet_sizes = 0;
for (const auto& ts : tablet_sizes_per_shard) {
sum_tablet_sizes += ts.size;
}
const uint64_t sum_tablet_sizes = std::reduce(tablet_sizes_per_shard.begin(), tablet_sizes_per_shard.end());
tls.effective_capacity = si.available + sum_tablet_sizes;
}
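The accumulate-per-slot-then-reduce pattern in this hunk can be sketched in Python; the shard count and per-tablet sizes below are made up for illustration:

```python
SHARDS = 4  # stand-in for smp::count
tablet_sizes_per_shard = [0] * SHARDS  # one plain counter per shard

# Each shard adds only to its own slot (no cross-shard writes), like
# tablet_sizes_per_shard[this_shard_id()] += ... in the C++ loop.
per_shard_tablet_sizes = [[10, 20], [5], [], [7, 3]]  # hypothetical tablet sizes
for shard, sizes in enumerate(per_shard_tablet_sizes):
    tablet_sizes_per_shard[shard] += sum(sizes)

# Final single-pass reduction, the equivalent of std::reduce(begin, end).
sum_tablet_sizes = sum(tablet_sizes_per_shard)
```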


@@ -1,356 +0,0 @@
# Copyright 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
# Tests for the system:timestamp_attribute Scylla-specific feature.
# This feature allows users to control the write timestamp of PutItem and
# UpdateItem operations by specifying an attribute name in the table's
# system:timestamp_attribute tag. When that attribute is present in the
# write request with a numeric value (microseconds since Unix epoch), it
# is used as the write timestamp. The attribute itself is not stored in
# the item data.
#
# This is a Scylla-specific feature and is not tested on DynamoDB.
import time
import pytest
from botocore.exceptions import ClientError
from decimal import Decimal
from .util import create_test_table, random_string
# A large timestamp in microseconds (far future, year ~2033)
LARGE_TS = Decimal('2000000000000000')
# A medium timestamp in microseconds (year ~2001)
MEDIUM_TS = Decimal('1000000000000000')
# A small timestamp in microseconds (year ~1973)
SMALL_TS = Decimal('100000000000000')
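The year annotations on these constants can be double-checked by converting microseconds since the Unix epoch to a UTC calendar year:

```python
from datetime import datetime, timezone

def us_to_year(us: int) -> int:
    # Microseconds since the Unix epoch -> UTC calendar year.
    return datetime.fromtimestamp(us / 1_000_000, tz=timezone.utc).year

# us_to_year(2_000_000_000_000_000) -> 2033
# us_to_year(1_000_000_000_000_000) -> 2001
# us_to_year(100_000_000_000_000)   -> 1973
```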
# Fixtures for tables with the system:timestamp_attribute tag. The tables
# are created once per module and shared between all tests that use them,
# to avoid the overhead of creating and deleting tables for each test.
# Because system:timestamp_attribute is a Scylla-only feature, all tests
# using these fixtures are implicitly Scylla-only (via scylla_only parameter).
# A table with only a hash key and system:timestamp_attribute='ts' tag.
# We explicitly set write isolation to only_rmw_uses_lwt so the tests remain
# correct even if the server default changes to always_use_lwt in the future.
@pytest.fixture(scope="module")
def test_table_ts(scylla_only, dynamodb):
table = create_test_table(dynamodb,
Tags=[{'Key': 'system:timestamp_attribute', 'Value': 'ts'},
{'Key': 'system:write_isolation', 'Value': 'only_rmw_uses_lwt'}],
KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}])
yield table
table.delete()
# A table with hash (string) and range (string) keys and system:timestamp_attribute='ts' tag.
# We explicitly set write isolation to only_rmw_uses_lwt so the tests remain
# correct even if the server default changes to always_use_lwt in the future.
@pytest.fixture(scope="module")
def test_table_ts_ss(scylla_only, dynamodb):
table = create_test_table(dynamodb,
Tags=[{'Key': 'system:timestamp_attribute', 'Value': 'ts'},
{'Key': 'system:write_isolation', 'Value': 'only_rmw_uses_lwt'}],
KeySchema=[
{'AttributeName': 'p', 'KeyType': 'HASH'},
{'AttributeName': 'c', 'KeyType': 'RANGE'},
],
AttributeDefinitions=[
{'AttributeName': 'p', 'AttributeType': 'S'},
{'AttributeName': 'c', 'AttributeType': 'S'},
])
yield table
table.delete()
# A table with hash key, system:timestamp_attribute='ts' tag, and
# system:write_isolation='always' to test rejection in LWT_ALWAYS mode.
# In always_use_lwt mode, every write uses LWT, so the timestamp attribute
# feature cannot be used at all.
@pytest.fixture(scope="module")
def test_table_ts_lwt(scylla_only, dynamodb):
table = create_test_table(dynamodb,
Tags=[{'Key': 'system:timestamp_attribute', 'Value': 'ts'},
{'Key': 'system:write_isolation', 'Value': 'always'}],
KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}])
yield table
table.delete()
# Test that PutItem with the timestamp attribute uses the given numeric
# value as the write timestamp, and the timestamp attribute is NOT stored
# in the item.
def test_timestamp_attribute_put_item_basic(test_table_ts):
p = random_string()
# Put an item with the timestamp attribute
test_table_ts.put_item(Item={'p': p, 'val': 'hello', 'ts': LARGE_TS})
# Read the item back
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
# 'val' should be stored normally
assert item['val'] == 'hello'
# 'ts' (the timestamp attribute) should NOT be stored in the item
assert 'ts' not in item
# Test that PutItem respects the write timestamp ordering: a write with a
# larger timestamp should win over a write with a smaller timestamp,
# regardless of wall-clock order.
def test_timestamp_attribute_put_item_ordering(test_table_ts):
p = random_string()
# First, write item with a LARGE timestamp
test_table_ts.put_item(Item={'p': p, 'val': 'large_ts', 'ts': LARGE_TS})
# Then write item with a SMALL timestamp (should lose since SMALL < LARGE)
test_table_ts.put_item(Item={'p': p, 'val': 'small_ts', 'ts': SMALL_TS})
# The item with the larger timestamp (val='large_ts') should win
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'large_ts'
# Now try to overwrite with a LARGER timestamp (should win)
test_table_ts.put_item(Item={'p': p, 'val': 'latest', 'ts': LARGE_TS + 1})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'latest'
# Test that UpdateItem with the timestamp attribute in AttributeUpdates
# uses the given numeric value as the write timestamp, and the timestamp
# attribute is NOT stored in the item.
def test_timestamp_attribute_update_item_attribute_updates(test_table_ts):
p = random_string()
# Use UpdateItem with AttributeUpdates, setting 'val' and 'ts'
test_table_ts.update_item(
Key={'p': p},
AttributeUpdates={
'val': {'Value': 'hello', 'Action': 'PUT'},
'ts': {'Value': LARGE_TS, 'Action': 'PUT'},
}
)
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'hello'
# 'ts' should NOT be stored in the item
assert 'ts' not in item
# Update with a smaller timestamp - should NOT overwrite
test_table_ts.update_item(
Key={'p': p},
AttributeUpdates={
'val': {'Value': 'overwritten', 'Action': 'PUT'},
'ts': {'Value': SMALL_TS, 'Action': 'PUT'},
}
)
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
# The item with the larger timestamp should still win
assert item['val'] == 'hello'
# Test that UpdateItem with the timestamp attribute in UpdateExpression
# uses the given numeric value as the write timestamp, and the timestamp
# attribute is NOT stored in the item.
def test_timestamp_attribute_update_item_update_expression(test_table_ts):
p = random_string()
# Use UpdateItem with UpdateExpression to set 'val' and 'ts'
test_table_ts.update_item(
Key={'p': p},
UpdateExpression='SET val = :v, ts = :t',
ExpressionAttributeValues={':v': 'hello', ':t': LARGE_TS}
)
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'hello'
# 'ts' should NOT be stored in the item
assert 'ts' not in item
# Update with a smaller timestamp - should NOT overwrite
test_table_ts.update_item(
Key={'p': p},
UpdateExpression='SET val = :v, ts = :t',
ExpressionAttributeValues={':v': 'overwritten', ':t': SMALL_TS}
)
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
# The item with the larger timestamp should still win
assert item['val'] == 'hello'
# Test that when the timestamp attribute is not present in the write request,
# the operation behaves normally (no custom timestamp is applied).
def test_timestamp_attribute_absent(test_table_ts):
p = random_string()
# Put item without the timestamp attribute
test_table_ts.put_item(Item={'p': p, 'val': 'hello'})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'hello'
# No 'ts' attribute expected either
assert 'ts' not in item
# Test that using a condition expression (which requires LWT) together with
# the timestamp attribute is rejected.
def test_timestamp_attribute_with_condition_rejected(test_table_ts):
p = random_string()
# Put an initial item (no timestamp attribute, so LWT is ok)
test_table_ts.put_item(Item={'p': p, 'val': 'initial'})
# Try to put with a ConditionExpression and a timestamp - should be rejected
with pytest.raises(ClientError, match='ValidationException'):
test_table_ts.put_item(
Item={'p': p, 'val': 'updated', 'ts': LARGE_TS},
ConditionExpression='attribute_exists(p)'
)
# Test that using the timestamp attribute with the 'always' write isolation
# policy is rejected, because in always_use_lwt mode every write uses LWT
# (including unconditional ones), which is incompatible with custom timestamps.
def test_timestamp_attribute_lwt_always_rejected(test_table_ts_lwt):
p = random_string()
# Even a plain PutItem with a timestamp is rejected in LWT_ALWAYS mode
with pytest.raises(ClientError, match='ValidationException'):
test_table_ts_lwt.put_item(Item={'p': p, 'val': 'hello', 'ts': LARGE_TS})
# Test that when the timestamp attribute has a non-numeric value, the write
# is rejected with a ValidationException.
def test_timestamp_attribute_non_numeric(test_table_ts):
p = random_string()
# Put item with the timestamp attribute as a string (non-numeric) - should fail
with pytest.raises(ClientError, match='ValidationException'):
test_table_ts.put_item(Item={'p': p, 'val': 'hello', 'ts': 'not_a_number'})
# Test that the timestamp attribute tag can be set on a table with a sort key.
def test_timestamp_attribute_with_range_key(test_table_ts_ss):
p = random_string()
c = random_string()
# Write with a large timestamp
test_table_ts_ss.put_item(Item={'p': p, 'c': c, 'val': 'large', 'ts': LARGE_TS})
# Write with a small timestamp (should lose)
test_table_ts_ss.put_item(Item={'p': p, 'c': c, 'val': 'small', 'ts': SMALL_TS})
item = test_table_ts_ss.get_item(Key={'p': p, 'c': c}, ConsistentRead=True)['Item']
assert item['val'] == 'large'
assert 'ts' not in item
# Test that the timestamp attribute value is interpreted in microseconds since
# the Unix epoch, and that writes with and without explicit timestamps interact
# correctly.
def test_timestamp_attribute_microseconds(test_table_ts):
# Get current time in microseconds from the Python client side.
now_us = int(time.time() * 1_000_000)
one_hour_us = 3600 * 1_000_000
# Part 1: write with the current time as the explicit timestamp, then
# overwrite without an explicit timestamp. The second write uses the
# server's current time (which is >= now_us), so it should win.
p = random_string()
test_table_ts.put_item(Item={'p': p, 'val': 'old', 'ts': Decimal(str(now_us))})
test_table_ts.put_item(Item={'p': p, 'val': 'new'})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'new'
# Part 2: write with a timestamp one hour in the future, then overwrite
# without an explicit timestamp. The server's current time (≈ now_us) is
# much less than now_us + one_hour_us, so the first write should win.
p = random_string()
future_us = now_us + one_hour_us
test_table_ts.put_item(Item={'p': p, 'val': 'future', 'ts': Decimal(str(future_us))})
test_table_ts.put_item(Item={'p': p, 'val': 'now'})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'future'
# Test that BatchWriteItem also respects the timestamp attribute.
def test_timestamp_attribute_batch_write(test_table_ts):
p = random_string()
# Write item via BatchWriteItem with a large timestamp
with test_table_ts.batch_writer() as batch:
batch.put_item(Item={'p': p, 'val': 'large_ts', 'ts': LARGE_TS})
# Write item via BatchWriteItem with a small timestamp (should lose)
with test_table_ts.batch_writer() as batch:
batch.put_item(Item={'p': p, 'val': 'small_ts', 'ts': SMALL_TS})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'large_ts'
assert 'ts' not in item
# Test that DeleteItem respects the timestamp attribute: a delete with a
# smaller timestamp than the item's write timestamp should not take effect.
def test_timestamp_attribute_delete_item(test_table_ts):
p = random_string()
# Write an item with a large timestamp
test_table_ts.put_item(Item={'p': p, 'val': 'hello', 'ts': LARGE_TS})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'hello'
# Delete with a small timestamp - the delete should lose (item still exists)
test_table_ts.delete_item(Key={'p': p, 'ts': SMALL_TS})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True).get('Item')
assert item is not None and item['val'] == 'hello'
# Delete with a large timestamp - the delete should win (item is removed)
test_table_ts.delete_item(Key={'p': p, 'ts': LARGE_TS + 1})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True).get('Item')
assert item is None
# Test that DeleteItem without the timestamp attribute in the key behaves
# normally (no custom timestamp is applied).
def test_timestamp_attribute_delete_item_no_ts(test_table_ts):
p = random_string()
# Use SMALL_TS so the delete (which uses the current server time) wins.
# If we used LARGE_TS (far future), the delete without an explicit timestamp
# would use current time which is smaller than LARGE_TS and the delete would lose.
test_table_ts.put_item(Item={'p': p, 'val': 'hello', 'ts': SMALL_TS})
# Delete without a timestamp attribute - should succeed normally
test_table_ts.delete_item(Key={'p': p})
assert 'Item' not in test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)
# Verify that an item written with a far-future timestamp is NOT deleted by
# a delete without an explicit timestamp (server time < LARGE_TS).
p = random_string()
test_table_ts.put_item(Item={'p': p, 'val': 'hello', 'ts': LARGE_TS})
test_table_ts.delete_item(Key={'p': p})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True).get('Item')
assert item is not None and item['val'] == 'hello'
# Test that DeleteItem with a non-numeric timestamp attribute is rejected.
def test_timestamp_attribute_delete_item_non_numeric(test_table_ts):
p = random_string()
with pytest.raises(ClientError, match='ValidationException'):
test_table_ts.delete_item(Key={'p': p, 'ts': 'not_a_number'})
# Test that BatchWriteItem DeleteRequest also respects the timestamp attribute.
def test_timestamp_attribute_batch_delete(test_table_ts):
p = random_string()
# Write an item with a large timestamp
test_table_ts.put_item(Item={'p': p, 'val': 'hello', 'ts': LARGE_TS})
# Delete via BatchWriteItem with a small timestamp - delete should lose
test_table_ts.meta.client.batch_write_item(RequestItems={
test_table_ts.name: [{'DeleteRequest': {'Key': {'p': p, 'ts': SMALL_TS}}}]
})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True).get('Item')
assert item is not None and item['val'] == 'hello'
# Delete via BatchWriteItem with a large timestamp - delete should win
test_table_ts.meta.client.batch_write_item(RequestItems={
test_table_ts.name: [{'DeleteRequest': {'Key': {'p': p, 'ts': LARGE_TS + 1}}}]
})
assert 'Item' not in test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)
# Test that DeleteItem with a ConditionExpression and a custom timestamp is
# rejected, because conditional writes require LWT which is incompatible with
# custom timestamps.
def test_timestamp_attribute_delete_item_condition_rejected(test_table_ts):
p = random_string()
test_table_ts.put_item(Item={'p': p, 'val': 'hello'})
with pytest.raises(ClientError, match='ValidationException'):
test_table_ts.delete_item(
Key={'p': p, 'ts': SMALL_TS},
ConditionExpression='attribute_exists(p)'
)
# Test that DeleteItem with a custom timestamp is rejected when the table uses
# always_use_lwt isolation, because every write uses LWT in that mode.
def test_timestamp_attribute_delete_item_lwt_always_rejected(test_table_ts_lwt):
p = random_string()
with pytest.raises(ClientError, match='ValidationException'):
test_table_ts_lwt.delete_item(Key={'p': p, 'ts': SMALL_TS})
# Test that BatchWriteItem PutRequest with a custom timestamp is rejected when
# the table uses always_use_lwt isolation.
def test_timestamp_attribute_batch_put_lwt_always_rejected(test_table_ts_lwt):
p = random_string()
with pytest.raises(ClientError, match='ValidationException'):
test_table_ts_lwt.meta.client.batch_write_item(RequestItems={
test_table_ts_lwt.name: [{'PutRequest': {'Item': {'p': p, 'val': 'v', 'ts': SMALL_TS}}}]
})
# Test that BatchWriteItem DeleteRequest with a custom timestamp is rejected
# when the table uses always_use_lwt isolation.
def test_timestamp_attribute_batch_delete_lwt_always_rejected(test_table_ts_lwt):
p = random_string()
with pytest.raises(ClientError, match='ValidationException'):
test_table_ts_lwt.meta.client.batch_write_item(RequestItems={
test_table_ts_lwt.name: [{'DeleteRequest': {'Key': {'p': p, 'ts': SMALL_TS}}}]
})


@@ -34,6 +34,9 @@
#include "test/lib/log.hh"
#include "cdc/cdc_extension.hh"
#include "test/lib/test_utils.hh"
#include "transport/request.hh"
#include "transport/response.hh"
#include "utils/memory_data_sink.hh"
BOOST_AUTO_TEST_SUITE(schema_change_test)
@@ -1181,6 +1184,16 @@ cql3::cql_metadata_id_type compute_metadata_id(std::vector<std::pair<sstring, sh
return cql3::metadata{columns_specification}.calculate_metadata_id();
}
std::vector<lw_shared_ptr<cql3::column_specification>> make_columns_specification(
const std::vector<std::pair<sstring, shared_ptr<const abstract_type>>>& columns, sstring ks = "ks", sstring cf = "cf") {
std::vector<lw_shared_ptr<cql3::column_specification>> columns_specification;
columns_specification.reserve(columns.size());
for (const auto& column : columns) {
columns_specification.push_back(make_lw_shared(cql3::column_specification(ks, cf, make_shared<cql3::column_identifier>(column.first, false), column.second)));
}
return columns_specification;
}
BOOST_AUTO_TEST_CASE(metadata_id_with_different_keyspace_and_table) {
const auto c = std::make_pair("id", uuid_type);
auto h1 = compute_metadata_id({c}, "ks1", "cf1");
@@ -1231,6 +1244,55 @@ BOOST_AUTO_TEST_CASE(metadata_id_with_different_column_order) {
verify_metadata_id_is_stable(h2, "b52512f2b76d3e0695dcaf7b0a71efac");
}
SEASTAR_TEST_CASE(metadata_id_changed_rows_response_overrides_no_metadata) {
auto empty_metadata_id = cql3::metadata{std::vector<lw_shared_ptr<cql3::column_specification>>{}}.calculate_metadata_id();
auto columns_specification = make_columns_specification({{"role", utf8_type}});
cql3::metadata rows_metadata(columns_specification);
auto rows_metadata_id = rows_metadata.calculate_metadata_id();
cql_transport::response resp{0, cql_transport::cql_binary_opcode::RESULT, tracing::trace_state_ptr{}};
resp.write(rows_metadata, cql_transport::cql_metadata_id_wrapper(
std::move(empty_metadata_id),
cql3::cql_metadata_id_type(bytes(rows_metadata_id._metadata_id))), true);
memory_data_sink_buffers buffers;
{
output_stream<char> out(data_sink(std::make_unique<memory_data_sink>(buffers)));
co_await resp.write_message(out, 4, cql_transport::cql_compression::none, deleter());
co_await out.close();
}
auto total_length = buffers.size();
auto fbufs = fragmented_temporary_buffer(buffers.buffers() | std::views::as_rvalue | std::ranges::to<std::vector>(), total_length);
bytes_ostream linearization_buffer;
auto req = cql_transport::request_reader(fbufs.get_istream(), linearization_buffer);
BOOST_REQUIRE_EQUAL(unsigned(uint8_t(req.read_byte().value())), 4 | 0x80);
BOOST_REQUIRE_EQUAL(unsigned(req.read_byte().value()), 0);
BOOST_REQUIRE_EQUAL(req.read_short().value(), 0);
BOOST_REQUIRE_EQUAL(unsigned(req.read_byte().value()), unsigned(uint8_t(cql_transport::cql_binary_opcode::RESULT)));
BOOST_REQUIRE_EQUAL(req.read_int().value(), total_length - 9);
auto body = req.read_raw_bytes_view(req.bytes_left()).value();
const auto* ptr = reinterpret_cast<const char*>(body.begin());
const auto flags_mask = read_be<int32_t>(ptr);
ptr += sizeof(int32_t);
const auto flags = cql3::metadata::flag_enum_set::from_mask(flags_mask);
BOOST_REQUIRE(flags.contains<cql3::metadata::flag::METADATA_CHANGED>());
BOOST_REQUIRE(!flags.contains<cql3::metadata::flag::NO_METADATA>());
const auto column_count = read_be<int32_t>(ptr);
ptr += sizeof(int32_t);
BOOST_REQUIRE_EQUAL(column_count, 1);
const auto metadata_id_length = read_be<uint16_t>(ptr);
ptr += sizeof(uint16_t);
BOOST_REQUIRE_EQUAL(metadata_id_length, rows_metadata_id._metadata_id.size());
BOOST_REQUIRE(std::equal(rows_metadata_id._metadata_id.begin(), rows_metadata_id._metadata_id.end(),
reinterpret_cast<const bytes::value_type*>(ptr)));
co_return;
}
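The byte-by-byte header checks above follow the standard 9-byte CQL v4 frame header: version (with the 0x80 response bit), flags, a big-endian int16 stream id, opcode, and a big-endian int32 body length. A Python sketch of the same unpacking, assuming that layout:

```python
import struct

CQL_V4_HEADER = ">BBhBi"  # version, flags, stream id (int16), opcode, body length

def parse_cql_v4_header(frame: bytes) -> dict:
    # Unpack the fixed 9-byte header preceding the frame body.
    version, flags, stream, opcode, length = struct.unpack_from(CQL_V4_HEADER, frame)
    return {"version": version, "flags": flags, "stream": stream,
            "opcode": opcode, "body_length": length}

# A RESULT response header: 0x84 = protocol 4 | 0x80 response bit, opcode 0x08 = RESULT.
hdr = parse_cql_v4_header(bytes([0x84, 0x00, 0x00, 0x00, 0x08]) + (42).to_bytes(4, "big"))
```

The 9-byte header size is why the test asserts the length field equals `total_length - 9`.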
BOOST_AUTO_TEST_CASE(metadata_id_with_udt) {
auto compute_metadata_id_for_type = [&](


@@ -6096,83 +6096,4 @@ SEASTAR_THREAD_TEST_CASE(test_tablet_manual_repair_rf1_auto_repair_on) {
do_with_cql_env_thread(run_tablet_manual_repair_rf1, std::move(cfg_in)).get();
}
// Test for tablet_map::get_secondary_replica() and specifically how it
// relates to get_primary_replica().
// We never officially documented which replica in a given list of replicas
// is to be considered the "primary" - it's not simply the first replica in
// the list but the first in some reshuffling of the list, a reshuffling whose
// details changed in commits like 817fdad and d88036d. So this patch doesn't
// enshrine what get_primary_replica() or get_secondary_replica() should
// return. It just verifies that get_secondary_replica() returns a *different*
// replica than get_primary_replica() if there are 2 or more replicas, or
// throws an error when there's just one replica.
// Reproduces SCYLLADB-777.
SEASTAR_THREAD_TEST_CASE(test_get_secondary_replica) {
auto h1 = host_id(utils::UUID_gen::get_time_UUID());
auto h2 = host_id(utils::UUID_gen::get_time_UUID());
auto h3 = host_id(utils::UUID_gen::get_time_UUID());
locator::topology::config cfg = {
.this_endpoint = inet_address("127.0.0.1"),
.this_host_id = h1,
.local_dc_rack = endpoint_dc_rack::default_location,
};
auto topo = locator::topology(cfg);
topo.add_or_update_endpoint(h1, endpoint_dc_rack::default_location, node::state::normal);
topo.add_or_update_endpoint(h2, endpoint_dc_rack::default_location, node::state::normal);
topo.add_or_update_endpoint(h3, endpoint_dc_rack::default_location, node::state::normal);
// With 1 replica, get_secondary_replica should throw.
{
tablet_map tmap(1);
auto tid = tmap.first_tablet();
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {h1, 0},
}
});
BOOST_REQUIRE_THROW(tmap.get_secondary_replica(tid, topo), std::runtime_error);
}
// With 2 replicas, get_secondary_replica should return a different replica
// than get_primary_replica for every tablet.
{
tablet_map tmap(4);
for (auto tid : tmap.tablet_ids()) {
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {h1, 0},
tablet_replica {h2, 0},
}
});
}
for (auto tid : tmap.tablet_ids()) {
auto primary = tmap.get_primary_replica(tid, topo);
auto secondary = tmap.get_secondary_replica(tid, topo);
BOOST_REQUIRE(primary != secondary);
}
}
// With 3 replicas, same check.
{
tablet_map tmap(4);
for (auto tid : tmap.tablet_ids()) {
tmap.set_tablet(tid, tablet_info {
tablet_replica_set {
tablet_replica {h1, 0},
tablet_replica {h2, 0},
tablet_replica {h3, 0},
}
});
}
for (auto tid : tmap.tablet_ids()) {
auto primary = tmap.get_primary_replica(tid, topo);
auto secondary = tmap.get_secondary_replica(tid, topo);
BOOST_REQUIRE(primary != secondary);
}
}
topo.clear_gently().get();
}
BOOST_AUTO_TEST_SUITE_END()


@@ -0,0 +1,267 @@
#
# Copyright (C) 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import dataclasses
from typing import Optional
from unittest import mock
import pytest
from cassandra import ProtocolVersion
from cassandra.application_info import ApplicationInfoBase
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from cassandra.policies import WhiteListRoundRobinPolicy
from cassandra.protocol import ResultMessage
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error
from test.pylib.util import unique_name
from test.cluster.auth_cluster import extra_scylla_config_options as auth_config
# ---------------------------------------------------------------------------
# Driver helpers for SCYLLA_USE_METADATA_ID / result_metadata_id exchange.
#
# The standard Python driver gates result_metadata_id exchange on protocol v5+
# (ProtocolVersion.uses_prepared_metadata). ScyllaDB does not implement v5
# but exposes the same semantics on v4 via the SCYLLA_USE_METADATA_ID startup
# extension. Two lightweight patches make the driver exercise this path:
#
# 1. _UseMetadataId — ApplicationInfoBase subclass that injects
# SCYLLA_USE_METADATA_ID into the STARTUP options dict. Passed to
# Cluster(application_info=...). The driver merges these options into
# the STARTUP frame without any filtering.
#
# 2. mock.patch.object(ProtocolVersion, "uses_prepared_metadata", ...) —
# makes the driver write result_metadata_id in EXECUTE frames and read
# it back from PREPARE/ROWS responses, which is exactly the v4 extension
# wire format.
#
# Note: the driver does not send the SKIP_METADATA flag in EXECUTE even with
# these patches (it never ORs it into the options flags byte for prepared
# statements). The server does not require SKIP_METADATA to trigger
# promotion; without it, it returns full column metadata alongside
# METADATA_CHANGED.
# ---------------------------------------------------------------------------
class _UseMetadataId(ApplicationInfoBase):
"""Inject SCYLLA_USE_METADATA_ID into the CQL STARTUP options."""
def add_startup_options(self, options: dict) -> None:
options["SCYLLA_USE_METADATA_ID"] = ""
@dataclasses.dataclass
class _ExecuteResult:
"""Parsed outcome of interest from a prepared-statement EXECUTE."""
initial_metadata_id: Optional[bytes]
"""result_metadata_id returned by the PREPARE response."""
result_metadata_id: Optional[bytes]
"""result_metadata_id embedded in the ROWS EXECUTE response, if any."""
metadata_changed: bool
"""True when the ROWS response carried the METADATA_CHANGED result-metadata flag (i.e., promotion occurred)."""
row_count: int
"""Number of rows returned by the EXECUTE response."""
def _prepare_and_execute(host: str, query: str) -> _ExecuteResult:
"""
Connect via the Scylla Python driver with SCYLLA_USE_METADATA_ID negotiated,
prepare *query*, execute it once, and return relevant metadata_id fields.
Intended to be called via ``asyncio.to_thread`` to avoid blocking the event loop.
The function uses two patches scoped to the connection lifetime:
* ``ProtocolVersion.uses_prepared_metadata`` is forced to return ``True``
for all protocol versions so that the driver reads/writes result_metadata_id
in PREPARE and EXECUTE frames on protocol v4.
* ``ResultMessage.recv_results_metadata`` is wrapped to capture
result_metadata_id from the ROWS response (the driver parses it there but
does not propagate it back to the PreparedStatement in the normal rows path).
"""
captured: dict = {"metadata_id": None, "metadata_changed": False}
original_recv = ResultMessage.recv_results_metadata
def _capturing_recv(self: ResultMessage, f, user_type_map) -> None:
original_recv(self, f, user_type_map)
rmi = getattr(self, "result_metadata_id", None)
if rmi is not None:
captured["metadata_id"] = rmi
captured["metadata_changed"] = True
with mock.patch.object(
ProtocolVersion,
"uses_prepared_metadata",
staticmethod(lambda v: True),
):
cluster = Cluster(
contact_points=[host],
port=9042,
protocol_version=4,
auth_provider=PlainTextAuthProvider("cassandra", "cassandra"),
application_info=_UseMetadataId(),
load_balancing_policy=WhiteListRoundRobinPolicy([host]),
)
session = cluster.connect()
try:
ps = session.prepare(query)
initial_metadata_id = ps.result_metadata_id
with mock.patch.object(
ResultMessage, "recv_results_metadata", _capturing_recv
):
rows = list(session.execute(ps))
return _ExecuteResult(
initial_metadata_id=initial_metadata_id,
result_metadata_id=captured["metadata_id"],
metadata_changed=captured["metadata_changed"],
row_count=len(rows),
)
finally:
session.shutdown()
cluster.shutdown()


@pytest.mark.asyncio
async def test_list_roles_of_prepared_metadata_promotion(
    manager: ManagerClient,
    build_mode: str,
) -> None:
    """Verify that EXECUTE promotes a stale prepared metadata_id, and that
    disabling that promotion suppresses the resulting ``METADATA_CHANGED``.

    ``LIST ROLES OF <role>`` is such a statement: at PREPARE time the server
    does not know the result set schema because the statement implementation
    builds the metadata dynamically at execute time. The server therefore
    returns the metadata_id of empty metadata in the PREPARE response.

    When the client later sends EXECUTE with the stale empty metadata_id, the
    server should detect the mismatch (the actual rows have real metadata) and
    respond with a ``METADATA_CHANGED`` result that carries the real
    metadata_id so the client can update its cache. This is the behaviour
    mandated by CQL v5; on CQL v4 it is exercised via the
    SCYLLA_USE_METADATA_ID Scylla protocol extension, which enables the same
    wire-level exchange. The test repeats PREPARE/EXECUTE on the same query
    to show that the promoted metadata_id is cached, and in non-release modes
    it contrasts that with an injected execution where the cache update is
    suppressed.
    """
    server = await manager.server_add(config=auth_config)
    cql, _ = await manager.get_ready_cql([server])
    role = "r" + unique_name()
    await cql.run_async(f"CREATE ROLE {role}")

    promoted = await asyncio.to_thread(
        _prepare_and_execute, server.ip_addr, f"LIST ROLES OF {role}"
    )
    assert promoted.row_count > 0, (
        f"expected EXECUTE for 'LIST ROLES OF {role}' to return at least one row"
    )
    assert promoted.initial_metadata_id is not None, (
        f"expected PREPARE for 'LIST ROLES OF {role}' to return a result_metadata_id"
    )
    assert promoted.metadata_changed, (
        f"expected EXECUTE for 'LIST ROLES OF {role}' to return METADATA_CHANGED "
        f"after PREPARE returned an empty result_metadata_id"
    )
    assert promoted.result_metadata_id is not None, (
        f"expected EXECUTE for 'LIST ROLES OF {role}' to return a result_metadata_id "
        f"alongside METADATA_CHANGED"
    )
    assert promoted.initial_metadata_id != promoted.result_metadata_id, (
        f"expected promoted result_metadata_id to differ from the stale empty one "
        f"returned by PREPARE"
    )

    cached = await asyncio.to_thread(
        _prepare_and_execute, server.ip_addr, f"LIST ROLES OF {role}"
    )
    assert cached.row_count > 0, (
        f"expected second EXECUTE for 'LIST ROLES OF {role}' to return at least one row"
    )
    assert cached.initial_metadata_id == promoted.result_metadata_id, (
        f"expected second PREPARE for 'LIST ROLES OF {role}' to reuse the promoted "
        f"result_metadata_id from the first EXECUTE"
    )
    assert not cached.metadata_changed, (
        f"expected second EXECUTE for 'LIST ROLES OF {role}' not to return "
        f"METADATA_CHANGED after the cache had been promoted"
    )
    assert cached.result_metadata_id is None, (
        f"expected second EXECUTE for 'LIST ROLES OF {role}' not to return a new "
        f"result_metadata_id once the cache had been promoted"
    )

    if build_mode == "release":
        return
    # Use a fresh prepared statement key so the promotion above does not seed
    # the cache for the injected contrast case.
    injected_role = "r" + unique_name()
    await cql.run_async(f"CREATE ROLE {injected_role}")
    async with inject_error(
        manager.api, server.ip_addr, "skip_prepared_result_metadata_promotion"
    ):
        suppressed = await asyncio.to_thread(
            _prepare_and_execute,
            server.ip_addr,
            f"LIST ROLES OF {injected_role}",
        )
    assert suppressed.row_count > 0, (
        f"expected injected EXECUTE for 'LIST ROLES OF {injected_role}' to return at least one row"
    )
    assert suppressed.initial_metadata_id is not None, (
        f"expected injected PREPARE for 'LIST ROLES OF {injected_role}' to return a result_metadata_id"
    )
    assert not suppressed.metadata_changed, (
        f"expected injected EXECUTE for 'LIST ROLES OF {injected_role}' to suppress "
        f"METADATA_CHANGED, but the flag was set"
    )
    assert suppressed.result_metadata_id is None, (
        f"expected injected EXECUTE for 'LIST ROLES OF {injected_role}' to omit "
        f"result_metadata_id when promotion is suppressed"
    )

    promoted_after_suppression = await asyncio.to_thread(
        _prepare_and_execute, server.ip_addr, f"LIST ROLES OF {injected_role}"
    )
    assert promoted_after_suppression.row_count > 0, (
        f"expected post-injection EXECUTE for 'LIST ROLES OF {injected_role}' to return at least one row"
    )
    assert (
        promoted_after_suppression.initial_metadata_id == suppressed.initial_metadata_id
    ), (
        f"expected injected EXECUTE for 'LIST ROLES OF {injected_role}' not to update the cached "
        f"result_metadata_id"
    )
    assert promoted_after_suppression.metadata_changed, (
        f"expected first non-injected EXECUTE for 'LIST ROLES OF {injected_role}' to "
        f"return METADATA_CHANGED because the injected run left the cache stale"
    )
    assert promoted_after_suppression.result_metadata_id is not None, (
        f"expected first non-injected EXECUTE for 'LIST ROLES OF {injected_role}' to "
        f"return a promoted result_metadata_id"
    )
    assert (
        promoted_after_suppression.initial_metadata_id
        != promoted_after_suppression.result_metadata_id
    ), (
        f"expected first non-injected EXECUTE for 'LIST ROLES OF {injected_role}' to "
        f"promote the stale result_metadata_id left by the injected run"
    )
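The capture technique the test uses (wrapping a method with `mock.patch.object` so the original still runs, then recording an attribute the normal call path would otherwise discard) can be sketched in isolation. This is a minimal stand-alone illustration: `Message`, `parse`, and the `metadata_id` attribute are illustrative stand-ins, not the driver's actual API.

```python
from unittest import mock


class Message:
    """Stand-in for the driver's ResultMessage: parsing sets an attribute."""
    def parse(self, payload):
        self.metadata_id = payload.get("metadata_id")


captured = {"metadata_id": None, "changed": False}
original_parse = Message.parse


def capturing_parse(self, payload):
    # Delegate to the real implementation first, then record the attribute
    # it set, since callers never propagate it further.
    original_parse(self, payload)
    if getattr(self, "metadata_id", None) is not None:
        captured["metadata_id"] = self.metadata_id
        captured["changed"] = True


with mock.patch.object(Message, "parse", capturing_parse):
    Message().parse({"metadata_id": b"\x01"})

print(captured)  # {'metadata_id': b'\x01', 'changed': True}
```

Because the wrapper delegates before capturing, the patched object behaves identically to the unpatched one from the caller's point of view, which is what lets the test observe `result_metadata_id` without disturbing the driver's row handling.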

View File

@@ -0,0 +1,102 @@
#
# Copyright (C) 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import logging

import pytest
from cassandra.cluster import Session
from cassandra.protocol import ConfigurationException, InvalidRequest
from dtest_class import Tester

logger = logging.getLogger(__name__)


def create_ks_and_assert_warning(session, query, ks_name, key_warn_msg_words):
    ret = session.execute_async(query)
    _ = ret.result()
    found = False
    if len(key_warn_msg_words) > 0:
        assert len(ret.warnings) >= 1, "Expected RF guardrail warning"
        for warning in ret.warnings:
            found = found or all(word in warning.lower() for word in key_warn_msg_words)
        assert found, "Didn't match all required keywords"
    session.execute(f"USE {ks_name}")


def assert_creating_ks_fails(session, query, ks_name):
    with pytest.raises(ConfigurationException):
        session.execute(query)
    with pytest.raises(InvalidRequest):
        session.execute(f"USE {ks_name}")


@pytest.mark.next_gating
class TestGuardrails(Tester):
    def test_default_rf(self):
        """
        As of now, the only RF guardrail enabled is a soft limit checking that RF >= 3. Not complying with this soft limit
        results in a CQL query being executed, but with a warning. Also, whatever the guardrails' values, RF = 0 is always OK.
        """
        cluster = self.cluster
        # FIXME: This test verifies that guardrails work. However, if we set `rf_rack_valid_keyspaces` to true,
        # we'll get a different error, so let's disable it for now. For more context, see issues:
        # scylladb/scylladb#23071 and scylladb/scylla-dtest#5633.
        cluster.set_configuration_options(values={"rf_rack_valid_keyspaces": False})
        cluster.populate([1, 1, 1]).start(wait_other_notice=True)
        session_dc1: Session = self.patient_cql_connection(cluster.nodelist()[0])
        ks_name = "ks"
        rf = {"dc1": 2, "dc2": 3, "dc3": 0}
        query = "CREATE KEYSPACE %s WITH REPLICATION={%s}"
        options = ", ".join(["'%s':%d" % (dc_value, rf_value) for dc_value, rf_value in rf.items()])
        query = query % (ks_name, "'class':'NetworkTopologyStrategy', %s" % options)
        create_ks_and_assert_warning(session_dc1, query, ks_name, ["warn", "min", "replication", "factor", "3", "dc1", "2"])

    def test_all_rf_limits(self):
        """
        There are 4 limits for RF: soft/hard min and soft/hard max limits. Breaking soft limits issues a warning,
        breaking the hard limits prevents the query from being executed.
        """
        cluster = self.cluster
        MIN_FAIL_THRESHOLD = 2
        MIN_WARN_THRESHOLD = 3
        MAX_WARN_THRESHOLD = 4
        MAX_FAIL_THRESHOLD = 5
        # FIXME: This test verifies that guardrails work. However, if we set `rf_rack_valid_keyspaces` to true,
        # we'll get a different error, so let's disable it for now. For more context, see issues:
        # scylladb/scylladb#23071 and scylladb/scylla-dtest#5633.
        cluster.set_configuration_options(values={"rf_rack_valid_keyspaces": False})
        cluster.set_configuration_options(
            values={
                "minimum_replication_factor_fail_threshold": MIN_FAIL_THRESHOLD,
                "minimum_replication_factor_warn_threshold": MIN_WARN_THRESHOLD,
                "maximum_replication_factor_warn_threshold": MAX_WARN_THRESHOLD,
                "maximum_replication_factor_fail_threshold": MAX_FAIL_THRESHOLD,
            }
        )
        query = "CREATE KEYSPACE %s WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'dc1': %s}"
        cluster.populate([1]).start()
        node = cluster.nodelist()[0]
        session = self.patient_cql_connection(node)

        def test_rf(rf):
            ks_name = f"ks_{rf}"
            if rf < MIN_FAIL_THRESHOLD or rf > MAX_FAIL_THRESHOLD:
                assert_creating_ks_fails(session, query % (ks_name, rf), ks_name)
            elif rf < MIN_WARN_THRESHOLD:
                create_ks_and_assert_warning(session, query % (ks_name, rf), ks_name, ["warn", "min", "replication", "factor", str(MIN_WARN_THRESHOLD), "dc1", "2"])
            elif rf > MAX_WARN_THRESHOLD:
                create_ks_and_assert_warning(session, query % (ks_name, rf), ks_name, ["warn", "max", "replication", "factor", str(MAX_WARN_THRESHOLD), "dc1", "5"])
            else:
                create_ks_and_assert_warning(session, query % (ks_name, rf), ks_name, [])

        for rf in range(MIN_FAIL_THRESHOLD - 1, MAX_FAIL_THRESHOLD + 1):
            test_rf(rf)
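The keyword check in `create_ks_and_assert_warning` accepts a response if at least one warning contains every required word, case-insensitively. A minimal sketch of that matching rule, with a hypothetical warning string for illustration:

```python
def warning_matches(warnings, required_words):
    """True if at least one warning contains every required word
    (case-insensitive) - the same rule create_ks_and_assert_warning uses."""
    return any(
        all(word in warning.lower() for word in required_words)
        for warning in warnings
    )


# Hypothetical server warning text, for illustration only.
warnings = [
    "Using 2 replicas in dc1 is below the WARN threshold of minimum replication factor 3"
]
print(warning_matches(warnings, ["warn", "min", "replication", "factor", "3", "dc1", "2"]))  # True
print(warning_matches(warnings, ["max"]))  # False
```

Requiring all keywords in a single warning (rather than spread across several) keeps the assertion from passing on an unrelated mix of messages.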

View File

@@ -61,8 +61,8 @@ async def test_mv_build_during_shutdown(manager: ManagerClient):
# Start building two views. The first is delayed by the injection, and the second
# view build is queued, waiting on the view builder semaphore.
await manager.api.enable_injection(server.ip_addr, "delay_before_get_view_natural_endpoint", one_shot=True)
await cql.run_async(f"CREATE materialized view {ks}.t_view1 AS select pk, v from {ks}.t where v is not null primary key (v, pk)")
await cql.run_async(f"CREATE materialized view {ks}.t_view2 AS select pk, v from {ks}.t where v is not null primary key (v, pk)")
create_task1 = cql.run_async(f"CREATE materialized view {ks}.t_view1 AS select pk, v from {ks}.t where v is not null primary key (v, pk)")
create_task2 = cql.run_async(f"CREATE materialized view {ks}.t_view2 AS select pk, v from {ks}.t where v is not null primary key (v, pk)")
log = await manager.server_open_log(server.server_id)
mark = await log.mark()
@@ -80,4 +80,4 @@ async def test_mv_build_during_shutdown(manager: ManagerClient):
# For dropping the keyspace
await manager.server_start(server.server_id)
await reconnect_driver(manager)
    await asyncio.gather(create_task1, create_task2)

View File

@@ -95,8 +95,7 @@ async def test_garbage_collect(manager: ManagerClient, object_storage):
cfg = {'enable_user_defined_functions': False,
'object_storage_endpoints': objconf,
'experimental_features': ['keyspace-storage-options']}
cmd = ['--logger-log-level', 's3=trace:http=debug:gcp_storage=trace']
server = await manager.server_add(config=cfg, cmdline=cmd)
server = await manager.server_add(config=cfg)
cql = manager.get_cql()

View File

@@ -47,5 +47,6 @@ run_in_dev:
- dtest/commitlog_test
- dtest/cfid_test
- dtest/rebuild_test
- dtest/guardrails_test
run_in_debug:
- random_failures/test_random_failures

View File

@@ -183,72 +183,6 @@ async def test_alternator_ttl_scheduling_group(manager: ManagerClient):
table.delete()


@pytest.mark.parametrize("with_down_node", [False, True], ids=["all_nodes_up", "one_node_down"])
async def test_alternator_ttl_multinode_expiration(manager: ManagerClient, with_down_node):
    """When the cluster has multiple nodes, different nodes are responsible
    for checking expiration in different token ranges - each node is
    responsible for its "primary ranges". Let's check that this expiration
    really does happen - for the entire token range - by writing many
    partitions that will span the entire token range, and seeing that they
    all expire. We don't check that nodes don't do more work than they
    should - an inefficient implementation where every node scans the
    entire data set will also pass this test.

    When the test is run a second time with with_down_node=True, we verify
    that TTL expiration works correctly even when one of the nodes is
    brought down. This node's TTL scanner is responsible for scanning part
    of the token range, so when this node is down, part of the data might
    not get expired. At that point - other node(s) should take over
    expiring data in that range - and this test verifies that this indeed
    happens. Reproduces issue #9787 and SCYLLADB-777.
    """
    servers = await manager.servers_add(3, config=alternator_config, auto_rack_dc='dc1')
    alternator = get_alternator(servers[0].ip_addr)
    if with_down_node:
        # Bring down one of the nodes. Everything we do below, like creating a
        # table, reading and writing, should continue to work with one node
        # down.
        await manager.server_stop_gracefully(servers[2].server_id)
    table = alternator.create_table(TableName=unique_table_name(),
        BillingMode='PAY_PER_REQUEST',
        KeySchema=[
            {'AttributeName': 'p', 'KeyType': 'HASH'},
        ],
        AttributeDefinitions=[
            {'AttributeName': 'p', 'AttributeType': 'N'},
        ])
    # Set the "expiration" column to mark item's expiration time
    table.meta.client.update_time_to_live(TableName=table.name, TimeToLiveSpecification={'AttributeName': 'expiration', 'Enabled': True})
    # Insert 50 rows, in different partitions, so the murmur3 hash maps them
    # all over the token space so different nodes would be responsible for
    # expiring them. All items are marked to expire 10 seconds in the past,
    # so should all expire as soon as possible, during this test.
    expiration = int(time.time()) - 10
    with table.batch_writer() as batch:
        for p in range(50):
            batch.put_item({'p': p, 'expiration': expiration})
    # Expect that after a short delay, all items in the table will have
    # expired - so a scan should return no responses. This should happen
    # even though one of the nodes is down and not doing its usual
    # expiration-scanning work.
    timeout = time.time() + 60
    items = -1
    while items != 0 and time.time() < timeout:
        response = table.scan(ConsistentRead=True)
        items = len(response['Items'])
        # In theory (though probably not in practice in this test), a scan()
        # can return zero items but have more pages - so we need to be more
        # diligent and scan all pages to check it's completely empty.
        while items == 0 and 'LastEvaluatedKey' in response:
            response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'], ConsistentRead=True)
            items += len(response['Items'])
        if items == 0:
            break
        time.sleep(0.1)
    assert items == 0
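The inner loop above exists because a DynamoDB-style Scan may return an empty page while more pages remain (signalled by `LastEvaluatedKey`). A minimal sketch of that drain-all-pages pattern, decoupled from boto3; `scan` here is a hypothetical callable standing in for `table.scan`:

```python
def drain_scan(scan):
    """Count items across all pages of a paginated Scan.

    `scan` takes the previous page's LastEvaluatedKey (None for the first
    page) and returns {'Items': [...]} plus an optional 'LastEvaluatedKey'
    when more pages remain."""
    total = 0
    key = None
    while True:
        page = scan(key)
        total += len(page["Items"])
        key = page.get("LastEvaluatedKey")
        if key is None:
            return total


# Simulated pages: the middle page is empty but still has a continuation
# key, which is exactly the case the test's inner loop guards against.
pages = {
    None: {"Items": [1, 2], "LastEvaluatedKey": "k1"},
    "k1": {"Items": [], "LastEvaluatedKey": "k2"},
    "k2": {"Items": [3]},
}
print(drain_scan(lambda k: pages[k]))  # 3
```

Stopping at the first empty page would have reported the table empty one page too early; following `LastEvaluatedKey` to exhaustion is what makes the "completely empty" check sound.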
@pytest.mark.asyncio
async def test_localnodes_broadcast_rpc_address(manager: ManagerClient):
"""Test that if the "broadcast_rpc_address" of a node is set, the

View File

@@ -1,99 +0,0 @@
#
# Copyright (C) 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import logging

import pytest
from cassandra.protocol import ConfigurationException, InvalidRequest

from test.pylib.async_cql import _wrap_future
from test.pylib.manager_client import ManagerClient
from test.pylib.util import unique_name

logger = logging.getLogger(__name__)


async def create_ks_and_assert_warning(cql, query, ks_name, key_warn_msg_words):
    # We have to use `Session::execute_async` here to be able to obtain `warnings`.
    ret = cql.execute_async(query)
    await _wrap_future(ret)
    found = False
    if len(key_warn_msg_words) > 0:
        assert len(ret.warnings) >= 1, "Expected RF guardrail warning"
        for warning in ret.warnings:
            found = found or all(word in warning.lower() for word in key_warn_msg_words)
        assert found, "Didn't match all required keywords"
    await cql.run_async(f"USE {ks_name}")


async def assert_creating_ks_fails(cql, query, ks_name):
    with pytest.raises(ConfigurationException):
        await cql.run_async(query)
    with pytest.raises(InvalidRequest):
        await cql.run_async(f"USE {ks_name}")


@pytest.mark.asyncio
async def test_default_rf(manager: ManagerClient):
    """
    As of now, the only RF guardrail enabled is a soft limit checking that RF >= 3. Not complying with this soft limit
    results in a CQL query being executed, but with a warning. Also, whatever the guardrails' values, RF = 0 is always OK.
    """
    # FIXME: This test verifies that guardrails work. However, if we set `rf_rack_valid_keyspaces` to true,
    # we'll get a different error, so let's disable it for now. For more context, see issues:
    # scylladb/scylladb#23071 and scylladb/scylla-dtest#5633.
    cfg = {"rf_rack_valid_keyspaces": False}
    await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": "r1"})
    await manager.server_add(config=cfg, property_file={"dc": "dc2", "rack": "r1"})
    await manager.server_add(config=cfg, property_file={"dc": "dc3", "rack": "r1"})
    cql = manager.get_cql()
    ks_name = unique_name()
    rf = {"dc1": 2, "dc2": 3, "dc3": 0}
    options = ", ".join([f"'{dc}':{rf_val}" for dc, rf_val in rf.items()])
    query = f"CREATE KEYSPACE {ks_name} WITH REPLICATION={{'class':'NetworkTopologyStrategy', {options}}}"
    await create_ks_and_assert_warning(cql, query, ks_name, ["warn", "min", "replication", "factor", "3", "dc1", "2"])


@pytest.mark.asyncio
async def test_all_rf_limits(manager: ManagerClient):
    """
    There are 4 limits for RF: soft/hard min and soft/hard max limits. Breaking soft limits issues a warning,
    breaking the hard limits prevents the query from being executed.
    """
    MIN_FAIL_THRESHOLD = 2
    MIN_WARN_THRESHOLD = 3
    MAX_WARN_THRESHOLD = 4
    MAX_FAIL_THRESHOLD = 5
    # FIXME: This test verifies that guardrails work. However, if we set `rf_rack_valid_keyspaces` to true,
    # we'll get a different error, so let's disable it for now. For more context, see issues:
    # scylladb/scylladb#23071 and scylladb/scylla-dtest#5633.
    cfg = {
        "rf_rack_valid_keyspaces": False,
        "minimum_replication_factor_fail_threshold": MIN_FAIL_THRESHOLD,
        "minimum_replication_factor_warn_threshold": MIN_WARN_THRESHOLD,
        "maximum_replication_factor_warn_threshold": MAX_WARN_THRESHOLD,
        "maximum_replication_factor_fail_threshold": MAX_FAIL_THRESHOLD,
    }
    dc = "dc1"
    await manager.server_add(config=cfg, property_file={"dc": dc, "rack": "r1"})
    cql = manager.get_cql()
    for rf in range(MIN_FAIL_THRESHOLD - 1, MAX_FAIL_THRESHOLD + 1):
        ks_name = unique_name()
        query = f"CREATE KEYSPACE {ks_name} WITH REPLICATION = {{'class': 'NetworkTopologyStrategy', '{dc}': {rf}}}"
        if rf < MIN_FAIL_THRESHOLD or rf > MAX_FAIL_THRESHOLD:
            await assert_creating_ks_fails(cql, query, ks_name)
        elif rf < MIN_WARN_THRESHOLD:
            await create_ks_and_assert_warning(cql, query, ks_name, ["warn", "min", "replication", "factor", str(MIN_WARN_THRESHOLD), dc, str(rf)])
        elif rf > MAX_WARN_THRESHOLD:
            await create_ks_and_assert_warning(cql, query, ks_name, ["warn", "max", "replication", "factor", str(MAX_WARN_THRESHOLD), dc, str(rf)])
        else:
            await create_ks_and_assert_warning(cql, query, ks_name, [])
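The branch structure of the loop above (hard limits reject, soft limits warn, everything in between passes cleanly) can be summarized as a pure classification function. A small sketch using the same four thresholds; note it does not model the separate RF = 0 exemption mentioned in the docstrings:

```python
def classify_rf(rf, min_fail=2, min_warn=3, max_warn=4, max_fail=5):
    """Map a replication factor to its guardrail outcome:
    'fail' breaks a hard limit, 'warn' breaks only a soft one.
    (The RF = 0 always-OK exemption is intentionally not modeled here.)"""
    if rf < min_fail or rf > max_fail:
        return "fail"
    if rf < min_warn or rf > max_warn:
        return "warn"
    return "ok"


print([classify_rf(rf) for rf in range(1, 7)])
# ['fail', 'warn', 'ok', 'ok', 'warn', 'fail']
```

Checking the hard limits before the soft ones matters: an RF that breaks both (e.g. 1, below both thresholds) must be rejected, not merely warned about.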

View File

@@ -54,7 +54,7 @@ async def test_autoretrain_dict(manager: ManagerClient):
uncompressed_size = blob_size * n_blobs * rf
# Start with compressor without a dictionary
cfg = { "sstable_compression_user_table_options": { 'sstable_compression': 'ZstdCompressor' } }
cfg = { "sstable_compression_user_table_options": "ZstdCompressor" }
logger.info("Bootstrapping cluster")
servers = await manager.servers_add(2, cmdline=[

View File

@@ -34,7 +34,7 @@ def scylla_path(build_mode):
@pytest.mark.parametrize("mode", ["read"])
async def test_perf_simple_query(scylla_path, mode, tmp_path):
args = [scylla_path, "perf-simple-query", "--duration", "1", "--partitions", "1000", "--stop-on-error", "false"]
args = [scylla_path, "perf-simple-query", "--duration", "1", "--partitions", "1000"]
await run(args)
@@ -54,8 +54,7 @@ async def test_perf_cql_raw(scylla_path, tmp_path, workload):
"--smp", "2",
"--workdir", str(tmp_path),
"--developer-mode", "1",
"--partitions", "1000",
"--continue-after-error", "true"
"--partitions", "1000"
]
try:
await run(cmd)
@@ -82,8 +81,7 @@ async def test_perf_alternator(scylla_path, tmp_path, workload):
"--smp", "2",
"--workdir", str(tmp_path),
"--developer-mode", "1",
"--partitions", "1000",
"--continue-after-error", "true"
"--partitions", "1000"
]
try:
await run(cmd)
@@ -103,8 +101,7 @@ async def test_perf_cql_raw_remote(scylla_path, tmp_path, workload, manager):
"--duration", "1",
"--remote-host", host,
"--smp", "1",
"--partitions", "1000",
"--continue-after-error", "true"
"--partitions", "1000"
]
await run(client_cmd)
@@ -124,7 +121,6 @@ async def test_perf_alternator_remote(scylla_path, tmp_path, workload, manager):
"--duration", "1",
"--remote-host", host,
"--smp", "1",
"--partitions", "1000",
"--continue-after-error", "true"
"--partitions", "1000"
]
await run(client_cmd)

View File

@@ -5,7 +5,7 @@
import pytest
from .util import new_test_table, is_scylla
from cassandra.protocol import InvalidRequest
from math import sqrt, isclose, nan, isnan
from math import sqrt, isclose
###############################################################################
@@ -46,7 +46,7 @@ def compute_similarity(similarity_function, v1, v2):
norm_v = sqrt(sum(x**2 for x in v1))
norm_q = sqrt(sum(x**2 for x in v2))
if norm_v == 0 or norm_q == 0:
return nan
raise ValueError("Cosine similarity is not defined for zero vectors")
cosine = dot / (norm_v * norm_q)
return (1 + cosine) / 2
elif similarity_function == "euclidean":
@@ -247,22 +247,13 @@ def test_vector_similarity_with_zero_vectors(cql, table1, similarity_function):
def test_vector_similarity_cosine_with_zero_vectors(cql, table1):
zero = [0.0, 0.0, 0.0]
queries = [
f"SELECT pk, v1, similarity_cosine(v1, {zero}) FROM {table1}",
f"SELECT pk, v1, similarity_cosine({zero}, v1) FROM {table1}",
f"SELECT pk, v1, similarity_cosine({zero}, {zero}) FROM {table1}",
]
expected_error = "Function system.similarity_cosine doesn't support all-zero vectors"
for query in queries:
# Scylla returns NaN for cosine similarity with zero vectors, while Cassandra throws an error.
# We allow for this difference as we want the rescoring
if is_scylla(cql):
result = cql.execute(query)
for row in result:
assert isnan(row[2])
else:
with pytest.raises(InvalidRequest, match=expected_error):
cql.execute(query)
with pytest.raises(InvalidRequest, match=expected_error):
cql.execute(f"SELECT pk, v1, similarity_cosine(v1, {zero}) FROM {table1}")
with pytest.raises(InvalidRequest, match=expected_error):
cql.execute(f"SELECT pk, v1, similarity_cosine({zero}, v1) FROM {table1}")
with pytest.raises(InvalidRequest, match=expected_error):
cql.execute(f"SELECT pk, v1, similarity_cosine({zero}, {zero}) FROM {table1}")
@pytest.mark.parametrize("similarity_function", similarity_functions)

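The `compute_similarity` helper in this test maps raw cosine similarity from [-1, 1] onto [0, 1] via (1 + cos) / 2, and after this change raises on zero vectors instead of returning NaN. A self-contained sketch of that formula (mirroring the test helper, not the server implementation):

```python
from math import sqrt


def similarity_cosine(v1, v2):
    """Scaled cosine similarity: (1 + cos) / 2 maps [-1, 1] onto [0, 1].
    Raises on zero vectors, matching the behaviour the test now expects."""
    dot = sum(x * y for x, y in zip(v1, v2))
    norm_v = sqrt(sum(x * x for x in v1))
    norm_q = sqrt(sum(x * x for x in v2))
    if norm_v == 0 or norm_q == 0:
        raise ValueError("Cosine similarity is not defined for zero vectors")
    return (1 + dot / (norm_v * norm_q)) / 2


print(similarity_cosine([1.0, 0.0], [1.0, 0.0]))   # 1.0 (identical)
print(similarity_cosine([1.0, 0.0], [-1.0, 0.0]))  # 0.0 (opposite)
print(similarity_cosine([1.0, 0.0], [0.0, 1.0]))   # 0.5 (orthogonal)
```

The scaling is why a zero vector is genuinely undefined rather than merely degenerate: with a zero norm the cosine itself has no value to rescale.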
View File

@@ -93,27 +93,26 @@ class ResourceGather(ABC):
env.update(os.environ)
else:
env = os.environ.copy()
with output_file.open(mode="w", encoding="utf-8") as output_handle:
p = subprocess.Popen(
args=args,
bufsize=1,
stdout=output_handle,
stderr=subprocess.STDOUT,
preexec_fn=self.put_process_to_cgroup,
close_fds=True,
cwd=cwd,
env=env,
text=True,
)
try:
p.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
logger.critical(f"Process {args} timed out")
p.kill()
p.communicate()
except KeyboardInterrupt:
p.kill()
raise
p = subprocess.Popen(
args=args,
bufsize=1,
stdout=output_file.open(mode="w", encoding="utf-8"),
stderr=subprocess.STDOUT,
preexec_fn=self.put_process_to_cgroup,
close_fds=True,
cwd=cwd,
env=env,
text=True,
)
try:
p.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
logger.critical(f"Process {args} timed out")
p.kill()
p.communicate()
except KeyboardInterrupt:
p.kill()
raise
return p
def make_cgroup(self) -> None:

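Both versions of the diff above use the same subprocess lifecycle: `communicate(timeout=...)`, and on `TimeoutExpired` kill the child and call `communicate()` again to reap it. A minimal stand-alone sketch of that pattern (capturing to a pipe here for brevity, rather than a log file):

```python
import subprocess


def run_with_timeout(args, timeout):
    """Run a child process, killing it if it exceeds `timeout` seconds -
    the same communicate / kill / communicate sequence as ResourceGather."""
    p = subprocess.Popen(
        args,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # fold stderr into the same stream
        text=True,
    )
    try:
        out, _ = p.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        p.kill()
        out, _ = p.communicate()  # reap the killed child, collect partial output
    return p.returncode, out


rc, out = run_with_timeout(["echo", "hello"], timeout=5)
print(rc, out.strip())  # 0 hello
```

The second `communicate()` after `kill()` is essential: it waits for the process to exit and drains its pipes, avoiding both a zombie process and a lost partial log.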
View File

@@ -51,3 +51,30 @@ def get_coroutine():
        name = resolve(vtable_addr)
        if name and name.strip() == target:
            print(f"coroutine_config={obj_addr.cast(gdb.lookup_type('uintptr_t'))}")


def coroutine_debug_config(tmpdir):
    """
    Check if scylla_find agrees with find_vptrs, for debugging.
    Execute GDB commands for coroutine debugging with detailed output.

    This test fails sometimes, but rarely and not reproducibly. We want to
    get a coredump from it the next time it fails; dumping a core with
    gcore should provide that.
    https://github.com/scylladb/scylladb/issues/22501
    """
    target = 'service::topology_coordinator::run() [clone .resume]'
    target_addr = int(gdb.parse_and_eval(f"&'{target}'"))
    find_command = f"scylla find -a 0x{target_addr:x}"
    gdb.write(f"Didn't find {target} (0x{target_addr:x}). Running '{find_command}'\n")
    mem_range = get_seastar_memory_start_and_size()
    gdb.execute(find_command)
    gdb.write(f"Memory range: 0x{mem_range[0]:x} 0x{mem_range[1]:x}\n")
    gdb.write("Found coroutines:\n")
    for obj_addr, vtable_addr in find_vptrs():
        name = resolve(vtable_addr)
        if name and '.resume' in name.strip():
            gdb.write(f"{name}\n")
    core_filename = f"{tmpdir}/../scylla_gdb_coro_task-{uuid.uuid4()}.core"
    gdb.execute(f"gcore {core_filename}")
    raise gdb.error(f"No coroutine frames found with expected name. Dumped Scylla core to {core_filename}")

View File

@@ -47,13 +47,24 @@ def coroutine_task(gdb_cmd, scylla_server):
Finds a coroutine task, similar to the `task` fixture.
This fixture executes the `coroutine_config` script in GDB to locate a
specific coroutine task.
specific coroutine task. If the task is not found, the `coroutine_debug_config`
debugging script is called which checks if scylla_find agrees with find_vptrs.
This debugging script then forces a coredump to capture additional
diagnostic information before the test is marked as failed.
Coredump is saved to `testlog/release/{scylla}`.
"""
result = execute_gdb_command(gdb_cmd, full_command="python get_coroutine()").stdout
match = re.search(r"coroutine_config=\s*(.*)", result)
if not match:
# See https://github.com/scylladb/scylladb/issues/22501
pytest.skip("Failed to find coroutine task. Skipping test.")
result = execute_gdb_command(
gdb_cmd,
full_command=f"python coroutine_debug_config('{scylla_server.workdir}')",
)
pytest.fail(
f"Failed to find coroutine task. Debugging logs have been collected\n"
f"Debugging code result: {result}\n"
)
return match.group(1).strip()

View File

@@ -15,8 +15,8 @@ from typing import Callable
from contextlib import asynccontextmanager, contextmanager
from dataclasses import dataclass
from test.cluster.conftest import PHASE_REPORT_KEY
from test.pylib.manager_client import ManagerClient
from test.cluster.conftest import *
from test.pylib.util import gather_safely

10
test/storage/suite.yaml Normal file
View File

@@ -0,0 +1,10 @@
type: Topology
pool_size: 4
cluster:
    initial_size: 0
extra_scylla_config_options:
    authenticator: AllowAllAuthenticator
    authorizer: AllowAllAuthorizer
    enable_user_defined_functions: False
    rf_rack_valid_keyspaces: True
    tablets_mode_for_new_keyspaces: enabled

View File

@@ -19,7 +19,7 @@ from test.cluster.util import get_topology_coordinator, find_server_by_host_id,
from test.pylib.manager_client import ManagerClient
from test.pylib.tablets import get_tablet_count
from test.pylib.util import Host
from test.cluster.storage.conftest import space_limited_servers
from test.storage.conftest import space_limited_servers
logger = logging.getLogger(__name__)

View File

@@ -460,38 +460,3 @@ SEASTAR_TEST_CASE(no_nulls_in_rescored_results, *boost::unit_test::expected_fail
}));
}
}
// Reproducer for SCYLLADB-456
SEASTAR_TEST_CASE(rescoring_with_zerovector_query) {
    for (const auto& params : test_data) {
        auto server = co_await make_vs_mock_server();
        co_await do_with_cql_env(
                [&](cql_test_env& env) -> future<> {
                    configure(env.local_qp().vector_store_client()).with_dns({{"server.node", std::vector<std::string>{server->host()}}});
                    env.local_qp().vector_store_client().start_background_tasks();
                    co_await create_index_and_insert_data(env, params);
                    server->next_ann_response({http::reply::status_type::ok, R"({
                        "primary_keys": { "id": [4, 3, 2, 1] },
                        "distances": [0, 0, 0, 0]
                    })"});
                    // For cosine similarity the ANN vector query would fail as the `similarity_cosine` function did not support zero vectors.
                    try {
                        auto msg = co_await env.execute_cql("SELECT id FROM ks.cf ORDER BY embedding ANN OF [0, 0] LIMIT 3;");
                        auto rms = dynamic_pointer_cast<cql_transport::messages::result_message::rows>(msg);
                        BOOST_REQUIRE(rms);
                        const auto& rows = rms->rs().result_set().rows();
                        BOOST_REQUIRE_EQUAL(rows.size(), 3);
                    } catch (const std::exception& e) {
                        BOOST_FAIL(e.what());
                    }
                },
                make_config(format("http://server.node:{}", server->port())))
            .finally(seastar::coroutine::lambda([&] -> future<> {
                co_await server->stop();
            }));
    }
}

View File

@@ -65,6 +65,7 @@
#include "transport/cql_protocol_extension.hh"
#include "utils/bit_cast.hh"
#include "utils/error_injection.hh"
#include "utils/labels.hh"
#include "utils/result.hh"
#include "utils/reusable_buffer.hh"
@@ -1304,14 +1305,27 @@ process_execute_internal(service::client_state& client_state, sharded<cql3::quer
}
tracing::trace(trace_state, "Processing a statement");
// Evaluated once here: drives both the conditional cache_key copy below and the promotion block in the lambda.
const bool should_promote_metadata_id = prepared->result_metadata_is_empty() && metadata_id.has_request_metadata_id() &&
!utils::get_local_injector().enter("skip_prepared_result_metadata_promotion");
auto maybe_copied_cache_key = should_promote_metadata_id ? std::optional(cache_key) : std::nullopt;
return qp.local().execute_prepared_without_checking_exception_message(query_state, std::move(stmt), options, std::move(prepared), std::move(cache_key), needs_authorization)
.then([trace_state = query_state.get_trace_state(), skip_metadata, q_state = std::move(q_state), stream, version, metadata_id = std::move(metadata_id)] (auto msg) mutable {
.then([&qp, trace_state = query_state.get_trace_state(), skip_metadata, q_state = std::move(q_state), stream, version,
metadata_id = std::move(metadata_id), maybe_cache_key = std::move(maybe_copied_cache_key), should_promote_metadata_id] (auto msg) mutable {
if (msg->move_to_shard()) {
return cql_server::process_fn_return_type(make_foreign(dynamic_pointer_cast<messages::result_message::bounce_to_shard>(msg)));
} else if (msg->is_exception()) {
return cql_server::process_fn_return_type(convert_error_message_to_coordinator_result(msg.get()));
} else {
tracing::trace(q_state->query_state.get_trace_state(), "Done processing - preparing a result");
if (should_promote_metadata_id) {
if (auto rows = dynamic_pointer_cast<messages::result_message::rows>(msg)) {
auto real_id = rows->rs().get_metadata().calculate_metadata_id();
qp.local().update_prepared_result_metadata_id(*maybe_cache_key, real_id);
auto req = metadata_id.get_request_metadata_id();
metadata_id = cql_metadata_id_wrapper(std::move(req), std::move(real_id));
}
}
return cql_server::process_fn_return_type(make_foreign(make_result(stream, *msg, q_state->query_state.get_trace_state(), version, std::move(metadata_id), skip_metadata)));
}
});
@@ -2222,9 +2236,16 @@ void cql_server::response::write(const cql3::metadata& m, const cql_metadata_id_
cql3::cql_metadata_id_type calculated_metadata_id{bytes{}};
if (metadata_id.has_request_metadata_id() && metadata_id.has_response_metadata_id()) {
if (metadata_id.get_request_metadata_id() != metadata_id.get_response_metadata_id()) {
flags.remove<cql3::metadata::flag::NO_METADATA>();
flags.set<cql3::metadata::flag::METADATA_CHANGED>();
no_metadata = false;
const bool skip_rows_metadata_changed_response = utils::get_local_injector().enter("skip_rows_metadata_changed_response");
clogger.debug("rows metadata changed response: request_metadata_id_present={}, response_metadata_id_present={}, metadata_changed={}, no_metadata_before={}, injection_fired={}",
metadata_id.has_request_metadata_id(), metadata_id.has_response_metadata_id(),
metadata_id.get_request_metadata_id() != metadata_id.get_response_metadata_id(),
no_metadata, skip_rows_metadata_changed_response);
if (!skip_rows_metadata_changed_response) {
flags.remove<cql3::metadata::flag::NO_METADATA>();
flags.set<cql3::metadata::flag::METADATA_CHANGED>();
no_metadata = false;
}
}
}
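The server-side promotion predicate in the C++ hunk above combines three conditions: PREPARE cached empty result metadata, the client actually sent a metadata_id (so the v5 / SCYLLA_USE_METADATA_ID path is active), and the error injection did not fire. A small Python sketch of that decision logic (a model of the C++ predicate, not the implementation itself):

```python
def should_promote(result_metadata_empty, request_has_metadata_id,
                   injection_skips_promotion=False):
    """Mirror of the server-side predicate: promote the cached metadata_id
    only when PREPARE stored empty result metadata, the client sent a
    metadata_id, and skip_prepared_result_metadata_promotion did not fire."""
    return (result_metadata_empty
            and request_has_metadata_id
            and not injection_skips_promotion)


print(should_promote(True, True))        # True: first EXECUTE promotes
print(should_promote(False, True))       # False: cache already has a real id
print(should_promote(True, False))       # False: plain v4 client, no id sent
print(should_promote(True, True, True))  # False: injection suppresses promotion
```

Evaluating the predicate once, before `execute_prepared_*` runs, is what makes the conditional `cache_key` copy in the hunk safe: the lambda later uses the snapshot rather than re-reading state that the execution may have changed.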

View File

@@ -66,7 +66,6 @@ target_sources(utils
azure/identity/default_credentials.cc
gcp/gcp_credentials.cc
gcp/object_storage.cc
gcp/object_storage_retry_strategy.cc
)
target_include_directories(utils
PUBLIC

View File

@@ -9,7 +9,6 @@
#include "object_storage.hh"
#include "gcp_credentials.hh"
#include "object_storage_retry_strategy.hh"
#include <algorithm>
#include <numeric>
@@ -21,16 +20,12 @@
#include <seastar/core/gate.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/units.hh>
#include <seastar/http/client.hh>
#include <seastar/util/short_streams.hh>
#include "utils/rest/client.hh"
#include "utils/exponential_backoff_retry.hh"
#include "utils/error_injection.hh"
#include "utils/exceptions.hh"
#include "utils/http.hh"
#include "utils/http_client_error_processing.hh"
#include "utils/overloaded_functor.hh"
static logger gcp_storage("gcp_storage");
@@ -231,7 +226,6 @@ class utils::gcp::storage::client::impl {
seastar::semaphore& _limits;
seastar::http::experimental::client _client;
shared_ptr<seastar::tls::certificate_credentials> _certs;
future<> authorize(request_wrapper& req, const std::string& scope);
public:
impl(const utils::http::url_info&, std::optional<google_credentials>, seastar::semaphore*, shared_ptr<seastar::tls::certificate_credentials> creds);
impl(std::string_view endpoint, std::optional<google_credentials>, seastar::semaphore*, shared_ptr<seastar::tls::certificate_credentials> creds);
@@ -249,13 +243,6 @@ public:
future<> close();
};
future<> storage::client::impl::authorize(request_wrapper& req, const std::string& scope) {
if (_credentials) {
co_await _credentials->refresh(scope, &storage_scope_implies, _certs);
req.add_header(utils::gcp::AUTHORIZATION, format_bearer(_credentials->token));
}
}
utils::gcp::storage::client::impl::impl(const utils::http::url_info& url, std::optional<google_credentials> c, seastar::semaphore* memory, shared_ptr<seastar::tls::certificate_credentials> certs)
: _endpoint(url.host)
, _credentials(std::move(c))
@@ -306,87 +293,104 @@ using namespace std::chrono_literals;
/**
* Performs a REST post/put/get with credential refresh/retry.
*/
future<>
utils::gcp::storage::client::impl::send_with_retry(const std::string& path, const std::string& scope, body_variant body, std::string_view content_type, handler_func_ex handler, httpclient::method_type op, key_values headers, seastar::abort_source* as) {
rest::request_wrapper req(_endpoint);
req.target(path);
req.method(op);
static constexpr auto max_retries = 10;
for (auto& [k,v] : headers) {
req.add_header(k, v);
}
exponential_backoff_retry exr(10ms, 10000ms);
bool do_backoff = false;
std::visit(overloaded_functor {
[&](const std::string& s) { req.content(content_type, s); },
[&](const writer_and_size& ws) { req.content(content_type, ws.first, ws.second); }
}, body);
// GCP storage requires this even if content is empty
req.add_header("Content-Length", std::to_string(req.request().content_length));
gcp_storage.trace("Sending: {}", redacted_request_type {
req.request(),
bearer_filter()
});
try {
try {
co_await authorize(req, scope);
} catch (...) {
// just disregard the failure, we will retry below in the wrapped handler
for (int retry = 0; ; ++retry) {
if (std::exchange(do_backoff, false)) {
co_await (as ? exr.retry(*as) : exr.retry());
}
auto wrapped_handler = [this, handler = std::move(handler), &req, scope](const reply& rep, input_stream<char>& in) -> future<> {
auto _in = std::move(in);
auto status_class = reply::classify_status(rep._status);
/*
* Surprisingly Google Cloud Storage (GCS) commonly returns HTTP 308 during resumable uploads, including when you use PUT. This is expected behavior and
* not an error. The 308 tells the client to continue the upload at the same URL without changing the method or body, which is exactly how GCS's
* resumable upload protocol works.
*/
if (status_class != reply::status_class::informational && status_class != reply::status_class::success &&
rep._status != status_type::permanent_redirect) {
if (rep._status == status_type::unauthorized) {
gcp_storage.warn("Request failed with status {}. Refreshing credentials.", rep._status);
co_await authorize(req, scope);
}
auto content = co_await util::read_entire_stream_contiguous(_in);
auto error_msg = get_gcp_error_message(std::string_view(content));
gcp_storage.debug("Got unexpected response status: {}, content: {}", rep._status, content);
co_await coroutine::return_exception_ptr(std::make_exception_ptr(httpd::unexpected_status_error(rep._status)));
}
std::exception_ptr eptr;
rest::request_wrapper req(_endpoint);
req.target(path);
req.method(op);
if (_credentials) {
try {
// TODO: rename the fault injection point to something more generic
if (utils::get_local_injector().enter("s3_client_fail_authorization")) {
throw httpd::unexpected_status_error(status_type::unauthorized);
try {
co_await _credentials->refresh(scope, &storage_scope_implies, _certs);
req.add_header(utils::gcp::AUTHORIZATION, format_bearer(_credentials->token));
} catch (httpd::unexpected_status_error& e) {
switch (e.status()) {
default:
if (reply::classify_status(e.status()) != reply::status_class::server_error) {
break;
}
[[fallthrough]];
case status_type::request_timeout:
case status_type::too_many_requests:
if (retry < max_retries) {
gcp_storage.debug("Got {}: {}", e.status(), std::current_exception());
// service unavailable etc -> retry
do_backoff = true;
continue;
}
break;
}
throw;
}
co_await handler(rep, _in);
} catch (...) {
eptr = std::current_exception();
gcp_storage.error("Error refreshing credentials: {}", std::current_exception());
std::throw_with_nested(permission_error("Error refreshing credentials"));
}
if (eptr) {
co_await coroutine::return_exception_ptr(std::move(eptr));
}
};
object_storage_retry_strategy retry_strategy(10, 10ms, 10000ms, as);
co_return co_await rest::simple_send(_client, req, wrapped_handler, &retry_strategy, as);
} catch (...) {
}
for (auto& [k,v] : headers) {
req.add_header(k, v);
}
std::visit(overloaded_functor {
[&](const std::string& s) { req.content(content_type, s); },
[&](const writer_and_size& ws) { req.content(content_type, ws.first, ws.second); }
}, body);
// GCP storage requires this even if content is empty
req.add_header("Content-Length", std::to_string(req.request().content_length));
gcp_storage.trace("Sending: {}", redacted_request_type {
req.request(),
bearer_filter()
});
try {
std::rethrow_exception(std::current_exception());
} catch (const httpd::unexpected_status_error& e) {
auto status = e.status();
if (reply::classify_status(status) == reply::status_class::redirection || status == reply::status_type::not_found) {
throw storage_io_error{ENOENT, format("GCP object doesn't exist ({})", status)};
co_await rest::simple_send(_client, req, [&handler](const seastar::http::reply& res, seastar::input_stream<char>& in) -> future<> {
gcp_storage.trace("Result: {}", res);
if (res._status == status_type::unauthorized) {
throw permission_error(int(res._status), co_await get_gcp_error_message(in));
} else if (res._status == status_type::request_timeout || res._status == status_type::too_many_requests || reply::classify_status(res._status) == reply::status_class::server_error) {
throw storage_error(int(res._status), co_await get_gcp_error_message(in));
}
co_await handler(res, in);
}, as);
break;
} catch (storage_error& e) {
gcp_storage.debug("{}: Got unexpected response: {}", _endpoint, e.what());
auto s = status_type(e.status());
switch (s) {
default:
if (reply::classify_status(s) != reply::status_class::server_error) {
break;
}
[[fallthrough]];
case status_type::request_timeout:
case status_type::too_many_requests:
do_backoff = true;
[[fallthrough]];
case status_type::unauthorized:
if (retry < max_retries) {
continue; // retry loop.
}
break;
}
if (status == reply::status_type::forbidden || status == reply::status_type::unauthorized) {
throw storage_io_error{EACCES, format("GCP access denied ({})", status)};
}
throw storage_io_error{EIO, format("GCP request failed with ({})", status)};
throw;
} catch (...) {
throw storage_io_error{EIO, format("GCP error ({})", std::current_exception())};
// network, whatnot. maybe add retries here as well, but should really
// be on seastar level
throw;
}
}
}
@@ -999,22 +1003,21 @@ future<> utils::gcp::storage::client::delete_object(std::string_view bucket_in,
auto path = fmt::format("/storage/v1/b/{}/o/{}", bucket, seastar::http::internal::url_encode(object_name));
httpclient::result_type res;
try {
res = co_await _impl->send_with_retry(path, GCP_OBJECT_SCOPE_READ_WRITE, ""s, ""s, httpclient::method_type::DELETE);
} catch (const storage_io_error& ex) {
if (ex.code().value() == ENOENT) {
gcp_storage.debug("Could not delete {}:{} - no such object", bucket, object_name);
co_return; // ok...?
}
std::rethrow_exception(std::current_exception());
}
auto res = co_await _impl->send_with_retry(path
, GCP_OBJECT_SCOPE_READ_WRITE
, ""s
, ""s
, httpclient::method_type::DELETE
);
switch (res.result()) {
case status_type::ok:
case status_type::no_content:
gcp_storage.debug("Deleted {}:{}", bucket, object_name);
co_return; // done and happy
case status_type::not_found:
gcp_storage.debug("Could not delete {}:{} - no such object", bucket, object_name);
co_return; // ok...?
default:
throw failed_operation(fmt::format("Could not delete object {}:{}: {} ({})", bucket, object_name, res.result()
, get_gcp_error_message(res.body())

View File

@@ -1,52 +0,0 @@
/*
* Copyright (C) 2026-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "object_storage_retry_strategy.hh"
#include "utils/exceptions.hh"
#include "utils/http_client_error_processing.hh"
#include <seastar/core/sleep.hh>
#include <seastar/http/exception.hh>
static logger rs_logger("gcp_retry_strategy");
namespace utils::gcp::storage {
object_storage_retry_strategy::object_storage_retry_strategy(unsigned max_retries,
std::chrono::milliseconds base_sleep_time,
std::chrono::milliseconds max_sleep_time,
abort_source* as)
: _max_retries(max_retries), _exr(base_sleep_time, max_sleep_time), _as(as) {
}
future<bool> object_storage_retry_strategy::should_retry(std::exception_ptr error, unsigned attempted_retries) const {
if (attempted_retries >= _max_retries) {
rs_logger.warn("Retries exhausted. Retry# {}", attempted_retries);
co_return false;
}
auto retryable = from_exception_ptr(error);
if (retryable) {
rs_logger.debug("GCP client request failed. Reason: {}. Retry# {}", error, attempted_retries);
co_await (_as ? _exr.retry(*_as) : _exr.retry());
} else {
rs_logger.warn("GCP client encountered non-retryable error. Reason: {}. Retry# {}", error, attempted_retries);
}
co_return retryable;
}
bool object_storage_retry_strategy::from_exception_ptr(std::exception_ptr exception) {
return dispatch_exception<bool>(
std::move(exception),
[](std::exception_ptr, std::string&&) { return false; },
[](const seastar::httpd::unexpected_status_error& ex) {
return http::from_http_code(ex.status()) == http::retryable::yes || ex.status() == seastar::http::reply::status_type::unauthorized;
},
[](const std::system_error& ex) { return http::from_system_error(ex) == http::retryable::yes; });
}
} // namespace utils::gcp::storage

View File

@@ -1,34 +0,0 @@
/*
* Copyright (C) 2026-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "utils/exponential_backoff_retry.hh"
#include <seastar/http/retry_strategy.hh>
namespace utils::gcp::storage {
// GCP object storage retry strategy
// General guidelines https://docs.cloud.google.com/storage/docs/retry-strategy
class object_storage_retry_strategy : public seastar::http::experimental::retry_strategy {
protected:
unsigned _max_retries;
mutable exponential_backoff_retry _exr;
abort_source* _as;
public:
object_storage_retry_strategy(unsigned max_retries = 10,
std::chrono::milliseconds base_sleep_time = std::chrono::milliseconds(10),
std::chrono::milliseconds max_sleep_time = std::chrono::milliseconds(10000),
abort_source* as = nullptr);
seastar::future<bool> should_retry(std::exception_ptr error, unsigned attempted_retries) const override;
static bool from_exception_ptr(std::exception_ptr exception);
};
} // namespace utils::gcp::storage

View File

@@ -126,10 +126,6 @@ seastar::future<> rest::httpclient::send(const handler_func& f, seastar::abort_s
}
seastar::future<> rest::simple_send(seastar::http::experimental::client& client, seastar::http::request& req, const handler_func_ex& f, seastar::abort_source* as) {
co_await simple_send(client, req, f, nullptr, as);
}
seastar::future<> rest::simple_send(seastar::http::experimental::client& client, seastar::http::request& req, const handler_func_ex& f, const http::experimental::retry_strategy* strategy, seastar::abort_source* as) {
if (as) {
as->check();
}
@@ -143,14 +139,6 @@ seastar::future<> rest::simple_send(seastar::http::experimental::client& client,
req._headers[httpclient::CONTENT_TYPE_HEADER] = "application/x-www-form-urlencoded";
}
if (strategy) {
co_return co_await client.make_request(std::move(req), [&](const http::reply& rep, input_stream<char>&& in) -> future<> {
// ensure these are on our coroutine frame.
auto& resp_handler = f;
auto in_stream = std::move(in);
co_await resp_handler(rep, in_stream);
}, *strategy, std::nullopt, as);
}
co_await client.make_request(std::move(req), [&](const http::reply& rep, input_stream<char>&& in) -> future<> {
// ensure these are on our coroutine frame.
auto& resp_handler = f;

View File

@@ -114,7 +114,6 @@ private:
using handler_func_ex = std::function<future<>(const seastar::http::reply&, seastar::input_stream<char>&)>;
seastar::future<> simple_send(seastar::http::experimental::client&, seastar::http::request&, const handler_func_ex&, seastar::abort_source* = nullptr);
seastar::future<> simple_send(seastar::http::experimental::client&, seastar::http::request&, const handler_func_ex&, const http::experimental::retry_strategy* strategy, seastar::abort_source* = nullptr);
// Interface for redacting sensitive data from HTTP requests and responses before logging.
class http_log_filter {

View File

@@ -56,7 +56,7 @@ dns::dns(logging::logger& logger, std::vector<seastar::sstring> hosts, listener_
co_await coroutine::return_exception_ptr(std::move(err));
}
auto addr = co_await std::move(f);
co_return addr.addr_entries | std::views::transform(&net::hostent::address_entry::addr) | std::ranges::to<std::vector>();
co_return addr.addr_list;
})
, _hosts(std::move(hosts))
, _listener(std::move(listener))