Kafka 0.11 中 sendOffsetsToTransaction 的含义

Meaning of sendOffsetsToTransaction in Kafka 0.11

新的Kafka版本(0.11)支持exactly once语义。

https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging

我在 java 中有一个使用 kafka 事务代码的生产者设置,就像这样。

producer.initTransactions();
    try {
        producer.beginTransaction();
        for (ProducerRecord<String, String> record : payload) {
            producer.send(record);
        }

        Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
            {
                put(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(42L, null));
            }
        };
        producer.sendOffsetsToTransaction(groupCommit, "groupId");
        producer.commitTransaction();
    } catch (ProducerFencedException e) {
        producer.close();
    } catch (KafkaException e) {
        producer.abortTransaction();
    }

我不太确定如何使用 sendOffsetsToTransaction 及其预期用例。 AFAIK,消费者群体是消费者端的多线程读取功能。

java医生说

" 向消费者组协调器发送一个已消费的偏移量列表,并将这些偏移量标记为当前事务的一部分。只有在事务提交成功时,这些偏移量才会被视为已消耗。该方法应该在以下情况下使用您需要将消费和生产的消息一起批量处理,通常采用消费-转换-生产模式。"

produce 将如何维护已消耗偏移量的列表?这有什么意义呢?

这仅与您在其中消费然后根据您消费的内容生成消息的工作流程相关。此函数允许您仅在下游生产成功时提交您消耗的偏移量。如果您使用数据,以某种方式对其进行处理,然后生成结果,则可以在 consumption/production.

范围内实现事务保证

没有事务,您通常使用Consumer#commitSync()Consumer#commitAsync() 来提交消费者抵消。但是如果你在你和你的生产者生产之前使用这些方法,你将在知道生产者是否成功发送之前已经提交了偏移量。

因此,您可以在生产者上使用 Producer#sendOffsetsToTransaction() 来提交偏移量,而不是向消费者提交偏移量。这会将偏移量发送到处理事务的事务管理器。仅当整个交易(消费和生产)成功时,它才会提交偏移量。

(注意:发送偏移量提交时,应将上次读取的偏移量加1,以便以后从未读取的偏移量继续读取。无论您是向消费者还是向生产者提交,都是如此。参见:)。