Как транслировать холодную наблюдаемую: повтор с обратным давлением?

Я на самом деле использую Scala, но этот вопрос является общим для всех Rx и потоковых фреймворков.

Мой вариант использования состоит в том, что у меня есть сгенерированная наблюдаемая (поэтому холодная) и я хочу, чтобы несколько потребителей использовали одни и те же значения параллельно, но я ожидаю, что они будут иметь существенно различную пропускную способность.

То, что мне нужно, могло бы быть сделано путем трансляции наблюдаемой с повторением, но я вижу, что общая политика воспроизведения с максимальным размером буфера состоит в отбрасывании элементов из буфера при переполнении (которые затем теряются для самых медленных потребителей) вместо обратного давление на производителя. Это имеет смысл, если вы относитесь ко всем вещаемым наблюдаемым как к горячим, но в моем случае я знаю, что на самом деле они холодные и на них можно оказать давление.

Есть ли способ достичь этого в какой-либо из структур, совместимых с реактивным потоком JVM?

Большое спасибо!

1 ответ

Решение

RxJava поддерживает это через publish оператор, который координирует запросы от отдельных потребителей, то есть он запрашивает с фиксированной скоростью так же быстро, как самые медленные запросы потребителей. К сожалению, в настоящее время RxScala 2 не существует, и только RxJava 2 поддерживает спецификацию Reactive-Streams, поэтому у вас может возникнуть некоторое неудобство, превращающее это в Scala:

Flowable.fromPublisher(Flowable.range(1, 1000))
.publish(f -> 
    Flowable.mergeArray(
        f.observeOn(Schedulers.computation()).map(v -> v * v),
        f.observeOn(Schedulers.computation()).map(v -> v * v * v)
    )
 )
 .blockingSubscribe(System.out::println);

Альтернативой является использование ConnectableObservable и подключите вручную, как только все потребители подписались:

ConnectableFlowable<Integer> co = Flowable.fromPublisher(Flowable.range(1, 1000))
    .publish();

co.observeOn(Schedulers.computation()).map(v -> v * v)
  .subscribe(System.out::println);

co.connect();
Другие вопросы по тегам