Apache Flink: коннектор Kafka в потоковом API Python, "Не удается загрузить класс пользователя"
Я испытываю новый потоковый API-интерфейс от Flink и пытаюсь запустить свой скрипт с ./flink-1.6.1/bin/pyflink-stream.sh examples/read_from_kafka.py
, Сценарий python довольно прост, я просто пытаюсь извлечь из существующей темы и отправить все на стандартный вывод (или файл *.out в каталоге журнала, где метод вывода по умолчанию генерирует данные).
import glob
import os
import sys
from java.util import Properties
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.streaming.api.collector.selector import OutputSelector
from org.apache.flink.api.common.serialization import SimpleStringSchema
directories=['/home/user/flink/flink-1.6.1/lib']
for directory in directories:
for jar in glob.glob(os.path.join(directory,'*.jar')):
sys.path.append(jar)
from org.apache.flink.streaming.connectors.kafka import FlinkKafkaConsumer09
props = Properties()
config = {"bootstrap_servers": "localhost:9092",
"group_id": "flink_test",
"topics": ["TopicCategory-TopicName"]}
props.setProperty("bootstrap.servers", config['bootstrap_servers'])
props.setProperty("group_id", config['group_id'])
props.setProperty("zookeeper.connect", "localhost:2181")
def main(factory):
consumer = FlinkKafkaConsumer09([config["topics"]], SimpleStringSchema(), props)
env = factory.get_execution_environment()
env.add_java_source(consumer) \
.output()
env.execute()
Я взял несколько файлов JAR из репозитория Maven, а именно flink-connector-kafka-0.9_2.11-1.6.1.jar
, flink-connector-kafka-base_2.11-1.6.1.jar
а также kafka-clients-0.9.0.1.jar
и скопировал их в Флинк lib
каталог. Если я не неправильно понял документацию, этого должно хватить, чтобы Flink загрузил разъем kafka. Действительно, если я удалю любую из этих банок, импорт не удастся, но этого, по-видимому, недостаточно для фактического вызова плана. Добавление цикла для динамического добавления этих sys.path
тоже не работал Вот что печатается в консоли:
Starting execution of program
Failed to run plan: null
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/tmp/flink_streaming_plan_9cfed4d9-0288-429c-99ac-df02c86922ec/read_from_kafka.py", line 32, in main
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
at org.apache.flink.streaming.python.api.environment.PythonStreamExecutionEnvironment.execute(PythonStreamExecutionEnvironment.java:245)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: bbcc0cb2c4fe6e3012d228b06b270eba)
The program didn't contain a Flink job. Perhaps you forgot to call execute() on the execution environment.
Вот что я вижу в логах:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
ClassLoader info: URL ClassLoader:
file: '/tmp/blobStore-9f6930fa-f1cf-4851-a0bf-2e620391596f/job_ca486746e7feb42d2d162026b74e9935/blob_p-9321896d165fec27a617d44ad50e3ef09c3211d9-405ccc9b490fa1e1348f0a76b1a48887' (valid JAR)
Class not resolvable through given classloader.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:236)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:104)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Есть ли способ исправить это и сделать разъем доступным для Python? Я подозреваю, что это проблема Classloader с Jython, но я не знаю, как исследовать дальше (также учитывая, что я не знаю Java). Большое спасибо.
1 ответ
Вы используете неправильный потребитель Kafka здесь. В вашем коде это FlinkKafkaConsumer09
, но используемая вами библиотека flink-connector-kafka-0.11_2.11-1.6.1.jar
для чего FlinkKafkaConsumer011
, Попробуй заменить FlinkKafkaConsumer09
с этим FlinkKafkaConsumer011
или используйте файл lib flink-connector-kafka-0.9_2.11-1.6.1.jar
вместо текущего.
Я гость, файл jar может иметь встроенный импорт или зависимости, поэтому трех файлов jar недостаточно. Что касается того, как узнать зависимые отношения java jar, это то, что делает java maven. Вы можете обратиться за помощью на официальном сайте "Настройка сборки проекта". В моем случае я следую официальной настройке java-проекта, использую "from org.apache.flink.streaming.connectors.kafka import FlinkKafkaConsumer" и добавляю зависимость "org.apache.flink
flink-clients_2.11
1.8.0" в pom.xml, то теперь я могу выводить записи kafka на стандартный вывод с помощью Python API.