Потребление значений одновременно выдается наблюдателем

Я изучаю реактивное программирование с помощью RxJava и хочу одновременно использовать извлеченные значения без блокирования в одном потоке выполнения.

        Observable
            .interval(50, TimeUnit.MILLISECONDS)
            .take(5)
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long counter) {
                    sleep(1000);
                    System.out.println("Got: " + counter + " thread : "+ Thread.currentThread().getName());
                }
            });

    sleep(10000);

Я получу этот вывод

Got: 0 thread : RxComputationThreadPool-1
Got: 1 thread : RxComputationThreadPool-1
Got: 2 thread : RxComputationThreadPool-1
Got: 3 thread : RxComputationThreadPool-1
Got: 4 thread : RxComputationThreadPool-1

как мне обрабатывать каждое событие в асинхронном режиме? как это

Got: 0 thread : RxComputationThreadPool-1
Got: 1 thread : RxComputationThreadPool-2
Got: 2 thread : RxComputationThreadPool-3
Got: 3 thread : RxComputationThreadPool-4
Got: 4 thread : RxComputationThreadPool-5

1 ответ

Решение

В Rx наблюдаемая представляет параллелизм1, поэтому для одновременной обработки уведомлений по отношению друг к другу необходимо проецировать каждое уведомление в наблюдаемую.

flatMap является оператором асинхронной последовательной композиции. Он проецирует каждое уведомление из наблюдаемого источника в наблюдаемую, что позволяет обрабатывать каждое входное значение одновременно. Затем он объединяет результаты каждого вычисления в уплощенную наблюдаемую последовательность с неперекрывающимися уведомлениями.

Приложение:

в selector за flatMapчасто существует несколько способов создания одновременной наблюдаемой в зависимости от целевой платформы. Я не знаю Java, но в.NET вы обычно используете Observable.Start ввести параллелизм или асинхронный метод (async/await) воспользоваться нативной асинхронностью, которая часто предпочтительнее.

1 Технически, индивидуальная подписка (наблюдатель) на наблюдаемую холодную среду обеспечивает параллелизм в Rx, хотя вместо этого часто удобно думать в терминах наблюдаемых. Смотрите этот ответ для получения дополнительной информации.

Другие вопросы по тегам