Потребление значений одновременно выдается наблюдателем
Я изучаю реактивное программирование с помощью RxJava и хочу одновременно использовать извлеченные значения без блокирования в одном потоке выполнения.
Observable
.interval(50, TimeUnit.MILLISECONDS)
.take(5)
.subscribe(new Action1<Long>() {
@Override
public void call(Long counter) {
sleep(1000);
System.out.println("Got: " + counter + " thread : "+ Thread.currentThread().getName());
}
});
sleep(10000);
Я получу этот вывод
Got: 0 thread : RxComputationThreadPool-1
Got: 1 thread : RxComputationThreadPool-1
Got: 2 thread : RxComputationThreadPool-1
Got: 3 thread : RxComputationThreadPool-1
Got: 4 thread : RxComputationThreadPool-1
как мне обрабатывать каждое событие в асинхронном режиме? как это
Got: 0 thread : RxComputationThreadPool-1
Got: 1 thread : RxComputationThreadPool-2
Got: 2 thread : RxComputationThreadPool-3
Got: 3 thread : RxComputationThreadPool-4
Got: 4 thread : RxComputationThreadPool-5
1 ответ
В Rx наблюдаемая представляет параллелизм1, поэтому для одновременной обработки уведомлений по отношению друг к другу необходимо проецировать каждое уведомление в наблюдаемую.
flatMap
является оператором асинхронной последовательной композиции. Он проецирует каждое уведомление из наблюдаемого источника в наблюдаемую, что позволяет обрабатывать каждое входное значение одновременно. Затем он объединяет результаты каждого вычисления в уплощенную наблюдаемую последовательность с неперекрывающимися уведомлениями.
Приложение:
в selector
за flatMap
часто существует несколько способов создания одновременной наблюдаемой в зависимости от целевой платформы. Я не знаю Java, но в.NET вы обычно используете Observable.Start
ввести параллелизм или асинхронный метод (async/await
) воспользоваться нативной асинхронностью, которая часто предпочтительнее.
1 Технически, индивидуальная подписка (наблюдатель) на наблюдаемую холодную среду обеспечивает параллелизм в Rx, хотя вместо этого часто удобно думать в терминах наблюдаемых. Смотрите этот ответ для получения дополнительной информации.