手动调用 flask socketio 事件处理程序
Call a flask socketio event handler manually
我有一个带有 flask-socketio 装饰器的函数。在我的应用程序的一部分中,我正在使用芹菜,在那个过程中,我想调用装饰器下面的函数
@socketio.on('Example')
@authenticated_only
def example(data):
...
问题是事件在内部使用 current_user,我如何模拟常规调用?只是为了提高代码效率,所以我不必在 celery 任务中重新创建完全相同的函数。
我是这样想的:
@celery.task()
def celery_task:
current_user=User.query.filter...
example(data)
但我不确定该解决方案,我什至没有尝试过
你可以这样做:
def basic_example(data, user):
# do everything here
@socketio.on('Example')
@authenticated_only
def example(data):
basic_example(data, current_user)
@celery.task()
def celery_task:
user = User.query.filter...
basic_example(data, user)
我有一个带有 flask-socketio 装饰器的函数。在我的应用程序的一部分中,我正在使用芹菜,在那个过程中,我想调用装饰器下面的函数
@socketio.on('Example')
@authenticated_only
def example(data):
...
问题是事件在内部使用 current_user,我如何模拟常规调用?只是为了提高代码效率,所以我不必在 celery 任务中重新创建完全相同的函数。
我是这样想的:
@celery.task()
def celery_task:
current_user=User.query.filter...
example(data)
但我不确定该解决方案,我什至没有尝试过
你可以这样做:
def basic_example(data, user):
# do everything here
@socketio.on('Example')
@authenticated_only
def example(data):
basic_example(data, current_user)
@celery.task()
def celery_task:
user = User.query.filter...
basic_example(data, user)