DPDK: Initialize mempools to work with external memory

If seastar is configured to use hugetlbfs initialize mempools
with external memory buffer. This way we are going to better control the overall
memory consumption.

Signed-off-by: Vlad Zolotarov <vladz@cloudius-systems.com>

New in v2:
   - Use char* instead of void* for pointer's arithmetics.
This commit is contained in:
Vlad Zolotarov
2015-01-27 15:06:56 +02:00
parent d4cddbc3d0
commit 82e20564b0

View File

@@ -12,6 +12,7 @@
#include "core/circular_buffer.hh"
#include "core/align.hh"
#include "core/sstring.hh"
#include "core/memory.hh"
#include "util/function_input_iterator.hh"
#include "util/transform_iterator.hh"
#include <atomic>
@@ -24,6 +25,7 @@
#include "toeplitz.hh"
#include <getopt.h>
#include <malloc.h>
#include <rte_config.h>
#include <rte_common.h>
@@ -195,16 +197,50 @@ public:
class dpdk_qp : public net::qp {
public:
explicit dpdk_qp(dpdk_device* dev, uint8_t qid);
explicit dpdk_qp(dpdk_device* dev, uint8_t qid,
bool huge_pages_mem_backend);
virtual future<> send(packet p) override {
abort();
}
virtual uint32_t send(circular_buffer<packet>& p) override;
virtual ~dpdk_qp() {
// TODO: Free all mempools
if (_rx_xmem) {
free(_rx_xmem);
}
if (_tx_xmem) {
free(_tx_xmem);
}
}
private:
bool init_mbuf_pools();
/**
* Allocates a memory chunk to accommodate the given number of buffers of
* the given size and fills a vector with underlying physical pages.
*
* The chunk is going to be used as an external memory buffer of the DPDK
* memory pool (created using rte_mempool_xmem_create()).
*
* The chunk size if calculated using rte_mempool_xmem_size() function.
*
* @param num_bufs Number of buffers (in)
* @param buf_sz Size of each buffer (in)
* @param mappings vector of physical pages (out)
*
* @note this function assumes that "mappings" is properly set and adds the
* mappings to the back of the vector.
*
* @return a virtual address of the allocated memory chunk or nullptr in
* case of a failure.
*/
void* alloc_mempool_xmem(uint16_t num_bufs, uint16_t buf_sz,
std::vector<phys_addr_t>& mappings);
/**
* Polls for a burst of incoming packets. This function will not block and
* will immediately return after processing all available packets.
@@ -250,8 +286,10 @@ private:
rte_mbuf* create_tx_mbuf(packet& p);
private:
dpdk_device* _dev;
bool _huge_pages_mem_backend = false;
uint8_t _qid;
rte_mempool *_pktmbuf_pool_rx, *_pktmbuf_pool_tx;
void *_rx_xmem = nullptr, *_tx_xmem = nullptr;
reactor::poller _rx_poller;
std::vector<rte_mbuf*> _tx_burst;
uint16_t _tx_burst_idx = 0;
@@ -404,30 +442,107 @@ void dpdk_device::init_port_fini()
printf("Created DPDK device\n");
}
void* dpdk_qp::alloc_mempool_xmem(uint16_t num_bufs, uint16_t buf_sz,
std::vector<phys_addr_t>& mappings)
{
using namespace memory;
char* xmem;
size_t xmem_size = rte_mempool_xmem_size(num_bufs, buf_sz, page_bits);
// Aligning to 2M causes the further failure in small allocations.
// TODO: Check why - and fix.
xmem = (char*)memalign(page_size, xmem_size);
if (!xmem) {
printf("Can't allocate %ld bytes aligned to %ld\n",
xmem_size, page_size);
return nullptr;
}
for (size_t i = 0; i < xmem_size / page_size; ++i) {
translation tr = translate(xmem + i * page_size, page_size);
assert(tr.size);
mappings.push_back(tr.addr);
}
return xmem;
}
bool dpdk_qp::init_mbuf_pools()
{
using namespace memory;
sstring name = to_sstring(pktmbuf_pool_name) + to_sstring(_qid);
/* don't pass single-producer/single-consumer flags to mbuf create as it
* seems faster to use a cache instead */
printf("Creating mbuf pools '%s_rx/_tx' [%u and %u mbufs respectively] ...\n",
printf("Creating mbuf pools '%s_rx/_tx' "
"[%u and %u mbufs respectively] ...\n",
name.c_str(), mbufs_per_queue_rx, mbufs_per_queue_tx);
_pktmbuf_pool_rx =
rte_mempool_create((name + to_sstring("_rx")).c_str(),
mbufs_per_queue_rx,
mbuf_size, mbuf_cache_size,
//
// If we have a hugetlbfs memory backend we may perform a virt2phys
// translation and memory is "pinned". Therefore we may provide an external
// memory for DPDK pools and this way significantly reduce the memory needed
// for the DPDK in this case.
//
if (_huge_pages_mem_backend) {
std::vector<phys_addr_t> mappings;
_rx_xmem = alloc_mempool_xmem(mbufs_per_queue_rx, mbuf_size, mappings);
if (!_rx_xmem) {
printf("Can't allocate a memory for Rx buffers\n");
return false;
}
//
// Don't pass single-producer/single-consumer flags to mbuf create as it
// seems faster to use a cache instead.
//
_pktmbuf_pool_rx =
rte_mempool_xmem_create((name + to_sstring("_rx")).c_str(),
mbufs_per_queue_rx, mbuf_size,
mbuf_cache_size,
sizeof(struct rte_pktmbuf_pool_private),
rte_pktmbuf_pool_init, NULL,
rte_pktmbuf_init, NULL,
rte_socket_id(), 0,
_rx_xmem, mappings.data(), mappings.size(),
page_bits);
mappings.clear();
_tx_xmem = alloc_mempool_xmem(mbufs_per_queue_tx, mbuf_size, mappings);
if (!_tx_xmem) {
printf("Can't allocate a memory for Tx buffers\n");
return false;
}
_pktmbuf_pool_tx =
rte_mempool_xmem_create((name + to_sstring("_tx")).c_str(),
mbufs_per_queue_tx, mbuf_size,
mbuf_cache_size,
sizeof(struct rte_pktmbuf_pool_private),
rte_pktmbuf_pool_init, NULL,
rte_pktmbuf_init, NULL,
rte_socket_id(), 0);
_pktmbuf_pool_tx =
rte_socket_id(), 0,
_tx_xmem, mappings.data(), mappings.size(),
page_bits);
} else {
_pktmbuf_pool_rx =
rte_mempool_create((name + to_sstring("_rx")).c_str(),
mbufs_per_queue_rx, mbuf_size,
mbuf_cache_size,
sizeof(struct rte_pktmbuf_pool_private),
rte_pktmbuf_pool_init, NULL,
rte_pktmbuf_init, NULL,
rte_socket_id(), 0);
_pktmbuf_pool_tx =
rte_mempool_create((name + to_sstring("_tx")).c_str(),
mbufs_per_queue_tx,
mbuf_size, mbuf_cache_size,
mbufs_per_queue_tx, mbuf_size,
mbuf_cache_size,
sizeof(struct rte_pktmbuf_pool_private),
rte_pktmbuf_pool_init, NULL,
rte_pktmbuf_init, NULL,
rte_socket_id(), 0);
}
return _pktmbuf_pool_rx != NULL && _pktmbuf_pool_tx != NULL;
}
@@ -466,9 +581,9 @@ void dpdk_device::check_port_link_status()
t->arm_periodic(check_interval);
}
dpdk_qp::dpdk_qp(dpdk_device* dev, uint8_t qid)
: _dev(dev), _qid(qid), _rx_poller([&] { return poll_rx_once(); })
dpdk_qp::dpdk_qp(dpdk_device* dev, uint8_t qid, bool huge_pages_mem_backend)
: _dev(dev), _huge_pages_mem_backend(huge_pages_mem_backend), _qid(qid),
_rx_poller([&] { return poll_rx_once(); })
{
if (!init_mbuf_pools()) {
rte_exit(EXIT_FAILURE, "Cannot initialize mbuf pools\n");
@@ -745,7 +860,7 @@ void dpdk_device::get_rss_table()
#endif
std::unique_ptr<qp> dpdk_device::init_local_queue(boost::program_options::variables_map opts, uint16_t qid) {
auto qp = std::make_unique<dpdk_qp>(this, qid);
auto qp = std::make_unique<dpdk_qp>(this, qid, opts.count("hugepages"));
smp::submit_to(_home_cpu, [this] () mutable {
if (++_queues_ready == _num_queues) {
init_port_fini();