Как справиться с рекурсией с помощью наблюдаемой моникса?

Используя monix, я пытаюсь пройти по графику, построив Observable[Node] и используя широкий алгоритм. Однако там у меня есть небольшая проблема рекурсии. Вот фрагмент, иллюстрирующий мою проблему:

package gp

import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive._


object HelloObservable {

  type Node = Int

  //real case fetch next node across the network so the signature 
  //has to be Node -> List[Task[Node]]
  def nexts(i : Node) : List[Task[Node]] =
    List(Task(i), Task(i+1))

  def go(i :Node) : Task[Iterator[List[Node]]] =
    Task.sequence(nexts(i).sliding(100,100).map(Task.gatherUnordered))

  def explore(r: Node): Observable[Node] = {
    val firsts = for {
      ilr <- Observable.fromTask(go(r))
      lr <- Observable.fromIterator(ilr)
      r <- Observable.fromIterable(lr)
    } yield r

    firsts ++ firsts.flatMap(explore)
  }


  def main(args : Array[String]) : Unit = {

    val obs = explore(0)

    val cancelable = obs
      .dump("O")
      .subscribe()

    scala.io.StdIn.readLine()

  }

}

Наблюдаемая остановка после первой итерации. Может кто-нибудь намекнуть мне, почему?

1 ответ

Решение

Я думаю, что проблема не связана с рекурсией. Я думаю, что это связано с тем, что вы используете sliding который возвращает Iterator, Основное различие между Iterator а также Iterable это то, что вы можете потреблять Iterator только один раз и после этого все, что вам осталось, это пустое Iterator, Это значит, когда вы делаете firsts.flatMap ничего не осталось в Observable.fromIterator(ilr) и так ничего не производится.

По сути, я не думаю, что вы можете выполнить поиск в ширину, если не можете удерживать (большую часть) префикс в памяти. Но так как ваш nexts уже возвращается ListЯ предполагаю, что вы можете позволить себе иметь две копии этого списка в памяти. И второй экземпляр является материализованным результатом sliding, Таким образом, ваш фиксированный код будет выглядеть примерно так:

object HelloObservable {

    import monix.eval.Task
    import monix.execution.Scheduler.Implicits.global
    import monix.reactive._

    type Node = Int

    //real case fetch next node across the network so the signature
    //has to be Node -> List[Task[Node]]
    def nexts(i: Node): List[Task[Node]] = List(Task(i), Task(i + 1))

    def go(i: Node): Task[List[List[Node]]] =
      Task.sequence(nexts(i).sliding(100, 100).toList.map(Task.gatherUnordered))


    def explore(r: Node): Observable[Node] = {
      val firsts = for {
        ilr <- Observable.fromTask(go(r))
        lr <- Observable.fromIterable(ilr)
        r <- Observable.fromIterable(lr)
      } yield r
      firsts ++ firsts.flatMap(explore)
    }


    def main(args: Array[String]): Unit = {

      val obs = explore(0)

      val cancelable = obs
        .dump("O")
        .subscribe()

      scala.io.StdIn.readLine()

    }
}
Другие вопросы по тегам