为什么 KafkaTemplate 不关闭交易生产者?

Why KafkaTemplate does not close tansactional producers?

我用 spring 集成了 kafka 3.2.1.RELEASEkafka-clients 2.5[=55 编写了一个简单的 Kafka 应用程序=]学习kafka交易。

它从一个主题接收消息并将它们发送到另一个主题。 beans.xml文件如下

    <int-kafka:message-driven-channel-adapter
            listener-container="container"
            auto-startup="true"
            send-timeout="30000"
            channel="channelA"/>
   <bean id="container"  class="org.springframework.kafka.listener.KafkaMessageListenerContainer" parent="kafkaMessageListenerContainerAbstract">
        <constructor-arg>
            <bean class="org.springframework.kafka.listener.ContainerProperties">
                <constructor-arg
                        name="topics"
                        value="test"/>
                <property name="transactionManager" ref="KafkaTransactionManager"/>
            </bean>
        </constructor-arg>
    </bean>
.
.
.
    <int-kafka:outbound-channel-adapter kafka-template="kafkaTemplate"
                                        auto-startup="true"
                                        channel="channelB"
                                        topic="output"/>
      <bean id="dbsenderTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg>
            <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
                <constructor-arg>
                    <map>
                        <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
                        <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
                        <entry key="bootstrap.servers" value="localhost:9092"/>
                    </map>
                </constructor-arg>
                <property name="transactionIdPrefix" value="mytest-"/>
                <property name="producerPerConsumerPartition" value="false"/>
            </bean>
        </constructor-arg>
    </bean>

启动应用程序的代码如下:

GenericXmlApplicationContext tempContext = new GenericXmlApplicationContext("beans.xml");
tempContext.close();

//POINT A.

try {
  Thread.sleep(60000);
} catch (InterruptedException e) {
   e.printStackTrace();
}

GenericXmlApplicationContext context = new GenericXmlApplicationContext();
context.load("beans.xml");
context.refresh();

//POINT B

POINT A 我刚刚关闭上下文以检查哪些 bean 已关闭,并休眠 60 秒以便有时间检查 JMX 控制台。我注意到即使上下文已关闭,但生产者仍在 JMX 中注册。之后我跟踪代码并注意到在上下文关闭 KafkaTemplate 时调用以下代码:

    public void flush() {
        Producer<K, V> producer = getTheProducer();
        try {
            producer.flush();
        }
        finally {
            closeProducer(producer, inTransaction());
        }
    }

    protected void closeProducer(Producer<K, V> producer, boolean inTx) {
        if (!inTx) {
            producer.close(this.closeTimeout);
        }
    }

这意味着它创建了一个生产者,但因为它是事务性的,所以不会被关闭。

此行为导致在 POINT B 上再次运行上下文并发送消息导致 javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-mytest-0 异常。 为什么 KafkaTemplate 不关闭这些生产者?

另一个问题是,当在 POINT B 上创建新的 KafkaTemplate 时,这些生产者会发生什么?

最后一个问题是如果我将 producerPerConsumerPartition 属性 更改为 true 提到的应用程序仍然使用 producer-mytest-0[=55 注册生产者 Mbean =] 并且在命名时不遵循 groupid.topic.partition 模式。这是正确的行为吗?

更新:

我明白什么时候调用了 KafkaTemplate executeInTransaction。在 finally 块中,它调用生产者的关闭,因为它是逻辑关闭,所以在 CloseSafeProducer 上调用以下代码并将其放入缓存中:

if (!this.cache.contains(this)
        && !this.cache.offer(this)) {
        this.delegate.close(closeTimeout);
        }

这使得当上下文关闭时,DefaultKafkaProducerFactorydestroy 方法会清除缓存并物理关闭生产者。但在我的情况下,应用程序上下文已创建,但在使用和生成任何消息之前,上下文已关闭,只有 KafkaTemplateflush 方法在内部被调用,这迫使它创建一个事务性生产者但不把它在缓存中。由于我没有启动生产者,而 KafkaTemplate 是在刷新时启动的,所以 DefaultKafkaProducerFactory 在使用它们之前将它们放入缓存中不是很好吗?

如果此模板操作正在参与在模板外启动的事务,则无法关闭生产者。

即使关闭,它也只是 "logically" 关闭 - 缓存以供其他操作重用。

Is it a correct behaviour?

是,对于生产者发起的交易;当消费者发起交易时使用替代名称。

InstanceAlreadyExistsException 问题仅仅是因为您正在创建两个具有相同配置的应用程序上下文。你为什么要这样做?