mirror of https://github.com/scylladb/scylladb.git
synced 2026-04-20 00:20:47 +00:00

Compare commits: copilot/fi... vs. SCYLLADB-1

2 Commits

| Author | SHA1 | Date |
|---|---|---|
| | 3ce2a2a479 | |
| | 2cdd178379 | |
@@ -7,11 +7,6 @@ on:
- synchronize
- reopened

permissions:
contents: read
pull-requests: write
statuses: write

jobs:
validate_pr_author_email:
uses: scylladb/github-automation/.github/workflows/validate_pr_author_email.yml@main
@@ -2,12 +2,6 @@ cmake_minimum_required(VERSION 3.27)

project(scylla)

# Disable CMake's automatic -fcolor-diagnostics injection (CMake 3.24+ adds
# it for Clang+Ninja). configure.py does not add any color diagnostics flags,
# so we clear the internal CMake variable to prevent injection.
set(CMAKE_CXX_COMPILE_OPTIONS_COLOR_DIAGNOSTICS "")
set(CMAKE_C_COMPILE_OPTIONS_COLOR_DIAGNOSTICS "")

list(APPEND CMAKE_MODULE_PATH
${CMAKE_CURRENT_SOURCE_DIR}/cmake
${CMAKE_CURRENT_SOURCE_DIR}/seastar/cmake)
@@ -57,16 +51,6 @@ set(CMAKE_CXX_EXTENSIONS ON CACHE INTERNAL "")
set(CMAKE_CXX_SCAN_FOR_MODULES OFF CACHE INTERNAL "")
set(CMAKE_VISIBILITY_INLINES_HIDDEN ON)

# Global defines matching configure.py
# Since gcc 13, libgcc doesn't need the exception workaround
add_compile_definitions(SEASTAR_NO_EXCEPTION_HACK)
# Hacks needed to expose internal APIs for xxhash dependencies
add_compile_definitions(XXH_PRIVATE_API)
# SEASTAR_TESTING_MAIN is added later (after add_subdirectory(seastar) and
# add_subdirectory(abseil)) to avoid leaking into the seastar subdirectory.
# If SEASTAR_TESTING_MAIN is defined globally before seastar, it causes a
# duplicate 'main' symbol in seastar_testing.

if(is_multi_config)
find_package(Seastar)
# this is atypical compared to standard ExternalProject usage:
@@ -114,31 +98,10 @@ else()
set(Seastar_IO_URING ON CACHE BOOL "" FORCE)
set(Seastar_SCHEDULING_GROUPS_COUNT 21 CACHE STRING "" FORCE)
set(Seastar_UNUSED_RESULT_ERROR ON CACHE BOOL "" FORCE)
# Match configure.py's build_seastar_shared_libs: Debug and Dev
# build Seastar as a shared library, others build it static.
if(CMAKE_BUILD_TYPE STREQUAL "Debug" OR CMAKE_BUILD_TYPE STREQUAL "Dev")
set(BUILD_SHARED_LIBS ON CACHE BOOL "" FORCE)
else()
set(BUILD_SHARED_LIBS OFF CACHE BOOL "" FORCE)
endif()
add_subdirectory(seastar)

# Coverage mode sets cmake_build_type='Debug' for Seastar
# (configure.py:515), so Seastar's pkg-config output includes sanitizer
# link flags in seastar_libs_coverage (configure.py:2514,2649).
# Seastar's own CMake only activates sanitizer targets for Debug/Sanitize
# configs, so we inject link options on the seastar target for Coverage.
# Using PUBLIC ensures they propagate to all targets linking Seastar
# (but not standalone tools like patchelf), matching configure.py's
# behavior. Compile-time flags and defines are handled globally in
# cmake/mode.Coverage.cmake.
if(CMAKE_BUILD_TYPE STREQUAL "Coverage")
target_link_options(seastar
PUBLIC
-fsanitize=address
-fsanitize=undefined
-fsanitize=vptr)
endif()
target_compile_definitions (seastar
PRIVATE
SEASTAR_NO_EXCEPTION_HACK)
endif()

set(ABSL_PROPAGATE_CXX_STD ON CACHE BOOL "" FORCE)
@@ -148,10 +111,8 @@ if(Scylla_ENABLE_LTO)
endif()

find_package(Sanitizers QUIET)
# Match configure.py:2192 — abseil gets sanitizer flags with -fno-sanitize=vptr
# to exclude vptr checks which are incompatible with abseil's usage.
list(APPEND absl_cxx_flags
$<$<CONFIG:Debug,Sanitize>:$<TARGET_PROPERTY:Sanitizers::address,INTERFACE_COMPILE_OPTIONS>;$<TARGET_PROPERTY:Sanitizers::undefined_behavior,INTERFACE_COMPILE_OPTIONS>;-fno-sanitize=vptr>)
$<$<CONFIG:Debug,Sanitize>:$<TARGET_PROPERTY:Sanitizers::address,INTERFACE_COMPILE_OPTIONS>;$<TARGET_PROPERTY:Sanitizers::undefined_behavior,INTERFACE_COMPILE_OPTIONS>>)
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
list(APPEND ABSL_GCC_FLAGS ${absl_cxx_flags})
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
@@ -176,38 +137,9 @@ add_library(absl::headers ALIAS absl-headers)
# unfortunately.
set_target_properties(absl_strerror PROPERTIES EXCLUDE_FROM_ALL TRUE)

# Now that seastar and abseil subdirectories are fully processed, add
# SEASTAR_TESTING_MAIN globally. This matches configure.py's global define
# without leaking into seastar (which would cause duplicate main symbols).
add_compile_definitions(SEASTAR_TESTING_MAIN)

# System libraries dependencies
find_package(Boost REQUIRED
COMPONENTS filesystem program_options system thread regex unit_test_framework)
# When using shared Boost libraries, define BOOST_ALL_DYN_LINK (matching configure.py)
if(NOT Boost_USE_STATIC_LIBS)
add_compile_definitions(BOOST_ALL_DYN_LINK)
endif()

# CMake's Boost package config adds per-component defines like
# BOOST_UNIT_TEST_FRAMEWORK_DYN_LINK, BOOST_REGEX_DYN_LINK, etc. on the
# imported targets. configure.py only uses BOOST_ALL_DYN_LINK (which covers
# all components), so strip the per-component defines to align the two build
# systems.
foreach(_boost_target
Boost::unit_test_framework
Boost::regex
Boost::filesystem
Boost::program_options
Boost::system
Boost::thread)
if(TARGET ${_boost_target})
# Completely remove all INTERFACE_COMPILE_DEFINITIONS from the Boost target.
# This prevents per-component *_DYN_LINK and *_NO_LIB defines from
# propagating. BOOST_ALL_DYN_LINK (set globally) covers all components.
set_property(TARGET ${_boost_target} PROPERTY INTERFACE_COMPILE_DEFINITIONS)
endif()
endforeach()
target_link_libraries(Boost::regex
INTERFACE
ICU::i18n
@@ -264,10 +196,6 @@ if (Scylla_USE_PRECOMPILED_HEADER)
message(STATUS "Using precompiled header for Scylla - remember to add `sloppiness = pch_defines,time_macros` to ccache.conf, if you're using ccache.")
target_precompile_headers(scylla-precompiled-header PRIVATE "stdafx.hh")
target_compile_definitions(scylla-precompiled-header PRIVATE SCYLLA_USE_PRECOMPILED_HEADER)
# Match configure.py: -fpch-validate-input-files-content tells the compiler
# to check content of stdafx.hh if timestamps don't match (important for
# ccache/git workflows where timestamps may not be preserved).
add_compile_options(-fpch-validate-input-files-content)
endif()
else()
set(Scylla_USE_PRECOMPILED_HEADER_USE OFF)
Submodule abseil updated: 255c84dadd...d7aaad83b4
@@ -699,17 +699,6 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
// for such a size.
co_return api_error::payload_too_large(fmt::format("Request content length limit of {} bytes exceeded", request_content_length_limit));
}
// Check the concurrency limit early, before acquiring memory and
// reading the request body, to avoid piling up memory from excess
// requests that will be rejected anyway. This mirrors the CQL
// transport which also checks concurrency before memory acquisition
// (transport/server.cc).
if (_pending_requests.get_count() >= _max_concurrent_requests) {
_executor._stats.requests_shed++;
co_return api_error::request_limit_exceeded(format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}", _pending_requests.get_count()));
}
_pending_requests.enter();
auto leave = defer([this] () noexcept { _pending_requests.leave(); });
// JSON parsing can allocate up to roughly 2x the size of the raw
// document, + a couple of bytes for maintenance.
// If the Content-Length of the request is not available, we assume
@@ -771,6 +760,12 @@ future<executor::request_return_type> server::handle_api_request(std::unique_ptr
_executor._stats.unsupported_operations++;
co_return api_error::unknown_operation(fmt::format("Unsupported operation {}", op));
}
if (_pending_requests.get_count() >= _max_concurrent_requests) {
_executor._stats.requests_shed++;
co_return api_error::request_limit_exceeded(format("too many in-flight requests (configured via max_concurrent_requests_per_shard): {}", _pending_requests.get_count()));
}
_pending_requests.enter();
auto leave = defer([this] () noexcept { _pending_requests.leave(); });
executor::client_state client_state(service::client_state::external_tag(),
_auth_service, &_sl_controller, _timeout_config.current_values(), req->get_client_address());
if (!username.empty()) {
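The hunks above move the in-flight request accounting (a counter bumped with enter() and released by a deferred leave()) so the concurrency check happens before any memory is reserved for the request body. A minimal self-contained sketch of that pattern, using a plain counter and a hypothetical scope guard instead of ScyllaDB's actual _pending_requests and defer() types:

```cpp
#include <cstddef>
#include <stdexcept>
#include <utility>

// Hypothetical stand-in for seastar::defer(): runs a callable on scope exit.
template <typename Fn>
class scope_guard {
    Fn _fn;
public:
    explicit scope_guard(Fn fn) : _fn(std::move(fn)) {}
    ~scope_guard() { _fn(); }
};

struct request_gate {
    size_t count = 0;
    size_t max;
    explicit request_gate(size_t max_) : max(max_) {}
};

// Reject before doing any expensive work, then hold a slot for the
// remainder of the request via the scope guard.
void handle_request(request_gate& gate /*, request body ... */) {
    if (gate.count >= gate.max) {
        throw std::runtime_error("too many in-flight requests");
    }
    ++gate.count;
    scope_guard leave{[&gate]() noexcept { --gate.count; }};
    // ... read and parse the request body only after the slot is held ...
}
```

The point of the reordering is simply that a request which would be shed anyway never gets to allocate its body buffer.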
@@ -33,7 +33,7 @@
#include "data_dictionary/data_dictionary.hh"
#include "utils/rjson.hh"

static logging::logger slogger("alternator-streams");
static logging::logger elogger("alternator-streams");

/**
 * Base template type to implement rapidjson::internal::TypeHelper<...>:s
@@ -437,7 +437,7 @@ const cdc::stream_id& find_parent_shard_in_previous_generation(db_clock::time_po
if (prev_streams.empty()) {
// something is really wrong - streams are empty
// let's try internal_error in hope it will be notified and fixed
on_internal_error(slogger, fmt::format("streams are empty for cdc generation at {} ({})", prev_timestamp, prev_timestamp.time_since_epoch().count()));
on_internal_error(elogger, fmt::format("streams are empty for cdc generation at {} ({})", prev_timestamp, prev_timestamp.time_since_epoch().count()));
}
auto it = std::lower_bound(prev_streams.begin(), prev_streams.end(), child.token(), [](const cdc::stream_id& id, const dht::token& t) {
return id.token() < t;
@@ -787,18 +787,16 @@ future<executor::request_return_type> executor::get_shard_iterator(client_state&
struct event_id {
cdc::stream_id stream;
utils::UUID timestamp;
size_t index = 0;

static constexpr auto marker = 'E';

event_id(cdc::stream_id s, utils::UUID ts, size_t index)
event_id(cdc::stream_id s, utils::UUID ts)
: stream(s)
, timestamp(ts)
, index(index)
{}

friend std::ostream& operator<<(std::ostream& os, const event_id& id) {
fmt::print(os, "{}{}:{}:{}", marker, id.stream.to_bytes(), id.timestamp, id.index);
fmt::print(os, "{}{}:{}", marker, id.stream.to_bytes(), id.timestamp);
return os;
}
};
@@ -810,19 +808,7 @@ struct rapidjson::internal::TypeHelper<ValueType, alternator::event_id>
{};

namespace alternator {
namespace {
struct managed_bytes_ptr_hash {
size_t operator()(const managed_bytes *k) const noexcept {
return std::hash<managed_bytes>{}(*k);
}
};
struct managed_bytes_ptr_equal {
bool operator()(const managed_bytes *a, const managed_bytes *b) const noexcept {
return *a == *b;
}
};
}

future<executor::request_return_type> executor::get_records(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request) {
_stats.api_operations.get_records++;
auto start_time = std::chrono::steady_clock::now();
@@ -893,12 +879,6 @@ future<executor::request_return_type> executor::get_records(client_state& client

auto pks = schema->partition_key_columns();
auto cks = schema->clustering_key_columns();

auto base_cks = base->clustering_key_columns();
if (base_cks.size() > 1) {
throw api_error::internal(fmt::format("invalid alternator table, clustering key count ({}) is bigger than one", base_cks.size()));
}
const bytes *clustering_key_column_name = !base_cks.empty() ? &base_cks.front().name() : nullptr;

std::transform(pks.begin(), pks.end(), std::back_inserter(columns), [](auto& c) { return &c; });
std::transform(cks.begin(), cks.end(), std::back_inserter(columns), [](auto& c) { return &c; });
@@ -953,40 +933,42 @@ future<executor::request_return_type> executor::get_records(client_state& client
return cdef->name->name() == eor_column_name;
})
);
auto clustering_key_index = clustering_key_column_name ? std::distance(metadata.get_names().begin(),
std::find_if(metadata.get_names().begin(), metadata.get_names().end(), [&](const lw_shared_ptr<cql3::column_specification>& cdef) {
return cdef->name->name() == *clustering_key_column_name;
})
) : 0;

std::optional<utils::UUID> timestamp;
struct Record {
rjson::value record;
rjson::value dynamodb;
};
const managed_bytes empty_managed_bytes;
std::unordered_map<const managed_bytes*, Record, managed_bytes_ptr_hash, managed_bytes_ptr_equal> records_map;
auto dynamodb = rjson::empty_object();
auto record = rjson::empty_object();
const auto dc_name = _proxy.get_token_metadata_ptr()->get_topology().get_datacenter();

using op_utype = std::underlying_type_t<cdc::operation>;

auto maybe_add_record = [&] {
if (!dynamodb.ObjectEmpty()) {
rjson::add(record, "dynamodb", std::move(dynamodb));
dynamodb = rjson::empty_object();
}
if (!record.ObjectEmpty()) {
rjson::add(record, "awsRegion", rjson::from_string(dc_name));
rjson::add(record, "eventID", event_id(iter.shard.id, *timestamp));
rjson::add(record, "eventSource", "scylladb:alternator");
rjson::add(record, "eventVersion", "1.1");
rjson::push_back(records, std::move(record));
record = rjson::empty_object();
--limit;
}
};

for (auto& row : result_set->rows()) {
auto op = static_cast<cdc::operation>(value_cast<op_utype>(data_type_for<op_utype>()->deserialize(*row[op_index])));
auto ts = value_cast<utils::UUID>(data_type_for<utils::UUID>()->deserialize(*row[ts_index]));
auto eor = row[eor_index].has_value() ? value_cast<bool>(boolean_type->deserialize(*row[eor_index])) : false;
const managed_bytes* cs_ptr = clustering_key_column_name ? &*row[clustering_key_index] : &empty_managed_bytes;
auto records_it = records_map.emplace(cs_ptr, Record{});
auto &record = records_it.first->second;

if (records_it.second) {
record.dynamodb = rjson::empty_object();
record.record = rjson::empty_object();
if (!dynamodb.HasMember("Keys")) {
auto keys = rjson::empty_object();
describe_single_item(*selection, row, key_names, keys);
rjson::add(record.dynamodb, "Keys", std::move(keys));
rjson::add(record.dynamodb, "ApproximateCreationDateTime", utils::UUID_gen::unix_timestamp_in_sec(ts).count());
rjson::add(record.dynamodb, "SequenceNumber", sequence_number(ts));
rjson::add(record.dynamodb, "StreamViewType", type);
rjson::add(dynamodb, "Keys", std::move(keys));
rjson::add(dynamodb, "ApproximateCreationDateTime", utils::UUID_gen::unix_timestamp_in_sec(ts).count());
rjson::add(dynamodb, "SequenceNumber", sequence_number(ts));
rjson::add(dynamodb, "StreamViewType", type);
// TODO: SizeBytes
}

@@ -1010,10 +992,6 @@ future<executor::request_return_type> executor::get_records(client_state& client
* flags on CDC log, instead we use data to
* drive what is returned. This is (afaict)
* consistent with dynamo streams
*
* Note: BatchWriteItem will generate multiple records with
* the same timestamp, when write isolation is set to always
* (which triggers lwt), so we need to unpack them based on clustering key.
*/
switch (op) {
case cdc::operation::pre_image:
@@ -1022,14 +1000,14 @@ future<executor::request_return_type> executor::get_records(client_state& client
auto item = rjson::empty_object();
describe_single_item(*selection, row, attr_names, item, nullptr, true);
describe_single_item(*selection, row, key_names, item);
rjson::add(record.dynamodb, op == cdc::operation::pre_image ? "OldImage" : "NewImage", std::move(item));
rjson::add(dynamodb, op == cdc::operation::pre_image ? "OldImage" : "NewImage", std::move(item));
break;
}
case cdc::operation::update:
rjson::add(record.record, "eventName", "MODIFY");
rjson::add(record, "eventName", "MODIFY");
break;
case cdc::operation::insert:
rjson::add(record.record, "eventName", "INSERT");
rjson::add(record, "eventName", "INSERT");
break;
case cdc::operation::service_row_delete:
case cdc::operation::service_partition_delete:
@@ -1037,41 +1015,28 @@ future<executor::request_return_type> executor::get_records(client_state& client
auto user_identity = rjson::empty_object();
rjson::add(user_identity, "Type", "Service");
rjson::add(user_identity, "PrincipalId", "dynamodb.amazonaws.com");
rjson::add(record.record, "userIdentity", std::move(user_identity));
rjson::add(record.record, "eventName", "REMOVE");
rjson::add(record, "userIdentity", std::move(user_identity));
rjson::add(record, "eventName", "REMOVE");
break;
}
default:
rjson::add(record.record, "eventName", "REMOVE");
rjson::add(record, "eventName", "REMOVE");
break;
}
if (eor) {
size_t index = 0;
for (auto& [_, rec] : records_map) {
rjson::add(rec.record, "awsRegion", rjson::from_string(dc_name));
rjson::add(rec.record, "eventID", event_id(iter.shard.id, *timestamp, index++));
rjson::add(rec.record, "eventSource", "scylladb:alternator");
rjson::add(rec.record, "eventVersion", "1.1");

rjson::add(rec.record, "dynamodb", std::move(rec.dynamodb));
rjson::push_back(records, std::move(rec.record));
}

records_map.clear();
maybe_add_record();
timestamp = ts;
if (records.Size() >= limit) {
// Note: we might have more than limit rows here - BatchWriteItem will emit multiple items
// with the same timestamp and we have no way of resume iteration midway through those,
// so we return all of them here.
if (limit == 0) {
break;
}
}
}

auto ret = rjson::empty_object();
auto nrecords = records.Size();
rjson::add(ret, "Records", std::move(records));

if (timestamp) {
if (nrecords != 0) {
// #9642. Set next iterators threshold to > last
shard_iterator next_iter(iter.table, iter.shard, *timestamp, false);
// Note that here we unconditionally return NextShardIterator,
@@ -1122,7 +1087,6 @@ bool executor::add_stream_options(const rjson::value& stream_specification, sche

cdc::options opts;
opts.enabled(true);
// cdc::delta_mode is ignored by Alternator, so aim for the least overhead.
opts.set_delta_mode(cdc::delta_mode::keys);
opts.ttl(std::chrono::duration_cast<std::chrono::seconds>(dynamodb_streams_max_window).count());
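The records_map above keys an unordered_map by const managed_bytes* while hashing and comparing the pointed-to values, so rows whose clustering-key bytes compare equal fold into one record without copying the key. A minimal sketch of that pointer-keyed map pattern, with std::string standing in for managed_bytes (hypothetical types, not the ScyllaDB ones):

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <unordered_map>

// Hash and equality that look through the pointer at the pointed-to value,
// mirroring managed_bytes_ptr_hash / managed_bytes_ptr_equal above.
struct string_ptr_hash {
    size_t operator()(const std::string* k) const noexcept {
        return std::hash<std::string>{}(*k);
    }
};
struct string_ptr_equal {
    bool operator()(const std::string* a, const std::string* b) const noexcept {
        return *a == *b;
    }
};

int main() {
    std::string key1 = "clustering-key";
    std::string key2 = "clustering-key";   // equal value, different address
    std::unordered_map<const std::string*, int, string_ptr_hash, string_ptr_equal> m;
    m[&key1] = 1;
    m[&key2] = 2;   // same logical key: the two pointers compare equal by value
    assert(m.size() == 1);
    return 0;
}
```

As in the loop above, the pointed-to keys must outlive the map; the map never owns or copies them.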
@@ -743,7 +743,7 @@
"parameters":[
{
"name":"tag",
"description":"The snapshot tag to delete. If omitted, all snapshots are removed.",
"description":"the tag given to the snapshot",
"required":false,
"allowMultiple":false,
"type":"string",
@@ -751,7 +751,7 @@
},
{
"name":"kn",
"description":"Comma-separated list of keyspace names to delete snapshots from. If omitted, snapshots are deleted from all keyspaces.",
"description":"Comma-separated keyspaces name that their snapshot will be deleted",
"required":false,
"allowMultiple":false,
"type":"string",
@@ -759,7 +759,7 @@
},
{
"name":"cf",
"description":"A table name used to filter which table's snapshots are deleted. If omitted or empty, snapshots for all tables are eligible. When provided together with 'kn', the table is looked up in each listed keyspace independently. For secondary indexes, the logical index name (e.g. 'myindex') can be used and is resolved automatically.",
"description":"an optional table name that its snapshot will be deleted",
"required":false,
"allowMultiple":false,
"type":"string",
@@ -3166,83 +3166,6 @@
]
},

{
"path":"/storage_service/vnode_tablet_migrations/keyspaces/{keyspace}",
"operations":[{
"method":"POST",
"summary":"Start vnodes-to-tablets migration for all tables in a keyspace",
"type":"void",
"nickname":"create_vnode_tablet_migration",
"produces":["application/json"],
"parameters":[
{
"name":"keyspace",
"description":"Keyspace name",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"path"
}
]
},
{
"method":"GET",
"summary":"Get a keyspace's vnodes-to-tablets migration status",
"type":"vnode_tablet_migration_status",
"nickname":"get_vnode_tablet_migration",
"produces":["application/json"],
"parameters":[
{
"name":"keyspace",
"description":"Keyspace name",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"path"
}
]
}]
},
{
"path":"/storage_service/vnode_tablet_migrations/node/storage_mode",
"operations":[{
"method":"PUT",
"summary":"Set the intended storage mode for this node during vnodes-to-tablets migration",
"type":"void",
"nickname":"set_vnode_tablet_migration_node_storage_mode",
"produces":["application/json"],
"parameters":[
{
"name":"intended_mode",
"description":"Intended storage mode (tablets or vnodes)",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
}
]
}]
},
{
"path":"/storage_service/vnode_tablet_migrations/keyspaces/{keyspace}/finalization",
"operations":[{
"method":"POST",
"summary":"Finalize vnodes-to-tablets migration for all tables in a keyspace",
"type":"void",
"nickname":"finalize_vnode_tablet_migration",
"produces":["application/json"],
"parameters":[
{
"name":"keyspace",
"description":"Keyspace name",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"path"
}
]
}]
},
{
"path":"/storage_service/quiesce_topology",
"operations":[
@@ -3860,45 +3783,6 @@
"description":"The resulting compression ratio (estimated on a random sample of files)"
}
}
},
"vnode_tablet_migration_node_status":{
"id":"vnode_tablet_migration_node_status",
"description":"Node storage mode info during vnodes-to-tablets migration",
"properties":{
"host_id":{
"type":"string",
"description":"The host ID"
},
"current_mode":{
"type":"string",
"description":"The current storage mode: `vnodes` or `tablets`"
},
"intended_mode":{
"type":"string",
"description":"The intended storage mode: `vnodes` or `tablets`"
}
}
},
"vnode_tablet_migration_status":{
"id":"vnode_tablet_migration_status",
"description":"Vnodes-to-tablets migration status for a keyspace",
"properties":{
"keyspace":{
"type":"string",
"description":"The keyspace name"
},
"status":{
"type":"string",
"description":"The migration status: `vnodes` (not started), `migrating_to_tablets` (in progress), or `tablets` (complete)"
},
"nodes":{
"type":"array",
"items":{
"$ref":"vnode_tablet_migration_node_status"
},
"description":"Per-node storage mode information. Empty if the keyspace is not being migrated."
}
}
}
}
}
@@ -23,7 +23,7 @@ void set_error_injection(http_context& ctx, routes& r) {

hf::enable_injection.set(r, [](std::unique_ptr<request> req) -> future<json::json_return_type> {
sstring injection = req->get_path_param("injection");
bool one_shot = strcasecmp(req->get_query_param("one_shot").c_str(), "true") == 0;
bool one_shot = req->get_query_param("one_shot") == "True";
auto params = co_await util::read_entire_stream_contiguous(*req->content_stream);

const size_t max_params_size = 1024 * 1024;
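The hunk above swaps an exact comparison against "True" for a case-insensitive strcasecmp, so one_shot=true, one_shot=True, and one_shot=TRUE are all accepted. A tiny sketch of that parsing rule (hypothetical helper name, not part of the Scylla API):

```cpp
#include <cassert>
#include <string>
#include <strings.h>   // strcasecmp (POSIX)

// Accept any capitalization of "true"; everything else is treated as false.
static bool parse_bool_param(const std::string& value) {
    return strcasecmp(value.c_str(), "true") == 0;
}

int main() {
    assert(parse_bool_param("true"));
    assert(parse_bool_param("True"));
    assert(parse_bool_param("TRUE"));
    assert(!parse_bool_param("false"));
    assert(!parse_bool_param(""));
    return 0;
}
```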
@@ -30,7 +30,6 @@
#include <fmt/ranges.h>
#include "service/raft/raft_group0_client.hh"
#include "service/storage_service.hh"
#include "service/topology_state_machine.hh"
#include "service/load_meter.hh"
#include "gms/feature_service.hh"
#include "gms/gossiper.hh"
@@ -573,6 +572,14 @@ void unset_view_builder(http_context& ctx, routes& r) {
cf::get_built_indexes.unset(r);
}

static future<json::json_return_type> describe_ring_as_json(sharded<service::storage_service>& ss, sstring keyspace) {
co_return json::json_return_type(stream_range_as_array(co_await ss.local().describe_ring(keyspace), token_range_endpoints_to_json));
}

static future<json::json_return_type> describe_ring_as_json_for_table(const sharded<service::storage_service>& ss, sstring keyspace, sstring table) {
co_return json::json_return_type(stream_range_as_array(co_await ss.local().describe_ring_for_table(keyspace, table), token_range_endpoints_to_json));
}

namespace {
template <typename Key, typename Value>
storage_service_json::mapper map_to_json(const std::pair<Key, Value>& i) {
@@ -670,16 +677,13 @@ rest_describe_ring(http_context& ctx, sharded<service::storage_service>& ss, std
if (!req->param.exists("keyspace")) {
throw bad_param_exception("The keyspace param is not provided");
}
auto keyspace = validate_keyspace(ctx, req);
auto keyspace = req->get_path_param("keyspace");
auto table = req->get_query_param("table");
utils::chunked_vector<dht::token_range_endpoints> ranges;
if (!table.empty()) {
auto table_id = validate_table(ctx.db.local(), keyspace, table);
ranges = co_await ss.local().describe_ring_for_table(table_id);
} else {
ranges = co_await ss.local().describe_ring(keyspace);
validate_table(ctx.db.local(), keyspace, table);
return describe_ring_as_json_for_table(ss, keyspace, table);
}
co_return json::json_return_type(stream_range_as_array(std::move(ranges), token_range_endpoints_to_json));
return describe_ring_as_json(ss, validate_keyspace(ctx, req));
}

static
@@ -1723,69 +1727,6 @@ rest_tablet_balancing_enable(sharded<service::storage_service>& ss, std::unique_
co_return json_void();
}

static
future<json::json_return_type>
rest_create_vnode_tablet_migration(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
if (!ss.local().get_feature_service().vnodes_to_tablets_migrations) {
apilog.warn("create_vnode_tablet_migration: called before the cluster feature was enabled");
throw std::runtime_error("vnodes-to-tablets migration requires all nodes to support the VNODES_TO_TABLETS_MIGRATIONS cluster feature");
}
auto keyspace = validate_keyspace(ctx, req);
co_await ss.local().prepare_for_tablets_migration(keyspace);
co_return json_void();
}

static
future<json::json_return_type>
rest_get_vnode_tablet_migration(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
if (!ss.local().get_feature_service().vnodes_to_tablets_migrations) {
apilog.warn("get_vnode_tablet_migration: called before the cluster feature was enabled");
throw std::runtime_error("vnodes-to-tablets migration requires all nodes to support the VNODES_TO_TABLETS_MIGRATIONS cluster feature");
}
auto keyspace = validate_keyspace(ctx, req);
auto status = co_await ss.local().get_tablets_migration_status(keyspace);

ss::vnode_tablet_migration_status result;
result.keyspace = status.keyspace;
result.status = status.status;
result.nodes._set = true;
for (const auto& node : status.nodes) {
ss::vnode_tablet_migration_node_status n;
n.host_id = fmt::to_string(node.host_id);
n.current_mode = node.current_mode;
n.intended_mode = node.intended_mode;
result.nodes.push(n);
}
co_return result;
}

static
future<json::json_return_type>
rest_set_vnode_tablet_migration_node_storage_mode(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
if (!ss.local().get_feature_service().vnodes_to_tablets_migrations) {
apilog.warn("set_vnode_tablet_migration_node_storage_mode: called before the cluster feature was enabled");
throw std::runtime_error("vnodes-to-tablets migration requires all nodes to support the VNODES_TO_TABLETS_MIGRATIONS cluster feature");
}
auto mode_str = req->get_query_param("intended_mode");
auto mode = service::intended_storage_mode_from_string(mode_str);
co_await ss.local().set_node_intended_storage_mode(mode);
co_return json_void();
}

static
future<json::json_return_type>
rest_finalize_vnode_tablet_migration(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
if (!ss.local().get_feature_service().vnodes_to_tablets_migrations) {
apilog.warn("finalize_vnode_tablet_migration: called before the cluster feature was enabled");
throw std::runtime_error("vnodes-to-tablets migration requires all nodes to support the VNODES_TO_TABLETS_MIGRATIONS cluster feature");
}
auto keyspace = validate_keyspace(ctx, req);
validate_keyspace(ctx, keyspace);

co_await ss.local().finalize_tablets_migration(keyspace);
co_return json_void();
}

static
future<json::json_return_type>
rest_quiesce_topology(sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
@@ -1936,10 +1877,6 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
ss::del_tablet_replica.set(r, rest_bind(rest_del_tablet_replica, ctx, ss));
ss::repair_tablet.set(r, rest_bind(rest_repair_tablet, ctx, ss));
ss::tablet_balancing_enable.set(r, rest_bind(rest_tablet_balancing_enable, ss));
ss::create_vnode_tablet_migration.set(r, rest_bind(rest_create_vnode_tablet_migration, ctx, ss));
ss::get_vnode_tablet_migration.set(r, rest_bind(rest_get_vnode_tablet_migration, ctx, ss));
ss::set_vnode_tablet_migration_node_storage_mode.set(r, rest_bind(rest_set_vnode_tablet_migration_node_storage_mode, ctx, ss));
ss::finalize_vnode_tablet_migration.set(r, rest_bind(rest_finalize_vnode_tablet_migration, ctx, ss));
ss::quiesce_topology.set(r, rest_bind(rest_quiesce_topology, ss));
sp::get_schema_versions.set(r, rest_bind(rest_get_schema_versions, ss));
ss::drop_quarantined_sstables.set(r, rest_bind(rest_drop_quarantined_sstables, ctx, ss));
@@ -2019,10 +1956,6 @@ void unset_storage_service(http_context& ctx, routes& r) {
ss::del_tablet_replica.unset(r);
ss::repair_tablet.unset(r);
ss::tablet_balancing_enable.unset(r);
ss::create_vnode_tablet_migration.unset(r);
ss::get_vnode_tablet_migration.unset(r);
ss::set_vnode_tablet_migration_node_storage_mode.unset(r);
ss::finalize_vnode_tablet_migration.unset(r);
ss::quiesce_topology.unset(r);
sp::get_schema_versions.unset(r);
ss::drop_quarantined_sstables.unset(r);
@@ -2113,8 +2046,6 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
co_await snap_ctl.local().take_column_family_snapshot(keynames[0], column_families, tag, opts);
}
co_return json_void();
} catch (const data_dictionary::no_such_column_family& e) {
throw httpd::bad_param_exception(e.what());
} catch (...) {
apilog.error("take_snapshot failed: {}", std::current_exception());
throw;
@@ -2151,8 +2082,6 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
try {
co_await snap_ctl.local().clear_snapshot(tag, keynames, column_family);
co_return json_void();
} catch (const data_dictionary::no_such_column_family& e) {
throw httpd::bad_param_exception(e.what());
} catch (...) {
apilog.error("del_snapshot failed: {}", std::current_exception());
throw;
@@ -14,7 +14,6 @@
#include <fmt/ranges.h>

#include "utils/to_string.hh"
#include "utils/error_injection.hh"
#include "data_dictionary/data_dictionary.hh"
#include "cql3/query_processor.hh"
#include "db/config.hh"
@@ -106,9 +105,6 @@ auth::authentication_option_set auth::certificate_authenticator::alterable_optio
}

future<std::optional<auth::authenticated_user>> auth::certificate_authenticator::authenticate(session_dn_func f) const {
if (auto user = utils::get_local_injector().inject_parameter("transport_early_auth_bypass")) {
co_return auth::authenticated_user{sstring(*user)};
}
if (!f) {
co_return std::nullopt;
}
@@ -1,47 +0,0 @@
#
# Copyright 2025-present ScyllaDB
#

#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#

# Custom FindLua module that uses pkg-config, matching configure.py's
# approach. CMake's built-in FindLua resolves to the versioned library
# (e.g. liblua-5.4.so) instead of the unversioned symlink (liblua.so),
# causing a name mismatch between the two build systems.

find_package(PkgConfig REQUIRED)

# configure.py: lua53 on Debian-like, lua on others
pkg_search_module(PC_lua QUIET lua53 lua)

find_library(Lua_LIBRARY
NAMES lua lua5.3 lua53
HINTS
${PC_lua_LIBDIR}
${PC_lua_LIBRARY_DIRS})

find_path(Lua_INCLUDE_DIR
NAMES lua.h
HINTS
${PC_lua_INCLUDEDIR}
${PC_lua_INCLUDE_DIRS})

mark_as_advanced(
Lua_LIBRARY
Lua_INCLUDE_DIR)

include(FindPackageHandleStandardArgs)

find_package_handle_standard_args(Lua
REQUIRED_VARS
Lua_LIBRARY
Lua_INCLUDE_DIR
VERSION_VAR PC_lua_VERSION)

if(Lua_FOUND)
set(LUA_LIBRARIES ${Lua_LIBRARY})
set(LUA_INCLUDE_DIR ${Lua_INCLUDE_DIR})
endif()
@@ -1,5 +1,5 @@
set(CMAKE_CXX_FLAGS_COVERAGE
"-fprofile-instr-generate -fcoverage-mapping"
"-fprofile-instr-generate -fcoverage-mapping -fprofile-list=${CMAKE_SOURCE_DIR}/coverage_sources.list"
CACHE
INTERNAL
"")
@@ -8,33 +8,18 @@ update_build_flags(Coverage
OPTIMIZATION_LEVEL "g")

set(scylla_build_mode_Coverage "coverage")

# Coverage mode sets cmake_build_type='Debug' for Seastar
# (configure.py:515), so Seastar's pkg-config --cflags output
# (configure.py:2252-2267, queried at configure.py:3039) includes debug
# defines, sanitizer compile flags, and -fstack-clash-protection.
# Seastar's CMake generator expressions only activate these for
# Debug/Sanitize configs, so we add them explicitly for Coverage.
set(Seastar_DEFINITIONS_COVERAGE
SCYLLA_BUILD_MODE=${scylla_build_mode_Coverage}
SEASTAR_DEBUG
SEASTAR_DEFAULT_ALLOCATOR
SEASTAR_SHUFFLE_TASK_QUEUE
SEASTAR_DEBUG_SHARED_PTR
SEASTAR_DEBUG_PROMISE
SEASTAR_TYPE_ERASE_MORE)
DEBUG
SANITIZE
DEBUG_LSA_SANITIZER
SCYLLA_ENABLE_ERROR_INJECTION)
foreach(definition ${Seastar_DEFINITIONS_COVERAGE})
add_compile_definitions(
$<$<CONFIG:Coverage>:${definition}>)
endforeach()

add_compile_options(
$<$<CONFIG:Coverage>:-fsanitize=address>
$<$<CONFIG:Coverage>:-fsanitize=undefined>
$<$<CONFIG:Coverage>:-fsanitize=vptr>
$<$<CONFIG:Coverage>:-fstack-clash-protection>)

set(CMAKE_EXE_LINKER_FLAGS_COVERAGE
set(CMAKE_STATIC_LINKER_FLAGS_COVERAGE
"-fprofile-instr-generate -fcoverage-mapping")

maybe_limit_stack_usage_in_KB(40 Coverage)

@@ -131,7 +131,6 @@ function(maybe_limit_stack_usage_in_KB stack_usage_threshold_in_KB config)
check_cxx_compiler_flag(${_stack_usage_threshold_flag} _stack_usage_flag_supported)
if(_stack_usage_flag_supported)
add_compile_options($<$<CONFIG:${config}>:${_stack_usage_threshold_flag}>)
add_compile_options($<$<CONFIG:${config}>:-Wno-error=stack-usage=>)
endif()
endfunction()

@@ -261,23 +260,6 @@ endif()

# Force SHA1 build-id generation
add_link_options("LINKER:--build-id=sha1")

# Match configure.py: add -fno-lto globally. configure.py adds -fno-lto to
# all binaries (except standalone cpp_apps like patchelf) via the per-binary
# $libs variable. LTO-enabled targets (scylla binary in RelWithDebInfo) will
# override with -flto=thin -ffat-lto-objects via enable_lto().
add_link_options(-fno-lto)

# Match configure.py:2633-2636 — sanitizer link flags for standalone binaries
# (e.g. patchelf) that don't link Seastar. Seastar-linked targets get these
# via seastar_libs (configure.py:2649).
# Coverage mode gets sanitizer link flags via the seastar target instead
# (see CMakeLists.txt), matching configure.py where only seastar_libs_coverage
# carries -fsanitize (not cxx_ld_flags).
add_link_options(
$<$<CONFIG:Debug,Sanitize>:-fsanitize=address>
$<$<CONFIG:Debug,Sanitize>:-fsanitize=undefined>)

include(CheckLinkerFlag)
set(Scylla_USE_LINKER
""
@@ -44,7 +44,6 @@
#include "dht/partition_filter.hh"
#include "mutation_writer/shard_based_splitting_writer.hh"
#include "mutation_writer/partition_based_splitting_writer.hh"
#include "mutation_writer/token_group_based_splitting_writer.hh"
#include "mutation/mutation_source_metadata.hh"
#include "mutation/mutation_fragment_stream_validator.hh"
#include "utils/assert.hh"
@@ -1934,7 +1933,6 @@ class resharding_compaction final : public compaction {
};
std::vector<estimated_values> _estimation_per_shard;
std::vector<sstables::run_id> _run_identifiers;
bool _reshard_vnodes;
private:
// return estimated partitions per sstable for a given shard
uint64_t partitions_per_sstable(shard_id s) const {
@@ -1947,11 +1945,7 @@ public:
: compaction(table_s, std::move(descriptor), cdata, progress_monitor, use_backlog_tracker::no)
, _estimation_per_shard(smp::count)
, _run_identifiers(smp::count)
, _reshard_vnodes(descriptor.options.as<compaction_type_options::reshard>().vnodes_resharding)
{
if (_reshard_vnodes && !_owned_ranges) {
on_internal_error(clogger, "Resharding vnodes requires owned_ranges");
}
for (auto& sst : _sstables) {
const auto& shards = sst->get_shards_for_this_sstable();
auto size = sst->bytes_on_disk();
@@ -1989,25 +1983,8 @@ public:
}

mutation_reader_consumer make_interposer_consumer(mutation_reader_consumer end_consumer) override {
auto owned_ranges = _reshard_vnodes ? _owned_ranges : nullptr;
return [end_consumer = std::move(end_consumer), owned_ranges = std::move(owned_ranges)] (mutation_reader reader) mutable -> future<> {
if (owned_ranges) {
auto classify = [owned_ranges, it = owned_ranges->begin(), idx = mutation_writer::token_group_id(0)] (dht::token t) mutable -> mutation_writer::token_group_id {
dht::token_comparator cmp;
while (it != owned_ranges->end() && it->after(t, cmp)) {
clogger.debug("Token {} is after current range {}: advancing to the next range", t, *it);
++it;
++idx;
}
if (it == owned_ranges->end() || !it->contains(t, cmp)) {
on_internal_error(clogger, fmt::format("Token {} is outside of owned ranges", t));
}
return idx;
};
return mutation_writer::segregate_by_token_group(std::move(reader), std::move(classify), std::move(end_consumer));
} else {
return mutation_writer::segregate_by_shard(std::move(reader), std::move(end_consumer));
}
return [end_consumer = std::move(end_consumer)] (mutation_reader reader) mutable -> future<> {
return mutation_writer::segregate_by_shard(std::move(reader), std::move(end_consumer));
};
}
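The classify lambda above walks a sorted list of disjoint owned ranges once, in token order, and maps each token to the index of the range containing it, erroring out if a token falls outside every range. A self-contained sketch of that single-pass classification, with plain integers standing in for dht::token and the inputs assumed to arrive in ascending order (hypothetical names, not the ScyllaDB types):

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <vector>

struct int_range { int start; int end; };   // inclusive bounds; sorted and disjoint

// Returns a callable that maps ascending values to the index of the range
// containing them, advancing through the range list exactly once overall.
auto make_classifier(const std::vector<int_range>& ranges) {
    return [&ranges, it = ranges.begin(), idx = size_t(0)](int value) mutable -> size_t {
        // Skip ranges that end before the value; because inputs are ascending,
        // we never need to look back at earlier ranges.
        while (it != ranges.end() && it->end < value) {
            ++it;
            ++idx;
        }
        if (it == ranges.end() || value < it->start) {
            throw std::runtime_error("value is outside of owned ranges");
        }
        return idx;
    };
}

int main() {
    std::vector<int_range> ranges = {{0, 9}, {20, 29}, {40, 49}};
    auto classify = make_classifier(ranges);
    assert(classify(3) == 0);
    assert(classify(25) == 1);
    assert(classify(48) == 2);
    return 0;
}
```

The single forward pass is what makes the approach cheap inside a compaction: the writer sees tokens in order, so the classifier is amortized O(1) per token.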
@@ -87,8 +87,6 @@ public:
drop_unfixable_sstables drop_unfixable = drop_unfixable_sstables::no;
};
struct reshard {
// If set, resharding compaction will apply the owned_ranges to segregate sstables in vnode boundaries.
bool vnodes_resharding = false;
};
struct reshape {
};
@@ -117,8 +115,8 @@ public:
return compaction_type_options(reshape{});
}

static compaction_type_options make_reshard(bool vnodes_resharding = false) {
return compaction_type_options(reshard{.vnodes_resharding = vnodes_resharding});
static compaction_type_options make_reshard() {
return compaction_type_options(reshard{});
}

static compaction_type_options make_regular() {

@@ -132,7 +132,7 @@ distribute_reshard_jobs(sstables::sstable_directory::sstable_open_info_vector so
// A creator function must be passed that will create an SSTable object in the correct shard,
// and an I/O priority must be specified.
future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory::sstable_open_info_vector shared_info, replica::table& table,
compaction::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr, bool vnodes_resharding, tasks::task_info parent_info)
compaction::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr, tasks::task_info parent_info)
{
// Resharding doesn't like empty sstable sets, so bail early. There is nothing
// to reshard in this shard.
@@ -160,22 +160,13 @@ future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory::
// There is a semaphore inside the compaction manager in run_resharding_jobs. So we
// parallel_for_each so the statistics about pending jobs are updated to reflect all
// jobs. But only one will run in parallel at a time
//
// The compaction group view is used here only for job registration and gate-holding;
// resharding never reads or writes the group's own SSTables. With static (vnode)
// sharding there is exactly one group per shard; with tablets there may be many.
// In either case, any registered group suffices.
auto* cg = table.get_any_compaction_group();
if (!cg) {
on_internal_error(tasks::tmlogger, format("No compaction group found for table {}.{}", table.schema()->ks_name(), table.schema()->cf_name()));
}
auto& t = cg->view_for_unrepaired_data();
auto& t = table.try_get_compaction_group_view_with_static_sharding();
co_await coroutine::parallel_for_each(buckets, [&] (std::vector<sstables::shared_sstable>& sstlist) mutable {
return table.get_compaction_manager().run_custom_job(t, compaction_type::Reshard, "Reshard compaction", [&] (compaction_data& info, compaction_progress_monitor& progress_monitor) -> future<> {
auto erm = table.get_effective_replication_map(); // keep alive around compaction.

compaction_descriptor desc(sstlist);
desc.options = compaction_type_options::make_reshard(vnodes_resharding);
desc.options = compaction_type_options::make_reshard();
desc.creator = creator;
desc.sharder = &erm->get_sharder(*table.schema());
desc.owned_ranges = owned_ranges_ptr;
@@ -915,7 +906,7 @@ future<> table_resharding_compaction_task_impl::run() {
if (_owned_ranges_ptr) {
local_owned_ranges_ptr = make_lw_shared<const dht::token_range_vector>(*_owned_ranges_ptr);
}
auto task = co_await compaction_module.make_and_start_task<shard_resharding_compaction_task_impl>(parent_info, _status.keyspace, _status.table, _status.id, _dir, db, _creator, std::move(local_owned_ranges_ptr), _vnodes_resharding, destinations);
auto task = co_await compaction_module.make_and_start_task<shard_resharding_compaction_task_impl>(parent_info, _status.keyspace, _status.table, _status.id, _dir, db, _creator, std::move(local_owned_ranges_ptr), destinations);
co_await task->done();
}));

@@ -935,14 +926,12 @@ shard_resharding_compaction_task_impl::shard_resharding_compaction_task_impl(tas
replica::database& db,
compaction_sstable_creator_fn creator,
compaction::owned_ranges_ptr local_owned_ranges_ptr,
bool vnodes_resharding,
std::vector<replica::reshard_shard_descriptor>& destinations) noexcept
: resharding_compaction_task_impl(module, tasks::task_id::create_random_id(), 0, "shard", std::move(keyspace), std::move(table), "", parent_id)
, _dir(dir)
, _db(db)
, _creator(std::move(creator))
, _local_owned_ranges_ptr(std::move(local_owned_ranges_ptr))
, _vnodes_resharding(vnodes_resharding)
, _destinations(destinations)
{
_expected_workload = _destinations[this_shard_id()].size();
@@ -952,7 +941,7 @@ future<> shard_resharding_compaction_task_impl::run() {
auto& table = _db.find_column_family(_status.keyspace, _status.table);
auto info_vec = std::move(_destinations[this_shard_id()].info_vec);
tasks::task_info info{_status.id, _status.shard};
co_await reshard(_dir.local(), std::move(info_vec), table, _creator, std::move(_local_owned_ranges_ptr), _vnodes_resharding, info);
co_await reshard(_dir.local(), std::move(info_vec), table, _creator, std::move(_local_owned_ranges_ptr), info);
co_await _dir.local().move_foreign_sstables(_dir);
}
@@ -693,7 +693,6 @@ private:
sharded<replica::database>& _db;
compaction_sstable_creator_fn _creator;
compaction::owned_ranges_ptr _owned_ranges_ptr;
bool _vnodes_resharding;
public:
table_resharding_compaction_task_impl(tasks::task_manager::module_ptr module,
std::string keyspace,
@@ -701,14 +700,12 @@ public:
sharded<sstables::sstable_directory>& dir,
sharded<replica::database>& db,
compaction_sstable_creator_fn creator,
compaction::owned_ranges_ptr owned_ranges_ptr,
bool vnodes_resharding) noexcept
compaction::owned_ranges_ptr owned_ranges_ptr) noexcept
: resharding_compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), "table", std::move(keyspace), std::move(table), "", tasks::task_id::create_null_id())
, _dir(dir)
, _db(db)
, _creator(std::move(creator))
, _owned_ranges_ptr(std::move(owned_ranges_ptr))
, _vnodes_resharding(vnodes_resharding)
{}
protected:
virtual future<> run() override;
@@ -721,7 +718,6 @@ private:
replica::database& _db;
compaction_sstable_creator_fn _creator;
compaction::owned_ranges_ptr _local_owned_ranges_ptr;
bool _vnodes_resharding;
std::vector<replica::reshard_shard_descriptor>& _destinations;
public:
shard_resharding_compaction_task_impl(tasks::task_manager::module_ptr module,
@@ -732,7 +728,6 @@ public:
replica::database& db,
compaction_sstable_creator_fn creator,
compaction::owned_ranges_ptr local_owned_ranges_ptr,
bool vnodes_resharding,
std::vector<replica::reshard_shard_descriptor>& destinations) noexcept;
protected:
virtual future<> run() override;

@@ -488,7 +488,6 @@ sstable_format: ms
# compressed.
# can be: all - all traffic is compressed
# dc - traffic between different datacenters is compressed
# rack - traffic between different racks is compressed
# none - nothing is compressed.
# internode_compression: none
@@ -1708,14 +1708,12 @@ deps['test/boost/combined_tests'] += [
'test/boost/sstable_compression_config_test.cc',
'test/boost/sstable_directory_test.cc',
'test/boost/sstable_set_test.cc',
'test/boost/sstable_tablet_streaming.cc',
'test/boost/statement_restrictions_test.cc',
'test/boost/storage_proxy_test.cc',
'test/boost/tablets_test.cc',
'test/boost/tracing_test.cc',
'test/boost/user_function_test.cc',
'test/boost/user_types_test.cc',
'test/boost/vector_index_test.cc',
'test/boost/view_build_test.cc',
'test/boost/view_complex_test.cc',
'test/boost/view_schema_ckey_test.cc',
@@ -2233,20 +2231,16 @@ abseil_libs = ['absl/' + lib for lib in [
'container/libabsl_raw_hash_set.a',
'synchronization/libabsl_synchronization.a',
'synchronization/libabsl_graphcycles_internal.a',
'synchronization/libabsl_kernel_timeout_internal.a',
'debugging/libabsl_stacktrace.a',
'debugging/libabsl_symbolize.a',
'debugging/libabsl_debugging_internal.a',
'debugging/libabsl_demangle_internal.a',
'debugging/libabsl_demangle_rust.a',
'debugging/libabsl_decode_rust_punycode.a',
'debugging/libabsl_utf8_for_code_point.a',
'debugging/libabsl_borrowed_fixup_buffer.a',
'time/libabsl_time.a',
'time/libabsl_time_zone.a',
'numeric/libabsl_int128.a',
'hash/libabsl_hash.a',
'hash/libabsl_city.a',
'hash/libabsl_low_level_hash.a',
'base/libabsl_malloc_internal.a',
'base/libabsl_spinlock_wait.a',
'base/libabsl_base.a',
@@ -143,6 +143,15 @@ public:
return value_type();
}

bool update_result_metadata_id(const key_type& key, cql3::cql_metadata_id_type metadata_id) {
cache_value_ptr vp = _cache.find(key.key());
if (!vp) {
return false;
}
(*vp)->update_result_metadata_id(std::move(metadata_id));
return true;
}

template <typename Pred>
requires std::is_invocable_r_v<bool, Pred, ::shared_ptr<cql_statement>>
void remove_if(Pred&& pred) {

@@ -260,6 +260,10 @@ public:
return _prepared_cache.find(key);
}

bool update_prepared_result_metadata_id(const prepared_cache_key_type& key, cql_metadata_id_type metadata_id) {
return _prepared_cache.update_result_metadata_id(key, std::move(metadata_id));
}

inline
future<::shared_ptr<cql_transport::messages::result_message>>
execute_prepared(
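The new update_result_metadata_id helper above follows an update-if-present rule: look the entry up, return false when it is absent, and otherwise mutate it in place without inserting anything. A minimal sketch of that rule with hypothetical stand-in types (not the real ScyllaDB cache or metadata-id classes):

```cpp
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-ins for a prepared-statement cache entry and metadata id.
struct prepared_entry {
    std::string result_metadata_id;
};

class prepared_cache {
    std::unordered_map<std::string, prepared_entry> _cache;
public:
    void emplace(std::string key, prepared_entry e) {
        _cache.emplace(std::move(key), std::move(e));
    }
    // Update-if-present: never inserts, only mutates an existing entry.
    bool update_result_metadata_id(const std::string& key, std::string metadata_id) {
        auto it = _cache.find(key);
        if (it == _cache.end()) {
            return false;
        }
        it->second.result_metadata_id = std::move(metadata_id);
        return true;
    }
};
```

Returning a bool lets the caller distinguish "statement evicted, nothing to refresh" from a successful in-place refresh.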
@@ -201,10 +201,6 @@ public:
return _clustering_columns_restrictions;
}

const expr::expression& get_nonprimary_key_restrictions() const {
return _nonprimary_key_restrictions;
}

// Get a set of columns restricted by the IS NOT NULL restriction.
// IS NOT NULL is a special case that is handled separately from other restrictions.
const std::unordered_set<const column_definition*> get_not_null_columns() const;
@@ -8,7 +8,6 @@
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/

#include <boost/algorithm/string.hpp>
#include <seastar/core/coroutine.hh>
#include "create_index_statement.hh"
#include "db/config.hh"
@@ -36,10 +35,8 @@
#include "db/schema_tables.hh"
#include "index/secondary_index_manager.hh"
#include "types/concrete_types.hh"
#include "types/vector.hh"
#include "db/tags/extension.hh"
#include "tombstone_gc_extension.hh"
#include "index/secondary_index.hh"

#include <stdexcept>

@@ -119,58 +116,6 @@ static data_type type_for_computed_column(cql3::statements::index_target::target
}
}

// Cassandra SAI compatibility: detect the StorageAttachedIndex class name
// used by Cassandra to create vector and metadata indexes.
static bool is_sai_class_name(const sstring& class_name) {
return class_name == "org.apache.cassandra.index.sai.StorageAttachedIndex"
|| boost::iequals(class_name, "storageattachedindex")
|| boost::iequals(class_name, "sai");
}

// Returns true if the custom class name refers to a vector-capable index
// (either ScyllaDB's native vector_index or Cassandra's SAI).
static bool is_vector_capable_class(const sstring& class_name) {
return class_name == "vector_index" || is_sai_class_name(class_name);
}

// When the custom class is SAI, verify that at least one target is a
// vector column and rewrite the class to ScyllaDB's native "vector_index".
// Non-vector single-column targets and multi-column (local-index partition
// key) targets are skipped — they are treated as filtering columns by
// vector_index::check_target().
static void maybe_rewrite_sai_to_vector_index(
const schema& schema,
const std::vector<::shared_ptr<index_target>>& targets,
index_specific_prop_defs& props) {
if (!props.custom_class || !is_sai_class_name(*props.custom_class)) {
return;
}
for (const auto& target : targets) {
auto* ident = std::get_if<::shared_ptr<column_identifier>>(&target->value);
if (!ident) {
// Multi-column target (local-index partition key) — skip.
continue;
}
auto cd = schema.get_column_definition((*ident)->name());
if (!cd) {
// Nonexistent column — skip; vector_index::validate() will catch it.
continue;
}
if (dynamic_cast<const vector_type_impl*>(cd->type.get())) {
props.custom_class = "vector_index";
return;
}
}
throw exceptions::invalid_request_exception(
"StorageAttachedIndex (SAI) is only supported on vector columns; "
"use a secondary index for non-vector columns");
}

static bool is_vector_index(const index_options_map& options) {
auto class_it = options.find(db::index::secondary_index::custom_class_option_name);
return class_it != options.end() && is_vector_capable_class(class_it->second);
}

view_ptr create_index_statement::create_view_for_index(const schema_ptr schema, const index_metadata& im,
const data_dictionary::database& db) const
{
@@ -320,8 +265,8 @@ create_index_statement::validate(query_processor& qp, const service::client_stat

_idx_properties->validate();

const bool is_vector_index = _idx_properties->custom_class && is_vector_capable_class(*_idx_properties->custom_class);
// FIXME: This is ugly and can be improved.
const bool is_vector_index = _idx_properties->custom_class && *_idx_properties->custom_class == "vector_index";
const bool uses_view_properties = _view_properties.properties()->count() > 0
|| _view_properties.use_compact_storage()
|| _view_properties.defined_ordering().size() > 0;
@@ -407,8 +352,6 @@ create_index_statement::validate_while_executing(data_dictionary::database db, l
targets.emplace_back(raw_target->prepare(*schema));
}

maybe_rewrite_sai_to_vector_index(*schema, targets, *_idx_properties);

if (_idx_properties && _idx_properties->custom_class) {
auto custom_index_factory = secondary_index::secondary_index_manager::get_custom_class_factory(*_idx_properties->custom_class);
if (!custom_index_factory) {
@@ -754,9 +697,7 @@ index_metadata create_index_statement::make_index_metadata(const std::vector<::s
const index_options_map& options)
{
index_options_map new_options = options;
auto target_option = is_vector_index(options)
? secondary_index::vector_index::serialize_targets(targets)
: secondary_index::target_parser::serialize_targets(targets);
auto target_option = secondary_index::target_parser::serialize_targets(targets);
new_options.emplace(index_target::target_option_name, target_option);

const auto& first_target = targets.front()->value;
@@ -52,6 +52,7 @@ public:
|
||||
std::vector<sstring> warnings;
|
||||
private:
|
||||
cql_metadata_id_type _metadata_id;
|
||||
bool _result_metadata_is_empty;
|
||||
|
||||
public:
|
||||
prepared_statement(audit::audit_info_ptr&& audit_info, seastar::shared_ptr<cql_statement> statement_, std::vector<seastar::lw_shared_ptr<column_specification>> bound_names_,
|
||||
@@ -71,6 +72,15 @@ public:
|
||||
void calculate_metadata_id();
|
||||
|
||||
cql_metadata_id_type get_metadata_id() const;
|
||||
|
||||
bool result_metadata_is_empty() const {
|
||||
return _result_metadata_is_empty;
|
||||
}
|
||||
|
||||
void update_result_metadata_id(cql_metadata_id_type metadata_id) {
|
||||
_metadata_id = std::move(metadata_id);
|
||||
_result_metadata_is_empty = false;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -49,6 +49,7 @@ prepared_statement::prepared_statement(
|
||||
, partition_key_bind_indices(std::move(partition_key_bind_indices))
|
||||
, warnings(std::move(warnings))
|
||||
, _metadata_id(bytes{})
|
||||
, _result_metadata_is_empty(statement->get_result_metadata()->flags().contains<metadata::flag::NO_METADATA>())
|
||||
{
|
||||
statement->set_audit_info(std::move(audit_info));
|
||||
}
|
||||
|
||||
@@ -52,7 +52,6 @@ future<shared_ptr<result_message>> modification_statement::execute_without_check
|
||||
}
|
||||
|
||||
auto [coordinator, holder] = qp.acquire_strongly_consistent_coordinator();
|
||||
|
||||
const auto mutate_result = co_await coordinator.get().mutate(_statement->s,
|
||||
keys[0].start()->value().token(),
|
||||
[&](api::timestamp_type ts) {
|
||||
@@ -66,7 +65,7 @@ future<shared_ptr<result_message>> modification_statement::execute_without_check
|
||||
raw_cql_statement, muts.size()));
|
||||
}
|
||||
return std::move(*muts.begin());
|
||||
}, timeout, qs.get_client_state().get_abort_source());
|
||||
});
|
||||
|
||||
using namespace service::strong_consistency;
|
||||
if (const auto* redirect = get_if<need_redirect>(&mutate_result)) {
|
||||
|
||||
@@ -42,7 +42,7 @@ future<::shared_ptr<result_message>> select_statement::do_execute(query_processo
|
||||
const auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options);
|
||||
auto [coordinator, holder] = qp.acquire_strongly_consistent_coordinator();
|
||||
auto query_result = co_await coordinator.get().query(_query_schema, *read_command,
|
||||
key_ranges, state.get_trace_state(), timeout, state.get_client_state().get_abort_source());
|
||||
key_ranges, state.get_trace_state(), timeout);
|
||||
|
||||
using namespace service::strong_consistency;
|
||||
if (const auto* redirect = get_if<need_redirect>(&query_result)) {
|
||||
@@ -54,4 +54,4 @@ future<::shared_ptr<result_message>> select_statement::do_execute(query_processo
|
||||
read_command, options, now);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
@@ -250,8 +250,8 @@ void keyspace_metadata::validate(const gms::feature_service& fs, const locator::
|
||||
if (params.consistency && !fs.strongly_consistent_tables) {
|
||||
throw exceptions::configuration_exception("The strongly_consistent_tables feature must be enabled to use a consistency option");
|
||||
}
|
||||
if (params.consistency && *params.consistency == data_dictionary::consistency_config_option::local) {
|
||||
throw exceptions::configuration_exception("Local consistency is not supported yet");
|
||||
if (params.consistency && *params.consistency == data_dictionary::consistency_config_option::global) {
|
||||
throw exceptions::configuration_exception("Global consistency is not supported yet");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ corrupt_data_handler::corrupt_data_handler(register_metrics rm) {
|
||||
_metrics.add_group("corrupt_data", {
|
||||
sm::make_counter("entries_reported", _stats.corrupt_data_reported,
|
||||
sm::description("Counts the number of corrupt data instances reported to the corrupt data handler. "
|
||||
"A non-zero value indicates that the database suffered data corruption.")).set_skip_when_empty()
|
||||
"A non-zero value indicates that the database suffered data corruption."))
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,7 +50,9 @@ future<> hint_endpoint_manager::do_store_hint(schema_ptr s, lw_shared_ptr<const
|
||||
size_t mut_size = fm->representation().size();
|
||||
shard_stats().size_of_hints_in_progress += mut_size;
|
||||
|
||||
co_await utils::get_local_injector().inject("slow_down_writing_hints", std::chrono::seconds(10));
|
||||
if (utils::get_local_injector().enter("slow_down_writing_hints")) {
|
||||
co_await seastar::sleep(std::chrono::seconds(10));
|
||||
}
|
||||
|
||||
try {
|
||||
const auto shared_lock = co_await get_shared_lock(file_update_mutex());
|
||||
|
||||
@@ -186,7 +186,7 @@ void manager::register_metrics(const sstring& group_name) {
|
||||
sm::description("Number of unexpected errors during sending, sending will be retried later")),
|
||||
|
||||
sm::make_counter("corrupted_files", _stats.corrupted_files,
|
||||
sm::description("Number of hints files that were discarded during sending because the file was corrupted.")).set_skip_when_empty(),
|
||||
sm::description("Number of hints files that were discarded during sending because the file was corrupted.")),
|
||||
|
||||
sm::make_gauge("pending_drains",
|
||||
sm::description("Number of tasks waiting in the queue for draining hints"),
|
||||
|
||||
@@ -206,7 +206,7 @@ void rate_limiter_base::register_metrics() {
|
||||
sm::description("Number of times a lookup returned an already allocated entry.")),
|
||||
|
||||
sm::make_counter("failed_allocations", _metrics.failed_allocations,
|
||||
sm::description("Number of times the rate limiter gave up trying to allocate.")).set_skip_when_empty(),
|
||||
sm::description("Number of times the rate limiter gave up trying to allocate.")),
|
||||
|
||||
sm::make_counter("probe_count", _metrics.probe_count,
|
||||
sm::description("Number of probes made during lookups.")),
|
||||
|
||||
@@ -174,7 +174,7 @@ cache_tracker::setup_metrics() {
|
||||
sm::make_counter("sstable_reader_recreations", sm::description("number of times sstable reader was recreated due to memtable flush"), _stats.underlying_recreations),
|
||||
sm::make_counter("sstable_partition_skips", sm::description("number of times sstable reader was fast forwarded across partitions"), _stats.underlying_partition_skips),
|
||||
sm::make_counter("sstable_row_skips", sm::description("number of times sstable reader was fast forwarded within a partition"), _stats.underlying_row_skips),
|
||||
sm::make_counter("pinned_dirty_memory_overload", sm::description("amount of pinned bytes that we tried to unpin over the limit. This should sit constantly at 0, and any number different than 0 is indicative of a bug"), _stats.pinned_dirty_memory_overload).set_skip_when_empty(),
|
||||
sm::make_counter("pinned_dirty_memory_overload", sm::description("amount of pinned bytes that we tried to unpin over the limit. This should sit constantly at 0, and any number different than 0 is indicative of a bug"), _stats.pinned_dirty_memory_overload),
|
||||
sm::make_counter("rows_processed_from_memtable", _stats.rows_processed_from_memtable,
|
||||
sm::description("total number of rows in memtables which were processed during cache update on memtable flush")),
|
||||
sm::make_counter("rows_dropped_from_memtable", _stats.rows_dropped_from_memtable,
|
||||
|
||||
@@ -144,7 +144,7 @@ static std::vector<sstring> get_keyspaces(const schema& s, const replica::databa
|
||||
/**
|
||||
* Makes a wrapping range of ring_position from a nonwrapping range of token, used to select sstables.
|
||||
*/
|
||||
static dht::partition_range as_ring_position_range(const dht::token_range& r) {
|
||||
static dht::partition_range as_ring_position_range(dht::token_range& r) {
|
||||
std::optional<wrapping_interval<dht::ring_position>::bound> start_bound, end_bound;
|
||||
if (r.start()) {
|
||||
start_bound = {{ dht::ring_position(r.start()->value(), dht::ring_position::token_bound::start), r.start()->is_inclusive() }};
|
||||
@@ -156,14 +156,11 @@ static dht::partition_range as_ring_position_range(const dht::token_range& r) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a new range_estimates for the specified range, considering the sstables associated
|
||||
* with the table identified by `cf_id` across all shards.
|
||||
* Add a new range_estimates for the specified range, considering the sstables associated with `cf`.
|
||||
*/
|
||||
static future<system_keyspace::range_estimates> estimate(replica::database& db, table_id cf_id, schema_ptr schema, const token_range& r) {
|
||||
struct shard_estimate {
|
||||
int64_t count = 0;
|
||||
utils::estimated_histogram hist{0};
|
||||
};
|
||||
static future<system_keyspace::range_estimates> estimate(const replica::column_family& cf, const token_range& r) {
|
||||
int64_t count{0};
|
||||
utils::estimated_histogram hist{0};
|
||||
auto from_bytes = [] (auto& b) {
|
||||
return dht::token::from_sstring(utf8_type->to_string(b));
|
||||
};
|
||||
@@ -172,35 +169,14 @@ static future<system_keyspace::range_estimates> estimate(replica::database& db,
|
||||
wrapping_interval<dht::token>({{ from_bytes(r.start), false }}, {{ from_bytes(r.end) }}),
|
||||
dht::token_comparator(),
|
||||
[&] (auto&& rng) { ranges.push_back(std::move(rng)); });
|
||||
|
||||
// Estimate partition count and size distribution from sstables on a single shard.
|
||||
auto estimate_on_shard = [cf_id, ranges] (replica::database& local_db) -> future<shard_estimate> {
|
||||
auto table_ptr = local_db.get_tables_metadata().get_table_if_exists(cf_id);
|
||||
if (!table_ptr) {
|
||||
co_return shard_estimate{};
|
||||
for (auto&& r : ranges) {
|
||||
auto rp_range = as_ring_position_range(r);
|
||||
for (auto&& sstable : cf.select_sstables(rp_range)) {
|
||||
count += co_await sstable->estimated_keys_for_range(r);
|
||||
hist.merge(sstable->get_stats_metadata().estimated_partition_size);
|
||||
}
|
||||
auto& cf = *table_ptr;
|
||||
shard_estimate result;
|
||||
for (auto&& r : ranges) {
|
||||
auto rp_range = as_ring_position_range(r);
|
||||
for (auto&& sstable : cf.select_sstables(rp_range)) {
|
||||
result.count += co_await sstable->estimated_keys_for_range(r);
|
||||
result.hist.merge(sstable->get_stats_metadata().estimated_partition_size);
|
||||
}
|
||||
}
|
||||
co_return result;
|
||||
};
|
||||
|
||||
// Combine partial results from two shards.
|
||||
auto reduce = [] (shard_estimate a, const shard_estimate& b) {
|
||||
a.count += b.count;
|
||||
a.hist.merge(b.hist);
|
||||
return a;
|
||||
};
|
||||
|
||||
auto aggregate = co_await db.container().map_reduce0(std::move(estimate_on_shard), shard_estimate{}, std::move(reduce));
|
||||
int64_t mean_size = aggregate.count > 0 ? aggregate.hist.mean() : 0;
|
||||
co_return system_keyspace::range_estimates{std::move(schema), r.start, r.end, aggregate.count, mean_size};
|
||||
}
|
||||
co_return system_keyspace::range_estimates{cf.schema(), r.start, r.end, count, count > 0 ? hist.mean() : 0};
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -345,7 +321,7 @@ size_estimates_mutation_reader::estimates_for_current_keyspace(std::vector<token
|
||||
auto rows_to_estimate = range.slice(rows, virtual_row_comparator(_schema));
|
||||
for (auto&& r : rows_to_estimate) {
|
||||
auto& cf = _db.find_column_family(*_current_partition, utf8_type->to_string(r.cf_name));
|
||||
estimates.push_back(co_await estimate(_db, cf.schema()->id(), cf.schema(), r.tokens));
|
||||
estimates.push_back(co_await estimate(cf, r.tokens));
|
||||
if (estimates.size() >= _slice.partition_row_limit()) {
|
||||
co_return estimates;
|
||||
}
|
||||
|
||||
@@ -18,11 +18,8 @@
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
#include "db/snapshot-ctl.hh"
|
||||
#include "db/snapshot/backup_task.hh"
|
||||
#include "db/schema_tables.hh"
|
||||
#include "index/secondary_index_manager.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "replica/global_table_ptr.hh"
|
||||
#include "replica/schema_describe_helper.hh"
|
||||
#include "sstables/sstables_manager.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
|
||||
@@ -157,56 +154,14 @@ future<> snapshot_ctl::do_take_cluster_column_family_snapshot(std::vector<sstrin
|
||||
);
|
||||
}
|
||||
|
||||
sstring snapshot_ctl::resolve_table_name(const sstring& ks_name, const sstring& name) const {
|
||||
try {
|
||||
_db.local().find_uuid(ks_name, name);
|
||||
return name;
|
||||
} catch (const data_dictionary::no_such_column_family&) {
|
||||
// The name may be a logical index name (e.g. "myindex").
|
||||
// Only indexes with a backing view have a separate backing table
|
||||
// that can be snapshotted. Custom indexes such as vector indexes
|
||||
// do not, so keep rejecting them here rather than mapping them to
|
||||
// a synthetic name.
|
||||
auto schema = _db.local().find_indexed_table(ks_name, name);
|
||||
if (schema) {
|
||||
const auto& im = schema->all_indices().at(name);
|
||||
if (db::schema_tables::view_should_exist(im)) {
|
||||
return secondary_index::index_table_name(name);
|
||||
}
|
||||
}
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::do_take_column_family_snapshot(sstring ks_name, std::vector<sstring> tables, sstring tag, snapshot_options opts) {
|
||||
for (auto& t : tables) {
|
||||
t = resolve_table_name(ks_name, t);
|
||||
}
|
||||
co_await check_snapshot_not_exist(ks_name, tag, tables);
|
||||
co_await replica::database::snapshot_tables_on_all_shards(_db, ks_name, std::move(tables), std::move(tag), opts);
|
||||
}
|
||||
|
||||
future<> snapshot_ctl::clear_snapshot(sstring tag, std::vector<sstring> keyspace_names, sstring cf_name) {
|
||||
co_return co_await run_snapshot_modify_operation([this, tag = std::move(tag), keyspace_names = std::move(keyspace_names), cf_name = std::move(cf_name)] (this auto) -> future<> {
|
||||
// clear_snapshot enumerates keyspace_names and uses cf_name as a
|
||||
// filter in each. When cf_name needs resolution (e.g. logical index
|
||||
// name -> backing table name), the result may differ per keyspace,
|
||||
// so resolve and clear individually.
|
||||
if (!cf_name.empty() && !keyspace_names.empty()) {
|
||||
std::vector<std::pair<sstring, sstring>> resolved_targets;
|
||||
resolved_targets.reserve(keyspace_names.size());
|
||||
|
||||
// Resolve every keyspace first so a later failure doesn't delete
|
||||
// snapshots that were already matched in earlier keyspaces.
|
||||
for (const auto& ks_name : keyspace_names) {
|
||||
resolved_targets.emplace_back(ks_name, resolve_table_name(ks_name, cf_name));
|
||||
}
|
||||
for (auto& [ks_name, resolved_cf_name] : resolved_targets) {
|
||||
co_await _db.local().clear_snapshot(tag, {ks_name}, std::move(resolved_cf_name));
|
||||
}
|
||||
co_return;
|
||||
}
|
||||
co_await _db.local().clear_snapshot(std::move(tag), std::move(keyspace_names), cf_name);
|
||||
return run_snapshot_modify_operation([this, tag = std::move(tag), keyspace_names = std::move(keyspace_names), cf_name = std::move(cf_name)] {
|
||||
return _db.local().clear_snapshot(tag, keyspace_names, cf_name);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -215,26 +170,7 @@ snapshot_ctl::get_snapshot_details() {
|
||||
using snapshot_map = std::unordered_map<sstring, db_snapshot_details>;
|
||||
|
||||
co_return co_await run_snapshot_list_operation(coroutine::lambda([this] () -> future<snapshot_map> {
|
||||
auto details = co_await _db.local().get_snapshot_details();
|
||||
|
||||
for (auto& [snapshot_name, snapshot_details] : details) {
|
||||
for (auto& table : snapshot_details) {
|
||||
auto schema = _db.local().as_data_dictionary().try_find_table(
|
||||
table.ks, table.cf);
|
||||
if (!schema || !schema->schema()->is_view()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
auto helper = replica::make_schema_describe_helper(
|
||||
schema->schema(), _db.local().as_data_dictionary());
|
||||
if (helper.type == schema_describe_helper::type::index) {
|
||||
table.cf = secondary_index::index_name_from_table_name(
|
||||
table.cf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
co_return details;
|
||||
return _db.local().get_snapshot_details();
|
||||
}));
|
||||
}
|
||||
|
||||
@@ -299,4 +235,4 @@ future<int64_t> snapshot_ctl::true_snapshots_size(sstring ks, sstring cf) {
|
||||
}));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
@@ -133,12 +133,6 @@ private:
|
||||
|
||||
future<> check_snapshot_not_exist(sstring ks_name, sstring name, std::optional<std::vector<sstring>> filter = {});
|
||||
|
||||
// Resolve a user-provided table name that may be a logical index name
|
||||
// (e.g. "myindex") to its backing column family name (e.g.
|
||||
// "myindex_index"). Returns the name unchanged if it already
|
||||
// matches a column family.
|
||||
sstring resolve_table_name(const sstring& ks_name, const sstring& name) const;
|
||||
|
||||
future<> run_snapshot_modify_operation(noncopyable_function<future<>()> &&);
|
||||
|
||||
template <typename Func>
|
||||
@@ -157,4 +151,4 @@ private:
|
||||
future<> do_take_cluster_column_family_snapshot(std::vector<sstring> ks_names, std::vector<sstring> tables, sstring tag, snapshot_options opts = {});
|
||||
};
|
||||
|
||||
}
|
||||
}
|
||||
@@ -281,7 +281,6 @@ schema_ptr system_keyspace::topology() {
|
||||
.with_column("cleanup_status", utf8_type)
|
||||
.with_column("supported_features", set_type_impl::get_instance(utf8_type, true))
|
||||
.with_column("request_id", timeuuid_type)
|
||||
.with_column("intended_storage_mode", utf8_type)
|
||||
.with_column("ignore_nodes", set_type_impl::get_instance(uuid_type, true), column_kind::static_column)
|
||||
.with_column("new_cdc_generation_data_uuid", timeuuid_type, column_kind::static_column)
|
||||
.with_column("new_keyspace_rf_change_ks_name", utf8_type, column_kind::static_column) // deprecated
|
||||
@@ -324,7 +323,6 @@ schema_ptr system_keyspace::topology_requests() {
|
||||
.with_column("snapshot_tag", utf8_type)
|
||||
.with_column("snapshot_expiry", timestamp_type)
|
||||
.with_column("snapshot_skip_flush", boolean_type)
|
||||
.with_column("finalize_migration_ks_name", utf8_type)
|
||||
.set_comment("Topology request tracking")
|
||||
.with_hash_version()
|
||||
.build();
|
||||
@@ -3171,11 +3169,6 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<service::intended_storage_mode> storage_mode;
|
||||
if (row.has("intended_storage_mode")) {
|
||||
storage_mode = service::intended_storage_mode_from_string(row.get_as<sstring>("intended_storage_mode"));
|
||||
}
|
||||
|
||||
std::unordered_map<raft::server_id, service::replica_state>* map = nullptr;
|
||||
if (nstate == service::node_state::normal) {
|
||||
map = &ret.normal_nodes;
|
||||
@@ -3200,7 +3193,7 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
|
||||
map->emplace(host_id, service::replica_state{
|
||||
nstate, std::move(datacenter), std::move(rack), std::move(release_version),
|
||||
ring_slice, shard_count, ignore_msb, std::move(supported_features),
|
||||
service::cleanup_status_from_string(cleanup_status), request_id, storage_mode});
|
||||
service::cleanup_status_from_string(cleanup_status), request_id});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3513,9 +3506,6 @@ system_keyspace::topology_requests_entry system_keyspace::topology_request_row_t
|
||||
entry.snapshot_expiry = row.get_as<db_clock::time_point>("snapshot_expiry");
|
||||
}
|
||||
}
|
||||
if (row.has("finalize_migration_ks_name")) {
|
||||
entry.finalize_migration_ks_name = row.get_as<sstring>("finalize_migration_ks_name");
|
||||
}
|
||||
|
||||
return entry;
|
||||
}
|
||||
|
||||
@@ -427,7 +427,6 @@ public:
|
||||
std::optional<sstring> snapshot_tag;
|
||||
std::optional<db_clock::time_point> snapshot_expiry;
|
||||
bool snapshot_skip_flush;
|
||||
std::optional<sstring> finalize_migration_ks_name;
|
||||
};
|
||||
using topology_requests_entries = std::unordered_map<utils::UUID, system_keyspace::topology_requests_entry>;
|
||||
|
||||
|
||||
@@ -143,18 +143,10 @@ dht::token_range view_building_worker::get_tablet_token_range(table_id table_id,
|
||||
}
|
||||
|
||||
future<> view_building_worker::drain() {
|
||||
auto drain_started = std::exchange(_drain_started, started_drain::yes);
|
||||
if (drain_started == started_drain::no) {
|
||||
_drain_finished = shared_future(do_drain());
|
||||
}
|
||||
return _drain_finished.get_future();
|
||||
}
|
||||
|
||||
future<> view_building_worker::do_drain() {
|
||||
if (!_as.abort_requested()) {
|
||||
_as.request_abort();
|
||||
}
|
||||
co_await _staging_sstables_mutex.wait();
|
||||
_state._mutex.broken();
|
||||
_staging_sstables_mutex.broken();
|
||||
_sstables_to_register_event.broken();
|
||||
if (this_shard_id() == 0) {
|
||||
@@ -164,9 +156,7 @@ future<> view_building_worker::do_drain() {
|
||||
co_await std::move(state_observer);
|
||||
co_await _mnotifier.unregister_listener(this);
|
||||
}
|
||||
co_await _state._mutex.wait();
|
||||
_state._mutex.broken();
|
||||
co_await _state.drain();
|
||||
co_await _state.clear();
|
||||
co_await uninit_messaging_service();
|
||||
}
|
||||
|
||||
@@ -210,7 +200,9 @@ future<> view_building_worker::run_staging_sstables_registrator() {
|
||||
while (!_as.abort_requested()) {
|
||||
bool sleep = false;
|
||||
try {
|
||||
auto lock = co_await get_units(_staging_sstables_mutex, 1, _as);
|
||||
co_await create_staging_sstable_tasks();
|
||||
lock.return_all();
|
||||
_as.check();
|
||||
co_await _sstables_to_register_event.when();
|
||||
} catch (semaphore_aborted&) {
|
||||
@@ -235,45 +227,13 @@ future<> view_building_worker::run_staging_sstables_registrator() {
|
||||
}
|
||||
}
|
||||
|
||||
future<std::vector<foreign_ptr<semaphore_units<>>>> view_building_worker::lock_staging_mutex_on_multiple_shards(std::flat_set<shard_id> shards) {
|
||||
SCYLLA_ASSERT(this_shard_id() == 0);
|
||||
// Collect `_staging_sstables_mutex` locks from multiple shards,
|
||||
// so other shards won't interact with their `_staging_sstables` map
|
||||
// until the caller releases them.
|
||||
std::vector<foreign_ptr<semaphore_units<>>> locks;
|
||||
locks.resize(smp::count);
|
||||
// Locks are acquired from multiple shards in parallel.
|
||||
// This is the only place where multiple-shard locks are acquired at once
|
||||
// and the method is called only once at a time (from `create_staging_sstable_tasks()`
|
||||
// on shard 0), so no deadlock may occur.
|
||||
co_await coroutine::parallel_for_each(shards, [&locks, &sharded_vbw = container()] (auto shard_id) -> future<> {
|
||||
auto lock_ptr = co_await smp::submit_to(shard_id, [&sharded_vbw] () -> future<foreign_ptr<semaphore_units<>>> {
|
||||
auto& vbw = sharded_vbw.local();
|
||||
auto lock = co_await get_units(vbw._staging_sstables_mutex, 1, vbw._as);
|
||||
co_return make_foreign(std::move(lock));
|
||||
});
|
||||
locks[shard_id] = std::move(lock_ptr);
|
||||
});
|
||||
co_return std::move(locks);
|
||||
}
|
||||
|
||||
future<> view_building_worker::create_staging_sstable_tasks() {
|
||||
// Explicitly lock shard0 beforehand to prevent other shards from modifying `_sstables_to_register` from `register_staging_sstable_tasks()`
|
||||
auto lock0 = co_await get_units(_staging_sstables_mutex, 1, _as);
|
||||
|
||||
if (_sstables_to_register.empty()) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto shards = _sstables_to_register
|
||||
| std::views::values
|
||||
| std::views::join
|
||||
| std::views::transform([] (const auto& sst_info) { return sst_info.shard; })
|
||||
| std::ranges::to<std::flat_set<shard_id>>();
|
||||
shards.erase(0); // We're already holding shard0 lock
|
||||
auto locks = co_await lock_staging_mutex_on_multiple_shards(std::move(shards));
|
||||
|
||||
utils::chunked_vector<canonical_mutation> cmuts;
|
||||
|
||||
auto guard = co_await _group0.client().start_operation(_as);
|
||||
auto my_host_id = _db.get_token_metadata().get_topology().my_host_id();
|
||||
for (auto& [table_id, sst_infos]: _sstables_to_register) {
|
||||
@@ -500,16 +460,6 @@ static std::unordered_set<table_id> get_ids_of_all_views(replica::database& db,
|
||||
}) | std::ranges::to<std::unordered_set>();;
|
||||
}
|
||||
|
||||
void view_building_worker::state::start_batch(std::unique_ptr<batch> batch) {
|
||||
if (_drained) {
|
||||
on_internal_error(vbw_logger, "view_building_worker::state was already drained");
|
||||
} else if (_batch) {
|
||||
on_internal_error(vbw_logger, fmt::format("view_building_worker::state::start_batch(): some batch (tasks: {}) is already running", _batch->tasks | std::views::keys));
|
||||
}
|
||||
_batch = std::move(batch);
|
||||
_batch->start();
|
||||
}
|
||||
|
||||
// If `state::processing_base_table` is different from the `view_building_state::currently_processed_base_table`,
|
||||
// clear the state, save and flush new base table
|
||||
future<> view_building_worker::state::update_processing_base_table(replica::database& db, const view_building_state& building_state, abort_source& as) {
|
||||
@@ -535,10 +485,6 @@ future<> view_building_worker::state::clean_up_after_batch() {
|
||||
|
||||
// Flush base table, set is as currently processing base table and save which views exist at the time of flush
|
||||
future<> view_building_worker::state::flush_base_table(replica::database& db, table_id base_table_id, abort_source& as) {
|
||||
if (_drained) {
|
||||
on_internal_error(vbw_logger, "view_building_worker::state was already drained");
|
||||
}
|
||||
|
||||
auto cf = db.find_column_family(base_table_id).shared_from_this();
|
||||
co_await when_all(cf->await_pending_writes(), cf->await_pending_streams());
|
||||
co_await flush_base(cf, as);
|
||||
@@ -557,11 +503,6 @@ future<> view_building_worker::state::clear() {
|
||||
flushed_views.clear();
|
||||
}
|
||||
|
||||
future<> view_building_worker::state::drain() {
|
||||
_drained = true;
|
||||
co_await clear();
|
||||
}
|
||||
|
||||
view_building_worker::batch::batch(sharded<view_building_worker>& vbw, std::unordered_map<utils::UUID, view_building_task> tasks, table_id base_id, locator::tablet_replica replica)
|
||||
: base_id(base_id)
|
||||
, replica(replica)
|
||||
@@ -726,34 +667,24 @@ future<> view_building_worker::do_build_range(table_id base_id, std::vector<tabl
|
||||
}
|
||||
|
||||
future<> view_building_worker::do_process_staging(table_id table_id, dht::token last_token) {
|
||||
if (_staging_sstables[table_id].empty()) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto table = _db.get_tables_metadata().get_table(table_id).shared_from_this();
|
||||
auto& tablet_map = table->get_effective_replication_map()->get_token_metadata().tablets().get_tablet_map(table_id);
|
||||
auto tid = tablet_map.get_tablet_id(last_token);
|
||||
auto tablet_range = tablet_map.get_token_range(tid);
|
||||
|
||||
// Select sstables belonging to the tablet (identified by `last_token`)
|
||||
std::vector<sstables::shared_sstable> sstables_to_process;
|
||||
|
||||
try {
|
||||
// Acquire `_staging_sstables_mutex` to prevent `create_staging_sstable_tasks()` from
|
||||
// concurrently modifying `_staging_sstables` (moving entries from `_sstables_to_register`)
|
||||
// while we read them.
|
||||
auto lock = co_await get_units(_staging_sstables_mutex, 1, _as);
|
||||
auto& tablet_map = table->get_effective_replication_map()->get_token_metadata().tablets().get_tablet_map(table_id);
|
||||
auto tid = tablet_map.get_tablet_id(last_token);
|
||||
auto tablet_range = tablet_map.get_token_range(tid);
|
||||
|
||||
// Select sstables belonging to the tablet (identified by `last_token`)
|
||||
for (auto& sst: _staging_sstables[table_id]) {
|
||||
auto sst_last_token = sst->get_last_decorated_key().token();
|
||||
if (tablet_range.contains(sst_last_token, dht::token_comparator())) {
|
||||
sstables_to_process.push_back(sst);
|
||||
}
|
||||
for (auto& sst: _staging_sstables[table_id]) {
|
||||
auto sst_last_token = sst->get_last_decorated_key().token();
|
||||
if (tablet_range.contains(sst_last_token, dht::token_comparator())) {
|
||||
sstables_to_process.push_back(sst);
|
||||
}
|
||||
lock.return_all();
|
||||
} catch (semaphore_aborted&) {
|
||||
vbw_logger.warn("Semaphore was aborted while waiting to remove processed sstables for table {}", table_id);
|
||||
co_return;
|
||||
}
|
||||
|
||||
if (sstables_to_process.empty()) {
|
||||
co_return;
|
||||
}
|
||||
co_await _vug.process_staging_sstables(std::move(table), sstables_to_process);
|
||||
|
||||
try {
|
||||
@@ -868,8 +799,8 @@ future<std::vector<utils::UUID>> view_building_worker::work_on_tasks(raft::term_
|
||||
}
|
||||
|
||||
// Create and start the batch
|
||||
auto batch = std::make_unique<view_building_worker::batch>(container(), std::move(tasks), *building_state.currently_processed_base_table, my_replica);
|
||||
_state.start_batch(std::move(batch));
|
||||
_state._batch = std::make_unique<batch>(container(), std::move(tasks), *building_state.currently_processed_base_table, my_replica);
|
||||
_state._batch->start();
|
||||
}
|
||||
|
||||
if (std::ranges::all_of(ids, [&] (auto& id) { return !_state._batch->tasks.contains(id); })) {
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
#include <seastar/core/shared_future.hh>
|
||||
#include <unordered_map>
|
||||
#include <unordered_set>
|
||||
#include <flat_set>
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "locator/tablets.hh"
|
||||
#include "raft/raft.hh"
|
||||
@@ -99,14 +98,11 @@ class view_building_worker : public seastar::peering_sharded_service<view_buildi
|
||||
std::unordered_set<table_id> flushed_views;
|
||||
|
||||
semaphore _mutex = semaphore(1);
|
||||
bool _drained = false;
|
||||
// All of the methods below should be executed while holding `_mutex` unit!
|
||||
void start_batch(std::unique_ptr<batch> batch);
|
||||
future<> update_processing_base_table(replica::database& db, const view_building_state& building_state, abort_source& as);
|
||||
future<> flush_base_table(replica::database& db, table_id base_table_id, abort_source& as);
|
||||
future<> clean_up_after_batch();
|
||||
future<> clear();
|
||||
future<> drain();
|
||||
};
|
||||
|
||||
// Wrapper which represents information needed to create
|
||||
@@ -173,24 +169,14 @@ private:
|
||||
future<> do_process_staging(table_id base_id, dht::token last_token);
|
||||
|
||||
future<> run_staging_sstables_registrator();
|
||||
// Acquires `_staging_sstables_mutex` on all shards internally,
|
||||
// so callers must not hold `_staging_sstables_mutex` when invoking it.
|
||||
// Caller must hold units from `_staging_sstables_mutex`
|
||||
future<> create_staging_sstable_tasks();
|
||||
future<> discover_existing_staging_sstables();
|
||||
std::unordered_map<table_id, std::vector<staging_sstable_task_info>> discover_local_staging_sstables(building_tasks building_tasks);
|
||||
// Acquire `_staging_sstables_mutex` on multiple shards in parallel.
|
||||
// Must be called only from shard 0.
|
||||
// Must be called ONLY by `create_staging_sstable_tasks()` and only once at a time to avoid deadlock.
|
||||
future<std::vector<foreign_ptr<semaphore_units<>>>> lock_staging_mutex_on_multiple_shards(std::flat_set<shard_id> shards);
|
||||
|
||||
void init_messaging_service();
|
||||
future<> uninit_messaging_service();
|
||||
future<std::vector<utils::UUID>> work_on_tasks(raft::term_t term, std::vector<utils::UUID> ids);
|
||||
|
||||
using started_drain = bool_class<struct started_drain_tag>;
|
||||
started_drain _drain_started = started_drain::no;
|
||||
shared_future<> _drain_finished;
|
||||
future<> do_drain();
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -99,7 +99,7 @@ public:
|
||||
|
||||
set_cell(cr, "up", gossiper.is_alive(hostid));
|
||||
if (gossiper.is_shutdown(endpoint)) {
|
||||
set_cell(cr, "status", "shutdown");
|
||||
set_cell(cr, "status", gossiper.get_gossip_status(endpoint));
|
||||
} else {
|
||||
set_cell(cr, "status", boost::to_upper_copy<std::string>(fmt::format("{}", ss.get_node_state(hostid))));
|
||||
}
|
||||
@@ -224,12 +224,12 @@ public:
|
||||
}
|
||||
|
||||
if (_db.find_keyspace(e.name).get_replication_strategy().uses_tablets()) {
|
||||
co_await _db.get_tables_metadata().for_each_table_gently([&, this] (table_id tid, lw_shared_ptr<replica::table> table) -> future<> {
|
||||
co_await _db.get_tables_metadata().for_each_table_gently([&, this] (table_id, lw_shared_ptr<replica::table> table) -> future<> {
|
||||
if (table->schema()->ks_name() != e.name) {
|
||||
co_return;
|
||||
}
|
||||
const auto& table_name = table->schema()->cf_name();
|
||||
utils::chunked_vector<dht::token_range_endpoints> ranges = co_await _ss.describe_ring_for_table(tid);
|
||||
utils::chunked_vector<dht::token_range_endpoints> ranges = co_await _ss.describe_ring_for_table(e.name, table_name);
|
||||
co_await emit_ring(result, e.key, table_name, std::move(ranges));
|
||||
});
|
||||
} else {
|
||||
|
||||
dist/common/scripts/scylla_swap_setup (vendored, 28 lines changed)
@@ -9,7 +9,6 @@
|
||||
|
||||
import os
|
||||
import sys
|
||||
import shlex
|
||||
import argparse
|
||||
import psutil
|
||||
from pathlib import Path
|
||||
@@ -104,41 +103,16 @@ if __name__ == '__main__':
|
||||
run('dd if=/dev/zero of={} bs=1M count={}'.format(swapfile, swapsize_mb), shell=True, check=True)
|
||||
swapfile.chmod(0o600)
|
||||
run('mkswap -f {}'.format(swapfile), shell=True, check=True)
|
||||
|
||||
mount_point = find_mount_point(swap_directory)
|
||||
mount_unit = out(f'systemd-escape -p --suffix=mount {shlex.quote(str(mount_point))}')
|
||||
|
||||
# Add DefaultDependencies=no to the swap unit to avoid getting the default
|
||||
# Before=swap.target dependency. We apply this to all clouds, but the
|
||||
# requirement came from Azure:
|
||||
#
|
||||
# On Azure, the swap directory is on the Azure ephemeral disk (mounted on /mnt).
|
||||
# However, cloud-init makes this mount (i.e., the mnt.mount unit) depend on
|
||||
# the network (After=network-online.target). By extension, this means that
|
||||
# the swap unit depends on the network. If we didn't use DefaultDependencies=no,
|
||||
# then the swap unit would be part of the swap.target which other services
|
||||
# assume to be a local boot target, so we would end up with dependency cycles
|
||||
# such as:
|
||||
#
|
||||
# swap.target -> mnt-swapfile.swap -> mnt.mount -> network-online.target -> network.target -> systemd-resolved.service -> tmp.mount -> swap.target
|
||||
#
|
||||
# By removing the automatic Before=swap.target, the swap unit is no longer
|
||||
# part of swap.target, avoiding such cycles. The swap will still be
|
||||
# activated via WantedBy=multi-user.target.
|
||||
unit_data = '''
|
||||
[Unit]
|
||||
Description=swapfile
|
||||
DefaultDependencies=no
|
||||
After={}
|
||||
Conflicts=umount.target
|
||||
Before=umount.target
|
||||
|
||||
[Swap]
|
||||
What={}
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
'''[1:-1].format(mount_unit, swapfile)
|
||||
'''[1:-1].format(swapfile)
|
||||
with swapunit.open('w') as f:
|
||||
f.write(unit_data)
|
||||
systemd_unit.reload()
|
||||
|
||||
@@ -1,12 +1,6 @@
|
||||
### a dictionary of redirections
|
||||
#old path: new path
|
||||
|
||||
# Move the Upgrade Support (About Upgrade) page
|
||||
|
||||
/stable/upgrade/about-upgrade.html: https://docs.scylladb.com/stable/versioning/upgrade-policy.html
|
||||
/branch-2025.4/upgrade/about-upgrade.html: https://docs.scylladb.com/stable/versioning/upgrade-policy.html
|
||||
/branch-2026.1/upgrade/about-upgrade.html: https://docs.scylladb.com/stable/versioning/upgrade-policy.html
|
||||
|
||||
# Move the OS Support page
|
||||
|
||||
/stable/getting-started/os-support.html: https://docs.scylladb.com/stable/versioning/os-support-per-version.html
|
||||
|
||||
@@ -31,7 +31,7 @@ was used. Alternator currently supports two compression algorithms, `gzip`
|
||||
and `deflate`, both standardized in ([RFC 9110](https://www.rfc-editor.org/rfc/rfc9110.html)).
|
||||
Other standard compression types which are listed in
|
||||
[IANA's HTTP Content Coding Registry](https://www.iana.org/assignments/http-parameters/http-parameters.xhtml#content-coding),
|
||||
including `zstd` ([RFC 8878](https://www.rfc-editor.org/rfc/rfc8878.html)),
|
||||
including `zstd` ([RFC 8878][https://www.rfc-editor.org/rfc/rfc8878.html]),
|
||||
are not yet supported by Alternator.
|
||||
|
||||
Note that HTTP's compression only compresses the request's _body_ - not the
|
||||
|
||||
@@ -261,51 +261,8 @@ The following options are supported for vector indexes. All of them are optional
|
||||
| | * ``true``: Enable rescoring. | |
|
||||
| | * ``false``: Disable rescoring. | |
|
||||
+------------------------------+----------------------------------------------------------------------------------------------------------+---------------+
|
||||
| ``source_model`` | The name of the embedding model that produced the vectors (e.g., ``"ada002"``). Cassandra client | *(none)* |
|
||||
| | libraries such as CassIO send this option to tag the index with the model. Cassandra SAI rejects it as | |
|
||||
| | an unrecognized property; ScyllaDB accepts and preserves it in ``DESCRIBE`` output for compatibility | |
|
||||
| | with those libraries, but does not act on it. | |
|
||||
+------------------------------+----------------------------------------------------------------------------------------------------------+---------------+
|
||||
|
||||
|
||||
.. _cassandra-sai-compatibility:
|
||||
|
||||
Cassandra SAI Compatibility for Vector Search
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
ScyllaDB accepts the Cassandra ``StorageAttachedIndex`` (SAI) class name in ``CREATE CUSTOM INDEX``
|
||||
statements **for vector columns**. Cassandra libraries such as
|
||||
`CassIO <https://cassio.org/>`_ and `LangChain <https://www.langchain.com/>`_ use SAI to create
|
||||
vector indexes; ScyllaDB recognizes these statements for compatibility.
|
||||
|
||||
When ScyllaDB encounters an SAI class name on a **vector column**, the index is automatically
|
||||
created as a native ``vector_index``. The following class names are recognized:
|
||||
|
||||
* ``org.apache.cassandra.index.sai.StorageAttachedIndex`` (exact case required)
|
||||
* ``StorageAttachedIndex`` (case-insensitive)
|
||||
* ``SAI`` (case-insensitive)
|
||||
|
||||
Example::
|
||||
|
||||
-- Cassandra SAI statement accepted by ScyllaDB:
|
||||
CREATE CUSTOM INDEX ON my_table (embedding)
|
||||
USING 'org.apache.cassandra.index.sai.StorageAttachedIndex'
|
||||
WITH OPTIONS = {'similarity_function': 'COSINE'};
|
||||
|
||||
-- Equivalent to:
|
||||
CREATE CUSTOM INDEX ON my_table (embedding)
|
||||
USING 'vector_index'
|
||||
WITH OPTIONS = {'similarity_function': 'COSINE'};
|
||||
|
||||
The ``similarity_function`` option is supported by both Cassandra SAI and ScyllaDB.
|
||||
|
||||
.. note::
|
||||
|
||||
SAI class names are only supported on **vector columns**. Using an SAI class name on a
|
||||
non-vector column (e.g., ``text`` or ``int``) will result in an error. General SAI
|
||||
indexing of non-vector columns is not supported by ScyllaDB; use a
|
||||
:doc:`secondary index </cql/secondary-indexes>` instead.
|
||||
|
||||
.. _drop-index-statement:
|
||||
|
||||
DROP INDEX
|
||||
|
||||
docs/dev/audit.md (new file, 111 lines)
@@ -0,0 +1,111 @@
|
||||
# Introduction
|
||||
|
||||
Similar to the approach described in CASSANDRA-12151, we add the
|
||||
concept of an audit specification. An audit has a target (syslog or a
|
||||
table) and a set of events/actions that it wants recorded. We
|
||||
introduce new CQL syntax for Scylla users to describe and manipulate
|
||||
audit specifications.
|
||||
|
||||
Prior art:
|
||||
- Microsoft SQL Server [audit
|
||||
description](https://docs.microsoft.com/en-us/sql/relational-databases/security/auditing/sql-server-audit-database-engine?view=sql-server-ver15)
|
||||
- pgAudit [docs](https://github.com/pgaudit/pgaudit/blob/master/README.md)
|
||||
- MySQL audit_log docs in
|
||||
[MySQL](https://dev.mysql.com/doc/refman/8.0/en/audit-log.html) and
|
||||
[Azure](https://docs.microsoft.com/en-us/azure/mysql/concepts-audit-logs)
|
||||
- DynamoDB can [use CloudTrail](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/logging-using-cloudtrail.html) to log all events
|
||||
|
||||
# CQL extensions
|
||||
|
||||
## Create an audit
|
||||
|
||||
```cql
|
||||
CREATE AUDIT [IF NOT EXISTS] audit-name WITH TARGET { SYSLOG | table-name }
|
||||
[ AND TRIGGER KEYSPACE IN (ks1, ks2, ks3) ]
|
||||
[ AND TRIGGER TABLE IN (tbl1, tbl2, tbl3) ]
|
||||
[ AND TRIGGER ROLE IN (usr1, usr2, usr3) ]
|
||||
[ AND TRIGGER CATEGORY IN (cat1, cat2, cat3) ]
|
||||
;
|
||||
```
|
||||
|
||||
From this point on, every database event that matches all present
|
||||
triggers will be recorded in the target. When the target is a table,
|
||||
it behaves like the [current
|
||||
design](https://docs.scylladb.com/operating-scylla/security/auditing/#table-storage).
|
||||
|
||||
The audit name must be different from all other audits, unless IF NOT
|
||||
EXISTS precedes it, in which case the existing audit must be identical
|
||||
to the new definition. Case sensitivity and length limit are the same
|
||||
as for table names.
|
||||
|
||||
A trigger kind (i.e., `KEYSPACE`, `TABLE`, `ROLE`, or `CATEGORY`) can be
|
||||
specified at most once.
|
||||
|
||||
## Show an audit
|
||||
|
||||
```cql
|
||||
DESCRIBE AUDIT [audit-name ...];
|
||||
```
|
||||
|
||||
Prints definitions of all audits named herein. If no names are
|
||||
provided, prints all audits.
|
||||
|
||||
## Delete an audit
|
||||
|
||||
```cql
|
||||
DROP AUDIT audit-name;
|
||||
```
|
||||
|
||||
Stops logging events specified by this audit. Doesn't impact the
|
||||
already logged events. If the target is a table, it remains as it is.
|
||||
|
||||
## Alter an audit
|
||||
|
||||
```cql
|
||||
ALTER AUDIT audit-name WITH {same syntax as CREATE}
|
||||
```
|
||||
|
||||
Any trigger provided will be updated (or newly created, if previously
|
||||
absent). To drop a trigger, use `IN *`.
|
||||
|
||||
## Permissions
|
||||
|
||||
Only superusers can modify audits or turn them on and off.
|
||||
|
||||
Only superusers can read tables that are audit targets; no user can
|
||||
modify them. Only superusers can drop tables that are audit targets,
|
||||
after the audit itself is dropped. If a superuser doesn't drop a
|
||||
target table, it remains in existence indefinitely.
|
||||
|
||||
# Implementation
|
||||
|
||||
## Efficient trigger evaluation
|
||||
|
||||
```c++
|
||||
namespace audit {
|
||||
|
||||
/// Stores triggers from an AUDIT statement.
|
||||
class triggers {
|
||||
// Use trie structures for speedy string lookup.
|
||||
optional<trie> _ks_trigger, _tbl_trigger, _usr_trigger;
|
||||
|
||||
// A logical-AND filter.
|
||||
optional<unsigned> _cat_trigger;
|
||||
|
||||
public:
|
||||
/// True iff every non-null trigger matches the corresponding ainf element.
|
||||
bool should_audit(const audit_info& ainf);
|
||||
};
|
||||
|
||||
} // namespace audit
|
||||
```
|
||||
|
||||
To prevent modification of target tables, `audit::inspect()` will
|
||||
check the statement and throw if it is disallowed, similar to what
|
||||
`check_access()` currently does.
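
As a rough illustration of that check (not the actual implementation), the
following sketch assumes a hypothetical `target_registry` of audit target
tables and a simplified `statement_info` describing the statement being
inspected; the real `inspect()` would work on the prepared statement and the
authenticated role instead of these stand-in structs.

```c++
#include <set>
#include <stdexcept>
#include <string>
#include <utility>

namespace audit {

// Hypothetical registry of tables that are currently audit targets.
// In the real implementation this would be derived from the stored
// audit definitions, not populated by hand.
struct target_registry {
    std::set<std::pair<std::string, std::string>> targets; // (keyspace, table)

    bool is_target(const std::string& ks, const std::string& cf) const {
        return targets.count({ks, cf}) > 0;
    }
};

// Minimal stand-in for the information inspect() needs about a statement.
struct statement_info {
    std::string keyspace;
    std::string table;
    bool modifies_data = false;   // INSERT/UPDATE/DELETE/TRUNCATE, etc.
    bool is_superuser = false;
};

// Sketch of the guard described above: throw if the statement would
// modify an audit target table, or read it as a non-superuser.
void inspect(const target_registry& reg, const statement_info& stmt) {
    if (!reg.is_target(stmt.keyspace, stmt.table)) {
        return;
    }
    if (stmt.modifies_data) {
        throw std::runtime_error("audit target tables cannot be modified");
    }
    if (!stmt.is_superuser) {
        throw std::runtime_error("only superusers may read audit target tables");
    }
}

} // namespace audit
```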
|
||||
|
||||
## Persisting audit definitions
|
||||
|
||||
Obviously, an audit definition must survive a server restart and stay
|
||||
consistent among all nodes in a cluster. We'll accomplish both by
|
||||
storing audits in a system table.
|
||||
@@ -1,155 +0,0 @@
|
||||
# Comparing Build Systems: configure.py vs CMake
|
||||
|
||||
ScyllaDB has two build systems: the primary `configure.py` + Ninja pipeline
|
||||
and an alternative CMake build (used mainly for IDE integration — CLion,
|
||||
clangd, etc.). Both must produce equivalent compilation and link commands.
|
||||
|
||||
`scripts/compare_build_systems.py` verifies this by parsing the `build.ninja`
|
||||
files generated by each system and comparing:
|
||||
|
||||
1. **Per-file compilation flags** — defines, warnings, optimization, language
|
||||
flags for every Scylla source file.
|
||||
2. **Link target sets** — are the same executables produced by both systems?
|
||||
3. **Per-target linker settings** — link flags and libraries for every common
|
||||
executable.
|
||||
|
||||
`configure.py` is treated as the baseline. CMake should match it.
|
||||
|
||||
## Quick start
|
||||
|
||||
```bash
|
||||
# Compare a single mode
|
||||
scripts/compare_build_systems.py -m dev
|
||||
|
||||
# Compare all modes
|
||||
scripts/compare_build_systems.py
|
||||
|
||||
# Verbose output — show per-file and per-target differences
|
||||
scripts/compare_build_systems.py -m debug -v
|
||||
```
|
||||
|
||||
The script automatically configures both build systems into a temporary
|
||||
directory for every run — the user's existing build tree is never touched.
|
||||
No manual `configure.py` or `cmake` invocation is required.
|
||||
|
||||
## Mode mapping
|
||||
|
||||
| configure.py | CMake |
|
||||
|--------------|------------------|
|
||||
| `debug` | `Debug` |
|
||||
| `dev` | `Dev` |
|
||||
| `release` | `RelWithDebInfo` |
|
||||
| `sanitize` | `Sanitize` |
|
||||
| `coverage` | `Coverage` |
|
||||
|
||||
## Examples
|
||||
|
||||
```bash
|
||||
# Check dev mode only (fast, most common during development)
|
||||
scripts/compare_build_systems.py -m dev
|
||||
|
||||
# Check all modes
|
||||
scripts/compare_build_systems.py
|
||||
|
||||
# CI mode: quiet, strict (exit 1 on any diff)
|
||||
scripts/compare_build_systems.py --ci
|
||||
|
||||
# Verbose output for debugging a specific mode
|
||||
scripts/compare_build_systems.py -m sanitize -v
|
||||
|
||||
# Quiet mode — only prints summary and errors
|
||||
scripts/compare_build_systems.py -m dev -q
|
||||
```
|
||||
|
||||
## Exit codes
|
||||
|
||||
| Code | Meaning |
|
||||
|------|--------------------------------------------------------------------------|
|
||||
| `0` | All checked modes match |
|
||||
| `1` | Differences found |
|
||||
| `2` | Configuration failure or some modes could not be compared (e.g. skipped) |
|
||||
|
||||
## What it ignores
|
||||
|
||||
The script intentionally ignores certain structural differences that are
|
||||
inherent to how the two build systems work:
|
||||
|
||||
- **Include paths** (`-I`, `-isystem`) — directory layout differs between
|
||||
the two systems.
|
||||
- **LTO/PGO flags** — these are configuration-dependent options, not
|
||||
mode-inherent.
|
||||
- **Internal library targets** — CMake creates intermediate static/shared
|
||||
libraries (e.g., `scylla-main`, `test-lib`, abseil targets) while
|
||||
`configure.py` links `.o` files directly.
|
||||
- **Per-component Boost defines** — CMake adds `BOOST_REGEX_DYN_LINK` etc.
|
||||
per component; `configure.py` uses a single `BOOST_ALL_DYN_LINK`.
|
||||
|
||||
## Typical workflow
|
||||
|
||||
After modifying `CMakeLists.txt` or `cmake/mode.*.cmake`:
|
||||
|
||||
```bash
|
||||
# 1. Run the comparison (auto-configures both systems in a temp dir)
|
||||
scripts/compare_build_systems.py -m dev -v
|
||||
|
||||
# 2. Fix any differences, repeat
|
||||
```
|
||||
|
||||
## AI agent workflow
|
||||
|
||||
When the script reports mismatches, you can paste its summary output into
|
||||
an AI coding agent (GitHub Copilot, etc.) and ask it to fix the
|
||||
discrepancies. The agent has access to both `configure.py` and the
|
||||
CMake files and can resolve most differences automatically.
|
||||
|
||||
### Example interaction
|
||||
|
||||
**1. Run the script:**
|
||||
|
||||
```bash
|
||||
scripts/compare_build_systems.py
|
||||
```
|
||||
|
||||
**2. Copy the summary and paste it to the agent:**
|
||||
|
||||
> I ran `scripts/compare_build_systems.py` and got:
|
||||
>
|
||||
> ```
|
||||
> Summary
|
||||
> ══════════════════════════════════════════════════════════════════════
|
||||
> debug (CMake: Debug ): ✗ MISMATCH
|
||||
> Compilation: 3 files with flag diffs, 1 sources only in configure.py
|
||||
> only-configure.py defines: -DSOME_FLAG (3 files)
|
||||
> Link targets: 1 only in configure.py
|
||||
> Linker: 2 targets with lib diffs
|
||||
> lib only in CMake: boost_filesystem (2 targets)
|
||||
> dev (CMake: Dev ): ✗ MISMATCH
|
||||
> Compilation: 1 sources only in configure.py
|
||||
> Link targets: 1 only in configure.py
|
||||
> release (CMake: RelWithDebInfo ): ✓ MATCH
|
||||
> sanitize (CMake: Sanitize ): ✓ MATCH
|
||||
> coverage (CMake: Coverage ): ✓ MATCH
|
||||
> ```
|
||||
>
|
||||
> Please fix all issues and commit according to project guidelines.
|
||||
|
||||
**3. The agent will:**
|
||||
|
||||
- Identify each discrepancy (missing sources, missing targets, extra
|
||||
libraries, missing defines).
|
||||
- Trace root causes — e.g., a test added to `configure.py` but not to
|
||||
`test/boost/CMakeLists.txt`, or an unnecessary `Boost::filesystem`
|
||||
link in a CMake target.
|
||||
- Apply fixes to the appropriate `CMakeLists.txt` files.
|
||||
- Re-run cmake and the comparison script to verify the fix.
|
||||
- Commit each fix to the correct commit in the series (using
|
||||
`git commit --fixup` + `git rebase --autosquash`).
|
||||
|
||||
### Tips
|
||||
|
||||
- **Paste the full summary block** — the inline diff details (compilation,
|
||||
link targets, linker) give the agent enough context to act without
|
||||
scrolling through verbose output.
|
||||
- **Use `-v` for stubborn issues** — if the agent needs per-file or
|
||||
per-target detail, re-run with `-v` and paste the relevant section.
|
||||
|
||||
@@ -1,81 +0,0 @@
|
||||
# Counters
|
||||
|
||||
Counters are a special kind of cell whose value can only be incremented, decremented, read, and (with some limitations) deleted. In particular, once deleted, a counter cannot be used again. For example:
|
||||
|
||||
```cql
|
||||
> UPDATE cf SET my_counter = my_counter + 6 WHERE pk = 0
|
||||
> SELECT * FROM cf;
|
||||
pk | my_counter
|
||||
----+------------
|
||||
0 | 6
|
||||
|
||||
(1 rows)
|
||||
> UPDATE cf SET my_counter = my_counter - 1 WHERE pk = 0
|
||||
> SELECT * FROM cf;
|
||||
pk | my_counter
|
||||
----+------------
|
||||
0 | 5
|
||||
|
||||
(1 rows)
|
||||
> DELETE my_counter FROM cf WHERE pk = 0;
|
||||
> SELECT * FROM cf;
|
||||
pk | my_counter
|
||||
----+------------
|
||||
|
||||
(0 rows)
|
||||
> UPDATE cf SET my_counter = my_counter + 3 WHERE pk = 0
|
||||
> SELECT * FROM cf;
|
||||
pk | my_counter
|
||||
----+------------
|
||||
|
||||
(0 rows)
|
||||
```
|
||||
|
||||
## Counters representation
|
||||
Counters are represented as sets of so-called shards, which are triples containing:
|
||||
* counter id – uuid identifying the writer owning that shard (see below)
|
||||
* logical clock – incremented each time the owning writer modifies the shard value
|
||||
* current value – sum of increments and decrements done by the owning writer
|
||||
|
||||
During each write operation one of the replicas is chosen as the leader. The leader reads its shard, increments the logical clock, updates the current value, and then sends the new version of its shard to the other replicas.
|
||||
|
||||
Shards owned by the same writer are merged (see below) so that each counter cell contains only one shard per counter id. Reading the actual counter value requires summing values of all shards.
|
||||
|
||||
### Counter id
|
||||
|
||||
The counter id is a 128-bit UUID that identifies which writer owns a shard. How it is assigned depends on whether the table uses vnodes or tablets.
|
||||
|
||||
**Vnodes:** the counter id is the host id of the node that owns the shard. Each node in the cluster gets a unique counter id, so the number of shards in a counter cell grows with the number of distinct nodes that have ever written to it.
|
||||
|
||||
**Tablets:** the counter id is rack-based rather than node-based. It is a deterministic type-3 (name-based) UUID derived from the string `"<datacenter>:<rack>"`. All nodes in the same rack share the same counter id.
|
||||
|
||||
During tablet migration, since there are two active replicas in a rack and in order to avoid conflicts, the node that is a *pending replica* uses the **negated** rack UUID as its counter id.
|
||||
|
||||
This bounds the number of shards in a counter cell to at most `2 × (number of racks)` regardless of node replacements.
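
A rough sketch of that derivation is below. It is illustrative only: the real code produces a 128-bit type-3 (MD5, name-based) UUID rather than a 64-bit hash, `rack_counter_id` is not an actual function name, and bitwise complement here merely models the "negated" id used by a pending replica.

```c++
#include <cstdint>
#include <functional>
#include <string>

// Illustrative stand-in: a deterministic id derived from "<datacenter>:<rack>",
// so that all nodes in the same rack agree on the shard owner.
uint64_t rack_counter_id(const std::string& dc, const std::string& rack,
                         bool pending_replica) {
    uint64_t id = std::hash<std::string>{}(dc + ":" + rack);
    // During tablet migration the pending replica uses the negated id so the
    // two active replicas in the rack never write to the same shard.
    return pending_replica ? ~id : id;
}
```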
|
||||
|
||||
### Merging and reconciliation
|
||||
Reconciliation of two counters requires merging all shards belonging to the same counter id. The rule is: the shard with the highest logical clock wins.
|
||||
|
||||
Because support for deleting counters is limited (once deleted, a counter cannot be used again), during reconciliation a tombstone wins over a live counter cell regardless of their timestamps.
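
A minimal sketch of the merge rule and of reading the final value, using illustrative types rather than the real cell representation:

```c++
#include <cstdint>
#include <map>
#include <optional>

// Minimal model of a counter cell following the shard layout described above.
struct shard {
    int64_t logical_clock;
    int64_t value;
};

// One shard per counter id; std::nullopt models a counter tombstone.
using counter_cell = std::optional<std::map<uint64_t, shard>>;

// Per counter id the shard with the highest logical clock wins, and a
// tombstone wins over any live cell.
counter_cell merge(const counter_cell& a, const counter_cell& b) {
    if (!a || !b) {
        return std::nullopt;            // tombstones always win
    }
    counter_cell result = a;
    for (const auto& [id, sh] : *b) {
        auto [it, inserted] = result->try_emplace(id, sh);
        if (!inserted && sh.logical_clock > it->second.logical_clock) {
            it->second = sh;            // higher logical clock wins
        }
    }
    return result;
}

// Reading the counter value: sum the current values of all shards.
int64_t read_value(const counter_cell& cell) {
    int64_t total = 0;
    if (cell) {
        for (const auto& entry : *cell) {
            total += entry.second.value;
        }
    }
    return total;
}
```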
|
||||
|
||||
### Digest
|
||||
Computing a digest of counter cells needs to be done based solely on the shard contents (counter id, value, logical clock) rather than any structural metadata.
|
||||
|
||||
## Writes
|
||||
1. Counter update starts with a client sending counter delta as a long (CQL3 `bigint`) to the coordinator.
|
||||
2. CQL3 creates a `CounterMutation` containing a `counter_update` cell which is just a delta.
|
||||
3. Coordinator chooses the leader of the counter update and sends it the mutation. The leader is always one of the replicas owning the partition the modified counter belongs to.
|
||||
4. Now, the leader needs to transform counter deltas into shards. To do that it reads the current value of the shard it owns, and produces a new shard with the value modified by the delta and the logical clock incremented.
|
||||
5. The mutation with the newly created shard is both applied to the leader's memtable and sent to the other nodes for replication (a small sketch of this transformation follows the list).
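
In miniature, the delta-to-shard transformation of steps 4 and 5 looks roughly like this (illustrative types again; the real code works on `counter_update` cells and mutations):

```c++
#include <cstdint>

// Same illustrative shard model as in the merge sketch above.
struct shard {
    int64_t logical_clock = 0;
    int64_t value = 0;
};

// The leader turns a client-supplied delta into a new shard by bumping its
// own logical clock and adding the delta to its current value. The resulting
// shard is what gets written locally and replicated to the other replicas.
shard apply_delta(const shard& current, int64_t delta) {
    return shard{current.logical_clock + 1, current.value + delta};
}
```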
|
||||
|
||||
### Choosing leader
|
||||
Choosing the replica which becomes the leader for a counter update is entirely at the coordinator's discretion. It is not a static role in any way, and any concurrent update could be forwarded to a different leader. This means that all problems related to leader election are avoided.
|
||||
|
||||
The coordinator chooses the leader using the following algorithm:
|
||||
|
||||
1. If the coordinator can be a leader it chooses itself.
|
||||
2. Otherwise, a random replica from the local DC is chosen.
|
||||
3. If there is no eligible node available in the local DC, the replica closest to the coordinator (according to the snitch) is chosen (a sketch of these rules follows below).
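
The selection rules above as a self-contained toy sketch; `replica`, `is_local_dc`, `distance`, and `choose_leader` are illustrative names, not ScyllaDB types, and the function assumes `replicas` is non-empty.

```c++
#include <algorithm>
#include <cstdint>
#include <iterator>
#include <random>
#include <string>
#include <vector>

struct replica {
    std::string host;
    bool is_local_dc = false;
    uint32_t distance = 0;   // as reported by the snitch, lower is closer
};

std::string choose_leader(const std::string& coordinator_host,
                          bool coordinator_is_replica,
                          const std::vector<replica>& replicas,
                          std::mt19937& rng) {
    // 1. The coordinator picks itself when it is one of the replicas.
    if (coordinator_is_replica) {
        return coordinator_host;
    }
    // 2. Otherwise, a random replica from the local DC.
    std::vector<replica> local;
    std::copy_if(replicas.begin(), replicas.end(), std::back_inserter(local),
                 [](const replica& r) { return r.is_local_dc; });
    if (!local.empty()) {
        std::uniform_int_distribution<size_t> dist(0, local.size() - 1);
        return local[dist(rng)].host;
    }
    // 3. Fall back to the replica closest to the coordinator.
    return std::min_element(replicas.begin(), replicas.end(),
                            [](const replica& a, const replica& b) {
                                return a.distance < b.distance;
                            })->host;
}
```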
|
||||
|
||||
## Reads
|
||||
Querying counter values is much simpler than updating them. The first part of the read operation is performed as for all other cell types. When counter cells from different sources are reconciled, their shards are merged. Once the final counter cell value is known and the `CounterCell` is serialised, the current values of all shards are summed up and the output of the serialisation is a long integer.
|
||||
@@ -192,10 +192,14 @@ For example, to configure ScyllaDB to use listen address `10.0.0.5`:
|
||||
$ docker run --name some-scylla -d scylladb/scylla --listen-address 10.0.0.5
|
||||
```
|
||||
|
||||
**Since: 1.4**
|
||||
|
||||
#### `--alternator-address ADDR`
|
||||
|
||||
The `--alternator-address` command line option configures the Alternator API listen address. The default value is the same as `--listen-address`.
|
||||
|
||||
**Since: 3.2**
|
||||
|
||||
#### `--alternator-port PORT`
|
||||
|
||||
The `--alternator-port` command line option configures the Alternator API listen port. The Alternator API is disabled by default. You need to specify the port to enable it.
|
||||
@@ -206,16 +210,22 @@ For example, to configure ScyllaDB to listen to Alternator API at port `8000`:
|
||||
$ docker run --name some-scylla -d scylladb/scylla --alternator-port 8000
|
||||
```
|
||||
|
||||
**Since: 3.2**
|
||||
|
||||
#### `--alternator-https-port PORT`
|
||||
|
||||
The `--alternator-https-port` option is similar to `--alternator-port`, but enables an encrypted (HTTPS) port. Either `--alternator-https-port` or `--alternator-port`, or both, can be used to enable Alternator.
|
||||
|
||||
Note that the `--alternator-https-port` option also requires that files `/etc/scylla/scylla.crt` and `/etc/scylla/scylla.key` be inserted into the image. These files contain an SSL certificate and key, respectively.
|
||||
|
||||
**Since: 4.2**
|
||||
|
||||
#### `--alternator-write-isolation policy`
|
||||
|
||||
The `--alternator-write-isolation` command line option chooses between four allowed write isolation policies described in docs/alternator/alternator.md. This option must be specified if Alternator is enabled - it does not have a default.
|
||||
|
||||
**Since: 4.1**
|
||||
|
||||
#### `--broadcast-address ADDR`
|
||||
|
||||
The `--broadcast-address` command line option configures the IP address the ScyllaDB instance tells other ScyllaDB nodes in the cluster to connect to.
|
||||
@@ -294,6 +304,8 @@ For example, to skip running I/O setup:
|
||||
$ docker run --name some-scylla -d scylladb/scylla --io-setup 0
|
||||
```
|
||||
|
||||
**Since: 4.3**
|
||||
|
||||
#### `--cpuset CPUSET`
|
||||
|
||||
The `--cpuset` command line option restricts ScyllaDB to run only on the CPUs specified by `CPUSET`.
|
||||
@@ -329,18 +341,26 @@ For example, to enable the User Defined Functions (UDF) feature:
|
||||
$ docker run --name some-scylla -d scylladb/scylla --experimental-feature=udf
|
||||
```
|
||||
|
||||
**Since: 2.0**
|
||||
|
||||
#### `--disable-version-check`
|
||||
|
||||
The `--disable-version-check` command line option disables the version validation check.
|
||||
|
||||
**Since: 2.2**
|
||||
|
||||
#### `--authenticator AUTHENTICATOR`
|
||||
|
||||
The `--authenticator` command line option specifies the authenticator class ScyllaDB will use. By default, ScyllaDB uses `AllowAllAuthenticator`, which performs no credential checks. The other option is `PasswordAuthenticator`, which relies on username/password pairs to authenticate users.
|
||||
|
||||
**Since: 2.3**
|
||||
|
||||
#### `--authorizer AUTHORIZER`
|
||||
|
||||
The `--authorizer` command line option specifies the authorizer class ScyllaDB will use. By default, ScyllaDB uses `AllowAllAuthorizer`, which allows any action by any user. The other option is `CassandraAuthorizer`, which stores permissions in the `system.permissions` table.
|
||||
|
||||
**Since: 2025.4**
|
||||
|
||||
#### `--dc NAME`
|
||||
|
||||
The `--dc` command line option sets the datacenter name for the ScyllaDB node.
|
||||
|
||||
@@ -37,17 +37,8 @@ Global index's target is usually just the indexed column name, unless the index
|
||||
- index on map, set or list values: VALUES(v)
|
||||
- index on map entries: ENTRIES(v)
|
||||
|
||||
Their serialization uses lowercase type names as prefixes, except for `full` which is serialized
|
||||
as just the column name (without any prefix):
|
||||
`"v"`, `"keys(v)"`, `"values(v)"`, `"entries(v)"` are valid targets; a frozen full collection
|
||||
index on column `v` is stored simply as `"v"` (same as a regular index).
|
||||
|
||||
If the column name contains characters that could be confused with the above formats
|
||||
(e.g., a name containing parentheses or braces), it is escaped using the CQL
|
||||
quoted-identifier syntax (column_identifier::to_cql_string()), which wraps the
|
||||
name in double quotes and doubles any embedded double-quote characters. For example,
|
||||
a column named `hEllo` is stored as `"hEllo"`, and a column named `keys(m)` is
|
||||
stored as `"keys(m)"`.
|
||||
Their serialization is just string representation, so:
|
||||
"v", "FULL(v)", "KEYS(v)", "VALUES(v)", "ENTRIES(v)" are all valid targets.
|
||||
|
||||
## Local index
|
||||
|
||||
|
||||
@@ -1,67 +0,0 @@
|
||||
# System Keyspaces Overview
|
||||
|
||||
This page gives a high-level overview of several internal keyspaces and what they are used for.
|
||||
|
||||
## Table of Contents
|
||||
|
||||
- [system_replicated_keys](#system_replicated_keys)
|
||||
- [system_distributed](#system_distributed)
|
||||
- [system_distributed_everywhere](#system_distributed_everywhere)
|
||||
- [system_auth](#system_auth)
|
||||
- [system](#system)
|
||||
- [system_schema](#system_schema)
|
||||
- [system_traces](#system_traces)
|
||||
- [system_audit/audit](#system_auditaudit)
|
||||
|
||||
## `system_replicated_keys`
|
||||
|
||||
Internal keyspace for encryption-at-rest key material used by the replicated key provider. It stores encrypted data keys so nodes can retrieve the correct key IDs when reading encrypted data.
|
||||
|
||||
This keyspace is created as an internal system keyspace and uses `EverywhereStrategy` so key metadata is available on every node. It is not intended for user data.
|
||||
|
||||
## `system_distributed`
|
||||
|
||||
Internal distributed metadata keyspace used for cluster-wide coordination data that is shared across nodes.
|
||||
|
||||
In practice, it is used for metadata such as:
|
||||
|
||||
- materialized view build coordination state
|
||||
- CDC stream/timestamp metadata exposed to clients
|
||||
- service level definitions used by workload prioritization
|
||||
|
||||
This keyspace is managed by Scylla and is not intended for application tables.
|
||||
It is created as an internal keyspace (historically with `SimpleStrategy` and RF=3 by default).
|
||||
|
||||
## `system_distributed_everywhere`
|
||||
|
||||
Legacy keyspace. It is no longer used.
|
||||
|
||||
## `system_auth`
|
||||
|
||||
Legacy auth keyspace name kept primarily for compatibility.
|
||||
|
||||
Auth tables have moved to the `system` keyspace (`roles`, `role_members`, `role_permissions`, and related auth state). `system_auth` may still exist for compatibility with legacy tooling/queries, but it is no longer where current auth state is primarily stored.
|
||||
|
||||
## `system`
|
||||
|
||||
This is a node-local keyspace, so each node has its own, independent content for the tables in it. For some tables, the content is coordinated at a higher level (Raft), but not via the traditional replication path (storage proxy).
|
||||
|
||||
See the detailed table-level documentation here: [system_keyspace](system_keyspace.md)
|
||||
|
||||
## `system_schema`
|
||||
|
||||
This is a node-local keyspace, so each node has its own, independent content for the tables in it. All tables in this keyspace are coordinated via the schema replication system.
|
||||
|
||||
See the detailed table-level documentation here: [system_schema_keyspace](system_schema_keyspace.md)
|
||||
|
||||
## `system_traces`
|
||||
|
||||
Internal tracing keyspace used for query tracing and slow-query logging records (`sessions`, `events`, and related index/log tables).
|
||||
|
||||
This keyspace is written by Scylla's tracing subsystem for diagnostics and observability. It is operational metadata, not user application data (historically created with `SimpleStrategy` and RF=2).
|
||||
|
||||
## `system_audit`/`audit`
|
||||
|
||||
Internal audit-logging keyspace used to persist audit events when table-backed auditing is enabled.
|
||||
|
||||
Scylla's audit table storage is implemented as an internal audit keyspace for audit records (for example, auth/admin/DCL activity depending on audit configuration). In current code this keyspace is named `audit`, while operational material may refer to it as its historical name (`system_audit`). It is intended for security/compliance observability, not for application data.
|
||||
@@ -1611,7 +1611,6 @@ CREATE TABLE system.topology (
|
||||
cleanup_status text,
|
||||
datacenter text,
|
||||
ignore_msb int,
|
||||
intended_storage_mode text,
|
||||
node_state text,
|
||||
num_tokens int,
|
||||
rack text,
|
||||
@@ -1664,7 +1663,6 @@ CREATE TABLE system.topology (
|
||||
- `tokens_string`: Alternative representation of tokens
|
||||
- `shard_count`: Number of shards on the node
|
||||
- `ignore_msb`: MSB bits to ignore for token calculation
|
||||
- `intended_storage_mode`: Intended storage mode for tables under vnodes-to-tablets migration. The node switches to this mode on next restart.
|
||||
- `cleanup_status`: Status of cleanup operations
|
||||
- `supported_features`: Features supported by this node
|
||||
- `request_id`: ID of the current topology request for this node
|
||||
|
||||
@@ -700,7 +700,6 @@ CREATE TABLE system.topology (
|
||||
host_id uuid,
|
||||
datacenter text,
|
||||
ignore_msb int,
|
||||
intended_storage_mode text,
|
||||
node_state text,
|
||||
num_tokens int,
|
||||
rack text,
|
||||
@@ -742,7 +741,6 @@ Each node has a clustering row in the table where its `host_id` is the clusterin
|
||||
- `datacenter` - a name of the datacenter the node belongs to
|
||||
- `rack` - a name of the rack the node belongs to
|
||||
- `ignore_msb` - the value of the node's `murmur3_partitioner_ignore_msb_bits` parameter
|
||||
- `intended_storage_mode` - if set, it indicates the intended storage mode for tables under vnodes-to-tablets migration
|
||||
- `shard_count` - the node's `smp::count`
|
||||
- `release_version` - the node's `version::current()` (corresponding to a Cassandra version, used by drivers)
|
||||
- `node_state` - current state of the node (as described earlier)
|
||||
|
||||
@@ -1,10 +0,0 @@
|
||||
# Vector index in Scylla
|
||||
|
||||
Vector indexes are custom indexes (USING 'vector\_index'). Their `target` option in `system_schema.indexes` uses the following format:
|
||||
|
||||
- Simple single-column vector index `(v)`: just the (escaped) column name, e.g. `v`
|
||||
- Vector index with filtering columns `(v, f1, f2)`: JSON with `tc` (target column) and `fc` (filtering columns): `{"tc":"v","fc":["f1","f2"]}`
|
||||
- Local vector index `((p1, p2), v)`: JSON with `tc` and `pk` (partition key columns): `{"tc":"v","pk":["p1","p2"]}`
|
||||
- Local vector index with filtering columns `((p1, p2), v, f1, f2)`: JSON with `tc`, `pk`, and `fc`: `{"tc":"v","pk":["p1","p2"],"fc":["f1","f2"]}`
|
||||
|
||||
The `target` option acts as the interface for the vector-store service, providing the metadata necessary to determine which columns are indexed and how they are structured.
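A minimal sketch of producing such a target string. This is illustrative only: it uses `nlohmann::json` for brevity rather than ScyllaDB's internal JSON facilities, and the key order in the emitted JSON may differ from the examples above.

```cpp
#include <nlohmann/json.hpp>
#include <string>
#include <vector>

// Builds the `target` option for a vector index. Column names are assumed
// to be already escaped; pk/fc are empty when not applicable.
std::string make_vector_index_target(const std::string& tc,
                                     const std::vector<std::string>& pk,
                                     const std::vector<std::string>& fc) {
    if (pk.empty() && fc.empty()) {
        return tc;  // simple single-column index: just the column name
    }
    nlohmann::json j;
    j["tc"] = tc;
    if (!pk.empty()) {
        j["pk"] = pk;
    }
    if (!fc.empty()) {
        j["fc"] = fc;
    }
    return j.dump();
}
```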
|
||||
@@ -289,7 +289,7 @@ Yes, but it will require running a full repair (or cleanup) to change the replic
|
||||
- If you're reducing the replication factor, run ``nodetool cleanup <updated Keyspace>`` on the keyspace you modified to remove surplus replicated data.
|
||||
Cleanup runs on a per-node basis.
|
||||
- If you're increasing the replication factor, refer to :doc:`How to Safely Increase the RF </kb/rf-increase>`
|
||||
- Note that you need to provide the keyspace name. If you do not, the cleanup or repair operation runs on all keyspaces for the specific node.
|
||||
- Note that you need to provide the keyspace name. If you do not, the cleanup or repair operation runs on all keyspaces for the specific node.
|
||||
|
||||
Why can't I set ``listen_address`` to listen to 0.0.0.0 (all my addresses)?
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
@@ -181,7 +181,6 @@ internode_compression controls whether traffic between nodes is compressed.
|
||||
|
||||
* all - all traffic is compressed.
|
||||
* dc - traffic between different datacenters is compressed.
|
||||
* rack - traffic between different racks is compressed.
|
||||
* none - nothing is compressed (default).
|
||||
|
||||
Configuring TLS/SSL in scylla.yaml
|
||||
|
||||
@@ -10,7 +10,6 @@ ScyllaDB Configuration Procedures
|
||||
How to do a Rolling Restart <rolling-restart>
|
||||
Advanced Internode (RPC) Compression <advanced-internode-compression>
|
||||
Shared-dictionary compression for SSTables <sstable-dictionary-compression>
|
||||
Migrate a Keyspace from Vnodes to Tablets <migrate-vnodes-to-tablets>
|
||||
|
||||
Procedures to change ScyllaDB Configuration settings.
|
||||
|
||||
@@ -23,5 +22,3 @@ Procedures to change ScyllaDB Configuration settings.
|
||||
* :doc:`Advanced Internode (RPC) Compression </operating-scylla/procedures/config-change/advanced-internode-compression>`
|
||||
|
||||
* :doc:`Shared-dictionary compression for SSTables </operating-scylla/procedures/config-change/sstable-dictionary-compression>`
|
||||
|
||||
* :doc:`Migrate a Keyspace from Vnodes to Tablets </operating-scylla/procedures/config-change/migrate-vnodes-to-tablets>`
|
||||
|
||||
@@ -1,393 +0,0 @@
|
||||
Migrate a Keyspace from Vnodes to Tablets
|
||||
==========================================
|
||||
|
||||
This procedure describes how to migrate an existing keyspace from vnodes
|
||||
to tablets. Tablets are designed to be the long-term replacement for vnodes,
|
||||
offering numerous benefits such as faster topology operations, automatic load
|
||||
balancing, automatic cleanups, and improved streaming performance. Migrating to
|
||||
tablets is strongly recommended. See :doc:`Data Distribution with Tablets </architecture/tablets/>`
|
||||
for details.
|
||||
|
||||
.. note::
|
||||
|
||||
The migration is an online operation. This means that the keyspace remains
|
||||
fully available to users throughout the migration, provided that its
|
||||
replication factor is greater than 1. Reads and writes continue to be served
|
||||
using vnodes until the migration is finished.
|
||||
|
||||
.. warning::
|
||||
|
||||
During the migration, you should expect degraded performance on the migrating
|
||||
keyspace. The reasons are the following:
|
||||
|
||||
* **Rolling restart**: Each node must upgrade its storage from vnodes to
|
||||
tablets. This is an offline operation happening on startup, so a restart is
|
||||
needed. Upon restart, each node performs a heavy and time-consuming
|
||||
resharding operation to reorganize its data based on tablets, and remains
|
||||
offline until this operation completes. Resharding may last from minutes to
|
||||
hours, depending on the amount of data that the node holds. At this time,
|
||||
the node cannot serve any requests.
|
||||
* **Unbalanced tablets**: The initial tablet layout mirrors the vnode layout.
|
||||
The tablet load balancer does not rebalance tablets until the migration is
|
||||
finished, so some shards may carry more data than others during the
|
||||
migration. The imbalance is expected to be more prominent in clusters with
|
||||
very large nodes (hundreds of vCPUs).
|
||||
* **Loss of shard awareness**: During the migration and until the rolling
|
||||
restart is complete, the cluster is in a mixed state with some nodes using
|
||||
vnodes and others using tablets. In this state, queries may cause
|
||||
cross-shard operations within nodes, reducing performance.
|
||||
|
||||
The performance will return to normal after the migration finishes and the
|
||||
tablet load balancer rebalances the data.
|
||||
|
||||
Prerequisites
|
||||
-------------
|
||||
|
||||
* All nodes in the cluster must be **up and running**. You can check the status
|
||||
of all nodes with
|
||||
:doc:`nodetool status </operating-scylla/nodetool-commands/status/>`.
|
||||
* All nodes must be running ScyllaDB 2026.2 or later.
|
||||
|
||||
Limitations
|
||||
-----------
|
||||
|
||||
The current migration procedure has the following limitations:
|
||||
|
||||
* The total number of **vnode tokens** in the cluster must be a **power of two**
|
||||
and the tokens must be **evenly spaced** across the token ring. This is
|
||||
verified automatically when starting the migration (see the sketch after this list).
|
||||
* **No schema changes** during the migration. Do not create, alter, or drop
|
||||
tables in the migrating keyspace until the migration is finished.
|
||||
* **No topology changes** during the migration. Do not add, remove, decommission,
|
||||
or replace nodes while a migration is in progress.
|
||||
* **No TRUNCATE** on tables in the migrating keyspace during the migration.
|
||||
* Only **CQL base tables** can be migrated. Materialized views, secondary
|
||||
indexes, CDC tables, and Alternator tables are not supported.
|
||||
* Tables with **counters** or **LWTs** cannot be migrated.
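The token-spacing requirement above can be illustrated with a short sketch. This is
not the actual validation code; it assumes a 64-bit token ring and exact spacing,
while the real check may differ in detail.

.. code-block:: cpp

   #include <algorithm>
   #include <bit>
   #include <cstdint>
   #include <vector>

   // Sketch of the "power of two and evenly spaced" requirement on the
   // cluster's vnode tokens.
   bool tokens_look_migratable(std::vector<int64_t> tokens) {
       const size_t n = tokens.size();
       if (n == 0 || !std::has_single_bit(n)) {
           return false;                        // token count is not a power of two
       }
       std::sort(tokens.begin(), tokens.end());
       // Expected gap between consecutive tokens: 2^64 / n.
       const uint64_t step = (n == 1) ? 0 : (uint64_t{1} << (64 - std::countr_zero(n)));
       for (size_t i = 1; i < n; ++i) {
           if (uint64_t(tokens[i]) - uint64_t(tokens[i - 1]) != step) {
               return false;                    // tokens are not evenly spaced
           }
       }
       return true;
   }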
|
||||
|
||||
Overview
|
||||
--------
|
||||
|
||||
The migration consists of three phases:
|
||||
|
||||
1. **Prepare**: Create tablet maps for all tables in the keyspace. Each tablet
|
||||
inherits its token range and replica set from the corresponding vnode range.
|
||||
2. **Storage upgrade**: Restart each node one at a time, upgrading its storage
|
||||
from vnodes to tablets. Upon restart, the node begins resharding data into
|
||||
tablets. This is a storage-layer operation and is unrelated to ScyllaDB
|
||||
version upgrades.
|
||||
3. **Finalize**: Once all nodes have been upgraded, commit the migration by
|
||||
clearing the migration state and switching the keyspace schema to tablets.
|
||||
|
||||
During the first two phases, the migration is reversible; you can roll back to
|
||||
vnodes. However, once the migration is finalized, it cannot be reversed.
|
||||
|
||||
.. note::
|
||||
|
||||
In the following sections, any reference to "upgrade" or "downgrade" of a
|
||||
node will refer to the migration of its storage from vnodes to tablets or
|
||||
vice versa. Do not confuse it with version upgrades/downgrades.
|
||||
|
||||
Procedure
|
||||
---------
|
||||
|
||||
#. Prepare the keyspace for migration:
|
||||
|
||||
#. Create tablet maps for all tables in the keyspace:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
scylla nodetool migrate-to-tablets start <keyspace>
|
||||
|
||||
#. Verify that the keyspace is in ``migrating_to_tablets`` state and all nodes are still using vnodes:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
scylla nodetool migrate-to-tablets status <keyspace>
|
||||
|
||||
**Example:**
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
$ scylla nodetool migrate-to-tablets status ks
|
||||
Keyspace: ks
|
||||
Status: migrating_to_tablets
|
||||
|
||||
Nodes:
|
||||
Host ID Status
|
||||
99d8de76-3954-4727-911a-6a07251b180c uses vnodes
|
||||
0b5fd6f6-9670-4faf-a480-ad58cf119007 uses vnodes
|
||||
017dd39a-3d06-4c8a-8ac4-379f9e595607 uses vnodes
|
||||
|
||||
.. _upgrade-nodes:
|
||||
|
||||
#. Upgrade all nodes to tablets:
|
||||
|
||||
#. Pick a node.
|
||||
|
||||
#. Mark the node for upgrade to tablets:
|
||||
|
||||
.. note::
|
||||
|
||||
This is a node-local operation. Use the IP address of the node that
|
||||
you are upgrading.
|
||||
|
||||
.. caution::
|
||||
|
||||
Do not mark more than one node for upgrade at the same time. Even if
|
||||
you restart them serially, unexpected restarts can happen for various
|
||||
reasons (crashes, power failures, etc.) leading to parallel node
|
||||
upgrades which can reduce availability.
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
scylla nodetool -h <node-ip> migrate-to-tablets upgrade
|
||||
|
||||
#. Verify that the node status changed from ``vnodes`` to ``migrating to tablets``:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
scylla nodetool migrate-to-tablets status <keyspace>
|
||||
|
||||
**Example:**
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
$ scylla nodetool migrate-to-tablets status ks
|
||||
Keyspace: ks
|
||||
Status: migrating_to_tablets
|
||||
|
||||
Nodes:
|
||||
Host ID Status
|
||||
99d8de76-3954-4727-911a-6a07251b180c migrating to tablets <---
|
||||
0b5fd6f6-9670-4faf-a480-ad58cf119007 uses vnodes
|
||||
017dd39a-3d06-4c8a-8ac4-379f9e595607 uses vnodes
|
||||
|
||||
#. Drain and stop the node:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
scylla nodetool -h <node-ip> drain
|
||||
|
||||
.. include:: /rst_include/scylla-commands-stop-index.rst
|
||||
|
||||
#. Restart the node:
|
||||
|
||||
.. include:: /rst_include/scylla-commands-start-index.rst
|
||||
|
||||
#. Wait until the node is UP and has returned to the ScyllaDB cluster using :doc:`nodetool status </operating-scylla/nodetool-commands/status/>`.
|
||||
This operation may take a long time due to resharding. To monitor
|
||||
resharding progress, use the task manager API:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
scylla nodetool tasks list compaction -h <node-ip> --keyspace <keyspace> | grep -i reshard
|
||||
|
||||
#. Verify that the node status changed from ``migrating to tablets`` to ``uses tablets``:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
scylla nodetool migrate-to-tablets status <keyspace>
|
||||
|
||||
**Example:**
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
$ scylla nodetool migrate-to-tablets status ks
|
||||
Keyspace: ks
|
||||
Status: migrating_to_tablets
|
||||
|
||||
Nodes:
|
||||
Host ID Status
|
||||
99d8de76-3954-4727-911a-6a07251b180c uses tablets <---
|
||||
0b5fd6f6-9670-4faf-a480-ad58cf119007 uses vnodes
|
||||
017dd39a-3d06-4c8a-8ac4-379f9e595607 uses vnodes
|
||||
|
||||
#. Move to the next node and repeat from step a until all nodes are upgraded.
|
||||
|
||||
#. Finalize the migration:
|
||||
|
||||
.. warning::
|
||||
|
||||
Finalization **cannot be undone**. Once the migration is finalized, the
|
||||
keyspace cannot be switched back to vnodes.
|
||||
|
||||
#. Issue the finalization request:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
scylla nodetool migrate-to-tablets finalize <keyspace>
|
||||
|
||||
#. Verify that the keyspace status changed to ``tablets``:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
scylla nodetool migrate-to-tablets status <keyspace>
|
||||
|
||||
**Example:**
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
$ scylla nodetool migrate-to-tablets status ks
|
||||
Keyspace: ks
|
||||
Status: tablets
|
||||
|
||||
Rollback Procedure
|
||||
------------------
|
||||
|
||||
.. note::
|
||||
|
||||
Rollback is only possible **before finalization**. Once the migration is
|
||||
finalized, it cannot be reversed.
|
||||
|
||||
If you need to abort the migration **before finalization**, you can roll back
|
||||
by downgrading each node back to vnodes. The rollback procedure is the
|
||||
following:
|
||||
|
||||
#. Find all nodes that have been upgraded to tablets (status: ``uses tablets``)
|
||||
or are in the process of upgrading to tablets (status: ``migrating to tablets``):
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
scylla nodetool migrate-to-tablets status <keyspace>
|
||||
|
||||
**Example:**
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
$ scylla nodetool migrate-to-tablets status ks
|
||||
Keyspace: ks
|
||||
Status: migrating_to_tablets
|
||||
|
||||
Nodes:
|
||||
Host ID Status
|
||||
99d8de76-3954-4727-911a-6a07251b180c uses tablets <---
|
||||
0b5fd6f6-9670-4faf-a480-ad58cf119007 migrating to tablets <---
|
||||
017dd39a-3d06-4c8a-8ac4-379f9e595607 uses vnodes
|
||||
|
||||
#. For **each upgraded or upgrading node** in the cluster, perform a downgrade
|
||||
(one node at a time):
|
||||
|
||||
#. Mark the node for downgrade:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
scylla nodetool -h <node-ip> migrate-to-tablets downgrade
|
||||
|
||||
#. Check the node status. The status for a previously upgraded node should
|
||||
change from ``uses tablets`` to ``migrating to vnodes``. The status for a
|
||||
previously upgrading node should change from ``migrating to tablets`` to
|
||||
``uses vnodes`` or ``migrating to vnodes``:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
scylla nodetool migrate-to-tablets status <keyspace>
|
||||
|
||||
**Example:**
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
$ scylla nodetool migrate-to-tablets status ks
|
||||
Keyspace: ks
|
||||
Status: migrating_to_tablets
|
||||
|
||||
Nodes:
|
||||
Host ID Status
|
||||
99d8de76-3954-4727-911a-6a07251b180c migrating to vnodes <---
|
||||
0b5fd6f6-9670-4faf-a480-ad58cf119007 migrating to tablets
|
||||
017dd39a-3d06-4c8a-8ac4-379f9e595607 uses vnodes
|
||||
|
||||
#. If the node status is ``uses vnodes``, the downgrade is complete. Move to
|
||||
the next node and repeat from step a.
|
||||
|
||||
#. If the node is ``migrating to vnodes``, restart it to complete the
|
||||
downgrade:
|
||||
|
||||
#. Drain and stop the node:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
scylla nodetool -h <node-ip> drain
|
||||
|
||||
.. include:: /rst_include/scylla-commands-stop-index.rst
|
||||
|
||||
#. Restart the node:
|
||||
|
||||
.. include:: /rst_include/scylla-commands-start-index.rst
|
||||
|
||||
#. Wait until the node is UP and has returned to the ScyllaDB cluster using :doc:`nodetool status </operating-scylla/nodetool-commands/status/>`.
|
||||
This operation may take a long time due to resharding. To monitor
|
||||
resharding progress, use the task manager API:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
scylla nodetool tasks list compaction -h <node-ip> --keyspace <keyspace> | grep -i reshard
|
||||
|
||||
#. Verify that the node status changed from ``migrating to vnodes`` to ``uses vnodes``:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
scylla nodetool migrate-to-tablets status <keyspace>
|
||||
|
||||
**Example:**
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
$ scylla nodetool migrate-to-tablets status ks
|
||||
Keyspace: ks
|
||||
Status: migrating_to_tablets
|
||||
|
||||
Nodes:
|
||||
Host ID Status
|
||||
99d8de76-3954-4727-911a-6a07251b180c uses vnodes <---
|
||||
0b5fd6f6-9670-4faf-a480-ad58cf119007 migrating to tablets
|
||||
017dd39a-3d06-4c8a-8ac4-379f9e595607 uses vnodes
|
||||
|
||||
#. Move to the next node and repeat from step a until all nodes are
|
||||
downgraded.
|
||||
|
||||
#. Once all nodes have been downgraded, finalize the rollback:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
scylla nodetool migrate-to-tablets finalize <keyspace>
|
||||
|
||||
Migrating multiple keyspaces
|
||||
----------------------------
|
||||
|
||||
Migrating multiple keyspaces simultaneously is supported. The procedure is the
|
||||
same as with a single keyspace except that the preparation and finalization
|
||||
steps need to be repeated for each keyspace. However, note that a new migration
|
||||
cannot be started once another migration is in the upgrade phase. The migrations
|
||||
need to be prepared and finalized together.
|
||||
|
||||
To migrate multiple keyspaces simultaneously, follow these steps:
|
||||
|
||||
#. For **each keyspace**, prepare it for migration:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
scylla nodetool migrate-to-tablets start <keyspace1>
|
||||
scylla nodetool migrate-to-tablets start <keyspace2>
|
||||
...
|
||||
|
||||
Verify that all keyspaces are in ``migrating_to_tablets`` state before
|
||||
proceeding:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
scylla nodetool migrate-to-tablets status <keyspace1>
|
||||
scylla nodetool migrate-to-tablets status <keyspace2>
|
||||
...
|
||||
|
||||
#. Upgrade all nodes in the cluster following the same :ref:`procedure <upgrade-nodes>`
|
||||
as for a single keyspace. Each node restart reshards all keyspaces under
|
||||
migration in one pass.
|
||||
|
||||
#. For **each keyspace**, finalize the migration:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
scylla nodetool migrate-to-tablets finalize <keyspace1>
|
||||
scylla nodetool migrate-to-tablets finalize <keyspace2>
|
||||
...
|
||||
@@ -2,8 +2,8 @@
|
||||
ScyllaDB Auditing Guide
|
||||
========================
|
||||
|
||||
Auditing allows the administrator to monitor activities on a ScyllaDB cluster, including CQL queries and data changes, as well as Alternator (DynamoDB-compatible API) requests.
|
||||
The information is stored in a Syslog or a ScyllaDB table.
|
||||
Auditing allows the administrator to monitor activities on a Scylla cluster, including queries and data changes.
|
||||
The information is stored in a Syslog or a Scylla table.
|
||||
|
||||
Prerequisite
|
||||
------------
|
||||
@@ -14,15 +14,15 @@ Enable ScyllaDB :doc:`Authentication </operating-scylla/security/authentication>
|
||||
Enabling Audit
|
||||
---------------
|
||||
|
||||
By default, auditing is **enabled** with the ``table`` backend. Enabling auditing is controlled by the ``audit:`` parameter in the ``scylla.yaml`` file.
|
||||
By default, table auditing is **enabled**. Enabling auditing is controlled by the ``audit:`` parameter in the ``scylla.yaml`` file.
|
||||
You can set the following options:
|
||||
|
||||
* ``none`` - Audit is disabled.
|
||||
* ``table`` - Audit is enabled, and messages are stored in a ScyllaDB table (default).
|
||||
* ``table`` - Audit is enabled, and messages are stored in a Scylla table (default).
|
||||
* ``syslog`` - Audit is enabled, and messages are sent to Syslog.
|
||||
* ``syslog,table`` - Audit is enabled, and messages are stored in a ScyllaDB table and sent to Syslog.
|
||||
* ``syslog,table`` - Audit is enabled, and messages are stored in a Scylla table and sent to Syslog.
|
||||
|
||||
Configuring any other value results in an error at ScyllaDB startup.
|
||||
Configuring any other value results in an error at Scylla startup.
|
||||
|
||||
Configuring Audit
|
||||
-----------------
|
||||
@@ -34,9 +34,7 @@ Flag Default Value Description
|
||||
================== ================================== ========================================================================================================================
|
||||
audit_categories "DCL,AUTH,ADMIN" Comma-separated list of statement categories that should be audited
|
||||
------------------ ---------------------------------- ------------------------------------------------------------------------------------------------------------------------
|
||||
audit_tables “” Comma-separated list of table names that should be audited, in the format ``<keyspace_name>.<table_name>``.
|
||||
|
||||
For Alternator tables use the ``alternator.<table_name>`` format (see :ref:`alternator-auditing`).
|
||||
audit_tables “” Comma-separated list of table names that should be audited, in the format of <keyspacename>.<tablename>
|
||||
------------------ ---------------------------------- ------------------------------------------------------------------------------------------------------------------------
|
||||
audit_keyspaces “” Comma-separated list of keyspaces that should be audited. You must specify at least one keyspace.
|
||||
If you leave this option empty, no keyspace will be audited.
|
||||
@@ -49,137 +47,30 @@ You can use DCL, AUTH, and ADMIN audit categories without including any keyspace
|
||||
audit_categories parameter description
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
========= ========================================================================================= ====================
|
||||
Parameter Logs Description Applies To
|
||||
========= ========================================================================================= ====================
|
||||
AUTH Logs login events CQL
|
||||
--------- ----------------------------------------------------------------------------------------- --------------------
|
||||
DML Logs insert, update, delete, and other data manipulation language (DML) events CQL, Alternator
|
||||
--------- ----------------------------------------------------------------------------------------- --------------------
|
||||
DDL Logs object and role create, alter, drop, and other data definition language (DDL) events CQL, Alternator
|
||||
--------- ----------------------------------------------------------------------------------------- --------------------
|
||||
DCL Logs grant, revoke, create role, drop role, and list roles events CQL
|
||||
--------- ----------------------------------------------------------------------------------------- --------------------
|
||||
QUERY Logs all queries CQL, Alternator
|
||||
--------- ----------------------------------------------------------------------------------------- --------------------
|
||||
ADMIN Logs service level operations: create, alter, drop, attach, detach, list. CQL
|
||||
========= =========================================================================================
|
||||
Parameter Logs Description
|
||||
========= =========================================================================================
|
||||
AUTH Logs login events
|
||||
--------- -----------------------------------------------------------------------------------------
|
||||
DML Logs insert, update, delete, and other data manipulation language (DML) events
|
||||
--------- -----------------------------------------------------------------------------------------
|
||||
DDL Logs object and role create, alter, drop, and other data definition language (DDL) events
|
||||
--------- -----------------------------------------------------------------------------------------
|
||||
DCL Logs grant, revoke, create role, drop role, and list roles events
|
||||
--------- -----------------------------------------------------------------------------------------
|
||||
QUERY Logs all queries
|
||||
--------- -----------------------------------------------------------------------------------------
|
||||
ADMIN Logs service level operations: create, alter, drop, attach, detach, list.
|
||||
For :ref:`service level <workload-priorization-service-level-management>`
|
||||
auditing.
|
||||
========= ========================================================================================= ====================
|
||||
|
||||
For details on auditing Alternator operations, see :ref:`alternator-auditing`.
|
||||
========= =========================================================================================
|
||||
|
||||
Note that enabling audit may negatively impact performance and audit-to-table may consume extra storage. That's especially true when auditing DML and QUERY categories, which generate a high volume of audit messages.
|
||||
|
||||
.. _alternator-auditing:
|
||||
|
||||
Auditing Alternator Requests
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
When auditing is enabled, Alternator (DynamoDB-compatible API) requests are audited using the same
|
||||
backends and the same filtering configuration (``audit_categories``, ``audit_keyspaces``,
|
||||
``audit_tables``) as CQL operations. No additional configuration is needed.
|
||||
|
||||
Both successful and failed Alternator requests are audited.
|
||||
|
||||
Alternator Operation Categories
|
||||
""""""""""""""""""""""""""""""""
|
||||
|
||||
Each Alternator API operation is assigned to one of the standard audit categories:
|
||||
|
||||
========= ====================================================================================================
|
||||
Category Alternator Operations
|
||||
========= ====================================================================================================
|
||||
DDL CreateTable, DeleteTable, UpdateTable, TagResource, UntagResource, UpdateTimeToLive
|
||||
--------- ----------------------------------------------------------------------------------------------------
|
||||
DML PutItem, UpdateItem, DeleteItem, BatchWriteItem
|
||||
--------- ----------------------------------------------------------------------------------------------------
|
||||
QUERY GetItem, BatchGetItem, Query, Scan, DescribeTable, ListTables, DescribeEndpoints,
|
||||
ListTagsOfResource, DescribeTimeToLive, DescribeContinuousBackups,
|
||||
ListStreams, DescribeStream, GetShardIterator, GetRecords
|
||||
========= ====================================================================================================
|
||||
|
||||
.. note:: AUTH, DCL, and ADMIN categories do not apply to Alternator operations. These categories
|
||||
are specific to CQL authentication, authorization, and service-level management.
|
||||
|
||||
Operation Field Format
|
||||
"""""""""""""""""""""""
|
||||
|
||||
For CQL operations, the ``operation`` field in the audit log contains the raw CQL query string.
|
||||
For Alternator operations, the format is:
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
<OperationName>|<JSON request body>
|
||||
|
||||
For example:
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
PutItem|{"TableName":"my_table","Item":{"p":{"S":"pk_val"},"c":{"S":"ck_val"},"v":{"S":"data"}}}
|
||||
|
||||
.. note:: The full JSON request body is included in the ``operation`` field. For batch operations
|
||||
(such as BatchWriteItem), this can be very large (up to 16 MB).
|
||||
|
||||
Keyspace and Table Filtering for Alternator
|
||||
""""""""""""""""""""""""""""""""""""""""""""
|
||||
|
||||
The real keyspace name of an Alternator table ``T`` is ``alternator_T``.
|
||||
The ``audit_tables`` config flag uses the shorthand format ``alternator.T`` to refer to such
|
||||
tables -- the parser expands it to the real keyspace name automatically.
|
||||
For ``audit_keyspaces``, use the real keyspace name directly.
|
||||
|
||||
For example, to audit an Alternator table called ``my_table_name`` use either of the below:
|
||||
|
||||
.. code-block:: yaml
|
||||
|
||||
# Using audit_tables - use 'alternator' as the keyspace name:
|
||||
audit_tables: "alternator.my_table_name"
|
||||
|
||||
# Using audit_keyspaces - use the real keyspace name:
|
||||
audit_keyspaces: "alternator_my_table_name"
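For illustration only (not the actual parser code), the shorthand expansion behaves
roughly like this:

.. code-block:: cpp

   #include <string>
   #include <utility>

   // Expands an audit_tables entry into a (keyspace, table) pair, assuming the
   // entry contains a '.' separator.
   // "alternator.my_table" -> {"alternator_my_table", "my_table"}
   // "ks.tbl"              -> {"ks", "tbl"}
   std::pair<std::string, std::string> parse_audit_table_entry(const std::string& entry) {
       const auto dot = entry.find('.');
       std::string ks = entry.substr(0, dot);
       std::string tbl = entry.substr(dot + 1);
       if (ks == "alternator") {
           ks = "alternator_" + tbl;  // real keyspace name of an Alternator table
       }
       return {ks, tbl};
   }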
|
||||
|
||||
**Global and batch operations**: Some Alternator operations are not scoped to a single table:
|
||||
|
||||
* ``ListTables`` and ``DescribeEndpoints`` have no associated keyspace or table.
|
||||
* ``BatchWriteItem`` and ``BatchGetItem`` may span multiple tables.
|
||||
|
||||
These operations are logged whenever their category matches ``audit_categories``, regardless of
|
||||
``audit_keyspaces`` or ``audit_tables`` filters. Their ``keyspace_name`` field is empty, and for
|
||||
batch operations the ``table_name`` field contains a pipe-separated (``|``) list of all involved table names.
|
||||
|
||||
**DynamoDB Streams operations**: For streams-related operations (``DescribeStream``, ``GetShardIterator``,
|
||||
``GetRecords``), the ``table_name`` field contains the base table name and the CDC log table name
|
||||
separated by a pipe (e.g., ``my_table|my_table_scylla_cdc_log``).
|
||||
|
||||
Alternator Audit Log Examples
|
||||
""""""""""""""""""""""""""""""
|
||||
|
||||
Syslog output example (PutItem):
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
Mar 18 10:15:03 ip-10-143-2-108 scylla-audit[28387]: node="10.143.2.108", category="DML", cl="LOCAL_QUORUM", error="false", keyspace="alternator_my_table", query="PutItem|{\"TableName\":\"my_table\",\"Item\":{\"p\":{\"S\":\"pk_val\"}}}", client_ip="127.0.0.1", table="my_table", username="anonymous"
|
||||
|
||||
Table output example (PutItem):
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
SELECT * FROM audit.audit_log ;
|
||||
|
||||
returns:
|
||||
|
||||
.. code-block:: none
|
||||
|
||||
date | node | event_time | category | consistency | error | keyspace_name | operation | source | table_name | username |
|
||||
-------------------------+--------------+--------------------------------------+----------+--------------+-------+-----------------------+----------------------------------------------------------------------------------+-----------+------------+-----------+
|
||||
2026-03-18 00:00:00+0000 | 10.143.2.108 | 3429b1a5-2a94-11e8-8f4e-000000000001 | DML | LOCAL_QUORUM | False | alternator_my_table | PutItem|{"TableName":"my_table","Item":{"p":{"S":"pk_val"}}} | 127.0.0.1 | my_table | anonymous |
|
||||
(1 row)
|
||||
|
||||
Configuring Audit Storage
|
||||
---------------------------
|
||||
|
||||
Auditing messages can be sent to :ref:`Syslog <auditing-syslog-storage>` or stored in a ScyllaDB :ref:`table <auditing-table-storage>` or both.
|
||||
Auditing messages can be sent to :ref:`Syslog <auditing-syslog-storage>` or stored in a Scylla :ref:`table <auditing-table-storage>` or both.
|
||||
|
||||
.. _auditing-syslog-storage:
|
||||
|
||||
@@ -208,13 +99,13 @@ Storing Audit Messages in Syslog
|
||||
# All tables in those keyspaces will be audited
|
||||
audit_keyspaces: "mykeyspace"
|
||||
|
||||
#. Restart the ScyllaDB node.
|
||||
#. Restart the Scylla node.
|
||||
|
||||
.. include:: /rst_include/scylla-commands-restart-index.rst
|
||||
|
||||
By default, audit messages are written to the same destination as ScyllaDB :doc:`logging </getting-started/logging>`, with ``scylla-audit`` as the process name.
|
||||
By default, audit messages are written to the same destination as Scylla :doc:`logging </getting-started/logging>`, with ``scylla-audit`` as the process name.
|
||||
|
||||
Logging output example (CQL drop table):
|
||||
Logging output example (drop table):
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
@@ -232,7 +123,7 @@ To redirect the Syslog output to a file, follow the steps below (available only
|
||||
Storing Audit Messages in a Table
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
Messages are stored in a ScyllaDB table named ``audit.audit_log``.
|
||||
Messages are stored in a Scylla table named ``audit.audit_log``.
|
||||
|
||||
For example:
|
||||
|
||||
@@ -279,11 +170,11 @@ For example:
|
||||
# All tables in those keyspaces will be audited
|
||||
audit_keyspaces: "mykeyspace"
|
||||
|
||||
#. Restart the ScyllaDB node.
|
||||
#. Restart Scylla node.
|
||||
|
||||
.. include:: /rst_include/scylla-commands-restart-index.rst
|
||||
|
||||
Table output example (CQL drop table):
|
||||
Table output example (drop table):
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
@@ -305,7 +196,7 @@ Storing Audit Messages in a Table and Syslog Simultaneously
|
||||
|
||||
**Procedure**
|
||||
|
||||
#. Follow both procedures from above, and set the ``audit`` parameter in the ``scylla.yaml`` file to both ``syslog`` and ``table``. You need to restart ScyllaDB only once.
|
||||
#. Follow both procedures from above, and set the ``audit`` parameter in the ``scylla.yaml`` file to both ``syslog`` and ``table``. You need to restart scylla only once.
|
||||
|
||||
To have both syslog and table you need to specify both backends separated by a comma:
|
||||
|
||||
|
||||
docs/upgrade/about-upgrade.rst (new file)
@@ -0,0 +1,41 @@
|
||||
================
|
||||
About Upgrade
|
||||
================
|
||||
|
||||
ScyllaDB upgrade is a rolling procedure - it does not require a full cluster
|
||||
shutdown and is performed without any downtime or disruption of service.
|
||||
|
||||
To ensure a successful upgrade, follow
|
||||
the :doc:`documented upgrade procedures <upgrade-guides/index>` tested by
|
||||
ScyllaDB. This means that:
|
||||
|
||||
* You should follow the upgrade policy:
|
||||
|
||||
* Starting with version **2025.4**, upgrades can **skip minor versions** if:
|
||||
|
||||
* They remain within the same major version (for example, upgrading
|
||||
directly from *2025.1 → 2025.4* is supported).
|
||||
* You upgrade to the next major version (for example, upgrading
|
||||
directly from *2025.3 → 2026.1* is supported).
|
||||
|
||||
* For versions **prior to 2025.4**, upgrades must be performed consecutively—
|
||||
each successive X.Y version must be installed in order, **without skipping
|
||||
any major or minor version** (for example, upgrading directly from 2025.1 → 2025.3
|
||||
is not supported).
|
||||
* You cannot skip major versions. Upgrades must move from one major version to
|
||||
the next using the documented major-version upgrade path.
|
||||
* You should upgrade to a supported version of ScyllaDB.
|
||||
See `ScyllaDB Version Support <https://docs.scylladb.com/stable/versioning/version-support.html>`_.
|
||||
* Before you upgrade to the next version, the whole cluster (each node) must
|
||||
be upgraded to the previous version.
|
||||
* You cannot perform an upgrade by replacing the nodes in the cluster with new
|
||||
nodes with a different ScyllaDB version. You should never add a new node with
|
||||
a different version to a cluster - if you
|
||||
:doc:`add a node </operating-scylla/procedures/cluster-management/add-node-to-cluster>`,
|
||||
it must have the same X.Y.Z (major.minor.patch) version as the other nodes in
|
||||
the cluster.
|
||||
|
||||
Upgrading to each patch version by following the Maintenance Release Upgrade
|
||||
Guide is optional. However, we recommend upgrading to the latest patch release
|
||||
for your version before upgrading to a new version.
|
||||
|
||||
@@ -5,6 +5,7 @@ Upgrade ScyllaDB
|
||||
.. toctree::
|
||||
:titlesonly:
|
||||
|
||||
About Upgrade <about-upgrade>
|
||||
Upgrade Guides <upgrade-guides/index>
|
||||
|
||||
|
||||
|
||||
@@ -5,7 +5,6 @@ Upgrade ScyllaDB
|
||||
.. toctree::
|
||||
|
||||
ScyllaDB 2025.x to ScyllaDB 2026.1 <upgrade-guide-from-2025.x-to-2026.1/index>
|
||||
ScyllaDB 2026.x Patch Upgrades <upgrade-guide-from-2026.x.y-to-2026.x.z>
|
||||
ScyllaDB Image <ami-upgrade>
|
||||
|
||||
|
||||
|
||||
@@ -20,7 +20,7 @@ This guide covers upgrading ScyllaDB on Red Hat Enterprise Linux (RHEL), CentOS,
|
||||
and Ubuntu. See `OS Support by Platform and Version <https://docs.scylladb.com/stable/versioning/os-support-per-version.html>`_
|
||||
for information about supported versions. It also applies when using the ScyllaDB official image on EC2, GCP, or Azure.
|
||||
|
||||
See `Upgrade Policy <https://docs.scylladb.com/stable/versioning/upgrade-policy.html>`_ for the ScyllaDB upgrade policy.
|
||||
See :doc:`About Upgrade </upgrade/about-upgrade/>` for the ScyllaDB upgrade policy.
|
||||
|
||||
Before You Upgrade ScyllaDB
|
||||
==============================
|
||||
|
||||
@@ -1,268 +0,0 @@
|
||||
.. |SCYLLA_NAME| replace:: ScyllaDB
|
||||
|
||||
.. |SRC_VERSION| replace:: 2026.x.y
|
||||
.. |NEW_VERSION| replace:: 2026.x.z
|
||||
|
||||
==========================================================================
|
||||
Upgrade - |SCYLLA_NAME| |SRC_VERSION| to |NEW_VERSION| (Patch Upgrades)
|
||||
==========================================================================
|
||||
|
||||
This document describes a step-by-step procedure for upgrading from
|
||||
|SCYLLA_NAME| |SRC_VERSION| to |SCYLLA_NAME| |NEW_VERSION| (where "z" is
|
||||
the latest available version), and rolling back to version |SRC_VERSION|
|
||||
if necessary.
|
||||
|
||||
This guide covers upgrading ScyllaDB on Red Hat Enterprise Linux (RHEL),
|
||||
CentOS, Debian, and Ubuntu.
|
||||
See `OS Support by Platform and Version <https://docs.scylladb.com/stable/versioning/os-support-per-version.html>`_
|
||||
for information about supported versions.
|
||||
|
||||
It also applies to the ScyllaDB official image on EC2, GCP, or Azure.
|
||||
|
||||
See `Upgrade Policy <https://docs.scylladb.com/stable/versioning/upgrade-policy.html>`_ for the ScyllaDB upgrade policy.
|
||||
|
||||
Upgrade Procedure
|
||||
=================
|
||||
|
||||
.. note::
|
||||
Apply the following procedure **serially** on each node. Do not move to the next
|
||||
node before validating that the node is up and running the new version.
|
||||
|
||||
A ScyllaDB upgrade is a rolling procedure that does **not** require a full cluster
|
||||
shutdown. For each of the nodes in the cluster, you will:
|
||||
|
||||
#. Drain the node and back up the data.
|
||||
#. Back up the configuration file.
|
||||
#. Stop ScyllaDB.
|
||||
#. Download and install new ScyllaDB packages.
|
||||
#. Start ScyllaDB.
|
||||
#. Validate that the upgrade was successful.
|
||||
|
||||
**Before** upgrading, check which version you are running now using
|
||||
``scylla --version``. Note the current version in case you want to roll back
|
||||
the upgrade.
|
||||
|
||||
**During** the rolling upgrade it is highly recommended:
|
||||
|
||||
* Not to use new |NEW_VERSION| features.
|
||||
* Not to run administration functions, like repairs, refresh, rebuild or add
|
||||
or remove nodes. See
|
||||
`sctool <https://manager.docs.scylladb.com/stable/sctool/>`_ for suspending
|
||||
ScyllaDB Manager's scheduled or running repairs.
|
||||
* Not to apply schema changes.
|
||||
|
||||
Upgrade Steps
|
||||
=============
|
||||
|
||||
Back up the data
|
||||
------------------------------
|
||||
|
||||
Back up all the data to an external device. We recommend using
|
||||
`ScyllaDB Manager <https://manager.docs.scylladb.com/stable/backup/index.html>`_
|
||||
to create backups.
|
||||
|
||||
Alternatively, you can use the ``nodetool snapshot`` command.
|
||||
For **each** node in the cluster, run the following:
|
||||
|
||||
.. code:: sh
|
||||
|
||||
nodetool drain
|
||||
nodetool snapshot
|
||||
|
||||
Take note of the directory name that nodetool gives you, and copy all
|
||||
the directories with this name under ``/var/lib/scylla`` to a backup device.
|
||||
|
||||
When the upgrade is completed on all nodes, remove the snapshot with the
|
||||
``nodetool clearsnapshot -t <snapshot>`` command to prevent running out of
|
||||
space.
|
||||
|
||||
Back up the configuration file
|
||||
------------------------------
|
||||
|
||||
Back up the ``scylla.yaml`` configuration file and the ScyllaDB packages
|
||||
in case you need to roll back the upgrade.
|
||||
|
||||
.. tabs::
|
||||
|
||||
.. group-tab:: Debian/Ubuntu
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo cp -a /etc/scylla/scylla.yaml /etc/scylla/scylla.yaml.backup
|
||||
sudo cp /etc/apt/sources.list.d/scylla.list ~/scylla.list-backup
|
||||
|
||||
.. group-tab:: RHEL/CentOS
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo cp -a /etc/scylla/scylla.yaml /etc/scylla/scylla.yaml.backup
|
||||
sudo cp /etc/yum.repos.d/scylla.repo ~/scylla.repo-backup
|
||||
|
||||
Gracefully stop the node
|
||||
------------------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-server stop
|
||||
|
||||
Download and install the new release
|
||||
------------------------------------
|
||||
|
||||
You don’t need to update the ScyllaDB DEB or RPM repo when you upgrade to
|
||||
a patch release.
|
||||
|
||||
.. tabs::
|
||||
|
||||
.. group-tab:: Debian/Ubuntu
|
||||
|
||||
To install a patch version on Debian or Ubuntu, run:
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo apt-get clean all
|
||||
sudo apt-get update
|
||||
sudo apt-get dist-upgrade scylla
|
||||
|
||||
Answer ‘y’ to the first two questions.
|
||||
|
||||
.. group-tab:: RHEL/CentOS
|
||||
|
||||
To install a patch version on RHEL or CentOS, run:
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo yum clean all
|
||||
sudo yum update scylla\* -y
|
||||
|
||||
.. group-tab:: EC2/GCP/Azure Ubuntu Image
|
||||
|
||||
If you're using the ScyllaDB official image (recommended), see
|
||||
the **Debian/Ubuntu** tab for upgrade instructions.
|
||||
|
||||
If you're using your own image and have installed ScyllaDB packages for
|
||||
Ubuntu or Debian, you need to apply an extended upgrade procedure:
|
||||
|
||||
#. Install the new ScyllaDB version with the additional
|
||||
``scylla-machine-image`` package:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
sudo apt-get clean all
|
||||
sudo apt-get update
|
||||
sudo apt-get dist-upgrade scylla
|
||||
sudo apt-get dist-upgrade scylla-machine-image
|
||||
#. Run ``scylla_setup`` without running ``io_setup``.
|
||||
#. Run ``sudo /opt/scylladb/scylla-machine-image/scylla_cloud_io_setup``.
|
||||
|
||||
Start the node
|
||||
--------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
#. Check cluster status with ``nodetool status`` and make sure **all** nodes,
|
||||
including the one you just upgraded, are in UN status.
|
||||
#. Use ``curl -X GET "http://localhost:10000/storage_service/scylla_release_version"``
|
||||
to check the ScyllaDB version.
|
||||
#. Use ``journalctl _COMM=scylla`` to check there are no new errors in the log.
|
||||
#. Check again after 2 minutes to validate that no new issues are introduced.
|
||||
|
||||
Once you are sure the node upgrade is successful, move to the next node in
|
||||
the cluster.
|
||||
|
||||
Rollback Procedure
|
||||
==================
|
||||
|
||||
The following procedure describes a rollback from ScyllaDB release
|
||||
|NEW_VERSION| to |SRC_VERSION|. Apply this procedure if an upgrade from
|
||||
|SRC_VERSION| to |NEW_VERSION| failed before completing on all nodes.
|
||||
|
||||
* Use this procedure only on nodes you upgraded to |NEW_VERSION|.
|
||||
* Execute the following commands one node at a time, moving to the next node only
|
||||
after the rollback procedure is completed successfully.
|
||||
|
||||
ScyllaDB rollback is a rolling procedure that does **not** require a full
|
||||
cluster shutdown. For each of the nodes to roll back to |SRC_VERSION|, you will:
|
||||
|
||||
#. Drain the node and stop ScyllaDB.
|
||||
#. Downgrade to the previous release.
|
||||
#. Restore the configuration file.
|
||||
#. Restart ScyllaDB.
|
||||
#. Validate the rollback success.
|
||||
|
||||
Rollback Steps
|
||||
==============
|
||||
|
||||
Gracefully shutdown ScyllaDB
|
||||
-----------------------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
nodetool drain
|
||||
sudo service scylla-server stop
|
||||
|
||||
Downgrade to the previous release
|
||||
----------------------------------
|
||||
|
||||
.. tabs::
|
||||
|
||||
.. group-tab:: Debian/Ubuntu
|
||||
|
||||
To downgrade to |SRC_VERSION| on Debian or Ubuntu, run:
|
||||
|
||||
.. code-block:: console
|
||||
:substitutions:
|
||||
|
||||
sudo apt-get install scylla=|SRC_VERSION|\* scylla-server=|SRC_VERSION|\* scylla-tools=|SRC_VERSION|\* scylla-tools-core=|SRC_VERSION|\* scylla-kernel-conf=|SRC_VERSION|\* scylla-conf=|SRC_VERSION|\*
|
||||
|
||||
Answer ‘y’ to the first two questions.
|
||||
|
||||
.. group-tab:: RHEL/CentOS
|
||||
|
||||
To downgrade to |SRC_VERSION| on RHEL or CentOS, run:
|
||||
|
||||
.. code-block:: console
|
||||
:substitutions:
|
||||
|
||||
sudo yum downgrade scylla\*-|SRC_VERSION|-\* -y
|
||||
|
||||
.. group-tab:: EC2/GCP/Azure Ubuntu Image
|
||||
|
||||
If you’re using the ScyllaDB official image (recommended), see
|
||||
the **Debian/Ubuntu** tab for upgrade instructions.
|
||||
|
||||
If you’re using your own image and have installed ScyllaDB packages for
|
||||
Ubuntu or Debian, you need to additionally downgrade
|
||||
the ``scylla-machine-image`` package.
|
||||
|
||||
.. code-block:: console
|
||||
:substitutions:
|
||||
|
||||
sudo apt-get install scylla=|SRC_VERSION|\* scylla-server=|SRC_VERSION|\* scylla-tools=|SRC_VERSION|\* scylla-tools-core=|SRC_VERSION|\* scylla-kernel-conf=|SRC_VERSION|\* scylla-conf=|SRC_VERSION|\*
|
||||
sudo apt-get install scylla-machine-image=|SRC_VERSION|\*
|
||||
|
||||
Answer ‘y’ to the first two questions.
|
||||
|
||||
|
||||
Restore the configuration file
|
||||
------------------------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo rm -rf /etc/scylla/scylla.yaml
|
||||
sudo cp -a /etc/scylla/scylla.yaml.backup /etc/scylla/scylla.yaml
|
||||
|
||||
Start the node
|
||||
--------------
|
||||
|
||||
.. code:: sh
|
||||
|
||||
sudo service scylla-server start
|
||||
|
||||
Validate
|
||||
--------
|
||||
Check the upgrade instructions above for validation. Once you are sure the node
|
||||
rollback is successful, move to the next node in the cluster.
|
||||
@@ -227,19 +227,13 @@ Security
|
||||
Indexing and Caching
|
||||
^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
+----------------------------------------------------------------+--------------------------------------------------------------------------------------+
|
||||
| Options | Support |
|
||||
+================================================================+======================================================================================+
|
||||
|:doc:`Secondary Index </features/secondary-indexes>` | |v| |
|
||||
+----------------------------------------------------------------+--------------------------------------------------------------------------------------+
|
||||
|StorageAttachedIndex (SAI) | |x| |
|
||||
+----------------------------------------------------------------+--------------------------------------------------------------------------------------+
|
||||
|:ref:`SAI for vector search <cassandra-sai-compatibility>` | |v| :sup:`*` |
|
||||
+----------------------------------------------------------------+--------------------------------------------------------------------------------------+
|
||||
|:doc:`Materialized Views </features/materialized-views>` | |v| |
|
||||
+----------------------------------------------------------------+--------------------------------------------------------------------------------------+

:sup:`*` The SAI class name on vector columns is rewritten to the native ``vector_index``.

+--------------------------------------------------------------+--------------------------------------------------------------------------------------+
|
||||
| Options | Support |
|
||||
+==============================================================+======================================================================================+
|
||||
|:doc:`Secondary Index </features/secondary-indexes>` | |v| |
|
||||
+--------------------------------------------------------------+--------------------------------------------------------------------------------------+
|
||||
|:doc:`Materialized Views </features/materialized-views>` | |v| |
|
||||
+--------------------------------------------------------------+--------------------------------------------------------------------------------------+


Additional Features

@@ -380,7 +380,7 @@ public:
|
||||
}
|
||||
|
||||
template<typename HostType, typename CacheType, typename ConfigType>
|
||||
shared_ptr<HostType> get_host(const sstring& host, CacheType& cache, const ConfigType& config_map, std::string_view config_entry_name) {
|
||||
shared_ptr<HostType> get_host(const sstring& host, CacheType& cache, const ConfigType& config_map) {
|
||||
auto& host_cache = cache[this_shard_id()];
|
||||
auto it = host_cache.find(host);
|
||||
if (it != host_cache.end()) {
|
||||
@@ -394,26 +394,23 @@ public:
|
||||
return result;
|
||||
}
|
||||
|
||||
throw std::invalid_argument(fmt::format(
|
||||
"Encryption host \"{}\" is not defined in scylla.yaml. "
|
||||
"Make sure it is listed under the \"{}\" section.",
|
||||
host, config_entry_name));
|
||||
throw std::invalid_argument("No such host: " + host);
|
||||
}
|
||||
|
||||
shared_ptr<kmip_host> get_kmip_host(const sstring& host) override {
|
||||
return get_host<kmip_host>(host, _per_thread_kmip_host_cache, _cfg->kmip_hosts(), "kmip_hosts");
|
||||
return get_host<kmip_host>(host, _per_thread_kmip_host_cache, _cfg->kmip_hosts());
|
||||
}
|
||||
|
||||
shared_ptr<kms_host> get_kms_host(const sstring& host) override {
|
||||
return get_host<kms_host>(host, _per_thread_kms_host_cache, _cfg->kms_hosts(), "kms_hosts");
|
||||
return get_host<kms_host>(host, _per_thread_kms_host_cache, _cfg->kms_hosts());
|
||||
}
|
||||
|
||||
shared_ptr<gcp_host> get_gcp_host(const sstring& host) override {
|
||||
return get_host<gcp_host>(host, _per_thread_gcp_host_cache, _cfg->gcp_hosts(), "gcp_hosts");
|
||||
return get_host<gcp_host>(host, _per_thread_gcp_host_cache, _cfg->gcp_hosts());
|
||||
}
|
||||
|
||||
shared_ptr<azure_host> get_azure_host(const sstring& host) override {
|
||||
return get_host<azure_host>(host, _per_thread_azure_host_cache, _cfg->azure_hosts(), "azure_hosts");
|
||||
return get_host<azure_host>(host, _per_thread_azure_host_cache, _cfg->azure_hosts());
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -437,6 +437,7 @@ void ldap_connection::poll_results() {
|
||||
const auto found = _msgid_to_promise.find(id);
|
||||
if (found == _msgid_to_promise.end()) {
|
||||
mylog.error("poll_results: got valid result for unregistered id {}, dropping it", id);
|
||||
ldap_msgfree(result);
|
||||
} else {
|
||||
found->second.set_value(std::move(result_ptr));
|
||||
_msgid_to_promise.erase(found);
|
||||
|
||||
@@ -41,7 +41,7 @@ public:
|
||||
_ip == other._ip;
|
||||
}
|
||||
|
||||
explicit endpoint_state(inet_address ip) noexcept
|
||||
endpoint_state(inet_address ip) noexcept
|
||||
: _heart_beat_state()
|
||||
, _update_timestamp(clk::now())
|
||||
, _ip(ip)
|
||||
|
||||
@@ -179,7 +179,6 @@ public:
|
||||
gms::feature topology_noop_request { *this, "TOPOLOGY_NOOP_REQUEST"sv };
|
||||
gms::feature tablets_intermediate_fallback_cleanup { *this, "TABLETS_INTERMEDIATE_FALLBACK_CLEANUP"sv };
|
||||
gms::feature batchlog_v2 { *this, "BATCHLOG_V2"sv };
|
||||
gms::feature vnodes_to_tablets_migrations { *this, "VNODES_TO_TABLETS_MIGRATIONS"sv };
|
||||
public:
|
||||
|
||||
const std::unordered_map<sstring, std::reference_wrapper<feature>>& registered_features() const;
|
||||
|
||||
gms/gossiper.cc
@@ -59,6 +59,7 @@ using clk = gossiper::clk;
|
||||
static logging::logger logger("gossip");
|
||||
|
||||
constexpr std::chrono::milliseconds gossiper::INTERVAL;
|
||||
constexpr std::chrono::hours gossiper::A_VERY_LONG_TIME;
|
||||
constexpr generation_type::value_type gossiper::MAX_GENERATION_DIFFERENCE;
|
||||
|
||||
const sstring& gossiper::get_cluster_name() const noexcept {
|
||||
@@ -647,7 +648,7 @@ future<> gossiper::do_apply_state_locally(locator::host_id node, endpoint_state
|
||||
}
|
||||
// Re-rake after apply_new_states
|
||||
es = get_endpoint_state_ptr(node);
|
||||
if (!is_alive(es->get_host_id()) && !is_left(*es) && !shadow_round) { // unless of course, it was dead
|
||||
if (!is_alive(es->get_host_id()) && !is_dead_state(*es) && !shadow_round) { // unless of course, it was dead
|
||||
mark_alive(es);
|
||||
}
|
||||
} else {
|
||||
@@ -766,7 +767,7 @@ future<> gossiper::remove_endpoint(locator::host_id endpoint, permit_id pid) {
|
||||
|
||||
if (was_alive) {
|
||||
try {
|
||||
logger.info("InetAddress {}/{} is now DOWN, status = {}", host_id, ip, get_node_status(host_id));
|
||||
logger.info("InetAddress {}/{} is now DOWN, status = {}", state->get_host_id(), ip, get_gossip_status(*state));
|
||||
co_await do_on_dead_notifications(ip, std::move(state), pid);
|
||||
} catch (...) {
|
||||
logger.warn("Fail to call on_dead callback: {}", std::current_exception());
|
||||
@@ -1173,10 +1174,10 @@ future<> gossiper::unregister_(shared_ptr<i_endpoint_state_change_subscriber> su
|
||||
|
||||
std::set<locator::host_id> gossiper::get_live_members() const {
|
||||
std::set<locator::host_id> live_members(_live_endpoints.begin(), _live_endpoints.end());
|
||||
auto myid = my_host_id();
|
||||
auto myip = get_broadcast_address();
|
||||
logger.debug("live_members before={}", live_members);
|
||||
if (!is_shutdown(myid)) {
|
||||
live_members.insert(myid);
|
||||
if (!is_shutdown(myip)) {
|
||||
live_members.insert(my_host_id());
|
||||
}
|
||||
logger.debug("live_members after={}", live_members);
|
||||
return live_members;
|
||||
@@ -1247,6 +1248,7 @@ future<> gossiper::evict_from_membership(locator::host_id hid, permit_id pid) {
|
||||
}
|
||||
g._endpoint_state_map.erase(hid);
|
||||
});
|
||||
_expire_time_endpoint_map.erase(hid);
|
||||
logger.debug("evicting {} from gossip", hid);
|
||||
}
|
||||
|
||||
@@ -1319,6 +1321,21 @@ future<> gossiper::replicate(endpoint_state es, permit_id pid) {
|
||||
}
|
||||
}
|
||||
|
||||
future<> gossiper::advertise_token_removed(locator::host_id host_id, permit_id pid) {
|
||||
auto permit = co_await lock_endpoint(host_id, pid);
|
||||
pid = permit.id();
|
||||
auto eps = get_endpoint_state(host_id);
|
||||
eps.update_timestamp(); // make sure we don't evict it too soon
|
||||
eps.get_heart_beat_state().force_newer_generation_unsafe();
|
||||
auto expire_time = compute_expire_time();
|
||||
eps.add_application_state(application_state::STATUS, versioned_value::removed_nonlocal(host_id, expire_time.time_since_epoch().count()));
|
||||
logger.info("Completing removal of {}", host_id);
|
||||
add_expire_time_for_endpoint(host_id, expire_time);
|
||||
co_await replicate(std::move(eps), pid);
|
||||
// ensure at least one gossip round occurs before returning
|
||||
co_await sleep_abortable(INTERVAL * 2, _abort_source);
|
||||
}
|
||||
|
||||
future<> gossiper::assassinate_endpoint(sstring address) {
|
||||
throw std::runtime_error("Assassinating endpoint is not supported in topology over raft mode");
|
||||
}
|
||||
@@ -1351,10 +1368,13 @@ future<> gossiper::do_gossip_to_unreachable_member(gossip_digest_syn message) {
|
||||
std::uniform_real_distribution<double> dist(0, 1);
|
||||
double rand_dbl = dist(_random_engine);
|
||||
if (rand_dbl < prob) {
|
||||
auto addrs = _unreachable_endpoints | std::ranges::views::keys | std::views::filter([this] (auto ep) {
|
||||
// Ignore the node which is no longer part of the cluster
|
||||
return !_topo_sm._topology.left_nodes.contains(raft::server_id(ep.uuid()));
|
||||
}) | std::ranges::to<std::set>();
|
||||
std::set<locator::host_id> addrs;
|
||||
for (auto&& x : _unreachable_endpoints) {
|
||||
// Ignore the node which is decommissioned
|
||||
if (get_gossip_status(_address_map.get(x.first)) != sstring(versioned_value::STATUS_LEFT)) {
|
||||
addrs.insert(x.first);
|
||||
}
|
||||
}
|
||||
logger.trace("do_gossip_to_unreachable_member: live_endpoint nr={} unreachable_endpoints nr={}",
|
||||
live_endpoint_count, unreachable_endpoint_count);
|
||||
return send_gossip(message, addrs);
|
||||
@@ -1363,6 +1383,17 @@ future<> gossiper::do_gossip_to_unreachable_member(gossip_digest_syn message) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
clk::time_point gossiper::get_expire_time_for_endpoint(locator::host_id id) const noexcept {
|
||||
/* default expire_time is A_VERY_LONG_TIME */
|
||||
auto it = _expire_time_endpoint_map.find(id);
|
||||
if (it == _expire_time_endpoint_map.end()) {
|
||||
return compute_expire_time();
|
||||
} else {
|
||||
auto stored_time = it->second;
|
||||
return stored_time;
|
||||
}
|
||||
}
|
||||
|
||||
endpoint_state_ptr gossiper::get_endpoint_state_ptr(locator::host_id ep) const noexcept {
|
||||
auto it = _endpoint_state_map.find(ep);
|
||||
if (it == _endpoint_state_map.end()) {
|
||||
@@ -1389,7 +1420,7 @@ endpoint_state& gossiper::my_endpoint_state() {
|
||||
auto ep = get_broadcast_address();
|
||||
auto it = _endpoint_state_map.find(id);
|
||||
if (it == _endpoint_state_map.end()) {
|
||||
it = _endpoint_state_map.emplace(id, make_endpoint_state_ptr(endpoint_state{ep})).first;
|
||||
it = _endpoint_state_map.emplace(id, make_endpoint_state_ptr({ep})).first;
|
||||
}
|
||||
return const_cast<endpoint_state&>(*it->second);
|
||||
}
|
||||
@@ -1603,8 +1634,9 @@ future<> gossiper::real_mark_alive(locator::host_id host_id) {
|
||||
}
|
||||
|
||||
// Do not mark a node with status shutdown as UP.
|
||||
if (is_shutdown(*es)) {
|
||||
logger.warn("Skip marking node {} with status = shutdown as UP", host_id);
|
||||
auto status = sstring(get_gossip_status(*es));
|
||||
if (status == sstring(versioned_value::SHUTDOWN)) {
|
||||
logger.warn("Skip marking node {} with status = {} as UP", host_id, status);
|
||||
co_return;
|
||||
}
|
||||
|
||||
@@ -1617,6 +1649,7 @@ future<> gossiper::real_mark_alive(locator::host_id host_id) {
|
||||
auto [it_, inserted] = data.live.insert(addr);
|
||||
was_live = !inserted;
|
||||
});
|
||||
_expire_time_endpoint_map.erase(host_id);
|
||||
if (was_live) {
|
||||
co_return;
|
||||
}
|
||||
@@ -1629,7 +1662,7 @@ future<> gossiper::real_mark_alive(locator::host_id host_id) {
|
||||
|
||||
auto addr = es->get_ip();
|
||||
|
||||
logger.info("InetAddress {}/{} is now UP, status = {}", host_id, addr, get_node_status(host_id));
|
||||
logger.info("InetAddress {}/{} is now UP, status = {}", host_id, addr, status);
|
||||
|
||||
co_await _subscribers.for_each([addr, host_id, es, pid = permit.id()] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) -> future<> {
|
||||
co_await subscriber->on_alive(addr, host_id, es, pid);
|
||||
@@ -1645,7 +1678,7 @@ future<> gossiper::mark_dead(locator::host_id addr, endpoint_state_ptr state, pe
|
||||
data.live.erase(addr);
|
||||
data.unreachable[addr] = now();
|
||||
});
|
||||
logger.info("InetAddress {} is now DOWN, status = {}", addr, get_node_status(addr));
|
||||
logger.info("InetAddress {} is now DOWN, status = {}", addr, get_gossip_status(*state));
|
||||
co_await do_on_dead_notifications(state->get_ip(), std::move(state), pid);
|
||||
}
|
||||
|
||||
@@ -1655,14 +1688,14 @@ future<> gossiper::handle_major_state_change(endpoint_state eps, permit_id pid,
|
||||
|
||||
endpoint_state_ptr eps_old = get_endpoint_state_ptr(ep);
|
||||
|
||||
if (!is_left(eps) && !shadow_round) {
|
||||
if (!is_dead_state(eps) && !shadow_round) {
|
||||
if (_endpoint_state_map.contains(ep)) {
|
||||
logger.info("Node {} has restarted, now UP, status = {}", ep, get_node_status(ep));
|
||||
logger.info("Node {} has restarted, now UP, status = {}", ep, get_gossip_status(eps));
|
||||
} else {
|
||||
logger.debug("Node {} is now part of the cluster, status = {}", ep, get_node_status(ep));
|
||||
logger.debug("Node {} is now part of the cluster, status = {}", ep, get_gossip_status(eps));
|
||||
}
|
||||
}
|
||||
logger.trace("Adding endpoint state for {}, status = {}", ep, get_node_status(ep));
|
||||
logger.trace("Adding endpoint state for {}, status = {}", ep, get_gossip_status(eps));
|
||||
co_await replicate(eps, pid);
|
||||
|
||||
if (shadow_round) {
|
||||
@@ -1680,10 +1713,10 @@ future<> gossiper::handle_major_state_change(endpoint_state eps, permit_id pid,
|
||||
if (!ep_state) {
|
||||
throw std::out_of_range(format("ep={}", ep));
|
||||
}
|
||||
if (!is_left(*ep_state)) {
|
||||
if (!is_dead_state(*ep_state)) {
|
||||
mark_alive(ep_state);
|
||||
} else {
|
||||
logger.debug("Not marking {} alive due to dead state {}", ep, get_node_status(ep));
|
||||
logger.debug("Not marking {} alive due to dead state {}", ep, get_gossip_status(eps));
|
||||
co_await mark_dead(ep, ep_state, pid);
|
||||
}
|
||||
|
||||
@@ -1697,8 +1730,8 @@ future<> gossiper::handle_major_state_change(endpoint_state eps, permit_id pid,
|
||||
}
|
||||
}
|
||||
|
||||
bool gossiper::is_left(const endpoint_state& eps) const {
|
||||
return _topo_sm._topology.left_nodes.contains(raft::server_id(eps.get_host_id().uuid()));
|
||||
bool gossiper::is_dead_state(const endpoint_state& eps) const {
|
||||
return std::ranges::any_of(DEAD_STATES, [state = get_gossip_status(eps)](const auto& deadstate) { return state == deadstate; });
|
||||
}
|
||||
|
||||
bool gossiper::is_shutdown(const locator::host_id& endpoint) const {
|
||||
@@ -1713,6 +1746,10 @@ bool gossiper::is_normal(const locator::host_id& endpoint) const {
|
||||
return get_gossip_status(endpoint) == versioned_value::STATUS_NORMAL;
|
||||
}
|
||||
|
||||
bool gossiper::is_silent_shutdown_state(const endpoint_state& ep_state) const{
|
||||
return std::ranges::any_of(SILENT_SHUTDOWN_STATES, [state = get_gossip_status(ep_state)](const auto& deadstate) { return state == deadstate; });
|
||||
}
|
||||
|
||||
future<> gossiper::apply_new_states(endpoint_state local_state, const endpoint_state& remote_state, permit_id pid, bool shadow_round) {
|
||||
// don't SCYLLA_ASSERT here, since if the node restarts the version will go back to zero
|
||||
//int oldVersion = local_state.get_heart_beat_state().get_heart_beat_version();
|
||||
@@ -2136,14 +2173,16 @@ future<> gossiper::do_stop_gossiping() {
|
||||
logger.info("gossip is already stopped");
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto my_ep_state = get_this_endpoint_state_ptr();
|
||||
if (my_ep_state && _topo_sm._topology.normal_nodes.contains(raft::server_id(my_host_id().uuid()))) {
|
||||
if (my_ep_state) {
|
||||
logger.info("My status = {}", get_gossip_status(*my_ep_state));
|
||||
}
|
||||
if (my_ep_state && !is_silent_shutdown_state(*my_ep_state)) {
|
||||
auto local_generation = my_ep_state->get_heart_beat_state().get_generation();
|
||||
logger.info("Announcing shutdown");
|
||||
co_await add_local_application_state(application_state::STATUS, versioned_value::shutdown(true));
|
||||
auto live_endpoints = _live_endpoints;
|
||||
co_await coroutine::parallel_for_each(live_endpoints, [this, &local_generation] (locator::host_id id) -> future<> {
|
||||
for (locator::host_id id : live_endpoints) {
|
||||
logger.info("Sending a GossipShutdown to {} with generation {}", id, local_generation);
|
||||
try {
|
||||
co_await ser::gossip_rpc_verbs::send_gossip_shutdown(&_messaging, id, get_broadcast_address(), local_generation.value());
|
||||
@@ -2151,7 +2190,7 @@ future<> gossiper::do_stop_gossiping() {
|
||||
} catch (...) {
|
||||
logger.warn("Fail to send GossipShutdown to {}: {}", id, std::current_exception());
|
||||
}
|
||||
});
|
||||
}
|
||||
co_await sleep(std::chrono::milliseconds(_gcfg.shutdown_announce_ms));
|
||||
} else {
|
||||
logger.warn("No local state or state is in silent shutdown, not announcing shutdown");
|
||||
@@ -2202,6 +2241,19 @@ bool gossiper::is_enabled() const {
|
||||
return _enabled && !_abort_source.abort_requested();
|
||||
}
|
||||
|
||||
void gossiper::add_expire_time_for_endpoint(locator::host_id endpoint, clk::time_point expire_time) {
|
||||
auto now_ = now();
|
||||
auto diff = std::chrono::duration_cast<std::chrono::seconds>(expire_time - now_).count();
|
||||
logger.info("Node {} will be removed from gossip at [{:%Y-%m-%d %T %z}]: (expire = {}, now = {}, diff = {} seconds)",
|
||||
endpoint, fmt::gmtime(clk::to_time_t(expire_time)), expire_time.time_since_epoch().count(),
|
||||
now_.time_since_epoch().count(), diff);
|
||||
_expire_time_endpoint_map[endpoint] = expire_time;
|
||||
}
|
||||
|
||||
clk::time_point gossiper::compute_expire_time() {
|
||||
return now() + A_VERY_LONG_TIME;
|
||||
}
|
||||
|
||||
bool gossiper::is_alive(locator::host_id id) const {
|
||||
if (id == my_host_id()) {
|
||||
return true;
|
||||
@@ -2321,22 +2373,91 @@ std::string_view gossiper::get_gossip_status(const locator::host_id& endpoint) c
|
||||
return do_get_gossip_status(get_application_state_ptr(endpoint, application_state::STATUS));
|
||||
}
|
||||
|
||||
std::string gossiper::get_node_status(const locator::host_id& endpoint) const noexcept {
|
||||
if (this_shard_id() != 0) {
|
||||
on_internal_error(logger, "get_node_status should only be called on shard 0");
|
||||
bool gossiper::is_safe_for_bootstrap(inet_address endpoint) const {
|
||||
// We allow to bootstrap a new node in only two cases:
|
||||
// 1) The node is a completely new node and no state in gossip at all
|
||||
// 2) The node has state in gossip and it is already removed from the
|
||||
// cluster either by nodetool decommission or nodetool removenode
|
||||
bool allowed = true;
|
||||
auto host_id = try_get_host_id(endpoint);
|
||||
if (!host_id) {
|
||||
logger.debug("is_safe_for_bootstrap: node={}, status=no state in gossip, allowed_to_bootstrap={}", endpoint, allowed);
|
||||
return allowed;
|
||||
}
|
||||
if (is_shutdown(endpoint)) {
|
||||
return "shutdown";
|
||||
auto eps = get_endpoint_state_ptr(*host_id);
|
||||
if (!eps) {
|
||||
logger.debug("is_safe_for_bootstrap: node={}, status=no state in gossip, allowed_to_bootstrap={}", endpoint, allowed);
|
||||
return allowed;
|
||||
}
|
||||
auto n = _topo_sm._topology.find(raft::server_id{endpoint.uuid()});
|
||||
if (!n) {
|
||||
if (_topo_sm._topology.left_nodes.contains(raft::server_id{endpoint.uuid()})) {
|
||||
return "left";
|
||||
auto status = get_gossip_status(*eps);
|
||||
std::unordered_set<std::string_view> allowed_statuses{
|
||||
versioned_value::STATUS_LEFT,
|
||||
versioned_value::REMOVED_TOKEN,
|
||||
};
|
||||
allowed = allowed_statuses.contains(status);
|
||||
logger.debug("is_safe_for_bootstrap: node={}, status={}, allowed_to_bootstrap={}", endpoint, status, allowed);
|
||||
return allowed;
|
||||
}
|
||||
|
||||
std::set<sstring> gossiper::get_supported_features(locator::host_id endpoint) const {
|
||||
auto app_state = get_application_state_ptr(endpoint, application_state::SUPPORTED_FEATURES);
|
||||
if (!app_state) {
|
||||
return {};
|
||||
}
|
||||
return feature_service::to_feature_set(app_state->value());
|
||||
}
|
||||
|
||||
std::set<sstring> gossiper::get_supported_features(const std::unordered_map<locator::host_id, sstring>& loaded_peer_features, ignore_features_of_local_node ignore_local_node) const {
|
||||
std::unordered_map<locator::host_id, std::set<sstring>> features_map;
|
||||
std::set<sstring> common_features;
|
||||
|
||||
for (auto& x : loaded_peer_features) {
|
||||
auto features = feature_service::to_feature_set(x.second);
|
||||
if (features.empty()) {
|
||||
logger.warn("Loaded empty features for peer node {}", x.first);
|
||||
} else {
|
||||
features_map.emplace(x.first, std::move(features));
|
||||
}
|
||||
return "unknown";
|
||||
} else {
|
||||
return fmt::format("{}", n->second.state);
|
||||
}
|
||||
|
||||
for (auto& x : _endpoint_state_map) {
|
||||
auto host_id = x.second->get_host_id();
|
||||
auto features = get_supported_features(host_id);
|
||||
if (ignore_local_node && host_id == my_host_id()) {
|
||||
logger.debug("Ignore SUPPORTED_FEATURES of local node: features={}", features);
|
||||
continue;
|
||||
}
|
||||
if (features.empty()) {
|
||||
auto it = loaded_peer_features.find(host_id);
|
||||
if (it != loaded_peer_features.end()) {
|
||||
logger.info("Node {} does not contain SUPPORTED_FEATURES in gossip, using features saved in system table, features={}", host_id, feature_service::to_feature_set(it->second));
|
||||
} else {
|
||||
logger.warn("Node {} does not contain SUPPORTED_FEATURES in gossip or system table", host_id);
|
||||
}
|
||||
} else {
|
||||
// Replace the features with live info
|
||||
features_map[host_id] = std::move(features);
|
||||
}
|
||||
}
|
||||
|
||||
if (ignore_local_node) {
|
||||
features_map.erase(my_host_id());
|
||||
}
|
||||
|
||||
if (!features_map.empty()) {
|
||||
common_features = features_map.begin()->second;
|
||||
}
|
||||
|
||||
for (auto& x : features_map) {
|
||||
auto& features = x.second;
|
||||
std::set<sstring> result;
|
||||
std::set_intersection(features.begin(), features.end(),
|
||||
common_features.begin(), common_features.end(),
|
||||
std::inserter(result, result.end()));
|
||||
common_features = std::move(result);
|
||||
}
|
||||
common_features.erase("");
|
||||
return common_features;
|
||||
}
|
||||
|
||||
void gossiper::check_snitch_name_matches(sstring local_snitch_name) const {
|
||||
|
||||
@@ -91,6 +91,7 @@ struct loaded_endpoint_state {
|
||||
class gossiper : public seastar::async_sharded_service<gossiper>, public seastar::peering_sharded_service<gossiper> {
|
||||
public:
|
||||
using clk = seastar::lowres_system_clock;
|
||||
using ignore_features_of_local_node = bool_class<class ignore_features_of_local_node_tag>;
|
||||
using generation_for_nodes = std::unordered_map<locator::host_id, generation_type>;
|
||||
private:
|
||||
using messaging_verb = netw::messaging_verb;
|
||||
@@ -197,7 +198,18 @@ private:
|
||||
endpoint_locks_map _endpoint_locks;
|
||||
|
||||
public:
|
||||
static constexpr std::array DEAD_STATES{
|
||||
versioned_value::REMOVED_TOKEN,
|
||||
versioned_value::STATUS_LEFT,
|
||||
};
|
||||
static constexpr std::array SILENT_SHUTDOWN_STATES{
|
||||
versioned_value::REMOVED_TOKEN,
|
||||
versioned_value::STATUS_LEFT,
|
||||
versioned_value::STATUS_BOOTSTRAPPING,
|
||||
versioned_value::STATUS_UNKNOWN,
|
||||
};
|
||||
static constexpr std::chrono::milliseconds INTERVAL{1000};
|
||||
static constexpr std::chrono::hours A_VERY_LONG_TIME{24 * 3};
|
||||
|
||||
// Maximum difference between remote generation value and generation
|
||||
// value this node would get if this node were restarted that we are
|
||||
@@ -229,6 +241,7 @@ private:
|
||||
/* initial seeds for joining the cluster */
|
||||
std::set<inet_address> _seeds;
|
||||
|
||||
std::map<locator::host_id, clk::time_point> _expire_time_endpoint_map;
|
||||
|
||||
bool _in_shadow_round = false;
|
||||
|
||||
@@ -328,6 +341,13 @@ private:
|
||||
utils::chunked_vector<gossip_digest> make_random_gossip_digest() const;
|
||||
|
||||
public:
|
||||
/**
|
||||
* Handles switching the endpoint's state from REMOVING_TOKEN to REMOVED_TOKEN
|
||||
*
|
||||
* @param endpoint
|
||||
* @param host_id
|
||||
*/
|
||||
future<> advertise_token_removed(locator::host_id host_id, permit_id);
|
||||
|
||||
/**
|
||||
* Do not call this method unless you know what you are doing.
|
||||
@@ -343,6 +363,7 @@ public:
|
||||
future<generation_type> get_current_generation_number(locator::host_id endpoint) const;
|
||||
future<version_type> get_current_heart_beat_version(locator::host_id endpoint) const;
|
||||
|
||||
bool is_safe_for_bootstrap(inet_address endpoint) const;
|
||||
private:
|
||||
/**
|
||||
* Returns true if the chosen target was also a seed. False otherwise
|
||||
@@ -362,6 +383,7 @@ private:
|
||||
future<> do_gossip_to_unreachable_member(gossip_digest_syn message);
|
||||
|
||||
public:
|
||||
clk::time_point get_expire_time_for_endpoint(locator::host_id endpoint) const noexcept;
|
||||
|
||||
// Gets a shared pointer to the endpoint_state, if exists.
|
||||
// Otherwise, returns a null ptr.
|
||||
@@ -445,7 +467,7 @@ private:
|
||||
public:
|
||||
bool is_alive(locator::host_id id) const;
|
||||
|
||||
bool is_left(const endpoint_state& eps) const;
|
||||
bool is_dead_state(const endpoint_state& eps) const;
|
||||
// Wait for nodes to be alive on all shards
|
||||
future<> wait_alive(std::vector<gms::inet_address> nodes, std::chrono::milliseconds timeout);
|
||||
future<> wait_alive(std::vector<locator::host_id> nodes, std::chrono::milliseconds timeout);
|
||||
@@ -566,12 +588,17 @@ public:
|
||||
public:
|
||||
bool is_enabled() const;
|
||||
|
||||
public:
|
||||
void add_expire_time_for_endpoint(locator::host_id endpoint, clk::time_point expire_time);
|
||||
|
||||
static clk::time_point compute_expire_time();
|
||||
public:
|
||||
bool is_seed(const inet_address& endpoint) const;
|
||||
bool is_shutdown(const locator::host_id& endpoint) const;
|
||||
bool is_shutdown(const endpoint_state& eps) const;
|
||||
bool is_normal(const locator::host_id& endpoint) const;
|
||||
bool is_cql_ready(const locator::host_id& endpoint) const;
|
||||
bool is_silent_shutdown_state(const endpoint_state& ep_state) const;
|
||||
void force_newer_generation();
|
||||
public:
|
||||
std::string_view get_gossip_status(const endpoint_state& ep_state) const noexcept;
|
||||
@@ -588,8 +615,12 @@ private:
|
||||
gossip_address_map& _address_map;
|
||||
gossip_config _gcfg;
|
||||
condition_variable _failure_detector_loop_cv;
|
||||
// Get features supported by a particular node
|
||||
std::set<sstring> get_supported_features(locator::host_id endpoint) const;
|
||||
locator::token_metadata_ptr get_token_metadata_ptr() const noexcept;
|
||||
std::string get_node_status(const locator::host_id& endpoint) const noexcept;
|
||||
public:
|
||||
// Get features supported by all the nodes this node knows about
|
||||
std::set<sstring> get_supported_features(const std::unordered_map<locator::host_id, sstring>& loaded_peer_features, ignore_features_of_local_node ignore_local_node) const;
|
||||
private:
|
||||
seastar::metrics::metric_groups _metrics;
|
||||
public:
|
||||
|
||||
@@ -10,6 +10,10 @@
|
||||
#include "gms/versioned_value.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
|
||||
#include <boost/algorithm/string/split.hpp>
|
||||
#include <boost/algorithm/string/classification.hpp>
|
||||
#include <charconv>
|
||||
|
||||
namespace gms {
|
||||
|
||||
static_assert(std::is_nothrow_default_constructible_v<versioned_value>);
|
||||
@@ -19,6 +23,11 @@ versioned_value versioned_value::network_version() {
|
||||
return versioned_value(format("{}", netw::messaging_service::current_version));
|
||||
}
|
||||
|
||||
sstring versioned_value::make_full_token_string(const std::unordered_set<dht::token>& tokens) {
|
||||
return fmt::to_string(fmt::join(tokens | std::views::transform([] (const dht::token& t) {
|
||||
return t.to_sstring(); }), ";"));
|
||||
}
|
||||
|
||||
sstring versioned_value::make_token_string(const std::unordered_set<dht::token>& tokens) {
|
||||
if (tokens.empty()) {
|
||||
return "";
|
||||
@@ -26,4 +35,16 @@ sstring versioned_value::make_token_string(const std::unordered_set<dht::token>&
|
||||
return tokens.begin()->to_sstring();
|
||||
}
|
||||
|
||||
} // namespace gms
|
||||
std::unordered_set<dht::token> versioned_value::tokens_from_string(const sstring& s) {
|
||||
if (s.size() == 0) {
|
||||
return {}; // boost::split produces one element for empty string
|
||||
}
|
||||
std::vector<sstring> tokens;
|
||||
boost::split(tokens, s, boost::is_any_of(";"));
|
||||
std::unordered_set<dht::token> ret;
|
||||
for (auto str : tokens) {
|
||||
ret.emplace(dht::token::from_sstring(str));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
@@ -18,6 +18,7 @@
|
||||
#include "schema/schema_fwd.hh"
|
||||
#include "service/state_id.hh"
|
||||
#include "version.hh"
|
||||
#include "cdc/generation_id.hh"
|
||||
#include <set>
|
||||
#include <unordered_set>
|
||||
|
||||
@@ -44,7 +45,11 @@ public:
|
||||
|
||||
// values for ApplicationState.STATUS
|
||||
static constexpr std::string_view STATUS_UNKNOWN{"UNKNOWN"};
|
||||
static constexpr std::string_view STATUS_BOOTSTRAPPING{"BOOT"};
|
||||
static constexpr std::string_view STATUS_NORMAL{"NORMAL"};
|
||||
static constexpr std::string_view STATUS_LEFT{"LEFT"};
|
||||
|
||||
static constexpr std::string_view REMOVED_TOKEN{"removed"};
|
||||
|
||||
static constexpr std::string_view SHUTDOWN{"shutdown"};
|
||||
|
||||
@@ -75,18 +80,26 @@ public:
|
||||
: _version(-1) {
|
||||
}
|
||||
|
||||
static versioned_value clone_with_higher_version(const versioned_value& value) noexcept {
|
||||
return versioned_value(value.value());
|
||||
}
|
||||
|
||||
private:
|
||||
static sstring version_string(const std::initializer_list<sstring>& args) {
|
||||
return fmt::to_string(fmt::join(args, versioned_value::DELIMITER));
|
||||
}
|
||||
|
||||
static sstring make_full_token_string(const std::unordered_set<dht::token>& tokens);
|
||||
static sstring make_token_string(const std::unordered_set<dht::token>& tokens);
|
||||
static sstring make_cdc_generation_id_string(std::optional<cdc::generation_id>);
|
||||
|
||||
// Reverse of `make_full_token_string`.
|
||||
static std::unordered_set<dht::token> tokens_from_string(const sstring&);
|
||||
|
||||
static versioned_value clone_with_higher_version(const versioned_value& value) noexcept {
|
||||
return versioned_value(value.value());
|
||||
}
|
||||
|
||||
static versioned_value bootstrapping(const std::unordered_set<dht::token>& tokens) {
|
||||
return versioned_value(version_string({sstring(versioned_value::STATUS_BOOTSTRAPPING),
|
||||
make_token_string(tokens)}));
|
||||
}
|
||||
|
||||
public:
|
||||
static versioned_value normal(const std::unordered_set<dht::token>& tokens) {
|
||||
return versioned_value(version_string({sstring(versioned_value::STATUS_NORMAL),
|
||||
make_token_string(tokens)}));
|
||||
@@ -100,10 +113,24 @@ public:
|
||||
return versioned_value(new_version.to_sstring());
|
||||
}
|
||||
|
||||
static versioned_value left(const std::unordered_set<dht::token>& tokens, int64_t expire_time) {
|
||||
return versioned_value(version_string({sstring(versioned_value::STATUS_LEFT),
|
||||
make_token_string(tokens),
|
||||
std::to_string(expire_time)}));
|
||||
}
|
||||
|
||||
static versioned_value host_id(const locator::host_id& host_id) {
|
||||
return versioned_value(host_id.to_sstring());
|
||||
}
|
||||
|
||||
static versioned_value tokens(const std::unordered_set<dht::token>& tokens) {
|
||||
return versioned_value(make_full_token_string(tokens));
|
||||
}
|
||||
|
||||
static versioned_value removed_nonlocal(const locator::host_id& host_id, int64_t expire_time) {
|
||||
return versioned_value(sstring(REMOVED_TOKEN) + sstring(DELIMITER) + host_id.to_sstring() + sstring(DELIMITER) + to_sstring(expire_time));
|
||||
}
|
||||
|
||||
static versioned_value shutdown(bool value) {
|
||||
return versioned_value(sstring(SHUTDOWN) + sstring(DELIMITER) + (value ? "true" : "false"));
|
||||
}
|
||||
@@ -142,6 +169,10 @@ public:
|
||||
return versioned_value(private_ip);
|
||||
}
|
||||
|
||||
static versioned_value severity(double value) {
|
||||
return versioned_value(to_sstring(value));
|
||||
}
|
||||
|
||||
static versioned_value supported_features(const std::set<std::string_view>& features) {
|
||||
return versioned_value(fmt::to_string(fmt::join(features, ",")));
|
||||
}
|
||||
|
||||
@@ -202,10 +202,6 @@ std::optional<sstring> secondary_index_manager::custom_index_class(const schema&
|
||||
|
||||
// This function returns a factory, as the custom index class should be lightweight, preferably not holding any state.
|
||||
// We prefer this over a static custom index class instance, as it allows us to avoid any issues with thread safety.
|
||||
//
|
||||
// Note: SAI class names (StorageAttachedIndex, sai) are not listed here
|
||||
// because maybe_rewrite_sai_to_vector_index() in create_index_statement.cc
|
||||
// rewrites them to "vector_index" before the index metadata is persisted.
|
||||
std::optional<std::function<std::unique_ptr<custom_index>()>> secondary_index_manager::get_custom_class_factory(const sstring& class_name) {
|
||||
sstring lower_class_name = class_name;
|
||||
std::transform(lower_class_name.begin(), lower_class_name.end(), lower_class_name.begin(), ::tolower);
|
||||
|
||||
@@ -20,7 +20,6 @@
|
||||
#include "types/concrete_types.hh"
|
||||
#include "types/types.hh"
|
||||
#include "utils/managed_string.hh"
|
||||
#include <ranges>
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <boost/algorithm/string.hpp>
|
||||
|
||||
@@ -104,126 +103,19 @@ const static std::unordered_map<sstring, std::function<void(const sstring&, cons
|
||||
{"oversampling", std::bind_front(validate_factor_option, 1.0f, 100.0f)},
|
||||
// 'rescoring' enables recalculating of similarity scores of candidates retrieved from vector store when quantization is used.
|
||||
{"rescoring", std::bind_front(validate_enumerated_option, boolean_values)},
|
||||
// 'source_model' is a Cassandra SAI option specifying the embedding model name.
|
||||
// Used by Cassandra libraries (e.g., CassIO) to tag indexes with the model that produced the vectors.
|
||||
// Accepted for compatibility but not used by ScyllaDB.
|
||||
{"source_model", [](const sstring&, const sstring&) { /* accepted for Cassandra compatibility */ }},
|
||||
};
|
||||
|
||||
static constexpr auto TC_TARGET_KEY = "tc";
|
||||
static constexpr auto PK_TARGET_KEY = "pk";
|
||||
static constexpr auto FC_TARGET_KEY = "fc";
|
||||
|
||||
// Convert a serialized targets string (as produced by serialize_targets())
|
||||
// back into the CQL column list used inside CREATE INDEX ... ON table(<here>).
|
||||
//
|
||||
// JSON examples:
|
||||
// {"tc":"v","fc":["f1","f2"]} -> "v, f1, f2"
|
||||
// {"tc":"v","pk":["p1","p2"]} -> "(p1, p2), v"
|
||||
// {"tc":"v","pk":["p1","p2"],"fc":["f1"]} -> "(p1, p2), v, f1"
|
||||
static sstring targets_to_cql(const sstring& targets) {
|
||||
sstring get_vector_index_target_column(const sstring& targets) {
|
||||
std::optional<rjson::value> json_value = rjson::try_parse(targets);
|
||||
if (!json_value || !json_value->IsObject()) {
|
||||
return cql3::util::maybe_quote(cql3::statements::index_target::column_name_from_target_string(targets));
|
||||
return target_parser::get_target_column_name_from_string(targets);
|
||||
}
|
||||
|
||||
sstring result;
|
||||
|
||||
const rjson::value* pk = rjson::find(*json_value, PK_TARGET_KEY);
|
||||
rjson::value* pk = rjson::find(*json_value, "pk");
|
||||
if (pk && pk->IsArray() && !pk->Empty()) {
|
||||
result += "(";
|
||||
auto pk_cols = std::views::all(pk->GetArray()) | std::views::transform([&](const rjson::value& col) {
|
||||
return cql3::util::maybe_quote(sstring(rjson::to_string_view(col)));
|
||||
}) | std::ranges::to<std::vector<sstring>>();
|
||||
result += boost::algorithm::join(pk_cols, ", ");
|
||||
result += "), ";
|
||||
return sstring(rjson::to_string_view(pk->GetArray()[0]));
|
||||
}
|
||||
|
||||
const rjson::value* tc = rjson::find(*json_value, TC_TARGET_KEY);
|
||||
if (tc && tc->IsString()) {
|
||||
result += cql3::util::maybe_quote(sstring(rjson::to_string_view(*tc)));
|
||||
}
|
||||
|
||||
const rjson::value* fc = rjson::find(*json_value, FC_TARGET_KEY);
|
||||
if (fc && fc->IsArray()) {
|
||||
for (rapidjson::SizeType i = 0; i < fc->Size(); ++i) {
|
||||
result += ", ";
|
||||
result += cql3::util::maybe_quote(sstring(rjson::to_string_view((*fc)[i])));
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// Serialize vector index targets into a format using:
|
||||
// "tc" for the target (vector) column,
|
||||
// "pk" for partition key columns (local index),
|
||||
// "fc" for filtering columns.
|
||||
// For a simple single-column vector index, returns just the column name.
|
||||
// Examples:
|
||||
// (v) -> "v"
|
||||
// (v, f1, f2) -> {"tc":"v","fc":["f1","f2"]}
|
||||
// ((p1, p2), v) -> {"tc":"v","pk":["p1","p2"]}
|
||||
// ((p1, p2), v, f1, f2) -> {"tc":"v","pk":["p1","p2"],"fc":["f1","f2"]}
|
||||
sstring vector_index::serialize_targets(const std::vector<::shared_ptr<cql3::statements::index_target>>& targets) {
|
||||
using cql3::statements::index_target;
|
||||
|
||||
if (targets.size() == 0) {
|
||||
throw exceptions::invalid_request_exception("Vector index must have at least one target column");
|
||||
}
|
||||
|
||||
if (targets.size() == 1) {
|
||||
auto tc = targets[0]->value;
|
||||
if (!std::holds_alternative<index_target::single_column>(tc)) {
|
||||
throw exceptions::invalid_request_exception("Missing vector column target for local vector index");
|
||||
}
|
||||
return index_target::escape_target_column(*std::get<index_target::single_column>(tc));
|
||||
}
|
||||
|
||||
const bool has_pk = std::holds_alternative<index_target::multiple_columns>(targets.front()->value);
|
||||
const size_t tc_idx = has_pk ? 1 : 0;
|
||||
const size_t fc_count = targets.size() - tc_idx - 1;
|
||||
|
||||
if (!std::holds_alternative<index_target::single_column>(targets[tc_idx]->value)) {
|
||||
throw exceptions::invalid_request_exception("Vector index target column must be a single column");
|
||||
}
|
||||
|
||||
rjson::value json_map = rjson::empty_object();
|
||||
rjson::add_with_string_name(json_map, TC_TARGET_KEY, rjson::from_string(std::get<index_target::single_column>(targets[tc_idx]->value)->text()));
|
||||
|
||||
if (has_pk) {
|
||||
rjson::value pk_json = rjson::empty_array();
|
||||
for (const auto& col : std::get<index_target::multiple_columns>(targets.front()->value)) {
|
||||
rjson::push_back(pk_json, rjson::from_string(col->text()));
|
||||
}
|
||||
rjson::add_with_string_name(json_map, PK_TARGET_KEY, std::move(pk_json));
|
||||
}
|
||||
|
||||
if (fc_count > 0) {
|
||||
rjson::value fc_json = rjson::empty_array();
|
||||
for (size_t i = tc_idx + 1; i < targets.size(); ++i) {
|
||||
if (!std::holds_alternative<index_target::single_column>(targets[i]->value)) {
|
||||
throw exceptions::invalid_request_exception("Vector index filtering column must be a single column");
|
||||
}
|
||||
rjson::push_back(fc_json, rjson::from_string(std::get<index_target::single_column>(targets[i]->value)->text()));
|
||||
}
|
||||
rjson::add_with_string_name(json_map, FC_TARGET_KEY, std::move(fc_json));
|
||||
}
|
||||
|
||||
return rjson::print(json_map);
|
||||
}
|
||||
|
||||
sstring vector_index::get_target_column(const sstring& targets) {
|
||||
std::optional<rjson::value> json_value = rjson::try_parse(targets);
|
||||
if (!json_value || !json_value->IsObject()) {
|
||||
return cql3::statements::index_target::column_name_from_target_string(targets);
|
||||
}
|
||||
|
||||
rjson::value* tc = rjson::find(*json_value, TC_TARGET_KEY);
|
||||
if (tc && tc->IsString()) {
|
||||
return sstring(rjson::to_string_view(*tc));
|
||||
}
|
||||
return cql3::statements::index_target::column_name_from_target_string(targets);
|
||||
return target_parser::get_target_column_name_from_string(targets);
|
||||
}
|
||||
|
||||
bool vector_index::is_rescoring_enabled(const index_options_map& properties) {
|
||||
@@ -254,37 +146,12 @@ bool vector_index::view_should_exist() const {
|
||||
}
|
||||
|
||||
std::optional<cql3::description> vector_index::describe(const index_metadata& im, const schema& base_schema) const {
|
||||
static const std::unordered_set<sstring> system_options = {
|
||||
cql3::statements::index_target::target_option_name,
|
||||
db::index::secondary_index::custom_class_option_name,
|
||||
db::index::secondary_index::index_version_option_name,
|
||||
};
|
||||
|
||||
fragmented_ostringstream os;
|
||||
os << "CREATE CUSTOM INDEX " << cql3::util::maybe_quote(im.name()) << " ON " << cql3::util::maybe_quote(base_schema.ks_name()) << "."
|
||||
<< cql3::util::maybe_quote(base_schema.cf_name()) << "(" << targets_to_cql(im.options().at(cql3::statements::index_target::target_option_name)) << ")"
|
||||
os << "CREATE CUSTOM INDEX " << cql3::util::maybe_quote(im.name()) << " ON "
|
||||
<< cql3::util::maybe_quote(base_schema.ks_name()) << "." << cql3::util::maybe_quote(base_schema.cf_name())
|
||||
<< "(" << cql3::util::maybe_quote(im.options().at(cql3::statements::index_target::target_option_name)) << ")"
|
||||
<< " USING 'vector_index'";
|
||||
|
||||
// Collect user-provided options (excluding system keys like target, class_name, index_version).
|
||||
std::map<sstring, sstring> user_options;
|
||||
for (const auto& [key, value] : im.options()) {
|
||||
if (!system_options.contains(key)) {
|
||||
user_options.emplace(key, value);
|
||||
}
|
||||
}
|
||||
if (!user_options.empty()) {
|
||||
os << " WITH OPTIONS = {";
|
||||
bool first = true;
|
||||
for (const auto& [key, value] : user_options) {
|
||||
if (!first) {
|
||||
os << ", ";
|
||||
}
|
||||
os << "'" << key << "': '" << value << "'";
|
||||
first = false;
|
||||
}
|
||||
os << "}";
|
||||
}
|
||||
|
||||
return cql3::description{
|
||||
.keyspace = base_schema.ks_name(),
|
||||
.type = "index",
|
||||
@@ -479,7 +346,7 @@ bool vector_index::is_vector_index_on_column(const index_metadata& im, const sst
|
||||
auto target_it = im.options().find(cql3_parser::index_target::target_option_name);
|
||||
if (class_it != im.options().end() && target_it != im.options().end()) {
|
||||
auto custom_class = secondary_index_manager::get_custom_class_factory(class_it->second);
|
||||
return custom_class && dynamic_cast<vector_index*>((*custom_class)().get()) && get_target_column(target_it->second) == target_name;
|
||||
return custom_class && dynamic_cast<vector_index*>((*custom_class)().get()) && get_vector_index_target_column(target_it->second) == target_name;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -37,9 +37,6 @@ public:
|
||||
static bool is_vector_index_on_column(const index_metadata& im, const sstring& target_name);
|
||||
static void check_cdc_options(const schema& schema);
|
||||
|
||||
static sstring serialize_targets(const std::vector<::shared_ptr<cql3::statements::index_target>>& targets);
|
||||
static sstring get_target_column(const sstring& targets);
|
||||
|
||||
static bool is_rescoring_enabled(const index_options_map& properties);
|
||||
static float get_oversampling(const index_options_map& properties);
|
||||
static sstring get_cql_similarity_function_name(const index_options_map& properties);
|
||||
|
||||
@@ -1269,7 +1269,7 @@ private:
|
||||
return info->next;
|
||||
}
|
||||
}
|
||||
on_internal_error(tablet_logger, format("Invalid write replica selector: {}", static_cast<int>(info->writes)));
|
||||
on_internal_error(tablet_logger, format("Invalid replica selector", static_cast<int>(info->writes)));
|
||||
});
|
||||
tablet_logger.trace("get_replicas_for_write({}): table={}, tablet={}, replicas={}", search_token, _table, tablet, replicas);
|
||||
return replicas;
|
||||
@@ -1296,7 +1296,7 @@ private:
|
||||
case write_replica_set_selector::next:
|
||||
return {};
|
||||
}
|
||||
on_internal_error(tablet_logger, format("Invalid write replica selector: {}", static_cast<int>(info->writes)));
|
||||
on_internal_error(tablet_logger, format("Invalid replica selector", static_cast<int>(info->writes)));
|
||||
}
|
||||
|
||||
host_id_vector_replica_set get_for_reading_helper(const token& search_token) const {
|
||||
@@ -1314,7 +1314,7 @@ private:
|
||||
return info->next;
|
||||
}
|
||||
}
|
||||
on_internal_error(tablet_logger, format("Invalid read replica selector: {}", static_cast<int>(info->reads)));
|
||||
on_internal_error(tablet_logger, format("Invalid replica selector", static_cast<int>(info->reads)));
|
||||
});
|
||||
tablet_logger.trace("get_endpoints_for_reading({}): table={}, tablet={}, replicas={}", search_token, _table, tablet, replicas);
|
||||
return to_host_set(replicas);
|
||||
|
||||
@@ -367,8 +367,6 @@ future<> token_metadata_impl::clear_gently() noexcept {
|
||||
co_await utils::clear_gently(_sorted_tokens);
|
||||
co_await _topology.clear_gently();
|
||||
co_await _tablets.clear_gently();
|
||||
co_await utils::clear_gently(_topology_change_info);
|
||||
_topology_change_info.reset();
|
||||
co_return;
|
||||
}
|
||||
|
||||
|
||||
@@ -490,10 +490,6 @@ const endpoint_dc_rack& topology::get_location_slow(host_id id) const {
|
||||
throw std::runtime_error(format("Requested location for node {} not in topology. backtrace {}", id, lazy_backtrace()));
|
||||
}
|
||||
|
||||
utils::UUID topology::get_rack_uuid() const {
|
||||
return utils::UUID_gen::get_name_UUID(format("{}:{}", get_location().dc, get_location().rack));
|
||||
}
|
||||
|
||||
void topology::sort_by_proximity(locator::host_id address, host_id_vector_replica_set& addresses) const {
|
||||
if (can_sort_by_proximity()) {
|
||||
do_sort_by_proximity(address, addresses);
|
||||
|
||||
@@ -344,8 +344,6 @@ public:
|
||||
return get_location(id).rack;
|
||||
}
|
||||
|
||||
utils::UUID get_rack_uuid() const;
|
||||
|
||||
auto get_local_dc_filter() const noexcept {
|
||||
return [ this, local_dc = get_datacenter() ] (auto ep) {
|
||||
return get_datacenter(ep) == local_dc;
|
||||
|
||||
main.cc
@@ -1807,17 +1807,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
|
||||
checkpoint(stop_signal, "starting repair service");
|
||||
auto max_memory_repair = memory::stats().total_memory() * 0.1;
|
||||
auto repair_config = sharded_parameter([&] {
|
||||
return repair_service::config{
|
||||
.enable_small_table_optimization_for_rbno = cfg->enable_small_table_optimization_for_rbno,
|
||||
.repair_hints_batchlog_flush_cache_time_in_ms = cfg->repair_hints_batchlog_flush_cache_time_in_ms,
|
||||
.repair_partition_count_estimation_ratio = cfg->repair_partition_count_estimation_ratio,
|
||||
.critical_disk_utilization_level = cfg->critical_disk_utilization_level,
|
||||
.repair_multishard_reader_buffer_hint_size = cfg->repair_multishard_reader_buffer_hint_size,
|
||||
.repair_multishard_reader_enable_read_ahead = cfg->repair_multishard_reader_enable_read_ahead,
|
||||
};
|
||||
});
|
||||
repair.start(std::ref(tsm), std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(bm), std::ref(sys_ks), std::ref(view_builder), std::ref(view_building_worker), std::ref(task_manager), std::ref(mm), max_memory_repair, std::move(repair_config)).get();
|
||||
repair.start(std::ref(tsm), std::ref(gossiper), std::ref(messaging), std::ref(db), std::ref(proxy), std::ref(bm), std::ref(sys_ks), std::ref(view_builder), std::ref(view_building_worker), std::ref(task_manager), std::ref(mm), max_memory_repair).get();
|
||||
auto stop_repair_service = defer_verbose_shutdown("repair service", [&repair] {
|
||||
repair.stop().get();
|
||||
});
|
||||
|
||||
@@ -175,9 +175,11 @@ std::optional<atomic_cell> counter_cell_view::difference(atomic_cell_view a, ato
|
||||
}
|
||||
|
||||
|
||||
void transform_counter_updates_to_shards(mutation& m, const mutation* current_state, uint64_t clock_offset, counter_id local_id) {
|
||||
void transform_counter_updates_to_shards(mutation& m, const mutation* current_state, uint64_t clock_offset, locator::host_id local_host_id) {
|
||||
// FIXME: allow current_state to be frozen_mutation
|
||||
|
||||
utils::UUID local_id = local_host_id.uuid();
|
||||
|
||||
auto transform_new_row_to_shards = [&s = *m.schema(), clock_offset, local_id] (column_kind kind, auto& cells) {
|
||||
cells.for_each_cell([&] (column_id id, atomic_cell_or_collection& ac_o_c) {
|
||||
auto& cdef = s.column_at(kind, id);
|
||||
@@ -186,7 +188,7 @@ void transform_counter_updates_to_shards(mutation& m, const mutation* current_st
|
||||
return; // continue -- we are in lambda
|
||||
}
|
||||
auto delta = acv.counter_update_value();
|
||||
auto cs = counter_shard(local_id, delta, clock_offset + 1);
|
||||
auto cs = counter_shard(counter_id(local_id), delta, clock_offset + 1);
|
||||
ac_o_c = counter_cell_builder::from_single_shard(acv.timestamp(), cs);
|
||||
});
|
||||
};
|
||||
@@ -210,7 +212,7 @@ void transform_counter_updates_to_shards(mutation& m, const mutation* current_st
|
||||
return; // continue -- we are in lambda
|
||||
}
|
||||
auto ccv = counter_cell_view(acv);
|
||||
auto cs = ccv.get_shard(local_id);
|
||||
auto cs = ccv.get_shard(counter_id(local_id));
|
||||
if (!cs) {
|
||||
return; // continue
|
||||
}
|
||||
@@ -230,7 +232,7 @@ void transform_counter_updates_to_shards(mutation& m, const mutation* current_st
|
||||
auto delta = acv.counter_update_value();
|
||||
|
||||
if (shards.empty() || shards.front().first > id) {
|
||||
auto cs = counter_shard(local_id, delta, clock_offset + 1);
|
||||
auto cs = counter_shard(counter_id(local_id), delta, clock_offset + 1);
|
||||
ac_o_c = counter_cell_builder::from_single_shard(acv.timestamp(), cs);
|
||||
} else {
|
||||
auto& cs = shards.front().second;
|
||||
|
||||
@@ -370,7 +370,7 @@ struct counter_cell_mutable_view : basic_counter_cell_view<mutable_view::yes> {
|
||||
// Transforms mutation dst from counter updates to counter shards using state
|
||||
// stored in current_state.
|
||||
// If current_state is present it has to be in the same schema as dst.
|
||||
void transform_counter_updates_to_shards(mutation& dst, const mutation* current_state, uint64_t clock_offset, counter_id local_id);
|
||||
void transform_counter_updates_to_shards(mutation& dst, const mutation* current_state, uint64_t clock_offset, locator::host_id local_id);
|
||||
|
||||
template<>
|
||||
struct appending_hash<counter_shard_view> {
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:54662978b9ce4a6e25790b1b0a5099e6063173ffa95a399a6287cf474376ed09
|
||||
size 6595952
|
||||
oid sha256:e59fe56eac435fd03c2f0d7dfc11c6998d7c0750e1851535575497dd13d96015
|
||||
size 6505524
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:0cf44ea1fb2ae20de45d26fe8095054e60cb8700cddcb2fd79ef79705484b18a
|
||||
size 6603780
|
||||
oid sha256:d424ce6cc7f65338c34dd35881d23f5ad3425651d66e47dc2c3a20dc798848d4
|
||||
size 6598648
|
||||
|
||||
raft/fsm.cc
@@ -1098,8 +1098,7 @@ std::optional<std::pair<read_id, index_t>> fsm::start_read_barrier(server_id req
|
||||
|
||||
// Make sure that only a leader or a node that is part of the config can request read barrier
|
||||
// Nodes outside of the config may never get the data, so they will not be able to read it.
|
||||
follower_progress* opt_progress = leader_state().tracker.find(requester);
|
||||
if (requester != _my_id && opt_progress == nullptr) {
|
||||
if (requester != _my_id && leader_state().tracker.find(requester) == nullptr) {
|
||||
throw std::runtime_error(fmt::format("Read barrier requested by a node outside of the configuration {}", requester));
|
||||
}
|
||||
|
||||
@@ -1110,23 +1109,19 @@ std::optional<std::pair<read_id, index_t>> fsm::start_read_barrier(server_id req
|
||||
return {};
|
||||
}
|
||||
|
||||
// Optimization for read barriers requested on non-voters. A non-voter doesn't receive the read_quorum message, so
|
||||
// it might update its commit index only after another leader tick, which would slow down wait_for_apply() at the
|
||||
// end of the read barrier. Prevent that by replicating to the non-voting requester here.
|
||||
if (requester != _my_id && opt_progress->commit_idx < _commit_idx && opt_progress->match_idx == _log.last_idx()
|
||||
&& !opt_progress->can_vote) {
|
||||
logger.trace("start_read_barrier[{}]: replicate to {} because follower commit_idx={} < commit_idx={}, "
|
||||
"follower match_idx={} == last_idx={}, and follower can_vote={}",
|
||||
_my_id, requester, opt_progress->commit_idx, _commit_idx, opt_progress->match_idx,
|
||||
_log.last_idx(), opt_progress->can_vote);
|
||||
replicate_to(*opt_progress, true);
|
||||
}
|
||||
|
||||
read_id id = next_read_id();
|
||||
logger.trace("start_read_barrier[{}] starting read barrier with id {}", _my_id, id);
|
||||
return std::make_pair(id, _commit_idx);
|
||||
}
|
||||
|
||||
void fsm::maybe_update_commit_idx_for_read(index_t read_idx) {
|
||||
// read_idx from the leader might not be replicated to the local node yet.
|
||||
const bool in_local_log = read_idx <= _log.last_idx();
|
||||
if (in_local_log && log_term_for(read_idx) == get_current_term()) {
|
||||
advance_commit_idx(read_idx);
|
||||
}
|
||||
}
|
||||
|
||||
void fsm::stop() {
|
||||
if (is_leader()) {
|
||||
// Become follower to stop accepting requests
|
||||
|
||||
@@ -480,6 +480,15 @@ public:
|
||||
|
||||
std::optional<std::pair<read_id, index_t>> start_read_barrier(server_id requester);
|
||||
|
||||
// Update the commit index to the read index (a read barrier result from the leader) if the local entry with the
|
||||
// read index belongs to the current term.
|
||||
//
|
||||
// Satisfying the condition above guarantees that the local log matches the current leader's log up to the read
|
||||
// index (the Log Matching Property), so the current leader won't drop the local entry with the read index.
|
||||
// Moreover, this entry has been committed by the leader, so future leaders also won't drop it (the Leader
|
||||
// Completeness Property). Hence, updating the commit index is safe.
|
||||
void maybe_update_commit_idx_for_read(index_t read_idx);
|
||||
|
||||
size_t in_memory_log_size() const {
|
||||
return _log.in_memory_size();
|
||||
}
|
||||
|
||||
@@ -1571,6 +1571,7 @@ future<> server_impl::read_barrier(seastar::abort_source* as) {
|
||||
co_return stop_iteration::no;
|
||||
}
|
||||
read_idx = std::get<index_t>(res);
|
||||
_fsm->maybe_update_commit_idx_for_read(read_idx);
|
||||
co_return stop_iteration::yes;
|
||||
});

@@ -113,6 +113,8 @@ void tracker::set_configuration(const configuration& configuration, index_t next
}
auto newp = this->progress::find(s.addr.id);
if (newp != this->progress::end()) {
// Processing joint configuration and already added
// an entry for this id.
continue;
}
auto oldp = old_progress.find(s.addr.id);
@@ -121,7 +123,7 @@ void tracker::set_configuration(const configuration& configuration, index_t next
} else {
newp = this->progress::emplace(s.addr.id, follower_progress{s.addr.id, next_idx}).first;
}
newp->second.can_vote = configuration.can_vote(s.addr.id);
newp->second.can_vote = s.can_vote;
}
};
emplace_simple_config(configuration.current, _current_voters);

@@ -137,9 +137,6 @@ public:
// unless the reader is fast-forwarded to a new range.
bool _end_of_stream = false;

// Set by fill buffer for segregating output by partition range.
std::optional<dht::partition_range> _next_range;

schema_ptr _schema;
reader_permit _permit;
friend class mutation_reader;

@@ -46,9 +46,7 @@ private:
const dht::sharder& remote_sharder,
unsigned remote_shard,
gc_clock::time_point compaction_time,
incremental_repair_meta inc,
uint64_t multishard_reader_buffer_hint_size,
bool multishard_reader_enable_read_ahead);
incremental_repair_meta inc);

public:
repair_reader(
@@ -62,9 +60,7 @@ public:
uint64_t seed,
read_strategy strategy,
gc_clock::time_point compaction_time,
incremental_repair_meta inc,
uint64_t multishard_reader_buffer_hint_size,
bool multishard_reader_enable_read_ahead);
incremental_repair_meta inc);

future<mutation_fragment_opt>
read_mutation_fragment();

@@ -6,6 +6,7 @@
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/

#include "db/config.hh"
#include "repair.hh"
#include "gms/gossip_address_map.hh"
#include "locator/abstract_replication_strategy.hh"
@@ -136,8 +137,9 @@ std::string_view format_as(row_level_diff_detect_algorithm algo) {
return "unknown";
}

bool should_enable_small_table_optimization_for_rbno(bool enable_small_table_optimization_for_rbno, sstring keyspace, streaming::stream_reason reason) {
bool should_enable_small_table_optimization_for_rbno(const replica::database& db, sstring keyspace, streaming::stream_reason reason) {
bool small_table_optimization = false;
auto enable_small_table_optimization_for_rbno = db.get_config().enable_small_table_optimization_for_rbno();
if (enable_small_table_optimization_for_rbno) {
static const std::unordered_set<sstring> small_table_optimization_enabled_ks = {
"system_distributed",
@@ -1505,7 +1507,7 @@ future<> repair::data_sync_repair_task_impl::run() {
auto id = get_repair_uniq_id();

size_t ranges_reduced_factor = 1;
bool small_table_optimization = should_enable_small_table_optimization_for_rbno(rs.get_config().enable_small_table_optimization_for_rbno(), keyspace, _reason);
bool small_table_optimization = should_enable_small_table_optimization_for_rbno(db, keyspace, _reason);
if (small_table_optimization) {
auto range = dht::token_range(dht::token_range::bound(dht::minimum_token(), false), dht::token_range::bound(dht::maximum_token(), false));
ranges_reduced_factor = _ranges.size();
@@ -1599,7 +1601,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
continue;
}
auto nr_tables = get_nr_tables(db, keyspace_name);
bool small_table_optimization = should_enable_small_table_optimization_for_rbno(_config.enable_small_table_optimization_for_rbno(), keyspace_name, reason);
bool small_table_optimization = should_enable_small_table_optimization_for_rbno(db, keyspace_name, reason);
if (small_table_optimization) {
nr_ranges_total += 1 * nr_tables;
continue;
@@ -1619,7 +1621,7 @@ future<> repair_service::bootstrap_with_repair(locator::token_metadata_ptr tmptr
rlogger.info("bootstrap_with_repair: keyspace={} does not exist any more, ignoring it", keyspace_name);
continue;
}
bool small_table_optimization = should_enable_small_table_optimization_for_rbno(_config.enable_small_table_optimization_for_rbno(), keyspace_name, reason);
bool small_table_optimization = should_enable_small_table_optimization_for_rbno(db, keyspace_name, reason);
dht::token_range_vector desired_ranges;
//Collects the source that will have its range moved to the new node
std::unordered_map<dht::token_range, repair_neighbors> range_sources;

@@ -47,6 +47,7 @@
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/all.hh>
#include <seastar/coroutine/as_future.hh>
#include "db/config.hh"
#include "db/system_keyspace.hh"
#include "service/storage_proxy.hh"
#include "db/batchlog_manager.hh"
@@ -286,9 +287,7 @@ mutation_reader repair_reader::make_reader(
const dht::sharder& remote_sharder,
unsigned remote_shard,
gc_clock::time_point compaction_time,
incremental_repair_meta inc,
uint64_t multishard_reader_buffer_hint_size,
bool multishard_reader_enable_read_ahead) {
incremental_repair_meta inc) {
switch (strategy) {
case read_strategy::local: {
auto ms = mutation_source([&cf, compaction_time] (
@@ -314,11 +313,12 @@ mutation_reader repair_reader::make_reader(
}
case read_strategy::multishard_split: {
std::optional<size_t> multishard_reader_buffer_size;
if (multishard_reader_buffer_hint_size) {
const auto& dbconfig = db.local().get_config();
if (dbconfig.repair_multishard_reader_buffer_hint_size()) {
// Setting the repair buffer size as the multishard reader's buffer
// size helps avoid extra cross-shard round-trips and possible
// evict-recreate cycles.
multishard_reader_buffer_size = multishard_reader_buffer_hint_size;
multishard_reader_buffer_size = dbconfig.repair_multishard_reader_buffer_hint_size();
}
return make_multishard_streaming_reader(db, _schema, _permit, [this] {
auto shard_range = _sharder.next();
@@ -326,7 +326,7 @@ mutation_reader repair_reader::make_reader(
return std::optional<dht::partition_range>(dht::to_partition_range(*shard_range));
}
return std::optional<dht::partition_range>();
}, compaction_time, multishard_reader_buffer_size, read_ahead(multishard_reader_enable_read_ahead));
}, compaction_time, multishard_reader_buffer_size, read_ahead(dbconfig.repair_multishard_reader_enable_read_ahead()));
}
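In the multishard_split branch above, the buffer hint is only honoured when non-zero, and read-ahead is a separate knob. A hedged sketch of the same plumbing in isolation (cfg stands for the repair configuration and next_range_fn for the lambda shown in the hunk; both names are illustrative):

// Illustrative only: forwards the two knobs shown in the hunk above.
std::optional<size_t> buffer_size;
if (auto hint = cfg.repair_multishard_reader_buffer_hint_size()) {
    // A larger multishard buffer amortizes cross-shard round-trips and
    // avoids evict/recreate cycles of the per-shard readers.
    buffer_size = hint;
}
auto reader = make_multishard_streaming_reader(db, _schema, _permit, next_range_fn,
    compaction_time, buffer_size, read_ahead(cfg.repair_multishard_reader_enable_read_ahead()));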
case read_strategy::multishard_filter: {
return make_filtering_reader(make_multishard_streaming_reader(db, _schema, _permit, _range, compaction_time, {}, read_ahead::yes),
@@ -354,17 +354,14 @@ repair_reader::repair_reader(
uint64_t seed,
read_strategy strategy,
gc_clock::time_point compaction_time,
incremental_repair_meta inc,
uint64_t multishard_reader_buffer_hint_size,
bool multishard_reader_enable_read_ahead)
incremental_repair_meta inc)
: _schema(s)
, _permit(std::move(permit))
, _range(dht::to_partition_range(range))
, _sharder(remote_sharder, range, remote_shard)
, _seed(seed)
, _local_read_op(strategy == read_strategy::local ? std::optional(cf.read_in_progress()) : std::nullopt)
, _reader(make_reader(db, cf, strategy, remote_sharder, remote_shard, compaction_time, inc,
multishard_reader_buffer_hint_size, multishard_reader_enable_read_ahead))
, _reader(make_reader(db, cf, strategy, remote_sharder, remote_shard, compaction_time, inc))
{ }

future<mutation_fragment_opt>
@@ -1324,9 +1321,7 @@ private:
return read_strategy;
}),
_compaction_time,
_incremental_repair_meta,
_rs.get_config().repair_multishard_reader_buffer_hint_size(),
bool(_rs.get_config().repair_multishard_reader_enable_read_ahead()));
_incremental_repair_meta);
}
try {
while (cur_size < _max_row_buf_size) {
@@ -2182,13 +2177,8 @@ public:
auto& cm = table.get_compaction_manager();
int64_t repaired_at = _incremental_repair_meta.sstables_repaired_at + 1;

// Keep the new sstables marked as being_repaired until repair_update_compaction_ctrl
// is called (after sstables_repaired_at is committed to Raft). This is an additional
// in-memory guard; the classifier itself also protects these sstables via the
// repaired_at > sstables_repaired_at check.
auto modifier = [repaired_at, session = _frozen_topology_guard] (sstables::sstable& new_sst) {
auto modifier = [repaired_at] (sstables::sstable& new_sst) {
new_sst.update_repaired_at(repaired_at);
new_sst.mark_as_being_repaired(session);
};

std::unordered_map<compaction::compaction_group_view*, std::vector<sstables::shared_sstable>> sstables_by_group;
@@ -2635,7 +2625,7 @@ future<repair_flush_hints_batchlog_response> repair_service::repair_flush_hints_
auto permit = co_await seastar::get_units(_flush_hints_batchlog_sem, 1);
bool updated = false;
auto now = gc_clock::now();
auto cache_time = std::chrono::milliseconds(_config.repair_hints_batchlog_flush_cache_time_in_ms());
auto cache_time = std::chrono::milliseconds(get_db().local().get_config().repair_hints_batchlog_flush_cache_time_in_ms());
auto cache_disabled = cache_time == std::chrono::milliseconds(0);
auto flush_time = now;
db::all_batches_replayed all_replayed = db::all_batches_replayed::yes;
@@ -3505,7 +3495,7 @@ public:
// To save memory and have less different conditions, we
// use the estimation for RBNO repair as well.

_estimated_partitions *= _shard_task.rs.get_config().repair_partition_count_estimation_ratio();
_estimated_partitions *= _shard_task.db.local().get_config().repair_partition_count_estimation_ratio();
}

parallel_for_each(master.all_nodes(), coroutine::lambda([&] (repair_node_state& ns) -> future<> {
@@ -3641,8 +3631,7 @@ repair_service::repair_service(sharded<service::topology_state_machine>& tsm,
sharded<db::view::view_building_worker>& vbw,
tasks::task_manager& tm,
service::migration_manager& mm,
size_t max_repair_memory,
config cfg)
size_t max_repair_memory)
: _tsm(tsm)
, _gossiper(gossiper)
, _messaging(ms)
@@ -3657,7 +3646,6 @@ repair_service::repair_service(sharded<service::topology_state_machine>& tsm,
, _node_ops_metrics(_repair_module)
, _max_repair_memory(max_repair_memory)
, _memory_sem(max_repair_memory)
, _config(std::move(cfg))
{
tm.register_module("repair", _repair_module);
if (this_shard_id() == 0) {
@@ -3668,7 +3656,7 @@ repair_service::repair_service(sharded<service::topology_state_machine>& tsm,

future<> repair_service::start(utils::disk_space_monitor* dsm) {
if (dsm && (this_shard_id() == 0)) {
_out_of_space_subscription = dsm->subscribe(_config.critical_disk_utilization_level, [this] (auto threshold_reached) {
_out_of_space_subscription = dsm->subscribe(_db.local().get_config().critical_disk_utilization_level, [this] (auto threshold_reached) {
if (threshold_reached) {
return container().invoke_on_all([] (repair_service& rs) { return rs.drain(); });
}

@@ -109,17 +109,6 @@ struct repair_task_progress {
};

class repair_service : public seastar::peering_sharded_service<repair_service> {
public:
struct config {
utils::updateable_value<bool> enable_small_table_optimization_for_rbno = utils::updateable_value<bool>(true);
utils::updateable_value<uint32_t> repair_hints_batchlog_flush_cache_time_in_ms = utils::updateable_value<uint32_t>(60*1000);
utils::updateable_value<double> repair_partition_count_estimation_ratio = utils::updateable_value<double>(0.1);
utils::updateable_value<float> critical_disk_utilization_level = utils::updateable_value<float>(0.98);
utils::updateable_value<uint64_t> repair_multishard_reader_buffer_hint_size = utils::updateable_value<uint64_t>(1024 * 1024);
utils::updateable_value<uint64_t> repair_multishard_reader_enable_read_ahead = utils::updateable_value<uint64_t>(0);
};
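Each member above is a utils::updateable_value and is read through operator() at the call sites shown in the earlier hunks (for example _config.repair_hints_batchlog_flush_cache_time_in_ms()). A hedged sketch of overriding a couple of knobs when constructing the struct, for instance in a test (values are illustrative, not recommendations):

// Illustrative only.
repair_service::config cfg{
    .enable_small_table_optimization_for_rbno = utils::updateable_value<bool>(false),
    .repair_multishard_reader_buffer_hint_size = utils::updateable_value<uint64_t>(4 * 1024 * 1024),
};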

private:
sharded<service::topology_state_machine>& _tsm;
sharded<gms::gossiper>& _gossiper;
netw::messaging_service& _messaging;
@@ -173,9 +162,6 @@ private:
sstring keyspace, std::vector<sstring> cfs,
std::unordered_set<locator::host_id> ignore_nodes);

config _config;
static config default_config() { return {}; }

public:
std::unordered_map<locator::global_tablet_id, std::vector<seastar::rwlock::holder>> _repair_compaction_locks;

@@ -191,15 +177,12 @@ public:
sharded<db::view::view_building_worker>& vbw,
tasks::task_manager& tm,
service::migration_manager& mm,
size_t max_repair_memory,
repair_service::config cfg = default_config()
size_t max_repair_memory
);
~repair_service();
future<> start(utils::disk_space_monitor* dsm);
future<> stop();

const config& get_config() const noexcept { return _config; }

// shutdown() stops all ongoing repairs started on this node (and
// prevents any further repairs from being started). It returns a future
// saying when all repairs have stopped, and attempts to stop them as

@@ -97,8 +97,6 @@ class compaction_group {
std::optional<compaction::compaction_backlog_tracker> _backlog_tracker;
repair_classifier_func _repair_sstable_classifier;

counter_id _counter_id;

lw_shared_ptr<logstor::segment_set> _logstor_segments;
std::optional<logstor::separator_buffer> _logstor_separator;
std::vector<future<>> _separator_flushes;
@@ -193,14 +191,6 @@ public:

future<> update_repaired_at_for_merge();

void set_counter_id(counter_id cid) noexcept {
_counter_id = cid;
}

counter_id get_counter_id() const noexcept {
return _counter_id;
}

void set_compaction_strategy_state(compaction::compaction_strategy_state compaction_strategy_state) noexcept;

lw_shared_ptr<memtable_list>& memtables() noexcept;

@@ -635,10 +635,8 @@ database::setup_metrics() {
sm::description("Counts sstables that survived the clustering key filtering. "
"High value indicates that bloom filter is not very efficient and still have to access a lot of sstables to get data.")),

// NOTE: dropped_view_updates is registered as a metric but never incremented in the current
// codebase. Consider removing it entirely if it is confirmed dead.
sm::make_counter("dropped_view_updates", _cf_stats.dropped_view_updates,
sm::description("Counts the number of view updates that have been dropped due to cluster overload. "))(basic_level).set_skip_when_empty(),
sm::description("Counts the number of view updates that have been dropped due to cluster overload. "))(basic_level),

sm::make_counter("view_building_paused", _cf_stats.view_building_paused,
sm::description("Counts the number of times view building process was paused (e.g. due to node unavailability). ")),
@@ -657,7 +655,7 @@ database::setup_metrics() {
sm::description("Counts write operations which were rejected on the replica side because the per-partition limit was reached."))(basic_level),

sm::make_counter("total_writes_rejected_due_to_out_of_space_prevention", _stats->total_writes_rejected_due_to_out_of_space_prevention,
sm::description("Counts write operations which were rejected due to disabled user tables writes."))(basic_level).set_skip_when_empty(),
sm::description("Counts write operations which were rejected due to disabled user tables writes."))(basic_level),

sm::make_counter("total_reads_rate_limited", _stats->total_reads_rate_limited,
sm::description("Counts read operations which were rejected on the replica side because the per-partition limit was reached.")),
@@ -706,13 +704,11 @@ database::setup_metrics() {
sm::make_counter("multishard_query_unpopped_bytes", _stats->multishard_query_unpopped_bytes,
sm::description("The total number of bytes that were extracted from the shard reader but were unconsumed by the query and moved back into the reader.")),

// NOTE: multishard_query_failed_reader_stops appears to have no increment site in the
// current codebase. Consider removing it entirely if it is confirmed dead.
sm::make_counter("multishard_query_failed_reader_stops", _stats->multishard_query_failed_reader_stops,
sm::description("The number of times the stopping of a shard reader failed.")).set_skip_when_empty(),
sm::description("The number of times the stopping of a shard reader failed.")),

sm::make_counter("multishard_query_failed_reader_saves", _stats->multishard_query_failed_reader_saves,
sm::description("The number of times the saving of a shard reader failed.")).set_skip_when_empty(),
sm::description("The number of times the saving of a shard reader failed.")),

sm::make_total_operations("counter_cell_lock_acquisition", _cl_stats->lock_acquisitions,
sm::description("The number of acquired counter cell locks.")),
@@ -815,7 +811,7 @@ bool database::is_in_critical_disk_utilization_mode() const {
return false;
}

future<> database::parse_system_tables(sharded<service::storage_proxy>& proxy, sharded<db::system_keyspace>& sys_ks, std::optional<service::intended_storage_mode> storage_mode) {
future<> database::parse_system_tables(sharded<service::storage_proxy>& proxy, sharded<db::system_keyspace>& sys_ks) {
using namespace db::schema_tables;
co_await do_parse_schema_tables(proxy, db::schema_tables::KEYSPACES, coroutine::lambda([&] (schema_result_value_type &v) -> future<> {
auto scylla_specific_rs = co_await extract_scylla_specific_keyspace_info(proxy, v);
@@ -853,7 +849,7 @@ future<> database::parse_system_tables(sharded<service::storage_proxy>& proxy, s
co_await do_parse_schema_tables(proxy, db::schema_tables::TABLES, coroutine::lambda([&] (schema_result_value_type &v) -> future<> {
std::map<sstring, schema_ptr> tables = co_await create_tables_from_tables_partition(proxy, v.second);
co_await coroutine::parallel_for_each(tables, [&] (auto& t) -> future<> {
co_await this->add_column_family_and_make_directory(t.second, replica::database::is_new_cf::no, storage_mode);
co_await this->add_column_family_and_make_directory(t.second, replica::database::is_new_cf::no);
auto s = t.second;
// Recreate missing column mapping entries in case
// we failed to persist them for some reason after a schema change
@@ -868,7 +864,7 @@ future<> database::parse_system_tables(sharded<service::storage_proxy>& proxy, s
std::vector<view_ptr> views = co_await create_views_from_schema_partition(proxy, v.second);
co_await coroutine::parallel_for_each(views, [&] (auto&& v) -> future<> {
check_no_legacy_secondary_index_mv_schema(*this, v, nullptr);
co_await this->add_column_family_and_make_directory(v, replica::database::is_new_cf::no, storage_mode);
co_await this->add_column_family_and_make_directory(v, replica::database::is_new_cf::no);
});
}));
}
@@ -1160,7 +1156,7 @@ db::commitlog* database::commitlog_for(const schema_ptr& schema) {
: _commitlog.get();
}

void database::add_column_family(keyspace& ks, schema_ptr schema, column_family::config cfg, is_new_cf is_new, locator::token_metadata_ptr not_commited_new_metadata, std::optional<service::intended_storage_mode> storage_mode) {
void database::add_column_family(keyspace& ks, schema_ptr schema, column_family::config cfg, is_new_cf is_new, locator::token_metadata_ptr not_commited_new_metadata) {
schema = local_schema_registry().learn(schema);
auto&& rs = ks.get_replication_strategy();
locator::effective_replication_map_ptr erm;
@@ -1172,21 +1168,7 @@ void database::add_column_family(keyspace& ks, schema_ptr schema, column_family:
}
erm = pt_rs->make_replication_map(schema->id(), metadata_ptr);
} else {
auto metadata_ptr = not_commited_new_metadata ? not_commited_new_metadata : _shared_token_metadata.get();
auto table_is_migrating = metadata_ptr->tablets().has_tablet_map(schema->id());
if (table_is_migrating && storage_mode.has_value() && storage_mode.value() == service::intended_storage_mode::tablets) {
// Table under vnode-to-tablet migration: the keyspace uses vnode-based
// replication but this table already has a tablet map persisted in group0.
// Build a tablet-aware RS with the same replication options so the table
// gets a tablet ERM (and thus a tablet_storage_group_manager).
locator::replication_strategy_params params(rs.get_config_options(), 0, std::nullopt);
auto tablet_rs = locator::abstract_replication_strategy::create_replication_strategy(
ks.metadata()->strategy_name(), params, metadata_ptr->get_topology());
auto pt_rs = tablet_rs->maybe_as_per_table();
erm = pt_rs->make_replication_map(schema->id(), metadata_ptr);
} else {
erm = ks.get_static_effective_replication_map();
}
erm = ks.get_static_effective_replication_map();
}
// avoid self-reporting
auto& sst_manager = get_sstables_manager(*schema);
@@ -1229,12 +1211,12 @@ future<> database::make_column_family_directory(schema_ptr schema) {
co_await cf.init_storage();
}

future<> database::add_column_family_and_make_directory(schema_ptr schema, is_new_cf is_new, std::optional<service::intended_storage_mode> storage_mode) {
future<> database::add_column_family_and_make_directory(schema_ptr schema, is_new_cf is_new) {
auto lock = co_await get_tables_metadata().hold_write_lock();
auto& ks = find_keyspace(schema->ks_name());
std::exception_ptr ex;
try {
add_column_family(ks, schema, ks.make_column_family_config(*schema, *this), is_new, nullptr, storage_mode);
add_column_family(ks, schema, ks.make_column_family_config(*schema, *this), is_new);
} catch (...) {
ex = std::current_exception();
}
@@ -2048,12 +2030,10 @@ future<mutation> database::read_and_transform_counter_mutation_to_shards(mutatio
co_await seastar::sleep(std::chrono::milliseconds(100));
}

counter_id my_counter_id = cf.get_counter_id(m);

// ...now, that we got existing state of all affected counter
// cells we can look for our shard in each of them, increment
// its clock and apply the delta.
transform_counter_updates_to_shards(m, mopt ? &*mopt : nullptr, cf.failed_counter_applies_to_memtable(), my_counter_id);
transform_counter_updates_to_shards(m, mopt ? &*mopt : nullptr, cf.failed_counter_applies_to_memtable(), get_token_metadata().get_my_id());

co_return std::move(m);
}

@@ -700,10 +700,6 @@ public:
future<> maybe_split_compaction_group_of(locator::tablet_id);

dht::token_range get_token_range_after_split(const dht::token&) const noexcept;

// Returns a counter_id for use in local counter updates.
counter_id get_counter_id(const mutation&) const;

private:
// If SSTable doesn't need split, the same input SSTable is returned as output.
// If SSTable needs split, then output SSTables are returned and the input SSTable is deleted.
@@ -728,9 +724,6 @@ private:

std::unique_ptr<storage_group_manager> make_storage_group_manager();
compaction_group* get_compaction_group(size_t id) const;
public:
compaction_group* get_any_compaction_group() const;
private:
// NOTE: all readers must only operate on storage groups, which can provide all data belonging to
// a given tablet replica. Interfaces below should only be used in the context of writes, for
// example, to append data to memtable. Iterating on compaction groups is susceptible to races
@@ -1829,7 +1822,7 @@ public:

// Load the schema definitions kept in schema tables from disk and initialize in-memory schema data structures
// (keyspace/table definitions, column mappings etc.)
future<> parse_system_tables(sharded<service::storage_proxy>&, sharded<db::system_keyspace>&, std::optional<service::intended_storage_mode> storage_mode = std::nullopt);
future<> parse_system_tables(sharded<service::storage_proxy>&, sharded<db::system_keyspace>&);

database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::shared_token_metadata& stm,
compaction::compaction_manager& cm, sstables::storage_manager& sstm, lang::manager& langm, sstables::directory_semaphore& sst_dir_sem, sstable_compressor_factory&,
@@ -1887,9 +1880,9 @@ public:
void init_schema_commitlog();

using is_new_cf = bool_class<struct is_new_cf_tag>;
void add_column_family(keyspace& ks, schema_ptr schema, column_family::config cfg, is_new_cf is_new, locator::token_metadata_ptr not_commited_new_metadata = nullptr, std::optional<service::intended_storage_mode> storage_mode = std::nullopt);
void add_column_family(keyspace& ks, schema_ptr schema, column_family::config cfg, is_new_cf is_new, locator::token_metadata_ptr not_commited_new_metadata = nullptr);
future<> make_column_family_directory(schema_ptr schema);
future<> add_column_family_and_make_directory(schema_ptr schema, is_new_cf is_new, std::optional<service::intended_storage_mode> storage_mode = std::nullopt);
future<> add_column_family_and_make_directory(schema_ptr schema, is_new_cf is_new);

/* throws no_such_column_family if missing */

@@ -101,9 +101,9 @@ distributed_loader::lock_table(global_table_ptr& table, sharded<sstables::sstabl
// - The second part calls each shard's distributed object to reshard the SSTables they were
// assigned.
future<>
distributed_loader::reshard(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr, bool vnodes_resharding) {
distributed_loader::reshard(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr) {
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
auto task = co_await compaction_module.make_and_start_task<compaction::table_resharding_compaction_task_impl>({}, std::move(ks_name), std::move(table_name), dir, db, std::move(creator), std::move(owned_ranges_ptr), vnodes_resharding);
auto task = co_await compaction_module.make_and_start_task<compaction::table_resharding_compaction_task_impl>({}, std::move(ks_name), std::move(table_name), dir, db, std::move(creator), std::move(owned_ranges_ptr));
co_await task->done();
}

@@ -288,15 +288,13 @@ class table_populator {
global_table_ptr& _global_table;
std::vector<lw_shared_ptr<sharded<sstables::sstable_directory>>> _sstable_directories;
sstables::sstable_version_types _version_for_reshaping = sstables::oldest_writable_sstable_format;
bool _migrate_to_tablets = false;

public:
table_populator(global_table_ptr& ptr, sharded<replica::database>& db, sstring ks, sstring cf, bool migrate_to_tablets = false)
table_populator(global_table_ptr& ptr, sharded<replica::database>& db, sstring ks, sstring cf)
: _db(db)
, _ks(std::move(ks))
, _cf(std::move(cf))
, _global_table(ptr)
, _migrate_to_tablets(migrate_to_tablets)
{
}

@@ -402,20 +400,7 @@ sstables::shared_sstable make_sstable(replica::table& table, sstables::sstable_s

future<> table_populator::populate_subdir(sharded<sstables::sstable_directory>& directory) {
auto state = directory.local().state();
dblog.debug("Populating {}/{}/{} state={} resharding_mode={}", _ks, _cf, _global_table->get_storage_options(), state, _migrate_to_tablets ? "vnodes-to-tablets" : "normal");

compaction::owned_ranges_ptr owned_ranges_ptr = nullptr;
if (_migrate_to_tablets) {
// Build owned_ranges from the tablet map.
auto table_uuid = _global_table->schema()->id();
auto& tmap = _db.local().get_shared_token_metadata().get()->tablets().get_tablet_map(table_uuid);
dht::token_range_vector ranges;
ranges.reserve(tmap.tablet_count());
for (auto tid : tmap.tablet_ids()) {
ranges.push_back(tmap.get_token_range(tid));
}
owned_ranges_ptr = compaction::make_owned_ranges_ptr(std::move(ranges));
}
dblog.debug("Populating {}/{}/{} state={}", _ks, _cf, _global_table->get_storage_options(), state);

co_await distributed_loader::reshard(directory, _db, _ks, _cf, [this, state] (shard_id shard) mutable {
auto gen = smp::submit_to(shard, [this] () {
@@ -423,7 +408,7 @@ future<> table_populator::populate_subdir(sharded<sstables::sstable_directory>&
}).get();

return make_sstable(*_global_table, state, gen, _version_for_reshaping);
}, owned_ranges_ptr, _migrate_to_tablets);
});

// The node is offline at this point so we are very lenient with what we consider
// offstrategy.
@@ -467,12 +452,7 @@ future<> distributed_loader::populate_keyspace(sharded<replica::database>& db,

dblog.info("Keyspace {}: Reading CF {} id={} version={} storage={}", ks_name, cfname, uuid, s->version(), cf.get_storage_options());

bool migrating_to_tablets = cf.uses_tablets() && !ks.uses_tablets();
if (migrating_to_tablets) {
dblog.info("Keyspace {}: CF {} is in vnodes-to-tablets migration mode", ks_name, cfname);
}

auto metadata = table_populator(gtable, db, ks_name, cfname, migrating_to_tablets);
auto metadata = table_populator(gtable, db, ks_name, cfname);
std::exception_ptr ex;

try {
@@ -555,16 +535,8 @@ future<> distributed_loader::init_system_keyspace(sharded<db::system_keyspace>&
future<> distributed_loader::init_non_system_keyspaces(sharded<replica::database>& db,
sharded<service::storage_proxy>& proxy, sharded<db::system_keyspace>& sys_ks) {
return seastar::async([&db, &proxy, &sys_ks] {
// Load the node's intended storage mode from topology.
// This determines the ERM flavor and resharding direction for tables
// under vnodes-to-tablets migration.
auto topology = sys_ks.local().load_topology_state({}).get();
auto host_id = db.local().get_token_metadata().get_my_id();
auto node = topology.normal_nodes.find(raft::server_id{host_id.uuid()});
std::optional<service::intended_storage_mode> storage_mode = node != topology.normal_nodes.end() ? node->second.storage_mode : std::nullopt;

db.invoke_on_all([&proxy, &sys_ks, &storage_mode] (replica::database& db) {
return db.parse_system_tables(proxy, sys_ks, storage_mode);
db.invoke_on_all([&proxy, &sys_ks] (replica::database& db) {
return db.parse_system_tables(proxy, sys_ks);
}).get();

const auto& cfg = db.local().get_config();

@@ -70,7 +70,7 @@ class distributed_loader {
static future<> reshape(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, compaction::reshape_mode mode,
sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, std::function<bool (const sstables::shared_sstable&)> filter);
static future<> reshard(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator,
compaction::owned_ranges_ptr owned_ranges_ptr = nullptr, bool vnodes_resharding = false);
compaction::owned_ranges_ptr owned_ranges_ptr = nullptr);
static future<> process_sstable_dir(sharded<sstables::sstable_directory>& dir, sstables::sstable_directory::process_flags flags);
static future<> lock_table(global_table_ptr&, sharded<sstables::sstable_directory>& dir);
static future<size_t> make_sstables_available(sstables::sstable_directory& dir,

@@ -861,31 +861,14 @@ private:
return idx;
}

// Returns true if the sstable is currently being repaired. Checks the in-memory
// being_repaired flag first, then falls back to a durable check: if the sstable's
// repaired_at equals sstables_repaired_at+1 and the tablet is undergoing repair
// (i.e. tablet_transition_kind::repair), the sstable belongs to the current repair
// round but sstables_repaired_at+1 hasn't been committed to Raft yet (race window).
bool is_being_repaired(const sstables::shared_sstable& sst, int64_t sstables_repaired_at) const noexcept {
if (!sst->being_repaired.uuid().is_null()) {
return true;
}
auto repaired_at = sst->get_stats_metadata().repaired_at;
if (repaired_at != sstables_repaired_at + 1) {
return false;
}
auto& cg = compaction_group_for_sstable(sst);
auto trinfo = tablet_map().get_tablet_transition_info(locator::tablet_id(cg.group_id()));
return trinfo && trinfo->transition == locator::tablet_transition_kind::repair;
}
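A compact restatement of the predicate above, useful when reasoning about the race window it closes (sketch only; transition_kind_for is a stand-in for the compaction-group and tablet-map lookup shown in the hunk):

// being_repaired holds if the in-memory flag is set, or the sstable already
// carries the not-yet-committed repaired_at value while its tablet is mid-repair.
bool being_repaired =
    !sst->being_repaired.uuid().is_null()
    || (sst->get_stats_metadata().repaired_at == sstables_repaired_at + 1
        && transition_kind_for(sst) == locator::tablet_transition_kind::repair);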

repair_classifier_func make_repair_sstable_classifier_func() const {
return [this] (const sstables::shared_sstable& sst, int64_t sstables_repaired_at) {
// FIXME: implement it for incremental repair!
return [] (const sstables::shared_sstable& sst, int64_t sstables_repaired_at) {
bool is_repaired = repair::is_repaired(sstables_repaired_at, sst);
if (is_repaired) {
return repair_sstable_classification::repaired;
} else {
if (is_being_repaired(sst, sstables_repaired_at)) {
if (!sst->being_repaired.uuid().is_null()) {
return repair_sstable_classification::repairing;
} else {
return repair_sstable_classification::unrepaired;
@@ -1283,14 +1266,6 @@ dht::token_range table::get_token_range_after_split(const dht::token& token) con
return _sg_manager->get_token_range_after_split(token);
}

counter_id table::get_counter_id(const mutation& m) const {
if (uses_tablets()) {
return storage_group_for_token(m.token()).main_compaction_group()->get_counter_id();
} else {
return counter_id(_erm->get_token_metadata().get_my_id().uuid());
}
}

std::unique_ptr<storage_group_manager> table::make_storage_group_manager() {
std::unique_ptr<storage_group_manager> ret;
if (uses_tablets()) {
@@ -1305,14 +1280,6 @@ compaction_group* table::get_compaction_group(size_t id) const {
return storage_group_for_id(id).main_compaction_group().get();
}

compaction_group* table::get_any_compaction_group() const {
auto& groups = _sg_manager->storage_groups();
if (groups.empty()) {
return nullptr;
}
return groups.begin()->second->main_compaction_group().get();
}

storage_group& table::storage_group_for_token(dht::token token) const {
return _sg_manager->storage_group_for_token(token);
}
@@ -3425,7 +3392,7 @@ void tablet_storage_group_manager::handle_tablet_merge_completion(locator::effec

auto it = _storage_groups.find(group_id);
if (it == _storage_groups.end()) {
throw std::runtime_error(format("Unable to find sibling tablet of id {} for table {}", group_id, table_id));
throw std::runtime_error(format("Unable to find sibling tablet of id for table {}", group_id, table_id));
}
auto& sg = it->second;
sg->for_each_compaction_group([&new_sg, new_range, new_tid, group_id] (const compaction_group_ptr& cg) {
@@ -3507,22 +3474,10 @@ void tablet_storage_group_manager::update_effective_replication_map(
for_each_storage_group([&] (size_t group_id, storage_group& sg) {
const locator::tablet_id tid = static_cast<locator::tablet_id>(group_id);
const locator::tablet_info& tinfo = new_tablet_map->get_tablet_info(tid);
const bool is_pending_replica = !std::ranges::contains(tinfo.replicas, this_replica);
const bool tombstone_gc_enabled = !is_pending_replica;
const bool tombstone_gc_enabled = std::ranges::contains(tinfo.replicas, this_replica);

// construct a counter id for use in local counter updates.
// there is a single replica in a rack, so we can reuse a single counter id for all replicas
// in a rack. replicas in different racks use different counter ids.
// during migration there are two active counter replicas in a rack, then the pending
// replica uses a variation of the rack's counter id, so there are at most two distinct
// counter ids per rack.
auto rack_uuid = erm.get_topology().get_rack_uuid();
auto my_counter_uuid = is_pending_replica ? utils::UUID_gen::negate(rack_uuid) : rack_uuid;
counter_id my_counter_id(my_counter_uuid);

sg.for_each_compaction_group([tombstone_gc_enabled, my_counter_id] (const compaction_group_ptr& cg_ptr) {
sg.for_each_compaction_group([tombstone_gc_enabled] (const compaction_group_ptr& cg_ptr) {
cg_ptr->set_tombstone_gc_enabled(tombstone_gc_enabled);
cg_ptr->set_counter_id(my_counter_id);
});
});
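The counter-id derivation above keys off the rack UUID, negating it for a pending replica so the two replicas concurrently active in a rack during migration never share an id. The same derivation in isolation (sketch only; assumes utils::UUID_gen::negate is a deterministic transform of the UUID, as the hunk uses it):

// Sketch: at most two distinct counter ids per rack during tablet migration.
counter_id counter_id_for_replica(utils::UUID rack_uuid, bool is_pending_replica) {
    return counter_id(is_pending_replica ? utils::UUID_gen::negate(rack_uuid) : rack_uuid);
}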

@@ -30,9 +30,6 @@ class caching_options {
friend class schema;
caching_options();
public:
// do not used schema.cdc_options().enabled(), use cdc::cdc_enabled(schema)
// instead. This is because cdc_enabled() also checks for CDC being enabled
// by a vector index.
bool enabled() const {
return _enabled;
}