Enable all counter-related tests that were disabled for tablets because, until now, counters were not supported with tablets. To avoid losing coverage, some tests had been parametrized to run with both vnodes and tablets, with the tablets case skipped. We change them to run with the default configuration: counters are now supported with both vnodes and tablets, and the implementation is the same for both, so there is no benefit in running them under both configurations.
747 lines
35 KiB
Python
747 lines
35 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright 2021-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
|
|
#############################################################################
|
|
# Tests for user defined functions defined with WebAssembly backend
|
|
#############################################################################
|
|
|
|
from cassandra.protocol import InvalidRequest
|
|
from cassandra.cluster import NoHostAvailable
|
|
from .util import new_test_table, unique_name, new_function, new_aggregate
|
|
|
|
import pytest
|
|
import requests
|
|
import re
|
|
import os.path
|
|
|
|
@pytest.fixture(scope="module")
def scylla_with_wasm_only(scylla_only, cql, test_keyspace):
    """Skip the requesting test unless Scylla has WASM UDF support compiled in.

    Support is probed by creating, and immediately dropping, a trivial WASM
    function in ``test_keyspace``. If that fails with a ``NoHostAvailable``
    whose message contains "not enabled", the test is skipped. Any other
    ``NoHostAvailable`` is re-raised so a genuinely broken cluster is
    reported instead of being silently ignored. Also requests the
    ``scylla_only`` fixture.
    """
    f42 = unique_name()
    f42_body = f'(module(func ${f42} (param $n i64) (result i64)(return i64.const 42))(export "{f42}" (func ${f42})))'
    try:
        cql.execute(f"CREATE FUNCTION {test_keyspace}.{f42} (input int) RETURNS NULL ON NULL INPUT RETURNS int LANGUAGE wasm AS '{f42_body}'")
        cql.execute(f"DROP FUNCTION {test_keyspace}.{f42}")
    except NoHostAvailable as err:
        if "not enabled" in str(err):
            pytest.skip("WASM support was not enabled in Scylla, skipping")
        # Not the "WASM disabled" case: do not hide an unrelated failure.
        raise
    yield
|
|
|
|
@pytest.fixture(scope="module")
def table1(cql, test_keyspace):
    """Module-wide scratch table shared by the WASM UDF tests.

    Partition key ``p`` (bigint) plus regular columns of the scalar types the
    UDFs below are called with. Yields the keyspace-qualified table name and
    drops the table once the module's tests are done.
    """
    name = f"{test_keyspace}.{unique_name()}"
    columns = ("(p bigint PRIMARY KEY, p2 bigint, i int, i2 int, s smallint, s2 smallint, "
               "t tinyint, t2 tinyint, d double, f float, bl boolean, txt text)")
    cql.execute(f"CREATE TABLE {name}{columns}")
    yield name
    cql.execute(f"DROP TABLE {name}")
|
|
|
|
def test_fib(cql, test_keyspace, table1, scylla_with_wasm_only):
    """A recursive WASM fibonacci UDF (bigint -> bigint) can be called.

    Also checks that it returns NULL for a NULL argument (RETURNS NULL ON NULL
    INPUT), and that a call needing too much work is aborted with an error.
    """
    fib_name = unique_name()
    fib_source = f"""
(module
  (func ${fib_name} (param $n i64) (result i64)
    (if
      (i64.lt_s (local.get $n) (i64.const 2))
      (then
        (return (local.get $n))
      )
    )
    (i64.add
      (call ${fib_name} (i64.sub (local.get $n) (i64.const 1)))
      (call ${fib_name} (i64.sub (local.get $n) (i64.const 2)))
    )
  )
  (memory (;0;) 17)
  (export "memory" (memory 0))
  (export "{fib_name}" (func ${fib_name}))
)
"""
    src = f"(input bigint) RETURNS NULL ON NULL INPUT RETURNS bigint LANGUAGE wasm AS '{fib_source}'"
    fib_call = f"{test_keyspace}.{fib_name}"
    with new_function(cql, test_keyspace, src, fib_name):
        for n, expected in ((10, 55), (14, 377)):
            cql.execute(f"INSERT INTO {table1} (p) VALUES ({n})")
            rows = list(cql.execute(f"SELECT {fib_call}(p) AS result FROM {table1} WHERE p = {n}"))
            assert len(rows) == 1 and rows[0].result == expected

        # p2 was never written, so the argument is NULL and so is the result.
        rows = list(cql.execute(f"SELECT {fib_call}(p2) AS result FROM {table1} WHERE p = 14"))
        assert len(rows) == 1 and rows[0].result is None

        cql.execute(f"INSERT INTO {table1} (p) VALUES (997)")
        # fib(997) by naive recursion is far too expensive; the call must fail.
        with pytest.raises(InvalidRequest, match="wasm"):
            cql.execute(f"SELECT {fib_call}(p) AS result FROM {table1} WHERE p = 997")
|
|
|
|
def read_function_from_file(file_name, orig_name=None, rename=None):
    """Return the WebAssembly text of a prebuilt UDF, ready to embed in CQL.

    The module is read from ``build/wasm/<file_name>.wat`` (resolved relative
    to this file). Single quotes are doubled so the text can be placed inside
    a CQL string literal. The export named ``orig_name`` is renamed to
    ``rename`` so that it matches the CQL function being created.

    Args:
        file_name: base name of the .wat file, without the extension.
        orig_name: name the function is exported under in the file
            (defaults to ``file_name``).
        rename: name to export it under instead (defaults to ``orig_name``,
            i.e. no renaming).

    Returns:
        The (escaped, renamed) module text.

    Raises:
        RuntimeError: if the file cannot be opened or read, which usually
            means the Wasm examples have not been built.
    """
    wat_path = os.path.realpath(os.path.join(__file__, f'../../../build/wasm/{file_name}.wat'))
    orig_name = orig_name or file_name
    rename = rename or orig_name
    try:
        with open(wat_path, "r", encoding="utf-8") as f:
            source = f.read()
    except OSError as err:
        # Only I/O errors mean "examples not built"; anything else (including
        # KeyboardInterrupt) propagates untouched.
        raise RuntimeError(f"Can't open {wat_path}.\nPlease build Wasm examples.") from err
    return source.replace("'", "''").replace(f'export "{orig_name}"', f'export "{rename}"')
