gpt4 book ai didi

c - 使用 CZMQ-4.1.0 新 zsock API 更新的异步 Majordomo 模式示例不起作用

转载 作者:太空宇宙 更新时间:2023-11-04 03:18:28 28 4
gpt4 key购买 nike

用brew安装zmq和czmq后,我尝试编译播放Asynchronous-Majordomo-Pattern但它不起作用,因为它需要 czmq v3。据我所知,我尝试使用 zactor 将其更新为 v4,因为

zthread is deprecated in favor of zactor http://czmq.zeromq.org/czmq3-0:zthread

所以现在我认为以下代码作为更新的 async-majordomo 模式看起来不错,但它没有按预期工作,当我通过我的终端运行它时它没有创建任何线程。

//  Round-trip demonstrator
// While this example runs in a single process, that is just to make
// it easier to start and stop the example. The client task signals to
// main when it's ready.

#include "czmq.h"
#include <stdlib.h>

void dbg_write_in_file(char * txt, int nb_request) {
FILE * pFile;
pFile = fopen ("myfile.txt","a");

if (pFile!=NULL)
{
fputs (txt, pFile);

char str_nb_request[12];
sprintf(str_nb_request, "%d", nb_request);
fputs (str_nb_request, pFile);

fputs ("\n", pFile);
fclose (pFile);
}
}

static void
client_task (zsock_t *pipe, void *args)
{
zsock_t *client = zsock_new (ZMQ_DEALER);
zsock_connect (client, "tcp://localhost:5555");
printf ("Setting up test...\n");
zclock_sleep (100);

printf("child 1: parent: %i\n\n", getppid());
printf("child 1: my pid: %i\n\n", getpid());

int requests;
int64_t start;

printf ("Synchronous round-trip test...\n");
start = zclock_time ();
for (requests = 0; requests < 10000; requests++) {
zstr_send (client, "hello");

// stuck here /!\

char *reply = zstr_recv (client);
zstr_free (&reply);

// check if it does something
dbg_write_in_file("sync round-trip requests : ", requests);
// end check
}
printf (" %d calls/second\n",
(1000 * 10000) / (int) (zclock_time () - start));

printf ("Asynchronous round-trip test...\n");
start = zclock_time ();
for (requests = 0; requests < 100000; requests++) {
zstr_send (client, "hello");

// check if it does something
dbg_write_in_file("async round-trip send requests : ", requests);
// end check
}
for (requests = 0; requests < 100000; requests++) {
char *reply = zstr_recv (client);
zstr_free (&reply);

// check if it does something
dbg_write_in_file("async round-trip rec requests : ", requests);
// end check
}
printf (" %d calls/second\n",
(1000 * 100000) / (int) (zclock_time () - start));

zstr_send (pipe, "done");
}

// Here is the worker task. All it does is receive a message, and
// bounce it back the way it came:

static void
worker_task (zsock_t *pipe, void *args)
{
printf("child 2: parent: %i\n\n", getppid());
printf("child 2: my pid: %i\n\n", getpid());

zsock_t *worker = zsock_new (ZMQ_DEALER);
zsock_connect (worker, "tcp://localhost:5556");

while (true) {
zmsg_t *msg = zmsg_recv (worker);
zmsg_send (&msg, worker);
}
zsock_destroy (&worker);
}

// Here is the broker task. It uses the zmq_proxy function to switch
// messages between frontend and backend:

static void
broker_task (zsock_t *pipe, void *args)
{
printf("child 3: parent: %i\n\n", getppid());
printf("child 3: my pid: %i\n\n", getpid());

// Prepare our sockets
zsock_t *frontend = zsock_new (ZMQ_DEALER);
zsock_bind (frontend, "tcp://localhost:5555");
zsock_t *backend = zsock_new (ZMQ_DEALER);
zsock_bind (backend, "tcp://localhost:5556");
zmq_proxy (frontend, backend, NULL);

zsock_destroy (&frontend);
zsock_destroy (&backend);
}

// Finally, here's the main task, which starts the client, worker, and
// broker, and then runs until the client signals it to stop:

