Почему приложение Akka Streams не завершается нормально?
Я написал это простое приложение, используя библиотеку Alpakka Cassandra
package com.abhi
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.alpakka.cassandra.scaladsl.CassandraSource
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink}
import com.datastax.driver.core.{Cluster, Row, SimpleStatement}
import scala.concurrent.Await
import scala.concurrent.duration._
object MyApp extends App {
implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()
implicit val session = Cluster
.builder
.addContactPoints(List("localhost") :_*)
.withPort(9042)
.withCredentials("foo", "bar")
.build
.connect("foobar")
val stmt = new SimpleStatement("SELECT col1, col2 FROM foo").setFetchSize(20)
val source = CassandraSource(stmt)
val toFoo = Flow[Row].map(row => Foo(row.getLong(0), row.Long(1)))
val sink = Sink.foreach[Foo](foo => println(foo.col1, foo.col2))
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){ implicit b =>
s =>
import GraphDSL.Implicits._
source.take(10) ~> toFoo ~> s
ClosedShape
})
// let us run the graph
val future = graph.run()
import actorSystem.dispatcher
future.onComplete{_ =>
session.close()
Await.result(actorSystem.terminate(), Duration.Inf)
}
Await.result(future, Duration.Inf)
System.exit(0)
}
case class Foo(col1: Long, col2: Long)
Это приложение работает точно так, как ожидается, оно печатает 10 строк на экране.
Но пост, что он висит. Когда System.exit(0)
вызов выполнен, он выдает исключение
Exception: sbt.TrapExitSecurityException thrown from the UncaughtExceptionHandler in thread "run-main-0"
Но все равно приложение не перестает работать. это просто висит.
Я не понимаю, почему это приложение не завершается нормально (на самом деле ему даже не нужен вызов system.exit(0).
Единственный способ выйти из этого приложения - через элемент управления C.
1 ответ
Это может произойти, потому что sbt запускает ваш код в своем собственном экземпляре JVM, ваш System.exit
затем выйдет из JVM sbt и даст вышеуказанный результат.
Вы пробовали установить: fork in run := true
где-то в вашей сборке sbt?
Я также не уверен, что это хорошая идея, чтобы использовать actorSystem.dispatcher
выполнить ваш onComplete
обратный вызов (потому что вы используете его для ожидания завершения самой системы акторов).
То, что вы можете попробовать вместо этого:
import actorSystem.dispatcher
future.onComplete{ _ =>
session.close()
actorSystem.terminate()
}
Await.result(actorSystem.whenTerminated, Duration.Inf)
Обратите внимание, что JVM выйдет без необходимости звонить System.exit
когда единственными оставшимися потоками являются потоки демона (см., например, Что такое поток демона в Java?).