gpt4 book ai didi

java - ZeroMQ 产生的结果微乎其微

转载 作者:太空宇宙 更新时间:2023-11-04 14:33:18 25 4
gpt4 key购买 nike

我正在测试 ZeroMQ,每秒只能收到大约 1227 - 1276 条消息。然而我读到这些应该超过这个数量的 100 倍。

我做错了什么?我可以指定一些配置来解决这个问题吗?

我正在使用以下功能:

public static final String SERVER_LOCATION = "127.0.0.1";
public static final int SERVER_BIND_PORT = 5570;

public static void receiveMessages() throws InvalidProtocolBufferException, FileNotFoundException, UnsupportedEncodingException{
ZContext ctx = new ZContext();

Socket frontend = ctx.createSocket(ZMQ.PULL);
frontend.bind("tcp://*:"+SERVER_BIND_PORT);

int i = 1;
do{
ZMsg msg = ZMsg.recvMsg(frontend);
ZFrame content = msg.pop();
if(content!= null){
msg.destroy();
System.out.println("Received: "+i);
i++;
content.destroy();
}
}while(true);
}

public static void sendMessages() throws FileNotFoundException, UnsupportedEncodingException{
ZContext ctx = new ZContext();
Socket client = ctx.createSocket(ZMQ.PUSH);

client.setIdentity("i".getBytes());
client.connect("tcp://"+SERVER_LOCATION+":"+SERVER_BIND_PORT);

PollItem[] items = new PollItem[] { new PollItem(client, Poller.POLLIN) };
int i = 1;
Timer t = new Timer(timeToSpendSending);
t.start();
do{
client.send(/* object to send*/ , 0);
i++;
}while(!t.isDone());

System.out.println("Done with "+i);
}

用于限制程序运行时间的定时器类:

class Timer extends Thread{
int time;
boolean done;
public Timer(int time){
this.time = time;
done = false;
}
public void run(){
try {
this.sleep(time);
done = true;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public boolean isDone(){
return done;
}
}

编辑:我正在使用 jeroMQ

<dependency>
<groupId>org.zeromq</groupId>
<artifactId>jeromq</artifactId>
<version>0.3.4</version>
</dependency>

最佳答案

我必须更换连接方法并删除高水位线(设置为 0 以表示内存中的消息不受限制)

代码如下:

public static final String SERVER_LOCATION = "127.0.0.1";
public static final int SERVER_BIND_PORT = 5570;
public static final String TOPIC = "topic1";

public static void receiveMessages() throws InvalidProtocolBufferException, FileNotFoundException, UnsupportedEncodingException{
// Prepare our context and subscribe
Context context = ZMQ.context(1);
Socket subscriber = context.socket(ZMQ.SUB);

subscriber.connect("tcp://"+SERVER_LOCATION+":"+SERVER_BIND_PORT);
subscriber.setRcvHWM(0);
subscriber.subscribe(TOPIC.getBytes());
System.out.println("subscribed to "+TOPIC);
int i = 1;
boolean started = false;
Timer t = new Timer(timeToSpendSending);
do{
String msg = subscriber.recvStr();
if(!TOPIC.equals(msg)){
if(!started){
t.start();
started = true;
}
i++;
}
}while(!t.isDone());
System.out.println("Done with: "+i);
subscriber.close();
context.term();
}
public static void sendMessages() throws FileNotFoundException, UnsupportedEncodingException{
Context context = ZMQ.context(1);
Socket publisher = context.socket(ZMQ.PUSH);
publisher.bind("tcp://"+SERVER_LOCATION+":"+SERVER_BIND_PORT);
publisher.setHWM(0);
publisher.setSndHWM(0);

int i = 1;
Timer t = new Timer(timeToSpendSending);
t.start();
do{
publisher.sendMore(TOPIC);
publisher.send("Test Data number "+i);
i++;
}while(!t.isDone());
System.out.println("Done with: "+i);
publisher.close();
context.term();
}

像这样,我收到的消息计数范围为发送时每秒 250,000 条,接收时每秒 145,000 条。

关于java - ZeroMQ 产生的结果微乎其微,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25884948/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com