mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-19 16:15:07 +00:00
Compare commits
19 Commits
per_sl_cou
...
ykaul/comp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7b17429d99 | ||
|
|
8f491eb7c7 | ||
|
|
b9823f3053 | ||
|
|
3e6d959867 | ||
|
|
75c4fe1f33 | ||
|
|
28e59bae5a | ||
|
|
66618cd869 | ||
|
|
b4586f0789 | ||
|
|
37280265ef | ||
|
|
2fbba4a071 | ||
|
|
be5fa64d36 | ||
|
|
5c918d29cc | ||
|
|
43e337a663 | ||
|
|
a67efb031c | ||
|
|
5b0933c453 | ||
|
|
2ac834d797 | ||
|
|
b324c84a04 | ||
|
|
b499dc8e9d | ||
|
|
8ad8e76c3b |
@@ -234,6 +234,9 @@ generate_scylla_version()
|
||||
|
||||
option(Scylla_USE_PRECOMPILED_HEADER "Use precompiled header for Scylla" ON)
|
||||
add_library(scylla-precompiled-header STATIC exported_templates.cc)
|
||||
target_include_directories(scylla-precompiled-header PRIVATE
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}"
|
||||
"${scylla_gen_build_dir}")
|
||||
target_link_libraries(scylla-precompiled-header PRIVATE
|
||||
absl::headers
|
||||
absl::btree
|
||||
|
||||
19
configure.py
19
configure.py
@@ -2766,6 +2766,25 @@ def write_build_file(f,
|
||||
f.write('build {}: rust_source {}\n'.format(cc, src))
|
||||
obj = cc.replace('.cc', '.o')
|
||||
compiles[obj] = cc
|
||||
# Sources shared between scylla (compiled with PCH) and small tests
|
||||
# (with custom deps and partial link sets) must not use the PCH,
|
||||
# because -fpch-instantiate-templates injects symbol references that
|
||||
# the small test link sets cannot satisfy.
|
||||
small_test_srcs = set()
|
||||
for test_binary, test_deps in deps.items():
|
||||
if not test_binary.startswith('test/'):
|
||||
continue
|
||||
# Only exclude PCH for tests with truly small/partial link sets.
|
||||
# Tests that include scylla_core or similar large dep sets link
|
||||
# against enough objects to satisfy PCH-injected symbol refs.
|
||||
if len(test_deps) > 50:
|
||||
continue
|
||||
for src in test_deps:
|
||||
if src.endswith('.cc'):
|
||||
small_test_srcs.add(src)
|
||||
for src in small_test_srcs:
|
||||
obj = '$builddir/' + mode + '/' + src.replace('.cc', '.o')
|
||||
compiles_with_pch.discard(obj)
|
||||
for obj in compiles:
|
||||
src = compiles[obj]
|
||||
seastar_dep = f'$builddir/{mode}/seastar/libseastar.{seastar_lib_ext}'
|
||||
|
||||
84
cql3/prepared_cache_key_type.hh
Normal file
84
cql3/prepared_cache_key_type.hh
Normal file
@@ -0,0 +1,84 @@
|
||||
/*
|
||||
* Copyright (C) 2017-present ScyllaDB
|
||||
*
|
||||
* Modified by ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.1 and Apache-2.0)
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "bytes.hh"
|
||||
#include "utils/hash.hh"
|
||||
#include "cql3/dialect.hh"
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
typedef bytes cql_prepared_id_type;
|
||||
|
||||
/// \brief The key of the prepared statements cache
|
||||
///
|
||||
/// TODO: consolidate prepared_cache_key_type and the nested cache_key_type
|
||||
/// the latter was introduced for unifying the CQL and Thrift prepared
|
||||
/// statements so that they can be stored in the same cache.
|
||||
class prepared_cache_key_type {
|
||||
public:
|
||||
// derive from cql_prepared_id_type so we can customize the formatter of
|
||||
// cache_key_type
|
||||
struct cache_key_type : public cql_prepared_id_type {
|
||||
cache_key_type(cql_prepared_id_type&& id, cql3::dialect d) : cql_prepared_id_type(std::move(id)), dialect(d) {}
|
||||
cql3::dialect dialect; // Not part of hash, but we don't expect collisions because of that
|
||||
bool operator==(const cache_key_type& other) const = default;
|
||||
};
|
||||
|
||||
private:
|
||||
cache_key_type _key;
|
||||
|
||||
public:
|
||||
explicit prepared_cache_key_type(cql_prepared_id_type cql_id, dialect d) : _key(std::move(cql_id), d) {}
|
||||
|
||||
cache_key_type& key() { return _key; }
|
||||
const cache_key_type& key() const { return _key; }
|
||||
|
||||
static const cql_prepared_id_type& cql_id(const prepared_cache_key_type& key) {
|
||||
return key.key();
|
||||
}
|
||||
|
||||
bool operator==(const prepared_cache_key_type& other) const = default;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
namespace std {
|
||||
|
||||
template<>
|
||||
struct hash<cql3::prepared_cache_key_type::cache_key_type> final {
|
||||
size_t operator()(const cql3::prepared_cache_key_type::cache_key_type& k) const {
|
||||
return std::hash<cql3::cql_prepared_id_type>()(k);
|
||||
}
|
||||
};
|
||||
|
||||
template<>
|
||||
struct hash<cql3::prepared_cache_key_type> final {
|
||||
size_t operator()(const cql3::prepared_cache_key_type& k) const {
|
||||
return std::hash<cql3::cql_prepared_id_type>()(k.key());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// for prepared_statements_cache log printouts
|
||||
template <> struct fmt::formatter<cql3::prepared_cache_key_type::cache_key_type> {
|
||||
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
|
||||
auto format(const cql3::prepared_cache_key_type::cache_key_type& p, fmt::format_context& ctx) const {
|
||||
return fmt::format_to(ctx.out(), "{{cql_id: {}, dialect: {}}}", static_cast<const cql3::cql_prepared_id_type&>(p), p.dialect);
|
||||
}
|
||||
};
|
||||
|
||||
template <> struct fmt::formatter<cql3::prepared_cache_key_type> {
|
||||
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
|
||||
auto format(const cql3::prepared_cache_key_type& p, fmt::format_context& ctx) const {
|
||||
return fmt::format_to(ctx.out(), "{}", p.key());
|
||||
}
|
||||
};
|
||||
@@ -12,6 +12,7 @@
|
||||
|
||||
#include "utils/loading_cache.hh"
|
||||
#include "utils/hash.hh"
|
||||
#include "cql3/prepared_cache_key_type.hh"
|
||||
#include "cql3/statements/prepared_statement.hh"
|
||||
#include "cql3/column_specification.hh"
|
||||
#include "cql3/dialect.hh"
|
||||
@@ -27,39 +28,6 @@ struct prepared_cache_entry_size {
|
||||
}
|
||||
};
|
||||
|
||||
typedef bytes cql_prepared_id_type;
|
||||
|
||||
/// \brief The key of the prepared statements cache
|
||||
///
|
||||
/// TODO: consolidate prepared_cache_key_type and the nested cache_key_type
|
||||
/// the latter was introduced for unifying the CQL and Thrift prepared
|
||||
/// statements so that they can be stored in the same cache.
|
||||
class prepared_cache_key_type {
|
||||
public:
|
||||
// derive from cql_prepared_id_type so we can customize the formatter of
|
||||
// cache_key_type
|
||||
struct cache_key_type : public cql_prepared_id_type {
|
||||
cache_key_type(cql_prepared_id_type&& id, cql3::dialect d) : cql_prepared_id_type(std::move(id)), dialect(d) {}
|
||||
cql3::dialect dialect; // Not part of hash, but we don't expect collisions because of that
|
||||
bool operator==(const cache_key_type& other) const = default;
|
||||
};
|
||||
|
||||
private:
|
||||
cache_key_type _key;
|
||||
|
||||
public:
|
||||
explicit prepared_cache_key_type(cql_prepared_id_type cql_id, dialect d) : _key(std::move(cql_id), d) {}
|
||||
|
||||
cache_key_type& key() { return _key; }
|
||||
const cache_key_type& key() const { return _key; }
|
||||
|
||||
static const cql_prepared_id_type& cql_id(const prepared_cache_key_type& key) {
|
||||
return key.key();
|
||||
}
|
||||
|
||||
bool operator==(const prepared_cache_key_type& other) const = default;
|
||||
};
|
||||
|
||||
class prepared_statements_cache {
|
||||
public:
|
||||
struct stats {
|
||||
@@ -164,35 +132,3 @@ public:
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
namespace std {
|
||||
|
||||
template<>
|
||||
struct hash<cql3::prepared_cache_key_type::cache_key_type> final {
|
||||
size_t operator()(const cql3::prepared_cache_key_type::cache_key_type& k) const {
|
||||
return std::hash<cql3::cql_prepared_id_type>()(k);
|
||||
}
|
||||
};
|
||||
|
||||
template<>
|
||||
struct hash<cql3::prepared_cache_key_type> final {
|
||||
size_t operator()(const cql3::prepared_cache_key_type& k) const {
|
||||
return std::hash<cql3::cql_prepared_id_type>()(k.key());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// for prepared_statements_cache log printouts
|
||||
template <> struct fmt::formatter<cql3::prepared_cache_key_type::cache_key_type> {
|
||||
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
|
||||
auto format(const cql3::prepared_cache_key_type::cache_key_type& p, fmt::format_context& ctx) const {
|
||||
return fmt::format_to(ctx.out(), "{{cql_id: {}, dialect: {}}}", static_cast<const cql3::cql_prepared_id_type&>(p), p.dialect);
|
||||
}
|
||||
};
|
||||
|
||||
template <> struct fmt::formatter<cql3::prepared_cache_key_type> {
|
||||
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
|
||||
auto format(const cql3::prepared_cache_key_type& p, fmt::format_context& ctx) const {
|
||||
return fmt::format_to(ctx.out(), "{}", p.key());
|
||||
}
|
||||
};
|
||||
|
||||
@@ -17,6 +17,9 @@
|
||||
#include <seastar/coroutine/as_future.hh>
|
||||
#include <seastar/coroutine/try_future.hh>
|
||||
|
||||
#include "cql3/prepared_statements_cache.hh"
|
||||
#include "cql3/authorized_prepared_statements_cache.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "service/mapreduce_service.hh"
|
||||
@@ -77,7 +80,7 @@ static service::query_state query_state_for_internal_call() {
|
||||
return {service::client_state::for_internal_calls(), empty_service_permit()};
|
||||
}
|
||||
|
||||
query_processor::query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, vector_search::vector_store_client& vsc, query_processor::memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, lang::manager& langm)
|
||||
query_processor::query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, vector_search::vector_store_client& vsc, query_processor::memory_config mcfg, cql_config& cql_cfg, const utils::loading_cache_config& auth_prep_cache_cfg, lang::manager& langm)
|
||||
: _migration_subscriber{std::make_unique<migration_subscriber>(this)}
|
||||
, _proxy(proxy)
|
||||
, _db(db)
|
||||
@@ -85,8 +88,8 @@ query_processor::query_processor(service::storage_proxy& proxy, data_dictionary:
|
||||
, _vector_store_client(vsc)
|
||||
, _mcfg(mcfg)
|
||||
, _cql_config(cql_cfg)
|
||||
, _prepared_cache(prep_cache_log, _mcfg.prepared_statment_cache_size)
|
||||
, _authorized_prepared_cache(std::move(auth_prep_cache_cfg), authorized_prepared_statements_cache_log)
|
||||
, _prepared_cache(std::make_unique<prepared_statements_cache>(prep_cache_log, _mcfg.prepared_statment_cache_size))
|
||||
, _authorized_prepared_cache(std::make_unique<authorized_prepared_statements_cache>(auth_prep_cache_cfg, authorized_prepared_statements_cache_log))
|
||||
, _auth_prepared_cache_cfg_cb([this] (uint32_t) { (void) _authorized_prepared_cache_config_action.trigger_later(); })
|
||||
, _authorized_prepared_cache_config_action([this] { update_authorized_prepared_cache_config(); return make_ready_future<>(); })
|
||||
, _authorized_prepared_cache_update_interval_in_ms_observer(_db.get_config().permissions_update_interval_in_ms.observe(_auth_prepared_cache_cfg_cb))
|
||||
@@ -353,12 +356,12 @@ query_processor::query_processor(service::storage_proxy& proxy, data_dictionary:
|
||||
|
||||
sm::make_gauge(
|
||||
"prepared_cache_size",
|
||||
[this] { return _prepared_cache.size(); },
|
||||
[this] { return _prepared_cache->size(); },
|
||||
sm::description("A number of entries in the prepared statements cache.")),
|
||||
|
||||
sm::make_gauge(
|
||||
"prepared_cache_memory_footprint",
|
||||
[this] { return _prepared_cache.memory_footprint(); },
|
||||
[this] { return _prepared_cache->memory_footprint(); },
|
||||
sm::description("Size (in bytes) of the prepared statements cache.")),
|
||||
|
||||
sm::make_counter(
|
||||
@@ -449,12 +452,12 @@ query_processor::query_processor(service::storage_proxy& proxy, data_dictionary:
|
||||
|
||||
sm::make_gauge(
|
||||
"authorized_prepared_statements_cache_size",
|
||||
[this] { return _authorized_prepared_cache.size(); },
|
||||
[this] { return _authorized_prepared_cache->size(); },
|
||||
sm::description("Number of entries in the authenticated prepared statements cache.")),
|
||||
|
||||
sm::make_gauge(
|
||||
"user_prepared_auth_cache_footprint",
|
||||
[this] { return _authorized_prepared_cache.memory_footprint(); },
|
||||
[this] { return _authorized_prepared_cache->memory_footprint(); },
|
||||
sm::description("Size (in bytes) of the authenticated prepared statements cache.")),
|
||||
|
||||
sm::make_counter(
|
||||
@@ -554,6 +557,81 @@ query_processor::~query_processor() {
|
||||
}
|
||||
}
|
||||
|
||||
statements::prepared_statement::checked_weak_ptr query_processor::get_prepared(const std::optional<auth::authenticated_user>& user, const prepared_cache_key_type& key) {
|
||||
if (user) {
|
||||
auto vp = _authorized_prepared_cache->find(*user, key);
|
||||
if (vp) {
|
||||
try {
|
||||
// Touch the corresponding prepared_statements_cache entry to make sure its last_read timestamp
|
||||
// corresponds to the last time its value has been read.
|
||||
//
|
||||
// If we don't do this it may turn out that the most recently used prepared statement doesn't have
|
||||
// the newest last_read timestamp and can get evicted before the not-so-recently-read statement if
|
||||
// we need to create space in the prepared statements cache for a new entry.
|
||||
//
|
||||
// And this is going to trigger an eviction of the corresponding entry from the authorized_prepared_cache
|
||||
// breaking the LRU paradigm of these caches.
|
||||
_prepared_cache->touch(key);
|
||||
return vp->get()->checked_weak_from_this();
|
||||
} catch (seastar::checked_ptr_is_null_exception&) {
|
||||
// If the prepared statement got invalidated - remove the corresponding authorized_prepared_statements_cache entry as well.
|
||||
_authorized_prepared_cache->remove(*user, key);
|
||||
}
|
||||
}
|
||||
}
|
||||
return statements::prepared_statement::checked_weak_ptr();
|
||||
}
|
||||
|
||||
statements::prepared_statement::checked_weak_ptr query_processor::get_prepared(const prepared_cache_key_type& key) {
|
||||
return _prepared_cache->find(key);
|
||||
}
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
query_processor::execute_prepared(
|
||||
statements::prepared_statement::checked_weak_ptr statement,
|
||||
cql3::prepared_cache_key_type cache_key,
|
||||
service::query_state& query_state,
|
||||
const query_options& options,
|
||||
bool needs_authorization) {
|
||||
auto cql_statement = statement->statement;
|
||||
return execute_prepared_without_checking_exception_message(
|
||||
query_state,
|
||||
std::move(cql_statement),
|
||||
options,
|
||||
std::move(statement),
|
||||
std::move(cache_key),
|
||||
needs_authorization)
|
||||
.then(cql_transport::messages::propagate_exception_as_future<::shared_ptr<cql_transport::messages::result_message>>);
|
||||
}
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
query_processor::execute_direct(
|
||||
const std::string_view& query_string,
|
||||
service::query_state& query_state,
|
||||
dialect d,
|
||||
query_options& options) {
|
||||
return execute_direct_without_checking_exception_message(
|
||||
query_string,
|
||||
query_state,
|
||||
d,
|
||||
options)
|
||||
.then(cql_transport::messages::propagate_exception_as_future<::shared_ptr<cql_transport::messages::result_message>>);
|
||||
}
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
query_processor::execute_batch(
|
||||
::shared_ptr<statements::batch_statement> stmt,
|
||||
service::query_state& query_state,
|
||||
query_options& options,
|
||||
std::unordered_map<prepared_cache_key_type, statements::prepared_statement::checked_weak_ptr> pending_authorization_entries) {
|
||||
return execute_batch_without_checking_exception_message(
|
||||
std::move(stmt),
|
||||
query_state,
|
||||
options,
|
||||
std::move(pending_authorization_entries))
|
||||
.then(cql_transport::messages::propagate_exception_as_future<::shared_ptr<cql_transport::messages::result_message>>);
|
||||
}
|
||||
|
||||
std::pair<std::reference_wrapper<service::strong_consistency::coordinator>, gate::holder>
|
||||
query_processor::acquire_strongly_consistent_coordinator() {
|
||||
auto [remote_, holder] = remote();
|
||||
@@ -577,8 +655,8 @@ future<> query_processor::stop_remote() {
|
||||
|
||||
future<> query_processor::stop() {
|
||||
co_await _mnotifier.unregister_listener(_migration_subscriber.get());
|
||||
co_await _authorized_prepared_cache.stop();
|
||||
co_await _prepared_cache.stop();
|
||||
co_await _authorized_prepared_cache->stop();
|
||||
co_await _prepared_cache->stop();
|
||||
}
|
||||
|
||||
future<::shared_ptr<cql_transport::messages::result_message>> query_processor::execute_with_guard(
|
||||
@@ -697,7 +775,7 @@ query_processor::do_execute_prepared(
|
||||
if (needs_authorization) {
|
||||
co_await statement->check_access(*this, query_state.get_client_state());
|
||||
try {
|
||||
co_await _authorized_prepared_cache.insert(*query_state.get_client_state().user(), std::move(cache_key), std::move(prepared));
|
||||
co_await _authorized_prepared_cache->insert(*query_state.get_client_state().user(), std::move(cache_key), std::move(prepared));
|
||||
} catch (...) {
|
||||
log.error("failed to cache the entry: {}", std::current_exception());
|
||||
}
|
||||
@@ -733,7 +811,7 @@ future<::shared_ptr<cql_transport::messages::result_message::prepared>>
|
||||
query_processor::prepare(sstring query_string, const service::client_state& client_state, cql3::dialect d) {
|
||||
try {
|
||||
auto key = compute_id(query_string, client_state.get_raw_keyspace(), d);
|
||||
auto prep_entry = co_await _prepared_cache.get_pinned(key, [this, &query_string, &client_state, d] {
|
||||
auto prep_entry = co_await _prepared_cache->get_pinned(key, [this, &query_string, &client_state, d] {
|
||||
auto prepared = get_statement(query_string, client_state, d);
|
||||
prepared->calculate_metadata_id();
|
||||
auto bound_terms = prepared->statement->get_bound_terms();
|
||||
@@ -1069,11 +1147,11 @@ query_processor::execute_batch_without_checking_exception_message(
|
||||
::shared_ptr<statements::batch_statement> batch,
|
||||
service::query_state& query_state,
|
||||
query_options& options,
|
||||
std::unordered_map<prepared_cache_key_type, authorized_prepared_statements_cache::value_type> pending_authorization_entries) {
|
||||
std::unordered_map<prepared_cache_key_type, statements::prepared_statement::checked_weak_ptr> pending_authorization_entries) {
|
||||
auto access_future = co_await coroutine::as_future(batch->check_access(*this, query_state.get_client_state()));
|
||||
co_await coroutine::parallel_for_each(pending_authorization_entries, [this, &query_state] (auto& e) -> future<> {
|
||||
try {
|
||||
co_await _authorized_prepared_cache.insert(*query_state.get_client_state().user(), e.first, std::move(e.second));
|
||||
co_await _authorized_prepared_cache->insert(*query_state.get_client_state().user(), e.first, std::move(e.second));
|
||||
} catch (...) {
|
||||
log.error("failed to cache the entry: {}", std::current_exception());
|
||||
}
|
||||
@@ -1241,7 +1319,7 @@ void query_processor::migration_subscriber::on_drop_view(const sstring& ks_name,
|
||||
void query_processor::migration_subscriber::remove_invalid_prepared_statements(
|
||||
sstring ks_name,
|
||||
std::optional<sstring> cf_name) {
|
||||
_qp->_prepared_cache.remove_if([&] (::shared_ptr<cql_statement> stmt) {
|
||||
_qp->_prepared_cache->remove_if([&] (::shared_ptr<cql_statement> stmt) {
|
||||
return this->should_invalidate(ks_name, cf_name, stmt);
|
||||
});
|
||||
}
|
||||
@@ -1298,13 +1376,13 @@ void query_processor::update_authorized_prepared_cache_config() {
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(prepared_statements_cache::entry_expiry));
|
||||
cfg.refresh = std::chrono::milliseconds(_db.get_config().permissions_update_interval_in_ms());
|
||||
|
||||
if (!_authorized_prepared_cache.update_config(std::move(cfg))) {
|
||||
if (!_authorized_prepared_cache->update_config(std::move(cfg))) {
|
||||
log.error("Failed to apply authorized prepared statements cache changes. Please read the documentation of these parameters");
|
||||
}
|
||||
}
|
||||
|
||||
void query_processor::reset_cache() {
|
||||
_authorized_prepared_cache.reset();
|
||||
_authorized_prepared_cache->reset();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -17,15 +17,16 @@
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
|
||||
#include "cql3/prepared_statements_cache.hh"
|
||||
#include "cql3/authorized_prepared_statements_cache.hh"
|
||||
#include "cql3/prepared_cache_key_type.hh"
|
||||
#include "cql3/statements/prepared_statement.hh"
|
||||
#include "cql3/cql_statement.hh"
|
||||
#include "cql3/dialect.hh"
|
||||
#include "cql3/query_options.hh"
|
||||
#include "cql3/stats.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "service/migration_listener.hh"
|
||||
#include "mutation/timestamp.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "transport/messages/result_message_base.hh"
|
||||
#include "service/client_state.hh"
|
||||
#include "service/broadcast_tables/experimental/query_result.hh"
|
||||
#include "vector_search/vector_store_client.hh"
|
||||
@@ -41,6 +42,11 @@
|
||||
|
||||
|
||||
namespace lang { class manager; }
|
||||
namespace utils {
|
||||
template <typename Clock>
|
||||
struct loading_cache_config_base;
|
||||
using loading_cache_config = loading_cache_config_base<seastar::lowres_clock>;
|
||||
}
|
||||
namespace service {
|
||||
class migration_manager;
|
||||
class query_state;
|
||||
@@ -58,6 +64,9 @@ struct query;
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
class prepared_statements_cache;
|
||||
class authorized_prepared_statements_cache;
|
||||
|
||||
namespace statements {
|
||||
class batch_statement;
|
||||
class schema_altering_statement;
|
||||
@@ -132,8 +141,8 @@ private:
|
||||
|
||||
seastar::metrics::metric_groups _metrics;
|
||||
|
||||
prepared_statements_cache _prepared_cache;
|
||||
authorized_prepared_statements_cache _authorized_prepared_cache;
|
||||
std::unique_ptr<prepared_statements_cache> _prepared_cache;
|
||||
std::unique_ptr<authorized_prepared_statements_cache> _authorized_prepared_cache;
|
||||
|
||||
// Tracks the rolling maximum of gross bytes allocated during CQL parsing
|
||||
utils::rolling_max_tracker _parsing_cost_tracker{1000};
|
||||
@@ -184,7 +193,7 @@ public:
|
||||
static std::vector<std::unique_ptr<statements::raw::parsed_statement>> parse_statements(std::string_view queries, dialect d);
|
||||
|
||||
query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, vector_search::vector_store_client& vsc,
|
||||
memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, lang::manager& langm);
|
||||
memory_config mcfg, cql_config& cql_cfg, const utils::loading_cache_config& auth_prep_cache_cfg, lang::manager& langm);
|
||||
|
||||
~query_processor();
|
||||
|
||||
@@ -231,53 +240,17 @@ public:
|
||||
return _vector_store_client;
|
||||
}
|
||||
|
||||
statements::prepared_statement::checked_weak_ptr get_prepared(const std::optional<auth::authenticated_user>& user, const prepared_cache_key_type& key) {
|
||||
if (user) {
|
||||
auto vp = _authorized_prepared_cache.find(*user, key);
|
||||
if (vp) {
|
||||
try {
|
||||
// Touch the corresponding prepared_statements_cache entry to make sure its last_read timestamp
|
||||
// corresponds to the last time its value has been read.
|
||||
//
|
||||
// If we don't do this it may turn out that the most recently used prepared statement doesn't have
|
||||
// the newest last_read timestamp and can get evicted before the not-so-recently-read statement if
|
||||
// we need to create space in the prepared statements cache for a new entry.
|
||||
//
|
||||
// And this is going to trigger an eviction of the corresponding entry from the authorized_prepared_cache
|
||||
// breaking the LRU paradigm of these caches.
|
||||
_prepared_cache.touch(key);
|
||||
return vp->get()->checked_weak_from_this();
|
||||
} catch (seastar::checked_ptr_is_null_exception&) {
|
||||
// If the prepared statement got invalidated - remove the corresponding authorized_prepared_statements_cache entry as well.
|
||||
_authorized_prepared_cache.remove(*user, key);
|
||||
}
|
||||
}
|
||||
}
|
||||
return statements::prepared_statement::checked_weak_ptr();
|
||||
}
|
||||
statements::prepared_statement::checked_weak_ptr get_prepared(const std::optional<auth::authenticated_user>& user, const prepared_cache_key_type& key);
|
||||
|
||||
statements::prepared_statement::checked_weak_ptr get_prepared(const prepared_cache_key_type& key) {
|
||||
return _prepared_cache.find(key);
|
||||
}
|
||||
statements::prepared_statement::checked_weak_ptr get_prepared(const prepared_cache_key_type& key);
|
||||
|
||||
inline
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute_prepared(
|
||||
statements::prepared_statement::checked_weak_ptr statement,
|
||||
cql3::prepared_cache_key_type cache_key,
|
||||
service::query_state& query_state,
|
||||
const query_options& options,
|
||||
bool needs_authorization) {
|
||||
auto cql_statement = statement->statement;
|
||||
return execute_prepared_without_checking_exception_message(
|
||||
query_state,
|
||||
std::move(cql_statement),
|
||||
options,
|
||||
std::move(statement),
|
||||
std::move(cache_key),
|
||||
needs_authorization)
|
||||
.then(cql_transport::messages::propagate_exception_as_future<::shared_ptr<cql_transport::messages::result_message>>);
|
||||
}
|
||||
bool needs_authorization);
|
||||
|
||||
// Like execute_prepared, but is allowed to return exceptions as result_message::exception.
|
||||
// The result_message::exception must be explicitly handled.
|
||||
@@ -301,20 +274,12 @@ public:
|
||||
bool needs_authorization);
|
||||
|
||||
/// Execute a client statement that was not prepared.
|
||||
inline
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute_direct(
|
||||
const std::string_view& query_string,
|
||||
service::query_state& query_state,
|
||||
dialect d,
|
||||
query_options& options) {
|
||||
return execute_direct_without_checking_exception_message(
|
||||
query_string,
|
||||
query_state,
|
||||
d,
|
||||
options)
|
||||
.then(cql_transport::messages::propagate_exception_as_future<::shared_ptr<cql_transport::messages::result_message>>);
|
||||
}
|
||||
query_options& options);
|
||||
|
||||
// Like execute_direct, but is allowed to return exceptions as result_message::exception.
|
||||
// The result_message::exception must be explicitly handled.
|
||||
@@ -466,20 +431,12 @@ public:
|
||||
|
||||
future<> stop();
|
||||
|
||||
inline
|
||||
future<::shared_ptr<cql_transport::messages::result_message>>
|
||||
execute_batch(
|
||||
::shared_ptr<statements::batch_statement> stmt,
|
||||
service::query_state& query_state,
|
||||
query_options& options,
|
||||
std::unordered_map<prepared_cache_key_type, authorized_prepared_statements_cache::value_type> pending_authorization_entries) {
|
||||
return execute_batch_without_checking_exception_message(
|
||||
std::move(stmt),
|
||||
query_state,
|
||||
options,
|
||||
std::move(pending_authorization_entries))
|
||||
.then(cql_transport::messages::propagate_exception_as_future<::shared_ptr<cql_transport::messages::result_message>>);
|
||||
}
|
||||
std::unordered_map<prepared_cache_key_type, statements::prepared_statement::checked_weak_ptr> pending_authorization_entries);
|
||||
|
||||
// Like execute_batch, but is allowed to return exceptions as result_message::exception.
|
||||
// The result_message::exception must be explicitly handled.
|
||||
@@ -488,7 +445,7 @@ public:
|
||||
::shared_ptr<statements::batch_statement>,
|
||||
service::query_state& query_state,
|
||||
query_options& options,
|
||||
std::unordered_map<prepared_cache_key_type, authorized_prepared_statements_cache::value_type> pending_authorization_entries);
|
||||
std::unordered_map<prepared_cache_key_type, statements::prepared_statement::checked_weak_ptr> pending_authorization_entries);
|
||||
|
||||
future<service::broadcast_tables::query_result>
|
||||
execute_broadcast_table_query(const service::broadcast_tables::query&);
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
#include <seastar/core/execution_stage.hh>
|
||||
#include "cas_request.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "tracing/trace_state.hh"
|
||||
#include "utils/unique_view.hh"
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
#include "cql3/expr/evaluate.hh"
|
||||
#include "cql3/query_options.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "cql3/values.hh"
|
||||
#include "timeout_config.hh"
|
||||
#include "service/broadcast_tables/experimental/lang.hh"
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
#include "auth/service.hh"
|
||||
#include "cql3/statements/prepared_statement.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "unimplemented.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "transport/event.hh"
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
#include "cql3/cql_statement.hh"
|
||||
#include "data_dictionary/data_dictionary.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "unimplemented.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include <optional>
|
||||
#include "validation.hh"
|
||||
|
||||
@@ -2002,7 +2002,7 @@ future<gms::inet_address> resolve(const config_file::named_value<sstring>& addre
|
||||
}
|
||||
}
|
||||
|
||||
co_return coroutine::exception(std::move(ex));
|
||||
co_return seastar::coroutine::exception(std::move(ex));
|
||||
}
|
||||
|
||||
static std::vector<seastar::metrics::relabel_config> get_relable_from_yaml(const YAML::Node& yaml, const std::string& name) {
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
#include <seastar/util/program-options.hh>
|
||||
#include <seastar/util/log.hh>
|
||||
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "locator/replication_strategy_type.hh"
|
||||
#include "seastarx.hh"
|
||||
#include "utils/config_file.hh"
|
||||
#include "utils/enum_option.hh"
|
||||
|
||||
@@ -15,10 +15,11 @@
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
#include "db/view/view_build_status.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "gms/inet_address.hh"
|
||||
#include "gms/generation-number.hh"
|
||||
#include "gms/loaded_endpoint_state.hh"
|
||||
#include "schema/schema_fwd.hh"
|
||||
#include "utils/UUID.hh"
|
||||
#include "query/query-result-set.hh"
|
||||
#include "db_clock.hh"
|
||||
#include "mutation_query.hh"
|
||||
#include "system_keyspace_view_types.hh"
|
||||
@@ -36,6 +37,10 @@ namespace netw {
|
||||
class shared_dict;
|
||||
};
|
||||
|
||||
namespace query {
|
||||
class result_set;
|
||||
}
|
||||
|
||||
namespace sstables {
|
||||
struct entry_descriptor;
|
||||
class generation_type;
|
||||
|
||||
@@ -29,6 +29,8 @@
|
||||
|
||||
#include "db/config.hh"
|
||||
#include "db/view/base_info.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "query/query-result-set.hh"
|
||||
#include "db/view/view_build_status.hh"
|
||||
#include "db/view/view_consumer.hh"
|
||||
#include "mutation/canonical_mutation.hh"
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
#include <seastar/core/on_internal_error.hh>
|
||||
#include "gms/gossiper.hh"
|
||||
#include "db/view/view_building_coordinator.hh"
|
||||
#include "db/view/view_build_status.hh"
|
||||
#include "locator/tablets.hh"
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "locator/tablets.hh"
|
||||
#include "mutation/canonical_mutation.hh"
|
||||
#include "raft/raft.hh"
|
||||
#include "raft/raft_fwd.hh"
|
||||
#include "service/endpoint_lifecycle_subscriber.hh"
|
||||
#include "service/raft/raft_group0.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
|
||||
@@ -21,6 +21,8 @@
|
||||
#include "dht/token.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
#include "service/raft/raft_group0.hh"
|
||||
#include "schema/schema_fwd.hh"
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
#include <flat_set>
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "locator/tablets.hh"
|
||||
#include "raft/raft.hh"
|
||||
#include "raft/raft_fwd.hh"
|
||||
#include <seastar/core/gate.hh>
|
||||
#include "db/view/view_building_state.hh"
|
||||
#include "sstables/shared_sstable.hh"
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
#include "cdc/metadata.hh"
|
||||
#include "db/config.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "query/query-result-set.hh"
|
||||
#include "db/virtual_table.hh"
|
||||
#include "partition_slice_builder.hh"
|
||||
#include "db/virtual_tables.hh"
|
||||
|
||||
@@ -34,6 +34,7 @@
|
||||
#include "locator/token_metadata.hh"
|
||||
#include "locator/types.hh"
|
||||
#include "gms/gossip_address_map.hh"
|
||||
#include "gms/loaded_endpoint_state.hh"
|
||||
|
||||
namespace gms {
|
||||
|
||||
@@ -71,11 +72,6 @@ struct gossip_config {
|
||||
utils::updateable_value<utils::UUID> recovery_leader;
|
||||
};
|
||||
|
||||
struct loaded_endpoint_state {
|
||||
gms::inet_address endpoint;
|
||||
std::optional<locator::endpoint_dc_rack> opt_dc_rack;
|
||||
};
|
||||
|
||||
/**
|
||||
* This module is responsible for Gossiping information for the local endpoint. This abstraction
|
||||
* maintains the list of live and dead endpoints. Periodically i.e. every 1 second this module
|
||||
|
||||
23
gms/loaded_endpoint_state.hh
Normal file
23
gms/loaded_endpoint_state.hh
Normal file
@@ -0,0 +1,23 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <optional>
|
||||
|
||||
#include "gms/inet_address.hh"
|
||||
#include "locator/types.hh"
|
||||
|
||||
namespace gms {
|
||||
|
||||
struct loaded_endpoint_state {
|
||||
inet_address endpoint;
|
||||
std::optional<locator::endpoint_dc_rack> opt_dc_rack;
|
||||
};
|
||||
|
||||
} // namespace gms
|
||||
@@ -284,14 +284,3 @@ future<> instance_cache::stop() {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
namespace std {
|
||||
|
||||
template <>
|
||||
struct equal_to<seastar::scheduling_group> {
|
||||
bool operator()(seastar::scheduling_group& sg1, seastar::scheduling_group& sg2) const noexcept {
|
||||
return sg1 == sg2;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
#include "utils/sequenced_set.hh"
|
||||
#include "utils/simple_hashers.hh"
|
||||
#include "tablets.hh"
|
||||
#include "locator/replication_strategy_type.hh"
|
||||
#include "data_dictionary/consistency_config_options.hh"
|
||||
|
||||
// forward declaration since replica/database.hh includes this file
|
||||
@@ -38,13 +39,6 @@ extern logging::logger rslogger;
|
||||
using inet_address = gms::inet_address;
|
||||
using token = dht::token;
|
||||
|
||||
enum class replication_strategy_type {
|
||||
simple,
|
||||
local,
|
||||
network_topology,
|
||||
everywhere_topology,
|
||||
};
|
||||
|
||||
using replication_strategy_config_option = std::variant<sstring, rack_list>;
|
||||
using replication_strategy_config_options = std::map<sstring, replication_strategy_config_option>;
|
||||
|
||||
|
||||
20
locator/replication_strategy_type.hh
Normal file
20
locator/replication_strategy_type.hh
Normal file
@@ -0,0 +1,20 @@
|
||||
/*
|
||||
* Copyright (C) 2015-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
namespace locator {
|
||||
|
||||
enum class replication_strategy_type {
|
||||
simple,
|
||||
local,
|
||||
network_topology,
|
||||
everywhere_topology,
|
||||
};
|
||||
|
||||
} // namespace locator
|
||||
@@ -21,10 +21,9 @@
|
||||
#include "utils/chunked_vector.hh"
|
||||
#include "utils/hash.hh"
|
||||
#include "utils/UUID.hh"
|
||||
#include "raft/raft.hh"
|
||||
#include "raft/raft_fwd.hh"
|
||||
|
||||
#include <ranges>
|
||||
#include <seastar/core/reactor.hh>
|
||||
#include <seastar/util/log.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/util/noncopyable_function.hh>
|
||||
|
||||
27
raft/raft_fwd.hh
Normal file
27
raft/raft_fwd.hh
Normal file
@@ -0,0 +1,27 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
// Lightweight forward-declaration header for commonly used raft types.
|
||||
// Include this instead of raft/raft.hh when only the basic ID/index types
|
||||
// are needed (e.g. in other header files), to avoid pulling in the full
|
||||
// raft machinery (futures, abort_source, bytes_ostream, etc.).
|
||||
|
||||
#include "internal.hh"
|
||||
|
||||
namespace raft {
|
||||
|
||||
using server_id = internal::tagged_id<struct server_id_tag>;
|
||||
using group_id = internal::tagged_id<struct group_id_tag>;
|
||||
using term_t = internal::tagged_uint64<struct term_tag>;
|
||||
using index_t = internal::tagged_uint64<struct index_tag>;
|
||||
using read_id = internal::tagged_uint64<struct read_id_tag>;
|
||||
|
||||
class server;
|
||||
|
||||
} // namespace raft
|
||||
@@ -69,6 +69,13 @@ struct segment_descriptor : public log_heap_hook<segment_descriptor_hist_options
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace replica::logstor
|
||||
|
||||
template<>
|
||||
size_t hist_key<replica::logstor::segment_descriptor>(const replica::logstor::segment_descriptor& desc);
|
||||
|
||||
namespace replica::logstor {
|
||||
|
||||
using segment_descriptor_hist = log_heap<segment_descriptor, segment_descriptor_hist_options>;
|
||||
|
||||
struct segment_set {
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
#include "service/paxos/paxos_state.hh"
|
||||
#include "service/query_state.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "cql3/untyped_result_set.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "replica/database.hh"
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
*/
|
||||
#pragma once
|
||||
#include <unordered_set>
|
||||
#include "service/raft/group0_fwd.hh"
|
||||
|
||||
namespace service {
|
||||
|
||||
@@ -9,7 +9,11 @@
|
||||
#pragma once
|
||||
|
||||
#include <iosfwd>
|
||||
#include "raft/raft.hh"
|
||||
#include <variant>
|
||||
#include <vector>
|
||||
#include <seastar/core/timer.hh>
|
||||
#include <seastar/core/lowres_clock.hh>
|
||||
#include "raft/raft_fwd.hh"
|
||||
#include "gms/inet_address.hh"
|
||||
|
||||
namespace service {
|
||||
|
||||
@@ -38,7 +38,6 @@
|
||||
#include "replica/exceptions.hh"
|
||||
#include "locator/host_id.hh"
|
||||
#include "dht/token_range_endpoints.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "service/cas_shard.hh"
|
||||
#include "service/storage_proxy_fwd.hh"
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
#include <seastar/core/shared_future.hh>
|
||||
#include "absl-flat_hash_map.hh"
|
||||
#include "gms/endpoint_state.hh"
|
||||
#include "gms/gossip_address_map.hh"
|
||||
#include "gms/i_endpoint_state_change_subscriber.hh"
|
||||
#include "schema/schema_fwd.hh"
|
||||
#include "service/client_routes.hh"
|
||||
@@ -40,11 +41,9 @@
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
#include "cdc/generation_id.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "raft/raft.hh"
|
||||
#include "raft/raft_fwd.hh"
|
||||
#include "node_ops/id.hh"
|
||||
#include "raft/server.hh"
|
||||
#include "db/view/view_building_state.hh"
|
||||
#include "service/tablet_allocator.hh"
|
||||
#include "service/tablet_operation.hh"
|
||||
#include "mutation/timestamp.hh"
|
||||
#include "utils/UUID.hh"
|
||||
@@ -115,6 +114,10 @@ class tablet_mutation_builder;
|
||||
|
||||
namespace auth { class cache; }
|
||||
|
||||
namespace service {
|
||||
class tablet_allocator;
|
||||
}
|
||||
|
||||
namespace utils {
|
||||
class disk_space_monitor;
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
#include <seastar/core/metrics.hh>
|
||||
|
||||
#include "utils/log.hh"
|
||||
#include "raft/raft.hh"
|
||||
#include "raft/raft_fwd.hh"
|
||||
#include "service/endpoint_lifecycle_subscriber.hh"
|
||||
#include "service/topology_state_machine.hh"
|
||||
#include "db/view/view_building_state.hh"
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include "cdc/generation_id.hh"
|
||||
#include "dht/token.hh"
|
||||
#include "raft/raft.hh"
|
||||
#include "raft/raft_fwd.hh"
|
||||
#include "utils/UUID.hh"
|
||||
#include "service/session.hh"
|
||||
#include "mutation/canonical_mutation.hh"
|
||||
|
||||
41
stdafx.hh
41
stdafx.hh
@@ -254,7 +254,6 @@
|
||||
#include <seastar/coroutine/generator.hh>
|
||||
#include <seastar/coroutine/maybe_yield.hh>
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
#include <seastar/http/api_docs.hh>
|
||||
#include <seastar/http/client.hh>
|
||||
#include <seastar/http/common.hh>
|
||||
#include <seastar/http/connection_factory.hh>
|
||||
@@ -401,4 +400,44 @@
|
||||
#define ZSTD_STATIC_LINKING_ONLY
|
||||
#include <zstd.h>
|
||||
|
||||
// Scylla internal headers included by most translation units
|
||||
#include "bytes.hh"
|
||||
#include "seastarx.hh"
|
||||
#include "utils/UUID.hh"
|
||||
#include "utils/tagged_integer.hh"
|
||||
#include "schema/schema_fwd.hh"
|
||||
#include "utils/managed_bytes.hh"
|
||||
#include "dht/token.hh"
|
||||
#include "locator/host_id.hh"
|
||||
#include "gms/inet_address.hh"
|
||||
#include "utils/chunked_vector.hh"
|
||||
#include "utils/fragment_range.hh"
|
||||
#include "types/types.hh"
|
||||
#include "keys/keys.hh"
|
||||
#include "schema/schema.hh"
|
||||
#include "db/timeout_clock.hh"
|
||||
#include "mutation/mutation_partition.hh"
|
||||
#include "mutation/mutation_fragment.hh"
|
||||
#include "db_clock.hh"
|
||||
#include "gc_clock.hh"
|
||||
#include "locator/token_metadata_fwd.hh"
|
||||
#include "locator/types.hh"
|
||||
#include "locator/token_metadata.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "service/topology_state_machine.hh"
|
||||
#include "cql3/query_options.hh"
|
||||
#include "service/client_state.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "db/config.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "schema/schema_builder.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "replica/database.hh"
|
||||
|
||||
#endif
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
#include "table_helper.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "cql3/statements/create_table_statement.hh"
|
||||
#include "cql3/statements/modification_statement.hh"
|
||||
#include "replica/database.hh"
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
#include <seastar/core/gate.hh>
|
||||
#include <seastar/core/when_all.hh>
|
||||
#include <seastar/rpc/rpc_types.hh>
|
||||
#include "gms/gossiper.hh"
|
||||
#include <seastar/util/defer.hh>
|
||||
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
#undef SEASTAR_TESTING_MAIN
|
||||
#include <seastar/testing/test_case.hh>
|
||||
#include "test/lib/cql_assertions.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
#include "test/lib/error_injection.hh"
|
||||
#include "test/lib/log.hh"
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#undef SEASTAR_TESTING_MAIN
|
||||
#include <seastar/testing/test_case.hh>
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "db/commitlog/commitlog_replayer.hh"
|
||||
#include "db/commitlog/commitlog.hh"
|
||||
#include "db/config.hh"
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
|
||||
#include "mutation/mutation.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "cql3/selection/selection.hh"
|
||||
|
||||
BOOST_AUTO_TEST_SUITE(per_partition_rate_limit_test)
|
||||
|
||||
|
||||
@@ -18,6 +18,8 @@
|
||||
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
#include "test/lib/cql_assertions.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "cql3/result_set.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "schema/schema_builder.hh"
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
#include "test/lib/random_utils.hh"
|
||||
#include "service/topology_mutation.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include <fmt/ranges.h>
|
||||
#include <seastar/testing/thread_test_case.hh>
|
||||
#include <seastar/testing/on_internal_error.hh>
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
#include "cdc/generation_service.hh"
|
||||
#include "cql3/functions/functions.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "cql3/query_options.hh"
|
||||
#include "cql3/statements/batch_statement.hh"
|
||||
#include "cql3/statements/modification_statement.hh"
|
||||
|
||||
@@ -11,6 +11,8 @@
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
#include <seastar/core/temporary_buffer.hh>
|
||||
#include <seastar/net/socket_defs.hh>
|
||||
#include <seastar/net/api.hh>
|
||||
#include <seastar/core/seastar.hh>
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
#include <seastar/core/thread.hh>
|
||||
|
||||
@@ -26,6 +26,7 @@
|
||||
#include "db/config.hh"
|
||||
#include "schema/schema_builder.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "tools/utils.hh"
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
#include "vector_search/vector_store_client.hh"
|
||||
#include "vs_mock_server.hh"
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "utils/rjson.hh"
|
||||
#include <boost/test/tools/old/interface.hpp>
|
||||
#include <seastar/core/seastar.hh>
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
#include "tools/load_system_tablets.hh"
|
||||
|
||||
#include <seastar/core/thread.hh>
|
||||
#include "query/query-result-set.hh"
|
||||
#include <seastar/util/closeable.hh>
|
||||
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
|
||||
@@ -1687,7 +1687,7 @@ process_batch_internal(service::client_state& client_state, sharded<cql3::query_
|
||||
|
||||
std::vector<cql3::statements::batch_statement::single_statement> modifications;
|
||||
std::vector<cql3::raw_value_view_vector_with_unset> values;
|
||||
std::unordered_map<cql3::prepared_cache_key_type, cql3::authorized_prepared_statements_cache::value_type> pending_authorization_entries;
|
||||
std::unordered_map<cql3::prepared_cache_key_type, cql3::statements::prepared_statement::checked_weak_ptr> pending_authorization_entries;
|
||||
|
||||
modifications.reserve(n.assume_value());
|
||||
values.reserve(n.assume_value());
|
||||
|
||||
@@ -566,7 +566,7 @@ public:
|
||||
}
|
||||
if (need_preempt() && i != _cache.end()) {
|
||||
auto key = i->idx;
|
||||
co_await coroutine::maybe_yield();
|
||||
co_await seastar::coroutine::maybe_yield();
|
||||
i = _cache.lower_bound(key);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -360,24 +360,20 @@ utils::config_file::configs utils::config_file::unset_values() const {
|
||||
}
|
||||
|
||||
future<> utils::config_file::read_from_file(file f, error_handler h) {
|
||||
return f.size().then([this, f, h](size_t s) {
|
||||
return do_with(make_file_input_stream(f), [this, s, h](input_stream<char>& in) {
|
||||
return in.read_exactly(s).then([this, h](temporary_buffer<char> buf) {
|
||||
read_from_yaml(sstring(buf.begin(), buf.end()), h);
|
||||
if (!_initialization_completed) {
|
||||
// Boolean value set on only one shard, but broadcast_to_all_shards().get() called later
|
||||
// in main.cc will apply the required memory barriers anyway.
|
||||
_initialization_completed = true;
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
auto s = co_await f.size();
|
||||
auto in = make_file_input_stream(f);
|
||||
auto buf = co_await in.read_exactly(s);
|
||||
read_from_yaml(sstring(buf.begin(), buf.end()), h);
|
||||
if (!_initialization_completed) {
|
||||
// Boolean value set on only one shard, but broadcast_to_all_shards().get() called later
|
||||
// in main.cc will apply the required memory barriers anyway.
|
||||
_initialization_completed = true;
|
||||
}
|
||||
}
|
||||
|
||||
future<> utils::config_file::read_from_file(const sstring& filename, error_handler h) {
|
||||
return open_file_dma(filename, open_flags::ro).then([this, h](file f) {
|
||||
return read_from_file(std::move(f), h);
|
||||
});
|
||||
auto f = co_await open_file_dma(filename, open_flags::ro);
|
||||
co_await read_from_file(std::move(f), h);
|
||||
}
|
||||
|
||||
future<> utils::config_file::broadcast_to_all_shards() {
|
||||
|
||||
@@ -83,7 +83,7 @@ directories::directories(bool developer_mode)
|
||||
future<> directories::create_and_verify(directories::set dir_set, recursive recursive) {
|
||||
std::vector<file_lock> locks;
|
||||
locks.reserve(dir_set.get_paths().size());
|
||||
co_await coroutine::parallel_for_each(dir_set.get_paths(), [this, &locks, recursive] (fs::path path) -> future<> {
|
||||
co_await seastar::coroutine::parallel_for_each(dir_set.get_paths(), [this, &locks, recursive] (fs::path path) -> future<> {
|
||||
file_lock lock = co_await touch_and_lock(path);
|
||||
locks.emplace_back(std::move(lock));
|
||||
co_await disk_sanity(path, _developer_mode);
|
||||
@@ -144,7 +144,7 @@ future<> directories::do_verify_owner_and_mode(fs::path path, recursive recurse,
|
||||
co_return;
|
||||
}
|
||||
auto lister = directory_lister(path, lister::dir_entry_types::full(), do_verify_subpath, lister::show_hidden::no);
|
||||
co_await with_closeable(std::move(lister), coroutine::lambda([&] (auto& lister) -> future<> {
|
||||
co_await with_closeable(std::move(lister), seastar::coroutine::lambda([&] (auto& lister) -> future<> {
|
||||
while (auto de = co_await lister.get()) {
|
||||
co_await do_verify_owner_and_mode(path / de->name, recurse, level + 1, do_verify_subpath);
|
||||
}
|
||||
@@ -168,7 +168,7 @@ future<> directories::verify_owner_and_mode(fs::path path, recursive recursive)
|
||||
future<> directories::verify_owner_and_mode_of_data_dir(directories::set dir_set) {
|
||||
// verify data and index files in the first iteration and the other files in the second iteration.
|
||||
for (auto verify_data_and_index_files : { true, false }) {
|
||||
co_await coroutine::parallel_for_each(dir_set.get_paths(), [verify_data_and_index_files] (const auto &path) {
|
||||
co_await seastar::coroutine::parallel_for_each(dir_set.get_paths(), [verify_data_and_index_files] (const auto &path) {
|
||||
return do_verify_owner_and_mode(std::move(path), recursive::yes, 0, [verify_data_and_index_files] (const fs::path& dir, const directory_entry& de) {
|
||||
auto path = dir / de.name;
|
||||
component_type path_component_type;
|
||||
|
||||
@@ -335,7 +335,7 @@ utils::gcp::storage::client::impl::send_with_retry(const std::string& path, cons
|
||||
} catch (...) {
|
||||
// just disregard the failure, we will retry below in the wrapped handler
|
||||
}
|
||||
auto wrapped_handler = [this, handler = std::move(handler), &req, scope](const reply& rep, input_stream<char>& in) -> future<> {
|
||||
auto wrapped_handler = [this, handler = std::move(handler), &req, scope](const reply& rep, seastar::input_stream<char>& in) -> future<> {
|
||||
auto _in = std::move(in);
|
||||
auto status_class = reply::classify_status(rep._status);
|
||||
/*
|
||||
@@ -352,7 +352,7 @@ utils::gcp::storage::client::impl::send_with_retry(const std::string& path, cons
|
||||
auto content = co_await util::read_entire_stream_contiguous(_in);
|
||||
auto error_msg = get_gcp_error_message(std::string_view(content));
|
||||
gcp_storage.debug("Got unexpected response status: {}, content: {}", rep._status, content);
|
||||
co_await coroutine::return_exception_ptr(std::make_exception_ptr(httpd::unexpected_status_error(rep._status)));
|
||||
co_await seastar::coroutine::return_exception_ptr(std::make_exception_ptr(httpd::unexpected_status_error(rep._status)));
|
||||
}
|
||||
|
||||
std::exception_ptr eptr;
|
||||
@@ -366,7 +366,7 @@ utils::gcp::storage::client::impl::send_with_retry(const std::string& path, cons
|
||||
eptr = std::current_exception();
|
||||
}
|
||||
if (eptr) {
|
||||
co_await coroutine::return_exception_ptr(std::move(eptr));
|
||||
co_await seastar::coroutine::return_exception_ptr(std::move(eptr));
|
||||
}
|
||||
};
|
||||
object_storage_retry_strategy retry_strategy(10,10ms,10000ms, as);
|
||||
|
||||
@@ -84,7 +84,7 @@ future<std::optional<directory_entry>> directory_lister::get() {
|
||||
_gen.emplace(_opened.experimental_list_directory());
|
||||
}
|
||||
if (!_gen) {
|
||||
co_return coroutine::exception(std::make_exception_ptr(seastar::broken_pipe_exception()));
|
||||
co_return seastar::coroutine::exception(std::make_exception_ptr(seastar::broken_pipe_exception()));
|
||||
}
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
@@ -114,7 +114,7 @@ future<std::optional<directory_entry>> directory_lister::get() {
|
||||
_gen.reset();
|
||||
if (ex) {
|
||||
co_await _opened.close();
|
||||
co_return coroutine::exception(std::move(ex));
|
||||
co_return seastar::coroutine::exception(std::move(ex));
|
||||
}
|
||||
co_return std::nullopt;
|
||||
}
|
||||
|
||||
@@ -438,7 +438,7 @@ private:
|
||||
break;
|
||||
}
|
||||
_reclaim(free_memory_threshold - memory::free_memory());
|
||||
co_await coroutine::maybe_yield();
|
||||
co_await seastar::coroutine::maybe_yield();
|
||||
}
|
||||
llogger.debug("background_reclaimer::main_loop: exit");
|
||||
}
|
||||
|
||||
@@ -136,7 +136,7 @@ public:
|
||||
_map.erase(_map.begin());
|
||||
}
|
||||
if (++ret % evictions_per_yield == 0) {
|
||||
co_await coroutine::maybe_yield();
|
||||
co_await seastar::coroutine::maybe_yield();
|
||||
}
|
||||
}
|
||||
co_return ret;
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
|
||||
#include <fmt/format.h>
|
||||
#include <exception>
|
||||
#include "utils/exceptions.hh"
|
||||
#include <initializer_list>
|
||||
#include <memory>
|
||||
#include <regex>
|
||||
@@ -339,7 +340,7 @@ http::experimental::client::reply_handler client::wrap_handler(http::request& re
|
||||
should_retry = utils::http::retryable::yes;
|
||||
co_await authorize(request);
|
||||
}
|
||||
co_await coroutine::return_exception_ptr(std::make_exception_ptr(
|
||||
co_await seastar::coroutine::return_exception_ptr(std::make_exception_ptr(
|
||||
aws::aws_exception(aws_error(possible_error->get_error_type(), possible_error->get_error_message().c_str(), should_retry))));
|
||||
}
|
||||
|
||||
@@ -358,7 +359,7 @@ http::experimental::client::reply_handler client::wrap_handler(http::request& re
|
||||
eptr = std::current_exception();
|
||||
}
|
||||
if (eptr) {
|
||||
co_await coroutine::return_exception_ptr(std::make_exception_ptr(aws::aws_exception(aws_error::from_exception_ptr(eptr))));
|
||||
co_await seastar::coroutine::return_exception_ptr(std::make_exception_ptr(aws::aws_exception(aws_error::from_exception_ptr(eptr))));
|
||||
}
|
||||
};
|
||||
}
|
||||
@@ -539,7 +540,7 @@ future<> client::put_object_tagging(sstring object_name, tag_set tagging, seasta
|
||||
}
|
||||
co_await output.close();
|
||||
if (ex) {
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
co_await seastar::coroutine::return_exception_ptr(std::move(ex));
|
||||
}
|
||||
});
|
||||
co_await make_request(std::move(req), ignore_reply, http::reply::status_type::ok, as);
|
||||
@@ -609,7 +610,7 @@ future<> client::put_object(sstring object_name, temporary_buffer<char> buf, sea
|
||||
}
|
||||
co_await out.close();
|
||||
if (ex) {
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
co_await seastar::coroutine::return_exception_ptr(std::move(ex));
|
||||
}
|
||||
});
|
||||
co_await make_request(std::move(req), [len, start = s3_clock::now()] (group_client& gc, const auto& rep, auto&& in) {
|
||||
@@ -635,7 +636,7 @@ future<> client::put_object(sstring object_name, ::memory_data_sink_buffers bufs
|
||||
}
|
||||
co_await out.close();
|
||||
if (ex) {
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
co_await seastar::coroutine::return_exception_ptr(std::move(ex));
|
||||
}
|
||||
});
|
||||
co_await make_request(std::move(req), [len, start = s3_clock::now()] (group_client& gc, const auto& rep, auto&& in) {
|
||||
@@ -885,7 +886,7 @@ future<> dump_multipart_upload_parts(output_stream<char> out, const utils::chunk
|
||||
}
|
||||
co_await out.close();
|
||||
if (ex) {
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
co_await seastar::coroutine::return_exception_ptr(std::move(ex));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -901,7 +902,7 @@ future<> client::multipart_upload::start_upload() {
|
||||
auto body = co_await util::read_entire_stream_contiguous(in);
|
||||
_upload_id = parse_multipart_upload_id(body);
|
||||
if (_upload_id.empty()) {
|
||||
co_await coroutine::return_exception(std::runtime_error("cannot initiate upload"));
|
||||
co_await seastar::coroutine::return_exception(std::runtime_error("cannot initiate upload"));
|
||||
}
|
||||
s3l.trace("created uploads for {} -> id = {}", _object_name, _upload_id);
|
||||
}, http::reply::status_type::ok, _as);
|
||||
@@ -936,7 +937,7 @@ future<> client::multipart_upload::upload_part(memory_data_sink_buffers bufs) {
|
||||
}
|
||||
co_await out.close();
|
||||
if (ex) {
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
co_await seastar::coroutine::return_exception_ptr(std::move(ex));
|
||||
}
|
||||
// note: At this point the buffers are sent, but the response is not yet
|
||||
// received. However, claim is released and next part may start uploading
|
||||
@@ -985,7 +986,7 @@ future<> client::multipart_upload::finalize_upload() {
|
||||
|
||||
unsigned parts_xml_len = prepare_multipart_upload_parts(_part_etags);
|
||||
if (parts_xml_len == 0) {
|
||||
co_await coroutine::return_exception(std::runtime_error("Failed to parse ETag list. Aborting multipart upload."));
|
||||
co_await seastar::coroutine::return_exception(std::runtime_error("Failed to parse ETag list. Aborting multipart upload."));
|
||||
}
|
||||
|
||||
s3l.trace("POST upload completion {} parts (upload id {})", _part_etags.size(), _upload_id);
|
||||
@@ -1001,15 +1002,15 @@ future<> client::multipart_upload::finalize_upload() {
|
||||
auto status_class = http::reply::classify_status(rep._status);
|
||||
std::optional<aws::aws_error> possible_error = aws::aws_error::parse(co_await util::read_entire_stream_contiguous(payload));
|
||||
if (possible_error) {
|
||||
co_await coroutine::return_exception(aws::aws_exception(std::move(possible_error.value())));
|
||||
co_await seastar::coroutine::return_exception(aws::aws_exception(std::move(possible_error.value())));
|
||||
}
|
||||
|
||||
if (status_class != http::reply::status_class::informational && status_class != http::reply::status_class::success) {
|
||||
co_await coroutine::return_exception(aws::aws_exception(aws::aws_error::from_http_code(rep._status)));
|
||||
co_await seastar::coroutine::return_exception(aws::aws_exception(aws::aws_error::from_http_code(rep._status)));
|
||||
}
|
||||
|
||||
if (rep._status != http::reply::status_type::ok) {
|
||||
co_await coroutine::return_exception(httpd::unexpected_status_error(rep._status));
|
||||
co_await seastar::coroutine::return_exception(httpd::unexpected_status_error(rep._status));
|
||||
}
|
||||
// If we reach this point it means the request succeeded. However, the body payload was already consumed, so no response handler was invoked. At
|
||||
// this point it is ok since we are not interested in parsing this particular response
|
||||
@@ -1443,7 +1444,7 @@ auto client::download_source::request_body() -> future<external_body> {
|
||||
(void)_client->make_request(std::move(req), [this, &p] (group_client& gc, const http::reply& rep, input_stream<char>&& in_) mutable -> future<> {
|
||||
s3l.trace("GET {} got the body ({} {} bytes)", _object_name, rep._status, rep.content_length);
|
||||
if (rep._status != http::reply::status_type::partial_content && rep._status != http::reply::status_type::ok) {
|
||||
co_await coroutine::return_exception(httpd::unexpected_status_error(rep._status));
|
||||
co_await seastar::coroutine::return_exception(httpd::unexpected_status_error(rep._status));
|
||||
}
|
||||
|
||||
auto in = std::move(in_);
|
||||
@@ -1533,7 +1534,7 @@ class client::do_upload_file : private multipart_upload {
|
||||
co_await output.close();
|
||||
co_await input.close();
|
||||
if (ex) {
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
co_await seastar::coroutine::return_exception_ptr(std::move(ex));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1805,7 +1806,7 @@ future<> client::close() {
|
||||
_creds_invalidation_timer.cancel();
|
||||
_creds_update_timer.cancel();
|
||||
}
|
||||
co_await coroutine::parallel_for_each(_https, [] (auto& it) -> future<> {
|
||||
co_await seastar::coroutine::parallel_for_each(_https, [] (auto& it) -> future<> {
|
||||
co_await it.second.http.close();
|
||||
});
|
||||
}
|
||||
@@ -1914,7 +1915,7 @@ future<std::optional<directory_entry>> client::bucket_lister::get() {
|
||||
}
|
||||
co_await close();
|
||||
if (ex) {
|
||||
co_return coroutine::exception(std::move(ex));
|
||||
co_return seastar::coroutine::exception(std::move(ex));
|
||||
}
|
||||
co_return std::nullopt;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user