Паттерны постоянства zeromq
Кто должен управлять стойкой в ZeroMQ?
Когда мы используем клиенты ZeroMQ на языке Python, какие плагины / модули доступны для управления постоянными?
Я хотел бы знать шаблоны для использования ZeroMQ.
3 ответа
Насколько я знаю, у Zeromq нет никакой настойчивости. Это выходит за рамки его применения и должно обрабатываться конечным пользователем. Так же, как сериализация сообщения. В C# я использовал db4o, чтобы добавить постоянство. Обычно я сохраняю объект в исходном состоянии, затем сериализую его и отправляю в сокет ZMQ. Кстати, это было для пары PUB/SUB.
В конце приложения вы можете сохраниться соответствующим образом, например, я создал слой постоянства в node.js, который связывался с внутренними вызовами php и через веб-сокеты.
Аспект персистентности удерживал сообщения в течение определенного периода времени (http://en.wikipedia.org/wiki/Time_to_live), чтобы дать клиентам возможность подключиться. Я использовал структуры данных в памяти, но мне нравилась идея использовать Redis для получения устойчивости на диске.
Нам нужно было сохранить полученные сообщения от подписчика перед их обработкой. Сообщения принимаются в отдельном потоке и хранятся на диске, в то время как в основном потоке манипулируется с сохраненной очередью сообщений.
Модуль доступен по адресу: https://pypi.org/project/persizmq. Из документации:
import pathlib
import zmq
import persizmq
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.setsockopt_string(zmq.SUBSCRIBE, "")
subscriber.connect("ipc:///some-queue.zeromq")
persistent_dir = pathlib.Path("/some/dir")
storage = persizmq.PersistentStorage(persistent_dir=persistent_dir)
def on_exception(exception: Exception)->None:
print("an exception in the listening thread: {}".format(exception))
with persizmq.ThreadedSubscriber(
callback=storage.add_message, subscriber=subscriber,
on_exception=on_exception):
msg = storage.front() # non-blocking
if msg is not None:
print("Received a persistent message: {}".format(msg))
storage.pop_front()