Невозможно.get() из мультипроцессинга.

Я создаю веб-приложение для обработки ~60000 (и растущих) больших файлов, проведу некоторый анализ и верну "лучшее предположение", которое должен проверить пользователь. Файлы будут уточняться по категориям, чтобы избежать загрузки каждого файла, но у меня все еще остается сценарий, когда мне, возможно, придется обрабатывать более 1000 файлов одновременно.

Это большие файлы, обработка каждого из которых может занимать до 8-9 секунд, и в ситуации с файлами более 1000 нецелесообразно, чтобы пользователь ждал 8 секунд между проверками или 2 часа и более, пока файлы обрабатываются заранее.

Чтобы преодолеть это, я решил использовать многопроцессорность, чтобы порождать несколько рабочих, каждый из которых будет выбирать из очереди файлов, обрабатывать их и вставлять в очередь вывода. У меня есть другой метод, который в основном опрашивает выходную очередь на предмет, а затем передает их клиенту, когда он становится доступным.

Это работает хорошо, пока часть пути, когда очередь произвольно не прекратит возвращать элементы. Мы используем gevent с Django и uwsgi в нашей среде, и я знаю, что создание дочернего процесса с помощью многопроцессорной обработки в контексте gevent приводит к нежелательному состоянию цикла событий в дочернем элементе. Гринлеты, порожденные до разветвления, дублируются у ребенка. Поэтому я решил использовать gipc для помощи в обработке дочерних процессов.

Пример моего кода (я не могу опубликовать свой фактический код):

import multiprocessing
import gipc
from item import Item

MAX_WORKERS = 10

class ProcessFiles(object):

    def __init__(self):
        self.input_queue = multiprocessing.Queue()
        self.output_queue = multiprocessing.Queue()
        self.file_count = 0

    def query_for_results(self):
        # Query db for records of files to process.
        # Return results and set self.file_count equal to
        # the number of records returned.
        pass

    # The subprocess.
    def worker(self):
        # Chisel away at the input queue until no items remain.
        while True:
            if self.no_items_remain():
                return

            item = self.input_queue.get(item)
            item.process()
            self.output_queue.put(item)

    def start(self):
        # Get results and store in Queue for processing
        results = self.query_for_results()
        for result in results:
             item = Item(result)
             self.input_queue.put(item)

        # Spawn workers to process files.
        for _ in xrange(MAX_WORKERS):
            process = gipc.start_process(self.worker)

        # Poll for items to send to client.
        return self.get_processed_items()

    def get_processed_items(self):

        # Wait for the output queue to hold at least 1 item.
        # When an item becomes available, yield it to client.
        count = 0
        while count != self.file_count:
            #item = self._get_processed_item()
            # Debugging:
            try:
                item = self.output_queue.get(timeout=1)
            except:
                print '\tError fetching processed item. Retrying...'
                continue

            if item:
                print 'QUEUE COUNT: {}'.format(self.output_queue.qsize())
                count += 1
                yield item
        yield 'end'

Я ожидаю, что выходные данные покажут текущий счетчик очереди после обработки и выдачи элемента:

QUEUE COUNT: 999
QUEUE COUNT: 998
QUEUE COUNT: 997
QUEUE COUNT: 996
...
QUEUE COUNT: 4
QUEUE COUNT: 3
QUEUE COUNT: 2
QUEUE COUNT: 1

Однако сценарию удается выдать только несколько элементов перед сбоем:

QUEUE COUNT: 999
QUEUE COUNT: 998
QUEUE COUNT: 997
QUEUE COUNT: 996
    Error fetching processed item. Retrying...
    Error fetching processed item. Retrying...
    Error fetching processed item. Retrying...
    Error fetching processed item. Retrying...
    Error fetching processed item. Retrying...
    Error fetching processed item. Retrying...
    ...

Мой вопрос: что именно происходит? Почему я не могу get из очереди? Как я могу вернуть ожидаемый товар и избежать этого?

1 ответ

Какое именно исключение выдается, когда вы не можете получить предмет? Вы слепо ловите все исключения, которые могут быть выброшены. Кроме того, почему бы просто не использовать get без тайм-аута? Вы немедленно попробуйте снова, ничего не делая. Можно просто позвонить, чтобы получить блок, пока предмет не будет готов.

Что касается проблемы, я думаю, что происходит то, что gipc закрывает каналы, связанные с вашей очередью и тем самым нарушает очередь. Я ожидаю OSError бросается, а не queue.Empty, Смотрите этот отчет об ошибке для деталей.

В качестве альтернативы вы можете использовать пул процессов, инициировать пул до того, какgeventвещи случаются (то есть вам не нужно беспокоиться о проблеме цикла событий). Отправить задания в пул, используяimap_unordered и ты должен быть в порядке.

Ваша функция запуска будет выглядеть примерно так:

def start(self):
    results = self.query_for_results()
    return self.pool.imap_unordered(self.worker, results, 
        chunksize=len(results) // self.num_procs_in_pool)

@staticmethod
def worker(item):
    item.process()
    return item
Другие вопросы по тегам