Подписчик Redis не может работать с издателем Redis
Теперь я использую Java для разработки Redis Pub/ Sub System и получил проблему. Я покажу вам детали:
Издатель здесь:
public class RedisMessagePublisher implements MessagePublisher {
public RedisMessagePublisher(StringRedisTemplate redisTemplate,ChannelTopic topic)
{
this.redisTemplate = redisTemplate;
this.topic = topic;
}
private StringRedisTemplate redisTemplate;
private ChannelTopic topic;
@Override
public void publish(String message) {
redisTemplate.convertAndSend(topic.getTopic(), message);
}
}
Издатель правильный и может работать правильно.
Тогда давайте перейдем к классу подписчиков:
public class RedisMessageSubscriber implements MessageListener {
//action inspect here
private Action2<Message, byte[]> action;
public void setAction(Action2<Message, byte[]> action) {
logger.info("action set");
this.action = action;
}
private static Logger logger = LogManager.getLogger(RedisMessageSubscriber.class);
@Override
public void onMessage(Message message, byte[] bytes) {
logger.info("===> redis subscribe message in <===");
if (action != null)
action.call(message, bytes);
else
logger.info("===> action is null <===");
}
}
В классе подписчика я использовал RxJava, чтобы внедрить Action, чтобы я мог использовать его намного проще.
Но вопрос здесь в том, что после того, как я опубликовал сообщение от издателя, я могу c, что сообщение может быть передано в метод onMessage, печать журнала не соответствует ожиданиям:
===> redis subscribe message in <===
===> action is null <===
Я ожидал, что когда я опубликую новое сообщение, его получит подписчик и запустит созданное мной действие.
Сервис, который я использовал для запуска издателя и подписчика ниже:
@RestController("redispubsubcontroller")
@RequestMapping(value = "/redis")
public class redispubsubcontroller {
@Autowired
private RedisMessagePublisher redisMessagePublisher;
@Autowired
private RedisMessageSubscriber redisMessageSubscriber;
private static Logger logger = LogManager.getLogger(redispubsubcontroller.class);
@RequestMapping(value = "/publisher", method = {RequestMethod.GET})
public ApiResponse getConfig(String message,HttpServletRequest request,
HttpServletResponse response) {
redisMessageSubscriber.setAction(new Action2<Message, byte[]>() {
@Override
public void call(Message message, byte[] bytes) {
ObjectMapper objectMapper = new ObjectMapper();
try {
String result = objectMapper.readValue(message.getBody(), String.class);
logger.info("receive:"+result);
} catch (IOException e) {
e.printStackTrace();
}
}
});
redisMessagePublisher.publish(message);
return new ApiResponse("success","message sent");
}
}
Исходя из вышеприведенного кода, вы можете c подписаться на тему и установить новое действие для подписчика:
redisMessageSubscriber.setAction(new Action2<Message, byte[]>() {
@Override
public void call(Message message, byte[] bytes) {
ObjectMapper objectMapper = new ObjectMapper();
try {
String result = objectMapper.readValue(message.getBody(), String.class);
logger.info("receive:"+result);
} catch (IOException e) {
e.printStackTrace();
}
}
});
Но я не знаю, почему после срабатывания издателя подписчик может получить сообщение, но, тем не менее, удерживать действие NULL, созданное мною действие не передается ему.
Кто-нибудь может помочь? Есть ли проблемы с этим механизмом?
==== =====EDIT
RedisMessageConfig код ниже:
@Configuration
public class RedisMessageConfig {
@Bean
ChannelTopic topic() {
return new ChannelTopic("useraddresspubsub:queue");
}
@Bean
MessageListenerAdapter messageListener() {
return new MessageListenerAdapter(new RedisMessageSubscriber());
}
@Autowired
private RedisConnectionFactory JedisConnectionFactory;
@Bean
RedisMessageListenerContainer redisContainer() {
final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(JedisConnectionFactory);
container.addMessageListener(messageListener(), topic());
return container;
}
}
==== ==== решаемые
Наконец, я решил эту идею для каждого члена парламента, немного изменив myredismessagesscriber на myredismessageconfig, потому что поток идет от redismessageconfig к redismessagesubscriber, поэтому в redismessageconfig мне нужно сначала ввести в него действие, а затем redismessageconfig создаст новое действие redismessage и удержит подписку. Код ниже:
@Component
public class MyRedisMessageConfig extends RedisMessageConfig {
private static Logger logger =LogManager.getLogger(MyRedisMessageConfig.class);
public MyRedisMessageConfig() {
super.action = new Action2<Message, byte[]>() {
@Override
public void call(Message message, byte[] bytes) {
String result = new String(message.getBody());
logger.info("received:" + result);
}
};
}
}
1 ответ
Это не так MessageListener
предназначен для работы. Кроме того, вы создаете общее изменяемое состояние. Два одновременных вызова изменяют одновременно состояние RedisMessageSubscriber
,
Я предполагаю, что у вас возникли проблемы с видимостью при установке action
в одном потоке, а получение сообщений происходит в другом потоке.
Если вам требуется другое поведение в MessageListener
затем создайте несколько слушателей, которые реализуют это поведение.