Закрыть akka-http соединение с сервером через сокет
В моем сценарии клиент отправляет "прощай" сообщение веб-сокета, и мне нужно закрыть ранее установленное соединение на стороне сервера.
Из документов akka-http:
Закрытие соединений возможно путем отмены входящего потока Flow от логики вашего сервера (например, путем подключения его нисходящего потока к Sink.cancelled и его восходящего потока к Source.empty). Также возможно отключить сокет сервера, отменив исходные соединения IncomingConnection.
Но мне не понятно, как это сделать, учитывая, что Sink
а также Source
устанавливаются один раз при согласовании нового соединения:
(get & path("ws")) {
optionalHeaderValueByType[UpgradeToWebsocket]() {
case Some(upgrade) ⇒
val connectionId = UUID()
complete(upgrade.handleMessagesWithSinkSource(sink, source))
case None ⇒
reject(ExpectedWebsocketRequestRejection)
}
}
3 ответа
ПОДСКАЗКА: Этот ответ основан на akka-stream-experimental
версия 2.0-M2
, API может немного отличаться в других версиях.
Простой способ закрыть соединение с помощью PushStage
:
import akka.stream.stage._
val closeClient = new PushStage[String, String] {
override def onPush(elem: String, ctx: Context[String]) = elem match {
case "goodbye" ⇒
// println("Connection closed")
ctx.finish()
case msg ⇒
ctx.push(msg)
}
}
Каждый элемент, который получен на стороне клиента или на стороне сервера (и в целом каждый элемент, который проходит через Flow
) проходит через такое Stage
составная часть. В Акке полная абстракция называется GraphStage
Более подробную информацию можно найти в официальной документации.
С PushStage
мы можем наблюдать конкретные входящие элементы по их значению и соответствующим образом трансформировать контекст. В приведенном выше примере, как только goodbye
сообщение получено, мы заканчиваем контекст, иначе мы просто пересылаем значение через push
метод.
Теперь мы можем подключить closeClient
компонент произвольного потока через transform
метод:
val connection = Tcp().outgoingConnection(address, port)
val flow = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.transform(() ⇒ closeClient)
.map(_ ⇒ StdIn.readLine("> "))
.map(_ + "\n")
.map(ByteString(_))
connection.join(flow).run()
Поток выше получает ByteString
и возвращает ByteString
, что означает, что он может быть подключен к connection
сквозь join
метод. Внутри потока мы сначала конвертируем байты в строку перед тем, как отправить их closeClient
, Если PushStage
не завершает поток, элемент перенаправляется в поток, где он отбрасывается и заменяется некоторым вводом из stdin, который затем отправляется обратно по проводам. Если поток завершен, все дальнейшие этапы обработки потока после компонента этапа будут отброшены - поток теперь закрыт.
Это может быть достигнуто следующим образом в текущей (2.4.14) версии akka-stream
package com.trackabus.misc
import akka.stream.stage._
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
// terminates the flow based on a predicate for a message of type T
// if forwardTerminatingMessage is set the message is passed along the flow
// before termination
// if terminate is true the stage is failed, if it is false the stage is completed
class TerminateFlowStage[T](
pred: T => Boolean,
forwardTerminatingMessage: Boolean = false,
terminate: Boolean = true)
extends GraphStage[FlowShape[T, T]]
{
val in = Inlet[T]("TerminateFlowStage.in")
val out = Outlet[T]("TerminateFlowStage.out")
override val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandlers(in, out, new InHandler with OutHandler {
override def onPull(): Unit = { pull(in) }
override def onPush(): Unit = {
val chunk = grab(in)
if (pred(chunk)) {
if (forwardTerminatingMessage)
push(out, chunk)
if (terminate)
failStage(new RuntimeException("Flow terminated by TerminateFlowStage"))
else
completeStage()
}
else
push(out, chunk)
}
})
}
}
Чтобы использовать его, определите свою сцену
val termOnKillMe = new TerminateFlowStage[Message](_.isInstanceOf[KillMe])
а затем включить его как часть потока
.via(termOnKillMe)
Другой способ - управлять соединением, используя очередь из Source.queue. Очередь может использоваться для отправки сообщений клиенту, а также для закрытия соединения.
def socketFlow: Flow[Message, Message, NotUsed] = {
val (queue, source) = Source.queue[Message](5, OverflowStrategy.fail).preMaterialize()
// receive client message
val sink = Sink.foreach[Message] {
case TextMessage.Strict("goodbye") =>
queue.complete() // this closes the connection
case TextMessage.Strict(text) =>
// send message to client by using offer
queue.offer(TextMessage(s"you sent $text"))
}
Flow.fromSinkAndSource(sink, source)
}
// you then produce the upgrade response like this
val response = upgrade.handleMessages(socketFlow)
Преимущество использования очереди для WebSockets заключается в том, что вы можете использовать ее для отправки сообщений в любое время, пока у вас есть доступ к ней, вместо того, чтобы ждать ответа на входящее сообщение.