Django Celery 使用 JS 微服务创建任务
Django Celery create task with JS microservice
你好,我有一个 Django 应用程序 运行 celery,我正在尝试使用 JS 中的微服务将任务添加到队列中。当我通过 Django 将任务添加到 AWS SQS 时一切正常,但是当我使用 JS 时出现此错误:
[2021-01-05 19:39:55,982: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: body: '{"expires":null,"utc":true,"args":[5456,2878],"chord":null,"callbacks":null,"errbacks":null,"taskset":null,"id":"1a361c85-2209-4ffa-95c2-ee2e4855155e","retries":0,"task":"config.celery.debug_task","timelimit":[null,null],"eta":null,"kwargs":{}}' (244b)
{content_type:None content_encoding:None
delivery_info:{'sqs_message': {'MessageId': '7b7d2948-c069-4f8b-9fdc-9c068d52f463', 'ReceiptHandle': 'AQEBxhqW2sRWf+Z851fw7nqRX6MQFVcTfjH5xqiIgYIiMa3AN3R235VxhM8pM7mcByw3eOZ3Y7kH5oZ+noFVzfjSllgnoh8idB/V7WWY2urNHKJrQadRT5cf4NcUVkFmB8+d2rLiAXuuyqpGbEMvmx1Dn49/5C3Fx8Eq+eUyB1oeilIrCqfMvIkG/yX5TdedxM9B2VBThZ/XtHqrgYCkJvEt9ozssM0f+INRHUrpVQMYCmUX9aTWeWljrTOapMTg27M6aie6HaDQxLK0FJvZUNr2d0uJhZ4C2qRGWrSo2VpD7QK7pslltZ12PVHKPw9X+cBGdWwJrdh5I0fBITuoy+CUUnybDekz668jJnsf1gcmpx8cBoVrMLocPi753g2klGf++mbFeL7yjENzb1YqZrrfvg==', 'MD5OfBody': '9bb39da667d1e840f8532a74a8dcecaa', 'Body': 'eyJleHBpcmVzIjpudWxsLCJ1dGMiOnRydWUsImFyZ3MiOls1NDU2LDI4NzhdLCJjaG9yZCI6bnVsbCwiY2FsbGJhY2tzIjpudWxsLCJlcnJiYWNrcyI6bnVsbCwidGFza3NldCI6bnVsbCwiaWQiOiIxYTM2MWM4NS0yMjA5LTRmZmEtOTVjMi1lZTJlNDg1NTE1NWUiLCJyZXRyaWVzIjowLCJ0YXNrIjoiY29uZmlnLmNlbGVyeS5kZWJ1Z190YXNrIiwidGltZWxpbWl0IjpbbnVsbCxudWxsXSwiZXRhIjpudWxsLCJrd2FyZ3MiOnt9fQ=='}, 'sqs_queue': 'SQS_QUEUE_URL_HERE'} headers={}}
我正在使用此代码发送消息:
let taskId = uuidv4();
let result = {
"expires": null,
"utc": true,
"args": [5456, 2878],
"chord": null,
"callbacks": null,
"errbacks": null,
"taskset": null,
"id": taskId,
"retries": 0,
"task": "config.celery.debug_task",
"timelimit": [null, null],
"eta": null,
"kwargs": {}
}
const client = new SQSClient({
region: "eu-west-3",
credentialDefaultProvider: myCredentialProvider
});
const send = new SendMessageCommand({
// use wrangler secrets to provide this global variable
QueueUrl: "SQS_QUEUE_URL_HERE",
MessageBody: Buffer.from(JSON.stringify(result)).toString("base64")
});
let resultSQS = client.send(send);
我调试了 django 任务负载以复制它,所以我发送了它需要的相同数据,但收到此错误。有人知道我是否遗漏了什么吗?
谢谢
问题解决了,MessageBody 错误。这就是解决方案
let taskId = uuidv4();
let body = [[{"hello": "world from JS", "success": true}], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]
let properties = {
"correlation_id": taskId,
"reply_to": "5480744c-f3ca-3e3f-9403-445fa5b865e1",
"delivery_mode": 2,
"delivery_info": {
"exchange": "",
"routing_key": "default"
},
"priority": 0,
"body_encoding": "base64",
"delivery_tag": uuidv4()
}
let headers = {
"lang": "py",
"task": "config.celery.debug_task",
"id": taskId,
"shadow": null,
"eta": null,
"expires": null,
"group": null,
"retries": 0,
"timelimit": [null, null],
"root_id": taskId,
"parent_id": null,
"argsrepr": "({'hello': 'wolrd', 'success': True},)",
"kwargsrepr": "{}",
"origin": "gen21089@Marcoss-MacBook-Pro.local"
}
let payload = {
"body": Buffer.from(JSON.stringify(body)).toString("base64"),
"content-encoding": "utf-8",
"content-type": "application/json",
"headers": headers,
"properties": properties
}
let encodedPayload = Buffer.from(JSON.stringify(payload)).toString("base64")
const client = new SQSClient({
region: "eu-west-3",
credentialDefaultProvider: myCredentialProvider
});
const sendData = new SendMessageCommand({
// use wrangler secrets to provide this global variable
QueueUrl: "URL",
MessageBody: encodedPayload
});
let data = await client.send(sendData);
因此 base64 中的有效负载包含一个也在 base64 中的“body”和 Celery 读取以定位任务的 headers。
你好,我有一个 Django 应用程序 运行 celery,我正在尝试使用 JS 中的微服务将任务添加到队列中。当我通过 Django 将任务添加到 AWS SQS 时一切正常,但是当我使用 JS 时出现此错误:
[2021-01-05 19:39:55,982: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: body: '{"expires":null,"utc":true,"args":[5456,2878],"chord":null,"callbacks":null,"errbacks":null,"taskset":null,"id":"1a361c85-2209-4ffa-95c2-ee2e4855155e","retries":0,"task":"config.celery.debug_task","timelimit":[null,null],"eta":null,"kwargs":{}}' (244b)
{content_type:None content_encoding:None
delivery_info:{'sqs_message': {'MessageId': '7b7d2948-c069-4f8b-9fdc-9c068d52f463', 'ReceiptHandle': 'AQEBxhqW2sRWf+Z851fw7nqRX6MQFVcTfjH5xqiIgYIiMa3AN3R235VxhM8pM7mcByw3eOZ3Y7kH5oZ+noFVzfjSllgnoh8idB/V7WWY2urNHKJrQadRT5cf4NcUVkFmB8+d2rLiAXuuyqpGbEMvmx1Dn49/5C3Fx8Eq+eUyB1oeilIrCqfMvIkG/yX5TdedxM9B2VBThZ/XtHqrgYCkJvEt9ozssM0f+INRHUrpVQMYCmUX9aTWeWljrTOapMTg27M6aie6HaDQxLK0FJvZUNr2d0uJhZ4C2qRGWrSo2VpD7QK7pslltZ12PVHKPw9X+cBGdWwJrdh5I0fBITuoy+CUUnybDekz668jJnsf1gcmpx8cBoVrMLocPi753g2klGf++mbFeL7yjENzb1YqZrrfvg==', 'MD5OfBody': '9bb39da667d1e840f8532a74a8dcecaa', 'Body': 'eyJleHBpcmVzIjpudWxsLCJ1dGMiOnRydWUsImFyZ3MiOls1NDU2LDI4NzhdLCJjaG9yZCI6bnVsbCwiY2FsbGJhY2tzIjpudWxsLCJlcnJiYWNrcyI6bnVsbCwidGFza3NldCI6bnVsbCwiaWQiOiIxYTM2MWM4NS0yMjA5LTRmZmEtOTVjMi1lZTJlNDg1NTE1NWUiLCJyZXRyaWVzIjowLCJ0YXNrIjoiY29uZmlnLmNlbGVyeS5kZWJ1Z190YXNrIiwidGltZWxpbWl0IjpbbnVsbCxudWxsXSwiZXRhIjpudWxsLCJrd2FyZ3MiOnt9fQ=='}, 'sqs_queue': 'SQS_QUEUE_URL_HERE'} headers={}}
我正在使用此代码发送消息:
let taskId = uuidv4();
let result = {
"expires": null,
"utc": true,
"args": [5456, 2878],
"chord": null,
"callbacks": null,
"errbacks": null,
"taskset": null,
"id": taskId,
"retries": 0,
"task": "config.celery.debug_task",
"timelimit": [null, null],
"eta": null,
"kwargs": {}
}
const client = new SQSClient({
region: "eu-west-3",
credentialDefaultProvider: myCredentialProvider
});
const send = new SendMessageCommand({
// use wrangler secrets to provide this global variable
QueueUrl: "SQS_QUEUE_URL_HERE",
MessageBody: Buffer.from(JSON.stringify(result)).toString("base64")
});
let resultSQS = client.send(send);
我调试了 django 任务负载以复制它,所以我发送了它需要的相同数据,但收到此错误。有人知道我是否遗漏了什么吗?
谢谢
问题解决了,MessageBody 错误。这就是解决方案
let taskId = uuidv4();
let body = [[{"hello": "world from JS", "success": true}], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]
let properties = {
"correlation_id": taskId,
"reply_to": "5480744c-f3ca-3e3f-9403-445fa5b865e1",
"delivery_mode": 2,
"delivery_info": {
"exchange": "",
"routing_key": "default"
},
"priority": 0,
"body_encoding": "base64",
"delivery_tag": uuidv4()
}
let headers = {
"lang": "py",
"task": "config.celery.debug_task",
"id": taskId,
"shadow": null,
"eta": null,
"expires": null,
"group": null,
"retries": 0,
"timelimit": [null, null],
"root_id": taskId,
"parent_id": null,
"argsrepr": "({'hello': 'wolrd', 'success': True},)",
"kwargsrepr": "{}",
"origin": "gen21089@Marcoss-MacBook-Pro.local"
}
let payload = {
"body": Buffer.from(JSON.stringify(body)).toString("base64"),
"content-encoding": "utf-8",
"content-type": "application/json",
"headers": headers,
"properties": properties
}
let encodedPayload = Buffer.from(JSON.stringify(payload)).toString("base64")
const client = new SQSClient({
region: "eu-west-3",
credentialDefaultProvider: myCredentialProvider
});
const sendData = new SendMessageCommand({
// use wrangler secrets to provide this global variable
QueueUrl: "URL",
MessageBody: encodedPayload
});
let data = await client.send(sendData);
因此 base64 中的有效负载包含一个也在 base64 中的“body”和 Celery 读取以定位任务的 headers。