Противодавление RxJava/RxScala с использованием запроса
У меня проблема с использованием противодавления RxJava. По сути, у меня есть один производитель, который производит больше элементов, чем может обработать потребитель, и хочет иметь некоторую буферную очередь для обработки только тех элементов, с которыми я могу иметь дело, и запрашивать, когда я завершаю некоторые из них, как в этом примере:
object Tester extends App {
Observable[Int] { subscriber =>
(1 to 100).foreach { e =>
subscriber.onNext(e)
Thread.sleep(100)
println("produced " + e + "(" + Thread.currentThread().getName + Thread.currentThread().getId + ")")
}
}
.subscribeOn(NewThreadScheduler())
.observeOn(ComputationScheduler())
.subscribe(
new Subscriber[Int]() {
override def onStart(): Unit = {
request(2)
}
override def onNext(value: Int): Unit = {
Thread.sleep(1000)
println("consumed " + value + "(" + Thread.currentThread().getName + Thread.currentThread().getId + ")")
request(1)
}
override def onCompleted(): Unit = {
println("finished ")
}
})
Thread.sleep(100000)
Я ожидаю получить вывод, как
produced 1(RxNewThreadScheduler-113)
consumed 1(RxComputationThreadPool-312)
produced 2(RxNewThreadScheduler-113)
consumed 2(RxComputationThreadPool-312)
produced 3(RxNewThreadScheduler-113)
consumed 3(RxComputationThreadPool-312)
......
но вместо этого я получаю
produced 1(RxNewThreadScheduler-113)
produced 2(RxNewThreadScheduler-113)
produced 3(RxNewThreadScheduler-113)
produced 4(RxNewThreadScheduler-113)
produced 5(RxNewThreadScheduler-113)
produced 6(RxNewThreadScheduler-113)
produced 7(RxNewThreadScheduler-113)
produced 8(RxNewThreadScheduler-113)
produced 9(RxNewThreadScheduler-113)
consumed 1(RxComputationThreadPool-312)
produced 10(RxNewThreadScheduler-113)
produced 11(RxNewThreadScheduler-113)
produced 12(RxNewThreadScheduler-113)
produced 13(RxNewThreadScheduler-113)
.....
1 ответ
Когда вы реализуете свой Observable
с помощью Observable.create
Вы должны управлять противодавлением (это не простая задача). Здесь ваша наблюдаемая просто игнорирует реактивные запросы извлечения (вы просто выполняете итерацию, не ожидая запроса для вызова итератора next()
метод).
Если возможно, попробуйте использовать Observable
заводские методы, такие как range
и т. д... и сочинение с использованием map
/ flatMap
чтобы получить желаемый источник Observable, так как те будут уважать противодавление.
В противном случае, посмотрите на экспериментальные классы утилит, представленные недавно для правильного управления противодавлением в OnSubscribe
реализация: AsyncOnSubscribe
а также SyncOnSubscribe
,
Вот довольно наивный пример:
Observable<Integer> backpressuredObservable =
Observable.create(SyncOnSubscribe.createStateful(
() -> 0, //starts the state at 0
(state, obs) -> {
int i = state++; //first i is 1 as desired
obs.next(i);
if (i == 100) { //maximum is 100, stop there
obs.onCompleted();
}
return i; //update the state
}));