Настройка времени соединения Cassandra в Phantom DSL
Я использую фантом для подключения к Apache Cassandra и хочу настроить соединитель во время выполнения, то есть я хочу проанализировать некоторый файл конфигурации, извлечь список баз данных Cassandra и передать его каким-то образом Database
объект.
Я следовал этому руководству, чтобы иметь дополнительный слой DatabaseProvider
между Database
и мой сервис. Следовательно, я могу предоставить статический DatabaseProvider
как это:
object ProdConnector {
val connector = ContactPoints(Seq("dev-cassndr.containers"), 9042)
.keySpace("test")
}
object ProdDatabase extends MyDatabase(ProdConnector.connector)
trait ProdDatabaseProvider extends MyDatabaseProvider {
override def database: MyDatabase = ProdDatabase
}
и в моем main
Я делаю функцию
val service = new MessageService with ProdDatabaseProvider {}
Как я могу достичь того же результата во время выполнения без одноэлементных объектов?
Я сделал несколько попыток, но всегда получал NullPointerException
s. Мой текущий подход заключается в том, чтобы иметь объект конфигурации Cassandra, который Джексон читает из файла:
case class CassandraConfigurator(
contactPoints: Seq[String],
keySpace: String,
port: Int = 9042,
) {
@JsonCreator
def this() = this(null, null, 9042)
val connection: CassandraConnection = {
val p = ContactPoints(contactPoints, port)
p.keySpace(keySpace)
}
}
моя точка входа затем расширяет StreamApp от fs2
object Main extends StreamApp[IO] {
override def stream(args: List[String], reqShutdown: IO[Unit])
: Stream[IO, ExitCode] = {
val conf: CassandraConfigurator = ???
val service = new MyService with MyDatabaseProvider {
override def database: MyDatabase = new MyDatabase(conf.connection)
}
service.database.create()
val api = ApiWithService(service).getApi
BlazeBuilder[IO].bindHttp(80, "0.0.0.0").mountService(api, "/").serve
}
}
Это приводит к следующей ошибке:
12:44:03.436 [pool-10-thread-1] INFO com.datastax.driver.core.GuavaCompatibility - Detected Guava >= 19 in the classpath, using modern compatibility layer
12:44:03.436 [pool-10-thread-1] INFO com.datastax.driver.core.GuavaCompatibility - Detected Guava >= 19 in the classpath, using modern compatibility layer
12:44:03.463 [pool-10-thread-1] DEBUG com.datastax.driver.core.SystemProperties - com.datastax.driver.NEW_NODE_DELAY_SECONDS is undefined, using default value 1
12:44:03.463 [pool-10-thread-1] DEBUG com.datastax.driver.core.SystemProperties - com.datastax.driver.NEW_NODE_DELAY_SECONDS is undefined, using default value 1
12:44:03.477 [pool-10-thread-1] DEBUG com.datastax.driver.core.SystemProperties - com.datastax.driver.NOTIF_LOCK_TIMEOUT_SECONDS is undefined, using default value 60
12:44:03.477 [pool-10-thread-1] DEBUG com.datastax.driver.core.SystemProperties - com.datastax.driver.NOTIF_LOCK_TIMEOUT_SECONDS is undefined, using default value 60
java.lang.NullPointerException
at com.outworkers.phantom.connectors.ContactPoints$.$anonfun$apply$3(ContactPoint.scala:101)
at com.outworkers.phantom.connectors.DefaultSessionProvider.<init>(DefaultSessionProvider.scala:37)
at com.outworkers.phantom.connectors.CassandraConnection.provider$lzycompute(CassandraConnection.scala:46)
at com.outworkers.phantom.connectors.CassandraConnection.provider(CassandraConnection.scala:41)
at com.outworkers.phantom.connectors.CassandraConnection.session$lzycompute(CassandraConnection.scala:52)
at com.outworkers.phantom.connectors.CassandraConnection.session(CassandraConnection.scala:52)
at com.outworkers.phantom.database.Database.session$lzycompute(Database.scala:36)
at com.outworkers.phantom.database.Database.session(Database.scala:36)
at com.outworkers.phantom.ops.DbOps.$anonfun$createAsync$2(DbOps.scala:66)
at com.outworkers.phantom.builder.query.execution.ExecutionHelper$.$anonfun$sequencedTraverse$2(ExecutableStatements.scala:71)
at scala.concurrent.Future.$anonfun$flatMap$1(Future.scala:304)
at scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:37)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Exception: sbt.TrapExitSecurityException thrown from the UncaughtExceptionHandler in thread "run-main-0"
[error] java.lang.RuntimeException: Nonzero exit code: 1
[error] at sbt.Run$.executeTrapExit(Run.scala:124)
[error] at sbt.Run.run(Run.scala:77)
[error] at sbt.Defaults$.$anonfun$bgRunTask$5(Defaults.scala:1168)
[error] at sbt.Defaults$.$anonfun$bgRunTask$5$adapted(Defaults.scala:1163)
[error] at sbt.internal.BackgroundThreadPool.$anonfun$run$1(DefaultBackgroundJobService.scala:366)
[error] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
[error] at scala.util.Try$.apply(Try.scala:209)
[error] at sbt.internal.BackgroundThreadPool$BackgroundRunnable.run(DefaultBackgroundJobService.scala:289)
[error] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[error] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[error] at java.lang.Thread.run(Thread.java:748)
1 ответ
В вашем случае здесь нет ничего неявного, а среда выполнения выглядит так, как будто она должна быть простой. Ваша проблема, скорее всего, в том, что Джексон не смог прочитать файл. Чтобы проверить это, попробуйте ниже, если он успешно пытается подключиться к локальной Cassandra, то это ваша проблема.
object Main extends StreamApp[IO] {
override def stream(args: List[String], reqShutdown: IO[Unit])
: Stream[IO, ExitCode] = {
val service = new MyService with MyDatabaseProvider {
override def database: MyDatabase = new MyDatabase(Connector.default)
}
service.database.create()
val api = ApiWithService(service).getApi
BlazeBuilder[IO].bindHttp(80, "0.0.0.0").mountService(api, "/").serve
}
}
Уверены ли вы val conf: CassandraConfigurator = ???
правильно инициализируется? Если это то, что у вас есть в коде, я не удивлен, что вы получаете NPE.