diff --git a/configure.py b/configure.py index d850e58706..c9d8314058 100755 --- a/configure.py +++ b/configure.py @@ -143,6 +143,7 @@ urchin_tests = [ 'tests/cartesian_product_test', 'tests/urchin/hash_test', 'tests/urchin/serializer_test', + 'tests/message', ] tests = [ @@ -357,6 +358,7 @@ urchin_core = (['database.cc', 'locator/abstract_replication_strategy.cc', 'locator/simple_strategy.cc', 'locator/token_metadata.cc', + 'message/messaging_service.cc', ] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] @@ -396,6 +398,7 @@ deps = { 'tests/fstream_test': ['tests/fstream_test.cc'] + core, 'tests/distributed_test': ['tests/distributed_test.cc'] + core, 'tests/rpc': ['tests/rpc.cc'] + core + libnet, + 'tests/message': ['tests/message.cc', 'message/messaging_service.cc'] + core + libnet, } for t in urchin_tests: diff --git a/message/messaging_service.cc b/message/messaging_service.cc new file mode 100644 index 0000000000..ce519000b6 --- /dev/null +++ b/message/messaging_service.cc @@ -0,0 +1,10 @@ +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + */ + +#include "message/messaging_service.hh" +#include "core/distributed.hh" + +namespace net { + distributed _the_messaging_service; +} diff --git a/message/messaging_service.hh b/message/messaging_service.hh new file mode 100644 index 0000000000..87776f230a --- /dev/null +++ b/message/messaging_service.hh @@ -0,0 +1,363 @@ +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + */ + +#pragma once + +#include "core/reactor.hh" +#include "core/iostream.hh" +#include "core/distributed.hh" +#include "core/print.hh" +#include "core/sstring.hh" +#include "net/api.hh" +#include "util/serialization.hh" +#include "gms/inet_address.hh" +#include "rpc/rpc.hh" +#include + +namespace net { + +/* All verb handler identifiers */ +enum class messaging_verb : int32_t { + MUTATION, + BINARY, // Deprecated + READ_REPAIR, + READ, + REQUEST_RESPONSE, // client-initiated reads and writes + STREAM_INITIATE, // Deprecated + STREAM_INITIATE_DONE, // Deprecated + STREAM_REPLY, // Deprecated + STREAM_REQUEST, // Deprecated + RANGE_SLICE, + BOOTSTRAP_TOKEN, // Deprecated + TREE_REQUEST, // Deprecated + TREE_RESPONSE, // Deprecated + JOIN, // Deprecated + GOSSIP_DIGEST_SYN, + GOSSIP_DIGEST_ACK, + GOSSIP_DIGEST_ACK2, + DEFINITIONS_ANNOUNCE, // Deprecated + DEFINITIONS_UPDATE, + TRUNCATE, + SCHEMA_CHECK, + INDEX_SCAN, // Deprecated + REPLICATION_FINISHED, + INTERNAL_RESPONSE, // responses to internal calls + COUNTER_MUTATION, + STREAMING_REPAIR_REQUEST, // Deprecated + STREAMING_REPAIR_RESPONSE, // Deprecated + SNAPSHOT, // Similar to nt snapshot + MIGRATION_REQUEST, + GOSSIP_SHUTDOWN, + _TRACE, + ECHO, + REPAIR_MESSAGE, + PAXOS_PREPARE, + PAXOS_PROPOSE, + PAXOS_COMMIT, + PAGED_RANGE, + UNUSED_1, + UNUSED_2, + UNUSED_3, +}; + +} // namespace net + +namespace std { +template <> +class hash { +public: + size_t operator()(const net::messaging_verb& x) const { + return hash()(int32_t(x)); + } +}; +} // namespace std + +namespace net { + +struct serializer { + // For integer type + template + inline auto operator()(output_stream& out, T&& v, std::enable_if_t>::value, void*> = nullptr) { + auto v_ = net::hton(v); + return out.write(reinterpret_cast(&v_), sizeof(T)); + } + template + inline auto operator()(input_stream& in, T& v, std::enable_if_t::value, void*> = nullptr) { + return in.read_exactly(sizeof(v)).then([&v] (temporary_buffer buf) mutable { + if (buf.size() != sizeof(v)) { + throw rpc::closed_error(); + } + v = net::ntoh(*reinterpret_cast*>(buf.get())); + }); + } + + // For messaging_verb + inline auto operator()(output_stream& out, messaging_verb& v) { + bytes b(bytes::initialized_later(), sizeof(v)); + auto _out = b.begin(); + serialize_int32(_out, int32_t(v)); + return out.write(reinterpret_cast(b.c_str()), sizeof(v)); + } + inline auto operator()(input_stream& in, messaging_verb& v) { + return in.read_exactly(sizeof(v)).then([&v] (temporary_buffer buf) mutable { + if (buf.size() != sizeof(v)) { + throw rpc::closed_error(); + } + bytes_view bv(reinterpret_cast(buf.get()), sizeof(v)); + v = messaging_verb(read_simple(bv)); + }); + } + + // For sstring + inline auto operator()(output_stream& out, sstring& v) { + auto sz = serialize_int16_size + v.size(); + bytes b(bytes::initialized_later(), sz); + auto _out = b.begin(); + serialize_string(_out, v); + return out.write(reinterpret_cast(b.c_str()), sz); + } + inline auto operator()(input_stream& in, sstring& v) { + return in.read_exactly(serialize_int16_size).then([&in, &v] (temporary_buffer buf) mutable { + if (buf.size() != serialize_int16_size) { + throw rpc::closed_error(); + } + size_t sz = net::ntoh(*reinterpret_cast*>(buf.get())); + return in.read_exactly(sz).then([sz, &v] (temporary_buffer buf) mutable { + if (buf.size() != sz) { + throw rpc::closed_error(); + } + bytes_view bv(reinterpret_cast(buf.get()), sz); + v = read_simple_short_string(bv); + return make_ready_future<>(); + }); + }); + } + + // For complex types which have serialize()/deserialize(), e.g. gms::gossip_digest_syn, gms::gossip_digest_ack2 + template + inline auto operator()(output_stream& out, T&& v, std::enable_if_t>::value && + !std::is_enum>::value, void*> = nullptr) { + auto sz = serialize_int32_size + v.serialized_size(); + bytes b(bytes::initialized_later(), sz); + auto _out = b.begin(); + serialize_int32(_out, int32_t(sz - serialize_int32_size)); + v.serialize(_out); + return out.write(reinterpret_cast(b.c_str()), sz); + } + template + inline auto operator()(input_stream& in, T& v, std::enable_if_t::value && + !std::is_enum::value, void*> = nullptr) { + return in.read_exactly(serialize_int32_size).then([&in, &v] (temporary_buffer buf) mutable { + if (buf.size() != serialize_int32_size) { + throw rpc::closed_error(); + } + size_t sz = net::ntoh(*reinterpret_cast*>(buf.get())); + return in.read_exactly(sz).then([sz, &v] (temporary_buffer buf) mutable { + if (buf.size() != sz) { + throw rpc::closed_error(); + } + bytes_view bv(reinterpret_cast(buf.get()), sz); + v = v.deserialize(bv); + return make_ready_future<>(); + }); + }); + } + + // For std::tuple + inline auto operator()(output_stream& out, std::tuple& v) { + auto& x = std::get<0>(v); + auto f = operator()(out, x); + return f.then([this, &out, &v]{ + auto& y = std::get<1>(v); + return operator()(out, y); + }); + } + inline auto operator()(input_stream& in, std::tuple& v) { + auto& x = std::get<0>(v); + auto f = operator()(in, x); + return f.then([this, &in, &v]{ + auto& y = std::get<1>(v); + return operator()(in, y); + }); + } +}; + +class messaging_service { +public: + struct shard_id { + gms::inet_address addr; + uint32_t cpu_id; + friend inline bool operator==(const shard_id& x, const shard_id& y) { + return x.addr == y.addr && x.cpu_id == y.cpu_id ; + } + friend inline bool operator<(const shard_id& x, const shard_id& y) { + if (x.addr < y.addr) { + return true; + } else if (y.addr < x.addr) { + return false; + } else { + return x.cpu_id < y.cpu_id; + } + } + friend inline std::ostream& operator<<(std::ostream& os, const shard_id& x) { + return os << x.addr << ":" << x.cpu_id; + } + struct hash { + size_t operator()(const shard_id& id) const { + return std::hash()(id.cpu_id) + std::hash()(id.addr.raw_addr()); + } + }; + }; + struct shard_info { + shard_info(std::unique_ptr::client>&& client) + : rpc_client(std::move(client)) { + } + std::unique_ptr::client> rpc_client; + }; + struct handler_base { + }; + template + struct handler : public handler_base { + std::function rpc_handler; + handler(std::function&& rpc_handler_) : rpc_handler(std::move(rpc_handler_)) {} + }; +private: + static constexpr const uint16_t _port_base = 7000; + uint16_t _port; + rpc::protocol _rpc; + rpc::protocol::server _server; + std::unordered_map _clients; + std::unordered_map> _handlers; +public: + messaging_service() + : _port(_port_base + engine().cpu_id()) + , _rpc(serializer{}) + , _server(_rpc, ipv4_addr{_port}) { + } +public: + uint16_t port_min() { + return _port_base; + } + uint16_t port_max() { + return _port_base + smp::count - 1; + } + future<> stop() { + return make_ready_future<>(); + } + + static auto no_wait() { + return rpc::no_wait; + } +private: + template + struct tuple_to_handler_type; + + template + struct tuple_to_handler_type> { + using type = handler::client&, Args...)>; + }; + + template + struct tuple_to_handler_type_oneway; + + template + struct tuple_to_handler_type_oneway> { + using type = handler(rpc::protocol::client&, Args...)>; + }; +public: + // Register a handler (a callback lambda) for verb + template + void register_handler(messaging_verb verb, Func&& func) { + auto rpc_handler = _rpc.register_handler(verb, std::move(func)); + using Ret = typename function_traits::return_type; + using ArgsTuple = typename function_traits::args_as_tuple; + using handler_type = typename tuple_to_handler_type::type; + _handlers.emplace(verb, std::make_unique(std::move(rpc_handler))); + } + + template + void register_handler_oneway(messaging_verb verb, Func&& func) { + auto rpc_handler = _rpc.register_handler(verb, std::move(func)); + using Ret = typename function_traits::return_type; + using ArgsTuple = typename function_traits::args_as_tuple; + using handler_type = typename tuple_to_handler_type_oneway::type; + _handlers.emplace(verb, std::make_unique(std::move(rpc_handler))); + } + + // Send a message for verb + template + future send_message(messaging_verb verb, shard_id id, MsgOut... msg) { + auto& rpc_client = get_rpc_client(id); + auto& rpc_handler = get_rpc_handler(verb); + return rpc_handler(rpc_client, std::move(msg)...).then_wrapped([this, id] (future f) -> future { + try { + auto ret = f.get(); + return make_ready_future(std::move(std::get<0>(ret))); + } catch (std::runtime_error&) { + remove_rpc_client(id); + throw; + } + }); + } + + template + future<> send_message_oneway(messaging_verb verb, shard_id id, MsgOut... msg) { + auto& rpc_client = get_rpc_client(id); + auto& rpc_handler = get_rpc_handler_oneway(verb); + return rpc_handler(rpc_client, std::move(msg)...).then_wrapped([this, id] (future<> f) -> future<> { + try { + f.get(); + return make_ready_future<>(); + } catch (std::runtime_error&) { + remove_rpc_client(id); + throw; + } + }); + } +private: + // Return rpc::protocol::client for a shard which is a ip + cpuid pair. + rpc::protocol::client& get_rpc_client(shard_id id) { + auto it = _clients.find(id); + if (it == _clients.end()) { + auto remote_addr = ipv4_addr(id.addr.raw_addr(), _port_base + id.cpu_id); + auto client = std::make_unique::client>(_rpc, remote_addr); + it = _clients.emplace(id, shard_info(std::move(client))).first; + return *it->second.rpc_client; + } else { + return *it->second.rpc_client; + } + } + + void remove_rpc_client(shard_id id) { + _clients.erase(id); + } + + // Return a std::function for verb + // which can be used by rpc client to start a rpc call + template + auto& get_rpc_handler(messaging_verb verb) { + handler_base* h = _handlers[verb].get(); + using handler_type = handler(rpc::protocol::client&, MsgOut...)>; + return static_cast(h)->rpc_handler; + } + + template + auto& get_rpc_handler_oneway(messaging_verb verb) { + handler_base* h = _handlers[verb].get(); + using handler_type = handler(rpc::protocol::client&, MsgOut...)>; + return static_cast(h)->rpc_handler; + } +}; + +extern distributed _the_messaging_service; + +inline distributed& get_messaging_service() { + return _the_messaging_service; +} + +inline messaging_service& get_local_messaging_service() { + return _the_messaging_service.local(); +} + +} // namespace net diff --git a/tests/message.cc b/tests/message.cc new file mode 100644 index 0000000000..ebc9f4fea1 --- /dev/null +++ b/tests/message.cc @@ -0,0 +1,185 @@ +#include "core/reactor.hh" +#include "core/app-template.hh" +#include "core/sstring.hh" +#include "message/messaging_service.hh" +#include "gms/gossip_digest_syn.hh" +#include "gms/gossip_digest_ack.hh" +#include "gms/gossip_digest_ack2.hh" +#include "gms/gossip_digest.hh" +#include "core/sleep.hh" + +using namespace std::chrono_literals; +using namespace net; + +struct empty_msg { + void serialize(bytes::iterator& out) const { + } + static empty_msg deserialize(bytes_view& v) { + return empty_msg(); + } + size_t serialized_size() const { + return 0; + } + friend inline std::ostream& operator<<(std::ostream& os, const empty_msg& ack) { + return os << "empty_msg"; + } +}; + +class tester { +private: + messaging_service& ms; + gms::inet_address _server; + uint32_t _cpuid; +public: + tester() + : ms(get_local_messaging_service()) { + } + using shard_id = net::messaging_service::shard_id; + using inet_address = gms::inet_address; + using endpoint_state = gms::endpoint_state; + shard_id get_shard_id() { + return shard_id{_server, _cpuid}; + } + void set_server_ip(sstring ip) { + _server = inet_address(ip); + } + void set_server_cpuid(uint32_t cpu) { + _cpuid = cpu; + } + future<> stop() { + return make_ready_future<>(); + } +public: + void init_handler() { + ms.register_handler(messaging_verb::GOSSIP_DIGEST_SYN, [] (gms::gossip_digest_syn msg) { + print("Server got syn msg = %s\n", msg); + auto ep1 = inet_address("1.1.1.1"); + auto ep2 = inet_address("2.2.2.2"); + int32_t gen = 800; + int32_t ver = 900; + std::vector digests{ + {ep1, gen++, ver++}, + {ep2, gen++, ver++}, + }; + std::map eps{ + {ep1, endpoint_state()}, + {ep2, endpoint_state()}, + }; + gms::gossip_digest_ack ack(std::move(digests), std::move(eps)); + return make_ready_future(ack); + }); + ms.register_handler_oneway(messaging_verb::GOSSIP_DIGEST_ACK2, [] (gms::gossip_digest_ack2 msg) { + print("Server got ack2 msg = %s\n", msg); + return messaging_service::no_wait(); + }); + ms.register_handler_oneway(messaging_verb::GOSSIP_SHUTDOWN, [] (empty_msg msg) { + print("Server got shutdown msg = %s\n", msg); + return messaging_service::no_wait(); + }); + ms.register_handler(messaging_verb::ECHO, [] (int x, long y) { + print("Server got echo msg = (%d, %ld) \n", x, y); + std::tuple ret(x*x, y*y); + return make_ready_future(std::move(ret)); + }); + } + +public: + future<> test_gossip_digest() { + print("=== %s ===\n", __func__); + // Prepare gossip_digest_syn message + auto id = get_shard_id(); + auto ep1 = inet_address("1.1.1.1"); + auto ep2 = inet_address("2.2.2.2"); + int32_t gen = 100; + int32_t ver = 900; + std::vector digests{ + {ep1, gen++, ver++}, + {ep2, gen++, ver++}, + }; + gms::gossip_digest_syn syn("my_cluster", "my_partition", digests); + using RetMsg = gms::gossip_digest_ack; + return ms.send_message(messaging_verb::GOSSIP_DIGEST_SYN, std::move(id), std::move(syn)).then([this, id] (RetMsg ack) { + print("Client sent gossip_digest_syn got gossip_digest_ack reply = %s\n", ack); + // Prepare gossip_digest_ack2 message + auto ep1 = inet_address("3.3.3.3"); + std::map eps{ + {ep1, endpoint_state()}, + }; + gms::gossip_digest_ack2 ack2(std::move(eps)); + return ms.send_message_oneway(messaging_verb::GOSSIP_DIGEST_ACK2, std::move(id), std::move(ack2)).then([] () { + print("Client sent gossip_digest_ack2 got reply = void\n"); + return make_ready_future<>(); + }); + }); + } + + future<> test_gossip_shutdown() { + print("=== %s ===\n", __func__); + auto id = get_shard_id(); + empty_msg msg; + return ms.send_message_oneway(messaging_verb::GOSSIP_SHUTDOWN, std::move(id), std::move(msg)).then([] () { + print("Client sent gossip_shutdown got reply = void\n"); + return make_ready_future<>(); + }); + } + + future<> test_echo() { + print("=== %s ===\n", __func__); + auto id = get_shard_id(); + int msg1 = 30; + int msg2 = 60; + using RetMsg = std::tuple; + return ms.send_message(messaging_verb::ECHO, id, msg1, msg2).then([] (RetMsg msg) { + print("Client sent echo got reply = (%d , %ld)\n", std::get<0>(msg), std::get<1>(msg)); + return sleep(100ms).then([]{ + return make_ready_future<>(); + }); + }); + } +}; + +namespace bpo = boost::program_options; + +int main(int ac, char ** av) { + app_template app; + app.add_options() + ("server", bpo::value(), "Server ip") + ("cpuid", bpo::value()->default_value(0), "Server cpuid"); + return app.run(ac, av, [&] { + net::get_messaging_service().start().then([&] () { + auto&& config = app.configuration(); + auto testers = new distributed; + testers->start().then([testers]{ + auto& server = net::get_local_messaging_service(); + auto min = server.port_min(); + auto max = server.port_max(); + std::cout << "Messaging server listening on ports " << min << " to " << max << " ...\n"; + return testers->invoke_on_all(&tester::init_handler); + }).then([testers, config] { + auto t = &testers->local(); + if (!config.count("server")) { + return; + } + auto ip = config["server"].as(); + auto cpuid = config["cpuid"].as(); + t->set_server_ip(ip); + t->set_server_cpuid(cpuid); + print("=============TEST START===========\n"); + print("Sending to server ....\n"); + t->test_gossip_digest().then([testers, t] { + return t->test_gossip_shutdown(); + }).then([testers, t] { + return t->test_echo(); + }).then([testers, t] { + print("=============TEST DONE===========\n"); + testers->stop().then([testers] { + delete testers; + net::get_messaging_service().stop().then([]{ + engine().exit(0); + }); + }); + }); + }); + }); + }); +}