Spring Cloud Stream project fails with "Failed to obtain partition information" error
When I use this configuration:
spring:
  cloud:
    stream:
      kafka:
        binder:
          min-partition-count: 1
          replication-factor: 1
  kafka:
    producer:
      transaction-id-prefix: tx-
      retries: 1
      acks: all
my application starts normally, but the transactional.id shown in the console output is empty.
I then added this extra (transaction) configuration to spring-cloud-stream to get a proper transactional.id:
spring:
  cloud:
    stream:
      kafka:
        binder:
          min-partition-count: 1
          replication-factor: 1
          transaction:
            transaction-id-prefix: txl-
  kafka:
    producer:
      transaction-id-prefix: tx-
      retries: 1
      acks: all
But now the service does not start successfully, and the console output shows:
app_poc.1.nqc57nvh0qhr@ms-poc-02 | 2020-11-25 20:27:49.435 INFO [poc,,,] 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.1
app_poc.1.nqc57nvh0qhr@ms-poc-02 | 2020-11-25 20:27:49.437 INFO [poc,,,] 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 0efa8fb0f4c73d92
app_poc.1.nqc57nvh0qhr@ms-poc-02 | 2020-11-25 20:27:49.437 INFO [poc,,,] 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1606336069435
app_poc.1.nqc57nvh0qhr@ms-poc-02 | 2020-11-25 20:27:49.597 INFO [poc,,,] 1 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
app_poc.1.nqc57nvh0qhr@ms-poc-02 | acks = -1
app_poc.1.nqc57nvh0qhr@ms-poc-02 | batch.size = 16384
app_poc.1.nqc57nvh0qhr@ms-poc-02 | bootstrap.servers = [kafka:29092]
app_poc.1.nqc57nvh0qhr@ms-poc-02 | buffer.memory = 33554432
app_poc.1.nqc57nvh0qhr@ms-poc-02 | client.dns.lookup = default
app_poc.1.nqc57nvh0qhr@ms-poc-02 | client.id = producer-txl-1
app_poc.1.nqc57nvh0qhr@ms-poc-02 | compression.type = none
app_poc.1.nqc57nvh0qhr@ms-poc-02 | connections.max.idle.ms = 540000
app_poc.1.nqc57nvh0qhr@ms-poc-02 | delivery.timeout.ms = 120000
app_poc.1.nqc57nvh0qhr@ms-poc-02 | enable.idempotence = true
app_poc.1.nqc57nvh0qhr@ms-poc-02 | interceptor.classes = []
app_poc.1.nqc57nvh0qhr@ms-poc-02 | key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
app_poc.1.nqc57nvh0qhr@ms-poc-02 | linger.ms = 0
app_poc.1.nqc57nvh0qhr@ms-poc-02 | max.block.ms = 60000
app_poc.1.nqc57nvh0qhr@ms-poc-02 | max.in.flight.requests.per.connection = 5
app_poc.1.nqc57nvh0qhr@ms-poc-02 | max.request.size = 1048576
app_poc.1.nqc57nvh0qhr@ms-poc-02 | metadata.max.age.ms = 300000
app_poc.1.nqc57nvh0qhr@ms-poc-02 | metadata.max.idle.ms = 300000
app_poc.1.nqc57nvh0qhr@ms-poc-02 | metric.reporters = []
app_poc.1.nqc57nvh0qhr@ms-poc-02 | metrics.num.samples = 2
app_poc.1.nqc57nvh0qhr@ms-poc-02 | metrics.recording.level = INFO
app_poc.1.nqc57nvh0qhr@ms-poc-02 | metrics.sample.window.ms = 30000
app_poc.1.nqc57nvh0qhr@ms-poc-02 | partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
app_poc.1.nqc57nvh0qhr@ms-poc-02 | receive.buffer.bytes = 32768
app_poc.1.nqc57nvh0qhr@ms-poc-02 | reconnect.backoff.max.ms = 1000
app_poc.1.nqc57nvh0qhr@ms-poc-02 | reconnect.backoff.ms = 50
app_poc.1.nqc57nvh0qhr@ms-poc-02 | request.timeout.ms = 30000
app_poc.1.nqc57nvh0qhr@ms-poc-02 | retries = 1
app_poc.1.nqc57nvh0qhr@ms-poc-02 | retry.backoff.ms = 100
app_poc.1.nqc57nvh0qhr@ms-poc-02 | sasl.client.callback.handler.class = null
app_poc.1.nqc57nvh0qhr@ms-poc-02 | sasl.jaas.config = null
app_poc.1.nqc57nvh0qhr@ms-poc-02 | sasl.kerberos.kinit.cmd = /usr/bin/kinit
app_poc.1.nqc57nvh0qhr@ms-poc-02 | sasl.kerberos.min.time.before.relogin = 60000
app_poc.1.nqc57nvh0qhr@ms-poc-02 | sasl.kerberos.service.name = null
app_poc.1.nqc57nvh0qhr@ms-poc-02 | sasl.kerberos.ticket.renew.jitter = 0.05
app_poc.1.nqc57nvh0qhr@ms-poc-02 | sasl.kerberos.ticket.renew.window.factor = 0.8
app_poc.1.nqc57nvh0qhr@ms-poc-02 | sasl.login.callback.handler.class = null
app_poc.1.nqc57nvh0qhr@ms-poc-02 | sasl.login.class = null
app_poc.1.nqc57nvh0qhr@ms-poc-02 | sasl.login.refresh.buffer.seconds = 300
app_poc.1.nqc57nvh0qhr@ms-poc-02 | sasl.login.refresh.min.period.seconds = 60
app_poc.1.nqc57nvh0qhr@ms-poc-02 | sasl.login.refresh.window.factor = 0.8
app_poc.1.nqc57nvh0qhr@ms-poc-02 | sasl.login.refresh.window.jitter = 0.05
app_poc.1.nqc57nvh0qhr@ms-poc-02 | sasl.mechanism = GSSAPI
app_poc.1.nqc57nvh0qhr@ms-poc-02 | security.protocol = PLAINTEXT
app_poc.1.nqc57nvh0qhr@ms-poc-02 | security.providers = null
app_poc.1.nqc57nvh0qhr@ms-poc-02 | send.buffer.bytes = 131072
app_poc.1.nqc57nvh0qhr@ms-poc-02 | ssl.cipher.suites = null
app_poc.1.nqc57nvh0qhr@ms-poc-02 | ssl.enabled.protocols = [TLSv1.2]
app_poc.1.nqc57nvh0qhr@ms-poc-02 | ssl.endpoint.identification.algorithm = https
app_poc.1.nqc57nvh0qhr@ms-poc-02 | ssl.key.password = null
app_poc.1.nqc57nvh0qhr@ms-poc-02 | ssl.keymanager.algorithm = SunX509
app_poc.1.nqc57nvh0qhr@ms-poc-02 | ssl.keystore.location = null
app_poc.1.nqc57nvh0qhr@ms-poc-02 | ssl.keystore.password = null
app_poc.1.nqc57nvh0qhr@ms-poc-02 | ssl.keystore.type = JKS
app_poc.1.nqc57nvh0qhr@ms-poc-02 | ssl.protocol = TLSv1.2
app_poc.1.nqc57nvh0qhr@ms-poc-02 | ssl.provider = null
app_poc.1.nqc57nvh0qhr@ms-poc-02 | ssl.secure.random.implementation = null
app_poc.1.nqc57nvh0qhr@ms-poc-02 | ssl.trustmanager.algorithm = PKIX
app_poc.1.nqc57nvh0qhr@ms-poc-02 | ssl.truststore.location = null
app_poc.1.nqc57nvh0qhr@ms-poc-02 | ssl.truststore.password = null
app_poc.1.nqc57nvh0qhr@ms-poc-02 | ssl.truststore.type = JKS
app_poc.1.nqc57nvh0qhr@ms-poc-02 | transaction.timeout.ms = 60000
app_poc.1.nqc57nvh0qhr@ms-poc-02 | transactional.id = txl-1
app_poc.1.nqc57nvh0qhr@ms-poc-02 | value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
app_poc.1.nqc57nvh0qhr@ms-poc-02 |
app_poc.1.nqc57nvh0qhr@ms-poc-02 | 2020-11-25 20:27:49.599 INFO [poc,,,] 1 --- [ main] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-txl-1, transactionalId=txl-1] Instantiated a transactional producer.
app_poc.1.nqc57nvh0qhr@ms-poc-02 | 2020-11-25 20:27:49.623 INFO [poc,,,] 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.1
app_poc.1.nqc57nvh0qhr@ms-poc-02 | 2020-11-25 20:27:49.624 INFO [poc,,,] 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 0efa8fb0f4c73d92
app_poc.1.nqc57nvh0qhr@ms-poc-02 | 2020-11-25 20:27:49.624 INFO [poc,,,] 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1606336069623
app_poc.1.nqc57nvh0qhr@ms-poc-02 | 2020-11-25 20:27:49.626 INFO [poc,,,] 1 --- [ main] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-txl-1, transactionalId=txl-1] Invoking InitProducerId for the first time in order to acquire a producer ID
app_poc.1.nqc57nvh0qhr@ms-poc-02 | 2020-11-25 20:27:49.637 INFO [poc,,,] 1 --- [ producer-txl-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-txl-1, transactionalId=txl-1] Cluster ID: 3wV8FW9yTfKSVhNwNMoC2Q
app_poc.1.nqc57nvh0qhr@ms-poc-02 | 2020-11-25 20:28:49.630 ERROR [poc,,,] 1 --- [ main] o.s.c.s.b.k.p.KafkaTopicProvisioner : Failed to obtain partition information
app_poc.1.nqc57nvh0qhr@ms-poc-02 |
app_poc.1.nqc57nvh0qhr@ms-poc-02 | org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId
Failed to obtain partition information
I think something is wrong with my configuration (surely).
My intention is to get exactly-once semantics, to avoid duplicates. That is why I want to see the transactional.id.
Extra information:
My consumer is transactional with both JPA and Kafka transactions (using a chained KafkaTransactionManager for transaction synchronization).
EDITED:
In a @Configuration class I have these beans:
@Bean
@Primary
fun transactionManager(em: EntityManagerFactory): JpaTransactionManager {
    return JpaTransactionManager(em)
}

@Bean
fun kafkaTransactionManager(producerFactory: ProducerFactory<Any, Any>): KafkaTransactionManager<*, *> {
    return KafkaTransactionManager(producerFactory)
}

@Bean
fun chainedTransactionManager(
    kafkaTransactionManager: KafkaTransactionManager<String, String>,
    transactionManager: JpaTransactionManager,
): ChainedKafkaTransactionManager<Any, Any> {
    return ChainedKafkaTransactionManager(kafkaTransactionManager, transactionManager)
}

@Bean
fun kafkaListenerContainerFactory(
    configurer: ConcurrentKafkaListenerContainerFactoryConfigurer,
    kafkaConsumerFactory: ConsumerFactory<Any, Any>,
    chainedKafkaTransactionManager: ChainedKafkaTransactionManager<Any, Any>,
): ConcurrentKafkaListenerContainerFactory<*, *> {
    val factory = ConcurrentKafkaListenerContainerFactory<Any, Any>()
    configurer.configure(factory, kafkaConsumerFactory)
    factory.containerProperties.transactionManager = chainedKafkaTransactionManager
    return factory
}
And my processor class has the corresponding @Transactional:
@EnableKafka
@EnableBinding(Channels::class)
@Service
@Transactional
class EventProcessor()
...
With the first configuration I showed, transaction synchronization works.
I used this logging configuration to confirm that TransactionSynchronizationManager initializes and clears transaction synchronization:
logging:
  level:
    org.springframework.kafka: trace
    org.springframework.transaction: trace
See .
You most likely do not have enough replicas or in-sync replicas for the transaction log topic.
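This symptom is typical on a single-broker development cluster: the internal __transaction_state topic defaults to a replication factor of 3 and a min ISR of 2, which one broker can never satisfy, so InitProducerId blocks until it times out. A possible broker-side override for a dev-only, single-broker setup (these are the standard Kafka broker properties; do not use these values in production):

```properties
# server.properties - dev only: let the transaction log live on one broker
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
```

In Docker-based setups these are commonly exposed as the KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR and KAFKA_TRANSACTION_STATE_LOG_MIN_ISR environment variables.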
Regarding using ChainedKafkaTransactionManager: only producer-only transactions are supported out of the box in spring-cloud-stream. For a consume->produce->publishToKafka operation, you have to use @Transactional on the listener, with only the JPA transaction manager; the result is similar to transaction synchronization.
Alternatively, you have to inject a properly configured CKTM into the binding's listener container.
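A minimal sketch of that injection, using spring-cloud-stream's ListenerContainerCustomizer hook and reusing the chainedTransactionManager bean from the question (a sketch of the approach, not a confirmed solution):

```kotlin
import org.springframework.cloud.stream.config.ListenerContainerCustomizer
import org.springframework.context.annotation.Bean
import org.springframework.kafka.listener.AbstractMessageListenerContainer
import org.springframework.kafka.transaction.ChainedKafkaTransactionManager

@Bean
fun containerCustomizer(
    cktm: ChainedKafkaTransactionManager<Any, Any>
): ListenerContainerCustomizer<AbstractMessageListenerContainer<*, *>> =
    // Invoked once per binding; attaching the chained (Kafka + JPA) transaction
    // manager makes the container start the transaction before calling the listener.
    ListenerContainerCustomizer { container, _, _ ->
        container.containerProperties.transactionManager = cktm
    }
```

With this in place, the kafkaListenerContainerFactory bean from the question is not needed for the binding, because spring-cloud-stream builds its own containers and applies the customizer to each of them.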
You need to show your code and the rest of the configuration.