Наблюдаемое противодавление в зависимости от дефицита ресурсов
В RxJava 1 / RxScala, как я могу регулировать / подавлять источник, наблюдаемый в следующей ситуации?
def fast: Observable[Foo] // Supports backpressure
def afterExpensiveOp: Observable[Bar] =
fast.flatMap(foo => Observable.from(expensiveOp(foo))
// Signature and behavior is out of my control
def expensiveOp(foo: Foo)(implicit ec: ExecutionContext): Future[Bar] = {
if(noResources()) Future.failed(new OutOfResourcesException())
else Future { Bar() }
}
Возможное решение - просто блокировка до. Что работает, но это очень не элегантно и предотвращает множественные одновременные запросы:
def afterExpensiveOp: Observable[Bar] = fast.flatMap(foo =>
Observable.just(Observable.from(expensiveOp(foo)).toBlocking.head)
)
1 ответ
flatMap имеет параметр для ограничения количества одновременных подписчиков. Если вы используете этот flatMap позаботится о противодавлении для вас.
def afterExpensiveOp = fast.flatMap(safeNumberOfConccurrentExpensiveOps, x => Observable.from(expensiveOp(x)))