messaging_service: Delay listening ms during boot up
When a node starts up, peer node can send gossip syn message to it before the gossip message handlers are registered in messaging_service. We can see: scylla[123]: [shard 0] rpc - client a.b.c.d: unknown verb exception 6 ignored To fix, we delay the listening of messaging_service to the point when gossip message handlers are registered. Message-Id: <9b20d85e199ef0e44cdcde2920123a301a88f3d7.1464254400.git.asias@scylladb.com>
This commit is contained in:
@@ -313,6 +313,9 @@ void gossiper::init_messaging_service_handler() {
|
||||
});
|
||||
return messaging_service::no_wait();
|
||||
});
|
||||
|
||||
// Start listening messaging_service after gossip message handlers are registered
|
||||
ms().start_listen();
|
||||
}
|
||||
|
||||
void gossiper::uninit_messaging_service_handler() {
|
||||
|
||||
4
init.cc
4
init.cc
@@ -79,7 +79,9 @@ void init_ms_fd_gossiper(sstring listen_address
|
||||
}
|
||||
|
||||
// Init messaging_service
|
||||
net::get_messaging_service().start(listen, storage_port, ew, ssl_storage_port, creds).get();
|
||||
// Delay listening messaging_service until gossip message handlers are registered
|
||||
bool listen_now = false;
|
||||
net::get_messaging_service().start(listen, storage_port, ew, ssl_storage_port, creds, listen_now).get();
|
||||
|
||||
// #293 - do not stop anything
|
||||
//engine().at_exit([] { return net::get_messaging_service().stop(); });
|
||||
|
||||
@@ -207,8 +207,8 @@ void register_handler(messaging_service* ms, messaging_verb verb, Func&& func) {
|
||||
ms->rpc()->register_handler(verb, std::move(func));
|
||||
}
|
||||
|
||||
messaging_service::messaging_service(gms::inet_address ip, uint16_t port)
|
||||
: messaging_service(std::move(ip), port, encrypt_what::none, 0, nullptr)
|
||||
messaging_service::messaging_service(gms::inet_address ip, uint16_t port, bool listen_now)
|
||||
: messaging_service(std::move(ip), port, encrypt_what::none, 0, nullptr, listen_now)
|
||||
{}
|
||||
|
||||
static
|
||||
@@ -221,31 +221,41 @@ rpc_resource_limits() {
|
||||
return limits;
|
||||
}
|
||||
|
||||
void messaging_service::start_listen() {
|
||||
if (!_server) {
|
||||
auto addr = ipv4_addr{_listen_address.raw_addr(), _port};
|
||||
_server = std::unique_ptr<rpc_protocol_server_wrapper>(new rpc_protocol_server_wrapper(*_rpc,
|
||||
addr, rpc_resource_limits()));
|
||||
}
|
||||
|
||||
if (!_server_tls) {
|
||||
_server_tls = std::unique_ptr<rpc_protocol_server_wrapper>(
|
||||
[this] () -> std::unique_ptr<rpc_protocol_server_wrapper>{
|
||||
if (_encrypt_what == encrypt_what::none) {
|
||||
return nullptr;
|
||||
}
|
||||
listen_options lo;
|
||||
lo.reuse_address = true;
|
||||
auto addr = make_ipv4_address(ipv4_addr{_listen_address.raw_addr(), _ssl_port});
|
||||
return std::make_unique<rpc_protocol_server_wrapper>(*_rpc,
|
||||
seastar::tls::listen(_credentials, addr, lo));
|
||||
}());
|
||||
}
|
||||
}
|
||||
|
||||
messaging_service::messaging_service(gms::inet_address ip
|
||||
, uint16_t port
|
||||
, encrypt_what ew
|
||||
, uint16_t ssl_port
|
||||
, std::shared_ptr<seastar::tls::credentials_builder> credentials
|
||||
, bool listen_now
|
||||
)
|
||||
: _listen_address(ip)
|
||||
, _port(port)
|
||||
, _ssl_port(ssl_port)
|
||||
, _encrypt_what(ew)
|
||||
, _rpc(new rpc_protocol_wrapper(serializer { }))
|
||||
, _server(new rpc_protocol_server_wrapper(*_rpc, ipv4_addr { _listen_address.raw_addr(), _port }, rpc_resource_limits()))
|
||||
, _credentials(credentials ? credentials->build_server_credentials() : nullptr)
|
||||
, _server_tls([this]() -> std::unique_ptr<rpc_protocol_server_wrapper>{
|
||||
if (_encrypt_what == encrypt_what::none) {
|
||||
return nullptr;
|
||||
}
|
||||
listen_options lo;
|
||||
lo.reuse_address = true;
|
||||
return std::make_unique<rpc_protocol_server_wrapper>(*_rpc,
|
||||
seastar::tls::listen(_credentials
|
||||
, make_ipv4_address(ipv4_addr {_listen_address.raw_addr(), _ssl_port})
|
||||
, lo)
|
||||
);
|
||||
}())
|
||||
{
|
||||
_rpc->set_logger([] (const sstring& log) {
|
||||
rpc_logger.info("{}", log);
|
||||
@@ -255,6 +265,11 @@ messaging_service::messaging_service(gms::inet_address ip
|
||||
ci.attach_auxiliary("src_cpu_id", src_cpu_id);
|
||||
return rpc::no_wait;
|
||||
});
|
||||
|
||||
if (listen_now) {
|
||||
start_listen();
|
||||
}
|
||||
|
||||
// Do this on just cpu 0, to avoid duplicate logs.
|
||||
if (engine().cpu_id() == 0) {
|
||||
if (_server_tls) {
|
||||
|
||||
@@ -184,11 +184,14 @@ private:
|
||||
public:
|
||||
using clock_type = std::chrono::steady_clock;
|
||||
public:
|
||||
messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0"), uint16_t port = 7000);
|
||||
messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0"),
|
||||
uint16_t port = 7000, bool listen_now = true);
|
||||
messaging_service(gms::inet_address ip, uint16_t port, encrypt_what,
|
||||
uint16_t ssl_port, std::shared_ptr<seastar::tls::credentials_builder>);
|
||||
uint16_t ssl_port, std::shared_ptr<seastar::tls::credentials_builder>,
|
||||
bool listen_now = true);
|
||||
~messaging_service();
|
||||
public:
|
||||
void start_listen();
|
||||
uint16_t port();
|
||||
gms::inet_address listen_address();
|
||||
future<> stop();
|
||||
|
||||
Reference in New Issue
Block a user