mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
gossiper: split feature storage into a new feature_service
Feature lifetime is tied to storage_service lifetime, but features are now managed by gossip. To avoid circular dependency, add a new feature_service service to manage feature lifetime. To work around the problem, the current code re-initializes features after gossip is initialized. This patch does not fix this problem; it only makes it possible to solve it by untyping features from gossip.
This commit is contained in:
@@ -25,6 +25,8 @@
|
||||
|
||||
namespace gms {
|
||||
|
||||
class feature_service;
|
||||
|
||||
/**
|
||||
* A gossip feature tracks whether all the nodes the current one is
|
||||
* aware of support the specified feature.
|
||||
@@ -32,12 +34,13 @@ namespace gms {
|
||||
* A feature should only be created once the gossiper is available.
|
||||
*/
|
||||
class feature final {
|
||||
feature_service* _service = nullptr;
|
||||
sstring _name;
|
||||
bool _enabled = false;
|
||||
mutable shared_promise<> _pr;
|
||||
friend class gossiper;
|
||||
public:
|
||||
explicit feature(sstring name, bool enabled = false);
|
||||
explicit feature(feature_service& service, sstring name, bool enabled = false);
|
||||
feature() = default;
|
||||
~feature();
|
||||
feature(const feature& other) = delete;
|
||||
|
||||
50
gms/feature_service.hh
Normal file
50
gms/feature_service.hh
Normal file
@@ -0,0 +1,50 @@
|
||||
/*
|
||||
* Copyright (C) 2018 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/shared_future.hh>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
#include "seastarx.hh"
|
||||
|
||||
namespace gms {
|
||||
|
||||
class feature;
|
||||
|
||||
/**
|
||||
* A gossip feature tracks whether all the nodes the current one is
|
||||
* aware of support the specified feature.
|
||||
*/
|
||||
class feature_service final {
|
||||
std::unordered_map<sstring, std::vector<feature*>> _registered_features;
|
||||
public:
|
||||
feature_service();
|
||||
~feature_service();
|
||||
future<> stop();
|
||||
void register_feature(feature* f);
|
||||
void unregister_feature(feature* f);
|
||||
void enable(const sstring& name);
|
||||
};
|
||||
|
||||
} // namespace gms
|
||||
@@ -44,6 +44,7 @@
|
||||
#include "gms/gossip_digest_ack2.hh"
|
||||
#include "gms/versioned_value.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
#include "gms/application_state.hh"
|
||||
#include "gms/failure_detector.hh"
|
||||
#include "gms/i_failure_detection_event_listener.hh"
|
||||
@@ -126,7 +127,8 @@ public:
|
||||
void on_restart(inet_address, endpoint_state) override {}
|
||||
};
|
||||
|
||||
gossiper::gossiper() {
|
||||
gossiper::gossiper(feature_service& features)
|
||||
: _feature_service(features) {
|
||||
// Gossiper's stuff below runs only on CPU0
|
||||
if (engine().cpu_id() != 0) {
|
||||
return;
|
||||
@@ -2153,14 +2155,26 @@ future<> gossiper::wait_for_feature_on_node(std::set<sstring> features, inet_add
|
||||
});
|
||||
}
|
||||
|
||||
void gossiper::register_feature(feature* f) {
|
||||
feature_service::feature_service() = default;
|
||||
|
||||
feature_service::~feature_service() = default;
|
||||
|
||||
future<> feature_service::stop() {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
void feature_service::register_feature(feature* f) {
|
||||
_registered_features.emplace(f->name(), std::vector<feature*>()).first->second.emplace_back(f);
|
||||
}
|
||||
|
||||
void gossiper::register_feature(feature* f) {
|
||||
_feature_service.register_feature(f);
|
||||
if (check_features(get_local_gossiper().get_supported_features(), {f->name()})) {
|
||||
f->enable();
|
||||
}
|
||||
}
|
||||
|
||||
void gossiper::unregister_feature(feature* f) {
|
||||
void feature_service::unregister_feature(feature* f) {
|
||||
auto&& fsit = _registered_features.find(f->name());
|
||||
if (fsit == _registered_features.end()) {
|
||||
return;
|
||||
@@ -2172,50 +2186,53 @@ void gossiper::unregister_feature(feature* f) {
|
||||
}
|
||||
}
|
||||
|
||||
void gossiper::unregister_feature(feature* f) {
|
||||
_feature_service.unregister_feature(f);
|
||||
}
|
||||
|
||||
|
||||
void feature_service::enable(const sstring& name) {
|
||||
if (auto it = _registered_features.find(name); it != _registered_features.end()) {
|
||||
for (auto&& f : it->second) {
|
||||
f->enable();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Runs inside seastar::async context
|
||||
void gossiper::maybe_enable_features() {
|
||||
if (_registered_features.empty()) {
|
||||
_features_condvar.broadcast();
|
||||
return;
|
||||
}
|
||||
|
||||
auto&& features = get_supported_features();
|
||||
container().invoke_on_all([&features] (gossiper& g) {
|
||||
for (auto it = g._registered_features.begin(); it != g._registered_features.end();) {
|
||||
if (features.find(it->first) != features.end()) {
|
||||
for (auto&& f : it->second) {
|
||||
f->enable();
|
||||
}
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
for (auto&& name : features) {
|
||||
g._feature_service.enable(name);
|
||||
}
|
||||
g._features_condvar.broadcast();
|
||||
}).get();
|
||||
}
|
||||
|
||||
feature::feature(sstring name, bool enabled)
|
||||
: _name(name)
|
||||
feature::feature(feature_service& service, sstring name, bool enabled)
|
||||
: _service(&service)
|
||||
, _name(name)
|
||||
, _enabled(enabled) {
|
||||
get_local_gossiper().register_feature(this);
|
||||
_service->register_feature(this);
|
||||
if (_enabled) {
|
||||
_pr.set_value();
|
||||
}
|
||||
}
|
||||
|
||||
feature::~feature() {
|
||||
auto& gossiper = get_gossiper();
|
||||
if (gossiper.local_is_initialized()) {
|
||||
gossiper.local().unregister_feature(this);
|
||||
if (_service) {
|
||||
_service->unregister_feature(this);
|
||||
}
|
||||
}
|
||||
|
||||
feature& feature::operator=(feature&& other) {
|
||||
get_local_gossiper().unregister_feature(this);
|
||||
_service->unregister_feature(this);
|
||||
_service = std::exchange(other._service, nullptr);
|
||||
_name = other._name;
|
||||
_enabled = other._enabled;
|
||||
_pr = std::move(other._pr);
|
||||
get_local_gossiper().register_feature(this);
|
||||
_service->register_feature(this);
|
||||
return *this;
|
||||
}
|
||||
|
||||
|
||||
@@ -70,6 +70,8 @@ class inet_address;
|
||||
class i_endpoint_state_change_subscriber;
|
||||
class i_failure_detector;
|
||||
|
||||
class feature_service;
|
||||
|
||||
struct bind_messaging_port_tag {};
|
||||
using bind_messaging_port = bool_class<bind_messaging_port_tag>;
|
||||
|
||||
@@ -236,7 +238,7 @@ private:
|
||||
// The value must be kept alive until completes and not change.
|
||||
future<> replicate(inet_address, application_state key, const versioned_value& value);
|
||||
public:
|
||||
gossiper();
|
||||
explicit gossiper(feature_service& features);
|
||||
|
||||
void set_last_processed_message_at();
|
||||
void set_last_processed_message_at(clk::time_point tp);
|
||||
@@ -568,7 +570,7 @@ private:
|
||||
class msg_proc_guard;
|
||||
private:
|
||||
condition_variable _features_condvar;
|
||||
std::unordered_map<sstring, std::vector<feature*>> _registered_features;
|
||||
feature_service& _feature_service;
|
||||
friend class feature;
|
||||
// Get features supported by a particular node
|
||||
std::set<sstring> get_supported_features(inet_address endpoint) const;
|
||||
|
||||
12
init.cc
12
init.cc
@@ -26,6 +26,8 @@
|
||||
#include "service/storage_service.hh"
|
||||
#include "to_string.hh"
|
||||
#include "gms/inet_address.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
#include "seastarx.hh"
|
||||
|
||||
logging::logger startlog("init");
|
||||
|
||||
@@ -34,13 +36,15 @@ logging::logger startlog("init");
|
||||
// duplicated in cql_test_env.cc
|
||||
// until proper shutdown is done.
|
||||
|
||||
void init_storage_service(distributed<database>& db, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks) {
|
||||
service::init_storage_service(db, auth_service, sys_dist_ks).get();
|
||||
void init_storage_service(distributed<database>& db, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
sharded<gms::feature_service>& feature_service) {
|
||||
service::init_storage_service(db, auth_service, sys_dist_ks, feature_service).get();
|
||||
// #293 - do not stop anything
|
||||
//engine().at_exit([] { return service::deinit_storage_service(); });
|
||||
}
|
||||
|
||||
void init_ms_fd_gossiper(sstring listen_address_in
|
||||
void init_ms_fd_gossiper(sharded<gms::feature_service>& features
|
||||
, sstring listen_address_in
|
||||
, uint16_t storage_port
|
||||
, uint16_t ssl_storage_port
|
||||
, bool tcp_nodelay_inter_dc
|
||||
@@ -150,7 +154,7 @@ void init_ms_fd_gossiper(sstring listen_address_in
|
||||
to_string(seeds), listen_address_in, broadcast_address);
|
||||
throw std::runtime_error("Use broadcast_address for seeds list");
|
||||
}
|
||||
gms::get_gossiper().start().get();
|
||||
gms::get_gossiper().start(std::ref(features)).get();
|
||||
auto& gossiper = gms::get_local_gossiper();
|
||||
gossiper.set_seeds(seeds);
|
||||
// #293 - do not stop anything
|
||||
|
||||
10
init.hh
10
init.hh
@@ -28,16 +28,21 @@
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
#include "database.hh"
|
||||
#include "log.hh"
|
||||
#include "seastarx.hh"
|
||||
|
||||
namespace db {
|
||||
class extensions;
|
||||
}
|
||||
|
||||
namespace gms {
|
||||
class feature_service;
|
||||
}
|
||||
|
||||
extern logging::logger startlog;
|
||||
|
||||
class bad_configuration_error : public std::exception {};
|
||||
|
||||
void init_storage_service(distributed<database>& db, sharded<auth::service>&, sharded<db::system_distributed_keyspace>&);
|
||||
void init_storage_service(distributed<database>& db, sharded<auth::service>&, sharded<db::system_distributed_keyspace>&, sharded<gms::feature_service>&);
|
||||
|
||||
struct init_scheduling_config {
|
||||
scheduling_group streaming;
|
||||
@@ -45,7 +50,8 @@ struct init_scheduling_config {
|
||||
scheduling_group gossip;
|
||||
};
|
||||
|
||||
void init_ms_fd_gossiper(sstring listen_address
|
||||
void init_ms_fd_gossiper(sharded<gms::feature_service>& features
|
||||
, sstring listen_address
|
||||
, uint16_t storage_port
|
||||
, uint16_t ssl_storage_port
|
||||
, bool tcp_nodelay_inter_dc
|
||||
|
||||
12
main.cc
12
main.cc
@@ -63,6 +63,7 @@
|
||||
#include "sstables/compaction_manager.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
#include <db/view/view_update_from_staging_generator.hh>
|
||||
#include "gms/feature_service.hh"
|
||||
|
||||
seastar::metrics::metric_groups app_metrics;
|
||||
|
||||
@@ -332,6 +333,7 @@ int main(int ac, char** av) {
|
||||
httpd::http_server_control prometheus_server;
|
||||
prometheus::config pctx;
|
||||
directories dirs;
|
||||
sharded<gms::feature_service> feature_service;
|
||||
|
||||
return app.run_deprecated(ac, av, [&] {
|
||||
|
||||
@@ -359,7 +361,8 @@ int main(int ac, char** av) {
|
||||
|
||||
tcp_syncookies_sanity();
|
||||
|
||||
return seastar::async([cfg, ext, &db, &qp, &proxy, &mm, &ctx, &opts, &dirs, &pctx, &prometheus_server, &return_value, &cf_cache_hitrate_calculator] {
|
||||
return seastar::async([cfg, ext, &db, &qp, &proxy, &mm, &ctx, &opts, &dirs, &pctx, &prometheus_server, &return_value, &cf_cache_hitrate_calculator,
|
||||
&feature_service] {
|
||||
read_config(opts, *cfg).get();
|
||||
configurable::init_all(opts, *cfg, *ext).get();
|
||||
|
||||
@@ -379,6 +382,8 @@ int main(int ac, char** av) {
|
||||
throw bad_configuration_error();
|
||||
}
|
||||
}
|
||||
feature_service.start().get();
|
||||
// FIXME: feature_service.stop(), when we fix up shutdown
|
||||
dht::set_global_partitioner(cfg->partitioner(), cfg->murmur3_partitioner_ignore_msb_bits());
|
||||
auto make_sched_group = [&] (sstring name, unsigned shares) {
|
||||
if (cfg->cpu_scheduler()) {
|
||||
@@ -502,7 +507,7 @@ int main(int ac, char** av) {
|
||||
static sharded<auth::service> auth_service;
|
||||
static sharded<db::system_distributed_keyspace> sys_dist_ks;
|
||||
supervisor::notify("initializing storage service");
|
||||
init_storage_service(db, auth_service, sys_dist_ks);
|
||||
init_storage_service(db, auth_service, sys_dist_ks, feature_service);
|
||||
supervisor::notify("starting per-shard database core");
|
||||
|
||||
// Note: changed from using a move here, because we want the config object intact.
|
||||
@@ -598,7 +603,8 @@ int main(int ac, char** av) {
|
||||
scfg.statement = dbcfg.statement_scheduling_group;
|
||||
scfg.streaming = dbcfg.streaming_scheduling_group;
|
||||
scfg.gossip = scheduling_group();
|
||||
init_ms_fd_gossiper(listen_address
|
||||
init_ms_fd_gossiper(feature_service
|
||||
, listen_address
|
||||
, storage_port
|
||||
, ssl_storage_port
|
||||
, tcp_nodelay_inter_dc
|
||||
|
||||
@@ -124,8 +124,10 @@ int get_generation_number() {
|
||||
return generation_number;
|
||||
}
|
||||
|
||||
storage_service::storage_service(distributed<database>& db, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks)
|
||||
: _db(db)
|
||||
storage_service::storage_service(distributed<database>& db, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
gms::feature_service& feature_service)
|
||||
: _feature_service(feature_service)
|
||||
, _db(db)
|
||||
, _auth_service(auth_service)
|
||||
, _replicate_action([this] { return do_replicate_to_all_cores(); })
|
||||
, _update_pending_ranges_action([this] { return do_update_pending_ranges(); })
|
||||
@@ -438,21 +440,21 @@ void storage_service::prepare_to_join(std::vector<inet_address> loaded_endpoints
|
||||
}
|
||||
|
||||
void storage_service::register_features() {
|
||||
_range_tombstones_feature = gms::feature(RANGE_TOMBSTONES_FEATURE);
|
||||
_large_partitions_feature = gms::feature(LARGE_PARTITIONS_FEATURE);
|
||||
_counters_feature = gms::feature(COUNTERS_FEATURE);
|
||||
_digest_multipartition_read_feature = gms::feature(DIGEST_MULTIPARTITION_READ_FEATURE);
|
||||
_correct_counter_order_feature = gms::feature(CORRECT_COUNTER_ORDER_FEATURE);
|
||||
_schema_tables_v3 = gms::feature(SCHEMA_TABLES_V3);
|
||||
_correct_non_compound_range_tombstones = gms::feature(CORRECT_NON_COMPOUND_RANGE_TOMBSTONES);
|
||||
_write_failure_reply_feature = gms::feature(WRITE_FAILURE_REPLY_FEATURE);
|
||||
_xxhash_feature = gms::feature(XXHASH_FEATURE);
|
||||
_roles_feature = gms::feature(ROLES_FEATURE);
|
||||
_la_sstable_feature = gms::feature(LA_SSTABLE_FEATURE);
|
||||
_stream_with_rpc_stream_feature = gms::feature(STREAM_WITH_RPC_STREAM);
|
||||
_mc_sstable_feature = gms::feature(MC_SSTABLE_FEATURE);
|
||||
_materialized_views_feature = gms::feature(MATERIALIZED_VIEWS_FEATURE);
|
||||
_indexes_feature = gms::feature(INDEXES_FEATURE);
|
||||
_range_tombstones_feature = gms::feature(_feature_service, RANGE_TOMBSTONES_FEATURE);
|
||||
_large_partitions_feature = gms::feature(_feature_service, LARGE_PARTITIONS_FEATURE);
|
||||
_counters_feature = gms::feature(_feature_service, COUNTERS_FEATURE);
|
||||
_digest_multipartition_read_feature = gms::feature(_feature_service, DIGEST_MULTIPARTITION_READ_FEATURE);
|
||||
_correct_counter_order_feature = gms::feature(_feature_service, CORRECT_COUNTER_ORDER_FEATURE);
|
||||
_schema_tables_v3 = gms::feature(_feature_service, SCHEMA_TABLES_V3);
|
||||
_correct_non_compound_range_tombstones = gms::feature(_feature_service, CORRECT_NON_COMPOUND_RANGE_TOMBSTONES);
|
||||
_write_failure_reply_feature = gms::feature(_feature_service, WRITE_FAILURE_REPLY_FEATURE);
|
||||
_xxhash_feature = gms::feature(_feature_service, XXHASH_FEATURE);
|
||||
_roles_feature = gms::feature(_feature_service, ROLES_FEATURE);
|
||||
_la_sstable_feature = gms::feature(_feature_service, LA_SSTABLE_FEATURE);
|
||||
_stream_with_rpc_stream_feature = gms::feature(_feature_service, STREAM_WITH_RPC_STREAM);
|
||||
_mc_sstable_feature = gms::feature(_feature_service, MC_SSTABLE_FEATURE);
|
||||
_materialized_views_feature = gms::feature(_feature_service, MATERIALIZED_VIEWS_FEATURE);
|
||||
_indexes_feature = gms::feature(_feature_service, INDEXES_FEATURE);
|
||||
}
|
||||
|
||||
// Runs inside seastar::async context
|
||||
@@ -3272,8 +3274,9 @@ storage_service::view_build_statuses(sstring keyspace, sstring view_name) const
|
||||
});
|
||||
}
|
||||
|
||||
future<> init_storage_service(distributed<database>& db, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks) {
|
||||
return service::get_storage_service().start(std::ref(db), std::ref(auth_service), std::ref(sys_dist_ks));
|
||||
future<> init_storage_service(distributed<database>& db, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
sharded<gms::feature_service>& feature_service) {
|
||||
return service::get_storage_service().start(std::ref(db), std::ref(auth_service), std::ref(sys_dist_ks), std::ref(feature_service));
|
||||
}
|
||||
|
||||
future<> deinit_storage_service() {
|
||||
|
||||
@@ -71,6 +71,10 @@ namespace dht {
|
||||
class boot_strapper;
|
||||
}
|
||||
|
||||
namespace gms {
|
||||
class feature_service;
|
||||
};
|
||||
|
||||
namespace service {
|
||||
|
||||
class load_broadcaster;
|
||||
@@ -120,6 +124,7 @@ private:
|
||||
/* JMX notification serial number counter */
|
||||
private final AtomicLong notificationSerialNumber = new AtomicLong();
|
||||
#endif
|
||||
gms::feature_service& _feature_service;
|
||||
distributed<database>& _db;
|
||||
sharded<auth::service>& _auth_service;
|
||||
int _update_jobs{0};
|
||||
@@ -139,7 +144,7 @@ private:
|
||||
bool _stream_manager_stopped = false;
|
||||
seastar::metrics::metric_groups _metrics;
|
||||
public:
|
||||
storage_service(distributed<database>& db, sharded<auth::service>&, sharded<db::system_distributed_keyspace>&);
|
||||
storage_service(distributed<database>& db, sharded<auth::service>&, sharded<db::system_distributed_keyspace>&, gms::feature_service& feature_service);
|
||||
void isolate_on_error();
|
||||
void isolate_on_commit_error();
|
||||
|
||||
@@ -2279,7 +2284,8 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
future<> init_storage_service(distributed<database>& db, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks);
|
||||
future<> init_storage_service(distributed<database>& db, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks,
|
||||
sharded<gms::feature_service>& feature_service);
|
||||
future<> deinit_storage_service();
|
||||
|
||||
}
|
||||
|
||||
@@ -45,6 +45,7 @@
|
||||
#include "message/messaging_service.hh"
|
||||
#include "gms/failure_detector.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
#include "service/storage_service.hh"
|
||||
#include "auth/service.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
@@ -58,8 +59,8 @@ future<> await_background_jobs_on_all_shards();
|
||||
|
||||
static const sstring testing_superuser = "tester";
|
||||
|
||||
static future<> tst_init_ms_fd_gossiper(db::seed_provider_type seed_provider, sstring cluster_name = "Test Cluster") {
|
||||
return gms::get_failure_detector().start().then([seed_provider, cluster_name] {
|
||||
static future<> tst_init_ms_fd_gossiper(sharded<gms::feature_service>& features, db::seed_provider_type seed_provider, sstring cluster_name = "Test Cluster") {
|
||||
return gms::get_failure_detector().start().then([seed_provider, cluster_name, &features] () mutable {
|
||||
// Init gossiper
|
||||
std::set<gms::inet_address> seeds;
|
||||
if (seed_provider.parameters.count("seeds") > 0) {
|
||||
@@ -74,7 +75,7 @@ static future<> tst_init_ms_fd_gossiper(db::seed_provider_type seed_provider, ss
|
||||
if (seeds.empty()) {
|
||||
seeds.emplace(gms::inet_address("127.0.0.1"));
|
||||
}
|
||||
return gms::get_gossiper().start().then([seeds, cluster_name] {
|
||||
return gms::get_gossiper().start(std::ref(features)).then([seeds, cluster_name] {
|
||||
auto& gossiper = gms::get_local_gossiper();
|
||||
gossiper.set_seeds(seeds);
|
||||
gossiper.set_cluster_name(cluster_name);
|
||||
@@ -88,6 +89,7 @@ public:
|
||||
static const char* ks_name;
|
||||
static std::atomic<bool> active;
|
||||
private:
|
||||
shared_ptr<sharded<gms::feature_service>> _feature_service;
|
||||
::shared_ptr<distributed<database>> _db;
|
||||
::shared_ptr<sharded<auth::service>> _auth_service;
|
||||
::shared_ptr<sharded<db::view::view_builder>> _view_builder;
|
||||
@@ -117,11 +119,13 @@ private:
|
||||
}
|
||||
public:
|
||||
single_node_cql_env(
|
||||
shared_ptr<sharded<gms::feature_service>> feature_service,
|
||||
::shared_ptr<distributed<database>> db,
|
||||
::shared_ptr<sharded<auth::service>> auth_service,
|
||||
::shared_ptr<sharded<db::view::view_builder>> view_builder,
|
||||
::shared_ptr<sharded<db::view::view_update_from_staging_generator>> view_update_generator)
|
||||
: _db(db)
|
||||
: _feature_service(std::move(_feature_service))
|
||||
, _db(db)
|
||||
, _auth_service(std::move(auth_service))
|
||||
, _view_builder(std::move(view_builder))
|
||||
, _view_update_generator(std::move(view_update_generator))
|
||||
@@ -338,8 +342,12 @@ public:
|
||||
auto sys_dist_ks = seastar::sharded<db::system_distributed_keyspace>();
|
||||
auto stop_sys_dist_ks = defer([&sys_dist_ks] { sys_dist_ks.stop().get(); });
|
||||
|
||||
auto feature_service = make_shared<sharded<gms::feature_service>>();
|
||||
feature_service->start().get();
|
||||
auto stop_feature_service = defer([&] { feature_service->stop().get(); });
|
||||
|
||||
auto& ss = service::get_storage_service();
|
||||
ss.start(std::ref(*db), std::ref(*auth_service), std::ref(sys_dist_ks)).get();
|
||||
ss.start(std::ref(*db), std::ref(*auth_service), std::ref(sys_dist_ks), std::ref(*feature_service)).get();
|
||||
auto stop_storage_service = defer([&ss] { ss.stop().get(); });
|
||||
|
||||
database_config dbcfg;
|
||||
@@ -350,7 +358,7 @@ public:
|
||||
});
|
||||
|
||||
// FIXME: split
|
||||
tst_init_ms_fd_gossiper(db::config::seed_provider_type()).get();
|
||||
tst_init_ms_fd_gossiper(*feature_service, db::config::seed_provider_type()).get();
|
||||
auto stop_ms_fd_gossiper = defer([] {
|
||||
gms::get_gossiper().stop().get();
|
||||
gms::get_failure_detector().stop().get();
|
||||
@@ -431,7 +439,7 @@ public:
|
||||
auto stop_view_update_generator = defer([view_update_generator] {
|
||||
view_update_generator->stop().get();
|
||||
});
|
||||
single_node_cql_env env(db, auth_service, view_builder, view_update_generator);
|
||||
single_node_cql_env env(feature_service, db, auth_service, view_builder, view_update_generator);
|
||||
env.start().get();
|
||||
auto stop_env = defer([&env] { env.stop().get(); });
|
||||
|
||||
@@ -468,6 +476,7 @@ future<> do_with_cql_env_thread(std::function<void(cql_test_env&)> func) {
|
||||
}
|
||||
|
||||
class storage_service_for_tests::impl {
|
||||
sharded<gms::feature_service> _feature_service;
|
||||
distributed<database> _db;
|
||||
sharded<auth::service> _auth_service;
|
||||
sharded<db::system_distributed_keyspace> _sys_dist_ks;
|
||||
@@ -475,8 +484,9 @@ public:
|
||||
impl() {
|
||||
auto thread = seastar::thread_impl::get();
|
||||
assert(thread);
|
||||
_feature_service.start().get();
|
||||
netw::get_messaging_service().start(gms::inet_address("127.0.0.1"), 7000, false).get();
|
||||
service::get_storage_service().start(std::ref(_db), std::ref(_auth_service), std::ref(_sys_dist_ks)).get();
|
||||
service::get_storage_service().start(std::ref(_db), std::ref(_auth_service), std::ref(_sys_dist_ks), std::ref(_feature_service)).get();
|
||||
service::get_storage_service().invoke_on_all([] (auto& ss) {
|
||||
ss.enable_all_features();
|
||||
}).get();
|
||||
@@ -485,6 +495,7 @@ public:
|
||||
service::get_storage_service().stop().get();
|
||||
netw::get_messaging_service().stop().get();
|
||||
_db.stop().get();
|
||||
_feature_service.stop().get();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@
|
||||
#include "db/system_distributed_keyspace.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
#include "gms/failure_detector.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "gms/application_state.hh"
|
||||
#include "service/storage_service.hh"
|
||||
@@ -71,15 +72,17 @@ int main(int ac, char ** av) {
|
||||
auto vv = std::make_shared<gms::versioned_value::factory>();
|
||||
return async([&] {
|
||||
locator::i_endpoint_snitch::create_snitch("SimpleSnitch").get();
|
||||
sharded<gms::feature_service> feature_service;
|
||||
feature_service.start().get();
|
||||
sharded<db::system_distributed_keyspace> sys_dist_ks;
|
||||
service::init_storage_service(db, auth_service, sys_dist_ks).get();
|
||||
service::init_storage_service(db, auth_service, sys_dist_ks, feature_service).get();
|
||||
netw::get_messaging_service().start(listen).get();
|
||||
auto& server = netw::get_local_messaging_service();
|
||||
auto port = server.port();
|
||||
auto listen = server.listen_address();
|
||||
fmt::print("Messaging server listening on ip {} port {:d} ...\n", listen, port);
|
||||
gms::get_failure_detector().start().get();
|
||||
gms::get_gossiper().start().get();
|
||||
gms::get_gossiper().start(std::ref(feature_service)).get();
|
||||
std::set<gms::inet_address> seeds;
|
||||
for (auto s : config["seed"].as<std::vector<std::string>>()) {
|
||||
seeds.emplace(std::move(s));
|
||||
|
||||
@@ -29,6 +29,7 @@
|
||||
#include "message/messaging_service.hh"
|
||||
#include "gms/failure_detector.hh"
|
||||
#include "gms/gossiper.hh"
|
||||
#include "gms/feature_service.hh"
|
||||
#include <seastar/core/reactor.hh>
|
||||
#include "service/storage_service.hh"
|
||||
#include <seastar/core/distributed.hh>
|
||||
@@ -41,6 +42,9 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
|
||||
sharded<auth::service> auth_service;
|
||||
sharded<db::system_distributed_keyspace> sys_dist_ks;
|
||||
utils::fb_utilities::set_broadcast_address(gms::inet_address("127.0.0.1"));
|
||||
sharded<gms::feature_service> feature_service;
|
||||
feature_service.start().get();
|
||||
auto stop_feature_service = defer([&] { feature_service.stop().get(); });
|
||||
|
||||
locator::i_endpoint_snitch::create_snitch("SimpleSnitch").get();
|
||||
auto stop_snitch = defer([&] { gms::get_failure_detector().stop().get(); });
|
||||
@@ -48,7 +52,7 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
|
||||
netw::get_messaging_service().start(gms::inet_address("127.0.0.1"), 7000, false /* don't bind */).get();
|
||||
auto stop_messaging_service = defer([&] { netw::get_messaging_service().stop().get(); });
|
||||
|
||||
service::get_storage_service().start(std::ref(db), std::ref(auth_service), std::ref(sys_dist_ks)).get();
|
||||
service::get_storage_service().start(std::ref(db), std::ref(auth_service), std::ref(sys_dist_ks), std::ref(feature_service)).get();
|
||||
auto stop_ss = defer([&] { service::get_storage_service().stop().get(); });
|
||||
|
||||
db.start().get();
|
||||
@@ -57,7 +61,7 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
|
||||
gms::get_failure_detector().start().get();
|
||||
auto stop_failure_detector = defer([&] { gms::get_failure_detector().stop().get(); });
|
||||
|
||||
gms::get_gossiper().start().get();
|
||||
gms::get_gossiper().start(std::ref(feature_service)).get();
|
||||
auto stop_gossiper = defer([&] { gms::get_gossiper().stop().get(); });
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user