Django Celery task error when invoked in a POST request
I have the following task, which sends notifications asynchronously:
from celery import shared_task

@shared_task(name="notifications_task")
def notifications_task(activity_type, obj=None, target=None):
    dispatch_notification_activity(activity_type, obj, target)
In the view's post handler, I call notifications_task as follows:
notifications_task.delay(UpdateTypeEnum.STOCK_WATCHED, obj=user, target=instance)
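The function dispatch_notification_activity pushes the notifications. Because it takes a while, I run it asynchronously.
Calling notifications_task raises the following error: kombu.exceptions.EncodeError: Object of type UpdateTypeEnum is not JSON serializable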
Traceback (most recent call last):
File "E:\stocktalk-api-platform\venv\lib\site-packages\django\core\handlers\exception.py", line 34, in inner
response = get_response(request)
File "E:\stocktalk-api-platform\venv\lib\site-packages\django\core\handlers\base.py", line 115, in _get_response
response = self.process_exception_by_middleware(e, request)
File "E:\stocktalk-api-platform\venv\lib\site-packages\django\core\handlers\base.py", line 113, in _get_response
response = wrapped_callback(request, *callback_args, **callback_kwargs)
File "E:\stocktalk-api-platform\venv\lib\site-packages\django\views\decorators\csrf.py", line 54, in wrapped_view
return view_func(*args, **kwargs)
File "E:\stocktalk-api-platform\venv\lib\site-packages\django\views\generic\base.py", line 71, in view
return self.dispatch(request, *args, **kwargs)
File "E:\stocktalk-api-platform\venv\lib\site-packages\rest_framework\views.py", line 505, in dispatch
response = self.handle_exception(exc)
File "E:\stocktalk-api-platform\venv\lib\site-packages\rest_framework\views.py", line 465, in handle_exception
self.raise_uncaught_exception(exc)
File "E:\stocktalk-api-platform\venv\lib\site-packages\rest_framework\views.py", line 476, in raise_uncaught_exception
raise exc
File "E:\stocktalk-api-platform\venv\lib\site-packages\rest_framework\views.py", line 502, in dispatch
response = handler(request, *args, **kwargs)
File "E:\stocktalk-api-platform\apps\stock_watchlist\views.py", line 42, in post
notifications_task.delay(UpdateTypeEnum.STOCK_WATCHED, obj=user, target=instance)
File "E:\stocktalk-api-platform\venv\lib\site-packages\celery\app\task.py", line 421, in delay
return self.apply_async(args, kwargs)
File "E:\stocktalk-api-platform\venv\lib\site-packages\celery\app\task.py", line 561, in apply_async
return app.send_task(
File "E:\stocktalk-api-platform\venv\lib\site-packages\celery\app\base.py", line 718, in send_task
amqp.send_task_message(P, name, message, **options)
File "E:\stocktalk-api-platform\venv\lib\site-packages\celery\app\amqp.py", line 538, in send_task_message
ret = producer.publish(
File "E:\stocktalk-api-platform\venv\lib\site-packages\kombu\messaging.py", line 164, in publish
body, content_type, content_encoding = self._prepare(
File "E:\stocktalk-api-platform\venv\lib\site-packages\kombu\messaging.py", line 249, in _prepare
body) = dumps(body, serializer=serializer)
File "E:\stocktalk-api-platform\venv\lib\site-packages\kombu\serialization.py", line 220, in dumps
payload = encoder(data)
File "C:\Users\Fleetstudio\AppData\Local\Programs\Python\Python38\lib\contextlib.py", line 131, in __exit__
self.gen.throw(type, value, traceback)
File "E:\stocktalk-api-platform\venv\lib\site-packages\kombu\serialization.py", line 53, in _reraise_errors
reraise(wrapper, wrapper(exc), sys.exc_info()[2])
File "E:\stocktalk-api-platform\venv\lib\site-packages\kombu\exceptions.py", line 21, in reraise
raise value.with_traceback(tb)
File "E:\stocktalk-api-platform\venv\lib\site-packages\kombu\serialization.py", line 49, in _reraise_errors
yield
File "E:\stocktalk-api-platform\venv\lib\site-packages\kombu\serialization.py", line 220, in dumps
payload = encoder(data)
File "E:\stocktalk-api-platform\venv\lib\site-packages\kombu\utils\json.py", line 65, in dumps
return _dumps(s, cls=cls or _default_encoder,
File "C:\Users\Fleetstudio\AppData\Local\Programs\Python\Python38\lib\json\__init__.py", line 234, in dumps
return cls(
File "C:\Users\Fleetstudio\AppData\Local\Programs\Python\Python38\lib\json\encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "C:\Users\Fleetstudio\AppData\Local\Programs\Python\Python38\lib\json\encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "E:\stocktalk-api-platform\venv\lib\site-packages\kombu\utils\json.py", line 55, in default
return super().default(o)
File "C:\Users\Fleetstudio\AppData\Local\Programs\Python\Python38\lib\json\encoder.py", line 179, in default
raise TypeError(f'Object of type {o.__class__.__name__} '
kombu.exceptions.EncodeError: Object of type UpdateTypeEnum is not JSON serializable
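As @Alexandr pointed out, task arguments must be JSON serializable (JSON is Celery's default serializer, and it cannot encode enum members or model instances). You can pass the user and target IDs together with the enum value, then rebuild the objects inside the task, like this: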
notifications_task.delay(UpdateTypeEnum.STOCK_WATCHED.value, obj_id=user.id, target_id=instance.id)
In the task:
@shared_task(name="notifications_task")
def notifications_task(activity_type_val, obj_id=None, target_id=None):
    # Get activity type enum object from value
    activity_type = UpdateTypeEnum(activity_type_val)
    # Get object by id
    obj = YourUserModel.objects.get(id=obj_id)
    # Get target by id
    target = YourTargetModel.objects.get(id=target_id)
    # Now call your method
    dispatch_notification_activity(activity_type, obj, target)
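To see why passing the value works while passing the enum member does not, here is a minimal sketch that uses only the standard library and runs without Celery or Django. The member value "stock_watched" and the helpers encode_task_args and decode_task_args are made up for illustration, since the real UpdateTypeEnum definition is not shown in the question. The sketch only mimics what a JSON serializer has to do with the task arguments.

```python
import json
from enum import Enum


class UpdateTypeEnum(Enum):
    # Hypothetical value: the real enum's values are not shown in the question.
    STOCK_WATCHED = "stock_watched"


def encode_task_args(activity_type, obj_id, target_id):
    """Build a JSON payload from primitives only (enum value and integer IDs)."""
    return json.dumps(
        {
            "activity_type_val": activity_type.value,
            "obj_id": obj_id,
            "target_id": target_id,
        }
    )


def decode_task_args(payload):
    """Rebuild the enum member from its value, as the task body does."""
    data = json.loads(payload)
    return UpdateTypeEnum(data["activity_type_val"]), data["obj_id"], data["target_id"]


# Passing the enum member itself fails with the same TypeError seen in the traceback.
try:
    json.dumps({"activity_type": UpdateTypeEnum.STOCK_WATCHED})
    enum_is_serializable = True
except TypeError:
    enum_is_serializable = False

# Passing the value and plain IDs round-trips cleanly.
payload = encode_task_args(UpdateTypeEnum.STOCK_WATCHED, 1, 2)
activity_type, obj_id, target_id = decode_task_args(payload)
```

Looking the models up by ID inside the task also means the worker reads the current database row rather than a copy captured when the request was handled.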