为什么向 Celery 添加参数会导致 Python 中的测试出错?

Why adding arguments to Celery causing error on tests in Python?

我正在尝试测试 Celery 应用程序,这是我的代码

@celery.task(bind=True, default_retry_delay=30)
def convert_video(gif_url, webhook):
    // doing something awesome
       return
    except Exception as exc:
       raise convert_video.retry(exc=exc)

在我的测试中我有这个。

server.convert_video.apply(args=('some_gif', 'http://www.company.com?attachment_id=123')).get()

添加后 bind=True, default_retry_delay=30 我得到这个错误

TypeError: convert_video() takes exactly 2 arguments (3 given)

老实说,我从来没有用过芹菜,但快速浏览一下他们的 docsbind 论点表明:

The bind argument means that the function will be a “bound method” so that you can access attributes and methods on the task type instance.

如果这是 class 上的方法,您通常只会使用它,而不是独立函数。作为 class 上的方法,它的第一个参数是 self.

您正在使用 bind 参数,这意味着传递给函数的第一个参数将是任务实例 - 它有效地使函数成为 Task [=15= 的方法].

@celery.task(bind=True, default_retry_delay=30)
def convert_video(self, gif_url, webhook):
    try:
        log.info('here we have access to task metadata like id: %s', self.request.id)
        return
    except Exception as exc:
        raise convert_video.retry(exc=exc)