diff --git a/cql3/expr/expr-utils.hh b/cql3/expr/expr-utils.hh index baf9916cbd..7915e51396 100644 --- a/cql3/expr/expr-utils.hh +++ b/cql3/expr/expr-utils.hh @@ -366,4 +366,25 @@ unsigned aggregation_depth(const cql3::expr::expression& e); // cql3::expr::expression levellize_aggregation_depth(const cql3::expr::expression& e, unsigned depth); + +struct aggregation_split_result { + std::vector inner_loop; + std::vector outer_loop; + std::vector initial_values_for_temporaries; // same size as inner_loop +}; + +// Given a vector of aggregation expressions, split them into an inner loop that +// calls the aggregating function on each input row, and an outer loop that calls +// the final function on temporaries and generates the result. +// +// inner_loop should be evaluated for each input row in a group, and its +// results stored in temporaries seeded from initial_values_for_temporaries. +// +// outer_loop should be evaluated once for each group, just with temporaries +// as input. +// +// If the expressions don't contain aggregates, inner_loop and initial_values_for_temporaries +// are empty, and outer_loop should be evaluated for each loop iteration. 
+aggregation_split_result split_aggregation(std::span aggregation); + } \ No newline at end of file diff --git a/cql3/expr/expression.cc b/cql3/expr/expression.cc index b683f74231..c75140c45f 100644 --- a/cql3/expr/expression.cc +++ b/cql3/expr/expression.cc @@ -2929,6 +2929,75 @@ levellize_aggregation_depth(const cql3::expr::expression& e, unsigned desired_de }, e); } +aggregation_split_result +split_aggregation(std::span aggregation) { + size_t nr_temporaries = 0; + auto allocate_temporary = [&] () -> size_t { + return nr_temporaries++; + }; + std::vector inner_vec; + std::vector outer_vec; + std::vector initial_values_vec; + for (auto& e : aggregation) { + auto outer = search_and_replace(e, [&] (const expression& e) -> std::optional { + auto fc = as_if(&e); + if (!fc) { + return std::nullopt; + } + return std::visit(overloaded_functor{ + [] (const functions::function_name& n) -> std::optional { + on_internal_error(expr_logger, "unprepared function_call in split_aggregation()"); + }, + [&] (const shared_ptr& fn) -> std::optional { + if (!fn->is_aggregate()) { + return std::nullopt; + } + // Split the aggregate. The aggregation function becomes the inner loop, the final + // function becomes the outer loop, and they're connected with a temporary. + auto temp = allocate_temporary(); + auto agg_fn = dynamic_pointer_cast(fn); + auto& agg = agg_fn->get_aggregate(); + auto inner_fn = agg.aggregation_function; + auto outer_fn = agg.state_to_result_function; + auto inner_args = std::vector(); + inner_args.push_back(temporary{.index = temp, .type = agg.state_type}); + inner_args.insert(inner_args.end(), fc->args.begin(), fc->args.end()); + auto inner = function_call{ + .func = std::move(inner_fn), + .args = std::move(inner_args), + }; + // The result of evaluating inner should be stored back in the same temporary (index temp). 
+ auto outer_args = std::vector(); + outer_args.push_back(temporary{.index = temp, .type = agg.state_type}); + auto outer = std::invoke([&] () -> expression { + if (outer_fn) { + return function_call{ + .func = std::move(outer_fn), + .args = std::move(outer_args), + }; + } else { + // When executing automatically parallelized queries, + // we get no state_to_result_function since we have to return + // the state. Just return the temporary that holds the state. + return outer_args[0]; + } + }); + inner_vec.push_back(std::move(inner)); + initial_values_vec.push_back(raw_value::make_value(agg.initial_state)); + return outer; + } + }, fc->func); + }); + outer_vec.push_back(std::move(outer)); + } + // Bundle the inner-loop expressions, the outer-loop expressions, and the temporaries' initial values. + return aggregation_split_result{ + .inner_loop = std::move(inner_vec), + .outer_loop = std::move(outer_vec), + .initial_values_for_temporaries = std::move(initial_values_vec), + }; +} + } // namespace expr } // namespace cql3