How to commit a producer transaction properly and consume transactional messages in Kafka 0.11?

I am trying out the Kafka transactional producer in Java.

Like this:

    producer.initTransactions();
    try {
        producer.beginTransaction();
        producer.send(rec, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null)
                    e.printStackTrace();
                System.out.println("The offset of the record we just sent is: " + metadata.offset());
            }
        });
        producer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // Fatal errors: this producer instance can no longer be used.
        producer.close();
    } catch (KafkaException e) {
        // Any other Kafka error: abort the transaction so it can be retried.
        producer.abortTransaction();
    } catch (Exception x) {
        x.printStackTrace();
    }
    producer.close();

It does not throw any error, and send does push the message into Kafka, where it is available.

The broker log I can see is like this:

[2017-10-30 19:30:56,574] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: __transaction_state-11. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2017-10-30 19:31:19,379] INFO [Transaction Coordinator 1001]: Initialized transactionalId TXN_ID:0.28508215642368573189913137 with producerId 11 and producer epoch 0 on partition __transaction_state-11 (kafka.coordinator.transaction.TransactionCoordinator)

After 5 minutes I found this broker log:

[2017-10-30 19:36:44,123] INFO [Group Metadata Manager on Broker 1001]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

Here I can only see that the transaction is initialized, but there is no further log for the commit or anything else.

In the producer config I have added:

transactional.id=<some random transaction ID>
enable.idempotence=true

As the documentation notes: Note that enable.idempotence must be enabled if a TransactionalId is configured. The default is empty, which means transactions cannot be used.
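For reference, a minimal sketch of how these two properties might be wired into the producer config (the bootstrap address is taken from the logs above; the serializers and the transactional id value are placeholders, not my real ones):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9090");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    // A transactional.id requires enable.idempotence=true, as quoted above.
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some-transactional-id"); // placeholder
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

    Producer<String, String> producer = new KafkaProducer<>(props);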

I found a statement in the Kafka Documentation:

Producer: Send an OffsetCommitRequest to commit the input state associated with the end of that transaction

Does this mean that I have to tell it which offset I want to commit?
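If that statement refers to the consume-transform-produce case, the producer API does expose sendOffsetsToTransaction for committing the consumer's input offsets inside the transaction. A rough sketch (the group id, topic, partition and the lastConsumedOffset variable are made up for illustration):

    // Only needed when the input comes from a consumer and its offsets should be
    // committed atomically with the produced records.
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    offsets.put(new TopicPartition("topic", 0),
            new OffsetAndMetadata(lastConsumedOffset + 1)); // commit the next offset to read
    producer.sendOffsetsToTransaction(offsets, "new-group-id");
    producer.commitTransaction();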

I am not sure what is happening with the producer.

Here is my producer debug log:

1180 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - [TransactionalId TXN_ID:0.6069296543148491816257436] Sending transactional request (type=InitProducerIdRequest, transactionalId=TXN_ID:0.6069296543148491816257436, transactionTimeoutMs=60000) to node 127.0.0.1:9090 (id: 1001 rack: null)
1317 [kafka-producer-network-thread | producer-1] INFO  org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] ProducerId set to 13 with epoch 0
1317 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Transition from state INITIALIZING to READY
1317 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Transition from state READY to IN_TRANSACTION
1323 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient  - Sending metadata request (type=MetadataRequest, topics=topic) to node -1
1337 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata  - Updated cluster metadata version 2 to Cluster(id = 0WtNXiFvT_C6V9Uo1zPGVQ, nodes = [127.0.0.1:9090 (id: 1001 rack: null)], partitions = [Partition(topic = topic, partition = 0, leader = 1001, replicas = [1001], isr = [1001])])
1362 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Begin adding new partition topic-0 to transaction
1386 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Enqueuing transactional request (type=AddPartitionsToTxnRequest, transactionalId=TXN_ID:0.6069296543148491816257436, producerId=13, producerEpoch=0, partitions=[topic-0])
1387 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - [TransactionalId TXN_ID:0.6069296543148491816257436] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=TXN_ID:0.6069296543148491816257436, producerId=13, producerEpoch=0, partitions=[topic-0]) to node 127.0.0.1:9090 (id: 1001 rack: null)
1389 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION
1392 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Enqueuing transactional request (type=EndTxnRequest, transactionalId=TXN_ID:0.6069296543148491816257436, producerId=13, producerEpoch=0, result=COMMIT)
1437 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Successfully added partitions [topic-0] to transaction
1439 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.RecordAccumulator  - Assigning sequence number 0 from producer (producerId=13, epoch=0) to dequeued batch from partition topic-0 bound for 127.0.0.1:9090 (id: 1001 rack: null).
1444 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic.records-per-batch
1444 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic.bytes
1445 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic.compression-rate
1445 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic.record-retries
1445 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics  - Added sensor with name topic.topic.record-errors
1453 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - Incremented sequence number for topic-partition topic-0 to 1
The offset of the record we just sent is: 13
1455 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender  - [TransactionalId TXN_ID:0.6069296543148491816257436] Sending transactional request (type=EndTxnRequest, transactionalId=TXN_ID:0.6069296543148491816257436, producerId=13, producerEpoch=0, result=COMMIT) to node 127.0.0.1:9090 (id: 1001 rack: null)
1457 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager  - [TransactionalId TXN_ID:0.6069296543148491816257436] Transition from state COMMITTING_TRANSACTION to READY

I think I am missing something before committing the transaction. On the consumer side, if I set READ_COMMITTED I am not able to consume either; if I don't, it works fine and I even receive the messages I produced with the transactional producer.

The consumer code I have for reading transactional messages:

    configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9090");
    configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "new-group-id");
    configProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    configProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

The consumer is subscribing to the topic topic.
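For completeness, the subscribe-and-poll loop looks roughly like this, building on the configProperties above (the deserializer settings and the topic name are assumptions):

    // Deserializers assumed; they are not shown in the config snippet above.
    configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configProperties);
    consumer.subscribe(Collections.singletonList("topic"));
    while (true) {
        // With isolation.level=read_committed, poll() only returns records up to the
        // last stable offset, i.e. records whose transaction has been committed.
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.offset() + ": " + record.value());
        }
    }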

My consumer debug console log is:

126032 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Added READ_COMMITTED fetch request for partition topic-0 at offset 1 to node 127.0.0.1:9090 (id: 1001 rack: null)
126032 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Sending READ_COMMITTED fetch for partitions [topic-0] to broker 127.0.0.1:9090 (id: 1001 rack: null)
126551 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher  - Fetch READ_COMMITTED at offset 1 for partition topic-0 returned fetch data (error=NONE, highWaterMark=17, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)

Here the same 3 lines keep repeating continuously, and I get 13 as the highest offset value. I am not able to consume the message in the consumer.

I have a single-node cluster setup; I also tried it on a 3-node setup and it shows the same result.

Any help is appreciated.

Finally it started working for me.

I cannot say exactly what the problem was, but based on my observations it was caused by the Windows OS.

If the broker is on a Windows machine, it does not work as expected. If the broker is on a Linux machine, it works fine.

My observations:

Windows machine

Log dump of the __transaction_state segment:
Dumping 00000000000000000000.index
offset: 0 position: 0
Dumping 00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 .... transactionalId=TXN_ID:1 ... ,state=Empty,partitions=Set(),txnLastUpdateTimestamp=1510143189059,txnTimeoutMs=60000
offset: 1 position: 117 .... transactionalId=TXN_ID:1 ... ,state=Ongoing,partitions=Set(topic-0),txnLastUpdateTimestamp=1510143189232,txnTimeoutMs=60000
offset: 2 position: 250 .... transactionalId=TXN_ID:1 ... ,state=PrepareCommit,partitions=Set(topic-0),txnLastUpdateTimestamp=1510143189393,txnTimeoutMs=60000
Dumping 00000000000000000000.timeindex
timestamp: 0 offset: 0
Found timestamp mismatch in :D:\tmp\kafka-logs-0\__transaction_state-9\00000000000000000000.timeindex
Index timestamp: 0, log timestamp: 1510143189059
Index timestamp: 0, log timestamp: 1510143189059
Found out of order timestamp in :D:\tmp\kafka-logs-0\__transaction_state-9\00000000000000000000.timeindex
Index timestamp: 0, Previously indexed timestamp: 0

In the above log you can easily see that the transaction states are Empty, Ongoing and PrepareCommit, but there is no mention of the commit/transaction being completed.

But in my producer console log it said the transaction was completed, so there is clearly some problem.

Linux machine

Log dump of the __transaction_state segment:
offset: 0 position: 0 .... transactionalId=TXN_ID:2 ... ,state=Empty,partitions=Set(),txnLastUpdateTimestamp=1510145904630,txnTimeoutMs=60000
offset: 1 position: 117 .... transactionalId=TXN_ID:2 ... ,state=Ongoing,partitions=Set(topic-0),txnLastUpdateTimestamp=1510145904763,txnTimeoutMs=60000
offset: 2 position: 250 .... transactionalId=TXN_ID:2 ... ,state=PrepareCommit,partitions=Set(topic-0),txnLastUpdateTimestamp=1510145904931,txnTimeoutMs=60000
offset: 3 position: 383 .... transactionalId=TXN_ID:2 ... ,state=CompleteCommit,partitions=Set(),txnLastUpdateTimestamp=1510145904938,txnTimeoutMs=60000

Here, however, we can easily see that 4 different states are mentioned in total: Empty, Ongoing, PrepareCommit and CompleteCommit. This is what actually makes the transaction complete.

Hence we can also reuse that transaction id.
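For illustration, a rough sketch of that reuse, assuming a working (Linux) broker: the same producer instance, and therefore the same transactional.id, can run several transactions in a row after a single initTransactions() call (the topic, keys and loop bound are arbitrary):

    producer.initTransactions();              // called once per producer instance
    for (int i = 0; i < 3; i++) {             // arbitrary number of transactions
        producer.beginTransaction();
        producer.send(new ProducerRecord<String, String>("topic", "key-" + i, "value-" + i));
        producer.commitTransaction();         // the same transactional.id is reused each time
    }
    producer.close();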

So I can conclude that, for now, if you want to use transactions, run the Kafka broker on Linux rather than on Windows.