Объединение tqdm с отложенным выполнением с dask в python

tqdm а также dask оба удивительные пакеты для итераций в Python. В то время как tqdm реализует необходимый индикатор выполнения, dask реализует многопоточную платформу, и они оба могут сделать процесс итерации менее расстраивающим. Но у меня проблемы с объединением их обоих.

Например, следующий код реализует отложенное выполнение в dask, с tqdm.trange индикатор. Дело в том, что, так как delayed выполняется быстро, индикатор выполнения немедленно завершается, в то время как реальное время вычислений выполняется во время compute часть.

from dask import delayed,compute
from tqdm import trange
from time import sleep

ct = time()
result= []

def fun(x):
    sleep(x)
    return x

for i in trange(10):
    result.append(delayed(fun)(i))

print compute(result)

Как я могу прикрепить индикатор выполнения к фактическому выполнению в compute команда?

2 ответа

Решение

Рассмотрим индикатор прогресса Даска

from dask.diagnostics import ProgressBar

with ProgressBar():
    compute(result)

Создайте собственную диагностику

Вы можете использовать эту архитектуру плагинов, чтобы получить сигнал в конце каждой задачи. http://dask.pydata.org/en/latest/diagnostics.html

Вот пример того, кто делает именно это: https://github.com/tqdm/tqdm/issues/278

На основе:

Интеграция с даск

      from tqdm.dask import TqdmCallback

with TqdmCallback(desc="compute"):
    ...
    arr.compute()

# or use callback globally
cb = TqdmCallback(desc="global")
cb.register()
arr.compute()

Применительно к коду в вопросе:

      from dask import delayed,compute
from tqdm.auto import tqdm
# from tqdm import trange
from time import sleep

from tqdm.dask import TqdmCallback

# ct = time()
result= []

def fun(x):
    sleep(x)
    return x

for i in tqdm(range(10)):
    result.append(delayed(fun)(i))

with TqdmCallback(desc="compute"):
    print(compute(result))

скриншот вывода в jupyter:

Другие вопросы по тегам