Субъект, который отправляет события подписчикам в определенном порядке с обратным давлением

Представьте себе канал подписчиков, которому вы отправляете событие, и он посещает одного подписчика за другим.

Наличие PublishSubject и x подписчиков / наблюдаемых. Обычно события отправляются наблюдателям в определенном порядке, но одновременно, независимо от того, когда наблюдатели возвращаются. Можно ли сделать этот поток:

  1. отправить событие наблюдателю
  2. после возврата osbserverA отправьте событие
  3. после того, как обозреватель B возвращается, отправляет событие для наблюдателя C

Я использую RxScala и Monifu Rx реализации

Monifu даже имеет реализацию противодействия:

def onNext(elem: T): Future[Ack]

Мне бы хотелось, чтобы в этом примере было напечатано "И результат был: изменен!!":

  val subject = PublishSubject[Int]()

  var result = "Not Changed"
  subject.subscribe { i =>
    Observable.timerOneTime(3.seconds, Continue).asFuture.map { x =>
      result = "Changed !!"
      x.get
    }
  }

  subject.subscribe { i =>
    Observable.timerOneTime(1.seconds, Continue).asFuture.map { x =>
      println("And Result was : " + result)
      x.get
    }
  }

  subject.onNext(1)

Возможно ли это в RxScala/RxJava или Monifu без расширения Subject и переопределения реализации onNext? Эти классы в любом случае объявлены окончательными, так что это будет довольно взломанным.

1 ответ

Решение

Я думаю, что ответом является пользовательская реализация Subject, что-то вроде этого в Monifu, которая будет кормить наблюдателей в стиле flatMap (игнорируя тот факт, что PublishSubject является конечным классом):

class PipeSubject extends PublishSubject[RxEvent] {
  override def onNext(elem: RxEvent): Future[Ack] = {
    if (!isCompleted) {
      val observers = subscriptions
      if (observers.nonEmpty)
        pipeThroughMany(observers, elem)
      else
        Continue
    }
    else
      Cancel
  }

 private[this] def pipeThroughMany(array: Array[Observer[T]], elem: T): Future[Continue] = {
    val length = array.length
    def >>>(idx: Int = 0): Future[Continue] = {
      val obs = array(idx)
      obs.onNext(elem).flatMap {
         case Continue =>
           if (idx+1 < length)
              >>>(idx+1)
           else
             Continue
         case _ =>
           removeSubscription(obs)
           Continue
      }
    }
    >>>()
  }
}
Другие вопросы по тегам