Подписчик Redis не может работать с издателем Redis

Теперь я использую Java для разработки Redis Pub/ Sub System и получил проблему. Я покажу вам детали:

Издатель здесь:

public class RedisMessagePublisher implements MessagePublisher {

public RedisMessagePublisher(StringRedisTemplate redisTemplate,ChannelTopic topic)
{
    this.redisTemplate = redisTemplate;
    this.topic = topic;
}

private StringRedisTemplate redisTemplate;

private ChannelTopic topic;

@Override
public void publish(String message) {
    redisTemplate.convertAndSend(topic.getTopic(), message);
    }
}

Издатель правильный и может работать правильно.

Тогда давайте перейдем к классу подписчиков:

public class RedisMessageSubscriber implements MessageListener {

//action inspect here
private Action2<Message, byte[]> action;

public void setAction(Action2<Message, byte[]> action) {
    logger.info("action set");
    this.action = action;
}

private static Logger logger = LogManager.getLogger(RedisMessageSubscriber.class);

@Override
public void onMessage(Message message, byte[] bytes) {

    logger.info("===> redis subscribe message in <===");

    if (action != null)
        action.call(message, bytes);
    else
        logger.info("===> action is null <===");
    }
}

В классе подписчика я использовал RxJava, чтобы внедрить Action, чтобы я мог использовать его намного проще.

Но вопрос здесь в том, что после того, как я опубликовал сообщение от издателя, я могу c, что сообщение может быть передано в метод onMessage, печать журнала не соответствует ожиданиям:

===> redis subscribe message in <===
===> action is null <===

Я ожидал, что когда я опубликую новое сообщение, его получит подписчик и запустит созданное мной действие.

Сервис, который я использовал для запуска издателя и подписчика ниже:

@RestController("redispubsubcontroller")
@RequestMapping(value = "/redis")
public class redispubsubcontroller {

@Autowired
private RedisMessagePublisher redisMessagePublisher;

@Autowired
private RedisMessageSubscriber redisMessageSubscriber;

private static Logger logger = LogManager.getLogger(redispubsubcontroller.class);

@RequestMapping(value = "/publisher", method = {RequestMethod.GET})
public ApiResponse getConfig(String message,HttpServletRequest request,
                                            HttpServletResponse response) {

    redisMessageSubscriber.setAction(new Action2<Message, byte[]>() {
        @Override
        public void call(Message message, byte[] bytes) {
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                String result = objectMapper.readValue(message.getBody(), String.class);
                logger.info("receive:"+result);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    });

    redisMessagePublisher.publish(message);

    return new ApiResponse("success","message sent");
    }
}

Исходя из вышеприведенного кода, вы можете c подписаться на тему и установить новое действие для подписчика:

 redisMessageSubscriber.setAction(new Action2<Message, byte[]>() {
    @Override
    public void call(Message message, byte[] bytes) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            String result = objectMapper.readValue(message.getBody(), String.class);
            logger.info("receive:"+result);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
});

Но я не знаю, почему после срабатывания издателя подписчик может получить сообщение, но, тем не менее, удерживать действие NULL, созданное мною действие не передается ему.

Кто-нибудь может помочь? Есть ли проблемы с этим механизмом?

==== =====EDIT

RedisMessageConfig код ниже:

@Configuration
public class RedisMessageConfig {

@Bean
ChannelTopic topic() {
    return new ChannelTopic("useraddresspubsub:queue");
}

@Bean
MessageListenerAdapter messageListener() {
    return new MessageListenerAdapter(new RedisMessageSubscriber());
}

@Autowired
private RedisConnectionFactory JedisConnectionFactory;

@Bean
RedisMessageListenerContainer redisContainer() {
    final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(JedisConnectionFactory);
    container.addMessageListener(messageListener(), topic());
    return container;
    }
}

==== ==== решаемые

Наконец, я решил эту идею для каждого члена парламента, немного изменив myredismessagesscriber на myredismessageconfig, потому что поток идет от redismessageconfig к redismessagesubscriber, поэтому в redismessageconfig мне нужно сначала ввести в него действие, а затем redismessageconfig создаст новое действие redismessage и удержит подписку. Код ниже:

@Component
public class MyRedisMessageConfig extends RedisMessageConfig {

private static Logger logger =LogManager.getLogger(MyRedisMessageConfig.class);

public MyRedisMessageConfig() {
    super.action = new Action2<Message, byte[]>() {
        @Override
        public void call(Message message, byte[] bytes) {
            String result = new String(message.getBody());
                logger.info("received:" + result);
            }
        };
    }
}

Снимок экрана ниже:

1 ответ

Решение

Это не так MessageListener предназначен для работы. Кроме того, вы создаете общее изменяемое состояние. Два одновременных вызова изменяют одновременно состояние RedisMessageSubscriber,

Я предполагаю, что у вас возникли проблемы с видимостью при установке action в одном потоке, а получение сообщений происходит в другом потоке.

Если вам требуется другое поведение в MessageListenerзатем создайте несколько слушателей, которые реализуют это поведение.

Другие вопросы по тегам