Compare commits

...

19 Commits

Author SHA1 Message Date
Yaniv Michael Kaul
7b17429d99 treewide: add explicit includes for headers losing transitive availability via PCH
Adding Scylla internal headers to the PCH changes which transitive
includes are available. Add explicit includes where needed:
- group0_fwd.hh: timer, lowres_clock, variant, vector
- discovery.hh: unordered_set
- Various test files: result_message.hh, result_set.hh, selection.hh,
  gossiper.hh, seastar net/api and core/seastar headers
2026-04-19 10:54:19 +03:00
Yaniv Michael Kaul
8f491eb7c7 pch: exclude small test sources from precompiled header
Small test binaries with partial link sets cannot satisfy symbol
references injected by -fpch-instantiate-templates. Exclude source
files used by tests with fewer than 50 dependencies from PCH
compilation to avoid linker failures.
2026-04-19 10:54:19 +03:00
Yaniv Michael Kaul
b9823f3053 pch: add replica/database.hh to precompiled header
replica/database.hh is included by 112 translation units and is one of
the heaviest headers in the codebase. Adding it to the PCH provides a
major compile-time reduction as its large transitive include tree is
parsed only once.

Clean dev build time drops from ~14m to ~6m20s (with previous PCH
commits; ~22m33s baseline without any PCH changes).
2026-04-19 10:54:19 +03:00
Yaniv Michael Kaul
3e6d959867 lang: remove redundant std::equal_to<scheduling_group> specialization
The specialization is unused and conflicts with PCH template
pre-instantiation. scheduling_group already has operator==, so
std::equal_to works via the default template.
2026-04-19 10:54:19 +03:00
Yaniv Michael Kaul
75c4fe1f33 replica/logstor: declare hist_key specialization before instantiation
Move the declaration of hist_key<segment_descriptor> specialization
into compaction.hh so it is visible before the primary template gets
instantiated via log_heap. This prevents -fpch-instantiate-templates
from instantiating the primary template in the PCH, which would
conflict with the explicit specialization in the .cc file.
2026-04-19 10:54:19 +03:00
Yaniv Michael Kaul
28e59bae5a utils, db: qualify seastar::coroutine:: to avoid shadowing by utils::coroutine class
Inside namespace utils, unqualified coroutine:: resolves to the
utils::coroutine class (utils/coroutine.hh) rather than the
seastar::coroutine namespace. This causes build failures when
replica/database.hh is added to the precompiled header, because
utils/coroutine.hh becomes transitively visible in all TUs.

Qualify all coroutine:: references with seastar:: in affected files
under utils/ and db/.
2026-04-19 10:54:19 +03:00
Yaniv Michael Kaul
66618cd869 pch: expand precompiled header with more high-impact Scylla headers
Add to stdafx.hh: locator/token_metadata.hh, gms/gossiper.hh,
db/system_keyspace.hh, service/topology_state_machine.hh,
cql3/query_options.hh, service/client_state.hh, cql3/query_processor.hh,
db/config.hh, service/storage_proxy.hh, schema/schema_builder.hh,
exceptions/exceptions.hh, gms/feature_service.hh,
service/migration_manager.hh, sstables/sstables.hh,
service/storage_service.hh, transport/messages/result_message.hh.

These headers are included by 40-140 translation units each. Adding them
to the PCH avoids redundant parsing across the build. Combined with the
previous PCH commit, clean dev build time drops from 22m33s to ~14m23s
(-36.2%).
2026-04-19 10:54:19 +03:00
Yaniv Michael Kaul
b4586f0789 utils: fix PCH compatibility in config_file and object_storage
Convert config_file.cc read_from_file() from continuation-style to
coroutines, avoiding a template instantiation conflict with
-fpch-instantiate-templates when heavy Scylla headers are in the PCH.

Qualify input_stream<char> in object_storage.cc lambda parameter with
seastar:: to resolve the same PCH template parsing issue.
2026-04-19 10:54:19 +03:00
Yaniv Michael Kaul
37280265ef pch: add commonly-used Scylla internal headers to precompiled header
Add schema/schema.hh, types/types.hh, mutation/mutation_partition.hh,
mutation/mutation_fragment.hh and their dependencies (bytes.hh, keys.hh,
dht/token.hh, locator types, etc.) to the PCH. These are included by
the vast majority of translation units and benefit greatly from being
precompiled once rather than parsed ~400 times.

Reduces clean dev build time from ~22m to ~18m (~19% faster).
2026-04-19 10:54:18 +03:00
Yaniv Michael Kaul
2fbba4a071 raft, service, locator: create raft_fwd.hh and reduce heavy header includes
Create raft/raft_fwd.hh with lightweight type aliases (server_id, group_id,
term_t, index_t) backed only by raft/internal.hh, avoiding the heavy
raft/raft.hh (832 lines with futures, abort_source, bytes_ostream).

Replace raft/raft.hh with raft/raft_fwd.hh in headers that only need the
basic ID types: tablets.hh, topology_state_machine.hh,
topology_coordinator.hh, storage_service.hh, group0_fwd.hh,
view_building_coordinator.hh, view_building_worker.hh.

Also remove gossiper.hh and tablet_allocator.hh from storage_service.hh
(forward declarations suffice), and remove unused reactor.hh from
tablets.hh. Add explicit includes in .cc files that lost transitive
availability.
2026-04-17 01:08:04 +03:00
Yaniv Michael Kaul
be5fa64d36 db: break gossiper.hh include from system_keyspace.hh
Extract loaded_endpoint_state into a standalone lightweight header to
avoid pulling the heavy gossiper.hh (and transitively query-result-set.hh)
into every includer of system_keyspace.hh. Add explicit includes where
the full definitions are actually needed.

Reduces clean dev build time by ~2 minutes (-8%).
2026-04-16 23:27:55 +03:00
Yaniv Michael Kaul
5c918d29cc service: remove unused storage_service.hh include from storage_proxy.hh
storage_proxy.hh included storage_service.hh but never referenced any
symbol from it. storage_service.hh costs 3.7s to parse per file, and
storage_proxy.hh has 75 direct includers. While most of those also
include database.hh (which shares transitive deps), removing this
unnecessary include still reduces total parse work.

Speedup: part of a series measured at -5.8% wall-clock improvement
(same-session A/B: 16m14s -> 15m17s at -j16, 16 cores).
2026-04-16 18:22:56 +03:00
Yaniv Michael Kaul
43e337a663 db, test: add explicit includes for storage_service.hh and system_keyspace.hh
Add explicit includes that were previously available transitively through
service/storage_proxy.hh -> service/storage_service.hh.

This prepares for removing the unused storage_service.hh include from
storage_proxy.hh in a follow-up commit.

Speedup: prerequisite for storage_proxy.hh include chain reduction
(measured -5.8% wall-clock combined with all changes in this series,
same-session A/B: 16m14s -> 15m17s at -j16).
2026-04-16 18:22:41 +03:00
Yaniv Michael Kaul
a67efb031c db: break heavy include chain from config.hh by extracting replication_strategy_type
Extract replication_strategy_type enum from locator/abstract_replication_strategy.hh
into a new lightweight header locator/replication_strategy_type.hh, and use it in
db/config.hh instead of the full abstract_replication_strategy.hh.

abstract_replication_strategy.hh pulls in a large transitive dependency tree
(schema.hh, mutation serializers, etc.) costing ~1.7s per file. With this change,
config.hh's incremental parse cost drops from 1.7s to 0.6s. Since ~85 files
include config.hh without also including database.hh (which would bring in these
deps anyway), this saves ~93s total CPU.

Speedup: part of a series measured at -5.8% wall-clock improvement
(same-session A/B: 16m14s -> 15m17s at -j16, 16 cores).
2026-04-16 18:19:19 +03:00
Yaniv Michael Kaul
5b0933c453 utils: add explicit include for exceptions.hh in s3/client.cc
Add explicit #include for utils/exceptions.hh which was previously
available transitively through db/config.hh -> abstract_replication_strategy.hh.

This prepares for removing the heavy abstract_replication_strategy.hh
include from db/config.hh in a follow-up commit.

Speedup: prerequisite for config.hh include chain reduction
(measured -5.8% wall-clock combined with all changes in this series,
same-session A/B: 16m14s -> 15m17s at -j16).
2026-04-16 18:19:04 +03:00
Yaniv Michael Kaul
2ac834d797 pch: remove seastar/http/api_docs.hh from precompiled header
The api_docs.hh header contains inline method bodies (api_registry::handle)
that call seastar::json::formatter::to_json(), forcing the compiler to
instantiate seastar::json template specializations (json_list_template,
formatter::write, do_with, etc.) in every compilation unit — even files
that never use any HTTP/JSON API types.