|
|
|
|
|
|
def test_fib_called_on_null(cql, test_keyspace, table1, scylla_with_wasm_only):
    """A CALLED ON NULL INPUT fibonacci UDF handles both values and NULL.

    Because the argument may be null, it is not passed to wasm as a plain
    parameter but as a pointer to a structure holding the serialized integer,
    following the C ABI for WebAssembly. CQL serializes integers as big
    endian, so the module must convert them to host endianness (assumed
    little endian here) before computing with them. The module comes from
    build/wasm/test_fib_called_on_null.wat.
    """
    fib_name = unique_name()
    fib_source = read_function_from_file('test_fib_called_on_null', 'fib', fib_name)
    src = f"(input bigint) CALLED ON NULL INPUT RETURNS bigint LANGUAGE wasm AS '{fib_source}'"
    fib_call = f"{test_keyspace}.{fib_name}"
    with new_function(cql, test_keyspace, src, fib_name):
        for n, expected in ((3, 2), (7, 13)):
            cql.execute(f"INSERT INTO {table1} (p) VALUES ({n})")
            rows = list(cql.execute(f"SELECT {fib_call}(p) AS result FROM {table1} WHERE p = {n}"))
            assert len(rows) == 1 and rows[0].result == expected

        # This particular function is defined to return 42 for a null input.
        rows = list(cql.execute(f"SELECT {fib_call}(p2) AS result FROM {table1} WHERE p = 7"))
        assert len(rows) == 1 and rows[0].result == 42

        cql.execute(f"INSERT INTO {table1} (p) VALUES (997)")
        # fib(997) needs far too much work; the call must be aborted with an error.
        with pytest.raises(InvalidRequest, match="wasm"):
            cql.execute(f"SELECT {fib_call}(p) AS result FROM {table1} WHERE p = 997")
|
|
|
|
def test_infinite_loop(cql, test_keyspace, table1, scylla_with_wasm_only):
    """A WASM UDF that loops forever is eventually interrupted.

    The call must fail with a "fuel consumed" error; the time it took to
    break out of the loop is printed for information.
    """
    import time

    inf_loop_name = "inf_loop_" + unique_name()
    inf_loop_source = f"""
(module
  (type (;0;) (func (param i32) (result i32)))
  (func ${inf_loop_name} (type 0) (param i32) (result i32)
    loop (result i32)  ;; label = @1
      br 0 (;@1;)
    end)
  (table (;0;) 1 1 funcref)
  (table (;1;) 32 externref)
  (memory (;0;) 17)
  (export "memory" (memory 0))
  (export "{inf_loop_name}" (func ${inf_loop_name}))
  (elem (;0;) (i32.const 0) func)
  (global (;0;) i32 (i32.const 1024))
  (export "_scylla_abi" (global 0))
  (data $.rodata (i32.const 1024) "\\01"))
"""
    src = f"(input int) RETURNS NULL ON NULL INPUT RETURNS int LANGUAGE wasm AS '{inf_loop_source}'"
    with new_function(cql, test_keyspace, src, inf_loop_name):
        cql.execute(f"INSERT INTO {table1} (p,i) VALUES (10, 10)")
        started = time.monotonic()
        with pytest.raises(InvalidRequest, match="fuel consumed"):
            cql.execute(f"SELECT {test_keyspace}.{inf_loop_name}(i) AS result FROM {table1} WHERE p = 10")
        elapsed_ms = (time.monotonic() - started) * 1000
        print(f"Breaking the loop took {elapsed_ms:.2f}ms")
|
|
|
|
def test_f64_param(cql, test_keyspace, table1, scylla_with_wasm_only):
    """A WASM UDF on double (f64): returns its argument decreased by 1."""
    dec_double_name = "dec_double_" + unique_name()
    dec_double_source = f"""
(module
  (type (;0;) (func (param f64) (result f64)))
  (func ${dec_double_name} (type 0) (param f64) (result f64)
    local.get 0
    f64.const -0x1p+0 (;=-1;)
    f64.add)
  (table (;0;) 1 1 funcref)
  (table (;1;) 32 externref)
  (memory (;0;) 17)
  (export "memory" (memory 0))
  (export "{dec_double_name}" (func ${dec_double_name}))
  (elem (;0;) (i32.const 0) func)
  (global (;0;) i32 (i32.const 1024))
  (export "_scylla_abi" (global 0))
  (data $.rodata (i32.const 1024) "\\01"))
"""
    src = f"(input double) RETURNS NULL ON NULL INPUT RETURNS double LANGUAGE wasm AS '{dec_double_source}'"
    with new_function(cql, test_keyspace, src, dec_double_name):
        cql.execute(f"INSERT INTO {table1} (p,d) VALUES (17,17.015625)")
        rows = list(cql.execute(f"SELECT {test_keyspace}.{dec_double_name}(d) AS result FROM {table1} WHERE p = 17"))
        # Both values are exactly representable in binary, so == is safe.
        assert len(rows) == 1 and rows[0].result == 16.015625
