一个 Kafka Streams 拓扑(Spring Cloud Stream)中是否可以使用多个代理?

Is multiple brokers possible in one Kafka Streams Topology(Spring Cloud Stream)?

我们有一个从输入主题(使用绑定器:x - 代理地址:x)读取的拓扑,使用 [=27= 处理记录并将其写入输出主题(使用绑定器:y - 代理地址:y) ] 云流kafka流。记录不会写入输出主题。但是,当我将活页夹(代理地址)设置为相同(同时使用 x 或 y)时,记录将写入主题 y。 我应该在拓扑中使用相同的代理吗?我需要为输入和输出主题使用不同的绑定器和代理吗?我该如何解决?

错误: 2021-06-17 12:17:21.483 WARN 20848 --- [read-1-producer] o.a.k.c.NetworkClient : [Producer clientId=inputTopic-32100000000000000000015-f0bd5423-e670-43e8-ab0b-84ec5505c2fd-StreamThread- 1-producer] 获取相关 ID 为 182 的元数据时出错:{inputTopic=UNKNOWN_TOPIC_OR_PARTITION}

Application.yml

spring:
  cloud:
    stream:
      kafka:
        streams:
          bindings:
            doob-output-topic-out:
              applicationId: doob-output-topic-out
              producer:
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: tr.com.havelsan.doob.cloud.framework.service.track.base.core.serde.BaseDataSerde
                topic:
                  properties:
                    retention.bytes: 300000000
                    segment.bytes: 300000000
            doob-input-topic-in:
              consumer:
                applicationId: doob-input-topic-in
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: tr.com.havelsan.doob.cloud.framework.service.track.base.core.serde.BaseDataSerde
                topic:
                  properties:
                    retention.bytes: 300000000
                    segment.bytes: 300000000
      binders:
        outputKafka:
          type: kstream
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    streams:
                      binder:
                        brokers: ${1kafka.brokers1}
                        autoCreateTopics: true
                        autoAddPartitions: true
                        minPartitionCount: 8
                        configuration:
                          commit.interval.ms: 1000
        inputKafka:
          type: kstream
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    streams:
                      binder:
                        brokers: ${2kafka.brokers2}
                        autoCreateTopics: true
                        autoAddPartitions: true
                        minPartitionCount: 8
                        configuration:
                          commit.interval.ms: 1000
                          max:
                            request:
                              size: 20000000
      bindings:
        doob-output-topic-out:
          destination: outputTopic
          binder: outputKafka
          producer:
            partition-count: 8
        doob-input-topic-in:
          destination: inputTopic
          binder: inputKafka

manage:
  storeName: trackList15

源代码:

    @StreamListener(BASE_TOPIC_INPUT)
    @SendTo(BASE_TOPIC_OUTPUT)
    public KStream<String, BaseData> consumeTrackFromSynchronization(KStream<String, BaseData> baseDataStream) {
        return baseDataStream.filter((s, baseData) -> BaseUtil.getTrackType(baseData).equals(BaseTypeEnum.FK)).groupByKey()
                .reduce((baseData, s1) -> s1, Materialized.<String, BaseData, KeyValueStore<Bytes, byte[]>>as(storeName)
                        .withKeySerde(Serdes.String()).
                                withValueSerde(baseDataSerde)).toStream()
                .peek((s, baseData) -> baseServiceHelper.processBase(baseData, BaseTypeEnum.FK));
    }

在单个 Kafka Streams 处理器中,无法从一个集群读取并写入另一个集群。但是,在单个应用程序 (JVM) 中,您可以有多个处理器,每个处理器都与单个 Kafka 集群交互。

有关详细信息,请参阅此 thread

使用 Spring Cloud Stream 的解决方法如下。

  1. 让您的 Kafka Streams 处理器在同一个集群中消费和生产。
  2. 然后,使用基于常规消息通道的 Kafka 绑定器(不是 Kafka Streams 绑定器)编写另一个简单的处理器。在此模型中,您可以应用上面的多绑定器模式,即输入从您在 Kafka Streams 处理器中写入的主题接收,然后输出转到另一个集群中的主题。该处理器只是成为一个直通处理器,将数据从集群 1 移动到集群 2。