cql3, schema_tables: Generalize function creation

When a function is created with the CREATE FUNCTION statement, the
statement handler does all the necessary preparations on its own. The
very same code exists in schema_tables, when the function is loaded on
boot. This patch generalizes both and keeps function language-specific
context creation inside lang/ code.

The creation function returns context via argument reference. It would
have been nicer if it was returned via future<>, but it's not suitable
for future<T> type :(

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2024-06-06 20:11:04 +03:00
parent fe7ff7172d
commit 882b2f4e9f
6 changed files with 44 additions and 41 deletions

View File

@@ -174,7 +174,7 @@ public:
return _cql_stats;
}
lang::manager& wasm() { return _lang_manager; }
lang::manager& lang() { return _lang_manager; }
db::system_keyspace::auth_version_t auth_version;

View File

@@ -39,27 +39,13 @@ seastar::future<shared_ptr<functions::function>> create_function_statement::crea
}
auto&& db = qp.db();
if (_language == "lua") {
auto cfg = lua::make_runtime_config(db.get_config());
functions::user_function::context ctx = functions::user_function::lua_context {
.bitcode = lua::compile(cfg, arg_names, _body),
.cfg = cfg,
};
co_return ::make_shared<functions::user_function>(_name, _arg_types, std::move(arg_names), _body, _language,
std::move(return_type), _called_on_null_input, std::move(ctx));
} else if (_language == "wasm") {
// FIXME: need better way to test wasm compilation without real_database()
wasm::context ctx(qp.wasm(), _name.name, db.get_config().wasm_udf_yield_fuel(), db.get_config().wasm_udf_total_fuel());
try {
co_await qp.wasm().precompile(ctx, arg_names, _body);
co_return ::make_shared<functions::user_function>(_name, _arg_types, std::move(arg_names), _body, _language,
std::move(return_type), _called_on_null_input, std::move(ctx));
} catch (const wasm::exception& we) {
throw exceptions::invalid_request_exception(we.what());
}
lang::manager::context ctx;
co_await qp.lang().create(_language, ctx, db.get_config(), _name.name, arg_names, _body);
if (!ctx) {
co_return nullptr;
}
co_return nullptr;
co_return ::make_shared<functions::user_function>(_name, _arg_types, std::move(arg_names), _body, _language,
std::move(return_type), _called_on_null_input, std::move(*ctx));
}
std::unique_ptr<prepared_statement> create_function_statement::prepare(data_dictionary::database db, cql_stats& stats) {

View File

@@ -1953,25 +1953,14 @@ static seastar::future<shared_ptr<cql3::functions::user_function>> create_func(r
auto arg_names = get_list<sstring>(row, "argument_names");
auto body = row.get_nonnull<sstring>("body");
auto language = row.get_nonnull<sstring>("language");
if (language == "lua") {
lua::runtime_config cfg = lua::make_runtime_config(db.get_config());
cql3::functions::user_function::context ctx = cql3::functions::user_function::lua_context {
.bitcode = lua::compile(cfg, arg_names, body),
.cfg = cfg,
};
co_return ::make_shared<cql3::functions::user_function>(std::move(name), std::move(arg_types), std::move(arg_names),
std::move(body), language, std::move(return_type),
row.get_nonnull<bool>("called_on_null_input"), std::move(ctx));
} else if (language == "wasm") {
wasm::context ctx(db.wasm(), name.name, db.get_config().wasm_udf_yield_fuel(), db.get_config().wasm_udf_total_fuel());
co_await db.wasm().precompile(ctx, arg_names, body);
co_return ::make_shared<cql3::functions::user_function>(std::move(name), std::move(arg_types), std::move(arg_names),
std::move(body), language, std::move(return_type),
row.get_nonnull<bool>("called_on_null_input"), std::move(ctx));
} else {
lang::manager::context ctx;
co_await db.lang().create(language, ctx, db.get_config(), name.name, arg_names, body);
if (!ctx) {
throw std::runtime_error(format("Unsupported language for UDF: {}", language));
}
co_return ::make_shared<cql3::functions::user_function>(std::move(name), std::move(arg_types), std::move(arg_names),
std::move(body), language, std::move(return_type),
row.get_nonnull<bool>("called_on_null_input"), std::move(*ctx));
}
static shared_ptr<cql3::functions::user_aggregate> create_aggregate(replica::database& db, const query::result_set_row& row, const query::result_set_row* scylla_row) {
@@ -2033,7 +2022,7 @@ static void drop_cached_func(replica::database& db, const query::result_set_row&
cql3::functions::function_name name{
row.get_nonnull<sstring>("keyspace_name"), row.get_nonnull<sstring>("function_name")};
auto arg_types = read_arg_types(db, row, name.keyspace);
db.wasm().remove(name, arg_types);
db.lang().remove(name, arg_types);
}
}

View File

@@ -8,6 +8,7 @@
#include "lang/wasm.hh"
#include "lang/manager.hh"
#include "exceptions/exceptions.hh"
namespace lang {
@@ -38,4 +39,25 @@ future<> manager::stop() {
}
}
future<> manager::create(sstring language, context& ctx, const db::config& cfg, sstring name, const std::vector<sstring>& arg_names, std::string script) {
if (language == "lua") {
auto lua_cfg = lua::make_runtime_config(cfg);
auto lua_ctx = cql3::functions::user_function::lua_context {
.bitcode = lua::compile(lua_cfg, arg_names, script),
.cfg = lua_cfg,
};
ctx = std::move(lua_ctx);
} else if (language == "wasm") {
// FIXME: need better way to test wasm compilation without real_database()
auto wasm_ctx = wasm::context(*this, name, cfg.wasm_udf_yield_fuel(), cfg.wasm_udf_total_fuel());
try {
co_await precompile(wasm_ctx, arg_names, script);
} catch (const wasm::exception& we) {
throw exceptions::invalid_request_exception(we.what());
}
ctx.emplace(std::move(wasm_ctx));
}
}
} // lang namespace

View File

@@ -12,11 +12,14 @@
#include "rust/wasmtime_bindings.hh"
#include "lang/wasm_instance_cache.hh"
#include "lang/wasm_alien_thread_runner.hh"
#include "cql3/functions/user_function.hh"
namespace wasm {
struct context;
}
namespace db { class config; }
namespace lang {
class manager : public seastar::peering_sharded_service<manager> {
@@ -42,6 +45,9 @@ public:
void remove(const db::functions::function_name& name, const std::vector<data_type>& arg_types) noexcept {
_instance_cache->remove(name, arg_types);
}
using context = std::optional<cql3::functions::user_function::context>;
future<> create(sstring language, context& ctx, const db::config& cfg, sstring name, const std::vector<sstring>& arg_names, std::string script);
};
} // lang namespace

View File

@@ -1625,8 +1625,8 @@ public:
const locator::shared_token_metadata& get_shared_token_metadata() const { return _shared_token_metadata; }
const locator::token_metadata& get_token_metadata() const { return *_shared_token_metadata.get(); }
lang::manager& wasm() noexcept { return _lang_manager; }
const lang::manager& wasm() const noexcept { return _lang_manager; }
lang::manager& lang() noexcept { return _lang_manager; }
const lang::manager& lang() const noexcept { return _lang_manager; }
service::migration_notifier& get_notifier() { return _mnotifier; }
const service::migration_notifier& get_notifier() const { return _mnotifier; }