cql3, schema_tables: Generalize function creation
When a function is created with the CREATE FUNCTION statement, the statement handler does all the necessary preparations on its own. The very same code exists in schema_tables, when the function is loaded on boot. This patch generalizes both and keeps function language-specific context creation inside lang/ code. The creation function returns context via argument reference. It would have been nicer if it was returned via future<>, but it's not suitable for future<T> type :( Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
@@ -174,7 +174,7 @@ public:
|
||||
return _cql_stats;
|
||||
}
|
||||
|
||||
lang::manager& wasm() { return _lang_manager; }
|
||||
lang::manager& lang() { return _lang_manager; }
|
||||
|
||||
db::system_keyspace::auth_version_t auth_version;
|
||||
|
||||
|
||||
@@ -39,27 +39,13 @@ seastar::future<shared_ptr<functions::function>> create_function_statement::crea
|
||||
}
|
||||
|
||||
auto&& db = qp.db();
|
||||
if (_language == "lua") {
|
||||
auto cfg = lua::make_runtime_config(db.get_config());
|
||||
functions::user_function::context ctx = functions::user_function::lua_context {
|
||||
.bitcode = lua::compile(cfg, arg_names, _body),
|
||||
.cfg = cfg,
|
||||
};
|
||||
|
||||
co_return ::make_shared<functions::user_function>(_name, _arg_types, std::move(arg_names), _body, _language,
|
||||
std::move(return_type), _called_on_null_input, std::move(ctx));
|
||||
} else if (_language == "wasm") {
|
||||
// FIXME: need better way to test wasm compilation without real_database()
|
||||
wasm::context ctx(qp.wasm(), _name.name, db.get_config().wasm_udf_yield_fuel(), db.get_config().wasm_udf_total_fuel());
|
||||
try {
|
||||
co_await qp.wasm().precompile(ctx, arg_names, _body);
|
||||
co_return ::make_shared<functions::user_function>(_name, _arg_types, std::move(arg_names), _body, _language,
|
||||
std::move(return_type), _called_on_null_input, std::move(ctx));
|
||||
} catch (const wasm::exception& we) {
|
||||
throw exceptions::invalid_request_exception(we.what());
|
||||
}
|
||||
lang::manager::context ctx;
|
||||
co_await qp.lang().create(_language, ctx, db.get_config(), _name.name, arg_names, _body);
|
||||
if (!ctx) {
|
||||
co_return nullptr;
|
||||
}
|
||||
co_return nullptr;
|
||||
co_return ::make_shared<functions::user_function>(_name, _arg_types, std::move(arg_names), _body, _language,
|
||||
std::move(return_type), _called_on_null_input, std::move(*ctx));
|
||||
}
|
||||
|
||||
std::unique_ptr<prepared_statement> create_function_statement::prepare(data_dictionary::database db, cql_stats& stats) {
|
||||
|
||||
@@ -1953,25 +1953,14 @@ static seastar::future<shared_ptr<cql3::functions::user_function>> create_func(r
|
||||
auto arg_names = get_list<sstring>(row, "argument_names");
|
||||
auto body = row.get_nonnull<sstring>("body");
|
||||
auto language = row.get_nonnull<sstring>("language");
|
||||
if (language == "lua") {
|
||||
lua::runtime_config cfg = lua::make_runtime_config(db.get_config());
|
||||
cql3::functions::user_function::context ctx = cql3::functions::user_function::lua_context {
|
||||
.bitcode = lua::compile(cfg, arg_names, body),
|
||||
.cfg = cfg,
|
||||
};
|
||||
|
||||
co_return ::make_shared<cql3::functions::user_function>(std::move(name), std::move(arg_types), std::move(arg_names),
|
||||
std::move(body), language, std::move(return_type),
|
||||
row.get_nonnull<bool>("called_on_null_input"), std::move(ctx));
|
||||
} else if (language == "wasm") {
|
||||
wasm::context ctx(db.wasm(), name.name, db.get_config().wasm_udf_yield_fuel(), db.get_config().wasm_udf_total_fuel());
|
||||
co_await db.wasm().precompile(ctx, arg_names, body);
|
||||
co_return ::make_shared<cql3::functions::user_function>(std::move(name), std::move(arg_types), std::move(arg_names),
|
||||
std::move(body), language, std::move(return_type),
|
||||
row.get_nonnull<bool>("called_on_null_input"), std::move(ctx));
|
||||
} else {
|
||||
lang::manager::context ctx;
|
||||
co_await db.lang().create(language, ctx, db.get_config(), name.name, arg_names, body);
|
||||
if (!ctx) {
|
||||
throw std::runtime_error(format("Unsupported language for UDF: {}", language));
|
||||
}
|
||||
co_return ::make_shared<cql3::functions::user_function>(std::move(name), std::move(arg_types), std::move(arg_names),
|
||||
std::move(body), language, std::move(return_type),
|
||||
row.get_nonnull<bool>("called_on_null_input"), std::move(*ctx));
|
||||
}
|
||||
|
||||
static shared_ptr<cql3::functions::user_aggregate> create_aggregate(replica::database& db, const query::result_set_row& row, const query::result_set_row* scylla_row) {
|
||||
@@ -2033,7 +2022,7 @@ static void drop_cached_func(replica::database& db, const query::result_set_row&
|
||||
cql3::functions::function_name name{
|
||||
row.get_nonnull<sstring>("keyspace_name"), row.get_nonnull<sstring>("function_name")};
|
||||
auto arg_types = read_arg_types(db, row, name.keyspace);
|
||||
db.wasm().remove(name, arg_types);
|
||||
db.lang().remove(name, arg_types);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
|
||||
#include "lang/wasm.hh"
|
||||
#include "lang/manager.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
|
||||
namespace lang {
|
||||
|
||||
@@ -38,4 +39,25 @@ future<> manager::stop() {
|
||||
}
|
||||
}
|
||||
|
||||
future<> manager::create(sstring language, context& ctx, const db::config& cfg, sstring name, const std::vector<sstring>& arg_names, std::string script) {
|
||||
if (language == "lua") {
|
||||
auto lua_cfg = lua::make_runtime_config(cfg);
|
||||
auto lua_ctx = cql3::functions::user_function::lua_context {
|
||||
.bitcode = lua::compile(lua_cfg, arg_names, script),
|
||||
.cfg = lua_cfg,
|
||||
};
|
||||
|
||||
ctx = std::move(lua_ctx);
|
||||
} else if (language == "wasm") {
|
||||
// FIXME: need better way to test wasm compilation without real_database()
|
||||
auto wasm_ctx = wasm::context(*this, name, cfg.wasm_udf_yield_fuel(), cfg.wasm_udf_total_fuel());
|
||||
try {
|
||||
co_await precompile(wasm_ctx, arg_names, script);
|
||||
} catch (const wasm::exception& we) {
|
||||
throw exceptions::invalid_request_exception(we.what());
|
||||
}
|
||||
ctx.emplace(std::move(wasm_ctx));
|
||||
}
|
||||
}
|
||||
|
||||
} // lang namespace
|
||||
|
||||
@@ -12,11 +12,14 @@
|
||||
#include "rust/wasmtime_bindings.hh"
|
||||
#include "lang/wasm_instance_cache.hh"
|
||||
#include "lang/wasm_alien_thread_runner.hh"
|
||||
#include "cql3/functions/user_function.hh"
|
||||
|
||||
namespace wasm {
|
||||
struct context;
|
||||
}
|
||||
|
||||
namespace db { class config; }
|
||||
|
||||
namespace lang {
|
||||
|
||||
class manager : public seastar::peering_sharded_service<manager> {
|
||||
@@ -42,6 +45,9 @@ public:
|
||||
void remove(const db::functions::function_name& name, const std::vector<data_type>& arg_types) noexcept {
|
||||
_instance_cache->remove(name, arg_types);
|
||||
}
|
||||
|
||||
using context = std::optional<cql3::functions::user_function::context>;
|
||||
future<> create(sstring language, context& ctx, const db::config& cfg, sstring name, const std::vector<sstring>& arg_names, std::string script);
|
||||
};
|
||||
|
||||
} // lang namespace
|
||||
|
||||
@@ -1625,8 +1625,8 @@ public:
|
||||
const locator::shared_token_metadata& get_shared_token_metadata() const { return _shared_token_metadata; }
|
||||
const locator::token_metadata& get_token_metadata() const { return *_shared_token_metadata.get(); }
|
||||
|
||||
lang::manager& wasm() noexcept { return _lang_manager; }
|
||||
const lang::manager& wasm() const noexcept { return _lang_manager; }
|
||||
lang::manager& lang() noexcept { return _lang_manager; }
|
||||
const lang::manager& lang() const noexcept { return _lang_manager; }
|
||||
|
||||
service::migration_notifier& get_notifier() { return _mnotifier; }
|
||||
const service::migration_notifier& get_notifier() const { return _mnotifier; }
|
||||
|
||||
Reference in New Issue
Block a user