alternator: fix propagating tags

Updating tags was erroneously done locally, which means that
the schema change was not propagated to other nodes.
The new code announces new schema globally.

Fixes #6513
Branches: 4.0,4.1
Tests: unit(dev)
       dtest(alternator_tests.AlternatorTest.test_update_condition_expression_and_write_isolation)
Message-Id: <3a816c4ecc33c03af4f36e51b11f195c231e7ce1.1592935039.git.sarna@scylladb.com>

(cherry picked from commit f4e8cfe03b)
This commit is contained in:
Piotr Sarna
2020-06-23 19:57:29 +02:00
committed by Nadav Har'El
parent 2ff6e2e122
commit e95bcd0f8f

View File

@@ -565,7 +565,7 @@ static void validate_tags(const std::map<sstring, sstring>& tags) {
// to races during concurrent updates of the same table. Once Scylla schema updates
// are fixed, this issue will automatically get fixed as well.
enum class update_tags_action { add_tags, delete_tags };
static future<> update_tags(const rjson::value& tags, schema_ptr schema, std::map<sstring, sstring>&& tags_map, update_tags_action action) {
static future<> update_tags(service::migration_manager& mm, const rjson::value& tags, schema_ptr schema, std::map<sstring, sstring>&& tags_map, update_tags_action action) {
if (action == update_tags_action::add_tags) {
for (auto it = tags.Begin(); it != tags.End(); ++it) {
const rjson::value& key = (*it)["Key"];
@@ -592,24 +592,12 @@ static future<> update_tags(const rjson::value& tags, schema_ptr schema, std::ma
}
validate_tags(tags_map);
std::stringstream serialized_tags;
serialized_tags << '{';
for (auto& tag_entry : tags_map) {
serialized_tags << format("'{}':'{}',", tag_entry.first, tag_entry.second);
}
std::string serialized_tags_str = serialized_tags.str();
if (!tags_map.empty()) {
serialized_tags_str[serialized_tags_str.size() - 1] = '}'; // trims the last ',' delimiter
} else {
serialized_tags_str.push_back('}');
}
sstring req = format("ALTER TABLE \"{}\".\"{}\" WITH {} = {}",
schema->ks_name(), schema->cf_name(), tags_extension::NAME, serialized_tags_str);
return db::execute_cql(std::move(req)).discard_result();
schema_builder builder(schema);
builder.set_extensions(schema::extensions_map{{sstring(tags_extension::NAME), ::make_shared<tags_extension>(std::move(tags_map))}});
return mm.announce_column_family_update(builder.build(), false, std::vector<view_ptr>(), false);
}
static future<> add_tags(service::storage_proxy& proxy, schema_ptr schema, rjson::value& request_info) {
static future<> add_tags(service::migration_manager& mm, service::storage_proxy& proxy, schema_ptr schema, rjson::value& request_info) {
const rjson::value* tags = rjson::find(request_info, "Tags");
if (!tags || !tags->IsArray()) {
return make_exception_future<>(api_error("ValidationException", format("Cannot parse tags")));
@@ -619,7 +607,7 @@ static future<> add_tags(service::storage_proxy& proxy, schema_ptr schema, rjson
}
std::map<sstring, sstring> tags_map = get_tags_of_table(schema);
return update_tags(rjson::copy(*tags), schema, std::move(tags_map), update_tags_action::add_tags);
return update_tags(mm, rjson::copy(*tags), schema, std::move(tags_map), update_tags_action::add_tags);
}
future<executor::request_return_type> executor::tag_resource(client_state& client_state, service_permit permit, rjson::value request) {
@@ -631,7 +619,7 @@ future<executor::request_return_type> executor::tag_resource(client_state& clien
return api_error("AccessDeniedException", "Incorrect resource identifier");
}
schema_ptr schema = get_table_from_arn(_proxy, std::string_view(arn->GetString(), arn->GetStringLength()));
add_tags(_proxy, schema, request).get();
add_tags(_mm, _proxy, schema, request).get();
return json_string("");
});
}
@@ -652,7 +640,7 @@ future<executor::request_return_type> executor::untag_resource(client_state& cli
schema_ptr schema = get_table_from_arn(_proxy, std::string_view(arn->GetString(), arn->GetStringLength()));
std::map<sstring, sstring> tags_map = get_tags_of_table(schema);
update_tags(*tags, schema, std::move(tags_map), update_tags_action::delete_tags).get();
update_tags(_mm, *tags, schema, std::move(tags_map), update_tags_action::delete_tags).get();
return json_string("");
});
}
@@ -869,7 +857,7 @@ future<executor::request_return_type> executor::create_table(client_state& clien
}).then([this, table_info = std::move(table_info), schema] () mutable {
future<> f = make_ready_future<>();
if (rjson::find(table_info, "Tags")) {
f = add_tags(_proxy, schema, table_info);
f = add_tags(_mm, _proxy, schema, table_info);
}
return f.then([table_info = std::move(table_info), schema] () mutable {
rjson::value status = rjson::empty_object();