Акка http и ошибка ReactiveMongo
У меня есть HTTP API, который транслирует документы из mongodb и вычисляет некоторые агрегаты. На самом деле следующий код не запускается, если mongodb недоступен, или останавливается, если у mongodb возникают проблемы после запуска HTTP-сервера.
val collection : Future[BSONCollection]= driver.connection(mongoUri).get.database(databaseName).map(_.collection(collectionName))
// Akka Stream source used to stream KPIs from randomCDR objects read in mongodb
val kpisSource : Source[Kpis,NotUsed] = mongodbCdrSource(collection,materializer)
.viaMat(aggregateByWayAndType)(Keep.right)
logger.info("Starting stream API")
val route =
// the path used to get KPIs
path("kpi") {
get {
// provide the HTTP response by completing and materializing the KPIs source
complete(kpisSource)
}
}
val serverBinding: Future[ServerBinding] = Http().bindAndHandle(route,httpHost)
serverBinding.failed.foreach { ex =>
logger.error("{}, Failed to bind to {}:{}!",ex,httpHost,httpPort)
}
Кажется, ошибки ReactiveMongo не являются критическими, и драйвер пытается постоянно подключаться к MongoDB.
Какой будет лучший способ
запустить сервер, даже если MongoDB недоступен?
продолжать обслуживать HTTP-запрос, даже если MongoDB завершается ошибкой после запуска HTTP-сервера? Вероятно,
ExceptionHandler
из DSL маршрутизации?
редактировать
Изменение кода, как показано ниже, позволяет HTTP-серверу запускаться и отправлять пользовательское сообщение об ошибке, когда MongoDB недоступен. Но я все еще сталкиваюсь с проблемой, когда MongoDB становится недоступным после некоторого успешного HTTP-запроса. Никаких исключений не возникает, и я просто получаю ERR_CONNECTION_RESET
, Когда MongoDB возвращается, HTTP-сервер может запросить сервер снова...
// assign connection to a val to reuse it each time and minimize creation cost
val connection = driver.connection(mongoUri)
// don't assign collection to a val to be able to recover a connection failure
def collection : Future[BSONCollection] = connection.get.database(databaseName).map(_.collection(collectionName))
// Akka Stream source used to stream KPIs from randomCDR objects read in mongodb
def kpisSource : Source[Kpis,NotUsed] = mongodbCdrSource(collection,materializer)
.viaMat(aggregateByWayAndType)(Keep.right)
val mongodbExceptionHandler = ExceptionHandler {
case _ : PrimaryUnavailableException =>
complete(HttpResponse(InternalServerError, entity = "Failed to connect to mongodb primary !"))
case _ =>
complete(HttpResponse(InternalServerError, entity = "Failed to connect to mongodb !"))
}
logger.info("Starting stream API")
val route =
// the path used to get KPIs
handleExceptions(mongodbExceptionHandler) {
path("kpi") {
get {
// provide the HTTP response by completing and materializing the KPIs source
complete(kpisSource)
}
}
}
Вот мой источник. Я подозреваю, что это не вызовет никаких исключений, а просто завершит Future[State]
с провалом???
def mongodbCdrSource(collection : Future[BSONCollection],mat: ActorMaterializer): Source[RandomCdr,Future[State]]= {
implicit val materializer = mat
implicit val executionContext = materializer.executionContext
val zoneId = ZoneId.systemDefault
val day = LocalDateTime.now().truncatedTo(ChronoUnit.DAYS).atZone(zoneId).toEpochSecond()*1000L
val selector = BSONDocument("dateTime" -> BSONDocument("$gt" -> day))
Await.result(collection,5.second).find(selector).cursor[RandomCdr](ReadPreference.nearest).documentSource()
}