Мультипроцессинг + psycopg2 дети зомби
Я пытаюсь вставить и обновить несколько миллионов строк, используя psycopg и многопроцессорность. Судя по документации, найденной в http://initd.org/psycopg/docs/usage.html, каждый дочерний элемент имеет свое собственное подключение к БД.
Но в ходе казни бегает только один ребенок, а остальные становятся зомби. Сценарий сам по себе довольно прост, и вот урезанная версия того же самого,
import os
import psycopg2
from multiprocessing import Process
def _target(args):
# Each forked process will have its own connection
# http://initd.org/psycopg/docs/usage.html#thread-and-process-safety
conn = get_db_connection()
# Stuff seems to execute till this point in all the children
print os.getpid(), os.getppid()
# Do some updates here. After this only one child is active and running
# Others become Zombies after a while.
if __name__ == '__main__':
args = "Foo"
for i in xrange(3):
p = Process(target=_target, args=(args,))
p.start()
Я также проверил, имеют ли таблицы повышенную блокировку, заглянув в pg_locks
, но, похоже, это не тот случай. Я что-то упускаю из виду?
1 ответ
Ваши процессы становятся зомби, потому что там задания завершены, но процессы не объединены. Я воспроизвел вашу проблему с помощью этого одиночного теста (я добавил сон для имитации длительных заданий):
import os
import time
from multiprocessing import Process
def _target(args):
print os.getpid(), os.getppid()
time.sleep(2)
print os.getpid(), "will stop"
if __name__ == '__main__':
args = "Foo"
for i in xrange(3):
p = Process(target=_target, args=(args,))
p.start()
import time
time.sleep(10)
при выполнении этого после того, как 3 процесса напечатают, что они остановятся, они станут в представлении ps (они больше не двигаются, но на самом деле не мертвы, потому что отец все еще держит их).
Если я заменю основную часть этим, у меня больше не будет зомби:
if __name__ == '__main__':
args = "Foo"
processes = []
for i in xrange(3):
p = Process(target=_target, args=(args,))
processes.append(p)
p.start()
for p in processes:
p.join()
import time
time.sleep(10)