From 18f35236dbd29e5e9fad75a73d9832edb63b6e03 Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Sun, 25 Jan 2015 14:49:53 +0200 Subject: [PATCH 1/6] memory: Move page_size, page_bits and huge page size definitions to header They are going to be used in more places (not just in memory.cc). Signed-off-by: Vlad Zolotarov --- core/memory.cc | 3 --- core/memory.hh | 5 +++++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/core/memory.cc b/core/memory.cc index 1c931d1871..3598432129 100644 --- a/core/memory.cc +++ b/core/memory.cc @@ -56,9 +56,6 @@ namespace memory { -static constexpr const size_t page_bits = 12; -static constexpr const size_t page_size = 1 << page_bits; -static constexpr const size_t huge_page_size = 512 * page_size; static constexpr const unsigned cpu_id_shift = 36; // FIXME: make dynamic static constexpr const unsigned max_cpus = 256; static constexpr const size_t cache_line_size = 64; diff --git a/core/memory.hh b/core/memory.hh index 32744ad3fb..7c4aae9191 100644 --- a/core/memory.hh +++ b/core/memory.hh @@ -12,6 +12,11 @@ namespace memory { +// TODO: Use getpagesize() in order to learn a size of a system PAGE. +static constexpr const size_t page_bits = 12; +static constexpr const size_t page_size = 1 << page_bits; // 4K +static constexpr const size_t huge_page_size = 512 * page_size; // 2M + void configure(std::vector m, std::experimental::optional hugetlbfs_path = {}); From d4cddbc3d0851611cf75531b94102419ccf4cc66 Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Wed, 24 Dec 2014 20:36:01 +0200 Subject: [PATCH 2/6] DPDK: Use separate pools for Rx and Tx queues and adjust their sizes There is no reason for Rx and Tx pools to be of the same size: Rx pool is 3 times the ring size to give the upper layers some time to free the Rx buffers before the ring stalls with no buffers. Tx has absolutely different constraints: since it provides a back pressure to the upper layers if HW doesn't keep up there is no need to allow more buffers in the air than the amount we may send in a single rte_eth_tx_burst() call. Therefore we need 2 times HW ring size buffers since HW may release the whole ring of buffers in a single rte_eth_tx_burst() call and thus we may be able to place another whole ring of buffers in the same call. Signed-off-by: Vlad Zolotarov New in v4: - Fixed the info message. --- net/dpdk.cc | 59 ++++++++++++++++++++++++++++++----------------------- 1 file changed, 34 insertions(+), 25 deletions(-) diff --git a/net/dpdk.cc b/net/dpdk.cc index 0c17b2456b..d5167378bf 100644 --- a/net/dpdk.cc +++ b/net/dpdk.cc @@ -39,7 +39,16 @@ namespace dpdk { /******************* Net device related constatns *****************************/ -static constexpr uint16_t mbufs_per_queue = 1536; +static constexpr uint16_t default_ring_size = 512; + +static constexpr uint16_t mbufs_per_queue_rx = 3 * default_ring_size; + +// +// No need to keep more descriptors in the air than can be sent in a single +// rte_eth_tx_burst() call. +// +static constexpr uint16_t mbufs_per_queue_tx = 2 * default_ring_size; + static constexpr uint16_t mbuf_cache_size = 512; static constexpr uint16_t mbuf_overhead = sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM; @@ -50,9 +59,6 @@ static constexpr uint8_t max_frags = 32; static constexpr uint16_t mbuf_size = mbuf_data_size + mbuf_overhead; -static constexpr uint16_t default_rx_ring_size = 512; -static constexpr uint16_t default_tx_ring_size = 512; - #ifdef RTE_VERSION_1_7 /* * RX and TX Prefetch, Host, and Write-back threshold values should be @@ -245,7 +251,7 @@ private: private: dpdk_device* _dev; uint8_t _qid; - rte_mempool* _pktmbuf_pool; + rte_mempool *_pktmbuf_pool_rx, *_pktmbuf_pool_tx; reactor::poller _rx_poller; std::vector _tx_burst; uint16_t _tx_burst_idx = 0; @@ -400,24 +406,30 @@ void dpdk_device::init_port_fini() bool dpdk_qp::init_mbuf_pools() { - // Allocate the same amount of buffers for Rx and Tx. - const unsigned num_mbufs = 2 * mbufs_per_queue; sstring name = to_sstring(pktmbuf_pool_name) + to_sstring(_qid); /* don't pass single-producer/single-consumer flags to mbuf create as it * seems faster to use a cache instead */ - printf("Creating mbuf pool '%s' [%u mbufs] ...\n", name.c_str(), num_mbufs); + printf("Creating mbuf pools '%s_rx/_tx' [%u and %u mbufs respectively] ...\n", + name.c_str(), mbufs_per_queue_rx, mbufs_per_queue_tx); - // - // We currently allocate a one big mempool on the current CPU to fit all - // requested queues. - // TODO: Allocate a separate pool for each queue on the appropriate CPU. - // - _pktmbuf_pool = rte_mempool_create(name.c_str(), num_mbufs, - mbuf_size, mbuf_cache_size, - sizeof(struct rte_pktmbuf_pool_private), rte_pktmbuf_pool_init, - NULL, rte_pktmbuf_init, NULL, rte_socket_id(), 0); + _pktmbuf_pool_rx = + rte_mempool_create((name + to_sstring("_rx")).c_str(), + mbufs_per_queue_rx, + mbuf_size, mbuf_cache_size, + sizeof(struct rte_pktmbuf_pool_private), + rte_pktmbuf_pool_init, NULL, + rte_pktmbuf_init, NULL, + rte_socket_id(), 0); + _pktmbuf_pool_tx = + rte_mempool_create((name + to_sstring("_tx")).c_str(), + mbufs_per_queue_tx, + mbuf_size, mbuf_cache_size, + sizeof(struct rte_pktmbuf_pool_private), + rte_pktmbuf_pool_init, NULL, + rte_pktmbuf_init, NULL, + rte_socket_id(), 0); - return _pktmbuf_pool != NULL; + return _pktmbuf_pool_rx != NULL && _pktmbuf_pool_tx != NULL; } void dpdk_device::check_port_link_status() @@ -462,16 +474,13 @@ dpdk_qp::dpdk_qp(dpdk_device* dev, uint8_t qid) rte_exit(EXIT_FAILURE, "Cannot initialize mbuf pools\n"); } - const uint16_t rx_ring_size = default_rx_ring_size; - const uint16_t tx_ring_size = default_tx_ring_size; - - if (rte_eth_rx_queue_setup(_dev->port_idx(), _qid, rx_ring_size, + if (rte_eth_rx_queue_setup(_dev->port_idx(), _qid, default_ring_size, rte_eth_dev_socket_id(_dev->port_idx()), - _dev->def_rx_conf(), _pktmbuf_pool) < 0) { + _dev->def_rx_conf(), _pktmbuf_pool_rx) < 0) { rte_exit(EXIT_FAILURE, "Cannot initialize rx queue\n"); } - if (rte_eth_tx_queue_setup(_dev->port_idx(), _qid, tx_ring_size, + if (rte_eth_tx_queue_setup(_dev->port_idx(), _qid, default_ring_size, rte_eth_dev_socket_id(_dev->port_idx()), _dev->def_tx_conf()) < 0) { rte_exit(EXIT_FAILURE, "Cannot initialize tx queue\n"); } @@ -537,7 +546,7 @@ bool dpdk_qp::poll_rx_once() size_t dpdk_qp::copy_one_data_buf(rte_mbuf*& m, char* data, size_t l) { - m = rte_pktmbuf_alloc(_pktmbuf_pool); + m = rte_pktmbuf_alloc(_pktmbuf_pool_tx); if (!m) { return 0; } From 82e20564b0d3bf32fc0e521799db3ca3ea39b657 Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Tue, 27 Jan 2015 15:06:56 +0200 Subject: [PATCH 3/6] DPDK: Initialize mempools to work with external memory If seastar is configured to use hugetlbfs initialize mempools with external memory buffer. This way we are going to better control the overall memory consumption. Signed-off-by: Vlad Zolotarov New in v2: - Use char* instead of void* for pointer's arithmetics. --- net/dpdk.cc | 147 ++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 131 insertions(+), 16 deletions(-) diff --git a/net/dpdk.cc b/net/dpdk.cc index d5167378bf..7ee0bdb7ca 100644 --- a/net/dpdk.cc +++ b/net/dpdk.cc @@ -12,6 +12,7 @@ #include "core/circular_buffer.hh" #include "core/align.hh" #include "core/sstring.hh" +#include "core/memory.hh" #include "util/function_input_iterator.hh" #include "util/transform_iterator.hh" #include @@ -24,6 +25,7 @@ #include "toeplitz.hh" #include +#include #include #include @@ -195,16 +197,50 @@ public: class dpdk_qp : public net::qp { public: - explicit dpdk_qp(dpdk_device* dev, uint8_t qid); + explicit dpdk_qp(dpdk_device* dev, uint8_t qid, + bool huge_pages_mem_backend); virtual future<> send(packet p) override { abort(); } virtual uint32_t send(circular_buffer& p) override; + + virtual ~dpdk_qp() { + // TODO: Free all mempools + if (_rx_xmem) { + free(_rx_xmem); + } + + if (_tx_xmem) { + free(_tx_xmem); + } + } private: bool init_mbuf_pools(); + /** + * Allocates a memory chunk to accommodate the given number of buffers of + * the given size and fills a vector with underlying physical pages. + * + * The chunk is going to be used as an external memory buffer of the DPDK + * memory pool (created using rte_mempool_xmem_create()). + * + * The chunk size if calculated using rte_mempool_xmem_size() function. + * + * @param num_bufs Number of buffers (in) + * @param buf_sz Size of each buffer (in) + * @param mappings vector of physical pages (out) + * + * @note this function assumes that "mappings" is properly set and adds the + * mappings to the back of the vector. + * + * @return a virtual address of the allocated memory chunk or nullptr in + * case of a failure. + */ + void* alloc_mempool_xmem(uint16_t num_bufs, uint16_t buf_sz, + std::vector& mappings); + /** * Polls for a burst of incoming packets. This function will not block and * will immediately return after processing all available packets. @@ -250,8 +286,10 @@ private: rte_mbuf* create_tx_mbuf(packet& p); private: dpdk_device* _dev; + bool _huge_pages_mem_backend = false; uint8_t _qid; rte_mempool *_pktmbuf_pool_rx, *_pktmbuf_pool_tx; + void *_rx_xmem = nullptr, *_tx_xmem = nullptr; reactor::poller _rx_poller; std::vector _tx_burst; uint16_t _tx_burst_idx = 0; @@ -404,30 +442,107 @@ void dpdk_device::init_port_fini() printf("Created DPDK device\n"); } +void* dpdk_qp::alloc_mempool_xmem(uint16_t num_bufs, uint16_t buf_sz, + std::vector& mappings) +{ + using namespace memory; + char* xmem; + + size_t xmem_size = rte_mempool_xmem_size(num_bufs, buf_sz, page_bits); + + // Aligning to 2M causes the further failure in small allocations. + // TODO: Check why - and fix. + xmem = (char*)memalign(page_size, xmem_size); + if (!xmem) { + printf("Can't allocate %ld bytes aligned to %ld\n", + xmem_size, page_size); + return nullptr; + } + + for (size_t i = 0; i < xmem_size / page_size; ++i) { + translation tr = translate(xmem + i * page_size, page_size); + assert(tr.size); + mappings.push_back(tr.addr); + } + + return xmem; +} + bool dpdk_qp::init_mbuf_pools() { + using namespace memory; sstring name = to_sstring(pktmbuf_pool_name) + to_sstring(_qid); - /* don't pass single-producer/single-consumer flags to mbuf create as it - * seems faster to use a cache instead */ - printf("Creating mbuf pools '%s_rx/_tx' [%u and %u mbufs respectively] ...\n", + + printf("Creating mbuf pools '%s_rx/_tx' " + "[%u and %u mbufs respectively] ...\n", name.c_str(), mbufs_per_queue_rx, mbufs_per_queue_tx); - _pktmbuf_pool_rx = - rte_mempool_create((name + to_sstring("_rx")).c_str(), - mbufs_per_queue_rx, - mbuf_size, mbuf_cache_size, + // + // If we have a hugetlbfs memory backend we may perform a virt2phys + // translation and memory is "pinned". Therefore we may provide an external + // memory for DPDK pools and this way significantly reduce the memory needed + // for the DPDK in this case. + // + if (_huge_pages_mem_backend) { + std::vector mappings; + + _rx_xmem = alloc_mempool_xmem(mbufs_per_queue_rx, mbuf_size, mappings); + if (!_rx_xmem) { + printf("Can't allocate a memory for Rx buffers\n"); + return false; + } + + // + // Don't pass single-producer/single-consumer flags to mbuf create as it + // seems faster to use a cache instead. + // + _pktmbuf_pool_rx = + rte_mempool_xmem_create((name + to_sstring("_rx")).c_str(), + mbufs_per_queue_rx, mbuf_size, + mbuf_cache_size, + sizeof(struct rte_pktmbuf_pool_private), + rte_pktmbuf_pool_init, NULL, + rte_pktmbuf_init, NULL, + rte_socket_id(), 0, + _rx_xmem, mappings.data(), mappings.size(), + page_bits); + + mappings.clear(); + _tx_xmem = alloc_mempool_xmem(mbufs_per_queue_tx, mbuf_size, mappings); + if (!_tx_xmem) { + printf("Can't allocate a memory for Tx buffers\n"); + return false; + } + + _pktmbuf_pool_tx = + rte_mempool_xmem_create((name + to_sstring("_tx")).c_str(), + mbufs_per_queue_tx, mbuf_size, + mbuf_cache_size, sizeof(struct rte_pktmbuf_pool_private), rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL, - rte_socket_id(), 0); - _pktmbuf_pool_tx = + rte_socket_id(), 0, + _tx_xmem, mappings.data(), mappings.size(), + page_bits); + } else { + _pktmbuf_pool_rx = + rte_mempool_create((name + to_sstring("_rx")).c_str(), + mbufs_per_queue_rx, mbuf_size, + mbuf_cache_size, + sizeof(struct rte_pktmbuf_pool_private), + rte_pktmbuf_pool_init, NULL, + rte_pktmbuf_init, NULL, + rte_socket_id(), 0); + + _pktmbuf_pool_tx = rte_mempool_create((name + to_sstring("_tx")).c_str(), - mbufs_per_queue_tx, - mbuf_size, mbuf_cache_size, + mbufs_per_queue_tx, mbuf_size, + mbuf_cache_size, sizeof(struct rte_pktmbuf_pool_private), rte_pktmbuf_pool_init, NULL, rte_pktmbuf_init, NULL, rte_socket_id(), 0); + } return _pktmbuf_pool_rx != NULL && _pktmbuf_pool_tx != NULL; } @@ -466,9 +581,9 @@ void dpdk_device::check_port_link_status() t->arm_periodic(check_interval); } - -dpdk_qp::dpdk_qp(dpdk_device* dev, uint8_t qid) - : _dev(dev), _qid(qid), _rx_poller([&] { return poll_rx_once(); }) +dpdk_qp::dpdk_qp(dpdk_device* dev, uint8_t qid, bool huge_pages_mem_backend) + : _dev(dev), _huge_pages_mem_backend(huge_pages_mem_backend), _qid(qid), + _rx_poller([&] { return poll_rx_once(); }) { if (!init_mbuf_pools()) { rte_exit(EXIT_FAILURE, "Cannot initialize mbuf pools\n"); @@ -745,7 +860,7 @@ void dpdk_device::get_rss_table() #endif std::unique_ptr dpdk_device::init_local_queue(boost::program_options::variables_map opts, uint16_t qid) { - auto qp = std::make_unique(this, qid); + auto qp = std::make_unique(this, qid, opts.count("hugepages")); smp::submit_to(_home_cpu, [this] () mutable { if (++_queues_ready == _num_queues) { init_port_fini(); From 46b6644c35fade766eada9bf228862dc13f1a54b Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Mon, 26 Jan 2015 18:35:32 +0200 Subject: [PATCH 4/6] DPDK: add a function that returns a number of bytes needed for each QP's mempool objects This function is needed when we want to estimate a number of memory we want to give to DPDK when we can provide a mempool an external memory buffer. Signed-off-by: Vlad Zolotarov --- net/dpdk.cc | 24 ++++++++++++++++++++++++ net/dpdk.hh | 7 +++++++ 2 files changed, 31 insertions(+) diff --git a/net/dpdk.cc b/net/dpdk.cc index 7ee0bdb7ca..24af926d78 100644 --- a/net/dpdk.cc +++ b/net/dpdk.cc @@ -61,6 +61,30 @@ static constexpr uint8_t max_frags = 32; static constexpr uint16_t mbuf_size = mbuf_data_size + mbuf_overhead; +uint32_t qp_mempool_obj_size() +{ + uint32_t mp_size = 0; + struct rte_mempool_objsz mp_obj_sz = {}; + + // + // We will align each size to huge page size because DPDK allocates + // physically contiguous memory region for each pool object. + // + + // Rx + mp_size += align_up(rte_mempool_calc_obj_size(mbuf_size, 0, &mp_obj_sz) + + sizeof(struct rte_pktmbuf_pool_private), + memory::huge_page_size); + //Tx + std::memset(&mp_obj_sz, 0, sizeof(mp_obj_sz)); + mp_size += align_up(rte_mempool_calc_obj_size(mbuf_size, + MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET, + &mp_obj_sz) + + sizeof(struct rte_pktmbuf_pool_private), + memory::huge_page_size); + return mp_size; +} + #ifdef RTE_VERSION_1_7 /* * RX and TX Prefetch, Host, and Write-back threshold values should be diff --git a/net/dpdk.hh b/net/dpdk.hh index b625d8914c..3c762eebcc 100644 --- a/net/dpdk.hh +++ b/net/dpdk.hh @@ -17,6 +17,13 @@ std::unique_ptr create_dpdk_net_device( boost::program_options::options_description get_dpdk_net_options_description(); +namespace dpdk { +/** + * @return Number of bytes needed for mempool objects of each QP. + */ +uint32_t qp_mempool_obj_size(); +} + #endif // _SEASTAR_DPDK_DEV_H #endif // HAVE_DPDK From 4d0f2d3e4cb6143f34ae72e3bdd816cb3ef82569 Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Mon, 26 Jan 2015 18:39:31 +0200 Subject: [PATCH 5/6] DPDK_RTE: Give rte_eal_init() -m parameter when we use hugetlbfs When we use hugetlbfs we will give mempools external buffer for allocations but the mempool internals still need memory. We will assume that each CPU core is going to have a HW QP ("worst" case) and provide the DPDK with enough memory to be able to allocate them all. The memory above is subtracted from the total amount of memory given to the application (with -m seastar application parameter). Signed-off-by: Vlad Zolotarov --- core/dpdk_rte.cc | 30 ++++++++++++++++++++++++++++++ core/dpdk_rte.hh | 7 +++++++ core/reactor.cc | 20 ++++++++++++++++++++ 3 files changed, 57 insertions(+) diff --git a/core/dpdk_rte.cc b/core/dpdk_rte.cc index 3d98ae1dbe..169b61f2f2 100644 --- a/core/dpdk_rte.cc +++ b/core/dpdk_rte.cc @@ -1,5 +1,6 @@ #ifdef HAVE_DPDK +#include "net/dpdk.hh" #include "core/dpdk_rte.hh" #include "util/conversions.hh" #include @@ -35,6 +36,18 @@ void eal::init(cpuset cpus, boost::program_options::variables_map opts) if (hugepages_path) { args.push_back(string2vector("--huge-dir")); args.push_back(string2vector(hugepages_path.value())); + + // + // We don't know what is going to be our networking configuration so we + // assume there is going to be a queue per-CPU. Plus we'll give a DPDK + // 64MB for "other stuff". + // + size_t size_MB = mem_size(cpus.count()) >> 20; + std::stringstream size_MB_str; + size_MB_str << size_MB; + + args.push_back(string2vector("-m")); + args.push_back(string2vector(size_MB_str.str())); } else if (!opts.count("dpdk-pmd")) { args.push_back(string2vector("--no-huge")); } @@ -61,6 +74,23 @@ void eal::init(cpuset cpus, boost::program_options::variables_map opts) initialized = true; } +size_t eal::mem_size(int num_cpus) +{ + size_t memsize = 0; + // + // PMD mempool memory: + // + // We don't know what is going to be our networking configuration so we + // assume there is going to be a queue per-CPU. + // + memsize += num_cpus * qp_mempool_obj_size(); + + // Plus we'll give a DPDK 64MB for "other stuff". + memsize += (64UL << 20); + + return memsize; +} + } // namespace dpdk #endif // HAVE_DPDK diff --git a/core/dpdk_rte.hh b/core/dpdk_rte.hh index 8368df93a2..8ac70439c0 100644 --- a/core/dpdk_rte.hh +++ b/core/dpdk_rte.hh @@ -52,6 +52,13 @@ public: using cpuset = std::bitset; static void init(cpuset cpus, boost::program_options::variables_map opts); + /** + * Returns the amount of memory needed for DPDK + * @param num_cpus Number of CPUs the application is going to use + * + * @return + */ + static size_t mem_size(int num_cpus); static bool initialized; }; diff --git a/core/reactor.cc b/core/reactor.cc index 96643cbf3c..59a235e90d 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -1268,6 +1268,26 @@ void smp::configure(boost::program_options::variables_map configuration) resource::configuration rc; if (configuration.count("memory")) { rc.total_memory = parse_memory_size(configuration["memory"].as()); +#ifdef HAVE_DPDK + if (configuration.count("hugepages") && + !configuration["network-stack"].as().compare("native") && + configuration.count("dpdk-pmd")) { + size_t dpdk_memory = dpdk::eal::mem_size(smp::count); + + if (dpdk_memory >= rc.total_memory) { + std::cerr<<"Can't run with the given amount of memory: "; + std::cerr<(); + std::cerr<<". Consider giving more."<()); From 21f4c88c85c83a5be6d0a3db51e4439698135774 Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Tue, 3 Feb 2015 20:33:12 +0200 Subject: [PATCH 6/6] DPDK: zero_copy_tx - initial attempt Send packets without copying fragments data: - Poll all the Tx descriptors and place them into a circular_buffer. We will take them from there when we need to send new packets. - PMD will return the completed buffers descriptors to the Tx mempool. This way we are going to know that we may release the buffer. - "move" the packet object into the last segment's descriptor's private data. When this fragment is completed means the whole packet has been sent and its memory may be released. So, we will do it by calling the packet's destructor. Exceptions: - Copy if hugepages backend is not enabled. - Copy when we failed to send in a zero-copy flow (e.g. when we failed to translate a buffer virtual address). - Copy if first frag requires fragmentation below 128 bytes level - this is in order to avoid headers splitting. Signed-off-by: Vlad Zolotarov New in v5: - NULL -> nullptr across the board. - Removed unused macros: MBUF_ZC_PRIVATE() and max_frags_zc. - Improved the local variables localization according to Nadav's remarks. - tx_buf class: - Don't regress the whole packet to the copy-send if a single fragment failed to be sent in a zero-copy manner (e.g. its data failed the virt2phys translation). Send only such a fragment in a copy way and try to send the rest of the fragments in a zero-copy way. - Make set_packet() receive packet&&. - Fixed the comments in check_frag0(): we check first 128 bytes and not first 2KB. starting from v2. - Use assert() instead of rte_exit() in do_one_frag(). - Rename in set_one_data_buf() and in copy_one_data_buf(): l -> buf_len - Improve the assert about the size of private data in the tx_buf class: - Added two MARKER fields at the beginning and at the end of the private fields section which are going to be allocated on the mbuf's private data section. - Assert on the distance between these two markers. - Replace the sanity_check() (checks that packet doesn't have a zero-length) in a copy-flow by an assert() in a general function since this check is relevant both for a copy and for a zero-copy flows. - Make a sanity_check to be explicitly called frag0_check. - Make from_packet() receive packet&&. - In case frag0_check() fails - copy only the first fragment and not the whole packet. - tx_buf_factory class: - Change the interface to work with tx_buf* instead of tx_buf&. - Better utilize for-loop facilities in gc(). - Kill the extra if() in the init_factory(). - Use std::deque instead of circular_buffer for storing elements in tx_buf_factory. - Optimize the tx_buf_factory::get(): - First take the completed buffers from the mempool and only if there aren't any - take from the factory's cache. - Make Tx mempools using cache: this significantly improves the performance despite the fact that it's not the right mempool configuration for a single-producer+single-consumer mode. - Remove empty() and size() methods. - Add comments near the assert()s in the fast-path. - Removed the not-needed "inline" qualifiers: - There is no need to specify "inline" qualifier for in-class defined methods INCLUDING static methods. - Defining process_packets() and poll_rx_once() as inline degraded the performance by about 1.5%. - Added a _tx_gc_poller: it will call tx_buf_factory::gc(). - Don't check a pointer before calling free(). - alloc_mempool_xmem(): Use posix_memalign() instead of memalign(). New in v4: - Improve the info messages. - Simplified the mempool name creation code. - configure.py: Opt-out the invalid-offsetof compilation warning. New in v3: - Add missing macros definitions dropped in v2 by mistake. New in v2: - Use Tx mbufs in a LIFO way for better cache utilization. - Lower the frag0 non-split thresh to 128 bytes. - Use new (iterators) semantics in circular_buffer. - Use optional for storing the packing in the mbuf. - Use rte_pktmbuf_alloc() instead of __rte_mbuf_raw_alloc(). - Introduce tx_buf class: - Hide the private rte_mbuf area handling. - Hide packet to rte_mbuf cluster translation handling. - Introduce a "Tx buffers factory" class: - Hide the rte_mbuf flow details: mempool->circular_buffer->(PMD->)mempool - Templatization: - Make huge_pages_mem_backend a dpdk_qp class template parameter. - Unite the from_packet_xxx() code into a single template function. - Unite the translate_one_frag() and copy_one_frag() into a single template function. --- configure.py | 2 +- core/dpdk_rte.hh | 6 + net/dpdk.cc | 958 ++++++++++++++++++++++++++++++++++------------- 3 files changed, 708 insertions(+), 258 deletions(-) diff --git a/configure.py b/configure.py index 9214c4270d..2f5e8751de 100755 --- a/configure.py +++ b/configure.py @@ -227,7 +227,7 @@ if args.with_osv: if args.dpdk_target: args.user_cflags = (args.user_cflags + ' -DHAVE_DPDK -I' + - args.dpdk_target + '/include -Wno-error=literal-suffix -Wno-literal-suffix') + args.dpdk_target + '/include -Wno-error=literal-suffix -Wno-literal-suffix -Wno-invalid-offsetof') libs += (' -L' + args.dpdk_target + '/lib ' + '-Wl,--whole-archive -lrte_pmd_bond -lrte_pmd_vmxnet3_uio -lrte_pmd_virtio_uio -lrte_pmd_i40e -lrte_pmd_ixgbe -lrte_pmd_e1000 -lrte_pmd_ring -Wl,--no-whole-archive -lrte_distributor -lrte_kni -lrte_pipeline -lrte_table -lrte_port -lrte_timer -lrte_hash -lrte_lpm -lrte_power -lrte_acl -lrte_meter -lrte_sched -lrte_kvargs -lrte_mbuf -lrte_ip_frag -lethdev -lrte_eal -lrte_malloc -lrte_mempool -lrte_ring -lrte_cmdline -lrte_cfgfile -lrt -lm -ldl') diff --git a/core/dpdk_rte.hh b/core/dpdk_rte.hh index 8ac70439c0..942e3255dc 100644 --- a/core/dpdk_rte.hh +++ b/core/dpdk_rte.hh @@ -28,6 +28,9 @@ #define rte_mbuf_nb_segs(m) ((m)->pkt.nb_segs) #define rte_mbuf_l2_len(m) ((m)->pkt.vlan_macip.f.l2_len) #define rte_mbuf_l3_len(m) ((m)->pkt.vlan_macip.f.l3_len) +#define rte_mbuf_buf_addr(m) ((m)->pkt.buf_addr) +#define rte_mbuf_buf_physaddr(m) ((m)->pkt.buf_physaddr) +#define rte_mbuf_data_off(m) ((m)->pkt.data_off) #else @@ -39,6 +42,9 @@ #define rte_mbuf_nb_segs(m) ((m)->nb_segs) #define rte_mbuf_l2_len(m) ((m)->l2_len) #define rte_mbuf_l3_len(m) ((m)->l3_len) +#define rte_mbuf_buf_addr(m) ((m)->buf_addr) +#define rte_mbuf_buf_physaddr(m) ((m)->buf_physaddr) +#define rte_mbuf_data_off(m) ((m)->data_off) #endif diff --git a/net/dpdk.cc b/net/dpdk.cc index 24af926d78..9de50ff0f9 100644 --- a/net/dpdk.cc +++ b/net/dpdk.cc @@ -18,6 +18,7 @@ #include #include #include +#include #include "ip.hh" #include "const.hh" #include "core/dpdk_rte.hh" @@ -40,7 +41,6 @@ using namespace net; namespace dpdk { /******************* Net device related constatns *****************************/ - static constexpr uint16_t default_ring_size = 512; static constexpr uint16_t mbufs_per_queue_rx = 3 * default_ring_size; @@ -56,10 +56,11 @@ static constexpr uint16_t mbuf_overhead = sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM; static constexpr size_t mbuf_data_size = 2048; -// MBUF_DATA_SIZE(2K) * 32 = 64K = Max TSO/LRO size -static constexpr uint8_t max_frags = 32; +// (MBUF_DATA_SIZE(2K) * 32 = 64K = Max TSO/LRO size) + 1 mbuf for headers +static constexpr uint8_t max_frags = 32 + 1; -static constexpr uint16_t mbuf_size = mbuf_data_size + mbuf_overhead; +static constexpr uint16_t mbuf_size = + mbuf_data_size + mbuf_overhead; uint32_t qp_mempool_obj_size() { @@ -77,9 +78,7 @@ uint32_t qp_mempool_obj_size() memory::huge_page_size); //Tx std::memset(&mp_obj_sz, 0, sizeof(mp_obj_sz)); - mp_size += align_up(rte_mempool_calc_obj_size(mbuf_size, - MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET, - &mp_obj_sz) + + mp_size += align_up(rte_mempool_calc_obj_size(mbuf_size, 0, &mp_obj_sz) + sizeof(struct rte_pktmbuf_pool_private), memory::huge_page_size); return mp_size; @@ -219,29 +218,665 @@ public: uint8_t port_idx() { return _port_idx; } }; +template class dpdk_qp : public net::qp { + class tx_buf_factory; + + class tx_buf { + friend class dpdk_qp; + public: + static tx_buf* me(rte_mbuf* mbuf) { + return reinterpret_cast(mbuf); + } + + /** + * Creates a tx_buf cluster representing a given packet in a "zero-copy" + * way. + * + * @param p packet to translate + * @param dev Parent dpdk_device + * @param fc Buffers' factory to use + * + * @return the HEAD tx_buf of the cluster or nullptr in case of a + * failure + */ + static tx_buf* from_packet_zc( + packet&& p, dpdk_device& dev, tx_buf_factory& fc) { + + return from_packet(check_frag0, translate_one_frag, copy_one_frag, + [](packet&& _p, tx_buf& _last_seg) { + _last_seg.set_packet(std::move(_p)); + }, std::move(p), dev, fc); + } + + /** + * Creates a tx_buf cluster representing a given packet in a "copy" way. + * + * @param p packet to translate + * @param dev Parent dpdk_device + * @param fc Buffers' factory to use + * + * @return the HEAD tx_buf of the cluster or nullptr in case of a + * failure + */ + static tx_buf* from_packet_copy( + packet&& p, dpdk_device& dev, tx_buf_factory& fc) { + + return from_packet([](packet& _p) { return true; }, + copy_one_frag, copy_one_frag, + [](packet&& _p, tx_buf& _last_seg) {}, + std::move(p), dev, fc); + } + private: + /** + * Creates a tx_buf cluster representing a given packet using provided + * functors. + * + * @param sanity Functor that performs a packet's sanity checks + * @param do_one_frag Functor that handles a single frag translation + * @param fin Functor that performs a cluster finalization + * @param p packet to translate + * @param dev Parent dpdk_device object + * @param fc Buffers' factory to use + * + * @return the HEAD tx_buf of the cluster or nullptr in case of a + * failure + */ + template + static tx_buf* from_packet( + FirstFragCheck frag0_check, TrOneFunc do_one_frag, + CopyOneFunc copy_one_frag, FinalizeFunc fin, + packet&& p, dpdk_device& dev, tx_buf_factory& fc) { + + // Too fragmented - linearize + if (p.nr_frags() > max_frags) { + p.linearize(); + } + + rte_mbuf *head = nullptr, *last_seg = nullptr; + unsigned nsegs = 0; + + // + // Create a HEAD of the fragmented packet: check if frag0 has to be + // copied and if yes - send it in a copy way + // + if (!frag0_check(p)) { + if (!copy_one_frag(fc, p.frag(0), head, last_seg, nsegs)) { + return nullptr; + } + } else if (!do_one_frag(fc, p.frag(0), head, last_seg, nsegs)) { + return nullptr; + } + + unsigned total_nsegs = nsegs; + + for (unsigned i = 1; i < p.nr_frags(); i++) { + rte_mbuf *h = nullptr, *new_last_seg = nullptr; + if (!do_one_frag(fc, p.frag(i), h, new_last_seg, nsegs)) { + me(head)->recycle(); + return nullptr; + } + + total_nsegs += nsegs; + + // Attach a new buffers' chain to the packet chain + rte_mbuf_next(last_seg) = h; + last_seg = new_last_seg; + } + + // Update the HEAD buffer with the packet info + rte_mbuf_pkt_len(head) = p.len(); + rte_mbuf_nb_segs(head) = total_nsegs; + + // Handle TCP checksum offload + auto oi = p.offload_info(); + if (oi.needs_ip_csum) { + head->ol_flags |= PKT_TX_IP_CKSUM; + rte_mbuf_l2_len(head) = sizeof(struct ether_hdr); + rte_mbuf_l3_len(head) = oi.ip_hdr_len; + } + if (dev.hw_features().tx_csum_l4_offload) { + if (oi.protocol == ip_protocol_num::tcp) { + head->ol_flags |= PKT_TX_TCP_CKSUM; + rte_mbuf_l2_len(head) = sizeof(struct ether_hdr); + rte_mbuf_l3_len(head) = oi.ip_hdr_len; + } else if (oi.protocol == ip_protocol_num::udp) { + head->ol_flags |= PKT_TX_UDP_CKSUM; + rte_mbuf_l2_len(head) = sizeof(struct ether_hdr); + rte_mbuf_l3_len(head) = oi.ip_hdr_len; + } + } + + fin(std::move(p), *me(last_seg)); + + return me(head); + } + + /** + * Zero-copy handling of a single net::fragment. + * + * @param do_one_buf Functor responsible for a single rte_mbuf + * handling + * @param fc Buffers' factory to allocate the tx_buf from (in) + * @param frag Fragment to copy (in) + * @param head Head of the cluster (out) + * @param last_seg Last segment of the cluster (out) + * @param nsegs Number of segments in the cluster (out) + * + * @return TRUE in case of success + */ + template + static bool do_one_frag(DoOneBufFunc do_one_buf, tx_buf_factory& fc, + fragment& frag, rte_mbuf*& head, + rte_mbuf*& last_seg, unsigned& nsegs) { + // + // TODO: Optimize the small fragments case: merge them into a + // single mbuf. + // + + size_t len, left_to_set = frag.size; + char* base = frag.base; + + rte_mbuf* m; + + // TODO: assert() in a fast path! Remove me ASAP! + assert(frag.size); + + // Create a HEAD of mbufs' cluster and set the first bytes into it + len = do_one_buf(fc, head, base, left_to_set); + if (!len) { + return false; + } + + left_to_set -= len; + base += len; + nsegs = 1; + + // + // Set the rest of the data into the new mbufs and chain them to + // the cluster. + // + rte_mbuf* prev_seg = head; + while (left_to_set) { + len = do_one_buf(fc, m, base, left_to_set); + if (!len) { + me(head)->recycle(); + return false; + } + + left_to_set -= len; + base += len; + nsegs++; + + rte_mbuf_next(prev_seg) = m; + prev_seg = m; + } + + // Return the last mbuf in the cluster + last_seg = prev_seg; + + return true; + } + + /** + * Zero-copy handling of a single net::fragment. + * + * @param fc Buffers' factory to allocate the tx_buf from (in) + * @param frag Fragment to copy (in) + * @param head Head of the cluster (out) + * @param last_seg Last segment of the cluster (out) + * @param nsegs Number of segments in the cluster (out) + * + * @return TRUE in case of success + */ + static bool translate_one_frag(tx_buf_factory& fc, fragment& frag, + rte_mbuf*& head, rte_mbuf*& last_seg, + unsigned& nsegs) { + return do_one_frag(set_one_data_buf, fc, frag, head, + last_seg, nsegs); + } + + /** + * Copies one net::fragment into the cluster of rte_mbuf's. + * + * @param fc Buffers' factory to allocate the tx_buf from (in) + * @param frag Fragment to copy (in) + * @param head Head of the cluster (out) + * @param last_seg Last segment of the cluster (out) + * @param nsegs Number of segments in the cluster (out) + * + * We return the "last_seg" to avoid traversing the cluster in order to get + * it. + * + * @return TRUE in case of success + */ + static bool copy_one_frag(tx_buf_factory& fc, fragment& frag, + rte_mbuf*& head, rte_mbuf*& last_seg, + unsigned& nsegs) { + return do_one_frag(copy_one_data_buf, fc, frag, head, + last_seg, nsegs); + } + + /** + * Allocates a single rte_mbuf and sets it to point to a given data + * buffer. + * + * @param fc Buffers' factory to allocate the tx_buf from (in) + * @param m New allocated rte_mbuf (out) + * @param va virtual address of a data buffer (in) + * @param buf_len length of the data to copy (in) + * + * @return The actual number of bytes that has been set in the mbuf + */ + static size_t set_one_data_buf( + tx_buf_factory& fc, rte_mbuf*& m, char* va, size_t buf_len) { + + using namespace memory; + translation tr = translate(va, buf_len); + + // + // Currently we break a buffer on a 4K boundary for simplicity. + // + // TODO: Optimize it to better utilize the physically continuity of the + // buffer. Note to take into an account a HW limitation for a maximum data + // size per single descriptor (e.g. 15.5K for 82599 devices). + // + phys_addr_t pa = tr.addr; + + if (!tr.size) { + return copy_one_data_buf(fc, m, va, buf_len); + } + + tx_buf* buf = fc.get(); + if (!buf) { + return 0; + } + + size_t page_offset = pa & ~page_mask; + size_t len = std::min(page_size - page_offset, buf_len); + + buf->set_zc_info(va, pa, len); + m = buf->rte_mbuf_p(); + + return len; + } + + /** + * Allocates a single rte_mbuf and copies a given data into it. + * + * @param fc Buffers' factory to allocate the tx_buf from (in) + * @param m New allocated rte_mbuf (out) + * @param data Data to copy from (in) + * @param buf_len length of the data to copy (in) + * + * @return The actual number of bytes that has been copied + */ + static size_t copy_one_data_buf( + tx_buf_factory& fc, rte_mbuf*& m, char* data, size_t buf_len) + { + tx_buf* buf = fc.get(); + if (!buf) { + return 0; + } + + size_t len = std::min(buf_len, mbuf_data_size); + + m = buf->rte_mbuf_p(); + + // mbuf_put() + rte_mbuf_data_len(m) = len; + rte_mbuf_pkt_len(m) = len; + + + rte_memcpy(rte_pktmbuf_mtod(m, void*), data, len); + + return len; + } + + /** + * Checks if the first fragment of the given packet satisfies the + * zero-copy flow requirement: its first 128 bytes should not cross the + * 4K page boundary. This is required in order to avoid splitting packet + * headers. + * + * @param p packet to check + * + * @return TRUE if packet is ok and FALSE otherwise. + */ + static bool check_frag0(packet& p) + { + using namespace memory; + // + // First frag is special - it has headers that should not be split. + // If the addressing is such that the first fragment has to be + // split, then send this packet in a (non-zero) copy flow. We'll + // check if the first 128 bytes of the first fragment reside in the + // same page. If that's the case - we are good to go. + // + + uint64_t base = (uint64_t)p.frag(0).base; + uint64_t frag0_page0_len = page_size - (base & ~page_mask); + + if (frag0_page0_len < 128 && + frag0_page0_len < p.frag(0).size) { + return false; + } + + return true; + } + + public: + tx_buf(tx_buf_factory& fc) : _fc(fc) { + + _buf_physaddr = _mbuf.buf_physaddr; + _buf_len = _mbuf.buf_len; + _data_off = _mbuf.data_off; + } + + rte_mbuf* rte_mbuf_p() { return &_mbuf; } + + void set_zc_info(void* va, phys_addr_t pa, size_t len) { + // mbuf_put() + rte_mbuf_data_len(&_mbuf) = len; + rte_mbuf_pkt_len(&_mbuf) = len; + + // Set the mbuf to point to our data + rte_mbuf_buf_addr(&_mbuf) = va; + rte_mbuf_buf_physaddr(&_mbuf) = pa; + rte_mbuf_data_off(&_mbuf) = 0; + _is_zc = true; + } + + void reset_zc() { + + // + // If this mbuf was the last in a cluster and contains an + // original packet object then call the destructor of the + // original packet object. + // + if (_p) { + // + // Reset the std::optional. This in particular is going + // to call the "packet"'s destructor and reset the + // "optional" state to "nonengaged". + // + _p = std::experimental::nullopt; + + } else if (!_is_zc) { + return; + } + + // Restore the rte_mbuf fields we trashed in set_zc_info() + _mbuf.buf_physaddr = _buf_physaddr; + _mbuf.buf_addr = RTE_MBUF_TO_BADDR(&_mbuf); + _mbuf.buf_len = _buf_len; + _mbuf.data_off = _data_off; + + _is_zc = false; + } + + void recycle() { + struct rte_mbuf *m = &_mbuf, *m_next; + + while (m != nullptr) { + m_next = m->next; + // + // Zero only "next" field since we want to save the dirtying of + // the extra cache line. + // There is no need to reset the pkt_len or data_len fields and + // the rest of the fields that are set in the HEAD mbuf of the + // cluster are going to be cleared when the buffer is pooled + // from the mempool and not in this flow. + // + m->next = nullptr; + _fc.put(me(m)); + m = m_next; + } + } + + void set_packet(packet&& p) { + _p = std::move(p); + } + + private: + struct rte_mbuf _mbuf; + MARKER private_start; + std::experimental::optional _p; + phys_addr_t _buf_physaddr; + uint32_t _buf_len; + uint16_t _data_off; + // TRUE if underlying mbuf has been used in the zero-copy flow + bool _is_zc = false; + // buffers' factory the buffer came from + tx_buf_factory& _fc; + MARKER private_end; + }; + + class tx_buf_factory { + // + // Number of buffers to free in each GC iteration: + // We want the buffers to be allocated from the mempool as many as + // possible. + // + // On the other hand if there is no Tx for some time we want the + // completions to be eventually handled. Thus we choose the smallest + // possible packets count number here. + // + static constexpr int gc_count = 1; + public: + tx_buf_factory(uint8_t qid) { + using namespace memory; + + sstring name = sstring(pktmbuf_pool_name) + to_sstring(qid) + "_tx"; + printf("Creating Tx mbuf pool '%s' [%u mbufs] ...\n", + name.c_str(), mbufs_per_queue_tx); + + if (HugetlbfsMemBackend) { + std::vector mappings; + + _xmem = dpdk_qp::alloc_mempool_xmem(mbufs_per_queue_tx, + mbuf_size, mappings); + if (!_xmem) { + printf("Can't allocate a memory for Tx buffers\n"); + exit(1); + } + + // + // We are going to push the buffers from the mempool into + // the circular_buffer and then poll them from there anyway, so + // we prefer to make a mempool non-atomic in this case. + // + _pool = + rte_mempool_xmem_create(name.c_str(), + mbufs_per_queue_tx, mbuf_size, + mbuf_cache_size, + sizeof(struct rte_pktmbuf_pool_private), + rte_pktmbuf_pool_init, nullptr, + rte_pktmbuf_init, nullptr, + rte_socket_id(), 0, + _xmem, mappings.data(), + mappings.size(), page_bits); + + } else { + _pool = + rte_mempool_create(name.c_str(), + mbufs_per_queue_tx, mbuf_size, + mbuf_cache_size, + sizeof(struct rte_pktmbuf_pool_private), + rte_pktmbuf_pool_init, nullptr, + rte_pktmbuf_init, nullptr, + rte_socket_id(), 0); + } + + if (!_pool) { + printf("Failed to create mempool for Tx\n"); + exit(1); + } + + // + // Fill the factory with the buffers from the mempool allocated + // above. + // + init_factory(); + } + + ~tx_buf_factory() { + // WTF: Hmmm... There is no way to destroy the mempool! + + free(_xmem); + } + + /** + * @note Should not be called if there are no free tx_buf's + * + * @return a free tx_buf object + */ + tx_buf* get() { + // Take completed from the HW first + tx_buf *pkt = get_one_completed(); + if (pkt) { + if (HugetlbfsMemBackend) { + pkt->reset_zc(); + } + + return pkt; + } + + // + // If there are no completed at the moment - take from the + // factory's cache. + // + if (_ring.empty()) { + return nullptr; + } + + pkt = _ring.front(); + _ring.pop_front(); + + return pkt; + } + + void put(tx_buf* buf) { + if (HugetlbfsMemBackend) { + buf->reset_zc(); + } + _ring.push_front(buf); + } + + bool gc() { + for (int cnt = 0; cnt < gc_count; ++cnt) { + auto tx_buf_p = get_one_completed(); + if (!tx_buf_p) { + return false; + } + + put(tx_buf_p); + } + + return true; + } + private: + /** + * Fill the mbufs circular buffer: after this the _pool will become + * empty. We will use it to catch the completed buffers: + * + * - Underlying PMD drivers will "free" the mbufs once they are + * completed. + * - We will poll the _pktmbuf_pool_tx till it's empty and release + * all the buffers from the freed mbufs. + */ + void init_factory() { + while (rte_mbuf* mbuf = rte_pktmbuf_alloc(_pool)) { + _ring.push_back(new(tx_buf::me(mbuf)) tx_buf{*this}); + } + } + + /** + * PMD puts the completed buffers back into the mempool they have + * originally come from. + * + * @note rte_pktmbuf_alloc() resets the mbuf so there is no need to call + * rte_pktmbuf_reset() here again. + * + * @return a single tx_buf that has been completed by HW. + */ + tx_buf* get_one_completed() { + return tx_buf::me(rte_pktmbuf_alloc(_pool)); + } + + private: + std::deque _ring; + rte_mempool* _pool = nullptr; + void* _xmem = nullptr; + }; + public: - explicit dpdk_qp(dpdk_device* dev, uint8_t qid, - bool huge_pages_mem_backend); + explicit dpdk_qp(dpdk_device* dev, uint8_t qid); virtual future<> send(packet p) override { abort(); } - virtual uint32_t send(circular_buffer& p) override; virtual ~dpdk_qp() { // TODO: Free all mempools if (_rx_xmem) { free(_rx_xmem); } + } - if (_tx_xmem) { - free(_tx_xmem); + virtual uint32_t send(circular_buffer& pb) override { + if (HugetlbfsMemBackend) { + // Zero-copy send + return _send(pb, [&] (packet&& p) { + return tx_buf::from_packet_zc( + std::move(p), *_dev, _tx_buf_factory); + }); + } else { + // "Copy"-send + return _send(pb, [&](packet&& p) { + return tx_buf::from_packet_copy( + std::move(p), *_dev, _tx_buf_factory); + }); } } private: - bool init_mbuf_pools(); + template + uint32_t _send(circular_buffer& pb, Func packet_to_tx_buf_p) { + if (_tx_burst.size() == 0) { + for (auto&& p : pb) { + // TODO: assert() in a fast path! Remove me ASAP! + assert(p.len()); + + tx_buf* buf = packet_to_tx_buf_p(std::move(p)); + if (!buf) { + break; + } + + _tx_burst.push_back(buf->rte_mbuf_p()); + } + } + + uint16_t sent = rte_eth_tx_burst(_dev->port_idx(), _qid, + _tx_burst.data() + _tx_burst_idx, + _tx_burst.size() - _tx_burst_idx); + + for (int i = 0; i < sent; i++) { + pb.pop_front(); + } + + _tx_burst_idx += sent; + + if (_tx_burst_idx == _tx_burst.size()) { + _tx_burst_idx = 0; + _tx_burst.clear(); + } + + return sent; + } + + bool init_rx_mbuf_pool(); /** * Allocates a memory chunk to accommodate the given number of buffers of @@ -262,8 +897,8 @@ private: * @return a virtual address of the allocated memory chunk or nullptr in * case of a failure. */ - void* alloc_mempool_xmem(uint16_t num_bufs, uint16_t buf_sz, - std::vector& mappings); + static void* alloc_mempool_xmem(uint16_t num_bufs, uint16_t buf_sz, + std::vector& mappings); /** * Polls for a burst of incoming packets. This function will not block and @@ -280,43 +915,17 @@ private: */ void process_packets(struct rte_mbuf **bufs, uint16_t count); - /** - * Copies one net::fragment into the cluster of rte_mbuf's. - * - * @param frag Fragment to copy (in) - * @param head Head of the cluster (out) - * @param last_seg Last segment of the cluster (out) - * @param nsegs Number of segments in the cluster (out) - * - * We return the "last_seg" to avoid traversing the cluster in order to get - * it. - * - * @return TRUE in case of success - */ - bool copy_one_frag(fragment& frag, rte_mbuf*& head, rte_mbuf*& last_seg, - unsigned& nsegs); - - /** - * Allocates a single rte_mbuf and copies a given data into it. - * - * @param m New allocated rte_mbuf (out) - * @param data Data to copy from (in) - * @param l length of the data to copy (in) - * - * @return The actual number of bytes that has been copied - */ - size_t copy_one_data_buf(rte_mbuf*& m, char* data, size_t l); - - rte_mbuf* create_tx_mbuf(packet& p); private: dpdk_device* _dev; - bool _huge_pages_mem_backend = false; uint8_t _qid; - rte_mempool *_pktmbuf_pool_rx, *_pktmbuf_pool_tx; - void *_rx_xmem = nullptr, *_tx_xmem = nullptr; + rte_mempool *_pktmbuf_pool_rx; + void *_rx_xmem = nullptr; + tx_buf_factory _tx_buf_factory; reactor::poller _rx_poller; + reactor::poller _tx_gc_poller; std::vector _tx_burst; uint16_t _tx_burst_idx = 0; + static constexpr phys_addr_t page_mask = ~(memory::page_size - 1); }; int dpdk_device::init_port_start() @@ -466,8 +1075,9 @@ void dpdk_device::init_port_fini() printf("Created DPDK device\n"); } -void* dpdk_qp::alloc_mempool_xmem(uint16_t num_bufs, uint16_t buf_sz, - std::vector& mappings) +template +void* dpdk_qp::alloc_mempool_xmem( + uint16_t num_bufs, uint16_t buf_sz, std::vector& mappings) { using namespace memory; char* xmem; @@ -476,8 +1086,7 @@ void* dpdk_qp::alloc_mempool_xmem(uint16_t num_bufs, uint16_t buf_sz, // Aligning to 2M causes the further failure in small allocations. // TODO: Check why - and fix. - xmem = (char*)memalign(page_size, xmem_size); - if (!xmem) { + if (posix_memalign((void**)&xmem, page_size, xmem_size)) { printf("Can't allocate %ld bytes aligned to %ld\n", xmem_size, page_size); return nullptr; @@ -492,14 +1101,14 @@ void* dpdk_qp::alloc_mempool_xmem(uint16_t num_bufs, uint16_t buf_sz, return xmem; } -bool dpdk_qp::init_mbuf_pools() +template +bool dpdk_qp::init_rx_mbuf_pool() { using namespace memory; - sstring name = to_sstring(pktmbuf_pool_name) + to_sstring(_qid); + sstring name = sstring(pktmbuf_pool_name) + to_sstring(_qid) + "_rx"; - printf("Creating mbuf pools '%s_rx/_tx' " - "[%u and %u mbufs respectively] ...\n", - name.c_str(), mbufs_per_queue_rx, mbufs_per_queue_tx); + printf("Creating Rx mbuf pool '%s' [%u mbufs] ...\n", + name.c_str(), mbufs_per_queue_rx); // // If we have a hugetlbfs memory backend we may perform a virt2phys @@ -507,7 +1116,7 @@ bool dpdk_qp::init_mbuf_pools() // memory for DPDK pools and this way significantly reduce the memory needed // for the DPDK in this case. // - if (_huge_pages_mem_backend) { + if (HugetlbfsMemBackend) { std::vector mappings; _rx_xmem = alloc_mempool_xmem(mbufs_per_queue_rx, mbuf_size, mappings); @@ -521,54 +1130,27 @@ bool dpdk_qp::init_mbuf_pools() // seems faster to use a cache instead. // _pktmbuf_pool_rx = - rte_mempool_xmem_create((name + to_sstring("_rx")).c_str(), + rte_mempool_xmem_create(name.c_str(), mbufs_per_queue_rx, mbuf_size, mbuf_cache_size, sizeof(struct rte_pktmbuf_pool_private), - rte_pktmbuf_pool_init, NULL, - rte_pktmbuf_init, NULL, + rte_pktmbuf_pool_init, nullptr, + rte_pktmbuf_init, nullptr, rte_socket_id(), 0, _rx_xmem, mappings.data(), mappings.size(), page_bits); - - mappings.clear(); - _tx_xmem = alloc_mempool_xmem(mbufs_per_queue_tx, mbuf_size, mappings); - if (!_tx_xmem) { - printf("Can't allocate a memory for Tx buffers\n"); - return false; - } - - _pktmbuf_pool_tx = - rte_mempool_xmem_create((name + to_sstring("_tx")).c_str(), - mbufs_per_queue_tx, mbuf_size, - mbuf_cache_size, - sizeof(struct rte_pktmbuf_pool_private), - rte_pktmbuf_pool_init, NULL, - rte_pktmbuf_init, NULL, - rte_socket_id(), 0, - _tx_xmem, mappings.data(), mappings.size(), - page_bits); } else { _pktmbuf_pool_rx = - rte_mempool_create((name + to_sstring("_rx")).c_str(), - mbufs_per_queue_rx, mbuf_size, - mbuf_cache_size, - sizeof(struct rte_pktmbuf_pool_private), - rte_pktmbuf_pool_init, NULL, - rte_pktmbuf_init, NULL, - rte_socket_id(), 0); - - _pktmbuf_pool_tx = - rte_mempool_create((name + to_sstring("_tx")).c_str(), - mbufs_per_queue_tx, mbuf_size, + rte_mempool_create(name.c_str(), + mbufs_per_queue_rx, mbuf_size, mbuf_cache_size, sizeof(struct rte_pktmbuf_pool_private), - rte_pktmbuf_pool_init, NULL, - rte_pktmbuf_init, NULL, + rte_pktmbuf_pool_init, nullptr, + rte_pktmbuf_init, nullptr, rte_socket_id(), 0); } - return _pktmbuf_pool_rx != NULL && _pktmbuf_pool_tx != NULL; + return _pktmbuf_pool_rx != nullptr; } void dpdk_device::check_port_link_status() @@ -605,14 +1187,25 @@ void dpdk_device::check_port_link_status() t->arm_periodic(check_interval); } -dpdk_qp::dpdk_qp(dpdk_device* dev, uint8_t qid, bool huge_pages_mem_backend) - : _dev(dev), _huge_pages_mem_backend(huge_pages_mem_backend), _qid(qid), - _rx_poller([&] { return poll_rx_once(); }) +template +dpdk_qp::dpdk_qp(dpdk_device* dev, uint8_t qid) + : _dev(dev), _qid(qid), + _tx_buf_factory(qid), + _rx_poller([&] { return poll_rx_once(); }), + _tx_gc_poller([&] { return _tx_buf_factory.gc(); }) { - if (!init_mbuf_pools()) { + if (!init_rx_mbuf_pool()) { rte_exit(EXIT_FAILURE, "Cannot initialize mbuf pools\n"); } + static_assert(offsetof(class tx_buf, private_end) - + offsetof(class tx_buf, private_start) <= RTE_PKTMBUF_HEADROOM, + "RTE_PKTMBUF_HEADROOM is less than dpdk_qp::tx_buf size! " + "Increase the headroom size in the DPDK configuration"); + static_assert(offsetof(class tx_buf, _mbuf) == 0, + "There is a pad at the beginning of the tx_buf before _mbuf " + "field!"); + if (rte_eth_rx_queue_setup(_dev->port_idx(), _qid, default_ring_size, rte_eth_dev_socket_id(_dev->port_idx()), _dev->def_rx_conf(), _pktmbuf_pool_rx) < 0) { @@ -625,7 +1218,9 @@ dpdk_qp::dpdk_qp(dpdk_device* dev, uint8_t qid, bool huge_pages_mem_backend) } } -void dpdk_qp::process_packets(struct rte_mbuf **bufs, uint16_t count) +template +void dpdk_qp::process_packets( + struct rte_mbuf **bufs, uint16_t count) { update_rx_count(count); for (uint16_t i = 0; i < count; i++) { @@ -667,7 +1262,8 @@ void dpdk_qp::process_packets(struct rte_mbuf **bufs, uint16_t count) } } -bool dpdk_qp::poll_rx_once() +template +bool dpdk_qp::poll_rx_once() { struct rte_mbuf *buf[packet_read_size]; @@ -683,165 +1279,6 @@ bool dpdk_qp::poll_rx_once() return rx_count; } -size_t dpdk_qp::copy_one_data_buf(rte_mbuf*& m, char* data, size_t l) -{ - m = rte_pktmbuf_alloc(_pktmbuf_pool_tx); - if (!m) { - return 0; - } - - size_t len = std::min(l, mbuf_data_size); - - // mbuf_put() - rte_mbuf_data_len(m) += len; - rte_mbuf_pkt_len(m) += len; - - rte_memcpy(rte_pktmbuf_mtod(m, void*), data, len); - - return len; -} - - -bool dpdk_qp::copy_one_frag(fragment& frag, rte_mbuf*& head, - rte_mbuf*& last_seg, unsigned& nsegs) -{ - size_t len, left_to_copy = frag.size; - char* base = frag.base; - rte_mbuf* m; - - if (!frag.size) { - rte_exit(EXIT_FAILURE, "DPDK Tx: Zero-size fragment"); - } - - // Create a HEAD of mbufs' cluster and copy the first bytes into it - len = copy_one_data_buf(head, base, left_to_copy); - if (!len) { - return false; - } - - left_to_copy -= len; - base += len; - nsegs = 1; - - // Copy the rest of the data into the new mbufs and chain them to the - // cluster - rte_mbuf* prev_seg = head; - while (left_to_copy) { - len = copy_one_data_buf(m, base, left_to_copy); - if (!len) { - rte_pktmbuf_free(head); - return false; - } - - left_to_copy -= len; - base += len; - nsegs++; - - rte_mbuf_next(prev_seg) = m; - prev_seg = m; - } - - // Return the last mbuf in the cluster - last_seg = prev_seg; - - return true; -} - -rte_mbuf* dpdk_qp::create_tx_mbuf(packet& p) { - // sanity - if (!p.len()) { - return nullptr; - } - - // Too fragmented - linearize - if (p.nr_frags() > max_frags) { - p.linearize(); - } - - /* TODO: configure the offload features here if any */ - - // - // We will copy the data for now and will implement a zero-copy in the - // future. - - rte_mbuf *head = nullptr, *last_seg = NULL; - unsigned total_nsegs = 0, nsegs = 0; - - // Create a HEAD of the fragmented packet - if (!copy_one_frag(p.frag(0), head, last_seg, nsegs)) { - // Drop if we failed to allocate new mbuf - return nullptr; - } - - total_nsegs += nsegs; - - for (unsigned i = 1; i < p.nr_frags(); i++) { - - rte_mbuf *h = NULL, *new_last_seg = NULL; - if (!copy_one_frag(p.frag(i), h, new_last_seg, nsegs)) { - rte_pktmbuf_free(head); - return nullptr; - } - - total_nsegs += nsegs; - - // Attach a new buffers' chain to the packet chain - rte_mbuf_next(last_seg) = h; - last_seg = new_last_seg; - } - - // Update the HEAD buffer with the packet info - rte_mbuf_pkt_len(head) = p.len(); - rte_mbuf_nb_segs(head) = total_nsegs; - - // Handle TCP checksum offload - auto oi = p.offload_info(); - if (oi.needs_ip_csum) { - head->ol_flags |= PKT_TX_IP_CKSUM; - rte_mbuf_l2_len(head) = sizeof(struct ether_hdr); - rte_mbuf_l3_len(head) = oi.ip_hdr_len; - } - if (_dev->hw_features().tx_csum_l4_offload) { - if (oi.protocol == ip_protocol_num::tcp) { - head->ol_flags |= PKT_TX_TCP_CKSUM; - rte_mbuf_l2_len(head) = sizeof(struct ether_hdr); - rte_mbuf_l3_len(head) = oi.ip_hdr_len; - } else if (oi.protocol == ip_protocol_num::udp) { - head->ol_flags |= PKT_TX_UDP_CKSUM; - rte_mbuf_l2_len(head) = sizeof(struct ether_hdr); - rte_mbuf_l3_len(head) = oi.ip_hdr_len; - } - } - return head; -} - -uint32_t dpdk_qp::send(circular_buffer& pb) -{ - if (_tx_burst.size() == 0) { - for (auto&& p : pb) { - auto mbuf = create_tx_mbuf(p); - if (!mbuf) { - break; - } - _tx_burst.push_back(mbuf); - } - } - - auto sent = rte_eth_tx_burst(_dev->port_idx(), _qid, _tx_burst.data() + _tx_burst_idx, _tx_burst.size() - _tx_burst_idx); - - for (int i = 0; i < sent; i++) { - pb.pop_front(); - } - - _tx_burst_idx += sent; - - if (_tx_burst_idx == _tx_burst.size()) { - _tx_burst_idx = 0; - _tx_burst.clear(); - } - return sent; -} - #ifdef RTE_VERSION_1_7 void dpdk_device::get_rss_table() { @@ -884,7 +1321,14 @@ void dpdk_device::get_rss_table() #endif std::unique_ptr dpdk_device::init_local_queue(boost::program_options::variables_map opts, uint16_t qid) { - auto qp = std::make_unique(this, qid, opts.count("hugepages")); + + std::unique_ptr qp; + if (opts.count("hugepages")) { + qp = std::make_unique>(this, qid); + } else { + qp = std::make_unique>(this, qid); + } + smp::submit_to(_home_cpu, [this] () mutable { if (++_queues_ready == _num_queues) { init_port_fini();