Повышение производительности запросов из большой таблицы HDFStore с помощью Pandas

У меня есть большой (~160 миллионов строк) фрейм данных, который я сохранил на диск примерно так:

    def fillStore(store, tablename):
        files = glob.glob('201312*.csv')
        names = ["ts", "c_id", "f_id","resp_id","resp_len", "s_id"]
        for f in files:
            df = pd.read_csv(f, parse_dates=True, index_col=0, names=names)
            store.append(tablename, df, format='table', data_columns=['c_id','f_id'])

Таблица имеет временной индекс, и я буду запрашивать, используя c_id а также f_id в дополнение к временам (через индекс).

У меня есть еще один кадр данных, содержащий ~18000 "инцидентов". Каждый инцидент состоит из нескольких (от нескольких сотен до сотен тысяч) отдельных записей. Мне нужно собрать несколько простых статистических данных для каждого инцидента и сохранить их, чтобы собрать статистические данные. В настоящее время я делаю это так:

def makeQueryString(c, f, start, stop):
    return "c_id == {} & f_id == {} & index >= Timestamp('{}') & index < Timestamp('{}')".format(c, f , str(pd.to_datetime(start)),str(pd.to_datetime(stop)))

def getIncidents(inc_times, store, tablename):
    incidents = pd.DataFrame(columns = ['c_id','f_id','resp_id','resp_len','s_id','incident_id'])
    for ind, row in inc_times.iterrows():
        incidents = incidents.append(store.select(tablename, 
                                                  makeQueryString(row.c_id, 
                                                                  row.f_id, 
                                                                  row.start, 
                                                                  row.stop))).fillna(ind)
    return incidents

Все это прекрасно работает за исключением того факта, что каждый store.select() Оператор занимает примерно 5 секунд, что означает, что для обработки данных за весь месяц требуется примерно 24-30 часов обработки. Между тем фактическая статистика, которая мне нужна, относительно проста:

def getIncidentStats(df):
    incLen = (df.index[-1]-df.index[0]).total_seconds()
    if incLen == 0:
        incLen = .1
    rqsts = len(df)
    rqstRate_s = rqsts/incLen
    return pd.Series({'c_id':df.c_id[0],
                      'f_id':df.fqdn_id[0],
                      'Length_sec':incLen, 
                      'num_rqsts':rqsts, 
                      'rqst_rate':rqstRate_s, 
                      'avg_resp_size':df.response_len.mean(), 
                      'std_resp_size':df.response_len.std()})


incs = getIncidents(i_times, store, tablename)
inc_groups = incs.groupby('incident_id')
inc_stats = inc_groups.apply(getIncidentStats)

У меня вопрос: как я могу улучшить производительность или эффективность любой части этого рабочего процесса? (Обратите внимание, что я фактически пакетирую большинство заданий, чтобы получать и хранить инциденты один день за раз, просто потому что я хочу ограничить риск потери уже обработанных данных даже в случае сбоя. Я оставил этот код здесь для простоты и потому что мне действительно нужно обрабатывать данные за весь месяц.)

Есть ли способ обработки данных, когда я получаю их из магазина, и есть ли какая-то польза от этого? Могу ли я получить выгоду от использования store.select_as_index? Если я получу индекс, мне все равно потребуется доступ к данным, чтобы получить правильную статистику?

Другие примечания / вопросы: я сравнил производительность хранения моего HDFStore на SSD и обычном жестком диске и не заметил каких-либо улучшений для SSD. Это ожидается?

Я также поиграл с идеей создания большого сочетания строк запроса и запроса их всех сразу. Это вызывает ошибки памяти, когда общая строка запроса слишком велика (~5-10 запросов).

Редактировать 1 Если это имеет значение, я использую таблицы версии 3.1.0 и pandas версии 0.13.1

Редактировать 2 Вот еще немного информации:

ptdump -av store.h5
/ (RootGroup) ''
  /._v_attrs (AttributeSet), 4 attributes:
   [CLASS := 'GROUP',
    PYTABLES_FORMAT_VERSION := '2.0',
    TITLE := '',
    VERSION := '1.0']
/all_recs (Group) ''
  /all_recs._v_attrs (AttributeSet), 14 attributes:
   [CLASS := 'GROUP',
    TITLE := '',
    VERSION := '1.0',
    data_columns := ['c_id', 'f_id'],
    encoding := None,
    index_cols := [(0, 'index')],
    info := {1: {'type': 'Index', 'names': [None]}, 'index': {'index_name': 'ts'}},
    levels := 1,
    nan_rep := 'nan',
    non_index_axes := [(1, ['c_id', 'f_id', 'resp_id', 'resp_len', 'dns_server_id'])],
    pandas_type := 'frame_table',
    pandas_version := '0.10.1',
    table_type := 'appendable_frame',
    values_cols := ['values_block_0', 'c_id', 'f_id']]
