mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-31 03:56:42 +00:00
This series is another approach to https://github.com/scylladb/scylladb/pull/18646 and https://github.com/scylladb/scylladb/pull/19181. In this series we only change where the view backlog gets updated. We do not ensure that the view update backlog returned in a response is necessarily the backlog that increased due to the corresponding write; the returned backlog may be outdated by up to 10ms. Because this series does not include that change, it is considerably less complex and it doesn't modify the common write path, so no particular performance considerations were needed in that context. The issue being fixed is still the same; the full description follows.

When a replica applies a write on a table which has a materialized view, it generates view updates. These updates take memory which is tracked by `database::_view_update_concurrency_sem`, separate on each shard. The ratio of units taken from the semaphore to the semaphore limit is the shard's view update backlog. Based on these backlogs, we want to estimate how busy a node is with its view update work. We do that by taking the max backlog across all shards.
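As a rough illustration of the two definitions above (per-shard backlog as a ratio, node backlog as the max over shards), here is a minimal Python sketch. `ShardSemaphore`, `shard_backlog` and `node_backlog` are hypothetical names made up for this example; they are not ScyllaDB APIs, and the real `update_backlog` type is a C++ struct rather than a float.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class ShardSemaphore:
    """Hypothetical stand-in for one shard's view update semaphore."""
    used_units: int  # units currently taken by pending view updates
    limit: int       # total units the semaphore allows


def shard_backlog(sem: ShardSemaphore) -> float:
    # The shard's backlog is the ratio of taken units to the limit.
    return sem.used_units / sem.limit


def node_backlog(shards: Iterable[ShardSemaphore]) -> float:
    # The node is considered as busy as its busiest shard.
    return max((shard_backlog(s) for s in shards), default=0.0)


shards = [ShardSemaphore(10, 100), ShardSemaphore(75, 100), ShardSemaphore(0, 100)]
print(node_backlog(shards))
```

Taking the max rather than the mean means a single overloaded shard is enough to mark the whole node as busy.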
To avoid excessive cross-shard operations, the node's (max) backlog isn't calculated each time we need it, but at most once per 10ms (the `_interval`), with an optimization where the backlog of the calculating shard is immediately up-to-date (we don't need cross-shard operations for it):

```
update_backlog node_update_backlog::fetch() {
    auto now = clock::now();
    if (now >= _last_update.load(std::memory_order_relaxed) + _interval) {
        _last_update.store(now, std::memory_order_relaxed);
        auto new_max = boost::accumulate(
                _backlogs,
                update_backlog::no_backlog(),
                [] (const update_backlog& lhs, const per_shard_backlog& rhs) {
                    return std::max(lhs, rhs.load());
                });
        _max.store(new_max, std::memory_order_relaxed);
        return new_max;
    }
    return std::max(fetch_shard(this_shard_id()), _max.load(std::memory_order_relaxed));
}
```

For the same reason, even when we do calculate the node's new backlog, we don't read from the `_view_update_concurrency_sem`. Instead, for each shard we also store an `update_backlog` atomic which we use for the calculation:

```
struct per_shard_backlog {
    // Multiply by 2 to defeat the prefetcher
    alignas(seastar::cache_line_size * 2) std::atomic<update_backlog> backlog = update_backlog::no_backlog();
    need_publishing need_publishing = need_publishing::no;

    update_backlog load() const {
        return backlog.load(std::memory_order_relaxed);
    }
};
std::vector<per_shard_backlog> _backlogs;
```

Due to this distinction, the `update_backlog` atomic needs to be updated separately whenever the `_view_update_concurrency_sem` changes.
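The caching behaviour of `fetch()` can be mimicked in a few lines of single-threaded Python. This is only a sketch under stated assumptions: `NodeBacklog`, its constructor parameters and the injectable `clock` are hypothetical, backlogs are plain floats, and no atomics or cache-line alignment are modelled.

```python
import time
from typing import Callable


class NodeBacklog:
    """Toy model of an interval-cached max over per-shard published backlogs."""

    def __init__(self, shard_count: int, interval: float = 0.010,
                 clock: Callable[[], float] = time.monotonic):
        self._published = [0.0] * shard_count  # one published value per shard
        self._interval = interval              # recalculation period in seconds
        self._clock = clock
        self._last_update = float("-inf")      # forces a recalculation on first fetch
        self._max = 0.0                        # cached node-wide max

    def add(self, shard: int, backlog: float) -> None:
        # Publish a shard's backlog so that other shards can see it.
        self._published[shard] = backlog

    def fetch(self, this_shard: int) -> float:
        now = self._clock()
        if now >= self._last_update + self._interval:
            # Interval elapsed: recompute the max over all published values.
            self._last_update = now
            self._max = max(self._published)
            return self._max
        # Otherwise combine the cached max with this shard's own fresh value.
        return max(self._published[this_shard], self._max)


fake_now = [0.0]
backlog = NodeBacklog(shard_count=2, clock=lambda: fake_now[0])
backlog.add(1, 0.5)
first = backlog.fetch(0)         # recalculates and sees shard 1's 0.5
backlog.add(1, 0.9)
fake_now[0] = 0.005
stale = backlog.fetch(0)         # within the interval: still the cached 0.5
own = backlog.fetch(1)           # shard 1 sees its own 0.9 immediately
fake_now[0] = 0.011
refreshed = backlog.fetch(0)     # interval elapsed: now 0.9
```

The sketch shows why a value read on another shard can lag by up to one interval, while the calling shard's own published value is always current.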
The atomic is updated by calling `storage_proxy::update_view_update_backlog`, which reads the `_view_update_concurrency_sem` of the shard (in `database::get_view_update_backlog`) and then calls `node_update_backlog::add`, where the read backlog is stored in the atomic:

```
void storage_proxy::update_view_update_backlog() {
    _max_view_update_backlog.add(get_db().local().get_view_update_backlog());
}

void node_update_backlog::add(update_backlog backlog) {
    _backlogs[this_shard_id()].backlog.store(backlog, std::memory_order_relaxed);
    _backlogs[this_shard_id()].need_publishing = need_publishing::yes;
}
```

For this implementation of calculating the node's view update backlog to work, we need the atomics to be updated correctly when the semaphores of the corresponding shards change.

The main event where the view update backlog changes is an incoming write request. That's why, when handling the request and preparing a response, we update the backlog by calling `storage_proxy::get_view_update_backlog` (also because we want to read the backlog and send it in the response):

backlog update after local view updates (`storage_proxy::send_to_live_endpoints` in `mutate_begin`)

```
auto lmutate = [handler_ptr, response_id, this, my_address, timeout] () mutable {
    return handler_ptr->apply_locally(timeout, handler_ptr->get_trace_state())
            .then([response_id, this, my_address, h = std::move(handler_ptr), p = shared_from_this()] {
        // make mutation alive until it is processed locally, otherwise it
        // may disappear if write timeouts before this future is ready
        got_response(response_id, my_address, get_view_update_backlog());
    });
};
```

backlog update after remote view updates (`storage_proxy::remote::handle_write`)

```
auto f = co_await coroutine::as_future(send_mutation_done(netw::messaging_service::msg_addr{reply_to, shard}, trace_state_ptr,
        shard, response_id, p->get_view_update_backlog()));
```

Now assume that on a certain node we have a write request received on shard A, which updates a row on shard B (A!=B).
As a result, shard B will generate view updates and consume units from its `_view_update_concurrency_sem`, but will not update its atomic in `_backlogs` yet. Because both shards in the example are on the same node, shard A will perform a local write calling `lmutate` shown above. In the `lmutate` call, `apply_locally` will initiate the actual write on shard B and `storage_proxy::update_view_update_backlog` will be called back on shard A. Nowhere does the backlog atomic on shard B get updated, even though the backlog increased due to the view updates generated there.

Currently, what we calculate there doesn't really matter: it's only used for the MV flow control delays, so in this scenario we may only overload a replica, causing failed replica writes which will later be retried as hints. However, when we add MV admission control, the calculated backlog will be the difference between an accepted and a rejected request.

Fixes: https://github.com/scylladb/scylladb/issues/18542

Without admission control (https://github.com/scylladb/scylladb/pull/18334), this patch doesn't affect much, so I'm marking it as backport/none

Closes scylladb/scylladb#19341

* github.com:scylladb/scylladb:
  test: add test for view backlog not being updated on correct shard
  test: move auxiliary methods for waiting until a view is built to util
  mv: update view update backlog when it increases on correct shard
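The cross-shard scenario from the description can be reproduced with a toy model. This is a hedged sketch, not the actual fix: `Shard`, `apply_write` and the `publish_on_owner` flag are hypothetical names, and the "fixed" branch only mirrors the intent stated in the commit title (update the backlog on the shard where it increases).

```python
class Shard:
    """Toy shard: a semaphore counter plus a published value (the "atomic")."""

    def __init__(self, limit: int = 100):
        self.limit = limit
        self.used = 0         # units held in this shard's semaphore
        self.published = 0.0  # value that other shards can read

    def publish(self) -> None:
        # Copy the semaphore-derived backlog into the published value.
        self.published = self.used / self.limit


def apply_write(shards, receiving: int, owning: int, units: int,
                publish_on_owner: bool) -> float:
    # The view updates are generated on the shard that owns the row.
    shards[owning].used += units
    if publish_on_owner:
        # Intended behaviour: publish where the backlog actually grew.
        shards[owning].publish()
    # The response is prepared on the receiving shard, which publishes its own value.
    shards[receiving].publish()
    # Node backlog as seen through the published values.
    return max(s.published for s in shards)


def node_backlog_after_write(publish_on_owner: bool) -> float:
    shards = [Shard(), Shard()]
    return apply_write(shards, receiving=0, owning=1, units=40,
                       publish_on_owner=publish_on_owner)


print(node_backlog_after_write(publish_on_owner=False))
print(node_backlog_after_write(publish_on_owner=True))
```

With `publish_on_owner=False` the node reports no backlog even though shard 1 holds 40% of its units, which is the situation the series addresses.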
251 lines
8.7 KiB
Python
#
# Copyright (C) 2022-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
import threading
import time
import asyncio
import logging
import pathlib
import os
import pytest
import random
import string

from typing import Callable, Awaitable, Optional, TypeVar, Any

from cassandra.cluster import NoHostAvailable, Session, Cluster  # type: ignore # pylint: disable=no-name-in-module
from cassandra.protocol import InvalidRequest  # type: ignore # pylint: disable=no-name-in-module
from cassandra.pool import Host  # type: ignore # pylint: disable=no-name-in-module
from cassandra import DriverException, ConsistencyLevel  # type: ignore # pylint: disable=no-name-in-module

from test.pylib.internal_types import ServerInfo


logger = logging.getLogger(__name__)


class LogPrefixAdapter(logging.LoggerAdapter):
    def process(self, msg, kwargs):
        return '[%s] %s' % (self.extra['prefix'], msg), kwargs


T = TypeVar('T')


def unique_name(unique_name_prefix = 'test_'):
    if not hasattr(unique_name, "last_ms"):
        unique_name.last_ms = 0
    current_ms = int(round(time.time() * 1000))
    # If unique_name() is called twice in the same millisecond...
    if unique_name.last_ms >= current_ms:
        current_ms = unique_name.last_ms + 1
    unique_name.last_ms = current_ms
    return unique_name_prefix + str(current_ms) + '_' + ''.join(random.choice(string.ascii_lowercase) for _ in range(5))


async def wait_for(
        pred: Callable[[], Awaitable[Optional[T]]],
        deadline: float,
        period: float = 1,
        before_retry: Optional[Callable[[], Any]] = None) -> T:
    while True:
        assert(time.time() < deadline), "Deadline exceeded, failing test."
        res = await pred()
        if res is not None:
            return res
        await asyncio.sleep(period)
        if before_retry:
            before_retry()


async def wait_for_cql(cql: Session, host: Host, deadline: float) -> None:
    async def cql_ready():
        try:
            await cql.run_async("select * from system.local", host=host)
        except NoHostAvailable:
            logging.info(f"Driver not connected to {host} yet")
            return None
        return True
    await wait_for(cql_ready, deadline)


async def wait_for_cql_and_get_hosts(cql: Session, servers: list[ServerInfo], deadline: float) \
        -> list[Host]:
    """Wait until every server in `servers` is available through `cql`
       and translate `servers` to a list of Cassandra `Host`s.
    """
    ip_set = set(str(srv.rpc_address) for srv in servers)
    async def get_hosts() -> Optional[list[Host]]:
        hosts = cql.cluster.metadata.all_hosts()
        remaining = ip_set - {h.address for h in hosts}
        if not remaining:
            return hosts

        logging.info(f"Driver hasn't yet learned about hosts: {remaining}")
        return None
    def try_refresh_nodes():
        try:
            cql.cluster.refresh_nodes(force_token_rebuild=True)
        except DriverException:
            # Silence the exception, which might get thrown if we call this in the middle of
            # driver reconnect (scylladb/scylladb#17616). `wait_for` will retry anyway and it's enough
            # if we succeed only one `get_hosts()` attempt before timing out.
            pass
    hosts = await wait_for(
        pred=get_hosts,
        deadline=deadline,
        before_retry=try_refresh_nodes,
    )

    # Take only hosts from `ip_set` (there may be more)
    hosts = [h for h in hosts if h.address in ip_set]
    await asyncio.gather(*(wait_for_cql(cql, h, deadline) for h in hosts))

    return hosts

def read_last_line(file_path: pathlib.Path, max_line_bytes = 512):
    file_size = os.stat(file_path).st_size
    with file_path.open('rb') as f:
        f.seek(max(0, file_size - max_line_bytes), os.SEEK_SET)
        line_bytes = f.read()
    line_str = line_bytes.decode('utf-8', errors='ignore')
    linesep = os.linesep
    if line_str.endswith(linesep):
        line_str = line_str[:-len(linesep)]
    linesep_index = line_str.rfind(linesep)
    if linesep_index != -1:
        line_str = line_str[linesep_index + len(linesep):]
    elif file_size > max_line_bytes:
        line_str = '...' + line_str
    return line_str


async def get_available_host(cql: Session, deadline: float) -> Host:
    hosts = cql.cluster.metadata.all_hosts()
    async def find_host():
        for h in hosts:
            try:
                await cql.run_async(
                    "select key from system.local where key = 'local'", host=h)
            except NoHostAvailable:
                logging.debug(f"get_available_host: {h} not available")
                continue
            return h
        return None
    return await wait_for(find_host, deadline)


async def read_barrier(cql: Session, host: Host):
    """To issue a read barrier it is sufficient to attempt dropping a
    non-existing table. We need to use `if exists`, otherwise the statement
    would fail on prepare/validate step which happens before a read barrier is
    performed.
    """
    await cql.run_async("drop table if exists nosuchkeyspace.nosuchtable", host = host)


# Wait for the given feature to be enabled.
async def wait_for_feature(feature: str, cql: Session, host: Host, deadline: float) -> None:
    async def feature_is_enabled():
        enabled_features = await get_enabled_features(cql, host)
        return feature in enabled_features or None
    await wait_for(feature_is_enabled, deadline)


async def get_supported_features(cql: Session, host: Host) -> set[str]:
    """Returns a set of cluster features that a node advertises support for."""
    rs = await cql.run_async(f"SELECT supported_features FROM system.local WHERE key = 'local'", host=host)
    return set(rs[0].supported_features.split(","))


async def get_enabled_features(cql: Session, host: Host) -> set[str]:
    """Returns a set of cluster features that a node considers to be enabled."""
    rs = await cql.run_async(f"SELECT value FROM system.scylla_local WHERE key = 'enabled_features'", host=host)
    return set(rs[0].value.split(","))


class KeyGenerator:
    def __init__(self):
        self.pk = None
        self.pk_lock = threading.Lock()

    def next_pk(self):
        with self.pk_lock:
            if self.pk is not None:
                self.pk += 1
            else:
                self.pk = 0
            return self.pk

    def last_pk(self):
        with self.pk_lock:
            return self.pk


async def start_writes(cql: Session, keyspace: str, table: str, concurrency: int = 3, ignore_errors=False):
    logger.info(f"Starting to asynchronously write, concurrency = {concurrency}")

    stop_event = asyncio.Event()

    warmup_writes = 128 // concurrency
    warmup_event = asyncio.Event()

    stmt = cql.prepare(f"INSERT INTO {keyspace}.{table} (pk, c) VALUES (?, ?)")
    stmt.consistency_level = ConsistencyLevel.QUORUM
    rd_stmt = cql.prepare(f"SELECT * FROM {keyspace}.{table} WHERE pk = ?")
    rd_stmt.consistency_level = ConsistencyLevel.QUORUM

    key_gen = KeyGenerator()

    async def do_writes(worker_id: int):
        write_count = 0
        while not stop_event.is_set():
            pk = key_gen.next_pk()

            # Once next_pk() is produced, key_gen.last_pk() is assumed to be in the database
            # hence we can't give up on it.
            while True:
                try:
                    await cql.run_async(stmt, [pk, pk])
                    # Check read-your-writes
                    rows = await cql.run_async(rd_stmt, [pk])
                    assert(len(rows) == 1)
                    assert(rows[0].c == pk)
                    write_count += 1
                    break
                except Exception as e:
                    if ignore_errors:
                        pass # Expected when node is brought down temporarily
                    else:
                        raise e

            if pk == warmup_writes:
                warmup_event.set()

        logger.info(f"Worker #{worker_id} did {write_count} successful writes")

    tasks = [asyncio.create_task(do_writes(worker_id)) for worker_id in range(concurrency)]

    await asyncio.wait_for(warmup_event.wait(), timeout=60)

    async def finish():
        logger.info("Stopping workers")
        stop_event.set()
        await asyncio.gather(*tasks)

        last = key_gen.last_pk()
        if last is not None:
            return last + 1
        return 0

    return finish

async def wait_for_view(cql: Session, name: str, node_count: int, timeout: int = 120):
    async def view_is_built():
        done = await cql.run_async(f"SELECT COUNT(*) FROM system_distributed.view_build_status WHERE status = 'SUCCESS' AND view_name = '{name}' ALLOW FILTERING")
        return done[0][0] == node_count or None
    deadline = time.time() + timeout
    await wait_for(view_is_built, deadline)