/* * Copyright (C) 2022-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #include "utils/assert.hh" #include #include #include #include #include #include #include #include "lang/lua_scylla_types.hh" #include "reader_permit.hh" #include "schema/schema.hh" #include "sstables/sstables.hh" #include "tools/json_writer.hh" #include "tools/lua_sstable_consumer.hh" #include "types/collection.hh" #include "types/tuple.hh" #include "dht/i_partitioner.hh" // Lua 5.4 added an extra parameter to lua_resume #if LUA_VERSION_NUM >= 504 # define LUA_504_PLUS(x...) x #else # define LUA_504_PLUS(x...) #endif // Lua 5.5 added a seed parameter to lua_newstate #if LUA_VERSION_NUM >= 505 # define LUA_505_PLUS(x...) x #else # define LUA_505_PLUS(x...) #endif namespace { #if LUA_VERSION_NUM >= 505 static unsigned lua_random_seed() { static thread_local std::default_random_engine rng{std::random_device{}()}; return rng(); } #endif class gc_clock_time_point; class counter_shards_value; class atomic_cell_with_type; class collection_with_type; class json_writer; template struct type_to_metatable { static constexpr const char* metatable_name = nullptr; }; template <> struct type_to_metatable { static constexpr const char* metatable_name = "Scylla.column_definition"; }; template <> struct type_to_metatable { static constexpr const char* metatable_name = "Scylla.schema"; }; template <> struct type_to_metatable { static constexpr const char* metatable_name = "Scylla.partition_key"; }; template <> struct type_to_metatable { static constexpr const char* metatable_name = "Scylla.clustering_key"; }; template <> struct type_to_metatable { static constexpr const char* metatable_name = "Scylla.ring_position"; }; template <> struct type_to_metatable { static constexpr const char* metatable_name = "Scylla.position_in_partition"; }; template <> struct type_to_metatable { static constexpr const char* metatable_name = "Scylla.sstable"; }; template <> struct type_to_metatable { static constexpr const char* metatable_name = "Scylla.partition_start"; }; template <> struct type_to_metatable { static constexpr const char* metatable_name = "Scylla.gc_clock_time_point"; }; template <> struct type_to_metatable { static constexpr const char* metatable_name = "Scylla.tombstone"; }; template <> struct type_to_metatable { static constexpr const char* metatable_name = "Scylla.data_value"; }; template <> struct type_to_metatable { static constexpr const char* metatable_name = "Scylla.counter_shards_value"; }; template <> struct type_to_metatable { static constexpr const char* metatable_name = "Scylla.atomic_cell"; }; template <> struct type_to_metatable { static constexpr const char* metatable_name = "Scylla.collection"; }; template <> struct type_to_metatable { static constexpr const char* metatable_name = "Scylla.static_row"; }; template <> struct type_to_metatable { static constexpr const char* metatable_name = "Scylla.row_marker"; }; template <> struct type_to_metatable { static constexpr const char* metatable_name = "Scylla.clustering_row"; }; template <> struct type_to_metatable { static constexpr const char* metatable_name = "Scylla.range_tombstone_change"; }; template <> struct type_to_metatable { static constexpr const char* metatable_name = "Scylla.json_writer"; }; template const char* get_metatable_name() { const auto metatable_name = type_to_metatable>::metatable_name; SCYLLA_ASSERT(metatable_name); return metatable_name; } struct lua_closer { void operator()(lua_State* l) { lua_close(l); } }; void* lua_alloc(void* ud, void* ptr, size_t osize, size_t nsize) { if (!nsize) { free(ptr); return nullptr; } else { return realloc(ptr, nsize); } } static const luaL_Reg loadedlibs[] = { {"base", luaopen_base}, {LUA_MATHLIBNAME, luaopen_math}, {LUA_OSLIBNAME, luaopen_os}, {LUA_STRLIBNAME, luaopen_string}, {LUA_TABLIBNAME, luaopen_table}, {NULL, NULL}, }; template void push_userdata_ref(lua_State* l, T& v) { *reinterpret_cast(lua_newuserdata(l, sizeof(T*))) = &v; luaL_setmetatable(l, get_metatable_name()); } template T& pop_userdata_ref(lua_State* l, int i) { return **reinterpret_cast(luaL_checkudata(l, i, get_metatable_name())); } template void* alloc_userdata(lua_State* l) { if constexpr (alignof(T) > 8) { return lua::aligned_user_data(l); } else { return lua_newuserdata(l, sizeof(T)); } } template void push_userdata(lua_State* l, Arg&&... arg) { new (alloc_userdata(l)) T(std::forward(arg)...); luaL_setmetatable(l, get_metatable_name()); } // Push userdata with default constructed T in it. template void push_userdata(lua_State* l) { new (alloc_userdata(l)) T; luaL_setmetatable(l, get_metatable_name()); } template T* get_userdata_ptr(lua_State* l, int i) { auto p = luaL_checkudata(l, i, get_metatable_name()); if constexpr (alignof(T) <= 8) { return reinterpret_cast(p); } else { return align_up(reinterpret_cast(p), alignof(T)); } } template T pop_userdata(lua_State* l, int i) { return std::move(*get_userdata_ptr(l, i)); } // Get reference to userdata stored by-value. // Valid only until Lua GC:s the value, so don't keep it around for long. template T& pop_userdata_as_ref(lua_State* l, int i) { return *get_userdata_ptr(l, i); } template int cxx_gc_l(lua_State* l) { auto& v = pop_userdata_as_ref(l, 1); v.~T(); lua_pop(l, 1); return 0; } template int tostring_l(lua_State* l) { const auto& v = pop_userdata_as_ref(l, 1); lua_pop(l, 1); lua::push_sstring(l, format("{}", v)); return 1; } const schema& get_schema_l(lua_State* l) { lua_getglobal(l, "schema"); auto& s = pop_userdata_ref(l, -1); lua_pop(l, 1); return s; } template int tostring_with_wrapper(lua_State* l) { const auto& v = pop_userdata_as_ref(l, 1); const auto& s = get_schema_l(l); lua_pop(l, 1); lua::push_sstring(l, format("{}", Wrapper(s, v))); return 1; } int column_definition_index_l(lua_State* l) { const auto& cdef = pop_userdata_ref(l, 1); auto field = lua_tostring(l, 2); lua_pop(l, 2); if (strcmp(field, "id") == 0) { lua_pushinteger(l, cdef.id); return 1; } else if (strcmp(field, "name") == 0) { lua::push_sstring(l, cdef.name_as_text()); return 1; } else if (strcmp(field, "kind") == 0) { lua::push_sstring(l, to_sstring(cdef.kind)); return 1; } return 0; } int push_column_definitions(lua_State* l, schema::const_iterator_range_type cols) { lua_createtable(l, std::distance(cols.begin(), cols.end()), 0); size_t i = 1; // lua arrays start from 1 for (const auto& col : cols) { push_userdata_ref(l, col); lua_seti(l, -2, i++); } return 1; } int schema_index_l(lua_State* l) { const auto& s = pop_userdata_ref(l, 1); auto field = lua_tostring(l, 2); lua_pop(l, 2); if (strcmp(field, "partition_key_columns") == 0) { return push_column_definitions(l, s.partition_key_columns()); } else if (strcmp(field, "clustering_key_columns") == 0) { return push_column_definitions(l, s.clustering_key_columns()); } else if (strcmp(field, "static_columns") == 0) { return push_column_definitions(l, s.static_columns()); } else if (strcmp(field, "regular_columns") == 0) { return push_column_definitions(l, s.regular_columns()); } else if (strcmp(field, "all_columns") == 0) { return push_column_definitions(l, s.all_columns()); } return 0; } template int key_to_hex_l(lua_State* l) { auto& k = pop_userdata_as_ref(l, 1); auto hex = to_hex(to_bytes(k.representation())); lua_pop(l, 1); lua::push_sstring(l, hex); return 1; } template int key_index_l(lua_State* l) { auto field = lua_tostring(l, 2); const auto& k = pop_userdata_as_ref(l, 1); const auto& s = get_schema_l(l); lua_pop(l, 2); if (strcmp(field, "components") == 0) { const std::vector* types; if constexpr (std::is_same_v) { types = &s.partition_key_type()->types(); } else { types = &s.clustering_key_type()->types(); } const auto values = k.explode(s); lua_createtable(l, types->size(), 0); for (size_t i = 0; i < types->size(); ++i) { if (i == values.size()) { // prefix key break; } lua::push_data_value(l, types->at(i)->deserialize(values.at(i))); lua_seti(l, -2, i + 1); // lua arrays start from 1 } return 1; } else if (strcmp(field, "to_hex") == 0) { lua_pushcfunction(l, key_to_hex_l); return 1; } return 0; } int8_t normalize_weight(int w) { if (!w) { return w; } else if (w < 0) { return -1; } else { return 1; } } int new_ring_position_l(lua_State* l) { const auto weight = normalize_weight(lua_tointeger(l, 1)); const auto& s = get_schema_l(l); std::optional pk; std::optional token; if (lua_gettop(l) == 2) { if (lua_type(l, 2) == LUA_TUSERDATA) { pk = pop_userdata(l, 2); token = dht::get_token(s, *pk); } else if (lua_type(l, 2) == LUA_TNUMBER) { token = dht::token(lua_tointeger(l, 2)); } } if (lua_gettop(l) == 3) { pk = pop_userdata(l, 2); if (lua_type(l, 3) == LUA_TNUMBER) { token = dht::token(lua_tointeger(l, 3)); } else { luaL_checktype(l, 3, LUA_TNIL); } } if (!token) { if (!weight) { luaL_error(l, "weight of 0 is invalid for min/max ring_position"); } token = weight < 0 ? dht::maximum_token() : dht::minimum_token(); } push_userdata(l, *token, std::move(pk), weight); return 1; } int tri_cmp_to_int(std::strong_ordering r) { if (r < 0) { return -1; } else if (r == 0) { return 0; } else { return 1; } } int ring_position_tri_cmp_l(lua_State* l) { const auto& a = pop_userdata_as_ref(l, 1); const auto& b = pop_userdata_as_ref(l, 2); lua_pop(l, 2); const auto& s = get_schema_l(l); lua_pushinteger(l, tri_cmp_to_int(dht::ring_position_tri_compare(s, a, b))); return 1; } int ring_position_index_l(lua_State* l) { const auto& rp = pop_userdata_as_ref(l, 1); auto field = lua_tostring(l, 2); lua_pop(l, 2); if (strcmp(field, "tri_cmp") == 0) { lua_pushcfunction(l, ring_position_tri_cmp_l); return 1; } else if (strcmp(field, "token") == 0) { if (rp.token()._kind == dht::token_kind::key) { lua_pushinteger(l, rp.token()._data); return 1; } } else if (strcmp(field, "key") == 0) { if (rp.key()) { push_userdata(l, *rp.key()); return 1; } } else if (strcmp(field, "weight") == 0) { lua_pushinteger(l, rp.weight()); return 1; } return 0; } int position_in_partition_tri_cmp_l(lua_State* l) { const auto& a = pop_userdata_as_ref(l, 1); const auto& b = pop_userdata_as_ref(l, 2); lua_pop(l, 2); const auto& s = get_schema_l(l); position_in_partition::tri_compare cmp(s); lua_pushinteger(l, tri_cmp_to_int(cmp(a, b))); return 1; } int position_in_partition_index_l(lua_State* l) { const auto& pos = pop_userdata_as_ref(l, 1); auto field = lua_tostring(l, 2); lua_pop(l, 2); if (strcmp(field, "tri_cmp") == 0) { lua_pushcfunction(l, position_in_partition_tri_cmp_l); return 1; } else if (strcmp(field, "key") == 0) { if (pos.has_key()) { push_userdata(l, pos.key()); return 1; } } else if (strcmp(field, "weight") == 0) { lua_pushinteger(l, int(pos.get_bound_weight())); return 1; } return 0; } int new_position_in_partition_l(lua_State* l) { const auto weight = normalize_weight(lua_tointeger(l, 1)); std::optional ck; if (lua_gettop(l) == 2) { if (lua_type(l, 2) == LUA_TUSERDATA) { ck = pop_userdata(l, 2); } else { luaL_checktype(l, 2, LUA_TNIL); } } if (!ck) { if (!weight) { luaL_error(l, "weight of 0 is invalid for nil min/max position_in_partition"); } } push_userdata(l, partition_region::clustered, bound_weight(weight), std::move(ck)); return 1; } template int unserialize_key_l(lua_State* l) { const auto& s = get_schema_l(l); auto key_str = lua_tostring(l, 1); lua_pop(l, 2); lw_shared_ptr> type; managed_bytes value; try { value = managed_bytes(from_hex(key_str)); if constexpr (AllowPrefix == allow_prefixes::no) { type = s.partition_key_type(); } else { type = s.clustering_key_type(); } type->validate(value); if constexpr (AllowPrefix == allow_prefixes::no) { push_userdata(l, partition_key::from_bytes(value)); } else { push_userdata(l, clustering_key::from_bytes(value)); } return 1; } catch (const std::exception& e) { return luaL_error(l, "unserialize partition key: %s", e.what()); } } int token_of_l(lua_State* l) { const auto& s = pop_userdata_ref(l, 1); const auto& k = pop_userdata_as_ref(l, 1); lua_pop(l, 2); auto t = dht::get_token(s, k); lua_pushinteger(l, t._data); return 1; } int sstable_index_l(lua_State* l) { auto field = lua_tostring(l, 2); auto& sst = pop_userdata_ref(l, 1); lua_pop(l, 2); if (strcmp(field, "filename") == 0) { lua::push_sstring(l, fmt::to_string(sst.get_filename())); return 1; } return 0; } int partition_start_index_l(lua_State* l) { auto field = lua_tostring(l, 2); auto& ps = pop_userdata_as_ref(l, 1); lua_pop(l, 2); if (strcmp(field, "key") == 0) { push_userdata(l, ps.key().key()); return 1; } else if (strcmp(field, "token") == 0) { lua_pushinteger(l, ps.key().token()._data); return 1; } else if (strcmp(field, "tombstone") == 0) { if (!ps.partition_tombstone()) { return 0; } push_userdata(l, ps.partition_tombstone()); return 1; } return 0; } void push_gc_clock_time_point(lua_State* l, gc_clock::time_point tp) { auto t = gc_clock::to_time_t(tp); std::tm tm{}; // This time is actually UTC, but C API assumes local-times and there is no // way to specify the TZ used. So we lie about it being local-time to avoid // libC doing TZ adjustments. localtime_r(&t, &tm); lua_createtable(l, 0, 6); luaL_setmetatable(l, get_metatable_name()); lua_pushinteger(l, 1900 + tm.tm_year); lua_setfield(l, -2, "year"); lua_pushinteger(l, 1 + tm.tm_mon); lua_setfield(l, -2, "month"); lua_pushinteger(l, tm.tm_mday); lua_setfield(l, -2, "day"); lua_pushinteger(l, tm.tm_hour); lua_setfield(l, -2, "hour"); lua_pushinteger(l, tm.tm_min); lua_setfield(l, -2, "min"); lua_pushinteger(l, tm.tm_sec); lua_setfield(l, -2, "sec"); } gc_clock::time_point get_gc_clock_time_point_l(lua_State* l) { std::tm tm{}; tm.tm_isdst = -1; lua_getfield(l, 1, "year"); tm.tm_year = lua_tointeger(l, -1) - 1900; lua_pop(l, 1); lua_getfield(l, 1, "month"); tm.tm_mon = lua_tointeger(l, -1) - 1; lua_pop(l, 1); lua_getfield(l, 1, "day"); tm.tm_mday = lua_tointeger(l, -1); lua_pop(l, 1); lua_getfield(l, 1, "hour"); tm.tm_hour = lua_tointeger(l, -1); lua_pop(l, 1); lua_getfield(l, 1, "min"); tm.tm_min = lua_tointeger(l, -1); lua_pop(l, 1); lua_getfield(l, 1, "sec"); tm.tm_sec = lua_tointeger(l, -1); lua_pop(l, 1); lua_pop(l, 1); return gc_clock::from_time_t(std::mktime(&tm)); } template int gc_clock_time_point_binary_op_l(lua_State* l) { auto b = get_gc_clock_time_point_l(l); lua_pop(l, 1); auto a = get_gc_clock_time_point_l(l); lua_pop(l, 1); Op op; lua_pushboolean(l, op(a, b)); return 1; } int gc_clock_time_point_tostring_l(lua_State* l) { auto tp = get_gc_clock_time_point_l(l); lua::push_sstring(l, format("{:%F %T}z", fmt::gmtime(gc_clock::to_time_t(tp)))); return 1; } int gc_clock_time_point_now_l(lua_State* l) { push_gc_clock_time_point(l, gc_clock::now()); return 1; } int gc_clock_time_point_from_string_l(lua_State* l) { try { auto tp = gc_clock::time_point(gc_clock::duration(timestamp_from_string(lua_tostring(l, 1)) / 1000)); lua_pop(l, 1); push_gc_clock_time_point(l, tp); } catch (const std::exception& e) { return luaL_error(l, "failed to parse gc_clock_deletion_time from string: %s", e.what()); } return 1; } int tombstone_index_l(lua_State* l) { auto field = lua_tostring(l, 2); auto tomb = pop_userdata(l, 1); lua_pop(l, 2); if (strcmp(field, "timestamp") == 0) { lua_pushinteger(l, tomb.timestamp); return 1; } else if (strcmp(field, "deletion_time") == 0) { push_gc_clock_time_point(l, tomb.deletion_time); return 1; } return 0; } int data_value_tostring_l(lua_State* l) { auto& dv = pop_userdata_as_ref(l, 1); lua_pop(l, 1); auto type = dv.type(); if (auto bvopt = dv.serialize()) { lua::push_sstring(l, type->to_string(*bvopt)); } else { lua_pushliteral(l, ""); } return 1; } int data_value_index_l(lua_State* l) { auto field = lua_tostring(l, 2); auto& dv = pop_userdata_as_ref(l, 1); lua_pop(l, 2); if (strcmp(field, "value") == 0) { lua::push_data_value(l, dv); return 1; } return 0; } struct counter_shards_value { data_type type; managed_bytes data; }; int counter_shards_value_tostring_l(lua_State* l) { auto& csv = pop_userdata_as_ref(l, 1); lua_pop(l, 1); auto cv = counter_cell_view(atomic_cell_view::from_bytes(*csv.type, csv.data)); lua::push_sstring(l, format("{}", cv.total_value())); return 1; } int counter_shards_value_index_l(lua_State* l) { auto field = lua_tostring(l, 2); auto& csv = pop_userdata_as_ref(l, 1); lua_pop(l, 2); auto cv = counter_cell_view(atomic_cell_view::from_bytes(*csv.type, csv.data)); if (strcmp(field, "value") == 0) { lua::push_data_value(l, data_value(cv.total_value())); } else if (strcmp(field, "shards") == 0) { lua_createtable(l, 0, 0); int i = 1; // Lua arrays start from 1 for (const auto& shard : cv.shards()) { lua_createtable(l, 0, 3); lua::push_sstring(l, shard.id().to_sstring()); lua_setfield(l, -2, "id"); lua_pushinteger(l, shard.value()); lua_setfield(l, -2, "value"); lua_pushinteger(l, shard.logical_clock()); lua_setfield(l, -2, "clock"); lua_seti(l, -2, i++); } return 1; } return 0; } struct atomic_cell_with_type { atomic_cell data; data_type type; atomic_cell_with_type(atomic_cell_view data_view, data_type type) : data(*type, data_view), type(std::move(type)) { } }; int atomic_cell_index_l(lua_State* l) { auto field = lua_tostring(l, 2); auto& ct = pop_userdata_as_ref(l, 1); auto& cell = ct.data; auto& type = ct.type; lua_pop(l, 2); if (strcmp(field, "timestamp") == 0) { lua_pushinteger(l, cell.timestamp()); return 1; } else if (strcmp(field, "type") == 0) { if (type->is_counter()) { if (cell.is_counter_update()) { lua_pushliteral(l, "counter-update"); } else { lua_pushliteral(l, "counter-shards"); } } else if (type->is_collection()) { if (type->is_atomic()) { lua_pushliteral(l, "frozen-collection"); } else { lua_pushliteral(l, "collection"); } } else { lua_pushliteral(l, "regular"); } return 1; } else if (strcmp(field, "is_live") == 0) { lua_pushboolean(l, cell.is_live()); return 1; } else if (strcmp(field, "has_ttl") == 0) { lua_pushboolean(l, cell.is_live_and_has_ttl()); return 1; } else if (strcmp(field, "ttl") == 0) { if (!cell.is_live_and_has_ttl()) { return 0; } lua_pushinteger(l, std::chrono::duration_cast(cell.ttl()).count()); return 1; } else if (strcmp(field, "expiry") == 0) { if (!cell.is_live_and_has_ttl()) { return 0; } push_gc_clock_time_point(l, cell.expiry()); return 1; } else if (strcmp(field, "deletion_time") == 0) { if (!cell.is_dead(gc_clock::now())) { return 0; } push_gc_clock_time_point(l, cell.deletion_time()); return 1; } else if (strcmp(field, "value") == 0) { if (!cell.is_live()) { return 0; } if (type->is_counter()) { if (cell.is_counter_update()) { push_userdata(l, cell.counter_update_value()); } else { push_userdata(l, counter_shards_value{type, managed_bytes(cell.serialize())}); } } else { push_userdata(l, type->deserialize(managed_bytes_view(cell.value()))); } return 1; } return 0; } struct collection_with_type { collection_mutation data; data_type type; collection_with_type(collection_mutation_view data_view, data_type type) : data(*type, data_view), type(std::move(type)) { } }; int collection_index_l(lua_State* l) { auto field = lua_tostring(l, 2); auto& ct = pop_userdata_as_ref(l, 1); auto type = ct.type; lua_pop(l, 2); return collection_mutation_view(ct.data).with_deserialized(*type, [l, field, type] (const collection_mutation_view_description& mv) { if (strcmp(field, "tombstone") == 0) { if (!mv.tomb) { return 0; } push_userdata(l, mv.tomb); return 1; } else if (strcmp(field, "type") == 0) { lua_pushliteral(l, "collection"); return 1; } else if (strcmp(field, "values") == 0) { const auto size = mv.cells.size(); std::function push_key; std::function push_value; if (auto t = dynamic_cast(type.get())) { push_key = [l, t = t->name_comparator()] (size_t, bytes_view k) { push_userdata(l, t->deserialize(k)); }; push_value = [l, t = t->value_comparator()] (size_t, atomic_cell_view v) { push_userdata(l, v, t); }; } else if (auto t = dynamic_cast(type.get())) { push_key = [l] (size_t, bytes_view) { lua_pushliteral(l, ""); }; push_value = [l, t] (size_t i, atomic_cell_view v) { push_userdata(l, v, t->type(i)); }; } else { return 0; } lua_createtable(l, size, 0); for (size_t i = 0; i < size; ++i) { const auto& e = mv.cells[i]; lua_createtable(l, 0, 3); push_key(i, e.first); lua_setfield(l, -2, "key"); push_value(i, e.second); lua_setfield(l, -2, "value"); lua_seti(l, -2, i + 1); // Lua arrays start from 1 } return 1; } return 0; }); } int push_cells(lua_State* l, const row& cells, column_kind kind) { auto& schema = get_schema_l(l); lua_createtable(l, 0, 0); cells.for_each_cell([l, &schema, kind] (column_id id, const atomic_cell_or_collection& cell) { auto cdef = schema.column_at(kind, id); if (cdef.is_atomic()) { push_userdata(l, cell.as_atomic_cell(cdef), cdef.type); } else if (cdef.type->is_collection() || cdef.type->is_user_type()) { push_userdata(l, cell.as_collection_mutation(), cdef.type); } else { lua_pushnil(l); } lua_setfield(l, -2, cdef.name_as_text().data()); }); return 1; } int static_row_index_l(lua_State* l) { auto field = lua_tostring(l, 2); auto& sr = pop_userdata_as_ref(l, 1); lua_pop(l, 2); if (strcmp(field, "cells") == 0) { return push_cells(l, sr.cells(), column_kind::static_column); } return 0; } int row_marker_index_l(lua_State* l) { auto field = lua_tostring(l, 2); auto& m = pop_userdata_as_ref(l, 1); lua_pop(l, 2); if (strcmp(field, "timestamp") == 0) { lua_pushinteger(l, m.timestamp()); return 1; } else if (strcmp(field, "is_live") == 0) { lua_pushboolean(l, m.is_live()); return 1; } else if (strcmp(field, "has_ttl") == 0) { lua_pushboolean(l, m.is_expiring()); return 1; } else if (strcmp(field, "ttl") == 0) { if (!m.is_live() || !m.is_expiring()) { return 0; } lua_pushinteger(l, std::chrono::duration_cast(m.ttl()).count()); return 1; } else if (strcmp(field, "expiry") == 0) { if (!m.is_live() || !m.is_expiring()) { return 0; } push_gc_clock_time_point(l, m.expiry()); return 1; } else if (strcmp(field, "deletion_time") == 0) { if (!m.is_expiring() && !m.is_dead(gc_clock::now())) { return 0; } push_gc_clock_time_point(l, m.deletion_time()); return 1; } return 0; } int clustering_row_index_l(lua_State* l) { auto field = lua_tostring(l, 2); auto& cr = pop_userdata_as_ref(l, 1); lua_pop(l, 2); if (strcmp(field, "key") == 0) { push_userdata(l, cr.key()); return 1; } else if (strcmp(field, "tombstone") == 0) { if (!cr.tomb()) { return 0; } push_userdata(l, cr.tomb().regular()); return 1; } else if (strcmp(field, "shadowable_tombstone") == 0) { if (!cr.tomb()) { return 0; } push_userdata(l, cr.tomb().shadowable().tomb()); return 1; } else if (strcmp(field, "marker") == 0) { if (cr.marker().is_missing()) { return 0; } push_userdata(l, cr.marker()); return 1; } else if (strcmp(field, "cells") == 0) { return push_cells(l, cr.cells(), column_kind::regular_column); } return 0; } int range_tombstone_change_tostring_l(lua_State* l) { auto& rtc = pop_userdata_as_ref(l, 1); lua_pop(l, 1); lua::push_sstring(l, format("{}", rtc)); return 1; } int range_tombstone_change_index_l(lua_State* l) { auto field = lua_tostring(l, 2); auto& rtc = pop_userdata_as_ref(l, 1); lua_pop(l, 2); const auto& pos = rtc.position(); if (strcmp(field, "key") == 0) { if (!pos.has_key()) { return 0; } push_userdata(l, pos.key()); return 1; } else if (strcmp(field, "key_weight") == 0) { lua_pushinteger(l, static_cast(pos.get_bound_weight())); return 1; } else if (strcmp(field, "tombstone") == 0) { if (!rtc.tombstone()) { return 0; } push_userdata(l, rtc.tombstone()); return 1; } else if (strcmp(field, "__tostring") == 0) { lua_pushcfunction(l, range_tombstone_change_tostring_l); return 1; } return 0; } class json_writer { tools::mutation_fragment_stream_json_writer _writer; private: static json_writer& get_this(lua_State* l) { return pop_userdata_as_ref(l, 1); } template static int do_invoke(lua_State* l, const char* name, Function&& fun) { try { fun(get_this(l)); } catch (const std::exception& e) { return luaL_error(l, "json_writer::%s(): %s", name, e.what()); } return 0; } template static int invoke(lua_State* l, const char* name, int expected_type, Function&& fun) { auto nargs = int(expected_type != LUA_TNIL) + 1; if (const auto n = lua_gettop(l); n != nargs) { return luaL_error(l, "json_writer::%s(): expected %I arguments, got %I", name, nargs, n); } const auto type = lua_type(l, 2); if (expected_type != LUA_TNIL && type != expected_type) { return luaL_error(l, "json_writer::%s(): expected argument of type %s, got %s", name, lua_typename(l, expected_type), lua_typename(l, type)); } return do_invoke(l, name, fun); } template static int invoke_optional_arg(lua_State* l, const char* name, int expected_type, Function&& fun) { auto nargs = 2; const auto n = lua_gettop(l); if (n > nargs) { return luaL_error(l, "json_writer::%s(): expected at most %I arguments, got %I", name, nargs, n); } if (n == 2) { if (const auto type = lua_type(l, 2); type != LUA_TNIL && type != expected_type) { return luaL_error(l, "json_writer::%s(): expected argument of type %s, got %s", name, lua_typename(l, expected_type), lua_typename(l, type)); } } return do_invoke(l, name, fun); } public: json_writer(const schema& s) : _writer(s) { } static int null_l(lua_State* l) { return invoke(l, __FUNCTION__, LUA_TNIL, [] (json_writer& w) { w._writer.writer().Null(); }); } static int bool_l(lua_State* l) { return invoke(l, __FUNCTION__, LUA_TBOOLEAN, [l] (json_writer& w) { w._writer.writer().Bool(lua_toboolean(l, 2)); }); } static int int_l(lua_State* l) { return invoke(l, __FUNCTION__, LUA_TNUMBER, [l] (json_writer& w) { w._writer.writer().Int64(lua_tointeger(l, 2)); }); } static int double_l(lua_State* l) { return invoke(l, __FUNCTION__, LUA_TNUMBER, [l] (json_writer& w) { w._writer.writer().Double(lua_tonumber(l, 2)); }); } static int string_l(lua_State* l) { return invoke(l, __FUNCTION__, LUA_TSTRING, [l] (json_writer& w) { size_t len = 0; auto str = lua_tolstring(l, 2, &len); w._writer.writer().rjson_writer().String(str, len, false); }); } static int start_object_l(lua_State* l) { return invoke(l, __FUNCTION__, LUA_TNIL, [] (json_writer& w) { w._writer.writer().StartObject(); }); } static int key_l(lua_State* l) { return invoke(l, __FUNCTION__, LUA_TSTRING, [l] (json_writer& w) { size_t len = 0; auto str = lua_tolstring(l, 2, &len); w._writer.writer().rjson_writer().Key(str, len, false); }); } static int end_object_l(lua_State* l) { return invoke(l, __FUNCTION__, LUA_TNIL, [] (json_writer& w) { w._writer.writer().EndObject(); }); } static int start_array_l(lua_State* l) { return invoke(l, __FUNCTION__, LUA_TNIL, [] (json_writer& w) { w._writer.writer().StartArray(); }); } static int end_array_l(lua_State* l) { return invoke(l, __FUNCTION__, LUA_TNIL, [] (json_writer& w) { w._writer.writer().EndArray(); }); } static int start_stream_l(lua_State* l) { return invoke(l, __FUNCTION__, LUA_TNIL, [] (json_writer& w) { w._writer.start_stream(); }); } static int start_sstable_l(lua_State* l) { return invoke_optional_arg(l, __FUNCTION__, LUA_TUSERDATA, [l] (json_writer& w) { const sstables::sstable* sst = nullptr; if (lua_gettop(l) > 1 && lua_type(l, 2) != LUA_TNIL) { sst = &pop_userdata_ref(l, 2); } w._writer.start_sstable(sst); }); } static int start_partition_l(lua_State* l) { return invoke(l, __FUNCTION__, LUA_TUSERDATA, [l] (json_writer& w) { w._writer.start_partition(pop_userdata_as_ref(l, 2)); }); } static int static_row_l(lua_State* l) { return invoke(l, __FUNCTION__, LUA_TUSERDATA, [l] (json_writer& w) { w._writer.partition_element(pop_userdata_as_ref(l, 2)); }); } static int clustering_row_l(lua_State* l) { return invoke(l, __FUNCTION__, LUA_TUSERDATA, [l] (json_writer& w) { w._writer.partition_element(pop_userdata_as_ref(l, 2)); }); } static int range_tombstone_change_l(lua_State* l) { return invoke(l, __FUNCTION__, LUA_TUSERDATA, [l] (json_writer& w) { w._writer.partition_element(pop_userdata_as_ref(l, 2)); }); } static int end_partition_l(lua_State* l) { return invoke(l, __FUNCTION__, LUA_TNIL, [] (json_writer& w) { w._writer.end_partition(); }); } static int end_sstable_l(lua_State* l) { return invoke(l, __FUNCTION__, LUA_TNIL, [] (json_writer& w) { w._writer.end_sstable(); }); } static int end_stream_l(lua_State* l) { return invoke(l, __FUNCTION__, LUA_TNIL, [] (json_writer& w) { w._writer.end_stream(); }); } }; int json_writer_new_l(lua_State* l) { push_userdata(l, get_schema_l(l)); return 1; } int json_writer_index_l(lua_State* l) { auto field = lua_tostring(l, 2); lua_pop(l, 2); std::pair methods[] = { {"null", &json_writer::null_l}, {"bool", &json_writer::bool_l}, {"int", &json_writer::int_l}, {"double", &json_writer::double_l}, {"string", &json_writer::string_l}, {"start_object", &json_writer::start_object_l}, {"key", &json_writer::key_l}, {"end_object", &json_writer::end_object_l}, {"start_array", &json_writer::start_array_l}, {"end_array", &json_writer::end_array_l}, {"start_stream", &json_writer::start_stream_l}, {"start_sstable", &json_writer::start_sstable_l}, {"start_partition", &json_writer::start_partition_l}, {"static_row", &json_writer::static_row_l}, {"clustering_row", &json_writer::clustering_row_l}, {"range_tombstone_change", &json_writer::range_tombstone_change_l}, {"end_partition", &json_writer::end_partition_l}, {"end_sstable", &json_writer::end_sstable_l}, {"end_stream", &json_writer::end_stream_l}, }; for (const auto& [name, method] : methods) { if (strcmp(field, name) == 0) { lua_pushcfunction(l, method); return 1; } } return 0; } class lua_sstable_consumer : public sstable_consumer { schema_ptr _schema; reader_permit _permit; program_options::string_map _script_params; std::unique_ptr _l; private: void register_metatables() { auto* l = _l.get(); lua::register_metatables(l); luaL_newmetatable(l, get_metatable_name()); lua_pushcfunction(l, column_definition_index_l); lua_setfield(l, -2, "__index"); lua_pop(l, 1); luaL_newmetatable(l, get_metatable_name()); lua_pushcfunction(l, schema_index_l); lua_setfield(l, -2, "__index"); lua_pop(l, 1); luaL_newmetatable(l, get_metatable_name()); lua_pushcfunction(l, key_index_l); lua_setfield(l, -2, "__index"); lua_pushcclosure(l, tostring_with_wrapper, 0); lua_setfield(l, -2, "__tostring"); lua_pushcfunction(l, cxx_gc_l); lua_setfield(l, -2, "__gc"); lua_pop(l, 1); luaL_newmetatable(l, get_metatable_name()); lua_pushcfunction(l, key_index_l); lua_setfield(l, -2, "__index"); lua_pushcclosure(l, tostring_with_wrapper, 0); lua_setfield(l, -2, "__tostring"); lua_pushcfunction(l, cxx_gc_l); lua_setfield(l, -2, "__gc"); lua_pop(l, 1); luaL_newmetatable(l, get_metatable_name()); lua_pushcfunction(l, ring_position_index_l); lua_setfield(l, -2, "__index"); lua_pushcfunction(l, tostring_l); lua_setfield(l, -2, "__tostring"); lua_pushcfunction(l, cxx_gc_l); lua_setfield(l, -2, "__gc"); lua_pop(l, 1); luaL_newmetatable(l, get_metatable_name()); lua_pushcfunction(l, position_in_partition_index_l); lua_setfield(l, -2, "__index"); lua_pushcfunction(l, tostring_l); lua_setfield(l, -2, "__tostring"); lua_pushcfunction(l, cxx_gc_l); lua_setfield(l, -2, "__gc"); lua_pop(l, 1); luaL_newmetatable(l, get_metatable_name()); lua_pushcfunction(l, sstable_index_l); lua_setfield(l, -2, "__index"); lua_pop(l, 1); luaL_newmetatable(l, get_metatable_name()); lua_pushcfunction(l, partition_start_index_l); lua_setfield(l, -2, "__index"); lua_pushcfunction(l, tostring_l); lua_setfield(l, -2, "__tostring"); lua_pushcfunction(l, cxx_gc_l); lua_setfield(l, -2, "__gc"); lua_pop(l, 1); // type is used just to tag metatable, underlying type is a lua table luaL_newmetatable(l, get_metatable_name()); lua_pushcfunction(l, gc_clock_time_point_binary_op_l>); lua_setfield(l, -2, "__eq"); lua_pushcfunction(l, gc_clock_time_point_binary_op_l>); lua_setfield(l, -2, "__lt"); lua_pushcfunction(l, gc_clock_time_point_binary_op_l>); lua_setfield(l, -2, "__le"); lua_pushcfunction(l, gc_clock_time_point_tostring_l); lua_setfield(l, -2, "__tostring"); lua_pop(l, 1); luaL_newmetatable(l, get_metatable_name()); lua_pushcfunction(l, tombstone_index_l); lua_setfield(l, -2, "__index"); lua_pushcfunction(l, tostring_l); lua_setfield(l, -2, "__tostring"); lua_pushcfunction(l, cxx_gc_l); lua_setfield(l, -2, "__gc"); lua_pop(l, 1); luaL_newmetatable(l, get_metatable_name()); lua_pushcfunction(l, data_value_index_l); lua_setfield(l, -2, "__index"); lua_pushcfunction(l, data_value_tostring_l); lua_setfield(l, -2, "__tostring"); lua_pushcfunction(l, cxx_gc_l); lua_setfield(l, -2, "__gc"); lua_pop(l, 1); luaL_newmetatable(l, get_metatable_name()); lua_pushcfunction(l, counter_shards_value_index_l); lua_setfield(l, -2, "__index"); lua_pushcfunction(l, counter_shards_value_tostring_l); lua_setfield(l, -2, "__tostring"); lua_pushcfunction(l, cxx_gc_l); lua_setfield(l, -2, "__gc"); lua_pop(l, 1); luaL_newmetatable(l, get_metatable_name()); lua_pushcfunction(l, atomic_cell_index_l); lua_setfield(l, -2, "__index"); lua_pushcfunction(l, cxx_gc_l); lua_setfield(l, -2, "__gc"); lua_pop(l, 1); luaL_newmetatable(l, get_metatable_name()); lua_pushcfunction(l, collection_index_l); lua_setfield(l, -2, "__index"); lua_pushcfunction(l, cxx_gc_l); lua_setfield(l, -2, "__gc"); lua_pop(l, 1); luaL_newmetatable(l, get_metatable_name()); lua_pushcfunction(l, static_row_index_l); lua_setfield(l, -2, "__index"); lua_pushcclosure(l, tostring_with_wrapper, 0); lua_setfield(l, -2, "__tostring"); lua_pushcfunction(l, cxx_gc_l); lua_setfield(l, -2, "__gc"); lua_pop(l, 1); luaL_newmetatable(l, get_metatable_name()); lua_pushcfunction(l, clustering_row_index_l); lua_setfield(l, -2, "__index"); lua_pushcclosure(l, tostring_with_wrapper, 0); lua_setfield(l, -2, "__tostring"); lua_pushcfunction(l, cxx_gc_l); lua_setfield(l, -2, "__gc"); lua_pop(l, 1); luaL_newmetatable(l, get_metatable_name()); lua_pushcfunction(l, row_marker_index_l); lua_setfield(l, -2, "__index"); lua_pushcfunction(l, cxx_gc_l); lua_setfield(l, -2, "__gc"); lua_pop(l, 1); luaL_newmetatable(l, get_metatable_name()); lua_pushcfunction(l, range_tombstone_change_index_l); lua_setfield(l, -2, "__index"); lua_pushcfunction(l, cxx_gc_l); lua_setfield(l, -2, "__gc"); lua_pop(l, 1); luaL_newmetatable(l, get_metatable_name()); lua_pushcfunction(l, json_writer_index_l); lua_setfield(l, -2, "__index"); lua_pushcfunction(l, cxx_gc_l); lua_setfield(l, -2, "__gc"); lua_pop(l, 1); } void register_types() { auto* l = _l.get(); const luaL_Reg scylla_lib[] = { {"now", gc_clock_time_point_now_l}, {"time_point_from_string", gc_clock_time_point_from_string_l}, {"new_json_writer", json_writer_new_l}, {"new_ring_position", new_ring_position_l}, {"new_position_in_partition", new_position_in_partition_l}, {"unserialize_partition_key", unserialize_key_l}, {"unserialize_clustering_key", unserialize_key_l}, {"token_of", token_of_l}, {NULL, NULL}, }; luaL_newlib(l, scylla_lib); lua_setglobal(l, "Scylla"); } void register_globals() { auto* l = _l.get(); push_userdata_ref(l, *_schema.get()); lua_setglobal(l, "schema"); } future call(std::string_view func_name, int nargs) { auto* l = _l.get(); int ret = LUA_YIELD; int nresults = 0; while (ret == LUA_YIELD) { ret = lua_resume(l, nullptr, nargs LUA_504_PLUS(, &nresults)); if (ret == LUA_YIELD) { if (nresults == 0) { co_await coroutine::maybe_yield(); } else if (nresults != 1) { throw std::runtime_error(fmt::format("{} failed: unexpected number of results yielded, expected 1, got {}", func_name, nresults)); } else { if (!lua_isuserdata(l, -1)) { throw std::runtime_error(fmt::format("{} failed: only future<> is allowed to be yielded from a coroutine", func_name)); } co_await pop_userdata>(l, -1); lua_pop(l, 1); } } } if (ret != LUA_OK) { throw std::runtime_error(fmt::format("{} failed: {}", func_name, lua_tostring(l, -1))); } co_return nresults; } struct invoke_meta { int nargs; bool returns_stop_iteration; }; template future invoke_script_method(const char* name, Func&& prepare_stack) { auto* l = _l.get(); // basic stack sanity check if (auto n = lua_gettop(l); n) { throw std::runtime_error(fmt::format("{}() failed: stack expected to be empty (clean), but got {} items", name, n)); } const auto type = lua_getglobal(l, name); // check whether script has this method, short-circuit if not if (type == LUA_TNIL) { lua_pop(l, 1); co_return stop_iteration::no; } // is symbol callable? if (type != LUA_TFUNCTION) { throw std::runtime_error(fmt::format("{}() failed: symbol is not callable", name)); } const auto desc = prepare_stack(); // sanity check for argument count // function to be called is also on the stack, account for that if (auto n = lua_gettop(l); n != desc.nargs + 1) { throw std::runtime_error(fmt::format("{}() failed: unexpected number of arguments for function, expected: {}, got: {}", name, desc.nargs, n - 1)); } // call + sanity check for return value count const auto nret = co_await call(name, desc.nargs); if (nret > int(desc.returns_stop_iteration)) { throw std::runtime_error(fmt::format("{}() failed: unexpected number of return values from function, expected: {}, got: {}", name, int(desc.returns_stop_iteration), nret)); } // extract and convert return value if method has it (it can only be stop_iteration) // we allow method to return nothing, int this case we assume stop_iteration::no auto stop = stop_iteration::no; if (desc.returns_stop_iteration && nret) { // lua method return false when they don't want to continue // stop_iteration expressed in boolean is too counter-intuitive stop = stop_iteration(!lua_toboolean(l, 1)); lua_pop(l, 1); } co_return stop; } public: explicit lua_sstable_consumer(schema_ptr s, reader_permit p, program_options::string_map script_params) : _schema(std::move(s)) , _permit(std::move(p)) , _script_params(std::move(script_params)) , _l(lua_newstate(lua_alloc, nullptr LUA_505_PLUS(, lua_random_seed()))) { } future<> load_script(std::string_view script_name, std::string_view script) { auto* l = _l.get(); // load lua standard libraries for (const luaL_Reg* lib = loadedlibs; lib->func; lib++) { luaL_requiref(l, lib->name, lib->func, 1); lua_pop(l, 1); } register_metatables(); register_types(); register_globals(); // parse and load script if (luaL_loadbuffer(l, script.data(), script.size(), script_name.data())) { throw std::runtime_error(fmt::format("Failed to load {}: {}", script_name, lua_tostring(l, -1))); } // execute script (runs global code) if (auto n = co_await call(script_name, 0); n) { throw std::runtime_error(fmt::format("Failed to execute {}: unexpected return value, expected 0, got {}", script_name, n)); } co_return; } virtual future<> consume_stream_start() override { return invoke_script_method(__FUNCTION__, [this] { auto* l = _l.get(); lua_newtable(l); for (const auto& [k, v] : _script_params) { lua::push_sstring(l, v); lua_setfield(l, -2, k.data()); } return invoke_meta{1, false}; }).discard_result(); } virtual future consume_sstable_start(const sstables::sstable* const sst) override { return invoke_script_method(__FUNCTION__, [this, sst] { auto* l = _l.get(); if (sst) { push_userdata_ref(l, *sst); } else { lua_pushnil(l); } return invoke_meta{1, true}; }); } virtual future consume(partition_start&& ps) override { return invoke_script_method("consume_partition_start", [this, ps = std::move(ps)] () mutable { auto* l = _l.get(); push_userdata(l, std::move(ps)); return invoke_meta{1, true}; }); } virtual future consume(static_row&& sr) override { return invoke_script_method("consume_static_row", [this, sr = std::move(sr)] () mutable { auto* l = _l.get(); push_userdata(l, std::move(sr)); return invoke_meta{1, true}; }); } virtual future consume(clustering_row&& cr) override { return invoke_script_method("consume_clustering_row", [this, cr = std::move(cr)] () mutable { auto* l = _l.get(); push_userdata(l, std::move(cr)); return invoke_meta{1, true}; }); } virtual future consume(range_tombstone_change&& rtc) override { return invoke_script_method("consume_range_tombstone_change", [this, rtc = std::move(rtc)] () mutable { auto* l = _l.get(); push_userdata(l, std::move(rtc)); return invoke_meta{1, true}; }); } virtual future consume(partition_end&& pe) override { return invoke_script_method("consume_partition_end", [] { return invoke_meta{0, true}; }); } virtual future consume_sstable_end() override { return invoke_script_method(__FUNCTION__, [] { return invoke_meta{0, true}; }); } virtual future<> consume_stream_end() override { return invoke_script_method(__FUNCTION__, [] { return invoke_meta{0, false}; }).discard_result(); } }; } future> make_lua_sstable_consumer(schema_ptr s, reader_permit p, std::string_view script_path, program_options::string_map script_args) { auto consumer = std::make_unique(std::move(s), std::move(p), std::move(script_args)); auto file = co_await open_file_dma(script_path, open_flags::ro); auto fstream = make_file_input_stream(file); auto script = co_await util::read_entire_stream_contiguous(fstream); co_await consumer->load_script(script_path, script); co_return consumer; }