Как настроить публичного брокера Kafka с использованием динамического DNS?

Я настроил Kafka Cluster с 3 брокерами, используя 3 Zookeepers вместе с каждым брокером. На рисунке ниже представлено графическое представление моего кластера.

Тестирование производителя и потребителя в одной сети с использованием хоста 192.168.0.10 отлично работал через kafka-console-producer а также kafka-console-consumer команды.

Исходя из этого контекста, когда я пытаюсь произвести некоторые данные через kafka-console-producer.sh --broker-list DYNAMIC_DNS_ADDR:30192,DYNAMIC_DNS_ADDR:30292,DYNAMIC_DNS_ADDR:30392 --topic twitter_tweets через интернет я получаю следующую ошибку:

[2018-12-10 09: 59: 20,772] ОШИБКА Ошибка при отправке сообщения в тему twitter_tweets с ключом: null, значением: 16 байт с ошибкой: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) org.apache.kafka.common.errors.TimeoutException: Истекающие 1 запись (и) для twitter_tweets-1: 1505 мс прошло с момента создания пакета плюс время ожидания [2018-12-10 09:59:22,273] WARN [Producer clientId=console-produser] Соединение с узлом 1 не может быть установлено. Брокер может быть недоступен. (Org.apache.kafka.clients.NetworkClient)

Слушатели брокера настраиваются со следующими свойствами:

listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9443
advertised.listeners=PLAINTEXT://192.168.0.241:9092,SSL://192.168.0.241:9443

Очевидно, что IP-адрес изменился в каждом брокере для advertised.listeners имущество. я использую CentOS 6.10 а также Kafka 2.0.1 для этой установки. Тест Telnet сработал. Еще один перенаправленный на порт Kafka REST Proxy работает через Интернет и перечисляет все темы.

1 ответ

Решение

Смотрите https://rmoff.net/2018/08/02/kafka-listeners-explained/

Вам нужны два слушателя - один отвечает и рекламирует внутренние адреса, другой - внешний.

Главное, чтобы прослушиватель, к которому подключается ваш клиент, возвращал адрес хоста и порт этого прослушивателя.

В данный момент вы переключаете внешний на внутренний, и ваш внешний трафик, таким образом, попадает во внутренний слушатель.

Вам нужно что-то вроде этого (изменение IP/ имени хоста aws_internal_listener по требованию брокера):

KAFKA_LISTENERS: aws_internal_listener://192.168.0.241:9092,external_listener://192.168.0.241:29092

KAFKA_ADVERTISED_LISTENERS: aws_internal_listener://192.168.0.241:9092,external_listener://DYNAMIC_DNS_ADDR:29092

KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: aws_internal_listener:PLAINTEXT,external_listener:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: aws_internal_listener

Тогда ваш портовый экспедитор для DYNAMIC_DNS_ADDR следует перенаправить соединения на 29092 на узле AWS. Ключевым моментом является то, что внешние соединения не должны заканчиваться портом слушателя на хосте, соответствующем внутреннему слушателю (который объявляет внутренний 192.168.0 адрес)

использование kafkacat -L -b DYNAMIC_DNS_ADDR:29092 для отладки и проверки вашего конфига, как описано в статье здесь.

Другие вопросы по тегам