Kafka stream 加入一个特定的键作为输入

Kafka stream join with a specific key as input

我在模式注册表中有 3 个不同的主题和 3 个 Avro 文件,我想流式传输这些主题并将它们连接在一起并将它们写入一个主题。问题是我想加入的key和我把数据写入每个topic的key不一样

假设我们有这 3 个 Avro 文件:
报警:

{
  "type" : "record",
  "name" : "Alarm",
  "namespace" : "com.kafkastream.schema.avro",
  "fields" : [ {
    "name" : "alarm_id",
    "type" : "string",
    "doc" : "Unique identifier of the alarm."
  }, {
    "name" : "ne_id",
    "type" : "string",
    "doc" : "Unique identifier of the  network element ID that produces the alarm."
  }, {
    "name" : "start_time",
    "type" : "long",
    "doc" : "is the timestamp when the alarm was generated."
  }, {
    "name" : "severity",
    "type" : [ "null", "string" ],
    "doc" : "The severity field is the default severity associated to the alarm ",
    "default" : null
  }]
}

事件:

{
  "type" : "record",
  "name" : "Incident",
  "namespace" : "com.kafkastream.schema.avro",
  "fields" : [ {
    "name" : "incident_id",
    "type" : "string",
    "doc" : "Unique identifier of the incident."
  }, {
    "name" : "incident_type",
    "type" : [ "null", "string" ],
    "doc" : "Categorization of the incident e.g. Network fault, network at risk, customer impact, etc",
    "default" : null
  }, {
    "name" : "alarm_source_id",
    "type" : "string",
    "doc" : "Respective Alarm"
  }, {
    "name" : "start_time",
    "type" : "long",
    "doc" : "is the timestamp when the incident was generated on the node."
  }, {
    "name" : "ne_id",
    "type" : "string",
    "doc" : "ID of specific network element."
  }]
}

维护:

{
  "type" : "record",
  "name" : "Maintenance",
  "namespace" : "com.kafkastream.schema.avro",
  "fields" : [ {
    "name" : "maintenance_id",
    "type" : "string",
    "doc" : "The message number is the unique ID for every maintenance"
  }, {
    "name" : "ne_id",
    "type" : "string",
    "doc" : "The NE ID is the network element ID on which the maintenance is done."
  }, {
    "name" : "start_time",
    "type" : "long",
    "doc" : "The timestamp when the maintenance start."
  }, {
    "name" : "end_time",
    "type" : "long",
    "doc" : "The timestamp when the maintenance start."
  }]
}

我的 Kafka 中的每个 Avro 都有 3 个主题(比如 alarm_raw、incident_raw、maintenance_raw),每当我想写入这些主题时,我都会使用ne_id 作为键(因此主题由 ne_id 分区)。现在我想加入这 3 个主题并获得一条新记录并将其写入一个新主题。问题是我想加入 AlarmIncident基于alarm_idalarm_source_id并加入基于ne_id的告警和维护。我想避免创建新主题并重新分配新密钥。无论如何,我在加入时指定密钥吗?

这取决于您要使用哪种联接 (c.f。https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics)

对于 KStream-KStream 加入,目前(v0.10.2 及更早版本)除了设置新密钥(例如,通过使用 selectKey()) 并重新分区。

为了KStream-KTable加入,Kafka 0.10.2(将在接下来的几周内发布)包含一个名为GlobalKTables的新功能(c.f. https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams).这允许您在 KTable 上执行 non-key 连接(即 KStream-GlobalKTable 连接,因此您不需要重新分区 GlobalKTable 中的数据)。

Note: a KStream-GlobalKTable join has different semantics than a KStream-KTable join. It is not time synchronized in contrast to the later, and thus, the join is non-deterministic by design with regard to GlobalKTable updates; i.e., there is no guarantee what KStream record will be the first to "see" a GlobalKTable updates and thus join with the updated GlobalKTable record.

也计划添加 KTable-GlobalKTable 连接。这可能会在 0.10.3 中可用。不过没有计划添加 "global" KStream-KStream 连接。

您可以通过修改来维护相同的密钥。
您可以使用 KeyValueMapper 来修改您的密钥和值。
您应该按如下方式使用它:

val modifiedStream = kStream.map[String,String](
    new KeyValueMapper[String, String,KeyValue[String,String]]{
        override def apply(key: String, value: String): KeyValue[String, String] = new KeyValue("modifiedKey", value)
    }
)

您可以将以上逻辑应用于多个 Kstream 对象,以维护用于加入 KStream 的单个键。