Как очистить ресурсы веб-сокета akka-http после отключения и повторить попытку?
Приведенный ниже код успешно устанавливает соединение с веб-сокетом.
Сервер websockets (также akk-http) намеренно закрывает соединение, используя предложенный Эндрю ответ здесь.
SinkActor
ниже получает сообщение типа akka.actor.Status.Failure
поэтому я знаю, что поток сообщений с сервера на клиент был прерван.
Мой вопрос... Как мой клиент должен восстановить соединение через веб-сокет? имеет source.via(webSocketFlow).to(sink).run()
завершена?
Как лучше всего очистить ресурсы и повторить попытку подключения через веб-сокет?
class ConnectionAdminActor extends Actor with ActorLogging {
implicit val system: ActorSystem = context.system
implicit val flowMaterializer = ActorMaterializer()
private val sinkActor = context.system.actorOf(Props[SinkActor], name = "SinkActor")
private val sink = Sink.actorRefWithAck[Message](sinkActor, StartupWithActor(self.path), Ack, Complete)
private val source = Source.actorRef[TextMessage](10, OverflowStrategy.dropHead).mapMaterializedValue {
ref => {
self ! StartupWithActor(ref.path)
ref
}
}
private val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
Http().webSocketClientFlow(WebSocketRequest("ws://localhost:8080"))
source
.via(webSocketFlow)
.to(sink)
.run()
1 ответ
Попробуйте recoverWithRetries
комбинатор (документы здесь).
Это позволяет вам предоставить альтернативу Source
ваш конвейер переключится на, в случае сбоя вышестоящего потока. В самом простом случае вы можете просто повторно использовать тот же Source
, который должен выдать новое соединение.
val wsSource = source via webSocketFlow
wsSource
.recoverWithRetries(attempts = -1, {case e: Throwable => wsSource})
.to(sink)
Обратите внимание, что
attempts = -1
будет пытаться восстановить соединение до бесконечности- частичная функция позволяет более детально контролировать, какое исключение может вызвать переподключение