From fd9c7d4d593bdf939486f98698207f32dc26611c Mon Sep 17 00:00:00 2001 From: Wojciech Mitros Date: Fri, 7 Jun 2024 17:56:26 +0200 Subject: [PATCH 1/3] mv: update view update backlog when it increases on correct shard When performing a write, we should update the view update backlog on the shard where the mutation is actually applied. Instead, currently we only update it on the shard that initially received the write request (which didn't change at all) and as a result, the backlog on the correct shard and the aggregated max view update backlog are not updated at all. This patch enables updating the backlog on the correct shard. The update is now performed just after the view generation and propagation finishes, so that all backlog increases are noted and the backlog is ready to be used in the write response. Additionally, after this patch, we no longer (falsely) assume that the backlog is modified on the same shard as where we later read it to attach to a response. However, we still compare the aggregated backlog from all shards and the backlog from the shard retrieving the max, as with a shard-aware driver, it's likely the exact shard whose backlog changed. --- db/view/view.cc | 4 ++++ db/view/view_update_generator.cc | 1 + service/storage_proxy.cc | 3 --- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index da7fe2a5aa..2432e95ae7 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -2646,6 +2646,10 @@ update_backlog node_update_backlog::fetch() { _max.store(new_max, std::memory_order_relaxed); return new_max; } + // If we perform a shard-aware write, we can read the backlog of the current shard, + // which was just updated. + // We still need to compare it to the max, aggregated from all shards, which might + // still be higher despite being most likely slightly outdated. 
return std::max(fetch_shard(this_shard_id()), _max.load(std::memory_order_relaxed)); } diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index e58ea13302..0720addeb1 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -476,6 +476,7 @@ future<> view_update_generator::generate_and_propagate_view_updates(const replic } } co_await builder.close(); + _proxy.local().update_view_update_backlog(); if (err) { std::rethrow_exception(err); } diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 289c02da7a..07d14427c6 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -542,7 +542,6 @@ private: // // Usually we will return immediately, since this work only involves appending data to the connection // send buffer. - p->update_view_update_backlog(); auto f = co_await coroutine::as_future(send_mutation_done(netw::messaging_service::msg_addr{reply_to, shard}, trace_state_ptr, shard, response_id, p->get_view_update_backlog())); f.ignore_ready_future(); @@ -581,7 +580,6 @@ private: } // ignore results, since we'll be returning them via MUTATION_DONE/MUTATION_FAILURE verbs if (errors.count) { - p->update_view_update_backlog(); auto f = co_await coroutine::as_future(send_mutation_failed( netw::messaging_service::msg_addr{reply_to, shard}, trace_state_ptr, @@ -4108,7 +4106,6 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo .then([response_id, this, my_address, h = std::move(handler_ptr), p = shared_from_this()] { // make mutation alive until it is processed locally, otherwise it // may disappear if write timeouts before this future is ready - update_view_update_backlog(); got_response(response_id, my_address, get_view_update_backlog()); }); }; From c4f5659c11e2842bc627f6971bdcd13f6bb7005f Mon Sep 17 00:00:00 2001 From: Wojciech Mitros Date: Wed, 3 Jul 2024 10:23:41 +0200 Subject: [PATCH 2/3] test: move auxiliary methods for waiting until a view is 
built to util In many materialized view tests we need to wait until a view is built before actually working on it; future tests will also need it. In existing tests we use the same duplicated method for achieving that. In this patch the method is deduplicated and moved to pylib/util.py and existing tests are modified to use it instead. --- test/pylib/util.py | 7 +++++++ test/topology_custom/test_mv_delete_partitions.py | 14 ++------------ test/topology_custom/test_mv_fail_building.py | 13 +------------ 3 files changed, 10 insertions(+), 24 deletions(-) diff --git a/test/pylib/util.py b/test/pylib/util.py index 1dc2545378..dda8a1a77d 100644 --- a/test/pylib/util.py +++ b/test/pylib/util.py @@ -240,3 +240,10 @@ async def start_writes(cql: Session, keyspace: str, table: str, concurrency: int return 0 return finish + +async def wait_for_view(cql: Session, name: str, node_count: int, timeout: int = 120): + async def view_is_built(): + done = await cql.run_async(f"SELECT COUNT(*) FROM system_distributed.view_build_status WHERE status = 'SUCCESS' AND view_name = '{name}' ALLOW FILTERING") + return done[0][0] == node_count or None + deadline = time.time() + timeout + await wait_for(view_is_built, deadline) diff --git a/test/topology_custom/test_mv_delete_partitions.py b/test/topology_custom/test_mv_delete_partitions.py index 6f02b8c12c..6364aa5973 100644 --- a/test/topology_custom/test_mv_delete_partitions.py +++ b/test/topology_custom/test_mv_delete_partitions.py @@ -10,6 +10,7 @@ import pytest import time import logging from test.topology.conftest import skip_mode +from test.pylib.util import wait_for_view from cassandra.cqltypes import Int32Type logger = logging.getLogger(__name__) @@ -41,17 +42,6 @@ async def insert_with_concurrency(cql, value_count, concurrency): await asyncio.gather(*tasks) logger.info(f"Finished writes with concurrency {concurrency}") -async def wait_for_views(cql, mvs_count, node_count): - deadline = time.time() + 120 - while time.time() <
deadline: - done = await cql.run_async(f"SELECT COUNT(*) FROM system_distributed.view_build_status WHERE status = 'SUCCESS' ALLOW FILTERING") - logger.info(f"Views built: {done[0][0]}") - if done[0][0] == node_count * mvs_count: - return - else: - time.sleep(0.2) - raise Exception("Timeout waiting for views to build") - # This test reproduces issue #12379 # To quickly exceed the view update backlog limit, the test uses a minimal, 2 node # cluster and lowers the limit using the "view_update_limit" error injection. @@ -78,7 +68,7 @@ async def test_delete_partition_rows_from_table_with_mv(manager: ManagerClient) await cql.run_async(f"CREATE MATERIALIZED VIEW ks.mv_cf_view AS SELECT * FROM ks.tab " "WHERE c IS NOT NULL and key IS NOT NULL PRIMARY KEY (c, key) ") - await wait_for_views(cql, 1, node_count) + await wait_for_view(cql, "mv_cf_view", node_count) logger.info(f"Deleting all rows from partition with key 0") await cql.run_async(f"DELETE FROM ks.tab WHERE key = 0", timeout=300) diff --git a/test/topology_custom/test_mv_fail_building.py b/test/topology_custom/test_mv_fail_building.py index 82e3df3f19..56c5bd92f2 100644 --- a/test/topology_custom/test_mv_fail_building.py +++ b/test/topology_custom/test_mv_fail_building.py @@ -8,22 +8,11 @@ import pytest import time from test.topology.conftest import skip_mode from test.pylib.manager_client import ManagerClient +from test.pylib.util import wait_for_view from cassandra.cluster import ConsistencyLevel # type: ignore from cassandra.query import SimpleStatement # type: ignore - -async def wait_for_view(cql, name, node_count): - deadline = time.time() + 120 - while time.time() < deadline: - done = await cql.run_async(f"SELECT COUNT(*) FROM system_distributed.view_build_status WHERE status = 'SUCCESS' AND view_name = '{name}' ALLOW FILTERING") - if done[0][0] == node_count: - return - else: - time.sleep(0.2) - raise Exception("Timeout waiting for views to build") - - # This test makes sure that even if the view building 
encounter errors, the view building is eventually finished # and the view is consistent with the base table. # Reproduces the scenario in #19261 From 1fdc65279dfc84ddfc7cc36370eb5180744f65e5 Mon Sep 17 00:00:00 2001 From: Wojciech Mitros Date: Mon, 13 May 2024 13:02:53 +0200 Subject: [PATCH 3/3] test: add test for view backlog not being updated on correct shard This patch adds a test for reproducing issue https://github.com/scylladb/scylladb/issues/18542 The test performs writes on a table with a materialized view and checks that the view backlog increases. To get the current view update backlog, a new metric "view_update_backlog" is added to the `storage_proxy` metrics. The metric differs from the `database` metric with the same name by taking the backlog from the max_view_update_backlog, which keeps view update backlogs from all shards and may be a bit outdated, instead of taking the backlog by checking the view_update_semaphore which the backlog is based on directly. --- db/view/view.cc | 3 ++ service/storage_proxy.cc | 8 ++++- test/topology_custom/test_mv_backlog.py | 48 +++++++++++++++++++++++++ 3 files changed, 58 insertions(+), 1 deletion(-) create mode 100644 test/topology_custom/test_mv_backlog.py diff --git a/db/view/view.cc b/db/view/view.cc index 2432e95ae7..b7f8bb5532 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1681,6 +1681,9 @@ static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, locator std::move(tr_state), allow_hints, service::is_cancellable::yes); + while (utils::get_local_injector().enter("never_finish_remote_view_updates")) { + co_await seastar::sleep(100ms); + } } static bool should_update_synchronously(const schema& s) { diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 07d14427c6..e37f14bc94 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -2929,7 +2929,13 @@ storage_proxy::storage_proxy(distributed& db, storage_proxy::
sm::make_queue_length("current_throttled_writes", [this] { return _throttled_writes.size(); }, sm::description("number of currently throttled write requests")), }); - + _metrics.add_group(storage_proxy_stats::REPLICA_STATS_CATEGORY, { + sm::make_current_bytes("view_update_backlog", [this] { return _max_view_update_backlog.fetch_shard(this_shard_id()).get_current_bytes(); }, + sm::description("Tracks the size of scylla_database_view_update_backlog and is used instead of that one to calculate the " + "max backlog across all shards, which is then used by other nodes to calculate appropriate throttling delays " + "if it grows too large. If it's notably different from scylla_database_view_update_backlog, it means " + "that we're currently processing a write that generated a large number of view updates.")), + }); slogger.trace("hinted DCs: {}", cfg.hinted_handoff_enabled.to_configuration_string()); _hints_manager.register_metrics("hints_manager"); _hints_for_views_manager.register_metrics("hints_for_views_manager"); diff --git a/test/topology_custom/test_mv_backlog.py b/test/topology_custom/test_mv_backlog.py new file mode 100644 index 0000000000..2407acef6e --- /dev/null +++ b/test/topology_custom/test_mv_backlog.py @@ -0,0 +1,48 @@ +# +# Copyright (C) 2024-present ScyllaDB +# +# SPDX-License-Identifier: AGPL-3.0-or-later +# +from test.pylib.manager_client import ManagerClient + +import asyncio +import pytest +from test.topology.conftest import skip_mode +from test.pylib.util import wait_for_view +from test.topology_experimental_raft.test_mv_tablets import pin_the_only_tablet +from test.pylib.tablets import get_tablet_replica + +# This test reproduces issue #18542 +# In the test, we create a table and perform a write to it a couple of times +# Each time, we check that a view update backlog on some shard increased +# due to the write. 
+@pytest.mark.asyncio +@skip_mode('release', "error injections aren't enabled in release mode") +async def test_view_backlog_increased_after_write(manager: ManagerClient) -> None: + node_count = 2 + # Use a higher smp to make it more likely that the writes go to a different shard than the coordinator. + servers = await manager.servers_add(node_count, cmdline=['--smp', '5'], config={'error_injections_at_startup': ['never_finish_remote_view_updates'], 'enable_tablets': True}) + cql = manager.get_cql() + await cql.run_async("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}" + "AND tablets = {'initial': 1}") + await cql.run_async(f"CREATE TABLE ks.tab (base_key int, view_key int, v text, PRIMARY KEY (base_key, view_key))") + await cql.run_async(f"CREATE MATERIALIZED VIEW ks.mv_cf_view AS SELECT * FROM ks.tab " + "WHERE view_key IS NOT NULL and base_key IS NOT NULL PRIMARY KEY (view_key, base_key) ") + + await wait_for_view(cql, 'mv_cf_view', node_count) + # Only remote updates hold on to memory, so make the update remote + await pin_the_only_tablet(manager, "ks", "tab", servers[0]) + (_, shard) = await get_tablet_replica(manager, servers[0], "ks", "tab", 0) + await pin_the_only_tablet(manager, "ks", "mv_cf_view", servers[1]) + + for v in [1000, 4000, 16000, 64000, 256000]: + # Don't use a prepared statement, so that writes are likely sent to a different shard + # than the one containing the key. 
+ await cql.run_async(f"INSERT INTO ks.tab (base_key, view_key, v) VALUES ({v}, {v}, '{v*'a'}')") + # The view update backlog should increase on the node generating view updates + local_metrics = await manager.metrics.query(servers[0].ip_addr) + view_backlog = local_metrics.get('scylla_storage_proxy_replica_view_update_backlog', shard=str(shard)) + # The read view_backlog might still contain backlogs from the previous iterations, so we only assert that it is large enough + assert view_backlog > v + + await cql.run_async(f"DROP KEYSPACE ks")