cql3: parse per-element timestamps/TTLs in the selection layer

Wire up the selection and result-set infrastructure to consume the
extended collection wire format introduced in the previous patch and
expose per-element timestamps and TTLs to the expression evaluator.

* Add collection_cell_metadata: maps from raw element-key bytes to
  timestamp and remaining TTL, one entry per collection or UDT cell.
  Add a corresponding collection_element_metadata span to
  evaluation_inputs so that evaluators can access it.

* Add a flag _collect_collection_timestamps to selection (selection.hh/cc).
  When any selected expression contains a WRITETIME(col[key])/TTL(col[key])
  or WRITETIME(col.field)/TTL(col.field) attribute, the flag is set and
  the send_collection_timestamps partition-slice option is enabled,
  causing storage nodes to use the extended wire format from the
  previous patch.

* Implement result_set_builder::add_collection() (selection.cc): when
  _collect_collection_timestamps is set, parse the extended format,
  decode per-element timestamps and remaining TTLs (computed from the
  stored expiry time and the query time), and store them in
  _collection_element_metadata indexed by column position.  When the
  flag is not set, the existing plain-bytes path is unchanged.

After this patch, the new selection feature is still not available to
the end-user because the prepare step still forbids it. The next patch
will finally complete the expression preparation and evaluation.
It will read the new collection_element_metadata and return the correct
timestamp or TTL value.
This commit is contained in:
Nadav Har'El
2026-04-12 11:06:30 +03:00
parent bb63db34e5
commit 4ac63de063
4 changed files with 88 additions and 4 deletions

View File

@@ -0,0 +1,21 @@
// Copyright (C) 2026-present ScyllaDB
// SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#pragma once
#include <map>
#include "bytes.hh"
#include "mutation/timestamp.hh"
namespace cql3::expr {
// Per-element timestamps and TTLs for a cell of a map, set or UDT (populated
// when a WRITETIME() or TTL() of col[key] or col.field are in the query.
// Keys are the raw serialized keys or serialized field index.
struct collection_cell_metadata {
std::map<bytes, api::timestamp_type> timestamps;
std::map<bytes, int32_t> ttls; // remaining TTL in seconds (-1 if no TTL)
};
} // namespace cql3::expr

View File

@@ -3,6 +3,7 @@
#pragma once
#include "collection_cell_metadata.hh"
#include "expression.hh"
#include "bytes.hh"
@@ -27,6 +28,7 @@ struct evaluation_inputs {
std::span<const api::timestamp_type> static_and_regular_timestamps; // indexes match `selection` member
std::span<const int32_t> static_and_regular_ttls; // indexes match `selection` member
std::span<const cql3::raw_value> temporaries; // indexes match temporary::index
std::span<const collection_cell_metadata> collection_element_metadata; // indexes match `selection` member
};
// Takes a prepared expression and calculates its value.

View File

