Коннектор Kafka Snowflake: org.apache.kafka.common.network.InvalidReceiveException: недопустимый прием

Журнал распределенного соединителя рабочего узла:

      [2021-11-23 09:05:22,605] WARN The configuration 'config.storage.topic' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2021-11-23 09:05:22,606] WARN The configuration 'rest.advertised.host.name' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2021-11-23 09:05:22,606] WARN The configuration 'status.storage.topic' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2021-11-23 09:05:22,606] WARN The configuration 'group.id' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2021-11-23 09:05:22,606] WARN The configuration 'rest.host.name' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2021-11-23 09:05:22,606] WARN The configuration 'rest.advertised.port' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2021-11-23 09:05:22,606] WARN The configuration 'plugin.path' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2021-11-23 09:05:22,606] WARN The configuration 'config.storage.replication.factor' was supplied but isn't a known config.    (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2021-11-23 09:05:22,606] WARN The configuration 'offset.flush.interval.ms' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2021-11-23 09:05:22,606] WARN The configuration 'rest.port' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2021-11-23 09:05:22,606] WARN The configuration 'key.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2021-11-23 09:05:22,606] WARN The configuration 'status.storage.replication.factor' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2021-11-23 09:05:22,606] WARN The configuration 'value.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2021-11-23 09:05:22,606] WARN The configuration 'offset.storage.replication.factor' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2021-11-23 09:05:22,606] WARN The configuration 'topic' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2021-11-23 09:05:22,606] WARN The configuration 'offset.storage.topic' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2021-11-23 09:05:22,607] WARN The configuration 'value.converter' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2021-11-23 09:05:22,607] WARN The configuration 'key.converter' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:362)
[2021-11-23 09:05:22,607] INFO Kafka version: 2.6.0 (org.apache.kafka.common.utils.AppInfoParser:117)
[2021-11-23 09:05:22,607] INFO Kafka commitId: 62abe01bee039651 (org.apache.kafka.common.utils.AppInfoParser:118)
[2021-11-23 09:05:22,607] INFO Kafka startTimeMs: 1637658322607 (org.apache.kafka.common.utils.AppInfoParser:119)
[2021-11-23 09:05:22,991] INFO Kafka cluster ID: zojXCfzxQum_fc3mC6WN_A (org.apache.kafka.connect.util.ConnectUtils:65)
[2021-11-23 09:05:23,008] INFO Logging initialized @10836ms to org.eclipse.jetty.util.log.Slf4jLog (org.eclipse.jetty.util.log:169)
[2021-11-23 09:05:23,076] INFO Added connector for http://**masternodename**:8083 (org.apache.kafka.connect.runtime.rest.RestServer:132)
[2021-11-23 09:05:23,076] INFO Initializing REST server (org.apache.kafka.connect.runtime.rest.RestServer:204)
[2021-11-23 09:05:23,083] INFO jetty-9.4.24.v20191120; built: 2019-11-20T21:37:49.771Z; git: 363d5f2df3a8a28de40604320230664b9c793c16; jvm 1.8.0_192-BellSoft-b12 (org.eclipse.jetty.server.Server:359)
[2021-11-23 09:05:23,120] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectDistributed:84)
org.apache.kafka.connect.errors.ConnectException: Unable to initialize REST server
    at   org.apache.kafka.connect.runtime.rest.RestServer.initializeServer(RestServer.java:216)
    at   org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:99)
    at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:78)
    Caused by: java.io.IOException: Failed to bind to MasterServerName/MasterIP:8083
    at   org.eclipse.jetty.server.ServerConnector.openAcceptChannel(ServerConnector.java:346)
    at org.eclipse.jetty.server.ServerConnector.open(ServerConnector.java:307)
    at org.eclipse.jetty.server.AbstractNetworkConnector.doStart(AbstractNetworkConnector.java:80)
        at org.eclipse.jetty.server.ServerConnector.doStart(ServerConnector.java:231)
        at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:72)
        at org.eclipse.jetty.server.Server.doStart(Server.java:385)
        at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:72)
        at org.apache.kafka.connect.runtime.rest.RestServer.initializeServer(RestServer.java:214)
    ... 2 more
      Caused by: java.net.BindException: Cannot assign requested address
        at sun.nio.ch.Net.bind0(Native Method)
        at sun.nio.ch.Net.bind(Net.java:433)
        at sun.nio.ch.Net.bind(Net.java:425)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
        at org.eclipse.jetty.server.ServerConnector.openAcceptChannel(ServerConnector.java:342)
    ... 9 more

Главный узел: Server.log:

      [2021-11-23 09:23:04,041] WARN [SocketServer brokerId=0] Unexpected error from    /**workernode-ip**; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = -720899)
        at   org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:103)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:447)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:397)
        at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
        at kafka.network.Processor.poll(SocketServer.scala:913)
        at kafka.network.Processor.run(SocketServer.scala:816)
        at java.lang.Thread.run(Thread.java:748)
    [2021-11-23 09:30:35,461] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

1 ответ

BindException означает, что у вас неправильно настроены некоторые сетевые свойства или что-то уже запущено на конфликтующих портах. Например - bootstrap.servers=...localhost:9092...а также rest.advertised.port=9092будет означать, что у вас уже есть брокер Kafka, работающий на порту 9092, и вы пытаетесь заставить Kafka Connect запустить HTTP-сервер на том же порту, что не сработает.

Что касается других проблем, которые я вижу

  1. server.properties>должно быть всегда 0.0.0.0для хоста/ip, а не имя хоста машины, если вы хотите, чтобы внешние клиенты были на этой машине.

  2. Если возможно, не запускайте Kafka Connect на брокерах, поэтому localhost:9092никогда не следует добавлять к bootstrap.serversиз connect-distriubuted.properties

  3. connect-distributed.properties> rest.advertised.portне должно быть 9092, так как это не брокер. По умолчанию 8083 нормально...

  4. Вы должны начать с одного брокера и одного работника Connect на разных хостах. Если у вас нет доступа к нескольким физическим машинам, проще всего использовать Docker-Compose, а не виртуальные машины.

Я подозреваю, что последние два являются вашей ошибкой, потому что Connect пытается использовать TCP-протокол Kafka сам по себе, поэтому «Неверный прием» относится к байтам в запросе/ответе. Затем, чтобы правильно настроить кластер Kafka и клиентов, затем listenersне должно быть просто именем хоста локальной машины процесса; Это то, что advertised.listenersна брокерах для

Другие вопросы по тегам