messaging_service: Delay listening ms during boot up

When a node starts up, peer node can send gossip syn message to it
before the gossip message handlers are registered in messaging_service.

We can see:

  scylla[123]:  [shard 0] rpc - client a.b.c.d: unknown verb exception 6 ignored

To fix, we delay the listening of messaging_service to the point when
gossip message handlers are registered.
Message-Id: <9b20d85e199ef0e44cdcde2920123a301a88f3d7.1464254400.git.asias@scylladb.com>
This commit is contained in:
Asias He
2016-05-26 09:20:44 +00:00
committed by Pekka Enberg
parent f3fc3afe00
commit f27e5d2a68
4 changed files with 41 additions and 18 deletions

View File

@@ -313,6 +313,9 @@ void gossiper::init_messaging_service_handler() {
});
return messaging_service::no_wait();
});
// Start listening messaging_service after gossip message handlers are registered
ms().start_listen();
}
void gossiper::uninit_messaging_service_handler() {

View File

@@ -79,7 +79,9 @@ void init_ms_fd_gossiper(sstring listen_address
}
// Init messaging_service
net::get_messaging_service().start(listen, storage_port, ew, ssl_storage_port, creds).get();
// Delay listening messaging_service until gossip message handlers are registered
bool listen_now = false;
net::get_messaging_service().start(listen, storage_port, ew, ssl_storage_port, creds, listen_now).get();
// #293 - do not stop anything
//engine().at_exit([] { return net::get_messaging_service().stop(); });

View File

@@ -207,8 +207,8 @@ void register_handler(messaging_service* ms, messaging_verb verb, Func&& func) {
ms->rpc()->register_handler(verb, std::move(func));
}
messaging_service::messaging_service(gms::inet_address ip, uint16_t port)
: messaging_service(std::move(ip), port, encrypt_what::none, 0, nullptr)
messaging_service::messaging_service(gms::inet_address ip, uint16_t port, bool listen_now)
: messaging_service(std::move(ip), port, encrypt_what::none, 0, nullptr, listen_now)
{}
static
@@ -221,31 +221,41 @@ rpc_resource_limits() {
return limits;
}
void messaging_service::start_listen() {
if (!_server) {
auto addr = ipv4_addr{_listen_address.raw_addr(), _port};
_server = std::unique_ptr<rpc_protocol_server_wrapper>(new rpc_protocol_server_wrapper(*_rpc,
addr, rpc_resource_limits()));
}
if (!_server_tls) {
_server_tls = std::unique_ptr<rpc_protocol_server_wrapper>(
[this] () -> std::unique_ptr<rpc_protocol_server_wrapper>{
if (_encrypt_what == encrypt_what::none) {
return nullptr;
}
listen_options lo;
lo.reuse_address = true;
auto addr = make_ipv4_address(ipv4_addr{_listen_address.raw_addr(), _ssl_port});
return std::make_unique<rpc_protocol_server_wrapper>(*_rpc,
seastar::tls::listen(_credentials, addr, lo));
}());
}
}
messaging_service::messaging_service(gms::inet_address ip
, uint16_t port
, encrypt_what ew
, uint16_t ssl_port
, std::shared_ptr<seastar::tls::credentials_builder> credentials
, bool listen_now
)
: _listen_address(ip)
, _port(port)
, _ssl_port(ssl_port)
, _encrypt_what(ew)
, _rpc(new rpc_protocol_wrapper(serializer { }))
, _server(new rpc_protocol_server_wrapper(*_rpc, ipv4_addr { _listen_address.raw_addr(), _port }, rpc_resource_limits()))
, _credentials(credentials ? credentials->build_server_credentials() : nullptr)
, _server_tls([this]() -> std::unique_ptr<rpc_protocol_server_wrapper>{
if (_encrypt_what == encrypt_what::none) {
return nullptr;
}
listen_options lo;
lo.reuse_address = true;
return std::make_unique<rpc_protocol_server_wrapper>(*_rpc,
seastar::tls::listen(_credentials
, make_ipv4_address(ipv4_addr {_listen_address.raw_addr(), _ssl_port})
, lo)
);
}())
{
_rpc->set_logger([] (const sstring& log) {
rpc_logger.info("{}", log);
@@ -255,6 +265,11 @@ messaging_service::messaging_service(gms::inet_address ip
ci.attach_auxiliary("src_cpu_id", src_cpu_id);
return rpc::no_wait;
});
if (listen_now) {
start_listen();
}
// Do this on just cpu 0, to avoid duplicate logs.
if (engine().cpu_id() == 0) {
if (_server_tls) {

View File

@@ -184,11 +184,14 @@ private:
public:
using clock_type = std::chrono::steady_clock;
public:
messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0"), uint16_t port = 7000);
messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0"),
uint16_t port = 7000, bool listen_now = true);
messaging_service(gms::inet_address ip, uint16_t port, encrypt_what,
uint16_t ssl_port, std::shared_ptr<seastar::tls::credentials_builder>);
uint16_t ssl_port, std::shared_ptr<seastar::tls::credentials_builder>,
bool listen_now = true);
~messaging_service();
public:
void start_listen();
uint16_t port();
gms::inet_address listen_address();
future<> stop();