Spring Cloud Stream Kafka 消费者应用程序不允许添加供应商
Spring Cloud Stream Kafka Consumer application doesn't allow adding Supplier
我正在开发 Spring Cloud Stream Kafka 应用程序。我只添加了消费者来消费来自主题的消息并使用 FIX 协议将它们传递给第三方。
到目前为止一切正常,但现在第三方发回了回复,我想将它们制作到一个新主题。当我在现有代码中添加 Supplier 时,它开始表现得很奇怪。 bootstrap.servers
配置从 remoteHost 代理更改为本地主机并开始出现以下错误:
[AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established> Broker may not be available.
如果尝试连接本地主机会出错,因为没有任何 Kafka 设置。
下面是我的 application.yml 文件:
spring.cloud.stream.function.definition: amerData;emeaData;ackResponse #added new ackResponse here
spring.cloud.stream.kafka.streams:
binder:
brokers: remoteHost:9092
configuration:
schema.registry.url: remoteHost:8081
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
bindings:
ackResponse-out-0: #new addition
producer.configuration:
key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
spring.cloud.stream.bindings:
amerData-in-0:
destination: topic1
emeaData-in-0:
destination: topic2
ackResponse-out-0: #new addition
destination: topic3
并尝试了供应商的可能选项 -> Supplier<String> ackResponse()
或 Supplier<Message<String>> ackResponse()
它只是在我做 Supplier<KStream<String,String>> ackResponse()
时不会将 remoteHost 更改为 localhost,然后 bootstrap.servers 显示配置的远程主机,但这是不正确的,我无法写入收到的响应(主要是一个字符串或 json) 像这样添加到 Kafka 主题。
我确实根据需要将我的消费者配置为 Consumer<KStream<String, AVROPOJO1>> amerData()
和 Consumer<KStream<String, AVROPOJO2>> emeaData()
,它们工作正常。
我是不是遗漏了或弄乱了什么?我们不能在同一个 spring 云流应用程序中同时拥有 producer/consumer 吗?使用 Streambridge
也无法解决这个问题。有人可以帮忙吗?
如果您像之前那样添加一个 Supplier
bean,它将成为使用基于 MessageChannel
的 Kafka 绑定器的常规生产者。您需要在项目中添加常规 Kafka 活页夹 (spring-cloud-stream-binder-kafka
)。那个的绑定应该在 spring.cloud.stream.kafka.bindings
下。我看到您在上面 spring.cloud.stream.kafka.streams.bindings
下定义了它。我想知道这是否是问题所在?
我正在开发 Spring Cloud Stream Kafka 应用程序。我只添加了消费者来消费来自主题的消息并使用 FIX 协议将它们传递给第三方。
到目前为止一切正常,但现在第三方发回了回复,我想将它们制作到一个新主题。当我在现有代码中添加 Supplier 时,它开始表现得很奇怪。 bootstrap.servers
配置从 remoteHost 代理更改为本地主机并开始出现以下错误:
[AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established> Broker may not be available.
如果尝试连接本地主机会出错,因为没有任何 Kafka 设置。
下面是我的 application.yml 文件:
spring.cloud.stream.function.definition: amerData;emeaData;ackResponse #added new ackResponse here
spring.cloud.stream.kafka.streams:
binder:
brokers: remoteHost:9092
configuration:
schema.registry.url: remoteHost:8081
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
bindings:
ackResponse-out-0: #new addition
producer.configuration:
key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
spring.cloud.stream.bindings:
amerData-in-0:
destination: topic1
emeaData-in-0:
destination: topic2
ackResponse-out-0: #new addition
destination: topic3
并尝试了供应商的可能选项 -> Supplier<String> ackResponse()
或 Supplier<Message<String>> ackResponse()
它只是在我做 Supplier<KStream<String,String>> ackResponse()
时不会将 remoteHost 更改为 localhost,然后 bootstrap.servers 显示配置的远程主机,但这是不正确的,我无法写入收到的响应(主要是一个字符串或 json) 像这样添加到 Kafka 主题。
我确实根据需要将我的消费者配置为 Consumer<KStream<String, AVROPOJO1>> amerData()
和 Consumer<KStream<String, AVROPOJO2>> emeaData()
,它们工作正常。
我是不是遗漏了或弄乱了什么?我们不能在同一个 spring 云流应用程序中同时拥有 producer/consumer 吗?使用 Streambridge
也无法解决这个问题。有人可以帮忙吗?
如果您像之前那样添加一个 Supplier
bean,它将成为使用基于 MessageChannel
的 Kafka 绑定器的常规生产者。您需要在项目中添加常规 Kafka 活页夹 (spring-cloud-stream-binder-kafka
)。那个的绑定应该在 spring.cloud.stream.kafka.bindings
下。我看到您在上面 spring.cloud.stream.kafka.streams.bindings
下定义了它。我想知道这是否是问题所在?