Многопроцессорность - чтение больших входных данных - зависание программы

Я хочу запустить параллельные вычисления для некоторых входных данных, которые загружаются из файла. (Файл может быть очень большим, поэтому я использую генератор для этого.)

На определенном количестве элементов мой код работает нормально, но выше этого порога программа зависает (некоторые из рабочих процессов не завершаются).

Какие-либо предложения? (Я запускаю это с python2.7, 8 CPU; 5000 строк все еще в порядке, 7500 не работает.)

Во-первых, вам нужен входной файл. Создайте это в bash:

for i in {0..10000}; do echo -e "$i"'\r' >> counter.txt; done

Затем запустите это:

python2.7 main.py 100 counter.txt > run_log.txt

main.py:

#!/usr/bin/python2.7
import os, sys, signal, time
import Queue
import multiprocessing as mp

def eat_queue(job_queue, result_queue):
    """Eats input queue, feeds output queue
    """
    proc_name = mp.current_process().name
    while True:
        try:
            job = job_queue.get(block=False)
            if job == None:
                print(proc_name + " DONE")
                return
            result_queue.put(execute(job))
        except Queue.Empty:
            pass    

def execute(x):
    """Does the computation on the input data
    """
    return x*x

def save_result(result):
    """Saves results in a list
    """
    result_list.append(result)

def load(ifilename):
    """Generator reading the input file and
        yielding it row by row
    """
    ifile = open(ifilename, "r")
    for line in ifile:
        line = line.strip()
        num = int(line)
        yield (num)
    ifile.close()
    print("file closed".upper())

def put_tasks(job_queue, ifilename):
    """Feeds the job queue
    """
    for item in load(ifilename):
        job_queue.put(item)
    for _ in range(get_max_workers()):
        job_queue.put(None)

def get_max_workers():
    """Returns optimal number of processes to run
    """
    max_workers = mp.cpu_count() - 2
    if max_workers < 1:
        return 1
    return max_workers

def run(workers_num, ifilename):
    job_queue = mp.Queue()
    result_queue = mp.Queue()

    # decide how many processes are to be created
    max_workers = get_max_workers()
    print "processes available: %d" % max_workers
    if workers_num < 1 or workers_num > max_workers:
        workers_num = max_workers

    workers_list = []
    # a process for feeding job queue with the input file
    task_gen = mp.Process(target=put_tasks, name="task_gen",
                          args=(job_queue, ifilename))
    workers_list.append(task_gen)

    for i in range(workers_num):
        tmp = mp.Process(target=eat_queue, name="w%d" % (i+1),
                                      args=(job_queue, result_queue))
        workers_list.append(tmp)

    for worker in workers_list:
        worker.start()

    for worker in workers_list:
        worker.join()
        print "worker %s finished!" % worker.name

if __name__ == '__main__':
    result_list = []
    args = sys.argv
    workers_num = int(args[1])
    ifilename = args[2]
    run(workers_num, ifilename)

1 ответ

Решение

Это потому, что ничего в вашем коде ничего не снимает result_queue, Поведение зависит от деталей буферизации внутренней очереди: если "не много" данных ожидает, все выглядит нормально, но если "много" данных ожидает, все зависает. Можно сказать немного больше, потому что это включает в себя слои внутренней магии;-) Но документы предупреждают об этом:

Предупреждение

Как упоминалось выше, если дочерний процесс поместил элементы в очередь (и он не использовал JoinableQueue.cancel_join_thread), то этот процесс не завершится, пока все буферизованные элементы не будут сброшены в канал.

Это означает, что если вы попытаетесь присоединиться к этому процессу, вы можете зайти в тупик, если не уверены, что все элементы, помещенные в очередь, были использованы. Точно так же, если дочерний процесс не является демоническим, родительский процесс может зависать при выходе, когда он пытается присоединиться ко всем своим недемоническим дочерним процессам.

Обратите внимание, что в очереди, созданной с помощью диспетчера, этой проблемы нет. Смотрите Руководство по программированию.

Один простой способ исправить это: сначала добавить

            result_queue.put(None)

до eat_queue() возвращается. Затем добавьте:

count = 0
while count < workers_num:
    if result_queue.get() is None:
        count += 1

перед основной программой .join()с рабочими. Это истощает очередь результатов, и тогда все чисто выключается.

Кстати, этот код довольно странный:

while True:
    try:
        job = job_queue.get(block=False)
        if job == None:
            print(proc_name + " DONE")
            return
        result_queue.put(execute(job))
    except Queue.Empty:
        pass

Почему вы делаете неблокирование get()? Это превращает CPU-Hog в "петлю занятости", пока очередь пуста. Основная точка .get() это предоставить эффективный способ ждать, пока работа не появится. Так:

while True:
    job = job_queue.get()
    if job is None:
        print(proc_name + " DONE")
        break
    else:
        result_queue.put(execute(job))
result_queue.put(None)

делает то же самое, но гораздо эффективнее.

Предупреждение о размере очереди

Вы не спрашивали об этом, но давайте рассмотрим это до того, как он вас укусит;-) По умолчанию нет ограничений на Queueразмер. Если, например, вы добавляете миллиард предметов в QueueЭто потребует достаточно оперативной памяти, чтобы вместить миллиард предметов. Таким образом, если ваш производитель (-ы) могут создавать рабочие элементы быстрее, чем ваши (-и) потребители могут их обрабатывать, использование памяти может быстро выйти из-под контроля.

К счастью, это легко исправить: укажите максимальный размер очереди. Например,

    job_queue = mp.Queue(maxsize=10*workers_num)
                         ^^^^^^^^^^^^^^^^^^^^^^^

затем job_queue.put(some_work_item) будет блокироваться, пока потребители не уменьшат размер очереди до значения меньше максимального. Таким образом, вы можете обрабатывать огромные проблемы с очередью, которая требует тривиальной оперативной памяти.

Другие вопросы по тегам