NATS постоянное сообщение Java Client
У кого-нибудь есть опыт использования потокового сервера NATS в сочетании с клиентом Java? В частности, я не могу понять, как получать сообщения с помощью Java-клиента, которые отправляются, когда подписчик находится в автономном режиме.
С помощью клиента Go я могу опубликовать сообщение, а затем добавить подписку для получения всех опубликованных сообщений. Это находится в документации NATS Streaming Getting Started и работает как рекламируется.
Опубликуйте несколько сообщений. Для каждой публикации вы должны получить результат.
$ cd $GOPATH/src/github.com/nats-io/go-nats-streaming/examples
go run stan-pub.go foo "msg one"
Published [foo] : 'msg one'
$ go run stan-pub.go foo "msg two"
Published [foo] : 'msg two'
$ go run stan-pub.go foo "msg three"
Published [foo] : 'msg three'
Запустите клиент подписчика. Используйте флаг --all для получения всех опубликованных сообщений.
$ go run stan-sub.go --all -c test-cluster -id myID foo
Connected to nats://localhost:4222 clusterID: [test-cluster] clientID: [myID]
subscribing with DeliverAllAvailable
Listening on [foo], clientID=[myID], qgroup=[] durable=[]
[#1] Received on [foo]: 'sequence:1 subject:"foo" data:"msg one" timestamp:1465962202884478817 '
[#2] Received on [foo]: 'sequence:2 subject:"foo" data:"msg two" timestamp:1465962208545003897 '
[#3] Received on [foo]: 'sequence:3 subject:"foo" data:"msg three" timestamp:1465962215567601196
Я пытаюсь сделать это с помощью клиента NATS Java. Я не могу понять, просто ли я не нахожу аналогичные вызовы методов или эта функция не существует в клиенте Java.
Вот что я пробовал
import io.nats.client.Connection;
import io.nats.client.ConnectionFactory;
import io.nats.client.Constants;
import io.nats.client.Message;
import io.nats.client.SyncSubscription;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class NatsTest2 {
private static final SecureRandom random = new SecureRandom();
public static void main(String... args) throws Exception {
final ConnectionFactory factory = new ConnectionFactory(Constants.DEFAULT_URL);
try (final Connection conn = factory.createConnection()) {
// Simple Async Subscriber
final String expectMessage = "Yum, cookies " + System.currentTimeMillis();
works(conn, expectMessage);
broken(conn, expectMessage);
}
}
private static void works(Connection conn, String expectMessage) throws IOException, TimeoutException {
final String queue = Long.toString(random.nextLong());
System.out.print(queue + "=>");
try (final SyncSubscription subscription = conn.subscribeSync(queue)) {
conn.publish(queue, expectMessage.getBytes());
subscribe(subscription);
}
}
private static void broken(Connection conn, String expectMessage) throws IOException, TimeoutException {
final String queue = Long.toString(random.nextLong());
System.out.print(queue + "=>");
conn.publish(queue, expectMessage.getBytes());
try (final SyncSubscription subscription = conn.subscribeSync(queue)) {
subscribe(subscription);
}
}
private static void subscribe(SyncSubscription subscription) throws IOException, TimeoutException {
final Message message = subscription.nextMessage(1, TimeUnit.SECONDS);
System.out.println(new String(message.getData()));
}
}
Это дает вывод
-8522002637987832314=>Yum, cookies 1473462495040
-3024385525006291780=>Exception in thread "main" java.util.concurrent.TimeoutException: Channel timed out waiting for items
1 ответ
Если вы используете nats-streaming-server
, вам нужно использовать java-nats-streaming client. Функция, которую вы ищете (подписка на исторические сообщения), существует только в этом клиенте.
Несмотря на это, вот почему вы видели, что вы сделали с jnats
клиент:
nats-streaming-server
в настоящее время встраивает сервер NATS (gnatsd
) и, следовательно, разрешает стандартную функциональность NATS для обычных клиентов NATS, что вы и видите.
В вашем примере кода, works()
случается, потому что ваша подписка уже создана до публикации сообщения (другими словами, ваша try-with-resources
Блок гарантирует, что подписка уже активна, прежде чем что-либо еще происходит). Поэтому вы на самом деле не получаете сообщение, которое было опубликовано в прошлом; вы получаете сообщение, которое было опубликовано после начала подписки.
broken()
пример не работает, потому что он на самом деле публикует сообщение до того, как подписка создана, и сообщение отбрасывается сервером, потому что нет никакого интереса (пока).