Как добавить SparkListener из pySpark в Python?

Я хочу создать расширение Jupyter/IPython для мониторинга рабочих мест Apache Spark.

Spark предоставляет REST API.

Однако вместо опроса сервера я хочу, чтобы обновления событий отправлялись через обратные вызовы.

Я пытаюсь зарегистрировать SparkListener с SparkContext.addSparkListener(), Эта функция недоступна в PySpark SparkContext объект в Python. Итак, как я могу зарегистрировать слушателя Python для контекста Scala/Java из Python. Можно ли сделать это через py4j? Я хочу, чтобы функции python вызывались, когда в слушателе запускаются события.

2 ответа

Решение

Это возможно, хотя это немного связано. Мы можем использовать механизм обратного вызова Py4j для передачи сообщения от SparkListener, Сначала давайте создадим пакет Scala со всеми необходимыми классами. Структура каталогов:

.
├── build.sbt
└── src
    └── main
        └── scala
            └── net
                └── zero323
                    └── spark
                        └── examples
                            └── listener
                                ├── Listener.scala
                                ├── Manager.scala
                                └── TaskListener.scala

build.sbt:

name := "listener"

organization := "net.zero323"

scalaVersion := "2.11.7"

val sparkVersion = "2.1.0"

libraryDependencies ++= List(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "net.sf.py4j" % "py4j" % "0.10.4"  // Just for the record
)

Listener.scala определяет интерфейс Python, который мы собираемся реализовать позже

package net.zero323.spark.examples.listener

/* You can add arbitrary methods here, 
 * as long as these match corresponding Python interface 
 */
trait Listener {
  /* This will be implemented by a Python class.
   * You can of course use more specific types, 
   * for example here String => Unit */
  def notify(x: Any): Any
}

Manager.scala будет использоваться для пересылки сообщений слушателю Python:

package net.zero323.spark.examples.listener

object Manager {
  var listeners: Map[String, Listener] = Map()

  def register(listener: Listener): String = {
    this.synchronized {
      val uuid = java.util.UUID.randomUUID().toString
      listeners = listeners + (uuid -> listener)
      uuid
    }
  }

  def unregister(uuid: String) = {
    this.synchronized {
      listeners = listeners - uuid
    }
  }

  def notifyAll(message: String): Unit = {
    for { (_, listener) <- listeners } listener.notify(message)
  }

}

Наконец-то простой SparkListener:

package net.zero323.spark.examples.listener

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

/* A simple listener which captures SparkListenerTaskEnd,
 * extracts numbers of records written by the task
 * and converts to JSON. You can of course add handlers 
 * for other events as well.
 */
class PythonNotifyListener extends SparkListener { 
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    val recordsWritten = taskEnd.taskMetrics.outputMetrics.recordsWritten
    val message = compact(render(
      ("recordsWritten" ->  recordsWritten)
    ))
    Manager.notifyAll(message)
  }
}

Давайте посмотрим на наше расширение:

sbt package

и начать сеанс PySpark, добавив сгенерированный jar к пути к классу и регистрации слушателя:

 $SPARK_HOME/bin/pyspark \
   --driver-class-path target/scala-2.11/listener_2.11-0.1-SNAPSHOT.jar \
   --conf spark.extraListeners=net.zero323.spark.examples.listener.PythonNotifyListener

Далее мы должны определить объект Python, который реализует Listener интерфейс:

class PythonListener(object):
    package = "net.zero323.spark.examples.listener"

    @staticmethod
    def get_manager():
        jvm = SparkContext.getOrCreate()._jvm
        manager = getattr(jvm, "{}.{}".format(PythonListener.package, "Manager"))
        return manager

    def __init__(self):
        self.uuid = None

    def notify(self, obj):
        """This method is required by Scala Listener interface
        we defined above.
        """
        print(obj)

    def register(self):
        manager = PythonListener.get_manager()
        self.uuid = manager.register(self)
        return self.uuid

    def unregister(self):
        manager =  PythonListener.get_manager()
        manager.unregister(self.uuid)
        self.uuid = None

    class Java:
        implements = ["net.zero323.spark.examples.listener.Listener"]

