Преобразовать бесконечный поток конечных потоков в бесконечный поток - Reactive X

Каким образом в Reactive x (в идеале с примерами в RxJava или RxJs) этого можно добиться?

a |-a-------------------a-----------a-----------a----
s1 |-x-x-x-x-x-x -| (subscribe)
s2                       |-x-x-x-x-x-| (subscribe)
s2                                               |-x-x-x-x-x-| (subscribe)
...
sn
S |-x-x-x-x-x-x-x-------x-x-x-x-x-x-x-------------x-x-x-x-x-x- (subsribe)

a это бесконечный поток событий, которые запускают конечный поток sn событий, каждое из которых должно быть частью бесконечного потока S будучи в состоянии подписаться на каждый sn поток (для выполнения операций суммирования), но в то же время сохраняя поток S как бесконечный.

РЕДАКТИРОВАТЬ: Чтобы быть более конкретным, я предоставляю реализацию того, что я ищу в Kotlin. Каждые 10 секунд генерируется событие, которое отображается в общий конечный поток из 4 событий. Метастрим flatMapв нормальный бесконечный поток. Я использую doAfterNext дополнительно подписаться на каждый конечный поток и распечатать результаты.

/** Creates a finite stream with events
 * $ch-1 - $ch-4
 */
fun createFinite(ch: Char): Observable<String> =
        Observable.interval(1, TimeUnit.SECONDS)
                .take(4)
                .map({ "$ch-$it" }).share()

fun main(args: Array<String>) {

    var ch = 'A'

    Observable.interval(10, TimeUnit.SECONDS).startWith(0)
            .map { createFinite(ch++) }
            .doAfterNext {
                it
                        .count()
                        .subscribe({ c -> println("I am done. Total event count is $c") })
            }
            .flatMap { it }
            .subscribe { println("Just received [$it] from the infinite stream ") }

    // Let main thread wait forever
    CountDownLatch(1).await()
}

Однако я не уверен, что это "чистый прием".

1 ответ

Вы не ясно, как вы хотите сделать подсчет. Если вы делаете общий подсчет, тогда нет необходимости делать внутреннюю подписку:

AtomicLong counter = new AtomicLong()
Observable.interval(10, TimeUnit.SECONDS).startWith(0)
        .map { createFinite(ch++) }
        .flatMap { it }
        .doOnNext( counter.incrementAndget() )
        .subscribe { println("Just received [$it] from the infinite stream ") }

С другой стороны, если вам нужно предоставить счет для каждого промежуточного наблюдаемого, то вы можете переместить подсчет внутри flatMap() и распечатать счетчик и сбросить его по завершении:

AtomicLong counter = new AtomicLong()
Observable.interval(10, TimeUnit.SECONDS).startWith(0)
        .map { createFinite(ch++) }
        .flatMap { it
                     .doOnNext( counter.incrementAndget()
                     .doOnCompleted( { long ctr = counter.getAndSet(0)
                                        println("I am done. Total event count is $ctr")
                                     } )
        .subscribe { println("Just received [$it] from the infinite stream ") }

Это не очень функционально, но такого рода отчеты имеют тенденцию нарушать нормальные потоки.

Другие вопросы по тегам