Маршрутизаторы с поддержкой Akka Cluster - делитесь экземпляром redis со всеми маршрутизаторами

В контексте кластерного приложения Akka я столкнулся с проблемой, касающейся одного свойства, ожидаемого Akka: каждый (cas) класс и каждое используемое сообщение должны быть сериализуемыми. У меня есть следующий контекст: я хочу использовать данные из кластера redis, и для этого я решил принять кластерный маршрутизатор с поддержкой кластера, чтобы добавить узлы, чтобы иметь больше рабочих. Рабочие читают данные из redis и сохраняют некоторые метаданные в mongodb. В первой версии я сделал это:

object MasterWorkers {

  def props
  (  awsBucket : String,
     gapMinValueMicroSec : Long,
     persistentCache: RedisCache,
     mongoURI : String,
     mongoDBName : String,
     mongoCollectioName : String
  ) : Props =
    Props(MasterWorkers(awsBucket, gapMinValueMicroSec, persistentCache, mongoURI, mongoDBName, mongoCollectioName))

  case class JobRemove(deviceId: DeviceId, from : Timestamp, to : Timestamp)
}

case class MasterWorkers
(
  awsBucket : String,
  gapMinValueMicroSec : Long,
  persistentCache: RedisCache,
  mongoURI : String,
  mongoDBName : String,
  mongoCollectioName : String
) extends Actor with ActorLogging {

  val workerRouter =
    context.actorOf(FromConfig.props(Props(classOf[Worker],awsBucket,gapMinValueMicroSec, self, persistentCache, mongoURI, mongoDBName, mongoCollectioName)),
    name = "workerRouter")

Рабочий класс:

object Worker {

  def props
  (
    awsBucket : String,
    gapMinValueMicroSec : Long,
    replyTo : ActorRef,
    persistentCache: RedisCache,
    mongoURI : String,
    mongoDBName : String,
    mongoCollectioName : String
  ) : Props =
    Props(Worker(awsBucket, gapMinValueMicroSec, replyTo, persistentCache, mongoURI, mongoDBName, mongoCollectioName))

  case class JobDumpFailed(deviceId : DeviceId, from: Timestamp, to: Timestamp)
  case class JobDumpSuccess(deviceId : DeviceId, from: Timestamp, to: Timestamp)

  case class JobRemoveFailed(deviceId : DeviceId, from: Timestamp, to: Timestamp)
}

case class Worker
(
  awsBucket : String,
  gapMinValueMicroSec : Long,
  replyTo : ActorRef,
  persistentCache: RedisCache,
  mongoURI : String,
  mongoDBName : String,
  mongoCollectioName : String
) extends Actor with ActorLogging {

Но возникает следующее исключение, когда я запускаю два узла:

[info] akka.remote.MessageSerializer$SerializationException: Failed to serialize remote message [class akka.remote.DaemonMsgCreate] using serializer [class akka.remote.serialization.DaemonMsgCreateSerializer].
[info] at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:61)
[info] at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:895)
[info] at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:895)
[info] at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
[info] at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:894)
[info] at akka.remote.EndpointWriter.writeSend(Endpoint.scala:786)
[info] at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:761)
[info] at akka.actor.Actor$class.aroundReceive(Actor.scala:497)
[info] at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:452)
[info] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
[info] at akka.actor.ActorCell.invoke(ActorCell.scala:495)
[info] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
[info] at akka.dispatch.Mailbox.run(Mailbox.scala:224)
[info] at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
[info] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[info] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[info] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[info] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[info] Caused by: java.io.NotSerializableException: akka.actor.ActorSystemImpl

Кэш redis - это простой класс case с сопутствующим объектом, реализующим такой интерфейс:

object RedisCache { // some static functions }

case class RedisCache
(
  master : RedisServer,
  slaves : Seq[RedisServer]
)(implicit actorSystem : ActorSystem)
  extends PersistentCache[DeviceKey, BCPPackets] with LazyLogging {
// some code here
}

Затем, чтобы решить проблему, я переместил redisCache в рабочем, и я не отдам его на главный узел:

case class Worker
(
  awsBucket : String,
  gapMinValueMicroSec : Long,
  replyTo : ActorRef,
  mongoURI : String,
  mongoDBName : String,
  mongoCollectioName : String
) extends Actor with ActorLogging {

// redis cache here now 
val redisCache = ...

Но с таким дизайном каждый пользователь создаст новый экземпляр кэша Redis, и это не ожидаемое поведение. Мне нужно иметь один экземпляр моего Redis-кэша, а затем делиться им со всеми моими маршрутами, но в контексте кластерного приложения это кажется невозможным, поэтому я не знаю, является ли это ошибкой проекта или каким-то отсутствующим опытом с аккой. Если кто-то сталкивался с подобными проблемами, я с удовольствием принимаю советы!

1 ответ

Проблема в том, что ваш RedisCache не все так просто Он несет вокруг ActorSystem, который не может быть сериализован.

Я думаю, это потому, что он содержит RedisClient экземпляры из, например, библиотеки Rediscala, и для этого требуется ActorSystem,

Вам нужно будет абстрагироваться от системы акторов и передать своим работникам только самые подробные сведения о кластере Redis (т.е. RedisServer объекты).

Затем рабочие будут создавать RedisClient сами - используя их context.system,

case class Worker
(
  awsBucket : String,
  gapMinValueMicroSec : Long,
  replyTo : ActorRef,
  redisMaster: RedisServer,
  redisSlaves: Seq[RedisServer],
  mongoURI : String,
  mongoDBName : String,
  mongoCollectioName : String
) extends Actor with ActorLogging {

  val masterSlaveClient = ??? //create from the RedisServer details

}

Это позволит каждому работнику установить свое собственное соединение с кластером redis.

В качестве альтернативы, если вы хотите подключиться к своему мастеру только один раз и поделиться им с вашими работниками, вам нужно обойти RedisClientActor ( источник здесь), который встраивает вашу связь. Это ActorRef и могут быть разделены удаленно.

Это можно получить, позвонив client.redisConnection,

Рабочие могут затем построить ActorRequest например, вокруг

case class Worker
    (
      awsBucket : String,
      gapMinValueMicroSec : Long,
      replyTo : ActorRef,
      redisConnection: ActorRef,
      mongoURI : String,
      mongoDBName : String,
      mongoCollectioName : String
    ) extends Actor with ActorLogging with ActorRequest {

      // you will need to implement the execution context that ActorRequest needs as well..

      send(redisCommand)

    }
Другие вопросы по тегам