Как настроить публичного брокера Kafka с использованием динамического DNS?
Я настроил Kafka Cluster с 3 брокерами, используя 3 Zookeepers вместе с каждым брокером. На рисунке ниже представлено графическое представление моего кластера.
Тестирование производителя и потребителя в одной сети с использованием хоста 192.168.0.10
отлично работал через kafka-console-producer
а также kafka-console-consumer
команды.
Исходя из этого контекста, когда я пытаюсь произвести некоторые данные через kafka-console-producer.sh --broker-list DYNAMIC_DNS_ADDR:30192,DYNAMIC_DNS_ADDR:30292,DYNAMIC_DNS_ADDR:30392 --topic twitter_tweets
через интернет я получаю следующую ошибку:
[2018-12-10 09: 59: 20,772] ОШИБКА Ошибка при отправке сообщения в тему twitter_tweets с ключом: null, значением: 16 байт с ошибкой: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) org.apache.kafka.common.errors.TimeoutException: Истекающие 1 запись (и) для twitter_tweets-1: 1505 мс прошло с момента создания пакета плюс время ожидания [2018-12-10 09:59:22,273] WARN [Producer clientId=console-produser] Соединение с узлом 1 не может быть установлено. Брокер может быть недоступен. (Org.apache.kafka.clients.NetworkClient)
Слушатели брокера настраиваются со следующими свойствами:
listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9443
advertised.listeners=PLAINTEXT://192.168.0.241:9092,SSL://192.168.0.241:9443
Очевидно, что IP-адрес изменился в каждом брокере для advertised.listeners
имущество. я использую CentOS 6.10
а также Kafka 2.0.1
для этой установки. Тест Telnet сработал. Еще один перенаправленный на порт Kafka REST Proxy работает через Интернет и перечисляет все темы.
1 ответ
Смотрите https://rmoff.net/2018/08/02/kafka-listeners-explained/
Вам нужны два слушателя - один отвечает и рекламирует внутренние адреса, другой - внешний.
Главное, чтобы прослушиватель, к которому подключается ваш клиент, возвращал адрес хоста и порт этого прослушивателя.
В данный момент вы переключаете внешний на внутренний, и ваш внешний трафик, таким образом, попадает во внутренний слушатель.
Вам нужно что-то вроде этого (изменение IP/ имени хоста aws_internal_listener
по требованию брокера):
KAFKA_LISTENERS: aws_internal_listener://192.168.0.241:9092,external_listener://192.168.0.241:29092
KAFKA_ADVERTISED_LISTENERS: aws_internal_listener://192.168.0.241:9092,external_listener://DYNAMIC_DNS_ADDR:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: aws_internal_listener:PLAINTEXT,external_listener:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: aws_internal_listener
Тогда ваш портовый экспедитор для DYNAMIC_DNS_ADDR
следует перенаправить соединения на 29092 на узле AWS. Ключевым моментом является то, что внешние соединения не должны заканчиваться портом слушателя на хосте, соответствующем внутреннему слушателю (который объявляет внутренний 192.168.0
адрес)
использование kafkacat -L -b DYNAMIC_DNS_ADDR:29092
для отладки и проверки вашего конфига, как описано в статье здесь.