Compare commits

...

15 Commits

Author SHA1 Message Date
Yaniv Michael Kaul
339f1ae1a0 service, message: move session_id to a lightweight header
Move service::session_id and default_session_id into
service/session_id.hh so high-fanout headers can depend on the ID type
without including the full session machinery.

Also add direct advanced_rpc_compressor.hh includes at the remaining users
that need the full walltime_compressor_tracker type.
2026-04-20 13:32:39 +03:00
Yaniv Michael Kaul
07d69aa8fa message, db: extract RPC compression config types to a lightweight header
Move the config-facing RPC compression and dict-training types into
message/rpc_compression_types.hh so db/config.hh and the compression
protocol declarations do not need the full compressor and trainer
implementation headers.

Update dictionary_service to use the same top-level dict_training_when type
and add the direct <filesystem> include now required by db/config.hh.
2026-04-20 13:32:39 +03:00
Yaniv Michael Kaul
c50bfb995b pch: fix template and direct-include fallout in non-PCH users
Adjust follow-on fallout from the broader PCH changes:
- remove the redundant scheduling_group std::equal_to specialization from
  lang/wasm_instance_cache.cc
- exclude small test link sets from PCH use in configure.py
- add direct includes at files that no longer get needed declarations
  transitively
2026-04-20 13:32:38 +03:00
Yaniv Michael Kaul
e7dbccbdcd replica/logstor: declare hist_key specialization before use
Declare the hist_key<segment_descriptor> specialization in compaction.hh
before templates in that header can instantiate the primary template.

This avoids conflicting primary-template instantiations when the header is
seen through the PCH.
2026-04-20 13:32:38 +03:00
Yaniv Michael Kaul
faa2f8ba76 utils, db: qualify seastar::coroutine references
Qualify coroutine references with seastar:: in utils and db code so they
cannot resolve to utils::coroutine once that header becomes more broadly
visible through the PCH.
2026-04-20 13:32:38 +03:00
Yaniv Michael Kaul
7aca42aa31 pch: extend stdafx.hh with additional high-fanout Scylla headers
Add another batch of frequently included Scylla headers to stdafx.hh,
including token metadata, gossiper, config, query processor,
storage_proxy, storage_service, and result_message.
2026-04-20 13:32:29 +03:00
Yaniv Michael Kaul
92e0597807 pch: precompile common schema and mutation headers
Add the core schema, type, token, and mutation headers that are used by a
large fraction of translation units to stdafx.hh.

Also add the local include directories required for the dedicated PCH
library target and qualify the GCP object-storage stream parameter for the
broader PCH environment.
2026-04-20 13:14:41 +03:00
Yaniv Michael Kaul
0798c112d0 raft, service, locator: add raft_fwd.hh and trim heavy includes
Introduce raft/raft_fwd.hh for the lightweight raft ID and term types and
use it in headers that do not need raft/raft.hh.

Also remove a few other heavy transitive includes from storage_service.hh
and locator/tablets.hh and add direct includes in source files that still
need the full definitions.
2026-04-20 12:46:37 +03:00
Yaniv Michael Kaul
9650390482 db: extract loaded_endpoint_state into a lightweight header
Move loaded_endpoint_state from gossiper.hh to
gms/loaded_endpoint_state.hh so db/system_keyspace.hh no longer needs the
full gossiper header.

Add direct gossiper includes at the remaining users that still need it.
2026-04-20 12:46:37 +03:00
Yaniv Michael Kaul
a1e8ef8d6e service: drop unused storage_service.hh from storage_proxy.hh
storage_proxy.hh does not use declarations from storage_service.hh, so
remove that include.
2026-04-20 12:46:37 +03:00
Yaniv Michael Kaul
ea00cfad3d db: extract replication_strategy_type into a lightweight header
Move replication_strategy_type from
locator/abstract_replication_strategy.hh to
locator/replication_strategy_type.hh so db/config.hh can use it without
including the full replication strategy header.
2026-04-20 12:46:37 +03:00
Yaniv Michael Kaul
0fd89d77b3 pch: drop seastar/http/api_docs.hh from stdafx.hh
Remove seastar/http/api_docs.hh from the precompiled header and add the
direct utils/exceptions.hh include that utils/s3/client.cc needs once that
header is no longer pulled in transitively.
2026-04-20 12:46:37 +03:00
Yaniv Michael Kaul
361a717d89 cql3: trim query_processor header fanout
Reduce direct dependencies around query_processor without changing its
cache layout.

Forward-declare vector_store_client and query_processor users that only
need declarations, keep the prepared-statement caches by value, and use
prepared_statement::checked_weak_ptr directly in transport/server.cc.
2026-04-20 12:46:37 +03:00
Yaniv Michael Kaul
9df4fc3e2f cql3: move prepared_cache_key_type to a lightweight header
Extract prepared_cache_key_type and its hash and fmt specializations from
prepared_statements_cache.hh into cql3/prepared_cache_key_type.hh.

This keeps the cache key type available without pulling in the loading
cache implementation header.
2026-04-20 12:45:56 +03:00
Yaniv Michael Kaul
d1b4fd5683 cql3, service, test: add explicit direct includes
Add direct includes for result_message.hh and unimplemented.hh at files
that use those declarations directly instead of relying on transitive
includes.
2026-04-20 12:45:04 +03:00
76 changed files with 551 additions and 307 deletions

View File

@@ -234,11 +234,15 @@ generate_scylla_version()
option(Scylla_USE_PRECOMPILED_HEADER "Use precompiled header for Scylla" ON)
add_library(scylla-precompiled-header STATIC exported_templates.cc)
target_include_directories(scylla-precompiled-header PRIVATE
"${CMAKE_CURRENT_SOURCE_DIR}"
"${scylla_gen_build_dir}")
target_link_libraries(scylla-precompiled-header PRIVATE
absl::headers
absl::btree
absl::hash
absl::raw_hash_set
idl
Seastar::seastar
Snappy::snappy
systemd

View File

@@ -2769,6 +2769,25 @@ def write_build_file(f,
f.write('build {}: rust_source {}\n'.format(cc, src))
obj = cc.replace('.cc', '.o')
compiles[obj] = cc
# Sources shared between scylla (compiled with PCH) and small tests
# (with custom deps and partial link sets) must not use the PCH,
# because -fpch-instantiate-templates injects symbol references that
# the small test link sets cannot satisfy.
small_test_srcs = set()
for test_binary, test_deps in deps.items():
if not test_binary.startswith('test/'):
continue
# Only exclude PCH for tests with truly small/partial link sets.
# Tests that include scylla_core or similar large dep sets link
# against enough objects to satisfy PCH-injected symbol refs.
if len(test_deps) > 50:
continue
for src in test_deps:
if src.endswith('.cc'):
small_test_srcs.add(src)
for src in small_test_srcs:
obj = '$builddir/' + mode + '/' + src.replace('.cc', '.o')
compiles_with_pch.discard(obj)
for obj in compiles:
src = compiles[obj]
seastar_dep = f'$builddir/{mode}/seastar/libseastar.{seastar_lib_ext}'

View File

@@ -0,0 +1,84 @@
/*
* Copyright (C) 2017-present ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.1 and Apache-2.0)
*/
#pragma once
#include "bytes.hh"
#include "utils/hash.hh"
#include "cql3/dialect.hh"
namespace cql3 {
typedef bytes cql_prepared_id_type;
/// \brief The key of the prepared statements cache
///
/// TODO: consolidate prepared_cache_key_type and the nested cache_key_type
/// the latter was introduced for unifying the CQL and Thrift prepared
/// statements so that they can be stored in the same cache.
class prepared_cache_key_type {
public:
// derive from cql_prepared_id_type so we can customize the formatter of
// cache_key_type
struct cache_key_type : public cql_prepared_id_type {
cache_key_type(cql_prepared_id_type&& id, cql3::dialect d) : cql_prepared_id_type(std::move(id)), dialect(d) {}
cql3::dialect dialect; // Not part of hash, but we don't expect collisions because of that
bool operator==(const cache_key_type& other) const = default;
};
private:
cache_key_type _key;
public:
explicit prepared_cache_key_type(cql_prepared_id_type cql_id, dialect d) : _key(std::move(cql_id), d) {}
cache_key_type& key() { return _key; }
const cache_key_type& key() const { return _key; }
static const cql_prepared_id_type& cql_id(const prepared_cache_key_type& key) {
return key.key();
}
bool operator==(const prepared_cache_key_type& other) const = default;
};
}
namespace std {
template<>
struct hash<cql3::prepared_cache_key_type::cache_key_type> final {
size_t operator()(const cql3::prepared_cache_key_type::cache_key_type& k) const {
return std::hash<cql3::cql_prepared_id_type>()(k);
}
};
template<>
struct hash<cql3::prepared_cache_key_type> final {
size_t operator()(const cql3::prepared_cache_key_type& k) const {
return std::hash<cql3::cql_prepared_id_type>()(k.key());
}
};
}
// for prepared_statements_cache log printouts
template <> struct fmt::formatter<cql3::prepared_cache_key_type::cache_key_type> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
auto format(const cql3::prepared_cache_key_type::cache_key_type& p, fmt::format_context& ctx) const {
return fmt::format_to(ctx.out(), "{{cql_id: {}, dialect: {}}}", static_cast<const cql3::cql_prepared_id_type&>(p), p.dialect);
}
};
template <> struct fmt::formatter<cql3::prepared_cache_key_type> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
auto format(const cql3::prepared_cache_key_type& p, fmt::format_context& ctx) const {
return fmt::format_to(ctx.out(), "{}", p.key());
}
};

