使用 JPA 启动时的分区分配和 ChainedKafkaTransactionManager
On Partitions Assignment and ChainedKafkaTransactionManager at startup with JPA
我有很多交易型消费者 ChainedKafkaTransactionManager
基于 JpaTransactionManager
和 KafkaTransactionManager
(所有 @KafkaListener
)。
JPA 需要设置一个 ThreadLocal 变量,以便能够知道要连接到哪个数据库(是租户 ID)。
启动应用程序时,在 onPartitionsAssigned
listener 中,spring-kafka 正在尝试创建链式交易,因此尝试创建 JPA 交易,但没有租户集,然后就失败了。
该租户是通过 http 过滤器 and/or kafka 拦截器(通过事件 headers)设置的。
我尝试将 auto-wired KafkaListenerEndpointRegistry
与 setAutoStartup(false)
一起使用,但我发现消费者没有收到任何事件,可能是因为它们尚未初始化(我认为他们被初始化 on-demand).
如果我设置一个模拟租户 ID 并在应用程序准备就绪时调用 registry.start()
,初始化似乎是在其他线程中完成的(可能是因为我使用的是 ConcurrentKafkaListenerContainerFactory
),所以没用。
有没有办法避免初始 onPartitionsAssigned
侦听器上的 JPA 事务,这是消费者初始化的一部分?
如果您的链接 TM 首先有 KafkaTM,然后是 JPA TM(这是正常情况),您只需将 Kafka TM 注入容器并使用 @Transactional
(与只是监听器上的 JPA TM)以在调用监听器时启动 JPA 事务。
事务提交之间的时间将略有增加,但它会提供类似的功能。
如果这对您不起作用,请打开 GitHub 问题;我们可以禁用分配时的初始提交,或者完全不使用事务(可选)。
我有很多交易型消费者 ChainedKafkaTransactionManager
基于 JpaTransactionManager
和 KafkaTransactionManager
(所有 @KafkaListener
)。
JPA 需要设置一个 ThreadLocal 变量,以便能够知道要连接到哪个数据库(是租户 ID)。
启动应用程序时,在 onPartitionsAssigned
listener 中,spring-kafka 正在尝试创建链式交易,因此尝试创建 JPA 交易,但没有租户集,然后就失败了。
该租户是通过 http 过滤器 and/or kafka 拦截器(通过事件 headers)设置的。
我尝试将 auto-wired KafkaListenerEndpointRegistry
与 setAutoStartup(false)
一起使用,但我发现消费者没有收到任何事件,可能是因为它们尚未初始化(我认为他们被初始化 on-demand).
如果我设置一个模拟租户 ID 并在应用程序准备就绪时调用 registry.start()
,初始化似乎是在其他线程中完成的(可能是因为我使用的是 ConcurrentKafkaListenerContainerFactory
),所以没用。
有没有办法避免初始 onPartitionsAssigned
侦听器上的 JPA 事务,这是消费者初始化的一部分?
如果您的链接 TM 首先有 KafkaTM,然后是 JPA TM(这是正常情况),您只需将 Kafka TM 注入容器并使用 @Transactional
(与只是监听器上的 JPA TM)以在调用监听器时启动 JPA 事务。
事务提交之间的时间将略有增加,但它会提供类似的功能。
如果这对您不起作用,请打开 GitHub 问题;我们可以禁用分配时的初始提交,或者完全不使用事务(可选)。