gpt4 book ai didi

asynchronous - 带有 zmq_proxy 和回复的 ZeroMQ 异步客户端

转载 作者:行者123 更新时间:2023-12-05 07:58:48 26 4
gpt4 key购买 nike

我正在尝试使用 libzmq C API 使 C 模型中的异步客户端/服务器运行。

我在 Linux 平台上使用 ZeroMQ 3.2.2,我尝试使用的模式如下所示:

>Client -> DEALER  
>
>Router -> ROUTER
>--- proxy ---
>Dealer -> DEALER
>
>Workers-> DEALER

我需要客户端是非阻塞的,还需要接收对消息的响应。从我看到的示例中,我的理解是将 ROUTER-DEALER 与 zmq_proxy 一起使用应该将消息返回给初始客户端。

但是,通过将捕获套接字附加到 zmq_proxy,似乎没有将响应路由回。

当我将客户端更改为 REQ 并将工作人员更改为 REP 时,一切都按预期工作。欢迎任何对我哪里出错或误解的反馈。

下面是 3 个组件(客户端、代理和工作程序)。

测试客户端

#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdint.h>

int main (void)
{
printf ("Connecting to hello world broker...\n");

void *ctx = zmq_ctx_new ();
void *requester = zmq_socket (ctx, ZMQ_DEALER);
//void *requester = zmq_socket (ctx, ZMQ_REQ);
zmq_connect (requester, "tcp://10.1.1.31:5555");

printf ("Sending Request : HELLO \n");
int rc = zmq_send (requester, "HELLO", 6, 0);

if (rc > 0) {
printf ("Success : Sent size ... %d!\n",rc);
} else {
printf("Error: %s\n", zmq_strerror(errno));
}

printf ("Wait for response ..\n");

char buffer [256];
zmq_recv (requester, buffer, 256, 0);

printf ("Response Received : %s\n",&buffer);

zmq_close (requester);
zmq_ctx_destroy (ctx);
return 0;
}

测试代理

#define _MULTI_THREADED
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <zmq.h>
#include <unistd.h>
#include <assert.h>
#include <pthread.h>

#define ZFRAME_MORE 1
#define ZFRAME_REUSE 2
#define ZFRAME_DONTWAIT 4

static void *proxy_capture (void *ctx)
{
int zerr = 0 ;
int rRes;
void *worker = zmq_socket (ctx, ZMQ_DEALER);
zerr = zmq_connect (worker, "ipc://capture.ipc");

if (zerr != 0)
{
printf ("\n-------------- > proxy_capture bind error : %s\n", zmq_strerror(errno));
return 0;
}

while (1) {
char buf [256];
int rc = zmq_recv (worker, buf, 256, 0);
assert (rc != -1);
printf ("Capture value : %s !\n", &buf);
}
}

int main(int argc, char *argv[])
{
int zerr = 0 ;
int rc = 0 ;
int rRes;

// Frontend socket talks to clients over TCP Port
void *ctx = zmq_ctx_new ();
void *frontend = zmq_socket (ctx, ZMQ_ROUTER);
zerr = zmq_bind (frontend, "tcp://10.1.1.31:5555");

if (zerr != 0)
{
printf ("\nFrontend bind error : %s\n", zmq_strerror(errno));
return 0;
}

// Backend socket talks to workers
void *backend = zmq_socket (ctx, ZMQ_DEALER);
zerr = zmq_bind (backend, "tcp://10.1.1.31:6555");

if (zerr != 0)
{
printf ("\nBackend bind error : %s\n", zmq_strerror(errno));
return 0;
}

void *capture = zmq_socket (ctx, ZMQ_DEALER);
zerr = zmq_bind (capture, "ipc://capture.ipc");

if (zerr != 0)
{
printf ("\nCapture bind error : %s\n", zmq_strerror(errno));
return 0;
}

pthread_t capworker;
rc = pthread_create(&capworker, NULL, proxy_capture, ctx);

zmq_proxy (frontend, backend, capture);

while (1) {
printf ("Broker loop …\n");
sleep(1);
}

sleep(1);
zmq_ctx_destroy (&ctx);
printf ("\nEnd server…\n");

return 0;
}

测试人员

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <zmq.h>
#include <unistd.h>
#include <assert.h>

#define ZFRAME_MORE 1
#define ZFRAME_REUSE 2
#define ZFRAME_DONTWAIT 4

int main(int argc, char *argv[])
{
int zerr = 0 ;
int rc = 0 ;
int rRes;

void *ctx = zmq_ctx_new ();
//void *worker = zmq_socket (ctx, ZMQ_DEALER);
void *worker = zmq_socket (ctx, ZMQ_REP);
zerr = zmq_connect (worker, "tcp://10.1.1.31:6555");

sleep(1);

if (zerr != 0)
{
printf ("Worker connect error : %s\n", zmq_strerror(errno));
return 0;
}

while (1) {
char buf [256];
rc = zmq_recv (worker, buf, 256, 0);
assert (rc != -1);
printf ("Received : %s !\n", &buf);
printf ("Responding to Client... !\n");

rc = zmq_send(worker, "WORLD", 6, 0);

if (rc > 0) {
printf ("Success : Sent size ... %d!\n",rc);
//break;
} else {
printf("Error: %s\n", zmq_strerror(errno));
}
}
zmq_close (worker);
zmq_ctx_destroy (ctx);

return 0;
}

非工作输出(客户端 DEALER 和 worker DEALER)

TestClient

Connecting to hello world broker...
Sending Request : HELLO
Success : Sent size ... 6!
Wait for response ..

TestBroker

Capture value : !
Capture value : HELLO ! <-- Req from client Capture
value : WORLD ! <-- Resp from worker
Capture value : WORLD !

TestWorker

Received : !
Responding to Client... !
Success : Sent size ... 6!
Received : HELLO !
Responding to Client... !
Success : Sent size ...
6!

所以看起来工作人员响应了,但是响应丢失了或者路由器错误地定向到客户端?

感谢任何帮助

最佳答案

我在捕获部分偶然发现了您对我的 zmq_proxy 问题的回答,我正试图理解这一点。

对于您的问题,您希望客户端是异步的,而不是使用 REQ,而是使用 DEALER。

下面是我如何让它异步的。我的 zeromq 版本是 4.2.1。

        //  Socket to talk to server
void *context = zmq_ctx_new();

void *requester = zmq_socket (context, ZMQ_REQ);
int timeout = 5000; //Timeout of 5 seconds to make sure not having it hang either while sending or receving...
int linger = 0;
zmq_setsockopt (requester, ZMQ_LINGER, &linger, sizeof(int));
zmq_setsockopt (requester, ZMQ_SNDTIMEO, &timeout, sizeof(int));
zmq_setsockopt (requester, ZMQ_RCVTIMEO, &timeout, sizeof(int));
int connection_status = zmq_connect (requester, "tcp://localhost:5559");

您可以通过将其设置为 0 来使用 linger,也可以在从 REQ 发送到代理时使用标志 ZMQ_DONTWAIT

关于asynchronous - 带有 zmq_proxy 和回复的 ZeroMQ 异步客户端,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23470208/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com