Как элегантно выполнять несколько эффектов параллельно с ZIO
Я знаю, что я могу использовать
import zio.Task
def zip3Par[A, B, C](a: Task[A], b: Task[B], c: Task[C]): Task[(A, B, C)] =
a.zipPar(b).zipWithPar(c) { case ((a, b), c) => (a, b, c) }
def zip4Par[A, B, C, D](a: Task[A], b: Task[B], c: Task[C], d: Task[D]): Task[(A, B, C, D)] =
zip3Par(a, b, c).zipWithPar(d) { case ((a, b, c), d) => (a, b, c, d) }
выполнять 3 или 4 задачи параллельно, но я думаю, есть ли более элегантное решение?
4 ответа
Вы могли бы просто использовать ZIO.collectAllPar
со списком задач:
def collectTasks(tasks: Task[Int]*):Task[List[Int]] = ZIO.collectAllPar(tasks)
Тогда вы можете использовать его как:
val t1 = Task.effect{
Thread.sleep(100)
println("t1 started")
Thread.sleep(1000)
1
}
val t2 = Task.effect{
println("t2 started")
Thread.sleep(1000)
2
}
val t3 = Task.effect{
println("t3 started")
Thread.sleep(1000)
3
}
(new DefaultRuntime() {}).unsafeRun(collectTasks(t1,t2,t3))
и он будет выполнять все ваши задачи одновременно.
Общее решение с использованием кортежей вместо списка было бы трудно реализовать в Scala 2 без бесформенного. Это изменилось бы в Scala 3, потому что тогда они могли бы обрабатываться как разнородные списки.
Также обратите внимание, что есть <&>
комбинатор. Это псевдоним для zipPar
, Это даст кортеж, и если вы используете для понимания, я бы посоветовал взглянуть на better-monadic-for
который решает проблемы с кортежами для понимания
Вот пример использования <&>
комбинатор с картой:
(t1 <&> t2 <&> t3 <&> t4) map {
case i1 <*> i2 <*> i3 <*> i4 => s"$i1, $i2, $i3, $i4"
}
ZIO.collectAllPar
а также ZIO.collectAllParN
работать только тогда, когда все ZIO
имеют тот же тип возврата. Это был не вопрос.
В дополнение к ответу Кшиштофа Атласика, есть также collectAllParN, которое работает как collectAllPAr, но позволяет вам указать максимальное количество используемых волокон:
val a = Task {
println("t1 started")
Thread.sleep(2000)
println("t1 finished")
1
}
val b = Task {
println("t2 started")
Thread.sleep(1000)
println("t2 finished")
2
}
val c = Task {
println("t3 started")
Thread.sleep(3000)
println("t3 finished")
3
}
val d = Task {
println("t4 started")
Thread.sleep(1000)
println("t4 finished")
4
}
И вы можете запустить его так:
Task.collectAllParN(4)(List(a, b, c, d))
Это особенно полезно, если у вас много (сотни или тысячи) параллельных задач, чтобы избежать переполнения и ошибок памяти. Идите вперед и измените количество используемых волокон на 2 или 3 и убедитесь сами, как меняется исполнение.
Другой вариант для параллельного выполнения - поместить задачи в ZQueue и разветвлять их, как только ваши потребители получат их.
Начиная с ZIO 2.x вы можете использовать
foreachPar
который имеет возможность контролировать параллелизм с
withParallelism
. Простой пример может выглядеть примерно так
ZIO.foreachPar(urls)(download).withParallelism(8)
В
withParallelismUnbounded
метод можно использовать, когда мы хотим запустить параллельный эффект с неограниченным максимальным количеством волокон:
ZIO.foreachPar(urls)(download).withParallelismUnbounded
Все операторы параллелизма, оканчивающиеся на N, такие как
foreachParN
а также
collectAllParN
, устарели в версии 2.x.