Как подключить очереди к ZeroMQ PUB/SUB
Учтите следующее:
- набор из 3 логических сервисов:
S1
,S2
а такжеS3
- два экземпляра каждого сервиса работают, поэтому у нас есть следующие процессы:
S1P1
,S1P2
,S2P1
,S2P2
,S3P1
,S3P2
-
ZeroMQ
брокер работает в одном процессе и доступен всем сервисным процессам
Логичный сервис, скажем так S1
публикует сообщение M1
что представляет интерес для логических сервисов S2
а также S3
, Только один процесс каждого логического сервиса должен получить M1
так скажем S2P1
а также S3P2
,
Я пробовал следующее, но безуспешно:
- посредник потока 1 работает
XSUB/XPUB
полномочие - посредник потока 2 работает
ROUTER/DEALER
прокси сROUTER
подключен кXPUB
сокет и подписался на все (для логикиS1
) - посредник потока 3 работает
ROUTER/DEALER
прокси сROUTER
подключен кXPUB
сокет и подписался на все (для логикиS2
) - посредник потока 4 работает
ROUTER/DEALER
прокси сROUTER
подключил к сокету XPUB и подписался на все (для логикиS3
) - каждый логический процесс обслуживания выполняется
REP
резьба сокета, подключенная к брокеруDEALER
разъем
Я понял, что XSUB/XPUB
прокси даст мне семантику публикации / подписки и что ROUTER/DEALER
Прокси будут вводить конкуренцию между REP
сокеты для сообщений, отправленных XSUB/XPUB
прокси.
Как я могу совместить ZeroMQ
сокеты для этого?
Update1
Я знаю, что "без успеха" не поможет, я пробовал разные конфигурации и получал разные ошибки. Последняя конфигурация, которую я попробовал, следующая:
(XSUB proxy=> XPUB) => (SUB copyLoop=> REQ) => (ROUTER proxy=> DEALER) => REP
CopyLoop выглядит так:
public void start() {
context = ZMQ.context(1);
subSocket = context.socket(ZMQ.SUB);
subSocket.connect(subSocketUrl);
subSocket.subscribe("".getBytes());
reqSocket = context.socket(ZMQ.REQ);
reqSocket.connect(reqSocketUrl);
while (!Thread.currentThread().isInterrupted()) {
final Message msg = receiveNextMessage();
resendMessage(msg);
}
}
private Message receiveNextMessage() {
final String header = subSocket.recvStr();
final String entity = subSocket.recvStr();
return new Message(header, entity);
}
private void resendMessage(Message msg) {
reqSocket.sendMore(msg.getKey());
reqSocket.send(msg.getData(), 0);
}
Исключение, которое я получаю, следующее:
java.lang.IllegalStateException: Cannot send another request
at zmq.Req.xsend(Req.java:51) ~[jeromq-0.3.4.jar:na]
at zmq.SocketBase.send(SocketBase.java:613) ~[jeromq-0.3.4.jar:na]
at org.zeromq.ZMQ$Socket.send(ZMQ.java:1206) ~[jeromq-0.3.4.jar:na]
at org.zeromq.ZMQ$Socket.sendMore(ZMQ.java:1189) ~[jeromq-0.3.4.jar:na]
at com.xyz.messaging.zeromq.SubReqProxyConnector.resendMessage(SubReqProxyConnector.java:47) ~[classes/:na]
at com.xyz.messaging.zeromq.SubReqProxyConnector.start(SubReqProxyConnector.java:35) ~[classes/:na]
Я использую JeroMQ 0.3.4, Oracle Java 8 JVM и Windows 7.
2 ответа
Вы, кажется, добавляете некоторую сложность с вашим ROUTER
подключение - вы должны иметь возможность делать все, что связано непосредственно с вашим издателем.
Ошибка, с которой вы сейчас сталкиваетесь, заключается в том, что REQ
сокеты имеют строгий порядок сортировки сообщений - вам не разрешено send()
дважды подряд, вы должны отправить / получить / отправить / получить / и т. д. (аналогично, REP
сокеты должны получать / отправлять / получать / отправлять / и т. д.). Исходя из того, как это выглядит, вы просто делаете отправку / отправку / отправку / и т. Д. REQ
сокет, никогда не получая ответ. Если вас не волнует ответ вашего сверстника, вы должны получить и отказаться от него или использовать DEALER
(или же ROUTER
, но DEALER
имеет больше смысла в вашей текущей диаграмме).
Я создал диаграмму того, как я смог бы выполнить эту архитектуру ниже - используя вашу базовую структуру процесса.
Broker T1 Broker T2 Broker T3 Broker T4
(PUB*)------>(*SUB)[--](DEALER*) -->(*SUB)[--](DEALER*) -->(*SUB)[--](DEALER*)
|_____________________||____| || | ||
|_____________________||_______________________||____| ||
|| || ||
========================|| ==================|| ===========||=
|| || || || || ||
|| || || || || ||
|| || || || || ||
(REP*) (REP*) (REP*) (REP*) (REP*) (REP*)
S1P1 S1P2 S2P1 S2P2 S3P1 S3P2
Итак, главное отличие в том, что я бросил (SUB copyLoop=> REQ)
шаг. Выбираете ли вы XPUB/XSUB
против PUB/SUB
зависит от вас, но я бы начал проще, если вы в настоящее время не хотите использовать дополнительные функции XPUB/XSUB
,
Очевидно, что эта диаграмма не имеет отношения к тому, как информация поступает к вашему брокеру, где вы в настоящее время показываете XSUB
сокет - это выходит за рамки предоставленной вами информации, возможно, вы уже можете успешно получать информацию в своем брокере, поэтому я не буду с этим сталкиваться.
Я предполагаю, что потоки вашего брокера, предназначенные для каждой службы, делают разумный выбор относительно того, отправлять сообщение их службе или нет? Если это так, то ваш выбор подписки на все должно работать нормально, в противном случае могут потребоваться более интеллектуальные настройки подписки.
Если вы используете REP
Если процесс сокета находится в ваших сервисных процессах, то сервисный процесс должен принимать это сообщение и обрабатывать его асинхронно, никогда не сообщая брокеру какие-либо подробности об этом сообщении. Затем он должен ответить на каждое сообщение подтверждением (например, "ПОЛУЧЕНО"), чтобы оно следовало строгому шаблону приема / отправки / получения / отправки для REP
Розетки.
Если вам нужен какой-либо другой тип связи о том, как служба обрабатывает это сообщение, отправленное обратно брокеру, REP
больше не подходит тип сокета для ваших сервисных процессов, и DEALER
может больше не быть правильным типом сокета для вашего брокера. Если вам нужна какая-то форма балансировки нагрузки для отправки в следующий открытый процесс обслуживания, вам необходимо использовать ROUTER/REQ
и чтобы каждая служба указывала свою доступность, и чтобы брокер удерживал сообщение, пока следующий процесс службы не сообщит, что он доступен, отправив результаты обратно. Если вам нужен какой-то другой тип обработки сообщений, вам нужно будет указать, какая именно архитектура может быть предложена.
Очевидно, я перепутал несколько элементов:
- Сокеты имеют один и тот же API, независимо от того, используете ли вы его в качестве сокета на стороне клиента (
Socket.connect
) или сокет на стороне сервера (Socket.bind
) - Сокеты имеют одинаковый API независимо от типа (например,
Socket.subscribe
не должен вызываться наPUSH
разъем) - Некоторые типы сокетов требуют петли ответа отправки / получения (например,
REQ/REP
) - Некоторые нюансы в шаблонах общения (
PUSH/PULL
противROUTER/DEALER
) - Сложность (невозможность?) В отладке настройки ZeroMQ
Так что большое спасибо Джейсону за его невероятно подробный ответ (и потрясающую диаграмму!), Который указал мне верное направление.
Я закончил со следующим дизайном:
- Поток 1 брокера выполняет разветвление
XSUB/XPUB
прокси наbind(localhost:6000)
а такжеbind(localhost:6001)
- посредник потока 2 работает в очереди
SUB/PUSH
прокси наconnect(localhost:6001)
а такжеbind(localhost:6002)
; потоки брокера 3 и 4 используют аналогичную конструкцию с разными номерами портов привязки - производители сообщений подключаются к брокеру, используя
PUB
розетка наconnect(localhost:6000)
- потребители сообщений подключаются к посреднику очереди посредника с помощью
PULL
розетка наconnect(localhost:6002)
Вдобавок к этому сервисно-ориентированному механизму очередей, я смог довольно просто добавить подобный сервисно-ориентированный механизм разветвления:
- посредник потока запускает
SUB/PUB
прокси наconnect(localhost:6001)
а такжеbind(localhost:6003)
- производители сообщений по-прежнему подключаются к брокеру, используя
PUB
розетка наconnect(localhost:6000)
- потребители сообщений подключаются к прокси-серверу посредника, используя
SUB
розетка наconnect(localhost:6003)
Это была интересная поездка.