Выполнить функцию после того, как Flask вернет ответ
У меня есть код, который нужно выполнить после того, как Flask вернет ответ. Я не думаю, что это достаточно сложно, чтобы настроить очередь задач, как Celery для него. Ключевое требование заключается в том, что Flask должен вернуть ответ клиенту перед запуском этой функции. Он не может ждать выполнения функции.
Есть некоторые существующие вопросы по этому поводу, но ни один из ответов, по-видимому, не относится к выполнению задачи после отправки ответа клиенту, они все равно выполняются синхронно, а затем возвращается ответ.
10 ответов
Короче говоря, Flask не предоставляет никаких специальных возможностей для достижения этой цели. Для простых одноразовых задач рассмотрите многопоточность Python, как показано ниже. Для более сложных конфигураций используйте очередь задач, такую как RQ или Celery.
Зачем?
Важно понимать, какие функции обеспечивает Flask и почему они не достигают намеченной цели. Все это полезно в других случаях и хорошо читается, но не помогает при выполнении фоновых задач.
Колба-х after_request
обработчик
Колба-х after_request
Обработчик, как подробно описано в этом шаблоне для обратных вызовов с отложенным запросом и этого фрагмента при присоединении различных функций к запросу, передает запрос в функцию обратного вызова. Предполагаемый вариант использования - изменить запрос, например прикрепить файл cookie.
Таким образом, запрос будет ожидать завершения этих обработчиков, потому что ожидается, что сам запрос изменится в результате.
Колба-х teardown_request
обработчик
Это похоже на after_request
, но teardown_request
не получает request
объект. Значит, это не будет ждать запроса, верно?
Это похоже на решение, как предполагает ответ на аналогичный вопрос переполнения стека. А поскольку документация Flask объясняет, что обратные вызовы разрыва не зависят от фактического запроса и не получают контекст запроса, у вас есть веские основания верить этому.
К несчастью, teardown_request
все еще синхронный, это происходит в более поздней части обработки запроса Flask, когда запрос больше не может быть изменен. Flask по- прежнему будет ждать завершения функций разрыва перед возвратом ответа, как предписывает этот список обратных вызовов и ошибок Flask.
Потоковые ответы Flask
Flask может передавать ответы, передавая генератор Response()
, так как этот ответ переполнения стека предлагает аналогичный вопрос.
При потоковой передаче клиент начинает получать ответ до завершения запроса. Однако запрос по-прежнему выполняется синхронно, поэтому работник, обрабатывающий запрос, занят до тех пор, пока поток не будет завершен.
Этот шаблон Flask для потоковой передачи включает в себя некоторую документацию по использованию stream_with_context()
, что необходимо для включения контекста запроса.
Так в чем же решение?
Flask не предлагает решение для запуска функций в фоновом режиме, потому что это не ответственность Flask.
В большинстве случаев лучшим способом решения этой проблемы является использование очереди задач, такой как RQ или Celery. Они управляют такими сложными вещами, как конфигурация, планирование и распределение работников. Это наиболее распространенный ответ на этот тип вопросов, потому что он наиболее правильный и заставляет вас настраивать вещи так, чтобы вы учитывали контекст и т. Д. правильно.
Если вам нужно запустить функцию в фоновом режиме и не хотите устанавливать очередь для управления этим, вы можете использовать встроенный в Python threading
или же multiprocessing
порождать фонового работника.
Вы не можете получить доступ request
или другие локальные потоки Flask из фоновых задач, так как запрос там не будет активным. Вместо этого передайте необходимые данные из представления в фоновый поток при его создании.
@app.route('/start_task')
def start_task():
def do_work(value):
# do something that takes a long time
import time
time.sleep(20)
thread = Thread(target=do_work, kwargs={'value': request.args.get('value', 20))
thread.start()
return 'started'
Flask - это приложение WSGI, и в результате оно ничего не может обработать после ответа. Вот почему такого обработчика не существует, само приложение WSGI отвечает только за создание объекта итератора ответа на сервер WSGI.
Однако сервер WSGI (например, gunicorn) может очень легко предоставить эту функцию, но привязка приложения к серверу является очень плохой идеей по ряду причин.
Именно по этой причине WSGI предоставляет спецификацию для Middleware, а Werkzeug предоставляет несколько помощников для упрощения общих функций Middleware. Среди них есть класс ClosingIterator, который позволяет подключать методы до close
метод ответного итератора, который выполняется после закрытия запроса.
Вот пример наивного after_response
реализация сделана как расширение Flask:
import traceback
from werkzeug.wsgi import ClosingIterator
class AfterResponse:
def __init__(self, app=None):
self.callbacks = []
if app:
self.init_app(app)
def __call__(self, callback):
self.callbacks.append(callback)
return callback
def init_app(self, app):
# install extension
app.after_response = self
# install middleware
app.wsgi_app = AfterResponseMiddleware(app.wsgi_app, self)
def flush(self):
for fn in self.callbacks:
try:
fn()
except Exception:
traceback.print_exc()
class AfterResponseMiddleware:
def __init__(self, application, after_response_ext):
self.application = application
self.after_response_ext = after_response_ext
def __call__(self, environ, after_response):
iterator = self.application(environ, after_response)
try:
return ClosingIterator(iterator, [self.after_response_ext.flush])
except Exception:
traceback.print_exc()
return iterator
Вы можете использовать это расширение следующим образом:
import flask
app = flask.Flask("after_response")
AfterResponse(app)
@app.after_response
def say_hi():
print("hi")
@app.route("/")
def home():
return "Success!\n"
Когда вы свернете "/", вы увидите следующее в ваших журналах:
127.0.0.1 - - [24/Jun/2018 19:30:48] "GET / HTTP/1.1" 200 -
hi
Это решает проблему просто без введения потоков (GIL??) или необходимости устанавливать и управлять очередью задач и клиентским программным обеспечением.
Flask теперь поддерживает (через Werkzeug) call_on_close
декоратор обратного вызова для объектов ответа. Вот как вы его используете:
@app.after_request
def response_processor(response):
# Prepare all the local variables you need since the request context
# will be gone in the callback function
@response.call_on_close
def process_after_request():
# Do whatever is necessary here
pass
return response
Преимущества:
call_on_close
настраивает функции для вызова после возврата ответа, используя спецификацию WSGI дляclose
метод.Никаких потоков, никаких фоновых заданий, никакой сложной настройки. Он выполняется в том же потоке, не блокируя возврат запроса.
Недостатки:
- Нет контекста запроса или контекста приложения. Вы должны сохранить нужные вам переменные, чтобы перейти в закрытие.
- Нет локального стека, так как все, что сносится. Если вам нужно, вы должны создать свой собственный контекст приложения.
Flask-SQLAlchemy потерпит неудачу, если вы попытаетесь записать в базу данных (я не понял почему, но, вероятно, из-за отключения контекста).(Это работает, но если у вас есть существующий объект, его необходимо добавить в новый сеанс, используяsession.add
илиsession.merge
; не недостаток!)
Я пробовал все эти методы, в том числе с использованием Thread, AfterResponse, call_on_close.
ВСЕ работает хорошо.
- Нить
@app.route('/inner')
def foo():
for i in range(10):
sleep(1)
print(i)
return
@app.route('/inner', methods=['POST'])
def run_jobs():
try:
thread = Thread(target=foo)
thread.start()
return render_template("index_inner.html", img_path=DIR_OF_PHOTOS, video_path=UPLOAD_VIDEOS_FOLDER)
- AfterResponse
app = Flask(__name__)
AfterResponse(app)
@app.route('/inner', methods=['POST'])
def save_data():
pass
@app.after_response
def foo():
for i in range(10):
sleep(1)
print(i)
return
3.close_on_close
from time import sleep
from flask import Flask, Response, request
app = Flask('hello')
@app.route('/')
def hello():
response = Response('hello')
@response.call_on_close
def on_close():
for i in range(10):
sleep(1)
print(i)
return response
if __name__ == '__main__':
app.run()
Промежуточное решение для светокопий Flask
Это то же самое решение, предложенное Мэтью Стори (которое является ИМХО идеальным решением - спасибо Мэтью), адаптированное для Flask Blueprints. Секрет в том, чтобы получить контекст приложения, используя прокси-сервер current_app. Читайте здесь для получения дополнительной информации ( http://flask.pocoo.org/docs/1.0/appcontext/)
Давайте предположим, что классы AfterThisResponse и AfterThisResponseMiddleware размещены в модуле по адресу.utils.after_this_response.py
Тогда, когда происходит создание объекта Flask, вы можете иметь, например...
__init__.py
from api.routes import my_blueprint
from .utils.after_this_response import AfterThisResponse
app = Flask( __name__ )
AfterThisResponse( app )
app.register_blueprint( my_blueprint.mod )
И тогда в вашем модуле проекта...
a_blueprint.py
from flask import Blueprint, current_app
mod = Blueprint( 'a_blueprint', __name__, url_prefix=URL_PREFIX )
@mod.route( "/some_resource", methods=['GET', 'POST'] )
def some_resource():
# do some stuff here if you want
@current_app.after_this_response
def post_process():
# this will occur after you finish processing the route & return (below):
time.sleep(2)
print("after_response")
# do more stuff here if you like & then return like so:
return "Success!\n"
В дополнение к другим решениям вы можете выполнять определенные действия по маршруту, комбинируя after_this_request и response.call_on_close:
@app.route('/')
def index():
# Do your pre-response work here
msg = 'Hello World!'
@flask.after_this_request
def add_close_action(response):
@response.call_on_close
def process_after_request():
# Do your post-response work here
time.sleep(3.0)
print('Delayed: ' + msg)
return response
return msg
Спасибо Matthew Story и Paul Brackin, но мне нужно было изменить их предложения. Итак, рабочее решение:
.
├── __init__.py
├── blueprint.py
└── library.py
# __init__.py
from flask import Flask
from .blueprint import bp
from .library import AfterResponse
app = Flask(__name__)
with app.app_context():
app.register_blueprint(bp, url_prefix='/')
AfterResponse(app)
# blueprint.py
from flask import Blueprint, request, current_app as app
from time import sleep
bp = Blueprint('app', __name__)
@bp.route('/')
def root():
body = request.json
@app.after_response
def worker():
print(body)
sleep(5)
print('finished_after_processing')
print('returned')
return 'finished_fast', 200
# library.py
from werkzeug.wsgi import ClosingIterator
from traceback import print_exc
class AfterResponse:
def __init__(self, application=None):
self.functions = list()
if application:
self.init_app(application)
def __call__(self, function):
self.functions.append(function)
def init_app(self, application):
application.after_response = self
application.wsgi_app = AfterResponseMiddleware(application.wsgi_app, self)
def flush(self):
while self.functions:
try:
self.functions.pop()()
except Exception:
print_exc()
class AfterResponseMiddleware:
def __init__(self, application, after_response_ext):
self.application = application
self.after_response_ext = after_response_ext
def __call__(self, environ, after_response):
iterator = self.application(environ, after_response)
try:
return ClosingIterator(iterator, [self.after_response_ext.flush])
except Exception:
print_exc()
return iterator
Исходный код можно найти здесь
После прочитал много тем. Я нашел решение для себя, если использую Blueprint, он работает для python 3.8 и SQLAlchemy
init .py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager
import dir
import time
from flask_mail import Mail
from flask_cors import CORS
import flask_excel as excel
# init SQLAlchemy so we can use it later in our models
dbb = SQLAlchemy()
def create_app():
app = Flask(__name__)
from .bp_route_1 import auth as bp_route_1_blueprint
app.register_blueprint(bp_route_1_blueprint)
CORS(app)
return app
bp_route_1.py
from flask import Blueprint, request, redirect, Response, url_for, abort, flash, render_template, \
copy_current_request_context
from . import dbb
from .models import #Import Models
from threading import Thread
bp_route_1 = Blueprint('bp_route_1', __name__)
@bp_route_1.route('/wehooks', methods=['POST'])
def route_1_wehooks_post():
@copy_current_request_context #to copy request
def foo_main():
# insert your code here
do_long_time_webhook(request)
Thread(target=foo_main).start()
print("do Webhook by Thread")
return Response(status=200)
def do_long_time_webhook(request):
try:
data = request.get_data()
print(data)
#do long tim function for webhook data
except Exception as e:
print('Dont do webhook', e)
Сигнал request_finished
получает Response
экземпляр как параметр. Любая постобработка может быть произведена путем подключения к этому сигналу.
Из https://flask-doc.readthedocs.io/en/latest/signals.html:
def log_response(sender, response, **extra):
sender.logger.debug('Request context is about to close down. '
'Response: %s', response)
from flask import request_finished
request_finished.connect(log_response, app)
Obs: В случае ошибки сигнал got_request_exception
можно использовать вместо этого.
Вы можете использовать этот код, который я пробовал. Это работает.
этот код будет печатать строку "сообщение". через 3 секунды, от времени планирования. Вы можете изменить время себя в соответствии с вашими требованиями.
import time, traceback
import threading
def every(delay,message, task):
next_time = time.time() + delay
time.sleep(max(0, next_time - time.time()))
task(message)
def foo(message):
print(message+" :foo", time.time())
def main(message):
threading.Thread(target=lambda: every(3,message, foo)).start()
main("message")