gpt4 book ai didi

python - PyZMQ req 套接字 - 卡在 context.term() 上

转载 作者:行者123 更新时间:2023-12-01 07:00:19 25 4
gpt4 key购买 nike

当服务器不可用时,很难正确关闭基于 pyzmq 的简单客户端。以下是 2 个片段。

首先是服务器。这或多或少是 pyzmq 的例子。这里没有特殊代码:

import zmq
import json

port = 5555

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:{0}".format(port))

while True:
message = socket.recv_json()
print(json.dumps(message))
socket.send_json({'response': 'Hello'})

接下来是客户端。

import zmq

ip = 'localhost'
port = 5555
addr ="tcp://{0}:{1}".format(ip, port)
message = {'value': 10}

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect(addr)

if socket.poll(timeout=1000, flags=zmq.POLLOUT) != 0:
socket.send_json(message, flags=zmq.NOBLOCK)
if socket.poll(timeout=1000, flags=zmq.POLLIN) != 0:
response = socket.recv_json()
print(response)

socket.disconnect(addr)
socket.close(linger=0)
context.term()

在这里,我尝试增强默认客户端,使其能够在服务器不可用时超时。下面的代码使用 poll 方法,尽管我也尝试过在套接字上设置接收超时。

如果服务器正在运行,客户端会发送和接收响应并干净地退出。

如果服务器未运行,客户端会立即执行第一个 socket.poll 调用(因为 zmq 只是在内部缓冲消息)。它在第二个 socket.poll 调用时阻塞 1 秒,并正确跳过 receive_json block 。然后它卡在 context.term() 调用上。我的理解是,根据搜索,如果有尚未关闭的套接字,则会挂起,但情况似乎并非如此。

非常感谢任何帮助。

最佳答案

关于“如果服务器不可用则能够超时

超时是可能的,但这将不允许硬连线的REQ/REP两步舞蹈生存,更不可能以正确的方式继续,如果一侧使d分布式F有限S状态A自动方案中的其他强制步骤超时(dFSA 无法采取单面快捷方式,它是双面 dFSA )。

<小时/>

假设:

If the server is not running, the client passes immediately through the first socket.poll call (since zmq just buffers the message internally). It blocks for 1 second on the second socket.poll call and correctly skips the recv_json block. It then hangs on the context.term() call.

验证:

让我们逐步回顾一下代码

