db,view: move putting view updates to background to mutate_MV
Currently, launching view updates as an asynchronous background job is done via not waiting for mutate_MV() future in table::generate_and_propagate_view_updates. That has a big downside, since mutate_MV() handles *all* view updates for *all* views of a table, so it's not possible to wait for each view independently. Per-view granularity is required in order to implement synchronous view updates of local views - because then we'll synchronously wait for all views that write to a local node (due to having a matching partition key with the base), while remote view updates will still be sent asynchronously. In order to do that, instead of not waiting for mutate_MV, we do wait for it properly, but instead launch the asynchronous, unwaited-for futures inside mutate_MV. Effectively that means no changes for view updates so far - all updates will be fired in the background. Later, another patch will introduce a way to wait for selected updates to finish.
This commit is contained in:
@@ -1113,13 +1113,16 @@ future<> mutate_MV(
|
||||
// writes but mutate_locally() doesn't, so we need to do that here.
|
||||
++stats.writes;
|
||||
auto mut_ptr = std::make_unique<frozen_mutation>(std::move(mut.fm));
|
||||
fs->push_back(service::get_local_storage_proxy().mutate_locally(mut.s, *mut_ptr, db::commitlog::force_sync::no).then_wrapped(
|
||||
future<> local_view_update = service::get_local_storage_proxy().mutate_locally(mut.s, *mut_ptr, db::commitlog::force_sync::no).then_wrapped(
|
||||
[&stats,
|
||||
maybe_account_failure = std::move(maybe_account_failure),
|
||||
mut_ptr = std::move(mut_ptr)] (future<>&& f) {
|
||||
--stats.writes;
|
||||
return maybe_account_failure(std::move(f), utils::fb_utilities::get_broadcast_address(), true, 0);
|
||||
}));
|
||||
});
|
||||
// The update is sent to background in order to preserve availability,
|
||||
// its parallelism is limited by view_update_concurrency_semaphore
|
||||
(void)local_view_update;
|
||||
} else {
|
||||
vlogger.debug("Sending view update to endpoint {}, with pending endpoints = {}", *paired_endpoint, pending_endpoints);
|
||||
// Note we don't wait for the asynchronous operation to complete
|
||||
@@ -1129,7 +1132,7 @@ future<> mutate_MV(
|
||||
// to send the update there. Currently, we do this from *each* of
|
||||
// the base replicas, but this is probably excessive - see
|
||||
// See https://issues.apache.org/jira/browse/CASSANDRA-14262/
|
||||
fs->push_back(service::get_local_storage_proxy().send_to_endpoint(
|
||||
future<> view_update = service::get_local_storage_proxy().send_to_endpoint(
|
||||
std::move(mut),
|
||||
*paired_endpoint,
|
||||
std::move(pending_endpoints),
|
||||
@@ -1141,7 +1144,10 @@ future<> mutate_MV(
|
||||
updates_pushed_remote,
|
||||
maybe_account_failure = std::move(maybe_account_failure)] (future<>&& f) mutable {
|
||||
return maybe_account_failure(std::move(f), std::move(*paired_endpoint), is_endpoint_local, updates_pushed_remote);
|
||||
}));
|
||||
});
|
||||
// The update is sent to background in order to preserve availability,
|
||||
// its parallelism is limited by view_update_concurrency_semaphore
|
||||
(void)view_update;
|
||||
}
|
||||
} else if (!pending_endpoints.empty()) {
|
||||
// If there is no paired endpoint, it means there's a range movement going on (decommission or move),
|
||||
@@ -1158,7 +1164,7 @@ future<> mutate_MV(
|
||||
cf_stats.total_view_updates_pushed_remote += updates_pushed_remote;
|
||||
auto target = pending_endpoints.back();
|
||||
pending_endpoints.pop_back();
|
||||
fs->push_back(service::get_local_storage_proxy().send_to_endpoint(
|
||||
future<> view_update = service::get_local_storage_proxy().send_to_endpoint(
|
||||
std::move(mut),
|
||||
target,
|
||||
std::move(pending_endpoints),
|
||||
@@ -1168,7 +1174,10 @@ future<> mutate_MV(
|
||||
updates_pushed_remote,
|
||||
maybe_account_failure = std::move(maybe_account_failure)] (future<>&& f) {
|
||||
return maybe_account_failure(std::move(f), std::move(target), false, updates_pushed_remote);
|
||||
}));
|
||||
});
|
||||
// The update is sent to background in order to preserve availability,
|
||||
// its parallelism is limited by view_update_concurrency_semaphore
|
||||
(void)view_update;
|
||||
}
|
||||
}
|
||||
auto f = seastar::when_all_succeed(fs->begin(), fs->end());
|
||||
|
||||
3
table.cc
3
table.cc
@@ -2107,8 +2107,7 @@ future<> table::generate_and_propagate_view_updates(const schema_ptr& base,
|
||||
flat_mutation_reader_from_mutations({std::move(m)}),
|
||||
std::move(existings)).then([this, base_token = std::move(base_token)] (std::vector<frozen_mutation_and_schema>&& updates) mutable {
|
||||
auto units = seastar::consume_units(*_config.view_update_concurrency_semaphore, memory_usage_of(updates));
|
||||
//FIXME: discarded future.
|
||||
(void)db::view::mutate_MV(std::move(base_token), std::move(updates), _view_stats, *_config.cf_stats, std::move(units), service::allow_hints::yes).handle_exception([] (auto ignored) { });
|
||||
return db::view::mutate_MV(std::move(base_token), std::move(updates), _view_stats, *_config.cf_stats, std::move(units), service::allow_hints::yes).handle_exception([] (auto ignored) { });
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user