|
|
|
|
def test_f32_param(cql, test_keyspace, table1, scylla_with_wasm_only):
    """A WASM UDF on float (f32): returns its argument increased by 1."""
    inc_float_name = "inc_float_" + unique_name()
    inc_float_source = f"""
(module
  (type (;0;) (func (param f32) (result f32)))
  (func ${inc_float_name} (type 0) (param f32) (result f32)
    local.get 0
    f32.const 0x1p+0 (;=1;)
    f32.add)
  (table (;0;) 1 1 funcref)
  (table (;1;) 32 externref)
  (memory (;0;) 17)
  (export "memory" (memory 0))
  (export "{inc_float_name}" (func ${inc_float_name}))
  (elem (;0;) (i32.const 0) func)
  (global (;0;) i32 (i32.const 1024))
  (export "_scylla_abi" (global 0))
  (data $.rodata (i32.const 1024) "\\01"))
"""
    src = f"(input float) RETURNS NULL ON NULL INPUT RETURNS float LANGUAGE wasm AS '{inc_float_source}'"
    with new_function(cql, test_keyspace, src, inc_float_name):
        cql.execute(f"INSERT INTO {table1} (p, f) VALUES (121, 121.00390625)")
        rows = list(cql.execute(f"SELECT {test_keyspace}.{inc_float_name}(f) AS result FROM {table1} WHERE p = 121"))
        # 121 + 1/256 fits exactly in an f32 mantissa, so == is safe.
        assert len(rows) == 1 and rows[0].result == 122.00390625
|
|
|
|
def test_bool_negate(cql, test_keyspace, table1, scylla_with_wasm_only):
    """A WASM UDF on booleans: returns the negation of its argument.

    The CQL boolean is declared as an i32 wasm parameter, so ``i32.eqz``
    implements logical "not". Results are checked with ``is`` so that a
    non-bool (e.g. 0/1) result would be caught.
    """
    negate_name = "negate_" + unique_name()
    negate_source = f"""
(module
  (type (;0;) (func (param i32) (result i32)))
  (func ${negate_name} (type 0) (param i32) (result i32)
    local.get 0
    i32.eqz)
  (table (;0;) 1 1 funcref)
  (table (;1;) 32 externref)
  (memory (;0;) 17)
  (export "memory" (memory 0))
  (export "{negate_name}" (func ${negate_name}))
  (elem (;0;) (i32.const 0) func)
  (global (;0;) i32 (i32.const 1024))
  (export "_scylla_abi" (global 0))
  (data $.rodata (i32.const 1024) "\\01"))
"""
    src = f"(input boolean) RETURNS NULL ON NULL INPUT RETURNS boolean LANGUAGE wasm AS '{negate_source}'"
    with new_function(cql, test_keyspace, src, negate_name):
        cql.execute(f"INSERT INTO {table1} (p, bl) VALUES (19, true)")
        cql.execute(f"INSERT INTO {table1} (p, bl) VALUES (21, false)")
        res = list(cql.execute(f"SELECT {test_keyspace}.{negate_name}(bl) AS result FROM {table1} WHERE p = 19"))
        assert len(res) == 1 and res[0].result is False
        res = list(cql.execute(f"SELECT {test_keyspace}.{negate_name}(bl) AS result FROM {table1} WHERE p = 21"))
        assert len(res) == 1 and res[0].result is True
|
|
|
|
def test_short_ints(cql, test_keyspace, table1, scylla_with_wasm_only):
    """WASM UDFs on 8-bit (tinyint) and 16-bit (smallint) integers.

    Wasm has no narrow integer types, so both are passed as i32 and the very
    same module serves tinyint and smallint signatures. Overflow wraps to the
    declared CQL type. The test also checks that a function name can be
    reused, one after the other, for a different implementation with the
    same signature and for a different signature. Each of those runs repeats
    its call many times so that wasm instances get cached on all shards.
    """
    plus_name = "plus_" + unique_name()
    plus_source = f"""
(module
  (type (;0;) (func (param i32 i32) (result i32)))
  (func ${plus_name} (type 0) (param i32 i32) (result i32)
    local.get 1
    local.get 0
    i32.add)
  (table (;0;) 1 1 funcref)
  (table (;1;) 32 externref)
  (memory (;0;) 17)
  (export "memory" (memory 0))
  (export "{plus_name}" (func ${plus_name}))
  (elem (;0;) (i32.const 0) func)
  (global (;0;) i32 (i32.const 1024))
  (export "_scylla_abi" (global 0))
  (data $.rodata (i32.const 1024) "\\01"))
"""

    def select_plus(args, p):
        """Return the rows of plus_name(<args>) for the row with key ``p``."""
        return list(cql.execute(f"SELECT {test_keyspace}.{plus_name}({args}) AS result FROM {table1} WHERE p = {p}"))

    src = f"(input tinyint, input2 tinyint) RETURNS NULL ON NULL INPUT RETURNS tinyint LANGUAGE wasm AS '{plus_source}'"
    with new_function(cql, test_keyspace, src, plus_name):
        cql.execute(f"INSERT INTO {table1} (p, t, t2, s, s2) VALUES (42, 42, 24, 33, 55)")
        cql.execute(f"INSERT INTO {table1} (p, t, t2, s, s2) VALUES (43, 120, 112, 32000, 24001)")
        res = select_plus("t, t2", 42)
        assert len(res) == 1 and res[0].result == 66
        # Overflow is fine: 120 + 112 wraps around to -24 as a tinyint.
        res = select_plus("t, t2", 43)
        assert len(res) == 1 and res[0].result == -24

    # A similar run for 16bit ints - note that the exact same source code is used
    src = f"(input smallint, input2 smallint) RETURNS NULL ON NULL INPUT RETURNS smallint LANGUAGE wasm AS '{plus_source}'"
    with new_function(cql, test_keyspace, src, plus_name):
        res = select_plus("s, s2", 42)
        assert len(res) == 1 and res[0].result == 88
        # Overflow is fine: 32000 + 24001 wraps around to -9535 as a smallint.
        res = select_plus("s, s2", 43)
        assert len(res) == 1 and res[0].result == -9535

    # Check whether we can use a different function under the same name
    plus42_source = read_function_from_file('test_short_ints', 'plus42', plus_name)
    plus42_src = f"(input smallint, input2 smallint) RETURNS NULL ON NULL INPUT RETURNS smallint LANGUAGE wasm AS '{plus42_source}'"
    with new_function(cql, test_keyspace, src, plus_name):
        for _ in range(100):
            res = select_plus("s, s2", 42)
            assert len(res) == 1 and res[0].result == 88
    with new_function(cql, test_keyspace, plus42_src, plus_name):
        for _ in range(100):
            res = select_plus("s, s2", 42)
            assert len(res) == 1 and res[0].result == 88 + 42

    # Check whether we can use another function with the same name but different signature
    plusplus_source = f"""
(module
  (type (;0;) (func (param i32 i32 i32) (result i32)))
  (func ${plus_name} (type 0) (param i32 i32 i32) (result i32)
    local.get 2
    local.get 1
    i32.add
    local.get 0
    i32.add)
  (memory (;0;) 2)
  (global (;0;) i32 (i32.const 1024))
  (export "memory" (memory 0))
  (export "{plus_name}" (func ${plus_name}))
  (export "_scylla_abi" (global 0))
  (data (;0;) (i32.const 1024) "\\01"))
"""
    plusplus_src = f"(input smallint, input2 smallint, input3 smallint) RETURNS NULL ON NULL INPUT RETURNS smallint LANGUAGE wasm AS '{plusplus_source}'"
    with new_function(cql, test_keyspace, src, plus_name):
        for _ in range(100):
            res = select_plus("s, s2", 42)
            assert len(res) == 1 and res[0].result == 88
    with new_function(cql, test_keyspace, plusplus_src, plus_name):
        for _ in range(100):
            res = select_plus("s, s, s2", 42)
            assert len(res) == 1 and res[0].result == 121
