diff --git a/alternator/conditions.cc b/alternator/conditions.cc index 1db0cecf8f..76bc3effb2 100644 --- a/alternator/conditions.cc +++ b/alternator/conditions.cc @@ -11,16 +11,16 @@ #include #include #include "alternator/conditions.hh" -#include "alternator/serialization.hh" #include "alternator/error.hh" #include "cql3/constants.hh" #include +#include "rjson.hh" namespace alternator { static logging::logger clogger("alternator-conditions"); -comparison_operator_type get_comparison_operator(const Json::Value& comparison_operator) { +comparison_operator_type get_comparison_operator(const rjson::value& comparison_operator) { static std::unordered_map ops = { {"EQ", comparison_operator_type::EQ}, {"LE", comparison_operator_type::LE}, @@ -30,10 +30,10 @@ comparison_operator_type get_comparison_operator(const Json::Value& comparison_o {"BETWEEN", comparison_operator_type::BETWEEN}, {"BEGINS_WITH", comparison_operator_type::BEGINS_WITH}, }; //TODO(sarna): NE, IN, CONTAINS, NULL, NOT_NULL - if (!comparison_operator.isString()) { - throw api_error("ValidationException", format("Invalid comparison operator definition {}", comparison_operator.toStyledString())); + if (!comparison_operator.IsString()) { + throw api_error("ValidationException", format("Invalid comparison operator definition {}", rjson::print(comparison_operator))); } - std::string op = comparison_operator.asString(); + std::string op = comparison_operator.GetString(); auto it = ops.find(op); if (it == ops.end()) { throw api_error("ValidationException", format("Unsupported comparison operator {}", op)); @@ -41,7 +41,7 @@ comparison_operator_type get_comparison_operator(const Json::Value& comparison_o return it->second; } -::shared_ptr make_map_element_restriction(const column_definition& cdef, const std::string& key, const Json::Value& value) { +::shared_ptr make_map_element_restriction(const column_definition& cdef, const std::string& key, const rjson::value& value) { bytes raw_key = utf8_type->from_string(sstring(key)); auto key_value = ::make_shared(cql3::raw_value::make_value(std::move(raw_key))); bytes raw_value = serialize_item(value); @@ -49,15 +49,15 @@ comparison_operator_type get_comparison_operator(const Json::Value& comparison_o return make_shared(cdef, std::move(key_value), std::move(entry_value)); } -::shared_ptr get_filtering_restrictions(schema_ptr schema, const column_definition& attrs_col, const Json::Value& query_filter) { - clogger.trace("Getting filtering restrictions for: {}", query_filter.toStyledString()); +::shared_ptr get_filtering_restrictions(schema_ptr schema, const column_definition& attrs_col, const rjson::value& query_filter) { + clogger.trace("Getting filtering restrictions for: {}", rjson::print(query_filter)); auto filtering_restrictions = ::make_shared(schema, true); - for (auto it = query_filter.begin(); it != query_filter.end(); ++it) { - std::string column_name = it.key().asString(); - const Json::Value& condition = *it; + for (auto it = query_filter.MemberBegin(); it != query_filter.MemberEnd(); ++it) { + std::string column_name = it->name.GetString(); + const rjson::value& condition = it->value; - Json::Value comp_definition = condition.get("ComparisonOperator", Json::Value()); - Json::Value attr_list = condition.get("AttributeValueList", Json::Value(Json::arrayValue)); + const rjson::value& comp_definition = rjson::get(condition, "ComparisonOperator"); + const rjson::value& attr_list = rjson::get(condition, "AttributeValueList"); comparison_operator_type op = get_comparison_operator(comp_definition); if (schema->get_column_definition(to_bytes(column_name))) { @@ -67,8 +67,8 @@ comparison_operator_type get_comparison_operator(const Json::Value& comparison_o if (op != comparison_operator_type::EQ) { throw api_error("ValidationException", "Filtering is currently implemented for EQ operator only"); } - if (attr_list.size() != 1) { - throw api_error("ValidationException", format("EQ restriction needs exactly 1 attribute value: {}", attr_list.toStyledString())); + if (attr_list.Size() != 1) { + throw api_error("ValidationException", format("EQ restriction needs exactly 1 attribute value: {}", rjson::print(attr_list))); } filtering_restrictions->add_restriction(make_map_element_restriction(attrs_col, column_name, attr_list[0]), false, true); diff --git a/alternator/conditions.hh b/alternator/conditions.hh index 8c82a26e37..a4ff66cff4 100644 --- a/alternator/conditions.hh +++ b/alternator/conditions.hh @@ -21,6 +21,7 @@ #pragma once #include "cql3/restrictions/statement_restrictions.hh" +#include "serialization.hh" namespace alternator { @@ -28,9 +29,9 @@ enum class comparison_operator_type { EQ, NE, LE, LT, GE, GT, IN, BETWEEN, CONTAINS, IS_NULL, NOT_NULL, BEGINS_WITH }; -comparison_operator_type get_comparison_operator(const Json::Value& comparison_operator); +comparison_operator_type get_comparison_operator(const rjson::value& comparison_operator); -::shared_ptr make_map_element_restriction(const column_definition& cdef, const std::string& key, const Json::Value& value); -::shared_ptr get_filtering_restrictions(schema_ptr schema, const column_definition& attrs_col, const Json::Value& query_filter); +::shared_ptr make_map_element_restriction(const column_definition& cdef, const std::string& key, const rjson::value& value); +::shared_ptr get_filtering_restrictions(schema_ptr schema, const column_definition& attrs_col, const rjson::value& query_filter); } diff --git a/alternator/executor.cc b/alternator/executor.cc index 967169eccc..cf20776f5f 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -14,7 +14,6 @@ #include "alternator/executor.hh" #include "log.hh" -#include "json.hh" #include "schema_builder.hh" #include "exceptions/exceptions.hh" #include "timestamp.hh" @@ -37,11 +36,12 @@ #include "cql3/constants.hh" #include #include "utils/big_decimal.hh" +#include "seastar/json/json_elements.hh" #include #include -static logging::logger elogger("alternator-executor"); +logging::logger elogger("alternator-executor"); namespace alternator { @@ -57,11 +57,11 @@ static const column_definition& attrs_column(const schema& schema) { } struct make_jsonable : public json::jsonable { - Json::Value _value; + rjson::value _value; public: - explicit make_jsonable(Json::Value&& value) : _value(std::move(value)) {} + explicit make_jsonable(rjson::value&& value) : _value(std::move(value)) {} virtual std::string to_json() const override { - return _value.toStyledString(); + return rjson::print(_value); } }; struct json_string : public json::jsonable { @@ -73,10 +73,11 @@ public: } }; -static void supplement_table_info(Json::Value& descr, const schema& schema) { - descr["CreationDateTime"] = std::chrono::duration_cast(gc_clock::now().time_since_epoch()).count(); - descr["TableStatus"] = "ACTIVE"; - descr["TableId"] = schema.id().to_sstring().c_str(); +static void supplement_table_info(rjson::value& descr, const schema& schema) { + rjson::set(descr, "CreationDateTime", rjson::value(std::chrono::duration_cast(gc_clock::now().time_since_epoch()).count())); + rjson::set(descr, "TableStatus", "ACTIVE"); + auto schema_id_str = schema.id().to_sstring(); + rjson::set(descr, "TableId", rjson::from_string(schema_id_str)); } // The DynamoDB developer guide, https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.NamingRules @@ -106,13 +107,13 @@ static void validate_table_name(const std::string& name) { * and api_error in case the table name is missing or not a string, or * doesn't pass validate_table_name(). */ -static std::string get_table_name(const Json::Value& request) { - Json::Value table_name_value = request.get("TableName", Json::nullValue); - if (!table_name_value.isString()) { +static std::string get_table_name(const rjson::value& request) { + const rjson::value& table_name_value = rjson::get(request, "TableName"); + if (!table_name_value.IsString()) { throw api_error("ValidationException", "Missing or non-string TableName field in request"); } - std::string table_name = table_name_value.asString(); + std::string table_name = table_name_value.GetString(); validate_table_name(table_name); return table_name; } @@ -125,7 +126,7 @@ static std::string get_table_name(const Json::Value& request) { * name is missing, invalid or the table doesn't exist. If everything is * successful, it returns the table's schema. */ -static schema_ptr get_table(service::storage_proxy& proxy, const Json::Value& request) { +static schema_ptr get_table(service::storage_proxy& proxy, const rjson::value& request) { std::string table_name = get_table_name(request); try { return proxy.get_db().local().find_schema(executor::KEYSPACE_NAME, table_name); @@ -137,15 +138,15 @@ static schema_ptr get_table(service::storage_proxy& proxy, const Json::Value& re future executor::describe_table(std::string content) { _stats.api_operations.describe_table++; - Json::Value request = json::to_json_value(content); - elogger.trace("Describing table {}", request.toStyledString()); + rjson::value request = rjson::parse(content); + elogger.trace("Describing table {}", request); schema_ptr schema = get_table(_proxy, request); - Json::Value table_description(Json::objectValue); - table_description["TableName"] = schema->cf_name().c_str(); + rjson::value table_description = rjson::empty_object(); + rjson::set(table_description, "TableName", rjson::from_string(schema->cf_name())); // FIXME: take the tables creation time, not the current time! - table_description["CreationDateTime"] = std::chrono::duration_cast(gc_clock::now().time_since_epoch()).count(); + rjson::set(table_description, "CreationDateTime", rjson::value(std::chrono::duration_cast(gc_clock::now().time_since_epoch()).count())); // FIXME: In DynamoDB the CreateTable implementation is asynchronous, and // the table may be in "Creating" state until creating is finished. // We don't currently do this in Alternator - instead CreateTable waits @@ -153,20 +154,20 @@ future executor::describe_table(std::string content) { // ACTIVE or doesn't exist at all (and DescribeTable returns an error). // The other states (CREATING, UPDATING, DELETING) are not currently // returned. - table_description["TableStatus"] = "ACTIVE"; + rjson::set(table_description, "TableStatus", "ACTIVE"); // FIXME: more attributes! Check https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TableDescription.html#DDB-Type-TableDescription-TableStatus but also run a test to see what DyanmoDB really fills // maybe for TableId or TableArn use schema.id().to_sstring().c_str(); // Of course, the whole schema is missing! - Json::Value response(Json::objectValue); - response["Table"] = std::move(table_description); - elogger.trace("returning {}", response.toStyledString()); + rjson::value response = rjson::empty_object(); + rjson::set(response, "Table", std::move(table_description)); + elogger.trace("returning {}", response); return make_ready_future(make_jsonable(std::move(response))); } future executor::delete_table(std::string content) { _stats.api_operations.delete_table++; - Json::Value request = json::to_json_value(content); - elogger.trace("Deleting table {}", request.toStyledString()); + rjson::value request = rjson::parse(content); + elogger.trace("Deleting table {}", request); std::string table_name = get_table_name(request); if (!_proxy.get_db().local().has_schema(KEYSPACE_NAME, table_name)) { @@ -176,12 +177,12 @@ future executor::delete_table(std::string content) { return _mm.announce_column_family_drop(KEYSPACE_NAME, table_name).then([table_name = std::move(table_name)] { // FIXME: need more attributes? - Json::Value table_description(Json::objectValue); - table_description["TableName"] = table_name.c_str(); - table_description["TableStatus"] = "DELETING"; - Json::Value response(Json::objectValue); - response["TableDescription"] = std::move(table_description); - elogger.trace("returning {}", response.toStyledString()); + rjson::value table_description = rjson::empty_object(); + rjson::set(table_description, "TableName", rjson::from_string(table_name)); + rjson::set(table_description, "TableStatus", "DELETING"); + rjson::value response = rjson::empty_object(); + rjson::set(response, "TableDescription", std::move(table_description)); + elogger.trace("returning {}", response); return make_ready_future(make_jsonable(std::move(response))); }); } @@ -201,10 +202,11 @@ static data_type parse_key_type(const std::string& type) { } -static void add_column(schema_builder& builder, const std::string& name, const Json::Value& attribute_definitions, column_kind kind) { - for (const Json::Value& attribute_info : attribute_definitions) { - if (attribute_info["AttributeName"].asString() == name) { - auto type = attribute_info["AttributeType"].asString(); +static void add_column(schema_builder& builder, const std::string& name, const rjson::value& attribute_definitions, column_kind kind) { + for (auto it = attribute_definitions.Begin(); it != attribute_definitions.End(); ++it) { + const rjson::value& attribute_info = *it; + if (attribute_info["AttributeName"].GetString() == name) { + auto type = attribute_info["AttributeType"].GetString(); builder.with_column(to_bytes(name), parse_key_type(type), kind); return; } @@ -215,19 +217,17 @@ static void add_column(schema_builder& builder, const std::string& name, const J future executor::create_table(std::string content) { _stats.api_operations.create_table++; - Json::Value table_info = json::to_json_value(content); - elogger.trace("Creating table {}", table_info.toStyledString()); - + rjson::value table_info = rjson::parse(content); + elogger.trace("Creating table {}", table_info); std::string table_name = get_table_name(table_info); - const Json::Value& key_schema = table_info["KeySchema"]; - const Json::Value& attribute_definitions = table_info["AttributeDefinitions"]; + const rjson::value& key_schema = table_info["KeySchema"]; + const rjson::value& attribute_definitions = table_info["AttributeDefinitions"]; schema_builder builder(KEYSPACE_NAME, table_name); - // DynamoDB requires that KeySchema includes up to two elements, the // first must be a HASH, the optional second one can be a RANGE. // These key names must also be present in the attributes_definitions. - if (!key_schema.isArray() || key_schema.size() < 1 || key_schema.size() > 2) { + if (!key_schema.IsArray() || key_schema.Size() < 1 || key_schema.Size() > 2) { throw api_error("ValidationException", "KeySchema must list exactly one or two key columns"); } @@ -235,27 +235,24 @@ future executor::create_table(std::string content) { throw api_error("ValidationException", "First key in KeySchema must be a HASH key"); } - add_column(builder, key_schema[0]["AttributeName"].asString(), attribute_definitions, column_kind::partition_key); - if (key_schema.size() == 2) { + add_column(builder, key_schema[0]["AttributeName"].GetString(), attribute_definitions, column_kind::partition_key); + if (key_schema.Size() == 2) { if (key_schema[1]["KeyType"] != "RANGE") { throw api_error("ValidationException", "Second key in KeySchema must be a RANGE key"); } - add_column(builder, key_schema[1]["AttributeName"].asString(), attribute_definitions, column_kind::clustering_key); + add_column(builder, key_schema[1]["AttributeName"].GetString(), attribute_definitions, column_kind::clustering_key); } builder.with_column(bytes(ATTRS_COLUMN_NAME), attrs_type(), column_kind::regular_column); - - const Json::Value& gsi = table_info["GlobalSecondaryIndexes"]; - if (!gsi.isNull()) { + if (table_info.HasMember("GlobalSecondaryIndexes")) { throw api_error("ValidationException", "FIXME: GSI not yet supported."); } - schema_ptr schema = builder.build(); return futurize_apply([&] { return _mm.announce_new_column_family(schema, false); }).then([table_info = std::move(table_info), schema] () mutable { - Json::Value status(Json::objectValue); + rjson::value status = rjson::empty_object(); supplement_table_info(table_info, *schema); - status["TableDescription"] = std::move(table_info); + rjson::set(status, "TableDescription", std::move(table_info)); return make_ready_future(make_jsonable(std::move(status))); }).handle_exception_type([table_name = std::move(table_name)] (exceptions::already_exists_exception&) { return make_exception_future( @@ -291,7 +288,7 @@ public: } }; -static mutation make_item_mutation(const Json::Value& item, schema_ptr schema) { +static mutation make_item_mutation(const rjson::value& item, schema_ptr schema) { partition_key pk = pk_from_json(item, schema); clustering_key ck = ck_from_json(item, schema); @@ -300,11 +297,11 @@ static mutation make_item_mutation(const Json::Value& item, schema_ptr schema) { auto ts = api::new_timestamp(); - for (auto it = item.begin(); it != item.end(); ++it) { - bytes column_name = to_bytes(it.key().asString()); + for (auto it = item.MemberBegin(); it != item.MemberEnd(); ++it) { + bytes column_name = to_bytes(it->name.GetString()); const column_definition* cdef = schema->get_column_definition(column_name); if (!cdef || !cdef->is_primary_key()) { - bytes value = serialize_item(*it); + bytes value = serialize_item(it->value); attrs_collector.put(std::move(column_name), std::move(value), ts); } } @@ -327,11 +324,11 @@ static mutation make_item_mutation(const Json::Value& item, schema_ptr schema) { future executor::put_item(std::string content) { _stats.api_operations.put_item++; - Json::Value update_info = json::to_json_value(content); - elogger.trace("Updating value {}", update_info.toStyledString()); + rjson::value update_info = rjson::parse(content); + elogger.trace("Updating value {}", update_info); schema_ptr schema = get_table(_proxy, update_info); - const Json::Value& item = update_info["Item"]; + const rjson::value& item = update_info["Item"]; mutation m = make_item_mutation(item, schema); @@ -345,13 +342,13 @@ future executor::put_item(std::string content) { // After calling pk_from_json() and ck_from_json() to extract the pk and ck // components of a key, and if that succeeded, call check_key() to further // check that the key doesn't have any spurious components. -static void check_key(const Json::Value& key, const schema_ptr& schema) { - if (key.size() != (schema->clustering_key_size() == 0 ? 1 : 2)) { +static void check_key(const rjson::value& key, const schema_ptr& schema) { + if (key.MemberCount() != (schema->clustering_key_size() == 0 ? 1 : 2)) { throw api_error("ValidationException", "Given key attribute not in schema"); } } -static mutation make_delete_item_mutation(const Json::Value& key, schema_ptr schema) { +static mutation make_delete_item_mutation(const rjson::value& key, schema_ptr schema) { partition_key pk = pk_from_json(key, schema); clustering_key ck = ck_from_json(key, schema); check_key(key, schema); @@ -363,10 +360,10 @@ static mutation make_delete_item_mutation(const Json::Value& key, schema_ptr sch future executor::delete_item(std::string content) { _stats.api_operations.delete_item++; - Json::Value update_info = json::to_json_value(content); + rjson::value update_info = rjson::parse(content); schema_ptr schema = get_table(_proxy, update_info); - const Json::Value& key = update_info["Key"]; + const rjson::value& key = update_info["Key"]; mutation m = make_delete_item_mutation(key, schema); check_key(key, schema); @@ -378,8 +375,8 @@ future executor::delete_item(std::string content) { } -static schema_ptr get_table_from_batch_request(const service::storage_proxy& proxy, const Json::Value::const_iterator& batch_request) { - std::string table_name = batch_request.key().asString(); // JSON keys are always strings +static schema_ptr get_table_from_batch_request(const service::storage_proxy& proxy, const rjson::value::ConstMemberIterator& batch_request) { + std::string table_name = batch_request->name.GetString(); // JSON keys are always strings validate_table_name(table_name); try { return proxy.get_db().local().find_schema(executor::KEYSPACE_NAME, table_name); @@ -404,23 +401,24 @@ struct primary_key_equal { future executor::batch_write_item(std::string content) { _stats.api_operations.batch_write_item++; - Json::Value batch_info = json::to_json_value(content); - Json::Value& request_items = batch_info["RequestItems"]; + rjson::value batch_info = rjson::parse(content); + rjson::value& request_items = batch_info["RequestItems"]; std::vector mutations; - mutations.reserve(request_items.size()); + mutations.reserve(request_items.MemberCount()); - for (auto it = request_items.begin(); it != request_items.end(); ++it) { + for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) { schema_ptr schema = get_table_from_batch_request(_proxy, it); std::unordered_set used_keys(1, primary_key_hash{schema}, primary_key_equal{schema}); - for (auto&& request : *it) { - if (!request.isObject() || request.size() != 1) { - throw api_error("ValidationException", format("Invalid BatchWriteItem request: {}", request.toStyledString())); + for (auto& request : it->value.GetArray()) { + if (!request.IsObject() || request.MemberCount() != 1) { + throw api_error("ValidationException", format("Invalid BatchWriteItem request: {}", request)); } - auto r = request.begin(); - if (r.key() == "PutRequest") { - const Json::Value& put_request = *r; - const Json::Value& item = put_request["Item"]; + auto r = request.MemberBegin(); + const std::string r_name = r->name.GetString(); + if (r_name == "PutRequest") { + const rjson::value& put_request = r->value; + const rjson::value& item = put_request["Item"]; mutations.push_back(make_item_mutation(item, schema)); // make_item_mutation returns a mutation with a single clustering row auto mut_key = std::make_pair(mutations.back().key(), mutations.back().partition().clustered_rows().begin()->key()); @@ -428,8 +426,8 @@ future executor::batch_write_item(std::string content) { throw api_error("ValidationException", "Provided list of item keys contains duplicates"); } used_keys.insert(std::move(mut_key)); - } else if (r.key() == "DeleteRequest") { - const Json::Value& key = (*r)["Key"]; + } else if (r_name == "DeleteRequest") { + const rjson::value& key = (r->value)["Key"]; mutations.push_back(make_delete_item_mutation(key, schema)); // make_delete_item_mutation returns a mutation with a single clustering row auto mut_key = std::make_pair(mutations.back().key(), mutations.back().partition().clustered_rows().begin()->key()); @@ -438,7 +436,7 @@ future executor::batch_write_item(std::string content) { } used_keys.insert(std::move(mut_key)); } else { - throw api_error("ValidationException", format("Unknown BatchWriteItem request type: {}", r.key())); + throw api_error("ValidationException", format("Unknown BatchWriteItem request type: {}", r_name)); } } } @@ -447,8 +445,8 @@ future executor::batch_write_item(std::string content) { // Without special options on what to return, BatchWriteItem returns nothing, // unless there are UnprocessedItems - it's possible to just stop processing a batch // due to throttling. TODO(sarna): Consider UnprocessedItems when returning. - Json::Value ret; - ret["UnprocessedItems"] = Json::Value(Json::objectValue); + rjson::value ret = rjson::empty_object(); + rjson::set(ret, "UnprocessedItems", rjson::empty_object()); return make_ready_future(make_jsonable(std::move(ret))); }); } @@ -463,7 +461,7 @@ future executor::batch_write_item(std::string content) { struct allow_key_columns_tag; using allow_key_columns = bool_class; static std::string resolve_update_path(const parsed::path& p, - const Json::Value& update_info, + const rjson::value& update_info, const schema_ptr& schema, std::unordered_set& used_attribute_names, allow_key_columns allow_key_columns) { @@ -472,14 +470,15 @@ static std::string resolve_update_path(const parsed::path& p, } auto column_name = p.root(); if (column_name.size() > 0 && column_name[0] == '#') { - const Json::Value& value = update_info["ExpressionAttributeNames"].get(column_name, Json::nullValue); - if (!value.isString()) { + const rjson::value& expression_attribute_names = rjson::get(update_info, "ExpressionAttributeNames"); + const rjson::value& value = rjson::get(expression_attribute_names, rjson::string_ref_type(column_name.c_str())); + if (!value.IsString()) { throw api_error("ValidationException", format("ExpressionAttributeNames missing entry '{}' required by UpdateExpression", column_name)); } used_attribute_names.emplace(std::move(column_name)); - column_name = value.asString(); + column_name = value.GetString(); } const column_definition* cdef = schema->get_column_definition(to_bytes(column_name)); if (!allow_key_columns && cdef && cdef->is_primary_key()) { @@ -491,75 +490,122 @@ static std::string resolve_update_path(const parsed::path& p, // Fail the expression if it has unused attribute names or values. This is // how DynamoDB behaves, so we do too. -static void verify_all_are_used(const Json::Value& req, const char* field, +static void verify_all_are_used(const rjson::value& req, const char* field, const std::unordered_set& used, const char* operation) { - auto& attribute_names = req[field]; - for (auto it = attribute_names.begin(); it != attribute_names.end(); ++it) { - if (!used.count(it.key().asString())) { + const rjson::value* attribute_names = rjson::find(req, rjson::string_ref_type(field)); + if (!attribute_names) { + return; + } + for (auto it = attribute_names->MemberBegin(); it != attribute_names->MemberEnd(); ++it) { + if (!used.count(it->name.GetString())) { throw api_error("ValidationException", format("{} has spurious '{}', not used in {}", - field, it.key().asString(), operation)); + field, it->name.GetString(), operation)); } } } // Check if a given JSON object encodes a list (i.e., it is a {"L": [...]} // and returns a pointer to that list. -static const Json::Value* unwrap_list(const Json::Value& v) { - if (!v.isObject() || v.size() != 1) { +static const rjson::value* unwrap_list(const rjson::value& v) { + if (!v.IsObject() || v.MemberCount() != 1) { return nullptr; } - auto it = v.begin(); - if (it.key() != "L") { + auto it = v.MemberBegin(); + if (it->name != std::string("L")) { return nullptr; } - return &(*it); + return &(it->value); } -static std::string get_item_type_string(const Json::Value& v) { - if (!v.isObject() || v.size() != 1) { - throw api_error("ValidationException", format("Item has invalid format: {}", v.toStyledString())); +static std::string get_item_type_string(const rjson::value& v) { + if (!v.IsObject() || v.MemberCount() != 1) { + throw api_error("ValidationException", format("Item has invalid format: {}", v)); } - auto it = v.begin(); - return it.key().asString(); + auto it = v.MemberBegin(); + return it->name.GetString(); } // Check if a given JSON object encodes a set (i.e., it is a {"SS": [...]}, or "NS", "BS" // and returns set's type and a pointer to that set. If the object does not encode a set, // returned value is {"", nullptr} -static const std::pair unwrap_set(const Json::Value& v) { - if (!v.isObject() || v.size() != 1) { +static const std::pair unwrap_set(const rjson::value& v) { + if (!v.IsObject() || v.MemberCount() != 1) { return {"", nullptr}; } - auto it = v.begin(); - const auto& it_key = it.key().asString(); + auto it = v.MemberBegin(); + const std::string it_key = it->name.GetString(); if (it_key != "SS" && it_key != "BS" && it_key != "NS") { return {"", nullptr}; } - return std::make_pair(it_key, &(*it)); + return std::make_pair(it_key, &(it->value)); } // Take two JSON-encoded list values (remember that a list value is // {"L": [...the actual list]}) and return the concatenation, again as // a list value. -static Json::Value list_concatenate(const Json::Value& v1, const Json::Value& v2) { - const Json::Value* list1 = unwrap_list(v1); - const Json::Value* list2 = unwrap_list(v2); +static rjson::value list_concatenate(const rjson::value& v1, const rjson::value& v2) { + const rjson::value* list1 = unwrap_list(v1); + const rjson::value* list2 = unwrap_list(v2); if (!list1 || !list2) { throw api_error("ValidationException", "UpdateExpression: list_append() given a non-list"); } - Json::Value cat = *list1; - for (const auto& a : *list2) { - cat.append(a); + rjson::value cat = rjson::copy(*list1); + for (const auto& a : list2->GetArray()) { + rjson::push_back(cat, rjson::copy(a)); } - Json::Value ret(Json::objectValue); - ret["L"] = std::move(cat); + rjson::value ret = rjson::empty_object(); + rjson::set(ret, "L", std::move(cat)); return ret; } +struct single_value_rjson_comp { + bool operator()(const rjson::value& r1, const rjson::value& r2) const { + auto r1_type = r1.GetType(); + auto r2_type = r2.GetType(); + switch (r1_type) { + case rjson::type::kNullType: + return r1_type < r2_type; + case rjson::type::kFalseType: + return r1_type < r2_type; + case rjson::type::kTrueType: + return r1_type < r2_type; + case rjson::type::kObjectType: + throw rjson::error("Object type comparison is not supported"); + case rjson::type::kArrayType: + throw rjson::error("Array type comparison is not supported"); + case rjson::type::kStringType: { + const size_t r1_len = r1.GetStringLength(); + const size_t r2_len = r2.GetStringLength(); + size_t len = std::min(r1_len, r2_len); + int result = std::strncmp(r1.GetString(), r2.GetString(), len); + return result < 0 || (result == 0 && r1_len < r2_len); + } + case rjson::type::kNumberType: { + if (r1_type != r2_type) { + throw rjson::error("All numbers in a set should have the same type"); + } + if (r1.IsDouble()) { + return r1.GetDouble() < r2.GetDouble(); + } else if (r1.IsInt()) { + return r1.GetInt() < r2.GetInt(); + } else if (r1.IsUint()) { + return r1.GetUint() < r2.GetUint(); + } else if (r1.IsInt64()) { + return r1.GetInt64() < r2.GetInt64(); + } else { + return r1.GetUint64() < r2.GetUint64(); + } + } + default: + return false; + } + } +}; + // Take two JSON-encoded set values (e.g. {"SS": [...the actual set]}) and return the sum of both sets, // again as a set value. -static Json::Value set_sum(const Json::Value& v1, const Json::Value& v2) { +static rjson::value set_sum(const rjson::value& v1, const rjson::value& v2) { auto [set1_type, set1] = unwrap_set(v1); auto [set2_type, set2] = unwrap_set(v2); if (set1_type != set2_type) { @@ -568,21 +614,24 @@ static Json::Value set_sum(const Json::Value& v1, const Json::Value& v2) { if (!set1 || !set2) { throw api_error("ValidationException", "UpdateExpression: ADD operation for sets must be given sets as arguments"); } - Json::Value sum = *set1; - std::set set1_raw(sum.begin(), sum.end()); - for (const auto& a : *set2) { + rjson::value sum = rjson::copy(*set1); + std::set set1_raw; + for (auto it = sum.Begin(); it != sum.End(); ++it) { + set1_raw.insert(rjson::copy(*it)); + } + for (const auto& a : set2->GetArray()) { if (set1_raw.count(a) == 0) { - sum.append(a); + rjson::push_back(sum, rjson::copy(a)); } } - Json::Value ret(Json::objectValue); - ret[set1_type] = std::move(sum); + rjson::value ret = rjson::empty_object(); + rjson::set_with_string_name(ret, set1_type, std::move(sum)); return ret; } // Take two JSON-encoded set values (e.g. {"SS": [...the actual list]}) and return the difference of s1 - s2, // again as a set value. -static Json::Value set_diff(const Json::Value& v1, const Json::Value& v2) { +static rjson::value set_diff(const rjson::value& v1, const rjson::value& v2) { auto [set1_type, set1] = unwrap_set(v1); auto [set2_type, set2] = unwrap_set(v2); if (set1_type != set2_type) { @@ -591,52 +640,61 @@ static Json::Value set_diff(const Json::Value& v1, const Json::Value& v2) { if (!set1 || !set2) { throw api_error("ValidationException", "UpdateExpression: DELETE operation can only be performed on a set"); } - std::set set1_raw(set1->begin(), set1->end()); - for (const auto& a : *set2) { + std::set set1_raw; + for (auto it = set1->Begin(); it != set1->End(); ++it) { + set1_raw.insert(rjson::copy(*it)); + } + for (const auto& a : set2->GetArray()) { set1_raw.erase(a); } - Json::Value ret(Json::objectValue); - Json::Value& result_set = ret[set1_type]; + rjson::value ret = rjson::empty_object(); + rjson::set_with_string_name(ret, set1_type, rjson::empty_array()); + rjson::value& result_set = ret[set1_type]; for (const auto& a : set1_raw) { - result_set.append(a); + rjson::push_back(result_set, rjson::copy(a)); } return ret; } // Check if a given JSON object encodes a number (i.e., it is a {"N": [...]} // and returns an object representing it. -static big_decimal unwrap_number(const Json::Value& v) { - if (!v.isObject() || v.size() != 1) { +static big_decimal unwrap_number(const rjson::value& v) { + if (!v.IsObject() || v.MemberCount() != 1) { throw api_error("ValidationException", "UpdateExpression: invalid number object"); } - auto it = v.begin(); - if (it.key() != "N") { + auto it = v.MemberBegin(); + if (it->name != "N") { throw api_error("ValidationException", - format("UpdateExpression: expected number, found type '{}'", it.key())); + format("UpdateExpression: expected number, found type '{}'", it->name)); } - if (!it->isString()) { + if (it->value.IsNumber()) { + return big_decimal(rjson::print(it->value)); // FIXME(sarna): should use big_decimal constructor with numeric values directly + } + if (!it->value.IsString()) { throw api_error("ValidationException", "UpdateExpression: improperly formatted number constant"); } // FIXME: to not lose precision, we really need to do something like: - // return decimal_type->from_string(it->asString()); - return big_decimal(it->asString()); + // return decimal_type->from_string(it->value.GetString()); + return big_decimal(it->value.GetString()); } // Take two JSON-encoded numeric values ({"N": "thenumber"}) and return the // sum, again as a JSON-encoded number. -static Json::Value number_add(const Json::Value& v1, const Json::Value& v2) { +static rjson::value number_add(const rjson::value& v1, const rjson::value& v2) { auto n1 = unwrap_number(v1); auto n2 = unwrap_number(v2); - Json::Value ret(Json::objectValue); - ret["N"] = std::string((n1 + n2).to_string()); + rjson::value ret = rjson::empty_object(); + std::string str_ret = std::string((n1 + n2).to_string()); + rjson::set(ret, "N", rjson::from_string(str_ret)); return ret; } -static Json::Value number_subtract(const Json::Value& v1, const Json::Value& v2) { +static rjson::value number_subtract(const rjson::value& v1, const rjson::value& v2) { auto n1 = unwrap_number(v1); auto n2 = unwrap_number(v2); - Json::Value ret(Json::objectValue); - ret["N"] = std::string((n1 - n2).to_string()); + rjson::value ret = rjson::empty_object(); + std::string str_ret = std::string((n1 - n2).to_string()); + rjson::set(ret, "N", rjson::from_string(str_ret)); return ret; } @@ -646,31 +704,31 @@ template overloaded(Ts...) -> overloaded; // Given a parsed::value, which can refer either to a constant value from // ExpressionAttributeValues, to the value of some attribute, or to a function // of other values, this function calculates the resulting value. -static Json::Value calculate_value(const parsed::value& v, - const Json::Value& expression_attribute_values, +static rjson::value calculate_value(const parsed::value& v, + const rjson::value& expression_attribute_values, std::unordered_set& used_attribute_names, std::unordered_set& used_attribute_values, - const Json::Value& update_info, + const rjson::value& update_info, schema_ptr schema, - const std::unique_ptr& previous_item) { + const std::unique_ptr& previous_item) { return std::visit(overloaded { - [&] (const std::string& valref) -> Json::Value { - const Json::Value& value = expression_attribute_values.get(valref, Json::nullValue); - if (value.isNull()) { + [&] (const std::string& valref) -> rjson::value { + const rjson::value& value = rjson::get(expression_attribute_values, rjson::string_ref_type(valref.c_str())); + if (value.IsNull()) { throw api_error("ValidationException", format("ExpressionAttributeValues missing entry '{}' required by UpdateExpression", valref)); } used_attribute_values.emplace(std::move(valref)); - return value; + return rjson::copy(value); }, - [&] (const parsed::value::function_call& f) -> Json::Value { + [&] (const parsed::value::function_call& f) -> rjson::value { if (f._function_name == "list_append") { if (f._parameters.size() != 2) { throw api_error("ValidationException", format("UpdateExpression: list_append() accepts 2 parameters, got {}", f._parameters.size())); } - Json::Value v1 = calculate_value(f._parameters[0], expression_attribute_values, used_attribute_names, used_attribute_values, update_info, schema, previous_item); - Json::Value v2 = calculate_value(f._parameters[1], expression_attribute_values, used_attribute_names, used_attribute_values, update_info, schema, previous_item); + rjson::value v1 = calculate_value(f._parameters[0], expression_attribute_values, used_attribute_names, used_attribute_values, update_info, schema, previous_item); + rjson::value v2 = calculate_value(f._parameters[1], expression_attribute_values, used_attribute_names, used_attribute_values, update_info, schema, previous_item); return list_concatenate(v1, v2); } else if (f._function_name == "if_not_exists") { if (f._parameters.size() != 2) { @@ -680,53 +738,54 @@ static Json::Value calculate_value(const parsed::value& v, if (!std::holds_alternative(f._parameters[0]._value)) { throw api_error("ValidationException", "UpdateExpression: if_not_exists() must include path as its first argument"); } - Json::Value v1 = calculate_value(f._parameters[0], expression_attribute_values, used_attribute_names, used_attribute_values, update_info, schema, previous_item); - Json::Value v2 = calculate_value(f._parameters[1], expression_attribute_values, used_attribute_names, used_attribute_values, update_info, schema, previous_item); - return v1.isNull() ? v2 : v1; + rjson::value v1 = calculate_value(f._parameters[0], expression_attribute_values, used_attribute_names, used_attribute_values, update_info, schema, previous_item); + rjson::value v2 = calculate_value(f._parameters[1], expression_attribute_values, used_attribute_names, used_attribute_values, update_info, schema, previous_item); + return v1.IsNull() ? std::move(v2) : std::move(v1); } else { throw api_error("ValidationException", format("UpdateExpression: unknown function '{}' called.", f._function_name)); } }, - [&] (const parsed::path& p) -> Json::Value { - if (!previous_item) { - return Json::nullValue; + [&] (const parsed::path& p) -> rjson::value { + if (!previous_item || previous_item->IsNull() || previous_item->ObjectEmpty()) { + return rjson::null_value(); } std::string update_path = resolve_update_path(p, update_info, schema, used_attribute_names, allow_key_columns::yes); - return (*previous_item)["Item"].get(update_path, Json::nullValue); + rjson::value* previous_value = rjson::find((*previous_item)["Item"], rjson::string_ref_type(update_path.c_str())); + return previous_value ? rjson::copy(*previous_value) : rjson::null_value(); } }, v._value); } // Same as calculate_value() above, except takes a set_rhs, which may be // either a single value, or v1+v2 or v1-v2. -static Json::Value calculate_value(const parsed::set_rhs& rhs, - const Json::Value& expression_attribute_values, +static rjson::value calculate_value(const parsed::set_rhs& rhs, + const rjson::value& expression_attribute_values, std::unordered_set& used_attribute_names, std::unordered_set& used_attribute_values, - const Json::Value& update_info, + const rjson::value& update_info, schema_ptr schema, - const std::unique_ptr& previous_item) { + const std::unique_ptr& previous_item) { switch(rhs._op) { case 'v': return calculate_value(rhs._v1, expression_attribute_values, used_attribute_names, used_attribute_values, update_info, schema, previous_item); case '+': { - Json::Value v1 = calculate_value(rhs._v1, expression_attribute_values, used_attribute_names, used_attribute_values, update_info, schema, previous_item); - Json::Value v2 = calculate_value(rhs._v2, expression_attribute_values, used_attribute_names, used_attribute_values, update_info, schema, previous_item); + rjson::value v1 = calculate_value(rhs._v1, expression_attribute_values, used_attribute_names, used_attribute_values, update_info, schema, previous_item); + rjson::value v2 = calculate_value(rhs._v2, expression_attribute_values, used_attribute_names, used_attribute_values, update_info, schema, previous_item); return number_add(v1, v2); } case '-': { - Json::Value v1 = calculate_value(rhs._v1, expression_attribute_values, used_attribute_names, used_attribute_values, update_info, schema, previous_item); - Json::Value v2 = calculate_value(rhs._v2, expression_attribute_values, used_attribute_names, used_attribute_values, update_info, schema, previous_item); + rjson::value v1 = calculate_value(rhs._v1, expression_attribute_values, used_attribute_names, used_attribute_values, update_info, schema, previous_item); + rjson::value v2 = calculate_value(rhs._v2, expression_attribute_values, used_attribute_names, used_attribute_values, update_info, schema, previous_item); return number_subtract(v1, v2); } } // Can't happen - return Json::Value::null; + return rjson::null_value(); } static std::string resolve_projection_path(const parsed::path& p, - const Json::Value& expression_attribute_names, + const rjson::value* expression_attribute_names, std::unordered_set& used_attribute_names, std::unordered_set& seen_column_names) { if (p.has_operators()) { @@ -735,13 +794,16 @@ static std::string resolve_projection_path(const parsed::path& p, } auto column_name = p.root(); if (column_name.size() > 0 && column_name[0] == '#') { - const Json::Value& value = expression_attribute_names.get(column_name, Json::nullValue); - if (!value.isString()) { + if (!expression_attribute_names) { + throw api_error("ValidationException", "ExpressionAttributeNames parameter not found"); + } + const rjson::value& value = rjson::get(*expression_attribute_names, rjson::string_ref_type(column_name.c_str())); + if (!value.IsString()) { throw api_error("ValidationException", format("ExpressionAttributeNames missing entry '{}' required by ProjectionExpression", column_name)); } used_attribute_names.emplace(std::move(column_name)); - column_name = value.asString(); + column_name = value.GetString(); } // FIXME: this check will need to change when we support non-toplevel attributes if (!seen_column_names.insert(column_name).second) { @@ -762,21 +824,26 @@ static std::string resolve_projection_path(const parsed::path& p, // the parts of the JSON attributes that were chosen in the paths' // operators. Because we don't have such filtering yet (FIXME), we fail here // if the requested paths are anything but top-level attributes. -std::unordered_set calculate_attrs_to_get(const Json::Value& req) { - const Json::Value& attributes_to_get = req["AttributesToGet"]; - const Json::Value& projection_expression = req["ProjectionExpression"]; - if (attributes_to_get && projection_expression) { +std::unordered_set calculate_attrs_to_get(const rjson::value& req) { + const bool has_attributes_to_get = req.HasMember("AttributesToGet"); + const bool has_projection_expression = req.HasMember("ProjectionExpression"); + if (has_attributes_to_get && has_projection_expression) { throw api_error("ValidationException", format("GetItem does not allow both ProjectionExpression and AttributesToGet to be given together")); } - if (attributes_to_get) { - return boost::copy_range>(attributes_to_get | - boost::adaptors::transformed(std::bind(&Json::Value::asString, std::placeholders::_1))); - } else if (projection_expression){ - const Json::Value& expression_attribute_names = req["ExpressionAttributeNames"]; + if (has_attributes_to_get) { + const rjson::value& attributes_to_get = req["AttributesToGet"]; + std::unordered_set ret; + for (auto it = attributes_to_get.Begin(); it != attributes_to_get.End(); ++it) { + ret.insert(it->GetString()); + } + return ret; + } else if (has_projection_expression) { + const rjson::value& projection_expression = req["ProjectionExpression"]; + const rjson::value* expression_attribute_names = rjson::find(req, "ExpressionAttributeNames"); std::vector paths_to_get; try { - paths_to_get = parse_projection_expression(projection_expression.asString()); + paths_to_get = parse_projection_expression(projection_expression.GetString()); } catch(expressions_syntax_error& e) { throw api_error("ValidationException", e.what()); } @@ -793,8 +860,8 @@ std::unordered_set calculate_attrs_to_get(const Json::Value& req) { return {}; } -static std::optional describe_single_item(schema_ptr schema, const query::partition_slice& slice, const cql3::selection::selection& selection, foreign_ptr> query_result, std::unordered_set&& attrs_to_get) { - Json::Value item(Json::objectValue); +static std::optional describe_single_item(schema_ptr schema, const query::partition_slice& slice, const cql3::selection::selection& selection, foreign_ptr> query_result, std::unordered_set&& attrs_to_get) { + rjson::value item = rjson::empty_object(); cql3::selection::result_set_builder builder(selection, gc_clock::now(), cql_serialization_format::latest()); query::result_view::consume(*query_result, slice, cql3::selection::result_set_builder::visitor(builder, *schema, selection)); @@ -814,8 +881,9 @@ static std::optional describe_single_item(schema_ptr schema, const std::string column_name = (*column_it)->name_as_text(); if (cell && column_name != executor::ATTRS_COLUMN_NAME) { if (attrs_to_get.empty() || attrs_to_get.count(column_name) > 0) { - Json::Value& field = item[column_name.c_str()]; - field[type_to_string((*column_it)->type)] = json_key_column_value(*cell, **column_it); + rjson::set_with_string_name(item, column_name.c_str(), rjson::empty_object()); + rjson::value& field = item[column_name.c_str()]; + rjson::set_with_string_name(field, type_to_string((*column_it)->type), json_key_column_value(*cell, **column_it)); } } else if (cell) { auto deserialized = attrs_type()->deserialize(*cell, cql_serialization_format::latest()); @@ -824,7 +892,7 @@ static std::optional describe_single_item(schema_ptr schema, const std::string attr_name = value_cast(entry.first); if (attrs_to_get.empty() || attrs_to_get.count(attr_name) > 0) { bytes value = value_cast(entry.second); - item[attr_name] = deserialize_item(value); + rjson::set_with_string_name(item, attr_name, deserialize_item(value)); } } } @@ -834,15 +902,15 @@ static std::optional describe_single_item(schema_ptr schema, const return item; } -static Json::Value describe_item(schema_ptr schema, const query::partition_slice& slice, const cql3::selection::selection& selection, foreign_ptr> query_result, std::unordered_set&& attrs_to_get) { - std::optional opt_item = describe_single_item(std::move(schema), slice, selection, std::move(query_result), std::move(attrs_to_get)); +static rjson::value describe_item(schema_ptr schema, const query::partition_slice& slice, const cql3::selection::selection& selection, foreign_ptr> query_result, std::unordered_set&& attrs_to_get) { + std::optional opt_item = describe_single_item(std::move(schema), slice, selection, std::move(query_result), std::move(attrs_to_get)); if (!opt_item) { // If there is no matching item, we're supposed to return an empty // object without an Item member - not one with an empty Item member - return Json::objectValue; + return rjson::empty_object(); } - Json::Value item_descr(Json::objectValue); - item_descr["Item"] = *opt_item; + rjson::value item_descr = rjson::empty_object(); + rjson::set(item_descr, "Item", std::move(*opt_item)); return item_descr; } @@ -883,11 +951,11 @@ static bool check_needs_read_before_write(const std::vector> maybe_get_previous_item(service::storage_proxy& proxy, schema_ptr schema, const partition_key& pk, const clustering_key& ck, - const Json::Value& update_expression, const parsed::update_expression& expression) { - const bool needs_read_before_write = update_expression && check_needs_read_before_write(expression.actions()); +static future> maybe_get_previous_item(service::storage_proxy& proxy, schema_ptr schema, const partition_key& pk, const clustering_key& ck, + bool has_update_expression, const parsed::update_expression& expression) { + const bool needs_read_before_write = has_update_expression && check_needs_read_before_write(expression.actions()); if (!needs_read_before_write) { - return make_ready_future>(); + return make_ready_future>(); } dht::partition_range_vector partition_ranges{dht::partition_range(dht::global_partitioner().decorate_key(*schema, pk))}; @@ -910,17 +978,20 @@ static future> maybe_get_previous_item(service::sto return proxy.query(schema, std::move(command), std::move(partition_ranges), cl, service::storage_proxy::coordinator_query_options(db::no_timeout, empty_service_permit())).then( [schema, partition_slice = std::move(partition_slice), selection = std::move(selection)] (service::storage_proxy::coordinator_query_result qr) { auto previous_item = describe_item(schema, partition_slice, *selection, std::move(qr.query_result), {}); - return make_ready_future>(std::make_unique(std::move(previous_item))); + return make_ready_future>(std::make_unique(std::move(previous_item))); }); } future executor::update_item(std::string content) { _stats.api_operations.update_item++; - Json::Value update_info = json::to_json_value(content); - elogger.trace("update_item {}", update_info.toStyledString()); + rjson::value update_info = rjson::parse(content); + elogger.trace("update_item {}", update_info); schema_ptr schema = get_table(_proxy, update_info); // FIXME: handle missing Key. - const Json::Value& key = update_info["Key"]; + if (!update_info.HasMember("Key")) { + throw api_error("ValidationException", "UpdateItem requires a Key parameter"); + } + const rjson::value& key = update_info["Key"]; partition_key pk = pk_from_json(key, schema); clustering_key ck = ck_from_json(key, schema); check_key(key, schema); @@ -929,35 +1000,41 @@ future executor::update_item(std::string content) { attribute_collector attrs_collector; auto ts = api::new_timestamp(); - const Json::Value& attribute_updates = update_info["AttributeUpdates"]; - const Json::Value& update_expression = update_info["UpdateExpression"]; - // DynamoDB forbids having both old-style AttributeUpdates and new-style // UpdateExpression in the same request - if (attribute_updates && update_expression) { + const bool has_update_expression = update_info.HasMember("UpdateExpression"); + const bool has_attribute_updates = update_info.HasMember("AttributeUpdates"); + if (has_update_expression && has_attribute_updates) { throw api_error("ValidationException", format("UpdateItem does not allow both AttributeUpdates and UpdateExpression to be given together")); } + rjson::value attribute_updates = rjson::empty_object(); parsed::update_expression expression; - if (update_expression) { + if (update_info.HasMember("UpdateExpression")) { + const rjson::value& update_expression = update_info["UpdateExpression"]; try { - expression = parse_update_expression(update_expression.asString()); + expression = parse_update_expression(update_expression.GetString()); } catch(expressions_syntax_error& e) { throw api_error("ValidationException", e.what()); } if (expression.empty()) { throw api_error("ValidationException", "Empty expression in UpdateExpression is not allowed"); } + } else if (has_attribute_updates) { + attribute_updates = update_info["AttributeUpdates"]; } - return maybe_get_previous_item(_proxy, schema, pk, ck, update_expression, expression).then( - [this, schema, expression = std::move(expression), update_expression = std::move(update_expression), ck = std::move(ck), - update_info = std::move(update_info), m = std::move(m), attrs_collector = std::move(attrs_collector), attribute_updates = std::move(attribute_updates), ts] (std::unique_ptr previous_item) mutable { - if (update_expression) { + return maybe_get_previous_item(_proxy, schema, pk, ck, has_update_expression, expression).then( + [this, schema, expression = std::move(expression), has_update_expression, ck = std::move(ck), + update_info = rjson::copy(update_info), m = std::move(m), attrs_collector = std::move(attrs_collector), attribute_updates = rjson::copy(attribute_updates), ts] (std::unique_ptr previous_item) mutable { + if (has_update_expression) { std::unordered_set seen_column_names; std::unordered_set used_attribute_values; std::unordered_set used_attribute_names; + rjson::value empty_attribute_values = rjson::empty_object(); + const rjson::value* attr_values_raw = rjson::find(update_info, "ExpressionAttributeValues"); + const rjson::value& attr_values = attr_values_raw ? *attr_values_raw : empty_attribute_values; for (auto& action : expression.actions()) { std::string column_name = resolve_update_path(action._path, update_info, schema, used_attribute_names, allow_key_columns::no); // DynamoDB forbids multiple updates in the same expression to @@ -972,7 +1049,7 @@ future executor::update_item(std::string content) { } std::visit(overloaded { [&] (const parsed::update_expression::action::set& a) { - auto value = calculate_value(a._rhs, update_info["ExpressionAttributeValues"], used_attribute_names, used_attribute_values, update_info, schema, previous_item); + auto value = calculate_value(a._rhs, attr_values, used_attribute_names, used_attribute_values, update_info, schema, previous_item); attrs_collector.put(to_bytes(column_name), serialize_item(value), ts); }, [&] (const parsed::update_expression::action::remove& a) { @@ -983,18 +1060,18 @@ future executor::update_item(std::string content) { parsed::value addition; base.set_path(action._path); addition.set_valref(a._valref); - Json::Value v1 = calculate_value(base, update_info["ExpressionAttributeValues"], used_attribute_names, used_attribute_values, update_info, schema, previous_item); - Json::Value v2 = calculate_value(addition, update_info["ExpressionAttributeValues"], used_attribute_names, used_attribute_values, update_info, schema, previous_item); - Json::Value result; + rjson::value v1 = calculate_value(base, attr_values, used_attribute_names, used_attribute_values, update_info, schema, previous_item); + rjson::value v2 = calculate_value(addition, attr_values, used_attribute_names, used_attribute_values, update_info, schema, previous_item); + rjson::value result; std::string v1_type = get_item_type_string(v1); if (v1_type == "N") { if (get_item_type_string(v2) != "N") { - throw api_error("ValidationException", format("Incorrect operand type for operator or function. Expected {}: {}", v1_type, v2)); + throw api_error("ValidationException", format("Incorrect operand type for operator or function. Expected {}: {}", v1_type, rjson::print(v2))); } result = number_add(v1, v2); } else if (v1_type == "SS" || v1_type == "NS" || v1_type == "BS") { if (get_item_type_string(v2) != v1_type) { - throw api_error("ValidationException", format("Incorrect operand type for operator or function. Expected {}: {}", v1_type, v2)); + throw api_error("ValidationException", format("Incorrect operand type for operator or function. Expected {}: {}", v1_type, rjson::print(v2))); } result = set_sum(v1, v2); } else { @@ -1007,9 +1084,9 @@ future executor::update_item(std::string content) { parsed::value subset; base.set_path(action._path); subset.set_valref(a._valref); - Json::Value v1 = calculate_value(base, update_info["ExpressionAttributeValues"], used_attribute_names, used_attribute_values, update_info, schema, previous_item); - Json::Value v2 = calculate_value(subset, update_info["ExpressionAttributeValues"], used_attribute_names, used_attribute_values, update_info, schema, previous_item); - Json::Value result = set_diff(v1, v2); + rjson::value v1 = calculate_value(base, attr_values, used_attribute_names, used_attribute_values, update_info, schema, previous_item); + rjson::value v2 = calculate_value(subset, attr_values, used_attribute_names, used_attribute_values, update_info, schema, previous_item); + rjson::value result = set_diff(v1, v2); attrs_collector.put(to_bytes(column_name), serialize_item(result), ts); } }, action._action); @@ -1017,31 +1094,30 @@ future executor::update_item(std::string content) { verify_all_are_used(update_info, "ExpressionAttributeNames", used_attribute_names, "UpdateExpression"); verify_all_are_used(update_info, "ExpressionAttributeValues", used_attribute_values, "UpdateExpression"); } - - for (auto it = attribute_updates.begin(); it != attribute_updates.end(); ++it) { + for (auto it = attribute_updates.MemberBegin(); it != attribute_updates.MemberEnd(); ++it) { // Note that it.key() is the name of the column, *it is the operation - bytes column_name = to_bytes(it.key().asString()); + bytes column_name = to_bytes(it->name.GetString()); const column_definition* cdef = schema->get_column_definition(column_name); if (cdef && cdef->is_primary_key()) { throw api_error("ValidationException", - format("UpdateItem cannot update key column {}", it.key().asString())); + format("UpdateItem cannot update key column {}", it->name.GetString())); } - std::string action = (*it)["Action"].asString(); + std::string action = (it->value)["Action"].GetString(); if (action == "DELETE") { // FIXME: Currently we support only the simple case where the // "Value" field is missing. If it were not missing, we would // we need to verify the old type and/or value is same as // specified before deleting... We don't do this yet. - if (!it->get("Value", "").asString().empty()) { + if (it->value.HasMember("Value")) { throw api_error("ValidationException", format("UpdateItem DELETE with checking old value not yet supported")); } attrs_collector.del(std::move(column_name), ts); } else if (action == "PUT") { - const Json::Value& value = (*it)["Value"]; - if (value.size() != 1) { + const rjson::value& value = (it->value)["Value"]; + if (value.MemberCount() != 1) { throw api_error("ValidationException", - format("Value field in AttributeUpdates must have just one item", it.key().asString())); + format("Value field in AttributeUpdates must have just one item", it->name.GetString())); } attrs_collector.put(std::move(column_name), serialize_item(value), ts); } else { @@ -1072,12 +1148,12 @@ future executor::update_item(std::string content) { // field is absense, we default to eventually consistent reads. // In Scylla, eventually-consistent reads are implemented as consistency // level LOCAL_ONE, and strongly-consistent reads as LOCAL_QUORUM. -static db::consistency_level get_read_consistency(const Json::Value& request) { - Json::Value consistent_read_value = request.get("ConsistentRead", Json::nullValue); +static db::consistency_level get_read_consistency(const rjson::value& request) { + const rjson::value* consistent_read_value = rjson::find(request, "ConsistentRead"); bool consistent_read = false; - if (!consistent_read_value.isNull()) { - if (consistent_read_value.isBool()) { - consistent_read = consistent_read_value.asBool(); + if (consistent_read_value && !consistent_read_value->IsNull()) { + if (consistent_read_value->IsBool()) { + consistent_read = consistent_read_value->GetBool(); } else { throw api_error("ValidationException", "ConsistentRead flag must be a boolean"); } @@ -1087,12 +1163,12 @@ static db::consistency_level get_read_consistency(const Json::Value& request) { future executor::get_item(std::string content) { _stats.api_operations.get_item++; - Json::Value table_info = json::to_json_value(content); - elogger.trace("Getting item {}", table_info.toStyledString()); + rjson::value table_info = rjson::parse(content); + elogger.trace("Getting item {}", table_info); schema_ptr schema = get_table(_proxy, table_info); - Json::Value query_key = table_info["Key"]; + rjson::value& query_key = table_info["Key"]; db::consistency_level cl = get_read_consistency(table_info); partition_key pk = pk_from_json(query_key, schema); @@ -1130,8 +1206,8 @@ future executor::batch_get_item(std::string content) { // work in the following loops. So we should limit the batch size, and/or // the response size, as DynamoDB does. _stats.api_operations.batch_get_item++; - Json::Value req = json::to_json_value(content); - Json::Value& request_items = req["RequestItems"]; + rjson::value req = rjson::parse(content); + rjson::value& request_items = req["RequestItems"]; // We need to validate all the parameters before starting any asynchronous // query, and fail the entire request on any parse error. So we parse all @@ -1148,12 +1224,13 @@ future executor::batch_get_item(std::string content) { }; std::vector requests; - for (auto it = request_items.begin(); it != request_items.end(); ++it) { + for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) { table_requests rs; rs.schema = get_table_from_batch_request(_proxy, it); - rs.cl = get_read_consistency(*it); - rs.attrs_to_get = calculate_attrs_to_get(*it); - for (const Json::Value& key : (*it)["Keys"]) { + rs.cl = get_read_consistency(it->value); + rs.attrs_to_get = calculate_attrs_to_get(it->value); + auto& keys = (it->value)["Keys"]; + for (const rjson::value& key : keys.GetArray()) { rs.requests.push_back({pk_from_json(key, rs.schema), ck_from_json(key, rs.schema)}); check_key(key, rs.schema); } @@ -1162,7 +1239,7 @@ future executor::batch_get_item(std::string content) { // If got here, all "requests" are valid, so let's start them all // in parallel. The requests object are then immediately destroyed. - std::vector>>> response_futures; + std::vector>>> response_futures; for (const auto& rs : requests) { for (const auto &r : rs.requests) { dht::partition_range_vector partition_ranges{dht::partition_range(dht::global_partitioner().decorate_key(*rs.schema, std::move(r.pk)))}; @@ -1176,17 +1253,17 @@ future executor::batch_get_item(std::string content) { auto selection = cql3::selection::selection::wildcard(rs.schema); auto partition_slice = query::partition_slice(std::move(bounds), {}, std::move(regular_columns), selection->get_query_options()); auto command = ::make_lw_shared(rs.schema->id(), rs.schema->version(), partition_slice, query::max_partitions); - future>> f = _proxy.query(rs.schema, std::move(command), std::move(partition_ranges), rs.cl, service::storage_proxy::coordinator_query_options(db::no_timeout, empty_service_permit())).then( + future>> f = _proxy.query(rs.schema, std::move(command), std::move(partition_ranges), rs.cl, service::storage_proxy::coordinator_query_options(db::no_timeout, empty_service_permit())).then( [schema = rs.schema, partition_slice = std::move(partition_slice), selection = std::move(selection), attrs_to_get = rs.attrs_to_get] (service::storage_proxy::coordinator_query_result qr) mutable { - std::optional json = describe_single_item(schema, partition_slice, *selection, std::move(qr.query_result), std::move(attrs_to_get)); - // Unfortunately, future> doesn't - // work because Json::Value doesn't have a non-throwing move + std::optional json = describe_single_item(schema, partition_slice, *selection, std::move(qr.query_result), std::move(attrs_to_get)); + // Unfortunately, future> doesn't + // work because rjson::value doesn't have a non-throwing move // constructor. So we need to convert it to a std::unique_ptr. - std::unique_ptr v; + std::unique_ptr v; if (json) { - v = std::make_unique(std::move(*json)); + v = std::make_unique(std::move(*json)); } - return make_ready_future>>( + return make_ready_future>>( std::make_tuple(schema->cf_name(), std::move(v))); }); response_futures.push_back(std::move(f)); @@ -1201,19 +1278,15 @@ future executor::batch_get_item(std::string content) { // handled it above), but this case does include things like timeouts, // unavailable CL, etc. return when_all_succeed(response_futures.begin(), response_futures.end()).then( - [] (std::vector>> responses) { - Json::Value response = Json::objectValue; - response["Responses"] = Json::objectValue; + [] (std::vector>> responses) { + rjson::value response = rjson::empty_object(); + rjson::set(response, "Responses", rjson::empty_object()); for (const auto& t : responses) { + if (!response["Responses"].HasMember(std::get<0>(t).c_str())) { + rjson::set_with_string_name(response["Responses"], std::get<0>(t), rjson::empty_array()); + } if (std::get<1>(t)) { - response["Responses"][std::get<0>(t)].append(*std::get<1>(t)); - } else { - // Even if all items requested for a particular table are - // missing, we still need to return an empty array. - Json::Value& x = response["Responses"][std::get<0>(t)]; - if (x.isNull()) { - x = Json::arrayValue; - } + rjson::push_back(response["Responses"][std::get<0>(t)], std::move(*std::get<1>(t))); } } return make_ready_future(make_jsonable(std::move(response))); @@ -1225,16 +1298,16 @@ class describe_items_visitor { const columns_t& _columns; const std::unordered_set& _attrs_to_get; typename columns_t::const_iterator _column_it; - Json::Value _item; - Json::Value _items; + rjson::value _item; + rjson::value _items; public: describe_items_visitor(const columns_t& columns, const std::unordered_set& attrs_to_get) : _columns(columns) , _attrs_to_get(attrs_to_get) , _column_it(columns.begin()) - , _item(Json::objectValue) - , _items(Json::arrayValue) + , _item(rjson::empty_object()) + , _items(rjson::empty_array()) { } void start_row() { @@ -1250,8 +1323,11 @@ public: std::string column_name = (*_column_it)->name_as_text(); if (column_name != executor::ATTRS_COLUMN_NAME) { if (_attrs_to_get.empty() || _attrs_to_get.count(column_name) > 0) { - Json::Value& field = _item[column_name.c_str()]; - field[type_to_string((*_column_it)->type)] = json_key_column_value(bv, **_column_it); + if (!_item.HasMember(column_name.c_str())) { + rjson::set_with_string_name(_item, column_name, rjson::empty_object()); + } + rjson::value& field = _item[column_name.c_str()]; + rjson::set_with_string_name(field, type_to_string((*_column_it)->type), json_key_column_value(bv, **_column_it)); } } else { auto deserialized = attrs_type()->deserialize(bv, cql_serialization_format::latest()); @@ -1260,7 +1336,7 @@ public: std::string attr_name = value_cast(entry.first); if (_attrs_to_get.empty() || _attrs_to_get.count(attr_name) > 0) { bytes value = value_cast(entry.second); - _item[attr_name] = deserialize_item(value); + rjson::set_with_string_name(_item, attr_name, deserialize_item(value)); } } } @@ -1269,33 +1345,34 @@ public: } void end_row() { - _items.append(std::move(_item)); - _item = Json::objectValue; + rjson::push_back(_items, std::move(_item)); + _item = rjson::empty_object(); } - Json::Value get_items() && { + rjson::value get_items() && { return std::move(_items); } }; -static Json::Value describe_items(schema_ptr schema, const query::partition_slice& slice, const cql3::selection::selection& selection, std::unique_ptr result_set, std::unordered_set&& attrs_to_get) { +static rjson::value describe_items(schema_ptr schema, const query::partition_slice& slice, const cql3::selection::selection& selection, std::unique_ptr result_set, std::unordered_set&& attrs_to_get) { describe_items_visitor visitor(selection.get_columns(), attrs_to_get); result_set->visit(visitor); - Json::Value items = std::move(visitor).get_items(); - Json::Value items_descr(Json::objectValue); - items_descr["Count"] = items.size(); - items_descr["ScannedCount"] = items.size(); // TODO(sarna): Update once filtering is implemented - items_descr["Items"] = std::move(items); + rjson::value items = std::move(visitor).get_items(); + rjson::value items_descr = rjson::empty_object(); + rjson::set(items_descr, "Count", rjson::value(items.Size())); + rjson::set(items_descr, "ScannedCount", rjson::value(items.Size())); // TODO(sarna): Update once filtering is implemented + rjson::set(items_descr, "Items", std::move(items)); return items_descr; } -static Json::Value encode_paging_state(const schema& schema, const service::pager::paging_state& paging_state) { - Json::Value last_evaluated_key(Json::objectValue); +static rjson::value encode_paging_state(const schema& schema, const service::pager::paging_state& paging_state) { + rjson::value last_evaluated_key = rjson::empty_object(); std::vector exploded_pk = paging_state.get_partition_key().explode(); auto exploded_pk_it = exploded_pk.begin(); for (const column_definition& cdef : schema.partition_key_columns()) { - Json::Value& key_entry = last_evaluated_key[cdef.name_as_text()]; - key_entry[type_to_string(cdef.type)] = json::to_json_value(cdef.type->to_json_string(*exploded_pk_it)); + rjson::set_with_string_name(last_evaluated_key, cdef.name_as_text(), rjson::empty_object()); + rjson::value& key_entry = last_evaluated_key[cdef.name_as_text()]; + rjson::set_with_string_name(key_entry, type_to_string(cdef.type), rjson::parse(cdef.type->to_json_string(*exploded_pk_it))); ++exploded_pk_it; } auto ck = paging_state.get_clustering_key(); @@ -1303,8 +1380,9 @@ static Json::Value encode_paging_state(const schema& schema, const service::page auto exploded_ck = ck->explode(); auto exploded_ck_it = exploded_ck.begin(); for (const column_definition& cdef : schema.clustering_key_columns()) { - Json::Value& key_entry = last_evaluated_key[cdef.name_as_text()]; - key_entry[type_to_string(cdef.type)] = json::to_json_value(cdef.type->to_json_string(*exploded_ck_it)); + rjson::set_with_string_name(last_evaluated_key, cdef.name_as_text(), rjson::empty_object()); + rjson::value& key_entry = last_evaluated_key[cdef.name_as_text()]; + rjson::set_with_string_name(key_entry, type_to_string(cdef.type), rjson::parse(cdef.type->to_json_string(*exploded_ck_it))); ++exploded_ck_it; } } @@ -1312,7 +1390,7 @@ static Json::Value encode_paging_state(const schema& schema, const service::page } static future do_query(schema_ptr schema, - const Json::Value& exclusive_start_key, + const rjson::value* exclusive_start_key, dht::partition_range_vector&& partition_ranges, std::vector&& ck_bounds, std::unordered_set&& attrs_to_get, @@ -1321,11 +1399,11 @@ static future do_query(schema_ptr schema, ::shared_ptr filtering_restrictions) { ::shared_ptr paging_state = nullptr; - if (!exclusive_start_key.empty()) { - partition_key pk = pk_from_json(exclusive_start_key, schema); + if (exclusive_start_key) { + partition_key pk = pk_from_json(*exclusive_start_key, schema); std::optional ck; if (schema->clustering_key_size() > 0) { - ck = ck_from_json(exclusive_start_key, schema); + ck = ck_from_json(*exclusive_start_key, schema); } paging_state = ::make_shared(pk, ck, query::max_partitions, utils::UUID(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 0); } @@ -1354,7 +1432,7 @@ static future do_query(schema_ptr schema, auto paging_state = rs->get_metadata().paging_state(); auto items = describe_items(schema, partition_slice, *selection, std::move(rs), std::move(attrs_to_get)); if (paging_state) { - items["LastEvaluatedKey"] = encode_paging_state(*schema, *paging_state); + rjson::set(items, "LastEvaluatedKey", encode_paging_state(*schema, *paging_state)); } return make_ready_future(make_jsonable(std::move(items))); }); @@ -1367,16 +1445,17 @@ static future do_query(schema_ptr schema, // 4. Implement parallel scanning via Segments future executor::scan(std::string content) { _stats.api_operations.scan++; - Json::Value request_info = json::to_json_value(content); - elogger.trace("Scanning {}", request_info.toStyledString()); + rjson::value request_info = rjson::parse(content); + elogger.trace("Scanning {}", request_info); schema_ptr schema = get_table(_proxy, request_info); - Json::Value exclusive_start_key = request_info["ExclusiveStartKey"]; + rjson::value* exclusive_start_key = rjson::find(request_info, "ExclusiveStartKey"); //FIXME(sarna): ScanFilter is deprecated in favor of FilterExpression - const Json::Value& scan_filter = request_info["ScanFilter"]; + rjson::value* scan_filter = rjson::find(request_info, "ScanFilter"); db::consistency_level cl = get_read_consistency(request_info); - uint32_t limit = request_info.get("Limit", query::max_partitions).asUInt(); + rjson::value* limit_json = rjson::find(request_info, "Limit"); + uint32_t limit = limit_json ? limit_json->GetUint64() : query::max_partitions; if (limit <= 0) { throw api_error("ValidationException", "Limit must be greater than 0"); } @@ -1387,15 +1466,18 @@ future executor::scan(std::string content) { dht::partition_range_vector partition_ranges{dht::partition_range::make_open_ended_both_sides()}; std::vector ck_bounds{query::clustering_range::make_open_ended_both_sides()}; - auto filtering_restrictions = get_filtering_restrictions(schema, attrs_column(*schema), scan_filter); + ::shared_ptr filtering_restrictions; + if (scan_filter) { + filtering_restrictions = get_filtering_restrictions(schema, attrs_column(*schema), *scan_filter); + } return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, std::move(filtering_restrictions)); } -static dht::partition_range calculate_pk_bound(schema_ptr schema, const column_definition& pk_cdef, comparison_operator_type op, const Json::Value& attrs) { - if (attrs.size() != 1) { - throw api_error("ValidationException", format("Only a single attribute is allowed for a hash key restriction: {}", attrs.toStyledString())); +static dht::partition_range calculate_pk_bound(schema_ptr schema, const column_definition& pk_cdef, comparison_operator_type op, const rjson::value& attrs) { + if (attrs.Size() != 1) { + throw api_error("ValidationException", format("Only a single attribute is allowed for a hash key restriction: {}", attrs)); } - bytes raw_value = pk_cdef.type->from_string(attrs[0][type_to_string(pk_cdef.type)].asString()); + bytes raw_value = pk_cdef.type->from_string(attrs[0][type_to_string(pk_cdef.type)].GetString()); partition_key pk = partition_key::from_singular(*schema, pk_cdef.type->deserialize(raw_value)); auto decorated_key = dht::global_partitioner().decorate_key(*schema, pk); if (op != comparison_operator_type::EQ) { @@ -1415,12 +1497,12 @@ static query::clustering_range get_clustering_range_for_begins_with(bytes&& targ return query::clustering_range::make_starting_with(query::clustering_range::bound(ck)); } -static query::clustering_range calculate_ck_bound(schema_ptr schema, const column_definition& ck_cdef, comparison_operator_type op, const Json::Value& attrs) { +static query::clustering_range calculate_ck_bound(schema_ptr schema, const column_definition& ck_cdef, comparison_operator_type op, const rjson::value& attrs) { const size_t expected_attrs_size = (op == comparison_operator_type::BETWEEN) ? 2 : 1; - if (attrs.size() != expected_attrs_size) { - throw api_error("ValidationException", format("{} arguments expected for a sort key restriction: {}", expected_attrs_size, attrs.toStyledString())); + if (attrs.Size() != expected_attrs_size) { + throw api_error("ValidationException", format("{} arguments expected for a sort key restriction: {}", expected_attrs_size, attrs)); } - bytes raw_value = ck_cdef.type->from_string(attrs[0][type_to_string(ck_cdef.type)].asString()); + bytes raw_value = ck_cdef.type->from_string(attrs[0][type_to_string(ck_cdef.type)].GetString()); clustering_key ck = clustering_key::from_singular(*schema, ck_cdef.type->deserialize(raw_value)); switch (op) { case comparison_operator_type::EQ: @@ -1434,7 +1516,7 @@ static query::clustering_range calculate_ck_bound(schema_ptr schema, const colum case comparison_operator_type::GT: return query::clustering_range::make_starting_with(query::clustering_range::bound(ck, false)); case comparison_operator_type::BETWEEN: { - bytes raw_upper_limit = ck_cdef.type->from_string(attrs[1][type_to_string(ck_cdef.type)].asString()); + bytes raw_upper_limit = ck_cdef.type->from_string(attrs[1][type_to_string(ck_cdef.type)].GetString()); clustering_key upper_limit = clustering_key::from_singular(*schema, ck_cdef.type->deserialize(raw_upper_limit)); return query::clustering_range::make(query::clustering_range::bound(ck), query::clustering_range::bound(upper_limit)); } @@ -1447,7 +1529,7 @@ static query::clustering_range calculate_ck_bound(schema_ptr schema, const colum if (!ck_cdef.type->is_compatible_with(*utf8_type)) { throw api_error("ValidationException", format("BEGINS_WITH operator cannot be applied to type {}", type_to_string(ck_cdef.type))); } - std::string raw_upper_limit_str = attrs[0][type_to_string(ck_cdef.type)].asString(); + std::string raw_upper_limit_str = attrs[0][type_to_string(ck_cdef.type)].GetString(); bytes raw_upper_limit = ck_cdef.type->from_string(raw_upper_limit_str); return get_clustering_range_for_begins_with(std::move(raw_upper_limit), ck, schema, ck_cdef.type); } @@ -1458,16 +1540,16 @@ static query::clustering_range calculate_ck_bound(schema_ptr schema, const colum // Calculates primary key bounds from the list of conditions static std::pair> -calculate_bounds(schema_ptr schema, const Json::Value& conditions) { +calculate_bounds(schema_ptr schema, const rjson::value& conditions) { dht::partition_range_vector partition_ranges; std::vector ck_bounds; - for (auto it = conditions.begin(); it != conditions.end(); ++it) { - std::string key = it.key().asString(); - const Json::Value& condition = *it; + for (auto it = conditions.MemberBegin(); it != conditions.MemberEnd(); ++it) { + std::string key = it->name.GetString(); + const rjson::value& condition = it->value; - Json::Value comp_definition = condition.get("ComparisonOperator", Json::Value()); - Json::Value attr_list = condition.get("AttributeValueList", Json::Value(Json::arrayValue)); + const rjson::value& comp_definition = rjson::get(condition, "ComparisonOperator"); + const rjson::value& attr_list = rjson::get(condition, "AttributeValueList"); auto op = get_comparison_operator(comp_definition); @@ -1493,13 +1575,13 @@ calculate_bounds(schema_ptr schema, const Json::Value& conditions) { throw api_error("ValidationException", format("Query missing condition on hash key '{}'", schema->partition_key_columns().front().name_as_text())); } if (schema->clustering_key_size() == 0) { - if (conditions.size() != 1) { + if (conditions.MemberCount() != 1) { throw api_error("ValidationException", "Only one condition allowed in table with only hash key"); } } else { - if (conditions.size() == 2 && ck_bounds.empty()) { + if (conditions.MemberCount() == 2 && ck_bounds.empty()) { throw api_error("ValidationException", format("Query missing condition on sort key '{}'", schema->clustering_key_columns().front().name_as_text())); - } else if (conditions.size() > 2) { + } else if (conditions.MemberCount() > 2) { throw api_error("ValidationException", "Only one or two conditions allowed in table with hash key and sort key"); } } @@ -1513,28 +1595,32 @@ calculate_bounds(schema_ptr schema, const Json::Value& conditions) { future executor::query(std::string content) { _stats.api_operations.query++; - Json::Value request_info = json::to_json_value(content); - elogger.trace("Querying {}", request_info.toStyledString()); + rjson::value request_info = rjson::parse(content); + elogger.trace("Querying {}", request_info); schema_ptr schema = get_table(_proxy, request_info); - Json::Value exclusive_start_key = request_info["ExclusiveStartKey"]; + rjson::value* exclusive_start_key = rjson::find(request_info, "ExclusiveStartKey"); db::consistency_level cl = get_read_consistency(request_info); - uint32_t limit = request_info.get("Limit", query::max_partitions).asUInt(); + rjson::value* limit_json = rjson::find(request_info, "Limit"); + uint32_t limit = limit_json ? limit_json->GetUint64() : query::max_partitions; if (limit <= 0) { throw api_error("ValidationException", "Limit must be greater than 0"); } //FIXME(sarna): KeyConditions are deprecated in favor of KeyConditionExpression - const Json::Value& conditions = request_info["KeyConditions"]; + rjson::value& conditions = rjson::get(request_info, "KeyConditions"); //FIXME(sarna): QueryFilter is deprecated in favor of FilterExpression - const Json::Value& query_filter = request_info["QueryFilter"]; + rjson::value* query_filter = rjson::find(request_info, "QueryFilter"); auto [partition_ranges, ck_bounds] = calculate_bounds(schema, conditions); auto attrs_to_get = calculate_attrs_to_get(request_info); - auto filtering_restrictions = get_filtering_restrictions(schema, attrs_column(*schema), query_filter); + ::shared_ptr filtering_restrictions; + if (query_filter) { + filtering_restrictions = get_filtering_restrictions(schema, attrs_column(*schema), *query_filter); + } return do_query(schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl, std::move(filtering_restrictions)); } @@ -1546,11 +1632,13 @@ static void validate_limit(int limit) { future executor::list_tables(std::string content) { _stats.api_operations.list_tables++; - Json::Value table_info = json::to_json_value(content); - elogger.trace("Listing tables {}", table_info.toStyledString()); + rjson::value table_info = rjson::parse(content); + elogger.trace("Listing tables {}", table_info); - std::string exclusive_start = table_info.get("ExclusiveStartTableName", "").asString(); - int limit = table_info.get("Limit", 100).asInt(); + rjson::value* exclusive_start_json = rjson::find(table_info, "ExclusiveStartTableName"); + rjson::value* limit_json = rjson::find(table_info, "Limit"); + std::string exclusive_start = exclusive_start_json ? exclusive_start_json->GetString() : ""; + int limit = limit_json ? limit_json->GetInt() : 100; validate_limit(limit); auto table_names = _proxy.get_db().local().get_column_families() @@ -1562,9 +1650,9 @@ future executor::list_tables(std::string content) { return t->schema()->cf_name(); }); - Json::Value response; - Json::Value& all_tables = response["TableNames"]; - all_tables = Json::Value(Json::arrayValue); + rjson::value response = rjson::empty_object(); + rjson::set(response, "TableNames", rjson::empty_array()); + rjson::value& all_tables = response["TableNames"]; //TODO(sarna): Dynamo doesn't declare any ordering when listing tables, // but our implementation is vulnerable to changes, because the tables @@ -1580,13 +1668,14 @@ future executor::list_tables(std::string content) { } }(); while (limit > 0 && table_names_it != table_names.end()) { - all_tables.append(Json::Value(table_names_it->c_str())); + rjson::push_back(all_tables, rjson::from_string(*table_names_it)); --limit; ++table_names_it; } if (table_names_it != table_names.end()) { - response["LastEvaluatedTableName"] = *std::prev(all_tables.end()); + auto& last_table_name = *std::prev(all_tables.End()); + rjson::set(response, "LastEvaluatedTableName", rjson::copy(last_table_name)); } return make_ready_future(make_jsonable(std::move(response))); @@ -1594,7 +1683,7 @@ future executor::list_tables(std::string content) { future executor::describe_endpoints(std::string content, std::string host_header) { _stats.api_operations.describe_endpoints++; - Json::Value response; + rjson::value response = rjson::empty_object(); // Without having any configuration parameter to say otherwise, we tell // the user to return to the same endpoint they used to reach us. The only // way we can know this is through the "Host:" header in the request, @@ -1604,8 +1693,10 @@ future executor::describe_endpoints(std::string content, if (host_header.empty()) { throw api_error("ValidationException", "DescribeEndpoints needs a 'Host:' header in request"); } - response["Endpoints"][0]["Address"] = host_header; - response["Endpoints"][0]["CachePeriodInMinutes"] = 1440; + rjson::set(response, "Endpoints", rjson::empty_array()); + rjson::push_back(response["Endpoints"], rjson::empty_object()); + rjson::set(response["Endpoints"][0], "Address", rjson::from_string(host_header)); + rjson::set(response["Endpoints"][0], "CachePeriodInMinutes", rjson::value(1440)); return make_ready_future(make_jsonable(std::move(response))); } diff --git a/alternator/serialization.cc b/alternator/serialization.cc index a140041b0b..fba36ed0ad 100644 --- a/alternator/serialization.cc +++ b/alternator/serialization.cc @@ -12,6 +12,7 @@ #include "log.hh" #include "serialization.hh" #include "error.hh" +#include "rapidjson/writer.h" static logging::logger slogger("alternator-serialization"); @@ -45,26 +46,32 @@ type_representation represent_type(alternator_type atype) { return it->second; } -bytes serialize_item(const Json::Value& item) { - if (item.size() != 1) { - throw api_error("ValidationException", format("An item can contain only one attribute definition: {}", item.toStyledString())); +bytes serialize_item(const rjson::value& item) { + if (item.IsNull() || item.MemberCount() != 1) { + throw api_error("ValidationException", format("An item can contain only one attribute definition: {}", item)); } - auto it = item.begin(); - type_info type_info = type_info_from_string(it.key().asString()); // JSON keys are guaranteed to be strings + auto it = item.MemberBegin(); + type_info type_info = type_info_from_string(it->name.GetString()); // JSON keys are guaranteed to be strings if (type_info.atype == alternator_type::NOT_SUPPORTED_YET) { - slogger.trace("Non-optimal serialization of type {}", it.key()); - return bytes{int8_t(type_info.atype)} + to_bytes(item.toStyledString()); + slogger.trace("Non-optimal serialization of type {}", it->name.GetString()); + return bytes{int8_t(type_info.atype)} + to_bytes(rjson::print(item)); } bytes serialized; // Alternator bytes representation does not start with "0x" followed by hex digits as Scylla-JSON does, // but instead uses base64. + if (type_info.dtype == bytes_type) { - std::string raw_value = it->asString(); + std::string raw_value = it->value.GetString(); serialized = base64_decode(std::string_view(raw_value)); + } else if (type_info.dtype == decimal_type) { + serialized = type_info.dtype->from_string(it->value.GetString()); + } else if (type_info.dtype == boolean_type) { + serialized = type_info.dtype->from_json_object(Json::Value(it->value.GetBool()), cql_serialization_format::internal()); } else { - serialized = type_info.dtype->from_json_object(*it, cql_serialization_format::internal()); + //FIXME(sarna): Once we have type visitors, this double conversion hack should be replaced with parsing straight from rapidjson + serialized = type_info.dtype->from_json_object(Json::Value(rjson::print(it->value)), cql_serialization_format::internal()); } //NOTICE: redundant copy here, from_json_object should accept bytes' output iterator too. @@ -72,8 +79,8 @@ bytes serialize_item(const Json::Value& item) { return bytes{int8_t(type_info.atype)} + std::move(serialized); } -Json::Value deserialize_item(bytes_view bv) { - Json::Value deserialized; +rjson::value deserialize_item(bytes_view bv) { + rjson::value deserialized(rapidjson::kObjectType); if (bv.empty()) { throw api_error("ValidationException", "Serialized value empty"); } @@ -83,17 +90,18 @@ Json::Value deserialize_item(bytes_view bv) { if (atype == alternator_type::NOT_SUPPORTED_YET) { slogger.trace("Non-optimal deserialization of alternator type {}", int8_t(atype)); - return json::to_json_value(sstring(reinterpret_cast(bv.data()), bv.size())); + return rjson::parse_raw(reinterpret_cast(bv.data()), bv.size()); } type_representation type_representation = represent_type(atype); if (type_representation.dtype == bytes_type) { - deserialized[type_representation.ident] = base64_encode(bv); + std::string b64 = base64_encode(bv); + rjson::set_with_string_name(deserialized, type_representation.ident, rjson::from_string(b64)); } else if (type_representation.dtype == decimal_type) { auto s = decimal_type->to_json_string(bytes(bv)); //FIXME(sarna): unnecessary copy - deserialized[type_representation.ident] = Json::Value(reinterpret_cast(s.data()), reinterpret_cast(s.data()) + s.size()); + rjson::set_with_string_name(deserialized, type_representation.ident, rjson::from_string(s)); } else { - deserialized[type_representation.ident] = json::to_json_value(type_representation.dtype->to_json_string(bytes(bv))); //FIXME(sarna): unnecessary copy + rjson::set_with_string_name(deserialized, type_representation.ident, rjson::parse(type_representation.dtype->to_string(bytes(bv)))); } return deserialized; @@ -113,43 +121,41 @@ std::string type_to_string(data_type type) { return it->second; } -bytes get_key_column_value(const Json::Value& item, const column_definition& column) { +bytes get_key_column_value(const rjson::value& item, const column_definition& column) { std::string column_name = column.name_as_text(); std::string expected_type = type_to_string(column.type); - Json::Value key_typed_value = item.get(column_name, Json::nullValue); - if (!key_typed_value.isObject() || key_typed_value.size() != 1) { + const rjson::value& key_typed_value = rjson::get(item, rjson::value::StringRefType(column_name.c_str())); + if (!key_typed_value.IsObject() || key_typed_value.MemberCount() != 1) { throw api_error("ValidationException", - format("Missing or invalid value object for key column {}: {}", - column_name, item.toStyledString())); + format("Missing or invalid value object for key column {}: {}", column_name, item)); } - auto it = key_typed_value.begin(); - if (it.key().asString() != expected_type) { + auto it = key_typed_value.MemberBegin(); + if (it->name.GetString() != expected_type) { throw api_error("ValidationException", format("Expected type {} for key column {}, got type {}", - expected_type, column_name, it.key().asString())); + expected_type, column_name, it->name.GetString())); } if (column.type == bytes_type) { - return base64_decode(it->asString()); + return base64_decode(it->value.GetString()); } else { - return column.type->from_string(it->asString()); + return column.type->from_string(it->value.GetString()); } } -Json::Value json_key_column_value(bytes_view cell, const column_definition& column) { +rjson::value json_key_column_value(bytes_view cell, const column_definition& column) { if (column.type == bytes_type) { - return base64_encode(cell); + std::string b64 = base64_encode(cell); + return rjson::from_string(b64); } if (column.type == utf8_type) { - return Json::Value(reinterpret_cast(cell.data()), - reinterpret_cast(cell.data()) + cell.size()); + return rjson::from_string(std::string(reinterpret_cast(cell.data()), cell.size())); } else if (column.type == decimal_type) { // FIXME: use specialized Alternator number type, not the more // general "decimal_type". A dedicated type can be more efficient // in storage space and in parsing speed. auto s = decimal_type->to_json_string(bytes(cell)); - return Json::Value(reinterpret_cast(s.data()), - reinterpret_cast(s.data()) + s.size()); + return rjson::from_string(s); } else { // We shouldn't get here, we shouldn't see such key columns. throw std::runtime_error(format("Unexpected key type: {}", column.type->name())); @@ -157,7 +163,7 @@ Json::Value json_key_column_value(bytes_view cell, const column_definition& colu } -partition_key pk_from_json(const Json::Value& item, schema_ptr schema) { +partition_key pk_from_json(const rjson::value& item, schema_ptr schema) { std::vector raw_pk; // FIXME: this is a loop, but we really allow only one partition key column. for (const column_definition& cdef : schema->partition_key_columns()) { @@ -167,7 +173,7 @@ partition_key pk_from_json(const Json::Value& item, schema_ptr schema) { return partition_key::from_exploded(raw_pk); } -clustering_key ck_from_json(const Json::Value& item, schema_ptr schema) { +clustering_key ck_from_json(const rjson::value& item, schema_ptr schema) { if (schema->clustering_key_size() == 0) { return clustering_key::make_empty(); } diff --git a/alternator/serialization.hh b/alternator/serialization.hh index d385b9c4c2..f61f08eebc 100644 --- a/alternator/serialization.hh +++ b/alternator/serialization.hh @@ -14,7 +14,7 @@ #include "types.hh" #include "schema.hh" #include "keys.hh" -#include "json.hh" +#include "rjson.hh" namespace alternator { @@ -35,15 +35,15 @@ struct type_representation { type_info type_info_from_string(std::string type); type_representation represent_type(alternator_type atype); -bytes serialize_item(const Json::Value& item); -Json::Value deserialize_item(bytes_view bv); +bytes serialize_item(const rjson::value& item); +rjson::value deserialize_item(bytes_view bv); std::string type_to_string(data_type type); -bytes get_key_column_value(const Json::Value& item, const column_definition& column); -Json::Value json_key_column_value(bytes_view cell, const column_definition& column); +bytes get_key_column_value(const rjson::value& item, const column_definition& column); +rjson::value json_key_column_value(bytes_view cell, const column_definition& column); -partition_key pk_from_json(const Json::Value& item, schema_ptr schema); -clustering_key ck_from_json(const Json::Value& item, schema_ptr schema); +partition_key pk_from_json(const rjson::value& item, schema_ptr schema); +clustering_key ck_from_json(const rjson::value& item, schema_ptr schema); }