如何将 django class 方法从 views.py 继承到 tasks.py 以使用 Celery 安排作业?
How to inherit a django class method from views.py into tasks.py to schedule jobs with Celery?
我正在尝试使用 Celery 在 Django 中安排作业。问题是我无法让 tasks.py 中的 @shared_task 函数从 views.py 继承 class 方法。
我将 Celery 与 Flower 和 Beat 一起安装,当我在 tasks.py 中编写一个独立函数时,配置运行良好。
你对如何解决这个继承挑战有什么建议吗?感谢您的帮助。
我的views.py
from rest_framework import generics
from .serializers import TickerSerializer
from ticker.models import Ticker
import requests
import logging
from itertools import chain
from importlib import reload
import sys
logger = logging.getLogger(__name__)
class TickerList(generics.ListAPIView):
serializer_class = TickerSerializer
queryset = Ticker.objects.all()
class TickerRetrieve(generics.RetrieveAPIView):
serializer_class = TickerSerializer
queryset = Ticker.objects.all()
lookup_field = 'id'
def get_object(request, **args):
url = 'https://api.kraken.com/0/public/Ticker?pair=1INCHEUR,1INCHUSD'
response = requests.get(url)
data = response.json()
Ticker.objects.all().delete()
for key in data['result']:
if isinstance(data['result'][key], int):
continue
crypto_data =data['result'][key]
ticker_data = Ticker(crypto = (key),
a_0 = (crypto_data.get('a')[0]),
a_1 = (crypto_data.get('a')[1]),
)
ticker_data.save()
我的tasks.py
from __future__ import absolute_import, unicode_literals
from celery.utils.log import get_task_logger
import sys
from celery import shared_task
from ticker.views import TickerRetrieve
from django.core.management import call_command
a = TickerRetrieve()
@shared_task(name="updatemy_ticker")
def updatemy_ticker_task(ticker_data):
return a.get_object(ticker_data)
Flower 和本地服务器显示计划任务正常工作,但我无法让 tasks.py 触发 views.py 到 运行 中的 class 方法。
我在这里浏览了几个示例并阅读了文档,但 none 添加了一个类似的示例。
解决方法是在 tasks.py 中不带任何参数地返回函数名 a.get_object()
自动化任务现在运转正常。
tasks.py
from __future__ import absolute_import, unicode_literals
from celery.utils.log import get_task_logger
import sys
from celery import shared_task
from ticker.views import TickerRetrieve
from django.core.management import call_command
a = TickerRetrieve()
@shared_task(name="updatemy_ticker")
def updatemy_ticker_task():
return a.get_object()
我正在尝试使用 Celery 在 Django 中安排作业。问题是我无法让 tasks.py 中的 @shared_task 函数从 views.py 继承 class 方法。 我将 Celery 与 Flower 和 Beat 一起安装,当我在 tasks.py 中编写一个独立函数时,配置运行良好。 你对如何解决这个继承挑战有什么建议吗?感谢您的帮助。
我的views.py
from rest_framework import generics
from .serializers import TickerSerializer
from ticker.models import Ticker
import requests
import logging
from itertools import chain
from importlib import reload
import sys
logger = logging.getLogger(__name__)
class TickerList(generics.ListAPIView):
serializer_class = TickerSerializer
queryset = Ticker.objects.all()
class TickerRetrieve(generics.RetrieveAPIView):
serializer_class = TickerSerializer
queryset = Ticker.objects.all()
lookup_field = 'id'
def get_object(request, **args):
url = 'https://api.kraken.com/0/public/Ticker?pair=1INCHEUR,1INCHUSD'
response = requests.get(url)
data = response.json()
Ticker.objects.all().delete()
for key in data['result']:
if isinstance(data['result'][key], int):
continue
crypto_data =data['result'][key]
ticker_data = Ticker(crypto = (key),
a_0 = (crypto_data.get('a')[0]),
a_1 = (crypto_data.get('a')[1]),
)
ticker_data.save()
我的tasks.py
from __future__ import absolute_import, unicode_literals
from celery.utils.log import get_task_logger
import sys
from celery import shared_task
from ticker.views import TickerRetrieve
from django.core.management import call_command
a = TickerRetrieve()
@shared_task(name="updatemy_ticker")
def updatemy_ticker_task(ticker_data):
return a.get_object(ticker_data)
Flower 和本地服务器显示计划任务正常工作,但我无法让 tasks.py 触发 views.py 到 运行 中的 class 方法。 我在这里浏览了几个示例并阅读了文档,但 none 添加了一个类似的示例。
解决方法是在 tasks.py 中不带任何参数地返回函数名 a.get_object() 自动化任务现在运转正常。
tasks.py
from __future__ import absolute_import, unicode_literals
from celery.utils.log import get_task_logger
import sys
from celery import shared_task
from ticker.views import TickerRetrieve
from django.core.management import call_command
a = TickerRetrieve()
@shared_task(name="updatemy_ticker")
def updatemy_ticker_task():
return a.get_object()