Закрыть akka-http соединение с сервером через сокет

В моем сценарии клиент отправляет "прощай" сообщение веб-сокета, и мне нужно закрыть ранее установленное соединение на стороне сервера.

Из документов akka-http:

Закрытие соединений возможно путем отмены входящего потока Flow от логики вашего сервера (например, путем подключения его нисходящего потока к Sink.cancelled и его восходящего потока к Source.empty). Также возможно отключить сокет сервера, отменив исходные соединения IncomingConnection.

Но мне не понятно, как это сделать, учитывая, что Sink а также Source устанавливаются один раз при согласовании нового соединения:

(get & path("ws")) {
  optionalHeaderValueByType[UpgradeToWebsocket]() {
    case Some(upgrade) ⇒
      val connectionId = UUID()
      complete(upgrade.handleMessagesWithSinkSource(sink, source))
    case None ⇒
      reject(ExpectedWebsocketRequestRejection)
  }
}

3 ответа

Решение

ПОДСКАЗКА: Этот ответ основан на akka-stream-experimental версия 2.0-M2, API может немного отличаться в других версиях.


Простой способ закрыть соединение с помощью PushStage:

import akka.stream.stage._

val closeClient = new PushStage[String, String] {
  override def onPush(elem: String, ctx: Context[String]) = elem match {
    case "goodbye" ⇒
      // println("Connection closed")
      ctx.finish()
    case msg ⇒
      ctx.push(msg)
  }
}

Каждый элемент, который получен на стороне клиента или на стороне сервера (и в целом каждый элемент, который проходит через Flow) проходит через такое Stage составная часть. В Акке полная абстракция называется GraphStageБолее подробную информацию можно найти в официальной документации.

С PushStage мы можем наблюдать конкретные входящие элементы по их значению и соответствующим образом трансформировать контекст. В приведенном выше примере, как только goodbye сообщение получено, мы заканчиваем контекст, иначе мы просто пересылаем значение через push метод.

Теперь мы можем подключить closeClient компонент произвольного потока через transform метод:

val connection = Tcp().outgoingConnection(address, port)

val flow = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .transform(() ⇒ closeClient)
  .map(_ ⇒ StdIn.readLine("> "))
  .map(_ + "\n")
  .map(ByteString(_))

connection.join(flow).run()

Поток выше получает ByteString и возвращает ByteString, что означает, что он может быть подключен к connection сквозь join метод. Внутри потока мы сначала конвертируем байты в строку перед тем, как отправить их closeClient, Если PushStage не завершает поток, элемент перенаправляется в поток, где он отбрасывается и заменяется некоторым вводом из stdin, который затем отправляется обратно по проводам. Если поток завершен, все дальнейшие этапы обработки потока после компонента этапа будут отброшены - поток теперь закрыт.

Это может быть достигнуто следующим образом в текущей (2.4.14) версии akka-stream

package com.trackabus.misc

import akka.stream.stage._
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}

// terminates the flow based on a predicate for a message of type T
// if forwardTerminatingMessage is set the message is passed along the flow
// before termination
// if terminate is true the stage is failed, if it is false the stage is completed
class TerminateFlowStage[T](
    pred: T => Boolean, 
    forwardTerminatingMessage: Boolean = false, 
    terminate: Boolean = true)
  extends GraphStage[FlowShape[T, T]]
{
  val in = Inlet[T]("TerminateFlowStage.in")
  val out = Outlet[T]("TerminateFlowStage.out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) {

      setHandlers(in, out, new InHandler with OutHandler {
        override def onPull(): Unit = { pull(in) }

        override def onPush(): Unit = {
          val chunk = grab(in)

          if (pred(chunk)) {
            if (forwardTerminatingMessage)
              push(out, chunk)
            if (terminate)
              failStage(new RuntimeException("Flow terminated by TerminateFlowStage"))
            else
              completeStage()
          }
          else
            push(out, chunk)
        }
      })
  }
}

Чтобы использовать его, определите свою сцену

val termOnKillMe = new TerminateFlowStage[Message](_.isInstanceOf[KillMe])

а затем включить его как часть потока

.via(termOnKillMe)

Другой способ - управлять соединением, используя очередь из Source.queue. Очередь может использоваться для отправки сообщений клиенту, а также для закрытия соединения.

def socketFlow: Flow[Message, Message, NotUsed] = {
  val (queue, source) = Source.queue[Message](5, OverflowStrategy.fail).preMaterialize()

  // receive client message 
  val sink = Sink.foreach[Message] {
    case TextMessage.Strict("goodbye") =>
      queue.complete() // this closes the connection
    case TextMessage.Strict(text) =>
      // send message to client by using offer
      queue.offer(TextMessage(s"you sent $text")) 
  }
  Flow.fromSinkAndSource(sink, source)
}

// you then produce the upgrade response like this
val response = upgrade.handleMessages(socketFlow)

Преимущество использования очереди для WebSockets заключается в том, что вы можете использовать ее для отправки сообщений в любое время, пока у вас есть доступ к ней, вместо того, чтобы ждать ответа на входящее сообщение.

Другие вопросы по тегам