gpt4 book ai didi

c++ - ZeroMQ不同速度的订户看到相同的消息

转载 作者:行者123 更新时间:2023-11-30 05:02:07 25 4
gpt4 key购买 nike

我在c++中使用zmq 2.2(我知道较旧的版本)来创建具有多个连接的订阅服务器的发布服务器,这些订阅服务器以不同的速度读取消息。根据我对文档的理解以及Peter Hintjens的答案here,每个订阅者都有自己的队列,而发布者每个连接的订阅者都有一个队列。这似乎表明每个订户都独立于其他订户从发布者接收消息。

但是,在快速订阅者和慢速订阅者下面的代码段中,它们会收到相似的消息或完全相同的消息(即使我增加了在 A 点的睡眠时间并更改了 B 点的 ZMQ_HWM ,也会发生这种情况)。

有人可以阐明为什么会这样吗?

#include <zmq.hpp>
#include <unistd.h>
#include <iostream>
#include <vector>
#include <future>
using socket_t = zmq::socket_t;
using context_t = zmq::context_t;
using msg_t = zmq::message_t;
using namespace std;

vector<int> slow_consumer(int64_t hwm, int to_read)
{
vector<int> v;
context_t context{1};
socket_t socket(context, ZMQ_SUB);
socket.setsockopt(ZMQ_HWM, &hwm, sizeof(hwm));
socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
socket.connect("tcp://localhost:5554");
msg_t msg;
sleep(3); // 3 seconds
for (int i = 0; i < to_read; i++)
{
socket.recv(&msg);
usleep(10000); // 10 miliseconds ___________________________POINT A
v.emplace_back(*reinterpret_cast<int*>(msg.data()));
}
return v;
}
vector<int> fast_consumer(int64_t hwm, int to_read)
{
vector<int> v;
context_t context{1};
socket_t socket(context, ZMQ_SUB);
socket.setsockopt(ZMQ_HWM, &hwm, sizeof(hwm));
socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
socket.connect("tcp://localhost:5554");
msg_t msg;
for (int i = 0; i < to_read; i++)
{
socket.recv(&msg);
v.emplace_back(*reinterpret_cast<int*>(msg.data()));
}
return v;
}
void publisher(int64_t hwm)
{
context_t context{1};
socket_t socket(context, ZMQ_PUB);
socket.setsockopt(ZMQ_HWM, &hwm, sizeof(hwm));
socket.bind("tcp://*:5554");
int count = 0;
while (true) {
msg_t msg(sizeof(count));
memcpy(msg.data(), &count, sizeof(count));
socket.send(msg);
count++;
}
}

int main()
{
int64_t hwm = 1; // __________________________________________POINT B
int to_read = 20;
auto fast = async(launch::async, fast_consumer, hwm, to_read);
auto slow = async(launch::async, slow_consumer, hwm, to_read);
hwm = 1; // Don't queue anything on the publisher
thread pub(publisher, hwm);
auto slow_v = slow.get();
auto fast_v = fast.get();

cout << "fast slow" << endl;
for (int i = 0; i < fast_v.size(); i ++)
{
cout << fast_v[i] << " " << slow_v[i] << endl;
}
exit(0);
}

编译: g++ -o mixed mixed_speed_consumers.cpp -g -lzmq -lpthread通过
GCC 6.3

样本输出:
fast    slow
25988 305855
52522 454312
79197 477807
106365 502594
132793 528551
159236 554519
184486 581419
209208 606411
234483 629298
256122 651159
281188 675031
305855 701533 // Messages on the fast subscriber starting here line up with messages on the slow subscriber
454312 727817
477807 754154
502594 778654
528551 804137
554519 830677
581419 854959
606411 878841
629298 902601

最佳答案

each subscriber has its own queue



是的,它确实 ...

这来自 PUB -side .Context() -instance的设计属性,在该属性中进行发送队列管理(稍后会对此进行更多介绍)。

您可以在[ ZeroMQ hierarchy in less than a five seconds ]部分中简短阅读有关主要概念性技巧的内容。

