Модель подписки на события, основанная на событиях

Я работаю над требованием, согласно которому процессу (например, производителю) необходимо отправлять односторонние сообщения переменному числу процессов (например, потребителям).

Модель публикации-подписки казалась хорошей для этого, потому что потребители будут подписываться на сообщения от производителя. Я пытался использовать ZeroMQ для достижения этой цели.

Однако у меня есть несколько проблем с этим:

  1. Потребители должны постоянно опрашивать сообщения. Я хотел бы, чтобы потребители были уведомлены, когда появится новое сообщение.

  2. Существует возможность заполнения очереди производителя. Мне бы хотелось, чтобы производитель удалял сообщения из очереди на основании какого-либо условия (скажем, удаление сообщений старше 5 секунд или удаление сообщений, которые были прочитаны 5 раз).

  3. Поскольку потребители опрашивают, а сообщения не удаляются из очереди, потребители видят повторяющиеся сообщения до тех пор, пока не придет новое сообщение. Я хочу, чтобы потребитель был уведомлен только один раз для каждого нового сообщения.

Я понимаю, что могу использовать неправильную модель (публикация-подписка может не подходить). Я думал об использовании запроса-ответа, но это не работает, так как производитель не хочет отслеживать количество потребителей.

Кто-нибудь может предложить хорошую альтернативу?

4 ответа

Промежуточное программное обеспечение DDS (Служба распространения данных) поддерживает именно то, что вы пытаетесь достичь, и гораздо проще.

Непосредственно отвечая на ваши вопросы:

  1. DDS поддерживает механизм прослушивания, ваши подписчики не должны постоянно опрашивать.

  2. DDS имеет хорошие настройки QoS для предотвращения заполнения очередей издателя. Вы можете использовать History QoS, чтобы сказать "сохранить только последние 10 образцов в очереди", или вы можете использовать Lifespan QoS, чтобы сказать "сохранить только образцы, опубликованные за последние 10 секунд".

  3. Опять же, вы можете использовать механизм прослушивания DDS, и вы будете уведомлены только один раз для каждого нового образца. Нет необходимости в опросе.

В настоящее время существует две реализации с открытым исходным кодом.

Я предлагаю пойти на модель Push-Pull с брокером между продюсером и потребителем.

  1. Брокер должен быть уведомлен о любом новом сообщении.
  2. Потребители будут прислушиваться к уведомлениям брокеров (ведите таблицу для отслеживания успеха / неудачи. Поэтому при повторных попытках избежать дублирования)
  3. Как только #2 завершен, потребитель может получить данные от источника (источника) и отправить подтверждение брокеру для успеха / неудачи.

Надеюсь это поможет

Вам нужно больше, чем один производитель? Если нет, вы можете использовать PUSH/PULL вместо PUB/SUB.

С PUSH/PULL вы можете иметь столько потребителей, сколько хотите (они являются стороной PULL модели). Все сообщения, записанные в конечную точку PUSH, распределяются в круговом стиле среди всех подключенных потребителей. Это также гарантирует, что два потребителя не получат одно и то же сообщение.

Как вы описали, имея потребителей в качестве конечной точки SUB, вы можете в конечном итоге доставлять одно и то же сообщение более чем одному потребителю (если это будет проблемой в вашей модели), если два или более потребителей подписаны на один и тот же "префикс".

Предполагая, что префикс - это строка, которую вы передаете sock.setsockopt(ZMQ_SUBSCRIBE, "prefix", ...);

Попробуйте использовать JMS-провайдера или AMQP-провайдера. Вот некоторые из вещей, которые вы ищете с Темами:

  1. Push-уведомление для подписчиков.

  2. Атрибут времени жизни сообщений, который позволяет удалять сообщения или помещать их в очередь недоставленных сообщений, если они не используются в TTL.

  3. Одноразовое уведомление - в зависимости от вашей конфигурации.

Обратите внимание, что однократный обмен сообщениями имеет граничные условия в случае сбоя сети, что может привести к потере или дублированию сообщения... вы выбираете.

С точки зрения того, кто предоставляет, чтобы использовать. RabbitMQ популярен для AMQP. Для JMS существует любое количество проприетарных продуктов или реализаций с открытым исходным кодом.

Другие вопросы по тегам