Расширение временных рядов событий с помощью Pandas

проблема

Я ищу предложения о том, как сделать это более питоническим и повысить эффективность.

У меня есть датафрейм с событиями, каждое из которых имеет как минимум начальную и конечную метки времени. Я расширяю количество записей, чтобы в новой таблице была одна запись на каждый час, в котором интервал перекрывается.

Это в основном тот же вариант использования, что и у функции IntervalMatch, найденной в QlikView.

Пример: событие с 18:00 до 20:00 расширяется до двух отдельных записей: одна для 18:00-19:00, а другая для 19:00-20:00.

Текущее решение

У меня есть полностью рабочее решение, но я думаю, что оно довольно уродливо и довольно медленно на больших наборах данных с>100 тыс. Строк и 10-20 столбцов.

import pandas as pd
from datetime import timedelta

def interval_match(df):

    intervals = []

    def perdelta(start, end, delta):
        curr = start.replace(minute=0, second=0)
        while curr < end:
            yield curr
            curr += delta

    def interval_split(x):

        for t in perdelta(x.Start, x.End, timedelta(hours=1)):
            _ = ([x.id,
                  x.Start,
                  x.End,
                  max(t, x.Start),
                  min((t+timedelta(hours=1), x.End))])

            intervals.append(_)

    df.apply(interval_split, axis=1)

    ndf = pd.DataFrame(intervals, 
                       columns=['id', 
                                'Start', 
                                'End', 
                                'intervalStart', 
                                'intervalEnd'])

    ndf['Duration'] = ndf.iEnd - ndf.iStart

    return ndf

С некоторыми примерами данных функция interval_match() можно использовать так:

# Some example data
df = pd.DataFrame({'End': {0: pd.Timestamp('2016-01-01 09:24:20')},
                   'Start': {0: pd.Timestamp('2016-01-01 06:56:10')},
                   'id': {0: 1234562}})


# Running the function
interval_match(df).to_dict()


# Output
{'Duration': {0: Timedelta('0 days 00:03:50'),
              1: Timedelta('0 days 01:00:00'),
              2: Timedelta('0 days 01:00:00'),
              3: Timedelta('0 days 00:24:20')},
      'End': {0: Timestamp('2016-01-01 09:24:20'),
              1: Timestamp('2016-01-01 09:24:20'),
              2: Timestamp('2016-01-01 09:24:20'),
              3: Timestamp('2016-01-01 09:24:20')},
    'Start': {0: Timestamp('2016-01-01 06:56:10'),
              1: Timestamp('2016-01-01 06:56:10'),
              2: Timestamp('2016-01-01 06:56:10'),
              3: Timestamp('2016-01-01 06:56:10')},
'intervalEnd':{0: Timestamp('2016-01-01 07:00:00'),
              1: Timestamp('2016-01-01 08:00:00'),
              2: Timestamp('2016-01-01 09:00:00'),
              3: Timestamp('2016-01-01 09:24:20')},
'intervalStart': {0: Timestamp('2016-01-01 06:56:10'),
              1: Timestamp('2016-01-01 07:00:00'),
              2: Timestamp('2016-01-01 08:00:00'),
              3: Timestamp('2016-01-01 09:00:00')},
       'id': {0: 1234562, 
              1: 1234562, 
              2: 1234562, 
              3: 1234562}}

Мое желание состоит в том, чтобы

  1. Сделайте это более эффективным, желательно используя встроенные функции Pandas или какую-нибудь магию.
  2. Не нужно иметь дело со столбцами, как я делаю сегодня в функции interval_split. Просто оперируйте и разверните весь фрейм данных.

Признательность за любые предложения или помощь.

0 ответов

Я сделал вариант (вдохновленный вашим кодом), и он работал очень медленно. Я получал ~5 минут на обработку 20 тыс. Строк данных, и виноватым после профилирования был.append. Есть уловка: поместить все записи в словарь, а затем использоватьDataFrameс from_dictметод. Используя from_dict для тех же 20k строк, он завершился примерно за 5 секунд (что примерно в 60 раз быстрее).

Я приложил свой код, вдохновленный вашим, и он также является общим для входных данных столбца (мое тестовое использование по сравнению с производственным использованием - это разница).

import pandas as pd
from collections import namedtuple
from datetime import timedelta

Interval = namedtuple('Interval', 'field_name start_time end_time delta')

class IntervalMatch(object):

    def __init__(self):
        pass

    def per_delta(self,interval: Interval, include_start: bool):
        current_interval = interval.start_time
        if not include_start:
            current_interval += pd.DateOffset(seconds=interval.delta)

        while current_interval < interval.end_time:
            yield current_interval
            current_interval += pd.DateOffset(seconds=interval.delta)

    def _copy(self, row, columns: pd.Index):
        values = pd.Series(row).values
        return pd.DataFrame([values], columns=columns.values).copy(True)

    def interval_split(self, interval: Interval, base_row: pd.Series, columns: pd.Index, include_start: bool):
        for time in self.per_delta(interval, include_start):
            extended_row = self._copy(base_row, columns)
            extended_row.at[(0, interval.field_name)] = time
            yield extended_row

    def get_exploded_records(self, data_to_examine: pd.DataFrame, time_field_name: str):
        last_row = None
        results = pd.DataFrame()
        delta = 1 # second

        time_col_index = data_to_examine.columns.get_loc(time_field_name)

        # process each row.  It is possible there is a map/reduce/fluent way of doing this w/ Pandas
        intermediate_results = {}
        current_row = -1
        for row in data_to_examine.itertuples(index=False):
            current_row += 1
            if last_row is None:
                last_row = row
                intermediate_results[current_row] = row
                continue

            total_seconds = (row[time_col_index] - last_row[time_col_index]).total_seconds()
            if total_seconds > 1 and total_seconds < 100:
                # there is a gap, so we want to explode the gap into the data and fill it with last_row values.
                interval = Interval(time_field_name, last_row[time_col_index], row[time_col_index], delta)
                for intrvl in self.interval_split(interval, last_row, data_to_examine.columns, False):
                    # we must unroll the list of rows to just the first row (since there is only one)
                    intermediate_results[current_row] = intrvl.values[0]
                    current_row += 1

            # append the current row
            intermediate_results[current_row] = row
            last_row = row

        results = pd.DataFrame.from_dict(intermediate_results, orient='index') #, columns=data_to_examine.columns)
        return results

def test():
        print("Preparing Data")
        timestamps = ['2016-01-01 09:24:20', '2016-01-01 09:24:21',
                      '2016-01-01 09:24:23', '2016-01-01 09:24:24', '2016-01-01 09:24:40']
        data_with_gaps = pd.DataFrame({'timestamp':[pd.Timestamp(timestamp) for timestamp in timestamps],
                                       'names':['Torial', 'Torial', 'Knut', 'Knut', 'Torial'],
                                       'action':['Add','Edit','Add', 'Edit','Delete']})

        interval = IntervalMatch()
        print("Getting Exploded Records")
        exploded = interval.get_exploded_records(data_with_gaps, 'timestamp')
        print(f"Data with Gaps: {data_with_gaps}")
        print(f"Exploded: {exploded}")
        exploded.to_csv("Exploded_test.csv")
Другие вопросы по тегам