Субъект, который отправляет события подписчикам в определенном порядке с обратным давлением
Представьте себе канал подписчиков, которому вы отправляете событие, и он посещает одного подписчика за другим.
Наличие PublishSubject и x подписчиков / наблюдаемых. Обычно события отправляются наблюдателям в определенном порядке, но одновременно, независимо от того, когда наблюдатели возвращаются. Можно ли сделать этот поток:
- отправить событие наблюдателю
- после возврата osbserverA отправьте событие
- после того, как обозреватель B возвращается, отправляет событие для наблюдателя C
Я использую RxScala и Monifu Rx реализации
Monifu даже имеет реализацию противодействия:
def onNext(elem: T): Future[Ack]
Мне бы хотелось, чтобы в этом примере было напечатано "И результат был: изменен!!":
val subject = PublishSubject[Int]()
var result = "Not Changed"
subject.subscribe { i =>
Observable.timerOneTime(3.seconds, Continue).asFuture.map { x =>
result = "Changed !!"
x.get
}
}
subject.subscribe { i =>
Observable.timerOneTime(1.seconds, Continue).asFuture.map { x =>
println("And Result was : " + result)
x.get
}
}
subject.onNext(1)
Возможно ли это в RxScala/RxJava или Monifu без расширения Subject и переопределения реализации onNext? Эти классы в любом случае объявлены окончательными, так что это будет довольно взломанным.
1 ответ
Я думаю, что ответом является пользовательская реализация Subject, что-то вроде этого в Monifu, которая будет кормить наблюдателей в стиле flatMap (игнорируя тот факт, что PublishSubject является конечным классом):
class PipeSubject extends PublishSubject[RxEvent] {
override def onNext(elem: RxEvent): Future[Ack] = {
if (!isCompleted) {
val observers = subscriptions
if (observers.nonEmpty)
pipeThroughMany(observers, elem)
else
Continue
}
else
Cancel
}
private[this] def pipeThroughMany(array: Array[Observer[T]], elem: T): Future[Continue] = {
val length = array.length
def >>>(idx: Int = 0): Future[Continue] = {
val obs = array(idx)
obs.onNext(elem).flatMap {
case Continue =>
if (idx+1 < length)
>>>(idx+1)
else
Continue
case _ =>
removeSubscription(obs)
Continue
}
}
>>>()
}
}