запустить сервер обратного вызова:

sc._gateway.start_callback_server()

создать и зарегистрировать слушателя:

listener = PythonListener()

зарегистрировать его:

listener.register()

и проверить:

>>> sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test")
{"recordsWritten":33}
{"recordsWritten":34}
{"recordsWritten":33}

При выходе вы должны выключить сервер обратного вызова:

sc._gateway.shutdown_callback_server()

Примечание:

Это следует использовать с осторожностью при работе с потоковой передачей Spark, которая внутренне использует сервер обратного вызова.

Редактировать:

Если это слишком хлопотно, вы можете просто определить org.apache.spark.scheduler.SparkListenerInterface:

class SparkListener(object):
    def onApplicationEnd(self, applicationEnd):
        pass
    def onApplicationStart(self, applicationStart):
        pass
    def onBlockManagerRemoved(self, blockManagerRemoved):
        pass
    def onBlockUpdated(self, blockUpdated):
        pass
    def onEnvironmentUpdate(self, environmentUpdate):
        pass
    def onExecutorAdded(self, executorAdded):
        pass
    def onExecutorMetricsUpdate(self, executorMetricsUpdate):
        pass
    def onExecutorRemoved(self, executorRemoved):
        pass
    def onJobEnd(self, jobEnd):
        pass
    def onJobStart(self, jobStart):
        pass
    def onOtherEvent(self, event):
        pass
    def onStageCompleted(self, stageCompleted):
        pass
    def onStageSubmitted(self, stageSubmitted):
        pass
    def onTaskEnd(self, taskEnd):
        pass
    def onTaskGettingResult(self, taskGettingResult):
        pass
    def onTaskStart(self, taskStart):
        pass
    def onUnpersistRDD(self, unpersistRDD):
        pass
    class Java:
        implements = ["org.apache.spark.scheduler.SparkListenerInterface"]

расширить это:

class TaskEndListener(SparkListener):
    def onTaskEnd(self, taskEnd):
        print(taskEnd.toString())

и использовать его напрямую:

>>> sc._gateway.start_callback_server()
True
>>> listener = TaskEndListener()
>>> sc._jsc.sc().addSparkListener(listener)
>>> sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test_simple")
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@9e7514a,org.apache.spark.executor.TaskMetrics@51b8ba92)
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@71278a44,org.apache.spark.executor.TaskMetrics@bdc06d)
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@336)

Хотя этот метод проще, он не избирателен (больше трафика между JVM и Python) требует обработки объектов Java внутри сеанса Python.

Я знаю, что это очень старый вопрос. Однако я столкнулся с той же самой проблемой, когда нам пришлось настроить специально разработанный прослушиватель в приложении PySpark. Возможно, за последние несколько лет подход изменился.

Все, что нам нужно было сделать, это указать зависимый файл jar, который содержал jar прослушивателя, а также установить имущество.

Пример

      --conf spark.extraListeners=fully.qualified.path.to.MyCustomListenerClass --conf my.param.name="hello world"

MyCustomListenerClass может иметь конструктор с одним аргументом, который принимает объект SparkConf. Если вы хотите передать какие-либо параметры вашему слушателю, просто установите их как ключ-значение конфигурации, и вы сможете получить к ним доступ из конструктора.

Пример

      public MyCustomListenerClass(SparkConf conf) {
        this.myParamName = conf.get("my.param.name", "default_param_value");
}

Надеюсь, это поможет кому-то найти более простую стратегию. Этот подход работает как на Scala, так и на PySpark, потому что в приложении spark ничего не меняется, фреймворк позаботится о регистрации вашего слушателя, просто передав параметр extraListeners.

Другие вопросы по тегам