Django Celery task error when invoked in a POST request

I have the following task for sending notifications asynchronously:

from celery import shared_task


@shared_task(name="notifications_task")
def notifications_task(activity_type, obj=None, target=None):
    dispatch_notification_activity(activity_type, obj, target)

In the view's post handler, I call notifications_task as follows:

 notifications_task.delay(UpdateTypeEnum.STOCK_WATCHED, obj=user, target=instance)

The function dispatch_notification_activity is responsible for pushing the notifications. Because it takes a while to run, I set it up to run asynchronously.

Calling notifications_task raises the following error: kombu.exceptions.EncodeError: Object of type UpdateTypeEnum is not JSON serializable

Traceback (most recent call last):
  File "E:\stocktalk-api-platform\venv\lib\site-packages\django\core\handlers\exception.py", line 34, in inner
    response = get_response(request)
  File "E:\stocktalk-api-platform\venv\lib\site-packages\django\core\handlers\base.py", line 115, in _get_response
    response = self.process_exception_by_middleware(e, request)
  File "E:\stocktalk-api-platform\venv\lib\site-packages\django\core\handlers\base.py", line 113, in _get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
  File "E:\stocktalk-api-platform\venv\lib\site-packages\django\views\decorators\csrf.py", line 54, in wrapped_view
    return view_func(*args, **kwargs)
  File "E:\stocktalk-api-platform\venv\lib\site-packages\django\views\generic\base.py", line 71, in view
    return self.dispatch(request, *args, **kwargs)
  File "E:\stocktalk-api-platform\venv\lib\site-packages\rest_framework\views.py", line 505, in dispatch
    response = self.handle_exception(exc)
  File "E:\stocktalk-api-platform\venv\lib\site-packages\rest_framework\views.py", line 465, in handle_exception
    self.raise_uncaught_exception(exc)
  File "E:\stocktalk-api-platform\venv\lib\site-packages\rest_framework\views.py", line 476, in raise_uncaught_exception
    raise exc
  File "E:\stocktalk-api-platform\venv\lib\site-packages\rest_framework\views.py", line 502, in dispatch
    response = handler(request, *args, **kwargs)
  File "E:\stocktalk-api-platform\apps\stock_watchlist\views.py", line 42, in post
    notifications_task.delay(UpdateTypeEnum.STOCK_WATCHED, obj=user, target=instance)
  File "E:\stocktalk-api-platform\venv\lib\site-packages\celery\app\task.py", line 421, in delay
    return self.apply_async(args, kwargs)
  File "E:\stocktalk-api-platform\venv\lib\site-packages\celery\app\task.py", line 561, in apply_async
    return app.send_task(
  File "E:\stocktalk-api-platform\venv\lib\site-packages\celery\app\base.py", line 718, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "E:\stocktalk-api-platform\venv\lib\site-packages\celery\app\amqp.py", line 538, in send_task_message
    ret = producer.publish(
  File "E:\stocktalk-api-platform\venv\lib\site-packages\kombu\messaging.py", line 164, in publish
    body, content_type, content_encoding = self._prepare(
  File "E:\stocktalk-api-platform\venv\lib\site-packages\kombu\messaging.py", line 249, in _prepare
    body) = dumps(body, serializer=serializer)
  File "E:\stocktalk-api-platform\venv\lib\site-packages\kombu\serialization.py", line 220, in dumps
    payload = encoder(data)
  File "C:\Users\Fleetstudio\AppData\Local\Programs\Python\Python38\lib\contextlib.py", line 131, in __exit__
    self.gen.throw(type, value, traceback)
  File "E:\stocktalk-api-platform\venv\lib\site-packages\kombu\serialization.py", line 53, in _reraise_errors
    reraise(wrapper, wrapper(exc), sys.exc_info()[2])
  File "E:\stocktalk-api-platform\venv\lib\site-packages\kombu\exceptions.py", line 21, in reraise
    raise value.with_traceback(tb)
  File "E:\stocktalk-api-platform\venv\lib\site-packages\kombu\serialization.py", line 49, in _reraise_errors
    yield
  File "E:\stocktalk-api-platform\venv\lib\site-packages\kombu\serialization.py", line 220, in dumps
    payload = encoder(data)
  File "E:\stocktalk-api-platform\venv\lib\site-packages\kombu\utils\json.py", line 65, in dumps
    return _dumps(s, cls=cls or _default_encoder,
  File "C:\Users\Fleetstudio\AppData\Local\Programs\Python\Python38\lib\json\__init__.py", line 234, in dumps
    return cls(
  File "C:\Users\Fleetstudio\AppData\Local\Programs\Python\Python38\lib\json\encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "C:\Users\Fleetstudio\AppData\Local\Programs\Python\Python38\lib\json\encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "E:\stocktalk-api-platform\venv\lib\site-packages\kombu\utils\json.py", line 55, in default
    return super().default(o)
  File "C:\Users\Fleetstudio\AppData\Local\Programs\Python\Python38\lib\json\encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
kombu.exceptions.EncodeError: Object of type UpdateTypeEnum is not JSON serializable

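As @Alexandr pointed out, task arguments must be JSON serializable. Neither the enum member nor the model instances are. You can pass the enum's value and the user and target IDs instead, then rebuild the objects inside the task, like this: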

notifications_task.delay(UpdateTypeEnum.STOCK_WATCHED.value, obj_id=user.id, target_id=instance.id)

And in the task:

@shared_task(name="notifications_task")
def notifications_task(activity_type_val, obj_id=None, target_id=None):
    # Get activity type enum object from value
    activity_type = UpdateTypeEnum(activity_type_val)
    # Get object by id
    obj = YourUserModel.objects.get(id=obj_id)
    # Get target by id
    target = YourTargetModel.objects.get(id=target_id)
    # Now call your method
    dispatch_notification_activity(activity_type, obj, target)
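
To see why this works, here is a minimal sketch of the round trip using only the standard library's json module, standing in for Celery's JSON serializer. The UpdateTypeEnum definition and the helpers encode_task_args and decode_activity_type are hypothetical, made up for illustration; the real enum's values are not shown in the question.

```python
import json
from enum import Enum


class UpdateTypeEnum(Enum):
    # Hypothetical definition: the real members and values are not shown in the question.
    STOCK_WATCHED = "stock_watched"


def encode_task_args(activity_type, obj_id, target_id):
    """Build a JSON payload from primitives only, as the calling side would."""
    return json.dumps({
        "activity_type_val": activity_type.value,
        "obj_id": obj_id,
        "target_id": target_id,
    })


def decode_activity_type(payload):
    """Rebuild the enum member from its value, as the task would."""
    return UpdateTypeEnum(json.loads(payload)["activity_type_val"])


# Passing the enum member itself fails with the same TypeError
# that appears at the bottom of the traceback.
try:
    json.dumps({"activity_type": UpdateTypeEnum.STOCK_WATCHED})
    enum_is_serializable = True
except TypeError:
    enum_is_serializable = False

# Passing the value and plain IDs encodes cleanly and can be restored.
payload = encode_task_args(UpdateTypeEnum.STOCK_WATCHED, obj_id=1, target_id=2)
restored = decode_activity_type(payload)
```

Looking up an enum by value, UpdateTypeEnum(value), returns the existing member, so the task ends up with the same object the view started from. The model instances get the equivalent treatment through their IDs, which also means the task reads a fresh row from the database when it runs.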