int main (void)
{
// Create threads
zactor_t *client = zactor_new (client_task, NULL);
assert (client);
zactor_t *worker = zactor_new (worker_task, NULL);
assert (worker);
zactor_t *broker = zactor_new (broker_task, NULL);
assert (broker);

// Wait for signal on client pipe
char *signal = zstr_recv (client);
zstr_free (&signal);

zactor_destroy (&client);
zactor_destroy (&worker);
zactor_destroy (&broker);
return 0;
}

当我运行它的时候,程序好像卡在了评论处

// stuck here /!\

然后当我杀死它时因为它没有完成,或者根本不打印任何东西,我需要按五次 Ctrl+C ( ^C )。只有这样,它在控制台上看起来更加冗长,就像它确实在运行一样。
=> 请注意,我删除了所有 printf() 步骤的输出,因为它读起来真的很乱。

运行时,它不会向文件写入任何内容,由 dbg_write_in_file() 函数调用,仅在发送五个 Ctrl+ C ( ^C ).

客户端工作人员和代理任务都返回与程序本身相同的 getppid 编号(我的终端)和 getpid

我使用 gcc trippingv4.c -o trippingv4 -L/usr/local/lib -lzmq -lczmq 编译。

当我试图杀死它时:

./trippingv4
Setting up test...
child 1: parent: 60967

child 1: my pid: 76853

Synchronous round-trip test...
^Cchild 2: parent: 60967

child 2: my pid: 76853

^Cchild 3: parent: 60967

child 3: my pid: 76853

^C^C^CE: 18-02-28 00:16:37 [76853]dangling 'PAIR' socket created at src/zsys.c:471
E: 18-02-28 00:16:37 [76853]dangling 'DEALER' socket created at trippingv4.c:29
E: 18-02-28 00:16:37 [76853]dangling 'PAIR' socket created at src/zsys.c:471
E: 18-02-28 00:16:37 [76853]dangling 'DEALER' socket created at trippingv4.c:89

更新

感谢@user3666197 的详细回答。在第一部分,编译器不编译 assert 调用,所以我只显示值并进行视觉比较,它们是相同的。

int czmqMAJOR,
czmqMINOR,
czmqPATCH;

zsys_version ( &czmqMAJOR, &czmqMINOR, &czmqPATCH );
printf( "INF: detected CZMQ ( %d, %d, %d ) -version\n",
czmqMAJOR,
czmqMINOR,
czmqPATCH
);

printf( "INF: CZMQ_VERSION_MAJOR %d, CZMQ_VERSION_MINOR %d, CZMQ_VERSION_PATCH %d\n",
CZMQ_VERSION_MAJOR,
CZMQ_VERSION_MINOR,
CZMQ_VERSION_PATCH
);

输出:

INF: detected CZMQ ( 4, 1, 0 ) -version
INF: CZMQ_VERSION_MAJOR 4, CZMQ_VERSION_MINOR 1, CZMQ_VERSION_PATCH 0

zsys_info 调用确实可以编译但不会在终端上显示任何内容,即使使用 fflush(stdout) 以防万一所以我只是使用 printf :

INF: This system's Context() limit is 65535 ZeroMQ socketsINF: current state of the global Context()-instance has:
( 1 )-IO-threads ready
( 1 )-ZMQ_BLOCKY state

然后我用 zsys_set_io_threads(2) 和/或 zmq_ctx_set (aGlobalCONTEXT, ZMQ_BLOCKY, false); 更改了全局上下文线程值,仍然阻塞。看起来 zactor 不像 zthread 那样与系统线程一起工作......或者没有给出类似的行为。鉴于我在 zeromq(也是零)方面的经验,我可能正在尝试一些无法实现的事情。

更新已解决但不合适

我的主要错误是没有正确启动 zactor 实例

An actor function MUST call zsock_signal (pipe) when initialized and MUST listen to pipe and exit on $TERM command.

并且在调用 zactor_destroy (&proxy);

之前没有阻止 zactor 的代理执行