/all_recs/table (Table(161738653,)) ''
  description := {
  "index": Int64Col(shape=(), dflt=0, pos=0),
  "values_block_0": Int64Col(shape=(3,), dflt=0, pos=1),
  "c_id": Int64Col(shape=(), dflt=0, pos=2),
  "f_id": Int64Col(shape=(), dflt=0, pos=3)}
  byteorder := 'little'
  chunkshape := (5461,)
  autoindex := True
  colindexes := {
    "index": Index(6, medium, shuffle, zlib(1)).is_csi=False,
    "f_id": Index(6, medium, shuffle, zlib(1)).is_csi=False,
    "c_id": Index(6, medium, shuffle, zlib(1)).is_csi=False}
  /all_recs/table._v_attrs (AttributeSet), 19 attributes:
   [CLASS := 'TABLE',
    FIELD_0_FILL := 0,
    FIELD_0_NAME := 'index',
    FIELD_1_FILL := 0,
    FIELD_1_NAME := 'values_block_0',
    FIELD_2_FILL := 0,
    FIELD_2_NAME := 'c_id',
    FIELD_3_FILL := 0,
    FIELD_3_NAME := 'f_id',
    NROWS := 161738653,
    TITLE := '',
    VERSION := '2.6',
    client_id_dtype := 'int64',
    client_id_kind := ['c_id'],
    fqdn_id_dtype := 'int64',
    fqdn_id_kind := ['f_id'],
    index_kind := 'datetime64',
    values_block_0_dtype := 'int64',
    values_block_0_kind := ['s_id', 'resp_len', 'resp_id']]

Вот примеры как основной таблицы, так и inc_times:

In [12]: df.head()
Out[12]: 
                          c_id        f_id          resp_id      resp_len  \
ts                                                                   
2013-12-04 08:00:00  637092486  5372764353               30      56767543   
2013-12-04 08:00:01  637092486  5399580619               23      61605423   
2013-12-04 08:00:04    5456242  5385485460               21      46742687   
2013-12-04 08:00:04    5456242  5385485460               21      49909681   
2013-12-04 08:00:04  624791800  5373236646               14      70461449   

                              s_id  
ts                           
2013-12-04 08:00:00           1829  
2013-12-04 08:00:01           1724  
2013-12-04 08:00:04           1679  
2013-12-04 08:00:04           1874  
2013-12-04 08:00:04           1727  

[5 rows x 5 columns]


In [13]: inc_times.head()
Out[13]: 
        c_id     f_id                start                 stop
0       7254   196211  1385880945000000000  1385880960000000000
1       9286   196211  1387259840000000000  1387259850000000000
2      16032   196211  1387743730000000000  1387743735000000000
3      19793   196211  1386208175000000000  1386208200000000000
4      19793   196211  1386211800000000000  1386211810000000000

[5 rows x 4 columns]

Что касается c_id и f_id, набор идентификаторов, которые я хочу выбрать из полного хранилища, является относительно небольшим по сравнению с общим количеством идентификаторов в магазине. Другими словами, в inc_times есть несколько популярных идентификаторов, к которым я буду неоднократно запрашивать, полностью игнорируя некоторые идентификаторы, присутствующие в полной таблице. Я предполагаю, что идентификаторы, которые меня интересуют, составляют примерно 10% от общего числа идентификаторов, но это наиболее популярные идентификаторы, поэтому их записи доминируют над полным набором.

У меня 16 ГБ ОЗУ. Полный магазин - 7,4 ГБ, а полный набор данных (в виде файла CSV) - всего 8,7 ГБ. Первоначально я полагал, что смогу загрузить все это в память и, по крайней мере, выполнить некоторые ограниченные операции с ним, но я получаю ошибки памяти при загрузке всего этого. Следовательно, пакетирование его в ежедневные файлы (полный файл состоит из данных за один месяц).

1 ответ

Решение

Вот несколько рекомендаций и похожий вопрос здесь

Используйте сжатие: смотрите здесь. Вы должны попробовать это (это может сделать это быстрее / медленнее в зависимости от того, что именно вы запрашиваете), YMMV.

ptrepack --chunkshape=auto --propindexes --complevel=9 --complib=blosc in.h5 out.h5

Используйте иерархический запрос в чанках. Я имею в виду это. Так как у вас есть относительно небольшое количество c_id а также f_id что вы заботитесь, структурировать один запрос что-то вроде этого. Это вроде как использовать isin,

f_ids = list_of_f_ids that I care about
c_ids = list_of_c_ids that I care about

def create_batches(l, maxn=32):
    """ create a list of batches, maxed at maxn """
    batches = []
    while(True):
        if len(l) <= maxn:
            if len(l) > 0:
                batches.append(l)
            break
        batches.append(l[0:maxn])
        l = l[maxn:]
    return batches