|
|
|
|
def test_9_params(cql, test_keyspace, table1, scylla_with_wasm_only):
    """A WASM UDF with nine int parameters returns their sum."""
    sum9_name = "sum9_" + unique_name()
    sum9_source = f"""
(module
  (type (;0;) (func (param i32 i32 i32 i32 i32 i32 i32 i32 i32) (result i32)))
  (func ${sum9_name} (type 0) (param i32 i32 i32 i32 i32 i32 i32 i32 i32) (result i32)
    local.get 1
    local.get 0
    i32.add
    local.get 2
    i32.add
    local.get 3
    i32.add
    local.get 4
    i32.add
    local.get 5
    i32.add
    local.get 6
    i32.add
    local.get 7
    i32.add
    local.get 8
    i32.add)
  (table (;0;) 1 1 funcref)
  (table (;1;) 32 externref)
  (memory (;0;) 17)
  (export "memory" (memory 0))
  (export "{sum9_name}" (func ${sum9_name}))
  (elem (;0;) (i32.const 0) func)
  (global (;0;) i32 (i32.const 1024))
  (export "_scylla_abi" (global 0))
  (data $.rodata (i32.const 1024) "\\01"))
"""
    src = f"(a int, b int, c int, d int, e int, f int, g int, h int, i int) RETURNS NULL ON NULL INPUT RETURNS int LANGUAGE wasm AS '{sum9_source}'"
    call_args = "i,i2,i2,i,i2,i,i2,i,i2"
    with new_function(cql, test_keyspace, src, sum9_name):
        cql.execute(f"INSERT INTO {table1} (p, i, i2) VALUES (777, 1,2)")
        rows = list(cql.execute(f"SELECT {test_keyspace}.{sum9_name}({call_args}) AS result FROM {table1} WHERE p = 777"))
        # i=1 appears 4 times and i2=2 appears 5 times: 4 + 10 = 14.
        assert len(rows) == 1 and rows[0].result == 14
|
|
|
|
def test_pow(cql, test_keyspace, table1, scylla_with_wasm_only):
    """A two-argument WASM UDF returns base**power.

    The module comes from build/wasm/test_pow.wat, exported there as "power".
    """
    pow_name = "pow_" + unique_name()
    pow_source = read_function_from_file('test_pow', 'power', pow_name)
    src = f"(base int, pow int) RETURNS NULL ON NULL INPUT RETURNS int LANGUAGE wasm AS '{pow_source}'"
    with new_function(cql, test_keyspace, src, pow_name):
        cql.execute(f"INSERT INTO {table1} (p, i, i2) VALUES (311, 3, 11)")
        rows = list(cql.execute(f"SELECT {test_keyspace}.{pow_name}(i, i2) AS result FROM {table1} WHERE p = 311"))
        assert len(rows) == 1 and rows[0].result == 3 ** 11
|
|
|
|
def test_compilable(cql, test_keyspace, table1, scylla_with_wasm_only):
    """CREATE FUNCTION rejects a WASM body that does not compile.

    ``table1`` is not used; the fixture parameter is kept unchanged.
    """
    wrong_source = """
Dear wasmtime compiler, please return a function which returns its float argument increased by 1
"""
    with pytest.raises(InvalidRequest, match="Compilation failed"):
        cql.execute(f"CREATE FUNCTION {test_keyspace}.i_was_not_exported (input float) RETURNS NULL ON NULL INPUT RETURNS float LANGUAGE wasm "
                    f"AS '{wrong_source}'")
