RxJava onBackpressureBuffer не излучает элементы
Я наблюдал странное поведение с onBackpressureBuffer, я не уверен, является ли это допустимым поведением или ошибкой.
У меня есть вызов TCP, который испускает элементы с определенной скоростью (с использованием потоковой передачи и inputStream, но это только для некоторой информации)
Вдобавок ко всему, я создал наблюдаемый, используя create, который будет генерировать элемент каждый раз, когда он будет готов.
Давайте назовем это сообщениями ().
Тогда я делаю это:
messages()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({//do some work});
При использовании аналитических инструментов я заметил, что исключение MissingBackPressureException генерируется редко, поэтому я добавил onBackpressureBuffer к вызову.
Если я добавлю это после observeOn
:
messages()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.onBackpressureBuffer()
.subscribe({//do some work})
Все работает нормально, но это означает, что он будет буферизироваться только после того, как попадет в основной поток пользовательского интерфейса, поэтому я предпочел, чтобы это было так:
messages()
.onBackpressureBuffer()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({//do some work});
И это где вещи начинают становиться странными.
Я заметил время messages()
продолжает излучать предметы, в какой-то момент они перестанут доставляться подписчику.
Точнее, после того, как ровно 16 элементов, очевидно, что происходит то, что буфер начнет удерживать элементы, не передавая их вперед.
Как только я отменяю messages()
с каким-то механизмом тайм-аута, это приведет к messages()
излучать onError()
и буфер немедленно выдаст все элементы, которые он сохранил (они будут обработаны).
Я проверил, виноват ли абонент в том, что он слишком много работал, но это не так, он закончил, и все же он не получил предметы...
Я также пытался использовать request(n)
метод в подписчике, запрашивающий один пункт после onNext()
закончен, но буфер не ведет себя
Я подозреваю, что это вызвано системой обмена сообщениями в главном потоке пользовательского интерфейса Android с обратным давлением, но я не могу объяснить, почему.
Может кто-нибудь объяснить, почему это происходит? это ошибка или правильное поведение? Tnx!
2 ответа
Не зная как messages()
В зависимости от описанного поведения, это аналогичный тупик с тем же пулом, что и в этом вопросе.
Обходной путь, что вы не пытались, это поставить .onBackpressureBuffer
между subscribeOn
а также observeOn
,
messages()
.subscribeOn(Schedulers.io())
.onBackpressureBuffer() // <---------------------
.observeOn(AndroidSchedulers.mainThread())
.subscribe({//do some work});
Вопрос другой, но ответ сводится к одному и тому же. Реализация observeOn
конструктор: OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize)
:
public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
}
Последняя строка указывает на размер буфера.
Размер буфера на Android составляет 16.
Решение - просто передать больший размер буфера observeOn(Scheduler scheduler, int bufferSize)
оператор:
messages()
.observeOn(AndroidSchedulers.mainThread(), {buffer_size})
Будьте осторожны, чтобы не придавать слишком большое значение, так как Android имеет ограниченную память.