gpt4 book ai didi

python - 制作计时器 : timeout inaccuracy of threading. Event.wait - Python 3.6

转载 作者:行者123 更新时间:2023-12-03 12:44:51 28 4
gpt4 key购买 nike

首先,我是 Python 新手,不熟悉它的功能。我一直主要使用MATLAB。

PC 简要规范:Windows 10、Intel i7

我正在尝试制作一个计时器类,用于定期执行 MATLAB 等函数,这显然是从 Java 计时器借来的。 MATLAB 计时器的分辨率约为 1 毫秒,我从未见过它在任何情况下都超过 2 毫秒。事实上,它对我的​​项目来说已经足够准确了。

最近,由于MATLAB的并行计算和网络访问功能较差,我打算转向Python。然而,不幸的是,与我必须制作自己的计时器类的 MATLAB 相比,Python 的标准包提供了某种程度较低的计时器 (threading.Timer)。首先,我提到了 QnA Executing periodic actions in Python [duplicate] . Michael Anderson 提出的解决方案给出了漂移校正的简单概念。他使用 time.sleep() 来保持时间。该方法非常准确,有时比 MATLAB 计时器显示出更好的准确性。大约0.5 毫秒分辨率。但是,定时器在time.sleep() 中被捕获期间不能被中断(暂停或恢复)。但有时我必须立即停止,无论它是否处于 sleep() 状态。

我发现的问题的解决方案是利用线程包中的 Event 类。引用 Python threading.timer - repeat function every 'n' seconds .使用 Event.wait() 的超时功能,我可以在执行之间设置时间间隔,并用于保持时间段。也就是说,事件通常会被清除,因此 wait(timeout) 可以像 time.sleep(interval) 一样,我可以在需要时通过设置 event 立即退出 wait()。

一切似乎都很好,但 Event.wait() 中存在一个严重问题。时间延迟在 1 ~ 15 ms 之间变化太大。我认为它来自 Event.wait() 的开销。

我制作了一个示例代码,显示了 time.sleep() 和 Event.wait() 之间的准确性比较。这总计 1000 次迭代 1 ms sleep() 和 wait() 以查看累积时间错误。预期结果约为 1.000。

import time
from threading import Event

time.sleep(3) # to relax

# time.sleep()
tspan = 1
N = 1000
t1 = time.perf_counter()
for _ in range(N):
time.sleep(tspan/N)
t2 = time.perf_counter()

print(t2-t1)

time.sleep(3) # to relax

# Event.wait()
tspan = 1
event = Event()
t1 = time.perf_counter()
for _ in range(N):
event.wait(tspan/N)
t2 = time.perf_counter()

print(t2-t1)

结果:
1.1379848184879964
15.614547161211096

结果表明 time.sleep() 在准确性上要好得多。但是我不能完全依赖前面提到的 time.sleep()。

