Наблюдаемый со значением LastFrom

Я реализовал псевдооператор с именем "FilterByLatestFrom" в качестве функции расширения для kotlin.

Я написал следующий код, используя этот оператор:

    fun testFilterByLatestFromOperator(){
    val observableA : Observable<Int> = Observable.fromArray(1,2,3,4,5,6,7,8,9,10)
    val observableC : PublishSubject<Int> = PublishSubject.create()
    val observableB : Observable<Int> = Observable.just(2).mergeWith(observableC)

    observableB.subscribe { println("observableB onNext: $it") }

    observableA
            .subscribe({ println("Original : $it")})

    observableA.filterByLatestFrom(observableB, BiFunction { aVal, bVal -> aVal%bVal==0 })
            .subscribe({ println("Result A : $it") })

    observableC.onNext(3)

    observableA.filterByLatestFrom(observableB, BiFunction { aVal, bVal -> aVal%bVal==0 })
            .subscribe({ println("Result AC : $it") })
}

выход:

observableB onNext: 2
Original : 1
Original : 2
Original : 3
Original : 4
Original : 5
Original : 6
Original : 7
Original : 8
Original : 9
Original : 10
Result A : 2
Result A : 4
Result A : 6
Result A : 8
Result A : 10
observableB onNext: 3
Result AC : 2
Result AC : 4
Result AC : 6
Result AC : 8
Result AC : 10

Я хочу, чтобы оператор фильтра отфильтровал obsA в соответствии с последним значением наблюдаемой B. Это работает для первого блока, но когда я добавляю On-next с новым значением, это не меняет результат (используйте то же самое последнее значение из исходной наблюдаемой).

это фильтр FilterByLatestFrom (он был разработан для Java также (с compose):

class FilterByLatestFrom<T,U>(private val observable: Observable<T>, private val biFunction: BiFunction<U, T, Boolean>){
fun filter() : ObservableTransformer<U,U> = ObservableTransformer {
    it
            .withLatestFrom(
                    observable,
                    BiFunction<U,T,Pair<U,Boolean>> {
                        u, t -> Pair(u,biFunction.apply(u,t))
                    })
            .filter { it.second }
            .map { it.first }
    }
}
fun <T,U> Observable<U>.filterByLatestFrom(observable: Observable<T>, biFunction: BiFunction<U, T, Boolean>) : Observable<U> =
        this.compose(FilterByLatestFrom(observable,biFunction).filter())

Что мне не хватает?

РЕДАКТИРОВАТЬ: я думаю, что нашел проблему: PublishSubject должен быть BehaviorSubject вместо этого. и функция слияния должна быть согласована с обещанием, что obsC будет выдавать после obsB.

1 ответ

Ваш псевдооператор filterByLatestFrom все в порядке, проблема заключается в тестировании, PublishSubject будет выдавать только последующие элементы, поэтому, когда в вашей последней подписке ("результат AC"), observableB будет излучать только 2, как observableC уже испустили 3 и не будут воспроизводить его observableB (с использованием merge).

Просто переместите observableC.onNext(3) после последней подписки (последняя строка) и вы должны увидеть ожидаемое поведение.

РЕДАКТИРОВАТЬ: также меняется на PublishSubject как вы решили ту же проблему (субъект будет воспроизводить последнее значение для новой подписки)

Другие вопросы по тегам