Выполнить функцию после того, как Flask вернет ответ

У меня есть код, который нужно выполнить после того, как Flask вернет ответ. Я не думаю, что это достаточно сложно, чтобы настроить очередь задач, как Celery для него. Ключевое требование заключается в том, что Flask должен вернуть ответ клиенту перед запуском этой функции. Он не может ждать выполнения функции.

Есть некоторые существующие вопросы по этому поводу, но ни один из ответов, по-видимому, не относится к выполнению задачи после отправки ответа клиенту, они все равно выполняются синхронно, а затем возвращается ответ.

10 ответов

Решение

Короче говоря, Flask не предоставляет никаких специальных возможностей для достижения этой цели. Для простых одноразовых задач рассмотрите многопоточность Python, как показано ниже. Для более сложных конфигураций используйте очередь задач, такую ​​как RQ или Celery.

Зачем?

Важно понимать, какие функции обеспечивает Flask и почему они не достигают намеченной цели. Все это полезно в других случаях и хорошо читается, но не помогает при выполнении фоновых задач.

Колба-х after_request обработчик

Колба-х after_request Обработчик, как подробно описано в этом шаблоне для обратных вызовов с отложенным запросом и этого фрагмента при присоединении различных функций к запросу, передает запрос в функцию обратного вызова. Предполагаемый вариант использования - изменить запрос, например прикрепить файл cookie.

Таким образом, запрос будет ожидать завершения этих обработчиков, потому что ожидается, что сам запрос изменится в результате.

Колба-х teardown_request обработчик

Это похоже на after_request, но teardown_request не получает request объект. Значит, это не будет ждать запроса, верно?

Это похоже на решение, как предполагает ответ на аналогичный вопрос переполнения стека. А поскольку документация Flask объясняет, что обратные вызовы разрыва не зависят от фактического запроса и не получают контекст запроса, у вас есть веские основания верить этому.

К несчастью, teardown_request все еще синхронный, это происходит в более поздней части обработки запроса Flask, когда запрос больше не может быть изменен. Flask по- прежнему будет ждать завершения функций разрыва перед возвратом ответа, как предписывает этот список обратных вызовов и ошибок Flask.

Потоковые ответы Flask

Flask может передавать ответы, передавая генератор Response(), так как этот ответ переполнения стека предлагает аналогичный вопрос.

При потоковой передаче клиент начинает получать ответ до завершения запроса. Однако запрос по-прежнему выполняется синхронно, поэтому работник, обрабатывающий запрос, занят до тех пор, пока поток не будет завершен.

Этот шаблон Flask для потоковой передачи включает в себя некоторую документацию по использованию stream_with_context(), что необходимо для включения контекста запроса.

Так в чем же решение?

Flask не предлагает решение для запуска функций в фоновом режиме, потому что это не ответственность Flask.

В большинстве случаев лучшим способом решения этой проблемы является использование очереди задач, такой как RQ или Celery. Они управляют такими сложными вещами, как конфигурация, планирование и распределение работников. Это наиболее распространенный ответ на этот тип вопросов, потому что он наиболее правильный и заставляет вас настраивать вещи так, чтобы вы учитывали контекст и т. Д. правильно.

Если вам нужно запустить функцию в фоновом режиме и не хотите устанавливать очередь для управления этим, вы можете использовать встроенный в Python threading или же multiprocessing порождать фонового работника.

Вы не можете получить доступ request или другие локальные потоки Flask из фоновых задач, так как запрос там не будет активным. Вместо этого передайте необходимые данные из представления в фоновый поток при его создании.

