Совместное использование глобальных данных в приложении Flask/mod_wsgi между всеми процессами mod_wsgi
Объяснение кода:
Я пытаюсь запустить и управлять некоторыми процессами из своего веб-приложения Flask. Я хочу иметь возможность запустить процесс и иметь возможность завершить / убить его.
В моем примере testscript.sh просто спит в течение 10 секунд.
Мой класс processManager реализует шаблон проектирования Борг в попытке решить мою проблему, но он ничего не изменил. (Ранее мои процессы, переменные блокировки и потока были просто глобальными переменными, а processManager вообще не был классом.)
Атрибут процессов - это список всех запущенных процессов.
addProcess запускает новый процесс, добавляет его к процессам. Он смотрит, работает ли поток опроса, и запускает его, если это не так.
Поток опроса опрашивает все процессы в списке процессов и проверяет, изменилось ли их состояние выхода. Если все процессы завершены, поток останавливается.
stopProcess перебирает процессы, чтобы найти правильный и завершить его.
class Borg:
_shared_state = {}
def __init__(self):
self.__dict__ = self._shared_state
# Pokes the processes to see their exit status. Stops when there is no running thread left.
class processManager(Borg):
class pollProcesses (threading.Thread):
def __init__(self, pManager):
threading.Thread.__init__(self)
self.pManager = pManager
def run(self):
while True:
#poll exit codes:
#None Running
#0 Finished nicely
#-n terminated by signal n
time.sleep(1)
pprint.pprint("-----------")
self.pManager.lock.acquire()
stop = True
for p in self.pManager.processes:
status = p.poll()
pprint.pprint(str(p.jobid)+" "+str(p.pid)+" "+str(status))
if status is None:
stop = False
#else log process status somewhere
self.pManager.lock.release()
if stop:
pprint.pprint("-----------")
pprint.pprint("No process running, stopping scan.")
break;
def __init__(self):
Borg.__init__(self)
pprint.pprint("New instance!")
if not hasattr(self, "processes"):
pprint.pprint("FIRST instance!")
self.processes = []
if not hasattr(self, "lock"):
self.lock = threading.Lock()
if not hasattr(self, "thread"):
self.thread = None
def addProcess(self, job):
pprint.pprint("-----------")
path = os.path.realpath(__file__)
pprint.pprint("Adding new process")
p = Popen(["/path/to/testscript.sh"], shell=True)
p.jobid = job['id']
# Lock the processes list before adding data
self.lock.acquire()
self.processes.append(p)
#If thread is finished, start it up
if self.thread is None or not self.thread.isAlive():
pprint.pprint("Starting thread")
self.thread = None
self.thread = self.pollProcesses(self)
self.thread.start()
pprint.pprint(self.thread)
self.lock.release()
return
def stopProcess(self, jobid):
pprint.pprint("STOP job"+str(jobid))
self.lock.acquire()
pprint.pprint(self.thread)
pprint.pprint("Jobs in processes:")
for p in self.processes:
pprint.pprint(p.jobid)
if p.jobid == jobid:
p.terminate()
self.lock.release()
return
В приложении фляги я в основном делаю что-то вроде этого:
@plugin.route('/startjob', methods=['GET'])
def startJob():
if not hasattr(g, "pManager"):
g.pManager = processManager()
#SNIP - create/obtain job
g.pManager.addProcess(job)
return "OK"
И что-то похожее на прекращение работы.
Теперь, если я запускаю задание и пытаюсь остановить его из своего приложения Flask, иногда в списке процессов создается новый Borg/processManager (печатается "FIRST instance!")
И с тех пор все становится непредсказуемым:
Мои статусы процессов все еще обновляются, даже если они из двух потоков и двух разных списков процессов. Это не было моей целью, но это работает.
Моя проблема:
Но если я хочу остановить процесс, функция stopProcess может быть "неправильной", так как теперь есть несколько ProcessManager с различным списком процессов, я не могу узнать, имеет ли вызываемая функция stopProcess доступ к конкретному процессу, который я хочу остановиться.
Я думаю, что это может быть вызвано mod_wsgi или apache, чьи многопоточные / многопроцессорные механизмы приводят к тому, что мой processManager запускается в совершенно разных контекстах.
Мне бы хотелось, чтобы список процессов, блокировка и поток процесса в ProcessManager были одинаковыми, независимо от того, какой процесс mod_wsgi обращается к нему.
Я все еще довольно плохо знаком с Python, поэтому моя проблема может быть совсем другой. Любая помощь приветствуется.
Пожалуйста, имейте в виду, что это упрощенный код, он все еще воспроизводит проблему, но на самом деле он мало что делает.
1 ответ
В целом, не рекомендуется создавать подпроцессы из веб-приложения, поскольку это может вызвать различные проблемы в зависимости от используемого веб-сервера. Проблемы могут возникнуть из-за использования многопроцессорной конфигурации веб-сервера, наследования нечетных сигнальных масок от родительского и т. Д.
если вы хотите создавать задания для выполнения определенных задач, вам, скорее всего, лучше использовать специально созданную систему очередей для задач, такую как Celery, Redis Queue (RQ), gearman или аналогичные.