HTTP-поток Akka не останавливается при возникновении исключения

Я пытаюсь использовать Akka HTTP для размещения на веб-сервере. Если POST терпит неудачу, я хотел бы, чтобы он прекратил и не отправлял больше POST, поскольку они не являются идемпотентными.

Приведенный ниже код создает сообщения POST и отправляет их на тестовый веб-сервер. Выдает исключение при первом ответе. Код должен быть запущен, и в этом случае вы увидите, что он печатает:

i = 0
got response
i = 1
stopping
Exception in thread "main" java.lang.Exception
i = 2
i = 3
i = 4
i = 5

Таким образом, "остановка" происходит после того, как следующий запрос был составлен (i = 1), то код просто продолжается.

Кто-нибудь знает, как остановить поток при возникновении ошибки и не отправлять дальнейшие сообщения POST?

(Scala 2.11.8, Akka 2.4.4)

object FlowTest {
  def main(args: Array[String]) {
    val stop: Supervision.Decider = {
      case _ =>
        println("stopping")
        Supervision.Stop
    }

    implicit val system = ActorSystem()
    import system.dispatcher
    implicit val mat = ActorMaterializer()
    val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] =
      Http().outgoingConnection(host = "posttestserver.com", port = 80)

    val future: Future[Done] = Source(0 to 10).map {
      i =>
        val uri = s"/post.php?dir=so_akka&i=$i"
        println(s"i = $i")
        HttpRequest(method = HttpMethods.POST, uri = uri, entity = s"data $i")
    }.via(connectionFlow).mapAsync(1) {
      resp =>
        Unmarshal(resp.entity).to[String]
          .map { str =>
            println(str)
            throw new Exception("") // Always fail
            str
          }
    }.withAttributes(ActorAttributes.supervisionStrategy(stop)).runForeach(println)

    Await.result(future, Duration.Inf)
  }
}

1 ответ

Поэтому я думаю, что у меня были две проблемы с приведенным выше кодом.

  1. HTTP POST не должны передаваться по конвейеру. Я надеялся, что Akka HTTP подождет, пока один POST будет обработан без ошибок, прежде чем отправлять следующий. Этого не происходит

  2. Исключения не распространялись вверх по течению. Таким образом, добавление кода обработки не помешало Source создать больше POST и их отправку.

Итак, есть два исправления.

  1. Я установил withSyncProcessingLimit на ActorMaterializer к одному. Что мешает источнику отправлять новые сообщения до их обработки. Я также должен был изменить .mapAsync часть, так что теперь есть .map который проверяет код состояния и ошибки, если это необходимо, и .mapAsync который смотрит на тело ответа. Вы не можете смотреть на тело ответа в .map часть.

  2. Я добавил KillSwitch остановить поток. Бросок исключения должен иметь тот же эффект, но это не так. Так что это ужасный взлом, но работает.

Я думаю, что должен быть лучший способ сделать это. Использование HTTP-потока Akka с HTTP POST не должно быть таким болезненным.

Вот новый код.

object FlowTest {
  def main(args: Array[String]) {
    implicit val system = ActorSystem()
    import system.dispatcher
    implicit val mat = ActorMaterializer.create(
      ActorMaterializerSettings.create(system).withSyncProcessingLimit(1), system
    )
    val connectionFlow = Http().outgoingConnection(host = "posttestserver.com", port = 80)
    val source = Source(0 to 10)
    val killSwitch = KillSwitches.shared("HttpPostKillSwitch")

    try {
      val future: Future[Done] = source.via(killSwitch.flow).map {
        i =>
          val uri = s"/post.php?dir=test&i=$i"
          println(s"i? = $i")
          HttpRequest(method = HttpMethods.POST, uri = uri, entity = s"data $i")
      }
        .via(connectionFlow)
        .map {
          resp =>
            println("got response")
//          if(resp.status != OK) { // always fail for testing
              val e = new Exception("")
              killSwitch.abort(e)
              throw e
//          }
            resp
        }
        .mapAsync(1) {
          resp =>
            Unmarshal(resp.entity).to[String]
              .map { str =>
                println("got " + str)
                str
              }
        }
        .runForeach(println)

      Await.result(future, Duration.Inf)
    } catch {
      case NonFatal(e) =>
        system.terminate()
    }
  }
}
Другие вопросы по тегам