Параллельное вычисление с dask, когда необходимо вычислить столбец dataframe
У меня есть 360 миллионов записей данных о наблюдениях за птицами и я хотел бы вычислить центр тяжести каждого вида птиц в зависимости от дня года, используя dask
распределенным способом.
Я хотел бы сделать:
df2 = df.groupby(['VERNACULARNAME', 'yearday']).mean()
но мне нужно сначала вычислить yearday
, и я не могу понять, есть ли способ сделать это на лету с dask
, Я надеялся, что DASK может просто сохранить новые данные в dask
рабочие, но когда я пытаюсь
def yearday(r):
r['yearday'] = dt.datetime(r['YEAR'], r['MONTH'], r['DAY']).timetuple().tm_yday
return r
df.apply(yearday, axis=1).persist()
это не масштабируется.
Если кто-то хочет на самом деле попробовать, данные могут быть загружены следующим образом:
import dask.dataframe as dd
df = dd.read_parquet('s3://esipfed/ebird/EOD_CLO_2016.parq.gz',
storage_options={'anon': True, 'use_ssl': False})
Примечание: хотя я назвал этот набор данных EOD_CLO_2016.parq.gz
он разбит на множество объектов в сегменте S3 для облегчения распараллеливания. Каждый кусок разархивирован.
Есть ли способ сделать это вычисление на лету распределенным способом, или мне нужно записать другой файл данных с колонкой года, прежде чем я буду использовать groupby
сделать масштабируемую часть?
1 ответ
Следуя тому, что вы делали в своей записной книжке, я бы изменил действия до groupby
следующим образом
df = dd.read_parquet('s3://esipfed/ebird/EOD_CLO_2016.parq.gz',
columns=['YEAR', 'MONTH', 'DAY', 'DECIMALLATITUDE',
'DECIMALLONGITUDE', 'VERNACULARNAME'],
storage_options={'anon': True, 'use_ssl': False})
df = df.map_partitions(lambda df: df.assign(yearday=pd.to_datetime(df[['YEAR', 'MONTH', 'DAY']]).dt.dayofyear,
lat=np.deg2rad(df['DECIMALLATITUDE'].values),
lon=np.deg2rad(df['DECIMALLONGITUDE'].values)),
meta={'YEAR':'i8', 'MONTH':'i8', 'DAY':'i8',
'DECIMALLATITUDE':'f8','DECIMALLONGITUDE':'f8',
'VERNACULARNAME':'object',
'yearday':'i8', 'lat':'f8', 'lon':'f8'})
df = df.map_partitions(lambda df :df.assign(x=np.cos(df['lat'].values) * np.cos(df['lon'].values),
y=np.cos(df['lat'].values) * np.sin(df['lon'].values),
z=np.sin(df['lat'].values)),
meta={'YEAR':'i8', 'MONTH':'i8', 'DAY':'i8',
'DECIMALLATITUDE':'f8','DECIMALLONGITUDE':'f8',
'VERNACULARNAME':'object',
'yearday':'i8', 'lat':'f8', 'lon':'f8',
'x':'f8', 'y':'f8', 'z':'f8'})
ОБНОВЛЕНИЕ: я не уверен, если это хорошая идея, чтобы хранить ваши данные в виде одного и заархивированного файла вместо нескольких файлов. Рассматривали ли вы разные варианты?
ОБНОВЛЕНИЕ 2: Учитывая, что преобразование из градусов в радианы является линейным, вы можете рассчитать lon, lat
а потом x,y,z
после groupby
,