gpt4 book ai didi

java - ZeroMQ 两个 PUB-SUB 代理

转载 作者:太空宇宙 更新时间:2023-11-04 14:09:32 25 4
gpt4 key购买 nike

我想为我们的分布式系统实现一个发布-订阅基础设施。您可以在图中看到,网络背后的想法是,我想用 java 实现发布者和订阅者。但在JZmq中尚不支持曲线加密。所以我想在 C(++) 可用的地方实现代理。(目前我只在 java 中实现)

这是我的代码

订阅者.java:

import java.nio.charset.Charset;

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

public class Subscriber {
public static void main(String[] args) {
String address = args[0];
String topic = args[1];

Context context = ZMQ.context(1);
Socket subscriber = context.socket(ZMQ.SUB);
subscriber.connect(address);
subscriber.subscribe(topic.getBytes());

while (!Thread.currentThread().isInterrupted()) {
String top = subscriber.recvStr(Charset.defaultCharset());
String contents = subscriber.recvStr(Charset.defaultCharset());

System.out.println(top + ": " + contents);
}
subscriber.close();
context.term();
}
}

Publisher.java:

import java.util.Random;

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

public class Publisher {
public static void main(String[] args) {
String url = args[0];
String topic = args[1];
int intervall = Integer.valueOf(args[2]);

Context context = ZMQ.context(1);
Socket publisher = context.socket(ZMQ.PUB);

Random rand = new Random();
publisher.connect(url);
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(intervall);
} catch (InterruptedException e) {
e.printStackTrace();
}
int value = rand.nextInt(20) * (rand.nextBoolean() ? (-1) : 1);
publisher.sendMore(topic);
publisher.send(String.valueOf(value));
System.out.println("PUB: " + topic + ":" + value);
}

publisher.close();
context.term();
}
}

PubSubProxy.java:

import java.io.PrintStream;

import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZThread;
import org.zeromq.ZThread.IAttachedRunnable;

public class PubSubProxy {
static Socket frontend;
static Socket backend;

public static void main(String[] args) {
String addressSubscriber = args[0];
String modeSubscriber = args[1];
String addressPublisher = args[2];
String modePublisher = args[3];

// Prepare our context and sockets
// ZContext context = ZMQ.context(1);
ZContext context = new ZContext();

// This is where the weather server sits
frontend = context.createSocket(ZMQ.XSUB);
if (modeSubscriber.equals("client")) {
System.out.println("Subscriber connecting to: " + addressSubscriber);
frontend.connect(addressSubscriber);
} else if (modeSubscriber.equalsIgnoreCase("server")) {
System.out.println("Subscriber binding to: " + addressSubscriber);
frontend.bind(addressSubscriber);
}
// This is our public endpoint for subscribers
backend = context.createSocket(ZMQ.XPUB);

if (modePublisher.equals("client")) {
System.out.println("Publisher connecting to: " + addressPublisher);
backend.connect(addressPublisher);
} else if (modePublisher.equalsIgnoreCase("server")) {
System.out.println("Publisher binding to: " + addressPublisher);
backend.bind(addressPublisher);
}

// Subscribe on everything
// frontend.subscribe("".getBytes());

// Run the proxy until the user interrupts us
IAttachedRunnable runnable = new Listener();
Socket listener = ZThread.fork(context, runnable);
ZMQ.proxy(frontend, backend, listener);

frontend.close();
backend.close();
context.destroy();
}

private static class Listener implements IAttachedRunnable {
@Override
public void run(Object[] args, ZContext ctx, Socket pipe) {
// Print everything that arrives on pipe
while (true) {
ZFrame frame = ZFrame.recvFrame(pipe);
if (frame == null)
break; // Interrupted
System.out.println(frame.toString());
frame.destroy();
}
}
}
}

如您所见,我已向代理添加了一个监听器以查看是否收到消息。在发布者端代理(图中最上面的一个)上,我收到了消息,但在另一个代理上却没有收到任何消息。

这是我执行应用程序的方式

#beaglebone #1
#proxy #1
java -Djava.library.path=/usr/local/lib -jar proxy.jar ipc:///tmp/pub server tcp://*:5555 server
#pub
java -Djava.library.path=/usr/local/lib -jar publisher.jar ipc:///tmp/pub temperature 10000
java -Djava.library.path=/usr/local/lib -jar publisher.jar ipc:///tmp/pub humidity 1000
java -Djava.library.path=/usr/local/lib -jar publisher.jar ipc:///tmp/pub testvar 5000

#beaglebone #2
#proxy #2
java -Djava.library.path=/usr/local/lib -jar proxy.jar tcp://192.168.0.192:5555 client ipc:///tmp/sub server
#sub
java -Djava.library.path=/usr/local/lib -jar subscriber.jar ipc:///tmp/sub temperature
java -Djava.library.path=/usr/local/lib -jar subscriber.jar ipc:///tmp/sub humidity
java -Djava.library.path=/usr/local/lib -jar subscriber.jar ipc:///tmp/sub testvar

pub-sub network

最佳答案

您似乎混淆了客户端/服务器模式与发布/订阅模式的混合。

在发布/订阅模式中,发布者通知其订阅者(如果有)。发布者应使用 bind (监听订阅),订阅者应使用 connect (请求订阅)。

那么你的交流就变成: enter image description here

为了做到这一点,您可以:

  • 修改 Publisher.java,将 publisher.connect(url); 替换为 publisher.bind(url);
  • 修改 PubSubProxy.java 删除无用的客户端/服务器参数
import java.io.PrintStream;

import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZThread;
import org.zeromq.ZThread.IAttachedRunnable;

public class PubSubProxy {
static Socket frontend;
static Socket backend;

public static void main(String[] args) {
String addressSubscriber = args[0];
String addressPublisher = args[1];

// Prepare our context and sockets
ZContext context = new ZContext();

// This is where the weather server sits
frontend = context.createSocket(ZMQ.XSUB);
System.out.println("Subscriber connecting to: " + addressSubscriber);
frontend.connect(addressSubscriber);

// This is our public endpoint for subscribers
backend = context.createSocket(ZMQ.XPUB);
System.out.println("Publisher binding to: " + addressPublisher);
backend.bind(addressPublisher);

// Run the proxy until the user interrupts us
ZMQ.proxy(frontend, backend, null);

frontend.close();
backend.close();
context.destroy();
}
}

然后您应该能够使用以下方式从后端接收数据到前端:

#beaglebone #1
#proxy #1
java -Djava.library.path=/usr/local/lib -jar proxy.jar ipc:///tmp/pub tcp://*:5555
#pub
java -Djava.library.path=/usr/local/lib -jar publisher.jar ipc:///tmp/pub temperature 10000

#beaglebone #2
#proxy #2
java -Djava.library.path=/usr/local/lib -jar proxy.jar tcp://192.168.0.192:5555 ipc:///tmp/sub
#sub
java -Djava.library.path=/usr/local/lib -jar subscriber.jar ipc:///tmp/sub temperature

关于java - ZeroMQ 两个 PUB-SUB 代理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28495202/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com