Compare commits
9 Commits
master
...
copilot/ad
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5515d5fb7d | ||
|
|
328c263aed | ||
|
|
a6d36a480d | ||
|
|
85354ae26a | ||
|
|
786fa68faa | ||
|
|
a57f781852 | ||
|
|
7f79b90e91 | ||
|
|
175b8a8a5e | ||
|
|
c2ef8075ee |
@@ -7,6 +7,7 @@
|
||||
*/
|
||||
|
||||
#include <fmt/ranges.h>
|
||||
#include <cstdlib>
|
||||
#include <seastar/core/on_internal_error.hh>
|
||||
#include "alternator/executor.hh"
|
||||
#include "alternator/consumed_capacity.hh"
|
||||
@@ -108,6 +109,16 @@ const sstring TABLE_CREATION_TIME_TAG_KEY("system:table_creation_time");
|
||||
// configured by UpdateTimeToLive to be the expiration-time attribute for
|
||||
// this table.
|
||||
extern const sstring TTL_TAG_KEY("system:ttl_attribute");
|
||||
// If this tag is present, it stores the name of an attribute whose numeric
|
||||
// value (in microseconds since the Unix epoch) is used as the write timestamp
|
||||
// for PutItem and UpdateItem operations. When the named attribute is present
|
||||
// in a PutItem or UpdateItem request, its value is used as the timestamp of
|
||||
// the write, and the attribute itself is NOT stored in the item. This allows
|
||||
// users to control write ordering for last-write-wins semantics. Because LWT
|
||||
// does not allow setting a custom write timestamp, operations using this
|
||||
// feature are incompatible with conditions (which require LWT), and with
|
||||
// the LWT_ALWAYS write isolation mode; such operations are rejected.
|
||||
static const sstring TIMESTAMP_TAG_KEY("system:timestamp_attribute");
|
||||
// This will be set to 1 in a case, where user DID NOT specify a range key.
|
||||
// The way GSI / LSI is implemented by Alternator assumes user specified keys will come first
|
||||
// in materialized view's key list. Then, if needed missing keys are added (current implementation
|
||||
@@ -1337,13 +1348,14 @@ void rmw_operation::set_default_write_isolation(std::string_view value) {
|
||||
// Alternator uses tags whose keys start with the "system:" prefix for
|
||||
// internal purposes. Those should not be readable by ListTagsOfResource,
|
||||
// nor writable with TagResource or UntagResource (see #24098).
|
||||
// Only a few specific system tags, currently only "system:write_isolation"
|
||||
// and "system:initial_tablets", are deliberately intended to be set and read
|
||||
// by the user, so are not considered "internal".
|
||||
// Only a few specific system tags, currently only "system:write_isolation",
|
||||
// "system:initial_tablets", and "system:timestamp_attribute", are deliberately
|
||||
// intended to be set and read by the user, so are not considered "internal".
|
||||
static bool tag_key_is_internal(std::string_view tag_key) {
|
||||
return tag_key.starts_with("system:")
|
||||
&& tag_key != rmw_operation::WRITE_ISOLATION_TAG_KEY
|
||||
&& tag_key != INITIAL_TABLETS_TAG_KEY;
|
||||
&& tag_key != INITIAL_TABLETS_TAG_KEY
|
||||
&& tag_key != TIMESTAMP_TAG_KEY;
|
||||
}
|
||||
|
||||
enum class update_tags_action { add_tags, delete_tags };
|
||||
@@ -2298,8 +2310,11 @@ public:
|
||||
// After calling pk_from_json() and ck_from_json() to extract the pk and ck
|
||||
// components of a key, and if that succeeded, call check_key() to further
|
||||
// check that the key doesn't have any spurious components.
|
||||
static void check_key(const rjson::value& key, const schema_ptr& schema) {
|
||||
if (key.MemberCount() != (schema->clustering_key_size() == 0 ? 1 : 2)) {
|
||||
// allow_extra_attribute: set to true when the key may contain one extra
|
||||
// non-key attribute (e.g., the timestamp pseudo-attribute for DeleteItem).
|
||||
static void check_key(const rjson::value& key, const schema_ptr& schema, bool allow_extra_attribute = false) {
|
||||
const unsigned expected = (schema->clustering_key_size() == 0 ? 1 : 2) + (allow_extra_attribute ? 1 : 0);
|
||||
if (key.MemberCount() != expected) {
|
||||
throw api_error::validation("Given key attribute not in schema");
|
||||
}
|
||||
}
|
||||
@@ -2346,6 +2361,57 @@ void validate_value(const rjson::value& v, const char* caller) {
|
||||
// any writing happens (if one of the commands has an error, none of the
|
||||
// writes should be done). LWT makes it impossible for the parse step to
|
||||
// generate "mutation" objects, because the timestamp still isn't known.
|
||||
|
||||
// Convert a DynamoDB number (big_decimal) to an api::timestamp_type
|
||||
// (microseconds since the Unix epoch). Fractional microseconds are truncated.
|
||||
// Returns nullopt if the value is negative or zero.
|
||||
static std::optional<api::timestamp_type> bigdecimal_to_timestamp(const big_decimal& bd) {
|
||||
if (bd.unscaled_value() <= 0) {
|
||||
return std::nullopt;
|
||||
}
|
||||
if (bd.scale() == 0) {
|
||||
// Fast path: integer value, no decimal adjustment needed
|
||||
return static_cast<api::timestamp_type>(bd.unscaled_value());
|
||||
}
|
||||
// General case: adjust for decimal scale.
|
||||
// big_decimal stores value as unscaled_value * 10^(-scale).
|
||||
// scale > 0 means divide by 10^scale (truncate fractional part).
|
||||
// scale < 0 means multiply by 10^|scale| (add trailing zeros).
|
||||
auto str = bd.unscaled_value().str();
|
||||
if (bd.scale() > 0) {
|
||||
int len = str.length();
|
||||
if (len <= bd.scale()) {
|
||||
return std::nullopt; // Number < 1
|
||||
}
|
||||
str = str.substr(0, len - bd.scale());
|
||||
} else {
|
||||
if (bd.scale() < -18) {
|
||||
// Too large to represent as int64_t
|
||||
return std::nullopt;
|
||||
}
|
||||
for (int i = 0; i < -bd.scale(); i++) {
|
||||
str.push_back('0');
|
||||
}
|
||||
}
|
||||
long long result = strtoll(str.c_str(), nullptr, 10);
|
||||
if (result <= 0) {
|
||||
return std::nullopt;
|
||||
}
|
||||
return static_cast<api::timestamp_type>(result);
|
||||
}
|
||||
|
||||
// Try to extract a write timestamp from a DynamoDB-typed value.
|
||||
// The value should be a number ({"N": "..."}), representing microseconds
|
||||
// since the Unix epoch. Returns nullopt if the value is not a valid number
|
||||
// or doesn't represent a valid timestamp.
|
||||
static std::optional<api::timestamp_type> try_get_timestamp(const rjson::value& attr_value) {
|
||||
std::optional<big_decimal> n = try_unwrap_number(attr_value);
|
||||
if (!n) {
|
||||
return std::nullopt;
|
||||
}
|
||||
return bigdecimal_to_timestamp(*n);
|
||||
}
|
||||
|
||||
class put_or_delete_item {
|
||||
private:
|
||||
partition_key _pk;
|
||||
@@ -2361,11 +2427,17 @@ private:
|
||||
// that length can have different meaning depends on the operation but the
|
||||
// the calculation of length in bytes to WCU is the same.
|
||||
uint64_t _length_in_bytes = 0;
|
||||
// If the table has a system:timestamp_attribute tag, and the named
|
||||
// attribute was found in the item with a valid numeric value, this holds
|
||||
// the extracted timestamp. The attribute is not added to _cells.
|
||||
std::optional<api::timestamp_type> _custom_timestamp;
|
||||
public:
|
||||
struct delete_item {};
|
||||
struct put_item {};
|
||||
put_or_delete_item(const rjson::value& key, schema_ptr schema, delete_item);
|
||||
put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item, std::unordered_map<bytes, std::string> key_attributes);
|
||||
put_or_delete_item(const rjson::value& key, schema_ptr schema, delete_item,
|
||||
const std::optional<bytes>& timestamp_attribute = std::nullopt);
|
||||
put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item, std::unordered_map<bytes, std::string> key_attributes,
|
||||
const std::optional<bytes>& timestamp_attribute = std::nullopt);
|
||||
// put_or_delete_item doesn't keep a reference to schema (so it can be
|
||||
// moved between shards for LWT) so it needs to be given again to build():
|
||||
mutation build(schema_ptr schema, api::timestamp_type ts) const;
|
||||
@@ -2380,11 +2452,32 @@ public:
|
||||
bool is_put_item() noexcept {
|
||||
return _cells.has_value();
|
||||
}
|
||||
// Returns the custom write timestamp extracted from the timestamp attribute,
|
||||
// if any. If not set, the caller should use api::new_timestamp() instead.
|
||||
std::optional<api::timestamp_type> custom_timestamp() const noexcept {
|
||||
return _custom_timestamp;
|
||||
}
|
||||
};
|
||||
|
||||
put_or_delete_item::put_or_delete_item(const rjson::value& key, schema_ptr schema, delete_item)
|
||||
put_or_delete_item::put_or_delete_item(const rjson::value& key, schema_ptr schema, delete_item, const std::optional<bytes>& timestamp_attribute)
|
||||
: _pk(pk_from_json(key, schema)), _ck(ck_from_json(key, schema)) {
|
||||
check_key(key, schema);
|
||||
if (timestamp_attribute) {
|
||||
// The timestamp attribute may be provided as a "pseudo-key": it is
|
||||
// not a real key column, but can be included in the "Key" object to
|
||||
// carry the custom write timestamp. If found, extract the timestamp
|
||||
// and don't store it in the item.
|
||||
const rjson::value* ts_val = rjson::find(key, to_string_view(*timestamp_attribute));
|
||||
if (ts_val) {
|
||||
if (auto t = try_get_timestamp(*ts_val)) {
|
||||
_custom_timestamp = t;
|
||||
} else {
|
||||
throw api_error::validation(fmt::format(
|
||||
"The '{}' attribute used as a write timestamp must be a positive number (microseconds since epoch)",
|
||||
to_string_view(*timestamp_attribute)));
|
||||
}
|
||||
}
|
||||
}
|
||||
check_key(key, schema, _custom_timestamp.has_value());
|
||||
}
|
||||
|
||||
// find_attribute() checks whether the named attribute is stored in the
|
||||
@@ -2471,7 +2564,8 @@ static inline void validate_value_if_index_key(
|
||||
}
|
||||
}
|
||||
|
||||
put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item, std::unordered_map<bytes, std::string> key_attributes)
|
||||
put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item, std::unordered_map<bytes, std::string> key_attributes,
|
||||
const std::optional<bytes>& timestamp_attribute)
|
||||
: _pk(pk_from_json(item, schema)), _ck(ck_from_json(item, schema)) {
|
||||
_cells = std::vector<cell>();
|
||||
_cells->reserve(item.MemberCount());
|
||||
@@ -2480,6 +2574,17 @@ put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr sche
|
||||
validate_value(it->value, "PutItem");
|
||||
const column_definition* cdef = find_attribute(*schema, column_name);
|
||||
validate_attr_name_length("", column_name.size(), cdef && cdef->is_primary_key());
|
||||
// If this is the timestamp attribute, it must be a valid numeric value
|
||||
// (microseconds since epoch). Use it as the write timestamp and do not
|
||||
// store it in the item data. Reject the write if the value is non-numeric.
|
||||
if (timestamp_attribute && column_name == *timestamp_attribute) {
|
||||
if (auto t = try_get_timestamp(it->value)) {
|
||||
_custom_timestamp = t;
|
||||
// The attribute is consumed as timestamp, not stored in _cells.
|
||||
continue;
|
||||
}
|
||||
throw api_error::validation(fmt::format("The '{}' attribute used as a write timestamp must be a positive number (microseconds since epoch)", to_string_view(*timestamp_attribute)));
|
||||
}
|
||||
_length_in_bytes += column_name.size();
|
||||
if (!cdef) {
|
||||
// This attribute may be a key column of one of the GSI or LSI,
|
||||
@@ -2671,6 +2776,13 @@ rmw_operation::rmw_operation(service::storage_proxy& proxy, rjson::value&& reque
|
||||
// _pk and _ck will be assigned later, by the subclass's constructor
|
||||
// (each operation puts the key in a slightly different location in
|
||||
// the request).
|
||||
const auto tags_ptr = db::get_tags_of_table(_schema);
|
||||
if (tags_ptr) {
|
||||
auto it = tags_ptr->find(TIMESTAMP_TAG_KEY);
|
||||
if (it != tags_ptr->end() && !it->second.empty()) {
|
||||
_timestamp_attribute = to_bytes(it->second);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<mutation> rmw_operation::apply(foreign_ptr<lw_shared_ptr<query::result>> qr, const query::partition_slice& slice, api::timestamp_type ts, cdc::per_request_options& cdc_opts) {
|
||||
@@ -2815,6 +2927,21 @@ future<executor::request_return_type> rmw_operation::execute(service::storage_pr
|
||||
.alternator = true,
|
||||
.alternator_streams_increased_compatibility = schema()->cdc_options().enabled() && proxy.data_dictionary().get_config().alternator_streams_increased_compatibility(),
|
||||
};
|
||||
// If the operation uses a custom write timestamp (from the
|
||||
// system:timestamp_attribute tag), LWT is incompatible because LWT
|
||||
// requires the timestamp to be set by the Paxos protocol. Reject the
|
||||
// operation if it would need to use LWT.
|
||||
if (has_custom_timestamp()) {
|
||||
bool would_use_lwt = _write_isolation == write_isolation::LWT_ALWAYS ||
|
||||
(needs_read_before_write &&
|
||||
_write_isolation != write_isolation::FORBID_RMW &&
|
||||
_write_isolation != write_isolation::UNSAFE_RMW);
|
||||
if (would_use_lwt) {
|
||||
throw api_error::validation(
|
||||
"Using the system:timestamp_attribute is not compatible with "
|
||||
"conditional writes or the 'always' write isolation policy.");
|
||||
}
|
||||
}
|
||||
if (needs_read_before_write) {
|
||||
if (_write_isolation == write_isolation::FORBID_RMW) {
|
||||
throw api_error::validation("Read-modify-write operations are disabled by 'forbid_rmw' write isolation policy. Refer to https://github.com/scylladb/scylla/blob/master/docs/alternator/alternator.md#write-isolation-policies for more information.");
|
||||
@@ -2913,7 +3040,8 @@ public:
|
||||
put_item_operation(parsed::expression_cache& parsed_expression_cache, service::storage_proxy& proxy, rjson::value&& request)
|
||||
: rmw_operation(proxy, std::move(request))
|
||||
, _mutation_builder(rjson::get(_request, "Item"), schema(), put_or_delete_item::put_item{},
|
||||
si_key_attributes(proxy.data_dictionary().find_table(schema()->ks_name(), schema()->cf_name()))) {
|
||||
si_key_attributes(proxy.data_dictionary().find_table(schema()->ks_name(), schema()->cf_name())),
|
||||
_timestamp_attribute) {
|
||||
_pk = _mutation_builder.pk();
|
||||
_ck = _mutation_builder.ck();
|
||||
if (_returnvalues != returnvalues::NONE && _returnvalues != returnvalues::ALL_OLD) {
|
||||
@@ -2945,6 +3073,9 @@ public:
|
||||
check_needs_read_before_write(_condition_expression) ||
|
||||
_returnvalues == returnvalues::ALL_OLD;
|
||||
}
|
||||
bool has_custom_timestamp() const noexcept {
|
||||
return _mutation_builder.custom_timestamp().has_value();
|
||||
}
|
||||
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts, cdc::per_request_options& cdc_opts) const override {
|
||||
if (!verify_expected(_request, previous_item.get()) ||
|
||||
!verify_condition_expression(_condition_expression, previous_item.get())) {
|
||||
@@ -2962,7 +3093,10 @@ public:
|
||||
} else {
|
||||
_return_attributes = {};
|
||||
}
|
||||
return _mutation_builder.build(_schema, ts);
|
||||
// Use the custom timestamp from the timestamp attribute if available,
|
||||
// otherwise use the provided timestamp.
|
||||
api::timestamp_type effective_ts = _mutation_builder.custom_timestamp().value_or(ts);
|
||||
return _mutation_builder.build(_schema, effective_ts);
|
||||
}
|
||||
virtual ~put_item_operation() = default;
|
||||
};
|
||||
@@ -3014,7 +3148,7 @@ public:
|
||||
parsed::condition_expression _condition_expression;
|
||||
delete_item_operation(parsed::expression_cache& parsed_expression_cache, service::storage_proxy& proxy, rjson::value&& request)
|
||||
: rmw_operation(proxy, std::move(request))
|
||||
, _mutation_builder(rjson::get(_request, "Key"), schema(), put_or_delete_item::delete_item{}) {
|
||||
, _mutation_builder(rjson::get(_request, "Key"), schema(), put_or_delete_item::delete_item{}, _timestamp_attribute) {
|
||||
_pk = _mutation_builder.pk();
|
||||
_ck = _mutation_builder.ck();
|
||||
if (_returnvalues != returnvalues::NONE && _returnvalues != returnvalues::ALL_OLD) {
|
||||
@@ -3045,6 +3179,9 @@ public:
|
||||
check_needs_read_before_write(_condition_expression) ||
|
||||
_returnvalues == returnvalues::ALL_OLD;
|
||||
}
|
||||
bool has_custom_timestamp() const noexcept override {
|
||||
return _mutation_builder.custom_timestamp().has_value();
|
||||
}
|
||||
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts, cdc::per_request_options& cdc_opts) const override {
|
||||
if (!verify_expected(_request, previous_item.get()) ||
|
||||
!verify_condition_expression(_condition_expression, previous_item.get())) {
|
||||
@@ -3065,7 +3202,10 @@ public:
|
||||
if (_consumed_capacity._total_bytes == 0) {
|
||||
_consumed_capacity._total_bytes = 1;
|
||||
}
|
||||
return _mutation_builder.build(_schema, ts);
|
||||
// Use the custom timestamp from the timestamp attribute if available,
|
||||
// otherwise use the provided timestamp.
|
||||
api::timestamp_type effective_ts = _mutation_builder.custom_timestamp().value_or(ts);
|
||||
return _mutation_builder.build(_schema, effective_ts);
|
||||
}
|
||||
virtual ~delete_item_operation() = default;
|
||||
};
|
||||
@@ -3252,10 +3392,13 @@ future<> executor::do_batch_write(
|
||||
// Do a normal write, without LWT:
|
||||
utils::chunked_vector<mutation> mutations;
|
||||
mutations.reserve(mutation_builders.size());
|
||||
api::timestamp_type now = api::new_timestamp();
|
||||
api::timestamp_type default_ts = api::new_timestamp();
|
||||
bool any_cdc_enabled = false;
|
||||
for (auto& b : mutation_builders) {
|
||||
mutations.push_back(b.second.build(b.first, now));
|
||||
// Use custom timestamp from the timestamp attribute if available,
|
||||
// otherwise use the default timestamp for all items in this batch.
|
||||
api::timestamp_type ts = b.second.custom_timestamp().value_or(default_ts);
|
||||
mutations.push_back(b.second.build(b.first, ts));
|
||||
any_cdc_enabled |= b.first->cdc_options().enabled();
|
||||
}
|
||||
return _proxy.mutate(std::move(mutations),
|
||||
@@ -3355,6 +3498,16 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
|
||||
std::unordered_set<primary_key, primary_key_hash, primary_key_equal> used_keys(
|
||||
1, primary_key_hash{schema}, primary_key_equal{schema});
|
||||
// Look up the timestamp attribute tag once per table (shared by all
|
||||
// PutRequests and DeleteRequests for this table).
|
||||
std::optional<bytes> ts_attr;
|
||||
const auto tags_ptr = db::get_tags_of_table(schema);
|
||||
if (tags_ptr) {
|
||||
auto tag_it = tags_ptr->find(TIMESTAMP_TAG_KEY);
|
||||
if (tag_it != tags_ptr->end() && !tag_it->second.empty()) {
|
||||
ts_attr = to_bytes(tag_it->second);
|
||||
}
|
||||
}
|
||||
for (auto& request : it->value.GetArray()) {
|
||||
auto& r = get_single_member(request, "RequestItems element");
|
||||
const auto r_name = rjson::to_string_view(r.name);
|
||||
@@ -3363,7 +3516,8 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
validate_is_object(item, "Item in PutRequest");
|
||||
auto&& put_item = put_or_delete_item(
|
||||
item, schema, put_or_delete_item::put_item{},
|
||||
si_key_attributes(_proxy.data_dictionary().find_table(schema->ks_name(), schema->cf_name())));
|
||||
si_key_attributes(_proxy.data_dictionary().find_table(schema->ks_name(), schema->cf_name())),
|
||||
ts_attr);
|
||||
mutation_builders.emplace_back(schema, std::move(put_item));
|
||||
auto mut_key = std::make_pair(mutation_builders.back().second.pk(), mutation_builders.back().second.ck());
|
||||
if (used_keys.contains(mut_key)) {
|
||||
@@ -3374,7 +3528,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
const rjson::value& key = get_member(r.value, "Key", "DeleteRequest");
|
||||
validate_is_object(key, "Key in DeleteRequest");
|
||||
mutation_builders.emplace_back(schema, put_or_delete_item(
|
||||
key, schema, put_or_delete_item::delete_item{}));
|
||||
key, schema, put_or_delete_item::delete_item{}, ts_attr));
|
||||
auto mut_key = std::make_pair(mutation_builders.back().second.pk(),
|
||||
mutation_builders.back().second.ck());
|
||||
if (used_keys.contains(mut_key)) {
|
||||
@@ -3983,6 +4137,10 @@ public:
|
||||
virtual ~update_item_operation() = default;
|
||||
virtual std::optional<mutation> apply(std::unique_ptr<rjson::value> previous_item, api::timestamp_type ts, cdc::per_request_options& cdc_opts) const override;
|
||||
bool needs_read_before_write() const;
|
||||
// Returns true if the timestamp attribute is being set in this update
|
||||
// (via AttributeUpdates PUT or UpdateExpression SET). Used to detect
|
||||
// whether a custom write timestamp will be used.
|
||||
bool has_custom_timestamp() const noexcept;
|
||||
|
||||
private:
|
||||
void delete_attribute(bytes&& column_name, const std::unique_ptr<rjson::value>& previous_item, const api::timestamp_type ts, deletable_row& row,
|
||||
@@ -4117,6 +4275,44 @@ update_item_operation::needs_read_before_write() const {
|
||||
(_returnvalues != returnvalues::NONE && _returnvalues != returnvalues::UPDATED_NEW);
|
||||
}
|
||||
|
||||
bool
|
||||
update_item_operation::has_custom_timestamp() const noexcept {
|
||||
if (!_timestamp_attribute) {
|
||||
return false;
|
||||
}
|
||||
// Check if the timestamp attribute is being set via AttributeUpdates PUT
|
||||
// with a valid numeric value.
|
||||
if (_attribute_updates) {
|
||||
std::string_view ts_attr = to_string_view(*_timestamp_attribute);
|
||||
for (auto it = _attribute_updates->MemberBegin(); it != _attribute_updates->MemberEnd(); ++it) {
|
||||
if (rjson::to_string_view(it->name) == ts_attr) {
|
||||
const rjson::value* action = rjson::find(it->value, "Action");
|
||||
if (action && rjson::to_string_view(*action) == "PUT" && it->value.HasMember("Value")) {
|
||||
// Only consider it a custom timestamp if the value is numeric
|
||||
if (try_get_timestamp((it->value)["Value"])) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Check if the timestamp attribute is being set via UpdateExpression SET.
|
||||
// We can't check the actual value type without resolving the expression
|
||||
// (which requires previous_item), so we conservatively return true if the
|
||||
// attribute appears in a SET action, and handle the non-numeric case in apply().
|
||||
// A non-numeric value will cause apply() to throw a ValidationException.
|
||||
if (!_update_expression.empty()) {
|
||||
std::string ts_attr(to_string_view(*_timestamp_attribute));
|
||||
auto it = _update_expression.find(ts_attr);
|
||||
if (it != _update_expression.end() && it->second.has_value()) {
|
||||
const auto& action = it->second.get_value();
|
||||
return std::holds_alternative<parsed::update_expression::action::set>(action._action);
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// action_result() returns the result of applying an UpdateItem action -
|
||||
// this result is either a JSON object or an unset optional which indicates
|
||||
// the action was a deletion. The caller (update_item_operation::apply()
|
||||
@@ -4392,6 +4588,17 @@ inline void update_item_operation::apply_attribute_updates(const std::unique_ptr
|
||||
throw api_error::validation(format("UpdateItem cannot update key column {}", rjson::to_string_view(it->name)));
|
||||
}
|
||||
std::string action = rjson::to_string((it->value)["Action"]);
|
||||
// If this is the timestamp attribute being PUT, it must be a valid
|
||||
// numeric value (microseconds since epoch). Use it as the write
|
||||
// timestamp and skip storing it. Reject if the value is non-numeric.
|
||||
if (_timestamp_attribute && column_name == *_timestamp_attribute && action == "PUT") {
|
||||
if (it->value.HasMember("Value")) {
|
||||
if (try_get_timestamp((it->value)["Value"])) {
|
||||
continue;
|
||||
}
|
||||
throw api_error::validation(fmt::format("The '{}' attribute used as a write timestamp must be a positive number (microseconds since epoch)", to_string_view(*_timestamp_attribute)));
|
||||
}
|
||||
}
|
||||
if (action == "DELETE") {
|
||||
// The DELETE operation can do two unrelated tasks. Without a
|
||||
// "Value" option, it is used to delete an attribute. With a
|
||||
@@ -4495,6 +4702,20 @@ inline void update_item_operation::apply_update_expression(const std::unique_ptr
|
||||
if (cdef && cdef->is_primary_key()) {
|
||||
throw api_error::validation(fmt::format("UpdateItem cannot update key column {}", column_name));
|
||||
}
|
||||
// If this is the timestamp attribute being set via UpdateExpression SET,
|
||||
// it must be a valid numeric value (microseconds since epoch). Use it as
|
||||
// the write timestamp and skip storing it. Reject if non-numeric.
|
||||
if (_timestamp_attribute && to_bytes(column_name) == *_timestamp_attribute &&
|
||||
actions.second.has_value() &&
|
||||
std::holds_alternative<parsed::update_expression::action::set>(actions.second.get_value()._action)) {
|
||||
std::optional<rjson::value> result = action_result(actions.second.get_value(), previous_item.get());
|
||||
if (result) {
|
||||
if (try_get_timestamp(*result)) {
|
||||
continue; // Skip - already used as timestamp
|
||||
}
|
||||
throw api_error::validation(fmt::format("The '{}' attribute used as a write timestamp must be a positive number (microseconds since epoch)", to_string_view(*_timestamp_attribute)));
|
||||
}
|
||||
}
|
||||
if (actions.second.has_value()) {
|
||||
// An action on a top-level attribute column_name. The single
|
||||
// action is actions.second.get_value(). We can simply invoke
|
||||
@@ -4543,6 +4764,44 @@ std::optional<mutation> update_item_operation::apply(std::unique_ptr<rjson::valu
|
||||
return {};
|
||||
}
|
||||
|
||||
// If the table has a timestamp attribute, look for it in the update
|
||||
// (AttributeUpdates PUT or UpdateExpression SET). If found with a valid
|
||||
// numeric value, use it as the write timestamp instead of the provided ts.
|
||||
api::timestamp_type effective_ts = ts;
|
||||
if (_timestamp_attribute) {
|
||||
bool found_ts = false;
|
||||
if (_attribute_updates) {
|
||||
std::string_view ts_attr = to_string_view(*_timestamp_attribute);
|
||||
for (auto it = _attribute_updates->MemberBegin(); it != _attribute_updates->MemberEnd(); ++it) {
|
||||
if (rjson::to_string_view(it->name) == ts_attr) {
|
||||
const rjson::value* action = rjson::find(it->value, "Action");
|
||||
if (action && rjson::to_string_view(*action) == "PUT" && it->value.HasMember("Value")) {
|
||||
if (auto t = try_get_timestamp((it->value)["Value"])) {
|
||||
effective_ts = *t;
|
||||
found_ts = true;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!found_ts && !_update_expression.empty()) {
|
||||
std::string ts_attr(to_string_view(*_timestamp_attribute));
|
||||
auto it = _update_expression.find(ts_attr);
|
||||
if (it != _update_expression.end() && it->second.has_value()) {
|
||||
const auto& action = it->second.get_value();
|
||||
if (std::holds_alternative<parsed::update_expression::action::set>(action._action)) {
|
||||
std::optional<rjson::value> result = action_result(action, previous_item.get());
|
||||
if (result) {
|
||||
if (auto t = try_get_timestamp(*result)) {
|
||||
effective_ts = *t;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// In the ReturnValues=ALL_NEW case, we make a copy of previous_item into
|
||||
// _return_attributes and parts of it will be overwritten by the new
|
||||
// updates (in do_update() and do_delete()). We need to make a copy and
|
||||
@@ -4571,10 +4830,10 @@ std::optional<mutation> update_item_operation::apply(std::unique_ptr<rjson::valu
|
||||
auto& row = m.partition().clustered_row(*_schema, _ck);
|
||||
auto modified_attrs = attribute_collector();
|
||||
if (!_update_expression.empty()) {
|
||||
apply_update_expression(previous_item, ts, row, modified_attrs, any_updates, any_deletes);
|
||||
apply_update_expression(previous_item, effective_ts, row, modified_attrs, any_updates, any_deletes);
|
||||
}
|
||||
if (_attribute_updates) {
|
||||
apply_attribute_updates(previous_item, ts, row, modified_attrs, any_updates, any_deletes);
|
||||
apply_attribute_updates(previous_item, effective_ts, row, modified_attrs, any_updates, any_deletes);
|
||||
}
|
||||
if (!modified_attrs.empty()) {
|
||||
auto serialized_map = modified_attrs.to_mut().serialize(*attrs_type());
|
||||
@@ -4585,7 +4844,7 @@ std::optional<mutation> update_item_operation::apply(std::unique_ptr<rjson::valu
|
||||
// marker. An update with only DELETE operations must not add a row marker
|
||||
// (this was issue #5862) but any other update, even an empty one, should.
|
||||
if (any_updates || !any_deletes) {
|
||||
row.apply(row_marker(ts));
|
||||
row.apply(row_marker(effective_ts));
|
||||
} else if (_returnvalues == returnvalues::ALL_NEW && !previous_item) {
|
||||
// There was no pre-existing item, and we're not creating one, so
|
||||
// don't report the new item in the returned Attributes.
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
#include "executor.hh"
|
||||
#include "tracing/trace_state.hh"
|
||||
#include "keys/keys.hh"
|
||||
#include "bytes.hh"
|
||||
|
||||
namespace alternator {
|
||||
|
||||
@@ -72,6 +73,11 @@ protected:
|
||||
clustering_key _ck = clustering_key::make_empty();
|
||||
write_isolation _write_isolation;
|
||||
mutable wcu_consumed_capacity_counter _consumed_capacity;
|
||||
// If the table has a "system:timestamp_attribute" tag, this holds the
|
||||
// name of the attribute (converted to bytes) whose numeric value should
|
||||
// be used as the write timestamp instead of the current time. The
|
||||
// attribute itself is NOT stored in the item data.
|
||||
std::optional<bytes> _timestamp_attribute;
|
||||
// All RMW operations can have a ReturnValues parameter from the following
|
||||
// choices. But note that only UpdateItem actually supports all of them:
|
||||
enum class returnvalues {
|
||||
@@ -113,6 +119,9 @@ public:
|
||||
// Convert the above apply() into the signature needed by cas_request:
|
||||
virtual std::optional<mutation> apply(foreign_ptr<lw_shared_ptr<query::result>> qr, const query::partition_slice& slice, api::timestamp_type ts, cdc::per_request_options& cdc_opts) override;
|
||||
virtual ~rmw_operation() = default;
|
||||
// Returns true if the operation will use a custom write timestamp (from the
|
||||
// system:timestamp_attribute tag). Subclasses override this as needed.
|
||||
virtual bool has_custom_timestamp() const noexcept { return false; }
|
||||
const wcu_consumed_capacity_counter& consumed_capacity() const noexcept { return _consumed_capacity; }
|
||||
schema_ptr schema() const { return _schema; }
|
||||
const rjson::value& request() const { return _request; }
|
||||
|
||||
@@ -213,3 +213,71 @@ Alternator table, the following features will not work for this table:
|
||||
* Enabling Streams with CreateTable or UpdateTable doesn't work
|
||||
(results in an error).
|
||||
See <https://github.com/scylladb/scylla/issues/23838>.
|
||||
|
||||
## Custom write timestamps
|
||||
|
||||
DynamoDB doesn't allow clients to set the write timestamp of updates. All
|
||||
updates use the current server time as their timestamp, and ScyllaDB uses
|
||||
these timestamps for last-write-wins conflict resolution when concurrent
|
||||
writes reach different replicas.
|
||||
|
||||
ScyllaDB Alternator extends this with the `system:timestamp_attribute` tag,
|
||||
which allows specifying a custom write timestamp for each PutItem,
|
||||
UpdateItem, DeleteItem, or BatchWriteItem request. To use this feature:
|
||||
|
||||
1. Tag the table (at CreateTable time or using TagResource) with
|
||||
`system:timestamp_attribute` set to the name of an attribute that will
|
||||
hold the custom write timestamp.
|
||||
|
||||
2. When performing a PutItem or UpdateItem, include the named attribute
|
||||
in the request with a numeric value. The value represents the write
|
||||
timestamp in **microseconds since the Unix epoch** (this is the same
|
||||
unit used internally by ScyllaDB for timestamps).
|
||||
For a DeleteItem or a BatchWriteItem DeleteRequest, include the named
|
||||
attribute in the `Key` parameter (it will be stripped from the key
|
||||
before use).
|
||||
|
||||
3. The named attribute is **not stored** in the item data - it only
|
||||
controls the write timestamp. If you also want to record the timestamp
|
||||
as data, use a separate attribute for that purpose.
|
||||
|
||||
4. If the named attribute is absent, the write proceeds normally using the
|
||||
current server time as the timestamp. If the named attribute is present
|
||||
but has a non-numeric value, the write is rejected with a ValidationException.
|
||||
|
||||
### Limitations
|
||||
|
||||
- **Incompatible with conditions**: If the write includes a ConditionExpression
|
||||
(or uses the `Expected` legacy condition), LWT is needed and the operation
|
||||
is rejected with a ValidationException, because LWT requires the write
|
||||
timestamp to be set by the Paxos protocol, not by the client.
|
||||
|
||||
- **Incompatible with `always` write isolation**: Tables using the `always`
|
||||
(or `always_use_lwt`) write isolation policy cannot use the timestamp
|
||||
attribute feature at all, because every write uses LWT in that mode.
|
||||
When using `system:timestamp_attribute`, consider tagging the table with
|
||||
`system:write_isolation=only_rmw_uses_lwt` (or `forbid_rmw`) so that
|
||||
unconditional writes do not use LWT.
|
||||
|
||||
### Example use case
|
||||
|
||||
This feature is useful for ingesting data from multiple sources where each
|
||||
record has a known logical timestamp. By setting the `system:timestamp_attribute`
|
||||
tag, you can ensure that the record with the highest logical timestamp always
|
||||
wins, regardless of ingestion order:
|
||||
|
||||
```python
|
||||
# Create table with timestamp attribute
|
||||
dynamodb.create_table(
|
||||
TableName='my_table',
|
||||
...
|
||||
Tags=[{'Key': 'system:timestamp_attribute', 'Value': 'write_ts'}]
|
||||
)
|
||||
|
||||
# Write a record with a specific timestamp (in microseconds since epoch)
|
||||
table.put_item(Item={
|
||||
'pk': 'my_key',
|
||||
'data': 'new_value',
|
||||
'write_ts': Decimal('1700000000000000'), # Nov 14, 2023 in microseconds
|
||||
})
|
||||
```
|
||||
|
||||
356
test/alternator/test_timestamp_attribute.py
Normal file
356
test/alternator/test_timestamp_attribute.py
Normal file
@@ -0,0 +1,356 @@
|
||||
# Copyright 2025-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
|
||||
# Tests for the system:timestamp_attribute Scylla-specific feature.
|
||||
# This feature allows users to control the write timestamp of PutItem and
|
||||
# UpdateItem operations by specifying an attribute name in the table's
|
||||
# system:timestamp_attribute tag. When that attribute is present in the
|
||||
# write request with a numeric value (microseconds since Unix epoch), it
|
||||
# is used as the write timestamp. The attribute itself is not stored in
|
||||
# the item data.
|
||||
#
|
||||
# This is a Scylla-specific feature and is not tested on DynamoDB.
|
||||
|
||||
import time
|
||||
import pytest
|
||||
from botocore.exceptions import ClientError
|
||||
from decimal import Decimal
|
||||
|
||||
from .util import create_test_table, random_string
|
||||
|
||||
# Explicit write timestamps used by the tests below, expressed in
# microseconds since the Unix epoch (the unit the system:timestamp_attribute
# feature expects). Chosen far apart so win/lose ordering is unambiguous.
# A large timestamp in microseconds (far future, year ~2033)
LARGE_TS = Decimal('2000000000000000')
# A medium timestamp in microseconds (year ~2001)
# NOTE(review): not referenced by the tests visible here - kept for future use.
MEDIUM_TS = Decimal('1000000000000000')
# A small timestamp in microseconds (year ~1970+)
SMALL_TS = Decimal('100000000000000')
|
||||
|
||||
# Fixtures for tables with the system:timestamp_attribute tag. The tables
|
||||
# are created once per module and shared between all tests that use them,
|
||||
# to avoid the overhead of creating and deleting tables for each test.
|
||||
# Because system:timestamp_attribute is a Scylla-only feature, all tests
|
||||
# using these fixtures are implicitly Scylla-only (via scylla_only parameter).
|
||||
|
||||
# Hash-key-only table tagged with system:timestamp_attribute='ts'.
# Write isolation is pinned to only_rmw_uses_lwt so these tests stay valid
# even if the server's default isolation ever becomes always_use_lwt.
@pytest.fixture(scope="module")
def test_table_ts(scylla_only, dynamodb):
    tbl = create_test_table(dynamodb,
        KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
        AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}],
        Tags=[{'Key': 'system:timestamp_attribute', 'Value': 'ts'},
              {'Key': 'system:write_isolation', 'Value': 'only_rmw_uses_lwt'}])
    yield tbl
    tbl.delete()
|
||||
|
||||
# Table with string hash ('p') and string range ('c') keys, tagged with
# system:timestamp_attribute='ts'. Write isolation is pinned to
# only_rmw_uses_lwt so these tests stay valid even if the server's default
# isolation ever becomes always_use_lwt.
@pytest.fixture(scope="module")
def test_table_ts_ss(scylla_only, dynamodb):
    tbl = create_test_table(dynamodb,
        KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'},
                   {'AttributeName': 'c', 'KeyType': 'RANGE'}],
        AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'},
                              {'AttributeName': 'c', 'AttributeType': 'S'}],
        Tags=[{'Key': 'system:timestamp_attribute', 'Value': 'ts'},
              {'Key': 'system:write_isolation', 'Value': 'only_rmw_uses_lwt'}])
    yield tbl
    tbl.delete()
|
||||
|
||||
# Hash-key-only table combining system:timestamp_attribute='ts' with
# system:write_isolation='always'. In always_use_lwt mode every write goes
# through LWT, so the timestamp-attribute feature cannot be used at all;
# tests use this fixture to verify that rejection.
@pytest.fixture(scope="module")
def test_table_ts_lwt(scylla_only, dynamodb):
    tbl = create_test_table(dynamodb,
        KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
        AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}],
        Tags=[{'Key': 'system:timestamp_attribute', 'Value': 'ts'},
              {'Key': 'system:write_isolation', 'Value': 'always'}])
    yield tbl
    tbl.delete()
|
||||
|
||||
def test_timestamp_attribute_put_item_basic(test_table_ts):
    """PutItem with the timestamp attribute: the attribute's numeric value
    becomes the write timestamp, and the attribute itself must not be
    stored as part of the item's data.
    """
    pk = random_string()
    test_table_ts.put_item(Item={'p': pk, 'val': 'hello', 'ts': LARGE_TS})
    stored = test_table_ts.get_item(Key={'p': pk}, ConsistentRead=True)['Item']
    # The regular attribute is stored as usual...
    assert stored['val'] == 'hello'
    # ...but the timestamp attribute is consumed, not stored.
    assert 'ts' not in stored
|
||||
|
||||
def test_timestamp_attribute_put_item_ordering(test_table_ts):
    """Write timestamps, not wall-clock order, decide which PutItem wins:
    a later write carrying a smaller timestamp must lose to an earlier
    write carrying a larger one.
    """
    pk = random_string()
    # A newer wall-clock write with a smaller timestamp loses:
    test_table_ts.put_item(Item={'p': pk, 'val': 'large_ts', 'ts': LARGE_TS})
    test_table_ts.put_item(Item={'p': pk, 'val': 'small_ts', 'ts': SMALL_TS})
    stored = test_table_ts.get_item(Key={'p': pk}, ConsistentRead=True)['Item']
    assert stored['val'] == 'large_ts'

    # A write with an even larger timestamp wins:
    test_table_ts.put_item(Item={'p': pk, 'val': 'latest', 'ts': LARGE_TS + 1})
    stored = test_table_ts.get_item(Key={'p': pk}, ConsistentRead=True)['Item']
    assert stored['val'] == 'latest'
|
||||
|
||||
def test_timestamp_attribute_update_item_attribute_updates(test_table_ts):
    """UpdateItem (legacy AttributeUpdates form): the timestamp attribute's
    numeric value is used as the write timestamp and the attribute itself
    is not stored in the item.
    """
    pk = random_string()
    # Local helper: one AttributeUpdates write setting 'val' and 'ts'.
    def do_update(value, ts):
        test_table_ts.update_item(
            Key={'p': pk},
            AttributeUpdates={
                'val': {'Value': value, 'Action': 'PUT'},
                'ts': {'Value': ts, 'Action': 'PUT'},
            })
    do_update('hello', LARGE_TS)
    got = test_table_ts.get_item(Key={'p': pk}, ConsistentRead=True)['Item']
    assert got['val'] == 'hello'
    assert 'ts' not in got  # the timestamp attribute is not stored
    # An update carrying a smaller timestamp must not take effect:
    do_update('overwritten', SMALL_TS)
    got = test_table_ts.get_item(Key={'p': pk}, ConsistentRead=True)['Item']
    assert got['val'] == 'hello'
|
||||
|
||||
def test_timestamp_attribute_update_item_update_expression(test_table_ts):
    """UpdateItem with an UpdateExpression: assigning the timestamp
    attribute sets the write timestamp, and the attribute itself is not
    stored in the item.
    """
    pk = random_string()
    # Local helper: one UpdateExpression write setting 'val' and 'ts'.
    def do_update(value, ts):
        test_table_ts.update_item(
            Key={'p': pk},
            UpdateExpression='SET val = :v, ts = :t',
            ExpressionAttributeValues={':v': value, ':t': ts})
    do_update('hello', LARGE_TS)
    got = test_table_ts.get_item(Key={'p': pk}, ConsistentRead=True)['Item']
    assert got['val'] == 'hello'
    assert 'ts' not in got  # the timestamp attribute is not stored
    # An update carrying a smaller timestamp must not take effect:
    do_update('overwritten', SMALL_TS)
    got = test_table_ts.get_item(Key={'p': pk}, ConsistentRead=True)['Item']
    assert got['val'] == 'hello'
|
||||
|
||||
def test_timestamp_attribute_absent(test_table_ts):
    """A write that omits the timestamp attribute behaves like a normal
    write - no custom timestamp is applied and the item reads back as-is.
    """
    pk = random_string()
    test_table_ts.put_item(Item={'p': pk, 'val': 'hello'})
    got = test_table_ts.get_item(Key={'p': pk}, ConsistentRead=True)['Item']
    assert got['val'] == 'hello'
    # Nothing was written under the timestamp attribute's name either.
    assert 'ts' not in got
|
||||
|
||||
def test_timestamp_attribute_with_condition_rejected(test_table_ts):
    """A ConditionExpression forces the write through LWT, which cannot
    honor a custom write timestamp, so combining the two must fail with a
    ValidationException.
    """
    pk = random_string()
    # A plain write without the timestamp attribute may use LWT freely:
    test_table_ts.put_item(Item={'p': pk, 'val': 'initial'})
    with pytest.raises(ClientError, match='ValidationException'):
        test_table_ts.put_item(
            ConditionExpression='attribute_exists(p)',
            Item={'p': pk, 'val': 'updated', 'ts': LARGE_TS})
|
||||
|
||||
def test_timestamp_attribute_lwt_always_rejected(test_table_ts_lwt):
    """Under the 'always' write-isolation policy every write (even an
    unconditional one) goes through LWT, so a custom timestamp is rejected
    on a plain PutItem.
    """
    pk = random_string()
    with pytest.raises(ClientError, match='ValidationException'):
        test_table_ts_lwt.put_item(Item={'p': pk, 'val': 'hello', 'ts': LARGE_TS})
|
||||
|
||||
def test_timestamp_attribute_non_numeric(test_table_ts):
    """A non-numeric value in the timestamp attribute fails the write with
    a ValidationException.
    """
    pk = random_string()
    with pytest.raises(ClientError, match='ValidationException'):
        test_table_ts.put_item(Item={'p': pk, 'val': 'hello', 'ts': 'not_a_number'})
|
||||
|
||||
def test_timestamp_attribute_with_range_key(test_table_ts_ss):
    """The timestamp-attribute feature also works on a table that has a
    sort key: ordering by timestamp holds and the attribute is not stored.
    """
    pk = random_string()
    ck = random_string()
    test_table_ts_ss.put_item(Item={'p': pk, 'c': ck, 'val': 'large', 'ts': LARGE_TS})
    # A later write with a smaller timestamp must lose:
    test_table_ts_ss.put_item(Item={'p': pk, 'c': ck, 'val': 'small', 'ts': SMALL_TS})
    got = test_table_ts_ss.get_item(Key={'p': pk, 'c': ck}, ConsistentRead=True)['Item']
    assert got['val'] == 'large'
    assert 'ts' not in got
|
||||
|
||||
def test_timestamp_attribute_microseconds(test_table_ts):
    """The timestamp attribute is measured in microseconds since the Unix
    epoch, so explicit timestamps compare sensibly against the server
    clock used by writes that omit the attribute.
    """
    now_us = int(time.time() * 1_000_000)
    one_hour_us = 3600 * 1_000_000

    # Part 1: an explicit timestamp of "now" loses to a later write that
    # is stamped with the server's current time (>= now_us by then).
    pk = random_string()
    test_table_ts.put_item(Item={'p': pk, 'val': 'old', 'ts': Decimal(str(now_us))})
    test_table_ts.put_item(Item={'p': pk, 'val': 'new'})
    got = test_table_ts.get_item(Key={'p': pk}, ConsistentRead=True)['Item']
    assert got['val'] == 'new'

    # Part 2: an explicit timestamp one hour ahead beats a later write
    # stamped with the server's current time (~ now_us, well below it).
    pk = random_string()
    test_table_ts.put_item(Item={'p': pk, 'val': 'future',
                                 'ts': Decimal(str(now_us + one_hour_us))})
    test_table_ts.put_item(Item={'p': pk, 'val': 'now'})
    got = test_table_ts.get_item(Key={'p': pk}, ConsistentRead=True)['Item']
    assert got['val'] == 'future'
|
||||
|
||||
def test_timestamp_attribute_batch_write(test_table_ts):
    """BatchWriteItem PutRequest honors the timestamp attribute too:
    ordering follows the given timestamps and the attribute is not stored.
    """
    pk = random_string()
    with test_table_ts.batch_writer() as batch:
        batch.put_item(Item={'p': pk, 'val': 'large_ts', 'ts': LARGE_TS})
    # A second batch write with a smaller timestamp must lose:
    with test_table_ts.batch_writer() as batch:
        batch.put_item(Item={'p': pk, 'val': 'small_ts', 'ts': SMALL_TS})
    got = test_table_ts.get_item(Key={'p': pk}, ConsistentRead=True)['Item']
    assert got['val'] == 'large_ts'
    assert 'ts' not in got
|
||||
|
||||
def test_timestamp_attribute_delete_item(test_table_ts):
    """DeleteItem takes the timestamp attribute inside Key: a delete
    stamped earlier than the item's write timestamp has no effect, while
    one stamped later removes the item.
    """
    pk = random_string()
    test_table_ts.put_item(Item={'p': pk, 'val': 'hello', 'ts': LARGE_TS})
    got = test_table_ts.get_item(Key={'p': pk}, ConsistentRead=True)['Item']
    assert got['val'] == 'hello'
    # Older-stamped delete loses - the item survives:
    test_table_ts.delete_item(Key={'p': pk, 'ts': SMALL_TS})
    survivor = test_table_ts.get_item(Key={'p': pk}, ConsistentRead=True).get('Item')
    assert survivor is not None and survivor['val'] == 'hello'
    # Newer-stamped delete wins - the item is gone:
    test_table_ts.delete_item(Key={'p': pk, 'ts': LARGE_TS + 1})
    assert test_table_ts.get_item(Key={'p': pk}, ConsistentRead=True).get('Item') is None
|
||||
|
||||
def test_timestamp_attribute_delete_item_no_ts(test_table_ts):
    """DeleteItem without the timestamp attribute uses the server's current
    time, so it removes items stamped in the past but not ones stamped in
    the far future.
    """
    # Item stamped in the past (SMALL_TS): a plain delete, stamped with the
    # current server time, wins and the item disappears.
    pk = random_string()
    test_table_ts.put_item(Item={'p': pk, 'val': 'hello', 'ts': SMALL_TS})
    test_table_ts.delete_item(Key={'p': pk})
    assert 'Item' not in test_table_ts.get_item(Key={'p': pk}, ConsistentRead=True)

    # Item stamped far in the future (LARGE_TS): the plain delete loses
    # because server time < LARGE_TS, so the item survives.
    pk = random_string()
    test_table_ts.put_item(Item={'p': pk, 'val': 'hello', 'ts': LARGE_TS})
    test_table_ts.delete_item(Key={'p': pk})
    survivor = test_table_ts.get_item(Key={'p': pk}, ConsistentRead=True).get('Item')
    assert survivor is not None and survivor['val'] == 'hello'
|
||||
|
||||
def test_timestamp_attribute_delete_item_non_numeric(test_table_ts):
    """A non-numeric timestamp attribute inside a DeleteItem Key is
    rejected with a ValidationException.
    """
    pk = random_string()
    with pytest.raises(ClientError, match='ValidationException'):
        test_table_ts.delete_item(Key={'p': pk, 'ts': 'not_a_number'})
|
||||
|
||||
def test_timestamp_attribute_batch_delete(test_table_ts):
    """BatchWriteItem DeleteRequest honors the timestamp attribute given
    inside the Key, with the usual last-write-wins ordering.
    """
    pk = random_string()
    client = test_table_ts.meta.client
    test_table_ts.put_item(Item={'p': pk, 'val': 'hello', 'ts': LARGE_TS})
    # Older-stamped batch delete loses - the item survives:
    client.batch_write_item(RequestItems={
        test_table_ts.name: [{'DeleteRequest': {'Key': {'p': pk, 'ts': SMALL_TS}}}]
    })
    survivor = test_table_ts.get_item(Key={'p': pk}, ConsistentRead=True).get('Item')
    assert survivor is not None and survivor['val'] == 'hello'
    # Newer-stamped batch delete wins - the item is gone:
    client.batch_write_item(RequestItems={
        test_table_ts.name: [{'DeleteRequest': {'Key': {'p': pk, 'ts': LARGE_TS + 1}}}]
    })
    assert 'Item' not in test_table_ts.get_item(Key={'p': pk}, ConsistentRead=True)
|
||||
|
||||
def test_timestamp_attribute_delete_item_condition_rejected(test_table_ts):
    """A conditional DeleteItem needs LWT, which cannot honor a custom
    write timestamp, so combining the two must be rejected with a
    ValidationException.
    """
    pk = random_string()
    test_table_ts.put_item(Item={'p': pk, 'val': 'hello'})
    with pytest.raises(ClientError, match='ValidationException'):
        test_table_ts.delete_item(
            ConditionExpression='attribute_exists(p)',
            Key={'p': pk, 'ts': SMALL_TS})
|
||||
|
||||
def test_timestamp_attribute_delete_item_lwt_always_rejected(test_table_ts_lwt):
    """With always_use_lwt isolation every write uses LWT, so DeleteItem
    with a custom timestamp must be rejected.
    """
    pk = random_string()
    with pytest.raises(ClientError, match='ValidationException'):
        test_table_ts_lwt.delete_item(Key={'p': pk, 'ts': SMALL_TS})
|
||||
|
||||
def test_timestamp_attribute_batch_put_lwt_always_rejected(test_table_ts_lwt):
    """BatchWriteItem PutRequest carrying a custom timestamp is rejected on
    a table using always_use_lwt isolation.
    """
    pk = random_string()
    with pytest.raises(ClientError, match='ValidationException'):
        test_table_ts_lwt.meta.client.batch_write_item(RequestItems={
            test_table_ts_lwt.name: [
                {'PutRequest': {'Item': {'p': pk, 'val': 'v', 'ts': SMALL_TS}}}]
        })
|
||||
|
||||
def test_timestamp_attribute_batch_delete_lwt_always_rejected(test_table_ts_lwt):
    """BatchWriteItem DeleteRequest carrying a custom timestamp is rejected
    on a table using always_use_lwt isolation.
    """
    pk = random_string()
    with pytest.raises(ClientError, match='ValidationException'):
        test_table_ts_lwt.meta.client.batch_write_item(RequestItems={
            test_table_ts_lwt.name: [
                {'DeleteRequest': {'Key': {'p': pk, 'ts': SMALL_TS}}}]
        })
|
||||
Reference in New Issue
Block a user