RxJava и rx.exceptions.MissingBackpressureException исключение
Я пытаюсь заставить работать очень простое приложение на основе RxJava. Я определил следующий класс Observable, который читает и возвращает строки из файла:
public Observable<String> getObservable() throws IOException
{
return Observable.create(subscribe -> {
InputStream in = getClass().getResourceAsStream("/trial.txt");
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
String line = null;
try {
while((line = reader.readLine()) != null)
{
subscribe.onNext(line);
}
} catch (IOException e) {
subscribe.onError(e);
}
finally {
subscribe.onCompleted();
}
});
}
Далее я определил код subcrober:
public static void main(String[] args) throws IOException, InterruptedException {
Thread thread = new Thread(() ->
{
RxObserver observer = new RxObserver();
try {
observer.getObservable()
.observeOn(Schedulers.io())
.subscribe( x ->System.out.println(x),
t -> System.out.println(t),
() -> System.out.println("Completed"));
} catch (IOException e) {
e.printStackTrace();
}
});
thread.start();
thread.join();
}
Файл имеет около 50000 записей. При запуске приложения я получаю "rx.exceptions.MissingBackpressureException". Я просмотрел некоторую документацию и, как было предложено, попытался добавить метод.onBackpressureBuffer() в цепочку вызовов. Но тогда я не получаю исключения, но завершенный вызов тоже не будет уволен.
Как правильно обращаться со сценарием, в котором у нас есть быстрое создание Observable?
2 ответа
Первая проблема заключается в том, что ваша логика readLine игнорирует противодавление. Вы можете подать заявку onBackpressureBuffer()
как раз перед observeOn
для начала, но есть недавнее дополнение SyncOnSubscribe
что вы будете генерировать значения одно за другим и заботиться о противодавлении:
SyncOnSubscribe.createSingleState(() => {
try {
InputStream in = getClass().getResourceAsStream("/trial.txt");
return new BufferedReader(new InputStreamReader(in));
} catch (IOException ex) {
throw new RuntimeException(ex);
}
},
(s, o) -> {
try {
String line = s.readLine();
if (line == null) {
o.onCompleted();
} else {
o.onNext(line);
}
} catch (IOException ex) {
s.onError(ex);
}
},
s -> {
try {
s.close();
} catch (IOException ex) {
}
});
Вторая проблема заключается в том, что ваш поток завершит работу до того, как все элементы в потоке io будут доставлены, и, следовательно, основная программа завершит работу. Либо удалите observeOn
, добавлять .toBlocking
или использовать CountDownLatch
,
RxObserver observer = new RxObserver();
try {
CountDownLatch cdl = new CountDownLatch(1);
observer.getObservable()
.observeOn(Schedulers.io())
.subscribe( x ->System.out.println(x),
t -> { System.out.println(t); cdl.countDown(); },
() -> { System.out.println("Completed"); cdl.countDown(); });
cdl.await();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
Проблема здесь заключается в операторе наблюдателя, так как каждый вызов ObNerver onNext() планируется вызывать в отдельном потоке, ваш Observable продолжает генерировать эти запланированные вызовы в цикле независимо от пропускной способности абонента (наблюдателя).
Если вы сохраняете это синхронно, Observable не будет испускать следующий элемент, пока подписчик не завершит работу с предыдущим, так как все это выполняется в одном потоке, и у вас больше не будет проблем с противодавлением.
Если вы все еще хотите использовать наблюдаем, вам придется реализовать логику противодавления в методе вызова OnSubscribe# вашего Observable