RxJava и rx.exceptions.MissingBackpressureException исключение

Я пытаюсь заставить работать очень простое приложение на основе RxJava. Я определил следующий класс Observable, который читает и возвращает строки из файла:

public Observable<String> getObservable() throws IOException
    {
        return Observable.create(subscribe -> {
            InputStream in = getClass().getResourceAsStream("/trial.txt");
            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
            String line = null;
            try {
                while((line = reader.readLine()) != null)
                {
                    subscribe.onNext(line);
                }
            } catch (IOException e) {
                subscribe.onError(e);
            }
            finally {
                subscribe.onCompleted();
            }
        });
    }

Далее я определил код subcrober:

public static void main(String[] args) throws IOException, InterruptedException {
        Thread thread = new Thread(() ->
        {
            RxObserver observer = new RxObserver();
            try {
                observer.getObservable()
                        .observeOn(Schedulers.io())
                        .subscribe( x ->System.out.println(x),
                                    t -> System.out.println(t),
                                    () -> System.out.println("Completed"));

            } catch (IOException e) {
                e.printStackTrace();
            }
        });

        thread.start();
        thread.join();
    }

Файл имеет около 50000 записей. При запуске приложения я получаю "rx.exceptions.MissingBackpressureException". Я просмотрел некоторую документацию и, как было предложено, попытался добавить метод.onBackpressureBuffer() в цепочку вызовов. Но тогда я не получаю исключения, но завершенный вызов тоже не будет уволен.

Как правильно обращаться со сценарием, в котором у нас есть быстрое создание Observable?

2 ответа

Решение

Первая проблема заключается в том, что ваша логика readLine игнорирует противодавление. Вы можете подать заявку onBackpressureBuffer() как раз перед observeOn для начала, но есть недавнее дополнение SyncOnSubscribe что вы будете генерировать значения одно за другим и заботиться о противодавлении:

SyncOnSubscribe.createSingleState(() => {
    try {
        InputStream in = getClass().getResourceAsStream("/trial.txt");
        return new BufferedReader(new InputStreamReader(in));
    } catch (IOException ex) {
        throw new RuntimeException(ex);
    }
},
(s, o) -> {
    try {
        String line = s.readLine();
        if (line == null) {
            o.onCompleted();
        } else {
            o.onNext(line);
        }
    } catch (IOException ex) {
        s.onError(ex);
    }
},
s -> {
    try {
       s.close();
    } catch (IOException ex) {
    }
});

Вторая проблема заключается в том, что ваш поток завершит работу до того, как все элементы в потоке io будут доставлены, и, следовательно, основная программа завершит работу. Либо удалите observeOn, добавлять .toBlocking или использовать CountDownLatch,

RxObserver observer = new RxObserver();
try {

    CountDownLatch cdl = new CountDownLatch(1);

    observer.getObservable()
           .observeOn(Schedulers.io())
           .subscribe( x ->System.out.println(x),
                       t -> { System.out.println(t); cdl.countDown(); },
                       () -> { System.out.println("Completed"); cdl.countDown(); });

    cdl.await();
 } catch (IOException | InterruptedException e) {
     e.printStackTrace();
 }

Проблема здесь заключается в операторе наблюдателя, так как каждый вызов ObNerver onNext() планируется вызывать в отдельном потоке, ваш Observable продолжает генерировать эти запланированные вызовы в цикле независимо от пропускной способности абонента (наблюдателя).

Если вы сохраняете это синхронно, Observable не будет испускать следующий элемент, пока подписчик не завершит работу с предыдущим, так как все это выполняется в одном потоке, и у вас больше не будет проблем с противодавлением.

Если вы все еще хотите использовать наблюдаем, вам придется реализовать логику противодавления в методе вызова OnSubscribe# вашего Observable

Другие вопросы по тегам