Упаковка пользовательского плагина Java `partitioner.class` для Kafka Connect в Confluent 4.1 + Kafka 1.1?
Я успешно использовал простой пользовательский класс Partitioner, написанный на Java, для приемника Kafka Connect на Confluent 3.2.x (Kafka 0.10.x). Я хочу выполнить обновление до Confluent 4.1 (Kafka 1.1) и у меня возникают ошибки.
Механизм загрузки плагинов в Kafka Connect изменился в CP 3.3.0. Раньше была только опция CLASSPATH, но с CP 3.3.0+ есть более новая и рекомендуемая plugin.path
механизм.
Если я пытаюсь продолжать использовать устаревший механизм плагинов CLASSPATH, когда я пытаюсь использовать мой плагин, я получаю:
java.lang.ClassNotFoundException: io.confluent.connect.storage.partitioner.DefaultPartitioner
Это внутренний класс CP. С более старым CP 3.2.x, который был доступен на classpath, однако с новыми усилиями по изоляции classpath в CP >= 3.3.0, я предполагаю, что это должно быть предоставлено вместе с плагином.
Я полагаю, что было бы разумно перейти на более новый рекомендуется plugin.path
механизм. Я удаляю запись CLASSPATH. По умолчанию /etc/kafka/connect-distributed.properties
, Я вижу plugin.path=/usr/share/java
, поэтому я устанавливаю свой плагин.jar для /usr/share/java/my-custom-partitioner/my-custom-partitioner.jar
, Я попытался добавить и не добавлять файлы.jar зависимости там же.
Мой плагин загружается при запуске службы Kafka Connect:
INFO Loading plugin from: /usr/share/java/my-custom-partitioner (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:194)
INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/java/my-custom-partitioner/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:217)
Когда я делаю:
curl -X PUT -H "Content-Type: application/json" --data-binary "@sink_test_1.json" my-dev-test-vm:8083/connectors/sink-test-1/config
Я получил:
{"error_code":500,"message":null}%
Я вижу в журнале kafka connect systemd:
java.lang.NullPointerException
at io.confluent.connect.storage.partitioner.PartitionerConfig.classNameEquals(PartitionerConfig.java:270)
at io.confluent.connect.storage.partitioner.PartitionerConfig.access$000(PartitionerConfig.java:33)
at io.confluent.connect.storage.partitioner.PartitionerConfig$PartitionerClassDependentsRecommender.visible(PartitionerConfig.java:238)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:617)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:625)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:525)
at org.apache.kafka.common.config.ConfigDef.validateAll(ConfigDef.java:508)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:490)
at org.apache.kafka.connect.connector.Connector.validate(Connector.java:133)
Непонятно, что происходит неправильно или почему мой класс разделителя загружается неправильно.
К вашему сведению, я перестроил свой плагин Java с зависимостями CP 4.1 + Kafka 1.1 и сделал небольшие обновления, чтобы соответствовать изменениям API, таким как добавление реализации для getSchemaGeneratorClass
в мой класс разделителя.
1 ответ
Пользовательские классы Kafka Connect Partitioner не будут работать через старый механизм CLASSPATH и не будут работать как плагины с более новым механизмом изолированных плагинов Kafka 0.11.0+.
Единственное рабочее решение - скопировать ваш пользовательский файл.jar с вашим классом Kafka Connect Partitioner в kafka-connect-storage-common
каталог плагинов в /usr/share/java/kafka-connect-storage-common/
, Пользовательские классы плагинов Kafka Connect Partitioner должны существовать в том же каталоге, чтобы они находились в одном изолированном загрузчике классов.
К вашему сведению, вы можете видеть, что механизм изолированного плагина Kafka 0.11.0+ будет загружать только подклассы четырех конкретных классов Java, которые не охватывают разделители Kafka Connect здесь:
Спасибо cricket_007 за то, что порекомендовали это точное решение: поместите пользовательские файлы.jar разделителя Kafka Connect в /share/java/kafka-storage-common
каталог. Я на собственном опыте узнал, почему это нужно делать и почему альтернативы не работают.