Как подключить очереди к ZeroMQ PUB/SUB

Учтите следующее:

  • набор из 3 логических сервисов: S1, S2 а также S3
  • два экземпляра каждого сервиса работают, поэтому у нас есть следующие процессы: S1P1, S1P2, S2P1, S2P2, S3P1, S3P2
  • ZeroMQ брокер работает в одном процессе и доступен всем сервисным процессам

Логичный сервис, скажем так S1 публикует сообщение M1 что представляет интерес для логических сервисов S2 а также S3, Только один процесс каждого логического сервиса должен получить M1 так скажем S2P1 а также S3P2,

Я пробовал следующее, но безуспешно:

  • посредник потока 1 работает XSUB/XPUB полномочие
  • посредник потока 2 работает ROUTER/DEALER прокси с ROUTER подключен к XPUB сокет и подписался на все (для логики S1)
  • посредник потока 3 работает ROUTER/DEALER прокси с ROUTER подключен к XPUB сокет и подписался на все (для логики S2)
  • посредник потока 4 работает ROUTER/DEALER прокси с ROUTER подключил к сокету XPUB и подписался на все (для логики S3)
  • каждый логический процесс обслуживания выполняется REP резьба сокета, подключенная к брокеру DEALER разъем

Я понял, что XSUB/XPUB прокси даст мне семантику публикации / подписки и что ROUTER/DEALER Прокси будут вводить конкуренцию между REP сокеты для сообщений, отправленных XSUB/XPUB прокси.

Как я могу совместить ZeroMQ сокеты для этого?

Update1

Я знаю, что "без успеха" не поможет, я пробовал разные конфигурации и получал разные ошибки. Последняя конфигурация, которую я попробовал, следующая:

(XSUB proxy=> XPUB) => (SUB copyLoop=> REQ) => (ROUTER proxy=> DEALER) => REP

CopyLoop выглядит так:

public void start() {
    context = ZMQ.context(1);

    subSocket = context.socket(ZMQ.SUB);
    subSocket.connect(subSocketUrl);
    subSocket.subscribe("".getBytes());

    reqSocket = context.socket(ZMQ.REQ);
    reqSocket.connect(reqSocketUrl);

    while (!Thread.currentThread().isInterrupted()) {
        final Message msg = receiveNextMessage();
        resendMessage(msg);
    }
}

private Message receiveNextMessage() {
    final String header = subSocket.recvStr();
    final String entity = subSocket.recvStr();

    return new Message(header, entity);
}

private void resendMessage(Message msg) {
    reqSocket.sendMore(msg.getKey());
    reqSocket.send(msg.getData(), 0);
}

Исключение, которое я получаю, следующее:

java.lang.IllegalStateException: Cannot send another request
    at zmq.Req.xsend(Req.java:51) ~[jeromq-0.3.4.jar:na]
    at zmq.SocketBase.send(SocketBase.java:613) ~[jeromq-0.3.4.jar:na]
    at org.zeromq.ZMQ$Socket.send(ZMQ.java:1206) ~[jeromq-0.3.4.jar:na]
    at org.zeromq.ZMQ$Socket.sendMore(ZMQ.java:1189) ~[jeromq-0.3.4.jar:na]
    at com.xyz.messaging.zeromq.SubReqProxyConnector.resendMessage(SubReqProxyConnector.java:47) ~[classes/:na]
    at com.xyz.messaging.zeromq.SubReqProxyConnector.start(SubReqProxyConnector.java:35) ~[classes/:na]

Я использую JeroMQ 0.3.4, Oracle Java 8 JVM и Windows 7.

2 ответа

Решение

Вы, кажется, добавляете некоторую сложность с вашим ROUTER подключение - вы должны иметь возможность делать все, что связано непосредственно с вашим издателем.

Ошибка, с которой вы сейчас сталкиваетесь, заключается в том, что REQ сокеты имеют строгий порядок сортировки сообщений - вам не разрешено send() дважды подряд, вы должны отправить / получить / отправить / получить / и т. д. (аналогично, REP сокеты должны получать / отправлять / получать / отправлять / и т. д.). Исходя из того, как это выглядит, вы просто делаете отправку / отправку / отправку / и т. Д. REQ сокет, никогда не получая ответ. Если вас не волнует ответ вашего сверстника, вы должны получить и отказаться от него или использовать DEALER (или же ROUTER, но DEALER имеет больше смысла в вашей текущей диаграмме).

Я создал диаграмму того, как я смог бы выполнить эту архитектуру ниже - используя вашу базовую структуру процесса.

