Как справиться с рекурсией с помощью наблюдаемой моникса?
Используя monix, я пытаюсь пройти по графику, построив Observable[Node] и используя широкий алгоритм. Однако там у меня есть небольшая проблема рекурсии. Вот фрагмент, иллюстрирующий мою проблему:
package gp
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive._
object HelloObservable {
type Node = Int
//real case fetch next node across the network so the signature
//has to be Node -> List[Task[Node]]
def nexts(i : Node) : List[Task[Node]] =
List(Task(i), Task(i+1))
def go(i :Node) : Task[Iterator[List[Node]]] =
Task.sequence(nexts(i).sliding(100,100).map(Task.gatherUnordered))
def explore(r: Node): Observable[Node] = {
val firsts = for {
ilr <- Observable.fromTask(go(r))
lr <- Observable.fromIterator(ilr)
r <- Observable.fromIterable(lr)
} yield r
firsts ++ firsts.flatMap(explore)
}
def main(args : Array[String]) : Unit = {
val obs = explore(0)
val cancelable = obs
.dump("O")
.subscribe()
scala.io.StdIn.readLine()
}
}
Наблюдаемая остановка после первой итерации. Может кто-нибудь намекнуть мне, почему?
1 ответ
Я думаю, что проблема не связана с рекурсией. Я думаю, что это связано с тем, что вы используете sliding
который возвращает Iterator
, Основное различие между Iterator
а также Iterable
это то, что вы можете потреблять Iterator
только один раз и после этого все, что вам осталось, это пустое Iterator
, Это значит, когда вы делаете firsts.flatMap
ничего не осталось в Observable.fromIterator(ilr)
и так ничего не производится.
По сути, я не думаю, что вы можете выполнить поиск в ширину, если не можете удерживать (большую часть) префикс в памяти. Но так как ваш nexts
уже возвращается List
Я предполагаю, что вы можете позволить себе иметь две копии этого списка в памяти. И второй экземпляр является материализованным результатом sliding
, Таким образом, ваш фиксированный код будет выглядеть примерно так:
object HelloObservable {
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive._
type Node = Int
//real case fetch next node across the network so the signature
//has to be Node -> List[Task[Node]]
def nexts(i: Node): List[Task[Node]] = List(Task(i), Task(i + 1))
def go(i: Node): Task[List[List[Node]]] =
Task.sequence(nexts(i).sliding(100, 100).toList.map(Task.gatherUnordered))
def explore(r: Node): Observable[Node] = {
val firsts = for {
ilr <- Observable.fromTask(go(r))
lr <- Observable.fromIterable(ilr)
r <- Observable.fromIterable(lr)
} yield r
firsts ++ firsts.flatMap(explore)
}
def main(args: Array[String]): Unit = {
val obs = explore(0)
val cancelable = obs
.dump("O")
.subscribe()
scala.io.StdIn.readLine()
}
}