使用 JPA 启动时的分区分配和 ChainedKafkaTransactionManager

On Partitions Assignment and ChainedKafkaTransactionManager at startup with JPA

我有很多交易型消费者 ChainedKafkaTransactionManager 基于 JpaTransactionManagerKafkaTransactionManager(所有 @KafkaListener)。

JPA 需要设置一个 ThreadLocal 变量,以便能够知道要连接到哪个数据库(是租户 ID)。

启动应用程序时,在 onPartitionsAssigned listener 中,spring-kafka 正在尝试创建链式交易,因此尝试创建 JPA 交易,但没有租户集,然后就失败了。

该租户是通过 http 过滤器 and/or kafka 拦截器(通过事件 headers)设置的。

我尝试将 auto-wired KafkaListenerEndpointRegistrysetAutoStartup(false) 一起使用,但我发现消费者没有收到任何事件,可能是因为它们尚未初始化(我认为他们被初始化 on-demand).

如果我设置一个模拟租户 ID 并在应用程序准备就绪时调用 registry.start(),初始化似乎是在其他线程中完成的(可能是因为我使用的是 ConcurrentKafkaListenerContainerFactory),所以没用。

有没有办法避免初始 onPartitionsAssigned 侦听器上的 JPA 事务,这是消费者初始化的一部分?

如果您的链接 TM 首先有 KafkaTM,然后是 JPA TM(这是正常情况),您只需将 Kafka TM 注入容器并使用 @Transactional(与只是监听器上的 JPA TM)以在调用监听器时启动 JPA 事务。

事务提交之间的时间将略有增加,但它会提供类似的功能。

如果这对您不起作用,请打开 GitHub 问题;我们可以禁用分配时的初始提交,或者完全不使用事务(可选)。