Redis передает одно сообщение каждому потребителю с помощью Java

Я пытаюсь реализовать Java-приложение с потоками Redis, где каждый потребитель потребляет ровно одно сообщение. Как конвейер / очередь, где каждый потребитель берет ровно одно сообщение, обрабатывает его и после завершения принимает следующее сообщение, которое до сих пор не было обработано в потоке. Что работает, так это то, что каждое сообщение используется ровно одним потребителем (с xreadgroup).

Я начал с этого урока из redislabs

Код:

    RedisClient redisClient = RedisClient.create("redis://pw@host:port");
    StatefulRedisConnection<String, String> connection = redisClient.connect();
    RedisCommands<String, String> syncCommands = connection.sync();

    try {
        syncCommands.xgroupCreate(XReadArgs.StreamOffset.from(STREAM_KEY, "0-0"), ID_READ_GROUP);
    } catch (RedisBusyException redisBusyException) {
        System.out.println(String.format("\t Group '%s' already exists", ID_READ_GROUP));
    }

    System.out.println("Waiting for new messages ");

    while (true) {
        List<StreamMessage<String, String>> messages = syncCommands.xreadgroup(
                Consumer.from(ID_READ_GROUP, ID_WORKER), ReadArgs.StreamOffset.lastConsumed(STREAM_KEY));

        if (!messages.isEmpty()) {
            System.out.println(messages.size()); // 
            for (StreamMessage<String, String> message : messages) {
                System.out.println(message.getId());
                Thread.sleep(5000);
                syncCommands.xack(STREAM_KEY, ID_READ_GROUP, message.getId());
            }
        }

    }

Моя текущая проблема заключается в том, что потребитель берет более одного сообщения из очереди, а в некоторых ситуациях другие потребители ждут, и один потребитель обрабатывает 10 сообщений одновременно.

Заранее спасибо!

1 ответ

Решение

Обратите внимание, что XREADGROUP может получитьCOUNT аргумент.

См. JavaDoc, как это сделать в Lettuce xreadgroup, передав XReadArgs.

Другие вопросы по тегам