Совместное использование глобальных данных в приложении Flask/mod_wsgi между всеми процессами mod_wsgi

Объяснение кода:

Я пытаюсь запустить и управлять некоторыми процессами из своего веб-приложения Flask. Я хочу иметь возможность запустить процесс и иметь возможность завершить / убить его.

В моем примере testscript.sh просто спит в течение 10 секунд.

Мой класс processManager реализует шаблон проектирования Борг в попытке решить мою проблему, но он ничего не изменил. (Ранее мои процессы, переменные блокировки и потока были просто глобальными переменными, а processManager вообще не был классом.)

Атрибут процессов - это список всех запущенных процессов.

addProcess запускает новый процесс, добавляет его к процессам. Он смотрит, работает ли поток опроса, и запускает его, если это не так.

Поток опроса опрашивает все процессы в списке процессов и проверяет, изменилось ли их состояние выхода. Если все процессы завершены, поток останавливается.

stopProcess перебирает процессы, чтобы найти правильный и завершить его.

class Borg:
  _shared_state = {}
  def __init__(self):
    self.__dict__ = self._shared_state
# Pokes the processes to see their exit status. Stops when there is no running thread left.
class processManager(Borg):
class pollProcesses (threading.Thread):
    def __init__(self, pManager):
        threading.Thread.__init__(self)
        self.pManager = pManager
    def run(self):
        while True:
            #poll exit codes:
            #None   Running
            #0      Finished nicely 
            #-n     terminated by signal n
            time.sleep(1)
            pprint.pprint("-----------")
            self.pManager.lock.acquire()
            stop = True
            for p in self.pManager.processes:
                status = p.poll()
                pprint.pprint(str(p.jobid)+" "+str(p.pid)+" "+str(status))
                if status is None:
                    stop = False
                                    #else log process status somewhere

            self.pManager.lock.release()
            if stop:
                pprint.pprint("-----------")
                pprint.pprint("No process running, stopping scan.")
                break;

def __init__(self):
    Borg.__init__(self)
    pprint.pprint("New instance!")
    if not hasattr(self, "processes"):
        pprint.pprint("FIRST instance!")
        self.processes = []
    if not hasattr(self, "lock"):
        self.lock = threading.Lock()
    if not hasattr(self, "thread"):
        self.thread = None

def addProcess(self, job):
    pprint.pprint("-----------")
    path = os.path.realpath(__file__)

    pprint.pprint("Adding new process")
    p = Popen(["/path/to/testscript.sh"], shell=True)
    p.jobid = job['id']

    # Lock the processes list before adding data
    self.lock.acquire()
    self.processes.append(p)

    #If thread is finished, start it up
    if self.thread is None or not self.thread.isAlive():
        pprint.pprint("Starting thread")
        self.thread = None
        self.thread = self.pollProcesses(self)
        self.thread.start()
        pprint.pprint(self.thread)
    self.lock.release()
    return

def stopProcess(self, jobid):
    pprint.pprint("STOP job"+str(jobid))
    self.lock.acquire()
    pprint.pprint(self.thread)
    pprint.pprint("Jobs in processes:")
    for p in self.processes:
        pprint.pprint(p.jobid)
        if p.jobid == jobid:
            p.terminate()
    self.lock.release()
    return

В приложении фляги я в основном делаю что-то вроде этого:

@plugin.route('/startjob', methods=['GET'])
def startJob():
    if not hasattr(g, "pManager"):
        g.pManager = processManager()

    #SNIP - create/obtain job

    g.pManager.addProcess(job)
    return "OK"

И что-то похожее на прекращение работы.

Теперь, если я запускаю задание и пытаюсь остановить его из своего приложения Flask, иногда в списке процессов создается новый Borg/processManager (печатается "FIRST instance!")

И с тех пор все становится непредсказуемым:

Мои статусы процессов все еще обновляются, даже если они из двух потоков и двух разных списков процессов. Это не было моей целью, но это работает.

Моя проблема:

Но если я хочу остановить процесс, функция stopProcess может быть "неправильной", так как теперь есть несколько ProcessManager с различным списком процессов, я не могу узнать, имеет ли вызываемая функция stopProcess доступ к конкретному процессу, который я хочу остановиться.

Я думаю, что это может быть вызвано mod_wsgi или apache, чьи многопоточные / многопроцессорные механизмы приводят к тому, что мой processManager запускается в совершенно разных контекстах.

Мне бы хотелось, чтобы список процессов, блокировка и поток процесса в ProcessManager были одинаковыми, независимо от того, какой процесс mod_wsgi обращается к нему.

Я все еще довольно плохо знаком с Python, поэтому моя проблема может быть совсем другой. Любая помощь приветствуется.

Пожалуйста, имейте в виду, что это упрощенный код, он все еще воспроизводит проблему, но на самом деле он мало что делает.

1 ответ

В целом, не рекомендуется создавать подпроцессы из веб-приложения, поскольку это может вызвать различные проблемы в зависимости от используемого веб-сервера. Проблемы могут возникнуть из-за использования многопроцессорной конфигурации веб-сервера, наследования нечетных сигнальных масок от родительского и т. Д.

если вы хотите создавать задания для выполнения определенных задач, вам, скорее всего, лучше использовать специально созданную систему очередей для задач, такую ​​как Celery, Redis Queue (RQ), gearman или аналогичные.

Другие вопросы по тегам