RxJava Observable.fromEmitter странное поведение противодавления

Я использовал Observable.fromEmitter() как фантастическая альтернатива Observable.create(), Недавно я столкнулся с каким-то странным поведением, и я не могу понять, почему это так. Я бы очень признателен кому-то с некоторыми знаниями о противодавлении и планировщики, которые смотрят на это.

public final class EmitterTest {
  public static void main(String[] args) {
    Observable<Integer> obs = Observable.fromEmitter(emitter -> {
      for (int i = 1; i < 1000; i++) {
        if (i % 5 == 0) {
          sleep(300L);
        }

        emitter.onNext(i);
      }

      emitter.onCompleted();
    }, Emitter.BackpressureMode.LATEST);

    obs.subscribeOn(Schedulers.computation())
        .observeOn(Schedulers.computation())
        .subscribe(value -> System.out.println("Received " + value)); // Why does this get stuck at "Received 128"

    sleep(10000L);
  }

  private static void sleep(Long duration) {
    try {
      Thread.sleep(duration);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
}

Выход этого приложения

Received 1
Received 2
...
Received 128

Затем он остается на уровне 128 (предположительно, потому что это размер буфера по умолчанию в RxJava).

Если я изменю режим, указанный в fromEmitter() в BackpressureMode.NONEтогда код работает как задумано. Если я удалю звонок observeOn()также работает как задумано. Кто-нибудь может объяснить, почему это так?

2 ответа

Решение

Это тупиковая ситуация в том же пуле. subscribeOn планирует вниз по течению request в том же потоке, который он использует, но если этот поток занят циклом ожидания / выброса, запрос никогда не доставляется fromEmitter и, таким образом, через некоторое время LATEST начинает отбрасывать элементы до самого конца, куда доставляется самое последнее значение (999), если основной источник ждет достаточно долго. (Это похожая ситуация с onBackpressureBlock мы удалили.)

Если subscribeOn не делал это планирование запросов, пример будет работать правильно.

Я открыл вопрос, чтобы выработать решения.

Обходной путь на данный момент должен использовать больший размер буфера с observeOn (есть перегрузка) или использовать fromEmitter(f, NONE).subscribeOn().onBackpressureLatest().observeOn()

Это не странно, это ожидается.

Давайте проследим звонки. Начните с:

Observable.subscribe(Subscriber<? super T> subscriber)

Observable.subscribe(Subscriber<? super T> subscriber, Observable<T> observable)

RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);

Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);

и так далее. Посмотрите на конструктор:

OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize):

public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    this.scheduler = scheduler;
    this.delayError = delayError;
    this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
}

Если вы не укажете буфер, по умолчанию RxRingBuffer.SIZE, какой размер зависит от платформы.

Вот почему, когда вы вызываете observeOn оператор без размера буфера, по умолчанию 128 (16 на Android).

Решение этой проблемы очень простое: просто используйте другой observeOn оператор и объявить размер буфера. Но если вы объявите размер буфера, скажем, 1000 (столько же, сколько элементов, поступающих из эмиттера), программа все равно завершится, не выдав все значения (около 170). Зачем? Потому что программа заканчивается. Основной поток заканчивается через 10 000 секунд, и ваш расчет выполняется в другом потоке (Schedulers.computation()). Решение для этого? использование CountdownLatch, Имейте в виду, никогда не используйте это производство, это полезно только для испытаний.

Другие вопросы по тегам