我在下面给出了最终代码,但你仍然需要在最后使用 Ctrl+C 退出,因为我没有弄清楚如何管理 $ TERM 信号正确。此外,zactor 似乎仍然不使用系统 theads。它可能是这样设计的,但我不知道它在木头后面是如何工作的。

//  Round-trip demonstrator
// While this example runs in a single process, that is just to make
// it easier to start and stop the example. The client task signals to
// main when it's ready.

#include <czmq.h>

static void
client_task (zsock_t *pipe, void *args)
{
assert (streq ((char *) args, "Hello, Client"));
zsock_signal (pipe, 0);

zsock_t *client = zsock_new (ZMQ_DEALER);
zsock_connect (client, "tcp://127.0.0.1:5555");

printf ("Setting up test...\n");
zclock_sleep (100);

int requests;
int64_t start;

printf ("Synchronous round-trip test...\n");
start = zclock_time ();
for (requests = 0; requests < 10000; requests++) {
zstr_send (client, "hello");

zmsg_t *msgh = zmsg_recv (client);
zmsg_destroy (&msgh);

}
printf (" %d calls/second\n",
(1000 * 10000) / (int) (zclock_time () - start));

printf ("Asynchronous round-trip test...\n");
start = zclock_time ();
for (requests = 0; requests < 100000; requests++) {
zstr_send (client, "hello");
}
for (requests = 0; requests < 100000; requests++) {
char *reply = zstr_recv (client);
zstr_free (&reply);
}
printf (" %d calls/second\n",
(1000 * 100000) / (int) (zclock_time () - start));

zstr_send (pipe, "done");
printf("send 'done' to pipe\n");
}

// Here is the worker task. All it does is receive a message, and
// bounce it back the way it came:

static void
worker_task (zsock_t *pipe, void *args)
{
assert (streq ((char *) args, "Hello, Worker"));
zsock_signal (pipe, 0);

zsock_t *worker = zsock_new (ZMQ_DEALER);
zsock_connect (worker, "tcp://127.0.0.1:5556");

bool terminated = false;
while (!terminated) {
zmsg_t *msg = zmsg_recv (worker);
zmsg_send (&msg, worker);
// zstr_send (worker, "hello back"); // Give better perf I don't know why

}
zsock_destroy (&worker);
}

// Here is the broker task. It uses the zmq_proxy function to switch
// messages between frontend and backend:

static void
broker_task (zsock_t *pipe, void *args)
{
assert (streq ((char *) args, "Hello, Task"));
zsock_signal (pipe, 0);

// Prepare our proxy and its sockets
zactor_t *proxy = zactor_new (zproxy, NULL);
zstr_sendx (proxy, "FRONTEND", "DEALER", "tcp://127.0.0.1:5555", NULL);
zsock_wait (proxy);
zstr_sendx (proxy, "BACKEND", "DEALER", "tcp://127.0.0.1:5556", NULL);
zsock_wait (proxy);

bool terminated = false;
while (!terminated) {
zmsg_t *msg = zmsg_recv (pipe);
if (!msg)
break; // Interrupted
char *command = zmsg_popstr (msg);

if (streq (command, "$TERM")) {
terminated = true;
printf("broker received $TERM\n");
}

freen (command);
zmsg_destroy (&msg);
}

zactor_destroy (&proxy);
}

// Finally, here's the main task, which starts the client, worker, and
// broker, and then runs until the client signals it to stop:

int main (void)
{

// Create threads
zactor_t *client = zactor_new (client_task, "Hello, Client");
assert (client);
zactor_t *worker = zactor_new (worker_task, "Hello, Worker");
assert (worker);
zactor_t *broker = zactor_new (broker_task, "Hello, Task");
assert (broker);

char *signal = zstr_recv (client);
printf("signal %s\n", signal);
zstr_free (&signal);

zactor_destroy (&client);
printf("client done\n");
zactor_destroy (&worker);
printf("worker done\n");
zactor_destroy (&broker);
printf("broker done\n");

return 0;
}

最佳答案

让我们一步步诊断现状:

int czmqMAJOR,
czmqMINOR,
czmqPATCH;

