在 python 中使用 multiprocessing.Process() 对函数进行单元测试和覆盖

Unit testing and coverage-ing a function using multiprocessing.Process() in python

我正在尝试使用 multiprocessing.Process() 测试一个可能会崩溃的函数。如果函数在 10 分钟内没有理想地响应,那么下面编写的代码将重新启动调用,这完全正常,即使 self.assert... 测试也工作正常。问题出在覆盖范围上,出于某种原因,运行 覆盖我的文件显示 my_func 正在被调用,但在测试时它内部没有任何内容被覆盖,而测试(这需要整个在不崩溃的情况下执行的功能)正在通过。

        while True:
           p = multiprocessing.Process(
               target=my_func, args=(return_dict) # return_dict to get the return values
           )
           p.start()
           p.join(600)

           if p.is_alive():
               print(
                   "Taking too long, "
                   + "KILLING IT and starting AGAIN."
               )
               p.kill()
               p.join()
           else:
               break

我做错了什么吗?如果信息似乎不完整,请告诉我,我会添加我的测试和函数。

Reference link

要覆盖子流程中的代码 运行,您需要实际指定它。

  1. 在您的文件中添加 import coverage 导入语句

  2. 在每个 multiprocessing.Process() 语句之上添加 coverage.process_startup(),在我的例子中是这样的 -

    coverage.process_startup()

    p = multiprocessing.Process( 目标=my_func, 参数=(return_dict) )

    ..其余代码

  3. 在您的根目录中创建一个 .coveragerc 文件并添加以下行以指定覆盖子进程和多进程。

    [运行]

    并发=多处理

  4. 设置一个名为COVERAGE_PROCESS_START的路径变量来存储.coveragerc文件的位置

    设置COVERAGE_PROCESS_START=%cd%\.coveragerc // Windows

    export COVERAGE_PROCESS_START=$PWD/.coveragerc // UNIX/LINUX

  5. 一切就绪,运行 使用 coverage run -m unittest 的测试,请注意现在生成了 1 个以上的覆盖率报告文件。将它们合并 运行 coverage combine 最后用于报告 运行 coverage report