mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-01 12:36:56 +00:00
alternator: use LWT timestamp - in BatchWriteItems too
A previous patch fixed Alternator's writes to use the timestamp provided by LWT instead of the current timestamp. That patch fixed the PutItem, DeleteItem and UpdateItem operations - and this patch fixes the remaining write operation: BatchWriteItems. So, Fixes #5653. Unfortunatly, the requirements of both BatchWriteItems and LWT make the resulting code - and this patch - somewhat inelegant. BatchWriteItems requires that we prepare all the operations first - failing if any of them has an error. Before this patch, the result of this preparation was an array of mutations, which in a second step we wrote to the database. But we can no longer use mutations for the result of the first step, because creating a mutation requires knowing the timestamp, which we don't know during the preparate phase - we will only know it during the later LWT operation. So now we need to invent a new intermediate format between the request and the mutation. This intermediate format is further complicated by the need to be send it between shards (for LWT's shard forwarding) so it cannot, for example, contain a reference to a schema. The fact that different sub-operations need to be sent to different shards, and that different sub-operations may write to different tables, further complicate the book-keeping and gives us a bunch of funky-typed maps. But eventually it all fits together. After this patch, as before this patch, the same code (now called put_or_delete_item), is used to implement both the PutItem and DeleteItem stand-alone operation, and the BachWriteItems operation which includes a whole list of these PutItem and DeleteItem operation. This patch also includes two more tests in test_batch.py, which test two more corner tests we haven't tested before: One tests the capability of BatchWriteItems to write to more than one table. The other tests that BatchWriteItems can write an empty item (it is not surprising that it does, but we do have special code for this case, so we should test it). Signed-off-by: Nadav Har'El <nyh@scylladb.com>
This commit is contained in:
@@ -196,6 +196,32 @@ def test_batch_write_invalid_operation(test_table_s):
|
||||
for p in [p1, p2]:
|
||||
assert not 'item' in test_table_s.get_item(Key={'p': p}, ConsistentRead=True)
|
||||
|
||||
# In test_item.py we have a bunch of test_empty_* tests on different ways to
|
||||
# create an empty item (which in Scylla requires the special CQL row marker
|
||||
# to be supported correctly). BatchWriteItems provides yet another way of
|
||||
# creating items, so check the empty case here too:
|
||||
def test_empty_batch_write(test_table):
|
||||
p = random_string()
|
||||
c = random_string()
|
||||
with test_table.batch_writer() as batch:
|
||||
batch.put_item({'p': p, 'c': c})
|
||||
assert test_table.get_item(Key={'p': p, 'c': c}, ConsistentRead=True)['Item'] == {'p': p, 'c': c}
|
||||
|
||||
# Test that BatchWriteItems allows writing to multiple tables in one operation
|
||||
def test_batch_write_multiple_tables(test_table_s, test_table):
|
||||
p1 = random_string()
|
||||
c1 = random_string()
|
||||
p2 = random_string()
|
||||
# We use the low-level batch_write_item API for lack of a more convenient
|
||||
# API (the batch_writer() API can only write to one table). At least it
|
||||
# spares us the need to encode the key's types...
|
||||
reply = test_table.meta.client.batch_write_item(RequestItems = {
|
||||
test_table.name: [{'PutRequest': {'Item': {'p': p1, 'c': c1, 'a': 'hi'}}}],
|
||||
test_table_s.name: [{'PutRequest': {'Item': {'p': p2, 'b': 'hello'}}}]
|
||||
})
|
||||
assert test_table.get_item(Key={'p': p1, 'c': c1}, ConsistentRead=True)['Item'] == {'p': p1, 'c': c1, 'a': 'hi'}
|
||||
assert test_table_s.get_item(Key={'p': p2}, ConsistentRead=True)['Item'] == {'p': p2, 'b': 'hello'}
|
||||
|
||||
# Basic test for BatchGetItem, reading several entire items.
|
||||
# Schema has both hash and sort keys.
|
||||
def test_batch_get_item(test_table):
|
||||
|
||||
@@ -881,28 +881,90 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
static mutation make_item_mutation(const rjson::value& item, schema_ptr schema, api::timestamp_type ts) {
|
||||
partition_key pk = pk_from_json(item, schema);
|
||||
clustering_key ck = ck_from_json(item, schema);
|
||||
// After calling pk_from_json() and ck_from_json() to extract the pk and ck
|
||||
// components of a key, and if that succeeded, call check_key() to further
|
||||
// check that the key doesn't have any spurious components.
|
||||
static void check_key(const rjson::value& key, const schema_ptr& schema) {
|
||||
if (key.MemberCount() != (schema->clustering_key_size() == 0 ? 1 : 2)) {
|
||||
throw api_error("ValidationException", "Given key attribute not in schema");
|
||||
}
|
||||
}
|
||||
|
||||
mutation m(schema, pk);
|
||||
attribute_collector attrs_collector;
|
||||
// The put_or_delete_item class builds the mutations needed by the PutItem and
|
||||
// DeleteItem operations - either as stand-alone commands or part of a list
|
||||
// of commands in BatchWriteItems.
|
||||
// put_or_delete_item splits each operation into two stages: Constructing the
|
||||
// object parses and validates the user input (throwing exceptions if there
|
||||
// are input errors). Later, build() generates the actual mutation, with a
|
||||
// specified timestamp. This split is needed because of the peculiar needs of
|
||||
// BatchWriteItems and LWT. BatchWriteItems needs all parsing to happen before
|
||||
// any writing happens (if one of the commands has an error, none of the
|
||||
// writes should be done). LWT makes it impossible for the parse step to
|
||||
// generate "mutation" objects, because the timestamp still isn't known.
|
||||
class put_or_delete_item {
|
||||
private:
|
||||
partition_key _pk;
|
||||
clustering_key _ck;
|
||||
struct cell {
|
||||
bytes column_name;
|
||||
bytes value;
|
||||
};
|
||||
// PutItem: engaged _cells, write these cells to item (_pk, _ck).
|
||||
// DeleteItem: disengaged _cells, delete the entire item (_pk, _ck).
|
||||
std::optional<std::vector<cell>> _cells;
|
||||
public:
|
||||
struct delete_item {};
|
||||
struct put_item {};
|
||||
put_or_delete_item(const rjson::value& key, schema_ptr schema, delete_item);
|
||||
put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item);
|
||||
// put_or_delete_item doesn't keep a reference to schema (so it can be
|
||||
// moved between shards for LWT) so it needs to be given again to build():
|
||||
mutation build(schema_ptr schema, api::timestamp_type ts);
|
||||
const partition_key& pk() const { return _pk; }
|
||||
const clustering_key& ck() const { return _ck; }
|
||||
};
|
||||
|
||||
auto& row = m.partition().clustered_row(*schema, ck);
|
||||
put_or_delete_item::put_or_delete_item(const rjson::value& key, schema_ptr schema, delete_item)
|
||||
: _pk(pk_from_json(key, schema)), _ck(ck_from_json(key, schema)) {
|
||||
check_key(key, schema);
|
||||
}
|
||||
|
||||
put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item)
|
||||
: _pk(pk_from_json(item, schema)), _ck(ck_from_json(item, schema)) {
|
||||
_cells = std::vector<cell>();
|
||||
_cells->reserve(item.MemberCount());
|
||||
for (auto it = item.MemberBegin(); it != item.MemberEnd(); ++it) {
|
||||
bytes column_name = to_bytes(it->name.GetString());
|
||||
const column_definition* cdef = schema->get_column_definition(column_name);
|
||||
if (!cdef) {
|
||||
bytes value = serialize_item(it->value);
|
||||
attrs_collector.put(std::move(column_name), std::move(value), ts);
|
||||
_cells->push_back({std::move(column_name), serialize_item(it->value)});
|
||||
} else if (!cdef->is_primary_key()) {
|
||||
// Explicitly defined regular columns can appear as a result of creating a global secondary index
|
||||
bytes column_value = get_key_from_typed_value(it->value, *cdef, type_to_string(cdef->type));
|
||||
row.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, ts, column_value));
|
||||
// Fixed-type regular column can be used for GSI key
|
||||
_cells->push_back({std::move(column_name),
|
||||
get_key_from_typed_value(it->value, *cdef, type_to_string(cdef->type))});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mutation put_or_delete_item::build(schema_ptr schema, api::timestamp_type ts) {
|
||||
mutation m(schema, _pk);
|
||||
auto& row = m.partition().clustered_row(*schema, _ck);
|
||||
if (!_cells) {
|
||||
// a DeleteItem operation:
|
||||
row.apply(tombstone(ts, gc_clock::now()));
|
||||
return m;
|
||||
}
|
||||
// else, a PutItem operation:
|
||||
attribute_collector attrs_collector;
|
||||
for (auto& c : *_cells) {
|
||||
const column_definition* cdef = schema->get_column_definition(c.column_name);
|
||||
if (!cdef) {
|
||||
attrs_collector.put(std::move(c.column_name), std::move(c.value), ts);
|
||||
} else {
|
||||
row.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, ts, std::move(c.value)));
|
||||
}
|
||||
}
|
||||
if (!attrs_collector.empty()) {
|
||||
auto serialized_map = attrs_collector.to_mut().serialize(*attrs_type());
|
||||
row.cells().apply(attrs_column(*schema), std::move(serialized_map));
|
||||
@@ -1176,14 +1238,14 @@ static void verify_all_are_used(const rjson::value& req, const char* field,
|
||||
|
||||
class put_item_operation : public rmw_operation {
|
||||
private:
|
||||
rjson::value& _item;
|
||||
put_or_delete_item _mutation_builder;
|
||||
public:
|
||||
parsed::condition_expression _condition_expression;
|
||||
put_item_operation(service::storage_proxy& proxy, rjson::value&& request)
|
||||
: rmw_operation(proxy, std::move(request))
|
||||
, _item(rjson::get(_request, "Item")) {
|
||||
_pk = pk_from_json(_item, schema());
|
||||
_ck = ck_from_json(_item, schema());
|
||||
, _mutation_builder(rjson::get(_request, "Item"), schema(), put_or_delete_item::put_item{}) {
|
||||
_pk = _mutation_builder.pk();
|
||||
_ck = _mutation_builder.ck();
|
||||
auto return_values = get_string_attribute(_request, "ReturnValues", "NONE");
|
||||
if (return_values != "NONE") {
|
||||
// FIXME: Need to support also the ALL_OLD option. See issue #5053.
|
||||
@@ -1207,7 +1269,7 @@ public:
|
||||
verify_all_are_used(_request, "ExpressionAttributeNames", used_attribute_names, "UpdateExpression");
|
||||
verify_all_are_used(_request, "ExpressionAttributeValues", used_attribute_values, "UpdateExpression");
|
||||
}
|
||||
return make_item_mutation(_item, schema(), ts);
|
||||
return _mutation_builder.build(_schema, ts);
|
||||
}
|
||||
virtual ~put_item_operation() = default;
|
||||
};
|
||||
@@ -1240,36 +1302,16 @@ future<executor::request_return_type> executor::put_item(client_state& client_st
|
||||
});
|
||||
}
|
||||
|
||||
// After calling pk_from_json() and ck_from_json() to extract the pk and ck
|
||||
// components of a key, and if that succeeded, call check_key() to further
|
||||
// check that the key doesn't have any spurious components.
|
||||
static void check_key(const rjson::value& key, const schema_ptr& schema) {
|
||||
if (key.MemberCount() != (schema->clustering_key_size() == 0 ? 1 : 2)) {
|
||||
throw api_error("ValidationException", "Given key attribute not in schema");
|
||||
}
|
||||
}
|
||||
|
||||
static mutation make_delete_item_mutation(const rjson::value& key, schema_ptr schema, api::timestamp_type ts) {
|
||||
partition_key pk = pk_from_json(key, schema);
|
||||
clustering_key ck = ck_from_json(key, schema);
|
||||
check_key(key, schema);
|
||||
mutation m(schema, pk);
|
||||
auto& row = m.partition().clustered_row(*schema, ck);
|
||||
row.apply(tombstone(ts, gc_clock::now()));
|
||||
return m;
|
||||
}
|
||||
|
||||
class delete_item_operation : public rmw_operation {
|
||||
private:
|
||||
rjson::value& _key;
|
||||
put_or_delete_item _mutation_builder;
|
||||
public:
|
||||
parsed::condition_expression _condition_expression;
|
||||
delete_item_operation(service::storage_proxy& proxy, rjson::value&& request)
|
||||
: rmw_operation(proxy, std::move(request))
|
||||
, _key(rjson::get(_request, "Key")) {
|
||||
_pk = pk_from_json(_key, schema());
|
||||
_ck = ck_from_json(_key, schema());
|
||||
check_key(_key, schema());
|
||||
, _mutation_builder(rjson::get(_request, "Key"), schema(), put_or_delete_item::delete_item{}) {
|
||||
_pk = _mutation_builder.pk();
|
||||
_ck = _mutation_builder.ck();
|
||||
auto return_values = get_string_attribute(_request, "ReturnValues", "NONE");
|
||||
if (return_values != "NONE") {
|
||||
// FIXME: Need to support also the ALL_OLD option. See issue #5053.
|
||||
@@ -1293,7 +1335,7 @@ public:
|
||||
verify_all_are_used(_request, "ExpressionAttributeNames", used_attribute_names, "UpdateExpression");
|
||||
verify_all_are_used(_request, "ExpressionAttributeValues", used_attribute_values, "UpdateExpression");
|
||||
}
|
||||
return make_delete_item_mutation(_key, schema(), ts);
|
||||
return _mutation_builder.build(_schema, ts);
|
||||
}
|
||||
virtual ~delete_item_operation() = default;
|
||||
};
|
||||
@@ -1350,55 +1392,68 @@ struct primary_key_equal {
|
||||
}
|
||||
};
|
||||
|
||||
struct decorated_key_hash {
|
||||
size_t operator()(const dht::decorated_key& k) const {
|
||||
return std::hash<dht::token>()(k.token());
|
||||
}
|
||||
};
|
||||
struct decorated_key_equal {
|
||||
schema_ptr _s;
|
||||
bool operator()(const dht::decorated_key& k1, const dht::decorated_key& k2) const {
|
||||
return k1.equal(*_s, k2);
|
||||
}
|
||||
};
|
||||
|
||||
// This is a cas_request subclass for applying a given mutation to one
|
||||
// partition using LWT. This is a write-only operation, not needing the
|
||||
// previous value of the item (the mutation to be done is known prior
|
||||
// to starting the operation). Nevertheless, we want to do this mutation
|
||||
// via LWT to ensure that it is serialized with other LWT mutations to
|
||||
// the same partition.
|
||||
class mutation_cas_request : public service::cas_request {
|
||||
mutation _mutation;
|
||||
// This is a cas_request subclass for applying given put_or_delete_items to
|
||||
// one partition using LWT as part as BatchWriteItems. This is a write-only
|
||||
// operation, not needing the previous value of the item (the mutation to be
|
||||
// done is known prior to starting the operation). Nevertheless, we want to
|
||||
// do this mutation via LWT to ensure that it is serialized with other LWT
|
||||
// mutations to the same partition.
|
||||
class put_or_delete_item_cas_request : public service::cas_request {
|
||||
schema_ptr schema;
|
||||
std::vector<put_or_delete_item> _mutation_builders;
|
||||
public:
|
||||
mutation_cas_request(mutation&& m) : _mutation(std::move(m)) { }
|
||||
virtual ~mutation_cas_request() = default;
|
||||
put_or_delete_item_cas_request(schema_ptr s, std::vector<put_or_delete_item>&& b) :
|
||||
schema(std::move(s)), _mutation_builders(std::move(b)) { }
|
||||
virtual ~put_or_delete_item_cas_request() = default;
|
||||
virtual std::optional<mutation> apply(query::result& qr, const query::partition_slice& slice, api::timestamp_type ts) override {
|
||||
// This function is only called once so we can move _mutation.
|
||||
return std::move(_mutation);
|
||||
std::optional<mutation> ret;
|
||||
for (put_or_delete_item& mutation_builder : _mutation_builders) {
|
||||
// We assume all these builders have the same partition.
|
||||
if (ret) {
|
||||
ret->apply(mutation_builder.build(schema, ts));
|
||||
} else {
|
||||
ret = mutation_builder.build(schema, ts);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
const dht::decorated_key& decorated_key() const { return _mutation.decorated_key(); }
|
||||
};
|
||||
|
||||
static future<> cas_write(service::storage_proxy& proxy, schema_ptr schema, mutation&& m, service::client_state& client_state) {
|
||||
static future<> cas_write(service::storage_proxy& proxy, schema_ptr schema, dht::decorated_key dk, std::vector<put_or_delete_item>&& mutation_builders, service::client_state& client_state) {
|
||||
auto timeout = default_timeout();
|
||||
auto read_command = read_nothing_read_command(schema);
|
||||
auto op = make_shared<mutation_cas_request>(std::move(m));
|
||||
return proxy.cas(schema, op, read_command, to_partition_ranges(op->decorated_key()),
|
||||
auto op = seastar::make_shared<put_or_delete_item_cas_request>(schema, std::move(mutation_builders));
|
||||
return proxy.cas(schema, op, read_command, to_partition_ranges(dk),
|
||||
{timeout, empty_service_permit(), client_state, client_state.get_trace_state()},
|
||||
db::consistency_level::LOCAL_SERIAL, db::consistency_level::LOCAL_QUORUM,
|
||||
timeout, timeout).discard_result();
|
||||
// We discarded cas()'s future value ("is_applied") because it's always
|
||||
// true, as mutation_cas_request::apply() always returns a mutation.
|
||||
// We discarded cas()'s future value ("is_applied") because BatchWriteItems
|
||||
// does not need to support conditional updates.
|
||||
}
|
||||
|
||||
|
||||
struct schema_decorated_key {
|
||||
schema_ptr schema;
|
||||
dht::decorated_key dk;
|
||||
};
|
||||
struct schema_decorated_key_hash {
|
||||
size_t operator()(const schema_decorated_key& k) const {
|
||||
return std::hash<dht::token>()(k.dk.token());
|
||||
}
|
||||
};
|
||||
struct schema_decorated_key_equal {
|
||||
bool operator()(const schema_decorated_key& k1, const schema_decorated_key& k2) const {
|
||||
return k1.schema == k2.schema && k1.dk.equal(*k1.schema, k2.dk);
|
||||
}
|
||||
};
|
||||
|
||||
// FIXME: if we failed writing some of the mutations, need to return a list
|
||||
// of these failed mutations rather than fail the whole write (issue #5650).
|
||||
static future<> do_batch_write(service::storage_proxy& proxy,
|
||||
std::vector<mutation> mutations,
|
||||
std::vector<std::pair<schema_ptr, put_or_delete_item>> mutation_builders,
|
||||
service::client_state& client_state,
|
||||
stats& stats) {
|
||||
if (mutations.empty()) {
|
||||
if (mutation_builders.empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
// FIXME: Currently, the write isolation option is a constant chosen
|
||||
@@ -1406,6 +1461,12 @@ static future<> do_batch_write(service::storage_proxy& proxy,
|
||||
const rmw_operation::write_isolation write_isolation = rmw_operation::default_write_isolation;
|
||||
if (write_isolation != rmw_operation::write_isolation::LWT_ALWAYS) {
|
||||
// Do a normal write, without LWT:
|
||||
std::vector<mutation> mutations;
|
||||
mutations.reserve(mutation_builders.size());
|
||||
api::timestamp_type now = api::new_timestamp();
|
||||
for (auto& b : mutation_builders) {
|
||||
mutations.push_back(b.second.build(b.first, now));
|
||||
}
|
||||
return proxy.mutate(std::move(mutations),
|
||||
db::consistency_level::LOCAL_QUORUM,
|
||||
default_timeout(),
|
||||
@@ -1416,23 +1477,22 @@ static future<> do_batch_write(service::storage_proxy& proxy,
|
||||
// Multiple mutations may be destined for the same partition, adding
|
||||
// or deleting different items of one partition. Join them together
|
||||
// because we can do them in one cas() call.
|
||||
schema_ptr schema = mutations.front().schema();
|
||||
std::unordered_map<dht::decorated_key, mutation, decorated_key_hash, decorated_key_equal>
|
||||
key_mutations(1, decorated_key_hash{}, decorated_key_equal{schema});
|
||||
for (auto& m : mutations) {
|
||||
auto it = key_mutations.find(m.decorated_key());
|
||||
if (it == key_mutations.end()) {
|
||||
key_mutations.emplace(m.decorated_key(), m);
|
||||
std::unordered_map<schema_decorated_key, std::vector<put_or_delete_item>, schema_decorated_key_hash, schema_decorated_key_equal>
|
||||
key_builders(1, schema_decorated_key_hash{}, schema_decorated_key_equal{});
|
||||
for (auto& b : mutation_builders) {
|
||||
auto dk = dht::global_partitioner().decorate_key(*b.first, b.second.pk());
|
||||
auto it = key_builders.find({b.first, dk});
|
||||
if (it == key_builders.end()) {
|
||||
key_builders.emplace(schema_decorated_key{b.first, dk}, std::vector<put_or_delete_item>{std::move(b.second)});
|
||||
} else {
|
||||
it->second.apply(m);
|
||||
it->second.push_back(std::move(b.second));
|
||||
}
|
||||
}
|
||||
return parallel_for_each(std::move(key_mutations), [&proxy, &client_state, &stats] (auto& key_mutation) {
|
||||
return parallel_for_each(std::move(key_builders), [&proxy, &client_state, &stats] (auto& e) {
|
||||
stats.write_using_lwt++;
|
||||
auto schema = key_mutation.second.schema();
|
||||
auto desired_shard = service::storage_proxy::cas_shard(key_mutation.second.decorated_key().token());
|
||||
auto desired_shard = service::storage_proxy::cas_shard(e.first.dk.token());
|
||||
if (desired_shard == engine().cpu_id()) {
|
||||
return cas_write(proxy, schema, std::move(key_mutation.second), client_state);
|
||||
return cas_write(proxy, e.first.schema, e.first.dk, std::move(e.second), client_state);
|
||||
} else {
|
||||
stats.shard_bounce_for_lwt++;
|
||||
// FIXME: create separate smp_service_group
|
||||
@@ -1441,15 +1501,17 @@ static future<> do_batch_write(service::storage_proxy& proxy,
|
||||
// use proxy.container().
|
||||
return service::get_storage_proxy().invoke_on(desired_shard, default_smp_service_group(),
|
||||
[cs = client_state.move_to_other_shard(),
|
||||
fm = frozen_mutation(key_mutation.second),
|
||||
ks = schema->ks_name(), cf = schema->cf_name()]
|
||||
mb = e.second,
|
||||
dk = e.first.dk,
|
||||
ks = e.first.schema->ks_name(),
|
||||
cf = e.first.schema->cf_name()]
|
||||
(service::storage_proxy& proxy) mutable {
|
||||
return do_with(cs.get(), [&proxy, fm = std::move(fm), ks = std::move(ks), cf = std::move(cf)]
|
||||
(service::client_state& client_state) {
|
||||
auto schema = proxy.get_db().local().find_schema(ks, cf);
|
||||
return cas_write(proxy, schema, fm.unfreeze(schema), client_state);
|
||||
});
|
||||
return do_with(cs.get(), [&proxy, mb = std::move(mb), dk = std::move(dk), ks = std::move(ks), cf = std::move(cf)]
|
||||
(service::client_state& client_state) mutable {
|
||||
auto schema = proxy.get_db().local().find_schema(ks, cf);
|
||||
return cas_write(proxy, schema, dk, std::move(mb), client_state);
|
||||
});
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -1460,13 +1522,14 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
rjson::value batch_info = rjson::parse(content);
|
||||
rjson::value& request_items = batch_info["RequestItems"];
|
||||
|
||||
std::vector<mutation> mutations;
|
||||
mutations.reserve(request_items.MemberCount());
|
||||
std::vector<std::pair<schema_ptr, put_or_delete_item>> mutation_builders;
|
||||
mutation_builders.reserve(request_items.MemberCount());
|
||||
|
||||
for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) {
|
||||
schema_ptr schema = get_table_from_batch_request(_proxy, it);
|
||||
tracing::add_table_name(client_state.get_trace_state(), schema->ks_name(), schema->cf_name());
|
||||
std::unordered_set<primary_key, primary_key_hash, primary_key_equal> used_keys(1, primary_key_hash{schema}, primary_key_equal{schema});
|
||||
std::unordered_set<primary_key, primary_key_hash, primary_key_equal> used_keys(
|
||||
1, primary_key_hash{schema}, primary_key_equal{schema});
|
||||
for (auto& request : it->value.GetArray()) {
|
||||
if (!request.IsObject() || request.MemberCount() != 1) {
|
||||
return make_ready_future<request_return_type>(api_error("ValidationException", format("Invalid BatchWriteItem request: {}", request)));
|
||||
@@ -1476,24 +1539,19 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
if (r_name == "PutRequest") {
|
||||
const rjson::value& put_request = r->value;
|
||||
const rjson::value& item = put_request["Item"];
|
||||
// FIXME: Because of issue #5653, we must not call
|
||||
// make_item_mutation() here with the current timestamp!
|
||||
// Rather, instead of collecting mutations, we should collect
|
||||
// the operations themselves, and mutation_cas_request will
|
||||
// call make_item_mutation() or make_delete_item_mutation() in
|
||||
// its apply(), with the right timestamp.
|
||||
mutations.push_back(make_item_mutation(item, schema, api::new_timestamp()));
|
||||
// make_item_mutation returns a mutation with a single clustering row
|
||||
auto mut_key = std::make_pair(mutations.back().key(), mutations.back().partition().clustered_rows().begin()->key());
|
||||
mutation_builders.emplace_back(schema, put_or_delete_item(
|
||||
item, schema, put_or_delete_item::put_item{}));
|
||||
auto mut_key = std::make_pair(mutation_builders.back().second.pk(), mutation_builders.back().second.ck());
|
||||
if (used_keys.count(mut_key) > 0) {
|
||||
return make_ready_future<request_return_type>(api_error("ValidationException", "Provided list of item keys contains duplicates"));
|
||||
}
|
||||
used_keys.insert(std::move(mut_key));
|
||||
} else if (r_name == "DeleteRequest") {
|
||||
const rjson::value& key = (r->value)["Key"];
|
||||
mutations.push_back(make_delete_item_mutation(key, schema, api::new_timestamp()));
|
||||
// make_delete_item_mutation returns a mutation with a single clustering row
|
||||
auto mut_key = std::make_pair(mutations.back().key(), mutations.back().partition().clustered_rows().begin()->key());
|
||||
mutation_builders.emplace_back(schema, put_or_delete_item(
|
||||
key, schema, put_or_delete_item::delete_item{}));
|
||||
auto mut_key = std::make_pair(mutation_builders.back().second.pk(),
|
||||
mutation_builders.back().second.ck());
|
||||
if (used_keys.count(mut_key) > 0) {
|
||||
return make_ready_future<request_return_type>(api_error("ValidationException", "Provided list of item keys contains duplicates"));
|
||||
}
|
||||
@@ -1504,7 +1562,7 @@ future<executor::request_return_type> executor::batch_write_item(client_state& c
|
||||
}
|
||||
}
|
||||
|
||||
return do_batch_write(_proxy, std::move(mutations), client_state, _stats).then([] () {
|
||||
return do_batch_write(_proxy, std::move(mutation_builders), client_state, _stats).then([] () {
|
||||
// FIXME: Issue #5650: If we failed writing some of the updates,
|
||||
// need to return a list of these failed updates in UnprocessedItems
|
||||
// rather than fail the whole write (issue #5650).
|
||||
|
||||
Reference in New Issue
Block a user