Многопроцессорная сломанная труба после долгого времени

Я разрабатываю сканер с использованием многопроцессорной модели.

которые используют multiprocessing.Queue для хранения URL-информации, которая должна сканироваться, содержимое страницы, которую нужно проанализировать, и что-то еще, использовать multiprocessing.Event для управления подпроцессами; использовать multiprocessing.Manager.dict для хранения хэша просканированного URL; каждая многопроцессорная обработка. Экземпляр Manager.dict использует многопроцессорную блокировку для контроля доступа.

Все три типа параметров совместно используются всеми подпроцессами и родительским процессом, и все параметры организованы в классе, я использую экземпляр класса для передачи общих параметров из родительского процесса в подпроцесс. Как: MGR = SyncManager() class Global_Params(): Queue_URL = multiprocessing.Queue() URL_RESULY = MGR.dict() URL_RESULY_Mutex = multiprocessing.Lock() STOP_EVENT = multiprocessing.Event() global_params = Global_Params()

В моем собственном механизме тайм-аута я использую process.terminate, чтобы остановить процесс, который не может останавливаться сам по себе в течение длительного времени!

В моем тестовом примере более 2500 целевых сайтов (некоторые не обслуживаются, некоторые огромные). сканировать сайт за сайтом в файле целевых сайтов.

В начале сканер мог работать хорошо, но через долгое время (иногда 8 часов, иногда 2 часа, иногда более 15 часов) сканер сканировал более 100(что не определено) сайтов, я получу информацию об ошибке:"Errno 32 сломанная труба"

Я пробовал следующие методы для определения местоположения и решения проблем:

  1. Если сайт А, на котором работает сканер, затем использует сканер, чтобы сканировать сайт отдельно, сканер работал хорошо. Даже если я получу фрагмент (например, 20 сайтов) из файла целевых сайтов, который содержит сайт A, сканер работал хорошо!

  2. добавьте "-X /tmp/pymp-* 240 /tmp" в /etc/cron.daily/tmpwatch

  3. когда произошел сбой, файл / tmp / pymp- * все еще там

  4. использовать multiprocessing.managers.SyncManager заменить multiprocessing.Manager и игнорировать большинство сигналов, кроме SIGKILL и SIGTERM

  5. для каждого целевого сайта я очищаю большинство общих параметров (очереди, дикты и события), если произошла ошибка, создайте новый экземпляр:

while global_params.Queue_url.qsize()>0: try: global_params.Queue_url.get(block=False) except Exception,e: print_info(str(e)) print_info("Clear Queue_url error!") time.sleep(1) global_params.Queue_url = Queue() pass ниже приведена информация о трассировке, функция print_info определена для печати и сохранения отладочной информации самостоятельно: [Errno 32] Broken pipe Traceback (most recent call last): File "Spider.py", line 613, in <module> main(args) File "Spider.py", line 565, in main spider.start() File "Spider.py", line 367, in start print_info("STATIC_RESULT size:%d" % len(global_params.STATIC_RESULT)) File "<string>", line 2, in __len__ File "/usr/local/python2.7.3/lib/python2.7/multiprocessing/managers.py", line 769, in _callmethod kind, result = conn.recv() EOFError Я не могу понять почему, кто-нибудь знает причину?

1 ответ

Я не знаю, решает ли это вашу проблему, но стоит упомянуть один момент:

global_params.Queue_url.get(block=False)

... создает исключение Queue.Empty, если очередь пуста. Не стоит воссоздавать очередь за пустым исключением.

Воссоздание очереди может привести к гоночным условиям.

С моей точки зрения, у вас есть возможности:

  1. избавиться от блока кода "очередь на отдых"
  2. переключиться на другую реализацию очереди

использовать:

from Queue import Queue

вместо:

from multiprocessing import Queue
Другие вопросы по тегам