Measured ~6s of wasted template instantiation per file × ~620 files =
~3,700s total CPU. Only 2 files outside the PCH include api_docs.hh
directly, so removing it has no impact on code that actually uses these
types.

Wall-clock build time (-j16, Seastar/Abseil cached):
  Before (with loading_cache fix): avg 23m29s
  After:                           avg 23m04s  (-1.8%)
  vs original baseline:            avg 24m01s  (-4.0%)
2026-04-15 09:29:25 +03:00
Yaniv Michael Kaul
b324c84a04 cql3: break loading_cache include chain from query_processor.hh
utils/loading_cache.hh is an expensive template header that costs
~2,494 seconds of aggregate CPU time across 133 files that include it.
88 of those files include it only transitively via query_processor.hh
through the chain: query_processor.hh -> prepared_statements_cache.hh
-> loading_cache.hh, costing ~1,690s of template instantiation.

Break the chain by:
- Replacing #include of prepared_statements_cache.hh and
  authorized_prepared_statements_cache.hh in query_processor.hh with
  forward declarations and the lightweight prepared_cache_key_type.hh
- Replacing #include of result_message.hh with result_message_base.hh
  (which doesn't pull in prepared_statements_cache.hh)
- Changing prepared_statements_cache and authorized_prepared_statements_cache
  members to std::unique_ptr (PImpl) since forward-declared types
  cannot be held by value
- Moving get_prepared(), execute_prepared(), execute_direct(), and
  execute_batch() method bodies from the header to query_processor.cc
- Updating transport/server.cc to use the concrete type instead of the
  no-longer-visible authorized_prepared_statements_cache::value_type

Per-file measurement: files including query_processor.hh now show zero
loading_cache template instantiation events (previously 20-32s each).

Wall-clock measurement (clean build, -j16, 16 cores, Seastar cached):
  Baseline (origin/master):           avg 24m01s (24m03s, 23m59s)
  With loading_cache chain break:     avg 23m29s (23m32s, 23m29s, 23m27s)
  Improvement:                        ~32s, ~2.2%
2026-04-15 04:21:15 +03:00
Yaniv Michael Kaul
b499dc8e9d cql3: extract prepared_cache_key_type into standalone lightweight header
Move prepared_cache_key_type class and its std::hash / fmt::formatter
specializations from prepared_statements_cache.hh into a new header
cql3/prepared_cache_key_type.hh.

The new header only depends on bytes.hh, utils/hash.hh, and
cql3/dialect.hh -- it does NOT include utils/loading_cache.hh.
This allows code that needs the cache key type (e.g. for function
signatures) without pulling in the expensive loading_cache template
machinery.

prepared_statements_cache.hh now includes prepared_cache_key_type.hh,
so existing includers are unaffected.

No functional change. Prepares for breaking the loading_cache include
chain from query_processor.hh.
2026-04-15 04:20:57 +03:00
Yaniv Michael Kaul
8ad8e76c3b cql3, service, test: add explicit includes for headers losing transitive availability
Add explicit #include directives for headers that are currently
available transitively through cql3/query_processor.hh but will stop
being available after a subsequent refactoring that removes the
loading_cache include chain.

Files changed:
- cql3/statements/drop_keyspace_statement.cc: add unimplemented.hh
- cql3/statements/truncate_statement.cc: add unimplemented.hh
- cql3/statements/batch_statement.cc: add result_message.hh
- cql3/statements/broadcast_modification_statement.cc: add result_message.hh
- service/paxos/paxos_state.cc: add result_message.hh
- test/lib/cql_test_env.cc: add result_message.hh
- table_helper.cc: add result_message.hh

No functional change. Prepares for subsequent query_processor.hh cleanup.
2026-04-15 04:20:49 +03:00
56 changed files with 432 additions and 227 deletions

View File

@@ -234,6 +234,9 @@ generate_scylla_version()
option(Scylla_USE_PRECOMPILED_HEADER "Use precompiled header for Scylla" ON)
add_library(scylla-precompiled-header STATIC exported_templates.cc)
target_include_directories(scylla-precompiled-header PRIVATE
"${CMAKE_CURRENT_SOURCE_DIR}"
"${scylla_gen_build_dir}")
target_link_libraries(scylla-precompiled-header PRIVATE
absl::headers
absl::btree

View File

@@ -2766,6 +2766,25 @@ def write_build_file(f,
f.write('build {}: rust_source {}\n'.format(cc, src))
obj = cc.replace('.cc', '.o')
compiles[obj] = cc
# Sources shared between scylla (compiled with PCH) and small tests
# (with custom deps and partial link sets) must not use the PCH,
# because -fpch-instantiate-templates injects symbol references that
# the small test link sets cannot satisfy.
small_test_srcs = set()
for test_binary, test_deps in deps.items():
if not test_binary.startswith('test/'):
continue
# Only exclude PCH for tests with truly small/partial link sets.
# Tests that include scylla_core or similar large dep sets link
# against enough objects to satisfy PCH-injected symbol refs.
if len(test_deps) > 50:
continue
for src in test_deps:
if src.endswith('.cc'):
small_test_srcs.add(src)
for src in small_test_srcs:
obj = '$builddir/' + mode + '/' + src.replace('.cc', '.o')
compiles_with_pch.discard(obj)
for obj in compiles:
src = compiles[obj]
seastar_dep = f'$builddir/{mode}/seastar/libseastar.{seastar_lib_ext}'

View File

@@ -0,0 +1,84 @@
/*
* Copyright (C) 2017-present ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.1 and Apache-2.0)
*/
#pragma once
#include "bytes.hh"
#include "utils/hash.hh"
#include "cql3/dialect.hh"
namespace cql3 {
typedef bytes cql_prepared_id_type;
/// \brief The key of the prepared statements cache
///
/// TODO: consolidate prepared_cache_key_type and the nested cache_key_type
/// the latter was introduced for unifying the CQL and Thrift prepared
/// statements so that they can be stored in the same cache.
class prepared_cache_key_type {
public:
// derive from cql_prepared_id_type so we can customize the formatter of
// cache_key_type
struct cache_key_type : public cql_prepared_id_type {
cache_key_type(cql_prepared_id_type&& id, cql3::dialect d) : cql_prepared_id_type(std::move(id)), dialect(d) {}
cql3::dialect dialect; // Not part of hash, but we don't expect collisions because of that
bool operator==(const cache_key_type& other) const = default;
};
private:
cache_key_type _key;
public:
explicit prepared_cache_key_type(cql_prepared_id_type cql_id, dialect d) : _key(std::move(cql_id), d) {}
cache_key_type& key() { return _key; }
const cache_key_type& key() const { return _key; }
static const cql_prepared_id_type& cql_id(const prepared_cache_key_type& key) {
return key.key();
}
bool operator==(const prepared_cache_key_type& other) const = default;
};
}
namespace std {
template<>
struct hash<cql3::prepared_cache_key_type::cache_key_type> final {
size_t operator()(const cql3::prepared_cache_key_type::cache_key_type& k) const {
return std::hash<cql3::cql_prepared_id_type>()(k);
}
};
template<>
struct hash<cql3::prepared_cache_key_type> final {
size_t operator()(const cql3::prepared_cache_key_type& k) const {
return std::hash<cql3::cql_prepared_id_type>()(k.key());
}
};
}
// for prepared_statements_cache log printouts
template <> struct fmt::formatter<cql3::prepared_cache_key_type::cache_key_type> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
auto format(const cql3::prepared_cache_key_type::cache_key_type& p, fmt::format_context& ctx) const {
return fmt::format_to(ctx.out(), "{{cql_id: {}, dialect: {}}}", static_cast<const cql3::cql_prepared_id_type&>(p), p.dialect);
}
};
template <> struct fmt::formatter<cql3::prepared_cache_key_type> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
auto format(const cql3::prepared_cache_key_type& p, fmt::format_context& ctx) const {
return fmt::format_to(ctx.out(), "{}", p.key());
}
};

View File

@@ -12,6 +12,7 @@
#include "utils/loading_cache.hh"
#include "utils/hash.hh"
#include "cql3/prepared_cache_key_type.hh"
#include "cql3/statements/prepared_statement.hh"
#include "cql3/column_specification.hh"
#include "cql3/dialect.hh"
@@ -27,39 +28,6 @@ struct prepared_cache_entry_size {
}
};
typedef bytes cql_prepared_id_type;
/// \brief The key of the prepared statements cache
///
/// TODO: consolidate prepared_cache_key_type and the nested cache_key_type
/// the latter was introduced for unifying the CQL and Thrift prepared
/// statements so that they can be stored in the same cache.
class prepared_cache_key_type {
public:
// derive from cql_prepared_id_type so we can customize the formatter of
// cache_key_type
struct cache_key_type : public cql_prepared_id_type {
cache_key_type(cql_prepared_id_type&& id, cql3::dialect d) : cql_prepared_id_type(std::move(id)), dialect(d) {}
cql3::dialect dialect; // Not part of hash, but we don't expect collisions because of that
bool operator==(const cache_key_type& other) const = default;
};
private:
cache_key_type _key;
public:
explicit prepared_cache_key_type(cql_prepared_id_type cql_id, dialect d) : _key(std::move(cql_id), d) {}
cache_key_type& key() { return _key; }
const cache_key_type& key() const { return _key; }
static const cql_prepared_id_type& cql_id(const prepared_cache_key_type& key) {
return key.key();
}
bool operator==(const prepared_cache_key_type& other) const = default;
};
class prepared_statements_cache {
public:
struct stats {
@@ -164,35 +132,3 @@ public:
}
};
}
namespace std {
template<>
struct hash<cql3::prepared_cache_key_type::cache_key_type> final {
size_t operator()(const cql3::prepared_cache_key_type::cache_key_type& k) const {
return std::hash<cql3::cql_prepared_id_type>()(k);
}
};
template<>
struct hash<cql3::prepared_cache_key_type> final {
size_t operator()(const cql3::prepared_cache_key_type& k) const {
return std::hash<cql3::cql_prepared_id_type>()(k.key());
}
};
}
// for prepared_statements_cache log printouts
template <> struct fmt::formatter<cql3::prepared_cache_key_type::cache_key_type> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
auto format(const cql3::prepared_cache_key_type::cache_key_type& p, fmt::format_context& ctx) const {
return fmt::format_to(ctx.out(), "{{cql_id: {}, dialect: {}}}", static_cast<const cql3::cql_prepared_id_type&>(p), p.dialect);
}
};
template <> struct fmt::formatter<cql3::prepared_cache_key_type> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
auto format(const cql3::prepared_cache_key_type& p, fmt::format_context& ctx) const {
return fmt::format_to(ctx.out(), "{}", p.key());
}
};

