diff --git a/db/view/view.cc b/db/view/view.cc index 358f74d0b3..16e8e502b4 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1113,13 +1113,16 @@ future<> mutate_MV( // writes but mutate_locally() doesn't, so we need to do that here. ++stats.writes; auto mut_ptr = std::make_unique(std::move(mut.fm)); - fs->push_back(service::get_local_storage_proxy().mutate_locally(mut.s, *mut_ptr, db::commitlog::force_sync::no).then_wrapped( + future<> local_view_update = service::get_local_storage_proxy().mutate_locally(mut.s, *mut_ptr, db::commitlog::force_sync::no).then_wrapped( [&stats, maybe_account_failure = std::move(maybe_account_failure), mut_ptr = std::move(mut_ptr)] (future<>&& f) { --stats.writes; return maybe_account_failure(std::move(f), utils::fb_utilities::get_broadcast_address(), true, 0); - })); + }); + // The update is sent to background in order to preserve availability, + // its parallelism is limited by view_update_concurrency_semaphore + (void)local_view_update; } else { vlogger.debug("Sending view update to endpoint {}, with pending endpoints = {}", *paired_endpoint, pending_endpoints); // Note we don't wait for the asynchronous operation to complete @@ -1129,7 +1132,7 @@ future<> mutate_MV( // to send the update there. Currently, we do this from *each* of // the base replicas, but this is probably excessive - see // See https://issues.apache.org/jira/browse/CASSANDRA-14262/ - fs->push_back(service::get_local_storage_proxy().send_to_endpoint( + future<> view_update = service::get_local_storage_proxy().send_to_endpoint( std::move(mut), *paired_endpoint, std::move(pending_endpoints), @@ -1141,7 +1144,10 @@ future<> mutate_MV( updates_pushed_remote, maybe_account_failure = std::move(maybe_account_failure)] (future<>&& f) mutable { return maybe_account_failure(std::move(f), std::move(*paired_endpoint), is_endpoint_local, updates_pushed_remote); - })); + }); + // The update is sent to background in order to preserve availability, + // its parallelism is limited by view_update_concurrency_semaphore + (void)view_update; } } else if (!pending_endpoints.empty()) { // If there is no paired endpoint, it means there's a range movement going on (decommission or move), @@ -1158,7 +1164,7 @@ future<> mutate_MV( cf_stats.total_view_updates_pushed_remote += updates_pushed_remote; auto target = pending_endpoints.back(); pending_endpoints.pop_back(); - fs->push_back(service::get_local_storage_proxy().send_to_endpoint( + future<> view_update = service::get_local_storage_proxy().send_to_endpoint( std::move(mut), target, std::move(pending_endpoints), @@ -1168,7 +1174,10 @@ future<> mutate_MV( updates_pushed_remote, maybe_account_failure = std::move(maybe_account_failure)] (future<>&& f) { return maybe_account_failure(std::move(f), std::move(target), false, updates_pushed_remote); - })); + }); + // The update is sent to background in order to preserve availability, + // its parallelism is limited by view_update_concurrency_semaphore + (void)view_update; } } auto f = seastar::when_all_succeed(fs->begin(), fs->end()); diff --git a/table.cc b/table.cc index 204c039cba..ea592f93ee 100644 --- a/table.cc +++ b/table.cc @@ -2107,8 +2107,7 @@ future<> table::generate_and_propagate_view_updates(const schema_ptr& base, flat_mutation_reader_from_mutations({std::move(m)}), std::move(existings)).then([this, base_token = std::move(base_token)] (std::vector&& updates) mutable { auto units = seastar::consume_units(*_config.view_update_concurrency_semaphore, memory_usage_of(updates)); - //FIXME: discarded future. - (void)db::view::mutate_MV(std::move(base_token), std::move(updates), _view_stats, *_config.cf_stats, std::move(units), service::allow_hints::yes).handle_exception([] (auto ignored) { }); + return db::view::mutate_MV(std::move(base_token), std::move(updates), _view_stats, *_config.cf_stats, std::move(units), service::allow_hints::yes).handle_exception([] (auto ignored) { }); }); }