cql3: expression: add helper to split expressions with aggregate functions

Aggregate functions cannot be evaluated directly, since they implicitly
refer to state (the accumulator). To allow for evaluation, we
split the expression into two: an inner expression that is evaluated
over the input vector (once per element). The inner expression calls
the aggregation function, with an extra input parameter (the accumulator).

The outer expression is evaluated once per input vector; it calls
the final function, and its input is just the accumulator. The outer
expression also contains any expressions that operate on the result
of the aggregate function.

The acculator is stored in a temporary.

Simple example:

   sum(x)

is transformed into an inner expression:

   t1 = (t1 + x)   // really sum.aggregation_function

and an outer expression:

   result = t1     // really sum.state_to_result_function

Complicated example:

    scalar_func(agg1(x, f1(y)), agg2(x, f2(y)))

is transformed into two inner expressions:

    t1 = agg1.aggregation_function(t1, x, f1(y))
    t2 = agg2.aggregation_function(t2, x, f2(y))

and an outer expression

    output = scalar_func(agg1.state_to_result_function(t1),
                         agg2.state_to_result_function(t2))

There's a small wart: automatically parallelized queries can generate
"reducible" aggregates that have no state_to_result function, since we
want to pass the state back to the coordinator. Detect that and short
circuit evaluation to pass the accumulator directly.
This commit is contained in:
Avi Kivity
2023-06-27 15:04:50 +03:00
parent f48ecb5049
commit a26516ef65
2 changed files with 90 additions and 0 deletions

View File

@@ -366,4 +366,25 @@ unsigned aggregation_depth(const cql3::expr::expression& e);
//
cql3::expr::expression levellize_aggregation_depth(const cql3::expr::expression& e, unsigned depth);
struct aggregation_split_result {
std::vector<expression> inner_loop;
std::vector<expression> outer_loop;
std::vector<cql3::raw_value> initial_values_for_temporaries; // same size as inner_loop
};
// Given a vector of aggergation expressions, split them into an inner loop that
// calls the aggregating function on each input row, and an outer loop that calls
// the final function on temporaries and generate the result.
//
// inner_loop should be evaluated with for each input row in a group, and its
// results stored in temporaries seeded from initial_values_for_temporaries
//
// outer_loop should be evaluated once for each group, just with temporaries
// as input.
//
// If the expressions don't contain aggregates, inner_loop and initial_values_for_temporaries
// are empty, and outer_loop should be evaluated for each loop.
aggregation_split_result split_aggregation(std::span<const expression> aggregation);
}

View File

@@ -2929,6 +2929,75 @@ levellize_aggregation_depth(const cql3::expr::expression& e, unsigned desired_de
}, e);
}
aggregation_split_result
split_aggregation(std::span<const expression> aggregation) {
size_t nr_temporaries = 0;
auto allocate_temporary = [&] () -> size_t {
return nr_temporaries++;
};
std::vector<expression> inner_vec;
std::vector<expression> outer_vec;
std::vector<raw_value> initial_values_vec;
for (auto& e : aggregation) {
auto outer = search_and_replace(e, [&] (const expression& e) -> std::optional<expression> {
auto fc = as_if<function_call>(&e);
if (!fc) {
return std::nullopt;
}
return std::visit(overloaded_functor{
[] (const functions::function_name& n) -> std::optional<expression> {
on_internal_error(expr_logger, "unprepared function_call in split_aggregation()");
},
[&] (const shared_ptr<functions::function>& fn) -> std::optional<expression> {
if (!fn->is_aggregate()) {
return std::nullopt;
}
// Split the aggregate. The aggregation function becomes the inner loop, the final
// function becomes the outer loop, and they're connected with a temporary.
auto temp = allocate_temporary();
auto agg_fn = dynamic_pointer_cast<cql3::functions::aggregate_function>(fn);
auto& agg = agg_fn->get_aggregate();
auto inner_fn = agg.aggregation_function;
auto outer_fn = agg.state_to_result_function;
auto inner_args = std::vector<expression>();
inner_args.push_back(temporary{.index = temp, .type = agg.state_type});
inner_args.insert(inner_args.end(), fc->args.begin(), fc->args.end());
auto inner = function_call{
.func = std::move(inner_fn),
.args = std::move(inner_args),
};
// The result of evaluating inner should be stored in the same temporary.
auto outer_args = std::vector<expression>();
outer_args.push_back(temporary{.index = temp, .type = agg.state_type});
auto outer = std::invoke([&] () -> expression {
if (outer_fn) {
return function_call{
.func = std::move(outer_fn),
.args = std::move(outer_args),
};
} else {
// When executing automatically parallelized queries,
// we get no state_to_result_function since we have to return
// the state. Just return the temporary that holds the state.
return outer_args[0];
}
});
inner_vec.push_back(std::move(inner));
initial_values_vec.push_back(raw_value::make_value(agg.initial_state));
return outer;
}
}, fc->func);
});
outer_vec.push_back(std::move(outer));
}
// Whew!
return aggregation_split_result{
.inner_loop = std::move(inner_vec),
.outer_loop = std::move(outer_vec),
.initial_values_for_temporaries = std::move(initial_values_vec),
};
}
} // namespace expr
} // namespace cql3