Кафка в докере не работает

Я пытаюсь использовать wurstmeister\kafka-docker изображение с docker-compose, но у меня есть реальные проблемы с подключением всего.

Все сообщения или вопросы, которые я проверяю, похоже, не имеют никаких проблем, но я откровенно потерян. (И есть как минимум два вопроса в SO, которые пытаются решить проблему)

Я считаю, что проблема заключается в моем плохом понимании сетей docker, Итак, проблема:

Я могу потреблять и производить из того же контейнера kaf ka, но, когда я пытаюсь создать другой контейнер (или использовать свой ноутбук с клиентом Python), я получил несколько ошибок, связанных с advertised.host.name параметр (на изображении этот параметр KAFKA_ADVERTISED_HOST_NAME)

Я уже пытался установить эту переменную многими способами, но она просто не работает.

Поэтому я ищу автору ответ (то есть, как автоматически установить эти параметры и что это значит), как установить docker-compose.yml

Это мое:

zookeeper:
  image: wurstmeister/zookeeper
  ports:
    - "2181:2181"

kafka:
  image: wurstmeister/kafka
 # hostname: kafka
  ports:
    - "9092"
  links:
    - zookeeper:zk
  environment:
    KAFKA_ADVERTISED_HOST_NAME: "kafka"
    KAFKA_ADVERTISED_PORT: "9092"
    KAFKA_ZOOKEEPER_CONNECT: "zk:2181"

ОБНОВИТЬ

Следуя совету @dnephin, я изменил start-kafka.sh в следующих строках:

...
if [[ -z "$KAFKA_ADVERTISED_PORT" ]]; then
    export KAFKA_ADVERTISED_PORT=$(hostname -i)
fi
...

и удалить KAFKA_ADVERTISED_HOST_NAME: "kafka" от docker-compose.yml

Я начал контейнеры каноническим способом:

docker-compose up -d

Оба контейнера работают:

$ docker-compose ps
           Name                          Command               State                     Ports                    
-----------------------------------------------------------------------------------------------------------------
infraestructura_kafka_1       start-kafka.sh                   Up      0.0.0.0:32768->9092/tcp                    
infraestructura_zookeeper_1   /opt/zookeeper/bin/zkServe ...   Up      0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp 

После этого я сделал:

docker-compose logs

И все идет гладко.

Для проверки IP-адресов:

$ KAFKA_IP=$(docker inspect --format '{{ .NetworkSettings.IPAddress }}' infraestructura_kafka_1)                                                                                                            
$ echo $KAFKA_IP
172.17.0.4

and

$ ZK_IP=$(docker inspect --format '{{ .NetworkSettings.IPAddress }}' infraestructura_zookeeper_1)                                                                                                           
$ echo $ZK_IP 
172.17.0.3

Затем я выполняю в двух разных консолях:

Производитель:

$ docker run --rm --interactive wurstmeister/kafka /opt/kafka_2.11-0.9.0.1/bin/kafka-console-producer.sh --topic grillo --broker-list 171.17.0.4:9092  

Потребитель:

$ docker run --rm --interactive  wurstmeister/kafka /opt/kafka_2.11-0.9.0.1/bin/kafka-console-consumer.sh --topic grillo --from-beginning --zookeeper 172.17.0.3:2181 

Почти сразу предупреждения начинают летать по всему экрану:

[2016-03-11 00:39:17,010] WARN Fetching topic metadata with correlation id 0 for topics [Set(grillo)] from broker [BrokerEndPoint(1001,ba53d4fd7595,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:119)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-03-11 00:39:17,013] WARN [console-consumer-79688_9dd5f575d557-1457656747003-f1ed369d-leader-finder-thread], Failed to find leader for Set([grillo,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFin
derThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(grillo)] from broker [ArrayBuffer(BrokerEndPoint(1001,ba53d4fd7595,9092))] failed
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:119)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
        ... 3 more

и так далее

В консоли производителя я написал несколько предложений:

$ docker run --rm --interactive klustera/kafka /opt/kafka_2.11-0.9.0.1/bin/kafka-console-producer.sh --topic grillo --broker-list 171.17.0.4:9092                                                           
Hola
¿Cómo estáń?
¿Todo bien?

И через несколько мгновений я получил такой ответ:

[2016-03-11 00:39:28,955] ERROR Error when sending message to topic grillo with key: null, value: 4 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
[2016-03-11 00:40:28,956] ERROR Error when sending message to topic grillo with key: null, value: 16 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
[2016-03-11 00:41:28,956] ERROR Error when sending message to topic grillo with key: null, value: 12 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)

И в docker-compose logs

