- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我想为我们的分布式系统实现一个发布-订阅基础设施。您可以在图中看到,网络背后的想法是,我想用 java 实现发布者和订阅者。但在JZmq中尚不支持曲线加密。所以我想在 C(++) 可用的地方实现代理。(目前我只在 java 中实现)
这是我的代码
订阅者.java:
import java.nio.charset.Charset;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
public class Subscriber {
public static void main(String[] args) {
String address = args[0];
String topic = args[1];
Context context = ZMQ.context(1);
Socket subscriber = context.socket(ZMQ.SUB);
subscriber.connect(address);
subscriber.subscribe(topic.getBytes());
while (!Thread.currentThread().isInterrupted()) {
String top = subscriber.recvStr(Charset.defaultCharset());
String contents = subscriber.recvStr(Charset.defaultCharset());
System.out.println(top + ": " + contents);
}
subscriber.close();
context.term();
}
}
Publisher.java:
import java.util.Random;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
public class Publisher {
public static void main(String[] args) {
String url = args[0];
String topic = args[1];
int intervall = Integer.valueOf(args[2]);
Context context = ZMQ.context(1);
Socket publisher = context.socket(ZMQ.PUB);
Random rand = new Random();
publisher.connect(url);
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(intervall);
} catch (InterruptedException e) {
e.printStackTrace();
}
int value = rand.nextInt(20) * (rand.nextBoolean() ? (-1) : 1);
publisher.sendMore(topic);
publisher.send(String.valueOf(value));
System.out.println("PUB: " + topic + ":" + value);
}
publisher.close();
context.term();
}
}
PubSubProxy.java:
import java.io.PrintStream;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZThread;
import org.zeromq.ZThread.IAttachedRunnable;
public class PubSubProxy {
static Socket frontend;
static Socket backend;
public static void main(String[] args) {
String addressSubscriber = args[0];
String modeSubscriber = args[1];
String addressPublisher = args[2];
String modePublisher = args[3];
// Prepare our context and sockets
// ZContext context = ZMQ.context(1);
ZContext context = new ZContext();
// This is where the weather server sits
frontend = context.createSocket(ZMQ.XSUB);
if (modeSubscriber.equals("client")) {
System.out.println("Subscriber connecting to: " + addressSubscriber);
frontend.connect(addressSubscriber);
} else if (modeSubscriber.equalsIgnoreCase("server")) {
System.out.println("Subscriber binding to: " + addressSubscriber);
frontend.bind(addressSubscriber);
}
// This is our public endpoint for subscribers
backend = context.createSocket(ZMQ.XPUB);
if (modePublisher.equals("client")) {
System.out.println("Publisher connecting to: " + addressPublisher);
backend.connect(addressPublisher);
} else if (modePublisher.equalsIgnoreCase("server")) {
System.out.println("Publisher binding to: " + addressPublisher);
backend.bind(addressPublisher);
}
// Subscribe on everything
// frontend.subscribe("".getBytes());
// Run the proxy until the user interrupts us
IAttachedRunnable runnable = new Listener();
Socket listener = ZThread.fork(context, runnable);
ZMQ.proxy(frontend, backend, listener);
frontend.close();
backend.close();
context.destroy();
}
private static class Listener implements IAttachedRunnable {
@Override
public void run(Object[] args, ZContext ctx, Socket pipe) {
// Print everything that arrives on pipe
while (true) {
ZFrame frame = ZFrame.recvFrame(pipe);
if (frame == null)
break; // Interrupted
System.out.println(frame.toString());
frame.destroy();
}
}
}
}
如您所见,我已向代理添加了一个监听器以查看是否收到消息。在发布者端代理(图中最上面的一个)上,我收到了消息,但在另一个代理上却没有收到任何消息。
这是我执行应用程序的方式
#beaglebone #1
#proxy #1
java -Djava.library.path=/usr/local/lib -jar proxy.jar ipc:///tmp/pub server tcp://*:5555 server
#pub
java -Djava.library.path=/usr/local/lib -jar publisher.jar ipc:///tmp/pub temperature 10000
java -Djava.library.path=/usr/local/lib -jar publisher.jar ipc:///tmp/pub humidity 1000
java -Djava.library.path=/usr/local/lib -jar publisher.jar ipc:///tmp/pub testvar 5000
#beaglebone #2
#proxy #2
java -Djava.library.path=/usr/local/lib -jar proxy.jar tcp://192.168.0.192:5555 client ipc:///tmp/sub server
#sub
java -Djava.library.path=/usr/local/lib -jar subscriber.jar ipc:///tmp/sub temperature
java -Djava.library.path=/usr/local/lib -jar subscriber.jar ipc:///tmp/sub humidity
java -Djava.library.path=/usr/local/lib -jar subscriber.jar ipc:///tmp/sub testvar
最佳答案
您似乎混淆了客户端/服务器模式与发布/订阅模式的混合。
在发布/订阅模式中,发布者通知其订阅者(如果有)。发布者应使用 bind
(监听订阅),订阅者应使用 connect
(请求订阅)。
那么你的交流就变成:
为了做到这一点,您可以:
publisher.connect(url);
替换为 publisher.bind(url);
import java.io.PrintStream;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZThread;
import org.zeromq.ZThread.IAttachedRunnable;
public class PubSubProxy {
static Socket frontend;
static Socket backend;
public static void main(String[] args) {
String addressSubscriber = args[0];
String addressPublisher = args[1];
// Prepare our context and sockets
ZContext context = new ZContext();
// This is where the weather server sits
frontend = context.createSocket(ZMQ.XSUB);
System.out.println("Subscriber connecting to: " + addressSubscriber);
frontend.connect(addressSubscriber);
// This is our public endpoint for subscribers
backend = context.createSocket(ZMQ.XPUB);
System.out.println("Publisher binding to: " + addressPublisher);
backend.bind(addressPublisher);
// Run the proxy until the user interrupts us
ZMQ.proxy(frontend, backend, null);
frontend.close();
backend.close();
context.destroy();
}
}
然后您应该能够使用以下方式从后端接收数据到前端:
#beaglebone #1
#proxy #1
java -Djava.library.path=/usr/local/lib -jar proxy.jar ipc:///tmp/pub tcp://*:5555
#pub
java -Djava.library.path=/usr/local/lib -jar publisher.jar ipc:///tmp/pub temperature 10000
#beaglebone #2
#proxy #2
java -Djava.library.path=/usr/local/lib -jar proxy.jar tcp://192.168.0.192:5555 ipc:///tmp/sub
#sub
java -Djava.library.path=/usr/local/lib -jar subscriber.jar ipc:///tmp/sub temperature
关于java - ZeroMQ 两个 PUB-SUB 代理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28495202/
你能举一些zeromq的例子吗? 最佳答案 假设您想要某种公告板。您希望通过订阅公告板来只允许某些人看到它。 这可以使用 ZeroMQ 的发布者/订阅者模型来完成。 现在,假设您需要发送一些异步消息。
因此,正如我在上一篇文章中所问的那样,我希望能够使用不同语言编写的程序或函数在它们之间进行通信。 我最近遇到了 zeromq,我试图弄清楚这是否可以帮助我,因为它提供了某种套接字。例如,zeromq
与通过 POLLIN 多路复用多个套接字有何不同? while True: socks = dict(poller.poll()) if socks.get(control_recei
我正在设计一个与 ZeroMQ 对话的服务器应用程序。无需深入细节,服务器将存储和服务(来自查询请求)(eventid, eventstring)元组。 我的问题涉及有线协议(protocol)的设计
我有一个服务器(在 Amazon 上运行)和一个连接到它的客户端。建立连接后,客户端和服务器专门相互通信并发送消息。 例如 1. Client -> Server 2. Client -> Serve
我正在开发一个新的客户端-服务器应用程序 (.Net),并且到目前为止一直在使用 WCF,它非常适合应用程序的请求-响应方法。然而,我被要求用基于套接字的解决方案替换它,部分是为了支持非 .Net 客
我正在尝试做一个发布/订阅架构,其中多个发布者和多个订阅者存在于同一总线上。根据我在互联网上阅读的内容,只有一个套接字应该调用 bind(),而所有其他套接字(无论是 pub 还是 sub)都应该调用
使用zeromq,发送者发送10条消息后,发送者崩溃。 场景1:接收方正在一条一条地处理消息,花费了一些明显的时间成本,在这种情况下它还会收到 10 条消息吗? 场景 2:另一种情况是,当接收器崩溃时
我有一个 ZeroMQ 套接字,它正在从不同机器上的多个进程接收数据。在不改变数据内容的情况下,有没有办法识别数据的来源呢?具体来说,我想要发送者的 IP 地址(如果它来自 TCP 连接)。 最佳答案
有人知道在哪里可以找到有关 ZeroMQ 延迟与 29 West LBM 等竞争对手的性能详细信息吗? 看起来便宜得多,但我找不到任何指标来决定哪个更合适。 最佳答案 ZeroMQ 和 29West
有没有办法在不使用转发器概念的情况下使用 zeroMQ 库进行消息广播? 最佳答案 是的,一个 PUB 套接字将广播到所有连接的 SUB 套接字。只有当您想要桥接不同的网络时才需要转发器(代理),例如
几天前我才开始使用zeromq。我的目标是设计一个具有多个代理(代理网络)的发布订阅系统。我已经阅读了 zeromq 指南的相关部分,并为简单的发布子系统编写了代码。如果有人可以帮助我解决以下问题:
我需要编写一个订单管理器,将客户(股票、外汇等)订单发送到适当的交易所。客户想要发送订单,但对 FIX 或其他专有协议(protocol)一无所知,只知道发送订单的内部(规范化)格式。我有应用程序(服
我正在尝试从示例 wuclient/wuserver 在 zeromq 上实现一个惰性订阅者。 客户端比服务器慢得多,因此它必须只获取服务器最后发送的消息。 到目前为止,我发现这样做的唯一方法是连接/
我是 ZeroMQ 的新手并试图找出设计问题。我的情况是我有一个或多个客户端向单个服务器发送请求。服务器将处理请求,做一些事情,并向客户端发送回复。有两个条件: 回复必须发送到发送请求的客户端。 如果
如 docs 中所述在 3.x 版本的 zeromq 中,PUB/SUB 场景中的消息正在被过滤 出版商侧(而不是在订阅者方面,这是微不足道的)。 对我来说,这听起来像是发布者必须持有所有连接的套接字
引自 ZeroMQ 指南 However, with a little extra work, this humble pattern becomes a good basis for real wo
假设我有一个带有 ZeroMQ 接口(interface)的节点(进程、线程等),比方说一个 REP 套接字。这意味着我有一个无限主循环,它在 zmq_recv 或 zmq_poll 函数中休眠。 现
我想以某种方式比较 grpc 与 Zeromq 及其模式的功能:并且我想创建一些比较(功能集) - 不知何故 - 0mq 是“更好”的套接字 - 但无论如何 - 如果我应用 0mq模式 - 我认为我得
我正在试验 ZeroMQ。我发现在 ZeroMQ 中非常有趣,connect 或 bind 先发生并不重要。我试着查看 ZeroMQ 的源代码,但它太大了,找不到任何东西。 代码如下。 # clien
我是一名优秀的程序员,十分优秀!