Противодавление RxJava/RxScala с использованием запроса

У меня проблема с использованием противодавления RxJava. По сути, у меня есть один производитель, который производит больше элементов, чем может обработать потребитель, и хочет иметь некоторую буферную очередь для обработки только тех элементов, с которыми я могу иметь дело, и запрашивать, когда я завершаю некоторые из них, как в этом примере:

object Tester extends App {

Observable[Int] { subscriber =>
  (1 to 100).foreach { e =>
    subscriber.onNext(e)
    Thread.sleep(100)
    println("produced " + e + "(" + Thread.currentThread().getName + Thread.currentThread().getId + ")")
  }
}
.subscribeOn(NewThreadScheduler())
.observeOn(ComputationScheduler())
.subscribe(
  new Subscriber[Int]() {
    override def onStart(): Unit = {
      request(2)
    }

    override def onNext(value: Int): Unit = {
      Thread.sleep(1000)
      println("consumed " + value + "(" + Thread.currentThread().getName + Thread.currentThread().getId + ")")
      request(1)
    }

    override def onCompleted(): Unit = {
      println("finished ")
    }
})

Thread.sleep(100000)

Я ожидаю получить вывод, как

produced 1(RxNewThreadScheduler-113)
consumed 1(RxComputationThreadPool-312)
produced 2(RxNewThreadScheduler-113)
consumed 2(RxComputationThreadPool-312)
produced 3(RxNewThreadScheduler-113)
consumed 3(RxComputationThreadPool-312)
......

но вместо этого я получаю

produced 1(RxNewThreadScheduler-113)
produced 2(RxNewThreadScheduler-113)
produced 3(RxNewThreadScheduler-113)
produced 4(RxNewThreadScheduler-113)
produced 5(RxNewThreadScheduler-113)
produced 6(RxNewThreadScheduler-113)
produced 7(RxNewThreadScheduler-113)
produced 8(RxNewThreadScheduler-113)
produced 9(RxNewThreadScheduler-113)
consumed 1(RxComputationThreadPool-312)
produced 10(RxNewThreadScheduler-113)
produced 11(RxNewThreadScheduler-113)
produced 12(RxNewThreadScheduler-113)
produced 13(RxNewThreadScheduler-113)
.....

1 ответ

Когда вы реализуете свой Observable с помощью Observable.create Вы должны управлять противодавлением (это не простая задача). Здесь ваша наблюдаемая просто игнорирует реактивные запросы извлечения (вы просто выполняете итерацию, не ожидая запроса для вызова итератора next() метод).

Если возможно, попробуйте использовать Observable заводские методы, такие как range и т. д... и сочинение с использованием map / flatMap чтобы получить желаемый источник Observable, так как те будут уважать противодавление.

В противном случае, посмотрите на экспериментальные классы утилит, представленные недавно для правильного управления противодавлением в OnSubscribe реализация: AsyncOnSubscribe а также SyncOnSubscribe,

Вот довольно наивный пример:

Observable<Integer> backpressuredObservable = 
  Observable.create(SyncOnSubscribe.createStateful(
    () -> 0, //starts the state at 0
    (state, obs) -> {
        int i = state++; //first i is 1 as desired
        obs.next(i);
        if (i == 100) { //maximum is 100, stop there
            obs.onCompleted();
        }
        return i; //update the state
}));
Другие вопросы по тегам