如何将 django class 方法从 views.py 继承到 tasks.py 以使用 Celery 安排作业?

How to inherit a django class method from views.py into tasks.py to schedule jobs with Celery?

我正在尝试使用 Celery 在 Django 中安排作业。问题是我无法让 tasks.py 中的 @shared_task 函数从 views.py 继承 class 方法。 我将 Celery 与 Flower 和 Beat 一起安装,当我在 tasks.py 中编写一个独立函数时,配置运行良好。 你对如何解决这个继承挑战有什么建议吗?感谢您的帮助。

我的views.py

from rest_framework import generics
from .serializers import TickerSerializer
from ticker.models import Ticker
import requests
import logging
from itertools import chain
from importlib import reload
import sys


logger = logging.getLogger(__name__)

class TickerList(generics.ListAPIView):
    serializer_class = TickerSerializer
    queryset = Ticker.objects.all()
    
class TickerRetrieve(generics.RetrieveAPIView):
    serializer_class = TickerSerializer
    queryset = Ticker.objects.all()
    lookup_field = 'id'   
    
    def get_object(request, **args):
        url = 'https://api.kraken.com/0/public/Ticker?pair=1INCHEUR,1INCHUSD'
        response = requests.get(url)
        data = response.json()
        Ticker.objects.all().delete()
        for key in data['result']:
            if isinstance(data['result'][key], int):
                continue
            crypto_data =data['result'][key]
            ticker_data = Ticker(crypto = (key),
                                a_0 = (crypto_data.get('a')[0]),
                                a_1 = (crypto_data.get('a')[1]),
                                )

            ticker_data.save()

我的tasks.py

from __future__ import absolute_import, unicode_literals
from celery.utils.log import get_task_logger
import sys
from celery import shared_task
from ticker.views import TickerRetrieve
from django.core.management import call_command

a = TickerRetrieve()
    
@shared_task(name="updatemy_ticker")
def updatemy_ticker_task(ticker_data):
    return a.get_object(ticker_data)

Flower 和本地服务器显示计划任务正常工作,但我无法让 tasks.py 触发 views.py 到 运行 中的 class 方法。 我在这里浏览了几个示例并阅读了文档,但 none 添加了一个类似的示例。

解决方法是在 tasks.py 中不带任何参数地返回函数名 a.get_object() 自动化任务现在运转正常。

tasks.py

from __future__ import absolute_import, unicode_literals
from celery.utils.log import get_task_logger
import sys
from celery import shared_task
from ticker.views import TickerRetrieve
from django.core.management import call_command


a = TickerRetrieve()
    
@shared_task(name="updatemy_ticker")
def updatemy_ticker_task():
    return a.get_object()