diff --git a/configure.py b/configure.py index bb1a3eb21e..bc3ca913e6 100755 --- a/configure.py +++ b/configure.py @@ -229,7 +229,7 @@ if args.with_osv: if args.dpdk_target: args.user_cflags = (args.user_cflags + ' -DHAVE_DPDK -I' + - args.dpdk_target + '/include -Wno-error=literal-suffix -Wno-literal-suffix') + args.dpdk_target + '/include -Wno-error=literal-suffix -Wno-literal-suffix -Wno-invalid-offsetof') libs += (' -L' + args.dpdk_target + '/lib ' + '-Wl,--whole-archive -lrte_pmd_bond -lrte_pmd_vmxnet3_uio -lrte_pmd_virtio_uio -lrte_pmd_i40e -lrte_pmd_ixgbe -lrte_pmd_e1000 -lrte_pmd_ring -Wl,--no-whole-archive -lrte_distributor -lrte_kni -lrte_pipeline -lrte_table -lrte_port -lrte_timer -lrte_hash -lrte_lpm -lrte_power -lrte_acl -lrte_meter -lrte_sched -lrte_kvargs -lrte_mbuf -lrte_ip_frag -lethdev -lrte_eal -lrte_malloc -lrte_mempool -lrte_ring -lrte_cmdline -lrte_cfgfile -lrt -lm -ldl') diff --git a/core/dpdk_rte.cc b/core/dpdk_rte.cc index 3d98ae1dbe..169b61f2f2 100644 --- a/core/dpdk_rte.cc +++ b/core/dpdk_rte.cc @@ -1,5 +1,6 @@ #ifdef HAVE_DPDK +#include "net/dpdk.hh" #include "core/dpdk_rte.hh" #include "util/conversions.hh" #include @@ -35,6 +36,18 @@ void eal::init(cpuset cpus, boost::program_options::variables_map opts) if (hugepages_path) { args.push_back(string2vector("--huge-dir")); args.push_back(string2vector(hugepages_path.value())); + + // + // We don't know what is going to be our networking configuration so we + // assume there is going to be a queue per-CPU. Plus we'll give a DPDK + // 64MB for "other stuff". + // + size_t size_MB = mem_size(cpus.count()) >> 20; + std::stringstream size_MB_str; + size_MB_str << size_MB; + + args.push_back(string2vector("-m")); + args.push_back(string2vector(size_MB_str.str())); } else if (!opts.count("dpdk-pmd")) { args.push_back(string2vector("--no-huge")); } @@ -61,6 +74,23 @@ void eal::init(cpuset cpus, boost::program_options::variables_map opts) initialized = true; } +size_t eal::mem_size(int num_cpus) +{ + size_t memsize = 0; + // + // PMD mempool memory: + // + // We don't know what is going to be our networking configuration so we + // assume there is going to be a queue per-CPU. + // + memsize += num_cpus * qp_mempool_obj_size(); + + // Plus we'll give a DPDK 64MB for "other stuff". + memsize += (64UL << 20); + + return memsize; +} + } // namespace dpdk #endif // HAVE_DPDK diff --git a/core/dpdk_rte.hh b/core/dpdk_rte.hh index 8368df93a2..942e3255dc 100644 --- a/core/dpdk_rte.hh +++ b/core/dpdk_rte.hh @@ -28,6 +28,9 @@ #define rte_mbuf_nb_segs(m) ((m)->pkt.nb_segs) #define rte_mbuf_l2_len(m) ((m)->pkt.vlan_macip.f.l2_len) #define rte_mbuf_l3_len(m) ((m)->pkt.vlan_macip.f.l3_len) +#define rte_mbuf_buf_addr(m) ((m)->pkt.buf_addr) +#define rte_mbuf_buf_physaddr(m) ((m)->pkt.buf_physaddr) +#define rte_mbuf_data_off(m) ((m)->pkt.data_off) #else @@ -39,6 +42,9 @@ #define rte_mbuf_nb_segs(m) ((m)->nb_segs) #define rte_mbuf_l2_len(m) ((m)->l2_len) #define rte_mbuf_l3_len(m) ((m)->l3_len) +#define rte_mbuf_buf_addr(m) ((m)->buf_addr) +#define rte_mbuf_buf_physaddr(m) ((m)->buf_physaddr) +#define rte_mbuf_data_off(m) ((m)->data_off) #endif @@ -52,6 +58,13 @@ public: using cpuset = std::bitset; static void init(cpuset cpus, boost::program_options::variables_map opts); + /** + * Returns the amount of memory needed for DPDK + * @param num_cpus Number of CPUs the application is going to use + * + * @return + */ + static size_t mem_size(int num_cpus); static bool initialized; }; diff --git a/core/memory.cc b/core/memory.cc index 1c931d1871..3598432129 100644 --- a/core/memory.cc +++ b/core/memory.cc @@ -56,9 +56,6 @@ namespace memory { -static constexpr const size_t page_bits = 12; -static constexpr const size_t page_size = 1 << page_bits; -static constexpr const size_t huge_page_size = 512 * page_size; static constexpr const unsigned cpu_id_shift = 36; // FIXME: make dynamic static constexpr const unsigned max_cpus = 256; static constexpr const size_t cache_line_size = 64; diff --git a/core/memory.hh b/core/memory.hh index 32744ad3fb..7c4aae9191 100644 --- a/core/memory.hh +++ b/core/memory.hh @@ -12,6 +12,11 @@ namespace memory { +// TODO: Use getpagesize() in order to learn a size of a system PAGE. +static constexpr const size_t page_bits = 12; +static constexpr const size_t page_size = 1 << page_bits; // 4K +static constexpr const size_t huge_page_size = 512 * page_size; // 2M + void configure(std::vector m, std::experimental::optional hugetlbfs_path = {}); diff --git a/core/reactor.cc b/core/reactor.cc index 96643cbf3c..59a235e90d 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -1268,6 +1268,26 @@ void smp::configure(boost::program_options::variables_map configuration) resource::configuration rc; if (configuration.count("memory")) { rc.total_memory = parse_memory_size(configuration["memory"].as()); +#ifdef HAVE_DPDK + if (configuration.count("hugepages") && + !configuration["network-stack"].as().compare("native") && + configuration.count("dpdk-pmd")) { + size_t dpdk_memory = dpdk::eal::mem_size(smp::count); + + if (dpdk_memory >= rc.total_memory) { + std::cerr<<"Can't run with the given amount of memory: "; + std::cerr<(); + std::cerr<<". Consider giving more."<()); diff --git a/net/dpdk.cc b/net/dpdk.cc index 0c17b2456b..9de50ff0f9 100644 --- a/net/dpdk.cc +++ b/net/dpdk.cc @@ -12,11 +12,13 @@ #include "core/circular_buffer.hh" #include "core/align.hh" #include "core/sstring.hh" +#include "core/memory.hh" #include "util/function_input_iterator.hh" #include "util/transform_iterator.hh" #include #include #include +#include #include "ip.hh" #include "const.hh" #include "core/dpdk_rte.hh" @@ -24,6 +26,7 @@ #include "toeplitz.hh" #include +#include #include #include @@ -38,20 +41,48 @@ using namespace net; namespace dpdk { /******************* Net device related constatns *****************************/ +static constexpr uint16_t default_ring_size = 512; + +static constexpr uint16_t mbufs_per_queue_rx = 3 * default_ring_size; + +// +// No need to keep more descriptors in the air than can be sent in a single +// rte_eth_tx_burst() call. +// +static constexpr uint16_t mbufs_per_queue_tx = 2 * default_ring_size; -static constexpr uint16_t mbufs_per_queue = 1536; static constexpr uint16_t mbuf_cache_size = 512; static constexpr uint16_t mbuf_overhead = sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM; static constexpr size_t mbuf_data_size = 2048; -// MBUF_DATA_SIZE(2K) * 32 = 64K = Max TSO/LRO size -static constexpr uint8_t max_frags = 32; +// (MBUF_DATA_SIZE(2K) * 32 = 64K = Max TSO/LRO size) + 1 mbuf for headers +static constexpr uint8_t max_frags = 32 + 1; -static constexpr uint16_t mbuf_size = mbuf_data_size + mbuf_overhead; +static constexpr uint16_t mbuf_size = + mbuf_data_size + mbuf_overhead; -static constexpr uint16_t default_rx_ring_size = 512; -static constexpr uint16_t default_tx_ring_size = 512; +uint32_t qp_mempool_obj_size() +{ + uint32_t mp_size = 0; + struct rte_mempool_objsz mp_obj_sz = {}; + + // + // We will align each size to huge page size because DPDK allocates + // physically contiguous memory region for each pool object. + // + + // Rx + mp_size += align_up(rte_mempool_calc_obj_size(mbuf_size, 0, &mp_obj_sz) + + sizeof(struct rte_pktmbuf_pool_private), + memory::huge_page_size); + //Tx + std::memset(&mp_obj_sz, 0, sizeof(mp_obj_sz)); + mp_size += align_up(rte_mempool_calc_obj_size(mbuf_size, 0, &mp_obj_sz) + + sizeof(struct rte_pktmbuf_pool_private), + memory::huge_page_size); + return mp_size; +} #ifdef RTE_VERSION_1_7 /* @@ -187,17 +218,687 @@ public: uint8_t port_idx() { return _port_idx; } }; +template class dpdk_qp : public net::qp { + class tx_buf_factory; + + class tx_buf { + friend class dpdk_qp; + public: + static tx_buf* me(rte_mbuf* mbuf) { + return reinterpret_cast(mbuf); + } + + /** + * Creates a tx_buf cluster representing a given packet in a "zero-copy" + * way. + * + * @param p packet to translate + * @param dev Parent dpdk_device + * @param fc Buffers' factory to use + * + * @return the HEAD tx_buf of the cluster or nullptr in case of a + * failure + */ + static tx_buf* from_packet_zc( + packet&& p, dpdk_device& dev, tx_buf_factory& fc) { + + return from_packet(check_frag0, translate_one_frag, copy_one_frag, + [](packet&& _p, tx_buf& _last_seg) { + _last_seg.set_packet(std::move(_p)); + }, std::move(p), dev, fc); + } + + /** + * Creates a tx_buf cluster representing a given packet in a "copy" way. + * + * @param p packet to translate + * @param dev Parent dpdk_device + * @param fc Buffers' factory to use + * + * @return the HEAD tx_buf of the cluster or nullptr in case of a + * failure + */ + static tx_buf* from_packet_copy( + packet&& p, dpdk_device& dev, tx_buf_factory& fc) { + + return from_packet([](packet& _p) { return true; }, + copy_one_frag, copy_one_frag, + [](packet&& _p, tx_buf& _last_seg) {}, + std::move(p), dev, fc); + } + private: + /** + * Creates a tx_buf cluster representing a given packet using provided + * functors. + * + * @param sanity Functor that performs a packet's sanity checks + * @param do_one_frag Functor that handles a single frag translation + * @param fin Functor that performs a cluster finalization + * @param p packet to translate + * @param dev Parent dpdk_device object + * @param fc Buffers' factory to use + * + * @return the HEAD tx_buf of the cluster or nullptr in case of a + * failure + */ + template + static tx_buf* from_packet( + FirstFragCheck frag0_check, TrOneFunc do_one_frag, + CopyOneFunc copy_one_frag, FinalizeFunc fin, + packet&& p, dpdk_device& dev, tx_buf_factory& fc) { + + // Too fragmented - linearize + if (p.nr_frags() > max_frags) { + p.linearize(); + } + + rte_mbuf *head = nullptr, *last_seg = nullptr; + unsigned nsegs = 0; + + // + // Create a HEAD of the fragmented packet: check if frag0 has to be + // copied and if yes - send it in a copy way + // + if (!frag0_check(p)) { + if (!copy_one_frag(fc, p.frag(0), head, last_seg, nsegs)) { + return nullptr; + } + } else if (!do_one_frag(fc, p.frag(0), head, last_seg, nsegs)) { + return nullptr; + } + + unsigned total_nsegs = nsegs; + + for (unsigned i = 1; i < p.nr_frags(); i++) { + rte_mbuf *h = nullptr, *new_last_seg = nullptr; + if (!do_one_frag(fc, p.frag(i), h, new_last_seg, nsegs)) { + me(head)->recycle(); + return nullptr; + } + + total_nsegs += nsegs; + + // Attach a new buffers' chain to the packet chain + rte_mbuf_next(last_seg) = h; + last_seg = new_last_seg; + } + + // Update the HEAD buffer with the packet info + rte_mbuf_pkt_len(head) = p.len(); + rte_mbuf_nb_segs(head) = total_nsegs; + + // Handle TCP checksum offload + auto oi = p.offload_info(); + if (oi.needs_ip_csum) { + head->ol_flags |= PKT_TX_IP_CKSUM; + rte_mbuf_l2_len(head) = sizeof(struct ether_hdr); + rte_mbuf_l3_len(head) = oi.ip_hdr_len; + } + if (dev.hw_features().tx_csum_l4_offload) { + if (oi.protocol == ip_protocol_num::tcp) { + head->ol_flags |= PKT_TX_TCP_CKSUM; + rte_mbuf_l2_len(head) = sizeof(struct ether_hdr); + rte_mbuf_l3_len(head) = oi.ip_hdr_len; + } else if (oi.protocol == ip_protocol_num::udp) { + head->ol_flags |= PKT_TX_UDP_CKSUM; + rte_mbuf_l2_len(head) = sizeof(struct ether_hdr); + rte_mbuf_l3_len(head) = oi.ip_hdr_len; + } + } + + fin(std::move(p), *me(last_seg)); + + return me(head); + } + + /** + * Zero-copy handling of a single net::fragment. + * + * @param do_one_buf Functor responsible for a single rte_mbuf + * handling + * @param fc Buffers' factory to allocate the tx_buf from (in) + * @param frag Fragment to copy (in) + * @param head Head of the cluster (out) + * @param last_seg Last segment of the cluster (out) + * @param nsegs Number of segments in the cluster (out) + * + * @return TRUE in case of success + */ + template + static bool do_one_frag(DoOneBufFunc do_one_buf, tx_buf_factory& fc, + fragment& frag, rte_mbuf*& head, + rte_mbuf*& last_seg, unsigned& nsegs) { + // + // TODO: Optimize the small fragments case: merge them into a + // single mbuf. + // + + size_t len, left_to_set = frag.size; + char* base = frag.base; + + rte_mbuf* m; + + // TODO: assert() in a fast path! Remove me ASAP! + assert(frag.size); + + // Create a HEAD of mbufs' cluster and set the first bytes into it + len = do_one_buf(fc, head, base, left_to_set); + if (!len) { + return false; + } + + left_to_set -= len; + base += len; + nsegs = 1; + + // + // Set the rest of the data into the new mbufs and chain them to + // the cluster. + // + rte_mbuf* prev_seg = head; + while (left_to_set) { + len = do_one_buf(fc, m, base, left_to_set); + if (!len) { + me(head)->recycle(); + return false; + } + + left_to_set -= len; + base += len; + nsegs++; + + rte_mbuf_next(prev_seg) = m; + prev_seg = m; + } + + // Return the last mbuf in the cluster + last_seg = prev_seg; + + return true; + } + + /** + * Zero-copy handling of a single net::fragment. + * + * @param fc Buffers' factory to allocate the tx_buf from (in) + * @param frag Fragment to copy (in) + * @param head Head of the cluster (out) + * @param last_seg Last segment of the cluster (out) + * @param nsegs Number of segments in the cluster (out) + * + * @return TRUE in case of success + */ + static bool translate_one_frag(tx_buf_factory& fc, fragment& frag, + rte_mbuf*& head, rte_mbuf*& last_seg, + unsigned& nsegs) { + return do_one_frag(set_one_data_buf, fc, frag, head, + last_seg, nsegs); + } + + /** + * Copies one net::fragment into the cluster of rte_mbuf's. + * + * @param fc Buffers' factory to allocate the tx_buf from (in) + * @param frag Fragment to copy (in) + * @param head Head of the cluster (out) + * @param last_seg Last segment of the cluster (out) + * @param nsegs Number of segments in the cluster (out) + * + * We return the "last_seg" to avoid traversing the cluster in order to get + * it. + * + * @return TRUE in case of success + */ + static bool copy_one_frag(tx_buf_factory& fc, fragment& frag, + rte_mbuf*& head, rte_mbuf*& last_seg, + unsigned& nsegs) { + return do_one_frag(copy_one_data_buf, fc, frag, head, + last_seg, nsegs); + } + + /** + * Allocates a single rte_mbuf and sets it to point to a given data + * buffer. + * + * @param fc Buffers' factory to allocate the tx_buf from (in) + * @param m New allocated rte_mbuf (out) + * @param va virtual address of a data buffer (in) + * @param buf_len length of the data to copy (in) + * + * @return The actual number of bytes that has been set in the mbuf + */ + static size_t set_one_data_buf( + tx_buf_factory& fc, rte_mbuf*& m, char* va, size_t buf_len) { + + using namespace memory; + translation tr = translate(va, buf_len); + + // + // Currently we break a buffer on a 4K boundary for simplicity. + // + // TODO: Optimize it to better utilize the physically continuity of the + // buffer. Note to take into an account a HW limitation for a maximum data + // size per single descriptor (e.g. 15.5K for 82599 devices). + // + phys_addr_t pa = tr.addr; + + if (!tr.size) { + return copy_one_data_buf(fc, m, va, buf_len); + } + + tx_buf* buf = fc.get(); + if (!buf) { + return 0; + } + + size_t page_offset = pa & ~page_mask; + size_t len = std::min(page_size - page_offset, buf_len); + + buf->set_zc_info(va, pa, len); + m = buf->rte_mbuf_p(); + + return len; + } + + /** + * Allocates a single rte_mbuf and copies a given data into it. + * + * @param fc Buffers' factory to allocate the tx_buf from (in) + * @param m New allocated rte_mbuf (out) + * @param data Data to copy from (in) + * @param buf_len length of the data to copy (in) + * + * @return The actual number of bytes that has been copied + */ + static size_t copy_one_data_buf( + tx_buf_factory& fc, rte_mbuf*& m, char* data, size_t buf_len) + { + tx_buf* buf = fc.get(); + if (!buf) { + return 0; + } + + size_t len = std::min(buf_len, mbuf_data_size); + + m = buf->rte_mbuf_p(); + + // mbuf_put() + rte_mbuf_data_len(m) = len; + rte_mbuf_pkt_len(m) = len; + + + rte_memcpy(rte_pktmbuf_mtod(m, void*), data, len); + + return len; + } + + /** + * Checks if the first fragment of the given packet satisfies the + * zero-copy flow requirement: its first 128 bytes should not cross the + * 4K page boundary. This is required in order to avoid splitting packet + * headers. + * + * @param p packet to check + * + * @return TRUE if packet is ok and FALSE otherwise. + */ + static bool check_frag0(packet& p) + { + using namespace memory; + // + // First frag is special - it has headers that should not be split. + // If the addressing is such that the first fragment has to be + // split, then send this packet in a (non-zero) copy flow. We'll + // check if the first 128 bytes of the first fragment reside in the + // same page. If that's the case - we are good to go. + // + + uint64_t base = (uint64_t)p.frag(0).base; + uint64_t frag0_page0_len = page_size - (base & ~page_mask); + + if (frag0_page0_len < 128 && + frag0_page0_len < p.frag(0).size) { + return false; + } + + return true; + } + + public: + tx_buf(tx_buf_factory& fc) : _fc(fc) { + + _buf_physaddr = _mbuf.buf_physaddr; + _buf_len = _mbuf.buf_len; + _data_off = _mbuf.data_off; + } + + rte_mbuf* rte_mbuf_p() { return &_mbuf; } + + void set_zc_info(void* va, phys_addr_t pa, size_t len) { + // mbuf_put() + rte_mbuf_data_len(&_mbuf) = len; + rte_mbuf_pkt_len(&_mbuf) = len; + + // Set the mbuf to point to our data + rte_mbuf_buf_addr(&_mbuf) = va; + rte_mbuf_buf_physaddr(&_mbuf) = pa; + rte_mbuf_data_off(&_mbuf) = 0; + _is_zc = true; + } + + void reset_zc() { + + // + // If this mbuf was the last in a cluster and contains an + // original packet object then call the destructor of the + // original packet object. + // + if (_p) { + // + // Reset the std::optional. This in particular is going + // to call the "packet"'s destructor and reset the + // "optional" state to "nonengaged". + // + _p = std::experimental::nullopt; + + } else if (!_is_zc) { + return; + } + + // Restore the rte_mbuf fields we trashed in set_zc_info() + _mbuf.buf_physaddr = _buf_physaddr; + _mbuf.buf_addr = RTE_MBUF_TO_BADDR(&_mbuf); + _mbuf.buf_len = _buf_len; + _mbuf.data_off = _data_off; + + _is_zc = false; + } + + void recycle() { + struct rte_mbuf *m = &_mbuf, *m_next; + + while (m != nullptr) { + m_next = m->next; + // + // Zero only "next" field since we want to save the dirtying of + // the extra cache line. + // There is no need to reset the pkt_len or data_len fields and + // the rest of the fields that are set in the HEAD mbuf of the + // cluster are going to be cleared when the buffer is pooled + // from the mempool and not in this flow. + // + m->next = nullptr; + _fc.put(me(m)); + m = m_next; + } + } + + void set_packet(packet&& p) { + _p = std::move(p); + } + + private: + struct rte_mbuf _mbuf; + MARKER private_start; + std::experimental::optional _p; + phys_addr_t _buf_physaddr; + uint32_t _buf_len; + uint16_t _data_off; + // TRUE if underlying mbuf has been used in the zero-copy flow + bool _is_zc = false; + // buffers' factory the buffer came from + tx_buf_factory& _fc; + MARKER private_end; + }; + + class tx_buf_factory { + // + // Number of buffers to free in each GC iteration: + // We want the buffers to be allocated from the mempool as many as + // possible. + // + // On the other hand if there is no Tx for some time we want the + // completions to be eventually handled. Thus we choose the smallest + // possible packets count number here. + // + static constexpr int gc_count = 1; + public: + tx_buf_factory(uint8_t qid) { + using namespace memory; + + sstring name = sstring(pktmbuf_pool_name) + to_sstring(qid) + "_tx"; + printf("Creating Tx mbuf pool '%s' [%u mbufs] ...\n", + name.c_str(), mbufs_per_queue_tx); + + if (HugetlbfsMemBackend) { + std::vector mappings; + + _xmem = dpdk_qp::alloc_mempool_xmem(mbufs_per_queue_tx, + mbuf_size, mappings); + if (!_xmem) { + printf("Can't allocate a memory for Tx buffers\n"); + exit(1); + } + + // + // We are going to push the buffers from the mempool into + // the circular_buffer and then poll them from there anyway, so + // we prefer to make a mempool non-atomic in this case. + // + _pool = + rte_mempool_xmem_create(name.c_str(), + mbufs_per_queue_tx, mbuf_size, + mbuf_cache_size, + sizeof(struct rte_pktmbuf_pool_private), + rte_pktmbuf_pool_init, nullptr, + rte_pktmbuf_init, nullptr, + rte_socket_id(), 0, + _xmem, mappings.data(), + mappings.size(), page_bits); + + } else { + _pool = + rte_mempool_create(name.c_str(), + mbufs_per_queue_tx, mbuf_size, + mbuf_cache_size, + sizeof(struct rte_pktmbuf_pool_private), + rte_pktmbuf_pool_init, nullptr, + rte_pktmbuf_init, nullptr, + rte_socket_id(), 0); + } + + if (!_pool) { + printf("Failed to create mempool for Tx\n"); + exit(1); + } + + // + // Fill the factory with the buffers from the mempool allocated + // above. + // + init_factory(); + } + + ~tx_buf_factory() { + // WTF: Hmmm... There is no way to destroy the mempool! + + free(_xmem); + } + + /** + * @note Should not be called if there are no free tx_buf's + * + * @return a free tx_buf object + */ + tx_buf* get() { + // Take completed from the HW first + tx_buf *pkt = get_one_completed(); + if (pkt) { + if (HugetlbfsMemBackend) { + pkt->reset_zc(); + } + + return pkt; + } + + // + // If there are no completed at the moment - take from the + // factory's cache. + // + if (_ring.empty()) { + return nullptr; + } + + pkt = _ring.front(); + _ring.pop_front(); + + return pkt; + } + + void put(tx_buf* buf) { + if (HugetlbfsMemBackend) { + buf->reset_zc(); + } + _ring.push_front(buf); + } + + bool gc() { + for (int cnt = 0; cnt < gc_count; ++cnt) { + auto tx_buf_p = get_one_completed(); + if (!tx_buf_p) { + return false; + } + + put(tx_buf_p); + } + + return true; + } + private: + /** + * Fill the mbufs circular buffer: after this the _pool will become + * empty. We will use it to catch the completed buffers: + * + * - Underlying PMD drivers will "free" the mbufs once they are + * completed. + * - We will poll the _pktmbuf_pool_tx till it's empty and release + * all the buffers from the freed mbufs. + */ + void init_factory() { + while (rte_mbuf* mbuf = rte_pktmbuf_alloc(_pool)) { + _ring.push_back(new(tx_buf::me(mbuf)) tx_buf{*this}); + } + } + + /** + * PMD puts the completed buffers back into the mempool they have + * originally come from. + * + * @note rte_pktmbuf_alloc() resets the mbuf so there is no need to call + * rte_pktmbuf_reset() here again. + * + * @return a single tx_buf that has been completed by HW. + */ + tx_buf* get_one_completed() { + return tx_buf::me(rte_pktmbuf_alloc(_pool)); + } + + private: + std::deque _ring; + rte_mempool* _pool = nullptr; + void* _xmem = nullptr; + }; + public: explicit dpdk_qp(dpdk_device* dev, uint8_t qid); virtual future<> send(packet p) override { abort(); } - virtual uint32_t send(circular_buffer& p) override; + + virtual ~dpdk_qp() { + // TODO: Free all mempools + if (_rx_xmem) { + free(_rx_xmem); + } + } + + virtual uint32_t send(circular_buffer& pb) override { + if (HugetlbfsMemBackend) { + // Zero-copy send + return _send(pb, [&] (packet&& p) { + return tx_buf::from_packet_zc( + std::move(p), *_dev, _tx_buf_factory); + }); + } else { + // "Copy"-send + return _send(pb, [&](packet&& p) { + return tx_buf::from_packet_copy( + std::move(p), *_dev, _tx_buf_factory); + }); + } + } private: - bool init_mbuf_pools(); + template + uint32_t _send(circular_buffer& pb, Func packet_to_tx_buf_p) { + if (_tx_burst.size() == 0) { + for (auto&& p : pb) { + // TODO: assert() in a fast path! Remove me ASAP! + assert(p.len()); + + tx_buf* buf = packet_to_tx_buf_p(std::move(p)); + if (!buf) { + break; + } + + _tx_burst.push_back(buf->rte_mbuf_p()); + } + } + + uint16_t sent = rte_eth_tx_burst(_dev->port_idx(), _qid, + _tx_burst.data() + _tx_burst_idx, + _tx_burst.size() - _tx_burst_idx); + + for (int i = 0; i < sent; i++) { + pb.pop_front(); + } + + _tx_burst_idx += sent; + + if (_tx_burst_idx == _tx_burst.size()) { + _tx_burst_idx = 0; + _tx_burst.clear(); + } + + return sent; + } + + bool init_rx_mbuf_pool(); + + /** + * Allocates a memory chunk to accommodate the given number of buffers of + * the given size and fills a vector with underlying physical pages. + * + * The chunk is going to be used as an external memory buffer of the DPDK + * memory pool (created using rte_mempool_xmem_create()). + * + * The chunk size if calculated using rte_mempool_xmem_size() function. + * + * @param num_bufs Number of buffers (in) + * @param buf_sz Size of each buffer (in) + * @param mappings vector of physical pages (out) + * + * @note this function assumes that "mappings" is properly set and adds the + * mappings to the back of the vector. + * + * @return a virtual address of the allocated memory chunk or nullptr in + * case of a failure. + */ + static void* alloc_mempool_xmem(uint16_t num_bufs, uint16_t buf_sz, + std::vector& mappings); /** * Polls for a burst of incoming packets. This function will not block and @@ -214,41 +915,17 @@ private: */ void process_packets(struct rte_mbuf **bufs, uint16_t count); - /** - * Copies one net::fragment into the cluster of rte_mbuf's. - * - * @param frag Fragment to copy (in) - * @param head Head of the cluster (out) - * @param last_seg Last segment of the cluster (out) - * @param nsegs Number of segments in the cluster (out) - * - * We return the "last_seg" to avoid traversing the cluster in order to get - * it. - * - * @return TRUE in case of success - */ - bool copy_one_frag(fragment& frag, rte_mbuf*& head, rte_mbuf*& last_seg, - unsigned& nsegs); - - /** - * Allocates a single rte_mbuf and copies a given data into it. - * - * @param m New allocated rte_mbuf (out) - * @param data Data to copy from (in) - * @param l length of the data to copy (in) - * - * @return The actual number of bytes that has been copied - */ - size_t copy_one_data_buf(rte_mbuf*& m, char* data, size_t l); - - rte_mbuf* create_tx_mbuf(packet& p); private: dpdk_device* _dev; uint8_t _qid; - rte_mempool* _pktmbuf_pool; + rte_mempool *_pktmbuf_pool_rx; + void *_rx_xmem = nullptr; + tx_buf_factory _tx_buf_factory; reactor::poller _rx_poller; + reactor::poller _tx_gc_poller; std::vector _tx_burst; uint16_t _tx_burst_idx = 0; + static constexpr phys_addr_t page_mask = ~(memory::page_size - 1); }; int dpdk_device::init_port_start() @@ -398,26 +1075,82 @@ void dpdk_device::init_port_fini() printf("Created DPDK device\n"); } -bool dpdk_qp::init_mbuf_pools() +template +void* dpdk_qp::alloc_mempool_xmem( + uint16_t num_bufs, uint16_t buf_sz, std::vector& mappings) { - // Allocate the same amount of buffers for Rx and Tx. - const unsigned num_mbufs = 2 * mbufs_per_queue; - sstring name = to_sstring(pktmbuf_pool_name) + to_sstring(_qid); - /* don't pass single-producer/single-consumer flags to mbuf create as it - * seems faster to use a cache instead */ - printf("Creating mbuf pool '%s' [%u mbufs] ...\n", name.c_str(), num_mbufs); + using namespace memory; + char* xmem; + + size_t xmem_size = rte_mempool_xmem_size(num_bufs, buf_sz, page_bits); + + // Aligning to 2M causes the further failure in small allocations. + // TODO: Check why - and fix. + if (posix_memalign((void**)&xmem, page_size, xmem_size)) { + printf("Can't allocate %ld bytes aligned to %ld\n", + xmem_size, page_size); + return nullptr; + } + + for (size_t i = 0; i < xmem_size / page_size; ++i) { + translation tr = translate(xmem + i * page_size, page_size); + assert(tr.size); + mappings.push_back(tr.addr); + } + + return xmem; +} + +template +bool dpdk_qp::init_rx_mbuf_pool() +{ + using namespace memory; + sstring name = sstring(pktmbuf_pool_name) + to_sstring(_qid) + "_rx"; + + printf("Creating Rx mbuf pool '%s' [%u mbufs] ...\n", + name.c_str(), mbufs_per_queue_rx); // - // We currently allocate a one big mempool on the current CPU to fit all - // requested queues. - // TODO: Allocate a separate pool for each queue on the appropriate CPU. + // If we have a hugetlbfs memory backend we may perform a virt2phys + // translation and memory is "pinned". Therefore we may provide an external + // memory for DPDK pools and this way significantly reduce the memory needed + // for the DPDK in this case. // - _pktmbuf_pool = rte_mempool_create(name.c_str(), num_mbufs, - mbuf_size, mbuf_cache_size, - sizeof(struct rte_pktmbuf_pool_private), rte_pktmbuf_pool_init, - NULL, rte_pktmbuf_init, NULL, rte_socket_id(), 0); + if (HugetlbfsMemBackend) { + std::vector mappings; - return _pktmbuf_pool != NULL; + _rx_xmem = alloc_mempool_xmem(mbufs_per_queue_rx, mbuf_size, mappings); + if (!_rx_xmem) { + printf("Can't allocate a memory for Rx buffers\n"); + return false; + } + + // + // Don't pass single-producer/single-consumer flags to mbuf create as it + // seems faster to use a cache instead. + // + _pktmbuf_pool_rx = + rte_mempool_xmem_create(name.c_str(), + mbufs_per_queue_rx, mbuf_size, + mbuf_cache_size, + sizeof(struct rte_pktmbuf_pool_private), + rte_pktmbuf_pool_init, nullptr, + rte_pktmbuf_init, nullptr, + rte_socket_id(), 0, + _rx_xmem, mappings.data(), mappings.size(), + page_bits); + } else { + _pktmbuf_pool_rx = + rte_mempool_create(name.c_str(), + mbufs_per_queue_rx, mbuf_size, + mbuf_cache_size, + sizeof(struct rte_pktmbuf_pool_private), + rte_pktmbuf_pool_init, nullptr, + rte_pktmbuf_init, nullptr, + rte_socket_id(), 0); + } + + return _pktmbuf_pool_rx != nullptr; } void dpdk_device::check_port_link_status() @@ -454,30 +1187,40 @@ void dpdk_device::check_port_link_status() t->arm_periodic(check_interval); } - -dpdk_qp::dpdk_qp(dpdk_device* dev, uint8_t qid) - : _dev(dev), _qid(qid), _rx_poller([&] { return poll_rx_once(); }) +template +dpdk_qp::dpdk_qp(dpdk_device* dev, uint8_t qid) + : _dev(dev), _qid(qid), + _tx_buf_factory(qid), + _rx_poller([&] { return poll_rx_once(); }), + _tx_gc_poller([&] { return _tx_buf_factory.gc(); }) { - if (!init_mbuf_pools()) { + if (!init_rx_mbuf_pool()) { rte_exit(EXIT_FAILURE, "Cannot initialize mbuf pools\n"); } - const uint16_t rx_ring_size = default_rx_ring_size; - const uint16_t tx_ring_size = default_tx_ring_size; + static_assert(offsetof(class tx_buf, private_end) - + offsetof(class tx_buf, private_start) <= RTE_PKTMBUF_HEADROOM, + "RTE_PKTMBUF_HEADROOM is less than dpdk_qp::tx_buf size! " + "Increase the headroom size in the DPDK configuration"); + static_assert(offsetof(class tx_buf, _mbuf) == 0, + "There is a pad at the beginning of the tx_buf before _mbuf " + "field!"); - if (rte_eth_rx_queue_setup(_dev->port_idx(), _qid, rx_ring_size, + if (rte_eth_rx_queue_setup(_dev->port_idx(), _qid, default_ring_size, rte_eth_dev_socket_id(_dev->port_idx()), - _dev->def_rx_conf(), _pktmbuf_pool) < 0) { + _dev->def_rx_conf(), _pktmbuf_pool_rx) < 0) { rte_exit(EXIT_FAILURE, "Cannot initialize rx queue\n"); } - if (rte_eth_tx_queue_setup(_dev->port_idx(), _qid, tx_ring_size, + if (rte_eth_tx_queue_setup(_dev->port_idx(), _qid, default_ring_size, rte_eth_dev_socket_id(_dev->port_idx()), _dev->def_tx_conf()) < 0) { rte_exit(EXIT_FAILURE, "Cannot initialize tx queue\n"); } } -void dpdk_qp::process_packets(struct rte_mbuf **bufs, uint16_t count) +template +void dpdk_qp::process_packets( + struct rte_mbuf **bufs, uint16_t count) { update_rx_count(count); for (uint16_t i = 0; i < count; i++) { @@ -519,7 +1262,8 @@ void dpdk_qp::process_packets(struct rte_mbuf **bufs, uint16_t count) } } -bool dpdk_qp::poll_rx_once() +template +bool dpdk_qp::poll_rx_once() { struct rte_mbuf *buf[packet_read_size]; @@ -535,165 +1279,6 @@ bool dpdk_qp::poll_rx_once() return rx_count; } -size_t dpdk_qp::copy_one_data_buf(rte_mbuf*& m, char* data, size_t l) -{ - m = rte_pktmbuf_alloc(_pktmbuf_pool); - if (!m) { - return 0; - } - - size_t len = std::min(l, mbuf_data_size); - - // mbuf_put() - rte_mbuf_data_len(m) += len; - rte_mbuf_pkt_len(m) += len; - - rte_memcpy(rte_pktmbuf_mtod(m, void*), data, len); - - return len; -} - - -bool dpdk_qp::copy_one_frag(fragment& frag, rte_mbuf*& head, - rte_mbuf*& last_seg, unsigned& nsegs) -{ - size_t len, left_to_copy = frag.size; - char* base = frag.base; - rte_mbuf* m; - - if (!frag.size) { - rte_exit(EXIT_FAILURE, "DPDK Tx: Zero-size fragment"); - } - - // Create a HEAD of mbufs' cluster and copy the first bytes into it - len = copy_one_data_buf(head, base, left_to_copy); - if (!len) { - return false; - } - - left_to_copy -= len; - base += len; - nsegs = 1; - - // Copy the rest of the data into the new mbufs and chain them to the - // cluster - rte_mbuf* prev_seg = head; - while (left_to_copy) { - len = copy_one_data_buf(m, base, left_to_copy); - if (!len) { - rte_pktmbuf_free(head); - return false; - } - - left_to_copy -= len; - base += len; - nsegs++; - - rte_mbuf_next(prev_seg) = m; - prev_seg = m; - } - - // Return the last mbuf in the cluster - last_seg = prev_seg; - - return true; -} - -rte_mbuf* dpdk_qp::create_tx_mbuf(packet& p) { - // sanity - if (!p.len()) { - return nullptr; - } - - // Too fragmented - linearize - if (p.nr_frags() > max_frags) { - p.linearize(); - } - - /* TODO: configure the offload features here if any */ - - // - // We will copy the data for now and will implement a zero-copy in the - // future. - - rte_mbuf *head = nullptr, *last_seg = NULL; - unsigned total_nsegs = 0, nsegs = 0; - - // Create a HEAD of the fragmented packet - if (!copy_one_frag(p.frag(0), head, last_seg, nsegs)) { - // Drop if we failed to allocate new mbuf - return nullptr; - } - - total_nsegs += nsegs; - - for (unsigned i = 1; i < p.nr_frags(); i++) { - - rte_mbuf *h = NULL, *new_last_seg = NULL; - if (!copy_one_frag(p.frag(i), h, new_last_seg, nsegs)) { - rte_pktmbuf_free(head); - return nullptr; - } - - total_nsegs += nsegs; - - // Attach a new buffers' chain to the packet chain - rte_mbuf_next(last_seg) = h; - last_seg = new_last_seg; - } - - // Update the HEAD buffer with the packet info - rte_mbuf_pkt_len(head) = p.len(); - rte_mbuf_nb_segs(head) = total_nsegs; - - // Handle TCP checksum offload - auto oi = p.offload_info(); - if (oi.needs_ip_csum) { - head->ol_flags |= PKT_TX_IP_CKSUM; - rte_mbuf_l2_len(head) = sizeof(struct ether_hdr); - rte_mbuf_l3_len(head) = oi.ip_hdr_len; - } - if (_dev->hw_features().tx_csum_l4_offload) { - if (oi.protocol == ip_protocol_num::tcp) { - head->ol_flags |= PKT_TX_TCP_CKSUM; - rte_mbuf_l2_len(head) = sizeof(struct ether_hdr); - rte_mbuf_l3_len(head) = oi.ip_hdr_len; - } else if (oi.protocol == ip_protocol_num::udp) { - head->ol_flags |= PKT_TX_UDP_CKSUM; - rte_mbuf_l2_len(head) = sizeof(struct ether_hdr); - rte_mbuf_l3_len(head) = oi.ip_hdr_len; - } - } - return head; -} - -uint32_t dpdk_qp::send(circular_buffer& pb) -{ - if (_tx_burst.size() == 0) { - for (auto&& p : pb) { - auto mbuf = create_tx_mbuf(p); - if (!mbuf) { - break; - } - _tx_burst.push_back(mbuf); - } - } - - auto sent = rte_eth_tx_burst(_dev->port_idx(), _qid, _tx_burst.data() + _tx_burst_idx, _tx_burst.size() - _tx_burst_idx); - - for (int i = 0; i < sent; i++) { - pb.pop_front(); - } - - _tx_burst_idx += sent; - - if (_tx_burst_idx == _tx_burst.size()) { - _tx_burst_idx = 0; - _tx_burst.clear(); - } - return sent; -} - #ifdef RTE_VERSION_1_7 void dpdk_device::get_rss_table() { @@ -736,7 +1321,14 @@ void dpdk_device::get_rss_table() #endif std::unique_ptr dpdk_device::init_local_queue(boost::program_options::variables_map opts, uint16_t qid) { - auto qp = std::make_unique(this, qid); + + std::unique_ptr qp; + if (opts.count("hugepages")) { + qp = std::make_unique>(this, qid); + } else { + qp = std::make_unique>(this, qid); + } + smp::submit_to(_home_cpu, [this] () mutable { if (++_queues_ready == _num_queues) { init_port_fini(); diff --git a/net/dpdk.hh b/net/dpdk.hh index b625d8914c..3c762eebcc 100644 --- a/net/dpdk.hh +++ b/net/dpdk.hh @@ -17,6 +17,13 @@ std::unique_ptr create_dpdk_net_device( boost::program_options::options_description get_dpdk_net_options_description(); +namespace dpdk { +/** + * @return Number of bytes needed for mempool objects of each QP. + */ +uint32_t qp_mempool_obj_size(); +} + #endif // _SEASTAR_DPDK_DEV_H #endif // HAVE_DPDK