Apache Bahir, отправь материал в ActorReceiver
Я пытаюсь настроить простой процесс с помощью Spark Streaming, используя Apache Bahir для подключения к Akka. Я пытался последовать их примеру вместе с этим старшим. У меня есть простой актер экспедитора
class ForwarderActor extends ActorReceiver {
def receive = {
case data: MyData => store(data)
}
}
и я создаю поток с
val stream = AkkaUtils.createStream[RSVP](ssc, Props[ForwarderActor], actorName)
конфигурация выглядит так:
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "localhost"
port = 7777
}
}
}
и моя проблема: как я могу отправлять сообщения актеру Форвардер? Может быть, я не понимаю, как в этом случае используется Akka Remote. Когда приложение запускается, я вижу журнал
[akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://test@localhost:7777]
а потом вижу
[akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://streaming-actor-system-0@192.168.192.7:52369]
Который, кажется, напоминает описание в ScalaDoc:
/**
* A default ActorSystem creator. It will use a unique system name
* (streaming-actor-system-<spark-task-attempt-id>) to start an ActorSystem that supports remote
* communication.
*/
В общем, я не уверен, как я должен отправлять сообщения актеру Экспедитора. Спасибо за любую помощь!
1 ответ
Актеры Akka могут отправлять сообщения другим актерам Akka, работающим на удаленной JVM. Итак... когда субъект-отправитель должен знать адрес предполагаемого субъекта-получателя.
AkkaUtil (Бахир) позволяет вам создать искровой поток из сообщений, которые ReceiverActor
получает. Но откуда собираются получать сообщения? Ну... какой-то удаленный актер. И для отправки сообщений этому удаленному субъекту понадобится адрес вашего ReceiverActor
который работает в вашем spark-приложении.
В общем, вы не можете быть слишком уверены в IP-адресе, на котором будет работать ваше искровое приложение. Итак, мы сделаем так, чтобы актер, работающий с spark, сообщал актеру продюсера свою ссылку и просил его отправить свои вещи.
Просто убедитесь, что оба приложения написаны с использованием одной и той же версии Scala и работают с одной и той же JRE.
Теперь... давайте сначала напишем актера, который будет источником данных,
import akka.actor.{Actor, ActorRef, ActorLogging, ActorSystem, Props}
import akka.actor.Actor.Receive
import com.typesafe.config.{Config, ConfigFactory}
case class SendMeYourStringsRequest(requesterRef: ActorRef)
case class RequestedString(s: String)
class MyActor extends Actor with ActorLogging {
val theListOfMyStrings = List("one", "two", "three")
override def receive: Receive = {
case SendMeYourStringsRequest(requesterRef) => {
theListOfMyStrings.foreach(s => {
requesterRef ! RequestedString(s)
})
}
}
}
object MyApplication extends App {
val config = ConfigFactory.parseString(
"""
|akka{
| actor {
| provider = remote
| }
| remote {
| enabled-transports = ["akka.remote.netty.tcp"]
| untrusted-mode = off
| netty.tcp {
| hostname="my-ip-address"
| port=18000
| }
| }
|}
""".stripMargin
)
val actorSystem = ActorSystem("my-actor-system", config)
var myActor = actorSystem.actorOf(Props(classOf[MyActor]), "my-actor")
}
Теперь... давайте напишем наше простое искровое приложение,
import akka.actor.{Actor, ActorRef, ActorLogging, ActorSystem, Props}
import akka.actor.Actor.Receive
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
case class SendMeYourStringsRequest(requesterRef: ActorRef)
case class RequestedString(s: String)
class YourStringRequesterActor extends ActorReceiver {
def receive = {
case RequestedString(s) => store(s)
}
override def preStart(): Unit = {
val myActorPath = ActorPath.fromString("akka.tcp://my-actor-system@my-ip-address:18000/user/my-actor")
val myActorSelection = context.actorSelection(myActorPath)
myActorSelection ! SendMeYourStringsRequest(self)
}
}
object YourSparkApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("ActorWordCount")
if (!sparkConf.contains("spark.master")) {
sparkConf.setMaster("local[2]")
}
val ssc = new StreamingContext(sparkConf, Seconds(2))
val stringStream = AkkaUtils.createStream[String](
ssc,
Props(classOf[YourStringRequesterActor]),
"your-string-requester-actor"
)
stringStream.foreach(println)
}
}
Примечание:: Просто позаботьтесь о my-ip-address
, Если есть какие-либо другие проблемы, пожалуйста, дайте мне знать в комментариях.