pyzmq recv_json не может декодировать сообщение, отправленное send_json
Вот мой код с извлеченными посторонними вещами:
coordinator.py
context = zmq.Context()
socket = context.socket(zmq.ROUTER)
port = socket.bind_to_random_port(ZMQ_ADDRESS)
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
while True:
event = poller.poll(1)
if not event:
continue
process_id, val = socket.recv_json()
worker.py
context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.connect('%s:%s' % (ZMQ_ADDRESS, kwargs['zmq_port']))
socket.send_json(
(os.getpid(), True)
)
что происходит, когда я запускаю это:
process_id, val = socket.recv_json()
File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/sugar/socket.py", line 380, in recv_json
return jsonapi.loads(msg)
File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/utils/jsonapi.py", line 71, in loads
return jsonmod.loads(s, **kwargs)
File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/__init__.py", line 451, in loads
return _default_decoder.decode(s)
File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/decoder.py", line 406, in decode
obj, end = self.raw_decode(s)
File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/decoder.py", line 426, in raw_decode
raise JSONDecodeError("No JSON object could be decoded", s, idx)
JSONDecodeError: No JSON object could be decoded: line 1 column 0 (char 0)
и если я копаюсь с ipdb:
> /Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/sugar/socket.py(380)recv_json()
379 msg = self.recv(flags)
--> 380 return jsonapi.loads(msg)
381
ipdb> p msg
'\x00\x9f\xd9\x06\xa2'
хм, это не похоже на JSON... это ошибка в pyzmq? я использую это неправильно?
1 ответ
Хм, ок, нашел ответ.
Существует раздражающая асимметрия в интерфейсе ØMQ, поэтому вы должны знать, какой тип сокета вы используете.
В этом случае мое использование архитектуры ROUTER/DEALER означает, что сообщение JSON, отправленное из сокета DEALER, когда я это сделаю send_json
, завернутый в конверт из нескольких сообщений. Первая часть - это идентификатор клиента (думаю, это '\x00\x9f\xd9\x06\xa2'
что я получил выше), а вторая часть - это строка JSON, которая нас интересует.
Так что в последней строке моего координатора.py мне нужно сделать это вместо этого:
id_, msg = socket.recv_multipart()
process_id, val = json.loads(msg)
ИМХО, это плохой дизайн со стороны ØMQ/pyzmq, библиотека должна абстрагировать это и иметь только send
а также recv
методы, которые просто работают.
Я получил ответ на этот вопрос. Как я могу использовать send_json с pyzmq PUB SUB, чтобы, похоже, архитектура PUB/SUB имела ту же проблему, и, несомненно, другие.
Это описано в документации, но не очень понятно
http://zguide.zeromq.org/page:all
Обновить
Фактически, я обнаружил, что в моем случае я мог бы еще больше упростить код, напрямую используя часть "client id" конверта сообщения. Так что работник просто делает:
context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.identity = str(os.getpid()) # or I could omit this and use ØMQ client id
socket.connect('%s:%s' % (ZMQ_ADDRESS, kwargs['zmq_port']))
socket.send_json(True)
Также стоит отметить, что когда вы хотите отправить сообщение в другом направлении, от ROUTER, вы должны отправить его как составное, указав, для какого клиента оно предназначено, например:
coordinator.py
context = zmq.Context()
socket = context.socket(zmq.ROUTER)
port = socket.bind_to_random_port(ZMQ_ADDRESS)
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
pids = set()
while True:
event = poller.poll(1)
if not event:
continue
process_id, val = socket.recv_json()
pids.add(process_id)
# need some code in here to decide when to stop listening
# and break the loop
for pid in pids:
socket.send_multipart([pid, 'a string message'])
# ^ do your own json encoding if required
Я полагаю, что, возможно, существует некоторый ØMQ способ сделать широковещательное сообщение, а не отправлять каждому клиенту в цикле, как я делал выше. Я хотел бы, чтобы в документах было четкое описание каждого доступного типа сокета и как их использовать.