Аккорд динамически созданной группы не называется

Недавно узнал о сельдерее task.replace и пытаясь использовать его силу, я столкнулся с загадочной ситуацией, которая согласно документации Celery (и различным вопросам Pull-Requests/SO, лежащим в Интернете) - должна сработать.
Допустим, у меня есть следующий код:

from celery import Celery, group
app = Celery(broker='pyamqp://', backend='redis://localhost:6379/0')

@app.task
def to_char(seq):
    return map(chr, seq)

@app.task
def flatten(seqs):
    final = list()
    for seq in seqs:
        final.extend(seq)
    return final

@app.task(bind=True)
def combine(self):
    sig = group([to_char.s([97]), to_char.s([98, 98])]) | flatten.s()
    raise self.replace(sig)

Я, очевидно, немного упростил проблему - но даже в этом простом случае, когда я звоню combine.delay().get() Я ожидаю получить [a, b, b],
Вместо того, чтобы получить результат, вызов get() зависает на неопределенный срок и просматривает журналы рабочих - я вижу оба звонка to_char получение и завершение успешно, но без вызова flatten,

Когда я пытаюсь выполнить его по отдельности (например, в консоли Python), он работает и возвращает ожидаемые результаты, поэтому проблема определенно заключается в моем понимании механизма task.replace:

g = group([to_char.s([97]), to_char.s([98, 98])])
c = g | flatten.s()
c.delay().get() # -> [a, b, b]

Большое спасибо за то, кто может пролить свет на проблему!

0 ответов

Другие вопросы по тегам