gpt4 book ai didi

python - 如何可靠地重现此 python 代码中的竞争条件?

转载 作者:IT老高 更新时间:2023-10-28 22:09:25 25 4
gpt4 key购买 nike

上下文

我最近发布了 timer class for review on Code Review .我有一种直觉,因为我曾经看到 1 个单元测试失败,但无法重现该失败。因此,我发布了代码审查。

我得到了一些很好的反馈,突出了代码中的各种竞争条件。 (我想)我了解问题和解决方案,但在进行任何修复之前,我想通过单元测试来暴露错误。当我尝试时,我意识到这很困难。各种堆栈交换答案表明我必须控制线程的执行以暴露错误,并且任何人为的时间不一定可以移植到不同的机器上。这似乎是我试图解决的问题之外的许多意外复杂性。

我尝试使用 the best static analysis (SA) tool for python ,PyLint,看看它是否会找出任何错误,但它不能。为什么人类可以通过代码审查(本质上是 SA)找到错误,而 SA 工具却不能?

不敢尝试get Valgrind working with python (这听起来像牦牛剃须),我决定在不先复制它们的情况下修复错误。现在我陷入了困境。

现在是代码。

from threading import Timer, Lock
from time import time

class NotRunningError(Exception): pass
class AlreadyRunningError(Exception): pass


class KitchenTimer(object):
'''
Loosely models a clockwork kitchen timer with the following differences:
You can start the timer with arbitrary duration (e.g. 1.2 seconds).
The timer calls back a given function when time's up.
Querying the time remaining has 0.1 second accuracy.
'''

PRECISION_NUM_DECIMAL_PLACES = 1
RUNNING = "RUNNING"
STOPPED = "STOPPED"
TIMEUP = "TIMEUP"

def __init__(self):
self._stateLock = Lock()
with self._stateLock:
self._state = self.STOPPED
self._timeRemaining = 0

def start(self, duration=1, whenTimeup=None):
'''
Starts the timer to count down from the given duration and call whenTimeup when time's up.
'''
with self._stateLock:
if self.isRunning():
raise AlreadyRunningError
else:
self._state = self.RUNNING
self.duration = duration
self._userWhenTimeup = whenTimeup
self._startTime = time()
self._timer = Timer(duration, self._whenTimeup)
self._timer.start()

def stop(self):
'''
Stops the timer, preventing whenTimeup callback.
'''
with self._stateLock:
if self.isRunning():
self._timer.cancel()
self._state = self.STOPPED
self._timeRemaining = self.duration - self._elapsedTime()
else:
raise NotRunningError()

def isRunning(self):
return self._state == self.RUNNING

def isStopped(self):
return self._state == self.STOPPED

def isTimeup(self):
return self._state == self.TIMEUP

@property
def timeRemaining(self):
if self.isRunning():
self._timeRemaining = self.duration - self._elapsedTime()
return round(self._timeRemaining, self.PRECISION_NUM_DECIMAL_PLACES)

def _whenTimeup(self):
with self._stateLock:
self._state = self.TIMEUP
self._timeRemaining = 0
if callable(self._userWhenTimeup):
self._userWhenTimeup()

def _elapsedTime(self):
return time() - self._startTime

问题

在此代码示例的上下文中,我如何公开竞争条件、修复它们并证明它们已修复?

加分

适用于其他实现和问题而不是专门针对此代码的测试框架的加分项。

外卖

我的结论是,重现已识别竞争条件的技术解决方案是控制两个线程的同步性,以确保它们按照会暴露错误的顺序执行。这里的重点是它们是已经确定的竞争条件。我发现识别竞争条件的最佳方法是将您的代码提交代码审查并鼓励更多专家对其进行分析。

最佳答案

传统上,在多线程代码中强制竞争条件是通过信号量完成的,因此您可以强制一个线程等到另一个线程达到某种边缘条件后再继续。

例如,您的对象有一些代码来检查如果对象已经在运行,是否没有调用 start。您可以通过执行以下操作强制此条件以确保其行为符合预期:

  • 启动一个KitchenTimer
  • 在处于运行状态时在信号量上设置计时器 block
  • 在另一个线程中启动相同的计时器
  • 捕获 AlreadyRunningError

要做到这一点,您可能需要扩展 KitchenTimer 类。正式的单元测试通常会使用定义为在关键时刻阻塞的模拟对象。模拟对象是一个比我在这里可以解决的更大的话题,但是谷歌搜索“python 模拟对象”会发现很多文档和许多实现可供选择。

这是一种强制代码抛出 AlreadyRunningError 的方法:

import threading

class TestKitchenTimer(KitchenTimer):

_runningLock = threading.Condition()

def start(self, duration=1, whenTimeUp=None):
KitchenTimer.start(self, duration, whenTimeUp)
with self._runningLock:
print "waiting on _runningLock"
self._runningLock.wait()

def resume(self):
with self._runningLock:
self._runningLock.notify()

timer = TestKitchenTimer()

# Start the timer in a subthread. This thread will block as soon as
# it is started.
thread_1 = threading.Thread(target = timer.start, args = (10, None))
thread_1.start()

# Attempt to start the timer in a second thread, causing it to throw
# an AlreadyRunningError.
try:
thread_2 = threading.Thread(target = timer.start, args = (10, None))
thread_2.start()
except AlreadyRunningError:
print "AlreadyRunningError"
timer.resume()
timer.stop()

通读代码,确定您要测试的一些边界条件,然后考虑您需要在哪里暂停计时器以强制该条件出现,并添加条件、信号量、事件等来实现它发生。例如如果在计时器运行 whenTimeUp 回调时,另一个线程试图停止它会发生什么?您可以通过让计时器在输入_whenTimeUp 后立即等待来强制该条件:

import threading

class TestKitchenTimer(KitchenTimer):

_runningLock = threading.Condition()

def _whenTimeup(self):
with self._runningLock:
self._runningLock.wait()
KitchenTimer._whenTimeup(self)

def resume(self):
with self._runningLock:
self._runningLock.notify()

def TimeupCallback():
print "TimeupCallback was called"

timer = TestKitchenTimer()

# The timer thread will block when the timer expires, but before the callback
# is invoked.
thread_1 = threading.Thread(target = timer.start, args = (1, TimeupCallback))
thread_1.start()
sleep(2)

# The timer is now blocked. In the parent thread, we stop it.
timer.stop()
print "timer is stopped: %r" % timer.isStopped()

# Now allow the countdown thread to resume.
timer.resume()

子类化你想测试的类并不是一个很好的方法来测试它:你必须重写基本上所有的方法来测试每个方法的竞争条件,在那个时候有一个很好的要提出的论点是您并没有真正测试原始代码。相反,您可能会发现将信号量直接放在 KitchenTimer 对象中但默认初始化为 None 更简洁,并让您的方法在获取或等待锁之前检查 if testRunningLock is not None:。然后,您可以强制对您提交的实际代码进行竞争。

一些关于 Python 模拟框架的阅读可能会有所帮助。事实上,我不确定模拟是否有助于测试这段代码:它几乎完全是独立的,不依赖于许多外部对象。但是模拟教程有时会涉及到这些问题。我没有使用过任何这些,但是这些文档是一个很好的开始:

关于python - 如何可靠地重现此 python 代码中的竞争条件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19602535/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com