
c++ - ZeroMQ (cppzmq) subscriber skips the first message

Reposted. Author: 搜寻专家. Updated: 2023-10-31 00:54:22

I am trying out ZMQ with the cppzmq C++ wrapper, which appears to be the wrapper recommended under C++ Bindings.

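Client/server (REQ/REP) seems to work fine. But when I implement a publish/subscribe pair of programs, the first message appears to be lost in the subscriber. Why?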

publisher.cpp:

#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread/thread.hpp>
#include <boost/format.hpp>
#include <zmq.hpp>
#include <cstring>   // memcpy
#include <string>
#include <iostream>

int main()
{
    zmq::context_t context(1);
    zmq::socket_t publisher(context, ZMQ_PUB);
    publisher.bind("tcp://*:5555");

    for(int n = 0; n < 3; n++) {
        // Two-frame message on topic A: envelope frame, then payload frame.
        zmq::message_t env1(1);
        memcpy(env1.data(), "A", 1);
        std::string msg1_str = (boost::format("Hello-%i") % (n + 1)).str();
        zmq::message_t msg1(msg1_str.size());
        memcpy(msg1.data(), msg1_str.c_str(), msg1_str.size());
        std::cout << "Sending '" << msg1_str << "' on topic A" << std::endl;
        publisher.send(env1, ZMQ_SNDMORE);
        publisher.send(msg1);

        // Two-frame message on topic B.
        zmq::message_t env2(1);
        memcpy(env2.data(), "B", 1);
        std::string msg2_str = (boost::format("World-%i") % (n + 1)).str();
        zmq::message_t msg2(msg2_str.size());
        memcpy(msg2.data(), msg2_str.c_str(), msg2_str.size());
        std::cout << "Sending '" << msg2_str << "' on topic B" << std::endl;
        publisher.send(env2, ZMQ_SNDMORE);
        publisher.send(msg2);

        boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
    }
    return 0;
}

subscriber.cpp:

#include <zmq.hpp>
#include <string>
#include <iostream>

int main()
{
    zmq::context_t context(1);
    zmq::socket_t subscriber(context, ZMQ_SUB);
    subscriber.connect("tcp://localhost:5555");
    subscriber.setsockopt(ZMQ_SUBSCRIBE, "B", 1);

    while(true)
    {
        zmq::message_t env;
        subscriber.recv(&env);
        std::string env_str = std::string(static_cast<char*>(env.data()), env.size());
        std::cout << "Received envelope '" << env_str << "'" << std::endl;

        zmq::message_t msg;
        subscriber.recv(&msg);
        std::string msg_str = std::string(static_cast<char*>(msg.data()), msg.size());
        std::cout << "Received '" << msg_str << "'" << std::endl;
    }
    return 0;
}

Program output:

$ ./publisher
Sending 'Hello-1' on topic A
Sending 'World-1' on topic B
Sending 'Hello-2' on topic A
Sending 'World-2' on topic B
Sending 'Hello-3' on topic A
Sending 'World-3' on topic B

$ ./subscriber
Received envelope 'B'
Received 'World-2'
Received envelope 'B'
Received 'World-3'

(Note: the subscriber was started before the publisher.)

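Bonus question: by the way, is it just my impression, or is this C++ wrapper quite low-level? I see no direct support for std::string, and the code needed to transfer a simple string looks very verbose.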

Best answer

I found the answer in the ZeroMQ Guide:

There is one more important thing to know about PUB-SUB sockets: you do not know precisely when a subscriber starts to get messages. Even if you start a subscriber, wait a while, and then start the publisher, the subscriber will always miss the first messages that the publisher sends. This is because as the subscriber connects to the publisher (something that takes a small but non-zero time), the publisher may already be sending messages out.

This "slow joiner" symptom hits enough people often enough that we're going to explain it in detail. Remember that ZeroMQ does asynchronous I/O, i.e., in the background. Say you have two nodes doing this, in this order:

1. Subscriber connects to an endpoint and receives and counts messages.
2. Publisher binds to an endpoint and immediately sends 1,000 messages.

Then the subscriber will most likely not receive anything. You'll blink, check that you set a correct filter and try again, and the subscriber will still not receive anything.

Making a TCP connection involves to and from handshaking that takes several milliseconds depending on your network and the number of hops between peers. In that time, ZeroMQ can send many messages. For sake of argument assume it takes 5 msecs to establish a connection, and that same link can handle 1M messages per second. During the 5 msecs that the subscriber is connecting to the publisher, it takes the publisher only 1 msec to send out those 1K messages.
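(Editorial aside, not part of the Guide's text.) The arithmetic in the paragraph above can be checked with a few lines of C++. This is only a sketch: the function name `messages_missed` and its parameters are made up for illustration, and the figures are the Guide's "for sake of argument" numbers, not measurements.

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical helper: how many messages the publisher has already pushed out
// by the time the subscriber's connection is established.
std::uint64_t messages_missed(std::uint64_t total_messages,
                              std::uint64_t messages_per_second,
                              std::uint64_t handshake_ms)
{
    // Messages the link can carry during the handshake window.
    const std::uint64_t sendable = messages_per_second * handshake_ms / 1000;
    // The publisher cannot lose more messages than it actually sends.
    return std::min(total_messages, sendable);
}
```

With the Guide's numbers, `messages_missed(1000, 1000000, 5)` gives 1000: the link could carry 5,000 messages during the 5 msec handshake, so all 1,000 are gone before the subscriber is connected.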

In Chapter 2 - Sockets and Patterns we'll explain how to synchronize a publisher and subscribers so that you don't start to publish data until the subscribers really are connected and ready. There is a simple and stupid way to delay the publisher, which is to sleep. Don't do this in a real application, though, because it is extremely fragile as well as inelegant and slow. Use sleeps to prove to yourself what's happening, and then wait for Chapter 2 - Sockets and Patterns to see how to do this right.

The alternative to synchronization is to simply assume that the published data stream is infinite and has no start and no end. One also assumes that the subscriber doesn't care what transpired before it started up. This is how we built our weather client example.

So the client subscribes to its chosen zip code and collects 100 updates for that zip code. That means about ten million updates from the server, if zip codes are randomly distributed. You can start the client, and then the server, and the client will keep working. You can stop and restart the server as often as you like, and the client will keep working. When the client has collected its hundred updates, it calculates the average, prints it, and exits.

This post about "c++ - ZeroMQ (cppzmq) subscriber skips the first message" is based on a similar question found on Stack Overflow: https://stackoverflow.com/questions/45740168/