@app.route('/start_task')
def start_task():
    def do_work(value):
        # do something that takes a long time
        import time
        time.sleep(20)

    thread = Thread(target=do_work, kwargs={'value': request.args.get('value', 20))
    thread.start()
    return 'started'

Flask - это приложение WSGI, и в результате оно ничего не может обработать после ответа. Вот почему такого обработчика не существует, само приложение WSGI отвечает только за создание объекта итератора ответа на сервер WSGI.

Однако сервер WSGI (например, gunicorn) может очень легко предоставить эту функцию, но привязка приложения к серверу является очень плохой идеей по ряду причин.

Именно по этой причине WSGI предоставляет спецификацию для Middleware, а Werkzeug предоставляет несколько помощников для упрощения общих функций Middleware. Среди них есть класс ClosingIterator, который позволяет подключать методы до close метод ответного итератора, который выполняется после закрытия запроса.

Вот пример наивного after_response реализация сделана как расширение Flask:

import traceback
from werkzeug.wsgi import ClosingIterator

class AfterResponse:
    def __init__(self, app=None):
        self.callbacks = []
        if app:
            self.init_app(app)

    def __call__(self, callback):
        self.callbacks.append(callback)
        return callback

    def init_app(self, app):
        # install extension
        app.after_response = self

        # install middleware
        app.wsgi_app = AfterResponseMiddleware(app.wsgi_app, self)

    def flush(self):
        for fn in self.callbacks:
            try:
                fn()
            except Exception:
                traceback.print_exc()

class AfterResponseMiddleware:
    def __init__(self, application, after_response_ext):
        self.application = application
        self.after_response_ext = after_response_ext

    def __call__(self, environ, after_response):
        iterator = self.application(environ, after_response)
        try:
            return ClosingIterator(iterator, [self.after_response_ext.flush])
        except Exception:
            traceback.print_exc()
            return iterator

Вы можете использовать это расширение следующим образом:

import flask
app = flask.Flask("after_response")
AfterResponse(app)

@app.after_response
def say_hi():
    print("hi")

@app.route("/")
def home():
    return "Success!\n"

Когда вы свернете "/", вы увидите следующее в ваших журналах:

127.0.0.1 - - [24/Jun/2018 19:30:48] "GET / HTTP/1.1" 200 -
hi

Это решает проблему просто без введения потоков (GIL??) или необходимости устанавливать и управлять очередью задач и клиентским программным обеспечением.

Flask теперь поддерживает (через Werkzeug) call_on_closeдекоратор обратного вызова для объектов ответа. Вот как вы его используете:

@app.after_request
def response_processor(response):
    # Prepare all the local variables you need since the request context
    # will be gone in the callback function

    @response.call_on_close
    def process_after_request():
        # Do whatever is necessary here
        pass

    return response

Преимущества:

  1. call_on_close настраивает функции для вызова после возврата ответа, используя спецификацию WSGI для close метод.

  2. Никаких потоков, никаких фоновых заданий, никакой сложной настройки. Он выполняется в том же потоке, не блокируя возврат запроса.

Недостатки:

  1. Нет контекста запроса или контекста приложения. Вы должны сохранить нужные вам переменные, чтобы перейти в закрытие.
  2. Нет локального стека, так как все, что сносится. Если вам нужно, вы должны создать свой собственный контекст приложения.
  3. Flask-SQLAlchemy потерпит неудачу, если вы попытаетесь записать в базу данных (я не понял почему, но, вероятно, из-за отключения контекста).(Это работает, но если у вас есть существующий объект, его необходимо добавить в новый сеанс, используяsession.add или session.merge; не недостаток!)

Я пробовал все эти методы, в том числе с использованием Thread, AfterResponse, call_on_close.

ВСЕ работает хорошо.

  1. Нить
      @app.route('/inner')
def foo():
    for i in range(10):
        sleep(1)
        print(i)
    return 

@app.route('/inner', methods=['POST'])
def run_jobs():
    try:
        thread = Thread(target=foo)
        thread.start()
        return render_template("index_inner.html", img_path=DIR_OF_PHOTOS, video_path=UPLOAD_VIDEOS_FOLDER)
                            
  1. AfterResponse
      app = Flask(__name__)
AfterResponse(app)

@app.route('/inner', methods=['POST'])
def save_data():
    pass

@app.after_response
def foo():
    for i in range(10):
        sleep(1)
        print(i)
    return 

3.close_on_close

      from time import sleep

from flask import Flask, Response, request


app = Flask('hello')


@app.route('/')
def hello():

    response = Response('hello')

    @response.call_on_close
    def on_close():
        for i in range(10):
            sleep(1)
            print(i)

    return response


if __name__ == '__main__':
    app.run()

Промежуточное решение для светокопий Flask

Это то же самое решение, предложенное Мэтью Стори (которое является ИМХО идеальным решением - спасибо Мэтью), адаптированное для Flask Blueprints. Секрет в том, чтобы получить контекст приложения, используя прокси-сервер current_app. Читайте здесь для получения дополнительной информации ( http://flask.pocoo.org/docs/1.0/appcontext/)

Давайте предположим, что классы AfterThisResponse и AfterThisResponseMiddleware размещены в модуле по адресу.utils.after_this_response.py

Тогда, когда происходит создание объекта Flask, вы можете иметь, например...

__init__.py

from api.routes import my_blueprint
from .utils.after_this_response import AfterThisResponse

app = Flask( __name__ )
AfterThisResponse( app )
app.register_blueprint( my_blueprint.mod )

И тогда в вашем модуле проекта...

a_blueprint.py

from flask import Blueprint, current_app

mod = Blueprint( 'a_blueprint', __name__, url_prefix=URL_PREFIX )

@mod.route( "/some_resource", methods=['GET', 'POST'] )
def some_resource():
    # do some stuff here if you want

    @current_app.after_this_response
    def post_process():
        # this will occur after you finish processing the route & return (below):
        time.sleep(2)
        print("after_response")

    # do more stuff here if you like & then return like so:
    return "Success!\n"

В дополнение к другим решениям вы можете выполнять определенные действия по маршруту, комбинируя after_this_request и response.call_on_close:

      @app.route('/')
def index():
    # Do your pre-response work here
    msg = 'Hello World!'
    @flask.after_this_request
    def add_close_action(response):
        @response.call_on_close
        def process_after_request():
            # Do your post-response work here
            time.sleep(3.0)
            print('Delayed: ' + msg)
        return response
    return msg

Спасибо Matthew Story и Paul Brackin, но мне нужно было изменить их предложения. Итак, рабочее решение:

.
├── __init__.py
├── blueprint.py
└── library.py
# __init__.py
from flask import Flask
from .blueprint import bp
from .library import AfterResponse

app = Flask(__name__)

with app.app_context():
    app.register_blueprint(bp, url_prefix='/')
    AfterResponse(app)
# blueprint.py
from flask import Blueprint, request, current_app as app
from time import sleep

bp = Blueprint('app', __name__)


@bp.route('/')
def root():
    body = request.json

    @app.after_response
    def worker():
        print(body)
        sleep(5)
        print('finished_after_processing')

    print('returned')
    return 'finished_fast', 200
# library.py
from werkzeug.wsgi import ClosingIterator
from traceback import print_exc


class AfterResponse:
    def __init__(self, application=None):
        self.functions = list()
        if application:
            self.init_app(application)    

    def __call__(self, function):
        self.functions.append(function)

    def init_app(self, application):
        application.after_response = self
        application.wsgi_app = AfterResponseMiddleware(application.wsgi_app, self)

    def flush(self):
        while self.functions:
            try:
                self.functions.pop()()
            except Exception:
                print_exc()


class AfterResponseMiddleware:
    def __init__(self, application, after_response_ext):
        self.application = application
        self.after_response_ext = after_response_ext

    def __call__(self, environ, after_response):
        iterator = self.application(environ, after_response)
        try:
            return ClosingIterator(iterator, [self.after_response_ext.flush])
        except Exception:
            print_exc()
            return iterator

Исходный код можно найти здесь

После прочитал много тем. Я нашел решение для себя, если использую Blueprint, он работает для python 3.8 и SQLAlchemy

init .py

      from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager
import dir
import time
from flask_mail import Mail
from flask_cors import CORS
import flask_excel as excel
    # init SQLAlchemy so we can use it later in our models
dbb = SQLAlchemy()
def create_app():
     app = Flask(__name__)
     from .bp_route_1 import auth as bp_route_1_blueprint
     app.register_blueprint(bp_route_1_blueprint)
     CORS(app)
     return app

bp_route_1.py

      from flask import Blueprint, request, redirect, Response, url_for, abort, flash, render_template, \
    copy_current_request_context
from . import dbb 
from .models import #Import Models
from threading import Thread
bp_route_1 = Blueprint('bp_route_1', __name__)

@bp_route_1.route('/wehooks', methods=['POST'])
def route_1_wehooks_post():
    @copy_current_request_context #to copy request
    def foo_main():
        # insert your code here
        do_long_time_webhook(request)
    Thread(target=foo_main).start()
    print("do Webhook by Thread")
    return Response(status=200)
def do_long_time_webhook(request):
    try:
        data = request.get_data()
        print(data)
        #do long tim function for webhook data

    except Exception as e:
        print('Dont do webhook', e)

Сигнал request_finished получает Responseэкземпляр как параметр. Любая постобработка может быть произведена путем подключения к этому сигналу.

Из https://flask-doc.readthedocs.io/en/latest/signals.html:

def log_response(sender, response, **extra):
    sender.logger.debug('Request context is about to close down.  '
                        'Response: %s', response)

from flask import request_finished
request_finished.connect(log_response, app)

Obs: В случае ошибки сигнал got_request_exception можно использовать вместо этого.

Вы можете использовать этот код, который я пробовал. Это работает.

этот код будет печатать строку "сообщение". через 3 секунды, от времени планирования. Вы можете изменить время себя в соответствии с вашими требованиями.

import time, traceback
import threading

def every(delay,message, task):
  next_time = time.time() + delay
  time.sleep(max(0, next_time - time.time()))
  task(message)

def foo(message):
  print(message+"  :foo", time.time())



def main(message):
    threading.Thread(target=lambda: every(3,message, foo)).start()

main("message")
Другие вопросы по тегам