|
|
|
|
def test_not_exported(cql, test_keyspace, table1, scylla_with_wasm_only):
    """CREATE FUNCTION fails when the module does not export a function
    named like the CQL function.

    The module below defines ``$i_was_not_exported`` but has no export for
    it, so the server must report it as "not found". ``table1`` is not used;
    the fixture parameter is kept unchanged.
    """
    wrong_source = """
(module
  (type (;0;) (func (param f32) (result f32)))
  (func $i_was_not_exported (type 0) (param f32) (result f32)
    local.get 0
    f32.const 0x1p+0 (;=1;)
    f32.add)
  (table (;0;) 1 1 funcref)
  (table (;1;) 32 externref)
  (memory (;0;) 17)
  (elem (;0;) (i32.const 0) func))
"""
    with pytest.raises(InvalidRequest, match="not found"):
        cql.execute(f"CREATE FUNCTION {test_keyspace}.i_was_not_exported (input float) RETURNS NULL ON NULL INPUT RETURNS float LANGUAGE wasm "
                    f"AS '{wrong_source}'")
|
|
|
|
def test_not_a_function(cql, test_keyspace, table1, scylla_with_wasm_only):
    """CREATE FUNCTION fails when the matching export is not a function.

    The CQL function is named ``memory``, and the module's ``memory`` export
    is a linear memory, so the server must report it as "not a function".
    ``table1`` is not used; the fixture parameter is kept unchanged.
    """
    wrong_source = """
(module
  (type (;0;) (func (param f32) (result f32)))
  (func $i_was_not_exported (type 0) (param f32) (result f32)
    local.get 0
    f32.const 0x1p+0 (;=1;)
    f32.add)
  (table (;0;) 1 1 funcref)
  (table (;1;) 32 externref)
  (memory (;0;) 17)
  (global (;0;) (mut i32) (i32.const 1048576))
  (global (;1;) i32 (i32.const 1024))
  (export "memory" (memory 0))
  (elem (;0;) (i32.const 0) func)
  (export "_scylla_abi" (global 1))
  (data $.rodata (i32.const 1024) "\\01"))
"""
    with pytest.raises(InvalidRequest, match="not a function"):
        cql.execute(f"CREATE FUNCTION {test_keyspace}.memory (input float) RETURNS NULL ON NULL INPUT RETURNS float LANGUAGE wasm "
                    f"AS '{wrong_source}'")
|
|
|
|
def test_validate_params(cql, test_keyspace, table1, scylla_with_wasm_only):
    """Calls are rejected when the CQL signature does not match the wasm one.

    The same f32 -> f32 module is registered in turn under three mismatching
    CQL signatures (int -> float, text -> int, float -> int). Each call must
    fail with the corresponding error. A call with the wrong number of
    arguments must fail too.
    """
    inc_float_name = "inc_float_" + unique_name()
    inc_float_source = f"""
(module
  (type (;0;) (func (param f32) (result f32)))
  (func ${inc_float_name} (type 0) (param f32) (result f32)
    local.get 0
    f32.const 0x1p+0 (;=1;)
    f32.add)
  (table (;0;) 1 1 funcref)
  (table (;1;) 32 externref)
  (memory (;0;) 17)
  (global (;0;) i32 (i32.const 1024))
  (export "memory" (memory 0))
  (export "{inc_float_name}" (func ${inc_float_name}))
  (elem (;0;) (i32.const 0) func)
  (export "_scylla_abi" (global 0))
  (data $.rodata (i32.const 1024) "\\01"))
"""
    inc_call = f"{test_keyspace}.{inc_float_name}"

    def signature(params, returns):
        """CQL function body declaring (<params>) -> <returns> for the module."""
        return f"({params}) RETURNS NULL ON NULL INPUT RETURNS {returns} LANGUAGE wasm AS '{inc_float_source}'"

    with new_function(cql, test_keyspace, signature("input int", "float"), inc_float_name):
        cql.execute(f"INSERT INTO {table1} (p, i, f, txt) VALUES (700, 7, 7., 'oi')")
        with pytest.raises(InvalidRequest, match="type mismatch"):
            cql.execute(f"SELECT {inc_call}(i) AS result FROM {table1} WHERE p = 700")

    with new_function(cql, test_keyspace, signature("input text", "int"), inc_float_name):
        with pytest.raises(InvalidRequest, match="failed"):
            cql.execute(f"SELECT {inc_call}(txt) AS result FROM {table1} WHERE p = 700")

    with new_function(cql, test_keyspace, signature("input float", "int"), inc_float_name):
        with pytest.raises(InvalidRequest, match="Expected i32, got f32"):
            cql.execute(f"SELECT {inc_call}(f) AS result FROM {table1} WHERE p = 700")
        with pytest.raises(InvalidRequest, match="number.*arguments"):
            cql.execute(f"SELECT {inc_call}(i, f) AS result FROM {table1} WHERE p = 700")
|
|
|
|
def test_word_double(cql, test_keyspace, table1, scylla_with_wasm_only):
    """A WASM UDF on text: returns its argument repeated twice (dog -> dogdog).

    The module comes from build/wasm/test_word_double.wat, exported there as
    "dbl".
    """
    dbl_name = unique_name()
    dbl_source = read_function_from_file('test_word_double', 'dbl', dbl_name)
    src = f"(input text) RETURNS NULL ON NULL INPUT RETURNS text LANGUAGE wasm AS '{dbl_source}'"
    dbl_call = f"{test_keyspace}.{dbl_name}"
    with new_function(cql, test_keyspace, src, dbl_name):
        for key, word in ((1000, 'doggo'), (1001, 'cat42')):
            cql.execute(f"INSERT INTO {table1} (p, txt) VALUES ({key}, '{word}')")
            rows = list(cql.execute(f"SELECT {dbl_call}(txt) AS result FROM {table1} WHERE p = {key}"))
            assert len(rows) == 1 and rows[0].result == word * 2

        # Querying both partitions at once; the two rows are accepted in either order.
        rows = list(cql.execute(f"SELECT {dbl_call}(txt) AS result FROM {table1} WHERE p IN (1000, 1001)"))
        assert len(rows) == 2 and sorted(row.result for row in rows) == ['cat42cat42', 'doggodoggo']