View File

@@ -17,6 +17,9 @@
#include <seastar/coroutine/as_future.hh>
#include <seastar/coroutine/try_future.hh>
#include "cql3/prepared_statements_cache.hh"
#include "cql3/authorized_prepared_statements_cache.hh"
#include "transport/messages/result_message.hh"
#include "service/storage_proxy.hh"
#include "service/migration_manager.hh"
#include "service/mapreduce_service.hh"
@@ -77,7 +80,7 @@ static service::query_state query_state_for_internal_call() {
return {service::client_state::for_internal_calls(), empty_service_permit()};
}
query_processor::query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, vector_search::vector_store_client& vsc, query_processor::memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, lang::manager& langm)
query_processor::query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, vector_search::vector_store_client& vsc, query_processor::memory_config mcfg, cql_config& cql_cfg, const utils::loading_cache_config& auth_prep_cache_cfg, lang::manager& langm)
: _migration_subscriber{std::make_unique<migration_subscriber>(this)}
, _proxy(proxy)
, _db(db)
@@ -85,8 +88,8 @@ query_processor::query_processor(service::storage_proxy& proxy, data_dictionary:
, _vector_store_client(vsc)
, _mcfg(mcfg)
, _cql_config(cql_cfg)
, _prepared_cache(prep_cache_log, _mcfg.prepared_statment_cache_size)
, _authorized_prepared_cache(std::move(auth_prep_cache_cfg), authorized_prepared_statements_cache_log)
, _prepared_cache(std::make_unique<prepared_statements_cache>(prep_cache_log, _mcfg.prepared_statment_cache_size))
, _authorized_prepared_cache(std::make_unique<authorized_prepared_statements_cache>(auth_prep_cache_cfg, authorized_prepared_statements_cache_log))
, _auth_prepared_cache_cfg_cb([this] (uint32_t) { (void) _authorized_prepared_cache_config_action.trigger_later(); })
, _authorized_prepared_cache_config_action([this] { update_authorized_prepared_cache_config(); return make_ready_future<>(); })
, _authorized_prepared_cache_update_interval_in_ms_observer(_db.get_config().permissions_update_interval_in_ms.observe(_auth_prepared_cache_cfg_cb))
@@ -353,12 +356,12 @@ query_processor::query_processor(service::storage_proxy& proxy, data_dictionary:
sm::make_gauge(
"prepared_cache_size",
[this] { return _prepared_cache.size(); },
[this] { return _prepared_cache->size(); },
sm::description("A number of entries in the prepared statements cache.")),
sm::make_gauge(
"prepared_cache_memory_footprint",
[this] { return _prepared_cache.memory_footprint(); },
[this] { return _prepared_cache->memory_footprint(); },
sm::description("Size (in bytes) of the prepared statements cache.")),
sm::make_counter(
@@ -449,12 +452,12 @@ query_processor::query_processor(service::storage_proxy& proxy, data_dictionary:
sm::make_gauge(
"authorized_prepared_statements_cache_size",
[this] { return _authorized_prepared_cache.size(); },
[this] { return _authorized_prepared_cache->size(); },
sm::description("Number of entries in the authenticated prepared statements cache.")),
sm::make_gauge(
"user_prepared_auth_cache_footprint",
[this] { return _authorized_prepared_cache.memory_footprint(); },
[this] { return _authorized_prepared_cache->memory_footprint(); },
sm::description("Size (in bytes) of the authenticated prepared statements cache.")),
sm::make_counter(
@@ -554,6 +557,81 @@ query_processor::~query_processor() {
}
}
statements::prepared_statement::checked_weak_ptr query_processor::get_prepared(const std::optional<auth::authenticated_user>& user, const prepared_cache_key_type& key) {
if (user) {
auto vp = _authorized_prepared_cache->find(*user, key);
if (vp) {
try {
// Touch the corresponding prepared_statements_cache entry to make sure its last_read timestamp
// corresponds to the last time its value has been read.
//
// If we don't do this it may turn out that the most recently used prepared statement doesn't have
// the newest last_read timestamp and can get evicted before the not-so-recently-read statement if
// we need to create space in the prepared statements cache for a new entry.
//
// And this is going to trigger an eviction of the corresponding entry from the authorized_prepared_cache
// breaking the LRU paradigm of these caches.
_prepared_cache->touch(key);
return vp->get()->checked_weak_from_this();
} catch (seastar::checked_ptr_is_null_exception&) {
// If the prepared statement got invalidated - remove the corresponding authorized_prepared_statements_cache entry as well.
_authorized_prepared_cache->remove(*user, key);
}
}
}
return statements::prepared_statement::checked_weak_ptr();
}
statements::prepared_statement::checked_weak_ptr query_processor::get_prepared(const prepared_cache_key_type& key) {
return _prepared_cache->find(key);
}
future<::shared_ptr<cql_transport::messages::result_message>>
query_processor::execute_prepared(
statements::prepared_statement::checked_weak_ptr statement,
cql3::prepared_cache_key_type cache_key,
service::query_state& query_state,
const query_options& options,
bool needs_authorization) {
auto cql_statement = statement->statement;
return execute_prepared_without_checking_exception_message(
query_state,
std::move(cql_statement),
options,
std::move(statement),
std::move(cache_key),
needs_authorization)
.then(cql_transport::messages::propagate_exception_as_future<::shared_ptr<cql_transport::messages::result_message>>);
}
future<::shared_ptr<cql_transport::messages::result_message>>
query_processor::execute_direct(
const std::string_view& query_string,
service::query_state& query_state,
dialect d,
query_options& options) {
return execute_direct_without_checking_exception_message(
query_string,
query_state,
d,
options)
.then(cql_transport::messages::propagate_exception_as_future<::shared_ptr<cql_transport::messages::result_message>>);
}
future<::shared_ptr<cql_transport::messages::result_message>>
query_processor::execute_batch(
::shared_ptr<statements::batch_statement> stmt,
service::query_state& query_state,
query_options& options,
std::unordered_map<prepared_cache_key_type, statements::prepared_statement::checked_weak_ptr> pending_authorization_entries) {
return execute_batch_without_checking_exception_message(
std::move(stmt),
query_state,
options,
std::move(pending_authorization_entries))
.then(cql_transport::messages::propagate_exception_as_future<::shared_ptr<cql_transport::messages::result_message>>);
}
std::pair<std::reference_wrapper<service::strong_consistency::coordinator>, gate::holder>
query_processor::acquire_strongly_consistent_coordinator() {
auto [remote_, holder] = remote();
@@ -577,8 +655,8 @@ future<> query_processor::stop_remote() {
future<> query_processor::stop() {
co_await _mnotifier.unregister_listener(_migration_subscriber.get());
co_await _authorized_prepared_cache.stop();
co_await _prepared_cache.stop();
co_await _authorized_prepared_cache->stop();
co_await _prepared_cache->stop();
}
future<::shared_ptr<cql_transport::messages::result_message>> query_processor::execute_with_guard(
@@ -697,7 +775,7 @@ query_processor::do_execute_prepared(
if (needs_authorization) {
co_await statement->check_access(*this, query_state.get_client_state());
try {
co_await _authorized_prepared_cache.insert(*query_state.get_client_state().user(), std::move(cache_key), std::move(prepared));
co_await _authorized_prepared_cache->insert(*query_state.get_client_state().user(), std::move(cache_key), std::move(prepared));
} catch (...) {
log.error("failed to cache the entry: {}", std::current_exception());
}
@@ -733,7 +811,7 @@ future<::shared_ptr<cql_transport::messages::result_message::prepared>>
query_processor::prepare(sstring query_string, const service::client_state& client_state, cql3::dialect d) {
try {
auto key = compute_id(query_string, client_state.get_raw_keyspace(), d);
auto prep_entry = co_await _prepared_cache.get_pinned(key, [this, &query_string, &client_state, d] {
auto prep_entry = co_await _prepared_cache->get_pinned(key, [this, &query_string, &client_state, d] {
auto prepared = get_statement(query_string, client_state, d);
prepared->calculate_metadata_id();
auto bound_terms = prepared->statement->get_bound_terms();
@@ -1069,11 +1147,11 @@ query_processor::execute_batch_without_checking_exception_message(
::shared_ptr<statements::batch_statement> batch,
service::query_state& query_state,
query_options& options,
std::unordered_map<prepared_cache_key_type, authorized_prepared_statements_cache::value_type> pending_authorization_entries) {
std::unordered_map<prepared_cache_key_type, statements::prepared_statement::checked_weak_ptr> pending_authorization_entries) {
auto access_future = co_await coroutine::as_future(batch->check_access(*this, query_state.get_client_state()));
co_await coroutine::parallel_for_each(pending_authorization_entries, [this, &query_state] (auto& e) -> future<> {
try {
co_await _authorized_prepared_cache.insert(*query_state.get_client_state().user(), e.first, std::move(e.second));
co_await _authorized_prepared_cache->insert(*query_state.get_client_state().user(), e.first, std::move(e.second));
} catch (...) {
log.error("failed to cache the entry: {}", std::current_exception());
}
@@ -1241,7 +1319,7 @@ void query_processor::migration_subscriber::on_drop_view(const sstring& ks_name,
void query_processor::migration_subscriber::remove_invalid_prepared_statements(
sstring ks_name,
std::optional<sstring> cf_name) {
_qp->_prepared_cache.remove_if([&] (::shared_ptr<cql_statement> stmt) {
_qp->_prepared_cache->remove_if([&] (::shared_ptr<cql_statement> stmt) {
return this->should_invalidate(ks_name, cf_name, stmt);
});
}
@@ -1298,13 +1376,13 @@ void query_processor::update_authorized_prepared_cache_config() {
std::chrono::duration_cast<std::chrono::milliseconds>(prepared_statements_cache::entry_expiry));
cfg.refresh = std::chrono::milliseconds(_db.get_config().permissions_update_interval_in_ms());
if (!_authorized_prepared_cache.update_config(std::move(cfg))) {
if (!_authorized_prepared_cache->update_config(std::move(cfg))) {
log.error("Failed to apply authorized prepared statements cache changes. Please read the documentation of these parameters");
}
}
void query_processor::reset_cache() {
_authorized_prepared_cache.reset();
_authorized_prepared_cache->reset();
}
}

View File

@@ -17,15 +17,16 @@
#include <seastar/core/sharded.hh>
#include <seastar/core/shared_ptr.hh>
#include "cql3/prepared_statements_cache.hh"
#include "cql3/authorized_prepared_statements_cache.hh"
#include "cql3/prepared_cache_key_type.hh"
#include "cql3/statements/prepared_statement.hh"
#include "cql3/cql_statement.hh"
#include "cql3/dialect.hh"
#include "cql3/query_options.hh"
#include "cql3/stats.hh"
#include "exceptions/exceptions.hh"
#include "service/migration_listener.hh"
#include "mutation/timestamp.hh"
#include "transport/messages/result_message.hh"
#include "transport/messages/result_message_base.hh"
#include "service/client_state.hh"
#include "service/broadcast_tables/experimental/query_result.hh"
#include "vector_search/vector_store_client.hh"
@@ -41,6 +42,11 @@
namespace lang { class manager; }
namespace utils {
template <typename Clock>
struct loading_cache_config_base;
using loading_cache_config = loading_cache_config_base<seastar::lowres_clock>;
}
namespace service {
class migration_manager;
class query_state;
@@ -58,6 +64,9 @@ struct query;
namespace cql3 {
class prepared_statements_cache;
class authorized_prepared_statements_cache;
namespace statements {
class batch_statement;
class schema_altering_statement;
@@ -132,8 +141,8 @@ private:
seastar::metrics::metric_groups _metrics;
prepared_statements_cache _prepared_cache;
authorized_prepared_statements_cache _authorized_prepared_cache;
std::unique_ptr<prepared_statements_cache> _prepared_cache;
std::unique_ptr<authorized_prepared_statements_cache> _authorized_prepared_cache;
// Tracks the rolling maximum of gross bytes allocated during CQL parsing
utils::rolling_max_tracker _parsing_cost_tracker{1000};
@@ -184,7 +193,7 @@ public:
static std::vector<std::unique_ptr<statements::raw::parsed_statement>> parse_statements(std::string_view queries, dialect d);
query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, vector_search::vector_store_client& vsc,
memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, lang::manager& langm);
memory_config mcfg, cql_config& cql_cfg, const utils::loading_cache_config& auth_prep_cache_cfg, lang::manager& langm);
~query_processor();
@@ -231,53 +240,17 @@ public:
return _vector_store_client;
}
statements::prepared_statement::checked_weak_ptr get_prepared(const std::optional<auth::authenticated_user>& user, const prepared_cache_key_type& key) {
if (user) {
auto vp = _authorized_prepared_cache.find(*user, key);
if (vp) {
try {
// Touch the corresponding prepared_statements_cache entry to make sure its last_read timestamp
// corresponds to the last time its value has been read.
//
// If we don't do this it may turn out that the most recently used prepared statement doesn't have
// the newest last_read timestamp and can get evicted before the not-so-recently-read statement if
// we need to create space in the prepared statements cache for a new entry.
//
// And this is going to trigger an eviction of the corresponding entry from the authorized_prepared_cache
// breaking the LRU paradigm of these caches.
_prepared_cache.touch(key);
return vp->get()->checked_weak_from_this();
} catch (seastar::checked_ptr_is_null_exception&) {
// If the prepared statement got invalidated - remove the corresponding authorized_prepared_statements_cache entry as well.
_authorized_prepared_cache.remove(*user, key);
}
}
}
return statements::prepared_statement::checked_weak_ptr();
}
statements::prepared_statement::checked_weak_ptr get_prepared(const std::optional<auth::authenticated_user>& user, const prepared_cache_key_type& key);
statements::prepared_statement::checked_weak_ptr get_prepared(const prepared_cache_key_type& key) {
return _prepared_cache.find(key);
}
statements::prepared_statement::checked_weak_ptr get_prepared(const prepared_cache_key_type& key);
inline
future<::shared_ptr<cql_transport::messages::result_message>>
execute_prepared(
statements::prepared_statement::checked_weak_ptr statement,
cql3::prepared_cache_key_type cache_key,
service::query_state& query_state,
const query_options& options,
bool needs_authorization) {
auto cql_statement = statement->statement;
return execute_prepared_without_checking_exception_message(
query_state,
std::move(cql_statement),
options,
std::move(statement),
std::move(cache_key),
needs_authorization)
.then(cql_transport::messages::propagate_exception_as_future<::shared_ptr<cql_transport::messages::result_message>>);
}
bool needs_authorization);
// Like execute_prepared, but is allowed to return exceptions as result_message::exception.
// The result_message::exception must be explicitly handled.
@@ -301,20 +274,12 @@ public:
bool needs_authorization);
/// Execute a client statement that was not prepared.
inline
future<::shared_ptr<cql_transport::messages::result_message>>
execute_direct(
const std::string_view& query_string,
service::query_state& query_state,
dialect d,
query_options& options) {
return execute_direct_without_checking_exception_message(
query_string,
query_state,
d,
options)
.then(cql_transport::messages::propagate_exception_as_future<::shared_ptr<cql_transport::messages::result_message>>);
}
query_options& options);
// Like execute_direct, but is allowed to return exceptions as result_message::exception.
// The result_message::exception must be explicitly handled.
@@ -466,20 +431,12 @@ public:
future<> stop();
inline
future<::shared_ptr<cql_transport::messages::result_message>>
execute_batch(
::shared_ptr<statements::batch_statement> stmt,
service::query_state& query_state,
query_options& options,
std::unordered_map<prepared_cache_key_type, authorized_prepared_statements_cache::value_type> pending_authorization_entries) {
return execute_batch_without_checking_exception_message(
std::move(stmt),
query_state,
options,
std::move(pending_authorization_entries))
.then(cql_transport::messages::propagate_exception_as_future<::shared_ptr<cql_transport::messages::result_message>>);
}
std::unordered_map<prepared_cache_key_type, statements::prepared_statement::checked_weak_ptr> pending_authorization_entries);
// Like execute_batch, but is allowed to return exceptions as result_message::exception.
// The result_message::exception must be explicitly handled.
@@ -488,7 +445,7 @@ public:
::shared_ptr<statements::batch_statement>,
service::query_state& query_state,
query_options& options,
std::unordered_map<prepared_cache_key_type, authorized_prepared_statements_cache::value_type> pending_authorization_entries);
std::unordered_map<prepared_cache_key_type, statements::prepared_statement::checked_weak_ptr> pending_authorization_entries);
future<service::broadcast_tables::query_result>
execute_broadcast_table_query(const service::broadcast_tables::query&);

View File

@@ -16,6 +16,7 @@
#include <seastar/core/execution_stage.hh>
#include "cas_request.hh"
#include "cql3/query_processor.hh"
#include "transport/messages/result_message.hh"
#include "service/storage_proxy.hh"
#include "tracing/trace_state.hh"
#include "utils/unique_view.hh"

View File

@@ -22,6 +22,7 @@
#include "cql3/expr/evaluate.hh"
#include "cql3/query_options.hh"
#include "cql3/query_processor.hh"
#include "transport/messages/result_message.hh"
#include "cql3/values.hh"
#include "timeout_config.hh"
#include "service/broadcast_tables/experimental/lang.hh"

View File

@@ -14,6 +14,7 @@
#include "auth/service.hh"
#include "cql3/statements/prepared_statement.hh"
#include "cql3/query_processor.hh"
#include "unimplemented.hh"
#include "service/migration_manager.hh"
#include "service/storage_proxy.hh"
#include "transport/event.hh"

View File

@@ -15,6 +15,7 @@
#include "cql3/cql_statement.hh"
#include "data_dictionary/data_dictionary.hh"
#include "cql3/query_processor.hh"
#include "unimplemented.hh"
#include "service/storage_proxy.hh"
#include <optional>
#include "validation.hh"

View File

@@ -2002,7 +2002,7 @@ future<gms::inet_address> resolve(const config_file::named_value<sstring>& addre
}
}
co_return coroutine::exception(std::move(ex));
co_return seastar::coroutine::exception(std::move(ex));
}
static std::vector<seastar::metrics::relabel_config> get_relable_from_yaml(const YAML::Node& yaml, const std::string& name) {

View File

@@ -15,7 +15,7 @@
#include <seastar/util/program-options.hh>
#include <seastar/util/log.hh>
#include "locator/abstract_replication_strategy.hh"
#include "locator/replication_strategy_type.hh"
#include "seastarx.hh"
#include "utils/config_file.hh"
#include "utils/enum_option.hh"

View File

@@ -15,10 +15,11 @@
#include <utility>
#include <vector>
#include "db/view/view_build_status.hh"
#include "gms/gossiper.hh"
#include "gms/inet_address.hh"
#include "gms/generation-number.hh"
#include "gms/loaded_endpoint_state.hh"
#include "schema/schema_fwd.hh"
#include "utils/UUID.hh"
#include "query/query-result-set.hh"
#include "db_clock.hh"
#include "mutation_query.hh"
#include "system_keyspace_view_types.hh"
@@ -36,6 +37,10 @@ namespace netw {
class shared_dict;
};
namespace query {
class result_set;
}
namespace sstables {
struct entry_descriptor;
class generation_type;

View File

@@ -29,6 +29,8 @@
#include "db/config.hh"
#include "db/view/base_info.hh"
#include "gms/gossiper.hh"
#include "query/query-result-set.hh"
#include "db/view/view_build_status.hh"
#include "db/view/view_consumer.hh"
#include "mutation/canonical_mutation.hh"

View File

@@ -13,6 +13,7 @@
#include <seastar/core/abort_source.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/core/on_internal_error.hh>
#include "gms/gossiper.hh"
#include "db/view/view_building_coordinator.hh"
#include "db/view/view_build_status.hh"
#include "locator/tablets.hh"

View File

@@ -13,7 +13,7 @@
#include "db/system_keyspace.hh"
#include "locator/tablets.hh"
#include "mutation/canonical_mutation.hh"
#include "raft/raft.hh"
#include "raft/raft_fwd.hh"
#include "service/endpoint_lifecycle_subscriber.hh"
#include "service/raft/raft_group0.hh"
#include "service/raft/raft_group0_client.hh"

View File

@@ -21,6 +21,8 @@
#include "dht/token.hh"
#include "replica/database.hh"
#include "service/storage_proxy.hh"
#include "service/storage_service.hh"
#include "db/system_keyspace.hh"
#include "service/raft/raft_group0_client.hh"
#include "service/raft/raft_group0.hh"
#include "schema/schema_fwd.hh"

View File

@@ -17,7 +17,7 @@
#include <flat_set>
#include "locator/abstract_replication_strategy.hh"
#include "locator/tablets.hh"
#include "raft/raft.hh"
#include "raft/raft_fwd.hh"
#include <seastar/core/gate.hh>
#include "db/view/view_building_state.hh"
#include "sstables/shared_sstable.hh"

View File

@@ -20,6 +20,7 @@
#include "cdc/metadata.hh"
#include "db/config.hh"
#include "db/system_keyspace.hh"
#include "query/query-result-set.hh"
#include "db/virtual_table.hh"
#include "partition_slice_builder.hh"
#include "db/virtual_tables.hh"

View File

@@ -34,6 +34,7 @@
#include "locator/token_metadata.hh"
#include "locator/types.hh"
#include "gms/gossip_address_map.hh"
#include "gms/loaded_endpoint_state.hh"
namespace gms {
@@ -71,11 +72,6 @@ struct gossip_config {
utils::updateable_value<utils::UUID> recovery_leader;
};
struct loaded_endpoint_state {
gms::inet_address endpoint;
std::optional<locator::endpoint_dc_rack> opt_dc_rack;
};
/**
* This module is responsible for Gossiping information for the local endpoint. This abstraction
* maintains the list of live and dead endpoints. Periodically i.e. every 1 second this module

View File

@@ -0,0 +1,23 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
*/
#pragma once
#include <optional>
#include "gms/inet_address.hh"
#include "locator/types.hh"
namespace gms {
struct loaded_endpoint_state {
inet_address endpoint;
std::optional<locator::endpoint_dc_rack> opt_dc_rack;
};
} // namespace gms

View File

@@ -284,14 +284,3 @@ future<> instance_cache::stop() {
}
}
namespace std {
template <>
struct equal_to<seastar::scheduling_group> {
bool operator()(seastar::scheduling_group& sg1, seastar::scheduling_group& sg2) const noexcept {
return sg1 == sg2;
}
};
}

View File

@@ -19,6 +19,7 @@
#include "utils/sequenced_set.hh"
#include "utils/simple_hashers.hh"
#include "tablets.hh"
#include "locator/replication_strategy_type.hh"
#include "data_dictionary/consistency_config_options.hh"
// forward declaration since replica/database.hh includes this file
@@ -38,13 +39,6 @@ extern logging::logger rslogger;
using inet_address = gms::inet_address;
using token = dht::token;
enum class replication_strategy_type {
simple,
local,
network_topology,
everywhere_topology,
};
using replication_strategy_config_option = std::variant<sstring, rack_list>;
using replication_strategy_config_options = std::map<sstring, replication_strategy_config_option>;

View File

@@ -0,0 +1,20 @@
/*
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
*/
#pragma once
namespace locator {
enum class replication_strategy_type {
simple,
local,
network_topology,
everywhere_topology,
};
} // namespace locator

View File

@@ -21,10 +21,9 @@
#include "utils/chunked_vector.hh"
#include "utils/hash.hh"
#include "utils/UUID.hh"
#include "raft/raft.hh"
#include "raft/raft_fwd.hh"
#include <ranges>
#include <seastar/core/reactor.hh>
#include <seastar/util/log.hh>
#include <seastar/core/sharded.hh>
#include <seastar/util/noncopyable_function.hh>

27
raft/raft_fwd.hh Normal file
View File

@@ -0,0 +1,27 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
*/
#pragma once
// Lightweight forward-declaration header for commonly used raft types.
// Include this instead of raft/raft.hh when only the basic ID/index types
// are needed (e.g. in other header files), to avoid pulling in the full
// raft machinery (futures, abort_source, bytes_ostream, etc.).
#include "internal.hh"
namespace raft {
using server_id = internal::tagged_id<struct server_id_tag>;
using group_id = internal::tagged_id<struct group_id_tag>;
using term_t = internal::tagged_uint64<struct term_tag>;
using index_t = internal::tagged_uint64<struct index_tag>;
using read_id = internal::tagged_uint64<struct read_id_tag>;
class server;
} // namespace raft

View File

@@ -69,6 +69,13 @@ struct segment_descriptor : public log_heap_hook<segment_descriptor_hist_options
}
};
} // namespace replica::logstor
template<>
size_t hist_key<replica::logstor::segment_descriptor>(const replica::logstor::segment_descriptor& desc);
namespace replica::logstor {
using segment_descriptor_hist = log_heap<segment_descriptor, segment_descriptor_hist_options>;
struct segment_set {

View File

@@ -16,6 +16,7 @@
#include "service/paxos/paxos_state.hh"
#include "service/query_state.hh"
#include "cql3/query_processor.hh"
#include "transport/messages/result_message.hh"
#include "cql3/untyped_result_set.hh"
#include "db/system_keyspace.hh"
#include "replica/database.hh"

View File

@@ -6,6 +6,7 @@
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
*/
#pragma once
#include <unordered_set>
#include "service/raft/group0_fwd.hh"
namespace service {

View File

@@ -9,7 +9,11 @@
#pragma once
#include <iosfwd>
#include "raft/raft.hh"
#include <variant>
#include <vector>
#include <seastar/core/timer.hh>
#include <seastar/core/lowres_clock.hh>
#include "raft/raft_fwd.hh"
#include "gms/inet_address.hh"
namespace service {

View File

@@ -38,7 +38,6 @@
#include "replica/exceptions.hh"
#include "locator/host_id.hh"
#include "dht/token_range_endpoints.hh"
#include "service/storage_service.hh"
#include "service/cas_shard.hh"
#include "service/storage_proxy_fwd.hh"

View File

@@ -15,6 +15,7 @@
#include <seastar/core/shared_future.hh>
#include "absl-flat_hash_map.hh"
#include "gms/endpoint_state.hh"
#include "gms/gossip_address_map.hh"
#include "gms/i_endpoint_state_change_subscriber.hh"
#include "schema/schema_fwd.hh"
#include "service/client_routes.hh"
@@ -40,11 +41,9 @@
#include <seastar/core/shared_ptr.hh>
#include "cdc/generation_id.hh"
#include "db/system_keyspace.hh"
#include "raft/raft.hh"
#include "raft/raft_fwd.hh"
#include "node_ops/id.hh"
#include "raft/server.hh"
#include "db/view/view_building_state.hh"
#include "service/tablet_allocator.hh"
#include "service/tablet_operation.hh"
#include "mutation/timestamp.hh"
#include "utils/UUID.hh"
@@ -115,6 +114,10 @@ class tablet_mutation_builder;
namespace auth { class cache; }
namespace service {
class tablet_allocator;
}
namespace utils {
class disk_space_monitor;
}

View File

@@ -17,7 +17,7 @@
#include <seastar/core/metrics.hh>
#include "utils/log.hh"
#include "raft/raft.hh"
#include "raft/raft_fwd.hh"
#include "service/endpoint_lifecycle_subscriber.hh"
#include "service/topology_state_machine.hh"
#include "db/view/view_building_state.hh"

View File

@@ -16,7 +16,7 @@
#include <seastar/core/sstring.hh>
#include "cdc/generation_id.hh"
#include "dht/token.hh"
#include "raft/raft.hh"
#include "raft/raft_fwd.hh"
#include "utils/UUID.hh"
#include "service/session.hh"
#include "mutation/canonical_mutation.hh"

View File

@@ -254,7 +254,6 @@
#include <seastar/coroutine/generator.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/http/api_docs.hh>
#include <seastar/http/client.hh>
#include <seastar/http/common.hh>
#include <seastar/http/connection_factory.hh>
@@ -401,4 +400,44 @@
#define ZSTD_STATIC_LINKING_ONLY
#include <zstd.h>
// Scylla internal headers included by most translation units
#include "bytes.hh"
#include "seastarx.hh"
#include "utils/UUID.hh"
#include "utils/tagged_integer.hh"
#include "schema/schema_fwd.hh"
#include "utils/managed_bytes.hh"
#include "dht/token.hh"
#include "locator/host_id.hh"
#include "gms/inet_address.hh"
#include "utils/chunked_vector.hh"
#include "utils/fragment_range.hh"
#include "types/types.hh"
#include "keys/keys.hh"
#include "schema/schema.hh"
#include "db/timeout_clock.hh"
#include "mutation/mutation_partition.hh"
#include "mutation/mutation_fragment.hh"
#include "db_clock.hh"
#include "gc_clock.hh"
#include "locator/token_metadata_fwd.hh"
#include "locator/types.hh"
#include "locator/token_metadata.hh"
#include "gms/gossiper.hh"
#include "db/system_keyspace.hh"
#include "service/topology_state_machine.hh"
#include "cql3/query_options.hh"
#include "service/client_state.hh"
#include "cql3/query_processor.hh"
#include "db/config.hh"
#include "service/storage_proxy.hh"
#include "schema/schema_builder.hh"
#include "exceptions/exceptions.hh"
#include "gms/feature_service.hh"
#include "service/migration_manager.hh"
#include "sstables/sstables.hh"
#include "service/storage_service.hh"
#include "transport/messages/result_message.hh"
#include "replica/database.hh"
#endif

View File

@@ -13,6 +13,7 @@
#include <seastar/coroutine/parallel_for_each.hh>
#include "table_helper.hh"
#include "cql3/query_processor.hh"
#include "transport/messages/result_message.hh"
#include "cql3/statements/create_table_statement.hh"
#include "cql3/statements/modification_statement.hh"
#include "replica/database.hh"

View File

@@ -13,6 +13,7 @@
#include <seastar/core/gate.hh>
#include <seastar/core/when_all.hh>
#include <seastar/rpc/rpc_types.hh>
#include "gms/gossiper.hh"
#include <seastar/util/defer.hh>

View File

@@ -13,6 +13,7 @@
#undef SEASTAR_TESTING_MAIN
#include <seastar/testing/test_case.hh>
#include "test/lib/cql_assertions.hh"
#include "db/system_keyspace.hh"
#include "test/lib/cql_test_env.hh"
#include "test/lib/error_injection.hh"
#include "test/lib/log.hh"

View File

@@ -11,6 +11,7 @@
#undef SEASTAR_TESTING_MAIN
#include <seastar/testing/test_case.hh>
#include "test/lib/cql_test_env.hh"
#include "transport/messages/result_message.hh"
#include "db/commitlog/commitlog_replayer.hh"
#include "db/commitlog/commitlog.hh"
#include "db/config.hh"

View File

@@ -9,6 +9,7 @@
#include "mutation/mutation.hh"
#include "service/storage_proxy.hh"
#include "cql3/selection/selection.hh"
BOOST_AUTO_TEST_SUITE(per_partition_rate_limit_test)

View File

@@ -18,6 +18,8 @@
#include "test/lib/cql_test_env.hh"
#include "test/lib/cql_assertions.hh"
#include "transport/messages/result_message.hh"
#include "cql3/result_set.hh"
#include "service/migration_manager.hh"
#include "service/storage_proxy.hh"
#include "schema/schema_builder.hh"

View File

@@ -18,6 +18,7 @@
#include "test/lib/random_utils.hh"
#include "service/topology_mutation.hh"
#include "service/storage_service.hh"
#include "gms/gossiper.hh"
#include <fmt/ranges.h>
#include <seastar/testing/thread_test_case.hh>
#include <seastar/testing/on_internal_error.hh>

View File

@@ -19,6 +19,7 @@
#include "cdc/generation_service.hh"
#include "cql3/functions/functions.hh"
#include "cql3/query_processor.hh"
#include "transport/messages/result_message.hh"
#include "cql3/query_options.hh"
#include "cql3/statements/batch_statement.hh"
#include "cql3/statements/modification_statement.hh"

View File

@@ -11,6 +11,8 @@
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/core/temporary_buffer.hh>
#include <seastar/net/socket_defs.hh>
#include <seastar/net/api.hh>
#include <seastar/core/seastar.hh>
#include <seastar/core/future.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/thread.hh>

View File

@@ -26,6 +26,7 @@
#include "db/config.hh"
#include "schema/schema_builder.hh"
#include "service/storage_proxy.hh"
#include "service/storage_service.hh"
#include "db/system_keyspace.hh"
#include "tools/utils.hh"

View File

@@ -14,6 +14,7 @@
#include "vector_search/vector_store_client.hh"
#include "vs_mock_server.hh"
#include "test/lib/cql_test_env.hh"
#include "transport/messages/result_message.hh"
#include "utils/rjson.hh"
#include <boost/test/tools/old/interface.hpp>
#include <seastar/core/seastar.hh>

View File

@@ -9,6 +9,7 @@
#include "tools/load_system_tablets.hh"
#include <seastar/core/thread.hh>
#include "query/query-result-set.hh"
#include <seastar/util/closeable.hh>
#include "locator/abstract_replication_strategy.hh"

View File

@@ -1687,7 +1687,7 @@ process_batch_internal(service::client_state& client_state, sharded<cql3::query_
std::vector<cql3::statements::batch_statement::single_statement> modifications;
std::vector<cql3::raw_value_view_vector_with_unset> values;
std::unordered_map<cql3::prepared_cache_key_type, cql3::authorized_prepared_statements_cache::value_type> pending_authorization_entries;
std::unordered_map<cql3::prepared_cache_key_type, cql3::statements::prepared_statement::checked_weak_ptr> pending_authorization_entries;
modifications.reserve(n.assume_value());
values.reserve(n.assume_value());

View File

@@ -566,7 +566,7 @@ public:
}
if (need_preempt() && i != _cache.end()) {
auto key = i->idx;
co_await coroutine::maybe_yield();
co_await seastar::coroutine::maybe_yield();
i = _cache.lower_bound(key);
}
}

View File

@@ -360,24 +360,20 @@ utils::config_file::configs utils::config_file::unset_values() const {
}
future<> utils::config_file::read_from_file(file f, error_handler h) {
return f.size().then([this, f, h](size_t s) {
return do_with(make_file_input_stream(f), [this, s, h](input_stream<char>& in) {
return in.read_exactly(s).then([this, h](temporary_buffer<char> buf) {
read_from_yaml(sstring(buf.begin(), buf.end()), h);
if (!_initialization_completed) {
// Boolean value set on only one shard, but broadcast_to_all_shards().get() called later
// in main.cc will apply the required memory barriers anyway.
_initialization_completed = true;
}
});
});
});
auto s = co_await f.size();
auto in = make_file_input_stream(f);
auto buf = co_await in.read_exactly(s);
read_from_yaml(sstring(buf.begin(), buf.end()), h);
if (!_initialization_completed) {
// Boolean value set on only one shard, but broadcast_to_all_shards().get() called later
// in main.cc will apply the required memory barriers anyway.
_initialization_completed = true;
}
}
future<> utils::config_file::read_from_file(const sstring& filename, error_handler h) {
return open_file_dma(filename, open_flags::ro).then([this, h](file f) {
return read_from_file(std::move(f), h);
});
auto f = co_await open_file_dma(filename, open_flags::ro);
co_await read_from_file(std::move(f), h);
}
future<> utils::config_file::broadcast_to_all_shards() {

View File

@@ -83,7 +83,7 @@ directories::directories(bool developer_mode)
future<> directories::create_and_verify(directories::set dir_set, recursive recursive) {
std::vector<file_lock> locks;
locks.reserve(dir_set.get_paths().size());
co_await coroutine::parallel_for_each(dir_set.get_paths(), [this, &locks, recursive] (fs::path path) -> future<> {
co_await seastar::coroutine::parallel_for_each(dir_set.get_paths(), [this, &locks, recursive] (fs::path path) -> future<> {
file_lock lock = co_await touch_and_lock(path);
locks.emplace_back(std::move(lock));
co_await disk_sanity(path, _developer_mode);
@@ -144,7 +144,7 @@ future<> directories::do_verify_owner_and_mode(fs::path path, recursive recurse,
co_return;
}
auto lister = directory_lister(path, lister::dir_entry_types::full(), do_verify_subpath, lister::show_hidden::no);
co_await with_closeable(std::move(lister), coroutine::lambda([&] (auto& lister) -> future<> {
co_await with_closeable(std::move(lister), seastar::coroutine::lambda([&] (auto& lister) -> future<> {
while (auto de = co_await lister.get()) {
co_await do_verify_owner_and_mode(path / de->name, recurse, level + 1, do_verify_subpath);
}
@@ -168,7 +168,7 @@ future<> directories::verify_owner_and_mode(fs::path path, recursive recursive)
future<> directories::verify_owner_and_mode_of_data_dir(directories::set dir_set) {
// verify data and index files in the first iteration and the other files in the second iteration.
for (auto verify_data_and_index_files : { true, false }) {
co_await coroutine::parallel_for_each(dir_set.get_paths(), [verify_data_and_index_files] (const auto &path) {
co_await seastar::coroutine::parallel_for_each(dir_set.get_paths(), [verify_data_and_index_files] (const auto &path) {
return do_verify_owner_and_mode(std::move(path), recursive::yes, 0, [verify_data_and_index_files] (const fs::path& dir, const directory_entry& de) {
auto path = dir / de.name;
component_type path_component_type;

View File

@@ -335,7 +335,7 @@ utils::gcp::storage::client::impl::send_with_retry(const std::string& path, cons
} catch (...) {
// just disregard the failure, we will retry below in the wrapped handler
}
auto wrapped_handler = [this, handler = std::move(handler), &req, scope](const reply& rep, input_stream<char>& in) -> future<> {
auto wrapped_handler = [this, handler = std::move(handler), &req, scope](const reply& rep, seastar::input_stream<char>& in) -> future<> {
auto _in = std::move(in);
auto status_class = reply::classify_status(rep._status);
/*
@@ -352,7 +352,7 @@ utils::gcp::storage::client::impl::send_with_retry(const std::string& path, cons
auto content = co_await util::read_entire_stream_contiguous(_in);
auto error_msg = get_gcp_error_message(std::string_view(content));
gcp_storage.debug("Got unexpected response status: {}, content: {}", rep._status, content);
co_await coroutine::return_exception_ptr(std::make_exception_ptr(httpd::unexpected_status_error(rep._status)));
co_await seastar::coroutine::return_exception_ptr(std::make_exception_ptr(httpd::unexpected_status_error(rep._status)));
}
std::exception_ptr eptr;
@@ -366,7 +366,7 @@ utils::gcp::storage::client::impl::send_with_retry(const std::string& path, cons
eptr = std::current_exception();
}
if (eptr) {
co_await coroutine::return_exception_ptr(std::move(eptr));
co_await seastar::coroutine::return_exception_ptr(std::move(eptr));
}
};
object_storage_retry_strategy retry_strategy(10,10ms,10000ms, as);

View File

@@ -84,7 +84,7 @@ future<std::optional<directory_entry>> directory_lister::get() {
_gen.emplace(_opened.experimental_list_directory());
}
if (!_gen) {
co_return coroutine::exception(std::make_exception_ptr(seastar::broken_pipe_exception()));
co_return seastar::coroutine::exception(std::make_exception_ptr(seastar::broken_pipe_exception()));
}
std::exception_ptr ex;
try {
@@ -114,7 +114,7 @@ future<std::optional<directory_entry>> directory_lister::get() {
_gen.reset();
if (ex) {
co_await _opened.close();
co_return coroutine::exception(std::move(ex));
co_return seastar::coroutine::exception(std::move(ex));
}
co_return std::nullopt;
}

View File

@@ -438,7 +438,7 @@ private:
break;
}
_reclaim(free_memory_threshold - memory::free_memory());
co_await coroutine::maybe_yield();
co_await seastar::coroutine::maybe_yield();
}
llogger.debug("background_reclaimer::main_loop: exit");
}

View File

@@ -136,7 +136,7 @@ public:
_map.erase(_map.begin());
}
if (++ret % evictions_per_yield == 0) {
co_await coroutine::maybe_yield();
co_await seastar::coroutine::maybe_yield();
}
}
co_return ret;

View File

@@ -8,6 +8,7 @@
#include <fmt/format.h>
#include <exception>
#include "utils/exceptions.hh"
#include <initializer_list>
#include <memory>
#include <regex>
@@ -339,7 +340,7 @@ http::experimental::client::reply_handler client::wrap_handler(http::request& re
should_retry = utils::http::retryable::yes;
co_await authorize(request);
}
co_await coroutine::return_exception_ptr(std::make_exception_ptr(
co_await seastar::coroutine::return_exception_ptr(std::make_exception_ptr(
aws::aws_exception(aws_error(possible_error->get_error_type(), possible_error->get_error_message().c_str(), should_retry))));
}
@@ -358,7 +359,7 @@ http::experimental::client::reply_handler client::wrap_handler(http::request& re
eptr = std::current_exception();
}
if (eptr) {
co_await coroutine::return_exception_ptr(std::make_exception_ptr(aws::aws_exception(aws_error::from_exception_ptr(eptr))));
co_await seastar::coroutine::return_exception_ptr(std::make_exception_ptr(aws::aws_exception(aws_error::from_exception_ptr(eptr))));
}
};
}
@@ -539,7 +540,7 @@ future<> client::put_object_tagging(sstring object_name, tag_set tagging, seasta
}
co_await output.close();
if (ex) {
co_await coroutine::return_exception_ptr(std::move(ex));
co_await seastar::coroutine::return_exception_ptr(std::move(ex));
}
});
co_await make_request(std::move(req), ignore_reply, http::reply::status_type::ok, as);
@@ -609,7 +610,7 @@ future<> client::put_object(sstring object_name, temporary_buffer<char> buf, sea
}
co_await out.close();
if (ex) {
co_await coroutine::return_exception_ptr(std::move(ex));
co_await seastar::coroutine::return_exception_ptr(std::move(ex));
}
});
co_await make_request(std::move(req), [len, start = s3_clock::now()] (group_client& gc, const auto& rep, auto&& in) {
@@ -635,7 +636,7 @@ future<> client::put_object(sstring object_name, ::memory_data_sink_buffers bufs
}
co_await out.close();
if (ex) {
co_await coroutine::return_exception_ptr(std::move(ex));
co_await seastar::coroutine::return_exception_ptr(std::move(ex));
}
});
co_await make_request(std::move(req), [len, start = s3_clock::now()] (group_client& gc, const auto& rep, auto&& in) {
@@ -885,7 +886,7 @@ future<> dump_multipart_upload_parts(output_stream<char> out, const utils::chunk
}
co_await out.close();
if (ex) {
co_await coroutine::return_exception_ptr(std::move(ex));
co_await seastar::coroutine::return_exception_ptr(std::move(ex));
}
}
@@ -901,7 +902,7 @@ future<> client::multipart_upload::start_upload() {
auto body = co_await util::read_entire_stream_contiguous(in);
_upload_id = parse_multipart_upload_id(body);
if (_upload_id.empty()) {
co_await coroutine::return_exception(std::runtime_error("cannot initiate upload"));
co_await seastar::coroutine::return_exception(std::runtime_error("cannot initiate upload"));
}
s3l.trace("created uploads for {} -> id = {}", _object_name, _upload_id);
}, http::reply::status_type::ok, _as);
@@ -936,7 +937,7 @@ future<> client::multipart_upload::upload_part(memory_data_sink_buffers bufs) {
}
co_await out.close();
if (ex) {
co_await coroutine::return_exception_ptr(std::move(ex));
co_await seastar::coroutine::return_exception_ptr(std::move(ex));
}
// note: At this point the buffers are sent, but the response is not yet
// received. However, claim is released and next part may start uploading
@@ -985,7 +986,7 @@ future<> client::multipart_upload::finalize_upload() {
unsigned parts_xml_len = prepare_multipart_upload_parts(_part_etags);
if (parts_xml_len == 0) {
co_await coroutine::return_exception(std::runtime_error("Failed to parse ETag list. Aborting multipart upload."));
co_await seastar::coroutine::return_exception(std::runtime_error("Failed to parse ETag list. Aborting multipart upload."));
}
s3l.trace("POST upload completion {} parts (upload id {})", _part_etags.size(), _upload_id);
@@ -1001,15 +1002,15 @@ future<> client::multipart_upload::finalize_upload() {
auto status_class = http::reply::classify_status(rep._status);
std::optional<aws::aws_error> possible_error = aws::aws_error::parse(co_await util::read_entire_stream_contiguous(payload));
if (possible_error) {
co_await coroutine::return_exception(aws::aws_exception(std::move(possible_error.value())));
co_await seastar::coroutine::return_exception(aws::aws_exception(std::move(possible_error.value())));
}
if (status_class != http::reply::status_class::informational && status_class != http::reply::status_class::success) {
co_await coroutine::return_exception(aws::aws_exception(aws::aws_error::from_http_code(rep._status)));
co_await seastar::coroutine::return_exception(aws::aws_exception(aws::aws_error::from_http_code(rep._status)));
}
if (rep._status != http::reply::status_type::ok) {
co_await coroutine::return_exception(httpd::unexpected_status_error(rep._status));
co_await seastar::coroutine::return_exception(httpd::unexpected_status_error(rep._status));
}
// If we reach this point it means the request succeeded. However, the body payload was already consumed, so no response handler was invoked. At
// this point it is ok since we are not interested in parsing this particular response
@@ -1443,7 +1444,7 @@ auto client::download_source::request_body() -> future<external_body> {
(void)_client->make_request(std::move(req), [this, &p] (group_client& gc, const http::reply& rep, input_stream<char>&& in_) mutable -> future<> {
s3l.trace("GET {} got the body ({} {} bytes)", _object_name, rep._status, rep.content_length);
if (rep._status != http::reply::status_type::partial_content && rep._status != http::reply::status_type::ok) {
co_await coroutine::return_exception(httpd::unexpected_status_error(rep._status));
co_await seastar::coroutine::return_exception(httpd::unexpected_status_error(rep._status));
}
auto in = std::move(in_);
@@ -1533,7 +1534,7 @@ class client::do_upload_file : private multipart_upload {
co_await output.close();
co_await input.close();
if (ex) {
co_await coroutine::return_exception_ptr(std::move(ex));
co_await seastar::coroutine::return_exception_ptr(std::move(ex));
}
}
@@ -1805,7 +1806,7 @@ future<> client::close() {
_creds_invalidation_timer.cancel();
_creds_update_timer.cancel();
}
co_await coroutine::parallel_for_each(_https, [] (auto& it) -> future<> {
co_await seastar::coroutine::parallel_for_each(_https, [] (auto& it) -> future<> {
co_await it.second.http.close();
});
}
@@ -1914,7 +1915,7 @@ future<std::optional<directory_entry>> client::bucket_lister::get() {
}
co_await close();
if (ex) {
co_return coroutine::exception(std::move(ex));
co_return seastar::coroutine::exception(std::move(ex));
}
co_return std::nullopt;
}