View File

@@ -12,6 +12,7 @@
#include "utils/loading_cache.hh"
#include "utils/hash.hh"
#include "cql3/prepared_cache_key_type.hh"
#include "cql3/statements/prepared_statement.hh"
#include "cql3/column_specification.hh"
#include "cql3/dialect.hh"
@@ -27,39 +28,6 @@ struct prepared_cache_entry_size {
}
};
typedef bytes cql_prepared_id_type;
/// \brief The key of the prepared statements cache
///
/// TODO: consolidate prepared_cache_key_type and the nested cache_key_type
/// the latter was introduced for unifying the CQL and Thrift prepared
/// statements so that they can be stored in the same cache.
class prepared_cache_key_type {
public:
// derive from cql_prepared_id_type so we can customize the formatter of
// cache_key_type
struct cache_key_type : public cql_prepared_id_type {
cache_key_type(cql_prepared_id_type&& id, cql3::dialect d) : cql_prepared_id_type(std::move(id)), dialect(d) {}
cql3::dialect dialect; // Not part of hash, but we don't expect collisions because of that
bool operator==(const cache_key_type& other) const = default;
};
private:
cache_key_type _key;
public:
explicit prepared_cache_key_type(cql_prepared_id_type cql_id, dialect d) : _key(std::move(cql_id), d) {}
cache_key_type& key() { return _key; }
const cache_key_type& key() const { return _key; }
static const cql_prepared_id_type& cql_id(const prepared_cache_key_type& key) {
return key.key();
}
bool operator==(const prepared_cache_key_type& other) const = default;
};
class prepared_statements_cache {
public:
struct stats {
@@ -164,35 +132,3 @@ public:
}
};
}
namespace std {
template<>
struct hash<cql3::prepared_cache_key_type::cache_key_type> final {
size_t operator()(const cql3::prepared_cache_key_type::cache_key_type& k) const {
return std::hash<cql3::cql_prepared_id_type>()(k);
}
};
template<>
struct hash<cql3::prepared_cache_key_type> final {
size_t operator()(const cql3::prepared_cache_key_type& k) const {
return std::hash<cql3::cql_prepared_id_type>()(k.key());
}
};
}
// for prepared_statements_cache log printouts
template <> struct fmt::formatter<cql3::prepared_cache_key_type::cache_key_type> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
auto format(const cql3::prepared_cache_key_type::cache_key_type& p, fmt::format_context& ctx) const {
return fmt::format_to(ctx.out(), "{{cql_id: {}, dialect: {}}}", static_cast<const cql3::cql_prepared_id_type&>(p), p.dialect);
}
};
template <> struct fmt::formatter<cql3::prepared_cache_key_type> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
auto format(const cql3::prepared_cache_key_type& p, fmt::format_context& ctx) const {
return fmt::format_to(ctx.out(), "{}", p.key());
}
};

View File

