Комбу / Сельдерей обмен сообщениями
У меня есть простое приложение, которое отправляет и получает сообщения, kombu, и использует Celery для задания сообщения. Комбу Алон, я могу получить сообщение правильно. когда я посылаю "Привет", Комбу получает "Привет". Но когда я добавил задачу, комбу получает идентификатор задачи сельдерея.
Моя цель для этого проекта заключается в том, чтобы я мог планировать, когда отправлять и получать сообщения, следовательно, Celery.
Я хотел бы знать, почему комбу получает идентификатор задачи вместо отправленного сообщения? Я искал и искал и не нашел никаких связанных результатов по этому вопросу. Я новичок в использовании этих приложений, и я был бы признателен за помощь в решении этого вопроса.
Мои коды:
task.py
from celery import Celery
app = Celery('tasks', broker='amqp://xx:xx@localhost/xx', backend='amqp://')
@app.task(name='task.add')
def add(x, y):
return x+y
send.py
import kombu
from task import add
#declare connection with broker connection
connection = kombu.Connection(hostname='xx',
userid='xx',
password='xx',
virtual_host='xx')
connection.connect()
if connection.connect() is False:
print("not connected")
else:
print("connected")
#checks if connection is okay
#rabbitmq connection
channel = connection.channel()
#queue & exchange for kombu
exchange = kombu.Exchange('exchnge', type='direct')
queue = kombu.Queue('kombu_queue', exchange, routing_key='queue1')
#message here
x = input ("Enter first name: ")
y = input ("Enter last name: ")
result= add.delay(x,y)
print(result)
#syntax used for sending messages to queue
producer = kombu.Producer(channel, exchange)
producer.publish(result,
exchange = exchange,
routing_key='queue1')
print("Message sent: [x]")
connection.release()
receive.py
import kombu
#receive
connection = kombu.Connection(hostname='xx',
userid='xx',
password='xx',
virtual_host='xx')
connection.connect()
channel = connection.channel()
exchange = kombu.Exchange('exchnge', type='direct')
queue = kombu.Queue('kombu_queue', exchange, routing_key='queue1')
print("Waiting for messages...")
def callback(body, message):
print('Got message - %s' % body)
message.ack()
consumer = kombu.Consumer(channel,
queues=queue,
callbacks=[callback])
consumer.consume()
while True:
connection.drain_events()
Я использую:
Kombu 3.0.26
Celery 3.1.18
RabbitMQ as the broker
Что я отправил:
xxx
yyy
Что получает комбу:
Got message - d22880c9-b22c-48d8-bc96-5d839b224f2a
2 ответа
Я нашел ответ на свою проблему, и всем, кто может столкнуться с такой проблемой, я поделюсь ответом, который помог мне.
Или здесь - пользователь jennaliu ответ, возможно, поможет вам, если первая ссылка не работает.
Вам нужно позвонить result.get()
получить реальную стоимость add.delay()
, То, что вы видите как тело сообщения AsyncResult
экземпляр в строковом формате. Что не имеет особого смысла.