Асинхронный "узел" в скаляр-потоке

У меня есть Process[Task, A]и мне нужно запустить функцию A => B время выполнения которого варьируется от мгновенного до очень длинного на каждом A потока, чтобы дать Process[Task, B],

Подвох в том, что я хотел бы обработать каждый A как можно скорее в ExecutionContext и передать результат, как только я его получу, независимо от того, в каком порядке As получены.

Конкретным примером будет следующий код, где я надеюсь, что все нечетные числа будут напечатаны немедленно, а четные - примерно через 500 мс. Вместо этого получается, что (нечетные, четные) пары печатаются с чередованием с паузами 500 мс:

import java.util.concurrent.{TimeUnit, Executors}
import scala.concurrent.ExecutionContext

import scalaz.stream._
import scalaz.concurrent.Task

object Test extends App {
  val executor = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))

  Process.range(0, 100).flatMap { i =>
    Process.eval(Task.apply {
      if(i % 2 == 0) Thread.sleep(500)
      i
    }(executor))
  }.to(io.printStreamSink(System.out)(_ println _))
  .run.run

  executor.shutdown()
  executor.awaitTermination(10, TimeUnit.MINUTES)
}

1 ответ

Оказывается, ответ использует каналы. Вот обновленный код, который, кажется, делает именно то, что я хочу:

import java.util.concurrent.{TimeUnit, Executors}
import scala.concurrent.ExecutionContext

import scalaz.stream._
import scalaz.concurrent.Task

object Test extends App {
  val executor = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))
  val chan = channel.lift[Task, Int, Int] { i => Task {
    if(i % 2 == 0) Thread.sleep(500)
    i
  }}

  merge.mergeN(8)(Process.range(0, 100).zipWith(chan)((i, f) => Process.eval(f(i))))
    .to(io.printStreamSink(System.out)(_ println _)).run.run

  executor.shutdown()
  executor.awaitTermination(10, TimeUnit.MINUTES)
}
Другие вопросы по тегам