- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我正在开发生成吞吐量为数十 Gbit/s 的实时数据流的应用程序。我不需要任何响应,所以我使用 UDP。我将数据包发送到多播地址。我的系统 (Centos 7) 有 2 个 10 Gbit/s 的网络端口。
当我尝试同时通过两个端口发送数据时遇到了麻烦。我预计会低于 20 Gbit/s,但实际上我得到了 11-12 Gbit。如果仅使用 1 个端口,我将获得 9.5 Gbit/s 的速度。
我使用 select() 和非阻塞套接字。这是可执行演示:
#include <string.h>
#include <sys/socket.h>
#include <netdb.h>
#include <string>
#include <stdio.h>
#include <cerrno>
#include <cstring>
#include <arpa/inet.h>
#include <stdlib.h>
#include <unistd.h>
#include <malloc.h>
#include <ctime>
#include <sys/time.h>
#include <sys/fcntl.h>
// #include <sys/resource.h>
#define GROUP_PORT 3490
#define GROUP_ADDR "225.0.0.37"
#define INTERFACES 2
#define LOCAL_INTERFACE_IP0 "192.168.2.3"
#define LOCAL_INTERFACE_IP1 "192.168.2.4"
inline long duration_mcs(timespec t1, timespec t2) {
return ((t2.tv_sec - t1.tv_sec)*1000000+(t2.tv_nsec - t1.tv_nsec)/1000);
}
int main(int argc, char* argv[])
{
// if (setpriority(PRIO_PROCESS, 0, -15) == -1) {
// printf("PRIO failed: %s.\n", std::strerror(errno));
// return -1;
// }
//bind thread to specific core
cpu_set_t set;
CPU_ZERO(&set); //clear cpu set
int cpuId = 5;
CPU_SET(cpuId, &set); //dedicate cpu for current thread (add cpuId to set)
//bind current thread (pId=0) to dedicated cpu
if (sched_setaffinity(0, sizeof(set), &set) == -1) {
printf("sched_setaffinity failed: %s.\n", std::strerror(errno));
return -1;
}
// SETUP INTERFACES ADDRESSES ----------------------------------
// -------------------------------------------------------------
in_addr localInterface[INTERFACES];
localInterface[0].s_addr = inet_addr(LOCAL_INTERFACE_IP0);
localInterface[1].s_addr = inet_addr(LOCAL_INTERFACE_IP1);
// SETUP SOCKETS -----------------------------------------------
// -------------------------------------------------------------
int fdmax = 0;
int fds[INTERFACES];
int flags;
for (int i=0; i<INTERFACES; ++i) {
fds[i] = socket(AF_INET, SOCK_DGRAM, 0);
if (fds[i] == -1) {
printf("Socket %d failed: %s.\n", i, std::strerror(errno));
return -1;
}
//make sockets NONBLOCK
if ((flags = fcntl(fds[i], F_GETFL, 0)) < 0)
{
printf("F_GETFL on socket %d failed: %s.\n", i, std::strerror(errno));
}
if (fcntl(fds[i], F_SETFL, flags | O_NONBLOCK) < 0)
{
printf("O_NONBLOCK on socket %d failed: %s.\n", i, std::strerror(errno));
}
if (fds[i] > fdmax) fdmax = fds[i];
printf("Socket %d success.\n", i);
}
// SETUP SOCKET OPTIONS ----------------------------------------
// -------------------------------------------------------------
// send packets through particular interface
for (int i=0; i<INTERFACES; ++i) {
if (setsockopt(fds[i], IPPROTO_IP, IP_MULTICAST_IF, (char*) &localInterface[i], sizeof(localInterface[i])) == -1) {
printf("IP_MULTICAST_IF on interface %s failed: %s.\n", inet_ntoa(localInterface[i]), std::strerror(errno));
return -1;
}
}
// disable multicast loop
char loopch=0;
for (int i=0; i<INTERFACES; ++i) {
if (setsockopt(fds[i], IPPROTO_IP, IP_MULTICAST_LOOP, (char*) &loopch, sizeof(loopch)) == -1) {
printf("IP_MULTICAST_LOOP on interface %s failed: %s.\n", inet_ntoa(localInterface[i]), std::strerror(errno));
return -1;
}
}
// SETUP ADDRESS STRUCTURE FOR SENDING PACKETS TO --------------
// -------------------------------------------------------------
sockaddr_in address;
address.sin_family = AF_INET;
address.sin_port = htons(GROUP_PORT);
address.sin_addr.s_addr = inet_addr(GROUP_ADDR);
// SETUP DATA BUFFER -------------------------------------------
// -------------------------------------------------------------
size_t buf_size = 50000;
char* buffer = (char*) memalign(256, buf_size);
// SETUP SELECT() STRUCTURES -----------------------------------
// -------------------------------------------------------------
fd_set master, writefds;
FD_ZERO(&master);
FD_ZERO(&writefds);
for (int i=0; i<INTERFACES; ++i) {
FD_SET(fds[i], &master);
}
// SENDING PACKETS ---------------------------------------------
// -------------------------------------------------------------
size_t packets = 10000; //number of packets to send
size_t nbytes = 0;
int snt;
bool pckt_flag = false; //flag for all packets are sent
size_t cnt[INTERFACES]; //counter for sent packets per each interface
for (int ifs=0; ifs<INTERFACES; ++ifs) cnt[ifs] = 0;
timespec t1, t2;
timespec t1_sel, t2_sel;
timespec t1_proc, t2_proc;
timespec t1_snd, t2_snd;
long tsum_sel = 0, tsum_proc = 0, tsum_snd = 0;
clock_gettime(CLOCK_MONOTONIC_RAW, &t1);
while (!pckt_flag) {
writefds = master;
clock_gettime(CLOCK_MONOTONIC_RAW, &t1_sel);
if (select(fdmax+1, NULL, &writefds, NULL, NULL) == -1) {
printf("select() failed: %s.\n", std::strerror(errno));
return -1;
}
clock_gettime(CLOCK_MONOTONIC_RAW, &t2_sel);
tsum_sel += duration_mcs(t1_sel, t2_sel);
clock_gettime(CLOCK_MONOTONIC_RAW, &t1_proc);
for (int ifs=0; ifs<INTERFACES; ++ifs) {
if (FD_ISSET(fds[ifs], &writefds)) {
//check for how many packets were sent over the interface
if (cnt[ifs] < packets) {
clock_gettime(CLOCK_MONOTONIC_RAW, &t1_snd);
snt = sendto(fds[ifs], buffer, buf_size, 0, (sockaddr*) &address, sizeof(address));
clock_gettime(CLOCK_MONOTONIC_RAW, &t2_snd);
tsum_snd += duration_mcs(t1_snd, t2_snd);
if (snt < buf_size) {
printf("Sending error: sent %d of %d bytes\n", snt, buf_size);
} else {
nbytes += snt;
++cnt[ifs];
}
}
}
}
//renew flag
pckt_flag = true;
for (int ifs=0; ifs<INTERFACES; ++ifs) {
pckt_flag = (pckt_flag && (cnt[ifs] == packets));
}
clock_gettime(CLOCK_MONOTONIC_RAW, &t2_proc);
tsum_proc += duration_mcs(t1_proc, t2_proc);
}
clock_gettime(CLOCK_MONOTONIC_RAW, &t2);
size_t traf_tot_bytes = nbytes;
double duration_sec = (double) duration_mcs(t1, t2)/1000000;
printf("Time %f s.\n", duration_sec);
printf("Total bytes sent %d.\n", traf_tot_bytes);
printf("Total throughput %f Gbit/s.\n", 8*(traf_tot_bytes/duration_sec)/1000000000);
printf("Packets sent by interfaces %d/%d\n", cnt[0], cnt[1]);
printf("tsum_sel = %d\n", tsum_sel);
printf("tsum_proc = %d\n", tsum_proc);
printf("tsum_snd = %d\n", tsum_snd);
free(buffer);
return 0;
}
在这个演示中,我插入了计时器,用于记录等待 select() (tsum_sel)、数据包处理 (tsum_proc) 和 sendto() 本身发送的总时间( tsum_snd)。
在我的系统上使用 INTERFACE = 1 输出:
Socket 0 success.
Time 0.429122 s.
Total bytes sent 500000000.
Total throughput 9.321358 Gbit/s.
Packets sent by interfaces 10000/0
tsum_sel = 51086
tsum_proc = 362756
tsum_snd = 358939
对于 INTERFACE = 2:
Socket 0 success.
Socket 1 success.
Time 0.697962 s.
Total bytes sent 1000000000.
Total throughput 11.461942 Gbit/s.
Packets sent by interfaces 10000/10000
tsum_sel = 2383
tsum_proc = 662971
tsum_snd = 652629
我看到几乎所有时间都被 sendto() 函数消耗了。所以看起来我通过第一个接口(interface)发送数据包,等待 sendto 返回,然后发送到第二个接口(interface)。为了避免这种情况,我让我的套接字成为非阻塞的。我不明白这是怎么回事。
我的问题是:
1) 为什么这段代码不以 20 Gbit/s 的速率发送数据?
2) 为什么非阻塞 sendto() 需要那么多时间?
3) 如何在这里获得 20 Gbit/s?
最佳答案
嗯,我有 19 Gbit/s。
我创建了 2 个线程 - 每个线程独立发送数据。它看起来很简单,但几乎没有阴影。如果我将线程绑定(bind)到同一个虚拟核心 - 问题仍然存在 - 它会提供 14-15 Gbit/s。只有当我将线程绑定(bind)到不同的虚拟核心时它才能正常工作。甚至这些核心都在同一个物理核心上。这是我所能希望的最好的。我可以使用 1 个物理核心进行系统维护和联网。
感谢所有评论的人。
关于c++ - 无法同时在两个 10Gbps 接口(interface)上达到全线速,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48786450/
我通过 spring ioc 编写了一些 Rest 应用程序。但我无法解决这个问题。这是我的异常(exception): org.springframework.beans.factory.BeanC
我对 TestNG、Spring 框架等完全陌生,我正在尝试使用注释 @Value通过 @Configuration 访问配置文件注释。 我在这里想要实现的目标是让控制台从配置文件中写出“hi”,通过
为此工作了几个小时。我完全被难住了。 这是 CS113 的实验室。 如果用户在程序(二进制计算器)结束时选择继续,我们需要使用 goto 语句来到达程序的顶部。 但是,我们还需要释放所有分配的内存。
我正在尝试使用 ffmpeg 库构建一个小的 C 程序。但是我什至无法使用 avformat_open_input() 打开音频文件设置检查错误代码的函数后,我得到以下输出: Error code:
使用 Spring Initializer 创建一个简单的 Spring boot。我只在可用选项下选择 DevTools。 创建项目后,无需对其进行任何更改,即可正常运行程序。 现在,当我尝试在项目
所以我只是在 Mac OS X 中通过 brew 安装了 qt。但是它无法链接它。当我尝试运行 brew link qt 或 brew link --overwrite qt 我得到以下信息: ton
我在提交和 pull 时遇到了问题:在提交的 IDE 中,我看到: warning not all local changes may be shown due to an error: unable
我跑 man gcc | grep "-L" 我明白了 Usage: grep [OPTION]... PATTERN [FILE]... Try `grep --help' for more inf
我有一段代码,旨在接收任何 URL 并将其从网络上撕下来。到目前为止,它运行良好,直到有人给了它这个 URL: http://www.aspensurgical.com/static/images/a
在过去的 5 个小时里,我一直在尝试在我的服务器上设置 WireGuard,但在完成所有设置后,我无法 ping IP 或解析域。 下面是服务器配置 [Interface] Address = 10.
我正在尝试在 GitLab 中 fork 我的一个私有(private)项目,但是当我按下 fork 按钮时,我会收到以下信息: No available namespaces to fork the
我这里遇到了一些问题。我是 node.js 和 Rest API 的新手,但我正在尝试自学。我制作了 REST API,使用 MongoDB 与我的数据库进行通信,我使用 Postman 来测试我的路
下面的代码在控制台中给出以下消息: Uncaught DOMException: Failed to execute 'appendChild' on 'Node': The new child el
我正在尝试调用一个新端点来显示数据,我意识到在上一组有效的数据中,它在数据周围用一对额外的“[]”括号进行控制台,我认为这就是问题是,而新端点不会以我使用数据的方式产生它! 这是 NgFor 失败的原
我正在尝试将我的 Symfony2 应用程序部署到我的 Azure Web 应用程序,但遇到了一些麻烦。 推送到远程时,我在终端中收到以下消息 remote: Updating branch 'mas
Minikube已启动并正在运行,没有任何错误,但是我无法 curl IP。我在这里遵循:https://docs.traefik.io/user-guide/kubernetes/,似乎没有提到关闭
每当我尝试docker组成任何项目时,都会出现以下错误。 我尝试过有和没有sudo 我在这台机器上只有这个问题。我可以在Mac和Amazon WorkSpace上运行相同的容器。 (myslabs)
我正在尝试 pip install stanza 并收到此消息: ERROR: No matching distribution found for torch>=1.3.0 (from stanza
DNS 解析看起来不错,但我无法 ping 我的服务。可能是什么原因? 来自集群中的另一个 Pod: $ ping backend PING backend.default.svc.cluster.l
我正在使用Hibernate 4 + Spring MVC 4当我开始 Apache Tomcat Server 8我收到此错误: Error creating bean with name 'wel
我是一名优秀的程序员,十分优秀!