Почему возвращаемый параметр моей цепочки не применяется частично к следующей задаче?
Я пишу цепочку, которая преобразует некоторый JSON в URI на основе его содержимого, а затем помещает данные в этот URI. Я хотел бы сделать это асинхронно с Celery и знать, что группировка цепей позволит мне сделать это легко.
У меня есть следующие задачи написано:
import time
from celery import group, chain
from celery.utils.log import get_task_logger
from app import celery
logger = get_task_logger(__name__)
@celery.task
def create_uri(obj_json, endpoint):
uri = "{0}:{1}/{2}".format(
obj_json["host"],
obj_json["port"],
endpoint
)
logger.debug("Created host {0} from {1}".format(uri, obj_json))
return uri
@celery.task
def send_post(uri, data):
logger.debug("Posting {0} to {1}...".format(data, uri))
return uri
def send_messages(objs, endpoint, data):
chains = [
# The next line is causing problems.
(create_uri.s(obj, endpoint) | send_post.s(data))
for obj in objs
]
g = group(*chains)
res = g.apply_async(queue="default")
while not res.ready():
time.sleep(1)
uris = res.get()
print("Posted to {0}".format(uris))
return uris
Я нахожу, однако, когда я пытаюсь использовать это, create_uri
немного цепей заканчивается, но send_post
никогда не вызывается в моей цепи. Это странно, потому что я слежу за документами о цепочках и, на самом деле, почти следую приведенному здесь примеру об избежании синхронных заданий.
Я работаю с
celery worker -A celery_worker.celery -l debug -c 5 -Q default
где celery_worker
просто толкает контекст приложения и импортирует app.celery
,
и мой конфиг выглядит так:
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
Строка из моих журналов:
[2017-01-09 12: 35: 59,298: DEBUG / MainProcess] TaskPool: Apply (args:(app.tasks.create_uri ',' 197f4836-1cd8-4f7f-adaf-b8cebdb304ef ', {' timelimit ': [Нет, None], 'group': None, 'parent_id': None, 'retries': 0, 'argsrepr': "({'port': 8079, 'host': 'localhost'}, 'start')", 'lang': 'py', 'eta': нет, 'expires': нет, 'delivery_info': {'routing_key': 'default', 'priority': 0, 'redelivered': None, 'exchange': ''}, 'kwargsrepr': '{}', 'task': 'app.tasks.create_uri', 'root_id': '197f4836-1cd8-4f7f-adaf-b8cebdb304ef', 'correlation_id': '197f4836-1cd8-4f7f-adaf-b8cebdb304ef', 'origin': 'foobar', 'reply_to': '6559d43e-6cae-3b6f-89be-7b80e2a43098', 'id': '197f4836-1cd8-4f7f-adaf-b8cebdb304ef'}, b [ [{"port": 8079, "host": "localhost"}, "start"], {}, {"chord": null, "callbacks": null, "errbacks": null, "chain": [{ "task": "app.tasks.send_post", "subtask_type": null, "options": {"group_id": "60c2c9b2-eb51-457d-b248-b8e5552e0fd8", "task_id":... kwargs: {})
Когда я печатаю chains[0].tasks
, Я вижу это:
(app.tasks.create_uri({'host': 'localhost', 'port': 8079}, 'start'),
app.tasks.send_post({'hello': 'world'}))
Признавая, что send_post
является следующей задачей в цепочке, но эта задача никогда не принимается.
Почему мой group
висеть после окончания первых заданий в цепочке?
1 ответ
Вы правильно создаете цепочки и группы. Однако задачи, отправленные в недопустимые очереди, не будут распознаваться работниками. Когда вы делаете .get()
на них они нависают навсегда, так как это никогда не даст результата.
Таким образом, вы можете использовать по умолчанию celery
очередь
res = g.apply_async().get()
# explicit
res = g.apply_async(queue="celery").get()
Или правильно настройте маршрутизацию, а затем используйте пользовательскую очередь.
res = g.apply_async(queue='foo').get()