diff --git a/db/view/view.cc b/db/view/view.cc index 1990a70579..a16682c0f0 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1210,28 +1210,7 @@ future<> mutate_MV( auto& keyspace_name = mut.s->ks_name(); auto target_endpoint = get_view_natural_endpoint(keyspace_name, base_token, view_token); auto remote_endpoints = service::get_local_storage_service().get_token_metadata().pending_endpoints_for(view_token, keyspace_name); - auto maybe_account_failure = [s = mut.s, tr_state, &stats, &cf_stats, base_token, view_token, units = pending_view_updates.split(mut.fm.representation().size())] ( - future<>&& f, - gms::inet_address target, - bool is_local, - size_t remotes) { - if (f.failed()) { - stats.view_updates_failed_local += is_local; - stats.view_updates_failed_remote += remotes; - cf_stats.total_view_updates_failed_local += is_local; - cf_stats.total_view_updates_failed_remote += remotes; - auto ep = f.get_exception(); - tracing::trace(tr_state, "Failed to apply {}view update for {} and {} remote endpoints", - seastar::value_of([is_local]{return is_local ? "local " : "";}), target, remotes); - vlogger.error("Error applying view update to {} (view: {}.{}, base token: {}, view token: {}): {}", - target, s->ks_name(), s->cf_name(), base_token, view_token, ep); - return make_exception_future<>(std::move(ep)); - } else { - tracing::trace(tr_state, "Successfully applied {}view update for {} and {} remote endpoints", - seastar::value_of([is_local]{return is_local ? "local " : "";}), target, remotes); - return make_ready_future<>(); - } - }; + auto sem_units = pending_view_updates.split(mut.fm.representation().size()); // First, find the local endpoint and ensure that if it exists, // it will be the target endpoint. That way, all endpoints in the @@ -1268,11 +1247,20 @@ future<> mutate_MV( tracing::trace(tr_state, "Locally applying view update for {}.{}; base token = {}; view token = {}", mut.s->ks_name(), mut.s->cf_name(), base_token, view_token); future<> local_view_update = service::get_local_storage_proxy().mutate_locally(mut.s, *mut_ptr, std::move(tr_state), db::commitlog::force_sync::no).then_wrapped( - [&stats, - maybe_account_failure = std::move(maybe_account_failure), - mut_ptr = std::move(mut_ptr)] (future<>&& f) { + [s = mut.s, &stats, &cf_stats, tr_state, base_token, view_token, my_address, mut_ptr = std::move(mut_ptr), + units = sem_units.split(sem_units.count())] (future<>&& f) { --stats.writes; - return maybe_account_failure(std::move(f), utils::fb_utilities::get_broadcast_address(), true, 0); + if (f.failed()) { + ++stats.view_updates_failed_local; + ++cf_stats.total_view_updates_failed_local; + auto ep = f.get_exception(); + tracing::trace(tr_state, "Failed to apply local view update for {}", my_address); + vlogger.error("Error applying view update to {} (view: {}.{}, base token: {}, view token: {}): {}", + my_address, s->ks_name(), s->cf_name(), base_token, view_token, ep); + return make_exception_future<>(std::move(ep)); + } + tracing::trace(tr_state, "Successfully applied local view update for {}", my_address); + return make_ready_future<>(); }); fs->push_back(std::move(local_view_update)); // We just applied a local update to the target endpoint, so it should now be removed @@ -1294,11 +1282,23 @@ future<> mutate_MV( size_t updates_pushed_remote = remote_endpoints.size() + 1; stats.view_updates_pushed_remote += updates_pushed_remote; cf_stats.total_view_updates_pushed_remote += updates_pushed_remote; + schema_ptr s = mut.s; future<> view_update = apply_to_remote_endpoints(*target_endpoint, std::move(remote_endpoints), std::move(mut), base_token, view_token, allow_hints, tr_state).then_wrapped( - [target_endpoint, - updates_pushed_remote, - maybe_account_failure = std::move(maybe_account_failure)] (future<>&& f) mutable { - return maybe_account_failure(std::move(f), std::move(*target_endpoint), false, updates_pushed_remote); + [s = std::move(s), &stats, &cf_stats, tr_state, base_token, view_token, target_endpoint, updates_pushed_remote, + units = sem_units.split(sem_units.count())] (future<>&& f) mutable { + if (f.failed()) { + stats.view_updates_failed_remote += updates_pushed_remote; + cf_stats.total_view_updates_failed_remote += updates_pushed_remote; + auto ep = f.get_exception(); + tracing::trace(tr_state, "Failed to apply view update for {} and {} remote endpoints", + *target_endpoint, updates_pushed_remote); + vlogger.error("Error applying view update to {} (view: {}.{}, base token: {}, view token: {}): {}", + *target_endpoint, s->ks_name(), s->cf_name(), base_token, view_token, ep); + return make_exception_future<>(std::move(ep)); + } + tracing::trace(tr_state, "Successfully applied view update for {} and {} remote endpoints", + *target_endpoint, updates_pushed_remote); + return make_ready_future<>(); }); if (wait_for_all) { fs->push_back(std::move(view_update));