Вызов (перегруженный) функций RxJava из Scala

Я хотел создать Observable из массива Observable вот так:

package rxtest

import concurrent._
import concurrent.ExecutionContext.Implicits.global

import rx.lang.scala._
import rx.lang.scala.JavaConversions._
import rx.lang.scala.schedulers._

object A extends App {
    val ps = Array.fill(3)(Promise[Int]())
    val os = ps map {
            p => Observable from p.future observeOn NewThreadScheduler()
        }
    val v = rx.Observable.merge(os map toJavaObservable)
}

Эта программа не компилируется, так как Observable имеет несколько перегруженных методов merge:

[error] /home/xgp/work/rxtest/src/main/scala/rxtest/A.scala:15: overloaded method value merge with alternatives:
[error]   [T](x$1: Array[rx.Observable[_ <: T]])rx.Observable[T] <and>
[error]   [T](x$1: rx.Observable[_ <: rx.Observable[_ <: T]])rx.Observable[T] <and>
[error]   [T](x$1: Iterable[_ <: rx.Observable[_ <: T]])rx.Observable[T]
[error]  cannot be applied to (Array[rx.Observable[_ <: Int]])
[error]     val v = rx.Observable.merge(os map toJavaObservable)
[error]                           ^
[error] one error found

Затем я хотел снять перегрузку с помощью другого Java-класса:

public class RxUtils {
    public final static <T> Observable<T> merge(Observable<? extends T>[] os) {
        return Observable.merge(os);
    }
}

Код Scala стал (здесь указана только соответствующая часть):

val ps = Array.fill(3)(Promise[Int]())
val os = ps map {
        p => Observable from p.future observeOn NewThreadScheduler()
    }
val v = RxUtils.merge(os map toJavaObservable)

Эта программа по-прежнему не компилируется:

[error] /home/xgp/work/rxtest/src/main/scala/rxtest/A.scala:17: no type parameters for method merge: (os: Array[rx.Observable[_ <: T]])rx.Observable[T] exist so that it can be applied to arguments (Array[rx.Observable[_ <: Int]])
[error]  --- because ---
[error] argument expression's type is not compatible with formal parameter type;
[error]  found   : Array[rx.Observable[_ <: Int]]
[error]  required: Array[rx.Observable[_ <: ?T]]
[error]     val v = RxUtils.merge(os map toJavaObservable)
[error]                     ^
[error] /home/xgp/work/rxtest/src/main/scala/rxtest/A.scala:17: type mismatch;
[error]  found   : Array[rx.Observable[_ <: Int]]
[error]  required: Array[rx.Observable[_ <: T]]
[error]     val v = RxUtils.merge(os map toJavaObservable)
[error]                              ^
[error] two errors found

У меня три вопроса:

  1. Как позвонить merge метод с чистым скала как в первом случае?
  2. Почему вторая программа не компилируется?
  3. Как позвонить merge метод в выше RxUtils класс в Скале?

1 ответ

Я действительно смущен тем, что ты здесь делаешь. Почему вы смешиваете rx.Observable а также rx.lang.scala.Observable, Просто выберите любой из них: если вы работаете в Scala, выберите последний; если вы пишете код Java, выберите первый!

Я также хочу указать вам на эту страницу, которая сравнивает оба вида Observable,

Что касается вашей первой программы, если я правильно выбрал тип ps является Array[Promise[Int]], так os должен иметь тип Array[Observable[Int]], Если вы хотите объединить их всех в один ObservableВы можете перейти по ссылке выше и найти merge(Array<Observable<? extends T>>) в левой колонке. Оказывается, вы можете написать это в Scala как Observable.from(os).flatten или же os.toObservable.flatten,

Что касается второго и третьего вопроса: я на самом деле не проверял это, но это, вероятно, связано с ковариационными различиями между Java и Scala. Вероятно, поможет системе типов с предоставлением некоторой дополнительной информации о типах. Но я думаю, что если вы просто останетесь на языке Scala и будете использовать библиотеку RxScala в том виде, в каком она должна быть, вам вообще не придется сталкиваться с такими проблемами.

Другие вопросы по тегам