Использование dask для импорта множества файлов MAT в один DataFrame

У меня есть много файлов матов одного формата, и я хочу объединить эти файлы матов в один DataFrame с DatetimeIndex. В настоящее время цикл for считывает эти файлы матов и загружает содержимое каждого из них в DataFrames pandas, используя scipy.io.loadmat, а затем каждый DataFrame добавляется в таблицу hdf5.

Каждый файл mat содержит матрицу одинарной точности 4096x1024, и первоначально каждая итерация цикла занимает приблизительно 1,5 секунды. Я протестировал это с 806 файлами матов ( 12,5 ГБ, занимающих ~25 минут), но я хотел бы применить это к потенциально миллионам этих файлов, и я заинтересован в поиске рабочего процесса и контейнера данных, который позволил бы мне импортировать новые данные и запросы подмножества временных рядов быстро.

Возможно ли использовать dask или другой инструмент для ускорения этого процесса импорта и создания единого запрашиваемого временного ряда?

for rot_file in rotation_files:
    print(rot_file)
    time_stamps = pd.DataFrame(scipy.io.loadmat(rot_file)['LineInfo'][0][0][2][0])
    polar_image = pd.DataFrame(scipy.io.loadmat(rot_file)['PolarImage'])
    polar_image = polar_image.transpose()
    polar_image.index = time_stamps[0].apply(convert_to_python_datetime).values
    rot_id = time_stamps[0]
    rot_id_df = pd.DataFrame(len(polar_image)*[rot_id],columns=['rotation_id'], dtype='category')
    rot_id_df.index = polar_image.index
    polar_image.join(rot_id_df)
    polar_image.columns = [str(col_name) for col_name in polar_image.columns]
    polar_image.to_hdf('rot_data.h5', 'polar_image', format='table', append=True, complib='blosc', complevel=9)

Кажется, что импорт должен быть возможен с использованием dask.delayed, но я не уверен, как это можно записать в один файл hdf.

1 ответ

Решение

Для запроса данных вам не нужно записывать в формат данных, явно поддерживаемый dask. Вы можете определить свой фрейм данных следующим образом:

def mat_to_dataframe(rot_file):
    time_stamps = pd.DataFrame(scipy.io.loadmat(rot_file)['LineInfo'][0][0][2][0])
    polar_image = pd.DataFrame(scipy.io.loadmat(rot_file)['PolarImage'])
    polar_image = polar_image.transpose()
    polar_image.index = time_stamps[0].apply(convert_to_python_datetime).values
    rot_id = time_stamps[0]
    rot_id_df = pd.DataFrame(len(polar_image)*[rot_id],columns=['rotation_id'], dtype='category')
    rot_id_df.index = polar_image.index
    polar_image.join(rot_id_df)
    polar_image.columns = [str(col_name) for col_name in polar_image.columns]
    return polar_image

from dask import delayed
import dask.dataframe as dd

parts = [delayed(mat_to_dataframe)(fn) for fn in matfiles_list]
df = dd.from_delayed(parts)

Это "ленивый" фрейм данных: к нему можно применить вычисления, похожие на панд, но они выполняются только при вызове .compute(), Если процесс matload содержит Python GIL, я бы порекомендовал использовать распределенный планировщик (даже на одной машине) client = dask.distributed.Client(),

Если вы можете знать временные метки каждого раздела априори, то вы также можете предоставить divisions= в from_delayedЭто означает, что если ваши запросы имеют фильтры по индексу, то dask будет знать, какие файлы не нужно загружать.

Если процесс загрузки медленный, и вы хотите запросить более быстрый формат, попробуйте df.to_hdf или же df.to_parquet, У каждого есть несколько вариантов, которые повлияют на вашу производительность.

Обратите внимание, что time_stamps[0].apply(convert_to_python_datetime).values вероятно, может быть достигнуто быстрее с помощью pd.to_datetime(time_stamps[0]),

Другие вопросы по тегам