gpt4 book ai didi

java - ZeroMQ 异步多线程与 ROUTER 和 DEALER

转载 作者:行者123 更新时间:2023-11-30 06:15:20 25 4
gpt4 key购买 nike

我想知道是否有办法让 ZeroMQ 套接字只进行读取或只进行写入。因为,在我看来,即使有异步/多线程示例,每个线程仍然使用 receive-then-send 循环。我遇到的问题是,我想要从 ZeroMQ 套接字读取的 receiveMessage() 和写入 ZeroMQ 套接字的 sendMessage(msg) 。但是这些方法中的每一个都将在另一个类中构造的单独线程中运行。这是我的代码(我使用 Scala 中的 jeromq):

trait ZmqProtocol extends Protocol {

val context: ZContext = new ZContext(1)
private val frontendSocket: ZMQ.Socket = context.createSocket(ZMQ.ROUTER)
private val backendSocket: ZMQ.Socket = context.createSocket(ZMQ.DEALER)

frontendSocket.bind("tcp://*:5555")
backendSocket.bind("inproc://backend")


new Thread(() => {

println("Started receiving messages")
// Connect backend to frontend via a proxy
ZMQ.proxy(frontendSocket, backendSocket, null)

}).start()


override def receiveMessage(): (String, String) = {

val inprocReadSocket: ZMQ.Socket = context.createSocket(ZMQ.DEALER)
inprocReadSocket.connect("inproc://backend")

// The DEALER socket gives us the address envelope and message
val msg = ZMsg.recvMsg(inprocReadSocket)

// Message from client's REQ socket contains 3 frames: address + empty frame + request content
// (payload)
val address = msg.pop
val emptyFrame = msg.pop
val request = msg.pop

assert(request != null)
msg.destroy()

println(s"RECEIVED: $request FROM: $address")

(address.toString, request.toString)
}

override def sendMessage(address: String, response: String): Unit = {

val inprocWriteSocket: ZMQ.Socket = context.createSocket(ZMQ.DEALER)
inprocWriteSocket.connect("inproc://backend")

val addressFrame = new ZFrame(address)
val emptyFrame = new ZFrame("")
val responseFrame = new ZFrame(response)

addressFrame.send(inprocWriteSocket, ZFrame.REUSE + ZFrame.MORE)
// Sending empty frame because client expects such constructed message
emptyFrame.send(inprocWriteSocket, ZFrame.REUSE + ZFrame.MORE)
responseFrame.send(inprocWriteSocket, ZFrame.REUSE)

addressFrame.destroy()
emptyFrame.destroy()
responseFrame.destroy()
}

}

这是我如何使用它:

class TrafficHandler(val requestQueue: LinkedBlockingQueue[(String, Message)],
val responseQueue: LinkedBlockingQueue[(String, String)])
extends Protocol {

def startHandlingTraffic(): Unit = {

new Thread(() => {

while (true) {

val (address, message) = receiveMessage()

requestQueue.put((address, message))
}
}).start()

new Thread(() => {
while (true) {

val (address, response) = responseQueue.take()
sendMessage(address, response)
}
}).start()
}

在调试过程中,我注意到我收到了消息,并使用正确的目标地址正确地从响应队列(并发阻塞队列)中获取了它,但默默地未能发送它。我深入研究了 jeromq 代码,在我看来,它与身份有关,因为 outPipe 为空。我猜这是因为我没有正确的接收发送循环。

在 @user3666197 响应后编辑代码有效! (尽管如果您先启动服务器,则绑定(bind)和连接到 PUSHPULL 套接字需要一些时间)
以下是使用 PUSHPULL 套接字的修改后的代码:

trait ZmqProtocol extends Protocol {

val context: ZContext = new ZContext(1)

val frontendSocket: ZMQ.Socket = context.createSocket(ZMQ.ROUTER)
frontendSocket.bind("tcp://*:5555")

val requestQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PUSH)
requestQueueSocket.bind("inproc://requestQueueSocket")

val responseQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PULL)
responseQueueSocket.bind("inproc://responseQueueSocket")

val inprocRequestQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PULL)
inprocRequestQueueSocket.connect("inproc://requestQueueSocket")

val inprocResponseQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PUSH)
inprocResponseQueueSocket.connect("inproc://responseQueueSocket")

new Thread(() => {

println("Started receiving messages")

while (true) {

val msg = ZMsg.recvMsg(frontendSocket)

// Message from client's REQ socket contains 3 frames: address + empty frame + request content
// (payload)
val reqAddress = msg.pop
val emptyFrame = msg.pop
val reqPayload = msg.pop

assert(reqPayload != null)
msg.destroy()

println(s"RECEIVED: $reqPayload FROM: $reqAddress")

requestQueueSocket.send(s"$reqAddress;$reqPayload")

val responseMessage = new String(responseQueueSocket.recv(0))
val respMessageSplit = responseMessage.split(";")

val respAddress = respMessageSplit(0)
val respPayload = respMessageSplit(1)

val array = new BigInteger(respAddress, 16).toByteArray
val respAddressFrame = new ZFrame(array)
val respEmptyFrame = new ZFrame("")
val respPayloadFrame = new ZFrame(respPayload)

respAddressFrame.send(frontendSocket, ZFrame.REUSE + ZFrame.MORE)
// Sending empty frame because client expects such constructed message
respEmptyFrame.send(frontendSocket, ZFrame.REUSE + ZFrame.MORE)
respPayloadFrame.send(frontendSocket, ZFrame.REUSE)

respAddressFrame.destroy()
respEmptyFrame.destroy()
respPayloadFrame.destroy()

}

}).start()


