- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
当服务器不可用时,很难正确关闭基于 pyzmq 的简单客户端。以下是 2 个片段。
首先是服务器。这或多或少是 pyzmq 的例子。这里没有特殊代码:
import zmq
import json
port = 5555
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:{0}".format(port))
while True:
message = socket.recv_json()
print(json.dumps(message))
socket.send_json({'response': 'Hello'})
接下来是客户端。
import zmq
ip = 'localhost'
port = 5555
addr ="tcp://{0}:{1}".format(ip, port)
message = {'value': 10}
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect(addr)
if socket.poll(timeout=1000, flags=zmq.POLLOUT) != 0:
socket.send_json(message, flags=zmq.NOBLOCK)
if socket.poll(timeout=1000, flags=zmq.POLLIN) != 0:
response = socket.recv_json()
print(response)
socket.disconnect(addr)
socket.close(linger=0)
context.term()
在这里,我尝试增强默认客户端,使其能够在服务器不可用时超时。下面的代码使用 poll 方法,尽管我也尝试过在套接字上设置接收超时。
如果服务器正在运行,客户端会发送和接收响应并干净地退出。
如果服务器未运行,客户端会立即执行第一个 socket.poll
调用(因为 zmq 只是在内部缓冲消息)。它在第二个 socket.poll
调用时阻塞 1 秒,并正确跳过 receive_json block 。然后它卡在 context.term()
调用上。我的理解是,根据搜索,如果有尚未关闭的套接字,则会挂起,但情况似乎并非如此。
非常感谢任何帮助。
最佳答案
超时是可能的,但这将不允许硬连线的REQ/REP
两步舞蹈生存,更不可能以正确的方式继续,如果一侧使d分布式F有限S状态A自动方案中的其他强制步骤超时(dFSA 无法采取单面快捷方式,它是双面 dFSA )。
If the server is not running, the client passes immediately through the first
socket.poll
call (sincezmq
just buffers the message internally). It blocks for 1 second on the secondsocket.poll
call and correctly skips therecv_json
block. It then hangs on thecontext.term()
call.
让我们逐步回顾一下代码
def Test( SetImmediate = False ):
##################################################################################
import zmq, json, time; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "import-s: DONE... VER: " ), zmq.zmq_version() )
##################################################################################
ip = 'localhost'; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "ip SET..." ) )
port = 5555; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "port SET..." ) )
addr = "tcp://{0}:{1}".format( ip, port ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "addr SET..." ) )
message = { 'value': 10 }; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "message SET..." ) )
##################################################################################
context = zmq.Context(); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Context INSTANTIATED..." ), "|", zmq.strerror( zmq.zmq_errno() ) )
pass; aReqSock = context.socket( zmq.REQ ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket INSTANTIATED..." ), "|", zmq.strerror( zmq.zmq_errno() ) )
##################################################################################################################################################################################################################################
pass; rc = aReqSock.getsockopt( zmq.LINGER ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.LINGER ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
pass; aReqSock.setsockopt( zmq.LINGER, 0 ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.setsockopt( zmq.LINGER ) SET..." ), "|", zmq.strerror( zmq.zmq_errno() ) ) # do not let LINGER block on closing sockets with waiting msgs
pass; rc = aReqSock.getsockopt( zmq.LINGER ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.LINGER ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
##################################################################################################################################################################################################################################
pass; rc = aReqSock.getsockopt( zmq.IMMEDIATE ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.IMMEDIATE ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
if SetImmediate:
aReqSock.setsockopt( zmq.IMMEDIATE, 1 ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.setsockopt( zmq.IMMEDIATE ) SET..." ), "|", zmq.strerror( zmq.zmq_errno() ) ) # do not enqueue msgs for incoplete connections
pass; rc = aReqSock.getsockopt( zmq.IMMEDIATE ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.IMMEDIATE ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
##################################################################################################################################################################################################################################
pass; aReqSock.connect( addr ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.connect() DONE..." ), "|", zmq.strerror( zmq.zmq_errno() ) )
##################################################################################
pass; rc = aReqSock.poll( timeout = 1000, flags = zmq.POLLOUT ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc = .poll( 1000 [ms], zmq.POLLOUT ) SET..." ), "|", zmq.strerror( zmq.zmq_errno() ) )
if 0 != rc: # aReqSock.poll( timeout = 1000, flags = zmq.POLLOUT ) != 0:# .poll() BLOCKS ~ 1s +NEVER gets a .POLLOUT for an empty TxQueue, does it?
pass; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc was NON-ZERO... == " ), rc )
pass; aReqSock.send_json( message, flags = zmq.NOBLOCK ) # .send()-s dispatches message the REP-side may .recv() at some later time
pass; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), ".send_json( zmq.NOBLOCK ): DONE..." ), "|", zmq.strerror( zmq.zmq_errno() ) )
pass; rc = aReqSock.poll( timeout = 1000, flags = zmq.POLLIN ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc = .poll( 1000 [ms], zmq.POLLIN ) SET..." ), "|", zmq.strerror( zmq.zmq_errno() ) )
if 0 != rc: # aReqSock.poll( timeout = 1000, flags = zmq.POLLIN ) != 0:# .poll() BLOCKS < 1s = depends on REP-side response latency ( turn-around-time )
pass; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc was NON-ZERO... == " ), rc )
response = aReqSock.recv_json() # .recv() BLOCKS until ... if ever ...
print( response ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), ".recv_json() COMPLETED" ), "|", zmq.strerror( zmq.zmq_errno() ) )
pass; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "if-ed code-block COMPLETED" ) )
##################################################################################
rc = aReqSock.disconnect( addr ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.disconnect() RETURNED CODE ~ " ), rc, "|", zmq.strerror( zmq.zmq_errno() ) )
rc = aReqSock.close( linger = 0 ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.close() RETURNED CODE ~ " ), rc, "|", zmq.strerror( zmq.zmq_errno() ) )
rc = context.term(); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Context.term() RETURNED CODE ~ " ), rc, "|", zmq.strerror( zmq.zmq_errno() ) )
##################################################################################
这会产生一些关于此的内容:
>>> Test( SetImmediate = False )
____947107.0356056700_ACK: import-s: DONE... VER: 4.2.5
____947107.0356727780_ACK: ip SET...
____947107.0356969039_ACK: port SET...
____947107.0357236000_ACK: addr SET...
____947107.0357460320_ACK: message SET...
____947107.0358552620_ACK: Context INSTANTIATED... | Success
____947107.0362445670_ACK: Socket INSTANTIATED... | Success
____947107.0363074190_ACK: Socket.getsockopt( zmq.LINGER ) GOT... -1 | Success
____947107.0363573120_ACK: Socket.setsockopt( zmq.LINGER ) SET... | Invalid argument
____947107.0364004780_ACK: Socket.getsockopt( zmq.LINGER ) GOT... 0 | Success
____947107.0364456220_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
____947107.0364890840_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
____947107.0365797410_ACK: Socket.connect() DONE... | Resource temporarily unavailable
____947107.0366972820_ACK: rc = .poll( 1000 [ms], zmq.POLLOUT ) SET... | Resource temporarily unavailable
____947107.0367464600_ACK: rc was NON-ZERO... == 2
____947107.0368948240_ACK: .send_json( zmq.NOBLOCK ): DONE... | Resource temporarily unavailable
____947108.0381633660_ACK: rc = .poll( 1000 [ms], zmq.POLLIN ) SET... | Resource temporarily unavailable
____947108.0382736750_ACK: if-ed code-block COMPLETED
____947108.0383544239_ACK: Socket.disconnect() RETURNED CODE ~ None | Resource temporarily unavailable
____947108.0384234400_ACK: Socket.close() RETURNED CODE ~ None | Invalid argument
____947108.0386644470_ACK: Context.term() RETURNED CODE ~ None | Success
和
>>> Test( SetImmediate = True )
____947119.1267617550_ACK: import-s: DONE... VER: 4.2.5
____947119.1268189061_ACK: ip SET...
____947119.1268382660_ACK: port SET...
____947119.1268587380_ACK: addr SET...
____947119.1268772170_ACK: message SET...
____947119.1269678050_ACK: Context INSTANTIATED... | Success
____947119.1271884360_ACK: Socket INSTANTIATED... | Success
____947119.1272257260_ACK: Socket.getsockopt( zmq.LINGER ) GOT... -1 | Success
____947119.1272587100_ACK: Socket.setsockopt( zmq.LINGER ) SET... | Invalid argument
____947119.1272875509_ACK: Socket.getsockopt( zmq.LINGER ) GOT... 0 | Success
____947119.1273175071_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
____947119.1273461781_ACK: Socket.setsockopt( zmq.IMMEDIATE ) SET... | Invalid argument
____947119.1273732870_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 1 | Success
____947119.1274376540_ACK: Socket.connect() DONE... | Resource temporarily unavailable
____947120.1287043930_ACK: rc = .poll( 1000 [ms], zmq.POLLOUT ) SET... | Resource temporarily unavailable
____947120.1287937190_ACK: if-ed code-block COMPLETED
____947120.1288697980_ACK: Socket.disconnect() RETURNED CODE ~ None | Resource temporarily unavailable
____947120.1289412400_ACK: Socket.close() RETURNED CODE ~ None | Invalid argument
____947120.1291404651_ACK: Context.term() RETURNED CODE ~ None | Success
这就证明了假设不正确:context.term()
没有问题,但是有办法,.connect如何( aTransportClass_Target )
针对不存在的目标正在内部处理。
令我惊讶的是,在测试版本 ( v4.2.5 ) 中,.poll( zmq.POLLOUT )
报告有 2
.POLLOUT
方向中的项目已经存在于用户报告的 TxQueue 状态中,而无需进行单个显式 .send()
(如.poll()
在 .connect()
之后立即启动。
在我看来,这与以前的版本有些不一致(就好像它会尝试报告与 .connect()
相关的“协议(protocol)/身份”-遥测,而不是仅报告用户-应用程序级消息)。
虽然我在试图找出一些基本原理时可能是错误的,为什么一个主要为空的队列会尝试报告一条消息已经在其 .POLLOUT
方向内,但我希望有足够的信息事实证明,该问题与 的
-实例。.LINGER == 0
/.term()
无关>Context()
Q.E.D.
关于python - PyZMQ req 套接字 - 卡在 context.term() 上,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58663965/
我最近开始使用 ZeroMQ 并观察到 有时,ZeroMQ 上下文的初始化可能需要多达 40 秒 .通常它比这快得多。 例如: import zmq # the next line will so
我有一些代码正在监视其他一些正在更改的文件,我想做的是启动使用带有不同套接字的 Zeromq 的代码,我现在这样做的方式似乎会导致 libzmq 中的某个地方断言失败,因为我可能会重复使用同一个套接字
我正在使用 PyZMQ 创建请求/回复服务器,并且我试图弄清楚为每个新客户端连接创建线程的行为是否由 PyZMQ 自动处理。最终,我试图弄清楚来自一个客户端的需要很长时间才能回复的请求是否会阻止来自所
我有以下发布者代码,它实例化一些类实例并发布一些消息。 但是,我在订阅者端没有收到任何内容。 发布商 import zmq import time from multiprocessing impor
我一直在想办法在我的套接字尚未连接到绑定(bind)地址时如何关闭 Context 实例(或者如果我需要的话)。这是我的演示代码: import zmq import json data = {} d
有人能给我指出一个带有 Python 绑定(bind)的 REQ/REP 非阻塞 ZeroMQ (0MQ) 的例子吗?可能是我对 ZMQ 的理解有问题,但是我在网上找不到例子。 我在 Node.JS
大家好 StackOverflow 的好人。 我正在使用 pyzmq,我有一些长时间运行的进程,这导致我发现套接字句柄处于打开状态。我已将有问题的代码缩小为以下内容: import zmq uri =
我有以下代码使用请求和回复模式从服务器发送数组到客户端, def send_array( socket, A, flags = 0, copy = True, track = False ): ""
我已经连接了 zeromq,“msg_in”已经排队。如果在该时间段内没有新消息队列来设置超时。如何设置超时。以下是核心代码 requestDict = {"id":111, "name":"test
我目前正在研究一些用 C 编写的模拟代码,这些代码在不同的远程机器上运行。当 C 部分完成后,我想通过使用 python 模拟 api 和某种作业队列系统扩展它来简化我的工作,它应该执行以下操作: 1
我对 PyZMQ 中的 HWM 行为感到困惑。我希望以下代码会被阻止 失败示例 In [1]: import zmq In [2]: context = zmq.Context() In [3]: a
我有一个ROUTER,其目的是积累来自多个DEALER客户端的图像数据并对完整图像执行OCR。我发现处理 OCR 最有效的方法是使用 Python 的多处理库;累积的图像字节被放入到队列中,以便在单独
我正在尝试使用 pyzmq 的内置日志处理程序(参见 here)为我的分布式应用程序实现集中式日志记录服务器。虽然从架构的角度我可以理解 PUB/SUB 模式的使用,但我无法掌握这种系统的正确技术实现
我正在尝试与通过 ZeroMQ 套接字开始使用 multiprocessing.Process 的子进程进行通信。我知道存在与 multiprocessing 模块内的子进程通信的解决方案,但我希望最
我想限制我的 ZeroMQ 消息队列在 Python 应用程序中消耗的内存量。我知道设置高水位线会限制在发送方排队的数量,但是有没有办法控制在接收方排队的数量? Python ZeroMQ 绑定(bi
在 python 中寻找有效的 IPC 解决方案时,我偶然发现了 zeromq;我有几个 python 进程需要在主进程中对来自 dict 的数据进行一些 cpu 密集型处理。这些工作进程只能从字典中
这是我的第一篇 StackOverflow 帖子! 我在创建具有 pyzmq (v22.0.2) 依赖项的 pyinstaller(v4.2) 可执行文件时遇到问题。我通过运行“pyinstaller
根据 ZeroMQ 文档,一旦排队的消息数量达到高水位线,pub 套接字就会删除消息。 这在以下示例中似乎不起作用(是的,我确实在绑定(bind)/连接之前设置了 hwm): import time
我第一次使用 python 中的 asyncio 并尝试将其与 ZMQ 结合起来。 基本上我的问题是我有一个 REP/REQ 系统,位于 async def 中具有我需要等待的功能。值如何不更新。下面
ZMQ socket documentation 中的 ZMQ_PUSH 部分假设在没有下游节点的 PUSH 套接字上调用 send() 应该阻塞,直到至少有一个节点可用。 但是,运行以下代码似乎不会
我是一名优秀的程序员,十分优秀!