Чтение данных с LAZ в Dask с использованием отложенной загрузки

Действие Чтение нескольких файлов облаков точек LAZ в Dask DataFrame.

Проблема Распаковка LAZ (сжатый) в LAS (без сжатия) требует много памяти. Изменение размеров файлов и нескольких процессов, созданных Dask, приводит к MemoryError"S.

попытки

Я пытался ограничить число работников, следуя руководству, но, похоже, оно не работает.

from distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=3)
client = Client(cluster)

dfs = [load(file) for file in lasfiles]
df = dd.from_delayed(dfs, meta=meta)
df = df.repartition(npartitions=len(df) // part_size)
df.to_parquet('/diskstation/geomaat/geomaat3d/raw', compression='GZIP')

Вопрос Как выполнить загрузку такого большого количества данных в нестандартном формате?

пример

Следующий пример - моя текущая реализация. Он группирует все входные файлы по 5, чтобы ограничить до 5 параллельных процессов распаковки. Затем переделите и напишите в Parquet для дальнейшей обработки. Мне кажется, что эта реализация полностью упускает смысл Dask.

from laspy.file import File
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed

@delayed
def load(file):
    with File(file.as_posix(), mode='r') as las_data:
        las_df = pd.DataFrame(las_data.points['point'], dtype=float)
        return las_df

meta = pd.DataFrame(np.empty(0, dtype=[('X',float),('Y',float),('Z',float),('intensity',float),('raw_classification',int)]))

lasfile_dir = Path('/data/las/')
lasfiles = sorted(list(lasfile_dir.glob('*.laz')))

part_size = 5000000

for idx, sublasfiles in enumerate([lasfiles[i:i+5] for i in range(0,len(lasfiles),5)]):
    try:
        dfs = [load(file) for file in sublasfiles]
        df = dd.from_delayed(dfs, meta=meta)
        df = df.repartition(npartitions=len(df) // part_size)
        df.to_parquet('/data/las/parquet/'+str(idx), compression='GZIP')

1 ответ

Ваша реализация в основном мне подходит.

Единственное, что я хотел бы изменить здесь, это то, что я бы избегал len(df), что приведет к вычислению всего кадра данных (невозможно определить длину кадра без чтения всех файлов).

Просто чтобы быть ясно, Dask не сможет распараллелить в пределах вашего load функция (она не имеет понятия о файлах LAZ), поэтому ваш параллелизм будет ограничен количеством имеющихся у вас файлов.

Другие вопросы по тегам