diff --git a/db/view/view.cc b/db/view/view.cc index add19121bc..78a2a500bb 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1331,7 +1331,7 @@ future<> mutate_MV( auto mut_ptr = remote_endpoints.empty() ? std::make_unique(std::move(mut.fm)) : std::make_unique(mut.fm); tracing::trace(tr_state, "Locally applying view update for {}.{}; base token = {}; view token = {}", mut.s->ks_name(), mut.s->cf_name(), base_token, view_token); - local_view_update = service::get_local_storage_proxy().mutate_locally(mut.s, *mut_ptr, tr_state, db::commitlog::force_sync::no).then_wrapped( + local_view_update = service::get_local_storage_proxy().mutate_mv_locally(mut.s, *mut_ptr, tr_state, db::commitlog::force_sync::no).then_wrapped( [s = mut.s, &stats, &cf_stats, tr_state, base_token, view_token, my_address, mut_ptr = std::move(mut_ptr), units = sem_units.split(sem_units.count())] (future<>&& f) { --stats.writes; diff --git a/main.cc b/main.cc index 763e302428..ecf4d47375 100644 --- a/main.cc +++ b/main.cc @@ -1037,6 +1037,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl storage_proxy_smp_service_group_config.max_nonlocal_requests = 5000; spcfg.read_smp_service_group = create_smp_service_group(storage_proxy_smp_service_group_config).get0(); spcfg.write_smp_service_group = create_smp_service_group(storage_proxy_smp_service_group_config).get0(); + spcfg.write_mv_smp_service_group = create_smp_service_group(storage_proxy_smp_service_group_config).get0(); spcfg.hints_write_smp_service_group = create_smp_service_group(storage_proxy_smp_service_group_config).get0(); spcfg.write_ack_smp_service_group = create_smp_service_group(storage_proxy_smp_service_group_config).get0(); static db::view::node_update_backlog node_backlog(smp::count, 10ms); diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 8d9be5f253..57c46920d7 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1876,6 +1876,7 @@ storage_proxy::storage_proxy(distributed& db, gms::gossiper& , _erm_factory(erm_factory) , _read_smp_service_group(cfg.read_smp_service_group) , _write_smp_service_group(cfg.write_smp_service_group) + , _write_mv_smp_service_group(cfg.write_mv_smp_service_group) , _hints_write_smp_service_group(cfg.hints_write_smp_service_group) , _write_ack_smp_service_group(cfg.write_ack_smp_service_group) , _next_response_id(std::chrono::system_clock::now().time_since_epoch()/1ms) diff --git a/service/storage_proxy.hh b/service/storage_proxy.hh index df8c0738a3..61ec548b0d 100644 --- a/service/storage_proxy.hh +++ b/service/storage_proxy.hh @@ -139,6 +139,7 @@ public: size_t available_memory; smp_service_group read_smp_service_group = default_smp_service_group(); smp_service_group write_smp_service_group = default_smp_service_group(); + smp_service_group write_mv_smp_service_group = default_smp_service_group(); smp_service_group hints_write_smp_service_group = default_smp_service_group(); // Write acknowledgments might not be received on the correct shard, and // they need a separate smp_service_group to prevent an ABBA deadlock @@ -236,6 +237,7 @@ private: shared_ptr _mm; smp_service_group _read_smp_service_group; smp_service_group _write_smp_service_group; + smp_service_group _write_mv_smp_service_group; smp_service_group _hints_write_smp_service_group; smp_service_group _write_ack_smp_service_group; response_id_type _next_response_id; @@ -524,6 +526,11 @@ public: future<> mutate_locally(const schema_ptr& s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout = clock_type::time_point::max(), db::per_partition_rate_limit::info rate_limit_info = std::monostate()) { return mutate_locally(s, m, tr_state, sync, timeout, _write_smp_service_group, rate_limit_info); } + // Applies materialized view mutation on this node. + // Resolves with timed_out_error when timeout is reached. + future<> mutate_mv_locally(const schema_ptr& s, const frozen_mutation& m, tracing::trace_state_ptr tr_state, db::commitlog::force_sync sync, clock_type::time_point timeout = clock_type::time_point::max(), db::per_partition_rate_limit::info rate_limit_info = std::monostate()) { + return mutate_locally(s, m, tr_state, sync, timeout, _write_mv_smp_service_group, rate_limit_info); + } // Applies mutations on this node. // Resolves with timed_out_error when timeout is reached. future<> mutate_locally(std::vector mutation, tracing::trace_state_ptr tr_state, clock_type::time_point timeout = clock_type::time_point::max(), db::per_partition_rate_limit::info rate_limit_info = std::monostate());