diff --git a/cql3/expr/collection_cell_metadata.hh b/cql3/expr/collection_cell_metadata.hh new file mode 100644 index 0000000000..e551068bcc --- /dev/null +++ b/cql3/expr/collection_cell_metadata.hh @@ -0,0 +1,21 @@ +// Copyright (C) 2026-present ScyllaDB +// SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + +#pragma once + +#include + +#include "bytes.hh" +#include "mutation/timestamp.hh" + +namespace cql3::expr { + +// Per-element timestamps and TTLs for a cell of a map, set or UDT (populated +// when a WRITETIME() or TTL() of col[key] or col.field are in the query. +// Keys are the raw serialized keys or serialized field index. +struct collection_cell_metadata { + std::map timestamps; + std::map ttls; // remaining TTL in seconds (-1 if no TTL) +}; + +} // namespace cql3::expr diff --git a/cql3/expr/evaluate.hh b/cql3/expr/evaluate.hh index f39b42d7af..166bd6d045 100644 --- a/cql3/expr/evaluate.hh +++ b/cql3/expr/evaluate.hh @@ -3,6 +3,7 @@ #pragma once +#include "collection_cell_metadata.hh" #include "expression.hh" #include "bytes.hh" @@ -27,6 +28,7 @@ struct evaluation_inputs { std::span static_and_regular_timestamps; // indexes match `selection` member std::span static_and_regular_ttls; // indexes match `selection` member std::span temporaries; // indexes match temporary::index + std::span collection_element_metadata; // indexes match `selection` member }; // Takes a prepared expression and calculates its value. diff --git a/cql3/selection/selection.cc b/cql3/selection/selection.cc index 133539ce80..c35979f24a 100644 --- a/cql3/selection/selection.cc +++ b/cql3/selection/selection.cc @@ -17,6 +17,7 @@ #include "cql3/expr/expr-utils.hh" #include "cql3/functions/first_function.hh" #include "cql3/functions/aggregate_fcts.hh" +#include "types/types.hh" #include @@ -31,12 +32,14 @@ selection::selection(schema_ptr schema, std::vector> metadata_, bool collect_timestamps, bool collect_TTLs, + bool collect_collection_timestamps, trivial is_trivial) : _schema(std::move(schema)) , _columns(std::move(columns)) , _metadata(::make_shared(std::move(metadata_))) , _collect_timestamps(collect_timestamps) , _collect_TTLs(collect_TTLs) + , _collect_collection_timestamps(collect_collection_timestamps) , _contains_static_columns(std::any_of(_columns.begin(), _columns.end(), std::mem_fn(&column_definition::is_static))) , _is_trivial(is_trivial) { } @@ -46,6 +49,7 @@ query::partition_slice::option_set selection::get_query_options() { opts.set_if(_collect_timestamps); opts.set_if(_collect_TTLs); + opts.set_if(_collect_collection_timestamps); opts.set_if( std::any_of(_columns.begin(), _columns.end(), @@ -114,7 +118,7 @@ public: */ simple_selection(schema_ptr schema, std::vector columns, std::vector> metadata, bool is_wildcard) - : selection(schema, std::move(columns), std::move(metadata), false, false, trivial::yes) + : selection(schema, std::move(columns), std::move(metadata), false, false, false, trivial::yes) , _is_wildcard(is_wildcard) { } @@ -178,6 +182,12 @@ contains_column_mutation_attribute(expr::column_mutation_attribute::attribute_ki }); } +static bool contains_collection_mutation_attribute(const expr::expression& e) { + return expr::find_in_expression(e, [](const expr::column_mutation_attribute& cma) { + return expr::is(cma.column) || expr::is(cma.column); + }); +} + static bool contains_writetime(const expr::expression& e) { @@ -202,7 +212,8 @@ public: std::vector selectors) : selection(schema, std::move(columns), std::move(metadata), contains_writetime(expr::tuple_constructor{selectors}), - contains_ttl(expr::tuple_constructor{selectors})) + contains_ttl(expr::tuple_constructor{selectors}), + contains_collection_mutation_attribute(expr::tuple_constructor{selectors})) , _selectors(std::move(selectors)) { auto agg_split = expr::split_aggregation(_selectors); @@ -391,6 +402,7 @@ protected: .static_and_regular_timestamps = rs._timestamps, .static_and_regular_ttls = rs._ttls, .temporaries = {}, + .collection_element_metadata = rs._collection_element_metadata, }; for (auto&& e : _sel._selectors) { auto out = expr::evaluate(e, inputs); @@ -429,6 +441,7 @@ protected: .static_and_regular_timestamps = rs._timestamps, .static_and_regular_ttls = rs._ttls, .temporaries = _temporaries, + .collection_element_metadata = rs._collection_element_metadata, }; for (size_t i = 0; i != _sel._inner_loop.size(); ++i) { _temporaries[i] = expr::evaluate(_sel._inner_loop[i], inputs); @@ -553,6 +566,9 @@ result_set_builder::result_set_builder(const selection& s, gc_clock::time_point if (s._collect_TTLs) { _ttls.resize(s._columns.size(), 0); } + if (s._collect_collection_timestamps) { + _collection_element_metadata.resize(s._columns.size()); + } } void result_set_builder::add_empty() { @@ -563,6 +579,9 @@ void result_set_builder::add_empty() { if (!_ttls.empty()) { _ttls[current.size() - 1] = -1; } + if (!_collection_element_metadata.empty()) { + _collection_element_metadata[current.size() - 1] = {}; + } } void result_set_builder::add(bytes_opt value) { @@ -585,8 +604,45 @@ void result_set_builder::add(const column_definition& def, const query::result_a } void result_set_builder::add_collection(const column_definition& def, bytes_view c) { + size_t col_idx = current.size(); + if (!_collection_element_metadata.empty()) { + // Extended format produced by serialize_for_cql_with_timestamps() + // [uint32 cql_len][cql bytes][int32 entry_count] + // followed by entry_count entries, each: + // [int32 key_len][key bytes][int64 timestamp][int64 expiry_raw] + // where expiry_raw is -1 if the element does not expire, otherwise + // it is the serialized gc_clock time used to derive the remaining + // TTL. The flag _collect_collection_timestamps = true determines + // whether this extended format is used (instead of a plain CQL + // collection blob), and it is only enabled when a feature flag + // guarantees both reader and writer support it. + uint32_t cql_len = read_simple(c); + bytes_view cql_bytes = read_simple_bytes(c, cql_len); + current.emplace_back(to_bytes(cql_bytes)); + + auto& meta = _collection_element_metadata[col_idx]; + meta = {}; // clear stale data from previous row + int32_t entry_count = read_simple(c); + for (int32_t i = 0; i < entry_count; ++i) { + int32_t key_len = read_simple(c); + bytes key = to_bytes(read_simple_bytes(c, key_len)); + int64_t ts = read_simple(c); + int64_t expiry_raw = read_simple(c); + meta.timestamps[key] = ts; + if (expiry_raw != -1) { + auto expiry = gc_clock::time_point(gc_clock::duration(expiry_raw)); + auto ttl_left = expiry - _now; + int32_t ttl = int32_t(ttl_left.count()); + if (ttl > 0) { + meta.ttls[key] = ttl; + } + // otherwise, expired or no TTL; We can omit this key from + // map - missing key is treated as null by the evaluator. + } + } + return; + } current.emplace_back(to_bytes(c)); - // timestamps, ttls meaningless for collections } void result_set_builder::update_last_group() { diff --git a/cql3/selection/selection.hh b/cql3/selection/selection.hh index 2d68e44c27..28980be6a6 100644 --- a/cql3/selection/selection.hh +++ b/cql3/selection/selection.hh @@ -12,6 +12,7 @@ #include "utils/assert.hh" #include "bytes.hh" +#include "cql3/expr/collection_cell_metadata.hh" #include "schema/schema_fwd.hh" #include "query/query-result-reader.hh" #include "selector.hh" @@ -69,6 +70,7 @@ private: ::shared_ptr _metadata; const bool _collect_timestamps; const bool _collect_TTLs; + const bool _collect_collection_timestamps; const bool _contains_static_columns; bool _is_trivial; protected: @@ -78,7 +80,9 @@ protected: std::vector columns, std::vector> metadata_, bool collect_timestamps, - bool collect_TTLs, trivial is_trivial = trivial::no); + bool collect_TTLs, + bool collect_collection_timestamps, + trivial is_trivial = trivial::no); virtual ~selection() {} public: @@ -197,6 +201,7 @@ public: std::vector current_clustering_key; std::vector _timestamps; std::vector _ttls; + std::vector _collection_element_metadata; const query_options* _options; private: const gc_clock::time_point _now;