通过 Celery 任务调用 DRF ViewSet
Call DRF ViewSet via Celery task
我有一个 Django Rest 框架视图集:
MyModelViewSet(generics.RetrieveUpdateDestroyAPIView):
def perform_destroy(self, instance):
# do something besides deleting the object
现在我正在编写一个 Celery 周期性任务,它根据过滤器删除过期对象(假设 end_date < now
)。
我希望任务重用并执行在 ViewSet 的 perform_destroy
方法中执行的相同操作。
这能做到吗?怎么样?
谢谢!
您可以通过在 DRF 中使用 Request 来解决您的问题,安排 celery 任务来调用请求。效果很好,我以前实现过。
示例代码:
from rest_framework.request import Request as DRFRequest
from django.conf import settings
from django.http import HttpRequest
from your_module.views import MyModelViewSet
CELERY_CACHING_QUEUE = getattr(settings, "CELERY_CACHING_QUEUE", None)
def delete_resource(resource_pk: int) -> None:
"""
This method helps to delete the resource by the id.
"""
print(f'Starting deleting resource {resource_pk}...')
request = HttpRequest()
request.method = 'DELETE'
request.META = {
'SERVER_NAME': settings.ALLOWED_HOSTS[0],
'SERVER_PORT': 443
}
drf_request = DRFRequest(request)
# If your API need user has access permission,
# you should handle for getting the value of
# user_has_access_permission before
# E.g. below
# drf_request.user = user_has_access_permission
try:
view = MyModelViewSet(
kwargs={
'pk': resource_pk
},
request=drf_request
)
view.initial(drf_request)
view.delete(drf_request)
except (Exception, KeyError) as e:
print(f'Cannot delete resource: {resource_pk}, error: {e}')
return
print(f'Finished deleting resource {resource_pk}...')
@task(name="delete_resource_task", queue=CELERY_CACHING_QUEUE)
def delete_resource_task(resource_pk: int) -> None:
"""
Async task helps to delete resource.
"""
delete_resource(resource_pk)
我有一个 Django Rest 框架视图集:
MyModelViewSet(generics.RetrieveUpdateDestroyAPIView):
def perform_destroy(self, instance):
# do something besides deleting the object
现在我正在编写一个 Celery 周期性任务,它根据过滤器删除过期对象(假设 end_date < now
)。
我希望任务重用并执行在 ViewSet 的 perform_destroy
方法中执行的相同操作。
这能做到吗?怎么样?
谢谢!
您可以通过在 DRF 中使用 Request 来解决您的问题,安排 celery 任务来调用请求。效果很好,我以前实现过。
示例代码:
from rest_framework.request import Request as DRFRequest
from django.conf import settings
from django.http import HttpRequest
from your_module.views import MyModelViewSet
CELERY_CACHING_QUEUE = getattr(settings, "CELERY_CACHING_QUEUE", None)
def delete_resource(resource_pk: int) -> None:
"""
This method helps to delete the resource by the id.
"""
print(f'Starting deleting resource {resource_pk}...')
request = HttpRequest()
request.method = 'DELETE'
request.META = {
'SERVER_NAME': settings.ALLOWED_HOSTS[0],
'SERVER_PORT': 443
}
drf_request = DRFRequest(request)
# If your API need user has access permission,
# you should handle for getting the value of
# user_has_access_permission before
# E.g. below
# drf_request.user = user_has_access_permission
try:
view = MyModelViewSet(
kwargs={
'pk': resource_pk
},
request=drf_request
)
view.initial(drf_request)
view.delete(drf_request)
except (Exception, KeyError) as e:
print(f'Cannot delete resource: {resource_pk}, error: {e}')
return
print(f'Finished deleting resource {resource_pk}...')
@task(name="delete_resource_task", queue=CELERY_CACHING_QUEUE)
def delete_resource_task(resource_pk: int) -> None:
"""
Async task helps to delete resource.
"""
delete_resource(resource_pk)