def  Test( SetImmediate = False ):
##################################################################################
import zmq, json, time; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "import-s: DONE... VER: " ), zmq.zmq_version() )
##################################################################################
ip = 'localhost'; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "ip SET..." ) )
port = 5555; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "port SET..." ) )
addr = "tcp://{0}:{1}".format( ip, port ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "addr SET..." ) )
message = { 'value': 10 }; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "message SET..." ) )
##################################################################################
context = zmq.Context(); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Context INSTANTIATED..." ), "|", zmq.strerror( zmq.zmq_errno() ) )
pass; aReqSock = context.socket( zmq.REQ ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket INSTANTIATED..." ), "|", zmq.strerror( zmq.zmq_errno() ) )
##################################################################################################################################################################################################################################
pass; rc = aReqSock.getsockopt( zmq.LINGER ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.LINGER ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
pass; aReqSock.setsockopt( zmq.LINGER, 0 ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.setsockopt( zmq.LINGER ) SET..." ), "|", zmq.strerror( zmq.zmq_errno() ) ) # do not let LINGER block on closing sockets with waiting msgs
pass; rc = aReqSock.getsockopt( zmq.LINGER ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.LINGER ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
##################################################################################################################################################################################################################################
pass; rc = aReqSock.getsockopt( zmq.IMMEDIATE ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.IMMEDIATE ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
if SetImmediate:
aReqSock.setsockopt( zmq.IMMEDIATE, 1 ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.setsockopt( zmq.IMMEDIATE ) SET..." ), "|", zmq.strerror( zmq.zmq_errno() ) ) # do not enqueue msgs for incoplete connections
pass; rc = aReqSock.getsockopt( zmq.IMMEDIATE ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.IMMEDIATE ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
##################################################################################################################################################################################################################################
pass; aReqSock.connect( addr ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.connect() DONE..." ), "|", zmq.strerror( zmq.zmq_errno() ) )
##################################################################################
pass; rc = aReqSock.poll( timeout = 1000, flags = zmq.POLLOUT ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc = .poll( 1000 [ms], zmq.POLLOUT ) SET..." ), "|", zmq.strerror( zmq.zmq_errno() ) )
if 0 != rc: # aReqSock.poll( timeout = 1000, flags = zmq.POLLOUT ) != 0:# .poll() BLOCKS ~ 1s +NEVER gets a .POLLOUT for an empty TxQueue, does it?
pass; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc was NON-ZERO... == " ), rc )
pass; aReqSock.send_json( message, flags = zmq.NOBLOCK ) # .send()-s dispatches message the REP-side may .recv() at some later time
pass; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), ".send_json( zmq.NOBLOCK ): DONE..." ), "|", zmq.strerror( zmq.zmq_errno() ) )
pass; rc = aReqSock.poll( timeout = 1000, flags = zmq.POLLIN ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc = .poll( 1000 [ms], zmq.POLLIN ) SET..." ), "|", zmq.strerror( zmq.zmq_errno() ) )
if 0 != rc: # aReqSock.poll( timeout = 1000, flags = zmq.POLLIN ) != 0:# .poll() BLOCKS < 1s = depends on REP-side response latency ( turn-around-time )
pass; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc was NON-ZERO... == " ), rc )
response = aReqSock.recv_json() # .recv() BLOCKS until ... if ever ...
print( response ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), ".recv_json() COMPLETED" ), "|", zmq.strerror( zmq.zmq_errno() ) )
pass; print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "if-ed code-block COMPLETED" ) )
##################################################################################
rc = aReqSock.disconnect( addr ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.disconnect() RETURNED CODE ~ " ), rc, "|", zmq.strerror( zmq.zmq_errno() ) )
rc = aReqSock.close( linger = 0 ); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.close() RETURNED CODE ~ " ), rc, "|", zmq.strerror( zmq.zmq_errno() ) )
rc = context.term(); print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Context.term() RETURNED CODE ~ " ), rc, "|", zmq.strerror( zmq.zmq_errno() ) )
##################################################################################

这会产生一些关于此的内容:

>>> Test( SetImmediate = False )
____947107.0356056700_ACK: import-s: DONE... VER: 4.2.5
____947107.0356727780_ACK: ip SET...
____947107.0356969039_ACK: port SET...
____947107.0357236000_ACK: addr SET...
____947107.0357460320_ACK: message SET...
____947107.0358552620_ACK: Context INSTANTIATED... | Success
____947107.0362445670_ACK: Socket INSTANTIATED... | Success
____947107.0363074190_ACK: Socket.getsockopt( zmq.LINGER ) GOT... -1 | Success
____947107.0363573120_ACK: Socket.setsockopt( zmq.LINGER ) SET... | Invalid argument
____947107.0364004780_ACK: Socket.getsockopt( zmq.LINGER ) GOT... 0 | Success
____947107.0364456220_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
____947107.0364890840_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
____947107.0365797410_ACK: Socket.connect() DONE... | Resource temporarily unavailable
____947107.0366972820_ACK: rc = .poll( 1000 [ms], zmq.POLLOUT ) SET... | Resource temporarily unavailable
____947107.0367464600_ACK: rc was NON-ZERO... == 2
____947107.0368948240_ACK: .send_json( zmq.NOBLOCK ): DONE... | Resource temporarily unavailable
____947108.0381633660_ACK: rc = .poll( 1000 [ms], zmq.POLLIN ) SET... | Resource temporarily unavailable
____947108.0382736750_ACK: if-ed code-block COMPLETED
____947108.0383544239_ACK: Socket.disconnect() RETURNED CODE ~ None | Resource temporarily unavailable
____947108.0384234400_ACK: Socket.close() RETURNED CODE ~ None | Invalid argument
____947108.0386644470_ACK: Context.term() RETURNED CODE ~ None | Success

>>> Test( SetImmediate = True )
____947119.1267617550_ACK: import-s: DONE... VER: 4.2.5
____947119.1268189061_ACK: ip SET...
____947119.1268382660_ACK: port SET...
____947119.1268587380_ACK: addr SET...
____947119.1268772170_ACK: message SET...
____947119.1269678050_ACK: Context INSTANTIATED... | Success
____947119.1271884360_ACK: Socket INSTANTIATED... | Success
____947119.1272257260_ACK: Socket.getsockopt( zmq.LINGER ) GOT... -1 | Success
____947119.1272587100_ACK: Socket.setsockopt( zmq.LINGER ) SET... | Invalid argument
____947119.1272875509_ACK: Socket.getsockopt( zmq.LINGER ) GOT... 0 | Success
____947119.1273175071_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
____947119.1273461781_ACK: Socket.setsockopt( zmq.IMMEDIATE ) SET... | Invalid argument
____947119.1273732870_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 1 | Success
____947119.1274376540_ACK: Socket.connect() DONE... | Resource temporarily unavailable
____947120.1287043930_ACK: rc = .poll( 1000 [ms], zmq.POLLOUT ) SET... | Resource temporarily unavailable
____947120.1287937190_ACK: if-ed code-block COMPLETED
____947120.1288697980_ACK: Socket.disconnect() RETURNED CODE ~ None | Resource temporarily unavailable
____947120.1289412400_ACK: Socket.close() RETURNED CODE ~ None | Invalid argument
____947120.1291404651_ACK: Context.term() RETURNED CODE ~ None | Success

这就证明了假设不正确:context.term()没有问题,但是有办法,.connect如何( aTransportClass_Target ) 针对不存在的目标正在内部处理。

令我惊讶的是,在测试版本 ( v4.2.5 ) 中,.poll( zmq.POLLOUT ) 报告有 2 .POLLOUT 方向中的项目已经存在于用户报告的 TxQueue 状态中,而无需进行单个显式 .send() (如.poll().connect() 之后立即启动。

在我看来,这与以前的版本有些不一致(就好像它会尝试报告与 .connect() 相关的“协议(protocol)/身份”-遥测,而不是仅报告用户-应用程序级消息)。

虽然我在试图找出一些基本原理时可能是错误的,为什么一个主要为空的队列会尝试报告一条消息已经在其 .POLLOUT 方向内,但我希望有足够的信息事实证明,该问题与 .LINGER == 0/.term() 无关>Context()-实例。

Q.E.D.

关于python - PyZMQ req 套接字 - 卡在 context.term() 上,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58663965/

25 4 0
文章推荐: java - 使用 java 流从包含 List 的 List 检索 List