一个 Kafka Streams 拓扑(Spring Cloud Stream)中是否可以使用多个代理?
Is multiple brokers possible in one Kafka Streams Topology(Spring Cloud Stream)?
我们有一个从输入主题(使用绑定器:x - 代理地址:x)读取的拓扑,使用 [=27= 处理记录并将其写入输出主题(使用绑定器:y - 代理地址:y) ] 云流kafka流。记录不会写入输出主题。但是,当我将活页夹(代理地址)设置为相同(同时使用 x 或 y)时,记录将写入主题 y。
我应该在拓扑中使用相同的代理吗?我需要为输入和输出主题使用不同的绑定器和代理吗?我该如何解决?
错误:
2021-06-17 12:17:21.483 WARN 20848 --- [read-1-producer] o.a.k.c.NetworkClient : [Producer clientId=inputTopic-32100000000000000000015-f0bd5423-e670-43e8-ab0b-84ec5505c2fd-StreamThread- 1-producer] 获取相关 ID 为 182 的元数据时出错:{inputTopic=UNKNOWN_TOPIC_OR_PARTITION}
Application.yml
spring:
cloud:
stream:
kafka:
streams:
bindings:
doob-output-topic-out:
applicationId: doob-output-topic-out
producer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: tr.com.havelsan.doob.cloud.framework.service.track.base.core.serde.BaseDataSerde
topic:
properties:
retention.bytes: 300000000
segment.bytes: 300000000
doob-input-topic-in:
consumer:
applicationId: doob-input-topic-in
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: tr.com.havelsan.doob.cloud.framework.service.track.base.core.serde.BaseDataSerde
topic:
properties:
retention.bytes: 300000000
segment.bytes: 300000000
binders:
outputKafka:
type: kstream
environment:
spring:
cloud:
stream:
kafka:
streams:
binder:
brokers: ${1kafka.brokers1}
autoCreateTopics: true
autoAddPartitions: true
minPartitionCount: 8
configuration:
commit.interval.ms: 1000
inputKafka:
type: kstream
environment:
spring:
cloud:
stream:
kafka:
streams:
binder:
brokers: ${2kafka.brokers2}
autoCreateTopics: true
autoAddPartitions: true
minPartitionCount: 8
configuration:
commit.interval.ms: 1000
max:
request:
size: 20000000
bindings:
doob-output-topic-out:
destination: outputTopic
binder: outputKafka
producer:
partition-count: 8
doob-input-topic-in:
destination: inputTopic
binder: inputKafka
manage:
storeName: trackList15
源代码:
@StreamListener(BASE_TOPIC_INPUT)
@SendTo(BASE_TOPIC_OUTPUT)
public KStream<String, BaseData> consumeTrackFromSynchronization(KStream<String, BaseData> baseDataStream) {
return baseDataStream.filter((s, baseData) -> BaseUtil.getTrackType(baseData).equals(BaseTypeEnum.FK)).groupByKey()
.reduce((baseData, s1) -> s1, Materialized.<String, BaseData, KeyValueStore<Bytes, byte[]>>as(storeName)
.withKeySerde(Serdes.String()).
withValueSerde(baseDataSerde)).toStream()
.peek((s, baseData) -> baseServiceHelper.processBase(baseData, BaseTypeEnum.FK));
}
在单个 Kafka Streams 处理器中,无法从一个集群读取并写入另一个集群。但是,在单个应用程序 (JVM) 中,您可以有多个处理器,每个处理器都与单个 Kafka 集群交互。
有关详细信息,请参阅此 thread。
使用 Spring Cloud Stream 的解决方法如下。
- 让您的 Kafka Streams 处理器在同一个集群中消费和生产。
- 然后,使用基于常规消息通道的 Kafka 绑定器(不是 Kafka Streams 绑定器)编写另一个简单的处理器。在此模型中,您可以应用上面的多绑定器模式,即输入从您在 Kafka Streams 处理器中写入的主题接收,然后输出转到另一个集群中的主题。该处理器只是成为一个直通处理器,将数据从集群 1 移动到集群 2。
我们有一个从输入主题(使用绑定器:x - 代理地址:x)读取的拓扑,使用 [=27= 处理记录并将其写入输出主题(使用绑定器:y - 代理地址:y) ] 云流kafka流。记录不会写入输出主题。但是,当我将活页夹(代理地址)设置为相同(同时使用 x 或 y)时,记录将写入主题 y。 我应该在拓扑中使用相同的代理吗?我需要为输入和输出主题使用不同的绑定器和代理吗?我该如何解决?
错误: 2021-06-17 12:17:21.483 WARN 20848 --- [read-1-producer] o.a.k.c.NetworkClient : [Producer clientId=inputTopic-32100000000000000000015-f0bd5423-e670-43e8-ab0b-84ec5505c2fd-StreamThread- 1-producer] 获取相关 ID 为 182 的元数据时出错:{inputTopic=UNKNOWN_TOPIC_OR_PARTITION}
Application.yml
spring:
cloud:
stream:
kafka:
streams:
bindings:
doob-output-topic-out:
applicationId: doob-output-topic-out
producer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: tr.com.havelsan.doob.cloud.framework.service.track.base.core.serde.BaseDataSerde
topic:
properties:
retention.bytes: 300000000
segment.bytes: 300000000
doob-input-topic-in:
consumer:
applicationId: doob-input-topic-in
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: tr.com.havelsan.doob.cloud.framework.service.track.base.core.serde.BaseDataSerde
topic:
properties:
retention.bytes: 300000000
segment.bytes: 300000000
binders:
outputKafka:
type: kstream
environment:
spring:
cloud:
stream:
kafka:
streams:
binder:
brokers: ${1kafka.brokers1}
autoCreateTopics: true
autoAddPartitions: true
minPartitionCount: 8
configuration:
commit.interval.ms: 1000
inputKafka:
type: kstream
environment:
spring:
cloud:
stream:
kafka:
streams:
binder:
brokers: ${2kafka.brokers2}
autoCreateTopics: true
autoAddPartitions: true
minPartitionCount: 8
configuration:
commit.interval.ms: 1000
max:
request:
size: 20000000
bindings:
doob-output-topic-out:
destination: outputTopic
binder: outputKafka
producer:
partition-count: 8
doob-input-topic-in:
destination: inputTopic
binder: inputKafka
manage:
storeName: trackList15
源代码:
@StreamListener(BASE_TOPIC_INPUT)
@SendTo(BASE_TOPIC_OUTPUT)
public KStream<String, BaseData> consumeTrackFromSynchronization(KStream<String, BaseData> baseDataStream) {
return baseDataStream.filter((s, baseData) -> BaseUtil.getTrackType(baseData).equals(BaseTypeEnum.FK)).groupByKey()
.reduce((baseData, s1) -> s1, Materialized.<String, BaseData, KeyValueStore<Bytes, byte[]>>as(storeName)
.withKeySerde(Serdes.String()).
withValueSerde(baseDataSerde)).toStream()
.peek((s, baseData) -> baseServiceHelper.processBase(baseData, BaseTypeEnum.FK));
}
在单个 Kafka Streams 处理器中,无法从一个集群读取并写入另一个集群。但是,在单个应用程序 (JVM) 中,您可以有多个处理器,每个处理器都与单个 Kafka 集群交互。
有关详细信息,请参阅此 thread。
使用 Spring Cloud Stream 的解决方法如下。
- 让您的 Kafka Streams 处理器在同一个集群中消费和生产。
- 然后,使用基于常规消息通道的 Kafka 绑定器(不是 Kafka Streams 绑定器)编写另一个简单的处理器。在此模型中,您可以应用上面的多绑定器模式,即输入从您在 Kafka Streams 处理器中写入的主题接收,然后输出转到另一个集群中的主题。该处理器只是成为一个直通处理器,将数据从集群 1 移动到集群 2。