gpt4 book ai didi

c++ - 为什么 ZeroMQ PUSH/PULL 可以工作,而 PUB/SUB 不行?

转载 作者:行者123 更新时间:2023-12-02 10:36:30 25 4
gpt4 key购买 nike

环境:NVIDIA 风格的 Ubuntu 18.01 在他们的 Jetson 开发板上使用他们的 TX2i 处理器。 ZMQ 4.3.2,使用 cppzmq ZMQ 的 C++ 包装器。

我有大量使用带有 ZeroMQ 的 google Protocol Buffer 运行的代码,而且都是 PUSH/PULL,它工作得很好,除了我有一个不是点对点而是 1:3 的情况。此处正确的解决方案是执行 PUB/SUB,但我无法将消息传递给我的订阅者。

我将代码缩减为这个简单的示例。如果我取消注释 #define声明,订阅者一无所获。注释(编译为 PUSH/PULL 而不是 PUB/SUB),然后订阅者按预期获得消息。用过度sleep_for()有时,我希望订阅者在发布者执行发送之前有足够的时间进行注册。

编辑:

为什么对订阅者进行尝试/捕获?我很早就遇到了异常(exception),并认为这是因为出版商还没有准备好。这似乎不再是这种情况,所以它不是我想的那样。

// Publisher
#include "/usr/local/include/zmq.hpp"
#include "protobuf_namespace.pb.h"
#include <chrono>
#include <thread>


#define PUB_SUB

int main( void )
{
zmq::context_t* m_pContext = new zmq::context_t( 1 );

#ifdef PUB_SUB
zmq::socket_t* m_pSocket = new zmq::socket_t( *m_pContext, ZMQ_PUB );
#else
zmq::socket_t* m_pSocket = new zmq::socket_t( *m_pContext, ZMQ_PUSH );
#endif

std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
//m_pSocket->bind( "tcp://*:53001" ); // using '*' or specific IP doesn't change result
m_pSocket->bind( "tcp://127.0.0.1:53001" );
std::this_thread::sleep_for( std::chrono::seconds( 1 ) );

// Send the parameters
protobuf_namespace::Params params;
params.set_calibrationdata( protobuf_namespace::CalDataType::CAL_REQUESTED ); // init one value to non-zero
std::string params_str = params.SerializeAsString();
zmq::message_t zmsg( params_str.size() );

memcpy( zmsg.data(), params_str.c_str(), params_str.size() );
m_pSocket->send( zmsg, zmq::send_flags::none );

std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
m_pSocket->close();
zmq_ctx_destroy( m_pContext );
}
// Subscriber - start me first!
#include "/usr/local/include/zmq.hpp"
#include "protobuf_namespace.pb.h"
#include <chrono>
#include <thread>
#include <stdio.h>

#define PUB_SUB


int main( void )
{
zmq::context_t* m_pContext = new zmq::context_t( 1 );

#ifdef PUB_SUB
zmq::socket_t* m_pSocket = new zmq::socket_t( *m_pContext, ZMQ_SUB );
m_pSocket->connect( "tcp://127.0.0.1:53001" );

int linger = 0;
zmq_setsockopt( m_pSocket, ZMQ_LINGER, &linger, sizeof( linger ) );
zmq_setsockopt( m_pSocket, ZMQ_SUBSCRIBE, "", 0 );
#else
zmq::socket_t* m_pSocket = new zmq::socket_t( *m_pContext, ZMQ_PULL );
m_pSocket->connect( "tcp://127.0.0.1:53001" );
#endif

protobuf_namespace::Params params;
zmq::message_t zmsg;
bool retry = true;

do {
try {
m_pSocket->recv( zmsg, zmq::recv_flags::none );
retry = false;
std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
} catch( ... ) {
printf("caught\n");
}
std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
} while( retry );

std::string param_str( static_cast<char*>( zmsg.data() ), zmsg.size() );
params.ParseFromString( param_str );

if( params.calibrationdata() == protobuf_namespace::CalDataType::CAL_REQUESTED )
printf( "CAL_REQUESTED\n" );
else
printf( "bad data\n" );


std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
m_pSocket->close();
zmq_ctx_destroy( m_pContext );
}

最佳答案

如果从未使用过 ZeroMQ,
在深入了解更多细节之前,不妨先看看“ZeroMQ Principles in less than Five Seconds

Q : Why the try/catch on the subscriber?



因为 :a ) // Subscriber - start me first!却拥有 PUB - 在执行 tcp:// 之前几乎“永远” sleep 接受任何第一个 .connect() 的传输类路径设置在 .bind() ,在此之前是大睡... std::this_thread::sleep_for( std::chrono::seconds( 1 ) );

b) try 'd m_pSocket->recv( zmsg, zmq::recv_flags::none );必须根据定义抛出异常,因为没有 tcp://到目前为止的传输类路径设置(因为 PUB -side 尚未从 sleep 中返回)

Q : Why will ZeroMQ PUSH/PULL work but not PUB/SUB?



好吧, 两者都会 ,如果设计得当,尊重已发布的 API。

只需删除任何阻塞 sleep() -s,防止 SUB -s 加入,制作 .connect() -s 能够尽快成功。再加上可能会进入 .recv() 的非*阻塞形式-ops(重构 try/catch),在 中很常见更好地反射(reflect)预防性设计的性质 .poll()基于或 react 性 .recv(..., ZMQ_NOBLOCK ) - 基于事件处理。

最后但并非最不重要的 :

ZeroMQ v4+(相对于 v2+ 和 pre-v3.? API)被切换为使用 PUB -side 消息过滤,因此也必须适当考虑订阅管理(定时/错误处理/弹性)。

如有疑问,可集成使用 ZeroMQ 内置 socket_monitor 工具,扩展 Context() -instance,并跟踪/检查 Context() 内部的每个事件-instance,远低于已发布的 API 事件,一直到最低的详细级别。

不要犹豫 read and ask more

关于c++ - 为什么 ZeroMQ PUSH/PULL 可以工作,而 PUB/SUB 不行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60067338/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com