Flume за подключение отказался от кафки брокера

Я пытаюсь отправить сообщения из Apache Flume в мой экземпляр Apache Kafka. Делая все это локально через localhost, у меня нет проблем. Когда я пытаюсь сделать это с разных виртуальных машин на моем компьютере, я получаю отказ в соединении в журнале отладки Flume. Для примера, я просто отправляю сообщения через telnet в мой экземпляр Flume.

172.16.26.1 - это IP моего mac, который видят виртуальные машины

172.16.26.138 - это IP-адрес виртуальной машины, на которой работает Kafka/Zookeeper.

172.16.26.139 - IP-адрес виртуальной машины, на которой работает Flume

Конфигурация Flume

    #Name the components of this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    #Describe/configure the source
    a1.sources.r1.type = netcat 
    a1.sources.r1.port = 44444
    a1.sources.r1.bind = 0.0.0.0 
    a1.sources.r1.host = 172.16.26.1 

    #Describe the sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = macdemo
    a1.sinks.k1.kafka.bootstrap.servers = 172.16.26.138:9092
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.kafka.producer.linger.ms = 1
    a1.sinks.k1.kafka.producer.compression.type = snappy

    #Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    #Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

Конфигурация Kafka (ключевые части, все остальное по умолчанию)

    broker.id=0
    listeners=PLAINTEXT://localhost:9092
    zookeeper.connect=localhost:2181

Вот конфиг, распечатанный из Flume при запуске агента

    compression.type = snappy
    metric.reporters = []
    metadata.max.age.ms = 300000
    metadata.fetch.timeout.ms = 60000
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [172.16.26.138:9092]
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    buffer.memory = 33554432
    timeout.ms = 30000
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    block.on.buffer.full = false
    ssl.key.password = null
    max.block.ms = 60000
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.truststore.password = null
    max.in.flight.requests.per.connection = 5
    metrics.num.samples = 2
    client.id = 
    ssl.endpoint.identification.algorithm = null
    ssl.protocol = TLS
    request.timeout.ms = 30000
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    acks = 1
    batch.size = 16384
    ssl.keystore.location = null
    receive.buffer.bytes = 32768
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = PLAINTEXT
    retries = 0
    max.request.size = 1048576
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    ssl.truststore.location = null
    ssl.keystore.password = null
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    send.buffer.bytes = 131072
    linger.ms = 1

Сценарии, которые работают

Flume/Kafka/Zookeeper все работает локально с Mac -> работает

Flume в виртуальной машине (на Mac), Kafka/Zookeeper запускается локально с Mac -> работает

Flume в виртуальной машине (на Mac), Kafka/Zookeeper в виртуальной машине (на Mac) -> соединение отказано

Вот ошибка вывода я получаю

    2018-02-26 14:23:47,009 (kafka-producer-network-thread | producer-1) [DEBUG - org.apache.kafka.common.network.Selector.poll(Selector.java:307)]            Connection with /172.16.26.138 disconnected
    java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:54)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:72)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:274)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
    at java.lang.Thread.run(Thread.java:748)
    2018-02-26 14:23:47,010 (kafka-producer-network-thread | producer-1) [DEBUG - org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:454)] Node -1 disconnected.
    2018-02-26 14:23:47,011 (kafka-producer-network-thread | producer-1) [DEBUG - org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:608)] Give up sending metadata request since no node is available

Я уверен, что есть кое-что супер простое, которое я пропускаю, но провел несколько дней без удачи от доктора Гугла. Что касается виртуальной машины, они работают на Centos 7, и у меня отключен firewalld.service вместе с SELinux.

1 ответ

Я обнаружил проблему с параметром listeners=PLAINTEXT://localhost:9092. Изменение localhost на фактический IP-адрес виртуальной машины (IP-адрес Кафки) сработало.

Другие вопросы по тегам