alternator: add checking for duplicate keys in batches

Batch writes and batch deletes do not allow multiple entries
for the same key. This patch implements checking for duplicated
entries and throws an error if applicable.
Message-Id: <450220ba74f26a0893430cb903e4749f978dfd31.1562325663.git.sarna@scylladb.com>
This commit is contained in:
Piotr Sarna
2019-07-05 13:21:29 +02:00
committed by Nadav Har'El
parent b810fa59c4
commit eb7ada8387

View File

@@ -354,6 +354,20 @@ static schema_ptr get_table_from_batch_request(const service::storage_proxy& pro
}
}
using primary_key = std::pair<partition_key, clustering_key>;
struct primary_key_hash {
schema_ptr _s;
size_t operator()(const primary_key& key) const {
return utils::hash_combine(partition_key::hashing(*_s)(key.first), clustering_key::hashing(*_s)(key.second));
}
};
struct primary_key_equal {
schema_ptr _s;
bool operator()(const primary_key& k1, const primary_key& k2) const {
return partition_key::equality(*_s)(k1.first, k2.first) && clustering_key::equality(*_s)(k1.second, k2.second);
}
};
future<json::json_return_type> executor::batch_write_item(std::string content) {
_stats.api_operations.batch_write_item++;
Json::Value batch_info = json::to_json_value(content);
@@ -361,8 +375,10 @@ future<json::json_return_type> executor::batch_write_item(std::string content) {
std::vector<mutation> mutations;
mutations.reserve(request_items.size());
for (auto it = request_items.begin(); it != request_items.end(); ++it) {
schema_ptr schema = get_table_from_batch_request(_proxy, it);
std::unordered_set<primary_key, primary_key_hash, primary_key_equal> used_keys(1, primary_key_hash{schema}, primary_key_equal{schema});
for (auto&& request : *it) {
if (!request.isObject() || request.size() != 1) {
throw api_error("ValidationException", format("Invalid BatchWriteItem request: {}", request.toStyledString()));
@@ -372,9 +388,21 @@ future<json::json_return_type> executor::batch_write_item(std::string content) {
const Json::Value& put_request = *r;
const Json::Value& item = put_request["Item"];
mutations.push_back(make_item_mutation(item, schema));
// make_item_mutation returns a mutation with a single clustering row
auto mut_key = std::make_pair(mutations.back().key(), mutations.back().partition().clustered_rows().begin()->key());
if (used_keys.count(mut_key) > 0) {
throw api_error("ValidationException", "Provided list of item keys contains duplicates");
}
used_keys.insert(std::move(mut_key));
} else if (r.key() == "DeleteRequest") {
const Json::Value& key = (*r)["Key"];
mutations.push_back(make_delete_item_mutation(key, schema));
// make_delete_item_mutation returns a mutation with a single clustering row
auto mut_key = std::make_pair(mutations.back().key(), mutations.back().partition().clustered_rows().begin()->key());
if (used_keys.count(mut_key) > 0) {
throw api_error("ValidationException", "Provided list of item keys contains duplicates");
}
used_keys.insert(std::move(mut_key));
} else {
throw api_error("ValidationException", format("Unknown BatchWriteItem request type: {}", r.key()));
}