Как вызвать и обработать эти события Spring Kafka
В моем проекте Spring Boot, где у меня есть несколько потребителей Spring Kafka, я добавил несколько слушателей событий, чтобы следить за состоянием этих потребителей. Вот код:
@Component
public class ApplicationContextListeningService {
@EventListener
public void handleConsumerPausedEvent(ConsumerPausedEvent event) {
LOGGER_ERROR.warn(WARNING_KAFKA_CONSUMERPAUSEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
}
@EventListener
public void handleConsumerResumedEvent(ConsumerResumedEvent event) {
LOGGER_ERROR.warn(WARNING_KAFKA_CONSUMERRESUMEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
}
@EventListener
public void handleConsumerStoppedEvent(ConsumerStoppedEvent event) {
LOGGER_ERROR.error(ERROR_KAFKA_CONSUMERSTOPPEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
}
@EventListener
public void handleListenerContainerIdleEvent(ListenerContainerIdleEvent event) {
LOGGER_ERROR.error(ERROR_KAFKA_LISTENERCONTAINERIDLEEVENT + event.getListenerId() + LOG_MSG_DELIMITER + event.toString());
}
@EventListener
public void handleNonResponsiveConsumerEvent(NonResponsiveConsumerEvent event) {
LOGGER_ERROR.error(ERROR_KAFKA_NONRESPONSIVECONSUMEREVENT + event.getListenerId() + LOG_MSG_DELIMITER + event.toString());
}
}
Кто-нибудь знает, при каких обстоятельствах эти события будут выброшены (и, возможно, как я могу вручную вызвать эти события для целей тестирования)? А также для последних трех событий (ConsumerStoppedEvent, ListenerContainerIdleEvent и NonResponsiveConsumerEvent), когда я получаю одно из них, необходимо ли вмешательство человека для решения проблемы (например, перезапуск серверов для повторного создания потребителей)? Спасибо!
1 ответ
Вы можете эмулировать их все, введя фабрику потребителей Mock в контейнер.
ConsumerStoppedEvent
испускается, когда выstop()
контейнер.ListenerContainerIdleEvent
просто означает, что вidleEventInterval
так что обычно это не означает, что есть проблема.NonResponsiveConsumerEvent
- сложно сказать; со старшими клиентамиpoll()
будет блокироваться, если сервер не работает, поэтому мы не можем генерировать события простоя (или делать что-либо).
Я не знаю, сможете ли вы получить их с более новыми клиентами; но чтобы смоделировать это вам просто нужно заблокировать в макете потребителя poll()
Метод достаточно длинный, чтобы задача монитора могла обнаружить проблему и выдать событие.