Как обрабатывать события Debezium от Kafka с помощью Pyspark?
Я следую этой инструкции https://debezium.io/docs/tutorial-for-0-2/. Мой CDC работает нормально для события MySQL (Создать, Обновить, Удалить).
Я пытаюсь получить это событие kafka из pyspark, используя python, мой код все еще не может получить это событие.
Ниже код:
os.environ["SPARK_HOME"] = "/usr/local/spark"
os.environ["PYSPARK_PYTHON"]="/home/azda/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"]="/home/azda/anaconda3/bin/python"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /usr/local/spark/jars/spark-streaming-kafka-0-8-assembly_2.10-2.2.0.jar pyspark-shell'
if __name__ == '__main__':
conf = SparkConf().setAppName("DBWatcher").setMaster("local[3]")
sc = SparkContext(conf=conf)
sc.setLogLevel("INFO")
ssc = StreamingContext(sc, 20)
#Already try use :2181
kafkaStream = KafkaUtils.createStream(ssc, '10.90.29.24:9092', 'spark-streaming', {'mysql-server-1.inventory.customers': 1})
print('contexts =================== {} {}')
lines = kafkaStream.map(lambda x: x[1])
lines.pprint()
ssc.start()
ssc.awaitTermination()
Из этого кода я получил следующую ошибку:
2018-11-14 16:22:39 ОШИБКА Исполнитель:91 - Исключение в задаче 0.0 на этапе 0.0 (TID 0) java.lang.AbstractMethodError at org.apache.spark.internal.Logging$class.initializeLogIfNeeded (Logging.scala:99) в org.apache.spark.streaming.kafka.KafkaReceiver.initializeLogIfNeeded (KafkaInputDStream.scala:68) в org.apache.spark.internal.Logging$class.log(Logging.scala:46) в org.apache.spark.streaming.kafka.KafkaReceiver.log(KafkaInputDStream.scala:68) в org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54) в org.apache.spark.streaming.kafka.KafkaReceiver.logInfo(KafkaInputDStream.scala:68) в org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:90) в org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor). ReceiverSupervisor. org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131) в org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scal:600) в org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:590) в org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2185) в org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2185) в org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) в org.apache.spark.schedu.Task.run(Task.scala:109) в org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) в java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) в java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) в java.lang.Thread.run(Thread.java:748) 2018-11-14 16:22:39 WARN TaskSetManager:66 - Утраченная задача 0.0 на этапе 0.0 (TID 0, localhost, драйвер исполнителя): java.lang.AbstractMethodError at org.apache.spark.internal.Logging$class.initializeLogIfNeeded (Logging.scala:99) в org.apache.spark.streaming.kafka.KafkaReceiver.initializeLogIfNecessary(KafkaInputDStream.scala:68) в org.apache.spark.internal.Logging$class.log(Logging.scala:46) в org.apache.spark.streaming.kafka.KafkaReceiver.log(KafkaInputDStream.scala:68) в организации.apache.spark.internal.Logging $ class.logInfo (Logging.scala: 54) в org.apache.spark.streaming.kafka.KafkaReceiver.logInfo(KafkaInputDStream.scala:68) в org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:90) по адресу org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149) по адресу org.apache.spark.streaming.receiver.ReceiverSupercest.: 131) в org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:600) в org.apache.spark.streaming.scheduler.ReceiverTracker $ ReceiverTrackerEndpoint $$ anonfun $ (ReceiverTracker.scala: 590) в org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2185) в org.apache.spark.SparkContext $$ anonfun $ 34.apply (SparkContext.scala: 218 5) в org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) в org.apache.spark.scheduler.Task.run(Task.scala:109) в org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) в java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolEx624) jj. lang.Thread.run (Thread.java:748)
2018-11-14 16:22:39 ОШИБКА TaskSetManager:70 - Задача 0 на этапе 0.0 не выполнена 1 раз; прерывание задания 2018-11-14 16:22:39 INFO TaskSchedulerImpl:54 - Удален TaskSet 0.0, задачи которого все выполнены, из пула 2018-11-14 16:22:39 INFO TaskSchedulerImpl:54 - Этап отмены 0 2018-11-14 16:22:39 ИНФОРМАЦИЯ DAGScheduler:54 - ResultStage 0 (запуск с NativeMethodAccessorImpl.java:0) завершился неудачно через 0,438 с из-за того, что задание было прервано из-за сбоя этапа: задание 0 на этапе 0.0 не выполнено 1 раз, последний сбой: потерян Задача 0.0 на этапе 0.0 (TID 0, localhost, драйвер исполнителя): java.lang.AbstractMethodError at org.apache.spark.internal.Logging$class.initializeLogIfNeeded (Logging.scala:99) в org.apache.spark.streaming. kafka.KafkaReceiver.initializeLogIfNeeded (KafkaInputDStream.scala:68) в org.apache.spark.internal.Logging$class.log(Logging.scala:46) в org.apache.spark.streaming.kafka.KafkaReceiver.log (KafkaIn. scala: 68) в org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54) в org.apache.spark.streaming.kafka.KafkaReceiver.logInfo(KafkaInputDStream.scala:68) в org.a pache.spark.streaming.kafka.KafkaReceiver.onStart (KafkaInputDStream.scala: 90) в org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149) в org.apache.sparkcestreamreing. ReceiverSupervisor.start (ReceiverSupervisor.scala: 131) по адресу org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:600) по адресу org.apache.spark.streaming.sciverduReRe $. ReceiverTrackerEndpoint $$ anonfun $ 9.apply (ReceiverTracker.scala: 590) в org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2185) в org.apache.spark.SparkContext $$ anonfun $ 34. SparkContext.scala: 2185) в org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) в org.apache.spark.scheduler.Task.run(Task.scala:109) в org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) в java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) в java.util.concurrent.ThreadPoolExecutor $ Wo rker.run (ThreadPoolExecutor.java:624) в java.lang.Thread.run (Thread.java:748)
Любое предложение, как получить потоковые данные? С уважением.