Покрытие Python на бесконечный процесс

У меня есть многопроцессорный веб-сервер с процессами, которые никогда не заканчиваются, я хотел бы проверить покрытие кода для всего проекта в реальной среде (не только из тестов).

Проблема в том, что поскольку процессы никогда не заканчиваются, у меня нет подходящего места для установки cov.start() cov.stop() cov.save() крючки.

Поэтому я подумал о создании потока, который в бесконечном цикле будет сохранять и объединять данные покрытия, а затем некоторое время спать, однако такой подход не работает, отчет о покрытии кажется пустым, за исключением строки ожидания.

Я был бы рад получить любые идеи о том, как получить покрытие моего кода, или любой совет о том, почему моя идея не работает. Вот фрагмент моего кода:

import coverage
cov = coverage.Coverage()
import time
import threading
import os

class CoverageThread(threading.Thread):
    _kill_now = False
    _sleep_time = 2

@classmethod
def exit_gracefully(cls):
    cls._kill_now = True

def sleep_some_time(self):
    time.sleep(CoverageThread._sleep_time)

def run(self):
    while True:
        cov.start()
        self.sleep_some_time()
        cov.stop()
        if os.path.exists('.coverage'):
            cov.combine()
        cov.save()
        if self._kill_now:
            break
    cov.stop()
    if os.path.exists('.coverage'):
        cov.combine()
    cov.save()
    cov.html_report(directory="coverage_report_data.html")
    print "End of the program. I was killed gracefully :)"

3 ответа

Решение

По-видимому, это не возможно контролировать coverage очень хорошо с несколькими Threads, После запуска другого потока остановка Coverage объект остановит все освещение и start только перезапустит его в "стартовой" ветке. Таким образом, ваш код в основном останавливает покрытие через 2 секунды для всех Thread кроме CoverageThread,

Я немного поиграл с API, и можно получить доступ к измерениям, не останавливая Coverage объект. Таким образом, вы можете запустить поток, который периодически сохраняет данные покрытия, используя API. Первая реализация будет что-то вроде этого

import threading
from time import sleep
from coverage import Coverage
from coverage.data import CoverageData, CoverageDataFiles
from coverage.files import abs_file

cov = Coverage(config_file=True)
cov.start()


def get_data_dict(d):
    """Return a dict like d, but with keys modified by `abs_file` and
    remove the copied elements from d.
    """
    res = {}
    keys = list(d.keys())
    for k in keys:
        a = {}
        lines = list(d[k].keys())
        for l in lines:
            v = d[k].pop(l)
            a[l] = v
        res[abs_file(k)] = a
    return res


class CoverageLoggerThread(threading.Thread):
    _kill_now = False
    _delay = 2

    def __init__(self, main=True):
        self.main = main
        self._data = CoverageData()
        self._fname = cov.config.data_file
        self._suffix = None
        self._data_files = CoverageDataFiles(basename=self._fname,
                                             warn=cov._warn)
        self._pid = os.getpid()
        super(CoverageLoggerThread, self).__init__()

    def shutdown(self):
        self._kill_now = True

    def combine(self):
        aliases = None
        if cov.config.paths:
            from coverage.aliases import PathAliases
            aliases = PathAliases()
            for paths in self.config.paths.values():
                result = paths[0]
                for pattern in paths[1:]:
                    aliases.add(pattern, result)

        self._data_files.combine_parallel_data(self._data, aliases=aliases)

    def export(self, new=True):
        cov_report = cov
        if new:
            cov_report = Coverage(config_file=True)
            cov_report.load()
        self.combine()
        self._data_files.write(self._data)
        cov_report.data.update(self._data)
        cov_report.html_report(directory="coverage_report_data.html")
        cov_report.report(show_missing=True)

    def _collect_and_export(self):
        new_data = get_data_dict(cov.collector.data)
        if cov.collector.branch:
            self._data.add_arcs(new_data)
        else:
            self._data.add_lines(new_data)
        self._data.add_file_tracers(get_data_dict(cov.collector.file_tracers))
        self._data_files.write(self._data, self._suffix)

        if self.main:
            self.export()

    def run(self):
        while True:
            sleep(CoverageLoggerThread._delay)
            if self._kill_now:
                break

            self._collect_and_export()

        cov.stop()

        if not self.main:
            self._collect_and_export()
            return

        self.export(new=False)
        print("End of the program. I was killed gracefully :)")

