Покрытие Python на бесконечный процесс
У меня есть многопроцессорный веб-сервер с процессами, которые никогда не заканчиваются, я хотел бы проверить покрытие кода для всего проекта в реальной среде (не только из тестов).
Проблема в том, что поскольку процессы никогда не заканчиваются, у меня нет подходящего места для установки cov.start() cov.stop() cov.save()
крючки.
Поэтому я подумал о создании потока, который в бесконечном цикле будет сохранять и объединять данные покрытия, а затем некоторое время спать, однако такой подход не работает, отчет о покрытии кажется пустым, за исключением строки ожидания.
Я был бы рад получить любые идеи о том, как получить покрытие моего кода, или любой совет о том, почему моя идея не работает. Вот фрагмент моего кода:
import coverage
cov = coverage.Coverage()
import time
import threading
import os
class CoverageThread(threading.Thread):
_kill_now = False
_sleep_time = 2
@classmethod
def exit_gracefully(cls):
cls._kill_now = True
def sleep_some_time(self):
time.sleep(CoverageThread._sleep_time)
def run(self):
while True:
cov.start()
self.sleep_some_time()
cov.stop()
if os.path.exists('.coverage'):
cov.combine()
cov.save()
if self._kill_now:
break
cov.stop()
if os.path.exists('.coverage'):
cov.combine()
cov.save()
cov.html_report(directory="coverage_report_data.html")
print "End of the program. I was killed gracefully :)"
3 ответа
По-видимому, это не возможно контролировать coverage
очень хорошо с несколькими Threads
, После запуска другого потока остановка Coverage
объект остановит все освещение и start
только перезапустит его в "стартовой" ветке. Таким образом, ваш код в основном останавливает покрытие через 2 секунды для всех Thread
кроме CoverageThread
,
Я немного поиграл с API, и можно получить доступ к измерениям, не останавливая Coverage
объект. Таким образом, вы можете запустить поток, который периодически сохраняет данные покрытия, используя API. Первая реализация будет что-то вроде этого
import threading
from time import sleep
from coverage import Coverage
from coverage.data import CoverageData, CoverageDataFiles
from coverage.files import abs_file
cov = Coverage(config_file=True)
cov.start()
def get_data_dict(d):
"""Return a dict like d, but with keys modified by `abs_file` and
remove the copied elements from d.
"""
res = {}
keys = list(d.keys())
for k in keys:
a = {}
lines = list(d[k].keys())
for l in lines:
v = d[k].pop(l)
a[l] = v
res[abs_file(k)] = a
return res
class CoverageLoggerThread(threading.Thread):
_kill_now = False
_delay = 2
def __init__(self, main=True):
self.main = main
self._data = CoverageData()
self._fname = cov.config.data_file
self._suffix = None
self._data_files = CoverageDataFiles(basename=self._fname,
warn=cov._warn)
self._pid = os.getpid()
super(CoverageLoggerThread, self).__init__()
def shutdown(self):
self._kill_now = True
def combine(self):
aliases = None
if cov.config.paths:
from coverage.aliases import PathAliases
aliases = PathAliases()
for paths in self.config.paths.values():
result = paths[0]
for pattern in paths[1:]:
aliases.add(pattern, result)
self._data_files.combine_parallel_data(self._data, aliases=aliases)
def export(self, new=True):
cov_report = cov
if new:
cov_report = Coverage(config_file=True)
cov_report.load()
self.combine()
self._data_files.write(self._data)
cov_report.data.update(self._data)
cov_report.html_report(directory="coverage_report_data.html")
cov_report.report(show_missing=True)
def _collect_and_export(self):
new_data = get_data_dict(cov.collector.data)
if cov.collector.branch:
self._data.add_arcs(new_data)
else:
self._data.add_lines(new_data)
self._data.add_file_tracers(get_data_dict(cov.collector.file_tracers))
self._data_files.write(self._data, self._suffix)
if self.main:
self.export()
def run(self):
while True:
sleep(CoverageLoggerThread._delay)
if self._kill_now:
break
self._collect_and_export()
cov.stop()
if not self.main:
self._collect_and_export()
return
self.export(new=False)
print("End of the program. I was killed gracefully :)")
Более стабильная версия может быть найдена в этом GIST. Этот код в основном захватывает информацию, собранную сборщиком, не останавливая ее. get_data_dict
Функция взять словарь в Coverage.collector
и вытолкнуть доступные данные. Это должно быть достаточно безопасно, чтобы вы не потеряли измерения.
Файлы отчета обновляются каждые _delay
секунд.
Но если у вас запущено несколько процессов, вам нужно приложить дополнительные усилия, чтобы убедиться, что все процессы запускают CoverageLoggerThread
, Это patch_multiprocessing
функция, обезьяна исправлена из coverage
патч обезьяны...
Код находится в GIST. Он в основном заменяет оригинальный процесс пользовательским процессом, который запускает CoverageLoggerThread
незадолго до запуска run
метод и присоединиться к потоку в конце процесса. Сценарий main.py
позволяет запускать различные тесты с потоками и процессами.
У этого кода есть 2/3 недостатков, о которых нужно позаботиться:
Это плохая идея использовать
combine
функция одновременно, поскольку он выполняет одновременный доступ на чтение / запись / удаление к.coverage.*
файлы. Это означает, что функцияexport
не супер безопасно. Все должно быть в порядке, поскольку данные реплицируются несколько раз, но я бы провел некоторое тестирование, прежде чем использовать его в производстве.Как только данные были экспортированы, они остаются в памяти. Так что, если база кода огромна, она может съесть некоторые ресурсы. Можно выгрузить все данные и перезагрузить их, но я предположил, что если вы хотите регистрировать каждые 2 секунды, вам не нужно каждый раз перезагружать все данные. Если вы идете с задержкой в минутах, я бы создал новый
_data
каждый раз, используяCoverageData.read_file
перезагрузить предыдущее состояние покрытия для этого процесса.Пользовательский процесс будет ждать
_delay
до окончания, как мы присоединяемся кCoverageThreadLogger
в конце процесса, поэтому, если у вас много быстрых процессов, вы хотите увеличить степень детализации сна, чтобы иметь возможность быстрее определять конец процесса. Это просто нужно пользовательский цикл сна, который прерывается на_kill_now
,
Дайте мне знать, если это поможет вам каким-то образом или возможно ли улучшить эту суть.
РЕДАКТИРОВАТЬ: Кажется, вам не нужно монтировать исправления многопроцессорного модуля, чтобы автоматически запустить регистратор. С использованием .pth
в вашей установке Python вы можете использовать переменную окружения для автоматического запуска вашего регистратора на новых процессах:
# Content of coverage.pth in your site-package folder
import os
if "COVERAGE_LOGGER_START" in os.environ:
import atexit
from coverage_logger import CoverageLoggerThread
thread_cov = CoverageLoggerThread(main=False)
thread_cov.start()
def close_cov()
thread_cov.shutdown()
thread_cov.join()
atexit.register(close_cov)
Затем вы можете начать регистратор покрытия с COVERAGE_LOGGER_START=1 python main.y
Поскольку вы готовы запускать свой код по-другому для теста, почему бы не добавить способ завершить процесс теста? Кажется, это будет проще, чем пытаться взломать освещение.
Вы можете использовать пиразит напрямую со следующими двумя программами.
# start.py
import sys
import coverage
sys.cov = cov = coverage.coverage()
cov.start()
И этот
# stop.py
import sys
sys.cov.stop()
sys.cov.save()
sys.cov.html_report()
Другим способом будет отслеживание программы с использованием lptrace, даже если она печатает только вызовы, это может быть полезно.