Читать большой файл параллельно?

У меня есть большой файл, который мне нужно прочитать и создать словарь. Я бы хотел, чтобы это было как можно быстрее. Однако мой код на python слишком медленный. Вот минимальный пример, который показывает проблему.

Сначала сделайте поддельные данные

paste <(seq 20000000) <(seq 2 20000001)  > largefile.txt

Теперь вот минимальный кусок кода на Python для чтения и создания словаря.

import sys
from collections import defaultdict
fin = open(sys.argv[1])

dict = defaultdict(list)

for line in fin:
    parts = line.split()
    dict[parts[0]].append(parts[1])

Тайминги:

time ./read.py largefile.txt
real    0m55.746s

Однако можно прочитать весь файл намного быстрее, так как:

time cut -f1 largefile.txt > /dev/null    
real    0m1.702s

Мой процессор имеет 8 ядер, возможно ли распараллелить эту программу на python, чтобы ускорить ее?

Одной из возможностей может быть чтение в больших порциях ввода и последующее параллельное выполнение 8 процессов на разных неперекрывающихся секциях, делая словари параллельными из данных в памяти, а затем считывая в другой большой порции. Возможно ли это как-то в Python, использующем многопроцессорность?

Обновление Поддельные данные были не очень хорошими, так как они имели только одно значение на ключ. Лучше

perl -E 'say int rand 1e7, $", int rand 1e4 for 1 .. 1e7' > largefile.txt

(Связано с прочтением большого файла и составлением словаря.)

6 ответов

Решение

Об этом на сайте Тима Брея несколько лет назад была серия постов в блоге "Проект широкого поиска" [1]. Вы можете найти там решение [2] Фредрика Лунда из ElementTree [3] и PIL [4]. Я знаю, что размещать ссылки на этом сайте обычно не рекомендуется, но я думаю, что эти ссылки дают вам лучший ответ, чем копирование его кода.

[1] http://www.tbray.org/ongoing/When/200x/2007/10/30/WF-Results
[2] http://effbot.org/zone/wide-finder.htm
[3] http://docs.python.org/3/library/xml.etree.elementtree.html
[4] http://www.pythonware.com/products/pil/

Может быть возможно распараллелить это, чтобы ускорить его, но выполнение нескольких параллельных чтений вряд ли поможет.

Ваша операционная система вряд ли будет выполнять несколько операций чтения параллельно (исключение составляет нечто вроде чередующегося рейдового массива, и в этом случае вам все равно нужно знать шаг, чтобы оптимально его использовать).

Что вы можете сделать, так это запустить относительно дорогие операции строки / словаря / списка параллельно чтению.

Таким образом, один поток считывает и помещает (большие) чанки в синхронизированную очередь, один или несколько потребительских потоков извлекают чанки из очереди, разбивают их на строки и заполняют словарь.

(Если вы используете несколько пользовательских потоков, как говорит Pappnese, создайте по одному словарю на поток, а затем присоединитесь к ним).


подсказки:


Число рейнольдса Баунти:

C, очевидно, не имеет GIL, с которым приходится бороться, поэтому многие потребители, вероятно, будут лучше масштабироваться. Поведение при чтении не меняется. Недостатком является то, что в C отсутствует встроенная поддержка хэш-карт (при условии, что вам все еще нужен словарь в стиле Python) и синхронизированных очередей, поэтому вы должны либо найти подходящие компоненты, либо написать свои собственные. Базовая стратегия, состоящая из множества потребителей, каждый из которых создает свой собственный словарь и затем объединяет их в конце, все еще, вероятно, является наилучшей.

С помощью strtok_r вместо str.split может быть быстрее, но помните, что вам нужно будет вручную управлять памятью для всех ваших строк. Да, и вам нужна логика для управления фрагментами строк тоже. Честно говоря, С дает вам так много вариантов, я думаю, вам просто нужно профилировать его и посмотреть.

Кажется заманчивым думать, что использование пула обработки решит подобные проблемы, но в конечном итоге это будет немного сложнее, по крайней мере, в чистом Python.

Поскольку ОП упоминал, что списки в каждой строке ввода на практике будут длиннее, чем два элемента, я сделал немного более реалистичный файл ввода, используя:

paste <(seq 20000000) <(seq 2 20000001) <(seq 3 20000002) |
  head -1000000 > largefile.txt

После профилирования исходного кода я обнаружил, что самой медленной частью процесса является процедура разделения строк. (.split() заняло примерно в 2 раза больше времени, чем .append() на моей машине.)

1000000    0.333    0.000    0.333    0.000 {method 'split' of 'str' objects}
1000000    0.154    0.000    0.154    0.000 {method 'append' of 'list' objects}

Поэтому я учел разбиение на другую функцию и использовал пул для распределения работы по разделению полей:

import sys
import collections
import multiprocessing as mp

d = collections.defaultdict(list)

def split(l):
    return l.split()

pool = mp.Pool(processes=4)
for keys in pool.map(split, open(sys.argv[1])):
    d[keys[0]].append(keys[1:])

К сожалению, добавление пула замедлило процесс более чем в 2 раза. Оригинальная версия выглядела так:

$ time python process.py smallfile.txt 
real    0m7.170s
user    0m6.884s
sys     0m0.260s

по сравнению с параллельной версией:

$ time python process-mp.py smallfile.txt 
real    0m16.655s
user    0m24.688s
sys     0m1.380s

Поскольку .map() Вызов call в основном должен сериализовать (извлекать) каждый вход, отправлять его удаленному процессу, а затем десериализовывать (снимать) возвращаемое значение с удаленного процесса, используя пул таким образом, намного медленнее. Вы получаете некоторое улучшение, добавляя больше ядер в пул, но я бы сказал, что это в корне неправильный способ распространения этой работы.

Я полагаю, что для ускорения процесса в разных ядрах вам нужно будет читать большими кусками входные данные, используя некоторый фиксированный размер блока. Затем вы можете отправить весь блок рабочему процессу и получить обратно сериализованные списки (хотя пока неизвестно, сколько будет стоить десериализация). Однако чтение ввода в блоках фиксированного размера может показаться сложным с ожидаемым вводом, поскольку я предполагаю, что каждая строка не обязательно имеет одинаковую длину.

Более кардинальное решение для медленного добавления словаря: замените словарь массивом пар строк. Заполните это и затем сортируйте.

Одна вещь, которую вы можете попробовать, - это получить счетчик строк из файла, затем создать 8 потоков, которые составляют словарь из 1/8 каждого файла, а затем присоединиться к словарям, когда все потоки будут завершены. Это, вероятно, ускорит его, если добавление требует времени, а не чтения строк.

Если ваши данные в файле не изменяются так часто, вы можете выбрать сериализацию. Интерпретатор Python будет десериализовать его намного быстрее. Вы можете использовать модуль cPickle.

Или создание 8 отдельных процессов - другой вариант. Потому что наличие единственного диктата делает это намного более возможным. Вы можете взаимодействовать между этими процессами через Pipe в модуле "multiprocessing" или в модуле "socket".

С наилучшими пожеланиями

Барыш Шухадар.

Другие вопросы по тегам