Почему приложение Akka Streams не завершается нормально?

Я написал это простое приложение, используя библиотеку Alpakka Cassandra

package com.abhi

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.alpakka.cassandra.scaladsl.CassandraSource
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink}
import com.datastax.driver.core.{Cluster, Row, SimpleStatement}
import scala.concurrent.Await
import scala.concurrent.duration._

object MyApp extends App {
   implicit val actorSystem = ActorSystem()
   implicit val actorMaterializer = ActorMaterializer()
   implicit val session = Cluster
      .builder
      .addContactPoints(List("localhost") :_*)
      .withPort(9042)
      .withCredentials("foo", "bar")
      .build
      .connect("foobar")
   val stmt = new SimpleStatement("SELECT col1, col2 FROM foo").setFetchSize(20)
   val source = CassandraSource(stmt)
   val toFoo = Flow[Row].map(row => Foo(row.getLong(0), row.Long(1)))
   val sink = Sink.foreach[Foo](foo => println(foo.col1, foo.col2))
   val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){ implicit b =>
      s =>
      import GraphDSL.Implicits._
      source.take(10) ~> toFoo ~> s
      ClosedShape
   })
   // let us run the graph
   val future = graph.run()
   import actorSystem.dispatcher
   future.onComplete{_ =>
      session.close()
      Await.result(actorSystem.terminate(), Duration.Inf)
   }
   Await.result(future, Duration.Inf)
   System.exit(0)
}

case class Foo(col1: Long, col2: Long)

Это приложение работает точно так, как ожидается, оно печатает 10 строк на экране.

Но пост, что он висит. Когда System.exit(0) вызов выполнен, он выдает исключение

Exception: sbt.TrapExitSecurityException thrown from the UncaughtExceptionHandler in thread "run-main-0"

Но все равно приложение не перестает работать. это просто висит.

Я не понимаю, почему это приложение не завершается нормально (на самом деле ему даже не нужен вызов system.exit(0).

Единственный способ выйти из этого приложения - через элемент управления C.

1 ответ

Решение

Это может произойти, потому что sbt запускает ваш код в своем собственном экземпляре JVM, ваш System.exit затем выйдет из JVM sbt и даст вышеуказанный результат.

Вы пробовали установить: fork in run := true где-то в вашей сборке sbt?

Я также не уверен, что это хорошая идея, чтобы использовать actorSystem.dispatcher выполнить ваш onComplete обратный вызов (потому что вы используете его для ожидания завершения самой системы акторов).

То, что вы можете попробовать вместо этого:

import actorSystem.dispatcher
future.onComplete{ _ =>
  session.close()
  actorSystem.terminate()
}
Await.result(actorSystem.whenTerminated, Duration.Inf)

Обратите внимание, что JVM выйдет без необходимости звонить System.exit когда единственными оставшимися потоками являются потоки демона (см., например, Что такое поток демона в Java?).

Другие вопросы по тегам