scylladb/test/lib/cql_test_env.cc
Avi Kivity 6b259babeb Merge 'logstor: initial log-structured storage for key-value tables' from Michael Litvak
Introduce an initial and experimental implementation of an alternative log-structured storage engine for key-value tables.

Main flows and components:
* The storage is composed of 32MB files, each divided into segments of 128k. Records containing a mutation and additional metadata are written to the segments sequentially. Records are first written to a buffer, which is then written to the active segment sequentially in 4k blocks.
* The primary index in memory maps keys to their location on disk. It is a per-table B-tree ordered by token, similar to a memtable.
* On reads we calculate the key and look it up in the primary index, then read the mutation from disk with a single disk IO.
* On writes we write the record to a buffer, wait for it to be written to disk, then update the index with the new location, and free the previous record.
* We track the used space in each segment. When a record is overwritten, the previous record becomes dead and we increase the free space counter of the segment that holds it. We store the segments in a histogram by usage.
* The compaction process takes segments with low utilization, reads them and writes the live records to new segments, and frees the old segments.
* Segments are initially "mixed" - we write to the active segment records from all tables and all tablets. The "separator" process rewrites records from mixed segments into new segments that are organized by compaction groups (tablets), and frees the mixed segments. Each write is written to the active segment and to a separator buffer of the compaction group, which is eventually flushed to a new segment in the compaction group.
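
The index and space-accounting flow in the list above can be sketched in a few lines. This is a minimal in-memory illustration and not the logstor implementation: `toy_log_store`, `toy_segment`, `location` and all member names are hypothetical, segments are plain strings rather than files, a `std::map` stands in for the per-table B-tree, and buffering, the separator and recovery are left out. Only the 32MB, 128k and 4k sizes come from the description.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Sizes quoted in the description; every name below is hypothetical.
constexpr std::size_t file_size = 32 * 1024 * 1024;  // 32MB file
constexpr std::size_t segment_size = 128 * 1024;     // 128k segment
constexpr std::size_t block_size = 4 * 1024;         // 4k write block
static_assert(file_size / segment_size == 256, "segments per file");
static_assert(segment_size / block_size == 32, "blocks per segment");

struct location { std::size_t segment, offset, size; };

struct toy_segment {
    std::string data;      // stands in for the on-disk bytes
    std::size_t dead = 0;  // bytes of records that were overwritten
    std::size_t live() const { return data.size() - dead; }
};

class toy_log_store {
    std::vector<toy_segment> _segments = std::vector<toy_segment>(1);
    std::size_t _active = 0;
    std::map<std::int64_t, location> _index;  // token -> location, ordered by token
public:
    // Append to the active segment, then repoint the index and mark the old copy dead.
    void write(std::int64_t token, const std::string& value) {
        assert(value.size() <= segment_size);
        if (_segments[_active].data.size() + value.size() > segment_size) {
            _segments.emplace_back();  // seal the active segment, open a new one
            _active = _segments.size() - 1;
        }
        location loc{_active, _segments[_active].data.size(), value.size()};
        _segments[_active].data += value;
        auto [it, inserted] = _index.try_emplace(token, loc);
        if (!inserted) {
            _segments[it->second.segment].dead += it->second.size;
            it->second = loc;
        }
    }
    // One index lookup, then one read at the recorded location.
    std::optional<std::string> read(std::int64_t token) const {
        auto it = _index.find(token);
        if (it == _index.end()) {
            return std::nullopt;
        }
        const location& loc = it->second;
        return _segments[loc.segment].data.substr(loc.offset, loc.size);
    }
    // Choose the sealed segment with the fewest live bytes among those with dead space.
    std::optional<std::size_t> pick_compaction_victim() const {
        std::optional<std::size_t> best;
        for (std::size_t i = 0; i < _segments.size(); ++i) {
            if (i == _active || _segments[i].dead == 0) {
                continue;
            }
            if (!best || _segments[i].live() < _segments[*best].live()) {
                best = i;
            }
        }
        return best;
    }
    // Rewrite the victim's live records through the normal write path, then free it.
    void compact(std::size_t victim) {
        assert(victim != _active);
        std::vector<std::pair<std::int64_t, std::string>> live;
        for (const auto& [token, loc] : _index) {
            if (loc.segment == victim) {
                live.emplace_back(token, _segments[victim].data.substr(loc.offset, loc.size));
            }
        }
        for (const auto& [token, value] : live) {
            write(token, value);
        }
        _segments[victim] = toy_segment{};  // the old segment is now free
    }
    std::size_t dead_bytes(std::size_t segment) const { return _segments[segment].dead; }
};
```

Overwriting a token grows `dead` on the segment that held the old record, which is what makes that segment a candidate for `pick_compaction_victim()`. A real engine would keep segments in a usage histogram instead of scanning them linearly.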

Currently this mode is experimental and requires an experimental flag to be enabled.
Not yet supported: strong consistency, tablet migration, tablet split/merge, big mutations, tombstone GC, and TTL.

To use it, add to the config:
```
enable_logstor: true

experimental_features:
  - logstor
```

Create a table:
```
CREATE TABLE ks.t(pk int PRIMARY KEY, a int, v text) WITH storage_engine = 'logstor';
```

INSERT, SELECT, and DELETE work as expected.
UPDATE is not supported yet.

no backport - new feature

Closes scylladb/scylladb#28706

* github.com:scylladb/scylladb:
  logstor: trigger separator flush for buffers that hold old segments
  docs/dev: add logstor documentation
  logstor: recover segments into compaction groups
  logstor: range read
  logstor: change index to btree by token per table
  logstor: move segments to replica::compaction_group
  db: update dirty mem limits dynamically
  logstor: track memory usage
  logstor: logstor stats api
  logstor: compaction buffer pool
  logstor: separator: flush buffer when full
  logstor: hold segment until index updates
  logstor: truncate table
  logstor: enable/disable compaction per table
  logstor: separator buffer pool
  test: logstor: add separator and compaction tests
  logstor: segment and separator barrier
  logstor: separator debt controller
  logstor: compaction controller
  logstor: recovery: recover mixed segments using separator
  logstor: wait for pending reads in compaction
  logstor: separator
  logstor: compaction groups
  logstor: cache files for read
  logstor: recovery: initial
  logstor: add segment generation
  logstor: reserve segments for compaction
  logstor: index: buckets
  logstor: add buffer header
  logstor: add group_id
  logstor: record generation
  logstor: generation utility
  logstor: use RIPEMD-160 for index key
  test: add test_logstor.py
  api: add logstor compaction trigger endpoint
  replica: add logstor to db
  schema: add logstor cf property
  logstor: initial commit
  db: disable tablet balancing with logstor
  db: add logstor experimental feature flag
2026-03-20 00:18:09 +02:00

1324 lines
64 KiB
C++

/*
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <iterator>
#include <random>
#include <seastar/core/thread.hh>
#include <seastar/util/defer.hh>
#include <boost/test/unit_test.hpp>
#include "gms/generation-number.hh"
#include "db/view/view_building_worker.hh"
#include "replica/database_fwd.hh"
#include "test/lib/cql_test_env.hh"
#include "test/lib/test_utils.hh"
#include "cdc/generation_service.hh"
#include "cql3/functions/functions.hh"
#include "cql3/query_processor.hh"
#include "cql3/query_options.hh"
#include "cql3/statements/batch_statement.hh"
#include "cql3/statements/modification_statement.hh"
#include "cql3/cql_config.hh"
#include <fmt/ranges.h>
#include <seastar/core/sharded.hh>
#include <seastar/core/abort_source.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/scheduling.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/coroutine.hh>
#include "service/client_routes.hh"
#include "service/migration_manager.hh"
#include "service/qos/raft_service_level_distributed_data_accessor.hh"
#include "service/tablet_allocator.hh"
#include "compaction/compaction_manager.hh"
#include "message/messaging_service.hh"
#include "gms/gossip_address_map.hh"
#include "service/raft/raft_group_registry.hh"
#include "service/storage_service.hh"
#include "service/storage_proxy.hh"
#include "service/mapreduce_service.hh"
#include "service/endpoint_lifecycle_subscriber.hh"
#include "auth/service.hh"
#include "auth/cache.hh"
#include "auth/common.hh"
#include "db/config.hh"
#include "db/batchlog_manager.hh"
#include "schema/schema_builder.hh"
#include "schema/compression_initializer.hh"
#include "db/view/view_building_state.hh"
#include "test/lib/tmpdir.hh"
#include "test/lib/log.hh"
#include "db/view/view_builder.hh"
#include "db/view/node_view_update_backlog.hh"
#include "db/view/view_update_generator.hh"
#include "replica/distributed_loader.hh"
// TODO: remove (#293)
#include "message/messaging_service.hh"
#include "gms/gossiper.hh"
#include "gms/feature_service.hh"
#include "service/qos/service_level_controller.hh"
#include "db/system_keyspace.hh"
#include "db/system_distributed_keyspace.hh"
#include "repair/row_level.hh"
#include "utils/assert.hh"
#include "utils/only_on_shard0.hh"
#include "utils/cross-shard-barrier.hh"
#include "streaming/stream_manager.hh"
#include "debug.hh"
#include "db/schema_tables.hh"
#include "db/virtual_tables.hh"
#include "service/strong_consistency/groups_manager.hh"
#include "service/strong_consistency/coordinator.hh"
#include "service/raft/raft_group0_client.hh"
#include "service/raft/raft_group0.hh"
#include "service/paxos/paxos_state.hh"
#include "sstables/sstables_manager.hh"
#include "init.hh"
#include "lang/manager.hh"
#include "utils/disk_space_monitor.hh"
#include <sys/time.h>
using namespace std::chrono_literals;
future<scheduling_groups> get_scheduling_groups() {
    static std::optional<scheduling_groups> _scheduling_groups;
    if (!_scheduling_groups) {
        _scheduling_groups.emplace();
        _scheduling_groups->compaction_scheduling_group = co_await create_scheduling_group("compaction", 1000);
        _scheduling_groups->memory_compaction_scheduling_group = co_await create_scheduling_group("mem_compaction", 1000);
        _scheduling_groups->streaming_scheduling_group = co_await create_scheduling_group("streaming", 200);
        _scheduling_groups->statement_scheduling_group = co_await create_scheduling_group("statement", 1000);
        _scheduling_groups->memtable_scheduling_group = co_await create_scheduling_group("memtable", 1000);
        _scheduling_groups->memtable_to_cache_scheduling_group = co_await create_scheduling_group("memtable_to_cache", 200);
        _scheduling_groups->gossip_scheduling_group = co_await create_scheduling_group("gossip", 1000);
    }
    co_return *_scheduling_groups;
}
cql_test_config::cql_test_config()
    : cql_test_config(make_shared<db::config>())
{}
cql_test_config::cql_test_config(shared_ptr<db::config> cfg)
    : db_config(cfg)
{
    // This causes huge amounts of commitlog writes to allocate space on disk,
    // which all get thrown away when the test is done. This can cause timeouts
    // if /tmp is not tmpfs.
    db_config->commitlog_use_o_dsync.set(false);
    db_config->add_all_default_extensions();
    db_config->flush_schema_tables_after_modification.set(false);
    db_config->commitlog_use_o_dsync(false);
    db_config->rf_rack_valid_keyspaces(true);
}
cql_test_config::cql_test_config(const cql_test_config&) = default;
cql_test_config::~cql_test_config() = default;
static const sstring testing_superuser = "tester";
// END TODO
data_dictionary::database
cql_test_env::data_dictionary() {
    return db().local().as_data_dictionary();
}
class single_node_cql_env : public cql_test_env {
public:
static constexpr std::string_view ks_name = "ks";
static std::atomic<bool> active;
private:
sharded<default_sstable_compressor_factory> _scf;
sharded<replica::database> _db;
sharded<gms::feature_service> _feature_service;
sharded<sstables::storage_manager> _sstm;
sharded<service::storage_proxy> _proxy;
sharded<service::paxos::paxos_store> _paxos_store;
sharded<cql3::query_processor> _qp;
sharded<auth::service> _auth_service;
sharded<auth::cache> _auth_cache;
sharded<db::view::view_builder> _view_builder;
sharded<db::view::view_building_worker> _view_building_worker;
sharded<db::view::view_update_generator> _view_update_generator;
sharded<service::migration_notifier> _mnotifier;
sharded<qos::service_level_controller> _sl_controller;
sharded<service::client_routes_service> _client_routes;
sharded<qos::service_level_controller::auth_integration> _sl_controller_auth_integration;
sharded<service::topology_state_machine> _topology_state_machine;
sharded<db::view::view_building_state_machine> _view_building_state_machine;
sharded<netw::walltime_compressor_tracker> _compressor_tracker;
sharded<service::migration_manager> _mm;
sharded<db::batchlog_manager> _batchlog_manager;
sharded<gms::gossiper> _gossiper;
sharded<service::raft_group_registry> _group0_registry;
sharded<db::system_keyspace> _sys_ks;
sharded<service::tablet_allocator> _tablet_allocator;
sharded<db::system_distributed_keyspace> _sys_dist_ks;
sharded<locator::snitch_ptr> _snitch;
sharded<compaction::compaction_manager> _cm;
sharded<tasks::task_manager> _task_manager;
sharded<netw::messaging_service> _ms;
sharded<service::strong_consistency::groups_manager> _groups_manager;
sharded<service::strong_consistency::coordinator> _sc_coordinator;
sharded<service::storage_service> _ss;
sharded<locator::shared_token_metadata> _token_metadata;
sharded<locator::effective_replication_map_factory> _erm_factory;
sharded<sstables::directory_semaphore> _sst_dir_semaphore;
std::optional<utils::disk_space_monitor> _disk_space_monitor_shard0;
sharded<lang::manager> _lang_manager;
sharded<cql3::cql_config> _cql_config;
sharded<service::endpoint_lifecycle_notifier> _elc_notif;
sharded<cdc::generation_service> _cdc_generation_service;
sharded<repair_service> _repair;
sharded<streaming::stream_manager> _stream_manager;
sharded<service::mapreduce_service> _mapreduce_service;
sharded<direct_failure_detector::failure_detector> _fd;
sharded<gms::gossip_address_map> _gossip_address_map;
sharded<service::direct_fd_pinger> _fd_pinger;
sharded<cdc::cdc_service> _cdc;
sharded<vector_search::vector_store_client> _vector_store_client;
db::config* _db_config;
service::raft_group0_client* _group0_client;
private:
struct core_local_state {
service::client_state client_state;
core_local_state(auth::service& auth_service, qos::service_level_controller& sl_controller, timeout_config timeout)
: client_state(service::client_state::external_tag{}, auth_service, &sl_controller, timeout)
{
client_state.set_login(auth::authenticated_user(testing_superuser));
}
future<> stop() {
return make_ready_future<>();
}
};
sharded<core_local_state> _core_local;
private:
cql3::dialect test_dialect() {
return cql3::dialect{
.duplicate_bind_variable_names_refer_to_same_variable = _db.local().get_config().cql_duplicate_bind_variable_names_refer_to_same_variable(),
};
}
auto make_query_state() {
if (_db.local().has_keyspace(ks_name)) {
_core_local.local().client_state.set_keyspace(_db.local(), ks_name);
cql_transport::cql_protocol_extension_enum_set cql_proto_exts;
cql_proto_exts.set(cql_transport::cql_protocol_extension::TABLETS_ROUTING_V1);
_core_local.local().client_state.set_protocol_extensions(std::move(cql_proto_exts));
}
return ::make_shared<service::query_state>(_core_local.local().client_state, empty_service_permit());
}
public:
single_node_cql_env()
{
tests::adjust_rlimit();
}
virtual future<::shared_ptr<cql_transport::messages::result_message>> execute_cql(std::string_view text) override {
testlog.trace("{}(\"{}\")", __FUNCTION__, text);
auto qs = make_query_state();
auto qo = make_shared<cql3::query_options>(cql3::query_options::DEFAULT);
return local_qp().execute_direct_without_checking_exception_message(text, *qs, test_dialect(), *qo).then([qs, qo] (auto msg) {
return cql_transport::messages::propagate_exception_as_future(std::move(msg));
});
}
virtual future<::shared_ptr<cql_transport::messages::result_message>> execute_cql(
std::string_view text,
std::unique_ptr<cql3::query_options> qo) override
{
testlog.trace("{}(\"{}\")", __FUNCTION__, text);
auto qs = make_query_state();
auto& lqo = *qo;
return local_qp().execute_direct_without_checking_exception_message(text, *qs, test_dialect(), lqo).then([qs, qo = std::move(qo)] (auto msg) {
return cql_transport::messages::propagate_exception_as_future(std::move(msg));
});
}
virtual future<cql3::prepared_cache_key_type> prepare(sstring query) override {
return qp().invoke_on_all([query, this] (auto& local_qp) {
auto qs = this->make_query_state();
return local_qp.prepare(query, *qs, test_dialect()).finally([qs] {}).discard_result();
}).then([query, this] {
return local_qp().compute_id(query, ks_name, test_dialect());
});
}
virtual future<::shared_ptr<cql_transport::messages::result_message>> execute_prepared(
cql3::prepared_cache_key_type id,
cql3::raw_value_vector_with_unset values,
db::consistency_level cl = db::consistency_level::ONE) override {
const auto& so = cql3::query_options::specific_options::DEFAULT;
auto options = std::make_unique<cql3::query_options>(cl,
std::move(values), cql3::query_options::specific_options{
so.page_size,
so.state,
db::consistency_level::SERIAL,
so.timestamp,
});
return execute_prepared_with_qo(id, std::move(options));
}
virtual future<::shared_ptr<cql_transport::messages::result_message>> execute_prepared_with_qo(
cql3::prepared_cache_key_type id,
std::unique_ptr<cql3::query_options> qo) override
{
auto qs = make_query_state();
bool needs_authorization = false;
// First, try to look up the statement in the cache of already authorized statements. If no entry is found there,
// look up the prepared statement itself and mark it as needing authorization.
auto prepared = local_qp().get_prepared(qs->get_client_state().user(), id);
if (!prepared) {
needs_authorization = true;
prepared = local_qp().get_prepared(id);
}
if (!prepared) {
throw not_prepared_exception(id);
}
auto stmt = prepared->statement;
SCYLLA_ASSERT(stmt->get_bound_terms() == qo->get_values_count());
qo->prepare(prepared->bound_names);
auto& lqo = *qo;
return local_qp().execute_prepared_without_checking_exception_message(*qs, std::move(stmt), lqo, std::move(prepared), std::move(id), needs_authorization)
.then([qs, qo = std::move(qo)] (auto msg) {
return cql_transport::messages::propagate_exception_as_future(std::move(msg));
});
}
virtual future<utils::chunked_vector<mutation>> get_modification_mutations(const sstring& text) override {
auto qs = make_query_state();
auto cql_stmt = local_qp().get_statement(text, qs->get_client_state(), test_dialect())->statement;
auto modif_stmt = dynamic_pointer_cast<cql3::statements::modification_statement>(std::move(cql_stmt));
if (!modif_stmt) {
throw std::runtime_error(format("get_stmt_mutations: not a modification statement: {}", text));
}
auto& qo = cql3::query_options::DEFAULT;
auto timeout = db::timeout_clock::now() + qs->get_client_state().get_timeout_config().write_timeout;
cql3::statements::modification_statement::json_cache_opt json_cache = modif_stmt->maybe_prepare_json_cache(qo);
std::vector<dht::partition_range> keys = modif_stmt->build_partition_keys(qo, json_cache);
return modif_stmt->get_mutations(local_qp(), qo, timeout, false, qo.get_timestamp(*qs), *qs, json_cache, keys)
.finally([qs, modif_stmt = std::move(modif_stmt)] {});
}
virtual future<> create_table(std::function<schema(std::string_view)> schema_maker) override {
schema_builder builder(make_lw_shared<schema>(schema_maker(ks_name)));
auto s = builder.build(schema_builder::compact_storage::no);
auto group0_guard = co_await _mm.local().start_group0_operation();
auto ts = group0_guard.write_timestamp();
co_return co_await _mm.local().announce(co_await service::prepare_new_column_family_announcement(_proxy.local(), s, ts), std::move(group0_guard), "");
}
virtual service::client_state& local_client_state() override {
return _core_local.local().client_state;
}
virtual replica::database& local_db() override {
return _db.local();
}
virtual sharded<locator::shared_token_metadata>& shared_token_metadata() override {
return _token_metadata;
}
cql3::query_processor& local_qp() override {
return _qp.local();
}
sharded<replica::database>& db() override {
return _db;
}
sharded<cql3::query_processor>& qp() override {
return _qp;
}
auth::service& local_auth_service() override {
return _auth_service.local();
}
virtual sharded<db::view::view_builder>& view_builder() override {
return _view_builder;
}
virtual db::view::view_builder& local_view_builder() override {
return _view_builder.local();
}
virtual db::view::view_update_generator& local_view_update_generator() override {
return _view_update_generator.local();
}
virtual sharded<db::view::view_building_worker>& view_building_worker() override {
return _view_building_worker;
}
virtual service::migration_notifier& local_mnotifier() override {
return _mnotifier.local();
}
virtual sharded<service::migration_manager>& migration_manager() override {
return _mm;
}
virtual sharded<db::batchlog_manager>& batchlog_manager() override {
return _batchlog_manager;
}
virtual sharded<netw::messaging_service>& get_messaging_service() override {
return _ms;
}
virtual sharded<gms::gossiper>& gossiper() override {
return _gossiper;
}
virtual service::raft_group0_client& get_raft_group0_client() override {
return *_group0_client;
}
virtual sharded<service::raft_group_registry>& get_raft_group_registry() override {
return _group0_registry;
}
virtual sharded<db::system_keyspace>& get_system_keyspace() override {
return _sys_ks;
}
virtual sharded<service::tablet_allocator>& get_tablet_allocator() override {
return _tablet_allocator;
}
virtual sharded<service::storage_proxy>& get_storage_proxy() override {
return _proxy;
}
virtual sharded<gms::feature_service>& get_feature_service() override {
return _feature_service;
}
virtual sharded<sstables::storage_manager>& get_sstorage_manager() override {
return _sstm;
}
virtual sharded<service::storage_service>& get_storage_service() override {
return _ss;
}
virtual sharded<tasks::task_manager>& get_task_manager() override {
return _task_manager;
}
virtual sharded<locator::shared_token_metadata>& get_shared_token_metadata() override {
return _token_metadata;
}
virtual sharded<service::topology_state_machine>& get_topology_state_machine() override {
return _topology_state_machine;
}
virtual future<> refresh_client_state() override {
return _core_local.invoke_on_all([] (core_local_state& state) {
state.client_state.maybe_update_per_service_level_params();
});
}
future<> create_keyspace(const cql_test_config& cfg, std::string_view name) {
sstring tablets = "";
if (cfg.initial_tablets) {
tablets = format(" and tablets = {{'initial' : {}}}", *cfg.initial_tablets);
} else if (cfg.db_config->enable_tablets()) {
tablets = " and tablets = {'enabled' : true}";
}
auto query = seastar::format("create keyspace {} with replication = {{ 'class' : 'org.apache.cassandra.locator.NetworkTopologyStrategy', 'replication_factor' : 1}}{};", name,
tablets);
return execute_cql(query).discard_result();
}
virtual sharded<vector_search::vector_store_client>& vector_store_client() override {
return _vector_store_client;
}
static future<> do_with(std::function<future<>(cql_test_env&)> func, cql_test_config cfg_in, std::optional<cql_test_init_configurables> init_configurables) {
return seastar::async([cfg_in = std::move(cfg_in), init_configurables = std::move(init_configurables), func] {
logalloc::prime_segment_pool(memory::stats().total_memory(), memory::min_free_memory()).get();
bool old_active = false;
if (!active.compare_exchange_strong(old_active, true)) {
throw std::runtime_error("Starting more than one cql_test_env at a time not supported due to singletons.");
}
auto deactivate = defer([] {
bool old_active = true;
auto success = active.compare_exchange_strong(old_active, false);
SCYLLA_ASSERT(success);
});
// FIXME: make the function storage non static
auto clear_funcs = defer([] {
smp::invoke_on_all([] () {
cql3::functions::change_batch batch;
batch.clear_functions();
batch.commit();
}).get();
});
single_node_cql_env env;
env.run_in_thread(std::move(func), std::move(cfg_in), std::move(init_configurables));
});
}
static void do_with_noreentrant_thread(std::function<future<>(cql_test_env&)> func, cql_test_config cfg_in, std::optional<cql_test_init_configurables> init_configurables) {
single_node_cql_env env;
env.run_in_thread(std::move(func), std::move(cfg_in), std::move(init_configurables));
}
private:
static auto defer_verbose_shutdown(const char* what, std::function<void()> func) {
return defer([what, func = std::move(func)] {
testlog.info("Shutting down {}", what);
try {
func();
testlog.info("Shutting down {} was successful", what);
} catch (...) {
testlog.error("Unexpected error shutting down {}: {}", what, std::current_exception());
throw;
}
});
}
void run_in_thread(std::function<future<>(cql_test_env&)> func, cql_test_config cfg_in, std::optional<cql_test_init_configurables> init_configurables) {
using namespace std::filesystem;
// disable reactor stall detection during startup
auto blocked_reactor_notify_ms = engine().get_blocked_reactor_notify_ms();
smp::invoke_on_all([] {
engine().update_blocked_reactor_notify_ms(std::chrono::milliseconds(1000000));
}).get();
sharded<abort_source> abort_sources;
abort_sources.start().get();
// FIXME: handle signals (SIGINT, SIGTERM) - request aborts
auto stop_abort_sources = defer([&] { abort_sources.stop().get(); });
debug::the_database = &_db;
auto reset_db_ptr = defer([] {
debug::the_database = nullptr;
});
auto cfg = cfg_in.db_config;
if (!cfg->reader_concurrency_semaphore_serialize_limit_multiplier.is_set()) {
cfg->reader_concurrency_semaphore_serialize_limit_multiplier.set(std::numeric_limits<uint32_t>::max());
}
if (!cfg->reader_concurrency_semaphore_kill_limit_multiplier.is_set()) {
cfg->reader_concurrency_semaphore_kill_limit_multiplier.set(std::numeric_limits<uint32_t>::max());
}
if (!cfg->view_update_reader_concurrency_semaphore_serialize_limit_multiplier.is_set()) {
cfg->view_update_reader_concurrency_semaphore_serialize_limit_multiplier.set(std::numeric_limits<uint32_t>::max());
}
if (!cfg->view_update_reader_concurrency_semaphore_kill_limit_multiplier.is_set()) {
cfg->view_update_reader_concurrency_semaphore_kill_limit_multiplier.set(std::numeric_limits<uint32_t>::max());
}
cfg->critical_disk_utilization_level.set(1.0f);
tmpdir data_dir;
auto data_dir_path = data_dir.path().string();
if (!cfg->data_file_directories.is_set()) {
cfg->data_file_directories.set({data_dir_path});
} else {
data_dir_path = cfg->data_file_directories()[0];
}
cfg->commitlog_directory.set(data_dir_path + "/commitlog.dir");
cfg->schema_commitlog_directory.set(cfg->commitlog_directory() + "/schema");
cfg->hints_directory.set(data_dir_path + "/hints.dir");
cfg->view_hints_directory.set(data_dir_path + "/view_hints.dir");
cfg->logstor_directory.set(data_dir_path + "/logstor.dir");
cfg->num_tokens.set(256);
cfg->ring_delay_ms.set(500);
cfg->shutdown_announce_in_ms.set(0);
cfg->broadcast_to_all_shards().get();
smp::invoke_on_all([&] {
sstables::global_cache_index_pages = cfg->cache_index_pages.operator utils::updateable_value<bool>();
}).get();
create_directories((data_dir_path + "/system").c_str());
create_directories(cfg->commitlog_directory().c_str());
create_directories(cfg->schema_commitlog_directory().c_str());
create_directories(cfg->logstor_directory().c_str());
create_directories(cfg->hints_directory().c_str());
create_directories(cfg->view_hints_directory().c_str());
for (unsigned i = 0; i < smp::count; ++i) {
create_directories((cfg->hints_directory() + "/" + std::to_string(i)).c_str());
create_directories((cfg->view_hints_directory() + "/" + std::to_string(i)).c_str());
}
if (!cfg->max_memory_for_unlimited_query_soft_limit.is_set()) {
cfg->max_memory_for_unlimited_query_soft_limit.set(uint64_t(query::result_memory_limiter::unlimited_result_size));
}
if (!cfg->max_memory_for_unlimited_query_hard_limit.is_set()) {
cfg->max_memory_for_unlimited_query_hard_limit.set(uint64_t(query::result_memory_limiter::unlimited_result_size));
}
auto scheduling_groups = get_scheduling_groups().get();
debug::streaming_scheduling_group = scheduling_groups.streaming_scheduling_group;
debug::gossip_scheduling_group = scheduling_groups.gossip_scheduling_group;
auto notify_set = init_configurables
? configurable::init_all(*cfg, init_configurables->extensions, service_set(
_db, _ss, _mm, _proxy, _feature_service, _ms, _qp, _batchlog_manager
)).get()
: configurable::notify_set{}
;
auto stop_configurables = defer_verbose_shutdown("configurables", [&] { notify_set.notify_all(configurable::system_state::stopped).get(); });
auto schema_initializer_checkpoint = schema_builder::capture_schema_initializers_checkpoint();
register_compression_initializer(*cfg, [this] {
return bool(_feature_service.local().sstable_compression_dicts);
});
// Important to restore schema initializers during shutdown to
// support tests that repeatedly create `cql_test_env` instances.
auto unregister_schema_initializers = defer_verbose_shutdown("schema initializers", [schema_initializer_checkpoint] {
schema_builder::restore_schema_initializers_checkpoint(schema_initializer_checkpoint);
});
replica::set_strongly_consistent_tables_enabled(cfg->check_experimental(
db::experimental_features_t::feature::STRONGLY_CONSISTENT_TABLES
));
gms::feature_config fcfg;
fcfg.disabled_features = get_disabled_features_from_db_config(*cfg, cfg_in.disabled_features);
_feature_service.start(fcfg).get();
auto stop_feature_service = defer_verbose_shutdown("feature service", [this] { _feature_service.stop().get(); });
auto my_address = cfg_in.broadcast_address;
locator::snitch_config snitch_config;
snitch_config.listen_address = my_address;
snitch_config.broadcast_address = my_address;
_snitch.start(snitch_config).get();
auto stop_snitch = defer_verbose_shutdown("snitch", [this] { _snitch.stop().get(); });
_snitch.invoke_on_all(&locator::snitch_ptr::start).get();
locator::token_metadata::config tm_cfg;
tm_cfg.topo_cfg.this_endpoint = my_address;
tm_cfg.topo_cfg.this_cql_address = my_address;
tm_cfg.topo_cfg.local_dc_rack = { _snitch.local()->get_datacenter(), _snitch.local()->get_rack() };
_token_metadata.start([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg).get();
auto stop_token_metadata = defer_verbose_shutdown("token metadata", [this] { _token_metadata.stop().get(); });
_erm_factory.start().get();
auto stop_erm_factory = deferred_stop(_erm_factory);
_mnotifier.start().get();
auto stop_mm_notify = defer_verbose_shutdown("migration manager notifier", [this] { _mnotifier.stop().get(); });
_sst_dir_semaphore.start(cfg->initial_sstable_loading_concurrency()).get();
auto stop_sst_dir_sem = defer_verbose_shutdown("sst dir semaphore", [this] {
_sst_dir_semaphore.stop().get();
});
replica::database_config dbcfg;
if (cfg_in.dbcfg) {
dbcfg = std::move(*cfg_in.dbcfg);
} else {
dbcfg.available_memory = memory::stats().total_memory();
}
dbcfg.compaction_scheduling_group = scheduling_groups.compaction_scheduling_group;
dbcfg.memory_compaction_scheduling_group = scheduling_groups.memory_compaction_scheduling_group;
dbcfg.streaming_scheduling_group = scheduling_groups.streaming_scheduling_group;
dbcfg.statement_scheduling_group = scheduling_groups.statement_scheduling_group;
dbcfg.memtable_scheduling_group = scheduling_groups.memtable_scheduling_group;
dbcfg.memtable_to_cache_scheduling_group = scheduling_groups.memtable_to_cache_scheduling_group;
dbcfg.gossip_scheduling_group = scheduling_groups.gossip_scheduling_group;
auto get_tm_cfg = sharded_parameter([&] {
return tasks::task_manager::config {
.task_ttl = cfg->task_ttl_seconds,
};
});
_task_manager.start(std::move(get_tm_cfg), std::ref(abort_sources)).get();
auto stop_task_manager = defer_verbose_shutdown("task manager", [this] {
_task_manager.stop().get();
});
utils::disk_space_monitor::config dsm_cfg = {
.sched_group = scheduling_groups.streaming_scheduling_group,
.normal_polling_interval = cfg->disk_space_monitor_normal_polling_interval_in_seconds,
.high_polling_interval = cfg->disk_space_monitor_high_polling_interval_in_seconds,
.polling_interval_threshold = cfg->disk_space_monitor_polling_interval_threshold,
.capacity_override = cfg->data_file_capacity,
};
_disk_space_monitor_shard0.emplace(abort_sources.local(), data_dir_path, dsm_cfg);
_disk_space_monitor_shard0->start().get();
auto stop_dsm = defer_verbose_shutdown("disk space monitor", [this] { _disk_space_monitor_shard0->stop().get(); });
// get_cm_cfg is called on each shard when starting a sharded<compaction_manager>
// we need the getter since updateable_value is not shard-safe (#7316)
auto get_cm_cfg = sharded_parameter([&] {
return compaction::compaction_manager::config {
.compaction_sched_group = compaction::compaction_manager::scheduling_group{dbcfg.compaction_scheduling_group},
.maintenance_sched_group = compaction::compaction_manager::scheduling_group{dbcfg.streaming_scheduling_group},
.available_memory = dbcfg.available_memory,
.static_shares = cfg->compaction_static_shares,
.max_shares = cfg->compaction_max_shares,
.throughput_mb_per_sec = cfg->compaction_throughput_mb_per_sec,
.flush_all_tables_before_major = cfg->compaction_flush_all_tables_before_major_seconds() * 1s,
};
});
_cm.start(std::move(get_cm_cfg), std::ref(abort_sources), std::ref(_task_manager)).get();
auto stop_cm = deferred_stop(_cm);
_cm.invoke_on_all(&compaction::compaction_manager::start, std::ref(*cfg), only_on_shard0(&*_disk_space_monitor_shard0)).get();
_sstm.start(std::ref(*cfg), sstables::storage_manager::config{}).get();
auto stop_sstm = deferred_stop(_sstm);
_sl_controller.start(std::ref(_auth_service), std::ref(_token_metadata), std::ref(abort_sources), qos::service_level_options{.shares = 1000}, scheduling_supergroup(), scheduling_groups.statement_scheduling_group).get();
auto stop_sl_controller = defer_verbose_shutdown("service level controller", [this] { _sl_controller.stop().get(); });
_sl_controller.invoke_on_all(&qos::service_level_controller::start).get();
lang::manager::config lang_config;
lang_config.lua.max_bytes = cfg->user_defined_function_allocation_limit_bytes();
lang_config.lua.max_contiguous = cfg->user_defined_function_contiguous_allocation_limit_bytes();
lang_config.lua.timeout = std::chrono::milliseconds(cfg->user_defined_function_time_limit_ms());
if (cfg->enable_user_defined_functions() && cfg->check_experimental(db::experimental_features_t::feature::UDF)) {
lang_config.wasm = lang::manager::wasm_config {
.udf_memory_limit = cfg->wasm_udf_memory_limit(),
.cache_size = dbcfg.available_memory * cfg->wasm_cache_memory_fraction(),
.cache_instance_size = cfg->wasm_cache_instance_size_limit(),
.cache_timer_period = std::chrono::milliseconds(cfg->wasm_cache_timeout_in_ms()),
.yield_fuel = cfg->wasm_udf_yield_fuel(),
.total_fuel = cfg->wasm_udf_total_fuel(),
};
}
_lang_manager.start(lang_config).get();
auto stop_lang_manager = defer_verbose_shutdown("lang manager", [this] { _lang_manager.stop().get(); });
_lang_manager.invoke_on_all(&lang::manager::start).get();
auto numa_groups = local_engine->smp().shard_to_numa_node_mapping();
_scf.start(sharded_parameter(default_sstable_compressor_factory::config::from_db_config, std::cref(*cfg), std::cref(numa_groups))).get();
auto stop_scf = defer_verbose_shutdown("sstable_compressor_factory", [this] {
_scf.stop().get();
});
_db_config = &*cfg;
_db.start(std::ref(*cfg), dbcfg, std::ref(_mnotifier), std::ref(_feature_service), std::ref(_token_metadata), std::ref(_cm), std::ref(_sstm), std::ref(_lang_manager), std::ref(_sst_dir_semaphore), std::ref(_scf), std::ref(abort_sources), utils::cross_shard_barrier()).get();
auto stop_db = defer_verbose_shutdown("database", [this] {
_db.stop().get();
});
_db.invoke_on_all(&replica::database::start, std::ref(_sl_controller), only_on_shard0(&*_disk_space_monitor_shard0)).get();
smp::invoke_on_all([blocked_reactor_notify_ms] {
engine().update_blocked_reactor_notify_ms(blocked_reactor_notify_ms);
}).get();
service::storage_proxy::config spcfg {
.hints_directory_initializer = db::hints::directory_initializer::make_dummy(),
};
spcfg.available_memory = memory::stats().total_memory();
db::view::node_update_backlog b(smp::count, 10ms);
scheduling_group_key_config sg_conf =
make_scheduling_group_key_config<service::storage_proxy_stats::stats>();
_proxy.start(std::ref(_db), spcfg, std::ref(b), scheduling_group_key_create(sg_conf).get(), std::ref(_feature_service), std::ref(_token_metadata), std::ref(_erm_factory)).get();
auto stop_proxy = defer_verbose_shutdown("storage proxy", [this] { _proxy.stop().get(); });
_cql_config.start(cql3::cql_config::default_tag{}).get();
auto stop_cql_config = defer_verbose_shutdown("cql config", [this] { _cql_config.stop().get(); });
cql3::query_processor::memory_config qp_mcfg;
if (cfg_in.qp_mcfg) {
qp_mcfg = *cfg_in.qp_mcfg;
} else {
qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560};
}
auto local_data_dict = seastar::sharded_parameter([] (const replica::database& db) { return db.as_data_dictionary(); }, std::ref(_db));
utils::loading_cache_config auth_prep_cache_config;
auth_prep_cache_config.max_size = qp_mcfg.authorized_prepared_cache_size;
auth_prep_cache_config.expiry = std::min(std::chrono::milliseconds(cfg->permissions_validity_in_ms()),
std::chrono::duration_cast<std::chrono::milliseconds>(cql3::prepared_statements_cache::entry_expiry));
auth_prep_cache_config.refresh = std::chrono::milliseconds(cfg->permissions_update_interval_in_ms());
_vector_store_client.start(std::ref(*cfg)).get();
auto stop_vector_store_client = defer_verbose_shutdown("vector store client", [this] {
_vector_store_client.stop().get();
});
_qp.start(std::ref(_proxy), std::move(local_data_dict), std::ref(_mnotifier), std::ref(_vector_store_client), qp_mcfg, std::ref(_cql_config), auth_prep_cache_config, std::ref(_lang_manager)).get();
auto stop_qp = defer_verbose_shutdown("query processor", [this] { _qp.stop().get(); });
_elc_notif.start().get();
auto stop_elc_notif = defer_verbose_shutdown("lifecycle notifier", [this] { _elc_notif.stop().get(); });
set_abort_on_internal_error(true);
const gms::inet_address listen("127.0.0.1");
_sys_ks.start(std::ref(_qp), std::ref(_db)).get();
auto stop_sys_kd = defer_verbose_shutdown("system keyspace", [this] {
_sys_ks.stop().get();
});
replica::distributed_loader::init_system_keyspace(_sys_ks, _erm_factory, _db).get();
_db.local().init_schema_commitlog();
_sys_ks.invoke_on_all(&db::system_keyspace::mark_writable).get();
_sys_ks.local().build_bootstrap_info().get();
auto host_id = cfg_in.host_id;
if (!host_id) {
auto linfo = _sys_ks.local().load_local_info().get();
if (!linfo.host_id) {
linfo.host_id = locator::host_id::create_random_id();
}
const auto location = _snitch.local()->get_location();
linfo.dc = location.dc;
linfo.rack = location.rack;
host_id = linfo.host_id;
_sys_ks.local().save_local_info(std::move(linfo), my_address, my_address).get();
}
locator::shared_token_metadata::mutate_on_all_shards(_token_metadata, [hostid = host_id] (locator::token_metadata& tm) {
auto& topo = tm.get_topology();
topo.set_host_id_cfg(hostid);
topo.add_or_update_endpoint(hostid,
std::nullopt,
locator::node::state::normal,
smp::count);
return make_ready_future<>();
}).get();
_gossip_address_map.start().get();
auto stop_gossip_address_map = defer_verbose_shutdown("gossip address map", [this] {
_gossip_address_map.stop().get();
});
_task_manager.invoke_on_all([&] (auto& tm) {
tm.set_host_id(host_id);
}).get();
auto arct_cfg = [&] {
return netw::advanced_rpc_compressor::tracker::config{
.zstd_quota_fraction{1.0},
.register_metrics = true,
};
};
_compressor_tracker.start(arct_cfg).get();
auto stop_compressor_tracker = defer_verbose_shutdown("compressor tracker", [this] { _compressor_tracker.stop().get(); });
uint16_t port = 7000;
seastar::server_socket tmp;
// #20543 - if we should actually use message_server::listen, we need to find an unused port.
// Create a dummy socket to pick the port for us. Unfortunately, because reactor::posix_reuseport_detect()
// currently always returns false, we cannot simply keep that socket and reuse its port. Hence this ugly loop.
// Adding the stop defer before actually creating the service should be fine.
auto stop_ms_func = [this] { _ms.stop().get(); };
using stop_type = decltype(stop_ms_func);
std::optional<decltype(defer_verbose_shutdown("", stop_type(stop_ms_func)))> stop_ms;
do {
stop_ms = std::nullopt;
try {
if (cfg_in.ms_listen) {
tmp = seastar::listen(seastar::socket_address(listen, 0), listen_options{true});
port = tmp.local_address().port();
}
// Don't start listening so tests can be run in parallel if cfg_in.ms_listen is not set to true explicitly.
_ms.start(host_id, listen, std::move(port), std::ref(_feature_service),
std::ref(_gossip_address_map), gms::generation_type{}, std::ref(_compressor_tracker),
std::ref(_sl_controller)).get();
stop_ms = defer_verbose_shutdown("messaging service", stop_type(stop_ms_func));
if (cfg_in.ms_listen) {
// FIXME: should not need to do this - it makes this whole thing unsafe. But
// reactor::posix_reuseport_detect() currently always returns false, thus
// trying to grab a port and reusing it properly here does _not_ work at all.
// Once the seastar issue is fixed, we can just keep the tmp socket alive across
// the listen invoke below.
tmp = {};
_ms.invoke_on_all(&netw::messaging_service::start_listen, std::ref(_token_metadata), [host_id] (gms::inet_address ip) {return host_id; }).get();
}
} catch (std::system_error& e) {
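// If we still hit a used port (another process grabbed it first), the intent is to shut down ms and try again.
// FIXME: `continue` in a do/while jumps to the loop condition, which is `false` here,
// so this currently leaves the loop instead of retrying.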
if (port != 7000 && e.code().category() == std::system_category() && e.code().value() == EADDRINUSE) {
continue;
}
throw;
}
} while (false);
// Normally the auth server is already stopped in here,
// but if there is an initialization failure we have to
// make sure to stop it now or ~sharded will SCYLLA_ASSERT.
auto stop_auth_server = defer_verbose_shutdown("auth service", [this] {
_auth_service.stop().get();
});
auto stop_sys_dist_ks = defer_verbose_shutdown("system distributed keyspace", [this] { _sys_dist_ks.stop().get(); });
// Init gossiper
std::set<gms::inet_address> seeds;
auto seed_provider = db::config::seed_provider_type();
if (seed_provider.parameters.contains("seeds")) {
for (const auto& seed : utils::split_comma_separated_list(seed_provider.parameters.at("seeds"))) {
seeds.emplace(seed);
}
}
if (seeds.empty()) {
seeds.emplace("127.0.0.1");
}
_topology_state_machine.start().get();
auto stop_topology_state_machine = defer_verbose_shutdown("topology state machine", [this] {
_topology_state_machine.stop().get();
});
gms::gossip_config gcfg;
gcfg.cluster_name = "Test Cluster";
gcfg.seeds = std::move(seeds);
gcfg.shutdown_announce_ms = 0;
_gossiper.start(std::ref(abort_sources), std::ref(_token_metadata), std::ref(_ms), std::move(gcfg), std::ref(_gossip_address_map), std::ref(_topology_state_machine)).get();
auto stop_ms_fd_gossiper = defer_verbose_shutdown("gossiper", [this] {
_gossiper.stop().get();
});
_gossiper.invoke_on_all(&gms::gossiper::start).get();
_fd_pinger.start(std::ref(_ms)).get();
auto stop_fd_pinger = defer_verbose_shutdown("fd pinger", [this] { _fd_pinger.stop().get(); });
service::direct_fd_clock fd_clock;
_fd.start(
std::ref(_fd_pinger), std::ref(fd_clock),
service::direct_fd_clock::base::duration{std::chrono::milliseconds{100}}.count(),
service::direct_fd_clock::base::duration{std::chrono::milliseconds{600}}.count(), gcfg.gossip_scheduling_group).get();
auto stop_fd = defer_verbose_shutdown("direct failure detector", [this] {
_fd.stop().get();
});
_group0_registry.start(
raft::server_id{host_id.id},
std::ref(_ms), std::ref(_fd)).get();
auto stop_raft_gr = deferred_stop(_group0_registry);
_feature_service.invoke_on_all([] (auto& fs) {
return fs.enable(fs.supported_feature_set());
}).get();
_mapreduce_service.start(std::ref(_ms), std::ref(_proxy), std::ref(_db), std::ref(abort_sources)).get();
auto stop_mapreduce_service = defer_verbose_shutdown("mapreduce service", [this] { _mapreduce_service.stop().get(); });
// group0 client exists only on shard 0
service::raft_group0_client group0_client(_group0_registry.local(), _gossiper.local(), _sys_ks.local(), _token_metadata.local(), maintenance_mode_enabled::no);
_mm.start(std::ref(_mnotifier), std::ref(_feature_service), std::ref(_ms), std::ref(_proxy), std::ref(_gossiper), std::ref(group0_client), std::ref(_sys_ks)).get();
auto stop_mm = defer_verbose_shutdown("migration manager", [this] { _mm.stop().get(); });
_tablet_allocator.start(service::tablet_allocator::config{}, std::ref(_mnotifier), std::ref(_db)).get();
auto stop_tablet_allocator = defer_verbose_shutdown("tablet allocator", [this] {
_tablet_allocator.stop().get();
});
_view_building_state_machine.start().get();
auto stop_view_building_state_machine = defer_verbose_shutdown("view building state machine", [this] {
_view_building_state_machine.stop().get();
});
service::raft_group0 group0_service{
abort_sources.local(), _group0_registry.local(), _ms,
_gossiper.local(), _feature_service.local(), group0_client, scheduling_groups.gossip_scheduling_group};
auto compression_dict_updated_callback = [] (std::string_view) { return make_ready_future<>(); };
_sys_dist_ks.start(std::ref(_qp), std::ref(_mm), std::ref(_proxy)).get();
_view_update_generator.start(std::ref(_db), std::ref(_proxy), std::ref(abort_sources)).get();
auto stop_view_update_generator = defer_verbose_shutdown("view update generator", [this] {
_view_update_generator.stop().get();
});
_view_builder.start(std::ref(_db), std::ref(_sys_ks), std::ref(_sys_dist_ks), std::ref(_mnotifier), std::ref(_view_update_generator), std::ref(group0_client), std::ref(_qp)).get();
auto stop_view_builder = defer_verbose_shutdown("view builder", [this] {
_view_builder.stop().get();
});
_stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_view_builder), std::ref(_view_building_worker), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get();
auto stop_streaming = defer_verbose_shutdown("stream manager", [this] { _stream_manager.stop().get(); });
_auth_cache.start(std::ref(_qp), std::ref(abort_sources)).get();
auto stop_auth_cache = defer_verbose_shutdown("auth cache", [this] { _auth_cache.stop().get(); });
_groups_manager.start(std::ref(_ms), std::ref(_group0_registry), std::ref(_qp),
std::ref(_db), std::ref(_mm), std::ref(_sys_ks), std::ref(_feature_service)).get();
auto stop_groups_manager = defer_verbose_shutdown("strongly consistent groups manager", [this] { _groups_manager.stop().get(); });
_sc_coordinator.start(std::ref(_groups_manager), std::ref(_db), std::ref(_gossiper)).get();
auto stop_sc_coordinator = defer_verbose_shutdown("strongly consistent coordinator", [this] {
_sc_coordinator.stop().get();
});
_ss.start(std::ref(abort_sources), std::ref(_db),
std::ref(_gossiper),
std::ref(_sys_ks),
std::ref(_sys_dist_ks),
std::ref(_feature_service), std::ref(_mm),
std::ref(_token_metadata), std::ref(_erm_factory), std::ref(_ms),
std::ref(_repair),
std::ref(_stream_manager),
std::ref(_elc_notif),
std::ref(_batchlog_manager),
std::ref(_snitch),
std::ref(_tablet_allocator),
std::ref(_cdc_generation_service),
std::ref(_view_builder), std::ref(_view_building_worker),
std::ref(_qp),
std::ref(_sl_controller),
std::ref(_auth_cache),
std::ref(_client_routes),
std::ref(_topology_state_machine),
std::ref(_view_building_state_machine),
std::ref(_task_manager),
std::ref(_gossip_address_map),
compression_dict_updated_callback,
only_on_shard0(&*_disk_space_monitor_shard0),
std::ref(_groups_manager)
).get();
auto stop_storage_service = defer_verbose_shutdown("storage service", [this] { _ss.stop().get(); });
_mnotifier.local().register_listener(&_ss.local());
auto stop_mm_listener = defer_verbose_shutdown("storage service notifications", [this] {
_mnotifier.local().unregister_listener(&_ss.local()).get();
});
smp::invoke_on_all([&] {
return db::initialize_virtual_tables(_db, _ss, _gossiper, _group0_registry, _sys_ks, _tablet_allocator, _ms, *cfg);
}).get();
_qp.invoke_on_all([this, &group0_client] (cql3::query_processor& qp) {
qp.start_remote(_mm.local(), _mapreduce_service.local(), _ss.local(), group0_client,
_sc_coordinator.local());
}).get();
auto stop_qp_remote = defer_verbose_shutdown("query processor remote part", [this] {
_qp.invoke_on_all(&cql3::query_processor::stop_remote).get();
});
_cm.invoke_on_all([&](compaction::compaction_manager& cm) {
auto cl = _db.local().commitlog();
auto scl = _db.local().schema_commitlog();
if (cl && scl) {
cm.get_shared_tombstone_gc_state().set_gc_time_min_source([cl, scl](const table_id& id) {
return std::min(cl->min_gc_time(id), scl->min_gc_time(id));
});
} else if (cl) {
cm.get_shared_tombstone_gc_state().set_gc_time_min_source([cl](const table_id& id) {
return cl->min_gc_time(id);
});
} else if (scl) {
cm.get_shared_tombstone_gc_state().set_gc_time_min_source([scl](const table_id& id) {
return scl->min_gc_time(id);
});
}
}).get();
replica::distributed_loader::init_non_system_keyspaces(_db, _proxy, _sys_ks).get();
_db.invoke_on_all([] (replica::database& db) {
db.get_tables_metadata().for_each_table([] (table_id, lw_shared_ptr<replica::table> table) {
replica::table& t = *table;
t.enable_auto_compaction();
});
}).get();
_group0_registry.invoke_on_all([] (service::raft_group_registry& raft_gr) {
return raft_gr.start();
}).get();
auto shutdown_db = defer_verbose_shutdown("database tables", [this] {
_db.invoke_on_all(&replica::database::shutdown).get();
});
_view_update_generator.invoke_on_all(&db::view::view_update_generator::start).get();
_paxos_store.start(std::ref(_sys_ks), std::ref(_feature_service), std::ref(_db), std::ref(_mm)).get();
auto stop_paxos_store = defer_verbose_shutdown("paxos store", [this] {
_paxos_store.stop().get();
});
if (cfg_in.need_remote_proxy) {
_proxy.invoke_on_all(&service::storage_proxy::start_remote, std::ref(_ms), std::ref(_gossiper), std::ref(_mm), std::ref(_sys_ks), std::ref(_paxos_store), std::ref(group0_client), std::ref(_topology_state_machine), std::ref(_view_building_state_machine)).get();
}
auto stop_proxy_remote = defer_verbose_shutdown("storage proxy RPC verbs", [this, need = cfg_in.need_remote_proxy] {
if (need) {
_proxy.invoke_on_all(&service::storage_proxy::stop_remote).get();
}
});
_sl_controller.invoke_on_all([this, &group0_client] (qos::service_level_controller& service) {
qos::service_level_controller::service_level_distributed_data_accessor_ptr service_level_data_accessor =
::static_pointer_cast<qos::service_level_controller::service_level_distributed_data_accessor>(
make_shared<qos::raft_service_level_distributed_data_accessor>(_qp.local(), group0_client));
return service.set_distributed_data_accessor(std::move(service_level_data_accessor));
}).get();
cdc::generation_service::config cdc_config;
/*
* Currently used when choosing the timestamp of the first CDC stream generation:
* normally we choose a timestamp in the future so other nodes have a chance to learn about it
* before it starts operating, but in the single-node-cluster case this is not necessary
* and would only slow down tests (by having them wait).
*/
cdc_config.ring_delay = std::chrono::milliseconds(0);
_cdc_generation_service.start(std::ref(cdc_config), std::ref(_sys_ks), std::ref(_db)).get();
auto stop_cdc_generation_service = defer_verbose_shutdown("CDC generation service", [this] {
_cdc_generation_service.stop().get();
});
auto get_cdc_metadata = [] (cdc::generation_service& svc) { return std::ref(svc.get_cdc_metadata()); };
_cdc.start(std::ref(_proxy), sharded_parameter(get_cdc_metadata, std::ref(_cdc_generation_service)), std::ref(_mnotifier)).get();
auto stop_cdc_service = defer_verbose_shutdown("CDC log service", [this] {
_cdc.stop().get();
});
group0_service.start().get();
auto stop_group0_service = defer_verbose_shutdown("group 0 service", [&group0_service] {
group0_service.abort_and_drain().get();
group0_service.destroy();
});
_ss.local().set_group0(group0_service);
// Load address_map from system.peers and subscribe to gossiper events to keep it updated.
_ss.local().init_address_map(_gossip_address_map.local()).get();
auto cancel_address_map_subscription = defer_verbose_shutdown("storage service address map subscription", [this] {
_ss.local().uninit_address_map().get();
});
auto stop_group0_usage_in_storage_service = defer_verbose_shutdown("group 0 usage in storage service", [this] {
_ss.local().wait_for_group0_stop().get();
});
group0_service.setup_group0_if_exist(_sys_ks.local(), _ss.local(), _qp.local(), _mm.local()).get();
_groups_manager.invoke_on_all([](service::strong_consistency::groups_manager& m) {
return m.start();
}).get();
_view_building_worker.start(std::ref(_db), std::ref(_sys_ks), std::ref(_mnotifier), std::ref(group0_service), std::ref(_view_update_generator), std::ref(_ms), std::ref(_view_building_state_machine)).get();
auto stop_view_building_worker = defer_verbose_shutdown("view building worker", [this] {
_view_building_worker.stop().get();
});
const auto generation_number = gms::generation_type(_sys_ks.local().increment_and_get_generation().get());
try {
_ss.local().join_cluster(_proxy, service::start_hint_manager::no, generation_number).get();
} catch (std::exception& e) {
// If any of the defers crashes too, we'll never see
// the error, so log it here.
testlog.error("Failed to join cluster: {}", e);
throw;
}
startlog.info("Verifying that all of the keyspaces are RF-rack-valid");
_db.local().check_rf_rack_validity(_token_metadata.local().get());
_auth_service.start(std::ref(_qp), std::ref(group0_client),
auth::make_authorizer_factory(cfg->authorizer(), _qp),
auth::make_authenticator_factory(cfg->authenticator(), _qp, group0_client, _mm, _auth_cache),
auth::make_role_manager_factory(cfg->role_manager(), _qp, group0_client, _mm, _auth_cache),
maintenance_socket_enabled::no, std::ref(_auth_cache)).get();
_auth_service.invoke_on_all([this] (auth::service& auth) {
return auth.start(_mm.local(), _sys_ks.local());
}).get();
auto deinit_storage_service_server = defer_verbose_shutdown("auth service", [this] {
// #21159 don't shut down gossip here - we don't in main.cc, and we should
// strive to keep the two paths aligned. Doing a gossip::shutdown here
// can, if we've provoked a storage_manager::isolate, cause parallel
// double execution of the shutdown method, which causes waiting for
// an invalid future if we're unlucky.
_auth_service.stop().get();
});
// Precondition: we can only call this after `auth::service` has been initialized and started on all shards.
_sl_controller.invoke_on_all([&auth_service = _auth_service] (qos::service_level_controller& controller) {
controller.register_auth_integration(auth_service.local());
}).get();
auto unregister_sl_controller_integration = defer([this] {
_sl_controller.invoke_on_all([] (qos::service_level_controller& controller) {
return controller.unregister_auth_integration();
}).get();
});
db::batchlog_manager_config bmcfg;
bmcfg.replay_rate = 100000000;
bmcfg.replay_timeout = cfg_in.batchlog_replay_timeout.value_or(2s);
bmcfg.delay = cfg_in.batchlog_delay;
bmcfg.replay_cleanup_after_replays = cfg->batchlog_replay_cleanup_after_replays();
_batchlog_manager.start(std::ref(_qp), std::ref(_sys_ks), std::ref(_feature_service), bmcfg).get();
auto stop_bm = defer_verbose_shutdown("batchlog manager", [this] {
_batchlog_manager.stop().get();
});
_view_builder.invoke_on_all([this] (db::view::view_builder& vb) {
return vb.start(_mm.local());
}).get();
auto drain_view_builder = defer_verbose_shutdown("view builder operations", [this] {
_view_builder.invoke_on_all(&db::view::view_builder::drain).get();
});
// Create the testing user.
try {
auth::role_config config;
config.is_superuser = true;
config.can_login = true;
auto& as = abort_sources.local();
auto guard = group0_client.start_operation(as).get();
service::group0_batch mc{std::move(guard)};
auth::create_role(
_auth_service.local(),
testing_superuser,
config,
auth::authentication_options(),
mc).get();
if (cfg->authenticator() == "PasswordAuthenticator") {
auth::authentication_options auth_opts;
auth_opts.credentials = auth::password_option{"cassandra"};
auth::create_role(
_auth_service.local(),
"cassandra",
config,
auth_opts,
mc).get();
}
std::move(mc).commit(group0_client, as, ::service::raft_timeout{}).get();
} catch (const auth::role_already_exists&) {
// The default user may already exist if this `cql_test_env` is starting with previously populated data.
}
notify_set.notify_all(configurable::system_state::started).get();
_group0_client = &group0_client;
_core_local.start(std::ref(_auth_service), std::ref(_sl_controller), cfg_in.query_timeout.value_or(infinite_timeout_config)).get();
auto stop_core_local = defer_verbose_shutdown("local client state", [this] { _core_local.stop().get(); });
if (!local_db().has_keyspace(ks_name)) {
create_keyspace(cfg_in, ks_name).get();
}
try {
with_scheduling_group(dbcfg.statement_scheduling_group, [&func, this] {
return func(*this);
}).get();
} catch (...) {
namespace utf = boost::unit_test::framework;
testlog.error("{}cql env callback failed, error: {}",
utf::test_in_progress() ? utf::current_test_unit().p_name.get() + ": " : "",
std::current_exception());
throw;
}
}
public:
future<::shared_ptr<cql_transport::messages::result_message>> execute_batch(
const std::vector<std::string_view>& queries,
cql3::statements::batch_statement::type batch_type,
std::unique_ptr<cql3::query_options> qo) override {
using cql3::statements::batch_statement;
using cql3::statements::modification_statement;
testlog.trace("{}(type={}):\n {}",
__FUNCTION__, batch_type == batch_statement::type::LOGGED ? "LOGGED" : "UNLOGGED", fmt::join(queries, "\n "));
std::vector<batch_statement::single_statement> modifications;
std::ranges::transform(queries, back_inserter(modifications), [this](const auto& query) {
auto stmt = local_qp().get_statement(query, _core_local.local().client_state, test_dialect());
if (!dynamic_cast<modification_statement*>(stmt->statement.get())) {
throw exceptions::invalid_request_exception(
"Invalid statement in batch: only UPDATE, INSERT and DELETE statements are allowed.");
}
return batch_statement::single_statement(static_pointer_cast<modification_statement>(stmt->statement));
});
auto batch = ::make_shared<batch_statement>(
batch_type,
std::move(modifications),
cql3::attributes::none(),
local_qp().get_cql_stats());
auto qs = make_query_state();
auto& lqo = *qo;
return local_qp().execute_batch_without_checking_exception_message(batch, *qs, lqo, {}).then([qs, batch, qo = std::move(qo)] (auto msg) {
return cql_transport::messages::propagate_exception_as_future(std::move(msg));
});
}
virtual sharded<qos::service_level_controller>& service_level_controller_service() override {
return _sl_controller;
}
utils::disk_space_monitor& disk_space_monitor() override {
return *_disk_space_monitor_shard0;
}
db::config& db_config() override {
return *_db_config;
}
sharded<auth::cache>& auth_cache() override {
return _auth_cache;
}
};
std::atomic<bool> single_node_cql_env::active = { false };
future<> do_with_cql_env(std::function<future<>(cql_test_env&)> func, cql_test_config cfg_in, std::optional<cql_test_init_configurables> init_configurables) {
return single_node_cql_env::do_with(func, std::move(cfg_in), std::move(init_configurables));
}
future<> do_with_cql_env_thread(std::function<void(cql_test_env&)> func, cql_test_config cfg_in, thread_attributes thread_attr, std::optional<cql_test_init_configurables> init_configurables) {
return single_node_cql_env::do_with([func = std::move(func), thread_attr] (auto& e) {
return seastar::async(thread_attr, [func = std::move(func), &e] {
return func(e);
});
}, std::move(cfg_in), std::move(init_configurables));
}
void do_with_cql_env_noreentrant_in_thread(std::function<future<>(cql_test_env&)> func, cql_test_config cfg_in, std::optional<cql_test_init_configurables> init_configurables) {
single_node_cql_env::do_with_noreentrant_thread(std::move(func), std::move(cfg_in), std::move(init_configurables));
}
// This function should be called in a seastar thread.
void do_with_mc(cql_test_env& env, std::function<void(service::group0_batch&)> func) {
seastar::abort_source as;
auto& g0 = env.get_raft_group0_client();
auto guard = g0.start_operation(as).get();
auto mc = service::group0_batch(std::move(guard));
func(mc);
std::move(mc).commit(g0, as, std::nullopt).get();
}
reader_permit make_reader_permit(cql_test_env& env) {
return env.local_db().get_reader_concurrency_semaphore().make_tracking_only_permit(nullptr, "test", db::no_timeout, {});
}