mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-26 19:35:12 +00:00
Compare commits
15 Commits
branch-202
...
ykaul/comp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
339f1ae1a0 | ||
|
|
07d69aa8fa | ||
|
|
c50bfb995b | ||
|
|
e7dbccbdcd | ||
|
|
faa2f8ba76 | ||
|
|
7aca42aa31 | ||
|
|
92e0597807 | ||
|
|
0798c112d0 | ||
|
|
9650390482 | ||
|
|
a1e8ef8d6e | ||
|
|
ea00cfad3d | ||
|
|
0fd89d77b3 | ||
|
|
361a717d89 | ||
|
|
9df4fc3e2f | ||
|
|
d1b4fd5683 |
@@ -234,11 +234,15 @@ generate_scylla_version()
|
||||
|
||||
option(Scylla_USE_PRECOMPILED_HEADER "Use precompiled header for Scylla" ON)
|
||||
add_library(scylla-precompiled-header STATIC exported_templates.cc)
|
||||
target_include_directories(scylla-precompiled-header PRIVATE
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}"
|
||||
"${scylla_gen_build_dir}")
|
||||
target_link_libraries(scylla-precompiled-header PRIVATE
|
||||
absl::headers
|
||||
absl::btree
|
||||
absl::hash
|
||||
absl::raw_hash_set
|
||||
idl
|
||||
Seastar::seastar
|
||||
Snappy::snappy
|
||||
systemd
|
||||
|
||||
19
configure.py
19
configure.py
@@ -2769,6 +2769,25 @@ def write_build_file(f,
|
||||
f.write('build {}: rust_source {}\n'.format(cc, src))
|
||||
obj = cc.replace('.cc', '.o')
|
||||
compiles[obj] = cc
|
||||
# Sources shared between scylla (compiled with PCH) and small tests
|
||||
# (with custom deps and partial link sets) must not use the PCH,
|
||||
# because -fpch-instantiate-templates injects symbol references that
|
||||
# the small test link sets cannot satisfy.
|
||||
small_test_srcs = set()
|
||||
for test_binary, test_deps in deps.items():
|
||||
if not test_binary.startswith('test/'):
|
||||
continue
|
||||
# Only exclude PCH for tests with truly small/partial link sets.
|
||||
# Tests that include scylla_core or similar large dep sets link
|
||||
# against enough objects to satisfy PCH-injected symbol refs.
|
||||
if len(test_deps) > 50:
|
||||
continue
|
||||
for src in test_deps:
|
||||
if src.endswith('.cc'):
|
||||
small_test_srcs.add(src)
|
||||
for src in small_test_srcs:
|
||||
obj = '$builddir/' + mode + '/' + src.replace('.cc', '.o')
|
||||
compiles_with_pch.discard(obj)
|
||||
for obj in compiles:
|
||||
src = compiles[obj]
|
||||
seastar_dep = f'$builddir/{mode}/seastar/libseastar.{seastar_lib_ext}'
|
||||
|
||||
84
cql3/prepared_cache_key_type.hh
Normal file
84
cql3/prepared_cache_key_type.hh
Normal file
@@ -0,0 +1,84 @@
|
||||
/*
|
||||
* Copyright (C) 2017-present ScyllaDB
|
||||
*
|
||||
* Modified by ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.1 and Apache-2.0)
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "bytes.hh"
|
||||
#include "utils/hash.hh"
|
||||
#include "cql3/dialect.hh"
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
typedef bytes cql_prepared_id_type;
|
||||
|
||||
/// \brief The key of the prepared statements cache
|
||||
///
|
||||
/// TODO: consolidate prepared_cache_key_type and the nested cache_key_type
|
||||
/// the latter was introduced for unifying the CQL and Thrift prepared
|
||||
/// statements so that they can be stored in the same cache.
|
||||
class prepared_cache_key_type {
|
||||
public:
|
||||
// derive from cql_prepared_id_type so we can customize the formatter of
|
||||
// cache_key_type
|
||||
struct cache_key_type : public cql_prepared_id_type {
|
||||
cache_key_type(cql_prepared_id_type&& id, cql3::dialect d) : cql_prepared_id_type(std::move(id)), dialect(d) {}
|
||||
cql3::dialect dialect; // Not part of hash, but we don't expect collisions because of that
|
||||
bool operator==(const cache_key_type& other) const = default;
|
||||
};
|
||||
|
||||
private:
|
||||
cache_key_type _key;
|
||||
|
||||
public:
|
||||
explicit prepared_cache_key_type(cql_prepared_id_type cql_id, dialect d) : _key(std::move(cql_id), d) {}
|
||||
|
||||
cache_key_type& key() { return _key; }
|
||||
const cache_key_type& key() const { return _key; }
|
||||
|
||||
static const cql_prepared_id_type& cql_id(const prepared_cache_key_type& key) {
|
||||
return key.key();
|
||||
}
|
||||
|
||||
bool operator==(const prepared_cache_key_type& other) const = default;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
namespace std {
|
||||
|
||||
template<>
|
||||
struct hash<cql3::prepared_cache_key_type::cache_key_type> final {
|
||||
size_t operator()(const cql3::prepared_cache_key_type::cache_key_type& k) const {
|
||||
return std::hash<cql3::cql_prepared_id_type>()(k);
|
||||
}
|
||||
};
|
||||
|
||||
template<>
|
||||
struct hash<cql3::prepared_cache_key_type> final {
|
||||
size_t operator()(const cql3::prepared_cache_key_type& k) const {
|
||||
return std::hash<cql3::cql_prepared_id_type>()(k.key());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// for prepared_statements_cache log printouts
|
||||
template <> struct fmt::formatter<cql3::prepared_cache_key_type::cache_key_type> {
|
||||
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
|
||||
auto format(const cql3::prepared_cache_key_type::cache_key_type& p, fmt::format_context& ctx) const {
|
||||
return fmt::format_to(ctx.out(), "{{cql_id: {}, dialect: {}}}", static_cast<const cql3::cql_prepared_id_type&>(p), p.dialect);
|
||||
}
|
||||
};
|
||||
|
||||
template <> struct fmt::formatter<cql3::prepared_cache_key_type> {
|
||||
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
|
||||
auto format(const cql3::prepared_cache_key_type& p, fmt::format_context& ctx) const {
|
||||
return fmt::format_to(ctx.out(), "{}", p.key());
|
||||
}
|
||||
};
|
||||
@@ -12,6 +12,7 @@
|
||||
|
||||
#include "utils/loading_cache.hh"
|
||||
#include "utils/hash.hh"
|
||||
#include "cql3/prepared_cache_key_type.hh"
|
||||
#include "cql3/statements/prepared_statement.hh"
|
||||
#include "cql3/column_specification.hh"
|
||||
#include "cql3/dialect.hh"
|
||||
@@ -27,39 +28,6 @@ struct prepared_cache_entry_size {
|
||||
}
|
||||
};
|
||||
|
||||
typedef bytes cql_prepared_id_type;
|
||||
|
||||
/// \brief The key of the prepared statements cache
|
||||
///
|
||||
/// TODO: consolidate prepared_cache_key_type and the nested cache_key_type
|
||||
/// the latter was introduced for unifying the CQL and Thrift prepared
|
||||
/// statements so that they can be stored in the same cache.
|
||||
class prepared_cache_key_type {
|
||||
public:
|
||||
// derive from cql_prepared_id_type so we can customize the formatter of
|
||||
// cache_key_type
|
||||
struct cache_key_type : public cql_prepared_id_type {
|
||||
cache_key_type(cql_prepared_id_type&& id, cql3::dialect d) : cql_prepared_id_type(std::move(id)), dialect(d) {}
|
||||
cql3::dialect dialect; // Not part of hash, but we don't expect collisions because of that
|
||||
bool operator==(const cache_key_type& other) const = default;
|
||||
};
|
||||
|
||||
private:
|
||||
cache_key_type _key;
|
||||
|
||||
public:
|
||||
explicit prepared_cache_key_type(cql_prepared_id_type cql_id, dialect d) : _key(std::move(cql_id), d) {}
|
||||
|
||||
cache_key_type& key() { return _key; }
|
||||
const cache_key_type& key() const { return _key; }
|
||||
|
||||
static const cql_prepared_id_type& cql_id(const prepared_cache_key_type& key) {
|
||||
return key.key();
|
||||
}
|
||||
|
||||
bool operator==(const prepared_cache_key_type& other) const = default;
|
||||
};
|
||||
|
||||
class prepared_statements_cache {
|
||||
public:
|
||||
struct stats {
|
||||
@@ -164,35 +132,3 @@ public:
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
namespace std {
|
||||
|
||||
template<>
|
||||
struct hash<cql3::prepared_cache_key_type::cache_key_type> final {
|
||||
size_t operator()(const cql3::prepared_cache_key_type::cache_key_type& k) const {
|
||||
return std::hash<cql3::cql_prepared_id_type>()(k);
|
||||
}
|
||||
};
|
||||
|
||||
template<>
|
||||
struct hash<cql3::prepared_cache_key_type> final {
|
||||
size_t operator()(const cql3::prepared_cache_key_type& k) const {
|
||||
return std::hash<cql3::cql_prepared_id_type>()(k.key());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// for prepared_statements_cache log printouts
|
||||
template <> struct fmt::formatter<cql3::prepared_cache_key_type::cache_key_type> {
|
||||
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
|
||||
auto format(const cql3::prepared_cache_key_type::cache_key_type& p, fmt::format_context& ctx) const {
|
||||
return fmt::format_to(ctx.out(), "{{cql_id: {}, dialect: {}}}", static_cast<const cql3::cql_prepared_id_type&>(p), p.dialect);
|
||||
}
|
||||
};
|
||||
|
||||
template <> struct fmt::formatter<cql3::prepared_cache_key_type> {
|
||||
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
|
||||
auto format(const cql3::prepared_cache_key_type& p, fmt::format_context& ctx) const {
|
||||
return fmt::format_to(ctx.out(), "{}", p.key());
|
||||
}
|
||||
};
|
||||
|
||||
@@ -17,6 +17,9 @@
|
||||
#include <seastar/coroutine/as_future.hh>
|
||||
#include <seastar/coroutine/try_future.hh>
|
||||
|
||||
#include "cql3/prepared_statements_cache.hh"
|
||||
#include "cql3/authorized_prepared_statements_cache.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "service/mapreduce_service.hh"
|
||||
@@ -77,7 +80,7 @@ static service::query_state query_state_for_internal_call() {
|
||||
return {service::client_state::for_internal_calls(), empty_service_permit()};
|
||||
}
|
||||
|
||||
query_processor::query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, vector_search::vector_store_client& vsc, query_processor::memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, lang::manager& langm)
|
||||
query_processor::query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, vector_search::vector_store_client& vsc, query_processor::memory_config mcfg, cql_config& cql_cfg, const utils::loading_cache_config& auth_prep_cache_cfg, lang::manager& langm)
|
||||
: _migration_subscriber{std::make_unique<migration_subscriber>(this)}
|
||||
, _proxy(proxy)
|
||||
, _db(db)
|
||||
@@ -86,7 +89,7 @@ query_processor::query_processor(service::storage_proxy& proxy, data_dictionary:
|
||||
, _mcfg(mcfg)
|
||||
, _cql_config(cql_cfg)
|
||||
, _prepared_cache(prep_cache_log, _mcfg.prepared_statment_cache_size)
|
||||
, _authorized_prepared_cache(std::move(auth_prep_cache_cfg), authorized_prepared_statements_cache_log)
|
||||
, _authorized_prepared_cache(auth_prep_cache_cfg, authorized_prepared_statements_cache_log)
|
||||
, _auth_prepared_cache_cfg_cb([this] (uint32_t) { (void) _authorized_prepared_cache_config_action.trigger_later(); })
|
||||
, _authorized_prepared_cache_config_action([this] { update_authorized_prepared_cache_config(); return make_ready_future<>(); })
|
||||
, _authorized_prepared_cache_update_interval_in_ms_observer(_db.get_config().permissions_update_interval_in_ms.observe(_auth_prepared_cache_cfg_cb))
|
||||
@@ -1074,7 +1077,7 @@ query_processor::execute_batch_without_checking_exception_message(
|
||||
::shared_ptr<statements::batch_statement> batch,
|
||||
service::query_state& query_state,
|
||||
query_options& options,
|
||||
std::unordered_map<prepared_cache_key_type, authorized_prepared_statements_cache::value_type> pending_authorization_entries) {
|
||||
std::unordered_map<prepared_cache_key_type, statements::prepared_statement::checked_weak_ptr> pending_authorization_entries) {
|
||||
auto access_future = co_await coroutine::as_future(batch->check_access(*this, query_state.get_client_state()));
|
||||
bool failed = access_future.failed();
|
||||
co_await audit::inspect(batch, query_state, options, failed);
|
||||
|
||||
@@ -22,13 +22,14 @@
|
||||
#include "cql3/statements/prepared_statement.hh"
|
||||
#include "cql3/cql_statement.hh"
|
||||
#include "cql3/dialect.hh"
|
||||
#include "cql3/query_options.hh"
|
||||
#include "cql3/stats.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "service/migration_listener.hh"
|
||||
#include "mutation/timestamp.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "service/client_state.hh"
|
||||
#include "service/broadcast_tables/experimental/query_result.hh"
|
||||
#include "vector_search/vector_store_client.hh"
|
||||
#include "utils/assert.hh"
|
||||
#include "utils/observable.hh"
|
||||
#include "utils/rolling_max_tracker.hh"
|
||||
@@ -41,6 +42,9 @@
|
||||
|
||||
|
||||
namespace lang { class manager; }
|
||||
namespace vector_search {
|
||||
class vector_store_client;
|
||||
}
|
||||
namespace service {
|
||||
class migration_manager;
|
||||
class query_state;
|
||||
@@ -58,6 +62,9 @@ struct query;
|
||||
|
||||
namespace cql3 {
|
||||
|
||||
class prepared_statements_cache;
|
||||
class authorized_prepared_statements_cache;
|
||||
|
||||
namespace statements {
|
||||
class batch_statement;
|
||||
class schema_altering_statement;
|
||||
@@ -184,7 +191,7 @@ public:
|
||||
static std::vector<std::unique_ptr<statements::raw::parsed_statement>> parse_statements(std::string_view queries, dialect d);
|
||||
|
||||
query_processor(service::storage_proxy& proxy, data_dictionary::database db, service::migration_notifier& mn, vector_search::vector_store_client& vsc,
|
||||
memory_config mcfg, cql_config& cql_cfg, utils::loading_cache_config auth_prep_cache_cfg, lang::manager& langm);
|
||||
memory_config mcfg, cql_config& cql_cfg, const utils::loading_cache_config& auth_prep_cache_cfg, lang::manager& langm);
|
||||
|
||||
~query_processor();
|
||||
|
||||
@@ -474,7 +481,7 @@ public:
|
||||
::shared_ptr<statements::batch_statement> stmt,
|
||||
service::query_state& query_state,
|
||||
query_options& options,
|
||||
std::unordered_map<prepared_cache_key_type, authorized_prepared_statements_cache::value_type> pending_authorization_entries) {
|
||||
std::unordered_map<prepared_cache_key_type, statements::prepared_statement::checked_weak_ptr> pending_authorization_entries) {
|
||||
return execute_batch_without_checking_exception_message(
|
||||
std::move(stmt),
|
||||
query_state,
|
||||
@@ -490,7 +497,7 @@ public:
|
||||
::shared_ptr<statements::batch_statement>,
|
||||
service::query_state& query_state,
|
||||
query_options& options,
|
||||
std::unordered_map<prepared_cache_key_type, authorized_prepared_statements_cache::value_type> pending_authorization_entries);
|
||||
std::unordered_map<prepared_cache_key_type, statements::prepared_statement::checked_weak_ptr> pending_authorization_entries);
|
||||
|
||||
future<service::broadcast_tables::query_result>
|
||||
execute_broadcast_table_query(const service::broadcast_tables::query&);
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
#include <seastar/core/execution_stage.hh>
|
||||
#include "cas_request.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "tracing/trace_state.hh"
|
||||
#include "utils/unique_view.hh"
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
#include "cql3/expr/evaluate.hh"
|
||||
#include "cql3/query_options.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "cql3/values.hh"
|
||||
#include "timeout_config.hh"
|
||||
#include "service/broadcast_tables/experimental/lang.hh"
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
#include "auth/service.hh"
|
||||
#include "cql3/statements/prepared_statement.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "unimplemented.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "transport/event.hh"
|
||||
|
||||
@@ -9,7 +9,6 @@
|
||||
#pragma once
|
||||
|
||||
#include "cql3/cql_statement.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "raw/parsed_statement.hh"
|
||||
#include "service/qos/qos_common.hh"
|
||||
#include "service/query_state.hh"
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
#include "cql3/cql_statement.hh"
|
||||
#include "data_dictionary/data_dictionary.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "unimplemented.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include <optional>
|
||||
#include "validation.hh"
|
||||
|
||||
18
db/config.cc
18
db/config.cc
@@ -330,14 +330,14 @@ const config_type& config_type_for<std::vector<db::config::error_injection_at_st
|
||||
}
|
||||
|
||||
template <>
|
||||
const config_type& config_type_for<enum_option<netw::dict_training_loop::when>>() {
|
||||
const config_type& config_type_for<enum_option<netw::dict_training_when>>() {
|
||||
static config_type ct(
|
||||
"dictionary training conditions", printable_to_json<enum_option<netw::dict_training_loop::when>>);
|
||||
"dictionary training conditions", printable_to_json<enum_option<netw::dict_training_when>>);
|
||||
return ct;
|
||||
}
|
||||
|
||||
template <>
|
||||
const config_type& config_type_for<netw::advanced_rpc_compressor::tracker::algo_config>() {
|
||||
const config_type& config_type_for<netw::algo_config>() {
|
||||
static config_type ct(
|
||||
"advanced rpc compressor config", printable_vector_to_json<enum_option<netw::compression_algorithm>>);
|
||||
return ct;
|
||||
@@ -530,9 +530,9 @@ struct convert<db::config::error_injection_at_startup> {
|
||||
|
||||
|
||||
template <>
|
||||
class convert<enum_option<netw::dict_training_loop::when>> {
|
||||
class convert<enum_option<netw::dict_training_when>> {
|
||||
public:
|
||||
static bool decode(const Node& node, enum_option<netw::dict_training_loop::when>& rhs) {
|
||||
static bool decode(const Node& node, enum_option<netw::dict_training_when>& rhs) {
|
||||
std::string name;
|
||||
if (!convert<std::string>::decode(node, name)) {
|
||||
return false;
|
||||
@@ -1110,7 +1110,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
|
||||
"Specifies RPC compression algorithms supported by this node. ")
|
||||
, internode_compression_enable_advanced(this, "internode_compression_enable_advanced", liveness::MustRestart, value_status::Used, false,
|
||||
"Enables the new implementation of RPC compression. If disabled, Scylla will fall back to the old implementation.")
|
||||
, rpc_dict_training_when(this, "rpc_dict_training_when", liveness::LiveUpdate, value_status::Used, netw::dict_training_loop::when::type::NEVER,
|
||||
, rpc_dict_training_when(this, "rpc_dict_training_when", liveness::LiveUpdate, value_status::Used, netw::dict_training_when::type::NEVER,
|
||||
"Specifies when RPC compression dictionary training is performed by this node.\n"
|
||||
"* `never` disables it unconditionally.\n"
|
||||
"* `when_leader` enables it only whenever the node is the Raft leader.\n"
|
||||
@@ -2025,8 +2025,8 @@ template struct utils::config_file::named_value<enum_option<db::experimental_fea
|
||||
template struct utils::config_file::named_value<enum_option<db::replication_strategy_restriction_t>>;
|
||||
template struct utils::config_file::named_value<enum_option<db::consistency_level_restriction_t>>;
|
||||
template struct utils::config_file::named_value<enum_option<db::tablets_mode_t>>;
|
||||
template struct utils::config_file::named_value<enum_option<netw::dict_training_loop::when>>;
|
||||
template struct utils::config_file::named_value<netw::advanced_rpc_compressor::tracker::algo_config>;
|
||||
template struct utils::config_file::named_value<enum_option<netw::dict_training_when>>;
|
||||
template struct utils::config_file::named_value<netw::algo_config>;
|
||||
template struct utils::config_file::named_value<std::vector<enum_option<db::experimental_features_t>>>;
|
||||
template struct utils::config_file::named_value<std::vector<enum_option<db::replication_strategy_restriction_t>>>;
|
||||
template struct utils::config_file::named_value<std::vector<enum_option<db::consistency_level_restriction_t>>>;
|
||||
@@ -2094,7 +2094,7 @@ future<gms::inet_address> resolve(const config_file::named_value<sstring>& addre
|
||||
}
|
||||
}
|
||||
|
||||
co_return coroutine::exception(std::move(ex));
|
||||
co_return seastar::coroutine::exception(std::move(ex));
|
||||
}
|
||||
|
||||
static std::vector<seastar::metrics::relabel_config> get_relable_from_yaml(const YAML::Node& yaml, const std::string& name) {
|
||||
|
||||
14
db/config.hh
14
db/config.hh
@@ -9,6 +9,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <filesystem>
|
||||
#include <unordered_map>
|
||||
|
||||
#include <seastar/core/sstring.hh>
|
||||
@@ -16,15 +17,14 @@
|
||||
#include <seastar/util/program-options.hh>
|
||||
#include <seastar/util/log.hh>
|
||||
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "locator/replication_strategy_type.hh"
|
||||
#include "seastarx.hh"
|
||||
#include "utils/config_file.hh"
|
||||
#include "utils/enum_option.hh"
|
||||
#include "gms/inet_address.hh"
|
||||
#include "db/hints/host_filter.hh"
|
||||
#include "utils/error_injection.hh"
|
||||
#include "message/dict_trainer.hh"
|
||||
#include "message/advanced_rpc_compressor.hh"
|
||||
#include "message/rpc_compression_types.hh"
|
||||
#include "db/consistency_level_type.hh"
|
||||
#include "db/tri_mode_restriction.hh"
|
||||
#include "sstables/compressor.hh"
|
||||
@@ -325,9 +325,9 @@ public:
|
||||
named_value<uint32_t> internode_compression_zstd_min_message_size;
|
||||
named_value<uint32_t> internode_compression_zstd_max_message_size;
|
||||
named_value<bool> internode_compression_checksumming;
|
||||
named_value<netw::advanced_rpc_compressor::tracker::algo_config> internode_compression_algorithms;
|
||||
named_value<netw::algo_config> internode_compression_algorithms;
|
||||
named_value<bool> internode_compression_enable_advanced;
|
||||
named_value<enum_option<netw::dict_training_loop::when>> rpc_dict_training_when;
|
||||
named_value<enum_option<netw::dict_training_when>> rpc_dict_training_when;
|
||||
named_value<uint32_t> rpc_dict_training_min_time_seconds;
|
||||
named_value<uint64_t> rpc_dict_training_min_bytes;
|
||||
named_value<bool> inter_dc_tcp_nodelay;
|
||||
@@ -739,8 +739,8 @@ extern template struct utils::config_file::named_value<enum_option<db::experimen
|
||||
extern template struct utils::config_file::named_value<enum_option<db::replication_strategy_restriction_t>>;
|
||||
extern template struct utils::config_file::named_value<enum_option<db::consistency_level_restriction_t>>;
|
||||
extern template struct utils::config_file::named_value<enum_option<db::tablets_mode_t>>;
|
||||
extern template struct utils::config_file::named_value<enum_option<netw::dict_training_loop::when>>;
|
||||
extern template struct utils::config_file::named_value<netw::advanced_rpc_compressor::tracker::algo_config>;
|
||||
extern template struct utils::config_file::named_value<enum_option<netw::dict_training_when>>;
|
||||
extern template struct utils::config_file::named_value<netw::algo_config>;
|
||||
extern template struct utils::config_file::named_value<std::vector<enum_option<db::experimental_features_t>>>;
|
||||
extern template struct utils::config_file::named_value<std::vector<enum_option<db::replication_strategy_restriction_t>>>;
|
||||
extern template struct utils::config_file::named_value<std::vector<enum_option<db::consistency_level_restriction_t>>>;
|
||||
|
||||
@@ -15,10 +15,11 @@
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
#include "db/view/view_build_status.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "gms/inet_address.hh"
|
||||
#include "gms/generation-number.hh"
|
||||
#include "gms/loaded_endpoint_state.hh"
|
||||
#include "schema/schema_fwd.hh"
|
||||
#include "utils/UUID.hh"
|
||||
#include "query/query-result-set.hh"
|
||||
#include "db_clock.hh"
|
||||
#include "mutation_query.hh"
|
||||
#include "system_keyspace_view_types.hh"
|
||||
@@ -36,6 +37,10 @@ namespace netw {
|
||||
class shared_dict;
|
||||
};
|
||||
|
||||
namespace query {
|
||||
class result_set;
|
||||
}
|
||||
|
||||
namespace sstables {
|
||||
struct entry_descriptor;
|
||||
class generation_type;
|
||||
|
||||
@@ -29,6 +29,8 @@
|
||||
|
||||
#include "db/config.hh"
|
||||
#include "db/view/base_info.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "query/query-result-set.hh"
|
||||
#include "db/view/view_build_status.hh"
|
||||
#include "db/view/view_consumer.hh"
|
||||
#include "mutation/canonical_mutation.hh"
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
#include <seastar/core/on_internal_error.hh>
|
||||
#include "gms/gossiper.hh"
|
||||
#include "db/view/view_building_coordinator.hh"
|
||||
#include "db/view/view_build_status.hh"
|
||||
#include "locator/tablets.hh"
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "locator/tablets.hh"
|
||||
#include "mutation/canonical_mutation.hh"
|
||||
#include "raft/raft.hh"
|
||||
#include "raft/raft_fwd.hh"
|
||||
#include "service/endpoint_lifecycle_subscriber.hh"
|
||||
#include "service/raft/raft_group0.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
|
||||
@@ -21,6 +21,8 @@
|
||||
#include "dht/token.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
#include "service/raft/raft_group0.hh"
|
||||
#include "schema/schema_fwd.hh"
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
#include <flat_set>
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "locator/tablets.hh"
|
||||
#include "raft/raft.hh"
|
||||
#include "raft/raft_fwd.hh"
|
||||
#include <seastar/core/gate.hh>
|
||||
#include "db/view/view_building_state.hh"
|
||||
#include "sstables/shared_sstable.hh"
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
#include "cdc/metadata.hh"
|
||||
#include "db/config.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "query/query-result-set.hh"
|
||||
#include "db/virtual_table.hh"
|
||||
#include "partition_slice_builder.hh"
|
||||
#include "db/virtual_tables.hh"
|
||||
|
||||
@@ -34,6 +34,7 @@
|
||||
#include "locator/token_metadata.hh"
|
||||
#include "locator/types.hh"
|
||||
#include "gms/gossip_address_map.hh"
|
||||
#include "gms/loaded_endpoint_state.hh"
|
||||
|
||||
namespace gms {
|
||||
|
||||
@@ -71,11 +72,6 @@ struct gossip_config {
|
||||
utils::updateable_value<utils::UUID> recovery_leader;
|
||||
};
|
||||
|
||||
struct loaded_endpoint_state {
|
||||
gms::inet_address endpoint;
|
||||
std::optional<locator::endpoint_dc_rack> opt_dc_rack;
|
||||
};
|
||||
|
||||
/**
|
||||
* This module is responsible for Gossiping information for the local endpoint. This abstraction
|
||||
* maintains the list of live and dead endpoints. Periodically i.e. every 1 second this module
|
||||
|
||||
23
gms/loaded_endpoint_state.hh
Normal file
23
gms/loaded_endpoint_state.hh
Normal file
@@ -0,0 +1,23 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <optional>
|
||||
|
||||
#include "gms/inet_address.hh"
|
||||
#include "locator/types.hh"
|
||||
|
||||
namespace gms {
|
||||
|
||||
struct loaded_endpoint_state {
|
||||
inet_address endpoint;
|
||||
std::optional<locator::endpoint_dc_rack> opt_dc_rack;
|
||||
};
|
||||
|
||||
} // namespace gms
|
||||
@@ -11,7 +11,7 @@
|
||||
#include "query/query_id.hh"
|
||||
#include "locator/host_id.hh"
|
||||
#include "tasks/types.hh"
|
||||
#include "service/session.hh"
|
||||
#include "service/session_id.hh"
|
||||
|
||||
namespace utils {
|
||||
class UUID final {
|
||||
@@ -43,4 +43,3 @@ class host_id final {
|
||||
};
|
||||
|
||||
} // namespace locator
|
||||
|
||||
|
||||
@@ -21,6 +21,7 @@
|
||||
#include "utils/UUID_gen.hh"
|
||||
#include "types/types.hh"
|
||||
#include "utils/managed_string.hh"
|
||||
#include "utils/rjson.hh"
|
||||
#include <ranges>
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <boost/algorithm/string.hpp>
|
||||
|
||||
@@ -284,14 +284,3 @@ future<> instance_cache::stop() {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
namespace std {
|
||||
|
||||
template <>
|
||||
struct equal_to<seastar::scheduling_group> {
|
||||
bool operator()(seastar::scheduling_group& sg1, seastar::scheduling_group& sg2) const noexcept {
|
||||
return sg1 == sg2;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
#include "utils/sequenced_set.hh"
|
||||
#include "utils/simple_hashers.hh"
|
||||
#include "tablets.hh"
|
||||
#include "locator/replication_strategy_type.hh"
|
||||
#include "data_dictionary/consistency_config_options.hh"
|
||||
|
||||
// forward declaration since replica/database.hh includes this file
|
||||
@@ -38,13 +39,6 @@ extern logging::logger rslogger;
|
||||
using inet_address = gms::inet_address;
|
||||
using token = dht::token;
|
||||
|
||||
enum class replication_strategy_type {
|
||||
simple,
|
||||
local,
|
||||
network_topology,
|
||||
everywhere_topology,
|
||||
};
|
||||
|
||||
using replication_strategy_config_option = std::variant<sstring, rack_list>;
|
||||
using replication_strategy_config_options = std::map<sstring, replication_strategy_config_option>;
|
||||
|
||||
|
||||
20
locator/replication_strategy_type.hh
Normal file
20
locator/replication_strategy_type.hh
Normal file
@@ -0,0 +1,20 @@
|
||||
/*
|
||||
* Copyright (C) 2015-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
namespace locator {
|
||||
|
||||
enum class replication_strategy_type {
|
||||
simple,
|
||||
local,
|
||||
network_topology,
|
||||
everywhere_topology,
|
||||
};
|
||||
|
||||
} // namespace locator
|
||||
@@ -12,7 +12,7 @@
|
||||
#include "locator/token_metadata_fwd.hh"
|
||||
#include "utils/small_vector.hh"
|
||||
#include "locator/host_id.hh"
|
||||
#include "service/session.hh"
|
||||
#include "service/session_id.hh"
|
||||
#include "dht/i_partitioner_fwd.hh"
|
||||
#include "dht/token-sharding.hh"
|
||||
#include "dht/ring_position.hh"
|
||||
@@ -21,10 +21,9 @@
|
||||
#include "utils/chunked_vector.hh"
|
||||
#include "utils/hash.hh"
|
||||
#include "utils/UUID.hh"
|
||||
#include "raft/raft.hh"
|
||||
#include "raft/raft_fwd.hh"
|
||||
|
||||
#include <ranges>
|
||||
#include <seastar/core/reactor.hh>
|
||||
#include <seastar/util/log.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/util/noncopyable_function.hh>
|
||||
|
||||
@@ -11,9 +11,10 @@
|
||||
#include <seastar/core/condition-variable.hh>
|
||||
#include <seastar/rpc/rpc_types.hh>
|
||||
#include <utility>
|
||||
|
||||
#include "rpc_compression_types.hh"
|
||||
#include "utils/refcounted.hh"
|
||||
#include "utils/updateable_value.hh"
|
||||
#include "utils/enum_option.hh"
|
||||
#include "shared_dict.hh"
|
||||
|
||||
namespace netw {
|
||||
@@ -28,103 +29,6 @@ class dict_sampler;
|
||||
using dict_ptr = lw_shared_ptr<foreign_ptr<lw_shared_ptr<shared_dict>>>;
|
||||
class control_protocol_frame;
|
||||
|
||||
// An enum wrapper, describing supported RPC compression algorithms.
|
||||
// Always contains a valid value —- the constructors won't allow
|
||||
// an invalid/unknown enum variant to be constructed.
|
||||
struct compression_algorithm {
|
||||
using underlying = uint8_t;
|
||||
enum class type : underlying {
|
||||
RAW,
|
||||
LZ4,
|
||||
ZSTD,
|
||||
COUNT,
|
||||
} _value;
|
||||
// Construct from an integer.
|
||||
// Used to deserialize the algorithm from the first byte of the frame.
|
||||
constexpr compression_algorithm(underlying x) {
|
||||
if (x < std::to_underlying(type::RAW) || x >= std::to_underlying(type::COUNT)) {
|
||||
throw std::runtime_error(fmt::format("Invalid value {} for enum compression_algorithm", static_cast<int>(x)));
|
||||
}
|
||||
_value = static_cast<type>(x);
|
||||
}
|
||||
// Construct from `type`. Makes sure that `type` has a valid value.
|
||||
constexpr compression_algorithm(type x) : compression_algorithm(std::to_underlying(x)) {}
|
||||
|
||||
// These names are used in multiple places:
|
||||
// RPC negotiation, in metric labels, and config.
|
||||
static constexpr std::string_view names[] = {
|
||||
"raw",
|
||||
"lz4",
|
||||
"zstd",
|
||||
};
|
||||
static_assert(std::size(names) == static_cast<int>(compression_algorithm::type::COUNT));
|
||||
|
||||
// Implements enum_option.
|
||||
static auto map() {
|
||||
std::unordered_map<std::string, type> ret;
|
||||
for (size_t i = 0; i < std::size(names); ++i) {
|
||||
ret.insert(std::make_pair<std::string, type>(std::string(names[i]), compression_algorithm(i).get()));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
constexpr std::string_view name() const noexcept { return names[idx()]; }
|
||||
constexpr underlying idx() const noexcept { return std::to_underlying(_value); }
|
||||
constexpr type get() const noexcept { return _value; }
|
||||
constexpr static size_t count() { return static_cast<size_t>(type::COUNT); };
|
||||
bool operator<=>(const compression_algorithm &) const = default;
|
||||
};
|
||||
|
||||
|
||||
// Represents a set of compression algorithms.
|
||||
// Backed by a bitset.
|
||||
// Used for convenience during algorithm negotiations.
|
||||
class compression_algorithm_set {
|
||||
uint8_t _bitset;
|
||||
static_assert(std::numeric_limits<decltype(_bitset)>::digits > compression_algorithm::count());
|
||||
constexpr compression_algorithm_set(uint8_t v) noexcept : _bitset(v) {}
|
||||
public:
|
||||
// Returns a set containing the given algorithm and all algorithms weaker (smaller in the enum order)
|
||||
// than it.
|
||||
constexpr static compression_algorithm_set this_or_lighter(compression_algorithm algo) noexcept {
|
||||
auto x = 1 << (algo.idx());
|
||||
return {x + (x - 1)};
|
||||
}
|
||||
// Returns the strongest (greatest in the enum order) algorithm in the set.
|
||||
constexpr compression_algorithm heaviest() const {
|
||||
return {std::bit_width(_bitset) - 1};
|
||||
}
|
||||
// The usual set operations.
|
||||
constexpr static compression_algorithm_set singleton(compression_algorithm algo) noexcept {
|
||||
return {1 << algo.idx()};
|
||||
}
|
||||
constexpr compression_algorithm_set intersection(compression_algorithm_set o) const noexcept {
|
||||
return {_bitset & o._bitset};
|
||||
}
|
||||
constexpr compression_algorithm_set difference(compression_algorithm_set o) const noexcept {
|
||||
return {_bitset &~ o._bitset};
|
||||
}
|
||||
constexpr compression_algorithm_set sum(compression_algorithm_set o) const noexcept {
|
||||
return {_bitset | o._bitset};
|
||||
}
|
||||
constexpr bool contains(compression_algorithm algo) const noexcept {
|
||||
return _bitset & (1 << algo.idx());
|
||||
}
|
||||
constexpr bool operator==(const compression_algorithm_set&) const = default;
|
||||
// Returns the contained bitset. Used for serialization.
|
||||
constexpr uint8_t value() const noexcept {
|
||||
return _bitset;
|
||||
}
|
||||
// Reconstructs the set from the output of `value()`. Used for deserialization.
|
||||
constexpr static compression_algorithm_set from_value(uint8_t bitset) {
|
||||
compression_algorithm_set x = bitset;
|
||||
x.heaviest(); // This is a validation check. It will throw if the bitset contains some illegal/unknown bits.
|
||||
return x;
|
||||
}
|
||||
};
|
||||
|
||||
using algo_config = std::vector<enum_option<compression_algorithm>>;
|
||||
|
||||
// See docs/dev/advanced_rpc_compression.md,
|
||||
// section `Negotiation` for more information about the protocol.
|
||||
struct control_protocol {
|
||||
@@ -248,7 +152,7 @@ struct per_algorithm_stats {
|
||||
// prevent a misuse of the API (dangling references).
|
||||
class advanced_rpc_compressor::tracker : public utils::refcounted {
|
||||
public:
|
||||
using algo_config = algo_config;
|
||||
using algo_config = netw::algo_config;
|
||||
struct config {
|
||||
utils::updateable_value<uint32_t> zstd_min_msg_size{0};
|
||||
utils::updateable_value<uint32_t> zstd_max_msg_size{std::numeric_limits<uint32_t>::max()};
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
#pragma once
|
||||
|
||||
#include "shared_dict.hh"
|
||||
#include "advanced_rpc_compressor.hh"
|
||||
#include "rpc_compression_types.hh"
|
||||
|
||||
namespace netw {
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "rpc_compression_types.hh"
|
||||
#include "utils/reservoir_sampling.hh"
|
||||
#include "utils/updateable_value.hh"
|
||||
#include <seastar/core/future.hh>
|
||||
@@ -88,28 +89,7 @@ class dict_training_loop {
|
||||
seastar::semaphore _pause{0};
|
||||
seastar::abort_source _pause_as;
|
||||
public:
|
||||
struct when {
|
||||
enum class type {
|
||||
NEVER,
|
||||
WHEN_LEADER,
|
||||
ALWAYS,
|
||||
COUNT,
|
||||
};
|
||||
static constexpr std::string_view names[] = {
|
||||
"never",
|
||||
"when_leader",
|
||||
"always",
|
||||
};
|
||||
static_assert(std::size(names) == static_cast<size_t>(type::COUNT));
|
||||
// Implements enum_option.
|
||||
static std::unordered_map<std::string, type> map() {
|
||||
std::unordered_map<std::string, type> ret;
|
||||
for (size_t i = 0; i < std::size(names); ++i) {
|
||||
ret.insert({std::string(names[i]), type(i)});
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
};
|
||||
using when = netw::dict_training_when;
|
||||
void pause();
|
||||
void unpause();
|
||||
void cancel() noexcept;
|
||||
|
||||
@@ -54,11 +54,11 @@ dictionary_service::dictionary_service(
|
||||
void dictionary_service::maybe_toggle_dict_training() {
|
||||
auto when = _rpc_dict_training_when();
|
||||
netw::dict_trainer_logger.debug("dictionary_service::maybe_toggle_dict_training(), called, _is_leader={}, when={}", _is_leader, when);
|
||||
if (when == netw::dict_training_loop::when::type::NEVER) {
|
||||
if (when == netw::dict_training_when::type::NEVER) {
|
||||
_training_fiber.pause();
|
||||
} else if (when == netw::dict_training_loop::when::type::ALWAYS) {
|
||||
} else if (when == netw::dict_training_when::type::ALWAYS) {
|
||||
_training_fiber.unpause();
|
||||
} else if (when == netw::dict_training_loop::when::type::WHEN_LEADER) {
|
||||
} else if (when == netw::dict_training_when::type::WHEN_LEADER) {
|
||||
_is_leader ? _training_fiber.unpause() : _training_fiber.pause();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -40,7 +40,7 @@ namespace gms {
|
||||
class dictionary_service {
|
||||
db::system_keyspace& _sys_ks;
|
||||
locator::host_id _our_host_id;
|
||||
utils::updateable_value<enum_option<netw::dict_training_loop::when>> _rpc_dict_training_when;
|
||||
utils::updateable_value<enum_option<netw::dict_training_when>> _rpc_dict_training_when;
|
||||
service::raft_group0_client& _raft_group0_client;
|
||||
abort_source& _as;
|
||||
netw::dict_training_loop _training_fiber;
|
||||
@@ -48,7 +48,7 @@ class dictionary_service {
|
||||
|
||||
bool _is_leader = false;
|
||||
utils::observer<bool> _leadership_observer;
|
||||
utils::observer<enum_option<netw::dict_training_loop::when>> _when_observer;
|
||||
utils::observer<enum_option<netw::dict_training_when>> _when_observer;
|
||||
std::optional<std::any> _feature_observer;
|
||||
|
||||
void maybe_toggle_dict_training();
|
||||
@@ -61,7 +61,7 @@ public:
|
||||
locator::host_id our_host_id = Uninitialized();
|
||||
utils::updateable_value<uint32_t> rpc_dict_training_min_time_seconds = Uninitialized();
|
||||
utils::updateable_value<uint64_t> rpc_dict_training_min_bytes = Uninitialized();
|
||||
utils::updateable_value<enum_option<netw::dict_training_loop::when>> rpc_dict_training_when = Uninitialized();
|
||||
utils::updateable_value<enum_option<netw::dict_training_when>> rpc_dict_training_when = Uninitialized();
|
||||
};
|
||||
// Note: the training fiber will start as soon as the relevant cluster feature is enabled.
|
||||
dictionary_service(
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
#include <seastar/coroutine/all.hh>
|
||||
|
||||
#include "message/messaging_service.hh"
|
||||
#include "message/advanced_rpc_compressor.hh"
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include "gms/gossiper.hh"
|
||||
#include "service/storage_service.hh"
|
||||
|
||||
@@ -19,11 +19,11 @@
|
||||
#include "schema/schema_fwd.hh"
|
||||
#include "streaming/stream_fwd.hh"
|
||||
#include "locator/host_id.hh"
|
||||
#include "service/session.hh"
|
||||
#include "service/session_id.hh"
|
||||
#include "service/maintenance_mode.hh"
|
||||
#include "gms/gossip_address_map.hh"
|
||||
#include "gms/generation-number.hh"
|
||||
#include "tasks/types.hh"
|
||||
#include "message/advanced_rpc_compressor.hh"
|
||||
#include "utils/chunked_vector.hh"
|
||||
|
||||
#include <list>
|
||||
@@ -120,6 +120,8 @@ namespace qos {
|
||||
|
||||
namespace netw {
|
||||
|
||||
class walltime_compressor_tracker;
|
||||
|
||||
/* All verb handler identifiers */
|
||||
enum class messaging_verb : int32_t {
|
||||
CLIENT_ID = 0,
|
||||
|
||||
155
message/rpc_compression_types.hh
Normal file
155
message/rpc_compression_types.hh
Normal file
@@ -0,0 +1,155 @@
|
||||
/*
|
||||
* Copyright (C) 2026-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <bit>
|
||||
#include <compare>
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <iterator>
|
||||
#include <limits>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "utils/enum_option.hh"
|
||||
|
||||
namespace netw {
|
||||
|
||||
// An enum wrapper, describing supported RPC compression algorithms.
|
||||
// Always contains a valid value -- the constructors won't allow
|
||||
// an invalid/unknown enum variant to be constructed.
|
||||
struct compression_algorithm {
|
||||
using underlying = uint8_t;
|
||||
enum class type : underlying {
|
||||
RAW,
|
||||
LZ4,
|
||||
ZSTD,
|
||||
COUNT,
|
||||
} _value;
|
||||
|
||||
// Construct from an integer.
|
||||
// Used to deserialize the algorithm from the first byte of the frame.
|
||||
constexpr compression_algorithm(underlying x) {
|
||||
if (x < std::to_underlying(type::RAW) || x >= std::to_underlying(type::COUNT)) {
|
||||
throw std::runtime_error(std::string("Invalid value ") + std::to_string(unsigned(x)) + " for enum compression_algorithm");
|
||||
}
|
||||
_value = static_cast<type>(x);
|
||||
}
|
||||
|
||||
// Construct from `type`. Makes sure that `type` has a valid value.
|
||||
constexpr compression_algorithm(type x) : compression_algorithm(std::to_underlying(x)) {}
|
||||
|
||||
// These names are used in multiple places:
|
||||
// RPC negotiation, in metric labels, and config.
|
||||
static constexpr std::string_view names[] = {
|
||||
"raw",
|
||||
"lz4",
|
||||
"zstd",
|
||||
};
|
||||
static_assert(std::size(names) == static_cast<int>(compression_algorithm::type::COUNT));
|
||||
|
||||
// Implements enum_option.
|
||||
static auto map() {
|
||||
std::unordered_map<std::string, type> ret;
|
||||
for (size_t i = 0; i < std::size(names); ++i) {
|
||||
ret.insert(std::make_pair(std::string(names[i]), compression_algorithm(i).get()));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
constexpr std::string_view name() const noexcept { return names[idx()]; }
|
||||
constexpr underlying idx() const noexcept { return std::to_underlying(_value); }
|
||||
constexpr type get() const noexcept { return _value; }
|
||||
constexpr static size_t count() { return static_cast<size_t>(type::COUNT); }
|
||||
bool operator<=>(const compression_algorithm&) const = default;
|
||||
};
|
||||
|
||||
// Represents a set of compression algorithms.
|
||||
// Backed by a bitset.
|
||||
// Used for convenience during algorithm negotiations.
|
||||
class compression_algorithm_set {
|
||||
uint8_t _bitset;
|
||||
static_assert(std::numeric_limits<decltype(_bitset)>::digits > compression_algorithm::count());
|
||||
constexpr compression_algorithm_set(uint8_t v) noexcept : _bitset(v) {}
|
||||
public:
|
||||
// Returns a set containing the given algorithm and all algorithms weaker (smaller in the enum order)
|
||||
// than it.
|
||||
constexpr static compression_algorithm_set this_or_lighter(compression_algorithm algo) noexcept {
|
||||
auto x = 1 << algo.idx();
|
||||
return {uint8_t(x + (x - 1))};
|
||||
}
|
||||
|
||||
// Returns the strongest (greatest in the enum order) algorithm in the set.
|
||||
constexpr compression_algorithm heaviest() const {
|
||||
return {compression_algorithm::underlying(std::bit_width(_bitset) - 1)};
|
||||
}
|
||||
|
||||
// The usual set operations.
|
||||
constexpr static compression_algorithm_set singleton(compression_algorithm algo) noexcept {
|
||||
return {uint8_t(1 << algo.idx())};
|
||||
}
|
||||
constexpr compression_algorithm_set intersection(compression_algorithm_set o) const noexcept {
|
||||
return {uint8_t(_bitset & o._bitset)};
|
||||
}
|
||||
constexpr compression_algorithm_set difference(compression_algorithm_set o) const noexcept {
|
||||
return {uint8_t(_bitset &~ o._bitset)};
|
||||
}
|
||||
constexpr compression_algorithm_set sum(compression_algorithm_set o) const noexcept {
|
||||
return {uint8_t(_bitset | o._bitset)};
|
||||
}
|
||||
constexpr bool contains(compression_algorithm algo) const noexcept {
|
||||
return _bitset & (1 << algo.idx());
|
||||
}
|
||||
constexpr bool operator==(const compression_algorithm_set&) const = default;
|
||||
|
||||
// Returns the contained bitset. Used for serialization.
|
||||
constexpr uint8_t value() const noexcept {
|
||||
return _bitset;
|
||||
}
|
||||
|
||||
// Reconstructs the set from the output of `value()`. Used for deserialization.
|
||||
constexpr static compression_algorithm_set from_value(uint8_t bitset) {
|
||||
compression_algorithm_set x = bitset;
|
||||
x.heaviest(); // Validation: throws on illegal/unknown bits.
|
||||
return x;
|
||||
}
|
||||
};
|
||||
|
||||
using algo_config = std::vector<enum_option<compression_algorithm>>;
|
||||
|
||||
struct dict_training_when {
|
||||
enum class type {
|
||||
NEVER,
|
||||
WHEN_LEADER,
|
||||
ALWAYS,
|
||||
COUNT,
|
||||
};
|
||||
|
||||
static constexpr std::string_view names[] = {
|
||||
"never",
|
||||
"when_leader",
|
||||
"always",
|
||||
};
|
||||
static_assert(std::size(names) == static_cast<size_t>(type::COUNT));
|
||||
|
||||
// Implements enum_option.
|
||||
static std::unordered_map<std::string, type> map() {
|
||||
std::unordered_map<std::string, type> ret;
|
||||
for (size_t i = 0; i < std::size(names); ++i) {
|
||||
ret.insert({std::string(names[i]), type(i)});
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace netw
|
||||
27
raft/raft_fwd.hh
Normal file
27
raft/raft_fwd.hh
Normal file
@@ -0,0 +1,27 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
// Lightweight forward-declaration header for commonly used raft types.
|
||||
// Include this instead of raft/raft.hh when only the basic ID/index types
|
||||
// are needed (e.g. in other header files), to avoid pulling in the full
|
||||
// raft machinery (futures, abort_source, bytes_ostream, etc.).
|
||||
|
||||
#include "internal.hh"
|
||||
|
||||
namespace raft {
|
||||
|
||||
using server_id = internal::tagged_id<struct server_id_tag>;
|
||||
using group_id = internal::tagged_id<struct group_id_tag>;
|
||||
using term_t = internal::tagged_uint64<struct term_tag>;
|
||||
using index_t = internal::tagged_uint64<struct index_tag>;
|
||||
using read_id = internal::tagged_uint64<struct read_id_tag>;
|
||||
|
||||
class server;
|
||||
|
||||
} // namespace raft
|
||||
@@ -8,7 +8,6 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "index/secondary_index_manager.hh"
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include <seastar/core/sstring.hh>
|
||||
@@ -113,6 +112,10 @@ namespace gms {
|
||||
class feature_service;
|
||||
}
|
||||
|
||||
namespace locator {
|
||||
class abstract_replication_strategy;
|
||||
}
|
||||
|
||||
namespace alternator {
|
||||
class table_stats;
|
||||
}
|
||||
|
||||
@@ -69,6 +69,13 @@ struct segment_descriptor : public log_heap_hook<segment_descriptor_hist_options
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace replica::logstor
|
||||
|
||||
template<>
|
||||
size_t hist_key<replica::logstor::segment_descriptor>(const replica::logstor::segment_descriptor& desc);
|
||||
|
||||
namespace replica::logstor {
|
||||
|
||||
using segment_descriptor_hist = log_heap<segment_descriptor, segment_descriptor_hist_options>;
|
||||
|
||||
struct segment_set {
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
|
||||
#include "mutation/mutation.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "service/session.hh"
|
||||
#include "service/session_id.hh"
|
||||
#include "locator/tablets.hh"
|
||||
|
||||
namespace replica {
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
#include "service/paxos/paxos_state.hh"
|
||||
#include "service/query_state.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "cql3/untyped_result_set.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "replica/database.hh"
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
*/
|
||||
#pragma once
|
||||
#include <unordered_set>
|
||||
#include "service/raft/group0_fwd.hh"
|
||||
|
||||
namespace service {
|
||||
|
||||
@@ -9,7 +9,11 @@
|
||||
#pragma once
|
||||
|
||||
#include <iosfwd>
|
||||
#include "raft/raft.hh"
|
||||
#include <variant>
|
||||
#include <vector>
|
||||
#include <seastar/core/timer.hh>
|
||||
#include <seastar/core/lowres_clock.hh>
|
||||
#include "raft/raft_fwd.hh"
|
||||
#include "gms/inet_address.hh"
|
||||
|
||||
namespace service {
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "utils/UUID.hh"
|
||||
#include "service/session_id.hh"
|
||||
|
||||
#include <seastar/core/gate.hh>
|
||||
#include <seastar/core/shared_future.hh>
|
||||
@@ -19,12 +19,6 @@
|
||||
|
||||
namespace service {
|
||||
|
||||
using session_id = utils::tagged_uuid<struct session_id_tag>;
|
||||
|
||||
// We want it be different than default-constructed session_id to catch mistakes.
|
||||
constexpr session_id default_session_id = session_id(
|
||||
utils::UUID(0x81e7fc5a8d4411ee, 0x8577325096b39f47)); // timeuuid 2023-11-27 16:46:27.182089.0 UTC
|
||||
|
||||
/// Session is used to track execution of work related to some greater task, identified by session_id.
|
||||
/// Work can enter the session using enter(), and is considered to be part of the session
|
||||
/// as long as the guard returned by enter() is alive.
|
||||
|
||||
21
service/session_id.hh
Normal file
21
service/session_id.hh
Normal file
@@ -0,0 +1,21 @@
|
||||
/*
|
||||
* Copyright (C) 2023-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "utils/UUID.hh"
|
||||
|
||||
namespace service {
|
||||
|
||||
using session_id = utils::tagged_uuid<struct session_id_tag>;
|
||||
|
||||
// We want it to be different than a default-constructed session_id to catch mistakes.
|
||||
constexpr session_id default_session_id = session_id(
|
||||
utils::UUID(0x81e7fc5a8d4411ee, 0x8577325096b39f47)); // timeuuid 2023-11-27 16:46:27.182089.0 UTC
|
||||
|
||||
} // namespace service
|
||||
@@ -38,7 +38,6 @@
|
||||
#include "replica/exceptions.hh"
|
||||
#include "locator/host_id.hh"
|
||||
#include "dht/token_range_endpoints.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "service/cas_shard.hh"
|
||||
#include "service/storage_proxy_fwd.hh"
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
#include <seastar/core/shared_future.hh>
|
||||
#include "absl-flat_hash_map.hh"
|
||||
#include "gms/endpoint_state.hh"
|
||||
#include "gms/gossip_address_map.hh"
|
||||
#include "gms/i_endpoint_state_change_subscriber.hh"
|
||||
#include "schema/schema_fwd.hh"
|
||||
#include "service/client_routes.hh"
|
||||
@@ -40,11 +41,9 @@
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
#include "cdc/generation_id.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "raft/raft.hh"
|
||||
#include "raft/raft_fwd.hh"
|
||||
#include "node_ops/id.hh"
|
||||
#include "raft/server.hh"
|
||||
#include "db/view/view_building_state.hh"
|
||||
#include "service/tablet_allocator.hh"
|
||||
#include "service/tablet_operation.hh"
|
||||
#include "mutation/timestamp.hh"
|
||||
#include "utils/UUID.hh"
|
||||
@@ -115,6 +114,10 @@ class tablet_mutation_builder;
|
||||
|
||||
namespace auth { class cache; }
|
||||
|
||||
namespace service {
|
||||
class tablet_allocator;
|
||||
}
|
||||
|
||||
namespace utils {
|
||||
class disk_space_monitor;
|
||||
}
|
||||
|
||||
@@ -11,7 +11,10 @@
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
#include "service/raft/raft_group_registry.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
|
||||
namespace cql3 {
|
||||
class query_processor;
|
||||
}
|
||||
|
||||
namespace db {
|
||||
class system_keyspace;
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
#include <seastar/core/metrics.hh>
|
||||
|
||||
#include "utils/log.hh"
|
||||
#include "raft/raft.hh"
|
||||
#include "raft/raft_fwd.hh"
|
||||
#include "service/endpoint_lifecycle_subscriber.hh"
|
||||
#include "service/topology_state_machine.hh"
|
||||
#include "db/view/view_building_state.hh"
|
||||
|
||||
@@ -17,9 +17,9 @@
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include "cdc/generation_id.hh"
|
||||
#include "dht/token.hh"
|
||||
#include "raft/raft.hh"
|
||||
#include "raft/raft_fwd.hh"
|
||||
#include "utils/UUID.hh"
|
||||
#include "service/session.hh"
|
||||
#include "service/session_id.hh"
|
||||
#include "mutation/canonical_mutation.hh"
|
||||
#include "replica/database_fwd.hh"
|
||||
#include "locator/host_id.hh"
|
||||
|
||||
@@ -44,7 +44,7 @@
|
||||
#include "tracing/trace_state.hh"
|
||||
#include "utils/updateable_value.hh"
|
||||
#include "dht/decorated_key.hh"
|
||||
#include "service/session.hh"
|
||||
#include "service/session_id.hh"
|
||||
#include "sstables/trie/bti_index.hh"
|
||||
#include "sstables/file_size_stats.hh"
|
||||
|
||||
|
||||
41
stdafx.hh
41
stdafx.hh
@@ -254,7 +254,6 @@
|
||||
#include <seastar/coroutine/generator.hh>
|
||||
#include <seastar/coroutine/maybe_yield.hh>
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
#include <seastar/http/api_docs.hh>
|
||||
#include <seastar/http/client.hh>
|
||||
#include <seastar/http/common.hh>
|
||||
#include <seastar/http/connection_factory.hh>
|
||||
@@ -401,4 +400,44 @@
|
||||
#define ZSTD_STATIC_LINKING_ONLY
|
||||
#include <zstd.h>
|
||||
|
||||
// Scylla internal headers included by most translation units
|
||||
#include "bytes.hh"
|
||||
#include "cql3/query_options.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "db/config.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "db/timeout_clock.hh"
|
||||
#include "db_clock.hh"
|
||||
#include "dht/token.hh"
|
||||
#include "exceptions/exceptions.hh"
|
||||
#include "gc_clock.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "gms/inet_address.hh"
|
||||
#include "keys/keys.hh"
|
||||
#include "locator/host_id.hh"
|
||||
#include "locator/token_metadata.hh"
|
||||
#include "locator/token_metadata_fwd.hh"
|
||||
#include "locator/types.hh"
|
||||
#include "mutation/mutation_fragment.hh"
|
||||
#include "mutation/mutation_partition.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "schema/schema.hh"
|
||||
#include "schema/schema_builder.hh"
|
||||
#include "schema/schema_fwd.hh"
|
||||
#include "seastarx.hh"
|
||||
#include "service/client_state.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "service/topology_state_machine.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "types/types.hh"
|
||||
#include "utils/chunked_vector.hh"
|
||||
#include "utils/fragment_range.hh"
|
||||
#include "utils/managed_bytes.hh"
|
||||
#include "utils/tagged_integer.hh"
|
||||
#include "utils/UUID.hh"
|
||||
|
||||
#endif
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "utils/UUID.hh"
|
||||
#include "service/session_id.hh"
|
||||
|
||||
namespace streaming {
|
||||
|
||||
@@ -21,9 +21,3 @@ class stream_state;
|
||||
using plan_id = utils::tagged_uuid<struct plan_id_tag>;
|
||||
|
||||
} // namespace streaming
|
||||
|
||||
namespace service {
|
||||
|
||||
using session_id = utils::tagged_uuid<struct session_id_tag>;
|
||||
|
||||
}
|
||||
@@ -13,6 +13,7 @@
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
#include "table_helper.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "cql3/statements/create_table_statement.hh"
|
||||
#include "cql3/statements/modification_statement.hh"
|
||||
#include "replica/database.hh"
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
#include <seastar/core/gate.hh>
|
||||
#include <seastar/core/when_all.hh>
|
||||
#include <seastar/rpc/rpc_types.hh>
|
||||
#include "gms/gossiper.hh"
|
||||
#include <seastar/util/defer.hh>
|
||||
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
#undef SEASTAR_TESTING_MAIN
|
||||
#include <seastar/testing/test_case.hh>
|
||||
#include "test/lib/cql_assertions.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
#include "test/lib/error_injection.hh"
|
||||
#include "test/lib/log.hh"
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#undef SEASTAR_TESTING_MAIN
|
||||
#include <seastar/testing/test_case.hh>
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "db/commitlog/commitlog_replayer.hh"
|
||||
#include "db/commitlog/commitlog.hh"
|
||||
#include "db/config.hh"
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
|
||||
#include "mutation/mutation.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "cql3/selection/selection.hh"
|
||||
|
||||
BOOST_AUTO_TEST_SUITE(per_partition_rate_limit_test)
|
||||
|
||||
|
||||
@@ -18,6 +18,8 @@
|
||||
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
#include "test/lib/cql_assertions.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "cql3/result_set.hh"
|
||||
#include "service/migration_manager.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "schema/schema_builder.hh"
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
#include "test/lib/random_utils.hh"
|
||||
#include "service/topology_mutation.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include <fmt/ranges.h>
|
||||
#include <seastar/testing/thread_test_case.hh>
|
||||
#include <seastar/testing/on_internal_error.hh>
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
#include "cdc/generation_service.hh"
|
||||
#include "cql3/functions/functions.hh"
|
||||
#include "cql3/query_processor.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "cql3/query_options.hh"
|
||||
#include "cql3/statements/batch_statement.hh"
|
||||
#include "cql3/statements/modification_statement.hh"
|
||||
@@ -35,6 +36,7 @@
|
||||
#include "service/qos/raft_service_level_distributed_data_accessor.hh"
|
||||
#include "service/tablet_allocator.hh"
|
||||
#include "compaction/compaction_manager.hh"
|
||||
#include "message/advanced_rpc_compressor.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
#include "gms/gossip_address_map.hh"
|
||||
#include "service/raft/raft_group_registry.hh"
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
#include "db/config.hh"
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
#include "message/advanced_rpc_compressor.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "gms/application_state.hh"
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
#include <seastar/util/closeable.hh>
|
||||
#include "db/config.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
#include "message/advanced_rpc_compressor.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
#include "gms/gossip_digest_syn.hh"
|
||||
#include "gms/gossip_digest_ack.hh"
|
||||
|
||||
@@ -11,6 +11,8 @@
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
#include <seastar/core/temporary_buffer.hh>
|
||||
#include <seastar/net/socket_defs.hh>
|
||||
#include <seastar/net/api.hh>
|
||||
#include <seastar/core/seastar.hh>
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
#include <seastar/core/thread.hh>
|
||||
|
||||
@@ -26,6 +26,7 @@
|
||||
#include "db/config.hh"
|
||||
#include "schema/schema_builder.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "tools/utils.hh"
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
#include "vector_search/vector_store_client.hh"
|
||||
#include "vs_mock_server.hh"
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
#include "transport/messages/result_message.hh"
|
||||
#include "utils/rjson.hh"
|
||||
#include <boost/test/tools/old/interface.hpp>
|
||||
#include <seastar/core/seastar.hh>
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
#include "tools/load_system_tablets.hh"
|
||||
|
||||
#include <seastar/core/thread.hh>
|
||||
#include "query/query-result-set.hh"
|
||||
#include <seastar/util/closeable.hh>
|
||||
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
|
||||
@@ -1687,7 +1687,7 @@ process_batch_internal(service::client_state& client_state, sharded<cql3::query_
|
||||
|
||||
std::vector<cql3::statements::batch_statement::single_statement> modifications;
|
||||
std::vector<cql3::raw_value_view_vector_with_unset> values;
|
||||
std::unordered_map<cql3::prepared_cache_key_type, cql3::authorized_prepared_statements_cache::value_type> pending_authorization_entries;
|
||||
std::unordered_map<cql3::prepared_cache_key_type, cql3::statements::prepared_statement::checked_weak_ptr> pending_authorization_entries;
|
||||
|
||||
modifications.reserve(n.assume_value());
|
||||
values.reserve(n.assume_value());
|
||||
|
||||
@@ -566,7 +566,7 @@ public:
|
||||
}
|
||||
if (need_preempt() && i != _cache.end()) {
|
||||
auto key = i->idx;
|
||||
co_await coroutine::maybe_yield();
|
||||
co_await seastar::coroutine::maybe_yield();
|
||||
i = _cache.lower_bound(key);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -361,7 +361,7 @@ utils::config_file::configs utils::config_file::unset_values() const {
|
||||
|
||||
future<> utils::config_file::read_from_file(file f, error_handler h) {
|
||||
return f.size().then([this, f, h](size_t s) {
|
||||
return do_with(make_file_input_stream(f), [this, s, h](input_stream<char>& in) {
|
||||
return do_with(make_file_input_stream(f), [this, s, h](seastar::input_stream<char>& in) {
|
||||
return in.read_exactly(s).then([this, h](temporary_buffer<char> buf) {
|
||||
read_from_yaml(sstring(buf.begin(), buf.end()), h);
|
||||
if (!_initialization_completed) {
|
||||
|
||||
@@ -83,7 +83,7 @@ directories::directories(bool developer_mode)
|
||||
future<> directories::create_and_verify(directories::set dir_set, recursive recursive) {
|
||||
std::vector<file_lock> locks;
|
||||
locks.reserve(dir_set.get_paths().size());
|
||||
co_await coroutine::parallel_for_each(dir_set.get_paths(), [this, &locks, recursive] (fs::path path) -> future<> {
|
||||
co_await seastar::coroutine::parallel_for_each(dir_set.get_paths(), [this, &locks, recursive] (fs::path path) -> future<> {
|
||||
file_lock lock = co_await touch_and_lock(path);
|
||||
locks.emplace_back(std::move(lock));
|
||||
co_await disk_sanity(path, _developer_mode);
|
||||
@@ -144,7 +144,7 @@ future<> directories::do_verify_owner_and_mode(fs::path path, recursive recurse,
|
||||
co_return;
|
||||
}
|
||||
auto lister = directory_lister(path, lister::dir_entry_types::full(), do_verify_subpath, lister::show_hidden::no);
|
||||
co_await with_closeable(std::move(lister), coroutine::lambda([&] (auto& lister) -> future<> {
|
||||
co_await with_closeable(std::move(lister), seastar::coroutine::lambda([&] (auto& lister) -> future<> {
|
||||
while (auto de = co_await lister.get()) {
|
||||
co_await do_verify_owner_and_mode(path / de->name, recurse, level + 1, do_verify_subpath);
|
||||
}
|
||||
@@ -168,7 +168,7 @@ future<> directories::verify_owner_and_mode(fs::path path, recursive recursive)
|
||||
future<> directories::verify_owner_and_mode_of_data_dir(directories::set dir_set) {
|
||||
// verify data and index files in the first iteration and the other files in the second iteration.
|
||||
for (auto verify_data_and_index_files : { true, false }) {
|
||||
co_await coroutine::parallel_for_each(dir_set.get_paths(), [verify_data_and_index_files] (const auto &path) {
|
||||
co_await seastar::coroutine::parallel_for_each(dir_set.get_paths(), [verify_data_and_index_files] (const auto &path) {
|
||||
return do_verify_owner_and_mode(std::move(path), recursive::yes, 0, [verify_data_and_index_files] (const fs::path& dir, const directory_entry& de) {
|
||||
auto path = dir / de.name;
|
||||
component_type path_component_type;
|
||||
|
||||
@@ -335,7 +335,7 @@ utils::gcp::storage::client::impl::send_with_retry(const std::string& path, cons
|
||||
} catch (...) {
|
||||
// just disregard the failure, we will retry below in the wrapped handler
|
||||
}
|
||||
auto wrapped_handler = [this, handler = std::move(handler), &req, scope](const reply& rep, input_stream<char>& in) -> future<> {
|
||||
auto wrapped_handler = [this, handler = std::move(handler), &req, scope](const reply& rep, seastar::input_stream<char>& in) -> future<> {
|
||||
auto _in = std::move(in);
|
||||
auto status_class = reply::classify_status(rep._status);
|
||||
/*
|
||||
@@ -352,7 +352,7 @@ utils::gcp::storage::client::impl::send_with_retry(const std::string& path, cons
|
||||
auto content = co_await util::read_entire_stream_contiguous(_in);
|
||||
auto error_msg = get_gcp_error_message(std::string_view(content));
|
||||
gcp_storage.debug("Got unexpected response status: {}, content: {}", rep._status, content);
|
||||
co_await coroutine::return_exception_ptr(std::make_exception_ptr(httpd::unexpected_status_error(rep._status)));
|
||||
co_await seastar::coroutine::return_exception_ptr(std::make_exception_ptr(httpd::unexpected_status_error(rep._status)));
|
||||
}
|
||||
|
||||
std::exception_ptr eptr;
|
||||
@@ -366,7 +366,7 @@ utils::gcp::storage::client::impl::send_with_retry(const std::string& path, cons
|
||||
eptr = std::current_exception();
|
||||
}
|
||||
if (eptr) {
|
||||
co_await coroutine::return_exception_ptr(std::move(eptr));
|
||||
co_await seastar::coroutine::return_exception_ptr(std::move(eptr));
|
||||
}
|
||||
};
|
||||
object_storage_retry_strategy retry_strategy(10,10ms,10000ms, as);
|
||||
|
||||
@@ -84,7 +84,7 @@ future<std::optional<directory_entry>> directory_lister::get() {
|
||||
_gen.emplace(_opened.experimental_list_directory());
|
||||
}
|
||||
if (!_gen) {
|
||||
co_return coroutine::exception(std::make_exception_ptr(seastar::broken_pipe_exception()));
|
||||
co_return seastar::coroutine::exception(std::make_exception_ptr(seastar::broken_pipe_exception()));
|
||||
}
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
@@ -114,7 +114,7 @@ future<std::optional<directory_entry>> directory_lister::get() {
|
||||
_gen.reset();
|
||||
if (ex) {
|
||||
co_await _opened.close();
|
||||
co_return coroutine::exception(std::move(ex));
|
||||
co_return seastar::coroutine::exception(std::move(ex));
|
||||
}
|
||||
co_return std::nullopt;
|
||||
}
|
||||
|
||||
@@ -438,7 +438,7 @@ private:
|
||||
break;
|
||||
}
|
||||
_reclaim(free_memory_threshold - memory::free_memory());
|
||||
co_await coroutine::maybe_yield();
|
||||
co_await seastar::coroutine::maybe_yield();
|
||||
}
|
||||
llogger.debug("background_reclaimer::main_loop: exit");
|
||||
}
|
||||
|
||||
@@ -136,7 +136,7 @@ public:
|
||||
_map.erase(_map.begin());
|
||||
}
|
||||
if (++ret % evictions_per_yield == 0) {
|
||||
co_await coroutine::maybe_yield();
|
||||
co_await seastar::coroutine::maybe_yield();
|
||||
}
|
||||
}
|
||||
co_return ret;
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
|
||||
#include <fmt/format.h>
|
||||
#include <exception>
|
||||
#include "utils/exceptions.hh"
|
||||
#include <initializer_list>
|
||||
#include <memory>
|
||||
#include <regex>
|
||||
@@ -354,7 +355,7 @@ http::experimental::client::reply_handler client::wrap_handler(http::request& re
|
||||
should_retry = utils::http::retryable::yes;
|
||||
co_await authorize(request);
|
||||
}
|
||||
co_await coroutine::return_exception_ptr(std::make_exception_ptr(
|
||||
co_await seastar::coroutine::return_exception_ptr(std::make_exception_ptr(
|
||||
aws::aws_exception(aws_error(possible_error->get_error_type(), possible_error->get_error_message().c_str(), should_retry))));
|
||||
}
|
||||
|
||||
@@ -373,7 +374,7 @@ http::experimental::client::reply_handler client::wrap_handler(http::request& re
|
||||
eptr = std::current_exception();
|
||||
}
|
||||
if (eptr) {
|
||||
co_await coroutine::return_exception_ptr(std::make_exception_ptr(aws::aws_exception(aws_error::from_exception_ptr(eptr))));
|
||||
co_await seastar::coroutine::return_exception_ptr(std::make_exception_ptr(aws::aws_exception(aws_error::from_exception_ptr(eptr))));
|
||||
}
|
||||
};
|
||||
}
|
||||
@@ -554,7 +555,7 @@ future<> client::put_object_tagging(sstring object_name, tag_set tagging, seasta
|
||||
}
|
||||
co_await output.close();
|
||||
if (ex) {
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
co_await seastar::coroutine::return_exception_ptr(std::move(ex));
|
||||
}
|
||||
});
|
||||
co_await make_request(std::move(req), ignore_reply, http::reply::status_type::ok, as);
|
||||
@@ -624,7 +625,7 @@ future<> client::put_object(sstring object_name, temporary_buffer<char> buf, sea
|
||||
}
|
||||
co_await out.close();
|
||||
if (ex) {
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
co_await seastar::coroutine::return_exception_ptr(std::move(ex));
|
||||
}
|
||||
});
|
||||
co_await make_request(std::move(req), [len, start = s3_clock::now()] (group_client& gc, const auto& rep, auto&& in) {
|
||||
@@ -650,7 +651,7 @@ future<> client::put_object(sstring object_name, ::memory_data_sink_buffers bufs
|
||||
}
|
||||
co_await out.close();
|
||||
if (ex) {
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
co_await seastar::coroutine::return_exception_ptr(std::move(ex));
|
||||
}
|
||||
});
|
||||
co_await make_request(std::move(req), [len, start = s3_clock::now()] (group_client& gc, const auto& rep, auto&& in) {
|
||||
@@ -900,7 +901,7 @@ future<> dump_multipart_upload_parts(output_stream<char> out, const utils::chunk
|
||||
}
|
||||
co_await out.close();
|
||||
if (ex) {
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
co_await seastar::coroutine::return_exception_ptr(std::move(ex));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -916,7 +917,7 @@ future<> client::multipart_upload::start_upload() {
|
||||
auto body = co_await util::read_entire_stream_contiguous(in);
|
||||
_upload_id = parse_multipart_upload_id(body);
|
||||
if (_upload_id.empty()) {
|
||||
co_await coroutine::return_exception(std::runtime_error("cannot initiate upload"));
|
||||
co_await seastar::coroutine::return_exception(std::runtime_error("cannot initiate upload"));
|
||||
}
|
||||
s3l.trace("created uploads for {} -> id = {}", _object_name, _upload_id);
|
||||
}, http::reply::status_type::ok, _as);
|
||||
@@ -951,7 +952,7 @@ future<> client::multipart_upload::upload_part(memory_data_sink_buffers bufs) {
|
||||
}
|
||||
co_await out.close();
|
||||
if (ex) {
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
co_await seastar::coroutine::return_exception_ptr(std::move(ex));
|
||||
}
|
||||
// note: At this point the buffers are sent, but the response is not yet
|
||||
// received. However, claim is released and next part may start uploading
|
||||
@@ -1000,7 +1001,7 @@ future<> client::multipart_upload::finalize_upload() {
|
||||
|
||||
unsigned parts_xml_len = prepare_multipart_upload_parts(_part_etags);
|
||||
if (parts_xml_len == 0) {
|
||||
co_await coroutine::return_exception(std::runtime_error("Failed to parse ETag list. Aborting multipart upload."));
|
||||
co_await seastar::coroutine::return_exception(std::runtime_error("Failed to parse ETag list. Aborting multipart upload."));
|
||||
}
|
||||
|
||||
s3l.trace("POST upload completion {} parts (upload id {})", _part_etags.size(), _upload_id);
|
||||
@@ -1016,15 +1017,15 @@ future<> client::multipart_upload::finalize_upload() {
|
||||
auto status_class = http::reply::classify_status(rep._status);
|
||||
std::optional<aws::aws_error> possible_error = aws::aws_error::parse(co_await util::read_entire_stream_contiguous(payload));
|
||||
if (possible_error) {
|
||||
co_await coroutine::return_exception(aws::aws_exception(std::move(possible_error.value())));
|
||||
co_await seastar::coroutine::return_exception(aws::aws_exception(std::move(possible_error.value())));
|
||||
}
|
||||
|
||||
if (status_class != http::reply::status_class::informational && status_class != http::reply::status_class::success) {
|
||||
co_await coroutine::return_exception(aws::aws_exception(aws::aws_error::from_http_code(rep._status)));
|
||||
co_await seastar::coroutine::return_exception(aws::aws_exception(aws::aws_error::from_http_code(rep._status)));
|
||||
}
|
||||
|
||||
if (rep._status != http::reply::status_type::ok) {
|
||||
co_await coroutine::return_exception(httpd::unexpected_status_error(rep._status));
|
||||
co_await seastar::coroutine::return_exception(httpd::unexpected_status_error(rep._status));
|
||||
}
|
||||
// If we reach this point it means the request succeeded. However, the body payload was already consumed, so no response handler was invoked. At
|
||||
// this point it is ok since we are not interested in parsing this particular response
|
||||
@@ -1458,7 +1459,7 @@ auto client::download_source::request_body() -> future<external_body> {
|
||||
(void)_client->make_request(std::move(req), [this, &p] (group_client& gc, const http::reply& rep, input_stream<char>&& in_) mutable -> future<> {
|
||||
s3l.trace("GET {} got the body ({} {} bytes)", _object_name, rep._status, rep.content_length);
|
||||
if (rep._status != http::reply::status_type::partial_content && rep._status != http::reply::status_type::ok) {
|
||||
co_await coroutine::return_exception(httpd::unexpected_status_error(rep._status));
|
||||
co_await seastar::coroutine::return_exception(httpd::unexpected_status_error(rep._status));
|
||||
}
|
||||
|
||||
auto in = std::move(in_);
|
||||
@@ -1548,7 +1549,7 @@ class client::do_upload_file : private multipart_upload {
|
||||
co_await output.close();
|
||||
co_await input.close();
|
||||
if (ex) {
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
co_await seastar::coroutine::return_exception_ptr(std::move(ex));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1823,7 +1824,7 @@ future<> client::close() {
|
||||
_creds_invalidation_timer.cancel();
|
||||
_creds_update_timer.cancel();
|
||||
}
|
||||
co_await coroutine::parallel_for_each(_https, [] (auto& it) -> future<> {
|
||||
co_await seastar::coroutine::parallel_for_each(_https, [] (auto& it) -> future<> {
|
||||
co_await it.second.http.close();
|
||||
});
|
||||
|
||||
@@ -1934,7 +1935,7 @@ future<std::optional<directory_entry>> client::bucket_lister::get() {
|
||||
}
|
||||
co_await close();
|
||||
if (ex) {
|
||||
co_return coroutine::exception(std::move(ex));
|
||||
co_return seastar::coroutine::exception(std::move(ex));
|
||||
}
|
||||
co_return std::nullopt;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user