Apache Bahir, отправь материал в ActorReceiver

Я пытаюсь настроить простой процесс с помощью Spark Streaming, используя Apache Bahir для подключения к Akka. Я пытался последовать их примеру вместе с этим старшим. У меня есть простой актер экспедитора

class ForwarderActor extends ActorReceiver {
  def receive = {
    case data: MyData => store(data)
  }
}

и я создаю поток с

val stream = AkkaUtils.createStream[RSVP](ssc, Props[ForwarderActor], actorName)

конфигурация выглядит так:

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "localhost"
      port = 7777
    }
  }
}

и моя проблема: как я могу отправлять сообщения актеру Форвардер? Может быть, я не понимаю, как в этом случае используется Akka Remote. Когда приложение запускается, я вижу журнал

[akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://test@localhost:7777]

а потом вижу

[akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://streaming-actor-system-0@192.168.192.7:52369]

Который, кажется, напоминает описание в ScalaDoc:

 /**
   * A default ActorSystem creator. It will use a unique system name
   * (streaming-actor-system-<spark-task-attempt-id>) to start an ActorSystem that supports remote
   * communication.
   */

В общем, я не уверен, как я должен отправлять сообщения актеру Экспедитора. Спасибо за любую помощь!

1 ответ

Актеры Akka могут отправлять сообщения другим актерам Akka, работающим на удаленной JVM. Итак... когда субъект-отправитель должен знать адрес предполагаемого субъекта-получателя.

AkkaUtil (Бахир) позволяет вам создать искровой поток из сообщений, которые ReceiverActor получает. Но откуда собираются получать сообщения? Ну... какой-то удаленный актер. И для отправки сообщений этому удаленному субъекту понадобится адрес вашего ReceiverActor который работает в вашем spark-приложении.

В общем, вы не можете быть слишком уверены в IP-адресе, на котором будет работать ваше искровое приложение. Итак, мы сделаем так, чтобы актер, работающий с spark, сообщал актеру продюсера свою ссылку и просил его отправить свои вещи.

Просто убедитесь, что оба приложения написаны с использованием одной и той же версии Scala и работают с одной и той же JRE.

Теперь... давайте сначала напишем актера, который будет источником данных,

import akka.actor.{Actor, ActorRef, ActorLogging, ActorSystem, Props}
import akka.actor.Actor.Receive
import com.typesafe.config.{Config, ConfigFactory}

case class SendMeYourStringsRequest(requesterRef: ActorRef)
case class RequestedString(s: String)

class MyActor extends Actor with ActorLogging {

  val theListOfMyStrings = List("one", "two", "three")

  override def receive: Receive = {
    case SendMeYourStringsRequest(requesterRef) => {
      theListOfMyStrings.foreach(s => {
        requesterRef ! RequestedString(s)
      })
    }
  }
}

object MyApplication extends App {

  val config = ConfigFactory.parseString(
    """
      |akka{
      |  actor {
      |    provider = remote
      |  }
      |  remote {
      |    enabled-transports = ["akka.remote.netty.tcp"]
      |    untrusted-mode = off
      |    netty.tcp {
      |      hostname="my-ip-address"
      |      port=18000
      |    }
      |  }
      |}
    """.stripMargin
  )

  val actorSystem = ActorSystem("my-actor-system", config)

  var myActor = actorSystem.actorOf(Props(classOf[MyActor]), "my-actor")

}

Теперь... давайте напишем наше простое искровое приложение,

import akka.actor.{Actor, ActorRef, ActorLogging, ActorSystem, Props}
import akka.actor.Actor.Receive
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}

case class SendMeYourStringsRequest(requesterRef: ActorRef)
case class RequestedString(s: String)

class YourStringRequesterActor extends ActorReceiver {
  def receive = {
    case RequestedString(s) => store(s)
  }

  override def preStart(): Unit = {
    val myActorPath = ActorPath.fromString("akka.tcp://my-actor-system@my-ip-address:18000/user/my-actor")
    val myActorSelection = context.actorSelection(myActorPath)

    myActorSelection ! SendMeYourStringsRequest(self)
  }
}

object YourSparkApp {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ActorWordCount")

    if (!sparkConf.contains("spark.master")) {
      sparkConf.setMaster("local[2]")
    }

    val ssc = new StreamingContext(sparkConf, Seconds(2))

    val stringStream = AkkaUtils.createStream[String](
        ssc,
        Props(classOf[YourStringRequesterActor]),
        "your-string-requester-actor"
    )

    stringStream.foreach(println)

  }
}

Примечание:: Просто позаботьтесь о my-ip-address, Если есть какие-либо другие проблемы, пожалуйста, дайте мне знать в комментариях.

Другие вопросы по тегам