Аккорд динамически созданной группы не называется
Недавно узнал о сельдерее task.replace
и пытаясь использовать его силу, я столкнулся с загадочной ситуацией, которая согласно документации Celery (и различным вопросам Pull-Requests/SO, лежащим в Интернете) - должна сработать.
Допустим, у меня есть следующий код:
from celery import Celery, group
app = Celery(broker='pyamqp://', backend='redis://localhost:6379/0')
@app.task
def to_char(seq):
return map(chr, seq)
@app.task
def flatten(seqs):
final = list()
for seq in seqs:
final.extend(seq)
return final
@app.task(bind=True)
def combine(self):
sig = group([to_char.s([97]), to_char.s([98, 98])]) | flatten.s()
raise self.replace(sig)
Я, очевидно, немного упростил проблему - но даже в этом простом случае, когда я звоню combine.delay().get()
Я ожидаю получить [a, b, b]
,
Вместо того, чтобы получить результат, вызов get()
зависает на неопределенный срок и просматривает журналы рабочих - я вижу оба звонка to_char
получение и завершение успешно, но без вызова flatten
,
Когда я пытаюсь выполнить его по отдельности (например, в консоли Python), он работает и возвращает ожидаемые результаты, поэтому проблема определенно заключается в моем понимании механизма task.replace
:
g = group([to_char.s([97]), to_char.s([98, 98])])
c = g | flatten.s()
c.delay().get() # -> [a, b, b]
Большое спасибо за то, кто может пролить свет на проблему!