python 运行 报道永无止境的过程
python running coverage on never ending process
我有一个多处理的 Web 服务器,其进程永无止境,我想在实时环境中检查整个项目的代码覆盖率(不仅来自测试)。
问题是,由于进程永远不会结束,我没有设置 cov.start() cov.stop() cov.save()
挂钩的好地方。
因此,我考虑生成一个线程,在无限循环中保存和合并覆盖率数据,然后休眠一段时间,但是这种方法不起作用,覆盖率报告似乎是空的,除了睡眠线。
我很乐意收到关于如何获得我的代码覆盖率的任何想法,
或者关于为什么我的想法行不通的任何建议。这是我的代码片段:
import coverage
cov = coverage.Coverage()
import time
import threading
import os
class CoverageThread(threading.Thread):
_kill_now = False
_sleep_time = 2
@classmethod
def exit_gracefully(cls):
cls._kill_now = True
def sleep_some_time(self):
time.sleep(CoverageThread._sleep_time)
def run(self):
while True:
cov.start()
self.sleep_some_time()
cov.stop()
if os.path.exists('.coverage'):
cov.combine()
cov.save()
if self._kill_now:
break
cov.stop()
if os.path.exists('.coverage'):
cov.combine()
cov.save()
cov.html_report(directory="coverage_report_data.html")
print "End of the program. I was killed gracefully :)"
既然您愿意 运行 您的代码以不同的方式进行测试,为什么不添加一种方法来结束测试过程呢?这似乎比试图破解覆盖范围更简单。
显然,多个 Threads
无法很好地控制 coverage
。
一旦启动了不同的线程,停止 Coverage
对象将停止所有覆盖,而 start
只会在 "starting" 线程中重新启动它。
因此,除了 CoverageThread
.
之外,您的代码基本上会在 2 秒后停止对所有 Thread
的覆盖
我玩了一下 API,可以在不停止 Coverage
对象的情况下访问测量值。
因此,您可以使用 API 启动一个定期保存覆盖率数据的线程。
第一个实现就像这样
import threading
from time import sleep
from coverage import Coverage
from coverage.data import CoverageData, CoverageDataFiles
from coverage.files import abs_file
cov = Coverage(config_file=True)
cov.start()
def get_data_dict(d):
"""Return a dict like d, but with keys modified by `abs_file` and
remove the copied elements from d.
"""
res = {}
keys = list(d.keys())
for k in keys:
a = {}
lines = list(d[k].keys())
for l in lines:
v = d[k].pop(l)
a[l] = v
res[abs_file(k)] = a
return res
class CoverageLoggerThread(threading.Thread):
_kill_now = False
_delay = 2
def __init__(self, main=True):
self.main = main
self._data = CoverageData()
self._fname = cov.config.data_file
self._suffix = None
self._data_files = CoverageDataFiles(basename=self._fname,
warn=cov._warn)
self._pid = os.getpid()
super(CoverageLoggerThread, self).__init__()
def shutdown(self):
self._kill_now = True
def combine(self):
aliases = None
if cov.config.paths:
from coverage.aliases import PathAliases
aliases = PathAliases()
for paths in self.config.paths.values():
result = paths[0]
for pattern in paths[1:]:
aliases.add(pattern, result)
self._data_files.combine_parallel_data(self._data, aliases=aliases)
def export(self, new=True):
cov_report = cov
if new:
cov_report = Coverage(config_file=True)
cov_report.load()
self.combine()
self._data_files.write(self._data)
cov_report.data.update(self._data)
cov_report.html_report(directory="coverage_report_data.html")
cov_report.report(show_missing=True)
def _collect_and_export(self):
new_data = get_data_dict(cov.collector.data)
if cov.collector.branch:
self._data.add_arcs(new_data)
else:
self._data.add_lines(new_data)
self._data.add_file_tracers(get_data_dict(cov.collector.file_tracers))
self._data_files.write(self._data, self._suffix)
if self.main:
self.export()
def run(self):
while True:
sleep(CoverageLoggerThread._delay)
if self._kill_now:
break
self._collect_and_export()
cov.stop()
if not self.main:
self._collect_and_export()
return
self.export(new=False)
print("End of the program. I was killed gracefully :)")
可以在此GIST中找到更稳定的版本。
这段代码基本上是在不停止收集器的情况下获取收集器收集的信息。
get_data_dict
函数获取 Coverage.collector
中的字典并弹出可用数据。这应该足够安全,所以您不会丢失任何测量值。
报告文件每 _delay
秒更新一次。
但是如果你有多个进程运行ning,你需要付出额外的努力来确保所有进程运行都CoverageLoggerThread
。这是 patch_multiprocessing
函数,猴子从 coverage
猴子补丁中修补...
代码在 GIST 中。它基本上用自定义进程替换了原始进程,它在 运行 宁 run
方法之前启动 CoverageLoggerThread
并在进程结束时加入线程。
脚本 main.py
允许使用线程和进程启动不同的测试。
此代码有 2/3 的缺点需要您注意:
同时使用 combine
函数是个坏主意,因为它对 .coverage.*
文件执行并发 read/write/delete 访问。这意味着函数 export
不是超级安全的。它应该没问题,因为数据被复制了多次,但我会在生产中使用它之前做一些测试。
数据导出后,将保留在内存中。因此,如果代码库很大,它可能会占用一些资源。可以转储所有数据并重新加载它,但我假设如果你想每 2 秒记录一次,你不想每次都重新加载所有数据。如果您延迟几分钟,我会每次都创建一个新的 _data
,使用 CoverageData.read_file
重新加载此过程之前的覆盖状态。
自定义流程将等待 _delay
完成,因为我们在流程末尾加入 CoverageThreadLogger
,所以如果您有很多快速流程,您想要增加睡眠的粒度,以便能够更快地检测到进程的结束。它只需要一个在 _kill_now
.
中断的自定义睡眠循环
让我知道这是否对您有所帮助,或者是否有可能改进这个要点。
编辑:
看来您不需要猴子修补多处理模块来自动启动记录器。在 python 安装中使用 .pth
,您可以使用环境变量在新进程上自动启动记录器:
# Content of coverage.pth in your site-package folder
import os
if "COVERAGE_LOGGER_START" in os.environ:
import atexit
from coverage_logger import CoverageLoggerThread
thread_cov = CoverageLoggerThread(main=False)
thread_cov.start()
def close_cov()
thread_cov.shutdown()
thread_cov.join()
atexit.register(close_cov)
然后您可以使用 COVERAGE_LOGGER_START=1 python main.y
启动覆盖率记录器
您可以直接使用pyrasite,有以下两个程序。
# start.py
import sys
import coverage
sys.cov = cov = coverage.coverage()
cov.start()
还有这个
# stop.py
import sys
sys.cov.stop()
sys.cov.save()
sys.cov.html_report()
另一种方法是使用 lptrace 跟踪程序,即使它只打印调用也是有用的。
我有一个多处理的 Web 服务器,其进程永无止境,我想在实时环境中检查整个项目的代码覆盖率(不仅来自测试)。
问题是,由于进程永远不会结束,我没有设置 cov.start() cov.stop() cov.save()
挂钩的好地方。
因此,我考虑生成一个线程,在无限循环中保存和合并覆盖率数据,然后休眠一段时间,但是这种方法不起作用,覆盖率报告似乎是空的,除了睡眠线。
我很乐意收到关于如何获得我的代码覆盖率的任何想法, 或者关于为什么我的想法行不通的任何建议。这是我的代码片段:
import coverage
cov = coverage.Coverage()
import time
import threading
import os
class CoverageThread(threading.Thread):
_kill_now = False
_sleep_time = 2
@classmethod
def exit_gracefully(cls):
cls._kill_now = True
def sleep_some_time(self):
time.sleep(CoverageThread._sleep_time)
def run(self):
while True:
cov.start()
self.sleep_some_time()
cov.stop()
if os.path.exists('.coverage'):
cov.combine()
cov.save()
if self._kill_now:
break
cov.stop()
if os.path.exists('.coverage'):
cov.combine()
cov.save()
cov.html_report(directory="coverage_report_data.html")
print "End of the program. I was killed gracefully :)"
既然您愿意 运行 您的代码以不同的方式进行测试,为什么不添加一种方法来结束测试过程呢?这似乎比试图破解覆盖范围更简单。
显然,多个 Threads
无法很好地控制 coverage
。
一旦启动了不同的线程,停止 Coverage
对象将停止所有覆盖,而 start
只会在 "starting" 线程中重新启动它。
因此,除了 CoverageThread
.
Thread
的覆盖
我玩了一下 API,可以在不停止 Coverage
对象的情况下访问测量值。
因此,您可以使用 API 启动一个定期保存覆盖率数据的线程。
第一个实现就像这样
import threading
from time import sleep
from coverage import Coverage
from coverage.data import CoverageData, CoverageDataFiles
from coverage.files import abs_file
cov = Coverage(config_file=True)
cov.start()
def get_data_dict(d):
"""Return a dict like d, but with keys modified by `abs_file` and
remove the copied elements from d.
"""
res = {}
keys = list(d.keys())
for k in keys:
a = {}
lines = list(d[k].keys())
for l in lines:
v = d[k].pop(l)
a[l] = v
res[abs_file(k)] = a
return res
class CoverageLoggerThread(threading.Thread):
_kill_now = False
_delay = 2
def __init__(self, main=True):
self.main = main
self._data = CoverageData()
self._fname = cov.config.data_file
self._suffix = None
self._data_files = CoverageDataFiles(basename=self._fname,
warn=cov._warn)
self._pid = os.getpid()
super(CoverageLoggerThread, self).__init__()
def shutdown(self):
self._kill_now = True
def combine(self):
aliases = None
if cov.config.paths:
from coverage.aliases import PathAliases
aliases = PathAliases()
for paths in self.config.paths.values():
result = paths[0]
for pattern in paths[1:]:
aliases.add(pattern, result)
self._data_files.combine_parallel_data(self._data, aliases=aliases)
def export(self, new=True):
cov_report = cov
if new:
cov_report = Coverage(config_file=True)
cov_report.load()
self.combine()
self._data_files.write(self._data)
cov_report.data.update(self._data)
cov_report.html_report(directory="coverage_report_data.html")
cov_report.report(show_missing=True)
def _collect_and_export(self):
new_data = get_data_dict(cov.collector.data)
if cov.collector.branch:
self._data.add_arcs(new_data)
else:
self._data.add_lines(new_data)
self._data.add_file_tracers(get_data_dict(cov.collector.file_tracers))
self._data_files.write(self._data, self._suffix)
if self.main:
self.export()
def run(self):
while True:
sleep(CoverageLoggerThread._delay)
if self._kill_now:
break
self._collect_and_export()
cov.stop()
if not self.main:
self._collect_and_export()
return
self.export(new=False)
print("End of the program. I was killed gracefully :)")
可以在此GIST中找到更稳定的版本。
这段代码基本上是在不停止收集器的情况下获取收集器收集的信息。
get_data_dict
函数获取 Coverage.collector
中的字典并弹出可用数据。这应该足够安全,所以您不会丢失任何测量值。
报告文件每 _delay
秒更新一次。
但是如果你有多个进程运行ning,你需要付出额外的努力来确保所有进程运行都CoverageLoggerThread
。这是 patch_multiprocessing
函数,猴子从 coverage
猴子补丁中修补...
代码在 GIST 中。它基本上用自定义进程替换了原始进程,它在 运行 宁 run
方法之前启动 CoverageLoggerThread
并在进程结束时加入线程。
脚本 main.py
允许使用线程和进程启动不同的测试。
此代码有 2/3 的缺点需要您注意:
同时使用
combine
函数是个坏主意,因为它对.coverage.*
文件执行并发 read/write/delete 访问。这意味着函数export
不是超级安全的。它应该没问题,因为数据被复制了多次,但我会在生产中使用它之前做一些测试。数据导出后,将保留在内存中。因此,如果代码库很大,它可能会占用一些资源。可以转储所有数据并重新加载它,但我假设如果你想每 2 秒记录一次,你不想每次都重新加载所有数据。如果您延迟几分钟,我会每次都创建一个新的
_data
,使用CoverageData.read_file
重新加载此过程之前的覆盖状态。自定义流程将等待
_delay
完成,因为我们在流程末尾加入CoverageThreadLogger
,所以如果您有很多快速流程,您想要增加睡眠的粒度,以便能够更快地检测到进程的结束。它只需要一个在_kill_now
. 中断的自定义睡眠循环
让我知道这是否对您有所帮助,或者是否有可能改进这个要点。
编辑:
看来您不需要猴子修补多处理模块来自动启动记录器。在 python 安装中使用 .pth
,您可以使用环境变量在新进程上自动启动记录器:
# Content of coverage.pth in your site-package folder
import os
if "COVERAGE_LOGGER_START" in os.environ:
import atexit
from coverage_logger import CoverageLoggerThread
thread_cov = CoverageLoggerThread(main=False)
thread_cov.start()
def close_cov()
thread_cov.shutdown()
thread_cov.join()
atexit.register(close_cov)
然后您可以使用 COVERAGE_LOGGER_START=1 python main.y
您可以直接使用pyrasite,有以下两个程序。
# start.py
import sys
import coverage
sys.cov = cov = coverage.coverage()
cov.start()
还有这个
# stop.py
import sys
sys.cov.stop()
sys.cov.save()
sys.cov.html_report()
另一种方法是使用 lptrace 跟踪程序,即使它只打印调用也是有用的。