Многопроцессорная сломанная труба после долгого времени
Я разрабатываю сканер с использованием многопроцессорной модели.
которые используют multiprocessing.Queue для хранения URL-информации, которая должна сканироваться, содержимое страницы, которую нужно проанализировать, и что-то еще, использовать multiprocessing.Event для управления подпроцессами; использовать multiprocessing.Manager.dict для хранения хэша просканированного URL; каждая многопроцессорная обработка. Экземпляр Manager.dict использует многопроцессорную блокировку для контроля доступа.
Все три типа параметров совместно используются всеми подпроцессами и родительским процессом, и все параметры организованы в классе, я использую экземпляр класса для передачи общих параметров из родительского процесса в подпроцесс. Как:
MGR = SyncManager()
class Global_Params():
Queue_URL = multiprocessing.Queue()
URL_RESULY = MGR.dict()
URL_RESULY_Mutex = multiprocessing.Lock()
STOP_EVENT = multiprocessing.Event()
global_params = Global_Params()
В моем собственном механизме тайм-аута я использую process.terminate, чтобы остановить процесс, который не может останавливаться сам по себе в течение длительного времени!
В моем тестовом примере более 2500 целевых сайтов (некоторые не обслуживаются, некоторые огромные). сканировать сайт за сайтом в файле целевых сайтов.
В начале сканер мог работать хорошо, но через долгое время (иногда 8 часов, иногда 2 часа, иногда более 15 часов) сканер сканировал более 100(что не определено) сайтов, я получу информацию об ошибке:"Errno 32 сломанная труба"
Я пробовал следующие методы для определения местоположения и решения проблем:
Если сайт А, на котором работает сканер, затем использует сканер, чтобы сканировать сайт отдельно, сканер работал хорошо. Даже если я получу фрагмент (например, 20 сайтов) из файла целевых сайтов, который содержит сайт A, сканер работал хорошо!
добавьте "-X /tmp/pymp-* 240 /tmp" в /etc/cron.daily/tmpwatch
когда произошел сбой, файл / tmp / pymp- * все еще там
использовать multiprocessing.managers.SyncManager заменить multiprocessing.Manager и игнорировать большинство сигналов, кроме SIGKILL и SIGTERM
для каждого целевого сайта я очищаю большинство общих параметров (очереди, дикты и события), если произошла ошибка, создайте новый экземпляр:
while global_params.Queue_url.qsize()>0:
try:
global_params.Queue_url.get(block=False)
except Exception,e:
print_info(str(e))
print_info("Clear Queue_url error!")
time.sleep(1)
global_params.Queue_url = Queue()
pass
ниже приведена информация о трассировке, функция print_info определена для печати и сохранения отладочной информации самостоятельно:
[Errno 32] Broken pipe
Traceback (most recent call last):
File "Spider.py", line 613, in <module>
main(args)
File "Spider.py", line 565, in main
spider.start()
File "Spider.py", line 367, in start
print_info("STATIC_RESULT size:%d" % len(global_params.STATIC_RESULT))
File "<string>", line 2, in __len__
File "/usr/local/python2.7.3/lib/python2.7/multiprocessing/managers.py", line 769, in _callmethod
kind, result = conn.recv()
EOFError
Я не могу понять почему, кто-нибудь знает причину?
1 ответ
Я не знаю, решает ли это вашу проблему, но стоит упомянуть один момент:
global_params.Queue_url.get(block=False)
... создает исключение Queue.Empty, если очередь пуста. Не стоит воссоздавать очередь за пустым исключением.
Воссоздание очереди может привести к гоночным условиям.
С моей точки зрения, у вас есть возможности:
- избавиться от блока кода "очередь на отдых"
- переключиться на другую реализацию очереди
использовать:
from Queue import Queue
вместо:
from multiprocessing import Queue