Merge 'storage_proxy: update view update backlog on correct shard when writing' from Wojciech Mitros

This series is another approach to https://github.com/scylladb/scylladb/pull/18646 and https://github.com/scylladb/scylladb/pull/19181. In this series we only change where the view update backlog gets
updated - we do not ensure that the view update backlog returned in a response is necessarily the backlog
that increased due to the corresponding write; the returned backlog may be outdated by up to 10ms. Because
this series does not include that change, it's considerably less complex and it doesn't modify the common
write path, so no particular performance considerations were needed in that context. The issue being fixed
is still the same; the full description can be found below.

When a replica applies a write to a table which has a materialized view,
it generates view updates. These updates take memory, which is tracked
by `database::_view_update_concurrency_sem`, separately on each shard.
The ratio of units taken from the semaphore to the semaphore's limit
is the shard's view update backlog. Based on these backlogs, we want
to estimate how busy a node is with its view update work. We do that
by taking the max backlog across all shards.
To avoid excessive cross-shard operations, the node's (max) backlog isn't
recalculated each time we need it, but at most once per 10ms (the `_interval`),
with an optimization: the backlog of the calculating shard is always
up to date, since no cross-shard operations are needed to read it:
```
update_backlog node_update_backlog::fetch() {
    auto now = clock::now();
    if (now >= _last_update.load(std::memory_order_relaxed) + _interval) {
        _last_update.store(now, std::memory_order_relaxed);
        auto new_max = boost::accumulate(
                _backlogs,
                update_backlog::no_backlog(),
                [] (const update_backlog& lhs, const per_shard_backlog& rhs) {
                    return std::max(lhs, rhs.load());
                });
        _max.store(new_max, std::memory_order_relaxed);
        return new_max;
    }
    return std::max(fetch_shard(this_shard_id()), _max.load(std::memory_order_relaxed));
}
```
For the same reason, even when we do recalculate the node's backlog,
we don't read from `_view_update_concurrency_sem` directly. Instead, for
each shard we also store an `update_backlog` atomic which we use for
the calculation:
```
struct per_shard_backlog {
    // Multiply by 2 to defeat the prefetcher
    alignas(seastar::cache_line_size * 2) std::atomic<update_backlog> backlog = update_backlog::no_backlog();
    need_publishing need_publishing = need_publishing::no;

    update_backlog load() const {
        return backlog.load(std::memory_order_relaxed);
    }
};

std::vector<per_shard_backlog> _backlogs;
```
Due to this distinction, the `update_backlog` atomic needs to be updated
separately whenever `_view_update_concurrency_sem` changes.
This is done by calling `storage_proxy::update_view_update_backlog`, which
reads the `_view_update_concurrency_sem` of the current shard (in
`database::get_view_update_backlog`) and then calls `node_update_backlog::add`,
where the read backlog is stored in the atomic:
```
void storage_proxy::update_view_update_backlog() {
    _max_view_update_backlog.add(get_db().local().get_view_update_backlog());
}
void node_update_backlog::add(update_backlog backlog) {
    _backlogs[this_shard_id()].backlog.store(backlog, std::memory_order_relaxed);
    _backlogs[this_shard_id()].need_publishing = need_publishing::yes;
}
```
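The bookkeeping behind `database::get_view_update_backlog` can be sketched as a fraction of semaphore units taken (a minimal model; the names `limit` and `used` are illustrative stand-ins for the units of `_view_update_concurrency_sem`, not Scylla's real API):

```python
# Toy model: a shard's view update backlog is the fraction of
# view-update-memory semaphore units currently consumed.

class ViewUpdateSemaphore:
    def __init__(self, limit: int):
        self.limit = limit      # total units available for view updates
        self.used = 0           # units consumed by in-flight view updates

    def consume(self, units: int) -> None:
        self.used += units

    def release(self, units: int) -> None:
        self.used -= units

    def backlog(self) -> float:
        # The ratio of taken units to the semaphore's limit.
        return self.used / self.limit

sem = ViewUpdateSemaphore(limit=1000)
sem.consume(250)
print(sem.backlog())  # 0.25
```

This is the per-shard value that gets published into the `_backlogs` atomic by `node_update_backlog::add`.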
For this scheme of calculating the node's view update backlog to work,
the atomics must be updated correctly whenever the semaphores of the
corresponding shards change.

The main event where the view update backlog changes is an incoming write
request. That's why when handling the request and preparing a response
we update the backlog by calling `storage_proxy::update_view_update_backlog` (conveniently, as we also want to read the backlog via `storage_proxy::get_view_update_backlog` and send it in the response):

Backlog update after local view updates (`storage_proxy::send_to_live_endpoints`, in `mutate_begin`):
```
 auto lmutate = [handler_ptr, response_id, this, my_address, timeout] () mutable {
     return handler_ptr->apply_locally(timeout, handler_ptr->get_trace_state())
             .then([response_id, this, my_address, h = std::move(handler_ptr), p = shared_from_this()] {
         // make mutation alive until it is processed locally, otherwise it
         // may disappear if write timeouts before this future is ready
         update_view_update_backlog();
         got_response(response_id, my_address, get_view_update_backlog());
     });
 };
```

Backlog update after remote view updates (`storage_proxy::remote::handle_write`):
```
 auto f = co_await coroutine::as_future(send_mutation_done(netw::messaging_service::msg_addr{reply_to, shard}, trace_state_ptr,
         shard, response_id, p->get_view_update_backlog()));
```
Now assume that on a certain node we have a write request received on shard A,
which updates a row on shard B (A!=B). As a result, shard B will generate view
updates and consume units from its `_view_update_concurrency_sem`, but will
not update its atomic in `_backlogs` yet. Because both shards in the example
are on the same node, shard A will perform a local write calling `lmutate` shown
above. In the `lmutate` call, `apply_locally` initiates the actual write on
shard B, and `storage_proxy::update_view_update_backlog` is then called back
on shard A. Nowhere does the backlog atomic on shard B get updated, even
though it grew due to the view updates generated there.
Today this miscalculation has limited impact - the backlog is only used for
MV flow control delays, so in this scenario we may merely overload
a replica, causing failed replica writes that will later be retried as hints. However,
once we add MV admission control, the calculated backlog will be the difference
between an accepted and a rejected request.
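The scenario above can be sketched with a toy model (pure Python with hypothetical names; shards are just list slots and the 10ms interval is ignored): the coordinator shard A republishes its own, unchanged backlog, while shard B's semaphore grew without its published atomic being refreshed, so the node-wide max stays stale:

```python
# Toy model of the bug: per-shard published backlogs vs. actual
# semaphore state. Names are illustrative, not Scylla's real classes.

A, B = 0, 1

actual_backlog = [0.0, 0.0]     # what each shard's semaphore really says
published = [0.0, 0.0]          # the per-shard atomics in _backlogs

def apply_write_on(shard: int, units: float) -> None:
    # The replica shard consumes semaphore units for its view updates...
    actual_backlog[shard] += units

def update_view_update_backlog(calling_shard: int) -> None:
    # ...but before the fix, only the coordinator shard republished
    # its own backlog after the write completed.
    published[calling_shard] = actual_backlog[calling_shard]

def node_max_backlog() -> float:
    # Cheap path: max over the published per-shard atomics.
    return max(published)

# Write arrives on shard A, but the row lives on shard B.
apply_write_on(B, units=0.5)
update_view_update_backlog(A)      # called back on shard A, not B!

assert actual_backlog[B] == 0.5    # shard B is genuinely loaded...
assert node_max_backlog() == 0.0   # ...but the node-wide max never sees it

# The fix: republish on the shard whose semaphore actually changed.
update_view_update_backlog(B)
assert node_max_backlog() == 0.5
```

This is why the series moves the `update_view_update_backlog` call to the shard that generated the view updates.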

Fixes: https://github.com/scylladb/scylladb/issues/18542

Without admission control (https://github.com/scylladb/scylladb/pull/18334), this patch has little effect, so I'm marking it as backport/none.

Closes scylladb/scylladb#19341

* github.com:scylladb/scylladb:
  test: add test for view backlog not being updated on correct shard
  test: move auxiliary methods for waiting until a view is built to util
  mv: update view update backlog when it increases on correct shard
This commit is contained in:
Nadav Har'El
2024-07-04 11:40:09 +03:00
7 changed files with 73 additions and 28 deletions

```
@@ -1695,6 +1695,9 @@ static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, locator
            std::move(tr_state),
            allow_hints,
            service::is_cancellable::yes);
    while (utils::get_local_injector().enter("never_finish_remote_view_updates")) {
        co_await seastar::sleep(100ms);
    }
}

static bool should_update_synchronously(const schema& s) {
@@ -2660,6 +2663,10 @@ update_backlog node_update_backlog::fetch() {
        _max.store(new_max, std::memory_order_relaxed);
        return new_max;
    }
    // If we perform a shard-aware write, we can read the backlog of the current shard,
    // which was just updated.
    // We still need to compare it to the max, aggregated from all shards, which might
    // still be higher despite being most likely slightly outdated.
    return std::max(fetch_shard(this_shard_id()), _max.load(std::memory_order_relaxed));
}
```

```
@@ -476,6 +476,7 @@ future<> view_update_generator::generate_and_propagate_view_updates(const replic
        }
    }
    co_await builder.close();
    _proxy.local().update_view_update_backlog();
    if (err) {
        std::rethrow_exception(err);
    }
```

```
@@ -542,7 +542,6 @@ private:
        //
        // Usually we will return immediately, since this work only involves appending data to the connection
        // send buffer.
        p->update_view_update_backlog();
        auto f = co_await coroutine::as_future(send_mutation_done(netw::messaging_service::msg_addr{reply_to, shard}, trace_state_ptr,
                shard, response_id, p->get_view_update_backlog()));
        f.ignore_ready_future();
@@ -581,7 +580,6 @@ private:
        }
        // ignore results, since we'll be returning them via MUTATION_DONE/MUTATION_FAILURE verbs
        if (errors.count) {
            p->update_view_update_backlog();
            auto f = co_await coroutine::as_future(send_mutation_failed(
                    netw::messaging_service::msg_addr{reply_to, shard},
                    trace_state_ptr,
@@ -2931,7 +2929,13 @@ storage_proxy::storage_proxy(distributed<replica::database>& db, storage_proxy::
        sm::make_queue_length("current_throttled_writes", [this] { return _throttled_writes.size(); },
                sm::description("number of currently throttled write requests")),
    });
    _metrics.add_group(storage_proxy_stats::REPLICA_STATS_CATEGORY, {
        sm::make_current_bytes("view_update_backlog", [this] { return _max_view_update_backlog.fetch_shard(this_shard_id()).get_current_bytes(); },
                sm::description("Tracks the size of scylla_database_view_update_backlog and is used instead of that one to calculate the "
                        "max backlog across all shards, which is then used by other nodes to calculate appropriate throttling delays "
                        "if it grows too large. If it's notably different from scylla_database_view_update_backlog, it means "
                        "that we're currently processing a write that generated a large number of view updates.")),
    });
    slogger.trace("hinted DCs: {}", cfg.hinted_handoff_enabled.to_configuration_string());
    _hints_manager.register_metrics("hints_manager");
    _hints_for_views_manager.register_metrics("hints_for_views_manager");
@@ -4108,7 +4112,6 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
            .then([response_id, this, my_address, h = std::move(handler_ptr), p = shared_from_this()] {
        // make mutation alive until it is processed locally, otherwise it
        // may disappear if write timeouts before this future is ready
        update_view_update_backlog();
        got_response(response_id, my_address, get_view_update_backlog());
    });
};
```

```
@@ -241,3 +241,10 @@ async def start_writes(cql: Session, keyspace: str, table: str, concurrency: int
            return 0
    return finish

async def wait_for_view(cql: Session, name: str, node_count: int, timeout: int = 120):
    async def view_is_built():
        done = await cql.run_async(f"SELECT COUNT(*) FROM system_distributed.view_build_status WHERE status = 'SUCCESS' AND view_name = '{name}' ALLOW FILTERING")
        return done[0][0] == node_count or None
    deadline = time.time() + timeout
    await wait_for(view_is_built, deadline)
```

```
@@ -0,0 +1,48 @@
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
from test.pylib.manager_client import ManagerClient
import asyncio
import pytest
from test.topology.conftest import skip_mode
from test.pylib.util import wait_for_view
from test.topology_experimental_raft.test_mv_tablets import pin_the_only_tablet
from test.pylib.tablets import get_tablet_replica

# This test reproduces issue #18542
# In the test, we create a table and perform a write to it a couple of times.
# Each time, we check that a view update backlog on some shard increased
# due to the write.
@pytest.mark.asyncio
@skip_mode('release', "error injections aren't enabled in release mode")
async def test_view_backlog_increased_after_write(manager: ManagerClient) -> None:
    node_count = 2
    # Use a higher smp to make it more likely that the writes go to a different shard than the coordinator.
    servers = await manager.servers_add(node_count, cmdline=['--smp', '5'], config={'error_injections_at_startup': ['never_finish_remote_view_updates'], 'enable_tablets': True})
    cql = manager.get_cql()
    await cql.run_async("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} "
                        "AND tablets = {'initial': 1}")
    await cql.run_async("CREATE TABLE ks.tab (base_key int, view_key int, v text, PRIMARY KEY (base_key, view_key))")
    await cql.run_async("CREATE MATERIALIZED VIEW ks.mv_cf_view AS SELECT * FROM ks.tab "
                        "WHERE view_key IS NOT NULL and base_key IS NOT NULL PRIMARY KEY (view_key, base_key) ")
    await wait_for_view(cql, 'mv_cf_view', node_count)
    # Only remote updates hold on to memory, so make the update remote
    await pin_the_only_tablet(manager, "ks", "tab", servers[0])
    (_, shard) = await get_tablet_replica(manager, servers[0], "ks", "tab", 0)
    await pin_the_only_tablet(manager, "ks", "mv_cf_view", servers[1])
    for v in [1000, 4000, 16000, 64000, 256000]:
        # Don't use a prepared statement, so that writes are likely sent to a different shard
        # than the one containing the key.
        await cql.run_async(f"INSERT INTO ks.tab (base_key, view_key, v) VALUES ({v}, {v}, '{v*'a'}')")
        # The view update backlog should increase on the node generating view updates
        local_metrics = await manager.metrics.query(servers[0].ip_addr)
        view_backlog = local_metrics.get('scylla_storage_proxy_replica_view_update_backlog', shard=str(shard))
        # The read view_backlog might still contain backlogs from the previous iterations, so we only assert that it is large enough
        assert view_backlog > v
    await cql.run_async("DROP KEYSPACE ks")
```

```
@@ -10,6 +10,7 @@ import pytest
import time
import logging
from test.topology.conftest import skip_mode
from test.pylib.util import wait_for_view
from cassandra.cqltypes import Int32Type

logger = logging.getLogger(__name__)
@@ -41,17 +42,6 @@ async def insert_with_concurrency(cql, value_count, concurrency):
    await asyncio.gather(*tasks)
    logger.info(f"Finished writes with concurrency {concurrency}")

async def wait_for_views(cql, mvs_count, node_count):
    deadline = time.time() + 120
    while time.time() < deadline:
        done = await cql.run_async(f"SELECT COUNT(*) FROM system_distributed.view_build_status WHERE status = 'SUCCESS' ALLOW FILTERING")
        logger.info(f"Views built: {done[0][0]}")
        if done[0][0] == node_count * mvs_count:
            return
        else:
            time.sleep(0.2)
    raise Exception("Timeout waiting for views to build")

# This test reproduces issue #12379
# To quickly exceed the view update backlog limit, the test uses a minimal, 2 node
# cluster and lowers the limit using the "view_update_limit" error injection.
@@ -78,7 +68,7 @@ async def test_delete_partition_rows_from_table_with_mv(manager: ManagerClient)
    await cql.run_async(f"CREATE MATERIALIZED VIEW ks.mv_cf_view AS SELECT * FROM ks.tab "
                        "WHERE c IS NOT NULL and key IS NOT NULL PRIMARY KEY (c, key) ")
    await wait_for_views(cql, 1, node_count)
    await wait_for_view(cql, "mv_cf_view", node_count)
    logger.info(f"Deleting all rows from partition with key 0")
    await cql.run_async(f"DELETE FROM ks.tab WHERE key = 0", timeout=300)
```

```
@@ -8,22 +8,11 @@ import pytest
import time
from test.topology.conftest import skip_mode
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for_view
from cassandra.cluster import ConsistencyLevel  # type: ignore
from cassandra.query import SimpleStatement  # type: ignore

async def wait_for_view(cql, name, node_count):
    deadline = time.time() + 120
    while time.time() < deadline:
        done = await cql.run_async(f"SELECT COUNT(*) FROM system_distributed.view_build_status WHERE status = 'SUCCESS' AND view_name = '{name}' ALLOW FILTERING")
        if done[0][0] == node_count:
            return
        else:
            time.sleep(0.2)
    raise Exception("Timeout waiting for views to build")

# This test makes sure that even if the view building encounters errors, the view building is eventually finished
# and the view is consistent with the base table.
# Reproduces the scenario in #19261
```