python - Deadlock when synchronizing two simple python3 scripts using 0mq (ZeroMQ)

Reposted. Author: 行者123. Updated: 2023-11-30 22:57:59

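When I try to synchronize two python3 scripts using 0mq (ZeroMQ), I run into a strange deadlock. The scripts run fine for several thousand iterations, but sooner or later both of them stop and wait for each other. I am running both scripts from separate CMD windows on Windows 7.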

I cannot figure out why such a deadlock is even possible. What can go wrong here?

Script A:

import zmq

while (1):
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind('tcp://127.0.0.1:10001')
    msg = socket.recv()                      # Waiting for script B to send done
    # ............................................................................
    # ... do something useful (takes only a few millisecs)
    # ............................................................................
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect('tcp://127.0.0.1:10002')
    socket.send_string("done")               # Tell script B we are done

Script B:

import zmq

while (1):
    # ............................................................................
    # ... do something useful (takes only a few millisecs)
    # ............................................................................
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect('tcp://127.0.0.1:10001')
    socket.send_string("done")               # Tell script A we are done

    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind('tcp://127.0.0.1:10002')
    msg = socket.recv()                      # Waiting for script A to send done

Best answer

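This is not a deadlock case

Sure, the code still needs some care.

Disambiguation: your scenario does not reach a state of mutual resource locking, i.e. a deadlock. Yes, your code does crash, but most probably not because of a REQ/REP deadlock (which can and does appear on a lossy network with the tcp: transport class). The posted code crashes because of unmanaged resource handling, not because it reaches the mutually blocking state of a deadlock or livelock.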
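
As a hedged illustration of what unmanaged resource handling can look like (not a claim about the exact failure in the question): while one socket still holds a TCP port, a second attempt to bind the same port fails with a zmq.ZMQError instead of blocking. The helper name second_bind_errno and the use of bind_to_random_port are choices made for this sketch only.

```python
import zmq


def second_bind_errno(host="tcp://127.0.0.1"):
    """Bind one REP socket, then try to bind a second one to the same port.

    Returns the errno of the resulting zmq.ZMQError, or None if the second
    bind unexpectedly succeeded. Hypothetical helper, for illustration only.
    """
    context = zmq.Context()
    first = context.socket(zmq.REP)
    second = context.socket(zmq.REP)
    try:
        port = first.bind_to_random_port(host)    # a free port picked by pyzmq
        try:
            second.bind("{0}:{1}".format(host, port))
        except zmq.ZMQError as error:
            return error.errno                     # typically "address in use"
        return None
    finally:
        # Release everything explicitly, otherwise context.term() may block.
        first.close(linger=0)
        second.close(linger=0)
        context.term()
```

A loop that keeps binding new sockets without closing the old ones relies on garbage collection to free the port in time, which is the kind of unmanaged state the answer warns about.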

How to fix it?

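First, let's assume that your ultra-low-latency system does not allow you to instantiate anything repeatedly. There are exceptions to this, but let's be professional.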

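  1. Move your .Context() resource setup (or its inheritance from an outer call) out of the loop.

  2. Review whether you need to, and whether your latency constraints allow you to, set up and tear down a .socket() resource twice in every loop run.

  3. Decide whether you can live with a real REQ/REP deadlock once a first message gets lost in the transport path.

  4. Enforce graceful termination of resource use (.socket()-s, O/S port#s, .Context()-s). Do not leave them hanging unterminated forever while creating an unbounded number of new ones; that devastates any "fault-resilient" system. Resources are never infinite.

  5. Design both the signalling and the transmission behaviour in a non-blocking manner. This lets you detect and handle remote-process timeouts and gives you a chance for local remedy or responsive actions.

  6. Redesign the code to the level of code safety you need (the example below has worked for a few years in a soft-realtime controlled endless loop, 24/7/365, in a distributed processing framework with a remote keyboard and some other local and remote diagnostic tools).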
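
Steps 1, 2, 4 and 5 above can be sketched as follows. This is a minimal sketch under stated assumptions, not the answer's production code: the names script_a, script_b and run_demo are hypothetical, both sides run as threads in one process over an inproc:// URL so the example is self-contained (two separate scripts would use a tcp:// URL such as the question's), and a single REQ/REP pair is assumed to be enough for the "done"/"done" handshake.

```python
import threading
import zmq


def script_a(context, url, ready, stop):
    """REP side: answer every "done" from B with a "done" of its own."""
    socket = context.socket(zmq.REP)        # created once, outside the loop
    socket.setsockopt(zmq.LINGER, 0)        # never let close() wait on unsent data
    handled = 0
    try:
        socket.bind(url)
        ready.set()
        while not stop.is_set():
            if socket.poll(timeout=100):    # wait at most 100 ms, then re-check stop
                socket.recv()               # B says it is done
                # ... do something useful ...
                socket.send_string("done")  # tell B we are done
                handled += 1
    finally:
        socket.close()                      # always release the socket
    return handled


def script_b(context, url, rounds, timeout_ms=2000):
    """REQ side: send "done", then wait a bounded time for A's "done"."""
    socket = context.socket(zmq.REQ)
    socket.setsockopt(zmq.LINGER, 0)
    completed = 0
    try:
        socket.connect(url)
        for _ in range(rounds):
            # ... do something useful ...
            socket.send_string("done")
            if not socket.poll(timeout=timeout_ms):
                break                       # A is silent: give up instead of hanging
            socket.recv()
            completed += 1
    finally:
        socket.close()
    return completed


def run_demo(rounds=1000, url="inproc://sync-demo"):
    """Run both sides as threads sharing ONE context; return rounds per side."""
    context = zmq.Context()
    ready, stop = threading.Event(), threading.Event()
    counts = {}
    thread_a = threading.Thread(
        target=lambda: counts.update(a=script_a(context, url, ready, stop)))
    thread_a.start()
    try:
        ready.wait(timeout=5)               # inproc: bind before connect
        counts["b"] = script_b(context, url, rounds)
    finally:
        stop.set()
        thread_a.join()
        context.term()                      # safe: every socket is closed by now
    return counts
```

Calling run_demo() returns a dict with the number of rounds each side completed. The point of the design is that no call can block forever: every wait is a poll() with a timeout, and every exit path closes its socket before the context is terminated.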

What is missing for production-grade code?

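Your code has to "envisage" what might go wrong in any part of your distributed system. Yes, it is hard, but it is necessary. Your remote node (the communicating counterparty) may stop responding, lose a message, get rebooted, stall because of an O/S crash, or anything else imaginable (plus a few rather nasty surprises you will only discover on the fly). That is another Pandora's box, too large to cover in this small post, which does not mean it can be skipped. It is your life vest.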

Design in a non-blocking manner wherever you can; that way you remain in control of events.
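
One hedged way to stay in control on the REQ side is sketched below: wait for the reply with a timeout and, if none arrives, throw the socket away and retry on a fresh one, because a REQ socket that never got its reply cannot send again. This resembles the "Lazy Pirate" pattern described in the ZeroMQ guide; the function name request_with_retries and its parameters are assumptions made for this sketch.

```python
import zmq


def request_with_retries(context, url, payload, timeout_ms=500, retries=3):
    """Send payload over REQ and return the reply bytes, or None on failure.

    Each attempt uses a fresh REQ socket and waits at most timeout_ms for
    the reply. Hypothetical helper, for illustration only.
    """
    for _attempt in range(retries):
        socket = context.socket(zmq.REQ)
        socket.setsockopt(zmq.LINGER, 0)      # close() must not wait on unsent data
        try:
            socket.connect(url)
            socket.send(payload)
            if socket.poll(timeout=timeout_ms):
                return socket.recv()           # reply arrived in time
        finally:
            socket.close()                     # a timed-out REQ socket is discarded
    return None                                # the peer never answered
```

A caller can treat None as "the remote side is down or slow" and decide locally what to do next (log, back off, switch peers) instead of hanging inside recv().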

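In any case, always release system resources gracefully and .term() every ZeroMQ .Context() instance. To "tidy up" is fair practice, both in real life and, even more so, in the empires of code.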

# /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\
# NONSTOP RESPONDER RAW EXAMPLE:
import zmq

def aMiniRESPONDER( aTarget2Bind2_URL             = "tcp://A.B.C.D:8889",
                    anExternalPREDICTOR           = None,
                    anExternallyManagedZmqCONTEXT = None,
                    aSpreadMinSafetyMUL           = 3.0,
                    aSilentMODE                   = True
                    ):
    aZmqCONTEXT    = None   # defined up front so that finally: can always run
    aListOfSOCKETs = []
    try: # RESOURCES LAYER
        # ... SETUP
        # ------------------------------------------------- .Context()
        # can setup a locally-managed context or re-use
        # anExternallyManagedZmqCONTEXT obtained upon a func Call
        aZmqCONTEXT   = anExternallyManagedZmqCONTEXT or zmq.Context( 1 )

        # localhost:8887 [REP] ... remote [REQ] peer .connect() + .send()
        aCtrlPORT_URL = "tcp://*:8887"

        # localhost:8890 [PUB] ... remote [SUB] peers .connect() +
        # .subscribe + .recv( zmq.NOBLOCK ) ( MQL4 cannot .poll() so far ...)
        aSIGsPORT_URL = "tcp://*:8890"
        aXmitPORT_URL = aTarget2Bind2_URL

        # -------------------------------------------------------------# ZMQ
        try:
            # try: XmitPORT
            aXmitSOCKET = aZmqCONTEXT.socket( zmq.PAIR )
            # registered before .bind(), so it gets closed even if .bind() fails
            aListOfSOCKETs.append( aXmitSOCKET )

            # XmitPORT
            aXmitSOCKET.bind( aXmitPORT_URL )
        except zmq.ZMQError as anEXC:
            # EXC: XmitPORT on Failure: GRACEFUL CLEARING XmitPORT
            # ( the clearing itself happens in the outer finally: )
            msg = "\nEXC. ZmqError({0:s}) on aXmitSOCKET setup / .bind( {1:s} )"
            print( msg.format( repr( anEXC ), aTarget2Bind2_URL ) )
            raise ValueError( "ZMQ_EXC_EXIT @ XmitPORT SETUP" )
        # -------------------------------------------------------------# ZMQ
        try:
            # try: CtrlPORT
            # CtrlSOCKET [REP] .recv()s<--[REQ] + .send()s--> [REQ]
            aCtrlSOCKET = aZmqCONTEXT.socket( zmq.REP )
            aListOfSOCKETs.append( aCtrlSOCKET )

            # CtrlPORT <-REQ/REP means a remote peer [REQ] has to
            # .send()+.recv() before sending another CtrlCMD
            aCtrlSOCKET.bind( aCtrlPORT_URL )
        except zmq.ZMQError as anEXC:
            # EXC: CtrlPORT on Failure: GRACEFUL CLEARING both CtrlPORT
            # and XmitPORT
            msg = "\nEXC. ZmqError({0:s}) on aCtrlSOCKET setup / .bind( {1:s} )"
            print( msg.format( repr( anEXC ), aCtrlPORT_URL ) )
            raise ValueError( "ZMQ_EXC_EXIT @ CtrlPORT SETUP" )
        # -------------------------------------------------------------# ZMQ
        try:
            # try: SIGsPORT
            # SIGsPORT [PUB] .send()s--> [SUB]s
            aSIGsSOCKET = aZmqCONTEXT.socket( zmq.PUB )
            aListOfSOCKETs.append( aSIGsSOCKET )

            # SIGsPORT --> PUB/SUB means a remote peer(s) [SUB] .subscribe() + .recv()
            aSIGsSOCKET.bind( aSIGsPORT_URL )
        except zmq.ZMQError as anEXC:
            # EXC: SIGsPORT on Failure: GRACEFUL CLEARING both CtrlPORT
            # and XmitPORT and SIGsPORT
            msg = "\nEXC. ZmqError({0:s}) on aSIGsSOCKET setup / .bind( {1:s} )"
            print( msg.format( repr( anEXC ), aSIGsPORT_URL ) )
            raise ValueError( "ZMQ_EXC_EXIT @ SIGsPORT SETUP" )
        # -------------------------------------------------------------# ZMQ

        # vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
        # ... SETUP YOUR APPLICATION CODE

        try:              # APP LAYER ___________________________________________
            # what you want to do
            # here you go ...
            pass          # placeholder for the application loop
        except Exception: # APP LAYER ___________________________________________
            # handle EXCs
            raise         # placeholder: re-raise after any local handling
        finally:          # APP LAYER ___________________________________________
            # your own application post-mortem / pre-exit code
            pass          # placeholder

        # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

    except Exception: # RESOURCES LAYER .............................................
        # ... code shall handle its own exceptions + externally caused events
        raise         # placeholder: re-raise after any local handling

    finally:          # RESOURCES LAYER .............................................
        # ... always, ALWAYS gracefully exit ( avoid leakages and dirty things )
        for aSOCKET in aListOfSOCKETs:
            aSOCKET.setsockopt( zmq.LINGER, 0 )
            aSOCKET.close()

        # --------------------------------------------------------------#
        # RESOURCES dismantled, may .term()

        # .TERM() a locally created context only, NOP otherwise
        if aZmqCONTEXT is not None and aZmqCONTEXT is not anExternallyManagedZmqCONTEXT:
            aZmqCONTEXT.term()
    return
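
For small scripts, the same "always tidy up" rule can also be expressed with pyzmq's context managers instead of an explicit finally: block. The sketch below is an alternative phrasing, not the answer's method; the function name bind_once_and_release is hypothetical, and the random port is used only so the example does not depend on a fixed port being free.

```python
import zmq


def bind_once_and_release(host="tcp://127.0.0.1"):
    """Bind a REP socket to a random port, then release every resource."""
    with zmq.Context() as context:                 # .term() runs on exit
        with context.socket(zmq.REP) as socket:    # .close() runs on exit
            socket.setsockopt(zmq.LINGER, 0)
            port = socket.bind_to_random_port(host)
            # ... application code would use the socket here ...
        socket_closed = socket.closed
    return port, socket_closed, context.closed
```

Both with blocks release their resource even when the body raises, which gives the same guarantee as the finally: section of the responder above.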

About python - Deadlock when synchronizing two simple python3 scripts using 0mq (ZeroMQ): a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/36391495/
