java-nats-streaming: публикация сообщений после переподключения сервера
У меня есть потоковый кластер NATS с 3 узлами. Кажется, что сообщения NATS, публикуемые моим java-приложением во время простоя сервера, теряются (то есть не публикуются повторно, когда мои серверы снова работают и работают).
Более подробное описание:
- NATS кластер онлайн. Приложения издателя и подписчика приходят в онлайн. Издатель начинает публиковать сообщения каждую секунду. Подписчик получает сообщения.
- Серверы NATS отключены. Издатель продолжает публиковать сообщения (назовем эти сообщения "автономными сообщениями"). Абонент перестает получать что-либо
- Серверы NATS возвращаются в онлайн. Подписчик снова начинает получать сообщения, но "автономные сообщения" не принимаются.
Мои приложения издателя и подписчика настроены на попытку переподключения к серверу NATS и не имеют времени ожидания. Я не получаю никаких исключений во всем.
NATS соединение:
Options options = new Options.Builder().servers(serverList).maxReconnects(-1).build();
Connection nc = Nats.connect(options);
StreamingConnectionFactory cf = new StreamingConnectionFactory(natsProperties.getClusterId(), natsProperties.getClientId());
cf.setNatsConnection(nc);
streamingConnection = cf.createConnection();
Издательство:
// subject and message String variables are passed in
streamingConnection.publish(subject, message.getBytes());
Абонент:
streamingConnection.subscribe(subject, new MessageHandler() {
public void onMessage(Message m) {
System.out.prinf("Received msg: %s\n", m.getData())
}
}, new SubscriptionOptions.Builder().durableName(durableName).build());
Судя по документам, клиент Java NATS имеет встроенный буфер переподключения. Я попытался увеличить буфер в 10 раз, но безрезультатно (также мои сообщения состоят только из 2-значных чисел). Как мне заставить его отправить эти "автономные сообщения"?
0 ответов
У меня та же проблема, единственное решение, которое я вижу, что другой метод подписки занят, сохранить последовательность сообщений, но это, я не думаю, является лучшим
// Receive messages starting at a specific sequence number
sc.subscribe("foo", new MessageHandler() {
public void onMessage(Message m) {
logger.info("Sequence message " + m.getSequence());
System.out.printf("Received a message: %s\n", m.getData());
}
}, new SubscriptionOptions.Builder().startAtSequence(22).build());