手动调用 flask socketio 事件处理程序

Call a flask socketio event handler manually

我有一个带有 flask-socketio 装饰器的函数。在我的应用程序的一部分中,我正在使用芹菜,在那个过程中,我想调用装饰器下面的函数

@socketio.on('Example')
@authenticated_only                            
def example(data):
    ...

问题是事件在内部使用 current_user,我如何模拟常规调用?只是为了提高代码效率,所以我不必在 celery 任务中重新创建完全相同的函数。

我是这样想的:

@celery.task()
def celery_task:
    current_user=User.query.filter...
    example(data)

但我不确定该解决方案,我什至没有尝试过

你可以这样做:

def basic_example(data, user):
    # do everything here

@socketio.on('Example')
@authenticated_only                            
def example(data):
    basic_example(data, current_user)

@celery.task()
def celery_task:
    user = User.query.filter...
    basic_example(data, user)