|
|
|
|
def test_abi_v2(cql, test_keyspace, table1, scylla_with_wasm_only):
    """A WASM UDF using ABI version 2 can be called.

    The module (build/wasm/return_input.wat) returns its input unchanged and
    works with any data type represented by size + pointer; here it is
    exercised on text.
    """
    ri_name = unique_name()
    ri_source = read_function_from_file('return_input', 'return_input', ri_name)
    text_src = f"(input text) RETURNS NULL ON NULL INPUT RETURNS text LANGUAGE wasm AS '{ri_source}'"
    with new_function(cql, test_keyspace, text_src, ri_name):
        cql.execute(f"INSERT INTO {table1} (p, txt) VALUES (2000, 'doggo')")
        rows = list(cql.execute(f"SELECT {test_keyspace}.{ri_name}(txt) AS result FROM {table1} WHERE p = 2000"))
        assert len(rows) == 1 and rows[0].result == 'doggo'
|
|
|
|
@pytest.fixture(scope="module")
def metrics(request, scylla_with_wasm_only, cql):
    """Yield the URL of Scylla's Prometheus metrics endpoint, or skip.

    The endpoint is plain http on port 9180 of the first contact point.
    Tests using this fixture are skipped when the endpoint cannot be reached
    (connection error or timeout) or does not answer with HTTP 200.
    ``request`` is unused; it is kept so the fixture signature is unchanged.
    """
    url = f'http://{cql.cluster.contact_points[0]}:9180/metrics'
    try:
        # Without a timeout an unresponsive port would hang the whole module.
        resp = requests.get(url, timeout=10)
    except requests.exceptions.RequestException:
        pytest.skip('Metrics port 9180 is not available')
    if resp.status_code != 200:
        pytest.skip('Metrics port 9180 is not available')
    yield url
|
|
|
|
def get_metrics(metrics):
    """Fetch the metrics page at URL ``metrics`` and return its body as text.

    Asserts that the server answered with HTTP 200. A timeout is set so that
    an unresponsive server fails the test instead of hanging it.
    """
    response = requests.get(metrics, timeout=10)
    assert response.status_code == 200
    return response.text
|
|
|
|
def get_metric(metrics, name, requested_labels=None, the_metrics=None):
    """Sum the values of metric ``name`` over all its labelled samples.

    Args:
        metrics: URL of the metrics page; only fetched (via ``get_metrics``)
            when ``the_metrics`` is not given or empty.
        name: exact metric name. It is matched literally, not as a regex.
        requested_labels: optional dict; when given, only samples whose label
            set contains every ``key="value"`` pair are summed.
        the_metrics: optional, already-fetched metrics text.

    Only lines of the form ``name{labels} value`` are considered. Labels are
    split on commas, so label values containing commas are not supported.

    Returns:
        The float sum of the matching sample values (0.0 if none match).
    """
    if not the_metrics:
        the_metrics = get_metrics(metrics)
    pattern = re.compile('^' + re.escape(name) + r'\{.*$', re.MULTILINE)
    wanted = [f'{k}="{v}"' for k, v in (requested_labels or {}).items()]
    total = 0.0
    for line in pattern.findall(the_metrics):
        fields = line.split()
        metric, value = fields[0], fields[1]
        if wanted:
            # metric is "name{...}"; the labels are what lies between the braces.
            got_labels = metric[len(name) + 1:-1].split(',')
            if not all(label in got_labels for label in wanted):
                continue
        total += float(value)
    return total
|
|
|
|
def test_UDA(cql, test_keyspace, table1, scylla_with_wasm_only, metrics):
    """A WASM user-defined aggregate computing the average of ints works,
    and its calls reuse cached wasm instances.

    The state function (build/wasm/test_UDA_scalar.wat) accumulates a
    (sum, count) tuple, and the final function (build/wasm/test_UDA_final.wat)
    divides to a float. Results are compared with the built-in avg(), and the
    scylla_user_functions_cache_{hits,misses} metrics are checked.
    """
    sum_name = unique_name()
    sum_source = read_function_from_file('test_UDA_scalar', 'sum', sum_name)
    sum_src = f"(acc tuple<int, int>, input int) CALLED ON NULL INPUT RETURNS tuple<int,int> LANGUAGE wasm AS '{sum_source}'"

    div_name = unique_name()
    div_source = read_function_from_file('test_UDA_final', 'div', div_name)
    div_src = f"(acc tuple<int, int>) CALLED ON NULL INPUT RETURNS float LANGUAGE wasm AS '{div_source}'"

    for k in range(20):
        cql.execute(f"INSERT INTO {table1} (p, i, i2) VALUES ({k}, {k}, {k})")

    with new_function(cql, test_keyspace, sum_src, sum_name), new_function(cql, test_keyspace, div_src, div_name):
        agg_body = f"(int) SFUNC {sum_name} STYPE tuple<int,int> FINALFUNC {div_name} INITCOND (0,0)"
        hits_before = get_metric(metrics, 'scylla_user_functions_cache_hits')
        with new_aggregate(cql, test_keyspace, agg_body) as custom_avg:
            custom_rows = list(cql.execute(f"SELECT {test_keyspace}.{custom_avg}(i) AS result FROM {table1}"))
            builtin_rows = list(cql.execute(f"SELECT avg(cast(i as float)) AS result FROM {table1}"))
            assert custom_rows == builtin_rows

            hits_after = get_metric(metrics, 'scylla_user_functions_cache_hits')
            assert hits_after - hits_before >= 1

            misses_before_reuse = get_metric(metrics, 'scylla_user_functions_cache_misses')
            for _ in range(100):
                custom_rows = list(cql.execute(f"SELECT {test_keyspace}.{custom_avg}(i2) AS result FROM {table1}"))
                builtin_rows = list(cql.execute(f"SELECT avg(cast(i2 as float)) AS result FROM {table1}"))
                assert custom_rows == builtin_rows

            # table1 is shared with other tests, so count all of its rows.
            row_count = len(list(cql.execute(f"SELECT i2 AS result FROM {table1}")))

            misses_after_reuse = get_metric(metrics, 'scylla_user_functions_cache_misses')
            new_misses = misses_after_reuse - misses_before_reuse
            new_hits = get_metric(metrics, 'scylla_user_functions_cache_hits') - hits_after
            # Hits + misses must equal the number of UDF calls: one state-function
            # call per row plus one final-function call, for each of the 100 runs.
            assert new_misses + new_hits == 100 * (1 + row_count)
            # Each shard has its own cache, so at least one shard must reuse it
            # without excessive misses. Replacing an instance that can no longer
            # be used is expected about once every 7 state-function calls (2
            # initial memory pages, 2 more for 2 arguments, max instance size 16
            # pages) and once every 14 final-function calls (1 page added per
            # call). Additionally, 2 misses are expected on each of the 2 shards
            # for the first state-function and final-function calls.
            assert new_misses <= 4 + 100 * row_count / 7 + 100 / 14
