Как я могу объединить последовательное и параллельное выполнение отложенных вызовов функций?
Я застрял в странном месте. У меня есть куча отложенных вызовов функций, которые я хочу выполнить в определенном порядке. В то время как выполнение параллельно тривиально:
res = client.compute([myfuncs])
res = client.gather(res)
Я не могу найти способ выполнить их в последовательности, неблокирующим способом.
Вот минимальный пример:
import numpy as np
from time import sleep
from datetime import datetime
from dask import delayed
from dask.distributed import LocalCluster, Client
@delayed
def dosomething(name):
res = {"name": name, "beg": datetime.now()}
sleep(np.random.randint(10))
res.update(rand=np.random.rand())
res.update(end=datetime.now())
return res
seq1 = [dosomething(name) for name in ["foo", "bar", "baz"]]
par1 = dosomething("whaat")
par2 = dosomething("ahem")
pipeline = [seq1, par1, par2]
Учитывая приведенный выше пример, я хотел бы запустить seq1
, par1
, а также par2
параллельно, но составляющие seq1
: "foo", "bar" и "baz", в последовательности.
1 ответ
Вы можете определенно обмануть и добавить необязательную зависимость к вашей функции следующим образом:
@dask.delayed
def dosomething(name, *args):
...
Таким образом, вы можете сделать задачи зависимыми друг от друга, даже если вы не используете один результат при следующем запуске функции:
inputs = ["foo", "bar", "baz"]
seq1 = [dosomething(inputs[0])]
for bit in inputs[1:]:
seq1.append(dosomething(bit, seq1[-1]))
Кроме того, вы можете прочитать об интерфейсе "фьючерсов" распределенного планировщика, с помощью которого вы можете отслеживать ход выполнения задач в режиме реального времени.