Switch from seastar::compat::source_location to std::source_location.
An upcoming Seastar update will deprecate its compatibility layer.
The patch is the result of running
for f in $(git grep -l 'seastar::compat::source_location'); do
sed -e 's/seastar::compat::source_location/std::source_location/g' -i $f;
done
plus the removal of a few header includes.
Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
Closes scylladb/scylladb#27309
896 lines
32 KiB
C++
896 lines
32 KiB
C++
/*
|
|
* Copyright (C) 2025-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include <fmt/std.h>
|
|
#include <seastar/core/queue.hh>
|
|
|
|
#include "tools/json_mutation_stream_parser.hh"
|
|
#include "dht/i_partitioner.hh"
|
|
#include "utils/rjson.hh"
|
|
|
|
namespace tools {
|
|
namespace {
|
|
|
|
// SAX-style rapidjson reader instantiated with the encoding and allocator types provided by rjson.
using reader = rapidjson::GenericReader<rjson::encoding, rjson::encoding, rjson::allocator>;
|
|
class stream {
|
|
public:
|
|
using Ch = char;
|
|
private:
|
|
input_stream<Ch> _is;
|
|
temporary_buffer<Ch> _current;
|
|
size_t _pos = 0;
|
|
size_t _line = 1;
|
|
size_t _last_lf_pos = 0;
|
|
private:
|
|
void maybe_read_some() {
|
|
if (!_current.empty()) {
|
|
return;
|
|
}
|
|
_current = _is.read().get();
|
|
// EOS is encoded as null char
|
|
if (_current.empty()) {
|
|
_current = temporary_buffer<Ch>("\0", 1);
|
|
}
|
|
}
|
|
public:
|
|
stream(input_stream<Ch> is) : _is(std::move(is)) {
|
|
maybe_read_some();
|
|
}
|
|
stream(stream&&) = default;
|
|
~stream() {
|
|
_is.close().get();
|
|
}
|
|
Ch Peek() const {
|
|
return *_current.get();
|
|
}
|
|
Ch Take() {
|
|
auto c = Peek();
|
|
if (c == '\n') {
|
|
++_line;
|
|
++_last_lf_pos = _pos;
|
|
}
|
|
++_pos;
|
|
_current.trim_front(1);
|
|
maybe_read_some();
|
|
return c;
|
|
}
|
|
size_t Tell() {
|
|
return _pos;
|
|
}
|
|
// ostream methods, unused but need a definition
|
|
Ch* PutBegin() { return nullptr; }
|
|
void Put(Ch c) { }
|
|
void Flush() { }
|
|
size_t PutEnd(Ch* begin) { return 0; }
|
|
// own methods
|
|
size_t line() const {
|
|
return _line;
|
|
}
|
|
size_t last_line_feed_pos() const {
|
|
return _last_lf_pos;
|
|
}
|
|
};
|
|
class handler {
|
|
public:
|
|
using Ch = char;
|
|
private:
|
|
enum class state {
|
|
start,
|
|
before_partition,
|
|
in_partition,
|
|
before_key,
|
|
in_key,
|
|
before_tombstone,
|
|
in_tombstone,
|
|
before_static_columns,
|
|
before_clustering_elements,
|
|
before_clustering_element,
|
|
in_clustering_element,
|
|
in_range_tombstone_change,
|
|
in_clustering_row,
|
|
before_marker,
|
|
in_marker,
|
|
before_clustering_columns,
|
|
before_column_key,
|
|
before_column,
|
|
in_column,
|
|
before_ignored_value,
|
|
before_integer,
|
|
before_string,
|
|
before_bool,
|
|
};
|
|
struct column {
|
|
const column_definition* def = nullptr;
|
|
std::optional<bool> is_live;
|
|
std::optional<api::timestamp_type> timestamp;
|
|
std::optional<bytes> value;
|
|
std::optional<gc_clock::time_point> deletion_time;
|
|
|
|
explicit column(const column_definition* def) : def(def) { }
|
|
};
|
|
struct tombstone {
|
|
std::optional<api::timestamp_type> timestamp;
|
|
std::optional<gc_clock::time_point> deletion_time;
|
|
};
|
|
private:
|
|
schema_ptr _schema;
|
|
reader_permit _permit;
|
|
queue<mutation_fragment_v2_opt>& _queue;
|
|
logger& _logger;
|
|
circular_buffer<state> _state_stack;
|
|
std::string _key; // last seen key
|
|
bool _partition_start_emited = false;
|
|
bool _is_shadowable = false; // currently processed tombstone is a shadowable one
|
|
std::optional<bool> _bool;
|
|
std::optional<int64_t> _integer;
|
|
std::optional<std::string_view> _string;
|
|
std::optional<partition_key> _pkey;
|
|
std::optional<tombstone> _tombstone;
|
|
std::optional<clustering_key> _ckey;
|
|
std::optional<bound_weight> _bound_weight;
|
|
std::optional<row_marker> _row_marker;
|
|
std::optional<row_tombstone> _row_tombstone;
|
|
std::optional<row> _row;
|
|
std::optional<column> _column;
|
|
std::optional<gc_clock::duration> _ttl;
|
|
std::optional<gc_clock::time_point> _expiry;
|
|
private:
|
|
static std::string_view to_string(state s) {
|
|
switch (s) {
|
|
case state::start: return "start";
|
|
case state::before_partition: return "before_partition";
|
|
case state::in_partition: return "in_partition";
|
|
case state::before_key: return "before_key";
|
|
case state::in_key: return "in_key";
|
|
case state::before_tombstone: return "before_tombstone";
|
|
case state::in_tombstone: return "in_tombstone";
|
|
case state::before_static_columns: return "before_static_columns";
|
|
case state::before_clustering_elements: return "before_clustering_elements";
|
|
case state::before_clustering_element: return "before_clustering_element";
|
|
case state::in_clustering_element: return "in_clustering_element";
|
|
case state::in_range_tombstone_change: return "in_range_tombstone_change";
|
|
case state::in_clustering_row: return "in_clustering_row";
|
|
case state::before_marker: return "before_marker";
|
|
case state::in_marker: return "in_marker";
|
|
case state::before_clustering_columns: return "before_clustering_columns";
|
|
case state::before_column_key: return "before_column_key";
|
|
case state::before_column: return "before_column";
|
|
case state::in_column: return "in_column";
|
|
case state::before_ignored_value: return "before_ignored_value";
|
|
case state::before_integer: return "before_integer";
|
|
case state::before_string: return "before_string";
|
|
case state::before_bool: return "before_bool";
|
|
}
|
|
std::abort();
|
|
}
|
|
|
|
std::string stack_to_string() const {
|
|
return fmt::to_string(fmt::join(_state_stack | std::views::transform([] (state s) { return to_string(s); }), "|"));
|
|
}
|
|
|
|
template<typename... Args>
|
|
bool error(fmt::format_string<Args...> fmt, Args&&... args) {
|
|
auto parse_error = fmt::format(fmt, std::forward<Args>(args)...);
|
|
_logger.trace("{}", parse_error);
|
|
_queue.abort(std::make_exception_ptr(std::runtime_error(parse_error)));
|
|
return false;
|
|
}
|
|
|
|
bool emit(mutation_fragment_v2 mf) {
|
|
_logger.trace("emit({})", mf.mutation_fragment_kind());
|
|
_queue.push_eventually(std::move(mf)).get();
|
|
return true;
|
|
}
|
|
|
|
bool parse_partition_key() {
|
|
try {
|
|
auto raw = from_hex(*_string);
|
|
_pkey.emplace(partition_key::from_bytes(raw));
|
|
} catch (...) {
|
|
return error("failed to parse partition key from raw string: {}", fmt::streamed(std::current_exception()));
|
|
}
|
|
return true;
|
|
}
|
|
|
|
bool parse_clustering_key() {
|
|
try {
|
|
auto raw = from_hex(*_string);
|
|
_ckey.emplace(clustering_key::from_bytes(raw));
|
|
} catch (...) {
|
|
return error("failed to parse clustering key from raw string: {}", fmt::streamed(std::current_exception()));
|
|
}
|
|
return true;
|
|
}
|
|
|
|
bool parse_bound_weight() {
|
|
switch (*_integer) {
|
|
case -1:
|
|
_bound_weight.emplace(bound_weight::before_all_prefixed);
|
|
return true;
|
|
case 0:
|
|
_bound_weight.emplace(bound_weight::equal);
|
|
return true;
|
|
case 1:
|
|
_bound_weight.emplace(bound_weight::after_all_prefixed);
|
|
return true;
|
|
default:
|
|
return error("failed to parse bound weight: {} is not a valid bound weight value", *_integer);
|
|
}
|
|
}
|
|
|
|
bool parse_deletion_time() {
|
|
try {
|
|
auto dt = gc_clock::time_point(gc_clock::duration(timestamp_from_string(*_string) / 1000));
|
|
if (top(1) == state::in_column) {
|
|
_column->deletion_time = dt;
|
|
} else {
|
|
_tombstone->deletion_time = dt;
|
|
}
|
|
return true;
|
|
} catch (...) {
|
|
return error("failed to parse deletion_time: {}", std::current_exception());
|
|
}
|
|
}
|
|
|
|
bool parse_ttl() {
|
|
auto e = _string->end();
|
|
if (*std::prev(e) == 's') {
|
|
--e;
|
|
}
|
|
uint64_t ttl;
|
|
std::stringstream ss(std::string(_string->begin(), e));
|
|
ss >> ttl;
|
|
if (ss.fail()) {
|
|
return error("failed to parse ttl value of {}", _string);
|
|
}
|
|
_ttl = gc_clock::duration(ttl);
|
|
return true;
|
|
}
|
|
|
|
bool parse_expiry() {
|
|
try {
|
|
_expiry = gc_clock::time_point(gc_clock::duration(timestamp_from_string(*_string) / 1000));
|
|
} catch (...) {
|
|
return error("failed to parse expiry: {}", std::current_exception());
|
|
}
|
|
return true;
|
|
}
|
|
|
|
std::optional<::tombstone> get_tombstone() {
|
|
if (bool(_tombstone->timestamp) != bool(_tombstone->deletion_time)) {
|
|
error("incomplete tombstone: timestamp or deletion-time have to be either both present or missing");
|
|
return {};
|
|
}
|
|
if (!_tombstone->timestamp) {
|
|
_tombstone.reset();
|
|
return ::tombstone{};
|
|
}
|
|
auto tomb = ::tombstone(*_tombstone->timestamp, *_tombstone->deletion_time);
|
|
_tombstone.reset();
|
|
return tomb;
|
|
}
|
|
|
|
bool finalize_partition_start(::tombstone tomb = {}) {
|
|
auto pkey = std::exchange(_pkey, {});
|
|
if (!pkey) {
|
|
return error("failed to finalize partition start: no partition key");
|
|
}
|
|
partition_start ps(dht::decorate_key(*_schema, *pkey), tomb);
|
|
_partition_start_emited = true;
|
|
return emit(mutation_fragment_v2(*_schema, _permit, std::move(ps)));
|
|
}
|
|
|
|
bool finalize_static_row() {
|
|
if (!_row) {
|
|
return error("failed to finalize clustering row: row is not initialized yet");
|
|
}
|
|
auto row = std::exchange(_row, {});
|
|
auto sr = static_row(std::move(*row));
|
|
return emit(mutation_fragment_v2(*_schema, _permit, std::move(sr)));
|
|
}
|
|
|
|
bool finalize_range_tombstone_change() {
|
|
if (!_bound_weight) {
|
|
return error("failed to finalize range tombstone change: missing bound weight");
|
|
}
|
|
if (*_bound_weight == bound_weight::equal) {
|
|
return error("failed to finalize range tombstone change: bound_weight::equal is not valid for range tombstones changes");
|
|
}
|
|
if (!_row_tombstone) {
|
|
return error("failed to finalize range tombstone change: missing tombstone");
|
|
}
|
|
clustering_key ckey = clustering_key::make_empty();
|
|
if (_ckey) {
|
|
ckey = std::move(*std::exchange(_ckey, {}));
|
|
}
|
|
auto pos = position_in_partition(partition_region::clustered, *std::exchange(_bound_weight, {}), std::move(ckey));
|
|
auto tomb = std::exchange(_row_tombstone, {})->tomb();
|
|
auto rtc = range_tombstone_change(std::move(pos), std::move(tomb));
|
|
return emit(mutation_fragment_v2(*_schema, _permit, std::move(rtc)));
|
|
}
|
|
|
|
bool finalize_row_marker() {
|
|
if (!_row_marker) {
|
|
return error("failed to finalize row marker: it has no timestamp");
|
|
}
|
|
if (bool(_expiry) != bool(_ttl)) {
|
|
return error("failed to finalize row marker: ttl and expiry must either be both present or both missing");
|
|
}
|
|
if (!_expiry && !_ttl) {
|
|
return true;
|
|
}
|
|
_row_marker->apply(row_marker(_row_marker->timestamp(), *std::exchange(_ttl, {}), *std::exchange(_expiry, {})));
|
|
return true;
|
|
}
|
|
|
|
bool parse_column_value() {
|
|
try {
|
|
_column->value.emplace(_column->def->type->from_string(*_string));
|
|
} catch (...) {
|
|
return error("failed to parse cell value: {}", std::current_exception());
|
|
}
|
|
return true;
|
|
}
|
|
|
|
bool finalize_column() {
|
|
if (!_row) {
|
|
return error("failed to finalize cell: row not initialized yet");
|
|
}
|
|
if (!_column->is_live || !_column->timestamp) {
|
|
return error("failed to finalize cell: required fields is_live and/or timestamp missing");
|
|
}
|
|
if (*_column->is_live && !_column->value) {
|
|
return error("failed to finalize cell: live cell doesn't have data");
|
|
}
|
|
if (!*_column->is_live && !_column->deletion_time) {
|
|
return error("failed to finalize cell: dead cell doesn't have deletion time");
|
|
}
|
|
if (bool(_expiry) != bool(_ttl)) {
|
|
return error("failed to finalize cell: ttl and expiry must either be both present or both missing");
|
|
}
|
|
if (*_column->is_live) {
|
|
if (_ttl) {
|
|
_row->apply(*_column->def, ::atomic_cell::make_live(*_column->def->type, *_column->timestamp, *_column->value,
|
|
*std::exchange(_expiry, {}), *std::exchange(_ttl, {})));
|
|
} else {
|
|
_row->apply(*_column->def, ::atomic_cell::make_live(*_column->def->type, *_column->timestamp, *_column->value));
|
|
}
|
|
} else {
|
|
_row->apply(*_column->def, ::atomic_cell::make_dead(*_column->timestamp, *_column->deletion_time));
|
|
}
|
|
_column.reset();
|
|
return true;
|
|
}
|
|
|
|
bool finalize_clustering_row() {
|
|
if (!_ckey) {
|
|
return error("failed to finalize clustering row: missing clustering key");
|
|
}
|
|
if (!_row) {
|
|
return error("failed to finalize clustering row: row is not initialized yet");
|
|
}
|
|
auto row = std::exchange(_row, {});
|
|
auto tomb = std::exchange(_row_tombstone, {});
|
|
auto marker = std::exchange(_row_marker, {});
|
|
auto cr = clustering_row(
|
|
std::move(*_ckey),
|
|
tomb.value_or(row_tombstone{}),
|
|
marker.value_or(row_marker{}),
|
|
std::move(*row));
|
|
return emit(mutation_fragment_v2(*_schema, _permit, std::move(cr)));
|
|
}
|
|
|
|
bool finalize_partition() {
|
|
_partition_start_emited = false;
|
|
return emit(mutation_fragment_v2(*_schema, _permit, partition_end{}));
|
|
}
|
|
|
|
struct retire_state_result {
|
|
bool ok = true;
|
|
unsigned pop_states = 1;
|
|
std::optional<state> next_state;
|
|
};
|
|
retire_state_result handle_retire_state() {
|
|
_logger.trace("handle_retire_state(): stack={}", stack_to_string());
|
|
retire_state_result ret;
|
|
switch (top()) {
|
|
case state::before_partition:
|
|
// EOS
|
|
_queue.push_eventually({}).get();
|
|
break;
|
|
case state::in_partition:
|
|
ret.ok = finalize_partition();
|
|
break;
|
|
case state::in_key:
|
|
ret.pop_states = 2;
|
|
break;
|
|
case state::in_tombstone:
|
|
ret.pop_states = 2;
|
|
{
|
|
auto is_shadowable = std::exchange(_is_shadowable, false);
|
|
auto tomb = get_tombstone();
|
|
if (!tomb) {
|
|
ret.ok = false;
|
|
break;
|
|
}
|
|
if (top(2) == state::in_partition) {
|
|
ret.ok = finalize_partition_start(*tomb);
|
|
} else if (top(2) == state::in_range_tombstone_change) {
|
|
_row_tombstone.emplace(*tomb);
|
|
} else if (top(2) == state::in_clustering_row) {
|
|
if (is_shadowable) {
|
|
if (!_row_tombstone) {
|
|
ret.ok = error("cannot apply shadowable tombstone, row tombstone not initialized yet");
|
|
break;
|
|
}
|
|
_row_tombstone->apply(shadowable_tombstone(*tomb), {});
|
|
} else {
|
|
_row_tombstone.emplace(*tomb);
|
|
}
|
|
} else {
|
|
ret.ok = error("retiring in_tombstone state in invalid context: {}", stack_to_string());
|
|
}
|
|
}
|
|
break;
|
|
case state::in_marker:
|
|
ret.pop_states = 2;
|
|
ret.ok = finalize_row_marker();
|
|
break;
|
|
case state::in_column:
|
|
ret.pop_states = 2;
|
|
ret.ok = finalize_column();
|
|
break;
|
|
case state::before_column_key:
|
|
if (top(1) == state::before_static_columns) {
|
|
ret.ok = finalize_static_row();
|
|
}
|
|
ret.pop_states = 2;
|
|
break;
|
|
case state::before_clustering_element:
|
|
ret.pop_states = 2;
|
|
break;
|
|
case state::in_range_tombstone_change:
|
|
ret.pop_states = 2;
|
|
ret.ok = finalize_range_tombstone_change();
|
|
break;
|
|
case state::in_clustering_row:
|
|
ret.pop_states = 2;
|
|
ret.ok = finalize_clustering_row();
|
|
break;
|
|
case state::before_ignored_value:
|
|
break;
|
|
case state::before_bool:
|
|
if (top(1) == state::in_column) {
|
|
_column->is_live = _bool;
|
|
}
|
|
_bool.reset();
|
|
break;
|
|
case state::before_integer:
|
|
if (top(1) == state::in_tombstone) {
|
|
_tombstone->timestamp = _integer.value();
|
|
}
|
|
if (top(1) == state::in_range_tombstone_change) {
|
|
ret.ok = parse_bound_weight();
|
|
}
|
|
if (top(1) == state::in_column) {
|
|
_column->timestamp = _integer;
|
|
}
|
|
if (top(1) == state::in_marker) {
|
|
_row_marker.emplace(_integer.value());
|
|
}
|
|
_integer.reset();
|
|
break;
|
|
case state::before_string:
|
|
if (top(1) == state::in_key) {
|
|
if (top(3) == state::in_partition) {
|
|
ret.ok = parse_partition_key();
|
|
} else if (top(3) == state::in_clustering_row || top(3) == state::in_range_tombstone_change) {
|
|
ret.ok = parse_clustering_key();
|
|
}
|
|
} else if (top(1) == state::in_tombstone) {
|
|
ret.ok = parse_deletion_time();
|
|
} else if (top(1) == state::in_marker) {
|
|
if (_key == "ttl") {
|
|
ret.ok = parse_ttl();
|
|
} else {
|
|
ret.ok = parse_expiry();
|
|
}
|
|
} else if (top(1) == state::in_clustering_element) {
|
|
if (*_string == "clustering-row") {
|
|
ret.next_state = state::in_clustering_row;
|
|
} else if (*_string == "range-tombstone-change") {
|
|
ret.next_state = state::in_range_tombstone_change;
|
|
} else {
|
|
ret.ok = error("invalid clustering element type: {}, expected clustering-row or range-tombstone-change", *_string);
|
|
}
|
|
} else if (top(1) == state::in_column) {
|
|
if (_key == "type") {
|
|
if (*_string != "regular") {
|
|
ret.ok = error("unsupported cell type {}, currently only regular cells are supported", *_string);
|
|
} else {
|
|
ret.ok = true;
|
|
}
|
|
} else if (_key == "ttl") {
|
|
ret.ok = parse_ttl();
|
|
} else if (_key == "expiry") {
|
|
ret.ok = parse_expiry();
|
|
} else if (_key == "deletion_time") {
|
|
ret.ok = parse_deletion_time();
|
|
} else {
|
|
ret.ok = parse_column_value();
|
|
}
|
|
}
|
|
_string.reset();
|
|
break;
|
|
default:
|
|
ret.ok = error("attempted to retire unexpected state {} ({})", to_string(top()), stack_to_string());
|
|
break;
|
|
}
|
|
return ret;
|
|
}
|
|
state top(size_t i = 0) const {
|
|
return _state_stack[i];
|
|
}
|
|
bool push(state s) {
|
|
_logger.trace("push({})", to_string(s));
|
|
_state_stack.push_front(s);
|
|
return true;
|
|
}
|
|
bool pop() {
|
|
auto res = handle_retire_state();
|
|
_logger.trace("pop({})", res.ok ? res.pop_states : 0);
|
|
if (!res.ok) {
|
|
return false;
|
|
}
|
|
while (res.pop_states--) {
|
|
_state_stack.pop_front();
|
|
}
|
|
if (res.next_state) {
|
|
push(*res.next_state);
|
|
}
|
|
return true;
|
|
}
|
|
bool unexpected(std::source_location sl = std::source_location::current()) {
|
|
return error("unexpected json event {} in state {}", sl.function_name(), stack_to_string());
|
|
}
|
|
bool unexpected(std::string_view key, std::source_location sl = std::source_location::current()) {
|
|
return error("unexpected json event {}({}) in state {}", sl.function_name(), key, stack_to_string());
|
|
}
|
|
public:
|
|
explicit handler(schema_ptr schema, reader_permit permit, queue<mutation_fragment_v2_opt>& queue, logger& logger)
|
|
: _schema(std::move(schema))
|
|
, _permit(std::move(permit))
|
|
, _queue(queue)
|
|
, _logger(logger)
|
|
{
|
|
push(state::start);
|
|
}
|
|
handler(handler&&) = default;
|
|
bool Null() {
|
|
_logger.trace("Null()");
|
|
switch (top()) {
|
|
case state::before_ignored_value:
|
|
return pop();
|
|
default:
|
|
return unexpected();
|
|
}
|
|
return true;
|
|
}
|
|
bool Bool(bool b) {
|
|
_logger.trace("Bool({})", b);
|
|
switch (top()) {
|
|
case state::before_bool:
|
|
_bool.emplace(b);
|
|
return pop();
|
|
default:
|
|
return unexpected();
|
|
}
|
|
return true;
|
|
}
|
|
bool Int(int i) {
|
|
_logger.trace("Int({})", i);
|
|
switch (top()) {
|
|
case state::before_ignored_value:
|
|
return pop();
|
|
case state::before_integer:
|
|
_integer.emplace(i);
|
|
return pop();
|
|
default:
|
|
return unexpected();
|
|
}
|
|
return true;
|
|
}
|
|
bool Uint(unsigned i) {
|
|
_logger.trace("Uint({})", i);
|
|
switch (top()) {
|
|
case state::before_ignored_value:
|
|
return pop();
|
|
case state::before_integer:
|
|
_integer.emplace(i);
|
|
return pop();
|
|
default:
|
|
return unexpected();
|
|
}
|
|
return true;
|
|
}
|
|
bool Int64(int64_t i) {
|
|
_logger.trace("Int64({})", i);
|
|
switch (top()) {
|
|
case state::before_ignored_value:
|
|
return pop();
|
|
case state::before_integer:
|
|
_integer.emplace(i);
|
|
return pop();
|
|
default:
|
|
return unexpected();
|
|
}
|
|
return true;
|
|
}
|
|
bool Uint64(uint64_t i) {
|
|
_logger.trace("Uint64({})", i);
|
|
switch (top()) {
|
|
case state::before_ignored_value:
|
|
return pop();
|
|
case state::before_integer:
|
|
_integer.emplace(i);
|
|
return pop();
|
|
default:
|
|
return unexpected();
|
|
}
|
|
return true;
|
|
}
|
|
bool Double(double d) {
|
|
_logger.trace("Double({})", d);
|
|
switch (top()) {
|
|
case state::before_ignored_value:
|
|
return pop();
|
|
default:
|
|
return unexpected();
|
|
}
|
|
return true;
|
|
}
|
|
bool RawNumber(const Ch* str, rapidjson::SizeType length, bool copy) {
|
|
_logger.trace("RawNumber({})", std::string_view(str, length));
|
|
return unexpected();
|
|
}
|
|
bool String(const Ch* str, rapidjson::SizeType length, bool copy) {
|
|
_logger.trace("String({})", std::string_view(str, length));
|
|
switch (top()) {
|
|
case state::before_ignored_value:
|
|
return pop();
|
|
case state::before_string:
|
|
_string.emplace(str, length);
|
|
return pop();
|
|
default:
|
|
return unexpected();
|
|
}
|
|
return true;
|
|
}
|
|
bool StartObject() {
|
|
_logger.trace("StartObject()");
|
|
switch (top()) {
|
|
case state::before_partition:
|
|
return push(state::in_partition);
|
|
case state::before_key:
|
|
return push(state::in_key);
|
|
case state::before_tombstone:
|
|
_tombstone.emplace();
|
|
return push(state::in_tombstone);
|
|
case state::before_static_columns:
|
|
_row.emplace();
|
|
return push(state::before_column_key);
|
|
case state::before_clustering_element:
|
|
_row.emplace();
|
|
return push(state::in_clustering_element);
|
|
case state::before_marker:
|
|
return push(state::in_marker);
|
|
case state::before_clustering_columns:
|
|
return push(state::before_column_key);
|
|
case state::before_column:
|
|
return push(state::in_column);
|
|
default:
|
|
return unexpected();
|
|
}
|
|
}
|
|
bool Key(const Ch* str, rapidjson::SizeType length, bool copy) {
|
|
_key = std::string(str, length);
|
|
_logger.trace("Key({})", _key);
|
|
switch (top()) {
|
|
case state::in_partition:
|
|
if (_key == "key") {
|
|
return push(state::before_key);
|
|
}
|
|
if (_key == "tombstone") {
|
|
return push(state::before_tombstone);
|
|
}
|
|
if (_key == "static_row" || _key == "clustering_elements") {
|
|
if (!_partition_start_emited && !finalize_partition_start()) {
|
|
return false;
|
|
}
|
|
if (_key == "static_row") {
|
|
return push(state::before_static_columns);
|
|
} else {
|
|
return push(state::before_clustering_elements);
|
|
}
|
|
}
|
|
return unexpected(_key);
|
|
case state::in_key:
|
|
if (_key == "value" || (top(2) == state::in_partition && _key == "token")) {
|
|
return push(state::before_ignored_value);
|
|
}
|
|
if (_key == "raw") {
|
|
return push(state::before_string);
|
|
}
|
|
return unexpected(_key);
|
|
case state::in_tombstone:
|
|
if (_key == "timestamp") {
|
|
return push(state::before_integer);
|
|
}
|
|
if (_key == "deletion_time") {
|
|
return push(state::before_string);
|
|
}
|
|
return unexpected(_key);
|
|
case state::in_marker:
|
|
if (_key == "timestamp") {
|
|
return push(state::before_integer);
|
|
}
|
|
if (_key == "ttl" || _key == "expiry") {
|
|
return push(state::before_string);
|
|
}
|
|
return unexpected(_key);
|
|
case state::in_clustering_element:
|
|
if (_key == "type") {
|
|
return push(state::before_string);
|
|
}
|
|
return unexpected(_key);
|
|
case state::in_range_tombstone_change:
|
|
if (_key == "key") {
|
|
return push(state::before_key);
|
|
}
|
|
if (_key == "weight") {
|
|
return push(state::before_integer);
|
|
}
|
|
if (_key == "tombstone") {
|
|
return push(state::before_tombstone);
|
|
}
|
|
return unexpected(_key);
|
|
case state::in_clustering_row:
|
|
if (_key == "key") {
|
|
return push(state::before_key);
|
|
}
|
|
if (_key == "marker") {
|
|
return push(state::before_marker);
|
|
}
|
|
if (_key == "tombstone") {
|
|
return push(state::before_tombstone);
|
|
}
|
|
if (_key == "shadowable_tombstone") {
|
|
_is_shadowable = true;
|
|
return push(state::before_tombstone);
|
|
}
|
|
if (_key == "columns") {
|
|
return push(state::before_clustering_columns);
|
|
}
|
|
return unexpected(_key);
|
|
case state::before_column_key:
|
|
_column.emplace(_schema->get_column_definition(bytes(reinterpret_cast<bytes::const_pointer>(_key.data()), _key.size())));
|
|
if (!_column->def) {
|
|
return error("failed to look-up column name {}", _key);
|
|
}
|
|
if (top(1) == state::before_static_columns && _column->def->kind != column_kind::static_column) {
|
|
return error("cannot add column {} of kind {} to static row", _key, to_sstring(_column->def->kind));
|
|
}
|
|
if (top(1) == state::before_clustering_columns && _column->def->kind != column_kind::regular_column) {
|
|
return error("cannot add column {} of kind {} to regular row", _key, to_sstring(_column->def->kind));
|
|
}
|
|
if (!_column->def->is_atomic()) {
|
|
return error("failed to initialize column {}: non-atomic columns are not supported yet", _key);
|
|
}
|
|
return push(state::before_column);
|
|
case state::in_column:
|
|
if (_key == "is_live") {
|
|
return push(state::before_bool);
|
|
}
|
|
if (_key == "timestamp") {
|
|
return push(state::before_integer);
|
|
}
|
|
if (_key == "type" || _key == "ttl" || _key == "expiry" || _key == "value" || _key == "deletion_time") {
|
|
return push(state::before_string);
|
|
}
|
|
return unexpected(_key);
|
|
default:
|
|
return unexpected(_key);
|
|
}
|
|
}
|
|
bool EndObject(rapidjson::SizeType memberCount) {
|
|
_logger.trace("EndObject()");
|
|
switch (top()) {
|
|
case state::in_partition:
|
|
case state::in_key:
|
|
case state::in_tombstone:
|
|
case state::in_range_tombstone_change:
|
|
case state::in_clustering_row:
|
|
case state::before_column_key:
|
|
case state::in_marker:
|
|
case state::in_column:
|
|
return pop();
|
|
default:
|
|
return unexpected();
|
|
}
|
|
}
|
|
bool StartArray() {
|
|
_logger.trace("StartArray()");
|
|
switch (top()) {
|
|
case state::start:
|
|
return push(state::before_partition);
|
|
case state::before_clustering_elements:
|
|
return push(state::before_clustering_element);
|
|
default:
|
|
return unexpected();
|
|
}
|
|
}
|
|
bool EndArray(rapidjson::SizeType elementCount) {
|
|
_logger.trace("EndArray({})", elementCount);
|
|
switch (top()) {
|
|
case state::before_clustering_element:
|
|
case state::before_partition:
|
|
return pop();
|
|
default:
|
|
return unexpected();
|
|
}
|
|
}
|
|
};
|
|
|
|
// Exception used to abort the queue when the parser is destroyed, so that a
// parser thread waiting on the queue is released.
struct parsing_aborted : std::exception {};
|
|
|
|
} // anonymous namespace
|
|
|
|
// Implementation of the parser: runs rapidjson in a seastar thread and hands
// the produced fragments to the consumer through a queue of size 1.
//
// The members are declared in dependency order: the handler refers to the
// queue and the logger, and the thread (which must be last) uses the stream,
// the handler and the reader.
class json_mutation_stream_parser::impl {
    queue<mutation_fragment_v2_opt> _queue;
    stream _stream;
    logger& _logger;
    handler _handler;
    reader _reader;
    thread _thread;

    // Body of the parser thread.
    //
    // When the handler rejects an event it aborts the queue itself and
    // rapidjson reports kParseErrorTermination, so there is nothing left to do
    // in that case. Any other parse error was detected by rapidjson (malformed
    // JSON) and nobody has told the consumer yet: abort the queue here,
    // otherwise the consumer would wait for a fragment that never arrives.
    void parse() {
        _reader.Parse(_stream, _handler);
        if (!_reader.HasParseError()) {
            return;
        }
        const auto code = _reader.GetParseErrorCode();
        if (code == rapidjson::kParseErrorTermination) {
            return;
        }
        _queue.abort(std::make_exception_ptr(std::runtime_error(
                fmt::format("malformed JSON input (rapidjson parse error code {})", static_cast<int>(code)))));
    }

public:
    impl(schema_ptr schema, reader_permit permit, input_stream<char> istream, logger& logger)
        : _queue(1)
        , _stream(std::move(istream))
        , _logger(logger)
        , _handler(std::move(schema), std::move(permit), _queue, _logger)
        , _thread([this] { parse(); })
    { }

    // Aborts the queue so that a parser thread waiting on it is released, then
    // waits for the thread to finish. An exception from the thread is logged
    // and swallowed.
    ~impl() {
        _queue.abort(std::make_exception_ptr(parsing_aborted{}));
        try {
            _thread.join().get();
        } catch (...) {
            _logger.warn("json_mutation_stream_parser: parser thread exited with exception: {}", std::current_exception());
        }
    }

    // Returns the next fragment from the queue. A failure is rethrown as a
    // std::runtime_error that adds the line number and the offset within the
    // line at which parsing stopped.
    future<mutation_fragment_v2_opt> operator()() {
        return _queue.pop_eventually().handle_exception([this] (std::exception_ptr e) -> mutation_fragment_v2_opt {
            auto err_off = _reader.GetErrorOffset();
            throw std::runtime_error(fmt::format("parsing input failed at line {}, offset {}: {}", _stream.line(), err_off - _stream.last_line_feed_pos(), e));
        });
    }
};
|
|
|
|
// Creates the parser by allocating its implementation object, which takes
// ownership of the schema, the permit and the input stream. `logger` is passed
// on by reference.
json_mutation_stream_parser::json_mutation_stream_parser(
        schema_ptr schema,
        reader_permit permit,
        input_stream<char> istream,
        logger& logger)
    : _impl(std::make_unique<impl>(std::move(schema), std::move(permit), std::move(istream), logger)) {
}
|
|
|
|
// Move construction is defaulted (member-wise move).
json_mutation_stream_parser::json_mutation_stream_parser(json_mutation_stream_parser&&) noexcept = default;
|
|
|
|
// Defaulted here, in the file that defines impl.
// NOTE(review): presumably required because _impl is a smart pointer to the otherwise incomplete impl type — confirm against the header.
json_mutation_stream_parser::~json_mutation_stream_parser() = default;
|
|
|
|
// Returns the next parsed fragment by forwarding to the implementation object.
future<mutation_fragment_v2_opt> json_mutation_stream_parser::operator()() {
    return _impl->operator()();
}
|
|
|
|
} // namespace tools
|