From a26516ef650d3fa3a34736624fdd90397ffa0830 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Tue, 27 Jun 2023 15:04:50 +0300 Subject: [PATCH] cql3: expression: add helper to split expressions with aggregate functions Aggregate functions cannot be evaluated directly, since they implicitly refer to state (the accumulator). To allow for evaluation, we split the expression into two: an inner expression that is evaluated over the input vector (once per element). The inner expression calls the aggregation function, with an extra input parameter (the accumulator). The outer expression is evaluated once per input vector; it calls the final function, and its input is just the accumulator. The outer expression also contains any expressions that operate on the result of the aggregate function. The acculator is stored in a temporary. Simple example: sum(x) is transformed into an inner expression: t1 = (t1 + x) // really sum.aggregation_function and an outer expression: result = t1 // really sum.state_to_result_function Complicated example: scalar_func(agg1(x, f1(y)), agg2(x, f2(y))) is transformed into two inner expressions: t1 = agg1.aggregation_function(t1, x, f1(y)) t2 = agg2.aggregation_function(t2, x, f2(y)) and an outer expression output = scalar_func(agg1.state_to_result_function(t1), agg2.state_to_result_function(t2)) There's a small wart: automatically parallelized queries can generate "reducible" aggregates that have no state_to_result function, since we want to pass the state back to the coordinator. Detect that and short circuit evaluation to pass the accumulator directly. --- cql3/expr/expr-utils.hh | 21 +++++++++++++ cql3/expr/expression.cc | 69 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+) diff --git a/cql3/expr/expr-utils.hh b/cql3/expr/expr-utils.hh index baf9916cbd..7915e51396 100644 --- a/cql3/expr/expr-utils.hh +++ b/cql3/expr/expr-utils.hh @@ -366,4 +366,25 @@ unsigned aggregation_depth(const cql3::expr::expression& e); // cql3::expr::expression levellize_aggregation_depth(const cql3::expr::expression& e, unsigned depth); + +struct aggregation_split_result { + std::vector inner_loop; + std::vector outer_loop; + std::vector initial_values_for_temporaries; // same size as inner_loop +}; + +// Given a vector of aggergation expressions, split them into an inner loop that +// calls the aggregating function on each input row, and an outer loop that calls +// the final function on temporaries and generate the result. +// +// inner_loop should be evaluated with for each input row in a group, and its +// results stored in temporaries seeded from initial_values_for_temporaries +// +// outer_loop should be evaluated once for each group, just with temporaries +// as input. +// +// If the expressions don't contain aggregates, inner_loop and initial_values_for_temporaries +// are empty, and outer_loop should be evaluated for each loop. +aggregation_split_result split_aggregation(std::span aggregation); + } \ No newline at end of file diff --git a/cql3/expr/expression.cc b/cql3/expr/expression.cc index b683f74231..c75140c45f 100644 --- a/cql3/expr/expression.cc +++ b/cql3/expr/expression.cc @@ -2929,6 +2929,75 @@ levellize_aggregation_depth(const cql3::expr::expression& e, unsigned desired_de }, e); } +aggregation_split_result +split_aggregation(std::span aggregation) { + size_t nr_temporaries = 0; + auto allocate_temporary = [&] () -> size_t { + return nr_temporaries++; + }; + std::vector inner_vec; + std::vector outer_vec; + std::vector initial_values_vec; + for (auto& e : aggregation) { + auto outer = search_and_replace(e, [&] (const expression& e) -> std::optional { + auto fc = as_if(&e); + if (!fc) { + return std::nullopt; + } + return std::visit(overloaded_functor{ + [] (const functions::function_name& n) -> std::optional { + on_internal_error(expr_logger, "unprepared function_call in split_aggregation()"); + }, + [&] (const shared_ptr& fn) -> std::optional { + if (!fn->is_aggregate()) { + return std::nullopt; + } + // Split the aggregate. The aggregation function becomes the inner loop, the final + // function becomes the outer loop, and they're connected with a temporary. + auto temp = allocate_temporary(); + auto agg_fn = dynamic_pointer_cast(fn); + auto& agg = agg_fn->get_aggregate(); + auto inner_fn = agg.aggregation_function; + auto outer_fn = agg.state_to_result_function; + auto inner_args = std::vector(); + inner_args.push_back(temporary{.index = temp, .type = agg.state_type}); + inner_args.insert(inner_args.end(), fc->args.begin(), fc->args.end()); + auto inner = function_call{ + .func = std::move(inner_fn), + .args = std::move(inner_args), + }; + // The result of evaluating inner should be stored in the same temporary. + auto outer_args = std::vector(); + outer_args.push_back(temporary{.index = temp, .type = agg.state_type}); + auto outer = std::invoke([&] () -> expression { + if (outer_fn) { + return function_call{ + .func = std::move(outer_fn), + .args = std::move(outer_args), + }; + } else { + // When executing automatically parallelized queries, + // we get no state_to_result_function since we have to return + // the state. Just return the temporary that holds the state. + return outer_args[0]; + } + }); + inner_vec.push_back(std::move(inner)); + initial_values_vec.push_back(raw_value::make_value(agg.initial_state)); + return outer; + } + }, fc->func); + }); + outer_vec.push_back(std::move(outer)); + } + // Whew! + return aggregation_split_result{ + .inner_loop = std::move(inner_vec), + .outer_loop = std::move(outer_vec), + .initial_values_for_temporaries = std::move(initial_values_vec), + }; +} + } // namespace expr } // namespace cql3