Как использовать собственный планировщик в RxScala?

Я пытаюсь с

val executors = Executors.newSingleThreadExecutor()
val scheduler = Schedulers.from(executors)
Observable.just[Int](1,2,3).subscribeOn(scheduler)

вывести ошибку

Error:(103, 43) type mismatch;
found   : rx.Scheduler
required: rx.lang.scala.Scheduler
Observable.just[Int](1,2,3).subscribeOn(scheduler)
                                      ^ 

как использовать собственный планировщик в RxScala?

1 ответ

Решение

Проблема здесь в том, что вы смешиваете код RxJava и RxScala здесь. Видите ли, RxScala - это просто оболочка для функциональности RxJava; первый только перенаправляет второму и не имеет никаких "реальных" реализаций. Это полезно, поскольку вам нужно поддерживать только 1 версию, а не 2 или более.

Тип scheduler в вашем примере это rx.Schedulerтак что это RxJava Scheduler, Тем не мение, subscribeOn требует от вас предоставить rx.lang.scala.Scheduler, который является RxScala Scheduler, Следовательно, вам нужно либо конвертировать RxJava Scheduler одному из RxScala.

Тем не менее, для вашего случая есть лучший способ сделать что-то: обернуть Executors.newSingleThreadExecutor в scala.concurrent.ExecutionContext с использованием fromExecutor заводской метод. Затем заверните это в rx.lang.scala.schedulers.ExecutionContextScheduler и у вас есть планировщик, который вы можете использовать в subscribeOn, Ваш код будет выглядеть примерно так (я включил оператор print, чтобы увидеть, в каком потоке работает этот материал):

val e = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor)
val s = ExecutionContextScheduler(e)

Observable.just(1, 2, 3)
    .subscribeOn(s)
    .doOnNext(x => println(s"thread - ${Thread.currentThread().getName}, value - $x"))
    .subscribe()
Другие вопросы по тегам