Акка http и ошибка ReactiveMongo

У меня есть HTTP API, который транслирует документы из mongodb и вычисляет некоторые агрегаты. На самом деле следующий код не запускается, если mongodb недоступен, или останавливается, если у mongodb возникают проблемы после запуска HTTP-сервера.

val collection : Future[BSONCollection]= driver.connection(mongoUri).get.database(databaseName).map(_.collection(collectionName))

// Akka Stream source used to stream KPIs from randomCDR objects read in mongodb
val kpisSource : Source[Kpis,NotUsed] = mongodbCdrSource(collection,materializer)
  .viaMat(aggregateByWayAndType)(Keep.right)

logger.info("Starting stream API")

val route =
  // the path used to get KPIs
  path("kpi") {
    get {
      // provide the HTTP response by completing and materializing the KPIs source
      complete(kpisSource)
    }
  }

val serverBinding: Future[ServerBinding] = Http().bindAndHandle(route,httpHost)

serverBinding.failed.foreach { ex =>
  logger.error("{}, Failed to bind to {}:{}!",ex,httpHost,httpPort)
}

Кажется, ошибки ReactiveMongo не являются критическими, и драйвер пытается постоянно подключаться к MongoDB.

Какой будет лучший способ

  1. запустить сервер, даже если MongoDB недоступен?

  2. продолжать обслуживать HTTP-запрос, даже если MongoDB завершается ошибкой после запуска HTTP-сервера? Вероятно, ExceptionHandler из DSL маршрутизации?

редактировать

Изменение кода, как показано ниже, позволяет HTTP-серверу запускаться и отправлять пользовательское сообщение об ошибке, когда MongoDB недоступен. Но я все еще сталкиваюсь с проблемой, когда MongoDB становится недоступным после некоторого успешного HTTP-запроса. Никаких исключений не возникает, и я просто получаю ERR_CONNECTION_RESET, Когда MongoDB возвращается, HTTP-сервер может запросить сервер снова...

// assign connection to a val to reuse it each time and minimize creation cost
val connection = driver.connection(mongoUri)
// don't assign collection to a val to be able to recover a connection failure
def collection : Future[BSONCollection] = connection.get.database(databaseName).map(_.collection(collectionName))

// Akka Stream source used to stream KPIs from randomCDR objects read in mongodb
def kpisSource : Source[Kpis,NotUsed] = mongodbCdrSource(collection,materializer)
  .viaMat(aggregateByWayAndType)(Keep.right)

val mongodbExceptionHandler = ExceptionHandler {
  case _ : PrimaryUnavailableException =>
    complete(HttpResponse(InternalServerError, entity = "Failed to connect to mongodb primary !"))
  case _ =>
    complete(HttpResponse(InternalServerError, entity = "Failed to connect to mongodb !"))
}

logger.info("Starting stream API")

val route =
  // the path used to get KPIs
  handleExceptions(mongodbExceptionHandler) {
    path("kpi") {
      get {
        // provide the HTTP response by completing and materializing the KPIs source
        complete(kpisSource)
      }
    }
  }

Вот мой источник. Я подозреваю, что это не вызовет никаких исключений, а просто завершит Future[State] с провалом???

def mongodbCdrSource(collection : Future[BSONCollection],mat: ActorMaterializer): Source[RandomCdr,Future[State]]= {
    implicit val materializer = mat
    implicit val executionContext = materializer.executionContext

    val zoneId = ZoneId.systemDefault
    val day = LocalDateTime.now().truncatedTo(ChronoUnit.DAYS).atZone(zoneId).toEpochSecond()*1000L
    val selector = BSONDocument("dateTime" -> BSONDocument("$gt" -> day))
    Await.result(collection,5.second).find(selector).cursor[RandomCdr](ReadPreference.nearest).documentSource()
}

0 ответов

Другие вопросы по тегам