python 运行 报道永无止境的过程

python running coverage on never ending process

我有一个多处理的 Web 服务器,其进程永无止境,我想在实时环境中检查整个项目的代码覆盖率(不仅来自测试)。

问题是,由于进程永远不会结束,我没有设置 cov.start() cov.stop() cov.save() 挂钩的好地方。

因此,我考虑生成一个线程,在无限循环中保存和合并覆盖率数据,然后休眠一段时间,但是这种方法不起作用,覆盖率报告似乎是空的,除了睡眠线。

我很乐意收到关于如何获得我的代码覆盖率的任何想法, 或者关于为什么我的想法行不通的任何建议。这是我的代码片段:

import coverage
cov = coverage.Coverage()
import time
import threading
import os

class CoverageThread(threading.Thread):
    _kill_now = False
    _sleep_time = 2

@classmethod
def exit_gracefully(cls):
    cls._kill_now = True

def sleep_some_time(self):
    time.sleep(CoverageThread._sleep_time)

def run(self):
    while True:
        cov.start()
        self.sleep_some_time()
        cov.stop()
        if os.path.exists('.coverage'):
            cov.combine()
        cov.save()
        if self._kill_now:
            break
    cov.stop()
    if os.path.exists('.coverage'):
        cov.combine()
    cov.save()
    cov.html_report(directory="coverage_report_data.html")
    print "End of the program. I was killed gracefully :)"

既然您愿意 运行 您的代码以不同的方式进行测试,为什么不添加一种方法来结束测试过程呢?这似乎比试图破解覆盖范围更简单。

显然,多个 Threads 无法很好地控制 coverage。 一旦启动了不同的线程,停止 Coverage 对象将停止所有覆盖,而 start 只会在 "starting" 线程中重新启动它。 因此,除了 CoverageThread.

之外,您的代码基本上会在 2 秒后停止对所有 Thread 的覆盖

我玩了一下 API,可以在不停止 Coverage 对象的情况下访问测量值。 因此,您可以使用 API 启动一个定期保存覆盖率数据的线程。 第一个实现就像这样

import threading
from time import sleep
from coverage import Coverage
from coverage.data import CoverageData, CoverageDataFiles
from coverage.files import abs_file

cov = Coverage(config_file=True)
cov.start()


def get_data_dict(d):
    """Return a dict like d, but with keys modified by `abs_file` and
    remove the copied elements from d.
    """
    res = {}
    keys = list(d.keys())
    for k in keys:
        a = {}
        lines = list(d[k].keys())
        for l in lines:
            v = d[k].pop(l)
            a[l] = v
        res[abs_file(k)] = a
    return res


class CoverageLoggerThread(threading.Thread):
    _kill_now = False
    _delay = 2

    def __init__(self, main=True):
        self.main = main
        self._data = CoverageData()
        self._fname = cov.config.data_file
        self._suffix = None
        self._data_files = CoverageDataFiles(basename=self._fname,
                                             warn=cov._warn)
        self._pid = os.getpid()
        super(CoverageLoggerThread, self).__init__()

    def shutdown(self):
        self._kill_now = True

    def combine(self):
        aliases = None
        if cov.config.paths:
            from coverage.aliases import PathAliases
            aliases = PathAliases()
            for paths in self.config.paths.values():
                result = paths[0]
                for pattern in paths[1:]:
                    aliases.add(pattern, result)

        self._data_files.combine_parallel_data(self._data, aliases=aliases)

    def export(self, new=True):
        cov_report = cov
        if new:
            cov_report = Coverage(config_file=True)
            cov_report.load()
        self.combine()
        self._data_files.write(self._data)
        cov_report.data.update(self._data)
        cov_report.html_report(directory="coverage_report_data.html")
        cov_report.report(show_missing=True)

    def _collect_and_export(self):
        new_data = get_data_dict(cov.collector.data)
        if cov.collector.branch:
            self._data.add_arcs(new_data)
        else:
            self._data.add_lines(new_data)
        self._data.add_file_tracers(get_data_dict(cov.collector.file_tracers))
        self._data_files.write(self._data, self._suffix)

        if self.main:
            self.export()

    def run(self):
        while True:
            sleep(CoverageLoggerThread._delay)
            if self._kill_now:
                break

            self._collect_and_export()

        cov.stop()

        if not self.main:
            self._collect_and_export()
            return

        self.export(new=False)
        print("End of the program. I was killed gracefully :)")

可以在此GIST中找到更稳定的版本。 这段代码基本上是在不停止收集器的情况下获取收集器收集的信息。 get_data_dict 函数获取 Coverage.collector 中的字典并弹出可用数据。这应该足够安全,所以您不会丢失任何测量值。
报告文件每 _delay 秒更新一次。

但是如果你有多个进程运行ning,你需要付出额外的努力来确保所有进程运行都CoverageLoggerThread。这是 patch_multiprocessing 函数,猴子从 coverage 猴子补丁中修补...
代码在 GIST 中。它基本上用自定义进程替换了原始进程,它在 运行 宁 run 方法之前启动 CoverageLoggerThread 并在进程结束时加入线程。 脚本 main.py 允许使用线程和进程启动不同的测试。

此代码有 2/3 的缺点需要您注意:

  • 同时使用 combine 函数是个坏主意,因为它对 .coverage.* 文件执行并发 read/write/delete 访问。这意味着函数 export 不是超级安全的。它应该没问题,因为数据被复制了多次,但我会在生产中使用它之前做一些测试。

  • 数据导出后,将保留在内存中。因此,如果代码库很大,它可能会占用一些资源。可以转储所有数据并重新加载它,但我假设如果你想每 2 秒记录一次,你不想每次都重新加载所有数据。如果您延迟几分钟,我会每次都创建一个新的 _data,使用 CoverageData.read_file 重新加载此过程之前的覆盖状态。

  • 自定义流程将等待 _delay 完成,因为我们在流程末尾加入 CoverageThreadLogger,所以如果您有很多快速流程,您想要增加睡眠的粒度,以便能够更快地检测到进程的结束。它只需要一个在 _kill_now.

  • 中断的自定义睡眠循环

让我知道这是否对您有所帮助,或者是否有可能改进这个要点。


编辑: 看来您不需要猴子修补多处理模块来自动启动记录器。在 python 安装中使用 .pth,您可以使用环境变量在新进程上自动启动记录器:

# Content of coverage.pth in your site-package folder
import os
if "COVERAGE_LOGGER_START" in os.environ:
    import atexit
    from coverage_logger import CoverageLoggerThread
    thread_cov = CoverageLoggerThread(main=False)
    thread_cov.start()
    def close_cov()
        thread_cov.shutdown()
        thread_cov.join()
    atexit.register(close_cov)

然后您可以使用 COVERAGE_LOGGER_START=1 python main.y

启动覆盖率记录器

您可以直接使用pyrasite,有以下两个程序。

# start.py
import sys
import coverage

sys.cov = cov = coverage.coverage()
cov.start()

还有这个

# stop.py
import sys

sys.cov.stop()
sys.cov.save()
sys.cov.html_report()

另一种方法是使用 lptrace 跟踪程序,即使它只打印调用也是有用的。