Как транслировать холодную наблюдаемую: повтор с обратным давлением?
Я на самом деле использую Scala, но этот вопрос является общим для всех Rx и потоковых фреймворков.
Мой вариант использования состоит в том, что у меня есть сгенерированная наблюдаемая (поэтому холодная) и я хочу, чтобы несколько потребителей использовали одни и те же значения параллельно, но я ожидаю, что они будут иметь существенно различную пропускную способность.
То, что мне нужно, могло бы быть сделано путем трансляции наблюдаемой с повторением, но я вижу, что общая политика воспроизведения с максимальным размером буфера состоит в отбрасывании элементов из буфера при переполнении (которые затем теряются для самых медленных потребителей) вместо обратного давление на производителя. Это имеет смысл, если вы относитесь ко всем вещаемым наблюдаемым как к горячим, но в моем случае я знаю, что на самом деле они холодные и на них можно оказать давление.
Есть ли способ достичь этого в какой-либо из структур, совместимых с реактивным потоком JVM?
Большое спасибо!
1 ответ
RxJava поддерживает это через publish
оператор, который координирует запросы от отдельных потребителей, то есть он запрашивает с фиксированной скоростью так же быстро, как самые медленные запросы потребителей. К сожалению, в настоящее время RxScala 2 не существует, и только RxJava 2 поддерживает спецификацию Reactive-Streams, поэтому у вас может возникнуть некоторое неудобство, превращающее это в Scala:
Flowable.fromPublisher(Flowable.range(1, 1000))
.publish(f ->
Flowable.mergeArray(
f.observeOn(Schedulers.computation()).map(v -> v * v),
f.observeOn(Schedulers.computation()).map(v -> v * v * v)
)
)
.blockingSubscribe(System.out::println);
Альтернативой является использование ConnectableObservable
и подключите вручную, как только все потребители подписались:
ConnectableFlowable<Integer> co = Flowable.fromPublisher(Flowable.range(1, 1000))
.publish();
co.observeOn(Schedulers.computation()).map(v -> v * v)
.subscribe(System.out::println);
co.connect();