- mongodb - 在 MongoDB mapreduce 中,如何展平值对象?
- javascript - 对象传播与 Object.assign
- html - 输入类型 ="submit"Vs 按钮标签它们可以互换吗?
- sql - 使用 MongoDB 而不是 MS SQL Server 的优缺点
上下文
我最近发布了 timer class for review on Code Review .我有一种直觉,因为我曾经看到 1 个单元测试失败,但无法重现该失败。因此,我发布了代码审查。
我得到了一些很好的反馈,突出了代码中的各种竞争条件。 (我想)我了解问题和解决方案,但在进行任何修复之前,我想通过单元测试来暴露错误。当我尝试时,我意识到这很困难。各种堆栈交换答案表明我必须控制线程的执行以暴露错误,并且任何人为的时间不一定可以移植到不同的机器上。这似乎是我试图解决的问题之外的许多意外复杂性。
我尝试使用 the best static analysis (SA) tool for python ,PyLint,看看它是否会找出任何错误,但它不能。为什么人类可以通过代码审查(本质上是 SA)找到错误,而 SA 工具却不能?
不敢尝试get Valgrind working with python (这听起来像牦牛剃须),我决定在不先复制它们的情况下修复错误。现在我陷入了困境。
现在是代码。
from threading import Timer, Lock
from time import time
class NotRunningError(Exception): pass
class AlreadyRunningError(Exception): pass
class KitchenTimer(object):
'''
Loosely models a clockwork kitchen timer with the following differences:
You can start the timer with arbitrary duration (e.g. 1.2 seconds).
The timer calls back a given function when time's up.
Querying the time remaining has 0.1 second accuracy.
'''
PRECISION_NUM_DECIMAL_PLACES = 1
RUNNING = "RUNNING"
STOPPED = "STOPPED"
TIMEUP = "TIMEUP"
def __init__(self):
self._stateLock = Lock()
with self._stateLock:
self._state = self.STOPPED
self._timeRemaining = 0
def start(self, duration=1, whenTimeup=None):
'''
Starts the timer to count down from the given duration and call whenTimeup when time's up.
'''
with self._stateLock:
if self.isRunning():
raise AlreadyRunningError
else:
self._state = self.RUNNING
self.duration = duration
self._userWhenTimeup = whenTimeup
self._startTime = time()
self._timer = Timer(duration, self._whenTimeup)
self._timer.start()
def stop(self):
'''
Stops the timer, preventing whenTimeup callback.
'''
with self._stateLock:
if self.isRunning():
self._timer.cancel()
self._state = self.STOPPED
self._timeRemaining = self.duration - self._elapsedTime()
else:
raise NotRunningError()
def isRunning(self):
return self._state == self.RUNNING
def isStopped(self):
return self._state == self.STOPPED
def isTimeup(self):
return self._state == self.TIMEUP
@property
def timeRemaining(self):
if self.isRunning():
self._timeRemaining = self.duration - self._elapsedTime()
return round(self._timeRemaining, self.PRECISION_NUM_DECIMAL_PLACES)
def _whenTimeup(self):
with self._stateLock:
self._state = self.TIMEUP
self._timeRemaining = 0
if callable(self._userWhenTimeup):
self._userWhenTimeup()
def _elapsedTime(self):
return time() - self._startTime
问题
在此代码示例的上下文中,我如何公开竞争条件、修复它们并证明它们已修复?
加分
适用于其他实现和问题而不是专门针对此代码的测试框架的加分项。
外卖
我的结论是,重现已识别竞争条件的技术解决方案是控制两个线程的同步性,以确保它们按照会暴露错误的顺序执行。这里的重点是它们是已经确定的竞争条件。我发现识别竞争条件的最佳方法是将您的代码提交代码审查并鼓励更多专家对其进行分析。
最佳答案
传统上,在多线程代码中强制竞争条件是通过信号量完成的,因此您可以强制一个线程等到另一个线程达到某种边缘条件后再继续。
例如,您的对象有一些代码来检查如果对象已经在运行,是否没有调用 start
。您可以通过执行以下操作强制此条件以确保其行为符合预期:
KitchenTimer
AlreadyRunningError
要做到这一点,您可能需要扩展 KitchenTimer 类。正式的单元测试通常会使用定义为在关键时刻阻塞的模拟对象。模拟对象是一个比我在这里可以解决的更大的话题,但是谷歌搜索“python 模拟对象”会发现很多文档和许多实现可供选择。
这是一种强制代码抛出 AlreadyRunningError
的方法:
import threading
class TestKitchenTimer(KitchenTimer):
_runningLock = threading.Condition()
def start(self, duration=1, whenTimeUp=None):
KitchenTimer.start(self, duration, whenTimeUp)
with self._runningLock:
print "waiting on _runningLock"
self._runningLock.wait()
def resume(self):
with self._runningLock:
self._runningLock.notify()
timer = TestKitchenTimer()
# Start the timer in a subthread. This thread will block as soon as
# it is started.
thread_1 = threading.Thread(target = timer.start, args = (10, None))
thread_1.start()
# Attempt to start the timer in a second thread, causing it to throw
# an AlreadyRunningError.
try:
thread_2 = threading.Thread(target = timer.start, args = (10, None))
thread_2.start()
except AlreadyRunningError:
print "AlreadyRunningError"
timer.resume()
timer.stop()
通读代码,确定您要测试的一些边界条件,然后考虑您需要在哪里暂停计时器以强制该条件出现,并添加条件、信号量、事件等来实现它发生。例如如果在计时器运行 whenTimeUp 回调时,另一个线程试图停止它会发生什么?您可以通过让计时器在输入_whenTimeUp 后立即等待来强制该条件:
import threading
class TestKitchenTimer(KitchenTimer):
_runningLock = threading.Condition()
def _whenTimeup(self):
with self._runningLock:
self._runningLock.wait()
KitchenTimer._whenTimeup(self)
def resume(self):
with self._runningLock:
self._runningLock.notify()
def TimeupCallback():
print "TimeupCallback was called"
timer = TestKitchenTimer()
# The timer thread will block when the timer expires, but before the callback
# is invoked.
thread_1 = threading.Thread(target = timer.start, args = (1, TimeupCallback))
thread_1.start()
sleep(2)
# The timer is now blocked. In the parent thread, we stop it.
timer.stop()
print "timer is stopped: %r" % timer.isStopped()
# Now allow the countdown thread to resume.
timer.resume()
子类化你想测试的类并不是一个很好的方法来测试它:你必须重写基本上所有的方法来测试每个方法的竞争条件,在那个时候有一个很好的要提出的论点是您并没有真正测试原始代码。相反,您可能会发现将信号量直接放在 KitchenTimer 对象中但默认初始化为 None 更简洁,并让您的方法在获取或等待锁之前检查 if testRunningLock is not None:
。然后,您可以强制对您提交的实际代码进行竞争。
一些关于 Python 模拟框架的阅读可能会有所帮助。事实上,我不确定模拟是否有助于测试这段代码:它几乎完全是独立的,不依赖于许多外部对象。但是模拟教程有时会涉及到这些问题。我没有使用过任何这些,但是这些文档是一个很好的开始:
关于python - 如何可靠地重现此 python 代码中的竞争条件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19602535/
我正在尝试运行以下内容:: Press to see code - name: Snapshot BI nodes hosts: [CLUSTER-BI,CLUSTER-BI-REPL
在这里尝试心理重置:我尝试使用 MSMQ 创建一个可靠、持久的堆栈,但没有成功 所以更一般地说: 我有生产者(一个 web 服务,虽然“只有一个”,但也是多线程的)/消费者(多个进程,根据需要设置)。
试图为分布式系统找到一个商业日志框架。此框架必须允许远程服务器上的 .NET 应用程序记录可以在中央位置收集的消息。如果可能,中央位置应将消息存储在 SQL Server 数据库中。 要求: 能够在远
我正在开发 Restful 服务,我们将在数据库中插入/更新新记录。 由于REST使用HTTP进行通信,而HTTP并不可靠,我担心如果连接失败,请求可能无法发送到服务器。 我在 link 中找到的建议
我正在尝试实现一个页面,员工可以在其中登录并添加、修改、更新工作案例。 我有一个选择列表,其中包含从数据库加载的数据(员工姓名)。在这个数据库中,我有基本信息、用户名、ID、密码、电子邮件。 选择列表
我在 C 代码和 Python 代码之间(偶尔)得到略有不同的计算结果,并设法找到了一个例子。在 Python 中,我得到了这个: >>> print "%.55f" %\ ... (-2.49999
例如如果我将计时器设置为每天午夜到期,如果一个“失火”(例如,由于服务器关闭而不会触发回调)会发生什么?我在文档中找不到它。 有没有办法让这个定时器在服务器重启时立即触发回调? PS:我了解 Quar
我有一组不同长度的非零序列,我正在使用 Keras LSTM 对这些序列建模。我使用 Keras Tokenizer 进行分词(分词从 1 开始)。为了使序列具有相同的长度,我使用了填充。 填充示例:
我遇到了一个非常有趣的可靠 session 行为。我正在使用 netTcp 绑定(bind) + 双工 channel + 可靠 session 。 当我尝试在 channel.faulted 上收听
问题: 给定表 table_a 和 table_b,每当 table_a 更新时,我都需要可靠地(并发地)执行这样的操作: SELECT table_a 中的一些行。 在应用程序代码中计算一些内容。
我们目前的设计 环境 Redis 2.8.17 我们已经实现了我们的可靠队列,使用类似于 redis 文档中描述的模式的模式,在 RPOPLPUSH 下 但是,考虑到其阻塞性质,我们正在使用 BRPO
在我们的 WCF 应用程序中,我正在尝试配置可靠的 session 。 服务: 客户:
我使用这个 Delphi 7 代码来检测 Internet Explorer 是否正在运行: function IERunning: Boolean; begin Result := FindWi
我正在准备构建一个应用程序,该应用程序能够向 GPS 设备发送/接收航路点。通过一些谷歌搜索,我发现了很多可能对此目的有用的库: Java Chaeron GPS GPSLib4J Python Py
我有几个关于 WCF 可靠 session 可靠性的问题: WCF 是否在重试期间重新序列化消息? 2。如果 1 是正确的 - 它是否在消息参数被处理后发生? 3. 如果 2 是正确的 - 是否有任何
对于使用 $(this)[0].defaultValue 来确定文本框值是否已从原始值发生变化的一些反馈,我将不胜感激,例如 //keyUp event if($(this)[0].defaultVa
我正在开发一个具有以下特征的实时应用程序: 数百个客户端将同时插入行/文档,每个客户端每隔几秒插入一行。 大部分仅追加;几乎所有的行/文档,一旦插入,就永远不会改变。 只有当数据刷新到磁盘时,客户端才
场景:最终用户(不受信任的)提供了一个字符串,例如 "Hello, {name}!" .在服务器上,我想以 my_string.format(name="Homer") 的形式对该用户提供的字符串进行
我在推送通知方面遇到一些问题。我们使用 Firebase 来推送通知。问题是我可以在一台 iPhone 上正确接收 PushNotifications,但无法在另一台 iPhone 上接收它们。我在
从 python 到 c++,这是我能得到的最接近 python 的装饰器。 这个解决方案感觉有点像 hack,因为在要装饰的函数之后运行的代码在 Timer 析构函数中是隐式调用的。不过它确实有效。
我是一名优秀的程序员,十分优秀!