gpt4 book ai didi

multithreading - 使用信号量限制后台线程中同时运行的 asyncio 协程数量

转载 作者:行者123 更新时间:2023-12-04 08:25:56 25 4
gpt4 key购买 nike

作为对 Python 的新 asyncio 模块的实验,我创建了以下代码片段来在后台工作程序中处理一组长时间运行的操作(作业)。

为了控制同时运行的作业数量,我在 with 代码块中使用了一个信号量(代码第 56 行,即 Job._process 中的 with 语句)。但是,使用信号量之后,已获取的锁似乎永远不会被释放:作业完成(回调已执行)后,等待中的作业并没有启动。去掉 with 代码块后,一切都按预期工作。

import asyncio

from queue import Queue, Empty
from datetime import datetime
from threading import Thread


class BackgroundWorker(Thread):  # Thread that runs submitted coroutines on an asyncio event loop it creates in run().
    def __init__(self):
        super().__init__()
        self._keep_running = True  # Cleared by stop(); checked by process_coros().
        self._waiting_coros = Queue()  # (coroutine function, callback) pairs queued by submit_coro().
        self._tasks = []  # Every task created by process_coros().
        self._loop = None # Loop must be initialized in child thread.
        self.limit_simultaneous_processes = asyncio.Semaphore(2)  # Constructed here, i.e. before run() creates and installs self._loop.

    def stop(self):  # Ask process_coros() to exit the next time it checks the flag.
        self._keep_running = False

    def run(self):  # Thread entry point: create and install a loop, then drive process_coros() on it until it returns.
        self._loop = asyncio.new_event_loop() # Implicit creation of the loop only happens in the main thread.
        asyncio.set_event_loop(self._loop) # Since this is a child thread, we need to do it manually.
        self._loop.run_until_complete(self.process_coros())

    def submit_coro(self, coro, callback=None):  # Queue a coroutine function (called later with no arguments) and an optional done-callback.
        self._waiting_coros.put((coro, callback))

    @asyncio.coroutine
    def process_coros(self):  # Poll loop: turn queued entries into tasks, then sleep, until stop() clears the flag.
        while self._keep_running:
            try:
                while True:
                    coro, callback = self._waiting_coros.get_nowait()  # Raises Empty once the queue is drained.
                    task = asyncio.async(coro())  # coro is called here to obtain the coroutine object to schedule.
                    if callback:
                        task.add_done_callback(callback)
                    self._tasks.append(task)
            except Empty as e:  # Queue drained: nothing more to schedule in this round.
                pass
            yield from asyncio.sleep(3) # sleep so the other tasks can run


background_worker = BackgroundWorker()  # Module-level worker instance referenced by Job and main().


class Job(object):  # One simulated long-running operation, identified by idx.
    def __init__(self, idx):
        super().__init__()
        self._idx = idx  # Only used to label the printed messages.

    def process(self):  # Queue this job on the module-level worker; nothing is executed here.
        background_worker.submit_coro(self._process, self._process_callback)

    @asyncio.coroutine
    def _process(self):  # The job body: take a semaphore slot, sleep 2 seconds, report the elapsed time.
        with (yield from background_worker.limit_simultaneous_processes):  # Hold one slot for the duration of the block.
            print("received processing slot %d" % self._idx)
            start = datetime.now()
            yield from asyncio.sleep(2)  # Stand-in for the real work.
            print("processing %d took %s" % (self._idx, str(datetime.now() - start)))

    def _process_callback(self, future):  # Done-callback; the finished task passed in is not inspected.
        print("callback %d triggered" % self._idx)


def main():
    """Start the worker, queue ten jobs, then block until the user types 'quit'."""
    print("starting worker...")
    background_worker.start()

    for idx in range(10):
        download_job = Job(idx)
        download_job.process()  # Only queues the job on the worker.

    # Keep prompting until the exact string "quit" is entered.
    command = None
    while command != "quit":
        command = input("enter 'quit' to stop the program: \n")

    print("stopping...")
    background_worker.stop()
    background_worker.join()  # Wait for the worker thread to finish.


if __name__ == '__main__':
    main()

有人能帮我解释一下这种行为吗?为什么离开 with 代码块之后信号量没有递增(即没有被释放)?

最佳答案

我找到了这个错误:信号量在初始化时绑定的是主线程的隐式事件循环,而不是线程通过 run() 启动时显式设置的那个事件循环。

