Упаковка пользовательского плагина Java `partitioner.class` для Kafka Connect в Confluent 4.1 + Kafka 1.1?

Я успешно использовал простой пользовательский класс Partitioner, написанный на Java, для приемника Kafka Connect на Confluent 3.2.x (Kafka 0.10.x). Я хочу выполнить обновление до Confluent 4.1 (Kafka 1.1) и у меня возникают ошибки.

Механизм загрузки плагинов в Kafka Connect изменился в CP 3.3.0. Раньше была только опция CLASSPATH, но с CP 3.3.0+ есть более новая и рекомендуемая plugin.path механизм.

Если я пытаюсь продолжать использовать устаревший механизм плагинов CLASSPATH, когда я пытаюсь использовать мой плагин, я получаю:

java.lang.ClassNotFoundException: io.confluent.connect.storage.partitioner.DefaultPartitioner

Это внутренний класс CP. С более старым CP 3.2.x, который был доступен на classpath, однако с новыми усилиями по изоляции classpath в CP >= 3.3.0, я предполагаю, что это должно быть предоставлено вместе с плагином.

Я полагаю, что было бы разумно перейти на более новый рекомендуется plugin.path механизм. Я удаляю запись CLASSPATH. По умолчанию /etc/kafka/connect-distributed.properties, Я вижу plugin.path=/usr/share/java, поэтому я устанавливаю свой плагин.jar для /usr/share/java/my-custom-partitioner/my-custom-partitioner.jar, Я попытался добавить и не добавлять файлы.jar зависимости там же.

Мой плагин загружается при запуске службы Kafka Connect:

INFO Loading plugin from: /usr/share/java/my-custom-partitioner (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:194)
INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/java/my-custom-partitioner/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:217)

Когда я делаю:

curl -X PUT -H "Content-Type: application/json" --data-binary "@sink_test_1.json" my-dev-test-vm:8083/connectors/sink-test-1/config

Я получил:

{"error_code":500,"message":null}%             

Я вижу в журнале kafka connect systemd:

java.lang.NullPointerException
at io.confluent.connect.storage.partitioner.PartitionerConfig.classNameEquals(PartitionerConfig.java:270)
at io.confluent.connect.storage.partitioner.PartitionerConfig.access$000(PartitionerConfig.java:33)
at io.confluent.connect.storage.partitioner.PartitionerConfig$PartitionerClassDependentsRecommender.visible(PartitionerConfig.java:238)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:617)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:625)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:525)
at org.apache.kafka.common.config.ConfigDef.validateAll(ConfigDef.java:508)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:490)
at org.apache.kafka.connect.connector.Connector.validate(Connector.java:133)

Непонятно, что происходит неправильно или почему мой класс разделителя загружается неправильно.

К вашему сведению, я перестроил свой плагин Java с зависимостями CP 4.1 + Kafka 1.1 и сделал небольшие обновления, чтобы соответствовать изменениям API, таким как добавление реализации для getSchemaGeneratorClass в мой класс разделителя.

1 ответ

Решение

Пользовательские классы Kafka Connect Partitioner не будут работать через старый механизм CLASSPATH и не будут работать как плагины с более новым механизмом изолированных плагинов Kafka 0.11.0+.

Единственное рабочее решение - скопировать ваш пользовательский файл.jar с вашим классом Kafka Connect Partitioner в kafka-connect-storage-common каталог плагинов в /usr/share/java/kafka-connect-storage-common/, Пользовательские классы плагинов Kafka Connect Partitioner должны существовать в том же каталоге, чтобы они находились в одном изолированном загрузчике классов.

К вашему сведению, вы можете видеть, что механизм изолированного плагина Kafka 0.11.0+ будет загружать только подклассы четырех конкретных классов Java, которые не охватывают разделители Kafka Connect здесь:

https://github.com/apache/kafka/blob/fdcf75ea326b8e07d8d7c4f5cc14f1e70451bcd4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java#L279

Спасибо cricket_007 за то, что порекомендовали это точное решение: поместите пользовательские файлы.jar разделителя Kafka Connect в /share/java/kafka-storage-common каталог. Я на собственном опыте узнал, почему это нужно делать и почему альтернативы не работают.

Другие вопросы по тегам