Как связать задачу Celery, которая возвращает список в группу?
Я хочу создать группу из списка, возвращаемого задачей Celery, чтобы для каждого элемента в наборе результатов задачи в группу была добавлена одна задача.
Вот простой пример кода для объяснения варианта использования. ???
должен быть результатом предыдущего задания.
@celery.task
def get_list(amount):
# In reality, fetch a list of items from a db
return [i for i in range(amount)]
@celery.task
def process_item(item):
#do stuff
pass
process_list = (get_list.s(10) | group(process_item.s(i) for i in ???))
Я, вероятно, не подхожу к этому правильно, но я почти уверен, что вызывать задачи изнутри не безопасно:
@celery.task
def process_list():
for i in get_list.delay().get():
process_item.delay(i)
Мне не нужен результат из задачи секунд.
1 ответ
Решение
Вы можете получить такое поведение, используя промежуточную задачу. Вот демонстрация создания метода, подобного "карте", который работает так, как вы предлагали.
from celery import task, subtask, group
@task
def get_list(amount):
return [i for i in range(amount)]
@task
def process_item(item):
# do stuff
pass
@task
def dmap(it, callback):
# Map a callback over an iterator and return as a group
callback = subtask(callback)
return group(callback.clone([arg,]) for arg in it)()
# runs process_item for each item in the return of get_list
process_list = (get_list.s(10) | dmap.s(process_item.s()))
Благодарю Спросить Солема за то, что он дал мне это предложение, когда я попросил его помочь по аналогичной проблеме.