gpt4 book ai didi

czmq 多个 zactor 实例崩溃

转载 作者:行者123 更新时间:2023-12-02 11:47:53 40 4
gpt4 key购买 nike

我正在尝试使用 czmq 库中的 zhash 和 zactor 编写一个基本示例。我想要实现的主要想法是:

  1. 创建 1024 个 zactor 实例,并在每个 Actor 创建后发送“START”命令。
  2. 等待 actor 的响应以继续创建 zactor。
  3. 删除所有之前创建的 actor。

我有点不明白为什么这段代码会崩溃。每次我到达 Actor 编号 60 时,应用程序都会退出:

...
...
58 actor started!
START command received!
59 actor started!
START command received!
60 actor started!
> Assertion failed: (self), function zsock_set_sndtimeo, file
> src/zsock_option.c, line 1344.
> Abort trap: 6

编译:gcc -o demo demo.c -g -lczmq

代码如下:

#include <stdio.h>
#include <czmq.h>

typedef struct {
zsock_t *pipe; // Actor command pipe
zpoller_t *poller; // Socket poller
int terminated;
} accountactor_t;

typedef struct{
zactor_t *actor;
int foo;
} account_t;

accountactor_t *
accountactor_new (zsock_t *pipe, void *args)
{
accountactor_t *self = (accountactor_t *) zmalloc (sizeof (accountactor_t));
assert (self);

self->pipe = pipe;
self->poller = zpoller_new (self->pipe, NULL);
self->terminated = 0;
return self;

}

static void
accountactor_recv_api (accountactor_t *self)
{
// Get the whole message of the pipe in one go
zmsg_t *request = zmsg_recv (self->pipe);
if (!request){
return; // Interrupted
}

char *command = zmsg_popstr (request);

if (streq (command, "START")){
zsys_debug("START command received!");
zsock_signal (self->pipe, 0);
}else
if (streq (command, "STOP")){
zsys_debug("STOP command received!");
zsock_signal (self->pipe, 0);
}else
if (streq (command, "$TERM")){
zsys_debug("$TERM command received!");
// The $TERM command is send by zactor_destroy() method
self->terminated = 1;

}else {
zsys_error ("Invalid command '%s'", command);
zsock_signal (self->pipe, -1);
}

zmsg_destroy(&request);
if(command){
free(command);
}
}

void
actor_fcn (zsock_t *pipe, void *args)
{
accountactor_t * self = accountactor_new (pipe, args);
if (!self)
return; // Interrupted

int rc = 0;
// Signal actor successfully initiated
zsock_signal (self->pipe, 0);

while (!self->terminated) {
zsock_t *which = (zsock_t *) zpoller_wait (self->poller, -1);
if (which == self->pipe){
accountactor_recv_api (self);
}
}

if(zpoller_terminated(self->poller)){
zsys_debug("Poller Interrupted!");
}else
if(zpoller_expired(self->poller)){
zsys_debug("Poller Expired!");
}

// Free object itself
zpoller_destroy (&self->poller);
zsock_destroy(&self->pipe);
free(self);
self = NULL;

}

void
s_account_free (void *argument)
{
account_t *account = (account_t *) argument;
zstr_send (account->actor, "$TERM");
zactor_destroy(&account->actor);
free(account);
zsys_debug("Item removed!");
}

int main(){

zhash_t *table = zhash_new();
int i = 0;

account_t *ptrs[1024];
char key[10];

for(i=0; i<1024; i++){

ptrs[i] = (account_t *) zmalloc (sizeof (account_t));

ptrs[i]->actor = zactor_new (actor_fcn, NULL);
sprintf(&key[0],"%d",i);
zhash_insert(table, key, (void *)ptrs[i]);
zhash_freefn(table, key, s_account_free);

zstr_send (ptrs[i]->actor, "START");
zsock_wait (ptrs[i]->actor);
zsys_debug("%d actor started!",i);

}

i = zhash_size(table);
// Delete all
while(i--){
sprintf(&key[0],"%d",i);
zhash_delete(table, key);
free(ptrs[i]);
}

return 0;

}

有什么想法吗?我不明白为什么会达到 60 个数量的限制。

最佳答案

您使用的是什么操作系统?是 OS/X 吗?

当前的 Actor 实现仍然使用 PAIR 套接字,并且它们在内部使用实际的文件句柄来发送信号。每个 actor 有两个 PAIR 套接字,每个套接字使用两个文件句柄,因此 60 个 actor = 240 个文件句柄。

在 OS/X 上,每个进程的默认限制为 256。您可以提高此值,请参阅:http://zeromq.org/docs:tuning-zeromq

在 Linux 上,默认值为 1024,您可以提高此值。在 Windows 上,您需要重新编译 libzmq,并将 FD_SETSIZE 设置为 16K 或类似值(这是 libzmq master 现在所做的,但旧版本具有较低的值)。

完全分开:

  • 构建时不需要任何握手,因为这已经由 zactor_new () 完成,您将看到所有 Actor 在初始化时都会发送信号。

  • 创建 1024 个 actor 可能过多,除非您实际测试系统限制。 Actor 使用系统线程;为了获得最佳性能,您需要每个代码一个线程。为了获得最佳设计,每个并发工作线程一个线程。

关于czmq 多个 zactor 实例崩溃,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35965983/

40 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com