diff --git a/alternator-test/test_batch.py b/alternator-test/test_batch.py index 2a0df570a5..1568242c87 100644 --- a/alternator-test/test_batch.py +++ b/alternator-test/test_batch.py @@ -196,6 +196,32 @@ def test_batch_write_invalid_operation(test_table_s): for p in [p1, p2]: assert not 'item' in test_table_s.get_item(Key={'p': p}, ConsistentRead=True) +# In test_item.py we have a bunch of test_empty_* tests on different ways to +# create an empty item (which in Scylla requires the special CQL row marker +# to be supported correctly). BatchWriteItems provides yet another way of +# creating items, so check the empty case here too: +def test_empty_batch_write(test_table): + p = random_string() + c = random_string() + with test_table.batch_writer() as batch: + batch.put_item({'p': p, 'c': c}) + assert test_table.get_item(Key={'p': p, 'c': c}, ConsistentRead=True)['Item'] == {'p': p, 'c': c} + +# Test that BatchWriteItems allows writing to multiple tables in one operation +def test_batch_write_multiple_tables(test_table_s, test_table): + p1 = random_string() + c1 = random_string() + p2 = random_string() + # We use the low-level batch_write_item API for lack of a more convenient + # API (the batch_writer() API can only write to one table). At least it + # spares us the need to encode the key's types... + reply = test_table.meta.client.batch_write_item(RequestItems = { + test_table.name: [{'PutRequest': {'Item': {'p': p1, 'c': c1, 'a': 'hi'}}}], + test_table_s.name: [{'PutRequest': {'Item': {'p': p2, 'b': 'hello'}}}] + }) + assert test_table.get_item(Key={'p': p1, 'c': c1}, ConsistentRead=True)['Item'] == {'p': p1, 'c': c1, 'a': 'hi'} + assert test_table_s.get_item(Key={'p': p2}, ConsistentRead=True)['Item'] == {'p': p2, 'b': 'hello'} + # Basic test for BatchGetItem, reading several entire items. # Schema has both hash and sort keys. def test_batch_get_item(test_table): diff --git a/alternator/executor.cc b/alternator/executor.cc index 3773f6af1f..89f2066393 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -881,28 +881,90 @@ public: } }; -static mutation make_item_mutation(const rjson::value& item, schema_ptr schema, api::timestamp_type ts) { - partition_key pk = pk_from_json(item, schema); - clustering_key ck = ck_from_json(item, schema); +// After calling pk_from_json() and ck_from_json() to extract the pk and ck +// components of a key, and if that succeeded, call check_key() to further +// check that the key doesn't have any spurious components. +static void check_key(const rjson::value& key, const schema_ptr& schema) { + if (key.MemberCount() != (schema->clustering_key_size() == 0 ? 1 : 2)) { + throw api_error("ValidationException", "Given key attribute not in schema"); + } +} - mutation m(schema, pk); - attribute_collector attrs_collector; +// The put_or_delete_item class builds the mutations needed by the PutItem and +// DeleteItem operations - either as stand-alone commands or part of a list +// of commands in BatchWriteItems. +// put_or_delete_item splits each operation into two stages: Constructing the +// object parses and validates the user input (throwing exceptions if there +// are input errors). Later, build() generates the actual mutation, with a +// specified timestamp. This split is needed because of the peculiar needs of +// BatchWriteItems and LWT. BatchWriteItems needs all parsing to happen before +// any writing happens (if one of the commands has an error, none of the +// writes should be done). LWT makes it impossible for the parse step to +// generate "mutation" objects, because the timestamp still isn't known. +class put_or_delete_item { +private: + partition_key _pk; + clustering_key _ck; + struct cell { + bytes column_name; + bytes value; + }; + // PutItem: engaged _cells, write these cells to item (_pk, _ck). + // DeleteItem: disengaged _cells, delete the entire item (_pk, _ck). + std::optional> _cells; +public: + struct delete_item {}; + struct put_item {}; + put_or_delete_item(const rjson::value& key, schema_ptr schema, delete_item); + put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item); + // put_or_delete_item doesn't keep a reference to schema (so it can be + // moved between shards for LWT) so it needs to be given again to build(): + mutation build(schema_ptr schema, api::timestamp_type ts); + const partition_key& pk() const { return _pk; } + const clustering_key& ck() const { return _ck; } +}; - auto& row = m.partition().clustered_row(*schema, ck); +put_or_delete_item::put_or_delete_item(const rjson::value& key, schema_ptr schema, delete_item) + : _pk(pk_from_json(key, schema)), _ck(ck_from_json(key, schema)) { + check_key(key, schema); +} +put_or_delete_item::put_or_delete_item(const rjson::value& item, schema_ptr schema, put_item) + : _pk(pk_from_json(item, schema)), _ck(ck_from_json(item, schema)) { + _cells = std::vector(); + _cells->reserve(item.MemberCount()); for (auto it = item.MemberBegin(); it != item.MemberEnd(); ++it) { bytes column_name = to_bytes(it->name.GetString()); const column_definition* cdef = schema->get_column_definition(column_name); if (!cdef) { bytes value = serialize_item(it->value); - attrs_collector.put(std::move(column_name), std::move(value), ts); + _cells->push_back({std::move(column_name), serialize_item(it->value)}); } else if (!cdef->is_primary_key()) { - // Explicitly defined regular columns can appear as a result of creating a global secondary index - bytes column_value = get_key_from_typed_value(it->value, *cdef, type_to_string(cdef->type)); - row.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, ts, column_value)); + // Fixed-type regular column can be used for GSI key + _cells->push_back({std::move(column_name), + get_key_from_typed_value(it->value, *cdef, type_to_string(cdef->type))}); } } +} +mutation put_or_delete_item::build(schema_ptr schema, api::timestamp_type ts) { + mutation m(schema, _pk); + auto& row = m.partition().clustered_row(*schema, _ck); + if (!_cells) { + // a DeleteItem operation: + row.apply(tombstone(ts, gc_clock::now())); + return m; + } + // else, a PutItem operation: + attribute_collector attrs_collector; + for (auto& c : *_cells) { + const column_definition* cdef = schema->get_column_definition(c.column_name); + if (!cdef) { + attrs_collector.put(std::move(c.column_name), std::move(c.value), ts); + } else { + row.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, ts, std::move(c.value))); + } + } if (!attrs_collector.empty()) { auto serialized_map = attrs_collector.to_mut().serialize(*attrs_type()); row.cells().apply(attrs_column(*schema), std::move(serialized_map)); @@ -1176,14 +1238,14 @@ static void verify_all_are_used(const rjson::value& req, const char* field, class put_item_operation : public rmw_operation { private: - rjson::value& _item; + put_or_delete_item _mutation_builder; public: parsed::condition_expression _condition_expression; put_item_operation(service::storage_proxy& proxy, rjson::value&& request) : rmw_operation(proxy, std::move(request)) - , _item(rjson::get(_request, "Item")) { - _pk = pk_from_json(_item, schema()); - _ck = ck_from_json(_item, schema()); + , _mutation_builder(rjson::get(_request, "Item"), schema(), put_or_delete_item::put_item{}) { + _pk = _mutation_builder.pk(); + _ck = _mutation_builder.ck(); auto return_values = get_string_attribute(_request, "ReturnValues", "NONE"); if (return_values != "NONE") { // FIXME: Need to support also the ALL_OLD option. See issue #5053. @@ -1207,7 +1269,7 @@ public: verify_all_are_used(_request, "ExpressionAttributeNames", used_attribute_names, "UpdateExpression"); verify_all_are_used(_request, "ExpressionAttributeValues", used_attribute_values, "UpdateExpression"); } - return make_item_mutation(_item, schema(), ts); + return _mutation_builder.build(_schema, ts); } virtual ~put_item_operation() = default; }; @@ -1240,36 +1302,16 @@ future executor::put_item(client_state& client_st }); } -// After calling pk_from_json() and ck_from_json() to extract the pk and ck -// components of a key, and if that succeeded, call check_key() to further -// check that the key doesn't have any spurious components. -static void check_key(const rjson::value& key, const schema_ptr& schema) { - if (key.MemberCount() != (schema->clustering_key_size() == 0 ? 1 : 2)) { - throw api_error("ValidationException", "Given key attribute not in schema"); - } -} - -static mutation make_delete_item_mutation(const rjson::value& key, schema_ptr schema, api::timestamp_type ts) { - partition_key pk = pk_from_json(key, schema); - clustering_key ck = ck_from_json(key, schema); - check_key(key, schema); - mutation m(schema, pk); - auto& row = m.partition().clustered_row(*schema, ck); - row.apply(tombstone(ts, gc_clock::now())); - return m; -} - class delete_item_operation : public rmw_operation { private: - rjson::value& _key; + put_or_delete_item _mutation_builder; public: parsed::condition_expression _condition_expression; delete_item_operation(service::storage_proxy& proxy, rjson::value&& request) : rmw_operation(proxy, std::move(request)) - , _key(rjson::get(_request, "Key")) { - _pk = pk_from_json(_key, schema()); - _ck = ck_from_json(_key, schema()); - check_key(_key, schema()); + , _mutation_builder(rjson::get(_request, "Key"), schema(), put_or_delete_item::delete_item{}) { + _pk = _mutation_builder.pk(); + _ck = _mutation_builder.ck(); auto return_values = get_string_attribute(_request, "ReturnValues", "NONE"); if (return_values != "NONE") { // FIXME: Need to support also the ALL_OLD option. See issue #5053. @@ -1293,7 +1335,7 @@ public: verify_all_are_used(_request, "ExpressionAttributeNames", used_attribute_names, "UpdateExpression"); verify_all_are_used(_request, "ExpressionAttributeValues", used_attribute_values, "UpdateExpression"); } - return make_delete_item_mutation(_key, schema(), ts); + return _mutation_builder.build(_schema, ts); } virtual ~delete_item_operation() = default; }; @@ -1350,55 +1392,68 @@ struct primary_key_equal { } }; -struct decorated_key_hash { - size_t operator()(const dht::decorated_key& k) const { - return std::hash()(k.token()); - } -}; -struct decorated_key_equal { - schema_ptr _s; - bool operator()(const dht::decorated_key& k1, const dht::decorated_key& k2) const { - return k1.equal(*_s, k2); - } -}; - -// This is a cas_request subclass for applying a given mutation to one -// partition using LWT. This is a write-only operation, not needing the -// previous value of the item (the mutation to be done is known prior -// to starting the operation). Nevertheless, we want to do this mutation -// via LWT to ensure that it is serialized with other LWT mutations to -// the same partition. -class mutation_cas_request : public service::cas_request { - mutation _mutation; +// This is a cas_request subclass for applying given put_or_delete_items to +// one partition using LWT as part as BatchWriteItems. This is a write-only +// operation, not needing the previous value of the item (the mutation to be +// done is known prior to starting the operation). Nevertheless, we want to +// do this mutation via LWT to ensure that it is serialized with other LWT +// mutations to the same partition. +class put_or_delete_item_cas_request : public service::cas_request { + schema_ptr schema; + std::vector _mutation_builders; public: - mutation_cas_request(mutation&& m) : _mutation(std::move(m)) { } - virtual ~mutation_cas_request() = default; + put_or_delete_item_cas_request(schema_ptr s, std::vector&& b) : + schema(std::move(s)), _mutation_builders(std::move(b)) { } + virtual ~put_or_delete_item_cas_request() = default; virtual std::optional apply(query::result& qr, const query::partition_slice& slice, api::timestamp_type ts) override { - // This function is only called once so we can move _mutation. - return std::move(_mutation); + std::optional ret; + for (put_or_delete_item& mutation_builder : _mutation_builders) { + // We assume all these builders have the same partition. + if (ret) { + ret->apply(mutation_builder.build(schema, ts)); + } else { + ret = mutation_builder.build(schema, ts); + } + } + return ret; } - const dht::decorated_key& decorated_key() const { return _mutation.decorated_key(); } }; -static future<> cas_write(service::storage_proxy& proxy, schema_ptr schema, mutation&& m, service::client_state& client_state) { +static future<> cas_write(service::storage_proxy& proxy, schema_ptr schema, dht::decorated_key dk, std::vector&& mutation_builders, service::client_state& client_state) { auto timeout = default_timeout(); auto read_command = read_nothing_read_command(schema); - auto op = make_shared(std::move(m)); - return proxy.cas(schema, op, read_command, to_partition_ranges(op->decorated_key()), + auto op = seastar::make_shared(schema, std::move(mutation_builders)); + return proxy.cas(schema, op, read_command, to_partition_ranges(dk), {timeout, empty_service_permit(), client_state, client_state.get_trace_state()}, db::consistency_level::LOCAL_SERIAL, db::consistency_level::LOCAL_QUORUM, timeout, timeout).discard_result(); - // We discarded cas()'s future value ("is_applied") because it's always - // true, as mutation_cas_request::apply() always returns a mutation. + // We discarded cas()'s future value ("is_applied") because BatchWriteItems + // does not need to support conditional updates. } + +struct schema_decorated_key { + schema_ptr schema; + dht::decorated_key dk; +}; +struct schema_decorated_key_hash { + size_t operator()(const schema_decorated_key& k) const { + return std::hash()(k.dk.token()); + } +}; +struct schema_decorated_key_equal { + bool operator()(const schema_decorated_key& k1, const schema_decorated_key& k2) const { + return k1.schema == k2.schema && k1.dk.equal(*k1.schema, k2.dk); + } +}; + // FIXME: if we failed writing some of the mutations, need to return a list // of these failed mutations rather than fail the whole write (issue #5650). static future<> do_batch_write(service::storage_proxy& proxy, - std::vector mutations, + std::vector> mutation_builders, service::client_state& client_state, stats& stats) { - if (mutations.empty()) { + if (mutation_builders.empty()) { return make_ready_future<>(); } // FIXME: Currently, the write isolation option is a constant chosen @@ -1406,6 +1461,12 @@ static future<> do_batch_write(service::storage_proxy& proxy, const rmw_operation::write_isolation write_isolation = rmw_operation::default_write_isolation; if (write_isolation != rmw_operation::write_isolation::LWT_ALWAYS) { // Do a normal write, without LWT: + std::vector mutations; + mutations.reserve(mutation_builders.size()); + api::timestamp_type now = api::new_timestamp(); + for (auto& b : mutation_builders) { + mutations.push_back(b.second.build(b.first, now)); + } return proxy.mutate(std::move(mutations), db::consistency_level::LOCAL_QUORUM, default_timeout(), @@ -1416,23 +1477,22 @@ static future<> do_batch_write(service::storage_proxy& proxy, // Multiple mutations may be destined for the same partition, adding // or deleting different items of one partition. Join them together // because we can do them in one cas() call. - schema_ptr schema = mutations.front().schema(); - std::unordered_map - key_mutations(1, decorated_key_hash{}, decorated_key_equal{schema}); - for (auto& m : mutations) { - auto it = key_mutations.find(m.decorated_key()); - if (it == key_mutations.end()) { - key_mutations.emplace(m.decorated_key(), m); + std::unordered_map, schema_decorated_key_hash, schema_decorated_key_equal> + key_builders(1, schema_decorated_key_hash{}, schema_decorated_key_equal{}); + for (auto& b : mutation_builders) { + auto dk = dht::global_partitioner().decorate_key(*b.first, b.second.pk()); + auto it = key_builders.find({b.first, dk}); + if (it == key_builders.end()) { + key_builders.emplace(schema_decorated_key{b.first, dk}, std::vector{std::move(b.second)}); } else { - it->second.apply(m); + it->second.push_back(std::move(b.second)); } } - return parallel_for_each(std::move(key_mutations), [&proxy, &client_state, &stats] (auto& key_mutation) { + return parallel_for_each(std::move(key_builders), [&proxy, &client_state, &stats] (auto& e) { stats.write_using_lwt++; - auto schema = key_mutation.second.schema(); - auto desired_shard = service::storage_proxy::cas_shard(key_mutation.second.decorated_key().token()); + auto desired_shard = service::storage_proxy::cas_shard(e.first.dk.token()); if (desired_shard == engine().cpu_id()) { - return cas_write(proxy, schema, std::move(key_mutation.second), client_state); + return cas_write(proxy, e.first.schema, e.first.dk, std::move(e.second), client_state); } else { stats.shard_bounce_for_lwt++; // FIXME: create separate smp_service_group @@ -1441,15 +1501,17 @@ static future<> do_batch_write(service::storage_proxy& proxy, // use proxy.container(). return service::get_storage_proxy().invoke_on(desired_shard, default_smp_service_group(), [cs = client_state.move_to_other_shard(), - fm = frozen_mutation(key_mutation.second), - ks = schema->ks_name(), cf = schema->cf_name()] + mb = e.second, + dk = e.first.dk, + ks = e.first.schema->ks_name(), + cf = e.first.schema->cf_name()] (service::storage_proxy& proxy) mutable { - return do_with(cs.get(), [&proxy, fm = std::move(fm), ks = std::move(ks), cf = std::move(cf)] - (service::client_state& client_state) { - auto schema = proxy.get_db().local().find_schema(ks, cf); - return cas_write(proxy, schema, fm.unfreeze(schema), client_state); - }); + return do_with(cs.get(), [&proxy, mb = std::move(mb), dk = std::move(dk), ks = std::move(ks), cf = std::move(cf)] + (service::client_state& client_state) mutable { + auto schema = proxy.get_db().local().find_schema(ks, cf); + return cas_write(proxy, schema, dk, std::move(mb), client_state); }); + }); } }); } @@ -1460,13 +1522,14 @@ future executor::batch_write_item(client_state& c rjson::value batch_info = rjson::parse(content); rjson::value& request_items = batch_info["RequestItems"]; - std::vector mutations; - mutations.reserve(request_items.MemberCount()); + std::vector> mutation_builders; + mutation_builders.reserve(request_items.MemberCount()); for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) { schema_ptr schema = get_table_from_batch_request(_proxy, it); tracing::add_table_name(client_state.get_trace_state(), schema->ks_name(), schema->cf_name()); - std::unordered_set used_keys(1, primary_key_hash{schema}, primary_key_equal{schema}); + std::unordered_set used_keys( + 1, primary_key_hash{schema}, primary_key_equal{schema}); for (auto& request : it->value.GetArray()) { if (!request.IsObject() || request.MemberCount() != 1) { return make_ready_future(api_error("ValidationException", format("Invalid BatchWriteItem request: {}", request))); @@ -1476,24 +1539,19 @@ future executor::batch_write_item(client_state& c if (r_name == "PutRequest") { const rjson::value& put_request = r->value; const rjson::value& item = put_request["Item"]; - // FIXME: Because of issue #5653, we must not call - // make_item_mutation() here with the current timestamp! - // Rather, instead of collecting mutations, we should collect - // the operations themselves, and mutation_cas_request will - // call make_item_mutation() or make_delete_item_mutation() in - // its apply(), with the right timestamp. - mutations.push_back(make_item_mutation(item, schema, api::new_timestamp())); - // make_item_mutation returns a mutation with a single clustering row - auto mut_key = std::make_pair(mutations.back().key(), mutations.back().partition().clustered_rows().begin()->key()); + mutation_builders.emplace_back(schema, put_or_delete_item( + item, schema, put_or_delete_item::put_item{})); + auto mut_key = std::make_pair(mutation_builders.back().second.pk(), mutation_builders.back().second.ck()); if (used_keys.count(mut_key) > 0) { return make_ready_future(api_error("ValidationException", "Provided list of item keys contains duplicates")); } used_keys.insert(std::move(mut_key)); } else if (r_name == "DeleteRequest") { const rjson::value& key = (r->value)["Key"]; - mutations.push_back(make_delete_item_mutation(key, schema, api::new_timestamp())); - // make_delete_item_mutation returns a mutation with a single clustering row - auto mut_key = std::make_pair(mutations.back().key(), mutations.back().partition().clustered_rows().begin()->key()); + mutation_builders.emplace_back(schema, put_or_delete_item( + key, schema, put_or_delete_item::delete_item{})); + auto mut_key = std::make_pair(mutation_builders.back().second.pk(), + mutation_builders.back().second.ck()); if (used_keys.count(mut_key) > 0) { return make_ready_future(api_error("ValidationException", "Provided list of item keys contains duplicates")); } @@ -1504,7 +1562,7 @@ future executor::batch_write_item(client_state& c } } - return do_batch_write(_proxy, std::move(mutations), client_state, _stats).then([] () { + return do_batch_write(_proxy, std::move(mutation_builders), client_state, _stats).then([] () { // FIXME: Issue #5650: If we failed writing some of the updates, // need to return a list of these failed updates in UnprocessedItems // rather than fail the whole write (issue #5650).