Как читать таблицу scylladb в фрейме pyspark
Я пытаюсь прочитать таблицу scylladb, установленную на одном компьютере в фрейме данных pyspark на другом компьютере.
Эти 2 устройства имеют подключение по протоколу SSH, и я могу прочитать таблицу с помощью кода Python, проблема возникает только при подключении с помощью spark. Я использовал этот разъем:
--packages datastax:spark-cassandra-connector:2.3.0-s_2.11 ,
Моя искра-версия = 2.3.1, версия-версия-2.11.8.
**First Approach**
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
conf = SparkConf().set("spark.cassandra.connection.host","192.168.0.118")
sc = SparkContext(conf = conf)
spark=SparkSession.builder.config(conf=conf).appName('FinancialRecon').getOrCreate()
sqlContext =SQLContext(sc)
data=spark.read.format("org.apache.spark.sql.cassandra").options(table="datarecon",keyspace="finrecondata").load().show()
Результирующая ошибка:
Файл "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", строка 172, в загрузочном файле "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", строка 1257, в файле вызова "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", строка 63, в файле deco"/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", строка 328, в get_return_value py4j.protocol.Py4JJavaError: Произошла ошибка при вызове o43.load.: java.lang.ClassNotFoundException: org.apache.spark.Logging был удален в Spark 2.0. Проверьте, совместима ли ваша библиотека с Spark 2.0, по адресу org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:646) по адресу org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala.:190) в org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164) в sun.reflect.NativeMethodAccessorImpl.invoke0(собственный метод) в sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethoho) sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) в java.lang.reflect.Method.invoke(Method.java:498) в py4j.reflection.MethodInvoker.invoke(Method4jref).ReflectionEngine.invoke(ReflectionEngine.java:357) в py4j.Gateway.invoke(Gateway.java:282) в py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) в py4j.commands.CallCommand.execute(Java:79) на py4j.GatewayConnection.run(GatewayConnection.java:238) на java.lang.Thread.run(Thread.java:748) Причина: java.lang.NoClassDefFoundError: org/apache/spark/ Регистрация в java.lang.ClassLoader.defineClass1(собственный метод) в java.lang.ClassLoader.defineClass(ClassLoader.java:763) в java.security.SecureClassLaderClassLader.java:142) на java.net.URLClassLoader.defineClass(URLClassLoader.java:467) на java.net.URLClassLoader.access$100(URLClassLoader.java:73) на java.net.URLClassLoader$1.run(URLClassLoader.java:368) в java.net.URLClassLoader$1.run(URLClassLoader.java:362) в java.security.AccessController.doPrivileged(собственный метод) в java.net.URLClassLoader.findClass(URLClassLoader.java:361) в java.lang.ClassLoader.loadClass(ClassLoader.java:424) в sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) в java.lang.ClassLoader.loadClass(ClassLoader.java:411) в java.lang.ClassLoader.loadClass(ClassLoader.java:357) в org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618) в org.apache.spark.sql.execution.dataso urces.DataSource $$ anonfun $ 23 $$ anonfun $ apply $ 15.apply (DataSource.scala: 618) в scala.util.Try$.apply(Try.scala:192) в org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618) в org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618) в scala.util.Try.orElse(Try.scala:84) at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:618) ... еще 13 Причина: java.lang.ClassNotFoundException: org.apache.spark.Logging в java.net.URLClassLoader.findClass(URLClassLoader.java:381) в java.lang.ClassLoader.loadClass (ClassLoader.java:424) в sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) в java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... еще 33
Другой подход, который я использовал:
data=sc.read.format("org.apache.spark.sql.cassandra").options(table="datarecon",keyspace="finrecondata").load().show()
За это я получаю:
AttributeError: у объекта 'SparkContext' нет атрибута 'read'
Третий подход:
data=sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="datarecon",keyspace="finrecondata").load().show()
За это я получаю ту же ошибку, что и первый подход.
Посоветуйте, пожалуйста, является ли это проблемой с искровым соединителем scylla или какой-либо проблемой с библиотекой искры, и как ее решить
2 ответа
Следуй этим шагам:
1. Запустите spark-shell с помощью строки пакетов. Чтобы настроить пары значений ключа передачи Spark Configuration по умолчанию с параметром --conf, в моем случае хостом scylla является 172.17.0.2.
bin/spark-shell --conf spark.cassandra.connection.host=172.17.0.2 --packages datastax:spark-cassandra-connector:2.3.0-s_2.11
2.Включить функции, специфичные для Cassandra, в SparkContext, SparkSession, RDD и DataFrame:
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
3. Загрузка данных из Scylla
val rdd = sc.cassandraTable("my_keyspace", "my_table")
4.Test
scala> rdd.collect().foreach(println)
CassandraRow{id: 1, name: ash}
Возникшая ошибка возникает из-за конфликта версий. Может быть, вы можете решить это, читая здесь.
Первый подход будет работать, потому что метод чтения доступен на SparkSession.