Spring Cloud Stream Kafka 消费者应用程序不允许添加供应商

Spring Cloud Stream Kafka Consumer application doesn't allow adding Supplier

我正在开发 Spring Cloud Stream Kafka 应用程序。我只添加了消费者来消费来自主题的消息并使用 FIX 协议将它们传递给第三方。

到目前为止一切正常,但现在第三方发回了回复,我想将它们制作到一个新主题。当我在现有代码中添加 Supplier 时,它开始表现得很奇怪。 bootstrap.servers 配置从 remoteHost 代理更改为本地主机并开始出现以下错误:

[AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established> Broker may not be available.

如果尝试连接本地主机会出错,因为没有任何 Kafka 设置。

下面是我的 application.yml 文件:

spring.cloud.stream.function.definition: amerData;emeaData;ackResponse  #added new ackResponse here
spring.cloud.stream.kafka.streams:
  binder:
    brokers: remoteHost:9092
    configuration:
      schema.registry.url: remoteHost:8081
      default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
  bindings:
    ackResponse-out-0:           #new addition
      producer.configuration:
        key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer

spring.cloud.stream.bindings:
  amerData-in-0:
    destination: topic1
  emeaData-in-0:
    destination: topic2
  ackResponse-out-0:       #new addition
    destination: topic3

并尝试了供应商的可能选项 -> Supplier<String> ackResponse()Supplier<Message<String>> ackResponse() 它只是在我做 Supplier<KStream<String,String>> ackResponse() 时不会将 remoteHost 更改为 localhost,然后 bootstrap.servers 显示配置的远程主机,但这是不正确的,我无法写入收到的响应(主要是一个字符串或 json) 像这样添加到 Kafka 主题。

我确实根据需要将我的消费者配置为 Consumer<KStream<String, AVROPOJO1>> amerData()Consumer<KStream<String, AVROPOJO2>> emeaData(),它们工作正常。

我是不是遗漏了或弄乱了什么?我们不能在同一个 spring 云流应用程序中同时拥有 producer/consumer 吗?使用 Streambridge 也无法解决这个问题。有人可以帮忙吗?

如果您像之前那样添加一个 Supplier bean,它将成为使用基于 MessageChannel 的 Kafka 绑定器的常规生产者。您需要在项目中添加常规 Kafka 活页夹 (spring-cloud-stream-binder-kafka)。那个的绑定应该在 spring.cloud.stream.kafka.bindings 下。我看到您在上面 spring.cloud.stream.kafka.streams.bindings 下定义了它。我想知道这是否是问题所在?