Kafka topic to multiple kafka topics dispatcher(同一个集群)

Kafka topic to multiple kafka topics dispatcher (same cluster)

我的用例如下: 我有一个 kafka 主题 A,消息“逻辑上”属于不同的“服务”,我既不处理系统发送消息到 A

我想从 A 中读取此类消息,并根据描述服务的一列将它们分派到同一集群(我们称它们为 A_1, ..., A_n)上的每项服务主题集(格式为 CSV 样式,但没关系)。

服务集是静态的,我暂时不需要处理addition/removal。

本来想用KafkaConnect来完成这样的任务,没想到居然没有Kafkasource/sinks(找不到票,被拒绝了)

我已经看到 MirrorMaker2 但对于我的(简单)用例来说它看起来有点矫枉过正。

我也知道 KafkaStreams 但我不想为此编写和维护代码。

我的问题是:有没有办法不用我自己写kafka-consumer/producer就可以用kafka原生工具实现这个主题调度?

PS:如果有人认为 MirrorMaker2 可能很合适我也有兴趣,我不太了解这个工具。

Mirror Maker 用于做...镜像。当您想要将一个集群从一个数据中心镜像到另一个具有相同主题的集群时,它很有用。您的用例不同。

Kafka Connect 用于通过 Kafka 主题同步不同的系统(例如来自数据库的数据),但我也没有在这个用例中看到它。

我会为此使用 Kafka Streams 应用程序。

据我所知,没有直接的方法可以根据传入的消息将传入的主题消息分支到主题列表。您需要编写自定义代码来实现此目的。

  1. 使用处理器APIRefer here
  2. 在 Processor 方法中传递主题列表
  3. 使用逻辑来确定需要分支的主题
  4. 使用context.forward向其他主题发布消息

context.forward(key, value, To.child("selected topic"))

所有其他答案都是正确的,在撰写本文时,我确实在 Kafka 工具集中找到了任何“config-only”解决方案。

最后的诀窍是使用 Logstash,因为它的“kafka 输出插件”支持 topic-id 参数中的 jinja 变量。

因此,一旦您在字段中获得了“目标主题名称”(比如 service_name),就这么简单:

output {
  kafka {
    id => "sink"
    codec => [...]
    bootstrap_servers => [...]
    topic_id => "%{[service_name]}"
    [...]
  }
}