gpt4 book ai didi

python - Gevent.monkey.patch_all 破坏依赖于 socket.shutdown() 的代码

转载 作者:太空宇宙 更新时间:2023-11-03 11:52:31 24 4
gpt4 key购买 nike

我目前正在努力将对 gevent-socketio 的支持添加到现有的 django 项目中。我发现 gevent.monkey.patch_all() 调用正在破坏负责从套接字接收数据的线程的取消机制,我们现在将调用 SocketReadThread 类。

SocketReadThread 非常简单,它在阻塞套接字上调用 recv()。当它接收到数据时,它会对其进行处理并再次调用 recv()。当发生异常或 recv() 返回 0 字节时线程停止,如在 SocketReadThread.stop_reading() 中调用 socket.shutdown(SHUT_RDWR)

当 gevent.monkey.patch_all() 替换默认套接字实现时会出现问题。我没有很好地关闭,而是出现以下异常:

error: [Errno 9] File descriptor was closed in another greenlet

I'm assuming this is occurring because gevent makes my socket non-blocking in order to work its magic. This means that when I call socket.shutdown(socket.SHUT_RDWR) the greenlet that was doing the work for the monkey patched socket.recv call tried to read from the closed file descriptor.

I coded an example to isolate this issue:

from gevent import monkey

monkey.patch_all()

import socket
import sys
import threading
import time


class SocketReadThread(threading.Thread):
def __init__(self, socket):
super(SocketReadThread, self).__init__()
self._socket = socket

def run(self):
connected = True
while connected:
try:
print "calling socket.recv"
data = self._socket.recv(1024)
if (len(data) < 1):
print "received nothing, assuming socket shutdown"
connected = False
else :
print "Recieved something: {}".format(data)
except socket.timeout as e:
print "Socket timeout: {}".format(e)
connected = false
except :
ex = sys.exc_info()[1]
print "Unexpected exception occurrred: {}".format(str(ex))
raise ex

def stop_reading(self):
self._socket.shutdown(socket.SHUT_RDWR)
self._socket.close()


if __name__ == '__main__':

sock = socket.socket()
sock.connect(('127.0.0.1', 4242))

st = SocketReadThread(sock)
st.start()
time.sleep(3)
st.stop_reading()
st.join()

如果您打开一个终端并运行 nc -lp 4242 &(为该程序提供连接对象)然后运行该程序,您将看到上述异常。如果你删除对 monkey.patch_all() 的调用,你会发现它工作得很好。

我的问题是:如何支持取消 SocketReadThread 以一种可以使用或不使用 gevent monkey patching 的方式,并且不需要使用会使取消变慢的任意超时(即超时调用 recv() 并检查条件)?

最佳答案

我发现对此有两种不同的解决方法。第一种是简单地捕获并抑制异常。这似乎工作正常,因为一个线程关闭套接字以使另一个线程退出阻塞读取是常见的做法。我不知道或不明白为什么 greenlets 除了调试辅助工具会提示这个。这真的只是一个烦恼。

第二种选择是使用自管道技巧(快速搜索会产生许多解释)作为唤醒阻塞线程的机制。本质上,我们创建了第二个文件描述符(套接字就像操作系统的一种文件描述符)用于信号取消。然后我们使用 select 作为我们的阻塞来等待套接字上的传入数据或取消请求进入取消文件描述符。请参阅下面的示例代码。

from gevent import monkey

monkey.patch_all()

import os
import select
import socket
import sys
import threading
import time


class SocketReadThread(threading.Thread):
def __init__(self, socket):
super(SocketReadThread, self).__init__()
self._socket = socket
self._socket.setblocking(0)
r, w = os.pipe()
self._cancelpipe_r = os.fdopen(r, 'r')
self._cancelpipe_w = os.fdopen(w, 'w')

def run(self):
connected = True
read_fds = [self._socket, self._cancelpipe_r]
while connected:
print "Calling select"
read_list, write_list, x_list = select.select(read_fds, [], [])
print "Select returned"
if self._cancelpipe_r in read_list :
print "exiting"
self._cleanup()
connected = False
elif self._socket in read_list:
print "calling socket.recv"
data = self._socket.recv(1024)
if (len(data) < 1):
print "received nothing, assuming socket shutdown"
connected = False
self._cleanup()
else :
print "Recieved something: {}".format(data)


def stop_reading(self):
print "writing to pipe"
self._cancelpipe_w.write("\n")
self._cancelpipe_w.flush()
print "joining"
self.join()
print "joined"

def _cleanup(self):
self._cancelpipe_r.close()
self._cancelpipe_w.close()
self._socket.shutdown(socket.SHUT_RDWR)
self._socket.close()


if __name__ == '__main__':

sock = socket.socket()
sock.connect(('127.0.0.1', 4242))

st = SocketReadThread(sock)
st.start()
time.sleep(3)
st.stop_reading()

同样,在运行上面的程序之前运行 netcat -lp 4242 & 给它一个监听套接字来连接。

关于python - Gevent.monkey.patch_all 破坏依赖于 socket.shutdown() 的代码,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22442100/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com