OpenWhisk 发送消息到 Kafka 超时
OpenWhisk send message to Kafka timeout
环境详情
CentOS7、独立OpenWhisk
问题描述
我打算在openwhisk中向Kafka发送消息,数据流转过程为:WSK CLI -> OpenWhisk action -> kafka-console-consume.
但是过程中会出现断断续续的失败,比如:我发送“test01”~“test06”,只收到“test02”、“test04”、“test06”。
根据日志,失败的原因是超时。
这是我的动作脚本:
// js
const kafka = require('kafka-node');
const Producer = kafka.Producer;
const client = new kafka.KafkaClient({
kafkaHost: "192.168.68.132:3093"
});
const producer = new Producer(client);
function main(actionObj) {
var message = JSON.stringify(actionObj);
const payloads = [
{
topic: 'beforeReductionTopic',
messages: message,
partition: 0
}
];
return new Promise(function (resolve, reject) {
producer
.on('ready', () => {
producer.send(payloads, function (err, data) {
if (err) {
console.log("++++++ producer err: ", err);
return reject(err);
} else {
console.log("++++++ producer data: ", data);
return resolve(data);
}
});
})
.on("error", err => {
console.error(err);
return reject(err);
});
});
}
exports.main = main;
根据日志,是“action developer error”.
我用下面的代码测试了action脚本,kafka可以接收到所有的消息。
const action = require("./openwhisk-kafka.js");
for(let i = 0; i < 3; i++) {
action.main("ccc");
}
不知道怎么修改动作脚本
日志详细信息
使用WSK CLI调用openwhisk动作的操作记录
[root@localhost openwhisk-kafka]# wsk action update scicatTest --kind nodejs:10 /test/actions/openwhisk-kafka/openwhisk-kafka.zip
ok: updated action scicatTest
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test01"
ok: invoked /_/scicatTest, but the request has not yet finished, with id 78e633791c114d29a633791c11fd29c2
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test02"
{
"beforeReductionTopic": {
"0": 57
}
}
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test03"
ok: invoked /_/scicatTest, but the request has not yet finished, with id 09165a45a0f94011965a45a0f920110b
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test04"
{
"beforeReductionTopic": {
"0": 58
}
}
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test05"
ok: invoked /_/scicatTest, but the request has not yet finished, with id 8a9f38f9d5ab4fa49f38f9d5ab6fa45a
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test06"
{
"beforeReductionTopic": {
"0": 59
}
}
[root@localhost openwhisk-kafka]#
失败操作日志
[root@localhost openwhisk-kafka]# wsk activation get 8a9f38f9d5ab4fa49f38f9d5ab6fa45a
ok: got activation 8a9f38f9d5ab4fa49f38f9d5ab6fa45a
{
"namespace": "guest",
"name": "scicatTest",
"version": "0.0.4",
"subject": "guest",
"activationId": "8a9f38f9d5ab4fa49f38f9d5ab6fa45a",
"start": 1619540684355,
"end": 1619540744418,
"duration": 60063,
"statusCode": 0,
"response": {
"status": "action developer error",
"statusCode": 0,
"success": false,
"result": {
"error": "The action exceeded its time limits of 60000 milliseconds."
}
},
"logs": [],
"annotations": [
{
"key": "path",
"value": "guest/scicatTest"
},
{
"key": "waitTime",
"value": 652
},
{
"key": "kind",
"value": "nodejs:10"
},
{
"key": "timeout",
"value": true
},
{
"key": "limits",
"value": {
"concurrency": 1,
"logs": 10,
"memory": 256,
"timeout": 60000
}
}
],
"publish": false
}
不要使用“kafka-node”。替换为“kafkajs”
环境详情
CentOS7、独立OpenWhisk
问题描述
我打算在openwhisk中向Kafka发送消息,数据流转过程为:WSK CLI -> OpenWhisk action -> kafka-console-consume.
但是过程中会出现断断续续的失败,比如:我发送“test01”~“test06”,只收到“test02”、“test04”、“test06”。
根据日志,失败的原因是超时。
这是我的动作脚本:
// js
const kafka = require('kafka-node');
const Producer = kafka.Producer;
const client = new kafka.KafkaClient({
kafkaHost: "192.168.68.132:3093"
});
const producer = new Producer(client);
function main(actionObj) {
var message = JSON.stringify(actionObj);
const payloads = [
{
topic: 'beforeReductionTopic',
messages: message,
partition: 0
}
];
return new Promise(function (resolve, reject) {
producer
.on('ready', () => {
producer.send(payloads, function (err, data) {
if (err) {
console.log("++++++ producer err: ", err);
return reject(err);
} else {
console.log("++++++ producer data: ", data);
return resolve(data);
}
});
})
.on("error", err => {
console.error(err);
return reject(err);
});
});
}
exports.main = main;
根据日志,是“action developer error”.
我用下面的代码测试了action脚本,kafka可以接收到所有的消息。
const action = require("./openwhisk-kafka.js");
for(let i = 0; i < 3; i++) {
action.main("ccc");
}
不知道怎么修改动作脚本
日志详细信息
使用WSK CLI调用openwhisk动作的操作记录
[root@localhost openwhisk-kafka]# wsk action update scicatTest --kind nodejs:10 /test/actions/openwhisk-kafka/openwhisk-kafka.zip
ok: updated action scicatTest
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test01"
ok: invoked /_/scicatTest, but the request has not yet finished, with id 78e633791c114d29a633791c11fd29c2
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test02"
{
"beforeReductionTopic": {
"0": 57
}
}
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test03"
ok: invoked /_/scicatTest, but the request has not yet finished, with id 09165a45a0f94011965a45a0f920110b
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test04"
{
"beforeReductionTopic": {
"0": 58
}
}
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test05"
ok: invoked /_/scicatTest, but the request has not yet finished, with id 8a9f38f9d5ab4fa49f38f9d5ab6fa45a
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test06"
{
"beforeReductionTopic": {
"0": 59
}
}
[root@localhost openwhisk-kafka]#
失败操作日志
[root@localhost openwhisk-kafka]# wsk activation get 8a9f38f9d5ab4fa49f38f9d5ab6fa45a
ok: got activation 8a9f38f9d5ab4fa49f38f9d5ab6fa45a
{
"namespace": "guest",
"name": "scicatTest",
"version": "0.0.4",
"subject": "guest",
"activationId": "8a9f38f9d5ab4fa49f38f9d5ab6fa45a",
"start": 1619540684355,
"end": 1619540744418,
"duration": 60063,
"statusCode": 0,
"response": {
"status": "action developer error",
"statusCode": 0,
"success": false,
"result": {
"error": "The action exceeded its time limits of 60000 milliseconds."
}
},
"logs": [],
"annotations": [
{
"key": "path",
"value": "guest/scicatTest"
},
{
"key": "waitTime",
"value": 652
},
{
"key": "kind",
"value": "nodejs:10"
},
{
"key": "timeout",
"value": true
},
{
"key": "limits",
"value": {
"concurrency": 1,
"logs": 10,
"memory": 256,
"timeout": 60000
}
}
],
"publish": false
}
不要使用“kafka-node”。替换为“kafkajs”