- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我的公司正在研究使用 ZeroMQ 作为传输机制。首先,我对性能进行了基准测试,只是为了了解我在玩什么。
所以我创建了一个应用程序,将 zmq 经销商到经销商的设置与 winsock 进行比较。我测量了从客户端向服务器发送同步消息的往返时间,然后计算平均值。
这里是运行winsock的服务器:
DWORD RunServerWINSOCKTest(DWORD dwPort)
{
WSADATA wsaData;
int iRet = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (iRet != NO_ERROR)
{
printf("WSAStartup failed with error: %d\n", iRet);
return iRet;
}
struct addrinfo hints;
ZeroMemory(&hints, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
hints.ai_flags = AI_PASSIVE;
struct addrinfo *result = NULL;
iRet = getaddrinfo(NULL, std::to_string(dwPort).c_str(), &hints, &result);
if (iRet != 0)
{
WSACleanup();
return iRet;
}
SOCKET ListenSocket = socket(result->ai_family, result->ai_socktype, result->ai_protocol);
if (ListenSocket == INVALID_SOCKET)
{
freeaddrinfo(result);
WSACleanup();
return WSAGetLastError();
}
iRet = bind(ListenSocket, result->ai_addr, (int)result->ai_addrlen);
if (iRet == SOCKET_ERROR)
{
freeaddrinfo(result);
closesocket(ListenSocket);
WSACleanup();
return WSAGetLastError();
}
freeaddrinfo(result);
iRet = listen(ListenSocket, SOMAXCONN);
if (iRet == SOCKET_ERROR)
{
closesocket(ListenSocket);
WSACleanup();
return WSAGetLastError();
}
while (true)
{
SOCKET ClientSocket = accept(ListenSocket, NULL, NULL);
if (ClientSocket == INVALID_SOCKET)
{
closesocket(ListenSocket);
WSACleanup();
return WSAGetLastError();
}
char value = 0;
setsockopt(ClientSocket, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value));
char recvbuf[DEFAULT_BUFLEN];
int recvbuflen = DEFAULT_BUFLEN;
do {
iRet = recv(ClientSocket, recvbuf, recvbuflen, 0);
if (iRet > 0) {
// Echo the buffer back to the sender
int iSendResult = send(ClientSocket, recvbuf, iRet, 0);
if (iSendResult == SOCKET_ERROR)
{
closesocket(ClientSocket);
WSACleanup();
return WSAGetLastError();
}
}
else if (iRet == 0)
printf("Connection closing...\n");
else {
closesocket(ClientSocket);
WSACleanup();
return 1;
}
} while (iRet > 0);
iRet = shutdown(ClientSocket, SD_SEND);
if (iRet == SOCKET_ERROR)
{
closesocket(ClientSocket);
WSACleanup();
return WSAGetLastError();
}
closesocket(ClientSocket);
}
closesocket(ListenSocket);
return WSACleanup();
}
这是运行winsock的客户端:
DWORD RunClientWINSOCKTest(std::string strAddress, DWORD dwPort, DWORD dwMessageSize)
{
WSADATA wsaData;
int iRet = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (iRet != NO_ERROR)
{
return iRet;
}
SOCKET ConnectSocket = INVALID_SOCKET;
struct addrinfo *result = NULL, *ptr = NULL, hints;
ZeroMemory(&hints, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
int iResult = getaddrinfo(strAddress.c_str(), std::to_string(dwPort).c_str(), &hints, &result);
if (iResult != 0) {
WSACleanup();
return 1;
}
for (ptr = result; ptr != NULL; ptr = ptr->ai_next) {
ConnectSocket = socket(ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol);
if (ConnectSocket == INVALID_SOCKET) {
WSACleanup();
return 1;
}
iResult = connect(ConnectSocket, ptr->ai_addr, (int)ptr->ai_addrlen);
if (iResult == SOCKET_ERROR) {
closesocket(ConnectSocket);
ConnectSocket = INVALID_SOCKET;
continue;
}
break;
}
freeaddrinfo(result);
if (ConnectSocket == INVALID_SOCKET) {
WSACleanup();
return 1;
}
// Statistics
UINT64 uint64BytesTransmitted = 0;
UINT64 uint64StartTime = s_TimeStampGenerator.GetHighResolutionTimeStamp();
UINT64 uint64WaitForResponse = 0;
DWORD dwMessageCount = 1000000;
CHAR cRecvMsg[DEFAULT_BUFLEN];
SecureZeroMemory(&cRecvMsg, DEFAULT_BUFLEN);
std::string strSendMsg(dwMessageSize, 'X');
for (DWORD dwI = 0; dwI < dwMessageCount; dwI++)
{
int iRet = send(ConnectSocket, strSendMsg.data(), strSendMsg.size(), 0);
if (iRet == SOCKET_ERROR) {
closesocket(ConnectSocket);
WSACleanup();
return 1;
}
uint64BytesTransmitted += strSendMsg.size();
UINT64 uint64BeforeRespone = s_TimeStampGenerator.GetHighResolutionTimeStamp();
iRet = recv(ConnectSocket, cRecvMsg, DEFAULT_BUFLEN, 0);
if (iRet < 1)
{
closesocket(ConnectSocket);
WSACleanup();
return 1;
}
std::string strMessage(cRecvMsg);
if (strMessage.compare(strSendMsg) == 0)
{
uint64WaitForResponse += (s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64BeforeRespone);
}
else
{
return NO_ERROR;
}
}
UINT64 uint64ElapsedTime = s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64StartTime;
PrintResult(uint64ElapsedTime, uint64WaitForResponse, dwMessageCount, uint64BytesTransmitted, dwMessageSize);
iResult = shutdown(ConnectSocket, SD_SEND);
if (iResult == SOCKET_ERROR) {
closesocket(ConnectSocket);
WSACleanup();
return 1;
}
closesocket(ConnectSocket);
return WSACleanup();
}
这里是运行ZMQ的服务器(dealer)
DWORD RunServerZMQTest(DWORD dwPort)
{
try
{
zmq::context_t context(1);
zmq::socket_t server(context, ZMQ_DEALER);
// Set options here
std::string strIdentity = s_set_id(server);
printf("Created server connection with ID: %s\n", strIdentity.c_str());
std::string strConnect = "tcp://*:" + std::to_string(dwPort);
server.bind(strConnect.c_str());
bool bRunning = true;
while (bRunning)
{
std::string strMessage = s_recv(server);
if (!s_send(server, strMessage))
{
return NO_ERROR;
}
}
}
catch (zmq::error_t& e)
{
return (DWORD)e.num();
}
return NO_ERROR;
这里是运行ZMQ的客户端(dealer)
DWORD RunClientZMQTest(std::string strAddress, DWORD dwPort, DWORD dwMessageSize)
{
try
{
zmq::context_t ctx(1);
zmq::socket_t client(ctx, ZMQ_DEALER); // ZMQ_REQ
// Set options here
std::string strIdentity = s_set_id(client);
std::string strConnect = "tcp://" + strAddress + ":" + std::to_string(dwPort);
client.connect(strConnect.c_str());
if(s_send(client, "INIT"))
{
std::string strMessage = s_recv(client);
if (strMessage.compare("INIT") == 0)
{
printf("Client[%s] connected to: %s\n", strIdentity.c_str(), strConnect.c_str());
}
else
{
return NO_ERROR;
}
}
else
{
return NO_ERROR;
}
// Statistics
UINT64 uint64BytesTransmitted = 0;
UINT64 uint64StartTime = s_TimeStampGenerator.GetHighResolutionTimeStamp();
UINT64 uint64WaitForResponse = 0;
DWORD dwMessageCount = 10000000;
std::string strSendMsg(dwMessageSize, 'X');
for (DWORD dwI = 0; dwI < dwMessageCount; dwI++)
{
if (s_send(client, strSendMsg))
{
uint64BytesTransmitted += strSendMsg.size();
UINT64 uint64BeforeRespone = s_TimeStampGenerator.GetHighResolutionTimeStamp();
std::string strRecvMsg = s_recv(client);
if (strRecvMsg.compare(strSendMsg) == 0)
{
uint64WaitForResponse += (s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64BeforeRespone);
}
else
{
return NO_ERROR;
}
}
else
{
return NO_ERROR;
}
}
UINT64 uint64ElapsedTime = s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64StartTime;
PrintResult(uint64ElapsedTime, uint64WaitForResponse, dwMessageCount, uint64BytesTransmitted, dwMessageSize);
}
catch (zmq::error_t& e)
{
return (DWORD)e.num();
}
return NO_ERROR;
}
我在本地运行基准测试,消息大小为 5 个字节,我得到以下结果:
温索克
Messages sent: 1 000 000
Time elapsed (us): 48 019 415
Time elapsed (s): 48.019 415
Message size (bytes): 5
Msg/s: 20 825
Bytes/s: 104 125
Mb/s: 0.099
Total response time (us): 24 537 376
Average repsonse time (us): 24.0
和
ZeroMQ
Messages sent: 1 000 000
Time elapsed (us): 158 290 708
Time elapsed (s): 158.290 708
Message size (bytes): 5
Msg/s: 6 317
Bytes/s: 31 587
Mb/s: 0.030
Total response time (us): 125 524 178
Average response time (us): 125.0
谁能解释为什么使用 ZMQ 时平均响应时间要长得多?
我的目标是找到一个我可以异步发送和接收消息而无需回复的设置。如果这可以通过与经销商经销商不同的设置来实现,请告诉我!
最佳答案
这只是对您问题的一小部分的回答,但这里是 -
为什么需要经销商/经销商?我假设是因为通信可以从任一点开始?您没有绑定(bind)到经销商/经销商,特别是它限制您只有两个端点,如果您在通信的任何一侧添加另一个端点,比如第二个客户端,那么每个客户端都会只收到一半的消息,因为经销商是严格循环的。
异步通信所需的是经销商和/或路由器套接字的某种组合。两者都不需要响应,主要区别在于它们如何选择向哪个连接的对等方发送消息:
这两种套接字类型可以一起工作,因为经销商套接字(和请求套接字,经销商是“请求类型”套接字)将它们的“名称”作为消息的一部分发送,路由器套接字可以使用它来发回数据。这是一个请求/回复范例,您会在 the guide 中的所有示例中看到这种范例。 ,但您可以将该范例用于您正在寻找的内容,特别是经销商和路由器都不需要回复。
在不知道您的全部要求的情况下,我无法告诉您我会选择哪种 ZMQ 架构,但总的来说,我更喜欢路由器套接字的可扩展性,它更容易处理适当的寻址,而不是将所有东西硬塞进一个对等点...你会看到反对做路由器/路由器的警告,我同意他们在尝试之前应该了解你在做什么的程度,但了解你在做什么,实现不是那样很难。
如果满足您的要求,您还可以选择为每一端设置一个 pub 套接字,如果没有任何回复ever,则每个端都有一个子套接字。如果它是严格意义上的从源到目标的数据馈送,并且任何对等方都不需要对其发送的内容有任何反馈,那么这可能是最佳选择,即使这意味着您在每端处理两个套接字而不是一个套接字。
这些都没有直接解决性能问题,但重要的是要了解 zmq 套接字针对特定用例进行了优化,正如 John Jefferies 的回答中指出的那样,您正在打破经销商套接字的用例测试中的消息传递严格同步。首先要确定您的 ZMQ 架构,然后模拟实际的消息流,特别是不添加任意等待和同步性,这将必然改变方式吞吐量看起来就像您正在测试的那样,几乎符合定义。
关于c++ - ZeroMQ dealer--to-dealer 与 winsock 相比延迟高,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26235533/
#include using namespace std; class C{ private: int value; public: C(){ value = 0;
这个问题已经有答案了: What is the difference between char a[] = ?string?; and char *p = ?string?;? (8 个回答) 已关闭
关闭。此题需要details or clarity 。目前不接受答案。 想要改进这个问题吗?通过 editing this post 添加详细信息并澄清问题. 已关闭 7 年前。 此帖子已于 8 个月
除了调试之外,是否有任何针对 c、c++ 或 c# 的测试工具,其工作原理类似于将独立函数复制粘贴到某个文本框,然后在其他文本框中输入参数? 最佳答案 也许您会考虑单元测试。我推荐你谷歌测试和谷歌模拟
我想在第二台显示器中移动一个窗口 (HWND)。问题是我尝试了很多方法,例如将分辨率加倍或输入负值,但它永远无法将窗口放在我的第二台显示器上。 关于如何在 C/C++/c# 中执行此操作的任何线索 最
我正在寻找 C/C++/C## 中不同类型 DES 的现有实现。我的运行平台是Windows XP/Vista/7。 我正在尝试编写一个 C# 程序,它将使用 DES 算法进行加密和解密。我需要一些实
很难说出这里要问什么。这个问题模棱两可、含糊不清、不完整、过于宽泛或夸夸其谈,无法以目前的形式得到合理的回答。如需帮助澄清此问题以便重新打开,visit the help center . 关闭 1
有没有办法强制将另一个 窗口置于顶部? 不是应用程序的窗口,而是另一个已经在系统上运行的窗口。 (Windows, C/C++/C#) 最佳答案 SetWindowPos(that_window_ha
假设您可以在 C/C++ 或 Csharp 之间做出选择,并且您打算在 Windows 和 Linux 服务器上运行同一服务器的多个实例,那么构建套接字服务器应用程序的最明智选择是什么? 最佳答案 如
你们能告诉我它们之间的区别吗? 顺便问一下,有什么叫C++库或C库的吗? 最佳答案 C++ 标准库 和 C 标准库 是 C++ 和 C 标准定义的库,提供给 C++ 和 C 程序使用。那是那些词的共同
下面的测试代码,我将输出信息放在注释中。我使用的是 gcc 4.8.5 和 Centos 7.2。 #include #include class C { public:
很难说出这里问的是什么。这个问题是含糊的、模糊的、不完整的、过于宽泛的或修辞性的,无法以目前的形式得到合理的回答。如需帮助澄清此问题以便重新打开它,visit the help center 。 已关
我的客户将使用名为 annoucement 的结构/类与客户通信。我想我会用 C++ 编写服务器。会有很多不同的类继承annoucement。我的问题是通过网络将这些类发送给客户端 我想也许我应该使用
我在 C# 中有以下函数: public Matrix ConcatDescriptors(IList> descriptors) { int cols = descriptors[0].Co
我有一个项目要编写一个函数来对某些数据执行某些操作。我可以用 C/C++ 编写代码,但我不想与雇主共享该函数的代码。相反,我只想让他有权在他自己的代码中调用该函数。是否可以?我想到了这两种方法 - 在
我使用的是编写糟糕的第 3 方 (C/C++) Api。我从托管代码(C++/CLI)中使用它。有时会出现“访问冲突错误”。这使整个应用程序崩溃。我知道我无法处理这些错误[如果指针访问非法内存位置等,
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 我们不允许提问寻求书籍、工具、软件库等的推荐。您可以编辑问题,以便用事实和引用来回答。 关闭 7 年前。
已关闭。此问题不符合Stack Overflow guidelines 。目前不接受答案。 要求我们推荐或查找工具、库或最喜欢的场外资源的问题对于 Stack Overflow 来说是偏离主题的,因为
我有一些 C 代码,将使用 P/Invoke 从 C# 调用。我正在尝试为这个 C 函数定义一个 C# 等效项。 SomeData* DoSomething(); struct SomeData {
这个问题已经有答案了: Why are these constructs using pre and post-increment undefined behavior? (14 个回答) 已关闭 6
我是一名优秀的程序员,十分优秀!