gpt4 book ai didi

python - 将来在不同的时间启动(非常短的) Action 时如何避免启动数百个线程

转载 作者:行者123 更新时间:2023-12-04 12:19:08 24 4
gpt4 key购买 nike

我用这种方法发起了几十(不到一千)次 do_it 的调用。在 future 的不同时间:

import threading
timers = []
while True:
for i in range(20):
t = threading.Timer(i * 0.010, do_it, [i]) # I pass the parameter i to function do_it
t.start()
timers.append(t) # so that they can be cancelled if needed
wait_for_something_else() # this can last from 5 ms to 20 seconds
每个 do_it的运行时间调用非常快(远小于 0.1 毫秒)且无阻塞。我想避免 产生数百个新线程 对于这么简单的任务。
我怎么能用 做到这一点只有一个附加线程 所有 do_it电话?
有没有一种简单的方法可以用 Python 做到这一点,没有第三方库,只有标准库?

最佳答案

据我了解,您需要一个单独的工作线程来处理提交的任务,而不是按照它们提交的顺序,而是按照某种优先顺序。这似乎是线程安全的工作 queue.PriorityQueue .

from dataclasses import dataclass, field
from threading import Thread
from typing import Any
from queue import PriorityQueue


@dataclass(order=True)
class PrioritizedItem:
priority: int
item: Any=field(compare=False)


def thread_worker(q: PriorityQueue[PrioritizedItem]):
while True:
do_it(q.get().item)
q.task_done()


q = PriorityQueue()
t = Thread(target=thread_worker, args=(q,))
t.start()
while True:
for i in range(20):
q.put(PrioritizedItem(priority=i * 0.010, item=i))
wait_for_something_else()
此代码假定您想永远运行。如果没有,您可以向 q.get 添加超时在 thread_worker , 并返回时 queue.Empty由于超时到期而引发异常。这样,您就可以在处理完所有作业并且超时到期后加入队列/线程。
如果您想等到将来某个特定时间运行任务,它会变得更复杂一些。这是一种扩展上述方法的方法,它通过在工作线程中休眠直到指定的时间到来,但请注意 time.sleep is only as accurate as your OS allows it to be .
from dataclasses import astuple, dataclass, field
from datetime import datetime, timedelta
from time import sleep
from threading import Thread
from typing import Any
from queue import PriorityQueue


@dataclass(order=True)
class TimedItem:
when: datetime
item: Any=field(compare=False)


def thread_worker(q: PriorityQueue[TimedItem]):
while True:
when, item = astuple(q.get())
sleep_time = (when - datetime.now()).total_seconds()
if sleep_time > 0:
sleep(sleep_time)
do_it(item)
q.task_done()


q = PriorityQueue()
t = Thread(target=thread_worker, args=(q,))
t.start()
while True:
now = datetime.now()
for i in range(20):
q.put(TimedItem(when=now + timedelta(seconds=i * 0.010), item=i))
wait_for_something_else()
为了仅使用一个额外的线程来解决这个问题,我们必须在该线程中休眠,因此在工作线程休眠时,具有更高优先级的新任务可能会进入。在这种情况下,工作人员将在完成当前任务后处理新的高优先级任务。上面的代码假设不会发生这种情况,根据问题描述这似乎是合理的。如果可能发生这种情况,您可以更改 sleep 代码以重复轮询优先级队列前面的任务是否到期。像这样的轮询方法的缺点是它会更加占用 CPU。
另外,如果你能保证任务的相对顺序在提交给工作线程后不会改变,那么你可以用常规的 queue.Queue 替换优先级队列。稍微简化代码。
这些 do_it可以通过从队列中删除任务来取消任务。
上面的代码使用以下模拟定义进行了测试:
def do_it(x):
print(x)

def wait_for_something_else():
sleep(5)

正如 smcjones 所指出的,另一种不使用额外线程的方法是使用 asyncio。这是使用 asyncio 调用 do_it 的方法在 future 的特定时间使用 loop.call_later :
import asyncio


def do_it(x):
print(x)


async def wait_for_something_else():
await asyncio.sleep(5)


async def main():
loop = asyncio.get_event_loop()
while True:
for i in range(20):
loop.call_later(i * 0.010, do_it, i)
await wait_for_something_else()

asyncio.run(main())
这些 do_it可以使用 loop.call_later 返回的句柄取消任务.
但是,这种方法需要切换程序以始终使用 asyncio,或者在单独的线程中运行 asyncio 事件循环。

关于python - 将来在不同的时间启动(非常短的) Action 时如何避免启动数百个线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69224969/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com