Использование памяти при индексации большого кадра данных dask на одном многоядерном компьютере
Я пытаюсь превратить дамп Wikipedia CirrusSearch в защищенный паркетом фрейм данных dask, проиндексированный по названию на 16-ядерном экземпляре GCP 450G. Дампы CirrusSearch представлены в виде единого файла в формате json. Английские дампы Wipedia содержат 5 миллионов записей и сжаты на 12G и расширены на 90+G. Важной деталью является то, что записи не являются абсолютно плоскими.
Самый простой способ сделать это
import json
import dask
from dask import bag as db, dataframe as ddf
from toolz import curried as tz
from toolz.curried import operator as op
blocksize=2**24
npartitions='auto'
parquetopts=dict(engine='fastparquet', object_encoding='json')
lang = 'en'
wiki = 'wiki'
date = 20180625
path='./'
source = f'{path}{lang}{wiki}-{date}-cirrussearch-content.json'
(
db
.read_text(source, blocksize=blocksize)
.map(json.loads)
.filter(tz.flip(op.contains, 'title'))
.to_dataframe()
.set_index('title', npartitions=npartitions)
.to_parquet(f'{lang}{wiki}-{date}-cirrussearch.pq', **parquetopts)
)
Первая проблема заключается в том, что в планировщике по умолчанию используется только одно ядро. Этой проблемы можно избежать, явно используя распределенные или многопроцессорные планировщики.
Большая проблема со всеми планировщиками и настройками, которые я пробовал, - использование памяти. Похоже, что dask пытается загрузить весь фрейм данных в память при индексации. Даже 450G недостаточно для этого.
- Как я могу уменьшить использование памяти для этой задачи?
- Как я могу оценить минимальный объем памяти, необходимый без проб и ошибок?
- Есть ли лучший подход?
1 ответ
Почему Dask использует только одно ядро?
Часть анализа JSON, вероятно, связана с GIL, вы хотите использовать процессы. Однако, когда вы, наконец, что-то вычисляете, вы используете фреймы данных, которые обычно предполагают, что вычисления высвобождают GIL (это часто встречается в Pandas), поэтому он использует потоковый бэкэнд по умолчанию. Если вы в основном связаны с этапом синтаксического анализа GIL, то, возможно, вы захотите использовать многопроцессорный планировщик. Это должно решить вашу проблему:
dask.config.set(scheduler='multiprocessing')
Как избежать использования памяти во время фазы set_index
Да, для вычисления set_index требуется полный набор данных. Это сложная проблема. Если вы используете планировщик с одной машиной (что, по-видимому, вы делаете), то для этого процесса сортировки следует использовать внешнюю структуру данных. Я удивлен, что у него заканчивается память.
Как я могу оценить минимальный объем памяти, необходимый без проб и ошибок?
К сожалению, трудно оценить размер JSON-подобных данных в памяти на любом языке. Это намного проще с плоскими схемами.
Есть ли лучший подход?
Это не решает вашу основную проблему, но вы можете рассмотреть возможность размещения данных в формате Parquet, прежде чем пытаться все отсортировать. Тогда попробуйте dd.read_parquet(...).set_index(...).to_parquet(...)
в изоляции. Это может помочь выделить некоторые расходы.