Сельдерей - Индивидуальный набор результатов каждой группы в цепочке

У меня есть периодическое задание, которое дает мне вновь добавленные пользователи говорят каждые 100 секунд. Я хочу выполнить две разные задачи в списке пользователей, полностью независимых друг от друга. Я настроил две очереди, используя celery_routes. Когда обе задачи будут выполнены, я хочу, чтобы одна задача обновляла временную метку у пользователей, чтобы я не забрал их снова. Я заблокировал задачу, чтобы избежать дублирования пользователей в каждом запросе БД для новых пользователей. Это моя установка на высоком уровне.

@periodic_task(run_every=100)
def monitor_status():
    if __acquire_lock(): # to avoid duplicate users
        users = get_users() # DB query which returns me newly added users
        result = (group([task_A.s(user_id) for user_id in users]) |
                  group([task_B.s(user_id) for user_id in users]) |
                  set_last_updated.s(users) # update the timestamp and release lock .. it gives me new set of users everytime
                  )()

@task
def task_A(user_id): # have similair setup for task_B()
    try:
        # DO something
        return user_id
    except Exception as e:
        return -1

Эта установка работает хорошо. Проблема заключается в группировке результатов обоих наборов задач (групп). Скажем, БД возвращает [101,102] как список пользователей. Эта настройка возвращает мне список user_ids, например [101, 102, 101, 102] (потому что каждый набор задач возвращает идентификатор пользователя). Как я могу сгруппировать результат как [[101,102],[101,102]] ... [[group1_results],[group2_results]].

Я старался,

group1=group([task_A.s(user_id) for user_id in users]).apply_async()
group2=group([task_B.s(user_id) for user_id in users]).apply_async()

это позволяет мне получать результаты, используя get(), и я также могу поддерживать порядок, используя join(), но проблема в том, что я не знаю, как вызвать set_last_updated(), когда оба набора задач (группы) завершены. Я могу изменить тип возврата каждой задачи (на dict{} возможно) и перебрать большой список, чтобы определить статус задачи, но я думаю, что должен быть лучший способ. Я использую Redis в качестве брокера.

0 ответов

Другие вопросы по тегам