Spring kafka - kafka 流拓扑设置

Spring kafka - kafka streams topology set up

我们有一个 kafka 流 spring 启动应用程序(使用 spring-kafka),这个应用程序当前从上游主题读取消息应用一些转换,并将它们写入下游主题,它不执行任何聚合或连接或任何高级 kafka 流功能。

代码目前看起来与此类似

@Bean 
fun topology(streamsBuilder: StreamsBuilder): KStream<String, SomeObject> {
  val stream = streamsBuilder.stream<String, SomeObject>(inputTopicName)
  val branches: Array<KStream<String, SomeObject>> = stream.branch(
    { _, value -> isValidRawData(value)},
    { _, failedValue -> true}
  )
        
  branches[0].map { _, value -> transform(value) }.to(outputTopicName)
  branches[1].foreach { _, value -> s3Service.uploadEvent(value) }
}

这可以正常使用,但现在我们需要扩展此代码以使用来自第二个上游主题的不同模式的消息并应用稍微不同的转换,然后将它们写入相同的下游主题(具有类似的schema) 如上面的拓扑结构。

为了实现这一点,我们有 2 个选择;

  1. 创建第二个 @Bean 工厂方法,除了它的拓扑从一个单独的主题消耗并应用不同的转换外,几乎与上面的方法相似。

  2. 修改上面的拓扑来消费两个主题,为来自第二个主题的消息创建第三个分支如下

@Bean 
fun topology(streamsBuilder: StreamsBuilder): KStream<String, SpecificRecord> {
  val topics = listOf("topic1", "topic2")
  val stream = streamsBuilder.stream<String, SpecificRecord>(topics)
  val branches: Array<KStream<String,SpecificRecord>> = stream.branch(
    { _, value -> isRecordFromTopic1(value)},
    { _, value -> isRecordFromTopic2(value)},
    { _, failedValue -> true}
  )
        
  branches[0].map { _, value -> transformTopic1Record(value) }.to(outputTopicName)
  branches[1].map { _, value -> transformTopic2Record(value) }.to(outputTopicName)
  branches[2].foreach { _, value -> s3Service.uploadEvent(value) }
}

推荐使用这些方法中的哪一种?从 kafka 流资源管理或性能角度来看,我们需要考虑哪些事项?

感谢您的建议。

因为在第二个代码中显示了主题集合 API,所以我认为这两个变体都是有效且有意义的。其他一切都只是个人喜好。我会选择第一个,因为从技术上讲,最终一切都将在同一个 Streams 引擎上运行。当您引入第三种记录类型等时,第一种解决方案在未来更容易支持。或者您可能对特定流有额外的逻辑。您可能有一个公共流来读取所有主题并通过该条件和分支分发它们。您可以通过他们自己的中间主题在他们的个人流中执行的其余逻辑。但仍然:只是我的意见...