Files
scylladb/tools/lua_sstable_consumer.cc
Avi Kivity 66bef0ed36 lua, tools: adjust for lua 5.5 lua_newstate seed parameter
Lua 5.5 adds a seed parameter to lua_newstate(), provide it
with a strong random seed.

Closes scylladb/scylladb#28734
2026-02-20 06:52:37 +02:00

1489 lines
52 KiB
C++

/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "utils/assert.hh"
#include <fmt/chrono.h>
#include <lua.hpp>
#include <random>
#include <seastar/core/fstream.hh>
#include <seastar/core/seastar.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/util/short_streams.hh>
#include "lang/lua_scylla_types.hh"
#include "reader_permit.hh"
#include "schema/schema.hh"
#include "sstables/sstables.hh"
#include "tools/json_writer.hh"
#include "tools/lua_sstable_consumer.hh"
#include "types/collection.hh"
#include "types/tuple.hh"
#include "dht/i_partitioner.hh"
// Lua 5.4 added an extra parameter to lua_resume
#if LUA_VERSION_NUM >= 504
# define LUA_504_PLUS(x...) x
#else
# define LUA_504_PLUS(x...)
#endif
// Lua 5.5 added a seed parameter to lua_newstate
#if LUA_VERSION_NUM >= 505
# define LUA_505_PLUS(x...) x
#else
# define LUA_505_PLUS(x...)
#endif
namespace {
#if LUA_VERSION_NUM >= 505
static unsigned lua_random_seed() {
static thread_local std::default_random_engine rng{std::random_device{}()};
return rng();
}
#endif
class gc_clock_time_point;
class counter_shards_value;
class atomic_cell_with_type;
class collection_with_type;
class json_writer;
template <typename T> struct type_to_metatable { static constexpr const char* metatable_name = nullptr; };
template <> struct type_to_metatable<column_definition> { static constexpr const char* metatable_name = "Scylla.column_definition"; };
template <> struct type_to_metatable<schema> { static constexpr const char* metatable_name = "Scylla.schema"; };
template <> struct type_to_metatable<partition_key> { static constexpr const char* metatable_name = "Scylla.partition_key"; };
template <> struct type_to_metatable<clustering_key> { static constexpr const char* metatable_name = "Scylla.clustering_key"; };
template <> struct type_to_metatable<dht::ring_position_ext> { static constexpr const char* metatable_name = "Scylla.ring_position"; };
template <> struct type_to_metatable<position_in_partition> { static constexpr const char* metatable_name = "Scylla.position_in_partition"; };
template <> struct type_to_metatable<sstables::sstable> { static constexpr const char* metatable_name = "Scylla.sstable"; };
template <> struct type_to_metatable<partition_start> { static constexpr const char* metatable_name = "Scylla.partition_start"; };
template <> struct type_to_metatable<gc_clock_time_point> { static constexpr const char* metatable_name = "Scylla.gc_clock_time_point"; };
template <> struct type_to_metatable<tombstone> { static constexpr const char* metatable_name = "Scylla.tombstone"; };
template <> struct type_to_metatable<data_value> { static constexpr const char* metatable_name = "Scylla.data_value"; };
template <> struct type_to_metatable<counter_shards_value> { static constexpr const char* metatable_name = "Scylla.counter_shards_value"; };
template <> struct type_to_metatable<atomic_cell_with_type> { static constexpr const char* metatable_name = "Scylla.atomic_cell"; };
template <> struct type_to_metatable<collection_with_type> { static constexpr const char* metatable_name = "Scylla.collection"; };
template <> struct type_to_metatable<static_row> { static constexpr const char* metatable_name = "Scylla.static_row"; };
template <> struct type_to_metatable<row_marker> { static constexpr const char* metatable_name = "Scylla.row_marker"; };
template <> struct type_to_metatable<clustering_row> { static constexpr const char* metatable_name = "Scylla.clustering_row"; };
template <> struct type_to_metatable<range_tombstone_change> { static constexpr const char* metatable_name = "Scylla.range_tombstone_change"; };
template <> struct type_to_metatable<json_writer> { static constexpr const char* metatable_name = "Scylla.json_writer"; };
template <typename T>
const char* get_metatable_name() {
const auto metatable_name = type_to_metatable<std::remove_cv_t<T>>::metatable_name;
SCYLLA_ASSERT(metatable_name);
return metatable_name;
}
struct lua_closer {
void operator()(lua_State* l) {
lua_close(l);
}
};
void* lua_alloc(void* ud, void* ptr, size_t osize, size_t nsize) {
if (!nsize) {
free(ptr);
return nullptr;
} else {
return realloc(ptr, nsize);
}
}
static const luaL_Reg loadedlibs[] = {
{"base", luaopen_base},
{LUA_MATHLIBNAME, luaopen_math},
{LUA_OSLIBNAME, luaopen_os},
{LUA_STRLIBNAME, luaopen_string},
{LUA_TABLIBNAME, luaopen_table},
{NULL, NULL},
};
template <typename T>
void push_userdata_ref(lua_State* l, T& v) {
*reinterpret_cast<T**>(lua_newuserdata(l, sizeof(T*))) = &v;
luaL_setmetatable(l, get_metatable_name<T>());
}
template <typename T>
T& pop_userdata_ref(lua_State* l, int i) {
return **reinterpret_cast<T**>(luaL_checkudata(l, i, get_metatable_name<T>()));
}
template <typename T>
void* alloc_userdata(lua_State* l) {
if constexpr (alignof(T) > 8) {
return lua::aligned_user_data<T>(l);
} else {
return lua_newuserdata(l, sizeof(T));
}
}
template <typename T, typename... Arg>
void push_userdata(lua_State* l, Arg&&... arg) {
new (alloc_userdata<T>(l)) T(std::forward<Arg>(arg)...);
luaL_setmetatable(l, get_metatable_name<T>());
}
// Push userdata with default constructed T in it.
template <typename T>
void push_userdata(lua_State* l) {
new (alloc_userdata<T>(l)) T;
luaL_setmetatable(l, get_metatable_name<T>());
}
template <typename T>
T* get_userdata_ptr(lua_State* l, int i) {
auto p = luaL_checkudata(l, i, get_metatable_name<T>());
if constexpr (alignof(T) <= 8) {
return reinterpret_cast<T*>(p);
} else {
return align_up(reinterpret_cast<char*>(p), alignof(T));
}
}
template <typename T>
T pop_userdata(lua_State* l, int i) {
return std::move(*get_userdata_ptr<T>(l, i));
}
// Get reference to userdata stored by-value.
// Valid only until Lua GC:s the value, so don't keep it around for long.
template <typename T>
T& pop_userdata_as_ref(lua_State* l, int i) {
return *get_userdata_ptr<T>(l, i);
}
template <typename T>
int cxx_gc_l(lua_State* l) {
auto& v = pop_userdata_as_ref<T>(l, 1);
v.~T();
lua_pop(l, 1);
return 0;
}
template <typename T>
int tostring_l(lua_State* l) {
const auto& v = pop_userdata_as_ref<T>(l, 1);
lua_pop(l, 1);
lua::push_sstring(l, format("{}", v));
return 1;
}
const schema& get_schema_l(lua_State* l) {
lua_getglobal(l, "schema");
auto& s = pop_userdata_ref<const schema>(l, -1);
lua_pop(l, 1);
return s;
}
template <typename T, typename Wrapper>
int tostring_with_wrapper(lua_State* l) {
const auto& v = pop_userdata_as_ref<T>(l, 1);
const auto& s = get_schema_l(l);
lua_pop(l, 1);
lua::push_sstring(l, format("{}", Wrapper(s, v)));
return 1;
}
int column_definition_index_l(lua_State* l) {
const auto& cdef = pop_userdata_ref<const column_definition>(l, 1);
auto field = lua_tostring(l, 2);
lua_pop(l, 2);
if (strcmp(field, "id") == 0) {
lua_pushinteger(l, cdef.id);
return 1;
} else if (strcmp(field, "name") == 0) {
lua::push_sstring(l, cdef.name_as_text());
return 1;
} else if (strcmp(field, "kind") == 0) {
lua::push_sstring(l, to_sstring(cdef.kind));
return 1;
}
return 0;
}
int push_column_definitions(lua_State* l, schema::const_iterator_range_type cols) {
lua_createtable(l, std::distance(cols.begin(), cols.end()), 0);
size_t i = 1; // lua arrays start from 1
for (const auto& col : cols) {
push_userdata_ref<const column_definition>(l, col);
lua_seti(l, -2, i++);
}
return 1;
}
int schema_index_l(lua_State* l) {
const auto& s = pop_userdata_ref<const schema>(l, 1);
auto field = lua_tostring(l, 2);
lua_pop(l, 2);
if (strcmp(field, "partition_key_columns") == 0) {
return push_column_definitions(l, s.partition_key_columns());
} else if (strcmp(field, "clustering_key_columns") == 0) {
return push_column_definitions(l, s.clustering_key_columns());
} else if (strcmp(field, "static_columns") == 0) {
return push_column_definitions(l, s.static_columns());
} else if (strcmp(field, "regular_columns") == 0) {
return push_column_definitions(l, s.regular_columns());
} else if (strcmp(field, "all_columns") == 0) {
return push_column_definitions(l, s.all_columns());
}
return 0;
}
template <typename Key>
int key_to_hex_l(lua_State* l) {
auto& k = pop_userdata_as_ref<Key>(l, 1);
auto hex = to_hex(to_bytes(k.representation()));
lua_pop(l, 1);
lua::push_sstring(l, hex);
return 1;
}
template <typename Key>
int key_index_l(lua_State* l) {
auto field = lua_tostring(l, 2);
const auto& k = pop_userdata_as_ref<Key>(l, 1);
const auto& s = get_schema_l(l);
lua_pop(l, 2);
if (strcmp(field, "components") == 0) {
const std::vector<data_type>* types;
if constexpr (std::is_same_v<Key, partition_key>) {
types = &s.partition_key_type()->types();
} else {
types = &s.clustering_key_type()->types();
}
const auto values = k.explode(s);
lua_createtable(l, types->size(), 0);
for (size_t i = 0; i < types->size(); ++i) {
if (i == values.size()) {
// prefix key
break;
}
lua::push_data_value(l, types->at(i)->deserialize(values.at(i)));
lua_seti(l, -2, i + 1); // lua arrays start from 1
}
return 1;
} else if (strcmp(field, "to_hex") == 0) {
lua_pushcfunction(l, key_to_hex_l<Key>);
return 1;
}
return 0;
}
int8_t normalize_weight(int w) {
if (!w) {
return w;
} else if (w < 0) {
return -1;
} else {
return 1;
}
}
int new_ring_position_l(lua_State* l) {
const auto weight = normalize_weight(lua_tointeger(l, 1));
const auto& s = get_schema_l(l);
std::optional<partition_key> pk;
std::optional<dht::token> token;
if (lua_gettop(l) == 2) {
if (lua_type(l, 2) == LUA_TUSERDATA) {
pk = pop_userdata<partition_key>(l, 2);
token = dht::get_token(s, *pk);
} else if (lua_type(l, 2) == LUA_TNUMBER) {
token = dht::token(lua_tointeger(l, 2));
}
}
if (lua_gettop(l) == 3) {
pk = pop_userdata<partition_key>(l, 2);
if (lua_type(l, 3) == LUA_TNUMBER) {
token = dht::token(lua_tointeger(l, 3));
} else {
luaL_checktype(l, 3, LUA_TNIL);
}
}
if (!token) {
if (!weight) {
luaL_error(l, "weight of 0 is invalid for min/max ring_position");
}
token = weight < 0 ? dht::maximum_token() : dht::minimum_token();
}
push_userdata<dht::ring_position_ext>(l, *token, std::move(pk), weight);
return 1;
}
int tri_cmp_to_int(std::strong_ordering r) {
if (r < 0) {
return -1;
} else if (r == 0) {
return 0;
} else {
return 1;
}
}
int ring_position_tri_cmp_l(lua_State* l) {
const auto& a = pop_userdata_as_ref<const dht::ring_position_ext>(l, 1);
const auto& b = pop_userdata_as_ref<const dht::ring_position_ext>(l, 2);
lua_pop(l, 2);
const auto& s = get_schema_l(l);
lua_pushinteger(l, tri_cmp_to_int(dht::ring_position_tri_compare(s, a, b)));
return 1;
}
int ring_position_index_l(lua_State* l) {
const auto& rp = pop_userdata_as_ref<const dht::ring_position_ext>(l, 1);
auto field = lua_tostring(l, 2);
lua_pop(l, 2);
if (strcmp(field, "tri_cmp") == 0) {
lua_pushcfunction(l, ring_position_tri_cmp_l);
return 1;
} else if (strcmp(field, "token") == 0) {
if (rp.token()._kind == dht::token_kind::key) {
lua_pushinteger(l, rp.token()._data);
return 1;
}
} else if (strcmp(field, "key") == 0) {
if (rp.key()) {
push_userdata<partition_key>(l, *rp.key());
return 1;
}
} else if (strcmp(field, "weight") == 0) {
lua_pushinteger(l, rp.weight());
return 1;
}
return 0;
}
int position_in_partition_tri_cmp_l(lua_State* l) {
const auto& a = pop_userdata_as_ref<const position_in_partition>(l, 1);
const auto& b = pop_userdata_as_ref<const position_in_partition>(l, 2);
lua_pop(l, 2);
const auto& s = get_schema_l(l);
position_in_partition::tri_compare cmp(s);
lua_pushinteger(l, tri_cmp_to_int(cmp(a, b)));
return 1;
}
int position_in_partition_index_l(lua_State* l) {
const auto& pos = pop_userdata_as_ref<const position_in_partition>(l, 1);
auto field = lua_tostring(l, 2);
lua_pop(l, 2);
if (strcmp(field, "tri_cmp") == 0) {
lua_pushcfunction(l, position_in_partition_tri_cmp_l);
return 1;
} else if (strcmp(field, "key") == 0) {
if (pos.has_key()) {
push_userdata<clustering_key>(l, pos.key());
return 1;
}
} else if (strcmp(field, "weight") == 0) {
lua_pushinteger(l, int(pos.get_bound_weight()));
return 1;
}
return 0;
}
int new_position_in_partition_l(lua_State* l) {
const auto weight = normalize_weight(lua_tointeger(l, 1));
std::optional<clustering_key> ck;
if (lua_gettop(l) == 2) {
if (lua_type(l, 2) == LUA_TUSERDATA) {
ck = pop_userdata<clustering_key>(l, 2);
} else {
luaL_checktype(l, 2, LUA_TNIL);
}
}
if (!ck) {
if (!weight) {
luaL_error(l, "weight of 0 is invalid for nil min/max position_in_partition");
}
}
push_userdata<position_in_partition>(l, partition_region::clustered, bound_weight(weight), std::move(ck));
return 1;
}
template <allow_prefixes AllowPrefix>
int unserialize_key_l(lua_State* l) {
const auto& s = get_schema_l(l);
auto key_str = lua_tostring(l, 1);
lua_pop(l, 2);
lw_shared_ptr<compound_type<AllowPrefix>> type;
managed_bytes value;
try {
value = managed_bytes(from_hex(key_str));
if constexpr (AllowPrefix == allow_prefixes::no) {
type = s.partition_key_type();
} else {
type = s.clustering_key_type();
}
type->validate(value);
if constexpr (AllowPrefix == allow_prefixes::no) {
push_userdata<partition_key>(l, partition_key::from_bytes(value));
} else {
push_userdata<clustering_key>(l, clustering_key::from_bytes(value));
}
return 1;
} catch (const std::exception& e) {
return luaL_error(l, "unserialize partition key: %s", e.what());
}
}
int token_of_l(lua_State* l) {
const auto& s = pop_userdata_ref<const schema>(l, 1);
const auto& k = pop_userdata_as_ref<partition_key>(l, 1);
lua_pop(l, 2);
auto t = dht::get_token(s, k);
lua_pushinteger(l, t._data);
return 1;
}
int sstable_index_l(lua_State* l) {
auto field = lua_tostring(l, 2);
auto& sst = pop_userdata_ref<const sstables::sstable>(l, 1);
lua_pop(l, 2);
if (strcmp(field, "filename") == 0) {
lua::push_sstring(l, fmt::to_string(sst.get_filename()));
return 1;
}
return 0;
}
int partition_start_index_l(lua_State* l) {
auto field = lua_tostring(l, 2);
auto& ps = pop_userdata_as_ref<partition_start>(l, 1);
lua_pop(l, 2);
if (strcmp(field, "key") == 0) {
push_userdata<partition_key>(l, ps.key().key());
return 1;
} else if (strcmp(field, "token") == 0) {
lua_pushinteger(l, ps.key().token()._data);
return 1;
} else if (strcmp(field, "tombstone") == 0) {
if (!ps.partition_tombstone()) {
return 0;
}
push_userdata<tombstone>(l, ps.partition_tombstone());
return 1;
}
return 0;
}
void push_gc_clock_time_point(lua_State* l, gc_clock::time_point tp) {
auto t = gc_clock::to_time_t(tp);
std::tm tm{};
// This time is actually UTC, but C API assumes local-times and there is no
// way to specify the TZ used. So we lie about it being local-time to avoid
// libC doing TZ adjustments.
localtime_r(&t, &tm);
lua_createtable(l, 0, 6);
luaL_setmetatable(l, get_metatable_name<gc_clock_time_point>());
lua_pushinteger(l, 1900 + tm.tm_year);
lua_setfield(l, -2, "year");
lua_pushinteger(l, 1 + tm.tm_mon);
lua_setfield(l, -2, "month");
lua_pushinteger(l, tm.tm_mday);
lua_setfield(l, -2, "day");
lua_pushinteger(l, tm.tm_hour);
lua_setfield(l, -2, "hour");
lua_pushinteger(l, tm.tm_min);
lua_setfield(l, -2, "min");
lua_pushinteger(l, tm.tm_sec);
lua_setfield(l, -2, "sec");
}
gc_clock::time_point get_gc_clock_time_point_l(lua_State* l) {
std::tm tm{};
tm.tm_isdst = -1;
lua_getfield(l, 1, "year");
tm.tm_year = lua_tointeger(l, -1) - 1900;
lua_pop(l, 1);
lua_getfield(l, 1, "month");
tm.tm_mon = lua_tointeger(l, -1) - 1;
lua_pop(l, 1);
lua_getfield(l, 1, "day");
tm.tm_mday = lua_tointeger(l, -1);
lua_pop(l, 1);
lua_getfield(l, 1, "hour");
tm.tm_hour = lua_tointeger(l, -1);
lua_pop(l, 1);
lua_getfield(l, 1, "min");
tm.tm_min = lua_tointeger(l, -1);
lua_pop(l, 1);
lua_getfield(l, 1, "sec");
tm.tm_sec = lua_tointeger(l, -1);
lua_pop(l, 1);
lua_pop(l, 1);
return gc_clock::from_time_t(std::mktime(&tm));
}
template <typename Op>
int gc_clock_time_point_binary_op_l(lua_State* l) {
auto b = get_gc_clock_time_point_l(l);
lua_pop(l, 1);
auto a = get_gc_clock_time_point_l(l);
lua_pop(l, 1);
Op op;
lua_pushboolean(l, op(a, b));
return 1;
}
int gc_clock_time_point_tostring_l(lua_State* l) {
auto tp = get_gc_clock_time_point_l(l);
lua::push_sstring(l, format("{:%F %T}z", fmt::gmtime(gc_clock::to_time_t(tp))));
return 1;
}
int gc_clock_time_point_now_l(lua_State* l) {
push_gc_clock_time_point(l, gc_clock::now());
return 1;
}
int gc_clock_time_point_from_string_l(lua_State* l) {
try {
auto tp = gc_clock::time_point(gc_clock::duration(timestamp_from_string(lua_tostring(l, 1)) / 1000));
lua_pop(l, 1);
push_gc_clock_time_point(l, tp);
} catch (const std::exception& e) {
return luaL_error(l, "failed to parse gc_clock_deletion_time from string: %s", e.what());
}
return 1;
}
int tombstone_index_l(lua_State* l) {
auto field = lua_tostring(l, 2);
auto tomb = pop_userdata<tombstone>(l, 1);
lua_pop(l, 2);
if (strcmp(field, "timestamp") == 0) {
lua_pushinteger(l, tomb.timestamp);
return 1;
} else if (strcmp(field, "deletion_time") == 0) {
push_gc_clock_time_point(l, tomb.deletion_time);
return 1;
}
return 0;
}
int data_value_tostring_l(lua_State* l) {
auto& dv = pop_userdata_as_ref<data_value>(l, 1);
lua_pop(l, 1);
auto type = dv.type();
if (auto bvopt = dv.serialize()) {
lua::push_sstring(l, type->to_string(*bvopt));
} else {
lua_pushliteral(l, "");
}
return 1;
}
int data_value_index_l(lua_State* l) {
auto field = lua_tostring(l, 2);
auto& dv = pop_userdata_as_ref<data_value>(l, 1);
lua_pop(l, 2);
if (strcmp(field, "value") == 0) {
lua::push_data_value(l, dv);
return 1;
}
return 0;
}
struct counter_shards_value {
data_type type;
managed_bytes data;
};
int counter_shards_value_tostring_l(lua_State* l) {
auto& csv = pop_userdata_as_ref<counter_shards_value>(l, 1);
lua_pop(l, 1);
auto cv = counter_cell_view(atomic_cell_view::from_bytes(*csv.type, csv.data));
lua::push_sstring(l, format("{}", cv.total_value()));
return 1;
}
int counter_shards_value_index_l(lua_State* l) {
auto field = lua_tostring(l, 2);
auto& csv = pop_userdata_as_ref<counter_shards_value>(l, 1);
lua_pop(l, 2);
auto cv = counter_cell_view(atomic_cell_view::from_bytes(*csv.type, csv.data));
if (strcmp(field, "value") == 0) {
lua::push_data_value(l, data_value(cv.total_value()));
} else if (strcmp(field, "shards") == 0) {
lua_createtable(l, 0, 0);
int i = 1; // Lua arrays start from 1
for (const auto& shard : cv.shards()) {
lua_createtable(l, 0, 3);
lua::push_sstring(l, shard.id().to_sstring());
lua_setfield(l, -2, "id");
lua_pushinteger(l, shard.value());
lua_setfield(l, -2, "value");
lua_pushinteger(l, shard.logical_clock());
lua_setfield(l, -2, "clock");
lua_seti(l, -2, i++);
}
return 1;
}
return 0;
}
struct atomic_cell_with_type {
atomic_cell data;
data_type type;
atomic_cell_with_type(atomic_cell_view data_view, data_type type) : data(*type, data_view), type(std::move(type)) { }
};
int atomic_cell_index_l(lua_State* l) {
auto field = lua_tostring(l, 2);
auto& ct = pop_userdata_as_ref<atomic_cell_with_type>(l, 1);
auto& cell = ct.data;
auto& type = ct.type;
lua_pop(l, 2);
if (strcmp(field, "timestamp") == 0) {
lua_pushinteger(l, cell.timestamp());
return 1;
} else if (strcmp(field, "type") == 0) {
if (type->is_counter()) {
if (cell.is_counter_update()) {
lua_pushliteral(l, "counter-update");
} else {
lua_pushliteral(l, "counter-shards");
}
} else if (type->is_collection()) {
if (type->is_atomic()) {
lua_pushliteral(l, "frozen-collection");
} else {
lua_pushliteral(l, "collection");
}
} else {
lua_pushliteral(l, "regular");
}
return 1;
} else if (strcmp(field, "is_live") == 0) {
lua_pushboolean(l, cell.is_live());
return 1;
} else if (strcmp(field, "has_ttl") == 0) {
lua_pushboolean(l, cell.is_live_and_has_ttl());
return 1;
} else if (strcmp(field, "ttl") == 0) {
if (!cell.is_live_and_has_ttl()) {
return 0;
}
lua_pushinteger(l, std::chrono::duration_cast<std::chrono::seconds>(cell.ttl()).count());
return 1;
} else if (strcmp(field, "expiry") == 0) {
if (!cell.is_live_and_has_ttl()) {
return 0;
}
push_gc_clock_time_point(l, cell.expiry());
return 1;
} else if (strcmp(field, "deletion_time") == 0) {
if (!cell.is_dead(gc_clock::now())) {
return 0;
}
push_gc_clock_time_point(l, cell.deletion_time());
return 1;
} else if (strcmp(field, "value") == 0) {
if (!cell.is_live()) {
return 0;
}
if (type->is_counter()) {
if (cell.is_counter_update()) {
push_userdata<data_value>(l, cell.counter_update_value());
} else {
push_userdata<counter_shards_value>(l, counter_shards_value{type, managed_bytes(cell.serialize())});
}
} else {
push_userdata<data_value>(l, type->deserialize(managed_bytes_view(cell.value())));
}
return 1;
}
return 0;
}
struct collection_with_type {
collection_mutation data;
data_type type;
collection_with_type(collection_mutation_view data_view, data_type type) : data(*type, data_view), type(std::move(type)) { }
};
int collection_index_l(lua_State* l) {
auto field = lua_tostring(l, 2);
auto& ct = pop_userdata_as_ref<collection_with_type>(l, 1);
auto type = ct.type;
lua_pop(l, 2);
return collection_mutation_view(ct.data).with_deserialized(*type, [l, field, type] (const collection_mutation_view_description& mv) {
if (strcmp(field, "tombstone") == 0) {
if (!mv.tomb) {
return 0;
}
push_userdata<tombstone>(l, mv.tomb);
return 1;
} else if (strcmp(field, "type") == 0) {
lua_pushliteral(l, "collection");
return 1;
} else if (strcmp(field, "values") == 0) {
const auto size = mv.cells.size();
std::function<void(size_t, bytes_view)> push_key;
std::function<void(size_t, atomic_cell_view)> push_value;
if (auto t = dynamic_cast<const collection_type_impl*>(type.get())) {
push_key = [l, t = t->name_comparator()] (size_t, bytes_view k) {
push_userdata<data_value>(l, t->deserialize(k));
};
push_value = [l, t = t->value_comparator()] (size_t, atomic_cell_view v) {
push_userdata<atomic_cell_with_type>(l, v, t);
};
} else if (auto t = dynamic_cast<const tuple_type_impl*>(type.get())) {
push_key = [l] (size_t, bytes_view) { lua_pushliteral(l, ""); };
push_value = [l, t] (size_t i, atomic_cell_view v) {
push_userdata<atomic_cell_with_type>(l, v, t->type(i));
};
} else {
return 0;
}
lua_createtable(l, size, 0);
for (size_t i = 0; i < size; ++i) {
const auto& e = mv.cells[i];
lua_createtable(l, 0, 3);
push_key(i, e.first);
lua_setfield(l, -2, "key");
push_value(i, e.second);
lua_setfield(l, -2, "value");
lua_seti(l, -2, i + 1); // Lua arrays start from 1
}
return 1;
}
return 0;
});
}
int push_cells(lua_State* l, const row& cells, column_kind kind) {
auto& schema = get_schema_l(l);
lua_createtable(l, 0, 0);
cells.for_each_cell([l, &schema, kind] (column_id id, const atomic_cell_or_collection& cell) {
auto cdef = schema.column_at(kind, id);
if (cdef.is_atomic()) {
push_userdata<atomic_cell_with_type>(l, cell.as_atomic_cell(cdef), cdef.type);
} else if (cdef.type->is_collection() || cdef.type->is_user_type()) {
push_userdata<collection_with_type>(l, cell.as_collection_mutation(), cdef.type);
} else {
lua_pushnil(l);
}
lua_setfield(l, -2, cdef.name_as_text().data());
});
return 1;
}
int static_row_index_l(lua_State* l) {
auto field = lua_tostring(l, 2);
auto& sr = pop_userdata_as_ref<static_row>(l, 1);
lua_pop(l, 2);
if (strcmp(field, "cells") == 0) {
return push_cells(l, sr.cells(), column_kind::static_column);
}
return 0;
}
int row_marker_index_l(lua_State* l) {
auto field = lua_tostring(l, 2);
auto& m = pop_userdata_as_ref<row_marker>(l, 1);
lua_pop(l, 2);
if (strcmp(field, "timestamp") == 0) {
lua_pushinteger(l, m.timestamp());
return 1;
} else if (strcmp(field, "is_live") == 0) {
lua_pushboolean(l, m.is_live());
return 1;
} else if (strcmp(field, "has_ttl") == 0) {
lua_pushboolean(l, m.is_expiring());
return 1;
} else if (strcmp(field, "ttl") == 0) {
if (!m.is_live() || !m.is_expiring()) {
return 0;
}
lua_pushinteger(l, std::chrono::duration_cast<std::chrono::seconds>(m.ttl()).count());
return 1;
} else if (strcmp(field, "expiry") == 0) {
if (!m.is_live() || !m.is_expiring()) {
return 0;
}
push_gc_clock_time_point(l, m.expiry());
return 1;
} else if (strcmp(field, "deletion_time") == 0) {
if (!m.is_expiring() && !m.is_dead(gc_clock::now())) {
return 0;
}
push_gc_clock_time_point(l, m.deletion_time());
return 1;
}
return 0;
}
int clustering_row_index_l(lua_State* l) {
auto field = lua_tostring(l, 2);
auto& cr = pop_userdata_as_ref<clustering_row>(l, 1);
lua_pop(l, 2);
if (strcmp(field, "key") == 0) {
push_userdata<clustering_key>(l, cr.key());
return 1;
} else if (strcmp(field, "tombstone") == 0) {
if (!cr.tomb()) {
return 0;
}
push_userdata<tombstone>(l, cr.tomb().regular());
return 1;
} else if (strcmp(field, "shadowable_tombstone") == 0) {
if (!cr.tomb()) {
return 0;
}
push_userdata<tombstone>(l, cr.tomb().shadowable().tomb());
return 1;
} else if (strcmp(field, "marker") == 0) {
if (cr.marker().is_missing()) {
return 0;
}
push_userdata<row_marker>(l, cr.marker());
return 1;
} else if (strcmp(field, "cells") == 0) {
return push_cells(l, cr.cells(), column_kind::regular_column);
}
return 0;
}
int range_tombstone_change_tostring_l(lua_State* l) {
auto& rtc = pop_userdata_as_ref<range_tombstone_change>(l, 1);
lua_pop(l, 1);
lua::push_sstring(l, format("{}", rtc));
return 1;
}
int range_tombstone_change_index_l(lua_State* l) {
auto field = lua_tostring(l, 2);
auto& rtc = pop_userdata_as_ref<range_tombstone_change>(l, 1);
lua_pop(l, 2);
const auto& pos = rtc.position();
if (strcmp(field, "key") == 0) {
if (!pos.has_key()) {
return 0;
}
push_userdata<clustering_key>(l, pos.key());
return 1;
} else if (strcmp(field, "key_weight") == 0) {
lua_pushinteger(l, static_cast<int>(pos.get_bound_weight()));
return 1;
} else if (strcmp(field, "tombstone") == 0) {
if (!rtc.tombstone()) {
return 0;
}
push_userdata<tombstone>(l, rtc.tombstone());
return 1;
} else if (strcmp(field, "__tostring") == 0) {
lua_pushcfunction(l, range_tombstone_change_tostring_l);
return 1;
}
return 0;
}
class json_writer {
tools::mutation_fragment_stream_json_writer _writer;
private:
static json_writer& get_this(lua_State* l) {
return pop_userdata_as_ref<json_writer>(l, 1);
}
template <typename Function>
static int do_invoke(lua_State* l, const char* name, Function&& fun) {
try {
fun(get_this(l));
} catch (const std::exception& e) {
return luaL_error(l, "json_writer::%s(): %s", name, e.what());
}
return 0;
}
template <typename Function>
static int invoke(lua_State* l, const char* name, int expected_type, Function&& fun) {
auto nargs = int(expected_type != LUA_TNIL) + 1;
if (const auto n = lua_gettop(l); n != nargs) {
return luaL_error(l, "json_writer::%s(): expected %I arguments, got %I", name, nargs, n);
}
const auto type = lua_type(l, 2);
if (expected_type != LUA_TNIL && type != expected_type) {
return luaL_error(l, "json_writer::%s(): expected argument of type %s, got %s", name, lua_typename(l, expected_type), lua_typename(l, type));
}
return do_invoke(l, name, fun);
}
template <typename Function>
static int invoke_optional_arg(lua_State* l, const char* name, int expected_type, Function&& fun) {
auto nargs = 2;
const auto n = lua_gettop(l);
if (n > nargs) {
return luaL_error(l, "json_writer::%s(): expected at most %I arguments, got %I", name, nargs, n);
}
if (n == 2) {
if (const auto type = lua_type(l, 2); type != LUA_TNIL && type != expected_type) {
return luaL_error(l, "json_writer::%s(): expected argument of type %s, got %s", name, lua_typename(l, expected_type), lua_typename(l, type));
}
}
return do_invoke(l, name, fun);
}
public:
json_writer(const schema& s) : _writer(s)
{ }
static int null_l(lua_State* l) {
return invoke(l, __FUNCTION__, LUA_TNIL, [] (json_writer& w) {
w._writer.writer().Null();
});
}
static int bool_l(lua_State* l) {
return invoke(l, __FUNCTION__, LUA_TBOOLEAN, [l] (json_writer& w) {
w._writer.writer().Bool(lua_toboolean(l, 2));
});
}
static int int_l(lua_State* l) {
return invoke(l, __FUNCTION__, LUA_TNUMBER, [l] (json_writer& w) {
w._writer.writer().Int64(lua_tointeger(l, 2));
});
}
static int double_l(lua_State* l) {
return invoke(l, __FUNCTION__, LUA_TNUMBER, [l] (json_writer& w) {
w._writer.writer().Double(lua_tonumber(l, 2));
});
}
static int string_l(lua_State* l) {
return invoke(l, __FUNCTION__, LUA_TSTRING, [l] (json_writer& w) {
size_t len = 0;
auto str = lua_tolstring(l, 2, &len);
w._writer.writer().rjson_writer().String(str, len, false);
});
}
static int start_object_l(lua_State* l) {
return invoke(l, __FUNCTION__, LUA_TNIL, [] (json_writer& w) {
w._writer.writer().StartObject();
});
}
static int key_l(lua_State* l) {
return invoke(l, __FUNCTION__, LUA_TSTRING, [l] (json_writer& w) {
size_t len = 0;
auto str = lua_tolstring(l, 2, &len);
w._writer.writer().rjson_writer().Key(str, len, false);
});
}
static int end_object_l(lua_State* l) {
return invoke(l, __FUNCTION__, LUA_TNIL, [] (json_writer& w) {
w._writer.writer().EndObject();
});
}
static int start_array_l(lua_State* l) {
return invoke(l, __FUNCTION__, LUA_TNIL, [] (json_writer& w) {
w._writer.writer().StartArray();
});
}
static int end_array_l(lua_State* l) {
return invoke(l, __FUNCTION__, LUA_TNIL, [] (json_writer& w) {
w._writer.writer().EndArray();
});
}
static int start_stream_l(lua_State* l) {
return invoke(l, __FUNCTION__, LUA_TNIL, [] (json_writer& w) {
w._writer.start_stream();
});
}
static int start_sstable_l(lua_State* l) {
return invoke_optional_arg(l, __FUNCTION__, LUA_TUSERDATA, [l] (json_writer& w) {
const sstables::sstable* sst = nullptr;
if (lua_gettop(l) > 1 && lua_type(l, 2) != LUA_TNIL) {
sst = &pop_userdata_ref<const sstables::sstable>(l, 2);
}
w._writer.start_sstable(sst);
});
}
static int start_partition_l(lua_State* l) {
return invoke(l, __FUNCTION__, LUA_TUSERDATA, [l] (json_writer& w) {
w._writer.start_partition(pop_userdata_as_ref<const partition_start>(l, 2));
});
}
static int static_row_l(lua_State* l) {
return invoke(l, __FUNCTION__, LUA_TUSERDATA, [l] (json_writer& w) {
w._writer.partition_element(pop_userdata_as_ref<const static_row>(l, 2));
});
}
static int clustering_row_l(lua_State* l) {
return invoke(l, __FUNCTION__, LUA_TUSERDATA, [l] (json_writer& w) {
w._writer.partition_element(pop_userdata_as_ref<const clustering_row>(l, 2));
});
}
static int range_tombstone_change_l(lua_State* l) {
return invoke(l, __FUNCTION__, LUA_TUSERDATA, [l] (json_writer& w) {
w._writer.partition_element(pop_userdata_as_ref<const range_tombstone_change>(l, 2));
});
}
static int end_partition_l(lua_State* l) {
return invoke(l, __FUNCTION__, LUA_TNIL, [] (json_writer& w) {
w._writer.end_partition();
});
}
static int end_sstable_l(lua_State* l) {
return invoke(l, __FUNCTION__, LUA_TNIL, [] (json_writer& w) {
w._writer.end_sstable();
});
}
static int end_stream_l(lua_State* l) {
return invoke(l, __FUNCTION__, LUA_TNIL, [] (json_writer& w) {
w._writer.end_stream();
});
}
};
int json_writer_new_l(lua_State* l) {
push_userdata<json_writer>(l, get_schema_l(l));
return 1;
}
int json_writer_index_l(lua_State* l) {
auto field = lua_tostring(l, 2);
lua_pop(l, 2);
std::pair<const char*, lua_CFunction> methods[] = {
{"null", &json_writer::null_l},
{"bool", &json_writer::bool_l},
{"int", &json_writer::int_l},
{"double", &json_writer::double_l},
{"string", &json_writer::string_l},
{"start_object", &json_writer::start_object_l},
{"key", &json_writer::key_l},
{"end_object", &json_writer::end_object_l},
{"start_array", &json_writer::start_array_l},
{"end_array", &json_writer::end_array_l},
{"start_stream", &json_writer::start_stream_l},
{"start_sstable", &json_writer::start_sstable_l},
{"start_partition", &json_writer::start_partition_l},
{"static_row", &json_writer::static_row_l},
{"clustering_row", &json_writer::clustering_row_l},
{"range_tombstone_change", &json_writer::range_tombstone_change_l},
{"end_partition", &json_writer::end_partition_l},
{"end_sstable", &json_writer::end_sstable_l},
{"end_stream", &json_writer::end_stream_l},
};
for (const auto& [name, method] : methods) {
if (strcmp(field, name) == 0) {
lua_pushcfunction(l, method);
return 1;
}
}
return 0;
}
class lua_sstable_consumer : public sstable_consumer {
schema_ptr _schema;
reader_permit _permit;
program_options::string_map _script_params;
std::unique_ptr<lua_State, lua_closer> _l;
private:
void register_metatables() {
auto* l = _l.get();
lua::register_metatables(l);
luaL_newmetatable(l, get_metatable_name<column_definition>());
lua_pushcfunction(l, column_definition_index_l);
lua_setfield(l, -2, "__index");
lua_pop(l, 1);
luaL_newmetatable(l, get_metatable_name<schema>());
lua_pushcfunction(l, schema_index_l);
lua_setfield(l, -2, "__index");
lua_pop(l, 1);
luaL_newmetatable(l, get_metatable_name<partition_key>());
lua_pushcfunction(l, key_index_l<partition_key>);
lua_setfield(l, -2, "__index");
lua_pushcclosure(l, tostring_with_wrapper<partition_key, partition_key::with_schema_wrapper>, 0);
lua_setfield(l, -2, "__tostring");
lua_pushcfunction(l, cxx_gc_l<partition_key>);
lua_setfield(l, -2, "__gc");
lua_pop(l, 1);
luaL_newmetatable(l, get_metatable_name<clustering_key>());
lua_pushcfunction(l, key_index_l<clustering_key>);
lua_setfield(l, -2, "__index");
lua_pushcclosure(l, tostring_with_wrapper<clustering_key, clustering_key::with_schema_wrapper>, 0);
lua_setfield(l, -2, "__tostring");
lua_pushcfunction(l, cxx_gc_l<clustering_key>);
lua_setfield(l, -2, "__gc");
lua_pop(l, 1);
luaL_newmetatable(l, get_metatable_name<dht::ring_position_ext>());
lua_pushcfunction(l, ring_position_index_l);
lua_setfield(l, -2, "__index");
lua_pushcfunction(l, tostring_l<dht::ring_position_ext>);
lua_setfield(l, -2, "__tostring");
lua_pushcfunction(l, cxx_gc_l<dht::ring_position_ext>);
lua_setfield(l, -2, "__gc");
lua_pop(l, 1);
luaL_newmetatable(l, get_metatable_name<position_in_partition>());
lua_pushcfunction(l, position_in_partition_index_l);
lua_setfield(l, -2, "__index");
lua_pushcfunction(l, tostring_l<position_in_partition>);
lua_setfield(l, -2, "__tostring");
lua_pushcfunction(l, cxx_gc_l<position_in_partition>);
lua_setfield(l, -2, "__gc");
lua_pop(l, 1);
luaL_newmetatable(l, get_metatable_name<sstables::sstable>());
lua_pushcfunction(l, sstable_index_l);
lua_setfield(l, -2, "__index");
lua_pop(l, 1);
luaL_newmetatable(l, get_metatable_name<partition_start>());
lua_pushcfunction(l, partition_start_index_l);
lua_setfield(l, -2, "__index");
lua_pushcfunction(l, tostring_l<partition_start>);
lua_setfield(l, -2, "__tostring");
lua_pushcfunction(l, cxx_gc_l<partition_start>);
lua_setfield(l, -2, "__gc");
lua_pop(l, 1);
// type is used just to tag metatable, underlying type is a lua table
luaL_newmetatable(l, get_metatable_name<gc_clock_time_point>());
lua_pushcfunction(l, gc_clock_time_point_binary_op_l<std::equal_to<gc_clock::time_point>>);
lua_setfield(l, -2, "__eq");
lua_pushcfunction(l, gc_clock_time_point_binary_op_l<std::less<gc_clock::time_point>>);
lua_setfield(l, -2, "__lt");
lua_pushcfunction(l, gc_clock_time_point_binary_op_l<std::less_equal<gc_clock::time_point>>);
lua_setfield(l, -2, "__le");
lua_pushcfunction(l, gc_clock_time_point_tostring_l);
lua_setfield(l, -2, "__tostring");
lua_pop(l, 1);
luaL_newmetatable(l, get_metatable_name<tombstone>());
lua_pushcfunction(l, tombstone_index_l);
lua_setfield(l, -2, "__index");
lua_pushcfunction(l, tostring_l<tombstone>);
lua_setfield(l, -2, "__tostring");
lua_pushcfunction(l, cxx_gc_l<tombstone>);
lua_setfield(l, -2, "__gc");
lua_pop(l, 1);
luaL_newmetatable(l, get_metatable_name<data_value>());
lua_pushcfunction(l, data_value_index_l);
lua_setfield(l, -2, "__index");
lua_pushcfunction(l, data_value_tostring_l);
lua_setfield(l, -2, "__tostring");
lua_pushcfunction(l, cxx_gc_l<data_value>);
lua_setfield(l, -2, "__gc");
lua_pop(l, 1);
luaL_newmetatable(l, get_metatable_name<counter_shards_value>());
lua_pushcfunction(l, counter_shards_value_index_l);
lua_setfield(l, -2, "__index");
lua_pushcfunction(l, counter_shards_value_tostring_l);
lua_setfield(l, -2, "__tostring");
lua_pushcfunction(l, cxx_gc_l<counter_shards_value>);
lua_setfield(l, -2, "__gc");
lua_pop(l, 1);
luaL_newmetatable(l, get_metatable_name<atomic_cell_with_type>());
lua_pushcfunction(l, atomic_cell_index_l);
lua_setfield(l, -2, "__index");
lua_pushcfunction(l, cxx_gc_l<atomic_cell_with_type>);
lua_setfield(l, -2, "__gc");
lua_pop(l, 1);
luaL_newmetatable(l, get_metatable_name<collection_with_type>());
lua_pushcfunction(l, collection_index_l);
lua_setfield(l, -2, "__index");
lua_pushcfunction(l, cxx_gc_l<collection_with_type>);
lua_setfield(l, -2, "__gc");
lua_pop(l, 1);
luaL_newmetatable(l, get_metatable_name<static_row>());
lua_pushcfunction(l, static_row_index_l);
lua_setfield(l, -2, "__index");
lua_pushcclosure(l, tostring_with_wrapper<static_row, static_row::printer>, 0);
lua_setfield(l, -2, "__tostring");
lua_pushcfunction(l, cxx_gc_l<static_row>);
lua_setfield(l, -2, "__gc");
lua_pop(l, 1);
luaL_newmetatable(l, get_metatable_name<clustering_row>());
lua_pushcfunction(l, clustering_row_index_l);
lua_setfield(l, -2, "__index");
lua_pushcclosure(l, tostring_with_wrapper<clustering_row, clustering_row::printer>, 0);
lua_setfield(l, -2, "__tostring");
lua_pushcfunction(l, cxx_gc_l<clustering_row>);
lua_setfield(l, -2, "__gc");
lua_pop(l, 1);
luaL_newmetatable(l, get_metatable_name<row_marker>());
lua_pushcfunction(l, row_marker_index_l);
lua_setfield(l, -2, "__index");
lua_pushcfunction(l, cxx_gc_l<row_marker>);
lua_setfield(l, -2, "__gc");
lua_pop(l, 1);
luaL_newmetatable(l, get_metatable_name<range_tombstone_change>());
lua_pushcfunction(l, range_tombstone_change_index_l);
lua_setfield(l, -2, "__index");
lua_pushcfunction(l, cxx_gc_l<range_tombstone_change>);
lua_setfield(l, -2, "__gc");
lua_pop(l, 1);
luaL_newmetatable(l, get_metatable_name<json_writer>());
lua_pushcfunction(l, json_writer_index_l);
lua_setfield(l, -2, "__index");
lua_pushcfunction(l, cxx_gc_l<json_writer>);
lua_setfield(l, -2, "__gc");
lua_pop(l, 1);
}
void register_types() {
auto* l = _l.get();
const luaL_Reg scylla_lib[] = {
{"now", gc_clock_time_point_now_l},
{"time_point_from_string", gc_clock_time_point_from_string_l},
{"new_json_writer", json_writer_new_l},
{"new_ring_position", new_ring_position_l},
{"new_position_in_partition", new_position_in_partition_l},
{"unserialize_partition_key", unserialize_key_l<allow_prefixes::no>},
{"unserialize_clustering_key", unserialize_key_l<allow_prefixes::yes>},
{"token_of", token_of_l},
{NULL, NULL},
};
luaL_newlib(l, scylla_lib);
lua_setglobal(l, "Scylla");
}
void register_globals() {
auto* l = _l.get();
push_userdata_ref<const schema>(l, *_schema.get());
lua_setglobal(l, "schema");
}
future<int> call(std::string_view func_name, int nargs) {
auto* l = _l.get();
int ret = LUA_YIELD;
int nresults = 0;
while (ret == LUA_YIELD) {
ret = lua_resume(l, nullptr, nargs LUA_504_PLUS(, &nresults));
if (ret == LUA_YIELD) {
if (nresults == 0) {
co_await coroutine::maybe_yield();
} else if (nresults != 1) {
throw std::runtime_error(fmt::format("{} failed: unexpected number of results yielded, expected 1, got {}", func_name, nresults));
} else {
if (!lua_isuserdata(l, -1)) {
throw std::runtime_error(fmt::format("{} failed: only future<> is allowed to be yielded from a coroutine", func_name));
}
co_await pop_userdata<future<>>(l, -1);
lua_pop(l, 1);
}
}
}
if (ret != LUA_OK) {
throw std::runtime_error(fmt::format("{} failed: {}", func_name, lua_tostring(l, -1)));
}
co_return nresults;
}
struct invoke_meta {
int nargs;
bool returns_stop_iteration;
};
template <typename Func>
future<stop_iteration> invoke_script_method(const char* name, Func&& prepare_stack) {
auto* l = _l.get();
// basic stack sanity check
if (auto n = lua_gettop(l); n) {
throw std::runtime_error(fmt::format("{}() failed: stack expected to be empty (clean), but got {} items", name, n));
}
const auto type = lua_getglobal(l, name);
// check whether script has this method, short-circuit if not
if (type == LUA_TNIL) {
lua_pop(l, 1);
co_return stop_iteration::no;
}
// is symbol callable?
if (type != LUA_TFUNCTION) {
throw std::runtime_error(fmt::format("{}() failed: symbol is not callable", name));
}
const auto desc = prepare_stack();
// sanity check for argument count
// function to be called is also on the stack, account for that
if (auto n = lua_gettop(l); n != desc.nargs + 1) {
throw std::runtime_error(fmt::format("{}() failed: unexpected number of arguments for function, expected: {}, got: {}", name, desc.nargs, n - 1));
}
// call + sanity check for return value count
const auto nret = co_await call(name, desc.nargs);
if (nret > int(desc.returns_stop_iteration)) {
throw std::runtime_error(fmt::format("{}() failed: unexpected number of return values from function, expected: {}, got: {}", name, int(desc.returns_stop_iteration), nret));
}
// extract and convert return value if method has it (it can only be stop_iteration)
// we allow method to return nothing, int this case we assume stop_iteration::no
auto stop = stop_iteration::no;
if (desc.returns_stop_iteration && nret) {
// lua method return false when they don't want to continue
// stop_iteration expressed in boolean is too counter-intuitive
stop = stop_iteration(!lua_toboolean(l, 1));
lua_pop(l, 1);
}
co_return stop;
}
public:
explicit lua_sstable_consumer(schema_ptr s, reader_permit p, program_options::string_map script_params)
: _schema(std::move(s))
, _permit(std::move(p))
, _script_params(std::move(script_params))
, _l(lua_newstate(lua_alloc, nullptr LUA_505_PLUS(, lua_random_seed())))
{
}
future<> load_script(std::string_view script_name, std::string_view script) {
auto* l = _l.get();
// load lua standard libraries
for (const luaL_Reg* lib = loadedlibs; lib->func; lib++) {
luaL_requiref(l, lib->name, lib->func, 1);
lua_pop(l, 1);
}
register_metatables();
register_types();
register_globals();
// parse and load script
if (luaL_loadbuffer(l, script.data(), script.size(), script_name.data())) {
throw std::runtime_error(fmt::format("Failed to load {}: {}", script_name, lua_tostring(l, -1)));
}
// execute script (runs global code)
if (auto n = co_await call(script_name, 0); n) {
throw std::runtime_error(fmt::format("Failed to execute {}: unexpected return value, expected 0, got {}", script_name, n));
}
co_return;
}
virtual future<> consume_stream_start() override {
return invoke_script_method(__FUNCTION__, [this] {
auto* l = _l.get();
lua_newtable(l);
for (const auto& [k, v] : _script_params) {
lua::push_sstring(l, v);
lua_setfield(l, -2, k.data());
}
return invoke_meta{1, false};
}).discard_result();
}
virtual future<stop_iteration> consume_sstable_start(const sstables::sstable* const sst) override {
return invoke_script_method(__FUNCTION__, [this, sst] {
auto* l = _l.get();
if (sst) {
push_userdata_ref<const sstables::sstable>(l, *sst);
} else {
lua_pushnil(l);
}
return invoke_meta{1, true};
});
}
virtual future<stop_iteration> consume(partition_start&& ps) override {
return invoke_script_method("consume_partition_start", [this, ps = std::move(ps)] () mutable {
auto* l = _l.get();
push_userdata<partition_start>(l, std::move(ps));
return invoke_meta{1, true};
});
}
virtual future<stop_iteration> consume(static_row&& sr) override {
return invoke_script_method("consume_static_row", [this, sr = std::move(sr)] () mutable {
auto* l = _l.get();
push_userdata<static_row>(l, std::move(sr));
return invoke_meta{1, true};
});
}
virtual future<stop_iteration> consume(clustering_row&& cr) override {
return invoke_script_method("consume_clustering_row", [this, cr = std::move(cr)] () mutable {
auto* l = _l.get();
push_userdata<clustering_row>(l, std::move(cr));
return invoke_meta{1, true};
});
}
virtual future<stop_iteration> consume(range_tombstone_change&& rtc) override {
return invoke_script_method("consume_range_tombstone_change", [this, rtc = std::move(rtc)] () mutable {
auto* l = _l.get();
push_userdata<range_tombstone_change>(l, std::move(rtc));
return invoke_meta{1, true};
});
}
virtual future<stop_iteration> consume(partition_end&& pe) override {
return invoke_script_method("consume_partition_end", [] {
return invoke_meta{0, true};
});
}
virtual future<stop_iteration> consume_sstable_end() override {
return invoke_script_method(__FUNCTION__, [] {
return invoke_meta{0, true};
});
}
virtual future<> consume_stream_end() override {
return invoke_script_method(__FUNCTION__, [] {
return invoke_meta{0, false};
}).discard_result();
}
};
}
future<std::unique_ptr<sstable_consumer>> make_lua_sstable_consumer(schema_ptr s, reader_permit p, std::string_view script_path,
program_options::string_map script_args) {
auto consumer = std::make_unique<lua_sstable_consumer>(std::move(s), std::move(p), std::move(script_args));
auto file = co_await open_file_dma(script_path, open_flags::ro);
auto fstream = make_file_input_stream(file);
auto script = co_await util::read_entire_stream_contiguous(fstream);
co_await consumer->load_script(script_path, script);
co_return consumer;
}