gpt4 book ai didi

zeromq - 为什么 ZeroMQ PUB 在没有连接订阅者的情况下对消息进行排队? (好吧, "disconnected"SUB-s)

转载 作者:行者123 更新时间:2023-12-04 18:59:24 30 4
gpt4 key购买 nike

我看到使用 的奇怪行为ZMQ_PUB .

我有一个制作人 .connect() -s 到不同的进程.bind()ZMQ_SUB socket 。

订阅者全部.bind() , 发布者 .connect() -s。

当生产者启动时,它会创建一个 ZMQ_PUB socket 和 .connect() -s 到不同的进程。然后它立即开始定期发送消息。

正如预期的那样,如果没有连接的订阅者,它会丢弃所有消息,直到订阅者启动。

然后流程正常工作,当订阅者启动时,它会从那一刻开始接收消息。

现在,问题是:

  • 我断开订阅者(停止进程)。
  • 此时没有活跃的订阅者,因为我停止了唯一的订阅者。生产者继续发送应该被丢弃的消息,因为不再有连接的订阅者...
  • 我重新启动原始订阅者,它绑定(bind),发布者重新连接......并且订阅者接收同时产生的所有消息!

  • 所以我看到的是生产者在订阅者关闭时将所有消息排入队列。套接字重新连接后,由于订阅者进程重新启动,它发送了所有排队的消息。

    据我了解 here ,当没有连接的订阅者时,发布者应该丢弃所有发送的消息:

    ZeroMQ examples

    "A publisher has no connected subscribers, then it will simply drop all messages."



    为什么会这样?

    顺便说一句,我在 linux 上使用 C++ 进行这些测试。

    我尝试在订阅者绑定(bind)时为其设置不同的身份,但没有成功。发布者仍然将消息排入队列,并在订阅者重新启动时将它们全部传递。

    提前致谢,

    路易斯

    更新:

    IMPORTANT UPDATE!!!!!
    Before posting this question
    I had tried different solutions. One was to set ZMQ_LINGER to 0, which didn't work.
    I added ZMQ:IMMEDIATE, and it worked, but I just found out that ZMQ:IMMEDIATE alone does not work. It requires also ZMQ_LINGER.
    Luis Rojas 3 hours ago



    更新:
    根据要求,我添加了一些简单的测试用例来说明我的观点。
    一个是简单的订阅者,它在命令行上运行并接收要绑定(bind)的 uri,例如:

    $ ./sub tcp://127.0.0.1:50001

    另一个是发布者,它接收要连接的 uri 列表,例如:

    ./pub tcp://127.0.0.1:50001 tcp://127.0.0.1:50002

    订阅者最多接收 5 条消息,然后关闭套接字并退出。我们可以在 wireshark 上看到 FIN/ACK 的双向交换,以及套接字如何进入 TIME_WAIT 状态。然后,发布者开始发送 SYN,尝试重新连接(探测 ZMQ_PUB 知道连接已关闭)

    我明确地没有取消订阅套接字,只是关闭它。在我看来,如果套接字关闭,发布者应该自动结束对该连接的任何订阅。

    所以我看到的是:我启动订阅者(一个或多个),我启动发布者,它开始发送消息。订阅者收到 5​​ 条消息并结束。与此同时,发布者继续发送消息,没有连接的订阅者。我重新启动订阅者,并立即收到几条消息,因为它们在发布者端排队。我认为那些排队的消息破坏了发布/订阅模型,其中消息应该只传递给连接的订阅者。如果订阅者关闭连接,则应丢弃发送给该订阅者的消息。更重要的是,当订阅者重新启动时,它可能会决定订阅其他消息,但它仍然会接收那些由绑定(bind)在同一端口的“先前的化身”订阅的消息。

    我的建议是 ZMQ_PUB(在连接模式下),当检测到套接字断开连接时,应该清除该套接字上的所有订阅,直到它重新连接并且新订阅者决定重新订阅。

    我为语言错误道歉,但英语不是我的母语。

    酒馆代码:
    #include <stdio.h>
    #include <stdlib.h>
    #include <libgen.h>
    #include <unistd.h>

    #include <string>
    #include <zeromq/zmq.hpp>

    int main( int argc, char *argv[] )
    {
    if ( argc < 2 )
    {
    fprintf( stderr, "Usage : %s <remoteUri1> [remoteUri2...]\n",
    basename( argv[0] ) );
    exit ( EXIT_FAILURE );
    }

    std::string pLocalUri( argv[1] );
    zmq::context_t localContext( 1 );
    zmq::socket_t *pSocket = new zmq::socket_t( localContext, ZMQ_PUB );
    if ( NULL == pSocket )
    {
    fprintf( stderr, "Couldn't create socket. Aborting...\n" );
    exit ( EXIT_FAILURE );
    }

    int i;
    try
    {
    for ( i = 1; i < argc; i++ )
    {
    printf( "Connecting to [%s]\n", argv[i] );
    {
    pSocket->connect( argv[i] );
    }
    }
    }
    catch( ... )
    {
    fprintf( stderr, "Couldn't connect socket to %s. Aborting...\n", argv[i] );
    exit ( EXIT_FAILURE );
    }

    printf( "Publisher Up and running... sending messages\n" );
    fflush(NULL);

    int msgCounter = 0;
    do
    {
    try
    {
    char msgBuffer[1024];
    sprintf( msgBuffer, "Message #%d", msgCounter++ );
    zmq::message_t outTask( msgBuffer, strlen( msgBuffer ) + 1 );
    printf("Sending message [%s]\n", msgBuffer );
    pSocket->send ( outTask );
    sleep( 1 );
    }
    catch( ... )
    {
    fprintf( stderr, "Some unknown error ocurred. Aborting...\n" );
    exit ( EXIT_FAILURE );
    }
    }
    while ( true );

    exit ( EXIT_SUCCESS );
    }

    子的代码
    #include <stdio.h>
    #include <stdlib.h>
    #include <libgen.h>
    #include <unistd.h>

    #include <string>
    #include <zeromq/zmq.hpp>

    int main( int argc, char *argv[] )
    {
    if ( argc != 2 )
    {
    fprintf( stderr, "Usage : %s <localUri>\n", basename( argv[0] ) );
    exit ( EXIT_FAILURE );
    }

    std::string pLocalUri( argv[1] );
    zmq::context_t localContext( 1 );
    zmq::socket_t *pSocket = new zmq::socket_t( localContext, ZMQ_SUB );
    if ( NULL == pSocket )
    {
    fprintf( stderr, "Couldn't create socket. Aborting...\n" );
    exit ( EXIT_FAILURE );
    }
    try
    {
    pSocket->setsockopt( ZMQ_SUBSCRIBE, "", 0 );
    pSocket->bind( pLocalUri.c_str() );
    }
    catch( ... )
    {
    fprintf( stderr, "Couldn't bind socket. Aborting...\n" );
    exit ( EXIT_FAILURE );
    }

    int msgCounter = 0;
    printf( "Subscriber Up and running... waiting for messages\n" );
    fflush( NULL );

    do
    {
    try
    {
    zmq::message_t inTask;
    pSocket->recv ( &inTask );
    printf( "Message received : [%s]\n", inTask.data() );
    fflush( NULL );
    msgCounter++;
    }
    catch( ... )
    {
    fprintf( stderr, "Some unknown error ocurred. Aborting...\n" );
    exit ( EXIT_FAILURE );
    }
    }
    while ( msgCounter < 5 );

    // pSocket->setsockopt( ZMQ_UNSUBSCRIBE, "", 0 ); NOT UNSUBSCRIBING
    pSocket->close();
    exit ( EXIT_SUCCESS );
    }

    最佳答案

    问:为什么会这样?

    因为SUB实际上仍然连接(不够“断开”)。

    是的,可能会令人惊讶,但是 杀戮 SUB -进程,在.bind()上- 或 .connect() -套接字传输媒体的附加侧,不代表 ,I/O 泵的有限状态机已“移动”到断开状态。

    鉴于此, PUB -side 没有其他选择,只能考虑 SUB -side 仍然存在和连接(即使进程在 PUB 的视线之外被静默杀死)并且对于这种“分布式”状态,ZeroMQ 协议(protocol)定义的行为( PUB -side 职责)收集所有临时消息(是的,隐形死)SUB -scriber,其中 PUB -side 仍然认为生活公平(但可能只是在传输 I/O 级别或某些远程 CPU 资源匮乏或并发引入的暂时间歇 {本地|远程}阻塞状态等)。

    所以它缓冲...

    万一你暗杀了 SUB -side 代理似乎更优雅一些(在套接字资源实例上使用归零的 ZMQ_LINGER + 适当的 .close()) PUB -side 将识别“分布式”系统系统范围的有限状态自动机转变为真正的“断开连接”状态,并且在 PUB 上将发生适当的行为改变-“分布式-FSA”的一侧,不存储此“可见”确实“断开连接”的任何消息-ed SUB - 正是文档所述。

    “分布式-FSA”在识别“超出本地主机控制范围的状态变化事件”方面的方法非常薄弱。KILL - 一个远程进程,它实现了“分布式- FSA”是一个毁灭性的事件,而不是如何保持系统正常工作的方法。对于此类外部风险的一个不错的选择可能是

    听起来很复杂?

    哦,是的,确实很复杂。这正是 ZeroMQ 为我们解决这个问题的原因,让我们可以自由地享受基于这些(已经解决的)低级复杂性设计我们的应用程序架构的乐趣。

    分布式系统 FSA(由子 FSA-s 分层组成的系统范围的 FSA)

    想象一下在引擎盖下默默地发生了什么,想象一下只有一对简单的串联 FSA-FSA - 正是这对 .Context() 实例尝试以最简单的 1:1 为我们处理PUB/SUB 用例 KILL 的场景-s SUB 上的所有子 FSA-s -side 不承认 PUB 的意图-边。甚至 TCP 协议(protocol)(​​同时存在于 PUB -side 和 SUB -side 上)也有几个来自 [ 的状态转换。 ESTABLISHED ] 到 [ CLOSED ] 状态。

    分布式系统的 FSA-of-FSA-s 的快速 X 射线 View

    (为了清楚起见,仅描述了 TCP 协议(protocol) FSA)

    PUB -边:

    enter image description here
    .socket( .. )实例的行为 FSA:

    enter image description here

    SUB -边:

    enter image description here

    (由 nanomsg 提供)。

    关于zeromq - 为什么 ZeroMQ PUB 在没有连接订阅者的情况下对消息进行排队? (好吧, "disconnected"SUB-s),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41700291/

    30 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com