Exactly-once semantics with Spring Kafka

I am trying to test my exactly-once configuration to make sure all the settings I have applied are correct and that the behavior matches my expectations.

However, I seem to be running into a problem with duplicate sends.

    public static void main(String[] args) {
        MessageProducer producer = new ProducerBuilder()
                .setBootstrapServers("kafka:9992")
                .setKeySerializerClass(StringSerializer.class)
                .setValueSerializerClass(StringSerializer.class)
                .setProducerEnableIdempotence(true).build();
        MessageConsumer consumer = new ConsumerBuilder()
                .setBootstrapServers("kafka:9992")
                .setIsolationLevel("read_committed")
                .setTopics("someTopic2")
                .setGroupId("bla")
                .setKeyDeserializerClass(StringDeserializer.class)
                .setValueDeserializerClass(MapDeserializer.class)
                .setConsumerMessageLogic(new ConsumerMessageLogic() {
                    @Override
                    public void onMessage(ConsumerRecord cr, Acknowledgment acknowledgment) {
                        producer.sendMessage(new TopicPartition("someTopic2", cr.partition()),
                                new OffsetAndMetadata(cr.offset() + 1), "something1", "im in transaction", cr.key());
                        acknowledgment.acknowledge();
                    }
                }).build();
        consumer.start();
    }

This is my "test"; you can assume the builders apply the correct configuration.

ConsumerMessageLogic is a class that handles the "process" part of the read-process-write cycle that exactly-once semantics supports.

Inside the producer class I have a sendMessage method like this:

    public void sendMessage(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata, String sendToTopic, V message, PK partitionKey) {
        try {
            KafkaRecord<PK, V> partitionAndMessagePair = producerMessageLogic.prepareMessage(topicPartition.topic(), partitionKey, message);
            if (kafkaTemplate.getProducerFactory().transactionCapable()) {
                kafkaTemplate.executeInTransaction(operations -> {
                    sendMessage(message, partitionKey, sendToTopic, partitionAndMessagePair, operations);
                    operations.sendOffsetsToTransaction(
                            Map.of(topicPartition, offsetAndMetadata), "bla");
                    return true;
                });
            } else {
                sendMessage(message, partitionKey, topicPartition.topic(), partitionAndMessagePair, kafkaTemplate);
            }
        } catch (Exception e) {
            failureHandler.onFailure(partitionKey, message, e);
        }
    }

I create my consumer like this:

    /**
     * Start the message consumer.
     * The record event will be delegated to onMessage().
     */
    public void start() {
        initConsumerMessageListenerContainer();
        container.start();
    }

    /**
     * Initialize the Kafka message listener.
     */
    private void initConsumerMessageListenerContainer() {
        // start an acknowledging message listener to allow manual commits
        messageListener = consumerMessageLogic::onMessage;

        // start and initialize the consumer container
        container = initContainer(messageListener);

        // sets the number of consumers; the topic partitions will be divided among the consumers
        container.setConcurrency(springConcurrency);
        springContainerPollTimeoutOpt.ifPresent(p -> container.getContainerProperties().setPollTimeout(p));
        if (springAckMode != null) {
            container.getContainerProperties().setAckMode(springAckMode);
        }
    }

    private ConcurrentMessageListenerContainer<PK, V> initContainer(AcknowledgingMessageListener<PK, V> messageListener) {
        return new ConcurrentMessageListenerContainer<>(
                consumerFactory(props),
                containerProperties(messageListener));
    }

When I create the producer, I create it with a UUID as the transaction prefix, like this:

    public ProducerFactory<PK, V> producerFactory(boolean isTransactional) {
        ProducerFactory<PK, V> res = new DefaultKafkaProducerFactory<>(props);
        if (isTransactional) {
            ((DefaultKafkaProducerFactory<PK, V>) res).setTransactionIdPrefix(UUID.randomUUID().toString());
            ((DefaultKafkaProducerFactory<PK, V>) res).setProducerPerConsumerPartition(true);
        }
        return res;
    }
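
Note that with setProducerPerConsumerPartition(true), Spring Kafka builds the transactional.id as prefix + group.id + "." + topic + "." + partition - visible later in the logs as e.g. b76e8aba-...a20abla.someTopic2.0 - so a random UUID prefix gives each instance a different transactional.id for the same partition. A minimal sketch of the alternative (my assumption of the intended fix, not the original code): share one fixed prefix across all instances, so that an instance taking over a partition reuses the zombie's transactional.id:

    public ProducerFactory<PK, V> producerFactory(boolean isTransactional) {
        DefaultKafkaProducerFactory<PK, V> res = new DefaultKafkaProducerFactory<>(props);
        if (isTransactional) {
            // a fixed prefix shared by every instance (instead of a random UUID),
            // so the instance that takes over a partition reuses the zombie's
            // transactional.id and the broker can fence the old producer
            res.setTransactionIdPrefix("my-service-tx-"); // illustrative value
            res.setProducerPerConsumerPartition(true);
        }
        return res;
    }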

Now, with everything set up, I start two instances on a topic with two partitions, and each instance gets one partition of the consumed topic.

I send a message and, in the debugger, wait out the transaction timeout (simulating a lost connection) on instance A. Once the timeout passes, the other instance (instance B) automatically processes the record and sends it to the target topic, since a rebalance has occurred.

So far so good. Now when I release the breakpoint on instance A, it says it is rebalancing and cannot commit, yet I still see another output record in my target topic.

My expectation was that instance A would not continue its work once I released the breakpoint, since the record had already been processed.

Am I doing something wrong? Can this scenario even be achieved?

EDIT 2:

After Gary's comment about executeInTransaction: if I freeze one of the instances until the timeout and release it after the other instance has processed the record, I get duplicate records - the frozen instance then processes and produces the very same record to the output topic...

    public static void main(String[] args) {
        MessageProducer producer = new ProducerBuilder()
                .setBootstrapServers("kafka:9992")
                .setKeySerializerClass(StringSerializer.class)
                .setValueSerializerClass(StringSerializer.class)
                .setProducerEnableIdempotence(true).build();

        MessageConsumer consumer = new ConsumerBuilder()
                .setBootstrapServers("kafka:9992")
                .setIsolationLevel("read_committed")
                .setTopics("someTopic2")
                .setGroupId("bla")
                .setKeyDeserializerClass(StringDeserializer.class)
                .setValueDeserializerClass(MapDeserializer.class)
                .setConsumerMessageLogic(new ConsumerMessageLogic() {
                    @Override
                    public void onMessage(ConsumerRecord cr, Acknowledgment acknowledgment) {
                        producer.sendMessage("im in transaction", cr.key(), "something1");
                    }
                }).build();
        consumer.start(producer.getProducerFactory());
    }

The new sendMessage method in the producer, without executeInTransaction:

    public void sendMessage(V message, PK partitionKey, String topicName) {
        try {
            KafkaRecord<PK, V> partitionAndMessagePair = producerMessageLogic.prepareMessage(topicName, partitionKey, message);
            sendMessage(message, partitionKey, topicName, partitionAndMessagePair, kafkaTemplate);
        } catch (Exception e) {
            failureHandler.onFailure(partitionKey, message, e);
        }
    }

I also changed the creation of the consumer container so that it has a transaction manager with the same producer factory, as suggested:

    /**
     * Initialize the Kafka message listener.
     */
    private void initConsumerMessageListenerContainer(ProducerFactory<PK, V> producerFactory) {
        // start an acknowledging message listener to allow manual commits
        acknowledgingMessageListener = consumerMessageLogic::onMessage;

        // start and initialize the consumer container
        container = initContainer(acknowledgingMessageListener, producerFactory);

        // sets the number of consumers; the topic partitions will be divided among the consumers
        container.setConcurrency(springConcurrency);
        springContainerPollTimeoutOpt.ifPresent(p -> container.getContainerProperties().setPollTimeout(p));
        if (springAckMode != null) {
            container.getContainerProperties().setAckMode(springAckMode);
        }
    }

    private ConcurrentMessageListenerContainer<PK, V> initContainer(AcknowledgingMessageListener<PK, V> messageListener, ProducerFactory<PK, V> producerFactory) {
        return new ConcurrentMessageListenerContainer<>(
                consumerFactory(props),
                containerProperties(messageListener, producerFactory));
    }

    @NonNull
    private ContainerProperties containerProperties(MessageListener<PK, V> messageListener, ProducerFactory<PK, V> producerFactory) {
        ContainerProperties containerProperties = new ContainerProperties(topics);
        containerProperties.setMessageListener(messageListener);
        containerProperties.setTransactionManager(new KafkaTransactionManager<>(producerFactory));
        return containerProperties;
    }

My expectation was that once the broker received the processed record from the frozen instance, it would know that the record had already been handled by another instance, since it contains exactly the same metadata (or does it? I mean, the PID will be different, but should it be?).

Maybe the scenario I am looking for is not even supported by the exactly-once support that Kafka and Spring currently provide...

If I have two read-process-write instances, that means I have two producers with two different PIDs.

Now when I freeze one of the instances, and the unfrozen instance takes over responsibility for the record because of the rebalance, it will send the record with its own PID and sequence number in the metadata.

And when I release the frozen instance, it sends the same record but with its own PID, so there is no way for the broker to know it is a duplicate...

Am I wrong? How can I avoid this situation? I thought the rebalance would stop the instance and not let it complete its processing (where it produces the duplicate record), since it is no longer responsible for that record.
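
For context: zombie fencing in Kafka is keyed on the transactional.id alone - a new producer that calls initTransactions() with the same transactional.id bumps the producer epoch, and the old producer is fenced on its next transactional operation; the broker never compares record contents or offsets. A minimal plain kafka-clients sketch of that mechanism (illustrative only, reusing the broker address and topic from the question):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.ProducerFencedException;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class FencingSketch {

        private static KafkaProducer<String, String> producer(String transactionalId) {
            Properties p = new Properties();
            p.put("bootstrap.servers", "kafka:9992");
            p.put("key.serializer", StringSerializer.class.getName());
            p.put("value.serializer", StringSerializer.class.getName());
            p.put("transactional.id", transactionalId); // the only fencing key
            return new KafkaProducer<>(p);
        }

        public static void main(String[] args) {
            KafkaProducer<String, String> zombie = producer("tx-bla.someTopic2.0");
            zombie.initTransactions();
            zombie.beginTransaction();
            zombie.send(new ProducerRecord<>("something1", "key", "im in transaction"));

            // a second producer with the SAME transactional.id bumps the epoch,
            // which is what fences the first one
            KafkaProducer<String, String> takeover = producer("tx-bla.someTopic2.0");
            takeover.initTransactions();

            try {
                zombie.commitTransaction(); // fails: the broker sees a stale epoch
            } catch (ProducerFencedException expected) {
                System.out.println("zombie fenced: " + expected.getMessage());
            }
            takeover.close();
            zombie.close();
        }
    }

With two different transactional.ids (as with the random UUID prefix above), this fencing never kicks in, which is consistent with the duplicates observed.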

Adding logs. The frozen instance - you can see the freeze time at 10:53:34; I released it at 10:54:02 (the rebalance timeout is 10 seconds):

    2020-06-16 10:53:34,393 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.c.DefaultKafkaProducerFactory.debug:296] Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@5c7f5906]
    2020-06-16 10:53:34,394 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.c.DefaultKafkaProducerFactory.debug:296] CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@5c7f5906] beginTransaction()
    2020-06-16 10:53:34,395 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.t.KafkaTransactionManager.doBegin:149] Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@5c7f5906]]
    2020-06-16 10:54:02,157 INFO  [${sys:spring.application.name}] [kafka-coordinator-heartbeat-thread | bla] [o.a.k.c.c.i.AbstractCoordinator.:] [Consumer clientId=consumer-bla-1, groupId=bla] Group coordinator X.X.X.X:9992 (id: 2147482646 rack: null) is unavailable or invalid, will attempt rediscovery
    2020-06-16 10:54:02,181 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.debug:296] Sending offsets to transaction: {someTopic2-0=OffsetAndMetadata{offset=23, leaderEpoch=null, metadata=''}}
    2020-06-16 10:54:02,189 INFO  [${sys:spring.application.name}] [kafka-producer-network-thread | producer-b76e8aba-8149-48f8-857b-a19195f5a20abla.someTopic2.0] [i.i.k.s.p.SimpleSuccessHandler.:] Sent message=[im in transaction] with offset=[252] to topic something1
    2020-06-16 10:54:02,193 INFO  [${sys:spring.application.name}] [kafka-producer-network-thread | producer-b76e8aba-8149-48f8-857b-a19195f5a20abla.someTopic2.0] [o.a.k.c.p.i.TransactionManager.:] [Producer clientId=producer-b76e8aba-8149-48f8-857b-a19195f5a20abla.someTopic2.0, transactionalId=b76e8aba-8149-48f8-857b-a19195f5a20abla.someTopic2.0] Discovered group coordinator X.X.X.X:9992 (id: 1001 rack: null)
    2020-06-16 10:54:02,263 INFO  [${sys:spring.application.name}] [kafka-coordinator-heartbeat-thread | bla] [o.a.k.c.c.i.AbstractCoordinator.:] [Consumer clientId=consumer-bla-1, groupId=bla] Discovered group coordinator 192.168.144.1:9992 (id: 2147482646 rack: null)
    2020-06-16 10:54:02,295 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.t.KafkaTransactionManager.processCommit:740] Initiating transaction commit
    2020-06-16 10:54:02,296 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.c.DefaultKafkaProducerFactory.debug:296] CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@5c7f5906] commitTransaction()
    2020-06-16 10:54:02,299 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.debug:296] Commit list: {}
    2020-06-16 10:54:02,301 INFO  [${sys:spring.application.name}] [consumer-0-C-1] [o.a.k.c.c.i.AbstractCoordinator.:] [Consumer clientId=consumer-bla-1, groupId=bla] Attempt to heartbeat failed for since member id consumer-bla-1-b3ad1c09-ad06-4bc4-a891-47a2288a830f is not valid.
    2020-06-16 10:54:02,302 INFO  [${sys:spring.application.name}] [consumer-0-C-1] [o.a.k.c.c.i.ConsumerCoordinator.:] [Consumer clientId=consumer-bla-1, groupId=bla] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
    2020-06-16 10:54:02,302 INFO  [${sys:spring.application.name}] [consumer-0-C-1] [o.a.k.c.c.i.ConsumerCoordinator.:] [Consumer clientId=consumer-bla-1, groupId=bla] Lost previously assigned partitions someTopic2-0
    2020-06-16 10:54:02,302 INFO  [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.l.ConcurrentMessageListenerContainer.info:279] bla: partitions lost: [someTopic2-0]
    2020-06-16 10:54:02,303 INFO  [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.l.ConcurrentMessageListenerContainer.info:279] bla: partitions revoked: [someTopic2-0]
    2020-06-16 10:54:02,303 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.debug:296] Commit list: {}

The regular instance that took over the partition after the rebalance and produced the record:

    2020-06-16 10:53:46,536 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.c.DefaultKafkaProducerFactory.debug:296] Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@26c76153]
    2020-06-16 10:53:46,537 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.c.DefaultKafkaProducerFactory.debug:296] CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@26c76153] beginTransaction()
    2020-06-16 10:53:46,539 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.t.KafkaTransactionManager.doBegin:149] Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@26c76153]]
    2020-06-16 10:53:46,556 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.debug:296] Sending offsets to transaction: {someTopic2-0=OffsetAndMetadata{offset=23, leaderEpoch=null, metadata=''}}
    2020-06-16 10:53:46,563 INFO  [${sys:spring.application.name}] [kafka-producer-network-thread | producer-1d8e74d3-8986-4458-89b7-6d3e5756e213bla.someTopic2.0] [i.i.k.s.p.SimpleSuccessHandler.:] Sent message=[im in transaction] with offset=[250] to topic something1
    2020-06-16 10:53:46,566 INFO  [${sys:spring.application.name}] [kafka-producer-network-thread | producer-1d8e74d3-8986-4458-89b7-6d3e5756e213bla.someTopic2.0] [o.a.k.c.p.i.TransactionManager.:] [Producer clientId=producer-1d8e74d3-8986-4458-89b7-6d3e5756e213bla.someTopic2.0, transactionalId=1d8e74d3-8986-4458-89b7-6d3e5756e213bla.someTopic2.0] Discovered group coordinator X.X.X.X:9992 (id: 1001 rack: null)
    2020-06-16 10:53:46,668 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.t.KafkaTransactionManager.processCommit:740] Initiating transaction commit
    2020-06-16 10:53:46,669 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.c.DefaultKafkaProducerFactory.debug:296] CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@26c76153] commitTransaction()
    2020-06-16 10:53:46,672 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.debug:296] Commit list: {}
    2020-06-16 10:53:51,673 DEBUG [${sys:spring.application.name}] [consumer-0-C-1] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.debug:296] Received: 0 records

I noticed that they both log exactly the same offset to commit:

    Sending offsets to transaction: {someTopic2-0=OffsetAndMetadata{offset=23, leaderEpoch=null, metadata=''}}

I assumed that when they both tried to commit exactly the same thing, the broker would abort one of the transactions...

I also noticed that if I reduce transaction.timeout.ms to only 2 seconds, the broker does not abort the transaction no matter how long I freeze the instance in the debugger...

Maybe the timer for transaction.timeout.ms only starts counting after I send a message?
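
(As far as I can tell, yes: beginTransaction() is purely local to the client, and the transaction coordinator can only start the transaction.timeout.ms clock once it first learns of the transaction, i.e. when the producer sends its first record or offsets to it. For reference, the timeout is an ordinary producer property; a hedged sketch of where it would be set:)

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class TxTimeoutProps {

        // hypothetical helper; only the TRANSACTION_TIMEOUT_CONFIG line matters here
        public static Map<String, Object> producerProps() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9992");
            // transaction.timeout.ms: how long the coordinator waits, measured from
            // when it first learns about the transaction, before proactively aborting
            // it; the broker caps this via transaction.max.timeout.ms (15 min default)
            props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 2_000);
            return props;
        }
    }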

You cannot use executeInTransaction at all there - see its Javadocs; it is used when there is no active transaction, or if you explicitly don't want an operation to participate in an existing transaction.

You need to add a KafkaTransactionManager to the listener container; it must reference the same ProducerFactory as the template.

Then, the container will start the transaction and, if successful, send the offsets to the transaction.
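
A minimal sketch of that arrangement in plain Spring Kafka types (illustrative names, not the question's builder API): one transactional ProducerFactory backs both the template and the container's KafkaTransactionManager, and the listener only sends on the template, so the send joins the container's transaction:

    import java.util.Map;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    import org.springframework.kafka.listener.ContainerProperties;
    import org.springframework.kafka.listener.MessageListener;
    import org.springframework.kafka.transaction.KafkaTransactionManager;

    public class EosContainerSketch {

        public static ConcurrentMessageListenerContainer<String, String> build(
                ConsumerFactory<String, String> consumerFactory, // isolation.level=read_committed
                Map<String, Object> producerProps) {

            DefaultKafkaProducerFactory<String, String> pf =
                    new DefaultKafkaProducerFactory<>(producerProps);
            pf.setTransactionIdPrefix("tx-"); // same fixed prefix on every instance
            KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);

            ContainerProperties props = new ContainerProperties("someTopic2");
            // the container begins the transaction and sends the consumed offsets
            // to it before committing; the same ProducerFactory backs the template
            props.setTransactionManager(new KafkaTransactionManager<>(pf));
            // the listener only sends on the template - no executeInTransaction -
            // so the send participates in the container's transaction
            props.setMessageListener((MessageListener<String, String>) record ->
                    template.send("something1", record.key(), "im in transaction"));

            return new ConcurrentMessageListenerContainer<>(consumerFactory, props);
        }
    }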