Spring 使用 Spring Cloud Stream Kafka Binder + Kafka Streams Binder 启动应用程序不工作 - 生产者不发送消息
Spring Boot application using the Spring Cloud Stream Kafka Binder + Kafka Streams Binder not working - Producer doesn't send messages
我的 Spring 带有 SCS Hoshram.SR6 的 Boot 2.3.1 应用程序使用的是 Kafka Streams Binder。我需要添加一个将在应用程序的另一部分中使用的 Kafka Producer,因此我添加了 kafka 活页夹。问题是生产者不工作,抛出异常:
19:49:40.082 [scheduling-1] [900cdeb11106e199] ERROR o.s.c.stream.binding.BindingService - Failed to create producer binding; retrying in 30 seconds
org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception; nested exception is java.util.concurrent.TimeoutException
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:332)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:148)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:79)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:222)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:90)
at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:152)
at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleProducerBinding(BindingService.java:336)
at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:68)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.TimeoutException: null
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:368)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:342)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:319)
这是我的配置:
spring:
cloud:
function:
definition: myProducer
stream:
bindings:
myKStream-in-0:
destination: my-kstream-topic
producer:
useNativeEncoding: true
myProducer-out-0:
destination: producer-topic
producer:
useNativeEncoding: true
kafka:
binder:
brokers: ${kafka.brokers:localhost}
min-partition-count: 3
replication-factor: 3
producerProperties:
enable:
idempotence: true
retries: 0x7fffffff
acks: all
key:
serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
value:
serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
schema:
registry:
url: ${schema-registry.url:http://localhost:8081}
request:
timeout:
ms: 5000
streams:
binder:
brokers: ${kafka.brokers:localhost}
configuration:
application:
id: ${spring.application.name}
server: ${POD_IP:localhost}:${local.server.port:8080}
schema:
registry:
url: ${schema-registry.url}
key:
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
value:
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
processing:
guarantee: exactly_once
replication:
factor: 3
group:
id: kpi
deserialization-exception-handler: logandcontinue
min-partition-count: 3
replication-factor: 3
state-store-retry:
max-attempts: 20
backoff-period: 1500
这可能是什么问题?
更新
我调整配置如下:
spring:
cloud:
function:
definition: myProducer
stream:
function:
definition: myKStream
现在我看不到任何异常,但消息没有切入主题。
在另一个仅使用 kafka 活页夹的应用程序中它完美地工作:
@Configuration
class KafkaProducerConfiguration {
@Bean
fun myProducerProcessor(): EmitterProcessor<Message<XXX>> {
return EmitterProcessor.create()
}
@Bean
fun myProducer(): Supplier<Flux<Message<XXX>>> {
return Supplier { myProducerProcessor() }
}
}
...
@Component
class XXXProducer(@Qualifier("myProducerProcessor") private val myProducerProcessor: EmitterProcessor<Message<XXX>>) {
fun send(...): Mono<Void> {
return Mono.defer {
myProducerProcessor.onNext(message)
Mono.empty()
}
}
更新 2
我设置logging.level.org.springframework.cloud.stream:调试
在日志中显示以下跟踪:
o.s.c.s.binder.DefaultBinderFactory - Creating binder: kstream
但是 Creating binder: kafka
.
什么都没有
我错过了 kafka 和 kstreams 的多个活页夹配置 (https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#_multi_binders_with_kafka_streams_based_binders_and_regular_kafka_binder)
因此,我必须设置:spring.cloud.stream.function.definition=myProducer;myKStream
我的 Spring 带有 SCS Hoshram.SR6 的 Boot 2.3.1 应用程序使用的是 Kafka Streams Binder。我需要添加一个将在应用程序的另一部分中使用的 Kafka Producer,因此我添加了 kafka 活页夹。问题是生产者不工作,抛出异常:
19:49:40.082 [scheduling-1] [900cdeb11106e199] ERROR o.s.c.stream.binding.BindingService - Failed to create producer binding; retrying in 30 seconds
org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception; nested exception is java.util.concurrent.TimeoutException
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:332)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:148)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:79)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:222)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:90)
at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:152)
at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleProducerBinding(BindingService.java:336)
at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:68)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.TimeoutException: null
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:368)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:342)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:319)
这是我的配置:
spring:
cloud:
function:
definition: myProducer
stream:
bindings:
myKStream-in-0:
destination: my-kstream-topic
producer:
useNativeEncoding: true
myProducer-out-0:
destination: producer-topic
producer:
useNativeEncoding: true
kafka:
binder:
brokers: ${kafka.brokers:localhost}
min-partition-count: 3
replication-factor: 3
producerProperties:
enable:
idempotence: true
retries: 0x7fffffff
acks: all
key:
serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
value:
serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
schema:
registry:
url: ${schema-registry.url:http://localhost:8081}
request:
timeout:
ms: 5000
streams:
binder:
brokers: ${kafka.brokers:localhost}
configuration:
application:
id: ${spring.application.name}
server: ${POD_IP:localhost}:${local.server.port:8080}
schema:
registry:
url: ${schema-registry.url}
key:
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
value:
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
processing:
guarantee: exactly_once
replication:
factor: 3
group:
id: kpi
deserialization-exception-handler: logandcontinue
min-partition-count: 3
replication-factor: 3
state-store-retry:
max-attempts: 20
backoff-period: 1500
这可能是什么问题?
更新
我调整配置如下:
spring:
cloud:
function:
definition: myProducer
stream:
function:
definition: myKStream
现在我看不到任何异常,但消息没有切入主题。
在另一个仅使用 kafka 活页夹的应用程序中它完美地工作:
@Configuration
class KafkaProducerConfiguration {
@Bean
fun myProducerProcessor(): EmitterProcessor<Message<XXX>> {
return EmitterProcessor.create()
}
@Bean
fun myProducer(): Supplier<Flux<Message<XXX>>> {
return Supplier { myProducerProcessor() }
}
}
...
@Component
class XXXProducer(@Qualifier("myProducerProcessor") private val myProducerProcessor: EmitterProcessor<Message<XXX>>) {
fun send(...): Mono<Void> {
return Mono.defer {
myProducerProcessor.onNext(message)
Mono.empty()
}
}
更新 2
我设置logging.level.org.springframework.cloud.stream:调试
在日志中显示以下跟踪:
o.s.c.s.binder.DefaultBinderFactory - Creating binder: kstream
但是 Creating binder: kafka
.
我错过了 kafka 和 kstreams 的多个活页夹配置 (https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#_multi_binders_with_kafka_streams_based_binders_and_regular_kafka_binder)
因此,我必须设置:spring.cloud.stream.function.definition=myProducer;myKStream