为什么 KafkaTemplate 不关闭交易生产者?
Why KafkaTemplate does not close tansactional producers?
我用 spring 集成了 kafka 3.2.1.RELEASE 和 kafka-clients 2.5[=55 编写了一个简单的 Kafka 应用程序=]学习kafka交易。
它从一个主题接收消息并将它们发送到另一个主题。 beans.xml文件如下
<int-kafka:message-driven-channel-adapter
listener-container="container"
auto-startup="true"
send-timeout="30000"
channel="channelA"/>
<bean id="container" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" parent="kafkaMessageListenerContainerAbstract">
<constructor-arg>
<bean class="org.springframework.kafka.listener.ContainerProperties">
<constructor-arg
name="topics"
value="test"/>
<property name="transactionManager" ref="KafkaTransactionManager"/>
</bean>
</constructor-arg>
</bean>
.
.
.
<int-kafka:outbound-channel-adapter kafka-template="kafkaTemplate"
auto-startup="true"
channel="channelB"
topic="output"/>
<bean id="dbsenderTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
<entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
<entry key="bootstrap.servers" value="localhost:9092"/>
</map>
</constructor-arg>
<property name="transactionIdPrefix" value="mytest-"/>
<property name="producerPerConsumerPartition" value="false"/>
</bean>
</constructor-arg>
</bean>
启动应用程序的代码如下:
GenericXmlApplicationContext tempContext = new GenericXmlApplicationContext("beans.xml");
tempContext.close();
//POINT A.
try {
Thread.sleep(60000);
} catch (InterruptedException e) {
e.printStackTrace();
}
GenericXmlApplicationContext context = new GenericXmlApplicationContext();
context.load("beans.xml");
context.refresh();
//POINT B
在 POINT A 我刚刚关闭上下文以检查哪些 bean 已关闭,并休眠 60 秒以便有时间检查 JMX 控制台。我注意到即使上下文已关闭,但生产者仍在 JMX 中注册。之后我跟踪代码并注意到在上下文关闭 KafkaTemplate 时调用以下代码:
public void flush() {
Producer<K, V> producer = getTheProducer();
try {
producer.flush();
}
finally {
closeProducer(producer, inTransaction());
}
}
protected void closeProducer(Producer<K, V> producer, boolean inTx) {
if (!inTx) {
producer.close(this.closeTimeout);
}
}
这意味着它创建了一个生产者,但因为它是事务性的,所以不会被关闭。
此行为导致在 POINT B 上再次运行上下文并发送消息导致 javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-mytest-0
异常。
为什么 KafkaTemplate 不关闭这些生产者?
另一个问题是,当在 POINT B 上创建新的 KafkaTemplate 时,这些生产者会发生什么?
最后一个问题是如果我将 producerPerConsumerPartition
属性 更改为 true
提到的应用程序仍然使用 producer-mytest-0[=55 注册生产者 Mbean =] 并且在命名时不遵循 groupid.topic.partition
模式。这是正确的行为吗?
更新:
我明白什么时候调用了 KafkaTemplate executeInTransaction
。在 finally 块中,它调用生产者的关闭,因为它是逻辑关闭,所以在 CloseSafeProducer
上调用以下代码并将其放入缓存中:
if (!this.cache.contains(this)
&& !this.cache.offer(this)) {
this.delegate.close(closeTimeout);
}
这使得当上下文关闭时,DefaultKafkaProducerFactory
的 destroy
方法会清除缓存并物理关闭生产者。但在我的情况下,应用程序上下文已创建,但在使用和生成任何消息之前,上下文已关闭,只有 KafkaTemplate
的 flush
方法在内部被调用,这迫使它创建一个事务性生产者但不把它在缓存中。由于我没有启动生产者,而 KafkaTemplate 是在刷新时启动的,所以 DefaultKafkaProducerFactory
在使用它们之前将它们放入缓存中不是很好吗?
如果此模板操作正在参与在模板外启动的事务,则无法关闭生产者。
即使关闭,它也只是 "logically" 关闭 - 缓存以供其他操作重用。
Is it a correct behaviour?
是,对于生产者发起的交易;当消费者发起交易时使用替代名称。
InstanceAlreadyExistsException
问题仅仅是因为您正在创建两个具有相同配置的应用程序上下文。你为什么要这样做?
我用 spring 集成了 kafka 3.2.1.RELEASE 和 kafka-clients 2.5[=55 编写了一个简单的 Kafka 应用程序=]学习kafka交易。
它从一个主题接收消息并将它们发送到另一个主题。 beans.xml文件如下
<int-kafka:message-driven-channel-adapter
listener-container="container"
auto-startup="true"
send-timeout="30000"
channel="channelA"/>
<bean id="container" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" parent="kafkaMessageListenerContainerAbstract">
<constructor-arg>
<bean class="org.springframework.kafka.listener.ContainerProperties">
<constructor-arg
name="topics"
value="test"/>
<property name="transactionManager" ref="KafkaTransactionManager"/>
</bean>
</constructor-arg>
</bean>
.
.
.
<int-kafka:outbound-channel-adapter kafka-template="kafkaTemplate"
auto-startup="true"
channel="channelB"
topic="output"/>
<bean id="dbsenderTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
<entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
<entry key="bootstrap.servers" value="localhost:9092"/>
</map>
</constructor-arg>
<property name="transactionIdPrefix" value="mytest-"/>
<property name="producerPerConsumerPartition" value="false"/>
</bean>
</constructor-arg>
</bean>
启动应用程序的代码如下:
GenericXmlApplicationContext tempContext = new GenericXmlApplicationContext("beans.xml");
tempContext.close();
//POINT A.
try {
Thread.sleep(60000);
} catch (InterruptedException e) {
e.printStackTrace();
}
GenericXmlApplicationContext context = new GenericXmlApplicationContext();
context.load("beans.xml");
context.refresh();
//POINT B
在 POINT A 我刚刚关闭上下文以检查哪些 bean 已关闭,并休眠 60 秒以便有时间检查 JMX 控制台。我注意到即使上下文已关闭,但生产者仍在 JMX 中注册。之后我跟踪代码并注意到在上下文关闭 KafkaTemplate 时调用以下代码:
public void flush() {
Producer<K, V> producer = getTheProducer();
try {
producer.flush();
}
finally {
closeProducer(producer, inTransaction());
}
}
protected void closeProducer(Producer<K, V> producer, boolean inTx) {
if (!inTx) {
producer.close(this.closeTimeout);
}
}
这意味着它创建了一个生产者,但因为它是事务性的,所以不会被关闭。
此行为导致在 POINT B 上再次运行上下文并发送消息导致 javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-mytest-0
异常。
为什么 KafkaTemplate 不关闭这些生产者?
另一个问题是,当在 POINT B 上创建新的 KafkaTemplate 时,这些生产者会发生什么?
最后一个问题是如果我将 producerPerConsumerPartition
属性 更改为 true
提到的应用程序仍然使用 producer-mytest-0[=55 注册生产者 Mbean =] 并且在命名时不遵循 groupid.topic.partition
模式。这是正确的行为吗?
更新:
我明白什么时候调用了 KafkaTemplate executeInTransaction
。在 finally 块中,它调用生产者的关闭,因为它是逻辑关闭,所以在 CloseSafeProducer
上调用以下代码并将其放入缓存中:
if (!this.cache.contains(this)
&& !this.cache.offer(this)) {
this.delegate.close(closeTimeout);
}
这使得当上下文关闭时,DefaultKafkaProducerFactory
的 destroy
方法会清除缓存并物理关闭生产者。但在我的情况下,应用程序上下文已创建,但在使用和生成任何消息之前,上下文已关闭,只有 KafkaTemplate
的 flush
方法在内部被调用,这迫使它创建一个事务性生产者但不把它在缓存中。由于我没有启动生产者,而 KafkaTemplate 是在刷新时启动的,所以 DefaultKafkaProducerFactory
在使用它们之前将它们放入缓存中不是很好吗?
如果此模板操作正在参与在模板外启动的事务,则无法关闭生产者。
即使关闭,它也只是 "logically" 关闭 - 缓存以供其他操作重用。
Is it a correct behaviour?
是,对于生产者发起的交易;当消费者发起交易时使用替代名称。
InstanceAlreadyExistsException
问题仅仅是因为您正在创建两个具有相同配置的应用程序上下文。你为什么要这样做?