将消息从一个 Kafka 集群流式传输到另一个集群

Streaming messages from one Kafka Cluster to another

我目前正在尝试轻松地将消息从一个 Kafka 集群上的主题流式传输到另一个 Kafka 集群(远程 -> 本地集群)。
我们的想法是立即使用 Kafka-Streams,这样我们就不需要在本地集群上复制实际消息,而只需将 Kafka-Streams 处理的 "results" 获取到我们的 Kafka-Topics。

所以假设 WordCount 演示是在另一台 PC 上的一个 Kafka 实例上,而不是我自己的。我的本地机器上也有一个 Kafka 实例 运行ning。
现在我想让 WordCount 演示 运行 在主题 ("remote") 上包含应该计算哪些单词的句子。
然而,计数应该写入我本地系统上的主题而不是 "remote" 主题。

Kafka-Streams 可以实现这样的功能吗API?
例如。

val builder: KStreamBuilder = new KStreamBuilder(remote-streamConfig, local-streamconfig)
val textLines: KStream[String, String] = builder.stream("remote-input-topic", 
remote-streamConfig)
val wordCounts: KTable[String, Long] = textLines
    .flatMapValues(textLine => textLine.toLowerCase.split("\W+").toIterable.asJava)
    .groupBy((_, word) => word)
    .count("word-counts")

wordCounts.to(stringSerde, longSerde, "local-output-topic", local-streamconfig)

val streams: KafkaStreams = new KafkaStreams(builder)
streams.start()

非常感谢
-蒂姆

Kafka Streams 仅为单个集群构建。

一种解决方法是使用 foreach() 或类似的方法并实例化您自己的 KafkaProducer 以写入目标集群。请注意,您自己的制作人 必须 使用同步写入!否则,您可能会在发生故障时丢失数据。因此,这不是一个非常高效的解决方案。

最好只将结果写入源集群,然后将数据复制到目标集群。请注意,您很可能可以在源集群中使用更短的输出主题保留期,因为实际数据无论如何都以更长的保留时间存储在目标集群中。这允许您限制源集群上所需的存储。

编辑(回复@quickinsights 下方的评论)

what if your Kafka streams service is down for longer period than the retention

这似乎是一个正交问题,可以针对任何设计提出。保留时间应根据您的最大停机时间来设置,以避免一般情况下的数据丢失。请注意,由于应用程序 reads/write from/to 源集群和源集群输出主题可能配置了较短的保留时间,因此如果应用程序宕机也不会发生什么坏事。输入主题将不会被处理,也不会产生新的输出数据。您可能只担心这种情况,即您到目标集群的复制管道出现故障——您应该相应地设置源集群中输出主题的保留时间,以确保您不会丢失任何数据。

It also doubles your writes back to Kafka.

是的。它还增加了磁盘上的存储空间。它是应用程序弹性和运行时性能与集群负载之间的权衡(一如既往)。你的选择。如上所述,我个人建议选择更具弹性的选项。扩展 Kafka 集群比处理应用程序代码中的所有弹性边缘情况更容易。

That seems super inefficient

这是个人判断。这是一个权衡,没有 objective 对或错。