gpt4 book ai didi

python - Python 线程定时器如何在内部工作?

转载 作者:太空宇宙 更新时间:2023-11-04 08:57:03 26 4
gpt4 key购买 nike

我想知道 python threading.Timer 是如何工作的。

更详细地说,当我运行几个 threading.Timer 时,它是否运行单独的线程来计算时间和运行处理程序?

或者一个线程一起管理和计数几个定时器?

我问是因为我的应用程序需要安排很多事件,但是

如果 threading.Timer 单独运行每个线程来计算一个计时器,并且我运行许多计时器,它可能会影响性能。

所以我担心如果我必须实现一个只运行一个线程的调度程序,如果它对性能有很大影响的话。

最佳答案

threading.Timer 类是 threading.Thread 的子类,基本上它只是运行一个单独的线程,在该线程中休眠指定的时间并运行相应的功能。

这绝对不是安排事件的有效方式。更好的方法是使用 Queue.PriorityQueue 在单个线程中进行调度,您可以在其中将事件放在“优先级”实际上意味着“下一个触发日期”的地方。类似于 cron 的工作方式。

或者甚至更好:使用已经存在的东西,不要重新发明轮子:Cron、Celery 等等......

通过 Queue.PriorityQueue 制作调度器的一个非常简单的例子:

import time
from Queue import PriorityQueue

class Task(object):
def __init__(self, fn, crontab):
# TODO: it should be possible to pass args, kwargs
# so that fn can be called with fn(*args, **kwargs)
self.fn = fn
self.crontab = crontab

def get_next_fire_date(self):
# TODO: evaluate next fire date based on self.crontab
pass

class Scheduler(object):
def __init__(self):
self.event_queue = PriorityQueue()
self.new_task = False

def schedule_task(self, fn, crontab):
# TODO: add scheduling language, crontab or something
task = Task(fn, crontab)
next_fire = task.get_next_fire_date()
if next_fire:
self.new_task = True
self.event_queue.put((next_fire, task))

def run(self):
self.new_task = False

# TODO: do we really want an infinite loop?
while True:
# TODO: actually we want .get() with timeout and to handle
# the case when the queue is empty
next_fire, task = self.event_queue.get()

# incremental sleep so that we can check
# if new tasks arrived in the meantime
sleep_for = int(next_fire - time.time())
for _ in xrange(sleep_for):
time.sleep(1)
if self.new_task:
self.new_task = False
self.event_queue.put((next_fire, task))
continue

# TODO: run in separate thread?
task.fn()

time.sleep(1)
next_fire = task.get_next_fire_date()

if next_fire:
event_queue.put((next_fire, task))

def test():
return 'hello world'

sch = Scheduler()
sch.schedule_task(test, '5 * * * *')
sch.schedule_task(test, '0 22 * * 1-5')
sch.schedule_task(test, '1 1 * * *')
sch.run()

这只是一个想法。您必须正确实现 TaskScheduler 类,即 get_next_fire_date 方法加上某种调度语言(crontab?)和错误处理。我仍然强烈建议使用现有的库之一。

关于python - Python 线程定时器如何在内部工作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29209350/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com