Как использовать Gevent с Kombu
Я пишу основной скрипт публикации-подписки на python, используя kombu и rabbitmq вместе с gevent. Как мы можем переключить контекст на издателя, когда потребитель ожидает в очереди, пока не прибудет сообщение? Я пытался использовать gevent.sleep(), но в моем случае это не работает
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue May 30 12:15:56 2017
@author: kartikkillawala
"""
from kombu import Connection,Exchange,Queue,Producer
from kombu.mixins import ConsumerMixin
import gevent
from gevent import Timeout
class Worker(ConsumerMixin):
def __init__(self,connection,queues):
self.connection=connection
self.queues=queues
def get_consumers(self,Consumer,channel):
return([Consumer(queues=self.queues,callbacks= self.on_message] )])
def on_message(self,body,message):
print("Got Message: {0}".format(body))
message.ack()
self.should_stop=True
gevent.sleep(1)
class MessageVerify:
exchange_name = "Test-Exchange"
exchange_type = "direct"
queue_name = "Test-Queue"
routing_key = "ABC"
connection_producer = None
channel_producer = None
exchange_producer = None
producer = None
consumer = None
queue = None
worker = None
def __init__(self,rabbitmq1):
self.connection_producer = Connection(hostname=rabbitmq1['host'],userid=rabbitmq1['username'],password=rabbitmq1['password'])
self.channel_producer = self.connection_producer.channel()
self.exchange_producer = Exchange(name =self.exchange_name,type=self.exchange_type)
self.queue = Queue(name=self.queue_name,exchange=self.exchange_producer,routing_key=self.routing_key)
self.queue.maybe_bind(self.connection_producer)
self.queue.declare()
def sendMessage(self,routing_key,data):
self.producer = Producer(channel=self.channel_producer,exchange=self.exchange_producer,routing_key=routing_key)
self.producer.publish(data,routing_key=routing_key)
print("[p]Published Message",data,'with routing key',routing_key)
gevent.sleep(2)
def receiveMessage(self,routing_key,timeout=120):
try:
print ("Worker run")
timer = Timeout(timeout)
timer.start()
gevent.sleep(2)
self.worker = Worker(self.connection_producer,self.queue)
self.worker.run()
except Timeout:
print("Connection Timeout")
rabbitmq1 = {
"username":"guest",
"password":"guest",
"host":"localhost"}
msg_verify = MessageVerify(rabbitmq1)
threads = [gevent.spawn(msg_verify.receiveMessage("ABC",timeout=120)),gevent.spawn(msg_verify.sendMessage("ABC","Sample Message"))]
gevent.joinall(threads)
print ("After joinall")