我怎样才能为 类 编写单元测试而不是 运行 直到 Keyboardinterrupt 并且不 return 任何东西?
How can I write Unit Tests for classes that run until Keyboard Interrupt and don't return anything?
我正在开发一个 Python 服务来监视文件系统中的目录。当它看到一个文件已被创建或移动到那里时,它会将文件的路径发送到 Kafka 队列中。我的服务完全按照我的需要工作,但我的问题是我应该至少有 90% 的单元测试覆盖率。我是 Python 的新手,而且我以前从未使用过任何语言的单元测试,所以我真的感到力不从心。我只是想不通我将如何测试这些 classes。
这是监控文件系统的class,我用的是watchdog库
我将 handler=FileHandler
参数添加到 init 因为我想我可以用它来传递 class 我可以用于测试的假处理程序,但感觉好像不必要地复杂。
class FileSystemMonitor:
def __init__(self, target_path, kafka_queue, handler=FileHandler):
self.path = target_path
self.queue = kafka_queue
self.handler = handler(self.queue)
def start(self):
observer = Observer()
observer.schedule(self.handler, self.path, recursive=True)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
def parse_args():
path = sys.argv[1] if len(sys.argv) > 1 else '.'
queue = sys.argv[2] if len(sys.argv) > 2 else 'default'
return path, queue
if __name__ == "__main__":
path, queue = parse_args()
monitor = FileSystemMonitor(path, queue)
monitor.start()
这是我做的class,它处理监视器抛出的事件,并将路径传递给Kafka Queue。
class FileHandler(PatternMatchingEventHandler):
def __init__(self, queue):
super(FileHandler, self).__init__(ignore_patterns=["*/.DS_Store"], ignore_directories=True)
self.queue = queue
def on_any_event(self, event):
super(FileHandler, self).on_any_event(event)
#print(event, self.queue)
result = kafkaProducer.send_msg(self.queue, event.src_path, event.event_type)
print("Handler:", result)
return result
我已经为 kafkaProducer class 编写了一些测试,我并没有遇到什么困难,因为它实际上 return 是一个我可以测试的值。
FileSystemMonitor 无限运行,只等待键盘中断,当它结束时,它不会return 任何东西,那么如何为它编写单元测试?
至于 FileHandler class 它取决于监视器 class 触发的事件,那么我如何隔离 Handler class 来测试它?
FileSystemMonitor.start
很难测试,因为它会阻塞直到外部事件发生,但由于阻塞,测试无法轻易使事件发生。我想您可以使用多线程或多处理或可能只是一个计时器来做一些技巧,但这会给您的测试增加一些我不喜欢的不确定性。
更明确的方法是允许调用者指定 while
循环内发生的事情,以便在测试中引发异常,而 time.sleep
将在生产代码中调用。
class FileSystemMonitor:
def __init__(self, target_path, kafka_queue, handler=FileHandler):
self.path = target_path
self.queue = kafka_queue
self.handler = handler(self.queue)
def start(self, loop_action):
observer = Observer()
observer.schedule(self.handler, self.path, recursive=True)
observer.start()
try:
while True:
loop_action()
except KeyboardInterrupt:
observer.stop()
observer.join()
这就是您的测试的样子:
def fake_loop_action():
raise KeyboardInterrupt
def test_FileSystemMonitor():
# Initialize target_path, kafka_queue and handler here.
# You might want to use test doubles.
monitor = FileSystemMonitor(target_path, kafka_queue, handler)
monitor.start(loop_action=fake_loop_action)
而在生产代码中,您将改用 time.sleep
。您现在甚至可以在通话中指定延迟。
monitor.start(loop_action=lambda: time.sleep(1))
我正在开发一个 Python 服务来监视文件系统中的目录。当它看到一个文件已被创建或移动到那里时,它会将文件的路径发送到 Kafka 队列中。我的服务完全按照我的需要工作,但我的问题是我应该至少有 90% 的单元测试覆盖率。我是 Python 的新手,而且我以前从未使用过任何语言的单元测试,所以我真的感到力不从心。我只是想不通我将如何测试这些 classes。
这是监控文件系统的class,我用的是watchdog库
我将 handler=FileHandler
参数添加到 init 因为我想我可以用它来传递 class 我可以用于测试的假处理程序,但感觉好像不必要地复杂。
class FileSystemMonitor:
def __init__(self, target_path, kafka_queue, handler=FileHandler):
self.path = target_path
self.queue = kafka_queue
self.handler = handler(self.queue)
def start(self):
observer = Observer()
observer.schedule(self.handler, self.path, recursive=True)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
def parse_args():
path = sys.argv[1] if len(sys.argv) > 1 else '.'
queue = sys.argv[2] if len(sys.argv) > 2 else 'default'
return path, queue
if __name__ == "__main__":
path, queue = parse_args()
monitor = FileSystemMonitor(path, queue)
monitor.start()
这是我做的class,它处理监视器抛出的事件,并将路径传递给Kafka Queue。
class FileHandler(PatternMatchingEventHandler):
def __init__(self, queue):
super(FileHandler, self).__init__(ignore_patterns=["*/.DS_Store"], ignore_directories=True)
self.queue = queue
def on_any_event(self, event):
super(FileHandler, self).on_any_event(event)
#print(event, self.queue)
result = kafkaProducer.send_msg(self.queue, event.src_path, event.event_type)
print("Handler:", result)
return result
我已经为 kafkaProducer class 编写了一些测试,我并没有遇到什么困难,因为它实际上 return 是一个我可以测试的值。
FileSystemMonitor 无限运行,只等待键盘中断,当它结束时,它不会return 任何东西,那么如何为它编写单元测试?
至于 FileHandler class 它取决于监视器 class 触发的事件,那么我如何隔离 Handler class 来测试它?
FileSystemMonitor.start
很难测试,因为它会阻塞直到外部事件发生,但由于阻塞,测试无法轻易使事件发生。我想您可以使用多线程或多处理或可能只是一个计时器来做一些技巧,但这会给您的测试增加一些我不喜欢的不确定性。
更明确的方法是允许调用者指定 while
循环内发生的事情,以便在测试中引发异常,而 time.sleep
将在生产代码中调用。
class FileSystemMonitor:
def __init__(self, target_path, kafka_queue, handler=FileHandler):
self.path = target_path
self.queue = kafka_queue
self.handler = handler(self.queue)
def start(self, loop_action):
observer = Observer()
observer.schedule(self.handler, self.path, recursive=True)
observer.start()
try:
while True:
loop_action()
except KeyboardInterrupt:
observer.stop()
observer.join()
这就是您的测试的样子:
def fake_loop_action():
raise KeyboardInterrupt
def test_FileSystemMonitor():
# Initialize target_path, kafka_queue and handler here.
# You might want to use test doubles.
monitor = FileSystemMonitor(target_path, kafka_queue, handler)
monitor.start(loop_action=fake_loop_action)
而在生产代码中,您将改用 time.sleep
。您现在甚至可以在通话中指定延迟。
monitor.start(loop_action=lambda: time.sleep(1))