将消息从一个 Kafka 集群流式传输到另一个集群
Streaming messages from one Kafka Cluster to another
我目前正在尝试轻松地将消息从一个 Kafka 集群上的主题流式传输到另一个 Kafka 集群(远程 -> 本地集群)。
我们的想法是立即使用 Kafka-Streams,这样我们就不需要在本地集群上复制实际消息,而只需将 Kafka-Streams 处理的 "results" 获取到我们的 Kafka-Topics。
所以假设 WordCount 演示是在另一台 PC 上的一个 Kafka 实例上,而不是我自己的。我的本地机器上也有一个 Kafka 实例 运行ning。
现在我想让 WordCount 演示 运行 在主题 ("remote") 上包含应该计算哪些单词的句子。
然而,计数应该写入我本地系统上的主题而不是 "remote" 主题。
Kafka-Streams 可以实现这样的功能吗API?
例如。
val builder: KStreamBuilder = new KStreamBuilder(remote-streamConfig, local-streamconfig)
val textLines: KStream[String, String] = builder.stream("remote-input-topic",
remote-streamConfig)
val wordCounts: KTable[String, Long] = textLines
.flatMapValues(textLine => textLine.toLowerCase.split("\W+").toIterable.asJava)
.groupBy((_, word) => word)
.count("word-counts")
wordCounts.to(stringSerde, longSerde, "local-output-topic", local-streamconfig)
val streams: KafkaStreams = new KafkaStreams(builder)
streams.start()
非常感谢
-蒂姆
Kafka Streams 仅为单个集群构建。
一种解决方法是使用 foreach()
或类似的方法并实例化您自己的 KafkaProducer
以写入目标集群。请注意,您自己的制作人 必须 使用同步写入!否则,您可能会在发生故障时丢失数据。因此,这不是一个非常高效的解决方案。
最好只将结果写入源集群,然后将数据复制到目标集群。请注意,您很可能可以在源集群中使用更短的输出主题保留期,因为实际数据无论如何都以更长的保留时间存储在目标集群中。这允许您限制源集群上所需的存储。
编辑(回复@quickinsights 下方的评论)
what if your Kafka streams service is down for longer period than the retention
这似乎是一个正交问题,可以针对任何设计提出。保留时间应根据您的最大停机时间来设置,以避免一般情况下的数据丢失。请注意,由于应用程序 reads/write from/to 源集群和源集群输出主题可能配置了较短的保留时间,因此如果应用程序宕机也不会发生什么坏事。输入主题将不会被处理,也不会产生新的输出数据。您可能只担心这种情况,即您到目标集群的复制管道出现故障——您应该相应地设置源集群中输出主题的保留时间,以确保您不会丢失任何数据。
It also doubles your writes back to Kafka.
是的。它还增加了磁盘上的存储空间。它是应用程序弹性和运行时性能与集群负载之间的权衡(一如既往)。你的选择。如上所述,我个人建议选择更具弹性的选项。扩展 Kafka 集群比处理应用程序代码中的所有弹性边缘情况更容易。
That seems super inefficient
这是个人判断。这是一个权衡,没有 objective 对或错。
我目前正在尝试轻松地将消息从一个 Kafka 集群上的主题流式传输到另一个 Kafka 集群(远程 -> 本地集群)。
我们的想法是立即使用 Kafka-Streams,这样我们就不需要在本地集群上复制实际消息,而只需将 Kafka-Streams 处理的 "results" 获取到我们的 Kafka-Topics。
所以假设 WordCount 演示是在另一台 PC 上的一个 Kafka 实例上,而不是我自己的。我的本地机器上也有一个 Kafka 实例 运行ning。
现在我想让 WordCount 演示 运行 在主题 ("remote") 上包含应该计算哪些单词的句子。
然而,计数应该写入我本地系统上的主题而不是 "remote" 主题。
Kafka-Streams 可以实现这样的功能吗API?
例如。
val builder: KStreamBuilder = new KStreamBuilder(remote-streamConfig, local-streamconfig)
val textLines: KStream[String, String] = builder.stream("remote-input-topic",
remote-streamConfig)
val wordCounts: KTable[String, Long] = textLines
.flatMapValues(textLine => textLine.toLowerCase.split("\W+").toIterable.asJava)
.groupBy((_, word) => word)
.count("word-counts")
wordCounts.to(stringSerde, longSerde, "local-output-topic", local-streamconfig)
val streams: KafkaStreams = new KafkaStreams(builder)
streams.start()
非常感谢
-蒂姆
Kafka Streams 仅为单个集群构建。
一种解决方法是使用 foreach()
或类似的方法并实例化您自己的 KafkaProducer
以写入目标集群。请注意,您自己的制作人 必须 使用同步写入!否则,您可能会在发生故障时丢失数据。因此,这不是一个非常高效的解决方案。
最好只将结果写入源集群,然后将数据复制到目标集群。请注意,您很可能可以在源集群中使用更短的输出主题保留期,因为实际数据无论如何都以更长的保留时间存储在目标集群中。这允许您限制源集群上所需的存储。
编辑(回复@quickinsights 下方的评论)
what if your Kafka streams service is down for longer period than the retention
这似乎是一个正交问题,可以针对任何设计提出。保留时间应根据您的最大停机时间来设置,以避免一般情况下的数据丢失。请注意,由于应用程序 reads/write from/to 源集群和源集群输出主题可能配置了较短的保留时间,因此如果应用程序宕机也不会发生什么坏事。输入主题将不会被处理,也不会产生新的输出数据。您可能只担心这种情况,即您到目标集群的复制管道出现故障——您应该相应地设置源集群中输出主题的保留时间,以确保您不会丢失任何数据。
It also doubles your writes back to Kafka.
是的。它还增加了磁盘上的存储空间。它是应用程序弹性和运行时性能与集群负载之间的权衡(一如既往)。你的选择。如上所述,我个人建议选择更具弹性的选项。扩展 Kafka 集群比处理应用程序代码中的所有弹性边缘情况更容易。
That seems super inefficient
这是个人判断。这是一个权衡,没有 objective 对或错。