c++ - Cannot reach full line rate on two 10 Gbps interfaces simultaneously


I am developing an application that generates a real-time data stream with a throughput of tens of Gbit/s. I don't need any response, so I use UDP and send the packets to a multicast address. My system (CentOS 7) has two 10 Gbit/s network ports.

The trouble starts when I try to send data through both ports at the same time. I expected something a bit under 20 Gbit/s, but in practice I only get 11-12 Gbit/s. Using a single port I get 9.5 Gbit/s.

I use select() and non-blocking sockets. Here is a runnable demo:

#include <string.h>
#include <sys/socket.h>
#include <netdb.h>
#include <string>
#include <stdio.h>
#include <cerrno>
#include <cstring>
#include <arpa/inet.h>
#include <stdlib.h>
#include <unistd.h>
#include <malloc.h>
#include <ctime>
#include <sys/time.h>
#include <sys/fcntl.h>
#include <sched.h>        // cpu_set_t, CPU_SET, sched_setaffinity
// #include <sys/resource.h>



#define GROUP_PORT 3490
#define GROUP_ADDR "225.0.0.37"
#define INTERFACES 2
#define LOCAL_INTERFACE_IP0 "192.168.2.3"
#define LOCAL_INTERFACE_IP1 "192.168.2.4"



inline long duration_mcs(timespec t1, timespec t2) {
    return ((t2.tv_sec - t1.tv_sec)*1000000 + (t2.tv_nsec - t1.tv_nsec)/1000);
}



int main(int argc, char* argv[])
{

    // if (setpriority(PRIO_PROCESS, 0, -15) == -1) {
    //     printf("PRIO failed: %s.\n", std::strerror(errno));
    //     return -1;
    // }

    // bind thread to specific core
    cpu_set_t set;
    CPU_ZERO(&set);                     // clear cpu set
    int cpuId = 5;
    CPU_SET(cpuId, &set);               // dedicate cpu for current thread (add cpuId to set)
    // bind current thread (pid = 0 means "this thread") to the dedicated cpu
    if (sched_setaffinity(0, sizeof(set), &set) == -1) {
        printf("sched_setaffinity failed: %s.\n", std::strerror(errno));
        return -1;
    }


    // SETUP INTERFACES ADDRESSES ----------------------------------
    // -------------------------------------------------------------
    in_addr localInterface[INTERFACES];
    localInterface[0].s_addr = inet_addr(LOCAL_INTERFACE_IP0);
    localInterface[1].s_addr = inet_addr(LOCAL_INTERFACE_IP1);


    // SETUP SOCKETS -----------------------------------------------
    // -------------------------------------------------------------
    int fdmax = 0;
    int fds[INTERFACES];
    int flags;

    for (int i=0; i<INTERFACES; ++i) {
        fds[i] = socket(AF_INET, SOCK_DGRAM, 0);
        if (fds[i] == -1) {
            printf("Socket %d failed: %s.\n", i, std::strerror(errno));
            return -1;
        }

        // make sockets NONBLOCK
        if ((flags = fcntl(fds[i], F_GETFL, 0)) < 0) {
            printf("F_GETFL on socket %d failed: %s.\n", i, std::strerror(errno));
        }

        if (fcntl(fds[i], F_SETFL, flags | O_NONBLOCK) < 0) {
            printf("O_NONBLOCK on socket %d failed: %s.\n", i, std::strerror(errno));
        }

        if (fds[i] > fdmax) fdmax = fds[i];
        printf("Socket %d success.\n", i);
    }



    // SETUP SOCKET OPTIONS ----------------------------------------
    // -------------------------------------------------------------
    // send packets through a particular interface
    for (int i=0; i<INTERFACES; ++i) {
        if (setsockopt(fds[i], IPPROTO_IP, IP_MULTICAST_IF, (char*) &localInterface[i], sizeof(localInterface[i])) == -1) {
            printf("IP_MULTICAST_IF on interface %s failed: %s.\n", inet_ntoa(localInterface[i]), std::strerror(errno));
            return -1;
        }
    }

    // disable multicast loop
    char loopch = 0;
    for (int i=0; i<INTERFACES; ++i) {
        if (setsockopt(fds[i], IPPROTO_IP, IP_MULTICAST_LOOP, (char*) &loopch, sizeof(loopch)) == -1) {
            printf("IP_MULTICAST_LOOP on interface %s failed: %s.\n", inet_ntoa(localInterface[i]), std::strerror(errno));
            return -1;
        }
    }


    // SETUP ADDRESS STRUCTURE FOR SENDING PACKETS TO --------------
    // -------------------------------------------------------------
    sockaddr_in address;
    memset(&address, 0, sizeof(address));
    address.sin_family = AF_INET;
    address.sin_port = htons(GROUP_PORT);
    address.sin_addr.s_addr = inet_addr(GROUP_ADDR);


    // SETUP DATA BUFFER -------------------------------------------
    // -------------------------------------------------------------
    size_t buf_size = 50000;
    char* buffer = (char*) memalign(256, buf_size);


    // SETUP SELECT() STRUCTURES -----------------------------------
    // -------------------------------------------------------------
    fd_set master, writefds;
    FD_ZERO(&master);
    FD_ZERO(&writefds);
    for (int i=0; i<INTERFACES; ++i) {
        FD_SET(fds[i], &master);
    }


    // SENDING PACKETS ---------------------------------------------
    // -------------------------------------------------------------
    size_t packets = 10000;             // number of packets to send per interface
    size_t nbytes = 0;
    ssize_t snt;

    bool pckt_flag = false;             // flag: all packets are sent

    size_t cnt[INTERFACES];             // counter of sent packets per interface
    for (int ifs=0; ifs<INTERFACES; ++ifs) cnt[ifs] = 0;

    timespec t1, t2;
    timespec t1_sel, t2_sel;
    timespec t1_proc, t2_proc;
    timespec t1_snd, t2_snd;
    long tsum_sel = 0, tsum_proc = 0, tsum_snd = 0;

    clock_gettime(CLOCK_MONOTONIC_RAW, &t1);

    while (!pckt_flag) {
        writefds = master;

        clock_gettime(CLOCK_MONOTONIC_RAW, &t1_sel);
        if (select(fdmax+1, NULL, &writefds, NULL, NULL) == -1) {
            printf("select() failed: %s.\n", std::strerror(errno));
            return -1;
        }
        clock_gettime(CLOCK_MONOTONIC_RAW, &t2_sel);
        tsum_sel += duration_mcs(t1_sel, t2_sel);

        clock_gettime(CLOCK_MONOTONIC_RAW, &t1_proc);
        for (int ifs=0; ifs<INTERFACES; ++ifs) {
            if (FD_ISSET(fds[ifs], &writefds)) {
                // check how many packets were already sent over this interface
                if (cnt[ifs] < packets) {

                    clock_gettime(CLOCK_MONOTONIC_RAW, &t1_snd);
                    snt = sendto(fds[ifs], buffer, buf_size, 0, (sockaddr*) &address, sizeof(address));
                    clock_gettime(CLOCK_MONOTONIC_RAW, &t2_snd);
                    tsum_snd += duration_mcs(t1_snd, t2_snd);

                    if (snt < (ssize_t) buf_size) {
                        printf("Sending error: sent %zd of %zu bytes\n", snt, buf_size);
                    } else {
                        nbytes += snt;
                        ++cnt[ifs];
                    }
                }
            }
        }
        // renew flag
        pckt_flag = true;
        for (int ifs=0; ifs<INTERFACES; ++ifs) {
            pckt_flag = (pckt_flag && (cnt[ifs] == packets));
        }
        clock_gettime(CLOCK_MONOTONIC_RAW, &t2_proc);
        tsum_proc += duration_mcs(t1_proc, t2_proc);
    }
    clock_gettime(CLOCK_MONOTONIC_RAW, &t2);

    size_t traf_tot_bytes = nbytes;
    double duration_sec = (double) duration_mcs(t1, t2)/1000000;

    printf("Time %f s.\n", duration_sec);
    printf("Total bytes sent %zu.\n", traf_tot_bytes);
    printf("Total throughput %f Gbit/s.\n", 8*(traf_tot_bytes/duration_sec)/1000000000);
    printf("Packets sent by interfaces %zu/%zu\n", cnt[0], cnt[1]);
    printf("tsum_sel = %ld\n", tsum_sel);
    printf("tsum_proc = %ld\n", tsum_proc);
    printf("tsum_snd = %ld\n", tsum_snd);


    free(buffer);

    return 0;

}

In this demo I inserted timers that accumulate the total time spent waiting in select() (tsum_sel), processing packets (tsum_proc), and inside sendto() itself (tsum_snd).

Output on my system with INTERFACES = 1:

Socket 0 success.
Time 0.429122 s.
Total bytes sent 500000000.
Total throughput 9.321358 Gbit/s.
Packets sent by interfaces 10000/0
tsum_sel = 51086
tsum_proc = 362756
tsum_snd = 358939

And with INTERFACES = 2:

Socket 0 success.
Socket 1 success.
Time 0.697962 s.
Total bytes sent 1000000000.
Total throughput 11.461942 Gbit/s.
Packets sent by interfaces 10000/10000
tsum_sel = 2383
tsum_proc = 662971
tsum_snd = 652629

I see that almost all the time is consumed by the sendto() function. So it looks as if I send a packet through the first interface, wait for sendto() to return, and only then send through the second interface. To avoid exactly that I made my sockets non-blocking, so I don't understand what is going on.

My questions are:

1) Why doesn't this code send data at 20 Gbit/s?

2) Why does a non-blocking sendto() take so much time?

3) How can I reach 20 Gbit/s here?

Best answer

Well, I got 19 Gbit/s.

I created 2 threads, each sending its data independently. It looks simple, but there are a few subtleties. If I bind the threads to the same virtual core, the problem remains and I get 14-15 Gbit/s. It only works properly when I bind the threads to different virtual cores, even if those cores sit on the same physical core. That is the best I could hope for: I can keep one physical core for system housekeeping and networking. A minimal sketch of this layout is shown below.
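The following sketch illustrates the two-thread layout described above; it is not the answerer's actual code. The send_loop helper, the core numbers (4 and 5), and the fixed packet count are assumptions for illustration, and error handling is omitted for brevity. Each thread owns one socket, directs multicast traffic out of one interface with IP_MULTICAST_IF, pins itself to its own virtual core, and runs a plain blocking sendto() loop (no select() needed once each interface has its own thread).

// Hypothetical two-thread variant: one socket and one virtual core per interface.
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <pthread.h>
#include <sched.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <thread>
#include <vector>

static void send_loop(const char* if_ip, int cpu_id, size_t packets, size_t buf_size)
{
    // pin this thread to its own (virtual) core
    cpu_set_t set;
    CPU_ZERO(&set);
    CPU_SET(cpu_id, &set);
    pthread_setaffinity_np(pthread_self(), sizeof(set), &set);

    int fd = socket(AF_INET, SOCK_DGRAM, 0);

    // route multicast traffic out through this particular interface
    in_addr local;
    local.s_addr = inet_addr(if_ip);
    setsockopt(fd, IPPROTO_IP, IP_MULTICAST_IF, &local, sizeof(local));

    sockaddr_in group;
    memset(&group, 0, sizeof(group));
    group.sin_family = AF_INET;
    group.sin_port = htons(3490);
    group.sin_addr.s_addr = inet_addr("225.0.0.37");

    std::vector<char> buffer(buf_size, 0);

    // plain blocking sendto() loop; the kernel paces this thread independently
    for (size_t i = 0; i < packets; ++i) {
        sendto(fd, buffer.data(), buffer.size(), 0, (sockaddr*) &group, sizeof(group));
    }
    close(fd);
}

int main()
{
    std::thread t0(send_loop, "192.168.2.3", 4, 10000, 50000);
    std::thread t1(send_loop, "192.168.2.4", 5, 10000, 50000);
    t0.join();
    t1.join();
    printf("Both senders finished.\n");
    return 0;
}

Per the answer above, the detail that matters is the affinity: the two send_loop threads must land on different virtual cores, otherwise the 14-15 Gbit/s ceiling reappears. Build with something like g++ -O2 -pthread.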

Thanks to everyone who commented.

Regarding "c++ - Cannot reach full line rate on two 10 Gbps interfaces simultaneously", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/48786450/
