- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我在c++中使用zmq 2.2(我知道较旧的版本)来创建具有多个连接的订阅服务器的发布服务器,这些订阅服务器以不同的速度读取消息。根据我对文档的理解以及Peter Hintjens的答案here,每个订阅者都有自己的队列,而发布者每个连接的订阅者都有一个队列。这似乎表明每个订户都独立于其他订户从发布者接收消息。
但是,在快速订阅者和慢速订阅者下面的代码段中,它们会收到相似的消息或完全相同的消息(即使我增加了在 A 点的睡眠时间并更改了 B 点的 ZMQ_HWM
,也会发生这种情况)。
有人可以阐明为什么会这样吗?
#include <zmq.hpp>
#include <unistd.h>
#include <iostream>
#include <vector>
#include <future>
using socket_t = zmq::socket_t;
using context_t = zmq::context_t;
using msg_t = zmq::message_t;
using namespace std;
vector<int> slow_consumer(int64_t hwm, int to_read)
{
vector<int> v;
context_t context{1};
socket_t socket(context, ZMQ_SUB);
socket.setsockopt(ZMQ_HWM, &hwm, sizeof(hwm));
socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
socket.connect("tcp://localhost:5554");
msg_t msg;
sleep(3); // 3 seconds
for (int i = 0; i < to_read; i++)
{
socket.recv(&msg);
usleep(10000); // 10 miliseconds ___________________________POINT A
v.emplace_back(*reinterpret_cast<int*>(msg.data()));
}
return v;
}
vector<int> fast_consumer(int64_t hwm, int to_read)
{
vector<int> v;
context_t context{1};
socket_t socket(context, ZMQ_SUB);
socket.setsockopt(ZMQ_HWM, &hwm, sizeof(hwm));
socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
socket.connect("tcp://localhost:5554");
msg_t msg;
for (int i = 0; i < to_read; i++)
{
socket.recv(&msg);
v.emplace_back(*reinterpret_cast<int*>(msg.data()));
}
return v;
}
void publisher(int64_t hwm)
{
context_t context{1};
socket_t socket(context, ZMQ_PUB);
socket.setsockopt(ZMQ_HWM, &hwm, sizeof(hwm));
socket.bind("tcp://*:5554");
int count = 0;
while (true) {
msg_t msg(sizeof(count));
memcpy(msg.data(), &count, sizeof(count));
socket.send(msg);
count++;
}
}
int main()
{
int64_t hwm = 1; // __________________________________________POINT B
int to_read = 20;
auto fast = async(launch::async, fast_consumer, hwm, to_read);
auto slow = async(launch::async, slow_consumer, hwm, to_read);
hwm = 1; // Don't queue anything on the publisher
thread pub(publisher, hwm);
auto slow_v = slow.get();
auto fast_v = fast.get();
cout << "fast slow" << endl;
for (int i = 0; i < fast_v.size(); i ++)
{
cout << fast_v[i] << " " << slow_v[i] << endl;
}
exit(0);
}
g++ -o mixed mixed_speed_consumers.cpp -g -lzmq -lpthread
通过
fast slow
25988 305855
52522 454312
79197 477807
106365 502594
132793 528551
159236 554519
184486 581419
209208 606411
234483 629298
256122 651159
281188 675031
305855 701533 // Messages on the fast subscriber starting here line up with messages on the slow subscriber
454312 727817
477807 754154
502594 778654
528551 804137
554519 830677
581419 854959
606411 878841
629298 902601
最佳答案
each subscriber has its own queue
PUB
-side
.Context()
-instance的设计属性,在该属性中进行发送队列管理(稍后会对此进行更多介绍)。
This would seem to indicate that each subscriber receives messages from the publisher independent of other subscribers.
ZMQ_HWM
,其作用是“阻止程序”语义的副作用。
ZMQ_HWM
可以保护/阻止任何新条目插入
PUB
-侧“私有(private)”-发送队列(大小不超过
ZMQ_HWM == 1
的深度),直到成功进行远程操作-清空(由“远程”
SUB
-侧
Context()
-s自主异步的“内部”传输相关主动性,取决于是否可能(重新)加载该
SUB
-侧“私有(private)”-接收队列(大小,再者,根据
ZMQ_HWM == 1
而言,没有更深的意思
PUB.send()
-s的有效载荷将被有效地丢弃,直到远程
*_SUB.recv()
-s从其“远程”-
Context()
-instance的接收队列中卸载“阻塞”有效载荷(大小,为根据
ZMQ_HWM == 1
的规定,最多只能存储一个有效载荷。
PUB.send()
-er
在( secret 阻止)测试期间接收了大约的 ~ 902601
()上的上的()发射的不仅仅是20
消息。SUB
-method时,所有这些 == to_read
消息都只是在 902581+
-旁边被 PUB
扔掉了。Context()
内部的简化 View .send()
-ed对等节点的出现和消失, Context()
-管理的队列的增长/收缩,但是在ZeroMQ API v2.2中同时具有TX和RX端。相同的高水位线天花板。如所记录的,尝试对超出此限制的任何内容进行Context()
的尝试将被丢弃。TIME _____________________________
v [ ]
v [ ]
v [ ]
v [ ]
v PUB.setsockopt( ZMQ_HWM, 1 );]
v PUB.send()-s [ | ]
v : [ +-----------------QUEUE-length ( a storage depth ) is but one single message
v _________________ : [
v [ ] : [Context()-managed pool-of-QUEUE(s)
v [ ] : [
v [ ] : [ ___________________
v [ ] : [ [ ]
v FAST_SUB.connect()---:------------>[?] [ ]
v FAST_SUB.recv()-s : [?] [ ]
v : : [?] [ ]
v : : [?][?]<---SLOW_SUB.connect() ]
v : : [?][?] SLOW_SUB.recv()-s ]
v : .send(1)----->[1][1] :
| 1 <-.recv()--------------------[?][1] :
| : [?][1] :
| : .send(2)----->[2][1] :
| 2 <-.recv()--------------------[?][1] :
| : [?][1] :
| : .send(3)----->[3][1] :
| 3 <-.recv()--------------------[?][?]------------.recv()-> 1
| : [?][?] :
| : .send(4)----->[4][4] :
| 4 <-.recv()--------------------[?][4] :
| : [?][4] :
| : .send(5)----->[5][4] :
| 5 <-.recv()--------------------[?][4] :
| : [?][4] :
| : .send(6)----->[6][4] :
| 6 <-.recv()--------------------[?][4] :
| : [?][4] :
| : .send(7)----->[7][4] :
| 7 <-.recv()--------------------[?][4] :
| : [?][4] :
| : .send(8)----->[8][4] :
| 8 <-.recv()--------------------[?][4] :
| : [?][4] :
| : .send(9)----->[9][4] :
| 9 <-.recv()--------------------[?][?]------------.recv()-> 4
| : [?][?] :
| : .send(A)----->[A][A] :
| A <-.recv()--------------------[?][A]
| : [?][A]
| : .send(B)----->[B][A]
| B <-.recv()--------------------[?][A]
v : [ [
v : [
v :
v
"Messages on the fast subscriber starting here line up with messages on the slow subscriber"
.connect()
尚未使它成为20x .send()
-s,而在阻塞SUB
之后,缓慢的(-ed)-.recv()
最终得到了。SUB
阶段的影响,其中较慢的 sleep( 3 )
不会尝试接收任何内容main(){
|
| async(launch::async,fast|_fast____________|
| async(launch::async,slow| .setsockopt |_slow____________|
| ... | .setsockopt | .setsockopt |
| ... | .connect | .setsockopt |
| thread | ~~~~~~? | .connect |
| |_pub___________________| ~~~~~~? | ~~~~~~? |
| | .setsockopt | ~~~~~~? | ~~~~~~? |
| | .bind | ~~~~~~? | ~~~~~~? |
| | ~~~~~~? | ~~~~~~? | ~~~~~~? |
| | ~~~~~~=RTO | ~~~~~~? | ~~~~~~? |
| | .send()-s 1,2,..99| ~~~~~~? | ~~~~~~? |
| | .send()-s 23456,..| ~~~~~~=RTO | ~~~~~~=RTO |
| | .send()-s 25988,..| 25988 --> v[ 0]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 52522,..| 52522 --> v[ 1]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 79197,..| 79197 --> v[ 2]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 106365,..| 106365 --> v[ 3]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 132793,..| 132793 --> v[ 4]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 159236,..| 159236 --> v[ 5]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 184486,..| 184486 --> v[ 6]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 209208,..| 209208 --> v[ 7]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 234483,..| 234483 --> v[ 8]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 256122,..| 256122 --> v[ 9]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 281188,..| 281188 --> v[10]| : slow still sleep( 3 )-s before going to .recv() it's first message
| | .send()-s 305855,..| 305855 --> v[11]| 305855 --> v[ 0]|// Messages on the fast subscriber starting here line up with messages on the slow subscriber
| | .send()-s 454312,..| 454312 --> v[12]| 454312 --> v[ 1]|
| | .send()-s 477807,..| 477807 --> v[13]| 477807 --> v[ 2]|
| | .send()-s 502594,..| 502594 --> v[14]| 502594 --> v[ 3]|
| | .send()-s 528551,..| 528551 --> v[15]| 528551 --> v[ 4]|
| | .send()-s 554519,..| 554519 --> v[16]| 554519 --> v[ 5]|
| | .send()-s 581419,..| 581419 --> v[17]| 581419 --> v[ 6]|
| | .send()-s 606411,..| 606411 --> v[18]| 606411 --> v[ 7]|
| | .send()-s 629298,..| 629298 --> v[19]| 629298 --> v[ 8]|
| | .send()-s 651159,..| | 651159 --> v[ 9]|
| | .send()-s 675031,..| return v | 675031 --> v[10]|
| | .send()-s 701533,..|_________________| 701533 --> v[11]|
| | .send()-s 727817,..| | 727817 --> v[12]|
| | .send()-s 754154,..| | 754154 --> v[13]|
| | .send()-s 778654,..| | 778654 --> v[14]|
| | .send()-s 804137,..| | 804137 --> v[15]|
| | .send()-s 830677,..| | 830677 --> v[16]|
| | .send()-s 854959,..| | 854959 --> v[17]|
| | .send()-s 878841,..| | 878841 --> v[18]|
| | .send()-s 902601,..| | 902601 --> v[19]|
| | .send()-s 912345,..| | |
| | .send()-s 923456,..| | return v |
| | .send()-s 934567,..| |_________________|
| | .send()-s 945678,..|
| | .send()-s 956789,..|
| | .send()-s 967890,..|
| | .send()-s 978901,..|
| | .send()-s 989012,..|
| | .send()-s 990123,..|
| | .send()-s ad inf,..|
sleep( 3 )
-边代码强制性地尽可能快地调用SUB
-s,但它是本地PUB
-instance所保留的空间并不只是一个这样的消息要接受的,所有其他消息都被静默丢弃,无论何时进入队列独奏位置被占用。.send()
标记恢复为零时,内部机制便允许下一个其他Context()
将消息的实际内容(有效负载)传递到队列存储,并且随后跟随HWM == 1
-s的所有后续尝试再次开始由于 .send()
绑定(bind)逻辑而被静默丢弃。
关于c++ - ZeroMQ不同速度的订户看到相同的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50047062/
是 if(a == 0 && b == 0 && c == 0) { return; } 一样 if(a == 0) { return; } if(b == 0) { return; } if(c =
我想做这样的事情: Class A Class B extends A Class C extends A B b = new B(); C c = new C(); b->setField("foo
我对 Mysql 世界很天真......:)我试图使用连接从表中查询, 我遇到结果集问题...表结构如下 下面... VIDEO_XXXXX | Field | Type
我最近问过关于从另一个类获取类的唯一实例的问题。 ( How to get specific instance of class from another class in Java? ) 所以,我正
假设我们有两种类型 using t1 = int*; using t2 = int*; 我知道 std::is_same::value会给我们true .什么是,或者是否有模板工具可以实现以下目标?
对于我的一个应用程序,我假设比较 2 个字符串的第一个字符比比较整个字符串是否相等要快。例如,如果我知道只有 2 个可能的字符串(在一组 n 字符串中)可以以相同的字母开头(比如说 'q'),如果是这
我想在我的NXP LPC11U37H主板(ARM Cortex-M0)上分析一些算法,因为我想知道执行特定算法需要多少个时钟周期。 我编写了这些简单的宏来进行一些分析: #define START_C
我在 Excel 中创建了一个宏,它将在 Excel 中复制一个表格,并将行除以我确定的特定数字(默认 = 500 行),并为宏创建的每个部门打开不同的工作表。 使用的代码是这样的: Sub Copy
我想根据第一个字典对第二个字典的值求和。如果我有字典 A 和 B。 A = {"Mark": ["a", "b", "c", "d"], "June": ["e", "a"], "John": ["a
当我这样做时 system()在 Perl 中调用,我通常根据 perldocs 检查返回码.嗯,我是这么想的。大部分时间 $rc!=0对我来说已经足够了。最近我在这里帮助了两个遇到问题的人syste
在我的进度条上,我试图让它检测 div 加载速度。 如果 div 加载速度很快,我想要实现的目标将很快达到 100%。但进度条的加载速度应该与 div 的加载速度一样快。 问题:如何让我的进度条加载
当我获得与本地时间相同的时间戳时,firebase 生成的服务器时间戳是否会自动转换为本地时间,或者我错过了什么? _firestore.collection("9213903123").docume
根据the original OWL definition of OWL DL ,我们不能为类和个体赋予相同的名称(这是 OWL DL 和 OWL Full 之间的明显区别)。 "Punning" i
我有两个输入复选框: 尝试使用 jQuery 来允许两个输入的行为相同。如果选中第一个复选框,则选中第二个复选框。如果未检查第 1 个,则不会检查第 2 个。反之亦然。 我有代码: $('inpu
可以从不同系统编译两个相同的java文件,但它们都有相同的内容操作系统(Windows 7),会生成不同的.class文件(大小)? 最佳答案 是的,您可以检查是否有不同版本的JDK(Java Dev
我正在清理另一个人的正则表达式,他们目前所有的都以结尾 .*$ 那么下面的不是完全一样吗? .* 最佳答案 .*将尽可能匹配,但默认情况下为 .不匹配换行符。如果您要匹配的文本有换行符并且您处于 MU
我使用 Pick ,但是如何编写可以选择多个字段的通用PickMulti呢? interface MyInterface { a: number, b: number, c: number
我有一个 SQL 数据库服务器和 2 个具有相同结构和数据的数据库。我在 2 个数据库中运行相同的 sql 查询,其中一个需要更长的时间,而另一个在不到 50% 的时间内完成。他们都有不同的执行计划。
我需要你的帮助,我有一个包含两列的表,一个 id 和 numpos,我希望 id 和 numops 具有相同的结果。 例子: $cnx = mysql_connect( "localhost", "r
如何将相同的列(在本例中按“级别”排序)放在一起?我正在做一个高分,我从我的数据库中按级别列出它们。如果他们处于同一级别,我希望他们具有相同的 ID。 但是我不想在别人身上显示ID。只有第一个。这是一
我是一名优秀的程序员,十分优秀!