Параллельное вычисление с dask, когда необходимо вычислить столбец dataframe

У меня есть 360 миллионов записей данных о наблюдениях за птицами введите описание изображения здесь и я хотел бы вычислить центр тяжести каждого вида птиц в зависимости от дня года, используя dask распределенным способом.

Я хотел бы сделать:

df2 = df.groupby(['VERNACULARNAME', 'yearday']).mean()

но мне нужно сначала вычислить yearday, и я не могу понять, есть ли способ сделать это на лету с dask, Я надеялся, что DASK может просто сохранить новые данные в dask рабочие, но когда я пытаюсь

def yearday(r):
    r['yearday'] = dt.datetime(r['YEAR'], r['MONTH'], r['DAY']).timetuple().tm_yday
    return r

df.apply(yearday, axis=1).persist()

это не масштабируется.

Если кто-то хочет на самом деле попробовать, данные могут быть загружены следующим образом:

import dask.dataframe as dd
df = dd.read_parquet('s3://esipfed/ebird/EOD_CLO_2016.parq.gz',
            storage_options={'anon': True, 'use_ssl': False})

Примечание: хотя я назвал этот набор данных EOD_CLO_2016.parq.gzон разбит на множество объектов в сегменте S3 для облегчения распараллеливания. Каждый кусок разархивирован.

Есть ли способ сделать это вычисление на лету распределенным способом, или мне нужно записать другой файл данных с колонкой года, прежде чем я буду использовать groupby сделать масштабируемую часть?

1 ответ

Решение

Следуя тому, что вы делали в своей записной книжке, я бы изменил действия до groupby следующим образом

df = dd.read_parquet('s3://esipfed/ebird/EOD_CLO_2016.parq.gz',
                     columns=['YEAR', 'MONTH', 'DAY', 'DECIMALLATITUDE',
                              'DECIMALLONGITUDE', 'VERNACULARNAME'],
                     storage_options={'anon': True, 'use_ssl': False})

df = df.map_partitions(lambda df: df.assign(yearday=pd.to_datetime(df[['YEAR', 'MONTH', 'DAY']]).dt.dayofyear,
                                            lat=np.deg2rad(df['DECIMALLATITUDE'].values),
                                            lon=np.deg2rad(df['DECIMALLONGITUDE'].values)),

                        meta={'YEAR':'i8', 'MONTH':'i8', 'DAY':'i8',
                              'DECIMALLATITUDE':'f8','DECIMALLONGITUDE':'f8',
                              'VERNACULARNAME':'object',
                              'yearday':'i8', 'lat':'f8', 'lon':'f8'})

df = df.map_partitions(lambda df :df.assign(x=np.cos(df['lat'].values) * np.cos(df['lon'].values),
                                            y=np.cos(df['lat'].values) * np.sin(df['lon'].values),
                                            z=np.sin(df['lat'].values)),
                       meta={'YEAR':'i8', 'MONTH':'i8', 'DAY':'i8',
                              'DECIMALLATITUDE':'f8','DECIMALLONGITUDE':'f8',
                              'VERNACULARNAME':'object',
                              'yearday':'i8', 'lat':'f8', 'lon':'f8',
                              'x':'f8', 'y':'f8', 'z':'f8'})

ОБНОВЛЕНИЕ: я не уверен, если это хорошая идея, чтобы хранить ваши данные в виде одного и заархивированного файла вместо нескольких файлов. Рассматривали ли вы разные варианты?

ОБНОВЛЕНИЕ 2: Учитывая, что преобразование из градусов в радианы является линейным, вы можете рассчитать lon, lat а потом x,y,z после groupby,

Другие вопросы по тегам