Как создать одну наблюдаемую из другой

Допустим, у нас есть источник Observable of Ints:

val source:Observable[Int]

Я хотел бы создать другой Observable, производящий значения, чья разница с первым появившимся значением в источнике больше 10:

def detect() = Observable[Int](
  subscriber =>
    if (!subscriber.isUnsubscribed) {
      var start:Option[Int] = None
      source.subscribe(
        item => {
          if (start.isEmpty) {
            start = Option(item)
          }
          else {
            start.filter(v => Math.abs(item - v) > 10).foreach {
              item => subscriber.onNext(item)
            }
          }
        }
      )
      subscriber.onCompleted()
    }
)

Здесь я использовал var start для хранения первого значения источника Observable.

Есть ли способ упростить этот код? Мне не нравится этот подход с присвоением значения переменной

2 ответа

Решение

Вот что я придумал:

import rx.lang.scala.Observable

val source = Observable.from(List(5, 2, 3, 16, -40, 2, -70, 50))

source.scan(Option.empty[(Int, Int)]) { (acc, next) =>
  acc.map(_.copy(_2 = next)) orElse Some((next, next))
}.collect {
  case Some((start, current)) if math.abs(start - current) > 10 => current
}.subscribe(x => println(x))

печать

16
-40
-70
50

в основном сканирование хранит аккумулятор, который может быть неинициализирован (None), или может содержать пару: первое значение и последний элемент, исходящий из источника. Затем мы собираем только те элементы, которые соответствуют вашему предикату.

Вам просто нужно применить filter оператор, который создает новый объект Observable, который отражает выбросы наблюдаемого источника, но пропускает те, для которых предикат тестирует ложь:

val filtered = source.filter(v => Math.abs(item - v) > 10)
Другие вопросы по тегам