Более стабильная версия может быть найдена в этом GIST. Этот код в основном захватывает информацию, собранную сборщиком, не останавливая ее. get_data_dict Функция взять словарь в Coverage.collector и вытолкнуть доступные данные. Это должно быть достаточно безопасно, чтобы вы не потеряли измерения.
Файлы отчета обновляются каждые _delay секунд.

Но если у вас запущено несколько процессов, вам нужно приложить дополнительные усилия, чтобы убедиться, что все процессы запускают CoverageLoggerThread, Это patch_multiprocessing функция, обезьяна исправлена ​​из coverage патч обезьяны...
Код находится в GIST. Он в основном заменяет оригинальный процесс пользовательским процессом, который запускает CoverageLoggerThread незадолго до запуска run метод и присоединиться к потоку в конце процесса. Сценарий main.py позволяет запускать различные тесты с потоками и процессами.

У этого кода есть 2/3 недостатков, о которых нужно позаботиться:

  • Это плохая идея использовать combine функция одновременно, поскольку он выполняет одновременный доступ на чтение / запись / удаление к .coverage.* файлы. Это означает, что функция export не супер безопасно. Все должно быть в порядке, поскольку данные реплицируются несколько раз, но я бы провел некоторое тестирование, прежде чем использовать его в производстве.

  • Как только данные были экспортированы, они остаются в памяти. Так что, если база кода огромна, она может съесть некоторые ресурсы. Можно выгрузить все данные и перезагрузить их, но я предположил, что если вы хотите регистрировать каждые 2 секунды, вам не нужно каждый раз перезагружать все данные. Если вы идете с задержкой в ​​минутах, я бы создал новый _data каждый раз, используя CoverageData.read_file перезагрузить предыдущее состояние покрытия для этого процесса.

  • Пользовательский процесс будет ждать _delay до окончания, как мы присоединяемся к CoverageThreadLogger в конце процесса, поэтому, если у вас много быстрых процессов, вы хотите увеличить степень детализации сна, чтобы иметь возможность быстрее определять конец процесса. Это просто нужно пользовательский цикл сна, который прерывается на _kill_now,

Дайте мне знать, если это поможет вам каким-то образом или возможно ли улучшить эту суть.


РЕДАКТИРОВАТЬ: Кажется, вам не нужно монтировать исправления многопроцессорного модуля, чтобы автоматически запустить регистратор. С использованием .pth в вашей установке Python вы можете использовать переменную окружения для автоматического запуска вашего регистратора на новых процессах:

# Content of coverage.pth in your site-package folder
import os
if "COVERAGE_LOGGER_START" in os.environ:
    import atexit
    from coverage_logger import CoverageLoggerThread
    thread_cov = CoverageLoggerThread(main=False)
    thread_cov.start()
    def close_cov()
        thread_cov.shutdown()
        thread_cov.join()
    atexit.register(close_cov)

Затем вы можете начать регистратор покрытия с COVERAGE_LOGGER_START=1 python main.y

Поскольку вы готовы запускать свой код по-другому для теста, почему бы не добавить способ завершить процесс теста? Кажется, это будет проще, чем пытаться взломать освещение.

Вы можете использовать пиразит напрямую со следующими двумя программами.

# start.py
import sys
import coverage

sys.cov = cov = coverage.coverage()
cov.start()

И этот

# stop.py
import sys

sys.cov.stop()
sys.cov.save()
sys.cov.html_report()

Другим способом будет отслеживание программы с использованием lptrace, даже если она печатает только вызовы, это может быть полезно.

Другие вопросы по тегам