Как оправиться от akka.stream.io.Framing$FramingException

On: akka-stream-экспериментальный_2.11 1.0.

Мы используем Framing.delimiter на Tcp-сервере. Когда приходит сообщение с длиной, превышающей MaximumFrameLength, генерируется исключение FramingException, и мы можем захватить его из OnError объекта ActorSubscriber.

Код сервера:

def bind(address: String, port: Int, target: ActorRef, maxInFlight: Int, maxFrameLength: Int)
    (implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[ServerBinding] = {
    val sink = Sink.foreach {
      conn: Tcp.IncomingConnection =>
        val targetSubscriber = ActorSubscriber[Message](system.actorOf(Props(new TargetSubscriber(target, maxInFlight))))

        val targetSink = Flow[ByteString]
          .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = maxFrameLength, allowTruncation = true))
          .map(raw ⇒ Message(raw))
          .to(Sink(targetSubscriber))

        conn.flow.to(targetSink).runWith(Source(Promise().future))
    }
    val connections = Tcp().bind(address, port)
    connections.to(sink).run()
  }

Абонентский код:

class TargetSubscriber(target: ActorRef, maxInFlight: Int) extends ActorSubscriber with ActorLogging {
  private var inFlight = 0

  override protected def requestStrategy = new MaxInFlightRequestStrategy(maxInFlight) {
    override def inFlightInternally = inFlight
  }

  override def receive = {
    case OnNext(msg: Message) ⇒
      target ! msg
      inFlight += 1
    case OnError(t) ⇒
      inFlight -= 1
      log.error(t, "Subscriber encountered error")
    case TargetAck(_) ⇒
      inFlight -= 1
  }
}

Проблема: сообщения с максимальной длиной кадра не передаются после этого исключения для этого входящего соединения. убить клиента и запустить его работает нормально.

ActorSubscriber не соблюдает надзор

Как правильно пропустить плохое сообщение и продолжить со следующим хорошим сообщением?

2 ответа

Вы пытались надзор за targetFlow тонуть вместо всего материализатора? Я не вижу этого нигде здесь, и я полагаю, что это должно быть установлено непосредственно в этом потоке.

Стил это скорее догадка, чем наука;)

У меня было такое же исключение при чтении из файла, и для меня это было решено путем ввода возврата после последней строки.

Другие вопросы по тегам