总之,
  • time.sleep():准确但不可中断
  • threading.Event.wait():不准确但可中断

  • 我目前正在考虑一个折衷方案:就像在示例中一样,创建一个微小的 time.sleep() 循环(间隔为 0.5 毫秒)并使用 if 语句退出循环并在需要时中断。据我所知,该方法在 Python 2.x Python time.sleep() vs event.wait() 中使用.

    这是一个冗长的介绍,但我的问题可以总结如下。
  • 我可以通过外部信号或事件强制线程进程从 time.sleep() 中断吗? (这似乎是最有效的。???)
  • 使 Event.wait() 更准确或减少开销时间。
  • 除了 sleep() 和 Event.wait() 方法之外,是否还有更好的方法来提高计时精度。

  • 非常感谢。

    最佳答案

    我遇到了与 Event.wait() 相同的计时问题.我想出的解决方案是创建一个模仿 threading.Event 的类。 .在内部,它使用了 time.sleep() 的组合。循环和繁忙循环可大大提高精度。 sleep 循环在单独的线程中运行,以便阻塞 wait()主线程中的调用仍然可以立即中断。当set()方法被调用, sleep 线程应该在不久之后终止。此外,为了最大限度地减少 CPU 使用率,我确保繁忙循环的运行时间不会超过 3 毫秒。
    这是我的自定义 Event类以及最后的计时演示(演示的打印执行时间将以纳秒为单位):

    import time
    import _thread
    import datetime


    class Event:
    __slots__ = (
    "_flag", "_lock", "_nl",
    "_pc", "_waiters"
    )

    _lock_type = _thread.LockType
    _timedelta = datetime.timedelta
    _perf_counter = time.perf_counter
    _new_lock = _thread.allocate_lock

    class _switch:
    __slots__ = ("_on",)

    def __call__(self, on: bool = None):
    if on is None:
    return self._on

    self._on = on

    def __bool__(self):
    return self._on

    def __init__(self):
    self._on = False

    def clear(self):
    with self._lock:
    self._flag(False)

    def is_set(self) -> bool:
    return self._flag()

    def set(self):
    with self._lock:
    self._flag(True)
    waiters = self._waiters

    for waiter in waiters:
    waiter.release()

    waiters.clear()

    def wait(
    self,
    timeout: float = None
    ) -> bool:
    with self._lock:
    return self._wait(self._pc(), timeout)

    def _new_waiter(self) -> _lock_type:
    waiter = self._nl()
    waiter.acquire()
    self._waiters.append(waiter)
    return waiter

    def _wait(
    self,
    start: float,
    timeout: float,
    td=_timedelta,
    pc=_perf_counter,
    end: _timedelta = None,
    waiter: _lock_type = None,
    new_thread=_thread.start_new_thread,
    thread_delay=_timedelta(milliseconds=3)
    ) -> bool:
    flag = self._flag

    if flag:
    return True
    elif timeout is None:
    waiter = self._new_waiter()
    elif timeout <= 0:
    return False
    else:
    delay = td(seconds=timeout)
    end = td(seconds=start) + delay

    if delay > thread_delay:
    mark = end - thread_delay
    waiter = self._new_waiter()
    new_thread(
    self._wait_thread,
    (flag, mark, waiter)
    )

    lock = self._lock
    lock.release()

    try:
    if waiter:
    waiter.acquire()

    if end:
    while (
    not flag and
    td(seconds=pc()) < end
    ):
    pass

    finally:
    lock.acquire()

    if waiter and not flag:
    self._waiters.remove(waiter)

    return flag()

    @staticmethod
    def _wait_thread(
    flag: _switch,
    mark: _timedelta,
    waiter: _lock_type,
    td=_timedelta,
    pc=_perf_counter,
    sleep=time.sleep
    ):
    while not flag and td(seconds=pc()) < mark:
    sleep(0.001)

    if waiter.locked():
    waiter.release()

    def __new__(cls):
    _new_lock = cls._new_lock
    _self = object.__new__(cls)
    _self._waiters = []
    _self._nl = _new_lock
    _self._lock = _new_lock()
    _self._flag = cls._switch()
    _self._pc = cls._perf_counter
    return _self


    if __name__ == "__main__":
    def test_wait_time():
    wait_time = datetime.timedelta(microseconds=1)
    wait_time = wait_time.total_seconds()

    def test(
    event=Event(),
    delay=wait_time,
    pc=time.perf_counter
    ):
    pc1 = pc()
    event.wait(delay)
    pc2 = pc()
    pc1, pc2 = [
    int(nbr * 1000000000)
    for nbr in (pc1, pc2)
    ]
    return pc2 - pc1

    lst = [
    f"{i}.\t\t{test()}"
    for i in range(1, 11)
    ]
    print("\n".join(lst))

    test_wait_time()
    del test_wait_time

    关于python - 制作计时器 : timeout inaccuracy of threading. Event.wait - Python 3.6,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48984512/

    28 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com