...
zookeeper_1 | 2016-03-11 00:39:07,072 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@651] - Got user-level KeeperException when processing sessionid:0x153631368b1000b type:create c
xid:0x2 zxid:0x47 txntype:-1 reqpath:n/a Error Path:/consumers Error:KeeperErrorCode = NodeExists for /consumers
zookeeper_1 | 2016-03-11 00:39:07,243 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@651] - Got user-level KeeperException when processing sessionid:0x153631368b1000b type:create c
xid:0x19 zxid:0x4b txntype:-1 reqpath:n/a Error Path:/consumers/console-consumer-79688/owners/grillo Error:KeeperErrorCode = NoNode for /consumers/console-consumer-79688/owners/grillo
zookeeper_1 | 2016-03-11 00:39:07,247 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@651] - Got user-level KeeperException when processing sessionid:0x153631368b1000b type:create $xid:0x1a zxid:0x4c txntype:-1 reqpath:n/a Error Path:/consumers/console-consumer-79688/owners Error:KeeperErrorCode = NoNode for /consumers/console-consumer-79688/owners
...

ОБНОВЛЕНИЕ 2

Я заставил это работать, по крайней мере, в docker-machine:

Сначала я определил переменную с именем docker-machine:

DOCKER_VM=kafka_test

Затем я изменяю docker-compose.yml следующее:

KAFKA_ADVERTISED_HOST_NAME: "${DOCKER_MACHINE_IP}"

Наконец, в среде docker-machineЯ выполняю:

DOCKER_MACHINE_IP=$(docker-machine ip $DOCKER_VM) docker-compose up -d

Но в ноутбуке (я имею в виду, без использования виртуальной машины, это не работает)

11 ответов

Мое решение этой проблемы немного отличается. Я настраиваю Кафку для рекламы на kafka хост и, потому что он выставлен на хост-машине на localhost:9092Я добавляю запись в /etc/hosts за kafka разрешить localhost, При этом доступ к Kafka возможен как из других контейнеров Docker, так и из localhost.

докер-compose.yml:

  my-web-service:
    build: ./my-web-service
    ports:
     - "8000:8000"
    links:
     - kafka
  kafka:
    image: "wurstmeister/kafka:0.10.2.0"
    ports:
     - "9092:9092"
    hostname: kafka
    links: 
     - zookeeper
    environment:
     - KAFKA_ADVERTISED_HOST_NAME=kafka
     - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
     - KAFKA_ADVERTISED_PORT=9092
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"

Обновленный файл hosts:

more /etc/hosts
127.0.0.1       localhost kafka

Для разработки приложения в localhost, есть решение в документации: "HOSTNAME_COMMAND"

kafka:
  image: wurstmeister/kafka
  ports:
    - 9092:9092
environment:
  KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"

Надеюсь, что это поможет другим...

Я считаю, что ценность, которую вы используете для KAFKA_ADVERTISED_HOST_NAME будет меняться в зависимости от того, как контейнер может быть достигнут.

Если вы пытаетесь подключиться из другого контейнера, используя kafka должно быть правильным (при условии, что вы используете это имя в качестве псевдонима ссылки).

Если вы пытаетесь подключиться с хоста, это имя не будет работать. Вам нужно будет использовать IP-адрес контейнера, который вы можете получить, используя docker inspect, Однако IP-адрес контейнера изменится, поэтому может быть лучше установить его изнутри контейнера, используя $(hostname -i) чтобы получить это.

Вот улучшенная версия ответа @radek1st.

links это старый способ докера, networks это текущий метод.

В любом случае, внесение каких-либо изменений в систему нехорошо и никогда не требуется. это также отчасти побеждает цель использования Docker.

version: '2.1'

networks:
  sb:
    driver: bridge

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    hostname: zookeeper
    networks:
     - sb
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    hostname: ${KAFKA_HOSTNAME:-kakfa}
    depends_on:
      - zookeeper
    networks:
     - sb
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_HOST_NAME: ${KAFKA_HOSTNAME:-kakfa}
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://${KAFKA_HOSTNAME:-kakfa}:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Затем я использую скрипт bash для начала. Это позволяет мне переопределить имя хоста Kafka для локальной разработки. ./startup.sh localhost

#!/bin/bash
echo KAFKA_HOSTNAME=${1:-kakfa} > .env
docker-compose up -d

Прочитайте больше -

Просто попробуйте следующее и используйте обнаружение службы, например, это.

zookeeper:
  image: wurstmeister/zookeeper
  ports:
    - "2181:2181"
