RxJava onBackpressureBuffer не излучает элементы

Я наблюдал странное поведение с onBackpressureBuffer, я не уверен, является ли это допустимым поведением или ошибкой.

У меня есть вызов TCP, который испускает элементы с определенной скоростью (с использованием потоковой передачи и inputStream, но это только для некоторой информации)

Вдобавок ко всему, я создал наблюдаемый, используя create, который будет генерировать элемент каждый раз, когда он будет готов.

Давайте назовем это сообщениями ().

Тогда я делаю это:

messages()
   .subscribeOn(Schedulers.io())
   .observeOn(AndroidSchedulers.mainThread())
   .subscribe({//do some work});

При использовании аналитических инструментов я заметил, что исключение MissingBackPressureException генерируется редко, поэтому я добавил onBackpressureBuffer к вызову.

Если я добавлю это после observeOn:

messages()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .onBackpressureBuffer()
    .subscribe({//do some work})

Все работает нормально, но это означает, что он будет буферизироваться только после того, как попадет в основной поток пользовательского интерфейса, поэтому я предпочел, чтобы это было так:

messages()
    .onBackpressureBuffer()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({//do some work});

И это где вещи начинают становиться странными.

Я заметил время messages() продолжает излучать предметы, в какой-то момент они перестанут доставляться подписчику.

Точнее, после того, как ровно 16 элементов, очевидно, что происходит то, что буфер начнет удерживать элементы, не передавая их вперед.

Как только я отменяю messages() с каким-то механизмом тайм-аута, это приведет к messages() излучать onError() и буфер немедленно выдаст все элементы, которые он сохранил (они будут обработаны).

Я проверил, виноват ли абонент в том, что он слишком много работал, но это не так, он закончил, и все же он не получил предметы...

Я также пытался использовать request(n) метод в подписчике, запрашивающий один пункт после onNext() закончен, но буфер не ведет себя

Я подозреваю, что это вызвано системой обмена сообщениями в главном потоке пользовательского интерфейса Android с обратным давлением, но я не могу объяснить, почему.

Может кто-нибудь объяснить, почему это происходит? это ошибка или правильное поведение? Tnx!

2 ответа

Решение

Не зная как messages()В зависимости от описанного поведения, это аналогичный тупик с тем же пулом, что и в этом вопросе.

Обходной путь, что вы не пытались, это поставить .onBackpressureBuffer между subscribeOn а также observeOn,

messages()
.subscribeOn(Schedulers.io())
.onBackpressureBuffer()           // <---------------------
.observeOn(AndroidSchedulers.mainThread())
.subscribe({//do some work});

Вопрос другой, но ответ сводится к одному и тому же. Реализация observeOn конструктор: OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize):

public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    this.scheduler = scheduler;
    this.delayError = delayError;
    this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;

}

Последняя строка указывает на размер буфера.

Размер буфера на Android составляет 16.

Решение - просто передать больший размер буфера observeOn(Scheduler scheduler, int bufferSize) оператор:

messages()
    .observeOn(AndroidSchedulers.mainThread(), {buffer_size})

Будьте осторожны, чтобы не придавать слишком большое значение, так как Android имеет ограниченную память.

Другие вопросы по тегам