This would seem to indicate that each subscriber receives messages from the publisher independent of other subscribers.



是的,它确实 ...

各个“专用”队列之间没有交互。此处重要的是 ZMQ_HWM ,其作用是“阻止程序”语义的副作用。

在此设置中,简约的 ZMQ_HWM 可以保护/阻止任何新条目插入 PUB -侧“私有(private)”-发送队列(大小不超过 ZMQ_HWM == 1 的深度),直到成功进行远程操作-清空(由“远程” SUB -侧 Context() -s自主异步的“内部”传输相关主动性,取决于是否可能(重新)加载该 SUB -侧“私有(private)”-接收队列(大小,再者,根据 ZMQ_HWM == 1 而言,没有更深的意思

换句话说, PUB.send() -s的有效载荷将被有效地丢弃,直到远程 *_SUB.recv() -s从其“远程”- Context() -instance的接收队列中卸载“阻塞”有效载荷(大小,为根据 ZMQ_HWM == 1 的规定,最多只能存储一个有效载荷。

以这种方式, PUB.send() -er 在( secret 阻止)测试期间接收了大约 ~ 902601 ()上的上的()发射的不仅仅是20消息

在调用SUB -method时,所有这些 == to_read 消息都只是在 902581+ -旁边被 PUB 扔掉了。

它实际上如何在内部工作? Context()内部的简化 View

给定上面的模拟示例,随着.send() -ed对等节点的出现和消失, Context() -管理的队列的增长/收缩,但是在ZeroMQ API v2.2中同时具有TX和RX端。相同的高水位线天花板。如所记录的,尝试对超出此限制的任何内容进行Context()的尝试将被丢弃。
TIME                   _____________________________
v [ ]
v [ ]
v [ ]
v [ ]
v PUB.setsockopt( ZMQ_HWM, 1 );]
v PUB.send()-s [ | ]
v : [ +-----------------QUEUE-length ( a storage depth ) is but one single message
v _________________ : [
v [ ] : [Context()-managed pool-of-QUEUE(s)
v [ ] : [
v [ ] : [ ___________________
v [ ] : [ [ ]
v FAST_SUB.connect()---:------------>[?] [ ]
v FAST_SUB.recv()-s : [?] [ ]
v : : [?] [ ]
v : : [?][?]<---SLOW_SUB.connect() ]
v : : [?][?] SLOW_SUB.recv()-s ]
v : .send(1)----->[1][1] :
| 1 <-.recv()--------------------[?][1] :
| : [?][1] :
| : .send(2)----->[2][1] :
| 2 <-.recv()--------------------[?][1] :
| : [?][1] :
| : .send(3)----->[3][1] :
| 3 <-.recv()--------------------[?][?]------------.recv()-> 1
| : [?][?] :
| : .send(4)----->[4][4] :
| 4 <-.recv()--------------------[?][4] :
| : [?][4] :
| : .send(5)----->[5][4] :
| 5 <-.recv()--------------------[?][4] :
| : [?][4] :
| : .send(6)----->[6][4] :
| 6 <-.recv()--------------------[?][4] :
| : [?][4] :
| : .send(7)----->[7][4] :
| 7 <-.recv()--------------------[?][4] :
| : [?][4] :
| : .send(8)----->[8][4] :
| 8 <-.recv()--------------------[?][4] :
| : [?][4] :
| : .send(9)----->[9][4] :
| 9 <-.recv()--------------------[?][?]------------.recv()-> 4
| : [?][?] :
| : .send(A)----->[A][A] :
| A <-.recv()--------------------[?][A]
| : [?][A]
| : .send(B)----->[B][A]
| B <-.recv()--------------------[?][A]
v : [ [
v : [
v :
v

"Messages on the fast subscriber starting here line up with messages on the slow subscriber"



不,这不会发生。没有“排队”,而是持续时间的巧合,其中fast-.connect()尚未使它成为20x .send() -s,而在阻塞SUB之后,缓慢的(-ed)-.recv()最终得到了。

最初的“差距”只是SUB阶段的影响,其中较慢的 sleep( 3 ) 不会尝试接收任何内容
main(){
|
| async(launch::async,fast|_fast____________|
| async(launch::async,slow| .setsockopt |_slow____________|
| ... | .setsockopt | .setsockopt |
| ... | .connect | .setsockopt |
| thread | ~~~~~~? | .connect |
| |_pub___________________| ~~~~~~? | ~~~~~~? |
| | .setsockopt | ~~~~~~? | ~~~~~~? |
| | .bind | ~~~~~~? | ~~~~~~? |
| | ~~~~~~? | ~~~~~~? | ~~~~~~? |
| | ~~~~~~=RTO | ~~~~~~? | ~~~~~~? |
| | .send()-s 1,2,..99| ~~~~~~? | ~~~~~~? |
| | .send()-s 23456,..| ~~~~~~=RTO | ~~~~~~=RTO |
| | .send()-s 25988,..| 25988 --> v[ 0]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 52522,..| 52522 --> v[ 1]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 79197,..| 79197 --> v[ 2]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 106365,..| 106365 --> v[ 3]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 132793,..| 132793 --> v[ 4]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 159236,..| 159236 --> v[ 5]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 184486,..| 184486 --> v[ 6]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 209208,..| 209208 --> v[ 7]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 234483,..| 234483 --> v[ 8]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 256122,..| 256122 --> v[ 9]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 281188,..| 281188 --> v[10]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 305855,..| 305855 --> v[11]| 305855 --> v[ 0]|// Messages on the fast subscriber starting here line up with messages on the slow subscriber
| | .send()-s 454312,..| 454312 --> v[12]| 454312 --> v[ 1]|
| | .send()-s 477807,..| 477807 --> v[13]| 477807 --> v[ 2]|
| | .send()-s 502594,..| 502594 --> v[14]| 502594 --> v[ 3]|
| | .send()-s 528551,..| 528551 --> v[15]| 528551 --> v[ 4]|
| | .send()-s 554519,..| 554519 --> v[16]| 554519 --> v[ 5]|
| | .send()-s 581419,..| 581419 --> v[17]| 581419 --> v[ 6]|
| | .send()-s 606411,..| 606411 --> v[18]| 606411 --> v[ 7]|
| | .send()-s 629298,..| 629298 --> v[19]| 629298 --> v[ 8]|
| | .send()-s 651159,..| | 651159 --> v[ 9]|
| | .send()-s 675031,..| return v | 675031 --> v[10]|
| | .send()-s 701533,..|_________________| 701533 --> v[11]|
| | .send()-s 727817,..| | 727817 --> v[12]|
| | .send()-s 754154,..| | 754154 --> v[13]|
| | .send()-s 778654,..| | 778654 --> v[14]|
| | .send()-s 804137,..| | 804137 --> v[15]|
| | .send()-s 830677,..| | 830677 --> v[16]|
| | .send()-s 854959,..| | 854959 --> v[17]|
| | .send()-s 878841,..| | 878841 --> v[18]|
| | .send()-s 902601,..| | 902601 --> v[19]|
| | .send()-s 912345,..| | |
| | .send()-s 923456,..| | return v |
| | .send()-s 934567,..| |_________________|
| | .send()-s 945678,..|
| | .send()-s 956789,..|
| | .send()-s 967890,..|
| | .send()-s 978901,..|
| | .send()-s 989012,..|
| | .send()-s 990123,..|
| | .send()-s ad inf,..|

尽管 sleep( 3 ) -边代码强制性地尽可能快地调用SUB -s,但它是本地PUB -instance所保留的空间并不只是一个这样的消息要接受的,所有其他消息都被静默丢弃,无论何时进入队列独奏位置被占用。

每当 .send() 标记恢复为零时,内部机制便允许下一个其他Context()将消息的实际内容(有效负载)传递到队列存储,并且随后跟随HWM == 1 -s的所有后续尝试再次开始由于 .send() 绑定(bind)逻辑而被静默丢弃。

关于c++ - ZeroMQ不同速度的订户看到相同的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50047062/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com