gpt4 book ai didi

c++ - ZeroMQ 推/拉

转载 作者:太空宇宙 更新时间:2023-11-04 11:29:50 29 4
gpt4 key购买 nike

zmq 的某些部分未以可预测的方式运行。

我正在使用 VS2013 和 zmq 3.2.4。为了不在我的 pubsub 框架中“丢失”消息 [旁白:我认为这是一个设计缺陷。我应该能够首先启动我的订阅者,然后是发布者,我应该会收到所有消息] 我必须将发布者与订阅者同步到 la durapub/durasub等。我正在使用 zeromq 指南中的 durasub.cpp 和 durapub.cpp 示例。如果我按原样使用示例,系统将完美运行。

如果我现在在 durasub.cpp 中的 ZMQ_PUSH 周围添加范围括号

{
zmq::socket_t sync (context, ZMQ_PUSH);
sync.connect(syncstr.c_str());
s_send (sync, "sync");
}

系统停止工作。匹配的“ZMQ_PULL”信号永远不会到达 durapub.cpp 中的应用层。

我已经通过 C++ 包装器检查了 zmq_close 的返回值,一切正常。就 ZMQ 而言,它已将消息传递到端点。希望我做了一些明显愚蠢的事情?

还有更多。添加

std::this_thread::sleep_for(std::chrono::milliseconds(1));

允许系统(即发布/订阅)重新开始工作。所以这显然是一个竞争条件,大概是在收割者线程中,因为它破坏了套接字。

更多的挖掘。我认为 LIBZMQ-179 也提到了这个问题。


EDIT#2 2014-08-13 03:00 [UTC+0000]

Publisher.cpp:

#include <zmq.hpp>
#include <zhelpers.hpp>
#include <string>
int main (int argc, char *argv[])
{
zmq::context_t context(1);
std::string bind_point("tcp://*:5555");
std::string sync_bind("tcp://*:5554");
zmq::socket_t sync(context, ZMQ_PULL);
sync.bind(sync_bind.c_str());

// We send updates via this socket
zmq::socket_t publisher(context, ZMQ_PUB);
publisher.bind(bind_point.c_str());

// Wait for synchronization request
std::string tmp = s_recv (sync);
std::cout << "Recieved: " << tmp << std::endl;

int numbytessent = s_send (publisher, "END");
std::cout << numbytessent << "bytes sent" << std::endl;
}

订阅者.cpp

#include <zmq.hpp>
#include <zhelpers.hpp>
#include <string>

int main (int argc, char *argv[])
{
std::string connectstr("tcp://127.0.0.1:5555");
std::string syncstr("tcp://127.0.0.1:5554");

zmq::context_t context(1);

zmq::socket_t subscriber (context, ZMQ_SUB);
subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);
subscriber.connect(connectstr.c_str());

#if ENABLE_PROBLEM
{
#endif ENABLE_PROBLEM
zmq::socket_t sync (context, ZMQ_PUSH);
sync.connect(syncstr.c_str());
s_send (sync, "sync");
#if ENABLE_PROBLEM
}
#endif ENABLE_PROBLEM

while (1)
{
std::cout << "Receiving..." << std::endl;
std::string s = s_recv (subscriber);
std::cout << s << std::endl;
if (s == "END")
{
break;
}
}
}
  1. 将每个 cpp 编译成它自己的 exe。
  2. 启动两个 exe(启动顺序无关紧要)

如果定义了ENABLE_PROBLEM:

  • 发布者:(空提示)
  • 订阅者:“正在接收...”然后您必须终止这两个进程,因为它们已挂起...

如果未定义ENABLE_PROBLEM:

  • 发布者:“收到:同步”“已发送 3 个字节”
  • 订阅者:“正在接收...”'结束'

最佳答案

EDIT#1 2014-08-11:原始帖子已更改,没有留下可见的修订

目标是什么?

恕我直言,仅从上述三个 SLOC 中分离目标并模拟任何通过/失败测试以验证目标是非常困难的。

那么让我们一步步开始吧。

那里使用了哪些 ZMQ 原语?

待定

编辑#1 后:ZMQ_PUSH + ZMQ_PULL + ( 隐藏ZMQ_PUB + ZMQ_SUB ... 下次宁愿发布 ProblemDOMAIN-context-complete 源,最好用 self-测试用例输出相似:

...
// <code>-debug-isolation-framing ------------------------------------------------
std::cout << "---[Pre-test]: sync.connect(syncstr.c_str()) argument" << std::endl;
std::cout << syncstr.c_str() << std::endl;
std::cout << "---[Use/exec]: " << std::endl;
sync.connect( syncstr.c_str());
// <code>-debug-isolation-framing ------------------------------------------------
...

)

部署了什么 ZMQ-context create/terminate life-cycle-policy?

待定

post-EDIT#1:n.b.:ZMQ_LINGER 相当影响资源的 .close(),这可能在 ZMQ_Context 之前发生终止出现。 (并且可能会阻塞......这会伤害......)

关于“ZMQ_LINGER 何时真正重要?”的注释

一旦 Context 即将终止,而发送队列尚未为空并且正在处理对 zmq_close() 的尝试,此参数就会生效.

在大多数体系结构中(......在低延迟/高性能中更多,其中微秒和纳秒计数......)(共享/受限)资源设置/处置操作出现的原因有很多 在系统生命周期的最开始,。不用多说为什么,想象一下与所有设置/丢弃操作直接相关的开销,这些开销在近乎真实的常规操作流程中根本不可能发生(重复发生的次数越少......)时间系统设计。

因此,让系统进程进入最后的“整理”阶段(就在退出之前)

设置 ZMQ_LINGER == 0 只是忽略仍在<sender>> 队列中的任何内容,并允许提示 zmq_close () + zmq_term()

类似地,ZMQ_LINGER == -1 将仍在<发件人> 队列中的任何内容放入[ 具有最大值(value)],整个系统必须无限期等待,在(希望任何)<接收者> 在允许任何 zmq_close() + zmq_term() 发生之前检索并“消费”所有排队的消息...这可能会很长并且完全不受您的控制...

最后,ZMQ_LINGER > 0 作为一种折衷方案,如果某些<接收器> 等待定义的 [msec]-s 数量来并检索排队的消息。然而,在给定的 TimeDOMAIN 里程碑上,系统会继续执行 zmq_close() + zmq_term() 以优雅干净地释放所有保留资源并根据系统设计时间退出约束条件。

关于c++ - ZeroMQ 推/拉,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25195242/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com