Kafka Streams: Define multiple Kafka Streams using Spring Cloud Stream for each set of topics
I am trying to do a simple POC with Kafka Streams, but I am getting an exception when starting the application. I am using Spring-Kafka, Kafka-Streams 2.5.1, and Spring Boot 2.3.5.
Kafka Streams configuration:
@Configuration
public class KafkaStreamsConfig {

    private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfig.class);

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processAAA() {
        return input -> input.peek((key, value) -> log
                .info("AAA Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processBBB() {
        return input -> input.peek((key, value) -> log
                .info("BBB Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processCCC() {
        return input -> input.peek((key, value) -> log
                .info("CCC Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    /*
    @Bean
    public KafkaStreams kafkaStreams(KafkaProperties kafkaProperties) {
        final Properties props = new Properties();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "groupId-1");
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, JsonNode.class);
        final KafkaStreams kafkaStreams = new KafkaStreams(kafkaStreamTopology(), props);
        kafkaStreams.start();
        return kafkaStreams;
    }

    @Bean
    public Topology kafkaStreamTopology() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(Arrays.asList(AAATOPIC, BBBInputTOPIC, CCCInputTOPIC));
        return streamsBuilder.build();
    }
    */
}
The application.yaml configuration is below. The idea is that I have 3 input and 3 output topics.
The component takes records from the input topics and writes output to the output topics.
spring:
  application.name: consumerapp-1
  cloud:
    function:
      definition: processAAA;processBBB;processCCC
    stream:
      kafka.binder:
        brokers: 127.0.0.1:9092
        autoCreateTopics: true
        auto-add-partitions: true
      kafka.streams.binder:
        configuration:
          commit.interval.ms: 1000
          default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
          default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      bindings:
        processAAA-in-0:
          destination: aaaInputTopic
        processAAA-out-0:
          destination: aaaOutputTopic
        processBBB-in-0:
          destination: bbbInputTopic
        processBBB-out-0:
          destination: bbbOutputTopic
        processCCC-in-0:
          destination: cccInputTopic
        processCCC-out-0:
          destination: cccOutputTopic
The exception thrown is:
Caused by: java.lang.IllegalArgumentException: Trying to prepareConsumerBinding public abstract void org.apache.kafka.streams.kstream.KStream.to(java.lang.String,org.apache.kafka.streams.kstream.Produced) but no delegate has been set.
at org.springframework.util.Assert.notNull(Assert.java:201)
at org.springframework.cloud.stream.binder.kafka.streams.KStreamBoundElementFactory$KStreamWrapperHandler.invoke(KStreamBoundElementFactory.java:134)
Can anyone help me with a Spring Cloud Stream / Spring Kafka code sample that handles multiple input and output topics with Kafka Streams?
Update: 21 January 2021
After removing all the kafkaStreams and kafkaStreamTopology bean configurations, I get the message below in an infinite loop. Message consumption still does not work. I have checked the subscriptions in application.yaml against the @Bean function definitions; they all look fine to me, but I still get this cross-wiring error. I have replaced the application.properties shown above with application.yaml.
2021-01-21 14:12:43,336 WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [consumerapp-1-75eec5e5-2772-4999-acf2-e9ef1e69f100-StreamThread-1] [Consumer clientId=consumerapp-1-75eec5e5-2772-4999-acf2-e9ef1e69f100-StreamThread-1-consumer, groupId=consumerapp-1] We received an assignment [cccParserTopic-0] that doesn't match our current subscription Subscribe(bbbParserTopic); it is likely that the subscription has changed since we joined the group. Will try re-join the group with current subscription
It looks like you are mixing Spring Cloud Stream and Spring Kafka in the application. When using the binder, you do not need to define the components that Spring Kafka itself requires, such as KafkaStreams and Topology; they are created implicitly by SCSt. Could you remove the following beans and try again?
@Bean
public KafkaStreams kafkaStreams(KafkaProperties kafkaProperties) {
and
@Bean
public Topology kafkaStreamTopology() {
If you are still facing issues, please share a small sample that reproduces the problem so that we can triage it further.
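For reference, a minimal sketch of what the configuration class would look like once those two beans are removed; only the functional definitions remain, and the binder builds the Topology and starts KafkaStreams behind the scenes (logging the record value in peek is purely for illustration, not the poster's exact code):

import java.util.function.Function;

import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaStreamsConfig {

    private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfig.class);

    // Bound to processAAA-in-0 / processAAA-out-0 from application.yaml;
    // no explicit KafkaStreams or Topology bean is needed.
    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processAAA() {
        return input -> input.peek((key, value) ->
                log.info("AAA Cloud Stream Kafka Stream processing : {}", value));
    }

    // processBBB and processCCC are declared the same way against their own bindings.
}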
I have managed to solve the problem and am writing this up for the benefit of others.
If you want to include multiple streams in a single application jar, the key is to define multiple application IDs, one per stream. I knew this all along, but I did not know how to define it; the answer is something I finally managed to dig out of the SCSt documentation. Below is how the application.yaml should be defined.
application.yaml looks like this:
spring:
  application.name: kafkaMultiStreamConsumer
  cloud:
    function:
      definition: processAAA;processBBB;processCCC # needed for imperative @StreamListener
    stream:
      kafka:
        binder:
          brokers: 127.0.0.1:9092
          min-partition-count: 3
          replication-factor: 2
          transaction:
            transaction-id-prefix: transaction-id-2000
          autoCreateTopics: true
          auto-add-partitions: true
        streams:
          binder:
            functions:
              # needed for the functional style
              processBBB:
                application-id: SampleBBBapplication
              processAAA:
                application-id: SampleAAAapplication
              processCCC:
                application-id: SampleCCCapplication
            configuration:
              commit.interval.ms: 1000
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      bindings:
        # Below is for the imperative programming style, using the
        # @StreamListener and @SendTo annotations in the .java classes
        inputAAA:
          destination: aaaInputTopic
        outputAAA:
          destination: aaaOutputTopic
        inputBBB:
          destination: bbbInputTopic
        outputBBB:
          destination: bbbOutputTopic
        inputCCC:
          destination: cccInputTopic
        outputCCC:
          destination: cccOutputTopic
        # Functional style programming using Function<KStream...>. Use either one of them,
        # as both are not required. If you use both it is OK, but only one of them works;
        # from what I have seen, @StreamListener is always the one triggered.
        # Below is the functional style.
        processAAA-in-0:
          destination: aaaInputTopic
          group: processAAA-group
        processAAA-out-0:
          destination: aaaOutputTopic
          group: processAAA-group
        processBBB-in-0:
          destination: bbbInputTopic
          group: processBBB-group
        processBBB-out-0:
          destination: bbbOutputTopic
          group: processBBB-group
        processCCC-in-0:
          destination: cccInputTopic
          group: processCCC-group
        processCCC-out-0:
          destination: cccOutputTopic
          group: processCCC-group
After defining the above, we now need to define the individual Java classes that implement the stream processing logic.
Your Java class can look like the following; create similar ones for the other 2 (or N) streams as per your requirement. One example is below: AAASampleStreamTask.java
@Component
@EnableBinding(AAASampleChannel.class) // One channel interface corresponding to the in-topic and out-topic
public class AAASampleStreamTask {

    private static final Logger log = LoggerFactory.getLogger(AAASampleStreamTask.class);

    @StreamListener(AAASampleChannel.INPUT)
    @SendTo(AAASampleChannel.OUTPUT)
    public KStream<String, String> processAAA(KStream<String, String> input) {
        input.foreach((key, value) -> log.info("Annotation AAA *Sample* Cloud Stream Kafka Stream processing {}", String.valueOf(System.currentTimeMillis())));
        ...
        // do other business logic
        ...
        return input;
    }

    /**
     * Use either the method above or the bean below. The style below is the newer one,
     * starting from SCSt 3.0 if I am not wrong. These are two different styles of consuming
     * Kafka Streams with SCSt. If both are present, the one above takes priority, as per my observation.
     */
    /*
    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processAAA() {
        return input -> input.peek((key, value) -> log.info(
            "Functional AAA *Sample* Cloud Stream Kafka Stream processing : {}", String.valueOf(System.currentTimeMillis())));
        ...
        // do other business logic
        ...
    }
    */
}
A channel is needed if you want to use the imperative style instead of the functional style.
AAASampleChannel.java
public interface AAASampleChannel {

    String INPUT = "inputAAA";
    String OUTPUT = "outputAAA";

    @Input(INPUT)
    KStream<String, String> inputAAA();

    @Output(OUTPUT)
    KStream<String, String> outputAAA();
}
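To see the three streams working end to end, one quick check is to publish a few test records to the input topics and watch the processor logs. Below is a minimal sketch, assuming Spring Boot auto-configures a KafkaTemplate (spring-kafka on the classpath and spring.kafka.bootstrap-servers pointing at the same broker); the topic names match the configuration above, but the producer class itself is purely illustrative:

import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class SampleSmokeTestProducer implements CommandLineRunner {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public SampleSmokeTestProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void run(String... args) {
        // One record per stream; each should show up in the log of the matching
        // AAA/BBB/CCC processor, each running under its own application-id.
        kafkaTemplate.send("aaaInputTopic", "key-1", "hello AAA");
        kafkaTemplate.send("bbbInputTopic", "key-1", "hello BBB");
        kafkaTemplate.send("cccInputTopic", "key-1", "hello CCC");
    }
}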