Использование dask для импорта множества файлов MAT в один DataFrame
У меня есть много файлов матов одного формата, и я хочу объединить эти файлы матов в один DataFrame с DatetimeIndex. В настоящее время цикл for считывает эти файлы матов и загружает содержимое каждого из них в DataFrames pandas, используя scipy.io.loadmat, а затем каждый DataFrame добавляется в таблицу hdf5.
Каждый файл mat содержит матрицу одинарной точности 4096x1024, и первоначально каждая итерация цикла занимает приблизительно 1,5 секунды. Я протестировал это с 806 файлами матов ( 12,5 ГБ, занимающих ~25 минут), но я хотел бы применить это к потенциально миллионам этих файлов, и я заинтересован в поиске рабочего процесса и контейнера данных, который позволил бы мне импортировать новые данные и запросы подмножества временных рядов быстро.
Возможно ли использовать dask или другой инструмент для ускорения этого процесса импорта и создания единого запрашиваемого временного ряда?
for rot_file in rotation_files:
print(rot_file)
time_stamps = pd.DataFrame(scipy.io.loadmat(rot_file)['LineInfo'][0][0][2][0])
polar_image = pd.DataFrame(scipy.io.loadmat(rot_file)['PolarImage'])
polar_image = polar_image.transpose()
polar_image.index = time_stamps[0].apply(convert_to_python_datetime).values
rot_id = time_stamps[0]
rot_id_df = pd.DataFrame(len(polar_image)*[rot_id],columns=['rotation_id'], dtype='category')
rot_id_df.index = polar_image.index
polar_image.join(rot_id_df)
polar_image.columns = [str(col_name) for col_name in polar_image.columns]
polar_image.to_hdf('rot_data.h5', 'polar_image', format='table', append=True, complib='blosc', complevel=9)
Кажется, что импорт должен быть возможен с использованием dask.delayed, но я не уверен, как это можно записать в один файл hdf.
1 ответ
Для запроса данных вам не нужно записывать в формат данных, явно поддерживаемый dask. Вы можете определить свой фрейм данных следующим образом:
def mat_to_dataframe(rot_file):
time_stamps = pd.DataFrame(scipy.io.loadmat(rot_file)['LineInfo'][0][0][2][0])
polar_image = pd.DataFrame(scipy.io.loadmat(rot_file)['PolarImage'])
polar_image = polar_image.transpose()
polar_image.index = time_stamps[0].apply(convert_to_python_datetime).values
rot_id = time_stamps[0]
rot_id_df = pd.DataFrame(len(polar_image)*[rot_id],columns=['rotation_id'], dtype='category')
rot_id_df.index = polar_image.index
polar_image.join(rot_id_df)
polar_image.columns = [str(col_name) for col_name in polar_image.columns]
return polar_image
from dask import delayed
import dask.dataframe as dd
parts = [delayed(mat_to_dataframe)(fn) for fn in matfiles_list]
df = dd.from_delayed(parts)
Это "ленивый" фрейм данных: к нему можно применить вычисления, похожие на панд, но они выполняются только при вызове .compute()
, Если процесс matload содержит Python GIL, я бы порекомендовал использовать распределенный планировщик (даже на одной машине) client = dask.distributed.Client()
,
Если вы можете знать временные метки каждого раздела априори, то вы также можете предоставить divisions=
в from_delayed
Это означает, что если ваши запросы имеют фильтры по индексу, то dask будет знать, какие файлы не нужно загружать.
Если процесс загрузки медленный, и вы хотите запросить более быстрый формат, попробуйте df.to_hdf
или же df.to_parquet
, У каждого есть несколько вариантов, которые повлияют на вашу производительность.
Обратите внимание, что time_stamps[0].apply(convert_to_python_datetime).values
вероятно, может быть достигнуто быстрее с помощью pd.to_datetime(time_stamps[0])
,