@@ -17,6 +17,7 @@
#include "cql3/expr/expr-utils.hh"
#include "cql3/functions/first_function.hh"
#include "cql3/functions/aggregate_fcts.hh"
#include "types/types.hh"
#include <ranges>
@@ -31,12 +32,14 @@ selection::selection(schema_ptr schema,
std::vector<lw_shared_ptr<column_specification>> metadata_,
bool collect_timestamps,
bool collect_TTLs,
bool collect_collection_timestamps,
trivial is_trivial)
: _schema(std::move(schema))
, _columns(std::move(columns))
, _metadata(::make_shared<metadata>(std::move(metadata_)))
, _collect_timestamps(collect_timestamps)
, _collect_TTLs(collect_TTLs)
, _collect_collection_timestamps(collect_collection_timestamps)
, _contains_static_columns(std::any_of(_columns.begin(), _columns.end(), std::mem_fn(&column_definition::is_static)))
, _is_trivial(is_trivial)
{ }
@@ -46,6 +49,7 @@ query::partition_slice::option_set selection::get_query_options() {
opts.set_if<query::partition_slice::option::send_timestamp>(_collect_timestamps);
opts.set_if<query::partition_slice::option::send_expiry>(_collect_TTLs);
opts.set_if<query::partition_slice::option::send_collection_timestamps>(_collect_collection_timestamps);
opts.set_if<query::partition_slice::option::send_partition_key>(
std::any_of(_columns.begin(), _columns.end(),
@@ -114,7 +118,7 @@ public:
*/
simple_selection(schema_ptr schema, std::vector<const column_definition*> columns,
std::vector<lw_shared_ptr<column_specification>> metadata, bool is_wildcard)
: selection(schema, std::move(columns), std::move(metadata), false, false, trivial::yes)
: selection(schema, std::move(columns), std::move(metadata), false, false, false, trivial::yes)
, _is_wildcard(is_wildcard)
{ }
@@ -178,6 +182,12 @@ contains_column_mutation_attribute(expr::column_mutation_attribute::attribute_ki
});
}
static bool contains_collection_mutation_attribute(const expr::expression& e) {
return expr::find_in_expression<expr::column_mutation_attribute>(e, [](const expr::column_mutation_attribute& cma) {
return expr::is<expr::subscript>(cma.column) || expr::is<expr::field_selection>(cma.column);
});
}
static
bool
contains_writetime(const expr::expression& e) {
@@ -202,7 +212,8 @@ public:
std::vector<expr::expression> selectors)
: selection(schema, std::move(columns), std::move(metadata),
contains_writetime(expr::tuple_constructor{selectors}),
contains_ttl(expr::tuple_constructor{selectors}))
contains_ttl(expr::tuple_constructor{selectors}),
contains_collection_mutation_attribute(expr::tuple_constructor{selectors}))
, _selectors(std::move(selectors))
{
auto agg_split = expr::split_aggregation(_selectors);
@@ -391,6 +402,7 @@ protected:
.static_and_regular_timestamps = rs._timestamps,
.static_and_regular_ttls = rs._ttls,
.temporaries = {},
.collection_element_metadata = rs._collection_element_metadata,
};
for (auto&& e : _sel._selectors) {
auto out = expr::evaluate(e, inputs);
@@ -429,6 +441,7 @@ protected:
.static_and_regular_timestamps = rs._timestamps,
.static_and_regular_ttls = rs._ttls,
.temporaries = _temporaries,
.collection_element_metadata = rs._collection_element_metadata,
};
for (size_t i = 0; i != _sel._inner_loop.size(); ++i) {
_temporaries[i] = expr::evaluate(_sel._inner_loop[i], inputs);
@@ -553,6 +566,9 @@ result_set_builder::result_set_builder(const selection& s, gc_clock::time_point
if (s._collect_TTLs) {
_ttls.resize(s._columns.size(), 0);
}
if (s._collect_collection_timestamps) {
_collection_element_metadata.resize(s._columns.size());
}
}
void result_set_builder::add_empty() {
@@ -563,6 +579,9 @@ void result_set_builder::add_empty() {
if (!_ttls.empty()) {
_ttls[current.size() - 1] = -1;
}
if (!_collection_element_metadata.empty()) {
_collection_element_metadata[current.size() - 1] = {};
}
}
void result_set_builder::add(bytes_opt value) {
@@ -585,8 +604,45 @@ void result_set_builder::add(const column_definition& def, const query::result_a
}
void result_set_builder::add_collection(const column_definition& def, bytes_view c) {
size_t col_idx = current.size();
if (!_collection_element_metadata.empty()) {
// Extended format produced by serialize_for_cql_with_timestamps()
// [uint32 cql_len][cql bytes][int32 entry_count]
// followed by entry_count entries, each:
// [int32 key_len][key bytes][int64 timestamp][int64 expiry_raw]
// where expiry_raw is -1 if the element does not expire, otherwise
// it is the serialized gc_clock time used to derive the remaining
// TTL. The flag _collect_collection_timestamps = true determines
// whether this extended format is used (instead of a plain CQL
// collection blob), and it is only enabled when a feature flag
// guarantees both reader and writer support it.
uint32_t cql_len = read_simple<uint32_t>(c);
bytes_view cql_bytes = read_simple_bytes(c, cql_len);
current.emplace_back(to_bytes(cql_bytes));
auto& meta = _collection_element_metadata[col_idx];
meta = {}; // clear stale data from previous row
int32_t entry_count = read_simple<int32_t>(c);
for (int32_t i = 0; i < entry_count; ++i) {
int32_t key_len = read_simple<int32_t>(c);
bytes key = to_bytes(read_simple_bytes(c, key_len));
int64_t ts = read_simple<int64_t>(c);
int64_t expiry_raw = read_simple<int64_t>(c);
meta.timestamps[key] = ts;
if (expiry_raw != -1) {
auto expiry = gc_clock::time_point(gc_clock::duration(expiry_raw));
auto ttl_left = expiry - _now;
int32_t ttl = int32_t(ttl_left.count());
if (ttl > 0) {
meta.ttls[key] = ttl;
}
// otherwise, expired or no TTL; We can omit this key from
// map - missing key is treated as null by the evaluator.
}
}
return;
}
current.emplace_back(to_bytes(c));
// timestamps, ttls meaningless for collections
}
void result_set_builder::update_last_group() {

View File

@@ -12,6 +12,7 @@
#include "utils/assert.hh"
#include "bytes.hh"
#include "cql3/expr/collection_cell_metadata.hh"
#include "schema/schema_fwd.hh"
#include "query/query-result-reader.hh"
#include "selector.hh"
@@ -69,6 +70,7 @@ private:
::shared_ptr<metadata> _metadata;
const bool _collect_timestamps;
const bool _collect_TTLs;
const bool _collect_collection_timestamps;
const bool _contains_static_columns;
bool _is_trivial;
protected:
@@ -78,7 +80,9 @@ protected:
std::vector<const column_definition*> columns,
std::vector<lw_shared_ptr<column_specification>> metadata_,
bool collect_timestamps,
bool collect_TTLs, trivial is_trivial = trivial::no);
bool collect_TTLs,
bool collect_collection_timestamps,
trivial is_trivial = trivial::no);
virtual ~selection() {}
public:
@@ -197,6 +201,7 @@ public:
std::vector<bytes> current_clustering_key;
std::vector<api::timestamp_type> _timestamps;
std::vector<int32_t> _ttls;
std::vector<cql3::expr::collection_cell_metadata> _collection_element_metadata;
const query_options* _options;
private:
const gc_clock::time_point _now;