Kafkajs - 获取统计数据(滞后)

Kafkajs - get statistics (lag)

在我们的 nest.js 应用程序中,我们使用 kafkajs kafka 客户端。 我们需要获得机会监视器统计信息。 其中一项指标是 lag.

试图弄清楚 kafkajs 是否提供了任何有趣的东西。 (payload中最有意思的是:timestampoffsetbatchContext.firstOffsetbatchContext.firstTimestampbatchContext.maxTimestamp

问题

有什么想法可以记录 lag 值和 kafkajs 提供的其他统计信息吗?

我是否应该考虑实现自己的统计监视器以在使用 kafka.js 客户端的节点应用程序中收集所需信息?

新详情 1

跟随documentation我可以得到batch.highWatermark,其中

batch.highWatermark is the last committed offset within the topic partition. It can be useful for calculating lag.

正在尝试

  await consumer.run({
    eachBatchAutoResolve: true,
    eachBatch: async (data) => {
      console.log('Received data.batch.messages: ', data.batch.messages)
      console.log('Received data.batch.highWatermark: ', data.batch.highWatermark)
    },
  })

我可以像下一个一样获取信息:

Received data.batch.messages:  [
  {
    magicByte: 2,
    attributes: 0,
    timestamp: '1628877419958',
    offset: '144',
    key: null,
    value: <Buffer 68 65 6c 6c 6f 21>,
    headers: {},
    isControlRecord: false,
    batchContext: {
      firstOffset: '144',
      firstTimestamp: '1628877419958',
      partitionLeaderEpoch: 0,
      inTransaction: false,
      isControlBatch: false,
      lastOffsetDelta: 2,
      producerId: '-1',
      producerEpoch: 0,
      firstSequence: 0,
      maxTimestamp: '1628877419958',
      timestampType: 0,
      magicByte: 2
    }
  },
  {
    magicByte: 2,
    attributes: 0,
    timestamp: '1628877419958',
    offset: '145',
    key: null,
    value: <Buffer 6f 74 68 65 72 20 6d 65 73 73 61 67 65>,
    headers: {},
    isControlRecord: false,
    batchContext: {
      firstOffset: '144',
      firstTimestamp: '1628877419958',
      partitionLeaderEpoch: 0,
      inTransaction: false,
      isControlBatch: false,
      lastOffsetDelta: 2,
      producerId: '-1',
      producerEpoch: 0,
      firstSequence: 0,
      maxTimestamp: '1628877419958',
      timestampType: 0,
      magicByte: 2
    }
  },
  {
    magicByte: 2,
    attributes: 0,
    timestamp: '1628877419958',
    offset: '146',
    key: null,
    value: <Buffer 6d 6f 72 65 20 6d 65 73 73 61 67 65 73>,
    headers: {},
    isControlRecord: false,
    batchContext: {
      firstOffset: '144',
      firstTimestamp: '1628877419958',
      partitionLeaderEpoch: 0,
      inTransaction: false,
      isControlBatch: false,
      lastOffsetDelta: 2,
      producerId: '-1',
      producerEpoch: 0,
      firstSequence: 0,
      maxTimestamp: '1628877419958',
      timestampType: 0,
      magicByte: 2
    }
  }
]
Received data.batch.highWatermark:  147

那么有什么想法如何在标签计算中使用 batch.highWatermark 吗?

大体上描述的配置工作正常。工作被额外的配置破坏,使用 eachMessage 属性 如:

await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
        console.log({
            key: message.key.toString(),
            value: message.value.toString(),
            headers: message.headers,
        })
    },
})

所以同时consumer配置应该只配置了一个属性eachBatch或者eachMessage.