Spring Cloud Stream Kafka transaction configuration
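I am following a Spring-cloud-stream-kafka template, but got stuck while making the producer method transactional. I have not used Kafka before, so I need help in case any configuration changes are required in Kafka.
It works fine without the transaction configuration, but once the transaction configuration is added it times out at startup: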
2020-11-21 15:07:55.349 ERROR 20432 --- [ main] o.s.c.s.b.k.p.KafkaTopicProvisioner : Failed to obtain partition information
org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId
Below is my Spring-cloud-stream setup.
pom.xml
<properties>
    <java.version>11</java.version>
    <spring-boot.version>2.3.3.RELEASE</spring-boot.version>
    <spring-cloud.version>Hoxton.SR8</spring-cloud.version>
    <kafka-avro-serializer.version>5.2.1</kafka-avro-serializer.version>
    <avro.version>1.8.2</avro.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring-boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
Transaction manager
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders) {
    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder("kafka",
            MessageChannel.class)).getTransactionalProducerFactory();
    return new KafkaTransactionManager<>(pf);
}
application.yml
spring:
  cloud:
    stream:
      default:
        producer:
          useNativeEncoding: true
        consumer:
          useNativeEncoding: true
      bindings:
        input:
          destination: employee-details
          content-type: application/*+avro
          group: group-1
          concurrency: 3
        output:
          destination: employee-details
          content-type: application/*+avro
      kafka:
        binder:
          producer-properties:
            key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: http://localhost:8081
            acks: all
            max.block.ms: 60000
          consumer-properties:
            key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            schema.registry.url: http://localhost:8081
            specific.avro.reader: true
          transaction:
            transactionIdPrefix: tx-
            producer:
              enable:
                idempotence: true
              # requiredAcks: all
          brokers:
            - localhost:9094
I am running Kafka in minikube, and below is the configuration of my topic:
[2020-11-21 06:18:21,655] INFO [ZooKeeperClient] Connected. (kafka.zookeeper.ZooKeeperClient)
Topic: employee-details PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: employee-details Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Logs from the Kafka controller:
TRACE [Controller id=0] Checking need to trigger auto leader balancing (kafka.controller.KafkaController)
[2020-11-24 06:56:21,379] DEBUG [Controller id=0] Topics not in preferred replica for broker 0 Map() (kafka.controller.KafkaController)
[2020-11-24 06:56:21,379] TRACE [Controller id=0] Leader imbalance ratio for broker 0 is 0.0 (kafka.controller.KafkaController)
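Look at the server logs.
A transactional producer will time out if the transaction state log has fewer replicas than required. By default, 3 replicas are required and at least 2 of them must be in sync.
See transaction.state.log.replication.factor and transaction.state.log.min.isr.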
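For a single-broker development cluster like the minikube setup in the question (the topic listing shows only broker 0), the broker-side change might look like the sketch below. It assumes the broker reads a server.properties file; container images often expose the same settings through environment variables instead, and how that is done depends on the image. These values are suitable only for development, since a replication factor of 1 gives no fault tolerance.

```properties
# Broker configuration (server.properties), assuming a single-broker dev cluster.
# The defaults are 3 and 2, which a one-broker cluster cannot satisfy.
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
```

Both are broker-level settings, so the broker has to be restarted with the new values before the producer makes its first transactional request.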