修复后的版本:

import asyncio

from queue import Queue, Empty
from datetime import datetime
from threading import Thread


class BackgroundWorker(Thread):
    """Thread that owns a private asyncio event loop and runs submitted coroutines on it.

    Coroutine functions are handed over from any thread through a thread-safe
    queue (``submit_coro``); the worker polls that queue from inside its own
    loop and wraps each entry in a task.

    ``limit_simultaneous_processes`` is a semaphore jobs can use to cap how
    many of them run at once.  It is created in ``run()`` rather than in
    ``__init__`` so that it is constructed only after this thread's loop has
    been installed (older asyncio versions bind primitives to the current
    event loop at construction time).
    """

    def __init__(self):
        super().__init__()
        self._keep_running = True
        self._waiting_coros = Queue()
        self._tasks = []
        self._loop = None  # Loop must be initialized in child thread.
        self.limit_simultaneous_processes = None  # Semaphore must be initialized after the loop is set.

    def stop(self):
        """Ask the polling loop to exit; takes effect at its next wake-up."""
        self._keep_running = False

    def run(self):
        """Thread entry point: set up the loop and semaphore, then poll until stopped."""
        self._loop = asyncio.new_event_loop()  # Implicit creation of the loop only happens in the main thread.
        asyncio.set_event_loop(self._loop)  # Since this is a child thread, we need to do it manually.
        self.limit_simultaneous_processes = asyncio.Semaphore(2)
        try:
            self._loop.run_until_complete(self.process_coros())
        finally:
            # Release the loop's resources once polling has ended.
            self._loop.close()

    def submit_coro(self, coro, callback=None):
        """Queue a coroutine function for execution on the worker's loop.

        Args:
            coro: A zero-argument callable returning a coroutine object; it is
                called on the worker thread, not here.
            callback: Optional done-callback attached to the resulting task.
        """
        self._waiting_coros.put((coro, callback))

    async def process_coros(self):
        """Turn queued coroutine functions into tasks until ``stop()`` is called."""
        while self._keep_running:
            try:
                while True:
                    coro, callback = self._waiting_coros.get_nowait()
                    # ensure_future replaces asyncio.async, which cannot even be
                    # parsed since ``async`` became a reserved word.
                    task = asyncio.ensure_future(coro())
                    if callback:
                        task.add_done_callback(callback)
                    self._tasks.append(task)
            except Empty:
                # Queue drained: nothing more to schedule in this round.
                pass
            await asyncio.sleep(3)  # sleep so the other tasks can run


# Module-level worker instance referenced by Job and main().
background_worker = BackgroundWorker()


class Job(object):
    """One simulated long-running operation executed on the background worker.

    Args:
        idx: Number used to label this job's printed messages.
    """

    def __init__(self, idx):
        super().__init__()
        self._idx = idx

    def process(self):
        """Queue this job on the module-level worker; nothing is executed here."""
        background_worker.submit_coro(self._process, self._process_callback)

    async def _process(self):
        """Job body: take a semaphore slot, sleep 2 seconds, report the elapsed time."""
        # ``async with`` replaces ``with (yield from semaphore)``, which newer
        # asyncio versions no longer support; the slot is released on exit.
        async with background_worker.limit_simultaneous_processes:
            print("received processing slot %d" % self._idx)
            start = datetime.now()
            await asyncio.sleep(2)  # Stand-in for the real work.
            print("processing %d took %s" % (self._idx, str(datetime.now() - start)))

    def _process_callback(self, future):
        """Done-callback for the job's task; the finished task itself is not inspected."""
        print("callback %d triggered" % self._idx)


def main():
    """Start the worker, queue ten jobs, then block until the user asks to quit.

    The worker is always stopped and joined on the way out, including when the
    prompt is interrupted; otherwise its non-daemon thread would keep the
    process alive.
    """
    print("starting worker...")
    background_worker.start()

    try:
        for idx in range(10):
            download_job = Job(idx)
            download_job.process()

        command = None
        while command != "quit":
            try:
                command = input("enter 'quit' to stop the program: \n")
            except EOFError:
                # stdin is closed, so 'quit' can never arrive: treat it as quit.
                break
    finally:
        print("stopping...")
        background_worker.stop()
        background_worker.join()


if __name__ == '__main__':
    main()

关于multithreading - 在后台线程中限制同时运行带有信号量的异步协程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28436509/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com