gpt4 book ai didi

python - Asyncio 检测断开挂起

转载 作者:太空狗 更新时间:2023-10-30 00:55:35 24 4
gpt4 key购买 nike

我在 Python 3.4 中使用 Asyncio,我将尝试解释我在这一点上所做的事情以及我(认为)导致问题的原因。

一方面,我有一个带有阻塞操作的 UDP 连接框架,我从这个流中获取数据并创建我以 SSE 格式传递给客户端的 json。这一切都很好。

我遇到的问题是,如果我不做任何事情并且客户端断开连接,我将无法让它正确处理客户端断开连接我将开始收到此错误:

WARNING [selector_events:613] socket.send() raised exception.

由于循环仍在运行,我一直在寻找彻底中断循环并触发 .close() 的方法,但我遇到了我发现的示例的问题,而且在线资源不多。

一个似乎实际有效的例子是尝试从客户端读取一行,如果它是一个空字符串,则意味着客户端已断开连接。

    while True:
data = (yield from client_reader.readline())
if not data: #client disconnected
break

然而,在大约十条消息之后,发送给客户端的所有消息都停止了,我认为这是因为它在挂起后卡在“data = (yield from client_reader.readline())”上,如果我关闭客户端然后它会正确关闭并且“结束连接”确实被调用。任何想法为什么它可能会挂起?我认为此时我对 Asyncio 有很好的处理,但这一个让我感到困惑。

注意:location() 和 status() 是我从 UDP 套接字获取信息的两个调用 - 我已经使用相同的代码成功运行了很多小时而没有问题 - 减去客户端断开连接线。

clients = {}

def accept_client(client_reader, client_writer):
task = asyncio.Task(handle_client(client_reader, client_writer))
clients[task] = (client_writer)


def client_done(task):
del clients[task]
client_writer.close()
log.info("End Connection")

log.info("New Connection")
task.add_done_callback(client_done)

@asyncio.coroutine
def handle_client(client_reader, client_writer):
data = {'result':{'status':'Connection Ready'}}
yield from postmessage(data,client_writer)
while True:
data = (yield from client_reader.readline())
if not data: #client disconnected
break
data = yield from asyncio.wait_for(location(),
timeout=1.0)
yield from postmessage(data,client_writer)

data = yield from asyncio.wait_for(status(),
timeout=1.0)
yield from postmessage(data,client_writer)

@asyncio.coroutine
def postmessage(data, client_writer):
mimetype=('text/event-stream')
response = ('data: {0}\n\n'.format(data).encode('utf-8'))
client_writer.write(response)
client_writer.drain()

更新:如果我在“yield from client_reader”上添加超时,当它达到通常会挂起的程度时,我会收到以下错误。

2014-11-17 03:13:56,214 INFO [try:23] End Connection
2014-11-17 03:13:56,214 ERROR [base_events:912] Task exception was never retrieved
future: <Task finished coro=<handle_client() done, defined at try.py:29> exception=TimeoutError()>
Traceback (most recent call last):
File "/opt/python3.4.2/lib/python3.4/asyncio/tasks.py", line 236, in _step
result = next(coro)
File "try.py", line 35, in handle_client
timeout=1.0))
File "/opt/python3.4.2/lib/python3.4/asyncio/tasks.py", line 375, in wait_for
raise futures.TimeoutError()
concurrent.futures._base.TimeoutError

这是一个显示该错误的示例脚本 - 只需在 python 3.4.2 中运行它,在 9 次迭代后,它会在从客户端读取时挂起。

(脚本完成,您可以运行它自己看看)

import asyncio
import logging

import json
import time

log = logging.getLogger(__name__)

clients = {}

def accept_client(client_reader, client_writer):
task = asyncio.Task(handle_client(client_reader, client_writer))
clients[task] = (client_writer)

def client_done(task):
del clients[task]
client_writer.close()
log.info("End Connection")

log.info("New Connection")
task.add_done_callback(client_done)


@asyncio.coroutine
def handle_client(client_reader, client_writer):
data = {'result':{'status':'Connection Ready'}}
postmessage(data,client_writer)
count = 0
while True:
data = (yield from asyncio.wait_for(client_reader.readline(),timeout=1.0))
if not data: #client disconnected
break

data = yield from asyncio.wait_for(test1(),timeout=1.0)
yield from postmessage(data,client_writer)

data = yield from asyncio.wait_for(test2(),timeout=1.0)
yield from postmessage(data,client_writer)

@asyncio.coroutine
def postmessage(data, client_writer):
mimetype=('text/event-stream')
response = ('data: {0}\n\n'.format(data).encode('utf-8'))
client_writer.write(response)
client_writer.drain()

@asyncio.coroutine
def test1():
data = {'result':{
'test1':{ }
}
}
data = json.dumps(data)
return data

@asyncio.coroutine
def test2():
data = {'result':{
'test2':{ }
}
}
data = json.dumps(data)
return data

def main():
loop = asyncio.get_event_loop()
f = asyncio.start_server(accept_client, host=None, port=2991)
loop.run_until_complete(f)
loop.run_forever()

if __name__ == '__main__':
log = logging.getLogger("")
formatter = logging.Formatter("%(asctime)s %(levelname)s " +
"[%(module)s:%(lineno)d] %(message)s")
# log the things
log.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)

ch.setFormatter(formatter)
log.addHandler(ch)
main()

另一个更新:我发现它死了,因为它从客户端的标题中读取所有行,然后在用完行时超时。我认为,我正在寻找的真正答案是当您实际上不需要从客户端接收数据(初始连接除外)时如何检测客户端断开连接。

最佳答案

好的,我想我明白你的问题了。您正在从客户端读取只是为了查明客户端是否已断开连接,但是一旦客户端发送了其 header , readline() 将在客户端仍处于连接状态时无限期阻塞,这会阻止您从实际做任何工作。使用超时来避免阻塞很好,您只需要处理 TimeoutError,因为当它发生时您可以假设客户端没有断开连接:

from concurrent.futures import TimeoutError

@asyncio.coroutine
def handle_client(client_reader, client_writer):
data = {'result':{'status':'Connection Ready'}}
postmessage(data,client_writer)
count = 0
while True:
try:
# See if client has disconnected.
data = (yield from asyncio.wait_for(client_reader.readline(),timeout=0.01))
if not data: # Client disconnected
break
except TimeoutError:
pass # Client hasn't disconnected.

data = yield from asyncio.wait_for(test1(),timeout=1.0)
yield from postmessage(data,client_writer)

data = yield from asyncio.wait_for(test2(),timeout=1.0)
yield from postmessage(data,client_writer)

请注意,我在这里将超时设置得非常短,因为我们真的根本不想阻塞,我们只想知道连接是否已关闭。

但更好的解决方案是不显式检查连接是否已关闭,而是处理在连接已关闭时尝试通过套接字发送数据时出现的任何异常:

@asyncio.coroutine
def handle_client(client_reader, client_writer):
data = {'result':{'status':'Connection Ready'}}
postmessage(data,client_writer)
count = 0
while True:
try:
data = yield from asyncio.wait_for(test1(),timeout=1.0)
yield from postmessage(data,client_writer)

data = yield from asyncio.wait_for(test2(),timeout=1.0)
yield from postmessage(data,client_writer)
except ConnectionResetError: # And/or whatever other exceptions you see.
break

关于python - Asyncio 检测断开挂起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26964923/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com