Fastparquet, похоже, не давит на фильтры

Я создал файл паркета с использованием dask dataframe to_parquet метод с использованием fastparquet как двигатель. Чтение файла с использованием fastparquet.ParquetFile я получаю следующую информацию

from fastparquet import ParquetFile
file = ParquetFile('data/raw_data_fastpar.par/')
file.dtypes
OrderedDict([(u'@timestamp', dtype('<M8[ns]')),
         (u'@version', dtype('O')),
         (u'_id', dtype('O')),
         (u'browser_build', dtype('O')),
         (u'browser_device', dtype('O')),
         (u'browser_major', dtype('float64')),
         (u'browser_minor', dtype('float64')),
         (u'browser_name', dtype('O')),
         (u'browser_os', dtype('O')),
         (u'browser_os_name', dtype('O')),
         (u'dst', dtype('O')),
         (u'dst_port', dtype('float64')),
         (u'http_req_header_contentlength', dtype('O')),
         (u'http_req_header_host', dtype('O')),
         (u'http_req_header_referer', dtype('O')),
         (u'http_req_header_useragent', dtype('O')),
         (u'http_req_headers', dtype('O')),
         (u'http_req_method', dtype('O')),
         (u'http_req_secondleveldomain', dtype('O')),
         (u'http_req_url', dtype('O')),
         (u'http_req_version', dtype('O')),
         (u'http_resp_code', dtype('O')),
         (u'http_resp_header_contentlength', dtype('O')),
         (u'http_resp_header_contenttype', dtype('O')),
         (u'http_resp_headers', dtype('O')),
         (u'http_user', dtype('O')),
         (u'received_from', dtype('O')),
         (u'redis_db', dtype('O')),
         (u'src', dtype('O')),
         (u'src_port', dtype('float64')),
         (u'type', dtype('O')),
         (u'month', u'category'),
         (u'day', u'category')])


file.schema.text
u'- schema: \n
| - @timestamp: INT64, TIMESTAMP_MICROS, OPTIONAL\n
| - @version: BYTE_ARRAY, UTF8, OPTIONAL\n
| - _id: BYTE_ARRAY, UTF8, OPTIONAL\n
| - browser_build: BYTE_ARRAY, UTF8, OPTIONAL\n
| - browser_device: BYTE_ARRAY, UTF8, OPTIONAL\n
| - browser_major: DOUBLE, OPTIONAL\n
| - browser_minor: DOUBLE, OPTIONAL\n
| - browser_name: BYTE_ARRAY, UTF8, OPTIONAL\n
| - browser_os: BYTE_ARRAY, UTF8, OPTIONAL\n
| - browser_os_name: BYTE_ARRAY, UTF8, OPTIONAL\n
| - dst: BYTE_ARRAY, UTF8, OPTIONAL\n
| - dst_port: DOUBLE, OPTIONAL\n
| - http_req_header_contentlength: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_header_host: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_header_referer: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_header_useragent: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_headers: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_method: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_secondleveldomain: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_url: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_version: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_resp_code: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_resp_header_contentlength: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_resp_header_contenttype: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_resp_headers: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_user: BYTE_ARRAY, UTF8, OPTIONAL\n
| - received_from: BYTE_ARRAY, UTF8, OPTIONAL\n
| - redis_db: BYTE_ARRAY, UTF8, OPTIONAL\n
| - src: BYTE_ARRAY, UTF8, OPTIONAL\n
| - src_port: DOUBLE, OPTIONAL\n  
| - type: BYTE_ARRAY, UTF8, OPTIONAL'

Таким образом, поля являются правильными. Так как они были данными временных рядов, месяц и день использовались для разделения данных. Общее количество данных 22815984, Теперь я пытаюсь прочитать паркет, используя ключевое слово filters, и получаю странное поведение.

# this works
import datetime
since = datetime.datetime(year=2018, month=10, day=1)
filters = [('@timestamp', '>', np.datetime64(since)),]

raw_data = dd.read_parquet('data/raw_data_fastpar.par/', engine='fastparquet', columns=['http_user', 'dst', 'dst_port', 'http_req_method'], filters=filters)

raw_data.count().compute()

http_user          3835971
dst                3835971
dst_port           3835971
http_req_method    3835971
dtype: int64

что правильно, и фильтрация была нажата. Когда я меняю фильтр на другое поле,

filters = [('http_req_method', '=', 'GET'),]

Возвращает все данные

http_user          22815984
dst                22815984
dst_port           22815984
http_req_method    22815984
dtype: int64

Делая это вручную, это работает:

raw_data = dd.read_parquet('data/raw_data_fastpar.par/', engine='fastparquet', columns=['http_user', 'dst', 'dst_port', 'http_req_method'])
raw_data.loc[raw_data.http_req_method == 'GET'].count().compute()
http_user          14407709
dst                14407709
dst_port           14407709
http_req_method    14407709
dtype: int64

Также изменение фильтра на несуществующее поле не вызывает никаких исключений, так что это тоже странно. Есть ли что-то, чего мне не хватает в отношении паркета и фильтрации?

Dask DataFrame Structure:
    http_user   dst     dst_port    http_req_method
npartitions=612                 
    object      object  float64         object
    ...         ...     ...             ...
    ...         ...     ...             ...     
... ...         ...     ...             ...
    ...         ...     ...             ...
Dask Name: read-parquet, 612 tasks

1 ответ

Решение

filters= Опция включена в качестве оптимизации для случаев, когда это имеет смысл, чтобы избежать рассмотрения разделов данных, которые наверняка не содержат никаких действительных данных.

В документах:

Это реализует только фильтрацию на уровне группы строк (разделов), т. Е. Для предотвращения загрузки некоторых фрагментов данных и только в том случае, если в метаданные включена соответствующая статистика.

Например, если у вас есть набор групп строк, в которых интересующий столбец монотонно увеличивается, то фильтр этого столбца, вероятно, сможет исключить многие из групп строк (или разделов). С другой стороны, если каждая группа строк содержит значения во всем диапазоне этого столбца, этот тип фильтра будет иметь какой-либо эффект.

data[raw_data.http_req_method == 'GET']

Это делает что-то другое: теперь каждая группа строк загружается как раздел, а затем фильтруется в памяти рабочих. Dask может загружать только определенные разделы только в особом случае, который вы фильтруете по индексу.

Если вам нужна оптимизация, но ваши данные не структурированы таким образом, что границы раздела идеально совпадают с условиями фильтра, вам нужно будет использовать оба метода.

Пожалуйста, поднимите вопрос, если считаете, что строка документа может быть более понятной.

Другие вопросы по тегам