Акка Кластер не может зарегистрироваться для координатора

Я пытаюсь создать кластер шардинга Акка. Я хочу использовать режим "только прокси" на одном из узлов, чтобы просто перенаправить сообщение в области сегментов. Я получаю следующее предупреждение:

[WARN] [02/11/2019 17:04:17.819] [ClusterSystem-akka.actor.default-dispatcher-21] [akka.tcp://ClusterSystem@127.0.0.1:2555/system/sharding/ShardnameProxy] Trying to register to coordinator at [Some(ActorSelection[Anchor(akka.tcp://ClusterSystem@127.0.0.1:2551/), Path(/system/sharding/ShardnameCoordinator/singleton/coordinator)])], but no acknowledgement. Total [1] buffered messages.

** Main.java: ** Запуск кластера с использованием конфигурации из application.conf(код добавлен позже)

object Main {
  val shardName = "Shardname"
  val role = "Master"
  var shardingProbeLocalRegin: Option[ActorRef] = None
  def main(args: Array[String]): Unit = {
    val conf = ConfigFactory.load()
    val system = ActorSystem("ClusterSystem",conf.getConfig("main"))
    ClusterSharding(system).start(shardName,Test.props,ClusterShardingSettings(system),ShardDetails.extractEntityId,ShardDetails.extractShardId)
  }
}

Test.java: объект для кластера Sharding

object Test {
  def props: Props = Props(classOf[Test])

  class Test extends Actor {
    val log = Logger.getLogger(getClass.getName)


    override def receive = {
      case msg: String =>
        log.info("Message from " + sender().path.toString + " Message is " + msg)
        sender() ! "Done"

    }
  }

}

MessageProducer.java(Режим только для прокси). Источник сообщений отправляет сообщение в Shard каждую секунду.

object MessageProducer {

  var shardingProbeLocalRegin: Option[ActorRef] = None
  object DoSharding
  def prop:Props = Props(classOf[MessageProducer])
  var numeric : Long = 0
  def main(args: Array[String]): Unit = {
    val conf = ConfigFactory.load
    val system = ActorSystem("ClusterSystem",conf.getConfig("messgaeProducer"))
    ClusterSharding(system).startProxy(Main.shardName,None,extractEntityId,extractShardId)
    shardingProbeLocalRegin  = Some(ClusterSharding(system).shardRegion(Main.shardName))
    val actor = system.actorOf(Props[MessageProducer],"message")
  }
}

class RemoteAddressExtensionImpl(system: ExtendedActorSystem) extends Extension {
  def address = system.provider.getDefaultAddress
}

object RemoteAddressExtension extends ExtensionKey[RemoteAddressExtensionImpl]

class MessageProducer extends Actor{
  val log = Logger.getLogger(getClass.getName)


  override def preStart(): Unit = {
    println("Starting "+self.path.address)
    context.system.scheduler.schedule(10 seconds,1 second ,self,DoSharding)
  }


  override def receive = {
    case DoSharding =>
      log.info("sending message" + MessageProducer.numeric)
      MessageProducer.shardingProbeLocalRegin.foreach(_ ! "" + (MessageProducer.numeric))
      MessageProducer.numeric += 1

  }

}

** application.conf: ** Файл конфигурации

    main {
      akka {
        actor {
          provider = "akka.cluster.ClusterActorRefProvider"
        }

        remote {
          log-remote-lifecycle-events = on

          netty.tcp {
            hostname = "127.0.0.1"
            port = 2551
          }

        }

        cluster {
          seed-nodes = [
            "akka.tcp://ClusterSystem@127.0.0.1:2551"
          ]

          sharding.state-store-mode = ddata
          auto-down-unreachable-after = 1s
        }

        akka.extensions = ["akka.cluster.metrics.ClusterMetricsExtension", "akka.cluster.ddata.DistributedData"]

      }
    }
messgaeProducer {
  akka {
    actor {
      provider = "akka.cluster.ClusterActorRefProvider"
    }

    remote {
      log-remote-lifecycle-events = on

      netty.tcp {
        hostname = "192.168.2.96"
        port = 2554
      }

    }

    cluster {
      seed-nodes = [
        "akka.tcp://ClusterSystem@127.0.0.1:2551"
        //, "akka.tcp://ClusterSystem@127.0.0.1:2552"
      ]

      sharding.state-store-mode = ddata
      auto-down-unreachable-after = 1s
    }

    akka.extensions = ["akka.cluster.metrics.ClusterMetricsExtension", "akka.cluster.ddata.DistributedData"]


  }
}

Я делаю что-то не так? Есть ли другой способ подать заявку на этот подход. Моя главная цель - избежать единой точки отказа для моего кластера. Если какой-либо узел выходит из строя, он не должен влиять на любое другое состояние. Кто-нибудь может мне с этим помочь?

0 ответов

Решено? Если нет, проверьте конфигурацию akka.cluster. Вы должны установить такую ​​конфигурацию. Это работает для меня

для прокси

akka.cluster {
  roles = ["Proxy"]
  sharding {
    role = "Master"
  } 
}

для мастера

akka.cluster {
  roles = ["Master"]
  sharding {
    role = "Master"
  } 
}
Другие вопросы по тегам