Как сделать так, чтобы длинные задачи можно было отменять через HTTP на HTTP-серверах Tornado?

Я реализовал HTTP-оболочку для какой-то тяжелой задачи, и я выбрал Tornado в качестве инфраструктуры интерфейсного сервера (это потому, что тяжелая задача написана на Python, а я просто привык к Tornado).

В настоящее время я только что назвал тяжелые задачи непосредственно из процесса Торнадо. Я подготовил какой-то веб-интерфейс с использованием jQuery, чтобы он выполнял AJAX-запрос с параметрами, заданными в форме.

Как вы можете себе представить, задание, которое я бросил из своего веб-браузера, не может быть отменено. Единственный способ отменить это послать 9 или 15 сигнал процессу Python, а пользователи обычно не могут этого делать.

Я хочу, чтобы текущая рабочая задача была отменена путем запроса каких-либо "отмененных" запросов через HTTP. Как это можно сделать? Что делает большинство веб-сервисов, выполняющих сложные задачи (например, кодирование видео на YouTube)?

1 ответ

Решение

На самом деле Торнадо Futures не поддерживает отмену ( документы). Более того, даже используя with_timeout задание с истекшим сроком действия все еще выполняется, только его результаты не ожидают ничего.

Единственный способ, как указано также в разделе Как я могу отменить асинхронную задачу в торнадо с тайм-аутом?, чтобы реализовать логику таким образом, чтобы она могла быть отменена (с некоторым флагом или чем-то еще).

Пример:

  • работа - простой асинхронный сон
  • / списки вакансий
  • /add/TIME добавляет новую работу - ВРЕМЯ в секундах - укажите, как долго спать
  • /cancel/ID отменить работу

Код может выглядеть так:

from tornado.ioloop import IOLoop
from tornado import gen, web
from time import time

class Job():

    def __init__(self, run_sec):
        self.run_sec = int(run_sec)
        self.start_time = None
        self.end_time = None
        self._cancelled = False

    @gen.coroutine
    def run(self):
        """ Some job

        The job is simple: sleep for a given number of seconds.
        It could be implemented as:
             yield gen.sleep(self.run_sec)
        but this way makes it not cancellable, so
        it is divided: run 1s sleep, run_sec times 
        """
        self.start_time = time()
        deadline = self.start_time + self.run_sec
        while not self._cancelled:
            yield gen.sleep(1)
            if time() >= deadline:
                break
        self.end_time = time()

    def cancel(self):
    """ Cancels job

    Returns None on success,
    raises Exception on error:
      if job is already cancelled or done
    """
        if self._cancelled:
            raise Exception('Job is already cancelled')
        if self.end_time is not None:
            raise Exception('Job is already done')
        self._cancelled = True

    def get_state(self):
        if self._cancelled:
            if self.end_time is None:
                # job might be running still
                # and will be stopped on the next while check
                return 'CANCELING...'
            else:
                return 'CANCELLED'
        elif self.end_time is None:
            return 'RUNNING...'
        elif self.start_time is None:
            # actually this never will shown
            # as after creation, job is immediately started
            return 'NOT STARTED'
        else:
            return 'DONE'


class MainHandler(web.RequestHandler):

    def get(self, op=None, param=None):
        if op == 'add':
            # add new job
            new_job = Job(run_sec=param)
            self.application.jobs.append(new_job)
            new_job.run()
            self.write('Job added')
        elif op == 'cancel':
            # cancel job - stop running
            self.application.jobs[int(param)].cancel()
            self.write('Job cancelled')
        else:
            # list jobs
            self.write('<pre>') # this is so ugly... ;P
            self.write('ID\tRUNSEC\tSTART_TIME\tSTATE\tEND_TIME\n')
            for idx, job in enumerate(self.application.jobs):
                self.write('%s\t%s\t%s\t%s\t%s\n' % (
                    idx, job.run_sec, job.start_time,
                    job.get_state(), job.end_time
                ))


class MyApplication(web.Application):

    def __init__(self):

        # to store tasks
        self.jobs = []

        super(MyApplication, self).__init__([
            (r"/", MainHandler),
            (r"/(add)/(\d*)", MainHandler),
            (r"/(cancel)/(\d*)", MainHandler),
        ])

if __name__ == "__main__":
    MyApplication().listen(8888)
    IOLoop.current().start()

Добавить пару рабочих мест:

for a in `seq 12 120`; do curl http://127.0.0.1:8888/add/$a; done

Затем отмените некоторые... Обратите внимание - для этого требуется только Торнадо.

Этот пример очень прост gen.sleep предназначено быть вашей тяжелой задачей. Конечно, не все задания так просто реализовать, чтобы их можно было отменять.

Другие вопросы по тегам