ZeroMQ PULL получает все сообщения в очереди

У меня есть большое (~100) количество "клиентов" PUSH, которые периодически (~1/ с) отправляют сообщение на "сервер" PULL. Сервер обрабатывает сообщения в пакетном режиме со скоростью ~2 пакета в секунду. Размер пакета не является фиксированным - в идеале я хотел бы обрабатывать все сообщения в очереди на каждой итерации. Мой код для получения всех сообщений в очереди выглядит следующим образом:

buffer = []
while True:
  try:
    buffer.append(socket.recv_pyobj(zmq.NOBLOCK))
  except zmq.ZMQError as e:
    break
print(len(buffer))

Кажется, это работает, но длина буфера сильно варьируется между итерациями. Обычно я ожидаю увидеть ~50 сообщений, прочитанных каждый раз. Тем не менее, я часто вижу много итераций с <10 ​​сообщениями, а затем одно с очень большим числом (~1000). Я подозреваю, что я делаю что-то не так с буферизацией или чем-то в этом роде, поскольку этого не должно происходить.

1 ответ

Высокодинамичные изменения на принимающей стороне?

Мой подозреваемый PULL с честной очередью, встроенной в .Context() низкоуровневый информационный насос, который обслуживает (следует) этот обязательный рабочий цикл для всех .connect() сверстники

Если вы не в состоянии как-то выровнять задержки, с которыми кормление PUSH -представители отправляют свои соответствующие сообщения, цикл получения, находящийся в честной очереди, точно так же, как вы сообщили, проявит тех, кто не PUSH "вовремя" их следующее сообщение.

Другие вопросы по тегам