dask.read_parquet вызывает ошибку OOM

Я использую Dask для очистки данных в нескольких файлах CSV. Этот код работает нормально:

import pandas as pd
import glob
import os
from timeit import default_timer
from dask.distributed import Client
import dask.dataframe as dd

cols_to_keep = ["barcode", "salesdate", "storecode", "quantity", "salesvalue", "promotion", "key_row"]

col_types = {'barcode': object,
            'salesdate': object,
            'storecode': object,
            'quantity': float,
            'salesvalue': float,
            'promotion': object,
            'key_row': object}

trans = dd.read_csv(os.path.join(TRANS_PATH, "*.TXT"), 
                    sep=";", usecols=cols_to_keep, dtype=col_types, parse_dates=['salesdate'])

trans = trans[trans['barcode'].isin(barcodes)]

trans_df = trans.compute()

Я решил попробовать систему хранения паркета, так как она предположительно быстрее и поддерживается dask. После преобразования файлов CSV в.parquet с помощью панд to_parquet() Методом я попробовал следующее:

cols_to_keep = ["barcode", "salesdate", "storecode", "quantity", "salesvalue", "promotion", "key_row"]

trans = dd.read_parquet(os.path.join(PARQUET_PATH, '*.parquet'), columns=cols_to_keep)

trans = trans[trans['barcode'].isin(barcodes)]

trans_df = trans.compute()

Вскоре после того, как график начинает выполняться, рабочим не хватает памяти, и я получаю несколько предупреждений:

distributed.nanny - WARNING - Worker exceeded 95% memory budget.  Restarting
distributed.nanny - WARNING - Worker process 13620 was killed by signal 15
distributed.nanny - WARNING - Worker exceeded 95% memory budget.  Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget.  Restarting
distributed.nanny - WARNING - Worker exceeded 95% memory budget.  Restarting
distributed.nanny - WARNING - Worker exceeded 95% memory budget.  Restarting
distributed.nanny - WARNING - Worker exceeded 95% memory budget.  Restarting
distributed.nanny - WARNING - Worker process 13396 was killed by signal 15

В конце концов вся программа вылетает. Мои файлы.parquet не проблема, я могу загрузить их просто отлично, используя pandas' read_parquet() метод. Из утилит dask я заметил, что по какой-то причине график пытается прочитать все, прежде чем выполнять какую-либо фильтрацию, используя .isin вызов: выполнение графа с помощью dd.read_parquet

Это не тот случай, когда dd.read_csv() используется. Здесь все работает "параллельно", поэтому фильтрация предотвращает OOM:

Кто-нибудь знает, что происходит? Что мне не хватает?

1 ответ

Решение

Ваша проблема использует pandas.to_parquet() написать данные. Это создает одну массивную группу строк из данных, которая становится одним разделом, когда Dask читает его - Dask следует за любым разделением в данных. И наоборот, Dask автоматически разделяет входные данные CSV, не предполагая, что данные имеют внутреннее разделение.

Поскольку вы уже используете Dask, вы должны использовать его и для записи данных паркета, используя dask.DataFrame.to_parquet, аналог метода Pandas. Он создаст несколько файлов в каталоге, которые будут прочитаны независимо и параллельно.

Другие вопросы по тегам