Celery | Flask error: expected a bytes-like object, AsyncResult found
Celery | Flask error: expected a bytes-like object, AsyncResult found
我目前正在开发一个 flask 应用程序 (Python 3.6),并且想集成 celery,因为我有多个长 运行 后台任务。
编辑:Celery 4.1
集成没问题,芹菜任务正确执行,但我无法访问 运行 任务的当前状态。
Celery,Flask 设置:
def make_celery(app):
celery = Celery(app.import_name,
backend=app.config["result_backend"],
broker=app.config["broker_url"])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
app = Flask(__name__)
app.config["broker_url"] = "redis://localhost:6379"
app.config["result_backend"] = "redis://localhost:6379"
app.config["DB"] = "../pyhodl.sqlite"
celery_app = make_celery(app)
芹菜任务:
@celery_app.task(bind=True, name="server.tasks.update_trading_pair")
def update_trading_pair(self, exchange, currency_a, currency_b):
print(exchange, currency_a, currency_b)
time.sleep(50)
调用任务并将值存储在字典中:
task_id = update_trading_pair.delay(exchange, currency_a, currency_b)
print("NEW TASK")
print(task_id)
id = exchange_mnemonic + "_" + currency_a + "_" + currency_b
TASK_STATES[id] = task_id
获取任务状态:
result = update_trading_pair.AsyncResult(TASK_STATES[market.__id__()])
print(result.state)
print(result) # works but only prints the task_id
这是错误出现的地方。当我只打印结果对象时,它只打印 task_id。如果我尝试检索当前状态,我会引发以下异常:
TypeError: sequence item 1: expected a bytes-like object, AsyncResult found
解释:
当您调用任务时:
task_id = update_trading_pair.delay(exchange, currency_a, currency_b)
您的变量 task_id
是 AsyncResult
的实例,它不是字符串。
因此,您的变量 TASK_STATES[market.__id__()]
也是 AsyncResult
的一个实例,而它应该是一个字符串。
然后你试图用它实例化一个 AsyncResult
对象
result = update_trading_pair.AsyncResult(TASK_STATES[market.__id__()])
所以你正在用另一个 AsyncResult
对象实例化一个 AsyncResult
对象,而它应该用一个字符串实例化。
也许您的困惑来自您的 print(task_id)
,它向您显示了一个字符串,但是当您这样做时,在幕后会调用 AsyncResult
对象的 __str__
方法,如果你在源码里看 here,
def __str__(self):
"""`str(self) -> self.id`."""
return str(self.id)
它只是打印 task_id
对象的 id
属性。
解决方案:
您可以通过 TASK_STATES[id] = task_id.id
或
来修复它
result = update_trading_pair.AsyncResult(str(TASK_STATES[market.__id__()]))
我目前正在开发一个 flask 应用程序 (Python 3.6),并且想集成 celery,因为我有多个长 运行 后台任务。
编辑:Celery 4.1
集成没问题,芹菜任务正确执行,但我无法访问 运行 任务的当前状态。
Celery,Flask 设置:
def make_celery(app):
celery = Celery(app.import_name,
backend=app.config["result_backend"],
broker=app.config["broker_url"])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
app = Flask(__name__)
app.config["broker_url"] = "redis://localhost:6379"
app.config["result_backend"] = "redis://localhost:6379"
app.config["DB"] = "../pyhodl.sqlite"
celery_app = make_celery(app)
芹菜任务:
@celery_app.task(bind=True, name="server.tasks.update_trading_pair")
def update_trading_pair(self, exchange, currency_a, currency_b):
print(exchange, currency_a, currency_b)
time.sleep(50)
调用任务并将值存储在字典中:
task_id = update_trading_pair.delay(exchange, currency_a, currency_b)
print("NEW TASK")
print(task_id)
id = exchange_mnemonic + "_" + currency_a + "_" + currency_b
TASK_STATES[id] = task_id
获取任务状态:
result = update_trading_pair.AsyncResult(TASK_STATES[market.__id__()])
print(result.state)
print(result) # works but only prints the task_id
这是错误出现的地方。当我只打印结果对象时,它只打印 task_id。如果我尝试检索当前状态,我会引发以下异常:
TypeError: sequence item 1: expected a bytes-like object, AsyncResult found
解释:
当您调用任务时:
task_id = update_trading_pair.delay(exchange, currency_a, currency_b)
您的变量 task_id
是 AsyncResult
的实例,它不是字符串。
因此,您的变量 TASK_STATES[market.__id__()]
也是 AsyncResult
的一个实例,而它应该是一个字符串。
然后你试图用它实例化一个 AsyncResult
对象
result = update_trading_pair.AsyncResult(TASK_STATES[market.__id__()])
所以你正在用另一个 AsyncResult
对象实例化一个 AsyncResult
对象,而它应该用一个字符串实例化。
也许您的困惑来自您的 print(task_id)
,它向您显示了一个字符串,但是当您这样做时,在幕后会调用 AsyncResult
对象的 __str__
方法,如果你在源码里看 here,
def __str__(self):
"""`str(self) -> self.id`."""
return str(self.id)
它只是打印 task_id
对象的 id
属性。
解决方案:
您可以通过 TASK_STATES[id] = task_id.id
或
result = update_trading_pair.AsyncResult(str(TASK_STATES[market.__id__()]))