- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我想知道是否有办法让 ZeroMQ 套接字只进行读取或只进行写入。因为,在我看来,即使有异步/多线程示例,每个线程仍然使用 receive-then-send 循环。我遇到的问题是,我想要从 ZeroMQ 套接字读取的 receiveMessage()
和写入 ZeroMQ 套接字的 sendMessage(msg)
。但是这些方法中的每一个都将在另一个类中构造的单独线程中运行。这是我的代码(我使用 Scala 中的 jeromq):
trait ZmqProtocol extends Protocol {
val context: ZContext = new ZContext(1)
private val frontendSocket: ZMQ.Socket = context.createSocket(ZMQ.ROUTER)
private val backendSocket: ZMQ.Socket = context.createSocket(ZMQ.DEALER)
frontendSocket.bind("tcp://*:5555")
backendSocket.bind("inproc://backend")
new Thread(() => {
println("Started receiving messages")
// Connect backend to frontend via a proxy
ZMQ.proxy(frontendSocket, backendSocket, null)
}).start()
override def receiveMessage(): (String, String) = {
val inprocReadSocket: ZMQ.Socket = context.createSocket(ZMQ.DEALER)
inprocReadSocket.connect("inproc://backend")
// The DEALER socket gives us the address envelope and message
val msg = ZMsg.recvMsg(inprocReadSocket)
// Message from client's REQ socket contains 3 frames: address + empty frame + request content
// (payload)
val address = msg.pop
val emptyFrame = msg.pop
val request = msg.pop
assert(request != null)
msg.destroy()
println(s"RECEIVED: $request FROM: $address")
(address.toString, request.toString)
}
override def sendMessage(address: String, response: String): Unit = {
val inprocWriteSocket: ZMQ.Socket = context.createSocket(ZMQ.DEALER)
inprocWriteSocket.connect("inproc://backend")
val addressFrame = new ZFrame(address)
val emptyFrame = new ZFrame("")
val responseFrame = new ZFrame(response)
addressFrame.send(inprocWriteSocket, ZFrame.REUSE + ZFrame.MORE)
// Sending empty frame because client expects such constructed message
emptyFrame.send(inprocWriteSocket, ZFrame.REUSE + ZFrame.MORE)
responseFrame.send(inprocWriteSocket, ZFrame.REUSE)
addressFrame.destroy()
emptyFrame.destroy()
responseFrame.destroy()
}
}
这是我如何使用它:
class TrafficHandler(val requestQueue: LinkedBlockingQueue[(String, Message)],
val responseQueue: LinkedBlockingQueue[(String, String)])
extends Protocol {
def startHandlingTraffic(): Unit = {
new Thread(() => {
while (true) {
val (address, message) = receiveMessage()
requestQueue.put((address, message))
}
}).start()
new Thread(() => {
while (true) {
val (address, response) = responseQueue.take()
sendMessage(address, response)
}
}).start()
}
在调试过程中,我注意到我收到了消息,并使用正确的目标地址正确地从响应队列(并发阻塞队列)中获取了它,但默默地未能发送它。我深入研究了 jeromq 代码,在我看来,它与身份有关,因为 outPipe 为空。我猜这是因为我没有正确的接收发送循环。
在 @user3666197 响应后编辑代码有效! (尽管如果您先启动服务器,则绑定(bind)和连接到 PUSH
和 PULL
套接字需要一些时间)
以下是使用 PUSH
和 PULL
套接字的修改后的代码:
trait ZmqProtocol extends Protocol {
val context: ZContext = new ZContext(1)
val frontendSocket: ZMQ.Socket = context.createSocket(ZMQ.ROUTER)
frontendSocket.bind("tcp://*:5555")
val requestQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PUSH)
requestQueueSocket.bind("inproc://requestQueueSocket")
val responseQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PULL)
responseQueueSocket.bind("inproc://responseQueueSocket")
val inprocRequestQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PULL)
inprocRequestQueueSocket.connect("inproc://requestQueueSocket")
val inprocResponseQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PUSH)
inprocResponseQueueSocket.connect("inproc://responseQueueSocket")
new Thread(() => {
println("Started receiving messages")
while (true) {
val msg = ZMsg.recvMsg(frontendSocket)
// Message from client's REQ socket contains 3 frames: address + empty frame + request content
// (payload)
val reqAddress = msg.pop
val emptyFrame = msg.pop
val reqPayload = msg.pop
assert(reqPayload != null)
msg.destroy()
println(s"RECEIVED: $reqPayload FROM: $reqAddress")
requestQueueSocket.send(s"$reqAddress;$reqPayload")
val responseMessage = new String(responseQueueSocket.recv(0))
val respMessageSplit = responseMessage.split(";")
val respAddress = respMessageSplit(0)
val respPayload = respMessageSplit(1)
val array = new BigInteger(respAddress, 16).toByteArray
val respAddressFrame = new ZFrame(array)
val respEmptyFrame = new ZFrame("")
val respPayloadFrame = new ZFrame(respPayload)
respAddressFrame.send(frontendSocket, ZFrame.REUSE + ZFrame.MORE)
// Sending empty frame because client expects such constructed message
respEmptyFrame.send(frontendSocket, ZFrame.REUSE + ZFrame.MORE)
respPayloadFrame.send(frontendSocket, ZFrame.REUSE)
respAddressFrame.destroy()
respEmptyFrame.destroy()
respPayloadFrame.destroy()
}
}).start()
override def receiveMessage(): (String, String) = {
val message = new String(inprocRequestQueueSocket.recv(0))
val messageSplit = message.split(";")
val address = messageSplit(0)
val payload = messageSplit(1)
(address, payload)
}
override def sendMessage(address: String, response: String): Unit = {
inprocResponseQueueSocket.send(s"$address;$response")
}
}
如果需要的话,这是客户端:
trait ZmqClientProtocol extends ClientProtocol {
val context: ZMQ.Context = ZMQ.context(1)
val socket: ZMQ.Socket = context.socket(ZMQ.REQ)
println("Connecting to server")
socket.connect("tcp://localhost:5555")
override protected def send(message: String): String = {
// Ensure that the last byte of message is 0 because server is expecting a 0-terminated string
val request = message.getBytes()
// Send the message
println(s"Sending request $request")
socket.send(request, 0)
// Get the reply.
val reply = socket.recv(0)
new String(s"$message=${new String(reply)}")
}
}
最佳答案
是的,有几种方法。
a ) 使用串联的单纯形原型(prototype):PUSH/PULL
写入和 PULL/PUSH
读取< br/>b)使用串联的单纯形原型(prototype):(X)PUB/(X)SUB
写入和(X)SUB/(X)PUB
读取
.recv()
-then-.send()
循环。嗯,这个观察结果更多地与实际的套接字原型(prototype)相关,其中一些确实需要对 .recv() 进行强制两步(在其内部 FSA 内部硬连线)排序
--.send()
--...
好吧,挑战开始了:ZeroMQ 从一开始就被设计为主要是零共享,以提高性能和独立性。 Zen-of-Zero 是 distributed-system 中有趣的设计原则。设计。
然而,最近的重新设计工作在 API 4.2+ 中提出了实现 ZeroMQ 套接字访问点成为线程安全的意愿(这违背了 share-nothing 的最初原则),因此,如果要朝这个方向进行实验,您可能会到达可行的领域,但代价是从“零”禅宗开始下降。
由于设计纯粹性,ZeroMQ 套接字访问点永远不应该共享,即使可能也是如此。
如果您努力分离 OOP 关注点,那么最好为此类提供另一对单纯形 PUSH/PULL
-s,但您的此类只读专用的头端+ 只写专用套接字必须处理这种情况,当“远程”(超出抽象的外部类边界)ZeroMQ 套接字原型(prototype) FSA 及其设置和性能调整以及错误状态和““remote”类必须安排所有这些内容,并协调所有与 native ZeroMQ 套接字之间的消息传输(对于两个头端(专用)类来说,它基本上是隔离和隐藏的)。
无论如何,通过适当的设计谨慎都是可行的。
<小时/>一个想法:
...
override def sendMessage( address: String,
response: String
): Unit = {
val inprocWriteSocket: ZMQ.Socket = context.createSocket( ZMQ.DEALER )
inprocWriteSocket.connect( "inproc://backend" )
...
在源代码中可能看起来很简单,但忽略了实际的设置开销,并且还应尊重这样一个事实:没有套接字( inproc://
-transport-class作为一种特殊情况)在 Context()
内实例化的一微秒内即可获得 RTO(Ready-To-Operate),更不用说是完全的 .connect()
-ed 和 RTO-ed 在与远程对方握手之后进行,因此最好提前设置好 SIG/MSG 基础设施,并将其最好地保持为半持久通信层,而不是任何临时/即时启动的可组合/一次性...(资源生态学)
inproc://
-transport-class 在 API 4.x 之前还有一项要求:Connecting a socket
When connecting a socket to a peer address usingzmq_connect()
with theinproc://
transport, the endpoint shall be interpreted as an arbitrary string identifying the name to connect to. Before version 4.0 the name must have been previously created by assigning it to at least one socket within the same ØMQ context as the socket being connected. Since version 4.0 the order ofzmq_bind()
andzmq_connect()
does not matter just like for thetcp://
transport type.
因此,在某些情况下,当您的部署不确定实际的 localhost API 版本时,请注意执行 .bind()
/.connect()
的正确顺序,否则inproc://
管道不适合您。
关于java - ZeroMQ 异步多线程与 ROUTER 和 DEALER,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49329294/
你能举一些zeromq的例子吗? 最佳答案 假设您想要某种公告板。您希望通过订阅公告板来只允许某些人看到它。 这可以使用 ZeroMQ 的发布者/订阅者模型来完成。 现在,假设您需要发送一些异步消息。
因此,正如我在上一篇文章中所问的那样,我希望能够使用不同语言编写的程序或函数在它们之间进行通信。 我最近遇到了 zeromq,我试图弄清楚这是否可以帮助我,因为它提供了某种套接字。例如,zeromq
与通过 POLLIN 多路复用多个套接字有何不同? while True: socks = dict(poller.poll()) if socks.get(control_recei
我正在设计一个与 ZeroMQ 对话的服务器应用程序。无需深入细节,服务器将存储和服务(来自查询请求)(eventid, eventstring)元组。 我的问题涉及有线协议(protocol)的设计
我有一个服务器(在 Amazon 上运行)和一个连接到它的客户端。建立连接后,客户端和服务器专门相互通信并发送消息。 例如 1. Client -> Server 2. Client -> Serve
我正在开发一个新的客户端-服务器应用程序 (.Net),并且到目前为止一直在使用 WCF,它非常适合应用程序的请求-响应方法。然而,我被要求用基于套接字的解决方案替换它,部分是为了支持非 .Net 客
我正在尝试做一个发布/订阅架构,其中多个发布者和多个订阅者存在于同一总线上。根据我在互联网上阅读的内容,只有一个套接字应该调用 bind(),而所有其他套接字(无论是 pub 还是 sub)都应该调用
使用zeromq,发送者发送10条消息后,发送者崩溃。 场景1:接收方正在一条一条地处理消息,花费了一些明显的时间成本,在这种情况下它还会收到 10 条消息吗? 场景 2:另一种情况是,当接收器崩溃时
我有一个 ZeroMQ 套接字,它正在从不同机器上的多个进程接收数据。在不改变数据内容的情况下,有没有办法识别数据的来源呢?具体来说,我想要发送者的 IP 地址(如果它来自 TCP 连接)。 最佳答案
有人知道在哪里可以找到有关 ZeroMQ 延迟与 29 West LBM 等竞争对手的性能详细信息吗? 看起来便宜得多,但我找不到任何指标来决定哪个更合适。 最佳答案 ZeroMQ 和 29West
有没有办法在不使用转发器概念的情况下使用 zeroMQ 库进行消息广播? 最佳答案 是的,一个 PUB 套接字将广播到所有连接的 SUB 套接字。只有当您想要桥接不同的网络时才需要转发器(代理),例如
几天前我才开始使用zeromq。我的目标是设计一个具有多个代理(代理网络)的发布订阅系统。我已经阅读了 zeromq 指南的相关部分,并为简单的发布子系统编写了代码。如果有人可以帮助我解决以下问题:
我需要编写一个订单管理器,将客户(股票、外汇等)订单发送到适当的交易所。客户想要发送订单,但对 FIX 或其他专有协议(protocol)一无所知,只知道发送订单的内部(规范化)格式。我有应用程序(服
我正在尝试从示例 wuclient/wuserver 在 zeromq 上实现一个惰性订阅者。 客户端比服务器慢得多,因此它必须只获取服务器最后发送的消息。 到目前为止,我发现这样做的唯一方法是连接/
我是 ZeroMQ 的新手并试图找出设计问题。我的情况是我有一个或多个客户端向单个服务器发送请求。服务器将处理请求,做一些事情,并向客户端发送回复。有两个条件: 回复必须发送到发送请求的客户端。 如果
如 docs 中所述在 3.x 版本的 zeromq 中,PUB/SUB 场景中的消息正在被过滤 出版商侧(而不是在订阅者方面,这是微不足道的)。 对我来说,这听起来像是发布者必须持有所有连接的套接字
引自 ZeroMQ 指南 However, with a little extra work, this humble pattern becomes a good basis for real wo
假设我有一个带有 ZeroMQ 接口(interface)的节点(进程、线程等),比方说一个 REP 套接字。这意味着我有一个无限主循环,它在 zmq_recv 或 zmq_poll 函数中休眠。 现
我想以某种方式比较 grpc 与 Zeromq 及其模式的功能:并且我想创建一些比较(功能集) - 不知何故 - 0mq 是“更好”的套接字 - 但无论如何 - 如果我应用 0mq模式 - 我认为我得
我正在试验 ZeroMQ。我发现在 ZeroMQ 中非常有趣,connect 或 bind 先发生并不重要。我试着查看 ZeroMQ 的源代码,但它太大了,找不到任何东西。 代码如下。 # clien
我是一名优秀的程序员,十分优秀!