Как непрерывно прослушивать поток Redis с помощью библиотеки java lettuce
Я пытаюсь прослушивать поток Redis и обрабатывать сообщения по мере их поступления. Я использую команду async и ожидаю, что сообщение будет отправлено, а не извлечено. Поэтому я не думаю, что нужен цикл while. Но следующий код, похоже, не работает.
public static void main(String[] args) throws InterruptedException {
RedisClient redisClient = RedisClient
.create("redis://localhost:6379/");
StatefulRedisConnection<String, String> connection
= redisClient.connect();
RedisAsyncCommands commands = connection.async();
commands.xgroupCreate(StreamOffset.latest("my-stream"), "G1", new XGroupCreateArgs());
commands
.xreadgroup(Consumer.from("G1", "c1"), StreamOffset.lastConsumed("my-stream"))
.thenAccept(System.out::println);
Thread.currentThread().join();
}
Он просто печатает все, что есть в потоке при запуске программы, и не печатает сообщения, которые добавляются при запуске программы. Разве обратный вызов не должен вызываться для каждого нового сообщения, добавляемого в поток?
4 ответа
Я знаю, что этот вопрос немного устарел, но ответ может быть полезен для кого-то другого. Вы можете неоднократно подписываться на одно и то жеFlux
как показано ниже, и это сработало для меня сxread
. Я думаю, что то же самое должно работать дляxreadgroup
также.
RedisPubSubReactiveCommands<String, String> commands = connection.reactive();
commands.xread(new XReadArgs().block(Duration.ofSeconds(20)), XReadArgs.StreamOffset.from("some-stream", "$"))
.doOnNext(msg -> {
sink.tryEmitNext(msg.getBody().get("key"));
})
.repeat()
.subscribe();
Я думаю, вы должны использовать метод xgroupCreate для создания ссылки между потребителем и группой, иначе вы получите ошибку.
exception in thread "main" java.util.concurrent.ExecutionException: io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'my-stream1' or consumer group 'group1' in XREADGROUP with GROUP option
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at com.test.TestList.main(TestList.java:57)
Caused by: io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'my-stream1' or consumer group 'group1' in XREADGROUP with GROUP option
at io.lettuce.core.ExceptionFactory.createExecutionException(ExceptionFactory.java:135)
at io.lettuce.core.ExceptionFactory.createExecutionException(ExceptionFactory.java:108)
at io.lettuce.core.protocol.AsyncCommand.completeResult(AsyncCommand.java:120)
at io.lettuce.core.protocol.AsyncCommand.complete(AsyncCommand.java:111)
at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:654)
at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:614)
at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:565)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
at io.netty.channel.kqueue.AbstractKQueueStreamChannel$KQueueStreamUnsafe.readReady(AbstractKQueueStreamChannel.java:544)
at io.netty.channel.kqueue.AbstractKQueueChannel$AbstractKQueueUnsafe.readReady(AbstractKQueueChannel.java:381)
at io.netty.channel.kqueue.KQueueEventLoop.processReady(KQueueEventLoop.java:211)
at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:289)
at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
пример кода следующий:
package com.test;
import io.lettuce.core.Consumer;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XGroupCreateArgs;
import io.lettuce.core.XReadArgs.StreamOffset;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import java.util.List;
public class TestList {
public static void main(String[] args) throws Exception {
RedisClient redisClient = RedisClient.create("redis://localhost:6379/");
StatefulRedisConnection<String, String> connection = redisClient.connect();
RedisAsyncCommands commands = connection.async();
RedisFuture<String> redisFuture = commands.xadd("my-stream1", "test", "1234");
String redisFutureGet = redisFuture.get();
System.out.println(redisFutureGet);
commands.xgroupCreate(StreamOffset.latest("my-stream1"), "group1", new XGroupCreateArgs()); // add a group pointing to the stream
RedisFuture<List<StreamMessage<String, String>>> messages = commands.xreadgroup(Consumer.from("group1", "my-stream1"),
StreamOffset.lastConsumed("my-stream1"));
List<StreamMessage<String, String>> res = messages.get();
System.out.println(res);
}
}
Я думаю, что Lettuce — это только ответ для связи с Redis, будь то синхронизация, асинхронность или потоковая передача, это низкоуровневая библиотека, поэтому, если вам нужна такая высокоуровневая функция, используя spinrg-данные, что-то вроде этого:
Для этого вы можете использовать реактивные команды Redis:
RedisReactiveCommands<String, String> commands = connection.reactive();
commands.xgroupCreate(StreamOffset.latest("my-stream"), "G1", new XGroupCreateArgs());
commands
.xreadgroup(Consumer.from("G1", "c1"), StreamOffset.lastConsumed("my-stream"))
.subscribe(System.out::println, Throwable::printStackTrace);