Как вызвать и обработать эти события Spring Kafka

В моем проекте Spring Boot, где у меня есть несколько потребителей Spring Kafka, я добавил несколько слушателей событий, чтобы следить за состоянием этих потребителей. Вот код:

@Component
public class ApplicationContextListeningService {

    @EventListener
    public void handleConsumerPausedEvent(ConsumerPausedEvent event) {
        LOGGER_ERROR.warn(WARNING_KAFKA_CONSUMERPAUSEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleConsumerResumedEvent(ConsumerResumedEvent event) {
        LOGGER_ERROR.warn(WARNING_KAFKA_CONSUMERRESUMEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleConsumerStoppedEvent(ConsumerStoppedEvent event) {
        LOGGER_ERROR.error(ERROR_KAFKA_CONSUMERSTOPPEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleListenerContainerIdleEvent(ListenerContainerIdleEvent event) {
        LOGGER_ERROR.error(ERROR_KAFKA_LISTENERCONTAINERIDLEEVENT + event.getListenerId() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleNonResponsiveConsumerEvent(NonResponsiveConsumerEvent event) {
        LOGGER_ERROR.error(ERROR_KAFKA_NONRESPONSIVECONSUMEREVENT + event.getListenerId() + LOG_MSG_DELIMITER + event.toString());
    }

}

Кто-нибудь знает, при каких обстоятельствах эти события будут выброшены (и, возможно, как я могу вручную вызвать эти события для целей тестирования)? А также для последних трех событий (ConsumerStoppedEvent, ListenerContainerIdleEvent и NonResponsiveConsumerEvent), когда я получаю одно из них, необходимо ли вмешательство человека для решения проблемы (например, перезапуск серверов для повторного создания потребителей)? Спасибо!

1 ответ

Решение

Вы можете эмулировать их все, введя фабрику потребителей Mock в контейнер.

  • ConsumerStoppedEvent испускается, когда вы stop() контейнер.
  • ListenerContainerIdleEvent просто означает, что в idleEventInterval так что обычно это не означает, что есть проблема.
  • NonResponsiveConsumerEvent - сложно сказать; со старшими клиентами poll() будет блокироваться, если сервер не работает, поэтому мы не можем генерировать события простоя (или делать что-либо).

Я не знаю, сможете ли вы получить их с более новыми клиентами; но чтобы смоделировать это вам просто нужно заблокировать в макете потребителя poll() Метод достаточно длинный, чтобы задача монитора могла обнаружить проблему и выдать событие.

Другие вопросы по тегам