Динамическое планирование с Celery Beat

Я создаю утилиту для тестирования, которая периодически запускает несколько тестов, некоторые из которых запланированы на ежедневную работу, некоторые ежечасно, некоторые еженедельно и т. Д. Я пытаюсь создать эту функцию на Celery Beat с брокером RabbitMQ. Однако список тестов, запускаемых в каждый такой период, может изменяться (тесты можно добавлять, редактировать или удалять). Мне нужно запланировать тесты в этом динамическом списке тестов.

Это то, что я до сих пор:

def test_list():
    tests = ['t1','t2','t4','t5','t7']
    return tests

app.conf.update(
    CELERYBEAT_SCHEDULE={
        'schedule_task':{
            'task':'test_celery.tasks.test_scheduler',
            'schedule': timedelta(seconds=10),
            # 'args': (test_list(),)
        }
    }
)

@app.task
def test_scheduler():
    tests = test_list()
    for test in tests:
        print "RUNNING TEST {}".format(test)

Я попытался передать список тестов для запуска - (a) в качестве параметра в 'args' конфигурации CELERYBEAT_SCHEDULE, (b) в качестве вызова функции, возвращающей список тестов параметру args конфигурации CELERYBEAT_SCHEDULE, и (c) путем вызова функции test_list() в функции test_scheduler(). Кажется, что ни один из этих трех методов не может обработать список, который можно динамически обновлять (например, если я добавлю еще один тест - t8 - в список тестов, в идеале, при следующем запланированном запуске функции test_schedule(), Я хотел бы, чтобы "RUNNING TEST t8" печатался на консоли, без перезапуска Celery Beat.)

Обратите внимание, что планировщик не может быть остановлен / перезапущен, он должен работать все время.

Любое руководство приветствуется, спасибо заранее!

0 ответов

Другие вопросы по тегам