gpt4 book ai didi

sockets - ZeroMQ职位分配

转载 作者:行者123 更新时间:2023-12-03 11:55:33 29 4
gpt4 key购买 nike

我有以下设置:
有一个客户,多个工作人员和一个接收器。
工作人员通过ZeroMQ消息接收来自客户端的工作请求。他们处理输入,并将答案发送到另一个进程(接收器)。处理一条消息大约需要1毫秒,而我们需要每秒处理50,000条消息-这意味着我们需要50多名工作人员来处理负载。

我尝试了一个简单的设置,其中客户端创建了一个ZeroMQ PUSH套接字,所有工作程序都通过(PULL)套接字连接到该套接字。同样,接收器创建单个PULL套接字,所有工作程序都通过PUSH套接字连接到该套接字。

在IIUC中,ZeroMQ使用“循环”将消息发送给工作人员-每次其他工作人员获得工作时。此设置似乎可以在约10名 worker (和适当的负载)下有效地工作。但是,当增加 worker 人数并进一步增加负载时,这会很快中断,并且系统开始累积延迟。

我知道有几种模式可以解决负载平衡问题,但是它们面向多个客户端,并且需要在它们之间使用路由器,这意味着需要额外的代码+ CPU周期。问题是:

1)如果是单个客户,多个工作人员,一个接收器,那么最佳的模式是什么?

2)是否可以通过在客户端和客户端之间路由而无需在客户端和工作人员之间使用路由器?

3)应该使用哪种ZeroMQ socket ?

谢谢!

编辑:
添加代码。

客户:

    void *context = zmq_ctx_new ();

// Socket to send messages on
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_bind (sender, "tcp://*:5557");

// Socket to send start of batch message on
void *sink = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sink, "tcp://localhost:5558");

printf ("Press Enter when the workers are ready: ");
getchar ();
printf ("Sending tasks to workers\n");

// The first message is "0" and signals start of batch
s_send (sink, "0");

unsigned long i;
const int nmsgs = atoi(argv[1]);
const int nmsgs_sec = atoi(argv[2]);
const int buff_size = 1024; // 1KB msgs
unsigned long t, t_start;
t_start = timestamp();
for (i = 0; i < nmsgs; i++) {
t = timestamp();
// Pace the sending according to nmsgs_sec
while( i * 1000000 / (t+1-t_start) > nmsgs_sec) {
// busy wait
t = timestamp();
}
char buffer [buff_size];
// Write current timestamp in the packet beginning
sprintf (buffer, "%lu", t);
zmq_send (sender, buffer, buff_size, 0);
}
printf("Total time: %lu ms Planned time: %d ms\n", (timestamp() - t_start)/1000, nmsgs * 1000 / nmsgs_sec);

zmq_close (sink);
zmq_close (sender);
zmq_ctx_destroy (context);

worker :
//  Socket to receive messages on
void *context = zmq_ctx_new ();
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, receiver_addr);

// Socket to send messages to
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sender, sender_addr);

// Process tasks forever
const int buff_size = 1024;
char buffer[buff_size];
while (1) {
zmq_recv (receiver, buffer, buff_size, 0);
s_send (sender, buffer);
}
zmq_close (receiver);
zmq_close (sender);
zmq_ctx_destroy (context);

下沉:
//  Prepare our context and socket
void *context = zmq_ctx_new ();
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_bind (receiver, "tcp://*:5558");

// Wait for start of batch
char *string = s_recv (receiver);
free (string);

unsigned long t1;
unsigned long maxdt = 0;
unsigned long sumdt = 0;

int task_nbr;
int nmsgs = atoi(argv[1]);
printf("nmsgs = %d\n", nmsgs);
for (task_nbr = 0; task_nbr < nmsgs; task_nbr++) {
char *string = s_recv (receiver);
t1 = timestamp();
unsigned long t0 = atoll(string);
free (string);

unsigned long dt = t1-t0;
maxdt = (maxdt > dt ? maxdt : dt);
sumdt += dt;

if(task_nbr % 10000 == 0) {
printf("%d %lu\n", task_nbr, dt);
}
}

printf("Average time: %lu usec\tMax time: %lu usec\n", sumdt/nmsgs, maxdt);

zmq_close (receiver);
zmq_ctx_destroy (context);

最佳答案

您有多个选择,具体取决于实际错误在当前设置中出现的位置(这无法从您提供的信息中看出来)。

您绝对不需要另一个“中间”节点。

如果问题是连接数量(1-> 50)是当前设置中的问题,则可以在客户端上设置多个PUSH套接字,每个套接字都有一部分工作进程,并且可以在客户端内部进行负载平衡。

如果问题是PUSH插槽本身,则可以在“推”侧使用DEALER插槽,在“拉”侧使用ROUTER插槽。但是我不希望这是问题所在。

通常,我希望您当前的设置是“正确的”设置,并且可能在您的实现中存在错误。您知道错误在哪里引入的吗?客户-> worker 还是 worker ->水槽?还是其他地方?

关于sockets - ZeroMQ职位分配,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30850890/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com