Scala Rx можно наблюдать с помощью Monifu

Я просто пытаюсь понять понятия между горячей и холодной наблюдаемой и пробую библиотеку Монифу. Насколько я понимаю, следующий код должен привести к тому, что только один подписчик получит события, испускаемые Observable, но это не так!

scala> :paste
// Entering paste mode (ctrl-D to finish)

import monifu.reactive._
import scala.concurrent.duration._

import monifu.concurrent.Implicits.globalScheduler

val obs = Observable.interval(1.second).take(10)

val x = obs.foreach(a => println(s"from x ${a}"))
val y = obs.foreach(a => println(s"from y ${a}"))

// Exiting paste mode, now interpreting.

from x 0
from y 0
import monifu.reactive._
import scala.concurrent.duration._
import monifu.concurrent.Implicits.globalScheduler
obs: monifu.reactive.Observable[Long] = monifu.reactive.Observable$$anon$5@2c3c615d
x: Unit = ()
y: Unit = ()

scala> from x 1
from y 1
from x 2
from y 2
from x 3
from y 3
from x 4
from y 4
from x 5
from y 5
from x 6
from y 6
from x 7
from y 7
from x 8
from y 8
from x 9
from y 9

Итак, для меня это выглядит так, как будто Observable публикует события для всех заинтересованных подписчиков?

1 ответ

Решение

Я основной автор Monifu.

Холодное наблюдение означает, что его функция подписки инициирует новый источник данных для каждого подписчика (на каждом subscribe() вызов), тогда как горячая наблюдаемая разделяет один и тот же источник данных между несколькими подписчиками.

В качестве примера рассмотрим файл как источник данных. Позволяет моделировать простой Observable, который испускает строки из файла:

def fromFile(file: File): Observable[String] = {
  // this is the subscribe function that
  // we are passing to create ;-)
  Observable.create { subscriber =>
    // executing things on our thread-pool
    subscriber.scheduler.execute {
      val source = try {
        Observable.fromIterable(scala.io.Source
          .fromFile(file).getLines().toIterable)
      } 
      catch {
        // subscribe functions must be protected
        case NonFatal(ex) =>
          Observable.error(ex)
      }

      source.unsafeSubscribe(subscriber)
    }
  }
}

Эта функция создает наблюдаемую холодность. Это означает, что он откроет новый дескриптор файла для каждого подписанного наблюдателя, а затем прочитает и выдаст строки для каждого подписанного наблюдателя.

Но мы можем превратить это в горячую наблюдаемую:

// NOTE: publish() turns a cold observable into a hot one
val hotObservable = fromFile(file).publish()

И тогда разница будет, когда вы сделаете это:

val x = observable.subscribe()
val y = observable.subscribe()

Если наблюдаемое горячо:

  1. наблюдаемое ничего не делает, пока вы не позвоните connect() в теме
  2. после connect()один и тот же файл открывается, и оба получат одинаковые события
  3. после отправки всех строк из этого файла новые подписчики не получат ничего, поскольку (общий) источник данных уже исчерпан

Если наблюдаемое холодно:

  1. при каждой подписке открывается новый дескриптор файла и читается
  2. элементы выбрасываются сразу после subscribe()так что не нужно ждать connect()
  3. все подписчики получат все строки из этого файла, независимо от того, когда они это сделают

Некоторые ссылки, которые также относятся к Monifu:

  1. Подключаемый Наблюдаемый из вики RxJava
  2. Введение в Rx: горячие и холодные наблюдаемые
  3. Предметы из вики RxJava
Другие вопросы по тегам