Kafkajs - get statistics (lag)
In our nest.js application we use the kafkajs Kafka client. We need to be able to monitor consumer statistics, and one of the metrics is lag.
I'm trying to figure out whether kafkajs provides anything useful for this out of the box. (The most interesting fields in the message payload are: timestamp, offset, batchContext.firstOffset, batchContext.firstTimestamp and batchContext.maxTimestamp.)
Question
Any ideas how to log the lag value and the other statistics provided by kafkajs?
Should I consider implementing my own statistics monitor to collect the required information in a Node application that uses the kafkajs client?
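One direction I'm considering is the kafkajs instrumentation events. If I read the docs correctly, the consumer emits an END_BATCH_PROCESS event whose payload includes an offsetLag field, but I haven't verified the exact payload shape, so treat the snippet below as an unchecked sketch:

// Unverified sketch: log the per-partition lag reported by the consumer
// after each processed batch, via kafkajs instrumentation events.
consumer.on(consumer.events.END_BATCH_PROCESS, (event) => {
  const { topic, partition, highWatermark, offsetLag } = event.payload
  console.log('Batch processed: ', { topic, partition, highWatermark, offsetLag })
})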
New details 1
Following the documentation I can get batch.highWatermark, where
batch.highWatermark is the last committed offset within the topic partition. It can be useful for calculating lag.
Trying:
await consumer.run({
eachBatchAutoResolve: true,
eachBatch: async (data) => {
console.log('Received data.batch.messages: ', data.batch.messages)
console.log('Received data.batch.highWatermark: ', data.batch.highWatermark)
},
})
I can get information like the following:
Received data.batch.messages: [
{
magicByte: 2,
attributes: 0,
timestamp: '1628877419958',
offset: '144',
key: null,
value: <Buffer 68 65 6c 6c 6f 21>,
headers: {},
isControlRecord: false,
batchContext: {
firstOffset: '144',
firstTimestamp: '1628877419958',
partitionLeaderEpoch: 0,
inTransaction: false,
isControlBatch: false,
lastOffsetDelta: 2,
producerId: '-1',
producerEpoch: 0,
firstSequence: 0,
maxTimestamp: '1628877419958',
timestampType: 0,
magicByte: 2
}
},
{
magicByte: 2,
attributes: 0,
timestamp: '1628877419958',
offset: '145',
key: null,
value: <Buffer 6f 74 68 65 72 20 6d 65 73 73 61 67 65>,
headers: {},
isControlRecord: false,
batchContext: {
firstOffset: '144',
firstTimestamp: '1628877419958',
partitionLeaderEpoch: 0,
inTransaction: false,
isControlBatch: false,
lastOffsetDelta: 2,
producerId: '-1',
producerEpoch: 0,
firstSequence: 0,
maxTimestamp: '1628877419958',
timestampType: 0,
magicByte: 2
}
},
{
magicByte: 2,
attributes: 0,
timestamp: '1628877419958',
offset: '146',
key: null,
value: <Buffer 6d 6f 72 65 20 6d 65 73 73 61 67 65 73>,
headers: {},
isControlRecord: false,
batchContext: {
firstOffset: '144',
firstTimestamp: '1628877419958',
partitionLeaderEpoch: 0,
inTransaction: false,
isControlBatch: false,
lastOffsetDelta: 2,
producerId: '-1',
producerEpoch: 0,
firstSequence: 0,
maxTimestamp: '1628877419958',
timestampType: 0,
magicByte: 2
}
}
]
Received data.batch.highWatermark: 147
So, any ideas how to use batch.highWatermark in the lag calculation?
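For illustration, this is roughly what I have in mind; it assumes the lag for a partition can be approximated as highWatermark minus the offset of the last message in the batch minus one (which matches the sample output above: highWatermark 147 vs. last message offset 146), and reportLag is a hypothetical helper:

await consumer.run({
  eachBatchAutoResolve: true,
  eachBatch: async ({ batch }) => {
    if (batch.messages.length === 0) return
    const lastOffset = Number(batch.messages[batch.messages.length - 1].offset)
    // Assuming highWatermark is one past the last available offset,
    // the number of messages still to be consumed after this batch is:
    const lag = Number(batch.highWatermark) - lastOffset - 1
    reportLag(batch.topic, batch.partition, lag) // hypothetical reporting helper
  },
})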
In general, the configuration described above works fine. It breaks when the configuration additionally uses the eachMessage property, like:
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
key: message.key.toString(),
value: message.value.toString(),
headers: message.headers,
})
},
})
So it seems the consumer configuration should set only one of the properties at a time: either eachBatch or eachMessage.
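If that is the case, one workaround I'm considering is to keep only eachBatch and do the per-message work inside it, along the lines of the documented resolveOffset/heartbeat pattern (a sketch, not a tested implementation):

await consumer.run({
  eachBatchAutoResolve: false,
  eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
    for (const message of batch.messages) {
      // per-message processing, as eachMessage would have done
      console.log({
        key: message.key && message.key.toString(),
        value: message.value.toString(),
        headers: message.headers,
      })
      resolveOffset(message.offset)
      await heartbeat()
    }
    // batch-level statistics such as highWatermark remain available here
    console.log('Received batch.highWatermark: ', batch.highWatermark)
  },
})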