wasm: limit memory allocated using mmap

The wasmtime runtime allocates memory for the executable code of
the WASM programs using mmap and not the seastar allocator. As
a result, the memory that Scylla actually uses becomes not only
the memory preallocated for the seastar allocator but the sum of
that and the memory allocated for executable codes by the WASM
runtime.
To keep limiting the memory used by Scylla, we measure how much
memory do the WASM programs use and if they use too much, compiled
WASM UDFs (modules) that are currently not in use are evicted to
make room.
To evict a module it is required to evict all instances of this
module (the underlying implementation of modules and instances uses
shared pointers to the executable code). For this reason, we add
reference counts to modules. Each instance using a module is a
reference. When an instance is destroyed, a reference is removed.
If all references to a module are removed, the executable code
for this module is deallocated.
The eviction of a module is actually acheved by eviction of all
its references. When we want to free memory for a new module we
repeatedly evict instances from the wasm_instance_cache using its
LRU strategy until some module loses all its instances. This
process may not succeed if the instances currently in use (so not
in the cache) use too much memory - in this case the query also
fails. Otherwise the new module is added to the tracking system.
This strategy may evict some instances unnecessarily, but evicting
modules should not happen frequently, and any more efficient
solution requires an even bigger intervention into the code.
This commit is contained in:
Wojciech Mitros
2022-12-09 14:59:30 +01:00
parent b8d28a95bf
commit f05d612da8
9 changed files with 233 additions and 16 deletions

View File

