diff --git a/alternator/executor.cc b/alternator/executor.cc index f0c075424d..3afc5253d1 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -354,6 +354,20 @@ static schema_ptr get_table_from_batch_request(const service::storage_proxy& pro } } +using primary_key = std::pair; +struct primary_key_hash { + schema_ptr _s; + size_t operator()(const primary_key& key) const { + return utils::hash_combine(partition_key::hashing(*_s)(key.first), clustering_key::hashing(*_s)(key.second)); + } +}; +struct primary_key_equal { + schema_ptr _s; + bool operator()(const primary_key& k1, const primary_key& k2) const { + return partition_key::equality(*_s)(k1.first, k2.first) && clustering_key::equality(*_s)(k1.second, k2.second); + } +}; + future executor::batch_write_item(std::string content) { _stats.api_operations.batch_write_item++; Json::Value batch_info = json::to_json_value(content); @@ -361,8 +375,10 @@ future executor::batch_write_item(std::string content) { std::vector mutations; mutations.reserve(request_items.size()); + for (auto it = request_items.begin(); it != request_items.end(); ++it) { schema_ptr schema = get_table_from_batch_request(_proxy, it); + std::unordered_set used_keys(1, primary_key_hash{schema}, primary_key_equal{schema}); for (auto&& request : *it) { if (!request.isObject() || request.size() != 1) { throw api_error("ValidationException", format("Invalid BatchWriteItem request: {}", request.toStyledString())); @@ -372,9 +388,21 @@ future executor::batch_write_item(std::string content) { const Json::Value& put_request = *r; const Json::Value& item = put_request["Item"]; mutations.push_back(make_item_mutation(item, schema)); + // make_item_mutation returns a mutation with a single clustering row + auto mut_key = std::make_pair(mutations.back().key(), mutations.back().partition().clustered_rows().begin()->key()); + if (used_keys.count(mut_key) > 0) { + throw api_error("ValidationException", "Provided list of item keys contains duplicates"); + } + used_keys.insert(std::move(mut_key)); } else if (r.key() == "DeleteRequest") { const Json::Value& key = (*r)["Key"]; mutations.push_back(make_delete_item_mutation(key, schema)); + // make_delete_item_mutation returns a mutation with a single clustering row + auto mut_key = std::make_pair(mutations.back().key(), mutations.back().partition().clustered_rows().begin()->key()); + if (used_keys.count(mut_key) > 0) { + throw api_error("ValidationException", "Provided list of item keys contains duplicates"); + } + used_keys.insert(std::move(mut_key)); } else { throw api_error("ValidationException", format("Unknown BatchWriteItem request type: {}", r.key())); }