Сбой RabbitMQ Consumer при получении сообщения MQTT
Я пытаюсь опубликовать сообщение MQTT и получить сообщение с потребителем AMQP с помощью плагина RabbitMQ-MQTT в Ubuntu14.04. Я публикую сообщение MQTT с пакетом Mosquitto-clients. Я включил плагин MQTT для RabbitMQ.
Теперь, если я хочу отправить сообщение MQTT, мой потребительский код AMQP выдает исключение:
Traceback (most recent call last):
File "consume_topic.py", line 33, in <module>
channel.start_consuming()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 722, in start_consuming
self.connection.process_data_events()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 88, in process_data_events
if self._handle_read():
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 184, in _handle_read
super(BlockingConnection, self)._handle_read()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/base_connection.py", line 308, in _handle_read
self._on_data_available(data)
File "/usr/local/lib/python2.7/dist-packages/pika/connection.py", line 1134, in _on_data_available
consumed_count, frame_value = self._read_frame()
File "/usr/local/lib/python2.7/dist-packages/pika/connection.py", line 1201, in _read_frame
return frame.decode_frame(self._frame_buffer)
File "/usr/local/lib/python2.7/dist-packages/pika/frame.py", line 254, in decode_frame
out = properties.decode(frame_data[12:])
File "/usr/local/lib/python2.7/dist-packages/pika/spec.py", line 2479, in decode
(self.headers, offset) = data.decode_table(encoded, offset)
File "/usr/local/lib/python2.7/dist-packages/pika/data.py", line 106, in decode_table
value, offset = decode_value(encoded, offset)
File "/usr/local/lib/python2.7/dist-packages/pika/data.py", line 174, in decode_value
raise exceptions.InvalidFieldTypeException(kind)
pika.exceptions.InvalidFieldTypeException: b
Мой код пользователя Pika (python) выглядит следующим образом:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',type='topic',durable=False)
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='logs',
queue=queue_name,
routing_key=binding_key)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] %r:%r" % (method.routing_key, body,)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
Мой файл конфигурации RabbitMQ выглядит следующим образом:
[{rabbit, [{tcp_listeners, [5672]}]},
{rabbitmq_mqtt, [{default_user, <<"guest">>},
{default_pass, <<"guest">>},
{allow_anonymous, true},
{vhost, <<"/">>},
{exchange, <<"logs">>},
{subscription_ttl, 1800000},
{prefetch, 10},
{ssl_listeners, []},
%% Default MQTT with TLS port is 8883
%% {ssl_listeners, [8883]}
{tcp_listeners, [1883]},
{tcp_listen_options, [binary,
{packet, raw},
{reuseaddr, true},
{backlog, 128},
{nodelay, true}]}]}
].
Файл журнала показывает следующее:
=INFO REPORT==== 14-Apr-2015::10:57:50 ===
accepting AMQP connection <0.1174.0> (127.0.0.1:42447 -> 127.0.0.1:5672)
=INFO REPORT==== 14-Apr-2015::10:58:30 ===
accepting MQTT connection <0.1232.0> (127.0.0.1:53581 -> 127.0.0.1:1883)
=WARNING REPORT==== 14-Apr-2015::10:58:30 ===
closing AMQP connection <0.1174.0> (127.0.0.1:42447 -> 127.0.0.1:5672):
connection_closed_abruptly
=INFO REPORT==== 14-Apr-2015::10:58:30 ===
closing MQTT connection <0.1232.0> (127.0.0.1:53581 -> 127.0.0.1:1883)
Кто-нибудь может мне помочь? Я погуглил "pika.exceptions.IvalidFieldTypeException" и обнаружил, что я не использую правильный "тип поля", как это?
1 ответ
Это, скорее всего, ошибка в спецификациях (декодер) для Пика. Я бы порекомендовал вам изменить библиотеку на что-то более часто обновляемое. В качестве примера вы можете взглянуть на автора новой библиотеки pika RabbitPy или моей собственной библиотеки AMQP-Storm, вдохновленной пиками.
Хотя, возможно, вы используете очень старую версию Pika. Я нашел этот коммит от gmr, который должен был исправить вашу проблему. Вы можете попробовать перейти на pika 0.9.14.