gpt4 book ai didi

python - 使用 asyncio 创建两个并发的异步任务

转载 作者:行者123 更新时间:2023-11-28 22:16:22 25 4
gpt4 key购买 nike

我需要创建一个同时从网络套接字和管道接收消息的软件,它在另一个 channel 上发送消息(它从套接字接收消息,创建一个新线程并发送到管道。以同样的方式从管道,创建一个新线程并发送到套接字)。

我有一个多线程问题,在程序启动时我必须启动方法 socket_receiverpipe_receiver 但我只能启动 pipe_receiver。我尝试删除所有代码并仅保留 socket_receiverpipe_receiver 但它仅进入 pipe_receiverwhile True >.

import asyncio
import sys
import json
from concurrent.futures.thread import ThreadPoolExecutor
import websockets

# make the Pool of workers
executor = ThreadPoolExecutor(max_workers=10)
# Make connection to socket and pipe
header = {"Authorization": r"Basic XXXX="}
connection = websockets.connect('wss://XXXXXXXX', extra_headers=header)


async def socket_receiver():
"""Listening from web socket"""
async with connection as web_socket:
while True:
message = await web_socket.recv()
# send the message to the pipe in a new thread
executor.submit(send_to_pipe(message))


async def pipe_receiver():
"""Listening from pipe"""
while True:
message = sys.stdin.readline()
if not message:
break
executor.submit(send_to_socket(message))
# jsonValue = json.dump(str(line), file);
sys.stdout.flush()


def send_to_pipe(message):
# Check if message is CAM or DENM
json_message = json.loads(message)
type = int(json_message["header"]["messageID"])
# 1 is DENM message, 2 is CAM message
if type == 1 or type == 2:
# send the message to the pipe
sys.stdout.print(json_message);


async def send_to_socket(message):
async with connection as web_socket:
json_message = json.dumps(message)
await web_socket.send(json_message)


asyncio.get_event_loop().run_until_complete(
asyncio.gather(socket_receiver(),pipe_receiver()))

此程序由子进程调用,父进程通过连接到 stdout 和 stdin 的管道与其通信。

更新:我通过@Martijn Pieters 代码收到此异常

Traceback (most recent call last):
File "X", line 121, in <module>
main()
File "X", line 119, in main
loop.run_until_complete(asyncio.gather(socket_coro, pipe_coro))
File "X\AppData\Local\Programs\Python\Python37-32\lib\asyncio\base_events.py", line 568, in run_until_complete
return future.result()
File "X", line 92, in connect_pipe
reader, writer = await stdio()
File "X", line 53, in stdio
lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)
File "X/AppData\Local\Programs\Python\Python37-32\lib\asyncio\base_events.py", line 1421, in connect_read_pipe
transport = self._make_read_pipe_transport(pipe, protocol, waiter)
File "X/AppData\Local\Programs\Python\Python37-32\lib\asyncio\base_events.py", line 433, in _make_read_pipe_transport
raise NotImplementedError
NotImplementedError

最佳答案

您没有正确使用 ThreadPoolExecutor,您真的不想在这里使用它。相反,您需要设置消费者和生产者来处理您的套接字和管道,并使用队列在它们之间发送消息。

  • 对于每种连接类型,创建一个创建连接的协程,然后将该单个连接传递给该连接的消费者和生产者任务(使用 asyncio.create_task() 创建)。使用 asyncio.wait()使用 return_when=asyncio.FIRST_COMPLETED 运行这两个任务,这样当两个任务之一“提前”完成(例如失败)时,您可以取消任何仍在运行的任务。

  • 使用 queue将消息从一个连接的消费者传递到另一个连接的生产者。

  • sys.stdinsys.stdout阻塞流,不要只是读写它们!参见 https://gist.github.com/nathan-hoad/8966377对于尝试设置非阻塞 STDIO 流的要点,以及 this asyncio issue要求非阻塞流功能。

  • 不要使用全局套接字连接,当然也不要使用两个单独的 async with 语句。您的 send_to_socket() 方法实际上会关闭 套接字,因为 async with connection as web_socket: 上下文管理器在发送第一条消息时退出,并且这会导致 socket_receiver 代码出现问题,该代码假定套接字无限期保持打开状态。

  • 不要在这里使用线程!您的连接完全由 asyncio 管理,线程将主要影响这一点。

  • asyncio.Executor() instances应该只与常规的可调用对象一起使用,与协程一起使用。 Executor.submit()声明它需要一个可调用对象,通过 executor.submit(send_to_pipe(message))executor.submit(send_to_socket(message)) 传入协程将导致异常提出作为协同程序不是可调用的。您可能没有看到异常消息,因为该异常是在另一个线程中引发的。

    这就是您的 socket_receiver() 协程失败的原因;它当然开始但尝试发送消息失败。当我针对本地模拟的 websocket 服务器运行您的代码时,会打印一条警告:

    RuntimeWarning: coroutine 'send_to_socket' was never awaited
    executor.submit(send_to_socket(message))

    当协程未被等待时,协程中的代码永远不会被执行。将协程包装在一个打印出 stderr 异常的协程中(try: callable(), except Exception: traceback.print_exc(file=sys.stderr)))你得到:

    Traceback (most recent call last):
    File "soq52219672.py", line 15, in log_exception
    callable()
    TypeError: 'coroutine' object is not callable

