Повышение производительности запросов из большой таблицы HDFStore с помощью Pandas
У меня есть большой (~160 миллионов строк) фрейм данных, который я сохранил на диск примерно так:
def fillStore(store, tablename):
files = glob.glob('201312*.csv')
names = ["ts", "c_id", "f_id","resp_id","resp_len", "s_id"]
for f in files:
df = pd.read_csv(f, parse_dates=True, index_col=0, names=names)
store.append(tablename, df, format='table', data_columns=['c_id','f_id'])
Таблица имеет временной индекс, и я буду запрашивать, используя c_id
а также f_id
в дополнение к временам (через индекс).
У меня есть еще один кадр данных, содержащий ~18000 "инцидентов". Каждый инцидент состоит из нескольких (от нескольких сотен до сотен тысяч) отдельных записей. Мне нужно собрать несколько простых статистических данных для каждого инцидента и сохранить их, чтобы собрать статистические данные. В настоящее время я делаю это так:
def makeQueryString(c, f, start, stop):
return "c_id == {} & f_id == {} & index >= Timestamp('{}') & index < Timestamp('{}')".format(c, f , str(pd.to_datetime(start)),str(pd.to_datetime(stop)))
def getIncidents(inc_times, store, tablename):
incidents = pd.DataFrame(columns = ['c_id','f_id','resp_id','resp_len','s_id','incident_id'])
for ind, row in inc_times.iterrows():
incidents = incidents.append(store.select(tablename,
makeQueryString(row.c_id,
row.f_id,
row.start,
row.stop))).fillna(ind)
return incidents
Все это прекрасно работает за исключением того факта, что каждый store.select()
Оператор занимает примерно 5 секунд, что означает, что для обработки данных за весь месяц требуется примерно 24-30 часов обработки. Между тем фактическая статистика, которая мне нужна, относительно проста:
def getIncidentStats(df):
incLen = (df.index[-1]-df.index[0]).total_seconds()
if incLen == 0:
incLen = .1
rqsts = len(df)
rqstRate_s = rqsts/incLen
return pd.Series({'c_id':df.c_id[0],
'f_id':df.fqdn_id[0],
'Length_sec':incLen,
'num_rqsts':rqsts,
'rqst_rate':rqstRate_s,
'avg_resp_size':df.response_len.mean(),
'std_resp_size':df.response_len.std()})
incs = getIncidents(i_times, store, tablename)
inc_groups = incs.groupby('incident_id')
inc_stats = inc_groups.apply(getIncidentStats)
У меня вопрос: как я могу улучшить производительность или эффективность любой части этого рабочего процесса? (Обратите внимание, что я фактически пакетирую большинство заданий, чтобы получать и хранить инциденты один день за раз, просто потому что я хочу ограничить риск потери уже обработанных данных даже в случае сбоя. Я оставил этот код здесь для простоты и потому что мне действительно нужно обрабатывать данные за весь месяц.)
Есть ли способ обработки данных, когда я получаю их из магазина, и есть ли какая-то польза от этого? Могу ли я получить выгоду от использования store.select_as_index? Если я получу индекс, мне все равно потребуется доступ к данным, чтобы получить правильную статистику?
Другие примечания / вопросы: я сравнил производительность хранения моего HDFStore на SSD и обычном жестком диске и не заметил каких-либо улучшений для SSD. Это ожидается?
Я также поиграл с идеей создания большого сочетания строк запроса и запроса их всех сразу. Это вызывает ошибки памяти, когда общая строка запроса слишком велика (~5-10 запросов).
Редактировать 1 Если это имеет значение, я использую таблицы версии 3.1.0 и pandas версии 0.13.1
Редактировать 2 Вот еще немного информации:
ptdump -av store.h5
/ (RootGroup) ''
/._v_attrs (AttributeSet), 4 attributes:
[CLASS := 'GROUP',
PYTABLES_FORMAT_VERSION := '2.0',
TITLE := '',
VERSION := '1.0']
/all_recs (Group) ''
/all_recs._v_attrs (AttributeSet), 14 attributes:
[CLASS := 'GROUP',
TITLE := '',
VERSION := '1.0',
data_columns := ['c_id', 'f_id'],
encoding := None,
index_cols := [(0, 'index')],
info := {1: {'type': 'Index', 'names': [None]}, 'index': {'index_name': 'ts'}},
levels := 1,
nan_rep := 'nan',
non_index_axes := [(1, ['c_id', 'f_id', 'resp_id', 'resp_len', 'dns_server_id'])],
pandas_type := 'frame_table',
pandas_version := '0.10.1',
table_type := 'appendable_frame',
values_cols := ['values_block_0', 'c_id', 'f_id']]
/all_recs/table (Table(161738653,)) ''
description := {
"index": Int64Col(shape=(), dflt=0, pos=0),
"values_block_0": Int64Col(shape=(3,), dflt=0, pos=1),
"c_id": Int64Col(shape=(), dflt=0, pos=2),
"f_id": Int64Col(shape=(), dflt=0, pos=3)}
byteorder := 'little'
chunkshape := (5461,)
autoindex := True
colindexes := {
"index": Index(6, medium, shuffle, zlib(1)).is_csi=False,
"f_id": Index(6, medium, shuffle, zlib(1)).is_csi=False,
"c_id": Index(6, medium, shuffle, zlib(1)).is_csi=False}
/all_recs/table._v_attrs (AttributeSet), 19 attributes:
[CLASS := 'TABLE',
FIELD_0_FILL := 0,
FIELD_0_NAME := 'index',
FIELD_1_FILL := 0,
FIELD_1_NAME := 'values_block_0',
FIELD_2_FILL := 0,
FIELD_2_NAME := 'c_id',
FIELD_3_FILL := 0,
FIELD_3_NAME := 'f_id',
NROWS := 161738653,
TITLE := '',
VERSION := '2.6',
client_id_dtype := 'int64',
client_id_kind := ['c_id'],
fqdn_id_dtype := 'int64',
fqdn_id_kind := ['f_id'],
index_kind := 'datetime64',
values_block_0_dtype := 'int64',
values_block_0_kind := ['s_id', 'resp_len', 'resp_id']]
Вот примеры как основной таблицы, так и inc_times:
In [12]: df.head()
Out[12]:
c_id f_id resp_id resp_len \
ts
2013-12-04 08:00:00 637092486 5372764353 30 56767543
2013-12-04 08:00:01 637092486 5399580619 23 61605423
2013-12-04 08:00:04 5456242 5385485460 21 46742687
2013-12-04 08:00:04 5456242 5385485460 21 49909681
2013-12-04 08:00:04 624791800 5373236646 14 70461449
s_id
ts
2013-12-04 08:00:00 1829
2013-12-04 08:00:01 1724
2013-12-04 08:00:04 1679
2013-12-04 08:00:04 1874
2013-12-04 08:00:04 1727
[5 rows x 5 columns]
In [13]: inc_times.head()
Out[13]:
c_id f_id start stop
0 7254 196211 1385880945000000000 1385880960000000000
1 9286 196211 1387259840000000000 1387259850000000000
2 16032 196211 1387743730000000000 1387743735000000000
3 19793 196211 1386208175000000000 1386208200000000000
4 19793 196211 1386211800000000000 1386211810000000000
[5 rows x 4 columns]
Что касается c_id и f_id, набор идентификаторов, которые я хочу выбрать из полного хранилища, является относительно небольшим по сравнению с общим количеством идентификаторов в магазине. Другими словами, в inc_times есть несколько популярных идентификаторов, к которым я буду неоднократно запрашивать, полностью игнорируя некоторые идентификаторы, присутствующие в полной таблице. Я предполагаю, что идентификаторы, которые меня интересуют, составляют примерно 10% от общего числа идентификаторов, но это наиболее популярные идентификаторы, поэтому их записи доминируют над полным набором.
У меня 16 ГБ ОЗУ. Полный магазин - 7,4 ГБ, а полный набор данных (в виде файла CSV) - всего 8,7 ГБ. Первоначально я полагал, что смогу загрузить все это в память и, по крайней мере, выполнить некоторые ограниченные операции с ним, но я получаю ошибки памяти при загрузке всего этого. Следовательно, пакетирование его в ежедневные файлы (полный файл состоит из данных за один месяц).
1 ответ
Вот несколько рекомендаций и похожий вопрос здесь
Используйте сжатие: смотрите здесь. Вы должны попробовать это (это может сделать это быстрее / медленнее в зависимости от того, что именно вы запрашиваете), YMMV.
ptrepack --chunkshape=auto --propindexes --complevel=9 --complib=blosc in.h5 out.h5
Используйте иерархический запрос в чанках. Я имею в виду это. Так как у вас есть относительно небольшое количество c_id
а также f_id
что вы заботитесь, структурировать один запрос что-то вроде этого. Это вроде как использовать isin
,
f_ids = list_of_f_ids that I care about
c_ids = list_of_c_ids that I care about
def create_batches(l, maxn=32):
""" create a list of batches, maxed at maxn """
batches = []
while(True):
if len(l) <= maxn:
if len(l) > 0:
batches.append(l)
break
batches.append(l[0:maxn])
l = l[maxn:]
return batches
results = []
for f_id_batch in create_batches(f_id_list):
for c_id_batch in create_batches(c_id_list):
q = "f_id={f_id} & c_id={c_id}".format(
f_id=f_id_batch,
c_id=c_id_batch)
# you can include the max/min times in here as well (they would be max/min
# time for ALL the included batches though, maybe easy for you to compute
result = store.select('df',where=q)
# sub process this result
def f(x):
# you will need to filter out the min/max timestamps here (which I gather
# are somewhat dependent on f_id/c_id group
#### process the data and return something
# you could do something like: ``return x.describe()`` for simple stats
results.append(result.groupby(['f_id','c_id').apply(f))
results = pd.concat(results)
Ключевым моментом здесь является обработка так, чтобы isin
НЕ имеет более 32 членов для любой переменной, к которой вы обращаетесь. Это внутреннее ограничение numpy / pytables. Если вы превысите это, запрос будет работать, но он отбросит эту переменную и выполнит переиндексацию ВСЕХ данных (что НЕ здесь, где вы хотите).
Таким образом, у вас будет хорошее подмножество данных в памяти всего за несколько циклов. Я думаю, что эти запросы будут занимать примерно столько же времени, сколько и большинство ваших запросов, но у вас их будет намного меньше.
Время запроса является приблизительно постоянным для данного подмножества (если данные не упорядочены так, что они полностью проиндексированы).
Таким образом, запрос сканирует "блоки" данных (на что указывают индексы). Если у вас много попаданий по многим блокам, тогда запрос медленнее.
Вот пример
In [5]: N = 100000000
In [6]: df = DataFrame(np.random.randn(N,3),columns=['A','B','C'])
In [7]: df['c_id'] = np.random.randint(0,10,size=N)
In [8]: df['f_id'] = np.random.randint(0,10,size=N)
In [9]: df.index = date_range('20130101',periods=N,freq='s')
In [10]: df.to_hdf('test2.h5','df',mode='w',data_columns=['c_id','f_id'])
In [11]: df.head()
Out[11]:
A B C c_id f_id
2013-01-01 00:00:00 0.037287 1.153534 0.639669 8 7
2013-01-01 00:00:01 1.741046 0.459821 0.194282 8 3
2013-01-01 00:00:02 -2.273919 -0.141789 0.770567 1 1
2013-01-01 00:00:03 0.320879 -0.108426 -1.310302 8 6
2013-01-01 00:00:04 -1.445810 -0.777090 -0.148362 5 5
2013-01-01 00:00:05 1.608211 0.069196 0.025021 3 6
2013-01-01 00:00:06 -0.561690 0.613579 1.071438 8 2
2013-01-01 00:00:07 1.795043 -0.661966 1.210714 0 0
2013-01-01 00:00:08 0.176347 -0.461176 1.624514 3 6
2013-01-01 00:00:09 -1.084537 1.941610 -1.423559 9 1
2013-01-01 00:00:10 -0.101036 0.925010 -0.809951 0 9
2013-01-01 00:00:11 -1.185520 0.968519 2.871983 7 5
2013-01-01 00:00:12 -1.089267 -0.333969 -0.665014 3 6
2013-01-01 00:00:13 0.544427 0.130439 0.423749 5 7
2013-01-01 00:00:14 0.112216 0.404801 -0.061730 5 4
2013-01-01 00:00:15 -1.349838 -0.639435 0.993495 0 9
In [2]: %timeit pd.read_hdf('test2.h5','df',where="f_id=[1] & c_id=[2]")
1 loops, best of 3: 13.9 s per loop
In [3]: %timeit pd.read_hdf('test2.h5','df',where="f_id=[1,2] & c_id=[1,2]")
1 loops, best of 3: 21.2 s per loop
In [4]: %timeit pd.read_hdf('test.2h5','df',where="f_id=[1,2,3] & c_id=[1,2,3]")
1 loops, best of 3: 42.8 s per loop
Этот конкретный пример - 5 ГБ несжатого и 2,9 ГБ сжатого. Эти результаты находятся на сжатых данных. В этом случае на самом деле немного быстрее использовать несжатый (например, первый цикл занимает 3,5 с). Это 100 мм строк.
Таким образом, используя последний пример (4), вы получаете в 9 раз больше данных первого, чуть более чем в 3 раза больше времени запроса.
Однако ваше ускорение должно быть НАМНОГО больше, потому что вы не будете выбирать отдельные временные метки, а сделаете это позже.
Весь этот подход учитывает, что у вас достаточно основной памяти для хранения результатов в размерах пакета (например, вы выбираете относительно небольшую часть набора в пакетных запросах).