pyzmq recv_json не может декодировать сообщение, отправленное send_json

Вот мой код с извлеченными посторонними вещами:

coordinator.py

context = zmq.Context()
socket = context.socket(zmq.ROUTER)
port = socket.bind_to_random_port(ZMQ_ADDRESS)

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

while True:
    event = poller.poll(1)
    if not event:
        continue
    process_id, val = socket.recv_json()

worker.py

context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.connect('%s:%s' % (ZMQ_ADDRESS, kwargs['zmq_port']))

socket.send_json(
    (os.getpid(), True)
)

что происходит, когда я запускаю это:

    process_id, val = socket.recv_json()
  File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/sugar/socket.py", line 380, in recv_json
    return jsonapi.loads(msg)
  File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/utils/jsonapi.py", line 71, in loads
    return jsonmod.loads(s, **kwargs)
  File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/__init__.py", line 451, in loads
    return _default_decoder.decode(s)
  File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/decoder.py", line 406, in decode
    obj, end = self.raw_decode(s)
  File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/decoder.py", line 426, in raw_decode
    raise JSONDecodeError("No JSON object could be decoded", s, idx)
JSONDecodeError: No JSON object could be decoded: line 1 column 0 (char 0)

и если я копаюсь с ipdb:

> /Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/sugar/socket.py(380)recv_json()
    379             msg = self.recv(flags)
--> 380             return jsonapi.loads(msg)
    381

ipdb> p msg
'\x00\x9f\xd9\x06\xa2'

хм, это не похоже на JSON... это ошибка в pyzmq? я использую это неправильно?

1 ответ

Решение

Хм, ок, нашел ответ.

Существует раздражающая асимметрия в интерфейсе ØMQ, поэтому вы должны знать, какой тип сокета вы используете.

В этом случае мое использование архитектуры ROUTER/DEALER означает, что сообщение JSON, отправленное из сокета DEALER, когда я это сделаю send_json, завернутый в конверт из нескольких сообщений. Первая часть - это идентификатор клиента (думаю, это '\x00\x9f\xd9\x06\xa2' что я получил выше), а вторая часть - это строка JSON, которая нас интересует.

Так что в последней строке моего координатора.py мне нужно сделать это вместо этого:

id_, msg = socket.recv_multipart()
process_id, val = json.loads(msg)

ИМХО, это плохой дизайн со стороны ØMQ/pyzmq, библиотека должна абстрагировать это и иметь только send а также recv методы, которые просто работают.

Я получил ответ на этот вопрос. Как я могу использовать send_json с pyzmq PUB SUB, чтобы, похоже, архитектура PUB/SUB имела ту же проблему, и, несомненно, другие.

Это описано в документации, но не очень понятно
http://zguide.zeromq.org/page:all

Обновить

Фактически, я обнаружил, что в моем случае я мог бы еще больше упростить код, напрямую используя часть "client id" конверта сообщения. Так что работник просто делает:

context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.identity = str(os.getpid())  # or I could omit this and use ØMQ client id
socket.connect('%s:%s' % (ZMQ_ADDRESS, kwargs['zmq_port']))

socket.send_json(True)

Также стоит отметить, что когда вы хотите отправить сообщение в другом направлении, от ROUTER, вы должны отправить его как составное, указав, для какого клиента оно предназначено, например:

coordinator.py

context = zmq.Context()
socket = context.socket(zmq.ROUTER)
port = socket.bind_to_random_port(ZMQ_ADDRESS)

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

pids = set()
while True:
    event = poller.poll(1)
    if not event:
        continue
    process_id, val = socket.recv_json()
    pids.add(process_id)

    # need some code in here to decide when to stop listening
    # and break the loop

for pid in pids:
    socket.send_multipart([pid, 'a string message'])
    # ^ do your own json encoding if required

Я полагаю, что, возможно, существует некоторый ØMQ способ сделать широковещательное сообщение, а не отправлять каждому клиенту в цикле, как я делал выше. Я хотел бы, чтобы в документах было четкое описание каждого доступного типа сокета и как их использовать.

Другие вопросы по тегам