gpt4 book ai didi

java - ZMQ 中的流量不均匀

转载 作者:行者123 更新时间:2023-12-01 15:16:37 31 4
gpt4 key购买 nike

我在我的 Java 应用程序中使用 ZMQ。我发现它的行为不均匀,即如果我发送大约 100 条消息,其中一个消费者说需要 1 秒,那么如果我们继续增加消费者,所花费的时间将变为 2,1.5,3,这样。没有逐渐增加或减少的情况。我该如何纠正这个问题。在下面查找我的代码

// Broker

导入 org.zeromq.ZMQ;
导入 org.zeromq.ZMQ.Context;
导入 org.zeromq.ZMQ.Socket;
导入 org.zeromq.ZMQStreamer;

公共(public)类代理{

/**
* @param args
*/
public static void main(String[] args)
{
Context context = ZMQ.context(1);

Socket frontEnd = context.socket(ZMQ.PULL);
frontEnd.bind("tcp://*:5555");

Socket backEnd = context.socket(ZMQ.PUSH);
backEnd.bind("tcp://*:5560");

ZMQStreamer zmqStreamer = new ZMQStreamer(context, frontEnd, backEnd);
zmqStreamer.run();
}

}

//生产者

导入org.zeromq.ZMQ;导入org.zeromq.ZMQ.Socket;

公共(public)类生产者{

public void init()
{
ZMQ.Context context = ZMQ.context(1);
socket = context.socket(ZMQ.PUSH);
socket.connect("tcp://localhost:5555");
}

public void initMessage(String message)
{
this.message = message;
}

public void sendMessage()
{
String sendMessage = System.nanoTime() +"#"+ message;
socket.send(sendMessage.getBytes(), 0);
}
/**
* @param args
*/
public static void main(String[] args)
{
Producer producer = new Producer();
producer.init();
byte[] message = new byte[Integer.parseInt(args[0])];
//message = "Hello".getBytes();
producer.initMessage(new String(message));
for(int i=0;i<100;i++)
{
producer.sendMessage();
}
}

private Socket socket = null;
private String message;
}

//Consumer

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;

public class Consumer
{

public void init()
{
ZMQ.Context context = ZMQ.context(1);
socket = context.socket(ZMQ.PULL);
socket.connect("tcp://localhost:5560");
}

public void reciveMessage()
{
byte[] recived = socket.recv(0);
//System.out.println(recived.length);
long recivedTime = System.nanoTime();
String message = new String(recived);
String[] splitMessage = message.split("#");
long sendTime = Long.parseLong(splitMessage[0]);
System.out.println("Send Time " + sendTime + " RecivedTime "
+ recivedTime + " Time taken " + (recivedTime - sendTime)
+ " Message " + message);
}
/**
* @param args
*/
public static void main(String[] args)
{
Consumer consumer = new Consumer();
consumer.init();
for (int i=0;i<100;i++)
{
consumer.reciveMessage();
}
}
private Socket socket = null;
}

最佳答案

为了可靠地对一段多线程代码进行计时,您需要有某种方法来同步收集器/接收器的开始和结束时间(您目前尚未显示对其进行编程)。

查看this example来自 ZMQ 指南,其中指出以下过程是 ZMQ 划分数据集的正确方法之一:

...

Our supercomputing application is a fairly typical parallel processing model:

We have a ventilator that produces tasks that can be done in parallel.
We have a set of workers that process tasks.
We have a sink that collects results back from the worker processes.

...

关于java - ZMQ 中的流量不均匀,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11516621/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com