@@ -50,7 +50,7 @@ shared_ptr<functions::function> create_function_statement::create(query_processo
// FIXME: need better way to test wasm compilation without real_database()
wasm::context ctx{db.real_database().wasm_engine(), _name.name, qp.get_wasm_instance_cache(), db.get_config().wasm_udf_yield_fuel(), db.get_config().wasm_udf_total_fuel()};
try {
wasm::compile(ctx, arg_names, _body);
wasm::precompile(ctx, arg_names, _body);
return ::make_shared<functions::user_function>(_name, _arg_types, std::move(arg_names), _body, _language,
std::move(return_type), _called_on_null_input, std::move(ctx));
} catch (const wasm::exception& we) {

View File

@@ -1734,7 +1734,7 @@ static shared_ptr<cql3::functions::user_function> create_func(replica::database&
row.get_nonnull<bool>("called_on_null_input"), std::move(ctx));
} else if (language == "xwasm") {
wasm::context ctx{db.wasm_engine(), name.name, qctx->qp().get_wasm_instance_cache(), db.get_config().wasm_udf_yield_fuel(), db.get_config().wasm_udf_total_fuel()};
wasm::compile(ctx, arg_names, body);
wasm::precompile(ctx, arg_names, body);
return ::make_shared<cql3::functions::user_function>(std::move(name), std::move(arg_types), std::move(arg_names),
std::move(body), language, std::move(return_type),
row.get_nonnull<bool>("called_on_null_input"), std::move(ctx));

View File

@@ -210,14 +210,16 @@ struct from_val_visitor {
}
};
void compile(context& ctx, const std::vector<sstring>& arg_names, std::string script) {
void precompile(context& ctx, const std::vector<sstring>& arg_names, std::string script) {
try {
ctx.module = wasmtime::create_module(ctx.engine_ptr, rust::Str(script.data(), script.size()));
// After precompiling the module, we try creating a store, an instance and a function with it to make sure it's valid.
// If we succeed, we drop them and keep the module, knowing that we will be able to create them again for UDF execution.
ctx.module.value()->compile(ctx.engine_ptr);
auto store = wasmtime::create_store(ctx.engine_ptr, ctx.total_fuel, ctx.yield_fuel);
auto inst = create_instance(ctx.engine_ptr, **ctx.module, *store);
create_func(*inst, *store, ctx.function_name);
ctx.module.value()->release();
} catch (const rust::Error& e) {
throw wasm::exception(e.what());
}
@@ -293,7 +295,10 @@ seastar::future<bytes_opt> run_script(const db::functions::function_name& name,
} catch (...) {
ex = std::current_exception();
}
ctx.cache->recycle(func_inst);
if (func_inst) {
// The construction of func_inst may have failed due to a insufficient free memory for compiled modules.
ctx.cache->recycle(func_inst);
}
if (ex) {
std::rethrow_exception(std::move(ex));
}

View File

@@ -41,7 +41,7 @@ struct context {
context(wasmtime::Engine& engine_ptr, std::string name, instance_cache* cache, uint64_t yield_fuel, uint64_t total_fuel);
};
void compile(context& ctx, const std::vector<sstring>& arg_names, std::string script);
void precompile(context& ctx, const std::vector<sstring>& arg_names, std::string script);
seastar::future<bytes_opt> run_script(const db::functions::function_name& name, context& ctx, const std::vector<data_type>& arg_types, const std::vector<bytes_opt>& params, data_type return_type, bool allow_null_input);

View File

@@ -7,14 +7,47 @@
*/
#include "lang/wasm_instance_cache.hh"
#include "lang/wasm.hh"
#include "seastar/core/metrics.hh"
#include "seastar/core/scheduling.hh"
#include <exception>
#include <seastar/core/units.hh>
#include <seastar/core/shared_mutex.hh>
#include <seastar/util/defer.hh>
#include <unistd.h>
namespace wasm {
static size_t compiled_size(const wasmtime::Module& module) noexcept {
// Round up the exact size to the nearest page size.
auto page_size = getpagesize();
return (module.raw_size() + (page_size - 1)) & (~(page_size - 1));
}
static size_t wasm_stack_size() noexcept {
// Wasm stack contains 2 stacks - one for wasm functions and one for
// host functions, both of which are 128KB - and a guard page.
return 256 * KB + getpagesize();
}
module_handle::module_handle(wasmtime::Module& module, instance_cache& cache, wasmtime::Engine& engine)
: _module(module)
, _cache(cache)
{
_cache.track_module_ref(_module, engine);
}
module_handle::module_handle(const module_handle& mh) noexcept
: _module(mh._module)
, _cache(mh._cache)
{
_module.add_user();
}
module_handle::~module_handle() noexcept {
_cache.remove_module_ref(_module);
}
static constexpr size_t WASM_PAGE_SIZE = 64 * KB;
instance_cache::stats& instance_cache::shard_stats() {
@@ -48,11 +81,18 @@ instance_cache::instance_cache(size_t size, size_t instance_size, seastar::lowre
}
wasm_instance instance_cache::load(wasm::context& ctx) {
auto mh = module_handle(**ctx.module, *this, ctx.engine_ptr);
auto store = wasmtime::create_store(ctx.engine_ptr, ctx.total_fuel, ctx.yield_fuel);
auto instance = wasmtime::create_instance(ctx.engine_ptr, **ctx.module, *store);
auto func = wasmtime::create_func(*instance, *store, ctx.function_name);
auto memory = wasmtime::get_memory(*instance, *store);
return wasm_instance{.store=std::move(store), .instance=std::move(instance), .func=std::move(func), .memory=std::move(memory)};
return wasm_instance{
.store = std::move(store),
.instance = std::move(instance),
.func = std::move(func),
.memory = std::move(memory),
.mh = std::move(mh)
};
}
// lru must not be empty, and its elements must refer to entries in _cache
@@ -92,6 +132,7 @@ seastar::future<instance_cache::value_type> instance_cache::get(const db::functi
.mutex = seastar::shared_mutex(),
.instance = std::nullopt,
.it = _lru.end(),
.module = *ctx.module.value(),
}));
}
auto& entry = it->second;
@@ -100,9 +141,21 @@ seastar::future<instance_cache::value_type> instance_cache::get(const db::functi
++shard_stats().cache_blocks;
}
return f.then([this, entry, &ctx] {
// When the instance leaves the cache, it should be ready to be used. For
// that, we need to make sure that there is enough free memory for the
// wasm runtime stack, that is allocated at the start of the UDF execution,
// and which is not allocated using seastar allocator, but using mmap.
reserve_wasm_stack();
if (!entry->instance) {
++shard_stats().cache_misses;
entry->instance = load(ctx);
try {
entry->instance.emplace(load(ctx));
} catch (...) {
// We couldn't actually use the compiled module, so we need to remove
// the reference to it.
std::exception_ptr ex = std::current_exception();
return make_exception_future<instance_cache::value_type>(std::move(ex));
}
} else {
// because we don't want to remove an instance after it starts being used,
// and also because we can't track its size efficiently, we remove it from
@@ -112,11 +165,13 @@ seastar::future<instance_cache::value_type> instance_cache::get(const db::functi
_lru.erase(entry->it);
entry->it = _lru.end();
}
return entry;
return make_ready_future<instance_cache::value_type>(entry);
});
}
void instance_cache::recycle(instance_cache::value_type val) noexcept {
// While the instance is in cache, it is not used and no stack is allocated for it.
free_wasm_stack();
val->mutex.unlock();
size_t size;
try {
@@ -165,6 +220,52 @@ void instance_cache::remove(const db::functions::function_name& name, const std:
}
}
void instance_cache::track_module_ref(wasmtime::Module& module, wasmtime::Engine& engine) {
if (!module.is_compiled()) {
size_t module_size = compiled_size(module);
while (_compiled_size + module_size > _max_compiled_size && !_lru.empty()) {
evict_modules();
}
if (_compiled_size + module_size > _max_compiled_size) {
throw wasm::exception("No memory left for the compiled WASM function");
}
module.compile(engine);
_compiled_size += module_size;
}
module.add_user();
}
void instance_cache::remove_module_ref(wasmtime::Module& module) noexcept {
module.remove_user();
if (module.user_count() == 0) {
module.release();
_compiled_size -= compiled_size(module);
}
}
void instance_cache::reserve_wasm_stack() {
size_t stack_size = wasm_stack_size();
while (!_lru.empty() && _compiled_size + stack_size > _max_compiled_size) {
evict_modules();
}
if (_compiled_size + stack_size > _max_compiled_size) {
throw wasm::exception("No memory left to execute the WASM function");
} else {
_compiled_size += stack_size;
}
}
void instance_cache::free_wasm_stack() noexcept {
_compiled_size -= wasm_stack_size();
}
void instance_cache::evict_modules() noexcept {
size_t prev_size = _compiled_size;
while (!_lru.empty() && _compiled_size == prev_size) {
evict_lru();
}
}
size_t instance_cache::size() const {
return _cache.size();
}

View File

@@ -22,11 +22,21 @@
namespace wasm {
class module_handle {
wasmtime::Module& _module;
instance_cache& _cache;
public:
module_handle(wasmtime::Module& module, instance_cache& cache, wasmtime::Engine& engine);
module_handle(const module_handle&) noexcept;
~module_handle() noexcept;
};
struct wasm_instance {
rust::Box<wasmtime::Store> store;
rust::Box<wasmtime::Instance> instance;
rust::Box<wasmtime::Func> func;
rust::Box<wasmtime::Memory> memory;
module_handle mh;
};
// For each UDF full name and a scheduling group, we store a wasmtime instance
@@ -74,6 +84,7 @@ private:
std::optional<wasm_instance> instance;
// iterator points to _lru.end() when the entry is being used (at that point, it is not in lru)
std::list<lru_entry_type>::iterator it;
wasmtime::Module& module;
};
public:
@@ -95,6 +106,12 @@ private:
size_t _total_size = 0;
size_t _max_size;
size_t _max_instance_size;
size_t _compiled_size = 0;
// The reserved size for compiled code (which is not allocated by the seastar allocator)
// is 50MB. We always leave some of this space free for the compilation of new instances
// - we only find out the real compiled size after the compilation finishes. (During
// the verification of the compiled code, we also allocate a new stack using this memory)
size_t _max_compiled_size = 40 * 1024 * 1024;
public:
explicit instance_cache(size_t size, size_t instance_size, seastar::lowres_clock::duration timer_period);
@@ -113,6 +130,42 @@ public:
void remove(const db::functions::function_name& name, const std::vector<data_type>& arg_types) noexcept;
private:
friend class module_handle;
// Wasmtime instances hold references to modules, so the module can only be dropped
// when all instances are dropped. For a given module, we can have at most one
// instance for each scheduling group.
// This function is called each time a new instance is created for a given module.
// If there were no instances for the module before, i.e. this module was not
// compiled, the module is compiled and the size of the compiled code is added
// to the total size of compiled code. If the total size of compiled code exceeds
// the maximum size as a result of this, the function will evict modules until
// there is enough space for the new module. If it is not possible, the function
// will throw an exception. If this function succeeds, the counter of instances
// for the module is increased by one.
void track_module_ref(wasmtime::Module& module, wasmtime::Engine& engine);
// This function is called each time an instance for a given module is dropped.
// If the counter of instances for the module reaches zero, the module is dropped
// and the size of the compiled code is subtracted from the total size of compiled code.
void remove_module_ref(wasmtime::Module& module) noexcept;
// When a WASM UDF is executed, a separate stack is first allocated for it.
// This stack is used by the WASM code and it is not tracked by the seastar allocator.
// This function will evict cached modules until the stack can be allocated. If enough
// memory can't be freed, the function will throw an exception.
void reserve_wasm_stack();
// This function should be called after a WASM UDF finishes execution. Its stack is then
// destroyed and this function accounts for the freed memory.
void free_wasm_stack() noexcept;
// Evicts instances using lru until a module is no longer referenced by any of them.
void evict_modules() noexcept;
public:
size_t size() const;
size_t max_size() const;

View File

@@ -457,6 +457,10 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
app_cfg.default_task_quota = 500us;
app_cfg.auto_handle_sigint_sigterm = false;
app_cfg.max_networking_aio_io_control_blocks = 50000;
// We need to have the entire app config to run the app, but we need to
// run the app to read the config file with UDF specific options so that
// we know whether we need to reserve additional memory for UDFs.
app_cfg.reserve_additional_memory = 50 * 1024 * 1024;
app_template app(std::move(app_cfg));
auto ext = std::make_shared<db::extensions>();

View File

@@ -37,6 +37,13 @@ mod ffi {
type Module;
fn create_module(engine: &mut Engine, script: &str) -> Result<Box<Module>>;
fn raw_size(self: &Module) -> usize;
fn is_compiled(self: &Module) -> bool;
fn compile(self: &mut Module, engine: &mut Engine) -> Result<()>;
fn release(self: &mut Module);
fn add_user(self: &mut Module);
fn remove_user(self: &mut Module);
fn user_count(self: &Module) -> usize;
type Store;
fn create_store(
@@ -97,8 +104,13 @@ pub struct Instance {
fn create_instance(engine: &Engine, module: &Module, store: &mut Store) -> Result<Box<Instance>> {
let mut linker = wasmtime::Linker::new(&engine.wasmtime_engine);
wasmtime_wasi::add_to_linker(&mut linker, |s| s).context("Failed to add wasi to linker")?;
let wasmtime_module = module
.wasmtime_module
.as_ref()
.ok_or_else(|| anyhow!("Module is not compiled"))?;
let mut inst_fut =
Box::pin(linker.instantiate_async(&mut store.wasmtime_store, &module.wasmtime_module));
Box::pin(linker.instantiate_async(&mut store.wasmtime_store, wasmtime_module));
let mut ctx = core::task::Context::from_waker(futures::task::noop_waker_ref());
loop {
@@ -119,15 +131,55 @@ fn create_instance(engine: &Engine, module: &Module, store: &mut Store) -> Resul
}
pub struct Module {
wasmtime_module: wasmtime::Module,
serialized_module: Vec<u8>,
wasmtime_module: Option<wasmtime::Module>,
references: usize,
}
fn create_module(engine: &mut Engine, script: &str) -> Result<Box<Module>> {
let module = wasmtime::Module::new(&engine.wasmtime_engine, script)
let module_bytes = engine
.wasmtime_engine
.precompile_module((&script).as_bytes())
.map_err(|e| anyhow!("Compilation failed: {:?}", e))?;
Ok(Box::new(Module {
wasmtime_module: module,
}))
let module = Box::new(Module {
serialized_module: module_bytes,
wasmtime_module: None,
references: 0,
});
Ok(module)
}
impl Module {
fn raw_size(&self) -> usize {
self.serialized_module.len()
}
fn is_compiled(&self) -> bool {
self.wasmtime_module.is_some()
}
fn compile(&mut self, engine: &mut Engine) -> Result<()> {
if self.is_compiled() {
return Ok(());
}
// `deserialize` is safe because we put the result of `precompile_module` as input.
let module = unsafe {
wasmtime::Module::deserialize(&engine.wasmtime_engine, &self.serialized_module)
.map_err(|e| anyhow!("Deserialization failed: {:?}", e))?
};
self.wasmtime_module = Some(module);
Ok(())
}
fn release(&mut self) {
self.wasmtime_module = None;
}
fn add_user(&mut self) {
self.references += 1;
}
fn remove_user(&mut self) {
self.references -= 1;
}
fn user_count(&self) -> usize {
self.references
}
}
pub struct Store {
@@ -214,6 +266,8 @@ fn create_engine(max_size: u32) -> Result<Box<Engine>> {
config.static_memory_maximum_size(0);
config.dynamic_memory_reserved_for_growth(0);
config.dynamic_memory_guard_size(0);
config.max_wasm_stack(128 * 1024);
config.async_stack_size(256 * 1024);
let engine =
wasmtime::Engine::new(&config).map_err(|e| anyhow!("Failed to create engine: {:?}", e))?;

View File

@@ -20,7 +20,7 @@ SEASTAR_TEST_CASE(test_long_udf_yields) {
auto wasm_cache = std::make_unique<wasm::instance_cache>(100 * 1024 * 1024, 1024 * 1024, std::chrono::seconds(1));
auto wasm_ctx = wasm::context(*wasm_engine, "fib", wasm_cache.get(), 100000, 100000000000);
// Recursive fibonacci function
wasm::compile(wasm_ctx, {}, R"(
wasm::precompile(wasm_ctx, {}, R"(
(module
(type (;0;) (func (param i64) (result i64)))
(func (;0;) (type 0) (param i64) (result i64)
@@ -62,7 +62,7 @@ SEASTAR_TEST_CASE(test_long_udf_yields) {
(export "_scylla_abi" (global 0))
(data (;0;) (i32.const 1024) "01"))
)");
wasm_ctx.module.value()->compile(*wasm_engine);
auto argv = wasmtime::get_val_vec();
argv->push_i64(42);
auto rets = wasmtime::get_val_vec();