|
|
|
|
@pytest.mark.skip(reason="slow test, remove skip to try it anyway")
def test_mem_grow(cql, test_keyspace, table1, scylla_with_wasm_only, metrics):
    """Wasm instances are removed from the cache when:
    - a single instance is too big,
    - the cached instances together consume too much memory,
    - an instance has not been used for a long time.

    The UDF (build/wasm/test_mem_grow.wat, exported as "grow_mem") grows its
    memory by n pages and returns n.
    FIXME: shorten the wait time when such configuration becomes possible.
    """
    import time

    mem_grow_name = "mem_grow_" + unique_name()
    mem_grow_source = read_function_from_file('test_mem_grow', 'grow_mem', mem_grow_name)
    src = f"(pages int) RETURNS NULL ON NULL INPUT RETURNS int LANGUAGE wasm AS '{mem_grow_source}'"
    grow_call = f"{test_keyspace}.{mem_grow_name}"
    cache_size_metric = 'scylla_user_functions_cache_total_size'
    with new_function(cql, test_keyspace, src, mem_grow_name):
        cql.execute(f"INSERT INTO {table1} (p, i) VALUES (8, 8)")
        for _ in range(512):
            cql.execute(f"SELECT {grow_call}(i) AS result FROM {table1} WHERE p = 8")
        # 512 calls x 8 pages x 64KiB = 256MiB grown in total; the default
        # 128MiB memory limit must still hold.
        assert get_metric(metrics, cache_size_metric) <= 128 * 1024 * 1024

        # Wait for all instances to time out.
        time.sleep(10)
        assert get_metric(metrics, cache_size_metric) == 0

        cql.execute(f"INSERT INTO {table1} (p, i) VALUES (30, 30)")
        cql.execute(f"SELECT {grow_call}(i) AS result FROM {table1} WHERE p = 30")
        # A memory of 30+ pages is too big for the cache, so it is not cached.
        assert get_metric(metrics, cache_size_metric) == 0

        cql.execute(f"INSERT INTO {table1} (p, i) VALUES (100, 100)")
        # A memory of 100+ pages is too big for an instance in the cache; the
        # growth is rejected and the function returns -1.
        rows = list(cql.execute(f"SELECT {grow_call}(i) AS result FROM {table1} WHERE p = 100"))
        assert len(rows) == 1 and rows[0].result == -1
|
|
|
|
def test_drop(cql, test_keyspace, table1, scylla_with_wasm_only, metrics):
    """Dropping a WASM UDF removes all its instances from the cache, so
    unused UDFs do not keep consuming memory.

    Ten identity UDFs (bigint -> bigint), each with a fresh name, are created,
    called once and dropped. While one exists the instance-count metric must
    be positive, and after all are dropped it must be zero.
    """
    # Module template: every occurrence of the placeholder 'ret_name' is
    # replaced with the fresh function name inside the loop below.
    ret_source = """
(module
  (type (;0;) (func (param i64) (result i64)))
  (func $ret_name (type 0) (param i64) (result i64)
    local.get 0)
  (memory (;0;) 2)
  (global (;0;) i32 (i32.const 1024))
  (export "memory" (memory 0))
  (export "ret_name" (func $ret_name))
  (export "_scylla_abi" (global 0))
  (data (;0;) (i32.const 1024) "\\01"))
"""
    src = f"(input bigint) RETURNS NULL ON NULL INPUT RETURNS bigint LANGUAGE wasm AS '{ret_source}'"
    cql.execute(f"INSERT INTO {table1} (p) VALUES (42)")
    for _ in range(10):
        ret_name = "ret_" + unique_name()
        with new_function(cql, test_keyspace, src.replace('ret_name', ret_name), ret_name):
            cql.execute(f"SELECT {test_keyspace}.{ret_name}(p) AS result FROM {table1} WHERE p = 42")
            assert get_metric(metrics, 'scylla_user_functions_cache_instace_count_any') > 0
    assert get_metric(metrics, 'scylla_user_functions_cache_instace_count_any') == 0