kafka:
  build: .
  ports:
    - "9092:9092"
  links:
    - zookeeper:zk
  environment:
    KAFKA_ADVERTISED_HOST_NAME: 192.168.59.103
    KAFKA_ADVERTISED_PORT: 9092
    KAFKA_CREATE_TOPICS: "test:1:1"
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock

Или вы используете это:

zookeeper:
  image: wurstmeister/zookeeper
  ports: 
    - "2181"
kafka:
  build: .
  ports:
    - "9092"
  links: 
    - zookeeper:zk
  environment:
    KAFKA_ADVERTISED_HOST_NAME: 192.168.59.103
    DOCKER_HOST: 192.168.59.103:2375
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock

Не прямой ответ, но если кто-то пытается понять сеть докеров kafka wurstmeister\kafka-dockerАвтор написал замечательную вики-статью о сетевых подключениях .

В нем объясняются три основных требования для настройки сети Kafka с помощью docker-compose.

  1. Каждый брокер должен иметь возможность общаться с Zookeeper - для избрания лидера и т. Д.
  2. Каждый брокер должен иметь возможность общаться с любым другим брокером - для репликации и т. Д.
  3. Каждый Потребитель / Производитель должен иметь возможность общаться с каждым Брокером - для чтения / записи данных и т. Д.

Мало кто попался:

Поскольку на одном интерфейсе можно выполнить привязку к каждому уникальному порту только один раз, мы больше не можем публиковать порт брокера (9092). Вместо этого мы просто открываем порт.

порты:

  • «9092»

Начиная с Kafka 0.9.0 - появилась возможность указать несколько портов для прослушивания. Это сделано для облегчения поддержки нескольких протоколов (например, PLAINTEXT,SASL,SSL и т. Д.) И разделения внутреннего и внешнего трафика. С этим изменением host.name и port устарели в пользу слушателей. Advertised.host.name и Advertised.port устарели в пользу рекламируемых слушателей.

Лично у меня была проблема, потому что KAFKA_ADVERTISED_PORT: "9092" отсутствовал в среде Кафки.

 kafka:
    image : wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_PORT: "9092"
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper

В моем случае я забыл обновить docker-compose.yml environment конфигурация для

Предыдущий

          environment:
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka:29092,EXTERNAL://localhost:9092

Обновленные заменяет localhost с участием kafka:

          environment:
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka:29092,EXTERNAL://kafka:9092

полный:

      networks:
  kafka-net:
    driver: bridge

volumes:
  kafka:
  zookeeper_data:
  zookeeper_txns:


services:
  kafka:
    image: "bitnami/kafka:2.7.0"
    networks:
      - kafka-net
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_LISTENERS=INTERNAL://kafka:29092,EXTERNAL://kafka:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka:29092,EXTERNAL://kafka:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
      - KAFKA_CFG_NUM_PARTITIONS=10
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
      - ALLOW_PLAINTEXT_LISTENER=yes
      #       10MB max message size (vs 1MB default)
      - KAFKA_CFG_MESSAGE_MAX_BYTES=10485760
      - KAFKA_CFG_REPLICA_FETCH_MAX_BYTES=10485760
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=1
    depends_on:
      - zookeeper
  zookeeper:
    image: "bitnami/zookeeper:3.6.2"
    networks:
      - kafka-net
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

Это относится к тому, что кафка избавляется от следующего типа ошибки:

[2022-05-27 15:53:41,940] ОШИБКА Ошибка при создании эфемерного в /brokers/ids/1, узел уже существует, а владелец «72062330391560192» не соответствует текущему сеансу «72058027900010496» (kafka.zk.KafkaZkClient$CheckedEphemeral)

      [2022-05-27 15:53:41,944] ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
    at kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1904)
    at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1842)
    at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1809)
    at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:96)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:324)
    at kafka.Kafka$.main(Kafka.scala:109)
    at kafka.Kafka.main(Kafka.scala)

Вот что сработало для меня:

определить, какие тома удалить

      docker volume ls

удалить их

      docker volume rm beepboop_zookeeper_data
docker volume rm beepboop_zookeeper_txns

перезапустить кафку

      docker-compose restart kafka

Я решил эту проблему, используя код ниже:

zookeeper:
  image: wurstmeister/zookeeper
  ports:
    - "2181:2181"
kafka:
  image: wurstmeister/kafka
  ports:
    - "9092:9092"
  depends_on:
    - zookeeper
  environment:
    HOSTNAME_COMMAND: "ifconfig eth0 | grep 'inet addr' | awk '{ print $$2}' | awk -F: '{print $$2}''"
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock

Я просто обновил свой хост-файл и добавил

127.0.0.1 localhost kafkaserver

это работает нормально для меня, я использовал тот же образ докера на Windows 10

Другие вопросы по тегам