override def receiveMessage(): (String, String) = {

val message = new String(inprocRequestQueueSocket.recv(0))
val messageSplit = message.split(";")

val address = messageSplit(0)
val payload = messageSplit(1)

(address, payload)
}

override def sendMessage(address: String, response: String): Unit = {

inprocResponseQueueSocket.send(s"$address;$response")
}
}

如果需要的话,这是客户端:

trait ZmqClientProtocol extends ClientProtocol {

val context: ZMQ.Context = ZMQ.context(1)
val socket: ZMQ.Socket = context.socket(ZMQ.REQ)

println("Connecting to server")
socket.connect("tcp://localhost:5555")

override protected def send(message: String): String = {

// Ensure that the last byte of message is 0 because server is expecting a 0-terminated string
val request = message.getBytes()

// Send the message
println(s"Sending request $request")
socket.send(request, 0)

// Get the reply.
val reply = socket.recv(0)

new String(s"$message=${new String(reply)}")
}
}

最佳答案

有没有办法让 ZeroMQ 套接字只进行读取或只进行写入?

是的,有几种方法。

a ) 使用串联的单纯形原型(prototype):PUSH/PULL 写入和 PULL/PUSH 读取< br/>b)使用串联的单纯形原型(prototype):(X)PUB/(X)SUB写入和(X)SUB/(X)PUB 读取

<小时/>

...仍然使用.recv()-then-.send()循环。

嗯,这个观察结果更多地与实际的套接字原型(prototype)相关,其中一些确实需要对 .recv() 进行强制两步(在其内部 FSA 内部硬连线)排序--.send()--...

<小时/>

...但是每个方法都将在单独的线程中运行

好吧,挑战开始了:ZeroMQ 从一开始就被设计为主要是零共享,以提高性能和独立性。 Zen-of-Zero 是 中有趣的设计原则。设计。

然而,最近的重新设计工作在 API 4.2+ 中提出了实现 ZeroMQ 套接字访问点成为线程安全的意愿(这违背了 share-nothing 的最初原则),因此,如果要朝这个方向进行实验,您可能会到达可行的领域,但代价是从“零”禅宗开始下降。

由于设计纯粹性,ZeroMQ 套接字访问点永远不应该共享,即使可能也是如此。

如果您努力分离 OOP 关注点,那么最好为此类提供另一对单纯形 PUSH/PULL-s,但您的此类只读专用的头端+ 只写专用套接字必须处理这种情况,当“远程”(超出抽象的外部类边界)ZeroMQ 套接字原型(prototype) FSA 及其设置和性能调整以及错误状态和““remote”类必须安排所有这些内容,并协调所有与 native ZeroMQ 套接字之间的消息传输(对于两个头端(专用)类来说,它基本上是隔离和隐藏的)。

无论如何,通过适当的设计谨慎都是可行的。

<小时/>

ZeroMQ 资源不是任何廉价的可组合/一次性垃圾

一个想法:

...
override def sendMessage( address: String,
response: String
): Unit = {

val inprocWriteSocket: ZMQ.Socket = context.createSocket( ZMQ.DEALER )
inprocWriteSocket.connect( "inproc://backend" )
...

在源代码中可能看起来很简单,但忽略了实际的设置开销,并且还应尊重这样一个事实:没有套接字( inproc://-transport-class作为一种特殊情况)在 Context() 内实例化的一微秒内即可获得 RTO(Ready-To-Operate),更不用说是完全的 .connect() -ed 和 RTO-ed 在与远程对方握手之后进行,因此最好提前设置好 SIG/MSG 基础设施,并将其最好地保持为半持久通信层,而不是任何临时/即时启动的可组合/一次性...(资源生态学)

<小时/>

inproc://-transport-class 在 API 4.x 之前还有一项要求:

Connecting a socket

When connecting a socket to a peer address using zmq_connect() with the inproc:// transport, the endpoint shall be interpreted as an arbitrary string identifying the name to connect to. Before version 4.0 the name must have been previously created by assigning it to at least one socket within the same ØMQ context as the socket being connected. Since version 4.0 the order of zmq_bind() and zmq_connect() does not matter just like for the tcp:// transport type.

因此,在某些情况下,当您的部署不确定实际的 localhost API 版本时,请注意执行 .bind()/.connect() 的正确顺序,否则inproc:// 管道不适合您。

关于java - ZeroMQ 异步多线程与 ROUTER 和 DEALER,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49329294/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com