gpt4 book ai didi

Python异步队列未显示任何异常

转载 作者:行者123 更新时间:2023-12-04 11:23:51 24 4
gpt4 key购买 nike

  • 如果我运行此代码,它将挂起而不会抛出 ZeroDivisionError .
  • 如果我搬家 await asyncio.gather(*tasks, return_exceptions=True)以上 await queue.join() ,它最终会抛出ZeroDivisionError并停止。
  • 如果我然后注释掉 1 / 0并运行,它将执行所有内容,但最终会挂起。

  • 现在的问题是,我如何才能同时实现:
  • 能够像上面的情况 2 一样看到意外的异常,并且...
  • 当队列中的所有任务完成时实际停止

  • .
    import asyncio
    import random
    import time


    async def worker(name, queue):
    while True:
    print('Get a "work item" out of the queue.')
    sleep_for = await queue.get()

    print('Sleep for the "sleep_for" seconds.')
    await asyncio.sleep(sleep_for)

    # Error on purpose
    1 / 0

    print('Notify the queue that the "work item" has been processed.')
    queue.task_done()

    print(f'{name} has slept for {sleep_for:.2f} seconds')

    async def main():
    print('Create a queue that we will use to store our "workload".')
    queue = asyncio.Queue()

    print('Generate random timings and put them into the queue.')
    total_sleep_time = 0
    for _ in range(20):
    sleep_for = random.uniform(0.05, 1.0)
    total_sleep_time += sleep_for
    queue.put_nowait(sleep_for)

    print('Create three worker tasks to process the queue concurrently.')
    tasks = []
    for i in range(3):
    task = asyncio.create_task(worker(f'worker-{i}', queue))
    tasks.append(task)

    print('Wait until the queue is fully processed.')
    started_at = time.monotonic()

    print('Joining queue')
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    print('Cancel our worker tasks.')
    for task in tasks:
    task.cancel()

    print('Wait until all worker tasks are cancelled.')
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')

    asyncio.run(main())

    最佳答案

    有几种方法可以解决这个问题,但中心思想是,在 asyncio 中,与经典线程不同,一次等待多个事情很简单。

    例如,您可以等待 queue.join()和 worker 任务,以先完成者为准。由于工作人员没有正常完成(您稍后取消它们),因此工作人员完成意味着它已经提出。

    # convert queue.join() to a full-fledged task, so we can test
    # whether it's done
    queue_complete = asyncio.create_task(queue.join())

    # wait for the queue to complete or one of the workers to exit
    await asyncio.wait([queue_complete, *tasks], return_when=asyncio.FIRST_COMPLETED)

    if not queue_complete.done():
    # If the queue hasn't completed, it means one of the workers has
    # raised - find it and propagate the exception. You can also
    # use t.exception() to get the exception object. Canceling other
    # tasks is another possibility.
    for t in tasks:
    if t.done():
    t.result() # this will raise

    关于Python异步队列未显示任何异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60699058/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com