schema_tables: Fix hang during keyspace drop

Fixes #1484.

We drop tables as part of keyspace drop. Table drop starts with
creating a snapshot on all shards. All shards must use the same
snapshot timestamp which, among other things, is part of the snapshot
name. The timestamp is generated using supplied timestamp generating
function (joinpoint object). The joinpoint object will wait for all
shards to arrive and then generate and return the timestamp.

However, we drop tables in parallel, using the same joinpoint
instance. So joinpoint may be contacted by snapshotting shards of
tables A and B concurrently, generating timestamp t1 for some shards
of table A and some shards of table B. Later the remaining shards of
table A will get a different timestamp. As a result, different shards
may use different snapshot names for the same table. The snapshot
creation will never complete because the sealing fiber waits for all
shards to signal it, on the same name.

The fix is to give each table a separate joinpoint instance.

Message-Id: <1469117228-17879-1-git-send-email-tgrabiec@scylladb.com>
This commit is contained in:
Tomasz Grabiec
2016-07-21 18:07:08 +02:00
committed by Avi Kivity
parent e1480cd00d
commit 5e8f0efc85

View File

@@ -716,15 +716,21 @@ static void merge_tables(distributed<service::storage_proxy>& proxy,
std::map<qualified_name, schema_mutations>&& before,
std::map<qualified_name, schema_mutations>&& after)
{
struct dropped_table {
global_schema_ptr schema;
utils::joinpoint<db_clock::time_point> jp{[] {
return make_ready_future<db_clock::time_point>(db_clock::now());
}};
};
std::vector<global_schema_ptr> created;
std::vector<global_schema_ptr> altered;
std::vector<global_schema_ptr> dropped;
std::vector<dropped_table> dropped;
auto diff = difference(before, after);
for (auto&& key : diff.entries_only_on_left) {
auto&& s = proxy.local().get_db().local().find_schema(key.keyspace_name, key.table_name);
logger.info("Dropping {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version());
dropped.emplace_back(s);
dropped.emplace_back(dropped_table{s});
}
for (auto&& key : diff.entries_only_on_right) {
auto s = create_table_from_mutations(after.at(key));
@@ -737,9 +743,7 @@ static void merge_tables(distributed<service::storage_proxy>& proxy,
altered.emplace_back(s);
}
do_with(utils::make_joinpoint([] { return db_clock::now();})
, [&created, &dropped, &altered, &proxy](auto& tsf) {
return proxy.local().get_db().invoke_on_all([&created, &dropped, &altered, &tsf] (database& db) {
proxy.local().get_db().invoke_on_all([&created, &dropped, &altered] (database& db) {
return seastar::async([&] {
for (auto&& gs : created) {
schema_ptr s = gs.get();
@@ -754,14 +758,13 @@ static void merge_tables(distributed<service::storage_proxy>& proxy,
for (auto&& gs : altered) {
update_column_family(db, gs.get()).get();
}
parallel_for_each(dropped.begin(), dropped.end(), [&db, &tsf](auto&& gs) {
schema_ptr s = gs.get();
return db.drop_column_family(s->ks_name(), s->cf_name(), [&tsf] { return tsf.value(); }).then([s] {
parallel_for_each(dropped.begin(), dropped.end(), [&db](dropped_table& dt) {
schema_ptr s = dt.schema.get();
return db.drop_column_family(s->ks_name(), s->cf_name(), [&dt] { return dt.jp.value(); }).then([s] {
return service::get_local_migration_manager().notify_drop_column_family(s);
});
}).get();
});
});
}).get();
}