Анализ потока данных Dask
У меня есть набор данных, хранящийся в текстовом файле с разделителями табуляции. Файл выглядит следующим образом:
date time temperature
2010-01-01 12:00:00 10.0000
...
где temperature
столбец содержит значения в градусах Цельсия (°C). Я вычисляю среднесуточную температуру, используя Dask. Вот мой код:
from dask.distributed import Client
import dask.dataframe as dd
client = Client("<scheduler URL")
inputDataFrame = dd.read_table("<input file>").drop('time', axis=1)
groupedData = inputDataFrame.groupby('date')
meanDataframe = groupedData.mean()
result = meanDataframe.compute()
result.to_csv('result.out', sep='\t')
client.close()
Чтобы улучшить производительность моей программы, я хотел бы понять поток данных, вызванный кадрами данных Dask.
- Как текстовый файл считывается в фрейм данных
read_table()
? Читает ли клиент весь текстовый файл и отправляет ли данные планировщику, который разделяет данные и отправляет их рабочим? Или каждый работник читает разделы данных, с которыми он работает, непосредственно из текстового файла? - Когда создается промежуточный кадр данных (например, путем вызова
drop()
) весь промежуточный фрейм данных отправляется обратно клиенту, а затем отправляется рабочим для дальнейшей обработки? - Тот же вопрос для групп: где создаются и хранятся данные для группового объекта? Как это происходит между клиентом, планировщиком и работниками?
Причиной моего вопроса является то, что если я запускаю аналогичную программу с использованием Pandas, вычисления выполняются примерно в два раза быстрее, и я пытаюсь понять, что вызывает перегрузку в Dask. Поскольку размер фрейма данных результата очень мал по сравнению с размером входных данных, я предполагаю, что существуют некоторые накладные расходы, вызванные перемещением входных и промежуточных данных между клиентом, планировщиком и работниками.
1 ответ
1) Данные читают рабочие. Клиент читает немного раньше времени, чтобы выяснить имена и типы столбцов и, при необходимости, найти разделители строк для разделения файлов. Обратите внимание, что все работники должны иметь доступ к интересующему файлу (файлам), для которого может потребоваться некоторая общая файловая система при работе в кластере.
2), 3) На самом деле, drop
, groupby
а также mean
методы вообще не генерируют промежуточные кадры данных, они просто накапливают график операций, которые должны быть выполнены (т. е. они ленивы). Вы можете рассчитать эти шаги и увидеть, что они быстрые. Во время исполнения промежуточные документы делаются на рабочих, при необходимости копируются другим работникам и удаляются как можно скорее. Копий планировщику или клиенту никогда не будет, если вы явно не попросите об этом.
Итак, в корне вашего вопроса: вы можете лучше изучить производительность или свою работу, взглянув на панель управления.
Есть много факторов, которые определяют скорость развития: процессы могут совместно использовать канал ввода-вывода; некоторые задачи не освобождают GIL и поэтому плохо распараллеливаются в потоках; количество групп будет сильно влиять на количество перемешивания данных в группы... плюс всегда есть некоторые издержки для каждой задачи, выполняемой планировщиком.
Поскольку Pandas эффективен, неудивительно, что в случае, когда данные легко помещаются в память, они работают хорошо по сравнению с Dask.