gpt4 book ai didi

python-3.x - 如何为直到Keyboardinterrupt之前运行且不返回任何内容的类编写单元测试?

转载 作者:行者123 更新时间:2023-12-03 13:18:52 25 4
gpt4 key购买 nike

我正在使用Python服务来监视文件系统中的目录。当发现文件已在其中创建或移动时,它将文件的路径发送到Kafka队列中。我的服务工作完全符合我的需要,但是我的问题是我应该至少拥有90%的单元测试覆盖率。我对Python相对较新,并且以前从未使用过任何语言的单元测试,因此我感到自己的确超出了我的理解范围。我只是无法确定如何测试这些类。

这是监视文件系统的类,我正在使用watchdog库。

我将handler=FileHandler参数添加到 init 中,因为我认为可以使用该参数将可用于测试的伪处理程序传递给该类,但这感觉不必要地复杂。

class FileSystemMonitor:

def __init__(self, target_path, kafka_queue, handler=FileHandler):
self.path = target_path
self.queue = kafka_queue
self.handler = handler(self.queue)

def start(self):
observer = Observer()
observer.schedule(self.handler, self.path, recursive=True)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()

def parse_args():
path = sys.argv[1] if len(sys.argv) > 1 else '.'
queue = sys.argv[2] if len(sys.argv) > 2 else 'default'
return path, queue

if __name__ == "__main__":
path, queue = parse_args()
monitor = FileSystemMonitor(path, queue)
monitor.start()

这是我制作的用于处理监视器引发的事件的类,并将该路径传递给Kafka队列。

class FileHandler(PatternMatchingEventHandler):

def __init__(self, queue):
super(FileHandler, self).__init__(ignore_patterns=["*/.DS_Store"], ignore_directories=True)
self.queue = queue

def on_any_event(self, event):
super(FileHandler, self).on_any_event(event)
#print(event, self.queue)
result = kafkaProducer.send_msg(self.queue, event.src_path, event.event_type)
print("Handler:", result)
return result

我已经为kafkaProducer类编写了一些测试,对此我并不太费劲,因为它实际上返回了我可以测试的值。

FileSystemMonitor无限运行,只是等待键盘中断,当它结束时,它不会返回任何内容,那么我如何为此编写单元测试?

至于FileHandler类,它取决于Monitor类触发的事件,那么我将如何隔离Handler类来对其进行测试?

最佳答案

FileSystemMonitor.start非常难以测试,因为它会阻塞直到发生外部事件,但是由于阻塞,测试无法轻易使事件发生。我想您可以使用多线程或多处理或仅使用计时器来完成一些技巧,但这会给您的测试增加一些不确定性,这是我不喜欢的。

更明确的方法是允许调用者指定while循环内发生的情况,以便在测试中引发异常,而在生产代码中将调用time.sleep

class FileSystemMonitor:
def __init__(self, target_path, kafka_queue, handler=FileHandler):
self.path = target_path
self.queue = kafka_queue
self.handler = handler(self.queue)

def start(self, loop_action):
observer = Observer()
observer.schedule(self.handler, self.path, recursive=True)
observer.start()
try:
while True:
loop_action()
except KeyboardInterrupt:
observer.stop()
observer.join()

这是您的测试结果:
def fake_loop_action():
raise KeyboardInterrupt

def test_FileSystemMonitor():
# Initialize target_path, kafka_queue and handler here.
# You might want to use test doubles.
monitor = FileSystemMonitor(target_path, kafka_queue, handler)
monitor.start(loop_action=fake_loop_action)

在生产代码中,您将改为使用 time.sleep。您甚至可以立即在通话中指定延迟。
monitor.start(loop_action=lambda: time.sleep(1))

关于python-3.x - 如何为直到Keyboardinterrupt之前运行且不返回任何内容的类编写单元测试?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56894411/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com