Могу ли я.set_index() лениво (или выполняться одновременно) на Dask Dataframes?

ТЛ; др:

Это возможно .set_index() метод на нескольких Dask Dataframes параллельно? Альтернативно, возможно ли .set_index() лениво на нескольких Dask Dataframes, что, следовательно, приведет к параллельному заданию индексов?

Вот сценарий:

  • У меня есть несколько временных рядов
  • Каждый временной ряд хранится несколько .csv файлы. Каждый файл содержит данные, относящиеся к определенному дню. Кроме того, файлы разбросаны по разным папкам (каждая папка содержит данные за один месяц)
  • Каждый временной ряд имеет разные частоты дискретизации
  • Все временные ряды имеют одинаковые столбцы. У всех есть столбец, который содержит DateTime, среди других.
  • Данные слишком велики для обработки в памяти. Вот почему я использую Dask.
  • Я хочу объединить все временные ряды в один DataFrame, выровненный по DateTime, Для этого мне нужно сначала resample() все временные ряды с общей частотой дискретизации. А потом .join() все временные ряды.
  • .resample() может применяться только к индексу. Следовательно, перед пересчетом мне нужно .set_index() в столбце DateTime для каждого временного ряда.
  • Когда я спрашиваю .set_index() метод для одного временного ряда, вычисление начинается немедленно. Что приводит к блокировке и ожиданию моего кода. В этот момент, если я проверяю использование ресурсов моей машины, я вижу, что используется много ядер, но использование не превышает ~15%. Что заставляет меня думать, что в идеале я мог бы иметь .set_index() метод применяется к нескольким временным рядам одновременно.

Достигнув описанной выше ситуации, я попробовал некоторые не элегантные решения для распараллеливания применения .set_index() метод на нескольких временных рядах (например, создать multiprocessing.Pool), которые не были успешными. Прежде чем дать более подробную информацию о них, есть ли ясный способ решить вышеуказанную ситуацию? Был ли рассмотрен сценарий выше при реализации Dask?

Альтернативно, возможно ли .set_index() лениво? Если .set_index() метод можно применить лениво, я бы создал полный граф вычислений с шагами, описанными выше, и в конце все будет вычисляться параллельно (я думаю).

1 ответ

Dask.dataframe должен знать минимальное и максимальное значения всех разделов информационного кадра, чтобы разумно выполнять операции дата-время параллельно. По умолчанию он будет читать данные один раз, чтобы найти хорошие разделы. Если данные не отсортированы, они будут выполнять сортировку в случайном порядке (возможно, очень дорого)

В вашем случае это звучит так, как будто ваши данные уже отсортированы, и вы можете предоставить их явно. Вы должны посмотреть на последний пример dd.DataFrame.set_index строка документации

    A common case is when we have a datetime column that we know to be
    sorted and is cleanly divided by day.  We can set this index for free
    by specifying both that the column is pre-sorted and the particular
    divisions along which is is separated

    >>> import pandas as pd
    >>> divisions = pd.date_range('2000', '2010', freq='1D')
    >>> df2 = df.set_index('timestamp', sorted=True, divisions=divisions)  # doctest: +SKIP
Другие вопросы по тегам