Не удалось запустить Python примеры потоковой передачи Spark Kafka в Spark 2.3.0

Я планирую перейти с версии 2.2.0 на Spark 2.3.0. Я пытаюсь запустить существующие примеры, чтобы убедиться, что все будет работать. Я запускаю пример kafka_wordcount.py

Код как следовать

r"""
 Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
 Usage: kafka_wordcount.py <zk> <topic>

 To run this on your local machine, you need to setup Kafka and create a producer first, see
 http://kafka.apache.org/documentation.html#quickstart

 and then run the example
    `$ bin/spark-submit --jars \
      external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar \
      examples/src/main/python/streaming/kafka_wordcount.py \
      localhost:2181 test`
"""
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)

    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

Я использую команду

spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10-assembly_2.11:2.3.0 kafka_wordcount.py

Я получаю исключение. Это происходит, когда он создает streamingContext

Traceback (most recent call last):
  File "/usr/hdp/2.6.5.0-292/spark2/examples/src/main/python/streaming/kafka_wordcount.py", line 45, in <module>
    ssc = StreamingContext(sc, 1)
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/streaming/context.py", line 61, in __init__
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/streaming/context.py", line 65, in _initialize_context
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1428, in __call__
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.streaming.api.java.JavaStreamingContext.
: java.lang.AbstractMethodError
    at org.apache.spark.util.ListenerBus$class.$init$(ListenerBus.scala:35)
    at org.apache.spark.streaming.scheduler.StreamingListenerBus.<init>(StreamingListenerBus.scala:30)
    at org.apache.spark.streaming.scheduler.JobScheduler.<init>(JobScheduler.scala:57)
    at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:184)
    at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:76)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.<init>(JavaStreamingContext.scala:130)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:238)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

Я пробовал с разными версиями стриминга кафки

  • искровым потоковой-Кафка-0-10_2.11-2.3.0.jar
  • искровым потоковой-Кафку-0-8-assembly_2.11-2.3.0.jar
  • искровым потоковой-Кафка-0-8_2.11-2.3.0.2.6.5.0-292.jar
  • искровым потоковой-Кафка-0-8_2.11-2.3.0.jar
  • искровой SQL-Кафка-0-10_2.11-2.3.0.2.6.5.0-292.jar
  • искровой SQL-Кафка-0-10_2.11-2.3.0.jar
  • искровым потоковой-Кафка-0-10_2.11-2.3.0.2.6.5.0-292.jar

Детали версий:

  • Искра: 2.3.0.2.6.5.0-292
  • питон: 2.7.5
  • scala: 2.11.8 (Java HotSpot (TM) 64-разрядная серверная виртуальная машина, Java 1.8.0_112)

Я думаю, что это проблема, связанная с управлением версиями. Выше того же кода с StreamingContext (sc, 1) работает на spark 2.2.0 с spark-streaming-kafka-0-8_2.11-2.2.0.jar

Кто-нибудь знает о проблеме?

Обновление: запуск искры в автономном режиме.

0 ответов

Другие вопросы по тегам