Пространственно-временной запрос в Python с большим количеством записей

У меня есть кадр данных 600 000 точек x/y с информацией о дате и времени вместе с другим полем "статус", с дополнительной описательной информацией

Моя цель для каждой записи:

  • столбец суммы 'status' по записям, находящимся в определенном пространственно-временном буфере

конкретный буфер находится в пределах t - 8 часов и < 100 метров

В настоящее время у меня есть данные в фрейме данных панд.

Я мог бы циклически проходить по строкам и для каждой записи подбирать интересующие даты, а затем вычислять расстояния и дополнительно ограничивать выбор. Однако это было бы довольно медленно с таким количеством записей.

  • Это занимает 4,4 часа, чтобы бежать.

Я вижу, что я мог бы создать трехмерное дерево kdtree с x, y, date как время эпохи. Однако я не уверен, как правильно ограничить расстояния при включении дат и географических расстояний.

Вот некоторый воспроизводимый код, который вы, ребята, можете проверить:

Импортировать

import numpy.random as npr
import numpy
import pandas as pd
from pandas import DataFrame, date_range
from datetime import datetime, timedelta

Создать данные

np.random.seed(111)

Функция для генерации тестовых данных

def CreateDataSet(Number=1):

    Output = []

    for i in range(Number):

        # Create a date range with hour frequency
        date = date_range(start='10/1/2012', end='10/31/2012', freq='H')

        # Create long lat data
        laty = npr.normal(4815862, 5000,size=len(date))
        longx = npr.normal(687993, 5000,size=len(date))

        # status of interest
        status = [0,1]

        # Make a random list of statuses
        random_status = [status[npr.randint(low=0,high=len(status))] for i in range(len(date))]

        # user pool
        user = ['sally','derik','james','bob','ryan','chris']

        # Make a random list of users 
        random_user = [user[npr.randint(low=0,high=len(user))] for i in range(len(date))]

        Output.extend(zip(random_user, random_status, date, longx, laty))

    return pd.DataFrame(Output, columns = ['user', 'status', 'date', 'long', 'lat'])

#Create data  
data = CreateDataSet(3)
len(data)
#some time deltas
before = timedelta(hours = 8)
after = timedelta(minutes = 1)

Функция для ускорения

def work(df):

    output = []
    #loop through data index's
    for i in range(0, len(df)):
    l = []
        #first we will filter out the data by date to have a smaller list to compute distances for

        #create a mask to query all dates between range for date i
        date_mask = (df['date'] >= df['date'].iloc[i]-before) & (df['date'] <= df['date'].iloc[i]+after)
        #create a mask to query all users who are not user i (themselves)
        user_mask = df['user']!=df['user'].iloc[i]
        #apply masks
        dists_to_check = df[date_mask & user_mask]

        #for point i, create coordinate to calculate distances from
        a = np.array((df['long'].iloc[i], df['lat'].iloc[i]))
        #create array of distances to check on the masked data
        b = np.array((dists_to_check['long'].values, dists_to_check['lat'].values))

        #for j in the date queried data
        for j in range(1, len(dists_to_check)):
            #compute the ueclidean distance between point a and each point of b (the date masked data)
            x = np.linalg.norm(a-np.array((b[0][j], b[1][j])))

            #if the distance is within our range of interest append the index to a list
            if x <=100:
                l.append(j)
            else:
                pass
        try:
            #use the list of desired index's 'l' to query a final subset of the data
            data = dists_to_check.iloc[l]
            #summarize the column of interest then append to output list
            output.append(data['status'].sum())
        except IndexError, e:
            output.append(0)
            #print "There were no data to add"

    return pd.DataFrame(output)

Запустите код и время его

start = datetime.now()
out = work(data)
print datetime.now() - start

Есть ли способ сделать этот запрос векторизованным способом? Или я должен гоняться за другой техникой.

<3

1 ответ

Решение

Вот что хотя бы несколько решает мою проблему. Поскольку цикл может работать с разными частями данных независимо, здесь имеет смысл распараллеливание.

используя Ipython...

from IPython.parallel import Client
cli = Client()
cli.ids

cli = Client()
dview=cli[:]

with dview.sync_imports():
    import numpy as np
    import os
    from datetime import timedelta
    import pandas as pd

#We also need to add the time deltas and output list into the function as 
#local variables as well as add the Ipython.parallel decorator

@dview.parallel(block=True)
def work(df):
    before = timedelta(hours = 8)
    after = timedelta(minutes = 1)
    output = []

последний раз 1:17:54.910206, около 1/4 исходного времени

Мне все еще было бы очень интересно, чтобы кто-нибудь предложил небольшие улучшения скорости в теле функции.

Другие вопросы по тегам