diff --git a/db/view/view.cc b/db/view/view.cc index 93bc0c64c5..b4cd54514b 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1695,6 +1695,9 @@ static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, locator std::move(tr_state), allow_hints, service::is_cancellable::yes); + while (utils::get_local_injector().enter("never_finish_remote_view_updates")) { + co_await seastar::sleep(100ms); + } } static bool should_update_synchronously(const schema& s) { @@ -2660,6 +2663,10 @@ update_backlog node_update_backlog::fetch() { _max.store(new_max, std::memory_order_relaxed); return new_max; } + // If we perform a shard-aware write, we can read the backlog of the current shard, + // which was just updated. + // We still need to compare it to the max, aggregated from all shards, which might + // still be higher despite being most likely slightly outdated. return std::max(fetch_shard(this_shard_id()), _max.load(std::memory_order_relaxed)); } diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index 40ea0c6189..e80cac31ac 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -476,6 +476,7 @@ future<> view_update_generator::generate_and_propagate_view_updates(const replic } } co_await builder.close(); + _proxy.local().update_view_update_backlog(); if (err) { std::rethrow_exception(err); } diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index b05ea0e53a..3c0ff70ed5 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -542,7 +542,6 @@ private: // // Usually we will return immediately, since this work only involves appending data to the connection // send buffer. - p->update_view_update_backlog(); auto f = co_await coroutine::as_future(send_mutation_done(netw::messaging_service::msg_addr{reply_to, shard}, trace_state_ptr, shard, response_id, p->get_view_update_backlog())); f.ignore_ready_future(); @@ -581,7 +580,6 @@ private: } // ignore results, since we'll be returning them via MUTATION_DONE/MUTATION_FAILURE verbs if (errors.count) { - p->update_view_update_backlog(); auto f = co_await coroutine::as_future(send_mutation_failed( netw::messaging_service::msg_addr{reply_to, shard}, trace_state_ptr, @@ -2931,7 +2929,13 @@ storage_proxy::storage_proxy(distributed& db, storage_proxy:: sm::make_queue_length("current_throttled_writes", [this] { return _throttled_writes.size(); }, sm::description("number of currently throttled write requests")), }); - + _metrics.add_group(storage_proxy_stats::REPLICA_STATS_CATEGORY, { + sm::make_current_bytes("view_update_backlog", [this] { return _max_view_update_backlog.fetch_shard(this_shard_id()).get_current_bytes(); }, + sm::description("Tracks the size of scylla_database_view_update_backlog and is used instead of that one to calculate the " + "max backlog across all shards, which is then used by other nodes to calculate appropriate throttling delays " + "if it grows too large. If it's notably different from scylla_database_view_update_backlog, it means " + "that we're currently processing a write that generated a large number of view updates.")), + }); slogger.trace("hinted DCs: {}", cfg.hinted_handoff_enabled.to_configuration_string()); _hints_manager.register_metrics("hints_manager"); _hints_for_views_manager.register_metrics("hints_for_views_manager"); @@ -4108,7 +4112,6 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo .then([response_id, this, my_address, h = std::move(handler_ptr), p = shared_from_this()] { // make mutation alive until it is processed locally, otherwise it // may disappear if write timeouts before this future is ready - update_view_update_backlog(); got_response(response_id, my_address, get_view_update_backlog()); }); }; diff --git a/test/pylib/util.py b/test/pylib/util.py index 63a2e9f536..93e2a583eb 100644 --- a/test/pylib/util.py +++ b/test/pylib/util.py @@ -241,3 +241,10 @@ async def start_writes(cql: Session, keyspace: str, table: str, concurrency: int return 0 return finish + +async def wait_for_view(cql: Session, name: str, node_count: int, timeout: int = 120): + async def view_is_built(): + done = await cql.run_async(f"SELECT COUNT(*) FROM system_distributed.view_build_status WHERE status = 'SUCCESS' AND view_name = '{name}' ALLOW FILTERING") + return done[0][0] == node_count or None + deadline = time.time() + timeout + await wait_for(view_is_built, deadline) diff --git a/test/topology_custom/test_mv_backlog.py b/test/topology_custom/test_mv_backlog.py new file mode 100644 index 0000000000..2407acef6e --- /dev/null +++ b/test/topology_custom/test_mv_backlog.py @@ -0,0 +1,48 @@ +# +# Copyright (C) 2024-present ScyllaDB +# +# SPDX-License-Identifier: AGPL-3.0-or-later +# +from test.pylib.manager_client import ManagerClient + +import asyncio +import pytest +from test.topology.conftest import skip_mode +from test.pylib.util import wait_for_view +from test.topology_experimental_raft.test_mv_tablets import pin_the_only_tablet +from test.pylib.tablets import get_tablet_replica + +# This test reproduces issue #18542 +# In the test, we create a table and perform a write to it a couple of times +# Each time, we check that a view update backlog on some shard increased +# due to the write. +@pytest.mark.asyncio +@skip_mode('release', "error injections aren't enabled in release mode") +async def test_view_backlog_increased_after_write(manager: ManagerClient) -> None: + node_count = 2 + # Use a higher smp to make it more likely that the writes go to a different shard than the coordinator. + servers = await manager.servers_add(node_count, cmdline=['--smp', '5'], config={'error_injections_at_startup': ['never_finish_remote_view_updates'], 'enable_tablets': True}) + cql = manager.get_cql() + await cql.run_async("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}" + "AND tablets = {'initial': 1}") + await cql.run_async(f"CREATE TABLE ks.tab (base_key int, view_key int, v text, PRIMARY KEY (base_key, view_key))") + await cql.run_async(f"CREATE MATERIALIZED VIEW ks.mv_cf_view AS SELECT * FROM ks.tab " + "WHERE view_key IS NOT NULL and base_key IS NOT NULL PRIMARY KEY (view_key, base_key) ") + + await wait_for_view(cql, 'mv_cf_view', node_count) + # Only remote updates hold on to memory, so make the update remote + await pin_the_only_tablet(manager, "ks", "tab", servers[0]) + (_, shard) = await get_tablet_replica(manager, servers[0], "ks", "tab", 0) + await pin_the_only_tablet(manager, "ks", "mv_cf_view", servers[1]) + + for v in [1000, 4000, 16000, 64000, 256000]: + # Don't use a prepared statement, so that writes are likely sent to a different shard + # than the one containing the key. + await cql.run_async(f"INSERT INTO ks.tab (base_key, view_key, v) VALUES ({v}, {v}, '{v*'a'}')") + # The view update backlog should increase on the node generating view updates + local_metrics = await manager.metrics.query(servers[0].ip_addr) + view_backlog = local_metrics.get('scylla_storage_proxy_replica_view_update_backlog', shard=str(shard)) + # The read view_backlog might still contain backlogs from the previous iterations, so we only assert that it is large enough + assert view_backlog > v + + await cql.run_async(f"DROP KEYSPACE ks") diff --git a/test/topology_custom/test_mv_delete_partitions.py b/test/topology_custom/test_mv_delete_partitions.py index 6f02b8c12c..6364aa5973 100644 --- a/test/topology_custom/test_mv_delete_partitions.py +++ b/test/topology_custom/test_mv_delete_partitions.py @@ -10,6 +10,7 @@ import pytest import time import logging from test.topology.conftest import skip_mode +from test.pylib.util import wait_for_view from cassandra.cqltypes import Int32Type logger = logging.getLogger(__name__) @@ -41,17 +42,6 @@ async def insert_with_concurrency(cql, value_count, concurrency): await asyncio.gather(*tasks) logger.info(f"Finished writes with concurrency {concurrency}") -async def wait_for_views(cql, mvs_count, node_count): - deadline = time.time() + 120 - while time.time() < deadline: - done = await cql.run_async(f"SELECT COUNT(*) FROM system_distributed.view_build_status WHERE status = 'SUCCESS' ALLOW FILTERING") - logger.info(f"Views built: {done[0][0]}") - if done[0][0] == node_count * mvs_count: - return - else: - time.sleep(0.2) - raise Exception("Timeout waiting for views to build") - # This test reproduces issue #12379 # To quickly exceed the view update backlog limit, the test uses a minimal, 2 node # cluster and lowers the limit using the "view_update_limit" error injection. @@ -78,7 +68,7 @@ async def test_delete_partition_rows_from_table_with_mv(manager: ManagerClient) await cql.run_async(f"CREATE MATERIALIZED VIEW ks.mv_cf_view AS SELECT * FROM ks.tab " "WHERE c IS NOT NULL and key IS NOT NULL PRIMARY KEY (c, key) ") - await wait_for_views(cql, 1, node_count) + await wait_for_view(cql, "mv_cf_view", node_count) logger.info(f"Deleting all rows from partition with key 0") await cql.run_async(f"DELETE FROM ks.tab WHERE key = 0", timeout=300) diff --git a/test/topology_custom/test_mv_fail_building.py b/test/topology_custom/test_mv_fail_building.py index 82e3df3f19..56c5bd92f2 100644 --- a/test/topology_custom/test_mv_fail_building.py +++ b/test/topology_custom/test_mv_fail_building.py @@ -8,22 +8,11 @@ import pytest import time from test.topology.conftest import skip_mode from test.pylib.manager_client import ManagerClient +from test.pylib.util import wait_for_view from cassandra.cluster import ConsistencyLevel # type: ignore from cassandra.query import SimpleStatement # type: ignore - -async def wait_for_view(cql, name, node_count): - deadline = time.time() + 120 - while time.time() < deadline: - done = await cql.run_async(f"SELECT COUNT(*) FROM system_distributed.view_build_status WHERE status = 'SUCCESS' AND view_name = '{name}' ALLOW FILTERING") - if done[0][0] == node_count: - return - else: - time.sleep(0.2) - raise Exception("Timeout waiting for views to build") - - # This test makes sure that even if the view building encounter errors, the view building is eventually finished # and the view is consistent with the base table. # Reproduces the scenario in #19261