Цепочка Задач Сельдерея и Доступ **kwargs
У меня есть ситуация, аналогичная описанной здесь, за исключением того, что вместо цепочки задач с несколькими аргументами я хочу объединить задачи, которые возвращают словарь с несколькими записями.
Это - очень свободно и абстрактно - то, что я пытаюсь сделать:
tasks.py
@task()
def task1(item1=None, item2=None):
item3 = #do some stuff with item1 and item2 to yield item3
return_object = dict(item1=item1, item2=item2, item3=item3)
return return_object
def task2(item1=None, item2=None, item3=None):
item4 = #do something with item1, item2, item3 to yield item4
return_object = dict(item1=item1, item2=item2, item3=item3, item4=item4)
return return_object
Работая с ipython, я могу вызывать task1 индивидуально и асинхронно, без проблем.
Я также могу вызывать task2 индивидуально с результатом, возвращаемым task1 в качестве аргумента двойной звезды:
>>res1 = task1.s(item1=something, item2=something_else).apply_async()
>>res1.status
'SUCCESS'
>>res2 = task2.s(**res1.result).apply_async()
>>res2.status
'SUCCESS
Однако в конечном итоге я хочу добиться того же конечного результата, что и выше, но через цепочку, и здесь я не могу понять, как создать экземпляр task2 не с (позиционными) аргументами, возвращаемыми task1, а с помощью task1.result as **kwargs:
chain_result = (task1.s(item1=something, item2=something_else) | task2.s()).apply_async() #THIS DOESN'T WORK!
Я подозреваю, что могу вернуться и переписать свои задачи, чтобы они возвращали позиционные аргументы вместо словаря, и это может прояснить ситуацию, но мне кажется, что должен быть какой-то способ доступа к объекту возврата task1 в task2 с эквивалентным функциональность ** двойной звезды. Я также подозреваю, что мне не хватает чего-то достаточно очевидного в реализации подзадач Celery или в *args vs. **kwargs.
Надеюсь, это имеет смысл. И заранее спасибо за любые советы.
3 ответа
chain
и другие холст примитивы находятся в семействе функциональных утилит, таких как map
а также reduce
,
Например где map(target, items)
звонки target(item)
для каждого элемента в списке Python имеет редко используемую версию карты, которая называется itertools.starmap
, который вместо этого называет target(*item)
,
Хотя мы могли бы добавить starchain
и даже kwstarchain
для инструментария, они будут очень специализированными и, вероятно, будут использоваться не так часто.
Интересно, что Python сделал это ненужным с помощью выражений списка и генератора, так что map заменяется на [target(item) for item in item]
и starmap с [target(*item) for item in item]
,
Таким образом, вместо реализации нескольких альтернатив для каждого примитива, я думаю, что нам следует сосредоточиться на поиске более гибкого способа поддержки этого, например, при наличии выражений генератора на основе сельдерея (если это возможно, а если не что-то столь же мощное)
Это мой взгляд на проблему с использованием абстрактного класса задач:
from __future__ import absolute_import
from celery import Task
from myapp.tasks.celery import app
class ChainedTask(Task):
abstract = True
def __call__(self, *args, **kwargs):
if len(args) == 1 and isinstance(args[0], dict):
kwargs.update(args[0])
args = ()
return super(ChainedTask, self).__call__(*args, **kwargs)
@app.task(base=ChainedTask)
def task1(x, y):
return {'x': x * 2, 'y': y * 2, 'z': x * y}
@app.task(base=ChainedTask)
def task2(x, y, z):
return {'x': x * 3, 'y': y * 3, 'z': z * 2}
Теперь вы можете определить и выполнить свою цепочку следующим образом:
from celery import chain
pipe = chain(task1.s(x=1, y=2) | task2.s())
pipe.apply_async()
Так как это не встроено в сельдерей, я сам написал функцию декоратора для чего-то подобного.
# Use this wrapper with functions in chains that return a tuple. The
# next function in the chain will get called with that the contents of
# tuple as (first) positional args, rather than just as just the first
# arg. Note that both the sending and receiving function must have
# this wrapper, which goes between the @task decorator and the
# function definition. This wrapper should not otherwise interfere
# when these conditions are not met.
class UnwrapMe(object):
def __init__(self, contents):
self.contents = contents
def __call__(self):
return self.contents
def wrap_for_chain(f):
""" Too much deep magic. """
@functools.wraps(f)
def _wrapper(*args, **kwargs):
if type(args[0]) == UnwrapMe:
args = list(args[0]()) + list(args[1:])
result = f(*args, **kwargs)
if type(result) == tuple and current_task.request.callbacks:
return UnwrapMe(result)
else:
return result
return _wrapper
Мой разворачивается как starchain
концепция, но вы можете легко изменить его, чтобы развернуть kwargs вместо этого.