results = []
for f_id_batch in create_batches(f_id_list):

    for c_id_batch in create_batches(c_id_list):

        q = "f_id={f_id} & c_id={c_id}".format(
                f_id=f_id_batch,
                c_id=c_id_batch)

        # you can include the max/min times in here as well (they would be max/min
        # time for ALL the included batches though, maybe easy for you to compute

        result = store.select('df',where=q)

        # sub process this result

        def f(x):
            # you will need to filter out the min/max timestamps here (which I gather
            # are somewhat dependent on f_id/c_id group

            #### process the data and return something
            # you could do something like: ``return x.describe()`` for simple stats

         results.append(result.groupby(['f_id','c_id').apply(f))

results = pd.concat(results)

Ключевым моментом здесь является обработка так, чтобы isin НЕ имеет более 32 членов для любой переменной, к которой вы обращаетесь. Это внутреннее ограничение numpy / pytables. Если вы превысите это, запрос будет работать, но он отбросит эту переменную и выполнит переиндексацию ВСЕХ данных (что НЕ здесь, где вы хотите).

Таким образом, у вас будет хорошее подмножество данных в памяти всего за несколько циклов. Я думаю, что эти запросы будут занимать примерно столько же времени, сколько и большинство ваших запросов, но у вас их будет намного меньше.

Время запроса является приблизительно постоянным для данного подмножества (если данные не упорядочены так, что они полностью проиндексированы).

Таким образом, запрос сканирует "блоки" данных (на что указывают индексы). Если у вас много попаданий по многим блокам, тогда запрос медленнее.

Вот пример

In [5]: N = 100000000

In [6]: df = DataFrame(np.random.randn(N,3),columns=['A','B','C'])

In [7]: df['c_id'] = np.random.randint(0,10,size=N)

In [8]: df['f_id'] = np.random.randint(0,10,size=N)

In [9]: df.index = date_range('20130101',periods=N,freq='s')

In [10]: df.to_hdf('test2.h5','df',mode='w',data_columns=['c_id','f_id'])

In [11]: df.head()
Out[11]: 
                            A         B         C  c_id  f_id
2013-01-01 00:00:00  0.037287  1.153534  0.639669     8     7
2013-01-01 00:00:01  1.741046  0.459821  0.194282     8     3
2013-01-01 00:00:02 -2.273919 -0.141789  0.770567     1     1
2013-01-01 00:00:03  0.320879 -0.108426 -1.310302     8     6
2013-01-01 00:00:04 -1.445810 -0.777090 -0.148362     5     5
2013-01-01 00:00:05  1.608211  0.069196  0.025021     3     6
2013-01-01 00:00:06 -0.561690  0.613579  1.071438     8     2
2013-01-01 00:00:07  1.795043 -0.661966  1.210714     0     0
2013-01-01 00:00:08  0.176347 -0.461176  1.624514     3     6
2013-01-01 00:00:09 -1.084537  1.941610 -1.423559     9     1
2013-01-01 00:00:10 -0.101036  0.925010 -0.809951     0     9
2013-01-01 00:00:11 -1.185520  0.968519  2.871983     7     5
2013-01-01 00:00:12 -1.089267 -0.333969 -0.665014     3     6
2013-01-01 00:00:13  0.544427  0.130439  0.423749     5     7
2013-01-01 00:00:14  0.112216  0.404801 -0.061730     5     4
2013-01-01 00:00:15 -1.349838 -0.639435  0.993495     0     9


In [2]: %timeit pd.read_hdf('test2.h5','df',where="f_id=[1] & c_id=[2]")
1 loops, best of 3: 13.9 s per loop

In [3]: %timeit pd.read_hdf('test2.h5','df',where="f_id=[1,2] & c_id=[1,2]")
1 loops, best of 3: 21.2 s per loop

In [4]: %timeit pd.read_hdf('test.2h5','df',where="f_id=[1,2,3] & c_id=[1,2,3]")
1 loops, best of 3: 42.8 s per loop

Этот конкретный пример - 5 ГБ несжатого и 2,9 ГБ сжатого. Эти результаты находятся на сжатых данных. В этом случае на самом деле немного быстрее использовать несжатый (например, первый цикл занимает 3,5 с). Это 100 мм строк.

Таким образом, используя последний пример (4), вы получаете в 9 раз больше данных первого, чуть более чем в 3 раза больше времени запроса.

Однако ваше ускорение должно быть НАМНОГО больше, потому что вы не будете выбирать отдельные временные метки, а сделаете это позже.

Весь этот подход учитывает, что у вас достаточно основной памяти для хранения результатов в размерах пакета (например, вы выбираете относительно небольшую часть набора в пакетных запросах).

Другие вопросы по тегам