Compare commits

...

9 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
5515d5fb7d test/alternator: fix delete_item_no_ts test, add LWT rejection tests for delete ops, simplify assertions, update docs
Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-03-05 17:31:41 +00:00
copilot-swe-agent[bot]
328c263aed alternator: add custom timestamp support to DeleteItem and BatchWriteItem DeleteRequest
Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-03-05 16:16:27 +00:00
copilot-swe-agent[bot]
a6d36a480d test/alternator: add explicit only_rmw_uses_lwt isolation to test_table_ts and test_table_ts_ss
Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-02-25 22:17:17 +00:00
copilot-swe-agent[bot]
85354ae26a test/alternator: restore test_table_ts_lwt with always isolation; update docs
- Restore test_table_ts_lwt fixture with system:write_isolation=always to
  explicitly test that the timestamp attribute is rejected in LWT_ALWAYS mode
- Add test_timestamp_attribute_lwt_always_rejected which verifies that even
  a plain PutItem with a timestamp is rejected when always_use_lwt is set
- Keep test_timestamp_attribute_with_condition_rejected using test_table_ts
  (with the test runner's default only_rmw_uses_lwt isolation) to test
  that a ConditionExpression triggers LWT rejection
- Update docs: fix item 4 (non-numeric now rejected), improve Limitations
  section to clearly state always_use_lwt is incompatible with the feature
  and recommend system:write_isolation=only_rmw_uses_lwt

Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-02-25 22:12:07 +00:00
copilot-swe-agent[bot]
786fa68faa test/alternator: address review comments on test_timestamp_attribute
- Use scope="module" instead of testpy_test_fixture_scope in fixtures
- Rename test_table_ts_sc to test_table_ts_ss (ss = string+string keys)
- Remove test_table_ts_lwt; use test_table_ts for LWT-rejection test
  (the test server runs with only_rmw_uses_lwt, so conditions trigger LWT)
- Add comment that fixtures make tests implicitly Scylla-only
- Change non-numeric timestamp attribute behavior: reject with
  ValidationException instead of silently storing (test + C++ implementation)
- Add test_timestamp_attribute_microseconds: verifies the timestamp unit
  is microseconds and tests interaction with default server timestamps
- Add import time for the new microseconds test

Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-02-25 21:58:34 +00:00
copilot-swe-agent[bot]
a57f781852 test: refactor test_timestamp_attribute to use shared fixtures
Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-02-25 21:38:52 +00:00
copilot-swe-agent[bot]
7f79b90e91 Fix timestamp_attribute: non-numeric handling, tag visibility, and clean up
Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-02-25 20:35:48 +00:00
copilot-swe-agent[bot]
175b8a8a5e Add system:timestamp_attribute feature for custom write timestamps in Alternator
Co-authored-by: nyh <584227+nyh@users.noreply.github.com>
2026-02-25 20:27:26 +00:00
copilot-swe-agent[bot]
c2ef8075ee Initial plan 2026-02-25 20:04:37 +00:00
4 changed files with 714 additions and 22 deletions

View File

@@ -7,6 +7,7 @@
*/
#include <fmt/ranges.h>
#include <cstdlib>
#include <seastar/core/on_internal_error.hh>
#include "alternator/executor.hh"
#include "alternator/consumed_capacity.hh"
@@ -108,6 +109,16 @@ const sstring TABLE_CREATION_TIME_TAG_KEY("system:table_creation_time");
// configured by UpdateTimeToLive to be the expiration-time attribute for
// this table.
extern const sstring TTL_TAG_KEY("system:ttl_attribute");
// If this tag is present, it stores the name of an attribute whose numeric
// value (in microseconds since the Unix epoch) is used as the write timestamp
// for PutItem and UpdateItem operations. When the named attribute is present
// in a PutItem or UpdateItem request, its value is used as the timestamp of
// the write, and the attribute itself is NOT stored in the item. This allows
// users to control write ordering for last-write-wins semantics. Because LWT
// does not allow setting a custom write timestamp, operations using this
// feature are incompatible with conditions (which require LWT), and with
// the LWT_ALWAYS write isolation mode; such operations are rejected.
static const sstring TIMESTAMP_TAG_KEY("system:timestamp_attribute");
// This will be set to 1 in a case, where user DID NOT specify a range key.
// The way GSI / LSI is implemented by Alternator assumes user specified keys will come first
// in materialized view's key list. Then, if needed missing keys are added (current implementation
@@ -1337,13 +1348,14 @@ void rmw_operation::set_default_write_isolation(std::string_view value) {
// Alternator uses tags whose keys start with the "system:" prefix for
// internal purposes. Those should not be readable by ListTagsOfResource,
// nor writable with TagResource or UntagResource (see #24098).
// Only a few specific system tags, currently only "system:write_isolation"
// and "system:initial_tablets", are deliberately intended to be set and read
// by the user, so are not considered "internal".
// Only a few specific system tags, currently only "system:write_isolation",
// "system:initial_tablets", and "system:timestamp_attribute", are deliberately
// intended to be set and read by the user, so are not considered "internal".
static bool tag_key_is_internal(std::string_view tag_key) {
return tag_key.starts_with("system:")
&& tag_key != rmw_operation::WRITE_ISOLATION_TAG_KEY
&& tag_key != INITIAL_TABLETS_TAG_KEY;
&& tag_key != INITIAL_TABLETS_TAG_KEY
&& tag_key != TIMESTAMP_TAG_KEY;
}
enum class update_tags_action { add_tags, delete_tags };
@@ -2298,8 +2310,11 @@ public:
// After calling pk_from_json() and ck_from_json() to extract the pk and ck
// components of a key, and if that succeeded, call check_key() to further
// check that the key doesn't have any spurious components.
static void check_key(const rjson::value& key, const schema_ptr& schema) {
if (key.MemberCount() != (schema->clustering_key_size() == 0 ? 1 : 2)) {
// allow_extra_attribute: set to true when the key may contain one extra
// non-key attribute (e.g., the timestamp pseudo-attribute for DeleteItem).
static void check_key(const rjson::value& key, const schema_ptr& schema, bool allow_extra_attribute = false) {
const unsigned expected = (schema->clustering_key_size() == 0 ? 1 : 2) + (allow_extra_attribute ? 1 : 0);
if (key.MemberCount() != expected) {
throw api_error::validation("Given key attribute not in schema");
}
}
@@ -2346,6 +2361,57 @@ void validate_value(const rjson::value& v, const char* caller) {
// any writing happens (if one of the commands has an error, none of the
// writes should be done). LWT makes it impossible for the parse step to
// generate "mutation" objects, because the timestamp still isn't known.
// Convert a DynamoDB number (big_decimal) to an api::timestamp_type
// (microseconds since the Unix epoch). Fractional microseconds are truncated.
// Returns nullopt if the value is negative or zero.
static std::optional<api::timestamp_type> bigdecimal_to_timestamp(const big_decimal& bd) {
if (bd.unscaled_value() <= 0) {
return std::nullopt;
}
if (bd.scale() == 0) {
// Fast path: integer value, no decimal adjustment needed
return static_cast<api::timestamp_type>(bd.unscaled_value());
}
// General case: adjust for decimal scale.
// big_decimal stores value as unscaled_value * 10^(-scale).
// scale > 0 means divide by 10^scale (truncate fractional part).
// scale < 0 means multiply by 10^|scale| (add trailing zeros).
auto str = bd.unscaled_value().str();
if (bd.scale() > 0) {
int len = str.length();
if (len <= bd.scale()) {
return std::nullopt; // Number < 1
}
str = str.substr(0, len - bd.scale());
} else {
if (bd.scale() < -18) {
// Too large to represent as int64_t
return std::nullopt;
}
for (int i = 0; i < -bd.scale(); i++) {
str.push_back('0');
}
}
long long result = strtoll(str.c_str(), nullptr, 10);
if (result <= 0) {
return std::nullopt;
}
return static_cast<api::timestamp_type>(result);
}
// Try to extract a write timestamp from a DynamoDB-typed value.
// The value should be a number ({"N": "..."}), representing microseconds
// since the Unix epoch. Returns nullopt if the value is not a valid number
// or doesn't represent a valid timestamp.
static std::optional<api::timestamp_type> try_get_timestamp(const rjson::value& attr_value) {
std::optional<big_decimal> n = try_unwrap_number(attr_value);
if (!n) {
return std::nullopt;
}
return bigdecimal_to_timestamp(*n);
}
class put_or_delete_item {
private:
partition_key _pk;
@@ -2361,11 +2427,17 @@ private:
// that length can have different meaning depends on the operation but the
// the calculation of length in bytes to WCU is the same.
uint64_t _length_in_bytes = 0;
// If the table has a system:timestamp_attribute tag, and the named
// attribute was found in the item with a valid numeric value, this holds
// the extracted timestamp. The attribute is not added to _cells.
std::optional<api::timestamp_type> _custom_timestamp;
public:
struct delete_item {};
struct put_item {};
put_or_delete_item(const rjson::value& key, schema_ptr schema, delete_item);
put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item, std::unordered_map<bytes, std::string> key_attributes);
put_or_delete_item(const rjson::value& key, schema_ptr schema, delete_item,
const std::optional<bytes>& timestamp_attribute = std::nullopt);
put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item, std::unordered_map<bytes, std::string> key_attributes,
const std::optional<bytes>& timestamp_attribute = std::nullopt);
// put_or_delete_item doesn't keep a reference to schema (so it can be
// moved between shards for LWT) so it needs to be given again to build():
mutation build(schema_ptr schema, api::timestamp_type ts) const;
@@ -2380,11 +2452,32 @@ public:
bool is_put_item() noexcept {
return _cells.has_value();
}
// Returns the custom write timestamp extracted from the timestamp attribute,
// if any. If not set, the caller should use api::new_timestamp() instead.
std::optional<api::timestamp_type> custom_timestamp() const noexcept {
return _custom_timestamp;
}
};
put_or_delete_item::put_or_delete_item(const rjson::value& key, schema_ptr schema, delete_item)
put_or_delete_item::put_or_delete_item(const rjson::value& key, schema_ptr schema, delete_item, const std::optional<bytes>& timestamp_attribute)
: _pk(pk_from_json(key, schema)), _ck(ck_from_json(key, schema)) {
check_key(key, schema);
if (timestamp_attribute) {
// The timestamp attribute may be provided as a "pseudo-key": it is
// not a real key column, but can be included in the "Key" object to
// carry the custom write timestamp. If found, extract the timestamp
// and don't store it in the item.
const rjson::value* ts_val = rjson::find(key, to_string_view(*timestamp_attribute));
if (ts_val) {
if (auto t = try_get_timestamp(*ts_val)) {
_custom_timestamp = t;
} else {
throw api_error::validation(fmt::format(
"The '{}' attribute used as a write timestamp must be a positive number (microseconds since epoch)",
to_string_view(*timestamp_attribute)));
}
}
}
check_key(key, schema, _custom_timestamp.has_value());
}
// find_attribute() checks whether the named attribute is stored in the
@@ -2471,7 +2564,8 @@ static inline void validate_value_if_index_key(
}
}
put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item, std::unordered_map<bytes, std::string> key_attributes)
put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item, std::unordered_map<bytes, std::string> key_attributes,
const std::optional<bytes>& timestamp_attribute)
: _pk(pk_from_json(item, schema)), _ck(ck_from_json(item, schema)) {
_cells = std::vector<cell>();
_cells->reserve(item.MemberCount());
@@ -2480,6 +2574,17 @@ put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr sche
validate_value(it->value, "PutItem");
const column_definition* cdef = find_attribute(*schema, column_name);
validate_attr_name_length("", column_name.size(), cdef && cdef->is_primary_key());
// If this is the timestamp attribute, it must be a valid numeric value
// (microseconds since epoch). Use it as the write timestamp and do not
// store it in the item data. Reject the write if the value is non-numeric.
if (timestamp_attribute && column_name == *timestamp_attribute) {
if (auto t = try_get_timestamp(it->value)) {
_custom_timestamp = t;
// The attribute is consumed as timestamp, not stored in _cells.
continue;
}
throw api_error::validation(fmt::format("The '{}' attribute used as a write timestamp must be a positive number (microseconds since epoch)", to_string_view(*timestamp_attribute)));
}
_length_in_bytes += column_name.size();
if (!cdef) {
// This attribute may be a key column of one of the GSI or LSI,
@@ -2671,6 +2776,13 @@ rmw_operation::rmw_operation(service::storage_proxy& proxy, rjson::value&& reque
// _pk and _ck will be assigned later, by the subclass's constructor
// (each operation puts the key in a slightly different location in
// the request).
const auto tags_ptr = db::get_tags_of_table(_schema);
if (tags_ptr) {
auto it = tags_ptr->find(TIMESTAMP_TAG_KEY);
if (it != tags_ptr->end() && !it->second.empty()) {
_timestamp_attribute = to_bytes(it->second);
}
}
}
std::optional<mutation> rmw_operation::apply(foreign_ptr<lw_shared_ptr<query::result>> qr, const query::partition_slice& slice, api::timestamp_type ts, cdc::per_request_options& cdc_opts) {
@@ -2815,6 +2927,21 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
.alternator = true,
.alternator_streams_increased_compatibility = schema()->cdc_options().enabled() && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
};
// If the operation uses a custom write timestamp (from the
// system:timestamp_attribute tag), LWT is incompatible because LWT
// requires the timestamp to be set by the Paxos protocol. Reject the
// operation if it would need to use LWT.
if (has_custom_timestamp()) {
bool would_use_lwt = _write_isolation == write_isolation::LWT_ALWAYS ||
(needs_read_before_write &&
_write_isolation != write_isolation::FORBID_RMW &&
_write_isolation != write_isolation::UNSAFE_RMW);
if (would_use_lwt) {
throw api_error::validation(
"Using the system:timestamp_attribute is not compatible with "
"conditional writes or the 'always' write isolation policy.");
}
}
if (needs_read_before_write) {
if (_write_isolation == write_isolation::FORBID_RMW) {
throw api_error::validation("Read-modify-write operations are disabled by 'forbid_rmw' write isolation policy. Refer to https://github.com/scylladb/scylla/blob/master/docs/alternator/alternator.md#write-isolation-policies for more information.");
@@ -2913,7 +3040,8 @@ public:
put_item_operation(parsed::expression_cache& parsed_expression_cache, service::storage_proxy& proxy, rjson::value&& request)
: rmw_operation(proxy, std::move(request))
, _mutation_builder(rjson::get(_request, "Item"), schema(), put_or_delete_item::put_item{},
si_key_attributes(proxy.data_dictionary().find_table(schema()->ks_name(), schema()->cf_name()))) {
si_key_attributes(proxy.data_dictionary().find_table(schema()->ks_name(), schema()->cf_name())),
_timestamp_attribute) {
_pk = _mutation_builder.pk();
_ck = _mutation_builder.ck();
if (_returnvalues != returnvalues::NONE && _returnvalues != returnvalues::ALL_OLD) {
@@ -2945,6 +3073,9 @@ public:
check_needs_read_before_write(_condition_expression) ||
_returnvalues == returnvalues::ALL_OLD;
}
bool has_custom_timestamp() const noexcept {
return _mutation_builder.custom_timestamp().has_value();
}
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts, cdc::per_request_options& cdc_opts) const override {
if (!verify_expected(_request, previous_item.get()) ||
!verify_condition_expression(_condition_expression, previous_item.get())) {
@@ -2962,7 +3093,10 @@ public:
} else {
_return_attributes = {};
}
return _mutation_builder.build(_schema, ts);
// Use the custom timestamp from the timestamp attribute if available,
// otherwise use the provided timestamp.
api::timestamp_type effective_ts = _mutation_builder.custom_timestamp().value_or(ts);
return _mutation_builder.build(_schema, effective_ts);
}
virtual ~put_item_operation() = default;
};
@@ -3014,7 +3148,7 @@ public:
parsed::condition_expression _condition_expression;
delete_item_operation(parsed::expression_cache& parsed_expression_cache, service::storage_proxy& proxy, rjson::value&& request)
: rmw_operation(proxy, std::move(request))
, _mutation_builder(rjson::get(_request, "Key"), schema(), put_or_delete_item::delete_item{}) {
, _mutation_builder(rjson::get(_request, "Key"), schema(), put_or_delete_item::delete_item{}, _timestamp_attribute) {
_pk = _mutation_builder.pk();
_ck = _mutation_builder.ck();
if (_returnvalues != returnvalues::NONE && _returnvalues != returnvalues::ALL_OLD) {
@@ -3045,6 +3179,9 @@ public:
check_needs_read_before_write(_condition_expression) ||
_returnvalues == returnvalues::ALL_OLD;
}
bool has_custom_timestamp() const noexcept override {
return _mutation_builder.custom_timestamp().has_value();
}
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts, cdc::per_request_options& cdc_opts) const override {
if (!verify_expected(_request, previous_item.get()) ||
!verify_condition_expression(_condition_expression, previous_item.get())) {
@@ -3065,7 +3202,10 @@ public:
if (_consumed_capacity._total_bytes == 0) {
_consumed_capacity._total_bytes = 1;
}
return _mutation_builder.build(_schema, ts);
// Use the custom timestamp from the timestamp attribute if available,
// otherwise use the provided timestamp.
api::timestamp_type effective_ts = _mutation_builder.custom_timestamp().value_or(ts);
return _mutation_builder.build(_schema, effective_ts);
}
virtual ~delete_item_operation() = default;
};
@@ -3252,10 +3392,13 @@ future<> executor::do_batch_write(
// Do a normal write, without LWT:
utils::chunked_vector<mutation> mutations;
mutations.reserve(mutation_builders.size());
api::timestamp_type now = api::new_timestamp();
api::timestamp_type default_ts = api::new_timestamp();
bool any_cdc_enabled = false;
for (auto& b : mutation_builders) {
mutations.push_back(b.second.build(b.first, now));
// Use custom timestamp from the timestamp attribute if available,
// otherwise use the default timestamp for all items in this batch.
api::timestamp_type ts = b.second.custom_timestamp().value_or(default_ts);
mutations.push_back(b.second.build(b.first, ts));
any_cdc_enabled |= b.first->cdc_options().enabled();
}
return _proxy.mutate(std::move(mutations),
@@ -3355,6 +3498,16 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
std::unordered_set<primary_key, primary_key_hash, primary_key_equal> used_keys(
1, primary_key_hash{schema}, primary_key_equal{schema});
// Look up the timestamp attribute tag once per table (shared by all
// PutRequests and DeleteRequests for this table).
std::optional<bytes> ts_attr;
const auto tags_ptr = db::get_tags_of_table(schema);
if (tags_ptr) {
auto tag_it = tags_ptr->find(TIMESTAMP_TAG_KEY);
if (tag_it != tags_ptr->end() && !tag_it->second.empty()) {
ts_attr = to_bytes(tag_it->second);
}
}
for (auto& request : it->value.GetArray()) {
auto& r = get_single_member(request, "RequestItems element");
const auto r_name = rjson::to_string_view(r.name);
@@ -3363,7 +3516,8 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
validate_is_object(item, "Item in PutRequest");
auto&& put_item = put_or_delete_item(
item, schema, put_or_delete_item::put_item{},
si_key_attributes(_proxy.data_dictionary().find_table(schema->ks_name(), schema->cf_name())));
si_key_attributes(_proxy.data_dictionary().find_table(schema->ks_name(), schema->cf_name())),
ts_attr);
mutation_builders.emplace_back(schema, std::move(put_item));
auto mut_key = std::make_pair(mutation_builders.back().second.pk(), mutation_builders.back().second.ck());
if (used_keys.contains(mut_key)) {
@@ -3374,7 +3528,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
const rjson::value& key = get_member(r.value, "Key", "DeleteRequest");
validate_is_object(key, "Key in DeleteRequest");
mutation_builders.emplace_back(schema, put_or_delete_item(
key, schema, put_or_delete_item::delete_item{}));
key, schema, put_or_delete_item::delete_item{}, ts_attr));
auto mut_key = std::make_pair(mutation_builders.back().second.pk(),
mutation_builders.back().second.ck());
if (used_keys.contains(mut_key)) {
@@ -3983,6 +4137,10 @@ public:
virtual ~update_item_operation() = default;
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts, cdc::per_request_options& cdc_opts) const override;
bool needs_read_before_write() const;
// Returns true if the timestamp attribute is being set in this update
// (via AttributeUpdates PUT or UpdateExpression SET). Used to detect
// whether a custom write timestamp will be used.
bool has_custom_timestamp() const noexcept;
private:
void delete_attribute(bytes&& column_name, const std::unique_ptr<rjson::value>& previous_item, const api::timestamp_type ts, deletable_row& row,
@@ -4117,6 +4275,44 @@ update_item_operation::needs_read_before_write() const {
(_returnvalues != returnvalues::NONE && _returnvalues != returnvalues::UPDATED_NEW);
}
bool
update_item_operation::has_custom_timestamp() const noexcept {
if (!_timestamp_attribute) {
return false;
}
// Check if the timestamp attribute is being set via AttributeUpdates PUT
// with a valid numeric value.
if (_attribute_updates) {
std::string_view ts_attr = to_string_view(*_timestamp_attribute);
for (auto it = _attribute_updates->MemberBegin(); it != _attribute_updates->MemberEnd(); ++it) {
if (rjson::to_string_view(it->name) == ts_attr) {
const rjson::value* action = rjson::find(it->value, "Action");
if (action && rjson::to_string_view(*action) == "PUT" && it->value.HasMember("Value")) {
// Only consider it a custom timestamp if the value is numeric
if (try_get_timestamp((it->value)["Value"])) {
return true;
}
}
break;
}
}
}
// Check if the timestamp attribute is being set via UpdateExpression SET.
// We can't check the actual value type without resolving the expression
// (which requires previous_item), so we conservatively return true if the
// attribute appears in a SET action, and handle the non-numeric case in apply().
// A non-numeric value will cause apply() to throw a ValidationException.
if (!_update_expression.empty()) {
std::string ts_attr(to_string_view(*_timestamp_attribute));
auto it = _update_expression.find(ts_attr);
if (it != _update_expression.end() && it->second.has_value()) {
const auto& action = it->second.get_value();
return std::holds_alternative<parsed::update_expression::action::set>(action._action);
}
}
return false;
}
// action_result() returns the result of applying an UpdateItem action -
// this result is either a JSON object or an unset optional which indicates
// the action was a deletion. The caller (update_item_operation::apply()
@@ -4392,6 +4588,17 @@ inline void update_item_operation::apply_attribute_updates(const std::unique_ptr
throw api_error::validation(format("UpdateItem cannot update key column {}", rjson::to_string_view(it->name)));
}
std::string action = rjson::to_string((it->value)["Action"]);
// If this is the timestamp attribute being PUT, it must be a valid
// numeric value (microseconds since epoch). Use it as the write
// timestamp and skip storing it. Reject if the value is non-numeric.
if (_timestamp_attribute && column_name == *_timestamp_attribute && action == "PUT") {
if (it->value.HasMember("Value")) {
if (try_get_timestamp((it->value)["Value"])) {
continue;
}
throw api_error::validation(fmt::format("The '{}' attribute used as a write timestamp must be a positive number (microseconds since epoch)", to_string_view(*_timestamp_attribute)));
}
}
if (action == "DELETE") {
// The DELETE operation can do two unrelated tasks. Without a
// "Value" option, it is used to delete an attribute. With a
@@ -4495,6 +4702,20 @@ inline void update_item_operation::apply_update_expression(const std::unique_ptr
if (cdef && cdef->is_primary_key()) {
throw api_error::validation(fmt::format("UpdateItem cannot update key column {}", column_name));
}
// If this is the timestamp attribute being set via UpdateExpression SET,
// it must be a valid numeric value (microseconds since epoch). Use it as
// the write timestamp and skip storing it. Reject if non-numeric.
if (_timestamp_attribute && to_bytes(column_name) == *_timestamp_attribute &&
actions.second.has_value() &&
std::holds_alternative<parsed::update_expression::action::set>(actions.second.get_value()._action)) {
std::optional<rjson::value> result = action_result(actions.second.get_value(), previous_item.get());
if (result) {
if (try_get_timestamp(*result)) {
continue; // Skip - already used as timestamp
}
throw api_error::validation(fmt::format("The '{}' attribute used as a write timestamp must be a positive number (microseconds since epoch)", to_string_view(*_timestamp_attribute)));
}
}
if (actions.second.has_value()) {
// An action on a top-level attribute column_name. The single
// action is actions.second.get_value(). We can simply invoke
@@ -4543,6 +4764,44 @@ std::optional<mutation> update_item_operation::apply(std::unique_ptr<rjson::valu
return {};
}
// If the table has a timestamp attribute, look for it in the update
// (AttributeUpdates PUT or UpdateExpression SET). If found with a valid
// numeric value, use it as the write timestamp instead of the provided ts.
api::timestamp_type effective_ts = ts;
if (_timestamp_attribute) {
bool found_ts = false;
if (_attribute_updates) {
std::string_view ts_attr = to_string_view(*_timestamp_attribute);
for (auto it = _attribute_updates->MemberBegin(); it != _attribute_updates->MemberEnd(); ++it) {
if (rjson::to_string_view(it->name) == ts_attr) {
const rjson::value* action = rjson::find(it->value, "Action");
if (action && rjson::to_string_view(*action) == "PUT" && it->value.HasMember("Value")) {
if (auto t = try_get_timestamp((it->value)["Value"])) {
effective_ts = *t;
found_ts = true;
}
}
break;
}
}
}
if (!found_ts && !_update_expression.empty()) {
std::string ts_attr(to_string_view(*_timestamp_attribute));
auto it = _update_expression.find(ts_attr);
if (it != _update_expression.end() && it->second.has_value()) {
const auto& action = it->second.get_value();
if (std::holds_alternative<parsed::update_expression::action::set>(action._action)) {
std::optional<rjson::value> result = action_result(action, previous_item.get());
if (result) {
if (auto t = try_get_timestamp(*result)) {
effective_ts = *t;
}
}
}
}
}
}
// In the ReturnValues=ALL_NEW case, we make a copy of previous_item into
// _return_attributes and parts of it will be overwritten by the new
// updates (in do_update() and do_delete()). We need to make a copy and
@@ -4571,10 +4830,10 @@ std::optional<mutation> update_item_operation::apply(std::unique_ptr<rjson::valu
auto& row = m.partition().clustered_row(*_schema, _ck);
auto modified_attrs = attribute_collector();
if (!_update_expression.empty()) {
apply_update_expression(previous_item, ts, row, modified_attrs, any_updates, any_deletes);
apply_update_expression(previous_item, effective_ts, row, modified_attrs, any_updates, any_deletes);
}
if (_attribute_updates) {
apply_attribute_updates(previous_item, ts, row, modified_attrs, any_updates, any_deletes);
apply_attribute_updates(previous_item, effective_ts, row, modified_attrs, any_updates, any_deletes);
}
if (!modified_attrs.empty()) {
auto serialized_map = modified_attrs.to_mut().serialize(*attrs_type());
@@ -4585,7 +4844,7 @@ std::optional<mutation> update_item_operation::apply(std::unique_ptr<rjson::valu
// marker. An update with only DELETE operations must not add a row marker
// (this was issue #5862) but any other update, even an empty one, should.
if (any_updates || !any_deletes) {
row.apply(row_marker(ts));
row.apply(row_marker(effective_ts));
} else if (_returnvalues == returnvalues::ALL_NEW && !previous_item) {
// There was no pre-existing item, and we're not creating one, so
// don't report the new item in the returned Attributes.

View File

@@ -18,6 +18,7 @@
#include "executor.hh"
#include "tracing/trace_state.hh"
#include "keys/keys.hh"
#include "bytes.hh"
namespace alternator {
@@ -72,6 +73,11 @@ protected:
clustering_key _ck = clustering_key::make_empty();
write_isolation _write_isolation;
mutable wcu_consumed_capacity_counter _consumed_capacity;
// If the table has a "system:timestamp_attribute" tag, this holds the
// name of the attribute (converted to bytes) whose numeric value should
// be used as the write timestamp instead of the current time. The
// attribute itself is NOT stored in the item data.
std::optional<bytes> _timestamp_attribute;
// All RMW operations can have a ReturnValues parameter from the following
// choices. But note that only UpdateItem actually supports all of them:
enum class returnvalues {
@@ -113,6 +119,9 @@ public:
// Convert the above apply() into the signature needed by cas_request:
virtual std::optional<mutation> apply(foreign_ptr<lw_shared_ptr<query::result>> qr, const query::partition_slice& slice, api::timestamp_type ts, cdc::per_request_options& cdc_opts) override;
virtual ~rmw_operation() = default;
// Returns true if the operation will use a custom write timestamp (from the
// system:timestamp_attribute tag). Subclasses override this as needed.
virtual bool has_custom_timestamp() const noexcept { return false; }
const wcu_consumed_capacity_counter& consumed_capacity() const noexcept { return _consumed_capacity; }
schema_ptr schema() const { return _schema; }
const rjson::value& request() const { return _request; }

View File

@@ -213,3 +213,71 @@ Alternator table, the following features will not work for this table:
* Enabling Streams with CreateTable or UpdateTable doesn't work
(results in an error).
See <https://github.com/scylladb/scylla/issues/23838>.
## Custom write timestamps
DynamoDB doesn't allow clients to set the write timestamp of updates. All
updates use the current server time as their timestamp, and ScyllaDB uses
these timestamps for last-write-wins conflict resolution when concurrent
writes reach different replicas.
ScyllaDB Alternator extends this with the `system:timestamp_attribute` tag,
which allows specifying a custom write timestamp for each PutItem,
UpdateItem, DeleteItem, or BatchWriteItem request. To use this feature:
1. Tag the table (at CreateTable time or using TagResource) with
`system:timestamp_attribute` set to the name of an attribute that will
hold the custom write timestamp.
2. When performing a PutItem or UpdateItem, include the named attribute
in the request with a numeric value. The value represents the write
timestamp in **microseconds since the Unix epoch** (this is the same
unit used internally by ScyllaDB for timestamps).
For a DeleteItem or a BatchWriteItem DeleteRequest, include the named
attribute in the `Key` parameter (it will be stripped from the key
before use).
3. The named attribute is **not stored** in the item data - it only
controls the write timestamp. If you also want to record the timestamp
as data, use a separate attribute for that purpose.
4. If the named attribute is absent, the write proceeds normally using the
current server time as the timestamp. If the named attribute is present
but has a non-numeric value, the write is rejected with a ValidationException.
### Limitations
- **Incompatible with conditions**: If the write includes a ConditionExpression
(or uses the `Expected` legacy condition), LWT is needed and the operation
is rejected with a ValidationException, because LWT requires the write
timestamp to be set by the Paxos protocol, not by the client.
- **Incompatible with `always` write isolation**: Tables using the `always`
(or `always_use_lwt`) write isolation policy cannot use the timestamp
attribute feature at all, because every write uses LWT in that mode.
When using `system:timestamp_attribute`, consider tagging the table with
`system:write_isolation=only_rmw_uses_lwt` (or `forbid_rmw`) so that
unconditional writes do not use LWT.
### Example use case
This feature is useful for ingesting data from multiple sources where each
record has a known logical timestamp. By setting the `system:timestamp_attribute`
tag, you can ensure that the record with the highest logical timestamp always
wins, regardless of ingestion order:
```python
# Create table with timestamp attribute
dynamodb.create_table(
TableName='my_table',
...
Tags=[{'Key': 'system:timestamp_attribute', 'Value': 'write_ts'}]
)
# Write a record with a specific timestamp (in microseconds since epoch)
table.put_item(Item={
'pk': 'my_key',
'data': 'new_value',
'write_ts': Decimal('1700000000000000'), # Nov 14, 2023 in microseconds
})
```

View File

@@ -0,0 +1,356 @@
# Copyright 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
# Tests for the system:timestamp_attribute Scylla-specific feature.
# This feature allows users to control the write timestamp of PutItem and
# UpdateItem operations by specifying an attribute name in the table's
# system:timestamp_attribute tag. When that attribute is present in the
# write request with a numeric value (microseconds since Unix epoch), it
# is used as the write timestamp. The attribute itself is not stored in
# the item data.
#
# This is a Scylla-specific feature and is not tested on DynamoDB.
import time
import pytest
from botocore.exceptions import ClientError
from decimal import Decimal
from .util import create_test_table, random_string
# A large timestamp in microseconds (far future, year ~2033)
LARGE_TS = Decimal('2000000000000000')
# A medium timestamp in microseconds (year ~2001)
MEDIUM_TS = Decimal('1000000000000000')
# A small timestamp in microseconds (year ~1970+)
SMALL_TS = Decimal('100000000000000')
# Fixtures for tables with the system:timestamp_attribute tag. The tables
# are created once per module and shared between all tests that use them,
# to avoid the overhead of creating and deleting tables for each test.
# Because system:timestamp_attribute is a Scylla-only feature, all tests
# using these fixtures are implicitly Scylla-only (via scylla_only parameter).
# A table with only a hash key and system:timestamp_attribute='ts' tag.
# We explicitly set write isolation to only_rmw_uses_lwt so the tests remain
# correct even if the server default changes to always_use_lwt in the future.
@pytest.fixture(scope="module")
def test_table_ts(scylla_only, dynamodb):
table = create_test_table(dynamodb,
Tags=[{'Key': 'system:timestamp_attribute', 'Value': 'ts'},
{'Key': 'system:write_isolation', 'Value': 'only_rmw_uses_lwt'}],
KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}])
yield table
table.delete()
# A table with hash (string) and range (string) keys and system:timestamp_attribute='ts' tag.
# We explicitly set write isolation to only_rmw_uses_lwt so the tests remain
# correct even if the server default changes to always_use_lwt in the future.
@pytest.fixture(scope="module")
def test_table_ts_ss(scylla_only, dynamodb):
table = create_test_table(dynamodb,
Tags=[{'Key': 'system:timestamp_attribute', 'Value': 'ts'},
{'Key': 'system:write_isolation', 'Value': 'only_rmw_uses_lwt'}],
KeySchema=[
{'AttributeName': 'p', 'KeyType': 'HASH'},
{'AttributeName': 'c', 'KeyType': 'RANGE'},
],
AttributeDefinitions=[
{'AttributeName': 'p', 'AttributeType': 'S'},
{'AttributeName': 'c', 'AttributeType': 'S'},
])
yield table
table.delete()
# A table with hash key, system:timestamp_attribute='ts' tag, and
# system:write_isolation='always' to test rejection in LWT_ALWAYS mode.
# In always_use_lwt mode, every write uses LWT, so the timestamp attribute
# feature cannot be used at all.
@pytest.fixture(scope="module")
def test_table_ts_lwt(scylla_only, dynamodb):
table = create_test_table(dynamodb,
Tags=[{'Key': 'system:timestamp_attribute', 'Value': 'ts'},
{'Key': 'system:write_isolation', 'Value': 'always'}],
KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}])
yield table
table.delete()
# Test that PutItem with the timestamp attribute uses the given numeric
# value as the write timestamp, and the timestamp attribute is NOT stored
# in the item.
def test_timestamp_attribute_put_item_basic(test_table_ts):
p = random_string()
# Put an item with the timestamp attribute
test_table_ts.put_item(Item={'p': p, 'val': 'hello', 'ts': LARGE_TS})
# Read the item back
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
# 'val' should be stored normally
assert item['val'] == 'hello'
# 'ts' (the timestamp attribute) should NOT be stored in the item
assert 'ts' not in item
# Test that PutItem respects the write timestamp ordering: a write with a
# larger timestamp should win over a write with a smaller timestamp,
# regardless of wall-clock order.
def test_timestamp_attribute_put_item_ordering(test_table_ts):
p = random_string()
# First, write item with a LARGE timestamp
test_table_ts.put_item(Item={'p': p, 'val': 'large_ts', 'ts': LARGE_TS})
# Then write item with a SMALL timestamp (should lose since SMALL < LARGE)
test_table_ts.put_item(Item={'p': p, 'val': 'small_ts', 'ts': SMALL_TS})
# The item with the larger timestamp (val='large_ts') should win
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'large_ts'
# Now try to overwrite with a LARGER timestamp (should win)
test_table_ts.put_item(Item={'p': p, 'val': 'latest', 'ts': LARGE_TS + 1})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'latest'
# Test that UpdateItem with the timestamp attribute in AttributeUpdates
# uses the given numeric value as the write timestamp, and the timestamp
# attribute is NOT stored in the item.
def test_timestamp_attribute_update_item_attribute_updates(test_table_ts):
p = random_string()
# Use UpdateItem with AttributeUpdates, setting 'val' and 'ts'
test_table_ts.update_item(
Key={'p': p},
AttributeUpdates={
'val': {'Value': 'hello', 'Action': 'PUT'},
'ts': {'Value': LARGE_TS, 'Action': 'PUT'},
}
)
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'hello'
# 'ts' should NOT be stored in the item
assert 'ts' not in item
# Update with a smaller timestamp - should NOT overwrite
test_table_ts.update_item(
Key={'p': p},
AttributeUpdates={
'val': {'Value': 'overwritten', 'Action': 'PUT'},
'ts': {'Value': SMALL_TS, 'Action': 'PUT'},
}
)
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
# The item with the larger timestamp should still win
assert item['val'] == 'hello'
# Test that UpdateItem with the timestamp attribute in UpdateExpression
# uses the given numeric value as the write timestamp, and the timestamp
# attribute is NOT stored in the item.
def test_timestamp_attribute_update_item_update_expression(test_table_ts):
p = random_string()
# Use UpdateItem with UpdateExpression to set 'val' and 'ts'
test_table_ts.update_item(
Key={'p': p},
UpdateExpression='SET val = :v, ts = :t',
ExpressionAttributeValues={':v': 'hello', ':t': LARGE_TS}
)
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'hello'
# 'ts' should NOT be stored in the item
assert 'ts' not in item
# Update with a smaller timestamp - should NOT overwrite
test_table_ts.update_item(
Key={'p': p},
UpdateExpression='SET val = :v, ts = :t',
ExpressionAttributeValues={':v': 'overwritten', ':t': SMALL_TS}
)
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
# The item with the larger timestamp should still win
assert item['val'] == 'hello'
# Test that when the timestamp attribute is not present in the write request,
# the operation behaves normally (no custom timestamp is applied).
def test_timestamp_attribute_absent(test_table_ts):
p = random_string()
# Put item without the timestamp attribute
test_table_ts.put_item(Item={'p': p, 'val': 'hello'})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'hello'
# No 'ts' attribute expected either
assert 'ts' not in item
# Test that using a condition expression (which requires LWT) together with
# the timestamp attribute is rejected.
def test_timestamp_attribute_with_condition_rejected(test_table_ts):
p = random_string()
# Put an initial item (no timestamp attribute, so LWT is ok)
test_table_ts.put_item(Item={'p': p, 'val': 'initial'})
# Try to put with a ConditionExpression and a timestamp - should be rejected
with pytest.raises(ClientError, match='ValidationException'):
test_table_ts.put_item(
Item={'p': p, 'val': 'updated', 'ts': LARGE_TS},
ConditionExpression='attribute_exists(p)'
)
# Test that using the timestamp attribute with the 'always' write isolation
# policy is rejected, because in always_use_lwt mode every write uses LWT
# (including unconditional ones), which is incompatible with custom timestamps.
def test_timestamp_attribute_lwt_always_rejected(test_table_ts_lwt):
p = random_string()
# Even a plain PutItem with a timestamp is rejected in LWT_ALWAYS mode
with pytest.raises(ClientError, match='ValidationException'):
test_table_ts_lwt.put_item(Item={'p': p, 'val': 'hello', 'ts': LARGE_TS})
# Test that when the timestamp attribute has a non-numeric value, the write
# is rejected with a ValidationException.
def test_timestamp_attribute_non_numeric(test_table_ts):
p = random_string()
# Put item with the timestamp attribute as a string (non-numeric) - should fail
with pytest.raises(ClientError, match='ValidationException'):
test_table_ts.put_item(Item={'p': p, 'val': 'hello', 'ts': 'not_a_number'})
# Test that the timestamp attribute tag can be set on a table with a sort key.
def test_timestamp_attribute_with_range_key(test_table_ts_ss):
p = random_string()
c = random_string()
# Write with a large timestamp
test_table_ts_ss.put_item(Item={'p': p, 'c': c, 'val': 'large', 'ts': LARGE_TS})
# Write with a small timestamp (should lose)
test_table_ts_ss.put_item(Item={'p': p, 'c': c, 'val': 'small', 'ts': SMALL_TS})
item = test_table_ts_ss.get_item(Key={'p': p, 'c': c}, ConsistentRead=True)['Item']
assert item['val'] == 'large'
assert 'ts' not in item
# Test that the timestamp attribute value is interpreted in microseconds since
# the Unix epoch, and that writes with and without explicit timestamps interact
# correctly.
def test_timestamp_attribute_microseconds(test_table_ts):
# Get current time in microseconds from the Python client side.
now_us = int(time.time() * 1_000_000)
one_hour_us = 3600 * 1_000_000
# Part 1: write with the current time as the explicit timestamp, then
# overwrite without an explicit timestamp. The second write uses the
# server's current time (which is >= now_us), so it should win.
p = random_string()
test_table_ts.put_item(Item={'p': p, 'val': 'old', 'ts': Decimal(str(now_us))})
test_table_ts.put_item(Item={'p': p, 'val': 'new'})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'new'
# Part 2: write with a timestamp one hour in the future, then overwrite
# without an explicit timestamp. The server's current time (≈ now_us) is
# much less than now_us + one_hour_us, so the first write should win.
p = random_string()
future_us = now_us + one_hour_us
test_table_ts.put_item(Item={'p': p, 'val': 'future', 'ts': Decimal(str(future_us))})
test_table_ts.put_item(Item={'p': p, 'val': 'now'})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'future'
# Test that BatchWriteItem also respects the timestamp attribute.
def test_timestamp_attribute_batch_write(test_table_ts):
p = random_string()
# Write item via BatchWriteItem with a large timestamp
with test_table_ts.batch_writer() as batch:
batch.put_item(Item={'p': p, 'val': 'large_ts', 'ts': LARGE_TS})
# Write item via BatchWriteItem with a small timestamp (should lose)
with test_table_ts.batch_writer() as batch:
batch.put_item(Item={'p': p, 'val': 'small_ts', 'ts': SMALL_TS})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'large_ts'
assert 'ts' not in item
# Test that DeleteItem respects the timestamp attribute: a delete with a
# smaller timestamp than the item's write timestamp should not take effect.
def test_timestamp_attribute_delete_item(test_table_ts):
p = random_string()
# Write an item with a large timestamp
test_table_ts.put_item(Item={'p': p, 'val': 'hello', 'ts': LARGE_TS})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)['Item']
assert item['val'] == 'hello'
# Delete with a small timestamp - the delete should lose (item still exists)
test_table_ts.delete_item(Key={'p': p, 'ts': SMALL_TS})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True).get('Item')
assert item is not None and item['val'] == 'hello'
# Delete with a large timestamp - the delete should win (item is removed)
test_table_ts.delete_item(Key={'p': p, 'ts': LARGE_TS + 1})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True).get('Item')
assert item is None
# Test that DeleteItem without the timestamp attribute in the key behaves
# normally (no custom timestamp is applied).
def test_timestamp_attribute_delete_item_no_ts(test_table_ts):
p = random_string()
# Use SMALL_TS so the delete (which uses the current server time) wins.
# If we used LARGE_TS (far future), the delete without an explicit timestamp
# would use current time which is smaller than LARGE_TS and the delete would lose.
test_table_ts.put_item(Item={'p': p, 'val': 'hello', 'ts': SMALL_TS})
# Delete without a timestamp attribute - should succeed normally
test_table_ts.delete_item(Key={'p': p})
assert 'Item' not in test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)
# Verify that an item written with a far-future timestamp is NOT deleted by
# a delete without an explicit timestamp (server time < LARGE_TS).
p = random_string()
test_table_ts.put_item(Item={'p': p, 'val': 'hello', 'ts': LARGE_TS})
test_table_ts.delete_item(Key={'p': p})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True).get('Item')
assert item is not None and item['val'] == 'hello'
# Test that DeleteItem with a non-numeric timestamp attribute is rejected.
def test_timestamp_attribute_delete_item_non_numeric(test_table_ts):
p = random_string()
with pytest.raises(ClientError, match='ValidationException'):
test_table_ts.delete_item(Key={'p': p, 'ts': 'not_a_number'})
# Test that BatchWriteItem DeleteRequest also respects the timestamp attribute.
def test_timestamp_attribute_batch_delete(test_table_ts):
p = random_string()
# Write an item with a large timestamp
test_table_ts.put_item(Item={'p': p, 'val': 'hello', 'ts': LARGE_TS})
# Delete via BatchWriteItem with a small timestamp - delete should lose
test_table_ts.meta.client.batch_write_item(RequestItems={
test_table_ts.name: [{'DeleteRequest': {'Key': {'p': p, 'ts': SMALL_TS}}}]
})
item = test_table_ts.get_item(Key={'p': p}, ConsistentRead=True).get('Item')
assert item is not None and item['val'] == 'hello'
# Delete via BatchWriteItem with a large timestamp - delete should win
test_table_ts.meta.client.batch_write_item(RequestItems={
test_table_ts.name: [{'DeleteRequest': {'Key': {'p': p, 'ts': LARGE_TS + 1}}}]
})
assert 'Item' not in test_table_ts.get_item(Key={'p': p}, ConsistentRead=True)
# Test that DeleteItem with a ConditionExpression and a custom timestamp is
# rejected, because conditional writes require LWT which is incompatible with
# custom timestamps.
def test_timestamp_attribute_delete_item_condition_rejected(test_table_ts):
p = random_string()
test_table_ts.put_item(Item={'p': p, 'val': 'hello'})
with pytest.raises(ClientError, match='ValidationException'):
test_table_ts.delete_item(
Key={'p': p, 'ts': SMALL_TS},
ConditionExpression='attribute_exists(p)'
)
# Test that DeleteItem with a custom timestamp is rejected when the table uses
# always_use_lwt isolation, because every write uses LWT in that mode.
def test_timestamp_attribute_delete_item_lwt_always_rejected(test_table_ts_lwt):
p = random_string()
with pytest.raises(ClientError, match='ValidationException'):
test_table_ts_lwt.delete_item(Key={'p': p, 'ts': SMALL_TS})
# Test that BatchWriteItem PutRequest with a custom timestamp is rejected when
# the table uses always_use_lwt isolation.
def test_timestamp_attribute_batch_put_lwt_always_rejected(test_table_ts_lwt):
p = random_string()
with pytest.raises(ClientError, match='ValidationException'):
test_table_ts_lwt.meta.client.batch_write_item(RequestItems={
test_table_ts_lwt.name: [{'PutRequest': {'Item': {'p': p, 'val': 'v', 'ts': SMALL_TS}}}]
})
# Test that BatchWriteItem DeleteRequest with a custom timestamp is rejected
# when the table uses always_use_lwt isolation.
def test_timestamp_attribute_batch_delete_lwt_always_rejected(test_table_ts_lwt):
p = random_string()
with pytest.raises(ClientError, match='ValidationException'):
test_table_ts_lwt.meta.client.batch_write_item(RequestItems={
test_table_ts_lwt.name: [{'DeleteRequest': {'Key': {'p': p, 'ts': SMALL_TS}}}]
})