Broker T1         Broker T2                Broker T3                Broker T4
(PUB*)------>(*SUB)[--](DEALER*)   -->(*SUB)[--](DEALER*)   -->(*SUB)[--](DEALER*)
       |_____________________||____|                  ||    |                  ||
       |_____________________||_______________________||____|                  ||
                             ||                       ||                       ||
     ========================||     ==================||            ===========||=
   ||             ||              ||              ||              ||              ||
   ||             ||              ||              ||              ||              ||
   ||             ||              ||              ||              ||              ||
(REP*)         (REP*)          (REP*)          (REP*)          (REP*)          (REP*)
 S1P1           S1P2            S2P1            S2P2            S3P1            S3P2

Итак, главное отличие в том, что я бросил (SUB copyLoop=> REQ) шаг. Выбираете ли вы XPUB/XSUB против PUB/SUB зависит от вас, но я бы начал проще, если вы в настоящее время не хотите использовать дополнительные функции XPUB/XSUB,

Очевидно, что эта диаграмма не имеет отношения к тому, как информация поступает к вашему брокеру, где вы в настоящее время показываете XSUB сокет - это выходит за рамки предоставленной вами информации, возможно, вы уже можете успешно получать информацию в своем брокере, поэтому я не буду с этим сталкиваться.

Я предполагаю, что потоки вашего брокера, предназначенные для каждой службы, делают разумный выбор относительно того, отправлять сообщение их службе или нет? Если это так, то ваш выбор подписки на все должно работать нормально, в противном случае могут потребоваться более интеллектуальные настройки подписки.

Если вы используете REP Если процесс сокета находится в ваших сервисных процессах, то сервисный процесс должен принимать это сообщение и обрабатывать его асинхронно, никогда не сообщая брокеру какие-либо подробности об этом сообщении. Затем он должен ответить на каждое сообщение подтверждением (например, "ПОЛУЧЕНО"), чтобы оно следовало строгому шаблону приема / отправки / получения / отправки для REP Розетки.

Если вам нужен какой-либо другой тип связи о том, как служба обрабатывает это сообщение, отправленное обратно брокеру, REP больше не подходит тип сокета для ваших сервисных процессов, и DEALER может больше не быть правильным типом сокета для вашего брокера. Если вам нужна какая-то форма балансировки нагрузки для отправки в следующий открытый процесс обслуживания, вам необходимо использовать ROUTER/REQ и чтобы каждая служба указывала свою доступность, и чтобы брокер удерживал сообщение, пока следующий процесс службы не сообщит, что он доступен, отправив результаты обратно. Если вам нужен какой-то другой тип обработки сообщений, вам нужно будет указать, какая именно архитектура может быть предложена.

Очевидно, я перепутал несколько элементов:

  • Сокеты имеют один и тот же API, независимо от того, используете ли вы его в качестве сокета на стороне клиента (Socket.connect) или сокет на стороне сервера (Socket.bind)
  • Сокеты имеют одинаковый API независимо от типа (например, Socket.subscribe не должен вызываться на PUSH разъем)
  • Некоторые типы сокетов требуют петли ответа отправки / получения (например, REQ/REP)
  • Некоторые нюансы в шаблонах общения (PUSH/PULL против ROUTER/DEALER)
  • Сложность (невозможность?) В отладке настройки ZeroMQ

Так что большое спасибо Джейсону за его невероятно подробный ответ (и потрясающую диаграмму!), Который указал мне верное направление.

Я закончил со следующим дизайном:

  • Поток 1 брокера выполняет разветвление XSUB/XPUB прокси на bind(localhost:6000) а также bind(localhost:6001)
  • посредник потока 2 работает в очереди SUB/PUSH прокси на connect(localhost:6001) а также bind(localhost:6002); потоки брокера 3 и 4 используют аналогичную конструкцию с разными номерами портов привязки
  • производители сообщений подключаются к брокеру, используя PUB розетка на connect(localhost:6000)
  • потребители сообщений подключаются к посреднику очереди посредника с помощью PULL розетка на connect(localhost:6002)

Вдобавок к этому сервисно-ориентированному механизму очередей, я смог довольно просто добавить подобный сервисно-ориентированный механизм разветвления:

  • посредник потока запускает SUB/PUB прокси на connect(localhost:6001) а также bind(localhost:6003)
  • производители сообщений по-прежнему подключаются к брокеру, используя PUB розетка на connect(localhost:6000)
  • потребители сообщений подключаются к прокси-серверу посредника, используя SUB розетка на connect(localhost:6003)

Это была интересная поездка.

Другие вопросы по тегам