Executors 应该只用于集成不能转换为使用协程的代码;执行器管理该代码以与 asyncio 任务并行运行而不受干扰。如果该代码想与 asyncio 任务交互,请务必小心,始终使用 asyncio.run_coroutine_threadsafe()asyncio.call_soon_threadsafe()跨界调用。查看Concurrency and multithreading section .

这是我如何重写您的代码以使用消费者/生产者模式的示例,使用基于 Nathan Hoad gist on the subjectstdio() ,加上 Windows 的回退,其中 support for treating stdio as pipes is limited :

import asyncio
import json
import os
import sys

import websockets

async def socket_consumer(socket, outgoing):
# take messages from the web socket and push them into the queue
async for message in socket:
await outgoing.put(message)

async def socket_producer(socket, incoming):
# take messages from the queue and send them to the socket
while True:
message = await incoming.get()
jsonmessage = json.dumps(message)
await socket.send(jsonmessage)

async def connect_socket(incoming, outgoing):
header = {"Authorization": r"Basic XXXX="}
uri = 'wss://XXXXXXXX'
async with websockets.connect(uri, extra_headers=header) as websocket:
# create tasks for the consumer and producer. The asyncio loop will
# manage these independently
consumer_task = asyncio.create_task(socket_consumer(websocket, outgoing))
producer_task = asyncio.create_task(socket_producer(websocket, incoming))

# start both tasks, but have the loop return to us when one of them
# has ended. We can then cancel the remainder
done, pending = await asyncio.wait(
[consumer_task, producer_task],
return_when=asyncio.FIRST_COMPLETED
)
for task in pending:
task.cancel()
# force a result check; if there was an exception it'll be re-raised
for task in done:
task.result()


# pipe support
async def stdio(loop=None):
if loop is None:
loop = asyncio.get_running_loop()

if sys.platform == 'win32':
# no support for asyncio stdio yet on Windows, see https://bugs.python.org/issue26832
# use an executor to read from stdio and write to stdout
class Win32StdinReader:
def __init__(self):
self.stdin = sys.stdin.buffer
async def readline():
# a single call to sys.stdin.readline() is thread-safe
return await loop.run_in_executor(None, self.stdin.readline)

class Win32StdoutWriter:
def __init__(self):
self.buffer = []
self.stdout = sys.stdout.buffer
def write(self, data):
self.buffer.append(data)
async def drain(self):
data, self.buffer = self.buffer, []
# a single call to sys.stdout.writelines() is thread-safe
return await loop.run_in_executor(None, sys.stdout.writelines, data)

return Win32StdinReader(), Win32StdoutWriter()

reader = asyncio.StreamReader()
await loop.connect_read_pipe(
lambda: asyncio.StreamReaderProtocol(reader),
sys.stdin
)

writer_transport, writer_protocol = await loop.connect_write_pipe(
asyncio.streams.FlowControlMixin,
os.fdopen(sys.stdout.fileno(), 'wb')
)
writer = asyncio.streams.StreamWriter(writer_transport, writer_protocol, None, loop)

return reader, writer

async def pipe_consumer(pipereader, outgoing):
# take messages from the pipe and push them into the queue
while True:
message = await pipereader.readline()
if not message:
break
await outgoing.put(message.decode('utf8'))

async def pipe_producer(pipewriter, incoming):
# take messages from the queue and send them to the pipe
while True:
jsonmessage = await incoming.get()
message = json.loads(jsonmessage)
type = int(message.get('header', {}).get('messageID', -1))
# 1 is DENM message, 2 is CAM message
if type in {1, 2}:
pipewriter.write(jsonmessage.encode('utf8') + b'\n')
await pipewriter.drain()

async def connect_pipe(incoming, outgoing):
reader, writer = await stdio()
# create tasks for the consumer and producer. The asyncio loop will
# manage these independently
consumer_task = asyncio.create_task(pipe_consumer(reader, outgoing))
producer_task = asyncio.create_task(pipe_producer(writer, incoming))

# start both tasks, but have the loop return to us when one of them
# has ended. We can then cancel the remainder
done, pending = await asyncio.wait(
[consumer_task, producer_task],
return_when=asyncio.FIRST_COMPLETED
)
for task in pending:
task.cancel()
# force a result check; if there was an exception it'll be re-raised
for task in done:
task.result()

async def main():
pipe_to_socket = asyncio.Queue()
socket_to_pipe = asyncio.Queue()

socket_coro = connect_socket(pipe_to_socket, socket_to_pipe)
pipe_coro = connect_pipe(socket_to_pipe, pipe_to_socket)

await asyncio.gather(socket_coro, pipe_coro)

if __name__ == '__main__':
asyncio.run(main())

这从两个任务开始,一个是管理套接字,另一个是管理 STDIO 管道。两者都为他们的消费者和生产者分别启动了 2 个以上的任务。有两个队列将消息从一个的消费者发送到另一个的生产者。

关于python - 使用 asyncio 创建两个并发的异步任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52219672/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com