java-nats-streaming: публикация сообщений после переподключения сервера

У меня есть потоковый кластер NATS с 3 узлами. Кажется, что сообщения NATS, публикуемые моим java-приложением во время простоя сервера, теряются (то есть не публикуются повторно, когда мои серверы снова работают и работают).

Более подробное описание:

  1. NATS кластер онлайн. Приложения издателя и подписчика приходят в онлайн. Издатель начинает публиковать сообщения каждую секунду. Подписчик получает сообщения.
  2. Серверы NATS отключены. Издатель продолжает публиковать сообщения (назовем эти сообщения "автономными сообщениями"). Абонент перестает получать что-либо
  3. Серверы NATS возвращаются в онлайн. Подписчик снова начинает получать сообщения, но "автономные сообщения" не принимаются.

Мои приложения издателя и подписчика настроены на попытку переподключения к серверу NATS и не имеют времени ожидания. Я не получаю никаких исключений во всем.

NATS соединение:

Options options = new Options.Builder().servers(serverList).maxReconnects(-1).build();

Connection nc = Nats.connect(options);

StreamingConnectionFactory cf = new StreamingConnectionFactory(natsProperties.getClusterId(), natsProperties.getClientId());
cf.setNatsConnection(nc);
streamingConnection = cf.createConnection();

Издательство:

// subject and message String variables are passed in
streamingConnection.publish(subject, message.getBytes());

Абонент:

streamingConnection.subscribe(subject, new MessageHandler() {
    public void onMessage(Message m) {
        System.out.prinf("Received msg: %s\n", m.getData())
    }
},  new SubscriptionOptions.Builder().durableName(durableName).build());

Судя по документам, клиент Java NATS имеет встроенный буфер переподключения. Я попытался увеличить буфер в 10 раз, но безрезультатно (также мои сообщения состоят только из 2-значных чисел). Как мне заставить его отправить эти "автономные сообщения"?

0 ответов

У меня та же проблема, единственное решение, которое я вижу, что другой метод подписки занят, сохранить последовательность сообщений, но это, я не думаю, является лучшим

   // Receive messages starting at a specific sequence number
   sc.subscribe("foo", new MessageHandler() {
   public void onMessage(Message m) {
     logger.info("Sequence message " +  m.getSequence());
     System.out.printf("Received a message: %s\n", m.getData());
   }
   }, new SubscriptionOptions.Builder().startAtSequence(22).build());
Другие вопросы по тегам