gpt4 book ai didi

python - 3.4.2 中的 asyncio 问题 - 它只是由于某种原因终止

转载 作者:太空狗 更新时间:2023-10-30 02:44:31 26 4
gpt4 key购买 nike

python 的新手,花了很多时间阅读文档和其他代码,我似乎无法获得新的 asyncio Python 3 中的模块。它在没有堆栈跟踪的情况下不断终止给我一个线索,应该永远运行但没有。

我试图模仿的基本流程概念如下:

从端口读取:打开端口 -> 读取数据(可变长度) -> 放入queue1

然后处理数据:从队列 1 获取数据 -> 条件适用 -> 结果放在队列 2

然后写入端口:从队列2获取数据并写入端口

永远从顶部循环

注意:输入端口上的数据是零星的、可变长度的,并且几个 block 可能不按“顺序”到达,因此我使用 asyncio。我了解 asyncio 将允许 block 到达的情况,然后在我的应用程序响应之前另一个 block 到达 - 即调用 get_io_from_port() 促进协同例程的多次执行。这就是为什么我使用队列来确保 process_queue()

的非阻塞

到目前为止我的玩具示例代码:

import queue
import asyncio

@asyncio.coroutine
def process_queue(q1, q2):
tmp = q1.Get()
if tmp == 'ABCDEF':
q2.put('12345')
elif tmp == 'GHIJKL':
q2.put =('67890')
else:
print('There is a data error')

@asyncio.coroutine
def put_io_to_port(writer, q2):

if not q2.empty():
try:
writer.write(q2.get())

except IOError as e:
print('OUT Port issue: ', e)

@asyncio.coroutine
def get_io_from_port(reader, q1):

try:
data_i = yield from reader.read(1200)
q1.put(data_i)

except IOError as e:
print('IN Port issue: ', e)

def main():

q1 = queue()
q2 = queue()

loop = asyncio.get_event_loop() # main loop declaration
reader, writer = yield from asyncio.open_connection('192.168.1.103', 5555)
# high-level call open streams - read and write

print('Start')
tasks = [
asyncio.async(get_io_from_port(reader,q1)),
asyncio.async(process_queue(q1, q2)),
asyncio.async(put_io_to_port(writer, q2)),] # do these tasks - in this order

loop.run_forever(tasks) # loop through on main loop forever
loop.close()

if __name__ == '__main__':
main()

此外,顺便说一句 - 如何调试此代码 - 即跟踪?可以建议哪些技术?我正在使用 Eclipse 和 PyDev,但无济于事。

最佳答案

你在这里犯了几个错误。首先,您将 main 视为一个普通函数,但您在其中放置了一个 yield from 调用,它会自动将其转换为一个生成器。这意味着当你这样做时

if __name__ == "__main__":
main()

main 并没有真正执行;对 main() 的调用只会创建一个生成器对象,该对象会立即被丢弃(因为您没有将其分配给变量)。这就是为什么您很难调试 - main 中的代码甚至都没有执行。您应该将 main 转换为协程并使用 loop.run_until_complete 调用它。

接下来,您将尝试使用 queue 模块,该模块不是为在单线程异步程序中使用而设计的。一旦你调用 queue.get(),它就会阻塞你的主线程,这意味着你的 asyncio 事件循环将被阻塞,这意味着你的整个程序将被陷入僵局。你应该使用协程安全 asyncio.Queue相反。

您在 put_io_to_port 中也有竞争条件。如果 q2 不为空,您只是尝试从中消费,但 put_io_to_port 可能会在 process_queue 有机会执行之前执行运行并填充队列。如果您只是从 put_io_to_port 中完全删除 if not q2.empty() 检查,看起来您会没事的。

最后,您使用 asyncio.async 将协程添加到事件循环中,这很好。但是您有一条评论说 # do these tasks, in this order,但这不是程序在 asyncio.async 中的行为方式。它只是将所有协程添加到事件循环中,它们将全部并行运行。如果您真的希望它们按顺序运行,您应该这样做:

yield from get_io_from_port(reader,q1)
yield from process_queue(q1, q2)
yield from put_io_to_port(writer, q2)

但这里真的没有必要。您可以同时运行所有这些并获得正确的行为;如果一个协程先于另一个协程执行,它只会等到它所依赖的协程将它需要的数据传递给它,然后再继续执行。

您还有一些拼写错误(q1.Get()q2.put =(...) 等)。

所以,将所有这些修复放在一起,你会得到:

import queue
import asyncio

@asyncio.coroutine
def process_queue(q1, q2):
while True:
tmp = yield from q1.get()
if tmp == 'ABCDEF':
yield from q2.put('12345')
elif tmp == 'GHIJKL':
yield from q2.put('67890')
else:
print('There is a data error')

@asyncio.coroutine
def put_io_to_port(writer, q2):
while True:
try:
data = yield from q2.get()
writer.write(data)
except IOError as e:
print('OUT Port issue: ', e)

@asyncio.coroutine
def get_io_from_port(reader, q1):
while True:
try:
data_i = yield from reader.read(1200)
yield from q1.put(data_i)
except IOError as e:
print('IN Port issue: ', e)

@asyncio.coroutine
def main():
q1 = asyncio.Queue()
q2 = asyncio.Queue()

reader, writer = yield from asyncio.open_connection('192.168.1.103', 5555)
# high-level call open streams - read and write

print('Start')
tasks = [
asyncio.async(get_io_from_port(reader,q1)),
asyncio.async(process_queue(q1, q2)),
asyncio.async(put_io_to_port(writer, q2)),]


if __name__ == '__main__':
loop = asyncio.get_event_loop() # main loop declaration
loop.run_until_complete(main())

关于python - 3.4.2 中的 asyncio 问题 - 它只是由于某种原因终止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29098955/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com