Преобразование PySpark RDD с помощью Scala

TL;DR - у меня есть то, что выглядит как DStream of Strings в приложении PySpark. Я хочу отправить это как DStream[String] в библиотеку Scala. Однако строки не конвертируются Py4j.

Я работаю над приложением PySpark, которое извлекает данные из Kafka с помощью Spark Streaming. Мои сообщения являются строками, и я хотел бы вызвать метод в коде Scala, передав его DStream[String] пример. Однако я не могу получить правильные строки JVM в коде Scala. Мне кажется, что строки Python не преобразуются в строки Java, а вместо этого сериализуются.

Мой вопрос будет: как получить строки Java из DStream объект?


Вот самый простой код Python, который я придумал:

from pyspark.streaming import StreamingContext
ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))

from pyspark.streaming.kafka import KafkaUtils
stream = KafkaUtils.createDirectStream(ssc, ["IN"], {"metadata.broker.list": "localhost:9092"})
values = stream.map(lambda tuple: tuple[1])

ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)

ssc.start()

Я запускаю этот код в PySpark, передавая ему путь к моему JAR:

pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar

На стороне Scala у меня есть:

package com.seigneurin

import org.apache.spark.streaming.api.java.JavaDStream

object MyPythonHelper {
  def doSomething(jdstream: JavaDStream[String]) = {
    val dstream = jdstream.dstream
    dstream.foreachRDD(rdd => {
      rdd.foreach(println)
    })
  }
}

Теперь, допустим, я отправляю некоторые данные в Kafka:

echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic IN

println оператор в коде Scala выводит что-то похожее на:

[B@758aa4d9

Я ожидал получить foo bar вместо.

Теперь, если я заменю простой println утверждение в коде Scala со следующим:

rdd.foreach(v => println(v.getClass.getCanonicalName))

Я получил:

java.lang.ClassCastException: [B cannot be cast to java.lang.String

Это говорит о том, что строки фактически передаются как массивы байтов.

Если я просто попытаюсь преобразовать этот массив байтов в строку (я знаю, что даже не определяю кодировку):

      def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
        val dstream = jdstream.dstream
        dstream.foreachRDD(rdd => {
          rdd.foreach(bytes => println(new String(bytes)))
        })
      }

Я получаю что-то похожее (специальные символы могут быть удалены):

�]qXfoo barqa.

Это говорит о том, что строка Python была сериализована (замаринована?). Как я мог получить правильную строку Java вместо этого?

1 ответ

Решение

Короче говоря, нет поддерживаемого способа сделать что-то подобное. Не пробуй это в производстве. Вы были предупреждены.

В общем, Spark не использует Py4j для чего-либо еще, кроме некоторых простых вызовов RPC для драйвера, и не запускает шлюз Py4j на любой другой машине. Когда это требуется (в основном MLlib и некоторые части SQL), Spark использует Pyrolite для сериализации объектов, передаваемых между JVM и Python.

Эта часть API является либо частной (Scala), либо внутренней (Python) и как таковая не предназначена для общего использования. Хотя теоретически вы получаете к нему доступ в любом случае для каждой партии:

package dummy

import org.apache.spark.api.java.JavaRDD
import org.apache.spark.streaming.api.java.JavaDStream
import org.apache.spark.sql.DataFrame

object PythonRDDHelper {
  def go(rdd: JavaRDD[Any]) = {
    rdd.rdd.collect {
      case s: String => s
    }.take(5).foreach(println)
  }
}

полный поток:

object PythonDStreamHelper {
  def go(stream: JavaDStream[Any]) = {
    stream.dstream.transform(_.collect {
      case s: String => s
    }).print
  }
}

или выставляя отдельные партии как DataFrames (вероятно наименее злой вариант):

object PythonDataFrameHelper {
  def go(df: DataFrame) = {
    df.show
  }
}

и используйте эти обертки следующим образом:

from pyspark.streaming import StreamingContext
from pyspark.mllib.common import _to_java_object_rdd
from pyspark.rdd import RDD

ssc = StreamingContext(spark.sparkContext, 10)
spark.catalog.listTables()

q = ssc.queueStream([sc.parallelize(["foo", "bar"]) for _ in range(10)]) 

# Reserialize RDD as Java RDD<Object> and pass 
# to Scala sink (only for output)
q.foreachRDD(lambda rdd: ssc._jvm.dummy.PythonRDDHelper.go(
    _to_java_object_rdd(rdd)
))

# Reserialize and convert to JavaDStream<Object>
# This is the only option which allows further transformations
# on DStream
ssc._jvm.dummy.PythonDStreamHelper.go(
    q.transform(lambda rdd: RDD(  # Reserialize but keep as Python RDD
        _to_java_object_rdd(rdd), ssc.sparkContext
    ))._jdstream
)

# Convert to DataFrame and pass to Scala sink.
# Arguably there are relatively few moving parts here. 
q.foreachRDD(lambda rdd: 
    ssc._jvm.dummy.PythonDataFrameHelper.go(
        rdd.map(lambda x: (x, )).toDF()._jdf
    )
)

ssc.start()
ssc.awaitTerminationOrTimeout(30)
ssc.stop()

это не поддерживается, не проверено и, как таковое, довольно бесполезно ни для чего, кроме экспериментов с Spark API.

Другие вопросы по тегам