From 882b2f4e9f51dd71d228afa4ffcd86ff9d861fc3 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 6 Jun 2024 20:11:04 +0300 Subject: [PATCH] cql3, schema_tables: Generalize function creation When a function is created with the CREATE FUNCTION statement, the statement handler does all the necessary preparations on its own. The very same code exists in schema_tables, when the function is loaded on boot. This patch generalizes both and keeps function language-specific context creation inside lang/ code. The creation function returns context via argument reference. It would have been nicer if it was returned via future<>, but it's not suitable for future type :( Signed-off-by: Pavel Emelyanov --- cql3/query_processor.hh | 2 +- cql3/statements/create_function_statement.cc | 26 +++++--------------- db/schema_tables.cc | 25 ++++++------------- lang/manager.cc | 22 +++++++++++++++++ lang/manager.hh | 6 +++++ replica/database.hh | 4 +-- 6 files changed, 44 insertions(+), 41 deletions(-) diff --git a/cql3/query_processor.hh b/cql3/query_processor.hh index 6889dba7eb..b6ae18dad0 100644 --- a/cql3/query_processor.hh +++ b/cql3/query_processor.hh @@ -174,7 +174,7 @@ public: return _cql_stats; } - lang::manager& wasm() { return _lang_manager; } + lang::manager& lang() { return _lang_manager; } db::system_keyspace::auth_version_t auth_version; diff --git a/cql3/statements/create_function_statement.cc b/cql3/statements/create_function_statement.cc index 3614df4046..b1d0448d9c 100644 --- a/cql3/statements/create_function_statement.cc +++ b/cql3/statements/create_function_statement.cc @@ -39,27 +39,13 @@ seastar::future> create_function_statement::crea } auto&& db = qp.db(); - if (_language == "lua") { - auto cfg = lua::make_runtime_config(db.get_config()); - functions::user_function::context ctx = functions::user_function::lua_context { - .bitcode = lua::compile(cfg, arg_names, _body), - .cfg = cfg, - }; - - co_return ::make_shared(_name, _arg_types, std::move(arg_names), _body, _language, - std::move(return_type), _called_on_null_input, std::move(ctx)); - } else if (_language == "wasm") { - // FIXME: need better way to test wasm compilation without real_database() - wasm::context ctx(qp.wasm(), _name.name, db.get_config().wasm_udf_yield_fuel(), db.get_config().wasm_udf_total_fuel()); - try { - co_await qp.wasm().precompile(ctx, arg_names, _body); - co_return ::make_shared(_name, _arg_types, std::move(arg_names), _body, _language, - std::move(return_type), _called_on_null_input, std::move(ctx)); - } catch (const wasm::exception& we) { - throw exceptions::invalid_request_exception(we.what()); - } + lang::manager::context ctx; + co_await qp.lang().create(_language, ctx, db.get_config(), _name.name, arg_names, _body); + if (!ctx) { + co_return nullptr; } - co_return nullptr; + co_return ::make_shared(_name, _arg_types, std::move(arg_names), _body, _language, + std::move(return_type), _called_on_null_input, std::move(*ctx)); } std::unique_ptr create_function_statement::prepare(data_dictionary::database db, cql_stats& stats) { diff --git a/db/schema_tables.cc b/db/schema_tables.cc index 266ed845fa..1e1ca6dc2e 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -1953,25 +1953,14 @@ static seastar::future> create_func(r auto arg_names = get_list(row, "argument_names"); auto body = row.get_nonnull("body"); auto language = row.get_nonnull("language"); - if (language == "lua") { - lua::runtime_config cfg = lua::make_runtime_config(db.get_config()); - cql3::functions::user_function::context ctx = cql3::functions::user_function::lua_context { - .bitcode = lua::compile(cfg, arg_names, body), - .cfg = cfg, - }; - - co_return ::make_shared(std::move(name), std::move(arg_types), std::move(arg_names), - std::move(body), language, std::move(return_type), - row.get_nonnull("called_on_null_input"), std::move(ctx)); - } else if (language == "wasm") { - wasm::context ctx(db.wasm(), name.name, db.get_config().wasm_udf_yield_fuel(), db.get_config().wasm_udf_total_fuel()); - co_await db.wasm().precompile(ctx, arg_names, body); - co_return ::make_shared(std::move(name), std::move(arg_types), std::move(arg_names), - std::move(body), language, std::move(return_type), - row.get_nonnull("called_on_null_input"), std::move(ctx)); - } else { + lang::manager::context ctx; + co_await db.lang().create(language, ctx, db.get_config(), name.name, arg_names, body); + if (!ctx) { throw std::runtime_error(format("Unsupported language for UDF: {}", language)); } + co_return ::make_shared(std::move(name), std::move(arg_types), std::move(arg_names), + std::move(body), language, std::move(return_type), + row.get_nonnull("called_on_null_input"), std::move(*ctx)); } static shared_ptr create_aggregate(replica::database& db, const query::result_set_row& row, const query::result_set_row* scylla_row) { @@ -2033,7 +2022,7 @@ static void drop_cached_func(replica::database& db, const query::result_set_row& cql3::functions::function_name name{ row.get_nonnull("keyspace_name"), row.get_nonnull("function_name")}; auto arg_types = read_arg_types(db, row, name.keyspace); - db.wasm().remove(name, arg_types); + db.lang().remove(name, arg_types); } } diff --git a/lang/manager.cc b/lang/manager.cc index 9ba90f4667..cf8921028d 100644 --- a/lang/manager.cc +++ b/lang/manager.cc @@ -8,6 +8,7 @@ #include "lang/wasm.hh" #include "lang/manager.hh" +#include "exceptions/exceptions.hh" namespace lang { @@ -38,4 +39,25 @@ future<> manager::stop() { } } +future<> manager::create(sstring language, context& ctx, const db::config& cfg, sstring name, const std::vector& arg_names, std::string script) { + if (language == "lua") { + auto lua_cfg = lua::make_runtime_config(cfg); + auto lua_ctx = cql3::functions::user_function::lua_context { + .bitcode = lua::compile(lua_cfg, arg_names, script), + .cfg = lua_cfg, + }; + + ctx = std::move(lua_ctx); + } else if (language == "wasm") { + // FIXME: need better way to test wasm compilation without real_database() + auto wasm_ctx = wasm::context(*this, name, cfg.wasm_udf_yield_fuel(), cfg.wasm_udf_total_fuel()); + try { + co_await precompile(wasm_ctx, arg_names, script); + } catch (const wasm::exception& we) { + throw exceptions::invalid_request_exception(we.what()); + } + ctx.emplace(std::move(wasm_ctx)); + } +} + } // lang namespace diff --git a/lang/manager.hh b/lang/manager.hh index 9c65390bf2..2e58a2c464 100644 --- a/lang/manager.hh +++ b/lang/manager.hh @@ -12,11 +12,14 @@ #include "rust/wasmtime_bindings.hh" #include "lang/wasm_instance_cache.hh" #include "lang/wasm_alien_thread_runner.hh" +#include "cql3/functions/user_function.hh" namespace wasm { struct context; } +namespace db { class config; } + namespace lang { class manager : public seastar::peering_sharded_service { @@ -42,6 +45,9 @@ public: void remove(const db::functions::function_name& name, const std::vector& arg_types) noexcept { _instance_cache->remove(name, arg_types); } + + using context = std::optional; + future<> create(sstring language, context& ctx, const db::config& cfg, sstring name, const std::vector& arg_names, std::string script); }; } // lang namespace diff --git a/replica/database.hh b/replica/database.hh index 77c58f97e4..3d1f92ff9a 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1625,8 +1625,8 @@ public: const locator::shared_token_metadata& get_shared_token_metadata() const { return _shared_token_metadata; } const locator::token_metadata& get_token_metadata() const { return *_shared_token_metadata.get(); } - lang::manager& wasm() noexcept { return _lang_manager; } - const lang::manager& wasm() const noexcept { return _lang_manager; } + lang::manager& lang() noexcept { return _lang_manager; } + const lang::manager& lang() const noexcept { return _lang_manager; } service::migration_notifier& get_notifier() { return _mnotifier; } const service::migration_notifier& get_notifier() const { return _mnotifier; }