Как отправить сообщения RabbitMQ актеру Pykka?

ОБНОВЛЕНИЕ Авг, 2015: Для людей, желающих использовать обмен сообщениями, в настоящее время я бы порекомендовал zeromq. Может использоваться в дополнение или в качестве полной замены pykka.

Как я могу прослушать очередь RabbitMQ для сообщений и затем переслать их субъекту в Pykka?

В настоящее время, когда я пытаюсь это сделать, у меня странное поведение, и система останавливается.

Вот как я реализовал своего актера:

class EventListener(eventlet.EventletActor):
    def __init__(self, target):
        """
        :param pykka.ActorRef target: Where to send the queue messages.
        """
        super(EventListener, self).__init__()

        self.target = target

    def on_start(self):
        ApplicationService.listen_for_events(self.actor_ref)

И вот мой метод внутри ApplicationService класс, который должен проверять очередь на наличие новых сообщений:

@classmethod
def listen_for_events(cls, actor):
    """
    Subscribe to messages and forward them to the given actor.
    """    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='test')
    def callback(ch, method, properties, body):
        message = pickle.loads(body)
        actor.tell(message)

    channel.basic_consume(callback, queue='test', no_ack=True)
    channel.start_consuming()            

Это похоже на start_consuming блокирует на неопределенный срок. Есть ли способ, которым я могу периодически "опрашивать" очередь?

1 ответ

Решение

Весь ваш код выглядит правильно для меня. Если вы хотите проверить очередь, используемую каждым актером, вы можете проверить их actor_inbox свойство доступно по ссылке актера, возвращенной из Actor#start,

Я столкнулся с аналогичными проблемами при наследовании от EventletActor поэтому, чтобы проверить, я попробовал тот же код, используя EventletActor и используя ThreadingActor, Насколько я могу судить по исходному коду, который они оба используют eventlet делать работу. ThreadingActor прекрасно работает для меня, но EventletActor не работает с ActorRef#tellработает с ActorRef#ask,

Я начал с двух файлов в одном каталоге, как показано ниже.

my_actors.py: Инициализирует двух участников, которые будут отвечать на сообщения, печатая содержимое сообщения, перед которым стоит имя их класса.

from pykka.eventlet import EventletActor
import pykka


class MyThreadingActor(pykka.ThreadingActor):
    def __init__(self):
        super(MyThreadingActor, self).__init__()

    def on_receive(self, message):
        print(
            "MyThreadingActor Received: {message}".format(
                message=message)
        )


class MyEventletActor(EventletActor):
    def __init__(self):
        super(MyEventletActor, self).__init__()

    def on_receive(self, message):
        print(
            "MyEventletActor Received: {message}".format(
                message=message)
        )


my_threading_actor_ref = MyThreadingActor.start()
my_eventlet_actor_ref = MyEventletActor.start()

my_queue.py: Устанавливает очередь в pika, отправляет сообщение в очередь, которое пересылается ранее настроенным двум акторам. После того, как каждому субъекту сообщают о сообщении, его входной почтовый ящик текущего участника проверяется на наличие чего-либо в очереди.

from my_actors import my_threading_actor_ref, my_eventlet_actor_ref
import pika


def on_message(channel, method_frame, header_frame, body):
    print "Received Message", body
    my_threading_actor_ref.tell({"msg": body})
    my_eventlet_actor_ref.tell({"msg": body})

    print "ThreadingActor Inbox", my_threading_actor_ref.actor_inbox
    print "EventletActor Inbox", my_eventlet_actor_ref.actor_inbox

    channel.basic_ack(delivery_tag=method_frame.delivery_tag)


queue_name = 'test'
connection = pika.BlockingConnection()

channel = connection.channel()
channel.queue_declare(queue=queue_name)
channel.basic_consume(on_message, queue_name)
channel.basic_publish(exchange='', routing_key=queue_name, body='A Message')

try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()

    # It is very important to stop these actors, otherwise you may lockup
    my_threading_actor_ref.stop()
    my_eventlet_actor_ref.stop()
connection.close()

Когда я бегу my_queue.py вывод выглядит следующим образом:

Полученное сообщение Сообщение

Входящие ThreadingActor <Queue.Queue instance at 0x10bf55878>

MyThreadingActor получил: {'msg': 'A Message'}

EventletActor Inbox <Queue maxsize=None queue=deque([{'msg': 'A Message'}]) tasks=1 _cond=<Event at 0x10bf53b50 result=NOT_USED _exc=None _waiters[0]>>

Когда я ударил CTRL+C чтобы остановить очередь, я заметил, что EventletActor наконец получает сообщение и печатает его:

^CMyEventletActor получил: {'msg': 'A Message'}

Все это заставляет меня поверить, что в EventletActorЯ думаю, что ваш код в порядке, и существует ошибка, которую я не смог найти в коде при первой проверке.

Я надеюсь, что эта информация поможет.

Другие вопросы по тегам