Могу ли я.set_index() лениво (или выполняться одновременно) на Dask Dataframes?
ТЛ; др:
Это возможно.set_index()
метод на нескольких Dask Dataframesпараллельно? Альтернативно, возможно ли.set_index()
лениво на нескольких Dask Dataframes, что, следовательно, приведет к параллельному заданию индексов?
Вот сценарий:
- У меня есть несколько временных рядов
- Каждый временной ряд хранится несколько
.csv
файлы. Каждый файл содержит данные, относящиеся к определенному дню. Кроме того, файлы разбросаны по разным папкам (каждая папка содержит данные за один месяц) - Каждый временной ряд имеет разные частоты дискретизации
- Все временные ряды имеют одинаковые столбцы. У всех есть столбец, который содержит
DateTime
, среди других. - Данные слишком велики для обработки в памяти. Вот почему я использую Dask.
- Я хочу объединить все временные ряды в один DataFrame, выровненный по
DateTime
, Для этого мне нужно сначалаresample()
все временные ряды с общей частотой дискретизации. А потом.join()
все временные ряды. .resample()
может применяться только к индексу. Следовательно, перед пересчетом мне нужно.set_index()
в столбце DateTime для каждого временного ряда.- Когда я спрашиваю
.set_index()
метод для одного временного ряда, вычисление начинается немедленно. Что приводит к блокировке и ожиданию моего кода. В этот момент, если я проверяю использование ресурсов моей машины, я вижу, что используется много ядер, но использование не превышает ~15%. Что заставляет меня думать, что в идеале я мог бы иметь.set_index()
метод применяется к нескольким временным рядам одновременно.
Достигнув описанной выше ситуации, я попробовал некоторые не элегантные решения для распараллеливания применения .set_index()
метод на нескольких временных рядах (например, создать multiprocessing.Pool
), которые не были успешными. Прежде чем дать более подробную информацию о них, есть ли ясный способ решить вышеуказанную ситуацию? Был ли рассмотрен сценарий выше при реализации Dask?
Альтернативно, возможно ли .set_index()
лениво? Если .set_index()
метод можно применить лениво, я бы создал полный граф вычислений с шагами, описанными выше, и в конце все будет вычисляться параллельно (я думаю).
1 ответ
Dask.dataframe должен знать минимальное и максимальное значения всех разделов информационного кадра, чтобы разумно выполнять операции дата-время параллельно. По умолчанию он будет читать данные один раз, чтобы найти хорошие разделы. Если данные не отсортированы, они будут выполнять сортировку в случайном порядке (возможно, очень дорого)
В вашем случае это звучит так, как будто ваши данные уже отсортированы, и вы можете предоставить их явно. Вы должны посмотреть на последний пример dd.DataFrame.set_index
строка документации
A common case is when we have a datetime column that we know to be
sorted and is cleanly divided by day. We can set this index for free
by specifying both that the column is pre-sorted and the particular
divisions along which is is separated
>>> import pandas as pd
>>> divisions = pd.date_range('2000', '2010', freq='1D')
>>> df2 = df.set_index('timestamp', sorted=True, divisions=divisions) # doctest: +SKIP