Как элегантно выполнять несколько эффектов параллельно с ZIO

Я знаю, что я могу использовать

import zio.Task

def zip3Par[A, B, C](a: Task[A], b: Task[B], c: Task[C]): Task[(A, B, C)] =
  a.zipPar(b).zipWithPar(c) { case ((a, b), c) => (a, b, c) }

def zip4Par[A, B, C, D](a: Task[A], b: Task[B], c: Task[C], d: Task[D]): Task[(A, B, C, D)] =
  zip3Par(a, b, c).zipWithPar(d) { case ((a, b, c), d) => (a, b, c, d) }

выполнять 3 или 4 задачи параллельно, но я думаю, есть ли более элегантное решение?

4 ответа

Решение

Вы могли бы просто использовать ZIO.collectAllPar со списком задач:

def collectTasks(tasks: Task[Int]*):Task[List[Int]] = ZIO.collectAllPar(tasks)

Тогда вы можете использовать его как:

val t1 = Task.effect{
  Thread.sleep(100)
  println("t1 started")
  Thread.sleep(1000)
  1
}

val t2 = Task.effect{
  println("t2 started")
  Thread.sleep(1000)
  2
}


val t3 = Task.effect{
  println("t3 started")
  Thread.sleep(1000)
  3
}

(new DefaultRuntime() {}).unsafeRun(collectTasks(t1,t2,t3))

и он будет выполнять все ваши задачи одновременно.

Общее решение с использованием кортежей вместо списка было бы трудно реализовать в Scala 2 без бесформенного. Это изменилось бы в Scala 3, потому что тогда они могли бы обрабатываться как разнородные списки.

Также обратите внимание, что есть <&> комбинатор. Это псевдоним для zipPar, Это даст кортеж, и если вы используете для понимания, я бы посоветовал взглянуть на better-monadic-for который решает проблемы с кортежами для понимания

Вот пример использования <&> комбинатор с картой:

(t1 <&> t2 <&> t3 <&> t4) map { case i1 <*> i2 <*> i3 <*> i4 => s"$i1, $i2, $i3, $i4" }

ZIO.collectAllPar а также ZIO.collectAllParN работать только тогда, когда все ZIO имеют тот же тип возврата. Это был не вопрос.

В дополнение к ответу Кшиштофа Атласика, есть также collectAllParN, которое работает как collectAllPAr, но позволяет вам указать максимальное количество используемых волокон:

 val a = Task {
      println("t1 started")
      Thread.sleep(2000)
      println("t1 finished")
      1
    }
    val b = Task {
      println("t2 started")
      Thread.sleep(1000)
      println("t2 finished")
      2
    }
    val c = Task {
      println("t3 started")
      Thread.sleep(3000)
      println("t3 finished")
      3
    }
    val d = Task {
      println("t4 started")
      Thread.sleep(1000)
      println("t4 finished")
      4
    }

И вы можете запустить его так:

 Task.collectAllParN(4)(List(a, b, c, d))

Это особенно полезно, если у вас много (сотни или тысячи) параллельных задач, чтобы избежать переполнения и ошибок памяти. Идите вперед и измените количество используемых волокон на 2 или 3 и убедитесь сами, как меняется исполнение.

Другой вариант для параллельного выполнения - поместить задачи в ZQueue и разветвлять их, как только ваши потребители получат их.

Начиная с ZIO 2.x вы можете использовать foreachParкоторый имеет возможность контролировать параллелизм с withParallelism. Простой пример может выглядеть примерно так

      ZIO.foreachPar(urls)(download).withParallelism(8)

В withParallelismUnboundedметод можно использовать, когда мы хотим запустить параллельный эффект с неограниченным максимальным количеством волокон:

      ZIO.foreachPar(urls)(download).withParallelismUnbounded

Все операторы параллелизма, оканчивающиеся на N, такие как foreachParNа также collectAllParN, устарели в версии 2.x.

Другие вопросы по тегам