Сбой соединения kafka с s3 с ошибкой Неожиданное исключение в потоке [KafkaBasedLog Work Thread -
Я установил на EC2 сливной (4.0) разъем, который читает с кафки и пишет на S3.
Автономная попытка идет хорошо:
bin / connect-standalone etc / standalone / example-connect-worker.properties etc / standalone / example-connect-s3-sink.properties
Однако распределенная версия продолжает сбой с
[2018-01-30 21:26:05,860] ERROR Unexpected exception in Thread[KafkaBasedLog Work Thread - connect-configs,5,main] (org.apache.kafka.connect.util.KafkaBasedLog:334)
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1097)
at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:256)
at org.apache.kafka.connect.util.KafkaBasedLog.access$500(KafkaBasedLog.java:69)
at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:327)
Я просто хотел сначала использовать класс соединителя, равный FileStreamSinkConnector
Приемник conf файлов выглядит так:
name=local-file-sink
#connector.class=FileStreamSink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=test.sink.txt
topics=tests3
s3.bucket=tests3
s3.prefix=tests3
s3.endpoint=http://localhost:9090
s3.path_style=true
local.buffer.dir=/tmp/connect-system-test
Большое спасибо!
1 ответ
Когда вы запускаете распределенный рабочий Connect с ./bin/connect-distributed
Вы можете указывать свойства работника только через командную строку.
Чтобы загрузить соединитель, разместив его конфигурацию на конечной точке REST рабочего, вы можете использовать curl
или эквивалентная команда.
Например:
curl -X POST -H "Content-Type: application/json" --data @config.json http://localhost:8083/connectors
где config.json
это файл, содержащий свойства вашего коннектора
Более подробная информация здесь: https://docs.confluent.io/current/connect/managing.html