|
|
|
|
def test_counter(cql, test_keyspace, scylla_only, scylla_with_wasm_only):
    """A WASM UDF can take and return the counter type.

    Uses the module from build/wasm/return_input.wat (it returns its input)
    on a counter column, reading the value back after increments and a
    decrement. Like every other WASM UDF test here, it requests
    ``scylla_with_wasm_only`` so it is skipped, not failed, when Scylla was
    built without WASM support.
    """
    schema = "p int, c counter, PRIMARY KEY (p)"
    ri_counter_name = unique_name()
    ri_counter_source = read_function_from_file('return_input', 'return_input', ri_counter_name)
    src = f"(input counter) RETURNS NULL ON NULL INPUT RETURNS counter LANGUAGE wasm AS '{ri_counter_source}'"
    with new_test_table(cql, test_keyspace, schema) as table:
        cql.execute(f"UPDATE {table} SET c = c + 2 WHERE p = 42;")
        with new_function(cql, test_keyspace, src, ri_counter_name):
            assert cql.execute(f"SELECT {ri_counter_name}(c) AS result FROM {table} WHERE p = 42").one().result == 2
            cql.execute(f"UPDATE {table} SET c = c + 1 WHERE p = 42;")
            cql.execute(f"UPDATE {table} SET c = c - 4 WHERE p = 42;")
            # 2 + 1 - 4
            assert cql.execute(f"SELECT {ri_counter_name}(c) AS result FROM {table} WHERE p = 42").one().result == -1
|
|
|
|
# See docs/cql/wasm.rst for the source and build instructions of the compiled UDF.
# The module below is the AssemblyScript-compiled recursive Fibonacci function:
# fib(n) = n for n < 2, otherwise fib(n - 1) + fib(n - 2).
# scylla_with_wasm_only makes the test skip (instead of fail) when Scylla was
# built without WASM support.
def test_docs_assemblyscript(cql, test_keyspace, table1, scylla_only, scylla_with_wasm_only):
    table = table1
    fib_name = unique_name()
    # The data segments are reproduced verbatim from the compiler output;
    # their string contents (including spaces) are raw bytes.
    fib_source = f"""(module
 (type $i32_=>_i32 (func (param i32) (result i32)))
 (global $fib/_scylla_abi i32 (i32.const 1088))
 (memory $0 1)
 (data (i32.const 1036) "\\1c")
 (data (i32.const 1048) "\\01\\00\\00\\00\\04\\00\\00\\00\\01")
 (data (i32.const 1068) ",")
 (data (i32.const 1080) "\\04\\00\\00\\00\\10\\00\\00\\00 \\04\\00\\00 \\04\\00\\00\\04\\00\\00\\00\\01")
 (export "_scylla_abi" (global $fib/_scylla_abi))
 (export "{fib_name}" (func $fib/fib))
 (export "memory" (memory $0))
 (func $fib/fib (param $0 i32) (result i32)
  local.get $0
  i32.const 2
  i32.lt_s
  if
   local.get $0
   return
  end
  local.get $0
  i32.const 1
  i32.sub
  call $fib/fib
  local.get $0
  i32.const 2
  i32.sub
  call $fib/fib
  i32.add
 )
)
"""
    src = f"(input int) RETURNS NULL ON NULL INPUT RETURNS int LANGUAGE wasm AS '{fib_source}'"
    with new_function(cql, test_keyspace, src, fib_name):
        cql.execute(f"INSERT INTO {table} (p, i) VALUES (42, 10)")
        # fib(10) == 55
        assert cql.execute(f"SELECT {test_keyspace}.{fib_name}(i) AS result FROM {table} WHERE p = 42").one().result == 55
|
|
|
|
# See docs/cql/wasm.rst for the source and build instructions of the compiled UDF.
# The module below is the C-compiled Fibonacci function taking an int and
# returning a bigint (i64); the test checks fib(9) == 34.
# scylla_with_wasm_only makes the test skip (instead of fail) when Scylla was
# built without WASM support.
def test_docs_c(cql, test_keyspace, table1, scylla_only, scylla_with_wasm_only):
    table = table1
    fib_name = unique_name()
    # Function 2 is the exported UDF entry point; it forwards to function 0,
    # which holds the actual (compiler-generated) Fibonacci computation.
    fib_source = f"""(module
  (type (;0;) (func (param i32) (result i64)))
  (type (;1;) (func))
  (func (;0;) (type 0) (param i32) (result i64)
    (local i64 i32)
    local.get 0
    i32.const 2
    i32.ge_s
    if ;; label = @1
      loop ;; label = @2
        local.get 0
        i32.const 1
        i32.sub
        call 0
        local.get 1
        i64.add
        local.set 1
        local.get 0
        i32.const 4
        i32.lt_u
        local.set 2
        local.get 0
        i32.const 2
        i32.sub
        local.set 0
        local.get 2
        i32.eqz
        br_if 0 (;@2;)
      end
    end
    local.get 1
    local.get 0
    i64.extend_i32_s
    i64.add)
  (func (;1;) (type 1)
    nop)
  (func (;2;) (type 0) (param i32) (result i64)
    local.get 0
    call 0)
  (memory (;0;) 2)
  (global (;0;) i32 (i32.const 1024))
  (export "memory" (memory 0))
  (export "_scylla_abi" (global 0))
  (export "_start" (func 1))
  (export "{fib_name}" (func 2))
  (data (;0;) (i32.const 1024) "\\01"))
"""
    src = f"(input int) RETURNS NULL ON NULL INPUT RETURNS bigint LANGUAGE wasm AS '{fib_source}'"
    with new_function(cql, test_keyspace, src, fib_name):
        cql.execute(f"INSERT INTO {table} (p, i) VALUES (42, 9)")
        # fib(9) == 34
        assert cql.execute(f"SELECT {test_keyspace}.{fib_name}(i) AS result FROM {table} WHERE p = 42").one().result == 34
|