@@ -17,6 +17,9 @@
#include <seastar/coroutine/as_future.hh>
#include <seastar/coroutine/try_future.hh>
#include "cql3/prepared_statements_cache.hh"
#include "cql3/authorized_prepared_statements_cache.hh"
#include "transport/messages/result_message.hh"
#include "service/storage_proxy.hh"
#include "service/migration_manager.hh"
#include "service/mapreduce_service.hh"
@@ -77,7 +80,7 @@ static service::query_state query_state_for_internal_call() {
return {service::client_state::for_internal_calls(), empty_service_permit()};
}
query_processor::query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, vector_search::vector_store_client& vsc, query_processor::memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, lang::manager& langm)
query_processor::query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, vector_search::vector_store_client& vsc, query_processor::memory_config mcfg, cql_config& cql_cfg, const utils::loading_cache_config& auth_prep_cache_cfg, lang::manager& langm)
: _migration_subscriber{std::make_unique<migration_subscriber>(this)}
, _proxy(proxy)
, _db(db)
@@ -86,7 +89,7 @@ query_processor::query_processor(service::storage_proxy& proxy, data_dictionary:
, _mcfg(mcfg)
, _cql_config(cql_cfg)
, _prepared_cache(prep_cache_log, _mcfg.prepared_statment_cache_size)
, _authorized_prepared_cache(std::move(auth_prep_cache_cfg), authorized_prepared_statements_cache_log)
, _authorized_prepared_cache(auth_prep_cache_cfg, authorized_prepared_statements_cache_log)
, _auth_prepared_cache_cfg_cb([this] (uint32_t) { (void) _authorized_prepared_cache_config_action.trigger_later(); })
, _authorized_prepared_cache_config_action([this] { update_authorized_prepared_cache_config(); return make_ready_future<>(); })
, _authorized_prepared_cache_update_interval_in_ms_observer(_db.get_config().permissions_update_interval_in_ms.observe(_auth_prepared_cache_cfg_cb))
@@ -1074,7 +1077,7 @@ query_processor::execute_batch_without_checking_exception_message(
::shared_ptr<statements::batch_statement> batch,
service::query_state& query_state,
query_options& options,
std::unordered_map<prepared_cache_key_type, authorized_prepared_statements_cache::value_type> pending_authorization_entries) {
std::unordered_map<prepared_cache_key_type, statements::prepared_statement::checked_weak_ptr> pending_authorization_entries) {
auto access_future = co_await coroutine::as_future(batch->check_access(*this, query_state.get_client_state()));
bool failed = access_future.failed();
co_await audit::inspect(batch, query_state, options, failed);

View File

@@ -22,13 +22,14 @@
#include "cql3/statements/prepared_statement.hh"
#include "cql3/cql_statement.hh"
#include "cql3/dialect.hh"
#include "cql3/query_options.hh"
#include "cql3/stats.hh"
#include "exceptions/exceptions.hh"
#include "service/migration_listener.hh"
#include "mutation/timestamp.hh"
#include "transport/messages/result_message.hh"
#include "service/client_state.hh"
#include "service/broadcast_tables/experimental/query_result.hh"
#include "vector_search/vector_store_client.hh"
#include "utils/assert.hh"
#include "utils/observable.hh"
#include "utils/rolling_max_tracker.hh"
@@ -41,6 +42,9 @@
namespace lang { class manager; }
namespace vector_search {
class vector_store_client;
}
namespace service {
class migration_manager;
class query_state;
@@ -58,6 +62,9 @@ struct query;
namespace cql3 {
class prepared_statements_cache;
class authorized_prepared_statements_cache;
namespace statements {
class batch_statement;
class schema_altering_statement;
@@ -184,7 +191,7 @@ public:
static std::vector<std::unique_ptr<statements::raw::parsed_statement>> parse_statements(std::string_view queries, dialect d);
query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, vector_search::vector_store_client& vsc,
memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, lang::manager& langm);
memory_config mcfg, cql_config& cql_cfg, const utils::loading_cache_config& auth_prep_cache_cfg, lang::manager& langm);
~query_processor();
@@ -474,7 +481,7 @@ public:
::shared_ptr<statements::batch_statement> stmt,
service::query_state& query_state,
query_options& options,
std::unordered_map<prepared_cache_key_type, authorized_prepared_statements_cache::value_type> pending_authorization_entries) {
std::unordered_map<prepared_cache_key_type, statements::prepared_statement::checked_weak_ptr> pending_authorization_entries) {
return execute_batch_without_checking_exception_message(
std::move(stmt),
query_state,
@@ -490,7 +497,7 @@ public:
::shared_ptr<statements::batch_statement>,
service::query_state& query_state,
query_options& options,
std::unordered_map<prepared_cache_key_type, authorized_prepared_statements_cache::value_type> pending_authorization_entries);
std::unordered_map<prepared_cache_key_type, statements::prepared_statement::checked_weak_ptr> pending_authorization_entries);
future<service::broadcast_tables::query_result>
execute_broadcast_table_query(const service::broadcast_tables::query&);

View File

@@ -16,6 +16,7 @@
#include <seastar/core/execution_stage.hh>
#include "cas_request.hh"
#include "cql3/query_processor.hh"
#include "transport/messages/result_message.hh"
#include "service/storage_proxy.hh"
#include "tracing/trace_state.hh"
#include "utils/unique_view.hh"

View File

@@ -22,6 +22,7 @@
#include "cql3/expr/evaluate.hh"
#include "cql3/query_options.hh"
#include "cql3/query_processor.hh"
#include "transport/messages/result_message.hh"
#include "cql3/values.hh"
#include "timeout_config.hh"
#include "service/broadcast_tables/experimental/lang.hh"

View File

@@ -14,6 +14,7 @@
#include "auth/service.hh"
#include "cql3/statements/prepared_statement.hh"
#include "cql3/query_processor.hh"
#include "unimplemented.hh"
#include "service/migration_manager.hh"
#include "service/storage_proxy.hh"
#include "transport/event.hh"

View File

@@ -9,7 +9,6 @@
#pragma once
#include "cql3/cql_statement.hh"
#include "cql3/query_processor.hh"
#include "raw/parsed_statement.hh"
#include "service/qos/qos_common.hh"
#include "service/query_state.hh"

View File

@@ -15,6 +15,7 @@
#include "cql3/cql_statement.hh"
#include "data_dictionary/data_dictionary.hh"
#include "cql3/query_processor.hh"
#include "unimplemented.hh"
#include "service/storage_proxy.hh"
#include <optional>
#include "validation.hh"

View File

@@ -330,14 +330,14 @@ const config_type& config_type_for<std::vector<db::config::error_injection_at_st
}
template <>
const config_type& config_type_for<enum_option<netw::dict_training_loop::when>>() {
const config_type& config_type_for<enum_option<netw::dict_training_when>>() {
static config_type ct(
"dictionary training conditions", printable_to_json<enum_option<netw::dict_training_loop::when>>);
"dictionary training conditions", printable_to_json<enum_option<netw::dict_training_when>>);
return ct;
}
template <>
const config_type& config_type_for<netw::advanced_rpc_compressor::tracker::algo_config>() {
const config_type& config_type_for<netw::algo_config>() {
static config_type ct(
"advanced rpc compressor config", printable_vector_to_json<enum_option<netw::compression_algorithm>>);
return ct;
@@ -530,9 +530,9 @@ struct convert<db::config::error_injection_at_startup> {
template <>
class convert<enum_option<netw::dict_training_loop::when>> {
class convert<enum_option<netw::dict_training_when>> {
public:
static bool decode(const Node& node, enum_option<netw::dict_training_loop::when>& rhs) {
static bool decode(const Node& node, enum_option<netw::dict_training_when>& rhs) {
std::string name;
if (!convert<std::string>::decode(node, name)) {
return false;
@@ -1110,7 +1110,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"Specifies RPC compression algorithms supported by this node. ")
, internode_compression_enable_advanced(this, "internode_compression_enable_advanced", liveness::MustRestart, value_status::Used, false,
"Enables the new implementation of RPC compression. If disabled, Scylla will fall back to the old implementation.")
, rpc_dict_training_when(this, "rpc_dict_training_when", liveness::LiveUpdate, value_status::Used, netw::dict_training_loop::when::type::NEVER,
, rpc_dict_training_when(this, "rpc_dict_training_when", liveness::LiveUpdate, value_status::Used, netw::dict_training_when::type::NEVER,
"Specifies when RPC compression dictionary training is performed by this node.\n"
"* `never` disables it unconditionally.\n"
"* `when_leader` enables it only whenever the node is the Raft leader.\n"
@@ -2025,8 +2025,8 @@ template struct utils::config_file::named_value<enum_option<db::experimental_fea
template struct utils::config_file::named_value<enum_option<db::replication_strategy_restriction_t>>;
template struct utils::config_file::named_value<enum_option<db::consistency_level_restriction_t>>;
template struct utils::config_file::named_value<enum_option<db::tablets_mode_t>>;
template struct utils::config_file::named_value<enum_option<netw::dict_training_loop::when>>;
template struct utils::config_file::named_value<netw::advanced_rpc_compressor::tracker::algo_config>;
template struct utils::config_file::named_value<enum_option<netw::dict_training_when>>;
template struct utils::config_file::named_value<netw::algo_config>;
template struct utils::config_file::named_value<std::vector<enum_option<db::experimental_features_t>>>;
template struct utils::config_file::named_value<std::vector<enum_option<db::replication_strategy_restriction_t>>>;
template struct utils::config_file::named_value<std::vector<enum_option<db::consistency_level_restriction_t>>>;
@@ -2094,7 +2094,7 @@ future<gms::inet_address> resolve(const config_file::named_value<sstring>& addre
}
}
co_return coroutine::exception(std::move(ex));
co_return seastar::coroutine::exception(std::move(ex));
}
static std::vector<seastar::metrics::relabel_config> get_relable_from_yaml(const YAML::Node& yaml, const std::string& name) {

View File

@@ -9,6 +9,7 @@
#pragma once
#include <filesystem>
#include <unordered_map>
#include <seastar/core/sstring.hh>
@@ -16,15 +17,14 @@
#include <seastar/util/program-options.hh>
#include <seastar/util/log.hh>
#include "locator/abstract_replication_strategy.hh"
#include "locator/replication_strategy_type.hh"
#include "seastarx.hh"
#include "utils/config_file.hh"
#include "utils/enum_option.hh"
#include "gms/inet_address.hh"
#include "db/hints/host_filter.hh"
#include "utils/error_injection.hh"
#include "message/dict_trainer.hh"
#include "message/advanced_rpc_compressor.hh"
#include "message/rpc_compression_types.hh"
#include "db/consistency_level_type.hh"
#include "db/tri_mode_restriction.hh"
#include "sstables/compressor.hh"
@@ -325,9 +325,9 @@ public:
named_value<uint32_t> internode_compression_zstd_min_message_size;
named_value<uint32_t> internode_compression_zstd_max_message_size;
named_value<bool> internode_compression_checksumming;
named_value<netw::advanced_rpc_compressor::tracker::algo_config> internode_compression_algorithms;
named_value<netw::algo_config> internode_compression_algorithms;
named_value<bool> internode_compression_enable_advanced;
named_value<enum_option<netw::dict_training_loop::when>> rpc_dict_training_when;
named_value<enum_option<netw::dict_training_when>> rpc_dict_training_when;
named_value<uint32_t> rpc_dict_training_min_time_seconds;
named_value<uint64_t> rpc_dict_training_min_bytes;
named_value<bool> inter_dc_tcp_nodelay;
@@ -739,8 +739,8 @@ extern template struct utils::config_file::named_value<enum_option<db::experimen
extern template struct utils::config_file::named_value<enum_option<db::replication_strategy_restriction_t>>;
extern template struct utils::config_file::named_value<enum_option<db::consistency_level_restriction_t>>;
extern template struct utils::config_file::named_value<enum_option<db::tablets_mode_t>>;
extern template struct utils::config_file::named_value<enum_option<netw::dict_training_loop::when>>;
extern template struct utils::config_file::named_value<netw::advanced_rpc_compressor::tracker::algo_config>;
extern template struct utils::config_file::named_value<enum_option<netw::dict_training_when>>;
extern template struct utils::config_file::named_value<netw::algo_config>;
extern template struct utils::config_file::named_value<std::vector<enum_option<db::experimental_features_t>>>;
extern template struct utils::config_file::named_value<std::vector<enum_option<db::replication_strategy_restriction_t>>>;
extern template struct utils::config_file::named_value<std::vector<enum_option<db::consistency_level_restriction_t>>>;

View File

@@ -15,10 +15,11 @@
#include <utility>
#include <vector>
#include "db/view/view_build_status.hh"
#include "gms/gossiper.hh"
#include "gms/inet_address.hh"
#include "gms/generation-number.hh"
#include "gms/loaded_endpoint_state.hh"
#include "schema/schema_fwd.hh"
#include "utils/UUID.hh"
#include "query/query-result-set.hh"
#include "db_clock.hh"
#include "mutation_query.hh"
#include "system_keyspace_view_types.hh"
@@ -36,6 +37,10 @@ namespace netw {
class shared_dict;
};
namespace query {
class result_set;
}
namespace sstables {
struct entry_descriptor;
class generation_type;

View File

@@ -29,6 +29,8 @@
#include "db/config.hh"
#include "db/view/base_info.hh"
#include "gms/gossiper.hh"
#include "query/query-result-set.hh"
#include "db/view/view_build_status.hh"
#include "db/view/view_consumer.hh"
#include "mutation/canonical_mutation.hh"

View File

@@ -13,6 +13,7 @@
#include <seastar/core/abort_source.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/core/on_internal_error.hh>
#include "gms/gossiper.hh"
#include "db/view/view_building_coordinator.hh"
#include "db/view/view_build_status.hh"
#include "locator/tablets.hh"

View File

@@ -13,7 +13,7 @@
#include "db/system_keyspace.hh"
#include "locator/tablets.hh"
#include "mutation/canonical_mutation.hh"
#include "raft/raft.hh"
#include "raft/raft_fwd.hh"
#include "service/endpoint_lifecycle_subscriber.hh"
#include "service/raft/raft_group0.hh"
#include "service/raft/raft_group0_client.hh"

View File

@@ -21,6 +21,8 @@
#include "dht/token.hh"
#include "replica/database.hh"
#include "service/storage_proxy.hh"
#include "service/storage_service.hh"
#include "db/system_keyspace.hh"
#include "service/raft/raft_group0_client.hh"
#include "service/raft/raft_group0.hh"
#include "schema/schema_fwd.hh"

View File

@@ -17,7 +17,7 @@
#include <flat_set>
#include "locator/abstract_replication_strategy.hh"
#include "locator/tablets.hh"
#include "raft/raft.hh"
#include "raft/raft_fwd.hh"
#include <seastar/core/gate.hh>
#include "db/view/view_building_state.hh"
#include "sstables/shared_sstable.hh"

View File

@@ -20,6 +20,7 @@
#include "cdc/metadata.hh"
#include "db/config.hh"
#include "db/system_keyspace.hh"
#include "query/query-result-set.hh"
#include "db/virtual_table.hh"
#include "partition_slice_builder.hh"
#include "db/virtual_tables.hh"

View File

@@ -34,6 +34,7 @@
#include "locator/token_metadata.hh"
#include "locator/types.hh"
#include "gms/gossip_address_map.hh"
#include "gms/loaded_endpoint_state.hh"
namespace gms {
@@ -71,11 +72,6 @@ struct gossip_config {
utils::updateable_value<utils::UUID> recovery_leader;
};
struct loaded_endpoint_state {
gms::inet_address endpoint;
std::optional<locator::endpoint_dc_rack> opt_dc_rack;
};
/**
* This module is responsible for Gossiping information for the local endpoint. This abstraction
* maintains the list of live and dead endpoints. Periodically i.e. every 1 second this module

View File

@@ -0,0 +1,23 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
*/
#pragma once
#include <optional>
#include "gms/inet_address.hh"
#include "locator/types.hh"
namespace gms {
struct loaded_endpoint_state {
inet_address endpoint;
std::optional<locator::endpoint_dc_rack> opt_dc_rack;
};
} // namespace gms

View File

@@ -11,7 +11,7 @@
#include "query/query_id.hh"
#include "locator/host_id.hh"
#include "tasks/types.hh"
#include "service/session.hh"
#include "service/session_id.hh"
namespace utils {
class UUID final {
@@ -43,4 +43,3 @@ class host_id final {
};
} // namespace locator

View File

@@ -21,6 +21,7 @@
#include "utils/UUID_gen.hh"
#include "types/types.hh"
#include "utils/managed_string.hh"
#include "utils/rjson.hh"
#include <ranges>
#include <seastar/core/sstring.hh>
#include <boost/algorithm/string.hpp>

View File

@@ -284,14 +284,3 @@ future<> instance_cache::stop() {
}
}
namespace std {
template <>
struct equal_to<seastar::scheduling_group> {
bool operator()(seastar::scheduling_group& sg1, seastar::scheduling_group& sg2) const noexcept {
return sg1 == sg2;
}
};
}

View File

@@ -19,6 +19,7 @@
#include "utils/sequenced_set.hh"
#include "utils/simple_hashers.hh"
#include "tablets.hh"
#include "locator/replication_strategy_type.hh"
#include "data_dictionary/consistency_config_options.hh"
// forward declaration since replica/database.hh includes this file
@@ -38,13 +39,6 @@ extern logging::logger rslogger;
using inet_address = gms::inet_address;
using token = dht::token;
enum class replication_strategy_type {
simple,
local,
network_topology,
everywhere_topology,
};
using replication_strategy_config_option = std::variant<sstring, rack_list>;
using replication_strategy_config_options = std::map<sstring, replication_strategy_config_option>;

View File

@@ -0,0 +1,20 @@
/*
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
*/
#pragma once
namespace locator {
enum class replication_strategy_type {
simple,
local,
network_topology,
everywhere_topology,
};
} // namespace locator

View File

@@ -12,7 +12,7 @@
#include "locator/token_metadata_fwd.hh"
#include "utils/small_vector.hh"
#include "locator/host_id.hh"
#include "service/session.hh"
#include "service/session_id.hh"
#include "dht/i_partitioner_fwd.hh"
#include "dht/token-sharding.hh"
#include "dht/ring_position.hh"
@@ -21,10 +21,9 @@
#include "utils/chunked_vector.hh"
#include "utils/hash.hh"
#include "utils/UUID.hh"
#include "raft/raft.hh"
#include "raft/raft_fwd.hh"
#include <ranges>
#include <seastar/core/reactor.hh>
#include <seastar/util/log.hh>
#include <seastar/core/sharded.hh>
#include <seastar/util/noncopyable_function.hh>

View File

@@ -11,9 +11,10 @@
#include <seastar/core/condition-variable.hh>
#include <seastar/rpc/rpc_types.hh>
#include <utility>
#include "rpc_compression_types.hh"
#include "utils/refcounted.hh"
#include "utils/updateable_value.hh"
#include "utils/enum_option.hh"
#include "shared_dict.hh"
namespace netw {
@@ -28,103 +29,6 @@ class dict_sampler;
using dict_ptr = lw_shared_ptr<foreign_ptr<lw_shared_ptr<shared_dict>>>;
class control_protocol_frame;
// An enum wrapper, describing supported RPC compression algorithms.
// Always contains a valid value —- the constructors won't allow
// an invalid/unknown enum variant to be constructed.
struct compression_algorithm {
using underlying = uint8_t;
enum class type : underlying {
RAW,
LZ4,
ZSTD,
COUNT,
} _value;
// Construct from an integer.
// Used to deserialize the algorithm from the first byte of the frame.
constexpr compression_algorithm(underlying x) {
if (x < std::to_underlying(type::RAW) || x >= std::to_underlying(type::COUNT)) {
throw std::runtime_error(fmt::format("Invalid value {} for enum compression_algorithm", static_cast<int>(x)));
}
_value = static_cast<type>(x);
}
// Construct from `type`. Makes sure that `type` has a valid value.
constexpr compression_algorithm(type x) : compression_algorithm(std::to_underlying(x)) {}
// These names are used in multiple places:
// RPC negotiation, in metric labels, and config.
static constexpr std::string_view names[] = {
"raw",
"lz4",
"zstd",
};
static_assert(std::size(names) == static_cast<int>(compression_algorithm::type::COUNT));
// Implements enum_option.
static auto map() {
std::unordered_map<std::string, type> ret;
for (size_t i = 0; i < std::size(names); ++i) {
ret.insert(std::make_pair<std::string, type>(std::string(names[i]), compression_algorithm(i).get()));
}
return ret;
}
constexpr std::string_view name() const noexcept { return names[idx()]; }
constexpr underlying idx() const noexcept { return std::to_underlying(_value); }
constexpr type get() const noexcept { return _value; }
constexpr static size_t count() { return static_cast<size_t>(type::COUNT); };
bool operator<=>(const compression_algorithm &) const = default;
};
// Represents a set of compression algorithms.
// Backed by a bitset.
// Used for convenience during algorithm negotiations.
class compression_algorithm_set {
uint8_t _bitset;
static_assert(std::numeric_limits<decltype(_bitset)>::digits > compression_algorithm::count());
constexpr compression_algorithm_set(uint8_t v) noexcept : _bitset(v) {}
public:
// Returns a set containing the given algorithm and all algorithms weaker (smaller in the enum order)
// than it.
constexpr static compression_algorithm_set this_or_lighter(compression_algorithm algo) noexcept {
auto x = 1 << (algo.idx());
return {x + (x - 1)};
}
// Returns the strongest (greatest in the enum order) algorithm in the set.
constexpr compression_algorithm heaviest() const {
return {std::bit_width(_bitset) - 1};
}
// The usual set operations.
constexpr static compression_algorithm_set singleton(compression_algorithm algo) noexcept {
return {1 << algo.idx()};
}
constexpr compression_algorithm_set intersection(compression_algorithm_set o) const noexcept {
return {_bitset & o._bitset};
}
constexpr compression_algorithm_set difference(compression_algorithm_set o) const noexcept {
return {_bitset &~ o._bitset};
}
constexpr compression_algorithm_set sum(compression_algorithm_set o) const noexcept {
return {_bitset | o._bitset};
}
constexpr bool contains(compression_algorithm algo) const noexcept {
return _bitset & (1 << algo.idx());
}
constexpr bool operator==(const compression_algorithm_set&) const = default;
// Returns the contained bitset. Used for serialization.
constexpr uint8_t value() const noexcept {
return _bitset;
}
// Reconstructs the set from the output of `value()`. Used for deserialization.
constexpr static compression_algorithm_set from_value(uint8_t bitset) {
compression_algorithm_set x = bitset;
x.heaviest(); // This is a validation check. It will throw if the bitset contains some illegal/unknown bits.
return x;
}
};
using algo_config = std::vector<enum_option<compression_algorithm>>;
// See docs/dev/advanced_rpc_compression.md,
// section `Negotiation` for more information about the protocol.
struct control_protocol {
@@ -248,7 +152,7 @@ struct per_algorithm_stats {
// prevent a misuse of the API (dangling references).
class advanced_rpc_compressor::tracker : public utils::refcounted {
public:
using algo_config = algo_config;
using algo_config = netw::algo_config;
struct config {
utils::updateable_value<uint32_t> zstd_min_msg_size{0};
utils::updateable_value<uint32_t> zstd_max_msg_size{std::numeric_limits<uint32_t>::max()};

View File

@@ -9,7 +9,7 @@
#pragma once
#include "shared_dict.hh"
#include "advanced_rpc_compressor.hh"
#include "rpc_compression_types.hh"
namespace netw {

View File

@@ -8,6 +8,7 @@
#pragma once
#include "rpc_compression_types.hh"
#include "utils/reservoir_sampling.hh"
#include "utils/updateable_value.hh"
#include <seastar/core/future.hh>
@@ -88,28 +89,7 @@ class dict_training_loop {
seastar::semaphore _pause{0};
seastar::abort_source _pause_as;
public:
struct when {
enum class type {
NEVER,
WHEN_LEADER,
ALWAYS,
COUNT,
};
static constexpr std::string_view names[] = {
"never",
"when_leader",
"always",
};
static_assert(std::size(names) == static_cast<size_t>(type::COUNT));
// Implements enum_option.
static std::unordered_map<std::string, type> map() {
std::unordered_map<std::string, type> ret;
for (size_t i = 0; i < std::size(names); ++i) {
ret.insert({std::string(names[i]), type(i)});
}
return ret;
}
};
using when = netw::dict_training_when;
void pause();
void unpause();
void cancel() noexcept;

View File

@@ -54,11 +54,11 @@ dictionary_service::dictionary_service(
void dictionary_service::maybe_toggle_dict_training() {
auto when = _rpc_dict_training_when();
netw::dict_trainer_logger.debug("dictionary_service::maybe_toggle_dict_training(), called, _is_leader={}, when={}", _is_leader, when);
if (when == netw::dict_training_loop::when::type::NEVER) {
if (when == netw::dict_training_when::type::NEVER) {
_training_fiber.pause();
} else if (when == netw::dict_training_loop::when::type::ALWAYS) {
} else if (when == netw::dict_training_when::type::ALWAYS) {
_training_fiber.unpause();
} else if (when == netw::dict_training_loop::when::type::WHEN_LEADER) {
} else if (when == netw::dict_training_when::type::WHEN_LEADER) {
_is_leader ? _training_fiber.unpause() : _training_fiber.pause();
}
};

View File

@@ -40,7 +40,7 @@ namespace gms {
class dictionary_service {
db::system_keyspace& _sys_ks;
locator::host_id _our_host_id;
utils::updateable_value<enum_option<netw::dict_training_loop::when>> _rpc_dict_training_when;
utils::updateable_value<enum_option<netw::dict_training_when>> _rpc_dict_training_when;
service::raft_group0_client& _raft_group0_client;
abort_source& _as;
netw::dict_training_loop _training_fiber;
@@ -48,7 +48,7 @@ class dictionary_service {
bool _is_leader = false;
utils::observer<bool> _leadership_observer;
utils::observer<enum_option<netw::dict_training_loop::when>> _when_observer;
utils::observer<enum_option<netw::dict_training_when>> _when_observer;
std::optional<std::any> _feature_observer;
void maybe_toggle_dict_training();
@@ -61,7 +61,7 @@ public:
locator::host_id our_host_id = Uninitialized();
utils::updateable_value<uint32_t> rpc_dict_training_min_time_seconds = Uninitialized();
utils::updateable_value<uint64_t> rpc_dict_training_min_bytes = Uninitialized();
utils::updateable_value<enum_option<netw::dict_training_loop::when>> rpc_dict_training_when = Uninitialized();
utils::updateable_value<enum_option<netw::dict_training_when>> rpc_dict_training_when = Uninitialized();
};
// Note: the training fiber will start as soon as the relevant cluster feature is enabled.
dictionary_service(

View File

@@ -19,6 +19,7 @@
#include <seastar/coroutine/all.hh>
#include "message/messaging_service.hh"
#include "message/advanced_rpc_compressor.hh"
#include <seastar/core/sharded.hh>
#include "gms/gossiper.hh"
#include "service/storage_service.hh"

View File

@@ -19,11 +19,11 @@
#include "schema/schema_fwd.hh"
#include "streaming/stream_fwd.hh"
#include "locator/host_id.hh"
#include "service/session.hh"
#include "service/session_id.hh"
#include "service/maintenance_mode.hh"
#include "gms/gossip_address_map.hh"
#include "gms/generation-number.hh"
#include "tasks/types.hh"
#include "message/advanced_rpc_compressor.hh"
#include "utils/chunked_vector.hh"
#include <list>
@@ -120,6 +120,8 @@ namespace qos {
namespace netw {
class walltime_compressor_tracker;
/* All verb handler identifiers */
enum class messaging_verb : int32_t {
CLIENT_ID = 0,

View File

@@ -0,0 +1,155 @@
/*
* Copyright (C) 2026-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
*/
#pragma once
#include <bit>
#include <compare>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <limits>
#include <stdexcept>
#include <string>
#include <string_view>
#include <unordered_map>
#include <utility>
#include <vector>
#include "utils/enum_option.hh"
namespace netw {
// An enum wrapper, describing supported RPC compression algorithms.
// Always contains a valid value -- the constructors won't allow
// an invalid/unknown enum variant to be constructed.
struct compression_algorithm {
using underlying = uint8_t;
enum class type : underlying {
RAW,
LZ4,
ZSTD,
COUNT,
} _value;
// Construct from an integer.
// Used to deserialize the algorithm from the first byte of the frame.
constexpr compression_algorithm(underlying x) {
if (x < std::to_underlying(type::RAW) || x >= std::to_underlying(type::COUNT)) {
throw std::runtime_error(std::string("Invalid value ") + std::to_string(unsigned(x)) + " for enum compression_algorithm");
}
_value = static_cast<type>(x);
}
// Construct from `type`. Makes sure that `type` has a valid value.
constexpr compression_algorithm(type x) : compression_algorithm(std::to_underlying(x)) {}
// These names are used in multiple places:
// RPC negotiation, in metric labels, and config.
static constexpr std::string_view names[] = {
"raw",
"lz4",
"zstd",
};
static_assert(std::size(names) == static_cast<int>(compression_algorithm::type::COUNT));
// Implements enum_option.
static auto map() {
std::unordered_map<std::string, type> ret;
for (size_t i = 0; i < std::size(names); ++i) {
ret.insert(std::make_pair(std::string(names[i]), compression_algorithm(i).get()));
}
return ret;
}
constexpr std::string_view name() const noexcept { return names[idx()]; }
constexpr underlying idx() const noexcept { return std::to_underlying(_value); }
constexpr type get() const noexcept { return _value; }
constexpr static size_t count() { return static_cast<size_t>(type::COUNT); }
bool operator<=>(const compression_algorithm&) const = default;
};
// Represents a set of compression algorithms.
// Backed by a bitset.
// Used for convenience during algorithm negotiations.
class compression_algorithm_set {
uint8_t _bitset;
static_assert(std::numeric_limits<decltype(_bitset)>::digits > compression_algorithm::count());
constexpr compression_algorithm_set(uint8_t v) noexcept : _bitset(v) {}
public:
// Returns a set containing the given algorithm and all algorithms weaker (smaller in the enum order)
// than it.
constexpr static compression_algorithm_set this_or_lighter(compression_algorithm algo) noexcept {
auto x = 1 << algo.idx();
return {uint8_t(x + (x - 1))};
}
// Returns the strongest (greatest in the enum order) algorithm in the set.
constexpr compression_algorithm heaviest() const {
return {compression_algorithm::underlying(std::bit_width(_bitset) - 1)};
}
// The usual set operations.
constexpr static compression_algorithm_set singleton(compression_algorithm algo) noexcept {
return {uint8_t(1 << algo.idx())};
}
constexpr compression_algorithm_set intersection(compression_algorithm_set o) const noexcept {
return {uint8_t(_bitset & o._bitset)};
}
constexpr compression_algorithm_set difference(compression_algorithm_set o) const noexcept {
return {uint8_t(_bitset &~ o._bitset)};
}
constexpr compression_algorithm_set sum(compression_algorithm_set o) const noexcept {
return {uint8_t(_bitset | o._bitset)};
}
constexpr bool contains(compression_algorithm algo) const noexcept {
return _bitset & (1 << algo.idx());
}
constexpr bool operator==(const compression_algorithm_set&) const = default;
// Returns the contained bitset. Used for serialization.
constexpr uint8_t value() const noexcept {
return _bitset;
}
// Reconstructs the set from the output of `value()`. Used for deserialization.
constexpr static compression_algorithm_set from_value(uint8_t bitset) {
compression_algorithm_set x = bitset;
x.heaviest(); // Validation: throws on illegal/unknown bits.
return x;
}
};
using algo_config = std::vector<enum_option<compression_algorithm>>;
struct dict_training_when {
enum class type {
NEVER,
WHEN_LEADER,
ALWAYS,
COUNT,
};
static constexpr std::string_view names[] = {
"never",
"when_leader",
"always",
};
static_assert(std::size(names) == static_cast<size_t>(type::COUNT));
// Implements enum_option.
static std::unordered_map<std::string, type> map() {
std::unordered_map<std::string, type> ret;
for (size_t i = 0; i < std::size(names); ++i) {
ret.insert({std::string(names[i]), type(i)});
}
return ret;
}
};
} // namespace netw

27
raft/raft_fwd.hh Normal file
View File

@@ -0,0 +1,27 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
*/
#pragma once
// Lightweight forward-declaration header for commonly used raft types.
// Include this instead of raft/raft.hh when only the basic ID/index types
// are needed (e.g. in other header files), to avoid pulling in the full
// raft machinery (futures, abort_source, bytes_ostream, etc.).
#include "internal.hh"
namespace raft {
using server_id = internal::tagged_id<struct server_id_tag>;
using group_id = internal::tagged_id<struct group_id_tag>;
using term_t = internal::tagged_uint64<struct term_tag>;
using index_t = internal::tagged_uint64<struct index_tag>;
using read_id = internal::tagged_uint64<struct read_id_tag>;
class server;
} // namespace raft

View File

@@ -8,7 +8,6 @@
#pragma once
#include "locator/abstract_replication_strategy.hh"
#include "index/secondary_index_manager.hh"
#include <seastar/core/abort_source.hh>
#include <seastar/core/sstring.hh>
@@ -113,6 +112,10 @@ namespace gms {
class feature_service;
}
namespace locator {
class abstract_replication_strategy;
}
namespace alternator {
class table_stats;
}

View File

@@ -69,6 +69,13 @@ struct segment_descriptor : public log_heap_hook<segment_descriptor_hist_options
}
};
} // namespace replica::logstor
template<>
size_t hist_key<replica::logstor::segment_descriptor>(const replica::logstor::segment_descriptor& desc);
namespace replica::logstor {
using segment_descriptor_hist = log_heap<segment_descriptor, segment_descriptor_hist_options>;
struct segment_set {

View File

@@ -10,7 +10,7 @@
#include "mutation/mutation.hh"
#include "db/system_keyspace.hh"
#include "service/session.hh"
#include "service/session_id.hh"
#include "locator/tablets.hh"
namespace replica {

View File

@@ -16,6 +16,7 @@
#include "service/paxos/paxos_state.hh"
#include "service/query_state.hh"
#include "cql3/query_processor.hh"
#include "transport/messages/result_message.hh"
#include "cql3/untyped_result_set.hh"
#include "db/system_keyspace.hh"
#include "replica/database.hh"

View File

@@ -6,6 +6,7 @@
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
*/
#pragma once
#include <unordered_set>
#include "service/raft/group0_fwd.hh"
namespace service {

View File

@@ -9,7 +9,11 @@
#pragma once
#include <iosfwd>
#include "raft/raft.hh"
#include <variant>
#include <vector>
#include <seastar/core/timer.hh>
#include <seastar/core/lowres_clock.hh>
#include "raft/raft_fwd.hh"
#include "gms/inet_address.hh"
namespace service {

View File

@@ -8,7 +8,7 @@
#pragma once
#include "utils/UUID.hh"
#include "service/session_id.hh"
#include <seastar/core/gate.hh>
#include <seastar/core/shared_future.hh>
@@ -19,12 +19,6 @@
namespace service {
using session_id = utils::tagged_uuid<struct session_id_tag>;
// We want it be different than default-constructed session_id to catch mistakes.
constexpr session_id default_session_id = session_id(
utils::UUID(0x81e7fc5a8d4411ee, 0x8577325096b39f47)); // timeuuid 2023-11-27 16:46:27.182089.0 UTC
/// Session is used to track execution of work related to some greater task, identified by session_id.
/// Work can enter the session using enter(), and is considered to be part of the session
/// as long as the guard returned by enter() is alive.

21
service/session_id.hh Normal file
View File

@@ -0,0 +1,21 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
*/
#pragma once
#include "utils/UUID.hh"
namespace service {
using session_id = utils::tagged_uuid<struct session_id_tag>;
// We want it to be different than a default-constructed session_id to catch mistakes.
constexpr session_id default_session_id = session_id(
utils::UUID(0x81e7fc5a8d4411ee, 0x8577325096b39f47)); // timeuuid 2023-11-27 16:46:27.182089.0 UTC
} // namespace service

View File

@@ -38,7 +38,6 @@
#include "replica/exceptions.hh"
#include "locator/host_id.hh"
#include "dht/token_range_endpoints.hh"
#include "service/storage_service.hh"
#include "service/cas_shard.hh"
#include "service/storage_proxy_fwd.hh"

View File

@@ -15,6 +15,7 @@
#include <seastar/core/shared_future.hh>
#include "absl-flat_hash_map.hh"
#include "gms/endpoint_state.hh"
#include "gms/gossip_address_map.hh"
#include "gms/i_endpoint_state_change_subscriber.hh"
#include "schema/schema_fwd.hh"
#include "service/client_routes.hh"
@@ -40,11 +41,9 @@
#include <seastar/core/shared_ptr.hh>
#include "cdc/generation_id.hh"
#include "db/system_keyspace.hh"
#include "raft/raft.hh"
#include "raft/raft_fwd.hh"
#include "node_ops/id.hh"
#include "raft/server.hh"
#include "db/view/view_building_state.hh"
#include "service/tablet_allocator.hh"
#include "service/tablet_operation.hh"
#include "mutation/timestamp.hh"
#include "utils/UUID.hh"
@@ -115,6 +114,10 @@ class tablet_mutation_builder;
namespace auth { class cache; }
namespace service {
class tablet_allocator;
}
namespace utils {
class disk_space_monitor;
}

View File

@@ -11,7 +11,10 @@
#include "locator/abstract_replication_strategy.hh"
#include "message/messaging_service.hh"
#include "service/raft/raft_group_registry.hh"
#include "cql3/query_processor.hh"
namespace cql3 {
class query_processor;
}
namespace db {
class system_keyspace;

View File

@@ -17,7 +17,7 @@
#include <seastar/core/metrics.hh>
#include "utils/log.hh"
#include "raft/raft.hh"
#include "raft/raft_fwd.hh"
#include "service/endpoint_lifecycle_subscriber.hh"
#include "service/topology_state_machine.hh"
#include "db/view/view_building_state.hh"

View File

@@ -17,9 +17,9 @@
#include <seastar/core/sstring.hh>
#include "cdc/generation_id.hh"
#include "dht/token.hh"
#include "raft/raft.hh"
#include "raft/raft_fwd.hh"
#include "utils/UUID.hh"
#include "service/session.hh"
#include "service/session_id.hh"
#include "mutation/canonical_mutation.hh"
#include "replica/database_fwd.hh"
#include "locator/host_id.hh"

View File

@@ -44,7 +44,7 @@
#include "tracing/trace_state.hh"
#include "utils/updateable_value.hh"
#include "dht/decorated_key.hh"
#include "service/session.hh"
#include "service/session_id.hh"
#include "sstables/trie/bti_index.hh"
#include "sstables/file_size_stats.hh"

View File

@@ -254,7 +254,6 @@
#include <seastar/coroutine/generator.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/http/api_docs.hh>
#include <seastar/http/client.hh>
#include <seastar/http/common.hh>
#include <seastar/http/connection_factory.hh>
@@ -401,4 +400,44 @@
#define ZSTD_STATIC_LINKING_ONLY
#include <zstd.h>
// Scylla internal headers included by most translation units
#include "bytes.hh"
#include "cql3/query_options.hh"
#include "cql3/query_processor.hh"
#include "db/config.hh"
#include "db/system_keyspace.hh"
#include "db/timeout_clock.hh"
#include "db_clock.hh"
#include "dht/token.hh"
#include "exceptions/exceptions.hh"
#include "gc_clock.hh"
#include "gms/feature_service.hh"
#include "gms/gossiper.hh"
#include "gms/inet_address.hh"
#include "keys/keys.hh"
#include "locator/host_id.hh"
#include "locator/token_metadata.hh"
#include "locator/token_metadata_fwd.hh"
#include "locator/types.hh"
#include "mutation/mutation_fragment.hh"
#include "mutation/mutation_partition.hh"
#include "replica/database.hh"
#include "schema/schema.hh"
#include "schema/schema_builder.hh"
#include "schema/schema_fwd.hh"
#include "seastarx.hh"
#include "service/client_state.hh"
#include "service/migration_manager.hh"
#include "service/storage_proxy.hh"
#include "service/storage_service.hh"
#include "service/topology_state_machine.hh"
#include "sstables/sstables.hh"
#include "transport/messages/result_message.hh"
#include "types/types.hh"
#include "utils/chunked_vector.hh"
#include "utils/fragment_range.hh"
#include "utils/managed_bytes.hh"
#include "utils/tagged_integer.hh"
#include "utils/UUID.hh"
#endif

View File

@@ -8,7 +8,7 @@
#pragma once
#include "utils/UUID.hh"
#include "service/session_id.hh"
namespace streaming {
@@ -21,9 +21,3 @@ class stream_state;
using plan_id = utils::tagged_uuid<struct plan_id_tag>;
} // namespace streaming
namespace service {
using session_id = utils::tagged_uuid<struct session_id_tag>;
}

View File

@@ -13,6 +13,7 @@
#include <seastar/coroutine/parallel_for_each.hh>
#include "table_helper.hh"
#include "cql3/query_processor.hh"
#include "transport/messages/result_message.hh"
#include "cql3/statements/create_table_statement.hh"
#include "cql3/statements/modification_statement.hh"
#include "replica/database.hh"

View File

@@ -13,6 +13,7 @@
#include <seastar/core/gate.hh>
#include <seastar/core/when_all.hh>
#include <seastar/rpc/rpc_types.hh>
#include "gms/gossiper.hh"
#include <seastar/util/defer.hh>

View File

@@ -13,6 +13,7 @@
#undef SEASTAR_TESTING_MAIN
#include <seastar/testing/test_case.hh>
#include "test/lib/cql_assertions.hh"
#include "db/system_keyspace.hh"
#include "test/lib/cql_test_env.hh"
#include "test/lib/error_injection.hh"
#include "test/lib/log.hh"

View File

@@ -11,6 +11,7 @@
#undef SEASTAR_TESTING_MAIN
#include <seastar/testing/test_case.hh>
#include "test/lib/cql_test_env.hh"
#include "transport/messages/result_message.hh"
#include "db/commitlog/commitlog_replayer.hh"
#include "db/commitlog/commitlog.hh"
#include "db/config.hh"

View File

@@ -9,6 +9,7 @@
#include "mutation/mutation.hh"
#include "service/storage_proxy.hh"
#include "cql3/selection/selection.hh"
BOOST_AUTO_TEST_SUITE(per_partition_rate_limit_test)

View File

@@ -18,6 +18,8 @@
#include "test/lib/cql_test_env.hh"
#include "test/lib/cql_assertions.hh"
#include "transport/messages/result_message.hh"
#include "cql3/result_set.hh"
#include "service/migration_manager.hh"
#include "service/storage_proxy.hh"
#include "schema/schema_builder.hh"

View File

@@ -18,6 +18,7 @@
#include "test/lib/random_utils.hh"
#include "service/topology_mutation.hh"
#include "service/storage_service.hh"
#include "gms/gossiper.hh"
#include <fmt/ranges.h>
#include <seastar/testing/thread_test_case.hh>
#include <seastar/testing/on_internal_error.hh>

View File

@@ -19,6 +19,7 @@
#include "cdc/generation_service.hh"
#include "cql3/functions/functions.hh"
#include "cql3/query_processor.hh"
#include "transport/messages/result_message.hh"
#include "cql3/query_options.hh"
#include "cql3/statements/batch_statement.hh"
#include "cql3/statements/modification_statement.hh"
@@ -35,6 +36,7 @@
#include "service/qos/raft_service_level_distributed_data_accessor.hh"
#include "service/tablet_allocator.hh"
#include "compaction/compaction_manager.hh"
#include "message/advanced_rpc_compressor.hh"
#include "message/messaging_service.hh"
#include "gms/gossip_address_map.hh"
#include "service/raft/raft_group_registry.hh"

View File

@@ -16,6 +16,7 @@
#include "db/config.hh"
#include "db/system_distributed_keyspace.hh"
#include "gms/feature_service.hh"
#include "message/advanced_rpc_compressor.hh"
#include "message/messaging_service.hh"
#include "gms/gossiper.hh"
#include "gms/application_state.hh"

View File

@@ -18,6 +18,7 @@
#include <seastar/util/closeable.hh>
#include "db/config.hh"
#include "gms/feature_service.hh"
#include "message/advanced_rpc_compressor.hh"
#include "message/messaging_service.hh"
#include "gms/gossip_digest_syn.hh"
#include "gms/gossip_digest_ack.hh"

View File

@@ -11,6 +11,8 @@
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/core/temporary_buffer.hh>
#include <seastar/net/socket_defs.hh>
#include <seastar/net/api.hh>
#include <seastar/core/seastar.hh>
#include <seastar/core/future.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/thread.hh>

View File

@@ -26,6 +26,7 @@
#include "db/config.hh"
#include "schema/schema_builder.hh"
#include "service/storage_proxy.hh"
#include "service/storage_service.hh"
#include "db/system_keyspace.hh"
#include "tools/utils.hh"

View File

@@ -14,6 +14,7 @@
#include "vector_search/vector_store_client.hh"
#include "vs_mock_server.hh"
#include "test/lib/cql_test_env.hh"
#include "transport/messages/result_message.hh"
#include "utils/rjson.hh"
#include <boost/test/tools/old/interface.hpp>
#include <seastar/core/seastar.hh>

View File

@@ -9,6 +9,7 @@
#include "tools/load_system_tablets.hh"
#include <seastar/core/thread.hh>
#include "query/query-result-set.hh"
#include <seastar/util/closeable.hh>
#include "locator/abstract_replication_strategy.hh"

View File

@@ -1687,7 +1687,7 @@ process_batch_internal(service::client_state& client_state, sharded<cql3::query_
std::vector<cql3::statements::batch_statement::single_statement> modifications;
std::vector<cql3::raw_value_view_vector_with_unset> values;
std::unordered_map<cql3::prepared_cache_key_type, cql3::authorized_prepared_statements_cache::value_type> pending_authorization_entries;
std::unordered_map<cql3::prepared_cache_key_type, cql3::statements::prepared_statement::checked_weak_ptr> pending_authorization_entries;
modifications.reserve(n.assume_value());
values.reserve(n.assume_value());

View File

@@ -566,7 +566,7 @@ public:
}
if (need_preempt() && i != _cache.end()) {
auto key = i->idx;
co_await coroutine::maybe_yield();
co_await seastar::coroutine::maybe_yield();
i = _cache.lower_bound(key);
}
}

View File

@@ -361,7 +361,7 @@ utils::config_file::configs utils::config_file::unset_values() const {
future<> utils::config_file::read_from_file(file f, error_handler h) {
return f.size().then([this, f, h](size_t s) {
return do_with(make_file_input_stream(f), [this, s, h](input_stream<char>& in) {
return do_with(make_file_input_stream(f), [this, s, h](seastar::input_stream<char>& in) {
return in.read_exactly(s).then([this, h](temporary_buffer<char> buf) {
read_from_yaml(sstring(buf.begin(), buf.end()), h);
if (!_initialization_completed) {

View File

@@ -83,7 +83,7 @@ directories::directories(bool developer_mode)
future<> directories::create_and_verify(directories::set dir_set, recursive recursive) {
std::vector<file_lock> locks;
locks.reserve(dir_set.get_paths().size());
co_await coroutine::parallel_for_each(dir_set.get_paths(), [this, &locks, recursive] (fs::path path) -> future<> {
co_await seastar::coroutine::parallel_for_each(dir_set.get_paths(), [this, &locks, recursive] (fs::path path) -> future<> {
file_lock lock = co_await touch_and_lock(path);
locks.emplace_back(std::move(lock));
co_await disk_sanity(path, _developer_mode);
@@ -144,7 +144,7 @@ future<> directories::do_verify_owner_and_mode(fs::path path, recursive recurse,
co_return;
}
auto lister = directory_lister(path, lister::dir_entry_types::full(), do_verify_subpath, lister::show_hidden::no);
co_await with_closeable(std::move(lister), coroutine::lambda([&] (auto& lister) -> future<> {
co_await with_closeable(std::move(lister), seastar::coroutine::lambda([&] (auto& lister) -> future<> {
while (auto de = co_await lister.get()) {
co_await do_verify_owner_and_mode(path / de->name, recurse, level + 1, do_verify_subpath);
}
@@ -168,7 +168,7 @@ future<> directories::verify_owner_and_mode(fs::path path, recursive recursive)
future<> directories::verify_owner_and_mode_of_data_dir(directories::set dir_set) {
// verify data and index files in the first iteration and the other files in the second iteration.
for (auto verify_data_and_index_files : { true, false }) {
co_await coroutine::parallel_for_each(dir_set.get_paths(), [verify_data_and_index_files] (const auto &path) {
co_await seastar::coroutine::parallel_for_each(dir_set.get_paths(), [verify_data_and_index_files] (const auto &path) {
return do_verify_owner_and_mode(std::move(path), recursive::yes, 0, [verify_data_and_index_files] (const fs::path& dir, const directory_entry& de) {
auto path = dir / de.name;
component_type path_component_type;

View File

@@ -335,7 +335,7 @@ utils::gcp::storage::client::impl::send_with_retry(const std::string& path, cons
} catch (...) {
// just disregard the failure, we will retry below in the wrapped handler
}
auto wrapped_handler = [this, handler = std::move(handler), &req, scope](const reply& rep, input_stream<char>& in) -> future<> {
auto wrapped_handler = [this, handler = std::move(handler), &req, scope](const reply& rep, seastar::input_stream<char>& in) -> future<> {
auto _in = std::move(in);
auto status_class = reply::classify_status(rep._status);
/*
@@ -352,7 +352,7 @@ utils::gcp::storage::client::impl::send_with_retry(const std::string& path, cons
auto content = co_await util::read_entire_stream_contiguous(_in);
auto error_msg = get_gcp_error_message(std::string_view(content));
gcp_storage.debug("Got unexpected response status: {}, content: {}", rep._status, content);
co_await coroutine::return_exception_ptr(std::make_exception_ptr(httpd::unexpected_status_error(rep._status)));
co_await seastar::coroutine::return_exception_ptr(std::make_exception_ptr(httpd::unexpected_status_error(rep._status)));
}
std::exception_ptr eptr;
@@ -366,7 +366,7 @@ utils::gcp::storage::client::impl::send_with_retry(const std::string& path, cons
eptr = std::current_exception();
}
if (eptr) {
co_await coroutine::return_exception_ptr(std::move(eptr));
co_await seastar::coroutine::return_exception_ptr(std::move(eptr));
}
};
object_storage_retry_strategy retry_strategy(10,10ms,10000ms, as);

View File

@@ -84,7 +84,7 @@ future<std::optional<directory_entry>> directory_lister::get() {
_gen.emplace(_opened.experimental_list_directory());
}
if (!_gen) {
co_return coroutine::exception(std::make_exception_ptr(seastar::broken_pipe_exception()));
co_return seastar::coroutine::exception(std::make_exception_ptr(seastar::broken_pipe_exception()));
}
std::exception_ptr ex;
try {
@@ -114,7 +114,7 @@ future<std::optional<directory_entry>> directory_lister::get() {
_gen.reset();
if (ex) {
co_await _opened.close();
co_return coroutine::exception(std::move(ex));
co_return seastar::coroutine::exception(std::move(ex));
}
co_return std::nullopt;
}

View File

@@ -438,7 +438,7 @@ private:
break;
}
_reclaim(free_memory_threshold - memory::free_memory());
co_await coroutine::maybe_yield();
co_await seastar::coroutine::maybe_yield();
}
llogger.debug("background_reclaimer::main_loop: exit");
}

View File

@@ -136,7 +136,7 @@ public:
_map.erase(_map.begin());
}
if (++ret % evictions_per_yield == 0) {
co_await coroutine::maybe_yield();
co_await seastar::coroutine::maybe_yield();
}
}
co_return ret;

View File

@@ -8,6 +8,7 @@
#include <fmt/format.h>
#include <exception>
#include "utils/exceptions.hh"
#include <initializer_list>
#include <memory>
#include <regex>
@@ -354,7 +355,7 @@ http::experimental::client::reply_handler client::wrap_handler(http::request& re
should_retry = utils::http::retryable::yes;
co_await authorize(request);
}
co_await coroutine::return_exception_ptr(std::make_exception_ptr(
co_await seastar::coroutine::return_exception_ptr(std::make_exception_ptr(
aws::aws_exception(aws_error(possible_error->get_error_type(), possible_error->get_error_message().c_str(), should_retry))));
}
@@ -373,7 +374,7 @@ http::experimental::client::reply_handler client::wrap_handler(http::request& re
eptr = std::current_exception();
}
if (eptr) {
co_await coroutine::return_exception_ptr(std::make_exception_ptr(aws::aws_exception(aws_error::from_exception_ptr(eptr))));
co_await seastar::coroutine::return_exception_ptr(std::make_exception_ptr(aws::aws_exception(aws_error::from_exception_ptr(eptr))));
}
};
}
@@ -554,7 +555,7 @@ future<> client::put_object_tagging(sstring object_name, tag_set tagging, seasta
}
co_await output.close();
if (ex) {
co_await coroutine::return_exception_ptr(std::move(ex));
co_await seastar::coroutine::return_exception_ptr(std::move(ex));
}
});
co_await make_request(std::move(req), ignore_reply, http::reply::status_type::ok, as);
@@ -624,7 +625,7 @@ future<> client::put_object(sstring object_name, temporary_buffer<char> buf, sea
}
co_await out.close();
if (ex) {
co_await coroutine::return_exception_ptr(std::move(ex));
co_await seastar::coroutine::return_exception_ptr(std::move(ex));
}
});
co_await make_request(std::move(req), [len, start = s3_clock::now()] (group_client& gc, const auto& rep, auto&& in) {
@@ -650,7 +651,7 @@ future<> client::put_object(sstring object_name, ::memory_data_sink_buffers bufs
}
co_await out.close();
if (ex) {
co_await coroutine::return_exception_ptr(std::move(ex));
co_await seastar::coroutine::return_exception_ptr(std::move(ex));
}
});
co_await make_request(std::move(req), [len, start = s3_clock::now()] (group_client& gc, const auto& rep, auto&& in) {
@@ -900,7 +901,7 @@ future<> dump_multipart_upload_parts(output_stream<char> out, const utils::chunk
}
co_await out.close();
if (ex) {
co_await coroutine::return_exception_ptr(std::move(ex));
co_await seastar::coroutine::return_exception_ptr(std::move(ex));
}
}
@@ -916,7 +917,7 @@ future<> client::multipart_upload::start_upload() {
auto body = co_await util::read_entire_stream_contiguous(in);
_upload_id = parse_multipart_upload_id(body);
if (_upload_id.empty()) {
co_await coroutine::return_exception(std::runtime_error("cannot initiate upload"));
co_await seastar::coroutine::return_exception(std::runtime_error("cannot initiate upload"));
}
s3l.trace("created uploads for {} -> id = {}", _object_name, _upload_id);
}, http::reply::status_type::ok, _as);
@@ -951,7 +952,7 @@ future<> client::multipart_upload::upload_part(memory_data_sink_buffers bufs) {
}
co_await out.close();
if (ex) {
co_await coroutine::return_exception_ptr(std::move(ex));
co_await seastar::coroutine::return_exception_ptr(std::move(ex));
}
// note: At this point the buffers are sent, but the response is not yet
// received. However, claim is released and next part may start uploading
@@ -1000,7 +1001,7 @@ future<> client::multipart_upload::finalize_upload() {
unsigned parts_xml_len = prepare_multipart_upload_parts(_part_etags);
if (parts_xml_len == 0) {
co_await coroutine::return_exception(std::runtime_error("Failed to parse ETag list. Aborting multipart upload."));
co_await seastar::coroutine::return_exception(std::runtime_error("Failed to parse ETag list. Aborting multipart upload."));
}
s3l.trace("POST upload completion {} parts (upload id {})", _part_etags.size(), _upload_id);
@@ -1016,15 +1017,15 @@ future<> client::multipart_upload::finalize_upload() {
auto status_class = http::reply::classify_status(rep._status);
std::optional<aws::aws_error> possible_error = aws::aws_error::parse(co_await util::read_entire_stream_contiguous(payload));
if (possible_error) {
co_await coroutine::return_exception(aws::aws_exception(std::move(possible_error.value())));
co_await seastar::coroutine::return_exception(aws::aws_exception(std::move(possible_error.value())));
}
if (status_class != http::reply::status_class::informational && status_class != http::reply::status_class::success) {
co_await coroutine::return_exception(aws::aws_exception(aws::aws_error::from_http_code(rep._status)));
co_await seastar::coroutine::return_exception(aws::aws_exception(aws::aws_error::from_http_code(rep._status)));
}
if (rep._status != http::reply::status_type::ok) {
co_await coroutine::return_exception(httpd::unexpected_status_error(rep._status));
co_await seastar::coroutine::return_exception(httpd::unexpected_status_error(rep._status));
}
// If we reach this point it means the request succeeded. However, the body payload was already consumed, so no response handler was invoked. At
// this point it is ok since we are not interested in parsing this particular response
@@ -1458,7 +1459,7 @@ auto client::download_source::request_body() -> future<external_body> {
(void)_client->make_request(std::move(req), [this, &p] (group_client& gc, const http::reply& rep, input_stream<char>&& in_) mutable -> future<> {
s3l.trace("GET {} got the body ({} {} bytes)", _object_name, rep._status, rep.content_length);
if (rep._status != http::reply::status_type::partial_content && rep._status != http::reply::status_type::ok) {
co_await coroutine::return_exception(httpd::unexpected_status_error(rep._status));
co_await seastar::coroutine::return_exception(httpd::unexpected_status_error(rep._status));
}
auto in = std::move(in_);
@@ -1548,7 +1549,7 @@ class client::do_upload_file : private multipart_upload {
co_await output.close();
co_await input.close();
if (ex) {
co_await coroutine::return_exception_ptr(std::move(ex));
co_await seastar::coroutine::return_exception_ptr(std::move(ex));
}
}
@@ -1823,7 +1824,7 @@ future<> client::close() {
_creds_invalidation_timer.cancel();
_creds_update_timer.cancel();
}
co_await coroutine::parallel_for_each(_https, [] (auto& it) -> future<> {
co_await seastar::coroutine::parallel_for_each(_https, [] (auto& it) -> future<> {
co_await it.second.http.close();
});
@@ -1934,7 +1935,7 @@ future<std::optional<directory_entry>> client::bucket_lister::get() {
}
co_await close();
if (ex) {
co_return coroutine::exception(std::move(ex));
co_return seastar::coroutine::exception(std::move(ex));
}
co_return std::nullopt;
}