Сбой RabbitMQ Consumer при получении сообщения MQTT

Я пытаюсь опубликовать сообщение MQTT и получить сообщение с потребителем AMQP с помощью плагина RabbitMQ-MQTT в Ubuntu14.04. Я публикую сообщение MQTT с пакетом Mosquitto-clients. Я включил плагин MQTT для RabbitMQ.

Теперь, если я хочу отправить сообщение MQTT, мой потребительский код AMQP выдает исключение:

Traceback (most recent call last):
   File "consume_topic.py", line 33, in <module>
channel.start_consuming()
   File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 722, in start_consuming
self.connection.process_data_events()
   File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 88, in process_data_events
if self._handle_read():
   File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 184, in _handle_read
super(BlockingConnection, self)._handle_read()
   File "/usr/local/lib/python2.7/dist-packages/pika/adapters/base_connection.py", line 308, in _handle_read
self._on_data_available(data)
   File "/usr/local/lib/python2.7/dist-packages/pika/connection.py", line 1134, in _on_data_available
consumed_count, frame_value = self._read_frame()
   File "/usr/local/lib/python2.7/dist-packages/pika/connection.py", line 1201, in _read_frame
return frame.decode_frame(self._frame_buffer)
   File "/usr/local/lib/python2.7/dist-packages/pika/frame.py", line 254, in decode_frame
out = properties.decode(frame_data[12:])
   File "/usr/local/lib/python2.7/dist-packages/pika/spec.py", line 2479, in decode
(self.headers, offset) = data.decode_table(encoded, offset)
   File "/usr/local/lib/python2.7/dist-packages/pika/data.py", line 106, in decode_table
value, offset = decode_value(encoded, offset)
   File "/usr/local/lib/python2.7/dist-packages/pika/data.py", line 174, in decode_value
raise exceptions.InvalidFieldTypeException(kind)
pika.exceptions.InvalidFieldTypeException: b

Мой код пользователя Pika (python) выглядит следующим образом:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',type='topic',durable=False)

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
    sys.exit(1)

for binding_key in binding_keys:
   channel.queue_bind(exchange='logs',
                      queue=queue_name,
                      routing_key=binding_key)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

Мой файл конфигурации RabbitMQ выглядит следующим образом:

[{rabbit,        [{tcp_listeners,    [5672]}]},
 {rabbitmq_mqtt, [{default_user,     <<"guest">>},
                  {default_pass,     <<"guest">>},
                  {allow_anonymous,  true},
                  {vhost,            <<"/">>},
                  {exchange,         <<"logs">>},
                  {subscription_ttl, 1800000},
                  {prefetch,         10},
                  {ssl_listeners,    []},
                  %% Default MQTT with TLS port is 8883
                  %% {ssl_listeners,    [8883]}
                  {tcp_listeners,    [1883]},
                  {tcp_listen_options, [binary,
                                        {packet,    raw},
                                        {reuseaddr, true},
                                        {backlog,   128},
                                        {nodelay,   true}]}]}
].

Файл журнала показывает следующее:

=INFO REPORT==== 14-Apr-2015::10:57:50 ===
accepting AMQP connection <0.1174.0> (127.0.0.1:42447 -> 127.0.0.1:5672)

=INFO REPORT==== 14-Apr-2015::10:58:30 ===
accepting MQTT connection <0.1232.0> (127.0.0.1:53581 -> 127.0.0.1:1883)

=WARNING REPORT==== 14-Apr-2015::10:58:30 ===
closing AMQP connection <0.1174.0> (127.0.0.1:42447 -> 127.0.0.1:5672):
connection_closed_abruptly

=INFO REPORT==== 14-Apr-2015::10:58:30 ===
closing MQTT connection <0.1232.0> (127.0.0.1:53581 -> 127.0.0.1:1883)

Кто-нибудь может мне помочь? Я погуглил "pika.exceptions.IvalidFieldTypeException" и обнаружил, что я не использую правильный "тип поля", как это?

1 ответ

Решение

Это, скорее всего, ошибка в спецификациях (декодер) для Пика. Я бы порекомендовал вам изменить библиотеку на что-то более часто обновляемое. В качестве примера вы можете взглянуть на автора новой библиотеки pika RabbitPy или моей собственной библиотеки AMQP-Storm, вдохновленной пиками.

Хотя, возможно, вы используете очень старую версию Pika. Я нашел этот коммит от gmr, который должен был исправить вашу проблему. Вы можете попробовать перейти на pika 0.9.14.

Другие вопросы по тегам