Как использовать собственный планировщик в RxScala?
Я пытаюсь с
val executors = Executors.newSingleThreadExecutor()
val scheduler = Schedulers.from(executors)
Observable.just[Int](1,2,3).subscribeOn(scheduler)
вывести ошибку
Error:(103, 43) type mismatch;
found : rx.Scheduler
required: rx.lang.scala.Scheduler
Observable.just[Int](1,2,3).subscribeOn(scheduler)
^
как использовать собственный планировщик в RxScala
?
1 ответ
Проблема здесь в том, что вы смешиваете код RxJava и RxScala здесь. Видите ли, RxScala - это просто оболочка для функциональности RxJava; первый только перенаправляет второму и не имеет никаких "реальных" реализаций. Это полезно, поскольку вам нужно поддерживать только 1 версию, а не 2 или более.
Тип scheduler
в вашем примере это rx.Scheduler
так что это RxJava Scheduler
, Тем не мение, subscribeOn
требует от вас предоставить rx.lang.scala.Scheduler
, который является RxScala Scheduler
, Следовательно, вам нужно либо конвертировать RxJava Scheduler
одному из RxScala.
Тем не менее, для вашего случая есть лучший способ сделать что-то: обернуть Executors.newSingleThreadExecutor
в scala.concurrent.ExecutionContext
с использованием fromExecutor
заводской метод. Затем заверните это в rx.lang.scala.schedulers.ExecutionContextScheduler
и у вас есть планировщик, который вы можете использовать в subscribeOn
, Ваш код будет выглядеть примерно так (я включил оператор print, чтобы увидеть, в каком потоке работает этот материал):
val e = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor)
val s = ExecutionContextScheduler(e)
Observable.just(1, 2, 3)
.subscribeOn(s)
.doOnNext(x => println(s"thread - ${Thread.currentThread().getName}, value - $x"))
.subscribe()