zsys_version ( &czmqMAJOR, &czmqMINOR, &czmqPATCH );
printf( "INF: detected CZMQ( %d, %d, %d )-version",
czmqMAJOR,
czmqMINOR,
czmqPATCH
);
assert ( czmqMAJOR == CZMQ_VERSION_MAJOR & "Major: does not match\n" );
assert ( czmqMINOR == CZMQ_VERSION_MINOR & "Minor: does not match\n" );
assert ( czmqPATCH == CZMQ_VERSION_PATCH & "Patch: does not match\n" );

如果这符合您的预期,您可能希望 DLL 版本都匹配并在正确的位置找到。


下一步:

可以测试整个马戏团以非阻塞模式运行,以证明没有其他阻塞,但作为简要检查,我没有发现 CZMQ-API 中公开了这样的选项, native API 允许标记一个NOBLOCK { _send() | 上的选项_recv() }- 操作,这可以防止它们保持阻塞状态(在 _send()-s 的情况下,DEALER 套接字实例可能就是这种情况,当还没有任何具有 POSACK-ed .bind()/.connect() 状态的交易对手时。

在这里,我没有找到一些工具可以像 native API 中预期的那样快地执行此操作。也许你会更幸运地经历这个。


测试全局 Context() 实例是否已准备就绪:

在第一个套接字实例化之前添加,以确保我们在任何和所有套接字生成及其各自的 _bind()/_connect() 操作之前添加以下 self 报告行,使用:

 zsys_info ( "INF: This system's Context() limit is %zu ZeroMQ sockets",
zsys_socket_limit ()
);

也可以手动执行 Context() 实例化:

为了确保全局 Context() 实例启动并运行,在任何更高的抽象实例询问是否实现额外的内部性(套接字、计数器、处理程序、端口管理等)之前

//  Initialize CZMQ zsys layer; this happens automatically when you create
// a socket or an actor; however this call lets you force initialization
// earlier, so e.g. logging is properly set-up before you start working.
// Not threadsafe, so call only from main thread. Safe to call multiple
// times. Returns global CZMQ context.
CZMQ_EXPORT void *
zsys_init (void);

// Optionally shut down the CZMQ zsys layer; this normally happens automatically
// when the process exits; however this call lets you force a shutdown
// earlier, avoiding any potential problems with atexit() ordering, especially
// with Windows dlls.
CZMQ_EXPORT void
zsys_shutdown (void);

并可能更好地调整 IO 性能,在初始化状态下使用它:

//  Configure the number of I/O threads that ZeroMQ will use. A good
// rule of thumb is one thread per gigabit of traffic in or out. The
// default is 1, sufficient for most applications. If the environment
// variable ZSYS_IO_THREADS is defined, that provides the default.
// Note that this method is valid only before any socket is created.
CZMQ_EXPORT void
zsys_set_io_threads (size_t io_threads);

这种手动实例化提供了一个额外的好处,它具有实例句柄空指针,因此可以通过 zmq_ctx_get() 工具检查它的当前状态和形状:

void *aGlobalCONTEXT = zsys_init();

printf( "INF: current state of the global Context()-instance has:\n" );
printf( " ( %d )-IO-threads ready\n", zmq_ctx_get( aGlobalCONTEXT,
ZMQ_IO_THREADS
)
);
printf( " ( %d )-ZMQ_BLOCKY state\n", zmq_ctx_get( aGlobalCONTEXT,
ZMQ_BLOCKY
)
); // may generate -1 in case DLL is << 4.2+
...


如果对信号处理不满意,可以设计并使用另一种:

//  Set interrupt handler; this saves the default handlers so that a
// zsys_handler_reset () can restore them. If you call this multiple times
// then the last handler will take affect. If handler_fn is NULL, disables
// default SIGINT/SIGTERM handling in CZMQ.
CZMQ_EXPORT void
zsys_handler_set (zsys_handler_fn *handler_fn);

在哪里

//  Callback for interrupt signal handler
typedef void (zsys_handler_fn) (int signal_value);

关于c - 使用 CZMQ-4.1.0 新 zsock API 更新的异步 Majordomo 模式示例不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49019846/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com