Вызвать обработчик событий socketio для колб вручную

У меня есть функция с декоратором колбы. В части моего приложения я использую сельдерей, и в этом процессе я хочу вызвать функцию ниже декоратора

@socketio.on('Example')
@authenticated_only                            
def example(data):
    ...

проблема в том, что событие использует current_user внутри, как я могу эмулировать обычный вызов? просто для эффективности кода, поэтому мне не нужно воссоздавать ту же функцию внутри задачи сельдерея.

Я думал что-то вроде этого:

@celery.task()
def celery_task:
    current_user=User.query.filter...
    example(data)

Но я не уверен насчет этого решения, и я даже не пробую его

1 ответ

Решение

Вы можете сделать что-то вроде этого:

def basic_example(data, user):
    # do everything here

@socketio.on('Example')
@authenticated_only                            
def example(data):
    basic_example(data, current_user)

@celery.task()
def celery_task:
    user = User.query.filter...
    basic_example(data, user)
Другие вопросы по тегам