是什么导致了 Node Kafka Streams 的间歇性问题?
What is causing this intermittent issue with Node Kafka Streams?
我有一个 Kafka 生产者和消费者。
制作人这样做:
const returnMessage = {
prop1: 'some string',
prop2: 'another string',
prop3: nestedObject
};
console.log(JSON.stringify(returnMessage))
await stream.writeToStream(JSON.stringify(returnMessage));
消费者这样做:
incomingStream.forEach(
message => {
console.log(message.value)
let messageObject = message.value;
...other stuff...
}
);
现在,在生产者方面,return 消息始终记录为正确的字符串,一切正常。但在消费者方面,起初,message.value 是一个正确的字符串,可以从中解析出 JSON,但在后续请求中,它会显示为“[object Object]”。如果
我觉得我在这里遗漏了一些重要的东西...如果您有任何见解,请提供帮助。
好的,我知道了。 incomingStream.forEach
发生在路由内部,而流是在控制器级别实例化的。我通过将 forEach
移动到控制器级别来修复它,并让它在每条消息上发出一条已解析的消息,然后订阅路由内的事件。
我有一个 Kafka 生产者和消费者。
制作人这样做:
const returnMessage = {
prop1: 'some string',
prop2: 'another string',
prop3: nestedObject
};
console.log(JSON.stringify(returnMessage))
await stream.writeToStream(JSON.stringify(returnMessage));
消费者这样做:
incomingStream.forEach(
message => {
console.log(message.value)
let messageObject = message.value;
...other stuff...
}
);
现在,在生产者方面,return 消息始终记录为正确的字符串,一切正常。但在消费者方面,起初,message.value 是一个正确的字符串,可以从中解析出 JSON,但在后续请求中,它会显示为“[object Object]”。如果
我觉得我在这里遗漏了一些重要的东西...如果您有任何见解,请提供帮助。
好的,我知道了。 incomingStream.forEach
发生在路由内部,而流是在控制器级别实例化的。我通过将 forEach
移动到控制器级别来修复它,并让它在每条消息上发出一条已解析的消息,然后订阅路由内的事件。