使用 JVM 消费者获取 Kafka 'writeahead' 消息

Obtain Kafka 'writeahead' messages with JVM consumer

我们有一个使用恰好一次语义的流应用程序,其中一个主题分区已停滞。我们注意到偏移量以两个为增量增加,并且了解到奇数消息是 Kafka 事务两阶段提交的一部分。

我们已经写了一个 Consumer<Byte[], Byte[]>(使用 kafka-clients 2.1.0)来使用 isolation.level = "read_uncommitted" 将所有这些消息转储到磁盘,但它不会获取这些奇数编号的消息。我们可以做些什么来获得它们?

Control records 没有暴露给消费者。

"see" 他们需要使用 DumpLogSegments 工具:

./kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/mytopic-0/00000000000000000000.log

对照批次将与正常批次一样显示,但它们的 isControl 标志设置为 true。

baseOffset: 1618 lastOffset: 1618 count: 1 baseSequence: 1 lastSequence: 1 producerId: 1000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 1778601 CreateTime: 1547217145114 isvalid: true size: 1097 magic: 2 compresscodec: NONE crc: 1680083731

baseOffset: 1619 lastOffset: 1619 count: 1 baseSequence: -1 lastSequence: -1 producerId: 1000 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 1779698 CreateTime: 1547217145210 isvalid: true size: 78 magic: 2 compresscodec: NONE crc: 2028573478

您还可以使用 --deep-iteration 标志来显示单个记录元数据(甚至 --print-data-log 来显示实际记录数据)。在那种情况下,您可以查看控制批处理是提交还是还原:

offset: 1618 position: 1778601 CreateTime: 1547217145114 isvalid: true keysize: 3 valuesize: 1024 magic: 2 compresscodec: NONE producerId: 1000 producerEpoch: 0 sequence: 1 isTransactional: true headerKeys: []

offset: 1619 position: 1779698 CreateTime: 1547217145210 isvalid: true keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 1000 producerEpoch: 0 sequence: -1 isTransactional: true headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0