Dask DataFrame: повторная выборка по групповому объекту с несколькими строками

У меня есть следующий dask dataframe, созданный из Castra:

import dask.dataframe as dd

df = dd.from_castra('data.castra', columns=['user_id','ts','text'])

Уступая:

                      user_id / ts                  / text
ts
2015-08-08 01:10:00   9235      2015-08-08 01:10:00   a
2015-08-08 02:20:00   2353      2015-08-08 02:20:00   b
2015-08-08 02:20:00   9235      2015-08-08 02:20:00   c
2015-08-08 04:10:00   9235      2015-08-08 04:10:00   d
2015-08-08 08:10:00   2353      2015-08-08 08:10:00   e

То, что я пытаюсь сделать, это:

  1. Группа по user_id а также ts
  2. Попробуй за 3 часа
  3. На этапе повторной выборки любые объединенные строки должны объединять тексты.

Пример вывода:

                                text
user_id   ts
9235      2015-08-08 00:00:00   ac
          2015-08-08 03:00:00   d
2353      2015-08-08 00:00:00   b
          2015-08-08 06:00:00   e

Я попробовал следующее:

df.groupby(['user_id','ts'])['text'].sum().resample('3H', how='sum').compute()

И получил следующую ошибку:

TypeError: Only valid with DatetimeIndex, TimedeltaIndex or PeriodIndex

Я пытался пройти set_index('ts') в трубе, но это не похоже на атрибут Series,

Есть идеи, как этого добиться?

TL; DR

Если это облегчает проблему, я также могу изменить формат Castra DB, который я тоже создал. Реализация, которую я сейчас имею, была в значительной степени взята из этого великого поста.

Я устанавливаю индекс (в to_df() функция) следующим образом:

df.set_index('ts',drop=False,inplace=True)

И имеют:

  with BZ2File(os.path.join(S.DATA_DIR,filename)) as f:
     batches = partition_all(batch_size, f)
     df, frames = peek(map(self.to_df, batches))
     castra = Castra(S.CASTRA, template=df, categories=categories)
     castra.extend_sequence(frames, freq='3h')

Вот результирующие типы:

ts                datetime64[ns]
text                      object
user_id                  float64

2 ответа

Решение

Если мы можем предположить, что каждый user-id group может поместиться в памяти, тогда я рекомендую использовать dask.dataframe для выполнения external-groupby, но затем использовать pandas для выполнения операций в каждой группе, что-то вроде следующего.

def per_group(blk):
    return blk.groupby('ts').text.resample('3H', how='sum')

df.groupby('user_id').apply(per_group, columns=['ts', 'text']).compute()

Это разделяет две сложные вещи на два разных проекта

  1. Перестановка всех пользовательских идентификаторов в нужные группы выполняется dask.dataframe.
  2. Выполнение сложной передискретизации по дате и времени в каждой группе явно выполняется пандами.

В идеале dask.dataframe написал бы функцию для каждой группы автоматически. В настоящее время dask.dataframe не выполняет интеллектуальную обработку нескольких индексов или повторной выборки поверх групповых столбцов с несколькими столбцами, поэтому автоматическое решение пока недоступно. Тем не менее, вполне возможно прибегнуть к pandas для расчета для каждого блока, но при этом использовать dask.dataframe для соответствующей подготовки групп.

Попробуйте преобразовать ваш индекс в DatetimeIndex следующим образом:

import datetime
# ...
df.index = dd.DatetimeIndex(df.index.map(lambda x: datetime.datetime.strptime(x, '%Y-%m-%d %H:%M:%S')))
# ...
Другие вопросы по тегам