db,view: move putting view updates to background to mutate_MV

Currently, launching view updates as an asynchronous background job
is done via not waiting for mutate_MV() future in
table::generate_and_propagate_view_updates. That has a big downside,
since mutate_MV() handles *all* view updates for *all* views of a table,
so it's not possible to wait for each view independently.
Per-view granularity is required in order to implement synchronous
view updates of local views - because then we'll synchronously
wait for all views that write to a local node (due to having a matching
partition key with the base), while remote view updates will still
be sent asynchronously.
In order to do that, instead of not waiting for mutate_MV,
we do wait for it properly, but instead launch the asynchronous,
unwaited-for futures inside mutate_MV.
Effectively that means no changes for view updates so far - all updates
will be fired in the background. Later, another patch will introduce
a way to wait for selected updates to finish.
This commit is contained in:
Piotr Sarna
2020-01-29 15:45:32 +01:00
parent 3b3659e8cd
commit fd49fd773c
2 changed files with 16 additions and 8 deletions

View File

@@ -1113,13 +1113,16 @@ future<> mutate_MV(
// writes but mutate_locally() doesn't, so we need to do that here.
++stats.writes;
auto mut_ptr = std::make_unique<frozen_mutation>(std::move(mut.fm));
fs->push_back(service::get_local_storage_proxy().mutate_locally(mut.s, *mut_ptr, db::commitlog::force_sync::no).then_wrapped(
future<> local_view_update = service::get_local_storage_proxy().mutate_locally(mut.s, *mut_ptr, db::commitlog::force_sync::no).then_wrapped(
[&stats,
maybe_account_failure = std::move(maybe_account_failure),
mut_ptr = std::move(mut_ptr)] (future<>&& f) {
--stats.writes;
return maybe_account_failure(std::move(f), utils::fb_utilities::get_broadcast_address(), true, 0);
}));
});
// The update is sent to background in order to preserve availability,
// its parallelism is limited by view_update_concurrency_semaphore
(void)local_view_update;
} else {
vlogger.debug("Sending view update to endpoint {}, with pending endpoints = {}", *paired_endpoint, pending_endpoints);
// Note we don't wait for the asynchronous operation to complete
@@ -1129,7 +1132,7 @@ future<> mutate_MV(
// to send the update there. Currently, we do this from *each* of
// the base replicas, but this is probably excessive - see
// See https://issues.apache.org/jira/browse/CASSANDRA-14262/
fs->push_back(service::get_local_storage_proxy().send_to_endpoint(
future<> view_update = service::get_local_storage_proxy().send_to_endpoint(
std::move(mut),
*paired_endpoint,
std::move(pending_endpoints),
@@ -1141,7 +1144,10 @@ future<> mutate_MV(
updates_pushed_remote,
maybe_account_failure = std::move(maybe_account_failure)] (future<>&& f) mutable {
return maybe_account_failure(std::move(f), std::move(*paired_endpoint), is_endpoint_local, updates_pushed_remote);
}));
});
// The update is sent to background in order to preserve availability,
// its parallelism is limited by view_update_concurrency_semaphore
(void)view_update;
}
} else if (!pending_endpoints.empty()) {
// If there is no paired endpoint, it means there's a range movement going on (decommission or move),
@@ -1158,7 +1164,7 @@ future<> mutate_MV(
cf_stats.total_view_updates_pushed_remote += updates_pushed_remote;
auto target = pending_endpoints.back();
pending_endpoints.pop_back();
fs->push_back(service::get_local_storage_proxy().send_to_endpoint(
future<> view_update = service::get_local_storage_proxy().send_to_endpoint(
std::move(mut),
target,
std::move(pending_endpoints),
@@ -1168,7 +1174,10 @@ future<> mutate_MV(
updates_pushed_remote,
maybe_account_failure = std::move(maybe_account_failure)] (future<>&& f) {
return maybe_account_failure(std::move(f), std::move(target), false, updates_pushed_remote);
}));
});
// The update is sent to background in order to preserve availability,
// its parallelism is limited by view_update_concurrency_semaphore
(void)view_update;
}
}
auto f = seastar::when_all_succeed(fs->begin(), fs->end());

View File

@@ -2107,8 +2107,7 @@ future<> table::generate_and_propagate_view_updates(const schema_ptr& base,
flat_mutation_reader_from_mutations({std::move(m)}),
std::move(existings)).then([this, base_token = std::move(base_token)] (std::vector<frozen_mutation_and_schema>&& updates) mutable {
auto units = seastar::consume_units(*_config.view_update_concurrency_semaphore, memory_usage_of(updates));
//FIXME: discarded future.
(void)db::view::mutate_MV(std::move(base_token), std::move(updates), _view_stats, *_config.cf_stats, std::move(units), service::allow_hints::yes).handle_exception([] (auto ignored) { });
return db::view::mutate_MV(std::move(base_token), std::move(updates), _view_stats, *_config.cf_stats, std::move(units), service::allow_hints::yes).handle_exception([] (auto ignored) { });
});
}