Преобразовать бесконечный поток конечных потоков в бесконечный поток - Reactive X
Каким образом в Reactive x (в идеале с примерами в RxJava или RxJs) этого можно добиться?
a |-a-------------------a-----------a-----------a----
s1 |-x-x-x-x-x-x -| (subscribe)
s2 |-x-x-x-x-x-| (subscribe)
s2 |-x-x-x-x-x-| (subscribe)
...
sn
S |-x-x-x-x-x-x-x-------x-x-x-x-x-x-x-------------x-x-x-x-x-x- (subsribe)
a
это бесконечный поток событий, которые запускают конечный поток sn
событий, каждое из которых должно быть частью бесконечного потока S
будучи в состоянии подписаться на каждый sn
поток (для выполнения операций суммирования), но в то же время сохраняя поток S
как бесконечный.
РЕДАКТИРОВАТЬ: Чтобы быть более конкретным, я предоставляю реализацию того, что я ищу в Kotlin. Каждые 10 секунд генерируется событие, которое отображается в общий конечный поток из 4 событий. Метастрим flatMap
в нормальный бесконечный поток. Я использую doAfterNext
дополнительно подписаться на каждый конечный поток и распечатать результаты.
/** Creates a finite stream with events
* $ch-1 - $ch-4
*/
fun createFinite(ch: Char): Observable<String> =
Observable.interval(1, TimeUnit.SECONDS)
.take(4)
.map({ "$ch-$it" }).share()
fun main(args: Array<String>) {
var ch = 'A'
Observable.interval(10, TimeUnit.SECONDS).startWith(0)
.map { createFinite(ch++) }
.doAfterNext {
it
.count()
.subscribe({ c -> println("I am done. Total event count is $c") })
}
.flatMap { it }
.subscribe { println("Just received [$it] from the infinite stream ") }
// Let main thread wait forever
CountDownLatch(1).await()
}
Однако я не уверен, что это "чистый прием".
1 ответ
Вы не ясно, как вы хотите сделать подсчет. Если вы делаете общий подсчет, тогда нет необходимости делать внутреннюю подписку:
AtomicLong counter = new AtomicLong()
Observable.interval(10, TimeUnit.SECONDS).startWith(0)
.map { createFinite(ch++) }
.flatMap { it }
.doOnNext( counter.incrementAndget() )
.subscribe { println("Just received [$it] from the infinite stream ") }
С другой стороны, если вам нужно предоставить счет для каждого промежуточного наблюдаемого, то вы можете переместить подсчет внутри flatMap()
и распечатать счетчик и сбросить его по завершении:
AtomicLong counter = new AtomicLong()
Observable.interval(10, TimeUnit.SECONDS).startWith(0)
.map { createFinite(ch++) }
.flatMap { it
.doOnNext( counter.incrementAndget()
.doOnCompleted( { long ctr = counter.getAndSet(0)
println("I am done. Total event count is $ctr")
} )
.subscribe { println("Just received [$it] from the infinite stream ") }
Это не очень функционально, но такого рода отчеты имеют тенденцию нарушать нормальные потоки.